diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java index 461da07c7e..36e7bb9840 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestMover.java @@ -51,6 +51,7 @@ import java.util.concurrent.atomic.AtomicInteger; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; @@ -199,6 +200,214 @@ private void testWithinSameNode(Configuration conf) throws Exception { } } + private void setupStoragePoliciesAndPaths(DistributedFileSystem dfs1, + DistributedFileSystem dfs2, + Path dir, String file) + throws Exception { + + dfs1.mkdirs(dir); + dfs2.mkdirs(dir); + + //Write to DISK on nn1 + dfs1.setStoragePolicy(dir, "HOT"); + FSDataOutputStream out = dfs1.create(new Path(file)); + out.writeChars("testScheduleWithinSameNode"); + out.close(); + + //Write to Archive on nn2 + dfs2.setStoragePolicy(dir, "COLD"); + out = dfs2.create(new Path(file)); + out.writeChars("testScheduleWithinSameNode"); + out.close(); + + //verify before movement + LocatedBlock lb = dfs1.getClient().getLocatedBlocks(file, 0).get(0); + StorageType[] storageTypes = lb.getStorageTypes(); + for (StorageType storageType : storageTypes) { + Assert.assertTrue(StorageType.DISK == storageType); + } + + //verify before movement + lb = dfs2.getClient().getLocatedBlocks(file, 0).get(0); + storageTypes = lb.getStorageTypes(); + for (StorageType storageType : storageTypes) { + Assert.assertTrue(StorageType.ARCHIVE == storageType); + } + } + + private void waitForLocatedBlockWithDiskStorageType( + final DistributedFileSystem dfs, final String file, + int expectedDiskCount) throws Exception { + GenericTestUtils.waitFor(new Supplier() { + @Override + public Boolean get() { + LocatedBlock lb = null; + try { + lb = dfs.getClient().getLocatedBlocks(file, 0).get(0); + } catch (IOException e) { + LOG.error("Exception while getting located blocks", e); + return false; + } + int diskCount = 0; + for (StorageType storageType : lb.getStorageTypes()) { + if (StorageType.DISK == storageType) { + diskCount++; + } + } + LOG.info("Archive replica count, expected={} and actual={}", + expectedDiskCount, diskCount); + return expectedDiskCount == diskCount; + } + }, 100, 3000); + } + + @Test(timeout = 300000) + public void testWithFederateClusterWithinSameNode() throws + Exception { + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + + final MiniDFSCluster cluster = new MiniDFSCluster.Builder(conf) + .numDataNodes(4).storageTypes( new StorageType[] {StorageType.DISK, + StorageType.ARCHIVE}).nnTopology(MiniDFSNNTopology + .simpleFederatedTopology(2)).build(); + DFSTestUtil.setFederatedConfiguration(cluster, conf); + + try { + cluster.waitActive(); + + final String file = "/test/file"; + Path dir = new Path ("/test"); + + final DistributedFileSystem dfs1 = cluster.getFileSystem(0); + final DistributedFileSystem dfs2 = cluster.getFileSystem(1); + + URI nn1 = dfs1.getUri(); + URI nn2 = dfs2.getUri(); + + setupStoragePoliciesAndPaths(dfs1, dfs2, dir, file); + + + // move to ARCHIVE + dfs1.setStoragePolicy(dir, "COLD"); + int rc = ToolRunner.run(conf, new Mover.Cli(), + new String[] {"-p", nn1 + dir.toString()}); + Assert.assertEquals("Movement to ARCHIVE should be successful", 0, rc); + + + //move to DISK + dfs2.setStoragePolicy(dir, "HOT"); + rc = ToolRunner.run(conf, new Mover.Cli(), + new String[] {"-p", nn2 + dir.toString()}); + Assert.assertEquals("Movement to DISK should be successful", 0, rc); + + + // Wait till namenode notified about the block location details + waitForLocatedBlockWithArchiveStorageType(dfs1, file, 3); + waitForLocatedBlockWithDiskStorageType(dfs2, file, 3); + + } finally { + cluster.shutdown(); + } + } + + @Test(timeout = 300000) + public void testWithFederatedCluster() throws Exception{ + + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + final MiniDFSCluster cluster = new MiniDFSCluster + .Builder(conf) + .storageTypes(new StorageType[]{StorageType.DISK, + StorageType.ARCHIVE}) + .nnTopology(MiniDFSNNTopology.simpleFederatedTopology(2)) + .numDataNodes(4).build(); + DFSTestUtil.setFederatedConfiguration(cluster, conf); + try { + cluster.waitActive(); + + final String file = "/test/file"; + Path dir = new Path ("/test"); + + final DistributedFileSystem dfs1 = cluster.getFileSystem(0); + final DistributedFileSystem dfs2 = cluster.getFileSystem(1); + + URI nn1 = dfs1.getUri(); + URI nn2 = dfs2.getUri(); + + setupStoragePoliciesAndPaths(dfs1, dfs2, dir, file); + + //Changing storage policies + dfs1.setStoragePolicy(dir, "COLD"); + dfs2.setStoragePolicy(dir, "HOT"); + + int rc = ToolRunner.run(conf, new Mover.Cli(), + new String[] {"-p", nn1 + dir.toString(), nn2 + dir.toString()}); + + Assert.assertEquals("Movement to DISK should be successful", 0, rc); + + waitForLocatedBlockWithArchiveStorageType(dfs1, file, 3); + waitForLocatedBlockWithDiskStorageType(dfs2, file, 3); + + } finally { + cluster.shutdown(); + } + + } + + @Test(timeout = 300000) + public void testWithFederatedHACluster() throws Exception{ + + final Configuration conf = new HdfsConfiguration(); + initConf(conf); + final MiniDFSCluster cluster = new MiniDFSCluster + .Builder(conf) + .storageTypes(new StorageType[]{StorageType.DISK, + StorageType.ARCHIVE}) + .nnTopology(MiniDFSNNTopology.simpleHAFederatedTopology(2)) + .numDataNodes(4).build(); + DFSTestUtil.setFederatedHAConfiguration(cluster, conf); + + + try { + + Collection namenodes = DFSUtil.getInternalNsRpcUris(conf); + + Iterator iter = namenodes.iterator(); + URI nn1 = iter.next(); + URI nn2 = iter.next(); + + cluster.transitionToActive(0); + cluster.transitionToActive(2); + + final String file = "/test/file"; + Path dir = new Path ("/test"); + + final DistributedFileSystem dfs1 = (DistributedFileSystem) FileSystem + .get(nn1, conf); + + final DistributedFileSystem dfs2 = (DistributedFileSystem) FileSystem + .get(nn2, conf); + + setupStoragePoliciesAndPaths(dfs1, dfs2, dir, file); + + //Changing Storage Policies + dfs1.setStoragePolicy(dir, "COLD"); + dfs2.setStoragePolicy(dir, "HOT"); + + int rc = ToolRunner.run(conf, new Mover.Cli(), + new String[] {"-p", nn1 + dir.toString(), nn2 + dir.toString()}); + + Assert.assertEquals("Movement to DISK should be successful", 0, rc); + + waitForLocatedBlockWithArchiveStorageType(dfs1, file, 3); + waitForLocatedBlockWithDiskStorageType(dfs2, file, 3); + } finally { + cluster.shutdown(); + } + + } + private void waitForLocatedBlockWithArchiveStorageType( final DistributedFileSystem dfs, final String file, int expectedArchiveCount) throws Exception {