HDFS-11511. Support Timeout when checking single disk. Contributed by Hanisha Koneru.

This commit is contained in:
Hanisha Koneru 2017-03-15 18:01:45 -07:00 committed by Arpit Agarwal
parent 615ac09499
commit d69a82c89c
10 changed files with 1905 additions and 10 deletions

View File

@ -2659,4 +2659,24 @@ available under the Creative Commons By Attribution 3.0 License.
available upon request from time to time. For the avoidance of doubt,
this trademark restriction does not form part of this License.
Creative Commons may be contacted at https://creativecommons.org/.
Creative Commons may be contacted at https://creativecommons.org/.
--------------------------------------------------------------------------------
For: hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs
/server/datanode/checker/AbstractFuture.java and
hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs
/server/datanode/checker/TimeoutFuture.java
Copyright (C) 2007 The Guava Authors
Licensed under the Apache License, Version 2.0 (the "License"); you may not
use this file except in compliance with the License. You may obtain a copy of
the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
License for the specific language governing permissions and limitations under
the License.

View File

@ -240,4 +240,16 @@
<Method name="getBlockLayoutRedundancy" />
<Bug pattern="BIT_IOR_OF_SIGNED_BYTE" />
</Match>
<Match>
<Class name="org.apache.hadoop.hdfs.server.datanode.checker.AbstractFuture" />
<Bug pattern="DLS_DEAD_STORE_OF_CLASS_LITERAL" />
</Match>
<Match>
<Class name="org.apache.hadoop.hdfs.server.datanode.checker.AbstractFuture" />
<Bug pattern="DLS_DEAD_LOCAL_STORE" />
</Match>
<Match>
<Class name="org.apache.hadoop.hdfs.server.datanode.checker.AbstractFuture" />
<Bug pattern="NS_DANGEROUS_NON_SHORT_CIRCUIT" />
</Match>
</FindBugsFilter>

View File

@ -91,6 +91,7 @@ public class DatasetVolumeChecker {
* Minimum time between two successive disk checks of a volume.
*/
private final long minDiskCheckGapMs;
private final long diskCheckTimeout;
/**
* Timestamp of the last check of all volumes.
@ -136,6 +137,17 @@ public DatasetVolumeChecker(Configuration conf, Timer timer)
+ minDiskCheckGapMs + " (should be >= 0)");
}
diskCheckTimeout = conf.getTimeDuration(
DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_DEFAULT,
TimeUnit.MILLISECONDS);
if (diskCheckTimeout < 0) {
throw new DiskErrorException("Invalid value configured for "
+ DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY + " - "
+ diskCheckTimeout + " (should be >= 0)");
}
lastAllVolumesCheck = timer.monotonicNow() - minDiskCheckGapMs;
if (maxVolumeFailuresTolerated < 0) {
@ -145,7 +157,8 @@ public DatasetVolumeChecker(Configuration conf, Timer timer)
}
delegateChecker = new ThrottledAsyncChecker<>(
timer, minDiskCheckGapMs, Executors.newCachedThreadPool(
timer, minDiskCheckGapMs, diskCheckTimeout,
Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("DataNode DiskChecker thread %d")
.setDaemon(true)

View File

@ -119,6 +119,7 @@ public StorageLocationChecker(Configuration conf, Timer timer)
DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_KEY,
DFSConfigKeys.DFS_DATANODE_DISK_CHECK_MIN_GAP_DEFAULT,
TimeUnit.MILLISECONDS),
0,
Executors.newCachedThreadPool(
new ThreadFactoryBuilder()
.setNameFormat("StorageLocationChecker thread %d")

View File

@ -38,6 +38,8 @@
import java.util.WeakHashMap;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
/**
@ -64,12 +66,14 @@ public class ThrottledAsyncChecker<K, V> implements AsyncChecker<K, V> {
* The ExecutorService used to schedule asynchronous checks.
*/
private final ListeningExecutorService executorService;
private final ScheduledExecutorService scheduledExecutorService;
/**
* The minimum gap in milliseconds between two successive checks
* of the same object. This is the throttle.
*/
private final long minMsBetweenChecks;
private final long diskCheckTimeout;
/**
* Map of checks that are currently in progress. Protected by the object
@ -86,12 +90,23 @@ public class ThrottledAsyncChecker<K, V> implements AsyncChecker<K, V> {
ThrottledAsyncChecker(final Timer timer,
final long minMsBetweenChecks,
final long diskCheckTimeout,
final ExecutorService executorService) {
this.timer = timer;
this.minMsBetweenChecks = minMsBetweenChecks;
this.diskCheckTimeout = diskCheckTimeout;
this.executorService = MoreExecutors.listeningDecorator(executorService);
this.checksInProgress = new HashMap<>();
this.completedChecks = new WeakHashMap<>();
if (this.diskCheckTimeout > 0) {
ScheduledThreadPoolExecutor scheduledThreadPoolExecutor = new
ScheduledThreadPoolExecutor(1);
this.scheduledExecutorService = MoreExecutors
.getExitingScheduledExecutorService(scheduledThreadPoolExecutor);
} else {
this.scheduledExecutorService = null;
}
}
/**
@ -120,13 +135,23 @@ public Optional<ListenableFuture<V>> schedule(Checkable<K, V> target,
}
}
final ListenableFuture<V> lf = executorService.submit(
final ListenableFuture<V> lfWithoutTimeout = executorService.submit(
new Callable<V>() {
@Override
public V call() throws Exception {
return target.check(context);
}
});
final ListenableFuture<V> lf;
if (diskCheckTimeout > 0) {
lf = TimeoutFuture
.create(lfWithoutTimeout, diskCheckTimeout, TimeUnit.MILLISECONDS,
scheduledExecutorService);
} else {
lf = lfWithoutTimeout;
}
checksInProgress.put(target, lf);
addResultCachingCallback(target, lf);
return Optional.of(lf);
@ -174,6 +199,16 @@ public void shutdownAndWait(long timeout, TimeUnit timeUnit)
executorService.shutdownNow();
executorService.awaitTermination(timeout, timeUnit);
}
if (scheduledExecutorService != null) {
// Try orderly shutdown
scheduledExecutorService.shutdown();
if (!scheduledExecutorService.awaitTermination(timeout, timeUnit)) {
// Interrupt executing tasks and wait again.
scheduledExecutorService.shutdownNow();
scheduledExecutorService.awaitTermination(timeout, timeUnit);
}
}
}
/**

View File

@ -0,0 +1,162 @@
/*
* Copyright (C) 2007 The Guava Authors
*
* Licensed under the Apache License, Version 2.0 (the "License"); you may
* not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
* WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
* License for the specific language governing permissions and limitations under
* the License.
*/
/**
* Some portions of this class have been modified to make it functional in this
* package.
*/
package org.apache.hadoop.hdfs.server.datanode.checker;
import com.google.common.base.Preconditions;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.hdfs.server.datanode.checker.AbstractFuture;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import javax.annotation.Nullable;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
/**
* Implementation of {@code Futures#withTimeout}.
* <p>
* <p>Future that delegates to another but will finish early (via a
* {@link TimeoutException} wrapped in an {@link ExecutionException}) if the
* specified duration expires. The delegate future is interrupted and
* cancelled if it times out.
*/
final class TimeoutFuture<V> extends AbstractFuture.TrustedFuture<V> {
public static final Logger LOG = LoggerFactory.getLogger(
TimeoutFuture.class);
static <V> ListenableFuture<V> create(
ListenableFuture<V> delegate,
long time,
TimeUnit unit,
ScheduledExecutorService scheduledExecutor) {
TimeoutFuture<V> result = new TimeoutFuture<V>(delegate);
TimeoutFuture.Fire<V> fire = new TimeoutFuture.Fire<V>(result);
result.timer = scheduledExecutor.schedule(fire, time, unit);
delegate.addListener(fire, directExecutor());
return result;
}
/*
* Memory visibility of these fields. There are two cases to consider.
*
* 1. visibility of the writes to these fields to Fire.run:
*
* The initial write to delegateRef is made definitely visible via the
* semantics of addListener/SES.schedule. The later racy write in cancel()
* is not guaranteed to be observed, however that is fine since the
* correctness is based on the atomic state in our base class. The initial
* write to timer is never definitely visible to Fire.run since it is
* assigned after SES.schedule is called. Therefore Fire.run has to check
* for null. However, it should be visible if Fire.run is called by
* delegate.addListener since addListener is called after the assignment
* to timer, and importantly this is the main situation in which we need to
* be able to see the write.
*
* 2. visibility of the writes to an afterDone() call triggered by cancel():
*
* Since these fields are non-final that means that TimeoutFuture is not
* being 'safely published', thus a motivated caller may be able to expose
* the reference to another thread that would then call cancel() and be
* unable to cancel the delegate. There are a number of ways to solve this,
* none of which are very pretty, and it is currently believed to be a
* purely theoretical problem (since the other actions should supply
* sufficient write-barriers).
*/
@Nullable private ListenableFuture<V> delegateRef;
@Nullable private Future<?> timer;
private TimeoutFuture(ListenableFuture<V> delegate) {
this.delegateRef = Preconditions.checkNotNull(delegate);
}
/**
* A runnable that is called when the delegate or the timer completes.
*/
private static final class Fire<V> implements Runnable {
@Nullable
TimeoutFuture<V> timeoutFutureRef;
Fire(
TimeoutFuture<V> timeoutFuture) {
this.timeoutFutureRef = timeoutFuture;
}
@Override
public void run() {
// If either of these reads return null then we must be after a
// successful cancel or another call to this method.
TimeoutFuture<V> timeoutFuture = timeoutFutureRef;
if (timeoutFuture == null) {
return;
}
ListenableFuture<V> delegate = timeoutFuture.delegateRef;
if (delegate == null) {
return;
}
/*
* If we're about to complete the TimeoutFuture, we want to release our
* reference to it. Otherwise, we'll pin it (and its result) in memory
* until the timeout task is GCed. (The need to clear our reference to
* the TimeoutFuture is the reason we use a *static* nested class with
* a manual reference back to the "containing" class.)
*
* This has the nice-ish side effect of limiting reentrancy: run() calls
* timeoutFuture.setException() calls run(). That reentrancy would
* already be harmless, since timeoutFuture can be set (and delegate
* cancelled) only once. (And "set only once" is important for other
* reasons: run() can still be invoked concurrently in different threads,
* even with the above null checks.)
*/
timeoutFutureRef = null;
if (delegate.isDone()) {
timeoutFuture.setFuture(delegate);
} else {
try {
timeoutFuture.setException(
new TimeoutException("Future timed out: " + delegate));
} finally {
delegate.cancel(true);
}
}
}
}
@Override
protected void afterDone() {
maybePropagateCancellation(delegateRef);
Future<?> localTimer = timer;
// Try to cancel the timer as an optimization.
// timer may be null if this call to run was by the timer task since there
// is no happens-before edge between the assignment to timer and an
// execution of the timer task.
if (localTimer != null) {
localTimer.cancel(false);
}
delegateRef = null;
timer = null;
}
}

View File

@ -0,0 +1,134 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.checker;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.LogVerificationAppender;
import org.apache.hadoop.hdfs.server.datanode.StorageLocation;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeReference;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.util.FakeTimer;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;
import static org.junit.Assert.assertTrue;
import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Mockito.*;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
import org.slf4j.LoggerFactory;
import java.util.List;
import java.util.Set;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
/**
* Test that timeout is triggered during Disk Volume Checker.
*/
public class TestDatasetVolumeCheckerTimeout {
public static final org.slf4j.Logger LOG =
LoggerFactory.getLogger(TestDatasetVolumeCheckerTimeout.class);
@Rule
public TestName testName = new TestName();
static Configuration conf;
private static final long DISK_CHECK_TIMEOUT = 10;
private static final long DISK_CHECK_TIME = 100;
static ReentrantLock lock = new ReentrantLock();
static {
conf = new HdfsConfiguration();
conf.setTimeDuration(
DFSConfigKeys.DFS_DATANODE_DISK_CHECK_TIMEOUT_KEY,
DISK_CHECK_TIMEOUT, TimeUnit.MILLISECONDS);
}
static FsVolumeSpi makeSlowVolume() throws Exception {
final FsVolumeSpi volume = mock(FsVolumeSpi.class);
final FsVolumeReference reference = mock(FsVolumeReference.class);
final StorageLocation location = mock(StorageLocation.class);
when(reference.getVolume()).thenReturn(volume);
when(volume.obtainReference()).thenReturn(reference);
when(volume.getStorageLocation()).thenReturn(location);
when(volume.check(anyObject())).thenAnswer(new Answer<VolumeCheckResult>() {
@Override
public VolumeCheckResult answer(
InvocationOnMock invocationOnMock) throws Throwable {
// Wait for the disk check to timeout and then release lock.
lock.lock();
lock.unlock();
return VolumeCheckResult.HEALTHY;
}
});
return volume;
}
@Test (timeout = 1000)
public void testDiskCheckTimeout() throws Exception {
LOG.info("Executing {}", testName.getMethodName());
final FsVolumeSpi volume = makeSlowVolume();
final DatasetVolumeChecker checker =
new DatasetVolumeChecker(conf, new FakeTimer());
final AtomicLong numCallbackInvocations = new AtomicLong(0);
lock.lock();
/**
* Request a check and ensure it triggered {@link FsVolumeSpi#check}.
*/
boolean result =
checker.checkVolume(volume, new DatasetVolumeChecker.Callback() {
@Override
public void call(Set<FsVolumeSpi> healthyVolumes,
Set<FsVolumeSpi> failedVolumes) {
numCallbackInvocations.incrementAndGet();
// Assert that the disk check registers a failed volume due to
// timeout
assertThat(healthyVolumes.size(), is(0));
assertThat(failedVolumes.size(), is(1));
}
});
// Wait for the callback
Thread.sleep(DISK_CHECK_TIME);
// Release lock
lock.unlock();
// Ensure that the check was invoked only once.
verify(volume, times(1)).check(anyObject());
assertThat(numCallbackInvocations.get(), is(1L));
}
}

View File

@ -59,7 +59,7 @@ public void testScheduler() throws Exception {
final NoOpCheckable target2 = new NoOpCheckable();
final FakeTimer timer = new FakeTimer();
ThrottledAsyncChecker<Boolean, Boolean> checker =
new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, 0,
getExecutorService());
// check target1 and ensure we get back the expected result.
@ -99,8 +99,8 @@ public void testCancellation() throws Exception {
final FakeTimer timer = new FakeTimer();
final LatchedCallback callback = new LatchedCallback(target);
ThrottledAsyncChecker<Boolean, Boolean> checker =
new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
getExecutorService());
new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, 0,
getExecutorService());
Optional<ListenableFuture<Boolean>> olf =
checker.schedule(target, true);
@ -124,8 +124,8 @@ public void testConcurrentChecks() throws Exception {
LatchedCheckable target = new LatchedCheckable();
final FakeTimer timer = new FakeTimer();
ThrottledAsyncChecker<Boolean, Boolean> checker =
new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
getExecutorService());
new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, 0,
getExecutorService());
final Optional<ListenableFuture<Boolean>> olf1 =
checker.schedule(target, true);
@ -167,7 +167,7 @@ public void testContextIsPassed() throws Exception {
final NoOpCheckable target1 = new NoOpCheckable();
final FakeTimer timer = new FakeTimer();
ThrottledAsyncChecker<Boolean, Boolean> checker =
new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, 0,
getExecutorService());
assertTrue(checker.schedule(target1, true).isPresent());
@ -203,7 +203,7 @@ public void testExceptionCaching() throws Exception {
final ThrowingCheckable target1 = new ThrowingCheckable();
final FakeTimer timer = new FakeTimer();
ThrottledAsyncChecker<Boolean, Boolean> checker =
new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP,
new ThrottledAsyncChecker<>(timer, MIN_ERROR_CHECK_GAP, 0,
getExecutorService());
assertTrue(checker.schedule(target1, true).isPresent());

View File

@ -0,0 +1,223 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hdfs.server.datanode.checker;
import com.google.common.base.Optional;
import com.google.common.util.concurrent.FutureCallback;
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hdfs.DFSConfigKeys;
import org.apache.hadoop.hdfs.HdfsConfiguration;
import org.apache.hadoop.hdfs.LogVerificationAppender;
import org.apache.hadoop.hdfs.server.datanode.fsdataset.FsVolumeSpi;
import org.apache.hadoop.util.FakeTimer;
import org.apache.log4j.Logger;
import org.apache.log4j.spi.LoggingEvent;
import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.*;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TestName;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.anyObject;
import static org.mockito.Matchers.anySet;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import org.slf4j.LoggerFactory;
import java.util.Set;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.locks.ReentrantLock;
public class TestThrottledAsyncCheckerTimeout {
public static final org.slf4j.Logger LOG =
LoggerFactory.getLogger(TestThrottledAsyncCheckerTimeout.class);
@Rule
public TestName testName = new TestName();
Configuration conf;
private static final long DISK_CHECK_TIMEOUT = 10;
private static final long DISK_CHECK_TIME = 100;
private ReentrantLock lock;
private ExecutorService getExecutorService() {
return new ScheduledThreadPoolExecutor(1);
}
@Before
public void initializeLock() {
lock = new ReentrantLock();
}
@Test (timeout = 1000)
public void testDiskCheckTimeout() throws Exception {
LOG.info("Executing {}", testName.getMethodName());
final DummyCheckable target = new DummyCheckable();
final FakeTimer timer = new FakeTimer();
ThrottledAsyncChecker<Boolean, Boolean> checker =
new ThrottledAsyncChecker<>(timer, 0, DISK_CHECK_TIMEOUT,
getExecutorService());
// Acquire lock to halt checker. Release after timeout occurs.
lock.lock();
final Optional<ListenableFuture<Boolean>> olf = checker
.schedule(target, true);
final AtomicLong numCallbackInvocationsSuccess = new AtomicLong(0);
final AtomicLong numCallbackInvocationsFailure = new AtomicLong(0);
AtomicBoolean callbackResult = new AtomicBoolean(false);
final Throwable[] throwable = new Throwable[1];
assertTrue(olf.isPresent());
Futures.addCallback(olf.get(), new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
numCallbackInvocationsSuccess.incrementAndGet();
callbackResult.set(true);
}
@Override
public void onFailure(Throwable t) {
throwable[0] = t;
numCallbackInvocationsFailure.incrementAndGet();
callbackResult.set(true);
}
});
while (!callbackResult.get()) {
// Wait for the callback
Thread.sleep(DISK_CHECK_TIMEOUT);
}
lock.unlock();
assertThat(numCallbackInvocationsFailure.get(), is(1L));
assertThat(numCallbackInvocationsSuccess.get(), is(0L));
assertTrue(throwable[0] instanceof TimeoutException);
}
@Test (timeout = 2000)
public void testDiskCheckTimeoutInvokesOneCallbackOnly() throws Exception {
LOG.info("Executing {}", testName.getMethodName());
final DummyCheckable target = new DummyCheckable();
final FakeTimer timer = new FakeTimer();
ThrottledAsyncChecker<Boolean, Boolean> checker =
new ThrottledAsyncChecker<>(timer, 0, DISK_CHECK_TIMEOUT,
getExecutorService());
FutureCallback<Boolean> futureCallback = mock(FutureCallback.class);
// Acquire lock to halt disk checker. Release after timeout occurs.
lock.lock();
final Optional<ListenableFuture<Boolean>> olf1 = checker
.schedule(target, true);
assertTrue(olf1.isPresent());
Futures.addCallback(olf1.get(), futureCallback);
// Wait for the callback
Thread.sleep(DISK_CHECK_TIMEOUT);
// Verify that timeout results in only 1 onFailure call and 0 onSuccess
// calls.
verify(futureCallback, times(1)).onFailure(any());
verify(futureCallback, times(0)).onSuccess(any());
// Release lock so that target can acquire it.
lock.unlock();
final Optional<ListenableFuture<Boolean>> olf2 = checker
.schedule(target, true);
assertTrue(olf2.isPresent());
Futures.addCallback(olf2.get(), futureCallback);
// Wait for the callback
Thread.sleep(DISK_CHECK_TIME);
// Verify that normal check (dummy) results in only 1 onSuccess call.
// Number of times onFailure is invoked should remain the same - 1.
verify(futureCallback, times(1)).onFailure(any());
verify(futureCallback, times(1)).onSuccess(any());
}
@Test (timeout = 1000)
public void testTimeoutExceptionIsNotThrownForGoodDisk() throws Exception {
LOG.info("Executing {}", testName.getMethodName());
final DummyCheckable target = new DummyCheckable();
final FakeTimer timer = new FakeTimer();
ThrottledAsyncChecker<Boolean, Boolean> checker =
new ThrottledAsyncChecker<>(timer, 0, DISK_CHECK_TIMEOUT,
getExecutorService());
final Optional<ListenableFuture<Boolean>> olf = checker
.schedule(target, true);
AtomicBoolean callbackResult = new AtomicBoolean(false);
final Throwable[] throwable = new Throwable[1];
assertTrue(olf.isPresent());
Futures.addCallback(olf.get(), new FutureCallback<Boolean>() {
@Override
public void onSuccess(Boolean result) {
callbackResult.set(true);
}
@Override
public void onFailure(Throwable t) {
throwable[0] = t;
callbackResult.set(true);
}
});
while (!callbackResult.get()) {
// Wait for the callback
Thread.sleep(DISK_CHECK_TIMEOUT);
}
assertTrue(throwable[0] == null);
}
/**
* A dummy Checkable that just returns true after acquiring lock.
*/
protected class DummyCheckable implements Checkable<Boolean,Boolean> {
@Override
public Boolean check(Boolean context) throws Exception {
// Wait to acquire lock
lock.lock();
lock.unlock();
return true;
}
}
}