diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
index 86af2cff15..cbae575b1a 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
@@ -87,6 +87,17 @@ public class FileOutputCommitter extends PathOutputCommitter {
   // default value to be 1 to keep consistent with previous behavior
   public static final int FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS_DEFAULT = 1;
 
+  // Whether tasks should delete their task temporary directories. This is
+  // purely an optimization for filesystems without O(1) recursive delete, as
+  // commitJob will recursively delete the entire job temporary directory.
+  // HDFS has O(1) recursive delete, so this parameter is left false by default.
+  // Users of object stores, for example, may want to set this to true. Note:
+  // this is only used if mapreduce.fileoutputcommitter.algorithm.version=2
+  public static final String FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED =
+      "mapreduce.fileoutputcommitter.task.cleanup.enabled";
+  public static final boolean
+      FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED_DEFAULT = false;
+
   private Path outputPath = null;
   private Path workPath = null;
   private final int algorithmVersion;
@@ -586,6 +597,17 @@ public void commitTask(TaskAttemptContext context, Path taskAttemptPath)
           mergePaths(fs, taskAttemptDirStatus, outputPath);
           LOG.info("Saved output of task '" + attemptId + "' to " +
               outputPath);
+
+          if (context.getConfiguration().getBoolean(
+              FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED,
+              FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED_DEFAULT)) {
+            LOG.debug(String.format(
+                "Deleting the temporary directory of '%s': '%s'",
+                attemptId, taskAttemptPath));
+            if(!fs.delete(taskAttemptPath, true)) {
+              LOG.warn("Could not delete " + taskAttemptPath);
+            }
+          }
         }
       } else {
         LOG.warn("No Output found for " + attemptId);
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
index 1e432cec5c..62f3dfa127 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/resources/mapred-default.xml
@@ -1494,6 +1494,17 @@
   </description>
 </property>
 
+<property>
+  <name>mapreduce.fileoutputcommitter.task.cleanup.enabled</name>
+  <value>false</value>
+  <description>Whether tasks should delete their task temporary directories. This is purely an
+    optimization for filesystems without O(1) recursive delete, as commitJob will recursively delete
+    the entire job temporary directory. HDFS has O(1) recursive delete, so this parameter is left
+    false by default. Users of object stores, for example, may want to set this to true.
+
+    Note: this is only used if mapreduce.fileoutputcommitter.algorithm.version=2
+  </description>
+</property>
 <property>
   <name>yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms</name>
   <value>1000</value>
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
index f72aa55144..cd9d44b936 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestFileOutputCommitter.java
@@ -255,13 +255,18 @@ else if (f.getPath().getName().equals(MapFile.DATA_FILE_NAME)) {
     assert(dataFileFound && indexFileFound);
   }
 
-  private void testCommitterInternal(int version) throws Exception {
+  private void testCommitterInternal(int version, boolean taskCleanup)
+      throws Exception {
     Job job = Job.getInstance();
     FileOutputFormat.setOutputPath(job, outDir);
     Configuration conf = job.getConfiguration();
     conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
-    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
+    conf.setInt(
+        FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
         version);
+    conf.setBoolean(
+        FileOutputCommitter.FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED,
+        taskCleanup);
     JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
     TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
     FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
@@ -275,9 +280,30 @@ private void testCommitterInternal(int version) throws Exception {
     RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
     writeOutput(theRecordWriter, tContext);
 
+    // check task and job temp directories exist
+    File jobOutputDir = new File(
+        new Path(outDir, FileOutputCommitter.PENDING_DIR_NAME).toString());
+    File taskOutputDir = new File(Path.getPathWithoutSchemeAndAuthority(
+        committer.getWorkPath()).toString());
+    assertTrue("job temp dir does not exist", jobOutputDir.exists());
+    assertTrue("task temp dir does not exist", taskOutputDir.exists());
+
     // do commit
     committer.commitTask(tContext);
+    assertTrue("job temp dir does not exist", jobOutputDir.exists());
+    if (version == 1 || taskCleanup) {
+      // Task temp dir gets renamed in v1 and deleted if taskCleanup is
+      // enabled in v2
+      assertFalse("task temp dir still exists", taskOutputDir.exists());
+    } else {
+      // By default, in v2 the task temp dir is only deleted during commitJob
+      assertTrue("task temp dir does not exist", taskOutputDir.exists());
+    }
+
+    // Entire job temp directory gets deleted, including task temp dir
     committer.commitJob(jContext);
+    assertFalse("job temp dir still exists", jobOutputDir.exists());
+    assertFalse("task temp dir still exists", taskOutputDir.exists());
 
     // validate output
     validateContent(outDir);
@@ -286,12 +312,17 @@ private void testCommitterInternal(int version) throws Exception {
 
   @Test
   public void testCommitterV1() throws Exception {
-    testCommitterInternal(1);
+    testCommitterInternal(1, false);
   }
 
   @Test
   public void testCommitterV2() throws Exception {
-    testCommitterInternal(2);
+    testCommitterInternal(2, false);
+  }
+
+  @Test
+  public void testCommitterV2TaskCleanupEnabled() throws Exception {
+    testCommitterInternal(2, true);
   }
 
   @Test
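
Illustrative usage (not part of the patch): a minimal driver sketch showing how a job writing to an object store might opt in to the new behavior. It assumes the patched FileOutputCommitter is on the classpath; the class name, job wiring, and argument handling are placeholders.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class TaskCleanupExampleDriver {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Task cleanup is only honored by the v2 commit algorithm.
    conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 2);
    // Delete each task attempt directory at commitTask time instead of
    // relying solely on commitJob's recursive delete of the job temp directory.
    conf.setBoolean(
        FileOutputCommitter.FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED, true);

    Job job = Job.getInstance(conf, "task-cleanup-example");
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));
    // Set mapper/reducer classes as usual, then submit.
    System.exit(job.waitForCompletion(true) ? 0 : 1);
  }
}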