HADOOP-7888. TestFailoverProxy fails intermittently on trunk. Contributed by Jason Lowe.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1211728 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Aaron Myers 2011-12-08 01:21:24 +00:00
parent 58361d3f34
commit d9690b0922
3 changed files with 35 additions and 32 deletions

View File

@ -120,6 +120,9 @@ Trunk (unreleased changes)
HADOOP-7887. KerberosAuthenticatorHandler is not setting HADOOP-7887. KerberosAuthenticatorHandler is not setting
KerberosName name rules from configuration. (tucu) KerberosName name rules from configuration. (tucu)
HADOOP-7888. TestFailoverProxy fails intermittently on trunk. (Jason Lowe
via atm)
OPTIMIZATIONS OPTIMIZATIONS
HADOOP-7761. Improve the performance of raw comparisons. (todd) HADOOP-7761. Improve the performance of raw comparisons. (todd)

View File

@ -103,15 +103,12 @@ public Object invoke(Object proxy, Method method, Object[] args)
if (invocationAttemptFailoverCount == proxyProviderFailoverCount) { if (invocationAttemptFailoverCount == proxyProviderFailoverCount) {
proxyProvider.performFailover(currentProxy); proxyProvider.performFailover(currentProxy);
proxyProviderFailoverCount++; proxyProviderFailoverCount++;
currentProxy = proxyProvider.getProxy();
} else { } else {
LOG.warn("A failover has occurred since the start of this method" LOG.warn("A failover has occurred since the start of this method"
+ " invocation attempt."); + " invocation attempt.");
} }
} }
// The call to getProxy() could technically only be made in the event
// performFailover() is called, but it needs to be out here for the
// purpose of testing.
currentProxy = proxyProvider.getProxy();
invocationFailoverCount++; invocationFailoverCount++;
} }
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {

View File

@ -36,34 +36,18 @@ public static class FlipFlopProxyProvider implements FailoverProxyProvider {
private Object impl1; private Object impl1;
private Object impl2; private Object impl2;
private boolean latchEnabled = false;
private CountDownLatch getProxyLatch;
private int failoversOccurred = 0; private int failoversOccurred = 0;
public FlipFlopProxyProvider(Class<?> iface, Object activeImpl, public FlipFlopProxyProvider(Class<?> iface, Object activeImpl,
Object standbyImpl, int getProxyCountDown) { Object standbyImpl) {
this.iface = iface; this.iface = iface;
this.impl1 = activeImpl; this.impl1 = activeImpl;
this.impl2 = standbyImpl; this.impl2 = standbyImpl;
currentlyActive = impl1; currentlyActive = impl1;
getProxyLatch = new CountDownLatch(getProxyCountDown);
}
public FlipFlopProxyProvider(Class<?> iface, Object activeImpl,
Object standbyImpl) {
this(iface, activeImpl, standbyImpl, 0);
} }
@Override @Override
public Object getProxy() { public Object getProxy() {
if (latchEnabled) {
getProxyLatch.countDown();
try {
getProxyLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
}
return currentlyActive; return currentlyActive;
} }
@ -83,10 +67,6 @@ public void close() throws IOException {
// Nothing to do. // Nothing to do.
} }
public void setLatchEnabled(boolean latchEnabled) {
this.latchEnabled = latchEnabled;
}
public int getFailoversOccurred() { public int getFailoversOccurred() {
return failoversOccurred; return failoversOccurred;
} }
@ -214,6 +194,32 @@ public void testFailoverOnNetworkExceptionIdempotentOperation()
assertEquals("impl2", unreliable.succeedsOnceThenFailsReturningStringIdempotent()); assertEquals("impl2", unreliable.succeedsOnceThenFailsReturningStringIdempotent());
} }
private static class SynchronizedUnreliableImplementation extends UnreliableImplementation {
private CountDownLatch methodLatch;
public SynchronizedUnreliableImplementation(String identifier,
TypeOfExceptionToFailWith exceptionToFailWith, int threadCount) {
super(identifier, exceptionToFailWith);
methodLatch = new CountDownLatch(threadCount);
}
@Override
public String failsIfIdentifierDoesntMatch(String identifier)
throws UnreliableException, StandbyException, IOException {
// Wait until all threads are trying to invoke this method
methodLatch.countDown();
try {
methodLatch.await();
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
return super.failsIfIdentifierDoesntMatch(identifier);
}
}
private static class ConcurrentMethodThread extends Thread { private static class ConcurrentMethodThread extends Thread {
private UnreliableInterface unreliable; private UnreliableInterface unreliable;
@ -240,11 +246,11 @@ public void run() {
public void testConcurrentMethodFailures() throws InterruptedException { public void testConcurrentMethodFailures() throws InterruptedException {
FlipFlopProxyProvider proxyProvider = new FlipFlopProxyProvider( FlipFlopProxyProvider proxyProvider = new FlipFlopProxyProvider(
UnreliableInterface.class, UnreliableInterface.class,
new UnreliableImplementation("impl1", new SynchronizedUnreliableImplementation("impl1",
TypeOfExceptionToFailWith.STANDBY_EXCEPTION), TypeOfExceptionToFailWith.STANDBY_EXCEPTION,
2),
new UnreliableImplementation("impl2", new UnreliableImplementation("impl2",
TypeOfExceptionToFailWith.STANDBY_EXCEPTION), TypeOfExceptionToFailWith.STANDBY_EXCEPTION));
2);
final UnreliableInterface unreliable = (UnreliableInterface)RetryProxy final UnreliableInterface unreliable = (UnreliableInterface)RetryProxy
.create(UnreliableInterface.class, proxyProvider, .create(UnreliableInterface.class, proxyProvider,
@ -253,9 +259,6 @@ public void testConcurrentMethodFailures() throws InterruptedException {
ConcurrentMethodThread t1 = new ConcurrentMethodThread(unreliable); ConcurrentMethodThread t1 = new ConcurrentMethodThread(unreliable);
ConcurrentMethodThread t2 = new ConcurrentMethodThread(unreliable); ConcurrentMethodThread t2 = new ConcurrentMethodThread(unreliable);
// Getting a proxy will now wait on a latch.
proxyProvider.setLatchEnabled(true);
t1.start(); t1.start();
t2.start(); t2.start();
t1.join(); t1.join();