diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationUpdateAssembler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationUpdateAssembler.java new file mode 100644 index 0000000000..88c9301968 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/ConfigurationUpdateAssembler.java @@ -0,0 +1,181 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.thirdparty.com.google.common.base.Joiner; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo; +import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ORDERING_POLICY; + +public final class ConfigurationUpdateAssembler { + + private ConfigurationUpdateAssembler() { + } + + public static Map constructKeyValueConfUpdate( + CapacitySchedulerConfiguration proposedConf, + SchedConfUpdateInfo mutationInfo) throws IOException { + + Map confUpdate = new HashMap<>(); + for (String queueToRemove : mutationInfo.getRemoveQueueInfo()) { + removeQueue(queueToRemove, proposedConf, confUpdate); + } + for (QueueConfigInfo addQueueInfo : mutationInfo.getAddQueueInfo()) { + addQueue(addQueueInfo, proposedConf, confUpdate); + } + for (QueueConfigInfo updateQueueInfo : mutationInfo.getUpdateQueueInfo()) { + updateQueue(updateQueueInfo, proposedConf, confUpdate); + } + for (Map.Entry global : mutationInfo.getGlobalParams() + .entrySet()) { + confUpdate.put(global.getKey(), global.getValue()); + } + return confUpdate; + } + + private static void removeQueue( + String queueToRemove, CapacitySchedulerConfiguration proposedConf, + Map confUpdate) throws IOException { + if (queueToRemove == null) { + return; + } + if (queueToRemove.lastIndexOf('.') == -1) { + throw new IOException("Can't remove queue " + queueToRemove); + } + String queueName = queueToRemove.substring( + queueToRemove.lastIndexOf('.') + 1); + List siblingQueues = getSiblingQueues(queueToRemove, + proposedConf); + if (!siblingQueues.contains(queueName)) { + throw new IOException("Queue " + queueToRemove + " not found"); + } + siblingQueues.remove(queueName); + String parentQueuePath = queueToRemove.substring(0, queueToRemove + .lastIndexOf('.')); + proposedConf.setQueues(parentQueuePath, siblingQueues.toArray( + new String[0])); + String queuesConfig = CapacitySchedulerConfiguration.PREFIX + + parentQueuePath + CapacitySchedulerConfiguration.DOT + + CapacitySchedulerConfiguration.QUEUES; + if (siblingQueues.isEmpty()) { + confUpdate.put(queuesConfig, null); + // Unset Ordering Policy of Leaf Queue converted from + // Parent Queue after removeQueue + String queueOrderingPolicy = CapacitySchedulerConfiguration.PREFIX + + parentQueuePath + CapacitySchedulerConfiguration.DOT + + ORDERING_POLICY; + proposedConf.unset(queueOrderingPolicy); + confUpdate.put(queueOrderingPolicy, null); + } else { + confUpdate.put(queuesConfig, Joiner.on(',').join(siblingQueues)); + } + for (Map.Entry confRemove : proposedConf.getValByRegex( + ".*" + queueToRemove + "\\..*") + .entrySet()) { + proposedConf.unset(confRemove.getKey()); + confUpdate.put(confRemove.getKey(), null); + } + } + + private static void addQueue( + QueueConfigInfo addInfo, CapacitySchedulerConfiguration proposedConf, + Map confUpdate) throws IOException { + if (addInfo == null) { + return; + } + String queuePath = addInfo.getQueue(); + String queueName = queuePath.substring(queuePath.lastIndexOf('.') + 1); + if (queuePath.lastIndexOf('.') == -1) { + throw new IOException("Can't add invalid queue " + queuePath); + } else if (getSiblingQueues(queuePath, proposedConf).contains( + queueName)) { + throw new IOException("Can't add existing queue " + queuePath); + } + String parentQueue = queuePath.substring(0, queuePath.lastIndexOf('.')); + String[] siblings = proposedConf.getQueues(parentQueue); + List siblingQueues = siblings == null ? new ArrayList<>() : + new ArrayList<>(Arrays.asList(siblings)); + siblingQueues.add(queuePath.substring(queuePath.lastIndexOf('.') + 1)); + proposedConf.setQueues(parentQueue, + siblingQueues.toArray(new String[0])); + confUpdate.put(CapacitySchedulerConfiguration.PREFIX + + parentQueue + CapacitySchedulerConfiguration.DOT + + CapacitySchedulerConfiguration.QUEUES, + Joiner.on(',').join(siblingQueues)); + String keyPrefix = CapacitySchedulerConfiguration.PREFIX + + queuePath + CapacitySchedulerConfiguration.DOT; + for (Map.Entry kv : addInfo.getParams().entrySet()) { + String keyValue = kv.getValue(); + if (keyValue == null || keyValue.isEmpty()) { + proposedConf.unset(keyPrefix + kv.getKey()); + confUpdate.put(keyPrefix + kv.getKey(), null); + } else { + proposedConf.set(keyPrefix + kv.getKey(), keyValue); + confUpdate.put(keyPrefix + kv.getKey(), keyValue); + } + } + // Unset Ordering Policy of Parent Queue converted from + // Leaf Queue after addQueue + String queueOrderingPolicy = CapacitySchedulerConfiguration.PREFIX + + parentQueue + CapacitySchedulerConfiguration.DOT + ORDERING_POLICY; + if (siblingQueues.size() == 1) { + proposedConf.unset(queueOrderingPolicy); + confUpdate.put(queueOrderingPolicy, null); + } + } + + private static void updateQueue(QueueConfigInfo updateInfo, + CapacitySchedulerConfiguration proposedConf, + Map confUpdate) { + if (updateInfo == null) { + return; + } + String queuePath = updateInfo.getQueue(); + String keyPrefix = CapacitySchedulerConfiguration.PREFIX + + queuePath + CapacitySchedulerConfiguration.DOT; + for (Map.Entry kv : updateInfo.getParams().entrySet()) { + String keyValue = kv.getValue(); + if (keyValue == null || keyValue.isEmpty()) { + proposedConf.unset(keyPrefix + kv.getKey()); + confUpdate.put(keyPrefix + kv.getKey(), null); + } else { + proposedConf.set(keyPrefix + kv.getKey(), keyValue); + confUpdate.put(keyPrefix + kv.getKey(), keyValue); + } + } + } + + private static List getSiblingQueues(String queuePath, Configuration conf) { + String parentQueue = queuePath.substring(0, queuePath.lastIndexOf('.')); + String childQueuesKey = CapacitySchedulerConfiguration.PREFIX + + parentQueue + CapacitySchedulerConfiguration.DOT + + CapacitySchedulerConfiguration.QUEUES; + return new ArrayList<>(conf.getTrimmedStringCollection(childQueuesKey)); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java index c4cb273b49..cbd217f4f2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/MutableCSConfigurationProvider.java @@ -19,7 +19,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; import org.apache.hadoop.classification.VisibleForTesting; -import org.apache.hadoop.thirdparty.com.google.common.base.Joiner; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -31,19 +30,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation; -import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo; import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; import java.io.IOException; -import java.util.ArrayList; -import java.util.Arrays; -import java.util.HashMap; -import java.util.List; import java.util.Map; import java.util.concurrent.locks.ReentrantReadWriteLock; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ORDERING_POLICY; - /** * CS configuration provider which implements * {@link MutableConfigurationProvider} for modifying capacity scheduler @@ -79,15 +71,7 @@ protected Configuration getInitSchedulerConfig() { @Override public void init(Configuration config) throws IOException { this.confStore = YarnConfigurationStoreFactory.getStore(config); - Configuration initialSchedConf = getInitSchedulerConfig(); - initialSchedConf.addResource(YarnConfiguration.CS_CONFIGURATION_FILE); - this.schedConf = new Configuration(false); - // We need to explicitly set the key-values in schedConf, otherwise - // these configuration keys cannot be deleted when - // configuration is reloaded. - for (Map.Entry kv : initialSchedConf) { - schedConf.set(kv.getKey(), kv.getValue()); - } + initializeSchedConf(); try { confStore.initialize(config, schedConf, rmContext); confStore.checkVersion(); @@ -108,7 +92,7 @@ public void close() throws IOException { } @VisibleForTesting - public YarnConfigurationStore getConfStore() { + protected YarnConfigurationStore getConfStore() { return confStore; } @@ -142,7 +126,7 @@ public LogMutation logAndApplyMutation(UserGroupInformation user, CapacitySchedulerConfiguration proposedConf = new CapacitySchedulerConfiguration(schedConf, false); Map kvUpdate - = constructKeyValueConfUpdate(proposedConf, confUpdate); + = ConfigurationUpdateAssembler.constructKeyValueConfUpdate(proposedConf, confUpdate); LogMutation log = new LogMutation(kvUpdate, user.getShortUserName()); confStore.logMutation(log); applyMutation(proposedConf, kvUpdate); @@ -155,7 +139,7 @@ public Configuration applyChanges(Configuration oldConfiguration, CapacitySchedulerConfiguration proposedConf = new CapacitySchedulerConfiguration(oldConfiguration, false); Map kvUpdate - = constructKeyValueConfUpdate(proposedConf, confUpdate); + = ConfigurationUpdateAssembler.constructKeyValueConfUpdate(proposedConf, confUpdate); applyMutation(proposedConf, kvUpdate); return proposedConf; } @@ -177,15 +161,7 @@ public void formatConfigurationInStore(Configuration config) try { confStore.format(); oldConf = new Configuration(schedConf); - Configuration initialSchedConf = new Configuration(false); - initialSchedConf.addResource(YarnConfiguration.CS_CONFIGURATION_FILE); - this.schedConf = new Configuration(false); - // We need to explicitly set the key-values in schedConf, otherwise - // these configuration keys cannot be deleted when - // configuration is reloaded. - for (Map.Entry kv : initialSchedConf) { - schedConf.set(kv.getKey(), kv.getValue()); - } + initializeSchedConf(); confStore.initialize(config, schedConf, rmContext); confStore.checkVersion(); } catch (Exception e) { @@ -195,6 +171,17 @@ public void formatConfigurationInStore(Configuration config) } } + private void initializeSchedConf() { + Configuration initialSchedConf = getInitSchedulerConfig(); + this.schedConf = new Configuration(false); + // We need to explicitly set the key-values in schedConf, otherwise + // these configuration keys cannot be deleted when + // configuration is reloaded. + for (Map.Entry kv : initialSchedConf) { + schedConf.set(kv.getKey(), kv.getValue()); + } + } + @Override public void revertToOldConfig(Configuration config) throws Exception { formatLock.writeLock().lock(); @@ -233,147 +220,4 @@ public void reloadConfigurationFromStore() throws Exception { formatLock.readLock().unlock(); } } - - private List getSiblingQueues(String queuePath, Configuration conf) { - String parentQueue = queuePath.substring(0, queuePath.lastIndexOf('.')); - String childQueuesKey = CapacitySchedulerConfiguration.PREFIX + - parentQueue + CapacitySchedulerConfiguration.DOT + - CapacitySchedulerConfiguration.QUEUES; - return new ArrayList<>(conf.getTrimmedStringCollection(childQueuesKey)); - } - - private Map constructKeyValueConfUpdate( - CapacitySchedulerConfiguration proposedConf, - SchedConfUpdateInfo mutationInfo) throws IOException { - - Map confUpdate = new HashMap<>(); - for (String queueToRemove : mutationInfo.getRemoveQueueInfo()) { - removeQueue(queueToRemove, proposedConf, confUpdate); - } - for (QueueConfigInfo addQueueInfo : mutationInfo.getAddQueueInfo()) { - addQueue(addQueueInfo, proposedConf, confUpdate); - } - for (QueueConfigInfo updateQueueInfo : mutationInfo.getUpdateQueueInfo()) { - updateQueue(updateQueueInfo, proposedConf, confUpdate); - } - for (Map.Entry global : mutationInfo.getGlobalParams() - .entrySet()) { - confUpdate.put(global.getKey(), global.getValue()); - } - return confUpdate; - } - - private void removeQueue( - String queueToRemove, CapacitySchedulerConfiguration proposedConf, - Map confUpdate) throws IOException { - if (queueToRemove == null) { - return; - } else { - String queueName = queueToRemove.substring( - queueToRemove.lastIndexOf('.') + 1); - if (queueToRemove.lastIndexOf('.') == -1) { - throw new IOException("Can't remove queue " + queueToRemove); - } else { - List siblingQueues = getSiblingQueues(queueToRemove, - proposedConf); - if (!siblingQueues.contains(queueName)) { - throw new IOException("Queue " + queueToRemove + " not found"); - } - siblingQueues.remove(queueName); - String parentQueuePath = queueToRemove.substring(0, queueToRemove - .lastIndexOf('.')); - proposedConf.setQueues(parentQueuePath, siblingQueues.toArray( - new String[0])); - String queuesConfig = CapacitySchedulerConfiguration.PREFIX - + parentQueuePath + CapacitySchedulerConfiguration.DOT - + CapacitySchedulerConfiguration.QUEUES; - if (siblingQueues.size() == 0) { - confUpdate.put(queuesConfig, null); - // Unset Ordering Policy of Leaf Queue converted from - // Parent Queue after removeQueue - String queueOrderingPolicy = CapacitySchedulerConfiguration.PREFIX - + parentQueuePath + CapacitySchedulerConfiguration.DOT - + ORDERING_POLICY; - proposedConf.unset(queueOrderingPolicy); - confUpdate.put(queueOrderingPolicy, null); - } else { - confUpdate.put(queuesConfig, Joiner.on(',').join(siblingQueues)); - } - for (Map.Entry confRemove : proposedConf.getValByRegex( - ".*" + queueToRemove.replaceAll("\\.", "\\.") + "\\..*") - .entrySet()) { - proposedConf.unset(confRemove.getKey()); - confUpdate.put(confRemove.getKey(), null); - } - } - } - } - - private void addQueue( - QueueConfigInfo addInfo, CapacitySchedulerConfiguration proposedConf, - Map confUpdate) throws IOException { - if (addInfo == null) { - return; - } else { - String queuePath = addInfo.getQueue(); - String queueName = queuePath.substring(queuePath.lastIndexOf('.') + 1); - if (queuePath.lastIndexOf('.') == -1) { - throw new IOException("Can't add invalid queue " + queuePath); - } else if (getSiblingQueues(queuePath, proposedConf).contains( - queueName)) { - throw new IOException("Can't add existing queue " + queuePath); - } - String parentQueue = queuePath.substring(0, queuePath.lastIndexOf('.')); - String[] siblings = proposedConf.getQueues(parentQueue); - List siblingQueues = siblings == null ? new ArrayList<>() : - new ArrayList<>(Arrays.asList(siblings)); - siblingQueues.add(queuePath.substring(queuePath.lastIndexOf('.') + 1)); - proposedConf.setQueues(parentQueue, - siblingQueues.toArray(new String[0])); - confUpdate.put(CapacitySchedulerConfiguration.PREFIX - + parentQueue + CapacitySchedulerConfiguration.DOT - + CapacitySchedulerConfiguration.QUEUES, - Joiner.on(',').join(siblingQueues)); - String keyPrefix = CapacitySchedulerConfiguration.PREFIX - + queuePath + CapacitySchedulerConfiguration.DOT; - for (Map.Entry kv : addInfo.getParams().entrySet()) { - if (kv.getValue() == null) { - proposedConf.unset(keyPrefix + kv.getKey()); - } else { - proposedConf.set(keyPrefix + kv.getKey(), kv.getValue()); - } - confUpdate.put(keyPrefix + kv.getKey(), kv.getValue()); - } - // Unset Ordering Policy of Parent Queue converted from - // Leaf Queue after addQueue - String queueOrderingPolicy = CapacitySchedulerConfiguration.PREFIX - + parentQueue + CapacitySchedulerConfiguration.DOT + ORDERING_POLICY; - if (siblingQueues.size() == 1) { - proposedConf.unset(queueOrderingPolicy); - confUpdate.put(queueOrderingPolicy, null); - } - } - } - - private void updateQueue(QueueConfigInfo updateInfo, - CapacitySchedulerConfiguration proposedConf, - Map confUpdate) { - if (updateInfo == null) { - return; - } else { - String queuePath = updateInfo.getQueue(); - String keyPrefix = CapacitySchedulerConfiguration.PREFIX - + queuePath + CapacitySchedulerConfiguration.DOT; - for (Map.Entry kv : updateInfo.getParams().entrySet()) { - String keyValue = kv.getValue(); - if (keyValue == null || keyValue.isEmpty()) { - keyValue = null; - proposedConf.unset(keyPrefix + kv.getKey()); - } else { - proposedConf.set(keyPrefix + kv.getKey(), keyValue); - } - confUpdate.put(keyPrefix + kv.getKey(), keyValue); - } - } - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestConfigurationUpdateAssembler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestConfigurationUpdateAssembler.java new file mode 100644 index 0000000000..890996ac23 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/conf/TestConfigurationUpdateAssembler.java @@ -0,0 +1,173 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo; +import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.HashMap; +import java.util.Map; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNull; +import static org.junit.Assert.assertThrows; +import static org.junit.Assert.assertTrue; + +/** + * Tests {@link ConfigurationUpdateAssembler}. + */ +public class TestConfigurationUpdateAssembler { + + private static final String A_PATH = "root.a"; + private static final String B_PATH = "root.b"; + private static final String C_PATH = "root.c"; + + private static final String CONFIG_NAME = "testConfigName"; + private static final String A_CONFIG_PATH = CapacitySchedulerConfiguration.PREFIX + A_PATH + + CapacitySchedulerConfiguration.DOT + CONFIG_NAME; + private static final String B_CONFIG_PATH = CapacitySchedulerConfiguration.PREFIX + B_PATH + + CapacitySchedulerConfiguration.DOT + CONFIG_NAME; + private static final String C_CONFIG_PATH = CapacitySchedulerConfiguration.PREFIX + C_PATH + + CapacitySchedulerConfiguration.DOT + CONFIG_NAME; + private static final String ROOT_QUEUES_PATH = CapacitySchedulerConfiguration.PREFIX + + CapacitySchedulerConfiguration.ROOT + CapacitySchedulerConfiguration.DOT + + CapacitySchedulerConfiguration.QUEUES; + + private static final String A_INIT_CONFIG_VALUE = "aInitValue"; + private static final String A_CONFIG_VALUE = "aValue"; + private static final String B_INIT_CONFIG_VALUE = "bInitValue"; + private static final String B_CONFIG_VALUE = "bValue"; + private static final String C_CONFIG_VALUE = "cValue"; + + private CapacitySchedulerConfiguration csConfig; + + @Before + public void setUp() { + csConfig = crateInitialCSConfig(); + } + + @Test + public void testAddQueue() throws Exception { + SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo(); + Map updateMap = new HashMap<>(); + updateMap.put(CONFIG_NAME, C_CONFIG_VALUE); + QueueConfigInfo queueConfigInfo = new QueueConfigInfo(C_PATH, updateMap); + updateInfo.getAddQueueInfo().add(queueConfigInfo); + + Map configurationUpdate = + ConfigurationUpdateAssembler.constructKeyValueConfUpdate(csConfig, updateInfo); + + assertEquals(C_CONFIG_VALUE, configurationUpdate.get(C_CONFIG_PATH)); + assertEquals("a,b,c", configurationUpdate.get(ROOT_QUEUES_PATH)); + } + + @Test + public void testAddExistingQueue() { + SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo(); + Map updateMap = new HashMap<>(); + updateMap.put(CONFIG_NAME, A_CONFIG_VALUE); + QueueConfigInfo queueConfigInfo = new QueueConfigInfo(A_PATH, updateMap); + updateInfo.getAddQueueInfo().add(queueConfigInfo); + + assertThrows(IOException.class, () -> { + ConfigurationUpdateAssembler.constructKeyValueConfUpdate(csConfig, updateInfo); + }); + } + + @Test + public void testAddInvalidQueue() { + SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo(); + Map updateMap = new HashMap<>(); + updateMap.put(CONFIG_NAME, A_CONFIG_VALUE); + QueueConfigInfo queueConfigInfo = new QueueConfigInfo("invalidPath", updateMap); + updateInfo.getAddQueueInfo().add(queueConfigInfo); + + assertThrows(IOException.class, () -> { + ConfigurationUpdateAssembler.constructKeyValueConfUpdate(csConfig, updateInfo); + }); + } + + @Test + public void testUpdateQueue() throws Exception { + SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo(); + Map updateMap = new HashMap<>(); + updateMap.put(CONFIG_NAME, A_CONFIG_VALUE); + QueueConfigInfo queueAConfigInfo = new QueueConfigInfo(A_PATH, updateMap); + updateInfo.getUpdateQueueInfo().add(queueAConfigInfo); + + Map updateMapQueueB = new HashMap<>(); + updateMapQueueB.put(CONFIG_NAME, B_CONFIG_VALUE); + QueueConfigInfo queueBConfigInfo = new QueueConfigInfo(B_PATH, updateMapQueueB); + + updateInfo.getUpdateQueueInfo().add(queueBConfigInfo); + + Map configurationUpdate = + ConfigurationUpdateAssembler.constructKeyValueConfUpdate(csConfig, updateInfo); + + assertEquals(A_CONFIG_VALUE, configurationUpdate.get(A_CONFIG_PATH)); + assertEquals(B_CONFIG_VALUE, configurationUpdate.get(B_CONFIG_PATH)); + } + + @Test + public void testRemoveQueue() throws Exception { + SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo(); + updateInfo.getRemoveQueueInfo().add(A_PATH); + + Map configurationUpdate = + ConfigurationUpdateAssembler.constructKeyValueConfUpdate(csConfig, updateInfo); + + assertTrue(configurationUpdate.containsKey(A_CONFIG_PATH)); + assertNull(configurationUpdate.get(A_CONFIG_PATH)); + assertEquals("b", configurationUpdate.get(ROOT_QUEUES_PATH)); + } + + @Test + public void testRemoveInvalidQueue() { + SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo(); + updateInfo.getRemoveQueueInfo().add("invalidPath"); + + assertThrows(IOException.class, () -> { + ConfigurationUpdateAssembler.constructKeyValueConfUpdate(csConfig, updateInfo); + }); + } + + @Test + public void testRemoveNonExistingQueue() { + SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo(); + updateInfo.getRemoveQueueInfo().add("root.d"); + + assertThrows(IOException.class, () -> { + ConfigurationUpdateAssembler.constructKeyValueConfUpdate(csConfig, updateInfo); + }); + } + + private CapacitySchedulerConfiguration crateInitialCSConfig() { + CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration(); + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a, b"}); + + csConf.set(A_CONFIG_PATH, A_INIT_CONFIG_VALUE); + csConf.set(B_CONFIG_PATH, B_INIT_CONFIG_VALUE); + + return csConf; + } +}