MAPREDUCE-2966. Added ShutDown hooks for MRV2 processes so that they can gracefully exit. Contributed by Abhijit Suresh Shingate.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1170746 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2011-09-14 17:57:52 +00:00
parent 47e04d3124
commit 4ba2acf336
7 changed files with 325 additions and 44 deletions

View File

@ -292,6 +292,9 @@ Release 0.23.0 - Unreleased
MAPREDUCE-2899. Replace major parts of ApplicationSubmissionContext with a MAPREDUCE-2899. Replace major parts of ApplicationSubmissionContext with a
ContainerLaunchContext (Arun Murthy via mahadev) ContainerLaunchContext (Arun Murthy via mahadev)
MAPREDUCE-2966. Added ShutDown hooks for MRV2 processes so that they can
gracefully exit. (Abhijit Suresh Shingate via vinodkv)
OPTIMIZATIONS OPTIMIZATIONS
MAPREDUCE-2026. Make JobTracker.getJobCounters() and MAPREDUCE-2026. Make JobTracker.getJobCounters() and

View File

@ -559,12 +559,14 @@ public void handle(SpeculatorEvent event) {
public static void main(String[] args) { public static void main(String[] args) {
try { try {
//Configuration.addDefaultResource("job.xml"); //Configuration.addDefaultResource("job.xml");
ApplicationId applicationId = RecordFactoryProvider.getRecordFactory(null).newRecordInstance(ApplicationId.class); ApplicationId applicationId = RecordFactoryProvider
.getRecordFactory(null).newRecordInstance(ApplicationId.class);
applicationId.setClusterTimestamp(Long.valueOf(args[0])); applicationId.setClusterTimestamp(Long.valueOf(args[0]));
applicationId.setId(Integer.valueOf(args[1])); applicationId.setId(Integer.valueOf(args[1]));
int failCount = Integer.valueOf(args[2]); int failCount = Integer.valueOf(args[2]);
MRAppMaster appMaster = new MRAppMaster(applicationId, failCount); MRAppMaster appMaster = new MRAppMaster(applicationId, failCount);
Runtime.getRuntime().addShutdownHook(
new CompositeServiceShutdownHook(appMaster));
YarnConfiguration conf = new YarnConfiguration(new JobConf()); YarnConfiguration conf = new YarnConfiguration(new JobConf());
conf.addResource(new Path(MRConstants.JOB_CONF_FILE)); conf.addResource(new Path(MRConstants.JOB_CONF_FILE));
conf.set(MRJobConfig.USER_NAME, conf.set(MRJobConfig.USER_NAME,
@ -573,7 +575,7 @@ public static void main(String[] args) {
appMaster.init(conf); appMaster.init(conf);
appMaster.start(); appMaster.start();
} catch (Throwable t) { } catch (Throwable t) {
LOG.error("Caught throwable. Exiting:", t); LOG.fatal("Error starting MRAppMaster", t);
System.exit(1); System.exit(1);
} }
} }

View File

@ -74,14 +74,15 @@ protected void doSecureLogin(Configuration conf) throws IOException {
public static void main(String[] args) { public static void main(String[] args) {
StringUtils.startupShutdownMessage(JobHistoryServer.class, args, LOG); StringUtils.startupShutdownMessage(JobHistoryServer.class, args, LOG);
JobHistoryServer server = null;
try { try {
server = new JobHistoryServer(); JobHistoryServer jobHistoryServer = new JobHistoryServer();
Runtime.getRuntime().addShutdownHook(
new CompositeServiceShutdownHook(jobHistoryServer));
YarnConfiguration conf = new YarnConfiguration(new JobConf()); YarnConfiguration conf = new YarnConfiguration(new JobConf());
server.init(conf); jobHistoryServer.init(conf);
server.start(); jobHistoryServer.start();
} catch (Throwable e) { } catch (Throwable t) {
LOG.fatal(StringUtils.stringifyException(e)); LOG.fatal("Error starting JobHistoryServer", t);
System.exit(-1); System.exit(-1);
} }
} }

View File

@ -67,28 +67,59 @@ public synchronized void start() {
Service service = serviceList.get(i); Service service = serviceList.get(i);
service.start(); service.start();
} }
} catch(Throwable e) { super.start();
} catch (Throwable e) {
LOG.error("Error starting services " + getName(), e); LOG.error("Error starting services " + getName(), e);
for (int j = i-1; j >= 0; j--) { // Note that the state of the failed service is still INITED and not
Service service = serviceList.get(j); // STARTED. Even though the last service is not started completely, still
try { // call stop() on all services including failed service to make sure cleanup
service.stop(); // happens.
} catch(Throwable t) { stop(i);
LOG.info("Error stopping " + service.getName(), t);
}
}
throw new YarnException("Failed to Start " + getName(), e); throw new YarnException("Failed to Start " + getName(), e);
} }
super.start();
} }
public synchronized void stop() { public synchronized void stop() {
//stop in reserve order of start if (serviceList.size() > 0) {
for (int i = serviceList.size() - 1; i >= 0; i--) { stop(serviceList.size() - 1);
Service service = serviceList.get(i);
service.stop();
} }
super.stop(); super.stop();
} }
private synchronized void stop(int numOfServicesStarted) {
// stop in reserve order of start
for (int i = numOfServicesStarted; i >= 0; i--) {
Service service = serviceList.get(i);
try {
service.stop();
} catch (Throwable t) {
LOG.info("Error stopping " + service.getName(), t);
}
}
}
/**
* JVM Shutdown hook for CompositeService which will stop the give
* CompositeService gracefully in case of JVM shutdown.
*/
public static class CompositeServiceShutdownHook extends Thread {
private CompositeService compositeService;
public CompositeServiceShutdownHook(CompositeService compositeService) {
this.compositeService = compositeService;
}
@Override
public void run() {
try {
// Stop the Composite Service
compositeService.stop();
} catch (Throwable t) {
LOG.info("Error stopping " + compositeService.getName(), t);
}
}
}
} }

View File

@ -0,0 +1,247 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.util;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.fail;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.service.CompositeService;
import org.apache.hadoop.yarn.service.Service.STATE;
import org.junit.Test;
public class TestCompositeService {
private static final int NUM_OF_SERVICES = 5;
private static final int FAILED_SERVICE_SEQ_NUMBER = 2;
@Test
public void testCallSequence() {
ServiceManager serviceManager = new ServiceManager("ServiceManager");
// Add services
for (int i = 0; i < NUM_OF_SERVICES; i++) {
CompositeServiceImpl service = new CompositeServiceImpl(i);
serviceManager.addTestService(service);
}
CompositeServiceImpl[] services = serviceManager.getServices().toArray(
new CompositeServiceImpl[0]);
assertEquals("Number of registered services ", NUM_OF_SERVICES,
services.length);
Configuration conf = new Configuration();
// Initialise the composite service
serviceManager.init(conf);
// Verify the init() call sequence numbers for every service
for (int i = 0; i < NUM_OF_SERVICES; i++) {
assertEquals("For " + services[i]
+ " service, init() call sequence number should have been ", i,
services[i].getCallSequenceNumber());
}
// Reset the call sequence numbers
for (int i = 0; i < NUM_OF_SERVICES; i++) {
services[i].reset();
}
serviceManager.start();
// Verify the start() call sequence numbers for every service
for (int i = 0; i < NUM_OF_SERVICES; i++) {
assertEquals("For " + services[i]
+ " service, start() call sequence number should have been ", i,
services[i].getCallSequenceNumber());
}
// Reset the call sequence numbers
for (int i = 0; i < NUM_OF_SERVICES; i++) {
services[i].reset();
}
serviceManager.stop();
// Verify the stop() call sequence numbers for every service
for (int i = 0; i < NUM_OF_SERVICES; i++) {
assertEquals("For " + services[i]
+ " service, stop() call sequence number should have been ",
((NUM_OF_SERVICES - 1) - i), services[i].getCallSequenceNumber());
}
}
@Test
public void testServiceStartup() {
ServiceManager serviceManager = new ServiceManager("ServiceManager");
// Add services
for (int i = 0; i < NUM_OF_SERVICES; i++) {
CompositeServiceImpl service = new CompositeServiceImpl(i);
if (i == FAILED_SERVICE_SEQ_NUMBER) {
service.setThrowExceptionOnStart(true);
}
serviceManager.addTestService(service);
}
CompositeServiceImpl[] services = serviceManager.getServices().toArray(
new CompositeServiceImpl[0]);
Configuration conf = new Configuration();
// Initialise the composite service
serviceManager.init(conf);
// Start the composite service
try {
serviceManager.start();
fail("Exception should have been thrown due to startup failure of last service");
} catch (YarnException e) {
for (int i = 0; i < NUM_OF_SERVICES - 1; i++) {
if (i >= FAILED_SERVICE_SEQ_NUMBER) {
// Failed service state should be INITED
assertEquals("Service state should have been ", STATE.INITED,
services[NUM_OF_SERVICES - 1].getServiceState());
} else {
assertEquals("Service state should have been ", STATE.STOPPED,
services[i].getServiceState());
}
}
}
}
@Test
public void testServiceStop() {
ServiceManager serviceManager = new ServiceManager("ServiceManager");
// Add services
for (int i = 0; i < NUM_OF_SERVICES; i++) {
CompositeServiceImpl service = new CompositeServiceImpl(i);
if (i == FAILED_SERVICE_SEQ_NUMBER) {
service.setThrowExceptionOnStop(true);
}
serviceManager.addTestService(service);
}
CompositeServiceImpl[] services = serviceManager.getServices().toArray(
new CompositeServiceImpl[0]);
Configuration conf = new Configuration();
// Initialise the composite service
serviceManager.init(conf);
serviceManager.start();
// Start the composite service
try {
serviceManager.stop();
} catch (YarnException e) {
for (int i = 0; i < NUM_OF_SERVICES - 1; i++) {
assertEquals("Service state should have been ", STATE.STOPPED,
services[NUM_OF_SERVICES].getServiceState());
}
}
}
public static class CompositeServiceImpl extends CompositeService {
private static int counter = -1;
private int callSequenceNumber = -1;
private boolean throwExceptionOnStart;
private boolean throwExceptionOnStop;
public CompositeServiceImpl(int sequenceNumber) {
super(Integer.toString(sequenceNumber));
}
@Override
public synchronized void init(Configuration conf) {
counter++;
callSequenceNumber = counter;
super.init(conf);
}
@Override
public synchronized void start() {
if (throwExceptionOnStart) {
throw new YarnException("Fake service start exception");
}
counter++;
callSequenceNumber = counter;
super.start();
}
@Override
public synchronized void stop() {
counter++;
callSequenceNumber = counter;
if (throwExceptionOnStop) {
throw new YarnException("Fake service stop exception");
}
super.stop();
}
public static int getCounter() {
return counter;
}
public int getCallSequenceNumber() {
return callSequenceNumber;
}
public void reset() {
callSequenceNumber = -1;
counter = -1;
}
public void setThrowExceptionOnStart(boolean throwExceptionOnStart) {
this.throwExceptionOnStart = throwExceptionOnStart;
}
public void setThrowExceptionOnStop(boolean throwExceptionOnStop) {
this.throwExceptionOnStop = throwExceptionOnStop;
}
@Override
public String toString() {
return "Service " + getName();
}
}
public static class ServiceManager extends CompositeService {
public void addTestService(CompositeService service) {
addService(service);
}
public ServiceManager(String name) {
super(name);
}
}
}

View File

@ -133,13 +133,6 @@ public void init(Configuration conf) {
dispatcher.register(ContainerManagerEventType.class, containerManager); dispatcher.register(ContainerManagerEventType.class, containerManager);
addService(dispatcher); addService(dispatcher);
Runtime.getRuntime().addShutdownHook(new Thread() {
@Override
public void run() {
NodeManager.this.stop();
}
});
DefaultMetricsSystem.initialize("NodeManager"); DefaultMetricsSystem.initialize("NodeManager");
// StatusUpdater should be added last so that it get started last // StatusUpdater should be added last so that it get started last
@ -200,10 +193,17 @@ public NodeHealthStatus getNodeHealthStatus() {
public static void main(String[] args) { public static void main(String[] args) {
StringUtils.startupShutdownMessage(NodeManager.class, args, LOG); StringUtils.startupShutdownMessage(NodeManager.class, args, LOG);
try {
NodeManager nodeManager = new NodeManager(); NodeManager nodeManager = new NodeManager();
Runtime.getRuntime().addShutdownHook(
new CompositeServiceShutdownHook(nodeManager));
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = new YarnConfiguration();
nodeManager.init(conf); nodeManager.init(conf);
nodeManager.start(); nodeManager.start();
} catch (Throwable t) {
LOG.fatal("Error starting NodeManager", t);
System.exit(-1);
}
} }
} }

View File

@ -22,7 +22,6 @@
import java.io.IOException; import java.io.IOException;
import java.util.concurrent.BlockingQueue; import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue; import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
@ -45,8 +44,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher; import org.apache.hadoop.yarn.server.resourcemanager.amlauncher.ApplicationMasterLauncher;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory; import org.apache.hadoop.yarn.server.resourcemanager.recovery.StoreFactory;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.Store.RMState;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType;
@ -101,7 +100,6 @@ public class ResourceManager extends CompositeService implements Recoverable {
private SchedulerEventDispatcher schedulerDispatcher; private SchedulerEventDispatcher schedulerDispatcher;
protected RMAppManager rmAppManager; protected RMAppManager rmAppManager;
private final AtomicBoolean shutdown = new AtomicBoolean(false);
private WebApp webApp; private WebApp webApp;
private RMContext rmContext; private RMContext rmContext;
private final Store store; private final Store store;
@ -490,20 +488,19 @@ public void recover(RMState state) throws Exception {
public static void main(String argv[]) { public static void main(String argv[]) {
StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG); StringUtils.startupShutdownMessage(ResourceManager.class, argv, LOG);
ResourceManager resourceManager = null;
try { try {
Configuration conf = new YarnConfiguration(); Configuration conf = new YarnConfiguration();
Store store = StoreFactory.getStore(conf); Store store = StoreFactory.getStore(conf);
resourceManager = new ResourceManager(store); ResourceManager resourceManager = new ResourceManager(store);
Runtime.getRuntime().addShutdownHook(
new CompositeServiceShutdownHook(resourceManager));
resourceManager.init(conf); resourceManager.init(conf);
//resourceManager.recover(store.restore()); //resourceManager.recover(store.restore());
//store.doneWithRecovery(); //store.doneWithRecovery();
resourceManager.start(); resourceManager.start();
} catch (Throwable e) { } catch (Throwable t) {
LOG.error("Error starting RM", e); LOG.fatal("Error starting ResourceManager", t);
if (resourceManager != null) { System.exit(-1);
resourceManager.stop();
}
} }
} }
} }