From bb7ce82816574f67aa1898f67e0e0cff54fa67be Mon Sep 17 00:00:00 2001
From: Jason Darrell Lowe
Date: Mon, 28 Apr 2014 15:18:02 +0000
Subject: [PATCH] MAPREDUCE-5812. Make job context available to
 OutputCommitter.isRecoverySupported(). Contributed by Mohammad Kamrul Islam

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1590668 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt          |   3 +
 .../dev-support/findbugs-exclude.xml          |   2 +
 .../hadoop/mapreduce/v2/app/MRAppMaster.java  |  25 +-
 .../apache/hadoop/mapreduce/v2/app/MRApp.java |   8 +
 .../hadoop/mapreduce/v2/app/TestRecovery.java | 260 ++++++++++++++++++
 .../hadoop/mapred/FileOutputCommitter.java    |   8 +-
 .../apache/hadoop/mapred/OutputCommitter.java |  35 ++-
 .../hadoop/mapreduce/OutputCommitter.java     |  25 +-
 .../lib/output/FileOutputCommitter.java       |   1 +
 .../lib/output/NullOutputFormat.java          |   1 +
 10 files changed, 359 insertions(+), 9 deletions(-)

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 923a5b068c..c4feb8f4b4 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -172,6 +172,9 @@ Release 2.5.0 - UNRELEASED
 
     MAPREDUCE-5639. Port DistCp2 document to trunk (Akira AJISAKA via jeagles)
 
+    MAPREDUCE-5812. Make job context available to
+    OutputCommitter.isRecoverySupported() (Mohammad Kamrul Islam via jlowe)
+
   OPTIMIZATIONS
 
   BUG FIXES

diff --git a/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml b/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
index 11d4643066..5e465afd0a 100644
--- a/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
+++ b/hadoop-mapreduce-project/dev-support/findbugs-exclude.xml
@@ -158,6 +158,7 @@
+       <Method name="isRecoverySupported" />
@@ -168,6 +169,7 @@
+       <Method name="isRecoverySupported" />
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
index a67e2b57a7..df66c4fb51 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java
@@ -48,6 +48,7 @@ import org.apache.hadoop.mapred.TaskAttemptListenerImpl;
 import org.apache.hadoop.mapred.TaskLog;
 import org.apache.hadoop.mapred.TaskUmbilicalProtocol;
+import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.MRJobConfig;
 import org.apache.hadoop.mapreduce.OutputCommitter;
 import org.apache.hadoop.mapreduce.OutputFormat;
@@ -67,6 +68,7 @@ import org.apache.hadoop.mapreduce.jobhistory.JobHistoryParser.TaskInfo;
 import org.apache.hadoop.mapreduce.security.TokenCache;
 import org.apache.hadoop.mapreduce.security.token.JobTokenSecretManager;
+import org.apache.hadoop.mapreduce.task.JobContextImpl;
 import org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl;
 import org.apache.hadoop.mapreduce.v2.api.records.AMInfo;
 import org.apache.hadoop.mapreduce.v2.api.records.JobId;
@@ -1107,7 +1109,24 @@ public void stop() {
     TaskLog.syncLogsShutdown(logSyncer);
   }
 
-  private void processRecovery() {
+  private boolean isRecoverySupported(OutputCommitter committer2)
+      throws IOException {
+    boolean isSupported = false;
+    JobContext _jobContext;
+    if (committer != null) {
+      if (newApiCommitter) {
+        _jobContext = new JobContextImpl(
+            getConfig(), TypeConverter.fromYarn(getJobId()));
+      } else {
+        _jobContext = new org.apache.hadoop.mapred.JobContextImpl(
+            new JobConf(getConfig()), TypeConverter.fromYarn(getJobId()));
+      }
+      isSupported = committer.isRecoverySupported(_jobContext);
+    }
+    return isSupported;
+  }
+
+  private void processRecovery() throws IOException {
     if (appAttemptID.getAttemptId() == 1) {
       return; // no need to recover on the first attempt
     }
@@ -1115,8 +1134,8 @@ private void processRecovery() {
     boolean recoveryEnabled = getConfig().getBoolean(
         MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE,
         MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT);
-    boolean recoverySupportedByCommitter =
-        committer != null && committer.isRecoverySupported();
+
+    boolean recoverySupportedByCommitter = isRecoverySupported(committer);
 
     // If a shuffle secret was not provided by the job client then this app
     // attempt will generate one.  However that disables recovery if there

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
index 6bd2010f52..623d5de4ea 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
@@ -627,10 +627,18 @@ public void abortJob(JobContext jobContext, State state)
         throws IOException {
       committer.abortJob(jobContext, state);
     }
+
+    @Override
+    public boolean isRecoverySupported(JobContext jobContext)
+        throws IOException {
+      return committer.isRecoverySupported(jobContext);
+    }
+
+    @SuppressWarnings("deprecation")
     @Override
     public boolean isRecoverySupported() {
       return committer.isRecoverySupported();
     }
+
     @Override
     public void setupTask(TaskAttemptContext taskContext) throws IOException {
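The MRAppMaster change above is the heart of the patch: instead of calling the no-arg isRecoverySupported(), the AM now builds a real JobContext (new- or old-API, matching the committer) and asks the committer whether recovery is supported for this particular job. For illustration, a minimal sketch (not part of this patch) of a new-API committer that uses the context to make recovery a per-job decision; the class name is hypothetical and the "want.am.recovery" key simply mirrors the property used by the tests below.

    import java.io.IOException;

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter;

    // Sketch: recovery support becomes a per-job decision because the AM
    // now passes a live job context to isRecoverySupported().
    public class ConfigDrivenCommitter extends FileOutputCommitter {

      public ConfigDrivenCommitter(Path outputPath, TaskAttemptContext context)
          throws IOException {
        super(outputPath, context);
      }

      @Override
      public boolean isRecoverySupported(JobContext jobContext)
          throws IOException {
        // Per-job settings are visible here; default to no recovery.
        return jobContext.getConfiguration()
            .getBoolean("want.am.recovery", false);
      }
    }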
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
index 1a969a37dd..0e07bcf969 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java
@@ -425,6 +425,266 @@ public void testCrashOfMapsOnlyJob() throws Exception {
     app.verifyCompleted();
   }
 
+  /**
+   * This class provides a custom implementation of the output committer's
+   * isRecoverySupported method, which determines whether recovery is
+   * supported based on a config property.
+   */
+  public static class TestFileOutputCommitter extends
+      org.apache.hadoop.mapred.FileOutputCommitter {
+
+    @Override
+    public boolean isRecoverySupported(
+        org.apache.hadoop.mapred.JobContext jobContext) {
+      boolean isRecoverySupported = false;
+      if (jobContext != null && jobContext.getConfiguration() != null) {
+        isRecoverySupported = jobContext.getConfiguration().getBoolean(
+            "want.am.recovery", false);
+      }
+      return isRecoverySupported;
+    }
+  }
+
+  /**
+   * This test case verifies that recovery is controlled through a config
+   * property. Here recovery is turned ON: the AM runs 3 maps and 0 reduces,
+   * crashes after the first two tasks finish, recovers them completely, and
+   * succeeds in the second generation.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testRecoverySuccessUsingCustomOutputCommitter() throws Exception {
+    int runCount = 0;
+    MRApp app = new MRAppWithHistory(3, 0, false, this.getClass().getName(),
+        true, ++runCount);
+    Configuration conf = new Configuration();
+    conf.setClass("mapred.output.committer.class",
+        TestFileOutputCommitter.class,
+        org.apache.hadoop.mapred.OutputCommitter.class);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    conf.setBoolean("want.am.recovery", true);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+
+    // all maps would be running
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task mapTask2 = it.next();
+    Task mapTask3 = it.next();
+
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+    app.waitForState(mapTask3, TaskState.RUNNING);
+
+    TaskAttempt task1Attempt = mapTask1.getAttempts().values().iterator()
+        .next();
+    TaskAttempt task2Attempt = mapTask2.getAttempts().values().iterator()
+        .next();
+    TaskAttempt task3Attempt = mapTask3.getAttempts().values().iterator()
+        .next();
+
+    // before sending the TA_DONE event, make sure each attempt has reached
+    // the RUNNING state
+    app.waitForState(task1Attempt, TaskAttemptState.RUNNING);
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+    app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
+
+    // send the done signal to the first two maps
+    app.getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(task1Attempt.getID(),
+                TaskAttemptEventType.TA_DONE));
+    app.getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(task2Attempt.getID(),
+                TaskAttemptEventType.TA_DONE));
+
+    // wait for the first two map tasks to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    // stop the app
+    app.stop();
+
+    // rerun; the first two maps will be recovered from the previous run
+    app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
+        ++runCount);
+    conf = new Configuration();
+    conf.setClass("mapred.output.committer.class",
+        TestFileOutputCommitter.class,
+        org.apache.hadoop.mapred.OutputCommitter.class);
+    conf.setBoolean("want.am.recovery", true);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    // set num-reduces explicitly in conf as the recovery logic depends on it
+    conf.setInt(MRJobConfig.NUM_REDUCES, 0);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    mapTask2 = it.next();
+    mapTask3 = it.next();
+
+    // the first two maps will be recovered, no need to send done
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    app.waitForState(mapTask3, TaskState.RUNNING);
+
+    task3Attempt = mapTask3.getAttempts().values().iterator().next();
+    // before sending the TA_DONE event, make sure the attempt has reached
+    // the RUNNING state
+    app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
+
+    // send the done signal to the 3rd map task
+    app.getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(mapTask3.getAttempts().values().iterator()
+                .next().getID(), TaskAttemptEventType.TA_DONE));
+
+    // wait for it to complete
+    app.waitForState(mapTask3, TaskState.SUCCEEDED);
+
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+  }
+
+  /**
+   * This test case verifies that recovery is controlled through a config
+   * property. Here recovery is turned OFF: the AM runs 3 maps and 0 reduces,
+   * crashes after the first two tasks finish, recovery fails, and the job
+   * has to rerun fully in the second generation, where it succeeds.
+   *
+   * @throws Exception
+   */
+  @Test
+  public void testRecoveryFailsUsingCustomOutputCommitter() throws Exception {
+    int runCount = 0;
+    MRApp app = new MRAppWithHistory(3, 0, false, this.getClass().getName(),
+        true, ++runCount);
+    Configuration conf = new Configuration();
+    conf.setClass("mapred.output.committer.class",
+        TestFileOutputCommitter.class,
+        org.apache.hadoop.mapred.OutputCommitter.class);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    conf.setBoolean("want.am.recovery", false);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+
+    // all maps would be running
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask1 = it.next();
+    Task mapTask2 = it.next();
+    Task mapTask3 = it.next();
+
+    // all maps must be running
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+    app.waitForState(mapTask3, TaskState.RUNNING);
+
+    TaskAttempt task1Attempt =
+        mapTask1.getAttempts().values().iterator().next();
+    TaskAttempt task2Attempt =
+        mapTask2.getAttempts().values().iterator().next();
+    TaskAttempt task3Attempt =
+        mapTask3.getAttempts().values().iterator().next();
+
+    // before sending the TA_DONE event, make sure each attempt has reached
+    // the RUNNING state
+    app.waitForState(task1Attempt, TaskAttemptState.RUNNING);
+    app.waitForState(task2Attempt, TaskAttemptState.RUNNING);
+    app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
+
+    // send the done signal to the first two maps
+    app.getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(task1Attempt.getID(),
+                TaskAttemptEventType.TA_DONE));
+    app.getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(task2Attempt.getID(),
+                TaskAttemptEventType.TA_DONE));
+
+    // wait for the first two map tasks to complete
+    app.waitForState(mapTask1, TaskState.SUCCEEDED);
+    app.waitForState(mapTask2, TaskState.SUCCEEDED);
+
+    // stop the app
+    app.stop();
+
+    // rerun; since recovery is disabled, the first two maps will NOT be
+    // recovered from the previous run
+    app = new MRAppWithHistory(2, 1, false, this.getClass().getName(), false,
+        ++runCount);
+    conf = new Configuration();
+    conf.setClass("mapred.output.committer.class",
+        TestFileOutputCommitter.class,
+        org.apache.hadoop.mapred.OutputCommitter.class);
+    conf.setBoolean("want.am.recovery", false);
+    conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
+    // set num-reduces explicitly in conf as the recovery logic depends on it
+    conf.setInt(MRJobConfig.NUM_REDUCES, 0);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+
+    Assert.assertEquals("No of tasks not correct", 3, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask1 = it.next();
+    mapTask2 = it.next();
+    mapTask3 = it.next();
+
+    // the first two maps will NOT be recovered, so the done signal must be
+    // sent for them as well
+    app.waitForState(mapTask1, TaskState.RUNNING);
+    app.waitForState(mapTask2, TaskState.RUNNING);
+
+    app.waitForState(mapTask3, TaskState.RUNNING);
+
+    task3Attempt = mapTask3.getAttempts().values().iterator().next();
+    // before sending the TA_DONE event, make sure the attempt has reached
+    // the RUNNING state
+    app.waitForState(task3Attempt, TaskAttemptState.RUNNING);
+
+    // send the done signal to all 3 map tasks
+    app.getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(mapTask1.getAttempts().values().iterator()
+                .next().getID(), TaskAttemptEventType.TA_DONE));
+    app.getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(mapTask2.getAttempts().values().iterator()
+                .next().getID(), TaskAttemptEventType.TA_DONE));
+    app.getContext()
+        .getEventHandler()
+        .handle(
+            new TaskAttemptEvent(mapTask3.getAttempts().values().iterator()
+                .next().getID(), TaskAttemptEventType.TA_DONE));
+
+    // wait for completion
+    app.waitForState(mapTask3, TaskState.SUCCEEDED);
+
+    app.waitForState(job, JobState.SUCCEEDED);
+    app.verifyCompleted();
+  }
+
   @Test
   public void testMultipleCrashes() throws Exception {
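The two tests above drive the committer purely through raw configuration keys. As a usage sketch (not part of this patch), the typed equivalent on the old API is JobConf.setOutputCommitter(), which sets the same "mapred.output.committer.class" property the tests set directly; the classes below are hypothetical stand-ins for TestFileOutputCommitter and a job driver.

    import org.apache.hadoop.mapred.FileOutputCommitter;
    import org.apache.hadoop.mapred.JobConf;
    import org.apache.hadoop.mapred.JobContext;

    public class CustomCommitterDriver {

      // Minimal committer, analogous to TestFileOutputCommitter above.
      public static class MyCommitter extends FileOutputCommitter {
        @Override
        public boolean isRecoverySupported(JobContext jobContext) {
          return jobContext.getConfiguration()
              .getBoolean("want.am.recovery", false);
        }
      }

      // Typed equivalent of conf.setClass("mapred.output.committer.class", ...).
      public static JobConf withRecoverableCommitter(JobConf conf) {
        conf.setOutputCommitter(MyCommitter.class);
        conf.setBoolean("want.am.recovery", true); // opt in to AM recovery
        return conf;
      }
    }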
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java
index 496280a737..77d06b6b97 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/FileOutputCommitter.java
@@ -184,10 +184,16 @@ public boolean needsTaskCommit(TaskAttemptContext context)
   }
 
   @Override
+  @Deprecated
   public boolean isRecoverySupported() {
     return true;
   }
-
+
+  @Override
+  public boolean isRecoverySupported(JobContext context) throws IOException {
+    return getWrapped(context).isRecoverySupported(context);
+  }
+
   @Override
   public void recoverTask(TaskAttemptContext context)
       throws IOException {

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java
index 318ef9d678..79df7f863f 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapred/OutputCommitter.java
@@ -176,14 +176,34 @@ public abstract void abortTask(TaskAttemptContext taskContext)
 
   /**
    * This method implements the new interface by calling the old method. Note
-   * that the input types are different between the new and old apis and this
-   * is a bridge between the two.
+   * that the input types are different between the new and old apis and this is
+   * a bridge between the two.
+   *
+   * @deprecated Use {@link #isRecoverySupported(JobContext)} instead.
    */
+  @Deprecated
   @Override
   public boolean isRecoverySupported() {
     return false;
   }
 
+  /**
+   * Is task output recovery supported for restarting jobs?
+   *
+   * If task output recovery is supported, job restart can be done more
+   * efficiently.
+   *
+   * @param jobContext
+   *          Context of the job whose output is being written.
+   * @return <code>true</code> if task output recovery is supported,
+   *         <code>false</code> otherwise
+   * @throws IOException
+   * @see #recoverTask(TaskAttemptContext)
+   */
+  public boolean isRecoverySupported(JobContext jobContext) throws IOException {
+    return isRecoverySupported();
+  }
+
   /**
    * Recover the task output.
    *
@@ -315,4 +335,15 @@ void recoverTask(org.apache.hadoop.mapreduce.TaskAttemptContext taskContext)
     recoverTask((TaskAttemptContext) taskContext);
   }
 
+  /**
+   * This method implements the new interface by calling the old method. Note
+   * that the input types are different between the new and old apis and this is
+   * a bridge between the two.
+   */
+  @Override
+  public final boolean isRecoverySupported(
+      org.apache.hadoop.mapreduce.JobContext context) throws IOException {
+    return isRecoverySupported((JobContext) context);
+  }
+
 }
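The bridge methods above keep old committers working unchanged: the final new-API override funnels every call into the old-API isRecoverySupported(JobContext), whose default in turn delegates to the deprecated no-arg method. A sketch of a legacy committer relying on that chain (not part of this patch; the class is hypothetical and left abstract so the unrelated lifecycle methods can be omitted):

    import org.apache.hadoop.mapred.OutputCommitter;

    // Only the deprecated method is overridden; calls arriving through the
    // new isRecoverySupported(JobContext) entry point still reach it via
    // the default implementation added by this patch.
    public abstract class LegacyCommitter extends OutputCommitter {
      @SuppressWarnings("deprecation")
      @Override
      public boolean isRecoverySupported() {
        return true; // legacy signal, still honored through the bridge
      }
    }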
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java
index c87490d8a6..cb44f63327 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/OutputCommitter.java
@@ -176,17 +176,36 @@ public abstract void abortTask(TaskAttemptContext taskContext)
   /**
    * Is task output recovery supported for restarting jobs?
    *
-   * If task output recovery is supported, job restart can be done more 
+   * If task output recovery is supported, job restart can be done more
    * efficiently.
    *
    * @return <code>true</code> if task output recovery is supported,
    *         <code>false</code> otherwise
-   * @see #recoverTask(TaskAttemptContext)
+   * @see #recoverTask(TaskAttemptContext)
+   * @deprecated Use {@link #isRecoverySupported(JobContext)} instead.
    */
+  @Deprecated
   public boolean isRecoverySupported() {
     return false;
   }
-
+
+  /**
+   * Is task output recovery supported for restarting jobs?
+   *
+   * If task output recovery is supported, job restart can be done more
+   * efficiently.
+   *
+   * @param jobContext
+   *          Context of the job whose output is being written.
+   * @return <code>true</code> if task output recovery is supported,
+   *         <code>false</code> otherwise
+   * @throws IOException
+   * @see #recoverTask(TaskAttemptContext)
+   */
+  public boolean isRecoverySupported(JobContext jobContext) throws IOException {
+    return isRecoverySupported();
+  }
+
   /**
    * Recover the task output.
    *

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
index dbf915c24b..55252f04f9 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/FileOutputCommitter.java
@@ -495,6 +495,7 @@ public boolean needsTaskCommit(TaskAttemptContext context, Path taskAttemptPath
   }
 
   @Override
+  @Deprecated
   public boolean isRecoverySupported() {
     return true;
   }

diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java
index 32f44f24d8..7bd615ab1e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/lib/output/NullOutputFormat.java
@@ -60,6 +60,7 @@ public void setupJob(JobContext jobContext) { }
     public void setupTask(TaskAttemptContext taskContext) { }
 
     @Override
+    @Deprecated
     public boolean isRecoverySupported() {
       return true;
     }
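On the new API the committer comes from the OutputFormat rather than from "mapred.output.committer.class", so a context-aware committer such as the ConfigDrivenCommitter sketched earlier would reach the AM through a custom format. A final hypothetical sketch, not part of this patch:

    import java.io.IOException;

    import org.apache.hadoop.mapreduce.OutputCommitter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;

    // Pairs the earlier ConfigDrivenCommitter sketch with a job: the AM asks
    // this format for the committer, then calls isRecoverySupported(context).
    public class ConfigDrivenTextOutputFormat<K, V> extends TextOutputFormat<K, V> {
      @Override
      public OutputCommitter getOutputCommitter(TaskAttemptContext context)
          throws IOException {
        return new ConfigDrivenCommitter(getOutputPath(context), context);
      }
    }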