MAPREDUCE-6638. Do not attempt to recover progress from previous job attempts if spill encryption is enabled. (Haibo Chen via kasha)
This commit is contained in:
parent
744208431f
commit
de7a0a92ca
@ -149,7 +149,6 @@
|
||||
import org.apache.hadoop.yarn.security.AMRMTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.security.client.ClientToAMTokenSecretManager;
|
||||
import org.apache.hadoop.yarn.util.Clock;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.apache.hadoop.yarn.util.SystemClock;
|
||||
import org.apache.log4j.LogManager;
|
||||
|
||||
@ -1303,44 +1302,77 @@ public Boolean call(Configuration conf) throws IOException {
|
||||
}
|
||||
|
||||
private void processRecovery() throws IOException{
|
||||
if (appAttemptID.getAttemptId() == 1) {
|
||||
return; // no need to recover on the first attempt
|
||||
boolean attemptRecovery = shouldAttemptRecovery();
|
||||
boolean recoverySucceeded = true;
|
||||
if (attemptRecovery) {
|
||||
LOG.info("Attempting to recover.");
|
||||
try {
|
||||
parsePreviousJobHistory();
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Unable to parse prior job history, aborting recovery", e);
|
||||
recoverySucceeded = false;
|
||||
}
|
||||
}
|
||||
|
||||
if (!isFirstAttempt() && (!attemptRecovery || !recoverySucceeded)) {
|
||||
amInfos.addAll(readJustAMInfos());
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isFirstAttempt() {
|
||||
return appAttemptID.getAttemptId() == 1;
|
||||
}
|
||||
|
||||
/**
|
||||
* Check if the current job attempt should try to recover from previous
|
||||
* job attempts if any.
|
||||
*/
|
||||
private boolean shouldAttemptRecovery() throws IOException {
|
||||
if (isFirstAttempt()) {
|
||||
return false; // no need to recover on the first attempt
|
||||
}
|
||||
|
||||
boolean recoveryEnabled = getConfig().getBoolean(
|
||||
MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE,
|
||||
MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE_DEFAULT);
|
||||
if (!recoveryEnabled) {
|
||||
LOG.info("Not attempting to recover. Recovery disabled. To enable " +
|
||||
"recovery, set " + MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE);
|
||||
return false;
|
||||
}
|
||||
|
||||
boolean recoverySupportedByCommitter = isRecoverySupported();
|
||||
if (!recoverySupportedByCommitter) {
|
||||
LOG.info("Not attempting to recover. Recovery is not supported by " +
|
||||
committer.getClass() + ". Use an OutputCommitter that supports" +
|
||||
" recovery.");
|
||||
return false;
|
||||
}
|
||||
|
||||
// If a shuffle secret was not provided by the job client then this app
|
||||
// attempt will generate one. However that disables recovery if there
|
||||
// are reducers as the shuffle secret would be app attempt specific.
|
||||
int numReduceTasks = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0);
|
||||
int reducerCount = getConfig().getInt(MRJobConfig.NUM_REDUCES, 0);
|
||||
|
||||
// If a shuffle secret was not provided by the job client, one will be
|
||||
// generated in this job attempt. However, that disables recovery if
|
||||
// there are reducers as the shuffle secret would be job attempt specific.
|
||||
boolean shuffleKeyValidForRecovery =
|
||||
TokenCache.getShuffleSecretKey(jobCredentials) != null;
|
||||
|
||||
if (recoveryEnabled && recoverySupportedByCommitter
|
||||
&& (numReduceTasks <= 0 || shuffleKeyValidForRecovery)) {
|
||||
LOG.info("Recovery is enabled. "
|
||||
+ "Will try to recover from previous life on best effort basis.");
|
||||
try {
|
||||
parsePreviousJobHistory();
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Unable to parse prior job history, aborting recovery", e);
|
||||
// try to get just the AMInfos
|
||||
amInfos.addAll(readJustAMInfos());
|
||||
}
|
||||
} else {
|
||||
LOG.info("Will not try to recover. recoveryEnabled: "
|
||||
+ recoveryEnabled + " recoverySupportedByCommitter: "
|
||||
+ recoverySupportedByCommitter + " numReduceTasks: "
|
||||
+ numReduceTasks + " shuffleKeyValidForRecovery: "
|
||||
+ shuffleKeyValidForRecovery + " ApplicationAttemptID: "
|
||||
+ appAttemptID.getAttemptId());
|
||||
// Get the amInfos anyways whether recovery is enabled or not
|
||||
amInfos.addAll(readJustAMInfos());
|
||||
if (reducerCount > 0 && !shuffleKeyValidForRecovery) {
|
||||
LOG.info("Not attempting to recover. The shuffle key is invalid for " +
|
||||
"recovery.");
|
||||
return false;
|
||||
}
|
||||
|
||||
// If the intermediate data is encrypted, recovering the job requires the
|
||||
// access to the key. Until the encryption key is persisted, we should
|
||||
// avoid attempts to recover.
|
||||
boolean spillEncrypted = CryptoUtils.isEncryptedSpillEnabled(getConfig());
|
||||
if (reducerCount > 0 && spillEncrypted) {
|
||||
LOG.info("Not attempting to recover. Intermediate spill encryption" +
|
||||
" is enabled.");
|
||||
return false;
|
||||
}
|
||||
|
||||
return true;
|
||||
}
|
||||
|
||||
private static FSDataInputStream getPreviousJobHistoryStream(
|
||||
@ -1440,6 +1472,10 @@ private List<AMInfo> readJustAMInfos() {
|
||||
return amInfos;
|
||||
}
|
||||
|
||||
public boolean recovered() {
|
||||
return recoveredJobStartTime > 0;
|
||||
}
|
||||
|
||||
/**
|
||||
* This can be overridden to instantiate multiple jobs and create a
|
||||
* workflow.
|
||||
|
@ -579,6 +579,72 @@ public void testRecoverySuccessUsingCustomOutputCommitter() throws Exception {
|
||||
app.verifyCompleted();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRecoveryWithSpillEncryption() throws Exception {
|
||||
int runCount = 0;
|
||||
MRApp app = new MRAppWithHistory(1, 1, false, this.getClass().getName(),
|
||||
true, ++runCount) {
|
||||
};
|
||||
Configuration conf = new Configuration();
|
||||
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
|
||||
conf.setBoolean("mapred.mapper.new-api", true);
|
||||
conf.setBoolean("mapred.reducer.new-api", true);
|
||||
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
|
||||
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
|
||||
conf.setBoolean(MRJobConfig.MR_ENCRYPTED_INTERMEDIATE_DATA, true);
|
||||
|
||||
// run the MR job at the first attempt
|
||||
Job jobAttempt1 = app.submit(conf);
|
||||
app.waitForState(jobAttempt1, JobState.RUNNING);
|
||||
|
||||
Iterator<Task> tasks = jobAttempt1.getTasks().values().iterator();
|
||||
|
||||
// finish the map task but the reduce task
|
||||
Task mapper = tasks.next();
|
||||
app.waitForState(mapper, TaskState.RUNNING);
|
||||
TaskAttempt mapAttempt = mapper.getAttempts().values().iterator().next();
|
||||
app.waitForState(mapAttempt, TaskAttemptState.RUNNING);
|
||||
app.getContext().getEventHandler().handle(
|
||||
new TaskAttemptEvent(mapAttempt.getID(), TaskAttemptEventType.TA_DONE));
|
||||
app.waitForState(mapper, TaskState.SUCCEEDED);
|
||||
|
||||
// crash the first attempt of the MR job
|
||||
app.stop();
|
||||
|
||||
// run the MR job again at the second attempt
|
||||
app = new MRAppWithHistory(1, 1, false, this.getClass().getName(), false,
|
||||
++runCount);
|
||||
Job jobAttempt2 = app.submit(conf);
|
||||
Assert.assertTrue("Recovery from previous job attempt is processed even " +
|
||||
"though intermediate data encryption is enabled.", !app.recovered());
|
||||
|
||||
// The map task succeeded from previous job attempt will not be recovered
|
||||
// because the data spill encryption is enabled.
|
||||
// Let's finish the job at the second attempt and verify its completion.
|
||||
app.waitForState(jobAttempt2, JobState.RUNNING);
|
||||
tasks = jobAttempt2.getTasks().values().iterator();
|
||||
mapper = tasks.next();
|
||||
Task reducer = tasks.next();
|
||||
|
||||
// finish the map task first
|
||||
app.waitForState(mapper, TaskState.RUNNING);
|
||||
mapAttempt = mapper.getAttempts().values().iterator().next();
|
||||
app.waitForState(mapAttempt, TaskAttemptState.RUNNING);
|
||||
app.getContext().getEventHandler().handle(
|
||||
new TaskAttemptEvent(mapAttempt.getID(), TaskAttemptEventType.TA_DONE));
|
||||
app.waitForState(mapper, TaskState.SUCCEEDED);
|
||||
|
||||
// then finish the reduce task
|
||||
TaskAttempt redAttempt = reducer.getAttempts().values().iterator().next();
|
||||
app.waitForState(redAttempt, TaskAttemptState.RUNNING);
|
||||
app.getContext().getEventHandler().handle(
|
||||
new TaskAttemptEvent(redAttempt.getID(), TaskAttemptEventType.TA_DONE));
|
||||
app.waitForState(reducer, TaskState.SUCCEEDED);
|
||||
|
||||
// verify that the job succeeds at the 2rd attempt
|
||||
app.waitForState(jobAttempt2, JobState.SUCCEEDED);
|
||||
}
|
||||
|
||||
/**
|
||||
* This test case primarily verifies if the recovery is controlled through config
|
||||
* property. In this case, recover is turned OFF. AM with 3 maps and 0 reduce.
|
||||
|
Loading…
Reference in New Issue
Block a user