diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 140c82ac8d..050af54fce 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -715,6 +715,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3813. Added a cache for resolved racks. (vinodkv via acmurthy) + MAPREDUCE-3808. Fixed an NPE in FileOutputCommitter for jobs with maps + but no reduces. (Robert Joseph Evans via vinodkv) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java index c23e9a9378..a6190d2060 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java @@ -85,18 +85,21 @@ private static Path getOutputPath(TaskAttemptContext context) { */ @Private Path getJobAttemptPath(JobContext context) { - return org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter - .getJobAttemptPath(context, getOutputPath(context)); + Path out = getOutputPath(context); + return out == null ? null : + org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter + .getJobAttemptPath(context, out); } @Private Path getTaskAttemptPath(TaskAttemptContext context) throws IOException { - return getTaskAttemptPath(context, getOutputPath(context)); + Path out = getOutputPath(context); + return out == null ? null : getTaskAttemptPath(context, out); } private Path getTaskAttemptPath(TaskAttemptContext context, Path out) throws IOException { Path workPath = FileOutputFormat.getWorkOutputPath(context.getJobConf()); - if(workPath == null) { + if(workPath == null && out != null) { return org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter .getTaskAttemptPath(context, out); } @@ -110,14 +113,17 @@ private Path getTaskAttemptPath(TaskAttemptContext context, Path out) throws IOE * @return the path where the output of a committed task is stored until * the entire job is committed. */ + @Private Path getCommittedTaskPath(TaskAttemptContext context) { - return org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter - .getCommittedTaskPath(context, getOutputPath(context)); + Path out = getOutputPath(context); + return out == null ? null : + org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter + .getCommittedTaskPath(context, out); } public Path getWorkPath(TaskAttemptContext context, Path outputPath) throws IOException { - return getTaskAttemptPath(context, outputPath); + return outputPath == null ? null : getTaskAttemptPath(context, outputPath); } @Override @@ -156,6 +162,7 @@ public void abortJob(JobContext context, int runState) getWrapped(context).abortJob(context, state); } + @Override public void setupTask(TaskAttemptContext context) throws IOException { getWrapped(context).setupTask(context); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java index 0845f15350..7bad09f303 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java @@ -495,36 +495,40 @@ public boolean isRecoverySupported() { @Override public void recoverTask(TaskAttemptContext context) throws IOException { - context.progress(); - TaskAttemptID attemptId = context.getTaskAttemptID(); - int previousAttempt = getAppAttemptId(context) - 1; - if (previousAttempt < 0) { - throw new IOException ("Cannot recover task output for first attempt..."); - } - - Path committedTaskPath = getCommittedTaskPath(context); - Path previousCommittedTaskPath = getCommittedTaskPath( - previousAttempt, context); - FileSystem fs = committedTaskPath.getFileSystem(context.getConfiguration()); - - LOG.debug("Trying to recover task from " + previousCommittedTaskPath - + " into " + committedTaskPath); - if (fs.exists(previousCommittedTaskPath)) { - if(fs.exists(committedTaskPath)) { - if(!fs.delete(committedTaskPath, true)) { - throw new IOException("Could not delete "+committedTaskPath); + if(hasOutputPath()) { + context.progress(); + TaskAttemptID attemptId = context.getTaskAttemptID(); + int previousAttempt = getAppAttemptId(context) - 1; + if (previousAttempt < 0) { + throw new IOException ("Cannot recover task output for first attempt..."); + } + + Path committedTaskPath = getCommittedTaskPath(context); + Path previousCommittedTaskPath = getCommittedTaskPath( + previousAttempt, context); + FileSystem fs = committedTaskPath.getFileSystem(context.getConfiguration()); + + LOG.debug("Trying to recover task from " + previousCommittedTaskPath + + " into " + committedTaskPath); + if (fs.exists(previousCommittedTaskPath)) { + if(fs.exists(committedTaskPath)) { + if(!fs.delete(committedTaskPath, true)) { + throw new IOException("Could not delete "+committedTaskPath); + } } + //Rename can fail if the parent directory does not yet exist. + Path committedParent = committedTaskPath.getParent(); + fs.mkdirs(committedParent); + if(!fs.rename(previousCommittedTaskPath, committedTaskPath)) { + throw new IOException("Could not rename " + previousCommittedTaskPath + + " to " + committedTaskPath); + } + LOG.info("Saved output of " + attemptId + " to " + committedTaskPath); + } else { + LOG.warn(attemptId+" had no output to recover."); } - //Rename can fail if the parent directory does not yet exist. - Path committedParent = committedTaskPath.getParent(); - fs.mkdirs(committedParent); - if(!fs.rename(previousCommittedTaskPath, committedTaskPath)) { - throw new IOException("Could not rename " + previousCommittedTaskPath + - " to " + committedTaskPath); - } - LOG.info("Saved output of " + attemptId + " to " + committedTaskPath); } else { - LOG.warn(attemptId+" had no output to recover."); + LOG.warn("Output Path is null in recoverTask()"); } } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java index e8a67cd848..0859571d1f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapred/TestFileOutputCommitter.java @@ -104,7 +104,9 @@ public void testRecovery() throws Exception { writeOutput(theRecordWriter, tContext); // do commit - committer.commitTask(tContext); + if(committer.needsTaskCommit(tContext)) { + committer.commitTask(tContext); + } Path jobTempDir1 = committer.getCommittedTaskPath(tContext); File jtd1 = new File(jobTempDir1.toUri().getPath()); assertTrue(jtd1.exists()); @@ -188,7 +190,9 @@ public void testCommitter() throws Exception { writeOutput(theRecordWriter, tContext); // do commit - committer.commitTask(tContext); + if(committer.needsTaskCommit(tContext)) { + committer.commitTask(tContext); + } committer.commitJob(jContext); // validate output @@ -214,7 +218,9 @@ public void testMapFileOutputCommitter() throws Exception { writeMapFileOutput(theRecordWriter, tContext); // do commit - committer.commitTask(tContext); + if(committer.needsTaskCommit(tContext)) { + committer.commitTask(tContext); + } committer.commitJob(jContext); // validate output @@ -222,6 +228,28 @@ public void testMapFileOutputCommitter() throws Exception { FileUtil.fullyDelete(new File(outDir.toString())); } + public void testMapOnlyNoOutput() throws Exception { + JobConf conf = new JobConf(); + //This is not set on purpose. FileOutputFormat.setOutputPath(conf, outDir); + conf.set(JobContext.TASK_ATTEMPT_ID, attempt); + JobContext jContext = new JobContextImpl(conf, taskID.getJobID()); + TaskAttemptContext tContext = new TaskAttemptContextImpl(conf, taskID); + FileOutputCommitter committer = new FileOutputCommitter(); + + // setup + committer.setupJob(jContext); + committer.setupTask(tContext); + + if(committer.needsTaskCommit(tContext)) { + // do commit + committer.commitTask(tContext); + } + committer.commitJob(jContext); + + // validate output + FileUtil.fullyDelete(new File(outDir.toString())); + } + public void testAbort() throws IOException, InterruptedException { JobConf conf = new JobConf(); FileOutputFormat.setOutputPath(conf, outDir);