HADOOP-9751. Add clientId and retryCount to RpcResponseHeaderProto.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1505036 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
3c6e5b900f
commit
9ff01d6261
@ -475,6 +475,9 @@ Release 2.1.0-beta - 2013-07-02
|
|||||||
|
|
||||||
HADOOP-9717. Add retry attempt count to the RPC requests. (jing9)
|
HADOOP-9717. Add retry attempt count to the RPC requests. (jing9)
|
||||||
|
|
||||||
|
HADOOP-9751. Add clientId and retryCount to RpcResponseHeaderProto.
|
||||||
|
(szetszwo)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
HADOOP-9150. Avoid unnecessary DNS resolution attempts for logical URIs
|
HADOOP-9150. Avoid unnecessary DNS resolution attempts for logical URIs
|
||||||
|
@ -35,6 +35,7 @@
|
|||||||
import java.net.SocketTimeoutException;
|
import java.net.SocketTimeoutException;
|
||||||
import java.net.UnknownHostException;
|
import java.net.UnknownHostException;
|
||||||
import java.security.PrivilegedExceptionAction;
|
import java.security.PrivilegedExceptionAction;
|
||||||
|
import java.util.Arrays;
|
||||||
import java.util.Hashtable;
|
import java.util.Hashtable;
|
||||||
import java.util.Iterator;
|
import java.util.Iterator;
|
||||||
import java.util.Map.Entry;
|
import java.util.Map.Entry;
|
||||||
@ -276,6 +277,24 @@ synchronized boolean isZeroReference() {
|
|||||||
return refCount==0;
|
return refCount==0;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/** Check the rpc response header. */
|
||||||
|
void checkResponse(RpcResponseHeaderProto header) throws IOException {
|
||||||
|
if (header == null) {
|
||||||
|
throw new IOException("Response is null.");
|
||||||
|
}
|
||||||
|
if (header.hasClientId()) {
|
||||||
|
// check client IDs
|
||||||
|
final byte[] id = header.getClientId().toByteArray();
|
||||||
|
if (!Arrays.equals(id, RpcConstants.DUMMY_CLIENT_ID)) {
|
||||||
|
if (!Arrays.equals(id, clientId)) {
|
||||||
|
throw new IOException("Client IDs not matched: local ID="
|
||||||
|
+ StringUtils.byteToHexString(clientId) + ", ID in reponse="
|
||||||
|
+ StringUtils.byteToHexString(header.getClientId().toByteArray()));
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
Call createCall(RPC.RpcKind rpcKind, Writable rpcRequest) {
|
Call createCall(RPC.RpcKind rpcKind, Writable rpcRequest) {
|
||||||
return new Call(rpcKind, rpcRequest);
|
return new Call(rpcKind, rpcRequest);
|
||||||
}
|
}
|
||||||
@ -1054,9 +1073,8 @@ private void receiveRpcResponse() {
|
|||||||
int totalLen = in.readInt();
|
int totalLen = in.readInt();
|
||||||
RpcResponseHeaderProto header =
|
RpcResponseHeaderProto header =
|
||||||
RpcResponseHeaderProto.parseDelimitedFrom(in);
|
RpcResponseHeaderProto.parseDelimitedFrom(in);
|
||||||
if (header == null) {
|
checkResponse(header);
|
||||||
throw new IOException("Response is null.");
|
|
||||||
}
|
|
||||||
int headerLen = header.getSerializedSize();
|
int headerLen = header.getSerializedSize();
|
||||||
headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);
|
headerLen += CodedOutputStream.computeRawVarint32Size(headerLen);
|
||||||
|
|
||||||
|
@ -2289,7 +2289,9 @@ private void setupResponse(ByteArrayOutputStream responseBuf,
|
|||||||
DataOutputStream out = new DataOutputStream(responseBuf);
|
DataOutputStream out = new DataOutputStream(responseBuf);
|
||||||
RpcResponseHeaderProto.Builder headerBuilder =
|
RpcResponseHeaderProto.Builder headerBuilder =
|
||||||
RpcResponseHeaderProto.newBuilder();
|
RpcResponseHeaderProto.newBuilder();
|
||||||
|
headerBuilder.setClientId(ByteString.copyFrom(call.clientId));
|
||||||
headerBuilder.setCallId(call.callId);
|
headerBuilder.setCallId(call.callId);
|
||||||
|
headerBuilder.setRetryCount(call.retryCount);
|
||||||
headerBuilder.setStatus(status);
|
headerBuilder.setStatus(status);
|
||||||
headerBuilder.setServerIpcVersionNum(CURRENT_VERSION);
|
headerBuilder.setServerIpcVersionNum(CURRENT_VERSION);
|
||||||
|
|
||||||
|
@ -130,6 +130,8 @@ message RpcResponseHeaderProto {
|
|||||||
optional string exceptionClassName = 4; // if request fails
|
optional string exceptionClassName = 4; // if request fails
|
||||||
optional string errorMsg = 5; // if request fails, often contains strack trace
|
optional string errorMsg = 5; // if request fails, often contains strack trace
|
||||||
optional RpcErrorCodeProto errorDetail = 6; // in case of error
|
optional RpcErrorCodeProto errorDetail = 6; // in case of error
|
||||||
|
optional bytes clientId = 7; // Globally unique client ID
|
||||||
|
optional sint32 retryCount = 8 [default = -1];
|
||||||
}
|
}
|
||||||
|
|
||||||
message RpcSaslProto {
|
message RpcSaslProto {
|
||||||
@ -153,4 +155,4 @@ message RpcSaslProto {
|
|||||||
required SaslState state = 2;
|
required SaslState state = 2;
|
||||||
optional bytes token = 3;
|
optional bytes token = 3;
|
||||||
repeated SaslAuth auths = 4;
|
repeated SaslAuth auths = 4;
|
||||||
}
|
}
|
||||||
|
@ -60,6 +60,7 @@
|
|||||||
import org.apache.hadoop.ipc.Client.ConnectionId;
|
import org.apache.hadoop.ipc.Client.ConnectionId;
|
||||||
import org.apache.hadoop.ipc.RPC.RpcKind;
|
import org.apache.hadoop.ipc.RPC.RpcKind;
|
||||||
import org.apache.hadoop.ipc.Server.Connection;
|
import org.apache.hadoop.ipc.Server.Connection;
|
||||||
|
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto;
|
||||||
import org.apache.hadoop.net.ConnectTimeoutException;
|
import org.apache.hadoop.net.ConnectTimeoutException;
|
||||||
import org.apache.hadoop.net.NetUtils;
|
import org.apache.hadoop.net.NetUtils;
|
||||||
import org.apache.hadoop.util.StringUtils;
|
import org.apache.hadoop.util.StringUtils;
|
||||||
@ -708,33 +709,45 @@ public void testConnectionRetriesOnSocketTimeoutExceptions() throws Exception {
|
|||||||
assertRetriesOnSocketTimeouts(conf, 4);
|
assertRetriesOnSocketTimeouts(conf, 4);
|
||||||
}
|
}
|
||||||
|
|
||||||
private static class CallId {
|
private static class CallInfo {
|
||||||
int id = RpcConstants.INVALID_CALL_ID;
|
int id = RpcConstants.INVALID_CALL_ID;
|
||||||
|
int retry = RpcConstants.INVALID_RETRY_COUNT;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Test if the rpc server uses the call id generated by the rpc client.
|
* Test if
|
||||||
|
* (1) the rpc server uses the call id/retry provided by the rpc client, and
|
||||||
|
* (2) the rpc client receives the same call id/retry from the rpc server.
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test
|
||||||
public void testCallIds() throws Exception {
|
public void testCallIdAndRetry() throws Exception {
|
||||||
final CallId callId = new CallId();
|
final CallInfo info = new CallInfo();
|
||||||
|
|
||||||
// Override client to store the call id
|
// Override client to store the call info and check response
|
||||||
final Client client = new Client(LongWritable.class, conf) {
|
final Client client = new Client(LongWritable.class, conf) {
|
||||||
@Override
|
@Override
|
||||||
Call createCall(RpcKind rpcKind, Writable rpcRequest) {
|
Call createCall(RpcKind rpcKind, Writable rpcRequest) {
|
||||||
final Call call = super.createCall(rpcKind, rpcRequest);
|
final Call call = super.createCall(rpcKind, rpcRequest);
|
||||||
callId.id = call.id;
|
info.id = call.id;
|
||||||
|
info.retry = call.retry;
|
||||||
return call;
|
return call;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
void checkResponse(RpcResponseHeaderProto header) throws IOException {
|
||||||
|
super.checkResponse(header);
|
||||||
|
Assert.assertEquals(info.id, header.getCallId());
|
||||||
|
Assert.assertEquals(info.retry, header.getRetryCount());
|
||||||
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
// Attach a listener that tracks every call ID received by the server.
|
// Attach a listener that tracks every call received by the server.
|
||||||
final TestServer server = new TestServer(1, false);
|
final TestServer server = new TestServer(1, false);
|
||||||
server.callListener = new Runnable() {
|
server.callListener = new Runnable() {
|
||||||
@Override
|
@Override
|
||||||
public void run() {
|
public void run() {
|
||||||
Assert.assertEquals(callId.id, Server.getCallId());
|
Assert.assertEquals(info.id, Server.getCallId());
|
||||||
|
Assert.assertEquals(info.retry, Server.getCallRetryCount());
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user