MAPREDUCE-4848. TaskAttemptContext cast error during AM recovery. Contributed by Jerry Chen

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1431131 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jason Darrell Lowe 2013-01-09 22:56:09 +00:00
parent 3555e7c574
commit b16dfc125d
4 changed files with 128 additions and 4 deletions

View File

@ -685,6 +685,9 @@ Release 0.23.6 - UNRELEASED
MAPREDUCE-4913. TestMRAppMaster#testMRAppMasterMissingStaging occasionally
exits (Jason Lowe via tgraves)
MAPREDUCE-4848. TaskAttemptContext cast error during AM recovery (Jerry
Chen via jlowe)
Release 0.23.5 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -579,7 +579,7 @@ protected EventHandler<JobFinishEvent> createJobFinishEventHandler() {
*/
protected Recovery createRecoveryService(AppContext appContext) {
return new RecoveryService(appContext.getApplicationAttemptId(),
appContext.getClock(), getCommitter());
appContext.getClock(), getCommitter(), isNewApiCommitter());
}
/** Create and initialize (but don't start) a single job.

View File

@ -31,6 +31,7 @@
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileContext;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.OutputCommitter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@ -100,6 +101,7 @@ public class RecoveryService extends CompositeService implements Recovery {
private final ApplicationAttemptId applicationAttemptId;
private final OutputCommitter committer;
private final boolean newApiCommitter;
private final Dispatcher dispatcher;
private final ControlledClock clock;
@ -113,10 +115,11 @@ public class RecoveryService extends CompositeService implements Recovery {
private volatile boolean recoveryMode = false;
public RecoveryService(ApplicationAttemptId applicationAttemptId,
Clock clock, OutputCommitter committer) {
Clock clock, OutputCommitter committer, boolean newApiCommitter) {
super("RecoveringDispatcher");
this.applicationAttemptId = applicationAttemptId;
this.committer = committer;
this.newApiCommitter = newApiCommitter;
this.dispatcher = createRecoveryDispatcher();
this.clock = new ControlledClock(clock);
addService((Service) dispatcher);
@ -360,8 +363,17 @@ else if (event.getType() == ContainerLauncher.EventType.CONTAINER_REMOTE_LAUNCH)
switch (state) {
case SUCCEEDED:
//recover the task output
TaskAttemptContext taskContext = new TaskAttemptContextImpl(getConfig(),
// check the committer type and construct corresponding context
TaskAttemptContext taskContext = null;
if(newApiCommitter) {
taskContext = new TaskAttemptContextImpl(getConfig(),
attInfo.getAttemptId());
} else {
taskContext = new org.apache.hadoop.mapred.TaskAttemptContextImpl(new JobConf(getConfig()),
TypeConverter.fromYarn(aId));
}
try {
TaskType type = taskContext.getTaskAttemptID().getTaskID().getTaskType();
int numReducers = taskContext.getConfiguration().getInt(MRJobConfig.NUM_REDUCES, 1);

View File

@ -626,6 +626,115 @@ public void testOutputRecoveryMapsOnly() throws Exception {
validateOutput();
}
@Test
public void testRecoveryWithOldCommiter() throws Exception {
int runCount = 0;
MRApp app = new MRAppWithHistory(1, 2, false, this.getClass().getName(),
true, ++runCount);
Configuration conf = new Configuration();
conf.setBoolean("mapred.mapper.new-api", false);
conf.setBoolean("mapred.reducer.new-api", false);
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("No of tasks not correct",
3, job.getTasks().size());
Iterator<Task> it = job.getTasks().values().iterator();
Task mapTask1 = it.next();
Task reduceTask1 = it.next();
// all maps must be running
app.waitForState(mapTask1, TaskState.RUNNING);
TaskAttempt task1Attempt1 = mapTask1.getAttempts().values().iterator()
.next();
//before sending the TA_DONE, event make sure attempt has come to
//RUNNING state
app.waitForState(task1Attempt1, TaskAttemptState.RUNNING);
//send the done signal to the map
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
task1Attempt1.getID(),
TaskAttemptEventType.TA_DONE));
//wait for map task to complete
app.waitForState(mapTask1, TaskState.SUCCEEDED);
// Verify the shuffle-port
Assert.assertEquals(5467, task1Attempt1.getShufflePort());
app.waitForState(reduceTask1, TaskState.RUNNING);
TaskAttempt reduce1Attempt1 = reduceTask1.getAttempts().values().iterator().next();
// write output corresponding to reduce1
writeOutput(reduce1Attempt1, conf);
//send the done signal to the 1st reduce
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
reduce1Attempt1.getID(),
TaskAttemptEventType.TA_DONE));
//wait for first reduce task to complete
app.waitForState(reduceTask1, TaskState.SUCCEEDED);
//stop the app before the job completes.
app.stop();
//rerun
//in rerun the map will be recovered from previous run
app = new MRAppWithHistory(1, 2, false, this.getClass().getName(), false,
++runCount);
conf = new Configuration();
conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
conf.setBoolean("mapred.mapper.new-api", false);
conf.setBoolean("mapred.reducer.new-api", false);
conf.set(FileOutputFormat.OUTDIR, outputDir.toString());
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Assert.assertEquals("No of tasks not correct",
3, job.getTasks().size());
it = job.getTasks().values().iterator();
mapTask1 = it.next();
reduceTask1 = it.next();
Task reduceTask2 = it.next();
// map will be recovered, no need to send done
app.waitForState(mapTask1, TaskState.SUCCEEDED);
// Verify the shuffle-port after recovery
task1Attempt1 = mapTask1.getAttempts().values().iterator().next();
Assert.assertEquals(5467, task1Attempt1.getShufflePort());
// first reduce will be recovered, no need to send done
app.waitForState(reduceTask1, TaskState.SUCCEEDED);
app.waitForState(reduceTask2, TaskState.RUNNING);
TaskAttempt reduce2Attempt = reduceTask2.getAttempts().values()
.iterator().next();
//before sending the TA_DONE, event make sure attempt has come to
//RUNNING state
app.waitForState(reduce2Attempt, TaskAttemptState.RUNNING);
//send the done signal to the 2nd reduce task
app.getContext().getEventHandler().handle(
new TaskAttemptEvent(
reduce2Attempt.getID(),
TaskAttemptEventType.TA_DONE));
//wait to get it completed
app.waitForState(reduceTask2, TaskState.SUCCEEDED);
app.waitForState(job, JobState.SUCCEEDED);
app.verifyCompleted();
validateOutput();
}
private void writeBadOutput(TaskAttempt attempt, Configuration conf)
throws Exception {
TaskAttemptContext tContext = new TaskAttemptContextImpl(conf,