YARN-4288. Fixed RMProxy to retry on IOException from local host. Contributed by Junping Du
This commit is contained in:
parent
5eca6dece6
commit
c41699965e
@ -140,6 +140,16 @@ public static final RetryPolicy retryByRemoteException(
|
|||||||
return new RemoteExceptionDependentRetry(defaultPolicy, exceptionToPolicyMap);
|
return new RemoteExceptionDependentRetry(defaultPolicy, exceptionToPolicyMap);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* A retry policy for exceptions other than RemoteException.
|
||||||
|
*/
|
||||||
|
public static final RetryPolicy retryOtherThanRemoteException(
|
||||||
|
RetryPolicy defaultPolicy,
|
||||||
|
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap) {
|
||||||
|
return new OtherThanRemoteExceptionDependentRetry(defaultPolicy,
|
||||||
|
exceptionToPolicyMap);
|
||||||
|
}
|
||||||
|
|
||||||
public static final RetryPolicy failoverOnNetworkException(int maxFailovers) {
|
public static final RetryPolicy failoverOnNetworkException(int maxFailovers) {
|
||||||
return failoverOnNetworkException(TRY_ONCE_THEN_FAIL, maxFailovers);
|
return failoverOnNetworkException(TRY_ONCE_THEN_FAIL, maxFailovers);
|
||||||
}
|
}
|
||||||
@ -490,6 +500,36 @@ public RetryAction shouldRetry(Exception e, int retries, int failovers,
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
static class OtherThanRemoteExceptionDependentRetry implements RetryPolicy {
|
||||||
|
|
||||||
|
private RetryPolicy defaultPolicy;
|
||||||
|
private Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap;
|
||||||
|
|
||||||
|
public OtherThanRemoteExceptionDependentRetry(RetryPolicy defaultPolicy,
|
||||||
|
Map<Class<? extends Exception>,
|
||||||
|
RetryPolicy> exceptionToPolicyMap) {
|
||||||
|
this.defaultPolicy = defaultPolicy;
|
||||||
|
this.exceptionToPolicyMap = exceptionToPolicyMap;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public RetryAction shouldRetry(Exception e, int retries, int failovers,
|
||||||
|
boolean isIdempotentOrAtMostOnce) throws Exception {
|
||||||
|
RetryPolicy policy = null;
|
||||||
|
// ignore Remote Exception
|
||||||
|
if (e instanceof RemoteException) {
|
||||||
|
// do nothing
|
||||||
|
} else {
|
||||||
|
policy = exceptionToPolicyMap.get(e.getClass());
|
||||||
|
}
|
||||||
|
if (policy == null) {
|
||||||
|
policy = defaultPolicy;
|
||||||
|
}
|
||||||
|
return policy.shouldRetry(
|
||||||
|
e, retries, failovers, isIdempotentOrAtMostOnce);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
static class ExponentialBackoffRetry extends RetryLimited {
|
static class ExponentialBackoffRetry extends RetryLimited {
|
||||||
|
|
||||||
public ExponentialBackoffRetry(
|
public ExponentialBackoffRetry(
|
||||||
|
@ -22,6 +22,7 @@
|
|||||||
import static org.apache.hadoop.io.retry.RetryPolicies.TRY_ONCE_THEN_FAIL;
|
import static org.apache.hadoop.io.retry.RetryPolicies.TRY_ONCE_THEN_FAIL;
|
||||||
import static org.apache.hadoop.io.retry.RetryPolicies.retryByException;
|
import static org.apache.hadoop.io.retry.RetryPolicies.retryByException;
|
||||||
import static org.apache.hadoop.io.retry.RetryPolicies.retryByRemoteException;
|
import static org.apache.hadoop.io.retry.RetryPolicies.retryByRemoteException;
|
||||||
|
import static org.apache.hadoop.io.retry.RetryPolicies.retryOtherThanRemoteException;
|
||||||
import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithFixedSleep;
|
import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithFixedSleep;
|
||||||
import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithProportionalSleep;
|
import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumCountWithProportionalSleep;
|
||||||
import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumTimeWithFixedSleep;
|
import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumTimeWithFixedSleep;
|
||||||
@ -29,6 +30,7 @@
|
|||||||
import static org.apache.hadoop.io.retry.RetryPolicies.exponentialBackoffRetry;
|
import static org.apache.hadoop.io.retry.RetryPolicies.exponentialBackoffRetry;
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.Callable;
|
import java.util.concurrent.Callable;
|
||||||
@ -215,6 +217,27 @@ public void testRetryByRemoteException() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRetryOtherThanRemoteException() throws Throwable {
|
||||||
|
Map<Class<? extends Exception>, RetryPolicy> exceptionToPolicyMap =
|
||||||
|
Collections.<Class<? extends Exception>, RetryPolicy>singletonMap(
|
||||||
|
IOException.class, RETRY_FOREVER);
|
||||||
|
|
||||||
|
UnreliableInterface unreliable = (UnreliableInterface)
|
||||||
|
RetryProxy.create(UnreliableInterface.class, unreliableImpl,
|
||||||
|
retryOtherThanRemoteException(TRY_ONCE_THEN_FAIL,
|
||||||
|
exceptionToPolicyMap));
|
||||||
|
// should retry with local IOException.
|
||||||
|
unreliable.failsOnceWithIOException();
|
||||||
|
try {
|
||||||
|
// won't get retry on remote exception
|
||||||
|
unreliable.failsOnceWithRemoteException();
|
||||||
|
fail("Should fail");
|
||||||
|
} catch (RemoteException e) {
|
||||||
|
// expected
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testRetryInterruptible() throws Throwable {
|
public void testRetryInterruptible() throws Throwable {
|
||||||
final UnreliableInterface unreliable = (UnreliableInterface)
|
final UnreliableInterface unreliable = (UnreliableInterface)
|
||||||
|
@ -26,6 +26,8 @@ class UnreliableImplementation implements UnreliableInterface {
|
|||||||
|
|
||||||
private int failsOnceInvocationCount,
|
private int failsOnceInvocationCount,
|
||||||
failsOnceWithValueInvocationCount,
|
failsOnceWithValueInvocationCount,
|
||||||
|
failsOnceIOExceptionInvocationCount,
|
||||||
|
failsOnceRemoteExceptionInvocationCount,
|
||||||
failsTenTimesInvocationCount,
|
failsTenTimesInvocationCount,
|
||||||
succeedsOnceThenFailsCount,
|
succeedsOnceThenFailsCount,
|
||||||
succeedsOnceThenFailsIdempotentCount,
|
succeedsOnceThenFailsIdempotentCount,
|
||||||
@ -89,6 +91,21 @@ public boolean failsOnceThenSucceedsWithReturnValue() throws UnreliableException
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void failsOnceWithIOException() throws IOException {
|
||||||
|
if (failsOnceIOExceptionInvocationCount++ == 0) {
|
||||||
|
throw new IOException("test exception for failsOnceWithIOException");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public void failsOnceWithRemoteException() throws RemoteException {
|
||||||
|
if (failsOnceRemoteExceptionInvocationCount++ == 0) {
|
||||||
|
throw new RemoteException(IOException.class.getName(),
|
||||||
|
"test exception for failsOnceWithRemoteException");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void failsTenTimesThenSucceeds() throws UnreliableException {
|
public void failsTenTimesThenSucceeds() throws UnreliableException {
|
||||||
if (failsTenTimesInvocationCount++ < 10) {
|
if (failsTenTimesInvocationCount++ < 10) {
|
||||||
|
@ -54,6 +54,9 @@ public static class FatalException extends UnreliableException {
|
|||||||
void alwaysFailsWithFatalException() throws FatalException;
|
void alwaysFailsWithFatalException() throws FatalException;
|
||||||
void alwaysFailsWithRemoteFatalException() throws RemoteException;
|
void alwaysFailsWithRemoteFatalException() throws RemoteException;
|
||||||
|
|
||||||
|
void failsOnceWithIOException() throws IOException;
|
||||||
|
void failsOnceWithRemoteException() throws RemoteException;
|
||||||
|
|
||||||
void failsOnceThenSucceeds() throws UnreliableException;
|
void failsOnceThenSucceeds() throws UnreliableException;
|
||||||
boolean failsOnceThenSucceedsWithReturnValue() throws UnreliableException;
|
boolean failsOnceThenSucceedsWithReturnValue() throws UnreliableException;
|
||||||
|
|
||||||
|
@ -1034,6 +1034,9 @@ Release 2.8.0 - UNRELEASED
|
|||||||
YARN-4130. Duplicate declaration of ApplicationId in RMAppManager#submitApplication method.
|
YARN-4130. Duplicate declaration of ApplicationId in RMAppManager#submitApplication method.
|
||||||
(Kai Sasaki via rohithsharmaks)
|
(Kai Sasaki via rohithsharmaks)
|
||||||
|
|
||||||
|
YARN-4288. Fixed RMProxy to retry on IOException from local host.
|
||||||
|
(Junping Du via jianhe)
|
||||||
|
|
||||||
Release 2.7.2 - UNRELEASED
|
Release 2.7.2 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -250,8 +250,10 @@ public static RetryPolicy createRetryPolicy(Configuration conf) {
|
|||||||
exceptionToPolicyMap.put(ConnectTimeoutException.class, retryPolicy);
|
exceptionToPolicyMap.put(ConnectTimeoutException.class, retryPolicy);
|
||||||
exceptionToPolicyMap.put(RetriableException.class, retryPolicy);
|
exceptionToPolicyMap.put(RetriableException.class, retryPolicy);
|
||||||
exceptionToPolicyMap.put(SocketException.class, retryPolicy);
|
exceptionToPolicyMap.put(SocketException.class, retryPolicy);
|
||||||
|
// YARN-4288: local IOException is also possible.
|
||||||
return RetryPolicies.retryByException(
|
exceptionToPolicyMap.put(IOException.class, retryPolicy);
|
||||||
|
// Not retry on remote IO exception.
|
||||||
|
return RetryPolicies.retryOtherThanRemoteException(
|
||||||
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
|
RetryPolicies.TRY_ONCE_THEN_FAIL, exceptionToPolicyMap);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user