diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRollingAverages.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRollingAverages.java index 193ed0f71d..aa4d4b9ca0 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRollingAverages.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRollingAverages.java @@ -167,7 +167,7 @@ synchronized void replaceScheduledTask(int windows, long interval, } @Override - public void snapshot(MetricsRecordBuilder builder, boolean all) { + public synchronized void snapshot(MetricsRecordBuilder builder, boolean all) { if (all || changed()) { for (final Entry> entry : averages.entrySet()) { @@ -179,8 +179,11 @@ public void snapshot(MetricsRecordBuilder builder, boolean all) { long totalCount = 0; for (final SumAndCount sumAndCount : entry.getValue()) { - totalCount += sumAndCount.getCount(); - totalSum += sumAndCount.getSum(); + if (Time.monotonicNow() - sumAndCount.getSnapshotTimeStamp() + < recordValidityMs) { + totalCount += sumAndCount.getCount(); + totalSum += sumAndCount.getSum(); + } } if (totalCount != 0) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java index 41fb41f480..9ea8a08a76 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java @@ -122,10 +122,16 @@ public void testRemoveStaleRecord() throws Exception { GenericTestUtils.waitFor( () -> rollingAverages.getStats(numSamples).size() > 0, 500, 5000); assertEquals(3, rollingAverages.getStats(numSamples).size()); + String json = peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson(); + for (String peerAddr : peerAddrList) { + assertThat(json, containsString(peerAddr)); + } /* wait for stale report to be removed */ GenericTestUtils.waitFor( () -> rollingAverages.getStats(numSamples).isEmpty(), 500, 10000); assertEquals(0, rollingAverages.getStats(numSamples).size()); + json = peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson(); + assertEquals("{}", json); /* dn can report peer metrics normally when it added back to cluster */ for (String peerAddr : peerAddrList) { @@ -138,6 +144,10 @@ public void testRemoveStaleRecord() throws Exception { GenericTestUtils.waitFor( () -> rollingAverages.getStats(numSamples).size() > 0, 500, 10000); assertEquals(3, rollingAverages.getStats(numSamples).size()); + json = peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson(); + for (String peerAddr : peerAddrList) { + assertThat(json, containsString(peerAddr)); + } } /**