HADOOP-8206. Common portion of a ZK-based failover controller. Contributed by Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1305673 13f79535-47bb-0310-9956-ffa450edef68
parent 525ceb99ad
commit 578f413778
@@ -138,6 +138,8 @@ Release 0.23.3 - UNRELEASED

    HADOOP-7030. Add TableMapping topology implementation to read host to rack
    mapping from a file. (Patrick Angeles and tomwhite via tomwhite)

    HADOOP-8206. Common portion of a ZK-based failover controller (todd)

  IMPROVEMENTS

    HADOOP-7524. Change RPC to allow multiple protocols including multuple
@@ -35,6 +35,7 @@
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher.Event;
import org.apache.zookeeper.ZKUtil;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.AsyncCallback.*;

@@ -135,11 +136,11 @@ public interface ActiveStandbyElectorCallback {

  private static final int NUM_RETRIES = 3;

  private enum ConnectionState {
  private static enum ConnectionState {
    DISCONNECTED, CONNECTED, TERMINATED
  };

  private enum State {
  static enum State {
    INIT, ACTIVE, STANDBY, NEUTRAL
  };

@@ -282,6 +283,32 @@ public synchronized void ensureParentZNode()
    LOG.info("Successfully created " + znodeWorkingDir + " in ZK.");
  }

  /**
   * Clear all of the state held within the parent ZNode.
   * This recursively deletes everything within the znode as well as the
   * parent znode itself. It should only be used when it's certain that
   * no electors are currently participating in the election.
   */
  public synchronized void clearParentZNode()
      throws IOException, InterruptedException {
    try {
      LOG.info("Recursively deleting " + znodeWorkingDir + " from ZK...");

      zkDoWithRetries(new ZKAction<Void>() {
        @Override
        public Void run() throws KeeperException, InterruptedException {
          ZKUtil.deleteRecursive(zkClient, znodeWorkingDir);
          return null;
        }
      });
    } catch (KeeperException e) {
      throw new IOException("Couldn't clear parent znode " + znodeWorkingDir,
          e);
    }
    LOG.info("Successfully deleted " + znodeWorkingDir + " from ZK.");
  }

  /**
   * Any service instance can drop out of the election by calling quitElection.

@@ -592,6 +619,11 @@ void allowSessionReestablishmentForTests() {
  long getZKSessionIdForTests() {
    return zkClient.getSessionId();
  }

  @VisibleForTesting
  synchronized State getStateForTests() {
    return state;
  }

  private boolean reEstablishSession() {
    int connectionRetryCount = 0;
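For context, the elector pieces touched above (the parent znode management and the test-visible State) are driven by the new ZKFailoverController added later in this commit. The following is a minimal wiring sketch modeled on its initZK() method; the configuration values and the callback passed in are placeholder assumptions, not part of this patch.

// Sketch only: mirrors ZKFailoverController.initZK() from this commit.
// The quorum/timeout values are examples; "cb" is any ActiveStandbyElectorCallback.
import java.util.List;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ActiveStandbyElector;
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.ACL;

public class ElectorWiringSketch {
  static ActiveStandbyElector wire(Configuration conf, ActiveStandbyElectorCallback cb)
      throws Exception {
    String zkQuorum = conf.get("ha.zookeeper.quorum");       // e.g. "zk1:2181,zk2:2181"
    int zkTimeout = conf.getInt("ha.zookeeper.session-timeout.ms", 5 * 1000);
    String parentZnode = conf.get("ha.zookeeper.parent-znode", "/hadoop-ha");
    List<ACL> zkAcls = Ids.OPEN_ACL_UNSAFE;                  // commit TODO: real ACLs/auth

    ActiveStandbyElector elector =
        new ActiveStandbyElector(zkQuorum, zkTimeout, parentZnode, zkAcls, cb);
    elector.ensureParentZNode();   // create the parent znode if it is missing
    return elector;
  }
}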
@@ -24,7 +24,9 @@
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ipc.RPC;

import com.google.common.base.Preconditions;

@@ -40,6 +42,8 @@ public class FailoverController {

  private static final Log LOG = LogFactory.getLog(FailoverController.class);

  private static final int GRACEFUL_FENCE_TIMEOUT = 5000;

  /**
   * Perform pre-failover checks on the given service we plan to
   * failover to, eg to prevent failing over to a service (eg due

@@ -96,7 +100,35 @@ private static void preFailoverChecks(HAServiceTarget target,
          "Got an IO exception", e);
    }
  }

  /**
   * Try to get the HA state of the node at the given address. This
   * function is guaranteed to be "quick" -- ie it has a short timeout
   * and no retries. Its only purpose is to avoid fencing a node that
   * has already restarted.
   */
  static boolean tryGracefulFence(Configuration conf,
      HAServiceTarget svc) {
    HAServiceProtocol proxy = null;
    try {
      proxy = svc.getProxy(conf, GRACEFUL_FENCE_TIMEOUT);
      proxy.transitionToStandby();
      return true;
    } catch (ServiceFailedException sfe) {
      LOG.warn("Unable to gracefully make " + svc + " standby (" +
          sfe.getMessage() + ")");
    } catch (IOException ioe) {
      LOG.warn("Unable to gracefully make " + svc +
          " standby (unable to connect)", ioe);
    } finally {
      if (proxy != null) {
        RPC.stopProxy(proxy);
      }
    }
    return false;
  }

  /**
   * Failover from service 1 to service 2. If the failover fails
   * then try to failback.

@@ -118,16 +150,9 @@ public static void failover(HAServiceTarget fromSvc,

    // Try to make fromSvc standby
    boolean tryFence = true;
    try {
      HAServiceProtocolHelper.transitionToStandby(fromSvc.getProxy());
      // We should try to fence if we failed or it was forced
      tryFence = forceFence ? true : false;
    } catch (ServiceFailedException sfe) {
      LOG.warn("Unable to make " + fromSvc + " standby (" +
          sfe.getMessage() + ")");
    } catch (IOException ioe) {
      LOG.warn("Unable to make " + fromSvc +
          " standby (unable to connect)", ioe);

    if (tryGracefulFence(new Configuration(), fromSvc)) {
      tryFence = forceFence;
    }

    // Fence fromSvc if it's required or forced by the user
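tryGracefulFence() above is the cheap path used before real fencing: a short-timeout proxy call asking the old active to step down. Its consumer in this commit is the elector callback of the new ZKFailoverController, which falls back to the configured NodeFencer only when the graceful attempt fails. A condensed sketch of that flow, assuming it lives in the org.apache.hadoop.ha package since tryGracefulFence is package-private:

package org.apache.hadoop.ha;

import org.apache.hadoop.conf.Configuration;

class FenceFlowSketch {
  // Condensed from ElectorCallbacks.fenceOldActive() added later in this commit.
  static void fenceOldActive(Configuration conf, HAServiceTarget target)
      throws BadFencingConfigurationException {
    // Cheap path first: short-timeout RPC asking the old active to go to standby.
    if (FailoverController.tryGracefulFence(conf, target)) {
      return;   // old active acknowledged the transition, no fencing needed
    }
    // Fall back to the configured fencing method (e.g. sshfence or a shell command).
    target.checkFencingConfigured();
    if (!target.getFencer().fence(target)) {
      throw new RuntimeException("Unable to fence " + target);
    }
  }
}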
@@ -188,7 +188,8 @@ private void doHealthChecks() throws InterruptedException {
        proxy.monitorHealth();
        healthy = true;
      } catch (HealthCheckFailedException e) {
        LOG.warn("Service health check failed: " + e.getMessage());
        LOG.warn("Service health check failed for " + targetToMonitor
            + ": " + e.getMessage());
        enterState(State.SERVICE_UNHEALTHY);
      } catch (Throwable t) {
        LOG.warn("Transport-level exception trying to monitor health of " +
@@ -0,0 +1,387 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.ha;

import java.io.IOException;
import java.security.PrivilegedAction;
import java.util.List;

import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.HadoopIllegalArgumentException;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
import org.apache.hadoop.ha.HealthMonitor.State;
import org.apache.hadoop.security.SecurityUtil;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.ACL;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;

@InterfaceAudience.LimitedPrivate("HDFS")
public abstract class ZKFailoverController implements Tool {

  static final Log LOG = LogFactory.getLog(ZKFailoverController.class);

  // TODO: this should be namespace-scoped
  public static final String ZK_QUORUM_KEY = "ha.zookeeper.quorum";
  private static final String ZK_SESSION_TIMEOUT_KEY = "ha.zookeeper.session-timeout.ms";
  private static final int ZK_SESSION_TIMEOUT_DEFAULT = 5*1000;
  private static final String ZK_PARENT_ZNODE_KEY = "ha.zookeeper.parent-znode";
  static final String ZK_PARENT_ZNODE_DEFAULT = "/hadoop-ha";

  /** Unable to format the parent znode in ZK */
  static final int ERR_CODE_FORMAT_DENIED = 2;
  /** The parent znode doesn't exist in ZK */
  static final int ERR_CODE_NO_PARENT_ZNODE = 3;
  /** Fencing is not properly configured */
  static final int ERR_CODE_NO_FENCER = 4;

  private Configuration conf;

  private HealthMonitor healthMonitor;
  private ActiveStandbyElector elector;

  private HAServiceTarget localTarget;

  private String parentZnode;

  private State lastHealthState = State.INITIALIZING;

  /** Set if a fatal error occurs */
  private String fatalError = null;

  @Override
  public void setConf(Configuration conf) {
    this.conf = conf;
    localTarget = getLocalTarget();
  }

  protected abstract byte[] targetToData(HAServiceTarget target);
  protected abstract HAServiceTarget getLocalTarget();
  protected abstract HAServiceTarget dataToTarget(byte[] data);

  @Override
  public Configuration getConf() {
    return conf;
  }

  @Override
  public int run(final String[] args) throws Exception {
    // TODO: need to hook DFS here to find the NN keytab info, etc,
    // similar to what DFSHAAdmin does. Annoying that this is in common.
    try {
      return SecurityUtil.doAsLoginUserOrFatal(new PrivilegedAction<Integer>() {
        @Override
        public Integer run() {
          try {
            return doRun(args);
          } catch (Exception t) {
            throw new RuntimeException(t);
          }
        }
      });
    } catch (RuntimeException rte) {
      throw (Exception)rte.getCause();
    }
  }

  private int doRun(String[] args)
      throws HadoopIllegalArgumentException, IOException, InterruptedException {
    initZK();
    if (args.length > 0) {
      if ("-formatZK".equals(args[0])) {
        boolean force = false;
        boolean interactive = true;
        for (int i = 1; i < args.length; i++) {
          if ("-force".equals(args[i])) {
            force = true;
          } else if ("-nonInteractive".equals(args[i])) {
            interactive = false;
          } else {
            badArg(args[i]);
          }
        }
        return formatZK(force, interactive);
      } else {
        badArg(args[0]);
      }
    }

    if (!elector.parentZNodeExists()) {
      LOG.fatal("Unable to start failover controller. " +
          "Parent znode does not exist.\n" +
          "Run with -formatZK flag to initialize ZooKeeper.");
      return ERR_CODE_NO_PARENT_ZNODE;
    }

    try {
      localTarget.checkFencingConfigured();
    } catch (BadFencingConfigurationException e) {
      LOG.fatal("Fencing is not configured for " + localTarget + ".\n" +
          "You must configure a fencing method before using automatic " +
          "failover.", e);
      return ERR_CODE_NO_FENCER;
    }

    initHM();
    mainLoop();
    return 0;
  }

  private void badArg(String arg) {
    printUsage();
    throw new HadoopIllegalArgumentException(
        "Bad argument: " + arg);
  }

  private void printUsage() {
    System.err.println("Usage: " + this.getClass().getSimpleName() +
        " [-formatZK [-force | -nonInteractive]]");
  }

  private int formatZK(boolean force, boolean interactive)
      throws IOException, InterruptedException {
    if (elector.parentZNodeExists()) {
      if (!force && (!interactive || !confirmFormat())) {
        return ERR_CODE_FORMAT_DENIED;
      }

      try {
        elector.clearParentZNode();
      } catch (IOException e) {
        LOG.error("Unable to clear zk parent znode", e);
        return 1;
      }
    }

    elector.ensureParentZNode();
    return 0;
  }

  private boolean confirmFormat() {
    System.err.println(
        "===============================================\n" +
        "The configured parent znode " + parentZnode + " already exists.\n" +
        "Are you sure you want to clear all failover information from\n" +
        "ZooKeeper?\n" +
        "WARNING: Before proceeding, ensure that all HDFS services and\n" +
        "failover controllers are stopped!\n" +
        "===============================================");
    try {
      return ToolRunner.confirmPrompt("Proceed formatting " + parentZnode + "?");
    } catch (IOException e) {
      LOG.debug("Failed to confirm", e);
      return false;
    }
  }

  // ------------------------------------------
  // Begin actual guts of failover controller
  // ------------------------------------------

  private void initHM() {
    healthMonitor = new HealthMonitor(conf, localTarget);
    healthMonitor.addCallback(new HealthCallbacks());
    healthMonitor.start();
  }

  private void initZK() throws HadoopIllegalArgumentException, IOException {
    String zkQuorum = conf.get(ZK_QUORUM_KEY);
    int zkTimeout = conf.getInt(ZK_SESSION_TIMEOUT_KEY,
        ZK_SESSION_TIMEOUT_DEFAULT);
    parentZnode = conf.get(ZK_PARENT_ZNODE_KEY,
        ZK_PARENT_ZNODE_DEFAULT);
    // TODO: need ZK ACL support in config, also maybe auth!
    List<ACL> zkAcls = Ids.OPEN_ACL_UNSAFE;

    Preconditions.checkArgument(zkQuorum != null,
        "Missing required configuration '%s' for ZooKeeper quorum",
        ZK_QUORUM_KEY);
    Preconditions.checkArgument(zkTimeout > 0,
        "Invalid ZK session timeout %s", zkTimeout);

    elector = new ActiveStandbyElector(zkQuorum,
        zkTimeout, parentZnode, zkAcls, new ElectorCallbacks());
  }

  private synchronized void mainLoop() throws InterruptedException {
    while (fatalError == null) {
      wait();
    }
    assert fatalError != null; // only get here on fatal
    throw new RuntimeException(
        "ZK Failover Controller failed: " + fatalError);
  }

  private synchronized void fatalError(String err) {
    LOG.fatal("Fatal error occurred:" + err);
    fatalError = err;
    notifyAll();
  }

  private synchronized void becomeActive() {
    LOG.info("Trying to make " + localTarget + " active...");
    try {
      localTarget.getProxy().transitionToActive();
      LOG.info("Successfully transitioned " + localTarget +
          " to active state");
    } catch (Throwable t) {
      LOG.fatal("Couldn't make " + localTarget + " active", t);
      elector.quitElection(true);
      /*
       * TODO:
       * we need to make sure that if we get fenced and then quickly restarted,
       * none of these calls will retry across the restart boundary
       * perhaps the solution is that, whenever the nn starts, it gets a unique
       * ID, and when we start becoming active, we record it, and then any future
       * calls use the same ID
       */

    }
  }

  private synchronized void becomeStandby() {
    LOG.info("ZK Election indicated that " + localTarget +
        " should become standby");
    try {
      localTarget.getProxy().transitionToStandby();
      LOG.info("Successfully transitioned " + localTarget +
          " to standby state");
    } catch (Exception e) {
      LOG.error("Couldn't transition " + localTarget + " to standby state",
          e);
      // TODO handle this. It's a likely case since we probably got fenced
      // at the same time.
    }
  }

  /**
   * @return the last health state passed to the FC
   * by the HealthMonitor.
   */
  @VisibleForTesting
  State getLastHealthState() {
    return lastHealthState;
  }

  @VisibleForTesting
  ActiveStandbyElector getElectorForTests() {
    return elector;
  }

  /**
   * Callbacks from elector
   */
  class ElectorCallbacks implements ActiveStandbyElectorCallback {
    @Override
    public void becomeActive() {
      ZKFailoverController.this.becomeActive();
    }

    @Override
    public void becomeStandby() {
      ZKFailoverController.this.becomeStandby();
    }

    @Override
    public void enterNeutralMode() {
    }

    @Override
    public void notifyFatalError(String errorMessage) {
      fatalError(errorMessage);
    }

    @Override
    public void fenceOldActive(byte[] data) {
      HAServiceTarget target = dataToTarget(data);

      LOG.info("Should fence: " + target);
      boolean gracefulWorked =
          FailoverController.tryGracefulFence(conf, target);
      if (gracefulWorked) {
        // It's possible that it's in standby but just about to go into active,
        // no? Is there some race here?
        LOG.info("Successfully transitioned " + target + " to standby " +
            "state without fencing");
        return;
      }

      try {
        target.checkFencingConfigured();
      } catch (BadFencingConfigurationException e) {
        LOG.error("Couldn't fence old active " + target, e);
        // TODO: see below todo
        throw new RuntimeException(e);
      }

      if (!target.getFencer().fence(target)) {
        // TODO: this will end up in some kind of tight loop,
        // won't it? We need some kind of backoff
        throw new RuntimeException("Unable to fence " + target);
      }
    }
  }

  /**
   * Callbacks from HealthMonitor
   */
  class HealthCallbacks implements HealthMonitor.Callback {
    @Override
    public void enteredState(HealthMonitor.State newState) {
      LOG.info("Local service " + localTarget +
          " entered state: " + newState);
      switch (newState) {
      case SERVICE_HEALTHY:
        LOG.info("Joining master election for " + localTarget);
        elector.joinElection(targetToData(localTarget));
        break;

      case INITIALIZING:
        LOG.info("Ensuring that " + localTarget + " does not " +
            "participate in active master election");
        elector.quitElection(false);
        break;

      case SERVICE_UNHEALTHY:
      case SERVICE_NOT_RESPONDING:
        LOG.info("Quitting master election for " + localTarget +
            " and marking that fencing is necessary");
        elector.quitElection(true);
        break;

      case HEALTH_MONITOR_FAILED:
        fatalError("Health monitor failed!");
        break;

      default:
        throw new IllegalArgumentException("Unhandled state:" + newState);
      }

      lastHealthState = newState;
    }
  }
}
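ZKFailoverController is abstract: a concrete controller only supplies its local HAServiceTarget and the mapping between a target and the bytes stored in the active lock znode. The DummyZKFC in the test added by this commit is the reference example; the sketch below follows the same shape, with the index-based lookup being a placeholder rather than anything defined by this patch.

// Minimal subclass sketch in the style of DummyZKFC (see TestZKFailoverController below).
// The integer identity and lookupTargetByIndex() are illustrative placeholders.
import org.apache.hadoop.ha.HAServiceTarget;
import org.apache.hadoop.ha.ZKFailoverController;
import com.google.common.primitives.Ints;

public class SketchZKFC extends ZKFailoverController {
  private final HAServiceTarget localTarget;
  private final int myIndex;   // placeholder identity for this node

  public SketchZKFC(HAServiceTarget localTarget, int myIndex) {
    this.localTarget = localTarget;
    this.myIndex = myIndex;
  }

  @Override
  protected byte[] targetToData(HAServiceTarget target) {
    // Whatever is written into the lock znode must let dataToTarget() on another
    // node locate, and if necessary fence, the old active.
    return Ints.toByteArray(myIndex);
  }

  @Override
  protected HAServiceTarget dataToTarget(byte[] data) {
    return lookupTargetByIndex(Ints.fromByteArray(data));
  }

  @Override
  protected HAServiceTarget getLocalTarget() {
    return localTarget;
  }

  private HAServiceTarget lookupTargetByIndex(int index) {
    // Placeholder: a real controller would resolve this from its own configuration.
    throw new UnsupportedOperationException("not part of this commit");
  }
}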
@@ -17,6 +17,7 @@
 */
package org.apache.hadoop.util;

import java.io.IOException;
import java.io.PrintStream;

import org.apache.hadoop.classification.InterfaceAudience;

@@ -92,4 +93,33 @@ public static void printGenericCommandUsage(PrintStream out) {
    GenericOptionsParser.printGenericCommandUsage(out);
  }

  /**
   * Print out a prompt to the user, and return true if the user
   * responds with "y" or "yes". (case insensitive)
   */
  public static boolean confirmPrompt(String prompt) throws IOException {
    while (true) {
      System.err.print(prompt + " (Y or N) ");
      StringBuilder responseBuilder = new StringBuilder();
      while (true) {
        int c = System.in.read();
        if (c == -1 || c == '\r' || c == '\n') {
          break;
        }
        responseBuilder.append((char)c);
      }

      String response = responseBuilder.toString();
      if (response.equalsIgnoreCase("y") ||
          response.equalsIgnoreCase("yes")) {
        return true;
      } else if (response.equalsIgnoreCase("n") ||
          response.equalsIgnoreCase("no")) {
        return false;
      }
      System.err.println("Invalid input: " + response);
      // else ask them again
    }
  }
}
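ToolRunner.confirmPrompt() above blocks on stdin and only returns once it reads a clear yes or no (case-insensitive "y"/"yes" or "n"/"no"); anything else re-prompts. Its first caller is ZKFailoverController.confirmFormat() earlier in this commit. A small usage sketch with an illustrative prompt:

import java.io.IOException;
import org.apache.hadoop.util.ToolRunner;

public class ConfirmSketch {
  public static void main(String[] args) throws IOException {
    // Returns true for "y"/"yes", false for "n"/"no"; other input asks again.
    if (ToolRunner.confirmPrompt("Delete all failover state in ZooKeeper?")) {
      System.out.println("proceeding...");
    } else {
      System.out.println("aborted by user");
    }
  }
}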
@@ -50,4 +50,15 @@ public static void waitForActiveLockData(TestContext ctx,
      Thread.sleep(50);
    }
  }

  public static void waitForElectorState(TestContext ctx,
      ActiveStandbyElector elector,
      ActiveStandbyElector.State state) throws Exception {
    while (elector.getStateForTests() != state) {
      if (ctx != null) {
        ctx.checkException();
      }
      Thread.sleep(50);
    }
  }
}

@@ -19,12 +19,15 @@

import java.io.IOException;
import java.net.InetSocketAddress;
import java.util.ArrayList;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.security.AccessControlException;
import org.mockito.Mockito;

import com.google.common.collect.Lists;

/**
 * Test-only implementation of {@link HAServiceTarget}, which returns
 * a mock implementation.

@@ -36,12 +39,20 @@ class DummyHAService extends HAServiceTarget {
  InetSocketAddress address;
  boolean isHealthy = true;
  boolean actUnreachable = false;
  boolean failToBecomeActive;

  static ArrayList<DummyHAService> instances = Lists.newArrayList();
  int index;

  DummyHAService(HAServiceState state, InetSocketAddress address) {
    this.state = state;
    this.proxy = makeMock();
    this.fencer = Mockito.mock(NodeFencer.class);
    this.address = address;
    synchronized (instances) {
      instances.add(this);
      this.index = instances.size();
    }
  }

  private HAServiceProtocol makeMock() {

@@ -59,6 +70,10 @@ public void monitorHealth() throws HealthCheckFailedException,
    public void transitionToActive() throws ServiceFailedException,
        AccessControlException, IOException {
      checkUnreachable();
      if (failToBecomeActive) {
        throw new ServiceFailedException("injected failure");
      }

      state = HAServiceState.ACTIVE;
    }

@@ -97,7 +112,7 @@ public HAServiceProtocol getProxy(Configuration conf, int timeout)
      throws IOException {
    return proxy;
  }

  @Override
  public NodeFencer getFencer() {
    return fencer;

@@ -106,4 +121,13 @@ public NodeFencer getFencer() {
  @Override
  public void checkFencingConfigured() throws BadFencingConfigurationException {
  }
}

  @Override
  public String toString() {
    return "DummyHAService #" + index;
  }

  public static HAServiceTarget getInstance(int serial) {
    return instances.get(serial - 1);
  }
}

@@ -156,6 +156,10 @@ public boolean tryFence(HAServiceTarget target, String args) {
    @Override
    public void checkArgs(String args) {
    }

    public static HAServiceTarget getLastFencedService() {
      return fencedSvc;
    }
  }

  /**
@@ -0,0 +1,457 @@
/**
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements. See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership. The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License. You may obtain a copy of the License at
 *
 *     http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
package org.apache.hadoop.ha;

import static org.junit.Assert.*;

import java.io.File;
import java.net.InetSocketAddress;

import org.apache.commons.logging.impl.Log4JLogger;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
import org.apache.hadoop.ha.HealthMonitor.State;
import org.apache.hadoop.test.MultithreadedTestUtil;
import org.apache.hadoop.test.MultithreadedTestUtil.TestContext;
import org.apache.hadoop.test.MultithreadedTestUtil.TestingThread;
import org.apache.log4j.Level;
import org.apache.zookeeper.KeeperException.NoNodeException;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.server.ZooKeeperServer;
import org.apache.zookeeper.test.ClientBase;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;

import com.google.common.primitives.Ints;

public class TestZKFailoverController extends ClientBase {
  private Configuration conf;
  private DummyHAService svc1;
  private DummyHAService svc2;
  private TestContext ctx;
  private DummyZKFCThread thr1, thr2;

  static {
    ((Log4JLogger)ActiveStandbyElector.LOG).getLogger().setLevel(Level.ALL);
  }

  @Override
  public void setUp() throws Exception {
    // build.test.dir is used by zookeeper
    new File(System.getProperty("build.test.dir", "build")).mkdirs();
    super.setUp();
  }

  @Before
  public void setupConfAndServices() {
    conf = new Configuration();
    conf.set(ZKFailoverController.ZK_QUORUM_KEY, hostPort);
    // Fast check interval so tests run faster
    conf.setInt(CommonConfigurationKeys.HA_HM_CHECK_INTERVAL_KEY, 50);
    conf.setInt(CommonConfigurationKeys.HA_HM_CONNECT_RETRY_INTERVAL_KEY, 50);
    conf.setInt(CommonConfigurationKeys.HA_HM_SLEEP_AFTER_DISCONNECT_KEY, 50);
    svc1 = new DummyHAService(HAServiceState.INITIALIZING,
        new InetSocketAddress("svc1", 1234));
    svc2 = new DummyHAService(HAServiceState.INITIALIZING,
        new InetSocketAddress("svc2", 1234));
  }

  /**
   * Set up two services and their failover controllers. svc1 is started
   * first, so that it enters ACTIVE state, and then svc2 is started,
   * which enters STANDBY
   */
  private void setupFCs() throws Exception {
    // Format the base dir, should succeed
    assertEquals(0, runFC(svc1, "-formatZK"));

    ctx = new MultithreadedTestUtil.TestContext();
    thr1 = new DummyZKFCThread(ctx, svc1);
    ctx.addThread(thr1);
    thr1.start();

    LOG.info("Waiting for svc1 to enter active state");
    waitForHAState(svc1, HAServiceState.ACTIVE);

    LOG.info("Adding svc2");
    thr2 = new DummyZKFCThread(ctx, svc2);
    thr2.start();
    waitForHAState(svc2, HAServiceState.STANDBY);
  }

  private void stopFCs() throws Exception {
    if (thr1 != null) {
      thr1.interrupt();
    }
    if (thr2 != null) {
      thr2.interrupt();
    }
    if (ctx != null) {
      ctx.stop();
    }
  }

  /**
   * Test that the various command lines for formatting the ZK directory
   * function correctly.
   */
  @Test(timeout=15000)
  public void testFormatZK() throws Exception {
    // Run without formatting the base dir,
    // should barf
    assertEquals(ZKFailoverController.ERR_CODE_NO_PARENT_ZNODE,
        runFC(svc1));

    // Format the base dir, should succeed
    assertEquals(0, runFC(svc1, "-formatZK"));

    // Should fail to format if already formatted
    assertEquals(ZKFailoverController.ERR_CODE_FORMAT_DENIED,
        runFC(svc1, "-formatZK", "-nonInteractive"));

    // Unless '-force' is on
    assertEquals(0, runFC(svc1, "-formatZK", "-force"));
  }

  /**
   * Test that the ZKFC won't run if fencing is not configured for the
   * local service.
   */
  @Test(timeout=15000)
  public void testFencingMustBeConfigured() throws Exception {
    svc1 = Mockito.spy(svc1);
    Mockito.doThrow(new BadFencingConfigurationException("no fencing"))
        .when(svc1).checkFencingConfigured();
    // Format the base dir, should succeed
    assertEquals(0, runFC(svc1, "-formatZK"));
    // Try to run the actual FC, should fail without a fencer
    assertEquals(ZKFailoverController.ERR_CODE_NO_FENCER,
        runFC(svc1));
  }

  /**
   * Test that, when the health monitor indicates bad health status,
   * failover is triggered. Also ensures that graceful active->standby
   * transition is used when possible, falling back to fencing when
   * the graceful approach fails.
   */
  @Test(timeout=15000)
  public void testAutoFailoverOnBadHealth() throws Exception {
    try {
      setupFCs();

      LOG.info("Faking svc1 unhealthy, should failover to svc2");
      svc1.isHealthy = false;
      LOG.info("Waiting for svc1 to enter standby state");
      waitForHAState(svc1, HAServiceState.STANDBY);
      waitForHAState(svc2, HAServiceState.ACTIVE);

      LOG.info("Allowing svc1 to be healthy again, making svc2 unreachable " +
          "and fail to gracefully go to standby");
      svc1.isHealthy = true;
      svc2.actUnreachable = true;

      // Allow fencing to succeed
      Mockito.doReturn(true).when(svc2.fencer).fence(Mockito.same(svc2));
      // Should fail back to svc1 at this point
      waitForHAState(svc1, HAServiceState.ACTIVE);
      // and fence svc2
      Mockito.verify(svc2.fencer).fence(Mockito.same(svc2));
    } finally {
      stopFCs();
    }
  }

  @Test(timeout=15000)
  public void testAutoFailoverOnLostZKSession() throws Exception {
    try {
      setupFCs();

      // Expire svc1, it should fail over to svc2
      expireAndVerifyFailover(thr1, thr2);

      // Expire svc2, it should fail back to svc1
      expireAndVerifyFailover(thr2, thr1);

      LOG.info("======= Running test cases second time to test " +
          "re-establishment =========");
      // Expire svc1, it should fail over to svc2
      expireAndVerifyFailover(thr1, thr2);

      // Expire svc2, it should fail back to svc1
      expireAndVerifyFailover(thr2, thr1);
    } finally {
      stopFCs();
    }
  }

  private void expireAndVerifyFailover(DummyZKFCThread fromThr,
      DummyZKFCThread toThr) throws Exception {
    DummyHAService fromSvc = fromThr.zkfc.localTarget;
    DummyHAService toSvc = toThr.zkfc.localTarget;

    fromThr.zkfc.getElectorForTests().preventSessionReestablishmentForTests();
    try {
      expireActiveLockHolder(fromSvc);

      waitForHAState(fromSvc, HAServiceState.STANDBY);
      waitForHAState(toSvc, HAServiceState.ACTIVE);
    } finally {
      fromThr.zkfc.getElectorForTests().allowSessionReestablishmentForTests();
    }
  }

  /**
   * Test that, if the standby node is unhealthy, it doesn't try to become
   * active
   */
  @Test(timeout=15000)
  public void testDontFailoverToUnhealthyNode() throws Exception {
    try {
      setupFCs();

      // Make svc2 unhealthy, and wait for its FC to notice the bad health.
      svc2.isHealthy = false;
      waitForHealthState(thr2.zkfc,
          HealthMonitor.State.SERVICE_UNHEALTHY);

      // Expire svc1
      thr1.zkfc.getElectorForTests().preventSessionReestablishmentForTests();
      try {
        expireActiveLockHolder(svc1);

        LOG.info("Expired svc1's ZK session. Waiting a second to give svc2" +
            " a chance to take the lock, if it is ever going to.");
        Thread.sleep(1000);

        // Ensure that no one holds the lock.
        waitForActiveLockHolder(null);

      } finally {
        LOG.info("Allowing svc1's elector to re-establish its connection");
        thr1.zkfc.getElectorForTests().allowSessionReestablishmentForTests();
      }
      // svc1 should get the lock again
      waitForActiveLockHolder(svc1);
    } finally {
      stopFCs();
    }
  }

  /**
   * Test that the ZKFC successfully quits the election when it fails to
   * become active. This allows the old node to successfully fail back.
   */
  @Test(timeout=15000)
  public void testBecomingActiveFails() throws Exception {
    try {
      setupFCs();

      LOG.info("Making svc2 fail to become active");
      svc2.failToBecomeActive = true;

      LOG.info("Faking svc1 unhealthy, should NOT successfully " +
          "failover to svc2");
      svc1.isHealthy = false;
      waitForHealthState(thr1.zkfc, State.SERVICE_UNHEALTHY);
      waitForActiveLockHolder(null);

      Mockito.verify(svc2.proxy).transitionToActive();

      waitForHAState(svc1, HAServiceState.STANDBY);
      waitForHAState(svc2, HAServiceState.STANDBY);

      LOG.info("Faking svc1 healthy again, should go back to svc1");
      svc1.isHealthy = true;
      waitForHAState(svc1, HAServiceState.ACTIVE);
      waitForHAState(svc2, HAServiceState.STANDBY);
      waitForActiveLockHolder(svc1);
    } finally {
      stopFCs();
    }
  }

  /**
   * Test that, when ZooKeeper fails, the system remains in its
   * current state, without triggering any failovers, and without
   * causing the active node to enter standby state.
   */
  @Test(timeout=15000)
  public void testZooKeeperFailure() throws Exception {
    try {
      setupFCs();

      // Record initial ZK sessions
      long session1 = thr1.zkfc.getElectorForTests().getZKSessionIdForTests();
      long session2 = thr2.zkfc.getElectorForTests().getZKSessionIdForTests();

      LOG.info("====== Stopping ZK server");
      stopServer();
      waitForServerDown(hostPort, CONNECTION_TIMEOUT);

      LOG.info("====== Waiting for services to enter NEUTRAL mode");
      ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
          thr1.zkfc.getElectorForTests(),
          ActiveStandbyElector.State.NEUTRAL);
      ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
          thr2.zkfc.getElectorForTests(),
          ActiveStandbyElector.State.NEUTRAL);

      LOG.info("====== Checking that the services didn't change HA state");
      assertEquals(HAServiceState.ACTIVE, svc1.state);
      assertEquals(HAServiceState.STANDBY, svc2.state);

      LOG.info("====== Restarting server");
      startServer();
      waitForServerUp(hostPort, CONNECTION_TIMEOUT);

      // Nodes should go back to their original states, since they re-obtain
      // the same sessions.
      ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
          thr1.zkfc.getElectorForTests(),
          ActiveStandbyElector.State.ACTIVE);
      ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
          thr2.zkfc.getElectorForTests(),
          ActiveStandbyElector.State.STANDBY);
      // Check HA states didn't change.
      ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
          thr1.zkfc.getElectorForTests(),
          ActiveStandbyElector.State.ACTIVE);
      ActiveStandbyElectorTestUtil.waitForElectorState(ctx,
          thr2.zkfc.getElectorForTests(),
          ActiveStandbyElector.State.STANDBY);
      // Check they re-used the same sessions and didn't spuriously reconnect
      assertEquals(session1,
          thr1.zkfc.getElectorForTests().getZKSessionIdForTests());
      assertEquals(session2,
          thr2.zkfc.getElectorForTests().getZKSessionIdForTests());
    } finally {
      stopFCs();
    }
  }

  /**
   * Expire the ZK session of the given service. This requires
   * (and asserts) that the given service be the current active.
   * @throws NoNodeException if no service holds the lock
   */
  private void expireActiveLockHolder(DummyHAService expectedActive)
      throws NoNodeException {
    ZooKeeperServer zks = getServer(serverFactory);
    Stat stat = new Stat();
    byte[] data = zks.getZKDatabase().getData(
        ZKFailoverController.ZK_PARENT_ZNODE_DEFAULT + "/" +
        ActiveStandbyElector.LOCK_FILENAME, stat, null);

    assertArrayEquals(Ints.toByteArray(expectedActive.index), data);
    long session = stat.getEphemeralOwner();
    LOG.info("Expiring svc " + expectedActive + "'s zookeeper session " + session);
    zks.closeSession(session);
  }

  /**
   * Wait for the given HA service to enter the given HA state.
   */
  private void waitForHAState(DummyHAService svc, HAServiceState state)
      throws Exception {
    while (svc.state != state) {
      ctx.checkException();
      Thread.sleep(50);
    }
  }

  /**
   * Wait for the ZKFC to be notified of a change in health state.
   */
  private void waitForHealthState(DummyZKFC zkfc, State state)
      throws Exception {
    while (zkfc.getLastHealthState() != state) {
      ctx.checkException();
      Thread.sleep(50);
    }
  }

  /**
   * Wait for the given HA service to become the active lock holder.
   * If the passed svc is null, waits for there to be no active
   * lock holder.
   */
  private void waitForActiveLockHolder(DummyHAService svc)
      throws Exception {
    ZooKeeperServer zks = getServer(serverFactory);
    ActiveStandbyElectorTestUtil.waitForActiveLockData(ctx, zks,
        ZKFailoverController.ZK_PARENT_ZNODE_DEFAULT,
        (svc == null) ? null : Ints.toByteArray(svc.index));
  }

  private int runFC(DummyHAService target, String ... args) throws Exception {
    DummyZKFC zkfc = new DummyZKFC(target);
    zkfc.setConf(conf);
    return zkfc.run(args);
  }

  /**
   * Test-thread which runs a ZK Failover Controller corresponding
   * to a given dummy service.
   */
  private class DummyZKFCThread extends TestingThread {
    private final DummyZKFC zkfc;

    public DummyZKFCThread(TestContext ctx, DummyHAService svc) {
      super(ctx);
      this.zkfc = new DummyZKFC(svc);
      zkfc.setConf(conf);
    }

    @Override
    public void doWork() throws Exception {
      try {
        assertEquals(0, zkfc.run(new String[0]));
      } catch (InterruptedException ie) {
        // Interrupted by main thread, that's OK.
      }
    }
  }

  private static class DummyZKFC extends ZKFailoverController {
    private final DummyHAService localTarget;

    public DummyZKFC(DummyHAService localTarget) {
      this.localTarget = localTarget;
    }

    @Override
    protected byte[] targetToData(HAServiceTarget target) {
      return Ints.toByteArray(((DummyHAService)target).index);
    }

    @Override
    protected HAServiceTarget dataToTarget(byte[] data) {
      int index = Ints.fromByteArray(data);
      return DummyHAService.getInstance(index);
    }

    @Override
    protected HAServiceTarget getLocalTarget() {
      return localTarget;
    }
  }
}