MAPREDUCE-4205. retrofit all JVM shutdown hooks to use ShutdownHookManager (tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1333748 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Alejandro Abdelnur 2012-05-04 03:20:16 +00:00
parent a7993ef5e8
commit 0fca4fbaf3
8 changed files with 63 additions and 21 deletions

View File

@ -163,6 +163,9 @@ Release 2.0.0 - UNRELEASED
MAPREDUCE-4219. make default container-executor.conf.dir be a path MAPREDUCE-4219. make default container-executor.conf.dir be a path
relative to the container-executor binary. (rvs via tucu) relative to the container-executor binary. (rvs via tucu)
MAPREDUCE-4205. retrofit all JVM shutdown hooks to use ShutdownHookManager
(tucu)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -90,6 +90,7 @@
import org.apache.hadoop.security.token.Token; import org.apache.hadoop.security.token.Token;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.yarn.Clock; import org.apache.hadoop.yarn.Clock;
import org.apache.hadoop.yarn.ClusterInfo; import org.apache.hadoop.yarn.ClusterInfo;
import org.apache.hadoop.yarn.SystemClock; import org.apache.hadoop.yarn.SystemClock;
@ -130,6 +131,11 @@ public class MRAppMaster extends CompositeService {
private static final Log LOG = LogFactory.getLog(MRAppMaster.class); private static final Log LOG = LogFactory.getLog(MRAppMaster.class);
/**
* Priority of the MRAppMaster shutdown hook.
*/
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
private Clock clock; private Clock clock;
private final long startTime; private final long startTime;
private final long appSubmitTime; private final long appSubmitTime;
@ -990,8 +996,8 @@ public static void main(String[] args) {
new MRAppMaster(applicationAttemptId, containerId, nodeHostString, new MRAppMaster(applicationAttemptId, containerId, nodeHostString,
Integer.parseInt(nodePortString), Integer.parseInt(nodePortString),
Integer.parseInt(nodeHttpPortString), appSubmitTime); Integer.parseInt(nodeHttpPortString), appSubmitTime);
Runtime.getRuntime().addShutdownHook( ShutdownHookManager.get().addShutdownHook(
new MRAppMasterShutdownHook(appMaster)); new MRAppMasterShutdownHook(appMaster), SHUTDOWN_HOOK_PRIORITY);
YarnConfiguration conf = new YarnConfiguration(new JobConf()); YarnConfiguration conf = new YarnConfiguration(new JobConf());
conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE)); conf.addResource(new Path(MRJobConfig.JOB_CONF_FILE));
String jobUserName = System String jobUserName = System
@ -1010,7 +1016,7 @@ public static void main(String[] args) {
// The shutdown hook that runs when a signal is received AND during normal // The shutdown hook that runs when a signal is received AND during normal
// close of the JVM. // close of the JVM.
static class MRAppMasterShutdownHook extends Thread { static class MRAppMasterShutdownHook implements Runnable {
MRAppMaster appMaster; MRAppMaster appMaster;
MRAppMasterShutdownHook(MRAppMaster appMaster) { MRAppMasterShutdownHook(MRAppMaster appMaster) {
this.appMaster = appMaster; this.appMaster = appMaster;
@ -1028,12 +1034,6 @@ public void run() {
appMaster.jobHistoryEventHandler.setSignalled(true); appMaster.jobHistoryEventHandler.setSignalled(true);
} }
appMaster.stop(); appMaster.stop();
try {
//Close all the FileSystem objects
FileSystem.closeAll();
} catch (IOException ioe) {
LOG.warn("Failed to close all FileSystem objects", ioe);
}
} }
} }

View File

@ -28,6 +28,7 @@
import org.apache.hadoop.mapreduce.MRConfig; import org.apache.hadoop.mapreduce.MRConfig;
import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig; import org.apache.hadoop.mapreduce.v2.jobhistory.JHAdminConfig;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -40,6 +41,12 @@
* *
*****************************************************************/ *****************************************************************/
public class JobHistoryServer extends CompositeService { public class JobHistoryServer extends CompositeService {
/**
* Priority of the JobHistoryServer shutdown hook.
*/
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
private static final Log LOG = LogFactory.getLog(JobHistoryServer.class); private static final Log LOG = LogFactory.getLog(JobHistoryServer.class);
private HistoryContext historyContext; private HistoryContext historyContext;
private HistoryClientService clientService; private HistoryClientService clientService;
@ -118,8 +125,9 @@ public static void main(String[] args) {
StringUtils.startupShutdownMessage(JobHistoryServer.class, args, LOG); StringUtils.startupShutdownMessage(JobHistoryServer.class, args, LOG);
try { try {
JobHistoryServer jobHistoryServer = new JobHistoryServer(); JobHistoryServer jobHistoryServer = new JobHistoryServer();
Runtime.getRuntime().addShutdownHook( ShutdownHookManager.get().addShutdownHook(
new CompositeServiceShutdownHook(jobHistoryServer)); new CompositeServiceShutdownHook(jobHistoryServer),
SHUTDOWN_HOOK_PRIORITY);
YarnConfiguration conf = new YarnConfiguration(new JobConf()); YarnConfiguration conf = new YarnConfiguration(new JobConf());
jobHistoryServer.init(conf); jobHistoryServer.init(conf);
jobHistoryServer.start(); jobHistoryServer.start();

View File

@ -107,12 +107,11 @@ private synchronized void stop(int numOfServicesStarted) {
* JVM Shutdown hook for CompositeService which will stop the give * JVM Shutdown hook for CompositeService which will stop the give
* CompositeService gracefully in case of JVM shutdown. * CompositeService gracefully in case of JVM shutdown.
*/ */
public static class CompositeServiceShutdownHook extends Thread { public static class CompositeServiceShutdownHook implements Runnable {
private CompositeService compositeService; private CompositeService compositeService;
public CompositeServiceShutdownHook(CompositeService compositeService) { public CompositeServiceShutdownHook(CompositeService compositeService) {
super("CompositeServiceShutdownHook for " + compositeService.getName());
this.compositeService = compositeService; this.compositeService = compositeService;
} }

View File

@ -30,6 +30,7 @@
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -54,6 +55,12 @@
public class NodeManager extends CompositeService implements public class NodeManager extends CompositeService implements
ServiceStateChangeListener { ServiceStateChangeListener {
/**
* Priority of the NodeManager shutdown hook.
*/
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
private static final Log LOG = LogFactory.getLog(NodeManager.class); private static final Log LOG = LogFactory.getLog(NodeManager.class);
protected final NodeManagerMetrics metrics = NodeManagerMetrics.create(); protected final NodeManagerMetrics metrics = NodeManagerMetrics.create();
protected ContainerTokenSecretManager containerTokenSecretManager; protected ContainerTokenSecretManager containerTokenSecretManager;
@ -250,11 +257,12 @@ private void initAndStartNodeManager(boolean hasToReboot) {
// Remove the old hook if we are rebooting. // Remove the old hook if we are rebooting.
if (hasToReboot && null != nodeManagerShutdownHook) { if (hasToReboot && null != nodeManagerShutdownHook) {
Runtime.getRuntime().removeShutdownHook(nodeManagerShutdownHook); ShutdownHookManager.get().removeShutdownHook(nodeManagerShutdownHook);
} }
nodeManagerShutdownHook = new CompositeServiceShutdownHook(this); nodeManagerShutdownHook = new CompositeServiceShutdownHook(this);
Runtime.getRuntime().addShutdownHook(nodeManagerShutdownHook); ShutdownHookManager.get().addShutdownHook(nodeManagerShutdownHook,
SHUTDOWN_HOOK_PRIORITY);
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = new YarnConfiguration();
this.init(conf); this.init(conf);

View File

@ -32,6 +32,7 @@
import org.apache.hadoop.metrics2.source.JvmMetrics; import org.apache.hadoop.metrics2.source.JvmMetrics;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.util.ReflectionUtils; import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
@ -87,6 +88,12 @@
*/ */
@SuppressWarnings("unchecked") @SuppressWarnings("unchecked")
public class ResourceManager extends CompositeService implements Recoverable { public class ResourceManager extends CompositeService implements Recoverable {
/**
* Priority of the ResourceManager shutdown hook.
*/
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
private static final Log LOG = LogFactory.getLog(ResourceManager.class); private static final Log LOG = LogFactory.getLog(ResourceManager.class);
public static final long clusterTimeStamp = System.currentTimeMillis(); public static final long clusterTimeStamp = System.currentTimeMillis();
@ -613,8 +620,9 @@ public static void main(String argv[]) {
Configuration conf = new YarnConfiguration(); Configuration conf = new YarnConfiguration();
Store store = StoreFactory.getStore(conf); Store store = StoreFactory.getStore(conf);
ResourceManager resourceManager = new ResourceManager(store); ResourceManager resourceManager = new ResourceManager(store);
Runtime.getRuntime().addShutdownHook( ShutdownHookManager.get().addShutdownHook(
new CompositeServiceShutdownHook(resourceManager)); new CompositeServiceShutdownHook(resourceManager),
SHUTDOWN_HOOK_PRIORITY);
resourceManager.init(conf); resourceManager.init(conf);
//resourceManager.recover(store.restore()); //resourceManager.recover(store.restore());
//store.doneWithRecovery(); //store.doneWithRecovery();

View File

@ -24,6 +24,7 @@
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.YarnException; import org.apache.hadoop.yarn.YarnException;
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
@ -34,6 +35,12 @@
* web interfaces. * web interfaces.
*/ */
public class WebAppProxyServer extends CompositeService { public class WebAppProxyServer extends CompositeService {
/**
* Priority of the ResourceManager shutdown hook.
*/
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
private static final Log LOG = LogFactory.getLog(WebAppProxyServer.class); private static final Log LOG = LogFactory.getLog(WebAppProxyServer.class);
private WebAppProxy proxy = null; private WebAppProxy proxy = null;
@ -69,8 +76,9 @@ public static void main(String[] args) {
StringUtils.startupShutdownMessage(WebAppProxyServer.class, args, LOG); StringUtils.startupShutdownMessage(WebAppProxyServer.class, args, LOG);
try { try {
WebAppProxyServer proxy = new WebAppProxyServer(); WebAppProxyServer proxy = new WebAppProxyServer();
Runtime.getRuntime().addShutdownHook( ShutdownHookManager.get().addShutdownHook(
new CompositeServiceShutdownHook(proxy)); new CompositeServiceShutdownHook(proxy),
SHUTDOWN_HOOK_PRIORITY);
YarnConfiguration conf = new YarnConfiguration(); YarnConfiguration conf = new YarnConfiguration();
proxy.init(conf); proxy.init(conf);
proxy.start(); proxy.start();

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.tools.mapred.CopyMapper; import org.apache.hadoop.tools.mapred.CopyMapper;
import org.apache.hadoop.tools.mapred.CopyOutputFormat; import org.apache.hadoop.tools.mapred.CopyOutputFormat;
import org.apache.hadoop.tools.util.DistCpUtils; import org.apache.hadoop.tools.util.DistCpUtils;
import org.apache.hadoop.util.ShutdownHookManager;
import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner; import org.apache.hadoop.util.ToolRunner;
@ -49,6 +50,12 @@
* behaviour. * behaviour.
*/ */
public class DistCp extends Configured implements Tool { public class DistCp extends Configured implements Tool {
/**
* Priority of the ResourceManager shutdown hook.
*/
public static final int SHUTDOWN_HOOK_PRIORITY = 30;
private static final Log LOG = LogFactory.getLog(DistCp.class); private static final Log LOG = LogFactory.getLog(DistCp.class);
private DistCpOptions inputOptions; private DistCpOptions inputOptions;
@ -353,7 +360,8 @@ public static void main(String argv[]) {
DistCp distCp = new DistCp(); DistCp distCp = new DistCp();
Cleanup CLEANUP = new Cleanup(distCp); Cleanup CLEANUP = new Cleanup(distCp);
Runtime.getRuntime().addShutdownHook(CLEANUP); ShutdownHookManager.get().addShutdownHook(CLEANUP,
SHUTDOWN_HOOK_PRIORITY);
System.exit(ToolRunner.run(getDefaultConf(), distCp, argv)); System.exit(ToolRunner.run(getDefaultConf(), distCp, argv));
} }
catch (Exception e) { catch (Exception e) {
@ -388,7 +396,7 @@ private boolean isSubmitted() {
return submitted; return submitted;
} }
private static class Cleanup extends Thread { private static class Cleanup implements Runnable {
private final DistCp distCp; private final DistCp distCp;
public Cleanup(DistCp distCp) { public Cleanup(DistCp distCp) {