HADOOP-17040. Fix intermittent failure of ITestBlockingThreadPoolExecutorService. (#2020)
(cherry picked from commit 9685314633
)
This commit is contained in:
parent
6b54f259e7
commit
4d30c395f7
@ -31,11 +31,11 @@
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.util.concurrent.Callable;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertFalse;
|
||||
|
||||
/**
|
||||
* Basic test for S3A's blocking executor service.
|
||||
@ -92,11 +92,12 @@ public void testSubmitRunnable() throws Exception {
|
||||
*/
|
||||
protected void verifyQueueSize(ExecutorService executorService,
|
||||
int expectedQueueSize) {
|
||||
StopWatch stopWatch = new StopWatch().start();
|
||||
CountDownLatch latch = new CountDownLatch(1);
|
||||
for (int i = 0; i < expectedQueueSize; i++) {
|
||||
executorService.submit(sleeper);
|
||||
assertDidntBlock(stopWatch);
|
||||
executorService.submit(new LatchedSleeper(latch));
|
||||
}
|
||||
StopWatch stopWatch = new StopWatch().start();
|
||||
latch.countDown();
|
||||
executorService.submit(sleeper);
|
||||
assertDidBlock(stopWatch);
|
||||
}
|
||||
@ -124,15 +125,6 @@ public void testChainedQueue() throws Throwable {
|
||||
|
||||
// Helper functions, etc.
|
||||
|
||||
private void assertDidntBlock(StopWatch sw) {
|
||||
try {
|
||||
assertFalse("Non-blocking call took too long.",
|
||||
sw.now(TimeUnit.MILLISECONDS) > BLOCKING_THRESHOLD_MSEC);
|
||||
} finally {
|
||||
sw.reset().start();
|
||||
}
|
||||
}
|
||||
|
||||
private void assertDidBlock(StopWatch sw) {
|
||||
try {
|
||||
if (sw.now(TimeUnit.MILLISECONDS) < BLOCKING_THRESHOLD_MSEC) {
|
||||
@ -164,6 +156,25 @@ public Integer call() throws Exception {
|
||||
}
|
||||
};
|
||||
|
||||
private class LatchedSleeper implements Runnable {
|
||||
private final CountDownLatch latch;
|
||||
|
||||
LatchedSleeper(CountDownLatch latch) {
|
||||
this.latch = latch;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
latch.await();
|
||||
Thread.sleep(TASK_SLEEP_MSEC);
|
||||
} catch (InterruptedException e) {
|
||||
LOG.info("Thread {} interrupted.", Thread.currentThread().getName());
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Helper function to create thread pool under test.
|
||||
*/
|
||||
|
Loading…
Reference in New Issue
Block a user