HADOOP-7607 and MAPREDUCE-2934. Simplify the RPC proxy cleanup process. (atm)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1167318 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2011-09-09 18:12:14 +00:00
parent 1d84d983a2
commit 68cb2b01b6
7 changed files with 44 additions and 59 deletions

View File

@ -5,7 +5,11 @@ Trunk (unreleased changes)
IMPROVEMENTS IMPROVEMENTS
HADOOP-7595. Upgrade dependency to Avro 1.5.3. (Alejandro Abdelnur via atm) HADOOP-7595. Upgrade dependency to Avro 1.5.3. (Alejandro Abdelnur via atm)
HADOOP-7524 Change RPC to allow multiple protocols including multuple versions of the same protocol (sanjay Radia)
HADOOP-7524. Change RPC to allow multiple protocols including multuple
versions of the same protocol (sanjay Radia)
HADOOP-7607. Simplify the RPC proxy cleanup process. (atm)
BUGS BUGS

View File

@ -131,7 +131,7 @@ public void writeBuffers(List<ByteBuffer> buffers) throws IOException {
} }
public void close() throws IOException { public void close() throws IOException {
ENGINE.stopProxy(tunnel); RPC.stopProxy(tunnel);
} }
} }
@ -152,15 +152,6 @@ public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
false); false);
} }
/** Stop this proxy. */
public void stopProxy(Object proxy) {
try {
((Invoker)Proxy.getInvocationHandler(proxy)).close();
} catch (IOException e) {
LOG.warn("Error while stopping "+proxy, e);
}
}
private class Invoker implements InvocationHandler, Closeable { private class Invoker implements InvocationHandler, Closeable {
private final ClientTransceiver tx; private final ClientTransceiver tx;
private final SpecificRequestor requestor; private final SpecificRequestor requestor;

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.ipc; package org.apache.hadoop.ipc;
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.Proxy; import java.lang.reflect.Proxy;
import java.lang.reflect.Method; import java.lang.reflect.Method;
@ -26,6 +27,7 @@
import java.net.NoRouteToHostException; import java.net.NoRouteToHostException;
import java.net.SocketTimeoutException; import java.net.SocketTimeoutException;
import java.io.*; import java.io.*;
import java.io.Closeable;
import java.util.Map; import java.util.Map;
import java.util.HashMap; import java.util.HashMap;
@ -80,12 +82,8 @@ static public String getProtocolName(Class<?> protocol) {
private RPC() {} // no public ctor private RPC() {} // no public ctor
// cache of RpcEngines by protocol // cache of RpcEngines by protocol
private static final Map<Class,RpcEngine> PROTOCOL_ENGINES private static final Map<Class<?>,RpcEngine> PROTOCOL_ENGINES
= new HashMap<Class,RpcEngine>(); = new HashMap<Class<?>,RpcEngine>();
// track what RpcEngine is used by a proxy class, for stopProxy()
private static final Map<Class,RpcEngine> PROXY_ENGINES
= new HashMap<Class,RpcEngine>();
private static final String ENGINE_PROP = "rpc.engine"; private static final String ENGINE_PROP = "rpc.engine";
@ -96,32 +94,23 @@ private RPC() {} // no public ctor
* @param engine the RpcEngine impl * @param engine the RpcEngine impl
*/ */
public static void setProtocolEngine(Configuration conf, public static void setProtocolEngine(Configuration conf,
Class protocol, Class engine) { Class<?> protocol, Class<?> engine) {
conf.setClass(ENGINE_PROP+"."+protocol.getName(), engine, RpcEngine.class); conf.setClass(ENGINE_PROP+"."+protocol.getName(), engine, RpcEngine.class);
} }
// return the RpcEngine configured to handle a protocol // return the RpcEngine configured to handle a protocol
private static synchronized RpcEngine getProtocolEngine(Class protocol, private static synchronized RpcEngine getProtocolEngine(Class<?> protocol,
Configuration conf) { Configuration conf) {
RpcEngine engine = PROTOCOL_ENGINES.get(protocol); RpcEngine engine = PROTOCOL_ENGINES.get(protocol);
if (engine == null) { if (engine == null) {
Class<?> impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(), Class<?> impl = conf.getClass(ENGINE_PROP+"."+protocol.getName(),
WritableRpcEngine.class); WritableRpcEngine.class);
engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf); engine = (RpcEngine)ReflectionUtils.newInstance(impl, conf);
if (protocol.isInterface())
PROXY_ENGINES.put(Proxy.getProxyClass(protocol.getClassLoader(),
protocol),
engine);
PROTOCOL_ENGINES.put(protocol, engine); PROTOCOL_ENGINES.put(protocol, engine);
} }
return engine; return engine;
} }
// return the RpcEngine that handles a proxy object
private static synchronized RpcEngine getProxyEngine(Object proxy) {
return PROXY_ENGINES.get(proxy.getClass());
}
/** /**
* A version mismatch for the RPC protocol. * A version mismatch for the RPC protocol.
*/ */
@ -477,13 +466,30 @@ public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
} }
/** /**
* Stop this proxy and release its invoker's resource * Stop this proxy and release its invoker's resource by getting the
* @param proxy the proxy to be stopped * invocation handler for the given proxy object and calling
* {@link Closeable#close} if that invocation handler implements
* {@link Closeable}.
*
* @param proxy the RPC proxy object to be stopped
*/ */
public static void stopProxy(Object proxy) { public static void stopProxy(Object proxy) {
RpcEngine rpcEngine; InvocationHandler invocationHandler = null;
if (proxy!=null && (rpcEngine = getProxyEngine(proxy)) != null) { try {
rpcEngine.stopProxy(proxy); invocationHandler = Proxy.getInvocationHandler(proxy);
} catch (IllegalArgumentException e) {
LOG.error("Tried to call RPC.stopProxy on an object that is not a proxy.", e);
}
if (proxy != null && invocationHandler != null &&
invocationHandler instanceof Closeable) {
try {
((Closeable)invocationHandler).close();
} catch (IOException e) {
LOG.error("Stopping RPC invocation handler caused exception", e);
}
} else {
LOG.error("Could not get invocation handler " + invocationHandler +
" for proxy " + proxy + ", or invocation handler is not closeable.");
} }
} }
@ -532,7 +538,7 @@ public static Server getServer(final Object instance, final String bindAddress,
} }
/** Construct a server for a protocol implementation instance. */ /** Construct a server for a protocol implementation instance. */
public static Server getServer(Class protocol, public static Server getServer(Class<?> protocol,
Object instance, String bindAddress, Object instance, String bindAddress,
int port, Configuration conf) int port, Configuration conf)
throws IOException { throws IOException {
@ -543,7 +549,7 @@ public static Server getServer(Class protocol,
* @deprecated secretManager should be passed. * @deprecated secretManager should be passed.
*/ */
@Deprecated @Deprecated
public static Server getServer(Class protocol, public static Server getServer(Class<?> protocol,
Object instance, String bindAddress, int port, Object instance, String bindAddress, int port,
int numHandlers, int numHandlers,
boolean verbose, Configuration conf) boolean verbose, Configuration conf)

View File

@ -41,9 +41,6 @@ <T> ProtocolProxy<T> getProxy(Class<T> protocol,
UserGroupInformation ticket, Configuration conf, UserGroupInformation ticket, Configuration conf,
SocketFactory factory, int rpcTimeout) throws IOException; SocketFactory factory, int rpcTimeout) throws IOException;
/** Stop this proxy. */
void stopProxy(Object proxy);
/** Expert: Make multiple, parallel calls to a set of servers. */ /** Expert: Make multiple, parallel calls to a set of servers. */
Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs, Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs,
UserGroupInformation ticket, Configuration conf) UserGroupInformation ticket, Configuration conf)

View File

@ -30,6 +30,7 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Arrays; import java.util.Arrays;
import java.util.List; import java.util.List;
import java.io.Closeable;
import java.util.Map; import java.util.Map;
import java.util.HashMap; import java.util.HashMap;
@ -219,7 +220,7 @@ public Configuration getConf() {
private static ClientCache CLIENTS=new ClientCache(); private static ClientCache CLIENTS=new ClientCache();
private static class Invoker implements InvocationHandler { private static class Invoker implements InvocationHandler, Closeable {
private Client.ConnectionId remoteId; private Client.ConnectionId remoteId;
private Client client; private Client client;
private boolean isClosed = false; private boolean isClosed = false;
@ -250,7 +251,7 @@ public Object invoke(Object proxy, Method method, Object[] args)
} }
/* close the IPC client that's responsible for this invoker's RPCs */ /* close the IPC client that's responsible for this invoker's RPCs */
synchronized private void close() { synchronized public void close() {
if (!isClosed) { if (!isClosed) {
isClosed = true; isClosed = true;
CLIENTS.stopClient(client); CLIENTS.stopClient(client);
@ -282,15 +283,6 @@ public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
return new ProtocolProxy<T>(protocol, proxy, true); return new ProtocolProxy<T>(protocol, proxy, true);
} }
/**
* Stop this proxy and release its invoker's resource
* @param proxy the proxy to be stopped
*/
public void stopProxy(Object proxy) {
((Invoker)Proxy.getInvocationHandler(proxy)).close();
}
/** Expert: Make multiple, parallel calls to a set of servers. */ /** Expert: Make multiple, parallel calls to a set of servers. */
public Object[] call(Method method, Object[][] params, public Object[] call(Method method, Object[][] params,
InetSocketAddress[] addrs, InetSocketAddress[] addrs,

View File

@ -4,7 +4,11 @@ Trunk (unreleased changes)
IMPROVEMENTS IMPROVEMENTS
MAPREDUCE-2887 due to HADOOP-7524 Change RPC to allow multiple protocols including multuple versions of the same protocol (sanjay Radia) MAPREDUCE-2887 due to HADOOP-7524 Change RPC to allow multiple protocols
including multuple versions of the same protocol (sanjay Radia)
MAPREDUCE-2934. MR portion of HADOOP-7607 - Simplify the RPC proxy cleanup
process (atm)
BUG FIXES BUG FIXES
MAPREDUCE-2784. [Gridmix] Bug fixes in ExecutionSummarizer and MAPREDUCE-2784. [Gridmix] Bug fixes in ExecutionSummarizer and

View File

@ -73,15 +73,6 @@ public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
addr, ticket, conf, factory, rpcTimeout)), false); addr, ticket, conf, factory, rpcTimeout)), false);
} }
@Override
public void stopProxy(Object proxy) {
try {
((Invoker) Proxy.getInvocationHandler(proxy)).close();
} catch (IOException e) {
LOG.warn("Error while stopping " + proxy, e);
}
}
private static class Invoker implements InvocationHandler, Closeable { private static class Invoker implements InvocationHandler, Closeable {
private Map<String, Message> returnTypes = new ConcurrentHashMap<String, Message>(); private Map<String, Message> returnTypes = new ConcurrentHashMap<String, Message>();
private boolean isClosed = false; private boolean isClosed = false;