HDFS-16526. Add metrics for slow DataNode (#4162)

This commit is contained in:
Renukaprasad C 2022-04-15 21:37:05 +05:30 committed by GitHub
parent 8ea3358380
commit f14f305051
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
4 changed files with 70 additions and 1 deletions

View File

@ -481,7 +481,8 @@ Each metrics record contains tags such as SessionId and Hostname as additional i
| `PacketsSlowWriteToMirror` | Total number of packets whose write to other Datanodes in the pipeline takes more than a certain time (300ms by default) |
| `PacketsSlowWriteToDisk` | Total number of packets whose write to disk takes more than a certain time (300ms by default) |
| `PacketsSlowWriteToOsCache` | Total number of packets whose write to os cache takes more than a certain time (300ms by default) |
| `SlowFlushOrSyncCount` | Total number of packets whose sync/flush takes more than a certain time (300ms by default) |
| `SlowAckToUpstreamCount` | Total number of packets whose upstream ack takes more than a certain time (300ms by default) |
FsVolume
--------

View File

@ -412,6 +412,7 @@ boolean packetSentInTime() {
void flushOrSync(boolean isSync, long seqno) throws IOException { void flushOrSync(boolean isSync, long seqno) throws IOException {
long flushTotalNanos = 0; long flushTotalNanos = 0;
long begin = Time.monotonicNow(); long begin = Time.monotonicNow();
DataNodeFaultInjector.get().delay();
if (checksumOut != null) { if (checksumOut != null) {
long flushStartNanos = System.nanoTime(); long flushStartNanos = System.nanoTime();
checksumOut.flush(); checksumOut.flush();
@ -445,6 +446,7 @@ void flushOrSync(boolean isSync, long seqno) throws IOException {
} }
long duration = Time.monotonicNow() - begin; long duration = Time.monotonicNow() - begin;
if (duration > datanodeSlowLogThresholdMs && LOG.isWarnEnabled()) { if (duration > datanodeSlowLogThresholdMs && LOG.isWarnEnabled()) {
datanode.metrics.incrSlowFlushOrSyncCount();
LOG.warn("Slow flushOrSync took " + duration + "ms (threshold=" LOG.warn("Slow flushOrSync took " + duration + "ms (threshold="
+ datanodeSlowLogThresholdMs + "ms), isSync:" + isSync + ", flushTotalNanos=" + datanodeSlowLogThresholdMs + "ms), isSync:" + isSync + ", flushTotalNanos="
+ flushTotalNanos + "ns, volume=" + getVolumeBaseUri() + flushTotalNanos + "ns, volume=" + getVolumeBaseUri()
@ -1656,6 +1658,7 @@ private void sendAckUpstreamUnprotected(PipelineAck ack, long seqno,
} }
// send my ack back to upstream datanode // send my ack back to upstream datanode
long begin = Time.monotonicNow(); long begin = Time.monotonicNow();
DataNodeFaultInjector.get().delay();
/* for test only, no-op in production system */ /* for test only, no-op in production system */
DataNodeFaultInjector.get().delaySendingAckToUpstream(inAddr); DataNodeFaultInjector.get().delaySendingAckToUpstream(inAddr);
replyAck.write(upstreamOut); replyAck.write(upstreamOut);
@ -1665,6 +1668,7 @@ private void sendAckUpstreamUnprotected(PipelineAck ack, long seqno,
inAddr, inAddr,
duration); duration);
if (duration > datanodeSlowLogThresholdMs) { if (duration > datanodeSlowLogThresholdMs) {
datanode.metrics.incrSlowAckToUpstreamCount();
LOG.warn("Slow PacketResponder send ack to upstream took " + duration LOG.warn("Slow PacketResponder send ack to upstream took " + duration
+ "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), " + myString + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), " + myString
+ ", replyAck=" + replyAck + ", replyAck=" + replyAck

View File

@ -191,6 +191,8 @@ public class DataNodeMetrics {
@Metric MutableCounterLong packetsSlowWriteToMirror; @Metric MutableCounterLong packetsSlowWriteToMirror;
@Metric MutableCounterLong packetsSlowWriteToDisk; @Metric MutableCounterLong packetsSlowWriteToDisk;
@Metric MutableCounterLong packetsSlowWriteToOsCache; @Metric MutableCounterLong packetsSlowWriteToOsCache;
@Metric private MutableCounterLong slowFlushOrSyncCount;
@Metric private MutableCounterLong slowAckToUpstreamCount;
@Metric("Number of replaceBlock ops between" + @Metric("Number of replaceBlock ops between" +
" storage types on same host with local copy") " storage types on same host with local copy")
@ -440,6 +442,14 @@ public void incrVolumeFailures(int size) {
volumeFailures.incr(size); volumeFailures.incr(size);
} }
/**
 * Increments the count of packets whose flush or sync took longer than
 * the configured slow-I/O threshold (300ms by default). Called from the
 * slow-flushOrSync warning path in BlockReceiver.
 */
public void incrSlowFlushOrSyncCount() {
slowFlushOrSyncCount.incr();
}
/**
 * Increments the count of packets whose ack to the upstream DataNode took
 * longer than the configured slow-I/O threshold (300ms by default). Called
 * from the slow-ack warning path in BlockReceiver's PacketResponder.
 */
public void incrSlowAckToUpstreamCount() {
slowAckToUpstreamCount.incr();
}
public void incrDatanodeNetworkErrors() { public void incrDatanodeNetworkErrors() {
datanodeNetworkErrors.incr(); datanodeNetworkErrors.incr();
} }

View File

@ -603,6 +603,60 @@ public void testNNRpcMetricsWithNonHA() throws IOException {
MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name()); MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name());
assertCounter("HeartbeatsNumOps", 1L, rb); assertCounter("HeartbeatsNumOps", 1L, rb);
} }
/**
 * Verify that the {@code SlowFlushOrSyncCount} and
 * {@code SlowAckToUpstreamCount} DataNode metrics advance when flush/sync
 * and upstream-ack paths exceed the slow-I/O threshold. A fault injector
 * delays those paths past the default 300ms threshold, so writing files
 * must increment both counters.
 *
 * Fixes over the original: the injector no longer swallows
 * InterruptedException (interrupt status is restored), and assertEquals
 * uses the conventional (expected, actual) argument order.
 */
@Test(timeout = 60000)
public void testSlowMetrics() throws Exception {
  DataNodeFaultInjector dnFaultInjector = new DataNodeFaultInjector() {
    @Override public void delay() {
      try {
        // Sleep just past the default 300ms slow-I/O threshold so every
        // flush/sync and upstream ack is classified as slow.
        Thread.sleep(310);
      } catch (InterruptedException e) {
        // Restore interrupt status instead of silently swallowing it.
        Thread.currentThread().interrupt();
      }
    }
  };
  DataNodeFaultInjector oldDnInjector = DataNodeFaultInjector.get();
  DataNodeFaultInjector.set(dnFaultInjector);
  Configuration conf = new HdfsConfiguration();
  MiniDFSCluster cluster = null;
  try {
    cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
    final FileSystem fs = cluster.getFileSystem();
    List<DataNode> datanodes = cluster.getDataNodes();
    // JUnit convention: expected value first, actual second.
    assertEquals(3, datanodes.size());
    final DataNode datanode = datanodes.get(0);
    MetricsRecordBuilder rb = getMetrics(datanode.getMetrics().name());
    final long longFileLen = 10;
    // Snapshot the counters before writing so the check below only needs
    // the counters to move, not to hit an absolute value.
    final long startFlushOrSyncValue =
        getLongCounter("SlowFlushOrSyncCount", rb);
    final long startAckToUpstreamValue =
        getLongCounter("SlowAckToUpstreamCount", rb);
    final AtomicInteger x = new AtomicInteger(0);
    GenericTestUtils.waitFor(new Supplier<Boolean>() {
      @Override public Boolean get() {
        // Unique file name per attempt so each retry performs fresh writes.
        x.getAndIncrement();
        try {
          DFSTestUtil
              .createFile(fs, new Path("/time.txt." + x.get()), longFileLen,
                  (short) 3, Time.monotonicNow());
        } catch (IOException ioe) {
          LOG.error("Caught IOException while ingesting DN metrics", ioe);
          return false;
        }
        MetricsRecordBuilder rbNew = getMetrics(datanode.getMetrics().name());
        final long endFlushOrSyncValue =
            getLongCounter("SlowFlushOrSyncCount", rbNew);
        final long endAckToUpstreamValue =
            getLongCounter("SlowAckToUpstreamCount", rbNew);
        return endFlushOrSyncValue > startFlushOrSyncValue
            && endAckToUpstreamValue > startAckToUpstreamValue;
      }
    }, 30, 30000);
  } finally {
    // Always restore the original injector and tear down the mini cluster,
    // even if the wait above times out.
    DataNodeFaultInjector.set(oldDnInjector);
    if (cluster != null) {
      cluster.shutdown();
    }
  }
}
@Test @Test
public void testNNRpcMetricsWithHA() throws IOException { public void testNNRpcMetricsWithHA() throws IOException {