HADOOP-16947. Stale record should be remove when MutableRollingAverages generating aggregate data. Contributed by Haibin Huang.
This commit is contained in:
parent
7743d40ac5
commit
97f843de3a
@ -163,6 +163,7 @@ private synchronized MutableRate addMetricIfNotExists(String name) {
|
|||||||
MutableRate metric = globalMetrics.get(name);
|
MutableRate metric = globalMetrics.get(name);
|
||||||
if (metric == null) {
|
if (metric == null) {
|
||||||
metric = new MutableRate(name + typePrefix, name + typePrefix, false);
|
metric = new MutableRate(name + typePrefix, name + typePrefix, false);
|
||||||
|
metric.setUpdateTimeStamp(true);
|
||||||
globalMetrics.put(name, metric);
|
globalMetrics.put(name, metric);
|
||||||
}
|
}
|
||||||
return metric;
|
return metric;
|
||||||
|
@ -41,6 +41,7 @@
|
|||||||
|
|
||||||
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
|
import org.apache.hadoop.thirdparty.com.google.common.base.Preconditions;
|
||||||
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
|
|
||||||
import javax.annotation.Nullable;
|
import javax.annotation.Nullable;
|
||||||
|
|
||||||
@ -77,13 +78,26 @@ public class MutableRollingAverages extends MutableMetric implements Closeable {
|
|||||||
private final String avgInfoDescTemplate;
|
private final String avgInfoDescTemplate;
|
||||||
private int numWindows;
|
private int numWindows;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class maintains sub-sum and sub-total of SampleStat.
|
||||||
|
*/
|
||||||
private static class SumAndCount {
|
private static class SumAndCount {
|
||||||
private final double sum;
|
private final double sum;
|
||||||
private final long count;
|
private final long count;
|
||||||
|
private final long snapshotTimeStamp;
|
||||||
|
|
||||||
SumAndCount(final double sum, final long count) {
|
/**
|
||||||
|
* Constructor for {@link SumAndCount}.
|
||||||
|
*
|
||||||
|
* @param sum sub-sum in sliding windows
|
||||||
|
* @param count sub-total in sliding windows
|
||||||
|
* @param snapshotTimeStamp when is a new SampleStat snapshot.
|
||||||
|
*/
|
||||||
|
SumAndCount(final double sum, final long count,
|
||||||
|
final long snapshotTimeStamp) {
|
||||||
this.sum = sum;
|
this.sum = sum;
|
||||||
this.count = count;
|
this.count = count;
|
||||||
|
this.snapshotTimeStamp = snapshotTimeStamp;
|
||||||
}
|
}
|
||||||
|
|
||||||
public double getSum() {
|
public double getSum() {
|
||||||
@ -93,6 +107,10 @@ public double getSum() {
|
|||||||
public long getCount() {
|
public long getCount() {
|
||||||
return count;
|
return count;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public long getSnapshotTimeStamp() {
|
||||||
|
return snapshotTimeStamp;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -110,6 +128,16 @@ public long getCount() {
|
|||||||
private static final long WINDOW_SIZE_MS_DEFAULT = 300_000;
|
private static final long WINDOW_SIZE_MS_DEFAULT = 300_000;
|
||||||
private static final int NUM_WINDOWS_DEFAULT = 36;
|
private static final int NUM_WINDOWS_DEFAULT = 36;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Time duration after which a record is considered stale.
|
||||||
|
* {@link MutableRollingAverages} should be time-sensitive, and it should use
|
||||||
|
* the time window length(i.e. NUM_WINDOWS_DEFAULT * WINDOW_SIZE_MS_DEFAULT)
|
||||||
|
* as the valid time to make sure some too old record won't be use to compute
|
||||||
|
* average.
|
||||||
|
*/
|
||||||
|
private long recordValidityMs =
|
||||||
|
NUM_WINDOWS_DEFAULT * WINDOW_SIZE_MS_DEFAULT;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Constructor for {@link MutableRollingAverages}.
|
* Constructor for {@link MutableRollingAverages}.
|
||||||
* @param metricValueName
|
* @param metricValueName
|
||||||
@ -231,7 +259,8 @@ public LinkedBlockingDeque<SumAndCount> apply(String k) {
|
|||||||
});
|
});
|
||||||
final SumAndCount sumAndCount = new SumAndCount(
|
final SumAndCount sumAndCount = new SumAndCount(
|
||||||
rate.lastStat().total(),
|
rate.lastStat().total(),
|
||||||
rate.lastStat().numSamples());
|
rate.lastStat().numSamples(),
|
||||||
|
rate.getSnapshotTimeStamp());
|
||||||
/* put newest sum and count to the end */
|
/* put newest sum and count to the end */
|
||||||
if (!deque.offerLast(sumAndCount)) {
|
if (!deque.offerLast(sumAndCount)) {
|
||||||
deque.pollFirst();
|
deque.pollFirst();
|
||||||
@ -267,8 +296,11 @@ public synchronized Map<String, Double> getStats(long minSamples) {
|
|||||||
long totalCount = 0;
|
long totalCount = 0;
|
||||||
|
|
||||||
for (final SumAndCount sumAndCount : entry.getValue()) {
|
for (final SumAndCount sumAndCount : entry.getValue()) {
|
||||||
totalCount += sumAndCount.getCount();
|
if (Time.monotonicNow() - sumAndCount.getSnapshotTimeStamp()
|
||||||
totalSum += sumAndCount.getSum();
|
< recordValidityMs) {
|
||||||
|
totalCount += sumAndCount.getCount();
|
||||||
|
totalSum += sumAndCount.getSum();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
if (totalCount > minSamples) {
|
if (totalCount > minSamples) {
|
||||||
@ -277,4 +309,12 @@ public synchronized Map<String, Double> getStats(long minSamples) {
|
|||||||
}
|
}
|
||||||
return stats;
|
return stats;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Use for test only.
|
||||||
|
*/
|
||||||
|
@VisibleForTesting
|
||||||
|
public synchronized void setRecordValidityMs(long value) {
|
||||||
|
this.recordValidityMs = value;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -24,6 +24,8 @@
|
|||||||
import org.apache.hadoop.metrics2.MetricsInfo;
|
import org.apache.hadoop.metrics2.MetricsInfo;
|
||||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||||
import org.apache.hadoop.metrics2.util.SampleStat;
|
import org.apache.hadoop.metrics2.util.SampleStat;
|
||||||
|
import org.apache.hadoop.util.Time;
|
||||||
|
|
||||||
import static org.apache.hadoop.metrics2.lib.Interns.*;
|
import static org.apache.hadoop.metrics2.lib.Interns.*;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -47,7 +49,9 @@ public class MutableStat extends MutableMetric {
|
|||||||
private final SampleStat prevStat = new SampleStat();
|
private final SampleStat prevStat = new SampleStat();
|
||||||
private final SampleStat.MinMax minMax = new SampleStat.MinMax();
|
private final SampleStat.MinMax minMax = new SampleStat.MinMax();
|
||||||
private long numSamples = 0;
|
private long numSamples = 0;
|
||||||
|
private long snapshotTimeStamp = 0;
|
||||||
private boolean extended = false;
|
private boolean extended = false;
|
||||||
|
private boolean updateTimeStamp = false;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Construct a sample statistics metric
|
* Construct a sample statistics metric
|
||||||
@ -100,6 +104,13 @@ public synchronized void setExtended(boolean extended) {
|
|||||||
this.extended = extended;
|
this.extended = extended;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Set whether to update the snapshot time or not.
|
||||||
|
* @param updateTimeStamp enable update stats snapshot timestamp
|
||||||
|
*/
|
||||||
|
public synchronized void setUpdateTimeStamp(boolean updateTimeStamp) {
|
||||||
|
this.updateTimeStamp = updateTimeStamp;
|
||||||
|
}
|
||||||
/**
|
/**
|
||||||
* Add a number of samples and their sum to the running stat
|
* Add a number of samples and their sum to the running stat
|
||||||
*
|
*
|
||||||
@ -115,7 +126,7 @@ public synchronized void add(long numSamples, long sum) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Add a snapshot to the metric
|
* Add a snapshot to the metric.
|
||||||
* @param value of the metric
|
* @param value of the metric
|
||||||
*/
|
*/
|
||||||
public synchronized void add(long value) {
|
public synchronized void add(long value) {
|
||||||
@ -142,6 +153,9 @@ public synchronized void snapshot(MetricsRecordBuilder builder, boolean all) {
|
|||||||
if (numSamples > 0) {
|
if (numSamples > 0) {
|
||||||
intervalStat.copyTo(prevStat);
|
intervalStat.copyTo(prevStat);
|
||||||
intervalStat.reset();
|
intervalStat.reset();
|
||||||
|
if (updateTimeStamp) {
|
||||||
|
snapshotTimeStamp = Time.monotonicNow();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
clearChanged();
|
clearChanged();
|
||||||
}
|
}
|
||||||
@ -164,6 +178,12 @@ public void resetMinMax() {
|
|||||||
minMax.reset();
|
minMax.reset();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Return the SampleStat snapshot timestamp
|
||||||
|
*/
|
||||||
|
public long getSnapshotTimeStamp() {
|
||||||
|
return snapshotTimeStamp;
|
||||||
|
}
|
||||||
@Override
|
@Override
|
||||||
public String toString() {
|
public String toString() {
|
||||||
return lastStat().toString();
|
return lastStat().toString();
|
||||||
|
@ -17,17 +17,24 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.hdfs.server.datanode;
|
package org.apache.hadoop.hdfs.server.datanode;
|
||||||
|
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
import java.util.concurrent.ThreadLocalRandom;
|
import java.util.concurrent.ThreadLocalRandom;
|
||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
|
|
||||||
import org.apache.hadoop.hdfs.DFSConfigKeys;
|
|
||||||
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
import org.apache.hadoop.hdfs.HdfsConfiguration;
|
||||||
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics;
|
import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics;
|
||||||
import org.apache.hadoop.metrics2.lib.MetricsTestHelper;
|
import org.apache.hadoop.metrics2.lib.MetricsTestHelper;
|
||||||
|
import org.apache.hadoop.metrics2.lib.MutableRollingAverages;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY;
|
||||||
|
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY;
|
||||||
import static org.hamcrest.CoreMatchers.containsString;
|
import static org.hamcrest.CoreMatchers.containsString;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertThat;
|
import static org.junit.Assert.assertThat;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -43,7 +50,7 @@ public void testGetSendPacketDownstreamAvgInfo() throws Exception {
|
|||||||
final int numOpsPerIteration = 1000;
|
final int numOpsPerIteration = 1000;
|
||||||
|
|
||||||
final Configuration conf = new HdfsConfiguration();
|
final Configuration conf = new HdfsConfiguration();
|
||||||
conf.setBoolean(DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY, true);
|
conf.setBoolean(DFS_DATANODE_PEER_STATS_ENABLED_KEY, true);
|
||||||
|
|
||||||
final DataNodePeerMetrics peerMetrics = DataNodePeerMetrics.create(
|
final DataNodePeerMetrics peerMetrics = DataNodePeerMetrics.create(
|
||||||
"Sample-DataNode", conf);
|
"Sample-DataNode", conf);
|
||||||
@ -80,6 +87,59 @@ public void testGetSendPacketDownstreamAvgInfo() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout = 30000)
|
||||||
|
public void testRemoveStaleRecord() throws Exception {
|
||||||
|
final int numWindows = 5;
|
||||||
|
final long scheduleInterval = 1000;
|
||||||
|
final int iterations = 3;
|
||||||
|
final int numSamples = 100;
|
||||||
|
|
||||||
|
final Configuration conf = new HdfsConfiguration();
|
||||||
|
conf.setLong(DFS_DATANODE_PEER_METRICS_MIN_OUTLIER_DETECTION_SAMPLES_KEY,
|
||||||
|
numSamples);
|
||||||
|
conf.setBoolean(DFS_DATANODE_PEER_STATS_ENABLED_KEY, true);
|
||||||
|
|
||||||
|
final DataNodePeerMetrics peerMetrics =
|
||||||
|
DataNodePeerMetrics.create("Sample-DataNode", conf);
|
||||||
|
MutableRollingAverages rollingAverages =
|
||||||
|
peerMetrics.getSendPacketDownstreamRollingAverages();
|
||||||
|
rollingAverages.setRecordValidityMs(numWindows * scheduleInterval);
|
||||||
|
MetricsTestHelper.replaceRollingAveragesScheduler(rollingAverages,
|
||||||
|
numWindows, scheduleInterval, TimeUnit.MILLISECONDS);
|
||||||
|
|
||||||
|
List<String> peerAddrList = new ArrayList<>();
|
||||||
|
for (int i = 1; i <= iterations; i++) {
|
||||||
|
peerAddrList.add(genPeerAddress());
|
||||||
|
}
|
||||||
|
for (String peerAddr : peerAddrList) {
|
||||||
|
for (int j = 1; j <= numSamples; j++) {
|
||||||
|
/* simulate to get latency of 1 to 1000 ms */
|
||||||
|
final long latency = ThreadLocalRandom.current().nextLong(1, 1000);
|
||||||
|
peerMetrics.addSendPacketDownstream(peerAddr, latency);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
GenericTestUtils.waitFor(
|
||||||
|
() -> rollingAverages.getStats(numSamples).size() > 0, 500, 5000);
|
||||||
|
assertEquals(3, rollingAverages.getStats(numSamples).size());
|
||||||
|
/* wait for stale report to be removed */
|
||||||
|
GenericTestUtils.waitFor(
|
||||||
|
() -> rollingAverages.getStats(numSamples).isEmpty(), 500, 10000);
|
||||||
|
assertEquals(0, rollingAverages.getStats(numSamples).size());
|
||||||
|
|
||||||
|
/* dn can report peer metrics normally when it added back to cluster */
|
||||||
|
for (String peerAddr : peerAddrList) {
|
||||||
|
for (int j = 1; j <= numSamples; j++) {
|
||||||
|
/* simulate to get latency of 1 to 1000 ms */
|
||||||
|
final long latency = ThreadLocalRandom.current().nextLong(1, 1000);
|
||||||
|
peerMetrics.addSendPacketDownstream(peerAddr, latency);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
GenericTestUtils.waitFor(
|
||||||
|
() -> rollingAverages.getStats(numSamples).size() > 0, 500, 10000);
|
||||||
|
assertEquals(3, rollingAverages.getStats(numSamples).size());
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Simulates to generate different peer addresses, e.g. [84.125.113.65:9801].
|
* Simulates to generate different peer addresses, e.g. [84.125.113.65:9801].
|
||||||
*/
|
*/
|
||||||
|
Loading…
Reference in New Issue
Block a user