HADOOP-7298. Add test utility for writing multi-threaded tests. Contributed by Todd Lipcon and Harsh J Chouraria.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1150910 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
85461fb0fa
commit
672064500c
@ -280,6 +280,9 @@ Trunk (unreleased changes)
|
||||
HADOOP-7463. Adding a configuration parameter to SecurityInfo interface.
|
||||
(mahadev)
|
||||
|
||||
HADOOP-7298. Add test utility for writing multi-threaded tests. (todd and
|
||||
Harsh J Chouraria via todd)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
HADOOP-7333. Performance improvement in PureJavaCrc32. (Eric Caspole
|
||||
|
@ -0,0 +1,225 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.test;
|
||||
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
|
||||
/**
|
||||
* A utility to easily test threaded/synchronized code.
|
||||
* Utility works by letting you add threads that do some work to a
|
||||
* test context object, and then lets you kick them all off to stress test
|
||||
* your parallel code.
|
||||
*
|
||||
* Also propagates thread exceptions back to the runner, to let you verify.
|
||||
*
|
||||
* An example:
|
||||
*
|
||||
* <code>
|
||||
* final AtomicInteger threadsRun = new AtomicInteger();
|
||||
*
|
||||
* TestContext ctx = new TestContext();
|
||||
* // Add 3 threads to test.
|
||||
* for (int i = 0; i < 3; i++) {
|
||||
* ctx.addThread(new TestingThread(ctx) {
|
||||
* @Override
|
||||
* public void doWork() throws Exception {
|
||||
* threadsRun.incrementAndGet();
|
||||
* }
|
||||
* });
|
||||
* }
|
||||
* ctx.startThreads();
|
||||
* // Set a timeout period for threads to complete.
|
||||
* ctx.waitFor(30000);
|
||||
* assertEquals(3, threadsRun.get());
|
||||
* </code>
|
||||
*
|
||||
* For repetitive actions, use the {@link MultithreadedTestUtil.RepeatingThread}
|
||||
* instead.
|
||||
*
|
||||
* (More examples can be found in {@link TestMultithreadedTestUtil})
|
||||
*/
|
||||
public abstract class MultithreadedTestUtil {
|
||||
|
||||
public static final Log LOG =
|
||||
LogFactory.getLog(MultithreadedTestUtil.class);
|
||||
|
||||
/**
|
||||
* TestContext is used to setup the multithreaded test runner.
|
||||
* It lets you add threads, run them, wait upon or stop them.
|
||||
*/
|
||||
public static class TestContext {
|
||||
private Throwable err = null;
|
||||
private boolean stopped = false;
|
||||
private Set<TestingThread> testThreads = new HashSet<TestingThread>();
|
||||
private Set<TestingThread> finishedThreads = new HashSet<TestingThread>();
|
||||
|
||||
/**
|
||||
* Check if the context can run threads.
|
||||
* Can't if its been stopped and contains an error.
|
||||
* @return true if it can run, false if it can't.
|
||||
*/
|
||||
public synchronized boolean shouldRun() {
|
||||
return !stopped && err == null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Add a thread to the context for running.
|
||||
* Threads can be of type {@link MultithreadedTestUtil.TestingThread}
|
||||
* or {@link MultithreadedTestUtil.RepeatingTestThread}
|
||||
* or other custom derivatives of the former.
|
||||
* @param t the thread to add for running.
|
||||
*/
|
||||
public void addThread(TestingThread t) {
|
||||
testThreads.add(t);
|
||||
}
|
||||
|
||||
/**
|
||||
* Starts all test threads that have been added so far.
|
||||
*/
|
||||
public void startThreads() {
|
||||
for (TestingThread t : testThreads) {
|
||||
t.start();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Waits for threads to finish or error out.
|
||||
* @param millis the number of milliseconds to wait
|
||||
* for threads to complete.
|
||||
* @throws Exception if one or more of the threads
|
||||
* have thrown up an error.
|
||||
*/
|
||||
public synchronized void waitFor(long millis) throws Exception {
|
||||
long endTime = System.currentTimeMillis() + millis;
|
||||
while (shouldRun() &&
|
||||
finishedThreads.size() < testThreads.size()) {
|
||||
long left = endTime - System.currentTimeMillis();
|
||||
if (left <= 0) break;
|
||||
checkException();
|
||||
wait(left);
|
||||
}
|
||||
checkException();
|
||||
}
|
||||
|
||||
/**
|
||||
* Checks for thread exceptions, and if they've occurred
|
||||
* throws them as RuntimeExceptions in a deferred manner.
|
||||
*/
|
||||
private synchronized void checkException() throws Exception {
|
||||
if (err != null) {
|
||||
throw new RuntimeException("Deferred", err);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by {@link MultithreadedTestUtil.TestingThread}s to signal
|
||||
* a failed thread.
|
||||
* @param t the thread that failed.
|
||||
*/
|
||||
public synchronized void threadFailed(Throwable t) {
|
||||
if (err == null) err = t;
|
||||
LOG.error("Failed!", err);
|
||||
notify();
|
||||
}
|
||||
|
||||
/**
|
||||
* Called by {@link MultithreadedTestUtil.TestingThread}s to signal
|
||||
* a successful completion.
|
||||
* @param t the thread that finished.
|
||||
*/
|
||||
public synchronized void threadDone(TestingThread t) {
|
||||
finishedThreads.add(t);
|
||||
notify();
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns after stopping all threads by joining them back.
|
||||
* @throws Exception in case a thread terminated with a failure.
|
||||
*/
|
||||
public void stop() throws Exception {
|
||||
synchronized (this) {
|
||||
stopped = true;
|
||||
}
|
||||
for (TestingThread t : testThreads) {
|
||||
t.join();
|
||||
}
|
||||
checkException();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A thread that can be added to a test context, and properly
|
||||
* passes exceptions through.
|
||||
*/
|
||||
public static abstract class TestingThread extends Thread {
|
||||
protected final TestContext ctx;
|
||||
protected boolean stopped;
|
||||
|
||||
public TestingThread(TestContext ctx) {
|
||||
this.ctx = ctx;
|
||||
}
|
||||
|
||||
public void run() {
|
||||
try {
|
||||
doWork();
|
||||
} catch (Throwable t) {
|
||||
ctx.threadFailed(t);
|
||||
}
|
||||
ctx.threadDone(this);
|
||||
}
|
||||
|
||||
/**
|
||||
* User method to add any code to test thread behavior of.
|
||||
* @throws Exception throw an exception if a failure has occurred.
|
||||
*/
|
||||
public abstract void doWork() throws Exception;
|
||||
|
||||
protected void stopTestThread() {
|
||||
this.stopped = true;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A test thread that performs a repeating operation.
|
||||
*/
|
||||
public static abstract class RepeatingTestThread extends TestingThread {
|
||||
public RepeatingTestThread(TestContext ctx) {
|
||||
super(ctx);
|
||||
}
|
||||
|
||||
/**
|
||||
* Repeats a given user action until the context is asked to stop
|
||||
* or meets an error.
|
||||
*/
|
||||
public final void doWork() throws Exception {
|
||||
while (ctx.shouldRun() && !stopped) {
|
||||
doAnAction();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* User method for any code to test repeating behavior of (as threads).
|
||||
* @throws Exception throw an exception if a failure has occured.
|
||||
*/
|
||||
public abstract void doAnAction() throws Exception;
|
||||
}
|
||||
}
|
@ -0,0 +1,138 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.test;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.concurrent.atomic.AtomicInteger;
|
||||
|
||||
import org.junit.Test;
|
||||
import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
|
||||
import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread;
|
||||
import org.apache.hadoop.test.MultithreadedTestUtil.RepeatingTestThread;
|
||||
|
||||
public class TestMultithreadedTestUtil {
|
||||
|
||||
private static final String FAIL_MSG =
|
||||
"Inner thread fails an assert";
|
||||
|
||||
@Test
|
||||
public void testNoErrors() throws Exception {
|
||||
final AtomicInteger threadsRun = new AtomicInteger();
|
||||
|
||||
TestContext ctx = new TestContext();
|
||||
for (int i = 0; i < 3; i++) {
|
||||
ctx.addThread(new TestingThread(ctx) {
|
||||
@Override
|
||||
public void doWork() throws Exception {
|
||||
threadsRun.incrementAndGet();
|
||||
}
|
||||
});
|
||||
}
|
||||
assertEquals(0, threadsRun.get());
|
||||
ctx.startThreads();
|
||||
long st = System.currentTimeMillis();
|
||||
ctx.waitFor(30000);
|
||||
long et = System.currentTimeMillis();
|
||||
|
||||
// All threads should have run
|
||||
assertEquals(3, threadsRun.get());
|
||||
// Test shouldn't have waited the full 30 seconds, since
|
||||
// the threads exited faster than that.
|
||||
assertTrue("Test took " + (et - st) + "ms",
|
||||
et - st < 5000);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testThreadFails() throws Exception {
|
||||
TestContext ctx = new TestContext();
|
||||
ctx.addThread(new TestingThread(ctx) {
|
||||
@Override
|
||||
public void doWork() throws Exception {
|
||||
fail(FAIL_MSG);
|
||||
}
|
||||
});
|
||||
ctx.startThreads();
|
||||
long st = System.currentTimeMillis();
|
||||
try {
|
||||
ctx.waitFor(30000);
|
||||
fail("waitFor did not throw");
|
||||
} catch (RuntimeException rte) {
|
||||
// expected
|
||||
assertEquals(FAIL_MSG, rte.getCause().getMessage());
|
||||
}
|
||||
long et = System.currentTimeMillis();
|
||||
// Test shouldn't have waited the full 30 seconds, since
|
||||
// the thread throws faster than that
|
||||
assertTrue("Test took " + (et - st) + "ms",
|
||||
et - st < 5000);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testThreadThrowsCheckedException() throws Exception {
|
||||
TestContext ctx = new TestContext();
|
||||
ctx.addThread(new TestingThread(ctx) {
|
||||
@Override
|
||||
public void doWork() throws Exception {
|
||||
throw new IOException("my ioe");
|
||||
}
|
||||
});
|
||||
ctx.startThreads();
|
||||
long st = System.currentTimeMillis();
|
||||
try {
|
||||
ctx.waitFor(30000);
|
||||
fail("waitFor did not throw");
|
||||
} catch (RuntimeException rte) {
|
||||
// expected
|
||||
assertEquals("my ioe", rte.getCause().getMessage());
|
||||
}
|
||||
long et = System.currentTimeMillis();
|
||||
// Test shouldn't have waited the full 30 seconds, since
|
||||
// the thread throws faster than that
|
||||
assertTrue("Test took " + (et - st) + "ms",
|
||||
et - st < 5000);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRepeatingThread() throws Exception {
|
||||
final AtomicInteger counter = new AtomicInteger();
|
||||
|
||||
TestContext ctx = new TestContext();
|
||||
ctx.addThread(new RepeatingTestThread(ctx) {
|
||||
@Override
|
||||
public void doAnAction() throws Exception {
|
||||
counter.incrementAndGet();
|
||||
}
|
||||
});
|
||||
ctx.startThreads();
|
||||
long st = System.currentTimeMillis();
|
||||
ctx.waitFor(3000);
|
||||
ctx.stop();
|
||||
long et = System.currentTimeMillis();
|
||||
long elapsed = et - st;
|
||||
|
||||
// Test should have waited just about 3 seconds
|
||||
assertTrue("Test took " + (et - st) + "ms",
|
||||
Math.abs(elapsed - 3000) < 500);
|
||||
// Counter should have been incremented lots of times in 3 full seconds
|
||||
assertTrue("Counter value = " + counter.get(),
|
||||
counter.get() > 1000);
|
||||
}
|
||||
|
||||
}
|
Loading…
x
Reference in New Issue
Block a user