diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index 16a29dd3de..a2df31707f 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -614,6 +614,14 @@ public class DFSConfigKeys extends CommonConfigurationKeys { "dfs.datanode.metrics.logger.period.seconds"; public static final int DFS_DATANODE_METRICS_LOGGER_PERIOD_SECONDS_DEFAULT = 600; + /** + * The maximum number of getBlocks RPCs data movement utilities can make to + * a NameNode per second. Values <= 0 disable throttling. This affects + * anything that uses a NameNodeConnector, i.e., the Balancer, Mover, + * and StoragePolicySatisfier. + */ + public static final String DFS_NAMENODE_GETBLOCKS_MAX_QPS_KEY = "dfs.namenode.get-blocks.max-qps"; + public static final int DFS_NAMENODE_GETBLOCKS_MAX_QPS_DEFAULT = 20; public static final String DFS_BALANCER_MOVEDWINWIDTH_KEY = "dfs.balancer.movedWinWidth"; public static final long DFS_BALANCER_MOVEDWINWIDTH_DEFAULT = 5400*1000L; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index cf3dc3b722..c222270882 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -928,10 +928,8 @@ private boolean shouldFetchMoreBlocks() { * move tasks or it has received enough blocks from the namenode, or the * elapsed time of the iteration has exceeded the max time limit. * - * @param delay - time to sleep before sending getBlocks. Intended to - * disperse Balancer RPCs to NameNode for large clusters. See HDFS-11384. */ - private void dispatchBlocks(long delay) { + private void dispatchBlocks() { this.blocksToReceive = 2 * getScheduledSize(); long previousMoveTimestamp = Time.monotonicNow(); while (getScheduledSize() > 0 && !isIterationOver() @@ -956,25 +954,15 @@ private void dispatchBlocks(long delay) { if (shouldFetchMoreBlocks()) { // fetch new blocks try { - if(delay > 0) { - if (LOG.isDebugEnabled()) { - LOG.debug("Sleeping " + delay + " msec."); - } - Thread.sleep(delay); - } final long received = getBlockList(); if (received == 0) { return; } blocksToReceive -= received; continue; - } catch (InterruptedException ignored) { - // nothing to do } catch (IOException e) { LOG.warn("Exception while getting reportedBlock list", e); return; - } finally { - delay = 0L; } } else { // jump out of while-loop after the configured timeout. @@ -1166,12 +1154,6 @@ public boolean dispatchAndCheckContinue() throws InterruptedException { return nnc.shouldContinue(dispatchBlockMoves()); } - /** - * The best-effort limit on the number of RPCs per second - * the Balancer will send to the NameNode. - */ - final static int BALANCER_NUM_RPC_PER_SEC = 20; - /** * Dispatch block moves for each source. The thread selects blocks to move & * sends request to proxy source to initiate block move. The process is flow @@ -1187,12 +1169,7 @@ private long dispatchBlockMoves() throws InterruptedException { int concurrentThreads = Math.min(sources.size(), ((ThreadPoolExecutor)dispatchExecutor).getCorePoolSize()); assert concurrentThreads > 0 : "Number of concurrent threads is 0."; - if (LOG.isDebugEnabled()) { - LOG.debug("Balancer allowed RPCs per sec = " + BALANCER_NUM_RPC_PER_SEC); - LOG.debug("Balancer concurrent threads = " + concurrentThreads); - LOG.debug("Disperse Interval sec = " + - concurrentThreads / BALANCER_NUM_RPC_PER_SEC); - } + LOG.debug("Balancer concurrent dispatcher threads = {}", concurrentThreads); // Determine the size of each mover thread pool per target int threadsPerTarget = maxMoverThreads/targets.size(); @@ -1212,23 +1189,15 @@ private long dispatchBlockMoves() throws InterruptedException { LOG.info("Allocating " + threadsPerTarget + " threads per target."); } - long dSec = 0; final Iterator i = sources.iterator(); for (int j = 0; j < futures.length; j++) { final Source s = i.next(); - final long delay = dSec * 1000; futures[j] = dispatchExecutor.submit(new Runnable() { @Override public void run() { - s.dispatchBlocks(delay); + s.dispatchBlocks(); } }); - // Calculate delay in seconds for the next iteration - if(j >= concurrentThreads) { - dSec = 0; - } else if((j + 1) % BALANCER_NUM_RPC_PER_SEC == 0) { - dSec++; - } } // wait for all dispatcher threads to finish diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java index 3be7530e76..2844ad5a94 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/NameNodeConnector.java @@ -31,6 +31,8 @@ import java.util.concurrent.atomic.AtomicLong; import com.google.common.base.Preconditions; +import com.google.common.util.concurrent.RateLimiter; +import org.apache.hadoop.hdfs.DFSConfigKeys; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience; @@ -123,6 +125,7 @@ public static void checkOtherInstanceRunning(boolean toCheck) { private final int maxNotChangedIterations; private int notChangedIterations = 0; + private final RateLimiter getBlocksRateLimiter; public NameNodeConnector(String name, URI nameNodeUri, Path idPath, List targetPaths, Configuration conf, @@ -133,6 +136,16 @@ public NameNodeConnector(String name, URI nameNodeUri, Path idPath, this.targetPaths = targetPaths == null || targetPaths.isEmpty() ? Arrays .asList(new Path("/")) : targetPaths; this.maxNotChangedIterations = maxNotChangedIterations; + int getBlocksMaxQps = conf.getInt( + DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_KEY, + DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_DEFAULT); + if (getBlocksMaxQps > 0) { + LOG.info("getBlocks calls for {} will be rate-limited to {} per second", + nameNodeUri, getBlocksMaxQps); + this.getBlocksRateLimiter = RateLimiter.create(getBlocksMaxQps); + } else { + this.getBlocksRateLimiter = null; + } this.namenode = NameNodeProxies.createProxy(conf, nameNodeUri, BalancerProtocols.class, fallbackToSimpleAuth).getProxy(); @@ -169,8 +182,10 @@ AtomicLong getBytesMoved() { /** @return blocks with locations. */ public BlocksWithLocations getBlocks(DatanodeInfo datanode, long size, long - minBlockSize) - throws IOException { + minBlockSize) throws IOException { + if (getBlocksRateLimiter != null) { + getBlocksRateLimiter.acquire(); + } return namenode.getBlocks(datanode, size, minBlockSize); } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index fa2f30e67a..98b91a63cb 100755 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -3646,7 +3646,16 @@ HTTPS port for DataNode. - + + dfs.namenode.get-blocks.max-qps + 20 + + The maximum number of getBlocks RPCs data movement utilities can make to + a NameNode per second. Values less than or equal to 0 disable throttling. + This affects anything that uses a NameNodeConnector, i.e., the Balancer, + Mover, and StoragePolicySatisfier. + + dfs.balancer.dispatcherThreads 200 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index e640526729..c6c17e0d3e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -72,8 +72,11 @@ import java.util.Random; import java.util.Set; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import org.apache.commons.lang3.StringUtils; +import org.junit.Before; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.HadoopIllegalArgumentException; @@ -157,6 +160,16 @@ public class TestBalancer { private static MiniKdc kdc; private static File keytabFile; private MiniDFSCluster cluster; + private AtomicInteger numGetBlocksCalls; + private AtomicLong startGetBlocksTime; + private AtomicLong endGetBlocksTime; + + @Before + public void setup() { + numGetBlocksCalls = new AtomicInteger(0); + startGetBlocksTime = new AtomicLong(Long.MAX_VALUE); + endGetBlocksTime = new AtomicLong(Long.MIN_VALUE); + } @After public void shutdown() throws Exception { @@ -791,7 +804,7 @@ private void doTest(Configuration conf, long[] capacities, String[] racks, long newCapacity, String newRack, NewNodeInfo nodes, boolean useTool, boolean useFile) throws Exception { doTest(conf, capacities, racks, newCapacity, newRack, nodes, - useTool, useFile, false); + useTool, useFile, false, 0.3); } /** This test start a cluster with specified number of nodes, @@ -810,12 +823,14 @@ private void doTest(Configuration conf, long[] capacities, String[] racks, * @param useFile - if true, the hosts to included or excluded will be stored in a * file and then later read from the file. * @param useNamesystemSpy - spy on FSNamesystem if true + * @param clusterUtilization - The utilization of the cluster to start, from + * 0.0 to 1.0 * @throws Exception */ private void doTest(Configuration conf, long[] capacities, String[] racks, long newCapacity, String newRack, NewNodeInfo nodes, boolean useTool, boolean useFile, - boolean useNamesystemSpy) throws Exception { + boolean useNamesystemSpy, double clusterUtilization) throws Exception { LOG.info("capacities = " + long2String(capacities)); LOG.info("racks = " + Arrays.asList(racks)); LOG.info("newCapacity= " + newCapacity); @@ -845,8 +860,8 @@ private void doTest(Configuration conf, long[] capacities, long totalCapacity = sum(capacities); - // fill up the cluster to be 30% full - long totalUsedSpace = totalCapacity*3/10; + // fill up the cluster to be `clusterUtilization` full + long totalUsedSpace = (long) (totalCapacity * clusterUtilization); createFile(cluster, filePath, totalUsedSpace / numOfDatanodes, (short) numOfDatanodes, 0); @@ -2135,33 +2150,34 @@ public Void run() throws Exception { } } - private static int numGetBlocksCalls; - private static long startGetBlocksTime, endGetBlocksTime; - private void spyFSNamesystem(NameNode nn) throws IOException { FSNamesystem fsnSpy = NameNodeAdapter.spyOnNamesystem(nn); - numGetBlocksCalls = 0; - endGetBlocksTime = startGetBlocksTime = Time.monotonicNow(); doAnswer(new Answer() { @Override public BlocksWithLocations answer(InvocationOnMock invocation) throws Throwable { + long startTime = Time.monotonicNow(); + startGetBlocksTime.getAndUpdate((curr) -> Math.min(curr, startTime)); BlocksWithLocations blk = (BlocksWithLocations)invocation.callRealMethod(); - endGetBlocksTime = Time.monotonicNow(); - numGetBlocksCalls++; + long endTime = Time.monotonicNow(); + endGetBlocksTime.getAndUpdate((curr) -> Math.max(curr, endTime)); + numGetBlocksCalls.incrementAndGet(); return blk; }}).when(fsnSpy).getBlocks(any(DatanodeID.class), anyLong(), anyLong()); } /** * Test that makes the Balancer to disperse RPCs to the NameNode - * in order to avoid NN's RPC queue saturation. + * in order to avoid NN's RPC queue saturation. This not marked as @Test + * because it is run from {@link TestBalancerRPCDelay}. */ - void testBalancerRPCDelay() throws Exception { + void testBalancerRPCDelay(int getBlocksMaxQps) throws Exception { final Configuration conf = new HdfsConfiguration(); initConf(conf); conf.setInt(DFSConfigKeys.DFS_BALANCER_DISPATCHERTHREADS_KEY, 30); + conf.setInt(DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_KEY, + getBlocksMaxQps); int numDNs = 20; long[] capacities = new long[numDNs]; @@ -2171,16 +2187,22 @@ void testBalancerRPCDelay() throws Exception { racks[i] = (i < numDNs/2 ? RACK0 : RACK1); } doTest(conf, capacities, racks, CAPACITY, RACK2, - new PortNumberBasedNodes(3, 0, 0), false, false, true); + // Use only 1 node and set the starting capacity to 50% to allow the + // balancing to complete in only one iteration. This is necessary + // because the startGetBlocksTime and endGetBlocksTime measures across + // all get block calls, so if two iterations are performed, the duration + // also includes the time it took to perform the block move ops in the + // first iteration + new PortNumberBasedNodes(1, 0, 0), false, false, true, 0.5); assertTrue("Number of getBlocks should be not less than " + - Dispatcher.BALANCER_NUM_RPC_PER_SEC, - numGetBlocksCalls > Dispatcher.BALANCER_NUM_RPC_PER_SEC); - long d = 1 + endGetBlocksTime - startGetBlocksTime; - LOG.info("Balancer executed " + numGetBlocksCalls - + " getBlocks in " + d + " msec."); - assertTrue("Expected BALANCER_NUM_RPC_PER_SEC = " + - Dispatcher.BALANCER_NUM_RPC_PER_SEC, - (numGetBlocksCalls * 1000 / d) < Dispatcher.BALANCER_NUM_RPC_PER_SEC); + getBlocksMaxQps, numGetBlocksCalls.get() >= getBlocksMaxQps); + long durationMs = 1 + endGetBlocksTime.get() - startGetBlocksTime.get(); + int durationSec = (int) Math.ceil(durationMs / 1000.0); + LOG.info("Balancer executed {} getBlocks in {} msec (round up to {} sec)", + numGetBlocksCalls.get(), durationMs, durationSec); + long getBlockCallsPerSecond = numGetBlocksCalls.get() / durationSec; + assertTrue("Expected balancer getBlocks calls per second <= " + + getBlocksMaxQps, getBlockCallsPerSecond <= getBlocksMaxQps); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerRPCDelay.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerRPCDelay.java index 960ad25766..79c7f87d4e 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerRPCDelay.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancerRPCDelay.java @@ -17,6 +17,9 @@ */ package org.apache.hadoop.hdfs.server.balancer; +import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.junit.After; +import org.junit.Before; import org.junit.Test; /** @@ -25,8 +28,29 @@ */ public class TestBalancerRPCDelay { + private TestBalancer testBalancer; + + @Before + public void setup() { + testBalancer = new TestBalancer(); + testBalancer.setup(); + } + + @After + public void teardown() throws Exception { + if (testBalancer != null) { + testBalancer.shutdown(); + } + } + @Test(timeout=100000) - public void testBalancerRPCDelay() throws Exception { - new TestBalancer().testBalancerRPCDelay(); + public void testBalancerRPCDelayQps3() throws Exception { + testBalancer.testBalancerRPCDelay(3); + } + + @Test(timeout=100000) + public void testBalancerRPCDelayQpsDefault() throws Exception { + testBalancer.testBalancerRPCDelay( + DFSConfigKeys.DFS_NAMENODE_GETBLOCKS_MAX_QPS_DEFAULT); } }