HADOOP-6764. Add number of reader threads and queue length as configuration parameters in RPC.getServer. Contributed by Dmytro Molkov.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1038918 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Hairong Kuang 2010-11-25 04:36:17 +00:00
parent 19a762d5c2
commit 3a43e5930b
7 changed files with 97 additions and 24 deletions

View File

@ -15,7 +15,10 @@ Trunk (unreleased changes)
improve other messaging. (nigel)
HADOOP-7001. Configuration changes can occur via the Reconfigurable
interface. (Patrick Kline via dhruba)
interface. (Patrick Kling via dhruba)
HADOOP-6764. Add number of reader threads and queue length as
configuration parameters in RPC.getServer. (Dmytro Molkov via hairong)
OPTIMIZATIONS

View File

@ -211,14 +211,15 @@ public Object[] call(Method method, Object[][] params,
/** Construct a server for a protocol implementation instance listening on a
* port and address. */
public RPC.Server getServer(Class<?> iface, Object impl, String bindAddress,
int port, int numHandlers, boolean verbose,
int port, int numHandlers, int numReaders,
int queueSizePerHandler, boolean verbose,
Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager
) throws IOException {
return ENGINE.getServer(TunnelProtocol.class,
new TunnelResponder(iface, impl),
bindAddress, port, numHandlers, verbose, conf,
secretManager);
bindAddress, port, numHandlers, numReaders,
queueSizePerHandler, verbose, conf, secretManager);
}
}

View File

@ -380,18 +380,33 @@ public static Server getServer(Class<?> protocol,
throws IOException {
return getProtocolEngine(protocol, conf)
.getServer(protocol, instance, bindAddress, port, numHandlers, verbose,
conf, secretManager);
.getServer(protocol, instance, bindAddress, port, numHandlers, -1, -1,
verbose, conf, secretManager);
}
/** Construct a server for a protocol implementation instance. */
public static Server getServer(Class<?> protocol,
Object instance, String bindAddress, int port,
int numHandlers, int numReaders, int queueSizePerHandler,
boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
return getProtocolEngine(protocol, conf)
.getServer(protocol, instance, bindAddress, port, numHandlers,
numReaders, queueSizePerHandler, verbose, conf, secretManager);
}
/** An RPC Server. */
public abstract static class Server extends org.apache.hadoop.ipc.Server {
protected Server(String bindAddress, int port,
Class<? extends Writable> paramClass, int handlerCount,
Class<? extends Writable> paramClass, int handlerCount,
int numReaders, int queueSizePerHandler,
Configuration conf, String serverName,
SecretManager<? extends TokenIdentifier> secretManager) throws IOException {
super(bindAddress, port, paramClass, handlerCount, conf, serverName, secretManager);
super(bindAddress, port, paramClass, handlerCount, numReaders, queueSizePerHandler,
conf, serverName, secretManager);
}
}

View File

@ -50,7 +50,8 @@ Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs,
/** Construct a server for a protocol implementation instance. */
RPC.Server getServer(Class<?> protocol, Object instance, String bindAddress,
int port, int numHandlers, boolean verbose,
int port, int numHandlers, int numReaders,
int queueSizePerHandler, boolean verbose,
Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager
) throws IOException;

View File

@ -1451,16 +1451,18 @@ protected Server(String bindAddress, int port,
Configuration conf)
throws IOException
{
this(bindAddress, port, paramClass, handlerCount, conf, Integer.toString(port), null);
this(bindAddress, port, paramClass, handlerCount, -1, -1, conf, Integer.toString(port), null);
}
/** Constructs a server listening on the named port and address. Parameters passed must
* be of the named class. The <code>handlerCount</handlerCount> determines
* the number of handler threads that will be used to process calls.
*
* If queueSizePerHandler or numReaders are not -1 they will be used instead of parameters
* from configuration. Otherwise the configuration will be picked up.
*/
@SuppressWarnings("unchecked")
protected Server(String bindAddress, int port,
Class<? extends Writable> paramClass, int handlerCount,
Class<? extends Writable> paramClass, int handlerCount, int numReaders, int queueSizePerHandler,
Configuration conf, String serverName, SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
this.bindAddress = bindAddress;
@ -1469,15 +1471,23 @@ protected Server(String bindAddress, int port,
this.paramClass = paramClass;
this.handlerCount = handlerCount;
this.socketSendBufferSize = 0;
this.maxQueueSize = handlerCount * conf.getInt(
CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
if (queueSizePerHandler != -1) {
this.maxQueueSize = queueSizePerHandler;
} else {
this.maxQueueSize = handlerCount * conf.getInt(
CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
}
this.maxRespSize = conf.getInt(
CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_KEY,
CommonConfigurationKeys.IPC_SERVER_RPC_MAX_RESPONSE_SIZE_DEFAULT);
this.readThreads = conf.getInt(
CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT);
if (numReaders != -1) {
this.readThreads = numReaders;
} else {
this.readThreads = conf.getInt(
CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT);
}
this.callQueue = new LinkedBlockingQueue<Call>(maxQueueSize);
this.maxIdleTime = 2*conf.getInt("ipc.client.connection.maxidletime", 1000);
this.maxConnectionsToNuke = conf.getInt("ipc.client.kill.max", 10);
@ -1691,7 +1701,22 @@ public int getCallQueueLen() {
return callQueue.size();
}
/**
* The maximum size of the rpc call queue of this server.
* @return The maximum size of the rpc call queue.
*/
public int getMaxQueueSize() {
return maxQueueSize;
}
/**
* The number of reader threads for this server.
* @return The number of reader threads.
*/
public int getNumReaders() {
return readThreads;
}
/**
* When the read or write buffer size is larger than this limit, i/o will be
* done in chunks of this size. Most RPC requests and responses would be

View File

@ -285,11 +285,12 @@ public Object[] call(Method method, Object[][] params,
* port and address. */
public Server getServer(Class<?> protocol,
Object instance, String bindAddress, int port,
int numHandlers, boolean verbose, Configuration conf,
int numHandlers, int numReaders, int queueSizePerHandler,
boolean verbose, Configuration conf,
SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
return new Server(instance, conf, bindAddress, port, numHandlers,
verbose, secretManager);
numReaders, queueSizePerHandler, verbose, secretManager);
}
/** An RPC Server. */
@ -305,7 +306,7 @@ public static class Server extends RPC.Server {
*/
public Server(Object instance, Configuration conf, String bindAddress, int port)
throws IOException {
this(instance, conf, bindAddress, port, 1, false, null);
this(instance, conf, bindAddress, port, 1, -1, -1, false, null);
}
private static String classNameBase(String className) {
@ -325,10 +326,11 @@ private static String classNameBase(String className) {
* @param verbose whether each call should be logged
*/
public Server(Object instance, Configuration conf, String bindAddress, int port,
int numHandlers, boolean verbose,
int numHandlers, int numReaders, int queueSizePerHandler, boolean verbose,
SecretManager<? extends TokenIdentifier> secretManager)
throws IOException {
super(bindAddress, port, Invocation.class, numHandlers, conf,
super(bindAddress, port, Invocation.class, numHandlers, numReaders,
queueSizePerHandler, conf,
classNameBase(instance.getClass().getName()), secretManager);
this.instance = instance;
this.verbose = verbose;

View File

@ -190,6 +190,28 @@ public void run() {
}
}
}
public void testConfRpc() throws Exception {
Server server = RPC.getServer(TestProtocol.class,
new TestImpl(), ADDRESS, 0, 1, false, conf, null);
// Just one handler
int confQ = conf.getInt(
CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_KEY,
CommonConfigurationKeys.IPC_SERVER_HANDLER_QUEUE_SIZE_DEFAULT);
assertEquals(confQ, server.getMaxQueueSize());
int confReaders = conf.getInt(
CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY,
CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_DEFAULT);
assertEquals(confReaders, server.getNumReaders());
server.stop();
server = RPC.getServer(TestProtocol.class,
new TestImpl(), ADDRESS, 0, 1, 3, 200, false, conf, null);
assertEquals(3, server.getNumReaders());
assertEquals(200, server.getMaxQueueSize());
server.stop();
}
public void testSlowRpc() throws Exception {
System.out.println("Testing Slow RPC");
@ -234,6 +256,10 @@ public void testSlowRpc() throws Exception {
System.out.println("Down slow rpc testing");
}
}
public void testRPCConf(Configuration conf) throws Exception {
}
public void testCalls(Configuration conf) throws Exception {
Server server = RPC.getServer(TestProtocol.class,