HDFS-12813. RequestHedgingProxyProvider can hide Exception thrown from the Namenode for proxy size of 1. Contributed by Mukul Kumar Singh
This commit is contained in:
parent
60fc2a1388
commit
659e85e304
@ -18,6 +18,7 @@
|
|||||||
package org.apache.hadoop.hdfs.server.namenode.ha;
|
package org.apache.hadoop.hdfs.server.namenode.ha;
|
||||||
|
|
||||||
import java.lang.reflect.InvocationHandler;
|
import java.lang.reflect.InvocationHandler;
|
||||||
|
import java.lang.reflect.InvocationTargetException;
|
||||||
import java.lang.reflect.Method;
|
import java.lang.reflect.Method;
|
||||||
import java.lang.reflect.Proxy;
|
import java.lang.reflect.Proxy;
|
||||||
import java.net.URI;
|
import java.net.URI;
|
||||||
@ -29,6 +30,7 @@
|
|||||||
import java.util.concurrent.ExecutorService;
|
import java.util.concurrent.ExecutorService;
|
||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
import java.util.concurrent.ExecutionException;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
@ -87,9 +89,19 @@ public RequestHedgingInvocationHandler(
|
|||||||
targetProxies.remove(toIgnore);
|
targetProxies.remove(toIgnore);
|
||||||
if (targetProxies.size() == 1) {
|
if (targetProxies.size() == 1) {
|
||||||
ProxyInfo<T> proxyInfo = targetProxies.values().iterator().next();
|
ProxyInfo<T> proxyInfo = targetProxies.values().iterator().next();
|
||||||
Object retVal = method.invoke(proxyInfo.proxy, args);
|
try {
|
||||||
successfulProxy = proxyInfo;
|
currentUsedProxy = proxyInfo;
|
||||||
return retVal;
|
Object retVal = method.invoke(proxyInfo.proxy, args);
|
||||||
|
LOG.debug("Invocation successful on [{}]",
|
||||||
|
currentUsedProxy.proxyInfo);
|
||||||
|
return retVal;
|
||||||
|
} catch (InvocationTargetException ex) {
|
||||||
|
Exception unwrappedException = unwrapInvocationTargetException(ex);
|
||||||
|
logProxyException(unwrappedException, currentUsedProxy.proxyInfo);
|
||||||
|
LOG.trace("Unsuccessful invocation on [{}]",
|
||||||
|
currentUsedProxy.proxyInfo);
|
||||||
|
throw unwrappedException;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
executor = Executors.newFixedThreadPool(proxies.size());
|
executor = Executors.newFixedThreadPool(proxies.size());
|
||||||
completionService = new ExecutorCompletionService<>(executor);
|
completionService = new ExecutorCompletionService<>(executor);
|
||||||
@ -112,15 +124,16 @@ public Object call() throws Exception {
|
|||||||
Future<Object> callResultFuture = completionService.take();
|
Future<Object> callResultFuture = completionService.take();
|
||||||
Object retVal;
|
Object retVal;
|
||||||
try {
|
try {
|
||||||
|
currentUsedProxy = proxyMap.get(callResultFuture);
|
||||||
retVal = callResultFuture.get();
|
retVal = callResultFuture.get();
|
||||||
successfulProxy = proxyMap.get(callResultFuture);
|
|
||||||
LOG.debug("Invocation successful on [{}]",
|
LOG.debug("Invocation successful on [{}]",
|
||||||
successfulProxy.proxyInfo);
|
currentUsedProxy.proxyInfo);
|
||||||
return retVal;
|
return retVal;
|
||||||
} catch (Exception ex) {
|
} catch (ExecutionException ex) {
|
||||||
|
Exception unwrappedException = unwrapExecutionException(ex);
|
||||||
ProxyInfo<T> tProxyInfo = proxyMap.get(callResultFuture);
|
ProxyInfo<T> tProxyInfo = proxyMap.get(callResultFuture);
|
||||||
logProxyException(ex, tProxyInfo.proxyInfo);
|
logProxyException(unwrappedException, tProxyInfo.proxyInfo);
|
||||||
badResults.put(tProxyInfo.proxyInfo, unwrapException(ex));
|
badResults.put(tProxyInfo.proxyInfo, unwrappedException);
|
||||||
LOG.trace("Unsuccessful invocation on [{}]", tProxyInfo.proxyInfo);
|
LOG.trace("Unsuccessful invocation on [{}]", tProxyInfo.proxyInfo);
|
||||||
numAttempts--;
|
numAttempts--;
|
||||||
}
|
}
|
||||||
@ -143,7 +156,7 @@ public Object call() throws Exception {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private volatile ProxyInfo<T> successfulProxy = null;
|
private volatile ProxyInfo<T> currentUsedProxy = null;
|
||||||
private volatile String toIgnore = null;
|
private volatile String toIgnore = null;
|
||||||
|
|
||||||
public RequestHedgingProxyProvider(Configuration conf, URI uri,
|
public RequestHedgingProxyProvider(Configuration conf, URI uri,
|
||||||
@ -154,8 +167,8 @@ public RequestHedgingProxyProvider(Configuration conf, URI uri,
|
|||||||
@SuppressWarnings("unchecked")
|
@SuppressWarnings("unchecked")
|
||||||
@Override
|
@Override
|
||||||
public synchronized ProxyInfo<T> getProxy() {
|
public synchronized ProxyInfo<T> getProxy() {
|
||||||
if (successfulProxy != null) {
|
if (currentUsedProxy != null) {
|
||||||
return successfulProxy;
|
return currentUsedProxy;
|
||||||
}
|
}
|
||||||
Map<String, ProxyInfo<T>> targetProxyInfos = new HashMap<>();
|
Map<String, ProxyInfo<T>> targetProxyInfos = new HashMap<>();
|
||||||
StringBuilder combinedInfo = new StringBuilder("[");
|
StringBuilder combinedInfo = new StringBuilder("[");
|
||||||
@ -175,8 +188,8 @@ public synchronized ProxyInfo<T> getProxy() {
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public synchronized void performFailover(T currentProxy) {
|
public synchronized void performFailover(T currentProxy) {
|
||||||
toIgnore = successfulProxy.proxyInfo;
|
toIgnore = this.currentUsedProxy.proxyInfo;
|
||||||
successfulProxy = null;
|
this.currentUsedProxy = null;
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -187,19 +200,18 @@ public synchronized void performFailover(T currentProxy) {
|
|||||||
*/
|
*/
|
||||||
private void logProxyException(Exception ex, String proxyInfo) {
|
private void logProxyException(Exception ex, String proxyInfo) {
|
||||||
if (isStandbyException(ex)) {
|
if (isStandbyException(ex)) {
|
||||||
LOG.debug("Invocation returned standby exception on [{}]", proxyInfo);
|
LOG.debug("Invocation returned standby exception on [{}]", proxyInfo, ex);
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("Invocation returned exception on [{}]", proxyInfo);
|
LOG.warn("Invocation returned exception on [{}]", proxyInfo, ex);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Check if the returned exception is caused by an standby namenode.
|
* Check if the returned exception is caused by an standby namenode.
|
||||||
* @param ex Exception to check.
|
* @param exception Exception to check.
|
||||||
* @return If the exception is caused by an standby namenode.
|
* @return If the exception is caused by an standby namenode.
|
||||||
*/
|
*/
|
||||||
private boolean isStandbyException(Exception ex) {
|
private boolean isStandbyException(Exception exception) {
|
||||||
Exception exception = unwrapException(ex);
|
|
||||||
if (exception instanceof RemoteException) {
|
if (exception instanceof RemoteException) {
|
||||||
return ((RemoteException) exception).unwrapRemoteException()
|
return ((RemoteException) exception).unwrapRemoteException()
|
||||||
instanceof StandbyException;
|
instanceof StandbyException;
|
||||||
@ -208,24 +220,43 @@ private boolean isStandbyException(Exception ex) {
|
|||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Unwraps the exception. <p>
|
* Unwraps the ExecutionException. <p>
|
||||||
* Example:
|
* Example:
|
||||||
* <blockquote><pre>
|
* <blockquote><pre>
|
||||||
* if ex is
|
* if ex is
|
||||||
* ExecutionException(InvocationTargetExeption(SomeException))
|
* ExecutionException(InvocationTargetException(SomeException))
|
||||||
* returns SomeException
|
* returns SomeException
|
||||||
* </pre></blockquote>
|
* </pre></blockquote>
|
||||||
*
|
*
|
||||||
* @return unwrapped exception
|
* @return unwrapped exception
|
||||||
*/
|
*/
|
||||||
private Exception unwrapException(Exception ex) {
|
private Exception unwrapExecutionException(ExecutionException ex) {
|
||||||
|
if (ex != null) {
|
||||||
|
Throwable cause = ex.getCause();
|
||||||
|
if (cause instanceof InvocationTargetException) {
|
||||||
|
return
|
||||||
|
unwrapInvocationTargetException((InvocationTargetException)cause);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return ex;
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Unwraps the InvocationTargetException. <p>
|
||||||
|
* Example:
|
||||||
|
* <blockquote><pre>
|
||||||
|
* if ex is InvocationTargetException(SomeException)
|
||||||
|
* returns SomeException
|
||||||
|
* </pre></blockquote>
|
||||||
|
*
|
||||||
|
* @return unwrapped exception
|
||||||
|
*/
|
||||||
|
private Exception unwrapInvocationTargetException(
|
||||||
|
InvocationTargetException ex) {
|
||||||
if (ex != null) {
|
if (ex != null) {
|
||||||
Throwable cause = ex.getCause();
|
Throwable cause = ex.getCause();
|
||||||
if (cause instanceof Exception) {
|
if (cause instanceof Exception) {
|
||||||
Throwable innerCause = cause.getCause();
|
|
||||||
if (innerCause instanceof Exception) {
|
|
||||||
return (Exception) innerCause;
|
|
||||||
}
|
|
||||||
return (Exception) cause;
|
return (Exception) cause;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -231,6 +231,64 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
|
|||||||
Assert.assertEquals(1, stats[0]);
|
Assert.assertEquals(1, stats[0]);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testFileNotFoundExceptionWithSingleProxy() throws Exception {
|
||||||
|
ClientProtocol active = Mockito.mock(ClientProtocol.class);
|
||||||
|
Mockito
|
||||||
|
.when(active.getBlockLocations(Matchers.anyString(),
|
||||||
|
Matchers.anyLong(), Matchers.anyLong()))
|
||||||
|
.thenThrow(new RemoteException("java.io.FileNotFoundException",
|
||||||
|
"File does not exist!"));
|
||||||
|
|
||||||
|
ClientProtocol standby = Mockito.mock(ClientProtocol.class);
|
||||||
|
Mockito
|
||||||
|
.when(standby.getBlockLocations(Matchers.anyString(),
|
||||||
|
Matchers.anyLong(), Matchers.anyLong()))
|
||||||
|
.thenThrow(
|
||||||
|
new RemoteException("org.apache.hadoop.ipc.StandbyException",
|
||||||
|
"Standby NameNode"));
|
||||||
|
|
||||||
|
RequestHedgingProxyProvider<ClientProtocol> provider =
|
||||||
|
new RequestHedgingProxyProvider<>(conf, nnUri,
|
||||||
|
ClientProtocol.class, createFactory(standby, active));
|
||||||
|
try {
|
||||||
|
provider.getProxy().proxy.getBlockLocations("/tmp/test.file", 0L, 20L);
|
||||||
|
Assert.fail("Should fail since the active namenode throws"
|
||||||
|
+ " FileNotFoundException!");
|
||||||
|
} catch (MultiException me) {
|
||||||
|
for (Exception ex : me.getExceptions().values()) {
|
||||||
|
Exception rEx = ((RemoteException) ex).unwrapRemoteException();
|
||||||
|
if (rEx instanceof StandbyException) {
|
||||||
|
continue;
|
||||||
|
}
|
||||||
|
Assert.assertTrue(rEx instanceof FileNotFoundException);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//Perform failover now, there will only be one active proxy now
|
||||||
|
provider.performFailover(active);
|
||||||
|
try {
|
||||||
|
provider.getProxy().proxy.getBlockLocations("/tmp/test.file", 0L, 20L);
|
||||||
|
Assert.fail("Should fail since the active namenode throws"
|
||||||
|
+ " FileNotFoundException!");
|
||||||
|
} catch (RemoteException ex) {
|
||||||
|
Exception rEx = ex.unwrapRemoteException();
|
||||||
|
if (rEx instanceof StandbyException) {
|
||||||
|
Mockito.verify(active).getBlockLocations(Matchers.anyString(),
|
||||||
|
Matchers.anyLong(), Matchers.anyLong());
|
||||||
|
Mockito.verify(standby, Mockito.times(2))
|
||||||
|
.getBlockLocations(Matchers.anyString(),
|
||||||
|
Matchers.anyLong(), Matchers.anyLong());
|
||||||
|
} else {
|
||||||
|
Assert.assertTrue(rEx instanceof FileNotFoundException);
|
||||||
|
Mockito.verify(active, Mockito.times(2))
|
||||||
|
.getBlockLocations(Matchers.anyString(),
|
||||||
|
Matchers.anyLong(), Matchers.anyLong());
|
||||||
|
Mockito.verify(standby).getBlockLocations(Matchers.anyString(),
|
||||||
|
Matchers.anyLong(), Matchers.anyLong());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testPerformFailoverWith3Proxies() throws Exception {
|
public void testPerformFailoverWith3Proxies() throws Exception {
|
||||||
conf.set(HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns,
|
conf.set(HdfsClientConfigKeys.DFS_HA_NAMENODES_KEY_PREFIX + "." + ns,
|
||||||
|
Loading…
Reference in New Issue
Block a user