HADOOP-10251. Both NameNodes could be in STANDBY State if SNN network is unstable. Contributed by Vinayakumar B.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1589494 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
ee8a152520
commit
6eba48cbde
@ -401,6 +401,9 @@ Release 2.5.0 - UNRELEASED
|
|||||||
HADOOP-10526. Chance for Stream leakage in CompressorStream. (Rushabh
|
HADOOP-10526. Chance for Stream leakage in CompressorStream. (Rushabh
|
||||||
Shah via kihwal)
|
Shah via kihwal)
|
||||||
|
|
||||||
|
HADOOP-10251. Both NameNodes could be in STANDBY State if SNN network is unstable
|
||||||
|
(Vinayakumar B via umamahesh)
|
||||||
|
|
||||||
Release 2.4.1 - UNRELEASED
|
Release 2.4.1 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -74,6 +74,9 @@ public class HealthMonitor {
|
|||||||
private List<Callback> callbacks = Collections.synchronizedList(
|
private List<Callback> callbacks = Collections.synchronizedList(
|
||||||
new LinkedList<Callback>());
|
new LinkedList<Callback>());
|
||||||
|
|
||||||
|
private List<ServiceStateCallback> serviceStateCallbacks = Collections
|
||||||
|
.synchronizedList(new LinkedList<ServiceStateCallback>());
|
||||||
|
|
||||||
private HAServiceStatus lastServiceState = new HAServiceStatus(
|
private HAServiceStatus lastServiceState = new HAServiceStatus(
|
||||||
HAServiceState.INITIALIZING);
|
HAServiceState.INITIALIZING);
|
||||||
|
|
||||||
@ -134,7 +137,15 @@ public void addCallback(Callback cb) {
|
|||||||
public void removeCallback(Callback cb) {
|
public void removeCallback(Callback cb) {
|
||||||
callbacks.remove(cb);
|
callbacks.remove(cb);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public synchronized void addServiceStateCallback(ServiceStateCallback cb) {
|
||||||
|
this.serviceStateCallbacks.add(cb);
|
||||||
|
}
|
||||||
|
|
||||||
|
public synchronized void removeServiceStateCallback(ServiceStateCallback cb) {
|
||||||
|
serviceStateCallbacks.remove(cb);
|
||||||
|
}
|
||||||
|
|
||||||
public void shutdown() {
|
public void shutdown() {
|
||||||
LOG.info("Stopping HealthMonitor thread");
|
LOG.info("Stopping HealthMonitor thread");
|
||||||
shouldRun = false;
|
shouldRun = false;
|
||||||
@ -217,6 +228,9 @@ private void doHealthChecks() throws InterruptedException {
|
|||||||
|
|
||||||
private synchronized void setLastServiceStatus(HAServiceStatus status) {
|
private synchronized void setLastServiceStatus(HAServiceStatus status) {
|
||||||
this.lastServiceState = status;
|
this.lastServiceState = status;
|
||||||
|
for (ServiceStateCallback cb : serviceStateCallbacks) {
|
||||||
|
cb.reportServiceStatus(lastServiceState);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private synchronized void enterState(State newState) {
|
private synchronized void enterState(State newState) {
|
||||||
@ -293,4 +307,11 @@ public void run() {
|
|||||||
static interface Callback {
|
static interface Callback {
|
||||||
void enteredState(State newState);
|
void enteredState(State newState);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Callback interface for service states.
|
||||||
|
*/
|
||||||
|
static interface ServiceStateCallback {
|
||||||
|
void reportServiceStatus(HAServiceStatus status);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -34,6 +34,7 @@
|
|||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException;
|
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException;
|
||||||
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
|
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
|
||||||
|
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||||
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
|
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
|
||||||
import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
|
import org.apache.hadoop.ha.HAServiceProtocol.RequestSource;
|
||||||
import org.apache.hadoop.util.ZKUtil;
|
import org.apache.hadoop.util.ZKUtil;
|
||||||
@ -105,6 +106,8 @@ public abstract class ZKFailoverController {
|
|||||||
|
|
||||||
private State lastHealthState = State.INITIALIZING;
|
private State lastHealthState = State.INITIALIZING;
|
||||||
|
|
||||||
|
private volatile HAServiceState serviceState = HAServiceState.INITIALIZING;
|
||||||
|
|
||||||
/** Set if a fatal error occurs */
|
/** Set if a fatal error occurs */
|
||||||
private String fatalError = null;
|
private String fatalError = null;
|
||||||
|
|
||||||
@ -294,6 +297,7 @@ private boolean confirmFormat() {
|
|||||||
private void initHM() {
|
private void initHM() {
|
||||||
healthMonitor = new HealthMonitor(conf, localTarget);
|
healthMonitor = new HealthMonitor(conf, localTarget);
|
||||||
healthMonitor.addCallback(new HealthCallbacks());
|
healthMonitor.addCallback(new HealthCallbacks());
|
||||||
|
healthMonitor.addServiceStateCallback(new ServiceStateCallBacks());
|
||||||
healthMonitor.start();
|
healthMonitor.start();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -376,6 +380,7 @@ private synchronized void becomeActive() throws ServiceFailedException {
|
|||||||
String msg = "Successfully transitioned " + localTarget +
|
String msg = "Successfully transitioned " + localTarget +
|
||||||
" to active state";
|
" to active state";
|
||||||
LOG.info(msg);
|
LOG.info(msg);
|
||||||
|
serviceState = HAServiceState.ACTIVE;
|
||||||
recordActiveAttempt(new ActiveAttemptRecord(true, msg));
|
recordActiveAttempt(new ActiveAttemptRecord(true, msg));
|
||||||
|
|
||||||
} catch (Throwable t) {
|
} catch (Throwable t) {
|
||||||
@ -484,6 +489,7 @@ private synchronized void becomeStandby() {
|
|||||||
// TODO handle this. It's a likely case since we probably got fenced
|
// TODO handle this. It's a likely case since we probably got fenced
|
||||||
// at the same time.
|
// at the same time.
|
||||||
}
|
}
|
||||||
|
serviceState = HAServiceState.STANDBY;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
@ -574,6 +580,7 @@ private void doCedeActive(int millisToCede)
|
|||||||
delayJoiningUntilNanotime = System.nanoTime() +
|
delayJoiningUntilNanotime = System.nanoTime() +
|
||||||
TimeUnit.MILLISECONDS.toNanos(millisToCede);
|
TimeUnit.MILLISECONDS.toNanos(millisToCede);
|
||||||
elector.quitElection(needFence);
|
elector.quitElection(needFence);
|
||||||
|
serviceState = HAServiceState.INITIALIZING;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
recheckElectability();
|
recheckElectability();
|
||||||
@ -739,12 +746,16 @@ private void recheckElectability() {
|
|||||||
switch (lastHealthState) {
|
switch (lastHealthState) {
|
||||||
case SERVICE_HEALTHY:
|
case SERVICE_HEALTHY:
|
||||||
elector.joinElection(targetToData(localTarget));
|
elector.joinElection(targetToData(localTarget));
|
||||||
|
if (quitElectionOnBadState) {
|
||||||
|
quitElectionOnBadState = false;
|
||||||
|
}
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case INITIALIZING:
|
case INITIALIZING:
|
||||||
LOG.info("Ensuring that " + localTarget + " does not " +
|
LOG.info("Ensuring that " + localTarget + " does not " +
|
||||||
"participate in active master election");
|
"participate in active master election");
|
||||||
elector.quitElection(false);
|
elector.quitElection(false);
|
||||||
|
serviceState = HAServiceState.INITIALIZING;
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case SERVICE_UNHEALTHY:
|
case SERVICE_UNHEALTHY:
|
||||||
@ -752,6 +763,7 @@ private void recheckElectability() {
|
|||||||
LOG.info("Quitting master election for " + localTarget +
|
LOG.info("Quitting master election for " + localTarget +
|
||||||
" and marking that fencing is necessary");
|
" and marking that fencing is necessary");
|
||||||
elector.quitElection(true);
|
elector.quitElection(true);
|
||||||
|
serviceState = HAServiceState.INITIALIZING;
|
||||||
break;
|
break;
|
||||||
|
|
||||||
case HEALTH_MONITOR_FAILED:
|
case HEALTH_MONITOR_FAILED:
|
||||||
@ -784,6 +796,44 @@ public void run() {
|
|||||||
whenNanos, TimeUnit.NANOSECONDS);
|
whenNanos, TimeUnit.NANOSECONDS);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
int serviceStateMismatchCount = 0;
|
||||||
|
boolean quitElectionOnBadState = false;
|
||||||
|
|
||||||
|
void verifyChangedServiceState(HAServiceState changedState) {
|
||||||
|
synchronized (elector) {
|
||||||
|
synchronized (this) {
|
||||||
|
if (serviceState == HAServiceState.INITIALIZING) {
|
||||||
|
if (quitElectionOnBadState) {
|
||||||
|
LOG.debug("rechecking for electability from bad state");
|
||||||
|
recheckElectability();
|
||||||
|
}
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (changedState == serviceState) {
|
||||||
|
serviceStateMismatchCount = 0;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (serviceStateMismatchCount == 0) {
|
||||||
|
// recheck one more time. As this might be due to parallel transition.
|
||||||
|
serviceStateMismatchCount++;
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
// quit the election as the expected state and reported state
|
||||||
|
// mismatches.
|
||||||
|
LOG.error("Local service " + localTarget
|
||||||
|
+ " has changed the serviceState to " + changedState
|
||||||
|
+ ". Expected was " + serviceState
|
||||||
|
+ ". Quitting election marking fencing necessary.");
|
||||||
|
delayJoiningUntilNanotime = System.nanoTime()
|
||||||
|
+ TimeUnit.MILLISECONDS.toNanos(1000);
|
||||||
|
elector.quitElection(true);
|
||||||
|
quitElectionOnBadState = true;
|
||||||
|
serviceStateMismatchCount = 0;
|
||||||
|
serviceState = HAServiceState.INITIALIZING;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* @return the last health state passed to the FC
|
* @return the last health state passed to the FC
|
||||||
* by the HealthMonitor.
|
* by the HealthMonitor.
|
||||||
@ -855,7 +905,17 @@ public void enteredState(HealthMonitor.State newState) {
|
|||||||
recheckElectability();
|
recheckElectability();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Callbacks for HAServiceStatus
|
||||||
|
*/
|
||||||
|
class ServiceStateCallBacks implements HealthMonitor.ServiceStateCallback {
|
||||||
|
@Override
|
||||||
|
public void reportServiceStatus(HAServiceStatus status) {
|
||||||
|
verifyChangedServiceState(status.getState());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
private static class ActiveAttemptRecord {
|
private static class ActiveAttemptRecord {
|
||||||
private final boolean succeeded;
|
private final boolean succeeded;
|
||||||
private final String status;
|
private final String status;
|
||||||
|
@ -18,7 +18,6 @@
|
|||||||
package org.apache.hadoop.ha;
|
package org.apache.hadoop.ha;
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.*;
|
||||||
import static org.junit.Assume.assumeTrue;
|
|
||||||
|
|
||||||
import java.security.NoSuchAlgorithmException;
|
import java.security.NoSuchAlgorithmException;
|
||||||
|
|
||||||
@ -29,7 +28,6 @@
|
|||||||
import org.apache.hadoop.ha.HealthMonitor.State;
|
import org.apache.hadoop.ha.HealthMonitor.State;
|
||||||
import org.apache.hadoop.ha.MiniZKFCCluster.DummyZKFC;
|
import org.apache.hadoop.ha.MiniZKFCCluster.DummyZKFC;
|
||||||
import org.apache.hadoop.test.GenericTestUtils;
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
import org.apache.hadoop.util.Shell;
|
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.apache.log4j.Level;
|
import org.apache.log4j.Level;
|
||||||
import org.apache.zookeeper.KeeperException;
|
import org.apache.zookeeper.KeeperException;
|
||||||
@ -68,8 +66,6 @@ public class TestZKFailoverController extends ClientBaseWithFixes {
|
|||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setupConfAndServices() {
|
public void setupConfAndServices() {
|
||||||
// skip tests on Windows until after resolution of ZooKeeper client bug
|
|
||||||
assumeTrue(!Shell.WINDOWS);
|
|
||||||
conf = new Configuration();
|
conf = new Configuration();
|
||||||
conf.set(ZKFailoverController.ZK_ACL_KEY, TEST_ACL);
|
conf.set(ZKFailoverController.ZK_ACL_KEY, TEST_ACL);
|
||||||
conf.set(ZKFailoverController.ZK_AUTH_KEY, TEST_AUTH_GOOD);
|
conf.set(ZKFailoverController.ZK_AUTH_KEY, TEST_AUTH_GOOD);
|
||||||
@ -232,6 +228,27 @@ public void testAutoFailoverOnBadHealth() throws Exception {
|
|||||||
cluster.stop();
|
cluster.stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test that, when the health monitor indicates bad health status,
|
||||||
|
* failover is triggered. Also ensures that graceful active->standby
|
||||||
|
* transition is used when possible, falling back to fencing when
|
||||||
|
* the graceful approach fails.
|
||||||
|
*/
|
||||||
|
@Test(timeout=15000)
|
||||||
|
public void testAutoFailoverOnBadState() throws Exception {
|
||||||
|
try {
|
||||||
|
cluster.start();
|
||||||
|
DummyHAService svc0 = cluster.getService(0);
|
||||||
|
LOG.info("Faking svc0 to change the state, should failover to svc1");
|
||||||
|
svc0.state = HAServiceState.STANDBY;
|
||||||
|
|
||||||
|
// Should fail back to svc0 at this point
|
||||||
|
cluster.waitForHAState(1, HAServiceState.ACTIVE);
|
||||||
|
} finally {
|
||||||
|
cluster.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Test(timeout=15000)
|
@Test(timeout=15000)
|
||||||
public void testAutoFailoverOnLostZKSession() throws Exception {
|
public void testAutoFailoverOnLostZKSession() throws Exception {
|
||||||
|
Loading…
Reference in New Issue
Block a user