diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 20ca2404ca..336d817e1a 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -11,6 +11,9 @@ Trunk (unreleased changes) HADOOP-7607. Simplify the RPC proxy cleanup process. (atm) + HADOOP-7635. RetryInvocationHandler should release underlying resources on + close (atm) + BUGS HADOOP-7606. Upgrade Jackson to version 1.7.1 to match the version required diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/DefaultFailoverProxyProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/DefaultFailoverProxyProvider.java index 0ea294da0f..812a46e02b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/DefaultFailoverProxyProvider.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/DefaultFailoverProxyProvider.java @@ -17,7 +17,10 @@ */ package org.apache.hadoop.io.retry; +import java.io.IOException; + import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.ipc.RPC; /** * An implementation of {@link FailoverProxyProvider} which does nothing in the @@ -49,4 +52,9 @@ public void performFailover(Object currentProxy) { // Nothing to do. } + @Override + public void close() throws IOException { + RPC.stopProxy(proxy); + } + } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java index cb211c2efa..707a40d888 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/FailoverProxyProvider.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.io.retry; +import java.io.Closeable; + import org.apache.hadoop.classification.InterfaceStability; /** @@ -27,7 +29,7 @@ * {@link RetryPolicy}. */ @InterfaceStability.Evolving -public interface FailoverProxyProvider { +public interface FailoverProxyProvider extends Closeable { /** * Get the proxy object which should be used until the next failover event diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java index 70149e357c..6ea45ced67 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java @@ -17,6 +17,8 @@ */ package org.apache.hadoop.io.retry; +import java.io.Closeable; +import java.io.IOException; import java.lang.reflect.InvocationHandler; import java.lang.reflect.InvocationTargetException; import java.lang.reflect.Method; @@ -27,7 +29,7 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; -class RetryInvocationHandler implements InvocationHandler { +class RetryInvocationHandler implements InvocationHandler, Closeable { public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class); private FailoverProxyProvider proxyProvider; @@ -103,4 +105,9 @@ private Object invokeMethod(Method method, Object[] args) throws Throwable { } } + @Override + public void close() throws IOException { + proxyProvider.close(); + } + } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java index 4664ab342d..295bf13d11 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestFailoverProxy.java @@ -57,6 +57,11 @@ public void performFailover(Object currentProxy) { public Class getInterface() { return iface; } + + @Override + public void close() throws IOException { + // Nothing to do. + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index d14434925d..67fc608cb2 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -18,28 +18,38 @@ package org.apache.hadoop.ipc; +import java.io.Closeable; import java.io.IOException; import java.net.ConnectException; import java.net.InetSocketAddress; import java.lang.management.ManagementFactory; import java.lang.management.ThreadInfo; import java.lang.management.ThreadMXBean; +import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; +import java.lang.reflect.Proxy; import java.util.Arrays; -import junit.framework.TestCase; +import javax.net.SocketFactory; import org.apache.commons.logging.*; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.io.UTF8; import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryProxy; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.authorize.AuthorizationException; import org.apache.hadoop.security.authorize.PolicyProvider; import org.apache.hadoop.security.authorize.Service; +import org.apache.hadoop.security.token.SecretManager; +import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; +import org.junit.Test; +import static org.junit.Assert.*; import com.google.protobuf.DescriptorProtos; import com.google.protobuf.DescriptorProtos.EnumDescriptorProto; @@ -49,18 +59,22 @@ import static org.mockito.Mockito.*; /** Unit tests for RPC. */ -public class TestRPC extends TestCase { +@SuppressWarnings("deprecation") +public class TestRPC { private static final String ADDRESS = "0.0.0.0"; public static final Log LOG = LogFactory.getLog(TestRPC.class); private static Configuration conf = new Configuration(); + + static { + conf.setClass("rpc.engine." + StoppedProtocol.class.getName(), + StoppedRpcEngine.class, RpcEngine.class); + } int datasize = 1024*100; int numThreads = 50; - - public TestRPC(String name) { super(name); } public interface TestProtocol extends VersionedProtocol { public static final long versionID = 1L; @@ -207,6 +221,74 @@ public void run() { } } + /** + * A basic interface for testing client-side RPC resource cleanup. + */ + private static interface StoppedProtocol { + long versionID = 0; + + public void stop(); + } + + /** + * A class used for testing cleanup of client side RPC resources. + */ + private static class StoppedRpcEngine implements RpcEngine { + + @Override + public Object[] call(Method method, Object[][] params, InetSocketAddress[] addrs, + UserGroupInformation ticket, Configuration conf) + throws IOException, InterruptedException { + return null; + } + + @SuppressWarnings("unchecked") + @Override + public ProtocolProxy getProxy(Class protocol, long clientVersion, + InetSocketAddress addr, UserGroupInformation ticket, Configuration conf, + SocketFactory factory, int rpcTimeout) throws IOException { + T proxy = (T) Proxy.newProxyInstance(protocol.getClassLoader(), + new Class[] { protocol }, new StoppedInvocationHandler()); + return new ProtocolProxy(protocol, proxy, false); + } + + @Override + public org.apache.hadoop.ipc.RPC.Server getServer(Class protocol, + Object instance, String bindAddress, int port, int numHandlers, + int numReaders, int queueSizePerHandler, boolean verbose, Configuration conf, + SecretManager secretManager) throws IOException { + return null; + } + + } + + /** + * An invocation handler which does nothing when invoking methods, and just + * counts the number of times close() is called. + */ + private static class StoppedInvocationHandler + implements InvocationHandler, Closeable { + + private int closeCalled = 0; + + @Override + public Object invoke(Object proxy, Method method, Object[] args) + throws Throwable { + return null; + } + + @Override + public void close() throws IOException { + closeCalled++; + } + + public int getCloseCalled() { + return closeCalled; + } + + } + + @Test public void testConfRpc() throws Exception { Server server = RPC.getServer(TestProtocol.class, new TestImpl(), ADDRESS, 0, 1, false, conf, null); @@ -229,6 +311,7 @@ public void testConfRpc() throws Exception { server.stop(); } + @Test public void testSlowRpc() throws Exception { System.out.println("Testing Slow RPC"); // create a server with two handlers @@ -273,11 +356,12 @@ public void testSlowRpc() throws Exception { } } - public void testRPCConf(Configuration conf) throws Exception { - + @Test + public void testCalls() throws Exception { + testCallsInternal(conf); } - - public void testCalls(Configuration conf) throws Exception { + + private void testCallsInternal(Configuration conf) throws Exception { Server server = RPC.getServer(TestProtocol.class, new TestImpl(), ADDRESS, 0, conf); TestProtocol proxy = null; @@ -384,6 +468,7 @@ public void testCalls(Configuration conf) throws Exception { } } + @Test public void testStandaloneClient() throws IOException { try { TestProtocol proxy = RPC.waitForProxy(TestProtocol.class, @@ -450,6 +535,7 @@ private void doRPCs(Configuration conf, boolean expectFailure) throws Exception } } + @Test public void testAuthorization() throws Exception { Configuration conf = new Configuration(); conf.setBoolean(CommonConfigurationKeys.HADOOP_SECURITY_AUTHORIZATION, @@ -481,20 +567,48 @@ public void testNoPings() throws Exception { Configuration conf = new Configuration(); conf.setBoolean("ipc.client.ping", false); - new TestRPC("testnoPings").testCalls(conf); + new TestRPC().testCallsInternal(conf); conf.setInt(CommonConfigurationKeys.IPC_SERVER_RPC_READ_THREADS_KEY, 2); - new TestRPC("testnoPings").testCalls(conf); + new TestRPC().testCallsInternal(conf); } /** * Test stopping a non-registered proxy * @throws Exception */ + @Test public void testStopNonRegisteredProxy() throws Exception { RPC.stopProxy(mock(TestProtocol.class)); } + @Test + public void testStopProxy() throws IOException { + StoppedProtocol proxy = (StoppedProtocol) RPC.getProxy(StoppedProtocol.class, + StoppedProtocol.versionID, null, conf); + StoppedInvocationHandler invocationHandler = (StoppedInvocationHandler) + Proxy.getInvocationHandler(proxy); + assertEquals(invocationHandler.getCloseCalled(), 0); + RPC.stopProxy(proxy); + assertEquals(invocationHandler.getCloseCalled(), 1); + } + + @Test + public void testWrappedStopProxy() throws IOException { + StoppedProtocol wrappedProxy = (StoppedProtocol) RPC.getProxy(StoppedProtocol.class, + StoppedProtocol.versionID, null, conf); + StoppedInvocationHandler invocationHandler = (StoppedInvocationHandler) + Proxy.getInvocationHandler(wrappedProxy); + + StoppedProtocol proxy = (StoppedProtocol) RetryProxy.create(StoppedProtocol.class, + wrappedProxy, RetryPolicies.RETRY_FOREVER); + + assertEquals(invocationHandler.getCloseCalled(), 0); + RPC.stopProxy(proxy); + assertEquals(invocationHandler.getCloseCalled(), 1); + } + + @Test public void testErrorMsgForInsecureClient() throws Exception { final Server server = RPC.getServer(TestProtocol.class, new TestImpl(), ADDRESS, 0, 5, true, conf, null); @@ -567,10 +681,10 @@ private static int countThreads(String search) { return count; } - /** * Test that server.stop() properly stops all threads */ + @Test public void testStopsAllThreads() throws Exception { int threadsBefore = countThreads("Server$Listener$Reader"); assertEquals("Expect no Reader threads running before test", @@ -591,8 +705,7 @@ public void testStopsAllThreads() throws Exception { } public static void main(String[] args) throws Exception { - - new TestRPC("test").testCalls(conf); + new TestRPC().testCallsInternal(conf); } }