diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java index 139bd08fd7..e346d0b938 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java @@ -318,8 +318,10 @@ private void preserveFileAttributesForDirectories(Configuration conf) SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf, SequenceFile.Reader.file(sourceListing)); long totalLen = clusterFS.getFileStatus(sourceListing).getLen(); - - Path targetRoot = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH)); + // For Atomic Copy the Final & Work Path are different & atomic copy has + // already moved it to final path. + Path targetRoot = + new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH)); long preservedEntries = 0; try { diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java index 11118c1f72..685f030e15 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java @@ -53,6 +53,8 @@ import java.util.*; import static org.apache.hadoop.fs.contract.ContractTestUtils.*; +import static org.apache.hadoop.tools.DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH; +import static org.apache.hadoop.tools.DistCpConstants.CONF_LABEL_TARGET_WORK_PATH; import static org.apache.hadoop.tools.util.TestDistCpUtils.*; public class TestCopyCommitter { @@ -160,10 +162,10 @@ public void testPreserveStatus() throws IOException { context.setTargetPathExists(false); CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS); - Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong())); + Path listingFile = new Path("/tmp1/" + rand.nextLong()); listing.buildListing(listingFile, context); - conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase); + conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase); committer.commitJob(jobContext); checkDirectoryPermissions(fs, targetBase, sourcePerm); @@ -179,6 +181,45 @@ public void testPreserveStatus() throws IOException { } + @Test + public void testPreserveStatusWithAtomicCommit() throws IOException { + TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config); + JobContext jobContext = new JobContextImpl( + taskAttemptContext.getConfiguration(), + taskAttemptContext.getTaskAttemptID().getJobID()); + Configuration conf = jobContext.getConfiguration(); + String sourceBase; + String workBase; + String targetBase; + FileSystem fs = null; + try { + OutputCommitter committer = new CopyCommitter(null, taskAttemptContext); + fs = FileSystem.get(conf); + FsPermission sourcePerm = new FsPermission((short) 511); + FsPermission initialPerm = new FsPermission((short) 448); + sourceBase = TestDistCpUtils.createTestSetup(fs, sourcePerm); + workBase = TestDistCpUtils.createTestSetup(fs, initialPerm); + targetBase = "/tmp1/" + rand.nextLong(); + final DistCpOptions options = new DistCpOptions.Builder( + Collections.singletonList(new Path(sourceBase)), new Path("/out")) + .preserve(FileAttribute.PERMISSION).build(); + options.appendToConf(conf); + final DistCpContext context = new DistCpContext(options); + context.setTargetPathExists(false); + CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS); + Path listingFile = new Path("/tmp1/" + rand.nextLong()); + listing.buildListing(listingFile, context); + conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase); + conf.set(CONF_LABEL_TARGET_WORK_PATH, workBase); + conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true); + committer.commitJob(jobContext); + checkDirectoryPermissions(fs, targetBase, sourcePerm); + } finally { + TestDistCpUtils.delete(fs, "/tmp1"); + conf.unset(DistCpConstants.CONF_LABEL_PRESERVE_STATUS); + } + } + @Test public void testDeleteMissing() throws IOException { TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config); @@ -207,8 +248,8 @@ public void testDeleteMissing() throws IOException { Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong())); listing.buildListing(listingFile, context); - conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase); - conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase); + conf.set(CONF_LABEL_TARGET_WORK_PATH, targetBase); + conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase); committer.commitJob(jobContext); verifyFoldersAreInSync(fs, targetBase, sourceBase); @@ -256,8 +297,8 @@ public void testPreserveTimeWithDeleteMiss() throws IOException { Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong())); listing.buildListing(listingFile, context); - conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase); - conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase); + conf.set(CONF_LABEL_TARGET_WORK_PATH, targetBase); + conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase); Path sourceListing = new Path( conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH)); @@ -320,8 +361,8 @@ public void testDeleteMissingFlatInterleavedFiles() throws IOException { Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong())); listing.buildListing(listingFile, context); - conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase); - conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase); + conf.set(CONF_LABEL_TARGET_WORK_PATH, targetBase); + conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase); committer.commitJob(jobContext); verifyFoldersAreInSync(fs, targetBase, sourceBase); @@ -353,8 +394,8 @@ public void testAtomicCommitMissingFinal() throws IOException { fs = FileSystem.get(conf); fs.mkdirs(new Path(workPath)); - conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, workPath); - conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, finalPath); + conf.set(CONF_LABEL_TARGET_WORK_PATH, workPath); + conf.set(CONF_LABEL_TARGET_FINAL_PATH, finalPath); conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true); assertPathExists(fs, "Work path", new Path(workPath)); @@ -391,8 +432,8 @@ public void testAtomicCommitExistingFinal() throws IOException { fs.mkdirs(new Path(workPath)); fs.mkdirs(new Path(finalPath)); - conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, workPath); - conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, finalPath); + conf.set(CONF_LABEL_TARGET_WORK_PATH, workPath); + conf.set(CONF_LABEL_TARGET_FINAL_PATH, finalPath); conf.setBoolean(DistCpConstants.CONF_LABEL_ATOMIC_COPY, true); assertPathExists(fs, "Work path", new Path(workPath)); @@ -463,8 +504,8 @@ private void testCommitWithChecksumMismatch(boolean skipCrc) + String.valueOf(rand.nextLong())); listing.buildListing(listingFile, context); - conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase); - conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase); + conf.set(CONF_LABEL_TARGET_WORK_PATH, targetBase); + conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase); OutputCommitter committer = new CopyCommitter( null, taskAttemptContext);