HADOOP-11772. RPC Invoker relies on static ClientCache which has synchronized(this) blocks. Contributed by Haohui Mai.

This commit is contained in:
Haohui Mai 2015-05-20 20:10:50 -07:00
parent 6329bd00fa
commit fb6b38d67d
2 changed files with 35 additions and 74 deletions

View File

@ -604,6 +604,9 @@ Release 2.8.0 - UNRELEASED
HADOOP-11970. Replace uses of ThreadLocal<Random> with JDK7 HADOOP-11970. Replace uses of ThreadLocal<Random> with JDK7
ThreadLocalRandom. (Sean Busbey via Colin P. McCabe) ThreadLocalRandom. (Sean Busbey via Colin P. McCabe)
HADOOP-11772. RPC Invoker relies on static ClientCache which has
synchronized(this) blocks. (wheat9)
BUG FIXES BUG FIXES
HADOOP-11802: DomainSocketWatcher thread terminates sometimes after there HADOOP-11802: DomainSocketWatcher thread terminates sometimes after there
is an I/O error during requestShortCircuitShm (cmccabe) is an I/O error during requestShortCircuitShm (cmccabe)

View File

@ -43,6 +43,7 @@
import java.util.Map.Entry; import java.util.Map.Entry;
import java.util.Random; import java.util.Random;
import java.util.Set; import java.util.Set;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService; import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors; import java.util.concurrent.Executors;
@ -56,6 +57,8 @@
import javax.net.SocketFactory; import javax.net.SocketFactory;
import javax.security.sasl.Sasl; import javax.security.sasl.Sasl;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
@ -124,8 +127,8 @@ public static void setCallIdAndRetryCount(int cid, int rc) {
retryCount.set(rc); retryCount.set(rc);
} }
private Hashtable<ConnectionId, Connection> connections = private final Cache<ConnectionId, Connection> connections =
new Hashtable<ConnectionId, Connection>(); CacheBuilder.newBuilder().build();
private Class<? extends Writable> valueClass; // class of call values private Class<? extends Writable> valueClass; // class of call values
private AtomicBoolean running = new AtomicBoolean(true); // if client runs private AtomicBoolean running = new AtomicBoolean(true); // if client runs
@ -1167,13 +1170,7 @@ private synchronized void close() {
return; return;
} }
// release the resources connections.invalidate(remoteId);
// first thing to do;take the connection out of the connection list
synchronized (connections) {
if (connections.get(remoteId) == this) {
connections.remove(remoteId);
}
}
// close the streams and therefore the socket // close the streams and therefore the socket
IOUtils.closeStream(out); IOUtils.closeStream(out);
@ -1260,14 +1257,12 @@ public void stop() {
} }
// wake up all connections // wake up all connections
synchronized (connections) { for (Connection conn : connections.asMap().values()) {
for (Connection conn : connections.values()) { conn.interrupt();
conn.interrupt();
}
} }
// wait until all connections are closed // wait until all connections are closed
while (!connections.isEmpty()) { while (connections.size() > 0) {
try { try {
Thread.sleep(100); Thread.sleep(100);
} catch (InterruptedException e) { } catch (InterruptedException e) {
@ -1283,56 +1278,12 @@ public void stop() {
*/ */
public Writable call(Writable param, InetSocketAddress address) public Writable call(Writable param, InetSocketAddress address)
throws IOException { throws IOException {
return call(RPC.RpcKind.RPC_BUILTIN, param, address); ConnectionId remoteId = ConnectionId.getConnectionId(address, null, null, 0,
}
/** Make a call, passing <code>param</code>, to the IPC server running at
* <code>address</code>, returning the value. Throws exceptions if there are
* network problems or if the remote code threw an exception.
* @deprecated Use {@link #call(RPC.RpcKind, Writable,
* ConnectionId)} instead
*/
@Deprecated
public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress address)
throws IOException {
return call(rpcKind, param, address, null);
}
/** Make a call, passing <code>param</code>, to the IPC server running at
* <code>address</code> with the <code>ticket</code> credentials, returning
* the value.
* Throws exceptions if there are network problems or if the remote code
* threw an exception.
* @deprecated Use {@link #call(RPC.RpcKind, Writable,
* ConnectionId)} instead
*/
@Deprecated
public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress addr,
UserGroupInformation ticket) throws IOException {
ConnectionId remoteId = ConnectionId.getConnectionId(addr, null, ticket, 0,
conf); conf);
return call(rpcKind, param, remoteId); return call(RpcKind.RPC_BUILTIN, param, remoteId);
}
/** Make a call, passing <code>param</code>, to the IPC server running at
* <code>address</code> which is servicing the <code>protocol</code> protocol,
* with the <code>ticket</code> credentials and <code>rpcTimeout</code> as
* timeout, returning the value.
* Throws exceptions if there are network problems or if the remote code
* threw an exception.
* @deprecated Use {@link #call(RPC.RpcKind, Writable,
* ConnectionId)} instead
*/
@Deprecated
public Writable call(RPC.RpcKind rpcKind, Writable param, InetSocketAddress addr,
Class<?> protocol, UserGroupInformation ticket,
int rpcTimeout) throws IOException {
ConnectionId remoteId = ConnectionId.getConnectionId(addr, protocol,
ticket, rpcTimeout, conf);
return call(rpcKind, param, remoteId);
} }
/** /**
* Same as {@link #call(RPC.RpcKind, Writable, InetSocketAddress, * Same as {@link #call(RPC.RpcKind, Writable, InetSocketAddress,
* Class, UserGroupInformation, int, Configuration)} * Class, UserGroupInformation, int, Configuration)}
@ -1506,15 +1457,14 @@ public Writable call(RPC.RpcKind rpcKind, Writable rpcRequest,
@InterfaceAudience.Private @InterfaceAudience.Private
@InterfaceStability.Unstable @InterfaceStability.Unstable
Set<ConnectionId> getConnectionIds() { Set<ConnectionId> getConnectionIds() {
synchronized (connections) { return connections.asMap().keySet();
return connections.keySet();
}
} }
/** Get a connection from the pool, or create a new one and add it to the /** Get a connection from the pool, or create a new one and add it to the
* pool. Connections to a given ConnectionId are reused. */ * pool. Connections to a given ConnectionId are reused. */
private Connection getConnection(ConnectionId remoteId, private Connection getConnection(
Call call, int serviceClass, AtomicBoolean fallbackToSimpleAuth) final ConnectionId remoteId,
Call call, final int serviceClass, AtomicBoolean fallbackToSimpleAuth)
throws IOException { throws IOException {
if (!running.get()) { if (!running.get()) {
// the client is stopped // the client is stopped
@ -1525,15 +1475,23 @@ private Connection getConnection(ConnectionId remoteId,
* connectionsId object and with set() method. We need to manage the * connectionsId object and with set() method. We need to manage the
* refs for keys in HashMap properly. For now its ok. * refs for keys in HashMap properly. For now its ok.
*/ */
do { while(true) {
synchronized (connections) { try {
connection = connections.get(remoteId); connection = connections.get(remoteId, new Callable<Connection>() {
if (connection == null) { @Override
connection = new Connection(remoteId, serviceClass); public Connection call() throws Exception {
connections.put(remoteId, connection); return new Connection(remoteId, serviceClass);
} }
});
} catch (ExecutionException e) {
throw new IOException(e);
} }
} while (!connection.addCall(call)); if (connection.addCall(call)) {
break;
} else {
connections.invalidate(remoteId);
}
}
//we don't invoke the method below inside "synchronized (connections)" //we don't invoke the method below inside "synchronized (connections)"
//block above. The reason for that is if the server happens to be slow, //block above. The reason for that is if the server happens to be slow,