diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index 8e9e6f5205..5362a70d71 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -401,6 +401,9 @@ Release 2.5.0 - UNRELEASED HADOOP-10526. Chance for Stream leakage in CompressorStream. (Rushabh Shah via kihwal) + HADOOP-10251. Both NameNodes could be in STANDBY State if SNN network is unstable + (Vinayakumar B via umamahesh) + Release 2.4.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java index a349626ece..0d14444250 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/HealthMonitor.java @@ -74,6 +74,9 @@ public class HealthMonitor { private List callbacks = Collections.synchronizedList( new LinkedList()); + private List serviceStateCallbacks = Collections + .synchronizedList(new LinkedList()); + private HAServiceStatus lastServiceState = new HAServiceStatus( HAServiceState.INITIALIZING); @@ -134,7 +137,15 @@ public void addCallback(Callback cb) { public void removeCallback(Callback cb) { callbacks.remove(cb); } - + + public synchronized void addServiceStateCallback(ServiceStateCallback cb) { + this.serviceStateCallbacks.add(cb); + } + + public synchronized void removeServiceStateCallback(ServiceStateCallback cb) { + serviceStateCallbacks.remove(cb); + } + public void shutdown() { LOG.info("Stopping HealthMonitor thread"); shouldRun = false; @@ -217,6 +228,9 @@ private void doHealthChecks() throws InterruptedException { private synchronized void setLastServiceStatus(HAServiceStatus status) { this.lastServiceState = status; + for (ServiceStateCallback cb : serviceStateCallbacks) { + cb.reportServiceStatus(lastServiceState); + } } private synchronized void enterState(State newState) { @@ -293,4 +307,11 @@ public void run() { static interface Callback { void enteredState(State newState); } + + /** + * Callback interface for service states. + */ + static interface ServiceStateCallback { + void reportServiceStatus(HAServiceStatus status); + } } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java index 3c72c4720f..dd8ca8a6bd 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ha/ZKFailoverController.java @@ -34,6 +34,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException; import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback; +import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState; import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo; import org.apache.hadoop.ha.HAServiceProtocol.RequestSource; import org.apache.hadoop.util.ZKUtil; @@ -105,6 +106,8 @@ public abstract class ZKFailoverController { private State lastHealthState = State.INITIALIZING; + private volatile HAServiceState serviceState = HAServiceState.INITIALIZING; + /** Set if a fatal error occurs */ private String fatalError = null; @@ -294,6 +297,7 @@ private boolean confirmFormat() { private void initHM() { healthMonitor = new HealthMonitor(conf, localTarget); healthMonitor.addCallback(new HealthCallbacks()); + healthMonitor.addServiceStateCallback(new ServiceStateCallBacks()); healthMonitor.start(); } @@ -376,6 +380,7 @@ private synchronized void becomeActive() throws ServiceFailedException { String msg = "Successfully transitioned " + localTarget + " to active state"; LOG.info(msg); + serviceState = HAServiceState.ACTIVE; recordActiveAttempt(new ActiveAttemptRecord(true, msg)); } catch (Throwable t) { @@ -484,6 +489,7 @@ private synchronized void becomeStandby() { // TODO handle this. It's a likely case since we probably got fenced // at the same time. } + serviceState = HAServiceState.STANDBY; } @@ -574,6 +580,7 @@ private void doCedeActive(int millisToCede) delayJoiningUntilNanotime = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(millisToCede); elector.quitElection(needFence); + serviceState = HAServiceState.INITIALIZING; } } recheckElectability(); @@ -739,12 +746,16 @@ private void recheckElectability() { switch (lastHealthState) { case SERVICE_HEALTHY: elector.joinElection(targetToData(localTarget)); + if (quitElectionOnBadState) { + quitElectionOnBadState = false; + } break; case INITIALIZING: LOG.info("Ensuring that " + localTarget + " does not " + "participate in active master election"); elector.quitElection(false); + serviceState = HAServiceState.INITIALIZING; break; case SERVICE_UNHEALTHY: @@ -752,6 +763,7 @@ private void recheckElectability() { LOG.info("Quitting master election for " + localTarget + " and marking that fencing is necessary"); elector.quitElection(true); + serviceState = HAServiceState.INITIALIZING; break; case HEALTH_MONITOR_FAILED: @@ -784,6 +796,44 @@ public void run() { whenNanos, TimeUnit.NANOSECONDS); } + int serviceStateMismatchCount = 0; + boolean quitElectionOnBadState = false; + + void verifyChangedServiceState(HAServiceState changedState) { + synchronized (elector) { + synchronized (this) { + if (serviceState == HAServiceState.INITIALIZING) { + if (quitElectionOnBadState) { + LOG.debug("rechecking for electability from bad state"); + recheckElectability(); + } + return; + } + if (changedState == serviceState) { + serviceStateMismatchCount = 0; + return; + } + if (serviceStateMismatchCount == 0) { + // recheck one more time. As this might be due to parallel transition. + serviceStateMismatchCount++; + return; + } + // quit the election as the expected state and reported state + // mismatches. + LOG.error("Local service " + localTarget + + " has changed the serviceState to " + changedState + + ". Expected was " + serviceState + + ". Quitting election marking fencing necessary."); + delayJoiningUntilNanotime = System.nanoTime() + + TimeUnit.MILLISECONDS.toNanos(1000); + elector.quitElection(true); + quitElectionOnBadState = true; + serviceStateMismatchCount = 0; + serviceState = HAServiceState.INITIALIZING; + } + } + } + /** * @return the last health state passed to the FC * by the HealthMonitor. @@ -855,7 +905,17 @@ public void enteredState(HealthMonitor.State newState) { recheckElectability(); } } - + + /** + * Callbacks for HAServiceStatus + */ + class ServiceStateCallBacks implements HealthMonitor.ServiceStateCallback { + @Override + public void reportServiceStatus(HAServiceStatus status) { + verifyChangedServiceState(status.getState()); + } + } + private static class ActiveAttemptRecord { private final boolean succeeded; private final String status; diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java index 5b722e1cc5..83a29dd11d 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ha/TestZKFailoverController.java @@ -18,7 +18,6 @@ package org.apache.hadoop.ha; import static org.junit.Assert.*; -import static org.junit.Assume.assumeTrue; import java.security.NoSuchAlgorithmException; @@ -29,7 +28,6 @@ import org.apache.hadoop.ha.HealthMonitor.State; import org.apache.hadoop.ha.MiniZKFCCluster.DummyZKFC; import org.apache.hadoop.test.GenericTestUtils; -import org.apache.hadoop.util.Shell; import org.apache.hadoop.util.Time; import org.apache.log4j.Level; import org.apache.zookeeper.KeeperException; @@ -68,8 +66,6 @@ public class TestZKFailoverController extends ClientBaseWithFixes { @Before public void setupConfAndServices() { - // skip tests on Windows until after resolution of ZooKeeper client bug - assumeTrue(!Shell.WINDOWS); conf = new Configuration(); conf.set(ZKFailoverController.ZK_ACL_KEY, TEST_ACL); conf.set(ZKFailoverController.ZK_AUTH_KEY, TEST_AUTH_GOOD); @@ -232,6 +228,27 @@ public void testAutoFailoverOnBadHealth() throws Exception { cluster.stop(); } } + + /** + * Test that, when the health monitor indicates bad health status, + * failover is triggered. Also ensures that graceful active->standby + * transition is used when possible, falling back to fencing when + * the graceful approach fails. + */ + @Test(timeout=15000) + public void testAutoFailoverOnBadState() throws Exception { + try { + cluster.start(); + DummyHAService svc0 = cluster.getService(0); + LOG.info("Faking svc0 to change the state, should failover to svc1"); + svc0.state = HAServiceState.STANDBY; + + // Should fail back to svc0 at this point + cluster.waitForHAState(1, HAServiceState.ACTIVE); + } finally { + cluster.stop(); + } + } @Test(timeout=15000) public void testAutoFailoverOnLostZKSession() throws Exception {