MAPREDUCE-3858. Task attempt failure during commit results in task never completing. (Tom White via mahadev)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1244254 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Mahadev Konar 2012-02-14 22:49:00 +00:00
parent 16a44b6fa5
commit 83ab8f087b
3 changed files with 59 additions and 4 deletions

View File

@ -815,6 +815,9 @@ Release 0.23.1 - 2012-02-08
MAPREDUCE-3802. Added test to validate that AM can crash multiple times and MAPREDUCE-3802. Added test to validate that AM can crash multiple times and
still can recover successfully after MAPREDUCE-3846. (vinodkv) still can recover successfully after MAPREDUCE-3846. (vinodkv)
MAPREDUCE-3858. Task attempt failure during commit results in task never completing.
(Tom White via mahadev)
Release 0.23.0 - 2011-11-01 Release 0.23.0 - 2011-11-01
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -832,6 +832,9 @@ private static class AttemptFailedTransition implements
public TaskState transition(TaskImpl task, TaskEvent event) { public TaskState transition(TaskImpl task, TaskEvent event) {
task.failedAttempts++; task.failedAttempts++;
TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event; TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event;
if (castEvent.getTaskAttemptID().equals(task.commitAttempt)) {
task.commitAttempt = null;
}
TaskAttempt attempt = task.attempts.get(castEvent.getTaskAttemptID()); TaskAttempt attempt = task.attempts.get(castEvent.getTaskAttemptID());
if (attempt.getAssignedContainerMgrAddress() != null) { if (attempt.getAssignedContainerMgrAddress() != null) {
//container was assigned //container was assigned
@ -877,6 +880,7 @@ protected TaskState getDefaultState(Task task) {
protected void unSucceed(TaskImpl task) { protected void unSucceed(TaskImpl task) {
++task.numberUncompletedAttempts; ++task.numberUncompletedAttempts;
task.commitAttempt = null;
task.successfulAttempt = null; task.successfulAttempt = null;
} }
} }

View File

@ -18,6 +18,8 @@
package org.apache.hadoop.mapreduce.v2.app.job.impl; package org.apache.hadoop.mapreduce.v2.app.job.impl;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock; import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.when; import static org.mockito.Mockito.when;
@ -261,6 +263,12 @@ private void launchTaskAttempt(TaskAttemptId attemptId) {
assertTaskRunningState(); assertTaskRunningState();
} }
/**
 * Delivers a {@link TaskEventType#T_ATTEMPT_COMMIT_PENDING} event for the
 * given attempt to the task under test and verifies that the task is still
 * in {@link TaskState#RUNNING} afterwards.
 */
private void commitTaskAttempt(TaskAttemptId attemptId) {
  TaskTAttemptEvent commitPending = new TaskTAttemptEvent(attemptId,
      TaskEventType.T_ATTEMPT_COMMIT_PENDING);
  mockTask.handle(commitPending);
  assertTaskRunningState();
}
private MockTaskAttemptImpl getLastAttempt() { private MockTaskAttemptImpl getLastAttempt() {
return taskAttempts.get(taskAttempts.size()-1); return taskAttempts.get(taskAttempts.size()-1);
} }
@ -279,32 +287,45 @@ private void killRunningTaskAttempt(TaskAttemptId attemptId) {
assertTaskRunningState(); assertTaskRunningState();
} }
/**
 * Delivers a {@link TaskEventType#T_ATTEMPT_FAILED} event for the given
 * attempt to the task under test and verifies that the task is still in
 * {@link TaskState#RUNNING} afterwards.
 */
private void failRunningTaskAttempt(TaskAttemptId attemptId) {
  TaskTAttemptEvent attemptFailed = new TaskTAttemptEvent(attemptId,
      TaskEventType.T_ATTEMPT_FAILED);
  mockTask.handle(attemptFailed);
  assertTaskRunningState();
}
/** /**
* {@link TaskState#NEW} * {@link TaskState#NEW}
*/ */
private void assertTaskNewState() { private void assertTaskNewState() {
assertEquals(mockTask.getState(), TaskState.NEW); assertEquals(TaskState.NEW, mockTask.getState());
} }
/** /**
* {@link TaskState#SCHEDULED} * {@link TaskState#SCHEDULED}
*/ */
private void assertTaskScheduledState() { private void assertTaskScheduledState() {
assertEquals(mockTask.getState(), TaskState.SCHEDULED); assertEquals(TaskState.SCHEDULED, mockTask.getState());
} }
/** /**
* {@link TaskState#RUNNING} * {@link TaskState#RUNNING}
*/ */
private void assertTaskRunningState() { private void assertTaskRunningState() {
assertEquals(mockTask.getState(), TaskState.RUNNING); assertEquals(TaskState.RUNNING, mockTask.getState());
} }
/** /**
* {@link TaskState#KILL_WAIT} * {@link TaskState#KILL_WAIT}
*/ */
private void assertTaskKillWaitState() { private void assertTaskKillWaitState() {
assertEquals(mockTask.getState(), TaskState.KILL_WAIT); assertEquals(TaskState.KILL_WAIT, mockTask.getState());
}
/**
* {@link TaskState#SUCCEEDED}
*/
private void assertTaskSucceededState() {
assertEquals(TaskState.SUCCEEDED, mockTask.getState());
} }
@Test @Test
@ -410,4 +431,31 @@ public void testTaskProgress() {
} }
/**
 * Regression test for MAPREDUCE-3858: a task attempt that fails after it has
 * reported commit-pending must not prevent the task from completing. A later
 * attempt has to be allowed to commit and the task has to reach
 * {@link TaskState#SUCCEEDED}.
 */
@Test
public void testFailureDuringTaskAttemptCommit() {
TaskId taskId = getNewTaskID();
// First attempt: schedule it, launch it and move it to commit-pending.
scheduleTaskAttempt(taskId);
launchTaskAttempt(getLastAttempt().getAttemptId());
updateLastAttemptState(TaskAttemptState.COMMIT_PENDING);
commitTaskAttempt(getLastAttempt().getAttemptId());
// During the task attempt commit there is an exception which causes
// the attempt to fail
updateLastAttemptState(TaskAttemptState.FAILED);
failRunningTaskAttempt(getLastAttempt().getAttemptId());
// The failure is expected to have produced a second attempt; from here on
// getLastAttempt() refers to that new attempt, not the failed one.
assertEquals(2, taskAttempts.size());
// Second attempt: report commit-pending, then success.
updateLastAttemptState(TaskAttemptState.SUCCEEDED);
commitTaskAttempt(getLastAttempt().getAttemptId());
mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(),
TaskEventType.T_ATTEMPT_SUCCEEDED));
// Only the second attempt may commit, and the task must end up SUCCEEDED.
assertFalse("First attempt should not commit",
mockTask.canCommit(taskAttempts.get(0).getAttemptId()));
assertTrue("Second attempt should commit",
mockTask.canCommit(getLastAttempt().getAttemptId()));
assertTaskSucceededState();
}
} }