YARN-10915. AbstractCSQueue: Simplify complex logic in methods: deriveCapacityFromAbsoluteConfigurations and updateEffectiveResources (#3418)
Co-authored-by: Benjamin Teke <bteke@cloudera.com>
This commit is contained in:
parent
783d94f5cd
commit
5dc2f7b137
@ -1516,8 +1516,8 @@ protected void updateAbsoluteCapacities() {
|
||||
parentQueueCapacities, queueCapacities.getExistingNodeLabels());
|
||||
}
|
||||
|
||||
private Resource getMinResourceNormalized(String name,
|
||||
Map<String, Float> effectiveMinRatio, Resource minResource) {
|
||||
private Resource createNormalizedMinResource(Resource minResource,
|
||||
Map<String, Float> effectiveMinRatio) {
|
||||
Resource ret = Resource.newInstance(minResource);
|
||||
int maxLength = ResourceUtils.getNumberOfCountableResourceTypes();
|
||||
for (int i = 0; i < maxLength; i++) {
|
||||
@ -1527,18 +1527,35 @@ private Resource getMinResourceNormalized(String name,
|
||||
Float ratio = effectiveMinRatio.get(nResourceInformation.getName());
|
||||
if (ratio != null) {
|
||||
ret.setResourceValue(i,
|
||||
(long) (nResourceInformation.getValue() * ratio.floatValue()));
|
||||
(long) (nResourceInformation.getValue() * ratio));
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Updating min resource for Queue: " + name + " as " + ret
|
||||
LOG.debug("Updating min resource for Queue: " + queuePath + " as " + ret
|
||||
.getResourceInformation(i) + ", Actual resource: "
|
||||
+ nResourceInformation.getValue() + ", ratio: " + ratio
|
||||
.floatValue());
|
||||
+ nResourceInformation.getValue() + ", ratio: " + ratio);
|
||||
}
|
||||
}
|
||||
}
|
||||
return ret;
|
||||
}
|
||||
|
||||
private Resource getOrInheritMaxResource(Resource resourceByLabel, String label) {
|
||||
Resource parentMaxResource =
|
||||
parent.getQueueResourceQuotas().getConfiguredMaxResource(label);
|
||||
if (parentMaxResource.equals(Resources.none())) {
|
||||
parentMaxResource =
|
||||
parent.getQueueResourceQuotas().getEffectiveMaxResource(label);
|
||||
}
|
||||
|
||||
Resource configuredMaxResource =
|
||||
getQueueResourceQuotas().getConfiguredMaxResource(label);
|
||||
if (configuredMaxResource.equals(Resources.none())) {
|
||||
return Resources.clone(parentMaxResource);
|
||||
}
|
||||
|
||||
return Resources.clone(Resources.min(resourceCalculator, resourceByLabel,
|
||||
configuredMaxResource, parentMaxResource));
|
||||
}
|
||||
|
||||
void updateMaxAppRelatedField(CapacitySchedulerConfiguration conf,
|
||||
LeafQueue leafQueue) {
|
||||
int maxApplications = conf.getMaximumApplicationsPerQueue(queuePath);
|
||||
@ -1586,38 +1603,41 @@ void updateMaxAppRelatedField(CapacitySchedulerConfiguration conf,
|
||||
.getMaximumCapacity(maxLabel));
|
||||
}
|
||||
|
||||
private void deriveCapacityFromAbsoluteConfigurations(String label,
|
||||
Resource clusterResource, ResourceCalculator rc) {
|
||||
|
||||
/*
|
||||
* In case when queues are configured with absolute resources, it is better
|
||||
* to update capacity/max-capacity etc w.r.t absolute resource as well. In
|
||||
* case of computation, these values wont be used any more. However for
|
||||
* metrics and UI, its better these values are pre-computed here itself.
|
||||
*/
|
||||
|
||||
// 1. Update capacity as a float based on parent's minResource
|
||||
float f = rc.divide(clusterResource,
|
||||
void deriveCapacityFromAbsoluteConfigurations(String label,
|
||||
Resource clusterResource) {
|
||||
// Update capacity with a float calculated from the parent's minResources
|
||||
// and the recently changed queue minResources.
|
||||
// capacity = effectiveMinResource / {parent's effectiveMinResource}
|
||||
float result = resourceCalculator.divide(clusterResource,
|
||||
queueResourceQuotas.getEffectiveMinResource(label),
|
||||
parent.getQueueResourceQuotas().getEffectiveMinResource(label));
|
||||
queueCapacities.setCapacity(label, Float.isInfinite(f) ? 0 : f);
|
||||
queueCapacities.setCapacity(label,
|
||||
Float.isInfinite(result) ? 0 : result);
|
||||
|
||||
// 2. Update max-capacity as a float based on parent's maxResource
|
||||
f = rc.divide(clusterResource,
|
||||
// Update maxCapacity with a float calculated from the parent's maxResources
|
||||
// and the recently changed queue maxResources.
|
||||
// maxCapacity = effectiveMaxResource / parent's effectiveMaxResource
|
||||
result = resourceCalculator.divide(clusterResource,
|
||||
queueResourceQuotas.getEffectiveMaxResource(label),
|
||||
parent.getQueueResourceQuotas().getEffectiveMaxResource(label));
|
||||
queueCapacities.setMaximumCapacity(label, Float.isInfinite(f) ? 0 : f);
|
||||
queueCapacities.setMaximumCapacity(label,
|
||||
Float.isInfinite(result) ? 0 : result);
|
||||
|
||||
// 3. Update absolute capacity as a float based on parent's minResource and
|
||||
// cluster resource.
|
||||
// Update absolute capacity (as in fraction of the
|
||||
// whole cluster's resources) with a float calculated from the queue's
|
||||
// capacity and the parent's absoluteCapacity.
|
||||
// absoluteCapacity = capacity * parent's absoluteCapacity
|
||||
queueCapacities.setAbsoluteCapacity(label,
|
||||
queueCapacities.getCapacity(label) * parent.getQueueCapacities()
|
||||
.getAbsoluteCapacity(label));
|
||||
|
||||
// 4. Update absolute max-capacity as a float based on parent's maxResource
|
||||
// and cluster resource.
|
||||
// Update absolute maxCapacity (as in fraction of the
|
||||
// whole cluster's resources) with a float calculated from the queue's
|
||||
// maxCapacity and the parent's absoluteMaxCapacity.
|
||||
// absoluteMaxCapacity = maxCapacity * parent's absoluteMaxCapacity
|
||||
queueCapacities.setAbsoluteMaximumCapacity(label,
|
||||
queueCapacities.getMaximumCapacity(label) * parent.getQueueCapacities()
|
||||
queueCapacities.getMaximumCapacity(label) *
|
||||
parent.getQueueCapacities()
|
||||
.getAbsoluteMaximumCapacity(label));
|
||||
}
|
||||
|
||||
@ -1625,60 +1645,55 @@ void updateEffectiveResources(Resource clusterResource) {
|
||||
for (String label : configuredNodeLabels) {
|
||||
Resource resourceByLabel = labelManager.getResourceByLabel(label,
|
||||
clusterResource);
|
||||
Resource newEffectiveMinResource;
|
||||
Resource newEffectiveMaxResource;
|
||||
|
||||
Resource minResource = queueResourceQuotas.getConfiguredMinResource(
|
||||
label);
|
||||
|
||||
// Update effective resource (min/max) to each child queue.
|
||||
// Absolute and relative/weight mode needs different handling.
|
||||
if (getCapacityConfigType().equals(
|
||||
CapacityConfigType.ABSOLUTE_RESOURCE)) {
|
||||
newEffectiveMinResource = createNormalizedMinResource(
|
||||
queueResourceQuotas.getConfiguredMinResource(label),
|
||||
((ParentQueue) parent).getEffectiveMinRatioPerResource());
|
||||
|
||||
// Max resource of a queue should be the minimum of {parent's maxResources,
|
||||
// this queue's maxResources}. Both parent's maxResources and this queue's
|
||||
// maxResources can be configured. If this queue's maxResources is not
|
||||
// configured, inherit the value from the parent. If parent's
|
||||
// maxResources is not configured its inherited value must be collected.
|
||||
newEffectiveMaxResource =
|
||||
getOrInheritMaxResource(resourceByLabel, label);
|
||||
} else {
|
||||
newEffectiveMinResource = Resources
|
||||
.multiply(resourceByLabel,
|
||||
queueCapacities.getAbsoluteCapacity(label));
|
||||
newEffectiveMaxResource = Resources
|
||||
.multiply(resourceByLabel,
|
||||
queueCapacities.getAbsoluteMaximumCapacity(label));
|
||||
}
|
||||
|
||||
// Update the effective min
|
||||
queueResourceQuotas.setEffectiveMinResource(label,
|
||||
getMinResourceNormalized(queuePath,
|
||||
((ParentQueue) parent).getEffectiveMinRatioPerResource(),
|
||||
minResource));
|
||||
|
||||
// Max resource of a queue should be a minimum of {configuredMaxRes,
|
||||
// parentMaxRes}. parentMaxRes could be configured value. But if not
|
||||
// present could also be taken from effective max resource of parent.
|
||||
Resource parentMaxRes =
|
||||
parent.getQueueResourceQuotas().getConfiguredMaxResource(label);
|
||||
if (parent != null && parentMaxRes.equals(Resources.none())) {
|
||||
parentMaxRes =
|
||||
parent.getQueueResourceQuotas().getEffectiveMaxResource(label);
|
||||
}
|
||||
|
||||
// Minimum of {childMaxResource, parentMaxRes}. However if
|
||||
// childMaxResource is empty, consider parent's max resource alone.
|
||||
Resource childMaxResource =
|
||||
getQueueResourceQuotas().getConfiguredMaxResource(label);
|
||||
Resource effMaxResource = Resources.min(resourceCalculator,
|
||||
resourceByLabel, childMaxResource.equals(Resources.none()) ?
|
||||
parentMaxRes :
|
||||
childMaxResource, parentMaxRes);
|
||||
newEffectiveMinResource);
|
||||
queueResourceQuotas.setEffectiveMaxResource(label,
|
||||
Resources.clone(effMaxResource));
|
||||
|
||||
// In cases where we still need to update some units based on
|
||||
// percentage, we have to calculate percentage and update.
|
||||
ResourceCalculator rc = this.csContext.getResourceCalculator();
|
||||
deriveCapacityFromAbsoluteConfigurations(label, clusterResource, rc);
|
||||
// Re-visit max applications for a queue based on absolute capacity if
|
||||
// needed.
|
||||
} else{
|
||||
queueResourceQuotas.setEffectiveMinResource(label, Resources
|
||||
.multiply(resourceByLabel,
|
||||
queueCapacities.getAbsoluteCapacity(label)));
|
||||
queueResourceQuotas.setEffectiveMaxResource(label, Resources
|
||||
.multiply(resourceByLabel,
|
||||
queueCapacities.getAbsoluteMaximumCapacity(label)));
|
||||
}
|
||||
newEffectiveMaxResource);
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Updating effective min resource for queue:" + queuePath
|
||||
+ " as effMinResource=" + queueResourceQuotas
|
||||
.getEffectiveMinResource(label)
|
||||
+ "and Updating effective max resource as effMaxResource="
|
||||
+ queueResourceQuotas.getEffectiveMaxResource(label));
|
||||
LOG.debug("Updating queue:" + queuePath
|
||||
+ " with effective minimum resource=" + newEffectiveMinResource
|
||||
+ "and effective maximum resource="
|
||||
+ newEffectiveMaxResource);
|
||||
}
|
||||
|
||||
if (getCapacityConfigType().equals(
|
||||
CapacityConfigType.ABSOLUTE_RESOURCE)) {
|
||||
/*
|
||||
* If the queues are configured with absolute resources, it is advised
|
||||
* to update capacity/max-capacity/etc. based on the newly calculated
|
||||
* resource values. These values won't be used for actual resource
|
||||
* distribution, however, for accurate metrics and the UI
|
||||
* they should be re-calculated.
|
||||
*/
|
||||
deriveCapacityFromAbsoluteConfigurations(label, clusterResource);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -74,7 +74,6 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSet;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.CandidateNodeSetUtils;
|
||||
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
|
||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
@ -1326,14 +1325,13 @@ private void calculateEffectiveResourcesAndCapacity(String label,
|
||||
// resources, effective_min_resources will be same as configured
|
||||
// min_resources.
|
||||
Resource numeratorForMinRatio = null;
|
||||
ResourceCalculator rc = this.csContext.getResourceCalculator();
|
||||
if (getQueuePath().equals("root")) {
|
||||
if (!resourceByLabel.equals(Resources.none()) && Resources.lessThan(rc,
|
||||
if (!resourceByLabel.equals(Resources.none()) && Resources.lessThan(resourceCalculator,
|
||||
clusterResource, resourceByLabel, configuredMinResources)) {
|
||||
numeratorForMinRatio = resourceByLabel;
|
||||
}
|
||||
} else {
|
||||
if (Resources.lessThan(rc, clusterResource,
|
||||
if (Resources.lessThan(resourceCalculator, clusterResource,
|
||||
queueResourceQuotas.getEffectiveMinResource(label),
|
||||
configuredMinResources)) {
|
||||
numeratorForMinRatio = queueResourceQuotas
|
||||
|
Loading…
Reference in New Issue
Block a user