From c966a3837af1c1a1c4a441f491b0d76d5c9e5d78 Mon Sep 17 00:00:00 2001
From: Wei-Chiu Chuang
Date: Fri, 15 Jun 2018 13:35:50 -0700
Subject: [PATCH] HDFS-13174. hdfs mover -p /path times out after 20 min.
 Contributed by Istvan Fajth.

---
 .../org/apache/hadoop/hdfs/DFSConfigKeys.java |  4 +-
 .../hadoop/hdfs/server/balancer/Balancer.java |  6 +-
 .../hdfs/server/balancer/Dispatcher.java      | 30 ++++---
 .../src/main/resources/hdfs-default.xml       | 10 +++
 .../hdfs/server/balancer/TestBalancer.java    | 79 +++++++++++++++++++
 .../hadoop/hdfs/server/mover/TestMover.java   | 46 +++++++++++
 6 files changed, 163 insertions(+), 12 deletions(-)

diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
index bc8e81f976..dde7eb79c2 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java
@@ -581,7 +581,9 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final String DFS_BALANCER_BLOCK_MOVE_TIMEOUT = "dfs.balancer.block-move.timeout";
   public static final int DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT = 0;
   public static final String DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY = "dfs.balancer.max-no-move-interval";
-  public static final int DFS_BALANCER_MAX_NO_MOVE_INTERVAL_DEFAULT = 60*1000; // One minute
+  public static final int DFS_BALANCER_MAX_NO_MOVE_INTERVAL_DEFAULT = 60*1000; // One minute
+  public static final String DFS_BALANCER_MAX_ITERATION_TIME_KEY = "dfs.balancer.max-iteration-time";
+  public static final long DFS_BALANCER_MAX_ITERATION_TIME_DEFAULT = 20 * 60 * 1000L; // 20 mins

   public static final String DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth";
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
index 13d584644d..426c7ab074 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java
@@ -289,13 +289,17 @@ static int getInt(Configuration conf, String key, int defaultValue) {
     final int maxNoMoveInterval = conf.getInt(
         DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY,
         DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_DEFAULT);
+    final long maxIterationTime = conf.getLong(
+        DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_KEY,
+        DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_DEFAULT);

     this.nnc = theblockpool;
     this.dispatcher =
         new Dispatcher(theblockpool, p.getIncludedNodes(),
             p.getExcludedNodes(), movedWinWidth, moverThreads,
             dispatcherThreads, maxConcurrentMovesPerNode, getBlocksSize,
-            getBlocksMinBlockSize, blockMoveTimeout, maxNoMoveInterval, conf);
+            getBlocksMinBlockSize, blockMoveTimeout, maxNoMoveInterval,
+            maxIterationTime, conf);
     this.threshold = p.getThreshold();
     this.policy = p.getBalancingPolicy();
     this.sourceNodes = p.getSourceNodes();
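
The two hunks above follow the usual DFSConfigKeys pattern: a String key paired with a typed default, which the Balancer reads once per run and threads into the Dispatcher. As a minimal standalone sketch of that lookup (names as defined in the hunks above; the value is in milliseconds):

    Configuration conf = new HdfsConfiguration();
    // falls back to 20 * 60 * 1000L = 1200000 ms (20 minutes) when unset
    long maxIterationTime = conf.getLong(
        DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_KEY,
        DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_DEFAULT);
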
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
index 349ced13f3..060c013e37 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java
@@ -138,6 +138,8 @@ public class Dispatcher {
   private final boolean connectToDnViaHostname;
   private BlockPlacementPolicies placementPolicies;

+  private long maxIterationTime;
+
   static class Allocator {
     private final int max;
     private int count = 0;
@@ -346,13 +348,19 @@ private boolean addTo(StorageGroup g) {

     /** Dispatch the move to the proxy source & wait for the response. */
     private void dispatch() {
-      LOG.info("Start moving " + this);
-      assert !(reportedBlock instanceof DBlockStriped);
-
       Socket sock = new Socket();
       DataOutputStream out = null;
       DataInputStream in = null;
       try {
+        if (source.isIterationOver()) {
+          LOG.info("Cancel moving " + this +
+              " as the iteration is already cancelled because" +
+              " dfs.balancer.max-iteration-time has passed.");
+          throw new IOException("Block move cancelled.");
+        }
+        LOG.info("Start moving " + this);
+        assert !(reportedBlock instanceof DBlockStriped);
+
         sock.connect(
             NetUtils.createSocketAddr(target.getDatanodeInfo().
                 getXferAddr(Dispatcher.this.connectToDnViaHostname)),
@@ -760,7 +768,10 @@ private Source(StorageType storageType, long maxSize2Move, DDatanode dn) {
      * Check if the iteration is over
      */
     public boolean isIterationOver() {
-      return (Time.monotonicNow()-startTime > MAX_ITERATION_TIME);
+      if (maxIterationTime < 0) {
+        return false;
+      }
+      return (Time.monotonicNow() - startTime > maxIterationTime);
     }

     /** Add a task */
@@ -908,8 +919,6 @@ private boolean shouldFetchMoreBlocks() {
       return blocksToReceive > 0;
     }

-    private static final long MAX_ITERATION_TIME = 20 * 60 * 1000L; // 20 mins
-
    /**
     * This method iteratively does the following: it first selects a block to
     * move, then sends a request to the proxy source to start the block move
@@ -990,7 +999,7 @@ private void dispatchBlocks(long delay) {
       }

       if (isIterationOver()) {
-        LOG.info("The maximum iteration time (" + MAX_ITERATION_TIME/1000
+        LOG.info("The maximum iteration time (" + maxIterationTime / 1000
             + " seconds) has been reached. Stopping " + this);
       }
     }
@@ -1013,14 +1022,14 @@ public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
       int maxNoMoveInterval, Configuration conf) {
     this(nnc, includedNodes, excludedNodes, movedWinWidth,
         moverThreads, dispatcherThreads, maxConcurrentMovesPerNode,
-        0L, 0L, 0, maxNoMoveInterval, conf);
+        0L, 0L, 0, maxNoMoveInterval, -1, conf);
   }

   Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
       Set<String> excludedNodes, long movedWinWidth, int moverThreads,
       int dispatcherThreads, int maxConcurrentMovesPerNode,
-      long getBlocksSize, long getBlocksMinBlockSize,
-      int blockMoveTimeout, int maxNoMoveInterval, Configuration conf) {
+      long getBlocksSize, long getBlocksMinBlockSize, int blockMoveTimeout,
+      int maxNoMoveInterval, long maxIterationTime, Configuration conf) {
     this.nnc = nnc;
     this.excludedNodes = excludedNodes;
     this.includedNodes = includedNodes;
@@ -1047,6 +1056,7 @@ public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
         HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME,
         HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
     placementPolicies = new BlockPlacementPolicies(conf, null, cluster, null);
+    this.maxIterationTime = maxIterationTime;
   }

   public DistributedFileSystem getDistributedFileSystem() {
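
The Dispatcher side hinges on a sentinel: any negative maxIterationTime disables the iteration timeout, and the shorter constructor (the path the Mover takes) now passes -1 for it. A self-contained, hedged sketch of the predicate, using a System.nanoTime()-based stand-in for Hadoop's Time.monotonicNow(); the two-argument helper is illustrative, not part of the patch:

    // Mirrors Source#isIterationOver() above, with the clock made explicit.
    static boolean isIterationOver(long startTimeMs, long maxIterationTimeMs) {
      if (maxIterationTimeMs < 0) {
        return false; // negative sentinel: no time limit (the Mover case)
      }
      long nowMs = System.nanoTime() / 1_000_000L; // monotonic ms, as Time.monotonicNow()
      return nowMs - startTimeMs > maxIterationTimeMs;
    }
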
Stopping " + this); } } @@ -1013,14 +1022,14 @@ public Dispatcher(NameNodeConnector nnc, Set includedNodes, int maxNoMoveInterval, Configuration conf) { this(nnc, includedNodes, excludedNodes, movedWinWidth, moverThreads, dispatcherThreads, maxConcurrentMovesPerNode, - 0L, 0L, 0, maxNoMoveInterval, conf); + 0L, 0L, 0, maxNoMoveInterval, -1, conf); } Dispatcher(NameNodeConnector nnc, Set includedNodes, Set excludedNodes, long movedWinWidth, int moverThreads, int dispatcherThreads, int maxConcurrentMovesPerNode, - long getBlocksSize, long getBlocksMinBlockSize, - int blockMoveTimeout, int maxNoMoveInterval, Configuration conf) { + long getBlocksSize, long getBlocksMinBlockSize, int blockMoveTimeout, + int maxNoMoveInterval, long maxIterationTime, Configuration conf) { this.nnc = nnc; this.excludedNodes = excludedNodes; this.includedNodes = includedNodes; @@ -1047,6 +1056,7 @@ public Dispatcher(NameNodeConnector nnc, Set includedNodes, HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME, HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT); placementPolicies = new BlockPlacementPolicies(conf, null, cluster, null); + this.maxIterationTime = maxIterationTime; } public DistributedFileSystem getDistributedFileSystem() { diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index b55421c162..146ae6c9c6 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -3540,6 +3540,16 @@ + + dfs.balancer.max-iteration-time + 1200000 + + Maximum amount of time while an iteration can be run by the Balancer. After + this time the Balancer will stop the iteration, and reevaluate the work + needs to be done to Balance the cluster. The default value is 20 minutes. + + + dfs.block.invalidate.limit 1000 diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java index 9579b82c09..fa026f0499 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java @@ -1580,6 +1580,85 @@ public void testBalancerCliWithIncludeListWithPortsInAFile() throws Exception { CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, true); } + + @Test(timeout = 100000) + public void testMaxIterationTime() throws Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + int blockSize = 10*1024*1024; // 10MB block size + conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize); + conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize); + // limit the worker thread count of Balancer to have only 1 queue per DN + conf.setInt(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY, 1); + // limit the bandwitdh to 1 packet per sec to emulate slow block moves + conf.setLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY, + 64 * 1024); + // set client socket timeout to have an IN_PROGRESS notification back from + // the DataNode about the copy in every second. 
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
index 9579b82c09..fa026f0499 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/balancer/TestBalancer.java
@@ -1580,6 +1580,85 @@ public void testBalancerCliWithIncludeListWithPortsInAFile() throws Exception {
         CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, true);
   }

+  @Test(timeout = 100000)
+  public void testMaxIterationTime() throws Exception {
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    int blockSize = 10*1024*1024; // 10MB block size
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize);
+    // limit the worker thread count of Balancer to have only 1 queue per DN
+    conf.setInt(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY, 1);
+    // limit the bandwidth to 1 packet per sec to emulate slow block moves
+    conf.setLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY,
+        64 * 1024);
+    // set client socket timeout to get an IN_PROGRESS notification back from
+    // the DataNode about the copy every second.
+    conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 2000L);
+    // set max iteration time to 2 seconds so it times out before moving any block
+    conf.setLong(DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_KEY, 2000L);
+    // set up the cluster
+    final long capacity = 10L * blockSize;
+    final long[] dnCapacities = new long[] {capacity, capacity};
+    final short rep = 1;
+    final long seed = 0xFAFAFA;
+    cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(0)
+        .build();
+    try {
+      cluster.getConfiguration(0).setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+      conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
+      cluster.startDataNodes(conf, 1, true, null, null, dnCapacities);
+      cluster.waitClusterUp();
+      cluster.waitActive();
+      final Path path = new Path("/testMaxIterationTime.dat");
+      DistributedFileSystem fs = cluster.getFileSystem();
+      // fill the DN to 40%
+      DFSTestUtil.createFile(fs, path, 4L * blockSize, rep, seed);
+      // start a new DN
+      cluster.startDataNodes(conf, 1, true, null, null, dnCapacities);
+      cluster.triggerHeartbeats();
+      // set up the Balancer and run one iteration
+      List<NameNodeConnector> connectors = Collections.emptyList();
+      try {
+        BalancerParameters bParams = BalancerParameters.DEFAULT;
+        connectors = NameNodeConnector.newNameNodeConnectors(
+            DFSUtil.getInternalNsRpcUris(conf), Balancer.class.getSimpleName(),
+            Balancer.BALANCER_ID_PATH, conf, bParams.getMaxIdleIteration());
+        for (NameNodeConnector nnc : connectors) {
+          LOG.info("NNC to work on: " + nnc);
+          Balancer b = new Balancer(nnc, bParams, conf);
+          long startTime = Time.monotonicNow();
+          Result r = b.runOneIteration();
+          long runtime = Time.monotonicNow() - startTime;
+          assertEquals("We expect ExitStatus.IN_PROGRESS to be reported.",
+              ExitStatus.IN_PROGRESS, r.exitStatus);
+          // accept the runtime if it is under 3.5 seconds, as we need to wait
+          // for the IN_PROGRESS report from the DN, plus some spare time to
+          // finish.
+          // NOTE: this can be a source of flakiness if the box is busy. The
+          // assertion is based on the following: the Balancer is already set
+          // up, the iteration gets the blocks from the NN, and decides to
+          // move 2 blocks. After that the PendingMoves are scheduled, the
+          // DataNode heartbeats in to the Balancer every second, and the
+          // iteration is two seconds long. This means the test fails only if
+          // the setup and the heartbeat from the DataNode take more than
+          // 500ms, as the iteration should end at the 3rd second from start.
+          // As the number of operations is fairly low and all communication
+          // happens locally, the possibility of a failure due to node
+          // busyness is low.
+          assertTrue("Unexpected iteration runtime: " + runtime + "ms > 3.5s",
+              runtime < 3500);
+        }
+      } finally {
+        for (NameNodeConnector nnc : connectors) {
+          IOUtils.cleanupWithLogger(null, nnc);
+        }
+      }
+    } finally {
+      cluster.shutdown(true, true);
+    }
+  }
+
   /*
    * Test Balancer with Ram_Disk configured
    * One DN has two files on RAM_DISK, other DN has no files on RAM_DISK.
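
One way to read the NOTE in testMaxIterationTime is as a time budget; the breakdown below is an interpretation of that comment, not part of the patch:

    // why runtimes of up to 3500 ms are accepted:
    long iterationLimit = 2000L; // dfs.balancer.max-iteration-time in the test
    long inProgressWait = 1000L; // next IN_PROGRESS report from the DataNode
    long slack = 500L;           // setup and scheduling headroom
    // 2000 + 1000 + 500 = 3500 ms, the bound used in the assertion
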
diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
index 36e7bb9840..62c91bf9e3 100644
--- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
+++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java
@@ -685,6 +685,52 @@ public void testMoverFailedRetry() throws Exception {
     }
   }

+  @Test(timeout=100000)
+  public void testBalancerMaxIterationTimeNotAffectMover() throws Exception {
+    long blockSize = 10*1024*1024;
+    final Configuration conf = new HdfsConfiguration();
+    initConf(conf);
+    conf.setInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, 1);
+    conf.setInt(
+        DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 1);
+    // set a fairly large block size to run into the limitation
+    conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
+    conf.setLong(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize);
+    // set a max iteration time slightly greater than zero, so that the move
+    // time surely exceeds it
+    conf.setLong(DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_KEY, 200L);
+    conf.setInt(DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY, 1);
+    // set client socket timeout to get an IN_PROGRESS notification back from
+    // the DataNode about the copy every second.
+    conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 1000L);
+
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(2)
+        .storageTypes(
+            new StorageType[][] {{StorageType.DISK, StorageType.DISK},
+                {StorageType.ARCHIVE, StorageType.ARCHIVE}})
+        .build();
+    try {
+      cluster.waitActive();
+      final DistributedFileSystem fs = cluster.getFileSystem();
+      final String file = "/testMaxIterationTime.dat";
+      final Path path = new Path(file);
+      short rep_factor = 1;
+      int seed = 0xFAFAFA;
+      // write to DISK
+      DFSTestUtil.createFile(fs, path, 4L * blockSize, rep_factor, seed);
+
+      // move to ARCHIVE
+      fs.setStoragePolicy(new Path(file), "COLD");
+      int rc = ToolRunner.run(conf, new Mover.Cli(),
+          new String[] {"-p", file});
+      Assert.assertEquals("Retcode expected to be ExitStatus.SUCCESS (0).",
+          ExitStatus.SUCCESS.getExitCode(), rc);
+    } finally {
+      cluster.shutdown();
+    }
+  }
+
   private final ErasureCodingPolicy ecPolicy =
       StripedFileTestUtil.getDefaultECPolicy();
   private final int dataBlocks = ecPolicy.getNumDataUnits();
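
Taken together, the Mover path is fixed because the shorter Dispatcher constructor now forwards the -1 sentinel, so even the deliberately tiny 200 ms dfs.balancer.max-iteration-time set above cannot cancel the Mover's block moves. The driver call the test relies on, shown standalone (conf and file as set up in the test; ToolRunner, Mover.Cli and ExitStatus are the same classes the test uses):

    int rc = ToolRunner.run(conf, new Mover.Cli(), new String[] {"-p", file});
    // ExitStatus.SUCCESS (0) means every scheduled move completed
    Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), rc);
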