HDFS-13833. Improve BlockPlacementPolicyDefault's consider load logic. Contributed by Shweta.
Signed-off-by: Xiao Chen <xiao@apache.org>
This commit is contained in:
parent
3929653707
commit
27978bcb66
@ -912,6 +912,24 @@ private static void logNodeIsNotChosen(DatanodeDescriptor node,
|
|||||||
reasonMap.put(reason, base + 1);
|
reasonMap.put(reason, base + 1);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Determine if a datanode should be chosen based on current workload.
|
||||||
|
*
|
||||||
|
* @param node The target datanode
|
||||||
|
* @return Return true if the datanode should be excluded, otherwise false
|
||||||
|
*/
|
||||||
|
boolean excludeNodeByLoad(DatanodeDescriptor node){
|
||||||
|
final double maxLoad = considerLoadFactor *
|
||||||
|
stats.getInServiceXceiverAverage();
|
||||||
|
final int nodeLoad = node.getXceiverCount();
|
||||||
|
if ((nodeLoad > maxLoad) && (maxLoad > 0)) {
|
||||||
|
logNodeIsNotChosen(node, NodeNotChosenReason.NODE_TOO_BUSY,
|
||||||
|
"(load: " + nodeLoad + " > " + maxLoad + ")");
|
||||||
|
return true;
|
||||||
|
}
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Determine if a datanode is good for placing block.
|
* Determine if a datanode is good for placing block.
|
||||||
*
|
*
|
||||||
@ -923,7 +941,7 @@ private static void logNodeIsNotChosen(DatanodeDescriptor node,
|
|||||||
* @param results A list containing currently chosen nodes. Used to check if
|
* @param results A list containing currently chosen nodes. Used to check if
|
||||||
* too many nodes has been chosen in the target rack.
|
* too many nodes has been chosen in the target rack.
|
||||||
* @param avoidStaleNodes Whether or not to avoid choosing stale nodes
|
* @param avoidStaleNodes Whether or not to avoid choosing stale nodes
|
||||||
* @return Reture true if the datanode is good candidate, otherwise false
|
* @return Return true if the datanode is good candidate, otherwise false
|
||||||
*/
|
*/
|
||||||
boolean isGoodDatanode(DatanodeDescriptor node,
|
boolean isGoodDatanode(DatanodeDescriptor node,
|
||||||
int maxTargetPerRack, boolean considerLoad,
|
int maxTargetPerRack, boolean considerLoad,
|
||||||
@ -943,13 +961,8 @@ boolean isGoodDatanode(DatanodeDescriptor node,
|
|||||||
}
|
}
|
||||||
|
|
||||||
// check the communication traffic of the target machine
|
// check the communication traffic of the target machine
|
||||||
if (considerLoad) {
|
if(considerLoad){
|
||||||
final double maxLoad = considerLoadFactor *
|
if(excludeNodeByLoad(node)){
|
||||||
stats.getInServiceXceiverAverage();
|
|
||||||
final int nodeLoad = node.getXceiverCount();
|
|
||||||
if (nodeLoad > maxLoad) {
|
|
||||||
logNodeIsNotChosen(node, NodeNotChosenReason.NODE_TOO_BUSY,
|
|
||||||
"(load: " + nodeLoad + " > " + maxLoad + ")");
|
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -65,6 +65,7 @@
|
|||||||
import org.apache.hadoop.hdfs.server.namenode.TestINodeFile;
|
import org.apache.hadoop.hdfs.server.namenode.TestINodeFile;
|
||||||
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
import org.apache.hadoop.hdfs.server.protocol.DatanodeStorage;
|
||||||
import org.apache.hadoop.net.Node;
|
import org.apache.hadoop.net.Node;
|
||||||
|
import org.apache.hadoop.util.ReflectionUtils;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
import org.apache.log4j.Logger;
|
import org.apache.log4j.Logger;
|
||||||
import org.apache.log4j.spi.LoggingEvent;
|
import org.apache.log4j.spi.LoggingEvent;
|
||||||
@ -1559,4 +1560,31 @@ public void testAvoidLocalWriteNoEnoughNodes() throws IOException {
|
|||||||
}
|
}
|
||||||
assertTrue(found);
|
assertTrue(found);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testMaxLoad() {
|
||||||
|
FSClusterStats statistics = mock(FSClusterStats.class);
|
||||||
|
DatanodeDescriptor node = mock(DatanodeDescriptor.class);
|
||||||
|
|
||||||
|
when(statistics.getInServiceXceiverAverage()).thenReturn(0.0);
|
||||||
|
when(node.getXceiverCount()).thenReturn(1);
|
||||||
|
|
||||||
|
final Configuration conf = new Configuration();
|
||||||
|
final Class<? extends BlockPlacementPolicy> replicatorClass = conf
|
||||||
|
.getClass(DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_KEY,
|
||||||
|
DFSConfigKeys.DFS_BLOCK_REPLICATOR_CLASSNAME_DEFAULT,
|
||||||
|
BlockPlacementPolicy.class);
|
||||||
|
BlockPlacementPolicy bpp = ReflectionUtils.
|
||||||
|
newInstance(replicatorClass, conf);
|
||||||
|
assertTrue(bpp instanceof BlockPlacementPolicyDefault);
|
||||||
|
|
||||||
|
BlockPlacementPolicyDefault bppd = (BlockPlacementPolicyDefault) bpp;
|
||||||
|
bppd.initialize(conf, statistics, null, null);
|
||||||
|
assertFalse(bppd.excludeNodeByLoad(node));
|
||||||
|
|
||||||
|
when(statistics.getInServiceXceiverAverage()).thenReturn(1.0);
|
||||||
|
when(node.getXceiverCount()).thenReturn(10);
|
||||||
|
assertTrue(bppd.excludeNodeByLoad(node));
|
||||||
|
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user