From 2689b6ca727fff8a13347b811eb4cf79b9d30f48 Mon Sep 17 00:00:00 2001 From: Jing Zhao Date: Mon, 15 Sep 2014 10:16:56 -0700 Subject: [PATCH] HDFS-7062. Archival Storage: skip under construction block for migration. Contributed by Jing Zhao. --- .../hadoop/hdfs/server/mover/Mover.java | 10 ++- .../apache/hadoop/hdfs/tools/DFSAdmin.java | 4 +- .../hdfs/server/mover/TestStorageMover.java | 77 +++++++++++++++++++ 3 files changed, 88 insertions(+), 3 deletions(-) diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java index 96588ffb19..e336ebc26b 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/mover/Mover.java @@ -321,7 +321,14 @@ private boolean processFile(HdfsLocatedFileStatus status) { final LocatedBlocks locatedBlocks = status.getBlockLocations(); boolean hasRemaining = false; - for(LocatedBlock lb : locatedBlocks.getLocatedBlocks()) { + final boolean lastBlkComplete = locatedBlocks.isLastBlockComplete(); + List lbs = locatedBlocks.getLocatedBlocks(); + for(int i = 0; i < lbs.size(); i++) { + if (i == lbs.size() - 1 && !lastBlkComplete) { + // last block is incomplete, skip it + continue; + } + LocatedBlock lb = lbs.get(i); final StorageTypeDiff diff = new StorageTypeDiff(types, lb.getStorageTypes()); if (!diff.removeOverlap()) { @@ -472,6 +479,7 @@ static int run(Map> namenodes, Configuration conf) final ExitStatus r = m.run(); if (r == ExitStatus.SUCCESS) { + IOUtils.cleanup(LOG, nnc); iter.remove(); } else if (r != ExitStatus.IN_PROGRESS) { // must be an error statue, return diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java index 10012c68d1..556eca677a 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/tools/DFSAdmin.java @@ -387,8 +387,8 @@ static int run(DistributedFileSystem dfs, String[] argv, int idx) throws IOExcep "\t[-shutdownDatanode [upgrade]]\n" + "\t[-getDatanodeInfo ]\n" + "\t[-metasave filename]\n" + - "\t[-setStoragePolicy path policyName\n" + - "\t[-getStoragePolicy path\n" + + "\t[-setStoragePolicy path policyName]\n" + + "\t[-getStoragePolicy path]\n" + "\t[-help [cmd]]\n"; /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java index d5d5cab7e8..ceedfc2881 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/mover/TestStorageMover.java @@ -30,9 +30,13 @@ import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.BlockLocation; +import org.apache.hadoop.fs.FSDataInputStream; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.BlockStoragePolicy; import org.apache.hadoop.hdfs.DFSConfigKeys; +import org.apache.hadoop.hdfs.DFSOutputStream; import org.apache.hadoop.hdfs.DFSTestUtil; import org.apache.hadoop.hdfs.DFSUtil; import org.apache.hadoop.hdfs.DistributedFileSystem; @@ -44,6 +48,7 @@ import org.apache.hadoop.hdfs.protocol.HdfsFileStatus; import org.apache.hadoop.hdfs.protocol.HdfsLocatedFileStatus; import org.apache.hadoop.hdfs.protocol.LocatedBlock; +import org.apache.hadoop.hdfs.protocol.LocatedBlocks; import org.apache.hadoop.hdfs.protocol.datatransfer.DataTransferProtocol; import org.apache.hadoop.hdfs.server.balancer.Dispatcher; import org.apache.hadoop.hdfs.server.balancer.ExitStatus; @@ -489,6 +494,78 @@ public void testMigrateFileToArchival() throws Exception { new MigrationTest(clusterScheme, nsScheme).runBasicTest(true); } + /** + * Print a big banner in the test log to make debug easier. + */ + static void banner(String string) { + LOG.info("\n\n\n\n================================================\n" + + string + "\n" + + "==================================================\n\n"); + } + + /** + * Move an open file into archival storage + */ + @Test + public void testMigrateOpenFileToArchival() throws Exception { + LOG.info("testMigrateOpenFileToArchival"); + final Path fooDir = new Path("/foo"); + Map policyMap = Maps.newHashMap(); + policyMap.put(fooDir, COLD); + NamespaceScheme nsScheme = new NamespaceScheme(Arrays.asList(fooDir), null, + BLOCK_SIZE, null, policyMap); + ClusterScheme clusterScheme = new ClusterScheme(DEFAULT_CONF, + NUM_DATANODES, REPL, genStorageTypes(NUM_DATANODES), null); + MigrationTest test = new MigrationTest(clusterScheme, nsScheme); + test.setupCluster(); + + // create an open file + banner("writing to file /foo/bar"); + final Path barFile = new Path(fooDir, "bar"); + DFSTestUtil.createFile(test.dfs, barFile, BLOCK_SIZE, (short) 1, 0L); + FSDataOutputStream out = test.dfs.append(barFile); + out.writeBytes("hello, "); + ((DFSOutputStream) out.getWrappedStream()).hsync(); + + try { + banner("start data migration"); + test.setStoragePolicy(); // set /foo to COLD + test.migrate(); + + // make sure the under construction block has not been migrated + LocatedBlocks lbs = test.dfs.getClient().getLocatedBlocks( + barFile.toString(), BLOCK_SIZE); + LOG.info("Locations: " + lbs); + List blks = lbs.getLocatedBlocks(); + Assert.assertEquals(1, blks.size()); + Assert.assertEquals(1, blks.get(0).getLocations().length); + + banner("finish the migration, continue writing"); + // make sure the writing can continue + out.writeBytes("world!"); + ((DFSOutputStream) out.getWrappedStream()).hsync(); + IOUtils.cleanup(LOG, out); + + lbs = test.dfs.getClient().getLocatedBlocks( + barFile.toString(), BLOCK_SIZE); + LOG.info("Locations: " + lbs); + blks = lbs.getLocatedBlocks(); + Assert.assertEquals(1, blks.size()); + Assert.assertEquals(1, blks.get(0).getLocations().length); + + banner("finish writing, starting reading"); + // check the content of /foo/bar + FSDataInputStream in = test.dfs.open(barFile); + byte[] buf = new byte[13]; + // read from offset 1024 + in.readFully(BLOCK_SIZE, buf, 0, buf.length); + IOUtils.cleanup(LOG, in); + Assert.assertEquals("hello, world!", new String(buf)); + } finally { + test.shutdownCluster(); + } + } + /** * Test directories with Hot, Warm and Cold polices. */