From b229e5a345bf7b79bc48d51861fbe7f4246bab9e Mon Sep 17 00:00:00 2001 From: Benjamin Teke Date: Fri, 10 Sep 2021 16:48:58 +0200 Subject: [PATCH] YARN-10910. AbstractCSQueue#setupQueueConfigs: Separate validation logic from initialization logic (#3407) Co-authored-by: Benjamin Teke --- .../scheduler/capacity/AbstractCSQueue.java | 215 +++++++++--------- 1 file changed, 113 insertions(+), 102 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 2da52d55b7..0abe7c266f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -114,8 +114,6 @@ public abstract class AbstractCSQueue implements CSQueue { // Track resource usage-by-label like used-resource/pending-resource, etc. volatile ResourceUsage queueUsage; - private final boolean fullPathQueueNamingPolicy = false; - // Track capacities like // used-capacity/abs-used-capacity/capacity/abs-capacity, // etc. @@ -275,9 +273,6 @@ public String getQueueShortName() { @Override public String getQueueName() { - if (fullPathQueueNamingPolicy) { - return queuePath; - } return queueName; } @@ -366,105 +361,49 @@ protected void setupQueueConfigs(Resource clusterResource, if (isDynamicQueue() || this instanceof AbstractAutoCreatedLeafQueue) { setDynamicQueueProperties(configuration); } - // get labels - this.accessibleLabels = - configuration.getAccessibleNodeLabels(getQueuePath()); - this.defaultLabelExpression = - configuration.getDefaultNodeLabelExpression( - getQueuePath()); - this.resourceTypes = new HashSet(); - for (AbsoluteResourceType type : AbsoluteResourceType.values()) { - resourceTypes.add(type.toString().toLowerCase()); - } - // inherit from parent if labels not set - if (this.accessibleLabels == null && parent != null) { - this.accessibleLabels = parent.getAccessibleNodeLabels(); - } + // Collect and set the Node label configuration + initializeNodeLabels(configuration); - // inherit from parent if labels not set - if (this.defaultLabelExpression == null && parent != null - && this.accessibleLabels.containsAll( - parent.getAccessibleNodeLabels())) { - this.defaultLabelExpression = parent.getDefaultNodeLabelExpression(); - } - - if (csContext.getCapacitySchedulerQueueManager() != null - && csContext.getCapacitySchedulerQueueManager() - .getConfiguredNodeLabels() != null) { - this.configuredNodeLabels = csContext.getCapacitySchedulerQueueManager() - .getConfiguredNodeLabels().getLabelsByQueue(getQueuePath()); - } else { - // Fallback to suboptimal but correct logic - this.configuredNodeLabels = csContext.getConfiguration() - .getConfiguredNodeLabels(queuePath); - } - - // After we setup labels, we can setup capacities + // Initialize the queue capacities setupConfigurableCapacities(configuration); updateAbsoluteCapacities(); - // Also fetch minimum/maximum resource constraint for this queue if - // configured. + // Fetch minimum/maximum resource limits for this queue if + // configured capacityConfigType = CapacityConfigType.NONE; + this.resourceTypes = new HashSet<>(); + for (AbsoluteResourceType type : AbsoluteResourceType.values()) { + resourceTypes.add(type.toString().toLowerCase()); + } updateConfigurableResourceRequirement(getQueuePath(), clusterResource); - // Setup queue's maximumAllocation respecting the global setting - // and queue setting + // Setup queue's maximumAllocation respecting the global + // and the queue settings setupMaximumAllocation(configuration); - // Max parallel apps - int queueMaxParallelApps = - configuration.getMaxParallelAppsForQueue(getQueuePath()); - setMaxParallelApps(queueMaxParallelApps); - - // initialized the queue state based on previous state, configured state - // and its parent state. - QueueState previous = getState(); - QueueState configuredState = configuration - .getConfiguredState(getQueuePath()); - QueueState parentState = (parent == null) ? null : parent.getState(); - initializeQueueState(previous, configuredState, parentState); + // Initialize the queue state based on previous state, configured state + // and its parent state + initializeQueueState(configuration); authorizer = YarnAuthorizationProvider.getInstance(configuration); this.acls = configuration.getAcls(getQueuePath()); - // Update metrics - CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, - this, labelManager, null); - - // Check if labels of this queue is a subset of parent queue, only do this - // when we not root - if (parent != null && parent.getParent() != null) { - if (parent.getAccessibleNodeLabels() != null && !parent - .getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY)) { - // if parent isn't "*", child shouldn't be "*" too - if (this.getAccessibleNodeLabels().contains( - RMNodeLabelsManager.ANY)) { - throw new IOException("Parent's accessible queue is not ANY(*), " - + "but child's accessible queue is *"); - } else{ - Set diff = Sets.difference(this.getAccessibleNodeLabels(), - parent.getAccessibleNodeLabels()); - if (!diff.isEmpty()) { - throw new IOException( - "Some labels of child queue is not a subset " - + "of parent queue, these labels=[" + StringUtils - .join(diff, ",") + "]"); - } - } - } - } + this.userWeights = getUserWeightsFromHierarchy(configuration); this.reservationsContinueLooking = configuration.getReservationContinueLook(); + // Update metrics + CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, + this, labelManager, null); + + // Store preemption settings this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this, configuration); this.intraQueuePreemptionDisabledInHierarchy = isIntraQueueHierarchyPreemptionDisabled(this, configuration); - this.priority = configuration.getQueuePriority( getQueuePath()); @@ -472,20 +411,8 @@ protected void setupQueueConfigs(Resource clusterResource, setMultiNodeSortingPolicyName( configuration.getMultiNodesSortingAlgorithmPolicy(getQueuePath())); - this.userWeights = getUserWeightsFromHierarchy(configuration); - - maxApplicationLifetime = getInheritedMaxAppLifetime(this, configuration); - defaultApplicationLifetime = - getInheritedDefaultAppLifetime(this, configuration, - maxApplicationLifetime); - if (maxApplicationLifetime > 0 && - defaultApplicationLifetime > maxApplicationLifetime) { - throw new YarnRuntimeException( - "Default lifetime " + defaultApplicationLifetime - + " can't exceed maximum lifetime " + maxApplicationLifetime); - } - defaultApplicationLifetime = defaultApplicationLifetime > 0 - ? defaultApplicationLifetime : maxApplicationLifetime; + // Setup application related limits + setupApplicationLimits(configuration); } finally { writeLock.unlock(); } @@ -517,6 +444,78 @@ protected void setDynamicQueueProperties( } } + private void initializeNodeLabels( + CapacitySchedulerConfiguration configuration) throws IOException { + // Collect and store labels + this.accessibleLabels = + configuration.getAccessibleNodeLabels(getQueuePath()); + this.defaultLabelExpression = + configuration.getDefaultNodeLabelExpression( + getQueuePath()); + + // Inherit labels from parent if not set + if (this.accessibleLabels == null && parent != null) { + this.accessibleLabels = parent.getAccessibleNodeLabels(); + } + + // If the accessible labels is not null and the queue has a parent with a + // similar set of labels copy the defaultNodeLabelExpression from the parent + if (this.accessibleLabels != null && parent != null + && this.defaultLabelExpression == null && + this.accessibleLabels.containsAll(parent.getAccessibleNodeLabels())) { + this.defaultLabelExpression = parent.getDefaultNodeLabelExpression(); + } + + if (csContext.getCapacitySchedulerQueueManager() != null + && csContext.getCapacitySchedulerQueueManager() + .getConfiguredNodeLabels() != null) { + this.configuredNodeLabels = csContext.getCapacitySchedulerQueueManager() + .getConfiguredNodeLabels().getLabelsByQueue(getQueuePath()); + } else { + // Fallback to suboptimal but correct logic + this.configuredNodeLabels = csContext.getConfiguration() + .getConfiguredNodeLabels(queuePath); + } + + // Validate the initialized settings + validateNodeLabels(); + } + + private void validateNodeLabels() throws IOException { + // Check if labels of this queue is a subset of parent queue, only do this + // when the queue in question is not root + if (parent != null && parent.getParent() != null) { + if (parent.getAccessibleNodeLabels() != null && !parent + .getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY)) { + // if parent isn't "*", child shouldn't be "*" too + if (this.getAccessibleNodeLabels().contains( + RMNodeLabelsManager.ANY)) { + throw new IOException("Parent's accessible queue is not ANY(*), " + + "but child's accessible queue is *"); + } else{ + Set diff = Sets.difference(this.getAccessibleNodeLabels(), + parent.getAccessibleNodeLabels()); + if (!diff.isEmpty()) { + throw new IOException( + "Some labels of child queue is not a subset " + + "of parent queue, these labels=[" + StringUtils + .join(diff, ",") + "]"); + } + } + } + } + } + + private void setupApplicationLimits(CapacitySchedulerConfiguration configuration) { + // Store max parallel apps property + this.maxParallelApps = configuration.getMaxParallelAppsForQueue(getQueuePath()); + + maxApplicationLifetime = getInheritedMaxAppLifetime(this, configuration); + defaultApplicationLifetime = + getInheritedDefaultAppLifetime(this, configuration, + maxApplicationLifetime); + } + private void setupMaximumAllocation(CapacitySchedulerConfiguration csConf) { String myQueuePath = getQueuePath(); /* YARN-10869: When using AutoCreatedLeafQueues, the passed configuration @@ -705,8 +704,12 @@ public Resource getEffectiveMaxCapacityDown(String label, Resource factor) { minimumAllocation); } - private void initializeQueueState(QueueState previousState, - QueueState configuredState, QueueState parentState) { + private void initializeQueueState(CapacitySchedulerConfiguration configuration) { + QueueState previousState = getState(); + QueueState configuredState = configuration + .getConfiguredState(getQueuePath()); + QueueState parentState = (parent == null) ? null : parent.getState(); + // verify that we can not any value for State other than RUNNING/STOPPED if (configuredState != null && configuredState != QueueState.RUNNING && configuredState != QueueState.STOPPED) { @@ -1013,17 +1016,25 @@ private long getInheritedDefaultAppLifetime(CSQueue q, // lifetime. Otherwise, use current queue's max lifetime value for its // default lifetime. if (defaultAppLifetimeWasSpecifiedInConfig) { - if (parentsDefaultAppLifetime <= myMaxAppLifetime) { - defaultAppLifetime = parentsDefaultAppLifetime; - } else { - defaultAppLifetime = myMaxAppLifetime; - } + defaultAppLifetime = + Math.min(parentsDefaultAppLifetime, myMaxAppLifetime); } else { // Default app lifetime value was not set anywhere in this queue's // hierarchy. Use current queue's max lifetime as its default. defaultAppLifetime = myMaxAppLifetime; } } // else if >= 0, default lifetime was set at this level. Just use it. + + if (myMaxAppLifetime > 0 && + defaultAppLifetime > myMaxAppLifetime) { + throw new YarnRuntimeException( + "Default lifetime " + defaultAppLifetime + + " can't exceed maximum lifetime " + myMaxAppLifetime); + } + + if (defaultAppLifetime <= 0) { + defaultAppLifetime = myMaxAppLifetime; + } return defaultAppLifetime; }