diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java index 207916589f..7795343de3 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java @@ -163,6 +163,7 @@ private synchronized MutableRate addMetricIfNotExists(String name) { MutableRate metric = globalMetrics.get(name); if (metric == null) { metric = new MutableRate(name + typePrefix, name + typePrefix, false); + metric.setUpdateTimeStamp(true); globalMetrics.put(name, metric); } return metric; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRollingAverages.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRollingAverages.java index e6111e36bb..17233629c7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRollingAverages.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRollingAverages.java @@ -41,6 +41,7 @@ import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; +import org.apache.hadoop.util.Time; import javax.annotation.Nullable; @@ -77,13 +78,26 @@ public class MutableRollingAverages extends MutableMetric implements Closeable { private final String avgInfoDescTemplate; private int numWindows; + /** + * This class maintains sub-sum and sub-total of SampleStat. + */ private static class SumAndCount { private final double sum; private final long count; + private final long snapshotTimeStamp; - SumAndCount(final double sum, final long count) { + /** + * Constructor for {@link SumAndCount}. + * + * @param sum sub-sum in sliding windows + * @param count sub-total in sliding windows + * @param snapshotTimeStamp when is a new SampleStat snapshot. + */ + SumAndCount(final double sum, final long count, + final long snapshotTimeStamp) { this.sum = sum; this.count = count; + this.snapshotTimeStamp = snapshotTimeStamp; } public double getSum() { @@ -93,6 +107,10 @@ public double getSum() { public long getCount() { return count; } + + public long getSnapshotTimeStamp() { + return snapshotTimeStamp; + } } /** @@ -110,6 +128,16 @@ public long getCount() { private static final long WINDOW_SIZE_MS_DEFAULT = 300_000; private static final int NUM_WINDOWS_DEFAULT = 36; + /** + * Time duration after which a record is considered stale. + * {@link MutableRollingAverages} should be time-sensitive, and it should use + * the time window length(i.e. NUM_WINDOWS_DEFAULT * WINDOW_SIZE_MS_DEFAULT) + * as the valid time to make sure some too old record won't be use to compute + * average. + */ + private long recordValidityMs = + NUM_WINDOWS_DEFAULT * WINDOW_SIZE_MS_DEFAULT; + /** * Constructor for {@link MutableRollingAverages}. * @param metricValueName @@ -231,7 +259,8 @@ public LinkedBlockingDeque apply(String k) { }); final SumAndCount sumAndCount = new SumAndCount( rate.lastStat().total(), - rate.lastStat().numSamples()); + rate.lastStat().numSamples(), + rate.getSnapshotTimeStamp()); /* put newest sum and count to the end */ if (!deque.offerLast(sumAndCount)) { deque.pollFirst(); @@ -267,8 +296,11 @@ public synchronized Map getStats(long minSamples) { long totalCount = 0; for (final SumAndCount sumAndCount : entry.getValue()) { - totalCount += sumAndCount.getCount(); - totalSum += sumAndCount.getSum(); + if (Time.monotonicNow() - sumAndCount.getSnapshotTimeStamp() + < recordValidityMs) { + totalCount += sumAndCount.getCount(); + totalSum += sumAndCount.getSum(); + } } if (totalCount > minSamples) { @@ -277,4 +309,12 @@ public synchronized Map getStats(long minSamples) { } return stats; } + + /** + * Use for test only. + */ + @VisibleForTesting + public synchronized void setRecordValidityMs(long value) { + this.recordValidityMs = value; + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableStat.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableStat.java index 5ef31785a6..e04b4b58ec 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableStat.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableStat.java @@ -24,6 +24,8 @@ import org.apache.hadoop.metrics2.MetricsInfo; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.util.SampleStat; +import org.apache.hadoop.util.Time; + import static org.apache.hadoop.metrics2.lib.Interns.*; /** @@ -47,7 +49,9 @@ public class MutableStat extends MutableMetric { private final SampleStat prevStat = new SampleStat(); private final SampleStat.MinMax minMax = new SampleStat.MinMax(); private long numSamples = 0; + private long snapshotTimeStamp = 0; private boolean extended = false; + private boolean updateTimeStamp = false; /** * Construct a sample statistics metric @@ -100,6 +104,13 @@ public synchronized void setExtended(boolean extended) { this.extended = extended; } + /** + * Set whether to update the snapshot time or not. + * @param updateTimeStamp enable update stats snapshot timestamp + */ + public synchronized void setUpdateTimeStamp(boolean updateTimeStamp) { + this.updateTimeStamp = updateTimeStamp; + } /** * Add a number of samples and their sum to the running stat * @@ -115,7 +126,7 @@ public synchronized void add(long numSamples, long sum) { } /** - * Add a snapshot to the metric + * Add a snapshot to the metric. * @param value of the metric */ public synchronized void add(long value) { @@ -142,6 +153,9 @@ public synchronized void snapshot(MetricsRecordBuilder builder, boolean all) { if (numSamples > 0) { intervalStat.copyTo(prevStat); intervalStat.reset(); + if (updateTimeStamp) { + snapshotTimeStamp = Time.monotonicNow(); + } } clearChanged(); } @@ -164,6 +178,12 @@ public void resetMinMax() { minMax.reset(); } + /** + * Return the SampleStat snapshot timestamp + */ + public long getSnapshotTimeStamp() { + return snapshotTimeStamp; + } @Override public String toString() { return lastStat().toString(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java index 3caf24d83f..41fb41f480 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java @@ -17,17 +17,24 @@ */ package org.apache.hadoop.hdfs.server.datanode; +import java.util.ArrayList; +import java.util.List; import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.TimeUnit; -import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics; import org.apache.hadoop.metrics2.lib.MetricsTestHelper; +import org.apache.hadoop.metrics2.lib.MutableRollingAverages; +import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.conf.Configuration; import org.junit.Test; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY; import static org.hamcrest.CoreMatchers.containsString; +import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertThat; /** @@ -43,7 +50,7 @@ public void testGetSendPacketDownstreamAvgInfo() throws Exception { final int numOpsPerIteration = 1000; final Configuration conf = new HdfsConfiguration(); - conf.setBoolean(DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY, true); + conf.setBoolean(DFS_DATANODE_PEER_STATS_ENABLED_KEY, true); final DataNodePeerMetrics peerMetrics = DataNodePeerMetrics.create( "Sample-DataNode", conf); @@ -80,6 +87,59 @@ public void testGetSendPacketDownstreamAvgInfo() throws Exception { } } + @Test(timeout = 30000) + public void testRemoveStaleRecord() throws Exception { + final int numWindows = 5; + final long scheduleInterval = 1000; + final int iterations = 3; + final int numSamples = 100; + + final Configuration conf = new HdfsConfiguration(); + conf.setLong(DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY, + numSamples); + conf.setBoolean(DFS_DATANODE_PEER_STATS_ENABLED_KEY, true); + + final DataNodePeerMetrics peerMetrics = + DataNodePeerMetrics.create("Sample-DataNode", conf); + MutableRollingAverages rollingAverages = + peerMetrics.getSendPacketDownstreamRollingAverages(); + rollingAverages.setRecordValidityMs(numWindows * scheduleInterval); + MetricsTestHelper.replaceRollingAveragesScheduler(rollingAverages, + numWindows, scheduleInterval, TimeUnit.MILLISECONDS); + + List peerAddrList = new ArrayList<>(); + for (int i = 1; i <= iterations; i++) { + peerAddrList.add(genPeerAddress()); + } + for (String peerAddr : peerAddrList) { + for (int j = 1; j <= numSamples; j++) { + /* simulate to get latency of 1 to 1000 ms */ + final long latency = ThreadLocalRandom.current().nextLong(1, 1000); + peerMetrics.addSendPacketDownstream(peerAddr, latency); + } + } + + GenericTestUtils.waitFor( + () -> rollingAverages.getStats(numSamples).size() > 0, 500, 5000); + assertEquals(3, rollingAverages.getStats(numSamples).size()); + /* wait for stale report to be removed */ + GenericTestUtils.waitFor( + () -> rollingAverages.getStats(numSamples).isEmpty(), 500, 10000); + assertEquals(0, rollingAverages.getStats(numSamples).size()); + + /* dn can report peer metrics normally when it added back to cluster */ + for (String peerAddr : peerAddrList) { + for (int j = 1; j <= numSamples; j++) { + /* simulate to get latency of 1 to 1000 ms */ + final long latency = ThreadLocalRandom.current().nextLong(1, 1000); + peerMetrics.addSendPacketDownstream(peerAddr, latency); + } + } + GenericTestUtils.waitFor( + () -> rollingAverages.getStats(numSamples).size() > 0, 500, 10000); + assertEquals(3, rollingAverages.getStats(numSamples).size()); + } + /** * Simulates to generate different peer addresses, e.g. [84.125.113.65:9801]. */