HDFS-10962. TestRequestHedgingProxyProvider is flaky.
This commit is contained in:
parent
00160f71b6
commit
e68c7b96c7
@ -31,14 +31,14 @@
|
|||||||
import java.util.concurrent.Executors;
|
import java.util.concurrent.Executors;
|
||||||
import java.util.concurrent.Future;
|
import java.util.concurrent.Future;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
|
||||||
import org.apache.commons.logging.LogFactory;
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.ipc.RemoteException;
|
import org.apache.hadoop.ipc.RemoteException;
|
||||||
import org.apache.hadoop.ipc.StandbyException;
|
import org.apache.hadoop.ipc.StandbyException;
|
||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import org.apache.hadoop.io.retry.MultiException;
|
import org.apache.hadoop.io.retry.MultiException;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* A FailoverProxyProvider implementation that technically does not "failover"
|
* A FailoverProxyProvider implementation that technically does not "failover"
|
||||||
@ -51,8 +51,8 @@
|
|||||||
public class RequestHedgingProxyProvider<T> extends
|
public class RequestHedgingProxyProvider<T> extends
|
||||||
ConfiguredFailoverProxyProvider<T> {
|
ConfiguredFailoverProxyProvider<T> {
|
||||||
|
|
||||||
private static final Log LOG =
|
public static final Logger LOG =
|
||||||
LogFactory.getLog(RequestHedgingProxyProvider.class);
|
LoggerFactory.getLogger(RequestHedgingProxyProvider.class);
|
||||||
|
|
||||||
class RequestHedgingInvocationHandler implements InvocationHandler {
|
class RequestHedgingInvocationHandler implements InvocationHandler {
|
||||||
|
|
||||||
@ -100,6 +100,8 @@ public RequestHedgingInvocationHandler(
|
|||||||
Callable<Object> c = new Callable<Object>() {
|
Callable<Object> c = new Callable<Object>() {
|
||||||
@Override
|
@Override
|
||||||
public Object call() throws Exception {
|
public Object call() throws Exception {
|
||||||
|
LOG.trace("Invoking method {} on proxy {}", method,
|
||||||
|
pEntry.getValue().proxyInfo);
|
||||||
return method.invoke(pEntry.getValue().proxy, args);
|
return method.invoke(pEntry.getValue().proxy, args);
|
||||||
}
|
}
|
||||||
};
|
};
|
||||||
@ -114,15 +116,14 @@ public Object call() throws Exception {
|
|||||||
try {
|
try {
|
||||||
retVal = callResultFuture.get();
|
retVal = callResultFuture.get();
|
||||||
successfulProxy = proxyMap.get(callResultFuture);
|
successfulProxy = proxyMap.get(callResultFuture);
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("Invocation successful on [{}]",
|
||||||
LOG.debug("Invocation successful on ["
|
successfulProxy.proxyInfo);
|
||||||
+ successfulProxy.proxyInfo + "]");
|
|
||||||
}
|
|
||||||
return retVal;
|
return retVal;
|
||||||
} catch (Exception ex) {
|
} catch (Exception ex) {
|
||||||
ProxyInfo<T> tProxyInfo = proxyMap.get(callResultFuture);
|
ProxyInfo<T> tProxyInfo = proxyMap.get(callResultFuture);
|
||||||
logProxyException(ex, tProxyInfo.proxyInfo);
|
logProxyException(ex, tProxyInfo.proxyInfo);
|
||||||
badResults.put(tProxyInfo.proxyInfo, ex);
|
badResults.put(tProxyInfo.proxyInfo, ex);
|
||||||
|
LOG.trace("Unsuccessful invocation on [{}]", tProxyInfo.proxyInfo);
|
||||||
numAttempts--;
|
numAttempts--;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -136,6 +137,7 @@ public Object call() throws Exception {
|
|||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
if (executor != null) {
|
if (executor != null) {
|
||||||
|
LOG.trace("Shutting down threadpool executor");
|
||||||
executor.shutdownNow();
|
executor.shutdownNow();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -193,12 +195,9 @@ public synchronized void performFailover(T currentProxy) {
|
|||||||
*/
|
*/
|
||||||
private void logProxyException(Exception ex, String proxyInfo) {
|
private void logProxyException(Exception ex, String proxyInfo) {
|
||||||
if (isStandbyException(ex)) {
|
if (isStandbyException(ex)) {
|
||||||
if (LOG.isDebugEnabled()) {
|
LOG.debug("Invocation returned standby exception on [{}]", proxyInfo);
|
||||||
LOG.debug("Invocation returned standby exception on [" +
|
|
||||||
proxyInfo + "]");
|
|
||||||
}
|
|
||||||
} else {
|
} else {
|
||||||
LOG.warn("Invocation returned exception on [" + proxyInfo + "]");
|
LOG.warn("Invocation returned exception on [{}]", proxyInfo);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -31,9 +31,12 @@
|
|||||||
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
import org.apache.hadoop.hdfs.server.protocol.NamenodeProtocols;
|
||||||
import org.apache.hadoop.io.retry.MultiException;
|
import org.apache.hadoop.io.retry.MultiException;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
|
import org.apache.log4j.Level;
|
||||||
import org.junit.Assert;
|
import org.junit.Assert;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
@ -47,6 +50,11 @@ public class TestRequestHedgingProxyProvider {
|
|||||||
private URI nnUri;
|
private URI nnUri;
|
||||||
private String ns;
|
private String ns;
|
||||||
|
|
||||||
|
@BeforeClass
|
||||||
|
public static void setupClass() throws Exception {
|
||||||
|
GenericTestUtils.setLogLevel(RequestHedgingProxyProvider.LOG, Level.TRACE);
|
||||||
|
}
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() throws URISyntaxException {
|
public void setup() throws URISyntaxException {
|
||||||
ns = "mycluster-" + Time.monotonicNow();
|
ns = "mycluster-" + Time.monotonicNow();
|
||||||
@ -66,13 +74,19 @@ public void setup() throws URISyntaxException {
|
|||||||
@Test
|
@Test
|
||||||
public void testHedgingWhenOneFails() throws Exception {
|
public void testHedgingWhenOneFails() throws Exception {
|
||||||
final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class);
|
final NamenodeProtocols goodMock = Mockito.mock(NamenodeProtocols.class);
|
||||||
Mockito.when(goodMock.getStats()).thenReturn(new long[] {1});
|
Mockito.when(goodMock.getStats()).thenAnswer(new Answer<long[]>() {
|
||||||
|
@Override
|
||||||
|
public long[] answer(InvocationOnMock invocation) throws Throwable {
|
||||||
|
Thread.sleep(1000);
|
||||||
|
return new long[]{1};
|
||||||
|
}
|
||||||
|
});
|
||||||
final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class);
|
final NamenodeProtocols badMock = Mockito.mock(NamenodeProtocols.class);
|
||||||
Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!"));
|
Mockito.when(badMock.getStats()).thenThrow(new IOException("Bad mock !!"));
|
||||||
|
|
||||||
RequestHedgingProxyProvider<NamenodeProtocols> provider =
|
RequestHedgingProxyProvider<NamenodeProtocols> provider =
|
||||||
new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class,
|
new RequestHedgingProxyProvider<>(conf, nnUri, NamenodeProtocols.class,
|
||||||
createFactory(goodMock, badMock));
|
createFactory(badMock, goodMock));
|
||||||
long[] stats = provider.getProxy().proxy.getStats();
|
long[] stats = provider.getProxy().proxy.getStats();
|
||||||
Assert.assertTrue(stats.length == 1);
|
Assert.assertTrue(stats.length == 1);
|
||||||
Mockito.verify(badMock).getStats();
|
Mockito.verify(badMock).getStats();
|
||||||
|
Loading…
Reference in New Issue
Block a user