diff --git a/CHANGES.txt b/CHANGES.txt index a0bd91c4c9..46fe53305e 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -489,6 +489,8 @@ Release 0.22.0 - Unreleased HADOOP-6642. Fix javac, javadoc, findbugs warnings related to security work. (Chris Douglas, Po Cheung via shv) + HADOOP-7140. IPC Reader threads do not stop when server stops (todd) + Release 0.21.1 - Unreleased IMPROVEMENTS diff --git a/src/java/org/apache/hadoop/ipc/Server.java b/src/java/org/apache/hadoop/ipc/Server.java index f0813355d5..3dbeb02aed 100644 --- a/src/java/org/apache/hadoop/ipc/Server.java +++ b/src/java/org/apache/hadoop/ipc/Server.java @@ -299,7 +299,6 @@ private class Listener extends Thread { private long cleanupInterval = 10000; //the minimum interval between //two cleanup runs private int backlogLength = conf.getInt("ipc.server.listen.queue.size", 128); - private ExecutorService readPool; public Listener() throws IOException { address = new InetSocketAddress(bindAddress, port); @@ -313,12 +312,12 @@ public Listener() throws IOException { // create a selector; selector= Selector.open(); readers = new Reader[readThreads]; - readPool = Executors.newFixedThreadPool(readThreads); for (int i = 0; i < readThreads; i++) { Selector readSelector = Selector.open(); - Reader reader = new Reader(readSelector); + Reader reader = new Reader("Socket Reader #" + (i + 1) + " for port " + port, + readSelector); readers[i] = reader; - readPool.execute(reader); + reader.start(); } // Register accepts on the server socket with the selector. @@ -327,15 +326,16 @@ public Listener() throws IOException { this.setDaemon(true); } - private class Reader implements Runnable { + private class Reader extends Thread { private volatile boolean adding = false; private Selector readSelector = null; - Reader(Selector readSelector) { + Reader(String name, Selector readSelector) { + super(name); this.readSelector = readSelector; } public void run() { - LOG.info("Starting SocketReader"); + LOG.info("Starting " + getName()); synchronized (this) { while (running) { SelectionKey key = null; @@ -389,6 +389,16 @@ public synchronized void finishAdd() { adding = false; this.notify(); } + + void shutdown() { + assert !running; + readSelector.wakeup(); + try { + join(); + } catch (InterruptedException ie) { + Thread.currentThread().interrupt(); + } + } } /** cleanup connections from connectionList. Choose a random range * to scan and also have a limit on the number of the connections @@ -577,7 +587,9 @@ synchronized void doStop() { LOG.info(getName() + ":Exception in closing listener socket. " + e); } } - readPool.shutdown(); + for (Reader r : readers) { + r.shutdown(); + } } synchronized Selector getSelector() { return selector; } diff --git a/src/test/core/org/apache/hadoop/ipc/TestRPC.java b/src/test/core/org/apache/hadoop/ipc/TestRPC.java index 554eed8c68..5f28480584 100644 --- a/src/test/core/org/apache/hadoop/ipc/TestRPC.java +++ b/src/test/core/org/apache/hadoop/ipc/TestRPC.java @@ -21,6 +21,9 @@ import java.io.IOException; import java.net.ConnectException; import java.net.InetSocketAddress; +import java.lang.management.ManagementFactory; +import java.lang.management.ThreadInfo; +import java.lang.management.ThreadMXBean; import java.lang.reflect.Method; import junit.framework.TestCase; @@ -536,6 +539,50 @@ public void testErrorMsgForInsecureClient() throws Exception { } assertTrue(succeeded); } + + /** + * Count the number of threads that have a stack frame containing + * the given string + */ + private static int countThreads(String search) { + ThreadMXBean threadBean = ManagementFactory.getThreadMXBean(); + + int count = 0; + ThreadInfo[] infos = threadBean.getThreadInfo(threadBean.getAllThreadIds(), 20); + for (ThreadInfo info : infos) { + if (info == null) continue; + for (StackTraceElement elem : info.getStackTrace()) { + if (elem.getClassName().contains(search)) { + count++; + break; + } + } + } + return count; + } + + + /** + * Test that server.stop() properly stops all threads + */ + public void testStopsAllThreads() throws Exception { + int threadsBefore = countThreads("Server$Listener$Reader"); + assertEquals("Expect no Reader threads running before test", + 0, threadsBefore); + + final Server server = RPC.getServer(TestProtocol.class, + new TestImpl(), ADDRESS, 0, 5, true, conf, null); + server.start(); + try { + int threadsRunning = countThreads("Server$Listener$Reader"); + assertTrue(threadsRunning > 0); + } finally { + server.stop(); + } + int threadsAfter = countThreads("Server$Listener$Reader"); + assertEquals("Expect no Reader threads left running after test", + 0, threadsAfter); + } public static void main(String[] args) throws Exception {