HDFS-15754. Add DataNode packet metrics (#2578)
Contributed by Fengnan Li.
This commit is contained in:
parent
1b1791075a
commit
87bd4d2aca
@ -469,6 +469,10 @@ Each metrics record contains tags such as SessionId and Hostname as additional i
|
||||
| `CheckAndUpdateOpAvgTime` | Average time of check and update operations in milliseconds |
|
||||
| `UpdateReplicaUnderRecoveryOpNumOps` | Total number of update replica under recovery operations |
|
||||
| `UpdateReplicaUnderRecoveryOpAvgTime` | Average time of update replica under recovery operations in milliseconds |
|
||||
| `PacketsReceived` | Total number of packets received by Datanode (excluding heartbeat packet from client) |
|
||||
| `PacketsSlowWriteToMirror` | Total number of packets whose write to other Datanodes in the pipeline takes more than a certain time (300ms by default) |
|
||||
| `PacketsSlowWriteToDisk` | Total number of packets whose write to disk takes more than a certain time (300ms by default) |
|
||||
| `PacketsSlowWriteToOsCache` | Total number of packets whose write to os cache takes more than a certain time (300ms by default) |
|
||||
|
||||
FsVolume
|
||||
--------
|
||||
|
@ -586,6 +586,7 @@ private int receivePacket() throws IOException {
|
||||
return 0;
|
||||
}
|
||||
|
||||
datanode.metrics.incrPacketsReceived();
|
||||
//First write the packet to the mirror:
|
||||
if (mirrorOut != null && !mirrorError) {
|
||||
try {
|
||||
@ -601,12 +602,15 @@ private int receivePacket() throws IOException {
|
||||
mirrorAddr,
|
||||
duration);
|
||||
trackSendPacketToLastNodeInPipeline(duration);
|
||||
if (duration > datanodeSlowLogThresholdMs && LOG.isWarnEnabled()) {
|
||||
LOG.warn("Slow BlockReceiver write packet to mirror took " + duration
|
||||
+ "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), "
|
||||
+ "downstream DNs=" + Arrays.toString(downstreamDNs)
|
||||
+ ", blockId=" + replicaInfo.getBlockId()
|
||||
+ ", seqno=" + seqno);
|
||||
if (duration > datanodeSlowLogThresholdMs) {
|
||||
datanode.metrics.incrPacketsSlowWriteToMirror();
|
||||
if (LOG.isWarnEnabled()) {
|
||||
LOG.warn("Slow BlockReceiver write packet to mirror took {}ms " +
|
||||
"(threshold={}ms), downstream DNs={}, blockId={}, seqno={}",
|
||||
duration, datanodeSlowLogThresholdMs,
|
||||
Arrays.toString(downstreamDNs), replicaInfo.getBlockId(),
|
||||
seqno);
|
||||
}
|
||||
}
|
||||
} catch (IOException e) {
|
||||
handleMirrorOutError(e);
|
||||
@ -736,13 +740,17 @@ private int receivePacket() throws IOException {
|
||||
long begin = Time.monotonicNow();
|
||||
streams.writeDataToDisk(dataBuf.array(),
|
||||
startByteToDisk, numBytesToDisk);
|
||||
// no-op in prod
|
||||
DataNodeFaultInjector.get().delayWriteToDisk();
|
||||
long duration = Time.monotonicNow() - begin;
|
||||
if (duration > datanodeSlowLogThresholdMs && LOG.isWarnEnabled()) {
|
||||
LOG.warn("Slow BlockReceiver write data to disk cost:" + duration
|
||||
+ "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), "
|
||||
+ "volume=" + getVolumeBaseUri()
|
||||
+ ", blockId=" + replicaInfo.getBlockId()
|
||||
+ ", seqno=" + seqno);
|
||||
if (duration > datanodeSlowLogThresholdMs) {
|
||||
datanode.metrics.incrPacketsSlowWriteToDisk();
|
||||
if (LOG.isWarnEnabled()) {
|
||||
LOG.warn("Slow BlockReceiver write data to disk cost: {}ms " +
|
||||
"(threshold={}ms), volume={}, blockId={}, seqno={}",
|
||||
duration, datanodeSlowLogThresholdMs, getVolumeBaseUri(),
|
||||
replicaInfo.getBlockId(), seqno);
|
||||
}
|
||||
}
|
||||
|
||||
if (duration > maxWriteToDiskMs) {
|
||||
@ -930,13 +938,17 @@ private void manageWriterOsCache(long offsetInBlock, long seqno) {
|
||||
POSIX_FADV_DONTNEED);
|
||||
}
|
||||
lastCacheManagementOffset = offsetInBlock;
|
||||
// For testing. Normally no-op.
|
||||
DataNodeFaultInjector.get().delayWriteToOsCache();
|
||||
long duration = Time.monotonicNow() - begin;
|
||||
if (duration > datanodeSlowLogThresholdMs && LOG.isWarnEnabled()) {
|
||||
LOG.warn("Slow manageWriterOsCache took " + duration
|
||||
+ "ms (threshold=" + datanodeSlowLogThresholdMs
|
||||
+ "ms), volume=" + getVolumeBaseUri()
|
||||
+ ", blockId=" + replicaInfo.getBlockId()
|
||||
+ ", seqno=" + seqno);
|
||||
if (duration > datanodeSlowLogThresholdMs) {
|
||||
datanode.metrics.incrPacketsSlowWriteToOsCache();
|
||||
if (LOG.isWarnEnabled()) {
|
||||
LOG.warn("Slow manageWriterOsCache took {}ms " +
|
||||
"(threshold={}ms), volume={}, blockId={}, seqno={}",
|
||||
duration, datanodeSlowLogThresholdMs, getVolumeBaseUri(),
|
||||
replicaInfo.getBlockId(), seqno);
|
||||
}
|
||||
}
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
|
@ -67,6 +67,16 @@ public void delaySendingAckToUpstream(final String upstreamAddr)
|
||||
throws IOException {
|
||||
}
|
||||
|
||||
/**
|
||||
* Used as a hook to delay writing a packet to disk.
|
||||
*/
|
||||
public void delayWriteToDisk() {}
|
||||
|
||||
/**
|
||||
* Used as a hook to delay writing a packet to os cache.
|
||||
*/
|
||||
public void delayWriteToOsCache() {}
|
||||
|
||||
/**
|
||||
* Used as a hook to intercept the latency of sending ack.
|
||||
*/
|
||||
|
@ -183,6 +183,11 @@ public class DataNodeMetrics {
|
||||
@Metric private MutableRate checkAndUpdateOp;
|
||||
@Metric private MutableRate updateReplicaUnderRecoveryOp;
|
||||
|
||||
@Metric MutableCounterLong packetsReceived;
|
||||
@Metric MutableCounterLong packetsSlowWriteToMirror;
|
||||
@Metric MutableCounterLong packetsSlowWriteToDisk;
|
||||
@Metric MutableCounterLong packetsSlowWriteToOsCache;
|
||||
|
||||
final MetricsRegistry registry = new MetricsRegistry("datanode");
|
||||
@Metric("Milliseconds spent on calling NN rpc")
|
||||
private MutableRatesWithAggregation
|
||||
@ -690,4 +695,20 @@ public void addCheckAndUpdateOp(long latency) {
|
||||
public void addUpdateReplicaUnderRecoveryOp(long latency) {
|
||||
updateReplicaUnderRecoveryOp.add(latency);
|
||||
}
|
||||
|
||||
public void incrPacketsReceived() {
|
||||
packetsReceived.incr();
|
||||
}
|
||||
|
||||
public void incrPacketsSlowWriteToMirror() {
|
||||
packetsSlowWriteToMirror.incr();
|
||||
}
|
||||
|
||||
public void incrPacketsSlowWriteToDisk() {
|
||||
packetsSlowWriteToDisk.incr();
|
||||
}
|
||||
|
||||
public void incrPacketsSlowWriteToOsCache() {
|
||||
packetsSlowWriteToOsCache.incr();
|
||||
}
|
||||
}
|
||||
|
@ -64,6 +64,8 @@
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import javax.management.MBeanServer;
|
||||
import javax.management.ObjectName;
|
||||
@ -161,6 +163,53 @@ public void testReceivePacketMetrics() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testReceivePacketSlowMetrics() throws Exception {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
final int interval = 1;
|
||||
conf.setInt(DFSConfigKeys.DFS_METRICS_PERCENTILES_INTERVALS_KEY, interval);
|
||||
MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||
.numDataNodes(3).build();
|
||||
try {
|
||||
cluster.waitActive();
|
||||
DistributedFileSystem fs = cluster.getFileSystem();
|
||||
final DataNodeFaultInjector injector =
|
||||
Mockito.mock(DataNodeFaultInjector.class);
|
||||
Answer answer = new Answer() {
|
||||
@Override
|
||||
public Object answer(InvocationOnMock invocationOnMock)
|
||||
throws Throwable {
|
||||
// make the op taking longer time
|
||||
Thread.sleep(1000);
|
||||
return null;
|
||||
}
|
||||
};
|
||||
Mockito.doAnswer(answer).when(injector).
|
||||
stopSendingPacketDownstream(Mockito.anyString());
|
||||
Mockito.doAnswer(answer).when(injector).delayWriteToOsCache();
|
||||
Mockito.doAnswer(answer).when(injector).delayWriteToDisk();
|
||||
DataNodeFaultInjector.set(injector);
|
||||
Path testFile = new Path("/testFlushNanosMetric.txt");
|
||||
FSDataOutputStream fout = fs.create(testFile);
|
||||
fout.write(new byte[1]);
|
||||
fout.hsync();
|
||||
fout.close();
|
||||
List<DataNode> datanodes = cluster.getDataNodes();
|
||||
DataNode datanode = datanodes.get(0);
|
||||
MetricsRecordBuilder dnMetrics = getMetrics(datanode.getMetrics().name());
|
||||
assertTrue("More than 1 packet received",
|
||||
getLongCounter("TotalPacketsReceived", dnMetrics) > 1L);
|
||||
assertTrue("More than 1 slow packet to mirror",
|
||||
getLongCounter("TotalPacketsSlowWriteToMirror", dnMetrics) > 1L);
|
||||
assertCounter("TotalPacketsSlowWriteToDisk", 1L, dnMetrics);
|
||||
assertCounter("TotalPacketsSlowWriteToOsCache", 0L, dnMetrics);
|
||||
} finally {
|
||||
if (cluster != null) {
|
||||
cluster.shutdown();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* HDFS-15242: This function ensures that writing causes some metrics
|
||||
* of FSDatasetImpl to increment.
|
||||
|
Loading…
Reference in New Issue
Block a user