diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 042d3c50c3..f4418caa77 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -153,6 +153,9 @@ Trunk (Unreleased) HADOOP-9112. test-patch should -1 for @Tests without a timeout (Surenkumar Nihalani via bobby) + HADOOP-9163 The rpc msg in ProtobufRpcEngine.proto should be moved out to + avoid an extra copy (Sanjay Radia) + BUG FIXES HADOOP-8419. Fixed GzipCode NPE reset for IBM JDK. (Yu Li via eyang) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index 0483c7c445..fefae53f36 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -21,6 +21,7 @@ import java.io.DataInput; import java.io.DataOutput; import java.io.IOException; +import java.io.OutputStream; import java.lang.reflect.Method; import java.lang.reflect.Proxy; import java.net.InetSocketAddress; @@ -39,7 +40,7 @@ import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.RPC.RpcInvoker; -import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngineProtos.RequestProto; +import org.apache.hadoop.ipc.protobuf.ProtobufRpcEngineProtos.RequestHeaderProto; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.TokenIdentifier; @@ -128,25 +129,12 @@ public Invoker(Class protocol, Client.ConnectionId connId, .getProtocolVersion(protocol); } - private RequestProto constructRpcRequest(Method method, - Object[] params) throws ServiceException { - RequestProto rpcRequest; - RequestProto.Builder builder = RequestProto + private RequestHeaderProto constructRpcRequestHeader(Method method) { + RequestHeaderProto.Builder builder = RequestHeaderProto .newBuilder(); builder.setMethodName(method.getName()); + - if (params.length != 2) { // RpcController + Message - throw new ServiceException("Too many parameters for request. Method: [" - + method.getName() + "]" + ", Expected: 2, Actual: " - + params.length); - } - if (params[1] == null) { - throw new ServiceException("null param while calling Method: [" - + method.getName() + "]"); - } - - Message param = (Message) params[1]; - builder.setRequest(param.toByteString()); // For protobuf, {@code protocol} used when creating client side proxy is // the interface extending BlockingInterface, which has the annotations // such as ProtocolName etc. @@ -160,8 +148,7 @@ private RequestProto constructRpcRequest(Method method, // For PB this may limit the use of mixins on client side. builder.setDeclaringClassProtocolName(protocolName); builder.setClientProtocolVersion(clientProtocolVersion); - rpcRequest = builder.build(); - return rpcRequest; + return builder.build(); } /** @@ -189,8 +176,18 @@ public Object invoke(Object proxy, Method method, Object[] args) if (LOG.isDebugEnabled()) { startTime = Time.now(); } + + if (args.length != 2) { // RpcController + Message + throw new ServiceException("Too many parameters for request. Method: [" + + method.getName() + "]" + ", Expected: 2, Actual: " + + args.length); + } + if (args[1] == null) { + throw new ServiceException("null param while calling Method: [" + + method.getName() + "]"); + } - RequestProto rpcRequest = constructRpcRequest(method, args); + RequestHeaderProto rpcRequestHeader = constructRpcRequestHeader(method); RpcResponseWrapper val = null; if (LOG.isTraceEnabled()) { @@ -198,9 +195,12 @@ public Object invoke(Object proxy, Method method, Object[] args) remoteId + ": " + method.getName() + " {" + TextFormat.shortDebugString((Message) args[1]) + "}"); } + + + Message theRequest = (Message) args[1]; try { val = (RpcResponseWrapper) client.call(RPC.RpcKind.RPC_PROTOCOL_BUFFER, - new RpcRequestWrapper(rpcRequest), remoteId); + new RpcRequestWrapper(rpcRequestHeader, theRequest), remoteId); } catch (Throwable e) { if (LOG.isTraceEnabled()) { @@ -275,20 +275,25 @@ public ConnectionId getConnectionId() { * use type Writable as a wrapper to work across multiple RpcEngine kinds. */ private static class RpcRequestWrapper implements Writable { - RequestProto message; + RequestHeaderProto requestHeader; + Message theRequest; // for clientSide, the request is here + byte[] theRequestRead; // for server side, the request is here @SuppressWarnings("unused") public RpcRequestWrapper() { } - RpcRequestWrapper(RequestProto message) { - this.message = message; + RpcRequestWrapper(RequestHeaderProto requestHeader, Message theRequest) { + this.requestHeader = requestHeader; + this.theRequest = theRequest; } @Override public void write(DataOutput out) throws IOException { - ((Message)message).writeDelimitedTo( - DataOutputOutputStream.constructOutputStream(out)); + OutputStream os = DataOutputOutputStream.constructOutputStream(out); + + ((Message)requestHeader).writeDelimitedTo(os); + theRequest.writeDelimitedTo(os); } @Override @@ -296,13 +301,16 @@ public void readFields(DataInput in) throws IOException { int length = ProtoUtil.readRawVarint32(in); byte[] bytes = new byte[length]; in.readFully(bytes); - message = RequestProto.parseFrom(bytes); + requestHeader = RequestHeaderProto.parseFrom(bytes); + length = ProtoUtil.readRawVarint32(in); + theRequestRead = new byte[length]; + in.readFully(theRequestRead); } @Override public String toString() { - return message.getDeclaringClassProtocolName() + "." + - message.getMethodName(); + return requestHeader.getDeclaringClassProtocolName() + "." + + requestHeader.getMethodName(); } } @@ -434,7 +442,7 @@ private static ProtoClassProtoImpl getProtocolImpl(RPC.Server server, public Writable call(RPC.Server server, String connectionProtocolName, Writable writableRequest, long receiveTime) throws Exception { RpcRequestWrapper request = (RpcRequestWrapper) writableRequest; - RequestProto rpcRequest = request.message; + RequestHeaderProto rpcRequest = request.requestHeader; String methodName = rpcRequest.getMethodName(); @@ -474,7 +482,8 @@ public Writable call(RPC.Server server, String connectionProtocolName, } Message prototype = service.getRequestPrototype(methodDescriptor); Message param = prototype.newBuilderForType() - .mergeFrom(rpcRequest.getRequest()).build(); + .mergeFrom(request.theRequestRead).build(); + Message result; try { long startTime = Time.now(); diff --git a/hadoop-common-project/hadoop-common/src/main/proto/ProtobufRpcEngine.proto b/hadoop-common-project/hadoop-common/src/main/proto/ProtobufRpcEngine.proto index c0bb23587a..bb915f2802 100644 --- a/hadoop-common-project/hadoop-common/src/main/proto/ProtobufRpcEngine.proto +++ b/hadoop-common-project/hadoop-common/src/main/proto/ProtobufRpcEngine.proto @@ -1,4 +1,4 @@ -/** +/**DER * Licensed to the Apache Software Foundation (ASF) under one * or more contributor license agreements. See the NOTICE file * distributed with this work for additional information @@ -28,20 +28,17 @@ option java_generate_equals_and_hash = true; package hadoop.common; /** - * This message is used for Protobuf Rpc Engine. - * The message is used to marshal a Rpc-request - * from RPC client to the RPC server. + * This message is the header for the Protobuf Rpc Engine + * when sending a RPC request from RPC client to the RPC server. + * The actual request (serialized as protobuf) follows this request. * * No special header is needed for the Rpc Response for Protobuf Rpc Engine. * The normal RPC response header (see RpcHeader.proto) are sufficient. */ -message RequestProto { +message RequestHeaderProto { /** Name of the RPC method */ required string methodName = 1; - /** Bytes corresponding to the client protobuf request */ - optional bytes request = 2; - /** * RPCs for a particular interface (ie protocol) are done using a * IPC connection that is setup using rpcProxy.