HADOOP-8204. TestHealthMonitor fails occasionally. Contributed by Todd Lipcon.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1305199 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Todd Lipcon 2012-03-26 01:57:03 +00:00
parent 840df19cee
commit 1d5e7dde95
4 changed files with 50 additions and 83 deletions

View File

@ -279,6 +279,8 @@ Release 0.23.3 - UNRELEASED
HADOOP-8159. NetworkTopology: getLeaf should check for invalid topologies. HADOOP-8159. NetworkTopology: getLeaf should check for invalid topologies.
(Colin Patrick McCabe via eli) (Colin Patrick McCabe via eli)
HADOOP-8204. TestHealthMonitor fails occasionally (todd)
BREAKDOWN OF HADOOP-7454 SUBTASKS BREAKDOWN OF HADOOP-7454 SUBTASKS
HADOOP-7455. HA: Introduce HA Service Protocol Interface. (suresh) HADOOP-7455. HA: Introduce HA Service Protocol Interface. (suresh)

View File

@ -18,13 +18,10 @@
package org.apache.hadoop.ha; package org.apache.hadoop.ha;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.Collections; import java.util.Collections;
import java.util.LinkedList; import java.util.LinkedList;
import java.util.List; import java.util.List;
import javax.net.SocketFactory;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -32,9 +29,7 @@
import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HealthCheckFailedException; import org.apache.hadoop.ha.HealthCheckFailedException;
import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB;
import org.apache.hadoop.ipc.RPC; import org.apache.hadoop.ipc.RPC;
import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.util.Daemon; import org.apache.hadoop.util.Daemon;
import com.google.common.base.Preconditions; import com.google.common.base.Preconditions;
@ -64,8 +59,8 @@ class HealthMonitor {
/** The connected proxy */ /** The connected proxy */
private HAServiceProtocol proxy; private HAServiceProtocol proxy;
/** The address running the HA Service */ /** The HA service to monitor */
private final InetSocketAddress addrToMonitor; private final HAServiceTarget targetToMonitor;
private final Configuration conf; private final Configuration conf;
@ -109,9 +104,9 @@ enum State {
} }
HealthMonitor(Configuration conf, InetSocketAddress addrToMonitor) { HealthMonitor(Configuration conf, HAServiceTarget target) {
this.targetToMonitor = target;
this.conf = conf; this.conf = conf;
this.addrToMonitor = addrToMonitor;
this.sleepAfterDisconnectMillis = conf.getLong( this.sleepAfterDisconnectMillis = conf.getLong(
HA_HM_SLEEP_AFTER_DISCONNECT_KEY, HA_HM_SLEEP_AFTER_DISCONNECT_KEY,
@ -170,7 +165,7 @@ private void tryConnect() {
proxy = createProxy(); proxy = createProxy();
} }
} catch (IOException e) { } catch (IOException e) {
LOG.warn("Could not connect to local service at " + addrToMonitor + LOG.warn("Could not connect to local service at " + targetToMonitor +
": " + e.getMessage()); ": " + e.getMessage());
proxy = null; proxy = null;
enterState(State.SERVICE_NOT_RESPONDING); enterState(State.SERVICE_NOT_RESPONDING);
@ -181,10 +176,7 @@ private void tryConnect() {
* Connect to the service to be monitored. Stubbed out for easier testing. * Connect to the service to be monitored. Stubbed out for easier testing.
*/ */
protected HAServiceProtocol createProxy() throws IOException { protected HAServiceProtocol createProxy() throws IOException {
SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(conf); return targetToMonitor.getProxy(conf, rpcTimeout);
return new HAServiceProtocolClientSideTranslatorPB(
addrToMonitor,
conf, socketFactory, rpcTimeout);
} }
private void doHealthChecks() throws InterruptedException { private void doHealthChecks() throws InterruptedException {
@ -200,7 +192,7 @@ private void doHealthChecks() throws InterruptedException {
enterState(State.SERVICE_UNHEALTHY); enterState(State.SERVICE_UNHEALTHY);
} catch (Throwable t) { } catch (Throwable t) {
LOG.warn("Transport-level exception trying to monitor health of " + LOG.warn("Transport-level exception trying to monitor health of " +
addrToMonitor + ": " + t.getLocalizedMessage()); targetToMonitor + ": " + t.getLocalizedMessage());
RPC.stopProxy(proxy); RPC.stopProxy(proxy);
proxy = null; proxy = null;
enterState(State.SERVICE_NOT_RESPONDING); enterState(State.SERVICE_NOT_RESPONDING);
@ -258,7 +250,7 @@ void start() {
private class MonitorDaemon extends Daemon { private class MonitorDaemon extends Daemon {
private MonitorDaemon() { private MonitorDaemon() {
super(); super();
setName("Health Monitor for " + addrToMonitor); setName("Health Monitor for " + targetToMonitor);
setUncaughtExceptionHandler(new UncaughtExceptionHandler() { setUncaughtExceptionHandler(new UncaughtExceptionHandler() {
@Override @Override
public void uncaughtException(Thread t, Throwable e) { public void uncaughtException(Thread t, Throwable e) {
@ -297,24 +289,4 @@ public void run() {
static interface Callback { static interface Callback {
void enteredState(State newState); void enteredState(State newState);
} }
/**
* Simple main() for testing.
*/
public static void main(String[] args) throws InterruptedException {
if (args.length != 1) {
System.err.println("Usage: " + HealthMonitor.class.getName() +
" <addr to monitor>");
System.exit(1);
}
Configuration conf = new Configuration();
String target = args[0];
InetSocketAddress addr = NetUtils.createSocketAddr(target);
HealthMonitor hm = new HealthMonitor(conf, addr);
hm.start();
hm.join();
}
} }

View File

@ -30,10 +30,12 @@
* a mock implementation. * a mock implementation.
*/ */
class DummyHAService extends HAServiceTarget { class DummyHAService extends HAServiceTarget {
HAServiceState state; volatile HAServiceState state;
HAServiceProtocol proxy; HAServiceProtocol proxy;
NodeFencer fencer; NodeFencer fencer;
InetSocketAddress address; InetSocketAddress address;
boolean isHealthy = true;
boolean actUnreachable = false;
DummyHAService(HAServiceState state, InetSocketAddress address) { DummyHAService(HAServiceState state, InetSocketAddress address) {
this.state = state; this.state = state;
@ -47,28 +49,41 @@ private HAServiceProtocol makeMock() {
@Override @Override
public void monitorHealth() throws HealthCheckFailedException, public void monitorHealth() throws HealthCheckFailedException,
AccessControlException, IOException { AccessControlException, IOException {
checkUnreachable();
if (!isHealthy) {
throw new HealthCheckFailedException("not healthy");
}
} }
@Override @Override
public void transitionToActive() throws ServiceFailedException, public void transitionToActive() throws ServiceFailedException,
AccessControlException, IOException { AccessControlException, IOException {
checkUnreachable();
state = HAServiceState.ACTIVE; state = HAServiceState.ACTIVE;
} }
@Override @Override
public void transitionToStandby() throws ServiceFailedException, public void transitionToStandby() throws ServiceFailedException,
AccessControlException, IOException { AccessControlException, IOException {
checkUnreachable();
state = HAServiceState.STANDBY; state = HAServiceState.STANDBY;
} }
@Override @Override
public HAServiceStatus getServiceStatus() throws IOException { public HAServiceStatus getServiceStatus() throws IOException {
checkUnreachable();
HAServiceStatus ret = new HAServiceStatus(state); HAServiceStatus ret = new HAServiceStatus(state);
if (state == HAServiceState.STANDBY) { if (state == HAServiceState.STANDBY) {
ret.setReadyToBecomeActive(); ret.setReadyToBecomeActive();
} }
return ret; return ret;
} }
private void checkUnreachable() throws IOException {
if (actUnreachable) {
throw new IOException("Connection refused (fake)");
}
}
}); });
} }

View File

@ -20,42 +20,31 @@
import static org.junit.Assert.*; import static org.junit.Assert.*;
import java.io.IOException; import java.io.IOException;
import java.net.InetSocketAddress; import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.HAServiceProtocol;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HealthCheckFailedException;
import org.apache.hadoop.ha.HealthMonitor.Callback; import org.apache.hadoop.ha.HealthMonitor.Callback;
import org.apache.hadoop.ha.HealthMonitor.State; import org.apache.hadoop.ha.HealthMonitor.State;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
import org.mockito.Mockito;
public class TestHealthMonitor { public class TestHealthMonitor {
private static final Log LOG = LogFactory.getLog( private static final Log LOG = LogFactory.getLog(
TestHealthMonitor.class); TestHealthMonitor.class);
/* bogus address to pass to constructor - never used */
private static final InetSocketAddress BOGUS_ADDR =
new InetSocketAddress(1);
private HAServiceProtocol mockProxy;
/** How many times has createProxy been called */ /** How many times has createProxy been called */
private volatile CountDownLatch createProxyLatch; private AtomicInteger createProxyCount = new AtomicInteger(0);
private volatile boolean throwOOMEOnCreate = false;
/** Should throw an IOE when trying to connect */
private volatile boolean shouldThrowOnCreateProxy = false;
private HealthMonitor hm; private HealthMonitor hm;
private DummyHAService svc;
@Before @Before
public void setupHM() throws InterruptedException, IOException { public void setupHM() throws InterruptedException, IOException {
Configuration conf = new Configuration(); Configuration conf = new Configuration();
@ -63,30 +52,21 @@ public void setupHM() throws InterruptedException, IOException {
conf.setInt(CommonConfigurationKeys.HA_HM_CHECK_INTERVAL_KEY, 50); conf.setInt(CommonConfigurationKeys.HA_HM_CHECK_INTERVAL_KEY, 50);
conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50); conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50);
conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50); conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50);
mockProxy = Mockito.mock(HAServiceProtocol.class);
Mockito.doReturn(new HAServiceStatus(HAServiceState.ACTIVE))
.when(mockProxy).getServiceStatus();
hm = new HealthMonitor(conf, BOGUS_ADDR) { svc = new DummyHAService(HAServiceState.ACTIVE, null);
hm = new HealthMonitor(conf, svc) {
@Override @Override
protected HAServiceProtocol createProxy() throws IOException { protected HAServiceProtocol createProxy() throws IOException {
createProxyLatch.countDown(); createProxyCount.incrementAndGet();
if (shouldThrowOnCreateProxy) { if (throwOOMEOnCreate) {
throw new IOException("can't connect"); throw new OutOfMemoryError("oome");
} }
return mockProxy; return super.createProxy();
} }
}; };
createProxyLatch = new CountDownLatch(1);
LOG.info("Starting health monitor"); LOG.info("Starting health monitor");
hm.start(); hm.start();
LOG.info("Waiting for proxy to be created");
assertTrue(createProxyLatch.await(2000, TimeUnit.MILLISECONDS));
createProxyLatch = null;
LOG.info("Waiting for HEALTHY signal"); LOG.info("Waiting for HEALTHY signal");
waitForState(hm, HealthMonitor.State.SERVICE_HEALTHY); waitForState(hm, HealthMonitor.State.SERVICE_HEALTHY);
} }
@ -94,27 +74,26 @@ protected HAServiceProtocol createProxy() throws IOException {
@Test(timeout=15000) @Test(timeout=15000)
public void testMonitor() throws Exception { public void testMonitor() throws Exception {
LOG.info("Mocking bad health check, waiting for UNHEALTHY"); LOG.info("Mocking bad health check, waiting for UNHEALTHY");
Mockito.doThrow(new HealthCheckFailedException("Fake health check failure")) svc.isHealthy = false;
.when(mockProxy).monitorHealth();
waitForState(hm, HealthMonitor.State.SERVICE_UNHEALTHY); waitForState(hm, HealthMonitor.State.SERVICE_UNHEALTHY);
LOG.info("Returning to healthy state, waiting for HEALTHY"); LOG.info("Returning to healthy state, waiting for HEALTHY");
Mockito.doNothing().when(mockProxy).monitorHealth(); svc.isHealthy = true;
waitForState(hm, HealthMonitor.State.SERVICE_HEALTHY); waitForState(hm, HealthMonitor.State.SERVICE_HEALTHY);
LOG.info("Returning an IOException, as if node went down"); LOG.info("Returning an IOException, as if node went down");
// should expect many rapid retries // should expect many rapid retries
createProxyLatch = new CountDownLatch(3); int countBefore = createProxyCount.get();
shouldThrowOnCreateProxy = true; svc.actUnreachable = true;
Mockito.doThrow(new IOException("Connection lost (fake)"))
.when(mockProxy).monitorHealth();
waitForState(hm, HealthMonitor.State.SERVICE_NOT_RESPONDING); waitForState(hm, HealthMonitor.State.SERVICE_NOT_RESPONDING);
assertTrue("Monitor should retry if createProxy throws an IOE",
createProxyLatch.await(1000, TimeUnit.MILLISECONDS)); // Should retry several times
while (createProxyCount.get() < countBefore + 3) {
Thread.sleep(10);
}
LOG.info("Returning to healthy state, waiting for HEALTHY"); LOG.info("Returning to healthy state, waiting for HEALTHY");
shouldThrowOnCreateProxy = false; svc.actUnreachable = false;
Mockito.doNothing().when(mockProxy).monitorHealth();
waitForState(hm, HealthMonitor.State.SERVICE_HEALTHY); waitForState(hm, HealthMonitor.State.SERVICE_HEALTHY);
hm.shutdown(); hm.shutdown();
@ -129,8 +108,8 @@ public void testMonitor() throws Exception {
@Test(timeout=15000) @Test(timeout=15000)
public void testHealthMonitorDies() throws Exception { public void testHealthMonitorDies() throws Exception {
LOG.info("Mocking RTE in health monitor, waiting for FAILED"); LOG.info("Mocking RTE in health monitor, waiting for FAILED");
Mockito.doThrow(new OutOfMemoryError()) throwOOMEOnCreate = true;
.when(mockProxy).monitorHealth(); svc.actUnreachable = true;
waitForState(hm, HealthMonitor.State.HEALTH_MONITOR_FAILED); waitForState(hm, HealthMonitor.State.HEALTH_MONITOR_FAILED);
hm.shutdown(); hm.shutdown();
hm.join(); hm.join();
@ -151,8 +130,7 @@ public void enteredState(State newState) {
} }
}); });
LOG.info("Mocking bad health check, waiting for UNHEALTHY"); LOG.info("Mocking bad health check, waiting for UNHEALTHY");
Mockito.doThrow(new HealthCheckFailedException("Fake health check failure")) svc.isHealthy = false;
.when(mockProxy).monitorHealth();
waitForState(hm, HealthMonitor.State.HEALTH_MONITOR_FAILED); waitForState(hm, HealthMonitor.State.HEALTH_MONITOR_FAILED);
} }