diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 16f0db0777..78197c6ba9 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -8,6 +8,12 @@ Trunk (Unreleased) FSDataOutputStream.sync() and Syncable.sync(). (szetszwo) HADOOP-8886. Remove KFS support. (eli) + + HADOOP-9163 The rpc msg in ProtobufRpcEngine.proto should be moved out to + avoid an extra copy (Sanjay Radia) + + HADOOP-9151 Include RPC error info in RpcResponseHeader instead of sending + it separately (sanjay Radia) NEW FEATURES @@ -157,8 +163,6 @@ Trunk (Unreleased) HADOOP-9112. test-patch should -1 for @Tests without a timeout (Surenkumar Nihalani via bobby) - HADOOP-9163 The rpc msg in ProtobufRpcEngine.proto should be moved out to - avoid an extra copy (Sanjay Radia) BUG FIXES diff --git a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml index a11383cd03..98dc0813fc 100644 --- a/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml +++ b/hadoop-common-project/hadoop-common/dev-support/findbugsExcludeFile.xml @@ -308,4 +308,11 @@ + + + + + + + diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 24a84c3566..02d41e5349 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -59,7 +59,6 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.io.Writable; -import org.apache.hadoop.io.WritableUtils; import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; @@ -945,31 +944,38 @@ private void receiveRpcResponse() { touch(); try { - RpcResponseHeaderProto response = + RpcResponseHeaderProto header = RpcResponseHeaderProto.parseDelimitedFrom(in); - if (response == null) { + if (header == null) { throw new IOException("Response is null."); } - int callId = response.getCallId(); + int callId = header.getCallId(); if (LOG.isDebugEnabled()) LOG.debug(getName() + " got value #" + callId); Call call = calls.get(callId); - RpcStatusProto status = response.getStatus(); + RpcStatusProto status = header.getStatus(); if (status == RpcStatusProto.SUCCESS) { Writable value = ReflectionUtils.newInstance(valueClass, conf); value.readFields(in); // read value call.setRpcResponse(value); calls.remove(callId); - } else if (status == RpcStatusProto.ERROR) { - call.setException(new RemoteException(WritableUtils.readString(in), - WritableUtils.readString(in))); - calls.remove(callId); - } else if (status == RpcStatusProto.FATAL) { - // Close the connection - markClosed(new RemoteException(WritableUtils.readString(in), - WritableUtils.readString(in))); + } else { // Rpc Request failed + final String exceptionClassName = header.hasExceptionClassName() ? + header.getExceptionClassName() : + "ServerDidNotSetExceptionClassName"; + final String errorMsg = header.hasErrorMsg() ? + header.getErrorMsg() : "ServerDidNotSetErrorMsg" ; + RemoteException re = + new RemoteException(exceptionClassName, errorMsg); + if (status == RpcStatusProto.ERROR) { + call.setException(re); + calls.remove(callId); + } else if (status == RpcStatusProto.FATAL) { + // Close the connection + markClosed(re); + } } } catch (IOException e) { markClosed(e); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index a859138fd9..2726c05b32 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -2001,6 +2001,7 @@ private void setupResponse(ByteArrayOutputStream responseBuf, RpcResponseHeaderProto.newBuilder(); response.setCallId(call.callId); response.setStatus(status); + response.setServerIpcVersionNum(Server.CURRENT_VERSION); if (status == RpcStatusProto.SUCCESS) { @@ -2017,13 +2018,10 @@ private void setupResponse(ByteArrayOutputStream responseBuf, StringUtils.stringifyException(t)); return; } - } else { - if (status == RpcStatusProto.FATAL) { - response.setServerIpcVersionNum(Server.CURRENT_VERSION); - } + } else { // Rpc Failure + response.setExceptionClassName(errorClass); + response.setErrorMsg(error); response.build().writeDelimitedTo(out); - WritableUtils.writeString(out, errorClass); - WritableUtils.writeString(out, error); } if (call.connection.useWrap) { wrapWithSasl(responseBuf, call); diff --git a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto index 723434bf2f..82bd4cbc85 100644 --- a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto +++ b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto @@ -70,12 +70,11 @@ message RpcRequestHeaderProto { // the header for the RpcRequest * | RpcResponseHeaderProto - serialized delimited ie has len | * +------------------------------------------------------------------+ * | if request is successful: | - * | - RpcResponse - The actual rpc response bytes | - * | This response is serialized based on RpcKindProto | + * | - RpcResponse - The actual rpc response bytes follow | + * the response header | + * | This response is serialized based on RpcKindProto | * | if request fails : | - * | - length (4 byte int) + Class name of exception - UTF-8 string | - * | - length (4 byte int) + Stacktrace - UTF-8 string | - * | if the strings are null then the length is -1 | + * | The rpc response header contains the necessary info | * +------------------------------------------------------------------+ * */ @@ -88,5 +87,7 @@ message RpcResponseHeaderProto { required uint32 callId = 1; // callId used in Request required RpcStatusProto status = 2; - optional uint32 serverIpcVersionNum = 3; // in case of an fatal IPC error + optional uint32 serverIpcVersionNum = 3; // Sent if success or fail + optional string exceptionClassName = 4; // if request fails + optional string errorMsg = 5; // if request fails, often contains strack trace }