diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java index b79ab5b5bc..7d7def2f1e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java @@ -208,6 +208,8 @@ public class Balancer { + "\n\t[-sortTopNodes]" + "\tSort datanodes based on the utilization so " + "that highly utilized datanodes get scheduled first." + + "\n\t[-limitOverUtilizedNum ]" + + "\tLimit the maximum number of overUtilized datanodes." + "\n\t[-hotBlockTimeInterval]\tprefer to move cold blocks."; @VisibleForTesting @@ -227,6 +229,7 @@ public class Balancer { private final long maxSizeToMove; private final long defaultBlockSize; private final boolean sortTopNodes; + private final int limitOverUtilizedNum; private final BalancerMetrics metrics; // all data node lists @@ -352,6 +355,7 @@ static int getFailedTimesSinceLastSuccessfulBalance() { this.sourceNodes = p.getSourceNodes(); this.runDuringUpgrade = p.getRunDuringUpgrade(); this.sortTopNodes = p.getSortTopNodes(); + this.limitOverUtilizedNum = p.getLimitOverUtilizedNum(); this.maxSizeToMove = getLongBytes(conf, DFSConfigKeys.DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY, @@ -456,11 +460,18 @@ private long init(List reports) { sortOverUtilized(overUtilizedPercentage); } + // Limit the maximum number of overUtilized datanodes + // If excludedOverUtilizedNum is greater than 0, The overUtilized nodes num is limited + int excludedOverUtilizedNum = Math.max(overUtilized.size() - limitOverUtilizedNum, 0); + if (excludedOverUtilizedNum > 0) { + limitOverUtilizedNum(); + } + logUtilizationCollections(); metrics.setNumOfOverUtilizedNodes(overUtilized.size()); metrics.setNumOfUnderUtilizedNodes(underUtilized.size()); - Preconditions.checkState(dispatcher.getStorageGroupMap().size() + Preconditions.checkState(dispatcher.getStorageGroupMap().size() - excludedOverUtilizedNum == overUtilized.size() + underUtilized.size() + aboveAvgUtilized.size() + belowAvgUtilized.size(), "Mismatched number of storage groups"); @@ -484,6 +495,20 @@ private void sortOverUtilized(Map overUtilizedPercentage) { ); } + private void limitOverUtilizedNum() { + Preconditions.checkState(overUtilized instanceof LinkedList, + "Collection overUtilized is not a LinkedList."); + LinkedList list = (LinkedList) overUtilized; + + LOG.info("Limiting over-utilized nodes num, if using the '-sortTopNodes' param," + + " the overUtilized nodes of top will be retained"); + + int size = overUtilized.size(); + for (int i = 0; i < size - limitOverUtilizedNum; i++) { + list.removeLast(); + } + } + private static long computeMaxSize2Move(final long capacity, final long remaining, final double utilizationDiff, final long max) { final double diff = Math.abs(utilizationDiff); @@ -1071,6 +1096,14 @@ static BalancerParameters parse(String[] args) { b.setSortTopNodes(true); LOG.info("Balancer will sort nodes by" + " capacity usage percentage to prioritize top used nodes"); + } else if ("-limitOverUtilizedNum".equalsIgnoreCase(args[i])) { + Preconditions.checkArgument(++i < args.length, + "limitOverUtilizedNum value is missing: args = " + Arrays.toString(args)); + int limitNum = Integer.parseInt(args[i]); + Preconditions.checkArgument(limitNum >= 0, + "limitOverUtilizedNum must be non-negative"); + LOG.info("Using a limitOverUtilizedNum of {}", limitNum); + b.setLimitOverUtilizedNum(limitNum); } else { throw new IllegalArgumentException("args = " + Arrays.toString(args)); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java index 2b53c15d1d..7633be1301 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/BalancerParameters.java @@ -50,6 +50,8 @@ final class BalancerParameters { private final boolean sortTopNodes; + private final int limitOverUtilizedNum; + static final BalancerParameters DEFAULT = new BalancerParameters(); private BalancerParameters() { @@ -67,6 +69,7 @@ private BalancerParameters(Builder builder) { this.runDuringUpgrade = builder.runDuringUpgrade; this.runAsService = builder.runAsService; this.sortTopNodes = builder.sortTopNodes; + this.limitOverUtilizedNum = builder.limitOverUtilizedNum; this.hotBlockTimeInterval = builder.hotBlockTimeInterval; } @@ -110,6 +113,10 @@ boolean getSortTopNodes() { return this.sortTopNodes; } + int getLimitOverUtilizedNum() { + return this.limitOverUtilizedNum; + } + long getHotBlockTimeInterval() { return this.hotBlockTimeInterval; } @@ -120,12 +127,12 @@ public String toString() { + " max idle iteration = %s," + " #excluded nodes = %s," + " #included nodes = %s," + " #source nodes = %s," + " #blockpools = %s," + " run during upgrade = %s," - + " sort top nodes = %s," + + " sort top nodes = %s," + " limit overUtilized nodes num = %s," + " hot block time interval = %s]", Balancer.class.getSimpleName(), getClass().getSimpleName(), policy, threshold, maxIdleIteration, excludedNodes.size(), includedNodes.size(), sourceNodes.size(), blockpools.size(), - runDuringUpgrade, sortTopNodes, hotBlockTimeInterval); + runDuringUpgrade, sortTopNodes, limitOverUtilizedNum, hotBlockTimeInterval); } static class Builder { @@ -141,6 +148,7 @@ static class Builder { private boolean runDuringUpgrade = false; private boolean runAsService = false; private boolean sortTopNodes = false; + private int limitOverUtilizedNum = Integer.MAX_VALUE; private long hotBlockTimeInterval = 0; Builder() { @@ -201,6 +209,11 @@ Builder setSortTopNodes(boolean shouldSortTopNodes) { return this; } + Builder setLimitOverUtilizedNum(int overUtilizedNum) { + this.limitOverUtilizedNum = overUtilizedNum; + return this; + } + BalancerParameters build() { return new BalancerParameters(this); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md index 1a79d8e864..c065eb4c8d 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md +++ b/hadoop-hdfs-project/hadoop-hdfs/src/site/markdown/HDFSCommands.md @@ -293,6 +293,7 @@ Usage: [-runDuringUpgrade] [-asService] [-sortTopNodes] + [-limitOverUtilizedNum ] [-hotBlockTimeInterval ] | COMMAND\_OPTION | Description | @@ -307,6 +308,7 @@ Usage: | `-runDuringUpgrade` | Whether to run the balancer during an ongoing HDFS upgrade. This is usually not desired since it will not affect used space on over-utilized machines. | | `-asService` | Run Balancer as a long running service. | | `-sortTopNodes` | Sort datanodes based on the utilization so that highly utilized datanodes get scheduled first. | +| `-limitOverUtilizedNum` | Limit the maximum number of overUtilized datanodes. | | `-hotBlockTimeInterval` | Prefer moving cold blocks i.e blocks associated with files accessed or modified before the specified time interval. | | `-h`\|`--help` | Display the tool usage and help information and exit. | diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerLongRunningTasks.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerLongRunningTasks.java index ba36a42e0c..adca5ff093 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerLongRunningTasks.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerLongRunningTasks.java @@ -672,6 +672,98 @@ public void testBalancerWithSortTopNodes() throws Exception { assertEquals(900, maxUsage); } + @Test(timeout = 60000) + public void testBalancerWithLimitOverUtilizedNum() throws Exception { + final Configuration conf = new HdfsConfiguration(); + // Init the config (block size to 100) + initConf(conf); + conf.setInt(DFS_HEARTBEAT_INTERVAL_KEY, 30000); + + final long totalCapacity = 1000L; + final int diffBetweenNodes = 50; + + // Set up the nodes with two groups: + // 5 over-utilized nodes with 80%, 85%, 90%, 95%, 100% usage + // 2 under-utilized nodes with 0%, 5% usage + // With sortTopNodes and limitOverUtilizedNum option, 100% used ones will be chosen + final int numOfOverUtilizedDn = 5; + final int numOfUnderUtilizedDn = 2; + final int totalNumOfDn = numOfOverUtilizedDn + numOfUnderUtilizedDn; + final long[] capacityArray = new long[totalNumOfDn]; + Arrays.fill(capacityArray, totalCapacity); + + try (MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(totalNumOfDn) + .simulatedCapacities(capacityArray) + .build()) { + cluster.setDataNodesDead(); + List dataNodes = cluster.getDataNodes(); + // Create top used nodes + for (int i = 0; i < numOfOverUtilizedDn; i++) { + // Bring one node alive + DataNodeTestUtils.triggerHeartbeat(dataNodes.get(i)); + DataNodeTestUtils.triggerBlockReport(dataNodes.get(i)); + // Create nodes with: 80%, 85%, 90%, 95%, 100% + int nodeCapacity = (int) totalCapacity - diffBetweenNodes * (numOfOverUtilizedDn - i - 1); + TestBalancer.createFile(cluster, new Path("test_big" + i), nodeCapacity, (short) 1, 0); + cluster.setDataNodesDead(); + } + + // Create under utilized nodes + for (int i = numOfUnderUtilizedDn - 1; i >= 0; i--) { + int index = i + numOfOverUtilizedDn; + // Bring one node alive + DataNodeTestUtils.triggerHeartbeat(dataNodes.get(index)); + DataNodeTestUtils.triggerBlockReport(dataNodes.get(index)); + // Create nodes with: 5%, 0% + int nodeCapacity = diffBetweenNodes * i; + TestBalancer.createFile(cluster, new Path("test_small" + i), nodeCapacity, (short) 1, 0); + cluster.setDataNodesDead(); + } + + // Bring all nodes alive + cluster.triggerHeartbeats(); + cluster.triggerBlockReports(); + cluster.waitFirstBRCompleted(0, 6000); + + final BalancerParameters balancerParameters = Balancer.Cli.parse(new String[] { + "-policy", BalancingPolicy.Node.INSTANCE.getName(), + "-threshold", "1", + "-sortTopNodes", + "-limitOverUtilizedNum", "1" + }); + + client = NameNodeProxies.createProxy(conf, cluster.getFileSystem(0) + .getUri(), ClientProtocol.class) + .getProxy(); + + // Set max-size-to-move to small number + // so only top two nodes will be chosen in one iteration + conf.setLong(DFS_BALANCER_MAX_SIZE_TO_MOVE_KEY, 99L); + final Collection namenodes = DFSUtil.getInternalNsRpcUris(conf); + List connectors = + NameNodeConnector.newNameNodeConnectors(namenodes, Balancer.class.getSimpleName(), + Balancer.BALANCER_ID_PATH, conf, BalancerParameters.DEFAULT.getMaxIdleIteration()); + final Balancer balancer = new Balancer(connectors.get(0), balancerParameters, conf); + Balancer.Result balancerResult = balancer.runOneIteration(); + + cluster.triggerDeletionReports(); + cluster.triggerBlockReports(); + cluster.triggerHeartbeats(); + + DatanodeInfo[] datanodeReport = + client.getDatanodeReport(HdfsConstants.DatanodeReportType.ALL); + long maxUsage = 0; + for (int i = 0; i < totalNumOfDn; i++) { + maxUsage = Math.max(maxUsage, datanodeReport[i].getDfsUsed()); + } + // The maxUsage value is 950, only 100% of the nodes will be balanced + assertEquals(950, maxUsage); + assertTrue("BalancerResult is not as expected. " + balancerResult, + (balancerResult.getBytesAlreadyMoved() == 100 && balancerResult.getBlocksMoved() == 1)); + } + } + @Test(timeout = 100000) public void testMaxIterationTime() throws Exception { final Configuration conf = new HdfsConfiguration();