From 27ea3ab6ba125bfb2061a772649788747375f557 Mon Sep 17 00:00:00 2001
From: Robert Joseph Evans
Date: Fri, 13 Apr 2012 13:45:03 +0000
Subject: [PATCH] MAPREDUCE-4128. AM Recovery expects all attempts of a
 completed task to also be completed. (Bikas Saha via bobby)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1325765 13f79535-47bb-0310-9956-ffa450edef68
---
 hadoop-mapreduce-project/CHANGES.txt          |   3 +
 .../mapreduce/v2/app/job/impl/TaskImpl.java   |   1 +
 .../TestJobHistoryEventHandler.java           |   8 +-
 .../mapreduce/v2/app/TestFetchFailure.java    | 120 ++++++++++++++++++
 .../src/main/avro/Events.avpr                 |   3 +-
 .../jobhistory/JobHistoryParser.java          |  17 +++
 .../jobhistory/TaskFinishedEvent.java         |  22 +++-
 .../rumen/Task20LineHistoryEventEmitter.java  |   2 +-
 8 files changed, 169 insertions(+), 7 deletions(-)

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 51e01b5594..901aa9b0f5 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -332,6 +332,9 @@ Release 0.23.3 - UNRELEASED
     text on the UI to N/A instead of a link to null.
     (Bhallamudi Venkata Siva Kamesh via sseth)
 
+    MAPREDUCE-4128. AM Recovery expects all attempts of a completed task to
+    also be completed. (Bikas Saha via bobby)
+
 Release 0.23.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
index 174e9d1f44..58edd1690c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java
@@ -656,6 +656,7 @@ private void handleTaskAttemptCompletion(TaskAttemptId attemptId,
   private static TaskFinishedEvent createTaskFinishedEvent(TaskImpl task, TaskState taskState) {
     TaskFinishedEvent tfe =
       new TaskFinishedEvent(TypeConverter.fromYarn(task.taskId),
+        TypeConverter.fromYarn(task.successfulAttempt),
         task.getFinishTime(task.successfulAttempt),
         TypeConverter.fromYarn(task.taskId.getTaskType()),
         taskState.toString(),
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
index fce41d6086..c1c227064e 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/jobhistory/TestJobHistoryEventHandler.java
@@ -93,7 +93,7 @@ public void testFirstFlushOnCompletionEvent() throws Exception {
 
       // First completion event, but min-queue-size for batching flushes is 10
       handleEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
-          t.taskID, 0, TaskType.MAP, "", null)));
+          t.taskID, null, 0, TaskType.MAP, "", null)));
       verify(mockWriter).flush();
 
     } finally {
@@ -129,7 +129,7 @@ public void testMaxUnflushedCompletionEvents() throws Exception {
 
       for (int i = 0 ; i < 100 ; i++) {
         queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
-            t.taskID, 0, TaskType.MAP, "", null)));
+            t.taskID, null, 0, TaskType.MAP, "", null)));
       }
 
       handleNextNEvents(jheh, 9);
@@ -174,7 +174,7 @@ public void testUnflushedTimer() throws Exception {
 
       for (int i = 0 ; i < 100 ; i++) {
         queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
-            t.taskID, 0, TaskType.MAP, "", null)));
+            t.taskID, null, 0, TaskType.MAP, "", null)));
       }
 
       handleNextNEvents(jheh, 9);
@@ -215,7 +215,7 @@ public void testBatchedFlushJobEndMultiplier() throws Exception {
 
       for (int i = 0 ; i < 100 ; i++) {
         queueEvent(jheh, new JobHistoryEvent(t.jobId, new TaskFinishedEvent(
-            t.taskID, 0, TaskType.MAP, "", null)));
+            t.taskID, null, 0, TaskType.MAP, "", null)));
       }
       queueEvent(jheh, new JobHistoryEvent(t.jobId, new JobFinishedEvent(
           TypeConverter.fromYarn(t.jobId), 0, 10, 10, 0, 0, null, null, new Counters())));
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
index 827e727e56..bc895a4ff1 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestFetchFailure.java
@@ -25,6 +25,8 @@
 
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.mapreduce.MRJobConfig;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEvent;
+import org.apache.hadoop.mapreduce.jobhistory.JobHistoryEventHandler;
 import org.apache.hadoop.mapreduce.v2.api.records.JobState;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEvent;
 import org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptCompletionEventStatus;
@@ -37,6 +39,7 @@
 import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptFetchFailureEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
 import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
+import org.apache.hadoop.yarn.event.EventHandler;
 import org.junit.Test;
 
 public class TestFetchFailure {
@@ -142,6 +145,107 @@ public void testFetchFailure() throws Exception {
     Assert.assertEquals("Event status not correct for reduce attempt1",
         TaskAttemptCompletionEventStatus.SUCCEEDED, events[3].getStatus());
   }
+
+  /**
+   * This tests that if a map attempt fails (say, due to fetch failures), it
+   * gets re-run. If the AM dies while the next map attempt is running, then,
+   * on AM re-run, the AM must not incorrectly remember the first, failed
+   * attempt. Currently recovery does not recover running tasks; effectively,
+   * the AM re-runs the maps from scratch.
+   */
+  @Test
+  public void testFetchFailureWithRecovery() throws Exception {
+    int runCount = 0;
+    MRApp app = new MRAppWithHistory(1, 1, false, this.getClass().getName(), true, ++runCount);
+    Configuration conf = new Configuration();
+    // map -> reduce -> fetch-failure -> map retry is incompatible with
+    // sequential, single-task-attempt approach in uber-AM, so disable:
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    // all maps would be running
+    Assert.assertEquals("Num tasks not correct",
+        2, job.getTasks().size());
+    Iterator<Task> it = job.getTasks().values().iterator();
+    Task mapTask = it.next();
+    Task reduceTask = it.next();
+
+    // wait for the task state to move to RUNNING
+    app.waitForState(mapTask, TaskState.RUNNING);
+    TaskAttempt mapAttempt1 = mapTask.getAttempts().values().iterator().next();
+    app.waitForState(mapAttempt1, TaskAttemptState.RUNNING);
+
+    // send the done signal to the map attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(mapAttempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    // wait for map success
+    app.waitForState(mapTask, TaskState.SUCCEEDED);
+
+    TaskAttemptCompletionEvent[] events =
+        job.getTaskAttemptCompletionEvents(0, 100);
+    Assert.assertEquals("Num completion events not correct",
+        1, events.length);
+    Assert.assertEquals("Event status not correct",
+        TaskAttemptCompletionEventStatus.SUCCEEDED, events[0].getStatus());
+
+    // wait for the reduce to start running
+    app.waitForState(reduceTask, TaskState.RUNNING);
+    TaskAttempt reduceAttempt =
+        reduceTask.getAttempts().values().iterator().next();
+    app.waitForState(reduceAttempt, TaskAttemptState.RUNNING);
+
+    // send 3 fetch failures from the reduce to trigger map re-execution
+    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+    sendFetchFailure(app, reduceAttempt, mapAttempt1);
+
+    // wait for the map task state to move back to RUNNING
+    app.waitForState(mapTask, TaskState.RUNNING);
+
+    // crash the app by stopping it
+    app.stop();
+
+    // rerun
+    app =
+        new MRAppWithHistory(1, 1, false, this.getClass().getName(), false,
+            ++runCount);
+    conf = new Configuration();
+    conf.setBoolean(MRJobConfig.MR_AM_JOB_RECOVERY_ENABLE, true);
+    conf.setBoolean(MRJobConfig.JOB_UBERTASK_ENABLE, false);
+    job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    // all maps would be running
+    Assert.assertEquals("Num tasks not correct",
+        2, job.getTasks().size());
+    it = job.getTasks().values().iterator();
+    mapTask = it.next();
+    reduceTask = it.next();
+
+    // the map is not in a SUCCEEDED state after restart of the AM
+    app.waitForState(mapTask, TaskState.RUNNING);
+    mapAttempt1 = mapTask.getAttempts().values().iterator().next();
+    app.waitForState(mapAttempt1, TaskAttemptState.RUNNING);
+
+    // send the done signal to the map attempt
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(mapAttempt1.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    // wait for map success
+    app.waitForState(mapTask, TaskState.SUCCEEDED);
+
+    reduceAttempt = reduceTask.getAttempts().values().iterator().next();
+    // send done to the reduce
+    app.getContext().getEventHandler().handle(
+        new TaskAttemptEvent(reduceAttempt.getID(),
+            TaskAttemptEventType.TA_DONE));
+
+    app.waitForState(job, JobState.SUCCEEDED);
+    events = job.getTaskAttemptCompletionEvents(0, 100);
+    Assert.assertEquals("Num completion events not correct", 2, events.length);
+  }
 
   private void sendFetchFailure(MRApp app, TaskAttempt reduceAttempt,
       TaskAttempt mapAttempt) {
@@ -150,4 +254,20 @@ private void sendFetchFailure(MRApp app, TaskAttempt reduceAttempt,
         reduceAttempt.getID(), Arrays.asList(new TaskAttemptId[]
             {mapAttempt.getID()})));
   }
+
+  static class MRAppWithHistory extends MRApp {
+    public MRAppWithHistory(int maps, int reduces, boolean autoComplete,
+        String testName, boolean cleanOnStart, int startCount) {
+      super(maps, reduces, autoComplete, testName, cleanOnStart, startCount);
+    }
+
+    @Override
+    protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+        AppContext context) {
+      JobHistoryEventHandler eventHandler = new JobHistoryEventHandler(context,
+          getStartCount());
+      return eventHandler;
+    }
+  }
+
 }
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr
index 22864a6fc2..050433a488 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/avro/Events.avpr
@@ -230,7 +230,8 @@
           {"name": "taskType", "type": "string"},
           {"name": "finishTime", "type": "long"},
           {"name": "status", "type": "string"},
-          {"name": "counters", "type": "JhCounters"}
+          {"name": "counters", "type": "JhCounters"},
+          {"name": "successfulAttemptId", "type": "string"}
       ]
      },
 
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
index aa1089f1db..34eb59449c 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/JobHistoryParser.java
@@ -276,6 +276,17 @@ private void handleTaskAttemptFailedEvent(
     attemptInfo.shuffleFinishTime = event.getFinishTime();
     attemptInfo.sortFinishTime = event.getFinishTime();
     attemptInfo.mapFinishTime = event.getFinishTime();
+    if(TaskStatus.State.SUCCEEDED.toString().equals(taskInfo.status))
+    {
+      // this was a successful task
+      if(attemptInfo.getAttemptId().equals(taskInfo.getSuccessfulAttemptId()))
+      {
+        // the failed attempt is the one that made this task successful,
+        // so it's no longer successful
+        taskInfo.status = null;
+        // not resetting the other fields set in handleTaskFinishedEvent()
+      }
+    }
   }
 
   private void handleTaskAttemptStartedEvent(TaskAttemptStartedEvent event) {
@@ -299,6 +310,7 @@ private void handleTaskFinishedEvent(TaskFinishedEvent event) {
     taskInfo.counters = event.getCounters();
     taskInfo.finishTime = event.getFinishTime();
     taskInfo.status = TaskStatus.State.SUCCEEDED.toString();
+    taskInfo.successfulAttemptId = event.getSuccessfulTaskAttemptId();
   }
 
   private void handleTaskUpdatedEvent(TaskUpdatedEvent event) {
@@ -514,6 +526,7 @@ public static class TaskInfo {
     String status;
     String error;
     TaskAttemptID failedDueToAttemptId;
+    TaskAttemptID successfulAttemptId;
     Map<TaskAttemptID, TaskAttemptInfo> attemptsMap;
 
     public TaskInfo() {
@@ -554,6 +567,10 @@ public void printAll() {
     public TaskAttemptID getFailedDueToAttemptId() {
       return failedDueToAttemptId;
     }
+    /** @return the attempt Id that caused this task to succeed */
+    public TaskAttemptID getSuccessfulAttemptId() {
+      return successfulAttemptId;
+    }
     /** @return the error */
     public String getError() { return error; }
     /** @return the map of all attempts for this task */
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
index 35399709bf..55de80ca63 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/jobhistory/TaskFinishedEvent.java
@@ -22,6 +22,7 @@
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
 import org.apache.hadoop.mapreduce.Counters;
+import org.apache.hadoop.mapreduce.TaskAttemptID;
 import org.apache.hadoop.mapreduce.TaskID;
 import org.apache.hadoop.mapreduce.TaskType;
 
@@ -36,6 +37,7 @@ public class TaskFinishedEvent implements HistoryEvent {
   private TaskFinished datum = null;
 
   private TaskID taskid;
+  private TaskAttemptID successfulAttemptId;
   private long finishTime;
   private TaskType taskType;
   private String status;
@@ -44,15 +46,17 @@ public class TaskFinishedEvent implements HistoryEvent {
   /**
    * Create an event to record the successful completion of a task
    * @param id Task ID
+   * @param attemptId Task Attempt ID of the successful attempt for this task
    * @param finishTime Finish time of the task
    * @param taskType Type of the task
    * @param status Status string
    * @param counters Counters for the task
    */
-  public TaskFinishedEvent(TaskID id, long finishTime,
+  public TaskFinishedEvent(TaskID id, TaskAttemptID attemptId, long finishTime,
                            TaskType taskType,
                            String status, Counters counters) {
     this.taskid = id;
+    this.successfulAttemptId = attemptId;
     this.finishTime = finishTime;
     this.taskType = taskType;
     this.status = status;
@@ -65,6 +69,10 @@ public Object getDatum() {
     if (datum == null) {
       datum = new TaskFinished();
       datum.taskid = new Utf8(taskid.toString());
+      if(successfulAttemptId != null)
+      {
+        datum.successfulAttemptId = new Utf8(successfulAttemptId.toString());
+      }
       datum.finishTime = finishTime;
       datum.counters = EventWriter.toAvro(counters);
       datum.taskType = new Utf8(taskType.name());
@@ -76,6 +84,10 @@ public Object getDatum() {
   public void setDatum(Object oDatum) {
     this.datum = (TaskFinished)oDatum;
     this.taskid = TaskID.forName(datum.taskid.toString());
+    if (datum.successfulAttemptId != null) {
+      this.successfulAttemptId = TaskAttemptID
+          .forName(datum.successfulAttemptId.toString());
+    }
     this.finishTime = datum.finishTime;
     this.taskType = TaskType.valueOf(datum.taskType.toString());
     this.status = datum.status.toString();
@@ -84,6 +96,14 @@ public void setDatum(Object oDatum) {
 
   /** Get task id */
   public TaskID getTaskId() { return TaskID.forName(taskid.toString()); }
+  /** Get successful task attempt id */
+  public TaskAttemptID getSuccessfulTaskAttemptId() {
+    if(successfulAttemptId != null)
+    {
+      return TaskAttemptID.forName(successfulAttemptId.toString());
+    }
+    return null;
+  }
   /** Get the task finish time */
   public long getFinishTime() { return finishTime; }
   /** Get task counters */
diff --git a/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/Task20LineHistoryEventEmitter.java b/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/Task20LineHistoryEventEmitter.java
index 6ed9130c27..dd002a4342 100644
--- a/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/Task20LineHistoryEventEmitter.java
+++ b/hadoop-tools/hadoop-rumen/src/main/java/org/apache/hadoop/tools/rumen/Task20LineHistoryEventEmitter.java
@@ -128,7 +128,7 @@ HistoryEvent maybeEmitEvent(ParsedLine line, String taskIDName,
         return null;
       }
 
-      return new TaskFinishedEvent(taskID, Long.parseLong(finishTime),
+      return new TaskFinishedEvent(taskID, null, Long.parseLong(finishTime),
           that.originalTaskType, status, eventCounters);
     }
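
Note for readers (illustrative, not part of the commit): the sketch below shows how the widened TaskFinishedEvent API behaves after this patch. The constructor and getSuccessfulTaskAttemptId() come straight from the diff above; the task/attempt ID strings and the class name SuccessfulAttemptIdSketch are made-up examples, and TaskFinishedEvent is a private MapReduce history class, so treat this as a sketch of the intended semantics rather than public-API usage.

import org.apache.hadoop.mapreduce.Counters;
import org.apache.hadoop.mapreduce.TaskAttemptID;
import org.apache.hadoop.mapreduce.TaskID;
import org.apache.hadoop.mapreduce.TaskType;
import org.apache.hadoop.mapreduce.jobhistory.TaskFinishedEvent;

public class SuccessfulAttemptIdSketch {
  public static void main(String[] args) {
    // Made-up IDs, in the format TaskID.forName()/TaskAttemptID.forName() parse.
    TaskID taskId = TaskID.forName("task_200707121733_0003_m_000005");
    TaskAttemptID attempt =
        TaskAttemptID.forName("attempt_200707121733_0003_m_000005_1");

    // After this patch, the history event records WHICH attempt made the
    // task succeed, not merely that the task succeeded.
    TaskFinishedEvent tfe = new TaskFinishedEvent(taskId, attempt, 100L,
        TaskType.MAP, "SUCCEEDED", new Counters());
    assert attempt.equals(tfe.getSuccessfulTaskAttemptId());

    // Histories written before this change carry no attempt id, so the
    // getter returns null and callers (e.g. JobHistoryParser) must cope.
    TaskFinishedEvent legacy = new TaskFinishedEvent(taskId, null, 100L,
        TaskType.MAP, "SUCCEEDED", new Counters());
    assert legacy.getSuccessfulTaskAttemptId() == null;
  }
}

This recorded id is what lets JobHistoryParser fix the recovery bug: when a later attempt-failed event arrives for the very attempt recorded here, handleTaskAttemptFailedEvent() resets taskInfo.status to null, so a recovering AM re-runs the task instead of treating it as complete.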