diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptKillEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptKillEvent.java index 9bcc838173..767ef0d7a2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptKillEvent.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskAttemptKillEvent.java @@ -24,14 +24,27 @@ public class TaskAttemptKillEvent extends TaskAttemptEvent { private final String message; + // Next map attempt will be rescheduled(i.e. updated in ask with higher + // priority equivalent to that of a fast fail map) + private final boolean rescheduleAttempt; + + public TaskAttemptKillEvent(TaskAttemptId attemptID, + String message, boolean rescheduleAttempt) { + super(attemptID, TaskAttemptEventType.TA_KILL); + this.message = message; + this.rescheduleAttempt = rescheduleAttempt; + } public TaskAttemptKillEvent(TaskAttemptId attemptID, String message) { - super(attemptID, TaskAttemptEventType.TA_KILL); - this.message = message; + this(attemptID, message, false); } public String getMessage() { return message; } + + public boolean getRescheduleAttempt() { + return rescheduleAttempt; + } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskTAttemptKilledEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskTAttemptKilledEvent.java new file mode 100644 index 0000000000..897444d7dc --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/event/TaskTAttemptKilledEvent.java @@ -0,0 +1,40 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.mapreduce.v2.app.job.event; + +import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId; + +/** + * Task Attempt killed event. + */ +public class TaskTAttemptKilledEvent extends TaskTAttemptEvent { + + // Next map attempt will be rescheduled(i.e. updated in ask with + // higher priority equivalent to that of a fast fail map) + private final boolean rescheduleAttempt; + + public TaskTAttemptKilledEvent(TaskAttemptId id, boolean rescheduleAttempt) { + super(id, TaskEventType.T_ATTEMPT_KILLED); + this.rescheduleAttempt = rescheduleAttempt; + } + + public boolean getRescheduleAttempt() { + return rescheduleAttempt; + } +} diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java index c8c5ce90ca..b7036a5363 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/JobImpl.java @@ -1349,7 +1349,9 @@ private void actOnUnusableNode(NodeId nodeId, NodeState nodeState) { if (TaskType.MAP == id.getTaskId().getTaskType()) { // reschedule only map tasks because their outputs maybe unusable LOG.info(mesg + ". AttemptId:" + id); - eventHandler.handle(new TaskAttemptKillEvent(id, mesg)); + // Kill the attempt and indicate that next map attempt should be + // rescheduled (i.e. considered as a fast fail map). + eventHandler.handle(new TaskAttemptKillEvent(id, mesg, true)); } } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index 5f0a622ec4..da6617e00e 100755 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -98,6 +98,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptTooManyFetchFailureEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent; @@ -184,6 +185,7 @@ public abstract class TaskAttemptImpl implements private int httpPort; private Locality locality; private Avataar avataar; + private boolean rescheduleNextAttempt = false; private static final CleanupContainerTransition CLEANUP_CONTAINER_TRANSITION = new CleanupContainerTransition(); @@ -1377,6 +1379,16 @@ boolean isContainerAssigned() { return container != null; } + //always called in write lock + private boolean getRescheduleNextAttempt() { + return rescheduleNextAttempt; + } + + //always called in write lock + private void setRescheduleNextAttempt(boolean reschedule) { + rescheduleNextAttempt = reschedule; + } + //always called in write lock private void setFinishTime() { //set the finish time only if launch time is set @@ -1745,9 +1757,8 @@ public void transition(TaskAttemptImpl taskAttempt, TaskEventType.T_ATTEMPT_FAILED)); break; case KILLED: - taskAttempt.eventHandler.handle(new TaskTAttemptEvent( - taskAttempt.attemptId, - TaskEventType.T_ATTEMPT_KILLED)); + taskAttempt.eventHandler.handle(new TaskTAttemptKilledEvent( + taskAttempt.attemptId, false)); break; default: LOG.error("Task final state is not FAILED or KILLED: " + finalState); @@ -2014,8 +2025,13 @@ public TaskAttemptStateInternal transition(TaskAttemptImpl taskAttempt, taskAttempt, TaskAttemptStateInternal.KILLED); taskAttempt.eventHandler.handle(new JobHistoryEvent(taskAttempt.attemptId .getTaskId().getJobId(), tauce)); - taskAttempt.eventHandler.handle(new TaskTAttemptEvent( - taskAttempt.attemptId, TaskEventType.T_ATTEMPT_KILLED)); + boolean rescheduleNextTaskAttempt = false; + if (event instanceof TaskAttemptKillEvent) { + rescheduleNextTaskAttempt = + ((TaskAttemptKillEvent)event).getRescheduleAttempt(); + } + taskAttempt.eventHandler.handle(new TaskTAttemptKilledEvent( + taskAttempt.attemptId, rescheduleNextTaskAttempt)); return TaskAttemptStateInternal.KILLED; } } @@ -2044,6 +2060,12 @@ public TaskAttemptStateInternal transition(TaskAttemptImpl taskAttempt, taskAttempt.getID().toString()); return TaskAttemptStateInternal.SUCCESS_CONTAINER_CLEANUP; } else { + // Store reschedule flag so that after clean up is completed, new + // attempt is scheduled/rescheduled based on it. + if (event instanceof TaskAttemptKillEvent) { + taskAttempt.setRescheduleNextAttempt( + ((TaskAttemptKillEvent)event).getRescheduleAttempt()); + } return TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP; } } @@ -2075,9 +2097,8 @@ public void transition(TaskAttemptImpl taskAttempt, ((TaskAttemptKillEvent) event).getMessage()); } - taskAttempt.eventHandler.handle(new TaskTAttemptEvent( - taskAttempt.attemptId, - TaskEventType.T_ATTEMPT_KILLED)); + taskAttempt.eventHandler.handle(new TaskTAttemptKilledEvent( + taskAttempt.attemptId, taskAttempt.getRescheduleNextAttempt())); } } @@ -2095,9 +2116,8 @@ public void transition(TaskAttemptImpl taskAttempt, taskAttempt.getAssignedContainerID(), taskAttempt.getAssignedContainerMgrAddress(), taskAttempt.container.getContainerToken(), ContainerLauncher.EventType.CONTAINER_REMOTE_CLEANUP)); - taskAttempt.eventHandler.handle(new TaskTAttemptEvent( - taskAttempt.attemptId, - TaskEventType.T_ATTEMPT_KILLED)); + taskAttempt.eventHandler.handle(new TaskTAttemptKilledEvent( + taskAttempt.attemptId, false)); } } @@ -2137,6 +2157,12 @@ public void transition(TaskAttemptImpl taskAttempt, // for it. finalizeProgress(taskAttempt); sendContainerCleanup(taskAttempt, event); + // Store reschedule flag so that after clean up is completed, new + // attempt is scheduled/rescheduled based on it. + if (event instanceof TaskAttemptKillEvent) { + taskAttempt.setRescheduleNextAttempt( + ((TaskAttemptKillEvent)event).getRescheduleAttempt()); + } } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java index a392837eb1..34d9f0ef13 100755 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java @@ -76,6 +76,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskRecoverEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent; import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerFailedEvent; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; @@ -594,10 +595,15 @@ protected TaskAttempt getSuccessfulAttempt() { // This is always called in the Write Lock private void addAndScheduleAttempt(Avataar avataar) { + addAndScheduleAttempt(avataar, false); + } + + // This is always called in the Write Lock + private void addAndScheduleAttempt(Avataar avataar, boolean reschedule) { TaskAttempt attempt = addAttempt(avataar); inProgressAttempts.add(attempt.getID()); //schedule the nextAttemptNumber - if (failedAttempts.size() > 0) { + if (failedAttempts.size() > 0 || reschedule) { eventHandler.handle(new TaskAttemptEvent(attempt.getID(), TaskAttemptEventType.TA_RESCHEDULE)); } else { @@ -968,7 +974,12 @@ public void transition(TaskImpl task, TaskEvent event) { task.finishedAttempts.add(taskAttemptId); task.inProgressAttempts.remove(taskAttemptId); if (task.successfulAttempt == null) { - task.addAndScheduleAttempt(Avataar.VIRGIN); + boolean rescheduleNewAttempt = false; + if (event instanceof TaskTAttemptKilledEvent) { + rescheduleNewAttempt = + ((TaskTAttemptKilledEvent)event).getRescheduleAttempt(); + } + task.addAndScheduleAttempt(Avataar.VIRGIN, rescheduleNewAttempt); } if ((task.commitAttempt != null) && (task.commitAttempt == taskAttemptId)) { task.commitAttempt = null; @@ -1187,7 +1198,15 @@ public TaskStateInternal transition(TaskImpl task, TaskEvent event) { // from the map splitInfo. So the bad node might be sent as a location // to the RM. But the RM would ignore that just like it would ignore // currently pending container requests affinitized to bad nodes. - task.addAndScheduleAttempt(Avataar.VIRGIN); + boolean rescheduleNextTaskAttempt = false; + if (event instanceof TaskTAttemptKilledEvent) { + // Decide whether to reschedule next task attempt. If true, this + // typically indicates that a successful map attempt was killed on an + // unusable node being reported. + rescheduleNextTaskAttempt = + ((TaskTAttemptKilledEvent)event).getRescheduleAttempt(); + } + task.addAndScheduleAttempt(Avataar.VIRGIN, rescheduleNextTaskAttempt); return TaskStateInternal.SCHEDULED; } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java index 73745d358e..0f4b59bd3f 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/rm/RMContainerAllocator.java @@ -925,9 +925,11 @@ private void handleUpdatedNodes(AllocateResponse response) { LOG.info("Killing taskAttempt:" + tid + " because it is running on unusable node:" + taskAttemptNodeId); + // If map, reschedule next task attempt. + boolean rescheduleNextAttempt = (i == 0) ? true : false; eventHandler.handle(new TaskAttemptKillEvent(tid, "TaskAttempt killed because it ran on unusable node" - + taskAttemptNodeId)); + + taskAttemptNodeId, rescheduleNextAttempt)); } } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java index eb6b93292b..eaf107050d 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestMRApp.java @@ -26,6 +26,7 @@ import java.util.Collection; import java.util.Iterator; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicInteger; import com.google.common.base.Supplier; import org.apache.hadoop.test.GenericTestUtils; @@ -56,13 +57,19 @@ import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncherEvent; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerRemoteLaunchEvent; +import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocator; +import org.apache.hadoop.mapreduce.v2.app.rm.ContainerAllocatorEvent; +import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeReport; import org.apache.hadoop.yarn.api.records.NodeState; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.junit.Test; +import org.mockito.Mockito; /** * Tests the state machine of MR App. @@ -201,13 +208,18 @@ public void testCompletedMapsForReduceSlowstart() throws Exception { @Test public void testUpdatedNodes() throws Exception { int runCount = 0; + Dispatcher disp = Mockito.spy(new AsyncDispatcher()); MRApp app = new MRAppWithHistory(2, 2, false, this.getClass().getName(), - true, ++runCount); + true, ++runCount, disp); Configuration conf = new Configuration(); // after half of the map completion, reduce will start conf.setFloat(MRJobConfig.COMPLETED_MAPS_FOR_REDUCE_SLOWSTART, 0.5f); // uberization forces full slowstart (1.0), so disable that conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); + + ContainerAllocEventHandler handler = new ContainerAllocEventHandler(); + disp.register(ContainerAllocator.EventType.class, handler); + final Job job1 = app.submit(conf); app.waitForState(job1, JobState.RUNNING); Assert.assertEquals("Num tasks not correct", 4, job1.getTasks().size()); @@ -285,6 +297,12 @@ public Boolean get() { events = job1.getTaskAttemptCompletionEvents(0, 100); Assert.assertEquals("Expecting 2 more completion events for killed", 4, events.length); + // 2 map task attempts which were killed above should be requested from + // container allocator with the previous map task marked as failed. If + // this happens allocator will request the container for this mapper from + // RM at a higher priority of 5(i.e. with a priority equivalent to that of + // a fail fast map). + handler.waitForFailedMapContainerReqEvents(2); // all maps must be back to running app.waitForState(mapTask1, TaskState.RUNNING); @@ -324,7 +342,7 @@ public Boolean get() { // rerun // in rerun the 1st map will be recovered from previous run app = new MRAppWithHistory(2, 2, false, this.getClass().getName(), false, - ++runCount); + ++runCount, (Dispatcher)new AsyncDispatcher()); conf = new Configuration(); conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true); conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false); @@ -420,6 +438,25 @@ public Boolean get() { app.waitForState(job2, JobState.SUCCEEDED); } + private final class ContainerAllocEventHandler + implements EventHandler { + private AtomicInteger failedMapContainerReqEventCnt = new AtomicInteger(0); + @Override + public void handle(ContainerAllocatorEvent event) { + if (event.getType() == ContainerAllocator.EventType.CONTAINER_REQ && + ((ContainerRequestEvent)event).getEarlierAttemptFailed()) { + failedMapContainerReqEventCnt.incrementAndGet(); + } + } + public void waitForFailedMapContainerReqEvents(int count) + throws InterruptedException { + while(failedMapContainerReqEventCnt.get() != count) { + Thread.sleep(50); + } + failedMapContainerReqEventCnt.set(0); + } + } + private static void waitFor(Supplier predicate, int checkIntervalMillis, int checkTotalMillis) throws InterruptedException { try { @@ -590,9 +627,17 @@ public void handle(ContainerLauncherEvent event) { } private final class MRAppWithHistory extends MRApp { + private Dispatcher dispatcher; public MRAppWithHistory(int maps, int reduces, boolean autoComplete, - String testName, boolean cleanOnStart, int startCount) { + String testName, boolean cleanOnStart, int startCount, + Dispatcher disp) { super(maps, reduces, autoComplete, testName, cleanOnStart, startCount); + this.dispatcher = disp; + } + + @Override + protected Dispatcher createDispatcher() { + return dispatcher; } @Override diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java index 509f6af612..98dffba458 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java @@ -78,9 +78,13 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerAssignedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptContainerLaunchedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptDiagnosticsUpdateEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptTooManyFetchFailureEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent; import org.apache.hadoop.mapreduce.v2.app.rm.ContainerRequestEvent; import org.apache.hadoop.mapreduce.v2.util.MRBuilderUtils; import org.apache.hadoop.security.Credentials; @@ -982,7 +986,46 @@ public void testFetchFailureAttemptFinishTime() throws Exception{ + " Task attempt finish time is not the same ", finishTime, Long.valueOf(taImpl.getFinishTime())); } - + + private void containerKillBeforeAssignment(boolean scheduleAttempt) + throws Exception { + MockEventHandler eventHandler = new MockEventHandler(); + ApplicationId appId = ApplicationId.newInstance(1, 2); + JobId jobId = MRBuilderUtils.newJobId(appId, 1); + TaskId taskId = MRBuilderUtils.newTaskId(jobId, 1, TaskType.MAP); + + TaskAttemptImpl taImpl = + new MapTaskAttemptImpl(taskId, 1, eventHandler, mock(Path.class), 1, + mock(TaskSplitMetaInfo.class), new JobConf(), + mock(TaskAttemptListener.class), mock(Token.class), + new Credentials(), SystemClock.getInstance(), + mock(AppContext.class)); + if (scheduleAttempt) { + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_SCHEDULE)); + } + taImpl.handle(new TaskAttemptKillEvent(taImpl.getID(),"", true)); + assertEquals("Task attempt is not in KILLED state", taImpl.getState(), + TaskAttemptState.KILLED); + assertEquals("Task attempt's internal state is not KILLED", + taImpl.getInternalState(), TaskAttemptStateInternal.KILLED); + assertFalse("InternalError occurred", eventHandler.internalError); + TaskEvent event = eventHandler.lastTaskEvent; + assertEquals(TaskEventType.T_ATTEMPT_KILLED, event.getType()); + // In NEW state, new map attempt should not be rescheduled. + assertFalse(((TaskTAttemptKilledEvent)event).getRescheduleAttempt()); + } + + @Test + public void testContainerKillOnNew() throws Exception { + containerKillBeforeAssignment(false); + } + + @Test + public void testContainerKillOnUnassigned() throws Exception { + containerKillBeforeAssignment(true); + } + @Test public void testContainerKillAfterAssigned() throws Exception { ApplicationId appId = ApplicationId.newInstance(1, 2); @@ -1032,7 +1075,7 @@ public void testContainerKillAfterAssigned() throws Exception { taImpl.getInternalState(), TaskAttemptStateInternal.ASSIGNED); taImpl.handle(new TaskAttemptEvent(attemptId, TaskAttemptEventType.TA_KILL)); - assertEquals("Task should be in KILLED state", + assertEquals("Task should be in KILL_CONTAINER_CLEANUP state", TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, taImpl.getInternalState()); } @@ -1089,7 +1132,7 @@ public void testContainerKillWhileRunning() throws Exception { TaskAttemptEventType.TA_KILL)); assertFalse("InternalError occurred trying to handle TA_KILL", eventHandler.internalError); - assertEquals("Task should be in KILLED state", + assertEquals("Task should be in KILL_CONTAINER_CLEANUP state", TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, taImpl.getInternalState()); } @@ -1150,12 +1193,11 @@ public void testContainerKillWhileCommitPending() throws Exception { TaskAttemptEventType.TA_KILL)); assertFalse("InternalError occurred trying to handle TA_KILL", eventHandler.internalError); - assertEquals("Task should be in KILLED state", + assertEquals("Task should be in KILL_CONTAINER_CLEANUP state", TaskAttemptStateInternal.KILL_CONTAINER_CLEANUP, taImpl.getInternalState()); } - @Test public void testKillMapTaskWhileSuccessFinishing() throws Exception { MockEventHandler eventHandler = new MockEventHandler(); @@ -1195,6 +1237,37 @@ public void testKillMapTaskWhileSuccessFinishing() throws Exception { assertFalse("InternalError occurred", eventHandler.internalError); } + @Test + public void testKillMapTaskAfterSuccess() throws Exception { + MockEventHandler eventHandler = new MockEventHandler(); + TaskAttemptImpl taImpl = createTaskAttemptImpl(eventHandler); + + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_DONE)); + + assertEquals("Task attempt is not in SUCCEEDED state", taImpl.getState(), + TaskAttemptState.SUCCEEDED); + assertEquals("Task attempt's internal state is not " + + "SUCCESS_FINISHING_CONTAINER", taImpl.getInternalState(), + TaskAttemptStateInternal.SUCCESS_FINISHING_CONTAINER); + + taImpl.handle(new TaskAttemptEvent(taImpl.getID(), + TaskAttemptEventType.TA_CONTAINER_CLEANED)); + // Send a map task attempt kill event indicating next map attempt has to be + // reschedule + taImpl.handle(new TaskAttemptKillEvent(taImpl.getID(),"", true)); + assertEquals("Task attempt is not in KILLED state", taImpl.getState(), + TaskAttemptState.KILLED); + assertEquals("Task attempt's internal state is not KILLED", + taImpl.getInternalState(), TaskAttemptStateInternal.KILLED); + assertFalse("InternalError occurred", eventHandler.internalError); + TaskEvent event = eventHandler.lastTaskEvent; + assertEquals(TaskEventType.T_ATTEMPT_KILLED, event.getType()); + // Send an attempt killed event to TaskImpl forwarding the same reschedule + // flag we received in task attempt kill event. + assertTrue(((TaskTAttemptKilledEvent)event).getRescheduleAttempt()); + } + @Test public void testKillMapTaskWhileFailFinishing() throws Exception { MockEventHandler eventHandler = new MockEventHandler(); @@ -1406,9 +1479,13 @@ private TaskAttemptImpl createTaskAttemptImpl( public static class MockEventHandler implements EventHandler { public boolean internalError; + public TaskEvent lastTaskEvent; @Override public void handle(Event event) { + if (event instanceof TaskEvent) { + lastTaskEvent = (TaskEvent)event; + } if (event instanceof JobEvent) { JobEvent je = ((JobEvent) event); if (JobEventType.INTERNAL_ERROR == je.getType()) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java index 84576712b4..4abdff871d 100755 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java @@ -55,6 +55,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptKilledEvent; import org.apache.hadoop.mapreduce.v2.app.metrics.MRAppMetrics; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; @@ -92,7 +93,8 @@ public class TestTaskImpl { private int taskCounter = 0; private final int partition = 1; - private InlineDispatcher dispatcher; + private InlineDispatcher dispatcher; + private MockTaskAttemptEventHandler taskAttemptEventHandler; private List taskAttempts; private class MockTaskImpl extends TaskImpl { @@ -257,7 +259,10 @@ public void setup() { taskSplitMetaInfo = mock(TaskSplitMetaInfo.class); when(taskSplitMetaInfo.getLocations()).thenReturn(dataLocations); - taskAttempts = new ArrayList(); + taskAttempts = new ArrayList(); + + taskAttemptEventHandler = new MockTaskAttemptEventHandler(); + dispatcher.register(TaskAttemptEventType.class, taskAttemptEventHandler); } private MockTaskImpl createMockTask(TaskType taskType) { @@ -294,8 +299,12 @@ private void killTask(TaskId taskId) { } private void killScheduledTaskAttempt(TaskAttemptId attemptId) { - mockTask.handle(new TaskTAttemptEvent(attemptId, - TaskEventType.T_ATTEMPT_KILLED)); + killScheduledTaskAttempt(attemptId, false); + } + + private void killScheduledTaskAttempt(TaskAttemptId attemptId, + boolean reschedule) { + mockTask.handle(new TaskTAttemptKilledEvent(attemptId, reschedule)); assertTaskScheduledState(); } @@ -326,11 +335,15 @@ private void updateLastAttemptState(TaskAttemptState s) { } private void killRunningTaskAttempt(TaskAttemptId attemptId) { - mockTask.handle(new TaskTAttemptEvent(attemptId, - TaskEventType.T_ATTEMPT_KILLED)); + killRunningTaskAttempt(attemptId, false); + } + + private void killRunningTaskAttempt(TaskAttemptId attemptId, + boolean reschedule) { + mockTask.handle(new TaskTAttemptKilledEvent(attemptId, reschedule)); assertTaskRunningState(); } - + private void failRunningTaskAttempt(TaskAttemptId attemptId) { mockTask.handle(new TaskTAttemptEvent(attemptId, TaskEventType.T_ATTEMPT_FAILED)); @@ -423,10 +436,12 @@ public void testKillScheduledTask() { */ public void testKillScheduledTaskAttempt() { LOG.info("--- START: testKillScheduledTaskAttempt ---"); - mockTask = createMockTask(TaskType.MAP); + mockTask = createMockTask(TaskType.MAP); TaskId taskId = getNewTaskID(); scheduleTaskAttempt(taskId); - killScheduledTaskAttempt(getLastAttempt().getAttemptId()); + killScheduledTaskAttempt(getLastAttempt().getAttemptId(), true); + assertEquals(TaskAttemptEventType.TA_RESCHEDULE, + taskAttemptEventHandler.lastTaskAttemptEvent.getType()); } @Test @@ -449,11 +464,13 @@ public void testLaunchTaskAttempt() { */ public void testKillRunningTaskAttempt() { LOG.info("--- START: testKillRunningTaskAttempt ---"); - mockTask = createMockTask(TaskType.MAP); + mockTask = createMockTask(TaskType.MAP); TaskId taskId = getNewTaskID(); scheduleTaskAttempt(taskId); launchTaskAttempt(getLastAttempt().getAttemptId()); - killRunningTaskAttempt(getLastAttempt().getAttemptId()); + killRunningTaskAttempt(getLastAttempt().getAttemptId(), true); + assertEquals(TaskAttemptEventType.TA_RESCHEDULE, + taskAttemptEventHandler.lastTaskAttemptEvent.getType()); } @Test @@ -471,6 +488,28 @@ public void testKillSuccessfulTask() { assertTaskSucceededState(); } + @Test + /** + * Kill map attempt for succeeded map task + * {@link TaskState#SUCCEEDED}->{@link TaskState#SCHEDULED} + */ + public void testKillAttemptForSuccessfulTask() { + LOG.info("--- START: testKillAttemptForSuccessfulTask ---"); + mockTask = createMockTask(TaskType.MAP); + TaskId taskId = getNewTaskID(); + scheduleTaskAttempt(taskId); + launchTaskAttempt(getLastAttempt().getAttemptId()); + commitTaskAttempt(getLastAttempt().getAttemptId()); + mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), + TaskEventType.T_ATTEMPT_SUCCEEDED)); + assertTaskSucceededState(); + mockTask.handle( + new TaskTAttemptKilledEvent(getLastAttempt().getAttemptId(), true)); + assertEquals(TaskAttemptEventType.TA_RESCHEDULE, + taskAttemptEventHandler.lastTaskAttemptEvent.getType()); + assertTaskScheduledState(); + } + @Test public void testTaskProgress() { LOG.info("--- START: testTaskProgress ---"); @@ -728,8 +767,8 @@ protected int getMaxAttempts() { assertEquals(TaskState.FAILED, mockTask.getState()); taskAttempt = taskAttempts.get(3); taskAttempt.setState(TaskAttemptState.KILLED); - mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(), - TaskEventType.T_ATTEMPT_KILLED)); + mockTask.handle(new TaskTAttemptKilledEvent(taskAttempt.getAttemptId(), + false)); assertEquals(TaskState.FAILED, mockTask.getState()); } @@ -840,4 +879,14 @@ protected int getMaxAttempts() { Counters taskCounters = mockTask.getCounters(); assertEquals("wrong counters for task", specAttemptCounters, taskCounters); } + + public static class MockTaskAttemptEventHandler implements EventHandler { + public TaskAttemptEvent lastTaskAttemptEvent; + @Override + public void handle(Event event) { + if (event instanceof TaskAttemptEvent) { + lastTaskAttemptEvent = (TaskAttemptEvent)event; + } + } + }; }