From ee4ee6af6a5a6299d27462adb6944206039bbbae Mon Sep 17 00:00:00 2001
From: Jason Lowe
Date: Thu, 17 Sep 2015 21:37:39 +0000
Subject: [PATCH] MAPREDUCE-5982. Task attempts that fail from the ASSIGNED
 state can disappear. Contributed by Chang Li

---
 hadoop-mapreduce-project/CHANGES.txt          |   3 +
 .../v2/app/job/impl/TaskAttemptImpl.java      |  92 ++++++-----
 .../apache/hadoop/mapreduce/v2/app/MRApp.java |  11 +-
 .../v2/app/job/impl/TestTaskAttempt.java      | 154 ++++++++++++++++++
 4 files changed, 213 insertions(+), 47 deletions(-)

diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt
index 6cf7abbbff..cd84a34a91 100644
--- a/hadoop-mapreduce-project/CHANGES.txt
+++ b/hadoop-mapreduce-project/CHANGES.txt
@@ -573,6 +573,9 @@ Release 2.8.0 - UNRELEASED
     MAPREDUCE-5002. AM could potentially allocate a reduce container to a map
     attempt (Chang Li via jlowe)
 
+    MAPREDUCE-5982. Task attempts that fail from the ASSIGNED state can
+    disappear (Chang Li via jlowe)
+
 Release 2.7.2 - UNRELEASED
 
   INCOMPATIBLE CHANGES
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
index 77a75557ef..a7becdbfac 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java
@@ -1484,6 +1484,19 @@ private static JobCounterUpdateEvent createJobCounterUpdateEventTAKilled(
     return tauce;
   }
 
+  private static void
+      sendJHStartEventForAssignedFailTask(TaskAttemptImpl taskAttempt) {
+    TaskAttemptContainerLaunchedEvent event;
+    taskAttempt.launchTime = taskAttempt.clock.getTime();
+
+    InetSocketAddress nodeHttpInetAddr =
+        NetUtils.createSocketAddr(taskAttempt.container.getNodeHttpAddress());
+    taskAttempt.trackerName = nodeHttpInetAddr.getHostName();
+    taskAttempt.httpPort = nodeHttpInetAddr.getPort();
+    taskAttempt.sendLaunchedEvents();
+  }
+
+
   @SuppressWarnings("unchecked")
   private void sendLaunchedEvents() {
     JobCounterUpdateEvent jce = new JobCounterUpdateEvent(attemptId.getTaskId()
@@ -1681,6 +1694,9 @@ private static class DeallocateContainerTransition implements
     @Override
     public void transition(TaskAttemptImpl taskAttempt,
         TaskAttemptEvent event) {
+      if (taskAttempt.getLaunchTime() == 0) {
+        sendJHStartEventForAssignedFailTask(taskAttempt);
+      }
       //set the finish time
       taskAttempt.setFinishTime();
 
@@ -1715,23 +1731,19 @@ public void transition(TaskAttemptImpl taskAttempt,
       default:
         LOG.error("Task final state is not FAILED or KILLED: " + finalState);
       }
-      if (taskAttempt.getLaunchTime() != 0) {
-        TaskAttemptUnsuccessfulCompletionEvent tauce =
-            createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
-                finalState);
-        if(finalState == TaskAttemptStateInternal.FAILED) {
-          taskAttempt.eventHandler
-              .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
-        } else if(finalState == TaskAttemptStateInternal.KILLED) {
-          taskAttempt.eventHandler
-              .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
-        }
-        taskAttempt.eventHandler.handle(new JobHistoryEvent(
-            taskAttempt.attemptId.getTaskId().getJobId(), tauce));
-      } else {
-        LOG.debug("Not generating HistoryFinish event since start event not " +
-            "generated for taskAttempt: " + taskAttempt.getID());
+
+      TaskAttemptUnsuccessfulCompletionEvent tauce =
+          createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
+              finalState);
+      if(finalState == TaskAttemptStateInternal.FAILED) {
+        taskAttempt.eventHandler
+            .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
+      } else if(finalState == TaskAttemptStateInternal.KILLED) {
+        taskAttempt.eventHandler
+            .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
       }
+      taskAttempt.eventHandler.handle(new JobHistoryEvent(
+          taskAttempt.attemptId.getTaskId().getJobId(), tauce));
     }
   }
 
@@ -2023,27 +2035,25 @@ private static class KilledTransition implements
     @Override
     public void transition(TaskAttemptImpl taskAttempt,
         TaskAttemptEvent event) {
+      if (taskAttempt.getLaunchTime() == 0) {
+        sendJHStartEventForAssignedFailTask(taskAttempt);
+      }
       //set the finish time
       taskAttempt.setFinishTime();
-      if (taskAttempt.getLaunchTime() != 0) {
-        taskAttempt.eventHandler
-            .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
-        TaskAttemptUnsuccessfulCompletionEvent tauce =
-            createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
-                TaskAttemptStateInternal.KILLED);
-        taskAttempt.eventHandler.handle(new JobHistoryEvent(
-            taskAttempt.attemptId.getTaskId().getJobId(), tauce));
-      }else {
-        LOG.debug("Not generating HistoryFinish event since start event not " +
-            "generated for taskAttempt: " + taskAttempt.getID());
-      }
+
+      taskAttempt.eventHandler
+          .handle(createJobCounterUpdateEventTAKilled(taskAttempt, false));
+      TaskAttemptUnsuccessfulCompletionEvent tauce =
+          createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
+              TaskAttemptStateInternal.KILLED);
+      taskAttempt.eventHandler.handle(new JobHistoryEvent(
+          taskAttempt.attemptId.getTaskId().getJobId(), tauce));
       if (event instanceof TaskAttemptKillEvent) {
         taskAttempt.addDiagnosticInfo(
             ((TaskAttemptKillEvent) event).getMessage());
       }
-// taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.KILLED); Not logging Map/Reduce attempts in case of failure.
       taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
           taskAttempt.attemptId, TaskEventType.T_ATTEMPT_KILLED));
@@ -2178,23 +2188,19 @@ public void transition(TaskAttemptImpl taskAttempt,
   @SuppressWarnings("unchecked")
   private static void notifyTaskAttemptFailed(TaskAttemptImpl taskAttempt) {
+    if (taskAttempt.getLaunchTime() == 0) {
+      sendJHStartEventForAssignedFailTask(taskAttempt);
+    }
     // set the finish time
     taskAttempt.setFinishTime();
+    taskAttempt.eventHandler
+        .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
+    TaskAttemptUnsuccessfulCompletionEvent tauce =
+        createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
+            TaskAttemptStateInternal.FAILED);
+    taskAttempt.eventHandler.handle(new JobHistoryEvent(
+        taskAttempt.attemptId.getTaskId().getJobId(), tauce));
 
-    if (taskAttempt.getLaunchTime() != 0) {
-      taskAttempt.eventHandler
-          .handle(createJobCounterUpdateEventTAFailed(taskAttempt, false));
-      TaskAttemptUnsuccessfulCompletionEvent tauce =
-          createTaskAttemptUnsuccessfulCompletionEvent(taskAttempt,
-              TaskAttemptStateInternal.FAILED);
-      taskAttempt.eventHandler.handle(new JobHistoryEvent(
-          taskAttempt.attemptId.getTaskId().getJobId(), tauce));
-      // taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.FAILED); Not
-      // handling failed map/reduce events.
-    }else {
-      LOG.debug("Not generating HistoryFinish event since start event not " +
-          "generated for taskAttempt: " + taskAttempt.getID());
-    }
 
     taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
         taskAttempt.attemptId, TaskEventType.T_ATTEMPT_FAILED));
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
index b51adf2889..f0c10d3ed3 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/MRApp.java
@@ -544,10 +544,7 @@ public MockContainerLauncher() {
     public void handle(ContainerLauncherEvent event) {
       switch (event.getType()) {
       case CONTAINER_REMOTE_LAUNCH:
-        getContext().getEventHandler().handle(
-            new TaskAttemptContainerLaunchedEvent(event.getTaskAttemptID(),
-                shufflePort));
-
+        containerLaunched(event.getTaskAttemptID(), shufflePort);
         attemptLaunched(event.getTaskAttemptID());
         break;
       case CONTAINER_REMOTE_CLEANUP:
@@ -561,6 +558,12 @@ public void handle(ContainerLauncherEvent event) {
       }
     }
 
+    protected void containerLaunched(TaskAttemptId attemptID, int shufflePort) {
+      getContext().getEventHandler().handle(
+        new TaskAttemptContainerLaunchedEvent(attemptID,
+            shufflePort));
+    }
+
     protected void attemptLaunched(TaskAttemptId attemptID) {
       if (autoComplete) {
         // send the done event
diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
index a88a9358a8..6b4656aaf4 100644
--- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
+++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TestTaskAttempt.java
@@ -114,6 +114,69 @@ public void testMRAppHistoryForReduce() throws Exception {
     testMRAppHistory(app);
   }
 
+  @Test
+  public void testMRAppHistoryForTAFailedInAssigned() throws Exception {
+    // test TA_CONTAINER_LAUNCH_FAILED for map
+    FailingAttemptsDuringAssignedMRApp app =
+        new FailingAttemptsDuringAssignedMRApp(1, 0,
+            TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED);
+    testTaskAttemptAssignedFailHistory(app);
+
+    // test TA_CONTAINER_LAUNCH_FAILED for reduce
+    app =
+        new FailingAttemptsDuringAssignedMRApp(0, 1,
+            TaskAttemptEventType.TA_CONTAINER_LAUNCH_FAILED);
+    testTaskAttemptAssignedFailHistory(app);
+
+    // test TA_CONTAINER_COMPLETED for map
+    app =
+        new FailingAttemptsDuringAssignedMRApp(1, 0,
+            TaskAttemptEventType.TA_CONTAINER_COMPLETED);
+    testTaskAttemptAssignedFailHistory(app);
+
+    // test TA_CONTAINER_COMPLETED for reduce
+    app =
+        new FailingAttemptsDuringAssignedMRApp(0, 1,
+            TaskAttemptEventType.TA_CONTAINER_COMPLETED);
+    testTaskAttemptAssignedFailHistory(app);
+
+    // test TA_FAILMSG for map
+    app =
+        new FailingAttemptsDuringAssignedMRApp(1, 0,
+            TaskAttemptEventType.TA_FAILMSG);
+    testTaskAttemptAssignedFailHistory(app);
+
+    // test TA_FAILMSG for reduce
+    app =
+        new FailingAttemptsDuringAssignedMRApp(0, 1,
+            TaskAttemptEventType.TA_FAILMSG);
+    testTaskAttemptAssignedFailHistory(app);
+
+    // test TA_FAILMSG_BY_CLIENT for map
+    app =
+        new FailingAttemptsDuringAssignedMRApp(1, 0,
+            TaskAttemptEventType.TA_FAILMSG_BY_CLIENT);
+    testTaskAttemptAssignedFailHistory(app);
+
+    // test TA_FAILMSG_BY_CLIENT for reduce
+    app =
+        new FailingAttemptsDuringAssignedMRApp(0, 1,
+            TaskAttemptEventType.TA_FAILMSG_BY_CLIENT);
+    testTaskAttemptAssignedFailHistory(app);
+
+    // test TA_KILL for map
+    app =
+        new FailingAttemptsDuringAssignedMRApp(1, 0,
+            TaskAttemptEventType.TA_KILL);
+    testTaskAttemptAssignedKilledHistory(app);
+
+    // test TA_KILL for reduce
+    app =
+        new FailingAttemptsDuringAssignedMRApp(0, 1,
+            TaskAttemptEventType.TA_KILL);
+    testTaskAttemptAssignedKilledHistory(app);
+  }
+
   @Test
   public void testSingleRackRequest() throws Exception {
     TaskAttemptImpl.RequestContainerTransition rct =
@@ -301,6 +364,31 @@ private void testMRAppHistory(MRApp app) throws Exception {
         report.getTaskAttemptState());
   }
 
+  private void testTaskAttemptAssignedFailHistory
+      (FailingAttemptsDuringAssignedMRApp app) throws Exception {
+    Configuration conf = new Configuration();
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.FAILED);
+    Map<TaskId, Task> tasks = job.getTasks();
+    Assert.assertTrue("No Ta Started JH Event", app.getTaStartJHEvent());
+    Assert.assertTrue("No Ta Failed JH Event", app.getTaFailedJHEvent());
+  }
+
+  private void testTaskAttemptAssignedKilledHistory
+      (FailingAttemptsDuringAssignedMRApp app) throws Exception {
+    Configuration conf = new Configuration();
+    Job job = app.submit(conf);
+    app.waitForState(job, JobState.RUNNING);
+    Map<TaskId, Task> tasks = job.getTasks();
+    Task task = tasks.values().iterator().next();
+    app.waitForState(task, TaskState.SCHEDULED);
+    Map<TaskAttemptId, TaskAttempt> attempts = task.getAttempts();
+    TaskAttempt attempt = attempts.values().iterator().next();
+    app.waitForState(attempt, TaskAttemptState.KILLED);
+    Assert.assertTrue("No Ta Started JH Event", app.getTaStartJHEvent());
+    Assert.assertTrue("No Ta Killed JH Event", app.getTaKilledJHEvent());
+  }
+
   static class FailingAttemptsMRApp extends MRApp {
     FailingAttemptsMRApp(int maps, int reduces) {
       super(maps, reduces, true, "FailingAttemptsMRApp", true);
     }
@@ -331,6 +419,72 @@ public void handle(JobHistoryEvent event) {
     }
   }
 
+  static class FailingAttemptsDuringAssignedMRApp extends MRApp {
+    FailingAttemptsDuringAssignedMRApp(int maps, int reduces,
+        TaskAttemptEventType event) {
+      super(maps, reduces, true, "FailingAttemptsMRApp", true);
+      sendFailEvent = event;
+    }
+
+    TaskAttemptEventType sendFailEvent;
+
+    @Override
+    protected void containerLaunched(TaskAttemptId attemptID,
+        int shufflePort) {
+      //do nothing, not send TA_CONTAINER_LAUNCHED event
+    }
+
+    @Override
+    protected void attemptLaunched(TaskAttemptId attemptID) {
+      getContext().getEventHandler().handle(
+          new TaskAttemptEvent(attemptID, sendFailEvent));
+    }
+
+    private boolean receiveTaStartJHEvent = false;
+    private boolean receiveTaFailedJHEvent = false;
+    private boolean receiveTaKilledJHEvent = false;
+
+    public boolean getTaStartJHEvent(){
+      return receiveTaStartJHEvent;
+    }
+
+    public boolean getTaFailedJHEvent(){
+      return receiveTaFailedJHEvent;
+    }
+
+    public boolean getTaKilledJHEvent(){
+      return receiveTaKilledJHEvent;
+    }
+
+    protected EventHandler<JobHistoryEvent> createJobHistoryHandler(
+        AppContext context) {
+      return new EventHandler<JobHistoryEvent>() {
+        @Override
+        public void handle(JobHistoryEvent event) {
+          if (event.getType() == org.apache.hadoop.mapreduce.jobhistory.
+              EventType.MAP_ATTEMPT_FAILED) {
+            receiveTaFailedJHEvent = true;
+          } else if (event.getType() == org.apache.hadoop.mapreduce.
+              jobhistory.EventType.MAP_ATTEMPT_KILLED) {
+            receiveTaKilledJHEvent = true;
+          } else if (event.getType() == org.apache.hadoop.mapreduce.
+              jobhistory.EventType.MAP_ATTEMPT_STARTED) {
+            receiveTaStartJHEvent = true;
+          } else if (event.getType() == org.apache.hadoop.mapreduce.
+              jobhistory.EventType.REDUCE_ATTEMPT_FAILED) {
+            receiveTaFailedJHEvent = true;
+          } else if (event.getType() == org.apache.hadoop.mapreduce.
+              jobhistory.EventType.REDUCE_ATTEMPT_KILLED) {
+            receiveTaKilledJHEvent = true;
+          } else if (event.getType() == org.apache.hadoop.mapreduce.
+              jobhistory.EventType.REDUCE_ATTEMPT_STARTED) {
+            receiveTaStartJHEvent = true;
+          }
+        }
+      };
+    }
+  }
+
   @Test
   public void testLaunchFailedWhileKilling() throws Exception {
     ApplicationId appId = ApplicationId.newInstance(1, 2);