From 439f43ad3defbac907eda2d139a793f153544430 Mon Sep 17 00:00:00 2001 From: rohithsharmaks Date: Fri, 2 Oct 2015 20:34:34 +0530 Subject: [PATCH] MAPREDUCE-6485. Create a new task attempt with failed map task priority if in-progress attempts are unassigned. (Xianyin Xin via rohithsharmaks) --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../v2/app/job/impl/TaskAttemptImpl.java | 7 +- .../mapreduce/v2/app/job/impl/TaskImpl.java | 20 +++- .../v2/app/job/impl/TestTaskImpl.java | 94 ++++++++++++++++++- 4 files changed, 117 insertions(+), 7 deletions(-) mode change 100644 => 100755 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java mode change 100644 => 100755 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java mode change 100644 => 100755 hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index e05688d842..781cb76c6f 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -588,6 +588,9 @@ Release 2.8.0 - UNRELEASED MAPREDUCE-6494. Permission issue when running archive-logs tool as different users (rkanter) + MAPREDUCE-6485. Create a new task attempt with failed map task priority + if in-progress attempts are unassigned. (Xianyin Xin via rohithsharmaks) + Release 2.7.2 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java old mode 100644 new mode 100755 index db4f585a8d..a5321415ad --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -1333,7 +1333,7 @@ public TaskAttemptStateInternal recover(TaskAttemptInfo taInfo, return attemptState; } - private static TaskAttemptState getExternalState( + protected static TaskAttemptState getExternalState( TaskAttemptStateInternal smState) { switch (smState) { case ASSIGNED: @@ -1365,6 +1365,11 @@ private static TaskAttemptState getExternalState( } } + // check whether the attempt is assigned if container is not null + boolean isContainerAssigned() { + return container != null; + } + //always called in write lock private void setFinishTime() { //set the finish time only if launch time is set diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java old mode 100644 new mode 100755 index 5f5f300670..a392837eb1 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java @@ -1057,9 +1057,21 @@ public TaskStateInternal transition(TaskImpl task, TaskEvent event) { TaskAttemptCompletionEventStatus.FAILED); // we don't need a new event if we already have a spare task.inProgressAttempts.remove(taskAttemptId); - if (task.inProgressAttempts.size() == 0 - && task.successfulAttempt == null) { - task.addAndScheduleAttempt(Avataar.VIRGIN); + if (task.successfulAttempt == null) { + boolean shouldAddNewAttempt = true; + if (task.inProgressAttempts.size() > 0) { + // if not all of the inProgressAttempts are hanging for resource + for (TaskAttemptId attemptId : task.inProgressAttempts) { + if (((TaskAttemptImpl) task.getAttempt(attemptId)) + .isContainerAssigned()) { + shouldAddNewAttempt = false; + break; + } + } + } + if (shouldAddNewAttempt) { + task.addAndScheduleAttempt(Avataar.VIRGIN); + } } } else { task.handleTaskAttemptCompletion( @@ -1080,7 +1092,7 @@ public TaskStateInternal transition(TaskImpl task, TaskEvent event) { taskFailedEvent)); } else { LOG.debug("Not generating HistoryFinish event since start event not" + - " generated for task: " + task.getID()); + " generated for task: " + task.getID()); } task.eventHandler.handle( new JobTaskEvent(task.taskId, TaskState.FAILED)); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java old mode 100644 new mode 100755 index ae8a797b09..af5181742e --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskImpl.java @@ -50,6 +50,8 @@ import org.apache.hadoop.mapreduce.v2.app.TaskAttemptListener; import org.apache.hadoop.mapreduce.v2.app.job.TaskAttempt; import org.apache.hadoop.mapreduce.v2.app.job.TaskStateInternal; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskTAttemptEvent; @@ -57,6 +59,7 @@ import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.token.Token; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; import org.apache.hadoop.yarn.event.InlineDispatcher; import org.apache.hadoop.yarn.util.Clock; @@ -140,6 +143,8 @@ private class MockTaskAttemptImpl extends TaskAttemptImpl { private float progress = 0; private TaskAttemptState state = TaskAttemptState.NEW; + boolean rescheduled = false; + boolean containerAssigned = false; private TaskType taskType; private Counters attemptCounters = TaskAttemptImpl.EMPTY_COUNTERS; @@ -153,6 +158,15 @@ public MockTaskAttemptImpl(TaskId taskId, int id, EventHandler eventHandler, this.taskType = taskType; } + public void assignContainer() { + containerAssigned = true; + } + + @Override + boolean isContainerAssigned() { + return containerAssigned; + } + public TaskAttemptId getAttemptId() { return getID(); } @@ -173,11 +187,20 @@ public void setProgress(float progress) { public void setState(TaskAttemptState state) { this.state = state; } - + + @Override public TaskAttemptState getState() { return state; } + public boolean getRescheduled() { + return this.rescheduled; + } + + public void setRescheduled(boolean rescheduled) { + this.rescheduled = rescheduled; + } + @Override public Counters getCounters() { return attemptCounters; @@ -279,7 +302,9 @@ private void killScheduledTaskAttempt(TaskAttemptId attemptId) { private void launchTaskAttempt(TaskAttemptId attemptId) { mockTask.handle(new TaskTAttemptEvent(attemptId, TaskEventType.T_ATTEMPT_LAUNCHED)); - assertTaskRunningState(); + ((MockTaskAttemptImpl)(mockTask.getAttempt(attemptId))) + .assignContainer(); + assertTaskRunningState(); } private void commitTaskAttempt(TaskAttemptId attemptId) { @@ -708,6 +733,71 @@ protected int getMaxAttempts() { assertEquals(TaskState.FAILED, mockTask.getState()); } + private class PartialAttemptEventHandler implements EventHandler { + + @Override + public void handle(Event event) { + if (event instanceof TaskAttemptEvent) + if (event.getType() == TaskAttemptEventType.TA_RESCHEDULE) { + TaskAttempt attempt = mockTask.getAttempt(((TaskAttemptEvent) event).getTaskAttemptID()); + ((MockTaskAttemptImpl)attempt).setRescheduled(true); + } + } + } + + @Test + public void testFailedTransitionWithHangingSpeculativeMap() { + mockTask = new MockTaskImpl(jobId, partition, new PartialAttemptEventHandler(), + remoteJobConfFile, conf, taskAttemptListener, jobToken, + credentials, clock, startCount, metrics, appContext, TaskType.MAP) { + @Override + protected int getMaxAttempts() { + return 4; + } + }; + + // start a new task, schedule and launch a new attempt + TaskId taskId = getNewTaskID(); + scheduleTaskAttempt(taskId); + launchTaskAttempt(getLastAttempt().getAttemptId()); + + // add a speculative attempt(#2), but not launch it + mockTask.handle(new TaskTAttemptEvent(getLastAttempt().getAttemptId(), + TaskEventType.T_ADD_SPEC_ATTEMPT)); + + // have the first attempt(#1) fail, verify task still running since the + // max attempts is 4 + MockTaskAttemptImpl taskAttempt = taskAttempts.get(0); + taskAttempt.setState(TaskAttemptState.FAILED); + mockTask.handle(new TaskTAttemptEvent(taskAttempt.getAttemptId(), + TaskEventType.T_ATTEMPT_FAILED)); + assertEquals(TaskState.RUNNING, mockTask.getState()); + + // verify a new attempt(#3) added because the speculative attempt(#2) + // is hanging + assertEquals(3, taskAttempts.size()); + + // verify the speculative attempt(#2) is not a rescheduled attempt + assertEquals(false, taskAttempts.get(1).getRescheduled()); + + // verify the third attempt is a rescheduled attempt + assertEquals(true, taskAttempts.get(2).getRescheduled()); + + // now launch the latest attempt(#3) and set the internal state to running + launchTaskAttempt(getLastAttempt().getAttemptId()); + + // have the speculative attempt(#2) fail, verify task still since it + // hasn't reach the max attempts which is 4 + MockTaskAttemptImpl taskAttempt1 = taskAttempts.get(1); + taskAttempt1.setState(TaskAttemptState.FAILED); + mockTask.handle(new TaskTAttemptEvent(taskAttempt1.getAttemptId(), + TaskEventType.T_ATTEMPT_FAILED)); + assertEquals(TaskState.RUNNING, mockTask.getState()); + + // verify there's no new attempt added because of the running attempt(#3) + assertEquals(3, taskAttempts.size()); + } + @Test public void testCountersWithSpeculation() { mockTask = new MockTaskImpl(jobId, partition, dispatcher.getEventHandler(),