YARN-10982. Replace all occurences of queuePath with the new QueuePath class. Contributed by Tibor Kovacs
This commit is contained in:
parent
9b9e2ef87f
commit
a0d8cde133
@ -82,7 +82,6 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||
protected final QueueAllocationSettings queueAllocationSettings;
|
||||
volatile CSQueue parent;
|
||||
protected final QueuePath queuePath;
|
||||
final String queueName;
|
||||
protected QueueNodeLabelsSettings queueNodeLabelsSettings;
|
||||
private volatile QueueAppLifetimeAndLimitSettings queueAppLifetimeSettings;
|
||||
private CSQueuePreemptionSettings preemptionSettings;
|
||||
@ -143,7 +142,6 @@ public AbstractCSQueue(CapacitySchedulerContext cs,
|
||||
this.labelManager = cs.getRMContext().getNodeLabelManager();
|
||||
this.parent = parent;
|
||||
this.queuePath = createQueuePath(parent, queueName);
|
||||
this.queueName = queuePath.getLeafName();
|
||||
this.resourceCalculator = cs.getResourceCalculator();
|
||||
this.activitiesManager = cs.getActivitiesManager();
|
||||
|
||||
@ -176,7 +174,7 @@ protected void setupConfigurableCapacities() {
|
||||
|
||||
protected void setupConfigurableCapacities(
|
||||
CapacitySchedulerConfiguration configuration) {
|
||||
CSQueueUtils.loadCapacitiesByLabelsFromConf(getQueuePath(), queueCapacities,
|
||||
CSQueueUtils.loadCapacitiesByLabelsFromConf(queuePath, queueCapacities,
|
||||
configuration, this.queueNodeLabelsSettings.getConfiguredNodeLabels());
|
||||
}
|
||||
|
||||
@ -185,6 +183,11 @@ public String getQueuePath() {
|
||||
return queuePath.getFullPath();
|
||||
}
|
||||
|
||||
@Override
|
||||
public QueuePath getQueuePathObject() {
|
||||
return this.queuePath;
|
||||
}
|
||||
|
||||
@Override
|
||||
public float getCapacity() {
|
||||
return queueCapacities.getCapacity();
|
||||
@ -241,7 +244,7 @@ public String getQueueShortName() {
|
||||
|
||||
@Override
|
||||
public String getQueueName() {
|
||||
return queueName;
|
||||
return this.queuePath.getLeafName();
|
||||
}
|
||||
|
||||
@Override
|
||||
@ -279,11 +282,11 @@ void setMaxCapacity(float maximumCapacity) {
|
||||
writeLock.lock();
|
||||
try {
|
||||
// Sanity check
|
||||
CSQueueUtils.checkMaxCapacity(getQueuePath(),
|
||||
CSQueueUtils.checkMaxCapacity(this.queuePath,
|
||||
queueCapacities.getCapacity(), maximumCapacity);
|
||||
float absMaxCapacity = CSQueueUtils.computeAbsoluteMaximumCapacity(
|
||||
maximumCapacity, parent);
|
||||
CSQueueUtils.checkAbsoluteCapacity(getQueuePath(),
|
||||
CSQueueUtils.checkAbsoluteCapacity(this.queuePath,
|
||||
queueCapacities.getAbsoluteCapacity(), absMaxCapacity);
|
||||
|
||||
queueCapacities.setMaximumCapacity(maximumCapacity);
|
||||
@ -301,11 +304,11 @@ void setMaxCapacity(String nodeLabel, float maximumCapacity) {
|
||||
writeLock.lock();
|
||||
try {
|
||||
// Sanity check
|
||||
CSQueueUtils.checkMaxCapacity(getQueuePath(),
|
||||
CSQueueUtils.checkMaxCapacity(this.queuePath,
|
||||
queueCapacities.getCapacity(nodeLabel), maximumCapacity);
|
||||
float absMaxCapacity = CSQueueUtils.computeAbsoluteMaximumCapacity(
|
||||
maximumCapacity, parent);
|
||||
CSQueueUtils.checkAbsoluteCapacity(getQueuePath(),
|
||||
CSQueueUtils.checkAbsoluteCapacity(this.queuePath,
|
||||
queueCapacities.getAbsoluteCapacity(nodeLabel), absMaxCapacity);
|
||||
|
||||
queueCapacities.setMaximumCapacity(maximumCapacity);
|
||||
@ -518,7 +521,7 @@ private void validateMinResourceIsNotGreaterThanMaxResource(Resource minResource
|
||||
|
||||
private void validateAbsoluteVsPercentageCapacityConfig(
|
||||
CapacityConfigType localType) {
|
||||
if (!getQueuePath().equals("root")
|
||||
if (!queuePath.isRoot()
|
||||
&& !this.capacityConfigType.equals(localType)) {
|
||||
throw new IllegalArgumentException("Queue '" + getQueuePath()
|
||||
+ "' should use either percentage based capacity"
|
||||
@ -623,8 +626,8 @@ protected QueueInfo getQueueInfo() {
|
||||
// consistency here.
|
||||
// TODO, improve this
|
||||
QueueInfo queueInfo = recordFactory.newRecordInstance(QueueInfo.class);
|
||||
queueInfo.setQueueName(queueName);
|
||||
queueInfo.setQueuePath(getQueuePath());
|
||||
queueInfo.setQueueName(queuePath.getLeafName());
|
||||
queueInfo.setQueuePath(queuePath.getFullPath());
|
||||
queueInfo.setAccessibleNodeLabels(queueNodeLabelsSettings.getAccessibleNodeLabels());
|
||||
queueInfo.setCapacity(queueCapacities.getCapacity());
|
||||
queueInfo.setMaximumCapacity(queueCapacities.getMaximumCapacity());
|
||||
|
@ -208,7 +208,7 @@ protected void validateQueueEntitlementChange(AbstractAutoCreatedLeafQueue
|
||||
if (!(newChildCap >= 0 && newChildCap < 1.0f + CSQueueUtils.EPSILON)) {
|
||||
throw new SchedulerDynamicEditException(
|
||||
"Sum of child queues should exceed 100% for auto creating parent "
|
||||
+ "queue : " + queueName);
|
||||
+ "queue : " + getQueueName());
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -20,13 +20,12 @@
|
||||
|
||||
import org.apache.hadoop.classification.VisibleForTesting;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.util.Lists;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.AUTO_QUEUE_CREATION_V2_PREFIX;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ROOT;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.getQueuePrefix;
|
||||
@ -50,7 +49,7 @@ public class AutoCreatedQueueTemplate {
|
||||
private final Map<String, String> parentOnlyProperties = new HashMap<>();
|
||||
|
||||
public AutoCreatedQueueTemplate(CapacitySchedulerConfiguration configuration,
|
||||
String queuePath) {
|
||||
QueuePath queuePath) {
|
||||
setTemplateConfigEntries(configuration, queuePath);
|
||||
}
|
||||
|
||||
@ -155,14 +154,13 @@ public void setTemplateEntriesForChild(CapacitySchedulerConfiguration conf,
|
||||
* yarn.scheduler.capacity.root.*.auto-queue-creation-v2.template.capacity
|
||||
*/
|
||||
private void setTemplateConfigEntries(CapacitySchedulerConfiguration configuration,
|
||||
String queuePath) {
|
||||
QueuePath queuePath) {
|
||||
ConfigurationProperties configurationProperties =
|
||||
configuration.getConfigurationProperties();
|
||||
|
||||
List<String> queuePathParts = new ArrayList<>(Arrays.asList(
|
||||
queuePath.split("\\.")));
|
||||
List<String> queuePathParts = Lists.newArrayList(queuePath.iterator());
|
||||
|
||||
if (queuePathParts.size() <= 1 && !queuePath.equals(ROOT)) {
|
||||
if (queuePathParts.size() <= 1 && !queuePath.isRoot()) {
|
||||
// This is an invalid queue path
|
||||
return;
|
||||
}
|
||||
@ -175,7 +173,7 @@ private void setTemplateConfigEntries(CapacitySchedulerConfiguration configurati
|
||||
int supportedWildcardLevel = Math.min(queuePathMaxIndex - 1,
|
||||
MAX_WILDCARD_LEVEL);
|
||||
// Allow root to have template properties
|
||||
if (queuePath.equals(ROOT)) {
|
||||
if (queuePath.isRoot()) {
|
||||
supportedWildcardLevel = 0;
|
||||
}
|
||||
|
||||
|
@ -89,6 +89,12 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
|
||||
*/
|
||||
public String getQueuePath();
|
||||
|
||||
/**
|
||||
* Gets the queue path object.
|
||||
* @return the object of the queue
|
||||
*/
|
||||
QueuePath getQueuePathObject();
|
||||
|
||||
public PrivilegedEntity getPrivilegedEntity();
|
||||
|
||||
Resource getMaximumAllocation();
|
||||
|
@ -36,7 +36,7 @@ public class CSQueueUtils {
|
||||
/*
|
||||
* Used only by tests
|
||||
*/
|
||||
public static void checkMaxCapacity(String queuePath,
|
||||
public static void checkMaxCapacity(QueuePath queuePath,
|
||||
float capacity, float maximumCapacity) {
|
||||
if (maximumCapacity < 0.0f || maximumCapacity > 1.0f) {
|
||||
throw new IllegalArgumentException(
|
||||
@ -48,7 +48,7 @@ public static void checkMaxCapacity(String queuePath,
|
||||
/*
|
||||
* Used only by tests
|
||||
*/
|
||||
public static void checkAbsoluteCapacity(String queuePath,
|
||||
public static void checkAbsoluteCapacity(QueuePath queuePath,
|
||||
float absCapacity, float absMaxCapacity) {
|
||||
if (absMaxCapacity < (absCapacity - EPSILON)) {
|
||||
throw new IllegalArgumentException("Illegal call to setMaxCapacity. "
|
||||
@ -67,7 +67,7 @@ public static float computeAbsoluteMaximumCapacity(
|
||||
}
|
||||
|
||||
public static void loadCapacitiesByLabelsFromConf(
|
||||
String queuePath, QueueCapacities queueCapacities,
|
||||
QueuePath queuePath, QueueCapacities queueCapacities,
|
||||
CapacitySchedulerConfiguration csConf, Set<String> nodeLabels) {
|
||||
queueCapacities.clearConfigurableFields();
|
||||
|
||||
@ -81,7 +81,7 @@ public static void loadCapacitiesByLabelsFromConf(
|
||||
label,
|
||||
csConf.getMaximumAMResourcePercentPerPartition(queuePath, label));
|
||||
queueCapacities.setWeight(label,
|
||||
csConf.getNonLabeledQueueWeight(queuePath));
|
||||
csConf.getNonLabeledQueueWeight(queuePath.getFullPath()));
|
||||
} else{
|
||||
queueCapacities.setCapacity(label,
|
||||
csConf.getLabeledQueueCapacity(queuePath, label) / 100);
|
||||
|
@ -542,15 +542,15 @@ public void setLabeledQueueWeight(String queue, String label, float weight) {
|
||||
set(getNodeLabelPrefix(queue, label) + CAPACITY, weight + WEIGHT_SUFFIX);
|
||||
}
|
||||
|
||||
public float getLabeledQueueWeight(String queue, String label) {
|
||||
String configuredValue = get(getNodeLabelPrefix(queue, label) + CAPACITY);
|
||||
public float getLabeledQueueWeight(QueuePath queue, String label) {
|
||||
String configuredValue = get(getNodeLabelPrefix(queue.getFullPath(), label) + CAPACITY);
|
||||
float weight = extractFloatValueFromWeightConfig(configuredValue);
|
||||
throwExceptionForUnexpectedWeight(weight, queue, label);
|
||||
throwExceptionForUnexpectedWeight(weight, queue.getFullPath(), label);
|
||||
return weight;
|
||||
}
|
||||
|
||||
public float getNonLabeledQueueCapacity(String queue) {
|
||||
String configuredCapacity = get(getQueuePrefix(queue) + CAPACITY);
|
||||
public float getNonLabeledQueueCapacity(QueuePath queue) {
|
||||
String configuredCapacity = get(getQueuePrefix(queue.getFullPath()) + CAPACITY);
|
||||
boolean absoluteResourceConfigured = (configuredCapacity != null)
|
||||
&& RESOURCE_PATTERN.matcher(configuredCapacity).find();
|
||||
if (absoluteResourceConfigured || configuredWeightAsCapacity(
|
||||
@ -559,10 +559,10 @@ public float getNonLabeledQueueCapacity(String queue) {
|
||||
// root.From AbstractCSQueue, absolute resource will be parsed and
|
||||
// updated. Once nodes are added/removed in cluster, capacity in
|
||||
// percentage will also be re-calculated.
|
||||
return queue.equals("root") ? 100.0f : 0f;
|
||||
return queue.isRoot() ? 100.0f : 0f;
|
||||
}
|
||||
|
||||
float capacity = queue.equals("root")
|
||||
float capacity = queue.isRoot()
|
||||
? 100.0f
|
||||
: (configuredCapacity == null)
|
||||
? 0f
|
||||
@ -573,7 +573,7 @@ public float getNonLabeledQueueCapacity(String queue) {
|
||||
"Illegal " + "capacity of " + capacity + " for queue " + queue);
|
||||
}
|
||||
LOG.debug("CSConf - getCapacity: queuePrefix={}, capacity={}",
|
||||
getQueuePrefix(queue), capacity);
|
||||
getQueuePrefix(queue.getFullPath()), capacity);
|
||||
|
||||
return capacity;
|
||||
}
|
||||
@ -601,8 +601,8 @@ public void setCapacity(String queue, String absoluteResourceCapacity) {
|
||||
|
||||
}
|
||||
|
||||
public float getNonLabeledQueueMaximumCapacity(String queue) {
|
||||
String configuredCapacity = get(getQueuePrefix(queue) + MAXIMUM_CAPACITY);
|
||||
public float getNonLabeledQueueMaximumCapacity(QueuePath queue) {
|
||||
String configuredCapacity = get(getQueuePrefix(queue.getFullPath()) + MAXIMUM_CAPACITY);
|
||||
boolean matcher = (configuredCapacity != null)
|
||||
&& RESOURCE_PATTERN.matcher(configuredCapacity).find();
|
||||
if (matcher) {
|
||||
@ -816,9 +816,9 @@ private float extractFloatValueFromWeightConfig(String configureValue) {
|
||||
}
|
||||
}
|
||||
|
||||
private float internalGetLabeledQueueCapacity(String queue, String label,
|
||||
private float internalGetLabeledQueueCapacity(QueuePath queue, String label,
|
||||
String suffix, float defaultValue) {
|
||||
String capacityPropertyName = getNodeLabelPrefix(queue, label) + suffix;
|
||||
String capacityPropertyName = getNodeLabelPrefix(queue.getFullPath(), label) + suffix;
|
||||
String configuredCapacity = get(capacityPropertyName);
|
||||
boolean absoluteResourceConfigured =
|
||||
(configuredCapacity != null) && RESOURCE_PATTERN.matcher(
|
||||
@ -829,10 +829,10 @@ private float internalGetLabeledQueueCapacity(String queue, String label,
|
||||
// root.From AbstractCSQueue, absolute resource, and weight will be parsed
|
||||
// and updated separately. Once nodes are added/removed in cluster,
|
||||
// capacity is percentage will also be re-calculated.
|
||||
return queue.equals("root") ? 100.0f : defaultValue;
|
||||
return queue.isRoot() ? 100.0f : defaultValue;
|
||||
}
|
||||
|
||||
float capacity = queue.equals("root") ? 100.0f
|
||||
float capacity = queue.isRoot() ? 100.0f
|
||||
: getFloat(capacityPropertyName, defaultValue);
|
||||
if (capacity < MINIMUM_CAPACITY_VALUE
|
||||
|| capacity > MAXIMUM_CAPACITY_VALUE) {
|
||||
@ -843,17 +843,17 @@ private float internalGetLabeledQueueCapacity(String queue, String label,
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug(
|
||||
"CSConf - getCapacityOfLabel: prefix=" + getNodeLabelPrefix(queue,
|
||||
"CSConf - getCapacityOfLabel: prefix=" + getNodeLabelPrefix(queue.getFullPath(),
|
||||
label) + ", capacity=" + capacity);
|
||||
}
|
||||
return capacity;
|
||||
}
|
||||
|
||||
public float getLabeledQueueCapacity(String queue, String label) {
|
||||
public float getLabeledQueueCapacity(QueuePath queue, String label) {
|
||||
return internalGetLabeledQueueCapacity(queue, label, CAPACITY, 0f);
|
||||
}
|
||||
|
||||
public float getLabeledQueueMaximumCapacity(String queue, String label) {
|
||||
public float getLabeledQueueMaximumCapacity(QueuePath queue, String label) {
|
||||
return internalGetLabeledQueueCapacity(queue, label, MAXIMUM_CAPACITY, 100f);
|
||||
}
|
||||
|
||||
@ -870,13 +870,13 @@ public void setDefaultNodeLabelExpression(String queue, String exp) {
|
||||
set(getQueuePrefix(queue) + DEFAULT_NODE_LABEL_EXPRESSION, exp);
|
||||
}
|
||||
|
||||
public float getMaximumAMResourcePercentPerPartition(String queue,
|
||||
public float getMaximumAMResourcePercentPerPartition(QueuePath queue,
|
||||
String label) {
|
||||
// If per-partition max-am-resource-percent is not configured,
|
||||
// use default value as max-am-resource-percent for this queue.
|
||||
return getFloat(getNodeLabelPrefix(queue, label)
|
||||
return getFloat(getNodeLabelPrefix(queue.getFullPath(), label)
|
||||
+ MAXIMUM_AM_RESOURCE_SUFFIX,
|
||||
getMaximumApplicationMasterResourcePerQueuePercent(queue));
|
||||
getMaximumApplicationMasterResourcePerQueuePercent(queue.getFullPath()));
|
||||
}
|
||||
|
||||
public void setMaximumAMResourcePercentPerPartition(String queue,
|
||||
@ -2189,6 +2189,11 @@ public String getAutoCreatedQueueTemplateConfPrefix(String queuePath) {
|
||||
return queuePath + DOT + AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX;
|
||||
}
|
||||
|
||||
@Private
|
||||
public QueuePath getAutoCreatedQueueObjectTemplateConfPrefix(String queuePath) {
|
||||
return new QueuePath(queuePath, AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX);
|
||||
}
|
||||
|
||||
@Private
|
||||
public static final String FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY =
|
||||
"auto-create-child-queue.fail-on-exceeding-parent-capacity";
|
||||
@ -2565,13 +2570,13 @@ public Resource getMaximumResourceRequirement(String label, String queue,
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setMinimumResourceRequirement(String label, String queue,
|
||||
public void setMinimumResourceRequirement(String label, QueuePath queue,
|
||||
Resource resource) {
|
||||
updateMinMaxResourceToConf(label, queue, resource, CAPACITY);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setMaximumResourceRequirement(String label, String queue,
|
||||
public void setMaximumResourceRequirement(String label, QueuePath queue,
|
||||
Resource resource) {
|
||||
updateMinMaxResourceToConf(label, queue, resource, MAXIMUM_CAPACITY);
|
||||
}
|
||||
@ -2586,9 +2591,9 @@ public Map<String, QueueCapacityVector> parseConfiguredResourceVector(
|
||||
return queueResourceVectors;
|
||||
}
|
||||
|
||||
private void updateMinMaxResourceToConf(String label, String queue,
|
||||
private void updateMinMaxResourceToConf(String label, QueuePath queue,
|
||||
Resource resource, String type) {
|
||||
if (queue.equals("root")) {
|
||||
if (queue.isRoot()) {
|
||||
throw new IllegalArgumentException(
|
||||
"Cannot set resource, root queue will take 100% of cluster capacity");
|
||||
}
|
||||
@ -2603,9 +2608,9 @@ private void updateMinMaxResourceToConf(String label, String queue,
|
||||
+ ResourceUtils.
|
||||
getCustomResourcesStrings(resource) + "]");
|
||||
|
||||
String prefix = getQueuePrefix(queue) + type;
|
||||
String prefix = getQueuePrefix(queue.getFullPath()) + type;
|
||||
if (!label.isEmpty()) {
|
||||
prefix = getQueuePrefix(queue) + ACCESSIBLE_NODE_LABELS + DOT + label
|
||||
prefix = getQueuePrefix(queue.getFullPath()) + ACCESSIBLE_NODE_LABELS + DOT + label
|
||||
+ DOT + type;
|
||||
}
|
||||
set(prefix, resourceString.toString());
|
||||
|
@ -23,7 +23,6 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
||||
.SchedulerDynamicEditException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue.CapacityConfigType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.queuemanagement.GuaranteedOrZeroCapacityOverTimePolicy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica
|
||||
.FiCaSchedulerApp;
|
||||
@ -122,7 +121,7 @@ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource)
|
||||
LOG.info(
|
||||
"Reinitialized Managed Parent Queue: [{}] with capacity [{}]"
|
||||
+ " with max capacity [{}]",
|
||||
queueName, super.getCapacity(), super.getMaximumCapacity());
|
||||
getQueueName(), super.getCapacity(), super.getMaximumCapacity());
|
||||
} catch (YarnException ye) {
|
||||
LOG.error("Exception while computing policy changes for leaf queue : "
|
||||
+ getQueuePath(), ye);
|
||||
@ -165,12 +164,12 @@ protected AutoCreatedLeafQueueConfig.Builder initializeLeafQueueConfigs() throws
|
||||
CapacitySchedulerConfiguration conf =
|
||||
super.initializeLeafQueueConfigs(leafQueueTemplateConfPrefix);
|
||||
builder.configuration(conf);
|
||||
String templateQueuePath = csContext.getConfiguration()
|
||||
.getAutoCreatedQueueTemplateConfPrefix(getQueuePath());
|
||||
QueuePath templateQueuePath = csContext.getConfiguration()
|
||||
.getAutoCreatedQueueObjectTemplateConfPrefix(getQueuePath());
|
||||
|
||||
Set<String> templateConfiguredNodeLabels = csContext
|
||||
.getCapacitySchedulerQueueManager().getConfiguredNodeLabels()
|
||||
.getLabelsByQueue(templateQueuePath);
|
||||
.getLabelsByQueue(templateQueuePath.getFullPath());
|
||||
for (String nodeLabel : templateConfiguredNodeLabels) {
|
||||
Resource templateMinResource = conf.getMinimumResourceRequirement(
|
||||
nodeLabel, csContext.getConfiguration()
|
||||
|
@ -128,7 +128,7 @@ private ParentQueue(CapacitySchedulerContext cs,
|
||||
this.scheduler = cs;
|
||||
this.rootQueue = (parent == null);
|
||||
|
||||
float rawCapacity = csConf.getNonLabeledQueueCapacity(getQueuePath());
|
||||
float rawCapacity = csConf.getNonLabeledQueueCapacity(this.queuePath);
|
||||
|
||||
if (rootQueue &&
|
||||
(rawCapacity != CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE)) {
|
||||
@ -161,7 +161,7 @@ protected void setupQueueConfigs(Resource clusterResource,
|
||||
writeLock.lock();
|
||||
try {
|
||||
autoCreatedQueueTemplate = new AutoCreatedQueueTemplate(
|
||||
csConf, getQueuePath());
|
||||
csConf, this.queuePath);
|
||||
super.setupQueueConfigs(clusterResource, csConf);
|
||||
StringBuilder aclsString = new StringBuilder();
|
||||
for (Map.Entry<AccessType, AccessControlList> e : acls.entrySet()) {
|
||||
@ -182,7 +182,7 @@ protected void setupQueueConfigs(Resource clusterResource,
|
||||
((ParentQueue) parent).getQueueOrderingPolicyConfigName());
|
||||
queueOrderingPolicy.setQueues(childQueues);
|
||||
|
||||
LOG.info(queueName + ", " + getCapacityOrWeightString()
|
||||
LOG.info(getQueueName() + ", " + getCapacityOrWeightString()
|
||||
+ ", absoluteCapacity=" + this.queueCapacities.getAbsoluteCapacity()
|
||||
+ ", maxCapacity=" + this.queueCapacities.getMaximumCapacity()
|
||||
+ ", absoluteMaxCapacity=" + this.queueCapacities
|
||||
@ -333,7 +333,7 @@ void setChildQueues(Collection<CSQueue> childQueues) throws IOException {
|
||||
throw new IOException(
|
||||
"Parent Queues" + " capacity: " + parentMinResource
|
||||
+ " is less than" + " to its children:" + minRes
|
||||
+ " for queue:" + queueName);
|
||||
+ " for queue:" + getQueueName());
|
||||
}
|
||||
}
|
||||
}
|
||||
@ -355,7 +355,7 @@ void setChildQueues(Collection<CSQueue> childQueues) throws IOException {
|
||||
// It is wrong when percent sum != {0, 1}
|
||||
throw new IOException(
|
||||
"Illegal" + " capacity sum of " + childrenPctSum
|
||||
+ " for children of queue " + queueName + " for label="
|
||||
+ " for children of queue " + getQueueName() + " for label="
|
||||
+ nodeLabel + ". It should be either 0 or 1.0");
|
||||
} else{
|
||||
// We also allow children's percent sum = 0 under the following
|
||||
@ -368,7 +368,7 @@ void setChildQueues(Collection<CSQueue> childQueues) throws IOException {
|
||||
> PRECISION) && (!allowZeroCapacitySum)) {
|
||||
throw new IOException(
|
||||
"Illegal" + " capacity sum of " + childrenPctSum
|
||||
+ " for children of queue " + queueName
|
||||
+ " for children of queue " + getQueueName()
|
||||
+ " for label=" + nodeLabel
|
||||
+ ". It is set to 0, but parent percent != 0, and "
|
||||
+ "doesn't allow children capacity to set to 0");
|
||||
@ -383,8 +383,8 @@ void setChildQueues(Collection<CSQueue> childQueues) throws IOException {
|
||||
&& !allowZeroCapacitySum) {
|
||||
throw new IOException(
|
||||
"Illegal" + " capacity sum of " + childrenPctSum
|
||||
+ " for children of queue " + queueName + " for label="
|
||||
+ nodeLabel + ". queue=" + queueName
|
||||
+ " for children of queue " + getQueueName() + " for label="
|
||||
+ nodeLabel + ". queue=" + getQueueName()
|
||||
+ " has zero capacity, but child"
|
||||
+ "queues have positive capacities");
|
||||
}
|
||||
@ -470,7 +470,7 @@ public List<QueueUserACLInfo> getQueueUserAclInfo(
|
||||
}
|
||||
|
||||
public String toString() {
|
||||
return queueName + ": " +
|
||||
return getQueueName() + ": " +
|
||||
"numChildQueue= " + childQueues.size() + ", " +
|
||||
getCapacityOrWeightString() + ", " +
|
||||
"absoluteCapacity=" + queueCapacities.getAbsoluteCapacity() + ", " +
|
||||
@ -768,9 +768,9 @@ public void validateSubmitApplication(ApplicationId applicationId,
|
||||
String userName, String queue) throws AccessControlException {
|
||||
writeLock.lock();
|
||||
try {
|
||||
if (queue.equals(queueName)) {
|
||||
if (queue.equals(getQueueName())) {
|
||||
throw new AccessControlException(
|
||||
"Cannot submit application " + "to non-leaf queue: " + queueName);
|
||||
"Cannot submit application " + "to non-leaf queue: " + getQueueName());
|
||||
}
|
||||
|
||||
if (getState() != QueueState.RUNNING) {
|
||||
|
@ -24,6 +24,7 @@
|
||||
import java.util.Objects;
|
||||
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ROOT;
|
||||
|
||||
/**
|
||||
* This is a helper class which represents a queue path, and has easy access
|
||||
@ -59,6 +60,15 @@ public QueuePath(String fullPath) {
|
||||
setFromFullPath(fullPath);
|
||||
}
|
||||
|
||||
/**
|
||||
* Concatenate queue path parts into one queue path string.
|
||||
* @param parts Parts of the full queue pathAutoCreatedQueueTemplate
|
||||
* @return full path of the given queue parts
|
||||
*/
|
||||
public static String concatenatePath(String... parts) {
|
||||
return String.join(DOT, parts);
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is responsible for splitting up a full queue path into parent
|
||||
* path and leaf name.
|
||||
@ -68,6 +78,11 @@ private void setFromFullPath(String fullPath) {
|
||||
parent = null;
|
||||
leaf = fullPath;
|
||||
|
||||
if (leaf == null) {
|
||||
leaf = "";
|
||||
return;
|
||||
}
|
||||
|
||||
int lastDotIdx = fullPath.lastIndexOf(DOT);
|
||||
if (lastDotIdx > -1) {
|
||||
parent = fullPath.substring(0, lastDotIdx).trim();
|
||||
@ -121,6 +136,14 @@ public boolean hasParent() {
|
||||
return parent != null;
|
||||
}
|
||||
|
||||
/**
|
||||
* Convenience getter to check if the queue is the root queue.
|
||||
* @return True if the path is root
|
||||
*/
|
||||
public boolean isRoot() {
|
||||
return !hasParent() && leaf.equals(ROOT);
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a new {@code QueuePath} from the current full path as parent, and
|
||||
* the appended child queue path as leaf.
|
||||
|
@ -44,18 +44,21 @@ public class TestAbsoluteResourceConfiguration {
|
||||
private static final String QUEUEA2 = "queueA2";
|
||||
private static final String QUEUEB1 = "queueB1";
|
||||
|
||||
private static final String QUEUEA_FULL = CapacitySchedulerConfiguration.ROOT
|
||||
+ "." + QUEUEA;
|
||||
private static final String QUEUEB_FULL = CapacitySchedulerConfiguration.ROOT
|
||||
+ "." + QUEUEB;
|
||||
private static final String QUEUEC_FULL = CapacitySchedulerConfiguration.ROOT
|
||||
+ "." + QUEUEC;
|
||||
private static final String QUEUED_FULL = CapacitySchedulerConfiguration.ROOT
|
||||
+ "." + QUEUED;
|
||||
private static final QueuePath QUEUEA_FULL =
|
||||
new QueuePath(CapacitySchedulerConfiguration.ROOT, QUEUEA);
|
||||
private static final QueuePath QUEUEB_FULL =
|
||||
new QueuePath(CapacitySchedulerConfiguration.ROOT, QUEUEB);
|
||||
private static final QueuePath QUEUEC_FULL =
|
||||
new QueuePath(CapacitySchedulerConfiguration.ROOT, QUEUEC);
|
||||
private static final QueuePath QUEUED_FULL =
|
||||
new QueuePath(CapacitySchedulerConfiguration.ROOT, QUEUED);
|
||||
|
||||
private static final String QUEUEA1_FULL = QUEUEA_FULL + "." + QUEUEA1;
|
||||
private static final String QUEUEA2_FULL = QUEUEA_FULL + "." + QUEUEA2;
|
||||
private static final String QUEUEB1_FULL = QUEUEB_FULL + "." + QUEUEB1;
|
||||
private static final QueuePath QUEUEA1_FULL =
|
||||
new QueuePath(QUEUEA_FULL.getFullPath() + "." + QUEUEA1);
|
||||
private static final QueuePath QUEUEA2_FULL =
|
||||
new QueuePath(QUEUEA_FULL.getFullPath() + "." + QUEUEA2);
|
||||
private static final QueuePath QUEUEB1_FULL =
|
||||
new QueuePath(QUEUEB_FULL.getFullPath() + "." + QUEUEB1);
|
||||
|
||||
private static final Resource QUEUE_A_MINRES = Resource.newInstance(100 * GB,
|
||||
10);
|
||||
@ -100,18 +103,18 @@ private CapacitySchedulerConfiguration setupSimpleQueueConfiguration(
|
||||
|
||||
// Set default capacities like normal configuration.
|
||||
if (isCapacityNeeded) {
|
||||
csConf.setCapacity(QUEUEA_FULL, 50f);
|
||||
csConf.setCapacity(QUEUEB_FULL, 25f);
|
||||
csConf.setCapacity(QUEUEC_FULL, 25f);
|
||||
csConf.setCapacity(QUEUED_FULL, 25f);
|
||||
csConf.setCapacity(QUEUEA_FULL.getFullPath(), 50f);
|
||||
csConf.setCapacity(QUEUEB_FULL.getFullPath(), 25f);
|
||||
csConf.setCapacity(QUEUEC_FULL.getFullPath(), 25f);
|
||||
csConf.setCapacity(QUEUED_FULL.getFullPath(), 25f);
|
||||
}
|
||||
|
||||
csConf.setAutoCreateChildQueueEnabled(QUEUED_FULL, true);
|
||||
csConf.setAutoCreateChildQueueEnabled(QUEUED_FULL.getFullPath(), true);
|
||||
|
||||
// Setup leaf queue template configs
|
||||
csConf.setAutoCreatedLeafQueueTemplateCapacityByLabel(QUEUED_FULL, "",
|
||||
csConf.setAutoCreatedLeafQueueTemplateCapacityByLabel(QUEUED_FULL.getFullPath(), "",
|
||||
QUEUE_D_TEMPL_MINRES);
|
||||
csConf.setAutoCreatedLeafQueueTemplateMaxCapacity(QUEUED_FULL, "",
|
||||
csConf.setAutoCreatedLeafQueueTemplateMaxCapacity(QUEUED_FULL.getFullPath(), "",
|
||||
QUEUE_D_TEMPL_MAXRES);
|
||||
|
||||
return csConf;
|
||||
@ -122,17 +125,17 @@ private CapacitySchedulerConfiguration setupComplexQueueConfiguration(
|
||||
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
|
||||
csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
|
||||
new String[]{QUEUEA, QUEUEB, QUEUEC});
|
||||
csConf.setQueues(QUEUEA_FULL, new String[]{QUEUEA1, QUEUEA2});
|
||||
csConf.setQueues(QUEUEB_FULL, new String[]{QUEUEB1});
|
||||
csConf.setQueues(QUEUEA_FULL.getFullPath(), new String[]{QUEUEA1, QUEUEA2});
|
||||
csConf.setQueues(QUEUEB_FULL.getFullPath(), new String[]{QUEUEB1});
|
||||
|
||||
// Set default capacities like normal configuration.
|
||||
if (isCapacityNeeded) {
|
||||
csConf.setCapacity(QUEUEA_FULL, 50f);
|
||||
csConf.setCapacity(QUEUEB_FULL, 25f);
|
||||
csConf.setCapacity(QUEUEC_FULL, 25f);
|
||||
csConf.setCapacity(QUEUEA1_FULL, 50f);
|
||||
csConf.setCapacity(QUEUEA2_FULL, 50f);
|
||||
csConf.setCapacity(QUEUEB1_FULL, 100f);
|
||||
csConf.setCapacity(QUEUEA_FULL.getFullPath(), 50f);
|
||||
csConf.setCapacity(QUEUEB_FULL.getFullPath(), 25f);
|
||||
csConf.setCapacity(QUEUEC_FULL.getFullPath(), 25f);
|
||||
csConf.setCapacity(QUEUEA1_FULL.getFullPath(), 50f);
|
||||
csConf.setCapacity(QUEUEA2_FULL.getFullPath(), 50f);
|
||||
csConf.setCapacity(QUEUEB1_FULL.getFullPath(), 100f);
|
||||
}
|
||||
|
||||
return csConf;
|
||||
@ -140,6 +143,7 @@ private CapacitySchedulerConfiguration setupComplexQueueConfiguration(
|
||||
|
||||
private CapacitySchedulerConfiguration setupMinMaxResourceConfiguration(
|
||||
CapacitySchedulerConfiguration csConf) {
|
||||
|
||||
// Update min/max resource to queueA/B/C
|
||||
csConf.setMinimumResourceRequirement("", QUEUEA_FULL, QUEUE_A_MINRES);
|
||||
csConf.setMinimumResourceRequirement("", QUEUEB_FULL, QUEUE_B_MINRES);
|
||||
@ -180,22 +184,22 @@ public void testSimpleMinMaxResourceConfigurartionPerQueue()
|
||||
|
||||
Assert.assertEquals("Min resource configured for QUEUEA is not correct",
|
||||
QUEUE_A_MINRES,
|
||||
csConf.getMinimumResourceRequirement("", QUEUEA_FULL, resourceTypes));
|
||||
csConf.getMinimumResourceRequirement("", QUEUEA_FULL.getFullPath(), resourceTypes));
|
||||
Assert.assertEquals("Max resource configured for QUEUEA is not correct",
|
||||
QUEUE_A_MAXRES,
|
||||
csConf.getMaximumResourceRequirement("", QUEUEA_FULL, resourceTypes));
|
||||
csConf.getMaximumResourceRequirement("", QUEUEA_FULL.getFullPath(), resourceTypes));
|
||||
Assert.assertEquals("Min resource configured for QUEUEB is not correct",
|
||||
QUEUE_B_MINRES,
|
||||
csConf.getMinimumResourceRequirement("", QUEUEB_FULL, resourceTypes));
|
||||
csConf.getMinimumResourceRequirement("", QUEUEB_FULL.getFullPath(), resourceTypes));
|
||||
Assert.assertEquals("Max resource configured for QUEUEB is not correct",
|
||||
QUEUE_B_MAXRES,
|
||||
csConf.getMaximumResourceRequirement("", QUEUEB_FULL, resourceTypes));
|
||||
csConf.getMaximumResourceRequirement("", QUEUEB_FULL.getFullPath(), resourceTypes));
|
||||
Assert.assertEquals("Min resource configured for QUEUEC is not correct",
|
||||
QUEUE_C_MINRES,
|
||||
csConf.getMinimumResourceRequirement("", QUEUEC_FULL, resourceTypes));
|
||||
csConf.getMinimumResourceRequirement("", QUEUEC_FULL.getFullPath(), resourceTypes));
|
||||
Assert.assertEquals("Max resource configured for QUEUEC is not correct",
|
||||
QUEUE_C_MAXRES,
|
||||
csConf.getMaximumResourceRequirement("", QUEUEC_FULL, resourceTypes));
|
||||
csConf.getMaximumResourceRequirement("", QUEUEC_FULL.getFullPath(), resourceTypes));
|
||||
|
||||
csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||
ResourceScheduler.class);
|
||||
@ -487,9 +491,9 @@ public void testComplexValidateAbsoluteResourceConfig() throws Exception {
|
||||
// 1. Explicitly set percentage based config for parent queues. This will
|
||||
// make Queue A,B and C with percentage based and A1,A2 or B1 with absolute
|
||||
// resource.
|
||||
csConf.setCapacity(QUEUEA_FULL, 50f);
|
||||
csConf.setCapacity(QUEUEB_FULL, 25f);
|
||||
csConf.setCapacity(QUEUEC_FULL, 25f);
|
||||
csConf.setCapacity(QUEUEA_FULL.getFullPath(), 50f);
|
||||
csConf.setCapacity(QUEUEB_FULL.getFullPath(), 25f);
|
||||
csConf.setCapacity(QUEUEC_FULL.getFullPath(), 25f);
|
||||
|
||||
// Get queue object to verify min/max resource configuration.
|
||||
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||
@ -534,13 +538,13 @@ public void testValidateAbsoluteResourceConfig() throws Exception {
|
||||
new CapacitySchedulerConfiguration();
|
||||
csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
|
||||
new String[] {QUEUEA, QUEUEB});
|
||||
csConf.setQueues(QUEUEA_FULL, new String[] {QUEUEA1, QUEUEA2});
|
||||
csConf.setQueues(QUEUEA_FULL.getFullPath(), new String[] {QUEUEA1, QUEUEA2});
|
||||
|
||||
// Set default capacities like normal configuration.
|
||||
csConf.setCapacity(QUEUEA_FULL, "[memory=125]");
|
||||
csConf.setCapacity(QUEUEB_FULL, "[memory=0]");
|
||||
csConf.setCapacity(QUEUEA1_FULL, "[memory=100]");
|
||||
csConf.setCapacity(QUEUEA2_FULL, "[memory=25]");
|
||||
csConf.setCapacity(QUEUEA_FULL.getFullPath(), "[memory=125]");
|
||||
csConf.setCapacity(QUEUEB_FULL.getFullPath(), "[memory=0]");
|
||||
csConf.setCapacity(QUEUEA1_FULL.getFullPath(), "[memory=100]");
|
||||
csConf.setCapacity(QUEUEA2_FULL.getFullPath(), "[memory=25]");
|
||||
|
||||
// Update min/max resource to queueA
|
||||
csConf.setMinimumResourceRequirement("", QUEUEA_FULL, QUEUE_A_MINRES);
|
||||
@ -560,8 +564,8 @@ public void testValidateAbsoluteResourceConfig() throws Exception {
|
||||
// doesnt throw exception saying "Parent queue 'root.A' and
|
||||
// child queue 'root.A.A2' should use either percentage
|
||||
// based capacityconfiguration or absolute resource together for label"
|
||||
csConf.setCapacity(QUEUEA1_FULL, "[memory=125]");
|
||||
csConf.setCapacity(QUEUEA2_FULL, "[memory=0]");
|
||||
csConf.setCapacity(QUEUEA1_FULL.getFullPath(), "[memory=125]");
|
||||
csConf.setCapacity(QUEUEA2_FULL.getFullPath(), "[memory=0]");
|
||||
|
||||
// Get queue object to verify min/max resource configuration.
|
||||
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||
|
@ -96,15 +96,15 @@ public void setUp() throws Exception {
|
||||
private CapacitySchedulerConfiguration setupMinMaxResourceConfiguration(
|
||||
CapacitySchedulerConfiguration csConf) {
|
||||
// Update min/max resource to queueA/B/C
|
||||
csConf.setMinimumResourceRequirement("", QUEUEA_FULL, QUEUE_A_MINRES);
|
||||
csConf.setMinimumResourceRequirement("", QUEUEB_FULL, QUEUE_B_MINRES);
|
||||
csConf.setMinimumResourceRequirement("", QUEUEC_FULL, QUEUE_C_MINRES);
|
||||
csConf.setMinimumResourceRequirement("", QUEUED_FULL, QUEUE_D_MINRES);
|
||||
csConf.setMinimumResourceRequirement("", new QueuePath(QUEUEA_FULL), QUEUE_A_MINRES);
|
||||
csConf.setMinimumResourceRequirement("", new QueuePath(QUEUEB_FULL), QUEUE_B_MINRES);
|
||||
csConf.setMinimumResourceRequirement("", new QueuePath(QUEUEC_FULL), QUEUE_C_MINRES);
|
||||
csConf.setMinimumResourceRequirement("", new QueuePath(QUEUED_FULL), QUEUE_D_MINRES);
|
||||
|
||||
csConf.setMaximumResourceRequirement("", QUEUEA_FULL, QUEUE_A_MAXRES);
|
||||
csConf.setMaximumResourceRequirement("", QUEUEB_FULL, QUEUE_B_MAXRES);
|
||||
csConf.setMaximumResourceRequirement("", QUEUEC_FULL, QUEUE_C_MAXRES);
|
||||
csConf.setMaximumResourceRequirement("", QUEUED_FULL, QUEUE_D_MAXRES);
|
||||
csConf.setMaximumResourceRequirement("", new QueuePath(QUEUEA_FULL), QUEUE_A_MAXRES);
|
||||
csConf.setMaximumResourceRequirement("", new QueuePath(QUEUEB_FULL), QUEUE_B_MAXRES);
|
||||
csConf.setMaximumResourceRequirement("", new QueuePath(QUEUEC_FULL), QUEUE_C_MAXRES);
|
||||
csConf.setMaximumResourceRequirement("", new QueuePath(QUEUED_FULL), QUEUE_D_MAXRES);
|
||||
|
||||
return csConf;
|
||||
}
|
||||
|
@ -25,11 +25,12 @@
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.AUTO_CREATE_CHILD_QUEUE_AUTO_REMOVAL_ENABLE;
|
||||
|
||||
public class TestAutoCreatedQueueTemplate {
|
||||
private static final String TEST_QUEUE_ABC = "root.a.b.c";
|
||||
private static final String TEST_QUEUE_AB = "root.a.b";
|
||||
private static final String TEST_QUEUE_A = "root.a";
|
||||
private static final String TEST_QUEUE_B = "root.b";
|
||||
private static final QueuePath TEST_QUEUE_ABC = new QueuePath("root.a.b.c");
|
||||
private static final QueuePath TEST_QUEUE_AB = new QueuePath("root.a.b");
|
||||
private static final QueuePath TEST_QUEUE_A = new QueuePath("root.a");
|
||||
private static final QueuePath TEST_QUEUE_B = new QueuePath("root.b");
|
||||
private static final String ROOT = "root";
|
||||
|
||||
private CapacitySchedulerConfiguration conf;
|
||||
|
||||
@Before
|
||||
@ -43,13 +44,13 @@ public void setUp() throws Exception {
|
||||
|
||||
@Test
|
||||
public void testNonWildCardTemplate() {
|
||||
conf.set(getTemplateKey(TEST_QUEUE_AB, "capacity"), "6w");
|
||||
conf.set(getTemplateKey(TEST_QUEUE_AB.getFullPath(), "capacity"), "6w");
|
||||
AutoCreatedQueueTemplate template =
|
||||
new AutoCreatedQueueTemplate(conf, TEST_QUEUE_AB);
|
||||
template.setTemplateEntriesForChild(conf, TEST_QUEUE_ABC);
|
||||
template.setTemplateEntriesForChild(conf, TEST_QUEUE_ABC.getFullPath());
|
||||
|
||||
Assert.assertEquals("weight is not set", 6f,
|
||||
conf.getNonLabeledQueueWeight(TEST_QUEUE_ABC), 10e-6);
|
||||
conf.getNonLabeledQueueWeight(TEST_QUEUE_ABC.getFullPath()), 10e-6);
|
||||
|
||||
}
|
||||
|
||||
@ -58,10 +59,10 @@ public void testOneLevelWildcardTemplate() {
|
||||
conf.set(getTemplateKey("root.a.*", "capacity"), "6w");
|
||||
AutoCreatedQueueTemplate template =
|
||||
new AutoCreatedQueueTemplate(conf, TEST_QUEUE_AB);
|
||||
template.setTemplateEntriesForChild(conf, TEST_QUEUE_ABC);
|
||||
template.setTemplateEntriesForChild(conf, TEST_QUEUE_ABC.getFullPath());
|
||||
|
||||
Assert.assertEquals("weight is not set", 6f,
|
||||
conf.getNonLabeledQueueWeight(TEST_QUEUE_ABC), 10e-6);
|
||||
conf.getNonLabeledQueueWeight(TEST_QUEUE_ABC.getFullPath()), 10e-6);
|
||||
|
||||
}
|
||||
|
||||
@ -69,18 +70,18 @@ public void testOneLevelWildcardTemplate() {
|
||||
public void testIgnoredWhenRootWildcarded() {
|
||||
conf.set(getTemplateKey("*", "capacity"), "6w");
|
||||
AutoCreatedQueueTemplate template =
|
||||
new AutoCreatedQueueTemplate(conf, ROOT);
|
||||
template.setTemplateEntriesForChild(conf, TEST_QUEUE_A);
|
||||
new AutoCreatedQueueTemplate(conf, new QueuePath(ROOT));
|
||||
template.setTemplateEntriesForChild(conf, TEST_QUEUE_A.getFullPath());
|
||||
|
||||
Assert.assertEquals("weight is set", -1f,
|
||||
conf.getNonLabeledQueueWeight(TEST_QUEUE_A), 10e-6);
|
||||
conf.getNonLabeledQueueWeight(TEST_QUEUE_A.getFullPath()), 10e-6);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIgnoredWhenNoParent() {
|
||||
conf.set(getTemplateKey("root", "capacity"), "6w");
|
||||
AutoCreatedQueueTemplate template =
|
||||
new AutoCreatedQueueTemplate(conf, ROOT);
|
||||
new AutoCreatedQueueTemplate(conf, new QueuePath(ROOT));
|
||||
template.setTemplateEntriesForChild(conf, ROOT);
|
||||
|
||||
Assert.assertEquals("weight is set", -1f,
|
||||
@ -95,21 +96,21 @@ public void testTemplatePrecedence() {
|
||||
|
||||
AutoCreatedQueueTemplate template =
|
||||
new AutoCreatedQueueTemplate(conf, TEST_QUEUE_AB);
|
||||
template.setTemplateEntriesForChild(conf, TEST_QUEUE_ABC);
|
||||
template.setTemplateEntriesForChild(conf, TEST_QUEUE_ABC.getFullPath());
|
||||
|
||||
Assert.assertEquals(
|
||||
"explicit template does not have the highest precedence", 6f,
|
||||
conf.getNonLabeledQueueWeight(TEST_QUEUE_ABC), 10e-6);
|
||||
conf.getNonLabeledQueueWeight(TEST_QUEUE_ABC.getFullPath()), 10e-6);
|
||||
|
||||
CapacitySchedulerConfiguration newConf =
|
||||
new CapacitySchedulerConfiguration();
|
||||
newConf.set(getTemplateKey("root.a.*", "capacity"), "4w");
|
||||
template =
|
||||
new AutoCreatedQueueTemplate(newConf, TEST_QUEUE_AB);
|
||||
template.setTemplateEntriesForChild(newConf, TEST_QUEUE_ABC);
|
||||
template.setTemplateEntriesForChild(newConf, TEST_QUEUE_ABC.getFullPath());
|
||||
|
||||
Assert.assertEquals("precedence is invalid", 4f,
|
||||
newConf.getNonLabeledQueueWeight(TEST_QUEUE_ABC), 10e-6);
|
||||
newConf.getNonLabeledQueueWeight(TEST_QUEUE_ABC.getFullPath()), 10e-6);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -117,10 +118,10 @@ public void testRootTemplate() {
|
||||
conf.set(getTemplateKey("root", "capacity"), "2w");
|
||||
|
||||
AutoCreatedQueueTemplate template =
|
||||
new AutoCreatedQueueTemplate(conf, ROOT);
|
||||
template.setTemplateEntriesForChild(conf, TEST_QUEUE_A);
|
||||
new AutoCreatedQueueTemplate(conf, new QueuePath(ROOT));
|
||||
template.setTemplateEntriesForChild(conf, TEST_QUEUE_A.getFullPath());
|
||||
Assert.assertEquals("root property is not set", 2f,
|
||||
conf.getNonLabeledQueueWeight(TEST_QUEUE_A), 10e-6);
|
||||
conf.getNonLabeledQueueWeight(TEST_QUEUE_A.getFullPath()), 10e-6);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -133,21 +134,21 @@ public void testQueueSpecificTemplates() {
|
||||
"root", AUTO_CREATE_CHILD_QUEUE_AUTO_REMOVAL_ENABLE), false);
|
||||
|
||||
AutoCreatedQueueTemplate template =
|
||||
new AutoCreatedQueueTemplate(conf, ROOT);
|
||||
template.setTemplateEntriesForChild(conf, TEST_QUEUE_A);
|
||||
template.setTemplateEntriesForChild(conf, TEST_QUEUE_B, true);
|
||||
new AutoCreatedQueueTemplate(conf, new QueuePath(ROOT));
|
||||
template.setTemplateEntriesForChild(conf, TEST_QUEUE_A.getFullPath());
|
||||
template.setTemplateEntriesForChild(conf, TEST_QUEUE_B.getFullPath(), true);
|
||||
|
||||
Assert.assertNull("default-node-label-expression is set for parent",
|
||||
conf.getDefaultNodeLabelExpression(TEST_QUEUE_A));
|
||||
conf.getDefaultNodeLabelExpression(TEST_QUEUE_A.getFullPath()));
|
||||
Assert.assertEquals("default-node-label-expression is not set for leaf",
|
||||
"test", conf.getDefaultNodeLabelExpression(TEST_QUEUE_B));
|
||||
"test", conf.getDefaultNodeLabelExpression(TEST_QUEUE_B.getFullPath()));
|
||||
Assert.assertFalse("auto queue removal is not disabled for parent",
|
||||
conf.isAutoExpiredDeletionEnabled(TEST_QUEUE_A));
|
||||
conf.isAutoExpiredDeletionEnabled(TEST_QUEUE_A.getFullPath()));
|
||||
Assert.assertEquals("weight should not be overridden when set by " +
|
||||
"queue type specific template",
|
||||
10f, conf.getNonLabeledQueueWeight(TEST_QUEUE_B), 10e-6);
|
||||
10f, conf.getNonLabeledQueueWeight(TEST_QUEUE_B.getFullPath()), 10e-6);
|
||||
Assert.assertEquals("weight should be set by common template",
|
||||
2f, conf.getNonLabeledQueueWeight(TEST_QUEUE_A), 10e-6);
|
||||
2f, conf.getNonLabeledQueueWeight(TEST_QUEUE_A.getFullPath()), 10e-6);
|
||||
|
||||
}
|
||||
|
||||
|
@ -299,9 +299,9 @@ public void testCapacitySchedulerAbsoluteConfWithCustomResourceType()
|
||||
// Define top-level queues
|
||||
newConf.setQueues(CapacitySchedulerConfiguration.ROOT,
|
||||
new String[] {"a", "b", "c"});
|
||||
newConf.setMinimumResourceRequirement("", "root.a",
|
||||
newConf.setMinimumResourceRequirement("", new QueuePath("root", "a"),
|
||||
aMINRES);
|
||||
newConf.setMaximumResourceRequirement("", "root.a",
|
||||
newConf.setMaximumResourceRequirement("", new QueuePath("root", "a"),
|
||||
aMAXRES);
|
||||
|
||||
newConf.setClass(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
|
||||
|
@ -676,12 +676,15 @@ private void nodeUpdate(NodeManager nm) {
|
||||
@Test
|
||||
public void testMaximumCapacitySetup() {
|
||||
float delta = 0.0000001f;
|
||||
QueuePath queuePathA = new QueuePath(A);
|
||||
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||
assertEquals(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE,conf.getNonLabeledQueueMaximumCapacity(A),delta);
|
||||
assertEquals(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE,
|
||||
conf.getNonLabeledQueueMaximumCapacity(queuePathA), delta);
|
||||
conf.setMaximumCapacity(A, 50.0f);
|
||||
assertEquals(50.0f, conf.getNonLabeledQueueMaximumCapacity(A),delta);
|
||||
assertEquals(50.0f, conf.getNonLabeledQueueMaximumCapacity(queuePathA), delta);
|
||||
conf.setMaximumCapacity(A, -1);
|
||||
assertEquals(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE,conf.getNonLabeledQueueMaximumCapacity(A),delta);
|
||||
assertEquals(CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE,
|
||||
conf.getNonLabeledQueueMaximumCapacity(queuePathA), delta);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -60,14 +60,14 @@ public class TestCapacitySchedulerConfigValidator {
|
||||
private static final String LEAF_A = "leafA";
|
||||
private static final String LEAF_B = "leafB";
|
||||
|
||||
private static final String PARENT_A_FULL_PATH = CapacitySchedulerConfiguration.ROOT
|
||||
+ "." + PARENT_A;
|
||||
private static final String LEAF_A_FULL_PATH = PARENT_A_FULL_PATH
|
||||
+ "." + LEAF_A;
|
||||
private static final String PARENT_B_FULL_PATH = CapacitySchedulerConfiguration.ROOT
|
||||
+ "." + PARENT_B;
|
||||
private static final String LEAF_B_FULL_PATH = PARENT_B_FULL_PATH
|
||||
+ "." + LEAF_B;
|
||||
private static final QueuePath PARENT_A_FULL_PATH =
|
||||
new QueuePath(CapacitySchedulerConfiguration.ROOT + "." + PARENT_A);
|
||||
private static final QueuePath LEAF_A_FULL_PATH =
|
||||
new QueuePath(PARENT_A_FULL_PATH + "." + LEAF_A);
|
||||
private static final QueuePath PARENT_B_FULL_PATH =
|
||||
new QueuePath(CapacitySchedulerConfiguration.ROOT + "." + PARENT_B);
|
||||
private static final QueuePath LEAF_B_FULL_PATH =
|
||||
new QueuePath(PARENT_B_FULL_PATH + "." + LEAF_B);
|
||||
|
||||
private final Resource A_MINRES = Resource.newInstance(16 * GB, 10);
|
||||
private final Resource B_MINRES = Resource.newInstance(32 * GB, 5);
|
||||
@ -225,7 +225,8 @@ public void testValidateCSConfigDefaultRCAbsoluteModeParentMaxMemoryExceeded()
|
||||
CapacitySchedulerConfiguration oldConfiguration = cs.getConfiguration();
|
||||
CapacitySchedulerConfiguration newConfiguration =
|
||||
new CapacitySchedulerConfiguration(cs.getConfiguration());
|
||||
newConfiguration.setMaximumResourceRequirement("", LEAF_A_FULL_PATH, FULL_MAXRES);
|
||||
newConfiguration.setMaximumResourceRequirement("",
|
||||
LEAF_A_FULL_PATH, FULL_MAXRES);
|
||||
try {
|
||||
CapacitySchedulerConfigValidator
|
||||
.validateCSConfiguration(oldConfiguration, newConfiguration, rmContext);
|
||||
@ -245,7 +246,8 @@ public void testValidateCSConfigDefaultRCAbsoluteModeParentMaxVcoreExceeded() th
|
||||
CapacitySchedulerConfiguration oldConfiguration = cs.getConfiguration();
|
||||
CapacitySchedulerConfiguration newConfiguration =
|
||||
new CapacitySchedulerConfiguration(cs.getConfiguration());
|
||||
newConfiguration.setMaximumResourceRequirement("", LEAF_A_FULL_PATH, VCORE_EXCEEDED_MAXRES);
|
||||
newConfiguration.setMaximumResourceRequirement("",
|
||||
LEAF_A_FULL_PATH, VCORE_EXCEEDED_MAXRES);
|
||||
try {
|
||||
CapacitySchedulerConfigValidator
|
||||
.validateCSConfiguration(oldConfiguration, newConfiguration, rmContext);
|
||||
@ -264,7 +266,8 @@ public void testValidateCSConfigDominantRCAbsoluteModeParentMaxMemoryExceeded()
|
||||
CapacitySchedulerConfiguration oldConfiguration = cs.getConfiguration();
|
||||
CapacitySchedulerConfiguration newConfiguration =
|
||||
new CapacitySchedulerConfiguration(cs.getConfiguration());
|
||||
newConfiguration.setMaximumResourceRequirement("", LEAF_A_FULL_PATH, FULL_MAXRES);
|
||||
newConfiguration.setMaximumResourceRequirement("",
|
||||
LEAF_A_FULL_PATH, FULL_MAXRES);
|
||||
try {
|
||||
CapacitySchedulerConfigValidator
|
||||
.validateCSConfiguration(oldConfiguration, newConfiguration, rmContext);
|
||||
@ -284,7 +287,8 @@ public void testValidateCSConfigDominantRCAbsoluteModeParentMaxVcoreExceeded() t
|
||||
CapacitySchedulerConfiguration oldConfiguration = cs.getConfiguration();
|
||||
CapacitySchedulerConfiguration newConfiguration =
|
||||
new CapacitySchedulerConfiguration(cs.getConfiguration());
|
||||
newConfiguration.setMaximumResourceRequirement("", LEAF_A_FULL_PATH, VCORE_EXCEEDED_MAXRES);
|
||||
newConfiguration.setMaximumResourceRequirement("",
|
||||
LEAF_A_FULL_PATH, VCORE_EXCEEDED_MAXRES);
|
||||
try {
|
||||
CapacitySchedulerConfigValidator
|
||||
.validateCSConfiguration(oldConfiguration, newConfiguration, rmContext);
|
||||
@ -304,7 +308,8 @@ public void testValidateCSConfigDominantRCAbsoluteModeParentMaxGPUExceeded() thr
|
||||
CapacitySchedulerConfiguration oldConfiguration = cs.getConfiguration();
|
||||
CapacitySchedulerConfiguration newConfiguration =
|
||||
new CapacitySchedulerConfiguration(cs.getConfiguration());
|
||||
newConfiguration.setMaximumResourceRequirement("", LEAF_A_FULL_PATH, GPU_EXCEEDED_MAXRES_GPU);
|
||||
newConfiguration.setMaximumResourceRequirement("",
|
||||
LEAF_A_FULL_PATH, GPU_EXCEEDED_MAXRES_GPU);
|
||||
try {
|
||||
CapacitySchedulerConfigValidator
|
||||
.validateCSConfiguration(oldConfiguration, newConfiguration, rmContext);
|
||||
@ -595,8 +600,8 @@ private CapacitySchedulerConfiguration setupCSConfiguration(YarnConfiguration co
|
||||
|
||||
csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
|
||||
new String[]{PARENT_A, PARENT_B});
|
||||
csConf.setQueues(PARENT_A_FULL_PATH, new String[]{LEAF_A});
|
||||
csConf.setQueues(PARENT_B_FULL_PATH, new String[]{LEAF_B});
|
||||
csConf.setQueues(PARENT_A_FULL_PATH.getFullPath(), new String[]{LEAF_A});
|
||||
csConf.setQueues(PARENT_B_FULL_PATH.getFullPath(), new String[]{LEAF_B});
|
||||
|
||||
if (useDominantRC) {
|
||||
setupGpuResourceValues();
|
||||
|
@ -131,10 +131,10 @@ private void setupSingleLevelQueuesWithAbsoluteResource(
|
||||
// Define top-level queues
|
||||
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{A, B});
|
||||
|
||||
conf.setMinimumResourceRequirement("", Q_A,
|
||||
conf.setMinimumResourceRequirement("", new QueuePath(Q_A),
|
||||
QUEUE_A_RESOURCE);
|
||||
|
||||
conf.setMinimumResourceRequirement("", Q_B,
|
||||
conf.setMinimumResourceRequirement("", new QueuePath(Q_B),
|
||||
QUEUE_B_RESOURCE);
|
||||
|
||||
LOG.info("Setup top-level queues a and b with absolute resource");
|
||||
|
@ -54,6 +54,16 @@ public void testEmptyPart() {
|
||||
Assert.assertFalse(queuePathWithoutEmptyPart.hasEmptyPart());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testNullPath() {
|
||||
QueuePath queuePathWithNullPath = new QueuePath(null);
|
||||
|
||||
Assert.assertNull(queuePathWithNullPath.getParent());
|
||||
Assert.assertEquals("", queuePathWithNullPath.getLeafName());
|
||||
Assert.assertEquals("", queuePathWithNullPath.getFullPath());
|
||||
Assert.assertFalse(queuePathWithNullPath.isRoot());
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIterator() {
|
||||
QueuePath queuePath = new QueuePath(TEST_QUEUE);
|
||||
|
@ -33,6 +33,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
|
||||
import org.apache.hadoop.yarn.webapp.GenericExceptionHandler;
|
||||
import org.apache.hadoop.yarn.webapp.GuiceServletConfig;
|
||||
import org.apache.hadoop.yarn.webapp.JerseyTestBase;
|
||||
@ -292,9 +293,9 @@ public void testAddNestedQueue() throws Exception {
|
||||
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
|
||||
assertEquals(4, newCSConf.getQueues("root").length);
|
||||
assertEquals(2, newCSConf.getQueues("root.d").length);
|
||||
assertEquals(25.0f, newCSConf.getNonLabeledQueueCapacity("root.d.d1"),
|
||||
assertEquals(25.0f, newCSConf.getNonLabeledQueueCapacity(new QueuePath("root.d.d1")),
|
||||
0.01f);
|
||||
assertEquals(75.0f, newCSConf.getNonLabeledQueueCapacity("root.d.d2"),
|
||||
assertEquals(75.0f, newCSConf.getNonLabeledQueueCapacity(new QueuePath("root.d.d2")),
|
||||
0.01f);
|
||||
|
||||
CapacitySchedulerConfiguration newConf = getSchedulerConf();
|
||||
@ -330,8 +331,8 @@ public void testAddWithUpdate() throws Exception {
|
||||
CapacitySchedulerConfiguration newCSConf =
|
||||
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
|
||||
assertEquals(4, newCSConf.getQueues("root").length);
|
||||
assertEquals(25.0f, newCSConf.getNonLabeledQueueCapacity("root.d"), 0.01f);
|
||||
assertEquals(50.0f, newCSConf.getNonLabeledQueueCapacity("root.b"), 0.01f);
|
||||
assertEquals(25.0f, newCSConf.getNonLabeledQueueCapacity(new QueuePath("root.d")), 0.01f);
|
||||
assertEquals(50.0f, newCSConf.getNonLabeledQueueCapacity(new QueuePath("root.b")), 0.01f);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -576,7 +577,7 @@ public void testRemoveParentQueueWithCapacity() throws Exception {
|
||||
CapacitySchedulerConfiguration newCSConf =
|
||||
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
|
||||
assertEquals(2, newCSConf.getQueues("root").length);
|
||||
assertEquals(100.0f, newCSConf.getNonLabeledQueueCapacity("root.b"),
|
||||
assertEquals(100.0f, newCSConf.getNonLabeledQueueCapacity(new QueuePath("root.b")),
|
||||
0.01f);
|
||||
}
|
||||
|
||||
@ -718,8 +719,8 @@ public void testUpdateQueueCapacity() throws Exception {
|
||||
assertEquals(Status.OK.getStatusCode(), response.getStatus());
|
||||
CapacitySchedulerConfiguration newCSConf =
|
||||
((CapacityScheduler) rm.getResourceScheduler()).getConfiguration();
|
||||
assertEquals(50.0f, newCSConf.getNonLabeledQueueCapacity("root.a"), 0.01f);
|
||||
assertEquals(50.0f, newCSConf.getNonLabeledQueueCapacity("root.b"), 0.01f);
|
||||
assertEquals(50.0f, newCSConf.getNonLabeledQueueCapacity(new QueuePath("root.a")), 0.01f);
|
||||
assertEquals(50.0f, newCSConf.getNonLabeledQueueCapacity(new QueuePath("root.b")), 0.01f);
|
||||
}
|
||||
|
||||
@Test
|
||||
|
Loading…
Reference in New Issue
Block a user