YARN-8405. RM zk-state-store.parent-path ACLs has been changed since HADOOP-14773. Contributed by Íñigo Goiri.
This commit is contained in:
parent
2b2f672022
commit
2df73dace0
@ -290,6 +290,18 @@ public boolean create(final String path, List<ACL> zkAcl) throws Exception {
|
||||
* @throws Exception If it cannot create the file.
|
||||
*/
|
||||
public void createRootDirRecursively(String path) throws Exception {
|
||||
createRootDirRecursively(path, null);
|
||||
}
|
||||
|
||||
/**
|
||||
* Utility function to ensure that the configured base znode exists.
|
||||
* This recursively creates the znode as well as all of its parents.
|
||||
* @param path Path of the znode to create.
|
||||
* @param zkAcl ACLs for ZooKeeper.
|
||||
* @throws Exception If it cannot create the file.
|
||||
*/
|
||||
public void createRootDirRecursively(String path, List<ACL> zkAcl)
|
||||
throws Exception {
|
||||
String[] pathParts = path.split("/");
|
||||
Preconditions.checkArgument(
|
||||
pathParts.length >= 1 && pathParts[0].isEmpty(),
|
||||
@ -298,7 +310,7 @@ public void createRootDirRecursively(String path) throws Exception {
|
||||
|
||||
for (int i = 1; i < pathParts.length; i++) {
|
||||
sb.append("/").append(pathParts[i]);
|
||||
create(sb.toString());
|
||||
create(sb.toString(), zkAcl);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -35,6 +35,7 @@
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.Query;
|
||||
import org.apache.hadoop.hdfs.server.federation.store.records.QueryResult;
|
||||
import org.apache.hadoop.util.curator.ZKCuratorManager;
|
||||
import org.apache.zookeeper.data.ACL;
|
||||
import org.apache.zookeeper.data.Stat;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
@ -70,6 +71,8 @@ public class StateStoreZooKeeperImpl extends StateStoreSerializableImpl {
|
||||
|
||||
/** Interface to ZooKeeper. */
|
||||
private ZKCuratorManager zkManager;
|
||||
/** ACLs for ZooKeeper. */
|
||||
private List<ACL> zkAcl;
|
||||
|
||||
|
||||
@Override
|
||||
@ -83,6 +86,7 @@ public boolean initDriver() {
|
||||
try {
|
||||
this.zkManager = new ZKCuratorManager(conf);
|
||||
this.zkManager.start();
|
||||
this.zkAcl = ZKCuratorManager.getZKAcls(conf);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Cannot initialize the ZK connection", e);
|
||||
return false;
|
||||
@ -95,7 +99,7 @@ public <T extends BaseRecord> boolean initRecordStorage(
|
||||
String className, Class<T> clazz) {
|
||||
try {
|
||||
String checkPath = getNodePath(baseZNode, className);
|
||||
zkManager.createRootDirRecursively(checkPath);
|
||||
zkManager.createRootDirRecursively(checkPath, zkAcl);
|
||||
return true;
|
||||
} catch (Exception e) {
|
||||
LOG.error("Cannot initialize ZK node for {}: {}",
|
||||
|
@ -73,6 +73,7 @@
|
||||
import org.apache.hadoop.yarn.server.federation.store.utils.FederationPolicyStoreInputValidator;
|
||||
import org.apache.hadoop.yarn.server.federation.store.utils.FederationStateStoreUtils;
|
||||
import org.apache.hadoop.yarn.server.records.Version;
|
||||
import org.apache.zookeeper.data.ACL;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -133,9 +134,10 @@ public void init(Configuration conf) throws YarnException {
|
||||
|
||||
// Create base znode for each entity
|
||||
try {
|
||||
zkManager.createRootDirRecursively(membershipZNode);
|
||||
zkManager.createRootDirRecursively(appsZNode);
|
||||
zkManager.createRootDirRecursively(policiesZNode);
|
||||
List<ACL> zkAcl = ZKCuratorManager.getZKAcls(conf);
|
||||
zkManager.createRootDirRecursively(membershipZNode, zkAcl);
|
||||
zkManager.createRootDirRecursively(appsZNode, zkAcl);
|
||||
zkManager.createRootDirRecursively(policiesZNode, zkAcl);
|
||||
} catch (Exception e) {
|
||||
String errMsg = "Cannot create base directories: " + e.getMessage();
|
||||
FederationStateStoreUtils.logAndThrowStoreException(LOG, errMsg);
|
||||
|
@ -379,7 +379,7 @@ public synchronized void initInternal(Configuration conf)
|
||||
@Override
|
||||
public synchronized void startInternal() throws Exception {
|
||||
// ensure root dirs exist
|
||||
zkManager.createRootDirRecursively(znodeWorkingPath);
|
||||
zkManager.createRootDirRecursively(znodeWorkingPath, zkAcl);
|
||||
create(zkRootNodePath);
|
||||
setRootNodeAcls();
|
||||
delete(fencingNodePath);
|
||||
|
@ -90,7 +90,7 @@ public void initialize(Configuration config, Configuration schedConf,
|
||||
this.confStorePath = getNodePath(znodeParentPath, CONF_STORE_PATH);
|
||||
this.fencingNodePath = getNodePath(znodeParentPath, FENCING_PATH);
|
||||
|
||||
zkManager.createRootDirRecursively(znodeParentPath);
|
||||
zkManager.createRootDirRecursively(znodeParentPath, zkAcl);
|
||||
zkManager.delete(fencingNodePath);
|
||||
|
||||
if (!zkManager.exists(logsPath)) {
|
||||
|
@ -25,6 +25,7 @@
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol;
|
||||
import org.apache.hadoop.ha.HAServiceProtocol.StateChangeRequestInfo;
|
||||
import org.apache.hadoop.io.Text;
|
||||
@ -206,7 +207,7 @@ private String getDelegationTokenNode(int rmDTSequenceNumber, int splitIdx) {
|
||||
|
||||
private RMStateStore createStore(Configuration conf) throws Exception {
|
||||
workingZnode = "/jira/issue/3077/rmstore";
|
||||
conf.set(YarnConfiguration.RM_ZK_ADDRESS,
|
||||
conf.set(CommonConfigurationKeys.ZK_ADDRESS,
|
||||
curatorTestingServer.getConnectString());
|
||||
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
|
||||
conf.setLong(YarnConfiguration.RM_EPOCH, epoch);
|
||||
@ -339,7 +340,7 @@ public Version getCurrentVersion() throws Exception {
|
||||
public RMStateStore getRMStateStore() throws Exception {
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
workingZnode = "/jira/issue/3077/rmstore";
|
||||
conf.set(YarnConfiguration.RM_ZK_ADDRESS,
|
||||
conf.set(CommonConfigurationKeys.ZK_ADDRESS,
|
||||
curatorTestingServer.getConnectString());
|
||||
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
|
||||
this.store = new TestZKRMStateStoreInternal(conf, workingZnode) {
|
||||
@ -380,9 +381,9 @@ public static Configuration createHARMConf(String rmIds, String rmId,
|
||||
conf.set(YarnConfiguration.RM_HA_IDS, rmIds);
|
||||
conf.setBoolean(YarnConfiguration.RECOVERY_ENABLED, true);
|
||||
conf.set(YarnConfiguration.RM_STORE, ZKRMStateStore.class.getName());
|
||||
conf.set(YarnConfiguration.RM_ZK_ADDRESS,
|
||||
conf.set(CommonConfigurationKeys.ZK_ADDRESS,
|
||||
curatorTestServer.getConnectString());
|
||||
conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
|
||||
conf.setInt(CommonConfigurationKeys.ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
|
||||
conf.set(YarnConfiguration.RM_HA_ID, rmId);
|
||||
conf.set(YarnConfiguration.RM_WEBAPP_ADDRESS, "localhost:0");
|
||||
conf.setBoolean(
|
||||
@ -419,31 +420,37 @@ private static boolean verifyZKACL(String id, String scheme, int perm,
|
||||
public void testZKRootPathAcls() throws Exception {
|
||||
StateChangeRequestInfo req = new StateChangeRequestInfo(
|
||||
HAServiceProtocol.RequestSource.REQUEST_BY_USER);
|
||||
String rootPath =
|
||||
YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH + "/" +
|
||||
ZKRMStateStore.ROOT_ZNODE_NAME;
|
||||
String parentPath = YarnConfiguration.DEFAULT_ZK_RM_STATE_STORE_PARENT_PATH;
|
||||
String rootPath = parentPath + "/" + ZKRMStateStore.ROOT_ZNODE_NAME;
|
||||
|
||||
// Start RM with HA enabled
|
||||
Configuration conf =
|
||||
createHARMConf("rm1,rm2", "rm1", 1234, false, curatorTestingServer);
|
||||
conf.set(YarnConfiguration.RM_ZK_ACL, "world:anyone:rwca");
|
||||
int perm = 23;// rwca=1+2+4+16
|
||||
ResourceManager rm = new MockRM(conf);
|
||||
rm.start();
|
||||
rm.getRMContext().getRMAdminService().transitionToActive(req);
|
||||
List<ACL> acls =
|
||||
((ZKRMStateStore)rm.getRMContext().getStateStore()).getACL(rootPath);
|
||||
ZKRMStateStore stateStore = (ZKRMStateStore) rm.getRMContext().getStateStore();
|
||||
List<ACL> acls = stateStore.getACL(rootPath);
|
||||
assertEquals(acls.size(), 2);
|
||||
// CREATE and DELETE permissions for root node based on RM ID
|
||||
verifyZKACL("digest", "localhost", Perms.CREATE | Perms.DELETE, acls);
|
||||
verifyZKACL(
|
||||
"world", "anyone", Perms.ALL ^ (Perms.CREATE | Perms.DELETE), acls);
|
||||
|
||||
acls = stateStore.getACL(parentPath);
|
||||
assertEquals(1, acls.size());
|
||||
assertEquals(perm, acls.get(0).getPerms());
|
||||
rm.close();
|
||||
|
||||
// Now start RM with HA disabled. NoAuth Exception should not be thrown.
|
||||
conf.setBoolean(YarnConfiguration.RM_HA_ENABLED, false);
|
||||
conf.set(YarnConfiguration.RM_ZK_ACL, YarnConfiguration.DEFAULT_RM_ZK_ACL);
|
||||
rm = new MockRM(conf);
|
||||
rm.start();
|
||||
rm.getRMContext().getRMAdminService().transitionToActive(req);
|
||||
acls = ((ZKRMStateStore)rm.getRMContext().getStateStore()).getACL(rootPath);
|
||||
acls = stateStore.getACL(rootPath);
|
||||
assertEquals(acls.size(), 1);
|
||||
verifyZKACL("world", "anyone", Perms.ALL, acls);
|
||||
rm.close();
|
||||
@ -453,7 +460,7 @@ public void testZKRootPathAcls() throws Exception {
|
||||
rm = new MockRM(conf);
|
||||
rm.start();
|
||||
rm.getRMContext().getRMAdminService().transitionToActive(req);
|
||||
acls = ((ZKRMStateStore)rm.getRMContext().getStateStore()).getACL(rootPath);
|
||||
acls = stateStore.getACL(rootPath);
|
||||
assertEquals(acls.size(), 2);
|
||||
verifyZKACL("digest", "localhost", Perms.CREATE | Perms.DELETE, acls);
|
||||
verifyZKACL(
|
||||
|
@ -22,13 +22,12 @@
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.curator.test.TestingServer;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStoreTestBase.TestDispatcher;
|
||||
import org.apache.hadoop.util.ZKUtil;
|
||||
|
||||
import org.apache.zookeeper.CreateMode;
|
||||
import org.apache.zookeeper.ZooDefs;
|
||||
import org.apache.zookeeper.server.auth.DigestAuthenticationProvider;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
@ -38,7 +37,6 @@
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertTrue;
|
||||
import static org.junit.Assert.fail;
|
||||
|
||||
@ -90,7 +88,7 @@ public TestZKRMStateStore(Configuration conf, String workingZnode)
|
||||
|
||||
public RMStateStore getRMStateStore(Configuration conf) throws Exception {
|
||||
String workingZnode = "/Test";
|
||||
conf.set(YarnConfiguration.RM_ZK_ADDRESS,
|
||||
conf.set(CommonConfigurationKeys.ZK_ADDRESS,
|
||||
testingServer.getConnectString());
|
||||
conf.set(YarnConfiguration.ZK_RM_STATE_STORE_PARENT_PATH, workingZnode);
|
||||
this.store = new TestZKRMStateStore(conf, workingZnode);
|
||||
@ -103,8 +101,8 @@ public void testZKClientRetry() throws Exception {
|
||||
TestZKClient zkClientTester = new TestZKClient();
|
||||
final String path = "/test";
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
|
||||
conf.setLong(YarnConfiguration.RM_ZK_RETRY_INTERVAL_MS, 100);
|
||||
conf.setInt(CommonConfigurationKeys.ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
|
||||
conf.setLong(CommonConfigurationKeys.ZK_RETRY_INTERVAL_MS, 100);
|
||||
final ZKRMStateStore store =
|
||||
(ZKRMStateStore) zkClientTester.getRMStateStore(conf);
|
||||
TestDispatcher dispatcher = new TestDispatcher();
|
||||
@ -133,7 +131,7 @@ public void run() {
|
||||
public void testSetZKAcl() {
|
||||
TestZKClient zkClientTester = new TestZKClient();
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.set(YarnConfiguration.RM_ZK_ACL, "world:anyone:rwca");
|
||||
conf.set(CommonConfigurationKeys.ZK_ACL, "world:anyone:rwca");
|
||||
try {
|
||||
zkClientTester.store.delete(zkClientTester.store
|
||||
.znodeWorkingPath);
|
||||
@ -146,7 +144,7 @@ public void testSetZKAcl() {
|
||||
public void testInvalidZKAclConfiguration() {
|
||||
TestZKClient zkClientTester = new TestZKClient();
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.set(YarnConfiguration.RM_ZK_ACL, "randomstring&*");
|
||||
conf.set(CommonConfigurationKeys.ZK_ACL, "randomstring&*");
|
||||
try {
|
||||
zkClientTester.getRMStateStore(conf);
|
||||
fail("ZKRMStateStore created with bad ACL");
|
||||
@ -163,10 +161,10 @@ public void testInvalidZKAclConfiguration() {
|
||||
public void testZKAuths() throws Exception {
|
||||
TestZKClient zkClientTester = new TestZKClient();
|
||||
YarnConfiguration conf = new YarnConfiguration();
|
||||
conf.setInt(YarnConfiguration.RM_ZK_NUM_RETRIES, 1);
|
||||
conf.setInt(YarnConfiguration.RM_ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
|
||||
conf.set(YarnConfiguration.RM_ZK_ACL, TEST_ACL);
|
||||
conf.set(YarnConfiguration.RM_ZK_AUTH, TEST_AUTH_GOOD);
|
||||
conf.setInt(CommonConfigurationKeys.ZK_NUM_RETRIES, 1);
|
||||
conf.setInt(CommonConfigurationKeys.ZK_TIMEOUT_MS, ZK_TIMEOUT_MS);
|
||||
conf.set(CommonConfigurationKeys.ZK_ACL, TEST_ACL);
|
||||
conf.set(CommonConfigurationKeys.ZK_AUTH, TEST_AUTH_GOOD);
|
||||
|
||||
zkClientTester.getRMStateStore(conf);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user