HDFS-8143. Mover should exit after some retry when failed to move blocks. Contributed by surendra singh lilhore

Tsz-Wo Nicholas Sze 2015-05-13 11:57:49 -07:00
parent 065d8f2a34
commit cdec12d1b8
3 changed files with 65 additions and 6 deletions
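In short: before this change the Mover would keep scheduling the same failed block moves indefinitely; with it, a new setting dfs.mover.retry.max.attempts (default 10) caps the number of consecutive iterations that may end with failed moves before the Mover throws an IOException and exits with the IO_EXCEPTION status.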

DFSConfigKeys.java

@@ -353,6 +353,8 @@ public class DFSConfigKeys extends CommonConfigurationKeys {
   public static final long DFS_MOVER_MOVEDWINWIDTH_DEFAULT = 5400*1000L;
   public static final String DFS_MOVER_MOVERTHREADS_KEY = "dfs.mover.moverThreads";
   public static final int DFS_MOVER_MOVERTHREADS_DEFAULT = 1000;
+  public static final String DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY = "dfs.mover.retry.max.attempts";
+  public static final int DFS_MOVER_RETRY_MAX_ATTEMPTS_DEFAULT = 10;
   public static final String DFS_DATANODE_ADDRESS_KEY = "dfs.datanode.address";
   public static final int DFS_DATANODE_DEFAULT_PORT = 50010;
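The two keys above define the retry cap and its default. A minimal sketch of overriding it programmatically (the value 3 is an arbitrary example; in a real deployment the key would normally be set in hdfs-site.xml):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hdfs.DFSConfigKeys;
    import org.apache.hadoop.hdfs.HdfsConfiguration;

    public class MoverRetryConfigSketch {
      public static void main(String[] args) {
        // Lower the Mover's budget to 3 consecutive failed iterations.
        Configuration conf = new HdfsConfiguration();
        conf.setInt(DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY, 3);
        System.out.println("retry cap = " + conf.getInt(
            DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY,
            DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_DEFAULT));
      }
    }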

Mover.java

@@ -58,6 +58,7 @@
 import java.net.URI;
 import java.text.DateFormat;
 import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;

 @InterfaceAudience.Private
 public class Mover {
@@ -107,10 +108,12 @@ private List<StorageGroup> getTargetStorages(StorageType t) {
   private final Dispatcher dispatcher;
   private final StorageMap storages;
   private final List<Path> targetPaths;
+  private final int retryMaxAttempts;
+  private final AtomicInteger retryCount;

   private final BlockStoragePolicy[] blockStoragePolicies;

-  Mover(NameNodeConnector nnc, Configuration conf) {
+  Mover(NameNodeConnector nnc, Configuration conf, AtomicInteger retryCount) {
     final long movedWinWidth = conf.getLong(
         DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_KEY,
         DFSConfigKeys.DFS_MOVER_MOVEDWINWIDTH_DEFAULT);
@@ -120,7 +123,10 @@ private List<StorageGroup> getTargetStorages(StorageType t) {
     final int maxConcurrentMovesPerNode = conf.getInt(
         DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_KEY,
         DFSConfigKeys.DFS_DATANODE_BALANCE_MAX_NUM_CONCURRENT_MOVES_DEFAULT);
+    this.retryMaxAttempts = conf.getInt(
+        DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY,
+        DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_DEFAULT);
+    this.retryCount = retryCount;
     this.dispatcher = new Dispatcher(nnc, Collections.<String> emptySet(),
         Collections.<String> emptySet(), movedWinWidth, moverThreads, 0,
         maxConcurrentMovesPerNode, conf);
@@ -255,14 +261,27 @@ private boolean isSnapshotPathInCurrent(String path) throws IOException {
    * @return whether there is still remaining migration work for the next
    *         round
    */
-  private boolean processNamespace() {
+  private boolean processNamespace() throws IOException {
     getSnapshottableDirs();
     boolean hasRemaining = false;
     for (Path target : targetPaths) {
       hasRemaining |= processPath(target.toUri().getPath());
     }
     // wait for pending move to finish and retry the failed migration
-    hasRemaining |= Dispatcher.waitForMoveCompletion(storages.targets.values());
+    boolean hasFailed = Dispatcher.waitForMoveCompletion(storages.targets
+        .values());
+    if (hasFailed) {
+      if (retryCount.get() == retryMaxAttempts) {
+        throw new IOException("Failed to move some blocks after "
+            + retryMaxAttempts + " retries.");
+      } else {
+        retryCount.incrementAndGet();
+      }
+    } else {
+      // Reset retry count if no failure.
+      retryCount.set(0);
+    }
+    hasRemaining |= hasFailed;
     return hasRemaining;
   }
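The retry accounting above applies to consecutive failures only: a round with failed moves increments the shared counter, a clean round resets it to zero, and the IOException fires once the counter reaches the cap. A standalone sketch of that policy (class and method names here are illustrative, not part of the patch):

    import java.io.IOException;
    import java.util.concurrent.atomic.AtomicInteger;

    // Illustrative model of the accounting at the end of processNamespace().
    class RetryBudgetSketch {
      private final int retryMaxAttempts;
      private final AtomicInteger retryCount;

      RetryBudgetSketch(int retryMaxAttempts, AtomicInteger retryCount) {
        this.retryMaxAttempts = retryMaxAttempts;
        this.retryCount = retryCount;
      }

      // Returns whether another round is needed, mirroring hasRemaining.
      boolean onRoundComplete(boolean hasFailed) throws IOException {
        if (hasFailed) {
          if (retryCount.get() == retryMaxAttempts) {
            // Budget exhausted: abort instead of looping forever.
            throw new IOException("Failed to move some blocks after "
                + retryMaxAttempts + " retries.");
          }
          retryCount.incrementAndGet();
        } else {
          retryCount.set(0); // a fully successful round resets the budget
        }
        return hasFailed;
      }
    }

Because the check compares the pre-increment value with ==, the Mover performs the initial failed round plus retryMaxAttempts further failed rounds before giving up.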
@@ -528,6 +547,7 @@ static int run(Map<URI, List<Path>> namenodes, Configuration conf)
         DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_DEFAULT) * 2000 +
         conf.getLong(DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_KEY,
         DFSConfigKeys.DFS_NAMENODE_REPLICATION_INTERVAL_DEFAULT) * 1000;
+    AtomicInteger retryCount = new AtomicInteger(0);
     LOG.info("namenodes = " + namenodes);

     List<NameNodeConnector> connectors = Collections.emptyList();
@@ -541,7 +561,7 @@ static int run(Map<URI, List<Path>> namenodes, Configuration conf)
       Iterator<NameNodeConnector> iter = connectors.iterator();
       while (iter.hasNext()) {
         NameNodeConnector nnc = iter.next();
-        final Mover m = new Mover(nnc, conf);
+        final Mover m = new Mover(nnc, conf, retryCount);
         final ExitStatus r = m.run();
         if (r == ExitStatus.SUCCESS) {
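Note the design choice here: run() creates a single AtomicInteger and passes the same instance to every per-NameNode Mover it constructs, so in a federated cluster the consecutive-failure budget is shared across all namespaces rather than tracked per NameNode; only an iteration with no failed moves resets it.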

TestMover.java

@@ -20,12 +20,14 @@
 import java.io.IOException;
 import java.net.URI;
 import java.util.*;
+import java.util.concurrent.atomic.AtomicInteger;

 import com.google.common.collect.Maps;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.fs.StorageType;
+import org.apache.hadoop.hdfs.DFSConfigKeys;
 import org.apache.hadoop.hdfs.DistributedFileSystem;
 import org.apache.hadoop.hdfs.DFSTestUtil;
 import org.apache.hadoop.hdfs.DFSUtil;
@@ -34,6 +36,7 @@
 import org.apache.hadoop.hdfs.MiniDFSNNTopology;
 import org.apache.hadoop.hdfs.protocol.LocatedBlock;
 import org.apache.hadoop.hdfs.server.balancer.Dispatcher.DBlock;
+import org.apache.hadoop.hdfs.server.balancer.ExitStatus;
 import org.apache.hadoop.hdfs.server.balancer.NameNodeConnector;
 import org.apache.hadoop.hdfs.server.mover.Mover.MLocation;
 import org.apache.hadoop.hdfs.server.namenode.ha.HATestUtil;
@@ -54,7 +57,7 @@ static Mover newMover(Configuration conf) throws IOException {
     final List<NameNodeConnector> nncs = NameNodeConnector.newNameNodeConnectors(
         nnMap, Mover.class.getSimpleName(), Mover.MOVER_ID_PATH, conf,
         NameNodeConnector.DEFAULT_MAX_IDLE_ITERATIONS);
-    return new Mover(nncs.get(0), conf);
+    return new Mover(nncs.get(0), conf, new AtomicInteger(0));
   }

   @Test
@@ -324,4 +327,38 @@ public void testTwoReplicaSameStorageTypeShouldNotSelect() throws Exception {
       cluster.shutdown();
     }
   }
+
+  @Test
+  public void testMoverFailedRetry() throws Exception {
+    // HDFS-8147
+    final Configuration conf = new HdfsConfiguration();
+    conf.set(DFSConfigKeys.DFS_MOVER_RETRY_MAX_ATTEMPTS_KEY, "2");
+    final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
+        .numDataNodes(3)
+        .storageTypes(
+            new StorageType[][] {{StorageType.DISK, StorageType.ARCHIVE},
+                {StorageType.DISK, StorageType.ARCHIVE},
+                {StorageType.DISK, StorageType.ARCHIVE}}).build();
+    try {
+      cluster.waitActive();
+      final DistributedFileSystem dfs = cluster.getFileSystem();
+      final String file = "/testMoverFailedRetry";
+      // write to DISK
+      final FSDataOutputStream out = dfs.create(new Path(file), (short) 2);
+      out.writeChars("testMoverFailedRetry");
+      out.close();
+      // Delete the block file so the block move will fail with FileNotFoundException
+      LocatedBlock lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
+      cluster.corruptBlockOnDataNodesByDeletingBlockFile(lb.getBlock());
+      // move to ARCHIVE
+      dfs.setStoragePolicy(new Path(file), "COLD");
+      int rc = ToolRunner.run(conf, new Mover.Cli(),
+          new String[] {"-p", file});
+      Assert.assertEquals("Movement should fail after some retries",
+          ExitStatus.IO_EXCEPTION.getExitCode(), rc);
+    } finally {
+      cluster.shutdown();
+    }
+  }
 }