diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java index b86f5ea414..d7a730dfca 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java @@ -109,13 +109,6 @@ public void commitJob(JobContext jobContext) throws IOException { cleanupTempFiles(jobContext); - String attributes = conf.get(DistCpConstants.CONF_LABEL_PRESERVE_STATUS); - final boolean preserveRawXattrs = - conf.getBoolean(DistCpConstants.CONF_LABEL_PRESERVE_RAWXATTRS, false); - if ((attributes != null && !attributes.isEmpty()) || preserveRawXattrs) { - preserveFileAttributesForDirectories(conf); - } - try { if (conf.getBoolean(DistCpConstants.CONF_LABEL_DELETE_MISSING, false)) { deleteMissing(conf); @@ -125,6 +118,13 @@ public void commitJob(JobContext jobContext) throws IOException { // save missing information to a directory trackMissing(conf); } + // for HDFS-14621, should preserve status after -delete + String attributes = conf.get(DistCpConstants.CONF_LABEL_PRESERVE_STATUS); + final boolean preserveRawXattrs = conf.getBoolean( + DistCpConstants.CONF_LABEL_PRESERVE_RAWXATTRS, false); + if ((attributes != null && !attributes.isEmpty()) || preserveRawXattrs) { + preserveFileAttributesForDirectories(conf); + } taskAttemptContext.setStatus("Commit Successful"); } finally { diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java index 912205f24b..2ef89e5804 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java @@ -26,11 +26,15 @@ import org.apache.hadoop.fs.Path; import org.apache.hadoop.fs.permission.FsPermission; import org.apache.hadoop.hdfs.MiniDFSCluster; +import org.apache.hadoop.io.IOUtils; +import org.apache.hadoop.io.SequenceFile; +import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.*; import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl; import org.apache.hadoop.mapreduce.task.JobContextImpl; import org.apache.hadoop.mapreduce.lib.output.NullOutputFormat; import org.apache.hadoop.tools.CopyListing; +import org.apache.hadoop.tools.CopyListingFileStatus; import org.apache.hadoop.tools.DistCpConstants; import org.apache.hadoop.tools.DistCpContext; import org.apache.hadoop.tools.DistCpOptions; @@ -204,6 +208,61 @@ public void testDeleteMissing() throws IOException { } } + // for HDFS-14621, should preserve times after -delete + @Test + public void testPreserveTimeWithDeleteMiss() throws IOException { + TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config); + JobContext jobContext = new JobContextImpl( + taskAttemptContext.getConfiguration(), + taskAttemptContext.getTaskAttemptID().getJobID()); + Configuration conf = jobContext.getConfiguration(); + + FileSystem fs = null; + try { + OutputCommitter committer = new CopyCommitter(null, taskAttemptContext); + fs = FileSystem.get(conf); + String sourceBase = TestDistCpUtils.createTestSetup( + fs, FsPermission.getDefault()); + String targetBase = TestDistCpUtils.createTestSetup( + fs, FsPermission.getDefault()); + String targetBaseAdd = TestDistCpUtils.createTestSetup( + fs, FsPermission.getDefault()); + fs.rename(new Path(targetBaseAdd), new Path(targetBase)); + + final DistCpOptions options = new DistCpOptions.Builder( + Collections.singletonList(new Path(sourceBase)), new Path("/out")) + .withSyncFolder(true).withDeleteMissing(true) + .preserve(FileAttribute.TIMES).build(); + options.appendToConf(conf); + final DistCpContext context = new DistCpContext(options); + + CopyListing listing = new GlobbedCopyListing(conf, CREDENTIALS); + Path listingFile = new Path("/tmp1/" + String.valueOf(rand.nextLong())); + listing.buildListing(listingFile, context); + + conf.set(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH, targetBase); + conf.set(DistCpConstants.CONF_LABEL_TARGET_FINAL_PATH, targetBase); + + Path sourceListing = new Path( + conf.get(DistCpConstants.CONF_LABEL_LISTING_FILE_PATH)); + SequenceFile.Reader sourceReader = new SequenceFile.Reader(conf, + SequenceFile.Reader.file(sourceListing)); + Path targetRoot = new Path(targetBase); + + committer.commitJob(jobContext); + checkDirectoryTimes(fs, sourceReader, targetRoot); + + //Test for idempotent commit + committer.commitJob(jobContext); + checkDirectoryTimes(fs, sourceReader, targetRoot); + } finally { + TestDistCpUtils.delete(fs, "/tmp1"); + conf.unset(DistCpConstants.CONF_LABEL_PRESERVE_STATUS); + conf.set(DistCpConstants.CONF_LABEL_DELETE_MISSING, "false"); + } + } + + @Test public void testDeleteMissingFlatInterleavedFiles() throws IOException { TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config); @@ -364,6 +423,27 @@ private void checkDirectoryPermissions(FileSystem fs, String targetBase, } } + private void checkDirectoryTimes( + FileSystem fs, SequenceFile.Reader sourceReader, Path targetRoot) + throws IOException { + try { + CopyListingFileStatus srcFileStatus = new CopyListingFileStatus(); + Text srcRelPath = new Text(); + + // Iterate over every source path that was copied. + while (sourceReader.next(srcRelPath, srcFileStatus)) { + Path targetFile = new Path(targetRoot.toString() + "/" + srcRelPath); + FileStatus targetStatus = fs.getFileStatus(targetFile); + Assert.assertEquals(srcFileStatus.getModificationTime(), + targetStatus.getModificationTime()); + Assert.assertEquals(srcFileStatus.getAccessTime(), + targetStatus.getAccessTime()); + } + } finally { + IOUtils.closeStream(sourceReader); + } + } + private static class NullInputFormat extends InputFormat { @Override public List getSplits(JobContext context)