From 379baa5eb65a65df50461f337b88ff13f2134aeb Mon Sep 17 00:00:00 2001 From: Szilard Nemeth Date: Thu, 3 Mar 2022 16:44:12 +0100 Subject: [PATCH] YARN-10947. Simplify AbstractCSQueue#initializeQueueState. Contributed by Andras Gyori --- .../scheduler/capacity/AbstractCSQueue.java | 56 +---------- .../scheduler/capacity/CSQueueUtils.java | 14 +-- .../scheduler/capacity/QueueStateHelper.java | 98 +++++++++++++++++++ 3 files changed, 101 insertions(+), 67 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueStateHelper.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index e924932a36..b0ab336b7a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -349,7 +349,7 @@ protected void setupQueueConfigs(Resource clusterResource) throws // Initialize the queue state based on previous state, configured state // and its parent state - initializeQueueState(); + QueueStateHelper.setQueueState(this); authorizer = YarnAuthorizationProvider.getInstance(configuration); @@ -553,60 +553,6 @@ public QueueCapacityVector getConfiguredCapacityVector( return configuredCapacityVectors.get(label); } - private void initializeQueueState() { - QueueState previousState = getState(); - QueueState configuredState = queueContext.getConfiguration() - .getConfiguredState(getQueuePath()); - QueueState parentState = (parent == null) ? null : parent.getState(); - - // verify that we can not any value for State other than RUNNING/STOPPED - if (configuredState != null && configuredState != QueueState.RUNNING - && configuredState != QueueState.STOPPED) { - throw new IllegalArgumentException("Invalid queue state configuration." - + " We can only use RUNNING or STOPPED."); - } - // If we did not set state in configuration, use Running as default state - QueueState defaultState = QueueState.RUNNING; - - if (previousState == null) { - // If current state of the queue is null, we would inherit the state - // from its parent. If this queue does not has parent, such as root queue, - // we would use the configured state. - if (parentState == null) { - updateQueueState((configuredState == null) ? defaultState - : configuredState); - } else { - if (configuredState == null) { - updateQueueState((parentState == QueueState.DRAINING) ? - QueueState.STOPPED : parentState); - } else if (configuredState == QueueState.RUNNING - && parentState != QueueState.RUNNING) { - throw new IllegalArgumentException( - "The parent queue:" + parent.getQueuePath() - + " cannot be STOPPED as the child queue:" + getQueuePath() - + " is in RUNNING state."); - } else { - updateQueueState(configuredState); - } - } - } else { - // when we get a refreshQueue request from AdminService, - if (previousState == QueueState.RUNNING) { - if (configuredState == QueueState.STOPPED) { - stopQueue(); - } - } else { - if (configuredState == QueueState.RUNNING) { - try { - activateQueue(); - } catch (YarnException ex) { - throw new IllegalArgumentException(ex.getMessage()); - } - } - } - } - } - protected QueueInfo getQueueInfo() { // Deliberately doesn't use lock here, because this method will be invoked // from schedulerApplicationAttempt, to avoid deadlock, sacrifice diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java index 244bb62d50..c6d50a1cfb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java @@ -19,11 +19,12 @@ import java.util.Set; +import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet; import org.apache.hadoop.util.Sets; +import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; -import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; import org.apache.hadoop.yarn.server.utils.Lock; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; @@ -302,15 +303,4 @@ public static void updateAbsoluteCapacitiesByNodeLabels(QueueCapacities queueCap } } } - - public static ApplicationPlacementContext extractQueuePath(String queuePath) { - int parentQueueNameEndIndex = queuePath.lastIndexOf("."); - if (parentQueueNameEndIndex > -1) { - String parent = queuePath.substring(0, parentQueueNameEndIndex).trim(); - String leaf = queuePath.substring(parentQueueNameEndIndex + 1).trim(); - return new ApplicationPlacementContext(leaf, parent); - } else{ - return new ApplicationPlacementContext(queuePath); - } - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueStateHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueStateHelper.java new file mode 100644 index 0000000000..5ec7d01bce --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/QueueStateHelper.java @@ -0,0 +1,98 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet; +import org.apache.hadoop.yarn.api.records.QueueState; +import org.apache.hadoop.yarn.exceptions.YarnException; + +import java.util.Set; + +/** + * Collects all logic that are handling queue state transitions. + */ +public final class QueueStateHelper { + private static final Set VALID_STATE_CONFIGURATIONS = ImmutableSet.of( + QueueState.RUNNING, QueueState.STOPPED); + private static final QueueState DEFAULT_STATE = QueueState.RUNNING; + + private QueueStateHelper() {} + + /** + * Sets the current state of the queue based on its previous state, its parent's state and its + * configured state. + * @param queue the queue whose state is set + */ + public static void setQueueState(AbstractCSQueue queue) { + QueueState previousState = queue.getState(); + QueueState configuredState = queue.getQueueContext().getConfiguration().getConfiguredState( + queue.getQueuePath()); + QueueState parentState = (queue.getParent() == null) ? null : queue.getParent().getState(); + + // verify that we can not any value for State other than RUNNING/STOPPED + if (configuredState != null && !VALID_STATE_CONFIGURATIONS.contains(configuredState)) { + throw new IllegalArgumentException("Invalid queue state configuration." + + " We can only use RUNNING or STOPPED."); + } + + if (previousState == null) { + initializeState(queue, configuredState, parentState); + } else { + reinitializeState(queue, previousState, configuredState); + } + } + + private static void reinitializeState( + AbstractCSQueue queue, QueueState previousState, QueueState configuredState) { + // when we get a refreshQueue request from AdminService, + if (previousState == QueueState.RUNNING) { + if (configuredState == QueueState.STOPPED) { + queue.stopQueue(); + } + } else { + if (configuredState == QueueState.RUNNING) { + try { + queue.activateQueue(); + } catch (YarnException ex) { + throw new IllegalArgumentException(ex.getMessage()); + } + } + } + } + + private static void initializeState( + AbstractCSQueue queue, QueueState configuredState, QueueState parentState) { + QueueState currentState = configuredState == null ? DEFAULT_STATE : configuredState; + + if (parentState != null) { + if (configuredState == QueueState.RUNNING && parentState != QueueState.RUNNING) { + throw new IllegalArgumentException( + "The parent queue:" + queue.getParent().getQueuePath() + + " cannot be STOPPED as the child queue:" + queue.getQueuePath() + + " is in RUNNING state."); + } + + if (configuredState == null) { + currentState = parentState == QueueState.DRAINING ? QueueState.STOPPED : parentState; + } + } + + queue.updateQueueState(currentState); + } +}