diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 5bb6e5380c..3f72608099 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -434,6 +434,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final int DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT = 0; public static final String DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY = "dfs.blockreport.split.threshold"; public static final long DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT = 1000 * 1000; + public static final String DFS_NAMENODE_MAX_FULL_BLOCK_REPORT_LEASES = "dfs.namenode.max.full.block.report.leases"; + public static final int DFS_NAMENODE_MAX_FULL_BLOCK_REPORT_LEASES_DEFAULT = 6; + public static final String DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS = "dfs.namenode.full.block.report.lease.length.ms"; + public static final long DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS_DEFAULT = 5L * 60L * 1000L; public static final String DFS_CACHEREPORT_INTERVAL_MSEC_KEY = "dfs.cachereport.intervalMsec"; public static final long DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT = 10 * 1000; public static final String DFS_BLOCK_INVALIDATE_LIMIT_KEY = "dfs.block.invalidate.limit"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java index 825e83586b..94028a2523 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolClientSideTranslatorPB.java @@ -132,11 +132,13 @@ public DatanodeRegistration registerDatanode(DatanodeRegistration registration public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration, StorageReport[] reports, long cacheCapacity, long cacheUsed, int xmitsInProgress, int xceiverCount, int failedVolumes, - VolumeFailureSummary volumeFailureSummary) throws IOException { + VolumeFailureSummary volumeFailureSummary, + boolean requestFullBlockReportLease) throws IOException { HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder() .setRegistration(PBHelper.convert(registration)) .setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount) - .setFailedVolumes(failedVolumes); + .setFailedVolumes(failedVolumes) + .setRequestFullBlockReportLease(requestFullBlockReportLease); builder.addAllReports(PBHelper.convertStorageReports(reports)); if (cacheCapacity != 0) { builder.setCacheCapacity(cacheCapacity); @@ -165,7 +167,7 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration, rollingUpdateStatus = PBHelper.convert(resp.getRollingUpgradeStatus()); } return new HeartbeatResponse(cmds, PBHelper.convert(resp.getHaStatus()), - rollingUpdateStatus); + rollingUpdateStatus, resp.getFullBlockReportLeaseId()); } @Override diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java index 873eb6d170..e133ec7923 100644 --- 
a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/DatanodeProtocolServerSideTranslatorPB.java @@ -114,7 +114,7 @@ public HeartbeatResponseProto sendHeartbeat(RpcController controller, report, request.getCacheCapacity(), request.getCacheUsed(), request.getXmitsInProgress(), request.getXceiverCount(), request.getFailedVolumes(), - volumeFailureSummary); + volumeFailureSummary, request.getRequestFullBlockReportLease()); } catch (IOException e) { throw new ServiceException(e); } @@ -135,6 +135,7 @@ public HeartbeatResponseProto sendHeartbeat(RpcController controller, builder.setRollingUpgradeStatus(PBHelper .convertRollingUpgradeStatus(rollingUpdateStatus)); } + builder.setFullBlockReportLeaseId(response.getFullBlockReportLeaseId()); return builder.build(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java index c9a9c337a0..32d961477e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/protocolPB/PBHelper.java @@ -3042,7 +3042,7 @@ public static boolean[] convertBooleanList( public static BlockReportContext convert(BlockReportContextProto proto) { return new BlockReportContext(proto.getTotalRpcs(), - proto.getCurRpc(), proto.getId()); + proto.getCurRpc(), proto.getId(), proto.getLeaseId()); } public static BlockReportContextProto convert(BlockReportContext context) { @@ -3050,6 +3050,7 @@ public static BlockReportContextProto convert(BlockReportContext context) { setTotalRpcs(context.getTotalRpcs()). setCurRpc(context.getCurRpc()). setId(context.getReportId()). + setLeaseId(context.getLeaseId()). 
build(); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index d18d7fe729..4562d94cbe 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -75,6 +75,7 @@ import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations; import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations; import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand; +import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State; import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand; @@ -124,6 +125,7 @@ public class BlockManager { private final AtomicLong excessBlocksCount = new AtomicLong(0L); private final AtomicLong postponedMisreplicatedBlocksCount = new AtomicLong(0L); private final long startupDelayBlockDeletionInMs; + private final BlockReportLeaseManager blockReportLeaseManager; /** Used by metrics */ public long getPendingReplicationBlocksCount() { @@ -348,7 +350,8 @@ public BlockManager(final Namesystem namesystem, final Configuration conf) this.numBlocksPerIteration = conf.getInt( DFSConfigKeys.DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT, DFSConfigKeys.DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT_DEFAULT); - + this.blockReportLeaseManager = new BlockReportLeaseManager(conf); + LOG.info("defaultReplication = " + defaultReplication); LOG.info("maxReplication = " + maxReplication); LOG.info("minReplication = " + minReplication); @@ -1712,7 +1715,28 @@ private void processPendingReplications() { */ } } - + + public long requestBlockReportLeaseId(DatanodeRegistration nodeReg) { + assert namesystem.hasReadLock(); + DatanodeDescriptor node = null; + try { + node = datanodeManager.getDatanode(nodeReg); + } catch (UnregisteredNodeException e) { + LOG.warn("Unregistered datanode {}", nodeReg); + return 0; + } + if (node == null) { + LOG.warn("Failed to find datanode {}", nodeReg); + return 0; + } + // Request a new block report lease. The BlockReportLeaseManager has + // its own internal locking. + long leaseId = blockReportLeaseManager.requestLease(node); + BlockManagerFaultInjector.getInstance(). + requestBlockReportLease(node, leaseId); + return leaseId; + } + /** * StatefulBlockInfo is used to build the "toUC" list, which is a list of * updates to the information about under-construction blocks. @@ -1817,6 +1841,12 @@ public boolean processReport(final DatanodeID nodeID, + " because namenode still in startup phase", nodeID); return !node.hasStaleStorages(); } + if (context != null) { + if (!blockReportLeaseManager.checkLease(node, startTime, + context.getLeaseId())) { + return false; + } + } if (storageInfo.getBlockReportCount() == 0) { // The first block report can be processed a lot more efficiently than @@ -1835,6 +1865,9 @@ public boolean processReport(final DatanodeID nodeID, if (lastStorageInRpc) { int rpcsSeen = node.updateBlockReportContext(context); if (rpcsSeen >= context.getTotalRpcs()) { + long leaseId = blockReportLeaseManager.removeLease(node); + BlockManagerFaultInjector.getInstance(). 
+ removeBlockReportLease(node, leaseId); List<DatanodeStorageInfo> zombies = node.removeZombieStorages(); if (zombies.isEmpty()) { LOG.debug("processReport 0x{}: no zombie storages found.", @@ -3845,4 +3878,8 @@ public void clear() { clearQueues(); blocksMap.clear(); } + + public BlockReportLeaseManager getBlockReportLeaseManager() { + return blockReportLeaseManager; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerFaultInjector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerFaultInjector.java new file mode 100644 index 0000000000..957c5c0c37 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManagerFaultInjector.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import java.io.IOException; + +import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; + +/** + * Used to inject certain faults for testing. + */ +public class BlockManagerFaultInjector { + @VisibleForTesting + public static BlockManagerFaultInjector instance = + new BlockManagerFaultInjector(); + + @VisibleForTesting + public static BlockManagerFaultInjector getInstance() { + return instance; + } + + @VisibleForTesting + public void incomingBlockReportRpc(DatanodeID nodeID, + BlockReportContext context) throws IOException { + + } + + @VisibleForTesting + public void requestBlockReportLease(DatanodeDescriptor node, long leaseId) { + } + + @VisibleForTesting + public void removeBlockReportLease(DatanodeDescriptor node, long leaseId) { + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java new file mode 100644 index 0000000000..cd037f5c18 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockReportLeaseManager.java @@ -0,0 +1,355 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License.
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.blockmanagement; + +import com.google.common.base.Joiner; +import com.google.common.base.Preconditions; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.HashMap; +import java.util.concurrent.ThreadLocalRandom; + +/** + * The BlockReportLeaseManager manages block report leases.
+ * + * DataNodes request BR leases from the NameNode by sending a heartbeat with + * the requestBlockReportLease field set. The NameNode may choose to respond + * with a non-zero lease ID. If so, that DataNode can send a block report with + * the given lease ID for the next few minutes. The NameNode will accept + * these full block reports.
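+ *
+ * On the DataNode side, BPServiceActor only asks for a lease in a
+ * heartbeat when no lease is currently held and a full block report is
+ * actually due (see Scheduler#isBlockReportDue).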
+ * + * BR leases limit the number of incoming full block reports to the NameNode + * at any given time. For compatibility reasons, the NN will always accept + * block reports sent with a lease ID of 0 and queue them for processing + * immediately. Full block reports which were manually triggered will also + * have a lease ID of 0, bypassing the rate-limiting.
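+ *
+ * The maximum number of outstanding leases is set by
+ * dfs.namenode.max.full.block.report.leases (default 6; see
+ * DFSConfigKeys#DFS_NAMENODE_MAX_FULL_BLOCK_REPORT_LEASES).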
+ * + * Block report leases expire after a certain amount of time. This mechanism + * is in place so that a DN which dies while holding a lease does not + * permanently decrease the number of concurrent block reports which the NN is + * willing to accept.
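+ *
+ * The expiry interval is set by
+ * dfs.namenode.full.block.report.lease.length.ms (default 300000, i.e.
+ * five minutes; see DFSConfigKeys#DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS).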
+ * + * When considering which DNs to grant a BR lease, the NameNode gives priority + * to the DNs which have gone the longest without sending a full block + * report.
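+ *
+ * A typical exchange: the DataNode sets requestFullBlockReportLease in a
+ * heartbeat, receives a nonzero lease ID in the HeartbeatResponse, attaches
+ * that ID to the BlockReportContext of each block report RPC, and the
+ * NameNode drops the lease once the last RPC of the report has been
+ * processed (see BlockManager#processReport).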
+ */ +class BlockReportLeaseManager { + static final Logger LOG = + LoggerFactory.getLogger(BlockReportLeaseManager.class); + + private static class NodeData { + /** + * The UUID of the datanode. + */ + final String datanodeUuid; + + /** + * The lease ID, or 0 if there is no lease. + */ + long leaseId; + + /** + * The time when the lease was issued, or 0 if there is no lease. + */ + long leaseTimeMs; + + /** + * Previous element in the list. + */ + NodeData prev; + + /** + * Next element in the list. + */ + NodeData next; + + static NodeData ListHead(String name) { + NodeData node = new NodeData(name); + node.next = node; + node.prev = node; + return node; + } + + NodeData(String datanodeUuid) { + this.datanodeUuid = datanodeUuid; + } + + void removeSelf() { + if (this.prev != null) { + this.prev.next = this.next; + } + if (this.next != null) { + this.next.prev = this.prev; + } + this.next = null; + this.prev = null; + } + + void addToEnd(NodeData node) { + Preconditions.checkState(node.next == null); + Preconditions.checkState(node.prev == null); + node.prev = this.prev; + node.next = this; + this.prev.next = node; + this.prev = node; + } + + void addToBeginning(NodeData node) { + Preconditions.checkState(node.next == null); + Preconditions.checkState(node.prev == null); + node.next = this.next; + node.prev = this; + this.next.prev = node; + this.next = node; + } + } + + /** + * List of datanodes which don't currently have block report leases. + */ + private final NodeData deferredHead = NodeData.ListHead("deferredHead"); + + /** + * List of datanodes which currently have block report leases. + */ + private final NodeData pendingHead = NodeData.ListHead("pendingHead"); + + /** + * Maps datanode UUIDs to NodeData. + */ + private final HashMap<String, NodeData> nodes = new HashMap<>(); + + /** + * The current length of the pending list. + */ + private int numPending = 0; + + /** + * The maximum number of leases to hand out at any given time. + */ + private final int maxPending; + + /** + * The number of milliseconds after which a lease will expire. + */ + private final long leaseExpiryMs; + + /** + * The next ID we will use for a block report lease. + */ + private long nextId = ThreadLocalRandom.current().nextLong(); + + BlockReportLeaseManager(Configuration conf) { + this(conf.getInt( + DFSConfigKeys.DFS_NAMENODE_MAX_FULL_BLOCK_REPORT_LEASES, + DFSConfigKeys.DFS_NAMENODE_MAX_FULL_BLOCK_REPORT_LEASES_DEFAULT), + conf.getLong( + DFSConfigKeys.DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS, + DFSConfigKeys.DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS_DEFAULT)); + } + + BlockReportLeaseManager(int maxPending, long leaseExpiryMs) { + Preconditions.checkArgument(maxPending >= 1, + "Cannot set the maximum number of block report leases to a " + + "value less than 1."); + this.maxPending = maxPending; + Preconditions.checkArgument(leaseExpiryMs >= 1, + "Cannot set full block report lease expiry period to a value " + + "less than 1."); + this.leaseExpiryMs = leaseExpiryMs; + } + + /** + * Get the next block report lease ID. Any number is valid except 0.
+ */ + private synchronized long getNextId() { + long id; + do { + id = nextId++; + } while (id == 0); + return id; + } + + public synchronized void register(DatanodeDescriptor dn) { + registerNode(dn); + } + + private synchronized NodeData registerNode(DatanodeDescriptor dn) { + if (nodes.containsKey(dn.getDatanodeUuid())) { + LOG.info("Can't register DN {} because it is already registered.", + dn.getDatanodeUuid()); + return null; + } + NodeData node = new NodeData(dn.getDatanodeUuid()); + deferredHead.addToBeginning(node); + nodes.put(dn.getDatanodeUuid(), node); + LOG.info("Registered DN {} ({}).", dn.getDatanodeUuid(), dn.getXferAddr()); + return node; + } + + private synchronized void remove(NodeData node) { + if (node.leaseId != 0) { + numPending--; + node.leaseId = 0; + node.leaseTimeMs = 0; + } + node.removeSelf(); + } + + public synchronized void unregister(DatanodeDescriptor dn) { + NodeData node = nodes.remove(dn.getDatanodeUuid()); + if (node == null) { + LOG.info("Can't unregister DN {} because it is not currently " + + "registered.", dn.getDatanodeUuid()); + return; + } + remove(node); + } + + public synchronized long requestLease(DatanodeDescriptor dn) { + NodeData node = nodes.get(dn.getDatanodeUuid()); + if (node == null) { + LOG.warn("DN {} ({}) requested a lease even though it wasn't yet " + + "registered. Registering now.", dn.getDatanodeUuid(), + dn.getXferAddr()); + node = registerNode(dn); + } + if (node.leaseId != 0) { + // The DataNode wants a new lease, even though it already has one. + // This can happen if the DataNode is restarted in between requesting + // a lease and using it. + LOG.debug("Removing existing BR lease 0x{} for DN {} in order to " + + "issue a new one.", Long.toHexString(node.leaseId), + dn.getDatanodeUuid()); + } + remove(node); + long monotonicNowMs = Time.monotonicNow(); + pruneExpiredPending(monotonicNowMs); + if (numPending >= maxPending) { + if (LOG.isDebugEnabled()) { + StringBuilder allLeases = new StringBuilder(); + String prefix = ""; + for (NodeData cur = pendingHead.next; cur != pendingHead; + cur = cur.next) { + allLeases.append(prefix).append(cur.datanodeUuid); + prefix = ", "; + } + LOG.debug("Can't create a new BR lease for DN {}, because " + + "numPending equals maxPending at {}. Current leases: {}", + dn.getDatanodeUuid(), numPending, allLeases.toString()); + } + return 0; + } + numPending++; + node.leaseId = getNextId(); + node.leaseTimeMs = monotonicNowMs; + pendingHead.addToEnd(node); + if (LOG.isDebugEnabled()) { + LOG.debug("Created a new BR lease 0x{} for DN {}. 
numPending = {}", + Long.toHexString(node.leaseId), dn.getDatanodeUuid(), numPending); + } + return node.leaseId; + } + + private synchronized boolean pruneIfExpired(long monotonicNowMs, + NodeData node) { + if (monotonicNowMs < node.leaseTimeMs + leaseExpiryMs) { + return false; + } + LOG.info("Removing expired block report lease 0x{} for DN {}.", + Long.toHexString(node.leaseId), node.datanodeUuid); + Preconditions.checkState(node.leaseId != 0); + remove(node); + deferredHead.addToBeginning(node); + return true; + } + + private synchronized void pruneExpiredPending(long monotonicNowMs) { + NodeData cur = pendingHead.next; + while (cur != pendingHead) { + NodeData next = cur.next; + if (!pruneIfExpired(monotonicNowMs, cur)) { + return; + } + cur = next; + } + LOG.trace("No entries remaining in the pending list."); + } + + public synchronized boolean checkLease(DatanodeDescriptor dn, + long monotonicNowMs, long id) { + if (id == 0) { + LOG.debug("Datanode {} is using BR lease id 0x0 to bypass " + + "rate-limiting.", dn.getDatanodeUuid()); + return true; + } + NodeData node = nodes.get(dn.getDatanodeUuid()); + if (node == null) { + LOG.info("BR lease 0x{} is not valid for unknown datanode {}", + Long.toHexString(id), dn.getDatanodeUuid()); + return false; + } + if (node.leaseId == 0) { + LOG.warn("BR lease 0x{} is not valid for DN {}, because the DN " + + "is not in the pending set.", + Long.toHexString(id), dn.getDatanodeUuid()); + return false; + } + if (pruneIfExpired(monotonicNowMs, node)) { + LOG.warn("BR lease 0x{} is not valid for DN {}, because the lease " + + "has expired.", Long.toHexString(id), dn.getDatanodeUuid()); + return false; + } + if (id != node.leaseId) { + LOG.warn("BR lease 0x{} is not valid for DN {}. Expected BR lease 0x{}.", + Long.toHexString(id), dn.getDatanodeUuid(), + Long.toHexString(node.leaseId)); + return false; + } + if (LOG.isTraceEnabled()) { + LOG.trace("BR lease 0x{} is valid for DN {}.", + Long.toHexString(id), dn.getDatanodeUuid()); + } + return true; + } + + public synchronized long removeLease(DatanodeDescriptor dn) { + NodeData node = nodes.get(dn.getDatanodeUuid()); + if (node == null) { + LOG.info("Can't remove lease for unknown datanode {}", + dn.getDatanodeUuid()); + return 0; + } + long id = node.leaseId; + if (id == 0) { + LOG.debug("DN {} has no lease to remove.", dn.getDatanodeUuid()); + return 0; + } + remove(node); + deferredHead.addToEnd(node); + if (LOG.isTraceEnabled()) { + LOG.trace("Removed BR lease 0x{} for DN {}. 
numPending = {}", + Long.toHexString(id), dn.getDatanodeUuid(), numPending); + } + return id; + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java index 01f7972f66..58349cced7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/DatanodeManager.java @@ -540,6 +540,7 @@ private void removeDatanode(DatanodeDescriptor nodeInfo) { blockManager.removeBlocksAssociatedTo(nodeInfo); networktopology.remove(nodeInfo); decrementVersionCount(nodeInfo.getSoftwareVersion()); + blockManager.getBlockReportLeaseManager().unregister(nodeInfo); if (LOG.isDebugEnabled()) { LOG.debug("remove datanode " + nodeInfo); @@ -602,6 +603,7 @@ void addDatanode(final DatanodeDescriptor node) { networktopology.add(node); // may throw InvalidTopologyException host2DatanodeMap.add(node); checkIfClusterIsNowMultiRack(node); + blockManager.getBlockReportLeaseManager().register(node); if (LOG.isDebugEnabled()) { LOG.debug(getClass().getSimpleName() + ".addDatanode: " diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java index 63a0bb640b..ea1abbd03d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/BPServiceActor.java @@ -29,6 +29,8 @@ import java.util.List; import java.util.Map; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; import com.google.common.base.Joiner; import org.apache.commons.logging.Log; @@ -355,9 +357,10 @@ void notifyNamenodeDeletedBlock( void triggerBlockReportForTests() { synchronized (pendingIncrementalBRperStorage) { scheduler.scheduleHeartbeat(); - long nextBlockReportTime = scheduler.scheduleBlockReport(0); + long oldBlockReportTime = scheduler.nextBlockReportTime; + scheduler.forceFullBlockReportNow(); pendingIncrementalBRperStorage.notifyAll(); - while (nextBlockReportTime - scheduler.nextBlockReportTime >= 0) { + while (oldBlockReportTime == scheduler.nextBlockReportTime) { try { pendingIncrementalBRperStorage.wait(100); } catch (InterruptedException e) { @@ -419,12 +422,7 @@ private long generateUniqueBlockReportId() { * @return DatanodeCommands returned by the NN. May be null. * @throws IOException */ - List<DatanodeCommand> blockReport() throws IOException { - // send block report if timer has expired. - if (!scheduler.isBlockReportDue()) { - return null; - } - + List<DatanodeCommand> blockReport(long fullBrLeaseId) throws IOException { final ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>(); // Flush any block information that precedes the block report. Otherwise @@ -460,7 +458,7 @@ List<DatanodeCommand> blockReport() throws IOException { // Below split threshold, send all reports in a single message.
DatanodeCommand cmd = bpNamenode.blockReport( bpRegistration, bpos.getBlockPoolId(), reports, - new BlockReportContext(1, 0, reportId)); + new BlockReportContext(1, 0, reportId, fullBrLeaseId)); numRPCs = 1; numReportsSent = reports.length; if (cmd != null) { @@ -472,7 +470,8 @@ StorageBlockReport singleReport[] = { reports[r] }; DatanodeCommand cmd = bpNamenode.blockReport( bpRegistration, bpos.getBlockPoolId(), singleReport, - new BlockReportContext(reports.length, r, reportId)); + new BlockReportContext(reports.length, r, reportId, + fullBrLeaseId)); numReportsSent++; numRPCs++; if (cmd != null) { @@ -538,7 +537,8 @@ DatanodeCommand cacheReport() throws IOException { return cmd; } - HeartbeatResponse sendHeartBeat() throws IOException { + HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease) + throws IOException { StorageReport[] reports = dn.getFSDataset().getStorageReports(bpos.getBlockPoolId()); if (LOG.isDebugEnabled()) { @@ -557,7 +557,8 @@ HeartbeatResponse sendHeartBeat() throws IOException { dn.getXmitsInProgress(), dn.getXceiverCount(), numFailedVolumes, - volumeFailureSummary); + volumeFailureSummary, + requestBlockReportLease); } //This must be called only by BPOfferService @@ -625,8 +626,9 @@ private void offerService() throws Exception { LOG.info("For namenode " + nnAddr + " using" + " BLOCKREPORT_INTERVAL of " + dnConf.blockReportInterval + "msec" + " CACHEREPORT_INTERVAL of " + dnConf.cacheReportInterval + "msec" - + " Initial delay: " + dnConf.initialBlockReportDelay + "msec" + + " Initial delay: " + dnConf.initialBlockReportDelayMs + "msec" + "; heartBeatInterval=" + dnConf.heartBeatInterval); + long fullBlockReportLeaseId = 0; // // Now loop for a long time.... @@ -639,6 +641,7 @@ // Every so often, send heartbeat or block-report // final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime); + HeartbeatResponse resp = null; if (sendHeartbeat) { // // All heartbeat messages include following info: @@ -647,10 +650,23 @@ // -- Total capacity // -- Bytes remaining // + boolean requestBlockReportLease = (fullBlockReportLeaseId == 0) && + scheduler.isBlockReportDue(startTime); scheduler.scheduleNextHeartbeat(); if (!dn.areHeartbeatsDisabledForTests()) { - HeartbeatResponse resp = sendHeartBeat(); + resp = sendHeartBeat(requestBlockReportLease); assert resp != null; + if (resp.getFullBlockReportLeaseId() != 0) { + if (fullBlockReportLeaseId != 0) { + LOG.warn(nnAddr + " sent back a full block report lease " + + "ID of 0x" + + Long.toHexString(resp.getFullBlockReportLeaseId()) + + ", but we already have a lease ID of 0x" + + Long.toHexString(fullBlockReportLeaseId) + ". " + + "Overwriting old lease ID."); + } + fullBlockReportLeaseId = resp.getFullBlockReportLeaseId(); + } dn.getMetrics().addHeartbeat(scheduler.monotonicNow() - startTime); // If the state of this NN has changed (eg STANDBY->ACTIVE) @@ -682,7 +698,16 @@ reportReceivedDeletedBlocks(); } - List<DatanodeCommand> cmds = blockReport(); + List<DatanodeCommand> cmds = null; + boolean forceFullBr = + scheduler.forceFullBlockReport.getAndSet(false); + if (forceFullBr) { + LOG.info("Forcing a full block report to " + nnAddr); + } + if ((fullBlockReportLeaseId != 0) || forceFullBr) { + cmds = blockReport(fullBlockReportLeaseId); + fullBlockReportLeaseId = 0; + } processCommand(cmds == null ?
null : cmds.toArray(new DatanodeCommand[cmds.size()])); DatanodeCommand cmd = cacheReport(); @@ -765,7 +790,7 @@ void register(NamespaceInfo nsInfo) throws IOException { bpos.registrationSucceeded(this, bpRegistration); // random short delay - helps scatter the BR from all DNs - scheduler.scheduleBlockReport(dnConf.initialBlockReportDelay); + scheduler.scheduleBlockReport(dnConf.initialBlockReportDelayMs); } @@ -958,7 +983,7 @@ void triggerBlockReport(BlockReportOptions options) throws IOException { } else { LOG.info(bpos.toString() + ": scheduling a full block report."); synchronized(pendingIncrementalBRperStorage) { - scheduler.scheduleBlockReport(0); + scheduler.forceFullBlockReportNow(); pendingIncrementalBRperStorage.notifyAll(); } } @@ -1011,6 +1036,9 @@ static class Scheduler { @VisibleForTesting boolean resetBlockReportTime = true; + private final AtomicBoolean forceFullBlockReport = + new AtomicBoolean(false); + private final long heartbeatIntervalMs; private final long blockReportIntervalMs; @@ -1042,8 +1070,13 @@ boolean isHeartbeatDue(long startTime) { return (nextHeartbeatTime - startTime <= 0); } - boolean isBlockReportDue() { - return nextBlockReportTime - monotonicNow() <= 0; + boolean isBlockReportDue(long curTime) { + return nextBlockReportTime - curTime <= 0; + } + + void forceFullBlockReportNow() { + forceFullBlockReport.set(true); + resetBlockReportTime = true; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java index 4b7fbc3c1f..42b1b46e31 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/DNConf.java @@ -82,7 +82,7 @@ public class DNConf { final long heartBeatInterval; final long blockReportInterval; final long blockReportSplitThreshold; - final long initialBlockReportDelay; + final long initialBlockReportDelayMs; final long cacheReportInterval; final long dfsclientSlowIoWarningThresholdMs; final long datanodeSlowIoWarningThresholdMs; @@ -159,7 +159,7 @@ public DNConf(Configuration conf) { + "greater than or equal to" + "dfs.blockreport.intervalMsec." 
+ " Setting initial delay to 0 msec:"); } - initialBlockReportDelay = initBRDelay; + initialBlockReportDelayMs = initBRDelay; heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY, DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java index d3d98fdb41..d3b32dad6c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/FSNamesystem.java @@ -3976,7 +3976,8 @@ String getRegistrationID() { HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg, StorageReport[] reports, long cacheCapacity, long cacheUsed, int xceiverCount, int xmitsInProgress, int failedVolumes, - VolumeFailureSummary volumeFailureSummary) throws IOException { + VolumeFailureSummary volumeFailureSummary, + boolean requestFullBlockReportLease) throws IOException { readLock(); try { //get datanode commands @@ -3985,13 +3986,17 @@ HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg, DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat( nodeReg, reports, blockPoolId, cacheCapacity, cacheUsed, xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary); - + long blockReportLeaseId = 0; + if (requestFullBlockReportLease) { + blockReportLeaseId = blockManager.requestBlockReportLeaseId(nodeReg); + } //create ha status final NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat( haContext.getState().getServiceState(), getFSImage().getLastAppliedOrWrittenTxId()); - return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo); + return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo, + blockReportLeaseId); } finally { readUnlock(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java index 4a146ffd3f..52aaabd13e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/NameNodeRpcServer.java @@ -116,6 +116,7 @@ import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys; import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier; import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager; +import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerFaultInjector; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants; import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole; import org.apache.hadoop.hdfs.server.common.IncorrectVersionException; @@ -1277,13 +1278,13 @@ public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg) public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg, StorageReport[] report, long dnCacheCapacity, long dnCacheUsed, int xmitsInProgress, int xceiverCount, - int failedVolumes, VolumeFailureSummary volumeFailureSummary) - throws IOException { + int failedVolumes, VolumeFailureSummary volumeFailureSummary, + boolean requestFullBlockReportLease) throws IOException { checkNNStartup(); verifyRequest(nodeReg); return namesystem.handleHeartbeat(nodeReg, report, dnCacheCapacity, 
dnCacheUsed, xceiverCount, xmitsInProgress, - failedVolumes, volumeFailureSummary); + failedVolumes, volumeFailureSummary, requestFullBlockReportLease); } @Override // DatanodeProtocol @@ -1309,6 +1310,8 @@ public DatanodeCommand blockReport(DatanodeRegistration nodeReg, blocks, context, (r == reports.length - 1)); metrics.incrStorageBlockReportOps(); } + BlockManagerFaultInjector.getInstance(). + incomingBlockReportRpc(nodeReg, context); if (nn.getFSImage().isUpgradeFinalized() && !namesystem.isRollingUpgrade() && diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java index d0b02825f1..5bcd719b70 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/BlockReportContext.java @@ -31,14 +31,33 @@ */ @InterfaceAudience.Private public class BlockReportContext { + /** + * The total number of RPCs contained in the block report. + */ private final int totalRpcs; + + /** + * The index of this particular RPC. + */ private final int curRpc; + + /** + * A 64-bit ID which identifies the block report as a whole. + */ private final long reportId; - public BlockReportContext(int totalRpcs, int curRpc, long reportId) { + /** + * The lease ID which this block report is using, or 0 if this block report is + * bypassing rate-limiting. + */ + private final long leaseId; + + public BlockReportContext(int totalRpcs, int curRpc, + long reportId, long leaseId) { this.totalRpcs = totalRpcs; this.curRpc = curRpc; this.reportId = reportId; + this.leaseId = leaseId; } public int getTotalRpcs() { @@ -52,4 +71,8 @@ public int getCurRpc() { public long getReportId() { return reportId; } + + public long getLeaseId() { + return leaseId; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java index a3b6004644..dfe081382c 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/DatanodeProtocol.java @@ -102,6 +102,8 @@ public DatanodeRegistration registerDatanode(DatanodeRegistration registration * @param xceiverCount number of active transceiver threads * @param failedVolumes number of failed volumes * @param volumeFailureSummary info about volume failures + * @param requestFullBlockReportLease whether to request a full block + * report lease. 
* @throws IOException on error */ @Idempotent @@ -112,7 +114,8 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration, int xmitsInProgress, int xceiverCount, int failedVolumes, - VolumeFailureSummary volumeFailureSummary) + VolumeFailureSummary volumeFailureSummary, + boolean requestFullBlockReportLease) throws IOException; /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/HeartbeatResponse.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/HeartbeatResponse.java index d00179ecad..8d6384e700 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/HeartbeatResponse.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/HeartbeatResponse.java @@ -34,12 +34,16 @@ public class HeartbeatResponse { private final NNHAStatusHeartbeat haStatus; private final RollingUpgradeStatus rollingUpdateStatus; + + private final long fullBlockReportLeaseId; public HeartbeatResponse(DatanodeCommand[] cmds, - NNHAStatusHeartbeat haStatus, RollingUpgradeStatus rollingUpdateStatus) { + NNHAStatusHeartbeat haStatus, RollingUpgradeStatus rollingUpdateStatus, + long fullBlockReportLeaseId) { commands = cmds; this.haStatus = haStatus; this.rollingUpdateStatus = rollingUpdateStatus; + this.fullBlockReportLeaseId = fullBlockReportLeaseId; } public DatanodeCommand[] getCommands() { @@ -53,4 +57,8 @@ public NNHAStatusHeartbeat getNameNodeHaState() { public RollingUpgradeStatus getRollingUpdateStatus() { return rollingUpdateStatus; } + + public long getFullBlockReportLeaseId() { + return fullBlockReportLeaseId; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RegisterCommand.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RegisterCommand.java index a102c8291b..2f7d334241 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RegisterCommand.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/protocol/RegisterCommand.java @@ -21,7 +21,7 @@ import org.apache.hadoop.classification.InterfaceStability; /** - * A BlockCommand is an instruction to a datanode to register with the namenode. + * A RegisterCommand is an instruction to a datanode to register with the namenode. * This command can't be combined with other commands in the same response. * This is because after the datanode processes RegisterCommand, it will skip * the rest of the DatanodeCommands in the same HeartbeatResponse. 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto index 3083dc90f6..b87e7533bb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/proto/DatanodeProtocol.proto @@ -195,6 +195,7 @@ message HeartbeatRequestProto { optional uint64 cacheCapacity = 6 [ default = 0 ]; optional uint64 cacheUsed = 7 [default = 0 ]; optional VolumeFailureSummaryProto volumeFailureSummary = 8; + optional bool requestFullBlockReportLease = 9 [ default = false ]; } /** @@ -214,6 +215,7 @@ message HeartbeatResponseProto { repeated DatanodeCommandProto cmds = 1; // Returned commands can be null required NNHAStatusHeartbeatProto haStatus = 2; optional RollingUpgradeStatusProto rollingUpgradeStatus = 3; + optional uint64 fullBlockReportLeaseId = 4 [ default = 0 ]; } /** @@ -243,6 +245,10 @@ message BlockReportContextProto { // The unique 64-bit ID of this block report required int64 id = 3; + + // The block report lease ID, or 0 if we are sending without a lease to + // bypass rate-limiting. + optional uint64 leaseId = 4 [ default = 0 ]; } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 7b579cb51c..fdb0bc82d5 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -577,6 +577,27 @@ </description> </property> +<property> + <name>dfs.namenode.max.full.block.report.leases</name> + <value>6</value> + <description>The maximum number of leases for full block reports that the + NameNode will issue at any given time. This prevents the NameNode from + being flooded with full block reports that use up all the RPC handler + threads. This number should never be more than the number of RPC handler + threads or less than 1. + </description> +</property> + +<property> + <name>dfs.namenode.full.block.report.lease.length.ms</name> + <value>300000</value> + <description> + The number of milliseconds that the NameNode will wait before invalidating + a full block report lease. This prevents a crashed DataNode from + permanently using up a full block report lease. + </description> +</property> +
<property> <name>dfs.datanode.directoryscan.interval</name> <value>21600</value> diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java index f0dab4c013..9ead7656d0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/protocol/TestBlockListAsLongs.java @@ -221,7 +221,7 @@ public BlockReportResponseProto answer(InvocationOnMock invocation) { request.set(null); nsInfo.setCapabilities(Capability.STORAGE_BLOCK_REPORT_BUFFERS.getMask()); nn.blockReport(reg, "pool", sbr, - new BlockReportContext(1, 0, System.nanoTime())); + new BlockReportContext(1, 0, System.nanoTime(), 0L)); BlockReportRequestProto proto = request.get(); assertNotNull(proto); assertTrue(proto.getReports(0).getBlocksList().isEmpty()); @@ -231,7 +231,7 @@ public BlockReportResponseProto answer(InvocationOnMock invocation) { request.set(null); nsInfo.setCapabilities(Capability.UNKNOWN.getMask()); nn.blockReport(reg, "pool", sbr, - new BlockReportContext(1, 0, System.nanoTime())); + new BlockReportContext(1, 0, System.nanoTime(), 0L)); proto = request.get(); assertNotNull(proto); assertFalse(proto.getReports(0).getBlocksList().isEmpty()); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportRateLimiting.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportRateLimiting.java new file mode 100644 index 0000000000..fc5f9e7cb3 --- /dev/null +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockReportRateLimiting.java @@ -0,0 +1,246 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License.
+ */ + +package org.apache.hadoop.hdfs.server.blockmanagement; + +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_FULL_BLOCK_REPORT_LEASES; +import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS; + +import com.google.common.base.Joiner; +import com.google.common.base.Supplier; +import com.google.common.util.concurrent.Uninterruptibles; +import org.apache.commons.lang.mutable.MutableObject; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.hdfs.protocol.DatanodeID; +import org.apache.hadoop.hdfs.server.datanode.DataNode; +import org.apache.hadoop.hdfs.server.protocol.BlockReportContext; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.log4j.Level; +import org.junit.Assert; +import org.junit.After; +import org.junit.BeforeClass; +import org.junit.Test; + +import java.io.IOException; +import java.util.HashSet; +import java.util.List; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Semaphore; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; + +public class TestBlockReportRateLimiting { + static final Log LOG = LogFactory.getLog(TestBlockReportRateLimiting.class); + + private static void setFailure(AtomicReference<String> failure, + String what) { + failure.compareAndSet("", what); + LOG.error("Test error: " + what); + } + + @After + public void restoreNormalBlockManagerFaultInjector() { + BlockManagerFaultInjector.instance = new BlockManagerFaultInjector(); + } + + @BeforeClass + public static void raiseBlockManagerLogLevels() { + GenericTestUtils.setLogLevel(BlockManager.LOG, Level.ALL); + GenericTestUtils.setLogLevel(BlockReportLeaseManager.LOG, Level.ALL); + } + + @Test(timeout=180000) + public void testRateLimitingDuringDataNodeStartup() throws Exception { + Configuration conf = new Configuration(); + conf.setInt(DFS_NAMENODE_MAX_FULL_BLOCK_REPORT_LEASES, 1); + conf.setLong(DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS, + 20L * 60L * 1000L); + + final Semaphore fbrSem = new Semaphore(0); + final HashSet<DatanodeID> expectedFbrDns = new HashSet<>(); + final HashSet<DatanodeID> fbrDns = new HashSet<>(); + final AtomicReference<String> failure = new AtomicReference<String>(""); + + final BlockManagerFaultInjector injector = new BlockManagerFaultInjector() { + private int numLeases = 0; + + @Override + public void incomingBlockReportRpc(DatanodeID nodeID, + BlockReportContext context) throws IOException { + LOG.info("Incoming full block report from " + nodeID + + ". Lease ID = 0x" + Long.toHexString(context.getLeaseId())); + if (context.getLeaseId() == 0) { + setFailure(failure, "Got unexpected rate-limiting-" + + "bypassing full block report RPC from " + nodeID); + } + fbrSem.acquireUninterruptibly(); + synchronized (this) { + fbrDns.add(nodeID); + if (!expectedFbrDns.remove(nodeID)) { + setFailure(failure, "Got unexpected full block report " + + "RPC from " + nodeID + ". expectedFbrDns = " + + Joiner.on(", ").join(expectedFbrDns)); + } + LOG.info("Proceeding with full block report from " + + nodeID + ".
Lease ID = 0x" + + Long.toHexString(context.getLeaseId())); + } + } + + @Override + public void requestBlockReportLease(DatanodeDescriptor node, + long leaseId) { + if (leaseId == 0) { + return; + } + synchronized (this) { + numLeases++; + expectedFbrDns.add(node); + LOG.info("requestBlockReportLease(node=" + node + + ", leaseId=0x" + Long.toHexString(leaseId) + "). " + + "expectedFbrDns = " + Joiner.on(", ").join(expectedFbrDns)); + if (numLeases > 1) { + setFailure(failure, "More than 1 lease was issued at once."); + } + } + } + + @Override + public void removeBlockReportLease(DatanodeDescriptor node, long leaseId) { + LOG.info("removeBlockReportLease(node=" + node + + ", leaseId=0x" + Long.toHexString(leaseId) + ")"); + synchronized (this) { + numLeases--; + } + } + }; + BlockManagerFaultInjector.instance = injector; + + final int NUM_DATANODES = 5; + MiniDFSCluster cluster = + new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build(); + cluster.waitActive(); + for (int n = 1; n <= NUM_DATANODES; n++) { + LOG.info("Waiting for " + n + " datanode(s) to report in."); + fbrSem.release(); + Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS); + final int currentN = n; + GenericTestUtils.waitFor(new Supplier<Boolean>() { + @Override + public Boolean get() { + synchronized (injector) { + if (fbrDns.size() > currentN) { + setFailure(failure, "Expected at most " + currentN + + " datanodes to have sent a block report, but actually " + + fbrDns.size() + " have."); + } + return (fbrDns.size() >= currentN); + } + } + }, 25, 50000); + } + cluster.shutdown(); + Assert.assertEquals("", failure.get()); + } + + /** + * Start a 2-node cluster with only one block report lease. When the + * first datanode gets a lease, kill it. Then wait for the lease to + * expire, and the second datanode to send a full block report.
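+ * The BlockManagerFaultInjector hooks are used to learn which datanode
+ * was granted the lease and to inject an IOException into that datanode's
+ * incoming block report RPC.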
+ */ + @Test(timeout=180000) + public void testLeaseExpiration() throws Exception { + Configuration conf = new Configuration(); + conf.setInt(DFS_NAMENODE_MAX_FULL_BLOCK_REPORT_LEASES, 1); + conf.setLong(DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS, 100L); + + final Semaphore gotFbrSem = new Semaphore(0); + final AtomicReference<String> failure = new AtomicReference<String>(""); + final AtomicReference<MiniDFSCluster> cluster = + new AtomicReference<>(null); + final BlockingQueue<Integer> datanodeToStop = + new ArrayBlockingQueue<Integer>(1); + final BlockManagerFaultInjector injector = new BlockManagerFaultInjector() { + private String uuidToStop = ""; + + @Override + public void incomingBlockReportRpc(DatanodeID nodeID, + BlockReportContext context) throws IOException { + if (context.getLeaseId() == 0) { + setFailure(failure, "Got unexpected rate-limiting-" + + "bypassing full block report RPC from " + nodeID); + } + synchronized (this) { + if (uuidToStop.equals(nodeID.getDatanodeUuid())) { + throw new IOException("Injecting failure into block " + + "report RPC for " + nodeID); + } + } + gotFbrSem.release(); + } + + @Override + public void requestBlockReportLease(DatanodeDescriptor node, + long leaseId) { + if (leaseId == 0) { + return; + } + synchronized (this) { + if (uuidToStop.isEmpty()) { + MiniDFSCluster cl; + do { + cl = cluster.get(); + } while (cl == null); + int datanodeIndexToStop = getDatanodeIndex(cl, node); + uuidToStop = node.getDatanodeUuid(); + datanodeToStop.add(Integer.valueOf(datanodeIndexToStop)); + } + } + } + + private int getDatanodeIndex(MiniDFSCluster cl, + DatanodeDescriptor node) { + List<DataNode> datanodes = cl.getDataNodes(); + for (int i = 0; i < datanodes.size(); i++) { + DataNode datanode = datanodes.get(i); + if (datanode.getDatanodeUuid().equals(node.getDatanodeUuid())) { + return i; + } + } + throw new RuntimeException("Failed to find UUID " + + node.getDatanodeUuid() + " in the list of datanodes."); + } + + @Override + public void removeBlockReportLease(DatanodeDescriptor node, long leaseId) { + } + }; + BlockManagerFaultInjector.instance = injector; + cluster.set(new MiniDFSCluster.Builder(conf).numDataNodes(2).build()); + cluster.get().waitActive(); + int datanodeIndexToStop = datanodeToStop.take(); + cluster.get().stopDataNode(datanodeIndexToStop); + gotFbrSem.acquire(); + cluster.get().shutdown(); + Assert.assertEquals("", failure.get()); + } +} diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java index bf167a58ba..39bd5d13aa 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestDatanodeManager.java @@ -59,6 +59,15 @@ public class TestDatanodeManager { //The number of times the registration / removal of nodes should happen final int NUM_ITERATIONS = 500; + private static DatanodeManager mockDatanodeManager( + FSNamesystem fsn, Configuration conf) throws IOException { + BlockManager bm = Mockito.mock(BlockManager.class); + BlockReportLeaseManager blm = new BlockReportLeaseManager(conf); + Mockito.when(bm.getBlockReportLeaseManager()).thenReturn(blm); + DatanodeManager dm = new DatanodeManager(bm, fsn, conf); + return dm; + } + /** * This test sends a random sequence of node registrations and node removals * to the DatanodeManager (of
nodes with different IDs and versions), and @@ -70,8 +79,7 @@ public void testNumVersionsReportedCorrect() throws IOException { //Create the DatanodeManager which will be tested FSNamesystem fsn = Mockito.mock(FSNamesystem.class); Mockito.when(fsn.hasWriteLock()).thenReturn(true); - DatanodeManager dm = new DatanodeManager(Mockito.mock(BlockManager.class), - fsn, new Configuration()); + DatanodeManager dm = mockDatanodeManager(fsn, new Configuration()); //Seed the RNG with a known value so test failures are easier to reproduce Random rng = new Random(); @@ -183,9 +191,8 @@ public void testRejectUnresolvedDatanodes() throws IOException { TestDatanodeManager.MyResolver.class, DNSToSwitchMapping.class); //create DatanodeManager - DatanodeManager dm = new DatanodeManager(Mockito.mock(BlockManager.class), - fsn, conf); - + DatanodeManager dm = mockDatanodeManager(fsn, conf); + //storageID to register. String storageID = "someStorageID-123"; @@ -258,7 +265,6 @@ public void testBadScript() throws IOException, URISyntaxException { HelperFunction("/"+ Shell.appendScriptExtension("topology-broken-script")); } - /** * Helper function that tests the DatanodeManagers SortedBlock function * we invoke this function with and without topology scripts @@ -281,8 +287,7 @@ public void HelperFunction(String scriptFileName) conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY, resourcePath.toString()); } - DatanodeManager dm = new DatanodeManager(Mockito.mock(BlockManager.class), - fsn, conf); + DatanodeManager dm = mockDatanodeManager(fsn, conf); // register 5 datanodes, each with different storage ID and type DatanodeInfo[] locs = new DatanodeInfo[5]; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java index d73f63e738..cea686596e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestNameNodePrunesMissingStorages.java @@ -115,7 +115,7 @@ private static void runTest(final String testCaseName, // Stop the DataNode and send fake heartbeat with missing storage. cluster.stopDataNode(0); cluster.getNameNodeRpc().sendHeartbeat(dnReg, prunedReports, 0L, 0L, 0, 0, - 0, null); + 0, null, true); // Check that the missing storage was pruned. 
assertThat(dnDescriptor.getStorageInfos().length, is(expectedStoragesAfterTest)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java index 64cc78bf21..f970b3f3cf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBPOfferService.java @@ -28,6 +28,7 @@ import java.net.InetSocketAddress; import java.util.List; import java.util.Map; +import java.util.concurrent.ThreadLocalRandom; import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; @@ -143,7 +144,8 @@ private DatanodeProtocolClientSideTranslatorPB setupNNMock(int nnIdx) Mockito.anyInt(), Mockito.anyInt(), Mockito.anyInt(), - Mockito.any(VolumeFailureSummary.class)); + Mockito.any(VolumeFailureSummary.class), + Mockito.anyBoolean()); mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0); return mock; } @@ -164,7 +166,8 @@ public HeartbeatAnswer(int nnIdx) { public HeartbeatResponse answer(InvocationOnMock invocation) throws Throwable { heartbeatCounts[nnIdx]++; return new HeartbeatResponse(new DatanodeCommand[0], - mockHaStatuses[nnIdx], null); + mockHaStatuses[nnIdx], null, + ThreadLocalRandom.current().nextLong() | 1L); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java index c65ef852bb..27d1ceac9d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockHasMultipleReplicasOnSameDN.java @@ -126,7 +126,7 @@ public void testBlockHasMultipleReplicasOnSameDN() throws IOException { // Should not assert! cluster.getNameNodeRpc().blockReport(dnReg, bpid, reports, - new BlockReportContext(1, 0, System.nanoTime())); + new BlockReportContext(1, 0, System.nanoTime(), 0L)); // Get the block locations once again. 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
index f91c0bc3cc..7552e109e2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBlockRecovery.java
@@ -39,6 +39,7 @@
 import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
+import java.util.concurrent.ThreadLocalRandom;
 import java.util.concurrent.atomic.AtomicBoolean;

 import org.apache.commons.logging.Log;
@@ -162,11 +163,12 @@ public DatanodeRegistration answer(InvocationOnMock invocation)
             Mockito.anyInt(),
             Mockito.anyInt(),
             Mockito.anyInt(),
-            Mockito.any(VolumeFailureSummary.class)))
+            Mockito.any(VolumeFailureSummary.class),
+            Mockito.anyBoolean()))
         .thenReturn(new HeartbeatResponse(
             new DatanodeCommand[0],
             new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),
-            null));
+            null, ThreadLocalRandom.current().nextLong() | 1L));

     dn = new DataNode(conf, locations, null) {
       @Override
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
index 0d7484c866..b9b6512d57 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestBpServiceActorScheduler.java
@@ -57,7 +57,7 @@ public void testInit() {
     for (final long now : getTimestamps()) {
       Scheduler scheduler = makeMockScheduler(now);
       assertTrue(scheduler.isHeartbeatDue(now));
-      assertTrue(scheduler.isBlockReportDue());
+      assertTrue(scheduler.isBlockReportDue(scheduler.monotonicNow()));
     }
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
index bf80887313..e784c7aaf1 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDatanodeProtocolRetryPolicy.java
@@ -27,6 +27,7 @@
 import java.net.InetSocketAddress;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.concurrent.ThreadLocalRandom;

 import com.google.common.base.Supplier;
 import org.apache.commons.logging.Log;
@@ -199,13 +200,13 @@ public HeartbeatResponse answer(InvocationOnMock invocation)
           heartbeatResponse = new HeartbeatResponse(
               new DatanodeCommand[]{RegisterCommand.REGISTER},
               new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),
-              null);
+              null, ThreadLocalRandom.current().nextLong() | 1L);
         } else {
           LOG.info("mockito heartbeatResponse " + i);
           heartbeatResponse = new HeartbeatResponse(
               new DatanodeCommand[0],
               new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),
-              null);
+              null, ThreadLocalRandom.current().nextLong() | 1L);
         }
         return heartbeatResponse;
       }
@@ -217,7 +218,8 @@ public HeartbeatResponse answer(InvocationOnMock invocation)
           Mockito.anyInt(),
           Mockito.anyInt(),
           Mockito.anyInt(),
-          Mockito.any(VolumeFailureSummary.class));
+          Mockito.any(VolumeFailureSummary.class),
+          Mockito.anyBoolean());

     dn = new DataNode(conf, locations, null) {
       @Override
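The TestBpServiceActorScheduler change above is small but telling: isBlockReportDue() now receives the current monotonic time instead of reading the clock internally, matching the injection style of the existing isHeartbeatDue(now). A hedged sketch of how a caller might drive the new signature (my wiring, not code from the patch):

    // 'scheduler' is the BPServiceActor.Scheduler under test. Passing the
    // timestamp in lets tests pin time instead of racing the wall clock.
    long now = scheduler.monotonicNow();
    if (scheduler.isHeartbeatDue(now) && scheduler.isBlockReportDue(now)) {
      // ... send the heartbeat, then the full block report, as the
      // actor loop would.
    }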
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
index 58932fbda1..cb4022ea91 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestFsDatasetCache.java
@@ -21,6 +21,7 @@
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
 import static org.mockito.Matchers.anyInt;
 import static org.mockito.Matchers.anyLong;
 import static org.mockito.Mockito.doReturn;
@@ -31,6 +32,7 @@
 import java.nio.channels.FileChannel;
 import java.util.HashSet;
 import java.util.Set;
+import java.util.concurrent.ThreadLocalRandom;

 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
@@ -159,11 +161,14 @@ private static void setHeartbeatResponse(DatanodeCommand[] cmds)
       throws IOException {
     NNHAStatusHeartbeat ha = new NNHAStatusHeartbeat(HAServiceState.ACTIVE,
         fsImage.getLastAppliedOrWrittenTxId());
-    HeartbeatResponse response = new HeartbeatResponse(cmds, ha, null);
+    HeartbeatResponse response =
+        new HeartbeatResponse(cmds, ha, null,
+            ThreadLocalRandom.current().nextLong() | 1L);
     doReturn(response).when(spyNN).sendHeartbeat(
         (DatanodeRegistration) any(),
         (StorageReport[]) any(), anyLong(), anyLong(),
-        anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any());
+        anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(),
+        anyBoolean());
   }

   private static DatanodeCommand[] cacheBlock(HdfsBlockLocation loc) {
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
index b150b0d345..67bbefe342 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesBlockReportPerStorage.java
@@ -40,7 +40,7 @@ protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
       LOG.info("Sending block report for storage " + report.getStorage().getStorageID());
       StorageBlockReport[] singletonReport = { report };
       cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport,
-          new BlockReportContext(reports.length, i, System.nanoTime()));
+          new BlockReportContext(reports.length, i, System.nanoTime(), 0L));
       i++;
     }
   }
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
index dca3c880bf..fd19ba6f0b 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestNNHandlesCombinedBlockReport.java
@@ -36,6 +36,6 @@ protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
       StorageBlockReport[] reports) throws IOException {
     LOG.info("Sending combined block reports for " + dnR);
     cluster.getNameNodeRpc().blockReport(dnR, poolId, reports,
-        new BlockReportContext(1, 0, System.nanoTime()));
+        new BlockReportContext(1, 0, System.nanoTime(), 0L));
   }
 }
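A recurring mechanical pattern in this patch: because the stubbed sendHeartbeat signature grew a parameter, every Mockito stub has to grow a matcher too, otherwise the stub no longer matches the call and the mock silently returns null. The shape used in TestFsDatasetCache above, condensed for reference:

    // One matcher per parameter; the trailing anyBoolean() covers the
    // new requestFullBlockReportLease argument.
    doReturn(response).when(spyNN).sendHeartbeat(
        (DatanodeRegistration) any(),
        (StorageReport[]) any(), anyLong(), anyLong(),
        anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(),
        anyBoolean());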
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
index ecb28dcf05..a6032c1ce2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestStorageReport.java
@@ -106,7 +106,7 @@ public void testStorageReportHasStorageTypeAndState() throws IOException {
         any(DatanodeRegistration.class),
         captor.capture(),
         anyLong(), anyLong(), anyInt(), anyInt(), anyInt(),
-        Mockito.any(VolumeFailureSummary.class));
+        Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean());

     StorageReport[] reports = captor.getValue();
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
index 2964f9a018..39894b5b09 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NNThroughputBenchmark.java
@@ -968,7 +968,7 @@ void register() throws IOException {
           new StorageBlockReport(storage, BlockListAsLongs.EMPTY) };
       dataNodeProto.blockReport(dnRegistration, bpid, reports,
-          new BlockReportContext(1, 0, System.nanoTime()));
+          new BlockReportContext(1, 0, System.nanoTime(), 0L));
     }

     /**
@@ -981,7 +981,7 @@ void sendHeartbeat() throws IOException {
       StorageReport[] rep = { new StorageReport(storage, false,
           DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
       DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep,
-          0L, 0L, 0, 0, 0, null).getCommands();
+          0L, 0L, 0, 0, 0, null, true).getCommands();
       if(cmds != null) {
         for (DatanodeCommand cmd : cmds ) {
           if(LOG.isDebugEnabled()) {
@@ -1030,7 +1030,7 @@ int replicateBlocks() throws IOException {
       StorageReport[] rep = { new StorageReport(storage, false, DF_CAPACITY,
           DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
       DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration,
-          rep, 0L, 0L, 0, 0, 0, null).getCommands();
+          rep, 0L, 0L, 0, 0, 0, null, true).getCommands();
       if (cmds != null) {
         for (DatanodeCommand cmd : cmds) {
           if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {
@@ -1213,7 +1213,7 @@ long executeOp(int daemonId, int inputIdx, String ignore) throws IOException {
       StorageBlockReport[] report = { new StorageBlockReport(
           dn.storage, dn.getBlockReportList()) };
       dataNodeProto.blockReport(dn.dnRegistration, bpid, report,
-          new BlockReportContext(1, 0, System.nanoTime()));
+          new BlockReportContext(1, 0, System.nanoTime(), 0L));
       long end = Time.now();
       return end-start;
     }
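NNThroughputBenchmark's simulated datanodes always request a lease (the trailing true) but still stamp their reports with leaseId 0L. For contrast, a hedged sketch of what an end-to-end lease handoff would look like with these same APIs, if a caller wanted to honor the lease machinery (my wiring, not the benchmark's):

    // 1. Request a lease on the heartbeat.
    HeartbeatResponse hr = dataNodeProto.sendHeartbeat(dnRegistration, rep,
        0L, 0L, 0, 0, 0, null, true);
    long leaseId = hr.getFullBlockReportLeaseId();
    // 2. If granted (apparently signalled by a nonzero id), attach it
    //    to the block report context.
    if (leaseId != 0) {
      dataNodeProto.blockReport(dnRegistration, bpid, reports,
          new BlockReportContext(1, 0, System.nanoTime(), leaseId));
    }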
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
index 4ca5edaf16..b314584987 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/NameNodeAdapter.java
@@ -117,7 +117,7 @@ public static HeartbeatResponse sendHeartBeat(DatanodeRegistration nodeReg,
       DatanodeDescriptor dd, FSNamesystem namesystem) throws IOException {
     return namesystem.handleHeartbeat(nodeReg,
         BlockManagerTestUtil.getStorageReportsForDatanode(dd),
-        dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null);
+        dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null, true);
   }

   public static boolean setReplication(final FSNamesystem ns,
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
index 92c329e332..ff70c3f144 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/TestDeadDatanode.java
@@ -109,7 +109,7 @@ public void testDeadDatanode() throws Exception {
         BlockListAsLongs.EMPTY) };
     try {
       dnp.blockReport(reg, poolId, report,
-          new BlockReportContext(1, 0, System.nanoTime()));
+          new BlockReportContext(1, 0, System.nanoTime(), 0L));
       fail("Expected IOException is not thrown");
     } catch (IOException ex) {
       // Expected
@@ -120,8 +120,8 @@ public void testDeadDatanode() throws Exception {
     StorageReport[] rep = { new StorageReport(
         new DatanodeStorage(reg.getDatanodeUuid()),
         false, 0, 0, 0, 0) };
-    DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null)
-        .getCommands();
+    DatanodeCommand[] cmd =
+        dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null, true).getCommands();
     assertEquals(1, cmd.length);
     assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER
         .getAction());
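Taken together, the test changes encode two conventions of the new lease protocol: a report sent outside the lease machinery carries leaseId 0L in its BlockReportContext, and a granted lease is any nonzero id returned on the heartbeat. The constructor shape, with argument roles as read off the calls above (the annotations are my reading, not the patch's documentation):

    BlockReportContext ctx = new BlockReportContext(
        1,                 // totalRpcs: RPCs this report is split across
        0,                 // curRpc: index of this RPC within the report
        System.nanoTime(), // reportId
        0L);               // leaseId: 0 appears to mean "no lease"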