diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index e3e30079ce..f5619b3180 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -249,6 +249,9 @@ Release 2.0.5-beta - UNRELEASED FileOuputFormat.Counter for binary compatibility with 1.x mapred APIs. (Mayank Bansal via vinodkv) + MAPREDUCE-5176. Add annotation for tagging tasks as responsive to + preemption. (Carlo Curino, cdouglas) + OPTIMIZATIONS MAPREDUCE-4974. Optimising the LineRecordReader initialize() method diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Reducer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Reducer.java index 3df477f13e..ddf67e18ab 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Reducer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/Reducer.java @@ -23,8 +23,7 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.io.RawComparator; -import org.apache.hadoop.mapred.RawKeyValueIterator; +import org.apache.hadoop.mapreduce.task.annotation.Checkpointable; import java.util.Iterator; @@ -119,6 +118,7 @@ * @see Mapper * @see Partitioner */ +@Checkpointable @InterfaceAudience.Public @InterfaceStability.Stable public class Reducer { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java index 9ad310c4b1..5b1e9a0c47 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java @@ -154,7 +154,7 @@ public static Path getJobAttemptPath(JobContext context, Path out) { * @param appAttemptId the ID of the application attempt for this job. * @return the path to store job attempt data. */ - private Path getJobAttemptPath(int appAttemptId) { + protected Path getJobAttemptPath(int appAttemptId) { return getJobAttemptPath(appAttemptId, getOutputPath()); } @@ -232,7 +232,7 @@ public static Path getCommittedTaskPath(TaskAttemptContext context, Path out) { * @param context the context of any task. * @return the path where the output of a committed task is stored. */ - private Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context) { + protected Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context) { return new Path(getJobAttemptPath(appAttemptId), String.valueOf(context.getTaskAttemptID().getTaskID())); } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PartialFileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PartialFileOutputCommitter.java new file mode 100644 index 0000000000..1d15370ea4 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PartialFileOutputCommitter.java @@ -0,0 +1,106 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.output; + +import java.io.IOException; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.OutputCommitter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskID; +import org.apache.hadoop.mapreduce.task.annotation.Checkpointable; + +import com.google.common.annotations.VisibleForTesting; + +/** An {@link OutputCommitter} that commits files specified + * in job output directory i.e. ${mapreduce.output.fileoutputformat.outputdir}. + **/ +@Checkpointable +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class PartialFileOutputCommitter + extends FileOutputCommitter implements PartialOutputCommitter { + + private static final Log LOG = + LogFactory.getLog(PartialFileOutputCommitter.class); + + + public PartialFileOutputCommitter(Path outputPath, + TaskAttemptContext context) throws IOException { + super(outputPath, context); + } + + public PartialFileOutputCommitter(Path outputPath, + JobContext context) throws IOException { + super(outputPath, context); + } + + @Override + public Path getCommittedTaskPath(int appAttemptId, TaskAttemptContext context) { + return new Path(getJobAttemptPath(appAttemptId), + String.valueOf(context.getTaskAttemptID())); + } + + @VisibleForTesting + FileSystem fsFor(Path p, Configuration conf) throws IOException { + return p.getFileSystem(conf); + } + + @Override + public void cleanUpPartialOutputForTask(TaskAttemptContext context) + throws IOException { + + // we double check this is never invoked from a non-preemptable subclass. + // This should never happen, since the invoking codes is checking it too, + // but it is safer to double check. Errors handling this would produce + // inconsistent output. + + if (!this.getClass().isAnnotationPresent(Checkpointable.class)) { + throw new IllegalStateException("Invoking cleanUpPartialOutputForTask() " + + "from non @Preemptable class"); + } + FileSystem fs = + fsFor(getTaskAttemptPath(context), context.getConfiguration()); + + LOG.info("cleanUpPartialOutputForTask: removing everything belonging to " + + context.getTaskAttemptID().getTaskID() + " in: " + + getCommittedTaskPath(context).getParent()); + + final TaskAttemptID taid = context.getTaskAttemptID(); + final TaskID tid = taid.getTaskID(); + Path pCommit = getCommittedTaskPath(context).getParent(); + // remove any committed output + for (int i = 0; i < taid.getId(); ++i) { + TaskAttemptID oldId = new TaskAttemptID(tid, i); + Path pTask = new Path(pCommit, oldId.toString()); + if (fs.exists(pTask) && !fs.delete(pTask, true)) { + throw new IOException("Failed to delete " + pTask); + } + } + } + +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PartialOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PartialOutputCommitter.java new file mode 100644 index 0000000000..a0c6d1ef48 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/PartialOutputCommitter.java @@ -0,0 +1,43 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.lib.output; + +import java.io.IOException; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.mapreduce.TaskAttemptContext; + +/** + * Interface for an {@link org.apache.hadoop.mapreduce.OutputCommitter} + * implementing partial commit of task output, as during preemption. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public interface PartialOutputCommitter { + + /** + * Remove all previously committed outputs from prior executions of this task. + * @param context Context for cleaning up previously promoted output. + * @throws IOException If cleanup fails, then the state of the task my not be + * well defined. + */ + public void cleanUpPartialOutputForTask(TaskAttemptContext context) + throws IOException; + +} \ No newline at end of file diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/annotation/Checkpointable.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/annotation/Checkpointable.java new file mode 100644 index 0000000000..47e421a30a --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/task/annotation/Checkpointable.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.mapreduce.task.annotation; + +import java.lang.annotation.Documented; +import java.lang.annotation.ElementType; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; +import java.lang.annotation.Target; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; + +/** + * Contract representing to the framework that the task can be safely preempted + * and restarted between invocations of the user-defined function. + * + * This is often true when the result of a function does not rely on state + * derived from previous elements in the record stream, but the guarantee is + * left as an exercise to the implementor. + */ +@Documented +@Target(ElementType.TYPE) +@Retention(RetentionPolicy.RUNTIME) +@InterfaceAudience.Public +@InterfaceStability.Evolving +public @interface Checkpointable { } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPreemptableFileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPreemptableFileOutputCommitter.java new file mode 100644 index 0000000000..09ac286ef2 --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/test/java/org/apache/hadoop/mapreduce/lib/output/TestPreemptableFileOutputCommitter.java @@ -0,0 +1,101 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.mapreduce.lib.output; + +import java.io.FileNotFoundException; +import java.io.IOException; + +import org.apache.hadoop.mapreduce.task.annotation.Checkpointable; +import org.junit.Test; +import static org.mockito.Mockito.*; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.mapreduce.MRJobConfig; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.TaskType; + +public class TestPreemptableFileOutputCommitter { + + @Test + public void testPartialOutputCleanup() + throws FileNotFoundException, IllegalArgumentException, IOException { + + Configuration conf = new Configuration(false); + conf.setInt(MRJobConfig.APPLICATION_ATTEMPT_ID, 1); + TaskAttemptID tid0 = + new TaskAttemptID("1363718006656", 1, TaskType.REDUCE, 14, 3); + + Path p = spy(new Path("/user/hadoop/out")); + Path a = new Path("hdfs://user/hadoop/out"); + Path p0 = new Path(a, "_temporary/1/attempt_1363718006656_0001_r_000014_0"); + Path p1 = new Path(a, "_temporary/1/attempt_1363718006656_0001_r_000014_1"); + Path p2 = new Path(a, "_temporary/1/attempt_1363718006656_0001_r_000013_0"); + // (p3 does not exist) + Path p3 = new Path(a, "_temporary/1/attempt_1363718006656_0001_r_000014_2"); + + FileStatus[] fsa = new FileStatus[3]; + fsa[0] = new FileStatus(); + fsa[0].setPath(p0); + fsa[1] = new FileStatus(); + fsa[1].setPath(p1); + fsa[2] = new FileStatus(); + fsa[2].setPath(p2); + + final FileSystem fs = mock(FileSystem.class); + when(fs.exists(eq(p0))).thenReturn(true); + when(fs.exists(eq(p1))).thenReturn(true); + when(fs.exists(eq(p2))).thenReturn(true); + when(fs.exists(eq(p3))).thenReturn(false); + when(fs.delete(eq(p0), eq(true))).thenReturn(true); + when(fs.delete(eq(p1), eq(true))).thenReturn(true); + doReturn(fs).when(p).getFileSystem(any(Configuration.class)); + when(fs.makeQualified(eq(p))).thenReturn(a); + + TaskAttemptContext context = mock(TaskAttemptContext.class); + when(context.getTaskAttemptID()).thenReturn(tid0); + when(context.getConfiguration()).thenReturn(conf); + + PartialFileOutputCommitter foc = new TestPFOC(p, context, fs); + + foc.cleanUpPartialOutputForTask(context); + verify(fs).delete(eq(p0), eq(true)); + verify(fs).delete(eq(p1), eq(true)); + verify(fs, never()).delete(eq(p3), eq(true)); + verify(fs, never()).delete(eq(p2), eq(true)); + } + + @Checkpointable + static class TestPFOC extends PartialFileOutputCommitter { + final FileSystem fs; + TestPFOC(Path outputPath, TaskAttemptContext ctxt, FileSystem fs) + throws IOException { + super(outputPath, ctxt); + this.fs = fs; + } + @Override + FileSystem fsFor(Path p, Configuration conf) { + return fs; + } + } + +}