HADOOP-8220. ZKFailoverController doesn't handle failure to become active correctly. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/branches/HDFS-3042@1307596 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
04416861eb
commit
6693167028
@ -0,0 +1,8 @@
|
||||
Changes for HDFS-3042 branch.
|
||||
|
||||
This change list will be merged into the trunk CHANGES.txt when the HDFS-3-42
|
||||
branch is merged.
|
||||
------------------------------
|
||||
|
||||
HADOOP-8220. ZKFailoverController doesn't handle failure to become active correctly (todd)
|
||||
|
@ -81,9 +81,15 @@ public class ActiveStandbyElector implements StatCallback, StringCallback {
|
||||
*/
|
||||
public interface ActiveStandbyElectorCallback {
|
||||
/**
|
||||
* This method is called when the app becomes the active leader
|
||||
* This method is called when the app becomes the active leader.
|
||||
* If the service fails to become active, it should throw
|
||||
* ServiceFailedException. This will cause the elector to
|
||||
* sleep for a short period, then re-join the election.
|
||||
*
|
||||
* Callback implementations are expected to manage their own
|
||||
* timeouts (e.g. when making an RPC to a remote node).
|
||||
*/
|
||||
void becomeActive();
|
||||
void becomeActive() throws ServiceFailedException;
|
||||
|
||||
/**
|
||||
* This method is called when the app becomes a standby
|
||||
@ -135,6 +141,7 @@ public interface ActiveStandbyElectorCallback {
|
||||
public static final Log LOG = LogFactory.getLog(ActiveStandbyElector.class);
|
||||
|
||||
private static final int NUM_RETRIES = 3;
|
||||
private static final int SLEEP_AFTER_FAILURE_TO_BECOME_ACTIVE = 1000;
|
||||
|
||||
private static enum ConnectionState {
|
||||
DISCONNECTED, CONNECTED, TERMINATED
|
||||
@ -385,8 +392,11 @@ public synchronized void processResult(int rc, String path, Object ctx,
|
||||
Code code = Code.get(rc);
|
||||
if (isSuccess(code)) {
|
||||
// we successfully created the znode. we are the leader. start monitoring
|
||||
becomeActive();
|
||||
monitorActiveStatus();
|
||||
if (becomeActive()) {
|
||||
monitorActiveStatus();
|
||||
} else {
|
||||
reJoinElectionAfterFailureToBecomeActive();
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
@ -442,7 +452,9 @@ public synchronized void processResult(int rc, String path, Object ctx,
|
||||
// creation was retried
|
||||
if (stat.getEphemeralOwner() == zkClient.getSessionId()) {
|
||||
// we own the lock znode. so we are the leader
|
||||
becomeActive();
|
||||
if (!becomeActive()) {
|
||||
reJoinElectionAfterFailureToBecomeActive();
|
||||
}
|
||||
} else {
|
||||
// we dont own the lock znode. so we are a standby.
|
||||
becomeStandby();
|
||||
@ -479,6 +491,17 @@ public synchronized void processResult(int rc, String path, Object ctx,
|
||||
fatalError(errorMessage);
|
||||
}
|
||||
|
||||
/**
|
||||
* We failed to become active. Re-join the election, but
|
||||
* sleep for a few seconds after terminating our existing
|
||||
* session, so that other nodes have a chance to become active.
|
||||
* The failure to become active is already logged inside
|
||||
* becomeActive().
|
||||
*/
|
||||
private void reJoinElectionAfterFailureToBecomeActive() {
|
||||
reJoinElection(SLEEP_AFTER_FAILURE_TO_BECOME_ACTIVE);
|
||||
}
|
||||
|
||||
/**
|
||||
* interface implementation of Zookeeper watch events (connection and node),
|
||||
* proxied by {@link WatcherWithClientRef}.
|
||||
@ -516,7 +539,7 @@ synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
|
||||
// call listener to reconnect
|
||||
LOG.info("Session expired. Entering neutral mode and rejoining...");
|
||||
enterNeutralMode();
|
||||
reJoinElection();
|
||||
reJoinElection(0);
|
||||
break;
|
||||
default:
|
||||
fatalError("Unexpected Zookeeper watch event state: "
|
||||
@ -591,7 +614,7 @@ private void joinElectionInternal() {
|
||||
createLockNodeAsync();
|
||||
}
|
||||
|
||||
private void reJoinElection() {
|
||||
private void reJoinElection(int sleepTime) {
|
||||
LOG.info("Trying to re-establish ZK session");
|
||||
|
||||
// Some of the test cases rely on expiring the ZK sessions and
|
||||
@ -604,12 +627,30 @@ private void reJoinElection() {
|
||||
sessionReestablishLockForTests.lock();
|
||||
try {
|
||||
terminateConnection();
|
||||
sleepFor(sleepTime);
|
||||
|
||||
joinElectionInternal();
|
||||
} finally {
|
||||
sessionReestablishLockForTests.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Sleep for the given number of milliseconds.
|
||||
* This is non-static, and separated out, so that unit tests
|
||||
* can override the behavior not to sleep.
|
||||
*/
|
||||
@VisibleForTesting
|
||||
protected void sleepFor(int sleepMs) {
|
||||
if (sleepMs > 0) {
|
||||
try {
|
||||
Thread.sleep(sleepMs);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
void preventSessionReestablishmentForTests() {
|
||||
sessionReestablishLockForTests.lock();
|
||||
@ -640,11 +681,7 @@ private boolean reEstablishSession() {
|
||||
success = true;
|
||||
} catch(IOException e) {
|
||||
LOG.warn(e);
|
||||
try {
|
||||
Thread.sleep(5000);
|
||||
} catch(InterruptedException e1) {
|
||||
LOG.warn(e1);
|
||||
}
|
||||
sleepFor(5000);
|
||||
}
|
||||
++connectionRetryCount;
|
||||
}
|
||||
@ -675,20 +712,24 @@ private void reset() {
|
||||
terminateConnection();
|
||||
}
|
||||
|
||||
private void becomeActive() {
|
||||
private boolean becomeActive() {
|
||||
assert wantToBeInElection;
|
||||
if (state != State.ACTIVE) {
|
||||
try {
|
||||
Stat oldBreadcrumbStat = fenceOldActive();
|
||||
writeBreadCrumbNode(oldBreadcrumbStat);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Exception handling the winning of election", e);
|
||||
reJoinElection();
|
||||
return;
|
||||
}
|
||||
if (state == State.ACTIVE) {
|
||||
// already active
|
||||
return true;
|
||||
}
|
||||
try {
|
||||
Stat oldBreadcrumbStat = fenceOldActive();
|
||||
writeBreadCrumbNode(oldBreadcrumbStat);
|
||||
|
||||
LOG.debug("Becoming active");
|
||||
state = State.ACTIVE;
|
||||
appClient.becomeActive();
|
||||
state = State.ACTIVE;
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Exception handling the winning of election", e);
|
||||
// Caller will handle quitting and rejoining the election.
|
||||
return false;
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -242,15 +242,20 @@ private synchronized void fatalError(String err) {
|
||||
notifyAll();
|
||||
}
|
||||
|
||||
private synchronized void becomeActive() {
|
||||
private synchronized void becomeActive() throws ServiceFailedException {
|
||||
LOG.info("Trying to make " + localTarget + " active...");
|
||||
try {
|
||||
localTarget.getProxy().transitionToActive();
|
||||
HAServiceProtocolHelper.transitionToActive(localTarget.getProxy());
|
||||
LOG.info("Successfully transitioned " + localTarget +
|
||||
" to active state");
|
||||
} catch (Throwable t) {
|
||||
LOG.fatal("Couldn't make " + localTarget + " active", t);
|
||||
elector.quitElection(true);
|
||||
if (t instanceof ServiceFailedException) {
|
||||
throw (ServiceFailedException)t;
|
||||
} else {
|
||||
throw new ServiceFailedException("Couldn't transition to active",
|
||||
t);
|
||||
}
|
||||
/*
|
||||
* TODO:
|
||||
* we need to make sure that if we get fenced and then quickly restarted,
|
||||
@ -297,7 +302,7 @@ ActiveStandbyElector getElectorForTests() {
|
||||
*/
|
||||
class ElectorCallbacks implements ActiveStandbyElectorCallback {
|
||||
@Override
|
||||
public void becomeActive() {
|
||||
public void becomeActive() throws ServiceFailedException {
|
||||
ZKFailoverController.this.becomeActive();
|
||||
}
|
||||
|
||||
|
@ -19,16 +19,25 @@
|
||||
|
||||
import java.util.Arrays;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.zookeeper.KeeperException.NoNodeException;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
import org.apache.zookeeper.server.ZooKeeperServer;
|
||||
|
||||
public abstract class ActiveStandbyElectorTestUtil {
|
||||
|
||||
private static final Log LOG = LogFactory.getLog(
|
||||
ActiveStandbyElectorTestUtil.class);
|
||||
private static final long LOG_INTERVAL_MS = 500;
|
||||
|
||||
public static void waitForActiveLockData(TestContext ctx,
|
||||
ZooKeeperServer zks, String parentDir, byte[] activeData)
|
||||
throws Exception {
|
||||
long st = System.currentTimeMillis();
|
||||
long lastPrint = st;
|
||||
while (true) {
|
||||
if (ctx != null) {
|
||||
ctx.checkException();
|
||||
@ -42,10 +51,18 @@ public static void waitForActiveLockData(TestContext ctx,
|
||||
Arrays.equals(activeData, data)) {
|
||||
return;
|
||||
}
|
||||
if (System.currentTimeMillis() > lastPrint + LOG_INTERVAL_MS) {
|
||||
LOG.info("Cur data: " + StringUtils.byteToHexString(data));
|
||||
lastPrint = System.currentTimeMillis();
|
||||
}
|
||||
} catch (NoNodeException nne) {
|
||||
if (activeData == null) {
|
||||
return;
|
||||
}
|
||||
if (System.currentTimeMillis() > lastPrint + LOG_INTERVAL_MS) {
|
||||
LOG.info("Cur data: no node");
|
||||
lastPrint = System.currentTimeMillis();
|
||||
}
|
||||
}
|
||||
Thread.sleep(50);
|
||||
}
|
||||
|
@ -51,6 +51,8 @@ public class TestActiveStandbyElector {
|
||||
private ActiveStandbyElectorTester elector;
|
||||
|
||||
class ActiveStandbyElectorTester extends ActiveStandbyElector {
|
||||
private int sleptFor = 0;
|
||||
|
||||
ActiveStandbyElectorTester(String hostPort, int timeout, String parent,
|
||||
List<ACL> acl, ActiveStandbyElectorCallback app) throws IOException {
|
||||
super(hostPort, timeout, parent, acl, app);
|
||||
@ -61,6 +63,14 @@ public ZooKeeper getNewZooKeeper() {
|
||||
++count;
|
||||
return mockZK;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void sleepFor(int ms) {
|
||||
// don't sleep in unit tests! Instead, just record the amount of
|
||||
// time slept
|
||||
LOG.info("Would have slept for " + ms + "ms");
|
||||
sleptFor += ms;
|
||||
}
|
||||
}
|
||||
|
||||
private static final String ZK_PARENT_NAME = "/parent/node";
|
||||
@ -146,6 +156,68 @@ public void testCreateNodeResultBecomeActive() throws Exception {
|
||||
verifyExistCall(1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that, when the callback fails to enter active state,
|
||||
* the elector rejoins the election after sleeping for a short period.
|
||||
*/
|
||||
@Test
|
||||
public void testFailToBecomeActive() throws Exception {
|
||||
mockNoPriorActive();
|
||||
elector.joinElection(data);
|
||||
Assert.assertEquals(0, elector.sleptFor);
|
||||
|
||||
Mockito.doThrow(new ServiceFailedException("failed to become active"))
|
||||
.when(mockApp).becomeActive();
|
||||
elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK,
|
||||
ZK_LOCK_NAME);
|
||||
// Should have tried to become active
|
||||
Mockito.verify(mockApp).becomeActive();
|
||||
|
||||
// should re-join
|
||||
Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data,
|
||||
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
|
||||
Assert.assertEquals(2, count);
|
||||
Assert.assertTrue(elector.sleptFor > 0);
|
||||
}
|
||||
|
||||
/**
|
||||
* Verify that, when the callback fails to enter active state, after
|
||||
* a ZK disconnect (i.e from the StatCallback), that the elector rejoins
|
||||
* the election after sleeping for a short period.
|
||||
*/
|
||||
@Test
|
||||
public void testFailToBecomeActiveAfterZKDisconnect() throws Exception {
|
||||
mockNoPriorActive();
|
||||
elector.joinElection(data);
|
||||
Assert.assertEquals(0, elector.sleptFor);
|
||||
|
||||
elector.processResult(Code.CONNECTIONLOSS.intValue(), ZK_LOCK_NAME, mockZK,
|
||||
ZK_LOCK_NAME);
|
||||
Mockito.verify(mockZK, Mockito.times(2)).create(ZK_LOCK_NAME, data,
|
||||
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
|
||||
|
||||
elector.processResult(Code.NODEEXISTS.intValue(), ZK_LOCK_NAME, mockZK,
|
||||
ZK_LOCK_NAME);
|
||||
verifyExistCall(1);
|
||||
|
||||
Stat stat = new Stat();
|
||||
stat.setEphemeralOwner(1L);
|
||||
Mockito.when(mockZK.getSessionId()).thenReturn(1L);
|
||||
|
||||
// Fake failure to become active from within the stat callback
|
||||
Mockito.doThrow(new ServiceFailedException("fail to become active"))
|
||||
.when(mockApp).becomeActive();
|
||||
elector.processResult(Code.OK.intValue(), ZK_LOCK_NAME, mockZK, stat);
|
||||
Mockito.verify(mockApp, Mockito.times(1)).becomeActive();
|
||||
|
||||
// should re-join
|
||||
Mockito.verify(mockZK, Mockito.times(3)).create(ZK_LOCK_NAME, data,
|
||||
Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL, elector, mockZK);
|
||||
Assert.assertEquals(2, count);
|
||||
Assert.assertTrue(elector.sleptFor > 0);
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Verify that, if there is a record of a prior active node, the
|
||||
* elector asks the application to fence it before becoming active.
|
||||
|
@ -273,7 +273,8 @@ public void testBecomingActiveFails() throws Exception {
|
||||
waitForHealthState(thr1.zkfc, State.SERVICE_UNHEALTHY);
|
||||
waitForActiveLockHolder(null);
|
||||
|
||||
Mockito.verify(svc2.proxy).transitionToActive();
|
||||
Mockito.verify(svc2.proxy, Mockito.timeout(2000).atLeastOnce())
|
||||
.transitionToActive();
|
||||
|
||||
waitForHAState(svc1, HAServiceState.STANDBY);
|
||||
waitForHAState(svc2, HAServiceState.STANDBY);
|
||||
@ -283,6 +284,12 @@ public void testBecomingActiveFails() throws Exception {
|
||||
waitForHAState(svc1, HAServiceState.ACTIVE);
|
||||
waitForHAState(svc2, HAServiceState.STANDBY);
|
||||
waitForActiveLockHolder(svc1);
|
||||
|
||||
// Ensure that we can fail back to thr2 once it it is able
|
||||
// to become active (e.g the admin has restarted it)
|
||||
LOG.info("Allowing svc2 to become active, expiring svc1");
|
||||
svc2.failToBecomeActive = false;
|
||||
expireAndVerifyFailover(thr1, thr2);
|
||||
} finally {
|
||||
stopFCs();
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user