HDFS-7923. The DataNodes should rate-limit their full block reports by asking the NN on heartbeat messages (cmccabe)

Colin Patrick Mccabe 2015-06-12 11:17:51 -07:00
parent e4489d97e5
commit 12b5b06c06
34 changed files with 890 additions and 71 deletions

View File

@@ -434,6 +434,10 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_BLOCKREPORT_INITIAL_DELAY_DEFAULT = 0;
public static final String DFS_BLOCKREPORT_SPLIT_THRESHOLD_KEY = "dfs.blockreport.split.threshold";
public static final long DFS_BLOCKREPORT_SPLIT_THRESHOLD_DEFAULT = 1000 * 1000;
public static final String DFS_NAMENODE_MAX_FULL_BLOCK_REPORT_LEASES = "dfs.namenode.max.full.block.report.leases";
public static final int DFS_NAMENODE_MAX_FULL_BLOCK_REPORT_LEASES_DEFAULT = 6;
public static final String DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS = "dfs.namenode.full.block.report.lease.length.ms";
public static final long DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS_DEFAULT = 5L * 60L * 1000L;
public static final String DFS_CACHEREPORT_INTERVAL_MSEC_KEY = "dfs.cachereport.intervalMsec";
public static final long DFS_CACHEREPORT_INTERVAL_MSEC_DEFAULT = 10 * 1000;
public static final String DFS_BLOCK_INVALIDATE_LIMIT_KEY = "dfs.block.invalidate.limit";
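The two new keys are ordinary NameNode-side settings; below is a minimal sketch of overriding them programmatically, using only the constants added above (the wrapper class and method names are hypothetical, for illustration):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;

public class LeaseConfSketch {
  // Hypothetical helper: builds a Configuration with tighter lease limits.
  public static Configuration leaseTunedConf() {
    Configuration conf = new Configuration();
    // Hand out at most 4 full-block-report leases at once (default: 6).
    conf.setInt(DFSConfigKeys.DFS_NAMENODE_MAX_FULL_BLOCK_REPORT_LEASES, 4);
    // Expire an unused lease after 10 minutes (default: 5 minutes).
    conf.setLong(DFSConfigKeys.DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS,
        10L * 60L * 1000L);
    return conf;
  }
}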

View File

@@ -132,11 +132,13 @@ public DatanodeRegistration registerDatanode(DatanodeRegistration registration
public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
StorageReport[] reports, long cacheCapacity, long cacheUsed,
int xmitsInProgress, int xceiverCount, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) throws IOException {
VolumeFailureSummary volumeFailureSummary,
boolean requestFullBlockReportLease) throws IOException {
HeartbeatRequestProto.Builder builder = HeartbeatRequestProto.newBuilder()
.setRegistration(PBHelper.convert(registration))
.setXmitsInProgress(xmitsInProgress).setXceiverCount(xceiverCount)
.setFailedVolumes(failedVolumes);
.setFailedVolumes(failedVolumes)
.setRequestFullBlockReportLease(requestFullBlockReportLease);
builder.addAllReports(PBHelper.convertStorageReports(reports));
if (cacheCapacity != 0) {
builder.setCacheCapacity(cacheCapacity);
@@ -165,7 +167,7 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
rollingUpdateStatus = PBHelper.convert(resp.getRollingUpgradeStatus());
}
return new HeartbeatResponse(cmds, PBHelper.convert(resp.getHaStatus()),
rollingUpdateStatus);
rollingUpdateStatus, resp.getFullBlockReportLeaseId());
}
@Override

View File

@@ -114,7 +114,7 @@ public HeartbeatResponseProto sendHeartbeat(RpcController controller,
report, request.getCacheCapacity(), request.getCacheUsed(),
request.getXmitsInProgress(),
request.getXceiverCount(), request.getFailedVolumes(),
volumeFailureSummary);
volumeFailureSummary, request.getRequestFullBlockReportLease());
} catch (IOException e) {
throw new ServiceException(e);
}
@@ -135,6 +135,7 @@ public HeartbeatResponseProto sendHeartbeat(RpcController controller,
builder.setRollingUpgradeStatus(PBHelper
.convertRollingUpgradeStatus(rollingUpdateStatus));
}
builder.setFullBlockReportLeaseId(response.getFullBlockReportLeaseId());
return builder.build();
}

View File

@@ -3042,7 +3042,7 @@ public static boolean[] convertBooleanList(
public static BlockReportContext convert(BlockReportContextProto proto) {
return new BlockReportContext(proto.getTotalRpcs(),
proto.getCurRpc(), proto.getId());
proto.getCurRpc(), proto.getId(), proto.getLeaseId());
}
public static BlockReportContextProto convert(BlockReportContext context) {
@@ -3050,6 +3050,7 @@ public static BlockReportContextProto convert(BlockReportContext context) {
setTotalRpcs(context.getTotalRpcs()).
setCurRpc(context.getCurRpc()).
setId(context.getReportId()).
setLeaseId(context.getLeaseId()).
build();
}
}

View File

@@ -75,6 +75,7 @@
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations;
import org.apache.hadoop.hdfs.server.protocol.BlocksWithLocations.BlockWithLocations;
import org.apache.hadoop.hdfs.server.protocol.DatanodeCommand;
import org.apache.hadoop.hdfs.server.protocol.DatanodeRegistration;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage.State;
import org.apache.hadoop.hdfs.server.protocol.KeyUpdateCommand;
@@ -124,6 +125,7 @@ public class BlockManager {
private final AtomicLong excessBlocksCount = new AtomicLong(0L);
private final AtomicLong postponedMisreplicatedBlocksCount = new AtomicLong(0L);
private final long startupDelayBlockDeletionInMs;
private final BlockReportLeaseManager blockReportLeaseManager;
/** Used by metrics */
public long getPendingReplicationBlocksCount() {
@@ -348,6 +350,7 @@ public BlockManager(final Namesystem namesystem, final Configuration conf)
this.numBlocksPerIteration = conf.getInt(
DFSConfigKeys.DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT,
DFSConfigKeys.DFS_BLOCK_MISREPLICATION_PROCESSING_LIMIT_DEFAULT);
this.blockReportLeaseManager = new BlockReportLeaseManager(conf);
LOG.info("defaultReplication = " + defaultReplication);
LOG.info("maxReplication = " + maxReplication);
@@ -1713,6 +1716,27 @@ private void processPendingReplications() {
}
}
public long requestBlockReportLeaseId(DatanodeRegistration nodeReg) {
assert namesystem.hasReadLock();
DatanodeDescriptor node = null;
try {
node = datanodeManager.getDatanode(nodeReg);
} catch (UnregisteredNodeException e) {
LOG.warn("Unregistered datanode {}", nodeReg);
return 0;
}
if (node == null) {
LOG.warn("Failed to find datanode {}", nodeReg);
return 0;
}
// Request a new block report lease. The BlockReportLeaseManager has
// its own internal locking.
long leaseId = blockReportLeaseManager.requestLease(node);
BlockManagerFaultInjector.getInstance().
requestBlockReportLease(node, leaseId);
return leaseId;
}
/**
* StatefulBlockInfo is used to build the "toUC" list, which is a list of
* updates to the information about under-construction blocks.
@@ -1817,6 +1841,12 @@ public boolean processReport(final DatanodeID nodeID,
+ " because namenode still in startup phase", nodeID);
return !node.hasStaleStorages();
}
if (context != null) {
if (!blockReportLeaseManager.checkLease(node, startTime,
context.getLeaseId())) {
return false;
}
}
if (storageInfo.getBlockReportCount() == 0) {
// The first block report can be processed a lot more efficiently than
@@ -1835,6 +1865,9 @@ public boolean processReport(final DatanodeID nodeID,
if (lastStorageInRpc) {
int rpcsSeen = node.updateBlockReportContext(context);
if (rpcsSeen >= context.getTotalRpcs()) {
long leaseId = blockReportLeaseManager.removeLease(node);
BlockManagerFaultInjector.getInstance().
removeBlockReportLease(node, leaseId);
List<DatanodeStorageInfo> zombies = node.removeZombieStorages();
if (zombies.isEmpty()) {
LOG.debug("processReport 0x{}: no zombie storages found.",
@@ -3845,4 +3878,8 @@ public void clear() {
clearQueues();
blocksMap.clear();
}
public BlockReportLeaseManager getBlockReportLeaseManager() {
return blockReportLeaseManager;
}
}

View File

@@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import java.io.IOException;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
/**
* Used to inject certain faults for testing.
*/
public class BlockManagerFaultInjector {
@VisibleForTesting
public static BlockManagerFaultInjector instance =
new BlockManagerFaultInjector();
@VisibleForTesting
public static BlockManagerFaultInjector getInstance() {
return instance;
}
@VisibleForTesting
public void incomingBlockReportRpc(DatanodeID nodeID,
BlockReportContext context) throws IOException {
}
@VisibleForTesting
public void requestBlockReportLease(DatanodeDescriptor node, long leaseId) {
}
@VisibleForTesting
public void removeBlockReportLease(DatanodeDescriptor node, long leaseId) {
}
}

View File

@@ -0,0 +1,355 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.util.Time;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.HashMap;
import java.util.concurrent.ThreadLocalRandom;
/**
* The BlockReportLeaseManager manages block report leases.<p/>
*
* DataNodes request BR leases from the NameNode by sending a heartbeat with
* the requestBlockReportLease field set. The NameNode may choose to respond
* with a non-zero lease ID. If so, that DataNode can send a block report with
* the given lease ID for the next few minutes. The NameNode will accept
* these full block reports.<p/>
*
* BR leases limit the number of incoming full block reports to the NameNode
* at any given time. For compatibility reasons, the NN will always accept
* block reports sent with a lease ID of 0 and queue them for processing
* immediately. Full block reports which were manually triggered will also
* have a lease ID of 0, bypassing the rate-limiting.<p/>
*
* Block report leases expire after a certain amount of time. This mechanism
* is in place so that a DN which dies while holding a lease does not
* permanently decrease the number of concurrent block reports which the NN is
* willing to accept.<p/>
*
* When considering which DNs to grant a BR lease, the NameNode gives priority
* to the DNs which have gone the longest without sending a full block
* report.<p/>
*/
class BlockReportLeaseManager {
static final Logger LOG =
LoggerFactory.getLogger(BlockReportLeaseManager.class);
private static class NodeData {
/**
* The UUID of the datanode.
*/
final String datanodeUuid;
/**
* The lease ID, or 0 if there is no lease.
*/
long leaseId;
/**
* The time when the lease was issued, or 0 if there is no lease.
*/
long leaseTimeMs;
/**
* Previous element in the list.
*/
NodeData prev;
/**
* Next element in the list.
*/
NodeData next;
static NodeData ListHead(String name) {
NodeData node = new NodeData(name);
node.next = node;
node.prev = node;
return node;
}
NodeData(String datanodeUuid) {
this.datanodeUuid = datanodeUuid;
}
void removeSelf() {
if (this.prev != null) {
this.prev.next = this.next;
}
if (this.next != null) {
this.next.prev = this.prev;
}
this.next = null;
this.prev = null;
}
void addToEnd(NodeData node) {
Preconditions.checkState(node.next == null);
Preconditions.checkState(node.prev == null);
node.prev = this.prev;
node.next = this;
this.prev.next = node;
this.prev = node;
}
void addToBeginning(NodeData node) {
Preconditions.checkState(node.next == null);
Preconditions.checkState(node.prev == null);
node.next = this.next;
node.prev = this;
this.next.prev = node;
this.next = node;
}
}
/**
* List of datanodes which don't currently have block report leases.
*/
private final NodeData deferredHead = NodeData.ListHead("deferredHead");
/**
* List of datanodes which currently have block report leases.
*/
private final NodeData pendingHead = NodeData.ListHead("pendingHead");
/**
* Maps datanode UUIDs to NodeData.
*/
private final HashMap<String, NodeData> nodes = new HashMap<>();
/**
* The current length of the pending list.
*/
private int numPending = 0;
/**
* The maximum number of leases to hand out at any given time.
*/
private final int maxPending;
/**
* The number of milliseconds after which a lease will expire.
*/
private final long leaseExpiryMs;
/**
* The next ID we will use for a block report lease.
*/
private long nextId = ThreadLocalRandom.current().nextLong();
BlockReportLeaseManager(Configuration conf) {
this(conf.getInt(
DFSConfigKeys.DFS_NAMENODE_MAX_FULL_BLOCK_REPORT_LEASES,
DFSConfigKeys.DFS_NAMENODE_MAX_FULL_BLOCK_REPORT_LEASES_DEFAULT),
conf.getLong(
DFSConfigKeys.DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS,
DFSConfigKeys.DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS_DEFAULT));
}
BlockReportLeaseManager(int maxPending, long leaseExpiryMs) {
Preconditions.checkArgument(maxPending >= 1,
"Cannot set the maximum number of block report leases to a " +
"value less than 1.");
this.maxPending = maxPending;
Preconditions.checkArgument(leaseExpiryMs >= 1,
"Cannot set full block report lease expiry period to a value " +
"less than 1.");
this.leaseExpiryMs = leaseExpiryMs;
}
/**
* Get the next block report lease ID. Any number is valid except 0.
*/
private synchronized long getNextId() {
long id;
do {
id = nextId++;
} while (id == 0);
return id;
}
public synchronized void register(DatanodeDescriptor dn) {
registerNode(dn);
}
private synchronized NodeData registerNode(DatanodeDescriptor dn) {
if (nodes.containsKey(dn.getDatanodeUuid())) {
LOG.info("Can't register DN {} because it is already registered.",
dn.getDatanodeUuid());
return null;
}
NodeData node = new NodeData(dn.getDatanodeUuid());
deferredHead.addToBeginning(node);
nodes.put(dn.getDatanodeUuid(), node);
LOG.info("Registered DN {} ({}).", dn.getDatanodeUuid(), dn.getXferAddr());
return node;
}
private synchronized void remove(NodeData node) {
if (node.leaseId != 0) {
numPending--;
node.leaseId = 0;
node.leaseTimeMs = 0;
}
node.removeSelf();
}
public synchronized void unregister(DatanodeDescriptor dn) {
NodeData node = nodes.remove(dn.getDatanodeUuid());
if (node == null) {
LOG.info("Can't unregister DN {} because it is not currently " +
"registered.", dn.getDatanodeUuid());
return;
}
remove(node);
}
public synchronized long requestLease(DatanodeDescriptor dn) {
NodeData node = nodes.get(dn.getDatanodeUuid());
if (node == null) {
LOG.warn("DN {} ({}) requested a lease even though it wasn't yet " +
"registered. Registering now.", dn.getDatanodeUuid(),
dn.getXferAddr());
node = registerNode(dn);
}
if (node.leaseId != 0) {
// The DataNode wants a new lease, even though it already has one.
// This can happen if the DataNode is restarted in between requesting
// a lease and using it.
LOG.debug("Removing existing BR lease 0x{} for DN {} in order to " +
"issue a new one.", Long.toHexString(node.leaseId),
dn.getDatanodeUuid());
}
remove(node);
long monotonicNowMs = Time.monotonicNow();
pruneExpiredPending(monotonicNowMs);
if (numPending >= maxPending) {
if (LOG.isDebugEnabled()) {
StringBuilder allLeases = new StringBuilder();
String prefix = "";
for (NodeData cur = pendingHead.next; cur != pendingHead;
cur = cur.next) {
allLeases.append(prefix).append(cur.datanodeUuid);
prefix = ", ";
}
LOG.debug("Can't create a new BR lease for DN {}, because " +
"numPending equals maxPending at {}. Current leases: {}",
dn.getDatanodeUuid(), numPending, allLeases.toString());
}
return 0;
}
numPending++;
node.leaseId = getNextId();
node.leaseTimeMs = monotonicNowMs;
pendingHead.addToEnd(node);
if (LOG.isDebugEnabled()) {
LOG.debug("Created a new BR lease 0x{} for DN {}. numPending = {}",
Long.toHexString(node.leaseId), dn.getDatanodeUuid(), numPending);
}
return node.leaseId;
}
private synchronized boolean pruneIfExpired(long monotonicNowMs,
NodeData node) {
if (monotonicNowMs < node.leaseTimeMs + leaseExpiryMs) {
return false;
}
LOG.info("Removing expired block report lease 0x{} for DN {}.",
Long.toHexString(node.leaseId), node.datanodeUuid);
Preconditions.checkState(node.leaseId != 0);
remove(node);
deferredHead.addToBeginning(node);
return true;
}
private synchronized void pruneExpiredPending(long monotonicNowMs) {
NodeData cur = pendingHead.next;
while (cur != pendingHead) {
NodeData next = cur.next;
if (!pruneIfExpired(monotonicNowMs, cur)) {
return;
}
cur = next;
}
LOG.trace("No entries remaining in the pending list.");
}
public synchronized boolean checkLease(DatanodeDescriptor dn,
long monotonicNowMs, long id) {
if (id == 0) {
LOG.debug("Datanode {} is using BR lease id 0x0 to bypass " +
"rate-limiting.", dn.getDatanodeUuid());
return true;
}
NodeData node = nodes.get(dn.getDatanodeUuid());
if (node == null) {
LOG.info("BR lease 0x{} is not valid for unknown datanode {}",
Long.toHexString(id), dn.getDatanodeUuid());
return false;
}
if (node.leaseId == 0) {
LOG.warn("BR lease 0x{} is not valid for DN {}, because the DN " +
"is not in the pending set.",
Long.toHexString(id), dn.getDatanodeUuid());
return false;
}
if (pruneIfExpired(monotonicNowMs, node)) {
LOG.warn("BR lease 0x{} is not valid for DN {}, because the lease " +
"has expired.", Long.toHexString(id), dn.getDatanodeUuid());
return false;
}
if (id != node.leaseId) {
LOG.warn("BR lease 0x{} is not valid for DN {}. Expected BR lease 0x{}.",
Long.toHexString(id), dn.getDatanodeUuid(),
Long.toHexString(node.leaseId));
return false;
}
if (LOG.isTraceEnabled()) {
LOG.trace("BR lease 0x{} is valid for DN {}.",
Long.toHexString(id), dn.getDatanodeUuid());
}
return true;
}
public synchronized long removeLease(DatanodeDescriptor dn) {
NodeData node = nodes.get(dn.getDatanodeUuid());
if (node == null) {
LOG.info("Can't remove lease for unknown datanode {}",
dn.getDatanodeUuid());
return 0;
}
long id = node.leaseId;
if (id == 0) {
LOG.debug("DN {} has no lease to remove.", dn.getDatanodeUuid());
return 0;
}
remove(node);
deferredHead.addToEnd(node);
if (LOG.isTraceEnabled()) {
LOG.trace("Removed BR lease 0x{} for DN {}. numPending = {}",
Long.toHexString(id), dn.getDatanodeUuid(), numPending);
}
return id;
}
}
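To make the lifecycle from the class javadoc concrete, here is a hedged sketch of the calls the NameNode side makes around one full block report. It assumes a class in the same package (the manager is package-private) and an already-constructed DatanodeDescriptor; only methods shown in the diff above are used:

package org.apache.hadoop.hdfs.server.blockmanagement;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.Time;

class LeaseLifecycleSketch {
  // Illustrative walk-through, not part of the patch.
  static void oneReportCycle(Configuration conf, DatanodeDescriptor dn) {
    BlockReportLeaseManager mgr = new BlockReportLeaseManager(conf);
    mgr.register(dn);                     // normally done in addDatanode()

    // A heartbeat arrives with requestFullBlockReportLease = true:
    long leaseId = mgr.requestLease(dn);  // 0 when maxPending leases are out

    // Each block report RPC is validated before processing:
    if (mgr.checkLease(dn, Time.monotonicNow(), leaseId)) {
      // ... process the reported storages ...
    }

    // After the last RPC of the report, the lease is returned and the
    // DN moves back to the deferred list:
    mgr.removeLease(dn);
  }
}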

View File

@@ -540,6 +540,7 @@ private void removeDatanode(DatanodeDescriptor nodeInfo) {
blockManager.removeBlocksAssociatedTo(nodeInfo);
networktopology.remove(nodeInfo);
decrementVersionCount(nodeInfo.getSoftwareVersion());
blockManager.getBlockReportLeaseManager().unregister(nodeInfo);
if (LOG.isDebugEnabled()) {
LOG.debug("remove datanode " + nodeInfo);
@@ -602,6 +603,7 @@ void addDatanode(final DatanodeDescriptor node) {
networktopology.add(node); // may throw InvalidTopologyException
host2DatanodeMap.add(node);
checkIfClusterIsNowMultiRack(node);
blockManager.getBlockReportLeaseManager().register(node);
if (LOG.isDebugEnabled()) {
LOG.debug(getClass().getSimpleName() + ".addDatanode: "

View File

@@ -29,6 +29,8 @@
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import com.google.common.base.Joiner;
import org.apache.commons.logging.Log;
@@ -355,9 +357,10 @@ void notifyNamenodeDeletedBlock(
void triggerBlockReportForTests() {
synchronized (pendingIncrementalBRperStorage) {
scheduler.scheduleHeartbeat();
long nextBlockReportTime = scheduler.scheduleBlockReport(0);
long oldBlockReportTime = scheduler.nextBlockReportTime;
scheduler.forceFullBlockReportNow();
pendingIncrementalBRperStorage.notifyAll();
while (nextBlockReportTime - scheduler.nextBlockReportTime >= 0) {
while (oldBlockReportTime == scheduler.nextBlockReportTime) {
try {
pendingIncrementalBRperStorage.wait(100);
} catch (InterruptedException e) {
@@ -419,12 +422,7 @@ private long generateUniqueBlockReportId() {
* @return DatanodeCommands returned by the NN. May be null.
* @throws IOException
*/
List<DatanodeCommand> blockReport() throws IOException {
// send block report if timer has expired.
if (!scheduler.isBlockReportDue()) {
return null;
}
List<DatanodeCommand> blockReport(long fullBrLeaseId) throws IOException {
final ArrayList<DatanodeCommand> cmds = new ArrayList<DatanodeCommand>();
// Flush any block information that precedes the block report. Otherwise
@@ -460,7 +458,7 @@ List<DatanodeCommand> blockReport() throws IOException {
// Below split threshold, send all reports in a single message.
DatanodeCommand cmd = bpNamenode.blockReport(
bpRegistration, bpos.getBlockPoolId(), reports,
new BlockReportContext(1, 0, reportId));
new BlockReportContext(1, 0, reportId, fullBrLeaseId));
numRPCs = 1;
numReportsSent = reports.length;
if (cmd != null) {
@@ -472,7 +470,8 @@ List<DatanodeCommand> blockReport() throws IOException {
StorageBlockReport singleReport[] = { reports[r] };
DatanodeCommand cmd = bpNamenode.blockReport(
bpRegistration, bpos.getBlockPoolId(), singleReport,
new BlockReportContext(reports.length, r, reportId));
new BlockReportContext(reports.length, r, reportId,
fullBrLeaseId));
numReportsSent++;
numRPCs++;
if (cmd != null) {
@@ -538,7 +537,8 @@ DatanodeCommand cacheReport() throws IOException {
return cmd;
}
HeartbeatResponse sendHeartBeat() throws IOException {
HeartbeatResponse sendHeartBeat(boolean requestBlockReportLease)
throws IOException {
StorageReport[] reports =
dn.getFSDataset().getStorageReports(bpos.getBlockPoolId());
if (LOG.isDebugEnabled()) {
@@ -557,7 +557,8 @@ HeartbeatResponse sendHeartBeat() throws IOException {
dn.getXmitsInProgress(),
dn.getXceiverCount(),
numFailedVolumes,
volumeFailureSummary);
volumeFailureSummary,
requestBlockReportLease);
}
//This must be called only by BPOfferService
@@ -625,8 +626,9 @@ private void offerService() throws Exception {
LOG.info("For namenode " + nnAddr + " using"
+ " BLOCKREPORT_INTERVAL of " + dnConf.blockReportInterval + "msec"
+ " CACHEREPORT_INTERVAL of " + dnConf.cacheReportInterval + "msec"
+ " Initial delay: " + dnConf.initialBlockReportDelay + "msec"
+ " Initial delay: " + dnConf.initialBlockReportDelayMs + "msec"
+ "; heartBeatInterval=" + dnConf.heartBeatInterval);
long fullBlockReportLeaseId = 0;
//
// Now loop for a long time....
@@ -639,6 +641,7 @@ private void offerService() throws Exception {
// Every so often, send heartbeat or block-report
//
final boolean sendHeartbeat = scheduler.isHeartbeatDue(startTime);
HeartbeatResponse resp = null;
if (sendHeartbeat) {
//
// All heartbeat messages include following info:
@@ -647,10 +650,23 @@ private void offerService() throws Exception {
// -- Total capacity
// -- Bytes remaining
//
boolean requestBlockReportLease = (fullBlockReportLeaseId == 0) &&
scheduler.isBlockReportDue(startTime);
scheduler.scheduleNextHeartbeat();
if (!dn.areHeartbeatsDisabledForTests()) {
HeartbeatResponse resp = sendHeartBeat();
resp = sendHeartBeat(requestBlockReportLease);
assert resp != null;
if (resp.getFullBlockReportLeaseId() != 0) {
if (fullBlockReportLeaseId != 0) {
LOG.warn(nnAddr + " sent back a full block report lease " +
"ID of 0x" +
Long.toHexString(resp.getFullBlockReportLeaseId()) +
", but we already have a lease ID of 0x" +
Long.toHexString(fullBlockReportLeaseId) + ". " +
"Overwriting old lease ID.");
}
fullBlockReportLeaseId = resp.getFullBlockReportLeaseId();
}
dn.getMetrics().addHeartbeat(scheduler.monotonicNow() - startTime);
// If the state of this NN has changed (eg STANDBY->ACTIVE)
@@ -682,7 +698,16 @@ private void offerService() throws Exception {
reportReceivedDeletedBlocks();
}
List<DatanodeCommand> cmds = blockReport();
List<DatanodeCommand> cmds = null;
boolean forceFullBr =
scheduler.forceFullBlockReport.getAndSet(false);
if (forceFullBr) {
LOG.info("Forcing a full block report to " + nnAddr);
}
if ((fullBlockReportLeaseId != 0) || forceFullBr) {
cmds = blockReport(fullBlockReportLeaseId);
fullBlockReportLeaseId = 0;
}
processCommand(cmds == null ? null : cmds.toArray(new DatanodeCommand[cmds.size()]));
DatanodeCommand cmd = cacheReport();
@@ -765,7 +790,7 @@ void register(NamespaceInfo nsInfo) throws IOException {
bpos.registrationSucceeded(this, bpRegistration);
// random short delay - helps scatter the BR from all DNs
scheduler.scheduleBlockReport(dnConf.initialBlockReportDelay);
scheduler.scheduleBlockReport(dnConf.initialBlockReportDelayMs);
}
@@ -958,7 +983,7 @@ void triggerBlockReport(BlockReportOptions options) throws IOException {
} else {
LOG.info(bpos.toString() + ": scheduling a full block report.");
synchronized(pendingIncrementalBRperStorage) {
scheduler.scheduleBlockReport(0);
scheduler.forceFullBlockReportNow();
pendingIncrementalBRperStorage.notifyAll();
}
}
@@ -1011,6 +1036,9 @@ static class Scheduler {
@VisibleForTesting
boolean resetBlockReportTime = true;
private final AtomicBoolean forceFullBlockReport =
new AtomicBoolean(false);
private final long heartbeatIntervalMs;
private final long blockReportIntervalMs;
@@ -1042,8 +1070,13 @@ boolean isHeartbeatDue(long startTime) {
return (nextHeartbeatTime - startTime <= 0);
}
boolean isBlockReportDue() {
return nextBlockReportTime - monotonicNow() <= 0;
boolean isBlockReportDue(long curTime) {
return nextBlockReportTime - curTime <= 0;
}
void forceFullBlockReportNow() {
forceFullBlockReport.set(true);
resetBlockReportTime = true;
}
/**
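Condensed, the new DataNode-side behavior in offerService() amounts to the loop below. This is a paraphrase of the changes above, not extra behavior; shouldRun() stands in for the actor's real loop condition:

// Paraphrase of offerService(): lease acquisition and single-use consumption.
long fullBlockReportLeaseId = 0;
while (shouldRun()) {                     // placeholder loop condition
  long startTime = scheduler.monotonicNow();
  if (scheduler.isHeartbeatDue(startTime)) {
    // Ask for a lease only when an FBR is due and we hold none.
    boolean wantLease = (fullBlockReportLeaseId == 0)
        && scheduler.isBlockReportDue(startTime);
    HeartbeatResponse resp = sendHeartBeat(wantLease);
    if (resp.getFullBlockReportLeaseId() != 0) {
      fullBlockReportLeaseId = resp.getFullBlockReportLeaseId();
    }
  }
  boolean forceFullBr = scheduler.forceFullBlockReport.getAndSet(false);
  if (fullBlockReportLeaseId != 0 || forceFullBr) {
    blockReport(fullBlockReportLeaseId);  // lease ID 0 bypasses rate-limiting
    fullBlockReportLeaseId = 0;           // a lease is good for one report
  }
}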

View File

@@ -82,7 +82,7 @@ public class DNConf {
final long heartBeatInterval;
final long blockReportInterval;
final long blockReportSplitThreshold;
final long initialBlockReportDelay;
final long initialBlockReportDelayMs;
final long cacheReportInterval;
final long dfsclientSlowIoWarningThresholdMs;
final long datanodeSlowIoWarningThresholdMs;
@@ -159,7 +159,7 @@ public DNConf(Configuration conf) {
+ "greater than or equal to " + "dfs.blockreport.intervalMsec."
+ " Setting initial delay to 0 msec:");
}
initialBlockReportDelay = initBRDelay;
initialBlockReportDelayMs = initBRDelay;
heartBeatInterval = conf.getLong(DFS_HEARTBEAT_INTERVAL_KEY,
DFS_HEARTBEAT_INTERVAL_DEFAULT) * 1000L;

View File

@@ -3976,7 +3976,8 @@ String getRegistrationID() {
HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
StorageReport[] reports, long cacheCapacity, long cacheUsed,
int xceiverCount, int xmitsInProgress, int failedVolumes,
VolumeFailureSummary volumeFailureSummary) throws IOException {
VolumeFailureSummary volumeFailureSummary,
boolean requestFullBlockReportLease) throws IOException {
readLock();
try {
//get datanode commands
@@ -3985,13 +3986,17 @@ HeartbeatResponse handleHeartbeat(DatanodeRegistration nodeReg,
DatanodeCommand[] cmds = blockManager.getDatanodeManager().handleHeartbeat(
nodeReg, reports, blockPoolId, cacheCapacity, cacheUsed,
xceiverCount, maxTransfer, failedVolumes, volumeFailureSummary);
long blockReportLeaseId = 0;
if (requestFullBlockReportLease) {
blockReportLeaseId = blockManager.requestBlockReportLeaseId(nodeReg);
}
//create ha status
final NNHAStatusHeartbeat haState = new NNHAStatusHeartbeat(
haContext.getState().getServiceState(),
getFSImage().getLastAppliedOrWrittenTxId());
return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo);
return new HeartbeatResponse(cmds, haState, rollingUpgradeInfo,
blockReportLeaseId);
} finally {
readUnlock();
}

View File

@@ -116,6 +116,7 @@
import org.apache.hadoop.hdfs.security.token.block.ExportedBlockKeys;
import org.apache.hadoop.hdfs.security.token.delegation.DelegationTokenIdentifier;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManager;
import org.apache.hadoop.hdfs.server.blockmanagement.BlockManagerFaultInjector;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants;
import org.apache.hadoop.hdfs.server.common.HdfsServerConstants.NamenodeRole;
import org.apache.hadoop.hdfs.server.common.IncorrectVersionException;
@@ -1277,13 +1278,13 @@ public DatanodeRegistration registerDatanode(DatanodeRegistration nodeReg)
public HeartbeatResponse sendHeartbeat(DatanodeRegistration nodeReg,
StorageReport[] report, long dnCacheCapacity, long dnCacheUsed,
int xmitsInProgress, int xceiverCount,
int failedVolumes, VolumeFailureSummary volumeFailureSummary)
throws IOException {
int failedVolumes, VolumeFailureSummary volumeFailureSummary,
boolean requestFullBlockReportLease) throws IOException {
checkNNStartup();
verifyRequest(nodeReg);
return namesystem.handleHeartbeat(nodeReg, report,
dnCacheCapacity, dnCacheUsed, xceiverCount, xmitsInProgress,
failedVolumes, volumeFailureSummary);
failedVolumes, volumeFailureSummary, requestFullBlockReportLease);
}
@Override // DatanodeProtocol
@@ -1309,6 +1310,8 @@ public DatanodeCommand blockReport(DatanodeRegistration nodeReg,
blocks, context, (r == reports.length - 1));
metrics.incrStorageBlockReportOps();
}
BlockManagerFaultInjector.getInstance().
incomingBlockReportRpc(nodeReg, context);
if (nn.getFSImage().isUpgradeFinalized() &&
!namesystem.isRollingUpgrade() &&

View File

@@ -31,14 +31,33 @@
*/
@InterfaceAudience.Private
public class BlockReportContext {
/**
* The total number of RPCs contained in the block report.
*/
private final int totalRpcs;
/**
* The index of this particular RPC.
*/
private final int curRpc;
/**
* A 64-bit ID which identifies the block report as a whole.
*/
private final long reportId;
public BlockReportContext(int totalRpcs, int curRpc, long reportId) {
/**
* The lease ID which this block report is using, or 0 if this block report is
* bypassing rate-limiting.
*/
private final long leaseId;
public BlockReportContext(int totalRpcs, int curRpc,
long reportId, long leaseId) {
this.totalRpcs = totalRpcs;
this.curRpc = curRpc;
this.reportId = reportId;
this.leaseId = leaseId;
}
public int getTotalRpcs() {
@@ -52,4 +71,8 @@ public int getCurRpc() {
public long getReportId() {
return reportId;
}
public long getLeaseId() {
return leaseId;
}
}
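A small hedged example of the two ways a context is now constructed, mirroring BPServiceActor.blockReport() and the manual-trigger path (the ID values are placeholders):

long reportId = System.nanoTime();  // placeholder report ID, as the tests use
long leaseId = 0xABCDL;             // placeholder lease from a heartbeat

// Unsplit report: a single RPC carrying all storages, sent under a lease.
BlockReportContext leased = new BlockReportContext(1, 0, reportId, leaseId);

// Manually triggered report: lease ID 0 bypasses rate-limiting.
BlockReportContext bypass = new BlockReportContext(1, 0, reportId, 0L);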

View File

@@ -102,6 +102,8 @@ public DatanodeRegistration registerDatanode(DatanodeRegistration registration
* @param xceiverCount number of active transceiver threads
* @param failedVolumes number of failed volumes
* @param volumeFailureSummary info about volume failures
* @param requestFullBlockReportLease whether to request a full block
* report lease.
* @throws IOException on error
*/
@Idempotent
@@ -112,7 +114,8 @@ public HeartbeatResponse sendHeartbeat(DatanodeRegistration registration,
int xmitsInProgress,
int xceiverCount,
int failedVolumes,
VolumeFailureSummary volumeFailureSummary)
VolumeFailureSummary volumeFailureSummary,
boolean requestFullBlockReportLease)
throws IOException;
/**
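For callers, the only change is the trailing flag; a hedged sketch of a heartbeat that also asks for a lease (all variables are assumed to exist in the caller):

// Heartbeat that also requests a full block report lease.
HeartbeatResponse resp = datanodeProtocol.sendHeartbeat(
    registration, reports, cacheCapacity, cacheUsed,
    xmitsInProgress, xceiverCount, failedVolumes,
    volumeFailureSummary, true /* requestFullBlockReportLease */);
long leaseId = resp.getFullBlockReportLeaseId();  // 0 means "not granted yet"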

View File

@@ -35,11 +35,15 @@ public class HeartbeatResponse {
private final RollingUpgradeStatus rollingUpdateStatus;
private final long fullBlockReportLeaseId;
public HeartbeatResponse(DatanodeCommand[] cmds,
NNHAStatusHeartbeat haStatus, RollingUpgradeStatus rollingUpdateStatus) {
NNHAStatusHeartbeat haStatus, RollingUpgradeStatus rollingUpdateStatus,
long fullBlockReportLeaseId) {
commands = cmds;
this.haStatus = haStatus;
this.rollingUpdateStatus = rollingUpdateStatus;
this.fullBlockReportLeaseId = fullBlockReportLeaseId;
}
public DatanodeCommand[] getCommands() {
@@ -53,4 +57,8 @@ public NNHAStatusHeartbeat getNameNodeHaState() {
public RollingUpgradeStatus getRollingUpdateStatus() {
return rollingUpdateStatus;
}
public long getFullBlockReportLeaseId() {
return fullBlockReportLeaseId;
}
}

View File

@@ -21,7 +21,7 @@
import org.apache.hadoop.classification.InterfaceStability;
/**
* A BlockCommand is an instruction to a datanode to register with the namenode.
* A RegisterCommand is an instruction to a datanode to register with the namenode.
* This command can't be combined with other commands in the same response.
* This is because after the datanode processes RegisterCommand, it will skip
* the rest of the DatanodeCommands in the same HeartbeatResponse.

View File

@@ -195,6 +195,7 @@ message HeartbeatRequestProto {
optional uint64 cacheCapacity = 6 [ default = 0 ];
optional uint64 cacheUsed = 7 [default = 0 ];
optional VolumeFailureSummaryProto volumeFailureSummary = 8;
optional bool requestFullBlockReportLease = 9 [ default = false ];
}
/**
@@ -214,6 +215,7 @@ message HeartbeatResponseProto {
repeated DatanodeCommandProto cmds = 1; // Returned commands can be null
required NNHAStatusHeartbeatProto haStatus = 2;
optional RollingUpgradeStatusProto rollingUpgradeStatus = 3;
optional uint64 fullBlockReportLeaseId = 4 [ default = 0 ];
}
/**
@@ -243,6 +245,10 @@ message BlockReportContextProto {
// The unique 64-bit ID of this block report
required int64 id = 3;
// The block report lease ID, or 0 if we are sending without a lease to
// bypass rate-limiting.
optional uint64 leaseId = 4 [ default = 0 ];
}
/**
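Because every new field is optional with an explicit default, old and new peers interoperate. A hedged sketch of building a request that sets the new flag, mirroring the client-side translator earlier in this commit (the registration variable is assumed):

// Mirrors DatanodeProtocolClientSideTranslatorPB: an old DataNode simply
// never sets requestFullBlockReportLease, so the NameNode reads false.
HeartbeatRequestProto req = HeartbeatRequestProto.newBuilder()
    .setRegistration(PBHelper.convert(registration))
    .setXmitsInProgress(0)
    .setXceiverCount(0)
    .setFailedVolumes(0)
    .setRequestFullBlockReportLease(true)
    .build();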

View File

@@ -577,6 +577,27 @@
</description>
</property>
<property>
<name>dfs.namenode.max.full.block.report.leases</name>
<value>6</value>
<description>The maximum number of leases for full block reports that the
NameNode will issue at any given time. This prevents the NameNode from
being flooded with full block reports that use up all the RPC handler
threads. This number should never be more than the number of RPC handler
threads or less than 1.
</description>
</property>
<property>
<name>dfs.namenode.full.block.report.lease.length.ms</name>
<value>300000</value>
<description>
The number of milliseconds that the NameNode will wait before invalidating
a full block report lease. This prevents a crashed DataNode from
permanently using up a full block report lease.
</description>
</property>
<property>
<name>dfs.datanode.directoryscan.interval</name>
<value>21600</value>
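Operators override these in hdfs-site.xml rather than editing the defaults file; an illustrative override for a large cluster (values are examples, not recommendations):

<property>
  <name>dfs.namenode.max.full.block.report.leases</name>
  <value>12</value>
</property>
<property>
  <name>dfs.namenode.full.block.report.lease.length.ms</name>
  <value>120000</value>
</property>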

View File

@@ -221,7 +221,7 @@ public BlockReportResponseProto answer(InvocationOnMock invocation) {
request.set(null);
nsInfo.setCapabilities(Capability.STORAGE_BLOCK_REPORT_BUFFERS.getMask());
nn.blockReport(reg, "pool", sbr,
new BlockReportContext(1, 0, System.nanoTime()));
new BlockReportContext(1, 0, System.nanoTime(), 0L));
BlockReportRequestProto proto = request.get();
assertNotNull(proto);
assertTrue(proto.getReports(0).getBlocksList().isEmpty());
@@ -231,7 +231,7 @@ public BlockReportResponseProto answer(InvocationOnMock invocation) {
request.set(null);
nsInfo.setCapabilities(Capability.UNKNOWN.getMask());
nn.blockReport(reg, "pool", sbr,
new BlockReportContext(1, 0, System.nanoTime()));
new BlockReportContext(1, 0, System.nanoTime(), 0L));
proto = request.get();
assertNotNull(proto);
assertFalse(proto.getReports(0).getBlocksList().isEmpty());

View File

@@ -0,0 +1,246 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.blockmanagement;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_MAX_FULL_BLOCK_REPORT_LEASES;
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS;
import com.google.common.base.Joiner;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.Uninterruptibles;
import org.apache.commons.lang.mutable.MutableObject;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.MiniDFSCluster;
import org.apache.hadoop.hdfs.protocol.DatanodeID;
import org.apache.hadoop.hdfs.server.datanode.DataNode;
import org.apache.hadoop.hdfs.server.protocol.BlockReportContext;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.log4j.Level;
import org.junit.Assert;
import org.junit.After;
import org.junit.BeforeClass;
import org.junit.Test;
import java.io.IOException;
import java.util.HashSet;
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
public class TestBlockReportRateLimiting {
static final Log LOG = LogFactory.getLog(TestBlockReportRateLimiting.class);
private static void setFailure(AtomicReference<String> failure,
String what) {
failure.compareAndSet("", what);
LOG.error("Test error: " + what);
}
@After
public void restoreNormalBlockManagerFaultInjector() {
BlockManagerFaultInjector.instance = new BlockManagerFaultInjector();
}
@BeforeClass
public static void raiseBlockManagerLogLevels() {
GenericTestUtils.setLogLevel(BlockManager.LOG, Level.ALL);
GenericTestUtils.setLogLevel(BlockReportLeaseManager.LOG, Level.ALL);
}
@Test(timeout=180000)
public void testRateLimitingDuringDataNodeStartup() throws Exception {
Configuration conf = new Configuration();
conf.setInt(DFS_NAMENODE_MAX_FULL_BLOCK_REPORT_LEASES, 1);
conf.setLong(DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS,
20L * 60L * 1000L);
final Semaphore fbrSem = new Semaphore(0);
final HashSet<DatanodeID> expectedFbrDns = new HashSet<>();
final HashSet<DatanodeID> fbrDns = new HashSet<>();
final AtomicReference<String> failure = new AtomicReference<String>("");
final BlockManagerFaultInjector injector = new BlockManagerFaultInjector() {
private int numLeases = 0;
@Override
public void incomingBlockReportRpc(DatanodeID nodeID,
BlockReportContext context) throws IOException {
LOG.info("Incoming full block report from " + nodeID +
". Lease ID = 0x" + Long.toHexString(context.getLeaseId()));
if (context.getLeaseId() == 0) {
setFailure(failure, "Got unexpected rate-limiting-" +
"bypassing full block report RPC from " + nodeID);
}
fbrSem.acquireUninterruptibly();
synchronized (this) {
fbrDns.add(nodeID);
if (!expectedFbrDns.remove(nodeID)) {
setFailure(failure, "Got unexpected full block report " +
"RPC from " + nodeID + ". expectedFbrDns = " +
Joiner.on(", ").join(expectedFbrDns));
}
LOG.info("Proceeding with full block report from " +
nodeID + ". Lease ID = 0x" +
Long.toHexString(context.getLeaseId()));
}
}
@Override
public void requestBlockReportLease(DatanodeDescriptor node,
long leaseId) {
if (leaseId == 0) {
return;
}
synchronized (this) {
numLeases++;
expectedFbrDns.add(node);
LOG.info("requestBlockReportLease(node=" + node +
", leaseId=0x" + Long.toHexString(leaseId) + "). " +
"expectedFbrDns = " + Joiner.on(", ").join(expectedFbrDns));
if (numLeases > 1) {
setFailure(failure, "More than 1 lease was issued at once.");
}
}
}
@Override
public void removeBlockReportLease(DatanodeDescriptor node, long leaseId) {
LOG.info("removeBlockReportLease(node=" + node +
", leaseId=0x" + Long.toHexString(leaseId) + ")");
synchronized (this) {
numLeases--;
}
}
};
BlockManagerFaultInjector.instance = injector;
final int NUM_DATANODES = 5;
MiniDFSCluster cluster =
new MiniDFSCluster.Builder(conf).numDataNodes(NUM_DATANODES).build();
cluster.waitActive();
for (int n = 1; n <= NUM_DATANODES; n++) {
LOG.info("Waiting for " + n + " datanode(s) to report in.");
fbrSem.release();
Uninterruptibles.sleepUninterruptibly(20, TimeUnit.MILLISECONDS);
final int currentN = n;
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
synchronized (injector) {
if (fbrDns.size() > currentN) {
setFailure(failure, "Expected at most " + currentN +
" datanodes to have sent a block report, but actually " +
fbrDns.size() + " have.");
}
return (fbrDns.size() >= currentN);
}
}
}, 25, 50000);
}
cluster.shutdown();
Assert.assertEquals("", failure.get());
}
/**
* Start a 2-node cluster with only one block report lease. When the
* first datanode gets a lease, kill it. Then wait for the lease to
* expire, and the second datanode to send a full block report.
*/
@Test(timeout=180000)
public void testLeaseExpiration() throws Exception {
Configuration conf = new Configuration();
conf.setInt(DFS_NAMENODE_MAX_FULL_BLOCK_REPORT_LEASES, 1);
conf.setLong(DFS_NAMENODE_FULL_BLOCK_REPORT_LEASE_LENGTH_MS, 100L);
final Semaphore gotFbrSem = new Semaphore(0);
final AtomicReference<String> failure = new AtomicReference<String>("");
final AtomicReference<MiniDFSCluster> cluster =
new AtomicReference<>(null);
final BlockingQueue<Integer> datanodeToStop =
new ArrayBlockingQueue<Integer>(1);
final BlockManagerFaultInjector injector = new BlockManagerFaultInjector() {
private String uuidToStop = "";
@Override
public void incomingBlockReportRpc(DatanodeID nodeID,
BlockReportContext context) throws IOException {
if (context.getLeaseId() == 0) {
setFailure(failure, "Got unexpected rate-limiting-" +
"bypassing full block report RPC from " + nodeID);
}
synchronized (this) {
if (uuidToStop.equals(nodeID.getDatanodeUuid())) {
throw new IOException("Injecting failure into block " +
"report RPC for " + nodeID);
}
}
gotFbrSem.release();
}
@Override
public void requestBlockReportLease(DatanodeDescriptor node,
long leaseId) {
if (leaseId == 0) {
return;
}
synchronized (this) {
if (uuidToStop.isEmpty()) {
MiniDFSCluster cl;
do {
cl = cluster.get();
} while (cl == null);
int datanodeIndexToStop = getDatanodeIndex(cl, node);
uuidToStop = node.getDatanodeUuid();
datanodeToStop.add(Integer.valueOf(datanodeIndexToStop));
}
}
}
private int getDatanodeIndex(MiniDFSCluster cl,
DatanodeDescriptor node) {
List<DataNode> datanodes = cl.getDataNodes();
for (int i = 0; i < datanodes.size(); i++) {
DataNode datanode = datanodes.get(i);
if (datanode.getDatanodeUuid().equals(node.getDatanodeUuid())) {
return i;
}
}
throw new RuntimeException("Failed to find UUID " +
node.getDatanodeUuid() + " in the list of datanodes.");
}
@Override
public void removeBlockReportLease(DatanodeDescriptor node, long leaseId) {
}
};
BlockManagerFaultInjector.instance = injector;
cluster.set(new MiniDFSCluster.Builder(conf).numDataNodes(2).build());
cluster.get().waitActive();
int datanodeIndexToStop = datanodeToStop.take();
cluster.get().stopDataNode(datanodeIndexToStop);
gotFbrSem.acquire();
cluster.get().shutdown();
Assert.assertEquals("", failure.get());
}
}

View File

@@ -59,6 +59,15 @@ public class TestDatanodeManager {
//The number of times the registration / removal of nodes should happen
final int NUM_ITERATIONS = 500;
private static DatanodeManager mockDatanodeManager(
FSNamesystem fsn, Configuration conf) throws IOException {
BlockManager bm = Mockito.mock(BlockManager.class);
BlockReportLeaseManager blm = new BlockReportLeaseManager(conf);
Mockito.when(bm.getBlockReportLeaseManager()).thenReturn(blm);
DatanodeManager dm = new DatanodeManager(bm, fsn, conf);
return dm;
}
/**
* This test sends a random sequence of node registrations and node removals
* to the DatanodeManager (of nodes with different IDs and versions), and
@@ -70,8 +79,7 @@ public void testNumVersionsReportedCorrect() throws IOException {
//Create the DatanodeManager which will be tested
FSNamesystem fsn = Mockito.mock(FSNamesystem.class);
Mockito.when(fsn.hasWriteLock()).thenReturn(true);
DatanodeManager dm = new DatanodeManager(Mockito.mock(BlockManager.class),
fsn, new Configuration());
DatanodeManager dm = mockDatanodeManager(fsn, new Configuration());
//Seed the RNG with a known value so test failures are easier to reproduce
Random rng = new Random();
@@ -183,8 +191,7 @@ public void testRejectUnresolvedDatanodes() throws IOException {
TestDatanodeManager.MyResolver.class, DNSToSwitchMapping.class);
//create DatanodeManager
DatanodeManager dm = new DatanodeManager(Mockito.mock(BlockManager.class),
fsn, conf);
DatanodeManager dm = mockDatanodeManager(fsn, conf);
//storageID to register.
String storageID = "someStorageID-123";
@@ -258,7 +265,6 @@ public void testBadScript() throws IOException, URISyntaxException {
HelperFunction("/"+ Shell.appendScriptExtension("topology-broken-script"));
}
/**
* Helper function that tests the DatanodeManagers SortedBlock function
* we invoke this function with and without topology scripts
@@ -281,8 +287,7 @@ public void HelperFunction(String scriptFileName)
conf.set(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY,
resourcePath.toString());
}
DatanodeManager dm = new DatanodeManager(Mockito.mock(BlockManager.class),
fsn, conf);
DatanodeManager dm = mockDatanodeManager(fsn, conf);
// register 5 datanodes, each with different storage ID and type
DatanodeInfo[] locs = new DatanodeInfo[5];

View File

@@ -115,7 +115,7 @@ private static void runTest(final String testCaseName,
// Stop the DataNode and send fake heartbeat with missing storage.
cluster.stopDataNode(0);
cluster.getNameNodeRpc().sendHeartbeat(dnReg, prunedReports, 0L, 0L, 0, 0,
0, null);
0, null, true);
// Check that the missing storage was pruned.
assertThat(dnDescriptor.getStorageInfos().length, is(expectedStoragesAfterTest));

View File

@@ -28,6 +28,7 @@
import java.net.InetSocketAddress;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.commons.logging.Log;
@@ -143,7 +144,8 @@ private DatanodeProtocolClientSideTranslatorPB setupNNMock(int nnIdx)
Mockito.anyInt(),
Mockito.anyInt(),
Mockito.anyInt(),
Mockito.any(VolumeFailureSummary.class));
Mockito.any(VolumeFailureSummary.class),
Mockito.anyBoolean());
mockHaStatuses[nnIdx] = new NNHAStatusHeartbeat(HAServiceState.STANDBY, 0);
return mock;
}
@@ -164,7 +166,8 @@ public HeartbeatAnswer(int nnIdx) {
public HeartbeatResponse answer(InvocationOnMock invocation) throws Throwable {
heartbeatCounts[nnIdx]++;
return new HeartbeatResponse(new DatanodeCommand[0],
mockHaStatuses[nnIdx], null);
mockHaStatuses[nnIdx], null,
ThreadLocalRandom.current().nextLong() | 1L);
}
}

View File

@@ -126,7 +126,7 @@ public void testBlockHasMultipleReplicasOnSameDN() throws IOException {
// Should not assert!
cluster.getNameNodeRpc().blockReport(dnReg, bpid, reports,
new BlockReportContext(1, 0, System.nanoTime()));
new BlockReportContext(1, 0, System.nanoTime(), 0L));
// Get the block locations once again.
locatedBlocks = client.getLocatedBlocks(filename, 0, BLOCK_SIZE * NUM_BLOCKS);

View File

@@ -39,6 +39,7 @@
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log;
@@ -162,11 +163,12 @@ public DatanodeRegistration answer(InvocationOnMock invocation)
Mockito.anyInt(),
Mockito.anyInt(),
Mockito.anyInt(),
Mockito.any(VolumeFailureSummary.class)))
Mockito.any(VolumeFailureSummary.class),
Mockito.anyBoolean()))
.thenReturn(new HeartbeatResponse(
new DatanodeCommand[0],
new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),
null));
null, ThreadLocalRandom.current().nextLong() | 1L));
dn = new DataNode(conf, locations, null) {
@Override

View File

@@ -57,7 +57,7 @@ public void testInit() {
for (final long now : getTimestamps()) {
Scheduler scheduler = makeMockScheduler(now);
assertTrue(scheduler.isHeartbeatDue(now));
assertTrue(scheduler.isBlockReportDue());
assertTrue(scheduler.isBlockReportDue(scheduler.monotonicNow()));
}
}

View File

@@ -27,6 +27,7 @@
import java.net.InetSocketAddress;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.concurrent.ThreadLocalRandom;
import com.google.common.base.Supplier;
import org.apache.commons.logging.Log;
@@ -199,13 +200,13 @@ public HeartbeatResponse answer(InvocationOnMock invocation)
heartbeatResponse = new HeartbeatResponse(
new DatanodeCommand[]{RegisterCommand.REGISTER},
new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),
null);
null, ThreadLocalRandom.current().nextLong() | 1L);
} else {
LOG.info("mockito heartbeatResponse " + i);
heartbeatResponse = new HeartbeatResponse(
new DatanodeCommand[0],
new NNHAStatusHeartbeat(HAServiceState.ACTIVE, 1),
null);
null, ThreadLocalRandom.current().nextLong() | 1L);
}
return heartbeatResponse;
}
@@ -217,7 +218,8 @@ public HeartbeatResponse answer(InvocationOnMock invocation)
Mockito.anyInt(),
Mockito.anyInt(),
Mockito.anyInt(),
Mockito.any(VolumeFailureSummary.class));
Mockito.any(VolumeFailureSummary.class),
Mockito.anyBoolean());
dn = new DataNode(conf, locations, null) {
@Override

View File

@@ -21,6 +21,7 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyBoolean;
import static org.mockito.Matchers.anyInt;
import static org.mockito.Matchers.anyLong;
import static org.mockito.Mockito.doReturn;
@@ -31,6 +32,7 @@
import java.nio.channels.FileChannel;
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.ThreadLocalRandom;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@@ -159,11 +161,14 @@ private static void setHeartbeatResponse(DatanodeCommand[] cmds)
throws IOException {
NNHAStatusHeartbeat ha = new NNHAStatusHeartbeat(HAServiceState.ACTIVE,
fsImage.getLastAppliedOrWrittenTxId());
HeartbeatResponse response = new HeartbeatResponse(cmds, ha, null);
HeartbeatResponse response =
new HeartbeatResponse(cmds, ha, null,
ThreadLocalRandom.current().nextLong() | 1L);
doReturn(response).when(spyNN).sendHeartbeat(
(DatanodeRegistration) any(),
(StorageReport[]) any(), anyLong(), anyLong(),
anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any());
anyInt(), anyInt(), anyInt(), (VolumeFailureSummary) any(),
anyBoolean());
}
private static DatanodeCommand[] cacheBlock(HdfsBlockLocation loc) {

View File

@@ -40,7 +40,7 @@ protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
LOG.info("Sending block report for storage " + report.getStorage().getStorageID());
StorageBlockReport[] singletonReport = { report };
cluster.getNameNodeRpc().blockReport(dnR, poolId, singletonReport,
new BlockReportContext(reports.length, i, System.nanoTime()));
new BlockReportContext(reports.length, i, System.nanoTime(), 0L));
i++;
}
}

View File

@@ -36,6 +36,6 @@ protected void sendBlockReports(DatanodeRegistration dnR, String poolId,
StorageBlockReport[] reports) throws IOException {
LOG.info("Sending combined block reports for " + dnR);
cluster.getNameNodeRpc().blockReport(dnR, poolId, reports,
new BlockReportContext(1, 0, System.nanoTime()));
new BlockReportContext(1, 0, System.nanoTime(), 0L));
}
}

View File

@@ -106,7 +106,7 @@ public void testStorageReportHasStorageTypeAndState() throws IOException {
any(DatanodeRegistration.class),
captor.capture(),
anyLong(), anyLong(), anyInt(), anyInt(), anyInt(),
Mockito.any(VolumeFailureSummary.class));
Mockito.any(VolumeFailureSummary.class), Mockito.anyBoolean());
StorageReport[] reports = captor.getValue();

View File

@@ -968,7 +968,7 @@ void register() throws IOException {
new StorageBlockReport(storage, BlockListAsLongs.EMPTY)
};
dataNodeProto.blockReport(dnRegistration, bpid, reports,
new BlockReportContext(1, 0, System.nanoTime()));
new BlockReportContext(1, 0, System.nanoTime(), 0L));
}
/**
@@ -981,7 +981,7 @@ void sendHeartbeat() throws IOException {
StorageReport[] rep = { new StorageReport(storage, false,
DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration, rep,
0L, 0L, 0, 0, 0, null).getCommands();
0L, 0L, 0, 0, 0, null, true).getCommands();
if(cmds != null) {
for (DatanodeCommand cmd : cmds ) {
if(LOG.isDebugEnabled()) {
@@ -1030,7 +1030,7 @@ int replicateBlocks() throws IOException {
StorageReport[] rep = { new StorageReport(storage,
false, DF_CAPACITY, DF_USED, DF_CAPACITY - DF_USED, DF_USED) };
DatanodeCommand[] cmds = dataNodeProto.sendHeartbeat(dnRegistration,
rep, 0L, 0L, 0, 0, 0, null).getCommands();
rep, 0L, 0L, 0, 0, 0, null, true).getCommands();
if (cmds != null) {
for (DatanodeCommand cmd : cmds) {
if (cmd.getAction() == DatanodeProtocol.DNA_TRANSFER) {
@@ -1213,7 +1213,7 @@ long executeOp(int daemonId, int inputIdx, String ignore) throws IOException {
StorageBlockReport[] report = { new StorageBlockReport(
dn.storage, dn.getBlockReportList()) };
dataNodeProto.blockReport(dn.dnRegistration, bpid, report,
new BlockReportContext(1, 0, System.nanoTime()));
new BlockReportContext(1, 0, System.nanoTime(), 0L));
long end = Time.now();
return end-start;
}

View File

@@ -117,7 +117,7 @@ public static HeartbeatResponse sendHeartBeat(DatanodeRegistration nodeReg,
DatanodeDescriptor dd, FSNamesystem namesystem) throws IOException {
return namesystem.handleHeartbeat(nodeReg,
BlockManagerTestUtil.getStorageReportsForDatanode(dd),
dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null);
dd.getCacheCapacity(), dd.getCacheRemaining(), 0, 0, 0, null, true);
}
public static boolean setReplication(final FSNamesystem ns,

View File

@@ -109,7 +109,7 @@ public void testDeadDatanode() throws Exception {
BlockListAsLongs.EMPTY) };
try {
dnp.blockReport(reg, poolId, report,
new BlockReportContext(1, 0, System.nanoTime()));
new BlockReportContext(1, 0, System.nanoTime(), 0L));
fail("Expected IOException is not thrown");
} catch (IOException ex) {
// Expected
@@ -120,8 +120,8 @@ public void testDeadDatanode() throws Exception {
StorageReport[] rep = { new StorageReport(
new DatanodeStorage(reg.getDatanodeUuid()),
false, 0, 0, 0, 0) };
DatanodeCommand[] cmd = dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null)
.getCommands();
DatanodeCommand[] cmd =
dnp.sendHeartbeat(reg, rep, 0L, 0L, 0, 0, 0, null, true).getCommands();
assertEquals(1, cmd.length);
assertEquals(cmd[0].getAction(), RegisterCommand.REGISTER
.getAction());