HADOOP-7635. RetryInvocationHandler should release underlying resources on close (atm)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1171221 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
b9af522c2a
commit
e66d697a66
@ -11,6 +11,9 @@ Trunk (unreleased changes)
|
||||
|
||||
HADOOP-7607. Simplify the RPC proxy cleanup process. (atm)
|
||||
|
||||
HADOOP-7635. RetryInvocationHandler should release underlying resources on
|
||||
close (atm)
|
||||
|
||||
BUGS
|
||||
|
||||
HADOOP-7606. Upgrade Jackson to version 1.7.1 to match the version required
|
||||
|
@ -17,7 +17,10 @@
|
||||
*/
|
||||
package org.apache.hadoop.io.retry;
|
||||
|
||||
import java.io.IOException;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.ipc.RPC;
|
||||
|
||||
/**
|
||||
* An implementation of {@link FailoverProxyProvider} which does nothing in the
|
||||
@ -49,4 +52,9 @@ public void performFailover(Object currentProxy) {
|
||||
// Nothing to do.
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
RPC.stopProxy(proxy);
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -17,6 +17,8 @@
|
||||
*/
|
||||
package org.apache.hadoop.io.retry;
|
||||
|
||||
import java.io.Closeable;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
|
||||
/**
|
||||
@ -27,7 +29,7 @@
|
||||
* {@link RetryPolicy}.
|
||||
*/
|
||||
@InterfaceStability.Evolving
|
||||
public interface FailoverProxyProvider {
|
||||
public interface FailoverProxyProvider extends Closeable {
|
||||
|
||||
/**
|
||||
* Get the proxy object which should be used until the next failover event
|
||||
|
@ -17,6 +17,8 @@
|
||||
*/
|
||||
package org.apache.hadoop.io.retry;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.InvocationHandler;
|
||||
import java.lang.reflect.InvocationTargetException;
|
||||
import java.lang.reflect.Method;
|
||||
@ -27,7 +29,7 @@
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
|
||||
|
||||
class RetryInvocationHandler implements InvocationHandler {
|
||||
class RetryInvocationHandler implements InvocationHandler, Closeable {
|
||||
public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
|
||||
private FailoverProxyProvider proxyProvider;
|
||||
|
||||
@ -103,4 +105,9 @@ private Object invokeMethod(Method method, Object[] args) throws Throwable {
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
proxyProvider.close();
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -58,6 +58,11 @@ public Class<?> getInterface() {
|
||||
return iface;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
// Nothing to do.
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
public static class FailOverOnceOnAnyExceptionPolicy implements RetryPolicy {
|
||||
|
@ -18,28 +18,38 @@
|
||||
|
||||
package org.apache.hadoop.ipc;
|
||||
|
||||
import java.io.Closeable;
|
||||
import java.io.IOException;
|
||||
import java.net.ConnectException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.lang.management.ManagementFactory;
|
||||
import java.lang.management.ThreadInfo;
|
||||
import java.lang.management.ThreadMXBean;
|
||||
import java.lang.reflect.InvocationHandler;
|
||||
import java.lang.reflect.Method;
|
||||
import java.lang.reflect.Proxy;
|
||||
import java.util.Arrays;
|
||||
|
||||
import junit.framework.TestCase;
|
||||
import javax.net.SocketFactory;
|
||||
|
||||
import org.apache.commons.logging.*;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.io.UTF8;
|
||||
import org.apache.hadoop.io.Writable;
|
||||
import org.apache.hadoop.io.retry.RetryPolicies;
|
||||
import org.apache.hadoop.io.retry.RetryProxy;
|
||||
import org.apache.hadoop.metrics2.MetricsRecordBuilder;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.authorize.AuthorizationException;
|
||||
import org.apache.hadoop.security.authorize.PolicyProvider;
|
||||
import org.apache.hadoop.security.authorize.Service;
|
||||
import org.apache.hadoop.security.token.SecretManager;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.security.AccessControlException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.junit.Test;
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import com.google.protobuf.DescriptorProtos;
|
||||
import com.google.protobuf.DescriptorProtos.EnumDescriptorProto;
|
||||
@ -49,7 +59,8 @@
|
||||
import static org.mockito.Mockito.*;
|
||||
|
||||
/** Unit tests for RPC. */
|
||||
public class TestRPC extends TestCase {
|
||||
@SuppressWarnings("deprecation")
|
||||
public class TestRPC {
|
||||
private static final String ADDRESS = "0.0.0.0";
|
||||
|
||||
public static final Log LOG =
|
||||
@ -57,11 +68,14 @@ public class TestRPC extends TestCase {
|
||||
|
||||
private static Configuration conf = new Configuration();
|
||||
|
||||
static {
|
||||
conf.setClass("rpc.engine." + StoppedProtocol.class.getName(),
|
||||
StoppedRpcEngine.class, RpcEngine.class);
|
||||
}
|
||||
|
||||
int datasize = 1024*100;
|
||||
int numThreads = 50;
|
||||
|
||||
public TestRPC(String name) { super(name); }
|
||||
|
||||
public interface TestProtocol extends VersionedProtocol {
|
||||
public static final long versionID = 1L;
|
||||
|
||||
@ -207,6 +221,74 @@ public void run() {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* A basic interface for testing client-side RPC resource cleanup.
|
||||
*/
|
||||
private static interface StoppedProtocol {
|
||||
long versionID = 0;
|
||||
|
||||
public void stop();
|
||||
}
|
||||
|
||||
/**
|
||||
* A class used for testing cleanup of client side RPC resources.
|
||||
*/
|
||||
private static class StoppedRpcEngine implements RpcEngine {
|
||||
|
||||
@Override
|
||||
public Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs,
|
||||
UserGroupInformation ticket, Configuration conf)
|
||||
throws IOException, InterruptedException {
|
||||
return null;
|
||||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
@Override
|
||||
public <T> ProtocolProxy<T> getProxy(Class<T> protocol, long clientVersion,
|
||||
InetSocketAddress addr, UserGroupInformation ticket, Configuration conf,
|
||||
SocketFactory factory, int rpcTimeout) throws IOException {
|
||||
T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(),
|
||||
new Class[] { protocol }, new StoppedInvocationHandler());
|
||||
return new ProtocolProxy<T>(protocol, proxy, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public org.apache.hadoop.ipc.RPC.Server getServer(Class<?> protocol,
|
||||
Object instance, String bindAddress, int port, int numHandlers,
|
||||
int numReaders, int queueSizePerHandler, boolean verbose, Configuration conf,
|
||||
SecretManager<? extends TokenIdentifier> secretManager) throws IOException {
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
/**
|
||||
* An invocation handler which does nothing when invoking methods, and just
|
||||
* counts the number of times close() is called.
|
||||
*/
|
||||
private static class StoppedInvocationHandler
|
||||
implements InvocationHandler, Closeable {
|
||||
|
||||
private int closeCalled = 0;
|
||||
|
||||
@Override
|
||||
public Object invoke(Object proxy, Method method, Object[] args)
|
||||
throws Throwable {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() throws IOException {
|
||||
closeCalled++;
|
||||
}
|
||||
|
||||
public int getCloseCalled() {
|
||||
return closeCalled;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConfRpc() throws Exception {
|
||||
Server server = RPC.getServer(TestProtocol.class,
|
||||
new TestImpl(), ADDRESS, 0, 1, false, conf, null);
|
||||
@ -229,6 +311,7 @@ public void testConfRpc() throws Exception {
|
||||
server.stop();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSlowRpc() throws Exception {
|
||||
System.out.println("Testing Slow RPC");
|
||||
// create a server with two handlers
|
||||
@ -273,11 +356,12 @@ public void testSlowRpc() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
public void testRPCConf(Configuration conf) throws Exception {
|
||||
|
||||
@Test
|
||||
public void testCalls() throws Exception {
|
||||
testCallsInternal(conf);
|
||||
}
|
||||
|
||||
public void testCalls(Configuration conf) throws Exception {
|
||||
private void testCallsInternal(Configuration conf) throws Exception {
|
||||
Server server = RPC.getServer(TestProtocol.class,
|
||||
new TestImpl(), ADDRESS, 0, conf);
|
||||
TestProtocol proxy = null;
|
||||
@ -384,6 +468,7 @@ public void testCalls(Configuration conf) throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStandaloneClient() throws IOException {
|
||||
try {
|
||||
TestProtocol proxy = RPC.waitForProxy(TestProtocol.class,
|
||||
@ -450,6 +535,7 @@ private void doRPCs(Configuration conf, boolean expectFailure) throws Exception
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAuthorization() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
conf.setBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION,
|
||||
@ -481,20 +567,48 @@ public void testNoPings() throws Exception {
|
||||
Configuration conf = new Configuration();
|
||||
|
||||
conf.setBoolean("ipc.client.ping", false);
|
||||
new TestRPC("testnoPings").testCalls(conf);
|
||||
new TestRPC().testCallsInternal(conf);
|
||||
|
||||
conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2);
|
||||
new TestRPC("testnoPings").testCalls(conf);
|
||||
new TestRPC().testCallsInternal(conf);
|
||||
}
|
||||
|
||||
/**
|
||||
* Test stopping a non-registered proxy
|
||||
* @throws Exception
|
||||
*/
|
||||
@Test
|
||||
public void testStopNonRegisteredProxy() throws Exception {
|
||||
RPC.stopProxy(mock(TestProtocol.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testStopProxy() throws IOException {
|
||||
StoppedProtocol proxy = (StoppedProtocol) RPC.getProxy(StoppedProtocol.class,
|
||||
StoppedProtocol.versionID, null, conf);
|
||||
StoppedInvocationHandler invocationHandler = (StoppedInvocationHandler)
|
||||
Proxy.getInvocationHandler(proxy);
|
||||
assertEquals(invocationHandler.getCloseCalled(), 0);
|
||||
RPC.stopProxy(proxy);
|
||||
assertEquals(invocationHandler.getCloseCalled(), 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testWrappedStopProxy() throws IOException {
|
||||
StoppedProtocol wrappedProxy = (StoppedProtocol) RPC.getProxy(StoppedProtocol.class,
|
||||
StoppedProtocol.versionID, null, conf);
|
||||
StoppedInvocationHandler invocationHandler = (StoppedInvocationHandler)
|
||||
Proxy.getInvocationHandler(wrappedProxy);
|
||||
|
||||
StoppedProtocol proxy = (StoppedProtocol) RetryProxy.create(StoppedProtocol.class,
|
||||
wrappedProxy, RetryPolicies.RETRY_FOREVER);
|
||||
|
||||
assertEquals(invocationHandler.getCloseCalled(), 0);
|
||||
RPC.stopProxy(proxy);
|
||||
assertEquals(invocationHandler.getCloseCalled(), 1);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testErrorMsgForInsecureClient() throws Exception {
|
||||
final Server server = RPC.getServer(TestProtocol.class,
|
||||
new TestImpl(), ADDRESS, 0, 5, true, conf, null);
|
||||
@ -567,10 +681,10 @@ private static int countThreads(String search) {
|
||||
return count;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Test that server.stop() properly stops all threads
|
||||
*/
|
||||
@Test
|
||||
public void testStopsAllThreads() throws Exception {
|
||||
int threadsBefore = countThreads("Server$Listener$Reader");
|
||||
assertEquals("Expect no Reader threads running before test",
|
||||
@ -591,8 +705,7 @@ public void testStopsAllThreads() throws Exception {
|
||||
}
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
|
||||
new TestRPC("test").testCalls(conf);
|
||||
new TestRPC().testCallsInternal(conf);
|
||||
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user