From 081eda94fec4bbf321047c93a9bb40be678b1666 Mon Sep 17 00:00:00 2001 From: Tsz-wo Sze Date: Fri, 23 Mar 2012 18:21:45 +0000 Subject: [PATCH] HADOOP-8184. ProtoBuf RPC engine uses the IPC layer reply packet. Contributed by Sanjay Radia git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1304542 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.txt | 3 + .../apache/hadoop/ipc/ProtobufRpcEngine.java | 64 +++++-------------- .../main/java/org/apache/hadoop/ipc/RPC.java | 4 +- .../java/org/apache/hadoop/ipc/Server.java | 4 +- .../src/main/proto/hadoop_rpc.proto | 41 ++---------- 5 files changed, 26 insertions(+), 90 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index a91d4df460..75ebdf13b3 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -211,6 +211,9 @@ Release 0.23.3 - UNRELEASED HADOOP-8200. Remove HADOOP_[JOBTRACKER|TASKTRACKER]_OPTS. (eli) + HADOOP-8184. ProtoBuf RPC engine uses the IPC layer reply packet. + (Sanjay Radia via szetszwo) + OPTIMIZATIONS BUG FIXES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index ff2c1a4a53..ffeda175d7 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -39,15 +39,12 @@ import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.RPC.RpcInvoker; import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; -import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcExceptionProto; + import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcRequestProto; -import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcResponseProto; -import org.apache.hadoop.ipc.protobuf.HadoopRpcProtos.HadoopRpcResponseProto.ResponseStatus; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.util.ProtoUtil; -import org.apache.hadoop.util.StringUtils; import com.google.common.annotations.VisibleForTesting; import com.google.protobuf.BlockingService; @@ -191,21 +188,11 @@ public Object invoke(Object proxy, Method method, Object[] args) throw new ServiceException(e); } - HadoopRpcResponseProto response = val.message; if (LOG.isDebugEnabled()) { long callTime = System.currentTimeMillis() - startTime; LOG.debug("Call: " + method.getName() + " " + callTime); } - - // Wrap the received message - ResponseStatus status = response.getStatus(); - if (status != ResponseStatus.SUCCESS) { - RemoteException re = new RemoteException(response.getException() - .getExceptionName(), response.getException().getStackTrace()); - re.fillInStackTrace(); - throw new ServiceException(re); - } - + Message prototype = null; try { prototype = getReturnProtoType(method); @@ -215,7 +202,7 @@ public Object invoke(Object proxy, Method method, Object[] args) Message returnMessage; try { returnMessage = prototype.newBuilderForType() - .mergeFrom(response.getResponse()).build(); + .mergeFrom(val.responseMessage).build(); } catch (Throwable e) { throw new ServiceException(e); } @@ -287,28 +274,28 @@ public void readFields(DataInput in) throws IOException { * Writable Wrapper for Protocol Buffer Responses */ private static class RpcResponseWritable implements Writable { - HadoopRpcResponseProto message; + byte[] responseMessage; @SuppressWarnings("unused") public RpcResponseWritable() { } - public RpcResponseWritable(HadoopRpcResponseProto message) { - this.message = message; + public RpcResponseWritable(Message message) { + this.responseMessage = message.toByteArray(); } @Override public void write(DataOutput out) throws IOException { - ((Message)message).writeDelimitedTo( - DataOutputOutputStream.constructOutputStream(out)); + out.writeInt(responseMessage.length); + out.write(responseMessage); } @Override public void readFields(DataInput in) throws IOException { - int length = ProtoUtil.readRawVarint32(in); + int length = in.readInt(); byte[] bytes = new byte[length]; in.readFully(bytes); - message = HadoopRpcResponseProto.parseFrom(bytes); + responseMessage = bytes; } } @@ -356,24 +343,6 @@ numReaders, queueSizePerHandler, conf, classNameBase(protocolImpl registerProtocolAndImpl(RpcKind.RPC_PROTOCOL_BUFFER, protocolClass, protocolImpl); } - - private static RpcResponseWritable handleException(Throwable e) { - HadoopRpcExceptionProto exception = HadoopRpcExceptionProto.newBuilder() - .setExceptionName(e.getClass().getName()) - .setStackTrace(StringUtils.stringifyException(e)).build(); - HadoopRpcResponseProto response = HadoopRpcResponseProto.newBuilder() - .setStatus(ResponseStatus.ERRROR).setException(exception).build(); - return new RpcResponseWritable(response); - } - - private static HadoopRpcResponseProto constructProtoSpecificRpcSuccessResponse( - Message message) { - HadoopRpcResponseProto res = HadoopRpcResponseProto.newBuilder() - .setResponse(message.toByteString()) - .setStatus(ResponseStatus.SUCCESS) - .build(); - return res; - } /** * Protobuf invoker for {@link RpcInvoker} @@ -418,7 +387,7 @@ private static ProtoClassProtoImpl getProtocolImpl(RPC.Server server, * */ public Writable call(RPC.Server server, String protocol, - Writable writableRequest, long receiveTime) throws IOException { + Writable writableRequest, long receiveTime) throws Exception { RpcRequestWritable request = (RpcRequestWritable) writableRequest; HadoopRpcRequestProto rpcRequest = request.message; String methodName = rpcRequest.getMethodName(); @@ -436,7 +405,7 @@ public Writable call(RPC.Server server, String protocol, String msg = "Unknown method " + methodName + " called on " + protocol + " protocol."; LOG.warn(msg); - return handleException(new RpcServerException(msg)); + throw new RpcServerException(msg); } Message prototype = service.getRequestPrototype(methodDescriptor); Message param = prototype.newBuilderForType() @@ -457,14 +426,11 @@ public Writable call(RPC.Server server, String protocol, server.rpcDetailedMetrics.addProcessingTime(methodName, processingTime); } catch (ServiceException e) { - Throwable cause = e.getCause(); - return handleException(cause != null ? cause : e); + throw (Exception) e.getCause(); } catch (Exception e) { - return handleException(e); + throw e; } - - HadoopRpcResponseProto response = constructProtoSpecificRpcSuccessResponse(result); - return new RpcResponseWritable(response); + return new RpcResponseWritable(result); } } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java index eee364ccde..303b3aac3a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java @@ -85,7 +85,7 @@ interface RpcInvoker { * @throws IOException **/ public Writable call(Server server, String protocol, - Writable rpcRequest, long receiveTime) throws IOException ; + Writable rpcRequest, long receiveTime) throws Exception ; } static final Log LOG = LogFactory.getLog(RPC.class); @@ -880,7 +880,7 @@ public Server addProtocol(RpcKind rpcKind, Class protocolClass, @Override public Writable call(RpcKind rpcKind, String protocol, - Writable rpcRequest, long receiveTime) throws IOException { + Writable rpcRequest, long receiveTime) throws Exception { return getRpcInvoker(rpcKind).call(this, protocol, rpcRequest, receiveTime); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 21d5c0ab02..f11224c1d2 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -1952,13 +1952,13 @@ public synchronized InetSocketAddress getListenerAddress() { * Writable, long)} instead */ @Deprecated - public Writable call(Writable param, long receiveTime) throws IOException { + public Writable call(Writable param, long receiveTime) throws Exception { return call(RpcKind.RPC_BUILTIN, null, param, receiveTime); } /** Called for each call. */ public abstract Writable call(RpcKind rpcKind, String protocol, - Writable param, long receiveTime) throws IOException; + Writable param, long receiveTime) throws Exception; /** * Authorize the incoming client connection. diff --git a/hadoop-common-project/hadoop-common/src/main/proto/hadoop_rpc.proto b/hadoop-common-project/hadoop-common/src/main/proto/hadoop_rpc.proto index 7af8eae182..41d075cb73 100644 --- a/hadoop-common-project/hadoop-common/src/main/proto/hadoop_rpc.proto +++ b/hadoop-common-project/hadoop-common/src/main/proto/hadoop_rpc.proto @@ -25,8 +25,11 @@ option java_outer_classname = "HadoopRpcProtos"; option java_generate_equals_and_hash = true; /** - * Message used to marshal the client request + * This message is used for Protobuf Rpc Engine. + * The message is used to marshal a Rpc-request * from RPC client to the RPC server. + * The Response to the Rpc call (including errors) are handled + * as part of the standard Rpc response. */ message HadoopRpcRequestProto { /** Name of the RPC method */ @@ -41,39 +44,3 @@ message HadoopRpcRequestProto { /** protocol version of class declaring the called method */ required uint64 clientProtocolVersion = 4; } - -/** - * At the RPC layer, this message is used to indicate - * the server side exception the the RPC client. - * - * Hadoop RPC client throws an exception indicated - * by exceptionName with the stackTrace. - */ -message HadoopRpcExceptionProto { - /** Class name of the exception thrown from the server */ - - optional string exceptionName = 1; - /** Exception stack trace from the server side */ - optional string stackTrace = 2; -} - -/** - * This message is used to marshal the response from - * RPC server to the client. - */ -message HadoopRpcResponseProto { - /** Status of IPC call */ - enum ResponseStatus { - SUCCESS = 1; - ERRROR = 2; - } - - required ResponseStatus status = 1; - - // Protobuf response payload from the server, when status is SUCCESS. - optional bytes response = 2; - - // Exception when status is ERROR or FATAL - optional HadoopRpcExceptionProto exception = 3; -} -