From 8774f178686487007dcf8c418c989b785a529000 Mon Sep 17 00:00:00 2001 From: xuzq <15040255127@163.com> Date: Sat, 16 Jul 2022 05:18:46 +0800 Subject: [PATCH] HADOOP-13144. Enhancing IPC client throughput via multiple connections per user (#4542) --- .../java/org/apache/hadoop/ipc/Client.java | 8 +- .../apache/hadoop/ipc/ProtobufRpcEngine.java | 10 +++ .../apache/hadoop/ipc/ProtobufRpcEngine2.java | 10 +++ .../main/java/org/apache/hadoop/ipc/RPC.java | 23 +++++ .../java/org/apache/hadoop/ipc/RpcEngine.java | 16 ++++ .../apache/hadoop/ipc/WritableRpcEngine.java | 21 +++++ .../java/org/apache/hadoop/ipc/TestRPC.java | 56 ++++++++++++ .../org/apache/hadoop/ipc/TestRpcBase.java | 89 +++++++++++++++++-- 8 files changed, 226 insertions(+), 7 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 2fe8aca85e..c4b96bffd8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -1716,7 +1716,7 @@ public static class ConnectionId { private String saslQop; // here for testing private final Configuration conf; // used to get the expected kerberos principal name - ConnectionId(InetSocketAddress address, Class protocol, + public ConnectionId(InetSocketAddress address, Class protocol, UserGroupInformation ticket, int rpcTimeout, RetryPolicy connectionRetryPolicy, Configuration conf) { this.protocol = protocol; @@ -1760,7 +1760,7 @@ UserGroupInformation getTicket() { return ticket; } - private int getRpcTimeout() { + int getRpcTimeout() { return rpcTimeout; } @@ -1794,6 +1794,10 @@ boolean getDoPing() { int getPingInterval() { return pingInterval; } + + RetryPolicy getRetryPolicy() { + return connectionRetryPolicy; + } @VisibleForTesting String getSaslQop() { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index e53f57b1fc..d904aa8133 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -77,6 +77,16 @@ public static AsyncGet getAsyncReturnMessage() { return ASYNC_RETURN_MESSAGE.get(); } + @Override + @SuppressWarnings("unchecked") + public ProtocolProxy getProxy(Class protocol, long clientVersion, + ConnectionId connId, Configuration conf, SocketFactory factory) + throws IOException { + final Invoker invoker = new Invoker(protocol, connId, conf, factory); + return new ProtocolProxy(protocol, (T) Proxy.newProxyInstance( + protocol.getClassLoader(), new Class[] {protocol}, invoker), false); + } + public ProtocolProxy getProxy(Class protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory, int rpcTimeout) throws IOException { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java index 3a8c627582..336bf061aa 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine2.java @@ -100,6 +100,16 @@ public ProtocolProxy getProxy( rpcTimeout, connectionRetryPolicy, null, null); } + @Override + @SuppressWarnings("unchecked") + public ProtocolProxy getProxy(Class protocol, long clientVersion, + ConnectionId connId, Configuration conf, SocketFactory factory) + throws IOException { + final Invoker invoker = new Invoker(protocol, connId, conf, factory); + return new ProtocolProxy(protocol, (T) Proxy.newProxyInstance( + protocol.getClassLoader(), new Class[] {protocol}, invoker), false); + } + @Override @SuppressWarnings("unchecked") public ProtocolProxy getProxy(Class protocol, long clientVersion, diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java index 818305b316..7f35b13aec 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java @@ -541,6 +541,29 @@ public static ProtocolProxy getProtocolProxy(Class protocol, return getProtocolProxy(protocol, clientVersion, addr, ticket, conf, factory, getRpcTimeout(conf), null); } + + /** + * Get a protocol proxy that contains a proxy connection to a remote server + * and a set of methods that are supported by the server. + * + * @param Generics Type T + * @param protocol protocol class + * @param clientVersion client's version + * @param connId client connection identifier + * @param conf configuration + * @param factory socket factory + * @return the protocol proxy + * @throws IOException if the far end through a RemoteException + */ + public static ProtocolProxy getProtocolProxy(Class protocol, + long clientVersion, ConnectionId connId, Configuration conf, + SocketFactory factory) throws IOException { + if (UserGroupInformation.isSecurityEnabled()) { + SaslRpcServer.init(conf); + } + return getProtocolEngine(protocol, conf).getProxy( + protocol, clientVersion, connId, conf, factory); + } /** * Construct a client-side proxy that implements the named protocol, diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java index afc9d035b0..1f0ff2d99d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcEngine.java @@ -57,6 +57,22 @@ ProtocolProxy getProxy(Class protocol, SocketFactory factory, int rpcTimeout, RetryPolicy connectionRetryPolicy) throws IOException; + /** + * Construct a client-side proxy object with a ConnectionId. + * + * @param Generics Type T. + * @param protocol input protocol. + * @param clientVersion input clientVersion. + * @param connId input ConnectionId. + * @param conf input Configuration. + * @param factory input factory. + * @throws IOException raised on errors performing I/O. + * @return ProtocolProxy. + */ + ProtocolProxy getProxy(Class protocol, long clientVersion, + Client.ConnectionId connId, Configuration conf, SocketFactory factory) + throws IOException; + /** * Construct a client-side proxy object. * diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java index 21181f860d..2a19ad29a6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/WritableRpcEngine.java @@ -306,6 +306,27 @@ public ProtocolProxy getProxy(Class protocol, long clientVersion, rpcTimeout, connectionRetryPolicy, null, null); } + /** + * Construct a client-side proxy object with a ConnectionId. + * + * @param Generics Type T. + * @param protocol input protocol. + * @param clientVersion input clientVersion. + * @param connId input ConnectionId. + * @param conf input Configuration. + * @param factory input factory. + * @throws IOException raised on errors performing I/O. + * @return ProtocolProxy. + */ + @Override + public ProtocolProxy getProxy(Class protocol, long clientVersion, + Client.ConnectionId connId, Configuration conf, SocketFactory factory) + throws IOException { + return getProxy(protocol, clientVersion, connId.getAddress(), + connId.ticket, conf, factory, connId.getRpcTimeout(), + connId.getRetryPolicy(), null, null); + } + /** * Construct a client-side proxy object that implements the named protocol, * talking to a server at the named address. diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index 5caabd22a8..ed11c9c503 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -18,6 +18,8 @@ package org.apache.hadoop.ipc; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.io.retry.RetryUtils; import org.apache.hadoop.ipc.metrics.RpcMetrics; import org.apache.hadoop.thirdparty.com.google.common.util.concurrent.ThreadFactoryBuilder; @@ -289,6 +291,13 @@ public ProtocolProxy getProxy( rpcTimeout, connectionRetryPolicy, null, null); } + @Override + public ProtocolProxy getProxy(Class protocol, long clientVersion, + ConnectionId connId, Configuration conf, SocketFactory factory) + throws IOException { + throw new UnsupportedOperationException("This proxy is not supported"); + } + @SuppressWarnings("unchecked") @Override public ProtocolProxy getProxy( @@ -390,6 +399,53 @@ public void testProxyAddress() throws Exception { } } + @Test + public void testConnectionWithSocketFactory() throws IOException, ServiceException { + TestRpcService firstProxy = null; + TestRpcService secondProxy = null; + + Configuration newConf = new Configuration(conf); + newConf.set(CommonConfigurationKeysPublic. + HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY, ""); + + RetryPolicy retryPolicy = RetryUtils.getDefaultRetryPolicy( + newConf, "Test.No.Such.Key", + true, + "Test.No.Such.Key", "10000,6", + null); + + // create a server with two handlers + Server server = setupTestServer(newConf, 2); + try { + // create the first client + firstProxy = getClient(addr, newConf); + // create the second client + secondProxy = getClient(addr, newConf); + + firstProxy.ping(null, newEmptyRequest()); + secondProxy.ping(null, newEmptyRequest()); + + Client client = ProtobufRpcEngine2.getClient(newConf); + assertEquals(1, client.getConnectionIds().size()); + + stop(null, firstProxy, secondProxy); + ProtobufRpcEngine2.clearClientCache(); + + // create the first client with index 1 + firstProxy = getMultipleClientWithIndex(addr, newConf, retryPolicy, 1); + // create the second client with index 2 + secondProxy = getMultipleClientWithIndex(addr, newConf, retryPolicy, 2); + firstProxy.ping(null, newEmptyRequest()); + secondProxy.ping(null, newEmptyRequest()); + + Client client2 = ProtobufRpcEngine2.getClient(newConf); + assertEquals(2, client2.getConnectionIds().size()); + } finally { + System.out.println("Down slow rpc testing"); + stop(server, firstProxy, secondProxy); + } + } + @Test public void testSlowRpc() throws IOException, ServiceException { Server server; diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java index e9019e3d24..7635b16dac 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java @@ -18,6 +18,8 @@ package org.apache.hadoop.ipc; +import org.apache.commons.lang3.builder.EqualsBuilder; +import org.apache.commons.lang3.builder.HashCodeBuilder; import org.apache.hadoop.thirdparty.protobuf.BlockingService; import org.apache.hadoop.thirdparty.protobuf.RpcController; import org.apache.hadoop.thirdparty.protobuf.ServiceException; @@ -26,6 +28,7 @@ import java.util.concurrent.locks.Lock; import java.util.concurrent.locks.ReentrantLock; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.protobuf.TestProtos; import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos; import org.apache.hadoop.net.NetUtils; @@ -154,11 +157,53 @@ protected static TestRpcService getClient(InetSocketAddress serverAddr, } } - protected static void stop(Server server, TestRpcService proxy) { - if (proxy != null) { - try { - RPC.stopProxy(proxy); - } catch (Exception ignored) {} + /** + * Try to obtain a proxy of TestRpcService with an index. + * @param serverAddr input server address + * @param clientConf input client configuration + * @param retryPolicy input retryPolicy + * @param index input index + * @return one proxy of TestRpcService + */ + protected static TestRpcService getMultipleClientWithIndex(InetSocketAddress serverAddr, + Configuration clientConf, RetryPolicy retryPolicy, int index) + throws ServiceException, IOException { + MockConnectionId connectionId = new MockConnectionId(serverAddr, + TestRpcService.class, UserGroupInformation.getCurrentUser(), + RPC.getRpcTimeout(clientConf), retryPolicy, clientConf, index); + return getClient(connectionId, clientConf); + } + + /** + * Obtain a TestRpcService Proxy by a connectionId. + * @param connId input connectionId + * @param clientConf input configuration + * @return a TestRpcService Proxy + * @throws ServiceException a ServiceException + */ + protected static TestRpcService getClient(ConnectionId connId, + Configuration clientConf) throws ServiceException { + try { + return RPC.getProtocolProxy( + TestRpcService.class, + 0, + connId, + clientConf, + NetUtils.getDefaultSocketFactory(clientConf)).getProxy(); + } catch (IOException e) { + throw new ServiceException(e); + } + } + + protected static void stop(Server server, TestRpcService... proxies) { + if (proxies != null) { + for (TestRpcService proxy : proxies) { + if (proxy != null) { + try { + RPC.stopProxy(proxy); + } catch (Exception ignored) {} + } + } } if (server != null) { @@ -189,6 +234,40 @@ protected static int countThreads(String search) { return count; } + public static class MockConnectionId extends ConnectionId { + private static final int PRIME = 16777619; + private final int index; + + public MockConnectionId(InetSocketAddress address, Class protocol, + UserGroupInformation ticket, int rpcTimeout, RetryPolicy connectionRetryPolicy, + Configuration conf, int index) { + super(address, protocol, ticket, rpcTimeout, connectionRetryPolicy, conf); + this.index = index; + } + + @Override + public int hashCode() { + return new HashCodeBuilder() + .append(PRIME * super.hashCode()) + .append(this.index) + .toHashCode(); + } + + @Override + public boolean equals(Object obj) { + if (!super.equals(obj)) { + return false; + } + if (obj instanceof MockConnectionId) { + MockConnectionId other = (MockConnectionId)obj; + return new EqualsBuilder() + .append(this.index, other.index) + .isEquals(); + } + return false; + } + } + public static class TestTokenIdentifier extends TokenIdentifier { private Text tokenid; private Text realUser;