HADOOP-7140. IPC Reader threads do not stop when server stops. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1070790 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2011-02-15 07:16:15 +00:00
parent 58ed6ab095
commit ae8b2310d8
3 changed files with 69 additions and 8 deletions

View File

@ -489,6 +489,8 @@ Release 0.22.0 - Unreleased
HADOOP-6642. Fix javac, javadoc, findbugs warnings related to security work.
(Chris Douglas, Po Cheung via shv)
HADOOP-7140. IPC Reader threads do not stop when server stops (todd)
Release 0.21.1 - Unreleased
IMPROVEMENTS

View File

@ -299,7 +299,6 @@ private class Listener extends Thread {
private long cleanupInterval = 10000; //the minimum interval between
//two cleanup runs
private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128);
private ExecutorService readPool;
public Listener() throws IOException {
address = new InetSocketAddress(bindAddress, port);
@ -313,12 +312,12 @@ public Listener() throws IOException {
// create a selector;
selector= Selector.open();
readers = new Reader[readThreads];
readPool = Executors.newFixedThreadPool(readThreads);
for (int i = 0; i < readThreads; i++) {
Selector readSelector = Selector.open();
Reader reader = new Reader(readSelector);
Reader reader = new Reader("Socket Reader #" + (i + 1) + " for port " + port,
readSelector);
readers[i] = reader;
readPool.execute(reader);
reader.start();
}
// Register accepts on the server socket with the selector.
@ -327,15 +326,16 @@ public Listener() throws IOException {
this.setDaemon(true);
}
private class Reader implements Runnable {
private class Reader extends Thread {
private volatile boolean adding = false;
private Selector readSelector = null;
Reader(Selector readSelector) {
Reader(String name, Selector readSelector) {
super(name);
this.readSelector = readSelector;
}
public void run() {
LOG.info("Starting SocketReader");
LOG.info("Starting " + getName());
synchronized (this) {
while (running) {
SelectionKey key = null;
@ -389,6 +389,16 @@ public synchronized void finishAdd() {
adding = false;
this.notify();
}
void shutdown() {
assert !running;
readSelector.wakeup();
try {
join();
} catch (InterruptedException ie) {
Thread.currentThread().interrupt();
}
}
}
/** cleanup connections from connectionList. Choose a random range
* to scan and also have a limit on the number of the connections
@ -577,7 +587,9 @@ synchronized void doStop() {
LOG.info(getName() + ":Exception in closing listener socket. " + e);
}
}
readPool.shutdown();
for (Reader r : readers) {
r.shutdown();
}
}
synchronized Selector getSelector() { return selector; }

View File

@ -21,6 +21,9 @@
import java.io.IOException;
import java.net.ConnectException;
import java.net.InetSocketAddress;
import java.lang.management.ManagementFactory;
import java.lang.management.ThreadInfo;
import java.lang.management.ThreadMXBean;
import java.lang.reflect.Method;
import junit.framework.TestCase;
@ -536,6 +539,50 @@ public void testErrorMsgForInsecureClient() throws Exception {
}
assertTrue(succeeded);
}
/**
* Count the number of threads that have a stack frame containing
* the given string
*/
private static int countThreads(String search) {
ThreadMXBean threadBean = ManagementFactory.getThreadMXBean();
int count = 0;
ThreadInfo[] infos = threadBean.getThreadInfo(threadBean.getAllThreadIds(), 20);
for (ThreadInfo info : infos) {
if (info == null) continue;
for (StackTraceElement elem : info.getStackTrace()) {
if (elem.getClassName().contains(search)) {
count++;
break;
}
}
}
return count;
}
/**
* Test that server.stop() properly stops all threads
*/
public void testStopsAllThreads() throws Exception {
int threadsBefore = countThreads("Server$Listener$Reader");
assertEquals("Expect no Reader threads running before test",
0, threadsBefore);
final Server server = RPC.getServer(TestProtocol.class,
new TestImpl(), ADDRESS, 0, 5, true, conf, null);
server.start();
try {
int threadsRunning = countThreads("Server$Listener$Reader");
assertTrue(threadsRunning > 0);
} finally {
server.stop();
}
int threadsAfter = countThreads("Server$Listener$Reader");
assertEquals("Expect no Reader threads left running after test",
0, threadsAfter);
}
public static void main(String[] args) throws Exception {