HDFS-11131. TestThrottledAsyncChecker#testCancellation is flaky.

This commit is contained in:
Arpit Agarwal 2017-04-05 16:01:54 -07:00
parent 32bb36b750
commit 8c57aeb5b4
2 changed files with 43 additions and 100 deletions

View File

@ -187,28 +187,21 @@ public void onFailure(@Nonnull Throwable t) {
/**
* {@inheritDoc}.
*
* The results of in-progress checks are not useful during shutdown,
* so we optimize for faster shutdown by interrupt all actively
* executing checks.
*/
@Override
public void shutdownAndWait(long timeout, TimeUnit timeUnit)
throws InterruptedException {
// Try orderly shutdown.
executorService.shutdown();
if (!executorService.awaitTermination(timeout, timeUnit)) {
// Interrupt executing tasks and wait again.
executorService.shutdownNow();
executorService.awaitTermination(timeout, timeUnit);
}
if (scheduledExecutorService != null) {
// Try orderly shutdown
scheduledExecutorService.shutdown();
if (!scheduledExecutorService.awaitTermination(timeout, timeUnit)) {
// Interrupt executing tasks and wait again.
scheduledExecutorService.shutdownNow();
scheduledExecutorService.awaitTermination(timeout, timeUnit);
}
scheduledExecutorService.shutdownNow();
scheduledExecutorService.awaitTermination(timeout, timeUnit);
}
executorService.shutdownNow();
executorService.awaitTermination(timeout, timeUnit);
}
/**

View File

@ -20,8 +20,6 @@
import com.google.common.base.Optional;
import com.google.common.base.Supplier;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.util.FakeTimer;
@ -29,12 +27,9 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nonnull;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
import static org.junit.Assert.assertFalse;
@ -93,35 +88,9 @@ public void testScheduler() throws Exception {
waitTestCheckableCheckCount(target2, 2L);
}
@Test (timeout=60000)
public void testCancellation() throws Exception {
LatchedCheckable target = new LatchedCheckable();
final FakeTimer timer = new FakeTimer();
final LatchedCallback callback = new LatchedCallback(target);
ThrottledAsyncChecker<Boolean, Boolean> checker =
new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, 0,
getExecutorService());
Optional<ListenableFuture<Boolean>> olf =
checker.schedule(target, true);
if (olf.isPresent()) {
Futures.addCallback(olf.get(), callback);
}
// Request immediate cancellation.
checker.shutdownAndWait(0, TimeUnit.MILLISECONDS);
try {
assertFalse(olf.get().get());
fail("Failed to get expected InterruptedException");
} catch (ExecutionException ee) {
assertTrue(ee.getCause() instanceof InterruptedException);
}
callback.failureLatch.await();
}
@Test (timeout=60000)
public void testConcurrentChecks() throws Exception {
LatchedCheckable target = new LatchedCheckable();
final StalledCheckable target = new StalledCheckable();
final FakeTimer timer = new FakeTimer();
ThrottledAsyncChecker<Boolean, Boolean> checker =
new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, 0,
@ -136,25 +105,6 @@ public void testConcurrentChecks() throws Exception {
// for the first caller.
assertTrue(olf1.isPresent());
assertFalse(olf2.isPresent());
// Unblock the latch and wait for it to finish execution.
target.latch.countDown();
olf1.get().get();
GenericTestUtils.waitFor(new Supplier<Boolean>() {
@Override
public Boolean get() {
// We should get an absent Optional.
// This can take a short while until the internal callback in
// ThrottledAsyncChecker is scheduled for execution.
// Also this should not trigger a new check operation as the timer
// was not advanced. If it does trigger a new check then the test
// will fail with a timeout.
final Optional<ListenableFuture<Boolean>> olf3 =
checker.schedule(target, true);
return !olf3.isPresent();
}
}, 100, 10000);
}
/**
@ -191,6 +141,32 @@ public Boolean get() {
}
}, 100, 10000);
}
/**
* Ensure that an exception thrown by the check routine is
* propagated.
*
* @throws Exception
*/
@Test(timeout=60000)
public void testExceptionIsPropagated() throws Exception {
final ThrowingCheckable target = new ThrowingCheckable();
final FakeTimer timer = new FakeTimer();
ThrottledAsyncChecker<Boolean, Boolean> checker =
new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, 0,
getExecutorService());
final Optional<ListenableFuture<Boolean>> olf =
checker.schedule(target, true);
assertTrue(olf.isPresent());
try {
olf.get().get();
fail("Failed to get expected ExecutionException");
} catch(ExecutionException ee) {
assertTrue(ee.getCause() instanceof DummyException);
}
}
/**
* Ensure that the exception from a failed check is cached
* and returned without re-running the check when the minimum
@ -245,6 +221,9 @@ public Boolean check(Boolean context) {
}
}
/**
* A Checkable that throws an exception when checked.
*/
private static class ThrowingCheckable
extends TestCheckableBase {
@Override
@ -258,43 +237,14 @@ private static class DummyException extends Exception {
}
/**
* A checkable that hangs until signaled.
* A checkable that hangs forever when checked.
*/
private static class LatchedCheckable
private static class StalledCheckable
implements Checkable<Boolean, Boolean> {
private final CountDownLatch latch = new CountDownLatch(1);
@Override
public Boolean check(Boolean ignored) throws InterruptedException {
LOG.info("LatchedCheckable {} waiting.", this);
latch.await();
return true; // Unreachable.
}
}
/**
* A {@link FutureCallback} that counts its invocations.
*/
private static final class LatchedCallback
implements FutureCallback<Boolean> {
private final CountDownLatch successLatch = new CountDownLatch(1);
private final CountDownLatch failureLatch = new CountDownLatch(1);
private final Checkable target;
private LatchedCallback(Checkable target) {
this.target = target;
}
@Override
public void onSuccess(@Nonnull Boolean result) {
LOG.info("onSuccess callback invoked for {}", target);
successLatch.countDown();
}
@Override
public void onFailure(@Nonnull Throwable t) {
LOG.info("onFailure callback invoked for {} with exception", target, t);
failureLatch.countDown();
Thread.sleep(Long.MAX_VALUE);
return false; // Unreachable.
}
}
}