diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java index 7584d97b5f..b71c0156f0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/checker/ThrottledAsyncChecker.java @@ -187,28 +187,21 @@ public void onFailure(@Nonnull Throwable t) { /** * {@inheritDoc}. + * + * The results of in-progress checks are not useful during shutdown, + * so we optimize for faster shutdown by interrupt all actively + * executing checks. */ @Override public void shutdownAndWait(long timeout, TimeUnit timeUnit) throws InterruptedException { - // Try orderly shutdown. - executorService.shutdown(); - - if (!executorService.awaitTermination(timeout, timeUnit)) { - // Interrupt executing tasks and wait again. - executorService.shutdownNow(); - executorService.awaitTermination(timeout, timeUnit); - } if (scheduledExecutorService != null) { - // Try orderly shutdown - scheduledExecutorService.shutdown(); - - if (!scheduledExecutorService.awaitTermination(timeout, timeUnit)) { - // Interrupt executing tasks and wait again. - scheduledExecutorService.shutdownNow(); - scheduledExecutorService.awaitTermination(timeout, timeUnit); - } + scheduledExecutorService.shutdownNow(); + scheduledExecutorService.awaitTermination(timeout, timeUnit); } + + executorService.shutdownNow(); + executorService.awaitTermination(timeout, timeUnit); } /** diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java index 00b1af2a11..4ed6371afb 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/checker/TestThrottledAsyncChecker.java @@ -20,8 +20,6 @@ import com.google.common.base.Optional; import com.google.common.base.Supplier; -import com.google.common.util.concurrent.FutureCallback; -import com.google.common.util.concurrent.Futures; import com.google.common.util.concurrent.ListenableFuture; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.FakeTimer; @@ -29,12 +27,9 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import javax.annotation.Nonnull; -import java.util.concurrent.CountDownLatch; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.ScheduledThreadPoolExecutor; -import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicLong; import static org.junit.Assert.assertFalse; @@ -93,35 +88,9 @@ public void testScheduler() throws Exception { waitTestCheckableCheckCount(target2, 2L); } - @Test (timeout=60000) - public void testCancellation() throws Exception { - LatchedCheckable target = new LatchedCheckable(); - final FakeTimer timer = new FakeTimer(); - final LatchedCallback callback = new LatchedCallback(target); - ThrottledAsyncChecker checker = - new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, 0, - getExecutorService()); - - Optional> olf = - checker.schedule(target, true); - if (olf.isPresent()) { - Futures.addCallback(olf.get(), callback); - } - - // Request immediate cancellation. - checker.shutdownAndWait(0, TimeUnit.MILLISECONDS); - try { - assertFalse(olf.get().get()); - fail("Failed to get expected InterruptedException"); - } catch (ExecutionException ee) { - assertTrue(ee.getCause() instanceof InterruptedException); - } - callback.failureLatch.await(); - } - @Test (timeout=60000) public void testConcurrentChecks() throws Exception { - LatchedCheckable target = new LatchedCheckable(); + final StalledCheckable target = new StalledCheckable(); final FakeTimer timer = new FakeTimer(); ThrottledAsyncChecker checker = new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, 0, @@ -136,25 +105,6 @@ public void testConcurrentChecks() throws Exception { // for the first caller. assertTrue(olf1.isPresent()); assertFalse(olf2.isPresent()); - - // Unblock the latch and wait for it to finish execution. - target.latch.countDown(); - olf1.get().get(); - - GenericTestUtils.waitFor(new Supplier() { - @Override - public Boolean get() { - // We should get an absent Optional. - // This can take a short while until the internal callback in - // ThrottledAsyncChecker is scheduled for execution. - // Also this should not trigger a new check operation as the timer - // was not advanced. If it does trigger a new check then the test - // will fail with a timeout. - final Optional> olf3 = - checker.schedule(target, true); - return !olf3.isPresent(); - } - }, 100, 10000); } /** @@ -191,6 +141,32 @@ public Boolean get() { } }, 100, 10000); } + + /** + * Ensure that an exception thrown by the check routine is + * propagated. + * + * @throws Exception + */ + @Test(timeout=60000) + public void testExceptionIsPropagated() throws Exception { + final ThrowingCheckable target = new ThrowingCheckable(); + final FakeTimer timer = new FakeTimer(); + ThrottledAsyncChecker checker = + new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, 0, + getExecutorService()); + + final Optional> olf = + checker.schedule(target, true); + assertTrue(olf.isPresent()); + try { + olf.get().get(); + fail("Failed to get expected ExecutionException"); + } catch(ExecutionException ee) { + assertTrue(ee.getCause() instanceof DummyException); + } + } + /** * Ensure that the exception from a failed check is cached * and returned without re-running the check when the minimum @@ -245,6 +221,9 @@ public Boolean check(Boolean context) { } } + /** + * A Checkable that throws an exception when checked. + */ private static class ThrowingCheckable extends TestCheckableBase { @Override @@ -258,43 +237,14 @@ private static class DummyException extends Exception { } /** - * A checkable that hangs until signaled. + * A checkable that hangs forever when checked. */ - private static class LatchedCheckable + private static class StalledCheckable implements Checkable { - private final CountDownLatch latch = new CountDownLatch(1); - @Override public Boolean check(Boolean ignored) throws InterruptedException { - LOG.info("LatchedCheckable {} waiting.", this); - latch.await(); - return true; // Unreachable. - } - } - - /** - * A {@link FutureCallback} that counts its invocations. - */ - private static final class LatchedCallback - implements FutureCallback { - private final CountDownLatch successLatch = new CountDownLatch(1); - private final CountDownLatch failureLatch = new CountDownLatch(1); - private final Checkable target; - - private LatchedCallback(Checkable target) { - this.target = target; - } - - @Override - public void onSuccess(@Nonnull Boolean result) { - LOG.info("onSuccess callback invoked for {}", target); - successLatch.countDown(); - } - - @Override - public void onFailure(@Nonnull Throwable t) { - LOG.info("onFailure callback invoked for {} with exception", target, t); - failureLatch.countDown(); + Thread.sleep(Long.MAX_VALUE); + return false; // Unreachable. } } }