HADOOP-15679. ShutdownHookManager shutdown time needs to be configurable & extended.
Contributed by Steve Loughran.
This commit is contained in:
parent
e0f6ffdbad
commit
34577d2c21
@ -905,5 +905,14 @@ public class CommonConfigurationKeysPublic {
|
|||||||
|
|
||||||
public static final String HADOOP_TAGS_SYSTEM = "hadoop.tags.system";
|
public static final String HADOOP_TAGS_SYSTEM = "hadoop.tags.system";
|
||||||
public static final String HADOOP_TAGS_CUSTOM = "hadoop.tags.custom";
|
public static final String HADOOP_TAGS_CUSTOM = "hadoop.tags.custom";
|
||||||
|
|
||||||
|
/** Configuration option for the shutdown hook manager shutdown time:
|
||||||
|
* {@value}. */
|
||||||
|
public static final String SERVICE_SHUTDOWN_TIMEOUT =
|
||||||
|
"hadoop.service.shutdown.timeout";
|
||||||
|
|
||||||
|
/** Default shutdown hook timeout: {@value} seconds. */
|
||||||
|
public static final long SERVICE_SHUTDOWN_TIMEOUT_DEFAULT = 30;
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -17,11 +17,17 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.util;
|
package org.apache.hadoop.util;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||||
import org.apache.hadoop.util.concurrent.HadoopExecutors;
|
import org.apache.hadoop.util.concurrent.HadoopExecutors;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
@ -34,6 +40,9 @@
|
|||||||
import java.util.concurrent.TimeUnit;
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.atomic.AtomicBoolean;
|
import java.util.concurrent.atomic.AtomicBoolean;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.SERVICE_SHUTDOWN_TIMEOUT;
|
||||||
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.SERVICE_SHUTDOWN_TIMEOUT_DEFAULT;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* The <code>ShutdownHookManager</code> enables running shutdownHook
|
* The <code>ShutdownHookManager</code> enables running shutdownHook
|
||||||
* in a deterministic order, higher priority first.
|
* in a deterministic order, higher priority first.
|
||||||
@ -42,53 +51,55 @@
|
|||||||
* This class registers a single JVM shutdownHook and run all the
|
* This class registers a single JVM shutdownHook and run all the
|
||||||
* shutdownHooks registered to it (to this class) in order based on their
|
* shutdownHooks registered to it (to this class) in order based on their
|
||||||
* priority.
|
* priority.
|
||||||
|
*
|
||||||
|
* Unless a hook was registered with a shutdown explicitly set through
|
||||||
|
* {@link #addShutdownHook(Runnable, int, long, TimeUnit)},
|
||||||
|
* the shutdown time allocated to it is set by the configuration option
|
||||||
|
* {@link CommonConfigurationKeysPublic#SERVICE_SHUTDOWN_TIMEOUT} in
|
||||||
|
* {@code core-site.xml}, with a default value of
|
||||||
|
* {@link CommonConfigurationKeysPublic#SERVICE_SHUTDOWN_TIMEOUT_DEFAULT}
|
||||||
|
* seconds.
|
||||||
*/
|
*/
|
||||||
public class ShutdownHookManager {
|
@InterfaceAudience.Public
|
||||||
|
@InterfaceStability.Evolving
|
||||||
|
public final class ShutdownHookManager {
|
||||||
|
|
||||||
private static final ShutdownHookManager MGR = new ShutdownHookManager();
|
private static final ShutdownHookManager MGR = new ShutdownHookManager();
|
||||||
|
|
||||||
private static final Logger LOG =
|
private static final Logger LOG =
|
||||||
LoggerFactory.getLogger(ShutdownHookManager.class);
|
LoggerFactory.getLogger(ShutdownHookManager.class);
|
||||||
private static final long TIMEOUT_DEFAULT = 10;
|
|
||||||
private static final TimeUnit TIME_UNIT_DEFAULT = TimeUnit.SECONDS;
|
/** Minimum shutdown timeout: {@value} second(s). */
|
||||||
|
public static final long TIMEOUT_MINIMUM = 1;
|
||||||
|
|
||||||
|
/** The default time unit used: seconds. */
|
||||||
|
public static final TimeUnit TIME_UNIT_DEFAULT = TimeUnit.SECONDS;
|
||||||
|
|
||||||
private static final ExecutorService EXECUTOR =
|
private static final ExecutorService EXECUTOR =
|
||||||
HadoopExecutors.newSingleThreadExecutor(new ThreadFactoryBuilder()
|
HadoopExecutors.newSingleThreadExecutor(new ThreadFactoryBuilder()
|
||||||
.setDaemon(true).build());
|
.setDaemon(true)
|
||||||
|
.setNameFormat("shutdown-hook-%01d")
|
||||||
|
.build());
|
||||||
|
|
||||||
static {
|
static {
|
||||||
try {
|
try {
|
||||||
Runtime.getRuntime().addShutdownHook(
|
Runtime.getRuntime().addShutdownHook(
|
||||||
new Thread() {
|
new Thread() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
MGR.shutdownInProgress.set(true);
|
if (MGR.shutdownInProgress.getAndSet(true)) {
|
||||||
for (HookEntry entry: MGR.getShutdownHooksInOrder()) {
|
LOG.info("Shutdown process invoked a second time: ignoring");
|
||||||
Future<?> future = EXECUTOR.submit(entry.getHook());
|
return;
|
||||||
try {
|
|
||||||
future.get(entry.getTimeout(), entry.getTimeUnit());
|
|
||||||
} catch (TimeoutException ex) {
|
|
||||||
future.cancel(true);
|
|
||||||
LOG.warn("ShutdownHook '" + entry.getHook().getClass().
|
|
||||||
getSimpleName() + "' timeout, " + ex.toString(), ex);
|
|
||||||
} catch (Throwable ex) {
|
|
||||||
LOG.warn("ShutdownHook '" + entry.getHook().getClass().
|
|
||||||
getSimpleName() + "' failed, " + ex.toString(), ex);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
try {
|
|
||||||
EXECUTOR.shutdown();
|
|
||||||
if (!EXECUTOR.awaitTermination(TIMEOUT_DEFAULT,
|
|
||||||
TIME_UNIT_DEFAULT)) {
|
|
||||||
LOG.error("ShutdownHookManger shutdown forcefully.");
|
|
||||||
EXECUTOR.shutdownNow();
|
|
||||||
}
|
|
||||||
LOG.debug("ShutdownHookManger complete shutdown.");
|
|
||||||
} catch (InterruptedException ex) {
|
|
||||||
LOG.error("ShutdownHookManger interrupted while waiting for " +
|
|
||||||
"termination.", ex);
|
|
||||||
EXECUTOR.shutdownNow();
|
|
||||||
Thread.currentThread().interrupt();
|
|
||||||
}
|
}
|
||||||
|
long started = System.currentTimeMillis();
|
||||||
|
int timeoutCount = executeShutdown();
|
||||||
|
long ended = System.currentTimeMillis();
|
||||||
|
LOG.debug(String.format(
|
||||||
|
"Completed shutdown in %.3f seconds; Timeouts: %d",
|
||||||
|
(ended-started)/1000.0, timeoutCount));
|
||||||
|
// each of the hooks have executed; now shut down the
|
||||||
|
// executor itself.
|
||||||
|
shutdownExecutor(new Configuration());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
);
|
);
|
||||||
@ -98,19 +109,93 @@ public void run() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Execute the shutdown.
|
||||||
|
* This is exposed purely for testing: do not invoke it.
|
||||||
|
* @return the number of shutdown hooks which timed out.
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@VisibleForTesting
|
||||||
|
static int executeShutdown() {
|
||||||
|
int timeouts = 0;
|
||||||
|
for (HookEntry entry: MGR.getShutdownHooksInOrder()) {
|
||||||
|
Future<?> future = EXECUTOR.submit(entry.getHook());
|
||||||
|
try {
|
||||||
|
future.get(entry.getTimeout(), entry.getTimeUnit());
|
||||||
|
} catch (TimeoutException ex) {
|
||||||
|
timeouts++;
|
||||||
|
future.cancel(true);
|
||||||
|
LOG.warn("ShutdownHook '" + entry.getHook().getClass().
|
||||||
|
getSimpleName() + "' timeout, " + ex.toString(), ex);
|
||||||
|
} catch (Throwable ex) {
|
||||||
|
LOG.warn("ShutdownHook '" + entry.getHook().getClass().
|
||||||
|
getSimpleName() + "' failed, " + ex.toString(), ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return timeouts;
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Shutdown the executor thread itself.
|
||||||
|
* @param conf the configuration containing the shutdown timeout setting.
|
||||||
|
*/
|
||||||
|
private static void shutdownExecutor(final Configuration conf) {
|
||||||
|
try {
|
||||||
|
EXECUTOR.shutdown();
|
||||||
|
long shutdownTimeout = getShutdownTimeout(conf);
|
||||||
|
if (!EXECUTOR.awaitTermination(
|
||||||
|
shutdownTimeout,
|
||||||
|
TIME_UNIT_DEFAULT)) {
|
||||||
|
// timeout waiting for the
|
||||||
|
LOG.error("ShutdownHookManger shutdown forcefully after"
|
||||||
|
+ " {} seconds.", shutdownTimeout);
|
||||||
|
EXECUTOR.shutdownNow();
|
||||||
|
}
|
||||||
|
LOG.debug("ShutdownHookManger completed shutdown.");
|
||||||
|
} catch (InterruptedException ex) {
|
||||||
|
// interrupted.
|
||||||
|
LOG.error("ShutdownHookManger interrupted while waiting for " +
|
||||||
|
"termination.", ex);
|
||||||
|
EXECUTOR.shutdownNow();
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Return <code>ShutdownHookManager</code> singleton.
|
* Return <code>ShutdownHookManager</code> singleton.
|
||||||
*
|
*
|
||||||
* @return <code>ShutdownHookManager</code> singleton.
|
* @return <code>ShutdownHookManager</code> singleton.
|
||||||
*/
|
*/
|
||||||
|
@InterfaceAudience.Public
|
||||||
public static ShutdownHookManager get() {
|
public static ShutdownHookManager get() {
|
||||||
return MGR;
|
return MGR;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Get the shutdown timeout in seconds, from the supplied
|
||||||
|
* configuration.
|
||||||
|
* @param conf configuration to use.
|
||||||
|
* @return a timeout, always greater than or equal to {@link #TIMEOUT_MINIMUM}
|
||||||
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@VisibleForTesting
|
||||||
|
static long getShutdownTimeout(Configuration conf) {
|
||||||
|
long duration = conf.getTimeDuration(
|
||||||
|
SERVICE_SHUTDOWN_TIMEOUT,
|
||||||
|
SERVICE_SHUTDOWN_TIMEOUT_DEFAULT,
|
||||||
|
TIME_UNIT_DEFAULT);
|
||||||
|
if (duration < TIMEOUT_MINIMUM) {
|
||||||
|
duration = TIMEOUT_MINIMUM;
|
||||||
|
}
|
||||||
|
return duration;
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Private structure to store ShutdownHook, its priority and timeout
|
* Private structure to store ShutdownHook, its priority and timeout
|
||||||
* settings.
|
* settings.
|
||||||
*/
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@VisibleForTesting
|
||||||
static class HookEntry {
|
static class HookEntry {
|
||||||
private final Runnable hook;
|
private final Runnable hook;
|
||||||
private final int priority;
|
private final int priority;
|
||||||
@ -118,7 +203,9 @@ static class HookEntry {
|
|||||||
private final TimeUnit unit;
|
private final TimeUnit unit;
|
||||||
|
|
||||||
HookEntry(Runnable hook, int priority) {
|
HookEntry(Runnable hook, int priority) {
|
||||||
this(hook, priority, TIMEOUT_DEFAULT, TIME_UNIT_DEFAULT);
|
this(hook, priority,
|
||||||
|
getShutdownTimeout(new Configuration()),
|
||||||
|
TIME_UNIT_DEFAULT);
|
||||||
}
|
}
|
||||||
|
|
||||||
HookEntry(Runnable hook, int priority, long timeout, TimeUnit unit) {
|
HookEntry(Runnable hook, int priority, long timeout, TimeUnit unit) {
|
||||||
@ -176,10 +263,12 @@ private ShutdownHookManager() {
|
|||||||
*
|
*
|
||||||
* @return the list of shutdownHooks in order of execution.
|
* @return the list of shutdownHooks in order of execution.
|
||||||
*/
|
*/
|
||||||
|
@InterfaceAudience.Private
|
||||||
|
@VisibleForTesting
|
||||||
List<HookEntry> getShutdownHooksInOrder() {
|
List<HookEntry> getShutdownHooksInOrder() {
|
||||||
List<HookEntry> list;
|
List<HookEntry> list;
|
||||||
synchronized (MGR.hooks) {
|
synchronized (MGR.hooks) {
|
||||||
list = new ArrayList<HookEntry>(MGR.hooks);
|
list = new ArrayList<>(MGR.hooks);
|
||||||
}
|
}
|
||||||
Collections.sort(list, new Comparator<HookEntry>() {
|
Collections.sort(list, new Comparator<HookEntry>() {
|
||||||
|
|
||||||
@ -200,6 +289,8 @@ public int compare(HookEntry o1, HookEntry o2) {
|
|||||||
* @param shutdownHook shutdownHook <code>Runnable</code>
|
* @param shutdownHook shutdownHook <code>Runnable</code>
|
||||||
* @param priority priority of the shutdownHook.
|
* @param priority priority of the shutdownHook.
|
||||||
*/
|
*/
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
@InterfaceStability.Stable
|
||||||
public void addShutdownHook(Runnable shutdownHook, int priority) {
|
public void addShutdownHook(Runnable shutdownHook, int priority) {
|
||||||
if (shutdownHook == null) {
|
if (shutdownHook == null) {
|
||||||
throw new IllegalArgumentException("shutdownHook cannot be NULL");
|
throw new IllegalArgumentException("shutdownHook cannot be NULL");
|
||||||
@ -223,6 +314,8 @@ public void addShutdownHook(Runnable shutdownHook, int priority) {
|
|||||||
* @param timeout timeout of the shutdownHook
|
* @param timeout timeout of the shutdownHook
|
||||||
* @param unit unit of the timeout <code>TimeUnit</code>
|
* @param unit unit of the timeout <code>TimeUnit</code>
|
||||||
*/
|
*/
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
@InterfaceStability.Stable
|
||||||
public void addShutdownHook(Runnable shutdownHook, int priority, long timeout,
|
public void addShutdownHook(Runnable shutdownHook, int priority, long timeout,
|
||||||
TimeUnit unit) {
|
TimeUnit unit) {
|
||||||
if (shutdownHook == null) {
|
if (shutdownHook == null) {
|
||||||
@ -242,6 +335,8 @@ public void addShutdownHook(Runnable shutdownHook, int priority, long timeout,
|
|||||||
* @return TRUE if the shutdownHook was registered and removed,
|
* @return TRUE if the shutdownHook was registered and removed,
|
||||||
* FALSE otherwise.
|
* FALSE otherwise.
|
||||||
*/
|
*/
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
@InterfaceStability.Stable
|
||||||
public boolean removeShutdownHook(Runnable shutdownHook) {
|
public boolean removeShutdownHook(Runnable shutdownHook) {
|
||||||
if (shutdownInProgress.get()) {
|
if (shutdownInProgress.get()) {
|
||||||
throw new IllegalStateException("Shutdown in progress, cannot remove a " +
|
throw new IllegalStateException("Shutdown in progress, cannot remove a " +
|
||||||
@ -256,6 +351,8 @@ public boolean removeShutdownHook(Runnable shutdownHook) {
|
|||||||
* @param shutdownHook shutdownHook to check if registered.
|
* @param shutdownHook shutdownHook to check if registered.
|
||||||
* @return TRUE/FALSE depending if the shutdownHook is is registered.
|
* @return TRUE/FALSE depending if the shutdownHook is is registered.
|
||||||
*/
|
*/
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
@InterfaceStability.Stable
|
||||||
public boolean hasShutdownHook(Runnable shutdownHook) {
|
public boolean hasShutdownHook(Runnable shutdownHook) {
|
||||||
return hooks.contains(new HookEntry(shutdownHook, 0));
|
return hooks.contains(new HookEntry(shutdownHook, 0));
|
||||||
}
|
}
|
||||||
@ -265,6 +362,8 @@ public boolean hasShutdownHook(Runnable shutdownHook) {
|
|||||||
*
|
*
|
||||||
* @return TRUE if the shutdown is in progress, otherwise FALSE.
|
* @return TRUE if the shutdown is in progress, otherwise FALSE.
|
||||||
*/
|
*/
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
@InterfaceStability.Stable
|
||||||
public boolean isShutdownInProgress() {
|
public boolean isShutdownInProgress() {
|
||||||
return shutdownInProgress.get();
|
return shutdownInProgress.get();
|
||||||
}
|
}
|
||||||
@ -272,7 +371,9 @@ public boolean isShutdownInProgress() {
|
|||||||
/**
|
/**
|
||||||
* clear all registered shutdownHooks.
|
* clear all registered shutdownHooks.
|
||||||
*/
|
*/
|
||||||
|
@InterfaceAudience.Public
|
||||||
|
@InterfaceStability.Stable
|
||||||
public void clearShutdownHooks() {
|
public void clearShutdownHooks() {
|
||||||
hooks.clear();
|
hooks.clear();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -553,6 +553,22 @@
|
|||||||
</description>
|
</description>
|
||||||
</property>
|
</property>
|
||||||
|
|
||||||
|
<property>
|
||||||
|
<name>hadoop.service.shutdown.timeout</name>
|
||||||
|
<value>30s</value>
|
||||||
|
<description>
|
||||||
|
Timeout to wait for each shutdown operation to complete.
|
||||||
|
If a hook takes longer than this time to complete, it will be interrupted,
|
||||||
|
so the service will shutdown. This allows the service shutdown
|
||||||
|
to recover from a blocked operation.
|
||||||
|
Some shutdown hooks may need more time than this, for example when
|
||||||
|
a large amount of data needs to be uploaded to an object store.
|
||||||
|
In this situation: increase the timeout.
|
||||||
|
|
||||||
|
The minimum duration of the timeout is 1 second, "1s".
|
||||||
|
</description>
|
||||||
|
</property>
|
||||||
|
|
||||||
<property>
|
<property>
|
||||||
<name>hadoop.rpc.protection</name>
|
<name>hadoop.rpc.protection</name>
|
||||||
<value>authentication</value>
|
<value>authentication</value>
|
||||||
|
@ -17,97 +17,285 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.util;
|
package org.apache.hadoop.util;
|
||||||
|
|
||||||
import org.apache.commons.lang3.exception.ExceptionUtils;
|
import java.util.List;
|
||||||
import org.slf4j.LoggerFactory;
|
import java.util.concurrent.TimeUnit;
|
||||||
import org.junit.Assert;
|
import java.util.concurrent.atomic.AtomicInteger;
|
||||||
|
|
||||||
|
import org.junit.After;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
import java.util.concurrent.TimeUnit;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
|
||||||
import static java.lang.Thread.sleep;
|
import static java.lang.Thread.sleep;
|
||||||
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.SERVICE_SHUTDOWN_TIMEOUT;
|
||||||
|
import static org.apache.hadoop.fs.CommonConfigurationKeysPublic.SERVICE_SHUTDOWN_TIMEOUT_DEFAULT;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertNotEquals;
|
||||||
|
import static org.junit.Assert.assertNotNull;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
public class TestShutdownHookManager {
|
public class TestShutdownHookManager {
|
||||||
|
|
||||||
static final Logger LOG =
|
static final Logger LOG =
|
||||||
LoggerFactory.getLogger(TestShutdownHookManager.class.getName());
|
LoggerFactory.getLogger(TestShutdownHookManager.class.getName());
|
||||||
|
|
||||||
|
/**
|
||||||
|
* remove all the shutdown hooks so that they never get invoked later
|
||||||
|
* on in this test process.
|
||||||
|
*/
|
||||||
|
@After
|
||||||
|
public void clearShutdownHooks() {
|
||||||
|
ShutdownHookManager.get().clearShutdownHooks();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify hook registration, then execute the hook callback stage
|
||||||
|
* of shutdown to verify invocation, execution order and timeout
|
||||||
|
* processing.
|
||||||
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void shutdownHookManager() {
|
public void shutdownHookManager() {
|
||||||
ShutdownHookManager mgr = ShutdownHookManager.get();
|
ShutdownHookManager mgr = ShutdownHookManager.get();
|
||||||
Assert.assertNotNull(mgr);
|
assertNotNull("No ShutdownHookManager", mgr);
|
||||||
Assert.assertEquals(0, mgr.getShutdownHooksInOrder().size());
|
assertEquals(0, mgr.getShutdownHooksInOrder().size());
|
||||||
Runnable hook1 = new Runnable() {
|
Hook hook1 = new Hook("hook1", 0, false);
|
||||||
@Override
|
Hook hook2 = new Hook("hook2", 0, false);
|
||||||
public void run() {
|
Hook hook3 = new Hook("hook3", 1000, false);
|
||||||
LOG.info("Shutdown hook1 complete.");
|
Hook hook4 = new Hook("hook4", 25000, true);
|
||||||
}
|
Hook hook5 = new Hook("hook5",
|
||||||
};
|
(SERVICE_SHUTDOWN_TIMEOUT_DEFAULT + 1) * 1000, true);
|
||||||
Runnable hook2 = new Runnable() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
LOG.info("Shutdown hook2 complete.");
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
Runnable hook3 = new Runnable() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
sleep(3000);
|
|
||||||
LOG.info("Shutdown hook3 complete.");
|
|
||||||
} catch (InterruptedException ex) {
|
|
||||||
LOG.info("Shutdown hook3 interrupted exception:",
|
|
||||||
ExceptionUtils.getStackTrace(ex));
|
|
||||||
Assert.fail("Hook 3 should not timeout.");
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
Runnable hook4 = new Runnable() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
try {
|
|
||||||
sleep(3500);
|
|
||||||
LOG.info("Shutdown hook4 complete.");
|
|
||||||
Assert.fail("Hook 4 should timeout");
|
|
||||||
} catch (InterruptedException ex) {
|
|
||||||
LOG.info("Shutdown hook4 interrupted exception:",
|
|
||||||
ExceptionUtils.getStackTrace(ex));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
|
|
||||||
mgr.addShutdownHook(hook1, 0);
|
mgr.addShutdownHook(hook1, 0);
|
||||||
Assert.assertTrue(mgr.hasShutdownHook(hook1));
|
assertTrue(mgr.hasShutdownHook(hook1));
|
||||||
Assert.assertEquals(1, mgr.getShutdownHooksInOrder().size());
|
assertEquals(1, mgr.getShutdownHooksInOrder().size());
|
||||||
Assert.assertEquals(hook1, mgr.getShutdownHooksInOrder().get(0).getHook());
|
assertEquals(hook1, mgr.getShutdownHooksInOrder().get(0).getHook());
|
||||||
mgr.removeShutdownHook(hook1);
|
assertTrue(mgr.removeShutdownHook(hook1));
|
||||||
Assert.assertFalse(mgr.hasShutdownHook(hook1));
|
assertFalse(mgr.hasShutdownHook(hook1));
|
||||||
|
assertFalse(mgr.removeShutdownHook(hook1));
|
||||||
|
|
||||||
mgr.addShutdownHook(hook1, 0);
|
mgr.addShutdownHook(hook1, 0);
|
||||||
Assert.assertTrue(mgr.hasShutdownHook(hook1));
|
assertTrue(mgr.hasShutdownHook(hook1));
|
||||||
Assert.assertEquals(1, mgr.getShutdownHooksInOrder().size());
|
assertEquals(1, mgr.getShutdownHooksInOrder().size());
|
||||||
Assert.assertTrue(mgr.hasShutdownHook(hook1));
|
assertEquals(SERVICE_SHUTDOWN_TIMEOUT_DEFAULT,
|
||||||
Assert.assertEquals(1, mgr.getShutdownHooksInOrder().size());
|
mgr.getShutdownHooksInOrder().get(0).getTimeout());
|
||||||
|
|
||||||
mgr.addShutdownHook(hook2, 1);
|
mgr.addShutdownHook(hook2, 1);
|
||||||
Assert.assertTrue(mgr.hasShutdownHook(hook1));
|
assertTrue(mgr.hasShutdownHook(hook1));
|
||||||
Assert.assertTrue(mgr.hasShutdownHook(hook2));
|
assertTrue(mgr.hasShutdownHook(hook2));
|
||||||
Assert.assertEquals(2, mgr.getShutdownHooksInOrder().size());
|
assertEquals(2, mgr.getShutdownHooksInOrder().size());
|
||||||
Assert.assertEquals(hook2, mgr.getShutdownHooksInOrder().get(0).getHook());
|
assertEquals(hook2, mgr.getShutdownHooksInOrder().get(0).getHook());
|
||||||
Assert.assertEquals(hook1, mgr.getShutdownHooksInOrder().get(1).getHook());
|
assertEquals(hook1, mgr.getShutdownHooksInOrder().get(1).getHook());
|
||||||
|
|
||||||
// Test hook finish without timeout
|
// Test hook finish without timeout
|
||||||
mgr.addShutdownHook(hook3, 2, 4, TimeUnit.SECONDS);
|
mgr.addShutdownHook(hook3, 2, 4, TimeUnit.SECONDS);
|
||||||
Assert.assertTrue(mgr.hasShutdownHook(hook3));
|
assertTrue(mgr.hasShutdownHook(hook3));
|
||||||
Assert.assertEquals(hook3, mgr.getShutdownHooksInOrder().get(0).getHook());
|
assertEquals(hook3, mgr.getShutdownHooksInOrder().get(0).getHook());
|
||||||
Assert.assertEquals(4, mgr.getShutdownHooksInOrder().get(0).getTimeout());
|
assertEquals(4, mgr.getShutdownHooksInOrder().get(0).getTimeout());
|
||||||
|
|
||||||
// Test hook finish with timeout
|
// Test hook finish with timeout; highest priority
|
||||||
mgr.addShutdownHook(hook4, 3, 2, TimeUnit.SECONDS);
|
int hook4timeout = 2;
|
||||||
Assert.assertTrue(mgr.hasShutdownHook(hook4));
|
mgr.addShutdownHook(hook4, 3, hook4timeout, TimeUnit.SECONDS);
|
||||||
Assert.assertEquals(hook4, mgr.getShutdownHooksInOrder().get(0).getHook());
|
assertTrue(mgr.hasShutdownHook(hook4));
|
||||||
Assert.assertEquals(2, mgr.getShutdownHooksInOrder().get(0).getTimeout());
|
assertEquals(hook4, mgr.getShutdownHooksInOrder().get(0).getHook());
|
||||||
LOG.info("Shutdown starts here");
|
assertEquals(2, mgr.getShutdownHooksInOrder().get(0).getTimeout());
|
||||||
|
|
||||||
|
// a default timeout hook and verify it gets the default timeout
|
||||||
|
mgr.addShutdownHook(hook5, 5);
|
||||||
|
ShutdownHookManager.HookEntry hookEntry5 = mgr.getShutdownHooksInOrder()
|
||||||
|
.get(0);
|
||||||
|
assertEquals(hook5, hookEntry5.getHook());
|
||||||
|
assertEquals("default timeout not used",
|
||||||
|
ShutdownHookManager.getShutdownTimeout(new Configuration()),
|
||||||
|
hookEntry5.getTimeout());
|
||||||
|
assertEquals("hook priority", 5, hookEntry5.getPriority());
|
||||||
|
// remove this to avoid a longer sleep in the test run
|
||||||
|
assertTrue("failed to remove " + hook5,
|
||||||
|
mgr.removeShutdownHook(hook5));
|
||||||
|
|
||||||
|
|
||||||
|
// now execute the hook shutdown sequence
|
||||||
|
INVOCATION_COUNT.set(0);
|
||||||
|
LOG.info("invoking executeShutdown()");
|
||||||
|
int timeouts = ShutdownHookManager.executeShutdown();
|
||||||
|
LOG.info("Shutdown completed");
|
||||||
|
assertEquals("Number of timed out hooks", 1, timeouts);
|
||||||
|
|
||||||
|
List<ShutdownHookManager.HookEntry> hooks
|
||||||
|
= mgr.getShutdownHooksInOrder();
|
||||||
|
|
||||||
|
// analyze the hooks
|
||||||
|
for (ShutdownHookManager.HookEntry entry : hooks) {
|
||||||
|
Hook hook = (Hook) entry.getHook();
|
||||||
|
assertTrue("Was not invoked " + hook, hook.invoked);
|
||||||
|
// did any hook raise an exception?
|
||||||
|
hook.maybeThrowAssertion();
|
||||||
|
}
|
||||||
|
|
||||||
|
// check the state of some of the invoked hooks
|
||||||
|
// hook4 was invoked first, but it timed out.
|
||||||
|
assertEquals("Expected to be invoked first " + hook4,
|
||||||
|
1, hook4.invokedOrder);
|
||||||
|
assertFalse("Expected to time out " + hook4, hook4.completed);
|
||||||
|
|
||||||
|
|
||||||
|
// hook1 completed, but in order after the others, so its start time
|
||||||
|
// is the longest.
|
||||||
|
assertTrue("Expected to complete " + hook1, hook1.completed);
|
||||||
|
long invocationInterval = hook1.startTime - hook4.startTime;
|
||||||
|
assertTrue("invocation difference too short " + invocationInterval,
|
||||||
|
invocationInterval >= hook4timeout * 1000);
|
||||||
|
assertTrue("sleeping hook4 blocked other threads for " + invocationInterval,
|
||||||
|
invocationInterval < hook4.sleepTime);
|
||||||
|
|
||||||
|
// finally, clear the hooks
|
||||||
|
mgr.clearShutdownHooks();
|
||||||
|
// and verify that the hooks are empty
|
||||||
|
assertFalse(mgr.hasShutdownHook(hook1));
|
||||||
|
assertEquals("shutdown hook list is not empty",
|
||||||
|
0,
|
||||||
|
mgr.getShutdownHooksInOrder().size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testShutdownTimeoutConfiguration() throws Throwable {
|
||||||
|
// set the shutdown timeout and verify it can be read back.
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
long shutdownTimeout = 5;
|
||||||
|
conf.setTimeDuration(SERVICE_SHUTDOWN_TIMEOUT,
|
||||||
|
shutdownTimeout, TimeUnit.SECONDS);
|
||||||
|
assertEquals(SERVICE_SHUTDOWN_TIMEOUT,
|
||||||
|
shutdownTimeout,
|
||||||
|
ShutdownHookManager.getShutdownTimeout(conf));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify that low timeouts simply fall back to
|
||||||
|
* {@link ShutdownHookManager#TIMEOUT_MINIMUM}.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testShutdownTimeoutBadConfiguration() throws Throwable {
|
||||||
|
// set the shutdown timeout and verify it can be read back.
|
||||||
|
Configuration conf = new Configuration();
|
||||||
|
long shutdownTimeout = 50;
|
||||||
|
conf.setTimeDuration(SERVICE_SHUTDOWN_TIMEOUT,
|
||||||
|
shutdownTimeout, TimeUnit.NANOSECONDS);
|
||||||
|
assertEquals(SERVICE_SHUTDOWN_TIMEOUT,
|
||||||
|
ShutdownHookManager.TIMEOUT_MINIMUM,
|
||||||
|
ShutdownHookManager.getShutdownTimeout(conf));
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verifies that a hook cannot be re-registered: an attempt to do so
|
||||||
|
* will simply be ignored.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testDuplicateRegistration() throws Throwable {
|
||||||
|
ShutdownHookManager mgr = ShutdownHookManager.get();
|
||||||
|
Hook hook = new Hook("hook1", 0, false);
|
||||||
|
|
||||||
|
// add the hook
|
||||||
|
mgr.addShutdownHook(hook, 2, 1, TimeUnit.SECONDS);
|
||||||
|
|
||||||
|
// add it at a higher priority. This will be ignored.
|
||||||
|
mgr.addShutdownHook(hook, 5);
|
||||||
|
List<ShutdownHookManager.HookEntry> hookList
|
||||||
|
= mgr.getShutdownHooksInOrder();
|
||||||
|
assertEquals("Hook added twice", 1, hookList.size());
|
||||||
|
ShutdownHookManager.HookEntry entry = hookList.get(0);
|
||||||
|
assertEquals("priority of hook", 2, entry.getPriority());
|
||||||
|
assertEquals("timeout of hook", 1, entry.getTimeout());
|
||||||
|
|
||||||
|
// remove the hook
|
||||||
|
assertTrue("failed to remove hook " + hook, mgr.removeShutdownHook(hook));
|
||||||
|
// which will fail a second time
|
||||||
|
assertFalse("expected hook removal to fail", mgr.removeShutdownHook(hook));
|
||||||
|
|
||||||
|
// now register it
|
||||||
|
mgr.addShutdownHook(hook, 5);
|
||||||
|
hookList = mgr.getShutdownHooksInOrder();
|
||||||
|
entry = hookList.get(0);
|
||||||
|
assertEquals("priority of hook", 5, entry.getPriority());
|
||||||
|
assertNotEquals("timeout of hook", 1, entry.getTimeout());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
private static final AtomicInteger INVOCATION_COUNT = new AtomicInteger();
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Hooks for testing; save state for ease of asserting on
|
||||||
|
* invocation.
|
||||||
|
*/
|
||||||
|
private class Hook implements Runnable {
|
||||||
|
|
||||||
|
private final String name;
|
||||||
|
private final long sleepTime;
|
||||||
|
private final boolean expectFailure;
|
||||||
|
private AssertionError assertion;
|
||||||
|
private boolean invoked;
|
||||||
|
private int invokedOrder;
|
||||||
|
private boolean completed;
|
||||||
|
private boolean interrupted;
|
||||||
|
private long startTime;
|
||||||
|
|
||||||
|
Hook(final String name,
|
||||||
|
final long sleepTime,
|
||||||
|
final boolean expectFailure) {
|
||||||
|
this.name = name;
|
||||||
|
this.sleepTime = sleepTime;
|
||||||
|
this.expectFailure = expectFailure;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void run() {
|
||||||
|
try {
|
||||||
|
invoked = true;
|
||||||
|
invokedOrder = INVOCATION_COUNT.incrementAndGet();
|
||||||
|
startTime = System.currentTimeMillis();
|
||||||
|
LOG.info("Starting shutdown of {} with sleep time of {}",
|
||||||
|
name, sleepTime);
|
||||||
|
if (sleepTime > 0) {
|
||||||
|
sleep(sleepTime);
|
||||||
|
}
|
||||||
|
LOG.info("Completed shutdown of {}", name);
|
||||||
|
completed = true;
|
||||||
|
if (expectFailure) {
|
||||||
|
assertion = new AssertionError("Expected a failure of " + name);
|
||||||
|
}
|
||||||
|
} catch (InterruptedException ex) {
|
||||||
|
LOG.info("Shutdown {} interrupted exception", name, ex);
|
||||||
|
interrupted = true;
|
||||||
|
if (!expectFailure) {
|
||||||
|
assertion = new AssertionError("Timeout of " + name, ex);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
maybeThrowAssertion();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Raise any exception generated during the shutdown process.
|
||||||
|
* @throws AssertionError any assertion from the shutdown.
|
||||||
|
*/
|
||||||
|
void maybeThrowAssertion() throws AssertionError {
|
||||||
|
if (assertion != null) {
|
||||||
|
throw assertion;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public String toString() {
|
||||||
|
final StringBuilder sb = new StringBuilder("Hook{");
|
||||||
|
sb.append("name='").append(name).append('\'');
|
||||||
|
sb.append(", sleepTime=").append(sleepTime);
|
||||||
|
sb.append(", expectFailure=").append(expectFailure);
|
||||||
|
sb.append(", invoked=").append(invoked);
|
||||||
|
sb.append(", invokedOrder=").append(invokedOrder);
|
||||||
|
sb.append(", completed=").append(completed);
|
||||||
|
sb.append(", interrupted=").append(interrupted);
|
||||||
|
sb.append('}');
|
||||||
|
return sb.toString();
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user