HDFS-4234. Use generic code for choosing datanode in Balancer.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1417130 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Tsz-wo Sze 2012-12-04 19:22:51 +00:00
parent c0a8957c2b
commit 1f4b135b1f
3 changed files with 81 additions and 194 deletions

View File

@ -179,6 +179,8 @@ Trunk (Unreleased)
HDFS-3358. Specify explicitly that the NN UI status total is talking HDFS-3358. Specify explicitly that the NN UI status total is talking
of persistent objects on heap. (harsh) of persistent objects on heap. (harsh)
HDFS-4234. Use generic code for choosing datanode in Balancer. (szetszwo)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -75,6 +75,7 @@
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.net.Node;
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Time;
@ -557,7 +558,7 @@ protected String getStorageID() {
} }
/** Decide if still need to move more bytes */ /** Decide if still need to move more bytes */
protected boolean isMoveQuotaFull() { protected boolean hasSpaceForScheduling() {
return scheduledSize<maxSize2Move; return scheduledSize<maxSize2Move;
} }
@ -922,23 +923,53 @@ private static <T extends BalancerDatanode> void logNodes(
LOG.info(nodes.size() + " " + name + ": " + nodes); LOG.info(nodes.size() + " " + name + ": " + nodes);
} }
/* Decide all <source, target> pairs and /** A matcher interface for matching nodes. */
private interface Matcher {
/** Given the cluster topology, does the left node match the right node? */
boolean match(NetworkTopology cluster, Node left, Node right);
}
/** Match datanodes in the same node group. */
static final Matcher SAME_NODE_GROUP = new Matcher() {
@Override
public boolean match(NetworkTopology cluster, Node left, Node right) {
return cluster.isOnSameNodeGroup(left, right);
}
};
/** Match datanodes in the same rack. */
static final Matcher SAME_RACK = new Matcher() {
@Override
public boolean match(NetworkTopology cluster, Node left, Node right) {
return cluster.isOnSameRack(left, right);
}
};
/** Match any datanode with any other datanode. */
static final Matcher ANY_OTHER = new Matcher() {
@Override
public boolean match(NetworkTopology cluster, Node left, Node right) {
return left != right;
}
};
/**
* Decide all <source, target> pairs and
* the number of bytes to move from a source to a target * the number of bytes to move from a source to a target
* Maximum bytes to be moved per node is * Maximum bytes to be moved per node is
* Min(1 Band worth of bytes, MAX_SIZE_TO_MOVE). * Min(1 Band worth of bytes, MAX_SIZE_TO_MOVE).
* Return total number of bytes to move in this iteration * Return total number of bytes to move in this iteration
*/ */
private long chooseNodes() { private long chooseNodes() {
// First, match nodes on the same node group if cluster has nodegroup // First, match nodes on the same node group if cluster is node group aware
// awareness
if (cluster.isNodeGroupAware()) { if (cluster.isNodeGroupAware()) {
chooseNodesOnSameNodeGroup(); chooseNodes(SAME_NODE_GROUP);
} }
// Then, match nodes on the same rack // Then, match nodes on the same rack
chooseNodes(true); chooseNodes(SAME_RACK);
// At last, match nodes on different racks // At last, match all remaining nodes
chooseNodes(false); chooseNodes(ANY_OTHER);
assert (datanodes.size() >= sources.size()+targets.size()) assert (datanodes.size() >= sources.size()+targets.size())
: "Mismatched number of datanodes (" + : "Mismatched number of datanodes (" +
@ -952,57 +983,55 @@ private long chooseNodes() {
} }
return bytesToMove; return bytesToMove;
} }
/**
* Decide all <source, target> pairs where source and target are
* on the same NodeGroup
*/
private void chooseNodesOnSameNodeGroup() {
/** Decide all <source, target> pairs according to the matcher. */
private void chooseNodes(final Matcher matcher) {
/* first step: match each overUtilized datanode (source) to /* first step: match each overUtilized datanode (source) to
* one or more underUtilized datanodes within same NodeGroup(targets). * one or more underUtilized datanodes (targets).
*/ */
chooseOnSameNodeGroup(overUtilizedDatanodes, underUtilizedDatanodes); chooseDatanodes(overUtilizedDatanodes, underUtilizedDatanodes, matcher);
/* match each remaining overutilized datanode (source) to below average /* match each remaining overutilized datanode (source) to
* utilized datanodes within the same NodeGroup(targets). * below average utilized datanodes (targets).
* Note only overutilized datanodes that haven't had that max bytes to move * Note only overutilized datanodes that haven't had that max bytes to move
* satisfied in step 1 are selected * satisfied in step 1 are selected
*/ */
chooseOnSameNodeGroup(overUtilizedDatanodes, belowAvgUtilizedDatanodes); chooseDatanodes(overUtilizedDatanodes, belowAvgUtilizedDatanodes, matcher);
/* match each remaining underutilized datanode to above average utilized /* match each remaining underutilized datanode (target) to
* datanodes within the same NodeGroup. * above average utilized datanodes (source).
* Note only underutilized datanodes that have not had that max bytes to * Note only underutilized datanodes that have not had that max bytes to
* move satisfied in step 1 are selected. * move satisfied in step 1 are selected.
*/ */
chooseOnSameNodeGroup(underUtilizedDatanodes, aboveAvgUtilizedDatanodes); chooseDatanodes(underUtilizedDatanodes, aboveAvgUtilizedDatanodes, matcher);
} }
/** /**
* Match two sets of nodes within the same NodeGroup, one should be source * For each datanode, choose matching nodes from the candidates. Either the
* nodes (utilization > Avg), and the other should be destination nodes * datanodes or the candidates are source nodes with (utilization > Avg), and
* (utilization < Avg). * the others are target nodes with (utilization < Avg).
* @param datanodes
* @param candidates
*/ */
private <D extends BalancerDatanode, C extends BalancerDatanode> void private <D extends BalancerDatanode, C extends BalancerDatanode> void
chooseOnSameNodeGroup(Collection<D> datanodes, Collection<C> candidates) { chooseDatanodes(Collection<D> datanodes, Collection<C> candidates,
Matcher matcher) {
for (Iterator<D> i = datanodes.iterator(); i.hasNext();) { for (Iterator<D> i = datanodes.iterator(); i.hasNext();) {
final D datanode = i.next(); final D datanode = i.next();
for(; chooseOnSameNodeGroup(datanode, candidates.iterator()); ); for(; chooseForOneDatanode(datanode, candidates, matcher); );
if (!datanode.isMoveQuotaFull()) { if (!datanode.hasSpaceForScheduling()) {
i.remove(); i.remove();
} }
} }
} }
/** /**
* Match one datanode with a set of candidates nodes within the same NodeGroup. * For the given datanode, choose a candidate and then schedule it.
* @return true if a candidate is chosen; false if no candidates is chosen.
*/ */
private <T extends BalancerDatanode> boolean chooseOnSameNodeGroup( private <C extends BalancerDatanode> boolean chooseForOneDatanode(
BalancerDatanode dn, Iterator<T> candidates) { BalancerDatanode dn, Collection<C> candidates, Matcher matcher) {
final T chosen = chooseCandidateOnSameNodeGroup(dn, candidates); final Iterator<C> i = candidates.iterator();
final C chosen = chooseCandidate(dn, i, matcher);
if (chosen == null) { if (chosen == null) {
return false; return false;
} }
@ -1011,8 +1040,8 @@ private <T extends BalancerDatanode> boolean chooseOnSameNodeGroup(
} else { } else {
matchSourceWithTargetToMove((Source)chosen, dn); matchSourceWithTargetToMove((Source)chosen, dn);
} }
if (!chosen.isMoveQuotaFull()) { if (!chosen.hasSpaceForScheduling()) {
candidates.remove(); i.remove();
} }
return true; return true;
} }
@ -1029,19 +1058,15 @@ private void matchSourceWithTargetToMove(
+source.datanode.getName() + " to " + target.datanode.getName()); +source.datanode.getName() + " to " + target.datanode.getName());
} }
/** choose a datanode from <code>candidates</code> within the same NodeGroup /** Choose a candidate for the given datanode. */
* of <code>dn</code>. private <D extends BalancerDatanode, C extends BalancerDatanode>
*/ C chooseCandidate(D dn, Iterator<C> candidates, Matcher matcher) {
private <T extends BalancerDatanode> T chooseCandidateOnSameNodeGroup( if (dn.hasSpaceForScheduling()) {
BalancerDatanode dn, Iterator<T> candidates) {
if (dn.isMoveQuotaFull()) {
for(; candidates.hasNext(); ) { for(; candidates.hasNext(); ) {
final T c = candidates.next(); final C c = candidates.next();
if (!c.isMoveQuotaFull()) { if (!c.hasSpaceForScheduling()) {
candidates.remove(); candidates.remove();
continue; } else if (matcher.match(cluster, dn.getDatanode(), c.getDatanode())) {
}
if (cluster.isOnSameNodeGroup(dn.getDatanode(), c.getDatanode())) {
return c; return c;
} }
} }
@ -1049,148 +1074,6 @@ private <T extends BalancerDatanode> T chooseCandidateOnSameNodeGroup(
return null; return null;
} }
/* if onRack is true, decide all <source, target> pairs
* where source and target are on the same rack; Otherwise
* decide all <source, target> pairs where source and target are
* on different racks
*/
private void chooseNodes(boolean onRack) {
/* first step: match each overUtilized datanode (source) to
* one or more underUtilized datanodes (targets).
*/
chooseTargets(underUtilizedDatanodes, onRack);
/* match each remaining overutilized datanode (source) to
* below average utilized datanodes (targets).
* Note only overutilized datanodes that haven't had that max bytes to move
* satisfied in step 1 are selected
*/
chooseTargets(belowAvgUtilizedDatanodes, onRack);
/* match each remaining underutilized datanode (target) to
* above average utilized datanodes (source).
* Note only underutilized datanodes that have not had that max bytes to
* move satisfied in step 1 are selected.
*/
chooseSources(aboveAvgUtilizedDatanodes, onRack);
}
/* choose targets from the target candidate list for each over utilized
* source datanode. OnRackTarget determines if the chosen target
* should be on the same rack as the source
*/
private void chooseTargets(
Collection<BalancerDatanode> targetCandidates, boolean onRackTarget ) {
for (Iterator<Source> srcIterator = overUtilizedDatanodes.iterator();
srcIterator.hasNext();) {
Source source = srcIterator.next();
while (chooseTarget(source, targetCandidates.iterator(), onRackTarget)) {
}
if (!source.isMoveQuotaFull()) {
srcIterator.remove();
}
}
return;
}
/* choose sources from the source candidate list for each under utilized
* target datanode. onRackSource determines if the chosen source
* should be on the same rack as the target
*/
private void chooseSources(
Collection<Source> sourceCandidates, boolean onRackSource) {
for (Iterator<BalancerDatanode> targetIterator =
underUtilizedDatanodes.iterator(); targetIterator.hasNext();) {
BalancerDatanode target = targetIterator.next();
while (chooseSource(target, sourceCandidates.iterator(), onRackSource)) {
}
if (!target.isMoveQuotaFull()) {
targetIterator.remove();
}
}
return;
}
/* For the given source, choose targets from the target candidate list.
* OnRackTarget determines if the chosen target
* should be on the same rack as the source
*/
private boolean chooseTarget(Source source,
Iterator<BalancerDatanode> targetCandidates, boolean onRackTarget) {
if (!source.isMoveQuotaFull()) {
return false;
}
boolean foundTarget = false;
BalancerDatanode target = null;
while (!foundTarget && targetCandidates.hasNext()) {
target = targetCandidates.next();
if (!target.isMoveQuotaFull()) {
targetCandidates.remove();
continue;
}
if (onRackTarget) {
// choose from on-rack nodes
if (cluster.isOnSameRack(source.datanode, target.datanode)) {
foundTarget = true;
}
} else {
// choose from off-rack nodes
if (!cluster.isOnSameRack(source.datanode, target.datanode)) {
foundTarget = true;
}
}
}
if (foundTarget) {
assert(target != null):"Choose a null target";
matchSourceWithTargetToMove(source, target);
if (!target.isMoveQuotaFull()) {
targetCandidates.remove();
}
return true;
}
return false;
}
/* For the given target, choose sources from the source candidate list.
* OnRackSource determines if the chosen source
* should be on the same rack as the target
*/
private boolean chooseSource(BalancerDatanode target,
Iterator<Source> sourceCandidates, boolean onRackSource) {
if (!target.isMoveQuotaFull()) {
return false;
}
boolean foundSource = false;
Source source = null;
while (!foundSource && sourceCandidates.hasNext()) {
source = sourceCandidates.next();
if (!source.isMoveQuotaFull()) {
sourceCandidates.remove();
continue;
}
if (onRackSource) {
// choose from on-rack nodes
if ( cluster.isOnSameRack(source.getDatanode(), target.getDatanode())) {
foundSource = true;
}
} else {
// choose from off-rack nodes
if (!cluster.isOnSameRack(source.datanode, target.datanode)) {
foundSource = true;
}
}
}
if (foundSource) {
assert(source != null):"Choose a null source";
matchSourceWithTargetToMove(source, target);
if ( !source.isMoveQuotaFull()) {
sourceCandidates.remove();
}
return true;
}
return false;
}
private static class BytesMoved { private static class BytesMoved {
private long bytesMoved = 0L;; private long bytesMoved = 0L;;
private synchronized void inc( long bytes ) { private synchronized void inc( long bytes ) {

View File

@ -17,13 +17,15 @@
*/ */
package org.apache.hadoop.hdfs.server.balancer; package org.apache.hadoop.hdfs.server.balancer;
import static org.junit.Assert.assertEquals;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.Map; import java.util.Map;
import java.util.concurrent.TimeoutException; import java.util.concurrent.TimeoutException;
import junit.framework.TestCase;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -43,7 +45,7 @@
/** /**
* This class tests if a balancer schedules tasks correctly. * This class tests if a balancer schedules tasks correctly.
*/ */
public class TestBalancerWithNodeGroup extends TestCase { public class TestBalancerWithNodeGroup {
private static final Log LOG = LogFactory.getLog( private static final Log LOG = LogFactory.getLog(
"org.apache.hadoop.hdfs.TestBalancerWithNodeGroup"); "org.apache.hadoop.hdfs.TestBalancerWithNodeGroup");