HDFS-6441. Add ability to exclude/include specific datanodes while balancing. (Contributed by Benoy Antony and Yu Li)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1614812 13f79535-47bb-0310-9956-ffa450edef68
commit b8b8f3f5e7
parent 4996bf8257
hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java

@@ -27,6 +27,7 @@
 import java.util.Arrays;
 import java.util.Collection;
 import java.util.Date;
+import java.util.HashSet;
 import java.util.Iterator;
 import java.util.LinkedHashSet;
 import java.util.List;
@@ -377,6 +378,19 @@ public static String[] getTrimmedStrings(String str){
     return str.trim().split("\\s*,\\s*");
   }
 
+  /**
+   * Trims all the strings in a Collection<String> and returns a Set<String>.
+   * @param strings the collection of strings to trim
+   * @return a set containing the trimmed strings
+   */
+  public static Set<String> getTrimmedStrings(Collection<String> strings) {
+    Set<String> trimmedStrings = new HashSet<String>();
+    for (String string: strings) {
+      trimmedStrings.add(string.trim());
+    }
+    return trimmedStrings;
+  }
+
   final public static String[] emptyStringArray = {};
   final public static char COMMA = ',';
   final public static String COMMA_STR = ",";
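As a quick illustration of the new overload (a minimal sketch, not part of the patch; the host names are made up):

    import java.util.Arrays;
    import java.util.Set;

    import org.apache.hadoop.util.StringUtils;

    public class TrimmedStringsDemo {
      public static void main(String[] args) {
        // Each entry is trimmed; duplicates collapse because the result is a Set.
        Set<String> hosts = StringUtils.getTrimmedStrings(
            Arrays.asList(" dn1.example.com ", "dn2.example.com", "dn1.example.com"));
        System.out.println(hosts); // [dn1.example.com, dn2.example.com] (order may vary)
      }
    }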
hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt

@@ -335,6 +335,9 @@ Release 2.6.0 - UNRELEASED
     HDFS-6570. add api that enables checking if a user has certain permissions on
     a file. (Jitendra Pandey via cnauroth)
 
+    HDFS-6441. Add ability to exclude/include specific datanodes while
+    balancing. (Benoy Antony and Yu Li via Arpit Agarwal)
+
   OPTIMIZATIONS
 
     HDFS-6690. Deduplicate xattr names in memory. (wang)
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java

@@ -45,6 +45,7 @@
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.Set;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.ExecutorService;
 import java.util.concurrent.Executors;
@@ -83,6 +84,7 @@
 import org.apache.hadoop.net.NetworkTopology;
 import org.apache.hadoop.net.Node;
 import org.apache.hadoop.security.token.Token;
+import org.apache.hadoop.util.HostsFileReader;
 import org.apache.hadoop.util.StringUtils;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
@@ -203,12 +205,20 @@ public class Balancer {
       + "\n\t[-policy <policy>]\tthe balancing policy: "
       + BalancingPolicy.Node.INSTANCE.getName() + " or "
       + BalancingPolicy.Pool.INSTANCE.getName()
-      + "\n\t[-threshold <threshold>]\tPercentage of disk capacity";
+      + "\n\t[-threshold <threshold>]\tPercentage of disk capacity"
+      + "\n\t[-exclude [-f <hosts-file> | comma-separated list of hosts]]"
+      + "\tExcludes the specified datanodes."
+      + "\n\t[-include [-f <hosts-file> | comma-separated list of hosts]]"
+      + "\tIncludes only the specified datanodes.";
 
   private final NameNodeConnector nnc;
   private final BalancingPolicy policy;
   private final SaslDataTransferClient saslClient;
   private final double threshold;
+  // set of data nodes to be excluded from balancing operations.
+  Set<String> nodesToBeExcluded;
+  // restrict balancing to the following nodes.
+  Set<String> nodesToBeIncluded;
 
   // all data node lists
   private final Collection<Source> overUtilizedDatanodes
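For context, a sketch (not part of the patch) of driving the balancer with these new fields populated, the way the updated tests do. Because Parameters is package-private, this must live in the org.apache.hadoop.hdfs.server.balancer package, as the tests do; the helper name and host names are illustrative assumptions:

    package org.apache.hadoop.hdfs.server.balancer;

    import java.net.URI;
    import java.util.Arrays;
    import java.util.Collection;
    import java.util.Collections;
    import java.util.HashSet;
    import java.util.Set;

    import org.apache.hadoop.conf.Configuration;

    public class ExcludeListBalanceSketch {
      // Runs the balancer over the given namenodes while skipping the listed
      // hosts, mirroring how the updated tests build a Parameters instance.
      static int balanceExcluding(Collection<URI> namenodes, Configuration conf,
          String... hostsToSkip) throws Exception {
        Set<String> excluded = new HashSet<String>(Arrays.asList(hostsToSkip));
        Balancer.Parameters p = new Balancer.Parameters(
            Balancer.Parameters.DEFAULT.policy,
            Balancer.Parameters.DEFAULT.threshold,
            excluded, Collections.<String> emptySet());
        return Balancer.run(namenodes, p, conf);
      }
    }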
@@ -869,6 +879,8 @@ private static void checkReplicationPolicyCompatibility(Configuration conf
   Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) {
     this.threshold = p.threshold;
     this.policy = p.policy;
+    this.nodesToBeExcluded = p.nodesToBeExcluded;
+    this.nodesToBeIncluded = p.nodesToBeIncluded;
     this.nnc = theblockpool;
     cluster = NetworkTopology.getInstance(conf);
 
@@ -905,8 +917,13 @@ private static void checkReplicationPolicyCompatibility(Configuration conf
   private long initNodes(DatanodeInfo[] datanodes) {
     // compute average utilization
     for (DatanodeInfo datanode : datanodes) {
-      if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
-        continue; // ignore decommissioning or decommissioned nodes
+      // ignore decommissioned and decommissioning nodes, nodes in the
+      // exclude list, and nodes not in the include list (when the include
+      // list is non-empty)
+      if (datanode.isDecommissioned() || datanode.isDecommissionInProgress() ||
+          Util.shouldBeExcluded(nodesToBeExcluded, datanode) ||
+          !Util.shouldBeIncluded(nodesToBeIncluded, datanode)) {
+        continue;
       }
       policy.accumulateSpaces(datanode);
     }
@@ -919,8 +936,16 @@ private long initNodes(DatanodeInfo[] datanodes) {
      */
     long overLoadedBytes = 0L, underLoadedBytes = 0L;
     for (DatanodeInfo datanode : DFSUtil.shuffle(datanodes)) {
-      if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) {
-        continue; // ignore decommissioning or decommissioned nodes
+      // ignore decommissioned and decommissioning nodes, nodes in the
+      // exclude list, and nodes not in the include list (when the include
+      // list is non-empty)
+      if (datanode.isDecommissioned() || datanode.isDecommissionInProgress() ||
+          Util.shouldBeExcluded(nodesToBeExcluded, datanode) ||
+          !Util.shouldBeIncluded(nodesToBeIncluded, datanode)) {
+        if (LOG.isTraceEnabled()) {
+          LOG.trace("Excluding datanode " + datanode);
+        }
+        continue;
       }
       cluster.add(datanode);
       BalancerDatanode datanodeS;
@@ -1526,21 +1551,101 @@ private static String time2Str(long elapsedTime) {
   }
 
   static class Parameters {
-    static final Parameters DEFALUT = new Parameters(
-        BalancingPolicy.Node.INSTANCE, 10.0);
+    static final Parameters DEFAULT = new Parameters(
+        BalancingPolicy.Node.INSTANCE, 10.0,
+        Collections.<String> emptySet(), Collections.<String> emptySet());
 
     final BalancingPolicy policy;
     final double threshold;
+    // exclude the nodes in this set from balancing operations
+    Set<String> nodesToBeExcluded;
+    // include only these nodes in balancing operations
+    Set<String> nodesToBeIncluded;
 
-    Parameters(BalancingPolicy policy, double threshold) {
+    Parameters(BalancingPolicy policy, double threshold,
+        Set<String> nodesToBeExcluded, Set<String> nodesToBeIncluded) {
       this.policy = policy;
       this.threshold = threshold;
+      this.nodesToBeExcluded = nodesToBeExcluded;
+      this.nodesToBeIncluded = nodesToBeIncluded;
     }
 
     @Override
     public String toString() {
       return Balancer.class.getSimpleName() + "." + getClass().getSimpleName()
-          + "[" + policy + ", threshold=" + threshold + "]";
+          + "[" + policy + ", threshold=" + threshold +
+          ", number of nodes to be excluded = " + nodesToBeExcluded.size() +
+          ", number of nodes to be included = " + nodesToBeIncluded.size() + "]";
     }
   }
 
+  static class Util {
+
+    /**
+     * @param datanode
+     * @return true if the datanode is part of excludedNodes.
+     */
+    static boolean shouldBeExcluded(Set<String> excludedNodes, DatanodeInfo datanode) {
+      return isIn(excludedNodes, datanode);
+    }
+
+    /**
+     * @param datanode
+     * @return true if includedNodes is empty or the datanode is part of includedNodes.
+     */
+    static boolean shouldBeIncluded(Set<String> includedNodes, DatanodeInfo datanode) {
+      return (includedNodes.isEmpty() ||
+          isIn(includedNodes, datanode));
+    }
+
+    /**
+     * A match is checked using the host name and the IP address, with and
+     * without the port number.
+     * @param datanodeSet
+     * @param datanode
+     * @return true if the datanode's transfer address matches the set of nodes.
+     */
+    private static boolean isIn(Set<String> datanodeSet, DatanodeInfo datanode) {
+      return isIn(datanodeSet, datanode.getPeerHostName(), datanode.getXferPort()) ||
+          isIn(datanodeSet, datanode.getIpAddr(), datanode.getXferPort()) ||
+          isIn(datanodeSet, datanode.getHostName(), datanode.getXferPort());
+    }
+
+    /**
+     * Returns true if nodes contains host or host:port.
+     * @param nodes
+     * @param host
+     * @param port
+     */
+    private static boolean isIn(Set<String> nodes, String host, int port) {
+      if (host == null) {
+        return false;
+      }
+      return (nodes.contains(host) || nodes.contains(host + ":" + port));
+    }
+
+    /**
+     * Parses a comma-separated string to obtain a set of host names.
+     * @param string
+     * @return set of host names
+     */
+    static Set<String> parseHostList(String string) {
+      String[] addrs = StringUtils.getTrimmedStrings(string);
+      return new HashSet<String>(Arrays.asList(addrs));
+    }
+
+    /**
+     * Reads a set of host names from a file.
+     * @param fileName
+     * @return set of host names
+     */
+    static Set<String> getHostListFromFile(String fileName) {
+      Set<String> nodes = new HashSet<String>();
+      try {
+        HostsFileReader.readFileToSet("nodes", fileName, nodes);
+        return StringUtils.getTrimmedStrings(nodes);
+      } catch (IOException e) {
+        throw new IllegalArgumentException("Unable to open file: " + fileName);
+      }
+    }
+  }
+
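To make the matching rules concrete, a standalone sketch (Util is package-private, so this mirrors isIn rather than calling it; the hosts are made up):

    import java.util.Arrays;
    import java.util.HashSet;
    import java.util.Set;

    public class HostMatchSketch {
      // Mirror of Balancer.Util.isIn(Set, host, port): a node matches when the
      // set contains either the bare host or host:port.
      static boolean isIn(Set<String> nodes, String host, int port) {
        if (host == null) {
          return false;
        }
        return nodes.contains(host) || nodes.contains(host + ":" + port);
      }

      public static void main(String[] args) {
        Set<String> excluded = new HashSet<String>(
            Arrays.asList("dn1.example.com", "10.0.0.2:50010"));
        System.out.println(isIn(excluded, "dn1.example.com", 50010)); // true: bare host
        System.out.println(isIn(excluded, "10.0.0.2", 50010));        // true: host:port
        System.out.println(isIn(excluded, "10.0.0.3", 50010));        // false
      }
    }

The three-argument check is applied by the DatanodeInfo overload to the peer host name, the IP address, and the DNS host name in turn, so any one of them can match.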
@@ -1578,8 +1683,10 @@ public int run(String[] args) {
 
     /** parse command line arguments */
     static Parameters parse(String[] args) {
-      BalancingPolicy policy = Parameters.DEFALUT.policy;
-      double threshold = Parameters.DEFALUT.threshold;
+      BalancingPolicy policy = Parameters.DEFAULT.policy;
+      double threshold = Parameters.DEFAULT.threshold;
+      Set<String> nodesTobeExcluded = Parameters.DEFAULT.nodesToBeExcluded;
+      Set<String> nodesTobeIncluded = Parameters.DEFAULT.nodesToBeIncluded;
 
       if (args != null) {
         try {
@@ -1608,18 +1715,38 @@ static Parameters parse(String[] args) {
               System.err.println("Illegal policy name: " + args[i]);
               throw e;
             }
+          } else if ("-exclude".equalsIgnoreCase(args[i])) {
+            i++;
+            if ("-f".equalsIgnoreCase(args[i])) {
+              nodesTobeExcluded = Util.getHostListFromFile(args[++i]);
+            } else {
+              nodesTobeExcluded = Util.parseHostList(args[i]);
+            }
+          } else if ("-include".equalsIgnoreCase(args[i])) {
+            i++;
+            if ("-f".equalsIgnoreCase(args[i])) {
+              nodesTobeIncluded = Util.getHostListFromFile(args[++i]);
+            } else {
+              nodesTobeIncluded = Util.parseHostList(args[i]);
+            }
           } else {
             throw new IllegalArgumentException("args = "
                 + Arrays.toString(args));
           }
         }
+        if (!nodesTobeExcluded.isEmpty() && !nodesTobeIncluded.isEmpty()) {
+          System.err.println(
+              "-exclude and -include options cannot be specified together.");
+          throw new IllegalArgumentException(
+              "-exclude and -include options cannot be specified together.");
+        }
       } catch(RuntimeException e) {
         printUsage(System.err);
         throw e;
       }
     }
 
-    return new Parameters(policy, threshold);
+    return new Parameters(policy, threshold, nodesTobeExcluded, nodesTobeIncluded);
   }
 
   private static void printUsage(PrintStream out) {
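A sketch of exercising the new parsing (not part of the patch; it sits in the balancer package because Cli.parse is not public, as the tests themselves do, and the host names are illustrative):

    package org.apache.hadoop.hdfs.server.balancer;

    public class ParseSketch {
      public static void main(String[] args) {
        // Comma-separated host list; entries may carry an optional :port suffix.
        Balancer.Parameters p = Balancer.Cli.parse(
            new String[] { "-exclude", "dn1.example.com,dn2.example.com:50010" });
        System.out.println(p); // toString() now reports the excluded/included counts

        // Specifying both lists is rejected, as the new check above enforces.
        try {
          Balancer.Cli.parse(new String[] { "-include", "dn3", "-exclude", "dn4" });
        } catch (IllegalArgumentException expected) {
          System.out.println("rejected: " + expected.getMessage());
        }
      }
    }

With -f, the hosts file is read via HostsFileReader one host per line, which matches the format the test's runBalancerCli writes below.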
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java

@@ -18,17 +18,23 @@
 package org.apache.hadoop.hdfs.server.balancer;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
+import java.io.File;
 import java.io.IOException;
+import java.io.PrintWriter;
 import java.net.URI;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Collection;
+import java.util.HashSet;
 import java.util.List;
 import java.util.Random;
+import java.util.Set;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.commons.logging.impl.Log4JLogger;
@@ -48,6 +54,8 @@
 import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli;
+import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters;
+import org.apache.hadoop.hdfs.server.datanode.DataNode;
 import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset;
 import org.apache.hadoop.util.Time;
 import org.apache.hadoop.util.Tool;
@@ -255,6 +263,18 @@ static void waitForHeartBeat(long expectedUsedSpace,
       }
     }
   }
 
+  /**
+   * Wait until balanced: each datanode gives utilization within
+   * BALANCE_ALLOWED_VARIANCE of average
+   * @throws IOException
+   * @throws TimeoutException
+   */
+  static void waitForBalancer(long totalUsedSpace, long totalCapacity,
+      ClientProtocol client, MiniDFSCluster cluster, Balancer.Parameters p)
+      throws IOException, TimeoutException {
+    waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, 0);
+  }
+
   /**
    * Wait until balanced: each datanode gives utilization within
@@ -263,11 +283,17 @@ static void waitForHeartBeat(long expectedUsedSpace,
    * @throws TimeoutException
    */
   static void waitForBalancer(long totalUsedSpace, long totalCapacity,
-      ClientProtocol client, MiniDFSCluster cluster)
-      throws IOException, TimeoutException {
+      ClientProtocol client, MiniDFSCluster cluster, Balancer.Parameters p,
+      int expectedExcludedNodes) throws IOException, TimeoutException {
     long timeout = TIMEOUT;
     long failtime = (timeout <= 0L) ? Long.MAX_VALUE
         : Time.now() + timeout;
+    if (!p.nodesToBeIncluded.isEmpty()) {
+      totalCapacity = p.nodesToBeIncluded.size() * CAPACITY;
+    }
+    if (!p.nodesToBeExcluded.isEmpty()) {
+      totalCapacity -= p.nodesToBeExcluded.size() * CAPACITY;
+    }
     final double avgUtilization = ((double)totalUsedSpace) / totalCapacity;
     boolean balanced;
     do {
@@ -275,9 +301,20 @@ static void waitForBalancer(long totalUsedSpace, long totalCapacity,
           client.getDatanodeReport(DatanodeReportType.ALL);
       assertEquals(datanodeReport.length, cluster.getDataNodes().size());
       balanced = true;
+      int actualExcludedNodeCount = 0;
       for (DatanodeInfo datanode : datanodeReport) {
         double nodeUtilization = ((double)datanode.getDfsUsed())
             / datanode.getCapacity();
+        if (Balancer.Util.shouldBeExcluded(p.nodesToBeExcluded, datanode)) {
+          assertTrue(nodeUtilization == 0);
+          actualExcludedNodeCount++;
+          continue;
+        }
+        if (!Balancer.Util.shouldBeIncluded(p.nodesToBeIncluded, datanode)) {
+          assertTrue(nodeUtilization == 0);
+          actualExcludedNodeCount++;
+          continue;
+        }
         if (Math.abs(avgUtilization - nodeUtilization) > BALANCE_ALLOWED_VARIANCE) {
           balanced = false;
           if (Time.now() > failtime) {
@@ -294,6 +331,7 @@ static void waitForBalancer(long totalUsedSpace, long totalCapacity,
           break;
         }
       }
+      assertEquals(expectedExcludedNodes, actualExcludedNodeCount);
     } while (!balanced);
   }
 
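The capacity adjustment above relies on every test datanode having the same CAPACITY; a small worked example with hypothetical numbers:

    public class CapacityAdjustmentExample {
      public static void main(String[] args) {
        // Hypothetical: 5 datanodes of equal capacity, 2 of them excluded.
        final long CAPACITY = 1000L;        // per-node capacity, as in the test
        long totalUsedSpace = 900L;
        long totalCapacity = 5 * CAPACITY;  // 5000
        int excludedNodes = 2;
        // Excluded nodes receive no blocks, so they drop out of the target math.
        totalCapacity -= excludedNodes * CAPACITY;  // 3000
        double avgUtilization = ((double) totalUsedSpace) / totalCapacity;
        System.out.println(avgUtilization); // 0.3: the balanced target for the
                                            // three participating nodes
      }
    }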
@@ -307,22 +345,118 @@ String long2String(long[] array) {
     }
     return b.append("]").toString();
   }
-  /** This test start a cluster with specified number of nodes,
+  /**
+   * Class which contains information about the
+   * new nodes to be added to the cluster for balancing.
+   */
+  static abstract class NewNodeInfo {
+
+    Set<String> nodesToBeExcluded = new HashSet<String>();
+    Set<String> nodesToBeIncluded = new HashSet<String>();
+
+    abstract String[] getNames();
+    abstract int getNumberofNewNodes();
+    abstract int getNumberofIncludeNodes();
+    abstract int getNumberofExcludeNodes();
+
+    public Set<String> getNodesToBeIncluded() {
+      return nodesToBeIncluded;
+    }
+    public Set<String> getNodesToBeExcluded() {
+      return nodesToBeExcluded;
+    }
+  }
+
+  /**
+   * The host names of the new nodes are specified.
+   */
+  static class HostNameBasedNodes extends NewNodeInfo {
+    String[] hostnames;
+
+    public HostNameBasedNodes(String[] hostnames,
+        Set<String> nodesToBeExcluded, Set<String> nodesToBeIncluded) {
+      this.hostnames = hostnames;
+      this.nodesToBeExcluded = nodesToBeExcluded;
+      this.nodesToBeIncluded = nodesToBeIncluded;
+    }
+
+    @Override
+    String[] getNames() {
+      return hostnames;
+    }
+    @Override
+    int getNumberofNewNodes() {
+      return hostnames.length;
+    }
+    @Override
+    int getNumberofIncludeNodes() {
+      return nodesToBeIncluded.size();
+    }
+    @Override
+    int getNumberofExcludeNodes() {
+      return nodesToBeExcluded.size();
+    }
+  }
+
+  /**
+   * The number of data nodes to be started is specified.
+   * The data nodes will have the same host name but different port numbers.
+   */
+  static class PortNumberBasedNodes extends NewNodeInfo {
+    int newNodes;
+    int excludeNodes;
+    int includeNodes;
+
+    public PortNumberBasedNodes(int newNodes, int excludeNodes, int includeNodes) {
+      this.newNodes = newNodes;
+      this.excludeNodes = excludeNodes;
+      this.includeNodes = includeNodes;
+    }
+
+    @Override
+    String[] getNames() {
+      return null;
+    }
+    @Override
+    int getNumberofNewNodes() {
+      return newNodes;
+    }
+    @Override
+    int getNumberofIncludeNodes() {
+      return includeNodes;
+    }
+    @Override
+    int getNumberofExcludeNodes() {
+      return excludeNodes;
+    }
+  }
+
+  private void doTest(Configuration conf, long[] capacities, String[] racks,
+      long newCapacity, String newRack, boolean useTool) throws Exception {
+    doTest(conf, capacities, racks, newCapacity, newRack, null, useTool, false);
+  }
+
+  /** This test starts a cluster with the specified number of nodes,
    * and fills it to be 30% full (with a single file replicated identically
    * to all datanodes);
    * It then adds one new empty node and starts balancing.
    *
+   *
    * @param conf - configuration
    * @param capacities - array of capacities of original nodes in cluster
    * @param racks - array of racks for original nodes in cluster
    * @param newCapacity - new node's capacity
    * @param newRack - new node's rack
+   * @param nodes - information about new nodes to be started.
    * @param useTool - if true run test via Cli with command-line argument
    *   parsing, etc. Otherwise invoke balancer API directly.
+   * @param useFile - if true, the hosts to be included or excluded will be
+   *   stored in a file and then later read from the file.
    * @throws Exception
    */
-  private void doTest(Configuration conf, long[] capacities, String[] racks,
-      long newCapacity, String newRack, boolean useTool) throws Exception {
+  private void doTest(Configuration conf, long[] capacities,
+      String[] racks, long newCapacity, String newRack, NewNodeInfo nodes,
+      boolean useTool, boolean useFile) throws Exception {
     LOG.info("capacities = " + long2String(capacities));
     LOG.info("racks = " + Arrays.asList(racks));
     LOG.info("newCapacity= " + newCapacity);
@@ -346,17 +480,75 @@ private void doTest(Configuration conf, long[] capacities, String[] racks,
       long totalUsedSpace = totalCapacity*3/10;
       createFile(cluster, filePath, totalUsedSpace / numOfDatanodes,
           (short) numOfDatanodes, 0);
-      // start up an empty node with the same capacity and on the same rack
-      cluster.startDataNodes(conf, 1, true, null,
-          new String[]{newRack}, new long[]{newCapacity});
-
-      totalCapacity += newCapacity;
+      if (nodes == null) { // there is no specification of new nodes.
+        // start up an empty node with the same capacity and on the same rack
+        cluster.startDataNodes(conf, 1, true, null,
+            new String[]{newRack}, null, new long[]{newCapacity});
+        totalCapacity += newCapacity;
+      } else {
+        // if running a test with an "include list", include the original
+        // nodes as well
+        if (nodes.getNumberofIncludeNodes() > 0) {
+          for (DataNode dn: cluster.getDataNodes())
+            nodes.getNodesToBeIncluded().add(dn.getDatanodeId().getHostName());
+        }
+        String[] newRacks = new String[nodes.getNumberofNewNodes()];
+        long[] newCapacities = new long[nodes.getNumberofNewNodes()];
+        for (int i = 0; i < nodes.getNumberofNewNodes(); i++) {
+          newRacks[i] = newRack;
+          newCapacities[i] = newCapacity;
+        }
+        // if host names are specified for the new nodes to be created.
+        if (nodes.getNames() != null) {
+          cluster.startDataNodes(conf, nodes.getNumberofNewNodes(), true, null,
+              newRacks, nodes.getNames(), newCapacities);
+          totalCapacity += newCapacity * nodes.getNumberofNewNodes();
+        } else { // host names are not specified
+          cluster.startDataNodes(conf, nodes.getNumberofNewNodes(), true, null,
+              newRacks, null, newCapacities);
+          totalCapacity += newCapacity * nodes.getNumberofNewNodes();
+          // populate the include nodes
+          if (nodes.getNumberofIncludeNodes() > 0) {
+            int totalNodes = cluster.getDataNodes().size();
+            for (int i = 0; i < nodes.getNumberofIncludeNodes(); i++) {
+              nodes.getNodesToBeIncluded().add(cluster.getDataNodes().get(
+                  totalNodes - 1 - i).getDatanodeId().getXferAddr());
+            }
+          }
+          // populate the exclude nodes
+          if (nodes.getNumberofExcludeNodes() > 0) {
+            int totalNodes = cluster.getDataNodes().size();
+            for (int i = 0; i < nodes.getNumberofExcludeNodes(); i++) {
+              nodes.getNodesToBeExcluded().add(cluster.getDataNodes().get(
+                  totalNodes - 1 - i).getDatanodeId().getXferAddr());
+            }
+          }
+        }
+      }
+      // build the balancer parameters from the node specification
+      Balancer.Parameters p = Balancer.Parameters.DEFAULT;
+      if (nodes != null) {
+        p = new Balancer.Parameters(
+            Balancer.Parameters.DEFAULT.policy,
+            Balancer.Parameters.DEFAULT.threshold,
+            nodes.getNodesToBeExcluded(), nodes.getNodesToBeIncluded());
+      }
+
+      int expectedExcludedNodes = 0;
+      if (nodes != null) {
+        if (!nodes.getNodesToBeExcluded().isEmpty()) {
+          expectedExcludedNodes = nodes.getNodesToBeExcluded().size();
+        } else if (!nodes.getNodesToBeIncluded().isEmpty()) {
+          expectedExcludedNodes =
+              cluster.getDataNodes().size() - nodes.getNodesToBeIncluded().size();
+        }
+      }
+
       // run balancer and validate results
       if (useTool) {
-        runBalancerCli(conf, totalUsedSpace, totalCapacity);
+        runBalancerCli(conf, totalUsedSpace, totalCapacity, p, useFile, expectedExcludedNodes);
      } else {
-        runBalancer(conf, totalUsedSpace, totalCapacity);
+        runBalancer(conf, totalUsedSpace, totalCapacity, p, expectedExcludedNodes);
      }
     } finally {
       cluster.shutdown();
@@ -365,11 +557,17 @@ private void doTest(Configuration conf, long[] capacities, String[] racks,
 
   private void runBalancer(Configuration conf,
       long totalUsedSpace, long totalCapacity) throws Exception {
+    runBalancer(conf, totalUsedSpace, totalCapacity, Balancer.Parameters.DEFAULT, 0);
+  }
+
+  private void runBalancer(Configuration conf,
+      long totalUsedSpace, long totalCapacity, Balancer.Parameters p,
+      int excludedNodes) throws Exception {
     waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
 
     // start rebalancing
     Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
-    final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
+    final int r = Balancer.run(namenodes, p, conf);
     if (conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
         DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT) == 0) {
       assertEquals(Balancer.ReturnStatus.NO_MOVE_PROGRESS.code, r);
@@ -379,22 +577,66 @@ private void runBalancer(Configuration conf,
     }
     waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
     LOG.info("Rebalancing with default ctor.");
-    waitForBalancer(totalUsedSpace, totalCapacity, client, cluster);
+    waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, excludedNodes);
   }
 
-  private void runBalancerCli(Configuration conf,
-      long totalUsedSpace, long totalCapacity) throws Exception {
-    waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
-
-    final String[] args = { "-policy", "datanode" };
+  private void runBalancerCli(Configuration conf,
+      long totalUsedSpace, long totalCapacity,
+      Balancer.Parameters p, boolean useFile, int expectedExcludedNodes) throws Exception {
+    waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
+    List<String> args = new ArrayList<String>();
+    args.add("-policy");
+    args.add("datanode");
+
+    File excludeHostsFile = null;
+    if (!p.nodesToBeExcluded.isEmpty()) {
+      args.add("-exclude");
+      if (useFile) {
+        excludeHostsFile = new File("exclude-hosts-file");
+        PrintWriter pw = new PrintWriter(excludeHostsFile);
+        for (String host: p.nodesToBeExcluded) {
+          pw.write(host + "\n");
+        }
+        pw.close();
+        args.add("-f");
+        args.add("exclude-hosts-file");
+      } else {
+        args.add(StringUtils.join(p.nodesToBeExcluded, ','));
+      }
+    }
+
+    File includeHostsFile = null;
+    if (!p.nodesToBeIncluded.isEmpty()) {
+      args.add("-include");
+      if (useFile) {
+        includeHostsFile = new File("include-hosts-file");
+        PrintWriter pw = new PrintWriter(includeHostsFile);
+        for (String host: p.nodesToBeIncluded) {
+          pw.write(host + "\n");
+        }
+        pw.close();
+        args.add("-f");
+        args.add("include-hosts-file");
+      } else {
+        args.add(StringUtils.join(p.nodesToBeIncluded, ','));
+      }
+    }
 
     final Tool tool = new Cli();
     tool.setConf(conf);
-    final int r = tool.run(args); // start rebalancing
+    final int r = tool.run(args.toArray(new String[0])); // start rebalancing
 
     assertEquals("Tools should exit 0 on success", 0, r);
     waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster);
     LOG.info("Rebalancing with default ctor.");
-    waitForBalancer(totalUsedSpace, totalCapacity, client, cluster);
+    waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, expectedExcludedNodes);
+
+    if (excludeHostsFile != null && excludeHostsFile.exists()) {
+      excludeHostsFile.delete();
+    }
+    if (includeHostsFile != null && includeHostsFile.exists()) {
+      includeHostsFile.delete();
+    }
   }
 
   /** one-node cluster test*/
@@ -440,7 +682,7 @@ public void testBalancerCliParseWithThresholdOutOfBoundaries() {
     }
   }
 
-  /** Test a cluster with even distribution,
+  /** Test a cluster with even distribution,
    * then a new empty node is added to the cluster*/
   @Test(timeout=100000)
   public void testBalancer0() throws Exception {
@@ -554,7 +796,13 @@ public void testBalancerCliParseWithWrongParams() {
     } catch (IllegalArgumentException e) {
 
     }
+    parameters = new String[] {"-include", "testnode1", "-exclude", "testnode2"};
+    try {
+      Balancer.Cli.parse(parameters);
+      fail("IllegalArgumentException is expected when both -exclude and -include are specified");
+    } catch (IllegalArgumentException e) {
+
+    }
   }
 
   /**
@@ -569,6 +817,183 @@ public void testExitZeroOnSuccess() throws Exception {
     oneNodeTest(conf, true);
   }
 
+  /**
+   * Test a cluster with even distribution,
+   * then three nodes are added to the cluster,
+   * runs balancer with two of the nodes in the exclude list
+   */
+  @Test(timeout=100000)
+  public void testBalancerWithExcludeList() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    Set<String> excludeHosts = new HashSet<String>();
+    excludeHosts.add("datanodeY");
+    excludeHosts.add("datanodeZ");
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
+        new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
+            excludeHosts, Parameters.DEFAULT.nodesToBeIncluded), false, false);
+  }
+
+  /**
+   * Test a cluster with even distribution,
+   * then three nodes are added to the cluster,
+   * runs balancer with two of the nodes in the exclude list
+   */
+  @Test(timeout=100000)
+  public void testBalancerWithExcludeListWithPorts() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+        CAPACITY, RACK2, new PortNumberBasedNodes(3, 2, 0), false, false);
+  }
+
+  /**
+   * Test a cluster with even distribution,
+   * then three nodes are added to the cluster,
+   * runs balancer with two of the nodes in the exclude list
+   */
+  @Test(timeout=100000)
+  public void testBalancerCliWithExcludeList() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    Set<String> excludeHosts = new HashSet<String>();
+    excludeHosts.add("datanodeY");
+    excludeHosts.add("datanodeZ");
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
+        new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, excludeHosts,
+            Parameters.DEFAULT.nodesToBeIncluded), true, false);
+  }
+
+  /**
+   * Test a cluster with even distribution,
+   * then three nodes are added to the cluster,
+   * runs balancer with two of the nodes in the exclude list
+   */
+  @Test(timeout=100000)
+  public void testBalancerCliWithExcludeListWithPorts() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+        CAPACITY, RACK2, new PortNumberBasedNodes(3, 2, 0), true, false);
+  }
+
+  /**
+   * Test a cluster with even distribution,
+   * then three nodes are added to the cluster,
+   * runs balancer with two of the nodes in the exclude list in a file
+   */
+  @Test(timeout=100000)
+  public void testBalancerCliWithExcludeListInAFile() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    Set<String> excludeHosts = new HashSet<String>();
+    excludeHosts.add("datanodeY");
+    excludeHosts.add("datanodeZ");
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
+        new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
+            excludeHosts, Parameters.DEFAULT.nodesToBeIncluded), true, true);
+  }
+
+  /**
+   * Test a cluster with even distribution,
+   * then three nodes are added to the cluster,
+   * runs balancer with two of the nodes in the exclude list in a file
+   */
+  @Test(timeout=100000)
+  public void testBalancerCliWithExcludeListWithPortsInAFile() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+        CAPACITY, RACK2, new PortNumberBasedNodes(3, 2, 0), true, true);
+  }
+
+  /**
+   * Test a cluster with even distribution,
+   * then three nodes are added to the cluster,
+   * runs balancer with one of the nodes in the include list
+   */
+  @Test(timeout=100000)
+  public void testBalancerWithIncludeList() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    Set<String> includeHosts = new HashSet<String>();
+    includeHosts.add("datanodeY");
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
+        new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
+            Parameters.DEFAULT.nodesToBeExcluded, includeHosts), false, false);
+  }
+
+  /**
+   * Test a cluster with even distribution,
+   * then three nodes are added to the cluster,
+   * runs balancer with one of the nodes in the include list
+   */
+  @Test(timeout=100000)
+  public void testBalancerWithIncludeListWithPorts() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+        CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), false, false);
+  }
+
+  /**
+   * Test a cluster with even distribution,
+   * then three nodes are added to the cluster,
+   * runs balancer with one of the nodes in the include list
+   */
+  @Test(timeout=100000)
+  public void testBalancerCliWithIncludeList() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    Set<String> includeHosts = new HashSet<String>();
+    includeHosts.add("datanodeY");
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
+        new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
+            Parameters.DEFAULT.nodesToBeExcluded, includeHosts), true, false);
+  }
+
+  /**
+   * Test a cluster with even distribution,
+   * then three nodes are added to the cluster,
+   * runs balancer with one of the nodes in the include list
+   */
+  @Test(timeout=100000)
+  public void testBalancerCliWithIncludeListWithPorts() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+        CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, false);
+  }
+
+  /**
+   * Test a cluster with even distribution,
+   * then three nodes are added to the cluster,
+   * runs balancer with one of the nodes in the include list in a file
+   */
+  @Test(timeout=100000)
+  public void testBalancerCliWithIncludeListInAFile() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    Set<String> includeHosts = new HashSet<String>();
+    includeHosts.add("datanodeY");
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2,
+        new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"},
+            Parameters.DEFAULT.nodesToBeExcluded, includeHosts), true, true);
+  }
+
+  /**
+   * Test a cluster with even distribution,
+   * then three nodes are added to the cluster,
+   * runs balancer with one of the nodes in the include list in a file
+   */
+  @Test(timeout=100000)
+  public void testBalancerCliWithIncludeListWithPortsInAFile() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1},
+        CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, true);
+  }
+
   /**
    * @param args
    */
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java

@@ -97,10 +97,10 @@ public void testBalancerWithHANameNodes() throws Exception {
       Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
       assertEquals(1, namenodes.size());
       assertTrue(namenodes.contains(HATestUtil.getLogicalUri(cluster)));
-      final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
+      final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
       assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
       TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client,
-          cluster);
+          cluster, Balancer.Parameters.DEFAULT);
     } finally {
       cluster.shutdown();
     }
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java

@@ -159,7 +159,7 @@ static void runBalancer(Suite s,
 
     // start rebalancing
     final Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(s.conf);
-    final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, s.conf);
+    final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, s.conf);
     Assert.assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
 
     LOG.info("BALANCER 2");
@@ -195,7 +195,7 @@ static void runBalancer(Suite s,
         balanced = true;
         for(int d = 0; d < used.length; d++) {
           final double p = used[d]*100.0/cap[d];
-          balanced = p <= avg + Balancer.Parameters.DEFALUT.threshold;
+          balanced = p <= avg + Balancer.Parameters.DEFAULT.threshold;
           if (!balanced) {
             if (i % 100 == 0) {
               LOG.warn("datanodes " + d + " is not yet balanced: "
hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java

@@ -175,7 +175,7 @@ private void runBalancer(Configuration conf,
 
     // start rebalancing
     Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
-    final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
+    final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
     assertEquals(Balancer.ReturnStatus.SUCCESS.code, r);
 
     waitForHeartBeat(totalUsedSpace, totalCapacity);
@@ -189,7 +189,7 @@ private void runBalancerCanFinish(Configuration conf,
 
     // start rebalancing
     Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
-    final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf);
+    final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf);
     Assert.assertTrue(r == Balancer.ReturnStatus.SUCCESS.code ||
         (r == Balancer.ReturnStatus.NO_MOVE_PROGRESS.code));
     waitForHeartBeat(totalUsedSpace, totalCapacity);