MAPREDUCE-7029. FileOutputCommitter is slow on filesystems lacking recursive delete. Contributed by Karthik Palaniappan

Jason Lowe 2018-01-17 08:14:11 -06:00
parent 09efdfe9e1
commit 6e42d05829
3 changed files with 68 additions and 4 deletions


@@ -87,6 +87,17 @@ public class FileOutputCommitter extends PathOutputCommitter {
// default value to be 1 to keep consistent with previous behavior
public static final int FILEOUTPUTCOMMITTER_FAILURE_ATTEMPTS_DEFAULT = 1;
// Whether tasks should delete their task temporary directories. This is
// purely an optimization for filesystems without O(1) recursive delete, as
// commitJob will recursively delete the entire job temporary directory.
// HDFS has O(1) recursive delete, so this parameter is left false by default.
// Users of object stores, for example, may want to set this to true. Note:
// this is only used if mapreduce.fileoutputcommitter.algorithm.version=2
public static final String FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED =
"mapreduce.fileoutputcommitter.task.cleanup.enabled";
public static final boolean
FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED_DEFAULT = false;
private Path outputPath = null;
private Path workPath = null;
private final int algorithmVersion;
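
For context only (not part of this commit): a minimal sketch of how a job might opt into the new behavior. The class name, job name, and paths below are placeholders, and as the comment above notes, the flag is only consulted when the v2 commit algorithm is selected.

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class TaskCleanupExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // The new flag is only honored together with the v2 commit algorithm.
        conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION, 2);
        conf.setBoolean(
            FileOutputCommitter.FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED, true);
        Job job = Job.getInstance(conf, "task-cleanup-example");
        job.setJarByClass(TaskCleanupExample.class);
        // Default (identity) mapper and reducer; paths come from the command line.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }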
@@ -586,6 +597,17 @@ public void commitTask(TaskAttemptContext context, Path taskAttemptPath)
mergePaths(fs, taskAttemptDirStatus, outputPath);
LOG.info("Saved output of task '" + attemptId + "' to " +
outputPath);
if (context.getConfiguration().getBoolean(
FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED,
FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED_DEFAULT)) {
LOG.debug(String.format(
"Deleting the temporary directory of '%s': '%s'",
attemptId, taskAttemptPath));
if(!fs.delete(taskAttemptPath, true)) {
LOG.warn("Could not delete " + taskAttemptPath);
}
}
}
} else {
LOG.warn("No Output found for " + attemptId);


@@ -1494,6 +1494,17 @@
</description>
</property>
<property>
<name>mapreduce.fileoutputcommitter.task.cleanup.enabled</name>
<value>false</value>
<description>Whether tasks should delete their task temporary directories. This is purely an
optimization for filesystems without O(1) recursive delete, as commitJob will recursively delete
the entire job temporary directory. HDFS has O(1) recursive delete, so this parameter is left
false by default. Users of object stores, for example, may want to set this to true.
Note: this is only used if mapreduce.fileoutputcommitter.algorithm.version=2</description>
</property>
<property>
<name>yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms</name>
<value>1000</value>
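
Besides changing the site-wide default above, the property can be supplied per job at submit time. A hedged sketch (the driver class name is a placeholder) of a Tool-based driver that picks up -D overrides such as -Dmapreduce.fileoutputcommitter.task.cleanup.enabled=true:

    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class CleanupFlagDriver extends Configured implements Tool {
      @Override
      public int run(String[] args) throws Exception {
        // getConf() carries any -D overrides parsed by ToolRunner, e.g.
        //   hadoop jar job.jar CleanupFlagDriver \
        //       -Dmapreduce.fileoutputcommitter.algorithm.version=2 \
        //       -Dmapreduce.fileoutputcommitter.task.cleanup.enabled=true \
        //       <input> <output>
        Job job = Job.getInstance(getConf(), "cleanup-flag-driver");
        job.setJarByClass(CleanupFlagDriver.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
      }

      public static void main(String[] args) throws Exception {
        System.exit(ToolRunner.run(new CleanupFlagDriver(), args));
      }
    }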


@@ -255,13 +255,18 @@ else if (f.getPath().getName().equals(MapFile.DATA_FILE_NAME)) {
assert(dataFileFound && indexFileFound);
}
private void testCommitterInternal(int version) throws Exception {
private void testCommitterInternal(int version, boolean taskCleanup)
throws Exception {
Job job = Job.getInstance();
FileOutputFormat.setOutputPath(job, outDir);
Configuration conf = job.getConfiguration();
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
conf.setInt(
FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
version);
conf.setBoolean(
FileOutputCommitter.FILEOUTPUTCOMMITTER_TASK_CLEANUP_ENABLED,
taskCleanup);
JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID);
FileOutputCommitter committer = new FileOutputCommitter(outDir, tContext);
@@ -275,9 +280,30 @@ private void testCommitterInternal(int version) throws Exception {
RecordWriter theRecordWriter = theOutputFormat.getRecordWriter(tContext);
writeOutput(theRecordWriter, tContext);
// check task and job temp directories exist
File jobOutputDir = new File(
new Path(outDir, FileOutputCommitter.PENDING_DIR_NAME).toString());
File taskOutputDir = new File(Path.getPathWithoutSchemeAndAuthority(
committer.getWorkPath()).toString());
assertTrue("job temp dir does not exist", jobOutputDir.exists());
assertTrue("task temp dir does not exist", taskOutputDir.exists());
// do commit
committer.commitTask(tContext);
assertTrue("job temp dir does not exist", jobOutputDir.exists());
if (version == 1 || taskCleanup) {
// Task temp dir gets renamed in v1 and deleted if taskCleanup is
// enabled in v2
assertFalse("task temp dir still exists", taskOutputDir.exists());
} else {
// By default, in v2 the task temp dir is only deleted during commitJob
assertTrue("task temp dir does not exist", taskOutputDir.exists());
}
// Entire job temp directory gets deleted, including task temp dir
committer.commitJob(jContext);
assertFalse("job temp dir still exists", jobOutputDir.exists());
assertFalse("task temp dir still exists", taskOutputDir.exists());
// validate output
validateContent(outDir);
@@ -286,12 +312,17 @@ private void testCommitterInternal(int version) throws Exception {
@Test
public void testCommitterV1() throws Exception {
testCommitterInternal(1);
testCommitterInternal(1, false);
}
@Test
public void testCommitterV2() throws Exception {
testCommitterInternal(2);
testCommitterInternal(2, false);
}
@Test
public void testCommitterV2TaskCleanupEnabled() throws Exception {
testCommitterInternal(2, true);
}
@Test