diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index f14e91818a..6167a22377 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -354,6 +354,8 @@ Release 2.1.0-beta - 2013-07-02 HADOOP-9421. [RPC v9] Convert SASL to use ProtoBuf and provide negotiation capabilities (daryn) + HADOOP-9688. Add globally unique Client ID to RPC requests. (suresh) + NEW FEATURES HADOOP-9283. Add support for running the Hadoop client on AIX. (atm) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 3782eef61a..8dcbbae04c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -82,6 +82,7 @@ import org.apache.hadoop.security.token.TokenSelector; import org.apache.hadoop.util.ProtoUtil; import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.Time; import com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -113,8 +114,7 @@ public class Client { private final int connectionTimeout; private final boolean fallbackAllowed; - - final static int PING_CALL_ID = -1; + private final byte[] uuid; /** * Executor on which IPC calls' parameters are sent. Deferring @@ -759,8 +759,8 @@ private void writeConnectionHeader(OutputStream outStream) throws IOException { DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream)); // Write out the header, version and authentication method - out.write(Server.HEADER.array()); - out.write(Server.CURRENT_VERSION); + out.write(RpcConstants.HEADER.array()); + out.write(RpcConstants.CURRENT_VERSION); out.write(serviceClass); final AuthProtocol authProtocol; switch (authMethod) { @@ -837,7 +837,7 @@ private synchronized void sendPing() throws IOException { if ( curTime - lastActivity.get() >= pingInterval) { lastActivity.set(curTime); synchronized (out) { - out.writeInt(PING_CALL_ID); + out.writeInt(RpcConstants.PING_CALL_ID); out.flush(); } } @@ -892,7 +892,7 @@ public void sendRpcRequest(final Call call) // Items '1' and '2' are prepared here. final DataOutputBuffer d = new DataOutputBuffer(); RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader( - call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id); + call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, uuid); header.writeDelimitedTo(d); call.rpcRequest.write(d); @@ -1092,6 +1092,7 @@ public Client(Class valueClass, Configuration conf, CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT); this.fallbackAllowed = conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY, CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT); + this.uuid = StringUtils.getUuidBytes(); } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index ca2b484008..63d9a5ec8b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -72,6 +72,7 @@ import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableUtils; +import static org.apache.hadoop.ipc.RpcConstants.CURRENT_VERSION; import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcRequestMessageWrapper; import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper; import org.apache.hadoop.ipc.RPC.RpcInvoker; @@ -158,11 +159,7 @@ boolean isTerse(Class t) { return terseExceptions.contains(t.toString()); } } - - /** - * The first four bytes of Hadoop RPC connections - */ - public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes()); + /** * If the user accidentally sends an HTTP GET to an IPC port, we detect this @@ -180,17 +177,6 @@ boolean isTerse(Class t) { "Content-type: text/plain\r\n\r\n" + "It looks like you are making an HTTP request to a Hadoop IPC port. " + "This is not the correct port for the web interface on this daemon.\r\n"; - - // 1 : Introduce ping and server does not throw away RPCs - // 3 : Introduce the protocol into the RPC connection header - // 4 : Introduced SASL security layer - // 5 : Introduced use of {@link ArrayPrimitiveWritable$Internal} - // in ObjectWritable to efficiently transmit arrays of primitives - // 6 : Made RPC Request header explicit - // 7 : Changed Ipc Connection Header to use Protocol buffers - // 8 : SASL server always sends a final response - // 9 : Changes to protocol for HADOOP-8990 - public static final byte CURRENT_VERSION = 9; /** * Initial and max size of response buffer @@ -294,6 +280,15 @@ public static InetAddress getRemoteIp() { } return null; } + + /** + * Returns the clientId from the current RPC request + */ + public static byte[] getClientId() { + Call call = CurCall.get(); + return call != null ? call.clientId : RpcConstants.DUMMY_CLIENT_ID; + } + /** Returns remote address as a string when invoked inside an RPC. * Returns null in case of an error. */ @@ -454,17 +449,22 @@ private static class Call { // time served when response is not null private ByteBuffer rpcResponse; // the response for this call private final RPC.RpcKind rpcKind; + private final byte[] clientId; public Call(int id, Writable param, Connection connection) { - this( id, param, connection, RPC.RpcKind.RPC_BUILTIN ); + this(id, param, connection, RPC.RpcKind.RPC_BUILTIN, + RpcConstants.DUMMY_CLIENT_ID); } - public Call(int id, Writable param, Connection connection, RPC.RpcKind kind) { + + public Call(int id, Writable param, Connection connection, + RPC.RpcKind kind, byte[] clientId) { this.callId = id; this.rpcRequest = param; this.connection = connection; this.timestamp = Time.now(); this.rpcResponse = null; this.rpcKind = kind; + this.clientId = clientId; } @Override @@ -1430,8 +1430,9 @@ public int readAndProcess() throws IOException, InterruptedException { setupHttpRequestOnIpcPortResponse(); return -1; } - - if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) { + + if (!RpcConstants.HEADER.equals(dataLengthBuffer) + || version != CURRENT_VERSION) { //Warning is ok since this is not supposed to happen. LOG.warn("Incorrect header or version mismatch from " + hostAddress + ":" + remotePort + @@ -1453,7 +1454,7 @@ public int readAndProcess() throws IOException, InterruptedException { if (data == null) { dataLengthBuffer.flip(); dataLength = dataLengthBuffer.getInt(); - if ((dataLength == Client.PING_CALL_ID) && (!useWrap)) { + if ((dataLength == RpcConstants.PING_CALL_ID) && (!useWrap)) { // covers the !useSasl too dataLengthBuffer.clear(); return 0; // ping message @@ -1664,7 +1665,7 @@ private void processUnwrappedData(byte[] inBuf) throws IOException, unwrappedDataLengthBuffer.flip(); int unwrappedDataLength = unwrappedDataLengthBuffer.getInt(); - if (unwrappedDataLength == Client.PING_CALL_ID) { + if (unwrappedDataLength == RpcConstants.PING_CALL_ID) { if (LOG.isDebugEnabled()) LOG.debug("Received ping message"); unwrappedDataLengthBuffer.clear(); @@ -1773,8 +1774,9 @@ private void processRpcRequest(byte[] buf) throw new RpcServerException(err, t); } - Call call = new Call(header.getCallId(), rpcRequest, this, - ProtoUtil.convert(header.getRpcKind())); + Call call = new Call(header.getCallId(), rpcRequest, this, + ProtoUtil.convert(header.getRpcKind()), header.getClientId() + .toByteArray()); callQueue.put(call); // queue the call; maybe blocked here incRpcCount(); // Increment the rpc count } @@ -2125,7 +2127,7 @@ private void setupResponse(ByteArrayOutputStream responseBuf, RpcResponseHeaderProto.newBuilder(); headerBuilder.setCallId(call.callId); headerBuilder.setStatus(status); - headerBuilder.setServerIpcVersionNum(Server.CURRENT_VERSION); + headerBuilder.setServerIpcVersionNum(CURRENT_VERSION); if (status == RpcStatusProto.SUCCESS) { RpcResponseHeaderProto header = headerBuilder.build(); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java index 372e13b5a1..a72bfbfeab 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/security/SaslRpcClient.java @@ -46,6 +46,7 @@ import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseMessageWrapper; import org.apache.hadoop.ipc.RPC.RpcKind; import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.ipc.RpcConstants; import org.apache.hadoop.ipc.Server.AuthProtocol; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto; @@ -71,9 +72,10 @@ public class SaslRpcClient { private final AuthMethod authMethod; private final SaslClient saslClient; private final boolean fallbackAllowed; - private static final RpcRequestHeaderProto saslHeader = - ProtoUtil.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER, - OperationProto.RPC_FINAL_PACKET, AuthProtocol.SASL.callId); + private static final RpcRequestHeaderProto saslHeader = ProtoUtil + .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER, + OperationProto.RPC_FINAL_PACKET, AuthProtocol.SASL.callId, + RpcConstants.DUMMY_CLIENT_ID); private static final RpcSaslProto negotiateRequest = RpcSaslProto.newBuilder().setState(SaslState.NEGOTIATE).build(); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java index ac6c572b34..7e32ffa27b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ProtoUtil.java @@ -28,6 +28,8 @@ import org.apache.hadoop.security.SaslRpcServer.AuthMethod; import org.apache.hadoop.security.UserGroupInformation; +import com.google.protobuf.ByteString; + public abstract class ProtoUtil { /** @@ -158,9 +160,10 @@ public static RPC.RpcKind convert( RpcKindProto kind) { } public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind, - RpcRequestHeaderProto.OperationProto operation, int callId) { + RpcRequestHeaderProto.OperationProto operation, int callId, byte[] uuid) { RpcRequestHeaderProto.Builder result = RpcRequestHeaderProto.newBuilder(); - result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId); + result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId) + .setClientId(ByteString.copyFrom(uuid)); return result.build(); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java index e57d53c70f..1f8e57a859 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java @@ -22,6 +22,7 @@ import java.io.StringWriter; import java.net.URI; import java.net.URISyntaxException; +import java.nio.ByteBuffer; import java.text.DateFormat; import java.util.ArrayList; import java.util.Arrays; @@ -32,6 +33,7 @@ import java.util.Locale; import java.util.Map; import java.util.StringTokenizer; +import java.util.UUID; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -894,4 +896,15 @@ public static String replaceTokens(String template, Pattern pattern, matcher.appendTail(sb); return sb.toString(); } + + /** + * Return a new UUID as byte[] + */ + public static byte[] getUuidBytes() { + UUID uuid = UUID.randomUUID(); + ByteBuffer buf = ByteBuffer.wrap(new byte[16]); + buf.putLong(uuid.getMostSignificantBits()); + buf.putLong(uuid.getLeastSignificantBits()); + return buf.array(); + } } diff --git a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto index 872f29db73..92f7fbe2ff 100644 --- a/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto +++ b/hadoop-common-project/hadoop-common/src/main/proto/RpcHeader.proto @@ -62,7 +62,9 @@ message RpcRequestHeaderProto { // the header for the RpcRequest optional RpcKindProto rpcKind = 1; optional OperationProto rpcOp = 2; - required uint32 callId = 3; // each rpc has a callId that is also used in response + required uint32 callId = 3; // a sequence number that is sent back in response + required bytes clientId = 4; // Globally unique client ID + // clientId + callId uniquely identifies a request } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index 24b20f89ef..68b8fb765b 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -718,7 +718,7 @@ private static abstract class NetworkTraces { "6f 6e 67 00 00 00 00 00 00 00 0a ong..... ... \n"); final static String HADOOP0_18_ERROR_MSG = - "Server IPC version " + Server.CURRENT_VERSION + + "Server IPC version " + RpcConstants.CURRENT_VERSION + " cannot communicate with client version 2"; /** @@ -757,7 +757,7 @@ private static abstract class NetworkTraces { "00 14 .. \n"); final static String HADOOP0_20_ERROR_MSG = - "Server IPC version " + Server.CURRENT_VERSION + + "Server IPC version " + RpcConstants.CURRENT_VERSION + " cannot communicate with client version 3"; @@ -772,7 +772,7 @@ private static abstract class NetworkTraces { final static String HADOOP0_21_ERROR_MSG = - "Server IPC version " + Server.CURRENT_VERSION + + "Server IPC version " + RpcConstants.CURRENT_VERSION + " cannot communicate with client version 4"; final static byte[] HADOOP_0_21_0_RPC_DUMP = diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java index 2ec56eb5ea..547b7fe110 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestProtoBufRpc.java @@ -72,6 +72,10 @@ public static class PBServerImpl implements TestRpcService { @Override public EmptyResponseProto ping(RpcController unused, EmptyRequestProto request) throws ServiceException { + // Ensure clientId is received + byte[] clientId = Server.getClientId(); + Assert.assertNotNull(Server.getClientId()); + Assert.assertEquals(16, clientId.length); return EmptyResponseProto.newBuilder().build(); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java index 89f67ee8b4..22200d6c1f 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/util/TestProtoUtil.java @@ -17,13 +17,18 @@ */ package org.apache.hadoop.util; +import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertEquals; import java.io.ByteArrayInputStream; import java.io.ByteArrayOutputStream; import java.io.DataInputStream; import java.io.IOException; +import java.util.Arrays; +import org.apache.hadoop.ipc.RPC.RpcKind; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto; import org.junit.Test; import com.google.protobuf.CodedOutputStream; @@ -69,4 +74,12 @@ private void doVarIntTest(int value) throws IOException { new ByteArrayInputStream(baos.toByteArray())); assertEquals(value, ProtoUtil.readRawVarint32(dis)); } + + @Test + public void testRpcClientId() { + byte[] uuid = StringUtils.getUuidBytes(); + RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader( + RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET, 0, uuid); + assertTrue(Arrays.equals(uuid, header.getClientId().toByteArray())); + } }