From 6804ef32fcd221dd05db14f2c7a266026b70fa05 Mon Sep 17 00:00:00 2001 From: Robert Joseph Evans Date: Tue, 10 Jul 2012 16:10:14 +0000 Subject: [PATCH] MAPREDUCE-4252. MR2 job never completes with 1 pending task (Tom White via bobby) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1359747 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 ++ .../mapreduce/v2/app/job/impl/TaskImpl.java | 14 ++++++- .../v2/app/job/impl/TestTaskImpl.java | 37 ++++++++++++++++++- 3 files changed, 50 insertions(+), 4 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 62714ee889..01254e14d3 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -667,6 +667,9 @@ Release 0.23.3 - UNRELEASED MAPREDUCE-4300. OOM in AM can turn it into a zombie. (Robert Evans via tgraves) + MAPREDUCE-4252. MR2 job never completes with 1 pending task (Tom White via + bobby) + Release 0.23.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java index c7f89f9ac5..cf5d92b4c7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java @@ -189,7 +189,7 @@ TaskEventType.T_ADD_SPEC_ATTEMPT, new RedundantScheduleTransition()) // Transitions from SUCCEEDED state .addTransition(TaskState.SUCCEEDED, //only possible for map tasks - EnumSet.of(TaskState.SCHEDULED, TaskState.FAILED), + EnumSet.of(TaskState.SCHEDULED, TaskState.SUCCEEDED, TaskState.FAILED), TaskEventType.T_ATTEMPT_FAILED, new MapRetroactiveFailureTransition()) .addTransition(TaskState.SUCCEEDED, //only possible for map tasks EnumSet.of(TaskState.SCHEDULED, TaskState.SUCCEEDED), @@ -618,7 +618,7 @@ public void handle(TaskEvent event) { } } - private void internalError(TaskEventType type) { + protected void internalError(TaskEventType type) { LOG.error("Invalid event " + type + " on Task " + this.taskId); eventHandler.handle(new JobDiagnosticsUpdateEvent( this.taskId.getJobId(), "Invalid event " + type + @@ -896,6 +896,16 @@ private static class MapRetroactiveFailureTransition @Override public TaskState transition(TaskImpl task, TaskEvent event) { + if (event instanceof TaskTAttemptEvent) { + TaskTAttemptEvent castEvent = (TaskTAttemptEvent) event; + if (task.getState() == TaskState.SUCCEEDED && + !castEvent.getTaskAttemptID().equals(task.successfulAttempt)) { + // don't allow a different task attempt to override a previous + // succeeded state + return TaskState.SUCCEEDED; + } + } + //verify that this occurs only for map task //TODO: consider moving it to MapTaskImpl if (!TaskType.MAP.equals(task.getType())) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java index 4ff6001bf5..af77f7dbb7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java @@ -20,6 +20,7 @@ import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -127,8 +128,13 @@ protected TaskAttemptImpl createAttempt() { @Override protected int getMaxAttempts() { return 100; - } - + } + + @Override + protected void internalError(TaskEventType type) { + super.internalError(type); + fail("Internal error: " + type); + } } private class MockTaskAttemptImpl extends TaskAttemptImpl { @@ -462,5 +468,32 @@ public void testFailureDuringTaskAttemptCommit() { assertTaskSucceededState(); } + + @Test + public void testSpeculativeTaskAttemptSucceedsEvenIfFirstFails() { + TaskId taskId = getNewTaskID(); + scheduleTaskAttempt(taskId); + launchTaskAttempt(getLastAttempt().getAttemptId()); + updateLastAttemptState(TaskAttemptState.RUNNING); + + // Add a speculative task attempt that succeeds + mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), + TaskEventType.T_ADD_SPEC_ATTEMPT)); + launchTaskAttempt(getLastAttempt().getAttemptId()); + commitTaskAttempt(getLastAttempt().getAttemptId()); + mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), + TaskEventType.T_ATTEMPT_SUCCEEDED)); + + // The task should now have succeeded + assertTaskSucceededState(); + + // Now fail the first task attempt, after the second has succeeded + mockTask.handle(new TaskTAttemptEvent(taskAttempts.get(0).getAttemptId(), + TaskEventType.T_ATTEMPT_FAILED)); + + // The task should still be in the succeeded state + assertTaskSucceededState(); + + } }