YARN-2716. Refactor ZKRMStateStore retry code with Apache Curator. Contributed by Karthik Kambatla
This commit is contained in:
parent
0e80d51983
commit
960b8f19ca
@ -294,6 +294,9 @@ Release 2.8.0 - UNRELEASED
|
||||
|
||||
YARN-1462. AHS API and other AHS changes to handle tags for completed MR jobs. (xgong)
|
||||
|
||||
YARN-2716. Refactor ZKRMStateStore retry code with Apache Curator.
|
||||
(Karthik Kambatla via jianhe)
|
||||
|
||||
OPTIMIZATIONS
|
||||
|
||||
YARN-3339. TestDockerContainerExecutor should pull a single image and not
|
||||
|
@ -413,7 +413,7 @@ private static void addDeprecatedKeys() {
|
||||
|
||||
public static final String RM_ZK_RETRY_INTERVAL_MS =
|
||||
RM_ZK_PREFIX + "retry-interval-ms";
|
||||
public static final long DEFAULT_RM_ZK_RETRY_INTERVAL_MS = 1000;
|
||||
public static final int DEFAULT_RM_ZK_RETRY_INTERVAL_MS = 1000;
|
||||
|
||||
public static final String RM_ZK_TIMEOUT_MS = RM_ZK_PREFIX + "timeout-ms";
|
||||
public static final int DEFAULT_RM_ZK_TIMEOUT_MS = 10000;
|
||||
|
@ -174,6 +174,14 @@
|
||||
<groupId>org.apache.hadoop</groupId>
|
||||
<artifactId>hadoop-yarn-server-web-proxy</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.curator</groupId>
|
||||
<artifactId>curator-client</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.curator</groupId>
|
||||
<artifactId>curator-test</artifactId>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.apache.zookeeper</groupId>
|
||||
<artifactId>zookeeper</artifactId>
|
||||
|
File diff suppressed because it is too large
Load Diff
@ -40,7 +40,6 @@
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ha.ClientBaseWithFixes;
|
||||
import org.apache.hadoop.io.Text;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.delegation.DelegationKey;
|
||||
@ -76,7 +75,7 @@
|
||||
import org.apache.hadoop.yarn.server.security.MasterKeyData;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
|
||||
public class RMStateStoreTestBase extends ClientBaseWithFixes{
|
||||
public class RMStateStoreTestBase {
|
||||
|
||||
public static final Log LOG = LogFactory.getLog(RMStateStoreTestBase.class);
|
||||
|
||||
|
@ -18,18 +18,10 @@
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.recovery;
|
||||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
|
||||
import javax.crypto.SecretKey;
|
||||
|
||||
import org.apache.curator.framework.CuratorFramework;
|
||||
import org.apache.curator.framework.CuratorFrameworkFactory;
|
||||
import org.apache.curator.retry.RetryNTimes;
|
||||
import org.apache.curator.test.TestingServer;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
@ -50,6 +42,7 @@
|
||||
import org.apache.hadoop.yarn.security.client.RMDelegationTokenIdentifier;
|
||||
import org.apache.hadoop.yarn.server.records.Version;
|
||||
import org.apache.hadoop.yarn.server.records.impl.pb.VersionPBImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationAttemptStateData;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||
@ -61,22 +54,49 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
|
||||
import org.apache.hadoop.yarn.util.ConverterUtils;
|
||||
import org.apache.zookeeper.KeeperException;
|
||||
import org.apache.zookeeper.ZooKeeper;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.io.IOException;
|
||||
import javax.crypto.SecretKey;
|
||||
|
||||
public class TestZKRMStateStore extends RMStateStoreTestBase {
|
||||
|
||||
public static final Log LOG = LogFactory.getLog(TestZKRMStateStore.class);
|
||||
private static final int ZK_TIMEOUT_MS = 1000;
|
||||
private TestingServer curatorTestingServer;
|
||||
private CuratorFramework curatorFramework;
|
||||
|
||||
@Before
|
||||
public void setupCuratorServer() throws Exception {
|
||||
curatorTestingServer = new TestingServer();
|
||||
curatorTestingServer.start();
|
||||
curatorFramework = CuratorFrameworkFactory.builder()
|
||||
.connectString(curatorTestingServer.getConnectString())
|
||||
.retryPolicy(new RetryNTimes(100, 100))
|
||||
.build();
|
||||
curatorFramework.start();
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanupCuratorServer() throws IOException {
|
||||
curatorFramework.close();
|
||||
curatorTestingServer.stop();
|
||||
}
|
||||
|
||||
class TestZKRMStateStoreTester implements RMStateStoreHelper {
|
||||
|
||||
ZooKeeper client;
|
||||
TestZKRMStateStoreInternal store;
|
||||
String workingZnode;
|
||||
|
||||
|
||||
class TestZKRMStateStoreInternal extends ZKRMStateStore {
|
||||
|
||||
public TestZKRMStateStoreInternal(Configuration conf, String workingZnode)
|
||||
@ -86,11 +106,6 @@ public TestZKRMStateStoreInternal(Configuration conf, String workingZnode)
|
||||
assertTrue(znodeWorkingPath.equals(workingZnode));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ZooKeeper getNewZooKeeper() throws IOException {
|
||||
return client;
|
||||
}
|
||||
|
||||
public String getVersionNode() {
|
||||
return znodeWorkingPath + "/" + ROOT_ZNODE_NAME + "/" + VERSION_NODE;
|
||||
}
|
||||
@ -109,7 +124,7 @@ public String getAppNode(String appId) {
|
||||
* @throws Exception
|
||||
*/
|
||||
public void testRetryingCreateRootDir() throws Exception {
|
||||
createRootDir(znodeWorkingPath);
|
||||
create(znodeWorkingPath);
|
||||
}
|
||||
|
||||
}
|
||||
@ -117,23 +132,24 @@ public void testRetryingCreateRootDir() throws Exception {
|
||||
public RMStateStore getRMStateStore() throws Exception {
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
workingZnode = "/jira/issue/3077/rmstore";
|
||||
conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
|
||||
conf.set(YarnConfiguration.RM_ZK_ADDRESS,
|
||||
curatorTestingServer.getConnectString());
|
||||
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
|
||||
this.client = createClient();
|
||||
this.store = new TestZKRMStateStoreInternal(conf, workingZnode);
|
||||
return this.store;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean isFinalStateValid() throws Exception {
|
||||
List<String> nodes = client.getChildren(store.znodeWorkingPath, false);
|
||||
return nodes.size() == 1;
|
||||
return 1 ==
|
||||
curatorFramework.getChildren().forPath(store.znodeWorkingPath).size();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void writeVersion(Version version) throws Exception {
|
||||
client.setData(store.getVersionNode(), ((VersionPBImpl) version)
|
||||
.getProto().toByteArray(), -1);
|
||||
curatorFramework.setData().withVersion(-1)
|
||||
.forPath(store.getVersionNode(),
|
||||
((VersionPBImpl) version).getProto().toByteArray());
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -142,10 +158,8 @@ public Version getCurrentVersion() throws Exception {
|
||||
}
|
||||
|
||||
public boolean appExists(RMApp app) throws Exception {
|
||||
Stat node =
|
||||
client.exists(store.getAppNode(app.getApplicationId().toString()),
|
||||
false);
|
||||
return node !=null;
|
||||
return null != curatorFramework.checkExists()
|
||||
.forPath(store.getAppNode(app.getApplicationId().toString()));
|
||||
}
|
||||
}
|
||||
|
||||
@ -178,9 +192,9 @@ public Version getCurrentVersion() throws Exception {
|
||||
public RMStateStore getRMStateStore() throws Exception {
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
workingZnode = "/jira/issue/3077/rmstore";
|
||||
conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
|
||||
conf.set(YarnConfiguration.RM_ZK_ADDRESS,
|
||||
curatorTestingServer.getConnectString());
|
||||
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
|
||||
this.client = createClient();
|
||||
this.store = new TestZKRMStateStoreInternal(conf, workingZnode) {
|
||||
Version storedVersion = null;
|
||||
|
||||
@ -217,7 +231,8 @@ private Configuration createHARMConf(
|
||||
conf.set(YarnConfiguration.RM_HA_IDS, rmIds);
|
||||
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
||||
conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
|
||||
conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
|
||||
conf.set(YarnConfiguration.RM_ZK_ADDRESS,
|
||||
curatorTestingServer.getConnectString());
|
||||
conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
|
||||
conf.set(YarnConfiguration.RM_HA_ID, rmId);
|
||||
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "localhost:0");
|
||||
|
@ -25,6 +25,8 @@
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import javax.crypto.SecretKey;
|
||||
|
||||
import org.apache.curator.test.TestingServer;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.util.Tool;
|
||||
@ -73,10 +75,11 @@ public class TestZKRMStateStorePerf extends RMStateStoreTestBase
|
||||
private ZKRMStateStore store;
|
||||
private AMRMTokenSecretManager appTokenMgr;
|
||||
private ClientToAMTokenSecretManagerInRM clientToAMTokenMgr;
|
||||
private TestingServer curatorTestingServer;
|
||||
|
||||
@Before
|
||||
public void setUpZKServer() throws Exception {
|
||||
super.setUp();
|
||||
curatorTestingServer = new TestingServer();
|
||||
}
|
||||
|
||||
@After
|
||||
@ -87,7 +90,7 @@ public void tearDown() throws Exception {
|
||||
if (appTokenMgr != null) {
|
||||
appTokenMgr.stop();
|
||||
}
|
||||
super.tearDown();
|
||||
curatorTestingServer.stop();
|
||||
}
|
||||
|
||||
private void initStore(String hostPort) {
|
||||
@ -95,7 +98,8 @@ private void initStore(String hostPort) {
|
||||
RMContext rmContext = mock(RMContext.class);
|
||||
|
||||
conf = new YarnConfiguration();
|
||||
conf.set(YarnConfiguration.RM_ZK_ADDRESS, optHostPort.or(this.hostPort));
|
||||
conf.set(YarnConfiguration.RM_ZK_ADDRESS,
|
||||
optHostPort.or(curatorTestingServer.getConnectString()));
|
||||
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
|
||||
|
||||
store = new ZKRMStateStore();
|
||||
@ -140,7 +144,7 @@ public int run(String[] args) {
|
||||
|
||||
if (launchLocalZK) {
|
||||
try {
|
||||
setUp();
|
||||
setUpZKServer();
|
||||
} catch (Exception e) {
|
||||
System.err.println("failed to setup. : " + e.getMessage());
|
||||
return -1;
|
||||
|
@ -20,39 +20,32 @@
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.curator.test.TestingServer;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.ha.ClientBaseWithFixes;
|
||||
import org.apache.hadoop.util.StringUtils;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreTestBase.TestDispatcher;
|
||||
import org.apache.hadoop.util.ZKUtil;
|
||||
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.apache.zookeeper.WatchedEvent;
|
||||
import org.apache.zookeeper.Watcher;
|
||||
import org.apache.zookeeper.ZooDefs;
|
||||
import org.apache.zookeeper.ZooKeeper;
|
||||
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.concurrent.CyclicBarrier;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
public class TestZKRMStateStoreZKClientConnections extends
|
||||
ClientBaseWithFixes {
|
||||
|
||||
private static final int ZK_OP_WAIT_TIME = 3000;
|
||||
private static final int ZK_TIMEOUT_MS = 1000;
|
||||
public class TestZKRMStateStoreZKClientConnections {
|
||||
private Log LOG =
|
||||
LogFactory.getLog(TestZKRMStateStoreZKClientConnections.class);
|
||||
|
||||
private static final int ZK_TIMEOUT_MS = 1000;
|
||||
private static final String DIGEST_USER_PASS="test-user:test-password";
|
||||
private static final String TEST_AUTH_GOOD = "digest:" + DIGEST_USER_PASS;
|
||||
private static final String DIGEST_USER_HASH;
|
||||
@ -66,14 +59,22 @@ public class TestZKRMStateStoreZKClientConnections extends
|
||||
}
|
||||
private static final String TEST_ACL = "digest:" + DIGEST_USER_HASH + ":rwcda";
|
||||
|
||||
private TestingServer testingServer;
|
||||
|
||||
@Before
|
||||
public void setupZKServer() throws Exception {
|
||||
testingServer = new TestingServer();
|
||||
testingServer.start();
|
||||
}
|
||||
|
||||
@After
|
||||
public void cleanupZKServer() throws Exception {
|
||||
testingServer.stop();
|
||||
}
|
||||
|
||||
class TestZKClient {
|
||||
|
||||
ZKRMStateStore store;
|
||||
boolean forExpire = false;
|
||||
TestForwardingWatcher oldWatcher;
|
||||
TestForwardingWatcher watcher;
|
||||
CyclicBarrier syncBarrier = new CyclicBarrier(2);
|
||||
|
||||
protected class TestZKRMStateStore extends ZKRMStateStore {
|
||||
|
||||
@ -83,51 +84,12 @@ public TestZKRMStateStore(Configuration conf, String workingZnode)
|
||||
start();
|
||||
assertTrue(znodeWorkingPath.equals(workingZnode));
|
||||
}
|
||||
|
||||
@Override
|
||||
public ZooKeeper getNewZooKeeper()
|
||||
throws IOException, InterruptedException {
|
||||
oldWatcher = watcher;
|
||||
watcher = new TestForwardingWatcher();
|
||||
return createClient(watcher, hostPort, ZK_TIMEOUT_MS);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void processWatchEvent(ZooKeeper zk,
|
||||
WatchedEvent event) throws Exception {
|
||||
|
||||
if (forExpire) {
|
||||
// a hack... couldn't find a way to trigger expired event.
|
||||
WatchedEvent expriredEvent = new WatchedEvent(
|
||||
Watcher.Event.EventType.None,
|
||||
Watcher.Event.KeeperState.Expired, null);
|
||||
super.processWatchEvent(zk, expriredEvent);
|
||||
forExpire = false;
|
||||
syncBarrier.await();
|
||||
} else {
|
||||
super.processWatchEvent(zk, event);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private class TestForwardingWatcher extends
|
||||
ClientBaseWithFixes.CountdownWatcher {
|
||||
public void process(WatchedEvent event) {
|
||||
super.process(event);
|
||||
try {
|
||||
if (store != null) {
|
||||
store.processWatchEvent(client, event);
|
||||
}
|
||||
} catch (Throwable t) {
|
||||
LOG.error("Failed to process watcher event " + event + ": "
|
||||
+ StringUtils.stringifyException(t));
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public RMStateStore getRMStateStore(Configuration conf) throws Exception {
|
||||
String workingZnode = "/Test";
|
||||
conf.set(YarnConfiguration.RM_ZK_ADDRESS, hostPort);
|
||||
conf.set(YarnConfiguration.RM_ZK_ADDRESS,
|
||||
testingServer.getConnectString());
|
||||
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
|
||||
this.store = new TestZKRMStateStore(conf, workingZnode);
|
||||
return this.store;
|
||||
@ -147,12 +109,12 @@ public void testZKClientRetry() throws Exception {
|
||||
store.setRMDispatcher(dispatcher);
|
||||
final AtomicBoolean assertionFailedInThread = new AtomicBoolean(false);
|
||||
|
||||
stopServer();
|
||||
testingServer.stop();
|
||||
Thread clientThread = new Thread() {
|
||||
@Override
|
||||
public void run() {
|
||||
try {
|
||||
store.getDataWithRetries(path, true);
|
||||
store.getData(path);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
assertionFailedInThread.set(true);
|
||||
@ -160,114 +122,19 @@ public void run() {
|
||||
}
|
||||
};
|
||||
Thread.sleep(2000);
|
||||
startServer();
|
||||
testingServer.start();
|
||||
clientThread.join();
|
||||
Assert.assertFalse(assertionFailedInThread.get());
|
||||
}
|
||||
|
||||
@Test(timeout = 20000)
|
||||
public void testZKClientDisconnectAndReconnect()
|
||||
throws Exception {
|
||||
|
||||
TestZKClient zkClientTester = new TestZKClient();
|
||||
String path = "/test";
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
|
||||
ZKRMStateStore store =
|
||||
(ZKRMStateStore) zkClientTester.getRMStateStore(conf);
|
||||
TestDispatcher dispatcher = new TestDispatcher();
|
||||
store.setRMDispatcher(dispatcher);
|
||||
|
||||
// trigger watch
|
||||
store.createWithRetries(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
|
||||
CreateMode.PERSISTENT);
|
||||
store.getDataWithRetries(path, true);
|
||||
store.setDataWithRetries(path, "newBytes".getBytes(), 0);
|
||||
|
||||
stopServer();
|
||||
zkClientTester.watcher.waitForDisconnected(ZK_OP_WAIT_TIME);
|
||||
try {
|
||||
store.getDataWithRetries(path, true);
|
||||
fail("Expected ZKClient time out exception");
|
||||
} catch (Exception e) {
|
||||
assertTrue(e.getMessage().contains(
|
||||
"Wait for ZKClient creation timed out"));
|
||||
}
|
||||
|
||||
// ZKRMStateStore Session restored
|
||||
startServer();
|
||||
zkClientTester.watcher.waitForConnected(ZK_OP_WAIT_TIME);
|
||||
byte[] ret = null;
|
||||
try {
|
||||
ret = store.getDataWithRetries(path, true);
|
||||
} catch (Exception e) {
|
||||
String error = "ZKRMStateStore Session restore failed";
|
||||
LOG.error(error, e);
|
||||
fail(error);
|
||||
}
|
||||
assertEquals("newBytes", new String(ret));
|
||||
}
|
||||
|
||||
@Test(timeout = 20000)
|
||||
public void testZKSessionTimeout() throws Exception {
|
||||
|
||||
TestZKClient zkClientTester = new TestZKClient();
|
||||
String path = "/test";
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
|
||||
ZKRMStateStore store =
|
||||
(ZKRMStateStore) zkClientTester.getRMStateStore(conf);
|
||||
TestDispatcher dispatcher = new TestDispatcher();
|
||||
store.setRMDispatcher(dispatcher);
|
||||
|
||||
// a hack to trigger expired event
|
||||
zkClientTester.forExpire = true;
|
||||
|
||||
// trigger watch
|
||||
store.createWithRetries(path, null, ZooDefs.Ids.OPEN_ACL_UNSAFE,
|
||||
CreateMode.PERSISTENT);
|
||||
store.getDataWithRetries(path, true);
|
||||
store.setDataWithRetries(path, "bytes".getBytes(), 0);
|
||||
|
||||
zkClientTester.syncBarrier.await();
|
||||
// after this point, expired event has already been processed.
|
||||
|
||||
try {
|
||||
byte[] ret = store.getDataWithRetries(path, false);
|
||||
assertEquals("bytes", new String(ret));
|
||||
} catch (Exception e) {
|
||||
String error = "New session creation failed";
|
||||
LOG.error(error, e);
|
||||
fail(error);
|
||||
}
|
||||
|
||||
// send Disconnected event from old client session to ZKRMStateStore
|
||||
// check the current client session is not affected.
|
||||
Assert.assertTrue(zkClientTester.oldWatcher != null);
|
||||
WatchedEvent disconnectedEvent = new WatchedEvent(
|
||||
Watcher.Event.EventType.None,
|
||||
Watcher.Event.KeeperState.Disconnected, null);
|
||||
zkClientTester.oldWatcher.process(disconnectedEvent);
|
||||
Assert.assertTrue(store.zkClient != null);
|
||||
|
||||
zkClientTester.watcher.process(disconnectedEvent);
|
||||
Assert.assertTrue(store.zkClient == null);
|
||||
WatchedEvent connectedEvent = new WatchedEvent(
|
||||
Watcher.Event.EventType.None,
|
||||
Watcher.Event.KeeperState.SyncConnected, null);
|
||||
zkClientTester.watcher.process(connectedEvent);
|
||||
Assert.assertTrue(store.zkClient != null);
|
||||
Assert.assertTrue(store.zkClient == store.activeZkClient);
|
||||
}
|
||||
|
||||
@Test(timeout = 20000)
|
||||
public void testSetZKAcl() {
|
||||
TestZKClient zkClientTester = new TestZKClient();
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.set(YarnConfiguration.RM_ZK_ACL, "world:anyone:rwca");
|
||||
try {
|
||||
zkClientTester.store.zkClient.delete(zkClientTester.store
|
||||
.znodeWorkingPath, -1);
|
||||
zkClientTester.store.delete(zkClientTester.store
|
||||
.znodeWorkingPath);
|
||||
fail("Shouldn't be able to delete path");
|
||||
} catch (Exception e) {/* expected behavior */
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user