From 4ba2acf3363bdfd7fcdd9de496cd57f8af6f03ad Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Wed, 14 Sep 2011 17:57:52 +0000 Subject: [PATCH] MAPREDUCE-2966. Added ShutDown hooks for MRV2 processes so that they can gracefully exit. Contributed by Abhijit Suresh Shingate. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1170746 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../hadoop/mapreduce/v2/app/MRAppMaster.java | 8 +- .../mapreduce/v2/hs/JobHistoryServer.java | 13 +- .../hadoop/yarn/service/CompositeService.java | 59 ++++- .../yarn/util/TestCompositeService.java | 247 ++++++++++++++++++ .../yarn/server/nodemanager/NodeManager.java | 22 +- .../resourcemanager/ResourceManager.java | 17 +- 7 files changed, 325 insertions(+), 44 deletions(-) create mode 100644 hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestCompositeService.java diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 13acc03599..ffebfe98e6 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -292,6 +292,9 @@ Release 0.23.0 - Unreleased MAPREDUCE-2899. Replace major parts of ApplicationSubmissionContext with a ContainerLaunchContext (Arun Murthy via mahadev) + MAPREDUCE-2966. Added ShutDown hooks for MRV2 processes so that they can + gracefully exit. (Abhijit Suresh Shingate via vinodkv) + OPTIMIZATIONS MAPREDUCE-2026. Make JobTracker.getJobCounters() and diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java index 4d7a9eafb5..af04b3ff57 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-app/src/main/java/org/apache/hadoop/mapreduce/v2/app/MRAppMaster.java @@ -559,12 +559,14 @@ public void handle(SpeculatorEvent event) { public static void main(String[] args) { try { //Configuration.addDefaultResource("job.xml"); - ApplicationId applicationId = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class); - + ApplicationId applicationId = RecordFactoryProvider + .getRecordFactory(null).newRecordInstance(ApplicationId.class); applicationId.setClusterTimestamp(Long.valueOf(args[0])); applicationId.setId(Integer.valueOf(args[1])); int failCount = Integer.valueOf(args[2]); MRAppMaster appMaster = new MRAppMaster(applicationId, failCount); + Runtime.getRuntime().addShutdownHook( + new CompositeServiceShutdownHook(appMaster)); YarnConfiguration conf = new YarnConfiguration(new JobConf()); conf.addResource(new Path(MRConstants.JOB_CONF_FILE)); conf.set(MRJobConfig.USER_NAME, @@ -573,7 +575,7 @@ public static void main(String[] args) { appMaster.init(conf); appMaster.start(); } catch (Throwable t) { - LOG.error("Caught throwable. Exiting:", t); + LOG.fatal("Error starting MRAppMaster", t); System.exit(1); } } diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java index 73ef9feaa2..e6a9b95734 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-hs/src/main/java/org/apache/hadoop/mapreduce/v2/hs/JobHistoryServer.java @@ -74,14 +74,15 @@ protected void doSecureLogin(Configuration conf) throws IOException { public static void main(String[] args) { StringUtils.startupShutdownMessage(JobHistoryServer.class, args, LOG); - JobHistoryServer server = null; try { - server = new JobHistoryServer(); + JobHistoryServer jobHistoryServer = new JobHistoryServer(); + Runtime.getRuntime().addShutdownHook( + new CompositeServiceShutdownHook(jobHistoryServer)); YarnConfiguration conf = new YarnConfiguration(new JobConf()); - server.init(conf); - server.start(); - } catch (Throwable e) { - LOG.fatal(StringUtils.stringifyException(e)); + jobHistoryServer.init(conf); + jobHistoryServer.start(); + } catch (Throwable t) { + LOG.fatal("Error starting JobHistoryServer", t); System.exit(-1); } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/CompositeService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/CompositeService.java index 973ecc0eec..6e2ef33014 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/CompositeService.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/service/CompositeService.java @@ -67,28 +67,59 @@ public synchronized void start() { Service service = serviceList.get(i); service.start(); } - } catch(Throwable e) { + super.start(); + } catch (Throwable e) { LOG.error("Error starting services " + getName(), e); - for (int j = i-1; j >= 0; j--) { - Service service = serviceList.get(j); - try { - service.stop(); - } catch(Throwable t) { - LOG.info("Error stopping " + service.getName(), t); - } - } + // Note that the state of the failed service is still INITED and not + // STARTED. Even though the last service is not started completely, still + // call stop() on all services including failed service to make sure cleanup + // happens. + stop(i); throw new YarnException("Failed to Start " + getName(), e); } - super.start(); + } public synchronized void stop() { - //stop in reserve order of start - for (int i = serviceList.size() - 1; i >= 0; i--) { - Service service = serviceList.get(i); - service.stop(); + if (serviceList.size() > 0) { + stop(serviceList.size() - 1); } super.stop(); } + private synchronized void stop(int numOfServicesStarted) { + // stop in reserve order of start + for (int i = numOfServicesStarted; i >= 0; i--) { + Service service = serviceList.get(i); + try { + service.stop(); + } catch (Throwable t) { + LOG.info("Error stopping " + service.getName(), t); + } + } + } + + /** + * JVM Shutdown hook for CompositeService which will stop the give + * CompositeService gracefully in case of JVM shutdown. + */ + public static class CompositeServiceShutdownHook extends Thread { + + private CompositeService compositeService; + + public CompositeServiceShutdownHook(CompositeService compositeService) { + this.compositeService = compositeService; + } + + @Override + public void run() { + try { + // Stop the Composite Service + compositeService.stop(); + } catch (Throwable t) { + LOG.info("Error stopping " + compositeService.getName(), t); + } + } + } + } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestCompositeService.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestCompositeService.java new file mode 100644 index 0000000000..9ca0e4b42c --- /dev/null +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/util/TestCompositeService.java @@ -0,0 +1,247 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.util; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.fail; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.YarnException; +import org.apache.hadoop.yarn.service.CompositeService; +import org.apache.hadoop.yarn.service.Service.STATE; +import org.junit.Test; + +public class TestCompositeService { + + private static final int NUM_OF_SERVICES = 5; + + private static final int FAILED_SERVICE_SEQ_NUMBER = 2; + + @Test + public void testCallSequence() { + ServiceManager serviceManager = new ServiceManager("ServiceManager"); + + // Add services + for (int i = 0; i < NUM_OF_SERVICES; i++) { + CompositeServiceImpl service = new CompositeServiceImpl(i); + serviceManager.addTestService(service); + } + + CompositeServiceImpl[] services = serviceManager.getServices().toArray( + new CompositeServiceImpl[0]); + + assertEquals("Number of registered services ", NUM_OF_SERVICES, + services.length); + + Configuration conf = new Configuration(); + // Initialise the composite service + serviceManager.init(conf); + + // Verify the init() call sequence numbers for every service + for (int i = 0; i < NUM_OF_SERVICES; i++) { + assertEquals("For " + services[i] + + " service, init() call sequence number should have been ", i, + services[i].getCallSequenceNumber()); + } + + // Reset the call sequence numbers + for (int i = 0; i < NUM_OF_SERVICES; i++) { + services[i].reset(); + } + + serviceManager.start(); + + // Verify the start() call sequence numbers for every service + for (int i = 0; i < NUM_OF_SERVICES; i++) { + assertEquals("For " + services[i] + + " service, start() call sequence number should have been ", i, + services[i].getCallSequenceNumber()); + } + + // Reset the call sequence numbers + for (int i = 0; i < NUM_OF_SERVICES; i++) { + services[i].reset(); + } + + serviceManager.stop(); + + // Verify the stop() call sequence numbers for every service + for (int i = 0; i < NUM_OF_SERVICES; i++) { + assertEquals("For " + services[i] + + " service, stop() call sequence number should have been ", + ((NUM_OF_SERVICES - 1) - i), services[i].getCallSequenceNumber()); + } + + } + + @Test + public void testServiceStartup() { + ServiceManager serviceManager = new ServiceManager("ServiceManager"); + + // Add services + for (int i = 0; i < NUM_OF_SERVICES; i++) { + CompositeServiceImpl service = new CompositeServiceImpl(i); + if (i == FAILED_SERVICE_SEQ_NUMBER) { + service.setThrowExceptionOnStart(true); + } + serviceManager.addTestService(service); + } + + CompositeServiceImpl[] services = serviceManager.getServices().toArray( + new CompositeServiceImpl[0]); + + Configuration conf = new Configuration(); + + // Initialise the composite service + serviceManager.init(conf); + + // Start the composite service + try { + serviceManager.start(); + fail("Exception should have been thrown due to startup failure of last service"); + } catch (YarnException e) { + for (int i = 0; i < NUM_OF_SERVICES - 1; i++) { + if (i >= FAILED_SERVICE_SEQ_NUMBER) { + // Failed service state should be INITED + assertEquals("Service state should have been ", STATE.INITED, + services[NUM_OF_SERVICES - 1].getServiceState()); + } else { + assertEquals("Service state should have been ", STATE.STOPPED, + services[i].getServiceState()); + } + } + + } + } + + @Test + public void testServiceStop() { + ServiceManager serviceManager = new ServiceManager("ServiceManager"); + + // Add services + for (int i = 0; i < NUM_OF_SERVICES; i++) { + CompositeServiceImpl service = new CompositeServiceImpl(i); + if (i == FAILED_SERVICE_SEQ_NUMBER) { + service.setThrowExceptionOnStop(true); + } + serviceManager.addTestService(service); + } + + CompositeServiceImpl[] services = serviceManager.getServices().toArray( + new CompositeServiceImpl[0]); + + Configuration conf = new Configuration(); + + // Initialise the composite service + serviceManager.init(conf); + + serviceManager.start(); + + // Start the composite service + try { + serviceManager.stop(); + } catch (YarnException e) { + for (int i = 0; i < NUM_OF_SERVICES - 1; i++) { + assertEquals("Service state should have been ", STATE.STOPPED, + services[NUM_OF_SERVICES].getServiceState()); + } + } + } + + public static class CompositeServiceImpl extends CompositeService { + + private static int counter = -1; + + private int callSequenceNumber = -1; + + private boolean throwExceptionOnStart; + + private boolean throwExceptionOnStop; + + public CompositeServiceImpl(int sequenceNumber) { + super(Integer.toString(sequenceNumber)); + } + + @Override + public synchronized void init(Configuration conf) { + counter++; + callSequenceNumber = counter; + super.init(conf); + } + + @Override + public synchronized void start() { + if (throwExceptionOnStart) { + throw new YarnException("Fake service start exception"); + } + counter++; + callSequenceNumber = counter; + super.start(); + } + + @Override + public synchronized void stop() { + counter++; + callSequenceNumber = counter; + if (throwExceptionOnStop) { + throw new YarnException("Fake service stop exception"); + } + super.stop(); + } + + public static int getCounter() { + return counter; + } + + public int getCallSequenceNumber() { + return callSequenceNumber; + } + + public void reset() { + callSequenceNumber = -1; + counter = -1; + } + + public void setThrowExceptionOnStart(boolean throwExceptionOnStart) { + this.throwExceptionOnStart = throwExceptionOnStart; + } + + public void setThrowExceptionOnStop(boolean throwExceptionOnStop) { + this.throwExceptionOnStop = throwExceptionOnStop; + } + + @Override + public String toString() { + return "Service " + getName(); + } + + } + + public static class ServiceManager extends CompositeService { + + public void addTestService(CompositeService service) { + addService(service); + } + + public ServiceManager(String name) { + super(name); + } + } + +} diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java index b5249440c7..8dc16e97df 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-nodemanager/src/main/java/org/apache/hadoop/yarn/server/nodemanager/NodeManager.java @@ -133,13 +133,6 @@ public void init(Configuration conf) { dispatcher.register(ContainerManagerEventType.class, containerManager); addService(dispatcher); - Runtime.getRuntime().addShutdownHook(new Thread() { - @Override - public void run() { - NodeManager.this.stop(); - } - }); - DefaultMetricsSystem.initialize("NodeManager"); // StatusUpdater should be added last so that it get started last @@ -200,10 +193,17 @@ public NodeHealthStatus getNodeHealthStatus() { public static void main(String[] args) { StringUtils.startupShutdownMessage(NodeManager.class, args, LOG); - NodeManager nodeManager = new NodeManager(); - YarnConfiguration conf = new YarnConfiguration(); - nodeManager.init(conf); - nodeManager.start(); + try { + NodeManager nodeManager = new NodeManager(); + Runtime.getRuntime().addShutdownHook( + new CompositeServiceShutdownHook(nodeManager)); + YarnConfiguration conf = new YarnConfiguration(); + nodeManager.init(conf); + nodeManager.start(); + } catch (Throwable t) { + LOG.fatal("Error starting NodeManager", t); + System.exit(-1); + } } } diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index b578fee818..179b56a4af 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -22,7 +22,6 @@ import java.io.IOException; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -45,8 +44,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store; -import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory; +import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; @@ -101,7 +100,6 @@ public class ResourceManager extends CompositeService implements Recoverable { private SchedulerEventDispatcher schedulerDispatcher; protected RMAppManager rmAppManager; - private final AtomicBoolean shutdown = new AtomicBoolean(false); private WebApp webApp; private RMContext rmContext; private final Store store; @@ -490,20 +488,19 @@ public void recover(RMState state) throws Exception { public static void main(String argv[]) { StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG); - ResourceManager resourceManager = null; try { Configuration conf = new YarnConfiguration(); Store store = StoreFactory.getStore(conf); - resourceManager = new ResourceManager(store); + ResourceManager resourceManager = new ResourceManager(store); + Runtime.getRuntime().addShutdownHook( + new CompositeServiceShutdownHook(resourceManager)); resourceManager.init(conf); //resourceManager.recover(store.restore()); //store.doneWithRecovery(); resourceManager.start(); - } catch (Throwable e) { - LOG.error("Error starting RM", e); - if (resourceManager != null) { - resourceManager.stop(); - } + } catch (Throwable t) { + LOG.fatal("Error starting ResourceManager", t); + System.exit(-1); } } }