diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index e3b559eaf3..8d9b61e0fb 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -187,6 +187,9 @@ Release 2.4.0 - UNRELEASED MAPREDUCE-5052. Job History UI and web services confusing job start time and job submit time (Chen He via jeagles) + MAPREDUCE-5692. Add explicit diagnostics when a task attempt is killed due + to speculative execution (Gera Shegalov via Sandy Ryza) + OPTIMIZATIONS MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java index f3c62a48d5..5e14ce1cb5 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskAttemptImpl.java @@ -1552,6 +1552,12 @@ public void transition(TaskAttemptImpl taskAttempt, TaskAttemptEvent event) { //set the finish time taskAttempt.setFinishTime(); + + if (event instanceof TaskAttemptKillEvent) { + taskAttempt.addDiagnosticInfo( + ((TaskAttemptKillEvent) event).getMessage()); + } + //send the deallocate event to ContainerAllocator taskAttempt.eventHandler.handle( new ContainerAllocatorEvent(taskAttempt.attemptId, @@ -1855,6 +1861,12 @@ public void transition(TaskAttemptImpl taskAttempt, LOG.debug("Not generating HistoryFinish event since start event not " + "generated for taskAttempt: " + taskAttempt.getID()); } + + if (event instanceof TaskAttemptKillEvent) { + taskAttempt.addDiagnosticInfo( + ((TaskAttemptKillEvent) event).getMessage()); + } + // taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.KILLED); Not logging Map/Reduce attempts in case of failure. taskAttempt.eventHandler.handle(new TaskTAttemptEvent( taskAttempt.attemptId, @@ -1872,6 +1884,12 @@ public void transition(TaskAttemptImpl taskAttempt, // for it taskAttempt.taskAttemptListener.unregister( taskAttempt.attemptId, taskAttempt.jvmID); + + if (event instanceof TaskAttemptKillEvent) { + taskAttempt.addDiagnosticInfo( + ((TaskAttemptKillEvent) event).getMessage()); + } + taskAttempt.reportedStatus.progress = 1.0f; taskAttempt.updateProgressSplits(); //send the cleanup event to containerLauncher diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java index 75d833652a..efb46d538b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/job/impl/TaskImpl.java @@ -69,6 +69,7 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; +import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; @@ -100,6 +101,7 @@ public abstract class TaskImpl implements Task, EventHandler { private static final Log LOG = LogFactory.getLog(TaskImpl.class); + private static final String SPECULATION = "Speculation: "; protected final JobConf conf; protected final Path jobFile; @@ -906,8 +908,8 @@ public void transition(TaskImpl task, TaskEvent event) { LOG.info(task.commitAttempt + " already given a go for committing the task output, so killing " + attemptID); - task.eventHandler.handle(new TaskAttemptEvent( - attemptID, TaskAttemptEventType.TA_KILL)); + task.eventHandler.handle(new TaskAttemptKillEvent(attemptID, + SPECULATION + task.commitAttempt + " committed first!")); } } } @@ -932,9 +934,8 @@ public void transition(TaskImpl task, TaskEvent event) { // other reasons. !attempt.isFinished()) { LOG.info("Issuing kill to other attempt " + attempt.getID()); - task.eventHandler.handle( - new TaskAttemptEvent(attempt.getID(), - TaskAttemptEventType.TA_KILL)); + task.eventHandler.handle(new TaskAttemptKillEvent(attempt.getID(), + SPECULATION + task.successfulAttempt + " succeeded first!")); } } task.finished(TaskStateInternal.SUCCEEDED); @@ -1199,8 +1200,7 @@ public void transition(TaskImpl task, TaskEvent event) { private void killUnfinishedAttempt(TaskAttempt attempt, String logMsg) { if (attempt != null && !attempt.isFinished()) { eventHandler.handle( - new TaskAttemptEvent(attempt.getID(), - TaskAttemptEventType.TA_KILL)); + new TaskAttemptKillEvent(attempt.getID(), logMsg)); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java index 37d09e0da3..9c88c67b66 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestSpeculativeExecutionWithMRApp.java @@ -18,6 +18,7 @@ package org.apache.hadoop.mapreduce.v2; +import java.util.Collection; import java.util.Iterator; import java.util.Map; import java.util.Random; @@ -106,17 +107,21 @@ public void testSpeculateSuccessfulWithoutUpdateEvents() throws Exception { int maxTimeWait = 10; boolean successfullySpeculated = false; + TaskAttempt[] ta = null; while (maxTimeWait > 0 && !successfullySpeculated) { if (taskToBeSpeculated.getAttempts().size() != 2) { Thread.sleep(1000); clock.setTime(System.currentTimeMillis() + 20000); } else { successfullySpeculated = true; + // finish 1st TA, 2nd will be killed + ta = makeFirstAttemptWin(appEventHandler, taskToBeSpeculated); } maxTimeWait--; } Assert .assertTrue("Couldn't speculate successfully", successfullySpeculated); + verifySpeculationMessage(app, ta); } @Test(timeout = 60000) @@ -197,16 +202,47 @@ public void testSepculateSuccessfulWithUpdateEvents() throws Exception { int maxTimeWait = 5; boolean successfullySpeculated = false; + TaskAttempt[] ta = null; while (maxTimeWait > 0 && !successfullySpeculated) { if (speculatedTask.getAttempts().size() != 2) { Thread.sleep(1000); } else { successfullySpeculated = true; + ta = makeFirstAttemptWin(appEventHandler, speculatedTask); } maxTimeWait--; } Assert .assertTrue("Couldn't speculate successfully", successfullySpeculated); + verifySpeculationMessage(app, ta); + } + + private static TaskAttempt[] makeFirstAttemptWin( + EventHandler appEventHandler, Task speculatedTask) { + + // finish 1st TA, 2nd will be killed + Collection attempts = speculatedTask.getAttempts().values(); + TaskAttempt[] ta = new TaskAttempt[attempts.size()]; + attempts.toArray(ta); + appEventHandler.handle( + new TaskAttemptEvent(ta[0].getID(), TaskAttemptEventType.TA_DONE)); + appEventHandler.handle(new TaskAttemptEvent(ta[0].getID(), + TaskAttemptEventType.TA_CONTAINER_CLEANED)); + return ta; + } + + private static void verifySpeculationMessage(MRApp app, TaskAttempt[] ta) + throws Exception { + app.waitForState(ta[0], TaskAttemptState.SUCCEEDED); + app.waitForState(ta[1], TaskAttemptState.KILLED); + boolean foundSpecMsg = false; + for (String msg : ta[1].getDiagnostics()) { + if (msg.contains("Speculation")) { + foundSpecMsg = true; + break; + } + } + Assert.assertTrue("No speculation diagnostics!", foundSpecMsg); } private TaskAttemptStatus createTaskAttemptStatus(TaskAttemptId id,