diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt
index 1ff04f8d7c..2638f7e332 100644
--- a/hadoop-common-project/hadoop-common/CHANGES.txt
+++ b/hadoop-common-project/hadoop-common/CHANGES.txt
@@ -138,6 +138,8 @@ Release 0.23.3 - UNRELEASED
     HADOOP-7030. Add TableMapping topology implementation to read host to rack
     mapping from a file. (Patrick Angeles and tomwhite via tomwhite)
 
+    HADOOP-8206. Common portion of a ZK-based failover controller (todd)
+
   IMPROVEMENTS
 
     HADOOP-7524. Change RPC to allow multiple protocols including multuple
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
index b77187b793..ef05456249 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java
@@ -35,6 +35,7 @@
 import org.apache.zookeeper.Watcher;
 import org.apache.zookeeper.WatchedEvent;
 import org.apache.zookeeper.Watcher.Event;
+import org.apache.zookeeper.ZKUtil;
 import org.apache.zookeeper.ZooKeeper;
 import org.apache.zookeeper.CreateMode;
 import org.apache.zookeeper.AsyncCallback.*;
@@ -135,11 +136,11 @@ public interface ActiveStandbyElectorCallback {
 
   private static final int NUM_RETRIES = 3;
 
-  private enum ConnectionState {
+  private static enum ConnectionState {
     DISCONNECTED, CONNECTED, TERMINATED
   };
 
-  private enum State {
+  static enum State {
     INIT, ACTIVE, STANDBY, NEUTRAL
   };
 
@@ -282,6 +283,32 @@ public synchronized void ensureParentZNode()
     LOG.info("Successfully created " + znodeWorkingDir + " in ZK.");
   }
 
+
+  /**
+   * Clear all of the state held within the parent ZNode.
+   * This recursively deletes everything within the znode as well as the
+   * parent znode itself. It should only be used when it's certain that
+   * no electors are currently participating in the election.
+   */
+  public synchronized void clearParentZNode()
+      throws IOException, InterruptedException {
+    try {
+      LOG.info("Recursively deleting " + znodeWorkingDir + " from ZK...");
+
+      zkDoWithRetries(new ZKAction<Void>() {
+        @Override
+        public Void run() throws KeeperException, InterruptedException {
+          ZKUtil.deleteRecursive(zkClient, znodeWorkingDir);
+          return null;
+        }
+      });
+    } catch (KeeperException e) {
+      throw new IOException("Couldn't clear parent znode " + znodeWorkingDir,
+          e);
+    }
+    LOG.info("Successfully deleted " + znodeWorkingDir + " from ZK.");
+  }
+
 
   /**
    * Any service instance can drop out of the election by calling quitElection.
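The new clearParentZNode() complements the existing ensureParentZNode(): together they let a caller wipe and re-create the election state in ZooKeeper, which is what the -formatZK path of the new ZKFailoverController (later in this patch) does. A minimal sketch of that pattern follows; it assumes parentZNodeExists() and ensureParentZNode() declare the same checked exceptions as clearParentZNode().

  // Sketch only: wipe and re-create the election state under the parent znode.
  // Per the javadoc above, this must only run when no electors are currently
  // participating in the election.
  void reformatElectionState(ActiveStandbyElector elector)
      throws IOException, InterruptedException {
    if (elector.parentZNodeExists()) {
      // Recursively delete the parent znode and everything beneath it.
      elector.clearParentZNode();
    }
    // Re-create the now-empty parent znode so that electors can join again.
    elector.ensureParentZNode();
  }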
@@ -592,6 +619,11 @@ void allowSessionReestablishmentForTests() {
   long getZKSessionIdForTests() {
     return zkClient.getSessionId();
   }
+
+  @VisibleForTesting
+  synchronized State getStateForTests() {
+    return state;
+  }
 
   private boolean reEstablishSession() {
     int connectionRetryCount = 0;
diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverController.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverController.java
index 9cff2a50c4..cf3c90e542 100644
--- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverController.java
+++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/FailoverController.java
@@ -24,7 +24,9 @@
 
 import org.apache.hadoop.classification.InterfaceAudience;
 import org.apache.hadoop.classification.InterfaceStability;
+import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
+import org.apache.hadoop.ipc.RPC;
 
 import com.google.common.base.Preconditions;
 
@@ -40,6 +42,8 @@ public class FailoverController {
 
   private static final Log LOG = LogFactory.getLog(FailoverController.class);
 
+  private static final int GRACEFUL_FENCE_TIMEOUT = 5000;
+
   /**
    * Perform pre-failover checks on the given service we plan to
    * failover to, eg to prevent failing over to a service (eg due
@@ -96,7 +100,35 @@ private static void preFailoverChecks(HAServiceTarget target,
           "Got an IO exception", e);
     }
   }
-  
+
+
+  /**
+   * Try to gracefully transition the given service to standby state.
+   * This call is guaranteed to be "quick" -- i.e. it has a short timeout
+   * and no retries. Its only purpose is to avoid fencing a node that is
+   * still able to step down cleanly (e.g. one that has already restarted).
+   */
+  static boolean tryGracefulFence(Configuration conf,
+      HAServiceTarget svc) {
+    HAServiceProtocol proxy = null;
+    try {
+      proxy = svc.getProxy(conf, GRACEFUL_FENCE_TIMEOUT);
+      proxy.transitionToStandby();
+      return true;
+    } catch (ServiceFailedException sfe) {
+      LOG.warn("Unable to gracefully make " + svc + " standby (" +
+          sfe.getMessage() + ")");
+    } catch (IOException ioe) {
+      LOG.warn("Unable to gracefully make " + svc +
+          " standby (unable to connect)", ioe);
+    } finally {
+      if (proxy != null) {
+        RPC.stopProxy(proxy);
+      }
+    }
+    return false;
+  }
+
   /**
    * Failover from service 1 to service 2. If the failover fails
    * then try to failback.
@@ -118,16 +150,9 @@ public static void failover(HAServiceTarget fromSvc,
 
     // Try to make fromSvc standby
     boolean tryFence = true;
-    try {
-      HAServiceProtocolHelper.transitionToStandby(fromSvc.getProxy());
-      // We should try to fence if we failed or it was forced
-      tryFence = forceFence ? 
true : false; - } catch (ServiceFailedException sfe) { - LOG.warn("Unable to make " + fromSvc + " standby (" + - sfe.getMessage() + ")"); - } catch (IOException ioe) { - LOG.warn("Unable to make " + fromSvc + - " standby (unable to connect)", ioe); + + if (tryGracefulFence(new Configuration(), fromSvc)) { + tryFence = forceFence; } // Fence fromSvc if it's required or forced by the user diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java index b60910fe17..753352945d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java @@ -188,7 +188,8 @@ private void doHealthChecks() throws InterruptedException { proxy.monitorHealth(); healthy = true; } catch (HealthCheckFailedException e) { - LOG.warn("Service health check failed: " + e.getMessage()); + LOG.warn("Service health check failed for " + targetToMonitor + + ": " + e.getMessage()); enterState(State.SERVICE_UNHEALTHY); } catch (Throwable t) { LOG.warn("Transport-level exception trying to monitor health of " + diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java new file mode 100644 index 0000000000..565c93b545 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java @@ -0,0 +1,387 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.ha; + +import java.io.IOException; +import java.security.PrivilegedAction; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback; +import org.apache.hadoop.ha.HealthMonitor.State; +import org.apache.hadoop.security.SecurityUtil; +import org.apache.hadoop.util.Tool; +import org.apache.hadoop.util.ToolRunner; +import org.apache.zookeeper.ZooDefs.Ids; +import org.apache.zookeeper.data.ACL; + +import com.google.common.annotations.VisibleForTesting; +import com.google.common.base.Preconditions; + +@InterfaceAudience.LimitedPrivate("HDFS") +public abstract class ZKFailoverController implements Tool { + + static final Log LOG = LogFactory.getLog(ZKFailoverController.class); + + // TODO: this should be namespace-scoped + public static final String ZK_QUORUM_KEY = "ha.zookeeper.quorum"; + private static final String ZK_SESSION_TIMEOUT_KEY = "ha.zookeeper.session-timeout.ms"; + private static final int ZK_SESSION_TIMEOUT_DEFAULT = 5*1000; + private static final String ZK_PARENT_ZNODE_KEY = "ha.zookeeper.parent-znode"; + static final String ZK_PARENT_ZNODE_DEFAULT = "/hadoop-ha"; + + /** Unable to format the parent znode in ZK */ + static final int ERR_CODE_FORMAT_DENIED = 2; + /** The parent znode doesn't exist in ZK */ + static final int ERR_CODE_NO_PARENT_ZNODE = 3; + /** Fencing is not properly configured */ + static final int ERR_CODE_NO_FENCER = 4; + + private Configuration conf; + + private HealthMonitor healthMonitor; + private ActiveStandbyElector elector; + + private HAServiceTarget localTarget; + + private String parentZnode; + + private State lastHealthState = State.INITIALIZING; + + /** Set if a fatal error occurs */ + private String fatalError = null; + + @Override + public void setConf(Configuration conf) { + this.conf = conf; + localTarget = getLocalTarget(); + } + + + protected abstract byte[] targetToData(HAServiceTarget target); + protected abstract HAServiceTarget getLocalTarget(); + protected abstract HAServiceTarget dataToTarget(byte[] data); + + + @Override + public Configuration getConf() { + return conf; + } + + @Override + public int run(final String[] args) throws Exception { + // TODO: need to hook DFS here to find the NN keytab info, etc, + // similar to what DFSHAAdmin does. Annoying that this is in common. + try { + return SecurityUtil.doAsLoginUserOrFatal(new PrivilegedAction() { + @Override + public Integer run() { + try { + return doRun(args); + } catch (Exception t) { + throw new RuntimeException(t); + } + } + }); + } catch (RuntimeException rte) { + throw (Exception)rte.getCause(); + } + } + + private int doRun(String[] args) + throws HadoopIllegalArgumentException, IOException, InterruptedException { + initZK(); + if (args.length > 0) { + if ("-formatZK".equals(args[0])) { + boolean force = false; + boolean interactive = true; + for (int i = 1; i < args.length; i++) { + if ("-force".equals(args[i])) { + force = true; + } else if ("-nonInteractive".equals(args[i])) { + interactive = false; + } else { + badArg(args[i]); + } + } + return formatZK(force, interactive); + } else { + badArg(args[0]); + } + } + + if (!elector.parentZNodeExists()) { + LOG.fatal("Unable to start failover controller. 
" + + "Parent znode does not exist.\n" + + "Run with -formatZK flag to initialize ZooKeeper."); + return ERR_CODE_NO_PARENT_ZNODE; + } + + try { + localTarget.checkFencingConfigured(); + } catch (BadFencingConfigurationException e) { + LOG.fatal("Fencing is not configured for " + localTarget + ".\n" + + "You must configure a fencing method before using automatic " + + "failover.", e); + return ERR_CODE_NO_FENCER; + } + + initHM(); + mainLoop(); + return 0; + } + + private void badArg(String arg) { + printUsage(); + throw new HadoopIllegalArgumentException( + "Bad argument: " + arg); + } + + private void printUsage() { + System.err.println("Usage: " + this.getClass().getSimpleName() + + " [-formatZK [-force | -nonInteractive]]"); + } + + private int formatZK(boolean force, boolean interactive) + throws IOException, InterruptedException { + if (elector.parentZNodeExists()) { + if (!force && (!interactive || !confirmFormat())) { + return ERR_CODE_FORMAT_DENIED; + } + + try { + elector.clearParentZNode(); + } catch (IOException e) { + LOG.error("Unable to clear zk parent znode", e); + return 1; + } + } + + elector.ensureParentZNode(); + return 0; + } + + private boolean confirmFormat() { + System.err.println( + "===============================================\n" + + "The configured parent znode " + parentZnode + " already exists.\n" + + "Are you sure you want to clear all failover information from\n" + + "ZooKeeper?\n" + + "WARNING: Before proceeding, ensure that all HDFS services and\n" + + "failover controllers are stopped!\n" + + "==============================================="); + try { + return ToolRunner.confirmPrompt("Proceed formatting " + parentZnode + "?"); + } catch (IOException e) { + LOG.debug("Failed to confirm", e); + return false; + } + } + + // ------------------------------------------ + // Begin actual guts of failover controller + // ------------------------------------------ + + private void initHM() { + healthMonitor = new HealthMonitor(conf, localTarget); + healthMonitor.addCallback(new HealthCallbacks()); + healthMonitor.start(); + } + + private void initZK() throws HadoopIllegalArgumentException, IOException { + String zkQuorum = conf.get(ZK_QUORUM_KEY); + int zkTimeout = conf.getInt(ZK_SESSION_TIMEOUT_KEY, + ZK_SESSION_TIMEOUT_DEFAULT); + parentZnode = conf.get(ZK_PARENT_ZNODE_KEY, + ZK_PARENT_ZNODE_DEFAULT); + // TODO: need ZK ACL support in config, also maybe auth! 
+ List zkAcls = Ids.OPEN_ACL_UNSAFE; + + Preconditions.checkArgument(zkQuorum != null, + "Missing required configuration '%s' for ZooKeeper quorum", + ZK_QUORUM_KEY); + Preconditions.checkArgument(zkTimeout > 0, + "Invalid ZK session timeout %s", zkTimeout); + + + elector = new ActiveStandbyElector(zkQuorum, + zkTimeout, parentZnode, zkAcls, new ElectorCallbacks()); + } + + private synchronized void mainLoop() throws InterruptedException { + while (fatalError == null) { + wait(); + } + assert fatalError != null; // only get here on fatal + throw new RuntimeException( + "ZK Failover Controller failed: " + fatalError); + } + + private synchronized void fatalError(String err) { + LOG.fatal("Fatal error occurred:" + err); + fatalError = err; + notifyAll(); + } + + private synchronized void becomeActive() { + LOG.info("Trying to make " + localTarget + " active..."); + try { + localTarget.getProxy().transitionToActive(); + LOG.info("Successfully transitioned " + localTarget + + " to active state"); + } catch (Throwable t) { + LOG.fatal("Couldn't make " + localTarget + " active", t); + elector.quitElection(true); +/* +* TODO: +* we need to make sure that if we get fenced and then quickly restarted, +* none of these calls will retry across the restart boundary +* perhaps the solution is that, whenever the nn starts, it gets a unique +* ID, and when we start becoming active, we record it, and then any future +* calls use the same ID +*/ + + } + } + + private synchronized void becomeStandby() { + LOG.info("ZK Election indicated that " + localTarget + + " should become standby"); + try { + localTarget.getProxy().transitionToStandby(); + LOG.info("Successfully transitioned " + localTarget + + " to standby state"); + } catch (Exception e) { + LOG.error("Couldn't transition " + localTarget + " to standby state", + e); + // TODO handle this. It's a likely case since we probably got fenced + // at the same time. + } + } + + /** + * @return the last health state passed to the FC + * by the HealthMonitor. + */ + @VisibleForTesting + State getLastHealthState() { + return lastHealthState; + } + + @VisibleForTesting + ActiveStandbyElector getElectorForTests() { + return elector; + } + + /** + * Callbacks from elector + */ + class ElectorCallbacks implements ActiveStandbyElectorCallback { + @Override + public void becomeActive() { + ZKFailoverController.this.becomeActive(); + } + + @Override + public void becomeStandby() { + ZKFailoverController.this.becomeStandby(); + } + + @Override + public void enterNeutralMode() { + } + + @Override + public void notifyFatalError(String errorMessage) { + fatalError(errorMessage); + } + + @Override + public void fenceOldActive(byte[] data) { + HAServiceTarget target = dataToTarget(data); + + LOG.info("Should fence: " + target); + boolean gracefulWorked = + FailoverController.tryGracefulFence(conf, target); + if (gracefulWorked) { + // It's possible that it's in standby but just about to go into active, + // no? Is there some race here? + LOG.info("Successfully transitioned " + target + " to standby " + + "state without fencing"); + return; + } + + try { + target.checkFencingConfigured(); + } catch (BadFencingConfigurationException e) { + LOG.error("Couldn't fence old active " + target, e); + // TODO: see below todo + throw new RuntimeException(e); + } + + if (!target.getFencer().fence(target)) { + // TODO: this will end up in some kind of tight loop, + // won't it? 
We need some kind of backoff + throw new RuntimeException("Unable to fence " + target); + } + } + } + + /** + * Callbacks from HealthMonitor + */ + class HealthCallbacks implements HealthMonitor.Callback { + @Override + public void enteredState(HealthMonitor.State newState) { + LOG.info("Local service " + localTarget + + " entered state: " + newState); + switch (newState) { + case SERVICE_HEALTHY: + LOG.info("Joining master election for " + localTarget); + elector.joinElection(targetToData(localTarget)); + break; + + case INITIALIZING: + LOG.info("Ensuring that " + localTarget + " does not " + + "participate in active master election"); + elector.quitElection(false); + break; + + case SERVICE_UNHEALTHY: + case SERVICE_NOT_RESPONDING: + LOG.info("Quitting master election for " + localTarget + + " and marking that fencing is necessary"); + elector.quitElection(true); + break; + + case HEALTH_MONITOR_FAILED: + fatalError("Health monitor failed!"); + break; + + default: + throw new IllegalArgumentException("Unhandled state:" + newState); + } + + lastHealthState = newState; + } + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ToolRunner.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ToolRunner.java index d162d52e0b..49581000ca 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ToolRunner.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/ToolRunner.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.util; +import java.io.IOException; import java.io.PrintStream; import org.apache.hadoop.classification.InterfaceAudience; @@ -92,4 +93,33 @@ public static void printGenericCommandUsage(PrintStream out) { GenericOptionsParser.printGenericCommandUsage(out); } + + /** + * Print out a prompt to the user, and return true if the user + * responds with "y" or "yes". 
(case insensitive) + */ + public static boolean confirmPrompt(String prompt) throws IOException { + while (true) { + System.err.print(prompt + " (Y or N) "); + StringBuilder responseBuilder = new StringBuilder(); + while (true) { + int c = System.in.read(); + if (c == -1 || c == '\r' || c == '\n') { + break; + } + responseBuilder.append((char)c); + } + + String response = responseBuilder.toString(); + if (response.equalsIgnoreCase("y") || + response.equalsIgnoreCase("yes")) { + return true; + } else if (response.equalsIgnoreCase("n") || + response.equalsIgnoreCase("no")) { + return false; + } + System.err.println("Invalid input: " + response); + // else ask them again + } + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java index e24cf87aa8..dc87ebdd86 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java @@ -50,4 +50,15 @@ public static void waitForActiveLockData(TestContext ctx, Thread.sleep(50); } } + + public static void waitForElectorState(TestContext ctx, + ActiveStandbyElector elector, + ActiveStandbyElector.State state) throws Exception { + while (elector.getStateForTests() != state) { + if (ctx != null) { + ctx.checkException(); + } + Thread.sleep(50); + } + } } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java index 075117548c..3f9be3cacc 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/DummyHAService.java @@ -19,12 +19,15 @@ import java.io.IOException; import java.net.InetSocketAddress; +import java.util.ArrayList; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.security.AccessControlException; import org.mockito.Mockito; +import com.google.common.collect.Lists; + /** * Test-only implementation of {@link HAServiceTarget}, which returns * a mock implementation. 
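The confirmPrompt() helper added to ToolRunner above keeps re-prompting until it reads y/yes or n/no and returns the corresponding boolean. A usage sketch along the lines of confirmFormat() in the new ZKFailoverController (assuming an elector in scope and a surrounding method that propagates the IOException confirmPrompt() may throw):

  // Sketch only: gate a destructive operation on interactive confirmation.
  if (ToolRunner.confirmPrompt("Clear all failover state in ZooKeeper?")) {
    elector.clearParentZNode();       // user answered "y" or "yes"
  } else {
    System.err.println("Aborted by user.");
  }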
@@ -36,12 +39,20 @@ class DummyHAService extends HAServiceTarget { InetSocketAddress address; boolean isHealthy = true; boolean actUnreachable = false; + boolean failToBecomeActive; + + static ArrayList instances = Lists.newArrayList(); + int index; DummyHAService(HAServiceState state, InetSocketAddress address) { this.state = state; this.proxy = makeMock(); this.fencer = Mockito.mock(NodeFencer.class); this.address = address; + synchronized (instances) { + instances.add(this); + this.index = instances.size(); + } } private HAServiceProtocol makeMock() { @@ -59,6 +70,10 @@ public void monitorHealth() throws HealthCheckFailedException, public void transitionToActive() throws ServiceFailedException, AccessControlException, IOException { checkUnreachable(); + if (failToBecomeActive) { + throw new ServiceFailedException("injected failure"); + } + state = HAServiceState.ACTIVE; } @@ -97,7 +112,7 @@ public HAServiceProtocol getProxy(Configuration conf, int timeout) throws IOException { return proxy; } - + @Override public NodeFencer getFencer() { return fencer; @@ -106,4 +121,13 @@ public NodeFencer getFencer() { @Override public void checkFencingConfigured() throws BadFencingConfigurationException { } -} \ No newline at end of file + + @Override + public String toString() { + return "DummyHAService #" + index; + } + + public static HAServiceTarget getInstance(int serial) { + return instances.get(serial - 1); + } +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestNodeFencer.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestNodeFencer.java index 853a0e3446..f3c604e771 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestNodeFencer.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestNodeFencer.java @@ -156,6 +156,10 @@ public boolean tryFence(HAServiceTarget target, String args) { @Override public void checkArgs(String args) { } + + public static HAServiceTarget getLastFencedService() { + return fencedSvc; + } } /** diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java new file mode 100644 index 0000000000..93f46a533e --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java @@ -0,0 +1,457 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ +package org.apache.hadoop.ha; + +import static org.junit.Assert.*; + +import java.io.File; +import java.net.InetSocketAddress; + +import org.apache.commons.logging.impl.Log4JLogger; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; +import org.apache.hadoop.ha.HealthMonitor.State; +import org.apache.hadoop.test.MultithreadedTestUtil; +import org.apache.hadoop.test.MultithreadedTestUtil.TestContext; +import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread; +import org.apache.log4j.Level; +import org.apache.zookeeper.KeeperException.NoNodeException; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.server.ZooKeeperServer; +import org.apache.zookeeper.test.ClientBase; +import org.junit.Before; +import org.junit.Test; +import org.mockito.Mockito; + +import com.google.common.primitives.Ints; + +public class TestZKFailoverController extends ClientBase { + private Configuration conf; + private DummyHAService svc1; + private DummyHAService svc2; + private TestContext ctx; + private DummyZKFCThread thr1, thr2; + + static { + ((Log4JLogger)ActiveStandbyElector.LOG).getLogger().setLevel(Level.ALL); + } + + @Override + public void setUp() throws Exception { + // build.test.dir is used by zookeeper + new File(System.getProperty("build.test.dir", "build")).mkdirs(); + super.setUp(); + } + + @Before + public void setupConfAndServices() { + conf = new Configuration(); + conf.set(ZKFailoverController.ZK_QUORUM_KEY, hostPort); + // Fast check interval so tests run faster + conf.setInt(CommonConfigurationKeys.HA_HM_CHECK_INTERVAL_KEY, 50); + conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50); + conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50); + svc1 = new DummyHAService(HAServiceState.INITIALIZING, + new InetSocketAddress("svc1", 1234)); + svc2 = new DummyHAService(HAServiceState.INITIALIZING, + new InetSocketAddress("svc2", 1234)); + } + + /** + * Set up two services and their failover controllers. svc1 is started + * first, so that it enters ACTIVE state, and then svc2 is started, + * which enters STANDBY + */ + private void setupFCs() throws Exception { + // Format the base dir, should succeed + assertEquals(0, runFC(svc1, "-formatZK")); + + ctx = new MultithreadedTestUtil.TestContext(); + thr1 = new DummyZKFCThread(ctx, svc1); + ctx.addThread(thr1); + thr1.start(); + + LOG.info("Waiting for svc1 to enter active state"); + waitForHAState(svc1, HAServiceState.ACTIVE); + + LOG.info("Adding svc2"); + thr2 = new DummyZKFCThread(ctx, svc2); + thr2.start(); + waitForHAState(svc2, HAServiceState.STANDBY); + } + + private void stopFCs() throws Exception { + if (thr1 != null) { + thr1.interrupt(); + } + if (thr2 != null) { + thr2.interrupt(); + } + if (ctx != null) { + ctx.stop(); + } + } + + /** + * Test that the various command lines for formatting the ZK directory + * function correctly. 
+ */ + @Test(timeout=15000) + public void testFormatZK() throws Exception { + // Run without formatting the base dir, + // should barf + assertEquals(ZKFailoverController.ERR_CODE_NO_PARENT_ZNODE, + runFC(svc1)); + + // Format the base dir, should succeed + assertEquals(0, runFC(svc1, "-formatZK")); + + // Should fail to format if already formatted + assertEquals(ZKFailoverController.ERR_CODE_FORMAT_DENIED, + runFC(svc1, "-formatZK", "-nonInteractive")); + + // Unless '-force' is on + assertEquals(0, runFC(svc1, "-formatZK", "-force")); + } + + /** + * Test that the ZKFC won't run if fencing is not configured for the + * local service. + */ + @Test(timeout=15000) + public void testFencingMustBeConfigured() throws Exception { + svc1 = Mockito.spy(svc1); + Mockito.doThrow(new BadFencingConfigurationException("no fencing")) + .when(svc1).checkFencingConfigured(); + // Format the base dir, should succeed + assertEquals(0, runFC(svc1, "-formatZK")); + // Try to run the actual FC, should fail without a fencer + assertEquals(ZKFailoverController.ERR_CODE_NO_FENCER, + runFC(svc1)); + } + + /** + * Test that, when the health monitor indicates bad health status, + * failover is triggered. Also ensures that graceful active->standby + * transition is used when possible, falling back to fencing when + * the graceful approach fails. + */ + @Test(timeout=15000) + public void testAutoFailoverOnBadHealth() throws Exception { + try { + setupFCs(); + + LOG.info("Faking svc1 unhealthy, should failover to svc2"); + svc1.isHealthy = false; + LOG.info("Waiting for svc1 to enter standby state"); + waitForHAState(svc1, HAServiceState.STANDBY); + waitForHAState(svc2, HAServiceState.ACTIVE); + + LOG.info("Allowing svc1 to be healthy again, making svc2 unreachable " + + "and fail to gracefully go to standby"); + svc1.isHealthy = true; + svc2.actUnreachable = true; + + // Allow fencing to succeed + Mockito.doReturn(true).when(svc2.fencer).fence(Mockito.same(svc2)); + // Should fail back to svc1 at this point + waitForHAState(svc1, HAServiceState.ACTIVE); + // and fence svc2 + Mockito.verify(svc2.fencer).fence(Mockito.same(svc2)); + } finally { + stopFCs(); + } + } + + @Test(timeout=15000) + public void testAutoFailoverOnLostZKSession() throws Exception { + try { + setupFCs(); + + // Expire svc1, it should fail over to svc2 + expireAndVerifyFailover(thr1, thr2); + + // Expire svc2, it should fail back to svc1 + expireAndVerifyFailover(thr2, thr1); + + LOG.info("======= Running test cases second time to test " + + "re-establishment ========="); + // Expire svc1, it should fail over to svc2 + expireAndVerifyFailover(thr1, thr2); + + // Expire svc2, it should fail back to svc1 + expireAndVerifyFailover(thr2, thr1); + } finally { + stopFCs(); + } + } + + private void expireAndVerifyFailover(DummyZKFCThread fromThr, + DummyZKFCThread toThr) throws Exception { + DummyHAService fromSvc = fromThr.zkfc.localTarget; + DummyHAService toSvc = toThr.zkfc.localTarget; + + fromThr.zkfc.getElectorForTests().preventSessionReestablishmentForTests(); + try { + expireActiveLockHolder(fromSvc); + + waitForHAState(fromSvc, HAServiceState.STANDBY); + waitForHAState(toSvc, HAServiceState.ACTIVE); + } finally { + fromThr.zkfc.getElectorForTests().allowSessionReestablishmentForTests(); + } + } + + /** + * Test that, if the standby node is unhealthy, it doesn't try to become + * active + */ + @Test(timeout=15000) + public void testDontFailoverToUnhealthyNode() throws Exception { + try { + setupFCs(); + + // Make svc2 unhealthy, and wait for its 
FC to notice the bad health. + svc2.isHealthy = false; + waitForHealthState(thr2.zkfc, + HealthMonitor.State.SERVICE_UNHEALTHY); + + // Expire svc1 + thr1.zkfc.getElectorForTests().preventSessionReestablishmentForTests(); + try { + expireActiveLockHolder(svc1); + + LOG.info("Expired svc1's ZK session. Waiting a second to give svc2" + + " a chance to take the lock, if it is ever going to."); + Thread.sleep(1000); + + // Ensure that no one holds the lock. + waitForActiveLockHolder(null); + + } finally { + LOG.info("Allowing svc1's elector to re-establish its connection"); + thr1.zkfc.getElectorForTests().allowSessionReestablishmentForTests(); + } + // svc1 should get the lock again + waitForActiveLockHolder(svc1); + } finally { + stopFCs(); + } + } + + /** + * Test that the ZKFC successfully quits the election when it fails to + * become active. This allows the old node to successfully fail back. + */ + @Test(timeout=15000) + public void testBecomingActiveFails() throws Exception { + try { + setupFCs(); + + LOG.info("Making svc2 fail to become active"); + svc2.failToBecomeActive = true; + + LOG.info("Faking svc1 unhealthy, should NOT successfully " + + "failover to svc2"); + svc1.isHealthy = false; + waitForHealthState(thr1.zkfc, State.SERVICE_UNHEALTHY); + waitForActiveLockHolder(null); + + Mockito.verify(svc2.proxy).transitionToActive(); + + waitForHAState(svc1, HAServiceState.STANDBY); + waitForHAState(svc2, HAServiceState.STANDBY); + + LOG.info("Faking svc1 healthy again, should go back to svc1"); + svc1.isHealthy = true; + waitForHAState(svc1, HAServiceState.ACTIVE); + waitForHAState(svc2, HAServiceState.STANDBY); + waitForActiveLockHolder(svc1); + } finally { + stopFCs(); + } + } + + /** + * Test that, when ZooKeeper fails, the system remains in its + * current state, without triggering any failovers, and without + * causing the active node to enter standby state. + */ + @Test(timeout=15000) + public void testZooKeeperFailure() throws Exception { + try { + setupFCs(); + + // Record initial ZK sessions + long session1 = thr1.zkfc.getElectorForTests().getZKSessionIdForTests(); + long session2 = thr2.zkfc.getElectorForTests().getZKSessionIdForTests(); + + LOG.info("====== Stopping ZK server"); + stopServer(); + waitForServerDown(hostPort, CONNECTION_TIMEOUT); + + LOG.info("====== Waiting for services to enter NEUTRAL mode"); + ActiveStandbyElectorTestUtil.waitForElectorState(ctx, + thr1.zkfc.getElectorForTests(), + ActiveStandbyElector.State.NEUTRAL); + ActiveStandbyElectorTestUtil.waitForElectorState(ctx, + thr2.zkfc.getElectorForTests(), + ActiveStandbyElector.State.NEUTRAL); + + LOG.info("====== Checking that the services didn't change HA state"); + assertEquals(HAServiceState.ACTIVE, svc1.state); + assertEquals(HAServiceState.STANDBY, svc2.state); + + LOG.info("====== Restarting server"); + startServer(); + waitForServerUp(hostPort, CONNECTION_TIMEOUT); + + // Nodes should go back to their original states, since they re-obtain + // the same sessions. + ActiveStandbyElectorTestUtil.waitForElectorState(ctx, + thr1.zkfc.getElectorForTests(), + ActiveStandbyElector.State.ACTIVE); + ActiveStandbyElectorTestUtil.waitForElectorState(ctx, + thr2.zkfc.getElectorForTests(), + ActiveStandbyElector.State.STANDBY); + // Check HA states didn't change. 
+ ActiveStandbyElectorTestUtil.waitForElectorState(ctx, + thr1.zkfc.getElectorForTests(), + ActiveStandbyElector.State.ACTIVE); + ActiveStandbyElectorTestUtil.waitForElectorState(ctx, + thr2.zkfc.getElectorForTests(), + ActiveStandbyElector.State.STANDBY); + // Check they re-used the same sessions and didn't spuriously reconnect + assertEquals(session1, + thr1.zkfc.getElectorForTests().getZKSessionIdForTests()); + assertEquals(session2, + thr2.zkfc.getElectorForTests().getZKSessionIdForTests()); + } finally { + stopFCs(); + } + } + + /** + * Expire the ZK session of the given service. This requires + * (and asserts) that the given service be the current active. + * @throws NoNodeException if no service holds the lock + */ + private void expireActiveLockHolder(DummyHAService expectedActive) + throws NoNodeException { + ZooKeeperServer zks = getServer(serverFactory); + Stat stat = new Stat(); + byte[] data = zks.getZKDatabase().getData( + ZKFailoverController.ZK_PARENT_ZNODE_DEFAULT + "/" + + ActiveStandbyElector.LOCK_FILENAME, stat, null); + + assertArrayEquals(Ints.toByteArray(expectedActive.index), data); + long session = stat.getEphemeralOwner(); + LOG.info("Expiring svc " + expectedActive + "'s zookeeper session " + session); + zks.closeSession(session); + } + + /** + * Wait for the given HA service to enter the given HA state. + */ + private void waitForHAState(DummyHAService svc, HAServiceState state) + throws Exception { + while (svc.state != state) { + ctx.checkException(); + Thread.sleep(50); + } + } + + /** + * Wait for the ZKFC to be notified of a change in health state. + */ + private void waitForHealthState(DummyZKFC zkfc, State state) + throws Exception { + while (zkfc.getLastHealthState() != state) { + ctx.checkException(); + Thread.sleep(50); + } + } + + /** + * Wait for the given HA service to become the active lock holder. + * If the passed svc is null, waits for there to be no active + * lock holder. + */ + private void waitForActiveLockHolder(DummyHAService svc) + throws Exception { + ZooKeeperServer zks = getServer(serverFactory); + ActiveStandbyElectorTestUtil.waitForActiveLockData(ctx, zks, + ZKFailoverController.ZK_PARENT_ZNODE_DEFAULT, + (svc == null) ? null : Ints.toByteArray(svc.index)); + } + + + private int runFC(DummyHAService target, String ... args) throws Exception { + DummyZKFC zkfc = new DummyZKFC(target); + zkfc.setConf(conf); + return zkfc.run(args); + } + + /** + * Test-thread which runs a ZK Failover Controller corresponding + * to a given dummy service. + */ + private class DummyZKFCThread extends TestingThread { + private final DummyZKFC zkfc; + + public DummyZKFCThread(TestContext ctx, DummyHAService svc) { + super(ctx); + this.zkfc = new DummyZKFC(svc); + zkfc.setConf(conf); + } + + @Override + public void doWork() throws Exception { + try { + assertEquals(0, zkfc.run(new String[0])); + } catch (InterruptedException ie) { + // Interrupted by main thread, that's OK. 
+ } + } + } + + private static class DummyZKFC extends ZKFailoverController { + private final DummyHAService localTarget; + + public DummyZKFC(DummyHAService localTarget) { + this.localTarget = localTarget; + } + + @Override + protected byte[] targetToData(HAServiceTarget target) { + return Ints.toByteArray(((DummyHAService)target).index); + } + + @Override + protected HAServiceTarget dataToTarget(byte[] data) { + int index = Ints.fromByteArray(data); + return DummyHAService.getInstance(index); + } + + @Override + protected HAServiceTarget getLocalTarget() { + return localTarget; + } + } +}
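Taken together, DummyZKFC above illustrates the intended integration pattern for the new ZKFailoverController: a service-specific subclass supplies the local HAServiceTarget and the mapping between targets and the bytes stored under the active-lock znode, and the controller is driven through setConf() and run(). The condensed sketch below is illustrative only -- MyZKFC, resolveTarget(), runController() and the example quorum string are hypothetical, and a real HDFS-side subclass is only referenced as a TODO in the patch.

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceTarget;
import org.apache.hadoop.ha.ZKFailoverController;

/**
 * Illustrative only: a service-specific failover controller in the style of
 * DummyZKFC. The byte[] written to the active-lock znode just has to
 * round-trip through dataToTarget(), so any stable encoding that lets the
 * standby's controller locate the old active for fencing will do.
 */
public class MyZKFC extends ZKFailoverController {
  private final HAServiceTarget localTarget;

  public MyZKFC(HAServiceTarget localTarget) {
    this.localTarget = localTarget;
  }

  @Override
  protected HAServiceTarget getLocalTarget() {
    return localTarget;
  }

  @Override
  protected byte[] targetToData(HAServiceTarget target) {
    // Illustrative encoding: the target's string form as bytes.
    return target.toString().getBytes();
  }

  @Override
  protected HAServiceTarget dataToTarget(byte[] data) {
    // A real controller would resolve the encoded identity back to a
    // concrete HAServiceTarget; resolveTarget() is a hypothetical helper.
    return resolveTarget(new String(data));
  }

  private HAServiceTarget resolveTarget(String id) {
    throw new UnsupportedOperationException("service-specific lookup");
  }

  /** Wire up configuration and run the controller for the given target. */
  public static int runController(HAServiceTarget localTarget, String[] args)
      throws Exception {
    Configuration conf = new Configuration();
    // Required: the ZooKeeper ensemble to use (example hosts shown).
    conf.set(ZKFailoverController.ZK_QUORUM_KEY, "zk1:2181,zk2:2181,zk3:2181");
    // Optional overrides (defaults shown in the patch above):
    //   ha.zookeeper.session-timeout.ms = 5000
    //   ha.zookeeper.parent-znode       = /hadoop-ha
    ZKFailoverController zkfc = new MyZKFC(localTarget);
    zkfc.setConf(conf);
    // Run once with "-formatZK" to create the parent znode; thereafter run
    // with no arguments to start health monitoring, join the election, and
    // block in mainLoop() until a fatal error occurs.
    return zkfc.run(args);
  }
}

DummyZKFC sidesteps the lookup problem by encoding a small integer index (Ints.toByteArray) and resolving it through a static registry of test instances; a production subclass would encode whatever identity it needs to construct a fencable HAServiceTarget for the old active.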