HDFS-2495. Increase granularity of write operations in ReplicationMonitor thus reducing contention for write lock. Contributed by Tomasz Nykiel.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1199024 13f79535-47bb-0310-9956-ffa450edef68
Hairong Kuang 2011-11-08 00:16:23 +00:00
parent a83753b735
commit 6dff9329d5
3 changed files with 209 additions and 140 deletions
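The change replaces the per-block entry point computeReplicationWorkForBlock with a batched computeReplicationWorkForBlocks: the namesystem write lock is now taken once per batch rather than once per block, replication targets are chosen with the lock released, and the lock is reacquired briefly to revalidate and commit the results. A minimal standalone sketch of that three-phase pattern follows; BatchedScheduler, needed, and the String stand-ins are illustrative only, not HDFS code (which uses the FSNamesystem lock and real block and datanode types).

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.locks.ReentrantReadWriteLock;

/** Illustrative only: batch work under one lock instead of locking per item. */
public class BatchedScheduler {
  private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
  private final List<String> needed = new ArrayList<String>(); // stand-in for neededReplications

  int scheduleBatch() {
    // Phase 1: one short lock section snapshots the whole batch of work.
    List<String> batch;
    lock.writeLock().lock();
    try {
      batch = new ArrayList<String>(needed);
    } finally {
      lock.writeLock().unlock();
    }

    // Phase 2: the expensive step (target placement) runs WITHOUT the lock,
    // so other readers and writers are not blocked while we compute.
    List<String> targets = new ArrayList<String>();
    for (String block : batch) {
      targets.add("dn-for-" + block); // placeholder placement decision
    }

    // Phase 3: reacquire the lock briefly to revalidate and commit; the
    // state may have changed while the lock was released, so recheck first.
    int scheduled = 0;
    lock.writeLock().lock();
    try {
      for (int i = 0; i < batch.size(); i++) {
        if (needed.remove(batch.get(i))) { // still needed: commit this item
          System.out.println("ask " + targets.get(i) + " to copy " + batch.get(i));
          scheduled++;
        }
      }
    } finally {
      lock.writeLock().unlock();
    }
    return scheduled;
  }

  public static void main(String[] args) {
    BatchedScheduler s = new BatchedScheduler();
    s.needed.add("blk_1");
    s.needed.add("blk_2");
    System.out.println("scheduled " + s.scheduleBatch() + " replications");
  }
}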

CHANGES.txt

@@ -52,9 +52,14 @@ Trunk (unreleased changes)
     HDFS-2334. Add Closeable to JournalManager. (Ivan Kelly via jitendra)
 
   OPTIMIZATIONS
 
     HDFS-2477. Optimize computing the diff between a block report and the
     namenode state. (Tomasz Nykiel via hairong)
 
+    HDFS-2495. Increase granularity of write operations in ReplicationMonitor
+    thus reducing contention for write lock. (Tomasz Nykiel via hairong)
+
   BUG FIXES
 
     HDFS-2287. TestParallelRead has a small off-by-one bug. (todd)

BlockManager.java

@@ -929,15 +929,7 @@ private int computeReplicationWork(int blocksToProcess) throws IOException {
       chooseUnderReplicatedBlocks(blocksToProcess);
 
     // replicate blocks
-    int scheduledReplicationCount = 0;
-    for (int i=0; i<blocksToReplicate.size(); i++) {
-      for(Block block : blocksToReplicate.get(i)) {
-        if (computeReplicationWorkForBlock(block, i)) {
-          scheduledReplicationCount++;
-        }
-      }
-    }
-    return scheduledReplicationCount;
+    return computeReplicationWorkForBlocks(blocksToReplicate);
   }
 
   /**
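Both the old loop and the new batched call consume the structure produced by chooseUnderReplicatedBlocks: one inner list per priority level, with the outer index serving as the priority. A small illustration of that shape and its iteration (PriorityBuckets and the String stand-in for Block are hypothetical, not HDFS code):

import java.util.ArrayList;
import java.util.List;

public class PriorityBuckets {
  public static void main(String[] args) {
    // One inner list per priority; the outer index IS the priority (0 = most urgent).
    List<List<String>> blocksToReplicate = new ArrayList<List<String>>();
    blocksToReplicate.add(new ArrayList<String>()); // priority 0
    blocksToReplicate.add(new ArrayList<String>()); // priority 1
    blocksToReplicate.get(1).add("blk_1001");

    // Iteration recovers each block together with its priority, which is what
    // calls like neededReplications.remove(block, priority) need.
    for (int priority = 0; priority < blocksToReplicate.size(); priority++) {
      for (String block : blocksToReplicate.get(priority)) {
        System.out.println("priority " + priority + ": " + block);
      }
    }
  }
}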
@@ -1002,170 +994,201 @@ private List<List<Block>> chooseUnderReplicatedBlocks(int blocksToProcess) {
     return blocksToReplicate;
   }
 
-  /** Replicate a block
+  /** Replicate a set of blocks
    *
-   * @param block block to be replicated
-   * @param priority a hint of its priority in the neededReplication queue
-   * @return if the block gets replicated or not
+   * @param blocksToReplicate blocks to be replicated, for each priority
+   * @return the number of blocks scheduled for replication
    */
   @VisibleForTesting
-  boolean computeReplicationWorkForBlock(Block block, int priority) {
+  int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
     int requiredReplication, numEffectiveReplicas;
     List<DatanodeDescriptor> containingNodes, liveReplicaNodes;
     DatanodeDescriptor srcNode;
     INodeFile fileINode = null;
     int additionalReplRequired;
+    int scheduledWork = 0;
+    List<ReplicationWork> work = new LinkedList<ReplicationWork>();
 
     namesystem.writeLock();
     try {
       synchronized (neededReplications) {
-        // block should belong to a file
-        fileINode = blocksMap.getINode(block);
-        // abandoned block or block reopened for append
-        if(fileINode == null || fileINode.isUnderConstruction()) {
-          neededReplications.remove(block, priority); // remove from neededReplications
-          replIndex--;
-          return false;
-        }
-
-        requiredReplication = fileINode.getReplication();
-
-        // get a source data-node
-        containingNodes = new ArrayList<DatanodeDescriptor>();
-        liveReplicaNodes = new ArrayList<DatanodeDescriptor>();
-        NumberReplicas numReplicas = new NumberReplicas();
-        srcNode = chooseSourceDatanode(
-            block, containingNodes, liveReplicaNodes, numReplicas);
-        if(srcNode == null) // block can not be replicated from any node
-          return false;
-
-        assert liveReplicaNodes.size() == numReplicas.liveReplicas();
-        // do not schedule more if enough replicas is already pending
-        numEffectiveReplicas = numReplicas.liveReplicas() +
-                                pendingReplications.getNumReplicas(block);
-
-        if (numEffectiveReplicas >= requiredReplication) {
-          if ( (pendingReplications.getNumReplicas(block) > 0) ||
-               (blockHasEnoughRacks(block)) ) {
-            neededReplications.remove(block, priority); // remove from neededReplications
-            replIndex--;
-            NameNode.stateChangeLog.info("BLOCK* "
-                + "Removing block " + block
-                + " from neededReplications as it has enough replicas.");
-            return false;
-          }
-        }
-
-        if (numReplicas.liveReplicas() < requiredReplication) {
-          additionalReplRequired = requiredReplication - numEffectiveReplicas;
-        } else {
-          additionalReplRequired = 1; //Needed on a new rack
-        }
+        for (int priority = 0; priority < blocksToReplicate.size(); priority++) {
+          for(Block block : blocksToReplicate.get(priority)) {
+            // block should belong to a file
+            fileINode = blocksMap.getINode(block);
+            // abandoned block or block reopened for append
+            if(fileINode == null || fileINode.isUnderConstruction()) {
+              neededReplications.remove(block, priority); // remove from neededReplications
+              replIndex--;
+              continue;
+            }
+
+            requiredReplication = fileINode.getReplication();
+
+            // get a source data-node
+            containingNodes = new ArrayList<DatanodeDescriptor>();
+            liveReplicaNodes = new ArrayList<DatanodeDescriptor>();
+            NumberReplicas numReplicas = new NumberReplicas();
+            srcNode = chooseSourceDatanode(
+                block, containingNodes, liveReplicaNodes, numReplicas);
+            if(srcNode == null) // block can not be replicated from any node
+              continue;
+
+            assert liveReplicaNodes.size() == numReplicas.liveReplicas();
+            // do not schedule more if enough replicas is already pending
+            numEffectiveReplicas = numReplicas.liveReplicas() +
+                                    pendingReplications.getNumReplicas(block);
+
+            if (numEffectiveReplicas >= requiredReplication) {
+              if ( (pendingReplications.getNumReplicas(block) > 0) ||
+                   (blockHasEnoughRacks(block)) ) {
+                neededReplications.remove(block, priority); // remove from neededReplications
+                replIndex--;
+                NameNode.stateChangeLog.info("BLOCK* "
+                    + "Removing block " + block
+                    + " from neededReplications as it has enough replicas.");
+                continue;
+              }
+            }
+
+            if (numReplicas.liveReplicas() < requiredReplication) {
+              additionalReplRequired = requiredReplication
+                  - numEffectiveReplicas;
+            } else {
+              additionalReplRequired = 1; // Needed on a new rack
+            }
+            work.add(new ReplicationWork(block, fileINode, srcNode,
+                containingNodes, liveReplicaNodes, additionalReplRequired,
+                priority));
+          }
+        }
       }
     } finally {
       namesystem.writeUnlock();
     }
 
-    // Exclude all of the containing nodes from being targets.
-    // This list includes decommissioning or corrupt nodes.
-    HashMap<Node, Node> excludedNodes = new HashMap<Node, Node>();
-    for (DatanodeDescriptor dn : containingNodes) {
-      excludedNodes.put(dn, dn);
-    }
-
-    // choose replication targets: NOT HOLDING THE GLOBAL LOCK
-    // It is costly to extract the filename for which chooseTargets is called,
-    // so for now we pass in the Inode itself.
-    DatanodeDescriptor targets[] =
-        blockplacement.chooseTarget(fileINode, additionalReplRequired,
-            srcNode, liveReplicaNodes, excludedNodes, block.getNumBytes());
-    if(targets.length == 0)
-      return false;
+    HashMap<Node, Node> excludedNodes
+        = new HashMap<Node, Node>();
+    for(ReplicationWork rw : work){
+      // Exclude all of the containing nodes from being targets.
+      // This list includes decommissioning or corrupt nodes.
+      excludedNodes.clear();
+      for (DatanodeDescriptor dn : rw.containingNodes) {
+        excludedNodes.put(dn, dn);
+      }
+
+      // choose replication targets: NOT HOLDING THE GLOBAL LOCK
+      // It is costly to extract the filename for which chooseTargets is called,
+      // so for now we pass in the Inode itself.
+      rw.targets = blockplacement.chooseTarget(rw.fileINode,
+          rw.additionalReplRequired, rw.srcNode, rw.liveReplicaNodes,
+          excludedNodes, rw.block.getNumBytes());
+    }
 
     namesystem.writeLock();
     try {
-      synchronized (neededReplications) {
-        // Recheck since global lock was released
-        // block should belong to a file
-        fileINode = blocksMap.getINode(block);
-        // abandoned block or block reopened for append
-        if(fileINode == null || fileINode.isUnderConstruction()) {
-          neededReplications.remove(block, priority); // remove from neededReplications
-          replIndex--;
-          return false;
-        }
-        requiredReplication = fileINode.getReplication();
-
-        // do not schedule more if enough replicas is already pending
-        NumberReplicas numReplicas = countNodes(block);
-        numEffectiveReplicas = numReplicas.liveReplicas() +
-          pendingReplications.getNumReplicas(block);
-
-        if (numEffectiveReplicas >= requiredReplication) {
-          if ( (pendingReplications.getNumReplicas(block) > 0) ||
-               (blockHasEnoughRacks(block)) ) {
-            neededReplications.remove(block, priority); // remove from neededReplications
-            replIndex--;
-            NameNode.stateChangeLog.info("BLOCK* "
-                + "Removing block " + block
-                + " from neededReplications as it has enough replicas.");
-            return false;
-          }
-        }
-
-        if ( (numReplicas.liveReplicas() >= requiredReplication) &&
-             (!blockHasEnoughRacks(block)) ) {
-          if (srcNode.getNetworkLocation().equals(targets[0].getNetworkLocation())) {
-            //No use continuing, unless a new rack in this case
-            return false;
-          }
-        }
-
-        // Add block to the to be replicated list
-        srcNode.addBlockToBeReplicated(block, targets);
-
-        for (DatanodeDescriptor dn : targets) {
-          dn.incBlocksScheduled();
-        }
-
-        // Move the block-replication into a "pending" state.
-        // The reason we use 'pending' is so we can retry
-        // replications that fail after an appropriate amount of time.
-        pendingReplications.add(block, targets.length);
-        if(NameNode.stateChangeLog.isDebugEnabled()) {
-          NameNode.stateChangeLog.debug(
-              "BLOCK* block " + block
-              + " is moved from neededReplications to pendingReplications");
-        }
-
-        // remove from neededReplications
-        if(numEffectiveReplicas + targets.length >= requiredReplication) {
-          neededReplications.remove(block, priority); // remove from neededReplications
-          replIndex--;
-        }
-        if (NameNode.stateChangeLog.isInfoEnabled()) {
-          StringBuilder targetList = new StringBuilder("datanode(s)");
-          for (int k = 0; k < targets.length; k++) {
-            targetList.append(' ');
-            targetList.append(targets[k].getName());
-          }
-          NameNode.stateChangeLog.info(
-              "BLOCK* ask "
-              + srcNode.getName() + " to replicate "
-              + block + " to " + targetList);
-          if(NameNode.stateChangeLog.isDebugEnabled()) {
-            NameNode.stateChangeLog.debug(
-                "BLOCK* neededReplications = " + neededReplications.size()
-                + " pendingReplications = " + pendingReplications.size());
-          }
-        }
-      }
-    } finally {
-      namesystem.writeUnlock();
-    }
-
-    return true;
+      for(ReplicationWork rw : work){
+        DatanodeDescriptor[] targets = rw.targets;
+        if(targets == null || targets.length == 0){
+          rw.targets = null;
+          continue;
+        }
+
+        synchronized (neededReplications) {
+          Block block = rw.block;
+          int priority = rw.priority;
+          // Recheck since global lock was released
+          // block should belong to a file
+          fileINode = blocksMap.getINode(block);
+          // abandoned block or block reopened for append
+          if(fileINode == null || fileINode.isUnderConstruction()) {
+            neededReplications.remove(block, priority); // remove from neededReplications
+            rw.targets = null;
+            replIndex--;
+            continue;
+          }
+          requiredReplication = fileINode.getReplication();
+
+          // do not schedule more if enough replicas is already pending
+          NumberReplicas numReplicas = countNodes(block);
+          numEffectiveReplicas = numReplicas.liveReplicas() +
+            pendingReplications.getNumReplicas(block);
+
+          if (numEffectiveReplicas >= requiredReplication) {
+            if ( (pendingReplications.getNumReplicas(block) > 0) ||
+                 (blockHasEnoughRacks(block)) ) {
+              neededReplications.remove(block, priority); // remove from neededReplications
+              replIndex--;
+              rw.targets = null;
+              NameNode.stateChangeLog.info("BLOCK* "
+                  + "Removing block " + block
+                  + " from neededReplications as it has enough replicas.");
+              continue;
+            }
+          }
+
+          if ( (numReplicas.liveReplicas() >= requiredReplication) &&
+               (!blockHasEnoughRacks(block)) ) {
+            if (rw.srcNode.getNetworkLocation().equals(targets[0].getNetworkLocation())) {
+              //No use continuing, unless a new rack in this case
+              continue;
+            }
+          }
+
+          // Add block to the to be replicated list
+          rw.srcNode.addBlockToBeReplicated(block, targets);
+          scheduledWork++;
+
+          for (DatanodeDescriptor dn : targets) {
+            dn.incBlocksScheduled();
+          }
+
+          // Move the block-replication into a "pending" state.
+          // The reason we use 'pending' is so we can retry
+          // replications that fail after an appropriate amount of time.
+          pendingReplications.add(block, targets.length);
+          if(NameNode.stateChangeLog.isDebugEnabled()) {
+            NameNode.stateChangeLog.debug(
+                "BLOCK* block " + block
+                + " is moved from neededReplications to pendingReplications");
+          }
+
+          // remove from neededReplications
+          if(numEffectiveReplicas + targets.length >= requiredReplication) {
+            neededReplications.remove(block, priority); // remove from neededReplications
+            replIndex--;
+          }
+        }
+      }
+    } finally {
+      namesystem.writeUnlock();
+    }
+
+    if (NameNode.stateChangeLog.isInfoEnabled()) {
+      // log which blocks have been scheduled for replication
+      for(ReplicationWork rw : work){
+        DatanodeDescriptor[] targets = rw.targets;
+        if (targets != null && targets.length != 0) {
+          StringBuilder targetList = new StringBuilder("datanode(s)");
+          for (int k = 0; k < targets.length; k++) {
+            targetList.append(' ');
+            targetList.append(targets[k].getName());
+          }
+          NameNode.stateChangeLog.info(
+              "BLOCK* ask "
+              + rw.srcNode.getName() + " to replicate "
+              + rw.block + " to " + targetList);
+        }
+      }
+    }
+    if(NameNode.stateChangeLog.isDebugEnabled()) {
+      NameNode.stateChangeLog.debug(
+          "BLOCK* neededReplications = " + neededReplications.size()
+          + " pendingReplications = " + pendingReplications.size());
+    }
+
+    return scheduledWork;
   }
 
   /**
@@ -2596,4 +2619,34 @@ int computeDatanodeWork() throws IOException {
     return workFound;
   }
 
+  private static class ReplicationWork {
+
+    private Block block;
+    private INodeFile fileINode;
+
+    private DatanodeDescriptor srcNode;
+    private List<DatanodeDescriptor> containingNodes;
+    private List<DatanodeDescriptor> liveReplicaNodes;
+    private int additionalReplRequired;
+
+    private DatanodeDescriptor targets[];
+    private int priority;
+
+    public ReplicationWork(Block block,
+        INodeFile fileINode,
+        DatanodeDescriptor srcNode,
+        List<DatanodeDescriptor> containingNodes,
+        List<DatanodeDescriptor> liveReplicaNodes,
+        int additionalReplRequired,
+        int priority) {
+      this.block = block;
+      this.fileINode = fileINode;
+      this.srcNode = srcNode;
+      this.containingNodes = containingNodes;
+      this.liveReplicaNodes = liveReplicaNodes;
+      this.additionalReplRequired = additionalReplRequired;
+      this.priority = priority;
+      this.targets = null;
+    }
+  }
 }
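A note on the shape of the change, as the diff reads: ReplicationWork is a plain carrier object that parks everything captured under the first lock section (block, inode, source node, containing and live replica lists, the additional replica count, and the queue priority) so the expensive chooseTarget call can run with the lock released. Its targets field does double duty: chooseTarget fills it in while the lock is released, and the recheck phase nulls it out to mark an item as abandoned, which is why the final logging pass skips entries with null or empty targets. Note also that the exclusion set fed to chooseTarget is built from containingNodes, which includes decommissioning and corrupt replica holders, not just the live replicas.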

TestBlockManager.java

@@ -20,6 +20,7 @@
 import static org.junit.Assert.*;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.List;
 import java.util.Map.Entry;
@@ -355,25 +356,35 @@ private BlockInfo addBlockOnNodes(long blockId, List<DatanodeDescriptor> nodes)
     bm.blocksMap.addINode(blockInfo, iNode);
     return blockInfo;
   }
 
   private DatanodeDescriptor[] scheduleSingleReplication(Block block) {
-    assertEquals("Block not initially pending replication",
-        0, bm.pendingReplications.getNumReplicas(block));
-    assertTrue("computeReplicationWork should indicate replication is needed",
-        bm.computeReplicationWorkForBlock(block, 1));
+    // list for priority 1
+    List<Block> list_p1 = new ArrayList<Block>();
+    list_p1.add(block);
+
+    // list of lists for each priority
+    List<List<Block>> list_all = new ArrayList<List<Block>>();
+    list_all.add(new ArrayList<Block>()); // for priority 0
+    list_all.add(list_p1); // for priority 1
+
+    assertEquals("Block not initially pending replication", 0,
+        bm.pendingReplications.getNumReplicas(block));
+    assertEquals(
+        "computeReplicationWork should indicate replication is needed", 1,
+        bm.computeReplicationWorkForBlocks(list_all));
     assertTrue("replication is pending after work is computed",
         bm.pendingReplications.getNumReplicas(block) > 0);
 
-    LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> repls =
-        getAllPendingReplications();
+    LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> repls = getAllPendingReplications();
     assertEquals(1, repls.size());
-    Entry<DatanodeDescriptor, BlockTargetPair> repl = repls.entries().iterator().next();
+    Entry<DatanodeDescriptor, BlockTargetPair> repl = repls.entries()
+        .iterator().next();
     DatanodeDescriptor[] targets = repl.getValue().targets;
     DatanodeDescriptor[] pipeline = new DatanodeDescriptor[1 + targets.length];
     pipeline[0] = repl.getKey();
     System.arraycopy(targets, 0, pipeline, 1, targets.length);
     return pipeline;
   }