diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 92b9697b3b..b603135cb8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -680,6 +680,9 @@ public static boolean isAclEnabled(Configuration conf) { public static final String DEFAULT_CONFIGURATION_STORE = MEMORY_CONFIGURATION_STORE; + public static final String RM_SCHEDULER_MUTATION_ACL_POLICY_CLASS = + YARN_PREFIX + "scheduler.configuration.mutation.acl-policy.class"; + public static final String YARN_AUTHORIZATION_PROVIDER = YARN_PREFIX + "authorization-provider"; private static final List RM_SERVICES_ADDRESS_CONF_KEYS_HTTP = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 73f4717b0e..e124c5b14a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -3379,4 +3379,15 @@ memory + + + The class to use for configuration mutation ACL policy if using a mutable + configuration provider. Controls whether a mutation request is allowed. + The DefaultConfigurationMutationACLPolicy checks if the requestor is a + YARN admin. + + yarn.scheduler.configuration.mutation.acl-policy.class + org.apache.hadoop.yarn.server.resourcemanager.scheduler.DefaultConfigurationMutationACLPolicy + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ConfigurationMutationACLPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ConfigurationMutationACLPolicy.java new file mode 100644 index 0000000000..724487b615 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ConfigurationMutationACLPolicy.java @@ -0,0 +1,47 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo; + +/** + * Interface for determining whether configuration mutations are allowed. + */ +public interface ConfigurationMutationACLPolicy { + + /** + * Initialize ACL policy with configuration and RMContext. + * @param conf Configuration to initialize with. + * @param rmContext rmContext + */ + void init(Configuration conf, RMContext rmContext); + + /** + * Check if mutation is allowed. + * @param user User issuing the request + * @param confUpdate configurations to be updated + * @return whether provided mutation is allowed or not + */ + boolean isMutationAllowed(UserGroupInformation user, QueueConfigsUpdateInfo + confUpdate); + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ConfigurationMutationACLPolicyFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ConfigurationMutationACLPolicyFactory.java new file mode 100644 index 0000000000..28987857c0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ConfigurationMutationACLPolicyFactory.java @@ -0,0 +1,49 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.util.ReflectionUtils; +import org.apache.hadoop.yarn.conf.YarnConfiguration; + +/** + * Factory class for creating instances of + * {@link ConfigurationMutationACLPolicy}. + */ +public final class ConfigurationMutationACLPolicyFactory { + + private static final Log LOG = LogFactory.getLog( + ConfigurationMutationACLPolicyFactory.class); + + private ConfigurationMutationACLPolicyFactory() { + // Unused. + } + + public static ConfigurationMutationACLPolicy getPolicy(Configuration conf) { + Class policyClass = + conf.getClass(YarnConfiguration.RM_SCHEDULER_MUTATION_ACL_POLICY_CLASS, + DefaultConfigurationMutationACLPolicy.class, + ConfigurationMutationACLPolicy.class); + LOG.info("Using ConfigurationMutationACLPolicy implementation - " + + policyClass); + return ReflectionUtils.newInstance(policyClass, conf); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/DefaultConfigurationMutationACLPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/DefaultConfigurationMutationACLPolicy.java new file mode 100644 index 0000000000..680c3b85f7 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/DefaultConfigurationMutationACLPolicy.java @@ -0,0 +1,45 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.security.YarnAuthorizationProvider; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo; + +/** + * Default configuration mutation ACL policy. Checks if user is YARN admin. + */ +public class DefaultConfigurationMutationACLPolicy implements + ConfigurationMutationACLPolicy { + + private YarnAuthorizationProvider authorizer; + + @Override + public void init(Configuration conf, RMContext rmContext) { + authorizer = YarnAuthorizationProvider.getInstance(conf); + } + + @Override + public boolean isMutationAllowed(UserGroupInformation user, + QueueConfigsUpdateInfo confUpdate) { + return authorizer.isAdmin(user); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java index 35e36e1c23..93a935e33f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfScheduler.java @@ -17,10 +17,11 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo; import java.io.IOException; -import java.util.Map; /** * Interface for a scheduler that supports changing configuration at runtime. @@ -31,10 +32,22 @@ public interface MutableConfScheduler extends ResourceScheduler { /** * Update the scheduler's configuration. * @param user Caller of this update - * @param confUpdate key-value map of the configuration update + * @param confUpdate configuration update * @throws IOException if update is invalid */ void updateConfiguration(UserGroupInformation user, - Map confUpdate) throws IOException; + QueueConfigsUpdateInfo confUpdate) throws IOException; + /** + * Get the scheduler configuration. + * @return the scheduler configuration + */ + Configuration getConfiguration(); + + /** + * Get queue object based on queue name. + * @param queueName the queue name + * @return the queue object + */ + Queue getQueue(String queueName); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java index 889c3bc1f0..f04c128f6b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/MutableConfigurationProvider.java @@ -18,8 +18,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo; + import java.io.IOException; -import java.util.Map; /** * Interface for allowing changing scheduler configurations. @@ -32,7 +34,7 @@ public interface MutableConfigurationProvider { * @param confUpdate Key-value pairs for configurations to be updated. * @throws IOException if scheduler could not be reinitialized */ - void mutateConfiguration(String user, Map confUpdate) - throws IOException; + void mutateConfiguration(UserGroupInformation user, QueueConfigsUpdateInfo + confUpdate) throws IOException; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 7c5839bf31..4f89f7e9d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -137,6 +137,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimplePlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager; import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -649,6 +650,7 @@ private void reinitializeQueues(CapacitySchedulerConfiguration newConf) preemptionManager.refreshQueues(null, this.getRootQueue()); } + @Override public CSQueue getQueue(String queueName) { if (queueName == null) { return null; @@ -2615,10 +2617,10 @@ public long getMaximumApplicationLifetime(String queueName) { @Override public void updateConfiguration(UserGroupInformation user, - Map confUpdate) throws IOException { + QueueConfigsUpdateInfo confUpdate) throws IOException { if (csConfProvider instanceof MutableConfigurationProvider) { ((MutableConfigurationProvider) csConfProvider).mutateConfiguration( - user.getShortUserName(), confUpdate); + user, confUpdate); } else { throw new UnsupportedOperationException("Configured CS configuration " + "provider does not support updating configuration."); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java index ea1b3c070f..8b879b072b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java @@ -18,14 +18,27 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; +import com.google.common.base.Joiner; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.AccessControlException; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ConfigurationMutationACLPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ConfigurationMutationACLPolicyFactory; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo; import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; import java.util.Map; /** @@ -38,6 +51,7 @@ public class MutableCSConfigurationProvider implements CSConfigurationProvider, private Configuration schedConf; private YarnConfigurationStore confStore; + private ConfigurationMutationACLPolicy aclMutationPolicy; private RMContext rmContext; private Configuration conf; @@ -68,6 +82,9 @@ public void init(Configuration config) throws IOException { schedConf.set(kv.getKey(), kv.getValue()); } confStore.initialize(config, schedConf); + this.aclMutationPolicy = ConfigurationMutationACLPolicyFactory + .getPolicy(config); + aclMutationPolicy.init(config, rmContext); this.conf = config; } @@ -80,12 +97,17 @@ public CapacitySchedulerConfiguration loadConfiguration(Configuration } @Override - public void mutateConfiguration(String user, - Map confUpdate) throws IOException { + public void mutateConfiguration(UserGroupInformation user, + QueueConfigsUpdateInfo confUpdate) throws IOException { + if (!aclMutationPolicy.isMutationAllowed(user, confUpdate)) { + throw new AccessControlException("User is not admin of all modified" + + " queues."); + } Configuration oldConf = new Configuration(schedConf); - LogMutation log = new LogMutation(confUpdate, user); + Map kvUpdate = constructKeyValueConfUpdate(confUpdate); + LogMutation log = new LogMutation(kvUpdate, user.getShortUserName()); long id = confStore.logMutation(log); - for (Map.Entry kv : confUpdate.entrySet()) { + for (Map.Entry kv : kvUpdate.entrySet()) { if (kv.getValue() == null) { schedConf.unset(kv.getKey()); } else { @@ -101,4 +123,125 @@ public void mutateConfiguration(String user, } confStore.confirmMutation(id, true); } + + + private Map constructKeyValueConfUpdate( + QueueConfigsUpdateInfo mutationInfo) throws IOException { + CapacityScheduler cs = (CapacityScheduler) rmContext.getScheduler(); + CapacitySchedulerConfiguration proposedConf = + new CapacitySchedulerConfiguration(cs.getConfiguration(), false); + Map confUpdate = new HashMap<>(); + for (String queueToRemove : mutationInfo.getRemoveQueueInfo()) { + removeQueue(queueToRemove, proposedConf, confUpdate); + } + for (QueueConfigInfo addQueueInfo : mutationInfo.getAddQueueInfo()) { + addQueue(addQueueInfo, proposedConf, confUpdate); + } + for (QueueConfigInfo updateQueueInfo : mutationInfo.getUpdateQueueInfo()) { + updateQueue(updateQueueInfo, proposedConf, confUpdate); + } + return confUpdate; + } + + private void removeQueue( + String queueToRemove, CapacitySchedulerConfiguration proposedConf, + Map confUpdate) throws IOException { + if (queueToRemove == null) { + return; + } else { + CapacityScheduler cs = (CapacityScheduler) rmContext.getScheduler(); + String queueName = queueToRemove.substring( + queueToRemove.lastIndexOf('.') + 1); + CSQueue queue = cs.getQueue(queueName); + if (queue == null || + !queue.getQueuePath().equals(queueToRemove)) { + throw new IOException("Queue " + queueToRemove + " not found"); + } else if (queueToRemove.lastIndexOf('.') == -1) { + throw new IOException("Can't remove queue " + queueToRemove); + } + String parentQueuePath = queueToRemove.substring(0, queueToRemove + .lastIndexOf('.')); + String[] siblingQueues = proposedConf.getQueues(parentQueuePath); + List newSiblingQueues = new ArrayList<>(); + for (String siblingQueue : siblingQueues) { + if (!siblingQueue.equals(queueName)) { + newSiblingQueues.add(siblingQueue); + } + } + proposedConf.setQueues(parentQueuePath, newSiblingQueues + .toArray(new String[0])); + String queuesConfig = CapacitySchedulerConfiguration.PREFIX + + parentQueuePath + CapacitySchedulerConfiguration.DOT + + CapacitySchedulerConfiguration.QUEUES; + if (newSiblingQueues.size() == 0) { + confUpdate.put(queuesConfig, null); + } else { + confUpdate.put(queuesConfig, Joiner.on(',').join(newSiblingQueues)); + } + for (Map.Entry confRemove : proposedConf.getValByRegex( + ".*" + queueToRemove.replaceAll("\\.", "\\.") + "\\..*") + .entrySet()) { + proposedConf.unset(confRemove.getKey()); + confUpdate.put(confRemove.getKey(), null); + } + } + } + + private void addQueue( + QueueConfigInfo addInfo, CapacitySchedulerConfiguration proposedConf, + Map confUpdate) throws IOException { + if (addInfo == null) { + return; + } else { + CapacityScheduler cs = (CapacityScheduler) rmContext.getScheduler(); + String queuePath = addInfo.getQueue(); + String queueName = queuePath.substring(queuePath.lastIndexOf('.') + 1); + if (cs.getQueue(queueName) != null) { + throw new IOException("Can't add existing queue " + queuePath); + } else if (queuePath.lastIndexOf('.') == -1) { + throw new IOException("Can't add invalid queue " + queuePath); + } + String parentQueue = queuePath.substring(0, queuePath.lastIndexOf('.')); + String[] siblings = proposedConf.getQueues(parentQueue); + List siblingQueues = siblings == null ? new ArrayList<>() : + new ArrayList<>(Arrays.asList(siblings)); + siblingQueues.add(queuePath.substring(queuePath.lastIndexOf('.') + 1)); + proposedConf.setQueues(parentQueue, + siblingQueues.toArray(new String[0])); + confUpdate.put(CapacitySchedulerConfiguration.PREFIX + + parentQueue + CapacitySchedulerConfiguration.DOT + + CapacitySchedulerConfiguration.QUEUES, + Joiner.on(',').join(siblingQueues)); + String keyPrefix = CapacitySchedulerConfiguration.PREFIX + + queuePath + CapacitySchedulerConfiguration.DOT; + for (Map.Entry kv : addInfo.getParams().entrySet()) { + if (kv.getValue() == null) { + proposedConf.unset(keyPrefix + kv.getKey()); + } else { + proposedConf.set(keyPrefix + kv.getKey(), kv.getValue()); + } + confUpdate.put(keyPrefix + kv.getKey(), kv.getValue()); + } + } + } + + private void updateQueue(QueueConfigInfo updateInfo, + CapacitySchedulerConfiguration proposedConf, + Map confUpdate) { + if (updateInfo == null) { + return; + } else { + String queuePath = updateInfo.getQueue(); + String keyPrefix = CapacitySchedulerConfiguration.PREFIX + + queuePath + CapacitySchedulerConfiguration.DOT; + for (Map.Entry kv : updateInfo.getParams().entrySet()) { + if (kv.getValue() == null) { + proposedConf.unset(keyPrefix + kv.getKey()); + } else { + proposedConf.set(keyPrefix + kv.getKey(), kv.getValue()); + } + confUpdate.put(keyPrefix + kv.getKey(), kv.getValue()); + } + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/QueueAdminConfigurationMutationACLPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/QueueAdminConfigurationMutationACLPolicy.java new file mode 100644 index 0000000000..1f94c1c5d0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/QueueAdminConfigurationMutationACLPolicy.java @@ -0,0 +1,96 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.QueueACL; +import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ConfigurationMutationACLPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo; + +import java.io.IOException; +import java.util.HashSet; +import java.util.Set; + +/** + * A configuration mutation ACL policy which checks that user has admin + * privileges on all queues they are changing. + */ +public class QueueAdminConfigurationMutationACLPolicy implements + ConfigurationMutationACLPolicy { + + private RMContext rmContext; + + @Override + public void init(Configuration conf, RMContext context) { + this.rmContext = context; + } + + @Override + public boolean isMutationAllowed(UserGroupInformation user, + QueueConfigsUpdateInfo confUpdate) { + Set queues = new HashSet<>(); + for (QueueConfigInfo addQueueInfo : confUpdate.getAddQueueInfo()) { + queues.add(addQueueInfo.getQueue()); + } + for (String removeQueue : confUpdate.getRemoveQueueInfo()) { + queues.add(removeQueue); + } + for (QueueConfigInfo updateQueueInfo : confUpdate.getUpdateQueueInfo()) { + queues.add(updateQueueInfo.getQueue()); + } + for (String queuePath : queues) { + String queueName = queuePath.lastIndexOf('.') != -1 ? + queuePath.substring(queuePath.lastIndexOf('.') + 1) : queuePath; + QueueInfo queueInfo = null; + try { + queueInfo = rmContext.getScheduler() + .getQueueInfo(queueName, false, false); + } catch (IOException e) { + // Queue is not found, do nothing. + } + String parentPath = queuePath; + // TODO: handle global config change. + while (queueInfo == null) { + // We are adding a queue (whose parent we are possibly also adding). + // Check ACL of lowest parent queue which already exists. + parentPath = parentPath.substring(0, parentPath.lastIndexOf('.')); + String parentName = parentPath.lastIndexOf('.') != -1 ? + parentPath.substring(parentPath.lastIndexOf('.') + 1) : parentPath; + try { + queueInfo = rmContext.getScheduler() + .getQueueInfo(parentName, false, false); + } catch (IOException e) { + // Queue is not found, do nothing. + } + } + Queue queue = ((MutableConfScheduler) rmContext.getScheduler()) + .getQueue(queueInfo.getQueueName()); + if (queue != null && !queue.hasAccess(QueueACL.ADMINISTER_QUEUE, user)) { + return false; + } + } + return true; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java index 215e5117cd..d95465b62d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/RMWebServices.java @@ -142,7 +142,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; @@ -2484,10 +2483,8 @@ public Response updateSchedulerConfiguration(QueueConfigsUpdateInfo callerUGI.doAs(new PrivilegedExceptionAction() { @Override public Void run() throws IOException, YarnException { - Map confUpdate = - constructKeyValueConfUpdate(mutationInfo); - ((CapacityScheduler) scheduler).updateConfiguration(callerUGI, - confUpdate); + ((MutableConfScheduler) scheduler).updateConfiguration(callerUGI, + mutationInfo); return null; } }); @@ -2499,129 +2496,9 @@ public Void run() throws IOException, YarnException { "successfully applied.").build(); } else { return Response.status(Status.BAD_REQUEST) - .entity("Configuration change only supported by CapacityScheduler.") + .entity("Configuration change only supported by " + + "MutableConfScheduler.") .build(); } } - - private Map constructKeyValueConfUpdate( - QueueConfigsUpdateInfo mutationInfo) throws IOException { - CapacitySchedulerConfiguration currentConf = - ((CapacityScheduler) rm.getResourceScheduler()).getConfiguration(); - CapacitySchedulerConfiguration proposedConf = - new CapacitySchedulerConfiguration(currentConf, false); - Map confUpdate = new HashMap<>(); - for (String queueToRemove : mutationInfo.getRemoveQueueInfo()) { - removeQueue(queueToRemove, proposedConf, confUpdate); - } - for (QueueConfigInfo addQueueInfo : mutationInfo.getAddQueueInfo()) { - addQueue(addQueueInfo, proposedConf, confUpdate); - } - for (QueueConfigInfo updateQueueInfo : mutationInfo.getUpdateQueueInfo()) { - updateQueue(updateQueueInfo, proposedConf, confUpdate); - } - return confUpdate; - } - - private void removeQueue( - String queueToRemove, CapacitySchedulerConfiguration proposedConf, - Map confUpdate) throws IOException { - if (queueToRemove == null) { - return; - } else { - CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); - String queueName = queueToRemove.substring( - queueToRemove.lastIndexOf('.') + 1); - CSQueue queue = cs.getQueue(queueName); - if (queue == null || - !queue.getQueuePath().equals(queueToRemove)) { - throw new IOException("Queue " + queueToRemove + " not found"); - } else if (queueToRemove.lastIndexOf('.') == -1) { - throw new IOException("Can't remove queue " + queueToRemove); - } - String parentQueuePath = queueToRemove.substring(0, queueToRemove - .lastIndexOf('.')); - String[] siblingQueues = proposedConf.getQueues(parentQueuePath); - List newSiblingQueues = new ArrayList<>(); - for (String siblingQueue : siblingQueues) { - if (!siblingQueue.equals(queueName)) { - newSiblingQueues.add(siblingQueue); - } - } - proposedConf.setQueues(parentQueuePath, newSiblingQueues - .toArray(new String[0])); - String queuesConfig = CapacitySchedulerConfiguration.PREFIX + - parentQueuePath + CapacitySchedulerConfiguration.DOT + - CapacitySchedulerConfiguration.QUEUES; - if (newSiblingQueues.size() == 0) { - confUpdate.put(queuesConfig, null); - } else { - confUpdate.put(queuesConfig, Joiner.on(',').join(newSiblingQueues)); - } - for (Map.Entry confRemove : proposedConf.getValByRegex( - ".*" + queueToRemove.replaceAll("\\.", "\\.") + "\\..*") - .entrySet()) { - proposedConf.unset(confRemove.getKey()); - confUpdate.put(confRemove.getKey(), null); - } - } - } - - private void addQueue( - QueueConfigInfo addInfo, CapacitySchedulerConfiguration proposedConf, - Map confUpdate) throws IOException { - if (addInfo == null) { - return; - } else { - CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); - String queuePath = addInfo.getQueue(); - String queueName = queuePath.substring(queuePath.lastIndexOf('.') + 1); - if (cs.getQueue(queueName) != null) { - throw new IOException("Can't add existing queue " + queuePath); - } else if (queuePath.lastIndexOf('.') == -1) { - throw new IOException("Can't add invalid queue " + queuePath); - } - String parentQueue = queuePath.substring(0, queuePath.lastIndexOf('.')); - String[] siblings = proposedConf.getQueues(parentQueue); - List siblingQueues = siblings == null ? new ArrayList<>() : - new ArrayList<>(Arrays.asList(siblings)); - siblingQueues.add(queuePath.substring(queuePath.lastIndexOf('.') + 1)); - proposedConf.setQueues(parentQueue, - siblingQueues.toArray(new String[0])); - confUpdate.put(CapacitySchedulerConfiguration.PREFIX + - parentQueue + CapacitySchedulerConfiguration.DOT + - CapacitySchedulerConfiguration.QUEUES, - Joiner.on(',').join(siblingQueues)); - String keyPrefix = CapacitySchedulerConfiguration.PREFIX + - queuePath + CapacitySchedulerConfiguration.DOT; - for (Map.Entry kv : addInfo.getParams().entrySet()) { - if (kv.getValue() == null) { - proposedConf.unset(keyPrefix + kv.getKey()); - } else { - proposedConf.set(keyPrefix + kv.getKey(), kv.getValue()); - } - confUpdate.put(keyPrefix + kv.getKey(), kv.getValue()); - } - } - } - - private void updateQueue(QueueConfigInfo updateInfo, - CapacitySchedulerConfiguration proposedConf, - Map confUpdate) { - if (updateInfo == null) { - return; - } else { - String queuePath = updateInfo.getQueue(); - String keyPrefix = CapacitySchedulerConfiguration.PREFIX + - queuePath + CapacitySchedulerConfiguration.DOT; - for (Map.Entry kv : updateInfo.getParams().entrySet()) { - if (kv.getValue() == null) { - proposedConf.unset(keyPrefix + kv.getKey()); - } else { - proposedConf.set(keyPrefix + kv.getKey(), kv.getValue()); - } - confUpdate.put(keyPrefix + kv.getKey(), kv.getValue()); - } - } - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestConfigurationMutationACLPolicies.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestConfigurationMutationACLPolicies.java new file mode 100644 index 0000000000..4016dcffde --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestConfigurationMutationACLPolicies.java @@ -0,0 +1,154 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.yarn.api.records.QueueACL; +import org.apache.hadoop.yarn.api.records.QueueInfo; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.QueueAdminConfigurationMutationACLPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.Collections; +import java.util.Map; + +import static org.junit.Assert.assertFalse; +import static org.junit.Assert.assertTrue; +import static org.mockito.Matchers.anyBoolean; +import static org.mockito.Matchers.eq; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +public class TestConfigurationMutationACLPolicies { + + private ConfigurationMutationACLPolicy policy; + private RMContext rmContext; + private MutableConfScheduler scheduler; + + private static final UserGroupInformation GOOD_USER = UserGroupInformation + .createUserForTesting("goodUser", new String[] {}); + private static final UserGroupInformation BAD_USER = UserGroupInformation + .createUserForTesting("badUser", new String[] {}); + private static final Map EMPTY_MAP = + Collections.emptyMap(); + + @Before + public void setUp() throws IOException { + rmContext = mock(RMContext.class); + scheduler = mock(MutableConfScheduler.class); + when(rmContext.getScheduler()).thenReturn(scheduler); + mockQueue("a", scheduler); + mockQueue("b", scheduler); + mockQueue("b1", scheduler); + } + + private void mockQueue(String queueName, MutableConfScheduler scheduler) + throws IOException { + QueueInfo queueInfo = QueueInfo.newInstance(queueName, 0, 0, 0, null, null, + null, null, null, null, false); + when(scheduler.getQueueInfo(eq(queueName), anyBoolean(), anyBoolean())) + .thenReturn(queueInfo); + Queue queue = mock(Queue.class); + when(queue.hasAccess(eq(QueueACL.ADMINISTER_QUEUE), eq(GOOD_USER))) + .thenReturn(true); + when(queue.hasAccess(eq(QueueACL.ADMINISTER_QUEUE), eq(BAD_USER))) + .thenReturn(false); + when(scheduler.getQueue(eq(queueName))).thenReturn(queue); + } + @Test + public void testDefaultPolicy() { + Configuration conf = new Configuration(); + conf.set(YarnConfiguration.YARN_ADMIN_ACL, GOOD_USER.getShortUserName()); + conf.setClass(YarnConfiguration.RM_SCHEDULER_MUTATION_ACL_POLICY_CLASS, + DefaultConfigurationMutationACLPolicy.class, + ConfigurationMutationACLPolicy.class); + policy = ConfigurationMutationACLPolicyFactory.getPolicy(conf); + policy.init(conf, rmContext); + assertTrue(policy.isMutationAllowed(GOOD_USER, null)); + assertFalse(policy.isMutationAllowed(BAD_USER, null)); + } + + @Test + public void testQueueAdminBasedPolicy() { + Configuration conf = new Configuration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER_MUTATION_ACL_POLICY_CLASS, + QueueAdminConfigurationMutationACLPolicy.class, + ConfigurationMutationACLPolicy.class); + policy = ConfigurationMutationACLPolicyFactory.getPolicy(conf); + policy.init(conf, rmContext); + QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo(); + QueueConfigInfo configInfo = new QueueConfigInfo("root.a", EMPTY_MAP); + updateInfo.getUpdateQueueInfo().add(configInfo); + assertTrue(policy.isMutationAllowed(GOOD_USER, updateInfo)); + assertFalse(policy.isMutationAllowed(BAD_USER, updateInfo)); + } + + @Test + public void testQueueAdminPolicyAddQueue() { + Configuration conf = new Configuration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER_MUTATION_ACL_POLICY_CLASS, + QueueAdminConfigurationMutationACLPolicy.class, + ConfigurationMutationACLPolicy.class); + policy = ConfigurationMutationACLPolicyFactory.getPolicy(conf); + policy.init(conf, rmContext); + // Add root.b.b1. Should check ACL of root.b queue. + QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo(); + QueueConfigInfo configInfo = new QueueConfigInfo("root.b.b2", EMPTY_MAP); + updateInfo.getAddQueueInfo().add(configInfo); + assertTrue(policy.isMutationAllowed(GOOD_USER, updateInfo)); + assertFalse(policy.isMutationAllowed(BAD_USER, updateInfo)); + } + + @Test + public void testQueueAdminPolicyAddNestedQueue() { + Configuration conf = new Configuration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER_MUTATION_ACL_POLICY_CLASS, + QueueAdminConfigurationMutationACLPolicy.class, + ConfigurationMutationACLPolicy.class); + policy = ConfigurationMutationACLPolicyFactory.getPolicy(conf); + policy.init(conf, rmContext); + // Add root.b.b1.b11. Should check ACL of root.b queue. + QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo(); + QueueConfigInfo configInfo = new QueueConfigInfo("root.b.b2.b21", EMPTY_MAP); + updateInfo.getAddQueueInfo().add(configInfo); + assertTrue(policy.isMutationAllowed(GOOD_USER, updateInfo)); + assertFalse(policy.isMutationAllowed(BAD_USER, updateInfo)); + } + + @Test + public void testQueueAdminPolicyRemoveQueue() { + Configuration conf = new Configuration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER_MUTATION_ACL_POLICY_CLASS, + QueueAdminConfigurationMutationACLPolicy.class, + ConfigurationMutationACLPolicy.class); + policy = ConfigurationMutationACLPolicyFactory.getPolicy(conf); + policy.init(conf, rmContext); + // Remove root.b.b1. + QueueConfigsUpdateInfo updateInfo = new QueueConfigsUpdateInfo(); + updateInfo.getRemoveQueueInfo().add("root.b.b1"); + assertTrue(policy.isMutationAllowed(GOOD_USER, updateInfo)); + assertFalse(policy.isMutationAllowed(BAD_USER, updateInfo)); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java index 254da31893..13229b1086 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestMutableCSConfigurationProvider.java @@ -19,8 +19,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigInfo; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.QueueConfigsUpdateInfo; import org.junit.Before; import org.junit.Test; @@ -43,22 +47,34 @@ public class TestMutableCSConfigurationProvider { private MutableCSConfigurationProvider confProvider; private RMContext rmContext; - private Map goodUpdate; - private Map badUpdate; + private QueueConfigsUpdateInfo goodUpdate; + private QueueConfigsUpdateInfo badUpdate; private CapacityScheduler cs; - private static final String TEST_USER = "testUser"; + private static final UserGroupInformation TEST_USER = UserGroupInformation + .createUserForTesting("testUser", new String[] {}); @Before public void setUp() { cs = mock(CapacityScheduler.class); rmContext = mock(RMContext.class); when(rmContext.getScheduler()).thenReturn(cs); + when(cs.getConfiguration()).thenReturn( + new CapacitySchedulerConfiguration()); confProvider = new MutableCSConfigurationProvider(rmContext); - goodUpdate = new HashMap<>(); - goodUpdate.put("goodKey", "goodVal"); - badUpdate = new HashMap<>(); - badUpdate.put("badKey", "badVal"); + goodUpdate = new QueueConfigsUpdateInfo(); + Map goodUpdateMap = new HashMap<>(); + goodUpdateMap.put("goodKey", "goodVal"); + QueueConfigInfo goodUpdateInfo = new + QueueConfigInfo("root.a", goodUpdateMap); + goodUpdate.getUpdateQueueInfo().add(goodUpdateInfo); + + badUpdate = new QueueConfigsUpdateInfo(); + Map badUpdateMap = new HashMap<>(); + badUpdateMap.put("badKey", "badVal"); + QueueConfigInfo badUpdateInfo = new + QueueConfigInfo("root.a", badUpdateMap); + badUpdate.getUpdateQueueInfo().add(badUpdateInfo); } @Test @@ -66,15 +82,16 @@ public void testInMemoryBackedProvider() throws IOException { Configuration conf = new Configuration(); confProvider.init(conf); assertNull(confProvider.loadConfiguration(conf) - .get("goodKey")); + .get("yarn.scheduler.capacity.root.a.goodKey")); doNothing().when(cs).reinitialize(any(Configuration.class), any(RMContext.class)); confProvider.mutateConfiguration(TEST_USER, goodUpdate); assertEquals("goodVal", confProvider.loadConfiguration(conf) - .get("goodKey")); + .get("yarn.scheduler.capacity.root.a.goodKey")); - assertNull(confProvider.loadConfiguration(conf).get("badKey")); + assertNull(confProvider.loadConfiguration(conf).get( + "yarn.scheduler.capacity.root.a.badKey")); doThrow(new IOException()).when(cs).reinitialize(any(Configuration.class), any(RMContext.class)); try { @@ -82,6 +99,7 @@ public void testInMemoryBackedProvider() throws IOException { } catch (IOException e) { // Expected exception. } - assertNull(confProvider.loadConfiguration(conf).get("badKey")); + assertNull(confProvider.loadConfiguration(conf).get( + "yarn.scheduler.capacity.root.a.badKey")); } }