From 08d8e0ba259f01465a83d8db09466dfd46b7ec81 Mon Sep 17 00:00:00 2001
From: Jing Zhao
Date: Tue, 13 Sep 2016 11:12:52 -0700
Subject: [PATCH] HADOOP-13546. Override equals and hashCode of the default retry policy to avoid connection leakage. Contributed by Xiaobing Zhou.

---
 .../apache/hadoop/io/retry/RetryPolicies.java |  14 ++
 .../apache/hadoop/io/retry/RetryUtils.java    | 114 ++++++++----
 .../io/retry/TestConnectionRetryPolicy.java   | 154 ++++++++++++++++
 .../hadoop/ipc/TestReuseRpcConnections.java   | 166 ++++++++++++++++++
 .../org/apache/hadoop/ipc/TestRpcBase.java    |  23 ++-
 5 files changed, 429 insertions(+), 42 deletions(-)
 create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestConnectionRetryPolicy.java
 create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestReuseRpcConnections.java

diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
index c0a14b7051..0c523a5d23 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicies.java
@@ -183,6 +183,20 @@ public RetryAction shouldRetry(Exception e, int retries, int failovers,
       return new RetryAction(RetryAction.RetryDecision.FAIL, 0, "try once " +
           "and fail.");
     }
+
+    @Override
+    public boolean equals(Object obj) {
+      if (this == obj) {
+        return true;
+      }
+      // Equal to any non-null object of exactly the same runtime class.
+      return obj != null && getClass() == obj.getClass();
+    }
+
+    @Override
+    public int hashCode() {
+      return getClass().hashCode();
+    }
   }
 
   static class RetryForever implements RetryPolicy {
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryUtils.java 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryUtils.java
index a5a7624404..15a9b54432 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryUtils.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryUtils.java
@@ -22,6 +22,7 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.retry.RetryPolicies.MultipleLinearRandomRetry;
 import org.apache.hadoop.ipc.RemoteException;
 
 import com.google.protobuf.ServiceException;
@@ -79,48 +80,85 @@ public static RetryPolicy getDefaultRetryPolicy(
       //no retry
       return RetryPolicies.TRY_ONCE_THEN_FAIL;
     } else {
-      return new RetryPolicy() {
-        @Override
-        public RetryAction shouldRetry(Exception e, int retries, int failovers,
-            boolean isMethodIdempotent) throws Exception {
-          if (e instanceof ServiceException) {
-            //unwrap ServiceException
-            final Throwable cause = e.getCause();
-            if (cause != null && cause instanceof Exception) {
-              e = (Exception)cause;
-            }
-          }
+      return new WrapperRetryPolicy(
+          (MultipleLinearRandomRetry) multipleLinearRandomRetry,
+          remoteExceptionToRetry);
+    }
+  }
 
-          //see (1) and (2) in the javadoc of this method.
-          final RetryPolicy p;
-          if (e instanceof RetriableException
-              || RetryPolicies.getWrappedRetriableException(e) != null) {
-            // RetriableException or RetriableException wrapped
-            p = multipleLinearRandomRetry;
-          } else if (e instanceof RemoteException) {
-            final RemoteException re = (RemoteException)e;
-            p = remoteExceptionToRetry.equals(re.getClassName())?
-                multipleLinearRandomRetry: RetryPolicies.TRY_ONCE_THEN_FAIL;
-          } else if (e instanceof IOException || e instanceof ServiceException) {
-            p = multipleLinearRandomRetry;
-          } else { //non-IOException
-            p = RetryPolicies.TRY_ONCE_THEN_FAIL;
-          }
+  private static final class WrapperRetryPolicy implements RetryPolicy {
+    private final MultipleLinearRandomRetry multipleLinearRandomRetry;
+    private final String remoteExceptionToRetry;
 
-          if (LOG.isDebugEnabled()) {
-            LOG.debug("RETRY " + retries + ") policy="
-                + p.getClass().getSimpleName() + ", exception=" + e);
-          }
-          return p.shouldRetry(e, retries, failovers, isMethodIdempotent);
+    private WrapperRetryPolicy(
+        final MultipleLinearRandomRetry multipleLinearRandomRetry,
+        final String remoteExceptionToRetry) {
+      this.multipleLinearRandomRetry = multipleLinearRandomRetry;
+      this.remoteExceptionToRetry = remoteExceptionToRetry;
+    }
+
+    @Override
+    public RetryAction shouldRetry(Exception e, int retries, int failovers,
+        boolean isMethodIdempotent) throws Exception {
+      if (e instanceof ServiceException) {
+        //unwrap ServiceException; instanceof is already false for a null cause
+        final Throwable cause = e.getCause();
+        if (cause instanceof Exception) {
+          e = (Exception)cause;
         }
+      }
 
-        @Override
-        public String toString() {
-          return "RetryPolicy[" + multipleLinearRandomRetry + ", "
-              + RetryPolicies.TRY_ONCE_THEN_FAIL.getClass().getSimpleName()
-              + "]";
-        }
-      };
+      //see (1) and (2) in the javadoc of getDefaultRetryPolicy.
+      final RetryPolicy p;
+      if (e instanceof RetriableException
+          || RetryPolicies.getWrappedRetriableException(e) != null) {
+        // RetriableException or RetriableException wrapped
+        p = multipleLinearRandomRetry;
+      } else if (e instanceof RemoteException) {
+        final RemoteException re = (RemoteException)e;
+        p = re.getClassName().equals(remoteExceptionToRetry)
+            ? multipleLinearRandomRetry : RetryPolicies.TRY_ONCE_THEN_FAIL;
+      } else if (e instanceof IOException || e instanceof ServiceException) {
+        p = multipleLinearRandomRetry;
+      } else { //non-IOException
+        p = RetryPolicies.TRY_ONCE_THEN_FAIL;
+      }
+
+      if (LOG.isDebugEnabled()) {
+        LOG.debug("RETRY " + retries + ") policy="
+            + p.getClass().getSimpleName() + ", exception=" + e);
+      }
+      return p.shouldRetry(e, retries, failovers, isMethodIdempotent);
+    }
+
+    /**
+     * Two wrappers are equal when their multipleLinearRandomRetry are equal;
+     * remoteExceptionToRetry is ignored (no effect on connection failures).
+     */
+    @Override
+    public boolean equals(final Object obj) {
+      if (obj == this) {
+        return true;
+      } else {
+        return (obj instanceof WrapperRetryPolicy)
+            && this.multipleLinearRandomRetry
+                .equals(((WrapperRetryPolicy) obj).multipleLinearRandomRetry);
+      }
+    }
+
+    /**
+     * Consistent with equals: derived from multipleLinearRandomRetry only,
+     * so remoteExceptionToRetry does not contribute to the hash code.
+     */
+    @Override
+    public int hashCode() {
+      return multipleLinearRandomRetry.hashCode();
+    }
+
+    @Override
+    public String toString() {
+      return "RetryPolicy[" + multipleLinearRandomRetry + ", "
+          + RetryPolicies.TRY_ONCE_THEN_FAIL.getClass().getSimpleName() + "]";
     }
   }
 
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestConnectionRetryPolicy.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestConnectionRetryPolicy.java
new file mode 100644
index 0000000000..05a309d52b
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestConnectionRetryPolicy.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. 
The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.io.retry; + +import static org.junit.Assert.*; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.PathIOException; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.ipc.RetriableException; +import org.apache.hadoop.ipc.RpcNoSuchMethodException; +import org.junit.Test; + +/** + * This class mainly tests behaviors of various retry policies in connection + * level. 
+ */
+public class TestConnectionRetryPolicy {
+  private static RetryPolicy getDefaultRetryPolicy(
+      final boolean defaultRetryPolicyEnabled,
+      final String defaultRetryPolicySpec,
+      final String remoteExceptionToRetry) {
+    return getDefaultRetryPolicy(
+        new Configuration(),
+        defaultRetryPolicyEnabled,
+        defaultRetryPolicySpec,
+        remoteExceptionToRetry);
+  }
+
+  private static RetryPolicy getDefaultRetryPolicy(
+      final boolean defaultRetryPolicyEnabled,
+      final String defaultRetryPolicySpec) {
+    return getDefaultRetryPolicy(
+        new Configuration(),
+        defaultRetryPolicyEnabled,
+        defaultRetryPolicySpec,
+        "");
+  }
+
+  public static RetryPolicy getDefaultRetryPolicy(
+      final Configuration conf,
+      final boolean defaultRetryPolicyEnabled,
+      final String defaultRetryPolicySpec,
+      final String remoteExceptionToRetry) {
+    return RetryUtils.getDefaultRetryPolicy(
+        conf,
+        "org.apache.hadoop.io.retry.TestConnectionRetryPolicy.No.Such.Key",
+        defaultRetryPolicyEnabled,
+        "org.apache.hadoop.io.retry.TestConnectionRetryPolicy.No.Such.Key",
+        defaultRetryPolicySpec,
+        remoteExceptionToRetry);
+  }
+
+  @Test(timeout = 60000)
+  public void testDefaultRetryPolicyEquivalence() {
+    RetryPolicy rp1 = null;
+    RetryPolicy rp2 = null;
+    RetryPolicy rp3 = null;
+
+    /* test the same setting */
+    rp1 = getDefaultRetryPolicy(true, "10000,2");
+    rp2 = getDefaultRetryPolicy(true, "10000,2");
+    rp3 = getDefaultRetryPolicy(true, "10000,2");
+    verifyRetryPolicyEquivalence(new RetryPolicy[] {rp1, rp2, rp3});
+
+    /* different remoteExceptionToRetry values must still compare equal */
+    rp1 = getDefaultRetryPolicy(
+        true,
+        "10000,2",
+        new RemoteException(
+            PathIOException.class.getName(),
+            "path IO exception").getClassName());
+    rp2 = getDefaultRetryPolicy(
+        true,
+        "10000,2",
+        new RemoteException(
+            RpcNoSuchMethodException.class.getName(),
+            "no such method exception").getClassName());
+    rp3 = getDefaultRetryPolicy(
+        true,
+        "10000,2",
+        new RemoteException(
+            RetriableException.class.getName(),
+            "retriable exception").getClassName());
+    verifyRetryPolicyEquivalence(new RetryPolicy[] {rp1, rp2, rp3});
+
+    /* test enabled and different specifications */
+    rp1 = getDefaultRetryPolicy(true, "20000,3");
+    rp2 = getDefaultRetryPolicy(true, "30000,4");
+    assertNotEquals("should not be equal", rp1, rp2);
+    assertNotEquals(
+        "should not have the same hash code",
+        rp1.hashCode(),
+        rp2.hashCode());
+
+    /* test disabled and the same specifications */
+    rp1 = getDefaultRetryPolicy(false, "40000,5");
+    rp2 = getDefaultRetryPolicy(false, "40000,5");
+    assertEquals("should be equal", rp1, rp2);
+    assertEquals(
+        "should have the same hash code",
+        rp1.hashCode(), rp2.hashCode());
+
+    /* test the disabled and different specifications */
+    rp1 = getDefaultRetryPolicy(false, "50000,6");
+    rp2 = getDefaultRetryPolicy(false, "60000,7");
+    assertEquals("should be equal", rp1, rp2);
+    assertEquals(
+        "should have the same hash code",
+        rp1.hashCode(), rp2.hashCode());
+  }
+
+  public static RetryPolicy newTryOnceThenFail() {
+    return new RetryPolicies.TryOnceThenFail();
+  }
+
+  @Test(timeout = 60000)
+  public void testTryOnceThenFailEquivalence() throws Exception {
+    final RetryPolicy rp1 = newTryOnceThenFail();
+    final RetryPolicy rp2 = newTryOnceThenFail();
+    final RetryPolicy rp3 = newTryOnceThenFail();
+    verifyRetryPolicyEquivalence(new RetryPolicy[] {rp1, rp2, rp3});
+  }
+
+  private void verifyRetryPolicyEquivalence(RetryPolicy[] policies) {
+    for (int i = 0; i < policies.length; i++) {
+      for (int j = 0; j < policies.length; j++) {
+        if (i != j) {
+          assertEquals("should be equal", policies[i], policies[j]);
+          assertEquals(
+              "should have the same hash code",
+              policies[i].hashCode(),
+              policies[j].hashCode());
+        }
+      }
+    }
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestReuseRpcConnections.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestReuseRpcConnections.java
new file mode 100644
index 0000000000..2729dc3cd9
--- /dev/null
+++ 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestReuseRpcConnections.java @@ -0,0 +1,166 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.ipc; +import static org.junit.Assert.*; +import java.util.Set; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.PathIOException; +import org.apache.hadoop.io.retry.RetryPolicies; +import org.apache.hadoop.io.retry.RetryPolicy; +import org.apache.hadoop.io.retry.TestConnectionRetryPolicy; +import org.apache.hadoop.ipc.Client.ConnectionId; +import org.apache.hadoop.ipc.TestRpcBase.TestRpcService; +import org.junit.Before; +import org.junit.Test; + +/** + * This class mainly tests behaviors of reusing RPC connections for various + * retry policies. 
+ */
+public class TestReuseRpcConnections extends TestRpcBase {
+  @Before
+  public void setup() {
+    setupConf();
+  }
+
+  private static RetryPolicy getDefaultRetryPolicy(
+      final boolean defaultRetryPolicyEnabled,
+      final String defaultRetryPolicySpec) {
+    return TestConnectionRetryPolicy.getDefaultRetryPolicy(
+        conf,
+        defaultRetryPolicyEnabled,
+        defaultRetryPolicySpec,
+        "");
+  }
+
+  private static RetryPolicy getDefaultRetryPolicy(
+      final boolean defaultRetryPolicyEnabled,
+      final String defaultRetryPolicySpec,
+      final String remoteExceptionToRetry) {
+    return TestConnectionRetryPolicy.getDefaultRetryPolicy(
+        conf,
+        defaultRetryPolicyEnabled,
+        defaultRetryPolicySpec,
+        remoteExceptionToRetry);
+  }
+
+  @Test(timeout = 60000)
+  public void testDefaultRetryPolicyReuseConnections() throws Exception {
+    RetryPolicy rp1 = null;
+    RetryPolicy rp2 = null;
+    RetryPolicy rp3 = null;
+
+    /* test the same setting */
+    rp1 = getDefaultRetryPolicy(true, "10000,2");
+    rp2 = getDefaultRetryPolicy(true, "10000,2");
+    verifyRetryPolicyReuseConnections(rp1, rp2, RetryPolicies.RETRY_FOREVER);
+
+    /* test enabled and different specifications */
+    rp1 = getDefaultRetryPolicy(true, "20000,3");
+    rp2 = getDefaultRetryPolicy(true, "20000,3");
+    rp3 = getDefaultRetryPolicy(true, "30000,4");
+    verifyRetryPolicyReuseConnections(rp1, rp2, rp3);
+
+    /* test disabled and the same specifications */
+    rp1 = getDefaultRetryPolicy(false, "40000,5");
+    rp2 = getDefaultRetryPolicy(false, "40000,5");
+    verifyRetryPolicyReuseConnections(rp1, rp2, RetryPolicies.RETRY_FOREVER);
+
+    /* test disabled and different specifications */
+    rp1 = getDefaultRetryPolicy(false, "50000,6");
+    rp2 = getDefaultRetryPolicy(false, "60000,7");
+    verifyRetryPolicyReuseConnections(rp1, rp2, RetryPolicies.RETRY_FOREVER);
+
+    /* test different remoteExceptionToRetry */
+    rp1 = getDefaultRetryPolicy(
+        true,
+        "70000,8",
+        new RemoteException(
+            RpcNoSuchMethodException.class.getName(),
+            "no such method exception").getClassName());
+    rp2 = getDefaultRetryPolicy(
+        true,
+        "70000,8",
+        new RemoteException(
+            PathIOException.class.getName(),
+            "path IO exception").getClassName());
+    verifyRetryPolicyReuseConnections(rp1, rp2, RetryPolicies.RETRY_FOREVER);
+  }
+
+  @Test(timeout = 60000)
+  public void testRetryPolicyTryOnceThenFail() throws Exception {
+    final RetryPolicy rp1 = TestConnectionRetryPolicy.newTryOnceThenFail();
+    final RetryPolicy rp2 = TestConnectionRetryPolicy.newTryOnceThenFail();
+    verifyRetryPolicyReuseConnections(rp1, rp2, RetryPolicies.RETRY_FOREVER);
+  }
+
+  private void verifyRetryPolicyReuseConnections(
+      final RetryPolicy retryPolicy1,
+      final RetryPolicy retryPolicy2,
+      final RetryPolicy anotherRetryPolicy) throws Exception {
+    final Server server = setupTestServer(conf, 2);
+    final Configuration newConf = new Configuration(conf);
+    newConf.set(
+        CommonConfigurationKeysPublic
+            .HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY,
+        "");
+    Client client = null;
+    TestRpcService proxy1 = null;
+    TestRpcService proxy2 = null;
+    TestRpcService proxy3 = null;
+
+    try {
+      proxy1 = getClient(addr, newConf, retryPolicy1);
+      proxy1.ping(null, newEmptyRequest());
+      client = ProtobufRpcEngine.getClient(newConf);
+      final Set<ConnectionId> conns = client.getConnectionIds();
+      assertEquals("number of connections in cache is wrong", 1, conns.size());
+
+      /*
+       * another equivalent retry policy, reuse connection
+       */
+      proxy2 = getClient(addr, newConf, retryPolicy2);
+      proxy2.ping(null, newEmptyRequest());
+      assertEquals("number of connections in cache is wrong", 1, conns.size());
+
+      /*
+       * different retry policy, create a new connection
+       */
+      proxy3 = getClient(addr, newConf, anotherRetryPolicy);
+      proxy3.ping(null, newEmptyRequest());
+      assertEquals("number of connections in cache is wrong", 2, conns.size());
+    } finally {
+      server.stop();
+      // this is dirty, but clear out connection cache for next run
+      if (client != null) {
+        client.getConnectionIds().clear();
+      }
+      if (proxy1 != null) {
+        RPC.stopProxy(proxy1);
+      }
+      if (proxy2 != null) {
+        RPC.stopProxy(proxy2);
+      }
+      if (proxy3 != null) {
+        RPC.stopProxy(proxy3);
+      }
+    }
+  }
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
index bc604a47ef..e991405f0c 100644
--- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRpcBase.java
@@ -30,18 +30,15 @@
 import org.junit.Assert;
 
 import org.apache.hadoop.io.Text;
+import org.apache.hadoop.io.retry.RetryPolicy;
 import org.apache.hadoop.ipc.protobuf.TestProtos;
 import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos;
-import org.apache.hadoop.net.NetUtils;
 import org.apache.hadoop.security.KerberosInfo;
 import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
-import org.apache.hadoop.security.UserGroupInformation;
-import org.apache.hadoop.security.token.SecretManager;
 import org.apache.hadoop.security.token.Token;
 import org.apache.hadoop.security.token.TokenIdentifier;
 import org.apache.hadoop.security.token.TokenInfo;
 import org.apache.hadoop.security.token.TokenSelector;
-import org.junit.Assert;
 
 import java.io.DataInput;
 import java.io.DataOutput;
@@ -132,6 +129,24 @@ protected static TestRpcService getClient(InetSocketAddress serverAddr,
     }
   }
 
+  protected static TestRpcService getClient(InetSocketAddress serverAddr,
+      Configuration clientConf, final RetryPolicy connectionRetryPolicy)
+      throws ServiceException {
+    try {
+      return RPC.getProtocolProxy(
+          TestRpcService.class,
+          0,
+          serverAddr,
+          UserGroupInformation.getCurrentUser(),
+          clientConf,
+          NetUtils.getDefaultSocketFactory(clientConf),
+          RPC.getRpcTimeout(clientConf),
+          connectionRetryPolicy, null).getProxy();
+    } catch (IOException e) {
+      throw new ServiceException(e);
+    }
+  }
+
   protected static void 
stop(Server server, TestRpcService proxy) { if (proxy != null) { try {