HADOOP-12475. Replace guava Cache with ConcurrentHashMap for caching Connection in ipc Client (Walter Su via sjlee)

This commit is contained in:
Sangjin Lee 2015-10-15 11:43:07 -07:00
parent 7a98d94b7b
commit 8d2d3eb7bb
2 changed files with 33 additions and 34 deletions

View File

@ -510,6 +510,9 @@ Trunk (Unreleased)
HADOOP-12364. Deleting pid file after stop is causing the daemons to HADOOP-12364. Deleting pid file after stop is causing the daemons to
keep restarting (Siqi Li via aw) keep restarting (Siqi Li via aw)
HADOOP-12475. Replace guava Cache with ConcurrentHashMap for caching
Connection in ipc Client (Walter Su via sjlee)
OPTIMIZATIONS OPTIMIZATIONS
HADOOP-7761. Improve the performance of raw comparisons. (todd) HADOOP-7761. Improve the performance of raw comparisons. (todd)

View File

@ -43,7 +43,8 @@
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -57,8 +58,6 @@
import javax.net.SocketFactory; import javax.net.SocketFactory;
import javax.security.sasl.Sasl; import javax.security.sasl.Sasl;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -128,8 +127,8 @@ public static void setCallIdAndRetryCount(int cid, int rc) {
retryCount.set(rc); retryCount.set(rc);
} }
private final Cache<ConnectionId, Connection> connections = private ConcurrentMap<ConnectionId, Connection> connections =
CacheBuilder.newBuilder().build(); new ConcurrentHashMap<>();
private Class<? extends Writable> valueClass; // class of call values private Class<? extends Writable> valueClass; // class of call values
private AtomicBoolean running = new AtomicBoolean(true); // if client runs private AtomicBoolean running = new AtomicBoolean(true); // if client runs
@ -1178,7 +1177,10 @@ private synchronized void close() {
return; return;
} }
connections.invalidate(remoteId); // We have marked this connection as closed. Other thread could have
// already known it and replace this closedConnection with a new one.
// We should only remove this closedConnection.
connections.remove(remoteId, this);
// close the streams and therefore the socket // close the streams and therefore the socket
IOUtils.closeStream(out); IOUtils.closeStream(out);
@ -1265,12 +1267,12 @@ public void stop() {
} }
// wake up all connections // wake up all connections
for (Connection conn : connections.asMap().values()) { for (Connection conn : connections.values()) {
conn.interrupt(); conn.interrupt();
} }
// wait until all connections are closed // wait until all connections are closed
while (connections.size() > 0) { while (!connections.isEmpty()) {
try { try {
Thread.sleep(100); Thread.sleep(100);
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -1289,7 +1291,6 @@ public Writable call(Writable param, InetSocketAddress address)
ConnectionId remoteId = ConnectionId.getConnectionId(address, null, null, 0, ConnectionId remoteId = ConnectionId.getConnectionId(address, null, null, 0,
conf); conf);
return call(RpcKind.RPC_BUILTIN, param, remoteId); return call(RpcKind.RPC_BUILTIN, param, remoteId);
} }
/** /**
@ -1465,14 +1466,13 @@ public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
Set<ConnectionId> getConnectionIds() { Set<ConnectionId> getConnectionIds() {
return connections.asMap().keySet(); return connections.keySet();
} }
/** Get a connection from the pool, or create a new one and add it to the /** Get a connection from the pool, or create a new one and add it to the
* pool. Connections to a given ConnectionId are reused. */ * pool. Connections to a given ConnectionId are reused. */
private Connection getConnection( private Connection getConnection(ConnectionId remoteId,
final ConnectionId remoteId, Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth)
Call call, final int serviceClass, AtomicBoolean fallbackToSimpleAuth)
throws IOException { throws IOException {
if (!running.get()) { if (!running.get()) {
// the client is stopped // the client is stopped
@ -1483,34 +1483,30 @@ private Connection getConnection(
* connectionsId object and with set() method. We need to manage the * connectionsId object and with set() method. We need to manage the
* refs for keys in HashMap properly. For now its ok. * refs for keys in HashMap properly. For now its ok.
*/ */
while(true) { while (true) {
try { // These lines below can be shorten with computeIfAbsent in Java8
connection = connections.get(remoteId, new Callable<Connection>() { connection = connections.get(remoteId);
@Override if (connection == null) {
public Connection call() throws Exception { connection = new Connection(remoteId, serviceClass);
return new Connection(remoteId, serviceClass); Connection existing = connections.putIfAbsent(remoteId, connection);
} if (existing != null) {
}); connection = existing;
} catch (ExecutionException e) {
Throwable cause = e.getCause();
// the underlying exception should normally be IOException
if (cause instanceof IOException) {
throw (IOException) cause;
} else {
throw new IOException(cause);
}
}
if (connection.addCall(call)) {
break;
} else {
connections.invalidate(remoteId);
} }
} }
//we don't invoke the method below inside "synchronized (connections)" if (connection.addCall(call)) {
//block above. The reason for that is if the server happens to be slow, break;
//it will take longer to establish a connection and that will slow the } else {
//entire system down. // This connection is closed, should be removed. But other thread could
// have already known this closedConnection, and replace it with a new
// connection. So we should call conditional remove to make sure we only
// remove this closedConnection.
connections.remove(remoteId, connection);
}
}
// If the server happens to be slow, the method below will take longer to
// establish a connection.
connection.setupIOstreams(fallbackToSimpleAuth); connection.setupIOstreams(fallbackToSimpleAuth);
return connection; return connection;
} }