MAPREDUCE-7020. Task timeout in uber mode can crash AM. Contributed by Peter Bacsko

This commit is contained in:
Jason Lowe 2018-01-26 15:31:43 -06:00
parent e990904dd5
commit 6eef3d7f1a
3 changed files with 20 additions and 8 deletions

View File

@@ -369,14 +369,16 @@ public AMFeedback statusUpdate(TaskAttemptID taskAttemptID,
org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID = org.apache.hadoop.mapreduce.v2.api.records.TaskAttemptId yarnAttemptID =
TypeConverter.toYarn(taskAttemptID); TypeConverter.toYarn(taskAttemptID);
AMFeedback feedback = new AMFeedback();
AtomicReference<TaskAttemptStatus> lastStatusRef = AtomicReference<TaskAttemptStatus> lastStatusRef =
attemptIdToStatus.get(yarnAttemptID); attemptIdToStatus.get(yarnAttemptID);
if (lastStatusRef == null) { if (lastStatusRef == null) {
throw new IllegalStateException("Status update was called" LOG.error("Status update was called with illegal TaskAttemptId: "
+ " with illegal TaskAttemptId: " + yarnAttemptID); + yarnAttemptID);
feedback.setTaskFound(false);
return feedback;
} }
AMFeedback feedback = new AMFeedback();
feedback.setTaskFound(true); feedback.setTaskFound(true);
// Propagating preemption to the task if TASK_PREEMPTION is enabled // Propagating preemption to the task if TASK_PREEMPTION is enabled

View File

@@ -487,13 +487,15 @@ public void testCoalescedStatusUpdatesCleared()
assertEquals(Phase.REDUCE, status.phase); assertEquals(Phase.REDUCE, status.phase);
} }
@Test(expected = IllegalStateException.class) @Test
public void testStatusUpdateFromUnregisteredTask() public void testStatusUpdateFromUnregisteredTask()
throws IOException, InterruptedException{ throws IOException, InterruptedException{
configureMocks(); configureMocks();
startListener(false); startListener(false);
listener.statusUpdate(attemptID, firstReduceStatus); AMFeedback feedback = listener.statusUpdate(attemptID, firstReduceStatus);
assertFalse(feedback.getTaskFound());
} }
private void configureMocks() { private void configureMocks() {

View File

@@ -855,6 +855,9 @@ public void run() {
long taskProgressInterval = MRJobConfUtil. long taskProgressInterval = MRJobConfUtil.
getTaskProgressReportInterval(conf); getTaskProgressReportInterval(conf);
boolean uberized = conf.getBoolean("mapreduce.task.uberized",
false);
while (!taskDone.get()) { while (!taskDone.get()) {
synchronized (lock) { synchronized (lock) {
done = false; done = false;
@@ -893,9 +896,14 @@ public void run() {
// if Task Tracker is not aware of our task ID (probably because it died and // if Task Tracker is not aware of our task ID (probably because it died and
// came back up), kill ourselves // came back up), kill ourselves
if (!taskFound) { if (!taskFound) {
LOG.warn("Parent died. Exiting "+taskId); if (uberized) {
resetDoneFlag(); taskDone.set(true);
System.exit(66); break;
} else {
LOG.warn("Parent died. Exiting "+taskId);
resetDoneFlag();
System.exit(66);
}
} }
// Set a flag that says we should preempt this is read by // Set a flag that says we should preempt this is read by