diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 85540ccdfd..0e173b21e9 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -15,6 +15,8 @@ Trunk (Unreleased) HADOOP-9151 Include RPC error info in RpcResponseHeader instead of sending it separately (sanjay Radia) + HADOOP-9380 Add totalLength to rpc response (sanjay Radia) + NEW FEATURES HADOOP-8561. Introduce HADOOP_PROXY_USER for secure impersonation in child diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 02d41e5349..5294aa3b94 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -83,6 +83,7 @@ import org.apache.hadoop.util.Time; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import com.google.protobuf.CodedOutputStream; /** A client for an IPC service. IPC calls take a single {@link Writable} as a * parameter, and return a {@link Writable} as their value. A service runs on @@ -242,7 +243,7 @@ public synchronized void setRpcResponse(Writable rpcResponse) { callComplete(); } - public synchronized Writable getRpcResult() { + public synchronized Writable getRpcResponse() { return rpcResponse; } } @@ -944,11 +945,14 @@ private void receiveRpcResponse() { touch(); try { + int totalLen = in.readInt(); RpcResponseHeaderProto header = RpcResponseHeaderProto.parseDelimitedFrom(in); if (header == null) { throw new IOException("Response is null."); } + int headerLen = header.getSerializedSize(); + headerLen += CodedOutputStream.computeRawVarint32Size(headerLen); int callId = header.getCallId(); if (LOG.isDebugEnabled()) @@ -961,11 +965,28 @@ private void receiveRpcResponse() { value.readFields(in); // read value call.setRpcResponse(value); calls.remove(callId); + + // verify that length was correct + // only for ProtobufEngine where len can be verified easily + if (call.getRpcResponse() instanceof ProtobufRpcEngine.RpcWrapper) { + ProtobufRpcEngine.RpcWrapper resWrapper = + (ProtobufRpcEngine.RpcWrapper) call.getRpcResponse(); + if (totalLen != headerLen + resWrapper.getLength()) { + throw new RpcClientException( + "RPC response length mismatch on rpc success"); + } + } } else { // Rpc Request failed - final String exceptionClassName = header.hasExceptionClassName() ? + // Verify that length was correct + if (totalLen != headerLen) { + throw new RpcClientException( + "RPC response length mismatch on rpc error"); + } + + final String exceptionClassName = header.hasExceptionClassName() ? header.getExceptionClassName() : "ServerDidNotSetExceptionClassName"; - final String errorMsg = header.hasErrorMsg() ? + final String errorMsg = header.hasErrorMsg() ? header.getErrorMsg() : "ServerDidNotSetErrorMsg" ; RemoteException re = new RemoteException(exceptionClassName, errorMsg); @@ -1251,7 +1272,7 @@ public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, call.error); } } else { - return call.getRpcResult(); + return call.getRpcResponse(); } } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index fefae53f36..376d4f1ee0 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -48,7 +48,9 @@ import org.apache.hadoop.util.Time; import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.AbstractMessageLite; import com.google.protobuf.BlockingService; +import com.google.protobuf.CodedOutputStream; import com.google.protobuf.Descriptors.MethodDescriptor; import com.google.protobuf.Message; import com.google.protobuf.ServiceException; @@ -226,7 +228,7 @@ public Object invoke(Object proxy, Method method, Object[] args) Message returnMessage; try { returnMessage = prototype.newBuilderForType() - .mergeFrom(val.responseMessage).build(); + .mergeFrom(val.theResponseRead).build(); if (LOG.isTraceEnabled()) { LOG.trace(Thread.currentThread().getId() + ": Response <- " + @@ -267,6 +269,9 @@ public ConnectionId getConnectionId() { } } + interface RpcWrapper extends Writable { + int getLength(); + } /** * Wrapper for Protocol Buffer Requests * @@ -274,7 +279,7 @@ public ConnectionId getConnectionId() { * Protobuf. Several methods on {@link org.apache.hadoop.ipc.Server and RPC} * use type Writable as a wrapper to work across multiple RpcEngine kinds. */ - private static class RpcRequestWrapper implements Writable { + private static class RpcRequestWrapper implements RpcWrapper { RequestHeaderProto requestHeader; Message theRequest; // for clientSide, the request is here byte[] theRequestRead; // for server side, the request is here @@ -312,6 +317,22 @@ public String toString() { return requestHeader.getDeclaringClassProtocolName() + "." + requestHeader.getMethodName(); } + + @Override + public int getLength() { + int headerLen = requestHeader.getSerializedSize(); + int reqLen; + if (theRequest != null) { + reqLen = theRequest.getSerializedSize(); + } else if (theRequestRead != null ) { + reqLen = theRequestRead.length; + } else { + throw new IllegalArgumentException( + "getLenght on uninilialized RpcWrapper"); + } + return CodedOutputStream.computeRawVarint32Size(headerLen) + headerLen + + CodedOutputStream.computeRawVarint32Size(reqLen) + reqLen; + } } /** @@ -321,29 +342,43 @@ public String toString() { * Protobuf. Several methods on {@link org.apache.hadoop.ipc.Server and RPC} * use type Writable as a wrapper to work across multiple RpcEngine kinds. */ - private static class RpcResponseWrapper implements Writable { - byte[] responseMessage; + private static class RpcResponseWrapper implements RpcWrapper { + Message theResponse; // for senderSide, the response is here + byte[] theResponseRead; // for receiver side, the response is here @SuppressWarnings("unused") public RpcResponseWrapper() { } public RpcResponseWrapper(Message message) { - this.responseMessage = message.toByteArray(); + this.theResponse = message; } @Override public void write(DataOutput out) throws IOException { - out.writeInt(responseMessage.length); - out.write(responseMessage); + OutputStream os = DataOutputOutputStream.constructOutputStream(out); + theResponse.writeDelimitedTo(os); } @Override public void readFields(DataInput in) throws IOException { - int length = in.readInt(); - byte[] bytes = new byte[length]; - in.readFully(bytes); - responseMessage = bytes; + int length = ProtoUtil.readRawVarint32(in); + theResponseRead = new byte[length]; + in.readFully(theResponseRead); + } + + @Override + public int getLength() { + int resLen; + if (theResponse != null) { + resLen = theResponse.getSerializedSize(); + } else if (theResponseRead != null ) { + resLen = theResponseRead.length; + } else { + throw new IllegalArgumentException( + "getLenght on uninilialized RpcWrapper"); + } + return CodedOutputStream.computeRawVarint32Size(resLen) + resLen; } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 2726c05b32..c44eb9426d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -73,6 +73,7 @@ import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.io.BytesWritable; +import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; @@ -107,6 +108,7 @@ import org.apache.hadoop.util.Time; import com.google.common.annotations.VisibleForTesting; +import com.google.protobuf.CodedOutputStream; /** An abstract IPC service. IPC calls take a single {@link Writable} as a * parameter, and return a {@link Writable} as their value. A service runs on @@ -202,7 +204,8 @@ static IpcSerializationType fromByte(byte b) { // 6 : Made RPC Request header explicit // 7 : Changed Ipc Connection Header to use Protocol buffers // 8 : SASL server always sends a final response - public static final byte CURRENT_VERSION = 8; + // 9 : Changes to protocol for HADOOP-8990 + public static final byte CURRENT_VERSION = 9; /** * Initial and max size of response buffer @@ -1512,10 +1515,15 @@ private void setupBadVersionResponse(int clientVersion) throws IOException { " cannot communicate with client version " + clientVersion; ByteArrayOutputStream buffer = new ByteArrayOutputStream(); - if (clientVersion >= 3) { + if (clientVersion >= 9) { + // Versions >>9 understand the normal response Call fakeCall = new Call(-1, null, this); - // Versions 3 and greater can interpret this exception - // response in the same manner + setupResponse(buffer, fakeCall, RpcStatusProto.FATAL, + null, VersionMismatch.class.getName(), errMsg); + responder.doRespond(fakeCall); + } else if (clientVersion >= 3) { + Call fakeCall = new Call(-1, null, this); + // Versions 3 to 8 use older response setupResponseOldVersionFatal(buffer, fakeCall, null, VersionMismatch.class.getName(), errMsg); @@ -1997,17 +2005,34 @@ private void setupResponse(ByteArrayOutputStream responseBuf, throws IOException { responseBuf.reset(); DataOutputStream out = new DataOutputStream(responseBuf); - RpcResponseHeaderProto.Builder response = + RpcResponseHeaderProto.Builder headerBuilder = RpcResponseHeaderProto.newBuilder(); - response.setCallId(call.callId); - response.setStatus(status); - response.setServerIpcVersionNum(Server.CURRENT_VERSION); - + headerBuilder.setCallId(call.callId); + headerBuilder.setStatus(status); + headerBuilder.setServerIpcVersionNum(Server.CURRENT_VERSION); if (status == RpcStatusProto.SUCCESS) { + RpcResponseHeaderProto header = headerBuilder.build(); + final int headerLen = header.getSerializedSize(); + int fullLength = CodedOutputStream.computeRawVarint32Size(headerLen) + + headerLen; try { - response.build().writeDelimitedTo(out); - rv.write(out); + if (rv instanceof ProtobufRpcEngine.RpcWrapper) { + ProtobufRpcEngine.RpcWrapper resWrapper = + (ProtobufRpcEngine.RpcWrapper) rv; + fullLength += resWrapper.getLength(); + out.writeInt(fullLength); + header.writeDelimitedTo(out); + rv.write(out); + } else { // Have to serialize to buffer to get len + final DataOutputBuffer buf = new DataOutputBuffer(); + rv.write(buf); + byte[] data = buf.getData(); + fullLength += buf.getLength(); + out.writeInt(fullLength); + header.writeDelimitedTo(out); + out.write(data, 0, buf.getLength()); + } } catch (Throwable t) { LOG.warn("Error serializing call response for call " + call, t); // Call back to same function - this is OK since the @@ -2019,9 +2044,14 @@ private void setupResponse(ByteArrayOutputStream responseBuf, return; } } else { // Rpc Failure - response.setExceptionClassName(errorClass); - response.setErrorMsg(error); - response.build().writeDelimitedTo(out); + headerBuilder.setExceptionClassName(errorClass); + headerBuilder.setErrorMsg(error); + RpcResponseHeaderProto header = headerBuilder.build(); + int headerLen = header.getSerializedSize(); + final int fullLength = + CodedOutputStream.computeRawVarint32Size(headerLen) + headerLen; + out.writeInt(fullLength); + header.writeDelimitedTo(out); } if (call.connection.useWrap) { wrapWithSasl(responseBuf, call);