HADOOP-10535. Make the retry numbers in ActiveStandbyElector configurable. Contributed by Jing Zhao.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1589905 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
c7e87574cb
commit
6d4c7df434
@ -355,6 +355,9 @@ Release 2.5.0 - UNRELEASED
|
|||||||
|
|
||||||
HADOOP-10503. Move junit up to v 4.11. (cnauroth)
|
HADOOP-10503. Move junit up to v 4.11. (cnauroth)
|
||||||
|
|
||||||
|
HADOOP-10535. Make the retry numbers in ActiveStandbyElector configurable.
|
||||||
|
(jing9)
|
||||||
|
|
||||||
OPTIMIZATIONS
|
OPTIMIZATIONS
|
||||||
|
|
||||||
BUG FIXES
|
BUG FIXES
|
||||||
|
@ -199,6 +199,11 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
|
|||||||
"ha.failover-controller.graceful-fence.connection.retries";
|
"ha.failover-controller.graceful-fence.connection.retries";
|
||||||
public static final int HA_FC_GRACEFUL_FENCE_CONNECTION_RETRIES_DEFAULT = 1;
|
public static final int HA_FC_GRACEFUL_FENCE_CONNECTION_RETRIES_DEFAULT = 1;
|
||||||
|
|
||||||
|
/** number of zookeeper operation retry times in ActiveStandbyElector */
|
||||||
|
public static final String HA_FC_ELECTOR_ZK_OP_RETRIES_KEY =
|
||||||
|
"ha.failover-controller.active-standby-elector.zk.op.retries";
|
||||||
|
public static final int HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT = 3;
|
||||||
|
|
||||||
/* Timeout that the CLI (manual) FC waits for monitorHealth, getServiceState */
|
/* Timeout that the CLI (manual) FC waits for monitorHealth, getServiceState */
|
||||||
public static final String HA_FC_CLI_CHECK_TIMEOUT_KEY =
|
public static final String HA_FC_CLI_CHECK_TIMEOUT_KEY =
|
||||||
"ha.failover-controller.cli-check.rpc-timeout.ms";
|
"ha.failover-controller.cli-check.rpc-timeout.ms";
|
||||||
|
@ -143,7 +143,6 @@ public interface ActiveStandbyElectorCallback {
|
|||||||
|
|
||||||
public static final Log LOG = LogFactory.getLog(ActiveStandbyElector.class);
|
public static final Log LOG = LogFactory.getLog(ActiveStandbyElector.class);
|
||||||
|
|
||||||
static int NUM_RETRIES = 3;
|
|
||||||
private static final int SLEEP_AFTER_FAILURE_TO_BECOME_ACTIVE = 1000;
|
private static final int SLEEP_AFTER_FAILURE_TO_BECOME_ACTIVE = 1000;
|
||||||
|
|
||||||
private static enum ConnectionState {
|
private static enum ConnectionState {
|
||||||
@ -170,6 +169,7 @@ static enum State {
|
|||||||
private final String zkLockFilePath;
|
private final String zkLockFilePath;
|
||||||
private final String zkBreadCrumbPath;
|
private final String zkBreadCrumbPath;
|
||||||
private final String znodeWorkingDir;
|
private final String znodeWorkingDir;
|
||||||
|
private final int maxRetryNum;
|
||||||
|
|
||||||
private Lock sessionReestablishLockForTests = new ReentrantLock();
|
private Lock sessionReestablishLockForTests = new ReentrantLock();
|
||||||
private boolean wantToBeInElection;
|
private boolean wantToBeInElection;
|
||||||
@ -207,7 +207,7 @@ static enum State {
|
|||||||
public ActiveStandbyElector(String zookeeperHostPorts,
|
public ActiveStandbyElector(String zookeeperHostPorts,
|
||||||
int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl,
|
int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl,
|
||||||
List<ZKAuthInfo> authInfo,
|
List<ZKAuthInfo> authInfo,
|
||||||
ActiveStandbyElectorCallback app) throws IOException,
|
ActiveStandbyElectorCallback app, int maxRetryNum) throws IOException,
|
||||||
HadoopIllegalArgumentException, KeeperException {
|
HadoopIllegalArgumentException, KeeperException {
|
||||||
if (app == null || acl == null || parentZnodeName == null
|
if (app == null || acl == null || parentZnodeName == null
|
||||||
|| zookeeperHostPorts == null || zookeeperSessionTimeout <= 0) {
|
|| zookeeperHostPorts == null || zookeeperSessionTimeout <= 0) {
|
||||||
@ -221,6 +221,7 @@ public ActiveStandbyElector(String zookeeperHostPorts,
|
|||||||
znodeWorkingDir = parentZnodeName;
|
znodeWorkingDir = parentZnodeName;
|
||||||
zkLockFilePath = znodeWorkingDir + "/" + LOCK_FILENAME;
|
zkLockFilePath = znodeWorkingDir + "/" + LOCK_FILENAME;
|
||||||
zkBreadCrumbPath = znodeWorkingDir + "/" + BREADCRUMB_FILENAME;
|
zkBreadCrumbPath = znodeWorkingDir + "/" + BREADCRUMB_FILENAME;
|
||||||
|
this.maxRetryNum = maxRetryNum;
|
||||||
|
|
||||||
// createConnection for future API calls
|
// createConnection for future API calls
|
||||||
createConnection();
|
createConnection();
|
||||||
@ -439,7 +440,7 @@ public synchronized void processResult(int rc, String path, Object ctx,
|
|||||||
LOG.debug(errorMessage);
|
LOG.debug(errorMessage);
|
||||||
|
|
||||||
if (shouldRetry(code)) {
|
if (shouldRetry(code)) {
|
||||||
if (createRetryCount < NUM_RETRIES) {
|
if (createRetryCount < maxRetryNum) {
|
||||||
LOG.debug("Retrying createNode createRetryCount: " + createRetryCount);
|
LOG.debug("Retrying createNode createRetryCount: " + createRetryCount);
|
||||||
++createRetryCount;
|
++createRetryCount;
|
||||||
createLockNodeAsync();
|
createLockNodeAsync();
|
||||||
@ -500,7 +501,7 @@ public synchronized void processResult(int rc, String path, Object ctx,
|
|||||||
LOG.debug(errorMessage);
|
LOG.debug(errorMessage);
|
||||||
|
|
||||||
if (shouldRetry(code)) {
|
if (shouldRetry(code)) {
|
||||||
if (statRetryCount < NUM_RETRIES) {
|
if (statRetryCount < maxRetryNum) {
|
||||||
++statRetryCount;
|
++statRetryCount;
|
||||||
monitorLockNodeAsync();
|
monitorLockNodeAsync();
|
||||||
return;
|
return;
|
||||||
@ -735,7 +736,7 @@ synchronized State getStateForTests() {
|
|||||||
private boolean reEstablishSession() {
|
private boolean reEstablishSession() {
|
||||||
int connectionRetryCount = 0;
|
int connectionRetryCount = 0;
|
||||||
boolean success = false;
|
boolean success = false;
|
||||||
while(!success && connectionRetryCount < NUM_RETRIES) {
|
while(!success && connectionRetryCount < maxRetryNum) {
|
||||||
LOG.debug("Establishing zookeeper connection for " + this);
|
LOG.debug("Establishing zookeeper connection for " + this);
|
||||||
try {
|
try {
|
||||||
createConnection();
|
createConnection();
|
||||||
@ -972,14 +973,14 @@ public Void run() throws KeeperException, InterruptedException {
|
|||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private static <T> T zkDoWithRetries(ZKAction<T> action)
|
private <T> T zkDoWithRetries(ZKAction<T> action) throws KeeperException,
|
||||||
throws KeeperException, InterruptedException {
|
InterruptedException {
|
||||||
int retry = 0;
|
int retry = 0;
|
||||||
while (true) {
|
while (true) {
|
||||||
try {
|
try {
|
||||||
return action.run();
|
return action.run();
|
||||||
} catch (KeeperException ke) {
|
} catch (KeeperException ke) {
|
||||||
if (shouldRetry(ke.code()) && ++retry < NUM_RETRIES) {
|
if (shouldRetry(ke.code()) && ++retry < maxRetryNum) {
|
||||||
continue;
|
continue;
|
||||||
}
|
}
|
||||||
throw ke;
|
throw ke;
|
||||||
|
@ -32,6 +32,7 @@
|
|||||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException;
|
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException;
|
||||||
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
|
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
|
||||||
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
import org.apache.hadoop.ha.HAServiceProtocol.HAServiceState;
|
||||||
@ -341,10 +342,12 @@ private void initZK() throws HadoopIllegalArgumentException, IOException,
|
|||||||
Preconditions.checkArgument(zkTimeout > 0,
|
Preconditions.checkArgument(zkTimeout > 0,
|
||||||
"Invalid ZK session timeout %s", zkTimeout);
|
"Invalid ZK session timeout %s", zkTimeout);
|
||||||
|
|
||||||
|
int maxRetryNum = conf.getInt(
|
||||||
|
CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY,
|
||||||
|
CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT);
|
||||||
elector = new ActiveStandbyElector(zkQuorum,
|
elector = new ActiveStandbyElector(zkQuorum,
|
||||||
zkTimeout, getParentZnode(), zkAcls, zkAuths,
|
zkTimeout, getParentZnode(), zkAcls, zkAuths,
|
||||||
new ElectorCallbacks());
|
new ElectorCallbacks(), maxRetryNum);
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getParentZnode() {
|
private String getParentZnode() {
|
||||||
|
@ -39,6 +39,7 @@
|
|||||||
import org.mockito.Mockito;
|
import org.mockito.Mockito;
|
||||||
|
|
||||||
import org.apache.hadoop.HadoopIllegalArgumentException;
|
import org.apache.hadoop.HadoopIllegalArgumentException;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
|
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
|
||||||
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException;
|
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException;
|
||||||
import org.apache.hadoop.util.ZKUtil.ZKAuthInfo;
|
import org.apache.hadoop.util.ZKUtil.ZKAuthInfo;
|
||||||
@ -59,8 +60,9 @@ class ActiveStandbyElectorTester extends ActiveStandbyElector {
|
|||||||
ActiveStandbyElectorTester(String hostPort, int timeout, String parent,
|
ActiveStandbyElectorTester(String hostPort, int timeout, String parent,
|
||||||
List<ACL> acl, ActiveStandbyElectorCallback app) throws IOException,
|
List<ACL> acl, ActiveStandbyElectorCallback app) throws IOException,
|
||||||
KeeperException {
|
KeeperException {
|
||||||
super(hostPort, timeout, parent, acl,
|
super(hostPort, timeout, parent, acl, Collections
|
||||||
Collections.<ZKAuthInfo>emptyList(), app);
|
.<ZKAuthInfo> emptyList(), app,
|
||||||
|
CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
@ -715,7 +717,8 @@ public void testEnsureBaseNodeFails() throws Exception {
|
|||||||
public void testWithoutZKServer() throws Exception {
|
public void testWithoutZKServer() throws Exception {
|
||||||
try {
|
try {
|
||||||
new ActiveStandbyElector("127.0.0.1", 2000, ZK_PARENT_NAME,
|
new ActiveStandbyElector("127.0.0.1", 2000, ZK_PARENT_NAME,
|
||||||
Ids.OPEN_ACL_UNSAFE, Collections.<ZKAuthInfo> emptyList(), mockApp);
|
Ids.OPEN_ACL_UNSAFE, Collections.<ZKAuthInfo> emptyList(), mockApp,
|
||||||
|
CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT);
|
||||||
Assert.fail("Did not throw zookeeper connection loss exceptions!");
|
Assert.fail("Did not throw zookeeper connection loss exceptions!");
|
||||||
} catch (KeeperException ke) {
|
} catch (KeeperException ke) {
|
||||||
GenericTestUtils.assertExceptionContains( "ConnectionLoss", ke);
|
GenericTestUtils.assertExceptionContains( "ConnectionLoss", ke);
|
||||||
|
@ -26,6 +26,7 @@
|
|||||||
import java.util.UUID;
|
import java.util.UUID;
|
||||||
|
|
||||||
import org.apache.commons.logging.impl.Log4JLogger;
|
import org.apache.commons.logging.impl.Log4JLogger;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
|
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
|
||||||
import org.apache.hadoop.ha.ActiveStandbyElector.State;
|
import org.apache.hadoop.ha.ActiveStandbyElector.State;
|
||||||
import org.apache.hadoop.util.ZKUtil.ZKAuthInfo;
|
import org.apache.hadoop.util.ZKUtil.ZKAuthInfo;
|
||||||
@ -70,9 +71,9 @@ public void setUp() throws Exception {
|
|||||||
for (int i = 0; i < NUM_ELECTORS; i++) {
|
for (int i = 0; i < NUM_ELECTORS; i++) {
|
||||||
cbs[i] = Mockito.mock(ActiveStandbyElectorCallback.class);
|
cbs[i] = Mockito.mock(ActiveStandbyElectorCallback.class);
|
||||||
appDatas[i] = Ints.toByteArray(i);
|
appDatas[i] = Ints.toByteArray(i);
|
||||||
electors[i] = new ActiveStandbyElector(
|
electors[i] = new ActiveStandbyElector(hostPort, 5000, PARENT_DIR,
|
||||||
hostPort, 5000, PARENT_DIR, Ids.OPEN_ACL_UNSAFE,
|
Ids.OPEN_ACL_UNSAFE, Collections.<ZKAuthInfo> emptyList(), cbs[i],
|
||||||
Collections.<ZKAuthInfo>emptyList(), cbs[i]);
|
CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -23,6 +23,7 @@
|
|||||||
|
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.util.Shell;
|
import org.apache.hadoop.util.Shell;
|
||||||
import org.apache.hadoop.util.Time;
|
import org.apache.hadoop.util.Time;
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
@ -126,8 +127,7 @@ public void testRandomHealthAndDisconnects() throws Exception {
|
|||||||
.when(cluster.getService(0).proxy).monitorHealth();
|
.when(cluster.getService(0).proxy).monitorHealth();
|
||||||
Mockito.doAnswer(new RandomlyThrow(1))
|
Mockito.doAnswer(new RandomlyThrow(1))
|
||||||
.when(cluster.getService(1).proxy).monitorHealth();
|
.when(cluster.getService(1).proxy).monitorHealth();
|
||||||
ActiveStandbyElector.NUM_RETRIES = 100;
|
conf.setInt(CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY, 100);
|
||||||
|
|
||||||
// Don't start until after the above mocking. Otherwise we can get
|
// Don't start until after the above mocking. Otherwise we can get
|
||||||
// Mockito errors if the HM calls the proxy in the middle of
|
// Mockito errors if the HM calls the proxy in the middle of
|
||||||
// setting up the mock.
|
// setting up the mock.
|
||||||
|
@ -23,6 +23,7 @@
|
|||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||||
import org.apache.hadoop.ha.ActiveStandbyElector;
|
import org.apache.hadoop.ha.ActiveStandbyElector;
|
||||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||||
import org.apache.hadoop.ha.ServiceFailedException;
|
import org.apache.hadoop.ha.ServiceFailedException;
|
||||||
@ -85,8 +86,11 @@ protected synchronized void serviceInit(Configuration conf)
|
|||||||
List<ACL> zkAcls = RMZKUtils.getZKAcls(conf);
|
List<ACL> zkAcls = RMZKUtils.getZKAcls(conf);
|
||||||
List<ZKUtil.ZKAuthInfo> zkAuths = RMZKUtils.getZKAuths(conf);
|
List<ZKUtil.ZKAuthInfo> zkAuths = RMZKUtils.getZKAuths(conf);
|
||||||
|
|
||||||
|
int maxRetryNum = conf.getInt(
|
||||||
|
CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_KEY,
|
||||||
|
CommonConfigurationKeys.HA_FC_ELECTOR_ZK_OP_RETRIES_DEFAULT);
|
||||||
elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout,
|
elector = new ActiveStandbyElector(zkQuorum, (int) zkSessionTimeout,
|
||||||
electionZNode, zkAcls, zkAuths, this);
|
electionZNode, zkAcls, zkAuths, this, maxRetryNum);
|
||||||
|
|
||||||
elector.ensureParentZNode();
|
elector.ensureParentZNode();
|
||||||
if (!isParentZnodeSafe(clusterId)) {
|
if (!isParentZnodeSafe(clusterId)) {
|
||||||
|
Loading…
Reference in New Issue
Block a user