diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index a389b1b0b0..a97d7ddfeb 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -475,6 +475,9 @@ Release 2.1.0-beta - 2013-07-02 HADOOP-9717. Add retry attempt count to the RPC requests. (jing9) + HADOOP-9751. Add clientId and retryCount to RpcResponseHeaderProto. + (szetszwo) + OPTIMIZATIONS HADOOP-9150. Avoid unnecessary DNS resolution attempts for logical URIs diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 99a721abee..64e401e605 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -35,6 +35,7 @@ import java.net.SocketTimeoutException; import java.net.UnknownHostException; import java.security.PrivilegedExceptionAction; +import java.util.Arrays; import java.util.Hashtable; import java.util.Iterator; import java.util.Map.Entry; @@ -276,6 +277,24 @@ synchronized boolean isZeroReference() { return refCount==0; } + /** Check the rpc response header. */ + void checkResponse(RpcResponseHeaderProto header) throws IOException { + if (header == null) { + throw new IOException("Response is null."); + } + if (header.hasClientId()) { + // check client IDs + final byte[] id = header.getClientId().toByteArray(); + if (!Arrays.equals(id, RpcConstants.DUMMY_CLIENT_ID)) { + if (!Arrays.equals(id, clientId)) { + throw new IOException("Client IDs not matched: local ID=" + + StringUtils.byteToHexString(clientId) + ", ID in reponse=" + + StringUtils.byteToHexString(header.getClientId().toByteArray())); + } + } + } + } + Call createCall(RPC.RpcKind rpcKind, Writable rpcRequest) { return new Call(rpcKind, rpcRequest); } @@ -1054,9 +1073,8 @@ private void receiveRpcResponse() { int totalLen = in.readInt(); RpcResponseHeaderProto header = RpcResponseHeaderProto.parseDelimitedFrom(in); - if (header == null) { - throw new IOException("Response is null."); - } + checkResponse(header); + int headerLen = header.getSerializedSize(); headerLen += CodedOutputStream.computeRawVarint32Size(headerLen); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 7812ecda8e..9a189e941a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -2289,7 +2289,9 @@ private void setupResponse(ByteArrayOutputStream responseBuf, DataOutputStream out = new DataOutputStream(responseBuf); RpcResponseHeaderProto.Builder headerBuilder = RpcResponseHeaderProto.newBuilder(); + headerBuilder.setClientId(ByteString.copyFrom(call.clientId)); headerBuilder.setCallId(call.callId); + headerBuilder.setRetryCount(call.retryCount); headerBuilder.setStatus(status); headerBuilder.setServerIpcVersionNum(CURRENT_VERSION); diff --git a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto index 6b013b5900..2f61d98682 100644 --- a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto +++ b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto @@ -130,6 +130,8 @@ message RpcResponseHeaderProto { optional string exceptionClassName = 4; // if request fails optional string errorMsg = 5; // if request fails, often contains strack trace optional RpcErrorCodeProto errorDetail = 6; // in case of error + optional bytes clientId = 7; // Globally unique client ID + optional sint32 retryCount = 8 [default = -1]; } message RpcSaslProto { @@ -153,4 +155,4 @@ message RpcSaslProto { required SaslState state = 2; optional bytes token = 3; repeated SaslAuth auths = 4; -} \ No newline at end of file +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index d8544e0466..2b33f5210f 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -60,6 +60,7 @@ import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.RPC.RpcKind; import org.apache.hadoop.ipc.Server.Connection; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto; import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.StringUtils; @@ -708,33 +709,45 @@ public void testConnectionRetriesOnSocketTimeoutExceptions() throws Exception { assertRetriesOnSocketTimeouts(conf, 4); } - private static class CallId { + private static class CallInfo { int id = RpcConstants.INVALID_CALL_ID; + int retry = RpcConstants.INVALID_RETRY_COUNT; } /** - * Test if the rpc server uses the call id generated by the rpc client. + * Test if + * (1) the rpc server uses the call id/retry provided by the rpc client, and + * (2) the rpc client receives the same call id/retry from the rpc server. */ @Test - public void testCallIds() throws Exception { - final CallId callId = new CallId(); + public void testCallIdAndRetry() throws Exception { + final CallInfo info = new CallInfo(); - // Override client to store the call id + // Override client to store the call info and check response final Client client = new Client(LongWritable.class, conf) { @Override Call createCall(RpcKind rpcKind, Writable rpcRequest) { final Call call = super.createCall(rpcKind, rpcRequest); - callId.id = call.id; + info.id = call.id; + info.retry = call.retry; return call; } + + @Override + void checkResponse(RpcResponseHeaderProto header) throws IOException { + super.checkResponse(header); + Assert.assertEquals(info.id, header.getCallId()); + Assert.assertEquals(info.retry, header.getRetryCount()); + } }; - // Attach a listener that tracks every call ID received by the server. + // Attach a listener that tracks every call received by the server. final TestServer server = new TestServer(1, false); server.callListener = new Runnable() { @Override public void run() { - Assert.assertEquals(callId.id, Server.getCallId()); + Assert.assertEquals(info.id, Server.getCallId()); + Assert.assertEquals(info.retry, Server.getCallRetryCount()); } };