diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java index 5a03b034c3..69e1233e16 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/AsyncCallHandler.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.io.retry; +import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -27,17 +28,21 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.io.InterruptedIOException; import java.lang.reflect.Method; -import java.util.LinkedList; +import java.util.Iterator; import java.util.Queue; +import java.util.concurrent.ConcurrentLinkedQueue; import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeoutException; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; /** Handle async calls. */ @InterfaceAudience.Private public class AsyncCallHandler { - static final Logger LOG = LoggerFactory.getLogger(AsyncCallHandler.class); + public static final Logger LOG = LoggerFactory.getLogger( + AsyncCallHandler.class); private static final ThreadLocal> LOWER_LAYER_ASYNC_RETURN = new ThreadLocal<>(); @@ -73,35 +78,34 @@ private static AsyncGet getLowerLayerAsyncReturn() { /** A simple concurrent queue which keeping track the empty start time. */ static class ConcurrentQueue { - private final Queue queue = new LinkedList<>(); - private long emptyStartTime = Time.monotonicNow(); + private final Queue queue = new ConcurrentLinkedQueue<>(); + private final AtomicLong emptyStartTime + = new AtomicLong(Time.monotonicNow()); - synchronized int size() { - return queue.size(); + Iterator iterator() { + return queue.iterator(); } /** Is the queue empty for more than the given time in millisecond? */ - synchronized boolean isEmpty(long time) { - return queue.isEmpty() && Time.monotonicNow() - emptyStartTime > time; + boolean isEmpty(long time) { + return Time.monotonicNow() - emptyStartTime.get() > time + && queue.isEmpty(); } - synchronized void offer(T c) { + void offer(T c) { final boolean added = queue.offer(c); Preconditions.checkState(added); } - synchronized T poll() { - Preconditions.checkState(!queue.isEmpty()); - final T t = queue.poll(); + void checkEmpty() { if (queue.isEmpty()) { - emptyStartTime = Time.monotonicNow(); + emptyStartTime.set(Time.monotonicNow()); } - return t; } } /** A queue for handling async calls. */ - static class AsyncCallQueue { + class AsyncCallQueue { private final ConcurrentQueue queue = new ConcurrentQueue<>(); private final Processor processor = new Processor(); @@ -113,20 +117,29 @@ void addCall(AsyncCall call) { processor.tryStart(); } - void checkCalls() { - final int size = queue.size(); - for (int i = 0; i < size; i++) { - final AsyncCall c = queue.poll(); - if (!c.isDone()) { - queue.offer(c); // the call is not done yet, add it back. + long checkCalls() { + final long startTime = Time.monotonicNow(); + long minWaitTime = Processor.MAX_WAIT_PERIOD; + + for (final Iterator i = queue.iterator(); i.hasNext();) { + final AsyncCall c = i.next(); + if (c.isDone()) { + i.remove(); // the call is done, remove it from the queue. + queue.checkEmpty(); + } else { + final Long waitTime = c.getWaitTime(startTime); + if (waitTime != null && waitTime > 0 && waitTime < minWaitTime) { + minWaitTime = waitTime; + } } } + return minWaitTime; } /** Process the async calls in the queue. */ private class Processor { - static final long GRACE_PERIOD = 10*1000L; - static final long SLEEP_PERIOD = 100L; + static final long GRACE_PERIOD = 3*1000L; + static final long MAX_WAIT_PERIOD = 100L; private final AtomicReference running = new AtomicReference<>(); @@ -141,15 +154,16 @@ void tryStart() { @Override public void run() { for (; isRunning(this);) { + final long waitTime = checkCalls(); + tryStop(this); + try { - Thread.sleep(SLEEP_PERIOD); + synchronized (AsyncCallHandler.this) { + AsyncCallHandler.this.wait(waitTime); + } } catch (InterruptedException e) { kill(this); - return; } - - checkCalls(); - tryStop(this); } } }; @@ -215,10 +229,9 @@ static class AsyncCall extends RetryInvocationHandler.Call { private AsyncGet lowerLayerAsyncGet; AsyncCall(Method method, Object[] args, boolean isRpc, int callId, - RetryInvocationHandler.Counters counters, RetryInvocationHandler retryInvocationHandler, AsyncCallHandler asyncCallHandler) { - super(method, args, isRpc, callId, counters, retryInvocationHandler); + super(method, args, isRpc, callId, retryInvocationHandler); this.asyncCallHandler = asyncCallHandler; } @@ -226,6 +239,7 @@ static class AsyncCall extends RetryInvocationHandler.Call { /** @return true if the call is done; otherwise, return false. */ boolean isDone() { final CallReturn r = invokeOnce(); + LOG.debug("#{}: {}", getCallId(), r.getState()); switch (r.getState()) { case RETURNED: case EXCEPTION: @@ -234,6 +248,7 @@ boolean isDone() { case RETRY: invokeOnce(); break; + case WAIT_RETRY: case ASYNC_CALL_IN_PROGRESS: case ASYNC_INVOKED: // nothing to do @@ -244,13 +259,25 @@ boolean isDone() { return false; } + @Override + CallReturn processWaitTimeAndRetryInfo() { + final Long waitTime = getWaitTime(Time.monotonicNow()); + LOG.trace("#{} processRetryInfo: waitTime={}", getCallId(), waitTime); + if (waitTime != null && waitTime > 0) { + return CallReturn.WAIT_RETRY; + } + processRetryInfo(); + return CallReturn.RETRY; + } + @Override CallReturn invoke() throws Throwable { LOG.debug("{}.invoke {}", getClass().getSimpleName(), this); if (lowerLayerAsyncGet != null) { // async call was submitted early, check the lower level async call final boolean isDone = lowerLayerAsyncGet.isDone(); - LOG.trace("invoke: lowerLayerAsyncGet.isDone()? {}", isDone); + LOG.trace("#{} invoke: lowerLayerAsyncGet.isDone()? {}", + getCallId(), isDone); if (!isDone) { return CallReturn.ASYNC_CALL_IN_PROGRESS; } @@ -262,7 +289,7 @@ CallReturn invoke() throws Throwable { } // submit a new async call - LOG.trace("invoke: ASYNC_INVOKED"); + LOG.trace("#{} invoke: ASYNC_INVOKED", getCallId()); final boolean mode = Client.isAsynchronousMode(); try { Client.setAsynchronousMode(true); @@ -271,9 +298,9 @@ CallReturn invoke() throws Throwable { Preconditions.checkState(r == null); lowerLayerAsyncGet = getLowerLayerAsyncReturn(); - if (counters.isZeros()) { + if (getCounters().isZeros()) { // first async attempt, initialize - LOG.trace("invoke: initAsyncCall"); + LOG.trace("#{} invoke: initAsyncCall", getCallId()); asyncCallHandler.initAsyncCall(this, asyncCallReturn); } return CallReturn.ASYNC_INVOKED; @@ -287,9 +314,9 @@ CallReturn invoke() throws Throwable { private volatile boolean hasSuccessfulCall = false; AsyncCall newAsyncCall(Method method, Object[] args, boolean isRpc, - int callId, RetryInvocationHandler.Counters counters, + int callId, RetryInvocationHandler retryInvocationHandler) { - return new AsyncCall(method, args, isRpc, callId, counters, + return new AsyncCall(method, args, isRpc, callId, retryInvocationHandler, this); } @@ -318,4 +345,9 @@ public boolean isDone() { }; ASYNC_RETURN.set(asyncGet); } + + @VisibleForTesting + public static long getGracePeriod() { + return AsyncCallQueue.Processor.GRACE_PERIOD; + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java index 943725c64f..022b78507f 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/CallReturn.java @@ -29,6 +29,8 @@ enum State { EXCEPTION, /** Call should be retried according to the {@link RetryPolicy}. */ RETRY, + /** Call should wait and then retry according to the {@link RetryPolicy}. */ + WAIT_RETRY, /** Call, which is async, is still in progress. */ ASYNC_CALL_IN_PROGRESS, /** Call, which is async, just has been invoked. */ @@ -39,6 +41,7 @@ enum State { State.ASYNC_CALL_IN_PROGRESS); static final CallReturn ASYNC_INVOKED = new CallReturn(State.ASYNC_INVOKED); static final CallReturn RETRY = new CallReturn(State.RETRY); + static final CallReturn WAIT_RETRY = new CallReturn(State.WAIT_RETRY); private final Object returnValue; private final Throwable thrown; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java index 5198c0d27a..7bd3a15c4b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java @@ -18,13 +18,14 @@ package org.apache.hadoop.io.retry; import com.google.common.annotations.VisibleForTesting; -import org.apache.commons.logging.Log; -import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.io.retry.FailoverProxyProvider.ProxyInfo; import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; import org.apache.hadoop.ipc.*; import org.apache.hadoop.ipc.Client.ConnectionId; +import org.apache.hadoop.util.Time; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.io.InterruptedIOException; @@ -41,33 +42,51 @@ */ @InterfaceAudience.Private public class RetryInvocationHandler implements RpcInvocationHandler { - public static final Log LOG = LogFactory.getLog(RetryInvocationHandler.class); + public static final Logger LOG = LoggerFactory.getLogger( + RetryInvocationHandler.class); static class Call { private final Method method; private final Object[] args; private final boolean isRpc; private final int callId; - final Counters counters; + private final Counters counters = new Counters(); private final RetryPolicy retryPolicy; private final RetryInvocationHandler retryInvocationHandler; + private RetryInfo retryInfo; + Call(Method method, Object[] args, boolean isRpc, int callId, - Counters counters, RetryInvocationHandler retryInvocationHandler) { + RetryInvocationHandler retryInvocationHandler) { this.method = method; this.args = args; this.isRpc = isRpc; this.callId = callId; - this.counters = counters; this.retryPolicy = retryInvocationHandler.getRetryPolicy(method); this.retryInvocationHandler = retryInvocationHandler; } + int getCallId() { + return callId; + } + + Counters getCounters() { + return counters; + } + + synchronized Long getWaitTime(final long now) { + return retryInfo == null? null: retryInfo.retryTime - now; + } + /** Invoke the call once without retrying. */ synchronized CallReturn invokeOnce() { try { + if (retryInfo != null) { + return processWaitTimeAndRetryInfo(); + } + // The number of times this invocation handler has ever been failed over // before this method invocation attempt. Used to prevent concurrent // failed method invocations from triggering multiple failover attempts. @@ -76,28 +95,70 @@ synchronized CallReturn invokeOnce() { return invoke(); } catch (Exception e) { if (LOG.isTraceEnabled()) { - LOG.trace(this, e); + LOG.trace(toString(), e); } if (Thread.currentThread().isInterrupted()) { // If interrupted, do not retry. throw e; } - retryInvocationHandler.handleException( - method, retryPolicy, failoverCount, counters, e); - return CallReturn.RETRY; + + retryInfo = retryInvocationHandler.handleException( + method, callId, retryPolicy, counters, failoverCount, e); + return processWaitTimeAndRetryInfo(); } } catch(Throwable t) { return new CallReturn(t); } } + /** + * It first processes the wait time, if there is any, + * and then invokes {@link #processRetryInfo()}. + * + * If the wait time is positive, it either sleeps for synchronous calls + * or immediately returns for asynchronous calls. + * + * @return {@link CallReturn#RETRY} if the retryInfo is processed; + * otherwise, return {@link CallReturn#WAIT_RETRY}. + */ + CallReturn processWaitTimeAndRetryInfo() throws InterruptedIOException { + final Long waitTime = getWaitTime(Time.monotonicNow()); + LOG.trace("#{} processRetryInfo: retryInfo={}, waitTime={}", + callId, retryInfo, waitTime); + if (waitTime != null && waitTime > 0) { + try { + Thread.sleep(retryInfo.delay); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("Interrupted while waiting to retry", e); + InterruptedIOException intIOE = new InterruptedIOException( + "Retry interrupted"); + intIOE.initCause(e); + throw intIOE; + } + } + processRetryInfo(); + return CallReturn.RETRY; + } + + synchronized void processRetryInfo() { + counters.retries++; + if (retryInfo.isFailover()) { + retryInvocationHandler.proxyDescriptor.failover( + retryInfo.expectedFailoverCount, method, callId); + counters.failovers++; + } + retryInfo = null; + } + CallReturn invoke() throws Throwable { return new CallReturn(invokeMethod()); } Object invokeMethod() throws Throwable { if (isRpc) { - Client.setCallIdAndRetryCount(callId, counters.retries); + Client.setCallIdAndRetryCount(callId, counters.retries, + retryInvocationHandler.asyncCallHandler); } return retryInvocationHandler.invokeMethod(method, args); } @@ -146,15 +207,16 @@ synchronized long getFailoverCount() { return failoverCount; } - synchronized void failover(long expectedFailoverCount, Method method) { + synchronized void failover(long expectedFailoverCount, Method method, + int callId) { // Make sure that concurrent failed invocations only cause a single // actual failover. if (failoverCount == expectedFailoverCount) { fpp.performFailover(proxyInfo.proxy); failoverCount++; } else { - LOG.warn("A failover has occurred since the start of " - + proxyInfo.getString(method.getName())); + LOG.warn("A failover has occurred since the start of call #" + callId + + " " + proxyInfo.getString(method.getName())); } proxyInfo = fpp.getProxy(); } @@ -172,22 +234,33 @@ void close() throws IOException { } private static class RetryInfo { + private final long retryTime; private final long delay; - private final RetryAction failover; - private final RetryAction fail; + private final RetryAction action; + private final long expectedFailoverCount; - RetryInfo(long delay, RetryAction failover, RetryAction fail) { + RetryInfo(long delay, RetryAction action, long expectedFailoverCount) { this.delay = delay; - this.failover = failover; - this.fail = fail; + this.retryTime = Time.monotonicNow() + delay; + this.action = action; + this.expectedFailoverCount = expectedFailoverCount; + } + + boolean isFailover() { + return action != null + && action.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY; + } + + boolean isFail() { + return action != null + && action.action == RetryAction.RetryDecision.FAIL; } static RetryInfo newRetryInfo(RetryPolicy policy, Exception e, - Counters counters, boolean idempotentOrAtMostOnce) throws Exception { + Counters counters, boolean idempotentOrAtMostOnce, + long expectedFailoverCount) throws Exception { + RetryAction max = null; long maxRetryDelay = 0; - RetryAction failover = null; - RetryAction retry = null; - RetryAction fail = null; final Iterable exceptions = e instanceof MultiException ? ((MultiException) e).getExceptions().values() @@ -195,23 +268,19 @@ static RetryInfo newRetryInfo(RetryPolicy policy, Exception e, for (Exception exception : exceptions) { final RetryAction a = policy.shouldRetry(exception, counters.retries, counters.failovers, idempotentOrAtMostOnce); - if (a.action == RetryAction.RetryDecision.FAIL) { - fail = a; - } else { + if (a.action != RetryAction.RetryDecision.FAIL) { // must be a retry or failover - if (a.action == RetryAction.RetryDecision.FAILOVER_AND_RETRY) { - failover = a; - } else { - retry = a; - } if (a.delayMillis > maxRetryDelay) { maxRetryDelay = a.delayMillis; } } + + if (max == null || max.action.compareTo(a.action) < 0) { + max = a; + } } - return new RetryInfo(maxRetryDelay, failover, - failover == null && retry == null? fail: null); + return new RetryInfo(maxRetryDelay, max, expectedFailoverCount); } } @@ -246,13 +315,12 @@ private long getFailoverCount() { return proxyDescriptor.getFailoverCount(); } - private Call newCall(Method method, Object[] args, boolean isRpc, int callId, - Counters counters) { + private Call newCall(Method method, Object[] args, boolean isRpc, + int callId) { if (Client.isAsynchronousMode()) { - return asyncCallHandler.newAsyncCall(method, args, isRpc, callId, - counters, this); + return asyncCallHandler.newAsyncCall(method, args, isRpc, callId, this); } else { - return new Call(method, args, isRpc, callId, counters, this); + return new Call(method, args, isRpc, callId, this); } } @@ -261,9 +329,8 @@ public Object invoke(Object proxy, Method method, Object[] args) throws Throwable { final boolean isRpc = isRpcInvocation(proxyDescriptor.getProxy()); final int callId = isRpc? Client.nextCallId(): RpcConstants.INVALID_CALL_ID; - final Counters counters = new Counters(); - final Call call = newCall(method, args, isRpc, callId, counters); + final Call call = newCall(method, args, isRpc, callId); while (true) { final CallReturn c = call.invokeOnce(); final CallReturn.State state = c.getState(); @@ -275,45 +342,24 @@ public Object invoke(Object proxy, Method method, Object[] args) } } - private void handleException(final Method method, final RetryPolicy policy, - final long expectedFailoverCount, final Counters counters, - final Exception ex) throws Exception { - final RetryInfo retryInfo = RetryInfo.newRetryInfo(policy, ex, counters, - proxyDescriptor.idempotentOrAtMostOnce(method)); - counters.retries++; - - if (retryInfo.fail != null) { + private RetryInfo handleException(final Method method, final int callId, + final RetryPolicy policy, final Counters counters, + final long expectFailoverCount, final Exception e) throws Exception { + final RetryInfo retryInfo = RetryInfo.newRetryInfo(policy, e, + counters, proxyDescriptor.idempotentOrAtMostOnce(method), + expectFailoverCount); + if (retryInfo.isFail()) { // fail. - if (retryInfo.fail.reason != null) { - LOG.warn("Exception while invoking " + if (retryInfo.action.reason != null) { + LOG.warn("Exception while invoking call #" + callId + " " + proxyDescriptor.getProxyInfo().getString(method.getName()) - + ". Not retrying because " + retryInfo.fail.reason, ex); + + ". Not retrying because " + retryInfo.action.reason, e); } - throw ex; + throw e; } - // retry - final boolean isFailover = retryInfo.failover != null; - - log(method, isFailover, counters.failovers, retryInfo.delay, ex); - - if (retryInfo.delay > 0) { - try { - Thread.sleep(retryInfo.delay); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - LOG.warn("Interrupted while waiting to retry", e); - InterruptedIOException intIOE = new InterruptedIOException( - "Retry interrupted"); - intIOE.initCause(e); - throw intIOE; - } - } - - if (isFailover) { - proxyDescriptor.failover(expectedFailoverCount, method); - counters.failovers++; - } + log(method, retryInfo.isFailover(), counters.failovers, retryInfo.delay, e); + return retryInfo; } private void log(final Method method, final boolean isFailover, diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java index f3e2bd1713..20c0307e37 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryPolicy.java @@ -67,6 +67,7 @@ public String toString() { } public enum RetryDecision { + // Ordering: FAIL < RETRY < FAILOVER_AND_RETRY. FAIL, RETRY, FAILOVER_AND_RETRY diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index ed8d905a96..183bad41fa 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -93,6 +93,8 @@ public class Client implements AutoCloseable { private static final ThreadLocal callId = new ThreadLocal(); private static final ThreadLocal retryCount = new ThreadLocal(); + private static final ThreadLocal EXTERNAL_CALL_HANDLER + = new ThreadLocal<>(); private static final ThreadLocal> ASYNC_RPC_RESPONSE = new ThreadLocal<>(); private static final ThreadLocal asynchronousMode = @@ -111,13 +113,15 @@ protected Boolean initialValue() { } /** Set call id and retry count for the next call. */ - public static void setCallIdAndRetryCount(int cid, int rc) { + public static void setCallIdAndRetryCount(int cid, int rc, + Object externalHandler) { Preconditions.checkArgument(cid != RpcConstants.INVALID_CALL_ID); Preconditions.checkState(callId.get() == null); Preconditions.checkArgument(rc != RpcConstants.INVALID_RETRY_COUNT); callId.set(cid); retryCount.set(rc); + EXTERNAL_CALL_HANDLER.set(externalHandler); } private ConcurrentMap connections = @@ -333,6 +337,7 @@ static class Call { IOException error; // exception, null if success final RPC.RpcKind rpcKind; // Rpc EngineKind boolean done; // true when call is done + private final Object externalHandler; private Call(RPC.RpcKind rpcKind, Writable param) { this.rpcKind = rpcKind; @@ -352,6 +357,8 @@ private Call(RPC.RpcKind rpcKind, Writable param) { } else { this.retry = rc; } + + this.externalHandler = EXTERNAL_CALL_HANDLER.get(); } @Override @@ -364,6 +371,12 @@ public String toString() { protected synchronized void callComplete() { this.done = true; notify(); // notify caller + + if (externalHandler != null) { + synchronized (externalHandler) { + externalHandler.notify(); + } + } } /** Set the exception when there is an error. diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestDefaultRetryPolicy.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestDefaultRetryPolicy.java index 8a61c04fee..56dec3a203 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestDefaultRetryPolicy.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/io/retry/TestDefaultRetryPolicy.java @@ -21,6 +21,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.RemoteException; import org.apache.hadoop.ipc.RetriableException; +import org.junit.Assert; import org.junit.Rule; import org.junit.Test; import org.junit.rules.Timeout; @@ -35,7 +36,18 @@ */ public class TestDefaultRetryPolicy { @Rule - public Timeout timeout = new Timeout(300000); + public Timeout timeout = new Timeout(30000); + + /** Verify FAIL < RETRY < FAILOVER_AND_RETRY. */ + @Test + public void testRetryDecisionOrdering() throws Exception { + Assert.assertTrue(RetryPolicy.RetryAction.RetryDecision.FAIL.compareTo( + RetryPolicy.RetryAction.RetryDecision.RETRY) < 0); + Assert.assertTrue(RetryPolicy.RetryAction.RetryDecision.RETRY.compareTo( + RetryPolicy.RetryAction.RetryDecision.FAILOVER_AND_RETRY) < 0); + Assert.assertTrue(RetryPolicy.RetryAction.RetryDecision.FAIL.compareTo( + RetryPolicy.RetryAction.RetryDecision.FAILOVER_AND_RETRY) < 0); + } /** * Verify that the default retry policy correctly retries diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java index 4450c0ccbf..3f2802f330 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestAsyncIPC.java @@ -367,7 +367,7 @@ public void testCallIdAndRetry() throws IOException, InterruptedException, Call createCall(RpcKind rpcKind, Writable rpcRequest) { // Set different call id and retry count for the next call Client.setCallIdAndRetryCount(Client.nextCallId(), - TestIPC.RANDOM.nextInt(255)); + TestIPC.RANDOM.nextInt(255), null); final Call call = super.createCall(rpcKind, rpcRequest); @@ -421,7 +421,7 @@ public void testCallRetryCount() throws IOException, InterruptedException, final int retryCount = 255; // Override client to store the call id final Client client = new Client(LongWritable.class, conf); - Client.setCallIdAndRetryCount(Client.nextCallId(), retryCount); + Client.setCallIdAndRetryCount(Client.nextCallId(), retryCount, null); // Attach a listener that tracks every call ID received by the server. final TestServer server = new TestIPC.TestServer(1, false, conf); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java index 6bfcc537da..82da62de77 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestIPC.java @@ -1172,7 +1172,7 @@ public void run() { retryProxy.dummyRun(); Assert.assertEquals(TestInvocationHandler.retry, totalRetry + 1); } finally { - Client.setCallIdAndRetryCount(0, 0); + Client.setCallIdAndRetryCount(0, 0, null); client.stop(); server.stop(); } @@ -1205,7 +1205,7 @@ public void testNoRetryOnInvalidToken() throws IOException { } finally { // Check if dummyRun called only once Assert.assertEquals(handler.invocations, 1); - Client.setCallIdAndRetryCount(0, 0); + Client.setCallIdAndRetryCount(0, 0, null); client.stop(); server.stop(); } @@ -1250,7 +1250,7 @@ public void testCallRetryCount() throws IOException { final int retryCount = 255; // Override client to store the call id final Client client = new Client(LongWritable.class, conf); - Client.setCallIdAndRetryCount(Client.nextCallId(), 255); + Client.setCallIdAndRetryCount(Client.nextCallId(), 255, null); // Attach a listener that tracks every call ID received by the server. final TestServer server = new TestServer(1, false);