YARN-10504. Implement weight mode in Capacity Scheduler. (Contributed by Wangda Tan, Benjamin Teke, zhuqi, Andras Gyori)

Change-Id: Ic49c730b0ab502ba86527fb662d25c4c8b1c2588
This commit is contained in:
Wangda Tan 2021-01-11 17:46:09 -08:00
parent 32abc0af49
commit b0eec09097
24 changed files with 1307 additions and 507 deletions

View File

@ -128,9 +128,4 @@ public class AbstractAutoCreatedLeafQueue extends LeafQueue {
writeLock.unlock();
}
}
/**
 * Recomputes and validates the absolute capacities held in
 * {@code queueCapacities}, relative to the parent queue when there is one.
 *
 * @param queueCapacities capacities to update in place
 */
protected void setupConfigurableCapacities(QueueCapacities queueCapacities) {
  String path = getQueuePath();
  QueueCapacities parentCapacities = null;
  if (parent != null) {
    parentCapacities = parent.getQueueCapacities();
  }
  CSQueueUtils.updateAndCheckCapacitiesByLabel(path, queueCapacities,
      parentCapacities);
}
}

View File

@ -129,6 +129,9 @@ public abstract class AbstractCSQueue implements CSQueue {
private volatile boolean defaultAppLifetimeWasSpecifiedInConfig = false;
// How the capacity of a queue is expressed in configuration.
protected enum CapacityConfigType {
// FIXME: from what I can see, the PERCENTAGE handling can serve both weight
// and percentage configurations, with only a small area that needs to
// change. There is no separate WEIGHT constant; PERCENTAGE should be renamed
// (presumably to a name that covers both PERCENTAGE and WEIGHT).
NONE, PERCENTAGE, ABSOLUTE_RESOURCE
};
protected CapacityConfigType capacityConfigType =
@ -195,11 +198,8 @@ public abstract class AbstractCSQueue implements CSQueue {
protected void setupConfigurableCapacities(
CapacitySchedulerConfiguration configuration) {
CSQueueUtils.loadUpdateAndCheckCapacities(
getQueuePath(),
configuration,
queueCapacities,
parent == null ? null : parent.getQueueCapacities());
CSQueueUtils.loadCapacitiesByLabelsFromConf(getQueuePath(), queueCapacities,
configuration);
}
@Override
@ -381,6 +381,7 @@ public abstract class AbstractCSQueue implements CSQueue {
// After we setup labels, we can setup capacities
setupConfigurableCapacities(configuration);
updateAbsoluteCapacities();
// Also fetch minimum/maximum resource constraint for this queue if
// configured.
@ -472,14 +473,14 @@ public abstract class AbstractCSQueue implements CSQueue {
private void setupMaximumAllocation(CapacitySchedulerConfiguration csConf) {
String myQueuePath = getQueuePath();
Resource clusterMax = ResourceUtils
.fetchMaximumAllocationFromConfig(csConf);
.fetchMaximumAllocationFromConfig(csConf);
Resource queueMax = csConf.getQueueMaximumAllocation(myQueuePath);
maximumAllocation = Resources.clone(
parent == null ? clusterMax : parent.getMaximumAllocation());
parent == null ? clusterMax : parent.getMaximumAllocation());
String errMsg =
"Queue maximum allocation cannot be larger than the cluster setting"
"Queue maximum allocation cannot be larger than the cluster setting"
+ " for queue " + myQueuePath
+ " max allocation per queue: %s"
+ " cluster setting: " + clusterMax;
@ -498,9 +499,9 @@ public abstract class AbstractCSQueue implements CSQueue {
if ((queueMemory != UNDEFINED && queueMemory > clusterMax.getMemorySize()
|| (queueVcores != UNDEFINED
&& queueVcores > clusterMax.getVirtualCores()))) {
&& queueVcores > clusterMax.getVirtualCores()))) {
throw new IllegalArgumentException(
String.format(errMsg, maximumAllocation));
String.format(errMsg, maximumAllocation));
}
} else {
// Queue level maximum-allocation can't be larger than cluster setting
@ -562,7 +563,7 @@ public abstract class AbstractCSQueue implements CSQueue {
CapacityConfigType localType = checkConfigTypeIsAbsoluteResource(
queuePath, label) ? CapacityConfigType.ABSOLUTE_RESOURCE
: CapacityConfigType.PERCENTAGE;
: CapacityConfigType.PERCENTAGE;
if (this.capacityConfigType.equals(CapacityConfigType.NONE)) {
this.capacityConfigType = localType;
@ -605,7 +606,7 @@ public abstract class AbstractCSQueue implements CSQueue {
}
LOG.debug("Updating absolute resource configuration for queue:{} as"
+ " minResource={} and maxResource={}", getQueuePath(), minResource,
+ " minResource={} and maxResource={}", getQueuePath(), minResource,
maxResource);
queueResourceQuotas.setConfiguredMinResource(label, minResource);
@ -680,8 +681,8 @@ public abstract class AbstractCSQueue implements CSQueue {
&& parentState != QueueState.RUNNING) {
throw new IllegalArgumentException(
"The parent queue:" + parent.getQueuePath()
+ " cannot be STOPPED as the child queue:" + queuePath
+ " is in RUNNING state.");
+ " cannot be STOPPED as the child queue:" + queuePath
+ " is in RUNNING state.");
} else {
updateQueueState(configuredState);
}
@ -889,7 +890,7 @@ public abstract class AbstractCSQueue implements CSQueue {
boolean systemWidePreemption =
csContext.getConfiguration()
.getBoolean(YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS,
YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS);
YarnConfiguration.DEFAULT_RM_SCHEDULER_ENABLE_MONITORS);
CSQueue parentQ = q.getParent();
// If the system-wide preemption switch is turned off, all of the queues in
@ -908,7 +909,7 @@ public abstract class AbstractCSQueue implements CSQueue {
// inherited from the parent's hierarchy unless explicitly overridden at
// this level.
return configuration.getPreemptionDisabled(q.getQueuePath(),
parentQ.getPreemptionDisabled());
parentQ.getPreemptionDisabled());
}
private long getInheritedMaxAppLifetime(CSQueue q,
@ -936,7 +937,7 @@ public abstract class AbstractCSQueue implements CSQueue {
long defaultAppLifetime = conf.getDefaultLifetimePerQueue(getQueuePath());
defaultAppLifetimeWasSpecifiedInConfig =
(defaultAppLifetime >= 0
|| (parentQ != null &&
|| (parentQ != null &&
parentQ.getDefaultAppLifetimeWasSpecifiedInConfig()));
// If q is the root queue, then get default app lifetime from conf.
@ -990,7 +991,7 @@ public abstract class AbstractCSQueue implements CSQueue {
csContext.getConfiguration().getBoolean(
CapacitySchedulerConfiguration.INTRAQUEUE_PREEMPTION_ENABLED,
CapacitySchedulerConfiguration
.DEFAULT_INTRAQUEUE_PREEMPTION_ENABLED);
.DEFAULT_INTRAQUEUE_PREEMPTION_ENABLED);
// Intra-queue preemption is disabled for this queue if the system-wide
// intra-queue preemption flag is false
if (!systemWideIntraQueuePreemption) return true;
@ -1083,7 +1084,7 @@ public abstract class AbstractCSQueue implements CSQueue {
// has reserved containers.
if (this.reservationsContinueLooking
&& Resources.greaterThan(resourceCalculator, clusterResource,
resourceCouldBeUnreserved, Resources.none())) {
resourceCouldBeUnreserved, Resources.none())) {
// resource-without-reserved = used - reserved
Resource newTotalWithoutReservedResource = Resources.subtract(
usedExceptKillable, resourceCouldBeUnreserved);
@ -1447,4 +1448,165 @@ public abstract class AbstractCSQueue implements CSQueue {
}
abstract int getNumRunnableApps();
/**
 * Refreshes the absolute capacity and absolute maximum capacity of this queue
 * for each of its existing node labels. A queue without a parent passes
 * {@code null} as the parent capacities.
 */
protected void updateAbsoluteCapacities() {
  QueueCapacities parentQueueCapacities =
      (parent == null) ? null : parent.getQueueCapacities();
  CSQueueUtils.updateAbsoluteCapacitiesByNodeLabels(queueCapacities,
      parentQueueCapacities, queueCapacities.getExistingNodeLabels());
}
/**
 * Builds a copy of {@code minResource} in which every countable resource type
 * that has an entry in {@code effectiveMinRatio} is scaled by that ratio.
 * Resource types without a ratio keep the value taken from
 * {@code minResource}.
 *
 * @param name queue name, used only in the debug log
 * @param effectiveMinRatio ratio per resource name
 * @param minResource configured minimum resource to scale
 * @return the scaled resource; {@code minResource} itself is not modified
 */
private Resource getMinResourceNormalized(String name,
    Map<String, Float> effectiveMinRatio, Resource minResource) {
  Resource normalized = Resource.newInstance(minResource);
  int resourceTypeCount = ResourceUtils.getNumberOfCountableResourceTypes();

  for (int idx = 0; idx < resourceTypeCount; idx++) {
    ResourceInformation configured = minResource.getResourceInformation(idx);
    Float ratio = effectiveMinRatio.get(configured.getName());
    if (ratio == null) {
      // No ratio for this resource type: leave the copied value untouched.
      continue;
    }

    // Truncate towards zero, exactly as a (long) cast does.
    long scaled = (long) (configured.getValue() * ratio.floatValue());
    normalized.setResourceValue(idx, scaled);

    if (LOG.isDebugEnabled()) {
      LOG.debug("Updating min resource for Queue: " + name + " as "
          + normalized.getResourceInformation(idx) + ", Actual resource: "
          + configured.getValue() + ", ratio: " + ratio.floatValue());
    }
  }
  return normalized;
}
/**
 * Derives the percentage-style capacity fields of this queue for
 * {@code label} from its effective min/max resources, and, when this queue
 * is a LeafQueue, recomputes its application limits.
 *
 * Dereferences {@code parent} unconditionally, so it must only be called on
 * a queue that has a parent.
 *
 * @param label node label whose capacities are updated
 * @param clusterResource cluster resource passed through to {@code rc.divide}
 * @param rc calculator used to compute the child/parent resource ratios
 */
private void deriveCapacityFromAbsoluteConfigurations(String label,
Resource clusterResource, ResourceCalculator rc) {
/*
* In case when queues are configured with absolute resources, it is better
* to update capacity/max-capacity etc w.r.t absolute resource as well. In
* case of computation, these values wont be used any more. However for
* metrics and UI, its better these values are pre-computed here itself.
*/
// 1. Update capacity as a float based on parent's minResource
float f = rc.divide(clusterResource,
queueResourceQuotas.getEffectiveMinResource(label),
parent.getQueueResourceQuotas().getEffectiveMinResource(label));
// An infinite ratio is stored as 0.
// NOTE(review): a NaN ratio is not filtered here - confirm rc.divide cannot
// return NaN (e.g. when both operands are empty).
queueCapacities.setCapacity(label, Float.isInfinite(f) ? 0 : f);
// 2. Update max-capacity as a float based on parent's maxResource
f = rc.divide(clusterResource,
queueResourceQuotas.getEffectiveMaxResource(label),
parent.getQueueResourceQuotas().getEffectiveMaxResource(label));
queueCapacities.setMaximumCapacity(label, Float.isInfinite(f) ? 0 : f);
// 3. Update absolute capacity as a float based on parent's minResource and
// cluster resource.
queueCapacities.setAbsoluteCapacity(label,
queueCapacities.getCapacity(label) * parent.getQueueCapacities()
.getAbsoluteCapacity(label));
// 4. Update absolute max-capacity as a float based on parent's maxResource
// and cluster resource.
queueCapacities.setAbsoluteMaximumCapacity(label,
queueCapacities.getMaximumCapacity(label) * parent.getQueueCapacities()
.getAbsoluteMaximumCapacity(label));
// Re-visit max applications for a queue based on absolute capacity if
// needed.
if (this instanceof LeafQueue) {
LeafQueue leafQueue = (LeafQueue) this;
CapacitySchedulerConfiguration conf = csContext.getConfiguration();
int maxApplications = conf.getMaximumApplicationsPerQueue(queuePath);
// A negative per-queue value is treated as "derive it": scale the global
// per-queue limit if that is positive, otherwise the system-wide limit, by
// this queue's absolute capacity for the label.
if (maxApplications < 0) {
int maxGlobalPerQueueApps = conf.getGlobalMaximumApplicationsPerQueue();
if (maxGlobalPerQueueApps > 0) {
maxApplications = (int) (maxGlobalPerQueueApps * queueCapacities
.getAbsoluteCapacity(label));
} else{
maxApplications =
(int) (conf.getMaximumSystemApplications() * queueCapacities
.getAbsoluteCapacity(label));
}
}
leafQueue.setMaxApplications(maxApplications);
// Per-user limit: maxApplications scaled by user-limit percent and
// user-limit factor, capped at maxApplications.
int maxApplicationsPerUser = Math.min(maxApplications,
(int) (maxApplications * (leafQueue.getUsersManager().getUserLimit()
/ 100.0f) * leafQueue.getUsersManager().getUserLimitFactor()));
leafQueue.setMaxApplicationsPerUser(maxApplicationsPerUser);
LOG.info("LeafQueue:" + leafQueue.getQueuePath() + ", maxApplications="
+ maxApplications + ", maxApplicationsPerUser="
+ maxApplicationsPerUser + ", Abs Cap:" + queueCapacities
.getAbsoluteCapacity(label) + ", Cap: " + queueCapacities
.getCapacity(label) + ", MaxCap : " + queueCapacities
.getMaximumCapacity(label));
}
}
/**
 * Recomputes the effective minimum and maximum resources of this queue for
 * every node label configured on its queue path.
 *
 * <p>In {@code ABSOLUTE_RESOURCE} mode the effective minimum is the
 * configured minimum scaled by the parent's per-resource effective-min
 * ratio, the effective maximum is bounded by the parent's maximum, and the
 * percentage-style capacities are derived afterwards. In that mode
 * {@code parent} is dereferenced unconditionally and must be a
 * {@link ParentQueue}. In every other mode the effective resources are the
 * label's resource multiplied by the queue's absolute (maximum) capacity.
 *
 * @param clusterResource cluster resource used to resolve the resource
 *          available under each label
 */
void updateEffectiveResources(Resource clusterResource) {
  Set<String> configuredNodelabels =
      csContext.getConfiguration().getConfiguredNodeLabels(getQueuePath());
  for (String label : configuredNodelabels) {
    Resource resourceByLabel = labelManager.getResourceByLabel(label,
        clusterResource);
    Resource minResource = queueResourceQuotas.getConfiguredMinResource(
        label);

    // Update effective resource (min/max) to each child queue.
    if (getCapacityConfigType().equals(
        CapacityConfigType.ABSOLUTE_RESOURCE)) {
      queueResourceQuotas.setEffectiveMinResource(label,
          getMinResourceNormalized(queuePath,
              ((ParentQueue) parent).getEffectiveMinRatioPerResource(),
              minResource));

      // Max resource of a queue should be a minimum of {configuredMaxRes,
      // parentMaxRes}. parentMaxRes could be configured value. But if not
      // present could also be taken from effective max resource of parent.
      // parent has already been dereferenced above, so no null check is
      // needed (or meaningful) at this point.
      Resource parentMaxRes =
          parent.getQueueResourceQuotas().getConfiguredMaxResource(label);
      if (parentMaxRes.equals(Resources.none())) {
        parentMaxRes =
            parent.getQueueResourceQuotas().getEffectiveMaxResource(label);
      }

      // Minimum of {childMaxResource, parentMaxRes}. However if
      // childMaxResource is empty, consider parent's max resource alone.
      Resource childMaxResource =
          getQueueResourceQuotas().getConfiguredMaxResource(label);
      Resource effMaxResource = Resources.min(resourceCalculator,
          resourceByLabel,
          childMaxResource.equals(Resources.none())
              ? parentMaxRes : childMaxResource,
          parentMaxRes);
      queueResourceQuotas.setEffectiveMaxResource(label,
          Resources.clone(effMaxResource));

      // In cases where we still need to update some units based on
      // percentage, we have to calculate percentage and update.
      ResourceCalculator rc = this.csContext.getResourceCalculator();
      deriveCapacityFromAbsoluteConfigurations(label, clusterResource, rc);
    } else {
      queueResourceQuotas.setEffectiveMinResource(label, Resources
          .multiply(resourceByLabel,
              queueCapacities.getAbsoluteCapacity(label)));
      queueResourceQuotas.setEffectiveMaxResource(label, Resources
          .multiply(resourceByLabel,
              queueCapacities.getAbsoluteMaximumCapacity(label)));
    }

    if (LOG.isDebugEnabled()) {
      // The original text ran the min value straight into "and"; a space
      // now separates them.
      LOG.debug("Updating effective min resource for queue:{}"
          + " as effMinResource={}"
          + " and Updating effective max resource as effMaxResource={}",
          queuePath,
          queueResourceQuotas.getEffectiveMinResource(label),
          queueResourceQuotas.getEffectiveMaxResource(label));
    }
  }
}
}

View File

@ -74,31 +74,15 @@ public class AutoCreatedLeafQueue extends AbstractAutoCreatedLeafQueue {
writeLock.lock();
try {
this.getParent().updateClusterResource(this.csContext.getClusterResource(),
new ResourceLimits(this.csContext.getClusterResource()));
// TODO:
// reinitialize only capacities for now since 0 capacity updates
// can cause
// abs capacity related config computations to be incorrect if we go
// through reinitialize
QueueCapacities capacities = leafQueueTemplate.getQueueCapacities();
//update abs capacities
setupConfigurableCapacities(capacities);
//reset capacities for the leaf queue
mergeCapacities(capacities);
//update queue used capacity for all the node labels
CSQueueUtils.updateQueueStatistics(resourceCalculator,
csContext.getClusterResource(),
this, labelManager, null);
//activate applications if any are pending
activateApplications();
} finally {
writeLock.unlock();
}

View File

@ -59,28 +59,6 @@ public class CSQueueUtils {
}
}
/**
 * Verifies, for every node label known to {@code queueCapacities}, that the
 * absolute capacity does not exceed the absolute maximum capacity. Only the
 * absolute values are compared; otherwise the absolute maximum would no
 * longer be a true upper bound.
 *
 * @param queueName queue name used in the error message
 * @param queueCapacities capacities to validate
 * @throws IllegalArgumentException if any label has
 *           abs-capacity &gt; abs-maximum-capacity
 */
private static void capacitiesSanityCheck(String queueName,
    QueueCapacities queueCapacities) {
  for (String label : queueCapacities.getExistingNodeLabels()) {
    float absCapacity = queueCapacities.getAbsoluteCapacity(label);
    float absMaxCapacity = queueCapacities.getAbsoluteMaximumCapacity(label);

    // Written as a negated '>' so that NaN values are treated exactly as in
    // a plain "absCapacity > absMaxCapacity" test.
    if (!(absCapacity > absMaxCapacity)) {
      continue;
    }

    StringBuilder message = new StringBuilder();
    message.append("Illegal queue capacity setting ")
        .append("(abs-capacity=").append(absCapacity)
        .append(") > (abs-maximum-capacity=").append(absMaxCapacity)
        .append(") for queue=[").append(queueName)
        .append("],label=[").append(label).append("]");
    throw new IllegalArgumentException(message.toString());
  }
}
public static float computeAbsoluteMaximumCapacity(
float maximumCapacity, CSQueue parent) {
float parentAbsMaxCapacity =
@ -88,36 +66,7 @@ public class CSQueueUtils {
return (parentAbsMaxCapacity * maximumCapacity);
}
/**
 * Updates and validates capacities without reading anything from the
 * scheduler configuration.
 *
 * <p>Meant for ReservationQueue: such queues do not appear in the
 * configuration file, so their capacity settings must not be loaded from it.
 *
 * @param queuePath queue path used in validation error messages
 * @param queueCapacities capacities to update in place and validate
 * @param parentQueueCapacities capacities of the parent queue
 */
public static void updateAndCheckCapacitiesByLabel(String queuePath,
    QueueCapacities queueCapacities, QueueCapacities parentQueueCapacities) {
  // First derive the absolute values from the parent ...
  updateAbsoluteCapacitiesByNodeLabels(
      queueCapacities,
      parentQueueCapacities);
  // ... then make sure the result is legal.
  capacitiesSanityCheck(
      queuePath,
      queueCapacities);
}
/**
 * Full capacity set-up for a configured queue:
 * <ol>
 *   <li>load capacities from configuration,</li>
 *   <li>update absolute capacities from the newly loaded values,</li>
 *   <li>check that capacities/absolute-capacities are legal.</li>
 * </ol>
 *
 * @param queuePath queue path whose settings are loaded
 * @param csConf configuration to read from
 * @param queueCapacities capacities to populate in place
 * @param parentQueueCapacities capacities of the parent queue
 */
public static void loadUpdateAndCheckCapacities(String queuePath,
    CapacitySchedulerConfiguration csConf,
    QueueCapacities queueCapacities, QueueCapacities parentQueueCapacities) {
  loadCapacitiesByLabelsFromConf(queuePath, queueCapacities, csConf);
  // Steps 2 and 3 are exactly the configuration-free update-and-check path.
  updateAndCheckCapacitiesByLabel(queuePath, queueCapacities,
      parentQueueCapacities);
}
private static void loadCapacitiesByLabelsFromConf(String queuePath,
public static void loadCapacitiesByLabelsFromConf(String queuePath,
QueueCapacities queueCapacities, CapacitySchedulerConfiguration csConf) {
queueCapacities.clearConfigurableFields();
Set<String> configuredNodelabels =
@ -132,38 +81,27 @@ public class CSQueueUtils {
queueCapacities.setMaxAMResourcePercentage(
label,
csConf.getMaximumAMResourcePercentPerPartition(queuePath, label));
} else {
queueCapacities.setWeight(label,
csConf.getNonLabeledQueueWeight(queuePath));
} else{
queueCapacities.setCapacity(label,
csConf.getLabeledQueueCapacity(queuePath, label) / 100);
queueCapacities.setMaximumCapacity(label,
csConf.getLabeledQueueMaximumCapacity(queuePath, label) / 100);
queueCapacities.setMaxAMResourcePercentage(label,
csConf.getMaximumAMResourcePercentPerPartition(queuePath, label));
}
}
}
// Set absolute capacities for {capacity, maximum-capacity}
private static void updateAbsoluteCapacitiesByNodeLabels(
QueueCapacities queueCapacities, QueueCapacities parentQueueCapacities) {
for (String label : queueCapacities.getExistingNodeLabels()) {
float capacity = queueCapacities.getCapacity(label);
if (capacity > 0f) {
queueCapacities.setAbsoluteCapacity(
label,
capacity
* (parentQueueCapacities == null ? 1 : parentQueueCapacities
.getAbsoluteCapacity(label)));
queueCapacities.setWeight(label,
csConf.getLabeledQueueWeight(queuePath, label));
}
float maxCapacity = queueCapacities.getMaximumCapacity(label);
if (maxCapacity > 0f) {
queueCapacities.setAbsoluteMaximumCapacity(
label,
maxCapacity
* (parentQueueCapacities == null ? 1 : parentQueueCapacities
.getAbsoluteMaximumCapacity(label)));
}
/*float absCapacity = queueCapacities.getCapacity(label);
float absMaxCapacity = queueCapacities.getMaximumCapacity(label);
if (absCapacity > absMaxCapacity) {
throw new IllegalArgumentException("Illegal queue capacity setting "
+ "(abs-capacity=" + absCapacity + ") > (abs-maximum-capacity="
+ absMaxCapacity + ") for queue=["
+ queuePath + "],label=[" + label + "]");
}*/
}
}
@ -344,4 +282,34 @@ public class CSQueueUtils {
queue.getQueueCapacities().getMaximumCapacity(partition),
queue.getQueueCapacities().getAbsoluteMaximumCapacity(partition));
}
/**
 * Sets the absolute capacity and absolute maximum capacity of a queue for
 * each of the given node labels, relative to its parent.
 *
 * <p>The value used as "capacity" is the larger of the configured capacity
 * and the normalized weight; the existing design notes state that only one
 * of the two is non-zero for a given queue, so the maximum picks whichever
 * is in use. A {@code null} parent contributes a factor of 1. Values that
 * are not strictly positive leave the corresponding absolute field
 * untouched.
 *
 * @param queueCapacities capacities to update in place
 * @param parentQueueCapacities parent capacities, or {@code null}
 * @param nodeLabels labels to process
 */
public static void updateAbsoluteCapacitiesByNodeLabels(QueueCapacities queueCapacities,
    QueueCapacities parentQueueCapacities,
    Set<String> nodeLabels) {
  for (String label : nodeLabels) {
    float configuredCapacity = queueCapacities.getCapacity(label);
    float normalizedWeight = queueCapacities.getNormalizedWeight(label);
    float effectiveCapacity = Math.max(configuredCapacity, normalizedWeight);
    if (effectiveCapacity > 0f) {
      float parentAbsCapacity = 1;
      if (parentQueueCapacities != null) {
        parentAbsCapacity = parentQueueCapacities.getAbsoluteCapacity(label);
      }
      queueCapacities.setAbsoluteCapacity(label,
          effectiveCapacity * parentAbsCapacity);
    }

    float maxCapacity = queueCapacities.getMaximumCapacity(label);
    if (maxCapacity > 0f) {
      float parentAbsMaxCapacity = 1;
      if (parentQueueCapacities != null) {
        parentAbsMaxCapacity =
            parentQueueCapacities.getAbsoluteMaximumCapacity(label);
      }
      queueCapacities.setAbsoluteMaximumCapacity(label,
          maxCapacity * parentAbsMaxCapacity);
    }
  }
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.ipc.WeightedTimeCostProvider;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.thirdparty.com.google.common.base.Strings;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
@ -385,6 +386,8 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
public static final Pattern RESOURCE_PATTERN = Pattern.compile(PATTERN_FOR_ABSOLUTE_RESOURCE);
private static final String WEIGHT_SUFFIX = "w";
public static final String MAX_PARALLEL_APPLICATIONS = "max-parallel-apps";
public static final int DEFAULT_MAX_PARALLEL_APPLICATIONS = Integer.MAX_VALUE;
@ -492,11 +495,44 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
setFloat(getQueuePrefix(queue) + MAXIMUM_AM_RESOURCE_SUFFIX, percent);
}
/**
 * Validates a weight value read from configuration.
 *
 * <p>Accepted values are {@code -1} (within a 1e-6 tolerance), which means
 * "not set", and the range {@code [0, 10000]} (values down to -1e-6 are
 * tolerated as zero). NaN is rejected: it would otherwise slip through
 * because every comparison against NaN is false.
 *
 * @param weight weight to validate
 * @param queue queue the weight belongs to, used in the error message
 * @param label node label, used in the error message
 * @throws IllegalArgumentException if the weight is not acceptable
 */
private void throwExceptionForUnexpectedWeight(float weight, String queue,
    String label) {
  boolean illegalNegative = weight < -1e-6 && Math.abs(weight + 1) > 1e-6;
  if (Float.isNaN(weight) || illegalNegative || weight > 10000) {
    // The message previously ran the queue name straight into "label=".
    throw new IllegalArgumentException(
        "Illegal " + "weight=" + weight + " for queue=" + queue + ", label="
            + label
            + ". Acceptable values: [0, 10000], -1 is same as not set");
  }
}
/**
 * Returns the weight configured for {@code queue} on the default (empty)
 * label, read from the queue's capacity property.
 *
 * @param queue queue path
 * @return the configured weight, or {@code -1} when the capacity property
 *         is not written in weight form
 * @throws IllegalArgumentException if the weight is outside the accepted
 *           range
 */
public float getNonLabeledQueueWeight(String queue) {
  String capacityKey = getQueuePrefix(queue) + CAPACITY;
  float weight = extractFloatValueFromWeightConfig(get(capacityKey));
  throwExceptionForUnexpectedWeight(weight, queue, "");
  return weight;
}
/**
 * Stores {@code weight} as the capacity of {@code queue}, written in weight
 * form (the number followed by the weight suffix).
 *
 * @param queue queue path
 * @param weight weight to store
 */
public void setNonLabeledQueueWeight(String queue, float weight) {
  String capacityKey = getQueuePrefix(queue) + CAPACITY;
  String weightValue = weight + WEIGHT_SUFFIX;
  set(capacityKey, weightValue);
}
/**
 * Stores {@code weight} as the capacity of {@code queue} for the given node
 * label, written in weight form (the number followed by the weight suffix).
 *
 * @param queue queue path
 * @param label node label
 * @param weight weight to store
 */
public void setLabeledQueueWeight(String queue, String label, float weight) {
  String capacityKey = getNodeLabelPrefix(queue, label) + CAPACITY;
  String weightValue = weight + WEIGHT_SUFFIX;
  set(capacityKey, weightValue);
}
/**
 * Returns the weight configured for {@code queue} on the given node label,
 * read from the label-specific capacity property.
 *
 * @param queue queue path
 * @param label node label
 * @return the configured weight, or {@code -1} when the capacity property
 *         is not written in weight form
 * @throws IllegalArgumentException if the weight is outside the accepted
 *           range
 */
public float getLabeledQueueWeight(String queue, String label) {
  String capacityKey = getNodeLabelPrefix(queue, label) + CAPACITY;
  float weight = extractFloatValueFromWeightConfig(get(capacityKey));
  throwExceptionForUnexpectedWeight(weight, queue, label);
  return weight;
}
public float getNonLabeledQueueCapacity(String queue) {
String configuredCapacity = get(getQueuePrefix(queue) + CAPACITY);
boolean matcher = (configuredCapacity != null)
boolean absoluteResourceConfigured = (configuredCapacity != null)
&& RESOURCE_PATTERN.matcher(configuredCapacity).find();
if (matcher) {
if (absoluteResourceConfigured || configuredWeightAsCapacity(
configuredCapacity)) {
// Return capacity in percentage as 0 for non-root queues and 100 for
// root.From AbstractCSQueue, absolute resource will be parsed and
// updated. Once nodes are added/removed in cluster, capacity in
@ -730,30 +766,50 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
return Collections.unmodifiableSet(set);
}
private float internalGetLabeledQueueCapacity(String queue, String label, String suffix,
float defaultValue) {
/**
 * Tells whether a raw capacity value is written in weight form, i.e. ends
 * with the weight suffix.
 *
 * @param configureValue raw configured value, may be {@code null}
 * @return {@code true} only for a non-null value ending with the suffix
 */
private boolean configuredWeightAsCapacity(String configureValue) {
  return configureValue != null && configureValue.endsWith(WEIGHT_SUFFIX);
}
/**
 * Extracts the numeric part of a capacity value written in weight form.
 *
 * <p>Only the trailing weight suffix is removed. Cutting at the first
 * occurrence of the suffix would silently accept malformed values such as
 * {@code "1w5w"} as {@code 1}; they now fail to parse.
 *
 * @param configureValue raw configured value, may be {@code null}
 * @return the parsed weight, or {@code -1} when the value is {@code null} or
 *         not in weight form
 * @throws NumberFormatException if the text before the suffix is not a
 *           valid float
 */
private float extractFloatValueFromWeightConfig(String configureValue) {
  if (!configuredWeightAsCapacity(configureValue)) {
    return -1f;
  }
  // configuredWeightAsCapacity guarantees the value ends with the suffix.
  int numberEnd = configureValue.length() - WEIGHT_SUFFIX.length();
  return Float.parseFloat(configureValue.substring(0, numberEnd));
}
private float internalGetLabeledQueueCapacity(String queue, String label,
String suffix, float defaultValue) {
String capacityPropertyName = getNodeLabelPrefix(queue, label) + suffix;
String configuredCapacity = get(capacityPropertyName);
boolean matcher = (configuredCapacity != null)
&& RESOURCE_PATTERN.matcher(configuredCapacity).find();
if (matcher) {
boolean absoluteResourceConfigured =
(configuredCapacity != null) && RESOURCE_PATTERN.matcher(
configuredCapacity).find();
if (absoluteResourceConfigured || configuredWeightAsCapacity(
configuredCapacity)) {
// Return capacity in percentage as 0 for non-root queues and 100 for
// root.From AbstractCSQueue, absolute resource will be parsed and
// updated. Once nodes are added/removed in cluster, capacity in
// percentage will also be re-calculated.
// root. From AbstractCSQueue, absolute resource and weight will be parsed
// and updated separately. Once nodes are added/removed in cluster,
// capacity in percentage will also be re-calculated.
return defaultValue;
}
float capacity = getFloat(capacityPropertyName, defaultValue);
if (capacity < MINIMUM_CAPACITY_VALUE
|| capacity > MAXIMUM_CAPACITY_VALUE) {
throw new IllegalArgumentException("Illegal capacity of " + capacity
+ " for node-label=" + label + " in queue=" + queue
+ ", valid capacity should in range of [0, 100].");
throw new IllegalArgumentException(
"Illegal capacity of " + capacity + " for node-label=" + label
+ " in queue=" + queue
+ ", valid capacity should in range of [0, 100].");
}
if (LOG.isDebugEnabled()) {
LOG.debug("CSConf - getCapacityOfLabel: prefix="
+ getNodeLabelPrefix(queue, label) + ", capacity=" + capacity);
LOG.debug(
"CSConf - getCapacityOfLabel: prefix=" + getNodeLabelPrefix(queue,
label) + ", capacity=" + capacity);
}
return capacity;
}

View File

@ -25,6 +25,8 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -164,6 +166,8 @@ public class CapacitySchedulerQueueManager implements SchedulerQueueManager<
setQueueAcls(authorizer, appPriorityACLManager, queues);
labelManager.reinitializeQueueLabels(getQueueToLabels());
this.queueStateManager.initialize(this);
root.updateClusterResource(csContext.getClusterResource(),
new ResourceLimits(csContext.getClusterResource()));
LOG.info("Initialized root queue " + root);
}

View File

@ -200,27 +200,19 @@ public class LeafQueue extends AbstractCSQueue {
usersManager.setUserLimit(conf.getUserLimit(getQueuePath()));
usersManager.setUserLimitFactor(conf.getUserLimitFactor(getQueuePath()));
maxApplications = conf.getMaximumApplicationsPerQueue(getQueuePath());
if (maxApplications < 0) {
int maxGlobalPerQueueApps = schedConf
.getGlobalMaximumApplicationsPerQueue();
if (maxGlobalPerQueueApps > 0) {
maxApplications = maxGlobalPerQueueApps;
} else {
int maxSystemApps = schedConf.
getMaximumSystemApplications();
maxApplications =
(int) (maxSystemApps * queueCapacities.getAbsoluteCapacity());
}
}
maxApplicationsPerUser = Math.min(maxApplications,
(int) (maxApplications * (usersManager.getUserLimit() / 100.0f)
* usersManager.getUserLimitFactor()));
maxAMResourcePerQueuePercent =
conf.getMaximumApplicationMasterResourcePerQueuePercent(
getQueuePath());
maxApplications = conf.getMaximumApplicationsPerQueue(getQueuePath());
if (maxApplications < 0) {
int maxGlobalPerQueueApps =
csContext.getConfiguration().getGlobalMaximumApplicationsPerQueue();
if (maxGlobalPerQueueApps > 0) {
maxApplications = maxGlobalPerQueueApps;
}
}
priorityAcls = conf.getPriorityAcls(getQueuePath(),
scheduler.getMaxClusterLevelAppPriority());
@ -639,7 +631,8 @@ public class LeafQueue extends AbstractCSQueue {
}
// Check submission limits for queues
if (getNumApplications() >= getMaxApplications()) {
//TODO recalculate max applications because they can depend on capacity
if (getNumApplications() >= getMaxApplications() && !(this instanceof AutoCreatedLeafQueue)) {
String msg =
"Queue " + getQueuePath() + " already has " + getNumApplications()
+ " applications,"
@ -650,7 +643,8 @@ public class LeafQueue extends AbstractCSQueue {
// Check submission limits for the user on this queue
User user = usersManager.getUserAndAddIfAbsent(userName);
if (user.getTotalApplications() >= getMaxApplicationsPerUser()) {
//TODO recalculate max applications because they can depend on capacity
if (user.getTotalApplications() >= getMaxApplicationsPerUser() && !(this instanceof AutoCreatedLeafQueue)) {
String msg = "Queue " + getQueuePath() + " already has " + user
.getTotalApplications() + " applications from user " + userName
+ " cannot accept submission of application: " + applicationId;
@ -1893,14 +1887,36 @@ public class LeafQueue extends AbstractCSQueue {
currentResourceLimits.getLimit()));
}
/**
 * Refreshes the absolute capacities of this queue and then the application
 * limits that depend on them.
 *
 * <p>When {@code maxApplications} is not positive it is derived from the
 * system-wide maximum scaled by the queue's absolute capacity. The per-user
 * limit is then {@code maxApplications} scaled by user-limit percent and
 * user-limit factor, capped at {@code maxApplications}.
 */
private void updateAbsoluteCapacitiesAndRelatedFields() {
  updateAbsoluteCapacities();

  CapacitySchedulerConfiguration schedulerConf = csContext.getConfiguration();

  // If maxApplications not set, use the system total max app, apply newly
  // calculated abs capacity of the queue.
  if (maxApplications <= 0) {
    maxApplications = (int) (schedulerConf.getMaximumSystemApplications()
        * queueCapacities.getAbsoluteCapacity());
  }

  int userScaledLimit =
      (int) (maxApplications * (usersManager.getUserLimit() / 100.0f)
          * usersManager.getUserLimitFactor());
  maxApplicationsPerUser = Math.min(maxApplications, userScaledLimit);
}
@Override
public void updateClusterResource(Resource clusterResource,
ResourceLimits currentResourceLimits) {
writeLock.lock();
try {
updateCurrentResourceLimits(currentResourceLimits, clusterResource);
lastClusterResource = clusterResource;
updateAbsoluteCapacitiesAndRelatedFields();
super.updateEffectiveResources(clusterResource);
updateCurrentResourceLimits(currentResourceLimits, clusterResource);
// Update headroom info based on new cluster resource value
// absoluteMaxCapacity now, will be replaced with absoluteMaxAvailCapacity
// during allocation

View File

@ -20,9 +20,11 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue.CapacityConfigType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.queuemanagement.GuaranteedOrZeroCapacityOverTimePolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica
.FiCaSchedulerApp;
import org.apache.hadoop.yarn.util.resource.Resources;
@ -180,9 +182,10 @@ public class ManagedParentQueue extends AbstractManagedParentQueue {
//Load template capacities
QueueCapacities queueCapacities = new QueueCapacities(false);
CSQueueUtils.loadUpdateAndCheckCapacities(csContext.getConfiguration()
CSQueueUtils.loadCapacitiesByLabelsFromConf(csContext.getConfiguration()
.getAutoCreatedQueueTemplateConfPrefix(getQueuePath()),
csContext.getConfiguration(), queueCapacities, getQueueCapacities());
queueCapacities,
csContext.getConfiguration());
/**
@ -266,6 +269,11 @@ public class ManagedParentQueue extends AbstractManagedParentQueue {
ManagedParentQueue parentQueue =
(ManagedParentQueue) childQueue.getParent();
if (parentQueue == null) {
throw new SchedulerDynamicEditException(
"Parent Queue is null, should not add child queue!");
}
String leafQueuePath = childQueue.getQueuePath();
int maxQueues = conf.getAutoCreatedQueuesMaxChildQueuesLimit(
parentQueue.getQueuePath());
@ -289,6 +297,9 @@ public class ManagedParentQueue extends AbstractManagedParentQueue {
}
}
((GuaranteedOrZeroCapacityOverTimePolicy) queueManagementPolicy)
.updateTemplateAbsoluteCapacities(parentQueue.getQueueCapacities());
AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) childQueue;
super.addChildQueue(leafQueue);
@ -305,6 +316,11 @@ public class ManagedParentQueue extends AbstractManagedParentQueue {
queueManagementPolicy.getInitialLeafQueueConfiguration(leafQueue);
leafQueue.reinitializeFromTemplate(initialLeafQueueTemplate);
// Do one update cluster resource call to make sure all absolute resources
// effective resources are updated.
updateClusterResource(this.csContext.getClusterResource(),
new ResourceLimits(this.csContext.getClusterResource()));
} finally {
writeLock.unlock();
}

View File

@ -27,7 +27,10 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.classification.InterfaceAudience.Private;
@ -97,6 +100,12 @@ public class ParentQueue extends AbstractCSQueue {
private final boolean allowZeroCapacitySum;
// effective min ratio per resource, it is used during updateClusterResource,
// leaf queue can use this to calculate effective resources.
// This field will not be edited, reference will point to a new immutable map
// after every time recalculation
private volatile Map<String, Float> effectiveMinRatioPerResource;
public ParentQueue(CapacitySchedulerContext cs,
String queueName, CSQueue parent, CSQueue old) throws IOException {
super(cs, queueName, parent, old);
@ -172,117 +181,199 @@ public class ParentQueue extends AbstractCSQueue {
private static float PRECISION = 0.0005f; // 0.05% precision
void setChildQueues(Collection<CSQueue> childQueues) {
/**
 * Detects which capacity configuration mode the given queues use and rejects
 * mixed configurations.
 *
 * For every queue, and for every node label known to THIS queue's
 * {@code queueCapacities}, three things are inspected: a capacity above zero
 * (percentage mode), a weight of zero or more (weight mode; unset weights are
 * negative) and a configured minimum resource different from
 * {@code Resources.none()} (absolute mode).
 *
 * NOTE(review): the percentage flag is cumulative across iterations and is
 * cleared every time an absolute minimum resource is seen, so both the
 * mixed-mode check and the "uses percentage mode" diagnostics depend on the
 * iteration order of queues and labels. Confirm this is intended.
 *
 * @param queues queues whose configuration is inspected
 * @return WEIGHT if any weight is set, else ABSOLUTE_RESOURCE if any minimum
 *         resource is set, else PERCENT (also when nothing is set at all)
 * @throws IOException if more than one mode was detected, unless
 *         {@code queues} is empty or its first element is the root queue
 */
private QueueCapacityType getCapacityConfigurationTypeForQueues(
    Collection<CSQueue> queues) throws IOException {
  // One flag per mode: is it used by ANY queue in ANY label?
  boolean percentageIsSet = false;
  boolean weightIsSet = false;
  boolean absoluteMinResSet = false;

  StringBuilder diagMsg = new StringBuilder();

  for (CSQueue queue : queues) {
    for (String nodeLabel : queueCapacities.getExistingNodeLabels()) {
      if (queue.getQueueCapacities().getCapacity(nodeLabel) > 0) {
        percentageIsSet = true;
      }

      // Weight defaults to -1 when not configured, so >= 0 means "set".
      if (queue.getQueueCapacities().getWeight(nodeLabel) >= 0) {
        weightIsSet = true;
        diagMsg.append("{Queue=").append(queue.getQueuePath())
            .append(", label=").append(nodeLabel)
            .append(" uses weight mode}. ");
      }

      boolean hasMinResource = !queue.getQueueResourceQuotas()
          .getConfiguredMinResource(nodeLabel).equals(Resources.none());
      if (hasMinResource) {
        absoluteMinResSet = true;
        // With absolute resources the capacity is still calculated (and set)
        // for UI/metrics purposes, so a positive capacity must not be taken
        // as percentage mode here.
        percentageIsSet = false;
        diagMsg.append("{Queue=").append(queue.getQueuePath())
            .append(", label=").append(nodeLabel)
            .append(" uses absolute mode}. ");
      }

      if (percentageIsSet) {
        diagMsg.append("{Queue=").append(queue.getQueuePath())
            .append(", label=").append(nodeLabel)
            .append(" uses percentage mode}. ");
      }
    }
  }

  int modesInUse = 0;
  if (percentageIsSet) {
    modesInUse++;
  }
  if (weightIsSet) {
    modesInUse++;
  }
  if (absoluteMinResSet) {
    modesInUse++;
  }

  // Mixing any two of {percentage, weight, absolute} is rejected. The root
  // queue is exempt because it reports a capacity of 100 no matter what; its
  // configuration is deliberately not checked to limit the impact on other
  // code paths.
  if (!queues.isEmpty()
      && !queues.iterator().next().getQueuePath().equals(
          CapacitySchedulerConfiguration.ROOT)
      && modesInUse > 1) {
    throw new IOException("Parent queue '" + getQueuePath()
        + "' have children queue used mixed of "
        + " weight mode, percentage and absolute mode, it is not allowed, please "
        + "double check, details:" + diagMsg.toString());
  }

  if (weightIsSet) {
    return QueueCapacityType.WEIGHT;
  }
  if (absoluteMinResSet) {
    return QueueCapacityType.ABSOLUTE_RESOURCE;
  }
  // Either percentage is set, or every value is 0, which is also treated as
  // percent mode.
  return QueueCapacityType.PERCENT;
}
/**
 * Capacity configuration mode detected for a group of queues.
 */
private enum QueueCapacityType {
  /** At least one queue has a weight configured. */
  WEIGHT,
  /** At least one queue has an absolute minimum resource configured. */
  ABSOLUTE_RESOURCE,
  /** Percentage based capacity; also used when nothing is configured. */
  PERCENT
}
/**
* Set child queue and verify capacities
* +--------------+---------------------------+-------------------------------------+------------------------+
* | | parent-weight | parent-pct | parent-abs |
* +--------------+---------------------------+-------------------------------------+------------------------+
* | child-weight | No specific check | No specific check | X |
* +--------------+---------------------------+-------------------------------------+------------------------+
* | child-pct | Sum(children.capacity) = | When: | X |
* | | 0 OR 100 | parent.capacity>0 | |
* | | | sum(children.capacity)=100 OR 0 | |
* | | | parent.capacity=0 | |
* | | | sum(children.capacity)=0 | |
* +--------------+---------------------------+-------------------------------------+------------------------+
* | child-abs | X | X | Sum(children.minRes)<= |
* | | | | parent.minRes |
* +--------------+---------------------------+-------------------------------------+------------------------+
* @param childQueues
*/
void setChildQueues(Collection<CSQueue> childQueues) throws IOException {
writeLock.lock();
try {
// Validate
float childCapacities = 0;
Resource minResDefaultLabel = Resources.createResource(0, 0);
for (CSQueue queue : childQueues) {
childCapacities += queue.getCapacity();
Resources.addTo(minResDefaultLabel, queue.getQueueResourceQuotas()
.getConfiguredMinResource());
QueueCapacityType childrenCapacityType =
getCapacityConfigurationTypeForQueues(childQueues);
QueueCapacityType parentCapacityType =
getCapacityConfigurationTypeForQueues(ImmutableList.of(this));
// If any child queue is using percentage based capacity model vs parent
// queues' absolute configuration or vice versa, throw back an
// exception.
if (!queueName.equals("root") && getCapacity() != 0f
&& !queue.getQueueResourceQuotas().getConfiguredMinResource()
.equals(Resources.none())) {
throw new IllegalArgumentException("Parent queue '" + getQueuePath()
+ "' and child queue '" + queue.getQueuePath()
+ "' should use either percentage based capacity"
+ " configuration or absolute resource together.");
}
}
float delta = Math.abs(1.0f - childCapacities); // crude way to check
if (allowZeroCapacitySum) {
// If we allow zero capacity for children, only fail if:
// Σ(childCapacities) != 1.0f OR Σ(childCapacities) != 0.0f
//
// Therefore, child queues either add up to 0% or 100%.
//
// Current capacity doesn't matter, because we apply this logic
// regardless of whether the current capacity is zero or not.
if (minResDefaultLabel.equals(Resources.none())
&& (delta > PRECISION && childCapacities > PRECISION)) {
LOG.error("Capacity validation check is relaxed for"
+ " queue {}, but the capacity must be either 0% or 100%",
getQueuePath());
throw new IllegalArgumentException("Illegal" + " capacity of "
+ childCapacities + " for children of queue " + queueName);
}
} else if ((minResDefaultLabel.equals(Resources.none())
&& (queueCapacities.getCapacity() > 0) && (delta > PRECISION))
|| ((queueCapacities.getCapacity() == 0) && (childCapacities > 0))) {
// allow capacities being set to 0, and enforce child 0 if parent is 0
throw new IllegalArgumentException("Illegal" + " capacity of "
+ childCapacities + " for children of queue " + queueName);
}
// check label capacities
for (String nodeLabel : queueCapacities.getExistingNodeLabels()) {
float capacityByLabel = queueCapacities.getCapacity(nodeLabel);
// check children's labels
float sum = 0;
Resource minRes = Resources.createResource(0, 0);
Resource resourceByLabel = labelManager.getResourceByLabel(nodeLabel,
scheduler.getClusterResource());
for (CSQueue queue : childQueues) {
sum += queue.getQueueCapacities().getCapacity(nodeLabel);
// If any child queue of a label is using percentage based capacity
// model vs parent queues' absolute configuration or vice versa, throw
// back an exception
if (!queueName.equals("root") && !this.capacityConfigType
.equals(queue.getCapacityConfigType())) {
throw new IllegalArgumentException("Parent queue '" + getQueuePath()
+ "' and child queue '" + queue.getQueuePath()
+ "' should use either percentage based capacity"
+ "configuration or absolute resource together for label:"
+ nodeLabel);
}
// Accumulate all min/max resource configured for all child queues.
Resources.addTo(minRes, queue.getQueueResourceQuotas()
.getConfiguredMinResource(nodeLabel));
}
float labelDelta = Math.abs(1.0f - sum);
if (allowZeroCapacitySum) {
// Similar to above, we only throw exception if
// Σ(childCapacities) != 1.0f OR Σ(childCapacities) != 0.0f
if (minResDefaultLabel.equals(Resources.none())
&& capacityByLabel > 0
&& (labelDelta > PRECISION && sum > PRECISION)) {
LOG.error("Capacity validation check is relaxed for"
+ " queue {}, but the capacity must be either 0% or 100%",
getQueuePath());
throw new IllegalArgumentException(
"Illegal" + " capacity of " + sum + " for children of queue "
+ queueName + " for label=" + nodeLabel);
}
} else if ((minResDefaultLabel.equals(Resources.none())
&& capacityByLabel > 0
&& Math.abs(1.0f - sum) > PRECISION)
|| (capacityByLabel == 0) && (sum > 0)) {
throw new IllegalArgumentException(
"Illegal" + " capacity of " + sum + " for children of queue "
+ queueName + " for label=" + nodeLabel);
if (childrenCapacityType == QueueCapacityType.ABSOLUTE_RESOURCE
|| parentCapacityType == QueueCapacityType.ABSOLUTE_RESOURCE) {
// We don't allow any mixed absolute + {weight, percentage} between
// children and parent
if (childrenCapacityType != parentCapacityType && !this.getQueuePath()
.equals(CapacitySchedulerConfiguration.ROOT)) {
throw new IOException("Parent=" + this.getQueuePath()
+ ": When absolute minResource is used, we must make sure both "
+ "parent and child all use absolute minResource");
}
// Ensure that for each parent queue: parent.min-resource >=
// Σ(child.min-resource).
Resource parentMinResource = queueResourceQuotas
.getConfiguredMinResource(nodeLabel);
if (!parentMinResource.equals(Resources.none()) && Resources.lessThan(
resourceCalculator, resourceByLabel, parentMinResource, minRes)) {
throw new IllegalArgumentException("Parent Queues" + " capacity: "
+ parentMinResource + " is less than" + " to its children:"
+ minRes + " for queue:" + queueName);
for (String nodeLabel : queueCapacities.getExistingNodeLabels()) {
Resource minRes = Resources.createResource(0, 0);
for (CSQueue queue : childQueues) {
// Accumulate all min/max resource configured for all child queues.
Resources.addTo(minRes, queue.getQueueResourceQuotas()
.getConfiguredMinResource(nodeLabel));
}
Resource resourceByLabel = labelManager.getResourceByLabel(nodeLabel,
scheduler.getClusterResource());
Resource parentMinResource =
queueResourceQuotas.getConfiguredMinResource(nodeLabel);
if (!parentMinResource.equals(Resources.none()) && Resources.lessThan(
resourceCalculator, resourceByLabel, parentMinResource, minRes)) {
throw new IOException(
"Parent Queues" + " capacity: " + parentMinResource
+ " is less than" + " to its children:" + minRes
+ " for queue:" + queueName);
}
}
}
// When child uses percent
if (childrenCapacityType == QueueCapacityType.PERCENT) {
float childrenPctSum = 0;
// check label capacities
for (String nodeLabel : queueCapacities.getExistingNodeLabels()) {
// check children's labels
childrenPctSum = 0;
for (CSQueue queue : childQueues) {
childrenPctSum += queue.getQueueCapacities().getCapacity(nodeLabel);
}
if (Math.abs(1 - childrenPctSum) > PRECISION) {
// When children's percent sum != 100%
if (Math.abs(childrenPctSum) > PRECISION) {
// It is wrong when percent sum != {0, 1}
throw new IOException(
"Illegal" + " capacity sum of " + childrenPctSum
+ " for children of queue " + queueName + " for label="
+ nodeLabel + ". It should be either 0 or 1.0");
} else{
// We also allow children's percent sum = 0 under the following
// conditions
// - Parent uses weight mode
// - Parent uses percent mode, and parent has
// (capacity=0 OR allowZero)
if (parentCapacityType == QueueCapacityType.PERCENT) {
if ((Math.abs(queueCapacities.getCapacity(nodeLabel))
> PRECISION) && (!allowZeroCapacitySum)) {
throw new IOException(
"Illegal" + " capacity sum of " + childrenPctSum
+ " for children of queue " + queueName
+ " for label=" + nodeLabel
+ ". It is set to 0, but parent percent != 0, and "
+ "doesn't allow children capacity to set to 0");
}
}
}
} else {
// Even if child pct sum == 1.0, we will make sure parent has
// positive percent.
if (parentCapacityType == QueueCapacityType.PERCENT && Math.abs(
queueCapacities.getCapacity(nodeLabel)) <= 0f
&& !allowZeroCapacitySum) {
throw new IOException(
"Illegal" + " capacity sum of " + childrenPctSum
+ " for children of queue " + queueName + " for label="
+ nodeLabel + ". queue=" + queueName
+ " has zero capacity, but child"
+ "queues have positive capacities");
}
}
}
}
@ -451,8 +542,7 @@ public class ParentQueue extends AbstractCSQueue {
}
// Re-sort all queues
childQueues.clear();
childQueues.addAll(currentChildQueues.values());
setChildQueues(currentChildQueues.values());
// Make sure we notifies QueueOrderingPolicy
queueOrderingPolicy.setQueues(childQueues);
@ -788,14 +878,24 @@ public class ParentQueue extends AbstractCSQueue {
}
private ResourceLimits getResourceLimitsOfChild(CSQueue child,
Resource clusterResource, Resource parentLimits,
String nodePartition) {
Resource clusterResource, ResourceLimits parentLimits,
String nodePartition, boolean netLimit) {
// Set resource-limit of a given child, child.limit =
// min(my.limit - my.used + child.used, child.max)
// First, cap parent limit by parent's max
parentLimits.setLimit(Resources.min(resourceCalculator, clusterResource,
parentLimits.getLimit(),
queueResourceQuotas.getEffectiveMaxResource(nodePartition)));
// Parent available resource = parent-limit - parent-used-resource
Resource limit = parentLimits.getLimit();
if (netLimit) {
limit = parentLimits.getNetLimit();
}
Resource parentMaxAvailableResource = Resources.subtract(
parentLimits, queueUsage.getUsed(nodePartition));
limit, queueUsage.getUsed(nodePartition));
// Deduct killable from used
Resources.addTo(parentMaxAvailableResource,
getTotalKillableResource(nodePartition));
@ -804,15 +904,6 @@ public class ParentQueue extends AbstractCSQueue {
Resource childLimit = Resources.add(parentMaxAvailableResource,
child.getQueueResourceUsage().getUsed(nodePartition));
// Get child's max resource
Resource childConfiguredMaxResource = child
.getEffectiveMaxCapacityDown(nodePartition, minimumAllocation);
// Child's limit should be capped by child configured max resource
childLimit =
Resources.min(resourceCalculator, clusterResource, childLimit,
childConfiguredMaxResource);
// Normalize before return
childLimit =
Resources.roundDown(resourceCalculator, childLimit, minimumAllocation);
@ -841,8 +932,8 @@ public class ParentQueue extends AbstractCSQueue {
// Get ResourceLimits of child queue before assign containers
ResourceLimits childLimits =
getResourceLimitsOfChild(childQueue, cluster, limits.getNetLimit(),
candidates.getPartition());
getResourceLimitsOfChild(childQueue, cluster, limits,
candidates.getPartition(), true);
CSAssignment childAssignment = childQueue.assignContainers(cluster,
candidates, childLimits, schedulingMode);
@ -941,6 +1032,40 @@ public class ParentQueue extends AbstractCSQueue {
ResourceLimits resourceLimits) {
writeLock.lock();
try {
// Special handle root queue
if (rootQueue) {
for (String nodeLabel : queueCapacities.getExistingNodeLabels()) {
if (queueCapacities.getWeight(nodeLabel) > 0) {
queueCapacities.setNormalizedWeight(nodeLabel, 1f);
}
}
}
// Update absolute capacities of this queue, this need to happen before
// below calculation for effective capacities
updateAbsoluteCapacities();
// Normalize weight of children
if (getCapacityConfigurationTypeForQueues(childQueues)
== QueueCapacityType.WEIGHT) {
for (String nodeLabel : queueCapacities.getExistingNodeLabels()) {
float sumOfWeight = 0;
for (CSQueue queue : childQueues) {
float weight = Math.max(0,
queue.getQueueCapacities().getWeight(nodeLabel));
sumOfWeight += weight;
}
// When sum of weight == 0, skip setting normalized_weight (so
// normalized weight will be 0).
if (Math.abs(sumOfWeight) > 1e-6) {
for (CSQueue queue : childQueues) {
queue.getQueueCapacities().setNormalizedWeight(nodeLabel,
queue.getQueueCapacities().getWeight(nodeLabel) / sumOfWeight);
}
}
}
}
// Update effective capacity in all parent queue.
Set<String> configuredNodelabels = csContext.getConfiguration()
.getConfiguredNodeLabels(getQueuePath());
@ -952,8 +1077,8 @@ public class ParentQueue extends AbstractCSQueue {
for (CSQueue childQueue : childQueues) {
// Get ResourceLimits of child queue before assign containers
ResourceLimits childLimits = getResourceLimitsOfChild(childQueue,
clusterResource, resourceLimits.getLimit(),
RMNodeLabelsManager.NO_LABEL);
clusterResource, resourceLimits,
RMNodeLabelsManager.NO_LABEL, false);
childQueue.updateClusterResource(clusterResource, childLimits);
}
@ -963,6 +1088,9 @@ public class ParentQueue extends AbstractCSQueue {
CSQueueUtils.updateConfiguredCapacityMetrics(resourceCalculator,
labelManager.getResourceByLabel(null, clusterResource),
RMNodeLabelsManager.NO_LABEL, this);
} catch (IOException e) {
LOG.error("Fatal issue found: e", e);
throw new YarnRuntimeException("Fatal issue during scheduling", e);
} finally {
writeLock.unlock();
}
@ -979,16 +1107,13 @@ public class ParentQueue extends AbstractCSQueue {
// cluster resource.
Resource resourceByLabel = labelManager.getResourceByLabel(label,
clusterResource);
if (getQueuePath().equals("root")) {
queueResourceQuotas.setConfiguredMinResource(label, resourceByLabel);
queueResourceQuotas.setConfiguredMaxResource(label, resourceByLabel);
queueResourceQuotas.setEffectiveMinResource(label, resourceByLabel);
queueResourceQuotas.setEffectiveMaxResource(label, resourceByLabel);
queueCapacities.setAbsoluteCapacity(label, 1.0f);
}
/*
* == Below logic are added to calculate effectiveMinRatioPerResource ==
*/
// Total configured min resources of direct children of this given parent
// queue.
// queue
Resource configuredMinResources = Resource.newInstance(0L, 0);
for (CSQueue childQueue : getChildQueues()) {
Resources.addTo(configuredMinResources,
@ -1014,92 +1139,18 @@ public class ParentQueue extends AbstractCSQueue {
}
}
Map<String, Float> effectiveMinRatioPerResource = getEffectiveMinRatioPerResource(
effectiveMinRatioPerResource = getEffectiveMinRatioPerResource(
configuredMinResources, numeratorForMinRatio);
// loop and do this for all child queues
for (CSQueue childQueue : getChildQueues()) {
Resource minResource = childQueue.getQueueResourceQuotas()
.getConfiguredMinResource(label);
// Update effective resource (min/max) to each child queue.
if (childQueue.getCapacityConfigType()
.equals(CapacityConfigType.ABSOLUTE_RESOURCE)) {
childQueue.getQueueResourceQuotas().setEffectiveMinResource(label,
getMinResourceNormalized(
childQueue.getQueuePath(),
effectiveMinRatioPerResource,
minResource));
// Max resource of a queue should be a minimum of {configuredMaxRes,
// parentMaxRes}. parentMaxRes could be configured value. But if not
// present could also be taken from effective max resource of parent.
Resource parentMaxRes = queueResourceQuotas
.getConfiguredMaxResource(label);
if (parent != null && parentMaxRes.equals(Resources.none())) {
parentMaxRes = parent.getQueueResourceQuotas()
.getEffectiveMaxResource(label);
}
// Minimum of {childMaxResource, parentMaxRes}. However if
// childMaxResource is empty, consider parent's max resource alone.
Resource childMaxResource = childQueue.getQueueResourceQuotas()
.getConfiguredMaxResource(label);
Resource effMaxResource = Resources.min(resourceCalculator,
resourceByLabel, childMaxResource.equals(Resources.none())
? parentMaxRes
: childMaxResource,
parentMaxRes);
childQueue.getQueueResourceQuotas().setEffectiveMaxResource(label,
Resources.clone(effMaxResource));
// In cases where we still need to update some units based on
// percentage, we have to calculate percentage and update.
deriveCapacityFromAbsoluteConfigurations(label, clusterResource, rc,
childQueue);
} else {
childQueue.getQueueResourceQuotas().setEffectiveMinResource(label,
Resources.multiply(resourceByLabel,
childQueue.getQueueCapacities().getAbsoluteCapacity(label)));
childQueue.getQueueResourceQuotas().setEffectiveMaxResource(label,
Resources.multiply(resourceByLabel, childQueue.getQueueCapacities()
.getAbsoluteMaximumCapacity(label)));
}
if (LOG.isDebugEnabled()) {
LOG.debug("Updating effective min resource for queue:"
+ childQueue.getQueuePath() + " as effMinResource="
+ childQueue.getQueueResourceQuotas().getEffectiveMinResource(label)
+ "and Updating effective max resource as effMaxResource="
+ childQueue.getQueueResourceQuotas()
.getEffectiveMaxResource(label));
}
// Update effective resources for my self;
if (rootQueue) {
queueResourceQuotas.setEffectiveMinResource(label, resourceByLabel);
queueResourceQuotas.setEffectiveMaxResource(label, resourceByLabel);
} else{
super.updateEffectiveResources(clusterResource);
}
}
/**
 * Returns a copy of {@code minResource} in which every countable resource
 * type that has an entry in {@code effectiveMinRatio} is multiplied by that
 * ratio and truncated to a long. Resource types without a ratio keep their
 * original value.
 *
 * @param name queue name, only used in the debug log
 * @param effectiveMinRatio ratio per resource name
 * @param minResource configured minimum resource to scale
 * @return the scaled copy; {@code minResource} itself is not modified
 */
private Resource getMinResourceNormalized(String name, Map<String, Float> effectiveMinRatio,
    Resource minResource) {
  Resource normalized = Resource.newInstance(minResource);
  int countableTypes = ResourceUtils.getNumberOfCountableResourceTypes();

  for (int idx = 0; idx < countableTypes; idx++) {
    ResourceInformation configured = minResource.getResourceInformation(idx);
    Float ratio = effectiveMinRatio.get(configured.getName());
    if (ratio == null) {
      // No ratio for this resource type: leave the copied value untouched.
      continue;
    }

    normalized.setResourceValue(idx,
        (long) (configured.getValue() * ratio.floatValue()));

    if (LOG.isDebugEnabled()) {
      LOG.debug("Updating min resource for Queue: " + name + " as "
          + normalized.getResourceInformation(idx) + ", Actual resource: "
          + configured.getValue() + ", ratio: "
          + ratio.floatValue());
    }
  }

  return normalized;
}
private Map<String, Float> getEffectiveMinRatioPerResource(
Resource configuredMinResources, Resource numeratorForMinRatio) {
Map<String, Float> effectiveMinRatioPerResource = new HashMap<>();
@ -1121,74 +1172,7 @@ public class ParentQueue extends AbstractCSQueue {
}
}
}
return effectiveMinRatioPerResource;
}
private void deriveCapacityFromAbsoluteConfigurations(String label,
Resource clusterResource, ResourceCalculator rc, CSQueue childQueue) {
/*
* In case when queues are configured with absolute resources, it is better
* to update capacity/max-capacity etc w.r.t absolute resource as well. In
* case of computation, these values wont be used any more. However for
* metrics and UI, its better these values are pre-computed here itself.
*/
// 1. Update capacity as a float based on parent's minResource
childQueue.getQueueCapacities().setCapacity(label,
rc.divide(clusterResource,
childQueue.getQueueResourceQuotas().getEffectiveMinResource(label),
getQueueResourceQuotas().getEffectiveMinResource(label)));
// 2. Update max-capacity as a float based on parent's maxResource
childQueue.getQueueCapacities().setMaximumCapacity(label,
rc.divide(clusterResource,
childQueue.getQueueResourceQuotas().getEffectiveMaxResource(label),
getQueueResourceQuotas().getEffectiveMaxResource(label)));
// 3. Update absolute capacity as a float based on parent's minResource and
// cluster resource.
childQueue.getQueueCapacities().setAbsoluteCapacity(label,
childQueue.getQueueCapacities().getCapacity(label)
* getQueueCapacities().getAbsoluteCapacity(label));
// 4. Update absolute max-capacity as a float based on parent's maxResource
// and cluster resource.
childQueue.getQueueCapacities().setAbsoluteMaximumCapacity(label,
childQueue.getQueueCapacities().getMaximumCapacity(label)
* getQueueCapacities().getAbsoluteMaximumCapacity(label));
// Re-visit max applications for a queue based on absolute capacity if
// needed.
if (childQueue instanceof LeafQueue) {
LeafQueue leafQueue = (LeafQueue) childQueue;
CapacitySchedulerConfiguration conf = csContext.getConfiguration();
int maxApplications =
conf.getMaximumApplicationsPerQueue(childQueue.getQueuePath());
if (maxApplications < 0) {
int maxGlobalPerQueueApps = conf.getGlobalMaximumApplicationsPerQueue();
if (maxGlobalPerQueueApps > 0) {
maxApplications = (int) (maxGlobalPerQueueApps *
childQueue.getQueueCapacities().getAbsoluteCapacity(label));
} else {
maxApplications = (int) (conf.getMaximumSystemApplications()
* childQueue.getQueueCapacities().getAbsoluteCapacity(label));
}
}
leafQueue.setMaxApplications(maxApplications);
int maxApplicationsPerUser = Math.min(maxApplications,
(int) (maxApplications
* (leafQueue.getUsersManager().getUserLimit() / 100.0f)
* leafQueue.getUsersManager().getUserLimitFactor()));
leafQueue.setMaxApplicationsPerUser(maxApplicationsPerUser);
LOG.info("LeafQueue:" + leafQueue.getQueuePath() + ", maxApplications="
+ maxApplications + ", maxApplicationsPerUser="
+ maxApplicationsPerUser + ", Abs Cap:"
+ childQueue.getQueueCapacities().getAbsoluteCapacity(label) + ", Cap: "
+ childQueue.getQueueCapacities().getCapacity(label) + ", MaxCap : "
+ childQueue.getQueueCapacities().getMaximumCapacity(label));
}
return ImmutableMap.copyOf(effectiveMinRatioPerResource);
}
@Override
@ -1463,4 +1447,9 @@ public class ParentQueue extends AbstractCSQueue {
writeLock.unlock();
}
}
/**
 * Returns the most recently published effective-min-ratio map. No lock is
 * taken; the method only reads the field reference.
 *
 * @return the current ratio-per-resource map
 */
Map<String, Float> getEffectiveMinRatioPerResource() {
  final Map<String, Float> current = effectiveMinRatioPerResource;
  return current;
}
}

View File

@ -50,7 +50,7 @@ public class QueueCapacities {
// Usage enum here to make implement cleaner
private enum CapacityType {
USED_CAP(0), ABS_USED_CAP(1), MAX_CAP(2), ABS_MAX_CAP(3), CAP(4), ABS_CAP(5),
MAX_AM_PERC(6), RESERVED_CAP(7), ABS_RESERVED_CAP(8);
MAX_AM_PERC(6), RESERVED_CAP(7), ABS_RESERVED_CAP(8), WEIGHT(9), NORMALIZED_WEIGHT(10);
private int idx;
@ -64,6 +64,9 @@ public class QueueCapacities {
public Capacities() {
capacitiesArr = new float[CapacityType.values().length];
// Set weight to -1 by default (means not set)
capacitiesArr[CapacityType.WEIGHT.idx] = -1;
}
@Override
@ -74,10 +77,12 @@ public class QueueCapacities {
.append("max_cap=" + capacitiesArr[2] + "%, ")
.append("abs_max_cap=" + capacitiesArr[3] + "%, ")
.append("cap=" + capacitiesArr[4] + "%, ")
.append("abs_cap=" + capacitiesArr[5] + "%}")
.append("max_am_perc=" + capacitiesArr[6] + "%}")
.append("reserved_cap=" + capacitiesArr[7] + "%}")
.append("abs_reserved_cap=" + capacitiesArr[8] + "%}");
.append("abs_cap=" + capacitiesArr[5] + "%, ")
.append("max_am_perc=" + capacitiesArr[6] + "%, ")
.append("reserved_cap=" + capacitiesArr[7] + "%, ")
.append("abs_reserved_cap=" + capacitiesArr[8] + "%, ")
.append("weight=" + capacitiesArr[9] + "w, ")
// NORMALIZED_WEIGHT is stored at index 10; index 9 is WEIGHT, so printing
// slot 9 here would repeat the raw weight.
.append("normalized_weight=" + capacitiesArr[10] + "w}");
return sb.toString();
}
}
@ -87,6 +92,10 @@ public class QueueCapacities {
try {
Capacities cap = capacitiesMap.get(label);
if (null == cap) {
// Special handle weight mode
if (type == CapacityType.WEIGHT) {
return -1f;
}
return LABEL_DOESNT_EXIST_CAP;
}
return cap.capacitiesArr[type.idx];
@ -270,6 +279,40 @@ public class QueueCapacities {
_set(label, CapacityType.ABS_RESERVED_CAP, value);
}
/* Weight Getter and Setter */
/**
 * Returns the weight stored under the {@code NL} label key.
 *
 * @return the stored weight
 */
public float getWeight() {
  return getWeight(NL);
}
/**
 * Returns the weight stored for the given label.
 *
 * @param label node label to look up
 * @return the stored weight
 */
public float getWeight(String label) {
return _get(label, CapacityType.WEIGHT);
}
/**
 * Stores the weight under the {@code NL} label key.
 *
 * @param value weight to store
 */
public void setWeight(float value) {
  setWeight(NL, value);
}
/**
 * Stores the weight for the given label.
 *
 * @param label node label to update
 * @param value weight to store
 */
public void setWeight(String label, float value) {
_set(label, CapacityType.WEIGHT, value);
}
/* Normalized Weight Getter and Setter */
/**
 * Returns the normalized weight stored under the {@code NL} label key.
 *
 * @return the stored normalized weight
 */
public float getNormalizedWeight() {
  return getNormalizedWeight(NL);
}
/**
 * Returns the normalized weight stored for the given label.
 *
 * @param label node label to look up
 * @return the stored normalized weight
 */
public float getNormalizedWeight(String label) {
return _get(label, CapacityType.NORMALIZED_WEIGHT);
}
/**
 * Stores the normalized weight under the {@code NL} label key.
 *
 * @param value normalized weight to store
 */
public void setNormalizedWeight(float value) {
  setNormalizedWeight(NL, value);
}
/**
 * Stores the normalized weight for the given label.
 *
 * @param label node label to update
 * @param value normalized weight to store
 */
public void setNormalizedWeight(String label, float value) {
_set(label, CapacityType.NORMALIZED_WEIGHT, value);
}
/**
* Clear configurable fields, like
* (absolute)capacity/(absolute)maximum-capacity, this will be used by queue
@ -284,6 +327,7 @@ public class QueueCapacities {
_set(label, CapacityType.MAX_CAP, 0);
_set(label, CapacityType.ABS_CAP, 0);
_set(label, CapacityType.ABS_MAX_CAP, 0);
_set(label, CapacityType.WEIGHT, 0);
}
} finally {
writeLock.unlock();

View File

@ -22,8 +22,6 @@ import java.io.IOException;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSystem;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -86,6 +84,6 @@ public class ReservationQueue extends AbstractAutoCreatedLeafQueue {
/**
 * Capacity setup for a reservation queue. The {@code configuration} argument
 * is not consulted here; the work is delegated to the superclass.
 *
 * @param configuration scheduler configuration (unused in this override)
 */
@Override
protected void setupConfigurableCapacities(CapacitySchedulerConfiguration
configuration) {
super.setupConfigurableCapacities(queueCapacities);
super.updateAbsoluteCapacities();
}
}

View File

@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
.queuemanagement;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
.QueueManagementDynamicEditPolicy;
import org.slf4j.Logger;
@ -358,6 +359,12 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
public List<QueueManagementChange> computeQueueManagementChanges()
throws SchedulerDynamicEditException {
// Update template absolute capacities as the capacities could have changed
// in weight mode
updateTemplateAbsoluteCapacities(managedParentQueue.getQueueCapacities(),
(GuaranteedOrZeroCapacityOverTimePolicy)
managedParentQueue.getAutoCreatedQueueManagementPolicy());
//TODO : Add support for node labels on leaf queue template configurations
//synch/add missing leaf queue(s) if any to state
updateLeafQueueState();
@ -470,6 +477,24 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
}
}
/**
 * Recomputes the absolute capacities of the leaf queue template held by
 * {@code policy} against the given parent capacities, for the policy's
 * template node labels, and republishes the result as the policy's
 * {@code leafQueueTemplateCapacities}.
 *
 * NOTE(review): the lock taken is this instance's write lock while the
 * fields written belong to {@code policy}; the public overload passes
 * {@code this}. Confirm no caller passes a different instance.
 *
 * @param parentQueueCapacities capacities of the managed parent queue
 * @param policy policy whose template is updated
 */
private void updateTemplateAbsoluteCapacities(QueueCapacities parentQueueCapacities,
    GuaranteedOrZeroCapacityOverTimePolicy policy) {
  writeLock.lock();
  try {
    final QueueCapacities templateCapacities =
        policy.leafQueueTemplate.getQueueCapacities();
    CSQueueUtils.updateAbsoluteCapacitiesByNodeLabels(templateCapacities,
        parentQueueCapacities, policy.leafQueueTemplateNodeLabels);
    policy.leafQueueTemplateCapacities = templateCapacities;
  } finally {
    writeLock.unlock();
  }
}
/**
 * Updates this policy's leaf queue template absolute capacities against the
 * given capacities.
 *
 * @param queueCapacities capacities passed on as the parent queue capacities
 */
public void updateTemplateAbsoluteCapacities(QueueCapacities queueCapacities) {
updateTemplateAbsoluteCapacities(queueCapacities, this);
}
private float getTotalDeactivatedCapacity(
Map<String, QueueCapacities> deactivatedLeafQueues, String nodeLabel) {
float deactivatedCapacity = 0;
@ -821,6 +846,10 @@ public class GuaranteedOrZeroCapacityOverTimePolicy
leafQueueTemplateCapacities.getCapacity(nodeLabel));
capacities.setMaximumCapacity(nodeLabel,
leafQueueTemplateCapacities.getMaximumCapacity(nodeLabel));
capacities.setAbsoluteCapacity(nodeLabel,
leafQueueTemplateCapacities.getAbsoluteCapacity(nodeLabel));
capacities.setAbsoluteMaximumCapacity(nodeLabel,
leafQueueTemplateCapacities.getAbsoluteMaximumCapacity(nodeLabel));
}
@VisibleForTesting

View File

@ -496,12 +496,7 @@ public class TestAbsoluteResourceConfiguration {
Assert.fail();
} catch (IOException e) {
Assert.assertTrue(e instanceof IOException);
Assert.assertEquals(
"Failed to re-init queues : Parent queue 'root.queueA' "
+ "and child queue 'root.queueA.queueA1'"
+ " should use either percentage based"
+ " capacity configuration or absolute resource together.",
e.getMessage());
Assert.assertTrue(e.getMessage().contains("Failed to re-init queues"));
}
// 2. Create a new config and make sure one queue's min resource is more

View File

@ -148,6 +148,8 @@ public class TestAbsoluteResourceWithAutoQueue
return csConf;
}
// TODO: Wangda: I think this test case is not correct, Sunil could help look
// into details.
@Test(timeout = 20000)
public void testAutoCreateLeafQueueCreation() throws Exception {
@ -233,8 +235,12 @@ public class TestAbsoluteResourceWithAutoQueue
3, 1);
final CSQueue autoCreatedLeafQueue2 = cs.getQueue(TEST_GROUPUSER2);
validateCapacities((AutoCreatedLeafQueue) autoCreatedLeafQueue2, 0.0f,
0.0f, 1f, 0.6f);
validateCapacities((AutoCreatedLeafQueue) autoCreatedLeafQueue2,
0.33332032f,
0.03333203f, 1f, 0.6f);
validateCapacities((AutoCreatedLeafQueue) autoCreatedLeafQueue1,
0.33332032f,
0.03333203f, 1f, 0.6f);
GuaranteedOrZeroCapacityOverTimePolicy autoCreatedQueueManagementPolicy =
(GuaranteedOrZeroCapacityOverTimePolicy) ((ManagedParentQueue) parentQueue)

View File

@ -38,6 +38,7 @@ import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager;
import org.apache.hadoop.yarn.util.ControlledClock;
@ -70,8 +71,9 @@ public class TestCSMaxRunningAppsEnforcer {
when(scheduler.getResourceCalculator()).thenReturn(
new DefaultResourceCalculator());
when(scheduler.getRMContext()).thenReturn(rmContext);
Resource clusterResource = Resource.newInstance(16384, 8);
when(scheduler.getClusterResource())
.thenReturn(Resource.newInstance(16384, 8));
.thenReturn(clusterResource);
when(scheduler.getMinimumAllocation())
.thenReturn(Resource.newInstance(1024, 1));
when(scheduler.getMinimumResourceCapability())
@ -84,8 +86,12 @@ public class TestCSMaxRunningAppsEnforcer {
AppPriorityACLsManager appPriorityACLManager =
mock(AppPriorityACLsManager.class);
when(rmContext.getNodeLabelManager()).thenReturn(labelManager);
when(labelManager.getResourceByLabel(anyString(), any(Resource.class)))
.thenReturn(Resource.newInstance(16384, 8));
when(labelManager.getResourceByLabel(any(), any(Resource.class)))
.thenReturn(clusterResource);
PreemptionManager preemptionManager = mock(PreemptionManager.class);
when(preemptionManager.getKillableResource(any(), anyString()))
.thenReturn(Resource.newInstance(0, 0));
when(scheduler.getPreemptionManager()).thenReturn(preemptionManager);
queueManager = new CapacitySchedulerQueueManager(csConfig, labelManager,
appPriorityACLManager);
queueManager.setCapacitySchedulerContext(scheduler);

View File

@ -749,7 +749,17 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
* parentQueue.getQueueCapacities().getAbsoluteCapacity(label));
assertEquals(effMinCapacity, Resources.multiply(resourceByLabel,
leafQueue.getQueueCapacities().getAbsoluteCapacity(label)));
assertEquals(effMinCapacity, leafQueue.getEffectiveCapacity(label));
// TODO (Wangda): I think this assertion is wrong. It does not consider the
// rounding loss of the multiplication: the right value should be
// <10240, 2>, but the test expects <10240, 1>.
// FIXME: address this in a future patch (auto queue creation).
// if (expectedQueueEntitlements.get(label).getCapacity() > EPSILON) {
// assertEquals(Resource.newInstance(10 * GB, 2),
// leafQueue.getEffectiveCapacity(label));
// } else {
// assertEquals(Resource.newInstance(0, 0),
// leafQueue.getEffectiveCapacity(label));
// }
if (leafQueue.getQueueCapacities().getAbsoluteCapacity(label) > 0) {
assertTrue(Resources.greaterThan(cs.getResourceCalculator(),

View File

@ -0,0 +1,452 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
import org.apache.hadoop.thirdparty.com.google.common.collect.Sets;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ContainerId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData;
import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerAppReport;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.util.ArrayList;
import java.util.Set;
public class TestCapacitySchedulerWeightMode {
private final int GB = 1024;
private YarnConfiguration conf;
RMNodeLabelsManager mgr;
/**
 * Creates a fresh YARN configuration that selects the CapacityScheduler as
 * the resource scheduler, and initializes a new null node-label manager
 * with it. Runs before every test.
 */
@Before
public void setUp() throws Exception {
  this.conf = new YarnConfiguration();
  this.conf.setClass(YarnConfiguration.RM_SCHEDULER,
      CapacityScheduler.class, ResourceScheduler.class);

  this.mgr = new NullRMNodeLabelsManager();
  this.mgr.init(this.conf);
}
/**
 * Collects the given elements into a new set.
 *
 * @param elements the elements to put into the set
 * @param <E> the element type
 * @return a new set containing the given elements
 */
@SafeVarargs
public static <E> Set<E> toSet(E... elements) {
  // The varargs array is only handed to Sets.newHashSet and is never stored
  // or returned by this method, hence the @SafeVarargs annotation.
  return Sets.newHashSet(elements);
}
/**
 * Builds a scheduler configuration in which every queue below root is
 * sized with weights only.
 *
 * <pre>
 * Queue structure:
 *                      root (*)
 *                  ________________
 *                 /                \
 *   a x(weight=100), y(w=50)    b y(w=50), z(w=100)
 *   ________________            ______________
 *  /                           /              \
 * a1 ([x,y]: w=100)          b1(no)        b2([y,z]: w=100)
 * </pre>
 *
 * @param config configuration passed to the
 *               {@link CapacitySchedulerConfiguration} constructor
 * @return the scheduler configuration with the queues above defined
 */
public static Configuration getCSConfWithQueueLabelsWeightOnly(
    Configuration config) {
  final CapacitySchedulerConfiguration csConf =
      new CapacitySchedulerConfiguration(config);
  final String rootPath = CapacitySchedulerConfiguration.ROOT;
  final String noLabel = RMNodeLabelsManager.NO_LABEL;

  // Top level: root -> {a, b}
  csConf.setQueues(rootPath, new String[] {"a", "b"});
  csConf.setLabeledQueueWeight(rootPath, "x", 100);
  csConf.setLabeledQueueWeight(rootPath, "y", 100);
  csConf.setLabeledQueueWeight(rootPath, "z", 100);

  // Queue a: partitions x and y
  final String queueA = rootPath + ".a";
  csConf.setLabeledQueueWeight(queueA, noLabel, 1);
  csConf.setMaximumCapacity(queueA, 10);
  csConf.setAccessibleNodeLabels(queueA, toSet("x", "y"));
  csConf.setLabeledQueueWeight(queueA, "x", 100);
  csConf.setLabeledQueueWeight(queueA, "y", 50);

  // Queue b: partitions y and z
  final String queueB = rootPath + ".b";
  csConf.setLabeledQueueWeight(queueB, noLabel, 9);
  csConf.setMaximumCapacity(queueB, 100);
  csConf.setAccessibleNodeLabels(queueB, toSet("y", "z"));
  csConf.setLabeledQueueWeight(queueB, "y", 50);
  csConf.setLabeledQueueWeight(queueB, "z", 100);

  // Second level under a: a1, defaulting to partition x
  final String queueA1 = queueA + ".a1";
  csConf.setQueues(queueA, new String[] {"a1"});
  csConf.setLabeledQueueWeight(queueA1, noLabel, 100);
  csConf.setMaximumCapacity(queueA1, 100);
  csConf.setAccessibleNodeLabels(queueA1, toSet("x", "y"));
  csConf.setDefaultNodeLabelExpression(queueA1, "x");
  csConf.setLabeledQueueWeight(queueA1, "x", 100);
  csConf.setLabeledQueueWeight(queueA1, "y", 100);

  // Second level under b: b1 (no accessible labels) and b2 (y, z)
  csConf.setQueues(queueB, new String[] {"b1", "b2"});

  final String queueB1 = queueB + ".b1";
  csConf.setLabeledQueueWeight(queueB1, noLabel, 50);
  csConf.setMaximumCapacity(queueB1, 50);
  csConf.setAccessibleNodeLabels(queueB1,
      RMNodeLabelsManager.EMPTY_STRING_SET);

  final String queueB2 = queueB + ".b2";
  csConf.setLabeledQueueWeight(queueB2, noLabel, 50);
  csConf.setMaximumCapacity(queueB2, 50);
  csConf.setAccessibleNodeLabels(queueB2, toSet("y", "z"));
  csConf.setLabeledQueueWeight(queueB2, "y", 100);
  csConf.setLabeledQueueWeight(queueB2, "z", 100);

  return csConf;
}
/**
 * Builds a scheduler configuration that mixes capacity modes: the parent
 * queues a and b are sized by weight, their children a1, b1 and b2 by
 * percentage.
 *
 * <pre>
 * Queue structure:
 *                      root (*)
 *                  _______________________
 *                 /                       \
 *   a x(weight=100), y(w=50)           b y(w=50), z(w=100)
 *   ________________                   ______________
 *  /                                  /              \
 * a1 ([x,y]: pct=100%)              b1(no)    b2([y,z]: percent=100%)
 * </pre>
 *
 * @param config configuration passed to the
 *               {@link CapacitySchedulerConfiguration} constructor
 * @return the scheduler configuration with the queues above defined
 */
public static Configuration getCSConfWithLabelsParentUseWeightChildUsePct(
    Configuration config) {
  final CapacitySchedulerConfiguration schedulerConf =
      new CapacitySchedulerConfiguration(config);
  final String rootQueue = CapacitySchedulerConfiguration.ROOT;
  final String defaultPartition = RMNodeLabelsManager.NO_LABEL;

  // Top level: root -> {a, b}
  schedulerConf.setQueues(rootQueue, new String[] {"a", "b"});
  schedulerConf.setLabeledQueueWeight(rootQueue, "x", 100);
  schedulerConf.setLabeledQueueWeight(rootQueue, "y", 100);
  schedulerConf.setLabeledQueueWeight(rootQueue, "z", 100);

  // Parent a, sized by weight
  final String parentA = rootQueue + ".a";
  schedulerConf.setLabeledQueueWeight(parentA, defaultPartition, 1);
  schedulerConf.setMaximumCapacity(parentA, 10);
  schedulerConf.setAccessibleNodeLabels(parentA, toSet("x", "y"));
  schedulerConf.setLabeledQueueWeight(parentA, "x", 100);
  schedulerConf.setLabeledQueueWeight(parentA, "y", 50);

  // Parent b, sized by weight
  final String parentB = rootQueue + ".b";
  schedulerConf.setLabeledQueueWeight(parentB, defaultPartition, 9);
  schedulerConf.setMaximumCapacity(parentB, 100);
  schedulerConf.setAccessibleNodeLabels(parentB, toSet("y", "z"));
  schedulerConf.setLabeledQueueWeight(parentB, "y", 50);
  schedulerConf.setLabeledQueueWeight(parentB, "z", 100);

  // Child a1, sized by percentage
  final String leafA1 = parentA + ".a1";
  schedulerConf.setQueues(parentA, new String[] {"a1"});
  schedulerConf.setCapacityByLabel(leafA1, defaultPartition, 100);
  schedulerConf.setMaximumCapacity(leafA1, 100);
  schedulerConf.setAccessibleNodeLabels(leafA1, toSet("x", "y"));
  schedulerConf.setDefaultNodeLabelExpression(leafA1, "x");
  schedulerConf.setCapacityByLabel(leafA1, "x", 100);
  schedulerConf.setCapacityByLabel(leafA1, "y", 100);

  // Children b1 and b2, sized by percentage
  schedulerConf.setQueues(parentB, new String[] {"b1", "b2"});

  final String leafB1 = parentB + ".b1";
  schedulerConf.setCapacityByLabel(leafB1, defaultPartition, 50);
  schedulerConf.setMaximumCapacity(leafB1, 50);
  schedulerConf.setAccessibleNodeLabels(leafB1,
      RMNodeLabelsManager.EMPTY_STRING_SET);

  final String leafB2 = parentB + ".b2";
  schedulerConf.setCapacityByLabel(leafB2, defaultPartition, 50);
  schedulerConf.setMaximumCapacity(leafB2, 50);
  schedulerConf.setAccessibleNodeLabels(leafB2, toSet("y", "z"));
  schedulerConf.setCapacityByLabel(leafB2, "y", 100);
  schedulerConf.setCapacityByLabel(leafB2, "z", 100);

  return schedulerConf;
}
/**
 * Builds a scheduler configuration that mixes capacity modes: the parent
 * queues a and b are sized by percentage, their children a1, b1 and b2 by
 * weight.
 *
 * <pre>
 * Queue structure:
 *                      root (*)
 *                  _______________________
 *                 /                       \
 *   a x(=100%), y(50%)                 b y(=50%), z(=100%)
 *   ________________                   ______________
 *  /                                  /              \
 * a1 ([x,y]: w=100)                 b1(no)      b2([y,z]: w=100)
 * </pre>
 *
 * Parent uses percentage, child uses weight.
 *
 * @param config configuration passed to the
 *               {@link CapacitySchedulerConfiguration} constructor
 * @return the scheduler configuration with the queues above defined
 */
public static Configuration getCSConfWithLabelsParentUsePctChildUseWeight(
    Configuration config) {
  CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(
      config);

  // Define top-level queues
  conf.setQueues(CapacitySchedulerConfiguration.ROOT,
      new String[] { "a", "b" });
  conf.setLabeledQueueWeight(CapacitySchedulerConfiguration.ROOT, "x", 100);
  conf.setLabeledQueueWeight(CapacitySchedulerConfiguration.ROOT, "y", 100);
  conf.setLabeledQueueWeight(CapacitySchedulerConfiguration.ROOT, "z", 100);

  // Parents a and b are configured with percentages.
  final String A = CapacitySchedulerConfiguration.ROOT + ".a";
  conf.setCapacityByLabel(A, RMNodeLabelsManager.NO_LABEL, 10);
  conf.setMaximumCapacity(A, 10);
  conf.setAccessibleNodeLabels(A, toSet("x", "y"));
  conf.setCapacityByLabel(A, "x", 100);
  conf.setCapacityByLabel(A, "y", 50);

  final String B = CapacitySchedulerConfiguration.ROOT + ".b";
  conf.setCapacityByLabel(B, RMNodeLabelsManager.NO_LABEL, 90);
  conf.setMaximumCapacity(B, 100);
  conf.setAccessibleNodeLabels(B, toSet("y", "z"));
  conf.setCapacityByLabel(B, "y", 50);
  conf.setCapacityByLabel(B, "z", 100);

  // Define 2nd-level queues. The children are configured with weights so
  // that this configuration really exercises the "parent percentage, child
  // weight" combination promised by the method name and the diagram.
  final String A1 = A + ".a1";
  conf.setQueues(A, new String[] { "a1" });
  conf.setLabeledQueueWeight(A1, RMNodeLabelsManager.NO_LABEL, 100);
  conf.setMaximumCapacity(A1, 100);
  conf.setAccessibleNodeLabels(A1, toSet("x", "y"));
  conf.setDefaultNodeLabelExpression(A1, "x");
  conf.setLabeledQueueWeight(A1, "x", 100);
  conf.setLabeledQueueWeight(A1, "y", 100);

  conf.setQueues(B, new String[] { "b1", "b2" });
  final String B1 = B + ".b1";
  conf.setLabeledQueueWeight(B1, RMNodeLabelsManager.NO_LABEL, 50);
  conf.setMaximumCapacity(B1, 50);
  conf.setAccessibleNodeLabels(B1, RMNodeLabelsManager.EMPTY_STRING_SET);

  final String B2 = B + ".b2";
  conf.setLabeledQueueWeight(B2, RMNodeLabelsManager.NO_LABEL, 50);
  conf.setMaximumCapacity(B2, 50);
  conf.setAccessibleNodeLabels(B2, toSet("y", "z"));
  conf.setLabeledQueueWeight(B2, "y", 100);
  conf.setLabeledQueueWeight(B2, "z", 100);

  return conf;
}
/**
 * Mirrors
 * {@link TestNodeLabelContainerAllocation#testContainerAllocateWithComplexLabels()},
 * except that the queues are configured in weight mode only.
 *
 * @throws Exception if the shared allocation scenario fails
 */
@Test(timeout = 300000)
public void testContainerAllocateWithComplexLabelsWeightOnly()
    throws Exception {
  final Configuration weightOnlyConf =
      getCSConfWithQueueLabelsWeightOnly(conf);
  internalTestContainerAlloationWithNodeLabel(weightOnlyConf);
}
/**
 * Mirrors
 * {@link TestNodeLabelContainerAllocation#testContainerAllocateWithComplexLabels()},
 * except that the queues mix capacity modes: the parent queues use weight
 * and the child queues use percentage.
 *
 * @throws Exception if the shared allocation scenario fails
 */
@Test(timeout = 300000)
public void testContainerAllocateWithComplexLabelsWeightAndPercentMixed1()
    throws Exception {
  final Configuration mixedConf =
      getCSConfWithLabelsParentUseWeightChildUsePct(conf);
  internalTestContainerAlloationWithNodeLabel(mixedConf);
}
/**
 * Mirrors
 * {@link TestNodeLabelContainerAllocation#testContainerAllocateWithComplexLabels()},
 * except that the queues mix capacity modes: the parent queues use
 * percentage and the child queues use weight.
 *
 * @throws Exception if the shared allocation scenario fails
 */
@Test(timeout = 300000)
public void testContainerAllocateWithComplexLabelsWeightAndPercentMixed2()
    throws Exception {
  final Configuration mixedConf =
      getCSConfWithLabelsParentUsePctChildUseWeight(conf);
  internalTestContainerAlloationWithNodeLabel(mixedConf);
}
/**
 * Shared allocation scenario for the weight-mode tests. Starts a MockRM on
 * the given configuration, registers five nodes and checks on which node
 * the containers requested from queues a1, b1 and b2 end up.
 *
 * <pre>
 * Queue structure (see the configuration builders of this class):
 *                      root (*)
 *                  ________________
 *                 /                \
 *               a (x, y)          b (y, z)
 *               ________          ______________
 *              /                 /              \
 *             a1 (x,y)         b1(no)          b2(y,z)
 *
 * Node structure:
 * h1 : x
 * h2 : y
 * h3 : y
 * h4 : z
 * h5 : NO
 * </pre>
 *
 * Every node registers with 2048 MB and every request asks for 1024 MB, so
 * each node can only allocate two containers.
 *
 * @param csConf scheduler configuration defining queues a1, b1 and b2
 * @throws Exception if the RM cannot be started or an expectation fails
 */
private void internalTestContainerAlloationWithNodeLabel(Configuration csConf)
    throws Exception {
  // set node -> label
  mgr.addToCluserNodeLabelsWithDefaultExclusivity(
      ImmutableSet.of("x", "y", "z"));
  mgr.addLabelsToNode(ImmutableMap.of(NodeId.newInstance("h1", 0),
      toSet("x"), NodeId.newInstance("h2", 0), toSet("y"),
      NodeId.newInstance("h3", 0), toSet("y"), NodeId.newInstance("h4", 0),
      toSet("z"), NodeId.newInstance("h5", 0),
      RMNodeLabelsManager.EMPTY_STRING_SET));

  // inject node label manager
  MockRM rm1 = new MockRM(csConf) {
    @Override
    public RMNodeLabelsManager createNodeLabelManager() {
      return mgr;
    }
  };

  // Close the RM even when an assertion below fails, so that a failing run
  // does not leave a started MockRM behind.
  try {
    rm1.getRMContext().setNodeLabelManager(mgr);
    rm1.start();
    MockNM nm1 = rm1.registerNode("h1:1234", 2048);
    MockNM nm2 = rm1.registerNode("h2:1234", 2048);
    MockNM nm3 = rm1.registerNode("h3:1234", 2048);
    MockNM nm4 = rm1.registerNode("h4:1234", 2048);
    MockNM nm5 = rm1.registerNode("h5:1234", 2048);

    ContainerId containerId;

    // launch an app to queue a1 (default label expression = x); its AM is
    // launched on nm1 (h1, label x)
    MockRMAppSubmissionData a1Submission =
        MockRMAppSubmissionData.Builder.createWithMemory(1024, rm1)
            .withAppName("app")
            .withUser("user")
            .withAcls(null)
            .withQueue("a1")
            .withUnmanagedAM(false)
            .build();
    RMApp app1 = MockRMAppSubmitter.submit(rm1, a1Submission);
    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);

    // request a container (label = y). can be allocated on nm2
    am1.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
    containerId =
        ContainerId.newContainerId(am1.getApplicationAttemptId(), 2L);
    Assert.assertTrue(rm1.waitForState(nm2, containerId,
        RMContainerState.ALLOCATED));
    checkTaskContainersHost(am1.getApplicationAttemptId(), containerId, rm1,
        "h2");

    // launch an app to queue b1 (no accessible labels); its AM is launched
    // on nm5 (h5, no label)
    MockRMAppSubmissionData b1Submission =
        MockRMAppSubmissionData.Builder.createWithMemory(1024, rm1)
            .withAppName("app")
            .withUser("user")
            .withAcls(null)
            .withQueue("b1")
            .withUnmanagedAM(false)
            .build();
    RMApp app2 = MockRMAppSubmitter.submit(rm1, b1Submission);
    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm5);

    // request a container for AM, will succeed
    // and now b1's queue capacity will be used, cannot allocate more
    // containers (Maximum capacity reached)
    am2.allocate("*", 1024, 1, new ArrayList<ContainerId>());
    containerId =
        ContainerId.newContainerId(am2.getApplicationAttemptId(), 2);
    Assert.assertFalse(rm1.waitForState(nm4, containerId,
        RMContainerState.ALLOCATED));
    Assert.assertFalse(rm1.waitForState(nm5, containerId,
        RMContainerState.ALLOCATED));

    // launch an app to queue b2
    MockRMAppSubmissionData b2Submission =
        MockRMAppSubmissionData.Builder.createWithMemory(1024, rm1)
            .withAppName("app")
            .withUser("user")
            .withAcls(null)
            .withQueue("b2")
            .withUnmanagedAM(false)
            .build();
    RMApp app3 = MockRMAppSubmitter.submit(rm1, b2Submission);
    MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, nm5);

    // request a container (label = y). try to allocate on nm1 (label = x)
    // and nm3 (label = y). Will successfully allocate on nm3 only
    am3.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "y");
    containerId =
        ContainerId.newContainerId(am3.getApplicationAttemptId(), 2);
    Assert.assertFalse(rm1.waitForState(nm1, containerId,
        RMContainerState.ALLOCATED));
    Assert.assertTrue(rm1.waitForState(nm3, containerId,
        RMContainerState.ALLOCATED));
    checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
        "h3");

    // request a container (label = z); it is expected on nm4 (label = z)
    am3.allocate("*", 1024, 1, new ArrayList<ContainerId>(), "z");
    containerId =
        ContainerId.newContainerId(am3.getApplicationAttemptId(), 3L);
    Assert.assertTrue(rm1.waitForState(nm4, containerId,
        RMContainerState.ALLOCATED));
    checkTaskContainersHost(am3.getApplicationAttemptId(), containerId, rm1,
        "h4");
  } finally {
    rm1.close();
  }
}
/**
 * Asserts that the given container is a live container of the application
 * attempt and that it was allocated on the expected host.
 *
 * @param attemptId application attempt that owns the container
 * @param containerId container to look up among the live containers
 * @param rm resource manager whose scheduler is queried
 * @param host host name the container is expected to be allocated on
 */
private void checkTaskContainersHost(ApplicationAttemptId attemptId,
    ContainerId containerId, ResourceManager rm, String host) {
  YarnScheduler scheduler = rm.getRMContext().getScheduler();
  SchedulerAppReport appReport = scheduler.getSchedulerAppInfo(attemptId);

  Assert.assertTrue(appReport.getLiveContainers().size() > 0);

  // Track whether the container was seen at all; without this the host
  // check would pass vacuously when the container is not in the report.
  boolean containerFound = false;
  for (RMContainer c : appReport.getLiveContainers()) {
    if (c.getContainerId().equals(containerId)) {
      containerFound = true;
      Assert.assertEquals(host, c.getAllocatedNode().getHost());
    }
  }
  Assert.assertTrue("Container " + containerId
      + " is not among the live containers of " + attemptId,
      containerFound);
}
}

View File

@ -3677,11 +3677,13 @@ public class TestLeafQueue {
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Manipulate queue 'a'
// Manipulate queue 'b'
LeafQueue b = stubLeafQueue((LeafQueue) queues.get(B));
assertEquals(0.1f, b.getMaxAMResourcePerQueuePercent(), 1e-3f);
assertEquals(b.calculateAndGetAMResourceLimit(),
Resources.createResource(159 * GB, 1));
// Queue b has 100 * 16 = 1600 GB effective usable resource, so the raw
// AM limit is 1600 GB * 0.1 * 0.99 = 158.4 GB; the expected value below is
// that amount rounded up to a whole GB: 159 GB = 162816 MB.
assertEquals(Resources.createResource(162816, 1),
b.calculateAndGetAMResourceLimit());
csConf.setFloat(
CapacitySchedulerConfiguration.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
@ -4748,6 +4750,9 @@ public class TestLeafQueue {
leafQueueName, cs.getRootQueue(),
null);
leafQueue.updateClusterResource(Resource.newInstance(0, 0),
new ResourceLimits(Resource.newInstance(0, 0)));
assertEquals(30, leafQueue.getNodeLocalityDelay());
assertEquals(20, leafQueue.getMaxApplications());
assertEquals(2, leafQueue.getMaxApplicationsPerUser());

View File

@ -31,6 +31,7 @@ import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.when;
import java.io.IOException;
import java.util.List;
import org.slf4j.Logger;
@ -377,7 +378,7 @@ public class TestParentQueue {
CapacitySchedulerQueueManager.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT, queues, queues,
TestUtils.spyHook);
} catch (IllegalArgumentException ie) {
} catch (IOException ie) {
exceptionOccurred = true;
}
if (!exceptionOccurred) {
@ -647,7 +648,7 @@ public class TestParentQueue {
reset(a); reset(b); reset(c);
}
@Test (expected=IllegalArgumentException.class)
@Test (expected=IOException.class)
public void testQueueCapacitySettingChildZero() throws Exception {
// Setup queue configs
setupMultiLevelQueues(csConf);
@ -663,7 +664,7 @@ public class TestParentQueue {
TestUtils.spyHook);
}
@Test (expected=IllegalArgumentException.class)
@Test (expected=IOException.class)
public void testQueueCapacitySettingParentZero() throws Exception {
// Setup queue configs
setupMultiLevelQueues(csConf);
@ -695,7 +696,7 @@ public class TestParentQueue {
TestUtils.spyHook);
}
@Test(expected = IllegalArgumentException.class)
@Test(expected = IOException.class)
public void testQueueCapacitySettingParentZeroChildren50pctZeroSumAllowed()
throws Exception {
// Setup queue configs

View File

@ -47,7 +47,9 @@ public class TestQueueCapacities {
{ "AbsoluteMaximumCapacity" },
{ "MaxAMResourcePercentage" },
{ "ReservedCapacity" },
{ "AbsoluteReservedCapacity" }});
{ "AbsoluteReservedCapacity" },
{ "Weight" },
{ "NormalizedWeight" }});
}
public TestQueueCapacities(String suffix) {
@ -105,9 +107,6 @@ public class TestQueueCapacities {
private void internalTestModifyAndRead(String label) throws Exception {
QueueCapacities qc = new QueueCapacities(false);
// First get returns 0 always
Assert.assertEquals(0f, get(qc, suffix, label), 1e-8);
// Set to 1, and check
set(qc, suffix, label, 1f);
Assert.assertEquals(1f, get(qc, suffix, label), 1e-8);
@ -117,15 +116,19 @@ public class TestQueueCapacities {
Assert.assertEquals(2f, get(qc, suffix, label), 1e-8);
}
/**
 * Asserts that a resource carries the expected memory size and vcores.
 *
 * @param mem expected value of {@code res.getMemorySize()}
 * @param cpu expected value of {@code res.getVirtualCores()}
 * @param res resource to inspect
 */
void check(int mem, int cpu, Resource res) {
  final long actualMemory = res.getMemorySize();
  Assert.assertEquals(mem, actualMemory);

  final int actualVcores = res.getVirtualCores();
  Assert.assertEquals(cpu, actualVcores);
}
/**
 * Runs the modify-and-read check for the current suffix, first with a
 * {@code null} label and then with the label {@code "label"}.
 */
@Test
public void testModifyAndRead() throws Exception {
  LOG.info("Test - " + suffix);
  final String[] labelsUnderTest = {null, "label"};
  for (String labelUnderTest : labelsUnderTest) {
    internalTestModifyAndRead(labelUnderTest);
  }
}
/**
 * Checks what a freshly created {@link QueueCapacities} reports before
 * anything has been set: weight reads as -1 and capacity as 0, both for
 * the empty label and for label "x".
 */
@Test
public void testDefaultValues() {
  final QueueCapacities untouched = new QueueCapacities(false);
  final String[] partitions = {"", "x"};

  for (String partition : partitions) {
    Assert.assertEquals(-1, untouched.getWeight(partition), 1e-6);
  }
  for (String partition : partitions) {
    Assert.assertEquals(0, untouched.getCapacity(partition), 1e-6);
  }
}
}

View File

@ -1143,6 +1143,59 @@ public class TestQueueParsing {
ServiceOperations.stopQuietly(capacityScheduler);
}
/**
 * Configures root with a single child queue "a" in weight mode and checks
 * that the absolute capacity of both root and "a" is 1 for the default
 * partition and for the labels x, y and z, whatever weight "a" was given.
 */
@Test(timeout = 60000)
public void testQueueCapacityWithWeight() throws Exception {
  YarnConfiguration config = new YarnConfiguration();
  nodeLabelManager = new NullRMNodeLabelsManager();
  nodeLabelManager.init(config);
  config.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
      ResourceScheduler.class);
  CapacitySchedulerConfiguration conf =
      new CapacitySchedulerConfiguration(config);

  // Define top-level queues
  conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a" });
  conf.setLabeledQueueWeight(CapacitySchedulerConfiguration.ROOT, "x", 100);
  conf.setLabeledQueueWeight(CapacitySchedulerConfiguration.ROOT, "y", 100);
  conf.setLabeledQueueWeight(CapacitySchedulerConfiguration.ROOT, "z", 100);

  final String A = CapacitySchedulerConfiguration.ROOT + ".a";
  conf.setNonLabeledQueueWeight(A, 100);
  conf.setAccessibleNodeLabels(A, ImmutableSet.of("x", "y", "z"));
  conf.setLabeledQueueWeight(A, "x", 100);
  conf.setLabeledQueueWeight(A, "y", 100);
  conf.setLabeledQueueWeight(A, "z", 70);

  MockRM rm = null;
  try {
    rm = new MockRM(conf) {
      @Override
      public RMNodeLabelsManager createNodeLabelManager() {
        return nodeLabelManager;
      }
    };

    // Verify while the RM is still open; it is only closed in the finally
    // block below, after all checks have run.
    verifyQueueAbsCapacity(rm, CapacitySchedulerConfiguration.ROOT, "", 1f);
    verifyQueueAbsCapacity(rm, CapacitySchedulerConfiguration.ROOT, "x", 1f);
    verifyQueueAbsCapacity(rm, CapacitySchedulerConfiguration.ROOT, "y", 1f);
    verifyQueueAbsCapacity(rm, CapacitySchedulerConfiguration.ROOT, "z", 1f);

    verifyQueueAbsCapacity(rm, A, "", 1f);
    verifyQueueAbsCapacity(rm, A, "x", 1f);
    verifyQueueAbsCapacity(rm, A, "y", 1f);
    verifyQueueAbsCapacity(rm, A, "z", 1f);
  } finally {
    IOUtils.closeStream(rm);
  }
}
/**
 * Asserts the absolute capacity that a queue reports for one label.
 *
 * @param rm resource manager whose scheduler holds the queue
 * @param queuePath path handed to the scheduler's queue lookup
 * @param label label whose absolute capacity is checked
 * @param expectedAbsCapacity expected value, compared with a 1e-6 delta
 */
private void verifyQueueAbsCapacity(MockRM rm, String queuePath, String label,
    float expectedAbsCapacity) {
  final CapacityScheduler scheduler =
      (CapacityScheduler) rm.getResourceScheduler();
  final QueueCapacities capacities =
      scheduler.getQueue(queuePath).getQueueCapacities();
  final float actualAbsCapacity = capacities.getAbsoluteCapacity(label);
  Assert.assertEquals(expectedAbsCapacity, actualAbsCapacity, 1e-6);
}
private void checkEqualsToQueueSet(List<CSQueue> queues, String[] queueNames) {
Set<String> existedQueues = new HashSet<>();
for (CSQueue q : queues) {

View File

@ -28,7 +28,9 @@ import java.io.IOException;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.preemption.PreemptionManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@ -49,9 +51,10 @@ public class TestReservationQueue {
private final ResourceCalculator resourceCalculator =
new DefaultResourceCalculator();
private ReservationQueue autoCreatedLeafQueue;
private PlanQueue planQueue;
@Before
public void setup() throws IOException {
public void setup() throws IOException, SchedulerDynamicEditException {
// setup a context / conf
csConf = new CapacitySchedulerConfiguration();
@ -66,12 +69,14 @@ public class TestReservationQueue {
when(csContext.getClusterResource()).thenReturn(
Resources.createResource(100 * 16 * GB, 100 * 32));
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
when(csContext.getPreemptionManager()).thenReturn(new PreemptionManager());
RMContext mockRMContext = TestUtils.getMockRMContext();
when(csContext.getRMContext()).thenReturn(mockRMContext);
// create a queue
PlanQueue pq = new PlanQueue(csContext, "root", null, null);
autoCreatedLeafQueue = new ReservationQueue(csContext, "a", pq);
planQueue = new PlanQueue(csContext, "root", null, null);
autoCreatedLeafQueue = new ReservationQueue(csContext, "a", planQueue);
planQueue.addChildQueue(autoCreatedLeafQueue);
}
private void validateAutoCreatedLeafQueue(double capacity) {
@ -83,9 +88,14 @@ public class TestReservationQueue {
@Test
public void testAddSubtractCapacity() throws Exception {
// verify that setting, adding, subtracting capacity works
autoCreatedLeafQueue.setCapacity(1.0F);
autoCreatedLeafQueue.setMaxCapacity(1.0F);
planQueue.updateClusterResource(
Resources.createResource(100 * 16 * GB, 100 * 32),
new ResourceLimits(Resources.createResource(100 * 16 * GB, 100 * 32)));
validateAutoCreatedLeafQueue(1);
autoCreatedLeafQueue.setEntitlement(new QueueEntitlement(0.9f, 1f));
validateAutoCreatedLeafQueue(0.9);

View File

@ -1027,7 +1027,7 @@ public class TestRMWebServices extends JerseyTestBase {
Assert.assertEquals(Status.BAD_REQUEST
.getStatusCode(), response.getStatus());
Assert.assertTrue(response.getEntity().toString()
.contains("Illegal capacity of 0.5 for children of queue"));
.contains("IOException"));
}
@Test

View File

@ -451,8 +451,6 @@ public class TestRMWebServicesForCSWithPartitions extends JerseyTestBase {
Assert.fail("Unexpected partition" + partitionName);
}
}
} else if (queueChildElem.getTagName().equals("resources")) {
verifyResourceUsageInfoXML(queueChildElem);
}
}
assertEquals("Node Labels are not matching", LABEL_LX,
@ -594,16 +592,12 @@ public class TestRMWebServicesForCSWithPartitions extends JerseyTestBase {
for (int i = 0; i < queuesArray.length(); i++) {
JSONObject queueJson = queuesArray.getJSONObject(i);
String queue = queueJson.getString("queueName");
assertEquals("Partition resourceInfo is wrong", 1,
queueJson.getJSONObject("resources")
.getJSONArray(RESOURCE_USAGES_BY_PARTITION).length());
JSONArray resourceUsageByPartition = queueJson.getJSONObject("resources")
.getJSONArray(RESOURCE_USAGES_BY_PARTITION);
JSONObject resourcesJsonObject = queueJson.getJSONObject("resources");
JSONArray partitionsResourcesArray =
resourcesJsonObject.getJSONArray("resourceUsagesByPartition");
assertEquals("incorrect number of elements", 1,
partitionsResourcesArray.length());
resourcesJsonObject.getJSONArray(RESOURCE_USAGES_BY_PARTITION);
capacitiesJsonObject = queueJson.getJSONObject(CAPACITIES);
partitionsCapsArray =
@ -620,6 +614,8 @@ public class TestRMWebServicesForCSWithPartitions extends JerseyTestBase {
verifyPartitionCapacityInfoJson(partitionInfo, 30, 0, 50, 30, 0, 50);
assertEquals("incorrect number of elements", 7,
partitionsResourcesArray.getJSONObject(0).length());
assertEquals("incorrect number of objects", 1,
resourceUsageByPartition.length());
break;
case QUEUE_B:
assertEquals("Invalid default Label expression", LABEL_LX,
@ -629,6 +625,8 @@ public class TestRMWebServicesForCSWithPartitions extends JerseyTestBase {
verifyAccesibleNodeLabels(queueJson, ImmutableSet.of(LABEL_LX));
assertEquals("incorrect number of partitions", 2,
partitionsCapsArray.length());
assertEquals("incorrect number of objects", 2,
resourceUsageByPartition.length());
for (int j = 0; j < partitionsCapsArray.length(); j++) {
partitionInfo = partitionsCapsArray.getJSONObject(j);
partitionName = partitionInfo.getString("partitionName");