HDFS-13107. Add Mover Cli Unit Tests for Federated cluster. Contributed by Bharat Viswanadham

This commit is contained in:
Tsz-Wo Nicholas Sze 2018-02-06 16:58:30 -08:00
parent b14db95c35
commit 4fadcf1631

View File

@ -51,6 +51,7 @@ import java.util.Properties;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
@ -199,6 +200,214 @@ public class TestMover {
}
}
private void setupStoragePoliciesAndPaths(DistributedFileSystem dfs1,
DistributedFileSystem dfs2,
Path dir, String file)
throws Exception {
dfs1.mkdirs(dir);
dfs2.mkdirs(dir);
//Write to DISK on nn1
dfs1.setStoragePolicy(dir, "HOT");
FSDataOutputStream out = dfs1.create(new Path(file));
out.writeChars("testScheduleWithinSameNode");
out.close();
//Write to Archive on nn2
dfs2.setStoragePolicy(dir, "COLD");
out = dfs2.create(new Path(file));
out.writeChars("testScheduleWithinSameNode");
out.close();
//verify before movement
LocatedBlock lb = dfs1.getClient().getLocatedBlocks(file, 0).get(0);
StorageType[] storageTypes = lb.getStorageTypes();
for (StorageType storageType : storageTypes) {
Assert.assertTrue(StorageType.DISK == storageType);
}
//verify before movement
lb = dfs2.getClient().getLocatedBlocks(file, 0).get(0);
storageTypes = lb.getStorageTypes();
for (StorageType storageType : storageTypes) {
Assert.assertTrue(StorageType.ARCHIVE == storageType);
}
}
private void waitForLocatedBlockWithDiskStorageType(
final DistributedFileSystem dfs, final String file,
int expectedDiskCount) throws Exception {
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
LocatedBlock lb = null;
try {
lb = dfs.getClient().getLocatedBlocks(file, 0).get(0);
} catch (IOException e) {
LOG.error("Exception while getting located blocks", e);
return false;
}
int diskCount = 0;
for (StorageType storageType : lb.getStorageTypes()) {
if (StorageType.DISK == storageType) {
diskCount++;
}
}
LOG.info("Archive replica count, expected={} and actual={}",
expectedDiskCount, diskCount);
return expectedDiskCount == diskCount;
}
}, 100, 3000);
}
@Test(timeout = 300000)
public void testWithFederateClusterWithinSameNode() throws
Exception {
final Configuration conf = new HdfsConfiguration();
initConf(conf);
final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf)
.numDataNodes(4).storageTypes( new StorageType[] {StorageType.DISK,
StorageType.ARCHIVE}).nnTopology(MiniDFSNNTopology
.simpleFederatedTopology(2)).build();
DFSTestUtil.setFederatedConfiguration(cluster, conf);
try {
cluster.waitActive();
final String file = "/test/file";
Path dir = new Path ("/test");
final DistributedFileSystem dfs1 = cluster.getFileSystem(0);
final DistributedFileSystem dfs2 = cluster.getFileSystem(1);
URI nn1 = dfs1.getUri();
URI nn2 = dfs2.getUri();
setupStoragePoliciesAndPaths(dfs1, dfs2, dir, file);
// move to ARCHIVE
dfs1.setStoragePolicy(dir, "COLD");
int rc = ToolRunner.run(conf, new Mover.Cli(),
new String[] {"-p", nn1 + dir.toString()});
Assert.assertEquals("Movement to ARCHIVE should be successful", 0, rc);
//move to DISK
dfs2.setStoragePolicy(dir, "HOT");
rc = ToolRunner.run(conf, new Mover.Cli(),
new String[] {"-p", nn2 + dir.toString()});
Assert.assertEquals("Movement to DISK should be successful", 0, rc);
// Wait till namenode notified about the block location details
waitForLocatedBlockWithArchiveStorageType(dfs1, file, 3);
waitForLocatedBlockWithDiskStorageType(dfs2, file, 3);
} finally {
cluster.shutdown();
}
}
@Test(timeout = 300000)
public void testWithFederatedCluster() throws Exception{
final Configuration conf = new HdfsConfiguration();
initConf(conf);
final MiniDFSCluster cluster = new MiniDFSCluster
.Builder(conf)
.storageTypes(new StorageType[]{StorageType.DISK,
StorageType.ARCHIVE})
.nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2))
.numDataNodes(4).build();
DFSTestUtil.setFederatedConfiguration(cluster, conf);
try {
cluster.waitActive();
final String file = "/test/file";
Path dir = new Path ("/test");
final DistributedFileSystem dfs1 = cluster.getFileSystem(0);
final DistributedFileSystem dfs2 = cluster.getFileSystem(1);
URI nn1 = dfs1.getUri();
URI nn2 = dfs2.getUri();
setupStoragePoliciesAndPaths(dfs1, dfs2, dir, file);
//Changing storage policies
dfs1.setStoragePolicy(dir, "COLD");
dfs2.setStoragePolicy(dir, "HOT");
int rc = ToolRunner.run(conf, new Mover.Cli(),
new String[] {"-p", nn1 + dir.toString(), nn2 + dir.toString()});
Assert.assertEquals("Movement to DISK should be successful", 0, rc);
waitForLocatedBlockWithArchiveStorageType(dfs1, file, 3);
waitForLocatedBlockWithDiskStorageType(dfs2, file, 3);
} finally {
cluster.shutdown();
}
}
@Test(timeout = 300000)
public void testWithFederatedHACluster() throws Exception{
final Configuration conf = new HdfsConfiguration();
initConf(conf);
final MiniDFSCluster cluster = new MiniDFSCluster
.Builder(conf)
.storageTypes(new StorageType[]{StorageType.DISK,
StorageType.ARCHIVE})
.nnTopology(MiniDFSNNTopology.simpleHAFederatedTopology(2))
.numDataNodes(4).build();
DFSTestUtil.setFederatedHAConfiguration(cluster, conf);
try {
Collection<URI> namenodes = DFSUtil.getInternalNsRpcUris(conf);
Iterator<URI> iter = namenodes.iterator();
URI nn1 = iter.next();
URI nn2 = iter.next();
cluster.transitionToActive(0);
cluster.transitionToActive(2);
final String file = "/test/file";
Path dir = new Path ("/test");
final DistributedFileSystem dfs1 = (DistributedFileSystem) FileSystem
.get(nn1, conf);
final DistributedFileSystem dfs2 = (DistributedFileSystem) FileSystem
.get(nn2, conf);
setupStoragePoliciesAndPaths(dfs1, dfs2, dir, file);
//Changing Storage Policies
dfs1.setStoragePolicy(dir, "COLD");
dfs2.setStoragePolicy(dir, "HOT");
int rc = ToolRunner.run(conf, new Mover.Cli(),
new String[] {"-p", nn1 + dir.toString(), nn2 + dir.toString()});
Assert.assertEquals("Movement to DISK should be successful", 0, rc);
waitForLocatedBlockWithArchiveStorageType(dfs1, file, 3);
waitForLocatedBlockWithDiskStorageType(dfs2, file, 3);
} finally {
cluster.shutdown();
}
}
private void waitForLocatedBlockWithArchiveStorageType(
final DistributedFileSystem dfs, final String file,
int expectedArchiveCount) throws Exception {