YARN-10910. AbstractCSQueue#setupQueueConfigs: Separate validation logic from initialization logic (#3407)
Co-authored-by: Benjamin Teke <bteke@cloudera.com>
This commit is contained in:
parent
29a6f141d4
commit
b229e5a345
@ -114,8 +114,6 @@ public abstract class AbstractCSQueue implements CSQueue {
|
||||
// Track resource usage-by-label like used-resource/pending-resource, etc.
|
||||
volatile ResourceUsage queueUsage;
|
||||
|
||||
private final boolean fullPathQueueNamingPolicy = false;
|
||||
|
||||
// Track capacities like
|
||||
// used-capacity/abs-used-capacity/capacity/abs-capacity,
|
||||
// etc.
|
||||
@ -275,9 +273,6 @@ public String getQueueShortName() {
|
||||
|
||||
@Override
|
||||
public String getQueueName() {
|
||||
if (fullPathQueueNamingPolicy) {
|
||||
return queuePath;
|
||||
}
|
||||
return queueName;
|
||||
}
|
||||
|
||||
@ -366,105 +361,49 @@ protected void setupQueueConfigs(Resource clusterResource,
|
||||
if (isDynamicQueue() || this instanceof AbstractAutoCreatedLeafQueue) {
|
||||
setDynamicQueueProperties(configuration);
|
||||
}
|
||||
// get labels
|
||||
this.accessibleLabels =
|
||||
configuration.getAccessibleNodeLabels(getQueuePath());
|
||||
this.defaultLabelExpression =
|
||||
configuration.getDefaultNodeLabelExpression(
|
||||
getQueuePath());
|
||||
this.resourceTypes = new HashSet<String>();
|
||||
for (AbsoluteResourceType type : AbsoluteResourceType.values()) {
|
||||
resourceTypes.add(type.toString().toLowerCase());
|
||||
}
|
||||
|
||||
// inherit from parent if labels not set
|
||||
if (this.accessibleLabels == null && parent != null) {
|
||||
this.accessibleLabels = parent.getAccessibleNodeLabels();
|
||||
}
|
||||
// Collect and set the Node label configuration
|
||||
initializeNodeLabels(configuration);
|
||||
|
||||
// inherit from parent if labels not set
|
||||
if (this.defaultLabelExpression == null && parent != null
|
||||
&& this.accessibleLabels.containsAll(
|
||||
parent.getAccessibleNodeLabels())) {
|
||||
this.defaultLabelExpression = parent.getDefaultNodeLabelExpression();
|
||||
}
|
||||
|
||||
if (csContext.getCapacitySchedulerQueueManager() != null
|
||||
&& csContext.getCapacitySchedulerQueueManager()
|
||||
.getConfiguredNodeLabels() != null) {
|
||||
this.configuredNodeLabels = csContext.getCapacitySchedulerQueueManager()
|
||||
.getConfiguredNodeLabels().getLabelsByQueue(getQueuePath());
|
||||
} else {
|
||||
// Fallback to suboptimal but correct logic
|
||||
this.configuredNodeLabels = csContext.getConfiguration()
|
||||
.getConfiguredNodeLabels(queuePath);
|
||||
}
|
||||
|
||||
// After we setup labels, we can setup capacities
|
||||
// Initialize the queue capacities
|
||||
setupConfigurableCapacities(configuration);
|
||||
updateAbsoluteCapacities();
|
||||
|
||||
// Also fetch minimum/maximum resource constraint for this queue if
|
||||
// configured.
|
||||
// Fetch minimum/maximum resource limits for this queue if
|
||||
// configured
|
||||
capacityConfigType = CapacityConfigType.NONE;
|
||||
this.resourceTypes = new HashSet<>();
|
||||
for (AbsoluteResourceType type : AbsoluteResourceType.values()) {
|
||||
resourceTypes.add(type.toString().toLowerCase());
|
||||
}
|
||||
updateConfigurableResourceRequirement(getQueuePath(), clusterResource);
|
||||
|
||||
// Setup queue's maximumAllocation respecting the global setting
|
||||
// and queue setting
|
||||
// Setup queue's maximumAllocation respecting the global
|
||||
// and the queue settings
|
||||
setupMaximumAllocation(configuration);
|
||||
|
||||
// Max parallel apps
|
||||
int queueMaxParallelApps =
|
||||
configuration.getMaxParallelAppsForQueue(getQueuePath());
|
||||
setMaxParallelApps(queueMaxParallelApps);
|
||||
|
||||
// initialized the queue state based on previous state, configured state
|
||||
// and its parent state.
|
||||
QueueState previous = getState();
|
||||
QueueState configuredState = configuration
|
||||
.getConfiguredState(getQueuePath());
|
||||
QueueState parentState = (parent == null) ? null : parent.getState();
|
||||
initializeQueueState(previous, configuredState, parentState);
|
||||
// Initialize the queue state based on previous state, configured state
|
||||
// and its parent state
|
||||
initializeQueueState(configuration);
|
||||
|
||||
authorizer = YarnAuthorizationProvider.getInstance(configuration);
|
||||
|
||||
this.acls = configuration.getAcls(getQueuePath());
|
||||
|
||||
// Update metrics
|
||||
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
|
||||
this, labelManager, null);
|
||||
|
||||
// Check if labels of this queue is a subset of parent queue, only do this
|
||||
// when we not root
|
||||
if (parent != null && parent.getParent() != null) {
|
||||
if (parent.getAccessibleNodeLabels() != null && !parent
|
||||
.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY)) {
|
||||
// if parent isn't "*", child shouldn't be "*" too
|
||||
if (this.getAccessibleNodeLabels().contains(
|
||||
RMNodeLabelsManager.ANY)) {
|
||||
throw new IOException("Parent's accessible queue is not ANY(*), "
|
||||
+ "but child's accessible queue is *");
|
||||
} else{
|
||||
Set<String> diff = Sets.difference(this.getAccessibleNodeLabels(),
|
||||
parent.getAccessibleNodeLabels());
|
||||
if (!diff.isEmpty()) {
|
||||
throw new IOException(
|
||||
"Some labels of child queue is not a subset "
|
||||
+ "of parent queue, these labels=[" + StringUtils
|
||||
.join(diff, ",") + "]");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
this.userWeights = getUserWeightsFromHierarchy(configuration);
|
||||
|
||||
this.reservationsContinueLooking =
|
||||
configuration.getReservationContinueLook();
|
||||
|
||||
// Update metrics
|
||||
CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource,
|
||||
this, labelManager, null);
|
||||
|
||||
// Store preemption settings
|
||||
this.preemptionDisabled = isQueueHierarchyPreemptionDisabled(this,
|
||||
configuration);
|
||||
this.intraQueuePreemptionDisabledInHierarchy =
|
||||
isIntraQueueHierarchyPreemptionDisabled(this, configuration);
|
||||
|
||||
this.priority = configuration.getQueuePriority(
|
||||
getQueuePath());
|
||||
|
||||
@ -472,20 +411,8 @@ protected void setupQueueConfigs(Resource clusterResource,
|
||||
setMultiNodeSortingPolicyName(
|
||||
configuration.getMultiNodesSortingAlgorithmPolicy(getQueuePath()));
|
||||
|
||||
this.userWeights = getUserWeightsFromHierarchy(configuration);
|
||||
|
||||
maxApplicationLifetime = getInheritedMaxAppLifetime(this, configuration);
|
||||
defaultApplicationLifetime =
|
||||
getInheritedDefaultAppLifetime(this, configuration,
|
||||
maxApplicationLifetime);
|
||||
if (maxApplicationLifetime > 0 &&
|
||||
defaultApplicationLifetime > maxApplicationLifetime) {
|
||||
throw new YarnRuntimeException(
|
||||
"Default lifetime " + defaultApplicationLifetime
|
||||
+ " can't exceed maximum lifetime " + maxApplicationLifetime);
|
||||
}
|
||||
defaultApplicationLifetime = defaultApplicationLifetime > 0
|
||||
? defaultApplicationLifetime : maxApplicationLifetime;
|
||||
// Setup application related limits
|
||||
setupApplicationLimits(configuration);
|
||||
} finally {
|
||||
writeLock.unlock();
|
||||
}
|
||||
@ -517,6 +444,78 @@ protected void setDynamicQueueProperties(
|
||||
}
|
||||
}
|
||||
|
||||
private void initializeNodeLabels(
|
||||
CapacitySchedulerConfiguration configuration) throws IOException {
|
||||
// Collect and store labels
|
||||
this.accessibleLabels =
|
||||
configuration.getAccessibleNodeLabels(getQueuePath());
|
||||
this.defaultLabelExpression =
|
||||
configuration.getDefaultNodeLabelExpression(
|
||||
getQueuePath());
|
||||
|
||||
// Inherit labels from parent if not set
|
||||
if (this.accessibleLabels == null && parent != null) {
|
||||
this.accessibleLabels = parent.getAccessibleNodeLabels();
|
||||
}
|
||||
|
||||
// If the accessible labels is not null and the queue has a parent with a
|
||||
// similar set of labels copy the defaultNodeLabelExpression from the parent
|
||||
if (this.accessibleLabels != null && parent != null
|
||||
&& this.defaultLabelExpression == null &&
|
||||
this.accessibleLabels.containsAll(parent.getAccessibleNodeLabels())) {
|
||||
this.defaultLabelExpression = parent.getDefaultNodeLabelExpression();
|
||||
}
|
||||
|
||||
if (csContext.getCapacitySchedulerQueueManager() != null
|
||||
&& csContext.getCapacitySchedulerQueueManager()
|
||||
.getConfiguredNodeLabels() != null) {
|
||||
this.configuredNodeLabels = csContext.getCapacitySchedulerQueueManager()
|
||||
.getConfiguredNodeLabels().getLabelsByQueue(getQueuePath());
|
||||
} else {
|
||||
// Fallback to suboptimal but correct logic
|
||||
this.configuredNodeLabels = csContext.getConfiguration()
|
||||
.getConfiguredNodeLabels(queuePath);
|
||||
}
|
||||
|
||||
// Validate the initialized settings
|
||||
validateNodeLabels();
|
||||
}
|
||||
|
||||
private void validateNodeLabels() throws IOException {
|
||||
// Check if labels of this queue is a subset of parent queue, only do this
|
||||
// when the queue in question is not root
|
||||
if (parent != null && parent.getParent() != null) {
|
||||
if (parent.getAccessibleNodeLabels() != null && !parent
|
||||
.getAccessibleNodeLabels().contains(RMNodeLabelsManager.ANY)) {
|
||||
// if parent isn't "*", child shouldn't be "*" too
|
||||
if (this.getAccessibleNodeLabels().contains(
|
||||
RMNodeLabelsManager.ANY)) {
|
||||
throw new IOException("Parent's accessible queue is not ANY(*), "
|
||||
+ "but child's accessible queue is *");
|
||||
} else{
|
||||
Set<String> diff = Sets.difference(this.getAccessibleNodeLabels(),
|
||||
parent.getAccessibleNodeLabels());
|
||||
if (!diff.isEmpty()) {
|
||||
throw new IOException(
|
||||
"Some labels of child queue is not a subset "
|
||||
+ "of parent queue, these labels=[" + StringUtils
|
||||
.join(diff, ",") + "]");
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void setupApplicationLimits(CapacitySchedulerConfiguration configuration) {
|
||||
// Store max parallel apps property
|
||||
this.maxParallelApps = configuration.getMaxParallelAppsForQueue(getQueuePath());
|
||||
|
||||
maxApplicationLifetime = getInheritedMaxAppLifetime(this, configuration);
|
||||
defaultApplicationLifetime =
|
||||
getInheritedDefaultAppLifetime(this, configuration,
|
||||
maxApplicationLifetime);
|
||||
}
|
||||
|
||||
private void setupMaximumAllocation(CapacitySchedulerConfiguration csConf) {
|
||||
String myQueuePath = getQueuePath();
|
||||
/* YARN-10869: When using AutoCreatedLeafQueues, the passed configuration
|
||||
@ -705,8 +704,12 @@ public Resource getEffectiveMaxCapacityDown(String label, Resource factor) {
|
||||
minimumAllocation);
|
||||
}
|
||||
|
||||
private void initializeQueueState(QueueState previousState,
|
||||
QueueState configuredState, QueueState parentState) {
|
||||
private void initializeQueueState(CapacitySchedulerConfiguration configuration) {
|
||||
QueueState previousState = getState();
|
||||
QueueState configuredState = configuration
|
||||
.getConfiguredState(getQueuePath());
|
||||
QueueState parentState = (parent == null) ? null : parent.getState();
|
||||
|
||||
// verify that we can not have any value for State other than RUNNING/STOPPED
|
||||
if (configuredState != null && configuredState != QueueState.RUNNING
|
||||
&& configuredState != QueueState.STOPPED) {
|
||||
@ -1013,17 +1016,25 @@ private long getInheritedDefaultAppLifetime(CSQueue q,
|
||||
// lifetime. Otherwise, use current queue's max lifetime value for its
|
||||
// default lifetime.
|
||||
if (defaultAppLifetimeWasSpecifiedInConfig) {
|
||||
if (parentsDefaultAppLifetime <= myMaxAppLifetime) {
|
||||
defaultAppLifetime = parentsDefaultAppLifetime;
|
||||
} else {
|
||||
defaultAppLifetime = myMaxAppLifetime;
|
||||
}
|
||||
defaultAppLifetime =
|
||||
Math.min(parentsDefaultAppLifetime, myMaxAppLifetime);
|
||||
} else {
|
||||
// Default app lifetime value was not set anywhere in this queue's
|
||||
// hierarchy. Use current queue's max lifetime as its default.
|
||||
defaultAppLifetime = myMaxAppLifetime;
|
||||
}
|
||||
} // else if >= 0, default lifetime was set at this level. Just use it.
|
||||
|
||||
if (myMaxAppLifetime > 0 &&
|
||||
defaultAppLifetime > myMaxAppLifetime) {
|
||||
throw new YarnRuntimeException(
|
||||
"Default lifetime " + defaultAppLifetime
|
||||
+ " can't exceed maximum lifetime " + myMaxAppLifetime);
|
||||
}
|
||||
|
||||
if (defaultAppLifetime <= 0) {
|
||||
defaultAppLifetime = myMaxAppLifetime;
|
||||
}
|
||||
return defaultAppLifetime;
|
||||
}
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user