HADOOP-13227. AsyncCallHandler should use an event driven architecture to handle async calls.
This commit is contained in:
parent
bf74dbf80d
commit
d328e66706
@ -17,6 +17,7 @@
|
||||
*/
|
||||
package org.apache.hadoop.io.retry;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
@ -27,17 +28,21 @@
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.InterruptedIOException;
|
||||
import java.lang.reflect.Method;
|
||||
import java.util.LinkedList;
|
||||
import java.util.Iterator;
|
||||
import java.util.Queue;
|
||||
import java.util.concurrent.ConcurrentLinkedQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.TimeoutException;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
||||
/** Handle async calls. */
|
||||
@InterfaceAudience.Private
|
||||
public class AsyncCallHandler {
|
||||
static final Logger LOG = LoggerFactory.getLogger(AsyncCallHandler.class);
|
||||
public static final Logger LOG = LoggerFactory.getLogger(
|
||||
AsyncCallHandler.class);
|
||||
|
||||
private static final ThreadLocal<AsyncGet<?, Exception>>
|
||||
LOWER_LAYER_ASYNC_RETURN = new ThreadLocal<>();
|
||||
@ -73,35 +78,34 @@ private static AsyncGet<?, Exception> getLowerLayerAsyncReturn() {
|
||||
|
||||
/** A simple concurrent queue which keeping track the empty start time. */
|
||||
static class ConcurrentQueue<T> {
|
||||
private final Queue<T> queue = new LinkedList<>();
|
||||
private long emptyStartTime = Time.monotonicNow();
|
||||
private final Queue<T> queue = new ConcurrentLinkedQueue<>();
|
||||
private final AtomicLong emptyStartTime
|
||||
= new AtomicLong(Time.monotonicNow());
|
||||
|
||||
synchronized int size() {
|
||||
return queue.size();
|
||||
Iterator<T> iterator() {
|
||||
return queue.iterator();
|
||||
}
|
||||
|
||||
/** Is the queue empty for more than the given time in millisecond? */
|
||||
synchronized boolean isEmpty(long time) {
|
||||
return queue.isEmpty() && Time.monotonicNow() - emptyStartTime > time;
|
||||
boolean isEmpty(long time) {
|
||||
return Time.monotonicNow() - emptyStartTime.get() > time
|
||||
&& queue.isEmpty();
|
||||
}
|
||||
|
||||
synchronized void offer(T c) {
|
||||
void offer(T c) {
|
||||
final boolean added = queue.offer(c);
|
||||
Preconditions.checkState(added);
|
||||
}
|
||||
|
||||
synchronized T poll() {
|
||||
Preconditions.checkState(!queue.isEmpty());
|
||||
final T t = queue.poll();
|
||||
void checkEmpty() {
|
||||
if (queue.isEmpty()) {
|
||||
emptyStartTime = Time.monotonicNow();
|
||||
emptyStartTime.set(Time.monotonicNow());
|
||||
}
|
||||
return t;
|
||||
}
|
||||
}
|
||||
|
||||
/** A queue for handling async calls. */
|
||||
static class AsyncCallQueue {
|
||||
class AsyncCallQueue {
|
||||
private final ConcurrentQueue<AsyncCall> queue = new ConcurrentQueue<>();
|
||||
private final Processor processor = new Processor();
|
||||
|
||||
@ -113,20 +117,29 @@ void addCall(AsyncCall call) {
|
||||
processor.tryStart();
|
||||
}
|
||||
|
||||
void checkCalls() {
|
||||
final int size = queue.size();
|
||||
for (int i = 0; i < size; i++) {
|
||||
final AsyncCall c = queue.poll();
|
||||
if (!c.isDone()) {
|
||||
queue.offer(c); // the call is not done yet, add it back.
|
||||
long checkCalls() {
|
||||
final long startTime = Time.monotonicNow();
|
||||
long minWaitTime = Processor.MAX_WAIT_PERIOD;
|
||||
|
||||
for (final Iterator<AsyncCall> i = queue.iterator(); i.hasNext();) {
|
||||
final AsyncCall c = i.next();
|
||||
if (c.isDone()) {
|
||||
i.remove(); // the call is done, remove it from the queue.
|
||||
queue.checkEmpty();
|
||||
} else {
|
||||
final Long waitTime = c.getWaitTime(startTime);
|
||||
if (waitTime != null && waitTime > 0 && waitTime < minWaitTime) {
|
||||
minWaitTime = waitTime;
|
||||
}
|
||||
}
|
||||
}
|
||||
return minWaitTime;
|
||||
}
|
||||
|
||||
/** Process the async calls in the queue. */
|
||||
private class Processor {
|
||||
static final long GRACE_PERIOD = 10*1000L;
|
||||
static final long SLEEP_PERIOD = 100L;
|
||||
static final long GRACE_PERIOD = 3*1000L;
|
||||
static final long MAX_WAIT_PERIOD = 100L;
|
||||
|
||||
private final AtomicReference<Thread> running = new AtomicReference<>();
|
||||
|
||||
@ -141,15 +154,16 @@ void tryStart() {
|
||||
@Override
|
||||
public void run() {
|
||||
for (; isRunning(this);) {
|
||||
final long waitTime = checkCalls();
|
||||
tryStop(this);
|
||||
|
||||
try {
|
||||
Thread.sleep(SLEEP_PERIOD);
|
||||
synchronized (AsyncCallHandler.this) {
|
||||
AsyncCallHandler.this.wait(waitTime);
|
||||
}
|
||||
} catch (InterruptedException e) {
|
||||
kill(this);
|
||||
return;
|
||||
}
|
||||
|
||||
checkCalls();
|
||||
tryStop(this);
|
||||
}
|
||||
}
|
||||
};
|
||||
@ -215,10 +229,9 @@ static class AsyncCall extends RetryInvocationHandler.Call {
|
||||
private AsyncGet<?, Exception> lowerLayerAsyncGet;
|
||||
|
||||
AsyncCall(Method method, Object[] args, boolean isRpc, int callId,
|
||||
RetryInvocationHandler.Counters counters,
|
||||
RetryInvocationHandler<?> retryInvocationHandler,
|
||||
AsyncCallHandler asyncCallHandler) {
|
||||
super(method, args, isRpc, callId, counters, retryInvocationHandler);
|
||||
super(method, args, isRpc, callId, retryInvocationHandler);
|
||||
|
||||
this.asyncCallHandler = asyncCallHandler;
|
||||
}
|
||||
@ -226,6 +239,7 @@ static class AsyncCall extends RetryInvocationHandler.Call {
|
||||
/** @return true if the call is done; otherwise, return false. */
|
||||
boolean isDone() {
|
||||
final CallReturn r = invokeOnce();
|
||||
LOG.debug("#{}: {}", getCallId(), r.getState());
|
||||
switch (r.getState()) {
|
||||
case RETURNED:
|
||||
case EXCEPTION:
|
||||
@ -234,6 +248,7 @@ boolean isDone() {
|
||||
case RETRY:
|
||||
invokeOnce();
|
||||
break;
|
||||
case WAIT_RETRY:
|
||||
case ASYNC_CALL_IN_PROGRESS:
|
||||
case ASYNC_INVOKED:
|
||||
// nothing to do
|
||||
@ -244,13 +259,25 @@ boolean isDone() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
CallReturn processWaitTimeAndRetryInfo() {
|
||||
final Long waitTime = getWaitTime(Time.monotonicNow());
|
||||
LOG.trace("#{} processRetryInfo: waitTime={}", getCallId(), waitTime);
|
||||
if (waitTime != null && waitTime > 0) {
|
||||
return CallReturn.WAIT_RETRY;
|
||||
}
|
||||
processRetryInfo();
|
||||
return CallReturn.RETRY;
|
||||
}
|
||||
|
||||
@Override
|
||||
CallReturn invoke() throws Throwable {
|
||||
LOG.debug("{}.invoke {}", getClass().getSimpleName(), this);
|
||||
if (lowerLayerAsyncGet != null) {
|
||||
// async call was submitted early, check the lower level async call
|
||||
final boolean isDone = lowerLayerAsyncGet.isDone();
|
||||
LOG.trace("invoke: lowerLayerAsyncGet.isDone()? {}", isDone);
|
||||
LOG.trace("#{} invoke: lowerLayerAsyncGet.isDone()? {}",
|
||||
getCallId(), isDone);
|
||||
if (!isDone) {
|
||||
return CallReturn.ASYNC_CALL_IN_PROGRESS;
|
||||
}
|
||||
@ -262,7 +289,7 @@ CallReturn invoke() throws Throwable {
|
||||
}
|
||||
|
||||
// submit a new async call
|
||||
LOG.trace("invoke: ASYNC_INVOKED");
|
||||
LOG.trace("#{} invoke: ASYNC_INVOKED", getCallId());
|
||||
final boolean mode = Client.isAsynchronousMode();
|
||||
try {
|
||||
Client.setAsynchronousMode(true);
|
||||
@ -271,9 +298,9 @@ CallReturn invoke() throws Throwable {
|
||||
Preconditions.checkState(r == null);
|
||||
lowerLayerAsyncGet = getLowerLayerAsyncReturn();
|
||||
|
||||
if (counters.isZeros()) {
|
||||
if (getCounters().isZeros()) {
|
||||
// first async attempt, initialize
|
||||
LOG.trace("invoke: initAsyncCall");
|
||||
LOG.trace("#{} invoke: initAsyncCall", getCallId());
|
||||
asyncCallHandler.initAsyncCall(this, asyncCallReturn);
|
||||
}
|
||||
return CallReturn.ASYNC_INVOKED;
|
||||
@ -287,9 +314,9 @@ CallReturn invoke() throws Throwable {
|
||||
private volatile boolean hasSuccessfulCall = false;
|
||||
|
||||
AsyncCall newAsyncCall(Method method, Object[] args, boolean isRpc,
|
||||
int callId, RetryInvocationHandler.Counters counters,
|
||||
int callId,
|
||||
RetryInvocationHandler<?> retryInvocationHandler) {
|
||||
return new AsyncCall(method, args, isRpc, callId, counters,
|
||||
return new AsyncCall(method, args, isRpc, callId,
|
||||
retryInvocationHandler, this);
|
||||
}
|
||||
|
||||
@ -318,4 +345,9 @@ public boolean isDone() {
|
||||
};
|
||||
ASYNC_RETURN.set(asyncGet);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public static long getGracePeriod() {
|
||||
return AsyncCallQueue.Processor.GRACE_PERIOD;
|
||||
}
|
||||
}
|
||||
|
@ -29,6 +29,8 @@ enum State {
|
||||
EXCEPTION,
|
||||
/** Call should be retried according to the {@link RetryPolicy}. */
|
||||
RETRY,
|
||||
/** Call should wait and then retry according to the {@link RetryPolicy}. */
|
||||
WAIT_RETRY,
|
||||
/** Call, which is async, is still in progress. */
|
||||
ASYNC_CALL_IN_PROGRESS,
|
||||
/** Call, which is async, just has been invoked. */
|
||||
@ -39,6 +41,7 @@ enum State {
|
||||
State.ASYNC_CALL_IN_PROGRESS);
|
||||
static final CallReturn ASYNC_INVOKED = new CallReturn(State.ASYNC_INVOKED);
|
||||
static final CallReturn RETRY = new CallReturn(State.RETRY);
|
||||
static final CallReturn WAIT_RETRY = new CallReturn(State.WAIT_RETRY);
|
||||
|
||||
private final Object returnValue;
|
||||
private final Throwable thrown;
|
||||
|
@ -18,13 +18,14 @@
|
||||
package org.apache.hadoop.io.retry;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.io.retry.FailoverProxyProvider.ProxyInfo;
|
||||
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
|
||||
import org.apache.hadoop.ipc.*;
|
||||
import org.apache.hadoop.ipc.Client.ConnectionId;
|
||||
import org.apache.hadoop.util.Time;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.io.InterruptedIOException;
|
||||
@ -41,33 +42,51 @@
|
||||
*/
|
||||
@InterfaceAudience.Private
|
||||
public class RetryInvocationHandler<T> implements RpcInvocationHandler {
|
||||
public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class);
|
||||
public static final Logger LOG = LoggerFactory.getLogger(
|
||||
RetryInvocationHandler.class);
|
||||
|
||||
static class Call {
|
||||
private final Method method;
|
||||
private final Object[] args;
|
||||
private final boolean isRpc;
|
||||
private final int callId;
|
||||
final Counters counters;
|
||||
private final Counters counters = new Counters();
|
||||
|
||||
private final RetryPolicy retryPolicy;
|
||||
private final RetryInvocationHandler<?> retryInvocationHandler;
|
||||
|
||||
private RetryInfo retryInfo;
|
||||
|
||||
Call(Method method, Object[] args, boolean isRpc, int callId,
|
||||
Counters counters, RetryInvocationHandler<?> retryInvocationHandler) {
|
||||
RetryInvocationHandler<?> retryInvocationHandler) {
|
||||
this.method = method;
|
||||
this.args = args;
|
||||
this.isRpc = isRpc;
|
||||
this.callId = callId;
|
||||
this.counters = counters;
|
||||
|
||||
this.retryPolicy = retryInvocationHandler.getRetryPolicy(method);
|
||||
this.retryInvocationHandler = retryInvocationHandler;
|
||||
}
|
||||
|
||||
int getCallId() {
|
||||
return callId;
|
||||
}
|
||||
|
||||
Counters getCounters() {
|
||||
return counters;
|
||||
}
|
||||
|
||||
synchronized Long getWaitTime(final long now) {
|
||||
return retryInfo == null? null: retryInfo.retryTime - now;
|
||||
}
|
||||
|
||||
/** Invoke the call once without retrying. */
|
||||
synchronized CallReturn invokeOnce() {
|
||||
try {
|
||||
if (retryInfo != null) {
|
||||
return processWaitTimeAndRetryInfo();
|
||||
}
|
||||
|
||||
// The number of times this invocation handler has ever been failed over
|
||||
// before this method invocation attempt. Used to prevent concurrent
|
||||
// failed method invocations from triggering multiple failover attempts.
|
||||
@ -76,28 +95,70 @@ synchronized CallReturn invokeOnce() {
|
||||
return invoke();
|
||||
} catch (Exception e) {
|
||||
if (LOG.isTraceEnabled()) {
|
||||
LOG.trace(this, e);
|
||||
LOG.trace(toString(), e);
|
||||
}
|
||||
if (Thread.currentThread().isInterrupted()) {
|
||||
// If interrupted, do not retry.
|
||||
throw e;
|
||||
}
|
||||
retryInvocationHandler.handleException(
|
||||
method, retryPolicy, failoverCount, counters, e);
|
||||
return CallReturn.RETRY;
|
||||
|
||||
retryInfo = retryInvocationHandler.handleException(
|
||||
method, callId, retryPolicy, counters, failoverCount, e);
|
||||
return processWaitTimeAndRetryInfo();
|
||||
}
|
||||
} catch(Throwable t) {
|
||||
return new CallReturn(t);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* It first processes the wait time, if there is any,
|
||||
* and then invokes {@link #processRetryInfo()}.
|
||||
*
|
||||
* If the wait time is positive, it either sleeps for synchronous calls
|
||||
* or immediately returns for asynchronous calls.
|
||||
*
|
||||
* @return {@link CallReturn#RETRY} if the retryInfo is processed;
|
||||
* otherwise, return {@link CallReturn#WAIT_RETRY}.
|
||||
*/
|
||||
CallReturn processWaitTimeAndRetryInfo() throws InterruptedIOException {
|
||||
final Long waitTime = getWaitTime(Time.monotonicNow());
|
||||
LOG.trace("#{} processRetryInfo: retryInfo={}, waitTime={}",
|
||||
callId, retryInfo, waitTime);
|
||||
if (waitTime != null && waitTime > 0) {
|
||||
try {
|
||||
Thread.sleep(retryInfo.delay);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
LOG.warn("Interrupted while waiting to retry", e);
|
||||
InterruptedIOException intIOE = new InterruptedIOException(
|
||||
"Retry interrupted");
|
||||
intIOE.initCause(e);
|
||||
throw intIOE;
|
||||
}
|
||||
}
|
||||
processRetryInfo();
|
||||
return CallReturn.RETRY;
|
||||
}
|
||||
|
||||
synchronized void processRetryInfo() {
|
||||
counters.retries++;
|
||||
if (retryInfo.isFailover()) {
|
||||
retryInvocationHandler.proxyDescriptor.failover(
|
||||
retryInfo.expectedFailoverCount, method, callId);
|
||||
counters.failovers++;
|
||||
}
|
||||
retryInfo = null;
|
||||
}
|
||||
|
||||
CallReturn invoke() throws Throwable {
|
||||
return new CallReturn(invokeMethod());
|
||||
}
|
||||
|
||||
Object invokeMethod() throws Throwable {
|
||||
if (isRpc) {
|
||||
Client.setCallIdAndRetryCount(callId, counters.retries);
|
||||
Client.setCallIdAndRetryCount(callId, counters.retries,
|
||||
retryInvocationHandler.asyncCallHandler);
|
||||
}
|
||||
return retryInvocationHandler.invokeMethod(method, args);
|
||||
}
|
||||
@ -146,15 +207,16 @@ synchronized long getFailoverCount() {
|
||||
return failoverCount;
|
||||
}
|
||||
|
||||
synchronized void failover(long expectedFailoverCount, Method method) {
|
||||
synchronized void failover(long expectedFailoverCount, Method method,
|
||||
int callId) {
|
||||
// Make sure that concurrent failed invocations only cause a single
|
||||
// actual failover.
|
||||
if (failoverCount == expectedFailoverCount) {
|
||||
fpp.performFailover(proxyInfo.proxy);
|
||||
failoverCount++;
|
||||
} else {
|
||||
LOG.warn("A failover has occurred since the start of "
|
||||
+ proxyInfo.getString(method.getName()));
|
||||
LOG.warn("A failover has occurred since the start of call #" + callId
|
||||
+ " " + proxyInfo.getString(method.getName()));
|
||||
}
|
||||
proxyInfo = fpp.getProxy();
|
||||
}
|
||||
@ -172,22 +234,33 @@ void close() throws IOException {
|
||||
}
|
||||
|
||||
private static class RetryInfo {
|
||||
private final long retryTime;
|
||||
private final long delay;
|
||||
private final RetryAction failover;
|
||||
private final RetryAction fail;
|
||||
private final RetryAction action;
|
||||
private final long expectedFailoverCount;
|
||||
|
||||
RetryInfo(long delay, RetryAction failover, RetryAction fail) {
|
||||
RetryInfo(long delay, RetryAction action, long expectedFailoverCount) {
|
||||
this.delay = delay;
|
||||
this.failover = failover;
|
||||
this.fail = fail;
|
||||
this.retryTime = Time.monotonicNow() + delay;
|
||||
this.action = action;
|
||||
this.expectedFailoverCount = expectedFailoverCount;
|
||||
}
|
||||
|
||||
boolean isFailover() {
|
||||
return action != null
|
||||
&& action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY;
|
||||
}
|
||||
|
||||
boolean isFail() {
|
||||
return action != null
|
||||
&& action.action == RetryAction.RetryDecision.FAIL;
|
||||
}
|
||||
|
||||
static RetryInfo newRetryInfo(RetryPolicy policy, Exception e,
|
||||
Counters counters, boolean idempotentOrAtMostOnce) throws Exception {
|
||||
Counters counters, boolean idempotentOrAtMostOnce,
|
||||
long expectedFailoverCount) throws Exception {
|
||||
RetryAction max = null;
|
||||
long maxRetryDelay = 0;
|
||||
RetryAction failover = null;
|
||||
RetryAction retry = null;
|
||||
RetryAction fail = null;
|
||||
|
||||
final Iterable<Exception> exceptions = e instanceof MultiException ?
|
||||
((MultiException) e).getExceptions().values()
|
||||
@ -195,23 +268,19 @@ static RetryInfo newRetryInfo(RetryPolicy policy, Exception e,
|
||||
for (Exception exception : exceptions) {
|
||||
final RetryAction a = policy.shouldRetry(exception,
|
||||
counters.retries, counters.failovers, idempotentOrAtMostOnce);
|
||||
if (a.action == RetryAction.RetryDecision.FAIL) {
|
||||
fail = a;
|
||||
} else {
|
||||
if (a.action != RetryAction.RetryDecision.FAIL) {
|
||||
// must be a retry or failover
|
||||
if (a.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY) {
|
||||
failover = a;
|
||||
} else {
|
||||
retry = a;
|
||||
}
|
||||
if (a.delayMillis > maxRetryDelay) {
|
||||
maxRetryDelay = a.delayMillis;
|
||||
}
|
||||
}
|
||||
|
||||
if (max == null || max.action.compareTo(a.action) < 0) {
|
||||
max = a;
|
||||
}
|
||||
}
|
||||
|
||||
return new RetryInfo(maxRetryDelay, failover,
|
||||
failover == null && retry == null? fail: null);
|
||||
return new RetryInfo(maxRetryDelay, max, expectedFailoverCount);
|
||||
}
|
||||
}
|
||||
|
||||
@ -246,13 +315,12 @@ private long getFailoverCount() {
|
||||
return proxyDescriptor.getFailoverCount();
|
||||
}
|
||||
|
||||
private Call newCall(Method method, Object[] args, boolean isRpc, int callId,
|
||||
Counters counters) {
|
||||
private Call newCall(Method method, Object[] args, boolean isRpc,
|
||||
int callId) {
|
||||
if (Client.isAsynchronousMode()) {
|
||||
return asyncCallHandler.newAsyncCall(method, args, isRpc, callId,
|
||||
counters, this);
|
||||
return asyncCallHandler.newAsyncCall(method, args, isRpc, callId, this);
|
||||
} else {
|
||||
return new Call(method, args, isRpc, callId, counters, this);
|
||||
return new Call(method, args, isRpc, callId, this);
|
||||
}
|
||||
}
|
||||
|
||||
@ -261,9 +329,8 @@ public Object invoke(Object proxy, Method method, Object[] args)
|
||||
throws Throwable {
|
||||
final boolean isRpc = isRpcInvocation(proxyDescriptor.getProxy());
|
||||
final int callId = isRpc? Client.nextCallId(): RpcConstants.INVALID_CALL_ID;
|
||||
final Counters counters = new Counters();
|
||||
|
||||
final Call call = newCall(method, args, isRpc, callId, counters);
|
||||
final Call call = newCall(method, args, isRpc, callId);
|
||||
while (true) {
|
||||
final CallReturn c = call.invokeOnce();
|
||||
final CallReturn.State state = c.getState();
|
||||
@ -275,45 +342,24 @@ public Object invoke(Object proxy, Method method, Object[] args)
|
||||
}
|
||||
}
|
||||
|
||||
private void handleException(final Method method, final RetryPolicy policy,
|
||||
final long expectedFailoverCount, final Counters counters,
|
||||
final Exception ex) throws Exception {
|
||||
final RetryInfo retryInfo = RetryInfo.newRetryInfo(policy, ex, counters,
|
||||
proxyDescriptor.idempotentOrAtMostOnce(method));
|
||||
counters.retries++;
|
||||
|
||||
if (retryInfo.fail != null) {
|
||||
private RetryInfo handleException(final Method method, final int callId,
|
||||
final RetryPolicy policy, final Counters counters,
|
||||
final long expectFailoverCount, final Exception e) throws Exception {
|
||||
final RetryInfo retryInfo = RetryInfo.newRetryInfo(policy, e,
|
||||
counters, proxyDescriptor.idempotentOrAtMostOnce(method),
|
||||
expectFailoverCount);
|
||||
if (retryInfo.isFail()) {
|
||||
// fail.
|
||||
if (retryInfo.fail.reason != null) {
|
||||
LOG.warn("Exception while invoking "
|
||||
if (retryInfo.action.reason != null) {
|
||||
LOG.warn("Exception while invoking call #" + callId + " "
|
||||
+ proxyDescriptor.getProxyInfo().getString(method.getName())
|
||||
+ ". Not retrying because " + retryInfo.fail.reason, ex);
|
||||
+ ". Not retrying because " + retryInfo.action.reason, e);
|
||||
}
|
||||
throw ex;
|
||||
throw e;
|
||||
}
|
||||
|
||||
// retry
|
||||
final boolean isFailover = retryInfo.failover != null;
|
||||
|
||||
log(method, isFailover, counters.failovers, retryInfo.delay, ex);
|
||||
|
||||
if (retryInfo.delay > 0) {
|
||||
try {
|
||||
Thread.sleep(retryInfo.delay);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
LOG.warn("Interrupted while waiting to retry", e);
|
||||
InterruptedIOException intIOE = new InterruptedIOException(
|
||||
"Retry interrupted");
|
||||
intIOE.initCause(e);
|
||||
throw intIOE;
|
||||
}
|
||||
}
|
||||
|
||||
if (isFailover) {
|
||||
proxyDescriptor.failover(expectedFailoverCount, method);
|
||||
counters.failovers++;
|
||||
}
|
||||
log(method, retryInfo.isFailover(), counters.failovers, retryInfo.delay, e);
|
||||
return retryInfo;
|
||||
}
|
||||
|
||||
private void log(final Method method, final boolean isFailover,
|
||||
|
@ -67,6 +67,7 @@ public String toString() {
|
||||
}
|
||||
|
||||
public enum RetryDecision {
|
||||
// Ordering: FAIL < RETRY < FAILOVER_AND_RETRY.
|
||||
FAIL,
|
||||
RETRY,
|
||||
FAILOVER_AND_RETRY
|
||||
|
@ -93,6 +93,8 @@ public class Client implements AutoCloseable {
|
||||
|
||||
private static final ThreadLocal<Integer> callId = new ThreadLocal<Integer>();
|
||||
private static final ThreadLocal<Integer> retryCount = new ThreadLocal<Integer>();
|
||||
private static final ThreadLocal<Object> EXTERNAL_CALL_HANDLER
|
||||
= new ThreadLocal<>();
|
||||
private static final ThreadLocal<AsyncGet<? extends Writable, IOException>>
|
||||
ASYNC_RPC_RESPONSE = new ThreadLocal<>();
|
||||
private static final ThreadLocal<Boolean> asynchronousMode =
|
||||
@ -111,13 +113,15 @@ protected Boolean initialValue() {
|
||||
}
|
||||
|
||||
/** Set call id and retry count for the next call. */
|
||||
public static void setCallIdAndRetryCount(int cid, int rc) {
|
||||
public static void setCallIdAndRetryCount(int cid, int rc,
|
||||
Object externalHandler) {
|
||||
Preconditions.checkArgument(cid != RpcConstants.INVALID_CALL_ID);
|
||||
Preconditions.checkState(callId.get() == null);
|
||||
Preconditions.checkArgument(rc != RpcConstants.INVALID_RETRY_COUNT);
|
||||
|
||||
callId.set(cid);
|
||||
retryCount.set(rc);
|
||||
EXTERNAL_CALL_HANDLER.set(externalHandler);
|
||||
}
|
||||
|
||||
private ConcurrentMap<ConnectionId, Connection> connections =
|
||||
@ -333,6 +337,7 @@ static class Call {
|
||||
IOException error; // exception, null if success
|
||||
final RPC.RpcKind rpcKind; // Rpc EngineKind
|
||||
boolean done; // true when call is done
|
||||
private final Object externalHandler;
|
||||
|
||||
private Call(RPC.RpcKind rpcKind, Writable param) {
|
||||
this.rpcKind = rpcKind;
|
||||
@ -352,6 +357,8 @@ private Call(RPC.RpcKind rpcKind, Writable param) {
|
||||
} else {
|
||||
this.retry = rc;
|
||||
}
|
||||
|
||||
this.externalHandler = EXTERNAL_CALL_HANDLER.get();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -364,6 +371,12 @@ public String toString() {
|
||||
protected synchronized void callComplete() {
|
||||
this.done = true;
|
||||
notify(); // notify caller
|
||||
|
||||
if (externalHandler != null) {
|
||||
synchronized (externalHandler) {
|
||||
externalHandler.notify();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/** Set the exception when there is an error.
|
||||
|
@ -21,6 +21,7 @@
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.ipc.RetriableException;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Rule;
|
||||
import org.junit.Test;
|
||||
import org.junit.rules.Timeout;
|
||||
@ -35,7 +36,18 @@
|
||||
*/
|
||||
public class TestDefaultRetryPolicy {
|
||||
@Rule
|
||||
public Timeout timeout = new Timeout(300000);
|
||||
public Timeout timeout = new Timeout(30000);
|
||||
|
||||
/** Verify FAIL < RETRY < FAILOVER_AND_RETRY. */
|
||||
@Test
|
||||
public void testRetryDecisionOrdering() throws Exception {
|
||||
Assert.assertTrue(RetryPolicy.RetryAction.RetryDecision.FAIL.compareTo(
|
||||
RetryPolicy.RetryAction.RetryDecision.RETRY) < 0);
|
||||
Assert.assertTrue(RetryPolicy.RetryAction.RetryDecision.RETRY.compareTo(
|
||||
RetryPolicy.RetryAction.RetryDecision.FAILOVER_AND_RETRY) < 0);
|
||||
Assert.assertTrue(RetryPolicy.RetryAction.RetryDecision.FAIL.compareTo(
|
||||
RetryPolicy.RetryAction.RetryDecision.FAILOVER_AND_RETRY) < 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that the default retry policy correctly retries
|
||||
|
@ -367,7 +367,7 @@ public void testCallIdAndRetry() throws IOException, InterruptedException,
|
||||
Call createCall(RpcKind rpcKind, Writable rpcRequest) {
|
||||
// Set different call id and retry count for the next call
|
||||
Client.setCallIdAndRetryCount(Client.nextCallId(),
|
||||
TestIPC.RANDOM.nextInt(255));
|
||||
TestIPC.RANDOM.nextInt(255), null);
|
||||
|
||||
final Call call = super.createCall(rpcKind, rpcRequest);
|
||||
|
||||
@ -421,7 +421,7 @@ public void testCallRetryCount() throws IOException, InterruptedException,
|
||||
final int retryCount = 255;
|
||||
// Override client to store the call id
|
||||
final Client client = new Client(LongWritable.class, conf);
|
||||
Client.setCallIdAndRetryCount(Client.nextCallId(), retryCount);
|
||||
Client.setCallIdAndRetryCount(Client.nextCallId(), retryCount, null);
|
||||
|
||||
// Attach a listener that tracks every call ID received by the server.
|
||||
final TestServer server = new TestIPC.TestServer(1, false, conf);
|
||||
|
@ -1172,7 +1172,7 @@ public void run() {
|
||||
retryProxy.dummyRun();
|
||||
Assert.assertEquals(TestInvocationHandler.retry, totalRetry + 1);
|
||||
} finally {
|
||||
Client.setCallIdAndRetryCount(0, 0);
|
||||
Client.setCallIdAndRetryCount(0, 0, null);
|
||||
client.stop();
|
||||
server.stop();
|
||||
}
|
||||
@ -1205,7 +1205,7 @@ public void testNoRetryOnInvalidToken() throws IOException {
|
||||
} finally {
|
||||
// Check if dummyRun called only once
|
||||
Assert.assertEquals(handler.invocations, 1);
|
||||
Client.setCallIdAndRetryCount(0, 0);
|
||||
Client.setCallIdAndRetryCount(0, 0, null);
|
||||
client.stop();
|
||||
server.stop();
|
||||
}
|
||||
@ -1250,7 +1250,7 @@ public void testCallRetryCount() throws IOException {
|
||||
final int retryCount = 255;
|
||||
// Override client to store the call id
|
||||
final Client client = new Client(LongWritable.class, conf);
|
||||
Client.setCallIdAndRetryCount(Client.nextCallId(), 255);
|
||||
Client.setCallIdAndRetryCount(Client.nextCallId(), 255, null);
|
||||
|
||||
// Attach a listener that tracks every call ID received by the server.
|
||||
final TestServer server = new TestServer(1, false);
|
||||
|
Loading…
Reference in New Issue
Block a user