diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index e45708ac03..bd1331dae7 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -483,6 +483,8 @@ Release 2.1.0-beta - 2013-07-02 HADOOP-9716. Rpc retries should use the same call ID as the original call. (szetszwo) + HADOOP-9717. Add retry attempt count to the RPC requests. (jing9) + OPTIMIZATIONS HADOOP-9150. Avoid unnecessary DNS resolution attempts for logical URIs diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java index e28a369717..9d46d71141 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java @@ -35,6 +35,8 @@ import org.apache.hadoop.ipc.RpcInvocationHandler; import org.apache.hadoop.util.ThreadUtil; +import com.google.common.base.Preconditions; + class RetryInvocationHandler implements RpcInvocationHandler { public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class); private final FailoverProxyProvider proxyProvider; @@ -86,7 +88,7 @@ public Object invoke(Object proxy, Method method, Object[] args) } if (isRpc) { - Client.setCallId(callId); + Client.setCallIdAndRetryCount(callId, retries); } try { Object ret = invokeMethod(method, args); @@ -96,8 +98,8 @@ public Object invoke(Object proxy, Method method, Object[] args) boolean isMethodIdempotent = proxyProvider.getInterface() .getMethod(method.getName(), method.getParameterTypes()) .isAnnotationPresent(Idempotent.class); - RetryAction action = policy.shouldRetry(e, retries++, invocationFailoverCount, - isMethodIdempotent); + RetryAction action = policy.shouldRetry(e, 
retries++, + invocationFailoverCount, isMethodIdempotent); if (action.action == RetryAction.RetryDecision.FAIL) { if (action.reason != null) { LOG.warn("Exception while invoking " + diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index ecd9bb6e6b..99a721abee 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -111,12 +111,16 @@ public class Client { private static final AtomicInteger callIdCounter = new AtomicInteger(); private static final ThreadLocal callId = new ThreadLocal(); + private static final ThreadLocal retryCount = new ThreadLocal(); - /** Set call id for the next call. */ - public static void setCallId(int cid) { + /** Set call id and retry count for the next call. */ + public static void setCallIdAndRetryCount(int cid, int rc) { Preconditions.checkArgument(cid != RpcConstants.INVALID_CALL_ID); Preconditions.checkState(callId.get() == null); + Preconditions.checkArgument(rc != RpcConstants.INVALID_RETRY_COUNT); + callId.set(cid); + retryCount.set(rc); } private Hashtable connections = @@ -281,6 +285,7 @@ Call createCall(RPC.RpcKind rpcKind, Writable rpcRequest) { */ static class Call { final int id; // call id + final int retry; // retry count final Writable rpcRequest; // the serialized rpc request Writable rpcResponse; // null if rpc has error IOException error; // exception, null if success @@ -298,6 +303,13 @@ private Call(RPC.RpcKind rpcKind, Writable param) { callId.set(null); this.id = id; } + + final Integer rc = retryCount.get(); + if (rc == null) { + this.retry = 0; + } else { + this.retry = rc; + } } /** Indicate when the call is complete and the @@ -865,10 +877,10 @@ private void writeConnectionContext(ConnectionId remoteId, RPC.getProtocolName(remoteId.getProtocol()), 
remoteId.getTicket(), authMethod); - RpcRequestHeaderProto connectionContextHeader = - ProtoUtil.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER, + RpcRequestHeaderProto connectionContextHeader = ProtoUtil + .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET, CONNECTION_CONTEXT_CALL_ID, - clientId); + RpcConstants.INVALID_RETRY_COUNT, clientId); RpcRequestMessageWrapper request = new RpcRequestMessageWrapper(connectionContextHeader, message); @@ -976,7 +988,8 @@ public void sendRpcRequest(final Call call) // Items '1' and '2' are prepared here. final DataOutputBuffer d = new DataOutputBuffer(); RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader( - call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, clientId); + call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, call.retry, + clientId); header.writeDelimitedTo(d); call.rpcRequest.write(d); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java index 4bd7da8c97..14cc06e0d0 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcConstants.java @@ -35,6 +35,8 @@ private RpcConstants() { public static final int CONNECTION_CONTEXT_CALL_ID = -3; + public static final int INVALID_RETRY_COUNT = -1; + /** * The first four bytes of Hadoop RPC connections */ diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index b1a55bbcb9..7812ecda8e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -282,6 +282,15 @@ static int getCallId() { Call call = 
CurCall.get(); return call != null ? call.callId : RpcConstants.INVALID_CALL_ID; } + + /** + * @return The current active RPC call's retry count. -1 indicates the retry + * cache is not supported in the client side. + */ + public static int getCallRetryCount() { + Call call = CurCall.get(); + return call != null ? call.retryCount : RpcConstants.INVALID_RETRY_COUNT; + } /** Returns the remote side ip address when invoked inside an RPC * Returns null incase of an error. @@ -456,6 +465,7 @@ public ServiceAuthorizationManager getServiceAuthorizationManager() { /** A call queued for handling. */ private static class Call { private final int callId; // the client's call id + private final int retryCount; // the retry count of the call private final Writable rpcRequest; // Serialized Rpc request from client private final Connection connection; // connection to client private long timestamp; // time received when response is null @@ -464,14 +474,16 @@ private static class Call { private final RPC.RpcKind rpcKind; private final byte[] clientId; - private Call(int id, Writable param, Connection connection) { - this(id, param, connection, RPC.RpcKind.RPC_BUILTIN, + private Call(int id, int retryCount, Writable param, + Connection connection) { + this(id, retryCount, param, connection, RPC.RpcKind.RPC_BUILTIN, RpcConstants.DUMMY_CLIENT_ID); } - private Call(int id, Writable param, Connection connection, + private Call(int id, int retryCount, Writable param, Connection connection, RPC.RpcKind kind, byte[] clientId) { this.callId = id; + this.retryCount = retryCount; this.rpcRequest = param; this.connection = connection; this.timestamp = Time.now(); @@ -482,7 +494,8 @@ private Call(int id, Writable param, Connection connection, @Override public String toString() { - return rpcRequest + " from " + connection + " Call#" + callId; + return rpcRequest + " from " + connection + " Call#" + callId + " Retry#" + + retryCount; } public void setResponse(ByteBuffer response) { @@ -1162,11 
+1175,12 @@ public class Connection { // Fake 'call' for failed authorization response private static final int AUTHORIZATION_FAILED_CALLID = -1; - private final Call authFailedCall = - new Call(AUTHORIZATION_FAILED_CALLID, null, this); + private final Call authFailedCall = new Call(AUTHORIZATION_FAILED_CALLID, + RpcConstants.INVALID_RETRY_COUNT, null, this); private ByteArrayOutputStream authFailedResponse = new ByteArrayOutputStream(); - private final Call saslCall = new Call(AuthProtocol.SASL.callId, null, this); + private final Call saslCall = new Call(AuthProtocol.SASL.callId, + RpcConstants.INVALID_RETRY_COUNT, null, this); private final ByteArrayOutputStream saslResponse = new ByteArrayOutputStream(); private boolean sentNegotiate = false; @@ -1594,20 +1608,23 @@ private void setupBadVersionResponse(int clientVersion) throws IOException { if (clientVersion >= 9) { // Versions >>9 understand the normal response - Call fakeCall = new Call(-1, null, this); + Call fakeCall = new Call(-1, RpcConstants.INVALID_RETRY_COUNT, null, + this); setupResponse(buffer, fakeCall, RpcStatusProto.FATAL, RpcErrorCodeProto.FATAL_VERSION_MISMATCH, null, VersionMismatch.class.getName(), errMsg); responder.doRespond(fakeCall); } else if (clientVersion >= 3) { - Call fakeCall = new Call(-1, null, this); + Call fakeCall = new Call(-1, RpcConstants.INVALID_RETRY_COUNT, null, + this); // Versions 3 to 8 use older response setupResponseOldVersionFatal(buffer, fakeCall, null, VersionMismatch.class.getName(), errMsg); responder.doRespond(fakeCall); } else if (clientVersion == 2) { // Hadoop 0.18.3 - Call fakeCall = new Call(0, null, this); + Call fakeCall = new Call(0, RpcConstants.INVALID_RETRY_COUNT, null, + this); DataOutputStream out = new DataOutputStream(buffer); out.writeInt(0); // call ID out.writeBoolean(true); // error @@ -1620,7 +1637,7 @@ private void setupBadVersionResponse(int clientVersion) throws IOException { } private void setupHttpRequestOnIpcPortResponse() throws 
IOException { - Call fakeCall = new Call(0, null, this); + Call fakeCall = new Call(0, RpcConstants.INVALID_RETRY_COUNT, null, this); fakeCall.setResponse(ByteBuffer.wrap( RECEIVED_HTTP_REQ_RESPONSE.getBytes())); responder.doRespond(fakeCall); @@ -1752,12 +1769,14 @@ private void unwrapPacketAndProcessRpcs(byte[] inBuf) throws IOException, private void processOneRpc(byte[] buf) throws IOException, WrappedRpcServerException, InterruptedException { int callId = -1; + int retry = RpcConstants.INVALID_RETRY_COUNT; try { final DataInputStream dis = new DataInputStream(new ByteArrayInputStream(buf)); final RpcRequestHeaderProto header = decodeProtobufFromStream(RpcRequestHeaderProto.newBuilder(), dis); callId = header.getCallId(); + retry = header.getRetryCount(); if (LOG.isDebugEnabled()) { LOG.debug(" got #" + callId); } @@ -1774,7 +1793,7 @@ private void processOneRpc(byte[] buf) } } catch (WrappedRpcServerException wrse) { // inform client of error Throwable ioe = wrse.getCause(); - final Call call = new Call(callId, null, this); + final Call call = new Call(callId, retry, null, this); setupResponse(authFailedResponse, call, RpcStatusProto.FATAL, wrse.getRpcErrorCodeProto(), null, ioe.getClass().getName(), ioe.getMessage()); @@ -1848,9 +1867,9 @@ private void processRpcRequest(RpcRequestHeaderProto header, RpcErrorCodeProto.FATAL_DESERIALIZING_REQUEST, err); } - Call call = new Call(header.getCallId(), rpcRequest, this, - ProtoUtil.convert(header.getRpcKind()), header.getClientId() - .toByteArray()); + Call call = new Call(header.getCallId(), header.getRetryCount(), + rpcRequest, this, ProtoUtil.convert(header.getRpcKind()), header + .getClientId().toByteArray()); callQueue.put(call); // queue the call; maybe blocked here incRpcCount(); // Increment the rpc count } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java index a72bfbfeab..aacd792794 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java @@ -75,7 +75,7 @@ public class SaslRpcClient { private static final RpcRequestHeaderProto saslHeader = ProtoUtil .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET, AuthProtocol.SASL.callId, - RpcConstants.DUMMY_CLIENT_ID); + RpcConstants.INVALID_RETRY_COUNT, RpcConstants.DUMMY_CLIENT_ID); private static final RpcSaslProto negotiateRequest = RpcSaslProto.newBuilder().setState(SaslState.NEGOTIATE).build(); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java index 7e32ffa27b..79f8692842 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java @@ -160,10 +160,11 @@ public static RPC.RpcKind convert( RpcKindProto kind) { } public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind, - RpcRequestHeaderProto.OperationProto operation, int callId, byte[] uuid) { + RpcRequestHeaderProto.OperationProto operation, int callId, + int retryCount, byte[] uuid) { RpcRequestHeaderProto.Builder result = RpcRequestHeaderProto.newBuilder(); result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId) - .setClientId(ByteString.copyFrom(uuid)); + .setRetryCount(retryCount).setClientId(ByteString.copyFrom(uuid)); return result.build(); } } diff --git a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto index 92f7fbe2ff..6b013b5900 100644 
--- a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto +++ b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto @@ -65,6 +65,8 @@ message RpcRequestHeaderProto { // the header for the RpcRequest required uint32 callId = 3; // a sequence number that is sent back in response required bytes clientId = 4; // Globally unique client ID // clientId + callId uniquely identifies a request + // retry count, 1 means this is the first retry + optional sint32 retryCount = 5 [default = -1]; } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index d74d285a67..d8544e0466 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -35,6 +35,8 @@ import java.io.IOException; import java.io.InputStream; import java.io.OutputStream; +import java.lang.reflect.Method; +import java.lang.reflect.Proxy; import java.net.InetSocketAddress; import java.net.Socket; import java.net.SocketTimeoutException; @@ -53,6 +55,9 @@ import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryProxy; +import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.RPC.RpcKind; import org.apache.hadoop.ipc.Server.Connection; import org.apache.hadoop.net.ConnectTimeoutException; @@ -171,6 +176,45 @@ public void run() { } } + /** + * A RpcInvocationHandler instance for test. Its invoke function uses the same + * {@link Client} instance, and will fail the first totalRetry times (by + * throwing an IOException). 
+ */ + private static class TestInvocationHandler implements RpcInvocationHandler { + private static int retry = 0; + private final Client client; + private final Server server; + private final int total; + + TestInvocationHandler(Client client, Server server, int total) { + this.client = client; + this.server = server; + this.total = total; + } + + @Override + public Object invoke(Object proxy, Method method, Object[] args) + throws Throwable { + LongWritable param = new LongWritable(RANDOM.nextLong()); + LongWritable value = (LongWritable) client.call(param, + NetUtils.getConnectAddress(server), null, null, 0, conf); + if (retry++ < total) { + throw new IOException("Fake IOException"); + } else { + return value; + } + } + + @Override + public void close() throws IOException {} + + @Override + public ConnectionId getConnectionId() { + return null; + } + } + @Test public void testSerial() throws Exception { testSerial(3, false, 2, 5, 100); @@ -705,6 +749,110 @@ public void run() { server.stop(); } } + + /** A dummy protocol */ + private interface DummyProtocol { + public void dummyRun(); + } + + /** + * Test the retry count while used in a retry proxy. 
+ */ + @Test + public void testRetryProxy() throws Exception { + final Client client = new Client(LongWritable.class, conf); + + final TestServer server = new TestServer(1, false); + server.callListener = new Runnable() { + private int retryCount = 0; + @Override + public void run() { + Assert.assertEquals(retryCount++, Server.getCallRetryCount()); + } + }; + + final int totalRetry = 256; + DummyProtocol proxy = (DummyProtocol) Proxy.newProxyInstance( + DummyProtocol.class.getClassLoader(), + new Class[] { DummyProtocol.class }, new TestInvocationHandler(client, + server, totalRetry)); + DummyProtocol retryProxy = (DummyProtocol) RetryProxy.create( + DummyProtocol.class, proxy, RetryPolicies.RETRY_FOREVER); + + try { + server.start(); + retryProxy.dummyRun(); + Assert.assertEquals(TestInvocationHandler.retry, totalRetry + 1); + } finally { + Client.setCallIdAndRetryCount(0, 0); + client.stop(); + server.stop(); + } + } + + /** + * Test if the rpc server gets the default retry count (0) from client. + */ + @Test + public void testInitialCallRetryCount() throws Exception { + // Create a client without setting any retry count on it + final Client client = new Client(LongWritable.class, conf); + + // Attach a listener that checks the retry count of each call on the server. + final TestServer server = new TestServer(1, false); + server.callListener = new Runnable() { + @Override + public void run() { + // we have not set the retry count for the client, thus on the server + // side we should see retry count as 0 + Assert.assertEquals(0, Server.getCallRetryCount()); + } + }; + + try { + InetSocketAddress addr = NetUtils.getConnectAddress(server); + server.start(); + final SerialCaller caller = new SerialCaller(client, addr, 10); + caller.run(); + assertFalse(caller.failed); + } finally { + client.stop(); + server.stop(); + } + } + + /** + * Test if the rpc server gets the retry count from client. 
+ */ + @Test + public void testCallRetryCount() throws Exception { + final int retryCount = 255; + // Create a client and preset the call id and retry count for the next call + final Client client = new Client(LongWritable.class, conf); + Client.setCallIdAndRetryCount(Client.nextCallId(), 255); + + // Attach a listener that checks the retry count of each call on the server. + final TestServer server = new TestServer(1, false); + server.callListener = new Runnable() { + @Override + public void run() { + // we have set the retry count for the client, thus on the server + // side we should see that same retry count + Assert.assertEquals(retryCount, Server.getCallRetryCount()); + } + }; + + try { + InetSocketAddress addr = NetUtils.getConnectAddress(server); + server.start(); + final SerialCaller caller = new SerialCaller(client, addr, 10); + caller.run(); + assertFalse(caller.failed); + } finally { + client.stop(); + server.stop(); + } + } /** * Tests that client generates a unique sequential call ID for each RPC call, diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java index 22200d6c1f..5d55f89d7b 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java @@ -27,6 +27,7 @@ import java.util.Arrays; import org.apache.hadoop.ipc.RPC.RpcKind; +import org.apache.hadoop.ipc.RpcConstants; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto; import org.junit.Test; @@ -79,7 +80,8 @@ private void doVarIntTest(int value) throws IOException { public void testRpcClientId() { byte[] uuid = StringUtils.getUuidBytes(); RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader( - RpcKind.RPC_PROTOCOL_BUFFER, 
OperationProto.RPC_FINAL_PACKET, 0, uuid); + RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET, 0, + RpcConstants.INVALID_RETRY_COUNT, uuid); assertTrue(Arrays.equals(uuid, header.getClientId().toByteArray())); } }