HDFS-13174. hdfs mover -p /path times out after 20 min. Contributed by Istvan Fajth.
This commit is contained in:
parent
eebeb6033f
commit
c966a3837a
@ -582,6 +582,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
|
|||||||
public static final int DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT = 0;
|
public static final int DFS_BALANCER_BLOCK_MOVE_TIMEOUT_DEFAULT = 0;
|
||||||
public static final String DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY = "dfs.balancer.max-no-move-interval";
|
public static final String DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY = "dfs.balancer.max-no-move-interval";
|
||||||
public static final int DFS_BALANCER_MAX_NO_MOVE_INTERVAL_DEFAULT = 60*1000; // One minute
|
public static final int DFS_BALANCER_MAX_NO_MOVE_INTERVAL_DEFAULT = 60*1000; // One minute
|
||||||
|
public static final String DFS_BALANCER_MAX_ITERATION_TIME_KEY = "dfs.balancer.max-iteration-time";
|
||||||
|
public static final long DFS_BALANCER_MAX_ITERATION_TIME_DEFAULT = 20 * 60 * 1000L; // 20 mins
|
||||||
|
|
||||||
|
|
||||||
public static final String DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth";
|
public static final String DFS_MOVER_MOVEDWINWIDTH_KEY = "dfs.mover.movedWinWidth";
|
||||||
|
@ -289,13 +289,17 @@ static int getInt(Configuration conf, String key, int defaultValue) {
|
|||||||
final int maxNoMoveInterval = conf.getInt(
|
final int maxNoMoveInterval = conf.getInt(
|
||||||
DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY,
|
DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_KEY,
|
||||||
DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_DEFAULT);
|
DFSConfigKeys.DFS_BALANCER_MAX_NO_MOVE_INTERVAL_DEFAULT);
|
||||||
|
final long maxIterationTime = conf.getLong(
|
||||||
|
DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_KEY,
|
||||||
|
DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_DEFAULT);
|
||||||
|
|
||||||
this.nnc = theblockpool;
|
this.nnc = theblockpool;
|
||||||
this.dispatcher =
|
this.dispatcher =
|
||||||
new Dispatcher(theblockpool, p.getIncludedNodes(),
|
new Dispatcher(theblockpool, p.getIncludedNodes(),
|
||||||
p.getExcludedNodes(), movedWinWidth, moverThreads,
|
p.getExcludedNodes(), movedWinWidth, moverThreads,
|
||||||
dispatcherThreads, maxConcurrentMovesPerNode, getBlocksSize,
|
dispatcherThreads, maxConcurrentMovesPerNode, getBlocksSize,
|
||||||
getBlocksMinBlockSize, blockMoveTimeout, maxNoMoveInterval, conf);
|
getBlocksMinBlockSize, blockMoveTimeout, maxNoMoveInterval,
|
||||||
|
maxIterationTime, conf);
|
||||||
this.threshold = p.getThreshold();
|
this.threshold = p.getThreshold();
|
||||||
this.policy = p.getBalancingPolicy();
|
this.policy = p.getBalancingPolicy();
|
||||||
this.sourceNodes = p.getSourceNodes();
|
this.sourceNodes = p.getSourceNodes();
|
||||||
|
@ -138,6 +138,8 @@ public class Dispatcher {
|
|||||||
private final boolean connectToDnViaHostname;
|
private final boolean connectToDnViaHostname;
|
||||||
private BlockPlacementPolicies placementPolicies;
|
private BlockPlacementPolicies placementPolicies;
|
||||||
|
|
||||||
|
private long maxIterationTime;
|
||||||
|
|
||||||
static class Allocator {
|
static class Allocator {
|
||||||
private final int max;
|
private final int max;
|
||||||
private int count = 0;
|
private int count = 0;
|
||||||
@ -346,13 +348,19 @@ private boolean addTo(StorageGroup g) {
|
|||||||
|
|
||||||
/** Dispatch the move to the proxy source & wait for the response. */
|
/** Dispatch the move to the proxy source & wait for the response. */
|
||||||
private void dispatch() {
|
private void dispatch() {
|
||||||
LOG.info("Start moving " + this);
|
|
||||||
assert !(reportedBlock instanceof DBlockStriped);
|
|
||||||
|
|
||||||
Socket sock = new Socket();
|
Socket sock = new Socket();
|
||||||
DataOutputStream out = null;
|
DataOutputStream out = null;
|
||||||
DataInputStream in = null;
|
DataInputStream in = null;
|
||||||
try {
|
try {
|
||||||
|
if (source.isIterationOver()){
|
||||||
|
LOG.info("Cancel moving " + this +
|
||||||
|
" as iteration is already cancelled due to" +
|
||||||
|
" dfs.balancer.max-iteration-time is passed.");
|
||||||
|
throw new IOException("Block move cancelled.");
|
||||||
|
}
|
||||||
|
LOG.info("Start moving " + this);
|
||||||
|
assert !(reportedBlock instanceof DBlockStriped);
|
||||||
|
|
||||||
sock.connect(
|
sock.connect(
|
||||||
NetUtils.createSocketAddr(target.getDatanodeInfo().
|
NetUtils.createSocketAddr(target.getDatanodeInfo().
|
||||||
getXferAddr(Dispatcher.this.connectToDnViaHostname)),
|
getXferAddr(Dispatcher.this.connectToDnViaHostname)),
|
||||||
@ -760,7 +768,10 @@ private Source(StorageType storageType, long maxSize2Move, DDatanode dn) {
|
|||||||
* Check if the iteration is over
|
* Check if the iteration is over
|
||||||
*/
|
*/
|
||||||
public boolean isIterationOver() {
|
public boolean isIterationOver() {
|
||||||
return (Time.monotonicNow()-startTime > MAX_ITERATION_TIME);
|
if (maxIterationTime < 0){
|
||||||
|
return false;
|
||||||
|
}
|
||||||
|
return (Time.monotonicNow()-startTime > maxIterationTime);
|
||||||
}
|
}
|
||||||
|
|
||||||
/** Add a task */
|
/** Add a task */
|
||||||
@ -908,8 +919,6 @@ private boolean shouldFetchMoreBlocks() {
|
|||||||
return blocksToReceive > 0;
|
return blocksToReceive > 0;
|
||||||
}
|
}
|
||||||
|
|
||||||
private static final long MAX_ITERATION_TIME = 20 * 60 * 1000L; // 20 mins
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* This method iteratively does the following: it first selects a block to
|
* This method iteratively does the following: it first selects a block to
|
||||||
* move, then sends a request to the proxy source to start the block move
|
* move, then sends a request to the proxy source to start the block move
|
||||||
@ -990,7 +999,7 @@ private void dispatchBlocks(long delay) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
if (isIterationOver()) {
|
if (isIterationOver()) {
|
||||||
LOG.info("The maximum iteration time (" + MAX_ITERATION_TIME/1000
|
LOG.info("The maximum iteration time (" + maxIterationTime/1000
|
||||||
+ " seconds) has been reached. Stopping " + this);
|
+ " seconds) has been reached. Stopping " + this);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -1013,14 +1022,14 @@ public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
|
|||||||
int maxNoMoveInterval, Configuration conf) {
|
int maxNoMoveInterval, Configuration conf) {
|
||||||
this(nnc, includedNodes, excludedNodes, movedWinWidth,
|
this(nnc, includedNodes, excludedNodes, movedWinWidth,
|
||||||
moverThreads, dispatcherThreads, maxConcurrentMovesPerNode,
|
moverThreads, dispatcherThreads, maxConcurrentMovesPerNode,
|
||||||
0L, 0L, 0, maxNoMoveInterval, conf);
|
0L, 0L, 0, maxNoMoveInterval, -1, conf);
|
||||||
}
|
}
|
||||||
|
|
||||||
Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
|
Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
|
||||||
Set<String> excludedNodes, long movedWinWidth, int moverThreads,
|
Set<String> excludedNodes, long movedWinWidth, int moverThreads,
|
||||||
int dispatcherThreads, int maxConcurrentMovesPerNode,
|
int dispatcherThreads, int maxConcurrentMovesPerNode,
|
||||||
long getBlocksSize, long getBlocksMinBlockSize,
|
long getBlocksSize, long getBlocksMinBlockSize, int blockMoveTimeout,
|
||||||
int blockMoveTimeout, int maxNoMoveInterval, Configuration conf) {
|
int maxNoMoveInterval, long maxIterationTime, Configuration conf) {
|
||||||
this.nnc = nnc;
|
this.nnc = nnc;
|
||||||
this.excludedNodes = excludedNodes;
|
this.excludedNodes = excludedNodes;
|
||||||
this.includedNodes = includedNodes;
|
this.includedNodes = includedNodes;
|
||||||
@ -1047,6 +1056,7 @@ public Dispatcher(NameNodeConnector nnc, Set<String> includedNodes,
|
|||||||
HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME,
|
HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME,
|
||||||
HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
|
HdfsClientConfigKeys.DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
|
||||||
placementPolicies = new BlockPlacementPolicies(conf, null, cluster, null);
|
placementPolicies = new BlockPlacementPolicies(conf, null, cluster, null);
|
||||||
|
this.maxIterationTime = maxIterationTime;
|
||||||
}
|
}
|
||||||
|
|
||||||
public DistributedFileSystem getDistributedFileSystem() {
|
public DistributedFileSystem getDistributedFileSystem() {
|
||||||
|
@ -3540,6 +3540,16 @@
|
|||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>dfs.balancer.max-iteration-time</name>
|
||||||
|
<value>1200000</value>
|
||||||
|
<description>
|
||||||
|
Maximum amount of time while an iteration can be run by the Balancer. After
|
||||||
|
this time the Balancer will stop the iteration, and reevaluate the work
|
||||||
|
needs to be done to Balance the cluster. The default value is 20 minutes.
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>dfs.block.invalidate.limit</name>
|
<name>dfs.block.invalidate.limit</name>
|
||||||
<value>1000</value>
|
<value>1000</value>
|
||||||
|
@ -1580,6 +1580,85 @@ public void testBalancerCliWithIncludeListWithPortsInAFile() throws Exception {
|
|||||||
CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, true);
|
CAPACITY, RACK2, new PortNumberBasedNodes(3, 0, 1), true, true);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test(timeout = 100000)
|
||||||
|
public void testMaxIterationTime() throws Exception {
|
||||||
|
final Configuration conf = new HdfsConfiguration();
|
||||||
|
initConf(conf);
|
||||||
|
int blockSize = 10*1024*1024; // 10MB block size
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
|
||||||
|
conf.setInt(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize);
|
||||||
|
// limit the worker thread count of Balancer to have only 1 queue per DN
|
||||||
|
conf.setInt(DFSConfigKeys.DFS_BALANCER_MOVERTHREADS_KEY, 1);
|
||||||
|
// limit the bandwitdh to 1 packet per sec to emulate slow block moves
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_DATANODE_BALANCE_BANDWIDTHPERSEC_KEY,
|
||||||
|
64 * 1024);
|
||||||
|
// set client socket timeout to have an IN_PROGRESS notification back from
|
||||||
|
// the DataNode about the copy in every second.
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 2000L);
|
||||||
|
// set max iteration time to 2 seconds to timeout before moving any block
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_KEY, 2000L);
|
||||||
|
// setup the cluster
|
||||||
|
final long capacity = 10L * blockSize;
|
||||||
|
final long[] dnCapacities = new long[] {capacity, capacity};
|
||||||
|
final short rep = 1;
|
||||||
|
final long seed = 0xFAFAFA;
|
||||||
|
cluster = new MiniDFSCluster.Builder(conf)
|
||||||
|
.numDataNodes(0)
|
||||||
|
.build();
|
||||||
|
try {
|
||||||
|
cluster.getConfiguration(0).setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
|
||||||
|
conf.setInt(DFSConfigKeys.DFS_REPLICATION_KEY, 1);
|
||||||
|
cluster.startDataNodes(conf, 1, true, null, null, dnCapacities);
|
||||||
|
cluster.waitClusterUp();
|
||||||
|
cluster.waitActive();
|
||||||
|
final Path path = new Path("/testMaxIterationTime.dat");
|
||||||
|
DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
|
// fill the DN to 40%
|
||||||
|
DFSTestUtil.createFile(fs, path, 4L * blockSize, rep, seed);
|
||||||
|
// start a new DN
|
||||||
|
cluster.startDataNodes(conf, 1, true, null, null, dnCapacities);
|
||||||
|
cluster.triggerHeartbeats();
|
||||||
|
// setup Balancer and run one iteration
|
||||||
|
List<NameNodeConnector> connectors = Collections.emptyList();
|
||||||
|
try {
|
||||||
|
BalancerParameters bParams = BalancerParameters.DEFAULT;
|
||||||
|
connectors = NameNodeConnector.newNameNodeConnectors(
|
||||||
|
DFSUtil.getInternalNsRpcUris(conf), Balancer.class.getSimpleName(),
|
||||||
|
Balancer.BALANCER_ID_PATH, conf, bParams.getMaxIdleIteration());
|
||||||
|
for (NameNodeConnector nnc : connectors) {
|
||||||
|
LOG.info("NNC to work on: " + nnc);
|
||||||
|
Balancer b = new Balancer(nnc, bParams, conf);
|
||||||
|
long startTime = Time.monotonicNow();
|
||||||
|
Result r = b.runOneIteration();
|
||||||
|
long runtime = Time.monotonicNow() - startTime;
|
||||||
|
assertEquals("We expect ExitStatus.IN_PROGRESS to be reported.",
|
||||||
|
ExitStatus.IN_PROGRESS, r.exitStatus);
|
||||||
|
// accept runtime if it is under 3.5 seconds, as we need to wait for
|
||||||
|
// IN_PROGRESS report from DN, and some spare to be able to finish.
|
||||||
|
// NOTE: This can be a source of flaky tests, if the box is busy,
|
||||||
|
// assertion here is based on the following: Balancer is already set
|
||||||
|
// up, iteration gets the blocks from the NN, and makes the decision
|
||||||
|
// to move 2 blocks. After that the PendingMoves are scheduled, and
|
||||||
|
// DataNode heartbeats in for the Balancer every second, iteration is
|
||||||
|
// two seconds long. This means that it will fail if the setup and the
|
||||||
|
// heartbeat from the DataNode takes more than 500ms, as the iteration
|
||||||
|
// should end at the 3rd second from start. As the number of
|
||||||
|
// operations seems to be pretty low, and all comm happens locally, I
|
||||||
|
// think the possibility of a failure due to node busyness is low.
|
||||||
|
assertTrue("Unexpected iteration runtime: " + runtime + "ms > 3.5s",
|
||||||
|
runtime < 3500);
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
for (NameNodeConnector nnc : connectors) {
|
||||||
|
IOUtils.cleanupWithLogger(null, nnc);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} finally {
|
||||||
|
cluster.shutdown(true, true);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/*
|
/*
|
||||||
* Test Balancer with Ram_Disk configured
|
* Test Balancer with Ram_Disk configured
|
||||||
* One DN has two files on RAM_DISK, other DN has no files on RAM_DISK.
|
* One DN has two files on RAM_DISK, other DN has no files on RAM_DISK.
|
||||||
|
@ -685,6 +685,52 @@ public void testMoverFailedRetry() throws Exception {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test(timeout=100000)
|
||||||
|
public void testBalancerMaxIterationTimeNotAffectMover() throws Exception {
|
||||||
|
long blockSize = 10*1024*1024;
|
||||||
|
final Configuration conf = new HdfsConfiguration();
|
||||||
|
initConf(conf);
|
||||||
|
conf.setInt(DFSConfigKeys.DFS_MOVER_MOVERTHREADS_KEY, 1);
|
||||||
|
conf.setInt(
|
||||||
|
DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY, 1);
|
||||||
|
// set a fairly large block size to run into the limitation
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_BLOCK_SIZE_KEY, blockSize);
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_BYTES_PER_CHECKSUM_KEY, blockSize);
|
||||||
|
// set a somewhat grater than zero max iteration time to have the move time
|
||||||
|
// to surely exceed it
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_BALANCER_MAX_ITERATION_TIME_KEY, 200L);
|
||||||
|
conf.setInt(DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY, 1);
|
||||||
|
// set client socket timeout to have an IN_PROGRESS notification back from
|
||||||
|
// the DataNode about the copy in every second.
|
||||||
|
conf.setLong(DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY, 1000L);
|
||||||
|
|
||||||
|
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
|
||||||
|
.numDataNodes(2)
|
||||||
|
.storageTypes(
|
||||||
|
new StorageType[][] {{StorageType.DISK, StorageType.DISK},
|
||||||
|
{StorageType.ARCHIVE, StorageType.ARCHIVE}})
|
||||||
|
.build();
|
||||||
|
try {
|
||||||
|
cluster.waitActive();
|
||||||
|
final DistributedFileSystem fs = cluster.getFileSystem();
|
||||||
|
final String file = "/testMaxIterationTime.dat";
|
||||||
|
final Path path = new Path(file);
|
||||||
|
short rep_factor = 1;
|
||||||
|
int seed = 0xFAFAFA;
|
||||||
|
// write to DISK
|
||||||
|
DFSTestUtil.createFile(fs, path, 4L * blockSize, rep_factor, seed);
|
||||||
|
|
||||||
|
// move to ARCHIVE
|
||||||
|
fs.setStoragePolicy(new Path(file), "COLD");
|
||||||
|
int rc = ToolRunner.run(conf, new Mover.Cli(),
|
||||||
|
new String[] {"-p", file});
|
||||||
|
Assert.assertEquals("Retcode expected to be ExitStatus.SUCCESS (0).",
|
||||||
|
ExitStatus.SUCCESS.getExitCode(), rc);
|
||||||
|
} finally {
|
||||||
|
cluster.shutdown();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private final ErasureCodingPolicy ecPolicy =
|
private final ErasureCodingPolicy ecPolicy =
|
||||||
StripedFileTestUtil.getDefaultECPolicy();
|
StripedFileTestUtil.getDefaultECPolicy();
|
||||||
private final int dataBlocks = ecPolicy.getNumDataUnits();
|
private final int dataBlocks = ecPolicy.getNumDataUnits();
|
||||||
|
Loading…
Reference in New Issue
Block a user