From 0b73dde6ce865ff94b483558ff0701de9932e211 Mon Sep 17 00:00:00 2001 From: Thomas White Date: Thu, 21 Feb 2013 11:56:08 +0000 Subject: [PATCH] MAPREDUCE-4951. Container preemption interpreted as task failure. Contributed by Sandy Ryza. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1448615 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 +++ .../v2/app/job/impl/TaskAttemptImpl.java | 1 - .../v2/app/rm/RMContainerAllocator.java | 20 ++++++++++++-- .../v2/app/TestRMContainerAllocator.java | 27 +++++++++++++++++++ 4 files changed, 48 insertions(+), 3 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 628fc97300..e5ba0124e7 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -183,6 +183,9 @@ Release 2.0.4-beta - UNRELEASED MAPREDUCE-5013. mapred.JobStatus compatibility: MR2 missing constructors from MR1. (Sandy Ryza via tomwhite) + MAPREDUCE-4951. Container preemption interpreted as task failure. + (Sandy Ryza via tomwhite) + Release 2.0.3-alpha - 2013-02-06 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 6250f86295..8d834c3da7 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -238,7 +238,6 @@ TaskAttemptEventType.TA_FAILMSG, new DeallocateContainerTransition( TaskAttemptStateInternal.FAIL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_CONTAINER_COMPLETED, CLEANUP_CONTAINER_TRANSITION) - // ^ If RM kills the container due to expiry, preemption etc. .addTransition(TaskAttemptStateInternal.ASSIGNED, TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, TaskAttemptEventType.TA_KILL, CLEANUP_CONTAINER_TRANSITION) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index d21528aa37..430d8fd945 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -67,9 +67,12 @@ import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.util.RackResolver; +import com.google.common.annotations.VisibleForTesting; + /** * Allocates the container from the ResourceManager scheduler. */ @@ -606,8 +609,8 @@ private List getResources() throws Exception { assignedRequests.remove(attemptID); // send the container completed event to Task attempt - eventHandler.handle(new TaskAttemptEvent(attemptID, - TaskAttemptEventType.TA_CONTAINER_COMPLETED)); + eventHandler.handle(createContainerFinishedEvent(cont, attemptID)); + // Send the diagnostics String diagnostics = StringInterner.weakIntern(cont.getDiagnostics()); eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(attemptID, @@ -617,6 +620,19 @@ private List getResources() throws Exception { return newContainers; } + @VisibleForTesting + public TaskAttemptEvent createContainerFinishedEvent(ContainerStatus cont, + TaskAttemptId attemptID) { + if (cont.getExitStatus() == YarnConfiguration.ABORTED_CONTAINER_EXIT_STATUS) { + // killed by framework + return new TaskAttemptEvent(attemptID, + TaskAttemptEventType.TA_KILL); + } else { + return new TaskAttemptEvent(attemptID, + TaskAttemptEventType.TA_CONTAINER_COMPLETED); + } + } + @SuppressWarnings("unchecked") private void handleUpdatedNodes(AMResponse response) { // send event to the job about on updated nodes diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java index 47845a0aa6..c27c71cfe4 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRMContainerAllocator.java @@ -83,6 +83,7 @@ import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; import org.apache.hadoop.yarn.event.Event; @@ -1645,6 +1646,32 @@ public void run() { Assert.assertTrue(callbackCalled.get()); } + @Test + public void testCompletedContainerEvent() { + RMContainerAllocator allocator = new RMContainerAllocator( + mock(ClientService.class), mock(AppContext.class)); + + TaskAttemptId attemptId = MRBuilderUtils.newTaskAttemptId( + MRBuilderUtils.newTaskId( + MRBuilderUtils.newJobId(1, 1, 1), 1, TaskType.MAP), 1); + ContainerId containerId = BuilderUtils.newContainerId(1, 1, 1, 1); + ContainerStatus status = BuilderUtils.newContainerStatus( + containerId, ContainerState.RUNNING, "", 0); + + ContainerStatus abortedStatus = BuilderUtils.newContainerStatus( + containerId, ContainerState.RUNNING, "", + YarnConfiguration.ABORTED_CONTAINER_EXIT_STATUS); + + TaskAttemptEvent event = allocator.createContainerFinishedEvent(status, + attemptId); + Assert.assertEquals(TaskAttemptEventType.TA_CONTAINER_COMPLETED, + event.getType()); + + TaskAttemptEvent abortedEvent = allocator.createContainerFinishedEvent( + abortedStatus, attemptId); + Assert.assertEquals(TaskAttemptEventType.TA_KILL, abortedEvent.getType()); + } + public static void main(String[] args) throws Exception { TestRMContainerAllocator t = new TestRMContainerAllocator(); t.testSimple();