HADOOP-9126. FormatZK and ZKFC startup can fail due to zkclient connection establishment delay. Contributed by Rakesh R and Todd Lipcon.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1419831 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
7ba12a628a
commit
22a78a75b4
@ -476,6 +476,9 @@ Release 2.0.3-alpha - Unreleased
|
|||||||
HADOOP-6762. Exception while doing RPC I/O closes channel
|
HADOOP-6762. Exception while doing RPC I/O closes channel
|
||||||
(Sam Rash and todd via todd)
|
(Sam Rash and todd via todd)
|
||||||
|
|
||||||
|
HADOOP-9126. FormatZK and ZKFC startup can fail due to zkclient connection
|
||||||
|
establishment delay. (Rakesh R and todd via todd)
|
||||||
|
|
||||||
Release 2.0.2-alpha - 2012-09-07
|
Release 2.0.2-alpha - 2012-09-07
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -21,6 +21,8 @@
|
|||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.concurrent.CountDownLatch;
|
||||||
|
import java.util.concurrent.TimeUnit;
|
||||||
import java.util.concurrent.locks.Lock;
|
import java.util.concurrent.locks.Lock;
|
||||||
import java.util.concurrent.locks.ReentrantLock;
|
import java.util.concurrent.locks.ReentrantLock;
|
||||||
|
|
||||||
@ -45,6 +47,7 @@
|
|||||||
|
|
||||||
import com.google.common.annotations.VisibleForTesting;
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
import com.google.common.base.Preconditions;
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.google.common.collect.Lists;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
*
|
*
|
||||||
@ -205,7 +208,7 @@ public ActiveStandbyElector(String zookeeperHostPorts,
|
|||||||
int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl,
|
int zookeeperSessionTimeout, String parentZnodeName, List<ACL> acl,
|
||||||
List<ZKAuthInfo> authInfo,
|
List<ZKAuthInfo> authInfo,
|
||||||
ActiveStandbyElectorCallback app) throws IOException,
|
ActiveStandbyElectorCallback app) throws IOException,
|
||||||
HadoopIllegalArgumentException {
|
HadoopIllegalArgumentException, KeeperException {
|
||||||
if (app == null || acl == null || parentZnodeName == null
|
if (app == null || acl == null || parentZnodeName == null
|
||||||
|| zookeeperHostPorts == null || zookeeperSessionTimeout <= 0) {
|
|| zookeeperHostPorts == null || zookeeperSessionTimeout <= 0) {
|
||||||
throw new HadoopIllegalArgumentException("Invalid argument");
|
throw new HadoopIllegalArgumentException("Invalid argument");
|
||||||
@ -602,10 +605,24 @@ synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
|
|||||||
*
|
*
|
||||||
* @return new zookeeper client instance
|
* @return new zookeeper client instance
|
||||||
* @throws IOException
|
* @throws IOException
|
||||||
|
* @throws KeeperException with code ConnectionLoss if no ZooKeeper event arrives within the session timeout
|
||||||
*/
|
*/
|
||||||
protected synchronized ZooKeeper getNewZooKeeper() throws IOException {
|
protected synchronized ZooKeeper getNewZooKeeper() throws IOException,
|
||||||
ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null);
|
KeeperException {
|
||||||
zk.register(new WatcherWithClientRef(zk));
|
|
||||||
|
// Unfortunately, the ZooKeeper constructor connects to ZooKeeper and
|
||||||
|
// may trigger the Connected event immediately. So, if we register the
|
||||||
|
// watcher after constructing ZooKeeper, we may miss that event. Instead,
|
||||||
|
// we construct the watcher first, and have it queue any events it receives
|
||||||
|
// before we can set its ZooKeeper reference.
|
||||||
|
WatcherWithClientRef watcher = new WatcherWithClientRef();
|
||||||
|
ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, watcher);
|
||||||
|
watcher.setZooKeeperRef(zk);
|
||||||
|
|
||||||
|
// Wait for the asynchronous success/failure. This may throw an exception
|
||||||
|
// if we don't connect within the session timeout.
|
||||||
|
watcher.waitForZKConnectionEvent(zkSessionTimeout);
|
||||||
|
|
||||||
for (ZKAuthInfo auth : zkAuthInfo) {
|
for (ZKAuthInfo auth : zkAuthInfo) {
|
||||||
zk.addAuthInfo(auth.getScheme(), auth.getAuth());
|
zk.addAuthInfo(auth.getScheme(), auth.getAuth());
|
||||||
}
|
}
|
||||||
@ -710,13 +727,16 @@ private boolean reEstablishSession() {
|
|||||||
} catch(IOException e) {
|
} catch(IOException e) {
|
||||||
LOG.warn(e);
|
LOG.warn(e);
|
||||||
sleepFor(5000);
|
sleepFor(5000);
|
||||||
|
} catch(KeeperException e) {
|
||||||
|
LOG.warn(e);
|
||||||
|
sleepFor(5000);
|
||||||
}
|
}
|
||||||
++connectionRetryCount;
|
++connectionRetryCount;
|
||||||
}
|
}
|
||||||
return success;
|
return success;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void createConnection() throws IOException {
|
private void createConnection() throws IOException, KeeperException {
|
||||||
if (zkClient != null) {
|
if (zkClient != null) {
|
||||||
try {
|
try {
|
||||||
zkClient.close();
|
zkClient.close();
|
||||||
@ -973,14 +993,76 @@ private synchronized boolean isStaleClient(Object ctx) {
|
|||||||
* events.
|
* events.
|
||||||
*/
|
*/
|
||||||
private final class WatcherWithClientRef implements Watcher {
|
private final class WatcherWithClientRef implements Watcher {
|
||||||
private final ZooKeeper zk;
|
private ZooKeeper zk;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Latch released once the first event (of any type) is forwarded. This is used in order
|
||||||
|
* to wait for the Connected event when the client is first created.
|
||||||
|
*/
|
||||||
|
private CountDownLatch hasReceivedEvent = new CountDownLatch(1);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* If any events arrive before the reference to ZooKeeper is set,
|
||||||
|
* they get queued up and later forwarded when the reference is
|
||||||
|
* available.
|
||||||
|
*/
|
||||||
|
private final List<WatchedEvent> queuedEvents = Lists.newLinkedList();
|
||||||
|
|
||||||
|
private WatcherWithClientRef() {
|
||||||
|
}
|
||||||
|
|
||||||
private WatcherWithClientRef(ZooKeeper zk) {
|
private WatcherWithClientRef(ZooKeeper zk) {
|
||||||
this.zk = zk;
|
this.zk = zk;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Waits for the first event from ZooKeeper to arrive, up to the given timeout.
|
||||||
|
*
|
||||||
|
* @param connectionTimeoutMs zookeeper connection timeout in milliseconds
|
||||||
|
* @throws KeeperException if the connection attempt times out. This will
|
||||||
|
* be a ZooKeeper ConnectionLoss exception code.
|
||||||
|
* @throws IOException if interrupted while connecting to ZooKeeper
|
||||||
|
*/
|
||||||
|
private void waitForZKConnectionEvent(int connectionTimeoutMs)
|
||||||
|
throws KeeperException, IOException {
|
||||||
|
try {
|
||||||
|
if (!hasReceivedEvent.await(connectionTimeoutMs, TimeUnit.MILLISECONDS)) {
|
||||||
|
LOG.error("Connection timed out: couldn't connect to ZooKeeper in "
|
||||||
|
+ connectionTimeoutMs + " milliseconds");
|
||||||
|
synchronized (this) {
|
||||||
|
zk.close();
|
||||||
|
}
|
||||||
|
throw KeeperException.create(Code.CONNECTIONLOSS);
|
||||||
|
}
|
||||||
|
} catch (InterruptedException e) {
|
||||||
|
Thread.currentThread().interrupt();
|
||||||
|
throw new IOException(
|
||||||
|
"Interrupted when connecting to zookeeper server", e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private synchronized void setZooKeeperRef(ZooKeeper zk) {
|
||||||
|
Preconditions.checkState(this.zk == null,
|
||||||
|
"zk already set -- must be set exactly once");
|
||||||
|
this.zk = zk;
|
||||||
|
|
||||||
|
for (WatchedEvent e : queuedEvents) {
|
||||||
|
forwardEvent(e);
|
||||||
|
}
|
||||||
|
queuedEvents.clear();
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void process(WatchedEvent event) {
|
public synchronized void process(WatchedEvent event) {
|
||||||
|
if (zk != null) {
|
||||||
|
forwardEvent(event);
|
||||||
|
} else {
|
||||||
|
queuedEvents.add(event);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void forwardEvent(WatchedEvent event) {
|
||||||
|
hasReceivedEvent.countDown();
|
||||||
try {
|
try {
|
||||||
ActiveStandbyElector.this.processWatchEvent(
|
ActiveStandbyElector.this.processWatchEvent(
|
||||||
zk, event);
|
zk, event);
|
||||||
@ -1024,5 +1106,4 @@ public String toString() {
|
|||||||
((appData == null) ? "null" : StringUtils.byteToHexString(appData)) +
|
((appData == null) ? "null" : StringUtils.byteToHexString(appData)) +
|
||||||
" cb=" + appClient;
|
" cb=" + appClient;
|
||||||
}
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -180,7 +180,15 @@ public Integer run() {
|
|||||||
|
|
||||||
private int doRun(String[] args)
|
private int doRun(String[] args)
|
||||||
throws HadoopIllegalArgumentException, IOException, InterruptedException {
|
throws HadoopIllegalArgumentException, IOException, InterruptedException {
|
||||||
|
try {
|
||||||
initZK();
|
initZK();
|
||||||
|
} catch (KeeperException ke) {
|
||||||
|
LOG.fatal("Unable to start failover controller. Unable to connect "
|
||||||
|
+ "to ZooKeeper quorum at " + zkQuorum + ". Please check the "
|
||||||
|
+ "configured value for " + ZK_QUORUM_KEY + " and ensure that "
|
||||||
|
+ "ZooKeeper is running.");
|
||||||
|
return ERR_CODE_NO_ZK;
|
||||||
|
}
|
||||||
if (args.length > 0) {
|
if (args.length > 0) {
|
||||||
if ("-formatZK".equals(args[0])) {
|
if ("-formatZK".equals(args[0])) {
|
||||||
boolean force = false;
|
boolean force = false;
|
||||||
@ -200,24 +208,12 @@ private int doRun(String[] args)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
try {
|
|
||||||
if (!elector.parentZNodeExists()) {
|
if (!elector.parentZNodeExists()) {
|
||||||
LOG.fatal("Unable to start failover controller. " +
|
LOG.fatal("Unable to start failover controller. "
|
||||||
"Parent znode does not exist.\n" +
|
+ "Parent znode does not exist.\n"
|
||||||
"Run with -formatZK flag to initialize ZooKeeper.");
|
+ "Run with -formatZK flag to initialize ZooKeeper.");
|
||||||
return ERR_CODE_NO_PARENT_ZNODE;
|
return ERR_CODE_NO_PARENT_ZNODE;
|
||||||
}
|
}
|
||||||
} catch (IOException ioe) {
|
|
||||||
if (ioe.getCause() instanceof KeeperException.ConnectionLossException) {
|
|
||||||
LOG.fatal("Unable to start failover controller. Unable to connect " +
|
|
||||||
"to ZooKeeper quorum at " + zkQuorum + ". Please check the " +
|
|
||||||
"configured value for " + ZK_QUORUM_KEY + " and ensure that " +
|
|
||||||
"ZooKeeper is running.");
|
|
||||||
return ERR_CODE_NO_ZK;
|
|
||||||
} else {
|
|
||||||
throw ioe;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
try {
|
try {
|
||||||
localTarget.checkFencingConfigured();
|
localTarget.checkFencingConfigured();
|
||||||
@ -310,7 +306,8 @@ protected void startRPC() throws IOException {
|
|||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
private void initZK() throws HadoopIllegalArgumentException, IOException {
|
private void initZK() throws HadoopIllegalArgumentException, IOException,
|
||||||
|
KeeperException {
|
||||||
zkQuorum = conf.get(ZK_QUORUM_KEY);
|
zkQuorum = conf.get(ZK_QUORUM_KEY);
|
||||||
int zkTimeout = conf.getInt(ZK_SESSION_TIMEOUT_KEY,
|
int zkTimeout = conf.getInt(ZK_SESSION_TIMEOUT_KEY,
|
||||||
ZK_SESSION_TIMEOUT_DEFAULT);
|
ZK_SESSION_TIMEOUT_DEFAULT);
|
||||||
|
@ -42,6 +42,7 @@
|
|||||||
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
|
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveStandbyElectorCallback;
|
||||||
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException;
|
import org.apache.hadoop.ha.ActiveStandbyElector.ActiveNotFoundException;
|
||||||
import org.apache.hadoop.ha.HAZKUtil.ZKAuthInfo;
|
import org.apache.hadoop.ha.HAZKUtil.ZKAuthInfo;
|
||||||
|
import org.apache.hadoop.test.GenericTestUtils;
|
||||||
|
|
||||||
public class TestActiveStandbyElector {
|
public class TestActiveStandbyElector {
|
||||||
|
|
||||||
@ -56,7 +57,8 @@ class ActiveStandbyElectorTester extends ActiveStandbyElector {
|
|||||||
private int sleptFor = 0;
|
private int sleptFor = 0;
|
||||||
|
|
||||||
ActiveStandbyElectorTester(String hostPort, int timeout, String parent,
|
ActiveStandbyElectorTester(String hostPort, int timeout, String parent,
|
||||||
List<ACL> acl, ActiveStandbyElectorCallback app) throws IOException {
|
List<ACL> acl, ActiveStandbyElectorCallback app) throws IOException,
|
||||||
|
KeeperException {
|
||||||
super(hostPort, timeout, parent, acl,
|
super(hostPort, timeout, parent, acl,
|
||||||
Collections.<ZKAuthInfo>emptyList(), app);
|
Collections.<ZKAuthInfo>emptyList(), app);
|
||||||
}
|
}
|
||||||
@ -83,7 +85,7 @@ protected void sleepFor(int ms) {
|
|||||||
ActiveStandbyElector.BREADCRUMB_FILENAME;
|
ActiveStandbyElector.BREADCRUMB_FILENAME;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void init() throws IOException {
|
public void init() throws IOException, KeeperException {
|
||||||
count = 0;
|
count = 0;
|
||||||
mockZK = Mockito.mock(ZooKeeper.class);
|
mockZK = Mockito.mock(ZooKeeper.class);
|
||||||
mockApp = Mockito.mock(ActiveStandbyElectorCallback.class);
|
mockApp = Mockito.mock(ActiveStandbyElectorCallback.class);
|
||||||
@ -705,4 +707,18 @@ public void testEnsureBaseNodeFails() throws Exception {
|
|||||||
Mockito.eq(ZK_PARENT_NAME), Mockito.<byte[]>any(),
|
Mockito.eq(ZK_PARENT_NAME), Mockito.<byte[]>any(),
|
||||||
Mockito.eq(Ids.OPEN_ACL_UNSAFE), Mockito.eq(CreateMode.PERSISTENT));
|
Mockito.eq(Ids.OPEN_ACL_UNSAFE), Mockito.eq(CreateMode.PERSISTENT));
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Verify that creating an elector without a reachable ZooKeeper server fails with a ConnectionLoss KeeperException.
|
||||||
|
*/
|
||||||
|
@Test
|
||||||
|
public void testWithoutZKServer() throws Exception {
|
||||||
|
try {
|
||||||
|
new ActiveStandbyElector("127.0.0.1", 2000, ZK_PARENT_NAME,
|
||||||
|
Ids.OPEN_ACL_UNSAFE, Collections.<ZKAuthInfo> emptyList(), mockApp);
|
||||||
|
Assert.fail("Did not throw zookeeper connection loss exceptions!");
|
||||||
|
} catch (KeeperException ke) {
|
||||||
|
GenericTestUtils.assertExceptionContains( "ConnectionLoss", ke);
|
||||||
|
}
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user