HDFS-16595. Slow peer metrics - add median, mad and upper latency limits (#4357)

Reviewed-by: Tao Li <tomscut@apache.org>
Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
This commit is contained in:
Viraj Jasani 2022-06-03 15:10:14 -07:00 committed by GitHub
parent 7bd4ac3ce0
commit 25591ef51b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
15 changed files with 282 additions and 77 deletions

View File

@ -0,0 +1,90 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.protocol;
import org.apache.commons.lang3.builder.EqualsBuilder;
import org.apache.commons.lang3.builder.HashCodeBuilder;
import org.apache.hadoop.classification.InterfaceAudience;
/**
 * Immutable holder for outlier-detection figures of a slow peer: the median
 * latency, the median absolute deviation (MAD), the upper latency limit used
 * as the outlier threshold, and the latency actually observed for the node.
 */
@InterfaceAudience.Private
public class OutlierMetrics {

  // Median latency across the sampled peers.
  private final Double median;
  // Median absolute deviation of the sampled latencies.
  private final Double mad;
  // Threshold above which a latency is considered an outlier.
  private final Double upperLimitLatency;
  // Latency actually observed for this node.
  private final Double actualLatency;

  /**
   * Creates a metrics holder.
   *
   * @param median median latency.
   * @param mad median absolute deviation.
   * @param upperLimitLatency upper latency limit used by outlier detection.
   * @param actualLatency observed latency of the node.
   */
  public OutlierMetrics(Double median, Double mad, Double upperLimitLatency,
      Double actualLatency) {
    this.median = median;
    this.mad = mad;
    this.upperLimitLatency = upperLimitLatency;
    this.actualLatency = actualLatency;
  }

  /** @return the median latency. */
  public Double getMedian() {
    return median;
  }

  /** @return the median absolute deviation. */
  public Double getMad() {
    return mad;
  }

  /** @return the upper latency limit used as the outlier threshold. */
  public Double getUpperLimitLatency() {
    return upperLimitLatency;
  }

  /** @return the latency observed for the node. */
  public Double getActualLatency() {
    return actualLatency;
  }

  @Override
  public boolean equals(Object o) {
    // Reference equality short-circuits the field-by-field comparison.
    if (o == this) {
      return true;
    }
    // Exact class match keeps equals symmetric across subclasses.
    if (o == null || o.getClass() != getClass()) {
      return false;
    }
    OutlierMetrics other = (OutlierMetrics) o;
    return new EqualsBuilder()
        .append(median, other.median)
        .append(mad, other.mad)
        .append(upperLimitLatency, other.upperLimitLatency)
        .append(actualLatency, other.actualLatency)
        .isEquals();
  }

  @Override
  public int hashCode() {
    // Field order must stay fixed: HashCodeBuilder is order-sensitive.
    return new HashCodeBuilder(17, 37)
        .append(median)
        .append(mad)
        .append(upperLimitLatency)
        .append(actualLatency)
        .toHashCode();
  }
}

View File

@ -51,7 +51,7 @@ public final class SlowPeerReports {
* meaningful and must be avoided.
*/
@Nonnull
private final Map<String, Double> slowPeers;
private final Map<String, OutlierMetrics> slowPeers;
/**
* An object representing a SlowPeerReports with no entries. Should
@ -61,19 +61,19 @@ public final class SlowPeerReports {
public static final SlowPeerReports EMPTY_REPORT =
new SlowPeerReports(ImmutableMap.of());
private SlowPeerReports(Map<String, Double> slowPeers) {
private SlowPeerReports(Map<String, OutlierMetrics> slowPeers) {
this.slowPeers = slowPeers;
}
public static SlowPeerReports create(
@Nullable Map<String, Double> slowPeers) {
@Nullable Map<String, OutlierMetrics> slowPeers) {
if (slowPeers == null || slowPeers.isEmpty()) {
return EMPTY_REPORT;
}
return new SlowPeerReports(slowPeers);
}
public Map<String, Double> getSlowPeers() {
public Map<String, OutlierMetrics> getSlowPeers() {
return slowPeers;
}

View File

@ -112,6 +112,7 @@
import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand;
import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.OutlierMetrics;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStatus;
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
@ -853,11 +854,15 @@ public static List<SlowPeerReportProto> convertSlowPeerInfo(
List<SlowPeerReportProto> slowPeerInfoProtos =
new ArrayList<>(slowPeers.getSlowPeers().size());
for (Map.Entry<String, Double> entry :
slowPeers.getSlowPeers().entrySet()) {
slowPeerInfoProtos.add(SlowPeerReportProto.newBuilder()
for (Map.Entry<String, OutlierMetrics> entry : slowPeers.getSlowPeers().entrySet()) {
OutlierMetrics outlierMetrics = entry.getValue();
slowPeerInfoProtos.add(
SlowPeerReportProto.newBuilder()
.setDataNodeId(entry.getKey())
.setAggregateLatency(entry.getValue())
.setAggregateLatency(outlierMetrics.getActualLatency())
.setMedian(outlierMetrics.getMedian())
.setMad(outlierMetrics.getMad())
.setUpperLimitLatency(outlierMetrics.getUpperLimitLatency())
.build());
}
return slowPeerInfoProtos;
@ -871,15 +876,19 @@ public static SlowPeerReports convertSlowPeerInfo(
return SlowPeerReports.EMPTY_REPORT;
}
Map<String, Double> slowPeersMap = new HashMap<>(slowPeerProtos.size());
Map<String, OutlierMetrics> slowPeersMap = new HashMap<>(slowPeerProtos.size());
for (SlowPeerReportProto proto : slowPeerProtos) {
if (!proto.hasDataNodeId()) {
// The DataNodeId should be reported.
continue;
}
slowPeersMap.put(
proto.getDataNodeId(),
proto.hasAggregateLatency() ? proto.getAggregateLatency() : 0.0);
Double aggregateLatency = proto.hasAggregateLatency() ? proto.getAggregateLatency() : 0.0;
Double medianLatency = proto.hasMedian() ? proto.getMedian() : 0.0;
Double madLatency = proto.hasMad() ? proto.getMad() : 0.0;
Double upperLimitLatency = proto.hasUpperLimitLatency() ? proto.getUpperLimitLatency() : 0.0;
OutlierMetrics outlierMetrics =
new OutlierMetrics(medianLatency, madLatency, upperLimitLatency, aggregateLatency);
slowPeersMap.put(proto.getDataNodeId(), outlierMetrics);
}
return SlowPeerReports.create(slowPeersMap);
}

View File

@ -1896,14 +1896,14 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
Preconditions.checkNotNull(slowPeerTracker, "slowPeerTracker should not be un-assigned");
if (slowPeerTracker.isSlowPeerTrackerEnabled()) {
final Map<String, Double> slowPeersMap = slowPeers.getSlowPeers();
final Map<String, OutlierMetrics> slowPeersMap = slowPeers.getSlowPeers();
if (!slowPeersMap.isEmpty()) {
if (LOG.isDebugEnabled()) {
LOG.debug("DataNode " + nodeReg + " reported slow peers: " + slowPeersMap);
}
for (Map.Entry<String, Double> slowNodeId : slowPeersMap.entrySet()) {
slowPeerTracker.addReport(slowNodeId.getKey(), nodeReg.getIpcAddr(false),
slowNodeId.getValue());
for (Map.Entry<String, OutlierMetrics> slowNodeEntry : slowPeersMap.entrySet()) {
slowPeerTracker.addReport(slowNodeEntry.getKey(), nodeReg.getIpcAddr(false),
slowNodeEntry.getValue());
}
}
}

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.protocol.OutlierMetrics;
import org.apache.hadoop.util.Preconditions;
import org.apache.hadoop.util.Timer;
@ -58,7 +59,7 @@ public boolean isSlowPeerTrackerEnabled() {
}
@Override
public void addReport(String slowNode, String reportingNode, Double slowNodeLatency) {
public void addReport(String slowNode, String reportingNode, OutlierMetrics slowNodeMetrics) {
LOG.trace("Adding slow peer report is disabled. To enable it, please enable config {}.",
DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY);
}

View File

@ -38,13 +38,31 @@ final class SlowPeerLatencyWithReportingNode
@JsonProperty("ReportedLatency")
private final Double reportedLatency;
@JsonProperty("MedianLatency")
private final Double medianLatency;
@JsonProperty("MadLatency")
private final Double madLatency;
@JsonProperty("UpperLimitLatency")
private final Double upperLimitLatency;
SlowPeerLatencyWithReportingNode(
@JsonProperty("ReportingNode")
String reportingNode,
@JsonProperty("ReportedLatency")
Double reportedLatency) {
Double reportedLatency,
@JsonProperty("MedianLatency")
Double medianLatency,
@JsonProperty("MadLatency")
Double madLatency,
@JsonProperty("UpperLimitLatency")
Double upperLimitLatency) {
this.reportingNode = reportingNode;
this.reportedLatency = reportedLatency;
this.medianLatency = medianLatency;
this.madLatency = madLatency;
this.upperLimitLatency = upperLimitLatency;
}
public String getReportingNode() {
@ -55,6 +73,18 @@ public Double getReportedLatency() {
return reportedLatency;
}
/** @return median latency of the slow node as reported by the reporting node. */
public Double getMedianLatency() {
return medianLatency;
}
/** @return median absolute deviation of the slow node's latency samples. */
public Double getMadLatency() {
return madLatency;
}
/** @return upper latency limit used as the outlier threshold for the slow node. */
public Double getUpperLimitLatency() {
return upperLimitLatency;
}
@Override
public int compareTo(SlowPeerLatencyWithReportingNode o) {
return this.reportingNode.compareTo(o.getReportingNode());
@ -75,6 +105,9 @@ public boolean equals(Object o) {
return new EqualsBuilder()
.append(reportingNode, that.reportingNode)
.append(reportedLatency, that.reportedLatency)
.append(medianLatency, that.medianLatency)
.append(madLatency, that.madLatency)
.append(upperLimitLatency, that.upperLimitLatency)
.isEquals();
}
@ -83,6 +116,9 @@ public int hashCode() {
return new HashCodeBuilder(17, 37)
.append(reportingNode)
.append(reportedLatency)
.append(medianLatency)
.append(madLatency)
.append(upperLimitLatency)
.toHashCode();
}
}

View File

@ -28,6 +28,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.server.protocol.OutlierMetrics;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.util.Timer;
import org.slf4j.Logger;
@ -123,9 +124,10 @@ public boolean isSlowPeerTrackerEnabled() {
*
* @param slowNode DataNodeId of the peer suspected to be slow.
* @param reportingNode DataNodeId of the node reporting on its peer.
* @param slowNodeLatency Aggregate latency of slownode as reported by the reporting node.
* @param slowNodeMetrics Aggregate latency metrics of slownode as reported by the
* reporting node.
*/
public void addReport(String slowNode, String reportingNode, Double slowNodeLatency) {
public void addReport(String slowNode, String reportingNode, OutlierMetrics slowNodeMetrics) {
ConcurrentMap<String, LatencyWithLastReportTime> nodeEntries = allReports.get(slowNode);
if (nodeEntries == null) {
@ -136,7 +138,7 @@ public void addReport(String slowNode, String reportingNode, Double slowNodeLate
// Replace the existing entry from this node, if any.
nodeEntries.put(reportingNode,
new LatencyWithLastReportTime(timer.monotonicNow(), slowNodeLatency));
new LatencyWithLastReportTime(timer.monotonicNow(), slowNodeMetrics));
}
/**
@ -195,8 +197,11 @@ private SortedSet<SlowPeerLatencyWithReportingNode> filterNodeReports(
for (Map.Entry<String, LatencyWithLastReportTime> entry : reports.entrySet()) {
if (now - entry.getValue().getTime() < reportValidityMs) {
OutlierMetrics outlierMetrics = entry.getValue().getLatency();
validReports.add(
new SlowPeerLatencyWithReportingNode(entry.getKey(), entry.getValue().getLatency()));
new SlowPeerLatencyWithReportingNode(entry.getKey(), outlierMetrics.getActualLatency(),
outlierMetrics.getMedian(), outlierMetrics.getMad(),
outlierMetrics.getUpperLimitLatency()));
}
}
return validReports;
@ -279,9 +284,9 @@ long getReportValidityMs() {
private static class LatencyWithLastReportTime {
private final Long time;
private final Double latency;
private final OutlierMetrics latency;
LatencyWithLastReportTime(Long time, Double latency) {
LatencyWithLastReportTime(Long time, OutlierMetrics latency) {
this.time = time;
this.latency = latency;
}
@ -290,7 +295,7 @@ public Long getTime() {
return time;
}
public Double getLatency() {
public OutlierMetrics getLatency() {
return latency;
}
}

View File

@ -23,6 +23,7 @@
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.classification.VisibleForTesting;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.server.protocol.OutlierMetrics;
import org.apache.hadoop.metrics2.MetricsJsonBuilder;
import org.apache.hadoop.metrics2.lib.MutableRollingAverages;
import org.apache.hadoop.util.Preconditions;
@ -55,7 +56,7 @@ public class DataNodePeerMetrics {
private final String name;
// Strictly to be used by test code only. Source code is not supposed to use this.
private Map<String, Double> testOutlier = null;
private Map<String, OutlierMetrics> testOutlier = null;
private final OutlierDetector slowNodeDetector;
@ -143,7 +144,7 @@ public void collectThreadLocalStates() {
* Retrieve the set of dataNodes that look significantly slower
* than their peers.
*/
public Map<String, Double> getOutliers() {
public Map<String, OutlierMetrics> getOutliers() {
// outlier must be null for source code.
if (testOutlier == null) {
// This maps the metric name to the aggregate latency.
@ -151,7 +152,7 @@ public Map<String, Double> getOutliers() {
final Map<String, Double> stats =
sendPacketDownstreamRollingAverages.getStats(minOutlierDetectionSamples);
LOG.trace("DataNodePeerMetrics: Got stats: {}", stats);
return slowNodeDetector.getOutliers(stats);
return slowNodeDetector.getOutlierMetrics(stats);
} else {
// this happens only for test code.
return testOutlier;
@ -164,7 +165,7 @@ public Map<String, Double> getOutliers() {
*
* @param outlier outlier directly set by tests.
*/
public void setTestOutliers(Map<String, Double> outlier) {
public void setTestOutliers(Map<String, OutlierMetrics> outlier) {
this.testOutlier = outlier;
}

View File

@ -22,6 +22,8 @@
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.hdfs.server.protocol.OutlierMetrics;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -108,6 +110,26 @@ public OutlierDetector(long minNumResources, long lowThresholdMs) {
* @return
*/
public Map<String, Double> getOutliers(Map<String, Double> stats) {
  // Delegate detection to getOutlierMetrics and keep only the observed
  // (actual) latency for each flagged resource, preserving the legacy
  // (resource -> latency) return shape.
  final Map<String, OutlierMetrics> detected = getOutlierMetrics(stats);
  final Map<String, Double> slowResources = new HashMap<>();
  for (Map.Entry<String, OutlierMetrics> entry : detected.entrySet()) {
    slowResources.put(entry.getKey(), entry.getValue().getActualLatency());
  }
  return slowResources;
}
/**
* Return a set of nodes whose latency is much higher than
* their counterparts. The input is a map of (resource {@literal ->} aggregate
* latency) entries.
*
* The aggregate may be an arithmetic mean or a percentile e.g.
* 90th percentile. Percentiles are a better choice than median
* since latency is usually not a normal distribution.
*
* @param stats map of aggregate latency entries.
* @return map of outlier nodes to outlier metrics.
*/
public Map<String, OutlierMetrics> getOutlierMetrics(Map<String, Double> stats) {
if (stats.size() < minNumResources) {
LOG.debug("Skipping statistical outlier detection as we don't have " +
"latency data for enough resources. Have {}, need at least {}",
@ -124,19 +146,20 @@ public Map<String, Double> getOutliers(Map<String, Double> stats) {
upperLimitLatency = Math.max(
upperLimitLatency, median + (DEVIATION_MULTIPLIER * mad));
final Map<String, Double> slowResources = new HashMap<>();
final Map<String, OutlierMetrics> slowResources = new HashMap<>();
LOG.trace("getOutliers: List={}, MedianLatency={}, " +
"MedianAbsoluteDeviation={}, upperLimitLatency={}",
sorted, median, mad, upperLimitLatency);
LOG.trace("getOutliers: List={}, MedianLatency={}, "
+ "MedianAbsoluteDeviation={}, upperLimitLatency={}", sorted, median, mad,
upperLimitLatency);
// Find resources whose latency exceeds the threshold.
for (Map.Entry<String, Double> entry : stats.entrySet()) {
if (entry.getValue() > upperLimitLatency) {
slowResources.put(entry.getKey(), entry.getValue());
OutlierMetrics outlierMetrics =
new OutlierMetrics(median, mad, upperLimitLatency, entry.getValue());
slowResources.put(entry.getKey(), outlierMetrics);
}
}
return slowResources;
}

View File

@ -408,6 +408,9 @@ message CommitBlockSynchronizationResponseProto {
// Report about one peer suspected to be slow, as observed by a reporting
// datanode. Fields beyond dataNodeId default to 0.0 when absent.
message SlowPeerReportProto {
// Id of the datanode being reported as slow.
optional string dataNodeId = 1;
// Latency actually observed for the slow node by the reporter.
optional double aggregateLatency = 2;
// Median latency across the sampled peers.
optional double median = 3;
// Median absolute deviation of the sampled latencies.
optional double mad = 4;
// Upper latency limit used as the outlier threshold.
optional double upperLimitLatency = 5;
}
/**

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.protocol.OutlierMetrics;
import org.apache.hadoop.test.GenericTestUtils;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_MIN_OUTLIER_DETECTION_NODES_KEY;
@ -72,8 +73,10 @@ public void tearDown() throws Exception {
public void testSingleNodeReport() throws Exception {
List<DataNode> dataNodes = cluster.getDataNodes();
DataNode slowNode = dataNodes.get(1);
OutlierMetrics outlierMetrics = new OutlierMetrics(1.245, 2.69375, 4.5667, 15.5);
dataNodes.get(0).getPeerMetrics().setTestOutliers(
ImmutableMap.of(slowNode.getDatanodeHostname() + ":" + slowNode.getIpcPort(), 15.5));
ImmutableMap.of(slowNode.getDatanodeHostname() + ":" + slowNode.getIpcPort(),
outlierMetrics));
DistributedFileSystem distributedFileSystem = cluster.getFileSystem();
Assert.assertEquals(3, distributedFileSystem.getDataNodeStats().length);
GenericTestUtils.waitFor(() -> {
@ -91,15 +94,22 @@ public void testSingleNodeReport() throws Exception {
Assert.assertTrue(
cluster.getNameNode().getSlowPeersReport().contains(slowNode.getDatanodeHostname()));
Assert.assertTrue(cluster.getNameNode().getSlowPeersReport().contains("15.5"));
Assert.assertTrue(cluster.getNameNode().getSlowPeersReport().contains("1.245"));
Assert.assertTrue(cluster.getNameNode().getSlowPeersReport().contains("2.69375"));
Assert.assertTrue(cluster.getNameNode().getSlowPeersReport().contains("4.5667"));
}
@Test
public void testMultiNodesReport() throws Exception {
List<DataNode> dataNodes = cluster.getDataNodes();
OutlierMetrics outlierMetrics1 = new OutlierMetrics(2.498237, 19.2495, 23.568204, 14.5);
OutlierMetrics outlierMetrics2 = new OutlierMetrics(3.2535, 22.4945, 44.5667, 18.7);
dataNodes.get(0).getPeerMetrics().setTestOutliers(ImmutableMap.of(
dataNodes.get(1).getDatanodeHostname() + ":" + dataNodes.get(1).getIpcPort(), 14.5));
dataNodes.get(1).getDatanodeHostname() + ":" + dataNodes.get(1).getIpcPort(),
outlierMetrics1));
dataNodes.get(1).getPeerMetrics().setTestOutliers(ImmutableMap.of(
dataNodes.get(2).getDatanodeHostname() + ":" + dataNodes.get(2).getIpcPort(), 18.7));
dataNodes.get(2).getDatanodeHostname() + ":" + dataNodes.get(2).getIpcPort(),
outlierMetrics2));
DistributedFileSystem distributedFileSystem = cluster.getFileSystem();
Assert.assertEquals(3, distributedFileSystem.getDataNodeStats().length);
GenericTestUtils.waitFor(() -> {
@ -120,6 +130,8 @@ public void testMultiNodesReport() throws Exception {
.contains(dataNodes.get(2).getDatanodeHostname()));
Assert.assertTrue(cluster.getNameNode().getSlowPeersReport().contains("14.5"));
Assert.assertTrue(cluster.getNameNode().getSlowPeersReport().contains("18.7"));
Assert.assertTrue(cluster.getNameNode().getSlowPeersReport().contains("23.568204"));
Assert.assertTrue(cluster.getNameNode().getSlowPeersReport().contains("22.4945"));
}
}

View File

@ -21,6 +21,7 @@
import org.apache.hadoop.thirdparty.protobuf.UninitializedMessageException;
import org.apache.hadoop.hdfs.protocol.AddErasureCodingPolicyResponse;
import org.apache.hadoop.hdfs.protocol.SystemErasureCodingPolicies;
import org.apache.hadoop.hdfs.server.protocol.OutlierMetrics;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import static org.hamcrest.CoreMatchers.is;
@ -805,8 +806,14 @@ public void testDataNodeInfoPBHelper() {
@Test
public void testSlowPeerInfoPBHelper() {
// Test with a map that has a few slow peer entries.
OutlierMetrics outlierMetrics1 = new OutlierMetrics(0.0, 0.0, 0.0, 0.0);
OutlierMetrics outlierMetrics2 = new OutlierMetrics(0.0, 0.0, 0.0, 1.0);
OutlierMetrics outlierMetrics3 = new OutlierMetrics(0.0, 0.0, 0.0, 2.0);
final SlowPeerReports slowPeers = SlowPeerReports.create(
ImmutableMap.of("peer1", 0.0, "peer2", 1.0, "peer3", 2.0));
ImmutableMap.of(
"peer1", outlierMetrics1,
"peer2", outlierMetrics2,
"peer3", outlierMetrics3));
SlowPeerReports slowPeersConverted1 = PBHelper.convertSlowPeerInfo(
PBHelper.convertSlowPeerInfo(slowPeers));
assertTrue(

View File

@ -22,6 +22,8 @@
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.TestBlockStoragePolicy;
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.OutlierMetrics;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
@ -88,12 +90,18 @@ public void testChooseTargetExcludeSlowNodes() throws Exception {
// mock slow nodes
SlowPeerTracker tracker = dnManager.getSlowPeerTracker();
tracker.addReport(dataNodes[0].getInfoAddr(), dataNodes[3].getInfoAddr(), 1.29463);
tracker.addReport(dataNodes[0].getInfoAddr(), dataNodes[4].getInfoAddr(), 2.9576);
tracker.addReport(dataNodes[1].getInfoAddr(), dataNodes[4].getInfoAddr(), 3.59674);
tracker.addReport(dataNodes[1].getInfoAddr(), dataNodes[5].getInfoAddr(), 4.238456);
tracker.addReport(dataNodes[2].getInfoAddr(), dataNodes[3].getInfoAddr(), 5.18375);
tracker.addReport(dataNodes[2].getInfoAddr(), dataNodes[5].getInfoAddr(), 6.39576);
OutlierMetrics outlierMetrics1 = new OutlierMetrics(0.0, 0.0, 0.0, 1.29463);
tracker.addReport(dataNodes[0].getInfoAddr(), dataNodes[3].getInfoAddr(), outlierMetrics1);
OutlierMetrics outlierMetrics2 = new OutlierMetrics(0.0, 0.0, 0.0, 2.9576);
tracker.addReport(dataNodes[0].getInfoAddr(), dataNodes[4].getInfoAddr(), outlierMetrics2);
OutlierMetrics outlierMetrics3 = new OutlierMetrics(0.0, 0.0, 0.0, 3.59674);
tracker.addReport(dataNodes[1].getInfoAddr(), dataNodes[4].getInfoAddr(), outlierMetrics3);
OutlierMetrics outlierMetrics4 = new OutlierMetrics(0.0, 0.0, 0.0, 4.238456);
tracker.addReport(dataNodes[1].getInfoAddr(), dataNodes[5].getInfoAddr(), outlierMetrics4);
OutlierMetrics outlierMetrics5 = new OutlierMetrics(0.0, 0.0, 0.0, 5.18375);
tracker.addReport(dataNodes[2].getInfoAddr(), dataNodes[3].getInfoAddr(), outlierMetrics5);
OutlierMetrics outlierMetrics6 = new OutlierMetrics(0.0, 0.0, 0.0, 6.39576);
tracker.addReport(dataNodes[2].getInfoAddr(), dataNodes[5].getInfoAddr(), outlierMetrics6);
// waiting for slow nodes collector run
Thread.sleep(3000);

View File

@ -23,6 +23,7 @@
import com.fasterxml.jackson.databind.ObjectReader;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.protocol.OutlierMetrics;
import org.apache.hadoop.util.FakeTimer;
import org.junit.Before;
import org.junit.Rule;
@ -44,8 +45,7 @@
* Tests for {@link SlowPeerTracker}.
*/
public class TestSlowPeerTracker {
public static final Logger LOG = LoggerFactory.getLogger(
TestSlowPeerTracker.class);
private static final Logger LOG = LoggerFactory.getLogger(TestSlowPeerTracker.class);
/**
* Set a timeout for every test case.
@ -79,9 +79,9 @@ public void testEmptyReports() {
@Test
public void testReportsAreRetrieved() {
tracker.addReport("node2", "node1", 1.2);
tracker.addReport("node3", "node1", 2.1);
tracker.addReport("node3", "node2", 1.22);
tracker.addReport("node2", "node1", new OutlierMetrics(0.0, 0.0, 0.0, 1.2));
tracker.addReport("node3", "node1", new OutlierMetrics(0.0, 0.0, 0.0, 2.1));
tracker.addReport("node3", "node2", new OutlierMetrics(0.0, 0.0, 0.0, 1.22));
assertThat(tracker.getReportsForAllDataNodes().size(), is(2));
assertThat(tracker.getReportsForNode("node2").size(), is(1));
@ -94,9 +94,9 @@ public void testReportsAreRetrieved() {
*/
@Test
public void testAllReportsAreExpired() {
tracker.addReport("node2", "node1", 0.123);
tracker.addReport("node3", "node2", 0.2334);
tracker.addReport("node1", "node3", 1.234);
tracker.addReport("node2", "node1", new OutlierMetrics(0.0, 0.0, 0.0, 0.123));
tracker.addReport("node3", "node2", new OutlierMetrics(0.0, 0.0, 0.0, 0.2334));
tracker.addReport("node1", "node3", new OutlierMetrics(0.0, 0.0, 0.0, 1.234));
// No reports should expire after 1ms.
timer.advance(1);
@ -116,10 +116,10 @@ public void testAllReportsAreExpired() {
*/
@Test
public void testSomeReportsAreExpired() {
tracker.addReport("node3", "node1", 1.234);
tracker.addReport("node3", "node2", 1.222);
tracker.addReport("node3", "node1", new OutlierMetrics(0.0, 0.0, 0.0, 1.234));
tracker.addReport("node3", "node2", new OutlierMetrics(0.0, 0.0, 0.0, 1.222));
timer.advance(reportValidityMs);
tracker.addReport("node3", "node4", 1.20);
tracker.addReport("node3", "node4", new OutlierMetrics(0.0, 0.0, 0.0, 1.20));
assertThat(tracker.getReportsForAllDataNodes().size(), is(1));
assertThat(tracker.getReportsForNode("node3").size(), is(1));
assertEquals(1, tracker.getReportsForNode("node3").stream()
@ -131,22 +131,28 @@ public void testSomeReportsAreExpired() {
*/
@Test
public void testReplacement() {
tracker.addReport("node2", "node1", 2.1);
OutlierMetrics outlierMetrics1 = new OutlierMetrics(0.0, 0.0, 0.0, 2.1);
tracker.addReport("node2", "node1", outlierMetrics1);
timer.advance(reportValidityMs); // Expire the report.
assertThat(tracker.getReportsForAllDataNodes().size(), is(0));
// This should replace the expired report with a newer valid one.
tracker.addReport("node2", "node1", 0.001);
OutlierMetrics outlierMetrics2 = new OutlierMetrics(0.0, 0.0, 0.0, 0.001);
tracker.addReport("node2", "node1", outlierMetrics2);
assertThat(tracker.getReportsForAllDataNodes().size(), is(1));
assertThat(tracker.getReportsForNode("node2").size(), is(1));
}
@Test
public void testGetJson() throws IOException {
tracker.addReport("node1", "node2", 1.1);
tracker.addReport("node2", "node3", 1.23);
tracker.addReport("node2", "node1", 2.13);
tracker.addReport("node4", "node1", 1.244);
OutlierMetrics outlierMetrics1 = new OutlierMetrics(0.0, 0.0, 0.0, 1.1);
tracker.addReport("node1", "node2", outlierMetrics1);
OutlierMetrics outlierMetrics2 = new OutlierMetrics(0.0, 0.0, 0.0, 1.23);
tracker.addReport("node2", "node3", outlierMetrics2);
OutlierMetrics outlierMetrics3 = new OutlierMetrics(0.0, 0.0, 0.0, 2.13);
tracker.addReport("node2", "node1", outlierMetrics3);
OutlierMetrics outlierMetrics4 = new OutlierMetrics(0.0, 0.0, 0.0, 1.244);
tracker.addReport("node4", "node1", outlierMetrics4);
final Set<SlowPeerJsonReport> reports = getAndDeserializeJson();
@ -161,17 +167,17 @@ public void testGetJson() throws IOException {
@Test
public void testGetJsonSizeIsLimited() throws IOException {
tracker.addReport("node1", "node2", 1.634);
tracker.addReport("node1", "node3", 2.3566);
tracker.addReport("node2", "node3", 3.869);
tracker.addReport("node2", "node4", 4.1356);
tracker.addReport("node3", "node4", 1.73057);
tracker.addReport("node3", "node5", 2.4956730);
tracker.addReport("node4", "node6", 3.29847);
tracker.addReport("node5", "node6", 4.13444);
tracker.addReport("node5", "node7", 5.10845);
tracker.addReport("node6", "node8", 2.37464);
tracker.addReport("node6", "node7", 1.29475656);
tracker.addReport("node1", "node2", new OutlierMetrics(0.0, 0.0, 0.0, 1.634));
tracker.addReport("node1", "node3", new OutlierMetrics(0.0, 0.0, 0.0, 2.3566));
tracker.addReport("node2", "node3", new OutlierMetrics(0.0, 0.0, 0.0, 3.869));
tracker.addReport("node2", "node4", new OutlierMetrics(0.0, 0.0, 0.0, 4.1356));
tracker.addReport("node3", "node4", new OutlierMetrics(0.0, 0.0, 0.0, 1.73057));
tracker.addReport("node3", "node5", new OutlierMetrics(0.0, 0.0, 0.0, 2.4956730));
tracker.addReport("node4", "node6", new OutlierMetrics(0.0, 0.0, 0.0, 3.29847));
tracker.addReport("node5", "node6", new OutlierMetrics(0.0, 0.0, 0.0, 4.13444));
tracker.addReport("node5", "node7", new OutlierMetrics(0.0, 0.0, 0.0, 5.10845));
tracker.addReport("node6", "node8", new OutlierMetrics(0.0, 0.0, 0.0, 2.37464));
tracker.addReport("node6", "node7", new OutlierMetrics(0.0, 0.0, 0.0, 1.29475656));
final Set<SlowPeerJsonReport> reports = getAndDeserializeJson();
@ -215,13 +221,16 @@ public void testGetJsonSizeIsLimited() throws IOException {
public void testLowRankedElementsIgnored() throws IOException {
// Insert 5 nodes with 2 peer reports each.
for (int i = 0; i < 5; ++i) {
tracker.addReport("node" + i, "reporter1", 1.295673);
tracker.addReport("node" + i, "reporter2", 2.38560);
OutlierMetrics outlierMetrics1 = new OutlierMetrics(0.0, 0.0, 0.0, 1.295673);
tracker.addReport("node" + i, "reporter1", outlierMetrics1);
OutlierMetrics outlierMetrics2 = new OutlierMetrics(0.0, 0.0, 0.0, 2.38560);
tracker.addReport("node" + i, "reporter2", outlierMetrics2);
}
// Insert 10 nodes with 1 peer report each.
for (int i = 10; i < 20; ++i) {
tracker.addReport("node" + i, "reporter1", 3.4957);
OutlierMetrics outlierMetrics = new OutlierMetrics(0.0, 0.0, 0.0, 3.4957);
tracker.addReport("node" + i, "reporter1", outlierMetrics);
}
final Set<SlowPeerJsonReport> reports = getAndDeserializeJson();

View File

@ -21,6 +21,7 @@
import java.util.function.Supplier;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.server.protocol.OutlierMetrics;
import org.apache.hadoop.metrics2.lib.MetricsTestHelper;
import org.apache.hadoop.test.GenericTestUtils;
import org.junit.Before;
@ -100,7 +101,7 @@ public Boolean get() {
}
}, 500, 100_000);
final Map<String, Double> outliers = peerMetrics.getOutliers();
final Map<String, OutlierMetrics> outliers = peerMetrics.getOutliers();
LOG.info("Got back outlier nodes: {}", outliers);
assertThat(outliers.size(), is(1));
assertTrue(outliers.containsKey(slowNodeName));