From bbab35e6d87aeebbc1848d7072c59af780536425 Mon Sep 17 00:00:00 2001 From: Tsz-wo Sze Date: Tue, 26 Jun 2012 03:25:47 +0000 Subject: [PATCH] HDFS-3498. Support replica removal in BlockPlacementPolicy and make BlockPlacementPolicyDefault extensible for reusing code in subclasses. Contributed by Junping Du git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1353807 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt | 4 + .../server/blockmanagement/BlockManager.java | 52 +++---------- .../blockmanagement/BlockPlacementPolicy.java | 77 +++++++++++++++++++ .../BlockPlacementPolicyDefault.java | 39 ++++++---- .../TestReplicationPolicy.java | 50 +++++++++++- 5 files changed, 167 insertions(+), 55 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 7acb002ee3..a02ab0fb27 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -93,6 +93,10 @@ Trunk (unreleased changes) HDFS-3478. Test quotas with Long.Max_Value. (Sujay Rau via eli) + HDFS-3498. Support replica removal in BlockPlacementPolicy and make + BlockPlacementPolicyDefault extensible for reusing code in subclasses. 
+ (Junping Du via szetszwo) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java index 527a99765c..7e4a4857c7 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockManager.java @@ -2259,30 +2259,14 @@ private void chooseExcessReplicates(Collection nonExcess, BlockCollection bc = getBlockCollection(b); final Map> rackMap = new HashMap>(); - for(final Iterator iter = nonExcess.iterator(); - iter.hasNext(); ) { - final DatanodeDescriptor node = iter.next(); - final String rackName = node.getNetworkLocation(); - List datanodeList = rackMap.get(rackName); - if (datanodeList == null) { - datanodeList = new ArrayList(); - rackMap.put(rackName, datanodeList); - } - datanodeList.add(node); - } + final List moreThanOne = new ArrayList(); + final List exactlyOne = new ArrayList(); // split nodes into two sets - // priSet contains nodes on rack with more than one replica - // remains contains the remaining nodes - final List priSet = new ArrayList(); - final List remains = new ArrayList(); - for(List datanodeList : rackMap.values()) { - if (datanodeList.size() == 1 ) { - remains.add(datanodeList.get(0)); - } else { - priSet.addAll(datanodeList); - } - } + // moreThanOne contains nodes on rack with more than one replica + // exactlyOne contains the remaining nodes + replicator.splitNodesWithRack(nonExcess, rackMap, moreThanOne, + exactlyOne); // pick one node to delete that favors the delete hint // otherwise pick one with least space from priSet if it is not empty @@ -2292,30 +2276,18 @@ private void chooseExcessReplicates(Collection nonExcess, // check if we can delete delNodeHint final DatanodeInfo 
cur; if (firstOne && delNodeHint !=null && nonExcess.contains(delNodeHint) - && (priSet.contains(delNodeHint) - || (addedNode != null && !priSet.contains(addedNode))) ) { + && (moreThanOne.contains(delNodeHint) + || (addedNode != null && !moreThanOne.contains(addedNode))) ) { cur = delNodeHint; } else { // regular excessive replica removal cur = replicator.chooseReplicaToDelete(bc, b, replication, - priSet, remains); + moreThanOne, exactlyOne); } firstOne = false; - // adjust rackmap, priSet, and remains - String rack = cur.getNetworkLocation(); - final List datanodes = rackMap.get(rack); - datanodes.remove(cur); - if (datanodes.isEmpty()) { - rackMap.remove(rack); - } - if (priSet.remove(cur)) { - if (datanodes.size() == 1) { - priSet.remove(datanodes.get(0)); - remains.add(datanodes.get(0)); - } - } else { - remains.remove(cur); - } + // adjust rackmap, moreThanOne, and exactlyOne + replicator.adjustSetsWithChosenReplica(rackMap, moreThanOne, + exactlyOne, cur); nonExcess.remove(cur); addToExcessReplicate(cur, b); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java index e1efae5419..e3317467bd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicy.java @@ -21,12 +21,14 @@ import java.util.Collection; import java.util.HashMap; import java.util.List; +import java.util.Map; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.hdfs.protocol.Block; +import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import 
org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.namenode.FSClusterStats; import org.apache.hadoop.net.NetworkTopology; @@ -241,5 +243,80 @@ public DatanodeDescriptor[] chooseTarget(String srcPath, excludedNodes, blocksize); } + + /** + * Adjust rackmap, moreThanOne, and exactlyOne after removing replica on cur. + * + * @param rackMap a map from rack to replica + * @param moreThanOne The List of replica nodes on rack which has more than + * one replica + * @param exactlyOne The List of replica nodes on rack with only one replica + * @param cur current replica to remove + */ + public void adjustSetsWithChosenReplica(final Map> rackMap, + final List moreThanOne, + final List exactlyOne, final DatanodeInfo cur) { + + String rack = getRack(cur); + final List datanodes = rackMap.get(rack); + datanodes.remove(cur); + if (datanodes.isEmpty()) { + rackMap.remove(rack); + } + if (moreThanOne.remove(cur)) { + if (datanodes.size() == 1) { + moreThanOne.remove(datanodes.get(0)); + exactlyOne.add(datanodes.get(0)); + } + } else { + exactlyOne.remove(cur); + } + } + + /** + * Get rack string from a data node + * @param datanode + * @return rack of data node + */ + protected String getRack(final DatanodeInfo datanode) { + return datanode.getNetworkLocation(); + } + + /** + * Split data nodes into two sets, one set includes nodes on rack with + * more than one replica, the other set contains the remaining nodes. 
+ * + * @param dataNodes the replica nodes to split + * @param rackMap a map from rack to datanodes + * @param moreThanOne contains nodes on rack with more than one replica + * @param exactlyOne contains the remaining nodes (rack with only one replica) + */ + public void splitNodesWithRack( + Collection dataNodes, + final Map> rackMap, + final List moreThanOne, + final List exactlyOne) { + for(DatanodeDescriptor node : dataNodes) { + final String rackName = getRack(node); + List datanodeList = rackMap.get(rackName); + if (datanodeList == null) { + datanodeList = new ArrayList(); + rackMap.put(rackName, datanodeList); + } + datanodeList.add(node); + } + + // split nodes into two sets + for(List datanodeList : rackMap.values()) { + if (datanodeList.size() == 1) { + // exactlyOne contains nodes on rack with only one replica + exactlyOne.add(datanodeList.get(0)); + } else { + // moreThanOne contains nodes on rack with more than one replica + moreThanOne.addAll(datanodeList); + } + } + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java index 6995a2e30f..350f863eae 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java @@ -56,15 +56,15 @@ public class BlockPlacementPolicyDefault extends BlockPlacementPolicy { "For more information, please enable DEBUG log level on " + ((Log4JLogger)LOG).getLogger().getName(); - private boolean considerLoad; + protected boolean considerLoad; private boolean preferLocalNode = true; - private NetworkTopology clusterMap; + protected NetworkTopology clusterMap; private FSClusterStats stats; - private long heartbeatInterval; // interval for DataNode heartbeats + 
protected long heartbeatInterval; // interval for DataNode heartbeats /** * A miss of that many heartbeats is tolerated for replica deletion policy. */ - private int tolerateHeartbeatMultiplier; + protected int tolerateHeartbeatMultiplier; BlockPlacementPolicyDefault(Configuration conf, FSClusterStats stats, NetworkTopology clusterMap) { @@ -88,7 +88,7 @@ public void initialize(Configuration conf, FSClusterStats stats, DFSConfigKeys.DFS_NAMENODE_TOLERATE_HEARTBEAT_MULTIPLIER_DEFAULT); } - private ThreadLocal threadLocalBuilder = + protected ThreadLocal threadLocalBuilder = new ThreadLocal() { @Override protected StringBuilder initialValue() { @@ -229,7 +229,7 @@ private DatanodeDescriptor chooseTarget(int numOfReplicas, * choose a node on the same rack * @return the chosen node */ - private DatanodeDescriptor chooseLocalNode( + protected DatanodeDescriptor chooseLocalNode( DatanodeDescriptor localMachine, HashMap excludedNodes, long blocksize, @@ -263,7 +263,7 @@ private DatanodeDescriptor chooseLocalNode( * in the cluster. * @return the chosen node */ - private DatanodeDescriptor chooseLocalRack( + protected DatanodeDescriptor chooseLocalRack( DatanodeDescriptor localMachine, HashMap excludedNodes, long blocksize, @@ -316,7 +316,7 @@ private DatanodeDescriptor chooseLocalRack( * from the local rack */ - private void chooseRemoteRack(int numOfReplicas, + protected void chooseRemoteRack(int numOfReplicas, DatanodeDescriptor localMachine, HashMap excludedNodes, long blocksize, @@ -338,7 +338,7 @@ private void chooseRemoteRack(int numOfReplicas, /* Randomly choose one target from nodes. * @return the chosen node */ - private DatanodeDescriptor chooseRandom( + protected DatanodeDescriptor chooseRandom( String nodes, HashMap excludedNodes, long blocksize, @@ -382,7 +382,7 @@ private DatanodeDescriptor chooseRandom( /* Randomly choose numOfReplicas targets from nodes. 
*/ - private void chooseRandom(int numOfReplicas, + protected void chooseRandom(int numOfReplicas, String nodes, HashMap excludedNodes, long blocksize, @@ -438,7 +438,7 @@ private boolean isGoodTarget(DatanodeDescriptor node, this.considerLoad, results); } - private boolean isGoodTarget(DatanodeDescriptor node, + protected boolean isGoodTarget(DatanodeDescriptor node, long blockSize, int maxTargetPerLoc, boolean considerLoad, List results) { @@ -574,8 +574,7 @@ public DatanodeDescriptor chooseReplicaToDelete(BlockCollection bc, // pick replica from the first Set. If first is empty, then pick replicas // from second set. - Iterator iter = - first.isEmpty() ? second.iterator() : first.iterator(); + Iterator iter = pickupReplicaSet(first, second); // Pick the node with the oldest heartbeat or with the least free space, // if all hearbeats are within the tolerable heartbeat interval @@ -594,6 +593,20 @@ public DatanodeDescriptor chooseReplicaToDelete(BlockCollection bc, } return oldestHeartbeatNode != null ? oldestHeartbeatNode : minSpaceNode; } + + /** + * Pick up replica node set for deleting replica as over-replicated. + * First set contains replica nodes on rack with more than one + * replica while second set contains remaining replica nodes. + * So pick up first set if not empty. If first is empty, then pick second. + */ + protected Iterator pickupReplicaSet( + Collection first, + Collection second) { + Iterator iter = + first.isEmpty() ? 
second.iterator() : first.iterator(); + return iter; + } @VisibleForTesting void setPreferLocalNode(boolean prefer) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java index ce570f7eba..aefd0befed 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java @@ -24,6 +24,7 @@ import java.util.Arrays; import java.util.HashMap; import java.util.List; +import java.util.Map; import java.util.Random; import org.apache.hadoop.conf.Configuration; @@ -34,7 +35,6 @@ import org.apache.hadoop.hdfs.HdfsConfiguration; import org.apache.hadoop.hdfs.MiniDFSCluster; import org.apache.hadoop.hdfs.protocol.Block; -import org.apache.hadoop.hdfs.protocol.DatanodeID; import org.apache.hadoop.hdfs.protocol.HdfsConstants; import org.apache.hadoop.hdfs.server.namenode.NameNode; import org.apache.hadoop.net.NetworkTopology; @@ -61,7 +61,7 @@ public static void setupCluster() throws Exception { DFSTestUtil.getDatanodeDescriptor("3.3.3.3", "/d1/r2"), DFSTestUtil.getDatanodeDescriptor("4.4.4.4", "/d1/r2"), DFSTestUtil.getDatanodeDescriptor("5.5.5.5", "/d2/r3"), - DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/d2/r3") + DFSTestUtil.getDatanodeDescriptor("6.6.6.6", "/d2/r3") }; FileSystem.setDefaultUri(conf, "hdfs://localhost:0"); @@ -587,4 +587,50 @@ private void assertTheChosenBlocks( fifthPrioritySize, chosenBlocks.get( UnderReplicatedBlocks.QUEUE_WITH_CORRUPT_BLOCKS).size()); } + + /** + * Test that chooseReplicaToDelete picks the replica to remove based on + * block locality and free space + */ + @Test + public void testChooseReplicaToDelete() throws Exception { + List replicaNodeList = new + ArrayList(); + 
final Map> rackMap + = new HashMap>(); + + dataNodes[0].setRemaining(4*1024*1024); + replicaNodeList.add(dataNodes[0]); + + dataNodes[1].setRemaining(3*1024*1024); + replicaNodeList.add(dataNodes[1]); + + dataNodes[2].setRemaining(2*1024*1024); + replicaNodeList.add(dataNodes[2]); + + dataNodes[5].setRemaining(1*1024*1024); + replicaNodeList.add(dataNodes[5]); + + List first = new ArrayList(); + List second = new ArrayList(); + replicator.splitNodesWithRack( + replicaNodeList, rackMap, first, second); + // dataNodes[0] and dataNodes[1] are in first set as their rack has two + // replica nodes, while dataNodes[2] and dataNodes[5] are in second set. + assertEquals(2, first.size()); + assertEquals(2, second.size()); + DatanodeDescriptor chosenNode = replicator.chooseReplicaToDelete( + null, null, (short)3, first, second); + // Within first set, dataNodes[1] has less free space + assertEquals(chosenNode, dataNodes[1]); + + replicator.adjustSetsWithChosenReplica( + rackMap, first, second, chosenNode); + assertEquals(0, first.size()); + assertEquals(3, second.size()); + // Within second set, dataNodes[5] has the least free space + chosenNode = replicator.chooseReplicaToDelete( + null, null, (short)2, first, second); + assertEquals(chosenNode, dataNodes[5]); + } }