YARN-10005. Code improvements in MutableCSConfigurationProvider. Contributed by Peter Szucs
This commit is contained in:
parent
d340c4a7a1
commit
22c9f28f4d
@ -0,0 +1,181 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
|
||||||
|
|
||||||
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
|
||||||
|
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ORDERING_POLICY;
|
||||||
|
|
||||||
|
public final class ConfigurationUpdateAssembler {
|
||||||
|
|
||||||
|
private ConfigurationUpdateAssembler() {
|
||||||
|
}
|
||||||
|
|
||||||
|
public static Map<String, String> constructKeyValueConfUpdate(
|
||||||
|
CapacitySchedulerConfiguration proposedConf,
|
||||||
|
SchedConfUpdateInfo mutationInfo) throws IOException {
|
||||||
|
|
||||||
|
Map<String, String> confUpdate = new HashMap<>();
|
||||||
|
for (String queueToRemove : mutationInfo.getRemoveQueueInfo()) {
|
||||||
|
removeQueue(queueToRemove, proposedConf, confUpdate);
|
||||||
|
}
|
||||||
|
for (QueueConfigInfo addQueueInfo : mutationInfo.getAddQueueInfo()) {
|
||||||
|
addQueue(addQueueInfo, proposedConf, confUpdate);
|
||||||
|
}
|
||||||
|
for (QueueConfigInfo updateQueueInfo : mutationInfo.getUpdateQueueInfo()) {
|
||||||
|
updateQueue(updateQueueInfo, proposedConf, confUpdate);
|
||||||
|
}
|
||||||
|
for (Map.Entry<String, String> global : mutationInfo.getGlobalParams()
|
||||||
|
.entrySet()) {
|
||||||
|
confUpdate.put(global.getKey(), global.getValue());
|
||||||
|
}
|
||||||
|
return confUpdate;
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void removeQueue(
|
||||||
|
String queueToRemove, CapacitySchedulerConfiguration proposedConf,
|
||||||
|
Map<String, String> confUpdate) throws IOException {
|
||||||
|
if (queueToRemove == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
if (queueToRemove.lastIndexOf('.') == -1) {
|
||||||
|
throw new IOException("Can't remove queue " + queueToRemove);
|
||||||
|
}
|
||||||
|
String queueName = queueToRemove.substring(
|
||||||
|
queueToRemove.lastIndexOf('.') + 1);
|
||||||
|
List<String> siblingQueues = getSiblingQueues(queueToRemove,
|
||||||
|
proposedConf);
|
||||||
|
if (!siblingQueues.contains(queueName)) {
|
||||||
|
throw new IOException("Queue " + queueToRemove + " not found");
|
||||||
|
}
|
||||||
|
siblingQueues.remove(queueName);
|
||||||
|
String parentQueuePath = queueToRemove.substring(0, queueToRemove
|
||||||
|
.lastIndexOf('.'));
|
||||||
|
proposedConf.setQueues(parentQueuePath, siblingQueues.toArray(
|
||||||
|
new String[0]));
|
||||||
|
String queuesConfig = CapacitySchedulerConfiguration.PREFIX
|
||||||
|
+ parentQueuePath + CapacitySchedulerConfiguration.DOT
|
||||||
|
+ CapacitySchedulerConfiguration.QUEUES;
|
||||||
|
if (siblingQueues.isEmpty()) {
|
||||||
|
confUpdate.put(queuesConfig, null);
|
||||||
|
// Unset Ordering Policy of Leaf Queue converted from
|
||||||
|
// Parent Queue after removeQueue
|
||||||
|
String queueOrderingPolicy = CapacitySchedulerConfiguration.PREFIX
|
||||||
|
+ parentQueuePath + CapacitySchedulerConfiguration.DOT
|
||||||
|
+ ORDERING_POLICY;
|
||||||
|
proposedConf.unset(queueOrderingPolicy);
|
||||||
|
confUpdate.put(queueOrderingPolicy, null);
|
||||||
|
} else {
|
||||||
|
confUpdate.put(queuesConfig, Joiner.on(',').join(siblingQueues));
|
||||||
|
}
|
||||||
|
for (Map.Entry<String, String> confRemove : proposedConf.getValByRegex(
|
||||||
|
".*" + queueToRemove + "\\..*")
|
||||||
|
.entrySet()) {
|
||||||
|
proposedConf.unset(confRemove.getKey());
|
||||||
|
confUpdate.put(confRemove.getKey(), null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void addQueue(
|
||||||
|
QueueConfigInfo addInfo, CapacitySchedulerConfiguration proposedConf,
|
||||||
|
Map<String, String> confUpdate) throws IOException {
|
||||||
|
if (addInfo == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
String queuePath = addInfo.getQueue();
|
||||||
|
String queueName = queuePath.substring(queuePath.lastIndexOf('.') + 1);
|
||||||
|
if (queuePath.lastIndexOf('.') == -1) {
|
||||||
|
throw new IOException("Can't add invalid queue " + queuePath);
|
||||||
|
} else if (getSiblingQueues(queuePath, proposedConf).contains(
|
||||||
|
queueName)) {
|
||||||
|
throw new IOException("Can't add existing queue " + queuePath);
|
||||||
|
}
|
||||||
|
String parentQueue = queuePath.substring(0, queuePath.lastIndexOf('.'));
|
||||||
|
String[] siblings = proposedConf.getQueues(parentQueue);
|
||||||
|
List<String> siblingQueues = siblings == null ? new ArrayList<>() :
|
||||||
|
new ArrayList<>(Arrays.asList(siblings));
|
||||||
|
siblingQueues.add(queuePath.substring(queuePath.lastIndexOf('.') + 1));
|
||||||
|
proposedConf.setQueues(parentQueue,
|
||||||
|
siblingQueues.toArray(new String[0]));
|
||||||
|
confUpdate.put(CapacitySchedulerConfiguration.PREFIX
|
||||||
|
+ parentQueue + CapacitySchedulerConfiguration.DOT
|
||||||
|
+ CapacitySchedulerConfiguration.QUEUES,
|
||||||
|
Joiner.on(',').join(siblingQueues));
|
||||||
|
String keyPrefix = CapacitySchedulerConfiguration.PREFIX
|
||||||
|
+ queuePath + CapacitySchedulerConfiguration.DOT;
|
||||||
|
for (Map.Entry<String, String> kv : addInfo.getParams().entrySet()) {
|
||||||
|
String keyValue = kv.getValue();
|
||||||
|
if (keyValue == null || keyValue.isEmpty()) {
|
||||||
|
proposedConf.unset(keyPrefix + kv.getKey());
|
||||||
|
confUpdate.put(keyPrefix + kv.getKey(), null);
|
||||||
|
} else {
|
||||||
|
proposedConf.set(keyPrefix + kv.getKey(), keyValue);
|
||||||
|
confUpdate.put(keyPrefix + kv.getKey(), keyValue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
// Unset Ordering Policy of Parent Queue converted from
|
||||||
|
// Leaf Queue after addQueue
|
||||||
|
String queueOrderingPolicy = CapacitySchedulerConfiguration.PREFIX
|
||||||
|
+ parentQueue + CapacitySchedulerConfiguration.DOT + ORDERING_POLICY;
|
||||||
|
if (siblingQueues.size() == 1) {
|
||||||
|
proposedConf.unset(queueOrderingPolicy);
|
||||||
|
confUpdate.put(queueOrderingPolicy, null);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static void updateQueue(QueueConfigInfo updateInfo,
|
||||||
|
CapacitySchedulerConfiguration proposedConf,
|
||||||
|
Map<String, String> confUpdate) {
|
||||||
|
if (updateInfo == null) {
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
String queuePath = updateInfo.getQueue();
|
||||||
|
String keyPrefix = CapacitySchedulerConfiguration.PREFIX
|
||||||
|
+ queuePath + CapacitySchedulerConfiguration.DOT;
|
||||||
|
for (Map.Entry<String, String> kv : updateInfo.getParams().entrySet()) {
|
||||||
|
String keyValue = kv.getValue();
|
||||||
|
if (keyValue == null || keyValue.isEmpty()) {
|
||||||
|
proposedConf.unset(keyPrefix + kv.getKey());
|
||||||
|
confUpdate.put(keyPrefix + kv.getKey(), null);
|
||||||
|
} else {
|
||||||
|
proposedConf.set(keyPrefix + kv.getKey(), keyValue);
|
||||||
|
confUpdate.put(keyPrefix + kv.getKey(), keyValue);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private static List<String> getSiblingQueues(String queuePath, Configuration conf) {
|
||||||
|
String parentQueue = queuePath.substring(0, queuePath.lastIndexOf('.'));
|
||||||
|
String childQueuesKey = CapacitySchedulerConfiguration.PREFIX +
|
||||||
|
parentQueue + CapacitySchedulerConfiguration.DOT +
|
||||||
|
CapacitySchedulerConfiguration.QUEUES;
|
||||||
|
return new ArrayList<>(conf.getTrimmedStringCollection(childQueuesKey));
|
||||||
|
}
|
||||||
|
}
|
@ -19,7 +19,6 @@
|
|||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
|
||||||
|
|
||||||
import org.apache.hadoop.classification.VisibleForTesting;
|
import org.apache.hadoop.classification.VisibleForTesting;
|
||||||
import org.apache.hadoop.thirdparty.com.google.common.base.Joiner;
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
@ -31,19 +30,12 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.MutableConfigurationProvider;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf.YarnConfigurationStore.LogMutation;
|
||||||
import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
|
|
||||||
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
|
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
|
||||||
|
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
|
||||||
import java.util.Arrays;
|
|
||||||
import java.util.HashMap;
|
|
||||||
import java.util.List;
|
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||||
|
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ORDERING_POLICY;
|
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* CS configuration provider which implements
|
* CS configuration provider which implements
|
||||||
* {@link MutableConfigurationProvider} for modifying capacity scheduler
|
* {@link MutableConfigurationProvider} for modifying capacity scheduler
|
||||||
@ -79,15 +71,7 @@ protected Configuration getInitSchedulerConfig() {
|
|||||||
@Override
|
@Override
|
||||||
public void init(Configuration config) throws IOException {
|
public void init(Configuration config) throws IOException {
|
||||||
this.confStore = YarnConfigurationStoreFactory.getStore(config);
|
this.confStore = YarnConfigurationStoreFactory.getStore(config);
|
||||||
Configuration initialSchedConf = getInitSchedulerConfig();
|
initializeSchedConf();
|
||||||
initialSchedConf.addResource(YarnConfiguration.CS_CONFIGURATION_FILE);
|
|
||||||
this.schedConf = new Configuration(false);
|
|
||||||
// We need to explicitly set the key-values in schedConf, otherwise
|
|
||||||
// these configuration keys cannot be deleted when
|
|
||||||
// configuration is reloaded.
|
|
||||||
for (Map.Entry<String, String> kv : initialSchedConf) {
|
|
||||||
schedConf.set(kv.getKey(), kv.getValue());
|
|
||||||
}
|
|
||||||
try {
|
try {
|
||||||
confStore.initialize(config, schedConf, rmContext);
|
confStore.initialize(config, schedConf, rmContext);
|
||||||
confStore.checkVersion();
|
confStore.checkVersion();
|
||||||
@ -108,7 +92,7 @@ public void close() throws IOException {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public YarnConfigurationStore getConfStore() {
|
protected YarnConfigurationStore getConfStore() {
|
||||||
return confStore;
|
return confStore;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -142,7 +126,7 @@ public LogMutation logAndApplyMutation(UserGroupInformation user,
|
|||||||
CapacitySchedulerConfiguration proposedConf =
|
CapacitySchedulerConfiguration proposedConf =
|
||||||
new CapacitySchedulerConfiguration(schedConf, false);
|
new CapacitySchedulerConfiguration(schedConf, false);
|
||||||
Map<String, String> kvUpdate
|
Map<String, String> kvUpdate
|
||||||
= constructKeyValueConfUpdate(proposedConf, confUpdate);
|
= ConfigurationUpdateAssembler.constructKeyValueConfUpdate(proposedConf, confUpdate);
|
||||||
LogMutation log = new LogMutation(kvUpdate, user.getShortUserName());
|
LogMutation log = new LogMutation(kvUpdate, user.getShortUserName());
|
||||||
confStore.logMutation(log);
|
confStore.logMutation(log);
|
||||||
applyMutation(proposedConf, kvUpdate);
|
applyMutation(proposedConf, kvUpdate);
|
||||||
@ -155,7 +139,7 @@ public Configuration applyChanges(Configuration oldConfiguration,
|
|||||||
CapacitySchedulerConfiguration proposedConf =
|
CapacitySchedulerConfiguration proposedConf =
|
||||||
new CapacitySchedulerConfiguration(oldConfiguration, false);
|
new CapacitySchedulerConfiguration(oldConfiguration, false);
|
||||||
Map<String, String> kvUpdate
|
Map<String, String> kvUpdate
|
||||||
= constructKeyValueConfUpdate(proposedConf, confUpdate);
|
= ConfigurationUpdateAssembler.constructKeyValueConfUpdate(proposedConf, confUpdate);
|
||||||
applyMutation(proposedConf, kvUpdate);
|
applyMutation(proposedConf, kvUpdate);
|
||||||
return proposedConf;
|
return proposedConf;
|
||||||
}
|
}
|
||||||
@ -177,15 +161,7 @@ public void formatConfigurationInStore(Configuration config)
|
|||||||
try {
|
try {
|
||||||
confStore.format();
|
confStore.format();
|
||||||
oldConf = new Configuration(schedConf);
|
oldConf = new Configuration(schedConf);
|
||||||
Configuration initialSchedConf = new Configuration(false);
|
initializeSchedConf();
|
||||||
initialSchedConf.addResource(YarnConfiguration.CS_CONFIGURATION_FILE);
|
|
||||||
this.schedConf = new Configuration(false);
|
|
||||||
// We need to explicitly set the key-values in schedConf, otherwise
|
|
||||||
// these configuration keys cannot be deleted when
|
|
||||||
// configuration is reloaded.
|
|
||||||
for (Map.Entry<String, String> kv : initialSchedConf) {
|
|
||||||
schedConf.set(kv.getKey(), kv.getValue());
|
|
||||||
}
|
|
||||||
confStore.initialize(config, schedConf, rmContext);
|
confStore.initialize(config, schedConf, rmContext);
|
||||||
confStore.checkVersion();
|
confStore.checkVersion();
|
||||||
} catch (Exception e) {
|
} catch (Exception e) {
|
||||||
@ -195,6 +171,17 @@ public void formatConfigurationInStore(Configuration config)
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
private void initializeSchedConf() {
|
||||||
|
Configuration initialSchedConf = getInitSchedulerConfig();
|
||||||
|
this.schedConf = new Configuration(false);
|
||||||
|
// We need to explicitly set the key-values in schedConf, otherwise
|
||||||
|
// these configuration keys cannot be deleted when
|
||||||
|
// configuration is reloaded.
|
||||||
|
for (Map.Entry<String, String> kv : initialSchedConf) {
|
||||||
|
schedConf.set(kv.getKey(), kv.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void revertToOldConfig(Configuration config) throws Exception {
|
public void revertToOldConfig(Configuration config) throws Exception {
|
||||||
formatLock.writeLock().lock();
|
formatLock.writeLock().lock();
|
||||||
@ -233,147 +220,4 @@ public void reloadConfigurationFromStore() throws Exception {
|
|||||||
formatLock.readLock().unlock();
|
formatLock.readLock().unlock();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private List<String> getSiblingQueues(String queuePath, Configuration conf) {
|
|
||||||
String parentQueue = queuePath.substring(0, queuePath.lastIndexOf('.'));
|
|
||||||
String childQueuesKey = CapacitySchedulerConfiguration.PREFIX +
|
|
||||||
parentQueue + CapacitySchedulerConfiguration.DOT +
|
|
||||||
CapacitySchedulerConfiguration.QUEUES;
|
|
||||||
return new ArrayList<>(conf.getTrimmedStringCollection(childQueuesKey));
|
|
||||||
}
|
|
||||||
|
|
||||||
private Map<String, String> constructKeyValueConfUpdate(
|
|
||||||
CapacitySchedulerConfiguration proposedConf,
|
|
||||||
SchedConfUpdateInfo mutationInfo) throws IOException {
|
|
||||||
|
|
||||||
Map<String, String> confUpdate = new HashMap<>();
|
|
||||||
for (String queueToRemove : mutationInfo.getRemoveQueueInfo()) {
|
|
||||||
removeQueue(queueToRemove, proposedConf, confUpdate);
|
|
||||||
}
|
|
||||||
for (QueueConfigInfo addQueueInfo : mutationInfo.getAddQueueInfo()) {
|
|
||||||
addQueue(addQueueInfo, proposedConf, confUpdate);
|
|
||||||
}
|
|
||||||
for (QueueConfigInfo updateQueueInfo : mutationInfo.getUpdateQueueInfo()) {
|
|
||||||
updateQueue(updateQueueInfo, proposedConf, confUpdate);
|
|
||||||
}
|
|
||||||
for (Map.Entry<String, String> global : mutationInfo.getGlobalParams()
|
|
||||||
.entrySet()) {
|
|
||||||
confUpdate.put(global.getKey(), global.getValue());
|
|
||||||
}
|
|
||||||
return confUpdate;
|
|
||||||
}
|
|
||||||
|
|
||||||
private void removeQueue(
|
|
||||||
String queueToRemove, CapacitySchedulerConfiguration proposedConf,
|
|
||||||
Map<String, String> confUpdate) throws IOException {
|
|
||||||
if (queueToRemove == null) {
|
|
||||||
return;
|
|
||||||
} else {
|
|
||||||
String queueName = queueToRemove.substring(
|
|
||||||
queueToRemove.lastIndexOf('.') + 1);
|
|
||||||
if (queueToRemove.lastIndexOf('.') == -1) {
|
|
||||||
throw new IOException("Can't remove queue " + queueToRemove);
|
|
||||||
} else {
|
|
||||||
List<String> siblingQueues = getSiblingQueues(queueToRemove,
|
|
||||||
proposedConf);
|
|
||||||
if (!siblingQueues.contains(queueName)) {
|
|
||||||
throw new IOException("Queue " + queueToRemove + " not found");
|
|
||||||
}
|
|
||||||
siblingQueues.remove(queueName);
|
|
||||||
String parentQueuePath = queueToRemove.substring(0, queueToRemove
|
|
||||||
.lastIndexOf('.'));
|
|
||||||
proposedConf.setQueues(parentQueuePath, siblingQueues.toArray(
|
|
||||||
new String[0]));
|
|
||||||
String queuesConfig = CapacitySchedulerConfiguration.PREFIX
|
|
||||||
+ parentQueuePath + CapacitySchedulerConfiguration.DOT
|
|
||||||
+ CapacitySchedulerConfiguration.QUEUES;
|
|
||||||
if (siblingQueues.size() == 0) {
|
|
||||||
confUpdate.put(queuesConfig, null);
|
|
||||||
// Unset Ordering Policy of Leaf Queue converted from
|
|
||||||
// Parent Queue after removeQueue
|
|
||||||
String queueOrderingPolicy = CapacitySchedulerConfiguration.PREFIX
|
|
||||||
+ parentQueuePath + CapacitySchedulerConfiguration.DOT
|
|
||||||
+ ORDERING_POLICY;
|
|
||||||
proposedConf.unset(queueOrderingPolicy);
|
|
||||||
confUpdate.put(queueOrderingPolicy, null);
|
|
||||||
} else {
|
|
||||||
confUpdate.put(queuesConfig, Joiner.on(',').join(siblingQueues));
|
|
||||||
}
|
|
||||||
for (Map.Entry<String, String> confRemove : proposedConf.getValByRegex(
|
|
||||||
".*" + queueToRemove.replaceAll("\\.", "\\.") + "\\..*")
|
|
||||||
.entrySet()) {
|
|
||||||
proposedConf.unset(confRemove.getKey());
|
|
||||||
confUpdate.put(confRemove.getKey(), null);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void addQueue(
|
|
||||||
QueueConfigInfo addInfo, CapacitySchedulerConfiguration proposedConf,
|
|
||||||
Map<String, String> confUpdate) throws IOException {
|
|
||||||
if (addInfo == null) {
|
|
||||||
return;
|
|
||||||
} else {
|
|
||||||
String queuePath = addInfo.getQueue();
|
|
||||||
String queueName = queuePath.substring(queuePath.lastIndexOf('.') + 1);
|
|
||||||
if (queuePath.lastIndexOf('.') == -1) {
|
|
||||||
throw new IOException("Can't add invalid queue " + queuePath);
|
|
||||||
} else if (getSiblingQueues(queuePath, proposedConf).contains(
|
|
||||||
queueName)) {
|
|
||||||
throw new IOException("Can't add existing queue " + queuePath);
|
|
||||||
}
|
|
||||||
String parentQueue = queuePath.substring(0, queuePath.lastIndexOf('.'));
|
|
||||||
String[] siblings = proposedConf.getQueues(parentQueue);
|
|
||||||
List<String> siblingQueues = siblings == null ? new ArrayList<>() :
|
|
||||||
new ArrayList<>(Arrays.<String>asList(siblings));
|
|
||||||
siblingQueues.add(queuePath.substring(queuePath.lastIndexOf('.') + 1));
|
|
||||||
proposedConf.setQueues(parentQueue,
|
|
||||||
siblingQueues.toArray(new String[0]));
|
|
||||||
confUpdate.put(CapacitySchedulerConfiguration.PREFIX
|
|
||||||
+ parentQueue + CapacitySchedulerConfiguration.DOT
|
|
||||||
+ CapacitySchedulerConfiguration.QUEUES,
|
|
||||||
Joiner.on(',').join(siblingQueues));
|
|
||||||
String keyPrefix = CapacitySchedulerConfiguration.PREFIX
|
|
||||||
+ queuePath + CapacitySchedulerConfiguration.DOT;
|
|
||||||
for (Map.Entry<String, String> kv : addInfo.getParams().entrySet()) {
|
|
||||||
if (kv.getValue() == null) {
|
|
||||||
proposedConf.unset(keyPrefix + kv.getKey());
|
|
||||||
} else {
|
|
||||||
proposedConf.set(keyPrefix + kv.getKey(), kv.getValue());
|
|
||||||
}
|
|
||||||
confUpdate.put(keyPrefix + kv.getKey(), kv.getValue());
|
|
||||||
}
|
|
||||||
// Unset Ordering Policy of Parent Queue converted from
|
|
||||||
// Leaf Queue after addQueue
|
|
||||||
String queueOrderingPolicy = CapacitySchedulerConfiguration.PREFIX
|
|
||||||
+ parentQueue + CapacitySchedulerConfiguration.DOT + ORDERING_POLICY;
|
|
||||||
if (siblingQueues.size() == 1) {
|
|
||||||
proposedConf.unset(queueOrderingPolicy);
|
|
||||||
confUpdate.put(queueOrderingPolicy, null);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void updateQueue(QueueConfigInfo updateInfo,
|
|
||||||
CapacitySchedulerConfiguration proposedConf,
|
|
||||||
Map<String, String> confUpdate) {
|
|
||||||
if (updateInfo == null) {
|
|
||||||
return;
|
|
||||||
} else {
|
|
||||||
String queuePath = updateInfo.getQueue();
|
|
||||||
String keyPrefix = CapacitySchedulerConfiguration.PREFIX
|
|
||||||
+ queuePath + CapacitySchedulerConfiguration.DOT;
|
|
||||||
for (Map.Entry<String, String> kv : updateInfo.getParams().entrySet()) {
|
|
||||||
String keyValue = kv.getValue();
|
|
||||||
if (keyValue == null || keyValue.isEmpty()) {
|
|
||||||
keyValue = null;
|
|
||||||
proposedConf.unset(keyPrefix + kv.getKey());
|
|
||||||
} else {
|
|
||||||
proposedConf.set(keyPrefix + kv.getKey(), keyValue);
|
|
||||||
}
|
|
||||||
confUpdate.put(keyPrefix + kv.getKey(), keyValue);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -0,0 +1,173 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.conf;
|
||||||
|
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.webapp.dao.QueueConfigInfo;
|
||||||
|
import org.apache.hadoop.yarn.webapp.dao.SchedConfUpdateInfo;
|
||||||
|
import org.junit.Before;
|
||||||
|
import org.junit.Test;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.HashMap;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
|
import static org.junit.Assert.assertEquals;
|
||||||
|
import static org.junit.Assert.assertNull;
|
||||||
|
import static org.junit.Assert.assertThrows;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Tests {@link ConfigurationUpdateAssembler}.
|
||||||
|
*/
|
||||||
|
public class TestConfigurationUpdateAssembler {
|
||||||
|
|
||||||
|
private static final String A_PATH = "root.a";
|
||||||
|
private static final String B_PATH = "root.b";
|
||||||
|
private static final String C_PATH = "root.c";
|
||||||
|
|
||||||
|
private static final String CONFIG_NAME = "testConfigName";
|
||||||
|
private static final String A_CONFIG_PATH = CapacitySchedulerConfiguration.PREFIX + A_PATH +
|
||||||
|
CapacitySchedulerConfiguration.DOT + CONFIG_NAME;
|
||||||
|
private static final String B_CONFIG_PATH = CapacitySchedulerConfiguration.PREFIX + B_PATH +
|
||||||
|
CapacitySchedulerConfiguration.DOT + CONFIG_NAME;
|
||||||
|
private static final String C_CONFIG_PATH = CapacitySchedulerConfiguration.PREFIX + C_PATH +
|
||||||
|
CapacitySchedulerConfiguration.DOT + CONFIG_NAME;
|
||||||
|
private static final String ROOT_QUEUES_PATH = CapacitySchedulerConfiguration.PREFIX +
|
||||||
|
CapacitySchedulerConfiguration.ROOT + CapacitySchedulerConfiguration.DOT +
|
||||||
|
CapacitySchedulerConfiguration.QUEUES;
|
||||||
|
|
||||||
|
private static final String A_INIT_CONFIG_VALUE = "aInitValue";
|
||||||
|
private static final String A_CONFIG_VALUE = "aValue";
|
||||||
|
private static final String B_INIT_CONFIG_VALUE = "bInitValue";
|
||||||
|
private static final String B_CONFIG_VALUE = "bValue";
|
||||||
|
private static final String C_CONFIG_VALUE = "cValue";
|
||||||
|
|
||||||
|
private CapacitySchedulerConfiguration csConfig;
|
||||||
|
|
||||||
|
@Before
|
||||||
|
public void setUp() {
|
||||||
|
csConfig = crateInitialCSConfig();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddQueue() throws Exception {
|
||||||
|
SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
|
||||||
|
Map<String, String> updateMap = new HashMap<>();
|
||||||
|
updateMap.put(CONFIG_NAME, C_CONFIG_VALUE);
|
||||||
|
QueueConfigInfo queueConfigInfo = new QueueConfigInfo(C_PATH, updateMap);
|
||||||
|
updateInfo.getAddQueueInfo().add(queueConfigInfo);
|
||||||
|
|
||||||
|
Map<String, String> configurationUpdate =
|
||||||
|
ConfigurationUpdateAssembler.constructKeyValueConfUpdate(csConfig, updateInfo);
|
||||||
|
|
||||||
|
assertEquals(C_CONFIG_VALUE, configurationUpdate.get(C_CONFIG_PATH));
|
||||||
|
assertEquals("a,b,c", configurationUpdate.get(ROOT_QUEUES_PATH));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddExistingQueue() {
|
||||||
|
SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
|
||||||
|
Map<String, String> updateMap = new HashMap<>();
|
||||||
|
updateMap.put(CONFIG_NAME, A_CONFIG_VALUE);
|
||||||
|
QueueConfigInfo queueConfigInfo = new QueueConfigInfo(A_PATH, updateMap);
|
||||||
|
updateInfo.getAddQueueInfo().add(queueConfigInfo);
|
||||||
|
|
||||||
|
assertThrows(IOException.class, () -> {
|
||||||
|
ConfigurationUpdateAssembler.constructKeyValueConfUpdate(csConfig, updateInfo);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testAddInvalidQueue() {
|
||||||
|
SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
|
||||||
|
Map<String, String> updateMap = new HashMap<>();
|
||||||
|
updateMap.put(CONFIG_NAME, A_CONFIG_VALUE);
|
||||||
|
QueueConfigInfo queueConfigInfo = new QueueConfigInfo("invalidPath", updateMap);
|
||||||
|
updateInfo.getAddQueueInfo().add(queueConfigInfo);
|
||||||
|
|
||||||
|
assertThrows(IOException.class, () -> {
|
||||||
|
ConfigurationUpdateAssembler.constructKeyValueConfUpdate(csConfig, updateInfo);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdateQueue() throws Exception {
|
||||||
|
SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
|
||||||
|
Map<String, String> updateMap = new HashMap<>();
|
||||||
|
updateMap.put(CONFIG_NAME, A_CONFIG_VALUE);
|
||||||
|
QueueConfigInfo queueAConfigInfo = new QueueConfigInfo(A_PATH, updateMap);
|
||||||
|
updateInfo.getUpdateQueueInfo().add(queueAConfigInfo);
|
||||||
|
|
||||||
|
Map<String, String> updateMapQueueB = new HashMap<>();
|
||||||
|
updateMapQueueB.put(CONFIG_NAME, B_CONFIG_VALUE);
|
||||||
|
QueueConfigInfo queueBConfigInfo = new QueueConfigInfo(B_PATH, updateMapQueueB);
|
||||||
|
|
||||||
|
updateInfo.getUpdateQueueInfo().add(queueBConfigInfo);
|
||||||
|
|
||||||
|
Map<String, String> configurationUpdate =
|
||||||
|
ConfigurationUpdateAssembler.constructKeyValueConfUpdate(csConfig, updateInfo);
|
||||||
|
|
||||||
|
assertEquals(A_CONFIG_VALUE, configurationUpdate.get(A_CONFIG_PATH));
|
||||||
|
assertEquals(B_CONFIG_VALUE, configurationUpdate.get(B_CONFIG_PATH));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRemoveQueue() throws Exception {
|
||||||
|
SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
|
||||||
|
updateInfo.getRemoveQueueInfo().add(A_PATH);
|
||||||
|
|
||||||
|
Map<String, String> configurationUpdate =
|
||||||
|
ConfigurationUpdateAssembler.constructKeyValueConfUpdate(csConfig, updateInfo);
|
||||||
|
|
||||||
|
assertTrue(configurationUpdate.containsKey(A_CONFIG_PATH));
|
||||||
|
assertNull(configurationUpdate.get(A_CONFIG_PATH));
|
||||||
|
assertEquals("b", configurationUpdate.get(ROOT_QUEUES_PATH));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRemoveInvalidQueue() {
|
||||||
|
SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
|
||||||
|
updateInfo.getRemoveQueueInfo().add("invalidPath");
|
||||||
|
|
||||||
|
assertThrows(IOException.class, () -> {
|
||||||
|
ConfigurationUpdateAssembler.constructKeyValueConfUpdate(csConfig, updateInfo);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRemoveNonExistingQueue() {
|
||||||
|
SchedConfUpdateInfo updateInfo = new SchedConfUpdateInfo();
|
||||||
|
updateInfo.getRemoveQueueInfo().add("root.d");
|
||||||
|
|
||||||
|
assertThrows(IOException.class, () -> {
|
||||||
|
ConfigurationUpdateAssembler.constructKeyValueConfUpdate(csConfig, updateInfo);
|
||||||
|
});
|
||||||
|
}
|
||||||
|
|
||||||
|
private CapacitySchedulerConfiguration crateInitialCSConfig() {
|
||||||
|
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
|
||||||
|
csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a, b"});
|
||||||
|
|
||||||
|
csConf.set(A_CONFIG_PATH, A_INIT_CONFIG_VALUE);
|
||||||
|
csConf.set(B_CONFIG_PATH, B_INIT_CONFIG_VALUE);
|
||||||
|
|
||||||
|
return csConf;
|
||||||
|
}
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user