From fe35103591ece0209f8345aba5544313e45a073c Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Wed, 3 Jan 2018 11:01:38 -0600 Subject: [PATCH] MAPREDUCE-7028. Concurrent task progress updates causing NPE in Application Master. Contributed by Gergo Repas --- .../mapred/TaskAttemptListenerImpl.java | 41 +++++++++++-------- 1 file changed, 23 insertions(+), 18 deletions(-) diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java index 67f8ff03b3..556c90c441 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapred/TaskAttemptListenerImpl.java @@ -585,33 +585,38 @@ public void setCheckpointID(TaskID taskId, TaskCheckpointID cid) { private void coalesceStatusUpdate(TaskAttemptId yarnAttemptID, TaskAttemptStatus taskAttemptStatus, AtomicReference lastStatusRef) { - boolean asyncUpdatedNeeded = false; - TaskAttemptStatus lastStatus = lastStatusRef.get(); - - if (lastStatus == null) { - lastStatusRef.set(taskAttemptStatus); - asyncUpdatedNeeded = true; - } else { - List oldFetchFailedMaps = - taskAttemptStatus.fetchFailedMaps; - - // merge fetchFailedMaps from the previous update - if (lastStatus.fetchFailedMaps != null) { + List fetchFailedMaps = taskAttemptStatus.fetchFailedMaps; + TaskAttemptStatus lastStatus = null; + boolean done = false; + while (!done) { + lastStatus = lastStatusRef.get(); + if (lastStatus != null && lastStatus.fetchFailedMaps != null) { + // merge fetchFailedMaps from the previous update if (taskAttemptStatus.fetchFailedMaps == null) { taskAttemptStatus.fetchFailedMaps = lastStatus.fetchFailedMaps; } else { - taskAttemptStatus.fetchFailedMaps.addAll(lastStatus.fetchFailedMaps); + taskAttemptStatus.fetchFailedMaps = + new ArrayList<>(lastStatus.fetchFailedMaps.size() + + fetchFailedMaps.size()); + taskAttemptStatus.fetchFailedMaps.addAll( + lastStatus.fetchFailedMaps); + taskAttemptStatus.fetchFailedMaps.addAll( + fetchFailedMaps); } } - if (!lastStatusRef.compareAndSet(lastStatus, taskAttemptStatus)) { - // update failed - async dispatcher has processed it in the meantime - taskAttemptStatus.fetchFailedMaps = oldFetchFailedMaps; - lastStatusRef.set(taskAttemptStatus); - asyncUpdatedNeeded = true; + // lastStatusRef may be changed by either the AsyncDispatcher when + // it processes the update, or by another IPC server handler + done = lastStatusRef.compareAndSet(lastStatus, taskAttemptStatus); + if (!done) { + LOG.info("TaskAttempt " + yarnAttemptID + + ": lastStatusRef changed by another thread, retrying..."); + // let's revert taskAttemptStatus.fetchFailedMaps + taskAttemptStatus.fetchFailedMaps = fetchFailedMaps; } } + boolean asyncUpdatedNeeded = (lastStatus == null); if (asyncUpdatedNeeded) { context.getEventHandler().handle( new TaskAttemptStatusUpdateEvent(taskAttemptStatus.id,