diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index ad2b336fb3..a1ac0440be 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -17,6 +17,8 @@ Trunk (Unreleased)
HADOOP-9380 Add totalLength to rpc response (sanjay Radia)
+ HADOOP-9194. RPC Support for QoS. (Junping Du via llu)
+
NEW FEATURES
HADOOP-8561. Introduce HADOOP_PROXY_USER for secure impersonation in child
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
index 5294aa3b94..986b1a2c15 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java
@@ -257,6 +257,7 @@ private class Connection extends Thread {
private final ConnectionId remoteId; // connection id
private AuthMethod authMethod; // authentication method
private Token extends TokenIdentifier> token;
+ private int serviceClass;
private SaslRpcClient saslRpcClient;
private Socket socket = null; // connected socket
@@ -279,7 +280,7 @@ private class Connection extends Thread {
private final Object sendRpcRequestLock = new Object();
- public Connection(ConnectionId remoteId) throws IOException {
+ public Connection(ConnectionId remoteId, int serviceClass) throws IOException {
this.remoteId = remoteId;
this.server = remoteId.getAddress();
if (server.isUnresolved()) {
@@ -296,6 +297,7 @@ public Connection(ConnectionId remoteId) throws IOException {
this.tcpNoDelay = remoteId.getTcpNoDelay();
this.doPing = remoteId.getDoPing();
this.pingInterval = remoteId.getPingInterval();
+ this.serviceClass = serviceClass;
if (LOG.isDebugEnabled()) {
LOG.debug("The ping interval is " + this.pingInterval + " ms.");
}
@@ -747,7 +749,9 @@ private void handleConnectionFailure(int curRetries, IOException ioe
* +----------------------------------+
* | "hrpc" 4 bytes |
* +----------------------------------+
- * | Version (1 bytes) |
+ * | Version (1 byte) |
+ * +----------------------------------+
+ * | Service Class (1 byte) |
* +----------------------------------+
* | Authmethod (1 byte) |
* +----------------------------------+
@@ -760,6 +764,7 @@ private void writeConnectionHeader(OutputStream outStream)
// Write out the header, version and authentication method
out.write(Server.HEADER.array());
out.write(Server.CURRENT_VERSION);
+ out.write(serviceClass);
authMethod.write(out);
Server.IpcSerializationType.PROTOBUF.write(out);
out.flush();
@@ -1179,19 +1184,33 @@ public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress addr
/**
- * Same as {@link #call(RPC.RpcKind, Writable, InetSocketAddress,
+ * Same as {@link #call(RPC.RpcKind, Writable, InetSocketAddress,
* Class, UserGroupInformation, int, Configuration)}
* except that rpcKind is writable.
*/
- public Writable call(Writable param, InetSocketAddress addr,
+ public Writable call(Writable param, InetSocketAddress addr,
Class> protocol, UserGroupInformation ticket,
- int rpcTimeout, Configuration conf)
+ int rpcTimeout, Configuration conf)
throws InterruptedException, IOException {
- ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
+ ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
ticket, rpcTimeout, conf);
return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId);
}
+ /**
+ * Same as {@link #call(Writable, InetSocketAddress,
+ * Class, UserGroupInformation, int, Configuration)}
+ * except that specifying serviceClass.
+ */
+ public Writable call(Writable param, InetSocketAddress addr,
+ Class> protocol, UserGroupInformation ticket,
+ int rpcTimeout, int serviceClass, Configuration conf)
+ throws InterruptedException, IOException {
+ ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
+ ticket, rpcTimeout, conf);
+ return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId, serviceClass);
+ }
+
/**
* Make a call, passing param
, to the IPC server running at
* address
which is servicing the protocol
protocol,
@@ -1218,10 +1237,10 @@ public Writable call(Writable param, ConnectionId remoteId)
return call(RPC.RpcKind.RPC_BUILTIN, param, remoteId);
}
- /**
+ /**
* Make a call, passing rpcRequest
, to the IPC server defined by
* remoteId
, returning the rpc respond.
- *
+ *
* @param rpcKind
* @param rpcRequest - contains serialized method and method parameters
* @param remoteId - the target rpc server
@@ -1231,8 +1250,26 @@ public Writable call(Writable param, ConnectionId remoteId)
*/
public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
ConnectionId remoteId) throws InterruptedException, IOException {
+ return call(rpcKind, rpcRequest, remoteId, RPC.RPC_SERVICE_CLASS_DEFAULT);
+ }
+
+ /**
+ * Make a call, passing rpcRequest
, to the IPC server defined by
+ * remoteId
, returning the rpc respond.
+ *
+ * @param rpcKind
+ * @param rpcRequest - contains serialized method and method parameters
+ * @param remoteId - the target rpc server
+ * @param serviceClass - service class for RPC
+ * @returns the rpc response
+ * Throws exceptions if there are network problems or if the remote code
+ * threw an exception.
+ */
+ public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
+ ConnectionId remoteId, int serviceClass)
+ throws InterruptedException, IOException {
Call call = new Call(rpcKind, rpcRequest);
- Connection connection = getConnection(remoteId, call);
+ Connection connection = getConnection(remoteId, call, serviceClass);
try {
connection.sendRpcRequest(call); // send the rpc request
} catch (RejectedExecutionException e) {
@@ -1289,7 +1326,7 @@ Set getConnectionIds() {
/** Get a connection from the pool, or create a new one and add it to the
* pool. Connections to a given ConnectionId are reused. */
private Connection getConnection(ConnectionId remoteId,
- Call call)
+ Call call, int serviceClass)
throws IOException, InterruptedException {
if (!running.get()) {
// the client is stopped
@@ -1304,7 +1341,7 @@ private Connection getConnection(ConnectionId remoteId,
synchronized (connections) {
connection = connections.get(remoteId);
if (connection == null) {
- connection = new Connection(remoteId);
+ connection = new Connection(remoteId, serviceClass);
connections.put(remoteId, connection);
}
}
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
index 8ee22aa415..3563e07dba 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java
@@ -77,12 +77,12 @@
@InterfaceAudience.LimitedPrivate(value = { "Common", "HDFS", "MapReduce", "Yarn" })
@InterfaceStability.Evolving
public class RPC {
+ final static int RPC_SERVICE_CLASS_DEFAULT = 0;
public enum RpcKind {
RPC_BUILTIN ((short) 1), // Used for built in calls by tests
RPC_WRITABLE ((short) 2), // Use WritableRpcEngine
RPC_PROTOCOL_BUFFER ((short) 3); // Use ProtobufRpcEngine
final static short MAX_INDEX = RPC_PROTOCOL_BUFFER.value; // used for array size
- private static final short FIRST_INDEX = RPC_BUILTIN.value;
public final short value; //TODO make it private
RpcKind(short val) {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
index c44eb9426d..f76690c27f 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java
@@ -438,6 +438,11 @@ Iterable extends Thread> getHandlers() {
return Arrays.asList(handlers);
}
+ @VisibleForTesting
+ List getConnections() {
+ return connectionList;
+ }
+
/**
* Refresh the service authorization ACL for the service handled by this server.
*/
@@ -1104,6 +1109,7 @@ public class Connection {
private ByteBuffer connectionHeaderBuf = null;
private ByteBuffer unwrappedData;
private ByteBuffer unwrappedDataLengthBuffer;
+ private int serviceClass;
UserGroupInformation user = null;
public UserGroupInformation attemptingUser = null; // user name before auth
@@ -1314,14 +1320,17 @@ public int readAndProcess() throws IOException, InterruptedException {
if (!connectionHeaderRead) {
//Every connection is expected to send the header.
if (connectionHeaderBuf == null) {
- connectionHeaderBuf = ByteBuffer.allocate(3);
+ connectionHeaderBuf = ByteBuffer.allocate(4);
}
count = channelRead(channel, connectionHeaderBuf);
if (count < 0 || connectionHeaderBuf.remaining() > 0) {
return count;
}
int version = connectionHeaderBuf.get(0);
- byte[] method = new byte[] {connectionHeaderBuf.get(1)};
+ // TODO we should add handler for service class later
+ this.setServiceClass(connectionHeaderBuf.get(1));
+
+ byte[] method = new byte[] {connectionHeaderBuf.get(2)};
authMethod = AuthMethod.read(new DataInputStream(
new ByteArrayInputStream(method)));
dataLengthBuffer.flip();
@@ -1345,7 +1354,7 @@ public int readAndProcess() throws IOException, InterruptedException {
}
IpcSerializationType serializationType = IpcSerializationType
- .fromByte(connectionHeaderBuf.get(2));
+ .fromByte(connectionHeaderBuf.get(3));
if (serializationType != IpcSerializationType.PROTOBUF) {
respondUnsupportedSerialization(serializationType);
return -1;
@@ -1735,6 +1744,22 @@ private boolean authorizeConnection() throws IOException {
return true;
}
+ /**
+ * Get service class for connection
+ * @return the serviceClass
+ */
+ public int getServiceClass() {
+ return serviceClass;
+ }
+
+ /**
+ * Set service class for connection
+ * @param serviceClass the serviceClass to set
+ */
+ public void setServiceClass(int serviceClass) {
+ this.serviceClass = serviceClass;
+ }
+
private synchronized void close() throws IOException {
disposeSasl();
data = null;
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
index 3847bfd081..24b20f89ef 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java
@@ -25,6 +25,7 @@
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.ipc.Server.Connection;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.net.ConnectTimeoutException;
import org.apache.hadoop.net.NetUtils;
@@ -520,11 +521,53 @@ public void testIpcConnectTimeout() throws Exception {
}
}
+ /**
+ * Check service class byte in IPC header is correct on wire.
+ */
+ @Test(timeout=60000)
+ public void testIpcWithServiceClass() throws Exception {
+ // start server
+ Server server = new TestServer(5, false);
+ InetSocketAddress addr = NetUtils.getConnectAddress(server);
+ server.start();
+
+ // start client
+ Client.setConnectTimeout(conf, 10000);
+
+ callAndVerify(server, addr, 0, true);
+ // Service Class is low to -128 as byte on wire.
+ // -128 shouldn't be casted on wire but -129 should.
+ callAndVerify(server, addr, -128, true);
+ callAndVerify(server, addr, -129, false);
+
+ // Service Class is up to 127.
+ // 127 shouldn't be casted on wire but 128 should.
+ callAndVerify(server, addr, 127, true);
+ callAndVerify(server, addr, 128, false);
+
+ server.stop();
+ }
+
+ /**
+ * Make a call from a client and verify if header info is changed in server side
+ */
+ private void callAndVerify(Server server, InetSocketAddress addr,
+ int serviceClass, boolean noChanged) throws Exception{
+ Client client = new Client(LongWritable.class, conf);
+
+ client.call(new LongWritable(RANDOM.nextLong()),
+ addr, null, null, MIN_SLEEP_TIME, serviceClass, conf);
+ Connection connection = server.getConnections().get(0);
+ int serviceClass2 = connection.getServiceClass();
+ assertFalse(noChanged ^ serviceClass == serviceClass2);
+ client.stop();
+ }
+
/**
* Check that file descriptors aren't leaked by starting
* and stopping IPC servers.
*/
- @Test
+ @Test(timeout=60000)
public void testSocketLeak() throws Exception {
Assume.assumeTrue(FD_DIR.exists()); // only run on Linux