MAPREDUCE-6275. Race condition in FileOutputCommitter v2 for user-specified task output subdirs. Contributed by Gera Shegalov and Siqi Li

This commit is contained in:
Jason Lowe 2015-03-19 21:39:21 +00:00
parent 61a4c7fc98
commit 91baca145a
3 changed files with 186 additions and 45 deletions

View File

@ -463,6 +463,9 @@ Release 2.7.0 - UNRELEASED
MAPREDUCE-6277. Job can post multiple history files if attempt loses MAPREDUCE-6277. Job can post multiple history files if attempt loses
connection to the RM (Chang Li via jlowe) connection to the RM (Chang Li via jlowe)
MAPREDUCE-6275. Race condition in FileOutputCommitter v2 for
user-specified task output subdirs (Gera Shegalov and Siqi Li via jlowe)
Release 2.6.1 - UNRELEASED Release 2.6.1 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -348,44 +348,61 @@ public void commitJob(JobContext context) throws IOException {
* @param to the path data is going to. * @param to the path data is going to.
* @throws IOException on any error * @throws IOException on any error
*/ */
private static void mergePaths(FileSystem fs, final FileStatus from, private void mergePaths(FileSystem fs, final FileStatus from,
final Path to) final Path to) throws IOException {
throws IOException { if (LOG.isDebugEnabled()) {
LOG.debug("Merging data from "+from+" to "+to); LOG.debug("Merging data from " + from + " to " + to);
if(from.isFile()) { }
if(fs.exists(to)) { FileStatus toStat;
if(!fs.delete(to, true)) { try {
throw new IOException("Failed to delete "+to); toStat = fs.getFileStatus(to);
} } catch (FileNotFoundException fnfe) {
} toStat = null;
}
if(!fs.rename(from.getPath(), to)) { if (from.isFile()) {
throw new IOException("Failed to rename "+from+" to "+to); if (toStat != null) {
} if (!fs.delete(to, true)) {
} else if(from.isDirectory()) { throw new IOException("Failed to delete " + to);
if(fs.exists(to)) { }
FileStatus toStat = fs.getFileStatus(to); }
if(!toStat.isDirectory()) {
if(!fs.delete(to, true)) { if (!fs.rename(from.getPath(), to)) {
throw new IOException("Failed to delete "+to); throw new IOException("Failed to rename " + from + " to " + to);
} }
if(!fs.rename(from.getPath(), to)) { } else if (from.isDirectory()) {
throw new IOException("Failed to rename "+from+" to "+to); if (toStat != null) {
} if (!toStat.isDirectory()) {
} else { if (!fs.delete(to, true)) {
//It is a directory so merge everything in the directories throw new IOException("Failed to delete " + to);
for(FileStatus subFrom: fs.listStatus(from.getPath())) { }
Path subTo = new Path(to, subFrom.getPath().getName()); renameOrMerge(fs, from, to);
mergePaths(fs, subFrom, subTo); } else {
} //It is a directory so merge everything in the directories
} for (FileStatus subFrom : fs.listStatus(from.getPath())) {
} else { Path subTo = new Path(to, subFrom.getPath().getName());
//it does not exist just rename mergePaths(fs, subFrom, subTo);
if(!fs.rename(from.getPath(), to)) { }
throw new IOException("Failed to rename "+from+" to "+to); }
} } else {
} renameOrMerge(fs, from, to);
} }
}
}
private void renameOrMerge(FileSystem fs, FileStatus from, Path to)
throws IOException {
if (algorithmVersion == 1) {
if (!fs.rename(from.getPath(), to)) {
throw new IOException("Failed to rename " + from + " to " + to);
}
} else {
fs.mkdirs(to);
for (FileStatus subFrom : fs.listStatus(from.getPath())) {
Path subTo = new Path(to, subFrom.getPath().getName());
mergePaths(fs, subFrom, subTo);
}
}
} }
@Override @Override
@ -546,8 +563,9 @@ public void recoverTask(TaskAttemptContext context)
Path previousCommittedTaskPath = getCommittedTaskPath( Path previousCommittedTaskPath = getCommittedTaskPath(
previousAttempt, context); previousAttempt, context);
FileSystem fs = previousCommittedTaskPath.getFileSystem(context.getConfiguration()); FileSystem fs = previousCommittedTaskPath.getFileSystem(context.getConfiguration());
if (LOG.isDebugEnabled()) {
LOG.debug("Trying to recover task from " + previousCommittedTaskPath); LOG.debug("Trying to recover task from " + previousCommittedTaskPath);
}
if (algorithmVersion == 1) { if (algorithmVersion == 1) {
if (fs.exists(previousCommittedTaskPath)) { if (fs.exists(previousCommittedTaskPath)) {
Path committedTaskPath = getCommittedTaskPath(context); Path committedTaskPath = getCommittedTaskPath(context);

View File

@ -22,9 +22,15 @@
import java.io.FileInputStream; import java.io.FileInputStream;
import java.io.IOException; import java.io.IOException;
import java.net.URI; import java.net.URI;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import junit.framework.TestCase; import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileSystem;
@ -39,6 +45,7 @@
import org.apache.hadoop.mapreduce.JobContext; import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.JobStatus; import org.apache.hadoop.mapreduce.JobStatus;
import org.apache.hadoop.mapreduce.MRJobConfig; import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.TaskAttemptID; import org.apache.hadoop.mapreduce.TaskAttemptID;
@ -47,13 +54,25 @@
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public class TestFileOutputCommitter extends TestCase { public class TestFileOutputCommitter extends TestCase {
private static Path outDir = new Path(System.getProperty("test.build.data", private static final Path outDir = new Path(
"/tmp"), "output"); System.getProperty("test.build.data",
System.getProperty("java.io.tmpdir")),
TestFileOutputCommitter.class.getName());
private final static String SUB_DIR = "SUB_DIR";
private final static Path OUT_SUB_DIR = new Path(outDir, SUB_DIR);
private static final Log LOG =
LogFactory.getLog(TestFileOutputCommitter.class);
// A random task attempt id for testing. // A random task attempt id for testing.
private static String attempt = "attempt_200707121733_0001_m_000000_0"; private static final String attempt = "attempt_200707121733_0001_m_000000_0";
private static String partFile = "part-m-00000"; private static final String partFile = "part-m-00000";
private static TaskAttemptID taskID = TaskAttemptID.forName(attempt); private static final TaskAttemptID taskID = TaskAttemptID.forName(attempt);
private static final String attempt1 = "attempt_200707121733_0001_m_000001_0";
private static final TaskAttemptID taskID1 = TaskAttemptID.forName(attempt1);
private Text key1 = new Text("key1"); private Text key1 = new Text("key1");
private Text key2 = new Text("key2"); private Text key2 = new Text("key2");
private Text val1 = new Text("val1"); private Text val1 = new Text("val1");
@ -229,7 +248,7 @@ else if (f.getPath().getName().equals(MapFile.DATA_FILE_NAME)) {
} }
private void testCommitterInternal(int version) throws Exception { private void testCommitterInternal(int version) throws Exception {
Job job = Job.getInstance(); Job job = Job.getInstance();
FileOutputFormat.setOutputPath(job, outDir); FileOutputFormat.setOutputPath(job, outDir);
Configuration conf = job.getConfiguration(); Configuration conf = job.getConfiguration();
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt); conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
@ -441,6 +460,107 @@ public void testFailAbortV2() throws Exception {
testFailAbortInternal(2); testFailAbortInternal(2);
} }
static class RLFS extends RawLocalFileSystem {
private final ThreadLocal<Boolean> needNull = new ThreadLocal<Boolean>() {
@Override
protected Boolean initialValue() {
return true;
}
};
public RLFS() {
}
@Override
public FileStatus getFileStatus(Path f) throws IOException {
if (needNull.get() &&
OUT_SUB_DIR.toUri().getPath().equals(f.toUri().getPath())) {
needNull.set(false); // lie once per thread
return null;
}
return super.getFileStatus(f);
}
}
private void testConcurrentCommitTaskWithSubDir(int version)
throws Exception {
final Job job = Job.getInstance();
FileOutputFormat.setOutputPath(job, outDir);
final Configuration conf = job.getConfiguration();
conf.set(MRJobConfig.TASK_ATTEMPT_ID, attempt);
conf.setInt(FileOutputCommitter.FILEOUTPUTCOMMITTER_ALGORITHM_VERSION,
version);
conf.setClass("fs.file.impl", RLFS.class, FileSystem.class);
FileSystem.closeAll();
final JobContext jContext = new JobContextImpl(conf, taskID.getJobID());
final FileOutputCommitter amCommitter =
new FileOutputCommitter(outDir, jContext);
amCommitter.setupJob(jContext);
final TaskAttemptContext[] taCtx = new TaskAttemptContextImpl[2];
taCtx[0] = new TaskAttemptContextImpl(conf, taskID);
taCtx[1] = new TaskAttemptContextImpl(conf, taskID1);
final TextOutputFormat[] tof = new TextOutputFormat[2];
for (int i = 0; i < tof.length; i++) {
tof[i] = new TextOutputFormat() {
@Override
public Path getDefaultWorkFile(TaskAttemptContext context,
String extension) throws IOException {
final FileOutputCommitter foc = (FileOutputCommitter)
getOutputCommitter(context);
return new Path(new Path(foc.getWorkPath(), SUB_DIR),
getUniqueFile(context, getOutputName(context), extension));
}
};
}
final ExecutorService executor = Executors.newFixedThreadPool(2);
try {
for (int i = 0; i < taCtx.length; i++) {
final int taskIdx = i;
executor.submit(new Callable<Void>() {
@Override
public Void call() throws IOException, InterruptedException {
final OutputCommitter outputCommitter =
tof[taskIdx].getOutputCommitter(taCtx[taskIdx]);
outputCommitter.setupTask(taCtx[taskIdx]);
final RecordWriter rw =
tof[taskIdx].getRecordWriter(taCtx[taskIdx]);
writeOutput(rw, taCtx[taskIdx]);
outputCommitter.commitTask(taCtx[taskIdx]);
return null;
}
});
}
} finally {
executor.shutdown();
while (!executor.awaitTermination(1, TimeUnit.SECONDS)) {
LOG.info("Awaiting thread termination!");
}
}
amCommitter.commitJob(jContext);
final RawLocalFileSystem lfs = new RawLocalFileSystem();
lfs.setConf(conf);
assertFalse("Must not end up with sub_dir/sub_dir",
lfs.exists(new Path(OUT_SUB_DIR, SUB_DIR)));
// validate output
validateContent(OUT_SUB_DIR);
FileUtil.fullyDelete(new File(outDir.toString()));
}
public void testConcurrentCommitTaskWithSubDirV1() throws Exception {
testConcurrentCommitTaskWithSubDir(1);
}
public void testConcurrentCommitTaskWithSubDirV2() throws Exception {
testConcurrentCommitTaskWithSubDir(2);
}
public static String slurp(File f) throws IOException { public static String slurp(File f) throws IOException {
int len = (int) f.length(); int len = (int) f.length();
byte[] buf = new byte[len]; byte[] buf = new byte[len];