diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java index 8487602910..ffdd9288c4 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/io/retry/RetryInvocationHandler.java @@ -240,12 +240,15 @@ private static class RetryInfo { private final long delay; private final RetryAction action; private final long expectedFailoverCount; + private final Exception failException; - RetryInfo(long delay, RetryAction action, long expectedFailoverCount) { + RetryInfo(long delay, RetryAction action, long expectedFailoverCount, + Exception failException) { this.delay = delay; this.retryTime = Time.monotonicNow() + delay; this.action = action; this.expectedFailoverCount = expectedFailoverCount; + this.failException = failException; } boolean isFailover() { @@ -258,11 +261,16 @@ boolean isFail() { && action.action == RetryAction.RetryDecision.FAIL; } + Exception getFailException() { + return failException; + } + static RetryInfo newRetryInfo(RetryPolicy policy, Exception e, Counters counters, boolean idempotentOrAtMostOnce, long expectedFailoverCount) throws Exception { RetryAction max = null; long maxRetryDelay = 0; + Exception ex = null; final Iterable exceptions = e instanceof MultiException ? ((MultiException) e).getExceptions().values() @@ -279,10 +287,13 @@ static RetryInfo newRetryInfo(RetryPolicy policy, Exception e, if (max == null || max.action.compareTo(a.action) < 0) { max = a; + if (a.action == RetryAction.RetryDecision.FAIL) { + ex = exception; + } } } - return new RetryInfo(maxRetryDelay, max, expectedFailoverCount); + return new RetryInfo(maxRetryDelay, max, expectedFailoverCount, ex); } } @@ -359,7 +370,7 @@ private RetryInfo handleException(final Method method, final int callId, + ". Not retrying because " + retryInfo.action.reason, e); } } - throw e; + throw retryInfo.getFailException(); } log(method, retryInfo.isFailover(), counters.failovers, retryInfo.delay, e); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java index 945e92f68c..a765e95411 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/namenode/ha/RequestHedgingProxyProvider.java @@ -17,7 +17,6 @@ */ package org.apache.hadoop.hdfs.server.namenode.ha; -import java.io.IOException; import java.lang.reflect.InvocationHandler; import java.lang.reflect.Method; import java.lang.reflect.Proxy; @@ -122,7 +121,7 @@ public Object call() throws Exception { } catch (Exception ex) { ProxyInfo tProxyInfo = proxyMap.get(callResultFuture); logProxyException(ex, tProxyInfo.proxyInfo); - badResults.put(tProxyInfo.proxyInfo, ex); + badResults.put(tProxyInfo.proxyInfo, unwrapException(ex)); LOG.trace("Unsuccessful invocation on [{}]", tProxyInfo.proxyInfo); numAttempts--; } @@ -207,16 +206,36 @@ private void logProxyException(Exception ex, String proxyInfo) { * @return If the exception is caused by an standby namenode. */ private boolean isStandbyException(Exception ex) { - Throwable cause = ex.getCause(); - if (cause != null) { - Throwable cause2 = cause.getCause(); - if (cause2 instanceof RemoteException) { - RemoteException remoteException = (RemoteException)cause2; - IOException unwrapRemoteException = - remoteException.unwrapRemoteException(); - return unwrapRemoteException instanceof StandbyException; - } + Exception exception = unwrapException(ex); + if (exception instanceof RemoteException) { + return ((RemoteException) exception).unwrapRemoteException() + instanceof StandbyException; } return false; } + + /** + * Unwraps the exception.

+ * Example: + *

+   * if ex is
+   * ExecutionException(InvocationTargetExeption(SomeException))
+   * returns SomeException
+   * 
+ * + * @return unwrapped exception + */ + private Exception unwrapException(Exception ex) { + if (ex != null) { + Throwable cause = ex.getCause(); + if (cause instanceof Exception) { + Throwable innerCause = cause.getCause(); + if (innerCause instanceof Exception) { + return (Exception) innerCause; + } + return (Exception) cause; + } + } + return ex; + } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java index 3a910c14aa..37532d5c88 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/namenode/ha/TestRequestHedgingProxyProvider.java @@ -17,7 +17,10 @@ */ package org.apache.hadoop.hdfs.server.namenode.ha; +import java.io.EOFException; +import java.io.FileNotFoundException; import java.io.IOException; +import java.net.ConnectException; import java.net.InetSocketAddress; import java.net.URI; import java.net.URISyntaxException; @@ -30,6 +33,8 @@ import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.ProxyFactory; import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols; import org.apache.hadoop.io.retry.MultiException; +import org.apache.hadoop.ipc.RemoteException; +import org.apache.hadoop.ipc.StandbyException; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.Time; @@ -38,6 +43,7 @@ import org.junit.Before; import org.junit.BeforeClass; import org.junit.Test; +import org.mockito.Matchers; import org.mockito.Mockito; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; @@ -198,7 +204,7 @@ public long[] answer(InvocationOnMock invocation) throws Throwable { Assert.assertTrue(stats.length == 1); Assert.assertEquals(2, stats[0]); - // Counter shuodl update only once + // Counter should update only once Assert.assertEquals(5, counter.get()); stats = provider.getProxy().proxy.getStats(); @@ -347,6 +353,106 @@ public long[] answer(InvocationOnMock invocation) throws Throwable { Assert.assertEquals(12, counter.get()); } + @Test + public void testHedgingWhenFileNotFoundException() throws Exception { + NamenodeProtocols active = Mockito.mock(NamenodeProtocols.class); + Mockito + .when(active.getBlockLocations(Matchers.anyString(), + Matchers.anyLong(), Matchers.anyLong())) + .thenThrow(new RemoteException("java.io.FileNotFoundException", + "File does not exist!")); + + NamenodeProtocols standby = Mockito.mock(NamenodeProtocols.class); + Mockito + .when(standby.getBlockLocations(Matchers.anyString(), + Matchers.anyLong(), Matchers.anyLong())) + .thenThrow( + new RemoteException("org.apache.hadoop.ipc.StandbyException", + "Standby NameNode")); + + RequestHedgingProxyProvider provider = + new RequestHedgingProxyProvider<>(conf, nnUri, + NamenodeProtocols.class, createFactory(active, standby)); + try { + provider.getProxy().proxy.getBlockLocations("/tmp/test.file", 0L, 20L); + Assert.fail("Should fail since the active namenode throws" + + " FileNotFoundException!"); + } catch (MultiException me) { + for (Exception ex : me.getExceptions().values()) { + Exception rEx = ((RemoteException) ex).unwrapRemoteException(); + if (rEx instanceof StandbyException) { + continue; + } + Assert.assertTrue(rEx instanceof FileNotFoundException); + } + } + Mockito.verify(active).getBlockLocations(Matchers.anyString(), + Matchers.anyLong(), Matchers.anyLong()); + Mockito.verify(standby).getBlockLocations(Matchers.anyString(), + Matchers.anyLong(), Matchers.anyLong()); + } + + @Test + public void testHedgingWhenConnectException() throws Exception { + NamenodeProtocols active = Mockito.mock(NamenodeProtocols.class); + Mockito.when(active.getStats()).thenThrow(new ConnectException()); + + NamenodeProtocols standby = Mockito.mock(NamenodeProtocols.class); + Mockito.when(standby.getStats()) + .thenThrow( + new RemoteException("org.apache.hadoop.ipc.StandbyException", + "Standby NameNode")); + + RequestHedgingProxyProvider provider = + new RequestHedgingProxyProvider<>(conf, nnUri, + NamenodeProtocols.class, createFactory(active, standby)); + try { + provider.getProxy().proxy.getStats(); + Assert.fail("Should fail since the active namenode throws" + + " ConnectException!"); + } catch (MultiException me) { + for (Exception ex : me.getExceptions().values()) { + if (ex instanceof RemoteException) { + Exception rEx = ((RemoteException) ex) + .unwrapRemoteException(); + Assert.assertTrue("Unexpected RemoteException: " + rEx.getMessage(), + rEx instanceof StandbyException); + } else { + Assert.assertTrue(ex instanceof ConnectException); + } + } + } + Mockito.verify(active).getStats(); + Mockito.verify(standby).getStats(); + } + + @Test + public void testHedgingWhenConnectAndEOFException() throws Exception { + NamenodeProtocols active = Mockito.mock(NamenodeProtocols.class); + Mockito.when(active.getStats()).thenThrow(new EOFException()); + + NamenodeProtocols standby = Mockito.mock(NamenodeProtocols.class); + Mockito.when(standby.getStats()).thenThrow(new ConnectException()); + + RequestHedgingProxyProvider provider = + new RequestHedgingProxyProvider<>(conf, nnUri, + NamenodeProtocols.class, createFactory(active, standby)); + try { + provider.getProxy().proxy.getStats(); + Assert.fail("Should fail since both active and standby namenodes throw" + + " Exceptions!"); + } catch (MultiException me) { + for (Exception ex : me.getExceptions().values()) { + if (!(ex instanceof ConnectException) && + !(ex instanceof EOFException)) { + Assert.fail("Unexpected Exception " + ex.getMessage()); + } + } + } + Mockito.verify(active).getStats(); + Mockito.verify(standby).getStats(); + } + private ProxyFactory createFactory( NamenodeProtocols... protos) { final Iterator iterator =