diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 2638f7e332..ddbc04a545 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -286,6 +286,9 @@ Release 0.23.3 - UNRELEASED HADOOP-8204. TestHealthMonitor fails occasionally (todd) + HADOOP-8202. RPC stopProxy() does not close the proxy correctly. + (Hari Mankude via suresh) + BREAKDOWN OF HADOOP-7454 SUBTASKS HADOOP-7455. HA: Introduce HA Service Protocol Interface. (suresh) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java index 753352945d..545cc44836 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.ha; +import java.io.Closeable; import java.io.IOException; import java.util.Collections; import java.util.LinkedList; @@ -194,7 +195,9 @@ private void doHealthChecks() throws InterruptedException { } catch (Throwable t) { LOG.warn("Transport-level exception trying to monitor health of " + targetToMonitor + ": " + t.getLocalizedMessage()); - RPC.stopProxy(proxy); + if (proxy instanceof Closeable) { + RPC.stopProxy(proxy); + } proxy = null; enterState(State.SERVICE_NOT_RESPONDING); Thread.sleep(sleepAfterDisconnectMillis); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java index 303b3aac3a..4dd1269780 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RPC.java @@ -39,6 +39,7 @@ import org.apache.commons.logging.*; +import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.io.*; import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind; @@ -572,38 +573,41 @@ public static ProtocolProxy getProtocolProxy(Class protocol, } /** - * Stop this proxy and release its invoker's resource by getting the - * invocation handler for the given proxy object and calling - * {@link Closeable#close} if that invocation handler implements - * {@link Closeable}. + * Stop the proxy. Proxy must either implement {@link Closeable} or must have + * associated {@link RpcInvocationHandler}. * - * @param proxy the RPC proxy object to be stopped + * @param proxy + * the RPC proxy object to be stopped + * @throws HadoopIllegalArgumentException + * if the proxy does not implement {@link Closeable} interface or + * does not have closeable {@link InvocationHandler} */ public static void stopProxy(Object proxy) { - if (proxy instanceof ProtocolTranslator) { - RPC.stopProxy(((ProtocolTranslator)proxy) - .getUnderlyingProxyObject()); - return; + if (proxy == null) { + throw new HadoopIllegalArgumentException( + "Cannot close proxy since it is null"); + } + try { + if (proxy instanceof Closeable) { + ((Closeable) proxy).close(); + return; + } else { + InvocationHandler handler = Proxy.getInvocationHandler(proxy); + if (handler instanceof Closeable) { + ((Closeable) handler).close(); + return; + } + } + } catch (IOException e) { + LOG.error("Closing proxy or invocation handler caused exception", e); + } catch (IllegalArgumentException e) { + LOG.error("RPC.stopProxy called on non proxy.", e); } - InvocationHandler invocationHandler = null; - try { - invocationHandler = Proxy.getInvocationHandler(proxy); - } catch (IllegalArgumentException e) { - LOG.error("Tried to call RPC.stopProxy on an object that is not a proxy.", e); - } - if (proxy != null && invocationHandler != null && - invocationHandler instanceof Closeable) { - try { - ((Closeable)invocationHandler).close(); - } catch (IOException e) { - LOG.error("Stopping RPC invocation handler caused exception", e); - } - } else { - LOG.error("Could not get invocation handler " + invocationHandler + - " for proxy class " + (proxy == null ? null : proxy.getClass()) + - ", or invocation handler is not closeable."); - } + throw new HadoopIllegalArgumentException( + "Cannot close proxy - is not Closeable or " + + "does not provide closeable invocation handler " + + proxy.getClass()); } /** diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index 49e1ed6453..6a3b0f718f 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -33,6 +33,7 @@ import javax.net.SocketFactory; import org.apache.commons.logging.*; +import org.apache.hadoop.HadoopIllegalArgumentException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.io.UTF8; @@ -584,9 +585,10 @@ public void testNoPings() throws Exception { * Test stopping a non-registered proxy * @throws Exception */ - @Test + @Test(expected=HadoopIllegalArgumentException.class) public void testStopNonRegisteredProxy() throws Exception { RPC.stopProxy(mock(TestProtocol.class)); + RPC.stopProxy(null); } @Test