HDFS-17642. Add target node list, exclude source node list, and exclude target node list parameters to balancer (#7127)
HDFS-17642. Add target node list, exclude source node list, and exclude target node list parameters to balancer (#7127) --------- Co-authored-by: Joseph DellAringa <jdellari@linkedin.com>
This commit is contained in:
parent
487727a5d1
commit
9657276492
@ -195,6 +195,12 @@ public class Balancer {
|
||||
+ "\tIncludes only the specified datanodes."
|
||||
+ "\n\t[-source [-f <hosts-file> | <comma-separated list of hosts>]]"
|
||||
+ "\tPick only the specified datanodes as source nodes."
|
||||
+ "\n\t[-excludeSource [-f <hosts-file> | <comma-separated list of hosts>]]"
|
||||
+ "\tExcludes the specified datanodes to be selected as a source."
|
||||
+ "\n\t[-target [-f <hosts-file> | <comma-separated list of hosts>]]"
|
||||
+ "\tPick only the specified datanodes as target nodes."
|
||||
+ "\n\t[-excludeTarget [-f <hosts-file> | <comma-separated list of hosts>]]"
|
||||
+ "\tExcludes the specified datanodes from being selected as a target."
|
||||
+ "\n\t[-blockpools <comma-separated list of blockpool ids>]"
|
||||
+ "\tThe balancer will only run on blockpools included in this list."
|
||||
+ "\n\t[-idleiterations <idleiterations>]"
|
||||
@ -224,6 +230,9 @@ public class Balancer {
|
||||
private final NameNodeConnector nnc;
|
||||
private final BalancingPolicy policy;
|
||||
private final Set<String> sourceNodes;
|
||||
private final Set<String> excludedSourceNodes;
|
||||
private final Set<String> targetNodes;
|
||||
private final Set<String> excludedTargetNodes;
|
||||
private final boolean runDuringUpgrade;
|
||||
private final double threshold;
|
||||
private final long maxSizeToMove;
|
||||
@ -353,6 +362,9 @@ static int getFailedTimesSinceLastSuccessfulBalance() {
|
||||
this.threshold = p.getThreshold();
|
||||
this.policy = p.getBalancingPolicy();
|
||||
this.sourceNodes = p.getSourceNodes();
|
||||
this.excludedSourceNodes = p.getExcludedSourceNodes();
|
||||
this.targetNodes = p.getTargetNodes();
|
||||
this.excludedTargetNodes = p.getExcludedTargetNodes();
|
||||
this.runDuringUpgrade = p.getRunDuringUpgrade();
|
||||
this.sortTopNodes = p.getSortTopNodes();
|
||||
this.limitOverUtilizedNum = p.getLimitOverUtilizedNum();
|
||||
@ -411,7 +423,10 @@ private long init(List<DatanodeStorageReport> reports) {
|
||||
long overLoadedBytes = 0L, underLoadedBytes = 0L;
|
||||
for(DatanodeStorageReport r : reports) {
|
||||
final DDatanode dn = dispatcher.newDatanode(r.getDatanodeInfo());
|
||||
final boolean isSource = Util.isIncluded(sourceNodes, dn.getDatanodeInfo());
|
||||
final boolean isValidSource = Util.isIncluded(sourceNodes, dn.getDatanodeInfo()) &&
|
||||
!Util.isExcluded(excludedSourceNodes, dn.getDatanodeInfo());
|
||||
final boolean isValidTarget = Util.isIncluded(targetNodes, dn.getDatanodeInfo()) &&
|
||||
!Util.isExcluded(excludedTargetNodes, dn.getDatanodeInfo());
|
||||
for(StorageType t : StorageType.getMovableTypes()) {
|
||||
final Double utilization = policy.getUtilization(r, t);
|
||||
if (utilization == null) { // datanode does not have such storage type
|
||||
@ -419,10 +434,15 @@ private long init(List<DatanodeStorageReport> reports) {
|
||||
}
|
||||
|
||||
final double average = policy.getAvgUtilization(t);
|
||||
if (utilization >= average && !isSource) {
|
||||
LOG.info(dn + "[" + t + "] has utilization=" + utilization
|
||||
+ " >= average=" + average
|
||||
+ " but it is not specified as a source; skipping it.");
|
||||
if (utilization >= average && !isValidSource) {
|
||||
LOG.info("{} [{}] utilization {} >= average {}, but it's either not specified"
|
||||
+ " or excluded as a source; skipping.", dn, t, utilization, average);
|
||||
continue;
|
||||
}
|
||||
if (utilization <= average && !isValidTarget) {
|
||||
LOG.info("{} [{}] utilization {} <= average {}, but it's either not specified"
|
||||
+ " or excluded as a target; skipping.",
|
||||
dn, t, utilization, average);
|
||||
continue;
|
||||
}
|
||||
|
||||
@ -829,6 +849,9 @@ static private int doBalance(Collection<URI> namenodes,
|
||||
LOG.info("included nodes = " + p.getIncludedNodes());
|
||||
LOG.info("excluded nodes = " + p.getExcludedNodes());
|
||||
LOG.info("source nodes = " + p.getSourceNodes());
|
||||
LOG.info("excluded source nodes = " + p.getExcludedSourceNodes());
|
||||
LOG.info("target nodes = " + p.getTargetNodes());
|
||||
LOG.info("excluded target nodes = " + p.getExcludedTargetNodes());
|
||||
checkKeytabAndInit(conf);
|
||||
System.out.println("Time Stamp Iteration#"
|
||||
+ " Bytes Already Moved Bytes Left To Move Bytes Being Moved"
|
||||
@ -1016,6 +1039,10 @@ public int run(String[] args) {
|
||||
static BalancerParameters parse(String[] args) {
|
||||
Set<String> excludedNodes = null;
|
||||
Set<String> includedNodes = null;
|
||||
Set<String> sourceNodes = null;
|
||||
Set<String> excludedSourceNodes = null;
|
||||
Set<String> targetNodes = null;
|
||||
Set<String> excludedTargetNodes = null;
|
||||
BalancerParameters.Builder b = new BalancerParameters.Builder();
|
||||
|
||||
if (args != null) {
|
||||
@ -1056,9 +1083,21 @@ static BalancerParameters parse(String[] args) {
|
||||
i = processHostList(args, i, "include", includedNodes);
|
||||
b.setIncludedNodes(includedNodes);
|
||||
} else if ("-source".equalsIgnoreCase(args[i])) {
|
||||
Set<String> sourceNodes = new HashSet<>();
|
||||
sourceNodes = new HashSet<>();
|
||||
i = processHostList(args, i, "source", sourceNodes);
|
||||
b.setSourceNodes(sourceNodes);
|
||||
} else if ("-excludeSource".equalsIgnoreCase(args[i])) {
|
||||
excludedSourceNodes = new HashSet<>();
|
||||
i = processHostList(args, i, "exclude source", excludedSourceNodes);
|
||||
b.setExcludedSourceNodes(excludedSourceNodes);
|
||||
} else if ("-target".equalsIgnoreCase(args[i])) {
|
||||
targetNodes = new HashSet<>();
|
||||
i = processHostList(args, i, "target", targetNodes);
|
||||
b.setTargetNodes(targetNodes);
|
||||
} else if ("-excludeTarget".equalsIgnoreCase(args[i])) {
|
||||
excludedTargetNodes = new HashSet<>();
|
||||
i = processHostList(args, i, "exclude target", excludedTargetNodes);
|
||||
b.setExcludedTargetNodes(excludedTargetNodes);
|
||||
} else if ("-blockpools".equalsIgnoreCase(args[i])) {
|
||||
Preconditions.checkArgument(
|
||||
++i < args.length,
|
||||
@ -1111,6 +1150,10 @@ static BalancerParameters parse(String[] args) {
|
||||
}
|
||||
Preconditions.checkArgument(excludedNodes == null || includedNodes == null,
|
||||
"-exclude and -include options cannot be specified together.");
|
||||
Preconditions.checkArgument(excludedSourceNodes == null || sourceNodes == null,
|
||||
"-excludeSource and -source options cannot be specified together.");
|
||||
Preconditions.checkArgument(excludedTargetNodes == null || targetNodes == null,
|
||||
"-excludeTarget and -target options cannot be specified together.");
|
||||
} catch(RuntimeException e) {
|
||||
printUsage(System.err);
|
||||
throw e;
|
||||
|
@ -37,6 +37,21 @@ final class BalancerParameters {
|
||||
* source nodes.
|
||||
*/
|
||||
private final Set<String> sourceNodes;
|
||||
/**
|
||||
* If empty, any node can be a source; otherwise, these nodes will be excluded as
|
||||
* source nodes.
|
||||
*/
|
||||
private final Set<String> excludedSourceNodes;
|
||||
/**
|
||||
* If empty, any node can be a target; otherwise, use only these nodes as
|
||||
* target nodes.
|
||||
*/
|
||||
private final Set<String> targetNodes;
|
||||
/**
|
||||
* If empty, any node can be a target; otherwise, these nodes will be excluded as
|
||||
* target nodes.
|
||||
*/
|
||||
private final Set<String> excludedTargetNodes;
|
||||
/**
|
||||
* A set of block pools to run the balancer on.
|
||||
*/
|
||||
@ -65,6 +80,9 @@ private BalancerParameters(Builder builder) {
|
||||
this.excludedNodes = builder.excludedNodes;
|
||||
this.includedNodes = builder.includedNodes;
|
||||
this.sourceNodes = builder.sourceNodes;
|
||||
this.excludedSourceNodes = builder.excludedSourceNodes;
|
||||
this.targetNodes = builder.targetNodes;
|
||||
this.excludedTargetNodes = builder.excludedTargetNodes;
|
||||
this.blockpools = builder.blockpools;
|
||||
this.runDuringUpgrade = builder.runDuringUpgrade;
|
||||
this.runAsService = builder.runAsService;
|
||||
@ -97,6 +115,18 @@ Set<String> getSourceNodes() {
|
||||
return this.sourceNodes;
|
||||
}
|
||||
|
||||
Set<String> getExcludedSourceNodes() {
|
||||
return this.excludedSourceNodes;
|
||||
}
|
||||
|
||||
Set<String> getTargetNodes() {
|
||||
return this.targetNodes;
|
||||
}
|
||||
|
||||
Set<String> getExcludedTargetNodes() {
|
||||
return this.excludedTargetNodes;
|
||||
}
|
||||
|
||||
Set<String> getBlockPools() {
|
||||
return this.blockpools;
|
||||
}
|
||||
@ -126,12 +156,15 @@ public String toString() {
|
||||
return String.format("%s.%s [%s," + " threshold = %s,"
|
||||
+ " max idle iteration = %s," + " #excluded nodes = %s,"
|
||||
+ " #included nodes = %s," + " #source nodes = %s,"
|
||||
+ " #excluded source nodes = %s," + " #target nodes = %s,"
|
||||
+ " #excluded target nodes = %s,"
|
||||
+ " #blockpools = %s," + " run during upgrade = %s,"
|
||||
+ " sort top nodes = %s," + " limit overUtilized nodes num = %s,"
|
||||
+ " hot block time interval = %s]",
|
||||
Balancer.class.getSimpleName(), getClass().getSimpleName(), policy,
|
||||
threshold, maxIdleIteration, excludedNodes.size(),
|
||||
includedNodes.size(), sourceNodes.size(), blockpools.size(),
|
||||
includedNodes.size(), sourceNodes.size(), excludedSourceNodes.size(), targetNodes.size(),
|
||||
excludedTargetNodes.size(), blockpools.size(),
|
||||
runDuringUpgrade, sortTopNodes, limitOverUtilizedNum, hotBlockTimeInterval);
|
||||
}
|
||||
|
||||
@ -144,6 +177,9 @@ static class Builder {
|
||||
private Set<String> excludedNodes = Collections.<String> emptySet();
|
||||
private Set<String> includedNodes = Collections.<String> emptySet();
|
||||
private Set<String> sourceNodes = Collections.<String> emptySet();
|
||||
private Set<String> excludedSourceNodes = Collections.<String> emptySet();
|
||||
private Set<String> targetNodes = Collections.<String> emptySet();
|
||||
private Set<String> excludedTargetNodes = Collections.<String> emptySet();
|
||||
private Set<String> blockpools = Collections.<String> emptySet();
|
||||
private boolean runDuringUpgrade = false;
|
||||
private boolean runAsService = false;
|
||||
@ -189,6 +225,21 @@ Builder setSourceNodes(Set<String> nodes) {
|
||||
return this;
|
||||
}
|
||||
|
||||
Builder setExcludedSourceNodes(Set<String> nodes) {
|
||||
this.excludedSourceNodes = nodes;
|
||||
return this;
|
||||
}
|
||||
|
||||
Builder setTargetNodes(Set<String> nodes) {
|
||||
this.targetNodes = nodes;
|
||||
return this;
|
||||
}
|
||||
|
||||
Builder setExcludedTargetNodes(Set<String> nodes) {
|
||||
this.excludedTargetNodes = nodes;
|
||||
return this;
|
||||
}
|
||||
|
||||
Builder setBlockpools(Set<String> pools) {
|
||||
this.blockpools = pools;
|
||||
return this;
|
||||
|
@ -47,6 +47,7 @@
|
||||
import static org.junit.Assert.assertFalse;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.junit.Assert.assertThrows;
|
||||
import static org.mockito.Mockito.any;
|
||||
import static org.mockito.Mockito.anyLong;
|
||||
import static org.mockito.Mockito.doAnswer;
|
||||
@ -473,7 +474,8 @@ static void waitForBalancer(long totalUsedSpace, long totalCapacity,
|
||||
static void waitForBalancer(long totalUsedSpace, long totalCapacity,
|
||||
ClientProtocol client, MiniDFSCluster cluster, BalancerParameters p,
|
||||
int expectedExcludedNodes) throws IOException, TimeoutException {
|
||||
waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, expectedExcludedNodes, true);
|
||||
waitForBalancer(totalUsedSpace, totalCapacity, client,
|
||||
cluster, p, expectedExcludedNodes, true);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -495,6 +497,9 @@ static void waitForBalancer(long totalUsedSpace, long totalCapacity,
|
||||
if (!p.getExcludedNodes().isEmpty()) {
|
||||
totalCapacity -= p.getExcludedNodes().size() * CAPACITY;
|
||||
}
|
||||
if (!p.getExcludedTargetNodes().isEmpty()) {
|
||||
totalCapacity -= p.getExcludedTargetNodes().size() * CAPACITY;
|
||||
}
|
||||
final double avgUtilization = ((double)totalUsedSpace) / totalCapacity;
|
||||
boolean balanced;
|
||||
do {
|
||||
@ -539,6 +544,69 @@ static void waitForBalancer(long totalUsedSpace, long totalCapacity,
|
||||
} while (!balanced);
|
||||
}
|
||||
|
||||
/**
|
||||
* Wait until balanced: each datanode gives utilization within.
|
||||
* Used when testing for included / excluded target and source nodes.
|
||||
* BALANCE_ALLOWED_VARIANCE of average
|
||||
* @throws IOException
|
||||
* @throws TimeoutException
|
||||
*/
|
||||
static void waitForBalancer(long totalUsedSpace, long totalCapacity,
|
||||
ClientProtocol client, MiniDFSCluster cluster, BalancerParameters p,
|
||||
int expectedExcludedSourceNodes, int expectedExcludedTargetNodes)
|
||||
throws IOException, TimeoutException {
|
||||
long timeout = TIMEOUT;
|
||||
long failtime = (timeout <= 0L) ? Long.MAX_VALUE
|
||||
: Time.monotonicNow() + timeout;
|
||||
if (!p.getExcludedTargetNodes().isEmpty()) {
|
||||
totalCapacity -= p.getExcludedTargetNodes().size() * CAPACITY;
|
||||
}
|
||||
final double avgUtilization = ((double)totalUsedSpace) / totalCapacity;
|
||||
boolean balanced;
|
||||
do {
|
||||
DatanodeInfo[] datanodeReport =
|
||||
client.getDatanodeReport(DatanodeReportType.ALL);
|
||||
assertEquals(datanodeReport.length, cluster.getDataNodes().size());
|
||||
balanced = true;
|
||||
int actualExcludedSourceNodeCount = 0;
|
||||
int actualExcludedTargetNodeCount = 0;
|
||||
for (DatanodeInfo datanode : datanodeReport) {
|
||||
double nodeUtilization =
|
||||
((double) datanode.getDfsUsed() + datanode.getNonDfsUsed()) /
|
||||
datanode.getCapacity();
|
||||
if(Dispatcher.Util.isExcluded(p.getExcludedTargetNodes(), datanode)) {
|
||||
actualExcludedTargetNodeCount++;
|
||||
}
|
||||
if(!Dispatcher.Util.isIncluded(p.getTargetNodes(), datanode)) {
|
||||
actualExcludedTargetNodeCount++;
|
||||
}
|
||||
if(Dispatcher.Util.isExcluded(p.getExcludedSourceNodes(), datanode)) {
|
||||
actualExcludedSourceNodeCount++;
|
||||
}
|
||||
if(!Dispatcher.Util.isIncluded(p.getSourceNodes(), datanode)) {
|
||||
actualExcludedSourceNodeCount++;
|
||||
}
|
||||
if (Math.abs(avgUtilization - nodeUtilization) > BALANCE_ALLOWED_VARIANCE) {
|
||||
balanced = false;
|
||||
if (Time.monotonicNow() > failtime) {
|
||||
throw new TimeoutException(
|
||||
"Rebalancing expected avg utilization to become "
|
||||
+ avgUtilization + ", but on datanode " + datanode
|
||||
+ " it remains at " + nodeUtilization
|
||||
+ " after more than " + TIMEOUT + " msec.");
|
||||
}
|
||||
try {
|
||||
Thread.sleep(100);
|
||||
} catch (InterruptedException ignored) {
|
||||
}
|
||||
break;
|
||||
}
|
||||
}
|
||||
assertEquals(expectedExcludedSourceNodes, actualExcludedSourceNodeCount);
|
||||
assertEquals(expectedExcludedTargetNodes, actualExcludedTargetNodeCount);
|
||||
} while (!balanced);
|
||||
}
|
||||
|
||||
String long2String(long[] array) {
|
||||
if (array.length == 0) {
|
||||
return "<empty>";
|
||||
@ -636,6 +704,14 @@ int getNumberofExcludeNodes() {
|
||||
}
|
||||
}
|
||||
|
||||
private void doTest(Configuration conf,
|
||||
long newCapacity, String newRack, NewNodeInfo nodes,
|
||||
boolean useTool, boolean useFile, BalancerParameters p) throws Exception {
|
||||
doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
|
||||
newCapacity, 0L, newRack, nodes,
|
||||
useTool, useFile, false, 0.3, p);
|
||||
}
|
||||
|
||||
private void doTest(Configuration conf, long[] capacities, String[] racks,
|
||||
long newCapacity, String newRack, boolean useTool) throws Exception {
|
||||
doTest(conf, capacities, racks, newCapacity, newRack, null, useTool, false);
|
||||
@ -645,7 +721,7 @@ private void doTest(Configuration conf, long[] capacities, String[] racks,
|
||||
long newCapacity, String newRack, NewNodeInfo nodes,
|
||||
boolean useTool, boolean useFile) throws Exception {
|
||||
doTest(conf, capacities, racks, newCapacity, 0L, newRack, nodes,
|
||||
useTool, useFile, false, 0.3);
|
||||
useTool, useFile, false, 0.3, null);
|
||||
}
|
||||
|
||||
/** This test start a cluster with specified number of nodes,
|
||||
@ -671,7 +747,7 @@ private void doTest(Configuration conf, long[] capacities, String[] racks,
|
||||
private void doTest(Configuration conf, long[] capacities,
|
||||
String[] racks, long newCapacity, long newNonDfsUsed, String newRack,
|
||||
NewNodeInfo nodes, boolean useTool, boolean useFile,
|
||||
boolean useNamesystemSpy, double clusterUtilization) throws Exception {
|
||||
boolean useNamesystemSpy, double clusterUtilization, BalancerParameters p) throws Exception {
|
||||
LOG.info("capacities = " + long2String(capacities));
|
||||
LOG.info("racks = " + Arrays.asList(racks));
|
||||
LOG.info("newCapacity= " + newCapacity);
|
||||
@ -746,7 +822,7 @@ private void doTest(Configuration conf, long[] capacities,
|
||||
totalNodes-1-i).getDatanodeId().getXferAddr());
|
||||
}
|
||||
}
|
||||
//polulate the exclude nodes
|
||||
//populate the exclude nodes
|
||||
if (nodes.getNumberofExcludeNodes() > 0) {
|
||||
int totalNodes = cluster.getDataNodes().size();
|
||||
for (int i=0; i < nodes.getNumberofExcludeNodes(); i++) {
|
||||
@ -756,16 +832,16 @@ private void doTest(Configuration conf, long[] capacities,
|
||||
}
|
||||
}
|
||||
}
|
||||
// run balancer and validate results
|
||||
BalancerParameters.Builder pBuilder =
|
||||
new BalancerParameters.Builder();
|
||||
if (nodes != null) {
|
||||
pBuilder.setExcludedNodes(nodes.getNodesToBeExcluded());
|
||||
pBuilder.setIncludedNodes(nodes.getNodesToBeIncluded());
|
||||
pBuilder.setRunDuringUpgrade(false);
|
||||
if(p == null) {
|
||||
// run balancer and validate results
|
||||
BalancerParameters.Builder pBuilder = new BalancerParameters.Builder();
|
||||
if (nodes != null) {
|
||||
pBuilder.setExcludedNodes(nodes.getNodesToBeExcluded());
|
||||
pBuilder.setIncludedNodes(nodes.getNodesToBeIncluded());
|
||||
pBuilder.setRunDuringUpgrade(false);
|
||||
}
|
||||
p = pBuilder.build();
|
||||
}
|
||||
BalancerParameters p = pBuilder.build();
|
||||
|
||||
int expectedExcludedNodes = 0;
|
||||
if (nodes != null) {
|
||||
if (!nodes.getNodesToBeExcluded().isEmpty()) {
|
||||
@ -821,6 +897,9 @@ private void runBalancer(Configuration conf, long totalDfsUsedSpace,
|
||||
== 0) {
|
||||
assertEquals(ExitStatus.NO_MOVE_PROGRESS.getExitCode(), run);
|
||||
return;
|
||||
} else if(run == ExitStatus.NO_MOVE_BLOCK.getExitCode()) {
|
||||
LOG.error("Exit status returned: " + run);
|
||||
throw new Exception(String.valueOf(ExitStatus.NO_MOVE_BLOCK.getExitCode()));
|
||||
} else {
|
||||
assertEquals(ExitStatus.SUCCESS.getExitCode(), run);
|
||||
}
|
||||
@ -828,8 +907,22 @@ private void runBalancer(Configuration conf, long totalDfsUsedSpace,
|
||||
LOG.info(" .");
|
||||
try {
|
||||
long totalUsedSpace = totalDfsUsedSpace + totalNonDfsUsedSpace;
|
||||
waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p,
|
||||
excludedNodes, checkExcludeNodesUtilization);
|
||||
int expectedExcludedSourceNodes = 0;
|
||||
int expectedExcludedTargetNodes = 0;
|
||||
if(!p.getExcludedSourceNodes().isEmpty()) {
|
||||
expectedExcludedSourceNodes = p.getExcludedSourceNodes().size();
|
||||
}
|
||||
if(!p.getExcludedTargetNodes().isEmpty()) {
|
||||
expectedExcludedTargetNodes = p.getExcludedTargetNodes().size();
|
||||
}
|
||||
if(expectedExcludedSourceNodes > 0 || expectedExcludedTargetNodes > 0) {
|
||||
waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p,
|
||||
expectedExcludedSourceNodes, expectedExcludedTargetNodes);
|
||||
} else {
|
||||
waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p,
|
||||
excludedNodes,
|
||||
checkExcludeNodesUtilization);
|
||||
}
|
||||
} catch (TimeoutException e) {
|
||||
// See HDFS-11682. NN may not get heartbeat to reflect the newest
|
||||
// block changes.
|
||||
@ -879,8 +972,9 @@ private static int runBalancer(Collection<URI> namenodes,
|
||||
b.resetData(conf);
|
||||
if (r.getExitStatus() == ExitStatus.IN_PROGRESS) {
|
||||
done = false;
|
||||
} else if (r.getExitStatus() != ExitStatus.SUCCESS) {
|
||||
//must be an error statue, return.
|
||||
} else if (r.getExitStatus() != ExitStatus.SUCCESS
|
||||
|| r.getExitStatus() != ExitStatus.NO_MOVE_BLOCK) {
|
||||
//must be an error status, return.
|
||||
return r.getExitStatus().getExitCode();
|
||||
} else {
|
||||
if (iteration > 0) {
|
||||
@ -1129,7 +1223,7 @@ public void testBalancer3() throws Exception {
|
||||
Configuration conf = new HdfsConfiguration();
|
||||
initConf(conf);
|
||||
doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
|
||||
CAPACITY, 1000L, RACK2, null, false, false, false, 0.3);
|
||||
CAPACITY, 1000L, RACK2, null, false, false, false, 0.3, null);
|
||||
}
|
||||
|
||||
private void testBalancerDefaultConstructor(Configuration conf,
|
||||
@ -1245,8 +1339,49 @@ public void testBalancerCliParseWithWrongParams() {
|
||||
Balancer.Cli.parse(parameters);
|
||||
fail(reason + " for -source parameter");
|
||||
} catch (IllegalArgumentException ignored) {
|
||||
// expected
|
||||
|
||||
}
|
||||
|
||||
parameters = new String[] {"-excludeSource"};
|
||||
try {
|
||||
Balancer.Cli.parse(parameters);
|
||||
fail(reason + " for -excludeSource parameter");
|
||||
} catch (IllegalArgumentException ignored) {
|
||||
|
||||
}
|
||||
|
||||
parameters = new String[] {"-source", "testnode1", "-excludeSource", "testnode2"};
|
||||
try {
|
||||
Balancer.Cli.parse(parameters);
|
||||
fail("Exception is expected when both -source and -excludeSource are specified");
|
||||
} catch (IllegalArgumentException e) {
|
||||
|
||||
}
|
||||
|
||||
parameters = new String[] {"-target"};
|
||||
try {
|
||||
Balancer.Cli.parse(parameters);
|
||||
fail(reason + " for -target parameter");
|
||||
} catch (IllegalArgumentException ignored) {
|
||||
|
||||
}
|
||||
|
||||
parameters = new String[] {"-excludeTarget"};
|
||||
try {
|
||||
Balancer.Cli.parse(parameters);
|
||||
fail(reason + " for -excludeTarget parameter");
|
||||
} catch (IllegalArgumentException ignored) {
|
||||
|
||||
}
|
||||
|
||||
parameters = new String[] {"-target", "testnode1", "-excludeTarget", "testnode2"};
|
||||
try {
|
||||
Balancer.Cli.parse(parameters);
|
||||
fail("Exception expected when both -target and -excludeTarget are specified");
|
||||
} catch (IllegalArgumentException e) {
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -1920,7 +2055,7 @@ void testBalancerRPCDelay(int getBlocksMaxQps) throws Exception {
|
||||
// all get block calls, so if two iterations are performed, the duration
|
||||
// also includes the time it took to perform the block move ops in the
|
||||
// first iteration
|
||||
new PortNumberBasedNodes(1, 0, 0), false, false, true, 0.5);
|
||||
new PortNumberBasedNodes(1, 0, 0), false, false, true, 0.5, null);
|
||||
assertTrue("Number of getBlocks should be not less than " +
|
||||
getBlocksMaxQps, numGetBlocksCalls.get() >= getBlocksMaxQps);
|
||||
long durationMs = 1 + endGetBlocksTime.get() - startGetBlocksTime.get();
|
||||
@ -1932,6 +2067,132 @@ void testBalancerRPCDelay(int getBlocksMaxQps) throws Exception {
|
||||
getBlocksMaxQps, getBlockCallsPerSecond <= getBlocksMaxQps);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test balancer with excluded target nodes.
|
||||
* One of three added nodes is excluded in the target nodes list.
|
||||
* Balancer should only move blocks to the two included nodes.
|
||||
*/
|
||||
@Test(timeout=100000)
|
||||
public void testBalancerExcludeTargetNodesNoMoveBlock() throws Exception {
|
||||
final Configuration conf = new HdfsConfiguration();
|
||||
initConf(conf);
|
||||
Set<String> excludeTargetNodes = new HashSet<>();
|
||||
excludeTargetNodes.add("datanodeZ");
|
||||
BalancerParameters.Builder pBuilder = new BalancerParameters.Builder();
|
||||
pBuilder.setExcludedTargetNodes(excludeTargetNodes);
|
||||
BalancerParameters p = pBuilder.build();
|
||||
Exception exception = assertThrows(Exception.class, () -> {
|
||||
doTest(conf, CAPACITY, RACK2,
|
||||
new HostNameBasedNodes(new String[]{"datanodeX", "datanodeY", "datanodeZ"},
|
||||
BalancerParameters.DEFAULT.getExcludedNodes(),
|
||||
BalancerParameters.DEFAULT.getIncludedNodes()),
|
||||
false, false, p);
|
||||
});
|
||||
|
||||
assertTrue(exception.getMessage()
|
||||
.contains(String.valueOf(ExitStatus.NO_MOVE_BLOCK.getExitCode())));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test balancer with included target nodes.
|
||||
* Two of three added nodes are included in the target nodes list.
|
||||
* Balancer should only move blocks to the included nodes.
|
||||
*/
|
||||
@Test(timeout=100000)
|
||||
public void testBalancerIncludeTargetNodesNoMoveBlock() throws Exception {
|
||||
final Configuration conf = new HdfsConfiguration();
|
||||
initConf(conf);
|
||||
Set<String> includeTargetNodes = new HashSet<>();
|
||||
includeTargetNodes.add("datanodeY");
|
||||
includeTargetNodes.add("datanodeZ");
|
||||
BalancerParameters.Builder pBuilder = new BalancerParameters.Builder();
|
||||
pBuilder.setTargetNodes(includeTargetNodes);
|
||||
BalancerParameters p = pBuilder.build();
|
||||
Exception exception = assertThrows(Exception.class, () -> {
|
||||
doTest(conf, CAPACITY, RACK2,
|
||||
new HostNameBasedNodes(new String[]{"datanodeX", "datanodeY", "datanodeZ"},
|
||||
BalancerParameters.DEFAULT.getExcludedNodes(),
|
||||
BalancerParameters.DEFAULT.getIncludedNodes()),
|
||||
false, false, p);
|
||||
});
|
||||
|
||||
assertTrue(exception.getMessage()
|
||||
.contains(String.valueOf(ExitStatus.NO_MOVE_BLOCK.getExitCode())));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test balancer with included target nodes.
|
||||
* Three of three added nodes are included in the target nodes list.
|
||||
* Balancer should exit with success code.
|
||||
*/
|
||||
@Test(timeout=100000)
|
||||
public void testBalancerIncludeTargetNodesSuccess() throws Exception {
|
||||
final Configuration conf = new HdfsConfiguration();
|
||||
initConf(conf);
|
||||
Set<String> includeTargetNodes = new HashSet<>();
|
||||
includeTargetNodes.add("datanodeX");
|
||||
includeTargetNodes.add("datanodeY");
|
||||
includeTargetNodes.add("datanodeZ");
|
||||
BalancerParameters.Builder pBuilder = new BalancerParameters.Builder();
|
||||
pBuilder.setTargetNodes(includeTargetNodes);
|
||||
BalancerParameters p = pBuilder.build();
|
||||
doTest(conf, CAPACITY, RACK2,
|
||||
new HostNameBasedNodes(new String[]{"datanodeX", "datanodeY", "datanodeZ"},
|
||||
BalancerParameters.DEFAULT.getExcludedNodes(),
|
||||
BalancerParameters.DEFAULT.getIncludedNodes()),
|
||||
false, false, p);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test balancer with included source nodes.
|
||||
* Since newly added nodes are the only included source nodes no balancing will occur.
|
||||
*/
|
||||
@Test(timeout=100000)
|
||||
public void testBalancerIncludeSourceNodesNoMoveBlock() throws Exception {
|
||||
final Configuration conf = new HdfsConfiguration();
|
||||
initConf(conf);
|
||||
Set<String> includeSourceNodes = new HashSet<>();
|
||||
includeSourceNodes.add("datanodeX");
|
||||
includeSourceNodes.add("datanodeY");
|
||||
includeSourceNodes.add("datanodeZ");
|
||||
BalancerParameters.Builder pBuilder = new BalancerParameters.Builder();
|
||||
pBuilder.setSourceNodes(includeSourceNodes);
|
||||
BalancerParameters p = pBuilder.build();
|
||||
Exception exception = assertThrows(Exception.class, () -> {
|
||||
doTest(conf, CAPACITY, RACK2,
|
||||
new HostNameBasedNodes(new String[]{"datanodeX", "datanodeY", "datanodeZ"},
|
||||
BalancerParameters.DEFAULT.getExcludedNodes(),
|
||||
BalancerParameters.DEFAULT.getIncludedNodes()),
|
||||
false, false, p);
|
||||
});
|
||||
|
||||
assertTrue(exception.getMessage()
|
||||
.contains(String.valueOf(ExitStatus.NO_MOVE_BLOCK.getExitCode())));
|
||||
}
|
||||
|
||||
/**
|
||||
* Test balancer with excluded source nodes.
|
||||
* Since newly added nodes will not be selected as a source,
|
||||
* all nodes will be included in balancing.
|
||||
*/
|
||||
@Test(timeout=100000)
|
||||
public void testBalancerExcludeSourceNodes() throws Exception {
|
||||
final Configuration conf = new HdfsConfiguration();
|
||||
initConf(conf);
|
||||
Set<String> excludeSourceNodes = new HashSet<>();
|
||||
excludeSourceNodes.add("datanodeX");
|
||||
excludeSourceNodes.add("datanodeY");
|
||||
BalancerParameters.Builder pBuilder = new BalancerParameters.Builder();
|
||||
pBuilder.setExcludedSourceNodes(excludeSourceNodes);
|
||||
BalancerParameters p = pBuilder.build();
|
||||
doTest(conf, CAPACITY, RACK2,
|
||||
new HostNameBasedNodes(new String[]{"datanodeX", "datanodeY", "datanodeZ"},
|
||||
BalancerParameters.DEFAULT.getExcludedNodes(),
|
||||
BalancerParameters.DEFAULT.getIncludedNodes()), false,
|
||||
false, p);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* @param args
|
||||
*/
|
||||
|
Loading…
Reference in New Issue
Block a user