From f14f3050517ca09e5366210b3f1fca90c8feb81a Mon Sep 17 00:00:00 2001
From: Renukaprasad C <48682981+prasad-acit@users.noreply.github.com>
Date: Fri, 15 Apr 2022 21:37:05 +0530
Subject: [PATCH] HDFS-16526. Add metrics for slow DataNode (#4162)

---
 .../src/site/markdown/Metrics.md              |  3 +-
 .../hdfs/server/datanode/BlockReceiver.java   |  4 ++
 .../datanode/metrics/DataNodeMetrics.java     | 10 ++++
 .../server/datanode/TestDataNodeMetrics.java  | 54 +++++++++++++++++++
 4 files changed, 70 insertions(+), 1 deletion(-)

diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
index b0fc525776..4b17e15048 100644
--- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
+++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md
@@ -481,7 +481,8 @@ Each metrics record contains tags such as SessionId and Hostname as additional i
 | `PacketsSlowWriteToMirror` | Total number of packets whose write to other Datanodes in the pipeline takes more than a certain time (300ms by default) |
 | `PacketsSlowWriteToDisk` | Total number of packets whose write to disk takes more than a certain time (300ms by default) |
 | `PacketsSlowWriteToOsCache` | Total number of packets whose write to os cache takes more than a certain time (300ms by default) |
-
+| `SlowFlushOrSyncCount` | Total number of packets whose sync/flush takes more than a certain time (300ms by default) |
+| `SlowAckToUpstreamCount` | Total number of packets whose ack to the upstream DataNode takes more than a certain time (300ms by default) |
 
 FsVolume
 --------
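Both new rows surface on the DataNode metrics record as SlowFlushOrSyncCount and SlowAckToUpstreamCount, alongside the existing per-packet slow-write counters. Below is a minimal sketch of reading them remotely over JMX; the JMX service URL/port and the DataNodeActivity-* MBean name pattern are assumptions about the deployment, not something defined by this patch.

import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXServiceURL;

public class SlowDataNodeMetricsReader {
  public static void main(String[] args) throws Exception {
    // Assumed JMX endpoint of a DataNode started with the usual
    // com.sun.management.jmxremote.* system properties; host and port are placeholders.
    JMXServiceURL url =
        new JMXServiceURL("service:jmx:rmi:///jndi/rmi://localhost:8006/jmxrmi");
    try (JMXConnector connector = JMXConnectorFactory.connect(url)) {
      MBeanServerConnection mbs = connector.getMBeanServerConnection();
      // Wildcard pattern, since the DataNodeActivity bean name carries a host-port suffix.
      ObjectName pattern =
          new ObjectName("Hadoop:service=DataNode,name=DataNodeActivity-*");
      for (ObjectName name : mbs.queryNames(pattern, null)) {
        long slowFlushOrSync = (Long) mbs.getAttribute(name, "SlowFlushOrSyncCount");
        long slowAckToUpstream = (Long) mbs.getAttribute(name, "SlowAckToUpstreamCount");
        System.out.println(name + ": SlowFlushOrSyncCount=" + slowFlushOrSync
            + ", SlowAckToUpstreamCount=" + slowAckToUpstream);
      }
    }
  }
}

The same values are also served by the DataNode's /jmx HTTP endpoint, which may be the simpler option for ad hoc checks.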
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
index 62bc66080c..9b3a899323 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java
@@ -412,6 +412,7 @@ boolean packetSentInTime() {
   void flushOrSync(boolean isSync, long seqno) throws IOException {
     long flushTotalNanos = 0;
     long begin = Time.monotonicNow();
+    DataNodeFaultInjector.get().delay();
     if (checksumOut != null) {
       long flushStartNanos = System.nanoTime();
       checksumOut.flush();
@@ -445,6 +446,7 @@ void flushOrSync(boolean isSync, long seqno) throws IOException {
     }
     long duration = Time.monotonicNow() - begin;
     if (duration > datanodeSlowLogThresholdMs && LOG.isWarnEnabled()) {
+      datanode.metrics.incrSlowFlushOrSyncCount();
       LOG.warn("Slow flushOrSync took " + duration + "ms (threshold="
           + datanodeSlowLogThresholdMs + "ms), isSync:" + isSync + ", flushTotalNanos="
           + flushTotalNanos + "ns, volume=" + getVolumeBaseUri()
@@ -1656,6 +1658,7 @@ private void sendAckUpstreamUnprotected(PipelineAck ack, long seqno,
       }
       // send my ack back to upstream datanode
       long begin = Time.monotonicNow();
+      DataNodeFaultInjector.get().delay();
       /* for test only, no-op in production system */
       DataNodeFaultInjector.get().delaySendingAckToUpstream(inAddr);
       replyAck.write(upstreamOut);
@@ -1665,6 +1668,7 @@ private void sendAckUpstreamUnprotected(PipelineAck ack, long seqno,
           inAddr, duration);
       if (duration > datanodeSlowLogThresholdMs) {
+        datanode.metrics.incrSlowAckToUpstreamCount();
         LOG.warn("Slow PacketResponder send ack to upstream took " + duration
             + "ms (threshold=" + datanodeSlowLogThresholdMs + "ms), " + myString
             + ", replyAck=" + replyAck
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
index 5203d7bf87..649d30e91e 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java
@@ -191,6 +191,8 @@ public class DataNodeMetrics {
   @Metric MutableCounterLong packetsSlowWriteToMirror;
   @Metric MutableCounterLong packetsSlowWriteToDisk;
   @Metric MutableCounterLong packetsSlowWriteToOsCache;
+  @Metric private MutableCounterLong slowFlushOrSyncCount;
+  @Metric private MutableCounterLong slowAckToUpstreamCount;
 
   @Metric("Number of replaceBlock ops between"
       + " storage types on same host with local copy")
@@ -440,6 +442,14 @@ public void incrVolumeFailures(int size) {
     volumeFailures.incr(size);
   }
 
+  public void incrSlowFlushOrSyncCount() {
+    slowFlushOrSyncCount.incr();
+  }
+
+  public void incrSlowAckToUpstreamCount() {
+    slowAckToUpstreamCount.incr();
+  }
+
   public void incrDatanodeNetworkErrors() {
     datanodeNetworkErrors.incr();
   }
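For context on the DataNodeMetrics change above: a field annotated with @Metric is instantiated and exported when the source object is registered with the metrics system, and the published name is the Java field name with a leading capital (hence SlowFlushOrSyncCount in the documentation table and in the test). A minimal, self-contained sketch of that pattern follows; the class and metric names are illustrative and not part of this patch.

import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MutableCounterLong;

@Metrics(name = "ExampleSlowPathMetrics", about = "Example slow-path counters",
    context = "dfs")
public class ExampleSlowPathMetrics {
  // Instantiated by the metrics system during register(); published as
  // "SlowFlushOrSyncCount" on this source's record.
  @Metric private MutableCounterLong slowFlushOrSyncCount;

  public void incrSlowFlushOrSyncCount() {
    slowFlushOrSyncCount.incr();
  }

  public static void main(String[] args) {
    DefaultMetricsSystem.initialize("ExampleDataNode");
    ExampleSlowPathMetrics metrics = DefaultMetricsSystem.instance().register(
        "ExampleSlowPathMetrics", "Example slow-path counters",
        new ExampleSlowPathMetrics());
    // Hot paths only ever call the increment helper, mirroring
    // datanode.metrics.incrSlowFlushOrSyncCount() in BlockReceiver.
    metrics.incrSlowFlushOrSyncCount();
  }
}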
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
index f8c89d4abe..2bf7861287 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java
@@ -603,6 +603,60 @@ public void testNNRpcMetricsWithNonHA() throws IOException {
     MetricsRecordBuilder rb = getMetrics(dn.getMetrics().name());
     assertCounter("HeartbeatsNumOps", 1L, rb);
   }
+  @Test(timeout = 60000)
+  public void testSlowMetrics() throws Exception {
+    DataNodeFaultInjector dnFaultInjector = new DataNodeFaultInjector() {
+      @Override public void delay() {
+        try {
+          Thread.sleep(310);
+        } catch (InterruptedException e) {
+        }
+      }
+    };
+    DataNodeFaultInjector oldDnInjector = DataNodeFaultInjector.get();
+    DataNodeFaultInjector.set(dnFaultInjector);
+
+    Configuration conf = new HdfsConfiguration();
+    MiniDFSCluster cluster = null;
+    try {
+      cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
+      final FileSystem fs = cluster.getFileSystem();
+      List<DataNode> datanodes = cluster.getDataNodes();
+      assertEquals(3, datanodes.size());
+      final DataNode datanode = datanodes.get(0);
+      MetricsRecordBuilder rb = getMetrics(datanode.getMetrics().name());
+      final long longFileLen = 10;
+      final long startFlushOrSyncValue =
+          getLongCounter("SlowFlushOrSyncCount", rb);
+      final long startAckToUpstreamValue =
+          getLongCounter("SlowAckToUpstreamCount", rb);
+      final AtomicInteger x = new AtomicInteger(0);
+
+      GenericTestUtils.waitFor(new Supplier<Boolean>() {
+        @Override public Boolean get() {
+          x.getAndIncrement();
+          try {
+            DFSTestUtil
+                .createFile(fs, new Path("/time.txt." + x.get()), longFileLen,
+                    (short) 3, Time.monotonicNow());
+          } catch (IOException ioe) {
+            LOG.error("Caught IOException while ingesting DN metrics", ioe);
+            return false;
+          }
+          MetricsRecordBuilder rbNew = getMetrics(datanode.getMetrics().name());
+          final long endFlushOrSyncValue = getLongCounter("SlowFlushOrSyncCount", rbNew);
+          final long endAckToUpstreamValue = getLongCounter("SlowAckToUpstreamCount", rbNew);
+          return endFlushOrSyncValue > startFlushOrSyncValue
+              && endAckToUpstreamValue > startAckToUpstreamValue;
+        }
+      }, 30, 30000);
+    } finally {
+      DataNodeFaultInjector.set(oldDnInjector);
+      if (cluster != null) {
+        cluster.shutdown();
+      }
+    }
+  }
 
   @Test
   public void testNNRpcMetricsWithHA() throws IOException {
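The test drives both counters by overriding DataNodeFaultInjector.delay() to sleep past the 300 ms default threshold. That threshold (datanodeSlowLogThresholdMs in BlockReceiver) is backed by dfs.datanode.slow.io.warning.threshold.ms, so the same behavior can also be exercised with a much smaller delay by lowering the key. A hedged sketch of that variant follows; the 50 ms value and class name are illustrative only.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.MiniDFSCluster;

public class SlowThresholdExample {
  public static void main(String[] args) throws Exception {
    Configuration conf = new HdfsConfiguration();
    // Default is 300 ms; lower it so that even mild delays in flushOrSync or in
    // sending the upstream ack are counted as slow.
    conf.setLong(DFSConfigKeys.DFS_DATANODE_SLOW_IO_WARNING_THRESHOLD_KEY, 50L);
    MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf).numDataNodes(3).build();
    try {
      cluster.waitActive();
      // Writes whose flush/sync or upstream ack exceeds 50 ms now increment
      // SlowFlushOrSyncCount / SlowAckToUpstreamCount on the DataNodes.
    } finally {
      cluster.shutdown();
    }
  }
}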