HADOOP-16947. Stale records should be removed when MutableRollingAverages generates aggregate data. Contributed by Haibin Huang.

This commit is contained in:
He Xiaoqiao 2021-01-16 14:06:56 +08:00
parent 56576f080b
commit 26cd02fb29
No known key found for this signature in database
GPG Key ID: A80CC124E9A0FA63
4 changed files with 128 additions and 7 deletions

View File

@ -163,6 +163,7 @@ private synchronized MutableRate addMetricIfNotExists(String name) {
MutableRate metric = globalMetrics.get(name); MutableRate metric = globalMetrics.get(name);
if (metric == null) { if (metric == null) {
metric = new MutableRate(name + typePrefix, name + typePrefix, false); metric = new MutableRate(name + typePrefix, name + typePrefix, false);
metric.setUpdateTimeStamp(true);
globalMetrics.put(name, metric); globalMetrics.put(name, metric);
} }
return metric; return metric;

View File

@ -41,6 +41,7 @@
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions; import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.hadoop.util.Time;
import javax.annotation.Nullable; import javax.annotation.Nullable;
@ -77,13 +78,26 @@ public class MutableRollingAverages extends MutableMetric implements Closeable {
private final String avgInfoDescTemplate; private final String avgInfoDescTemplate;
private int numWindows; private int numWindows;
/**
* This class maintains sub-sum and sub-total of SampleStat.
*/
private static class SumAndCount { private static class SumAndCount {
private final double sum; private final double sum;
private final long count; private final long count;
private final long snapshotTimeStamp;
SumAndCount(final double sum, final long count) { /**
* Constructor for {@link SumAndCount}.
*
* @param sum sub-sum in sliding windows
* @param count sub-total in sliding windows
* @param snapshotTimeStamp the time at which the SampleStat snapshot was taken.
*/
SumAndCount(final double sum, final long count,
final long snapshotTimeStamp) {
this.sum = sum; this.sum = sum;
this.count = count; this.count = count;
this.snapshotTimeStamp = snapshotTimeStamp;
} }
public double getSum() { public double getSum() {
@ -93,6 +107,10 @@ public double getSum() {
public long getCount() { public long getCount() {
return count; return count;
} }
/**
 * @return the monotonic timestamp recorded when this sub-sum/count
 *         snapshot was taken; used to age out stale records.
 */
public long getSnapshotTimeStamp() {
return snapshotTimeStamp;
}
} }
/** /**
@ -110,6 +128,16 @@ public long getCount() {
private static final long WINDOW_SIZE_MS_DEFAULT = 300_000; private static final long WINDOW_SIZE_MS_DEFAULT = 300_000;
private static final int NUM_WINDOWS_DEFAULT = 36; private static final int NUM_WINDOWS_DEFAULT = 36;
/**
* Time duration after which a record is considered stale.
* {@link MutableRollingAverages} should be time-sensitive, and it should use
* the time window length(i.e. NUM_WINDOWS_DEFAULT * WINDOW_SIZE_MS_DEFAULT)
as the valid time to make sure a record that is too old won't be used to compute
* average.
*/
private long recordValidityMs =
NUM_WINDOWS_DEFAULT * WINDOW_SIZE_MS_DEFAULT;
/** /**
* Constructor for {@link MutableRollingAverages}. * Constructor for {@link MutableRollingAverages}.
* @param metricValueName * @param metricValueName
@ -231,7 +259,8 @@ public LinkedBlockingDeque<SumAndCount> apply(String k) {
}); });
final SumAndCount sumAndCount = new SumAndCount( final SumAndCount sumAndCount = new SumAndCount(
rate.lastStat().total(), rate.lastStat().total(),
rate.lastStat().numSamples()); rate.lastStat().numSamples(),
rate.getSnapshotTimeStamp());
/* put newest sum and count to the end */ /* put newest sum and count to the end */
if (!deque.offerLast(sumAndCount)) { if (!deque.offerLast(sumAndCount)) {
deque.pollFirst(); deque.pollFirst();
@ -267,8 +296,11 @@ public synchronized Map<String, Double> getStats(long minSamples) {
long totalCount = 0; long totalCount = 0;
for (final SumAndCount sumAndCount : entry.getValue()) { for (final SumAndCount sumAndCount : entry.getValue()) {
totalCount += sumAndCount.getCount(); if (Time.monotonicNow() - sumAndCount.getSnapshotTimeStamp()
totalSum += sumAndCount.getSum(); < recordValidityMs) {
totalCount += sumAndCount.getCount();
totalSum += sumAndCount.getSum();
}
} }
if (totalCount > minSamples) { if (totalCount > minSamples) {
@ -277,4 +309,12 @@ public synchronized Map<String, Double> getStats(long minSamples) {
} }
return stats; return stats;
} }
/**
 * Override the duration after which a record is considered stale.
 * Intended for tests only, so that staleness can be triggered quickly.
 *
 * @param value new record validity window in milliseconds
 */
@VisibleForTesting
public synchronized void setRecordValidityMs(long value) {
  recordValidityMs = value;
}
} }

View File

@ -24,6 +24,8 @@
import org.apache.hadoop.metrics2.MetricsInfo; import org.apache.hadoop.metrics2.MetricsInfo;
import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.util.SampleStat; import org.apache.hadoop.metrics2.util.SampleStat;
import org.apache.hadoop.util.Time;
import static org.apache.hadoop.metrics2.lib.Interns.*; import static org.apache.hadoop.metrics2.lib.Interns.*;
/** /**
@ -47,7 +49,9 @@ public class MutableStat extends MutableMetric {
private final SampleStat prevStat = new SampleStat(); private final SampleStat prevStat = new SampleStat();
private final SampleStat.MinMax minMax = new SampleStat.MinMax(); private final SampleStat.MinMax minMax = new SampleStat.MinMax();
private long numSamples = 0; private long numSamples = 0;
private long snapshotTimeStamp = 0;
private boolean extended = false; private boolean extended = false;
private boolean updateTimeStamp = false;
/** /**
* Construct a sample statistics metric * Construct a sample statistics metric
@ -100,6 +104,13 @@ public synchronized void setExtended(boolean extended) {
this.extended = extended; this.extended = extended;
} }
/**
 * Enable or disable refreshing of the snapshot timestamp each time the
 * interval stat is rolled over in {@code snapshot()}.
 *
 * @param updateTimeStamp true to record the snapshot time on each roll-over
 */
public synchronized void setUpdateTimeStamp(boolean updateTimeStamp) {
  this.updateTimeStamp = updateTimeStamp;
}
/** /**
* Add a number of samples and their sum to the running stat * Add a number of samples and their sum to the running stat
* *
@ -115,7 +126,7 @@ public synchronized void add(long numSamples, long sum) {
} }
/** /**
* Add a snapshot to the metric * Add a snapshot to the metric.
* @param value of the metric * @param value of the metric
*/ */
public synchronized void add(long value) { public synchronized void add(long value) {
@ -142,6 +153,9 @@ public synchronized void snapshot(MetricsRecordBuilder builder, boolean all) {
if (numSamples > 0) { if (numSamples > 0) {
intervalStat.copyTo(prevStat); intervalStat.copyTo(prevStat);
intervalStat.reset(); intervalStat.reset();
if (updateTimeStamp) {
snapshotTimeStamp = Time.monotonicNow();
}
} }
clearChanged(); clearChanged();
} }
@ -164,6 +178,12 @@ public void resetMinMax() {
minMax.reset(); minMax.reset();
} }
/**
 * Return the timestamp of the most recent SampleStat snapshot, taken
 * from {@code Time.monotonicNow()} when timestamp updates are enabled;
 * 0 if no timestamped snapshot has occurred yet.
 *
 * @return the snapshot timestamp in monotonic milliseconds
 */
public long getSnapshotTimeStamp() {
return snapshotTimeStamp;
}
@Override @Override
public String toString() { public String toString() {
return lastStat().toString(); return lastStat().toString();

View File

@ -17,17 +17,24 @@
*/ */
package org.apache.hadoop.hdfs.server.datanode; package org.apache.hadoop.hdfs.server.datanode;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics; import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics;
import org.apache.hadoop.metrics2.lib.MetricsTestHelper; import org.apache.hadoop.metrics2.lib.MetricsTestHelper;
import org.apache.hadoop.metrics2.lib.MutableRollingAverages;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.junit.Test; import org.junit.Test;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY;
import static org.hamcrest.CoreMatchers.containsString; import static org.hamcrest.CoreMatchers.containsString;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertThat; import static org.junit.Assert.assertThat;
/** /**
@ -43,7 +50,7 @@ public void testGetSendPacketDownstreamAvgInfo() throws Exception {
final int numOpsPerIteration = 1000; final int numOpsPerIteration = 1000;
final Configuration conf = new HdfsConfiguration(); final Configuration conf = new HdfsConfiguration();
conf.setBoolean(DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY, true); conf.setBoolean(DFS_DATANODE_PEER_STATS_ENABLED_KEY, true);
final DataNodePeerMetrics peerMetrics = DataNodePeerMetrics.create( final DataNodePeerMetrics peerMetrics = DataNodePeerMetrics.create(
"Sample-DataNode", conf); "Sample-DataNode", conf);
@ -80,6 +87,59 @@ public void testGetSendPacketDownstreamAvgInfo() throws Exception {
} }
} }
/**
 * Verify that stale records are dropped from the rolling averages once
 * they exceed the configured validity window, and that new samples for
 * the same peers are reported normally afterwards.
 */
@Test(timeout = 30000)
public void testRemoveStaleRecord() throws Exception {
  final int numWindows = 5;
  final long scheduleInterval = 1000;
  final int iterations = 3;
  final int numSamples = 100;

  final Configuration conf = new HdfsConfiguration();
  conf.setLong(DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY,
      numSamples);
  conf.setBoolean(DFS_DATANODE_PEER_STATS_ENABLED_KEY, true);

  final DataNodePeerMetrics peerMetrics =
      DataNodePeerMetrics.create("Sample-DataNode", conf);
  MutableRollingAverages rollingAverages =
      peerMetrics.getSendPacketDownstreamRollingAverages();
  // shrink the validity window so records go stale within the test timeout
  rollingAverages.setRecordValidityMs(numWindows * scheduleInterval);
  MetricsTestHelper.replaceRollingAveragesScheduler(rollingAverages,
      numWindows, scheduleInterval, TimeUnit.MILLISECONDS);

  List<String> peerAddrList = new ArrayList<>();
  for (int i = 1; i <= iterations; i++) {
    peerAddrList.add(genPeerAddress());
  }
  for (String peerAddr : peerAddrList) {
    for (int j = 1; j <= numSamples; j++) {
      /* simulate to get latency of 1 to 1000 ms */
      final long latency = ThreadLocalRandom.current().nextLong(1, 1000);
      peerMetrics.addSendPacketDownstream(peerAddr, latency);
    }
  }

  GenericTestUtils.waitFor(
      () -> rollingAverages.getStats(numSamples).size() > 0, 500, 5000);
  /* one stats entry expected per simulated peer */
  assertEquals(iterations, rollingAverages.getStats(numSamples).size());

  /* wait for stale report to be removed */
  GenericTestUtils.waitFor(
      () -> rollingAverages.getStats(numSamples).isEmpty(), 500, 10000);
  assertEquals(0, rollingAverages.getStats(numSamples).size());

  /* dn can report peer metrics normally when it added back to cluster */
  for (String peerAddr : peerAddrList) {
    for (int j = 1; j <= numSamples; j++) {
      /* simulate to get latency of 1 to 1000 ms */
      final long latency = ThreadLocalRandom.current().nextLong(1, 1000);
      peerMetrics.addSendPacketDownstream(peerAddr, latency);
    }
  }
  GenericTestUtils.waitFor(
      () -> rollingAverages.getStats(numSamples).size() > 0, 500, 10000);
  assertEquals(iterations, rollingAverages.getStats(numSamples).size());
}
/** /**
* Simulates to generate different peer addresses, e.g. [84.125.113.65:9801]. * Simulates to generate different peer addresses, e.g. [84.125.113.65:9801].
*/ */