HDFS-4270. Introduce soft and hard limits for max replication so that replications of the highest priority are allowed to choose a source datanode that has reached its soft limit but not the hard limit. Contributed by Derek Dagit

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1428739 13f79535-47bb-0310-9956-ffa450edef68
Tsz-wo Sze 2013-01-04 08:09:43 +00:00
parent 32052a1e3a
commit 2c15726999
4 changed files with 116 additions and 13 deletions

hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -443,6 +443,11 @@ Release 2.0.3-alpha - Unreleased
HDFS-4326. bump up Tomcat version for HttpFS to 6.0.36. (tucu via acmurthy)
+ HDFS-4270. Introduce soft and hard limits for max replication so that
+ replications of the highest priority are allowed to choose a source datanode
+ that has reached its soft limit but not the hard limit. (Derek Dagit via
+ szetszwo)
OPTIMIZATIONS
BUG FIXES

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java

@@ -143,6 +143,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
public static final int DFS_NAMENODE_REPLICATION_PENDING_TIMEOUT_SEC_DEFAULT = -1;
public static final String DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY = "dfs.namenode.replication.max-streams";
public static final int DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT = 2;
+ public static final String DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY = "dfs.namenode.replication.max-streams-hard-limit";
+ public static final int DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT = 4;
public static final String DFS_WEBHDFS_ENABLED_KEY = "dfs.webhdfs.enabled";
public static final boolean DFS_WEBHDFS_ENABLED_DEFAULT = false;
public static final String DFS_PERMISSIONS_ENABLED_KEY = "dfs.permissions.enabled";
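A quick usage sketch, not part of this commit: the new hard-limit key pairs with the existing soft-limit key, and the hard limit should be kept at or above the soft limit. Key names and defaults below are taken from the entries added above; the class name and the chosen values are illustrative only.

import org.apache.hadoop.conf.Configuration;

// Illustrative example, assuming hadoop-common on the classpath.
public class ReplicationStreamLimitsExample {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Soft limit on concurrent outgoing replication streams per datanode
    // (default 2); only highest-priority replications may exceed it.
    conf.setInt("dfs.namenode.replication.max-streams", 4);
    // Hard limit (default 4) that no replication, of any priority, may exceed.
    conf.setInt("dfs.namenode.replication.max-streams-hard-limit", 8);
    System.out.println("soft = "
        + conf.getInt("dfs.namenode.replication.max-streams", 2)
        + ", hard = "
        + conf.getInt("dfs.namenode.replication.max-streams-hard-limit", 4));
  }
}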

hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java

@@ -189,10 +189,16 @@ public int getPendingDataNodeMessageCount() {
/** The maximum number of replicas allowed for a block */
public final short maxReplication;
- /** The maximum number of outgoing replication streams
-  * a given node should have at one time
-  */
+ /**
+  * The maximum number of outgoing replication streams a given node should have
+  * at one time considering all but the highest priority replications needed.
+  */
int maxReplicationStreams;
+ /**
+  * The maximum number of outgoing replication streams a given node should have
+  * at one time.
+  */
+ int replicationStreamsHardLimit;
/** Minimum copies needed or else write is disallowed */
public final short minReplication;
/** Default number of replicas */
@@ -263,9 +269,16 @@ public BlockManager(final Namesystem namesystem, final FSClusterStats stats,
this.minReplication = (short)minR;
this.maxReplication = (short)maxR;
- this.maxReplicationStreams = conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
-     DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT);
- this.shouldCheckForEnoughRacks = conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) != null;
+ this.maxReplicationStreams =
+     conf.getInt(DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_KEY,
+         DFSConfigKeys.DFS_NAMENODE_REPLICATION_MAX_STREAMS_DEFAULT);
+ this.replicationStreamsHardLimit =
+     conf.getInt(
+         DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_KEY,
+         DFSConfigKeys.DFS_NAMENODE_REPLICATION_STREAMS_HARD_LIMIT_DEFAULT);
+ this.shouldCheckForEnoughRacks =
+     conf.get(DFSConfigKeys.NET_TOPOLOGY_SCRIPT_FILE_NAME_KEY) == null
+         ? false : true;
this.blocksInvalidateWorkPct = DFSUtil.getInvalidateWorkPctPerIteration(conf);
this.blocksReplWorkMultiplier = DFSUtil.getReplWorkMultiplier(conf);
@@ -435,7 +448,8 @@ private void dumpBlockMeta(Block block, PrintWriter out) {
NumberReplicas numReplicas = new NumberReplicas();
// source node returned is not used
chooseSourceDatanode(block, containingNodes,
-     containingLiveReplicasNodes, numReplicas);
+     containingLiveReplicasNodes, numReplicas,
+     UnderReplicatedBlocks.LEVEL);
assert containingLiveReplicasNodes.size() == numReplicas.liveReplicas();
int usableReplicas = numReplicas.liveReplicas() +
numReplicas.decommissionedReplicas();
@@ -1145,11 +1159,12 @@ int computeReplicationWorkForBlocks(List<List<Block>> blocksToReplicate) {
liveReplicaNodes = new ArrayList<DatanodeDescriptor>();
NumberReplicas numReplicas = new NumberReplicas();
srcNode = chooseSourceDatanode(
-     block, containingNodes, liveReplicaNodes, numReplicas);
+     block, containingNodes, liveReplicaNodes, numReplicas,
+     priority);
if(srcNode == null) { // block can not be replicated from any node
LOG.debug("Block " + block + " cannot be repl from any node");
continue;
}
}
assert liveReplicaNodes.size() == numReplicas.liveReplicas();
// do not schedule more if enough replicas is already pending
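For orientation while reading the new priority argument: both call sites pass values defined by UnderReplicatedBlocks. The sketch below restates those queue levels as assumed from the contemporaneous source; the values are an assumption to verify, not part of this diff.

// Assumed queue levels in UnderReplicatedBlocks (0 is the most urgent).
class ReplicationPrioritySketch {
  static final int QUEUE_HIGHEST_PRIORITY = 0;         // e.g. blocks with a single live replica
  static final int QUEUE_VERY_UNDER_REPLICATED = 1;
  static final int QUEUE_UNDER_REPLICATED = 2;
  static final int QUEUE_REPLICAS_BADLY_DISTRIBUTED = 3;
  static final int QUEUE_WITH_CORRUPT_BLOCKS = 4;
  // Number of queues; dumpBlockMeta above passes LEVEL, which can never equal
  // QUEUE_HIGHEST_PRIORITY, so the soft limit stays in force on that path.
  static final int LEVEL = 5;
}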
@@ -1339,16 +1354,34 @@ public DatanodeDescriptor[] chooseTarget(final String src,
* since the former do not have write traffic and hence are less busy.
* We do not use already decommissioned nodes as a source.
* Otherwise we choose a random node among those that did not reach their
- * replication limit.
+ * replication limits. However, if the replication is of the highest priority
+ * and all nodes have reached their replication limits, we will choose a
+ * random node despite the replication limit.
*
* In addition form a list of all nodes containing the block
* and calculate its replication numbers.
+ *
+ * @param block Block for which a replication source is needed
+ * @param containingNodes List to be populated with nodes found to contain the
+ *        given block
+ * @param nodesContainingLiveReplicas List to be populated with nodes found to
+ *        contain live replicas of the given block
+ * @param numReplicas NumberReplicas instance to be initialized with the
+ *        counts of live, corrupt, excess, and decommissioned replicas of the
+ *        given block.
+ * @param priority integer representing replication priority of the given
+ *        block
+ * @return the DatanodeDescriptor of the chosen node from which to replicate
+ *         the given block
*/
- private DatanodeDescriptor chooseSourceDatanode(
+ @VisibleForTesting
+ DatanodeDescriptor chooseSourceDatanode(
    Block block,
    List<DatanodeDescriptor> containingNodes,
    List<DatanodeDescriptor> nodesContainingLiveReplicas,
-   NumberReplicas numReplicas) {
+   NumberReplicas numReplicas,
+   int priority) {
containingNodes.clear();
nodesContainingLiveReplicas.clear();
DatanodeDescriptor srcNode = null;
@@ -1377,8 +1410,15 @@ else if (excessBlocks != null && excessBlocks.contains(block)) {
// If so, do not select the node as src node
if ((nodesCorrupt != null) && nodesCorrupt.contains(node))
  continue;
- if(node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
+ if(priority != UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY
+     && node.getNumberOfBlocksToBeReplicated() >= maxReplicationStreams)
+ {
  continue; // already reached replication limit
+ }
+ if (node.getNumberOfBlocksToBeReplicated() >= replicationStreamsHardLimit)
+ {
+   continue;
+ }
// the block must not be scheduled for removal on srcNode
if(excessBlocks != null && excessBlocks.contains(block))
  continue;
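Read together, the two checks above implement the policy from the commit message: the soft limit gates ordinary replication work, and the hard limit gates everything, including highest-priority work. A minimal standalone restatement with hypothetical names (pendingStreams stands in for node.getNumberOfBlocksToBeReplicated(); the real logic is inline in chooseSourceDatanode and reads the limits from the fields added above):

// Hypothetical helper, not in the commit: returns whether a node may serve
// as a replication source given its current outgoing-stream count.
static boolean eligibleAsSource(int pendingStreams, int priority,
    int softLimit, int hardLimit, int highestPriority) {
  if (priority != highestPriority && pendingStreams >= softLimit) {
    return false; // soft limit applies to all but the highest priority
  }
  return pendingStreams < hardLimit; // hard limit applies unconditionally
}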

hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestBlockManager.java

@@ -19,10 +19,13 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
+ import static org.junit.Assert.assertNotNull;
+ import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.ArrayList;
+ import java.util.LinkedList;
import java.util.List;
import java.util.Map.Entry;
@@ -429,4 +432,57 @@ private LinkedListMultimap<DatanodeDescriptor, BlockTargetPair> getAllPendingRepl
}
return repls;
}
+
+ /**
+  * Test that a source node for a highest-priority replication is chosen even
+  * if all available source nodes have reached their replication limits.
+  */
+ @Test
+ public void testHighestPriReplSrcChosenDespiteMaxReplLimit() throws Exception {
+   bm.maxReplicationStreams = 0;
+   bm.replicationStreamsHardLimit = 1;
+
+   long blockId = 42; // arbitrary
+   Block aBlock = new Block(blockId, 0, 0);
+   List<DatanodeDescriptor> origNodes = getNodes(0, 1);
+   // Add the block to the first node.
+   addBlockOnNodes(blockId, origNodes.subList(0, 1));
+
+   List<DatanodeDescriptor> cntNodes = new LinkedList<DatanodeDescriptor>();
+   List<DatanodeDescriptor> liveNodes = new LinkedList<DatanodeDescriptor>();
+
+   assertNotNull("Chooses source node for a highest-priority replication"
+       + " even if all available source nodes have reached their replication"
+       + " limits below the hard limit.",
+       bm.chooseSourceDatanode(
+           aBlock,
+           cntNodes,
+           liveNodes,
+           new NumberReplicas(),
+           UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY));
+
+   assertNull("Does not choose a source node for a less-than-highest-priority"
+       + " replication since all available source nodes have reached"
+       + " their replication limits.",
+       bm.chooseSourceDatanode(
+           aBlock,
+           cntNodes,
+           liveNodes,
+           new NumberReplicas(),
+           UnderReplicatedBlocks.QUEUE_VERY_UNDER_REPLICATED));
+
+   // Increase the replication count to test replication count > hard limit
+   DatanodeDescriptor targets[] = { origNodes.get(1) };
+   origNodes.get(0).addBlockToBeReplicated(aBlock, targets);
+
+   assertNull("Does not choose a source node for a highest-priority"
+       + " replication when all available nodes exceed the hard limit.",
+       bm.chooseSourceDatanode(
+           aBlock,
+           cntNodes,
+           liveNodes,
+           new NumberReplicas(),
+           UnderReplicatedBlocks.QUEUE_HIGHEST_PRIORITY));
+ }
}
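With the limits pinned at soft = 0 and hard = 1, the test's three assertions reduce to the eligibleAsSource sketch given after the BlockManager hunks above (illustrative only; QUEUE_* values as in the earlier sketch):

// pendingStreams is 0 for the first two calls and becomes 1 after
// addBlockToBeReplicated schedules one outgoing stream on the source node.
eligibleAsSource(0, QUEUE_HIGHEST_PRIORITY, 0, 1, QUEUE_HIGHEST_PRIORITY);      // true  -> assertNotNull
eligibleAsSource(0, QUEUE_VERY_UNDER_REPLICATED, 0, 1, QUEUE_HIGHEST_PRIORITY); // false -> first assertNull
eligibleAsSource(1, QUEUE_HIGHEST_PRIORITY, 0, 1, QUEUE_HIGHEST_PRIORITY);      // false -> second assertNull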