From 9192f71e21847ad86bc9ff23847d8957dfe8ae58 Mon Sep 17 00:00:00 2001 From: Tsz Wo Nicholas Sze Date: Tue, 26 Feb 2019 15:14:21 -0800 Subject: [PATCH] HADOOP-16127. In ipc.Client, put a new connection could happen after stop. --- .../java/org/apache/hadoop/ipc/Client.java | 120 ++++++++++-------- .../org/apache/hadoop/ipc/ClientCache.java | 7 +- 2 files changed, 72 insertions(+), 55 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index 2219dece9a..01219671b8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -71,6 +71,7 @@ import java.util.concurrent.atomic.AtomicInteger; import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; +import java.util.function.Consumer; import static org.apache.hadoop.ipc.RpcConstants.CONNECTION_CONTEXT_CALL_ID; import static org.apache.hadoop.ipc.RpcConstants.PING_CALL_ID; @@ -84,9 +85,7 @@ @Public @InterfaceStability.Evolving public class Client implements AutoCloseable { - public static final Logger LOG = LoggerFactory.getLogger(Client.class); - private static final int STOP_SLEEP_TIME_MS = 10; /** A counter for generating call IDs. */ private static final AtomicInteger callIdCounter = new AtomicInteger(); @@ -124,15 +123,17 @@ public static void setCallIdAndRetryCount(int cid, int rc, EXTERNAL_CALL_HANDLER.set(externalHandler); } - private ConcurrentMap connections = + private final ConcurrentMap connections = new ConcurrentHashMap<>(); + private final Object putLock = new Object(); + private final Object emptyCondition = new Object(); + private final AtomicBoolean running = new AtomicBoolean(true); private Class valueClass; // class of call values - private AtomicBoolean running = new AtomicBoolean(true); // if client runs final private Configuration conf; private SocketFactory socketFactory; // how to create sockets - private int refCount = 1; + private final AtomicInteger refCount = new AtomicInteger(1); private final int connectionTimeout; @@ -207,7 +208,7 @@ synchronized ExecutorService unrefAndCleanup() { return clientExecutor; } - }; + } /** * set the ping interval value in configuration @@ -281,29 +282,19 @@ public static final void setConnectTimeout(Configuration conf, int timeout) { public static final ExecutorService getClientExecutor() { return Client.clientExcecutorFactory.clientExecutor; } + /** * Increment this client's reference count - * */ - synchronized void incCount() { - refCount++; + void incCount() { + refCount.incrementAndGet(); } /** * Decrement this client's reference count - * */ - synchronized void decCount() { - refCount--; - } - - /** - * Return if this client has no reference - * - * @return true if this client has no reference; false otherwise - */ - synchronized boolean isZeroReference() { - return refCount==0; + int decAndGetCount() { + return refCount.decrementAndGet(); } /** Check the rpc response header. */ @@ -452,17 +443,13 @@ private class Connection extends Thread { private final Object sendRpcRequestLock = new Object(); private AtomicReference connectingThread = new AtomicReference<>(); + private final Consumer removeMethod; - public Connection(ConnectionId remoteId, int serviceClass) throws IOException { + Connection(ConnectionId remoteId, int serviceClass, + Consumer removeMethod) { this.remoteId = remoteId; this.server = remoteId.getAddress(); - if (server.isUnresolved()) { - throw NetUtils.wrapException(server.getHostName(), - server.getPort(), - null, - 0, - new UnknownHostException()); - } + this.maxResponseLength = remoteId.conf.getInt( CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH, CommonConfigurationKeys.IPC_MAXIMUM_RESPONSE_LENGTH_DEFAULT); @@ -481,7 +468,12 @@ public Connection(ConnectionId remoteId, int serviceClass) throws IOException { .makeRpcRequestHeader(RpcKind.RPC_PROTOCOL_BUFFER, OperationProto.RPC_FINAL_PACKET, PING_CALL_ID, RpcConstants.INVALID_RETRY_COUNT, clientId); - pingHeader.writeDelimitedTo(buf); + try { + pingHeader.writeDelimitedTo(buf); + } catch (IOException e) { + throw new IllegalStateException("Failed to write to buf for " + + remoteId + " in " + Client.this + " due to " + e, e); + } pingRequest = buf.toByteArray(); } this.pingInterval = remoteId.getPingInterval(); @@ -494,6 +486,8 @@ public Connection(ConnectionId remoteId, int serviceClass) throws IOException { this.soTimeout = pingInterval; } this.serviceClass = serviceClass; + this.removeMethod = removeMethod; + if (LOG.isDebugEnabled()) { LOG.debug("The ping interval is " + this.pingInterval + " ms."); } @@ -1253,7 +1247,7 @@ private synchronized void close() { // We have marked this connection as closed. Other thread could have // already known it and replace this closedConnection with a new one. // We should only remove this closedConnection. - connections.remove(remoteId, this); + removeMethod.accept(this); // close the streams and therefore the socket IOUtils.closeStream(ipcStreams); @@ -1325,7 +1319,13 @@ public Client(Class valueClass, Configuration conf, public Client(Class valueClass, Configuration conf) { this(valueClass, conf, NetUtils.getDefaultSocketFactory(conf)); } - + + @Override + public String toString() { + return getClass().getSimpleName() + "-" + + StringUtils.byteToHexString(clientId); + } + /** Return the socket factory of this client * * @return this client's socket factory @@ -1340,11 +1340,12 @@ public void stop() { if (LOG.isDebugEnabled()) { LOG.debug("Stopping client"); } - - if (!running.compareAndSet(true, false)) { - return; + synchronized (putLock) { // synchronized to avoid put after stop + if (!running.compareAndSet(true, false)) { + return; + } } - + // wake up all connections for (Connection conn : connections.values()) { conn.interrupt(); @@ -1352,13 +1353,15 @@ public void stop() { } // wait until all connections are closed - while (!connections.isEmpty()) { - try { - Thread.sleep(STOP_SLEEP_TIME_MS); - } catch (InterruptedException e) { + synchronized (emptyCondition) { + // synchronized the loop to guarantee wait must be notified. + while (!connections.isEmpty()) { + try { + emptyCondition.wait(); + } catch (InterruptedException e) { + } } } - clientExcecutorFactory.unrefAndCleanup(); } @@ -1569,24 +1572,37 @@ Set getConnectionIds() { private Connection getConnection(ConnectionId remoteId, Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth) throws IOException { - if (!running.get()) { - // the client is stopped - throw new IOException("The client is stopped"); + final InetSocketAddress address = remoteId.getAddress(); + if (address.isUnresolved()) { + throw NetUtils.wrapException(address.getHostName(), + address.getPort(), + null, + 0, + new UnknownHostException()); } + + final Consumer removeMethod = c -> { + final boolean removed = connections.remove(remoteId, c); + if (removed && connections.isEmpty()) { + synchronized (emptyCondition) { + emptyCondition.notify(); + } + } + }; + Connection connection; /* we could avoid this allocation for each RPC by having a * connectionsId object and with set() method. We need to manage the * refs for keys in HashMap properly. For now its ok. */ while (true) { - // These lines below can be shorten with computeIfAbsent in Java8 - connection = connections.get(remoteId); - if (connection == null) { - connection = new Connection(remoteId, serviceClass); - Connection existing = connections.putIfAbsent(remoteId, connection); - if (existing != null) { - connection = existing; + synchronized (putLock) { // synchronized to avoid put after stop + if (!running.get()) { + throw new IOException("Failed to get connection for " + remoteId + + ", " + call + ": " + this + " is already stopped"); } + connection = connections.computeIfAbsent(remoteId, + id -> new Connection(id, serviceClass, removeMethod)); } if (connection.addCall(call)) { @@ -1596,7 +1612,7 @@ private Connection getConnection(ConnectionId remoteId, // have already known this closedConnection, and replace it with a new // connection. So we should call conditional remove to make sure we only // remove this closedConnection. - connections.remove(remoteId, connection); + removeMethod.accept(connection); } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientCache.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientCache.java index 00d9a7953e..a0720d4218 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientCache.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ClientCache.java @@ -96,16 +96,17 @@ public void stopClient(Client client) { if (Client.LOG.isDebugEnabled()) { Client.LOG.debug("stopping client from cache: " + client); } + final int count; synchronized (this) { - client.decCount(); - if (client.isZeroReference()) { + count = client.decAndGetCount(); + if (count == 0) { if (Client.LOG.isDebugEnabled()) { Client.LOG.debug("removing client from cache: " + client); } clients.remove(client.getSocketFactory()); } } - if (client.isZeroReference()) { + if (count == 0) { if (Client.LOG.isDebugEnabled()) { Client.LOG.debug("stopping actual client because no more references remain: " + client);