HADOOP-9688. Add globally unique Client ID to RPC requests. Contributed by Suresh Srinivas.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1500843 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
4175e41548
commit
eb2a603387
@ -354,6 +354,8 @@ Release 2.1.0-beta - 2013-07-02
|
||||
HADOOP-9421. [RPC v9] Convert SASL to use ProtoBuf and provide
|
||||
negotiation capabilities (daryn)
|
||||
|
||||
HADOOP-9688. Add globally unique Client ID to RPC requests. (suresh)
|
||||
|
||||
NEW FEATURES
|
||||
|
||||
HADOOP-9283. Add support for running the Hadoop client on AIX. (atm)
|
||||
|
@ -82,6 +82,7 @@
|
||||
import org.apache.hadoop.security.token.TokenSelector;
|
||||
import org.apache.hadoop.util.ProtoUtil;
|
||||
import org.apache.hadoop.util.ReflectionUtils;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
|
||||
import com.google.common.util.concurrent.ThreadFactoryBuilder;
|
||||
@ -113,8 +114,7 @@ public class Client {
|
||||
private final int connectionTimeout;
|
||||
|
||||
private final boolean fallbackAllowed;
|
||||
|
||||
final static int PING_CALL_ID = -1;
|
||||
private final byte[] uuid;
|
||||
|
||||
/**
|
||||
* Executor on which IPC calls' parameters are sent. Deferring
|
||||
@ -759,8 +759,8 @@ private void writeConnectionHeader(OutputStream outStream)
|
||||
throws IOException {
|
||||
DataOutputStream out = new DataOutputStream(new BufferedOutputStream(outStream));
|
||||
// Write out the header, version and authentication method
|
||||
out.write(Server.HEADER.array());
|
||||
out.write(Server.CURRENT_VERSION);
|
||||
out.write(RpcConstants.HEADER.array());
|
||||
out.write(RpcConstants.CURRENT_VERSION);
|
||||
out.write(serviceClass);
|
||||
final AuthProtocol authProtocol;
|
||||
switch (authMethod) {
|
||||
@ -837,7 +837,7 @@ private synchronized void sendPing() throws IOException {
|
||||
if ( curTime - lastActivity.get() >= pingInterval) {
|
||||
lastActivity.set(curTime);
|
||||
synchronized (out) {
|
||||
out.writeInt(PING_CALL_ID);
|
||||
out.writeInt(RpcConstants.PING_CALL_ID);
|
||||
out.flush();
|
||||
}
|
||||
}
|
||||
@ -892,7 +892,7 @@ public void sendRpcRequest(final Call call)
|
||||
// Items '1' and '2' are prepared here.
|
||||
final DataOutputBuffer d = new DataOutputBuffer();
|
||||
RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
|
||||
call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id);
|
||||
call.rpcKind, OperationProto.RPC_FINAL_PACKET, call.id, uuid);
|
||||
header.writeDelimitedTo(d);
|
||||
call.rpcRequest.write(d);
|
||||
|
||||
@ -1092,6 +1092,7 @@ public Client(Class<? extends Writable> valueClass, Configuration conf,
|
||||
CommonConfigurationKeys.IPC_CLIENT_CONNECT_TIMEOUT_DEFAULT);
|
||||
this.fallbackAllowed = conf.getBoolean(CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_KEY,
|
||||
CommonConfigurationKeys.IPC_CLIENT_FALLBACK_TO_SIMPLE_AUTH_ALLOWED_DEFAULT);
|
||||
this.uuid = StringUtils.getUuidBytes();
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -72,6 +72,7 @@
|
||||
import org.apache.hadoop.io.DataOutputBuffer;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.io.WritableUtils;
|
||||
import static org.apache.hadoop.ipc.RpcConstants.CURRENT_VERSION;
|
||||
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcRequestMessageWrapper;
|
||||
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseWrapper;
|
||||
import org.apache.hadoop.ipc.RPC.RpcInvoker;
|
||||
@ -158,11 +159,7 @@ boolean isTerse(Class<?> t) {
|
||||
return terseExceptions.contains(t.toString());
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* The first four bytes of Hadoop RPC connections
|
||||
*/
|
||||
public static final ByteBuffer HEADER = ByteBuffer.wrap("hrpc".getBytes());
|
||||
|
||||
|
||||
/**
|
||||
* If the user accidentally sends an HTTP GET to an IPC port, we detect this
|
||||
@ -180,17 +177,6 @@ boolean isTerse(Class<?> t) {
|
||||
"Content-type: text/plain\r\n\r\n" +
|
||||
"It looks like you are making an HTTP request to a Hadoop IPC port. " +
|
||||
"This is not the correct port for the web interface on this daemon.\r\n";
|
||||
|
||||
// 1 : Introduce ping and server does not throw away RPCs
|
||||
// 3 : Introduce the protocol into the RPC connection header
|
||||
// 4 : Introduced SASL security layer
|
||||
// 5 : Introduced use of {@link ArrayPrimitiveWritable$Internal}
|
||||
// in ObjectWritable to efficiently transmit arrays of primitives
|
||||
// 6 : Made RPC Request header explicit
|
||||
// 7 : Changed Ipc Connection Header to use Protocol buffers
|
||||
// 8 : SASL server always sends a final response
|
||||
// 9 : Changes to protocol for HADOOP-8990
|
||||
public static final byte CURRENT_VERSION = 9;
|
||||
|
||||
/**
|
||||
* Initial and max size of response buffer
|
||||
@ -294,6 +280,15 @@ public static InetAddress getRemoteIp() {
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the clientId from the current RPC request
|
||||
*/
|
||||
public static byte[] getClientId() {
|
||||
Call call = CurCall.get();
|
||||
return call != null ? call.clientId : RpcConstants.DUMMY_CLIENT_ID;
|
||||
}
|
||||
|
||||
/** Returns remote address as a string when invoked inside an RPC.
|
||||
* Returns null in case of an error.
|
||||
*/
|
||||
@ -454,17 +449,22 @@ private static class Call {
|
||||
// time served when response is not null
|
||||
private ByteBuffer rpcResponse; // the response for this call
|
||||
private final RPC.RpcKind rpcKind;
|
||||
private final byte[] clientId;
|
||||
|
||||
public Call(int id, Writable param, Connection connection) {
|
||||
this( id, param, connection, RPC.RpcKind.RPC_BUILTIN );
|
||||
this(id, param, connection, RPC.RpcKind.RPC_BUILTIN,
|
||||
RpcConstants.DUMMY_CLIENT_ID);
|
||||
}
|
||||
public Call(int id, Writable param, Connection connection, RPC.RpcKind kind) {
|
||||
|
||||
public Call(int id, Writable param, Connection connection,
|
||||
RPC.RpcKind kind, byte[] clientId) {
|
||||
this.callId = id;
|
||||
this.rpcRequest = param;
|
||||
this.connection = connection;
|
||||
this.timestamp = Time.now();
|
||||
this.rpcResponse = null;
|
||||
this.rpcKind = kind;
|
||||
this.clientId = clientId;
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -1430,8 +1430,9 @@ public int readAndProcess() throws IOException, InterruptedException {
|
||||
setupHttpRequestOnIpcPortResponse();
|
||||
return -1;
|
||||
}
|
||||
|
||||
if (!HEADER.equals(dataLengthBuffer) || version != CURRENT_VERSION) {
|
||||
|
||||
if (!RpcConstants.HEADER.equals(dataLengthBuffer)
|
||||
|| version != CURRENT_VERSION) {
|
||||
//Warning is ok since this is not supposed to happen.
|
||||
LOG.warn("Incorrect header or version mismatch from " +
|
||||
hostAddress + ":" + remotePort +
|
||||
@ -1453,7 +1454,7 @@ public int readAndProcess() throws IOException, InterruptedException {
|
||||
if (data == null) {
|
||||
dataLengthBuffer.flip();
|
||||
dataLength = dataLengthBuffer.getInt();
|
||||
if ((dataLength == Client.PING_CALL_ID) && (!useWrap)) {
|
||||
if ((dataLength == RpcConstants.PING_CALL_ID) && (!useWrap)) {
|
||||
// covers the !useSasl too
|
||||
dataLengthBuffer.clear();
|
||||
return 0; // ping message
|
||||
@ -1664,7 +1665,7 @@ private void processUnwrappedData(byte[] inBuf) throws IOException,
|
||||
unwrappedDataLengthBuffer.flip();
|
||||
int unwrappedDataLength = unwrappedDataLengthBuffer.getInt();
|
||||
|
||||
if (unwrappedDataLength == Client.PING_CALL_ID) {
|
||||
if (unwrappedDataLength == RpcConstants.PING_CALL_ID) {
|
||||
if (LOG.isDebugEnabled())
|
||||
LOG.debug("Received ping message");
|
||||
unwrappedDataLengthBuffer.clear();
|
||||
@ -1773,8 +1774,9 @@ private void processRpcRequest(byte[] buf)
|
||||
throw new RpcServerException(err, t);
|
||||
}
|
||||
|
||||
Call call = new Call(header.getCallId(), rpcRequest, this,
|
||||
ProtoUtil.convert(header.getRpcKind()));
|
||||
Call call = new Call(header.getCallId(), rpcRequest, this,
|
||||
ProtoUtil.convert(header.getRpcKind()), header.getClientId()
|
||||
.toByteArray());
|
||||
callQueue.put(call); // queue the call; maybe blocked here
|
||||
incRpcCount(); // Increment the rpc count
|
||||
}
|
||||
@ -2125,7 +2127,7 @@ private void setupResponse(ByteArrayOutputStream responseBuf,
|
||||
RpcResponseHeaderProto.newBuilder();
|
||||
headerBuilder.setCallId(call.callId);
|
||||
headerBuilder.setStatus(status);
|
||||
headerBuilder.setServerIpcVersionNum(Server.CURRENT_VERSION);
|
||||
headerBuilder.setServerIpcVersionNum(CURRENT_VERSION);
|
||||
|
||||
if (status == RpcStatusProto.SUCCESS) {
|
||||
RpcResponseHeaderProto header = headerBuilder.build();
|
||||
|
@ -46,6 +46,7 @@
|
||||
import org.apache.hadoop.ipc.ProtobufRpcEngine.RpcResponseMessageWrapper;
|
||||
import org.apache.hadoop.ipc.RPC.RpcKind;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.ipc.RpcConstants;
|
||||
import org.apache.hadoop.ipc.Server.AuthProtocol;
|
||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
|
||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
|
||||
@ -71,9 +72,10 @@ public class SaslRpcClient {
|
||||
private final AuthMethod authMethod;
|
||||
private final SaslClient saslClient;
|
||||
private final boolean fallbackAllowed;
|
||||
private static final RpcRequestHeaderProto saslHeader =
|
||||
ProtoUtil.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
|
||||
OperationProto.RPC_FINAL_PACKET, AuthProtocol.SASL.callId);
|
||||
private static final RpcRequestHeaderProto saslHeader = ProtoUtil
|
||||
.makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER,
|
||||
OperationProto.RPC_FINAL_PACKET, AuthProtocol.SASL.callId,
|
||||
RpcConstants.DUMMY_CLIENT_ID);
|
||||
private static final RpcSaslProto negotiateRequest =
|
||||
RpcSaslProto.newBuilder().setState(SaslState.NEGOTIATE).build();
|
||||
|
||||
|
@ -28,6 +28,8 @@
|
||||
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
|
||||
import com.google.protobuf.ByteString;
|
||||
|
||||
public abstract class ProtoUtil {
|
||||
|
||||
/**
|
||||
@ -158,9 +160,10 @@ public static RPC.RpcKind convert( RpcKindProto kind) {
|
||||
}
|
||||
|
||||
public static RpcRequestHeaderProto makeRpcRequestHeader(RPC.RpcKind rpcKind,
|
||||
RpcRequestHeaderProto.OperationProto operation, int callId) {
|
||||
RpcRequestHeaderProto.OperationProto operation, int callId, byte[] uuid) {
|
||||
RpcRequestHeaderProto.Builder result = RpcRequestHeaderProto.newBuilder();
|
||||
result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId);
|
||||
result.setRpcKind(convert(rpcKind)).setRpcOp(operation).setCallId(callId)
|
||||
.setClientId(ByteString.copyFrom(uuid));
|
||||
return result.build();
|
||||
}
|
||||
}
|
||||
|
@ -22,6 +22,7 @@
|
||||
import java.io.StringWriter;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
import java.nio.ByteBuffer;
|
||||
import java.text.DateFormat;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
@ -32,6 +33,7 @@
|
||||
import java.util.Locale;
|
||||
import java.util.Map;
|
||||
import java.util.StringTokenizer;
|
||||
import java.util.UUID;
|
||||
import java.util.regex.Matcher;
|
||||
import java.util.regex.Pattern;
|
||||
|
||||
@ -894,4 +896,15 @@ public static String replaceTokens(String template, Pattern pattern,
|
||||
matcher.appendTail(sb);
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
/**
|
||||
* Return a new UUID as byte[]
|
||||
*/
|
||||
public static byte[] getUuidBytes() {
|
||||
UUID uuid = UUID.randomUUID();
|
||||
ByteBuffer buf = ByteBuffer.wrap(new byte[16]);
|
||||
buf.putLong(uuid.getMostSignificantBits());
|
||||
buf.putLong(uuid.getLeastSignificantBits());
|
||||
return buf.array();
|
||||
}
|
||||
}
|
||||
|
@ -62,7 +62,9 @@ message RpcRequestHeaderProto { // the header for the RpcRequest
|
||||
|
||||
optional RpcKindProto rpcKind = 1;
|
||||
optional OperationProto rpcOp = 2;
|
||||
required uint32 callId = 3; // each rpc has a callId that is also used in response
|
||||
required uint32 callId = 3; // a sequence number that is sent back in response
|
||||
required bytes clientId = 4; // Globally unique client ID
|
||||
// clientId + callId uniquely identifies a request
|
||||
}
|
||||
|
||||
|
||||
|
@ -718,7 +718,7 @@ private static abstract class NetworkTraces {
|
||||
"6f 6e 67 00 00 00 00 00 00 00 0a ong..... ... \n");
|
||||
|
||||
final static String HADOOP0_18_ERROR_MSG =
|
||||
"Server IPC version " + Server.CURRENT_VERSION +
|
||||
"Server IPC version " + RpcConstants.CURRENT_VERSION +
|
||||
" cannot communicate with client version 2";
|
||||
|
||||
/**
|
||||
@ -757,7 +757,7 @@ private static abstract class NetworkTraces {
|
||||
"00 14 .. \n");
|
||||
|
||||
final static String HADOOP0_20_ERROR_MSG =
|
||||
"Server IPC version " + Server.CURRENT_VERSION +
|
||||
"Server IPC version " + RpcConstants.CURRENT_VERSION +
|
||||
" cannot communicate with client version 3";
|
||||
|
||||
|
||||
@ -772,7 +772,7 @@ private static abstract class NetworkTraces {
|
||||
|
||||
|
||||
final static String HADOOP0_21_ERROR_MSG =
|
||||
"Server IPC version " + Server.CURRENT_VERSION +
|
||||
"Server IPC version " + RpcConstants.CURRENT_VERSION +
|
||||
" cannot communicate with client version 4";
|
||||
|
||||
final static byte[] HADOOP_0_21_0_RPC_DUMP =
|
||||
|
@ -72,6 +72,10 @@ public static class PBServerImpl implements TestRpcService {
|
||||
@Override
|
||||
public EmptyResponseProto ping(RpcController unused,
|
||||
EmptyRequestProto request) throws ServiceException {
|
||||
// Ensure clientId is received
|
||||
byte[] clientId = Server.getClientId();
|
||||
Assert.assertNotNull(Server.getClientId());
|
||||
Assert.assertEquals(16, clientId.length);
|
||||
return EmptyResponseProto.newBuilder().build();
|
||||
}
|
||||
|
||||
|
@ -17,13 +17,18 @@
|
||||
*/
|
||||
package org.apache.hadoop.util;
|
||||
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.DataInputStream;
|
||||
import java.io.IOException;
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.hadoop.ipc.RPC.RpcKind;
|
||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto;
|
||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcRequestHeaderProto.OperationProto;
|
||||
import org.junit.Test;
|
||||
|
||||
import com.google.protobuf.CodedOutputStream;
|
||||
@ -69,4 +74,12 @@ private void doVarIntTest(int value) throws IOException {
|
||||
new ByteArrayInputStream(baos.toByteArray()));
|
||||
assertEquals(value, ProtoUtil.readRawVarint32(dis));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRpcClientId() {
|
||||
byte[] uuid = StringUtils.getUuidBytes();
|
||||
RpcRequestHeaderProto header = ProtoUtil.makeRpcRequestHeader(
|
||||
RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET, 0, uuid);
|
||||
assertTrue(Arrays.equals(uuid, header.getClientId().toByteArray()));
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user