diff --git a/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt b/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt new file mode 100644 index 0000000000..a6ff9074e4 --- /dev/null +++ b/hadoop-common-project/hadoop-common/CHANGES.HDFS-3042.txt @@ -0,0 +1,8 @@ +Changes for HDFS-3042 branch. + +This change list will be merged into the trunk CHANGES.txt when the HDFS-3-42 +branch is merged. +------------------------------ + +HADOOP-8220. ZKFailoverController doesn't handle failure to become active correctly (todd) + diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java index e23260ecf9..46023c5c3a 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ActiveStandbyElector.java @@ -81,9 +81,15 @@ public class ActiveStandbyElector implements StatCallback, StringCallback { */ public interface ActiveStandbyElectorCallback { /** - * This method is called when the app becomes the active leader + * This method is called when the app becomes the active leader. + * If the service fails to become active, it should throw + * ServiceFailedException. This will cause the elector to + * sleep for a short period, then re-join the election. + * + * Callback implementations are expected to manage their own + * timeouts (e.g. when making an RPC to a remote node). */ - void becomeActive(); + void becomeActive() throws ServiceFailedException; /** * This method is called when the app becomes a standby @@ -135,6 +141,7 @@ public interface ActiveStandbyElectorCallback { public static final Log LOG = LogFactory.getLog(ActiveStandbyElector.class); private static final int NUM_RETRIES = 3; + private static final int SLEEP_AFTER_FAILURE_TO_BECOME_ACTIVE = 1000; private static enum ConnectionState { DISCONNECTED, CONNECTED, TERMINATED @@ -385,8 +392,11 @@ public synchronized void processResult(int rc, String path, Object ctx, Code code = Code.get(rc); if (isSuccess(code)) { // we successfully created the znode. we are the leader. start monitoring - becomeActive(); - monitorActiveStatus(); + if (becomeActive()) { + monitorActiveStatus(); + } else { + reJoinElectionAfterFailureToBecomeActive(); + } return; } @@ -442,7 +452,9 @@ public synchronized void processResult(int rc, String path, Object ctx, // creation was retried if (stat.getEphemeralOwner() == zkClient.getSessionId()) { // we own the lock znode. so we are the leader - becomeActive(); + if (!becomeActive()) { + reJoinElectionAfterFailureToBecomeActive(); + } } else { // we dont own the lock znode. so we are a standby. becomeStandby(); @@ -479,6 +491,17 @@ public synchronized void processResult(int rc, String path, Object ctx, fatalError(errorMessage); } + /** + * We failed to become active. Re-join the election, but + * sleep for a few seconds after terminating our existing + * session, so that other nodes have a chance to become active. + * The failure to become active is already logged inside + * becomeActive(). + */ + private void reJoinElectionAfterFailureToBecomeActive() { + reJoinElection(SLEEP_AFTER_FAILURE_TO_BECOME_ACTIVE); + } + /** * interface implementation of Zookeeper watch events (connection and node), * proxied by {@link WatcherWithClientRef}. @@ -516,7 +539,7 @@ synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) { // call listener to reconnect LOG.info("Session expired. Entering neutral mode and rejoining..."); enterNeutralMode(); - reJoinElection(); + reJoinElection(0); break; default: fatalError("Unexpected Zookeeper watch event state: " @@ -591,7 +614,7 @@ private void joinElectionInternal() { createLockNodeAsync(); } - private void reJoinElection() { + private void reJoinElection(int sleepTime) { LOG.info("Trying to re-establish ZK session"); // Some of the test cases rely on expiring the ZK sessions and @@ -604,12 +627,30 @@ private void reJoinElection() { sessionReestablishLockForTests.lock(); try { terminateConnection(); + sleepFor(sleepTime); + joinElectionInternal(); } finally { sessionReestablishLockForTests.unlock(); } } - + + /** + * Sleep for the given number of milliseconds. + * This is non-static, and separated out, so that unit tests + * can override the behavior not to sleep. + */ + @VisibleForTesting + protected void sleepFor(int sleepMs) { + if (sleepMs > 0) { + try { + Thread.sleep(sleepMs); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + } + } + @VisibleForTesting void preventSessionReestablishmentForTests() { sessionReestablishLockForTests.lock(); @@ -640,11 +681,7 @@ private boolean reEstablishSession() { success = true; } catch(IOException e) { LOG.warn(e); - try { - Thread.sleep(5000); - } catch(InterruptedException e1) { - LOG.warn(e1); - } + sleepFor(5000); } ++connectionRetryCount; } @@ -675,20 +712,24 @@ private void reset() { terminateConnection(); } - private void becomeActive() { + private boolean becomeActive() { assert wantToBeInElection; - if (state != State.ACTIVE) { - try { - Stat oldBreadcrumbStat = fenceOldActive(); - writeBreadCrumbNode(oldBreadcrumbStat); - } catch (Exception e) { - LOG.warn("Exception handling the winning of election", e); - reJoinElection(); - return; - } + if (state == State.ACTIVE) { + // already active + return true; + } + try { + Stat oldBreadcrumbStat = fenceOldActive(); + writeBreadCrumbNode(oldBreadcrumbStat); + LOG.debug("Becoming active"); - state = State.ACTIVE; appClient.becomeActive(); + state = State.ACTIVE; + return true; + } catch (Exception e) { + LOG.warn("Exception handling the winning of election", e); + // Caller will handle quitting and rejoining the election. + return false; } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java index 565c93b545..98c3f0d5c1 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java @@ -242,15 +242,20 @@ private synchronized void fatalError(String err) { notifyAll(); } - private synchronized void becomeActive() { + private synchronized void becomeActive() throws ServiceFailedException { LOG.info("Trying to make " + localTarget + " active..."); try { - localTarget.getProxy().transitionToActive(); + HAServiceProtocolHelper.transitionToActive(localTarget.getProxy()); LOG.info("Successfully transitioned " + localTarget + " to active state"); } catch (Throwable t) { LOG.fatal("Couldn't make " + localTarget + " active", t); - elector.quitElection(true); + if (t instanceof ServiceFailedException) { + throw (ServiceFailedException)t; + } else { + throw new ServiceFailedException("Couldn't transition to active", + t); + } /* * TODO: * we need to make sure that if we get fenced and then quickly restarted, @@ -297,7 +302,7 @@ ActiveStandbyElector getElectorForTests() { */ class ElectorCallbacks implements ActiveStandbyElectorCallback { @Override - public void becomeActive() { + public void becomeActive() throws ServiceFailedException { ZKFailoverController.this.becomeActive(); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java index dc87ebdd86..bd9b40aad7 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/ActiveStandbyElectorTestUtil.java @@ -19,16 +19,25 @@ import java.util.Arrays; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.test.MultithreadedTestUtil.TestContext; +import org.apache.hadoop.util.StringUtils; import org.apache.zookeeper.KeeperException.NoNodeException; import org.apache.zookeeper.data.Stat; import org.apache.zookeeper.server.ZooKeeperServer; public abstract class ActiveStandbyElectorTestUtil { + + private static final Log LOG = LogFactory.getLog( + ActiveStandbyElectorTestUtil.class); + private static final long LOG_INTERVAL_MS = 500; public static void waitForActiveLockData(TestContext ctx, ZooKeeperServer zks, String parentDir, byte[] activeData) throws Exception { + long st = System.currentTimeMillis(); + long lastPrint = st; while (true) { if (ctx != null) { ctx.checkException(); @@ -42,10 +51,18 @@ public static void waitForActiveLockData(TestContext ctx, Arrays.equals(activeData, data)) { return; } + if (System.currentTimeMillis() > lastPrint + LOG_INTERVAL_MS) { + LOG.info("Cur data: " + StringUtils.byteToHexString(data)); + lastPrint = System.currentTimeMillis(); + } } catch (NoNodeException nne) { if (activeData == null) { return; } + if (System.currentTimeMillis() > lastPrint + LOG_INTERVAL_MS) { + LOG.info("Cur data: no node"); + lastPrint = System.currentTimeMillis(); + } } Thread.sleep(50); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java index b9786aea36..b1b7ec34bf 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestActiveStandbyElector.java @@ -51,6 +51,8 @@ public class TestActiveStandbyElector { private ActiveStandbyElectorTester elector; class ActiveStandbyElectorTester extends ActiveStandbyElector { + private int sleptFor = 0; + ActiveStandbyElectorTester(String hostPort, int timeout, String parent, List acl, ActiveStandbyElectorCallback app) throws IOException { super(hostPort, timeout, parent, acl, app); @@ -61,6 +63,14 @@ public ZooKeeper getNewZooKeeper() { ++count; return mockZK; } + + @Override + protected void sleepFor(int ms) { + // don't sleep in unit tests! Instead, just record the amount of + // time slept + LOG.info("Would have slept for " + ms + "ms"); + sleptFor += ms; + } } private static final String ZK_PARENT_NAME = "/parent/node"; @@ -146,6 +156,68 @@ public void testCreateNodeResultBecomeActive() throws Exception { verifyExistCall(1); } + /** + * Verify that, when the callback fails to enter active state, + * the elector rejoins the election after sleeping for a short period. + */ + @Test + public void testFailToBecomeActive() throws Exception { + mockNoPriorActive(); + elector.joinElection(data); + Assert.assertEquals(0, elector.sleptFor); + + Mockito.doThrow(new ServiceFailedException("failed to become active")) + .when(mockApp).becomeActive(); + elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, + ZK_LOCK_NAME); + // Should have tried to become active + Mockito.verify(mockApp).becomeActive(); + + // should re-join + Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data, + Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK); + Assert.assertEquals(2, count); + Assert.assertTrue(elector.sleptFor > 0); + } + + /** + * Verify that, when the callback fails to enter active state, after + * a ZK disconnect (i.e from the StatCallback), that the elector rejoins + * the election after sleeping for a short period. + */ + @Test + public void testFailToBecomeActiveAfterZKDisconnect() throws Exception { + mockNoPriorActive(); + elector.joinElection(data); + Assert.assertEquals(0, elector.sleptFor); + + elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK, + ZK_LOCK_NAME); + Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data, + Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK); + + elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK, + ZK_LOCK_NAME); + verifyExistCall(1); + + Stat stat = new Stat(); + stat.setEphemeralOwner(1L); + Mockito.when(mockZK.getSessionId()).thenReturn(1L); + + // Fake failure to become active from within the stat callback + Mockito.doThrow(new ServiceFailedException("fail to become active")) + .when(mockApp).becomeActive(); + elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, stat); + Mockito.verify(mockApp, Mockito.times(1)).becomeActive(); + + // should re-join + Mockito.verify(mockZK, Mockito.times(3)).create(ZK_LOCK_NAME, data, + Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK); + Assert.assertEquals(2, count); + Assert.assertTrue(elector.sleptFor > 0); + } + + /** * Verify that, if there is a record of a prior active node, the * elector asks the application to fence it before becoming active. diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java index 93f46a533e..d90b8d0e46 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java @@ -273,7 +273,8 @@ public void testBecomingActiveFails() throws Exception { waitForHealthState(thr1.zkfc, State.SERVICE_UNHEALTHY); waitForActiveLockHolder(null); - Mockito.verify(svc2.proxy).transitionToActive(); + Mockito.verify(svc2.proxy, Mockito.timeout(2000).atLeastOnce()) + .transitionToActive(); waitForHAState(svc1, HAServiceState.STANDBY); waitForHAState(svc2, HAServiceState.STANDBY); @@ -283,6 +284,12 @@ public void testBecomingActiveFails() throws Exception { waitForHAState(svc1, HAServiceState.ACTIVE); waitForHAState(svc2, HAServiceState.STANDBY); waitForActiveLockHolder(svc1); + + // Ensure that we can fail back to thr2 once it it is able + // to become active (e.g the admin has restarted it) + LOG.info("Allowing svc2 to become active, expiring svc1"); + svc2.failToBecomeActive = false; + expireAndVerifyFailover(thr1, thr2); } finally { stopFCs(); }