diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index f0b9b945c0..00715abda9 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -279,6 +279,8 @@ Release 0.23.3 - UNRELEASED HADOOP-8159. NetworkTopology: getLeaf should check for invalid topologies. (Colin Patrick McCabe via eli) + HADOOP-8204. TestHealthMonitor fails occasionally (todd) + BREAKDOWN OF HADOOP-7454 SUBTASKS HADOOP-7455. HA: Introduce HA Service Protocol Interface. (suresh) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java index 73e88b5e2c..b60910fe17 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java @@ -18,13 +18,10 @@ package org.apache.hadoop.ha; import java.io.IOException; -import java.net.InetSocketAddress; import java.util.Collections; import java.util.LinkedList; import java.util.List; -import javax.net.SocketFactory; - import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; @@ -32,9 +29,7 @@ import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.HealthCheckFailedException; -import org.apache.hadoop.ha.protocolPB.HAServiceProtocolClientSideTranslatorPB; import org.apache.hadoop.ipc.RPC; -import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.util.Daemon; import com.google.common.base.Preconditions; @@ -64,8 +59,8 @@ class HealthMonitor { /** The connected proxy */ private HAServiceProtocol proxy; - /** The address running the HA Service */ - private final InetSocketAddress addrToMonitor; + /** The HA service to monitor */ + private final HAServiceTarget targetToMonitor; private final Configuration conf; @@ -109,9 +104,9 @@ enum State { } - HealthMonitor(Configuration conf, InetSocketAddress addrToMonitor) { + HealthMonitor(Configuration conf, HAServiceTarget target) { + this.targetToMonitor = target; this.conf = conf; - this.addrToMonitor = addrToMonitor; this.sleepAfterDisconnectMillis = conf.getLong( HA_HM_SLEEP_AFTER_DISCONNECT_KEY, @@ -170,7 +165,7 @@ private void tryConnect() { proxy = createProxy(); } } catch (IOException e) { - LOG.warn("Could not connect to local service at " + addrToMonitor + + LOG.warn("Could not connect to local service at " + targetToMonitor + ": " + e.getMessage()); proxy = null; enterState(State.SERVICE_NOT_RESPONDING); @@ -181,10 +176,7 @@ private void tryConnect() { * Connect to the service to be monitored. Stubbed out for easier testing. */ protected HAServiceProtocol createProxy() throws IOException { - SocketFactory socketFactory = NetUtils.getDefaultSocketFactory(conf); - return new HAServiceProtocolClientSideTranslatorPB( - addrToMonitor, - conf, socketFactory, rpcTimeout); + return targetToMonitor.getProxy(conf, rpcTimeout); } private void doHealthChecks() throws InterruptedException { @@ -200,7 +192,7 @@ private void doHealthChecks() throws InterruptedException { enterState(State.SERVICE_UNHEALTHY); } catch (Throwable t) { LOG.warn("Transport-level exception trying to monitor health of " + - addrToMonitor + ": " + t.getLocalizedMessage()); + targetToMonitor + ": " + t.getLocalizedMessage()); RPC.stopProxy(proxy); proxy = null; enterState(State.SERVICE_NOT_RESPONDING); @@ -258,7 +250,7 @@ void start() { private class MonitorDaemon extends Daemon { private MonitorDaemon() { super(); - setName("Health Monitor for " + addrToMonitor); + setName("Health Monitor for " + targetToMonitor); setUncaughtExceptionHandler(new UncaughtExceptionHandler() { @Override public void uncaughtException(Thread t, Throwable e) { @@ -297,24 +289,4 @@ public void run() { static interface Callback { void enteredState(State newState); } - - /** - * Simple main() for testing. - */ - public static void main(String[] args) throws InterruptedException { - if (args.length != 1) { - System.err.println("Usage: " + HealthMonitor.class.getName() + - " "); - System.exit(1); - } - Configuration conf = new Configuration(); - - String target = args[0]; - InetSocketAddress addr = NetUtils.createSocketAddr(target); - - HealthMonitor hm = new HealthMonitor(conf, addr); - hm.start(); - hm.join(); - } - } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java index 69c4a6fde4..075117548c 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java @@ -30,10 +30,12 @@ * a mock implementation. */ class DummyHAService extends HAServiceTarget { - HAServiceState state; + volatile HAServiceState state; HAServiceProtocol proxy; NodeFencer fencer; InetSocketAddress address; + boolean isHealthy = true; + boolean actUnreachable = false; DummyHAService(HAServiceState state, InetSocketAddress address) { this.state = state; @@ -47,28 +49,41 @@ private HAServiceProtocol makeMock() { @Override public void monitorHealth() throws HealthCheckFailedException, AccessControlException, IOException { + checkUnreachable(); + if (!isHealthy) { + throw new HealthCheckFailedException("not healthy"); + } } @Override public void transitionToActive() throws ServiceFailedException, AccessControlException, IOException { + checkUnreachable(); state = HAServiceState.ACTIVE; } @Override public void transitionToStandby() throws ServiceFailedException, AccessControlException, IOException { + checkUnreachable(); state = HAServiceState.STANDBY; } @Override public HAServiceStatus getServiceStatus() throws IOException { + checkUnreachable(); HAServiceStatus ret = new HAServiceStatus(state); if (state == HAServiceState.STANDBY) { ret.setReadyToBecomeActive(); } return ret; } + + private void checkUnreachable() throws IOException { + if (actUnreachable) { + throw new IOException("Connection refused (fake)"); + } + } }); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java index 3a966c2a81..4b67fa645c 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestHealthMonitor.java @@ -20,41 +20,30 @@ import static org.junit.Assert.*; import java.io.IOException; -import java.net.InetSocketAddress; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicInteger; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; -import org.apache.hadoop.ha.HAServiceProtocol; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; -import org.apache.hadoop.ha.HealthCheckFailedException; import org.apache.hadoop.ha.HealthMonitor.Callback; import org.apache.hadoop.ha.HealthMonitor.State; import org.junit.Before; import org.junit.Test; -import org.mockito.Mockito; public class TestHealthMonitor { private static final Log LOG = LogFactory.getLog( TestHealthMonitor.class); - /* bogus address to pass to constructor - never used */ - private static final InetSocketAddress BOGUS_ADDR = - new InetSocketAddress(1); - - private HAServiceProtocol mockProxy; - /** How many times has createProxy been called */ - private volatile CountDownLatch createProxyLatch; - - /** Should throw an IOE when trying to connect */ - private volatile boolean shouldThrowOnCreateProxy = false; + private AtomicInteger createProxyCount = new AtomicInteger(0); + private volatile boolean throwOOMEOnCreate = false; private HealthMonitor hm; + + private DummyHAService svc; @Before public void setupHM() throws InterruptedException, IOException { @@ -63,30 +52,21 @@ public void setupHM() throws InterruptedException, IOException { conf.setInt(CommonConfigurationKeys.HA_HM_CHECK_INTERVAL_KEY, 50); conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50); conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50); - mockProxy = Mockito.mock(HAServiceProtocol.class); - Mockito.doReturn(new HAServiceStatus(HAServiceState.ACTIVE)) - .when(mockProxy).getServiceStatus(); - hm = new HealthMonitor(conf, BOGUS_ADDR) { + svc = new DummyHAService(HAServiceState.ACTIVE, null); + hm = new HealthMonitor(conf, svc) { @Override protected HAServiceProtocol createProxy() throws IOException { - createProxyLatch.countDown(); - if (shouldThrowOnCreateProxy) { - throw new IOException("can't connect"); + createProxyCount.incrementAndGet(); + if (throwOOMEOnCreate) { + throw new OutOfMemoryError("oome"); } - return mockProxy; + return super.createProxy(); } }; - - createProxyLatch = new CountDownLatch(1); - LOG.info("Starting health monitor"); hm.start(); - LOG.info("Waiting for proxy to be created"); - assertTrue(createProxyLatch.await(2000, TimeUnit.MILLISECONDS)); - createProxyLatch = null; - LOG.info("Waiting for HEALTHY signal"); waitForState(hm, HealthMonitor.State.SERVICE_HEALTHY); } @@ -94,27 +74,26 @@ protected HAServiceProtocol createProxy() throws IOException { @Test(timeout=15000) public void testMonitor() throws Exception { LOG.info("Mocking bad health check, waiting for UNHEALTHY"); - Mockito.doThrow(new HealthCheckFailedException("Fake health check failure")) - .when(mockProxy).monitorHealth(); + svc.isHealthy = false; waitForState(hm, HealthMonitor.State.SERVICE_UNHEALTHY); LOG.info("Returning to healthy state, waiting for HEALTHY"); - Mockito.doNothing().when(mockProxy).monitorHealth(); + svc.isHealthy = true; waitForState(hm, HealthMonitor.State.SERVICE_HEALTHY); LOG.info("Returning an IOException, as if node went down"); // should expect many rapid retries - createProxyLatch = new CountDownLatch(3); - shouldThrowOnCreateProxy = true; - Mockito.doThrow(new IOException("Connection lost (fake)")) - .when(mockProxy).monitorHealth(); + int countBefore = createProxyCount.get(); + svc.actUnreachable = true; waitForState(hm, HealthMonitor.State.SERVICE_NOT_RESPONDING); - assertTrue("Monitor should retry if createProxy throws an IOE", - createProxyLatch.await(1000, TimeUnit.MILLISECONDS)); + + // Should retry several times + while (createProxyCount.get() < countBefore + 3) { + Thread.sleep(10); + } LOG.info("Returning to healthy state, waiting for HEALTHY"); - shouldThrowOnCreateProxy = false; - Mockito.doNothing().when(mockProxy).monitorHealth(); + svc.actUnreachable = false; waitForState(hm, HealthMonitor.State.SERVICE_HEALTHY); hm.shutdown(); @@ -129,8 +108,8 @@ public void testMonitor() throws Exception { @Test(timeout=15000) public void testHealthMonitorDies() throws Exception { LOG.info("Mocking RTE in health monitor, waiting for FAILED"); - Mockito.doThrow(new OutOfMemoryError()) - .when(mockProxy).monitorHealth(); + throwOOMEOnCreate = true; + svc.actUnreachable = true; waitForState(hm, HealthMonitor.State.HEALTH_MONITOR_FAILED); hm.shutdown(); hm.join(); @@ -151,8 +130,7 @@ public void enteredState(State newState) { } }); LOG.info("Mocking bad health check, waiting for UNHEALTHY"); - Mockito.doThrow(new HealthCheckFailedException("Fake health check failure")) - .when(mockProxy).monitorHealth(); + svc.isHealthy = false; waitForState(hm, HealthMonitor.State.HEALTH_MONITOR_FAILED); }