diff --git a/CHANGES.txt b/CHANGES.txt index 9a3c28d54b..b5438530e3 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -888,6 +888,9 @@ Trunk (unreleased changes) HADOOP-6138. Eliminate the depracate warnings introduced by H-5438. (He Yongqiang via szetszwo) + HADOOP-6132. RPC client create an extra connection because of incorrect + key for connection cache. (Kan Zhang via rangadi) + Release 0.20.1 - Unreleased INCOMPATIBLE CHANGES diff --git a/src/java/org/apache/hadoop/ipc/RPC.java b/src/java/org/apache/hadoop/ipc/RPC.java index 94b0ec82e2..68702465d7 100644 --- a/src/java/org/apache/hadoop/ipc/RPC.java +++ b/src/java/org/apache/hadoop/ipc/RPC.java @@ -197,13 +197,16 @@ public class RPC { private static ClientCache CLIENTS=new ClientCache(); private static class Invoker implements InvocationHandler { + private Class protocol; private InetSocketAddress address; private UserGroupInformation ticket; private Client client; private boolean isClosed = false; - public Invoker(InetSocketAddress address, UserGroupInformation ticket, - Configuration conf, SocketFactory factory) { + public Invoker(Class protocol, + InetSocketAddress address, UserGroupInformation ticket, + Configuration conf, SocketFactory factory) { + this.protocol = protocol; this.address = address; this.ticket = ticket; this.client = CLIENTS.getClient(conf, factory); @@ -219,7 +222,7 @@ public class RPC { ObjectWritable value = (ObjectWritable) client.call(new Invocation(method, args), address, - method.getDeclaringClass(), ticket); + protocol, ticket); if (logDebug) { long callTime = System.currentTimeMillis() - startTime; LOG.debug("Call: " + method.getName() + " " + callTime); @@ -283,7 +286,8 @@ public class RPC { } } - public static VersionedProtocol waitForProxy(Class protocol, + public static VersionedProtocol waitForProxy( + Class protocol, long clientVersion, InetSocketAddress addr, Configuration conf @@ -301,7 +305,8 @@ public class RPC { * @return the proxy * @throws IOException if the far end through a RemoteException */ - static VersionedProtocol waitForProxy(Class protocol, + static VersionedProtocol waitForProxy( + Class protocol, long clientVersion, InetSocketAddress addr, Configuration conf, @@ -334,7 +339,8 @@ public class RPC { } /** Construct a client-side proxy object that implements the named protocol, * talking to a server at the named address. */ - public static VersionedProtocol getProxy(Class protocol, + public static VersionedProtocol getProxy( + Class protocol, long clientVersion, InetSocketAddress addr, Configuration conf, SocketFactory factory) throws IOException { UserGroupInformation ugi = null; @@ -348,14 +354,15 @@ public class RPC { /** Construct a client-side proxy object that implements the named protocol, * talking to a server at the named address. */ - public static VersionedProtocol getProxy(Class protocol, + public static VersionedProtocol getProxy( + Class protocol, long clientVersion, InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, SocketFactory factory) throws IOException { VersionedProtocol proxy = (VersionedProtocol) Proxy.newProxyInstance( protocol.getClassLoader(), new Class[] { protocol }, - new Invoker(addr, ticket, conf, factory)); + new Invoker(protocol, addr, ticket, conf, factory)); long serverVersion = proxy.getProtocolVersion(protocol.getName(), clientVersion); if (serverVersion == clientVersion) { @@ -376,7 +383,8 @@ public class RPC { * @return a proxy instance * @throws IOException */ - public static VersionedProtocol getProxy(Class protocol, + public static VersionedProtocol getProxy( + Class protocol, long clientVersion, InetSocketAddress addr, Configuration conf) throws IOException {