HADOOP-9194. RPC Support for QoS. (Junping Du via llu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1461370 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Luke Lu 2013-03-26 23:29:09 +00:00
parent fdf1e6e07e
commit 5319818487
5 changed files with 123 additions and 16 deletions

View File

@ -17,6 +17,8 @@ Trunk (Unreleased)
HADOOP-9380 Add totalLength to rpc response (sanjay Radia) HADOOP-9380 Add totalLength to rpc response (sanjay Radia)
HADOOP-9194. RPC Support for QoS. (Junping Du via llu)
NEW FEATURES NEW FEATURES
HADOOP-8561. Introduce HADOOP_PROXY_USER for secure impersonation in child HADOOP-8561. Introduce HADOOP_PROXY_USER for secure impersonation in child

View File

@ -257,6 +257,7 @@ private class Connection extends Thread {
private final ConnectionId remoteId; // connection id private final ConnectionId remoteId; // connection id
private AuthMethod authMethod; // authentication method private AuthMethod authMethod; // authentication method
private Token<? extends TokenIdentifier> token; private Token<? extends TokenIdentifier> token;
private int serviceClass;
private SaslRpcClient saslRpcClient; private SaslRpcClient saslRpcClient;
private Socket socket = null; // connected socket private Socket socket = null; // connected socket
@ -279,7 +280,7 @@ private class Connection extends Thread {
private final Object sendRpcRequestLock = new Object(); private final Object sendRpcRequestLock = new Object();
public Connection(ConnectionId remoteId) throws IOException { public Connection(ConnectionId remoteId, int serviceClass) throws IOException {
this.remoteId = remoteId; this.remoteId = remoteId;
this.server = remoteId.getAddress(); this.server = remoteId.getAddress();
if (server.isUnresolved()) { if (server.isUnresolved()) {
@ -296,6 +297,7 @@ public Connection(ConnectionId remoteId) throws IOException {
this.tcpNoDelay = remoteId.getTcpNoDelay(); this.tcpNoDelay = remoteId.getTcpNoDelay();
this.doPing = remoteId.getDoPing(); this.doPing = remoteId.getDoPing();
this.pingInterval = remoteId.getPingInterval(); this.pingInterval = remoteId.getPingInterval();
this.serviceClass = serviceClass;
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
LOG.debug("The ping interval is " + this.pingInterval + " ms."); LOG.debug("The ping interval is " + this.pingInterval + " ms.");
} }
@ -747,7 +749,9 @@ private void handleConnectionFailure(int curRetries, IOException ioe
* +----------------------------------+ * +----------------------------------+
* | "hrpc" 4 bytes | * | "hrpc" 4 bytes |
* +----------------------------------+ * +----------------------------------+
* | Version (1 bytes) | * | Version (1 byte) |
* +----------------------------------+
* | Service Class (1 byte) |
* +----------------------------------+ * +----------------------------------+
* | Authmethod (1 byte) | * | Authmethod (1 byte) |
* +----------------------------------+ * +----------------------------------+
@ -760,6 +764,7 @@ private void writeConnectionHeader(OutputStream outStream)
// Write out the header, version and authentication method // Write out the header, version and authentication method
out.write(Server.HEADER.array()); out.write(Server.HEADER.array());
out.write(Server.CURRENT_VERSION); out.write(Server.CURRENT_VERSION);
out.write(serviceClass);
authMethod.write(out); authMethod.write(out);
Server.IpcSerializationType.PROTOBUF.write(out); Server.IpcSerializationType.PROTOBUF.write(out);
out.flush(); out.flush();
@ -1192,6 +1197,20 @@ public Writable call(Writable param, InetSocketAddress addr,
return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId); return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId);
} }
/**
* Same as {@link #call(Writable, InetSocketAddress,
* Class, UserGroupInformation, int, Configuration)}
* except that specifying serviceClass.
*/
public Writable call(Writable param, InetSocketAddress addr,
Class<?> protocol, UserGroupInformation ticket,
int rpcTimeout, int serviceClass, Configuration conf)
throws InterruptedException, IOException {
ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
ticket, rpcTimeout, conf);
return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId, serviceClass);
}
/** /**
* Make a call, passing <code>param</code>, to the IPC server running at * Make a call, passing <code>param</code>, to the IPC server running at
* <code>address</code> which is servicing the <code>protocol</code> protocol, * <code>address</code> which is servicing the <code>protocol</code> protocol,
@ -1231,8 +1250,26 @@ public Writable call(Writable param, ConnectionId remoteId)
*/ */
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest, public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
ConnectionId remoteId) throws InterruptedException, IOException { ConnectionId remoteId) throws InterruptedException, IOException {
return call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT);
}
/**
* Make a call, passing <code>rpcRequest</code>, to the IPC server defined by
* <code>remoteId</code>, returning the rpc respond.
*
* @param rpcKind
* @param rpcRequest - contains serialized method and method parameters
* @param remoteId - the target rpc server
* @param serviceClass - service class for RPC
* @returns the rpc response
* Throws exceptions if there are network problems or if the remote code
* threw an exception.
*/
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
ConnectionId remoteId, int serviceClass)
throws InterruptedException, IOException {
Call call = new Call(rpcKind, rpcRequest); Call call = new Call(rpcKind, rpcRequest);
Connection connection = getConnection(remoteId, call); Connection connection = getConnection(remoteId, call, serviceClass);
try { try {
connection.sendRpcRequest(call); // send the rpc request connection.sendRpcRequest(call); // send the rpc request
} catch (RejectedExecutionException e) { } catch (RejectedExecutionException e) {
@ -1289,7 +1326,7 @@ Set<ConnectionId> getConnectionIds() {
/** Get a connection from the pool, or create a new one and add it to the /** Get a connection from the pool, or create a new one and add it to the
* pool. Connections to a given ConnectionId are reused. */ * pool. Connections to a given ConnectionId are reused. */
private Connection getConnection(ConnectionId remoteId, private Connection getConnection(ConnectionId remoteId,
Call call) Call call, int serviceClass)
throws IOException, InterruptedException { throws IOException, InterruptedException {
if (!running.get()) { if (!running.get()) {
// the client is stopped // the client is stopped
@ -1304,7 +1341,7 @@ private Connection getConnection(ConnectionId remoteId,
synchronized (connections) { synchronized (connections) {
connection = connections.get(remoteId); connection = connections.get(remoteId);
if (connection == null) { if (connection == null) {
connection = new Connection(remoteId); connection = new Connection(remoteId, serviceClass);
connections.put(remoteId, connection); connections.put(remoteId, connection);
} }
} }

View File

@ -77,12 +77,12 @@
@InterfaceAudience.LimitedPrivate(value = { "Common", "HDFS", "MapReduce", "Yarn" }) @InterfaceAudience.LimitedPrivate(value = { "Common", "HDFS", "MapReduce", "Yarn" })
@InterfaceStability.Evolving @InterfaceStability.Evolving
public class RPC { public class RPC {
final static int RPC_SERVICE_CLASS_DEFAULT = 0;
public enum RpcKind { public enum RpcKind {
RPC_BUILTIN ((short) 1), // Used for built in calls by tests RPC_BUILTIN ((short) 1), // Used for built in calls by tests
RPC_WRITABLE ((short) 2), // Use WritableRpcEngine RPC_WRITABLE ((short) 2), // Use WritableRpcEngine
RPC_PROTOCOL_BUFFER ((short) 3); // Use ProtobufRpcEngine RPC_PROTOCOL_BUFFER ((short) 3); // Use ProtobufRpcEngine
final static short MAX_INDEX = RPC_PROTOCOL_BUFFER.value; // used for array size final static short MAX_INDEX = RPC_PROTOCOL_BUFFER.value; // used for array size
private static final short FIRST_INDEX = RPC_BUILTIN.value;
public final short value; //TODO make it private public final short value; //TODO make it private
RpcKind(short val) { RpcKind(short val) {

View File

@ -438,6 +438,11 @@ Iterable<? extends Thread> getHandlers() {
return Arrays.asList(handlers); return Arrays.asList(handlers);
} }
@VisibleForTesting
List<Connection> getConnections() {
return connectionList;
}
/** /**
* Refresh the service authorization ACL for the service handled by this server. * Refresh the service authorization ACL for the service handled by this server.
*/ */
@ -1104,6 +1109,7 @@ public class Connection {
private ByteBuffer connectionHeaderBuf = null; private ByteBuffer connectionHeaderBuf = null;
private ByteBuffer unwrappedData; private ByteBuffer unwrappedData;
private ByteBuffer unwrappedDataLengthBuffer; private ByteBuffer unwrappedDataLengthBuffer;
private int serviceClass;
UserGroupInformation user = null; UserGroupInformation user = null;
public UserGroupInformation attemptingUser = null; // user name before auth public UserGroupInformation attemptingUser = null; // user name before auth
@ -1314,14 +1320,17 @@ public int readAndProcess() throws IOException, InterruptedException {
if (!connectionHeaderRead) { if (!connectionHeaderRead) {
//Every connection is expected to send the header. //Every connection is expected to send the header.
if (connectionHeaderBuf == null) { if (connectionHeaderBuf == null) {
connectionHeaderBuf = ByteBuffer.allocate(3); connectionHeaderBuf = ByteBuffer.allocate(4);
} }
count = channelRead(channel, connectionHeaderBuf); count = channelRead(channel, connectionHeaderBuf);
if (count < 0 || connectionHeaderBuf.remaining() > 0) { if (count < 0 || connectionHeaderBuf.remaining() > 0) {
return count; return count;
} }
int version = connectionHeaderBuf.get(0); int version = connectionHeaderBuf.get(0);
byte[] method = new byte[] {connectionHeaderBuf.get(1)}; // TODO we should add handler for service class later
this.setServiceClass(connectionHeaderBuf.get(1));
byte[] method = new byte[] {connectionHeaderBuf.get(2)};
authMethod = AuthMethod.read(new DataInputStream( authMethod = AuthMethod.read(new DataInputStream(
new ByteArrayInputStream(method))); new ByteArrayInputStream(method)));
dataLengthBuffer.flip(); dataLengthBuffer.flip();
@ -1345,7 +1354,7 @@ public int readAndProcess() throws IOException, InterruptedException {
} }
IpcSerializationType serializationType = IpcSerializationType IpcSerializationType serializationType = IpcSerializationType
.fromByte(connectionHeaderBuf.get(2)); .fromByte(connectionHeaderBuf.get(3));
if (serializationType != IpcSerializationType.PROTOBUF) { if (serializationType != IpcSerializationType.PROTOBUF) {
respondUnsupportedSerialization(serializationType); respondUnsupportedSerialization(serializationType);
return -1; return -1;
@ -1735,6 +1744,22 @@ private boolean authorizeConnection() throws IOException {
return true; return true;
} }
/**
* Get service class for connection
* @return the serviceClass
*/
public int getServiceClass() {
return serviceClass;
}
/**
* Set service class for connection
* @param serviceClass the serviceClass to set
*/
public void setServiceClass(int serviceClass) {
this.serviceClass = serviceClass;
}
private synchronized void close() throws IOException { private synchronized void close() throws IOException {
disposeSasl(); disposeSasl();
data = null; data = null;

View File

@ -25,6 +25,7 @@
import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.LongWritable; import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.ipc.Server.Connection;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.net.ConnectTimeoutException; import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
@ -520,11 +521,53 @@ public void testIpcConnectTimeout() throws Exception {
} }
} }
/**
* Check service class byte in IPC header is correct on wire.
*/
@Test(timeout=60000)
public void testIpcWithServiceClass() throws Exception {
// start server
Server server = new TestServer(5, false);
InetSocketAddress addr = NetUtils.getConnectAddress(server);
server.start();
// start client
Client.setConnectTimeout(conf, 10000);
callAndVerify(server, addr, 0, true);
// Service Class is low to -128 as byte on wire.
// -128 shouldn't be casted on wire but -129 should.
callAndVerify(server, addr, -128, true);
callAndVerify(server, addr, -129, false);
// Service Class is up to 127.
// 127 shouldn't be casted on wire but 128 should.
callAndVerify(server, addr, 127, true);
callAndVerify(server, addr, 128, false);
server.stop();
}
/**
* Make a call from a client and verify if header info is changed in server side
*/
private void callAndVerify(Server server, InetSocketAddress addr,
int serviceClass, boolean noChanged) throws Exception{
Client client = new Client(LongWritable.class, conf);
client.call(new LongWritable(RANDOM.nextLong()),
addr, null, null, MIN_SLEEP_TIME, serviceClass, conf);
Connection connection = server.getConnections().get(0);
int serviceClass2 = connection.getServiceClass();
assertFalse(noChanged ^ serviceClass == serviceClass2);
client.stop();
}
/** /**
* Check that file descriptors aren't leaked by starting * Check that file descriptors aren't leaked by starting
* and stopping IPC servers. * and stopping IPC servers.
*/ */
@Test @Test(timeout=60000)
public void testSocketLeak() throws Exception { public void testSocketLeak() throws Exception {
Assume.assumeTrue(FD_DIR.exists()); // only run on Linux Assume.assumeTrue(FD_DIR.exists()); // only run on Linux