HDFS-13388. RequestHedgingProxyProvider calls multiple configured NNs all the time. Contributed by Jinglun.
This commit is contained in:
parent
1c1ce63cda
commit
63803e7051
@ -79,6 +79,20 @@ public RequestHedgingInvocationHandler(
|
|||||||
public Object
|
public Object
|
||||||
invoke(Object proxy, final Method method, final Object[] args)
|
invoke(Object proxy, final Method method, final Object[] args)
|
||||||
throws Throwable {
|
throws Throwable {
|
||||||
|
if (currentUsedProxy != null) {
|
||||||
|
try {
|
||||||
|
Object retVal = method.invoke(currentUsedProxy.proxy, args);
|
||||||
|
LOG.debug("Invocation successful on [{}]",
|
||||||
|
currentUsedProxy.proxyInfo);
|
||||||
|
return retVal;
|
||||||
|
} catch (InvocationTargetException ex) {
|
||||||
|
Exception unwrappedException = unwrapInvocationTargetException(ex);
|
||||||
|
logProxyException(unwrappedException, currentUsedProxy.proxyInfo);
|
||||||
|
LOG.trace("Unsuccessful invocation on [{}]",
|
||||||
|
currentUsedProxy.proxyInfo);
|
||||||
|
throw unwrappedException;
|
||||||
|
}
|
||||||
|
}
|
||||||
Map<Future<Object>, ProxyInfo<T>> proxyMap = new HashMap<>();
|
Map<Future<Object>, ProxyInfo<T>> proxyMap = new HashMap<>();
|
||||||
int numAttempts = 0;
|
int numAttempts = 0;
|
||||||
|
|
||||||
|
@ -43,10 +43,15 @@
|
|||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.BeforeClass;
|
import org.junit.BeforeClass;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.test.GenericTestUtils.assertExceptionContains;
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
import org.mockito.Matchers;
|
import org.mockito.Matchers;
|
||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
import org.mockito.invocation.InvocationOnMock;
|
import org.mockito.invocation.InvocationOnMock;
|
||||||
import org.mockito.stubbing.Answer;
|
import org.mockito.stubbing.Answer;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
|
||||||
import com.google.common.collect.Lists;
|
import com.google.common.collect.Lists;
|
||||||
|
|
||||||
@ -99,6 +104,80 @@ public long[] answer(InvocationOnMock invocation) throws Throwable {
|
|||||||
Mockito.verify(goodMock).getStats();
|
Mockito.verify(goodMock).getStats();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRequestNNAfterOneSuccess() throws Exception {
|
||||||
|
final AtomicInteger goodCount = new AtomicInteger(0);
|
||||||
|
final AtomicInteger badCount = new AtomicInteger(0);
|
||||||
|
final ClientProtocol goodMock = mock(ClientProtocol.class);
|
||||||
|
when(goodMock.getStats()).thenAnswer(new Answer<long[]>() {
|
||||||
|
@Override
|
||||||
|
public long[] answer(InvocationOnMock invocation) throws Throwable {
|
||||||
|
goodCount.incrementAndGet();
|
||||||
|
Thread.sleep(1000);
|
||||||
|
return new long[]{1};
|
||||||
|
}
|
||||||
|
});
|
||||||
|
final ClientProtocol badMock = mock(ClientProtocol.class);
|
||||||
|
when(badMock.getStats()).thenAnswer(new Answer<long[]>() {
|
||||||
|
@Override
|
||||||
|
public long[] answer(InvocationOnMock invocation) throws Throwable {
|
||||||
|
badCount.incrementAndGet();
|
||||||
|
throw new IOException("Bad mock !!");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
RequestHedgingProxyProvider<ClientProtocol> provider =
|
||||||
|
new RequestHedgingProxyProvider<>(conf, nnUri, ClientProtocol.class,
|
||||||
|
createFactory(badMock, goodMock));
|
||||||
|
ClientProtocol proxy = provider.getProxy().proxy;
|
||||||
|
proxy.getStats();
|
||||||
|
assertEquals(1, goodCount.get());
|
||||||
|
assertEquals(1, badCount.get());
|
||||||
|
// We will only use the successful proxy after a successful invocation.
|
||||||
|
proxy.getStats();
|
||||||
|
assertEquals(2, goodCount.get());
|
||||||
|
assertEquals(1, badCount.get());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testExceptionInfo() throws Exception {
|
||||||
|
final ClientProtocol goodMock = mock(ClientProtocol.class);
|
||||||
|
when(goodMock.getStats()).thenAnswer(new Answer<long[]>() {
|
||||||
|
private boolean first = true;
|
||||||
|
@Override
|
||||||
|
public long[] answer(InvocationOnMock invocation)
|
||||||
|
throws Throwable {
|
||||||
|
if (first) {
|
||||||
|
Thread.sleep(1000);
|
||||||
|
first = false;
|
||||||
|
return new long[] {1};
|
||||||
|
} else {
|
||||||
|
throw new IOException("Expected Exception Info");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
});
|
||||||
|
final ClientProtocol badMock = mock(ClientProtocol.class);
|
||||||
|
when(badMock.getStats()).thenAnswer(new Answer<long[]>() {
|
||||||
|
@Override
|
||||||
|
public long[] answer(InvocationOnMock invocation)
|
||||||
|
throws Throwable {
|
||||||
|
throw new IOException("Bad Mock! This is Standby!");
|
||||||
|
}
|
||||||
|
});
|
||||||
|
|
||||||
|
RequestHedgingProxyProvider<ClientProtocol> provider =
|
||||||
|
new RequestHedgingProxyProvider<>(conf, nnUri, ClientProtocol.class,
|
||||||
|
createFactory(badMock, goodMock));
|
||||||
|
ClientProtocol proxy = provider.getProxy().proxy;
|
||||||
|
proxy.getStats();
|
||||||
|
// Test getting the exception when the successful proxy encounters one.
|
||||||
|
try {
|
||||||
|
proxy.getStats();
|
||||||
|
} catch (IOException e) {
|
||||||
|
assertExceptionContains("Expected Exception Info", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testHedgingWhenOneIsSlow() throws Exception {
|
public void testHedgingWhenOneIsSlow() throws Exception {
|
||||||
final ClientProtocol goodMock = Mockito.mock(ClientProtocol.class);
|
final ClientProtocol goodMock = Mockito.mock(ClientProtocol.class);
|
||||||
|
Loading…
Reference in New Issue
Block a user