From cbfe8fea0ee9753b1f79dd8bb9a2972d0539e9ec Mon Sep 17 00:00:00 2001 From: Suresh Srinivas Date: Wed, 25 Jan 2012 17:26:20 +0000 Subject: [PATCH] HADOOP-7992. Add ZKClient library to facilitate leader election. Contributed by Bikas Saha. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-1623@1235841 13f79535-47bb-0310-9956-ffa450edef68 --- .../hadoop-common/CHANGES.HDFS-1623.txt | 13 +- hadoop-common-project/hadoop-common/pom.xml | 28 + .../hadoop/ha/ActiveStandbyElector.java | 593 ++++++++++++++++++ .../hadoop/ha/TestActiveStandbyElector.java | 527 ++++++++++++++++ .../ha/TestActiveStandbyElectorRealZK.java | 223 +++++++ 5 files changed, 1379 insertions(+), 5 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java create mode 100644 hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java diff --git a/hadoop-common-project/hadoop-common/CHANGES.HDFS-1623.txt b/hadoop-common-project/hadoop-common/CHANGES.HDFS-1623.txt index 0b86369a2d..b6bd6ed918 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.HDFS-1623.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.HDFS-1623.txt @@ -9,21 +9,21 @@ HADOOP-7455. HA: Introduce HA Service Protocol Interface. (suresh) HADOOP-7774. HA: Administrative CLI to control HA daemons. (todd) HADOOP-7896. HA: if both NNs are in Standby mode, client needs to try failing - back and forth several times with sleeps. (atm) +back and forth several times with sleeps. (atm) HADOOP-7922. Improve some logging for client IPC failovers and - StandbyExceptions (todd) +StandbyExceptions (todd) HADOOP-7921. StandbyException should extend IOException (todd) HADOOP-7928. 
HA: Client failover policy is incorrectly trying to fail over all - IOExceptions (atm) +IOExceptions (atm) HADOOP-7925. Add interface and update CLI to query current state to - HAServiceProtocol (eli via todd) +HAServiceProtocol (eli via todd) HADOOP-7932. Make client connection retries on socket time outs configurable. - (Uma Maheswara Rao G via todd) +(Uma Maheswara Rao G via todd) HADOOP-7924. 
FailoverController for client-based configuration (eli) @@ -31,3 +31,6 @@ HADOOP-7961. Move HA fencing to common. (eli) HADOOP-7970. HAServiceProtocol methods must throw IOException. (Hari Mankude via suresh). + +HADOOP-7992. Add ZKClient library to facilitate leader election. +(Bikas Saha via suresh). diff --git a/hadoop-common-project/hadoop-common/pom.xml b/hadoop-common-project/hadoop-common/pom.xml index 23d61f825b..497af85aa2 100644 --- a/hadoop-common-project/hadoop-common/pom.xml +++ b/hadoop-common-project/hadoop-common/pom.xml @@ -268,6 +268,34 @@ com.jcraft jsch + + + org.apache.zookeeper + zookeeper + 3.4.2 + + + + junit + junit + + + com.sun.jdmk + jmxtools + + + com.sun.jmx + jmxri + + + + + org.apache.zookeeper + zookeeper + 3.4.2 + test-jar + test + diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java new file mode 100644 index 0000000000..e91c4ce992 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java @@ -0,0 +1,593 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ha; + +import java.io.IOException; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.Watcher; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.AsyncCallback.*; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.KeeperException.Code; + +import com.google.common.annotations.VisibleForTesting; + +/** + * + * This class implements a simple library to perform leader election on top of + * Apache Zookeeper. Using Zookeeper as a coordination service, leader election + * can be performed by atomically creating an ephemeral lock file (znode) on + * Zookeeper. The service instance that successfully creates the znode becomes + * active and the rest become standbys.
+ * This election mechanism is only efficient for a small number of election + * candidates (order of 10's) because contention on a single znode by a large + * number of candidates can result in Zookeeper overload.
+ * The elector does not guarantee fencing (protection of shared resources) among + * service instances. After it has notified an instance about becoming a leader, + * then that instance must ensure that it meets the service consistency + * requirements. If it cannot do so, then it is recommended to quit the + * election. The application implements the {@link ActiveStandbyElectorCallback} + * to interact with the elector + */ +@InterfaceAudience.Private +@InterfaceStability.Evolving +public class ActiveStandbyElector implements Watcher, StringCallback, + StatCallback { + + /** + * Callback interface to interact with the ActiveStandbyElector object.
+ * The application will be notified with a callback only on state changes + * (i.e. there will never be successive calls to becomeActive without an + * intermediate call to enterNeutralMode).
+ * The callbacks will be running on Zookeeper client library threads. The + * application should return from these callbacks quickly so as not to impede + * Zookeeper client library performance and notifications. The app will + * typically remember the state change and return from the callback. It will + * then proceed with implementing actions around that state change. It is + * possible to be called back again while these actions are in flight and the + * app should handle this scenario. + */ + public interface ActiveStandbyElectorCallback { + /** + * This method is called when the app becomes the active leader + */ + void becomeActive(); + + /** + * This method is called when the app becomes a standby + */ + void becomeStandby(); + + /** + * If the elector gets disconnected from Zookeeper and does not know about + * the lock state, then it will notify the service via the enterNeutralMode + * interface. The service may choose to ignore this or stop doing state + * changing operations. Upon reconnection, the elector verifies the leader + * status and calls back on the becomeActive and becomeStandby app + * interfaces.
+ * Zookeeper disconnects can happen due to network issues or loss of + * Zookeeper quorum. Thus enterNeutralMode can be used to guard against + * split-brain issues. In such situations it might be prudent to call + * becomeStandby too. However, such state change operations might be + * expensive and enterNeutralMode can help guard against doing that for + * transient issues. + */ + void enterNeutralMode(); + + /** + * If there is any fatal error (e.g. wrong ACL's, unexpected Zookeeper + * errors or Zookeeper persistent unavailability) then notifyFatalError is + * called to notify the app about it. + */ + void notifyFatalError(String errorMessage); + } + + /** + * Name of the lock znode used by the library. Protected for access in test + * classes + */ + @VisibleForTesting + protected static final String LOCKFILENAME = "ActiveStandbyElectorLock"; + + public static final Log LOG = LogFactory.getLog(ActiveStandbyElector.class); + + private static final int NUM_RETRIES = 3; + + private enum ConnectionState { + DISCONNECTED, CONNECTED, TERMINATED + }; + + private enum State { + INIT, ACTIVE, STANDBY, NEUTRAL + }; + + private State state = State.INIT; + private int createRetryCount = 0; + private int statRetryCount = 0; + private ZooKeeper zkClient; + private ConnectionState zkConnectionState = ConnectionState.TERMINATED; + + private final ActiveStandbyElectorCallback appClient; + private final String zkHostPort; + private final int zkSessionTimeout; + private final List zkAcl; + private byte[] appData; + private final String zkLockFilePath; + private final String znodeWorkingDir; + + /** + * Create a new ActiveStandbyElector object
+ * The elector is created by providing to it the Zookeeper configuration, the + * parent znode under which to create the znode and a reference to the + * callback interface.
+ * The parent znode name must be the same for all service instances and + * different across services.
+ * After the leader has been lost, a new leader will be elected after the + * session timeout expires. Hence, the app must set this parameter based on + * its needs for failure response time. The session timeout must be greater + * than the Zookeeper disconnect timeout and is recommended to be 3X that + * value to enable Zookeeper to retry transient disconnections. Setting a very + * short session timeout may result in frequent transitions between active and + * standby states during issues like network outages/GC pauses. + * + * @param zookeeperHostPorts + * ZooKeeper hostPort for all ZooKeeper servers + * @param zookeeperSessionTimeout + * ZooKeeper session timeout + * @param parentZnodeName + * znode under which to create the lock + * @param acl + * ZooKeeper ACL's + * @param app + * reference to callback interface object + * @throws IOException + * @throws HadoopIllegalArgumentException + */ + public ActiveStandbyElector(String zookeeperHostPorts, + int zookeeperSessionTimeout, String parentZnodeName, List acl, + ActiveStandbyElectorCallback app) throws IOException, + HadoopIllegalArgumentException { + if (app == null || acl == null || parentZnodeName == null + || zookeeperHostPorts == null || zookeeperSessionTimeout <= 0) { + throw new HadoopIllegalArgumentException("Invalid argument"); + } + zkHostPort = zookeeperHostPorts; + zkSessionTimeout = zookeeperSessionTimeout; + zkAcl = acl; + appClient = app; + znodeWorkingDir = parentZnodeName; + zkLockFilePath = znodeWorkingDir + "/" + LOCKFILENAME; + + // createConnection for future API calls + createConnection(); + } + + /** + * To participate in election, the app will call joinElection. The result will + * be notified by a callback on either the becomeActive or becomeStandby app + * interfaces.
+ * After this the elector will automatically monitor the leader status and + * perform re-election if necessary.
+ * The app could potentially start off in standby mode and ignore the + * becomeStandby call. + * + * @param data + * to be set by the app. non-null data must be set. + * @throws HadoopIllegalArgumentException + * if valid data is not supplied + */ + public synchronized void joinElection(byte[] data) + throws HadoopIllegalArgumentException { + LOG.debug("Attempting active election"); + + if (data == null) { + throw new HadoopIllegalArgumentException("data cannot be null"); + } + + appData = new byte[data.length]; + System.arraycopy(data, 0, appData, 0, data.length); + + joinElectionInternal(); + } + + /** + * Any service instance can drop out of the election by calling quitElection. + *
+ * This will lose any leader status, if held, and stop monitoring of the lock + * node.
+ * If the instance wants to participate in election again, then it needs to + * call joinElection().
+ * This allows service instances to take themselves out of rotation for known + * impending unavailable states (e.g. long GC pause or software upgrade). + */ + public synchronized void quitElection() { + LOG.debug("Yielding from election"); + reset(); + } + + /** + * Exception thrown when there is no active leader + */ + public class ActiveNotFoundException extends Exception { + private static final long serialVersionUID = 3505396722342846462L; + } + + /** + * get data set by the active leader + * + * @return data set by the active instance + * @throws ActiveNotFoundException + * when there is no active leader + * @throws KeeperException + * other zookeeper operation errors + * @throws InterruptedException + * @throws IOException + * when ZooKeeper connection could not be established + */ + public synchronized byte[] getActiveData() throws ActiveNotFoundException, + KeeperException, InterruptedException, IOException { + try { + if (zkClient == null) { + createConnection(); + } + Stat stat = new Stat(); + return zkClient.getData(zkLockFilePath, false, stat); + } catch(KeeperException e) { + Code code = e.code(); + if (operationNodeDoesNotExist(code)) { + // handle the commonly expected cases that make sense for us + throw new ActiveNotFoundException(); + } else { + throw e; + } + } + } + + /** + * interface implementation of Zookeeper callback for create + */ + @Override + public synchronized void processResult(int rc, String path, Object ctx, + String name) { + LOG.debug("CreateNode result: " + rc + " for path: " + path + + " connectionState: " + zkConnectionState); + if (zkClient == null) { + // zkClient is nulled before closing the connection + // this is the callback with session expired after we closed the session + return; + } + + Code code = Code.get(rc); + if (operationSuccess(code)) { + // we successfully created the znode. we are the leader. 
start monitoring + becomeActive(); + monitorActiveStatus(); + return; + } + + if (operationNodeExists(code)) { + if (createRetryCount == 0) { + // znode exists and we did not retry the operation. so a different + // instance has created it. become standby and monitor lock. + becomeStandby(); + } + // if we had retried then the znode could have been created by our first + // attempt to the server (that we lost) and this node exists response is + // for the second attempt. verify this case via ephemeral node owner. this + // will happen on the callback for monitoring the lock. + monitorActiveStatus(); + return; + } + + String errorMessage = "Received create error from Zookeeper. code:" + + code.toString(); + LOG.debug(errorMessage); + + if (operationRetry(code)) { + if (createRetryCount < NUM_RETRIES) { + LOG.debug("Retrying createNode createRetryCount: " + createRetryCount); + ++createRetryCount; + createNode(); + return; + } + errorMessage = errorMessage + + ". Not retrying further znode create connection errors."; + } + + fatalError(errorMessage); + } + + /** + * interface implementation of Zookeeper callback for monitor (exists) + */ + @Override + public synchronized void processResult(int rc, String path, Object ctx, + Stat stat) { + LOG.debug("StatNode result: " + rc + " for path: " + path + + " connectionState: " + zkConnectionState); + if (zkClient == null) { + // zkClient is nulled before closing the connection + // this is the callback with session expired after we closed the session + return; + } + + Code code = Code.get(rc); + if (operationSuccess(code)) { + // the following owner check completes verification in case the lock znode + // creation was retried + if (stat.getEphemeralOwner() == zkClient.getSessionId()) { + // we own the lock znode. so we are the leader + becomeActive(); + } else { + // we dont own the lock znode. so we are a standby. 
+ becomeStandby(); + } + // the watch set by us will notify about changes + return; + } + + if (operationNodeDoesNotExist(code)) { + // the lock znode disappeared before we started monitoring it + enterNeutralMode(); + joinElectionInternal(); + return; + } + + String errorMessage = "Received stat error from Zookeeper. code:" + + code.toString(); + LOG.debug(errorMessage); + + if (operationRetry(code)) { + if (statRetryCount < NUM_RETRIES) { + ++statRetryCount; + monitorNode(); + return; + } + errorMessage = errorMessage + + ". Not retrying further znode monitoring connection errors."; + } + + fatalError(errorMessage); + } + + /** + * interface implementation of Zookeeper watch events (connection and node) + */ + @Override + public synchronized void process(WatchedEvent event) { + Event.EventType eventType = event.getType(); + LOG.debug("Watcher event type: " + eventType + " with state:" + + event.getState() + " for path:" + event.getPath() + + " connectionState: " + zkConnectionState); + if (zkClient == null) { + // zkClient is nulled before closing the connection + // this is the callback with session expired after we closed the session + return; + } + + if (eventType == Event.EventType.None) { + // the connection state has changed + switch (event.getState()) { + case SyncConnected: + // if the listener was asked to move to safe state then it needs to + // be undone + ConnectionState prevConnectionState = zkConnectionState; + zkConnectionState = ConnectionState.CONNECTED; + if (prevConnectionState == ConnectionState.DISCONNECTED) { + monitorActiveStatus(); + } + break; + case Disconnected: + // ask the app to move to safe state because zookeeper connection + // is not active and we dont know our state + zkConnectionState = ConnectionState.DISCONNECTED; + enterNeutralMode(); + break; + case Expired: + // the connection got terminated because of session timeout + // call listener to reconnect + enterNeutralMode(); + reJoinElection(); + break; + default: + 
fatalError("Unexpected Zookeeper watch event state: " + + event.getState()); + break; + } + + return; + } + + // a watch on lock path in zookeeper has fired. so something has changed on + // the lock. ideally we should check that the path is the same as the lock + // path but trusting zookeeper for now + String path = event.getPath(); + if (path != null) { + switch (eventType) { + case NodeDeleted: + if (state == State.ACTIVE) { + enterNeutralMode(); + } + joinElectionInternal(); + break; + case NodeDataChanged: + monitorActiveStatus(); + break; + default: + LOG.debug("Unexpected node event: " + eventType + " for path: " + path); + monitorActiveStatus(); + } + + return; + } + + // some unexpected error has occurred + fatalError("Unexpected watch error from Zookeeper"); + } + + /** + * Get a new zookeeper client instance. protected so that test class can + * inherit and pass in a mock object for zookeeper + * + * @return new zookeeper client instance + * @throws IOException + */ + protected synchronized ZooKeeper getNewZooKeeper() throws IOException { + return new ZooKeeper(zkHostPort, zkSessionTimeout, this); + } + + private void fatalError(String errorMessage) { + reset(); + appClient.notifyFatalError(errorMessage); + } + + private void monitorActiveStatus() { + LOG.debug("Monitoring active leader"); + statRetryCount = 0; + monitorNode(); + } + + private void joinElectionInternal() { + if (zkClient == null) { + if (!reEstablishSession()) { + fatalError("Failed to reEstablish connection with ZooKeeper"); + return; + } + } + + createRetryCount = 0; + createNode(); + } + + private void reJoinElection() { + LOG.debug("Trying to re-establish ZK session"); + terminateConnection(); + joinElectionInternal(); + } + + private boolean reEstablishSession() { + int connectionRetryCount = 0; + boolean success = false; + while(!success && connectionRetryCount < NUM_RETRIES) { + LOG.debug("Establishing zookeeper connection"); + try { + createConnection(); + success = true; + } 
catch(IOException e) { + LOG.warn(e); + try { + Thread.sleep(5000); + } catch(InterruptedException e1) { + LOG.warn(e1); + } + } + ++connectionRetryCount; + } + return success; + } + + private void createConnection() throws IOException { + zkClient = getNewZooKeeper(); + } + + private void terminateConnection() { + if (zkClient == null) { + return; + } + LOG.debug("Terminating ZK connection"); + ZooKeeper tempZk = zkClient; + zkClient = null; + try { + tempZk.close(); + } catch(InterruptedException e) { + LOG.warn(e); + } + zkConnectionState = ConnectionState.TERMINATED; + } + + private void reset() { + state = State.INIT; + terminateConnection(); + } + + private void becomeActive() { + if (state != State.ACTIVE) { + LOG.debug("Becoming active"); + state = State.ACTIVE; + appClient.becomeActive(); + } + } + + private void becomeStandby() { + if (state != State.STANDBY) { + LOG.debug("Becoming standby"); + state = State.STANDBY; + appClient.becomeStandby(); + } + } + + private void enterNeutralMode() { + if (state != State.NEUTRAL) { + LOG.debug("Entering neutral mode"); + state = State.NEUTRAL; + appClient.enterNeutralMode(); + } + } + + private void createNode() { + zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL, this, + null); + } + + private void monitorNode() { + zkClient.exists(zkLockFilePath, true, this, null); + } + + private boolean operationSuccess(Code code) { + return (code == Code.OK); + } + + private boolean operationNodeExists(Code code) { + return (code == Code.NODEEXISTS); + } + + private boolean operationNodeDoesNotExist(Code code) { + return (code == Code.NONODE); + } + + private boolean operationRetry(Code code) { + switch (code) { + case CONNECTIONLOSS: + case OPERATIONTIMEOUT: + return true; + } + return false; + } + +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java 
b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java new file mode 100644 index 0000000000..fec350d3bc --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java @@ -0,0 +1,527 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. 
+ */ + +package org.apache.hadoop.ha; + +import java.io.IOException; +import java.util.List; + +import org.apache.zookeeper.AsyncCallback; +import org.apache.zookeeper.CreateMode; +import org.apache.zookeeper.KeeperException; +import org.apache.zookeeper.KeeperException.Code; +import org.apache.zookeeper.WatchedEvent; +import org.apache.zookeeper.ZooKeeper; +import org.apache.zookeeper.Watcher.Event; +import org.apache.zookeeper.data.ACL; +import org.apache.zookeeper.data.Stat; +import org.apache.zookeeper.ZooDefs.Ids; +import org.junit.Before; +import org.junit.Test; +import org.junit.Assert; +import org.mockito.Mockito; + +import org.apache.hadoop.HadoopIllegalArgumentException; +import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback; +import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException; + +public class TestActiveStandbyElector { + + static ZooKeeper mockZK; + static int count; + static ActiveStandbyElectorCallback mockApp; + static final byte[] data = new byte[8]; + + ActiveStandbyElectorTester elector; + + class ActiveStandbyElectorTester extends ActiveStandbyElector { + ActiveStandbyElectorTester(String hostPort, int timeout, String parent, + List acl, ActiveStandbyElectorCallback app) throws IOException { + super(hostPort, timeout, parent, acl, app); + } + + @Override + public ZooKeeper getNewZooKeeper() { + ++TestActiveStandbyElector.count; + return TestActiveStandbyElector.mockZK; + } + + } + + private static final String zkParentName = "/zookeeper"; + private static final String zkLockPathName = "/zookeeper/" + + ActiveStandbyElector.LOCKFILENAME; + + @Before + public void init() throws IOException { + count = 0; + mockZK = Mockito.mock(ZooKeeper.class); + mockApp = Mockito.mock(ActiveStandbyElectorCallback.class); + elector = new ActiveStandbyElectorTester("hostPort", 1000, zkParentName, + Ids.OPEN_ACL_UNSAFE, mockApp); + } + + /** + * verify that joinElection checks for null data + */ + @Test(expected = 
HadoopIllegalArgumentException.class) + public void testJoinElectionException() { + elector.joinElection(null); + } + + /** + * verify that joinElection tries to create ephemeral lock znode + */ + @Test + public void testJoinElection() { + elector.joinElection(data); + Mockito.verify(mockZK, Mockito.times(1)).create(zkLockPathName, data, + Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null); + } + + /** + * verify that successful znode create result becomes active and monitoring is + * started + */ + @Test + public void testCreateNodeResultBecomeActive() { + elector.joinElection(data); + elector.processResult(Code.OK.intValue(), zkLockPathName, null, + zkLockPathName); + Mockito.verify(mockApp, Mockito.times(1)).becomeActive(); + Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true, + elector, null); + + // monitor callback verifies the leader is ephemeral owner of lock but does + // not call becomeActive since its already active + Stat stat = new Stat(); + stat.setEphemeralOwner(1L); + Mockito.when(mockZK.getSessionId()).thenReturn(1L); + elector.processResult(Code.OK.intValue(), zkLockPathName, null, stat); + // should not call neutral mode/standby/active + Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode(); + Mockito.verify(mockApp, Mockito.times(0)).becomeStandby(); + Mockito.verify(mockApp, Mockito.times(1)).becomeActive(); + // another joinElection not called. 
+ Mockito.verify(mockZK, Mockito.times(1)).create(zkLockPathName, data, + Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null); + // no new monitor called + Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true, + elector, null); + } + + /** + * verify that znode create for existing node and no retry becomes standby and + * monitoring is started + */ + @Test + public void testCreateNodeResultBecomeStandby() { + elector.joinElection(data); + + elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null, + zkLockPathName); + Mockito.verify(mockApp, Mockito.times(1)).becomeStandby(); + Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true, + elector, null); + } + + /** + * verify that znode create error result in fatal error + */ + @Test + public void testCreateNodeResultError() { + elector.joinElection(data); + + elector.processResult(Code.APIERROR.intValue(), zkLockPathName, null, + zkLockPathName); + Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError( + "Received create error from Zookeeper. code:APIERROR"); + } + + /** + * verify that retry of network errors verifies master by session id and + * becomes active if they match. monitoring is started. + */ + @Test + public void testCreateNodeResultRetryBecomeActive() { + elector.joinElection(data); + + elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null, + zkLockPathName); + elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null, + zkLockPathName); + elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null, + zkLockPathName); + elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null, + zkLockPathName); + // 4 errors results in fatalError + Mockito + .verify(mockApp, Mockito.times(1)) + .notifyFatalError( + "Received create error from Zookeeper. code:CONNECTIONLOSS. 
"+ + "Not retrying further znode create connection errors."); + + elector.joinElection(data); + // recreate connection via getNewZooKeeper + Assert.assertEquals(2, TestActiveStandbyElector.count); + elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null, + zkLockPathName); + elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null, + zkLockPathName); + Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true, + elector, null); + + Stat stat = new Stat(); + stat.setEphemeralOwner(1L); + Mockito.when(mockZK.getSessionId()).thenReturn(1L); + elector.processResult(Code.OK.intValue(), zkLockPathName, null, stat); + Mockito.verify(mockApp, Mockito.times(1)).becomeActive(); + Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true, + elector, null); + Mockito.verify(mockZK, Mockito.times(6)).create(zkLockPathName, data, + Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null); + } + + /** + * verify that retry of network errors verifies active by session id and + * becomes standby if they dont match. monitoring is started. 
+   */
+  @Test
+  public void testCreateNodeResultRetryBecomeStandby() {
+    elector.joinElection(data);
+
+    elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
+        zkLockPathName);
+    elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
+        zkLockPathName);
+    Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
+        elector, null);
+
+    Stat stat = new Stat();
+    stat.setEphemeralOwner(0);
+    Mockito.when(mockZK.getSessionId()).thenReturn(1L); // != ephemeral owner 0
+    elector.processResult(Code.OK.intValue(), zkLockPathName, null, stat);
+    Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
+    Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
+        elector, null);
+  }
+
+  /**
+   * verify that if create znode results in NODEEXISTS and that znode is deleted
+   * before the exists() watch is set, then the NONODE result of exists() leads
+   * to neutral mode and another attempt to create the znode to become active
+   */
+  @Test
+  public void testCreateNodeResultRetryNoNode() {
+    elector.joinElection(data);
+
+    elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
+        zkLockPathName);
+    elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
+        zkLockPathName);
+    elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
+        zkLockPathName);
+    Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
+        elector, null);
+
+    elector.processResult(Code.NONODE.intValue(), zkLockPathName, null,
+        (Stat) null);
+    Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
+    Mockito.verify(mockZK, Mockito.times(4)).create(zkLockPathName, data,
+        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
+  }
+
+  /**
+   * verify that more than 3 network error retries result in a fatal error
+   */
+  @Test
+  public void testStatNodeRetry() {
+    elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
+        (Stat) null);
+    elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
+        (Stat) null);
+    elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
+        (Stat) null);
+    elector.processResult(Code.CONNECTIONLOSS.intValue(), zkLockPathName, null,
+        (Stat) null);
+    Mockito
+        .verify(mockApp, Mockito.times(1))
+        .notifyFatalError(
+            "Received stat error from Zookeeper. code:CONNECTIONLOSS. "+
+            "Not retrying further znode monitoring connection errors.");
+  }
+
+  /**
+   * verify that a non-retried error in the exists() callback is a fatal error
+   */
+  @Test
+  public void testStatNodeError() {
+    elector.processResult(Code.RUNTIMEINCONSISTENCY.intValue(), zkLockPathName,
+        null, (Stat) null);
+    Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode();
+    Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError(
+        "Received stat error from Zookeeper. code:RUNTIMEINCONSISTENCY");
+  }
+
+  /**
+   * verify behavior of watcher.process callback with non-node event
+   */
+  @Test
+  public void testProcessCallbackEventNone() {
+    elector.joinElection(data);
+
+    WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class);
+    Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.None);
+
+    // first SyncConnected should not do anything
+    Mockito.when(mockEvent.getState()).thenReturn(
+        Event.KeeperState.SyncConnected);
+    elector.process(mockEvent);
+    Mockito.verify(mockZK, Mockito.times(0)).exists(Mockito.anyString(),
+        Mockito.anyBoolean(), Mockito. anyObject(),
+        Mockito. anyObject());
+
+    // disconnection should enter neutral mode
+    Mockito.when(mockEvent.getState()).thenReturn(
+        Event.KeeperState.Disconnected);
+    elector.process(mockEvent);
+    Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
+
+    // re-connection should set the exists() watch on the lock znode again
+    Mockito.when(mockEvent.getState()).thenReturn(
+        Event.KeeperState.SyncConnected);
+    elector.process(mockEvent);
+    Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
+        elector, null);
+
+    // session expired should enter neutral mode and initiate re-election.
+    // re-election is checked via re-creation of a new zookeeper client and a
+    // call to create the lock znode
+    Mockito.when(mockEvent.getState()).thenReturn(Event.KeeperState.Expired);
+    elector.process(mockEvent);
+    // already in neutral mode above. should not enter neutral mode again
+    Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
+    // called getNewZooKeeper to create new session. first call was in
+    // constructor
+    Assert.assertEquals(2, TestActiveStandbyElector.count);
+    // once in initial joinElection and one now
+    Mockito.verify(mockZK, Mockito.times(2)).create(zkLockPathName, data,
+        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
+
+    // create znode success. become active and monitor
+    elector.processResult(Code.OK.intValue(), zkLockPathName, null,
+        zkLockPathName);
+    Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
+    Mockito.verify(mockZK, Mockito.times(2)).exists(zkLockPathName, true,
+        elector, null);
+
+    // error event results in fatal error
+    Mockito.when(mockEvent.getState()).thenReturn(Event.KeeperState.AuthFailed);
+    elector.process(mockEvent);
+    Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError(
+        "Unexpected Zookeeper watch event state: AuthFailed");
+    // only 1 state change callback is called at a time
+    Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
+  }
+
+  /**
+   * verify behavior of watcher.process with node event
+   */
+  @Test
+  public void testProcessCallbackEventNode() {
+    elector.joinElection(data);
+
+    // make the object go into the monitoring state
+    elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
+        zkLockPathName);
+    Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
+    Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
+        elector, null);
+
+    WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class);
+    Mockito.when(mockEvent.getPath()).thenReturn(zkLockPathName);
+
+    // monitoring should be set up again after a data-changed event
+    Mockito.when(mockEvent.getType()).thenReturn(
+        Event.EventType.NodeDataChanged);
+    elector.process(mockEvent);
+    Mockito.verify(mockZK, Mockito.times(2)).exists(zkLockPathName, true,
+        elector, null);
+
+    // monitoring should be set up again after a children-changed event
+    Mockito.when(mockEvent.getType()).thenReturn(
+        Event.EventType.NodeChildrenChanged);
+    elector.process(mockEvent);
+    Mockito.verify(mockZK, Mockito.times(3)).exists(zkLockPathName, true,
+        elector, null);
+
+    // lock node deletion when in standby mode should create znode again.
+    // successful znode creation enters active state and sets monitor
+    Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.NodeDeleted);
+    elector.process(mockEvent);
+    // enterNeutralMode not called when app is standby and leader is lost
+    Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode();
+    // once in initial joinElection() and one now
+    Mockito.verify(mockZK, Mockito.times(2)).create(zkLockPathName, data,
+        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
+    elector.processResult(Code.OK.intValue(), zkLockPathName, null,
+        zkLockPathName);
+    Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
+    Mockito.verify(mockZK, Mockito.times(4)).exists(zkLockPathName, true,
+        elector, null);
+
+    // lock node deletion in active mode should enter neutral mode and create
+    // the znode again. successful znode creation enters active state and sets
+    // monitor
+    Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.NodeDeleted);
+    elector.process(mockEvent);
+    Mockito.verify(mockApp, Mockito.times(1)).enterNeutralMode();
+    // another create issued: 3 in total so far
+    Mockito.verify(mockZK, Mockito.times(3)).create(zkLockPathName, data,
+        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
+    elector.processResult(Code.OK.intValue(), zkLockPathName, null,
+        zkLockPathName);
+    Mockito.verify(mockApp, Mockito.times(2)).becomeActive();
+    Mockito.verify(mockZK, Mockito.times(5)).exists(zkLockPathName, true,
+        elector, null);
+
+    // bad (null) path name results in fatal error
+    Mockito.when(mockEvent.getPath()).thenReturn(null);
+    elector.process(mockEvent);
+    Mockito.verify(mockApp, Mockito.times(1)).notifyFatalError(
+        "Unexpected watch error from Zookeeper");
+    // fatal error means no new connection other than one from constructor
+    Assert.assertEquals(1, TestActiveStandbyElector.count);
+    // no new watches after fatal error
+    Mockito.verify(mockZK, Mockito.times(5)).exists(zkLockPathName, true,
+        elector, null);
+
+  }
+
+  /**
+   * verify becomeStandby is not called if already in standby
+   */
+  @Test
+  public void testSuccessiveStandbyCalls() {
+    elector.joinElection(data);
+
+    // make the object go into the monitoring standby state
+    elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
+        zkLockPathName);
+    Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
+    Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
+        elector, null);
+
+    WatchedEvent mockEvent = Mockito.mock(WatchedEvent.class);
+    Mockito.when(mockEvent.getPath()).thenReturn(zkLockPathName);
+
+    // notify node deletion
+    // the elector should try to create the lock znode again
+    Mockito.when(mockEvent.getType()).thenReturn(Event.EventType.NodeDeleted);
+    elector.process(mockEvent);
+    // is standby. no need to notify anything now
+    Mockito.verify(mockApp, Mockito.times(0)).enterNeutralMode();
+    // another create issued: one from joinElection and one now
+    Mockito.verify(mockZK, Mockito.times(2)).create(zkLockPathName, data,
+        Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, null);
+    // lost election
+    elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
+        zkLockPathName);
+    // still standby. so no need to notify again
+    Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
+    // monitor is set again
+    Mockito.verify(mockZK, Mockito.times(2)).exists(zkLockPathName, true,
+        elector, null);
+  }
+
+  /**
+   * verify quit election terminates connection and there are no new watches.
+   * next call to joinElection creates new connection and performs election
+   */
+  @Test
+  public void testQuitElection() throws InterruptedException {
+    elector.quitElection();
+    Mockito.verify(mockZK, Mockito.times(1)).close();
+    // no watches added
+    Mockito.verify(mockZK, Mockito.times(0)).exists(zkLockPathName, true,
+        elector, null);
+
+    byte[] data = new byte[8];
+    elector.joinElection(data);
+    // getNewZooKeeper called 2 times. once in constructor and once now
+    Assert.assertEquals(2, TestActiveStandbyElector.count);
+    elector.processResult(Code.NODEEXISTS.intValue(), zkLockPathName, null,
+        zkLockPathName);
+    Mockito.verify(mockApp, Mockito.times(1)).becomeStandby();
+    Mockito.verify(mockZK, Mockito.times(1)).exists(zkLockPathName, true,
+        elector, null);
+
+  }
+
+  /**
+   * verify that getActiveData gives data when active exists, tells that
+   * active does not exist and reports error in getting active information
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws KeeperException
+   * @throws ActiveNotFoundException
+   */
+  @Test
+  public void testGetActiveData() throws ActiveNotFoundException,
+      KeeperException, InterruptedException, IOException {
+    // get valid active data
+    byte[] data = new byte[8];
+    Mockito.when(
+        mockZK.getData(Mockito.eq(zkLockPathName), Mockito.eq(false),
+            Mockito. anyObject())).thenReturn(data);
+    Assert.assertEquals(data, elector.getActiveData());
+    Mockito.verify(mockZK, Mockito.times(1)).getData(
+        Mockito.eq(zkLockPathName), Mockito.eq(false),
+        Mockito. anyObject());
+
+    // active does not exist: NoNodeException surfaces as ActiveNotFoundException
+    Mockito.when(
+        mockZK.getData(Mockito.eq(zkLockPathName), Mockito.eq(false),
+            Mockito. anyObject())).thenThrow(
+        new KeeperException.NoNodeException());
+    try {
+      elector.getActiveData();
+      Assert.fail("ActiveNotFoundException expected");
+    } catch(ActiveNotFoundException e) {
+      Mockito.verify(mockZK, Mockito.times(2)).getData(
+          Mockito.eq(zkLockPathName), Mockito.eq(false),
+          Mockito. anyObject());
+    }
+
+    // any other error getting active data rethrows the KeeperException
+    try {
+      Mockito.when(
+          mockZK.getData(Mockito.eq(zkLockPathName), Mockito.eq(false),
+              Mockito. anyObject())).thenThrow(
+          new KeeperException.AuthFailedException());
+      elector.getActiveData();
+      Assert.fail("KeeperException.AuthFailedException expected");
+    } catch(KeeperException.AuthFailedException ke) {
+      Mockito.verify(mockZK, Mockito.times(3)).getData(
+          Mockito.eq(zkLockPathName), Mockito.eq(false),
+          Mockito. anyObject());
+    }
+  }
+
+}
diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java
new file mode 100644
index 0000000000..85a5f8b682
--- /dev/null
+++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElectorRealZK.java
@@ -0,0 +1,223 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.hadoop.ha;
+
+import java.io.IOException;
+import java.util.List;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.ZooKeeper;
+import org.apache.zookeeper.ZooDefs.Ids;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.test.ClientBase;
+
+/**
+ * Test for {@link ActiveStandbyElector} using real zookeeper.
+ * <p>
+ * Checks made on runner threads or inside elector callbacks may execute on a
+ * thread other than the one running the test method, where a thrown
+ * AssertionError would not be reported by JUnit. Such failures are therefore
+ * recorded through {@link #recordFailure(AssertionError)} and rethrown by the
+ * test method once all runner threads have finished.
+ */
+public class TestActiveStandbyElectorRealZK extends ClientBase {
+  static final int NUM_ELECTORS = 2;
+  /** longest time, in ms, a runner waits to become active or see an error */
+  static final long MAX_WAIT_MS = 30000;
+  static ZooKeeper[] zkClient = new ZooKeeper[NUM_ELECTORS];
+  static int currentClientIndex = 0;
+
+  /**
+   * Elector that is handed a client created by the test instead of opening
+   * its own connection. The client is looked up through the static
+   * {@code currentClientIndex}, so the lookup is only meaningful while the
+   * creation loop of the test is still on this elector's slot.
+   */
+  class ActiveStandbyElectorTesterRealZK extends ActiveStandbyElector {
+    ActiveStandbyElectorTesterRealZK(String hostPort, int timeout,
+        String parent, List<ACL> acl, ActiveStandbyElectorCallback app)
+        throws IOException {
+      super(hostPort, timeout, parent, acl, app);
+    }
+
+    @Override
+    public ZooKeeper getNewZooKeeper() {
+      return TestActiveStandbyElectorRealZK.zkClient[
+          TestActiveStandbyElectorRealZK.currentClientIndex];
+    }
+  }
+
+  /**
+   * The class object runs on a thread and waits for a signal to start from the
+   * test object. On getting the signal it joins the election and thus by doing
+   * this on multiple threads we can test simultaneous attempts at leader lock
+   * creation. After joining the election, the object waits on a signal to exit.
+   * This signal comes when the object's elector has become a leader or there is
+   * an unexpected fatal error. The wait is bounded by MAX_WAIT_MS in total and
+   * a timeout is reported to the test as a failure. The runner then quits the
+   * election, which lets another thread object become a leader.
+   */
+  class ThreadRunner implements Runnable, ActiveStandbyElectorCallback {
+    int index;
+    TestActiveStandbyElectorRealZK test;
+    /** true until becomeActive() or notifyFatalError() is called */
+    boolean wait = true;
+
+    ThreadRunner(int i, TestActiveStandbyElectorRealZK s) {
+      index = i;
+      test = s;
+    }
+
+    @Override
+    public void run() {
+      LOG.info("starting " + index);
+      try {
+        awaitStartSignal();
+        // join election
+        byte[] data = new byte[8];
+        ActiveStandbyElector elector = test.elector[index];
+        LOG.info("joining " + index);
+        elector.joinElection(data);
+        if (!awaitActiveOrFatal()) {
+          test.recordFailure(new AssertionError("elector " + index
+              + " neither became active nor reported a fatal error within "
+              + MAX_WAIT_MS + "ms"));
+        }
+        // pause kept from the original test before giving up leadership
+        Thread.sleep(1000);
+        // quit election to allow other elector to become active
+        elector.quitElection();
+      } catch(InterruptedException e) {
+        // keep the interrupt visible to the owner of this thread and let the
+        // test method report the failure instead of failing unseen here
+        Thread.currentThread().interrupt();
+        AssertionError failure =
+            new AssertionError("runner " + index + " interrupted");
+        failure.initCause(e);
+        test.recordFailure(failure);
+      }
+      LOG.info("ending " + index);
+    }
+
+    /**
+     * Blocks until the test object has set its start flag.
+     *
+     * @throws InterruptedException if interrupted while waiting
+     */
+    private void awaitStartSignal() throws InterruptedException {
+      synchronized (test) {
+        // re-check the flag after every wake-up
+        while (!test.start) {
+          test.wait();
+        }
+      }
+    }
+
+    /**
+     * Waits until a callback clears {@code wait} or MAX_WAIT_MS has elapsed
+     * in total, whichever comes first.
+     *
+     * @return true if a callback fired, false if the wait timed out
+     * @throws InterruptedException if interrupted while waiting
+     */
+    private synchronized boolean awaitActiveOrFatal()
+        throws InterruptedException {
+      long deadline = System.nanoTime() + MAX_WAIT_MS * 1000000L;
+      long remainingMs = MAX_WAIT_MS;
+      // wait(0) would block without a timeout, hence the strict > 0 check
+      while (wait && remainingMs > 0) {
+        wait(remainingMs);
+        remainingMs = (deadline - System.nanoTime()) / 1000000L;
+      }
+      return !wait;
+    }
+
+    @Override
+    public synchronized void becomeActive() {
+      test.reportActive(index);
+      LOG.info("active " + index);
+      wait = false;
+      notifyAll();
+    }
+
+    @Override
+    public synchronized void becomeStandby() {
+      test.reportStandby(index);
+      LOG.info("standby " + index);
+    }
+
+    @Override
+    public synchronized void enterNeutralMode() {
+      LOG.info("neutral " + index);
+    }
+
+    @Override
+    public synchronized void notifyFatalError(String errorMessage) {
+      LOG.info("fatal " + index + " .Error message:" + errorMessage);
+      wait = false;
+      notifyAll();
+    }
+  }
+
+  boolean start = false;
+  int activeIndex = -1;
+  int standbyIndex = -1;
+  /** first failure recorded by recordFailure(); null while none occurred */
+  AssertionError firstFailure = null;
+  String parentDir = "/" + java.util.UUID.randomUUID().toString();
+
+  ActiveStandbyElector[] elector = new ActiveStandbyElector[NUM_ELECTORS];
+  ThreadRunner[] threadRunner = new ThreadRunner[NUM_ELECTORS];
+  Thread[] thread = new Thread[NUM_ELECTORS];
+
+  /**
+   * Remembers the first failure reported by a runner or callback so that the
+   * test method can rethrow it after the runner threads have finished.
+   *
+   * @param failure the failure to remember; later failures are only logged
+   */
+  synchronized void recordFailure(AssertionError failure) {
+    LOG.info("recording failure: " + failure);
+    if (firstFailure == null) {
+      firstFailure = failure;
+    }
+  }
+
+  /**
+   * Called when elector {@code index} becomes active. The first activation is
+   * accepted as is; a later one must come from the reported standby and not
+   * from the previous active.
+   */
+  synchronized void reportActive(int index) {
+    try {
+      if (activeIndex != -1) {
+        // standby should become active
+        Assert.assertEquals(standbyIndex, index);
+        // old active should not become active
+        Assert.assertFalse(activeIndex == index);
+      }
+    } catch(AssertionError e) {
+      recordFailure(e);
+    }
+    activeIndex = index;
+  }
+
+  /**
+   * Called when elector {@code index} becomes standby. Only one standby
+   * should be reported and it should not be the same as the active.
+   */
+  synchronized void reportStandby(int index) {
+    try {
+      Assert.assertEquals(-1, standbyIndex);
+      standbyIndex = index;
+      Assert.assertFalse(activeIndex == standbyIndex);
+    } catch(AssertionError e) {
+      recordFailure(e);
+    }
+  }
+
+  /**
+   * the test creates 2 electors which try to become active using a real
+   * zookeeper server. It verifies that 1 becomes active and 1 becomes standby.
+   * Upon becoming active the leader quits election and the test verifies that
+   * the standby now becomes active. these electors run on different threads and
+   * callback to the test class to report active and standby where the outcome
+   * is verified. Failures recorded by those callbacks are rethrown here after
+   * all runner threads have been joined.
+   *
+   * @throws IOException
+   * @throws InterruptedException
+   * @throws KeeperException
+   */
+  @Test
+  public void testActiveStandbyTransition() throws IOException,
+      InterruptedException, KeeperException {
+    LOG.info("starting test with parentDir:" + parentDir);
+    start = false;
+    byte[] data = new byte[8];
+    // create random working directory
+    createClient().create(parentDir, data, Ids.OPEN_ACL_UNSAFE,
+        CreateMode.PERSISTENT);
+
+    for(currentClientIndex = 0;
+        currentClientIndex < NUM_ELECTORS;
+        ++currentClientIndex) {
+      LOG.info("creating " + currentClientIndex);
+      zkClient[currentClientIndex] = createClient();
+      threadRunner[currentClientIndex] = new ThreadRunner(currentClientIndex,
+          this);
+      elector[currentClientIndex] = new ActiveStandbyElectorTesterRealZK(
+          "hostPort", 1000, parentDir, Ids.OPEN_ACL_UNSAFE,
+          threadRunner[currentClientIndex]);
+      zkClient[currentClientIndex].register(elector[currentClientIndex]);
+      thread[currentClientIndex] = new Thread(threadRunner[currentClientIndex]);
+      thread[currentClientIndex].start();
+    }
+
+    synchronized (this) {
+      // signal threads to start
+      LOG.info("signaling threads");
+      start = true;
+      notifyAll();
+    }
+
+    for(int i = 0; i < thread.length; i++) {
+      thread[i].join();
+    }
+
+    // surface anything that went wrong on the other threads
+    AssertionError failure;
+    synchronized (this) {
+      failure = firstFailure;
+    }
+    if (failure != null) {
+      throw failure;
+    }
+  }
+}