diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index d94f8a55b0..4a8a90e7ab 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -149,7 +149,6 @@ import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager; import org.apache.hadoop.yarn.util.Clock; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.hadoop.yarn.util.SystemClock; import org.apache.log4j.LogManager; @@ -1303,44 +1302,77 @@ public Boolean call(Configuration conf) throws IOException { } private void processRecovery() throws IOException{ - if (appAttemptID.getAttemptId() == 1) { - return; // no need to recover on the first attempt + boolean attemptRecovery = shouldAttemptRecovery(); + boolean recoverySucceeded = true; + if (attemptRecovery) { + LOG.info("Attempting to recover."); + try { + parsePreviousJobHistory(); + } catch (IOException e) { + LOG.warn("Unable to parse prior job history, aborting recovery", e); + recoverySucceeded = false; + } + } + + if (!isFirstAttempt() && (!attemptRecovery || !recoverySucceeded)) { + amInfos.addAll(readJustAMInfos()); + } + } + + private boolean isFirstAttempt() { + return appAttemptID.getAttemptId() == 1; + } + + /** + * Check if the current job attempt should try to recover from previous + * job attempts if any. + */ + private boolean shouldAttemptRecovery() throws IOException { + if (isFirstAttempt()) { + return false; // no need to recover on the first attempt } boolean recoveryEnabled = getConfig().getBoolean( MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT); + if (!recoveryEnabled) { + LOG.info("Not attempting to recover. Recovery disabled. To enable " + + "recovery, set " + MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE); + return false; + } boolean recoverySupportedByCommitter = isRecoverySupported(); + if (!recoverySupportedByCommitter) { + LOG.info("Not attempting to recover. Recovery is not supported by " + + committer.getClass() + ". Use an OutputCommitter that supports" + + " recovery."); + return false; + } - // If a shuffle secret was not provided by the job client then this app - // attempt will generate one. However that disables recovery if there - // are reducers as the shuffle secret would be app attempt specific. - int numReduceTasks = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0); + int reducerCount = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0); + + // If a shuffle secret was not provided by the job client, one will be + // generated in this job attempt. However, that disables recovery if + // there are reducers as the shuffle secret would be job attempt specific. boolean shuffleKeyValidForRecovery = TokenCache.getShuffleSecretKey(jobCredentials) != null; - - if (recoveryEnabled && recoverySupportedByCommitter - && (numReduceTasks <= 0 || shuffleKeyValidForRecovery)) { - LOG.info("Recovery is enabled. " - + "Will try to recover from previous life on best effort basis."); - try { - parsePreviousJobHistory(); - } catch (IOException e) { - LOG.warn("Unable to parse prior job history, aborting recovery", e); - // try to get just the AMInfos - amInfos.addAll(readJustAMInfos()); - } - } else { - LOG.info("Will not try to recover. recoveryEnabled: " - + recoveryEnabled + " recoverySupportedByCommitter: " - + recoverySupportedByCommitter + " numReduceTasks: " - + numReduceTasks + " shuffleKeyValidForRecovery: " - + shuffleKeyValidForRecovery + " ApplicationAttemptID: " - + appAttemptID.getAttemptId()); - // Get the amInfos anyways whether recovery is enabled or not - amInfos.addAll(readJustAMInfos()); + if (reducerCount > 0 && !shuffleKeyValidForRecovery) { + LOG.info("Not attempting to recover. The shuffle key is invalid for " + + "recovery."); + return false; } + + // If the intermediate data is encrypted, recovering the job requires the + // access to the key. Until the encryption key is persisted, we should + // avoid attempts to recover. + boolean spillEncrypted = CryptoUtils.isEncryptedSpillEnabled(getConfig()); + if (reducerCount > 0 && spillEncrypted) { + LOG.info("Not attempting to recover. Intermediate spill encryption" + + " is enabled."); + return false; + } + + return true; } private static FSDataInputStream getPreviousJobHistoryStream( @@ -1440,6 +1472,10 @@ private List readJustAMInfos() { return amInfos; } + public boolean recovered() { + return recoveredJobStartTime > 0; + } + /** * This can be overridden to instantiate multiple jobs and create a * workflow. diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java index 9d5f0ae4d7..071575a156 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java @@ -579,6 +579,72 @@ public void testRecoverySuccessUsingCustomOutputCommitter() throws Exception { app.verifyCompleted(); } + @Test + public void testRecoveryWithSpillEncryption() throws Exception { + int runCount = 0; + MRApp app = new MRAppWithHistory(1, 1, false, this.getClass().getName(), + true, ++runCount) { + }; + Configuration conf = new Configuration(); + conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true); + conf.setBoolean("mapred.mapper.new-api", true); + conf.setBoolean("mapred.reducer.new-api", true); + conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); + conf.set(FileOutputFormat.OUTDIR, outputDir.toString()); + conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true); + + // run the MR job at the first attempt + Job jobAttempt1 = app.submit(conf); + app.waitForState(jobAttempt1, JobState.RUNNING); + + Iterator tasks = jobAttempt1.getTasks().values().iterator(); + + // finish the map task but the reduce task + Task mapper = tasks.next(); + app.waitForState(mapper, TaskState.RUNNING); + TaskAttempt mapAttempt = mapper.getAttempts().values().iterator().next(); + app.waitForState(mapAttempt, TaskAttemptState.RUNNING); + app.getContext().getEventHandler().handle( + new TaskAttemptEvent(mapAttempt.getID(), TaskAttemptEventType.TA_DONE)); + app.waitForState(mapper, TaskState.SUCCEEDED); + + // crash the first attempt of the MR job + app.stop(); + + // run the MR job again at the second attempt + app = new MRAppWithHistory(1, 1, false, this.getClass().getName(), false, + ++runCount); + Job jobAttempt2 = app.submit(conf); + Assert.assertTrue("Recovery from previous job attempt is processed even " + + "though intermediate data encryption is enabled.", !app.recovered()); + + // The map task succeeded from previous job attempt will not be recovered + // because the data spill encryption is enabled. + // Let's finish the job at the second attempt and verify its completion. + app.waitForState(jobAttempt2, JobState.RUNNING); + tasks = jobAttempt2.getTasks().values().iterator(); + mapper = tasks.next(); + Task reducer = tasks.next(); + + // finish the map task first + app.waitForState(mapper, TaskState.RUNNING); + mapAttempt = mapper.getAttempts().values().iterator().next(); + app.waitForState(mapAttempt, TaskAttemptState.RUNNING); + app.getContext().getEventHandler().handle( + new TaskAttemptEvent(mapAttempt.getID(), TaskAttemptEventType.TA_DONE)); + app.waitForState(mapper, TaskState.SUCCEEDED); + + // then finish the reduce task + TaskAttempt redAttempt = reducer.getAttempts().values().iterator().next(); + app.waitForState(redAttempt, TaskAttemptState.RUNNING); + app.getContext().getEventHandler().handle( + new TaskAttemptEvent(redAttempt.getID(), TaskAttemptEventType.TA_DONE)); + app.waitForState(reducer, TaskState.SUCCEEDED); + + // verify that the job succeeds at the 2rd attempt + app.waitForState(jobAttempt2, JobState.SUCCEEDED); + } + /** * This test case primarily verifies if the recovery is controlled through config * property. In this case, recover is turned OFF. AM with 3 maps and 0 reduce.