From 543701387fc0054082a17da15e85e9e8f8025801 Mon Sep 17 00:00:00 2001 From: Todd Lipcon Date: Fri, 30 Mar 2012 20:30:47 +0000 Subject: [PATCH] HADOOP-8228. Auto HA: Refactor tests and add stress tests. Contributed by Todd Lipcon. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3042@1307599 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.HDFS-3042.txt | 1 + .../hadoop/ha/ActiveStandbyElector.java | 10 +- .../hadoop/ha/ZKFailoverController.java | 7 +- .../org/apache/hadoop/ha/DummyHAService.java | 40 +- .../apache/hadoop/ha/DummySharedResource.java | 52 +++ .../org/apache/hadoop/ha/MiniZKFCCluster.java | 279 ++++++++++++++ .../hadoop/ha/TestZKFailoverController.java | 364 +++++------------- .../ha/TestZKFailoverControllerStress.java | 178 +++++++++ 8 files changed, 654 insertions(+), 277 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummySharedResource.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java diff --git a/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt b/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt index a6ff9074e4..d180932ee3 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt @@ -6,3 +6,4 @@ branch is merged. HADOOP-8220. ZKFailoverController doesn't handle failure to become active correctly (todd) +HADOOP-8228. Auto HA: Refactor tests and add stress tests. (todd) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java index 46023c5c3a..16c37a7972 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java @@ -140,7 +140,7 @@ public interface ActiveStandbyElectorCallback { public static final Log LOG = LogFactory.getLog(ActiveStandbyElector.class); - private static final int NUM_RETRIES = 3; + static int NUM_RETRIES = 3; private static final int SLEEP_AFTER_FAILURE_TO_BECOME_ACTIVE = 1000; private static enum ConnectionState { @@ -662,8 +662,12 @@ void allowSessionReestablishmentForTests() { } @VisibleForTesting - long getZKSessionIdForTests() { - return zkClient.getSessionId(); + synchronized long getZKSessionIdForTests() { + if (zkClient != null) { + return zkClient.getSessionId(); + } else { + return -1; + } } @VisibleForTesting diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java index 98c3f0d5c1..917c9d32b4 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java @@ -146,7 +146,12 @@ private int doRun(String[] args) } initHM(); - mainLoop(); + try { + mainLoop(); + } finally { + healthMonitor.shutdown(); + healthMonitor.join(); + } return 0; } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java index a8cd3d644f..3e0bb9eb12 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java @@ -22,6 +22,8 @@ import java.net.InetSocketAddress; import java.util.ArrayList; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.security.AccessControlException; @@ -34,6 +36,7 @@ * a mock implementation. */ class DummyHAService extends HAServiceTarget { + public static final Log LOG = LogFactory.getLog(DummyHAService.class); volatile HAServiceState state; HAServiceProtocol proxy; NodeFencer fencer; @@ -42,13 +45,21 @@ class DummyHAService extends HAServiceTarget { boolean actUnreachable = false; boolean failToBecomeActive; + DummySharedResource sharedResource; + static ArrayList instances = Lists.newArrayList(); int index; DummyHAService(HAServiceState state, InetSocketAddress address) { this.state = state; this.proxy = makeMock(); - this.fencer = Mockito.mock(NodeFencer.class); + try { + Configuration conf = new Configuration(); + conf.set(NodeFencer.CONF_METHODS_KEY, DummyFencer.class.getName()); + this.fencer = Mockito.spy(NodeFencer.create(conf)); + } catch (BadFencingConfigurationException e) { + throw new RuntimeException(e); + } this.address = address; synchronized (instances) { instances.add(this); @@ -56,6 +67,10 @@ class DummyHAService extends HAServiceTarget { } } + public void setSharedResource(DummySharedResource rsrc) { + this.sharedResource = rsrc; + } + private HAServiceProtocol makeMock() { return Mockito.spy(new MockHAProtocolImpl()); } @@ -107,7 +122,9 @@ public void transitionToActive() throws ServiceFailedException, if (failToBecomeActive) { throw new ServiceFailedException("injected failure"); } - + if (sharedResource != null) { + sharedResource.take(DummyHAService.this); + } state = HAServiceState.ACTIVE; } @@ -115,6 +132,9 @@ public void transitionToActive() throws ServiceFailedException, public void transitionToStandby() throws ServiceFailedException, AccessControlException, IOException { checkUnreachable(); + if (sharedResource != null) { + sharedResource.release(DummyHAService.this); + } state = HAServiceState.STANDBY; } @@ -138,4 +158,20 @@ private void checkUnreachable() throws IOException { public void close() throws IOException { } } + + public static class DummyFencer implements FenceMethod { + + public void checkArgs(String args) throws BadFencingConfigurationException { + } + + @Override + public boolean tryFence(HAServiceTarget target, String args) + throws BadFencingConfigurationException { + LOG.info("tryFence(" + target + ")"); + DummyHAService svc = (DummyHAService)target; + svc.sharedResource.release(svc); + return true; + } + } + } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummySharedResource.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummySharedResource.java new file mode 100644 index 0000000000..a7cf41dd99 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummySharedResource.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ha; + +import org.junit.Assert; + +/** + * A fake shared resource, for use in automatic failover testing. + * This simulates a real shared resource like a shared edit log. + * When the {@link DummyHAService} instances change state or get + * fenced, they notify the shared resource, which asserts that + * we never have two HA services who think they're holding the + * resource at the same time. + */ +public class DummySharedResource { + private DummyHAService holder = null; + private int violations = 0; + + public synchronized void take(DummyHAService newHolder) { + if (holder == null || holder == newHolder) { + holder = newHolder; + } else { + violations++; + throw new IllegalStateException("already held by: " + holder); + } + } + + public synchronized void release(DummyHAService oldHolder) { + if (holder == oldHolder) { + holder = null; + } + } + + public synchronized void assertNoViolations() { + Assert.assertEquals(0, violations); + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java new file mode 100644 index 0000000000..fa8ea10b7c --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/MiniZKFCCluster.java @@ -0,0 +1,279 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ha; + +import static org.junit.Assert.assertArrayEquals; +import static org.junit.Assert.assertEquals; + +import java.net.InetSocketAddress; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import org.apache.hadoop.ha.HealthMonitor.State; +import org.apache.hadoop.test.MultithreadedTestUtil.TestContext; +import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.server.ZooKeeperServer; + +import com.google.common.base.Preconditions; +import com.google.common.primitives.Ints; + +/** + * Harness for starting two dummy ZK FailoverControllers, associated with + * DummyHAServices. This harness starts two such ZKFCs, designated by + * indexes 0 and 1, and provides utilities for building tests around them. + */ +public class MiniZKFCCluster { + private final TestContext ctx; + private final ZooKeeperServer zks; + + private DummyHAService svcs[]; + private DummyZKFCThread thrs[]; + private Configuration conf; + + private DummySharedResource sharedResource = new DummySharedResource(); + + private static final Log LOG = LogFactory.getLog(MiniZKFCCluster.class); + + public MiniZKFCCluster(Configuration conf, ZooKeeperServer zks) { + this.conf = conf; + // Fast check interval so tests run faster + conf.setInt(CommonConfigurationKeys.HA_HM_CHECK_INTERVAL_KEY, 50); + conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50); + conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50); + svcs = new DummyHAService[2]; + svcs[0] = new DummyHAService(HAServiceState.INITIALIZING, + new InetSocketAddress("svc1", 1234)); + svcs[0].setSharedResource(sharedResource); + svcs[1] = new DummyHAService(HAServiceState.INITIALIZING, + new InetSocketAddress("svc2", 1234)); + svcs[1].setSharedResource(sharedResource); + + this.ctx = new TestContext(); + this.zks = zks; + } + + /** + * Set up two services and their failover controllers. svc1 is started + * first, so that it enters ACTIVE state, and then svc2 is started, + * which enters STANDBY + */ + public void start() throws Exception { + // Format the base dir, should succeed + thrs = new DummyZKFCThread[2]; + thrs[0] = new DummyZKFCThread(ctx, svcs[0]); + assertEquals(0, thrs[0].zkfc.run(new String[]{"-formatZK"})); + ctx.addThread(thrs[0]); + thrs[0].start(); + + LOG.info("Waiting for svc0 to enter active state"); + waitForHAState(0, HAServiceState.ACTIVE); + + LOG.info("Adding svc1"); + thrs[1] = new DummyZKFCThread(ctx, svcs[1]); + thrs[1].start(); + waitForHAState(1, HAServiceState.STANDBY); + } + + /** + * Stop the services. + * @throws Exception if either of the services had encountered a fatal error + */ + public void stop() throws Exception { + for (DummyZKFCThread thr : thrs) { + if (thr != null) { + thr.interrupt(); + } + } + if (ctx != null) { + ctx.stop(); + } + sharedResource.assertNoViolations(); + } + + /** + * @return the TestContext implementation used internally. This allows more + * threads to be added to the context, etc. + */ + public TestContext getTestContext() { + return ctx; + } + + public DummyHAService getService(int i) { + return svcs[i]; + } + + public ActiveStandbyElector getElector(int i) { + return thrs[i].zkfc.getElectorForTests(); + } + + public void setHealthy(int idx, boolean healthy) { + svcs[idx].isHealthy = healthy; + } + + public void setFailToBecomeActive(int idx, boolean doFail) { + svcs[idx].failToBecomeActive = doFail; + } + + public void setUnreachable(int idx, boolean unreachable) { + svcs[idx].actUnreachable = unreachable; + } + + /** + * Wait for the given HA service to enter the given HA state. + */ + public void waitForHAState(int idx, HAServiceState state) + throws Exception { + DummyHAService svc = getService(idx); + while (svc.state != state) { + ctx.checkException(); + Thread.sleep(50); + } + } + + /** + * Wait for the ZKFC to be notified of a change in health state. + */ + public void waitForHealthState(int idx, State state) + throws Exception { + ZKFailoverController zkfc = thrs[idx].zkfc; + while (zkfc.getLastHealthState() != state) { + ctx.checkException(); + Thread.sleep(50); + } + } + + /** + * Wait for the given elector to enter the given elector state. + * @param idx the service index (0 or 1) + * @param state the state to wait for + * @throws Exception if it times out, or an exception occurs on one + * of the ZKFC threads while waiting. + */ + public void waitForElectorState(int idx, + ActiveStandbyElector.State state) throws Exception { + ActiveStandbyElectorTestUtil.waitForElectorState(ctx, + getElector(idx), state); + } + + + + /** + * Expire the ZK session of the given service. This requires + * (and asserts) that the given service be the current active. + * @throws NoNodeException if no service holds the lock + */ + public void expireActiveLockHolder(int idx) + throws NoNodeException { + Stat stat = new Stat(); + byte[] data = zks.getZKDatabase().getData( + ZKFailoverController.ZK_PARENT_ZNODE_DEFAULT + "/" + + ActiveStandbyElector.LOCK_FILENAME, stat, null); + + assertArrayEquals(Ints.toByteArray(svcs[idx].index), data); + long session = stat.getEphemeralOwner(); + LOG.info("Expiring svc " + idx + "'s zookeeper session " + session); + zks.closeSession(session); + } + + + /** + * Wait for the given HA service to become the active lock holder. + * If the passed svc is null, waits for there to be no active + * lock holder. + */ + public void waitForActiveLockHolder(Integer idx) + throws Exception { + DummyHAService svc = idx == null ? null : svcs[idx]; + ActiveStandbyElectorTestUtil.waitForActiveLockData(ctx, zks, + ZKFailoverController.ZK_PARENT_ZNODE_DEFAULT, + (idx == null) ? null : Ints.toByteArray(svc.index)); + } + + + /** + * Expires the ZK session associated with service 'fromIdx', and waits + * until service 'toIdx' takes over. + * @throws Exception if the target service does not become active + */ + public void expireAndVerifyFailover(int fromIdx, int toIdx) + throws Exception { + Preconditions.checkArgument(fromIdx != toIdx); + + getElector(fromIdx).preventSessionReestablishmentForTests(); + try { + expireActiveLockHolder(fromIdx); + + waitForHAState(fromIdx, HAServiceState.STANDBY); + waitForHAState(toIdx, HAServiceState.ACTIVE); + } finally { + getElector(fromIdx).allowSessionReestablishmentForTests(); + } + } + + /** + * Test-thread which runs a ZK Failover Controller corresponding + * to a given dummy service. + */ + private class DummyZKFCThread extends TestingThread { + private final DummyZKFC zkfc; + + public DummyZKFCThread(TestContext ctx, DummyHAService svc) { + super(ctx); + this.zkfc = new DummyZKFC(svc); + zkfc.setConf(conf); + } + + @Override + public void doWork() throws Exception { + try { + assertEquals(0, zkfc.run(new String[0])); + } catch (InterruptedException ie) { + // Interrupted by main thread, that's OK. + } + } + } + + static class DummyZKFC extends ZKFailoverController { + private final DummyHAService localTarget; + + public DummyZKFC(DummyHAService localTarget) { + this.localTarget = localTarget; + } + + @Override + protected byte[] targetToData(HAServiceTarget target) { + return Ints.toByteArray(((DummyHAService)target).index); + } + + @Override + protected HAServiceTarget dataToTarget(byte[] data) { + int index = Ints.fromByteArray(data); + return DummyHAService.getInstance(index); + } + + @Override + protected HAServiceTarget getLocalTarget() { + return localTarget; + } + } +} \ No newline at end of file diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java index d90b8d0e46..06808b3e1f 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java @@ -17,36 +17,24 @@ */ package org.apache.hadoop.ha; -import static org.junit.Assert.*; +import static org.junit.Assert.assertEquals; import java.io.File; -import java.net.InetSocketAddress; import org.apache.commons.logging.impl.Log4JLogger; import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.HealthMonitor.State; -import org.apache.hadoop.test.MultithreadedTestUtil; -import org.apache.hadoop.test.MultithreadedTestUtil.TestContext; -import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread; +import org.apache.hadoop.ha.MiniZKFCCluster.DummyZKFC; import org.apache.log4j.Level; -import org.apache.zookeeper.KeeperException.NoNodeException; -import org.apache.zookeeper.data.Stat; -import org.apache.zookeeper.server.ZooKeeperServer; import org.apache.zookeeper.test.ClientBase; import org.junit.Before; import org.junit.Test; import org.mockito.Mockito; -import com.google.common.primitives.Ints; - public class TestZKFailoverController extends ClientBase { private Configuration conf; - private DummyHAService svc1; - private DummyHAService svc2; - private TestContext ctx; - private DummyZKFCThread thr1, thr2; + private MiniZKFCCluster cluster; static { ((Log4JLogger)ActiveStandbyElector.LOG).getLogger().setLevel(Level.ALL); @@ -63,49 +51,7 @@ public void setUp() throws Exception { public void setupConfAndServices() { conf = new Configuration(); conf.set(ZKFailoverController.ZK_QUORUM_KEY, hostPort); - // Fast check interval so tests run faster - conf.setInt(CommonConfigurationKeys.HA_HM_CHECK_INTERVAL_KEY, 50); - conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50); - conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50); - svc1 = new DummyHAService(HAServiceState.INITIALIZING, - new InetSocketAddress("svc1", 1234)); - svc2 = new DummyHAService(HAServiceState.INITIALIZING, - new InetSocketAddress("svc2", 1234)); - } - - /** - * Set up two services and their failover controllers. svc1 is started - * first, so that it enters ACTIVE state, and then svc2 is started, - * which enters STANDBY - */ - private void setupFCs() throws Exception { - // Format the base dir, should succeed - assertEquals(0, runFC(svc1, "-formatZK")); - - ctx = new MultithreadedTestUtil.TestContext(); - thr1 = new DummyZKFCThread(ctx, svc1); - ctx.addThread(thr1); - thr1.start(); - - LOG.info("Waiting for svc1 to enter active state"); - waitForHAState(svc1, HAServiceState.ACTIVE); - - LOG.info("Adding svc2"); - thr2 = new DummyZKFCThread(ctx, svc2); - thr2.start(); - waitForHAState(svc2, HAServiceState.STANDBY); - } - - private void stopFCs() throws Exception { - if (thr1 != null) { - thr1.interrupt(); - } - if (thr2 != null) { - thr2.interrupt(); - } - if (ctx != null) { - ctx.stop(); - } + this.cluster = new MiniZKFCCluster(conf, getServer(serverFactory)); } /** @@ -114,20 +60,21 @@ private void stopFCs() throws Exception { */ @Test(timeout=15000) public void testFormatZK() throws Exception { + DummyHAService svc = cluster.getService(1); // Run without formatting the base dir, // should barf assertEquals(ZKFailoverController.ERR_CODE_NO_PARENT_ZNODE, - runFC(svc1)); + runFC(svc)); // Format the base dir, should succeed - assertEquals(0, runFC(svc1, "-formatZK")); + assertEquals(0, runFC(svc, "-formatZK")); // Should fail to format if already formatted assertEquals(ZKFailoverController.ERR_CODE_FORMAT_DENIED, - runFC(svc1, "-formatZK", "-nonInteractive")); + runFC(svc, "-formatZK", "-nonInteractive")); // Unless '-force' is on - assertEquals(0, runFC(svc1, "-formatZK", "-force")); + assertEquals(0, runFC(svc, "-formatZK", "-force")); } /** @@ -136,14 +83,14 @@ public void testFormatZK() throws Exception { */ @Test(timeout=15000) public void testFencingMustBeConfigured() throws Exception { - svc1 = Mockito.spy(svc1); + DummyHAService svc = Mockito.spy(cluster.getService(0)); Mockito.doThrow(new BadFencingConfigurationException("no fencing")) - .when(svc1).checkFencingConfigured(); + .when(svc).checkFencingConfigured(); // Format the base dir, should succeed - assertEquals(0, runFC(svc1, "-formatZK")); + assertEquals(0, runFC(svc, "-formatZK")); // Try to run the actual FC, should fail without a fencer assertEquals(ZKFailoverController.ERR_CODE_NO_FENCER, - runFC(svc1)); + runFC(svc)); } /** @@ -155,66 +102,50 @@ public void testFencingMustBeConfigured() throws Exception { @Test(timeout=15000) public void testAutoFailoverOnBadHealth() throws Exception { try { - setupFCs(); + cluster.start(); + DummyHAService svc1 = cluster.getService(1); - LOG.info("Faking svc1 unhealthy, should failover to svc2"); - svc1.isHealthy = false; - LOG.info("Waiting for svc1 to enter standby state"); - waitForHAState(svc1, HAServiceState.STANDBY); - waitForHAState(svc2, HAServiceState.ACTIVE); + LOG.info("Faking svc0 unhealthy, should failover to svc1"); + cluster.setHealthy(0, false); + + LOG.info("Waiting for svc0 to enter standby state"); + cluster.waitForHAState(0, HAServiceState.STANDBY); + cluster.waitForHAState(1, HAServiceState.ACTIVE); - LOG.info("Allowing svc1 to be healthy again, making svc2 unreachable " + + LOG.info("Allowing svc0 to be healthy again, making svc1 unreachable " + "and fail to gracefully go to standby"); - svc1.isHealthy = true; - svc2.actUnreachable = true; - - // Allow fencing to succeed - Mockito.doReturn(true).when(svc2.fencer).fence(Mockito.same(svc2)); - // Should fail back to svc1 at this point - waitForHAState(svc1, HAServiceState.ACTIVE); - // and fence svc2 - Mockito.verify(svc2.fencer).fence(Mockito.same(svc2)); + cluster.setUnreachable(1, true); + cluster.setHealthy(0, true); + + // Should fail back to svc0 at this point + cluster.waitForHAState(0, HAServiceState.ACTIVE); + // and fence svc1 + Mockito.verify(svc1.fencer).fence(Mockito.same(svc1)); } finally { - stopFCs(); + cluster.stop(); } } @Test(timeout=15000) public void testAutoFailoverOnLostZKSession() throws Exception { try { - setupFCs(); + cluster.start(); - // Expire svc1, it should fail over to svc2 - expireAndVerifyFailover(thr1, thr2); + // Expire svc0, it should fail over to svc1 + cluster.expireAndVerifyFailover(0, 1); - // Expire svc2, it should fail back to svc1 - expireAndVerifyFailover(thr2, thr1); + // Expire svc1, it should fail back to svc0 + cluster.expireAndVerifyFailover(1, 0); LOG.info("======= Running test cases second time to test " + "re-establishment ========="); - // Expire svc1, it should fail over to svc2 - expireAndVerifyFailover(thr1, thr2); + // Expire svc0, it should fail over to svc1 + cluster.expireAndVerifyFailover(0, 1); - // Expire svc2, it should fail back to svc1 - expireAndVerifyFailover(thr2, thr1); + // Expire svc1, it should fail back to svc0 + cluster.expireAndVerifyFailover(1, 0); } finally { - stopFCs(); - } - } - - private void expireAndVerifyFailover(DummyZKFCThread fromThr, - DummyZKFCThread toThr) throws Exception { - DummyHAService fromSvc = fromThr.zkfc.localTarget; - DummyHAService toSvc = toThr.zkfc.localTarget; - - fromThr.zkfc.getElectorForTests().preventSessionReestablishmentForTests(); - try { - expireActiveLockHolder(fromSvc); - - waitForHAState(fromSvc, HAServiceState.STANDBY); - waitForHAState(toSvc, HAServiceState.ACTIVE); - } finally { - fromThr.zkfc.getElectorForTests().allowSessionReestablishmentForTests(); + cluster.stop(); } } @@ -225,33 +156,32 @@ private void expireAndVerifyFailover(DummyZKFCThread fromThr, @Test(timeout=15000) public void testDontFailoverToUnhealthyNode() throws Exception { try { - setupFCs(); + cluster.start(); - // Make svc2 unhealthy, and wait for its FC to notice the bad health. - svc2.isHealthy = false; - waitForHealthState(thr2.zkfc, - HealthMonitor.State.SERVICE_UNHEALTHY); + // Make svc1 unhealthy, and wait for its FC to notice the bad health. + cluster.setHealthy(1, false); + cluster.waitForHealthState(1, HealthMonitor.State.SERVICE_UNHEALTHY); - // Expire svc1 - thr1.zkfc.getElectorForTests().preventSessionReestablishmentForTests(); + // Expire svc0 + cluster.getElector(0).preventSessionReestablishmentForTests(); try { - expireActiveLockHolder(svc1); + cluster.expireActiveLockHolder(0); - LOG.info("Expired svc1's ZK session. Waiting a second to give svc2" + + LOG.info("Expired svc0's ZK session. Waiting a second to give svc1" + " a chance to take the lock, if it is ever going to."); Thread.sleep(1000); // Ensure that no one holds the lock. - waitForActiveLockHolder(null); + cluster.waitForActiveLockHolder(null); } finally { - LOG.info("Allowing svc1's elector to re-establish its connection"); - thr1.zkfc.getElectorForTests().allowSessionReestablishmentForTests(); + LOG.info("Allowing svc0's elector to re-establish its connection"); + cluster.getElector(0).allowSessionReestablishmentForTests(); } - // svc1 should get the lock again - waitForActiveLockHolder(svc1); + // svc0 should get the lock again + cluster.waitForActiveLockHolder(0); } finally { - stopFCs(); + cluster.stop(); } } @@ -262,36 +192,38 @@ public void testDontFailoverToUnhealthyNode() throws Exception { @Test(timeout=15000) public void testBecomingActiveFails() throws Exception { try { - setupFCs(); + cluster.start(); + DummyHAService svc1 = cluster.getService(1); - LOG.info("Making svc2 fail to become active"); - svc2.failToBecomeActive = true; + LOG.info("Making svc1 fail to become active"); + cluster.setFailToBecomeActive(1, true); - LOG.info("Faking svc1 unhealthy, should NOT successfully " + - "failover to svc2"); - svc1.isHealthy = false; - waitForHealthState(thr1.zkfc, State.SERVICE_UNHEALTHY); - waitForActiveLockHolder(null); + LOG.info("Faking svc0 unhealthy, should NOT successfully " + + "failover to svc1"); + cluster.setHealthy(0, false); + cluster.waitForHealthState(0, State.SERVICE_UNHEALTHY); + cluster.waitForActiveLockHolder(null); - Mockito.verify(svc2.proxy, Mockito.timeout(2000).atLeastOnce()) + + Mockito.verify(svc1.proxy, Mockito.timeout(2000).atLeastOnce()) .transitionToActive(); - waitForHAState(svc1, HAServiceState.STANDBY); - waitForHAState(svc2, HAServiceState.STANDBY); + cluster.waitForHAState(0, HAServiceState.STANDBY); + cluster.waitForHAState(1, HAServiceState.STANDBY); - LOG.info("Faking svc1 healthy again, should go back to svc1"); - svc1.isHealthy = true; - waitForHAState(svc1, HAServiceState.ACTIVE); - waitForHAState(svc2, HAServiceState.STANDBY); - waitForActiveLockHolder(svc1); + LOG.info("Faking svc0 healthy again, should go back to svc0"); + cluster.setHealthy(0, true); + cluster.waitForHAState(0, HAServiceState.ACTIVE); + cluster.waitForHAState(1, HAServiceState.STANDBY); + cluster.waitForActiveLockHolder(0); - // Ensure that we can fail back to thr2 once it it is able + // Ensure that we can fail back to svc1 once it it is able // to become active (e.g the admin has restarted it) - LOG.info("Allowing svc2 to become active, expiring svc1"); - svc2.failToBecomeActive = false; - expireAndVerifyFailover(thr1, thr2); + LOG.info("Allowing svc1 to become active, expiring svc0"); + svc1.failToBecomeActive = false; + cluster.expireAndVerifyFailover(0, 1); } finally { - stopFCs(); + cluster.stop(); } } @@ -303,27 +235,25 @@ public void testBecomingActiveFails() throws Exception { @Test(timeout=15000) public void testZooKeeperFailure() throws Exception { try { - setupFCs(); + cluster.start(); // Record initial ZK sessions - long session1 = thr1.zkfc.getElectorForTests().getZKSessionIdForTests(); - long session2 = thr2.zkfc.getElectorForTests().getZKSessionIdForTests(); + long session0 = cluster.getElector(0).getZKSessionIdForTests(); + long session1 = cluster.getElector(1).getZKSessionIdForTests(); LOG.info("====== Stopping ZK server"); stopServer(); waitForServerDown(hostPort, CONNECTION_TIMEOUT); LOG.info("====== Waiting for services to enter NEUTRAL mode"); - ActiveStandbyElectorTestUtil.waitForElectorState(ctx, - thr1.zkfc.getElectorForTests(), + cluster.waitForElectorState(0, ActiveStandbyElector.State.NEUTRAL); - ActiveStandbyElectorTestUtil.waitForElectorState(ctx, - thr2.zkfc.getElectorForTests(), + cluster.waitForElectorState(1, ActiveStandbyElector.State.NEUTRAL); LOG.info("====== Checking that the services didn't change HA state"); - assertEquals(HAServiceState.ACTIVE, svc1.state); - assertEquals(HAServiceState.STANDBY, svc2.state); + assertEquals(HAServiceState.ACTIVE, cluster.getService(0).state); + assertEquals(HAServiceState.STANDBY, cluster.getService(1).state); LOG.info("====== Restarting server"); startServer(); @@ -331,134 +261,26 @@ public void testZooKeeperFailure() throws Exception { // Nodes should go back to their original states, since they re-obtain // the same sessions. - ActiveStandbyElectorTestUtil.waitForElectorState(ctx, - thr1.zkfc.getElectorForTests(), - ActiveStandbyElector.State.ACTIVE); - ActiveStandbyElectorTestUtil.waitForElectorState(ctx, - thr2.zkfc.getElectorForTests(), - ActiveStandbyElector.State.STANDBY); + cluster.waitForElectorState(0, ActiveStandbyElector.State.ACTIVE); + cluster.waitForElectorState(1, ActiveStandbyElector.State.STANDBY); // Check HA states didn't change. - ActiveStandbyElectorTestUtil.waitForElectorState(ctx, - thr1.zkfc.getElectorForTests(), - ActiveStandbyElector.State.ACTIVE); - ActiveStandbyElectorTestUtil.waitForElectorState(ctx, - thr2.zkfc.getElectorForTests(), - ActiveStandbyElector.State.STANDBY); + cluster.waitForHAState(0, HAServiceState.ACTIVE); + cluster.waitForHAState(1, HAServiceState.STANDBY); + // Check they re-used the same sessions and didn't spuriously reconnect + assertEquals(session0, + cluster.getElector(0).getZKSessionIdForTests()); assertEquals(session1, - thr1.zkfc.getElectorForTests().getZKSessionIdForTests()); - assertEquals(session2, - thr2.zkfc.getElectorForTests().getZKSessionIdForTests()); + cluster.getElector(1).getZKSessionIdForTests()); } finally { - stopFCs(); + cluster.stop(); } } - /** - * Expire the ZK session of the given service. This requires - * (and asserts) that the given service be the current active. - * @throws NoNodeException if no service holds the lock - */ - private void expireActiveLockHolder(DummyHAService expectedActive) - throws NoNodeException { - ZooKeeperServer zks = getServer(serverFactory); - Stat stat = new Stat(); - byte[] data = zks.getZKDatabase().getData( - ZKFailoverController.ZK_PARENT_ZNODE_DEFAULT + "/" + - ActiveStandbyElector.LOCK_FILENAME, stat, null); - - assertArrayEquals(Ints.toByteArray(expectedActive.index), data); - long session = stat.getEphemeralOwner(); - LOG.info("Expiring svc " + expectedActive + "'s zookeeper session " + session); - zks.closeSession(session); - } - - /** - * Wait for the given HA service to enter the given HA state. - */ - private void waitForHAState(DummyHAService svc, HAServiceState state) - throws Exception { - while (svc.state != state) { - ctx.checkException(); - Thread.sleep(50); - } - } - - /** - * Wait for the ZKFC to be notified of a change in health state. - */ - private void waitForHealthState(DummyZKFC zkfc, State state) - throws Exception { - while (zkfc.getLastHealthState() != state) { - ctx.checkException(); - Thread.sleep(50); - } - } - - /** - * Wait for the given HA service to become the active lock holder. - * If the passed svc is null, waits for there to be no active - * lock holder. - */ - private void waitForActiveLockHolder(DummyHAService svc) - throws Exception { - ZooKeeperServer zks = getServer(serverFactory); - ActiveStandbyElectorTestUtil.waitForActiveLockData(ctx, zks, - ZKFailoverController.ZK_PARENT_ZNODE_DEFAULT, - (svc == null) ? null : Ints.toByteArray(svc.index)); - } - - private int runFC(DummyHAService target, String ... args) throws Exception { DummyZKFC zkfc = new DummyZKFC(target); zkfc.setConf(conf); return zkfc.run(args); } - /** - * Test-thread which runs a ZK Failover Controller corresponding - * to a given dummy service. - */ - private class DummyZKFCThread extends TestingThread { - private final DummyZKFC zkfc; - - public DummyZKFCThread(TestContext ctx, DummyHAService svc) { - super(ctx); - this.zkfc = new DummyZKFC(svc); - zkfc.setConf(conf); - } - - @Override - public void doWork() throws Exception { - try { - assertEquals(0, zkfc.run(new String[0])); - } catch (InterruptedException ie) { - // Interrupted by main thread, that's OK. - } - } - } - - private static class DummyZKFC extends ZKFailoverController { - private final DummyHAService localTarget; - - public DummyZKFC(DummyHAService localTarget) { - this.localTarget = localTarget; - } - - @Override - protected byte[] targetToData(HAServiceTarget target) { - return Ints.toByteArray(((DummyHAService)target).index); - } - - @Override - protected HAServiceTarget dataToTarget(byte[] data) { - int index = Ints.fromByteArray(data); - return DummyHAService.getInstance(index); - } - - @Override - protected HAServiceTarget getLocalTarget() { - return localTarget; - } - } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java new file mode 100644 index 0000000000..9914d8f4f9 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverControllerStress.java @@ -0,0 +1,178 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.ha; + +import java.io.File; +import java.util.Random; +import java.util.Set; + +import javax.management.ObjectName; + +import org.apache.hadoop.conf.Configuration; +import org.apache.zookeeper.test.ClientBase; +import org.apache.zookeeper.test.JMXEnv; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; +import org.mockito.invocation.InvocationOnMock; +import org.mockito.stubbing.Answer; + +/** + * Stress test for ZKFailoverController. + * Starts multiple ZKFCs for dummy services, and then performs many automatic + * failovers. While doing so, ensures that a fake "shared resource" + * (simulating the shared edits dir) is only owned by one service at a time. + */ +public class TestZKFailoverControllerStress extends ClientBase { + + private static final int STRESS_RUNTIME_SECS = 30; + private static final int EXTRA_TIMEOUT_SECS = 10; + + private Configuration conf; + private MiniZKFCCluster cluster; + + @Override + public void setUp() throws Exception { + // build.test.dir is used by zookeeper + new File(System.getProperty("build.test.dir", "build")).mkdirs(); + super.setUp(); + } + + @Before + public void setupConfAndServices() throws Exception { + conf = new Configuration(); + conf.set(ZKFailoverController.ZK_QUORUM_KEY, hostPort); + this.cluster = new MiniZKFCCluster(conf, getServer(serverFactory)); + cluster.start(); + } + + @After + public void stopCluster() throws Exception { + cluster.stop(); + } + + /** + * ZK seems to have a bug when we muck with its sessions + * behind its back, causing disconnects, etc. This bug + * ends up leaving JMX beans around at the end of the test, + * and ClientBase's teardown method will throw an exception + * if it finds JMX beans leaked. So, clear them out there + * to workaround the ZK bug. See ZOOKEEPER-1438. + */ + @After + public void clearZKJMX() throws Exception { + Set names = JMXEnv.ensureAll(); + for (ObjectName n : names) { + JMXEnv.conn().unregisterMBean(n); + } + } + + /** + * Simply fail back and forth between two services for the + * configured amount of time, via expiring their ZK sessions. + */ + @Test(timeout=(STRESS_RUNTIME_SECS + EXTRA_TIMEOUT_SECS) * 1000) + public void testExpireBackAndForth() throws Exception { + long st = System.currentTimeMillis(); + long runFor = STRESS_RUNTIME_SECS * 1000; + + int i = 0; + while (System.currentTimeMillis() - st < runFor) { + // flip flop the services back and forth + int from = i % 2; + int to = (i + 1) % 2; + + // Expire one service, it should fail over to the other + LOG.info("Failing over via expiration from " + from + " to " + to); + cluster.expireAndVerifyFailover(from, to); + + i++; + } + } + + /** + * Randomly expire the ZK sessions of the two ZKFCs. This differs + * from the above test in that it is not a controlled failover - + * we just do random expirations and expect neither one to ever + * generate fatal exceptions. + */ + @Test(timeout=(STRESS_RUNTIME_SECS + EXTRA_TIMEOUT_SECS) * 1000) + public void testRandomExpirations() throws Exception { + long st = System.currentTimeMillis(); + long runFor = STRESS_RUNTIME_SECS * 1000; + + Random r = new Random(); + while (System.currentTimeMillis() - st < runFor) { + cluster.getTestContext().checkException(); + int targetIdx = r.nextInt(2); + ActiveStandbyElector target = cluster.getElector(targetIdx); + long sessId = target.getZKSessionIdForTests(); + if (sessId != -1) { + LOG.info(String.format("Expiring session %x for svc %d", + sessId, targetIdx)); + getServer(serverFactory).closeSession(sessId); + } + Thread.sleep(r.nextInt(300)); + } + } + + /** + * Have the services fail their health checks half the time, + * causing the master role to bounce back and forth in the + * cluster. Meanwhile, causes ZK to disconnect clients every + * 50ms, to trigger the retry code and failures to become active. + */ + @Test(timeout=(STRESS_RUNTIME_SECS + EXTRA_TIMEOUT_SECS) * 1000) + public void testRandomHealthAndDisconnects() throws Exception { + long runFor = STRESS_RUNTIME_SECS * 1000; + Mockito.doAnswer(new RandomlyThrow(0)) + .when(cluster.getService(0).proxy).monitorHealth(); + Mockito.doAnswer(new RandomlyThrow(1)) + .when(cluster.getService(1).proxy).monitorHealth(); + ActiveStandbyElector.NUM_RETRIES = 100; + + long st = System.currentTimeMillis(); + while (System.currentTimeMillis() - st < runFor) { + cluster.getTestContext().checkException(); + serverFactory.closeAll(); + Thread.sleep(50); + } + } + + + /** + * Randomly throw an exception half the time the method is called + */ + @SuppressWarnings("rawtypes") + private static class RandomlyThrow implements Answer { + private Random r = new Random(); + private final int svcIdx; + public RandomlyThrow(int svcIdx) { + this.svcIdx = svcIdx; + } + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + if (r.nextBoolean()) { + LOG.info("Throwing an exception for svc " + svcIdx); + throw new HealthCheckFailedException("random failure"); + } + return invocation.callRealMethod(); + } + } +}