HADOOP-7146. RPC server leaks file descriptors. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1127811 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2011-05-26 07:48:03 +00:00
parent 42a185b57d
commit 2f6c03ad54
3 changed files with 87 additions and 38 deletions

View File

@ -732,6 +732,8 @@ Release 0.22.0 - Unreleased
HADOOP-7287. Configuration deprecation mechanism doesn't work properly for HADOOP-7287. Configuration deprecation mechanism doesn't work properly for
GenericOptionsParser and Tools. (Aaron T. Myers via todd) GenericOptionsParser and Tools. (Aaron T. Myers via todd)
HADOOP-7146. RPC server leaks file descriptors (todd)
Release 0.21.1 - Unreleased Release 0.21.1 - Unreleased
IMPROVEMENTS IMPROVEMENTS

View File

@ -322,9 +322,8 @@ public Listener() throws IOException {
selector= Selector.open(); selector= Selector.open();
readers = new Reader[readThreads]; readers = new Reader[readThreads];
for (int i = 0; i < readThreads; i++) { for (int i = 0; i < readThreads; i++) {
Selector readSelector = Selector.open(); Reader reader = new Reader(
Reader reader = new Reader("Socket Reader #" + (i + 1) + " for port " + port, "Socket Reader #" + (i + 1) + " for port " + port);
readSelector);
readers[i] = reader; readers[i] = reader;
reader.start(); reader.start();
} }
@ -337,15 +336,28 @@ public Listener() throws IOException {
private class Reader extends Thread { private class Reader extends Thread {
private volatile boolean adding = false; private volatile boolean adding = false;
private Selector readSelector = null; private final Selector readSelector;
Reader(String name, Selector readSelector) { Reader(String name) throws IOException {
super(name); super(name);
this.readSelector = readSelector;
this.readSelector = Selector.open();
} }
public void run() { public void run() {
LOG.info("Starting " + getName()); LOG.info("Starting " + getName());
synchronized (this) { try {
doRunLoop();
} finally {
try {
readSelector.close();
} catch (IOException ioe) {
LOG.error("Error closing read selector in " + this.getName(), ioe);
}
}
}
private synchronized void doRunLoop() {
while (running) { while (running) {
SelectionKey key = null; SelectionKey key = null;
try { try {
@ -367,15 +379,13 @@ public void run() {
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
if (running) { // unexpected -- log it if (running) { // unexpected -- log it
LOG.info(getName() + " caught: " + LOG.info(getName() + " unexpectedly interrupted", e);
StringUtils.stringifyException(e));
} }
} catch (IOException ex) { } catch (IOException ex) {
LOG.error("Error in Reader", ex); LOG.error("Error in Reader", ex);
} }
} }
} }
}
/** /**
* This gets reader into the state that waits for the new channel * This gets reader into the state that waits for the new channel
@ -614,7 +624,7 @@ Reader getReader() {
// Sends responses of RPC back to clients. // Sends responses of RPC back to clients.
private class Responder extends Thread { private class Responder extends Thread {
private Selector writeSelector; private final Selector writeSelector;
private int pending; // connections waiting to register private int pending; // connections waiting to register
final static int PURGE_INTERVAL = 900000; // 15mins final static int PURGE_INTERVAL = 900000; // 15mins
@ -630,6 +640,19 @@ private class Responder extends Thread {
public void run() { public void run() {
LOG.info(getName() + ": starting"); LOG.info(getName() + ": starting");
SERVER.set(Server.this); SERVER.set(Server.this);
try {
doRunLoop();
} finally {
LOG.info("Stopping " + this.getName());
try {
writeSelector.close();
} catch (IOException ioe) {
LOG.error("Couldn't close write selector in " + this.getName(), ioe);
}
}
}
private void doRunLoop() {
long lastPurgeTime = 0; // last check for old calls. long lastPurgeTime = 0; // last check for old calls.
while (running) { while (running) {
@ -691,11 +714,9 @@ public void run() {
LOG.warn("Out of Memory in server select", e); LOG.warn("Out of Memory in server select", e);
try { Thread.sleep(60000); } catch (Exception ie) {} try { Thread.sleep(60000); } catch (Exception ie) {}
} catch (Exception e) { } catch (Exception e) {
LOG.warn("Exception in Responder " + LOG.warn("Exception in Responder", e);
StringUtils.stringifyException(e));
} }
} }
LOG.info("Stopping " + this.getName());
} }
private void doAsyncWrite(SelectionKey key) throws IOException { private void doAsyncWrite(SelectionKey key) throws IOException {
@ -1460,12 +1481,10 @@ public Writable run() throws Exception {
} }
} catch (InterruptedException e) { } catch (InterruptedException e) {
if (running) { // unexpected -- log it if (running) { // unexpected -- log it
LOG.info(getName() + " caught: " + LOG.info(getName() + " unexpectedly interrupted", e);
StringUtils.stringifyException(e));
} }
} catch (Exception e) { } catch (Exception e) {
LOG.info(getName() + " caught: " + LOG.info(getName() + " caught an exception", e);
StringUtils.stringifyException(e));
} }
} }
LOG.info(getName() + ": exiting"); LOG.info(getName() + ": exiting");

View File

@ -27,6 +27,7 @@
import java.util.Random; import java.util.Random;
import java.io.DataInput; import java.io.DataInput;
import java.io.File;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.net.InetSocketAddress;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
@ -36,6 +37,7 @@
import static org.mockito.Mockito.*; import static org.mockito.Mockito.*;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.junit.Assume;
/** Unit tests for IPC. */ /** Unit tests for IPC. */
public class TestIPC extends TestCase { public class TestIPC extends TestCase {
@ -55,6 +57,9 @@ public class TestIPC extends TestCase {
private static final String ADDRESS = "0.0.0.0"; private static final String ADDRESS = "0.0.0.0";
/** Directory where we can count open file descriptors on Linux */
private static final File FD_DIR = new File("/proc/self/fd");
private static class TestServer extends Server { private static class TestServer extends Server {
private boolean sleep; private boolean sleep;
@ -354,6 +359,29 @@ public void testIpcTimeout() throws Exception {
addr, null, null, 3*PING_INTERVAL+MIN_SLEEP_TIME, conf); addr, null, null, 3*PING_INTERVAL+MIN_SLEEP_TIME, conf);
} }
/**
* Check that file descriptors aren't leaked by starting
* and stopping IPC servers.
*/
public void testSocketLeak() throws Exception {
Assume.assumeTrue(FD_DIR.exists()); // only run on Linux
long startFds = countOpenFileDescriptors();
for (int i = 0; i < 50; i++) {
Server server = new TestServer(1, true);
server.start();
server.stop();
}
long endFds = countOpenFileDescriptors();
assertTrue("Leaked " + (endFds - startFds) + " file descriptors",
endFds - startFds < 20);
}
private long countOpenFileDescriptors() {
return FD_DIR.list().length;
}
public static void main(String[] args) throws Exception { public static void main(String[] args) throws Exception {
//new TestIPC("test").testSerial(5, false, 2, 10, 1000); //new TestIPC("test").testSerial(5, false, 2, 10, 1000);