HDFS-8540. Mover should exit with NO_MOVE_BLOCK if no block can be moved. Contributed by surendra singh lilhore

This commit is contained in:
Tsz-Wo Nicholas Sze 2015-06-15 16:26:53 -07:00
parent 2cb09e98e3
commit 321940cf19
4 changed files with 107 additions and 38 deletions

View File

@ -917,6 +917,9 @@ Release 2.7.1 - UNRELEASED
HDFS-8521. Add VisibleForTesting annotation to
BlockPoolSlice#selectReplicaToDelete. (cmccabe)
HDFS-8540. Mover should exit with NO_MOVE_BLOCK if no block can be moved.
(surendra singh lilhore via szetszwo)
OPTIMIZATIONS
BUG FIXES

View File

@ -27,7 +27,6 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.BlockStoragePolicySpi;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StorageType;
import org.apache.hadoop.hdfs.HdfsConfiguration;
@ -163,8 +162,7 @@ private void initStoragePolicies() throws IOException {
private ExitStatus run() {
try {
init();
boolean hasRemaining = new Processor().processNamespace();
return hasRemaining ? ExitStatus.IN_PROGRESS : ExitStatus.SUCCESS;
return new Processor().processNamespace().getExitStatus();
} catch (IllegalArgumentException e) {
System.out.println(e + ". Exiting ...");
return ExitStatus.ILLEGAL_ARGUMENTS;
@ -262,11 +260,11 @@ private boolean isSnapshotPathInCurrent(String path) throws IOException {
* @return whether there is still remaining migration work for the next
* round
*/
private boolean processNamespace() throws IOException {
private Result processNamespace() throws IOException {
getSnapshottableDirs();
boolean hasRemaining = false;
Result result = new Result();
for (Path target : targetPaths) {
hasRemaining |= processPath(target.toUri().getPath());
processPath(target.toUri().getPath(), result);
}
// wait for pending move to finish and retry the failed migration
boolean hasFailed = Dispatcher.waitForMoveCompletion(storages.targets
@ -282,16 +280,15 @@ private boolean processNamespace() throws IOException {
// Reset retry count if no failure.
retryCount.set(0);
}
hasRemaining |= hasFailed;
return hasRemaining;
result.updateHasRemaining(hasFailed);
return result;
}
/**
* @return whether there is still remaing migration work for the next
* round
*/
private boolean processPath(String fullPath) {
boolean hasRemaining = false;
private void processPath(String fullPath, Result result) {
for (byte[] lastReturnedName = HdfsFileStatus.EMPTY_NAME;;) {
final DirectoryListing children;
try {
@ -299,73 +296,71 @@ private boolean processPath(String fullPath) {
} catch(IOException e) {
LOG.warn("Failed to list directory " + fullPath
+ ". Ignore the directory and continue.", e);
return hasRemaining;
return;
}
if (children == null) {
return hasRemaining;
return;
}
for (HdfsFileStatus child : children.getPartialListing()) {
hasRemaining |= processRecursively(fullPath, child);
processRecursively(fullPath, child, result);
}
if (children.hasMore()) {
lastReturnedName = children.getLastName();
} else {
return hasRemaining;
return;
}
}
}
/** @return whether the migration requires next round */
private boolean processRecursively(String parent, HdfsFileStatus status) {
private void processRecursively(String parent, HdfsFileStatus status,
Result result) {
String fullPath = status.getFullName(parent);
boolean hasRemaining = false;
if (status.isDir()) {
if (!fullPath.endsWith(Path.SEPARATOR)) {
fullPath = fullPath + Path.SEPARATOR;
}
hasRemaining = processPath(fullPath);
processPath(fullPath, result);
// process snapshots if this is a snapshottable directory
if (snapshottableDirs.contains(fullPath)) {
final String dirSnapshot = fullPath + HdfsConstants.DOT_SNAPSHOT_DIR;
hasRemaining |= processPath(dirSnapshot);
processPath(dirSnapshot, result);
}
} else if (!status.isSymlink()) { // file
try {
if (!isSnapshotPathInCurrent(fullPath)) {
// the full path is a snapshot path but it is also included in the
// current directory tree, thus ignore it.
hasRemaining = processFile(fullPath, (HdfsLocatedFileStatus)status);
processFile(fullPath, (HdfsLocatedFileStatus) status, result);
}
} catch (IOException e) {
LOG.warn("Failed to check the status of " + parent
+ ". Ignore it and continue.", e);
return false;
}
}
return hasRemaining;
}
/** @return true if it is necessary to run another round of migration */
private boolean processFile(String fullPath, HdfsLocatedFileStatus status) {
private void processFile(String fullPath, HdfsLocatedFileStatus status,
Result result) {
final byte policyId = status.getStoragePolicy();
// currently we ignore files with unspecified storage policy
if (policyId == HdfsConstants.BLOCK_STORAGE_POLICY_ID_UNSPECIFIED) {
return false;
return;
}
final BlockStoragePolicy policy = blockStoragePolicies[policyId];
if (policy == null) {
LOG.warn("Failed to get the storage policy of file " + fullPath);
return false;
return;
}
final List<StorageType> types = policy.chooseStorageTypes(
status.getReplication());
final LocatedBlocks locatedBlocks = status.getBlockLocations();
boolean hasRemaining = false;
final boolean lastBlkComplete = locatedBlocks.isLastBlockComplete();
List<LocatedBlock> lbs = locatedBlocks.getLocatedBlocks();
for(int i = 0; i < lbs.size(); i++) {
for (int i = 0; i < lbs.size(); i++) {
if (i == lbs.size() - 1 && !lastBlkComplete) {
// last block is incomplete, skip it
continue;
@ -375,12 +370,15 @@ private boolean processFile(String fullPath, HdfsLocatedFileStatus status) {
lb.getStorageTypes());
if (!diff.removeOverlap(true)) {
if (scheduleMoves4Block(diff, lb)) {
hasRemaining |= (diff.existing.size() > 1 &&
diff.expected.size() > 1);
result.updateHasRemaining(diff.existing.size() > 1
&& diff.expected.size() > 1);
// One block scheduled successfully, set noBlockMoved to false
result.setNoBlockMoved(false);
} else {
result.updateHasRemaining(true);
}
}
}
return hasRemaining;
}
boolean scheduleMoves4Block(StorageTypeDiff diff, LocatedBlock lb) {
@ -711,6 +709,45 @@ public int run(String[] args) throws Exception {
}
}
private static class Result {
private boolean hasRemaining;
private boolean noBlockMoved;
Result() {
hasRemaining = false;
noBlockMoved = true;
}
boolean isHasRemaining() {
return hasRemaining;
}
boolean isNoBlockMoved() {
return noBlockMoved;
}
void updateHasRemaining(boolean hasRemaining) {
this.hasRemaining |= hasRemaining;
}
void setNoBlockMoved(boolean noBlockMoved) {
this.noBlockMoved = noBlockMoved;
}
/**
* @return SUCCESS if all moves are success and there is no remaining move.
* Return NO_MOVE_BLOCK if there moves available but all the moves
* cannot be scheduled. Otherwise, return IN_PROGRESS since there
* must be some remaining moves.
*/
ExitStatus getExitStatus() {
return !isHasRemaining() ? ExitStatus.SUCCESS
: isNoBlockMoved() ? ExitStatus.NO_MOVE_BLOCK
: ExitStatus.IN_PROGRESS;
}
}
/**
* Run a Mover in command line.
*

View File

@ -328,6 +328,35 @@ public void testTwoReplicaSameStorageTypeShouldNotSelect() throws Exception {
}
}
@Test(timeout = 300000)
public void testMoveWhenStoragePolicyNotSatisfying() throws Exception {
// HDFS-8147
final Configuration conf = new HdfsConfiguration();
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(3)
.storageTypes(
new StorageType[][] { { StorageType.DISK }, { StorageType.DISK },
{ StorageType.DISK } }).build();
try {
cluster.waitActive();
final DistributedFileSystem dfs = cluster.getFileSystem();
final String file = "/testMoveWhenStoragePolicyNotSatisfying";
// write to DISK
final FSDataOutputStream out = dfs.create(new Path(file));
out.writeChars("testMoveWhenStoragePolicyNotSatisfying");
out.close();
// move to ARCHIVE
dfs.setStoragePolicy(new Path(file), "COLD");
int rc = ToolRunner.run(conf, new Mover.Cli(),
new String[] { "-p", file.toString() });
int exitcode = ExitStatus.NO_MOVE_BLOCK.getExitCode();
Assert.assertEquals("Exit code should be " + exitcode, exitcode, rc);
} finally {
cluster.shutdown();
}
}
@Test
public void testMoverFailedRetry() throws Exception {
// HDFS-8147

View File

@ -219,7 +219,7 @@ private void runBasicTest(boolean shutdown) throws Exception {
verify(true);
setStoragePolicy();
migrate();
migrate(ExitStatus.SUCCESS);
verify(true);
} finally {
if (shutdown) {
@ -250,8 +250,8 @@ void setStoragePolicy() throws Exception {
/**
* Run the migration tool.
*/
void migrate() throws Exception {
runMover();
void migrate(ExitStatus expectedExitCode) throws Exception {
runMover(expectedExitCode);
Thread.sleep(5000); // let the NN finish deletion
}
@ -267,14 +267,14 @@ void verify(boolean verifyAll) throws Exception {
}
}
private void runMover() throws Exception {
private void runMover(ExitStatus expectedExitCode) throws Exception {
Collection<URI> namenodes = DFSUtil.getNsServiceRpcUris(conf);
Map<URI, List<Path>> nnMap = Maps.newHashMap();
for (URI nn : namenodes) {
nnMap.put(nn, null);
}
int result = Mover.run(nnMap, conf);
Assert.assertEquals(ExitStatus.SUCCESS.getExitCode(), result);
Assert.assertEquals(expectedExitCode.getExitCode(), result);
}
private void verifyNamespace() throws Exception {
@ -555,7 +555,7 @@ public void testMigrateOpenFileToArchival() throws Exception {
try {
banner("start data migration");
test.setStoragePolicy(); // set /foo to COLD
test.migrate();
test.migrate(ExitStatus.SUCCESS);
// make sure the under construction block has not been migrated
LocatedBlocks lbs = test.dfs.getClient().getLocatedBlocks(
@ -605,7 +605,7 @@ public void testHotWarmColdDirs() throws Exception {
try {
test.runBasicTest(false);
pathPolicyMap.moveAround(test.dfs);
test.migrate();
test.migrate(ExitStatus.SUCCESS);
test.verify(true);
} finally {
@ -695,7 +695,7 @@ public void testNoSpaceDisk() throws Exception {
//test move a hot file to warm
final Path file1 = new Path(pathPolicyMap.hot, "file1");
test.dfs.rename(file1, pathPolicyMap.warm);
test.migrate();
test.migrate(ExitStatus.NO_MOVE_BLOCK);
test.verifyFile(new Path(pathPolicyMap.warm, "file1"), WARM.getId());
} finally {
test.shutdownCluster();
@ -753,7 +753,7 @@ public void testNoSpaceArchive() throws Exception {
{ //test move a cold file to warm
final Path file1 = new Path(pathPolicyMap.cold, "file1");
test.dfs.rename(file1, pathPolicyMap.warm);
test.migrate();
test.migrate(ExitStatus.SUCCESS);
test.verify(true);
}
} finally {