diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java index d00f961c50..d396845d48 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/blockmanagement/BlockPlacementPolicyDefault.java @@ -912,6 +912,24 @@ private static void logNodeIsNotChosen(DatanodeDescriptor node, reasonMap.put(reason, base + 1); } + /** + * Determine if a datanode should be chosen based on current workload. + * + * @param node The target datanode + * @return Return true if the datanode should be excluded, otherwise false + */ + boolean excludeNodeByLoad(DatanodeDescriptor node){ + final double maxLoad = considerLoadFactor * + stats.getInServiceXceiverAverage(); + final int nodeLoad = node.getXceiverCount(); + if ((nodeLoad > maxLoad) && (maxLoad > 0)) { + logNodeIsNotChosen(node, NodeNotChosenReason.NODE_TOO_BUSY, + "(load: " + nodeLoad + " > " + maxLoad + ")"); + return true; + } + return false; + } + /** * Determine if a datanode is good for placing block. * @@ -923,7 +941,7 @@ private static void logNodeIsNotChosen(DatanodeDescriptor node, * @param results A list containing currently chosen nodes. Used to check if * too many nodes has been chosen in the target rack. * @param avoidStaleNodes Whether or not to avoid choosing stale nodes - * @return Reture true if the datanode is good candidate, otherwise false + * @return Return true if the datanode is good candidate, otherwise false */ boolean isGoodDatanode(DatanodeDescriptor node, int maxTargetPerRack, boolean considerLoad, @@ -943,13 +961,8 @@ boolean isGoodDatanode(DatanodeDescriptor node, } // check the communication traffic of the target machine - if (considerLoad) { - final double maxLoad = considerLoadFactor * - stats.getInServiceXceiverAverage(); - final int nodeLoad = node.getXceiverCount(); - if (nodeLoad > maxLoad) { - logNodeIsNotChosen(node, NodeNotChosenReason.NODE_TOO_BUSY, - "(load: " + nodeLoad + " > " + maxLoad + ")"); + if(considerLoad){ + if(excludeNodeByLoad(node)){ return false; } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java index 27dcbf1856..f08fa131bf 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/blockmanagement/TestReplicationPolicy.java @@ -65,6 +65,7 @@ import org.apache.hadoop.hdfs.server.namenode.TestINodeFile; import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage; import org.apache.hadoop.net.Node; +import org.apache.hadoop.util.ReflectionUtils; import org.apache.log4j.Level; import org.apache.log4j.Logger; import org.apache.log4j.spi.LoggingEvent; @@ -1559,4 +1560,31 @@ public void testAvoidLocalWriteNoEnoughNodes() throws IOException { } assertTrue(found); } + + @Test + public void testMaxLoad() { + FSClusterStats statistics = mock(FSClusterStats.class); + DatanodeDescriptor node = mock(DatanodeDescriptor.class); + + when(statistics.getInServiceXceiverAverage()).thenReturn(0.0); + when(node.getXceiverCount()).thenReturn(1); + + final Configuration conf = new Configuration(); + final Class replicatorClass = conf + .getClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY, + DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT, + BlockPlacementPolicy.class); + BlockPlacementPolicy bpp = ReflectionUtils. + newInstance(replicatorClass, conf); + assertTrue(bpp instanceof BlockPlacementPolicyDefault); + + BlockPlacementPolicyDefault bppd = (BlockPlacementPolicyDefault) bpp; + bppd.initialize(conf, statistics, null, null); + assertFalse(bppd.excludeNodeByLoad(node)); + + when(statistics.getInServiceXceiverAverage()).thenReturn(1.0); + when(node.getXceiverCount()).thenReturn(10); + assertTrue(bppd.excludeNodeByLoad(node)); + + } }