From 805e9b5b6d835d1b7a50af18967afb8eebdf8606 Mon Sep 17 00:00:00 2001 From: Colin McCabe Date: Mon, 1 Jul 2013 23:17:13 +0000 Subject: [PATCH] HADOOP-9676. Make maximum RPC buffer size configurable (Colin Patrick McCabe) git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1498737 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 3 +++ .../hadoop/fs/CommonConfigurationKeys.java | 5 ++++ .../java/org/apache/hadoop/ipc/Server.java | 26 ++++++++++++++----- .../apache/hadoop/ipc/TestProtoBufRpc.java | 25 +++++++++++++++++- 4 files changed, 52 insertions(+), 7 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index ccfe9b7df4..0a4fd63099 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -440,6 +440,9 @@ Release 2.1.0-beta - 2013-07-02 HADOOP-9619 Mark stability of .proto files (sanjay Radia) + HADOOP-9676. Make maximum RPC buffer size configurable (Colin Patrick + McCabe) + OPTIMIZATIONS HADOOP-9150. Avoid unnecessary DNS resolution attempts for logical URIs diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index c5d86f140a..68632503e9 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -64,6 +64,11 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { "ipc.server.read.threadpool.size"; /** Default value for IPC_SERVER_RPC_READ_THREADS_KEY */ public static final int IPC_SERVER_RPC_READ_THREADS_DEFAULT = 1; + + public static final String IPC_MAXIMUM_DATA_LENGTH = + "ipc.maximum.data.length"; + + public static final int IPC_MAXIMUM_DATA_LENGTH_DEFAULT = 64 * 1024 * 1024; /** How many calls per handler are allowed in the queue. */ public static final String IPC_SERVER_HANDLER_QUEUE_SIZE_KEY = diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 2096773e3f..ca2b484008 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -343,6 +343,7 @@ public static boolean isRpcInvocation() { private int maxQueueSize; private final int maxRespSize; private int socketSendBufferSize; + private final int maxDataLength; private final boolean tcpNoDelay; // if T then disable Nagle's Algorithm volatile private boolean running = true; // true while server runs @@ -1380,7 +1381,22 @@ private void disposeSasl() { } } } - + + private void checkDataLength(int dataLength) throws IOException { + if (dataLength < 0) { + String error = "Unexpected data length " + dataLength + + "!! from " + getHostAddress(); + LOG.warn(error); + throw new IOException(error); + } else if (dataLength > maxDataLength) { + String error = "Requested data length " + dataLength + + " is longer than maximum configured RPC length " + + maxDataLength + ". RPC came from " + getHostAddress(); + LOG.warn(error); + throw new IOException(error); + } + } + public int readAndProcess() throws IOException, InterruptedException { while (true) { /* Read at most one RPC. If the header is not read completely yet @@ -1442,11 +1458,7 @@ public int readAndProcess() throws IOException, InterruptedException { dataLengthBuffer.clear(); return 0; // ping message } - - if (dataLength < 0) { - LOG.warn("Unexpected data length " + dataLength + "!! from " + - getHostAddress()); - } + checkDataLength(dataLength); data = ByteBuffer.allocate(dataLength); } @@ -1981,6 +1993,8 @@ protected Server(String bindAddress, int port, this.rpcRequestClass = rpcRequestClass; this.handlerCount = handlerCount; this.socketSendBufferSize = 0; + this.maxDataLength = conf.getInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, + CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH_DEFAULT); if (queueSizePerHandler != -1) { this.maxQueueSize = queueSizePerHandler; } else { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java index e7200860dc..2ec56eb5ea 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java @@ -24,7 +24,9 @@ import java.net.InetSocketAddress; import java.net.URISyntaxException; +import org.apache.commons.lang.StringUtils; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto; import org.apache.hadoop.ipc.protobuf.TestProtos.EchoRequestProto; import org.apache.hadoop.ipc.protobuf.TestProtos.EchoResponseProto; @@ -113,6 +115,7 @@ public EchoResponseProto echo2(RpcController unused, EchoRequestProto request) @Before public void setUp() throws IOException { // Setup server for both protocols conf = new Configuration(); + conf.setInt(CommonConfigurationKeys.IPC_MAXIMUM_DATA_LENGTH, 1024); // Set RPC engine to protobuf RPC engine RPC.setProtocolEngine(conf, TestRpcService.class, ProtobufRpcEngine.class); @@ -230,4 +233,24 @@ public void testProtoBufRandomException() throws Exception { re.getErrorCode().equals(RpcErrorCodeProto.ERROR_APPLICATION)); } } -} \ No newline at end of file + + @Test(timeout=6000) + public void testExtraLongRpc() throws Exception { + TestRpcService2 client = getClient2(); + final String shortString = StringUtils.repeat("X", 4); + EchoRequestProto echoRequest = EchoRequestProto.newBuilder() + .setMessage(shortString).build(); + // short message goes through + EchoResponseProto echoResponse = client.echo2(null, echoRequest); + + final String longString = StringUtils.repeat("X", 4096); + echoRequest = EchoRequestProto.newBuilder() + .setMessage(longString).build(); + try { + echoResponse = client.echo2(null, echoRequest); + Assert.fail("expected extra-long RPC to fail"); + } catch (ServiceException se) { + // expected + } + } +}