diff --git a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java index 2272781f72..e5c74094e9 100644 --- a/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/main/java/org/apache/hadoop/tools/mapred/CopyCommitter.java @@ -149,9 +149,18 @@ public void abortJob(JobContext jobContext, } private void cleanupTempFiles(JobContext context) { - try { - Configuration conf = context.getConfiguration(); + Configuration conf = context.getConfiguration(); + final boolean directWrite = conf.getBoolean( + DistCpOptionSwitch.DIRECT_WRITE.getConfigLabel(), false); + final boolean append = conf.getBoolean( + DistCpOptionSwitch.APPEND.getConfigLabel(), false); + final boolean useTempTarget = !append && !directWrite; + if (!useTempTarget) { + return; + } + + try { Path targetWorkPath = new Path(conf.get(DistCpConstants.CONF_LABEL_TARGET_WORK_PATH)); FileSystem targetFS = targetWorkPath.getFileSystem(conf); diff --git a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java index 599f3ec2db..bda80a3d25 100644 --- a/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java +++ b/hadoop-tools/hadoop-distcp/src/test/java/org/apache/hadoop/tools/mapred/TestCopyCommitter.java @@ -18,6 +18,7 @@ package org.apache.hadoop.tools.mapred; +import org.apache.hadoop.fs.contract.ContractTestUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -580,6 +581,76 @@ fs, new Path(sourceBase + srcFilename), null, } } + @Test + public void testCommitWithCleanupTempFiles() throws IOException { + testCommitWithCleanup(true, false); + testCommitWithCleanup(false, true); + testCommitWithCleanup(true, true); + testCommitWithCleanup(false, false); + } + + private void testCommitWithCleanup(boolean append, boolean directWrite)throws IOException { + TaskAttemptContext taskAttemptContext = getTaskAttemptContext(config); + JobID jobID = taskAttemptContext.getTaskAttemptID().getJobID(); + JobContext jobContext = new JobContextImpl( + taskAttemptContext.getConfiguration(), + jobID); + Configuration conf = jobContext.getConfiguration(); + + String sourceBase; + String targetBase; + FileSystem fs = null; + try { + fs = FileSystem.get(conf); + sourceBase = "/tmp1/" + rand.nextLong(); + targetBase = "/tmp1/" + rand.nextLong(); + + DistCpOptions options = new DistCpOptions.Builder( + Collections.singletonList(new Path(sourceBase)), + new Path("/out")) + .withAppend(append) + .withSyncFolder(true) + .withDirectWrite(directWrite) + .build(); + options.appendToConf(conf); + + DistCpContext context = new DistCpContext(options); + context.setTargetPathExists(false); + + + conf.set(CONF_LABEL_TARGET_WORK_PATH, targetBase); + conf.set(CONF_LABEL_TARGET_FINAL_PATH, targetBase); + + Path tempFilePath = getTempFile(targetBase, taskAttemptContext); + createDirectory(fs, tempFilePath); + + OutputCommitter committer = new CopyCommitter( + null, taskAttemptContext); + committer.commitJob(jobContext); + + if (append || directWrite) { + ContractTestUtils.assertPathExists(fs, "Temp files should not be cleanup with append or direct option", + tempFilePath); + } else { + ContractTestUtils.assertPathDoesNotExist( + fs, + "Temp files should be clean up without append or direct option", + tempFilePath); + } + } finally { + TestDistCpUtils.delete(fs, "/tmp1"); + TestDistCpUtils.delete(fs, "/meta"); + } + } + + private Path getTempFile(String targetWorkPath, TaskAttemptContext taskAttemptContext) { + Path tempFile = new Path(targetWorkPath, ".distcp.tmp." + + taskAttemptContext.getTaskAttemptID().toString() + + "." + System.currentTimeMillis()); + LOG.info("Creating temp file: {}", tempFile); + return tempFile; + } + /** * Create a source file and its DistCp working files with different checksum * to test the checksum validation for copying blocks in parallel.