HADOOP-8202. RPC stopProxy() does not close the proxy correctly. Contributed by Hari Mankude.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1305704 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
466908e67b
commit
600e327a56
@ -286,6 +286,9 @@ Release 0.23.3 - UNRELEASED
|
|||||||
|
|
||||||
HADOOP-8204. TestHealthMonitor fails occasionally (todd)
|
HADOOP-8204. TestHealthMonitor fails occasionally (todd)
|
||||||
|
|
||||||
|
HADOOP-8202. RPC stopProxy() does not close the proxy correctly.
|
||||||
|
(Hari Mankude via suresh)
|
||||||
|
|
||||||
BREAKDOWN OF HADOOP-7454 SUBTASKS
|
BREAKDOWN OF HADOOP-7454 SUBTASKS
|
||||||
|
|
||||||
HADOOP-7455. HA: Introduce HA Service Protocol Interface. (suresh)
|
HADOOP-7455. HA: Introduce HA Service Protocol Interface. (suresh)
|
||||||
|
@ -17,6 +17,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.ha;
|
package org.apache.hadoop.ha;
|
||||||
|
|
||||||
|
import java.io.Closeable;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.LinkedList;
|
import java.util.LinkedList;
|
||||||
@ -194,7 +195,9 @@ private void doHealthChecks() throws InterruptedException {
|
|||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
LOG.warn("Transport-level exception trying to monitor health of " +
|
LOG.warn("Transport-level exception trying to monitor health of " +
|
||||||
targetToMonitor + ": " + t.getLocalizedMessage());
|
targetToMonitor + ": " + t.getLocalizedMessage());
|
||||||
RPC.stopProxy(proxy);
|
if (proxy instanceof Closeable) {
|
||||||
|
RPC.stopProxy(proxy);
|
||||||
|
}
|
||||||
proxy = null;
|
proxy = null;
|
||||||
enterState(State.SERVICE_NOT_RESPONDING);
|
enterState(State.SERVICE_NOT_RESPONDING);
|
||||||
Thread.sleep(sleepAfterDisconnectMillis);
|
Thread.sleep(sleepAfterDisconnectMillis);
|
||||||
|
@ -39,6 +39,7 @@
|
|||||||
|
|
||||||
import org.apache.commons.logging.*;
|
import org.apache.commons.logging.*;
|
||||||
|
|
||||||
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||||
import org.apache.hadoop.io.*;
|
import org.apache.hadoop.io.*;
|
||||||
import org.apache.hadoop.ipc.Client.ConnectionId;
|
import org.apache.hadoop.ipc.Client.ConnectionId;
|
||||||
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
|
import org.apache.hadoop.ipc.RpcPayloadHeader.RpcKind;
|
||||||
@ -572,38 +573,41 @@ public static <T> ProtocolProxy<T> getProtocolProxy(Class<T> protocol,
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Stop this proxy and release its invoker's resource by getting the
|
* Stop the proxy. Proxy must either implement {@link Closeable} or must have
|
||||||
* invocation handler for the given proxy object and calling
|
* associated {@link RpcInvocationHandler}.
|
||||||
* {@link Closeable#close} if that invocation handler implements
|
|
||||||
* {@link Closeable}.
|
|
||||||
*
|
*
|
||||||
* @param proxy the RPC proxy object to be stopped
|
* @param proxy
|
||||||
|
* the RPC proxy object to be stopped
|
||||||
|
* @throws HadoopIllegalArgumentException
|
||||||
|
* if the proxy does not implement {@link Closeable} interface or
|
||||||
|
* does not have closeable {@link InvocationHandler}
|
||||||
*/
|
*/
|
||||||
public static void stopProxy(Object proxy) {
|
public static void stopProxy(Object proxy) {
|
||||||
if (proxy instanceof ProtocolTranslator) {
|
if (proxy == null) {
|
||||||
RPC.stopProxy(((ProtocolTranslator)proxy)
|
throw new HadoopIllegalArgumentException(
|
||||||
.getUnderlyingProxyObject());
|
"Cannot close proxy since it is null");
|
||||||
return;
|
}
|
||||||
|
try {
|
||||||
|
if (proxy instanceof Closeable) {
|
||||||
|
((Closeable) proxy).close();
|
||||||
|
return;
|
||||||
|
} else {
|
||||||
|
InvocationHandler handler = Proxy.getInvocationHandler(proxy);
|
||||||
|
if (handler instanceof Closeable) {
|
||||||
|
((Closeable) handler).close();
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Closing proxy or invocation handler caused exception", e);
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
LOG.error("RPC.stopProxy called on non proxy.", e);
|
||||||
}
|
}
|
||||||
|
|
||||||
InvocationHandler invocationHandler = null;
|
throw new HadoopIllegalArgumentException(
|
||||||
try {
|
"Cannot close proxy - is not Closeable or "
|
||||||
invocationHandler = Proxy.getInvocationHandler(proxy);
|
+ "does not provide closeable invocation handler "
|
||||||
} catch (IllegalArgumentException e) {
|
+ proxy.getClass());
|
||||||
LOG.error("Tried to call RPC.stopProxy on an object that is not a proxy.", e);
|
|
||||||
}
|
|
||||||
if (proxy != null && invocationHandler != null &&
|
|
||||||
invocationHandler instanceof Closeable) {
|
|
||||||
try {
|
|
||||||
((Closeable)invocationHandler).close();
|
|
||||||
} catch (IOException e) {
|
|
||||||
LOG.error("Stopping RPC invocation handler caused exception", e);
|
|
||||||
}
|
|
||||||
} else {
|
|
||||||
LOG.error("Could not get invocation handler " + invocationHandler +
|
|
||||||
" for proxy class " + (proxy == null ? null : proxy.getClass()) +
|
|
||||||
", or invocation handler is not closeable.");
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
|
@ -33,6 +33,7 @@
|
|||||||
import javax.net.SocketFactory;
|
import javax.net.SocketFactory;
|
||||||
|
|
||||||
import org.apache.commons.logging.*;
|
import org.apache.commons.logging.*;
|
||||||
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.io.UTF8;
|
import org.apache.hadoop.io.UTF8;
|
||||||
@ -584,9 +585,10 @@ public void testNoPings() throws Exception {
|
|||||||
* Test stopping a non-registered proxy
|
* Test stopping a non-registered proxy
|
||||||
* @throws Exception
|
* @throws Exception
|
||||||
*/
|
*/
|
||||||
@Test
|
@Test(expected=HadoopIllegalArgumentException.class)
|
||||||
public void testStopNonRegisteredProxy() throws Exception {
|
public void testStopNonRegisteredProxy() throws Exception {
|
||||||
RPC.stopProxy(mock(TestProtocol.class));
|
RPC.stopProxy(mock(TestProtocol.class));
|
||||||
|
RPC.stopProxy(null);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
Loading…
Reference in New Issue
Block a user