YARN-3242. Asynchrony in ZK-close can lead to ZKRMStateStore watcher receiving events for old client. (Zhihai Xu via kasha)

This commit is contained in:
Karthik Kambatla 2015-03-04 19:47:02 -08:00
parent ded0200e9c
commit 8d88691d16
4 changed files with 70 additions and 30 deletions

View File

@ -90,6 +90,14 @@ protected static class CountdownWatcher implements Watcher {
// XXX this doesn't need to be volatile! (Should probably be final) // XXX this doesn't need to be volatile! (Should probably be final)
volatile CountDownLatch clientConnected; volatile CountDownLatch clientConnected;
volatile boolean connected; volatile boolean connected;
protected ZooKeeper client;
public void initializeWatchedClient(ZooKeeper zk) {
if (client != null) {
throw new RuntimeException("Watched Client was already set");
}
client = zk;
}
public CountdownWatcher() { public CountdownWatcher() {
reset(); reset();
@ -191,8 +199,7 @@ protected TestableZooKeeper createClient(CountdownWatcher watcher,
zk.close(); zk.close();
} }
} }
watcher.initializeWatchedClient(zk);
return zk; return zk;
} }

View File

@ -701,6 +701,9 @@ Release 2.7.0 - UNRELEASED
YARN-3231. FairScheduler: Changing queueMaxRunningApps interferes with pending YARN-3231. FairScheduler: Changing queueMaxRunningApps interferes with pending
jobs. (Siqi Li via kasha) jobs. (Siqi Li via kasha)
YARN-3242. Asynchrony in ZK-close can lead to ZKRMStateStore watcher receiving
events for old client. (Zhihai Xu via kasha)
Release 2.6.0 - 2014-11-18 Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -153,7 +153,13 @@ public class ZKRMStateStore extends RMStateStore {
@VisibleForTesting @VisibleForTesting
protected ZooKeeper zkClient; protected ZooKeeper zkClient;
private ZooKeeper oldZkClient;
/* activeZkClient is not used to do actual operations,
* it is only used to verify client session for watched events and
* it gets activated into zkClient on connection event.
*/
@VisibleForTesting
ZooKeeper activeZkClient;
/** Fencing related variables */ /** Fencing related variables */
private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK"; private static final String FENCING_LOCK = "RM_ZK_FENCING_LOCK";
@ -355,21 +361,14 @@ public Void run() throws KeeperException, InterruptedException {
} }
private synchronized void closeZkClients() throws IOException { private synchronized void closeZkClients() throws IOException {
if (zkClient != null) { zkClient = null;
if (activeZkClient != null) {
try { try {
zkClient.close(); activeZkClient.close();
} catch (InterruptedException e) { } catch (InterruptedException e) {
throw new IOException("Interrupted while closing ZK", e); throw new IOException("Interrupted while closing ZK", e);
} }
zkClient = null; activeZkClient = null;
}
if (oldZkClient != null) {
try {
oldZkClient.close();
} catch (InterruptedException e) {
throw new IOException("Interrupted while closing old ZK", e);
}
oldZkClient = null;
} }
} }
@ -830,11 +829,16 @@ public synchronized void deleteStore() throws Exception {
* hides the ZK methods of the store from its public interface * hides the ZK methods of the store from its public interface
*/ */
private final class ForwardingWatcher implements Watcher { private final class ForwardingWatcher implements Watcher {
private ZooKeeper watchedZkClient;
public ForwardingWatcher(ZooKeeper client) {
this.watchedZkClient = client;
}
@Override @Override
public void process(WatchedEvent event) { public void process(WatchedEvent event) {
try { try {
ZKRMStateStore.this.processWatchEvent(event); ZKRMStateStore.this.processWatchEvent(watchedZkClient, event);
} catch (Throwable t) { } catch (Throwable t) {
LOG.error("Failed to process watcher event " + event + ": " LOG.error("Failed to process watcher event " + event + ": "
+ StringUtils.stringifyException(t)); + StringUtils.stringifyException(t));
@ -845,8 +849,16 @@ public void process(WatchedEvent event) {
@VisibleForTesting @VisibleForTesting
@Private @Private
@Unstable @Unstable
public synchronized void processWatchEvent(WatchedEvent event) public synchronized void processWatchEvent(ZooKeeper zk,
throws Exception { WatchedEvent event) throws Exception {
// only process watcher event from current ZooKeeper Client session.
if (zk != activeZkClient) {
LOG.info("Ignore watcher event type: " + event.getType() +
" with state:" + event.getState() + " for path:" +
event.getPath() + " from old session");
return;
}
Event.EventType eventType = event.getType(); Event.EventType eventType = event.getType();
LOG.info("Watcher event type: " + eventType + " with state:" LOG.info("Watcher event type: " + eventType + " with state:"
+ event.getState() + " for path:" + event.getPath() + " for " + this); + event.getState() + " for path:" + event.getPath() + " for " + this);
@ -857,17 +869,15 @@ public synchronized void processWatchEvent(WatchedEvent event)
switch (event.getState()) { switch (event.getState()) {
case SyncConnected: case SyncConnected:
LOG.info("ZKRMStateStore Session connected"); LOG.info("ZKRMStateStore Session connected");
if (oldZkClient != null) { if (zkClient == null) {
// the SyncConnected must be from the client that sent Disconnected // the SyncConnected must be from the client that sent Disconnected
zkClient = oldZkClient; zkClient = activeZkClient;
oldZkClient = null;
ZKRMStateStore.this.notifyAll(); ZKRMStateStore.this.notifyAll();
LOG.info("ZKRMStateStore Session restored"); LOG.info("ZKRMStateStore Session restored");
} }
break; break;
case Disconnected: case Disconnected:
LOG.info("ZKRMStateStore Session disconnected"); LOG.info("ZKRMStateStore Session disconnected");
oldZkClient = zkClient;
zkClient = null; zkClient = null;
break; break;
case Expired: case Expired:
@ -1100,7 +1110,8 @@ private synchronized void createConnection()
for (int retries = 0; retries < numRetries && zkClient == null; for (int retries = 0; retries < numRetries && zkClient == null;
retries++) { retries++) {
try { try {
zkClient = getNewZooKeeper(); activeZkClient = getNewZooKeeper();
zkClient = activeZkClient;
for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) { for (ZKUtil.ZKAuthInfo zkAuth : zkAuths) {
zkClient.addAuthInfo(zkAuth.getScheme(), zkAuth.getAuth()); zkClient.addAuthInfo(zkAuth.getScheme(), zkAuth.getAuth());
} }
@ -1130,7 +1141,7 @@ private synchronized void createConnection()
protected synchronized ZooKeeper getNewZooKeeper() protected synchronized ZooKeeper getNewZooKeeper()
throws IOException, InterruptedException { throws IOException, InterruptedException {
ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null); ZooKeeper zk = new ZooKeeper(zkHostPort, zkSessionTimeout, null);
zk.register(new ForwardingWatcher()); zk.register(new ForwardingWatcher(zk));
return zk; return zk;
} }

View File

@ -71,6 +71,7 @@ class TestZKClient {
ZKRMStateStore store; ZKRMStateStore store;
boolean forExpire = false; boolean forExpire = false;
TestForwardingWatcher oldWatcher;
TestForwardingWatcher watcher; TestForwardingWatcher watcher;
CyclicBarrier syncBarrier = new CyclicBarrier(2); CyclicBarrier syncBarrier = new CyclicBarrier(2);
@ -86,35 +87,36 @@ public TestZKRMStateStore(Configuration conf, String workingZnode)
@Override @Override
public ZooKeeper getNewZooKeeper() public ZooKeeper getNewZooKeeper()
throws IOException, InterruptedException { throws IOException, InterruptedException {
oldWatcher = watcher;
watcher = new TestForwardingWatcher();
return createClient(watcher, hostPort, ZK_TIMEOUT_MS); return createClient(watcher, hostPort, ZK_TIMEOUT_MS);
} }
@Override @Override
public synchronized void processWatchEvent(WatchedEvent event) public synchronized void processWatchEvent(ZooKeeper zk,
throws Exception { WatchedEvent event) throws Exception {
if (forExpire) { if (forExpire) {
// a hack... couldn't find a way to trigger expired event. // a hack... couldn't find a way to trigger expired event.
WatchedEvent expriredEvent = new WatchedEvent( WatchedEvent expriredEvent = new WatchedEvent(
Watcher.Event.EventType.None, Watcher.Event.EventType.None,
Watcher.Event.KeeperState.Expired, null); Watcher.Event.KeeperState.Expired, null);
super.processWatchEvent(expriredEvent); super.processWatchEvent(zk, expriredEvent);
forExpire = false; forExpire = false;
syncBarrier.await(); syncBarrier.await();
} else { } else {
super.processWatchEvent(event); super.processWatchEvent(zk, event);
} }
} }
} }
private class TestForwardingWatcher extends private class TestForwardingWatcher extends
ClientBaseWithFixes.CountdownWatcher { ClientBaseWithFixes.CountdownWatcher {
public void process(WatchedEvent event) { public void process(WatchedEvent event) {
super.process(event); super.process(event);
try { try {
if (store != null) { if (store != null) {
store.processWatchEvent(event); store.processWatchEvent(client, event);
} }
} catch (Throwable t) { } catch (Throwable t) {
LOG.error("Failed to process watcher event " + event + ": " LOG.error("Failed to process watcher event " + event + ": "
@ -127,7 +129,6 @@ public RMStateStore getRMStateStore(Configuration conf) throws Exception {
String workingZnode = "/Test"; String workingZnode = "/Test";
conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort); conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode); conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
watcher = new TestForwardingWatcher();
this.store = new TestZKRMStateStore(conf, workingZnode); this.store = new TestZKRMStateStore(conf, workingZnode);
return this.store; return this.store;
} }
@ -239,6 +240,24 @@ public void testZKSessionTimeout() throws Exception {
LOG.error(error, e); LOG.error(error, e);
fail(error); fail(error);
} }
// send Disconnected event from old client session to ZKRMStateStore
// check the current client session is not affected.
Assert.assertTrue(zkClientTester.oldWatcher != null);
WatchedEvent disconnectedEvent = new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.Disconnected, null);
zkClientTester.oldWatcher.process(disconnectedEvent);
Assert.assertTrue(store.zkClient != null);
zkClientTester.watcher.process(disconnectedEvent);
Assert.assertTrue(store.zkClient == null);
WatchedEvent connectedEvent = new WatchedEvent(
Watcher.Event.EventType.None,
Watcher.Event.KeeperState.SyncConnected, null);
zkClientTester.watcher.process(connectedEvent);
Assert.assertTrue(store.zkClient != null);
Assert.assertTrue(store.zkClient == store.activeZkClient);
} }
@Test(timeout = 20000) @Test(timeout = 20000)