MAPREDUCE-2995. Better handling of expired containers in MapReduce ApplicationMaster. Contributed by Vinod K V.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1170279 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Arun Murthy 2011-09-13 18:12:02 +00:00
parent e4dc2e1f56
commit 53f921418d
5 changed files with 89 additions and 3 deletions

View File

@ -1306,6 +1306,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2874. Fix formatting of ApplicationId in web-ui. (Eric Payne via
acmurthy)
MAPREDUCE-2995. Better handling of expired containers in MapReduce
ApplicationMaster. (vinodkv via acmurthy)
Release 0.22.0 - Unreleased
INCOMPATIBLE CHANGES

View File

@ -204,6 +204,11 @@ TaskAttemptEventType.TA_FAILMSG, new DeallocateContainerTransition(
.addTransition(TaskAttemptState.ASSIGNED, TaskAttemptState.FAILED,
TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED,
new DeallocateContainerTransition(TaskAttemptState.FAILED, false))
.addTransition(TaskAttemptState.ASSIGNED,
TaskAttemptState.FAIL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_CONTAINER_COMPLETED,
CLEANUP_CONTAINER_TRANSITION)
// ^ If RM kills the container due to expiry, preemption etc.
.addTransition(TaskAttemptState.ASSIGNED,
TaskAttemptState.KILL_CONTAINER_CLEANUP,
TaskAttemptEventType.TA_KILL, CLEANUP_CONTAINER_TRANSITION)
@ -925,7 +930,8 @@ public void handle(TaskAttemptEvent event) {
try {
stateMachine.doTransition(event.getType(), event);
} catch (InvalidStateTransitonException e) {
LOG.error("Can't handle this event at current state", e);
LOG.error("Can't handle this event at current state for "
+ this.attemptId, e);
eventHandler.handle(new JobDiagnosticsUpdateEvent(
this.attemptId.getTaskId().getJobId(), "Invalid event " + event.getType() +
" on TaskAttempt " + this.attemptId));

View File

@ -528,7 +528,8 @@ public void handle(TaskEvent event) {
try {
stateMachine.doTransition(event.getType(), event);
} catch (InvalidStateTransitonException e) {
LOG.error("Can't handle this event at current state", e);
LOG.error("Can't handle this event at current state for "
+ this.taskId, e);
internalError(event.getType());
}
if (oldState != getState()) {

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce.v2.app;
import java.io.IOException;
import java.util.Iterator;
import java.util.Map;
@ -36,6 +37,12 @@
import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent;
import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherImpl;
import org.apache.hadoop.yarn.api.ContainerManager;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.ContainerToken;
import org.junit.Test;
/**
@ -160,6 +167,74 @@ public void testTimedOutTask() throws Exception {
}
}
@Test
public void testTaskFailWithUnusedContainer() throws Exception {
MRApp app = new FailingTaskWithUnusedContainer();
Configuration conf = new Configuration();
int maxAttempts = 1;
conf.setInt(MRJobConfig.MAP_MAX_ATTEMPTS, maxAttempts);
// disable uberization (requires entire job to be reattempted, so max for
// subtask attempts is overridden to 1)
conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
Job job = app.submit(conf);
app.waitForState(job, JobState.RUNNING);
Map<TaskId, Task> tasks = job.getTasks();
Assert.assertEquals("Num tasks is not correct", 1, tasks.size());
Task task = tasks.values().iterator().next();
app.waitForState(task, TaskState.SCHEDULED);
Map<TaskAttemptId, TaskAttempt> attempts = tasks.values().iterator()
.next().getAttempts();
Assert.assertEquals("Num attempts is not correct", maxAttempts, attempts
.size());
TaskAttempt attempt = attempts.values().iterator().next();
app.waitForState(attempt, TaskAttemptState.ASSIGNED);
app.getDispatcher().getEventHandler().handle(
new TaskAttemptEvent(attempt.getID(),
TaskAttemptEventType.TA_CONTAINER_COMPLETED));
app.waitForState(job, JobState.FAILED);
}
static class FailingTaskWithUnusedContainer extends MRApp {
public FailingTaskWithUnusedContainer() {
super(1, 0, false, "TaskFailWithUnsedContainer", true);
}
protected ContainerLauncher createContainerLauncher(AppContext context,
boolean isLocal) {
return new ContainerLauncherImpl(context) {
@Override
public void handle(ContainerLauncherEvent event) {
switch (event.getType()) {
case CONTAINER_REMOTE_LAUNCH:
super.handle(event);
break;
case CONTAINER_REMOTE_CLEANUP:
getContext().getEventHandler().handle(
new TaskAttemptEvent(event.getTaskAttemptID(),
TaskAttemptEventType.TA_CONTAINER_CLEANED));
break;
}
}
@Override
protected ContainerManager getCMProxy(ContainerId containerID,
String containerManagerBindAddr, ContainerToken containerToken)
throws IOException {
try {
synchronized (this) {
wait(); // Just hang the thread simulating a very slow NM.
}
} catch (InterruptedException e) {
e.printStackTrace();
}
return null;
}
};
};
}
static class TimeOutTaskMRApp extends MRApp {
TimeOutTaskMRApp(int maps, int reduces) {
super(maps, reduces, false, "TimeOutTaskMRApp", true);
@ -232,5 +307,6 @@ public static void main(String[] args) throws Exception {
t.testTimedOutTask();
t.testMapFailureMaxPercent();
t.testReduceFailureMaxPercent();
t.testTaskFailWithUnusedContainer();
}
}

View File

@ -46,7 +46,7 @@ public class SchedulerUtils {
"Container of a completed application";
public static final String EXPIRED_CONTAINER =
"Container expired since it unused";
"Container expired since it was unused";
public static final String UNRESERVED_CONTAINER =
"Container reservation no longer required.";