HADOOP-9956. RPC listener inefficiently assigns connections to readers (daryn)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1541736 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Daryn Sharp 2013-11-13 21:37:21 +00:00
parent c02629f585
commit e43255302a
4 changed files with 185 additions and 30 deletions

View File

@ -284,7 +284,9 @@ Trunk (Unreleased)
HADOOP-7761. Improve the performance of raw comparisons. (todd) HADOOP-7761. Improve the performance of raw comparisons. (todd)
HADOOP-8589 ViewFs tests fail when tests and home dirs are nested (sanjay Radia) HADOOP-8589. ViewFs tests fail when tests and home dirs are nested (sanjay Radia)
HADOOP-9956. RPC listener inefficiently assigns connections to readers (daryn)
Release 2.3.0 - UNRELEASED Release 2.3.0 - UNRELEASED

View File

@ -65,6 +65,13 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
/** Default value for IPC_SERVER_RPC_READ_THREADS_KEY */ /** Default value for IPC_SERVER_RPC_READ_THREADS_KEY */
public static final int IPC_SERVER_RPC_READ_THREADS_DEFAULT = 1; public static final int IPC_SERVER_RPC_READ_THREADS_DEFAULT = 1;
/** Number of pending connections that may be queued per socket reader */
public static final String IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY =
"ipc.server.read.connection-queue.size";
/** Default value for IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE */
public static final int IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT =
100;
public static final String IPC_MAXIMUM_DATA_LENGTH = public static final String IPC_MAXIMUM_DATA_LENGTH =
"ipc.maximum.data.length"; "ipc.maximum.data.length";

View File

@ -345,6 +345,7 @@ public static boolean isRpcInvocation() {
private int port; // port we listen on private int port; // port we listen on
private int handlerCount; // number of handler threads private int handlerCount; // number of handler threads
private int readThreads; // number of read threads private int readThreads; // number of read threads
private int readerPendingConnectionQueue; // number of connections to queue per read thread
private Class<? extends Writable> rpcRequestClass; // class used for deserializing the rpc request private Class<? extends Writable> rpcRequestClass; // class used for deserializing the rpc request
private int maxIdleTime; // the maximum idle time after private int maxIdleTime; // the maximum idle time after
// which a client may be disconnected // which a client may be disconnected
@ -553,12 +554,14 @@ public Listener() throws IOException {
} }
private class Reader extends Thread { private class Reader extends Thread {
private volatile boolean adding = false; final private BlockingQueue<Connection> pendingConnections;
private final Selector readSelector; private final Selector readSelector;
Reader(String name) throws IOException { Reader(String name) throws IOException {
super(name); super(name);
this.pendingConnections =
new LinkedBlockingQueue<Connection>(readerPendingConnectionQueue);
this.readSelector = Selector.open(); this.readSelector = Selector.open();
} }
@ -580,10 +583,14 @@ private synchronized void doRunLoop() {
while (running) { while (running) {
SelectionKey key = null; SelectionKey key = null;
try { try {
// consume as many connections as currently queued to avoid
// unbridled acceptance of connections that starves the select
int size = pendingConnections.size();
for (int i=size; i>0; i--) {
Connection conn = pendingConnections.take();
conn.channel.register(readSelector, SelectionKey.OP_READ, conn);
}
readSelector.select(); readSelector.select();
while (adding) {
this.wait(1000);
}
Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator(); Iterator<SelectionKey> iter = readSelector.selectedKeys().iterator();
while (iter.hasNext()) { while (iter.hasNext()) {
@ -607,26 +614,14 @@ private synchronized void doRunLoop() {
} }
/** /**
* This gets reader into the state that waits for the new channel * Updating the readSelector while it's being used is not thread-safe,
* to be registered with readSelector. If it was waiting in select() * so the connection must be queued. The reader will drain the queue
* the thread will be woken up, otherwise whenever select() is called * and update its readSelector before performing the next select
* it will return even if there is nothing to read and wait
* in while(adding) for finishAdd call
*/ */
public void startAdd() { public void addConnection(Connection conn) throws InterruptedException {
adding = true; pendingConnections.put(conn);
readSelector.wakeup(); readSelector.wakeup();
} }
public synchronized SelectionKey registerChannel(SocketChannel channel)
throws IOException {
return channel.register(readSelector, SelectionKey.OP_READ);
}
public synchronized void finishAdd() {
adding = false;
this.notify();
}
void shutdown() { void shutdown() {
assert !running; assert !running;
@ -766,20 +761,23 @@ void doAccept(SelectionKey key) throws IOException, OutOfMemoryError {
Reader reader = getReader(); Reader reader = getReader();
try { try {
reader.startAdd(); c = new Connection(channel, Time.now());
SelectionKey readKey = reader.registerChannel(channel);
c = new Connection(readKey, channel, Time.now());
readKey.attach(c);
synchronized (connectionList) { synchronized (connectionList) {
connectionList.add(numConnections, c); connectionList.add(numConnections, c);
numConnections++; numConnections++;
} }
reader.addConnection(c);
if (LOG.isDebugEnabled()) if (LOG.isDebugEnabled())
LOG.debug("Server connection from " + c.toString() + LOG.debug("Server connection from " + c.toString() +
"; # active connections: " + numConnections + "; # active connections: " + numConnections +
"; # queued calls: " + callQueue.size()); "; # queued calls: " + callQueue.size());
} finally { } catch (InterruptedException ie) {
reader.finishAdd(); if (running) {
LOG.info(
getName() + ": disconnecting client " + c.getHostAddress() +
" due to unexpected interrupt");
}
closeConnection(c);
} }
} }
} }
@ -1190,8 +1188,7 @@ public class Connection {
private boolean sentNegotiate = false; private boolean sentNegotiate = false;
private boolean useWrap = false; private boolean useWrap = false;
public Connection(SelectionKey key, SocketChannel channel, public Connection(SocketChannel channel, long lastContact) {
long lastContact) {
this.channel = channel; this.channel = channel;
this.lastContact = lastContact; this.lastContact = lastContact;
this.data = null; this.data = null;
@ -2189,6 +2186,9 @@ protected Server(String bindAddress, int port,
CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT); CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT);
} }
this.readerPendingConnectionQueue = conf.getInt(
CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY,
CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_DEFAULT);
this.callQueue = new LinkedBlockingQueue<Call>(maxQueueSize); this.callQueue = new LinkedBlockingQueue<Call>(maxQueueSize);
this.maxIdleTime = 2 * conf.getInt( this.maxIdleTime = 2 * conf.getInt(
CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY, CommonConfigurationKeysPublic.IPC_CLIENT_CONNECTION_MAXIDLETIME_KEY,

View File

@ -44,12 +44,16 @@
import java.util.Collections; import java.util.Collections;
import java.util.List; import java.util.List;
import java.util.Random; import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicInteger;
import javax.net.SocketFactory; import javax.net.SocketFactory;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.IntWritable;
@ -613,6 +617,148 @@ public void testIpcWithServiceClass() throws IOException {
server.stop(); server.stop();
} }
private static class TestServerQueue extends Server {
final CountDownLatch firstCallLatch = new CountDownLatch(1);
final CountDownLatch callBlockLatch = new CountDownLatch(1);
TestServerQueue(int expectedCalls, int readers, int callQ, int handlers,
Configuration conf) throws IOException {
super(ADDRESS, 0, LongWritable.class, handlers, readers, callQ, conf, null, null);
}
@Override
public Writable call(RPC.RpcKind rpcKind, String protocol, Writable param,
long receiveTime) throws IOException {
firstCallLatch.countDown();
try {
callBlockLatch.await();
} catch (InterruptedException e) {
throw new IOException(e);
}
return param;
}
}
/**
* Check that reader queueing works
* @throws BrokenBarrierException
* @throws InterruptedException
*/
@Test(timeout=60000)
public void testIpcWithReaderQueuing() throws Exception {
// 1 reader, 1 connectionQ slot, 1 callq
for (int i=0; i < 10; i++) {
checkBlocking(1, 1, 1);
}
// 4 readers, 5 connectionQ slots, 2 callq
for (int i=0; i < 10; i++) {
checkBlocking(4, 5, 2);
}
}
// goal is to jam a handler with a connection, fill the callq with
// connections, in turn jamming the readers - then flood the server and
// ensure that the listener blocks when the reader connection queues fill
private void checkBlocking(int readers, int readerQ, int callQ) throws Exception {
int handlers = 1; // makes it easier
final Configuration conf = new Configuration();
conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_CONNECTION_QUEUE_SIZE_KEY, readerQ);
// send in enough clients to block up the handlers, callq, and readers
int initialClients = readers + callQ + handlers;
// max connections we should ever end up accepting at once
int maxAccept = initialClients + readers*readerQ + 1; // 1 = listener
// stress it with 2X the max
int clients = maxAccept*2;
final AtomicInteger failures = new AtomicInteger(0);
final CountDownLatch callFinishedLatch = new CountDownLatch(clients);
// start server
final TestServerQueue server =
new TestServerQueue(clients, readers, callQ, handlers, conf);
final InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
Client.setConnectTimeout(conf, 10000);
// instantiate the threads, will start in batches
Thread[] threads = new Thread[clients];
for (int i=0; i<clients; i++) {
threads[i] = new Thread(new Runnable() {
@Override
public void run() {
Client client = new Client(LongWritable.class, conf);
try {
client.call(new LongWritable(Thread.currentThread().getId()),
addr, null, null, 60000, conf);
} catch (Throwable e) {
LOG.error(e);
failures.incrementAndGet();
return;
} finally {
callFinishedLatch.countDown();
client.stop();
}
}
});
}
// start enough clients to block up the handler, callq, and each reader;
// let the calls sequentially slot in to avoid some readers blocking
// and others not blocking in the race to fill the callq
for (int i=0; i < initialClients; i++) {
threads[i].start();
if (i==0) {
// let first reader block in a call
server.firstCallLatch.await();
} else if (i <= callQ) {
// let subsequent readers jam the callq, will happen immediately
while (server.getCallQueueLen() != i) {
Thread.sleep(1);
}
} // additional threads block the readers trying to add to the callq
}
// wait till everything is slotted, should happen immediately
Thread.sleep(10);
if (server.getNumOpenConnections() < initialClients) {
LOG.info("(initial clients) need:"+initialClients+" connections have:"+server.getNumOpenConnections());
Thread.sleep(100);
}
LOG.info("ipc layer should be blocked");
assertEquals(callQ, server.getCallQueueLen());
assertEquals(initialClients, server.getNumOpenConnections());
// now flood the server with the rest of the connections, the reader's
// connection queues should fill and then the listener should block
for (int i=initialClients; i<clients; i++) {
threads[i].start();
}
Thread.sleep(10);
if (server.getNumOpenConnections() < maxAccept) {
LOG.info("(max clients) need:"+maxAccept+" connections have:"+server.getNumOpenConnections());
Thread.sleep(100);
}
// check a few times to make sure we didn't go over
for (int i=0; i<4; i++) {
assertEquals(maxAccept, server.getNumOpenConnections());
Thread.sleep(100);
}
// sanity check that no calls have finished
assertEquals(clients, callFinishedLatch.getCount());
LOG.info("releasing the calls");
server.callBlockLatch.countDown();
callFinishedLatch.await();
for (Thread t : threads) {
t.join();
}
assertEquals(0, failures.get());
server.stop();
}
/** /**
* Make a call from a client and verify if header info is changed in server side * Make a call from a client and verify if header info is changed in server side
*/ */