HDFS-10966. Enhance Dispatcher logic on deciding when to give up a source DataNode. Contributed by Mark Wagner and Zhe Zhang.
This commit is contained in:
parent
f922067748
commit
49a09179e3
@ -502,6 +502,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||||||
public static final String DFS_BALANCER_KERBEROS_PRINCIPAL_KEY = "dfs.balancer.kerberos.principal";
|
public static final String DFS_BALANCER_KERBEROS_PRINCIPAL_KEY = "dfs.balancer.kerberos.principal";
|
||||||
public static final String DFS_BALANCER_BLOCK_MOVE_TIMEOUT = "dfs.balancer.block-move.timeout";
|
public static final String DFS_BALANCER_BLOCK_MOVE_TIMEOUT = "dfs.balancer.block-move.timeout";
|
||||||
public static final int DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT = 0;
|
public static final int DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT = 0;
|
||||||
|
public static final String DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY = "dfs.balancer.max-no-move-interval";
|
||||||
|
public static final int DFS_BALANCER_MAX_NO_MOVE_INTERVAL_DEFAULT = 60*1000; // One minute
|
||||||
|
|
||||||
|
|
||||||
public static final String DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth";
|
public static final String DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth";
|
||||||
@ -519,6 +521,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||||||
"dfs.mover.keytab.file";
|
"dfs.mover.keytab.file";
|
||||||
public static final String DFS_MOVER_KERBEROS_PRINCIPAL_KEY =
|
public static final String DFS_MOVER_KERBEROS_PRINCIPAL_KEY =
|
||||||
"dfs.mover.kerberos.principal";
|
"dfs.mover.kerberos.principal";
|
||||||
|
public static final String DFS_MOVER_MAX_NO_MOVE_INTERVAL_KEY = "dfs.mover.max-no-move-interval";
|
||||||
|
public static final int DFS_MOVER_MAX_NO_MOVE_INTERVAL_DEFAULT = 60*1000; // One minute
|
||||||
|
|
||||||
public static final String DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address";
|
public static final String DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address";
|
||||||
public static final int DFS_DATANODE_DEFAULT_PORT = 9866;
|
public static final int DFS_DATANODE_DEFAULT_PORT = 9866;
|
||||||
|
@ -285,13 +285,16 @@ public class Balancer {
|
|||||||
final int blockMoveTimeout = conf.getInt(
|
final int blockMoveTimeout = conf.getInt(
|
||||||
DFSConfigKeys.DFS_BALANCER_BLOCK_MOVE_TIMEOUT,
|
DFSConfigKeys.DFS_BALANCER_BLOCK_MOVE_TIMEOUT,
|
||||||
DFSConfigKeys.DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT);
|
DFSConfigKeys.DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT);
|
||||||
|
final int maxNoMoveInterval = conf.getInt(
|
||||||
|
DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY,
|
||||||
|
DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_DEFAULT);
|
||||||
|
|
||||||
this.nnc = theblockpool;
|
this.nnc = theblockpool;
|
||||||
this.dispatcher =
|
this.dispatcher =
|
||||||
new Dispatcher(theblockpool, p.getIncludedNodes(),
|
new Dispatcher(theblockpool, p.getIncludedNodes(),
|
||||||
p.getExcludedNodes(), movedWinWidth, moverThreads,
|
p.getExcludedNodes(), movedWinWidth, moverThreads,
|
||||||
dispatcherThreads, maxConcurrentMovesPerNode, getBlocksSize,
|
dispatcherThreads, maxConcurrentMovesPerNode, getBlocksSize,
|
||||||
getBlocksMinBlockSize, blockMoveTimeout, conf);
|
getBlocksMinBlockSize, blockMoveTimeout, maxNoMoveInterval, conf);
|
||||||
this.threshold = p.getThreshold();
|
this.threshold = p.getThreshold();
|
||||||
this.policy = p.getBalancingPolicy();
|
this.policy = p.getBalancingPolicy();
|
||||||
this.sourceNodes = p.getSourceNodes();
|
this.sourceNodes = p.getSourceNodes();
|
||||||
|
@ -122,6 +122,11 @@ public class Dispatcher {
|
|||||||
private final long getBlocksSize;
|
private final long getBlocksSize;
|
||||||
private final long getBlocksMinBlockSize;
|
private final long getBlocksMinBlockSize;
|
||||||
private final long blockMoveTimeout;
|
private final long blockMoveTimeout;
|
||||||
|
/**
|
||||||
|
* If no block can be moved out of a {@link Source} after this configured
|
||||||
|
* amount of time, the Source should give up choosing the next possible move.
|
||||||
|
*/
|
||||||
|
private final int maxNoMoveInterval;
|
||||||
|
|
||||||
private final int ioFileBufferSize;
|
private final int ioFileBufferSize;
|
||||||
|
|
||||||
@ -866,7 +871,7 @@ public class Dispatcher {
|
|||||||
*/
|
*/
|
||||||
private void dispatchBlocks() {
|
private void dispatchBlocks() {
|
||||||
this.blocksToReceive = 2 * getScheduledSize();
|
this.blocksToReceive = 2 * getScheduledSize();
|
||||||
int noPendingMoveIteration = 0;
|
long previousMoveTimestamp = Time.monotonicNow();
|
||||||
while (getScheduledSize() > 0 && !isIterationOver()
|
while (getScheduledSize() > 0 && !isIterationOver()
|
||||||
&& (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
|
&& (!srcBlocks.isEmpty() || blocksToReceive > 0)) {
|
||||||
if (LOG.isTraceEnabled()) {
|
if (LOG.isTraceEnabled()) {
|
||||||
@ -876,8 +881,8 @@ public class Dispatcher {
|
|||||||
}
|
}
|
||||||
final PendingMove p = chooseNextMove();
|
final PendingMove p = chooseNextMove();
|
||||||
if (p != null) {
|
if (p != null) {
|
||||||
// Reset no pending move counter
|
// Reset previous move timestamp
|
||||||
noPendingMoveIteration=0;
|
previousMoveTimestamp = Time.monotonicNow();
|
||||||
executePendingMove(p);
|
executePendingMove(p);
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
@ -900,13 +905,11 @@ public class Dispatcher {
|
|||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
// source node cannot find a pending block to move, iteration +1
|
// jump out of while-loop after the configured timeout.
|
||||||
noPendingMoveIteration++;
|
long noMoveInterval = Time.monotonicNow() - previousMoveTimestamp;
|
||||||
// in case no blocks can be moved for source node's task,
|
if (noMoveInterval > maxNoMoveInterval) {
|
||||||
// jump out of while-loop after 5 iterations.
|
LOG.info("Failed to find a pending move for " + noMoveInterval
|
||||||
if (noPendingMoveIteration >= MAX_NO_PENDING_MOVE_ITERATIONS) {
|
+ " ms. Skipping " + this);
|
||||||
LOG.info("Failed to find a pending move " + noPendingMoveIteration
|
|
||||||
+ " times. Skipping " + this);
|
|
||||||
resetScheduledSize();
|
resetScheduledSize();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -917,6 +920,9 @@ public class Dispatcher {
|
|||||||
synchronized (Dispatcher.this) {
|
synchronized (Dispatcher.this) {
|
||||||
Dispatcher.this.wait(1000); // wait for targets/sources to be idle
|
Dispatcher.this.wait(1000); // wait for targets/sources to be idle
|
||||||
}
|
}
|
||||||
|
// Didn't find a possible move in this iteration of the while loop,
|
||||||
|
// adding a small delay before choosing next move again.
|
||||||
|
Thread.sleep(100);
|
||||||
} catch (InterruptedException ignored) {
|
} catch (InterruptedException ignored) {
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -941,17 +947,18 @@ public class Dispatcher {
|
|||||||
/** Constructor called by Mover. */
|
/** Constructor called by Mover. */
|
||||||
public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
|
public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
|
||||||
Set<String> excludedNodes, long movedWinWidth, int moverThreads,
|
Set<String> excludedNodes, long movedWinWidth, int moverThreads,
|
||||||
int dispatcherThreads, int maxConcurrentMovesPerNode, Configuration conf) {
|
int dispatcherThreads, int maxConcurrentMovesPerNode,
|
||||||
|
int maxNoMoveInterval, Configuration conf) {
|
||||||
this(nnc, includedNodes, excludedNodes, movedWinWidth,
|
this(nnc, includedNodes, excludedNodes, movedWinWidth,
|
||||||
moverThreads, dispatcherThreads, maxConcurrentMovesPerNode,
|
moverThreads, dispatcherThreads, maxConcurrentMovesPerNode,
|
||||||
0L, 0L, 0, conf);
|
0L, 0L, 0, maxNoMoveInterval, conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
|
Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
|
||||||
Set<String> excludedNodes, long movedWinWidth, int moverThreads,
|
Set<String> excludedNodes, long movedWinWidth, int moverThreads,
|
||||||
int dispatcherThreads, int maxConcurrentMovesPerNode,
|
int dispatcherThreads, int maxConcurrentMovesPerNode,
|
||||||
long getBlocksSize, long getBlocksMinBlockSize,
|
long getBlocksSize, long getBlocksMinBlockSize,
|
||||||
int blockMoveTimeout, Configuration conf) {
|
int blockMoveTimeout, int maxNoMoveInterval, Configuration conf) {
|
||||||
this.nnc = nnc;
|
this.nnc = nnc;
|
||||||
this.excludedNodes = excludedNodes;
|
this.excludedNodes = excludedNodes;
|
||||||
this.includedNodes = includedNodes;
|
this.includedNodes = includedNodes;
|
||||||
@ -967,6 +974,7 @@ public class Dispatcher {
|
|||||||
this.getBlocksSize = getBlocksSize;
|
this.getBlocksSize = getBlocksSize;
|
||||||
this.getBlocksMinBlockSize = getBlocksMinBlockSize;
|
this.getBlocksMinBlockSize = getBlocksMinBlockSize;
|
||||||
this.blockMoveTimeout = blockMoveTimeout;
|
this.blockMoveTimeout = blockMoveTimeout;
|
||||||
|
this.maxNoMoveInterval = maxNoMoveInterval;
|
||||||
|
|
||||||
this.saslClient = new SaslDataTransferClient(conf,
|
this.saslClient = new SaslDataTransferClient(conf,
|
||||||
DataTransferSaslUtil.getSaslPropertiesResolver(conf),
|
DataTransferSaslUtil.getSaslPropertiesResolver(conf),
|
||||||
|
@ -130,13 +130,16 @@ public class Mover {
|
|||||||
final int maxConcurrentMovesPerNode = conf.getInt(
|
final int maxConcurrentMovesPerNode = conf.getInt(
|
||||||
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
|
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
|
||||||
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
|
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
|
||||||
|
final int maxNoMoveInterval = conf.getInt(
|
||||||
|
DFSConfigKeys.DFS_MOVER_MAX_NO_MOVE_INTERVAL_KEY,
|
||||||
|
DFSConfigKeys.DFS_MOVER_MAX_NO_MOVE_INTERVAL_DEFAULT);
|
||||||
this.retryMaxAttempts = conf.getInt(
|
this.retryMaxAttempts = conf.getInt(
|
||||||
DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY,
|
DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY,
|
||||||
DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_DEFAULT);
|
DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_DEFAULT);
|
||||||
this.retryCount = retryCount;
|
this.retryCount = retryCount;
|
||||||
this.dispatcher = new Dispatcher(nnc, Collections.<String> emptySet(),
|
this.dispatcher = new Dispatcher(nnc, Collections.<String> emptySet(),
|
||||||
Collections.<String> emptySet(), movedWinWidth, moverThreads, 0,
|
Collections.<String> emptySet(), movedWinWidth, moverThreads, 0,
|
||||||
maxConcurrentMovesPerNode, conf);
|
maxConcurrentMovesPerNode, maxNoMoveInterval, conf);
|
||||||
this.storages = new StorageMap();
|
this.storages = new StorageMap();
|
||||||
this.targetPaths = nnc.getTargetPaths();
|
this.targetPaths = nnc.getTargetPaths();
|
||||||
this.blockStoragePolicies = new BlockStoragePolicy[1 <<
|
this.blockStoragePolicies = new BlockStoragePolicy[1 <<
|
||||||
|
@ -3251,6 +3251,16 @@
|
|||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.balancer.max-no-move-interval</name>
|
||||||
|
<value>60000</value>
|
||||||
|
<description>
|
||||||
|
If this specified amount of time has elapsed and no block has been moved
|
||||||
|
out of a source DataNode, on more effort will be made to move blocks out of
|
||||||
|
this DataNode in the current Balancer iteration.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.block.invalidate.limit</name>
|
<name>dfs.block.invalidate.limit</name>
|
||||||
<value>1000</value>
|
<value>1000</value>
|
||||||
@ -3843,6 +3853,16 @@
|
|||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.mover.max-no-move-interval</name>
|
||||||
|
<value>60000</value>
|
||||||
|
<description>
|
||||||
|
If this specified amount of time has elapsed and no block has been moved
|
||||||
|
out of a source DataNode, on more effort will be made to move blocks out of
|
||||||
|
this DataNode in the current Mover iteration.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.namenode.audit.log.async</name>
|
<name>dfs.namenode.audit.log.async</name>
|
||||||
<value>false</value>
|
<value>false</value>
|
||||||
|
@ -184,6 +184,7 @@ public class TestBalancer {
|
|||||||
|
|
||||||
conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
|
conf.setLong(DFSConfigKeys.DFS_BALANCER_MOVEDWINWIDTH_KEY, 2000L);
|
||||||
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
|
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
|
||||||
|
conf.setInt(DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY, 5*1000);
|
||||||
}
|
}
|
||||||
|
|
||||||
static void initConfWithRamDisk(Configuration conf,
|
static void initConfWithRamDisk(Configuration conf,
|
||||||
@ -194,6 +195,7 @@ public class TestBalancer {
|
|||||||
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
conf.setLong(DFS_HEARTBEAT_INTERVAL_KEY, 1);
|
||||||
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
|
conf.setInt(DFS_NAMENODE_HEARTBEAT_RECHECK_INTERVAL_KEY, 500);
|
||||||
conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 1);
|
conf.setInt(DFS_DATANODE_LAZY_WRITER_INTERVAL_SEC, 1);
|
||||||
|
conf.setInt(DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY, 5*1000);
|
||||||
LazyPersistTestCase.initCacheManipulator();
|
LazyPersistTestCase.initCacheManipulator();
|
||||||
|
|
||||||
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
|
conf.setLong(DFSConfigKeys.DFS_BALANCER_GETBLOCKS_MIN_BLOCK_SIZE_KEY, 1L);
|
||||||
|
Loading…
x
Reference in New Issue
Block a user