diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 61ae9947a8..ad5f222b89 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -179,6 +179,8 @@ Trunk (Unreleased) HDFS-3358. Specify explicitly that the NN UI status total is talking of persistent objects on heap. (harsh) + HDFS-4234. Use generic code for choosing datanode in Balancer. (szetszwo) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java index 734b3ed269..473f259234 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java @@ -75,6 +75,7 @@ import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetworkTopology; +import org.apache.hadoop.net.Node; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; @@ -557,7 +558,7 @@ protected String getStorageID() { } /** Decide if still need to move more bytes */ - protected boolean isMoveQuotaFull() { + protected boolean hasSpaceForScheduling() { return scheduledSize void logNodes( LOG.info(nodes.size() + " " + name + ": " + nodes); } - /* Decide all pairs and + /** A matcher interface for matching nodes. */ + private interface Matcher { + /** Given the cluster topology, does the left node match the right node? */ + boolean match(NetworkTopology cluster, Node left, Node right); + } + + /** Match datanodes in the same node group. */ + static final Matcher SAME_NODE_GROUP = new Matcher() { + @Override + public boolean match(NetworkTopology cluster, Node left, Node right) { + return cluster.isOnSameNodeGroup(left, right); + } + }; + + /** Match datanodes in the same rack. */ + static final Matcher SAME_RACK = new Matcher() { + @Override + public boolean match(NetworkTopology cluster, Node left, Node right) { + return cluster.isOnSameRack(left, right); + } + }; + + /** Match any datanode with any other datanode. */ + static final Matcher ANY_OTHER = new Matcher() { + @Override + public boolean match(NetworkTopology cluster, Node left, Node right) { + return left != right; + } + }; + + /** + * Decide all pairs and * the number of bytes to move from a source to a target * Maximum bytes to be moved per node is * Min(1 Band worth of bytes, MAX_SIZE_TO_MOVE). * Return total number of bytes to move in this iteration */ private long chooseNodes() { - // First, match nodes on the same node group if cluster has nodegroup - // awareness + // First, match nodes on the same node group if cluster is node group aware if (cluster.isNodeGroupAware()) { - chooseNodesOnSameNodeGroup(); + chooseNodes(SAME_NODE_GROUP); } // Then, match nodes on the same rack - chooseNodes(true); - // At last, match nodes on different racks - chooseNodes(false); + chooseNodes(SAME_RACK); + // At last, match all remaining nodes + chooseNodes(ANY_OTHER); assert (datanodes.size() >= sources.size()+targets.size()) : "Mismatched number of datanodes (" + @@ -952,57 +983,55 @@ private long chooseNodes() { } return bytesToMove; } - - /** - * Decide all pairs where source and target are - * on the same NodeGroup - */ - private void chooseNodesOnSameNodeGroup() { + /** Decide all pairs according to the matcher. */ + private void chooseNodes(final Matcher matcher) { /* first step: match each overUtilized datanode (source) to - * one or more underUtilized datanodes within same NodeGroup(targets). + * one or more underUtilized datanodes (targets). */ - chooseOnSameNodeGroup(overUtilizedDatanodes, underUtilizedDatanodes); - - /* match each remaining overutilized datanode (source) to below average - * utilized datanodes within the same NodeGroup(targets). + chooseDatanodes(overUtilizedDatanodes, underUtilizedDatanodes, matcher); + + /* match each remaining overutilized datanode (source) to + * below average utilized datanodes (targets). * Note only overutilized datanodes that haven't had that max bytes to move * satisfied in step 1 are selected */ - chooseOnSameNodeGroup(overUtilizedDatanodes, belowAvgUtilizedDatanodes); + chooseDatanodes(overUtilizedDatanodes, belowAvgUtilizedDatanodes, matcher); - /* match each remaining underutilized datanode to above average utilized - * datanodes within the same NodeGroup. + /* match each remaining underutilized datanode (target) to + * above average utilized datanodes (source). * Note only underutilized datanodes that have not had that max bytes to * move satisfied in step 1 are selected. */ - chooseOnSameNodeGroup(underUtilizedDatanodes, aboveAvgUtilizedDatanodes); + chooseDatanodes(underUtilizedDatanodes, aboveAvgUtilizedDatanodes, matcher); } - + /** - * Match two sets of nodes within the same NodeGroup, one should be source - * nodes (utilization > Avg), and the other should be destination nodes - * (utilization < Avg). - * @param datanodes - * @param candidates + * For each datanode, choose matching nodes from the candidates. Either the + * datanodes or the candidates are source nodes with (utilization > Avg), and + * the others are target nodes with (utilization < Avg). */ private void - chooseOnSameNodeGroup(Collection datanodes, Collection candidates) { + chooseDatanodes(Collection datanodes, Collection candidates, + Matcher matcher) { for (Iterator i = datanodes.iterator(); i.hasNext();) { final D datanode = i.next(); - for(; chooseOnSameNodeGroup(datanode, candidates.iterator()); ); - if (!datanode.isMoveQuotaFull()) { + for(; chooseForOneDatanode(datanode, candidates, matcher); ); + if (!datanode.hasSpaceForScheduling()) { i.remove(); } } } - + /** - * Match one datanode with a set of candidates nodes within the same NodeGroup. + * For the given datanode, choose a candidate and then schedule it. + * @return true if a candidate is chosen; false if no candidates is chosen. */ - private boolean chooseOnSameNodeGroup( - BalancerDatanode dn, Iterator candidates) { - final T chosen = chooseCandidateOnSameNodeGroup(dn, candidates); + private boolean chooseForOneDatanode( + BalancerDatanode dn, Collection candidates, Matcher matcher) { + final Iterator i = candidates.iterator(); + final C chosen = chooseCandidate(dn, i, matcher); + if (chosen == null) { return false; } @@ -1011,8 +1040,8 @@ private boolean chooseOnSameNodeGroup( } else { matchSourceWithTargetToMove((Source)chosen, dn); } - if (!chosen.isMoveQuotaFull()) { - candidates.remove(); + if (!chosen.hasSpaceForScheduling()) { + i.remove(); } return true; } @@ -1029,19 +1058,15 @@ private void matchSourceWithTargetToMove( +source.datanode.getName() + " to " + target.datanode.getName()); } - /** choose a datanode from candidates within the same NodeGroup - * of dn. - */ - private T chooseCandidateOnSameNodeGroup( - BalancerDatanode dn, Iterator candidates) { - if (dn.isMoveQuotaFull()) { + /** Choose a candidate for the given datanode. */ + private + C chooseCandidate(D dn, Iterator candidates, Matcher matcher) { + if (dn.hasSpaceForScheduling()) { for(; candidates.hasNext(); ) { - final T c = candidates.next(); - if (!c.isMoveQuotaFull()) { + final C c = candidates.next(); + if (!c.hasSpaceForScheduling()) { candidates.remove(); - continue; - } - if (cluster.isOnSameNodeGroup(dn.getDatanode(), c.getDatanode())) { + } else if (matcher.match(cluster, dn.getDatanode(), c.getDatanode())) { return c; } } @@ -1049,148 +1074,6 @@ private T chooseCandidateOnSameNodeGroup( return null; } - /* if onRack is true, decide all pairs - * where source and target are on the same rack; Otherwise - * decide all pairs where source and target are - * on different racks - */ - private void chooseNodes(boolean onRack) { - /* first step: match each overUtilized datanode (source) to - * one or more underUtilized datanodes (targets). - */ - chooseTargets(underUtilizedDatanodes, onRack); - - /* match each remaining overutilized datanode (source) to - * below average utilized datanodes (targets). - * Note only overutilized datanodes that haven't had that max bytes to move - * satisfied in step 1 are selected - */ - chooseTargets(belowAvgUtilizedDatanodes, onRack); - - /* match each remaining underutilized datanode (target) to - * above average utilized datanodes (source). - * Note only underutilized datanodes that have not had that max bytes to - * move satisfied in step 1 are selected. - */ - chooseSources(aboveAvgUtilizedDatanodes, onRack); - } - - /* choose targets from the target candidate list for each over utilized - * source datanode. OnRackTarget determines if the chosen target - * should be on the same rack as the source - */ - private void chooseTargets( - Collection targetCandidates, boolean onRackTarget ) { - for (Iterator srcIterator = overUtilizedDatanodes.iterator(); - srcIterator.hasNext();) { - Source source = srcIterator.next(); - while (chooseTarget(source, targetCandidates.iterator(), onRackTarget)) { - } - if (!source.isMoveQuotaFull()) { - srcIterator.remove(); - } - } - return; - } - - /* choose sources from the source candidate list for each under utilized - * target datanode. onRackSource determines if the chosen source - * should be on the same rack as the target - */ - private void chooseSources( - Collection sourceCandidates, boolean onRackSource) { - for (Iterator targetIterator = - underUtilizedDatanodes.iterator(); targetIterator.hasNext();) { - BalancerDatanode target = targetIterator.next(); - while (chooseSource(target, sourceCandidates.iterator(), onRackSource)) { - } - if (!target.isMoveQuotaFull()) { - targetIterator.remove(); - } - } - return; - } - - /* For the given source, choose targets from the target candidate list. - * OnRackTarget determines if the chosen target - * should be on the same rack as the source - */ - private boolean chooseTarget(Source source, - Iterator targetCandidates, boolean onRackTarget) { - if (!source.isMoveQuotaFull()) { - return false; - } - boolean foundTarget = false; - BalancerDatanode target = null; - while (!foundTarget && targetCandidates.hasNext()) { - target = targetCandidates.next(); - if (!target.isMoveQuotaFull()) { - targetCandidates.remove(); - continue; - } - if (onRackTarget) { - // choose from on-rack nodes - if (cluster.isOnSameRack(source.datanode, target.datanode)) { - foundTarget = true; - } - } else { - // choose from off-rack nodes - if (!cluster.isOnSameRack(source.datanode, target.datanode)) { - foundTarget = true; - } - } - } - if (foundTarget) { - assert(target != null):"Choose a null target"; - matchSourceWithTargetToMove(source, target); - if (!target.isMoveQuotaFull()) { - targetCandidates.remove(); - } - return true; - } - return false; - } - - /* For the given target, choose sources from the source candidate list. - * OnRackSource determines if the chosen source - * should be on the same rack as the target - */ - private boolean chooseSource(BalancerDatanode target, - Iterator sourceCandidates, boolean onRackSource) { - if (!target.isMoveQuotaFull()) { - return false; - } - boolean foundSource = false; - Source source = null; - while (!foundSource && sourceCandidates.hasNext()) { - source = sourceCandidates.next(); - if (!source.isMoveQuotaFull()) { - sourceCandidates.remove(); - continue; - } - if (onRackSource) { - // choose from on-rack nodes - if ( cluster.isOnSameRack(source.getDatanode(), target.getDatanode())) { - foundSource = true; - } - } else { - // choose from off-rack nodes - if (!cluster.isOnSameRack(source.datanode, target.datanode)) { - foundSource = true; - } - } - } - if (foundSource) { - assert(source != null):"Choose a null source"; - matchSourceWithTargetToMove(source, target); - if ( !source.isMoveQuotaFull()) { - sourceCandidates.remove(); - } - return true; - } - return false; - } - private static class BytesMoved { private long bytesMoved = 0L;; private synchronized void inc( long bytes ) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java index 33e4fa82b2..15cd7d7086 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java @@ -17,13 +17,15 @@ */ package org.apache.hadoop.hdfs.server.balancer; +import static org.junit.Assert.assertEquals; + import java.io.IOException; import java.net.URI; import java.util.Collection; import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeoutException; -import junit.framework.TestCase; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -43,7 +45,7 @@ /** * This class tests if a balancer schedules tasks correctly. */ -public class TestBalancerWithNodeGroup extends TestCase { +public class TestBalancerWithNodeGroup { private static final Log LOG = LogFactory.getLog( "org.apache.hadoop.hdfs.TestBalancerWithNodeGroup");