HADOOP-9717. Add retry attempt count to the RPC requests. Contributed by Jing Zhao.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1504725 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jing Zhao 2013-07-18 23:44:06 +00:00
parent ba3d29a2e2
commit 8724ceb235
10 changed files with 219 additions and 28 deletions

View File

@ -483,6 +483,8 @@ Release 2.1.0-beta - 2013-07-02
HADOOP-9716. Rpc retries should use the same call ID as the original call.
(szetszwo)
HADOOP-9717. Add retry attempt count to the RPC requests. (jing9)
OPTIMIZATIONS
HADOOP-9150. Avoid unnecessary DNS resolution attempts for logical URIs

View File

@ -35,6 +35,8 @@
import org.apache.hadoop.ipc.RpcInvocationHandler;
import org.apache.hadoop.util.ThreadUtil;
import com.google.common.base.Preconditions;
class RetryInvocationHandler implements RpcInvocationHandler {
public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
private final FailoverProxyProvider proxyProvider;
@ -86,7 +88,7 @@ public Object invoke(Object proxy, Method method, Object[] args)
}
if (isRpc) {
Client.setCallId(callId);
Client.setCallIdAndRetryCount(callId, retries);
}
try {
Object ret = invokeMethod(method, args);
@ -96,8 +98,8 @@ public Object invoke(Object proxy, Method method, Object[] args)
boolean isMethodIdempotent = proxyProvider.getInterface()
.getMethod(method.getName(), method.getParameterTypes())
.isAnnotationPresent(Idempotent.class);
RetryAction action = policy.shouldRetry(e, retries++, invocationFailoverCount,
isMethodIdempotent);
RetryAction action = policy.shouldRetry(e, retries++,
invocationFailoverCount, isMethodIdempotent);
if (action.action == RetryAction.RetryDecision.FAIL) {
if (action.reason != null) {
LOG.warn("Exception while invoking " +

View File

@ -111,12 +111,16 @@ public class Client {
private static final AtomicInteger callIdCounter = new AtomicInteger();
private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
/** Set call id for the next call. */
public static void setCallId(int cid) {
/** Set call id and retry count for the next call. */
public static void setCallIdAndRetryCount(int cid, int rc) {
Preconditions.checkArgument(cid != RpcConstants.INVALID_CALL_ID);
Preconditions.checkState(callId.get() == null);
Preconditions.checkArgument(rc != RpcConstants.INVALID_RETRY_COUNT);
callId.set(cid);
retryCount.set(rc);
}
private Hashtable<ConnectionId, Connection> connections =
@ -281,6 +285,7 @@ Call createCall(RPC.RpcKind rpcKind, Writable rpcRequest) {
*/
static class Call {
final int id; // call id
final int retry; // retry count
final Writable rpcRequest; // the serialized rpc request
Writable rpcResponse; // null if rpc has error
IOException error; // exception, null if success
@ -298,6 +303,13 @@ private Call(RPC.RpcKind rpcKind, Writable param) {
callId.set(null);
this.id = id;
}
final Integer rc = retryCount.get();
if (rc == null) {
this.retry = 0;
} else {
this.retry = rc;
}
}
/** Indicate when the call is complete and the
@ -865,10 +877,10 @@ private void writeConnectionContext(ConnectionId remoteId,
RPC.getProtocolName(remoteId.getProtocol()),
remoteId.getTicket(),
authMethod);
RpcRequestHeaderProto connectionContextHeader =
ProtoUtil.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
RpcRequestHeaderProto connectionContextHeader = ProtoUtil
.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
OperationProto.RPC_FINAL_PACKET, CONNECTION_CONTEXT_CALL_ID,
clientId);
RpcConstants.INVALID_RETRY_COUNT, clientId);
RpcRequestMessageWrapper request =
new RpcRequestMessageWrapper(connectionContextHeader, message);
@ -976,7 +988,8 @@ public void sendRpcRequest(final Call call)
// Items '1' and '2' are prepared here.
final DataOutputBuffer d = new DataOutputBuffer();
RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, clientId);
call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry,
clientId);
header.writeDelimitedTo(d);
call.rpcRequest.write(d);

View File

@ -35,6 +35,8 @@ private RpcConstants() {
public static final int CONNECTION_CONTEXT_CALL_ID = -3;
public static final int INVALID_RETRY_COUNT = -1;
/**
* The first four bytes of Hadoop RPC connections
*/

View File

@ -282,6 +282,15 @@ static int getCallId() {
Call call = CurCall.get();
return call != null ? call.callId : RpcConstants.INVALID_CALL_ID;
}
/**
* @return The current active RPC call's retry count. -1 indicates the retry
* cache is not supported in the client side.
*/
public static int getCallRetryCount() {
Call call = CurCall.get();
return call != null ? call.retryCount : RpcConstants.INVALID_RETRY_COUNT;
}
/** Returns the remote side ip address when invoked inside an RPC
* Returns null incase of an error.
@ -456,6 +465,7 @@ public ServiceAuthorizationManager getServiceAuthorizationManager() {
/** A call queued for handling. */
private static class Call {
private final int callId; // the client's call id
private final int retryCount; // the retry count of the call
private final Writable rpcRequest; // Serialized Rpc request from client
private final Connection connection; // connection to client
private long timestamp; // time received when response is null
@ -464,14 +474,16 @@ private static class Call {
private final RPC.RpcKind rpcKind;
private final byte[] clientId;
private Call(int id, Writable param, Connection connection) {
this(id, param, connection, RPC.RpcKind.RPC_BUILTIN,
private Call(int id, int retryCount, Writable param,
Connection connection) {
this(id, retryCount, param, connection, RPC.RpcKind.RPC_BUILTIN,
RpcConstants.DUMMY_CLIENT_ID);
}
private Call(int id, Writable param, Connection connection,
private Call(int id, int retryCount, Writable param, Connection connection,
RPC.RpcKind kind, byte[] clientId) {
this.callId = id;
this.retryCount = retryCount;
this.rpcRequest = param;
this.connection = connection;
this.timestamp = Time.now();
@ -482,7 +494,8 @@ private Call(int id, Writable param, Connection connection,
@Override
public String toString() {
return rpcRequest + " from " + connection + " Call#" + callId;
return rpcRequest + " from " + connection + " Call#" + callId + " Retry#"
+ retryCount;
}
public void setResponse(ByteBuffer response) {
@ -1162,11 +1175,12 @@ public class Connection {
// Fake 'call' for failed authorization response
private static final int AUTHORIZATION_FAILED_CALLID = -1;
private final Call authFailedCall =
new Call(AUTHORIZATION_FAILED_CALLID, null, this);
private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID,
RpcConstants.INVALID_RETRY_COUNT, null, this);
private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream();
private final Call saslCall = new Call(AuthProtocol.SASL.callId, null, this);
private final Call saslCall = new Call(AuthProtocol.SASL.callId,
RpcConstants.INVALID_RETRY_COUNT, null, this);
private final ByteArrayOutputStream saslResponse = new ByteArrayOutputStream();
private boolean sentNegotiate = false;
@ -1594,20 +1608,23 @@ private void setupBadVersionResponse(int clientVersion) throws IOException {
if (clientVersion >= 9) {
// Versions >>9 understand the normal response
Call fakeCall = new Call(-1, null, this);
Call fakeCall = new Call(-1, RpcConstants.INVALID_RETRY_COUNT, null,
this);
setupResponse(buffer, fakeCall,
RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_VERSION_MISMATCH,
null, VersionMismatch.class.getName(), errMsg);
responder.doRespond(fakeCall);
} else if (clientVersion >= 3) {
Call fakeCall = new Call(-1, null, this);
Call fakeCall = new Call(-1, RpcConstants.INVALID_RETRY_COUNT, null,
this);
// Versions 3 to 8 use older response
setupResponseOldVersionFatal(buffer, fakeCall,
null, VersionMismatch.class.getName(), errMsg);
responder.doRespond(fakeCall);
} else if (clientVersion == 2) { // Hadoop 0.18.3
Call fakeCall = new Call(0, null, this);
Call fakeCall = new Call(0, RpcConstants.INVALID_RETRY_COUNT, null,
this);
DataOutputStream out = new DataOutputStream(buffer);
out.writeInt(0); // call ID
out.writeBoolean(true); // error
@ -1620,7 +1637,7 @@ private void setupBadVersionResponse(int clientVersion) throws IOException {
}
private void setupHttpRequestOnIpcPortResponse() throws IOException {
Call fakeCall = new Call(0, null, this);
Call fakeCall = new Call(0, RpcConstants.INVALID_RETRY_COUNT, null, this);
fakeCall.setResponse(ByteBuffer.wrap(
RECEIVED_HTTP_REQ_RESPONSE.getBytes()));
responder.doRespond(fakeCall);
@ -1752,12 +1769,14 @@ private void unwrapPacketAndProcessRpcs(byte[] inBuf) throws IOException,
private void processOneRpc(byte[] buf)
throws IOException, WrappedRpcServerException, InterruptedException {
int callId = -1;
int retry = RpcConstants.INVALID_RETRY_COUNT;
try {
final DataInputStream dis =
new DataInputStream(new ByteArrayInputStream(buf));
final RpcRequestHeaderProto header =
decodeProtobufFromStream(RpcRequestHeaderProto.newBuilder(), dis);
callId = header.getCallId();
retry = header.getRetryCount();
if (LOG.isDebugEnabled()) {
LOG.debug(" got #" + callId);
}
@ -1774,7 +1793,7 @@ private void processOneRpc(byte[] buf)
}
} catch (WrappedRpcServerException wrse) { // inform client of error
Throwable ioe = wrse.getCause();
final Call call = new Call(callId, null, this);
final Call call = new Call(callId, retry, null, this);
setupResponse(authFailedResponse, call,
RpcStatusProto.FATAL, wrse.getRpcErrorCodeProto(), null,
ioe.getClass().getName(), ioe.getMessage());
@ -1848,9 +1867,9 @@ private void processRpcRequest(RpcRequestHeaderProto header,
RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, err);
}
Call call = new Call(header.getCallId(), rpcRequest, this,
ProtoUtil.convert(header.getRpcKind()), header.getClientId()
.toByteArray());
Call call = new Call(header.getCallId(), header.getRetryCount(),
rpcRequest, this, ProtoUtil.convert(header.getRpcKind()), header
.getClientId().toByteArray());
callQueue.put(call); // queue the call; maybe blocked here
incRpcCount(); // Increment the rpc count
}

View File

@ -75,7 +75,7 @@ public class SaslRpcClient {
private static final RpcRequestHeaderProto saslHeader = ProtoUtil
.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
OperationProto.RPC_FINAL_PACKET, AuthProtocol.SASL.callId,
RpcConstants.DUMMY_CLIENT_ID);
RpcConstants.INVALID_RETRY_COUNT, RpcConstants.DUMMY_CLIENT_ID);
private static final RpcSaslProto negotiateRequest =
RpcSaslProto.newBuilder().setState(SaslState.NEGOTIATE).build();

View File

@ -160,10 +160,11 @@ public static RPC.RpcKind convert( RpcKindProto kind) {
}
public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind,
RpcRequestHeaderProto.OperationProto operation, int callId, byte[] uuid) {
RpcRequestHeaderProto.OperationProto operation, int callId,
int retryCount, byte[] uuid) {
RpcRequestHeaderProto.Builder result = RpcRequestHeaderProto.newBuilder();
result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId)
.setClientId(ByteString.copyFrom(uuid));
.setRetryCount(retryCount).setClientId(ByteString.copyFrom(uuid));
return result.build();
}
}

View File

@ -65,6 +65,8 @@ message RpcRequestHeaderProto { // the header for the RpcRequest
required uint32 callId = 3; // a sequence number that is sent back in response
required bytes clientId = 4; // Globally unique client ID
// clientId + callId uniquely identifies a request
// retry count, 1 means this is the first retry
optional sint32 retryCount = 5 [default = -1];
}

View File

@ -35,6 +35,8 @@
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.lang.reflect.Method;
import java.lang.reflect.Proxy;
import java.net.InetSocketAddress;
import java.net.Socket;
import java.net.SocketTimeoutException;
@ -53,6 +55,9 @@
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryProxy;
import org.apache.hadoop.ipc.Client.ConnectionId;
import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.Server.Connection;
import org.apache.hadoop.net.ConnectTimeoutException;
@ -171,6 +176,45 @@ public void run() {
}
}
/**
* A RpcInvocationHandler instance for test. Its invoke function uses the same
* {@link Client} instance, and will fail the first totalRetry times (by
* throwing an IOException).
*/
private static class TestInvocationHandler implements RpcInvocationHandler {
private static int retry = 0;
private final Client client;
private final Server server;
private final int total;
TestInvocationHandler(Client client, Server server, int total) {
this.client = client;
this.server = server;
this.total = total;
}
@Override
public Object invoke(Object proxy, Method method, Object[] args)
throws Throwable {
LongWritable param = new LongWritable(RANDOM.nextLong());
LongWritable value = (LongWritable) client.call(param,
NetUtils.getConnectAddress(server), null, null, 0, conf);
if (retry++ < total) {
throw new IOException("Fake IOException");
} else {
return value;
}
}
@Override
public void close() throws IOException {}
@Override
public ConnectionId getConnectionId() {
return null;
}
}
@Test
public void testSerial() throws Exception {
testSerial(3, false, 2, 5, 100);
@ -705,6 +749,110 @@ public void run() {
server.stop();
}
}
/** A dummy protocol */
private interface DummyProtocol {
public void dummyRun();
}
/**
* Test the retry count while used in a retry proxy.
*/
@Test
public void testRetryProxy() throws Exception {
final Client client = new Client(LongWritable.class, conf);
final TestServer server = new TestServer(1, false);
server.callListener = new Runnable() {
private int retryCount = 0;
@Override
public void run() {
Assert.assertEquals(retryCount++, Server.getCallRetryCount());
}
};
final int totalRetry = 256;
DummyProtocol proxy = (DummyProtocol) Proxy.newProxyInstance(
DummyProtocol.class.getClassLoader(),
new Class[] { DummyProtocol.class }, new TestInvocationHandler(client,
server, totalRetry));
DummyProtocol retryProxy = (DummyProtocol) RetryProxy.create(
DummyProtocol.class, proxy, RetryPolicies.RETRY_FOREVER);
try {
server.start();
retryProxy.dummyRun();
Assert.assertEquals(TestInvocationHandler.retry, totalRetry + 1);
} finally {
Client.setCallIdAndRetryCount(0, 0);
client.stop();
server.stop();
}
}
/**
* Test if the rpc server gets the default retry count (0) from client.
*/
@Test
public void testInitialCallRetryCount() throws Exception {
// Override client to store the call id
final Client client = new Client(LongWritable.class, conf);
// Attach a listener that tracks every call ID received by the server.
final TestServer server = new TestServer(1, false);
server.callListener = new Runnable() {
@Override
public void run() {
// we have not set the retry count for the client, thus on the server
// side we should see retry count as 0
Assert.assertEquals(0, Server.getCallRetryCount());
}
};
try {
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
final SerialCaller caller = new SerialCaller(client, addr, 10);
caller.run();
assertFalse(caller.failed);
} finally {
client.stop();
server.stop();
}
}
/**
* Test if the rpc server gets the retry count from client.
*/
@Test
public void testCallRetryCount() throws Exception {
final int retryCount = 255;
// Override client to store the call id
final Client client = new Client(LongWritable.class, conf);
Client.setCallIdAndRetryCount(Client.nextCallId(), 255);
// Attach a listener that tracks every call ID received by the server.
final TestServer server = new TestServer(1, false);
server.callListener = new Runnable() {
@Override
public void run() {
// we have not set the retry count for the client, thus on the server
// side we should see retry count as 0
Assert.assertEquals(retryCount, Server.getCallRetryCount());
}
};
try {
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
final SerialCaller caller = new SerialCaller(client, addr, 10);
caller.run();
assertFalse(caller.failed);
} finally {
client.stop();
server.stop();
}
}
/**
* Tests that client generates a unique sequential call ID for each RPC call,

View File

@ -27,6 +27,7 @@
import java.util.Arrays;
import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.RpcConstants;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
import org.junit.Test;
@ -79,7 +80,8 @@ private void doVarIntTest(int value) throws IOException {
public void testRpcClientId() {
byte[] uuid = StringUtils.getUuidBytes();
RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET, 0, uuid);
RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET, 0,
RpcConstants.INVALID_RETRY_COUNT, uuid);
assertTrue(Arrays.equals(uuid, header.getClientId().toByteArray()));
}
}