diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java index 06ae30d552..4e3b73f754 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/RollingAverages.java @@ -20,6 +20,7 @@ package org.apache.hadoop.metrics2.lib; import java.io.Closeable; import java.io.IOException; +import java.util.HashMap; import java.util.Map; import java.util.Map.Entry; import java.util.concurrent.ConcurrentHashMap; @@ -39,6 +40,9 @@ import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; import com.google.common.base.Preconditions; import com.google.common.util.concurrent.ThreadFactoryBuilder; + +import javax.annotation.Nullable; + import static org.apache.hadoop.metrics2.lib.Interns.*; /** @@ -63,7 +67,10 @@ public class RollingAverages extends MutableMetric implements Closeable { .setNameFormat("RollingAverages-%d").build()); private ScheduledFuture scheduledTask = null; + + @Nullable private Map currentSnapshot; + private final int numWindows; private final String avgInfoNameTemplate; private final String avgInfoDescTemplate; @@ -100,31 +107,31 @@ public class RollingAverages extends MutableMetric implements Closeable { /** * Constructor of {@link RollingAverages}. - * @param windowSize - * The number of seconds of each window for which sub set of samples - * are gathered to compute the rolling average, A.K.A. roll over - * interval. + * @param windowSizeMs + * The number of milliseconds of each window for which subset + * of samples are gathered to compute the rolling average, A.K.A. + * roll over interval. * @param numWindows * The number of windows maintained to compute the rolling average. * @param valueName * of the metric (e.g. 
"Time", "Latency") */ public RollingAverages( - final int windowSize, + final long windowSizeMs, final int numWindows, final String valueName) { String uvName = StringUtils.capitalize(valueName); String lvName = StringUtils.uncapitalize(valueName); - avgInfoNameTemplate = "%s" + "RollingAvg"+ uvName; + avgInfoNameTemplate = "[%s]" + "RollingAvg"+ uvName; avgInfoDescTemplate = "Rolling average "+ lvName +" for "+ "%s"; this.numWindows = numWindows; scheduledTask = SCHEDULER.scheduleAtFixedRate(new RatesRoller(this), - windowSize, windowSize, TimeUnit.SECONDS); + windowSizeMs, windowSizeMs, TimeUnit.MILLISECONDS); } /** * Constructor of {@link RollingAverages}. - * @param windowSize + * @param windowSizeMs * The number of seconds of each window for which sub set of samples * are gathered to compute rolling average, also A.K.A roll over * interval. @@ -133,9 +140,9 @@ public class RollingAverages extends MutableMetric implements Closeable { * average of the rolling averages. */ public RollingAverages( - final int windowSize, + final long windowSizeMs, final int numWindows) { - this(windowSize, numWindows, "Time"); + this(windowSizeMs, numWindows, "Time"); } @Override @@ -213,7 +220,7 @@ public class RollingAverages extends MutableMetric implements Closeable { * Iterates over snapshot to capture all Avg metrics into rolling structure * {@link RollingAverages#averages}. */ - private void rollOverAvgs() { + private synchronized void rollOverAvgs() { if (currentSnapshot == null) { return; } @@ -248,4 +255,32 @@ public class RollingAverages extends MutableMetric implements Closeable { } scheduledTask = null; } + + /** + * Retrieve a map of metric name -> (aggregate). + * Filter out entries that don't have at least minSamples. + * + * @return a map of peer DataNode Id to the average latency to that + * node seen over the measurement period. 
+ */ + public synchronized Map getStats(long minSamples) { + final Map stats = new HashMap<>(); + + for (final Entry> entry + : averages.entrySet()) { + final String name = entry.getKey(); + double totalSum = 0; + long totalCount = 0; + + for (final SumAndCount sumAndCount : entry.getValue()) { + totalCount += sumAndCount.getCount(); + totalSum += sumAndCount.getSum(); + } + + if (totalCount > minSamples) { + stats.put(name, totalSum / totalCount); + } + } + return stats; + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java index 899d98c4b6..44202e788e 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestRollingAverages.java @@ -42,7 +42,8 @@ public class TestRollingAverages { public void testRollingAveragesEmptyRollover() throws Exception { final MetricsRecordBuilder rb = mockMetricsRecordBuilder(); /* 5s interval and 2 windows */ - try (final RollingAverages rollingAverages = new RollingAverages(5, 2)) { + try (final RollingAverages rollingAverages = + new RollingAverages(5000, 2)) { /* Check it initially */ rollingAverages.snapshot(rb, true); verify(rb, never()).addGauge( @@ -74,10 +75,10 @@ public class TestRollingAverages { public void testRollingAveragesRollover() throws Exception { final MetricsRecordBuilder rb = mockMetricsRecordBuilder(); final String name = "foo2"; - final int windowSize = 5; // 5s roll over interval + final int windowSizeMs = 5000; // 5s roll over interval final int numWindows = 2; final int numOpsPerIteration = 1000; - try (RollingAverages rollingAverages = new RollingAverages(windowSize, + try (RollingAverages rollingAverages = new RollingAverages(windowSizeMs, numWindows)) { /* Push values for three 
intervals */ @@ -92,7 +93,7 @@ public class TestRollingAverages { * Sleep until 1s after the next windowSize seconds interval, to let the * metrics roll over */ - final long sleep = (start + (windowSize * 1000 * i) + 1000) + final long sleep = (start + (windowSizeMs * i) + 1000) - Time.monotonicNow(); Thread.sleep(sleep); @@ -110,12 +111,12 @@ public class TestRollingAverages { final long rollingTotal = i > 1 ? 2 * numOpsPerIteration : numOpsPerIteration; verify(rb).addGauge( - info("Foo2RollingAvgTime", "Rolling average time for foo2"), + info("[Foo2]RollingAvgTime", "Rolling average time for foo2"), rollingSum / rollingTotal); /* Verify the metrics were added the right number of times */ verify(rb, times(i)).addGauge( - eq(info("Foo2RollingAvgTime", "Rolling average time for foo2")), + eq(info("[Foo2]RollingAvgTime", "Rolling average time for foo2")), anyDouble()); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SlowPeerReports.java b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SlowPeerReports.java new file mode 100644 index 0000000000..218e30df4d --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs-client/src/main/java/org/apache/hadoop/hdfs/server/protocol/SlowPeerReports.java @@ -0,0 +1,107 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.protocol; + +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +import javax.annotation.Nonnull; +import javax.annotation.Nullable; +import java.util.Map; + +/** + * A class that allows a DataNode to communicate information about all + * its peer DataNodes that appear to be slow. + * + * The wire representation of this structure is a list of + * SlowPeerReportProto messages. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public final class SlowPeerReports { + /** + * A map from the DataNode's DataNodeUUID to its aggregate latency + * as seen by the reporting node. + * + * The exact choice of the aggregate is opaque to the NameNode but it + * should be chosen consistently by all DataNodes in the cluster. + * Examples of aggregates are 90th percentile (good) and mean (not so + * good). + * + * The NameNode must not attempt to interpret the aggregate latencies + * beyond exposing them as a diagnostic, e.g. metrics. Also, comparing + * latencies across reports from different DataNodes may not be + * meaningful and must be avoided. + */ + @Nonnull + private final Map slowPeers; + + /** + * An object representing a SlowPeerReports with no entries. Should + * be used instead of null or creating new objects when there are + * no slow peers to report. 
+ */ + public static final SlowPeerReports EMPTY_REPORT = + new SlowPeerReports(ImmutableMap.of()); + + private SlowPeerReports(Map slowPeers) { + this.slowPeers = slowPeers; + } + + public static SlowPeerReports create( + @Nullable Map slowPeers) { + if (slowPeers == null || slowPeers.isEmpty()) { + return EMPTY_REPORT; + } + return new SlowPeerReports(slowPeers); + } + + public Map getSlowPeers() { + return slowPeers; + } + + public boolean haveSlowPeers() { + return slowPeers.size() > 0; + } + + /** + * Return true if the two objects represent the same set slow peer + * entries. Primarily for unit testing convenience. + */ + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + + if (!(o instanceof SlowPeerReports)) { + return false; + } + + SlowPeerReports that = (SlowPeerReports) o; + + return slowPeers.equals(that.slowPeers); + } + + @Override + public int hashCode() { + return slowPeers.hashCode(); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 8f60af0cd4..3cc4b5f4a7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -457,14 +457,19 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_METRICS_SESSION_ID_KEY = HdfsClientConfigKeys.DeprecatedKeys.DFS_METRICS_SESSION_ID_KEY; public static final String DFS_METRICS_PERCENTILES_INTERVALS_KEY = "dfs.metrics.percentiles.intervals"; - public static final String DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY = - "dfs.metrics.rolling.average.window.size"; - public static final int DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_DEFAULT = - 3600; - public static final String DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY = - 
"dfs.metrics.rolling.average.window.numbers"; - public static final int DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_DEFAULT = - 48; + + // The following setting is not meant to be changed by administrators. + public static final String DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY = + "dfs.metrics.rolling.averages.window.length"; + public static final String DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_DEFAULT = + "5m"; + + // The following setting is not meant to be changed by administrators. + public static final String DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY = + "dfs.metrics.rolling.average.num.windows"; + public static final int DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_DEFAULT = + 36; + public static final String DFS_DATANODE_PEER_STATS_ENABLED_KEY = "dfs.datanode.peer.stats.enabled"; public static final boolean DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT = false; @@ -669,6 +674,11 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT = "dfs.block.misreplication.processing.limit"; public static final int DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT_DEFAULT = 10000; + public static final String DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY = + "dfs.datanode.slow.peers.report.interval"; + public static final String DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT = + "30m"; + // property for fsimage compression public static final String DFS_IMAGE_COMPRESS_KEY = "dfs.image.compress"; public static final boolean DFS_IMAGE_COMPRESS_DEFAULT = false; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java index 79113dd8b2..d9e6026060 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java +++ 
b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java @@ -55,6 +55,7 @@ import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo.Capability; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.server.protocol.StorageReport; @@ -71,6 +72,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.RpcController; import com.google.protobuf.ServiceException; +import javax.annotation.Nonnull; + /** * This class is the client side translator to translate the requests made on * {@link DatanodeProtocol} interfaces to the RPC server implementing @@ -132,7 +135,8 @@ public class DatanodeProtocolClientSideTranslatorPB implements StorageReport[] reports, long cacheCapacity, long cacheUsed, int xmitsInProgress, int xceiverCount, int failedVolumes, VolumeFailureSummary volumeFailureSummary, - boolean requestFullBlockReportLease) throws IOException { + boolean requestFullBlockReportLease, + @Nonnull SlowPeerReports slowPeers) throws IOException { HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder() .setRegistration(PBHelper.convert(registration)) .setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount) @@ -149,6 +153,9 @@ public class DatanodeProtocolClientSideTranslatorPB implements builder.setVolumeFailureSummary(PBHelper.convertVolumeFailureSummary( volumeFailureSummary)); } + if (slowPeers.haveSlowPeers()) { + builder.addAllSlowPeers(PBHelper.convertSlowPeerInfo(slowPeers)); + } HeartbeatResponseProto resp; try { resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, 
builder.build()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java index 257adf9792..b1c8e344fd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java @@ -120,7 +120,8 @@ public class DatanodeProtocolServerSideTranslatorPB implements report, request.getCacheCapacity(), request.getCacheUsed(), request.getXmitsInProgress(), request.getXceiverCount(), request.getFailedVolumes(), - volumeFailureSummary, request.getRequestFullBlockReportLease()); + volumeFailureSummary, request.getRequestFullBlockReportLease(), + PBHelper.convertSlowPeerInfo(request.getSlowPeersList())); } catch (IOException e) { throw new ServiceException(e); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index d97708f106..69c3c8372b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -20,7 +20,10 @@ package org.apache.hadoop.hdfs.protocolPB; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; import java.util.List; +import java.util.Map; import com.google.protobuf.ByteString; @@ -44,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.FinalizeComm import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.KeyUpdateCommandProto; import 
org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto; +import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SlowPeerReportProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto; import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto; import org.apache.hadoop.hdfs.protocol.proto.ErasureCodingProtos.BlockECReconstructionInfoProto; @@ -107,6 +111,7 @@ import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo.BlockStat import org.apache.hadoop.hdfs.server.protocol.RegisterCommand; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; /** @@ -829,6 +834,45 @@ public class PBHelper { return builder.build(); } + public static List convertSlowPeerInfo( + SlowPeerReports slowPeers) { + if (slowPeers.getSlowPeers().size() == 0) { + return Collections.emptyList(); + } + + List slowPeerInfoProtos = + new ArrayList<>(slowPeers.getSlowPeers().size()); + for (Map.Entry entry : + slowPeers.getSlowPeers().entrySet()) { + slowPeerInfoProtos.add(SlowPeerReportProto.newBuilder() + .setDataNodeId(entry.getKey()) + .setAggregateLatency(entry.getValue()) + .build()); + } + return slowPeerInfoProtos; + } + + public static SlowPeerReports convertSlowPeerInfo( + List slowPeerProtos) { + + // No slow peers, or possibly an older DataNode. + if (slowPeerProtos == null || slowPeerProtos.size() == 0) { + return SlowPeerReports.EMPTY_REPORT; + } + + Map slowPeersMap = new HashMap<>(slowPeerProtos.size()); + for (SlowPeerReportProto proto : slowPeerProtos) { + if (!proto.hasDataNodeId()) { + // The DataNodeId should be reported. 
+ continue; + } + slowPeersMap.put( + proto.getDataNodeId(), + proto.hasAggregateLatency() ? proto.getAggregateLatency() : 0.0); + } + return SlowPeerReports.create(slowPeersMap); + } + public static JournalInfo convert(JournalInfoProto info) { int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0; int nsID = info.hasNamespaceID() ? info.getNamespaceID() : 0; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index cc64a04e45..fed1864e1d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -50,7 +50,10 @@ import org.apache.hadoop.net.*; import org.apache.hadoop.net.NetworkTopology.InvalidTopologyException; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.Timer; +import javax.annotation.Nonnull; +import javax.annotation.Nullable; import java.io.IOException; import java.io.PrintWriter; import java.net.InetAddress; @@ -172,6 +175,14 @@ public class DatanodeManager { */ private final HashMap datanodesSoftwareVersions = new HashMap<>(4, 0.75f); + + /** + * True if we should process latency metrics from downstream peers. 
+ */ + private final boolean dataNodePeerStatsEnabled; + + @Nullable + private final SlowPeerTracker slowPeerTracker; /** * The minimum time between resending caching directives to Datanodes, @@ -194,6 +205,12 @@ public class DatanodeManager { this.decomManager = new DecommissionManager(namesystem, blockManager, heartbeatManager); this.fsClusterStats = newFSClusterStats(); + this.dataNodePeerStatsEnabled = conf.getBoolean( + DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY, + DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT); + + this.slowPeerTracker = dataNodePeerStatsEnabled ? + new SlowPeerTracker(conf, new Timer()) : null; this.defaultXferPort = NetUtils.createSocketAddr( conf.getTrimmed(DFSConfigKeys.DFS_DATANODE_ADDRESS_KEY, @@ -1566,7 +1583,8 @@ public class DatanodeManager { StorageReport[] reports, final String blockPoolId, long cacheCapacity, long cacheUsed, int xceiverCount, int maxTransfers, int failedVolumes, - VolumeFailureSummary volumeFailureSummary) throws IOException { + VolumeFailureSummary volumeFailureSummary, + @Nonnull SlowPeerReports slowPeers) throws IOException { final DatanodeDescriptor nodeinfo; try { nodeinfo = getDatanode(nodeReg); @@ -1632,6 +1650,19 @@ public class DatanodeManager { nodeinfo.setBalancerBandwidth(0); } + if (slowPeerTracker != null) { + final Map slowPeersMap = slowPeers.getSlowPeers(); + if (!slowPeersMap.isEmpty()) { + if (LOG.isDebugEnabled()) { + LOG.debug("DataNode " + nodeReg + " reported slow peers: " + + slowPeersMap); + } + for (String slowNodeId : slowPeersMap.keySet()) { + slowPeerTracker.addReport(slowNodeId, nodeReg.getIpcAddr(false)); + } + } + } + if (!cmds.isEmpty()) { return cmds.toArray(new DatanodeCommand[cmds.size()]); } @@ -1834,5 +1865,14 @@ public class DatanodeManager { this.blockInvalidateLimit = Math.max(20 * (int) (intervalSeconds), DFSConfigKeys.DFS_BLOCK_INVALIDATE_LIMIT_DEFAULT); } + + /** + * Retrieve information about slow peers as a JSON. 
+ * Returns null if we are not tracking slow peers. + * @return the slow peers report as a JSON string; null if slow peers are not tracked or serialization failed. + */ + public String getSlowPeersReport() { + return slowPeerTracker != null ? slowPeerTracker.getJson() : null; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java new file mode 100644 index 0000000000..cf3a20cc65 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/SlowPeerTracker.java @@ -0,0 +1,273 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.blockmanagement; + +import com.fasterxml.jackson.annotation.JsonProperty; +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import com.google.common.primitives.Ints; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.util.Timer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Collection; +import java.util.Collections; +import java.util.Comparator; +import java.util.HashMap; +import java.util.Map; +import java.util.PriorityQueue; +import java.util.Set; +import java.util.SortedSet; +import java.util.TreeSet; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; + + +/** + * This class aggregates information from {@link SlowPeerReports} received via + * heartbeats. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class SlowPeerTracker { + public static final Logger LOG = + LoggerFactory.getLogger(SlowPeerTracker.class); + + /** + * Time duration after which a report is considered stale. This is + * set to DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY * 3 i.e. + * maintained for at least two successive reports. + */ + private final long reportValidityMs; + + /** + * Timer object for querying the current time. 
Separated out for + * unit testing. + */ + private final Timer timer; + + /** + * Number of nodes to include in JSON report. We will return nodes with + * the highest number of votes from peers. + */ + private static final int MAX_NODES_TO_REPORT = 5; + + /** + * Information about peers that have reported a node as being slow. + * Each outer map entry is a map of (DatanodeId) -> (timestamp), + * mapping reporting nodes to the timestamp of the last report from + * that node. + * + * DatanodeId could be the DataNodeId or its address. We + * don't care as long as the caller uses it consistently. + * + * Stale reports are not evicted proactively and can potentially + * hang around forever. + */ + private final ConcurrentMap> + allReports; + + public SlowPeerTracker(Configuration conf, Timer timer) { + this.timer = timer; + this.allReports = new ConcurrentHashMap<>(); + this.reportValidityMs = conf.getTimeDuration( + DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY, + DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS) * 3; + } + + /** + * Add a new report. DatanodeIds can be the DataNodeIds or addresses + * We don't care as long as the caller is consistent. + * + * @param reportingNode DataNodeId of the node reporting on its peer. + * @param slowNode DataNodeId of the peer suspected to be slow. + */ + public void addReport(String slowNode, + String reportingNode) { + ConcurrentMap nodeEntries = allReports.get(slowNode); + + if (nodeEntries == null) { + // putIfAbsent guards against multiple writers. + allReports.putIfAbsent(slowNode, new ConcurrentHashMap<>()); + nodeEntries = allReports.get(slowNode); + } + + // Replace the existing entry from this node, if any. + nodeEntries.put(reportingNode, timer.monotonicNow()); + } + + /** + * Retrieve the non-expired reports that mark a given DataNode + * as slow. Stale reports are excluded. + * + * @param slowNode target node Id. 
+ * @return set of reports which implicate the target node as being slow. + */ + public Set getReportsForNode(String slowNode) { + final ConcurrentMap nodeEntries = + allReports.get(slowNode); + + if (nodeEntries == null || nodeEntries.isEmpty()) { + return Collections.emptySet(); + } + + return filterNodeReports(nodeEntries, timer.monotonicNow()); + } + + /** + * Retrieve all reports for all nodes. Stale reports are excluded. + * + * @return map from SlowNodeId -> (set of nodes reporting peers). + */ + public Map> getReportsForAllDataNodes() { + if (allReports.isEmpty()) { + return ImmutableMap.of(); + } + + final Map> allNodesValidReports = new HashMap<>(); + final long now = timer.monotonicNow(); + + for (Map.Entry> entry : + allReports.entrySet()) { + SortedSet validReports = filterNodeReports(entry.getValue(), now); + if (!validReports.isEmpty()) { + allNodesValidReports.put(entry.getKey(), validReports); + } + } + return allNodesValidReports; + } + + /** + * Filter the given reports to return just the valid ones. + * + * @param reports + * @param now + * @return + */ + private SortedSet filterNodeReports( + ConcurrentMap reports, long now) { + final SortedSet validReports = new TreeSet<>(); + + for (Map.Entry entry : reports.entrySet()) { + if (now - entry.getValue() < reportValidityMs) { + validReports.add(entry.getKey()); + } + } + return validReports; + } + + /** + * Retrieve all valid reports as a JSON string. + * @return serialized representation of valid reports. null if + * serialization failed. + */ + public String getJson() { + Collection validReports = getJsonReports( + MAX_NODES_TO_REPORT); + ObjectMapper objectMapper = new ObjectMapper(); + try { + return objectMapper.writeValueAsString(validReports); + } catch (JsonProcessingException e) { + // Failed to serialize. Don't log the exception call stack. 
+ LOG.debug("Failed to serialize statistics" + e); + return null; + } + } + + /** + * This structure is a thin wrapper over reports to make Json + * [de]serialization easy. + */ + public static class ReportForJson { + @JsonProperty("SlowNode") + final private String slowNode; + + @JsonProperty("ReportingNodes") + final private SortedSet reportingNodes; + + public ReportForJson( + @JsonProperty("SlowNode") String slowNode, + @JsonProperty("ReportingNodes") SortedSet reportingNodes) { + this.slowNode = slowNode; + this.reportingNodes = reportingNodes; + } + + public String getSlowNode() { + return slowNode; + } + + public SortedSet getReportingNodes() { + return reportingNodes; + } + } + + /** + * Retrieve reports in a structure for generating JSON, limiting the + * output to the top numNodes nodes i.e nodes with the most reports. + * @param numNodes number of nodes to return. This is to limit the + * size of the generated JSON. + */ + private Collection getJsonReports(int numNodes) { + if (allReports.isEmpty()) { + return Collections.emptyList(); + } + + final PriorityQueue topNReports = + new PriorityQueue<>(allReports.size(), + new Comparator() { + @Override + public int compare(ReportForJson o1, ReportForJson o2) { + return Ints.compare(o1.reportingNodes.size(), + o2.reportingNodes.size()); + } + }); + + final long now = timer.monotonicNow(); + + for (Map.Entry> entry : + allReports.entrySet()) { + SortedSet validReports = filterNodeReports( + entry.getValue(), now); + if (!validReports.isEmpty()) { + if (topNReports.size() < numNodes) { + topNReports.add(new ReportForJson(entry.getKey(), validReports)); + } else if (topNReports.peek().getReportingNodes().size() < + validReports.size()){ + // Remove the lowest element + topNReports.poll(); + topNReports.add(new ReportForJson(entry.getKey(), validReports)); + } + } + } + return topNReports; + } + + @VisibleForTesting + long getReportValidityMs() { + return reportValidityMs; + } +} diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index 52947992bb..644a8abd1b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -57,6 +57,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; @@ -127,7 +128,8 @@ class BPServiceActor implements Runnable { this.ibrManager = new IncrementalBlockReportManager(dnConf.ibrInterval); prevBlockReportId = ThreadLocalRandom.current().nextLong(); scheduler = new Scheduler(dnConf.heartBeatInterval, - dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval); + dnConf.getLifelineIntervalMs(), dnConf.blockReportInterval, + dnConf.slowPeersReportIntervalMs); // get the value of maxDataLength. this.maxDataLength = dnConf.getMaxDataLength(); } @@ -489,12 +491,18 @@ class BPServiceActor implements Runnable { " storage reports from service actor: " + this); } - scheduler.updateLastHeartbeatTime(monotonicNow()); + final long now = monotonicNow(); + scheduler.updateLastHeartbeatTime(now); VolumeFailureSummary volumeFailureSummary = dn.getFSDataset() .getVolumeFailureSummary(); int numFailedVolumes = volumeFailureSummary != null ? 
volumeFailureSummary.getFailedStorageLocations().length : 0; - return bpNamenode.sendHeartbeat(bpRegistration, + final boolean slowPeersReportDue = scheduler.isSlowPeersReportDue(now); + final SlowPeerReports slowPeers = + slowPeersReportDue && dn.getPeerMetrics() != null ? + SlowPeerReports.create(dn.getPeerMetrics().getOutliers()) : + SlowPeerReports.EMPTY_REPORT; + HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration, reports, dn.getFSDataset().getCacheCapacity(), dn.getFSDataset().getCacheUsed(), @@ -502,7 +510,14 @@ class BPServiceActor implements Runnable { dn.getXceiverCount(), numFailedVolumes, volumeFailureSummary, - requestBlockReportLease); + requestBlockReportLease, + slowPeers); + + if (slowPeersReportDue) { + // If the report was due and successfully sent, schedule the next one. + scheduler.scheduleNextSlowPeerReport(); + } + return response; } @VisibleForTesting @@ -1079,18 +1094,23 @@ class BPServiceActor implements Runnable { @VisibleForTesting boolean resetBlockReportTime = true; + @VisibleForTesting + volatile long nextSlowPeersReportTime = monotonicNow(); + private final AtomicBoolean forceFullBlockReport = new AtomicBoolean(false); private final long heartbeatIntervalMs; private final long lifelineIntervalMs; private final long blockReportIntervalMs; + private final long slowPeersReportIntervalMs; Scheduler(long heartbeatIntervalMs, long lifelineIntervalMs, - long blockReportIntervalMs) { + long blockReportIntervalMs, long slowPeersReportIntervalMs) { this.heartbeatIntervalMs = heartbeatIntervalMs; this.lifelineIntervalMs = lifelineIntervalMs; this.blockReportIntervalMs = blockReportIntervalMs; + this.slowPeersReportIntervalMs = slowPeersReportIntervalMs; scheduleNextLifeline(nextHeartbeatTime); } @@ -1123,6 +1143,10 @@ class BPServiceActor implements Runnable { lastBlockReportTime = blockReportTime; } + void scheduleNextSlowPeerReport() { + nextSlowPeersReportTime = monotonicNow() + slowPeersReportIntervalMs; + } + long 
getLastHearbeatTime() { return (monotonicNow() - lastHeartbeatTime)/1000; } @@ -1149,6 +1173,10 @@ class BPServiceActor implements Runnable { return nextBlockReportTime - curTime <= 0; } + boolean isSlowPeersReportDue(long curTime) { + return nextSlowPeersReportTime - curTime <= 0; + } + void forceFullBlockReportNow() { forceFullBlockReport.set(true); resetBlockReportTime = true; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java index 567597d012..dd4b58b080 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BlockReceiver.java @@ -47,6 +47,7 @@ import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.BlockOpResponseP import org.apache.hadoop.hdfs.protocol.proto.DataTransferProtos.Status; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaInputStreams; import org.apache.hadoop.hdfs.server.datanode.fsdataset.ReplicaOutputStreams; +import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodePeerMetrics; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.util.DataTransferThrottler; import org.apache.hadoop.io.IOUtils; @@ -93,7 +94,7 @@ class BlockReceiver implements Closeable { protected final String inAddr; protected final String myAddr; private String mirrorAddr; - private String bracketedMirrorAddr; + private String mirrorNameForMetrics; private DataOutputStream mirrorOut; private Daemon responder = null; private DataTransferThrottler throttler; @@ -843,10 +844,9 @@ class BlockReceiver implements Closeable { *

*/ private void trackSendPacketToLastNodeInPipeline(final long elapsedMs) { - if (isPenultimateNode && mirrorAddr != null) { - datanode.getPeerMetrics().addSendPacketDownstream( - bracketedMirrorAddr, - elapsedMs); + final DataNodePeerMetrics peerMetrics = datanode.getPeerMetrics(); + if (peerMetrics != null && isPenultimateNode) { + peerMetrics.addSendPacketDownstream(mirrorNameForMetrics, elapsedMs); } } @@ -927,8 +927,13 @@ class BlockReceiver implements Closeable { boolean responderClosed = false; mirrorOut = mirrOut; mirrorAddr = mirrAddr; - bracketedMirrorAddr = "[" + mirrAddr + "]"; isPenultimateNode = ((downstreams != null) && (downstreams.length == 1)); + if (isPenultimateNode) { + mirrorNameForMetrics = (downstreams[0].getInfoSecurePort() != 0 ? + downstreams[0].getInfoSecureAddr() : downstreams[0].getInfoAddr()); + LOG.debug("Will collect peer metrics for downstream node {}", + mirrorNameForMetrics); + } throttler = throttlerArg; this.replyOut = replyOut; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java index c1487b1a4a..e2c5fbce9f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java @@ -30,6 +30,8 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CACHEREPORT_INTERVAL_MSEC import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_LIFELINE_INTERVAL_SECONDS_KEY; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST; import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_NON_LOCAL_LAZY_PERSIST_DEFAULT; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT; +import static 
org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_DEFAULT; import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_WRITE_PACKET_SIZE_KEY; @@ -94,6 +96,8 @@ public class DNConf { private final long lifelineIntervalMs; final long blockReportInterval; final long blockReportSplitThreshold; + final boolean peerStatsEnabled; + final long slowPeersReportIntervalMs; final long ibrInterval; final long initialBlockReportDelayMs; final long cacheReportInterval; @@ -173,6 +177,13 @@ public class DNConf { this.blockReportInterval = getConf().getLong( DFS_BLOCKREPORT_INTERVAL_MSEC_KEY, DFS_BLOCKREPORT_INTERVAL_MSEC_DEFAULT); + this.peerStatsEnabled = getConf().getBoolean( + DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY, + DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT); + this.slowPeersReportIntervalMs = getConf().getTimeDuration( + DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_KEY, + DFS_DATANODE_SLOW_PEERS_REPORT_INTERVAL_DEFAULT, + TimeUnit.MILLISECONDS); this.ibrInterval = getConf().getLong( DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_KEY, DFSConfigKeys.DFS_BLOCKREPORT_INCREMENTAL_INTERVAL_MSEC_DEFAULT); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java index 090d8b982c..a6dfa46c22 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataNode.java @@ -334,6 +334,7 @@ public class DataNode extends ReconfigurableBase private int infoSecurePort; DataNodeMetrics metrics; + @Nullable private DataNodePeerMetrics 
peerMetrics; private InetSocketAddress streamingAddr; @@ -422,6 +423,7 @@ public class DataNode extends ReconfigurableBase this.blockScanner = new BlockScanner(this, this.getConf()); this.pipelineSupportECN = false; this.socketFactory = NetUtils.getDefaultSocketFactory(conf); + this.dnConf = new DNConf(this); initOOBTimeout(); storageLocationChecker = null; volumeChecker = new DatasetVolumeChecker(conf, new Timer()); @@ -1363,7 +1365,8 @@ public class DataNode extends ReconfigurableBase initIpcServer(); metrics = DataNodeMetrics.create(getConf(), getDisplayName()); - peerMetrics = DataNodePeerMetrics.create(getConf(), getDisplayName()); + peerMetrics = dnConf.peerStatsEnabled ? + DataNodePeerMetrics.create(getConf(), getDisplayName()) : null; metrics.getJvmMetrics().setPauseMonitor(pauseMonitor); ecWorker = new ErasureCodingWorker(getConf(), this); @@ -3456,6 +3459,7 @@ public class DataNode extends ReconfigurableBase @Override // DataNodeMXBean public String getSendPacketDownstreamAvgInfo() { - return peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson(); + return peerMetrics != null ? + peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson() : null; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java index abcaa4a857..f838fd913e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DataXceiver.java @@ -341,7 +341,9 @@ class DataXceiver extends Receiver implements Runnable { * the thread dies away. 
*/ private void collectThreadLocalStates() { - datanode.getPeerMetrics().collectThreadLocalStates(); + if (datanode.getPeerMetrics() != null) { + datanode.getPeerMetrics().collectThreadLocalStates(); + } } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java index 9344d1b5a5..5241c78c01 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodePeerMetrics.java @@ -18,40 +18,59 @@ package org.apache.hadoop.hdfs.server.datanode.metrics; -import java.util.concurrent.ThreadLocalRandom; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.metrics2.MetricsJsonBuilder; import org.apache.hadoop.metrics2.lib.RollingAverages; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; /** * This class maintains DataNode peer metrics (e.g. numOps, AvgTime, etc.) for * various peer operations. 
*/ @InterfaceAudience.Private +@InterfaceStability.Unstable public class DataNodePeerMetrics { - static final Log LOG = LogFactory.getLog(DataNodePeerMetrics.class); + public static final Logger LOG = LoggerFactory.getLogger( + DataNodePeerMetrics.class); private final RollingAverages sendPacketDownstreamRollingAvgerages; private final String name; - private final boolean peerStatsEnabled; + + /** + * Threshold in milliseconds below which a DataNode is definitely not slow. + */ + private static final long LOW_THRESHOLD_MS = 5; + + private final SlowNodeDetector slowNodeDetector; + + /** + * Minimum number of packet send samples which are required to qualify + * for outlier detection. If the number of samples is below this then + * outlier detection is skipped. + */ + @VisibleForTesting + static final long MIN_OUTLIER_DETECTION_SAMPLES = 1000; public DataNodePeerMetrics( final String name, - final int windowSize, - final int numWindows, - final boolean peerStatsEnabled) { + final long windowSizeMs, + final int numWindows) { this.name = name; - this.peerStatsEnabled = peerStatsEnabled; + this.slowNodeDetector = new SlowNodeDetector(LOW_THRESHOLD_MS); sendPacketDownstreamRollingAvgerages = new RollingAverages( - windowSize, - numWindows); + windowSizeMs, numWindows); } public String name() { @@ -66,21 +85,18 @@ public class DataNodePeerMetrics { ? 
"UndefinedDataNodeName" + ThreadLocalRandom.current().nextInt() : dnName.replace(':', '-')); - final int windowSize = conf.getInt( - DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY, - DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_DEFAULT); + final long windowSizeMs = conf.getTimeDuration( + DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY, + DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_DEFAULT, + TimeUnit.MILLISECONDS); final int numWindows = conf.getInt( - DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY, - DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_DEFAULT); - final boolean peerStatsEnabled = conf.getBoolean( - DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY, - DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_DEFAULT); + DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY, + DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_DEFAULT); return new DataNodePeerMetrics( name, - windowSize, - numWindows, - peerStatsEnabled); + windowSizeMs, + numWindows); } /** @@ -94,9 +110,7 @@ public class DataNodePeerMetrics { public void addSendPacketDownstream( final String peerAddr, final long elapsedMs) { - if (peerStatsEnabled) { - sendPacketDownstreamRollingAvgerages.add(peerAddr, elapsedMs); - } + sendPacketDownstreamRollingAvgerages.add(peerAddr, elapsedMs); } /** @@ -114,4 +128,19 @@ public class DataNodePeerMetrics { public void collectThreadLocalStates() { sendPacketDownstreamRollingAvgerages.collectThreadLocalStates(); } + + /** + * Retrieve the set of dataNodes that look significantly slower + * than their peers. + */ + public Map getOutliers() { + // This maps the metric name to the aggregate latency. + // The metric name is the datanode ID. 
+ final Map stats = + sendPacketDownstreamRollingAvgerages.getStats( + MIN_OUTLIER_DETECTION_SAMPLES); + LOG.trace("DataNodePeerMetrics: Got stats: {}", stats); + + return slowNodeDetector.getOutliers(stats); + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/SlowNodeDetector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/SlowNodeDetector.java new file mode 100644 index 0000000000..b6278cee65 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/SlowNodeDetector.java @@ -0,0 +1,194 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode.metrics; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + + +/** + * A utility class to help detect nodes whose aggregate latency + * is an outlier within a given set. + * + * We use the median absolute deviation for outlier detection as + * described in the following publication: + * + * Leys, C., et al., Detecting outliers: Do not use standard deviation + * around the mean, use absolute deviation around the median. + * http://dx.doi.org/10.1016/j.jesp.2013.03.013 + * + * We augment the above scheme with the following heuristics to be even + * more conservative: + * + * 1. Skip outlier detection if the sample size is too small. + * 2. Never flag nodes whose aggregate latency is below a low threshold. + * 3. Never flag nodes whose aggregate latency is less than a small + * multiple of the median. + */ +@InterfaceAudience.Private +@InterfaceStability.Unstable +public class SlowNodeDetector { + public static final Logger LOG = + LoggerFactory.getLogger(SlowNodeDetector.class); + + /** + * Minimum number of peers to run outlier detection. + */ + private static long minOutlierDetectionPeers = 10; + + /** + * The multiplier is from Leys, C. et al. 
+ */ + private static final double MAD_MULTIPLIER = (double) 1.4826; + + /** + * Threshold in milliseconds below which a DataNode is definitely not slow. + */ + private final long lowThresholdMs; + + /** + * Deviation multiplier. A sample is considered to be an outlier if it + * exceeds the median by (multiplier * median abs. deviation). 3 is a + * conservative choice. + */ + private static final int DEVIATION_MULTIPLIER = 3; + + /** + * If most of the samples are clustered together, the MAD can be + * low. The median multiplier introduces another safeguard to avoid + * overaggressive outlier detection. + */ + @VisibleForTesting + static final int MEDIAN_MULTIPLIER = 3; + + public SlowNodeDetector(long lowThresholdMs) { + this.lowThresholdMs = lowThresholdMs; + } + + /** + * Return a set of DataNodes whose latency is much higher than + * their peers. The input is a map of (node -> aggregate latency) + * entries. + * + * The aggregate may be an arithmetic mean or a percentile e.g. + * 90th percentile. Percentiles are a better choice than median + * since latency is usually not a normal distribution. + * + * This method allocates temporary memory O(n) and + * has run time O(n.log(n)), where n = stats.size(). + * + * @return + */ + public Map getOutliers(Map stats) { + if (stats.size() < minOutlierDetectionPeers) { + LOG.debug("Skipping statistical outlier detection as we don't have " + + "latency data for enough peers. Have {}, need at least {}", + stats.size(), minOutlierDetectionPeers); + return ImmutableMap.of(); + } + // Compute the median absolute deviation of the aggregates. 
+ final List sorted = new ArrayList<>(stats.values()); + Collections.sort(sorted); + final Double median = computeMedian(sorted); + final Double mad = computeMad(sorted); + Double upperLimitLatency = Math.max( + lowThresholdMs, median * MEDIAN_MULTIPLIER); + upperLimitLatency = Math.max( + upperLimitLatency, median + (DEVIATION_MULTIPLIER * mad)); + + final Map slowNodes = new HashMap<>(); + + LOG.trace("getOutliers: List={}, MedianLatency={}, " + + "MedianAbsoluteDeviation={}, upperLimitLatency={}", + sorted, median, mad, upperLimitLatency); + + // Find nodes whose latency exceeds the threshold. + for (Map.Entry entry : stats.entrySet()) { + if (entry.getValue() > upperLimitLatency) { + slowNodes.put(entry.getKey(), entry.getValue()); + } + } + + return slowNodes; + } + + /** + * Compute the Median Absolute Deviation of a sorted list. + */ + public static Double computeMad(List sortedValues) { + if (sortedValues.size() == 0) { + throw new IllegalArgumentException( + "Cannot compute the Median Absolute Deviation " + + "of an empty list."); + } + + // First get the median of the values. + Double median = computeMedian(sortedValues); + List deviations = new ArrayList<>(sortedValues); + + // Then update the list to store deviation from the median. + for (int i = 0; i < sortedValues.size(); ++i) { + deviations.set(i, Math.abs(sortedValues.get(i) - median)); + } + + // Finally get the median absolute deviation. + Collections.sort(deviations); + return computeMedian(deviations) * MAD_MULTIPLIER; + } + + /** + * Compute the median of a sorted list. 
+ */ + public static Double computeMedian(List sortedValues) { + if (sortedValues.size() == 0) { + throw new IllegalArgumentException( + "Cannot compute the median of an empty list."); + } + + Double median = sortedValues.get(sortedValues.size() / 2); + if (sortedValues.size() % 2 == 0) { + median += sortedValues.get((sortedValues.size() / 2) - 1); + median /= 2; + } + return median; + } + + /** + * This method *must not* be used outside of unit tests. + */ + @VisibleForTesting + static void setMinOutlierDetectionPeers(long minOutlierDetectionPeers) { + SlowNodeDetector.minOutlierDetectionPeers = minOutlierDetectionPeers; + } + + @VisibleForTesting + static long getMinOutlierDetectionPeers() { + return minOutlierDetectionPeers; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index 6ec0ee9761..38a326c7dd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -129,6 +129,7 @@ import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantReadWriteLock; +import javax.annotation.Nonnull; import javax.management.NotCompliantMBeanException; import javax.management.ObjectName; import javax.management.StandardMBean; @@ -255,6 +256,7 @@ import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import 
org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; @@ -3639,7 +3641,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, StorageReport[] reports, long cacheCapacity, long cacheUsed, int xceiverCount, int xmitsInProgress, int failedVolumes, VolumeFailureSummary volumeFailureSummary, - boolean requestFullBlockReportLease) throws IOException { + boolean requestFullBlockReportLease, + @Nonnull SlowPeerReports slowPeers) throws IOException { readLock(); try { //get datanode commands @@ -3647,7 +3650,8 @@ public class FSNamesystem implements Namesystem, FSNamesystemMBean, - xmitsInProgress; DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat( nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed, - xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary); + xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary, + slowPeers); long blockReportLeaseId = 0; if (requestFullBlockReportLease) { blockReportLeaseId = blockManager.requestBlockReportLeaseId(nodeReg); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java index f6c724b7cb..df5ee0f819 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNode.java @@ -1822,6 +1822,12 @@ public class NameNode extends ReconfigurableBase implements return getNamesystem().getBytesInFuture(); } + @Override + public String getSlowPeersReport() { + return namesystem.getBlockManager().getDatanodeManager() + .getSlowPeersReport(); + } + /** * Shutdown the NN immediately in an ungraceful way. Used when it would be * unsafe for the NN to continue operating, e.g. 
during a failed HA state diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 4a1e8dd471..f9cfa42f3a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -155,6 +155,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NodeRegistration; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.server.protocol.StorageReport; @@ -205,6 +206,8 @@ import org.slf4j.Logger; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.BlockingService; +import javax.annotation.Nonnull; + /** * This class is responsible for handling all of the RPC calls to the NameNode. * It is created, started, and stopped by {@link NameNode}. 
@@ -1418,12 +1421,14 @@ public class NameNodeRpcServer implements NamenodeProtocols { StorageReport[] report, long dnCacheCapacity, long dnCacheUsed, int xmitsInProgress, int xceiverCount, int failedVolumes, VolumeFailureSummary volumeFailureSummary, - boolean requestFullBlockReportLease) throws IOException { + boolean requestFullBlockReportLease, + @Nonnull SlowPeerReports slowPeers) throws IOException { checkNNStartup(); verifyRequest(nodeReg); return namesystem.handleHeartbeat(nodeReg, report, dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress, - failedVolumes, volumeFailureSummary, requestFullBlockReportLease); + failedVolumes, volumeFailureSummary, requestFullBlockReportLease, + slowPeers); } @Override // DatanodeProtocol diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeStatusMXBean.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeStatusMXBean.java index 7b373723ed..f46b9ae927 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeStatusMXBean.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeStatusMXBean.java @@ -69,4 +69,10 @@ public interface NameNodeStatusMXBean { * @return number of bytes that can be deleted if exited from safe mode. */ long getBytesWithFutureGenerationStamps(); + + /** + * Retrieves information about slow DataNodes, if the feature is + * enabled. The report is in a JSON format. 
+ */ + String getSlowPeersReport(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java index 8c4359f756..d738e79e49 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java @@ -29,6 +29,8 @@ import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.io.retry.Idempotent; import org.apache.hadoop.security.KerberosInfo; +import javax.annotation.Nonnull; + /********************************************************************** * Protocol that a DFS datanode uses to communicate with the NameNode. * It's used to upload current load information and block reports. @@ -105,6 +107,9 @@ public interface DatanodeProtocol { * @param volumeFailureSummary info about volume failures * @param requestFullBlockReportLease whether to request a full block * report lease. + * @param slowPeers Details of peer DataNodes that were detected as being + * slow to respond to packet writes. Empty report if no + * slow peers were detected by the DataNode. 
* @throws IOException on error */ @Idempotent @@ -116,7 +121,8 @@ public interface DatanodeProtocol { int xceiverCount, int failedVolumes, VolumeFailureSummary volumeFailureSummary, - boolean requestFullBlockReportLease) + boolean requestFullBlockReportLease, + @Nonnull SlowPeerReports slowPeers) throws IOException; /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto index 7423b330cf..3b25a43581 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto @@ -195,6 +195,7 @@ message VolumeFailureSummaryProto { * cacheCapacity - total cache capacity available at the datanode * cacheUsed - amount of cache used * volumeFailureSummary - info about volume failures + * slowPeers - info about peer DataNodes that are suspected to be slow. */ message HeartbeatRequestProto { required DatanodeRegistrationProto registration = 1; // Datanode info @@ -206,6 +207,7 @@ message HeartbeatRequestProto { optional uint64 cacheUsed = 7 [default = 0 ]; optional VolumeFailureSummaryProto volumeFailureSummary = 8; optional bool requestFullBlockReportLease = 9 [ default = false ]; + repeated SlowPeerReportProto slowPeers = 10; } /** @@ -385,6 +387,24 @@ message CommitBlockSynchronizationRequestProto { message CommitBlockSynchronizationResponseProto { } +/** + * Information about a single slow peer that may be reported by + * the DataNode to the NameNode as part of the heartbeat request. + * The message includes the peer's DataNodeId and its + * aggregate packet latency as observed by the reporting DataNode. + * (DataNodeId must be transmitted as a string for protocol compatibility + * with earlier versions of Hadoop). + * + * The exact choice of the aggregate is opaque to the NameNode but it + * _should_ be chosen consistently by all DataNodes in the cluster.
+ * Examples of aggregates are 90th percentile (good) and mean (not so + * good). + */ +message SlowPeerReportProto { + optional string dataNodeId = 1; + optional double aggregateLatency = 2; +} + /** * Protocol used from datanode to the namenode * See the request and response for details of rpc call. diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 3389d84ca4..966cb2fec0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -1980,19 +1980,15 @@ - dfs.metrics.rolling.average.window.size - 3600 + dfs.datanode.slow.peers.report.interval + 30m - The number of seconds of each window for which sub set of samples are gathered - to compute the rolling average, A.K.A. roll over interval. - - + This setting controls how frequently DataNodes will report their peer + latencies to the NameNode via heartbeats. This setting supports + multiple time unit suffixes as described in dfs.heartbeat.interval. + If no suffix is specified then milliseconds is assumed. - - dfs.metrics.rolling.average.window.numbers - 48 - - The number of windows maintained to compute the rolling average. + It is ignored if dfs.datanode.peer.stats.enabled is false. 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java index 7f5cf2dddd..ff0852809a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocolPB/TestPBHelper.java @@ -28,6 +28,7 @@ import java.util.Arrays; import java.util.Iterator; import java.util.List; +import com.google.common.collect.ImmutableMap; import org.apache.hadoop.fs.permission.AclEntry; import org.apache.hadoop.fs.permission.AclEntryScope; import org.apache.hadoop.fs.permission.AclEntryType; @@ -92,6 +93,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog; import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.io.Text; import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy; import org.apache.hadoop.security.proto.SecurityProtos.TokenProto; @@ -770,6 +772,26 @@ public class TestPBHelper { assertEquals(dnInfos0.getNonDfsUsed(), dnInfos3.getNonDfsUsed()); } + @Test + public void testSlowPeerInfoPBHelper() { + // Test with a map that has a few slow peer entries. + final SlowPeerReports slowPeers = SlowPeerReports.create( + ImmutableMap.of("peer1", 0.0, "peer2", 1.0, "peer3", 2.0)); + SlowPeerReports slowPeersConverted1 = PBHelper.convertSlowPeerInfo( + PBHelper.convertSlowPeerInfo(slowPeers)); + assertTrue( + "Expected map:" + slowPeers + ", got map:" + + slowPeersConverted1.getSlowPeers(), + slowPeersConverted1.equals(slowPeers)); + + // Test with an empty map. 
+ SlowPeerReports slowPeersConverted2 = PBHelper.convertSlowPeerInfo( + PBHelper.convertSlowPeerInfo(SlowPeerReports.EMPTY_REPORT)); + assertTrue( + "Expected empty map:" + ", got map:" + slowPeersConverted2, + slowPeersConverted2.equals(SlowPeerReports.EMPTY_REPORT)); + } + private void assertBlockECRecoveryInfoEquals( BlockECReconstructionInfo blkECRecoveryInfo1, BlockECReconstructionInfo blkECRecoveryInfo2) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java index ab607eaa8d..f12f6f59f8 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestHeartbeatHandling.java @@ -42,13 +42,23 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.Timeout; import org.mockito.Mockito; /** * Test if FSNamesystem handles heartbeat right */ public class TestHeartbeatHandling { + + + /** + * Set a timeout for every test case. 
+ */ + @Rule + public Timeout testTimeout = new Timeout(300_000); + /** * Test if * {@link FSNamesystem#handleHeartbeat} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java index 070a76859a..a5c6e0d1ed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java @@ -21,6 +21,7 @@ package org.apache.hadoop.hdfs.server.blockmanagement; import com.google.common.base.Supplier; import java.util.ArrayList; import java.util.Collection; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -40,6 +41,7 @@ import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi.FsVolumeRef import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.test.GenericTestUtils; import org.apache.log4j.Level; @@ -112,7 +114,7 @@ public class TestNameNodePrunesMissingStorages { // Stop the DataNode and send fake heartbeat with missing storage. cluster.stopDataNode(0); cluster.getNameNodeRpc().sendHeartbeat(dnReg, prunedReports, 0L, 0L, 0, 0, - 0, null, true); + 0, null, true, SlowPeerReports.EMPTY_REPORT); // Check that the missing storage was pruned. 
assertThat(dnDescriptor.getStorageInfos().length, is(expectedStoragesAfterTest)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowPeerTracker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowPeerTracker.java new file mode 100644 index 0000000000..15eb3a511e --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestSlowPeerTracker.java @@ -0,0 +1,226 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.blockmanagement; + +import com.fasterxml.jackson.core.type.TypeReference; +import com.fasterxml.jackson.databind.ObjectMapper; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.HdfsConfiguration; +import org.apache.hadoop.hdfs.server.blockmanagement.SlowPeerTracker.ReportForJson; +import org.apache.hadoop.util.FakeTimer; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.util.Set; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + + +/** + * Tests for {@link SlowPeerTracker}. + */ +public class TestSlowPeerTracker { + public static final Logger LOG = LoggerFactory.getLogger( + TestSlowPeerTracker.class); + + /** + * Set a timeout for every test case. + */ + @Rule + public Timeout testTimeout = new Timeout(300_000); + + private Configuration conf; + private SlowPeerTracker tracker; + private FakeTimer timer; + private long reportValidityMs; + + @Before + public void setup() { + conf = new HdfsConfiguration(); + timer = new FakeTimer(); + tracker = new SlowPeerTracker(conf, timer); + reportValidityMs = tracker.getReportValidityMs(); + } + + /** + * Edge case, there are no reports to retrieve. 
+ */ + @Test + public void testEmptyReports() { + assertTrue(tracker.getReportsForAllDataNodes().isEmpty()); + assertTrue(tracker.getReportsForNode("noSuchNode").isEmpty()); + } + + @Test + public void testReportsAreRetrieved() { + tracker.addReport("node2", "node1"); + tracker.addReport("node3", "node1"); + tracker.addReport("node3", "node2"); + + assertThat(tracker.getReportsForAllDataNodes().size(), is(2)); + assertThat(tracker.getReportsForNode("node2").size(), is(1)); + assertThat(tracker.getReportsForNode("node3").size(), is(2)); + assertThat(tracker.getReportsForNode("node1").size(), is(0)); + } + + /** + * Test that when all reports are expired, we get back nothing. + */ + @Test + public void testAllReportsAreExpired() { + tracker.addReport("node2", "node1"); + tracker.addReport("node3", "node2"); + tracker.addReport("node1", "node3"); + + // No reports should expire after 1ms. + timer.advance(1); + assertThat(tracker.getReportsForAllDataNodes().size(), is(3)); + + // All reports should expire after REPORT_VALIDITY_MS. + timer.advance(reportValidityMs); + assertTrue(tracker.getReportsForAllDataNodes().isEmpty()); + assertTrue(tracker.getReportsForNode("node1").isEmpty()); + assertTrue(tracker.getReportsForNode("node2").isEmpty()); + assertTrue(tracker.getReportsForNode("node3").isEmpty()); + } + + /** + * Test the case when a subset of reports has expired. + * Ensure that we only get back non-expired reports. + */ + @Test + public void testSomeReportsAreExpired() { + tracker.addReport("node3", "node1"); + tracker.addReport("node3", "node2"); + timer.advance(reportValidityMs); + tracker.addReport("node3", "node4"); + assertThat(tracker.getReportsForAllDataNodes().size(), is(1)); + assertThat(tracker.getReportsForNode("node3").size(), is(1)); + assertTrue(tracker.getReportsForNode("node3").contains("node4")); + } + + /** + * Test the case when an expired report is replaced by a valid one. 
+ */ + @Test + public void testReplacement() { + tracker.addReport("node2", "node1"); + timer.advance(reportValidityMs); // Expire the report. + assertThat(tracker.getReportsForAllDataNodes().size(), is(0)); + + // This should replace the expired report with a newer valid one. + tracker.addReport("node2", "node1"); + assertThat(tracker.getReportsForAllDataNodes().size(), is(1)); + assertThat(tracker.getReportsForNode("node2").size(), is(1)); + } + + @Test + public void testGetJson() throws IOException { + tracker.addReport("node1", "node2"); + tracker.addReport("node2", "node3"); + tracker.addReport("node2", "node1"); + tracker.addReport("node4", "node1"); + + final Set reports = getAndDeserializeJson(); + + // And ensure its contents are what we expect. + assertThat(reports.size(), is(3)); + assertTrue(isNodeInReports(reports, "node1")); + assertTrue(isNodeInReports(reports, "node2")); + assertTrue(isNodeInReports(reports, "node4")); + + assertFalse(isNodeInReports(reports, "node3")); + } + + @Test + public void testGetJsonSizeIsLimited() throws IOException { + tracker.addReport("node1", "node2"); + tracker.addReport("node1", "node3"); + tracker.addReport("node2", "node3"); + tracker.addReport("node2", "node4"); + tracker.addReport("node3", "node4"); + tracker.addReport("node3", "node5"); + tracker.addReport("node4", "node6"); + tracker.addReport("node5", "node6"); + tracker.addReport("node5", "node7"); + tracker.addReport("node6", "node7"); + tracker.addReport("node6", "node8"); + + final Set reports = getAndDeserializeJson(); + + // Ensure that node4 is not in the list since it was + // tagged by just one peer and we already have 5 other nodes. + assertFalse(isNodeInReports(reports, "node4")); + + // Remaining nodes should be in the list. 
+ assertTrue(isNodeInReports(reports, "node1")); + assertTrue(isNodeInReports(reports, "node2")); + assertTrue(isNodeInReports(reports, "node3")); + assertTrue(isNodeInReports(reports, "node5")); + assertTrue(isNodeInReports(reports, "node6")); + } + + @Test + public void testLowRankedElementsIgnored() throws IOException { + // Insert 5 nodes with 2 peer reports each. + for (int i = 0; i < 5; ++i) { + tracker.addReport("node" + i, "reporter1"); + tracker.addReport("node" + i, "reporter2"); + } + + // Insert 10 nodes with 1 peer report each. + for (int i = 10; i < 20; ++i) { + tracker.addReport("node" + i, "reporter1"); + } + + final Set reports = getAndDeserializeJson(); + + // Ensure that only the first 5 nodes with two reports each were + // included in the JSON. + for (int i = 0; i < 5; ++i) { + assertTrue(isNodeInReports(reports, "node" + i)); + } + } + + private boolean isNodeInReports( + Set reports, String node) { + for (ReportForJson report : reports) { + if (report.getSlowNode().equalsIgnoreCase(node)) { + return true; + } + } + return false; + } + + private Set getAndDeserializeJson() + throws IOException { + final String json = tracker.getJson(); + LOG.info("Got JSON: {}", json); + return (new ObjectMapper()).readValue( + json, new TypeReference>() {}); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java index de856e609f..cf43fd0fdd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/InternalDataNodeTestUtils.java @@ -38,6 +38,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import 
org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; import org.junit.Assert; @@ -136,7 +137,8 @@ public class InternalDataNodeTestUtils { Mockito.any(StorageReport[].class), Mockito.anyLong(), Mockito.anyLong(), Mockito.anyInt(), Mockito.anyInt(), Mockito.anyInt(), Mockito.any(VolumeFailureSummary.class), - Mockito.anyBoolean())).thenReturn( + Mockito.anyBoolean(), + Mockito.any(SlowPeerReports.class))).thenReturn( new HeartbeatResponse(new DatanodeCommand[0], new NNHAStatusHeartbeat( HAServiceState.ACTIVE, 1), null, ThreadLocalRandom.current() .nextLong() | 1L)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java index b7b89667ef..c6b38eea7d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java @@ -56,6 +56,7 @@ import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.RegisterCommand; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.server.protocol.StorageReport; @@ -119,7 +120,7 @@ public class TestBPOfferService { Mockito.doReturn(conf).when(mockDn).getConf(); 
Mockito.doReturn(new DNConf(mockDn)).when(mockDn).getDnConf(); Mockito.doReturn(DataNodeMetrics.create(conf, "fake dn")) - .when(mockDn).getMetrics(); + .when(mockDn).getMetrics(); // Set up a simulated dataset with our fake BP mockFSDataset = Mockito.spy(new SimulatedFSDataset(null, conf)); @@ -152,7 +153,8 @@ public class TestBPOfferService { Mockito.anyInt(), Mockito.anyInt(), Mockito.any(VolumeFailureSummary.class), - Mockito.anyBoolean()); + Mockito.anyBoolean(), + Mockito.any(SlowPeerReports.class)); mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0); datanodeCommands[nnIdx] = new DatanodeCommand[0]; return mock; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java index 619eda0040..b64f1e29f3 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java @@ -65,6 +65,7 @@ import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.util.AutoCloseableLock; import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; @@ -217,7 +218,8 @@ public class TestBlockRecovery { Mockito.anyInt(), Mockito.anyInt(), Mockito.any(VolumeFailureSummary.class), - Mockito.anyBoolean())) + Mockito.anyBoolean(), + Mockito.any(SlowPeerReports.class))) .thenReturn(new HeartbeatResponse( new DatanodeCommand[0], new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1), diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java index 76885e417b..6435d4d1a5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java @@ -20,10 +20,15 @@ package org.apache.hadoop.hdfs.server.datanode; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.hdfs.server.datanode.BPServiceActor.Scheduler; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; +import java.util.Arrays; +import java.util.List; +import java.util.Random; + import static java.lang.Math.abs; import static org.hamcrest.core.Is.is; import static org.junit.Assert.assertFalse; @@ -31,11 +36,6 @@ import static org.junit.Assert.assertThat; import static org.junit.Assert.assertTrue; import static org.mockito.Mockito.doReturn; import static org.mockito.Mockito.spy; -import org.apache.hadoop.hdfs.server.datanode.BPServiceActor.Scheduler; - -import java.util.Arrays; -import java.util.List; -import java.util.Random; /** @@ -51,6 +51,7 @@ public class TestBpServiceActorScheduler { private static final long HEARTBEAT_INTERVAL_MS = 5000; // 5 seconds private static final long LIFELINE_INTERVAL_MS = 3 * HEARTBEAT_INTERVAL_MS; private static final long BLOCK_REPORT_INTERVAL_MS = 10000; // 10 seconds + private static final long SLOW_PEER_REPORT_INTERVAL_MS = 10000; // 10 seconds private final Random random = new Random(System.nanoTime()); @Test @@ -180,13 +181,28 @@ public class TestBpServiceActorScheduler { } } + @Test + public void testSlowPeerReportScheduling() { + for (final long now : getTimestamps()) { + Scheduler scheduler = 
makeMockScheduler(now); + assertTrue(scheduler.isSlowPeersReportDue(now)); + scheduler.scheduleNextSlowPeerReport(); + assertFalse(scheduler.isSlowPeersReportDue(now)); + assertFalse(scheduler.isSlowPeersReportDue(now + 1)); + assertTrue(scheduler.isSlowPeersReportDue( + now + SLOW_PEER_REPORT_INTERVAL_MS)); + } + } + private Scheduler makeMockScheduler(long now) { LOG.info("Using now = " + now); - Scheduler mockScheduler = spy(new Scheduler(HEARTBEAT_INTERVAL_MS, - LIFELINE_INTERVAL_MS, BLOCK_REPORT_INTERVAL_MS)); + Scheduler mockScheduler = spy(new Scheduler( + HEARTBEAT_INTERVAL_MS, LIFELINE_INTERVAL_MS, + BLOCK_REPORT_INTERVAL_MS, SLOW_PEER_REPORT_INTERVAL_MS)); doReturn(now).when(mockScheduler).monotonicNow(); mockScheduler.nextBlockReportTime = now; mockScheduler.nextHeartbeatTime = now; + mockScheduler.nextSlowPeersReportTime = now; return mockScheduler; } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java index df2fe5aa75..8a9f0b8da1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeLifeline.java @@ -50,6 +50,7 @@ import org.apache.hadoop.hdfs.server.datanode.metrics.DataNodeMetrics; import org.apache.hadoop.hdfs.server.namenode.FSNamesystem; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; import org.apache.hadoop.test.GenericTestUtils; @@ -167,7 +168,8 @@ public class TestDataNodeLifeline { anyInt(), anyInt(), 
any(VolumeFailureSummary.class), - anyBoolean()); + anyBoolean(), + any(SlowPeerReports.class)); // Intercept lifeline to trigger latch count-down on each call. doAnswer(new LatchCountingAnswer(lifelinesSent)) @@ -230,7 +232,8 @@ public class TestDataNodeLifeline { anyInt(), anyInt(), any(VolumeFailureSummary.class), - anyBoolean()); + anyBoolean(), + any(SlowPeerReports.class)); // While waiting on the latch for the expected number of heartbeat messages, // poll DataNode tracking information. We expect that the DataNode always diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java index 5af54a4cbb..b18ff2a2a9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodePeerMetrics.java @@ -18,6 +18,7 @@ package org.apache.hadoop.hdfs.server.datanode; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import org.apache.hadoop.hdfs.DFSConfigKeys; import org.apache.hadoop.hdfs.HdfsConfiguration; @@ -41,9 +42,10 @@ public class TestDataNodePeerMetrics { final int numOpsPerIteration = 1000; final Configuration conf = new HdfsConfiguration(); - conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_SIZE_KEY, - windowSize); - conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_NUMBERS_KEY, + conf.setTimeDuration( + DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY, + windowSize, TimeUnit.SECONDS); + conf.setInt(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY, numWindows); conf.setBoolean(DFSConfigKeys.DFS_DATANODE_PEER_STATS_ENABLED_KEY, true); diff --git 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java index d447a76f0d..c94f74ecc5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java @@ -49,6 +49,7 @@ import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; import org.apache.hadoop.hdfs.server.protocol.RegisterCommand; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; @@ -218,7 +219,8 @@ public class TestDatanodeProtocolRetryPolicy { Mockito.anyInt(), Mockito.anyInt(), Mockito.any(VolumeFailureSummary.class), - Mockito.anyBoolean()); + Mockito.anyBoolean(), + Mockito.any(SlowPeerReports.class)); dn = new DataNode(conf, locations, null, null) { @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java index 6557055f78..eb015c0357 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java @@ -67,6 +67,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeProtocol; import 
org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.hdfs.server.protocol.NNHAStatusHeartbeat; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; import org.apache.hadoop.io.IOUtils; @@ -172,7 +173,7 @@ public class TestFsDatasetCache { (DatanodeRegistration) any(), (StorageReport[]) any(), anyLong(), anyLong(), anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(), - anyBoolean()); + anyBoolean(), any(SlowPeerReports.class)); } private static DatanodeCommand[] cacheBlock(HdfsBlockLocation loc) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java index d8418d46b6..2b793e9caa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java @@ -31,6 +31,7 @@ import org.apache.hadoop.hdfs.protocolPB.DatanodeProtocolClientSideTranslatorPB; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.hdfs.server.protocol.StorageReport; import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary; import org.junit.After; @@ -106,7 +107,8 @@ public class TestStorageReport { any(DatanodeRegistration.class), captor.capture(), anyLong(), anyLong(), anyInt(), anyInt(), anyInt(), - Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean()); + 
Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(), + Mockito.any(SlowPeerReports.class)); StorageReport[] reports = captor.getValue(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java new file mode 100644 index 0000000000..34e15e546b --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestDataNodeOutlierDetectionViaMetrics.java @@ -0,0 +1,142 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode.metrics; + +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.log4j.Level; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.Map; +import java.util.Random; + +import static org.hamcrest.core.Is.is; +import static org.junit.Assert.assertThat; +import static org.junit.Assert.assertTrue; + + +/** + * Test that the {@link DataNodePeerMetrics} class is able to detect + * outliers i.e. slow nodes via the metrics it maintains. + */ +public class TestDataNodeOutlierDetectionViaMetrics { + public static final Logger LOG = + LoggerFactory.getLogger(TestDataNodeOutlierDetectionViaMetrics.class); + + /** + * Set a timeout for every test case. + */ + @Rule + public Timeout testTimeout = new Timeout(300_000); + + // A few constants to keep the test run time short. + private static final int WINDOW_INTERVAL_SECONDS = 3; + private static final int ROLLING_AVERAGE_WINDOWS = 10; + private static final int SLOW_NODE_LATENCY_MS = 20_000; + private static final int FAST_NODE_MAX_LATENCY_MS = 5; + + private Random random = new Random(System.currentTimeMillis()); + + @Before + public void setup() { + GenericTestUtils.setLogLevel(DataNodePeerMetrics.LOG, Level.ALL); + GenericTestUtils.setLogLevel(SlowNodeDetector.LOG, Level.ALL); + } + + /** + * Test that a very slow peer is detected as an outlier. 
+ */ + @Test + public void testOutlierIsDetected() throws Exception { + final String slowNodeName = "SlowNode"; + + DataNodePeerMetrics peerMetrics = new DataNodePeerMetrics( + "PeerMetrics-For-Test", WINDOW_INTERVAL_SECONDS, + ROLLING_AVERAGE_WINDOWS); + + injectFastNodesSamples(peerMetrics); + injectSlowNodeSamples(peerMetrics, slowNodeName); + + // Trigger a snapshot. + peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson(); + + final Map outliers = peerMetrics.getOutliers(); + LOG.info("Got back outlier nodes: {}", outliers); + assertThat(outliers.size(), is(1)); + assertTrue(outliers.containsKey(slowNodeName)); + } + + /** + * Test that when there are no outliers, we get back nothing. + */ + @Test + public void testWithNoOutliers() throws Exception { + DataNodePeerMetrics peerMetrics = new DataNodePeerMetrics( + "PeerMetrics-For-Test", WINDOW_INTERVAL_SECONDS, + ROLLING_AVERAGE_WINDOWS); + + injectFastNodesSamples(peerMetrics); + + // Trigger a snapshot. + peerMetrics.dumpSendPacketDownstreamAvgInfoAsJson(); + + // Ensure that we get back no outliers. + assertTrue(peerMetrics.getOutliers().isEmpty()); + } + + /** + * Inject fake stats for MIN_OUTLIER_DETECTION_PEERS fast nodes. + * + * @param peerMetrics + */ + public void injectFastNodesSamples(DataNodePeerMetrics peerMetrics) { + for (int nodeIndex = 0; + nodeIndex < SlowNodeDetector.getMinOutlierDetectionPeers(); + ++nodeIndex) { + final String nodeName = "FastNode-" + nodeIndex; + LOG.info("Generating stats for node {}", nodeName); + for (int i = 0; + i < 2 * DataNodePeerMetrics.MIN_OUTLIER_DETECTION_SAMPLES; + ++i) { + peerMetrics.addSendPacketDownstream( + nodeName, random.nextInt(FAST_NODE_MAX_LATENCY_MS)); + } + } + } + + /** + * Inject fake stats for one extremely slow node. + */ + public void injectSlowNodeSamples( + DataNodePeerMetrics peerMetrics, String slowNodeName) + throws InterruptedException { + + // And the one slow node. 
+ for (int i = 0; + i < 2 * DataNodePeerMetrics.MIN_OUTLIER_DETECTION_SAMPLES; + ++i) { + peerMetrics.addSendPacketDownstream( + slowNodeName, SLOW_NODE_LATENCY_MS); + } + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestSlowNodeDetector.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestSlowNodeDetector.java new file mode 100644 index 0000000000..7b368c4701 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/metrics/TestSlowNodeDetector.java @@ -0,0 +1,335 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.hdfs.server.datanode.metrics; + +import com.google.common.collect.ImmutableList; +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.log4j.Level; +import org.junit.Before; +import org.junit.Rule; +import org.junit.Test; +import org.junit.rules.Timeout; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import static org.junit.Assert.assertTrue; + +/** + * Unit tests for {@link SlowNodeDetector}. + */ +public class TestSlowNodeDetector { + public static final Logger LOG = + LoggerFactory.getLogger(TestSlowNodeDetector.class); + + /** + * Set a timeout for every test case. + */ + @Rule + public Timeout testTimeout = new Timeout(300_000); + + private final static double LOW_THRESHOLD = 1000; + private final static long MIN_OUTLIER_DETECTION_PEERS = 3; + + // Randomly generated test cases for median and MAD. The first entry + // in each pair is the expected median and the second entry is the + // expected Median Absolute Deviation. The small sets of size 1 and 2 + // exist to test the edge cases however in practice the MAD of a very + // small set is not useful. + private Map, Pair> medianTestMatrix = + new ImmutableMap.Builder, Pair>() + // Single element. + .put(new ImmutableList.Builder() + .add(9.6502431302).build(), + Pair.of(9.6502431302, 0.0)) + + // Two elements. 
+ .put(new ImmutableList.Builder() + .add(1.72168104625) + .add(11.7872544459).build(), + Pair.of(6.75446774606, 7.4616095611)) + + // The Remaining lists were randomly generated with sizes 3-10. + .put(new ImmutableList.Builder() + .add(76.2635686249) + .add(27.0652018553) + .add(1.3868476443) + .add(49.7194624164) + .add(47.385680883) + .add(57.8721199173).build(), + Pair.of(48.5525716497, 22.837202532)) + + .put(new ImmutableList.Builder() + .add(86.0573389581) + .add(93.2399572424) + .add(64.9545429122) + .add(35.8509730085) + .add(1.6534313654).build(), + Pair.of(64.9545429122, 41.9360180373)) + + .put(new ImmutableList.Builder() + .add(5.00127007366) + .add(37.9790589127) + .add(67.5784746266).build(), + Pair.of(37.9790589127, 43.8841594039)) + + .put(new ImmutableList.Builder() + .add(1.43442932944) + .add(70.6769829947) + .add(37.47579656) + .add(51.1126141394) + .add(72.2465914419) + .add(32.2930549225) + .add(39.677459781).build(), + Pair.of(39.677459781, 16.9537852208)) + + .put(new ImmutableList.Builder() + .add(26.7913745214) + .add(68.9833706658) + .add(29.3882180746) + .add(68.3455244453) + .add(74.9277265022) + .add(12.1469972942) + .add(72.5395402683) + .add(7.87917492506) + .add(33.3253447774) + .add(72.2753759125).build(), + Pair.of(50.8354346113, 31.9881230079)) + + .put(new ImmutableList.Builder() + .add(38.6482290705) + .add(88.0690746319) + .add(50.6673611649) + .add(64.5329814115) + .add(25.2580979294) + .add(59.6709630711) + .add(71.5406993741) + .add(81.3073035091) + .add(20.5549547284).build(), + Pair.of(59.6709630711, 31.1683520683)) + + .put(new ImmutableList.Builder() + .add(87.352734249) + .add(65.4760359094) + .add(28.9206803169) + .add(36.5908574008) + .add(87.7407653175) + .add(99.3704511335) + .add(41.3227434076) + .add(46.2713494909) + .add(3.49940920921).build(), + Pair.of(46.2713494909, 28.4729106898)) + + .put(new ImmutableList.Builder() + .add(95.3251533286) + .add(27.2777870437) + .add(43.73477168).build(), + 
Pair.of(43.73477168, 24.3991619317)) + + .build(); + + // A test matrix that maps inputs to the expected output list of + // slow nodes i.e. outliers. + private Map, Set> outlierTestMatrix = + new ImmutableMap.Builder, Set>() + // Too few samples (2 < MIN_OUTLIER_DETECTION_PEERS), so nothing + // should be returned even though n2 is above the low threshold. + .put(ImmutableMap.of( + "n1", 0.0, + "n2", LOW_THRESHOLD + 1), + ImmutableSet.of()) + + // A statistical outlier below the low threshold must not be + // returned. + .put(ImmutableMap.of( + "n1", 1.0, + "n2", 1.0, + "n3", LOW_THRESHOLD - 1), + ImmutableSet.of()) + + // A statistical outlier above the low threshold must be returned. + .put(ImmutableMap.of( + "n1", 1.0, + "n2", 1.0, + "n3", LOW_THRESHOLD + 1), + ImmutableSet.of("n3")) + + // A statistical outlier must not be returned if it is within a + // MEDIAN_MULTIPLIER multiple of the median. + .put(ImmutableMap.of( + "n1", LOW_THRESHOLD + 0.1, + "n2", LOW_THRESHOLD + 0.1, + "n3", LOW_THRESHOLD * SlowNodeDetector.MEDIAN_MULTIPLIER - 0.1), + ImmutableSet.of()) + + // A statistical outlier must be returned if it is outside a + // MEDIAN_MULTIPLIER multiple of the median. + .put(ImmutableMap.of( + "n1", LOW_THRESHOLD + 0.1, + "n2", LOW_THRESHOLD + 0.1, + "n3", (LOW_THRESHOLD + 0.1) * + SlowNodeDetector.MEDIAN_MULTIPLIER + 0.1), + ImmutableSet.of("n3")) + + // Only the statistical outliers n3 and n11 should be returned. + .put(new ImmutableMap.Builder() + .put("n1", 1029.4322) + .put("n2", 2647.876) + .put("n3", 9194.312) + .put("n4", 2.2) + .put("n5", 2012.92) + .put("n6", 1843.81) + .put("n7", 1201.43) + .put("n8", 6712.01) + .put("n9", 3278.554) + .put("n10", 2091.765) + .put("n11", 9194.77).build(), + ImmutableSet.of("n3", "n11")) + + // The following input set has multiple outliers. + // - The low outliers (n4, n6) should not be returned. + // - High outlier n2 is within 3 multiples of the median + // and so it should not be returned. 
+ // - Only the high outlier n8 should be returned. + .put(new ImmutableMap.Builder() + .put("n1", 5002.0) + .put("n2", 9001.0) + .put("n3", 5004.0) + .put("n4", 1001.0) + .put("n5", 5003.0) + .put("n6", 2001.0) + .put("n7", 5000.0) + .put("n8", 101002.0) + .put("n9", 5001.0) + .put("n10", 5002.0) + .put("n11", 5105.0) + .put("n12", 5006.0).build(), + ImmutableSet.of("n8")) + + .build(); + + + private SlowNodeDetector slowNodeDetector; + + @Before + public void setup() { + slowNodeDetector = new SlowNodeDetector((long) LOW_THRESHOLD); + SlowNodeDetector.setMinOutlierDetectionPeers(MIN_OUTLIER_DETECTION_PEERS); + GenericTestUtils.setLogLevel(SlowNodeDetector.LOG, Level.ALL); + } + + @Test + public void testOutliersFromTestMatrix() { + for (Map.Entry, Set> entry : + outlierTestMatrix.entrySet()) { + + LOG.info("Verifying set {}", entry.getKey()); + final Set outliers = + slowNodeDetector.getOutliers(entry.getKey()).keySet(); + assertTrue( + "Running outlier detection on " + entry.getKey() + + " was expected to yield set " + entry.getValue() + ", but " + + " we got set " + outliers, + outliers.equals(entry.getValue())); + } + } + + /** + * Unit test for {@link SlowNodeDetector#computeMedian(List)}. + */ + @Test + public void testMediansFromTestMatrix() { + for (Map.Entry, Pair> entry : + medianTestMatrix.entrySet()) { + final List inputList = new ArrayList<>(entry.getKey()); + Collections.sort(inputList); + final Double median = SlowNodeDetector.computeMedian(inputList); + final Double expectedMedian = entry.getValue().getLeft(); + + // Ensure that the median is within 0.001% of expected. + // We need some fudge factor for floating point comparison. + final Double errorPercent = + Math.abs(median - expectedMedian) * 100.0 / expectedMedian; + + assertTrue( + "Set " + inputList + "; Expected median: " + + expectedMedian + ", got: " + median, + errorPercent < 0.001); + } + } + + /** + * Unit test for {@link SlowNodeDetector#computeMad(List)}. 
+ */ + @Test + public void testMadsFromTestMatrix() { + for (Map.Entry, Pair> entry : + medianTestMatrix.entrySet()) { + final List inputList = new ArrayList<>(entry.getKey()); + Collections.sort(inputList); + final Double mad = SlowNodeDetector.computeMad(inputList); + final Double expectedMad = entry.getValue().getRight(); + + // Ensure that the MAD is within 0.001% of expected. + // We need some fudge factor for floating point comparison. + if (entry.getKey().size() > 1) { + final Double errorPercent = + Math.abs(mad - expectedMad) * 100.0 / expectedMad; + + assertTrue( + "Set " + entry.getKey() + "; Expected M.A.D.: " + + expectedMad + ", got: " + mad, + errorPercent < 0.001); + } else { + // For an input list of size 1, the MAD should be 0.0. + final Double epsilon = 0.000001; // Allow for some FP math error. + assertTrue( + "Set " + entry.getKey() + "; Expected M.A.D.: " + + expectedMad + ", got: " + mad, + mad < epsilon); + } + } + } + + /** + * Verify that {@link SlowNodeDetector#computeMedian(List)} throws when + * passed an empty list. + */ + @Test(expected=IllegalArgumentException.class) + public void testMedianOfEmptyList() { + SlowNodeDetector.computeMedian(Collections.emptyList()); + } + + /** + * Verify that {@link SlowNodeDetector#computeMad(List)} throws when + * passed an empty list. 
+ */ + @Test(expected=IllegalArgumentException.class) + public void testMadOfEmptyList() { + SlowNodeDetector.computeMad(Collections.emptyList()); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java index a3d0be5c74..b86b3fb515 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java @@ -64,6 +64,7 @@ import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocol; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.server.protocol.StorageReport; @@ -951,7 +952,8 @@ public class NNThroughputBenchmark implements Tool { StorageReport[] rep = { new StorageReport(storage, false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0L) }; DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep, - 0L, 0L, 0, 0, 0, null, true).getCommands(); + 0L, 0L, 0, 0, 0, null, true, + SlowPeerReports.EMPTY_REPORT).getCommands(); if(cmds != null) { for (DatanodeCommand cmd : cmds ) { if(LOG.isDebugEnabled()) { @@ -1000,7 +1002,8 @@ public class NNThroughputBenchmark implements Tool { StorageReport[] rep = { new StorageReport(storage, false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0) }; DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, - rep, 0L, 0L, 0, 0, 0, null, true).getCommands(); 
+ rep, 0L, 0L, 0, 0, 0, null, true, + SlowPeerReports.EMPTY_REPORT).getCommands(); if (cmds != null) { for (DatanodeCommand cmd : cmds) { if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java index ed6c92aa7f..2b8faf46a9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java @@ -42,6 +42,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse; import org.apache.hadoop.hdfs.server.protocol.NamenodeCommand; import org.apache.hadoop.hdfs.server.protocol.NamenodeRegistration; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.security.AccessControlException; @@ -122,7 +123,8 @@ public class NameNodeAdapter { DatanodeDescriptor dd, FSNamesystem namesystem) throws IOException { return namesystem.handleHeartbeat(nodeReg, BlockManagerTestUtil.getStorageReportsForDatanode(dd), - dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null, true); + dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null, true, + SlowPeerReports.EMPTY_REPORT); } public static boolean setReplication(final FSNamesystem ns, diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java index 5c2d29172e..b9161c3438 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java @@ -46,6 +46,7 @@ import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo; import org.apache.hadoop.hdfs.server.protocol.RegisterCommand; +import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports; import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport; import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks; import org.apache.hadoop.hdfs.server.protocol.StorageReport; @@ -132,7 +133,8 @@ public class TestDeadDatanode { new DatanodeStorage(reg.getDatanodeUuid()), false, 0, 0, 0, 0, 0) }; DatanodeCommand[] cmd = - dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null, true).getCommands(); + dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null, true, + SlowPeerReports.EMPTY_REPORT).getCommands(); assertEquals(1, cmd.length); assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER .getAction()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java index 6b0dced2bf..cdce3428e9 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/tools/TestHdfsConfigFields.java @@ -66,6 +66,10 @@ public class TestHdfsConfigFields extends TestConfigurationFieldsBase { // Purposely hidden, based on comments in DFSConfigKeys configurationPropsToSkipCompare .add(DFSConfigKeys.DFS_DATANODE_XCEIVER_STOP_TIMEOUT_MILLIS_KEY); + configurationPropsToSkipCompare + .add(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGES_WINDOW_LENGTH_KEY); + 
configurationPropsToSkipCompare + .add(DFSConfigKeys.DFS_METRICS_ROLLING_AVERAGE_NUM_WINDOWS_KEY); // Fully deprecated properties? configurationPropsToSkipCompare