HDFS-11545. Propagate DataNode's slow disks info to the NameNode via Heartbeats. Contributed by Hanisha Koneru.

This commit is contained in:
Arpit Agarwal 2017-03-20 21:54:58 -07:00
parent b104f3a282
commit e7c8da614c
26 changed files with 339 additions and 65 deletions

View File

@ -0,0 +1,126 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.protocol;
import com.google.common.collect.ImmutableMap;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import javax.annotation.Nonnull;
import javax.annotation.Nullable;
import java.util.Map;
/**
* A class that allows a DataNode to communicate information about all
* its disks that appear to be slow.
*
* The wire representation of this structure is a list of
* SlowDiskReportProto messages.
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public final class SlowDiskReports {
/**
* A map from the DataNode Disk's BasePath to its mean metadata op latency,
* mean read io latency and mean write io latency.
*
* The NameNode must not attempt to interpret the mean latencies
* beyond exposing them as a diagnostic. e.g. metrics. Also, comparing
* latencies across reports from different DataNodes may not be not
* meaningful and must be avoided.
*/
@Nonnull
private final Map<String, Map<DiskOp, Double>> slowDisks;
/**
* An object representing a SlowPeerReports with no entries. Should
* be used instead of null or creating new objects when there are
* no slow peers to report.
*/
public static final SlowDiskReports EMPTY_REPORT =
new SlowDiskReports(ImmutableMap.of());
private SlowDiskReports(Map<String, Map<DiskOp, Double>> slowDisks) {
this.slowDisks = slowDisks;
}
public static SlowDiskReports create(
@Nullable Map<String, Map<DiskOp, Double>> slowDisks) {
if (slowDisks == null || slowDisks.isEmpty()) {
return EMPTY_REPORT;
}
return new SlowDiskReports(slowDisks);
}
public Map<String, Map<DiskOp, Double>> getSlowDisks() {
return slowDisks;
}
public boolean haveSlowDisks() {
return slowDisks.size() > 0;
}
/**
* Return true if the two objects represent the same set slow disk
* entries. Primarily for unit testing convenience.
*/
@Override
public boolean equals(Object o) {
if (this == o) {
return true;
}
if (!(o instanceof SlowDiskReports)) {
return false;
}
SlowDiskReports that = (SlowDiskReports) o;
if (this.slowDisks.size() != that.slowDisks.size()) {
return false;
}
if (!this.slowDisks.keySet().containsAll(that.slowDisks.keySet()) ||
!that.slowDisks.keySet().containsAll(this.slowDisks.keySet())) {
return false;
}
boolean areEqual;
for (String disk : this.slowDisks.keySet()) {
if (!this.slowDisks.get(disk).equals(that.slowDisks.get(disk))) {
return false;
}
}
return true;
}
@Override
public int hashCode() {
return slowDisks.hashCode();
}
/**
* Lists the types of operations on which disk latencies are measured.
*/
public enum DiskOp {
METADATA,
READ,
WRITE
}
}

View File

@ -55,6 +55,7 @@
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo.Capability;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
@ -136,7 +137,8 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
int xmitsInProgress, int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary,
boolean requestFullBlockReportLease,
@Nonnull SlowPeerReports slowPeers) throws IOException {
@Nonnull SlowPeerReports slowPeers,
@Nonnull SlowDiskReports slowDisks) throws IOException {
HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
.setRegistration(PBHelper.convert(registration))
.setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
@ -156,6 +158,9 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
if (slowPeers.haveSlowPeers()) {
builder.addAllSlowPeers(PBHelper.convertSlowPeerInfo(slowPeers));
}
if (slowDisks.haveSlowDisks()) {
builder.addAllSlowDisks(PBHelper.convertSlowDiskInfo(slowDisks));
}
HeartbeatResponseProto resp;
try {
resp = rpcProxy.sendHeartbeat(NULL_CONTROLLER, builder.build());

View File

@ -121,7 +121,8 @@ public HeartbeatResponseProto sendHeartbeat(RpcController controller,
request.getXmitsInProgress(),
request.getXceiverCount(), request.getFailedVolumes(),
volumeFailureSummary, request.getRequestFullBlockReportLease(),
PBHelper.convertSlowPeerInfo(request.getSlowPeersList()));
PBHelper.convertSlowPeerInfo(request.getSlowPeersList()),
PBHelper.convertSlowDiskInfo(request.getSlowDisksList()));
} catch (IOException e) {
throw new ServiceException(e);
}

View File

@ -47,6 +47,8 @@
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.KeyUpdateCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.ReceivedDeletedBlockInfoProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.RegisterCommandProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos
.SlowDiskReportProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.SlowPeerReportProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.VolumeFailureSummaryProto;
import org.apache.hadoop.hdfs.protocol.proto.DatanodeProtocolProtos.BlockReportContextProto;
@ -111,6 +113,7 @@
import org.apache.hadoop.hdfs.server.protocol.RegisterCommand;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLog;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
@ -873,6 +876,71 @@ public static SlowPeerReports convertSlowPeerInfo(
return SlowPeerReports.create(slowPeersMap);
}
public static List<SlowDiskReportProto> convertSlowDiskInfo(
SlowDiskReports slowDisks) {
if (slowDisks.getSlowDisks().size() == 0) {
return Collections.emptyList();
}
List<SlowDiskReportProto> slowDiskInfoProtos =
new ArrayList<>(slowDisks.getSlowDisks().size());
for (Map.Entry<String, Map<SlowDiskReports.DiskOp, Double>> entry :
slowDisks.getSlowDisks().entrySet()) {
SlowDiskReportProto.Builder builder = SlowDiskReportProto.newBuilder();
builder.setBasePath(entry.getKey());
Map<SlowDiskReports.DiskOp, Double> value = entry.getValue();
if (value.get(SlowDiskReports.DiskOp.METADATA) != null) {
builder.setMeanMetadataOpLatency(value.get(
SlowDiskReports.DiskOp.METADATA));
}
if (value.get(SlowDiskReports.DiskOp.READ) != null) {
builder.setMeanReadIoLatency(value.get(
SlowDiskReports.DiskOp.READ));
}
if (value.get(SlowDiskReports.DiskOp.WRITE) != null) {
builder.setMeanWriteIoLatency(value.get(
SlowDiskReports.DiskOp.WRITE));
}
slowDiskInfoProtos.add(builder.build());
}
return slowDiskInfoProtos;
}
public static SlowDiskReports convertSlowDiskInfo(
List<SlowDiskReportProto> slowDiskProtos) {
// No slow disks, or possibly an older DataNode.
if (slowDiskProtos == null || slowDiskProtos.size() == 0) {
return SlowDiskReports.EMPTY_REPORT;
}
Map<String, Map<SlowDiskReports.DiskOp, Double>> slowDisksMap =
new HashMap<>(slowDiskProtos.size());
for (SlowDiskReportProto proto : slowDiskProtos) {
if (!proto.hasBasePath()) {
// The disk basePath should be reported.
continue;
}
Map<SlowDiskReports.DiskOp, Double> latencyMap = new HashMap<>();
if (proto.hasMeanMetadataOpLatency()) {
latencyMap.put(SlowDiskReports.DiskOp.METADATA,
proto.getMeanMetadataOpLatency());
}
if (proto.hasMeanReadIoLatency()) {
latencyMap.put(SlowDiskReports.DiskOp.READ,
proto.getMeanReadIoLatency());
}
if (proto.hasMeanWriteIoLatency()) {
latencyMap.put(SlowDiskReports.DiskOp.WRITE,
proto.getMeanWriteIoLatency());
}
slowDisksMap.put(proto.getBasePath(), latencyMap);
}
return SlowDiskReports.create(slowDisksMap);
}
public static JournalInfo convert(JournalInfoProto info) {
int lv = info.hasLayoutVersion() ? info.getLayoutVersion() : 0;
int nsID = info.hasNamespaceID() ? info.getNamespaceID() : 0;

View File

@ -1584,7 +1584,8 @@ public DatanodeCommand[] handleHeartbeat(DatanodeRegistration nodeReg,
long cacheCapacity, long cacheUsed, int xceiverCount,
int maxTransfers, int failedVolumes,
VolumeFailureSummary volumeFailureSummary,
@Nonnull SlowPeerReports slowPeers) throws IOException {
@Nonnull SlowPeerReports slowPeers,
@Nonnull SlowDiskReports slowDisks) throws IOException {
final DatanodeDescriptor nodeinfo;
try {
nodeinfo = getDatanode(nodeReg);

View File

@ -58,7 +58,7 @@ public class SlowPeerTracker {
/**
* Time duration after which a report is considered stale. This is
* set to DFS_DATANODE_SLOW_PEER_REPORT_INTERVAL_KEY * 3 i.e.
* set to DFS_DATANODE_OUTLIERS_REPORT_INTERVAL_KEY * 3 i.e.
* maintained for at least two successive reports.
*/
private final long reportValidityMs;

View File

@ -57,6 +57,7 @@
import org.apache.hadoop.hdfs.server.protocol.DisallowedDatanodeException;
import org.apache.hadoop.hdfs.server.protocol.HeartbeatResponse;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
@ -497,11 +498,15 @@ HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
.getVolumeFailureSummary();
int numFailedVolumes = volumeFailureSummary != null ?
volumeFailureSummary.getFailedStorageLocations().length : 0;
final boolean slowPeersReportDue = scheduler.isSlowPeersReportDue(now);
final boolean outliersReportDue = scheduler.isOutliersReportDue(now);
final SlowPeerReports slowPeers =
slowPeersReportDue && dn.getPeerMetrics() != null ?
outliersReportDue && dn.getPeerMetrics() != null ?
SlowPeerReports.create(dn.getPeerMetrics().getOutliers()) :
SlowPeerReports.EMPTY_REPORT;
final SlowDiskReports slowDisks =
outliersReportDue && dn.getDiskMetrics() != null ?
SlowDiskReports.create(dn.getDiskMetrics().getDiskOutliersStats()) :
SlowDiskReports.EMPTY_REPORT;
HeartbeatResponse response = bpNamenode.sendHeartbeat(bpRegistration,
reports,
dn.getFSDataset().getCacheCapacity(),
@ -511,11 +516,12 @@ HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
numFailedVolumes,
volumeFailureSummary,
requestBlockReportLease,
slowPeers);
slowPeers,
slowDisks);
if (slowPeersReportDue) {
if (outliersReportDue) {
// If the report was due and successfully sent, schedule the next one.
scheduler.scheduleNextSlowPeerReport();
scheduler.scheduleNextOutlierReport();
}
return response;
}
@ -1095,7 +1101,7 @@ static class Scheduler {
boolean resetBlockReportTime = true;
@VisibleForTesting
volatile long nextSlowPeersReportTime = monotonicNow();
volatile long nextOutliersReportTime = monotonicNow();
private final AtomicBoolean forceFullBlockReport =
new AtomicBoolean(false);
@ -1103,14 +1109,14 @@ static class Scheduler {
private final long heartbeatIntervalMs;
private final long lifelineIntervalMs;
private final long blockReportIntervalMs;
private final long slowPeersReportIntervalMs;
private final long outliersReportIntervalMs;
Scheduler(long heartbeatIntervalMs, long lifelineIntervalMs,
long blockReportIntervalMs, long slowPeersReportIntervalMs) {
long blockReportIntervalMs, long outliersReportIntervalMs) {
this.heartbeatIntervalMs = heartbeatIntervalMs;
this.lifelineIntervalMs = lifelineIntervalMs;
this.blockReportIntervalMs = blockReportIntervalMs;
this.slowPeersReportIntervalMs = slowPeersReportIntervalMs;
this.outliersReportIntervalMs = outliersReportIntervalMs;
scheduleNextLifeline(nextHeartbeatTime);
}
@ -1143,8 +1149,8 @@ void updateLastBlockReportTime(long blockReportTime) {
lastBlockReportTime = blockReportTime;
}
void scheduleNextSlowPeerReport() {
nextSlowPeersReportTime = monotonicNow() + slowPeersReportIntervalMs;
void scheduleNextOutlierReport() {
nextOutliersReportTime = monotonicNow() + outliersReportIntervalMs;
}
long getLastHearbeatTime() {
@ -1173,8 +1179,8 @@ boolean isBlockReportDue(long curTime) {
return nextBlockReportTime - curTime <= 0;
}
boolean isSlowPeersReportDue(long curTime) {
return nextSlowPeersReportTime - curTime <= 0;
boolean isOutliersReportDue(long curTime) {
return nextOutliersReportTime - curTime <= 0;
}
void forceFullBlockReportNow() {

View File

@ -27,6 +27,7 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.DataNodeVolumeMetrics;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsDatasetSpi;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports.DiskOp;
import org.apache.hadoop.util.Daemon;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -54,7 +55,7 @@ public class DataNodeDiskMetrics {
private volatile boolean shouldRun;
private OutlierDetector slowDiskDetector;
private Daemon slowDiskDetectionDaemon;
private volatile Map<String, Map<DiskOutlierDetectionOp, Double>>
private volatile Map<String, Map<DiskOp, Double>>
diskOutliersStats = Maps.newHashMap();
public DataNodeDiskMetrics(DataNode dn, long diskOutlierDetectionIntervalMs) {
@ -144,13 +145,13 @@ private void detectAndUpdateDiskOutliers(Map<String, Double> metadataOpStats,
diskOutliersSet.addAll(writeIoOutliers.keySet());
}
Map<String, Map<DiskOutlierDetectionOp, Double>> diskStats =
Map<String, Map<DiskOp, Double>> diskStats =
Maps.newHashMap();
for (String disk : diskOutliersSet) {
Map<DiskOutlierDetectionOp, Double> diskStat = Maps.newHashMap();
diskStat.put(DiskOutlierDetectionOp.METADATA, metadataOpStats.get(disk));
diskStat.put(DiskOutlierDetectionOp.READ, readIoStats.get(disk));
diskStat.put(DiskOutlierDetectionOp.WRITE, writeIoStats.get(disk));
Map<DiskOp, Double> diskStat = Maps.newHashMap();
diskStat.put(DiskOp.METADATA, metadataOpStats.get(disk));
diskStat.put(DiskOp.READ, readIoStats.get(disk));
diskStat.put(DiskOp.WRITE, writeIoStats.get(disk));
diskStats.put(disk, diskStat);
}
@ -158,17 +159,7 @@ private void detectAndUpdateDiskOutliers(Map<String, Double> metadataOpStats,
LOG.debug("Updated disk outliers.");
}
/**
* Lists the types of operations on which disk latencies are measured.
*/
public enum DiskOutlierDetectionOp {
METADATA,
READ,
WRITE
}
public Map<String,
Map<DiskOutlierDetectionOp, Double>> getDiskOutliersStats() {
public Map<String, Map<DiskOp, Double>> getDiskOutliersStats() {
return diskOutliersStats;
}
@ -186,7 +177,12 @@ public void shutdownAndWait() {
* Use only for testing.
*/
@VisibleForTesting
public void addSlowDiskForTesting(String slowDiskPath) {
diskOutliersStats.put(slowDiskPath, ImmutableMap.of());
public void addSlowDiskForTesting(String slowDiskPath,
Map<DiskOp, Double> latencies) {
if (latencies == null) {
diskOutliersStats.put(slowDiskPath, ImmutableMap.of());
} else {
diskOutliersStats.put(slowDiskPath, latencies);
}
}
}

View File

@ -88,6 +88,7 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_DEFAULT;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_REPLICATION_KEY;
import static org.apache.hadoop.hdfs.server.namenode.FSDirStatAndListingOp.*;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import static org.apache.hadoop.util.Time.now;
import static org.apache.hadoop.util.Time.monotonicNow;
import static org.apache.hadoop.hdfs.server.namenode.top.metrics.TopMetrics.TOPMETRICS_METRICS_SOURCE_NAME;
@ -3647,7 +3648,8 @@ HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
int xceiverCount, int xmitsInProgress, int failedVolumes,
VolumeFailureSummary volumeFailureSummary,
boolean requestFullBlockReportLease,
@Nonnull SlowPeerReports slowPeers) throws IOException {
@Nonnull SlowPeerReports slowPeers,
@Nonnull SlowDiskReports slowDisks) throws IOException {
readLock();
try {
//get datanode commands
@ -3656,7 +3658,7 @@ HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
nodeReg, reports, getBlockPoolId(), cacheCapacity, cacheUsed,
xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary,
slowPeers);
slowPeers, slowDisks);
long blockReportLeaseId = 0;
if (requestFullBlockReportLease) {
blockReportLeaseId = blockManager.requestBlockReportLeaseId(nodeReg);

View File

@ -155,6 +155,7 @@
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.NodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.RemoteEditLogManifest;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
@ -1422,13 +1423,14 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
int xmitsInProgress, int xceiverCount,
int failedVolumes, VolumeFailureSummary volumeFailureSummary,
boolean requestFullBlockReportLease,
@Nonnull SlowPeerReports slowPeers) throws IOException {
@Nonnull SlowPeerReports slowPeers,
@Nonnull SlowDiskReports slowDisks) throws IOException {
checkNNStartup();
verifyRequest(nodeReg);
return namesystem.handleHeartbeat(nodeReg, report,
dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
failedVolumes, volumeFailureSummary, requestFullBlockReportLease,
slowPeers);
slowPeers, slowDisks);
}
@Override // DatanodeProtocol

View File

@ -122,7 +122,8 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
int failedVolumes,
VolumeFailureSummary volumeFailureSummary,
boolean requestFullBlockReportLease,
@Nonnull SlowPeerReports slowPeers)
@Nonnull SlowPeerReports slowPeers,
@Nonnull SlowDiskReports slowDisks)
throws IOException;
/**

View File

@ -196,6 +196,7 @@ message VolumeFailureSummaryProto {
* cacheUsed - amount of cache used
* volumeFailureSummary - info about volume failures
* slowPeers - info about peer DataNodes that are suspected to be slow.
* slowDisks - info about DataNode disks that are suspected to be slow.
*/
message HeartbeatRequestProto {
required DatanodeRegistrationProto registration = 1; // Datanode info
@ -208,6 +209,7 @@ message HeartbeatRequestProto {
optional VolumeFailureSummaryProto volumeFailureSummary = 8;
optional bool requestFullBlockReportLease = 9 [ default = false ];
repeated SlowPeerReportProto slowPeers = 10;
repeated SlowDiskReportProto slowDisks = 11;
}
/**
@ -405,6 +407,19 @@ message SlowPeerReportProto {
optional double aggregateLatency = 2;
}
/**
* Information about a single slow disk that may be reported by
* the DataNode to the NameNode as part of the heartbeat request.
* The message includes the disk's basePath, mean metadata op latency,
* mean read io latency and mean write io latency as observed by the DataNode.
*/
message SlowDiskReportProto {
optional string basePath = 1;
optional double meanMetadataOpLatency = 2;
optional double meanReadIoLatency = 3;
optional double meanWriteIoLatency = 4;
}
/**
* Protocol used from datanode to the namenode
* See the request and response for details of rpc call.

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.protocolPB;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertSame;
@ -792,6 +793,32 @@ public void testSlowPeerInfoPBHelper() {
slowPeersConverted2.equals(SlowPeerReports.EMPTY_REPORT));
}
@Test
public void testSlowDiskInfoPBHelper() {
// Test with a map that has a few slow disk entries.
final SlowDiskReports slowDisks = SlowDiskReports.create(
ImmutableMap.of(
"disk1", ImmutableMap.of(SlowDiskReports.DiskOp.METADATA, 0.5),
"disk2", ImmutableMap.of(SlowDiskReports.DiskOp.READ, 1.0,
SlowDiskReports.DiskOp.WRITE, 1.0),
"disk3", ImmutableMap.of(SlowDiskReports.DiskOp.METADATA, 1.2,
SlowDiskReports.DiskOp.READ, 1.5,
SlowDiskReports.DiskOp.WRITE, 1.3)));
SlowDiskReports slowDisksConverted1 = PBHelper.convertSlowDiskInfo(
PBHelper.convertSlowDiskInfo(slowDisks));
assertTrue(
"Expected map:" + slowDisks + ", got map:" +
slowDisksConverted1.getSlowDisks(),
slowDisksConverted1.equals(slowDisks));
// Test with an empty map
SlowDiskReports slowDisksConverted2 = PBHelper.convertSlowDiskInfo(
PBHelper.convertSlowDiskInfo(SlowDiskReports.EMPTY_REPORT));
assertTrue(
"Expected empty map:" + ", got map:" + slowDisksConverted2,
slowDisksConverted2.equals(SlowDiskReports.EMPTY_REPORT));
}
private void assertBlockECRecoveryInfoEquals(
BlockECReconstructionInfo blkECRecoveryInfo1,
BlockECReconstructionInfo blkECRecoveryInfo2) {

View File

@ -41,6 +41,7 @@
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.test.GenericTestUtils;
@ -114,7 +115,8 @@ private static void runTest(final String testCaseName,
// Stop the DataNode and send fake heartbeat with missing storage.
cluster.stopDataNode(0);
cluster.getNameNodeRpc().sendHeartbeat(dnReg, prunedReports, 0L, 0L, 0, 0,
0, null, true, SlowPeerReports.EMPTY_REPORT);
0, null, true, SlowPeerReports.EMPTY_REPORT,
SlowDiskReports.EMPTY_REPORT);
// Check that the missing storage was pruned.
assertThat(dnDescriptor.getStorageInfos().length, is(expectedStoragesAfterTest));

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -138,7 +139,8 @@ public DatanodeRegistration answer(InvocationOnMock invocation)
Mockito.anyLong(), Mockito.anyInt(), Mockito.anyInt(),
Mockito.anyInt(), Mockito.any(VolumeFailureSummary.class),
Mockito.anyBoolean(),
Mockito.any(SlowPeerReports.class))).thenReturn(
Mockito.any(SlowPeerReports.class),
Mockito.any(SlowDiskReports.class))).thenReturn(
new HeartbeatResponse(new DatanodeCommand[0], new NNHAStatusHeartbeat(
HAServiceState.ACTIVE, 1), null, ThreadLocalRandom.current()
.nextLong() | 1L));

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_DATANODE_DATA_DIR_KEY;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertNotNull;
@ -154,7 +155,8 @@ private DatanodeProtocolClientSideTranslatorPB setupNNMock(int nnIdx)
Mockito.anyInt(),
Mockito.any(VolumeFailureSummary.class),
Mockito.anyBoolean(),
Mockito.any(SlowPeerReports.class));
Mockito.any(SlowPeerReports.class),
Mockito.any(SlowDiskReports.class));
mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0);
datanodeCommands[nnIdx] = new DatanodeCommand[0];
return mock;

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
@ -220,7 +221,8 @@ public DatanodeRegistration answer(InvocationOnMock invocation)
Mockito.anyInt(),
Mockito.any(VolumeFailureSummary.class),
Mockito.anyBoolean(),
Mockito.any(SlowPeerReports.class)))
Mockito.any(SlowPeerReports.class),
Mockito.any(SlowDiskReports.class)))
.thenReturn(new HeartbeatResponse(
new DatanodeCommand[0],
new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),

View File

@ -51,7 +51,7 @@ public class TestBpServiceActorScheduler {
private static final long HEARTBEAT_INTERVAL_MS = 5000; // 5 seconds
private static final long LIFELINE_INTERVAL_MS = 3 * HEARTBEAT_INTERVAL_MS;
private static final long BLOCK_REPORT_INTERVAL_MS = 10000; // 10 seconds
private static final long SLOW_PEER_REPORT_INTERVAL_MS = 10000; // 10 seconds
private static final long OUTLIER_REPORT_INTERVAL_MS = 10000; // 10 seconds
private final Random random = new Random(System.nanoTime());
@Test
@ -182,15 +182,15 @@ public void testScheduleLifeline() {
}
@Test
public void testSlowPeerReportScheduling() {
public void testOutlierReportScheduling() {
for (final long now : getTimestamps()) {
Scheduler scheduler = makeMockScheduler(now);
assertTrue(scheduler.isSlowPeersReportDue(now));
scheduler.scheduleNextSlowPeerReport();
assertFalse(scheduler.isSlowPeersReportDue(now));
assertFalse(scheduler.isSlowPeersReportDue(now + 1));
assertTrue(scheduler.isSlowPeersReportDue(
now + SLOW_PEER_REPORT_INTERVAL_MS));
assertTrue(scheduler.isOutliersReportDue(now));
scheduler.scheduleNextOutlierReport();
assertFalse(scheduler.isOutliersReportDue(now));
assertFalse(scheduler.isOutliersReportDue(now + 1));
assertTrue(scheduler.isOutliersReportDue(
now + OUTLIER_REPORT_INTERVAL_MS));
}
}
@ -198,11 +198,11 @@ private Scheduler makeMockScheduler(long now) {
LOG.info("Using now = " + now);
Scheduler mockScheduler = spy(new Scheduler(
HEARTBEAT_INTERVAL_MS, LIFELINE_INTERVAL_MS,
BLOCK_REPORT_INTERVAL_MS, SLOW_PEER_REPORT_INTERVAL_MS));
BLOCK_REPORT_INTERVAL_MS, OUTLIER_REPORT_INTERVAL_MS));
doReturn(now).when(mockScheduler).monotonicNow();
mockScheduler.nextBlockReportTime = now;
mockScheduler.nextHeartbeatTime = now;
mockScheduler.nextSlowPeersReportTime = now;
mockScheduler.nextOutliersReportTime = now;
return mockScheduler;
}

View File

@ -23,6 +23,8 @@
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_LIFELINE_RPC_ADDRESS_KEY;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_STALE_DATANODE_INTERVAL_KEY;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals;
@ -169,7 +171,8 @@ public void testSendLifelineIfHeartbeatBlocked() throws Exception {
anyInt(),
any(VolumeFailureSummary.class),
anyBoolean(),
any(SlowPeerReports.class));
any(SlowPeerReports.class),
any(SlowDiskReports.class));
// Intercept lifeline to trigger latch count-down on each call.
doAnswer(new LatchCountingAnswer<Void>(lifelinesSent))
@ -233,7 +236,8 @@ public void testNoLifelineSentIfHeartbeatsOnTime() throws Exception {
anyInt(),
any(VolumeFailureSummary.class),
anyBoolean(),
any(SlowPeerReports.class));
any(SlowPeerReports.class),
any(SlowDiskReports.class));
// While waiting on the latch for the expected number of heartbeat messages,
// poll DataNode tracking information. We expect that the DataNode always

View File

@ -227,7 +227,7 @@ public void testDataNodeMXBeanSlowDisksEnabled() throws Exception {
Assert.assertEquals(datanodes.size(), 1);
DataNode datanode = datanodes.get(0);
String slowDiskPath = "test/data1/slowVolume";
datanode.getDiskMetrics().addSlowDiskForTesting(slowDiskPath);
datanode.getDiskMetrics().addSlowDiskForTesting(slowDiskPath, null);
MBeanServer mbs = ManagementFactory.getPlatformMBeanServer();
ObjectName mxbeanName = new ObjectName(

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.hdfs.server.datanode;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when;
@ -220,7 +221,8 @@ public HeartbeatResponse answer(InvocationOnMock invocation)
Mockito.anyInt(),
Mockito.any(VolumeFailureSummary.class),
Mockito.anyBoolean(),
Mockito.any(SlowPeerReports.class));
Mockito.any(SlowPeerReports.class),
Mockito.any(SlowDiskReports.class));
dn = new DataNode(conf, locations, null, null) {
@Override

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.datanode;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -173,7 +174,8 @@ private static void setHeartbeatResponse(DatanodeCommand[] cmds)
(DatanodeRegistration) any(),
(StorageReport[]) any(), anyLong(), anyLong(),
anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(),
anyBoolean(), any(SlowPeerReports.class));
anyBoolean(), any(SlowPeerReports.class),
any(SlowDiskReports.class));
}
private static DatanodeCommand[] cacheBlock(HdfsBlockLocation loc) {

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.hdfs.server.namenode.NameNode;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageReport;
import org.apache.hadoop.hdfs.server.protocol.VolumeFailureSummary;
@ -108,7 +109,8 @@ public void testStorageReportHasStorageTypeAndState() throws IOException {
captor.capture(),
anyLong(), anyLong(), anyInt(), anyInt(), anyInt(),
Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean(),
Mockito.any(SlowPeerReports.class));
Mockito.any(SlowPeerReports.class),
Mockito.any(SlowDiskReports.class));
StorageReport[] reports = captor.getValue();

View File

@ -64,6 +64,7 @@
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
import org.apache.hadoop.hdfs.server.protocol.NamespaceInfo;
import org.apache.hadoop.hdfs.server.protocol.ReceivedDeletedBlockInfo;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import org.apache.hadoop.hdfs.server.protocol.SlowPeerReports;
import org.apache.hadoop.hdfs.server.protocol.StorageBlockReport;
import org.apache.hadoop.hdfs.server.protocol.StorageReceivedDeletedBlocks;
@ -953,7 +954,8 @@ void sendHeartbeat() throws IOException {
DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0L) };
DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep,
0L, 0L, 0, 0, 0, null, true,
SlowPeerReports.EMPTY_REPORT).getCommands();
SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT)
.getCommands();
if(cmds != null) {
for (DatanodeCommand cmd : cmds ) {
if(LOG.isDebugEnabled()) {
@ -1003,7 +1005,8 @@ int replicateBlocks() throws IOException {
false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED, 0) };
DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration,
rep, 0L, 0L, 0, 0, 0, null, true,
SlowPeerReports.EMPTY_REPORT).getCommands();
SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT)
.getCommands();
if (cmds != null) {
for (DatanodeCommand cmd : cmds) {
if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import static org.mockito.Mockito.spy;
import java.io.File;
@ -124,7 +125,7 @@ public static HeartbeatResponse sendHeartBeat(DatanodeRegistration nodeReg,
return namesystem.handleHeartbeat(nodeReg,
BlockManagerTestUtil.getStorageReportsForDatanode(dd),
dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null, true,
SlowPeerReports.EMPTY_REPORT);
SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT);
}
public static boolean setReplication(final FSNamesystem ns,

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hdfs.server.namenode;
import org.apache.hadoop.hdfs.server.protocol.SlowDiskReports;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.fail;
@ -134,7 +135,8 @@ public void testDeadDatanode() throws Exception {
false, 0, 0, 0, 0, 0) };
DatanodeCommand[] cmd =
dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null, true,
SlowPeerReports.EMPTY_REPORT).getCommands();
SlowPeerReports.EMPTY_REPORT, SlowDiskReports.EMPTY_REPORT)
.getCommands();
assertEquals(1, cmd.length);
assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER
.getAction());