HADOOP-13546. Override equals and hashCode of the default retry policy to avoid connection leakage. Contributed by Xiaobing Zhou.
This commit is contained in:
parent
db6d243cf8
commit
08d8e0ba25
@ -183,6 +183,20 @@ public RetryAction shouldRetry(Exception e, int retries, int failovers,
|
||||
return new RetryAction(RetryAction.RetryDecision.FAIL, 0, "try once " +
|
||||
"and fail.");
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean equals(Object obj) {
|
||||
if (obj == this) {
|
||||
return true;
|
||||
} else {
|
||||
return obj != null && obj.getClass() == this.getClass();
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return this.getClass().hashCode();
|
||||
}
|
||||
}
|
||||
|
||||
static class RetryForever implements RetryPolicy {
|
||||
|
@ -22,6 +22,7 @@
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.io.retry.RetryPolicies.MultipleLinearRandomRetry;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
|
||||
import com.google.protobuf.ServiceException;
|
||||
@ -79,7 +80,23 @@ public static RetryPolicy getDefaultRetryPolicy(
|
||||
//no retry
|
||||
return RetryPolicies.TRY_ONCE_THEN_FAIL;
|
||||
} else {
|
||||
return new RetryPolicy() {
|
||||
return new WrapperRetryPolicy(
|
||||
(MultipleLinearRandomRetry) multipleLinearRandomRetry,
|
||||
remoteExceptionToRetry);
|
||||
}
|
||||
}
|
||||
|
||||
private static final class WrapperRetryPolicy implements RetryPolicy {
|
||||
private MultipleLinearRandomRetry multipleLinearRandomRetry;
|
||||
private String remoteExceptionToRetry;
|
||||
|
||||
private WrapperRetryPolicy(
|
||||
final MultipleLinearRandomRetry multipleLinearRandomRetry,
|
||||
final String remoteExceptionToRetry) {
|
||||
this.multipleLinearRandomRetry = multipleLinearRandomRetry;
|
||||
this.remoteExceptionToRetry = remoteExceptionToRetry;
|
||||
}
|
||||
|
||||
@Override
|
||||
public RetryAction shouldRetry(Exception e, int retries, int failovers,
|
||||
boolean isMethodIdempotent) throws Exception {
|
||||
@ -99,8 +116,8 @@ public RetryAction shouldRetry(Exception e, int retries, int failovers,
|
||||
p = multipleLinearRandomRetry;
|
||||
} else if (e instanceof RemoteException) {
|
||||
final RemoteException re = (RemoteException)e;
|
||||
p = remoteExceptionToRetry.equals(re.getClassName())?
|
||||
multipleLinearRandomRetry: RetryPolicies.TRY_ONCE_THEN_FAIL;
|
||||
p = re.getClassName().equals(remoteExceptionToRetry)
|
||||
? multipleLinearRandomRetry : RetryPolicies.TRY_ONCE_THEN_FAIL;
|
||||
} else if (e instanceof IOException || e instanceof ServiceException) {
|
||||
p = multipleLinearRandomRetry;
|
||||
} else { //non-IOException
|
||||
@ -114,13 +131,34 @@ public RetryAction shouldRetry(Exception e, int retries, int failovers,
|
||||
return p.shouldRetry(e, retries, failovers, isMethodIdempotent);
|
||||
}
|
||||
|
||||
/**
|
||||
* remoteExceptionToRetry is ignored as part of equals since it does not
|
||||
* affect connection failure handling.
|
||||
*/
|
||||
@Override
|
||||
public boolean equals(final Object obj) {
|
||||
if (obj == this) {
|
||||
return true;
|
||||
} else {
|
||||
return (obj instanceof WrapperRetryPolicy)
|
||||
&& this.multipleLinearRandomRetry
|
||||
.equals(((WrapperRetryPolicy) obj).multipleLinearRandomRetry);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Similarly, remoteExceptionToRetry is ignored as part of hashCode since it
|
||||
* does not affect connection failure handling.
|
||||
*/
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return multipleLinearRandomRetry.hashCode();
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "RetryPolicy[" + multipleLinearRandomRetry + ", "
|
||||
+ RetryPolicies.TRY_ONCE_THEN_FAIL.getClass().getSimpleName()
|
||||
+ "]";
|
||||
}
|
||||
};
|
||||
+ RetryPolicies.TRY_ONCE_THEN_FAIL.getClass().getSimpleName() + "]";
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -0,0 +1,154 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.io.retry;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.PathIOException;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.ipc.RetriableException;
|
||||
import org.apache.hadoop.ipc.RpcNoSuchMethodException;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* This class mainly tests behaviors of various retry policies in connection
|
||||
* level.
|
||||
*/
|
||||
public class TestConnectionRetryPolicy {
|
||||
private static RetryPolicy getDefaultRetryPolicy(
|
||||
final boolean defaultRetryPolicyEnabled,
|
||||
final String defaultRetryPolicySpec,
|
||||
final String remoteExceptionToRetry) {
|
||||
return getDefaultRetryPolicy(
|
||||
new Configuration(),
|
||||
defaultRetryPolicyEnabled,
|
||||
defaultRetryPolicySpec,
|
||||
remoteExceptionToRetry);
|
||||
}
|
||||
|
||||
private static RetryPolicy getDefaultRetryPolicy(
|
||||
final boolean defaultRetryPolicyEnabled,
|
||||
final String defaultRetryPolicySpec) {
|
||||
return getDefaultRetryPolicy(
|
||||
new Configuration(),
|
||||
defaultRetryPolicyEnabled,
|
||||
defaultRetryPolicySpec,
|
||||
"");
|
||||
}
|
||||
|
||||
public static RetryPolicy getDefaultRetryPolicy(
|
||||
final Configuration conf,
|
||||
final boolean defaultRetryPolicyEnabled,
|
||||
final String defaultRetryPolicySpec,
|
||||
final String remoteExceptionToRetry) {
|
||||
return RetryUtils.getDefaultRetryPolicy(
|
||||
conf,
|
||||
"org.apache.hadoop.io.retry.TestConnectionRetryPolicy.No.Such.Key",
|
||||
defaultRetryPolicyEnabled,
|
||||
"org.apache.hadoop.io.retry.TestConnectionRetryPolicy.No.Such.Key",
|
||||
defaultRetryPolicySpec,
|
||||
"");
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testDefaultRetryPolicyEquivalence() {
|
||||
RetryPolicy rp1 = null;
|
||||
RetryPolicy rp2 = null;
|
||||
RetryPolicy rp3 = null;
|
||||
|
||||
/* test the same setting */
|
||||
rp1 = getDefaultRetryPolicy(true, "10000,2");
|
||||
rp2 = getDefaultRetryPolicy(true, "10000,2");
|
||||
rp3 = getDefaultRetryPolicy(true, "10000,2");
|
||||
verifyRetryPolicyEquivalence(new RetryPolicy[] {rp1, rp2, rp3});
|
||||
|
||||
/* test different remoteExceptionToRetry */
|
||||
rp1 = getDefaultRetryPolicy(
|
||||
true,
|
||||
"10000,2",
|
||||
new RemoteException(
|
||||
PathIOException.class.getName(),
|
||||
"path IO exception").getClassName());
|
||||
rp2 = getDefaultRetryPolicy(
|
||||
true,
|
||||
"10000,2",
|
||||
new RemoteException(
|
||||
RpcNoSuchMethodException.class.getName(),
|
||||
"no such method exception").getClassName());
|
||||
rp3 = getDefaultRetryPolicy(
|
||||
true,
|
||||
"10000,2",
|
||||
new RemoteException(
|
||||
RetriableException.class.getName(),
|
||||
"retriable exception").getClassName());
|
||||
verifyRetryPolicyEquivalence(new RetryPolicy[] {rp1, rp2, rp3});
|
||||
|
||||
/* test enabled and different specifications */
|
||||
rp1 = getDefaultRetryPolicy(true, "20000,3");
|
||||
rp2 = getDefaultRetryPolicy(true, "30000,4");
|
||||
assertNotEquals("should not be equal", rp1, rp2);
|
||||
assertNotEquals(
|
||||
"should not have the same hash code",
|
||||
rp1.hashCode(),
|
||||
rp2.hashCode());
|
||||
|
||||
/* test disabled and the same specifications */
|
||||
rp1 = getDefaultRetryPolicy(false, "40000,5");
|
||||
rp2 = getDefaultRetryPolicy(false, "40000,5");
|
||||
assertEquals("should be equal", rp1, rp2);
|
||||
assertEquals(
|
||||
"should have the same hash code",
|
||||
rp1, rp2);
|
||||
|
||||
/* test the disabled and different specifications */
|
||||
rp1 = getDefaultRetryPolicy(false, "50000,6");
|
||||
rp2 = getDefaultRetryPolicy(false, "60000,7");
|
||||
assertEquals("should be equal", rp1, rp2);
|
||||
assertEquals(
|
||||
"should have the same hash code",
|
||||
rp1, rp2);
|
||||
}
|
||||
|
||||
public static RetryPolicy newTryOnceThenFail() {
|
||||
return new RetryPolicies.TryOnceThenFail();
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testTryOnceThenFailEquivalence() throws Exception {
|
||||
final RetryPolicy rp1 = newTryOnceThenFail();
|
||||
final RetryPolicy rp2 = newTryOnceThenFail();
|
||||
final RetryPolicy rp3 = newTryOnceThenFail();
|
||||
verifyRetryPolicyEquivalence(new RetryPolicy[] {rp1, rp2, rp3});
|
||||
}
|
||||
|
||||
private void verifyRetryPolicyEquivalence(RetryPolicy[] polices) {
|
||||
for (int i = 0; i < polices.length; i++) {
|
||||
for (int j = 0; j < polices.length; j++) {
|
||||
if (i != j) {
|
||||
assertEquals("should be equal", polices[i], polices[j]);
|
||||
assertEquals(
|
||||
"should have the same hash code",
|
||||
polices[i].hashCode(),
|
||||
polices[j].hashCode());
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -0,0 +1,166 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.ipc;
|
||||
import static org.junit.Assert.*;
|
||||
import java.util.Set;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
|
||||
import org.apache.hadoop.fs.PathIOException;
|
||||
import org.apache.hadoop.io.retry.RetryPolicies;
|
||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||
import org.apache.hadoop.io.retry.TestConnectionRetryPolicy;
|
||||
import org.apache.hadoop.ipc.Client.ConnectionId;
|
||||
import org.apache.hadoop.ipc.TestRpcBase.TestRpcService;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
/**
|
||||
* This class mainly tests behaviors of reusing RPC connections for various
|
||||
* retry policies.
|
||||
*/
|
||||
public class TestReuseRpcConnections extends TestRpcBase {
|
||||
@Before
|
||||
public void setup() {
|
||||
setupConf();
|
||||
}
|
||||
|
||||
private static RetryPolicy getDefaultRetryPolicy(
|
||||
final boolean defaultRetryPolicyEnabled,
|
||||
final String defaultRetryPolicySpec) {
|
||||
return TestConnectionRetryPolicy.getDefaultRetryPolicy(
|
||||
conf,
|
||||
defaultRetryPolicyEnabled,
|
||||
defaultRetryPolicySpec,
|
||||
"");
|
||||
}
|
||||
|
||||
private static RetryPolicy getDefaultRetryPolicy(
|
||||
final boolean defaultRetryPolicyEnabled,
|
||||
final String defaultRetryPolicySpec,
|
||||
final String remoteExceptionToRetry) {
|
||||
return TestConnectionRetryPolicy.getDefaultRetryPolicy(
|
||||
conf,
|
||||
defaultRetryPolicyEnabled,
|
||||
defaultRetryPolicySpec,
|
||||
remoteExceptionToRetry);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testDefaultRetryPolicyReuseConnections() throws Exception {
|
||||
RetryPolicy rp1 = null;
|
||||
RetryPolicy rp2 = null;
|
||||
RetryPolicy rp3 = null;
|
||||
|
||||
/* test the same setting */
|
||||
rp1 = getDefaultRetryPolicy(true, "10000,2");
|
||||
rp2 = getDefaultRetryPolicy(true, "10000,2");
|
||||
verifyRetryPolicyReuseConnections(rp1, rp2, RetryPolicies.RETRY_FOREVER);
|
||||
|
||||
/* test enabled and different specifications */
|
||||
rp1 = getDefaultRetryPolicy(true, "20000,3");
|
||||
rp2 = getDefaultRetryPolicy(true, "20000,3");
|
||||
rp3 = getDefaultRetryPolicy(true, "30000,4");
|
||||
verifyRetryPolicyReuseConnections(rp1, rp2, rp3);
|
||||
|
||||
/* test disabled and the same specifications */
|
||||
rp1 = getDefaultRetryPolicy(false, "40000,5");
|
||||
rp2 = getDefaultRetryPolicy(false, "40000,5");
|
||||
verifyRetryPolicyReuseConnections(rp1, rp2, RetryPolicies.RETRY_FOREVER);
|
||||
|
||||
/* test disabled and different specifications */
|
||||
rp1 = getDefaultRetryPolicy(false, "50000,6");
|
||||
rp2 = getDefaultRetryPolicy(false, "60000,7");
|
||||
verifyRetryPolicyReuseConnections(rp1, rp2, RetryPolicies.RETRY_FOREVER);
|
||||
|
||||
/* test different remoteExceptionToRetry */
|
||||
rp1 = getDefaultRetryPolicy(
|
||||
true,
|
||||
"70000,8",
|
||||
new RemoteException(
|
||||
RpcNoSuchMethodException.class.getName(),
|
||||
"no such method exception").getClassName());
|
||||
rp2 = getDefaultRetryPolicy(
|
||||
true,
|
||||
"70000,8",
|
||||
new RemoteException(
|
||||
PathIOException.class.getName(),
|
||||
"path IO exception").getClassName());
|
||||
verifyRetryPolicyReuseConnections(rp1, rp2, RetryPolicies.RETRY_FOREVER);
|
||||
}
|
||||
|
||||
@Test(timeout = 60000)
|
||||
public void testRetryPolicyTryOnceThenFail() throws Exception {
|
||||
final RetryPolicy rp1 = TestConnectionRetryPolicy.newTryOnceThenFail();
|
||||
final RetryPolicy rp2 = TestConnectionRetryPolicy.newTryOnceThenFail();
|
||||
verifyRetryPolicyReuseConnections(rp1, rp2, RetryPolicies.RETRY_FOREVER);
|
||||
}
|
||||
|
||||
private void verifyRetryPolicyReuseConnections(
|
||||
final RetryPolicy retryPolicy1,
|
||||
final RetryPolicy retryPolicy2,
|
||||
final RetryPolicy anotherRetryPolicy) throws Exception {
|
||||
final Server server = setupTestServer(conf, 2);
|
||||
final Configuration newConf = new Configuration(conf);
|
||||
newConf.set(
|
||||
CommonConfigurationKeysPublic
|
||||
.HADOOP_RPC_SOCKET_FACTORY_CLASS_DEFAULT_KEY,
|
||||
"");
|
||||
Client client = null;
|
||||
TestRpcService proxy1 = null;
|
||||
TestRpcService proxy2 = null;
|
||||
TestRpcService proxy3 = null;
|
||||
|
||||
try {
|
||||
proxy1 = getClient(addr, newConf, retryPolicy1);
|
||||
proxy1.ping(null, newEmptyRequest());
|
||||
client = ProtobufRpcEngine.getClient(newConf);
|
||||
final Set<ConnectionId> conns = client.getConnectionIds();
|
||||
assertEquals("number of connections in cache is wrong", 1, conns.size());
|
||||
|
||||
/*
|
||||
* another equivalent retry policy, reuse connection
|
||||
*/
|
||||
proxy2 = getClient(addr, newConf, retryPolicy2);
|
||||
proxy2.ping(null, newEmptyRequest());
|
||||
assertEquals("number of connections in cache is wrong", 1, conns.size());
|
||||
|
||||
/*
|
||||
* different retry policy, create a new connection
|
||||
*/
|
||||
proxy3 = getClient(addr, newConf, anotherRetryPolicy);
|
||||
proxy3.ping(null, newEmptyRequest());
|
||||
assertEquals("number of connections in cache is wrong", 2, conns.size());
|
||||
} finally {
|
||||
server.stop();
|
||||
// this is dirty, but clear out connection cache for next run
|
||||
if (client != null) {
|
||||
client.getConnectionIds().clear();
|
||||
}
|
||||
if (proxy1 != null) {
|
||||
RPC.stopProxy(proxy1);
|
||||
}
|
||||
if (proxy2 != null) {
|
||||
RPC.stopProxy(proxy2);
|
||||
}
|
||||
if (proxy3 != null) {
|
||||
RPC.stopProxy(proxy3);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
@ -30,18 +30,15 @@
|
||||
import org.junit.Assert;
|
||||
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.io.retry.RetryPolicy;
|
||||
import org.apache.hadoop.ipc.protobuf.TestProtos;
|
||||
import org.apache.hadoop.ipc.protobuf.TestRpcServiceProtos;
|
||||
import org.apache.hadoop.net.NetUtils;
|
||||
import org.apache.hadoop.security.KerberosInfo;
|
||||
import org.apache.hadoop.security.SaslRpcServer.AuthMethod;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.SecretManager;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
import org.apache.hadoop.security.token.TokenInfo;
|
||||
import org.apache.hadoop.security.token.TokenSelector;
|
||||
import org.junit.Assert;
|
||||
|
||||
import java.io.DataInput;
|
||||
import java.io.DataOutput;
|
||||
@ -132,6 +129,24 @@ protected static TestRpcService getClient(InetSocketAddress serverAddr,
|
||||
}
|
||||
}
|
||||
|
||||
protected static TestRpcService getClient(InetSocketAddress serverAddr,
|
||||
Configuration clientConf, final RetryPolicy connectionRetryPolicy)
|
||||
throws ServiceException {
|
||||
try {
|
||||
return RPC.getProtocolProxy(
|
||||
TestRpcService.class,
|
||||
0,
|
||||
serverAddr,
|
||||
UserGroupInformation.getCurrentUser(),
|
||||
clientConf,
|
||||
NetUtils.getDefaultSocketFactory(clientConf),
|
||||
RPC.getRpcTimeout(clientConf),
|
||||
connectionRetryPolicy, null).getProxy();
|
||||
} catch (IOException e) {
|
||||
throw new ServiceException(e);
|
||||
}
|
||||
}
|
||||
|
||||
protected static void stop(Server server, TestRpcService proxy) {
|
||||
if (proxy != null) {
|
||||
try {
|
||||
|
Loading…
Reference in New Issue
Block a user