MAPREDUCE-5692. Add explicit diagnostics when a task attempt is killed due to speculative execution (Gera Shegalov via Sandy Ryza)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1552797 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sanford Ryza 2013-12-20 19:53:59 +00:00
parent 74689ab7ca
commit 1b9b956924
4 changed files with 64 additions and 7 deletions

View File

@@ -187,6 +187,9 @@ Release 2.4.0 - UNRELEASED
MAPREDUCE-5052. Job History UI and web services confusing job start time and MAPREDUCE-5052. Job History UI and web services confusing job start time and
job submit time (Chen He via jeagles) job submit time (Chen He via jeagles)
MAPREDUCE-5692. Add explicit diagnostics when a task attempt is killed due
to speculative execution (Gera Shegalov via Sandy Ryza)
OPTIMIZATIONS OPTIMIZATIONS
MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza) MAPREDUCE-5484. YarnChild unnecessarily loads job conf twice (Sandy Ryza)

View File

@@ -1552,6 +1552,12 @@ public void transition(TaskAttemptImpl taskAttempt,
TaskAttemptEvent event) { TaskAttemptEvent event) {
//set the finish time //set the finish time
taskAttempt.setFinishTime(); taskAttempt.setFinishTime();
if (event instanceof TaskAttemptKillEvent) {
taskAttempt.addDiagnosticInfo(
((TaskAttemptKillEvent) event).getMessage());
}
//send the deallocate event to ContainerAllocator //send the deallocate event to ContainerAllocator
taskAttempt.eventHandler.handle( taskAttempt.eventHandler.handle(
new ContainerAllocatorEvent(taskAttempt.attemptId, new ContainerAllocatorEvent(taskAttempt.attemptId,
@@ -1855,6 +1861,12 @@ public void transition(TaskAttemptImpl taskAttempt,
LOG.debug("Not generating HistoryFinish event since start event not " + LOG.debug("Not generating HistoryFinish event since start event not " +
"generated for taskAttempt: " + taskAttempt.getID()); "generated for taskAttempt: " + taskAttempt.getID());
} }
if (event instanceof TaskAttemptKillEvent) {
taskAttempt.addDiagnosticInfo(
((TaskAttemptKillEvent) event).getMessage());
}
// taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.KILLED); Not logging Map/Reduce attempts in case of failure. // taskAttempt.logAttemptFinishedEvent(TaskAttemptStateInternal.KILLED); Not logging Map/Reduce attempts in case of failure.
taskAttempt.eventHandler.handle(new TaskTAttemptEvent( taskAttempt.eventHandler.handle(new TaskTAttemptEvent(
taskAttempt.attemptId, taskAttempt.attemptId,
@@ -1872,6 +1884,12 @@ public void transition(TaskAttemptImpl taskAttempt,
// for it // for it
taskAttempt.taskAttemptListener.unregister( taskAttempt.taskAttemptListener.unregister(
taskAttempt.attemptId, taskAttempt.jvmID); taskAttempt.attemptId, taskAttempt.jvmID);
if (event instanceof TaskAttemptKillEvent) {
taskAttempt.addDiagnosticInfo(
((TaskAttemptKillEvent) event).getMessage());
}
taskAttempt.reportedStatus.progress = 1.0f; taskAttempt.reportedStatus.progress = 1.0f;
taskAttempt.updateProgressSplits(); taskAttempt.updateProgressSplits();
//send the cleanup event to containerLauncher //send the cleanup event to containerLauncher

View File

@@ -69,6 +69,7 @@
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskAttemptCompletedEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.JobTaskEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptKillEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptRecoverEvent;
import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskEvent;
@@ -100,6 +101,7 @@
public abstract class TaskImpl implements Task, EventHandler<TaskEvent> { public abstract class TaskImpl implements Task, EventHandler<TaskEvent> {
private static final Log LOG = LogFactory.getLog(TaskImpl.class); private static final Log LOG = LogFactory.getLog(TaskImpl.class);
private static final String SPECULATION = "Speculation: ";
protected final JobConf conf; protected final JobConf conf;
protected final Path jobFile; protected final Path jobFile;
@@ -906,8 +908,8 @@ public void transition(TaskImpl task, TaskEvent event) {
LOG.info(task.commitAttempt LOG.info(task.commitAttempt
+ " already given a go for committing the task output, so killing " + " already given a go for committing the task output, so killing "
+ attemptID); + attemptID);
task.eventHandler.handle(new TaskAttemptEvent( task.eventHandler.handle(new TaskAttemptKillEvent(attemptID,
attemptID, TaskAttemptEventType.TA_KILL)); SPECULATION + task.commitAttempt + " committed first!"));
} }
} }
} }
@@ -932,9 +934,8 @@ public void transition(TaskImpl task, TaskEvent event) {
// other reasons. // other reasons.
!attempt.isFinished()) { !attempt.isFinished()) {
LOG.info("Issuing kill to other attempt " + attempt.getID()); LOG.info("Issuing kill to other attempt " + attempt.getID());
task.eventHandler.handle( task.eventHandler.handle(new TaskAttemptKillEvent(attempt.getID(),
new TaskAttemptEvent(attempt.getID(), SPECULATION + task.successfulAttempt + " succeeded first!"));
TaskAttemptEventType.TA_KILL));
} }
} }
task.finished(TaskStateInternal.SUCCEEDED); task.finished(TaskStateInternal.SUCCEEDED);
@@ -1199,8 +1200,7 @@ public void transition(TaskImpl task, TaskEvent event) {
private void killUnfinishedAttempt(TaskAttempt attempt, String logMsg) { private void killUnfinishedAttempt(TaskAttempt attempt, String logMsg) {
if (attempt != null && !attempt.isFinished()) { if (attempt != null && !attempt.isFinished()) {
eventHandler.handle( eventHandler.handle(
new TaskAttemptEvent(attempt.getID(), new TaskAttemptKillEvent(attempt.getID(), logMsg));
TaskAttemptEventType.TA_KILL));
} }
} }

View File

@@ -18,6 +18,7 @@
package org.apache.hadoop.mapreduce.v2; package org.apache.hadoop.mapreduce.v2;
import java.util.Collection;
import java.util.Iterator; import java.util.Iterator;
import java.util.Map; import java.util.Map;
import java.util.Random; import java.util.Random;
@@ -106,17 +107,21 @@ public void testSpeculateSuccessfulWithoutUpdateEvents() throws Exception {
int maxTimeWait = 10; int maxTimeWait = 10;
boolean successfullySpeculated = false; boolean successfullySpeculated = false;
TaskAttempt[] ta = null;
while (maxTimeWait > 0 && !successfullySpeculated) { while (maxTimeWait > 0 && !successfullySpeculated) {
if (taskToBeSpeculated.getAttempts().size() != 2) { if (taskToBeSpeculated.getAttempts().size() != 2) {
Thread.sleep(1000); Thread.sleep(1000);
clock.setTime(System.currentTimeMillis() + 20000); clock.setTime(System.currentTimeMillis() + 20000);
} else { } else {
successfullySpeculated = true; successfullySpeculated = true;
// finish 1st TA, 2nd will be killed
ta = makeFirstAttemptWin(appEventHandler, taskToBeSpeculated);
} }
maxTimeWait--; maxTimeWait--;
} }
Assert Assert
.assertTrue("Couldn't speculate successfully", successfullySpeculated); .assertTrue("Couldn't speculate successfully", successfullySpeculated);
verifySpeculationMessage(app, ta);
} }
@Test(timeout = 60000) @Test(timeout = 60000)
@@ -197,16 +202,47 @@ public void testSepculateSuccessfulWithUpdateEvents() throws Exception {
int maxTimeWait = 5; int maxTimeWait = 5;
boolean successfullySpeculated = false; boolean successfullySpeculated = false;
TaskAttempt[] ta = null;
while (maxTimeWait > 0 && !successfullySpeculated) { while (maxTimeWait > 0 && !successfullySpeculated) {
if (speculatedTask.getAttempts().size() != 2) { if (speculatedTask.getAttempts().size() != 2) {
Thread.sleep(1000); Thread.sleep(1000);
} else { } else {
successfullySpeculated = true; successfullySpeculated = true;
ta = makeFirstAttemptWin(appEventHandler, speculatedTask);
} }
maxTimeWait--; maxTimeWait--;
} }
Assert Assert
.assertTrue("Couldn't speculate successfully", successfullySpeculated); .assertTrue("Couldn't speculate successfully", successfullySpeculated);
verifySpeculationMessage(app, ta);
}
/**
 * Drives the first task attempt to successful completion so that the
 * speculator kills the remaining (second) attempt.
 *
 * @param appEventHandler handler through which TA_DONE and
 *        TA_CONTAINER_CLEANED events are dispatched for the winning attempt
 * @param speculatedTask task whose attempts are being raced
 * @return the task's attempts as an array; index 0 is the winner
 */
private static TaskAttempt[] makeFirstAttemptWin(
    EventHandler appEventHandler, Task speculatedTask) {
  // finish 1st TA, 2nd will be killed
  TaskAttempt[] attempts =
      speculatedTask.getAttempts().values().toArray(new TaskAttempt[0]);
  TaskAttemptId winnerId = attempts[0].getID();
  appEventHandler.handle(
      new TaskAttemptEvent(winnerId, TaskAttemptEventType.TA_DONE));
  appEventHandler.handle(
      new TaskAttemptEvent(winnerId, TaskAttemptEventType.TA_CONTAINER_CLEANED));
  return attempts;
}
/**
 * Verifies the outcome of a speculative-execution race: the first attempt
 * must succeed, the second must be killed, and the killed attempt must
 * carry an explicit "Speculation" diagnostic message (MAPREDUCE-5692).
 *
 * @param app the running MRApp used to await attempt state transitions
 * @param ta the task's attempts; ta[0] is the expected winner, ta[1] the
 *           attempt expected to be killed by speculation
 * @throws Exception if waiting for an attempt state fails
 */
private static void verifySpeculationMessage(MRApp app, TaskAttempt[] ta)
throws Exception {
// ta[0] was driven to completion; ta[1] should be killed by the speculator
app.waitForState(ta[0], TaskAttemptState.SUCCEEDED);
app.waitForState(ta[1], TaskAttemptState.KILLED);
boolean foundSpecMsg = false;
// scan the killed attempt's diagnostics for the speculation marker added
// by TaskImpl (the SPECULATION prefix is "Speculation: ")
for (String msg : ta[1].getDiagnostics()) {
if (msg.contains("Speculation")) {
foundSpecMsg = true;
break;
}
}
Assert.assertTrue("No speculation diagnostics!", foundSpecMsg);
} }
private TaskAttemptStatus createTaskAttemptStatus(TaskAttemptId id, private TaskAttemptStatus createTaskAttemptStatus(TaskAttemptId id,