HADOOP-12950. ShutdownHookManager should have a timeout for each of the Registered shutdown hook. Contributed by Xiaoyu Yao.

This commit is contained in:
Xiaoyu Yao 2016-03-31 15:20:09 -07:00
parent 19639785f5
commit aac4d65bf9
2 changed files with 150 additions and 23 deletions

View File

@ -17,8 +17,10 @@
*/
package org.apache.hadoop.util;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.concurrent.HadoopExecutors;
import java.util.ArrayList;
import java.util.Collections;
@ -26,6 +28,10 @@
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
/**
@ -42,7 +48,12 @@ public class ShutdownHookManager {
private static final ShutdownHookManager MGR = new ShutdownHookManager();
private static final Log LOG = LogFactory.getLog(ShutdownHookManager.class);
private static final long TIMEOUT_DEFAULT = 10;
private static final TimeUnit TIME_UNIT_DEFAULT = TimeUnit.SECONDS;
private static final ExecutorService EXECUTOR =
HadoopExecutors.newSingleThreadExecutor(new ThreadFactoryBuilder()
.setDaemon(true).build());
static {
try {
Runtime.getRuntime().addShutdownHook(
@ -50,14 +61,33 @@ public class ShutdownHookManager {
@Override
public void run() {
MGR.shutdownInProgress.set(true);
for (Runnable hook: MGR.getShutdownHooksInOrder()) {
for (HookEntry entry: MGR.getShutdownHooksInOrder()) {
Future<?> future = EXECUTOR.submit(entry.getHook());
try {
hook.run();
future.get(entry.getTimeout(), entry.getTimeUnit());
} catch (TimeoutException ex) {
future.cancel(true);
LOG.warn("ShutdownHook '" + entry.getHook().getClass().
getSimpleName() + "' timeout, " + ex.toString(), ex);
} catch (Throwable ex) {
LOG.warn("ShutdownHook '" + hook.getClass().getSimpleName() +
"' failed, " + ex.toString(), ex);
LOG.warn("ShutdownHook '" + entry.getHook().getClass().
getSimpleName() + "' failed, " + ex.toString(), ex);
}
}
try {
EXECUTOR.shutdown();
if (!EXECUTOR.awaitTermination(TIMEOUT_DEFAULT,
TIME_UNIT_DEFAULT)) {
LOG.error("ShutdownHookManger shutdown forcefully.");
EXECUTOR.shutdownNow();
}
LOG.info("ShutdownHookManger complete shutdown.");
} catch (InterruptedException ex) {
LOG.error("ShutdownHookManger interrupted while waiting for " +
"termination.", ex);
EXECUTOR.shutdownNow();
Thread.currentThread().interrupt();
}
}
}
);
@ -77,15 +107,24 @@ public static ShutdownHookManager get() {
}
/**
* Private structure to store ShutdownHook and its priority.
* Private structure to store ShutdownHook, its priority and timeout
* settings.
*/
private static class HookEntry {
Runnable hook;
int priority;
static class HookEntry {
private final Runnable hook;
private final int priority;
private final long timeout;
private final TimeUnit unit;
public HookEntry(Runnable hook, int priority) {
HookEntry(Runnable hook, int priority) {
this(hook, priority, TIMEOUT_DEFAULT, TIME_UNIT_DEFAULT);
}
HookEntry(Runnable hook, int priority, long timeout, TimeUnit unit) {
this.hook = hook;
this.priority = priority;
this.timeout = timeout;
this.unit = unit;
}
@Override
@ -104,9 +143,24 @@ public boolean equals(Object obj) {
return eq;
}
Runnable getHook() {
return hook;
}
private Set<HookEntry> hooks =
int getPriority() {
return priority;
}
long getTimeout() {
return timeout;
}
TimeUnit getTimeUnit() {
return unit;
}
}
private final Set<HookEntry> hooks =
Collections.synchronizedSet(new HashSet<HookEntry>());
private AtomicBoolean shutdownInProgress = new AtomicBoolean(false);
@ -121,7 +175,7 @@ private ShutdownHookManager() {
*
* @return the list of shutdownHooks in order of execution.
*/
List<Runnable> getShutdownHooksInOrder() {
List<HookEntry> getShutdownHooksInOrder() {
List<HookEntry> list;
synchronized (MGR.hooks) {
list = new ArrayList<HookEntry>(MGR.hooks);
@ -134,11 +188,7 @@ public int compare(HookEntry o1, HookEntry o2) {
return o2.priority - o1.priority;
}
});
List<Runnable> ordered = new ArrayList<Runnable>();
for (HookEntry entry: list) {
ordered.add(entry.hook);
}
return ordered;
return list;
}
/**
@ -154,11 +204,36 @@ public void addShutdownHook(Runnable shutdownHook, int priority) {
throw new IllegalArgumentException("shutdownHook cannot be NULL");
}
if (shutdownInProgress.get()) {
throw new IllegalStateException("Shutdown in progress, cannot add a shutdownHook");
throw new IllegalStateException("Shutdown in progress, cannot add a " +
"shutdownHook");
}
hooks.add(new HookEntry(shutdownHook, priority));
}
/**
*
* Adds a shutdownHook with a priority and timeout the higher the priority
* the earlier will run. ShutdownHooks with same priority run
* in a non-deterministic order. The shutdown hook will be terminated if it
* has not been finished in the specified period of time.
*
* @param shutdownHook shutdownHook <code>Runnable</code>
* @param priority priority of the shutdownHook
* @param timeout timeout of the shutdownHook
* @param unit unit of the timeout <code>TimeUnit</code>
*/
public void addShutdownHook(Runnable shutdownHook, int priority, long timeout,
TimeUnit unit) {
if (shutdownHook == null) {
throw new IllegalArgumentException("shutdownHook cannot be NULL");
}
if (shutdownInProgress.get()) {
throw new IllegalStateException("Shutdown in progress, cannot add a " +
"shutdownHook");
}
hooks.add(new HookEntry(shutdownHook, priority, timeout, unit));
}
/**
* Removes a shutdownHook.
*
@ -168,7 +243,8 @@ public void addShutdownHook(Runnable shutdownHook, int priority) {
*/
public boolean removeShutdownHook(Runnable shutdownHook) {
if (shutdownInProgress.get()) {
throw new IllegalStateException("Shutdown in progress, cannot remove a shutdownHook");
throw new IllegalStateException("Shutdown in progress, cannot remove a " +
"shutdownHook");
}
return hooks.remove(new HookEntry(shutdownHook, 0));
}

View File

@ -17,10 +17,19 @@
*/
package org.apache.hadoop.util;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.slf4j.LoggerFactory;
import org.junit.Assert;
import org.junit.Test;
import org.slf4j.Logger;
import java.util.concurrent.TimeUnit;
import static java.lang.Thread.sleep;
public class TestShutdownHookManager {
static final Logger LOG =
LoggerFactory.getLogger(TestShutdownHookManager.class.getName());
@Test
public void shutdownHookManager() {
@ -30,18 +39,48 @@ public void shutdownHookManager() {
Runnable hook1 = new Runnable() {
@Override
public void run() {
LOG.info("Shutdown hook1 complete.");
}
};
Runnable hook2 = new Runnable() {
@Override
public void run() {
LOG.info("Shutdown hook2 complete.");
}
};
Runnable hook3 = new Runnable() {
@Override
public void run() {
try {
sleep(3000);
LOG.info("Shutdown hook3 complete.");
} catch (InterruptedException ex) {
LOG.info("Shutdown hook3 interrupted exception:",
ExceptionUtils.getStackTrace(ex));
Assert.fail("Hook 3 should not timeout.");
}
}
};
Runnable hook4 = new Runnable() {
@Override
public void run() {
try {
sleep(3500);
LOG.info("Shutdown hook4 complete.");
Assert.fail("Hook 4 should timeout");
} catch (InterruptedException ex) {
LOG.info("Shutdown hook4 interrupted exception:",
ExceptionUtils.getStackTrace(ex));
}
}
};
mgr.addShutdownHook(hook1, 0);
Assert.assertTrue(mgr.hasShutdownHook(hook1));
Assert.assertEquals(1, mgr.getShutdownHooksInOrder().size());
Assert.assertEquals(hook1, mgr.getShutdownHooksInOrder().get(0));
Assert.assertEquals(hook1, mgr.getShutdownHooksInOrder().get(0).getHook());
mgr.removeShutdownHook(hook1);
Assert.assertFalse(mgr.hasShutdownHook(hook1));
@ -55,8 +94,20 @@ public void run() {
Assert.assertTrue(mgr.hasShutdownHook(hook1));
Assert.assertTrue(mgr.hasShutdownHook(hook2));
Assert.assertEquals(2, mgr.getShutdownHooksInOrder().size());
Assert.assertEquals(hook2, mgr.getShutdownHooksInOrder().get(0));
Assert.assertEquals(hook1, mgr.getShutdownHooksInOrder().get(1));
Assert.assertEquals(hook2, mgr.getShutdownHooksInOrder().get(0).getHook());
Assert.assertEquals(hook1, mgr.getShutdownHooksInOrder().get(1).getHook());
// Test hook finish without timeout
mgr.addShutdownHook(hook3, 2, 4, TimeUnit.SECONDS);
Assert.assertTrue(mgr.hasShutdownHook(hook3));
Assert.assertEquals(hook3, mgr.getShutdownHooksInOrder().get(0).getHook());
Assert.assertEquals(4, mgr.getShutdownHooksInOrder().get(0).getTimeout());
// Test hook finish with timeout
mgr.addShutdownHook(hook4, 3, 2, TimeUnit.SECONDS);
Assert.assertTrue(mgr.hasShutdownHook(hook4));
Assert.assertEquals(hook4, mgr.getShutdownHooksInOrder().get(0).getHook());
Assert.assertEquals(2, mgr.getShutdownHooksInOrder().get(0).getTimeout());
LOG.info("Shutdown starts here");
}
}