HDFS-7027. Archival Storage: Mover does not terminate when some storage type is out of space. Contributed by Tsz Wo Nicholas Sze.

This commit is contained in:
Jing Zhao 2014-09-08 14:21:26 -07:00
parent 2b5c528a73
commit 74a7e227c8
2 changed files with 22 additions and 12 deletions

View File

@ -329,7 +329,7 @@ private boolean processFile(HdfsLocatedFileStatus status) {
hasRemaining |= (diff.existing.size() > 1 && hasRemaining |= (diff.existing.size() > 1 &&
diff.expected.size() > 1); diff.expected.size() > 1);
} else { } else {
hasRemaining = true; hasRemaining = false; // not able to schedule any move
} }
} }
} }

View File

@ -280,13 +280,27 @@ private void verifyRecursively(final Path parent,
} }
} }
void verifyFile(final Path file, final Byte expectedPolicyId)
throws Exception {
final Path parent = file.getParent();
DirectoryListing children = dfs.getClient().listPaths(
parent.toString(), HdfsFileStatus.EMPTY_NAME, true);
for (HdfsFileStatus child : children.getPartialListing()) {
if (child.getLocalName().equals(file.getName())) {
verifyFile(parent, child, expectedPolicyId);
return;
}
}
Assert.fail("File " + file + " not found.");
}
private void verifyFile(final Path parent, final HdfsFileStatus status, private void verifyFile(final Path parent, final HdfsFileStatus status,
final Byte expectedPolicyId) throws Exception { final Byte expectedPolicyId) throws Exception {
HdfsLocatedFileStatus fileStatus = (HdfsLocatedFileStatus) status; HdfsLocatedFileStatus fileStatus = (HdfsLocatedFileStatus) status;
byte policyId = fileStatus.getStoragePolicy(); byte policyId = fileStatus.getStoragePolicy();
BlockStoragePolicy policy = policies.getPolicy(policyId); BlockStoragePolicy policy = policies.getPolicy(policyId);
if (expectedPolicyId != null) { if (expectedPolicyId != null) {
Assert.assertEquals(expectedPolicyId, policy); Assert.assertEquals((byte)expectedPolicyId, policy.getId());
} }
final List<StorageType> types = policy.chooseStorageTypes( final List<StorageType> types = policy.chooseStorageTypes(
status.getReplication()); status.getReplication());
@ -484,7 +498,7 @@ public void testNoSpaceDisk() throws Exception {
final PathPolicyMap pathPolicyMap = new PathPolicyMap(0); final PathPolicyMap pathPolicyMap = new PathPolicyMap(0);
final NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme(); final NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme();
final long diskCapacity = (3 + HdfsConstants.MIN_BLOCKS_FOR_WRITE)*BLOCK_SIZE; final long diskCapacity = (10 + HdfsConstants.MIN_BLOCKS_FOR_WRITE)*BLOCK_SIZE;
final long archiveCapacity = 100*BLOCK_SIZE; final long archiveCapacity = 100*BLOCK_SIZE;
final long[][] capacities = genCapacities(NUM_DATANODES, 1, 1, final long[][] capacities = genCapacities(NUM_DATANODES, 1, 1,
diskCapacity, archiveCapacity); diskCapacity, archiveCapacity);
@ -526,10 +540,8 @@ public void testNoSpaceDisk() throws Exception {
// new replicas should be stored in ARCHIVE as a fallback storage. // new replicas should be stored in ARCHIVE as a fallback storage.
final Path file0 = new Path(pathPolicyMap.hot, "file0"); final Path file0 = new Path(pathPolicyMap.hot, "file0");
final Replication r = test.getReplication(file0); final Replication r = test.getReplication(file0);
LOG.info("XXX " + file0 + ": replication=" + r);
final short newReplication = (short)5; final short newReplication = (short)5;
test.dfs.setReplication(file0, newReplication); test.dfs.setReplication(file0, newReplication);
// DFSTestUtil.waitReplication(test.dfs, file0, newReplication);
Thread.sleep(10000); Thread.sleep(10000);
test.verifyReplication(file0, r.disk, newReplication - r.disk); test.verifyReplication(file0, r.disk, newReplication - r.disk);
} }
@ -541,17 +553,15 @@ public void testNoSpaceDisk() throws Exception {
final short newReplication = 5; final short newReplication = 5;
test.dfs.setReplication(p, newReplication); test.dfs.setReplication(p, newReplication);
// DFSTestUtil.waitReplication(test.dfs, p, newReplication);
Thread.sleep(10000); Thread.sleep(10000);
test.verifyReplication(p, 0, newReplication); test.verifyReplication(p, 0, newReplication);
} }
{ //test move a hot file to warm { //test move a hot file to warm
//TODO: fix Mover not terminate in the test below final Path file1 = new Path(pathPolicyMap.hot, "file1");
// final Path file1 = new Path(pathPolicyMap.hot, "file1"); test.dfs.rename(file1, pathPolicyMap.warm);
// test.dfs.rename(file1, pathPolicyMap.warm); test.migrate();
// test.migrate(); test.verifyFile(new Path(pathPolicyMap.warm, "file1"), WARM.getId());;
// test.verify(true);
} }
test.shutdownCluster(); test.shutdownCluster();
@ -566,7 +576,7 @@ public void testNoSpaceArchive() throws Exception {
final NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme(); final NamespaceScheme nsScheme = pathPolicyMap.newNamespaceScheme();
final long diskCapacity = 100*BLOCK_SIZE; final long diskCapacity = 100*BLOCK_SIZE;
final long archiveCapacity = (2 + HdfsConstants.MIN_BLOCKS_FOR_WRITE)*BLOCK_SIZE; final long archiveCapacity = (10 + HdfsConstants.MIN_BLOCKS_FOR_WRITE)*BLOCK_SIZE;
final long[][] capacities = genCapacities(NUM_DATANODES, 1, 1, final long[][] capacities = genCapacities(NUM_DATANODES, 1, 1,
diskCapacity, archiveCapacity); diskCapacity, archiveCapacity);
final ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF, final ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF,