HDFS-7189. Add trace spans for DFSClient metadata operations. (Colin P. McCabe via yliu)

This commit is contained in:
yliu 2015-01-16 00:23:51 +08:00
parent 3ab3a64988
commit c4ccbe62c0
8 changed files with 479 additions and 138 deletions

View File

@ -517,6 +517,9 @@ Release 2.7.0 - UNRELEASED
HDFS-7457. DatanodeID generates excessive garbage. (daryn via kihwal)
HDFS-7189. Add trace spans for DFSClient metadata operations. (Colin P.
McCabe via yliu)
OPTIMIZATIONS
HDFS-7454. Reduce memory footprint for AclEntries in NameNode.

View File

@ -51,6 +51,10 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import org.htrace.Sampler;
import org.htrace.Span;
import org.htrace.Trace;
import org.htrace.TraceScope;
@InterfaceAudience.Private
@InterfaceStability.Unstable
@ -71,7 +75,7 @@ class BlockStorageLocationUtil {
*/
private static List<VolumeBlockLocationCallable> createVolumeBlockLocationCallables(
Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks,
int timeout, boolean connectToDnViaHostname) {
int timeout, boolean connectToDnViaHostname, Span parent) {
if (datanodeBlocks.isEmpty()) {
return Lists.newArrayList();
@ -111,7 +115,7 @@ private static List<VolumeBlockLocationCallable> createVolumeBlockLocationCallab
}
VolumeBlockLocationCallable callable = new VolumeBlockLocationCallable(
conf, datanode, poolId, blockIds, dnTokens, timeout,
connectToDnViaHostname);
connectToDnViaHostname, parent);
callables.add(callable);
}
return callables;
@ -131,11 +135,11 @@ private static List<VolumeBlockLocationCallable> createVolumeBlockLocationCallab
static Map<DatanodeInfo, HdfsBlocksMetadata> queryDatanodesForHdfsBlocksMetadata(
Configuration conf, Map<DatanodeInfo, List<LocatedBlock>> datanodeBlocks,
int poolsize, int timeoutMs, boolean connectToDnViaHostname)
throws InvalidBlockTokenException {
throws InvalidBlockTokenException {
List<VolumeBlockLocationCallable> callables =
createVolumeBlockLocationCallables(conf, datanodeBlocks, timeoutMs,
connectToDnViaHostname);
connectToDnViaHostname, Trace.currentSpan());
// Use a thread pool to execute the Callables in parallel
List<Future<HdfsBlocksMetadata>> futures =
@ -319,11 +323,12 @@ private static class VolumeBlockLocationCallable implements
private final long[] blockIds;
private final List<Token<BlockTokenIdentifier>> dnTokens;
private final boolean connectToDnViaHostname;
private final Span parentSpan;
VolumeBlockLocationCallable(Configuration configuration,
DatanodeInfo datanode, String poolId, long []blockIds,
List<Token<BlockTokenIdentifier>> dnTokens, int timeout,
boolean connectToDnViaHostname) {
boolean connectToDnViaHostname, Span parentSpan) {
this.configuration = configuration;
this.timeout = timeout;
this.datanode = datanode;
@ -331,6 +336,7 @@ private static class VolumeBlockLocationCallable implements
this.blockIds = blockIds;
this.dnTokens = dnTokens;
this.connectToDnViaHostname = connectToDnViaHostname;
this.parentSpan = parentSpan;
}
public DatanodeInfo getDatanodeInfo() {
@ -342,6 +348,8 @@ public HdfsBlocksMetadata call() throws Exception {
HdfsBlocksMetadata metadata = null;
// Create the RPC proxy and make the RPC
ClientDatanodeProtocol cdp = null;
TraceScope scope =
Trace.startSpan("getHdfsBlocksMetadata", parentSpan);
try {
cdp = DFSUtil.createClientDatanodeProtocolProxy(datanode, configuration,
timeout, connectToDnViaHostname);
@ -350,6 +358,7 @@ public HdfsBlocksMetadata call() throws Exception {
// Bubble this up to the caller, handle with the Future
throw e;
} finally {
scope.close();
if (cdp != null) {
RPC.stopProxy(cdp);
}

View File

@ -26,6 +26,9 @@
import org.apache.hadoop.hdfs.inotify.MissingEventsException;
import org.apache.hadoop.hdfs.protocol.ClientProtocol;
import org.apache.hadoop.util.Time;
import org.htrace.Sampler;
import org.htrace.Trace;
import org.htrace.TraceScope;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -44,6 +47,11 @@ public class DFSInotifyEventInputStream {
public static Logger LOG = LoggerFactory.getLogger(DFSInotifyEventInputStream
.class);
/**
* The trace sampler to use when making RPCs to the NameNode.
*/
private final Sampler<?> traceSampler;
private final ClientProtocol namenode;
private Iterator<EventBatch> it;
private long lastReadTxid;
@ -59,12 +67,15 @@ public class DFSInotifyEventInputStream {
private static final int INITIAL_WAIT_MS = 10;
DFSInotifyEventInputStream(ClientProtocol namenode) throws IOException {
this(namenode, namenode.getCurrentEditLogTxid()); // only consider new txn's
DFSInotifyEventInputStream(Sampler<?> traceSampler, ClientProtocol namenode)
throws IOException {
// Only consider new transaction IDs.
this(traceSampler, namenode, namenode.getCurrentEditLogTxid());
}
DFSInotifyEventInputStream(ClientProtocol namenode, long lastReadTxid)
throws IOException {
DFSInotifyEventInputStream(Sampler traceSampler, ClientProtocol namenode,
long lastReadTxid) throws IOException {
this.traceSampler = traceSampler;
this.namenode = namenode;
this.it = Iterators.emptyIterator();
this.lastReadTxid = lastReadTxid;
@ -87,39 +98,45 @@ public class DFSInotifyEventInputStream {
* The next available batch of events will be returned.
*/
public EventBatch poll() throws IOException, MissingEventsException {
// need to keep retrying until the NN sends us the latest committed txid
if (lastReadTxid == -1) {
LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN");
lastReadTxid = namenode.getCurrentEditLogTxid();
return null;
}
if (!it.hasNext()) {
EventBatchList el = namenode.getEditsFromTxid(lastReadTxid + 1);
if (el.getLastTxid() != -1) {
// we only want to set syncTxid when we were actually able to read some
// edits on the NN -- otherwise it will seem like edits are being
// generated faster than we can read them when the problem is really
// that we are temporarily unable to read edits
syncTxid = el.getSyncTxid();
it = el.getBatches().iterator();
long formerLastReadTxid = lastReadTxid;
lastReadTxid = el.getLastTxid();
if (el.getFirstTxid() != formerLastReadTxid + 1) {
throw new MissingEventsException(formerLastReadTxid + 1,
el.getFirstTxid());
}
} else {
LOG.debug("poll(): read no edits from the NN when requesting edits " +
"after txid {}", lastReadTxid);
TraceScope scope =
Trace.startSpan("inotifyPoll", traceSampler);
try {
// need to keep retrying until the NN sends us the latest committed txid
if (lastReadTxid == -1) {
LOG.debug("poll(): lastReadTxid is -1, reading current txid from NN");
lastReadTxid = namenode.getCurrentEditLogTxid();
return null;
}
}
if (!it.hasNext()) {
EventBatchList el = namenode.getEditsFromTxid(lastReadTxid + 1);
if (el.getLastTxid() != -1) {
// we only want to set syncTxid when we were actually able to read some
// edits on the NN -- otherwise it will seem like edits are being
// generated faster than we can read them when the problem is really
// that we are temporarily unable to read edits
syncTxid = el.getSyncTxid();
it = el.getBatches().iterator();
long formerLastReadTxid = lastReadTxid;
lastReadTxid = el.getLastTxid();
if (el.getFirstTxid() != formerLastReadTxid + 1) {
throw new MissingEventsException(formerLastReadTxid + 1,
el.getFirstTxid());
}
} else {
LOG.debug("poll(): read no edits from the NN when requesting edits " +
"after txid {}", lastReadTxid);
return null;
}
}
if (it.hasNext()) { // can be empty if el.getLastTxid != -1 but none of the
// newly seen edit log ops actually got converted to events
return it.next();
} else {
return null;
if (it.hasNext()) { // can be empty if el.getLastTxid != -1 but none of the
// newly seen edit log ops actually got converted to events
return it.next();
} else {
return null;
}
} finally {
scope.close();
}
}
@ -163,25 +180,29 @@ public long getTxidsBehindEstimate() {
*/
public EventBatch poll(long time, TimeUnit tu) throws IOException,
InterruptedException, MissingEventsException {
long initialTime = Time.monotonicNow();
long totalWait = TimeUnit.MILLISECONDS.convert(time, tu);
long nextWait = INITIAL_WAIT_MS;
TraceScope scope = Trace.startSpan("inotifyPollWithTimeout", traceSampler);
EventBatch next = null;
while ((next = poll()) == null) {
long timeLeft = totalWait - (Time.monotonicNow() - initialTime);
if (timeLeft <= 0) {
LOG.debug("timed poll(): timed out");
break;
} else if (timeLeft < nextWait * 2) {
nextWait = timeLeft;
} else {
nextWait *= 2;
try {
long initialTime = Time.monotonicNow();
long totalWait = TimeUnit.MILLISECONDS.convert(time, tu);
long nextWait = INITIAL_WAIT_MS;
while ((next = poll()) == null) {
long timeLeft = totalWait - (Time.monotonicNow() - initialTime);
if (timeLeft <= 0) {
LOG.debug("timed poll(): timed out");
break;
} else if (timeLeft < nextWait * 2) {
nextWait = timeLeft;
} else {
nextWait *= 2;
}
LOG.debug("timed poll(): poll() returned null, sleeping for {} ms",
nextWait);
Thread.sleep(nextWait);
}
LOG.debug("timed poll(): poll() returned null, sleeping for {} ms",
nextWait);
Thread.sleep(nextWait);
} finally {
scope.close();
}
return next;
}
@ -196,18 +217,23 @@ public EventBatch poll(long time, TimeUnit tu) throws IOException,
*/
public EventBatch take() throws IOException, InterruptedException,
MissingEventsException {
TraceScope scope = Trace.startSpan("inotifyTake", traceSampler);
EventBatch next = null;
int nextWaitMin = INITIAL_WAIT_MS;
while ((next = poll()) == null) {
// sleep for a random period between nextWaitMin and nextWaitMin * 2
// to avoid stampedes at the NN if there are multiple clients
int sleepTime = nextWaitMin + rng.nextInt(nextWaitMin);
LOG.debug("take(): poll() returned null, sleeping for {} ms", sleepTime);
Thread.sleep(sleepTime);
// the maximum sleep is 2 minutes
nextWaitMin = Math.min(60000, nextWaitMin * 2);
try {
int nextWaitMin = INITIAL_WAIT_MS;
while ((next = poll()) == null) {
// sleep for a random period between nextWaitMin and nextWaitMin * 2
// to avoid stampedes at the NN if there are multiple clients
int sleepTime = nextWaitMin + rng.nextInt(nextWaitMin);
LOG.debug("take(): poll() returned null, sleeping for {} ms", sleepTime);
Thread.sleep(sleepTime);
// the maximum sleep is 2 minutes
nextWaitMin = Math.min(60000, nextWaitMin * 2);
}
} finally {
scope.close();
}
return next;
}
}
}

View File

@ -27,6 +27,9 @@
import org.apache.hadoop.ipc.RemoteException;
import com.google.common.base.Preconditions;
import org.htrace.Sampler;
import org.htrace.Trace;
import org.htrace.TraceScope;
/**
* CacheDirectiveIterator is a remote iterator that iterates cache directives.
@ -39,12 +42,14 @@ public class CacheDirectiveIterator
private CacheDirectiveInfo filter;
private final ClientProtocol namenode;
private final Sampler<?> traceSampler;
public CacheDirectiveIterator(ClientProtocol namenode,
CacheDirectiveInfo filter) {
CacheDirectiveInfo filter, Sampler<?> traceSampler) {
super(0L);
this.namenode = namenode;
this.filter = filter;
this.traceSampler = traceSampler;
}
private static CacheDirectiveInfo removeIdFromFilter(CacheDirectiveInfo filter) {
@ -89,6 +94,7 @@ public boolean hasMore() {
public BatchedEntries<CacheDirectiveEntry> makeRequest(Long prevKey)
throws IOException {
BatchedEntries<CacheDirectiveEntry> entries = null;
TraceScope scope = Trace.startSpan("listCacheDirectives", traceSampler);
try {
entries = namenode.listCacheDirectives(prevKey, filter);
} catch (IOException e) {
@ -110,6 +116,8 @@ public BatchedEntries<CacheDirectiveEntry> makeRequest(Long prevKey)
"Did not find requested id " + id);
}
throw e;
} finally {
scope.close();
}
Preconditions.checkNotNull(entries);
return entries;

View File

@ -23,6 +23,9 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.BatchedRemoteIterator;
import org.htrace.Sampler;
import org.htrace.Trace;
import org.htrace.TraceScope;
/**
* CachePoolIterator is a remote iterator that iterates cache pools.
@ -34,16 +37,23 @@ public class CachePoolIterator
extends BatchedRemoteIterator<String, CachePoolEntry> {
private final ClientProtocol namenode;
private final Sampler traceSampler;
public CachePoolIterator(ClientProtocol namenode) {
public CachePoolIterator(ClientProtocol namenode, Sampler traceSampler) {
super("");
this.namenode = namenode;
this.traceSampler = traceSampler;
}
@Override
public BatchedEntries<CachePoolEntry> makeRequest(String prevKey)
throws IOException {
return namenode.listCachePools(prevKey);
TraceScope scope = Trace.startSpan("listCachePools", traceSampler);
try {
return namenode.listCachePools(prevKey);
} finally {
scope.close();
}
}
@Override

View File

@ -23,6 +23,9 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.fs.BatchedRemoteIterator;
import org.htrace.Sampler;
import org.htrace.Trace;
import org.htrace.TraceScope;
/**
* EncryptionZoneIterator is a remote iterator that iterates over encryption
@ -34,16 +37,24 @@ public class EncryptionZoneIterator
extends BatchedRemoteIterator<Long, EncryptionZone> {
private final ClientProtocol namenode;
private final Sampler<?> traceSampler;
public EncryptionZoneIterator(ClientProtocol namenode) {
public EncryptionZoneIterator(ClientProtocol namenode,
Sampler<?> traceSampler) {
super(Long.valueOf(0));
this.namenode = namenode;
this.traceSampler = traceSampler;
}
@Override
public BatchedEntries<EncryptionZone> makeRequest(Long prevId)
throws IOException {
return namenode.listEncryptionZones(prevId);
TraceScope scope = Trace.startSpan("listEncryptionZones", traceSampler);
try {
return namenode.listEncryptionZones(prevId);
} finally {
scope.close();
}
}
@Override

View File

@ -88,6 +88,7 @@
import org.apache.log4j.Level;
import org.apache.log4j.LogManager;
import org.apache.log4j.Logger;
import org.htrace.Sampler;
import org.junit.After;
import org.junit.Assert;
import org.junit.Before;
@ -908,7 +909,7 @@ public Boolean get() {
// Uncache and check each path in sequence
RemoteIterator<CacheDirectiveEntry> entries =
new CacheDirectiveIterator(nnRpc, null);
new CacheDirectiveIterator(nnRpc, null, Sampler.NEVER);
for (int i=0; i<numFiles; i++) {
CacheDirectiveEntry entry = entries.next();
nnRpc.removeCacheDirective(entry.getInfo().getId());