HADOOP-12622. Improve the loggings in RetryPolicies and RetryInvocationHandler. Contributed by Junping Du
commit d8f390d015 (parent 9dafaaaf0d)
CHANGES.txt

@@ -1165,6 +1165,9 @@ Release 2.8.0 - UNRELEASED
     HADOOP-12825. Log slow name resolutions.
     (Sidharta Seethana via stevel)
 
+    HADOOP-12622. Improve the loggings in RetryPolicies and RetryInvocationHandler.
+    (Junping Du via jianhe)
+
   OPTIMIZATIONS
 
     HADOOP-11785. Reduce the number of listStatus operation in distcp
org/apache/hadoop/io/retry/RetryInvocationHandler.java

@@ -120,6 +120,7 @@ public Object invoke(Object proxy, Method method, Object[] args)
           invocationFailoverCount, isIdempotentOrAtMostOnce);
       RetryAction failAction = getFailAction(actions);
       if (failAction != null) {
+        // fail.
         if (failAction.reason != null) {
           LOG.warn("Exception while invoking " + currentProxy.proxy.getClass()
               + "." + method.getName() + " over " + currentProxy.proxyInfo
@@ -135,7 +136,8 @@ public Object invoke(Object proxy, Method method, Object[] args)
       worthLogging |= LOG.isDebugEnabled();
       RetryAction failOverAction = getFailOverAction(actions);
       long delay = getDelayMillis(actions);
-      if (failOverAction != null && worthLogging) {
+
+      if (worthLogging) {
         String msg = "Exception while invoking " + method.getName()
             + " of class " + currentProxy.proxy.getClass().getSimpleName()
             + " over " + currentProxy.proxyInfo;
@@ -143,21 +145,21 @@ public Object invoke(Object proxy, Method method, Object[] args)
         if (invocationFailoverCount > 0) {
           msg += " after " + invocationFailoverCount + " fail over attempts";
         }
-        msg += ". Trying to fail over " + formatSleepMessage(delay);
-        LOG.info(msg, ex);
-      } else {
-        if(LOG.isDebugEnabled()) {
-          LOG.debug("Exception while invoking " + method.getName()
-              + " of class " + currentProxy.proxy.getClass().getSimpleName()
-              + " over " + currentProxy.proxyInfo + ". Retrying "
-              + formatSleepMessage(delay), ex);
+
+        if (failOverAction != null) {
+          // failover
+          msg += ". Trying to fail over " + formatSleepMessage(delay);
+        } else {
+          // retry
+          msg += ". Retrying " + formatSleepMessage(delay);
         }
+        LOG.info(msg, ex);
       }
 
       if (delay > 0) {
         Thread.sleep(delay);
       }
 
       if (failOverAction != null) {
         // Make sure that concurrent failed method invocations only cause a
         // single actual fail over.
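Read together, the three RetryInvocationHandler hunks above replace the old split between a LOG.info path (failover) and a LOG.debug path (plain retry) with a single message that is built once and logged once. The following standalone sketch is not the handler code itself: the method, proxy and sleep-message values are hypothetical stand-ins, since formatSleepMessage() is not shown in this diff; it only mirrors how the patched code assembles the message.

// Standalone illustration only: mirrors the message construction introduced
// by the patch, with hypothetical inputs in main().
public class RetryLogMessageSketch {

  static String buildMessage(String methodName, String proxyClass,
      String proxyInfo, int invocationFailoverCount, boolean failingOver,
      String sleepMessage) {
    String msg = "Exception while invoking " + methodName
        + " of class " + proxyClass + " over " + proxyInfo;
    if (invocationFailoverCount > 0) {
      msg += " after " + invocationFailoverCount + " fail over attempts";
    }
    if (failingOver) {
      // failover
      msg += ". Trying to fail over " + sleepMessage;
    } else {
      // retry
      msg += ". Retrying " + sleepMessage;
    }
    // the patched handler then emits this once via LOG.info(msg, ex)
    return msg;
  }

  public static void main(String[] args) {
    // hypothetical values, for illustration only
    System.out.println(buildMessage("rename", "HypotheticalProtocolTranslator",
        "proxy-1", 2, true, "after sleeping for 1000ms"));
  }
}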
org/apache/hadoop/io/retry/RetryPolicies.java

@@ -1,4 +1,4 @@
-/*
+/**
  * Licensed to the Apache Software Foundation (ASF) under one
  * or more contributor license agreements. See the NOTICE file
  * distributed with this work for additional information
@@ -39,6 +39,8 @@
 import org.apache.hadoop.net.ConnectTimeoutException;
 import org.apache.hadoop.security.token.SecretManager.InvalidToken;
 
+import com.google.common.annotations.VisibleForTesting;
+
 /**
  * <p>
  * A collection of useful implementations of {@link RetryPolicy}.
@@ -177,10 +179,11 @@ static class TryOnceThenFail implements RetryPolicy {
     @Override
     public RetryAction shouldRetry(Exception e, int retries, int failovers,
         boolean isIdempotentOrAtMostOnce) throws Exception {
-      return RetryAction.FAIL;
+      return new RetryAction(RetryAction.RetryDecision.FAIL, 0, "try once " +
+          "and fail.");
     }
   }
 
   static class RetryForever implements RetryPolicy {
     @Override
     public RetryAction shouldRetry(Exception e, int retries, int failovers,
@@ -221,14 +224,24 @@ static abstract class RetryLimited implements RetryPolicy {
     public RetryAction shouldRetry(Exception e, int retries, int failovers,
         boolean isIdempotentOrAtMostOnce) throws Exception {
       if (retries >= maxRetries) {
-        return RetryAction.FAIL;
+        return new RetryAction(RetryAction.RetryDecision.FAIL, 0 , getReason());
       }
       return new RetryAction(RetryAction.RetryDecision.RETRY,
-          timeUnit.toMillis(calculateSleepTime(retries)));
+          timeUnit.toMillis(calculateSleepTime(retries)), getReason());
     }
 
+    protected String getReason() {
+      return constructReasonString(maxRetries);
+    }
+
+    @VisibleForTesting
+    public static String constructReasonString(int retries) {
+      return "retries get failed due to exceeded maximum allowed retries " +
+          "number: " + retries;
+    }
+
     protected abstract long calculateSleepTime(int retries);
 
     @Override
     public int hashCode() {
       return toString().hashCode();
@@ -264,18 +277,37 @@ protected long calculateSleepTime(int retries) {
       return sleepTime;
     }
   }
 
-  static class RetryUpToMaximumTimeWithFixedSleep extends RetryUpToMaximumCountWithFixedSleep {
-    public RetryUpToMaximumTimeWithFixedSleep(long maxTime, long sleepTime, TimeUnit timeUnit) {
+  static class RetryUpToMaximumTimeWithFixedSleep extends
+      RetryUpToMaximumCountWithFixedSleep {
+    private long maxTime = 0;
+    private TimeUnit timeUnit;
+
+    public RetryUpToMaximumTimeWithFixedSleep(long maxTime, long sleepTime,
+        TimeUnit timeUnit) {
       super((int) (maxTime / sleepTime), sleepTime, timeUnit);
+      this.maxTime = maxTime;
+      this.timeUnit = timeUnit;
+    }
+
+    @Override
+    protected String getReason() {
+      return constructReasonString(this.maxTime, this.timeUnit);
+    }
+
+    @VisibleForTesting
+    public static String constructReasonString(long maxTime,
+        TimeUnit timeUnit) {
+      return "retries get failed due to exceeded maximum allowed time (" +
+          "in " + timeUnit.toString() + "): " + maxTime;
     }
   }
 
   static class RetryUpToMaximumCountWithProportionalSleep extends RetryLimited {
     public RetryUpToMaximumCountWithProportionalSleep(int maxRetries, long sleepTime, TimeUnit timeUnit) {
       super(maxRetries, sleepTime, timeUnit);
     }
 
     @Override
     protected long calculateSleepTime(int retries) {
       return sleepTime * (retries + 1);
@@ -332,7 +364,8 @@ public RetryAction shouldRetry(Exception e, int curRetry, int failovers,
       final Pair p = searchPair(curRetry);
       if (p == null) {
         //no more retries.
-        return RetryAction.FAIL;
+        return new RetryAction(RetryAction.RetryDecision.FAIL, 0 , "Retry " +
+            "all pairs in MultipleLinearRandomRetry: " + pairs);
       }
 
       //calculate sleep time and return.
@@ -549,6 +582,7 @@ public ExponentialBackoffRetry(
     protected long calculateSleepTime(int retries) {
       return calculateExponentialTime(sleepTime, retries + 1);
     }
+
   }
 
   /**
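The net effect of the RetryPolicies changes is that the limited policies now attach a human-readable reason to the RetryAction they return, which RetryInvocationHandler can surface in its logs. Below is a minimal usage sketch (not part of the commit), assuming only what the diff above shows: the retryUpToMaximumCountWithFixedSleep factory, the shouldRetry signature, and the action/reason fields of RetryAction that TestRetryProxy asserts on further down.

import java.util.concurrent.TimeUnit;

import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;

public class RetryReasonSketch {
  public static void main(String[] args) throws Exception {
    // A policy that allows at most 2 retries with a 10 ms fixed sleep.
    RetryPolicy policy = RetryPolicies.retryUpToMaximumCountWithFixedSleep(
        2, 10, TimeUnit.MILLISECONDS);

    // Pretend the call has already been retried 3 times: the limit is
    // exceeded, so shouldRetry() returns a FAIL action whose reason comes
    // from constructReasonString(maxRetries) added by this patch.
    RetryAction action = policy.shouldRetry(
        new Exception("simulated failure"), 3, 0, true);

    // Expected: FAIL plus "retries get failed due to exceeded maximum
    // allowed retries number: 2"
    System.out.println(action.action + ": " + action.reason);
  }
}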
org/apache/hadoop/io/retry/TestRetryProxy.java

@@ -28,6 +28,15 @@
 import static org.apache.hadoop.io.retry.RetryPolicies.retryUpToMaximumTimeWithFixedSleep;
 import static org.apache.hadoop.io.retry.RetryPolicies.retryForeverWithFixedSleep;
 import static org.apache.hadoop.io.retry.RetryPolicies.exponentialBackoffRetry;
+
+import static org.mockito.Matchers.any;
+import static org.mockito.Matchers.anyBoolean;
+import static org.mockito.Matchers.anyInt;
+import static org.mockito.Mockito.mock;
+import static org.mockito.Mockito.times;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.when;
+
 import static org.junit.Assert.*;
 
 import java.io.IOException;
@@ -41,10 +50,19 @@
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
+import org.apache.hadoop.io.retry.RetryPolicy.RetryAction.RetryDecision;
+import org.apache.hadoop.io.retry.RetryPolicies.RetryUpToMaximumCountWithFixedSleep;
+import org.apache.hadoop.io.retry.RetryPolicies.RetryUpToMaximumTimeWithFixedSleep;
+import org.apache.hadoop.io.retry.RetryPolicies.TryOnceThenFail;
 import org.apache.hadoop.io.retry.UnreliableInterface.FatalException;
 import org.apache.hadoop.io.retry.UnreliableInterface.UnreliableException;
 import org.apache.hadoop.ipc.ProtocolTranslator;
 import org.apache.hadoop.ipc.RemoteException;
+
+import org.mockito.invocation.InvocationOnMock;
+import org.mockito.stubbing.Answer;
+
 import org.junit.Before;
 import org.junit.Test;
 
@@ -53,25 +71,57 @@
 public class TestRetryProxy {
 
   private UnreliableImplementation unreliableImpl;
+  private RetryAction caughtRetryAction = null;
 
   @Before
   public void setUp() throws Exception {
     unreliableImpl = new UnreliableImplementation();
   }
 
+  // answer mockPolicy's method with realPolicy, caught method's return value
+  private void setupMockPolicy(RetryPolicy mockPolicy,
+      final RetryPolicy realPolicy) throws Exception {
+    when(mockPolicy.shouldRetry(any(Exception.class), anyInt(), anyInt(),
+        anyBoolean())).thenAnswer(new Answer<RetryAction>() {
+      @SuppressWarnings("rawtypes")
+      @Override
+      public RetryAction answer(InvocationOnMock invocation) throws Throwable {
+        Object[] args = invocation.getArguments();
+        Exception e = (Exception) args[0];
+        int retries = (int) args[1];
+        int failovers = (int) args[2];
+        boolean isIdempotentOrAtMostOnce = (boolean) args[3];
+        caughtRetryAction = realPolicy.shouldRetry(e, retries, failovers,
+            isIdempotentOrAtMostOnce);
+        return caughtRetryAction;
+      }
+    });
+  }
+
   @Test
-  public void testTryOnceThenFail() throws UnreliableException {
+  public void testTryOnceThenFail() throws Exception {
+    RetryPolicy policy = mock(TryOnceThenFail.class);
+    RetryPolicy realPolicy = TRY_ONCE_THEN_FAIL;
+    setupMockPolicy(policy, realPolicy);
+
     UnreliableInterface unreliable = (UnreliableInterface)
-      RetryProxy.create(UnreliableInterface.class, unreliableImpl, TRY_ONCE_THEN_FAIL);
+      RetryProxy.create(UnreliableInterface.class, unreliableImpl, policy);
     unreliable.alwaysSucceeds();
     try {
       unreliable.failsOnceThenSucceeds();
       fail("Should fail");
     } catch (UnreliableException e) {
       // expected
+      verify(policy, times(1)).shouldRetry(any(Exception.class), anyInt(),
+          anyInt(), anyBoolean());
+      assertEquals(RetryDecision.FAIL, caughtRetryAction.action);
+      assertEquals("try once and fail.", caughtRetryAction.reason);
+    } catch (Exception e) {
+      fail("Other exception other than UnreliableException should also get " +
+          "failed.");
     }
   }
 
   /**
    * Test for {@link RetryInvocationHandler#isRpcInvocation(Object)}
    */
@@ -125,25 +175,48 @@ public void testRetryForeverWithFixedSleep() throws UnreliableException {
   }
 
   @Test
-  public void testRetryUpToMaximumCountWithFixedSleep() throws UnreliableException {
+  public void testRetryUpToMaximumCountWithFixedSleep() throws
+      Exception {
+
+    RetryPolicy policy = mock(RetryUpToMaximumCountWithFixedSleep.class);
+    int maxRetries = 8;
+    RetryPolicy realPolicy = retryUpToMaximumCountWithFixedSleep(maxRetries, 1,
+        TimeUnit.NANOSECONDS);
+    setupMockPolicy(policy, realPolicy);
+
     UnreliableInterface unreliable = (UnreliableInterface)
-      RetryProxy.create(UnreliableInterface.class, unreliableImpl,
-          retryUpToMaximumCountWithFixedSleep(8, 1, TimeUnit.NANOSECONDS));
+      RetryProxy.create(UnreliableInterface.class, unreliableImpl, policy);
+    // shouldRetry += 1
     unreliable.alwaysSucceeds();
+    // shouldRetry += 2
     unreliable.failsOnceThenSucceeds();
     try {
+      // shouldRetry += (maxRetries -1) (just failed once above)
       unreliable.failsTenTimesThenSucceeds();
       fail("Should fail");
     } catch (UnreliableException e) {
       // expected
+      verify(policy, times(maxRetries + 2)).shouldRetry(any(Exception.class),
+          anyInt(), anyInt(), anyBoolean());
+      assertEquals(RetryDecision.FAIL, caughtRetryAction.action);
+      assertEquals(RetryUpToMaximumCountWithFixedSleep.constructReasonString(
+          maxRetries), caughtRetryAction.reason);
+    } catch (Exception e) {
+      fail("Other exception other than UnreliableException should also get " +
+          "failed.");
     }
   }
 
   @Test
-  public void testRetryUpToMaximumTimeWithFixedSleep() throws UnreliableException {
+  public void testRetryUpToMaximumTimeWithFixedSleep() throws Exception {
+    RetryPolicy policy = mock(RetryUpToMaximumTimeWithFixedSleep.class);
+    long maxTime = 80L;
+    RetryPolicy realPolicy = retryUpToMaximumTimeWithFixedSleep(maxTime, 10,
+        TimeUnit.NANOSECONDS);
+    setupMockPolicy(policy, realPolicy);
+
     UnreliableInterface unreliable = (UnreliableInterface)
-      RetryProxy.create(UnreliableInterface.class, unreliableImpl,
-          retryUpToMaximumTimeWithFixedSleep(80, 10, TimeUnit.NANOSECONDS));
+      RetryProxy.create(UnreliableInterface.class, unreliableImpl, policy);
     unreliable.alwaysSucceeds();
     unreliable.failsOnceThenSucceeds();
     try {
@@ -151,9 +224,17 @@ public void testRetryUpToMaximumTimeWithFixedSleep() throws UnreliableException
       fail("Should fail");
     } catch (UnreliableException e) {
       // expected
+      verify(policy, times((int)(maxTime/10) + 2)).shouldRetry(any(Exception.class),
+          anyInt(), anyInt(), anyBoolean());
+      assertEquals(RetryDecision.FAIL, caughtRetryAction.action);
+      assertEquals(RetryUpToMaximumTimeWithFixedSleep.constructReasonString(
+          maxTime, TimeUnit.NANOSECONDS), caughtRetryAction.reason);
+    } catch (Exception e) {
+      fail("Other exception other than UnreliableException should also get " +
+          "failed.");
     }
   }
 
   @Test
   public void testRetryUpToMaximumCountWithProportionalSleep() throws UnreliableException {
     UnreliableInterface unreliable = (UnreliableInterface)