From f6367c5f44a88cb5eb7edffb015b10b657504a61 Mon Sep 17 00:00:00 2001 From: Zhe Zhang Date: Tue, 25 Oct 2016 10:18:57 -0700 Subject: [PATCH] HDFS-11015. Enforce timeout in balancer. Contributed by Kihwal Lee. --- .../org/apache/hadoop/hdfs/DFSConfigKeys.java | 2 + .../hadoop/hdfs/server/balancer/Balancer.java | 5 +- .../hdfs/server/balancer/Dispatcher.java | 49 ++++++++++++++----- .../src/main/resources/hdfs-default.xml | 15 ++++++ 4 files changed, 58 insertions(+), 13 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java index d54c1095d8..951ad688ab 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/DFSConfigKeys.java @@ -496,6 +496,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys { public static final String DFS_BALANCER_ADDRESS_DEFAULT= "0.0.0.0:0"; public static final String DFS_BALANCER_KEYTAB_FILE_KEY = "dfs.balancer.keytab.file"; public static final String DFS_BALANCER_KERBEROS_PRINCIPAL_KEY = "dfs.balancer.kerberos.principal"; + public static final String DFS_BALANCER_BLOCK_MOVE_TIMEOUT = "dfs.balancer.block-move.timeout"; + public static final int DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT = 0; public static final String DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth"; diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java index 2037d013f0..583ade39cd 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Balancer.java @@ -282,13 +282,16 @@ static int 
getInt(Configuration conf, String key, int defaultValue) { final long getBlocksMinBlockSize = getLongBytes(conf, DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_DEFAULT); + final int blockMoveTimeout = conf.getInt( + DFSConfigKeys.DFS_BALANCER_BLOCK_MOVE_TIMEOUT, + DFSConfigKeys.DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT); this.nnc = theblockpool; this.dispatcher = new Dispatcher(theblockpool, p.getIncludedNodes(), p.getExcludedNodes(), movedWinWidth, moverThreads, dispatcherThreads, maxConcurrentMovesPerNode, getBlocksSize, - getBlocksMinBlockSize, conf); + getBlocksMinBlockSize, blockMoveTimeout, conf); this.threshold = p.getThreshold(); this.policy = p.getBalancingPolicy(); this.sourceNodes = p.getSourceNodes(); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java index e5c5e531cf..e090174454 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/balancer/Dispatcher.java @@ -121,6 +121,7 @@ public class Dispatcher { private final long getBlocksSize; private final long getBlocksMinBlockSize; + private final long blockMoveTimeout; private final int ioFileBufferSize; @@ -331,6 +332,11 @@ private void dispatch() { getXferAddr(Dispatcher.this.connectToDnViaHostname)), HdfsConstants.READ_TIMEOUT); + // Set read timeout so that it doesn't hang forever against + // unresponsive nodes. Datanode normally sends IN_PROGRESS response + // twice within the client read timeout period (every 30 seconds by + // default). Here, we make it give up after 5 minutes of no response. 
+ sock.setSoTimeout(HdfsConstants.READ_TIMEOUT * 5); sock.setKeepAlive(true); OutputStream unbufOut = sock.getOutputStream(); @@ -386,13 +392,26 @@ private void sendRequest(DataOutputStream out, ExtendedBlock eb, source.getDatanodeInfo().getDatanodeUuid(), proxySource.datanode); } + /** Check whether to stop waiting for a response */ + private boolean stopWaitingForResponse(long startTime) { + return source.isIterationOver() || + (blockMoveTimeout > 0 && + (Time.monotonicNow() - startTime > blockMoveTimeout)); + } + /** Receive a reportedBlock copy response from the input stream */ private void receiveResponse(DataInputStream in) throws IOException { + long startTime = Time.monotonicNow(); BlockOpResponseProto response = BlockOpResponseProto.parseFrom(vintPrefixed(in)); while (response.getStatus() == Status.IN_PROGRESS) { // read intermediate responses response = BlockOpResponseProto.parseFrom(vintPrefixed(in)); + // Stop waiting for slow block moves. Even if it stops waiting, + // the actual move may continue. 
+ if (stopWaitingForResponse(startTime)) { + throw new IOException("Block move timed out"); + } } String logInfo = "reportedBlock move is failed"; DataTransferProtoUtil.checkBlockOpStatus(response, logInfo); @@ -671,6 +690,7 @@ public class Source extends DDatanode.StorageGroup { private final List tasks = new ArrayList(2); private long blocksToReceive = 0L; + private final long startTime = Time.monotonicNow(); /** * Source blocks point to the objects in {@link Dispatcher#globalBlocks} * because we want to keep one copy of a block and be aware that the @@ -682,6 +702,13 @@ private Source(StorageType storageType, long maxSize2Move, DDatanode dn) { dn.super(storageType, maxSize2Move); } + /** + * Check if the iteration is over + */ + public boolean isIterationOver() { + return (Time.monotonicNow()-startTime > MAX_ITERATION_TIME); + } + /** Add a task */ void addTask(Task task) { Preconditions.checkState(task.target != this, @@ -838,24 +865,15 @@ private boolean shouldFetchMoreBlocks() { * elapsed time of the iteration has exceeded the max time limit. */ private void dispatchBlocks() { - final long startTime = Time.monotonicNow(); this.blocksToReceive = 2 * getScheduledSize(); - boolean isTimeUp = false; int noPendingMoveIteration = 0; - while (!isTimeUp && getScheduledSize() > 0 + while (getScheduledSize() > 0 && !isIterationOver() && (!srcBlocks.isEmpty() || blocksToReceive > 0)) { if (LOG.isTraceEnabled()) { LOG.trace(this + " blocksToReceive=" + blocksToReceive + ", scheduledSize=" + getScheduledSize() + ", srcBlocks#=" + srcBlocks.size()); } - // check if time is up or not - if (Time.monotonicNow() - startTime > MAX_ITERATION_TIME) { - LOG.info("Time up (max time=" + MAX_ITERATION_TIME/1000 - + " seconds). 
Skipping " + this); - isTimeUp = true; - continue; - } final PendingMove p = chooseNextMove(); if (p != null) { // Reset no pending move counter @@ -902,6 +920,11 @@ private void dispatchBlocks() { } catch (InterruptedException ignored) { } } + + if (isIterationOver()) { + LOG.info("The maximum iteration time (" + MAX_ITERATION_TIME/1000 + + " seconds) has been reached. Stopping " + this); + } } @Override @@ -921,13 +944,14 @@ public Dispatcher(NameNodeConnector nnc, Set includedNodes, int dispatcherThreads, int maxConcurrentMovesPerNode, Configuration conf) { this(nnc, includedNodes, excludedNodes, movedWinWidth, moverThreads, dispatcherThreads, maxConcurrentMovesPerNode, - 0L, 0L, conf); + 0L, 0L, 0, conf); } Dispatcher(NameNodeConnector nnc, Set includedNodes, Set excludedNodes, long movedWinWidth, int moverThreads, int dispatcherThreads, int maxConcurrentMovesPerNode, - long getBlocksSize, long getBlocksMinBlockSize, Configuration conf) { + long getBlocksSize, long getBlocksMinBlockSize, + int blockMoveTimeout, Configuration conf) { this.nnc = nnc; this.excludedNodes = excludedNodes; this.includedNodes = includedNodes; @@ -942,6 +966,7 @@ public Dispatcher(NameNodeConnector nnc, Set includedNodes, this.getBlocksSize = getBlocksSize; this.getBlocksMinBlockSize = getBlocksMinBlockSize; + this.blockMoveTimeout = blockMoveTimeout; this.saslClient = new SaslDataTransferClient(conf, DataTransferSaslUtil.getSaslPropertiesResolver(conf), diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml index 483663efdb..61a706309a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/resources/hdfs-default.xml @@ -3227,6 +3227,21 @@ + + dfs.balancer.block-move.timeout + 0 + + Maximum amount of time in milliseconds for a block to move. 
If this is set + greater than 0, Balancer will stop waiting for a block move completion + after this time. In typical clusters, a 3 to 5 minute timeout is reasonable. + If timeouts occur for a large proportion of block moves, this value needs to be + increased. It could also be that too much work is dispatched and many nodes + are constantly exceeding the bandwidth limit as a result. In that case, + other balancer parameters might need to be adjusted. + It is disabled (0) by default. + + + dfs.block.invalidate.limit 1000