From 91af256a5b44925e5dfdf333293251a19685ba2a Mon Sep 17 00:00:00 2001 From: huhaiyang Date: Wed, 17 Nov 2021 22:41:06 +0800 Subject: [PATCH] HADOOP-17995. Stale record should be remove when DataNodePeerMetrics#dumpSendPacketDownstreamAvgInfoAsJson (#3630) --- .../hadoop/metrics2/lib/MutableRollingAverages.java | 7 +++++-- .../hdfs/server/datanode/TestDataNodePeerMetrics.java | 10 ++++++++++ 2 files changed, 15 insertions(+), 2 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRollingAverages.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRollingAverages.java index 193ed0f71d..3217add651 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRollingAverages.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRollingAverages.java @@ -179,8 +179,11 @@ public void snapshot(MetricsRecordBuilder builder, boolean all) { long totalCount = 0; for (final SumAndCount sumAndCount : entry.getValue()) { - totalCount += sumAndCount.getCount(); - totalSum += sumAndCount.getSum(); + if (Time.monotonicNow() - sumAndCount.getSnapshotTimeStamp() + < recordValidityMs) { + totalCount += sumAndCount.getCount(); + totalSum += sumAndCount.getSum(); + } } if (totalCount != 0) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java index 41fb41f480..9ea8a08a76 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java @@ -122,10 +122,16 @@ public void testRemoveStaleRecord() throws Exception { GenericTestUtils.waitFor( () -> rollingAverages.getStats(numSamples).size() > 0, 500, 5000); assertEquals(3, rollingAverages.getStats(numSamples).size()); + String json = peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson(); + for (String peerAddr : peerAddrList) { + assertThat(json, containsString(peerAddr)); + } /* wait for stale report to be removed */ GenericTestUtils.waitFor( () -> rollingAverages.getStats(numSamples).isEmpty(), 500, 10000); assertEquals(0, rollingAverages.getStats(numSamples).size()); + json = peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson(); + assertEquals("{}", json); /* dn can report peer metrics normally when it added back to cluster */ for (String peerAddr : peerAddrList) { @@ -138,6 +144,10 @@ public void testRemoveStaleRecord() throws Exception { GenericTestUtils.waitFor( () -> rollingAverages.getStats(numSamples).size() > 0, 500, 10000); assertEquals(3, rollingAverages.getStats(numSamples).size()); + json = peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson(); + for (String peerAddr : peerAddrList) { + assertThat(json, containsString(peerAddr)); + } } /**