From 08da8ea5db5359fc04010be486b842a5d2e6b9c2 Mon Sep 17 00:00:00 2001 From: Mahadev Konar Date: Thu, 1 Dec 2011 08:35:20 +0000 Subject: [PATCH] MAPREDUCE-3463. Second AM fails to recover properly when first AM is killed with java.lang.IllegalArgumentException causing lost job. (Siddharth Seth via mahadev) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1208994 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 ++ .../hadoop/mapreduce/v2/app/MRAppMaster.java | 12 +++++-- .../v2/app/recover/RecoveryService.java | 32 +++++++++++++++---- .../hadoop/mapreduce/v2/app/TestRecovery.java | 29 ++++++++++++++++- .../hadoop/yarn/event/AsyncDispatcher.java | 15 +++++++-- .../hadoop/yarn/event/DrainDispatcher.java | 2 +- 6 files changed, 79 insertions(+), 14 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 135623ecb8..187837abf7 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -204,6 +204,9 @@ Release 0.23.1 - Unreleased MAPREDUCE-3488. Streaming jobs are failing because the main class isnt set in the pom files. (mahadev) + + MAPREDUCE-3463. Second AM fails to recover properly when first AM is killed with + java.lang.IllegalArgumentException causing lost job. (Siddharth Seth via mahadev) Release 0.23.0 - 2011-11-01 diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 7a6b86a0f8..800dfa9d36 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -217,8 +217,7 @@ public void init(final Configuration conf) { && appAttemptID.getAttemptId() > 1) { LOG.info("Recovery is enabled. " + "Will try to recover from previous life on best effort basis."); - recoveryServ = new RecoveryService(appAttemptID, clock, - committer); + recoveryServ = createRecoveryService(context); addIfService(recoveryServ); dispatcher = recoveryServ.getDispatcher(); clock = recoveryServ.getClock(); @@ -425,6 +424,15 @@ protected EventHandler createJobFinishEventHandler() { return new JobFinishEventHandler(); } + /** + * Create the recovery service. + * @return an instance of the recovery service. + */ + protected Recovery createRecoveryService(AppContext appContext) { + return new RecoveryService(appContext.getApplicationAttemptId(), + appContext.getClock(), getCommitter()); + } + /** Create and initialize (but don't start) a single job. */ protected Job createJob(Configuration conf) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java index 843e666c87..30cbdae67b 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/recover/RecoveryService.java @@ -76,8 +76,6 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.Event; import org.apache.hadoop.yarn.event.EventHandler; -import org.apache.hadoop.yarn.factories.RecordFactory; -import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.service.CompositeService; import org.apache.hadoop.yarn.service.Service; import org.apache.hadoop.yarn.util.BuilderUtils; @@ -97,8 +95,6 @@ public class RecoveryService extends CompositeService implements Recovery { - private static final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); - private static final Log LOG = LogFactory.getLog(RecoveryService.class); private final ApplicationAttemptId applicationAttemptId; @@ -120,7 +116,7 @@ public RecoveryService(ApplicationAttemptId applicationAttemptId, super("RecoveringDispatcher"); this.applicationAttemptId = applicationAttemptId; this.committer = committer; - this.dispatcher = new RecoveryDispatcher(); + this.dispatcher = createRecoveryDispatcher(); this.clock = new ControlledClock(clock); addService((Service) dispatcher); } @@ -209,17 +205,32 @@ private void parse() throws IOException { LOG.info("Read completed tasks from history " + completedTasks.size()); } + + protected Dispatcher createRecoveryDispatcher() { + return new RecoveryDispatcher(); + } + + protected Dispatcher createRecoveryDispatcher(boolean exitOnException) { + return new RecoveryDispatcher(exitOnException); + } + @SuppressWarnings("rawtypes") class RecoveryDispatcher extends AsyncDispatcher { private final EventHandler actualHandler; private final EventHandler handler; - RecoveryDispatcher() { + RecoveryDispatcher(boolean exitOnException) { + super(exitOnException); actualHandler = super.getEventHandler(); handler = new InterceptingEventHandler(actualHandler); } + RecoveryDispatcher() { + this(false); + } + @Override + @SuppressWarnings("unchecked") public void dispatch(Event event) { if (recoveryMode) { if (event.getType() == TaskAttemptEventType.TA_CONTAINER_LAUNCHED) { @@ -267,6 +278,10 @@ else if (event.getType() == TaskEventType.T_ATTEMPT_FAILED } } } + realDispatch(event); + } + + public void realDispatch(Event event) { super.dispatch(event); } @@ -281,6 +296,7 @@ private TaskAttemptInfo getTaskAttemptInfo(TaskAttemptId id) { return taskInfo.getAllTaskAttempts().get(TypeConverter.fromYarn(id)); } + @SuppressWarnings({"rawtypes", "unchecked"}) private class InterceptingEventHandler implements EventHandler { EventHandler actualHandler; @@ -407,7 +423,9 @@ private void sendAssignedEvent(TaskAttemptId yarnAttemptID, LOG.info("Sending assigned event to " + yarnAttemptID); ContainerId cId = attemptInfo.getContainerId(); - NodeId nodeId = ConverterUtils.toNodeId(attemptInfo.getHostname()); + NodeId nodeId = + ConverterUtils.toNodeId(attemptInfo.getHostname() + ":" + + attemptInfo.getPort()); // Resource/Priority/ApplicationACLs are only needed while launching the // container on an NM, these are already completed tasks, so setting them // to null diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java index 277b097da4..ec492de7fe 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/test/java/org/apache/hadoop/mapreduce/v2/app/TestRecovery.java @@ -52,7 +52,12 @@ import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEvent; import org.apache.hadoop.mapreduce.v2.app.job.event.TaskAttemptEventType; import org.apache.hadoop.mapreduce.v2.app.launcher.ContainerLauncher; +import org.apache.hadoop.mapreduce.v2.app.recover.Recovery; +import org.apache.hadoop.mapreduce.v2.app.recover.RecoveryService; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.Clock; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; import org.junit.Test; @@ -407,6 +412,13 @@ public MRAppWithHistory(int maps, int reduces, boolean autoComplete, super(maps, reduces, autoComplete, testName, cleanOnStart, startCount); } + @Override + protected Recovery createRecoveryService(AppContext appContext) { + return new RecoveryServiceWithCustomDispatcher( + appContext.getApplicationAttemptId(), appContext.getClock(), + getCommitter()); + } + @Override protected ContainerLauncher createContainerLauncher(AppContext context) { MockContainerLauncher launcher = new MockContainerLauncher(); @@ -422,7 +434,22 @@ protected EventHandler createJobHistoryHandler( return eventHandler; } } - + + class RecoveryServiceWithCustomDispatcher extends RecoveryService { + + public RecoveryServiceWithCustomDispatcher( + ApplicationAttemptId applicationAttemptId, Clock clock, + OutputCommitter committer) { + super(applicationAttemptId, clock, committer); + } + + @Override + public Dispatcher createRecoveryDispatcher() { + return super.createRecoveryDispatcher(false); + } + + } + public static void main(String[] arg) throws Exception { TestRecovery test = new TestRecovery(); test.testCrashed(); diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index 278d671ea5..8a5fceecb0 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -45,18 +45,25 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { private Thread eventHandlingThread; protected final Map, EventHandler> eventDispatchers; + private boolean exitOnDispatchException; public AsyncDispatcher() { this(new HashMap, EventHandler>(), - new LinkedBlockingQueue()); + new LinkedBlockingQueue(), true); + } + + public AsyncDispatcher(boolean exitOnException) { + this(new HashMap, EventHandler>(), + new LinkedBlockingQueue(), exitOnException); } AsyncDispatcher( Map, EventHandler> eventDispatchers, - BlockingQueue eventQueue) { + BlockingQueue eventQueue, boolean exitOnException) { super("Dispatcher"); this.eventQueue = eventQueue; this.eventDispatchers = eventDispatchers; + this.exitOnDispatchException = exitOnException; } Runnable createThread() { @@ -118,7 +125,9 @@ protected void dispatch(Event event) { catch (Throwable t) { //TODO Maybe log the state of the queue LOG.fatal("Error in dispatcher thread. Exiting..", t); - System.exit(-1); + if (exitOnDispatchException) { + System.exit(-1); + } } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java index 8a61f6f576..20d7dfca94 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java @@ -36,7 +36,7 @@ public DrainDispatcher() { } private DrainDispatcher(BlockingQueue eventQueue) { - super(new HashMap, EventHandler>(), eventQueue); + super(new HashMap, EventHandler>(), eventQueue, true); this.queue = eventQueue; }