HDFS-11395. RequestHedgingProxyProvider#RequestHedgingInvocationHandler hides the Exception thrown from NameNode. Contributed by Nandakumar.
This commit is contained in:
parent
b8c69557b7
commit
55796a0946
@ -240,12 +240,15 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
|
||||
private final long delay;
|
||||
private final RetryAction action;
|
||||
private final long expectedFailoverCount;
|
||||
private final Exception failException;
|
||||
|
||||
RetryInfo(long delay, RetryAction action, long expectedFailoverCount) {
|
||||
RetryInfo(long delay, RetryAction action, long expectedFailoverCount,
|
||||
Exception failException) {
|
||||
this.delay = delay;
|
||||
this.retryTime = Time.monotonicNow() + delay;
|
||||
this.action = action;
|
||||
this.expectedFailoverCount = expectedFailoverCount;
|
||||
this.failException = failException;
|
||||
}
|
||||
|
||||
boolean isFailover() {
|
||||
@ -258,11 +261,16 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
|
||||
&& action.action == RetryAction.RetryDecision.FAIL;
|
||||
}
|
||||
|
||||
Exception getFailException() {
|
||||
return failException;
|
||||
}
|
||||
|
||||
static RetryInfo newRetryInfo(RetryPolicy policy, Exception e,
|
||||
Counters counters, boolean idempotentOrAtMostOnce,
|
||||
long expectedFailoverCount) throws Exception {
|
||||
RetryAction max = null;
|
||||
long maxRetryDelay = 0;
|
||||
Exception ex = null;
|
||||
|
||||
final Iterable<Exception> exceptions = e instanceof MultiException ?
|
||||
((MultiException) e).getExceptions().values()
|
||||
@ -279,10 +287,13 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
|
||||
|
||||
if (max == null || max.action.compareTo(a.action) < 0) {
|
||||
max = a;
|
||||
if (a.action == RetryAction.RetryDecision.FAIL) {
|
||||
ex = exception;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return new RetryInfo(maxRetryDelay, max, expectedFailoverCount);
|
||||
return new RetryInfo(maxRetryDelay, max, expectedFailoverCount, ex);
|
||||
}
|
||||
}
|
||||
|
||||
@ -359,7 +370,7 @@ public class RetryInvocationHandler<T> implements RpcInvocationHandler {
|
||||
+ ". Not retrying because " + retryInfo.action.reason, e);
|
||||
}
|
||||
}
|
||||
throw e;
|
||||
throw retryInfo.getFailException();
|
||||
}
|
||||
|
||||
log(method, retryInfo.isFailover(), counters.failovers, retryInfo.delay, e);
|
||||
|
@ -17,7 +17,6 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode.ha;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.lang.reflect.InvocationHandler;
|
||||
import java.lang.reflect.Method;
|
||||
import java.lang.reflect.Proxy;
|
||||
@ -122,7 +121,7 @@ public class RequestHedgingProxyProvider<T> extends
|
||||
} catch (Exception ex) {
|
||||
ProxyInfo<T> tProxyInfo = proxyMap.get(callResultFuture);
|
||||
logProxyException(ex, tProxyInfo.proxyInfo);
|
||||
badResults.put(tProxyInfo.proxyInfo, ex);
|
||||
badResults.put(tProxyInfo.proxyInfo, unwrapException(ex));
|
||||
LOG.trace("Unsuccessful invocation on [{}]", tProxyInfo.proxyInfo);
|
||||
numAttempts--;
|
||||
}
|
||||
@ -207,16 +206,36 @@ public class RequestHedgingProxyProvider<T> extends
|
||||
* @return If the exception is caused by an standby namenode.
|
||||
*/
|
||||
private boolean isStandbyException(Exception ex) {
|
||||
Throwable cause = ex.getCause();
|
||||
if (cause != null) {
|
||||
Throwable cause2 = cause.getCause();
|
||||
if (cause2 instanceof RemoteException) {
|
||||
RemoteException remoteException = (RemoteException)cause2;
|
||||
IOException unwrapRemoteException =
|
||||
remoteException.unwrapRemoteException();
|
||||
return unwrapRemoteException instanceof StandbyException;
|
||||
}
|
||||
Exception exception = unwrapException(ex);
|
||||
if (exception instanceof RemoteException) {
|
||||
return ((RemoteException) exception).unwrapRemoteException()
|
||||
instanceof StandbyException;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
/**
|
||||
* Unwraps the exception. <p>
|
||||
* Example:
|
||||
* <blockquote><pre>
|
||||
* if ex is
|
||||
* ExecutionException(InvocationTargetExeption(SomeException))
|
||||
* returns SomeException
|
||||
* </pre></blockquote>
|
||||
*
|
||||
* @return unwrapped exception
|
||||
*/
|
||||
private Exception unwrapException(Exception ex) {
|
||||
if (ex != null) {
|
||||
Throwable cause = ex.getCause();
|
||||
if (cause instanceof Exception) {
|
||||
Throwable innerCause = cause.getCause();
|
||||
if (innerCause instanceof Exception) {
|
||||
return (Exception) innerCause;
|
||||
}
|
||||
return (Exception) cause;
|
||||
}
|
||||
}
|
||||
return ex;
|
||||
}
|
||||
}
|
||||
|
@ -17,7 +17,10 @@
|
||||
*/
|
||||
package org.apache.hadoop.hdfs.server.namenode.ha;
|
||||
|
||||
import java.io.EOFException;
|
||||
import java.io.FileNotFoundException;
|
||||
import java.io.IOException;
|
||||
import java.net.ConnectException;
|
||||
import java.net.InetSocketAddress;
|
||||
import java.net.URI;
|
||||
import java.net.URISyntaxException;
|
||||
@ -30,6 +33,8 @@ import org.apache.hadoop.hdfs.DFSConfigKeys;
|
||||
import org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider.ProxyFactory;
|
||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||
import org.apache.hadoop.io.retry.MultiException;
|
||||
import org.apache.hadoop.ipc.RemoteException;
|
||||
import org.apache.hadoop.ipc.StandbyException;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.test.GenericTestUtils;
|
||||
import org.apache.hadoop.util.Time;
|
||||
@ -38,6 +43,7 @@ import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.BeforeClass;
|
||||
import org.junit.Test;
|
||||
import org.mockito.Matchers;
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.stubbing.Answer;
|
||||
@ -198,7 +204,7 @@ public class TestRequestHedgingProxyProvider {
|
||||
Assert.assertTrue(stats.length == 1);
|
||||
Assert.assertEquals(2, stats[0]);
|
||||
|
||||
// Counter shuodl update only once
|
||||
// Counter should update only once
|
||||
Assert.assertEquals(5, counter.get());
|
||||
|
||||
stats = provider.getProxy().proxy.getStats();
|
||||
@ -347,6 +353,106 @@ public class TestRequestHedgingProxyProvider {
|
||||
Assert.assertEquals(12, counter.get());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHedgingWhenFileNotFoundException() throws Exception {
|
||||
NamenodeProtocols active = Mockito.mock(NamenodeProtocols.class);
|
||||
Mockito
|
||||
.when(active.getBlockLocations(Matchers.anyString(),
|
||||
Matchers.anyLong(), Matchers.anyLong()))
|
||||
.thenThrow(new RemoteException("java.io.FileNotFoundException",
|
||||
"File does not exist!"));
|
||||
|
||||
NamenodeProtocols standby = Mockito.mock(NamenodeProtocols.class);
|
||||
Mockito
|
||||
.when(standby.getBlockLocations(Matchers.anyString(),
|
||||
Matchers.anyLong(), Matchers.anyLong()))
|
||||
.thenThrow(
|
||||
new RemoteException("org.apache.hadoop.ipc.StandbyException",
|
||||
"Standby NameNode"));
|
||||
|
||||
RequestHedgingProxyProvider<NamenodeProtocols> provider =
|
||||
new RequestHedgingProxyProvider<>(conf, nnUri,
|
||||
NamenodeProtocols.class, createFactory(active, standby));
|
||||
try {
|
||||
provider.getProxy().proxy.getBlockLocations("/tmp/test.file", 0L, 20L);
|
||||
Assert.fail("Should fail since the active namenode throws"
|
||||
+ " FileNotFoundException!");
|
||||
} catch (MultiException me) {
|
||||
for (Exception ex : me.getExceptions().values()) {
|
||||
Exception rEx = ((RemoteException) ex).unwrapRemoteException();
|
||||
if (rEx instanceof StandbyException) {
|
||||
continue;
|
||||
}
|
||||
Assert.assertTrue(rEx instanceof FileNotFoundException);
|
||||
}
|
||||
}
|
||||
Mockito.verify(active).getBlockLocations(Matchers.anyString(),
|
||||
Matchers.anyLong(), Matchers.anyLong());
|
||||
Mockito.verify(standby).getBlockLocations(Matchers.anyString(),
|
||||
Matchers.anyLong(), Matchers.anyLong());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHedgingWhenConnectException() throws Exception {
|
||||
NamenodeProtocols active = Mockito.mock(NamenodeProtocols.class);
|
||||
Mockito.when(active.getStats()).thenThrow(new ConnectException());
|
||||
|
||||
NamenodeProtocols standby = Mockito.mock(NamenodeProtocols.class);
|
||||
Mockito.when(standby.getStats())
|
||||
.thenThrow(
|
||||
new RemoteException("org.apache.hadoop.ipc.StandbyException",
|
||||
"Standby NameNode"));
|
||||
|
||||
RequestHedgingProxyProvider<NamenodeProtocols> provider =
|
||||
new RequestHedgingProxyProvider<>(conf, nnUri,
|
||||
NamenodeProtocols.class, createFactory(active, standby));
|
||||
try {
|
||||
provider.getProxy().proxy.getStats();
|
||||
Assert.fail("Should fail since the active namenode throws"
|
||||
+ " ConnectException!");
|
||||
} catch (MultiException me) {
|
||||
for (Exception ex : me.getExceptions().values()) {
|
||||
if (ex instanceof RemoteException) {
|
||||
Exception rEx = ((RemoteException) ex)
|
||||
.unwrapRemoteException();
|
||||
Assert.assertTrue("Unexpected RemoteException: " + rEx.getMessage(),
|
||||
rEx instanceof StandbyException);
|
||||
} else {
|
||||
Assert.assertTrue(ex instanceof ConnectException);
|
||||
}
|
||||
}
|
||||
}
|
||||
Mockito.verify(active).getStats();
|
||||
Mockito.verify(standby).getStats();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHedgingWhenConnectAndEOFException() throws Exception {
|
||||
NamenodeProtocols active = Mockito.mock(NamenodeProtocols.class);
|
||||
Mockito.when(active.getStats()).thenThrow(new EOFException());
|
||||
|
||||
NamenodeProtocols standby = Mockito.mock(NamenodeProtocols.class);
|
||||
Mockito.when(standby.getStats()).thenThrow(new ConnectException());
|
||||
|
||||
RequestHedgingProxyProvider<NamenodeProtocols> provider =
|
||||
new RequestHedgingProxyProvider<>(conf, nnUri,
|
||||
NamenodeProtocols.class, createFactory(active, standby));
|
||||
try {
|
||||
provider.getProxy().proxy.getStats();
|
||||
Assert.fail("Should fail since both active and standby namenodes throw"
|
||||
+ " Exceptions!");
|
||||
} catch (MultiException me) {
|
||||
for (Exception ex : me.getExceptions().values()) {
|
||||
if (!(ex instanceof ConnectException) &&
|
||||
!(ex instanceof EOFException)) {
|
||||
Assert.fail("Unexpected Exception " + ex.getMessage());
|
||||
}
|
||||
}
|
||||
}
|
||||
Mockito.verify(active).getStats();
|
||||
Mockito.verify(standby).getStats();
|
||||
}
|
||||
|
||||
private ProxyFactory<NamenodeProtocols> createFactory(
|
||||
NamenodeProtocols... protos) {
|
||||
final Iterator<NamenodeProtocols> iterator =
|
||||
|
Loading…
x
Reference in New Issue
Block a user