diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java index e7f983ac66..4e2783df88 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java @@ -27,6 +27,7 @@ import java.util.Arrays; import java.util.Collection; import java.util.Date; +import java.util.HashSet; import java.util.Iterator; import java.util.LinkedHashSet; import java.util.List; @@ -377,6 +378,19 @@ public static String[] getTrimmedStrings(String str){ return str.trim().split("\\s*,\\s*"); } + /** + * Trims all the strings in a Collection and returns a Set. + * @param strings + * @return + */ + public static Set getTrimmedStrings(Collection strings) { + Set trimmedStrings = new HashSet(); + for (String string: strings) { + trimmedStrings.add(string.trim()); + } + return trimmedStrings; + } + final public static String[] emptyStringArray = {}; final public static char COMMA = ','; final public static String COMMA_STR = ","; diff --git a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt index 61e48af581..880e209f97 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt +++ b/hadoop-hdfs-project/hadoop-hdfs/CHANGES.txt @@ -335,6 +335,9 @@ Release 2.6.0 - UNRELEASED HDFS-6570. add api that enables checking if a user has certain permissions on a file. (Jitendra Pandey via cnauroth) + HDFS-6441. Add ability to exclude/include specific datanodes while + balancing. (Benoy Antony and Yu Li via Arpit Agarwal) + OPTIMIZATIONS HDFS-6690. Deduplicate xattr names in memory. (wang) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java index 5dbdd643cd..1ddb3a4199 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java @@ -45,6 +45,7 @@ import java.util.LinkedList; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; @@ -83,6 +84,7 @@ import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.net.Node; import org.apache.hadoop.security.token.Token; +import org.apache.hadoop.util.HostsFileReader; import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Tool; @@ -203,12 +205,20 @@ public class Balancer { + "\n\t[-policy ]\tthe balancing policy: " + BalancingPolicy.Node.INSTANCE.getName() + " or " + BalancingPolicy.Pool.INSTANCE.getName() - + "\n\t[-threshold ]\tPercentage of disk capacity"; + + "\n\t[-threshold ]\tPercentage of disk capacity" + + "\n\t[-exclude [-f | comma-sperated list of hosts]]" + + "\tExcludes the specified datanodes." + + "\n\t[-include [-f | comma-sperated list of hosts]]" + + "\tIncludes only the specified datanodes."; private final NameNodeConnector nnc; private final BalancingPolicy policy; private final SaslDataTransferClient saslClient; private final double threshold; + // set of data nodes to be excluded from balancing operations. + Set nodesToBeExcluded; + //Restrict balancing to the following nodes. + Set nodesToBeIncluded; // all data node lists private final Collection overUtilizedDatanodes @@ -869,6 +879,8 @@ private static void checkReplicationPolicyCompatibility(Configuration conf Balancer(NameNodeConnector theblockpool, Parameters p, Configuration conf) { this.threshold = p.threshold; this.policy = p.policy; + this.nodesToBeExcluded = p.nodesToBeExcluded; + this.nodesToBeIncluded = p.nodesToBeIncluded; this.nnc = theblockpool; cluster = NetworkTopology.getInstance(conf); @@ -905,8 +917,13 @@ private static void checkReplicationPolicyCompatibility(Configuration conf private long initNodes(DatanodeInfo[] datanodes) { // compute average utilization for (DatanodeInfo datanode : datanodes) { - if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) { - continue; // ignore decommissioning or decommissioned nodes + // ignore decommissioning or decommissioned nodes or + // ignore nodes in exclude list + // or nodes not in the include list (if include list is not empty) + if (datanode.isDecommissioned() || datanode.isDecommissionInProgress() || + Util.shouldBeExcluded(nodesToBeExcluded, datanode) || + !Util.shouldBeIncluded(nodesToBeIncluded, datanode)) { + continue; } policy.accumulateSpaces(datanode); } @@ -919,8 +936,16 @@ private long initNodes(DatanodeInfo[] datanodes) { */ long overLoadedBytes = 0L, underLoadedBytes = 0L; for (DatanodeInfo datanode : DFSUtil.shuffle(datanodes)) { - if (datanode.isDecommissioned() || datanode.isDecommissionInProgress()) { - continue; // ignore decommissioning or decommissioned nodes + // ignore decommissioning or decommissioned nodes or + // ignore nodes in exclude list + // or nodes not in the include list (if include list is not empty) + if (datanode.isDecommissioned() || datanode.isDecommissionInProgress() || + Util.shouldBeExcluded(nodesToBeExcluded, datanode) || + !Util.shouldBeIncluded(nodesToBeIncluded, datanode)) { + if (LOG.isTraceEnabled()) { + LOG.trace("Excluding datanode " + datanode); + } + continue; } cluster.add(datanode); BalancerDatanode datanodeS; @@ -1526,21 +1551,101 @@ private static String time2Str(long elapsedTime) { } static class Parameters { - static final Parameters DEFALUT = new Parameters( - BalancingPolicy.Node.INSTANCE, 10.0); + static final Parameters DEFAULT = new Parameters( + BalancingPolicy.Node.INSTANCE, 10.0, + Collections. emptySet(), Collections. emptySet()); final BalancingPolicy policy; final double threshold; + // exclude the nodes in this set from balancing operations + Set nodesToBeExcluded; + //include only these nodes in balancing operations + Set nodesToBeIncluded; - Parameters(BalancingPolicy policy, double threshold) { + Parameters(BalancingPolicy policy, double threshold, + Set nodesToBeExcluded, Set nodesToBeIncluded) { this.policy = policy; this.threshold = threshold; + this.nodesToBeExcluded = nodesToBeExcluded; + this.nodesToBeIncluded = nodesToBeIncluded; } @Override public String toString() { return Balancer.class.getSimpleName() + "." + getClass().getSimpleName() - + "[" + policy + ", threshold=" + threshold + "]"; + + "[" + policy + ", threshold=" + threshold + + ", number of nodes to be excluded = "+ nodesToBeExcluded.size() + + ", number of nodes to be included = "+ nodesToBeIncluded.size() +"]"; + } + } + + static class Util { + + /** + * @param datanode + * @return returns true if data node is part of the excludedNodes. + */ + static boolean shouldBeExcluded(Set excludedNodes, DatanodeInfo datanode) { + return isIn(excludedNodes, datanode); + } + + /** + * @param datanode + * @return returns true if includedNodes is empty or data node is part of the includedNodes. + */ + static boolean shouldBeIncluded(Set includedNodes, DatanodeInfo datanode) { + return (includedNodes.isEmpty() || + isIn(includedNodes, datanode)); + } + /** + * Match is checked using host name , ip address with and without port number. + * @param datanodeSet + * @param datanode + * @return true if the datanode's transfer address matches the set of nodes. + */ + private static boolean isIn(Set datanodeSet, DatanodeInfo datanode) { + return isIn(datanodeSet, datanode.getPeerHostName(), datanode.getXferPort()) || + isIn(datanodeSet, datanode.getIpAddr(), datanode.getXferPort()) || + isIn(datanodeSet, datanode.getHostName(), datanode.getXferPort()); + } + + /** + * returns true if nodes contains host or host:port + * @param nodes + * @param host + * @param port + * @return + */ + private static boolean isIn(Set nodes, String host, int port) { + if (host == null) { + return false; + } + return (nodes.contains(host) || nodes.contains(host +":"+ port)); + } + + /** + * parse a comma separated string to obtain set of host names + * @param string + * @return + */ + static Set parseHostList(String string) { + String[] addrs = StringUtils.getTrimmedStrings(string); + return new HashSet(Arrays.asList(addrs)); + } + + /** + * read set of host names from a file + * @param fileName + * @return + */ + static Set getHostListFromFile(String fileName) { + Set nodes = new HashSet (); + try { + HostsFileReader.readFileToSet("nodes", fileName, nodes); + return StringUtils.getTrimmedStrings(nodes); + } catch (IOException e) { + throw new IllegalArgumentException("Unable to open file: " + fileName); + } } } @@ -1578,8 +1683,10 @@ public int run(String[] args) { /** parse command line arguments */ static Parameters parse(String[] args) { - BalancingPolicy policy = Parameters.DEFALUT.policy; - double threshold = Parameters.DEFALUT.threshold; + BalancingPolicy policy = Parameters.DEFAULT.policy; + double threshold = Parameters.DEFAULT.threshold; + Set nodesTobeExcluded = Parameters.DEFAULT.nodesToBeExcluded; + Set nodesTobeIncluded = Parameters.DEFAULT.nodesToBeIncluded; if (args != null) { try { @@ -1608,18 +1715,38 @@ static Parameters parse(String[] args) { System.err.println("Illegal policy name: " + args[i]); throw e; } + } else if ("-exclude".equalsIgnoreCase(args[i])) { + i++; + if ("-f".equalsIgnoreCase(args[i])) { + nodesTobeExcluded = Util.getHostListFromFile(args[++i]); + } else { + nodesTobeExcluded = Util.parseHostList(args[i]); + } + } else if ("-include".equalsIgnoreCase(args[i])) { + i++; + if ("-f".equalsIgnoreCase(args[i])) { + nodesTobeIncluded = Util.getHostListFromFile(args[++i]); + } else { + nodesTobeIncluded = Util.parseHostList(args[i]); + } } else { throw new IllegalArgumentException("args = " + Arrays.toString(args)); } } + if (!nodesTobeExcluded.isEmpty() && !nodesTobeIncluded.isEmpty()) { + System.err.println( + "-exclude and -include options cannot be specified together."); + throw new IllegalArgumentException( + "-exclude and -include options cannot be specified together."); + } } catch(RuntimeException e) { printUsage(System.err); throw e; } } - return new Parameters(policy, threshold); + return new Parameters(policy, threshold, nodesTobeExcluded, nodesTobeIncluded); } private static void printUsage(PrintStream out) { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index fe774aaac9..e9c86dfe3f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -18,17 +18,23 @@ package org.apache.hadoop.hdfs.server.balancer; import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; +import java.io.File; import java.io.IOException; +import java.io.PrintWriter; import java.net.URI; import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.HashSet; import java.util.List; import java.util.Random; +import java.util.Set; import java.util.concurrent.TimeoutException; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.impl.Log4JLogger; @@ -48,6 +54,8 @@ import org.apache.hadoop.hdfs.protocol.HdfsConstants.DatanodeReportType; import org.apache.hadoop.hdfs.protocol.LocatedBlock; import org.apache.hadoop.hdfs.server.balancer.Balancer.Cli; +import org.apache.hadoop.hdfs.server.balancer.Balancer.Parameters; +import org.apache.hadoop.hdfs.server.datanode.DataNode; import org.apache.hadoop.hdfs.server.datanode.SimulatedFSDataset; import org.apache.hadoop.util.Time; import org.apache.hadoop.util.Tool; @@ -255,6 +263,18 @@ static void waitForHeartBeat(long expectedUsedSpace, } } } + + /** + * Wait until balanced: each datanode gives utilization within + * BALANCE_ALLOWED_VARIANCE of average + * @throws IOException + * @throws TimeoutException + */ + static void waitForBalancer(long totalUsedSpace, long totalCapacity, + ClientProtocol client, MiniDFSCluster cluster, Balancer.Parameters p) + throws IOException, TimeoutException { + waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, 0); + } /** * Wait until balanced: each datanode gives utilization within @@ -263,11 +283,17 @@ static void waitForHeartBeat(long expectedUsedSpace, * @throws TimeoutException */ static void waitForBalancer(long totalUsedSpace, long totalCapacity, - ClientProtocol client, MiniDFSCluster cluster) - throws IOException, TimeoutException { + ClientProtocol client, MiniDFSCluster cluster, Balancer.Parameters p, + int expectedExcludedNodes) throws IOException, TimeoutException { long timeout = TIMEOUT; long failtime = (timeout <= 0L) ? Long.MAX_VALUE : Time.now() + timeout; + if (!p.nodesToBeIncluded.isEmpty()) { + totalCapacity = p.nodesToBeIncluded.size() * CAPACITY; + } + if (!p.nodesToBeExcluded.isEmpty()) { + totalCapacity -= p.nodesToBeExcluded.size() * CAPACITY; + } final double avgUtilization = ((double)totalUsedSpace) / totalCapacity; boolean balanced; do { @@ -275,9 +301,20 @@ static void waitForBalancer(long totalUsedSpace, long totalCapacity, client.getDatanodeReport(DatanodeReportType.ALL); assertEquals(datanodeReport.length, cluster.getDataNodes().size()); balanced = true; + int actualExcludedNodeCount = 0; for (DatanodeInfo datanode : datanodeReport) { double nodeUtilization = ((double)datanode.getDfsUsed()) / datanode.getCapacity(); + if (Balancer.Util.shouldBeExcluded(p.nodesToBeExcluded, datanode)) { + assertTrue(nodeUtilization == 0); + actualExcludedNodeCount++; + continue; + } + if (!Balancer.Util.shouldBeIncluded(p.nodesToBeIncluded, datanode)) { + assertTrue(nodeUtilization == 0); + actualExcludedNodeCount++; + continue; + } if (Math.abs(avgUtilization - nodeUtilization) > BALANCE_ALLOWED_VARIANCE) { balanced = false; if (Time.now() > failtime) { @@ -294,6 +331,7 @@ static void waitForBalancer(long totalUsedSpace, long totalCapacity, break; } } + assertEquals(expectedExcludedNodes,actualExcludedNodeCount); } while (!balanced); } @@ -307,22 +345,118 @@ String long2String(long[] array) { } return b.append("]").toString(); } - /** This test start a cluster with specified number of nodes, + /** + * Class which contains information about the + * new nodes to be added to the cluster for balancing. + */ + static abstract class NewNodeInfo { + + Set nodesToBeExcluded = new HashSet(); + Set nodesToBeIncluded = new HashSet(); + + abstract String[] getNames(); + abstract int getNumberofNewNodes(); + abstract int getNumberofIncludeNodes(); + abstract int getNumberofExcludeNodes(); + + public Set getNodesToBeIncluded() { + return nodesToBeIncluded; + } + public Set getNodesToBeExcluded() { + return nodesToBeExcluded; + } + } + + /** + * The host names of new nodes are specified + */ + static class HostNameBasedNodes extends NewNodeInfo { + String[] hostnames; + + public HostNameBasedNodes(String[] hostnames, + Set nodesToBeExcluded, Set nodesToBeIncluded) { + this.hostnames = hostnames; + this.nodesToBeExcluded = nodesToBeExcluded; + this.nodesToBeIncluded = nodesToBeIncluded; + } + + @Override + String[] getNames() { + return hostnames; + } + @Override + int getNumberofNewNodes() { + return hostnames.length; + } + @Override + int getNumberofIncludeNodes() { + return nodesToBeIncluded.size(); + } + @Override + int getNumberofExcludeNodes() { + return nodesToBeExcluded.size(); + } + } + + /** + * The number of data nodes to be started are specified. + * The data nodes will have same host name, but different port numbers. + * + */ + static class PortNumberBasedNodes extends NewNodeInfo { + int newNodes; + int excludeNodes; + int includeNodes; + + public PortNumberBasedNodes(int newNodes, int excludeNodes, int includeNodes) { + this.newNodes = newNodes; + this.excludeNodes = excludeNodes; + this.includeNodes = includeNodes; + } + + @Override + String[] getNames() { + return null; + } + @Override + int getNumberofNewNodes() { + return newNodes; + } + @Override + int getNumberofIncludeNodes() { + return includeNodes; + } + @Override + int getNumberofExcludeNodes() { + return excludeNodes; + } + } + + private void doTest(Configuration conf, long[] capacities, String[] racks, + long newCapacity, String newRack, boolean useTool) throws Exception { + doTest(conf, capacities, racks, newCapacity, newRack, null, useTool, false); + } + + /** This test start a cluster with specified number of nodes, * and fills it to be 30% full (with a single file replicated identically * to all datanodes); * It then adds one new empty node and starts balancing. - * + * * @param conf - configuration * @param capacities - array of capacities of original nodes in cluster * @param racks - array of racks for original nodes in cluster * @param newCapacity - new node's capacity * @param newRack - new node's rack + * @param nodes - information about new nodes to be started. * @param useTool - if true run test via Cli with command-line argument * parsing, etc. Otherwise invoke balancer API directly. + * @param useFile - if true, the hosts to included or excluded will be stored in a + * file and then later read from the file. * @throws Exception */ - private void doTest(Configuration conf, long[] capacities, String[] racks, - long newCapacity, String newRack, boolean useTool) throws Exception { + private void doTest(Configuration conf, long[] capacities, + String[] racks, long newCapacity, String newRack, NewNodeInfo nodes, + boolean useTool, boolean useFile) throws Exception { LOG.info("capacities = " + long2String(capacities)); LOG.info("racks = " + Arrays.asList(racks)); LOG.info("newCapacity= " + newCapacity); @@ -346,17 +480,75 @@ private void doTest(Configuration conf, long[] capacities, String[] racks, long totalUsedSpace = totalCapacity*3/10; createFile(cluster, filePath, totalUsedSpace / numOfDatanodes, (short) numOfDatanodes, 0); - // start up an empty node with the same capacity and on the same rack - cluster.startDataNodes(conf, 1, true, null, - new String[]{newRack}, new long[]{newCapacity}); - totalCapacity += newCapacity; + if (nodes == null) { // there is no specification of new nodes. + // start up an empty node with the same capacity and on the same rack + cluster.startDataNodes(conf, 1, true, null, + new String[]{newRack}, null,new long[]{newCapacity}); + totalCapacity += newCapacity; + } else { + //if running a test with "include list", include original nodes as well + if (nodes.getNumberofIncludeNodes()>0) { + for (DataNode dn: cluster.getDataNodes()) + nodes.getNodesToBeIncluded().add(dn.getDatanodeId().getHostName()); + } + String[] newRacks = new String[nodes.getNumberofNewNodes()]; + long[] newCapacities = new long[nodes.getNumberofNewNodes()]; + for (int i=0; i < nodes.getNumberofNewNodes(); i++) { + newRacks[i] = newRack; + newCapacities[i] = newCapacity; + } + // if host names are specified for the new nodes to be created. + if (nodes.getNames() != null) { + cluster.startDataNodes(conf, nodes.getNumberofNewNodes(), true, null, + newRacks, nodes.getNames(), newCapacities); + totalCapacity += newCapacity*nodes.getNumberofNewNodes(); + } else { // host names are not specified + cluster.startDataNodes(conf, nodes.getNumberofNewNodes(), true, null, + newRacks, null, newCapacities); + totalCapacity += newCapacity*nodes.getNumberofNewNodes(); + //populate the include nodes + if (nodes.getNumberofIncludeNodes() > 0) { + int totalNodes = cluster.getDataNodes().size(); + for (int i=0; i < nodes.getNumberofIncludeNodes(); i++) { + nodes.getNodesToBeIncluded().add (cluster.getDataNodes().get( + totalNodes-1-i).getDatanodeId().getXferAddr()); + } + } + //polulate the exclude nodes + if (nodes.getNumberofExcludeNodes() > 0) { + int totalNodes = cluster.getDataNodes().size(); + for (int i=0; i < nodes.getNumberofExcludeNodes(); i++) { + nodes.getNodesToBeExcluded().add (cluster.getDataNodes().get( + totalNodes-1-i).getDatanodeId().getXferAddr()); + } + } + } + } + // run balancer and validate results + Balancer.Parameters p = Balancer.Parameters.DEFAULT; + if (nodes != null) { + p = new Balancer.Parameters( + Balancer.Parameters.DEFAULT.policy, + Balancer.Parameters.DEFAULT.threshold, + nodes.getNodesToBeExcluded(), nodes.getNodesToBeIncluded()); + } + + int expectedExcludedNodes = 0; + if (nodes != null) { + if (!nodes.getNodesToBeExcluded().isEmpty()) { + expectedExcludedNodes = nodes.getNodesToBeExcluded().size(); + } else if (!nodes.getNodesToBeIncluded().isEmpty()) { + expectedExcludedNodes = + cluster.getDataNodes().size() - nodes.getNodesToBeIncluded().size(); + } + } // run balancer and validate results if (useTool) { - runBalancerCli(conf, totalUsedSpace, totalCapacity); + runBalancerCli(conf, totalUsedSpace, totalCapacity, p, useFile, expectedExcludedNodes); } else { - runBalancer(conf, totalUsedSpace, totalCapacity); + runBalancer(conf, totalUsedSpace, totalCapacity, p, expectedExcludedNodes); } } finally { cluster.shutdown(); @@ -365,11 +557,17 @@ private void doTest(Configuration conf, long[] capacities, String[] racks, private void runBalancer(Configuration conf, long totalUsedSpace, long totalCapacity) throws Exception { + runBalancer(conf, totalUsedSpace, totalCapacity, Balancer.Parameters.DEFAULT, 0); + } + + private void runBalancer(Configuration conf, + long totalUsedSpace, long totalCapacity, Balancer.Parameters p, + int excludedNodes) throws Exception { waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); // start rebalancing Collection namenodes = DFSUtil.getNsServiceRpcUris(conf); - final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf); + final int r = Balancer.run(namenodes, p, conf); if (conf.getInt(DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT) ==0) { assertEquals(Balancer.ReturnStatus.NO_MOVE_PROGRESS.code, r); @@ -379,22 +577,66 @@ private void runBalancer(Configuration conf, } waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); LOG.info("Rebalancing with default ctor."); - waitForBalancer(totalUsedSpace, totalCapacity, client, cluster); + waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, excludedNodes); } - - private void runBalancerCli(Configuration conf, - long totalUsedSpace, long totalCapacity) throws Exception { - waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); - final String[] args = { "-policy", "datanode" }; + private void runBalancerCli(Configuration conf, + long totalUsedSpace, long totalCapacity, + Balancer.Parameters p, boolean useFile, int expectedExcludedNodes) throws Exception { + waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); + List args = new ArrayList(); + args.add("-policy"); + args.add("datanode"); + + File excludeHostsFile = null; + if (!p.nodesToBeExcluded.isEmpty()) { + args.add("-exclude"); + if (useFile) { + excludeHostsFile = new File ("exclude-hosts-file"); + PrintWriter pw = new PrintWriter(excludeHostsFile); + for (String host: p.nodesToBeExcluded) { + pw.write( host + "\n"); + } + pw.close(); + args.add("-f"); + args.add("exclude-hosts-file"); + } else { + args.add(StringUtils.join(p.nodesToBeExcluded, ',')); + } + } + + File includeHostsFile = null; + if (!p.nodesToBeIncluded.isEmpty()) { + args.add("-include"); + if (useFile) { + includeHostsFile = new File ("include-hosts-file"); + PrintWriter pw = new PrintWriter(includeHostsFile); + for (String host: p.nodesToBeIncluded){ + pw.write( host + "\n"); + } + pw.close(); + args.add("-f"); + args.add("include-hosts-file"); + } else { + args.add(StringUtils.join(p.nodesToBeIncluded, ',')); + } + } + final Tool tool = new Cli(); tool.setConf(conf); - final int r = tool.run(args); // start rebalancing + final int r = tool.run(args.toArray(new String[0])); // start rebalancing assertEquals("Tools should exit 0 on success", 0, r); waitForHeartBeat(totalUsedSpace, totalCapacity, client, cluster); LOG.info("Rebalancing with default ctor."); - waitForBalancer(totalUsedSpace, totalCapacity, client, cluster); + waitForBalancer(totalUsedSpace, totalCapacity, client, cluster, p, expectedExcludedNodes); + + if (excludeHostsFile != null && excludeHostsFile.exists()) { + excludeHostsFile.delete(); + } + if (includeHostsFile != null && includeHostsFile.exists()) { + includeHostsFile.delete(); + } } /** one-node cluster test*/ @@ -440,7 +682,7 @@ public void testBalancerCliParseWithThresholdOutOfBoundaries() { } } - /** Test a cluster with even distribution, + /** Test a cluster with even distribution, * then a new empty node is added to the cluster*/ @Test(timeout=100000) public void testBalancer0() throws Exception { @@ -554,7 +796,13 @@ public void testBalancerCliParseWithWrongParams() { } catch (IllegalArgumentException e) { } + parameters = new String[] {"-include", "testnode1", "-exclude", "testnode2"}; + try { + Balancer.Cli.parse(parameters); + fail("IllegalArgumentException is expected when both -exclude and -include are specified"); + } catch (IllegalArgumentException e) { + } } /** @@ -569,6 +817,183 @@ public void testExitZeroOnSuccess() throws Exception { oneNodeTest(conf, true); } + /** + * Test a cluster with even distribution, + * then three nodes are added to the cluster, + * runs balancer with two of the nodes in the exclude list + */ + @Test(timeout=100000) + public void testBalancerWithExcludeList() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + Set excludeHosts = new HashSet(); + excludeHosts.add( "datanodeY"); + excludeHosts.add( "datanodeZ"); + doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, + new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, + excludeHosts, Parameters.DEFAULT.nodesToBeIncluded), false, false); + } + + /** + * Test a cluster with even distribution, + * then three nodes are added to the cluster, + * runs balancer with two of the nodes in the exclude list + */ + @Test(timeout=100000) + public void testBalancerWithExcludeListWithPorts() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, + CAPACITY, RACK2, new PortNumberBasedNodes(3, 2, 0), false, false); + } + + /** + * Test a cluster with even distribution, + * then three nodes are added to the cluster, + * runs balancer with two of the nodes in the exclude list + */ + @Test(timeout=100000) + public void testBalancerCliWithExcludeList() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + Set excludeHosts = new HashSet(); + excludeHosts.add( "datanodeY"); + excludeHosts.add( "datanodeZ"); + doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, + new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, excludeHosts, + Parameters.DEFAULT.nodesToBeIncluded), true, false); + } + + /** + * Test a cluster with even distribution, + * then three nodes are added to the cluster, + * runs balancer with two of the nodes in the exclude list + */ + @Test(timeout=100000) + public void testBalancerCliWithExcludeListWithPorts() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, + CAPACITY, RACK2, new PortNumberBasedNodes(3, 2, 0), true, false); + } + + /** + * Test a cluster with even distribution, + * then three nodes are added to the cluster, + * runs balancer with two of the nodes in the exclude list in a file + */ + @Test(timeout=100000) + public void testBalancerCliWithExcludeListInAFile() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + Set excludeHosts = new HashSet(); + excludeHosts.add( "datanodeY"); + excludeHosts.add( "datanodeZ"); + doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, + new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, + excludeHosts, Parameters.DEFAULT.nodesToBeIncluded), true, true); + } + + /** + * Test a cluster with even distribution,G + * then three nodes are added to the cluster, + * runs balancer with two of the nodes in the exclude list + */ + @Test(timeout=100000) + public void testBalancerCliWithExcludeListWithPortsInAFile() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, + CAPACITY, RACK2, new PortNumberBasedNodes(3, 2, 0), true, true); + } + + /** + * Test a cluster with even distribution, + * then three nodes are added to the cluster, + * runs balancer with two of the nodes in the include list + */ + @Test(timeout=100000) + public void testBalancerWithIncludeList() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + Set includeHosts = new HashSet(); + includeHosts.add( "datanodeY"); + doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, + new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, + Parameters.DEFAULT.nodesToBeExcluded, includeHosts), false, false); + } + + /** + * Test a cluster with even distribution, + * then three nodes are added to the cluster, + * runs balancer with two of the nodes in the include list + */ + @Test(timeout=100000) + public void testBalancerWithIncludeListWithPorts() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, + CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), false, false); + } + + /** + * Test a cluster with even distribution, + * then three nodes are added to the cluster, + * runs balancer with two of the nodes in the include list + */ + @Test(timeout=100000) + public void testBalancerCliWithIncludeList() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + Set includeHosts = new HashSet(); + includeHosts.add( "datanodeY"); + doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, + new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, + Parameters.DEFAULT.nodesToBeExcluded, includeHosts), true, false); + } + + /** + * Test a cluster with even distribution, + * then three nodes are added to the cluster, + * runs balancer with two of the nodes in the include list + */ + @Test(timeout=100000) + public void testBalancerCliWithIncludeListWithPorts() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, + CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, false); + } + + /** + * Test a cluster with even distribution, + * then three nodes are added to the cluster, + * runs balancer with two of the nodes in the include list + */ + @Test(timeout=100000) + public void testBalancerCliWithIncludeListInAFile() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + Set includeHosts = new HashSet(); + includeHosts.add( "datanodeY"); + doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, CAPACITY, RACK2, + new HostNameBasedNodes(new String[] {"datanodeX", "datanodeY", "datanodeZ"}, + Parameters.DEFAULT.nodesToBeExcluded, includeHosts), true, true); + } + + /** + * Test a cluster with even distribution, + * then three nodes are added to the cluster, + * runs balancer with two of the nodes in the include list + */ + @Test(timeout=100000) + public void testBalancerCliWithIncludeListWithPortsInAFile() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + doTest(conf, new long[]{CAPACITY, CAPACITY}, new String[]{RACK0, RACK1}, + CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, true); + } + /** * @param args */ diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java index 1a309910eb..9652f8636a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithHANameNodes.java @@ -97,10 +97,10 @@ public void testBalancerWithHANameNodes() throws Exception { Collection namenodes = DFSUtil.getNsServiceRpcUris(conf); assertEquals(1, namenodes.size()); assertTrue(namenodes.contains(HATestUtil.getLogicalUri(cluster))); - final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf); + final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf); assertEquals(Balancer.ReturnStatus.SUCCESS.code, r); TestBalancer.waitForBalancer(totalUsedSpace, totalCapacity, client, - cluster); + cluster, Balancer.Parameters.DEFAULT); } finally { cluster.shutdown(); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java index f5848041bc..1a7ddd331b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithMultipleNameNodes.java @@ -159,7 +159,7 @@ static void runBalancer(Suite s, // start rebalancing final Collection namenodes = DFSUtil.getNsServiceRpcUris(s.conf); - final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, s.conf); + final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, s.conf); Assert.assertEquals(Balancer.ReturnStatus.SUCCESS.code, r); LOG.info("BALANCER 2"); @@ -195,7 +195,7 @@ static void runBalancer(Suite s, balanced = true; for(int d = 0; d < used.length; d++) { final double p = used[d]*100.0/cap[d]; - balanced = p <= avg + Balancer.Parameters.DEFALUT.threshold; + balanced = p <= avg + Balancer.Parameters.DEFAULT.threshold; if (!balanced) { if (i % 100 == 0) { LOG.warn("datanodes " + d + " is not yet balanced: " diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java index 667204c0c9..153ced3a24 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerWithNodeGroup.java @@ -175,7 +175,7 @@ private void runBalancer(Configuration conf, // start rebalancing Collection namenodes = DFSUtil.getNsServiceRpcUris(conf); - final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf); + final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf); assertEquals(Balancer.ReturnStatus.SUCCESS.code, r); waitForHeartBeat(totalUsedSpace, totalCapacity); @@ -189,7 +189,7 @@ private void runBalancerCanFinish(Configuration conf, // start rebalancing Collection namenodes = DFSUtil.getNsServiceRpcUris(conf); - final int r = Balancer.run(namenodes, Balancer.Parameters.DEFALUT, conf); + final int r = Balancer.run(namenodes, Balancer.Parameters.DEFAULT, conf); Assert.assertTrue(r == Balancer.ReturnStatus.SUCCESS.code || (r == Balancer.ReturnStatus.NO_MOVE_PROGRESS.code)); waitForHeartBeat(totalUsedSpace, totalCapacity);