From 0987a7b8cbbbb2c1e4c2095821d98a7db19644df Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Thu, 16 Nov 2017 11:22:48 -0800 Subject: [PATCH] YARN-7419. CapacityScheduler: Allow auto leaf queue creation after queue mapping. (Suma Shivaprasad via wangda) Change-Id: Ia1704bb8cb5070e5b180b5a85787d7b9ca57ebc6 --- .../server/resourcemanager/RMAppManager.java | 7 +- .../ApplicationPlacementContext.java | 52 ++ .../placement/PlacementManager.java | 34 +- .../placement/PlacementRule.java | 7 +- .../UserGroupMappingPlacementRule.java | 284 ++++++- .../resourcemanager/rmapp/RMAppImpl.java | 87 +- .../scheduler/capacity/AbstractCSQueue.java | 2 +- .../capacity/AbstractManagedParentQueue.java | 196 +++-- .../capacity/AutoCreatedLeafQueue.java | 27 +- .../scheduler/capacity/CapacityScheduler.java | 157 +++- .../CapacitySchedulerConfiguration.java | 153 ++++ .../CapacitySchedulerQueueManager.java | 103 ++- .../capacity/ManagedParentQueue.java | 158 ++++ .../scheduler/capacity/ParentQueue.java | 13 - .../scheduler/capacity/PlanQueue.java | 25 +- .../event/AppAddedSchedulerEvent.java | 37 +- .../resourcemanager/TestAppManager.java | 29 +- .../TestUserGroupMappingPlacementRule.java | 14 +- .../scheduler/TestSchedulerUtils.java | 1 + .../capacity/TestCapacityScheduler.java | 6 +- ...estCapacitySchedulerAutoQueueCreation.java | 794 ++++++++++++++++++ 21 files changed, 1921 insertions(+), 265 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/ApplicationPlacementContext.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index d0425907f6..5e82f401b6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -360,13 +360,8 @@ protected void recoverApplication(ApplicationStateData appState, private RMAppImpl createAndPopulateNewRMApp( ApplicationSubmissionContext submissionContext, long submitTime, String user, boolean isRecovery, long startTime) throws YarnException { + if (!isRecovery) { - // Do queue mapping - if (rmContext.getQueuePlacementManager() != null) { - // We only do queue mapping when it's a new application - rmContext.getQueuePlacementManager().placeApplication( - submissionContext, user); - } // fail the submission if configured application timeout value is invalid RMServerUtils.validateApplicationTimeouts( submissionContext.getApplicationTimeouts()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/ApplicationPlacementContext.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/ApplicationPlacementContext.java new file mode 100644 index 0000000000..f2f92b81fb --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/ApplicationPlacementContext.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.placement; + +/** + * Each placement rule when it successfully places an application onto a queue + * returns a PlacementRuleContext which encapsulates the queue the + * application was mapped to and any parent queue for the queue (if configured) + */ +public class ApplicationPlacementContext { + + private String queue; + + private String parentQueue; + + public ApplicationPlacementContext(String queue) { + this(queue,null); + } + + public ApplicationPlacementContext(String queue, String parentQueue) { + this.queue = queue; + this.parentQueue = parentQueue; + } + + public String getQueue() { + return queue; + } + + public String getParentQueue() { + return parentQueue; + } + + public boolean hasParentQueue() { + return parentQueue != null; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java index 43a4deb70e..c0067385b9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java @@ -23,7 +23,6 @@ import java.util.concurrent.locks.ReentrantReadWriteLock.ReadLock; import java.util.concurrent.locks.ReentrantReadWriteLock.WriteLock; -import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; @@ -53,36 +52,33 @@ public void updateRules(List rules) { } } - public void placeApplication(ApplicationSubmissionContext asc, String user) - throws YarnException { + public ApplicationPlacementContext placeApplication( + ApplicationSubmissionContext asc, String user) throws YarnException { + try { readLock.lock(); + if (null == rules || rules.isEmpty()) { - return; + return null; } - - String newQueueName = null; + + ApplicationPlacementContext placement = null; for (PlacementRule rule : rules) { - newQueueName = rule.getQueueForApp(asc, user); - if (newQueueName != null) { + placement = rule.getPlacementForApp(asc, user); + if (placement != null) { break; } } - + // Failed to get where to place application - if (null == newQueueName && null == asc.getQueue()) { - String msg = "Failed to get where to place application=" - + asc.getApplicationId(); + if (null == placement && null == asc.getQueue()) { + String msg = "Failed to get where to place application=" + asc + .getApplicationId(); LOG.error(msg); throw new YarnException(msg); } - - // Set it to ApplicationSubmissionContext - if (!StringUtils.equals(asc.getQueue(), newQueueName)) { - LOG.info("Placed application=" + asc.getApplicationId() + " to queue=" - + newQueueName + ", original queue=" + asc.getQueue()); - asc.setQueue(newQueueName); - } + + return placement; } finally { readLock.unlock(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java index 47dc48a51c..a9d5e3337e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java @@ -25,6 +25,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.RMContext; public abstract class PlacementRule { + public String getName() { return this.getClass().getName(); } @@ -50,6 +51,6 @@ public void initialize(Map parameters, RMContext rmContext) * in the {@link PlacementManager} will take care *

*/ - public abstract String getQueueForApp(ApplicationSubmissionContext asc, - String user) throws YarnException; -} + public abstract ApplicationPlacementContext getPlacementForApp( + ApplicationSubmissionContext asc, String user) throws YarnException; +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java index d617d16185..9901f4a1e0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java @@ -19,8 +19,10 @@ package org.apache.hadoop.yarn.server.resourcemanager.placement; import java.io.IOException; +import java.util.ArrayList; import java.util.List; +import org.apache.commons.lang.StringUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -32,6 +34,15 @@ import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping.MappingType; import com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedLeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ManagedParentQueue; + +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT; public class UserGroupMappingPlacementRule extends PlacementRule { private static final Log LOG = LogFactory @@ -66,17 +77,41 @@ public String toString() { MappingType type; String source; String queue; + String parentQueue; + + public final static String DELIMITER = ":"; public QueueMapping(MappingType type, String source, String queue) { this.type = type; this.source = source; this.queue = queue; + this.parentQueue = null; } - + + public QueueMapping(MappingType type, String source, + String queue, String parentQueue) { + this.type = type; + this.source = source; + this.queue = queue; + this.parentQueue = parentQueue; + } + public String getQueue() { return queue; } - + + public String getParentQueue() { + return parentQueue; + } + + public MappingType getType() { + return type; + } + + public String getSource() { + return source; + } + @Override public int hashCode() { return super.hashCode(); @@ -93,6 +128,13 @@ public boolean equals(Object obj) { return false; } } + + public String toString() { + return type.toString() + DELIMITER + source + DELIMITER + + (parentQueue != null ? + parentQueue + "." + queue : + queue); + } } public UserGroupMappingPlacementRule(boolean overrideWithQueueMappings, @@ -102,26 +144,27 @@ public UserGroupMappingPlacementRule(boolean overrideWithQueueMappings, this.groups = groups; } - private String getMappedQueue(String user) throws IOException { + private ApplicationPlacementContext getPlacementForUser(String user) + throws IOException { for (QueueMapping mapping : mappings) { if (mapping.type == MappingType.USER) { if (mapping.source.equals(CURRENT_USER_MAPPING)) { if (mapping.queue.equals(CURRENT_USER_MAPPING)) { - return user; + return getPlacementContext(mapping, user); } else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) { - return groups.getGroups(user).get(0); + return getPlacementContext(mapping, groups.getGroups(user).get(0)); } else { - return mapping.queue; + return getPlacementContext(mapping); } } if (user.equals(mapping.source)) { - return mapping.queue; + return getPlacementContext(mapping); } } if (mapping.type == MappingType.GROUP) { for (String userGroups : groups.getGroups(user)) { if (userGroups.equals(mapping.source)) { - return mapping.queue; + return getPlacementContext(mapping); } } } @@ -130,13 +173,14 @@ private String getMappedQueue(String user) throws IOException { } @Override - public String getQueueForApp(ApplicationSubmissionContext asc, String user) + public ApplicationPlacementContext getPlacementForApp( + ApplicationSubmissionContext asc, String user) throws YarnException { String queueName = asc.getQueue(); ApplicationId applicationId = asc.getApplicationId(); if (mappings != null && mappings.size() > 0) { try { - String mappedQueue = getMappedQueue(user); + ApplicationPlacementContext mappedQueue = getPlacementForUser(user); if (mappedQueue != null) { // We have a mapping, should we use it? if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME) @@ -153,10 +197,224 @@ public String getQueueForApp(ApplicationSubmissionContext asc, String user) throw new YarnException(message); } } - - return queueName; + return null; } - + + private ApplicationPlacementContext getPlacementContext( + QueueMapping mapping) { + return getPlacementContext(mapping, mapping.getQueue()); + } + + private ApplicationPlacementContext getPlacementContext(QueueMapping mapping, + String leafQueueName) { + if (!StringUtils.isEmpty(mapping.parentQueue)) { + return new ApplicationPlacementContext(leafQueueName, + mapping.getParentQueue()); + } else{ + return new ApplicationPlacementContext(leafQueueName); + } + } + + @VisibleForTesting + public static UserGroupMappingPlacementRule get( + CapacitySchedulerContext schedulerContext) throws IOException { + CapacitySchedulerConfiguration conf = schedulerContext.getConfiguration(); + boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings(); + LOG.info( + "Initialized queue mappings, override: " + overrideWithQueueMappings); + + List queueMappings = conf.getQueueMappings(); + + // Get new user/group mappings + List newMappings = new ArrayList<>(); + + CapacitySchedulerQueueManager queueManager = + schedulerContext.getCapacitySchedulerQueueManager(); + + // check if mappings refer to valid queues + for (QueueMapping mapping : queueMappings) { + + QueuePath queuePath = extractQueuePath(mapping.getQueue()); + if (isStaticQueueMapping(mapping)) { + //Try getting queue by its leaf queue name + // without splitting into parent/leaf queues + CSQueue queue = queueManager.getQueue(mapping.getQueue()); + if (ifQueueDoesNotExist(queue)) { + //Try getting the queue by extracting leaf and parent queue names + //Assuming its a potential auto created leaf queue + queue = queueManager.getQueue(queuePath.getLeafQueue()); + + if (ifQueueDoesNotExist(queue)) { + //if leaf queue does not exist, + // this could be a potential auto created leaf queue + //validate if parent queue is specified, + // then it should exist and + // be an instance of AutoCreateEnabledParentQueue + QueueMapping newMapping = validateAndGetAutoCreatedQueueMapping( + queueManager, mapping, queuePath); + if (newMapping == null) { + throw new IOException( + "mapping contains invalid or non-leaf queue " + mapping + .getQueue()); + } + newMappings.add(newMapping); + } else{ + QueueMapping newMapping = validateAndGetQueueMapping(queueManager, + queue, mapping, queuePath); + newMappings.add(newMapping); + } + } else{ + // if queue exists, validate + // if its an instance of leaf queue + // if its an instance of auto created leaf queue, + // then extract parent queue name and update queue mapping + QueueMapping newMapping = validateAndGetQueueMapping(queueManager, + queue, mapping, queuePath); + newMappings.add(newMapping); + } + } else{ + //If it is a dynamic queue mapping, + // we can safely assume leaf queue name does not have '.' in it + // validate + // if parent queue is specified, then + // parent queue exists and an instance of AutoCreateEnabledParentQueue + // + QueueMapping newMapping = validateAndGetAutoCreatedQueueMapping( + queueManager, mapping, queuePath); + if (newMapping != null) { + newMappings.add(newMapping); + } else{ + newMappings.add(mapping); + } + } + } + + // initialize groups if mappings are present + if (newMappings.size() > 0) { + Groups groups = new Groups(conf); + return new UserGroupMappingPlacementRule(overrideWithQueueMappings, + newMappings, groups); + } + + return null; + } + + private static QueueMapping validateAndGetQueueMapping( + CapacitySchedulerQueueManager queueManager, CSQueue queue, + QueueMapping mapping, QueuePath queuePath) throws IOException { + if (!(queue instanceof LeafQueue)) { + throw new IOException( + "mapping contains invalid or non-leaf queue : " + mapping.getQueue()); + } + + if (queue instanceof AutoCreatedLeafQueue && queue + .getParent() instanceof ManagedParentQueue) { + + QueueMapping newMapping = validateAndGetAutoCreatedQueueMapping( + queueManager, mapping, queuePath); + if (newMapping == null) { + throw new IOException( + "mapping contains invalid or non-leaf queue " + mapping.getQueue()); + } + return newMapping; + } + return mapping; + } + + private static boolean ifQueueDoesNotExist(CSQueue queue) { + return queue == null; + } + + private static QueueMapping validateAndGetAutoCreatedQueueMapping( + CapacitySchedulerQueueManager queueManager, QueueMapping mapping, + QueuePath queuePath) throws IOException { + if (queuePath.hasParentQueue()) { + //if parent queue is specified, + // then it should exist and be an instance of ManagedParentQueue + validateParentQueue(queueManager.getQueue(queuePath.getParentQueue()), + queuePath.getParentQueue(), queuePath.getLeafQueue()); + return new QueueMapping(mapping.getType(), mapping.getSource(), + queuePath.getLeafQueue(), queuePath.getParentQueue()); + } + + return null; + } + + private static boolean isStaticQueueMapping(QueueMapping mapping) { + return !mapping.getQueue().contains( + UserGroupMappingPlacementRule.CURRENT_USER_MAPPING) && !mapping + .getQueue().contains( + UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING); + } + + private static class QueuePath { + + public String parentQueue; + public String leafQueue; + + public QueuePath(final String leafQueue) { + this.leafQueue = leafQueue; + } + + public QueuePath(final String parentQueue, final String leafQueue) { + this.parentQueue = parentQueue; + this.leafQueue = leafQueue; + } + + public String getParentQueue() { + return parentQueue; + } + + public String getLeafQueue() { + return leafQueue; + } + + public boolean hasParentQueue() { + return parentQueue != null; + } + + @Override + public String toString() { + return parentQueue + DOT + leafQueue; + } + } + + private static QueuePath extractQueuePath(String queueName) + throws IOException { + int parentQueueNameEndIndex = queueName.lastIndexOf(DOT); + + if (parentQueueNameEndIndex > -1) { + final String parentQueue = queueName.substring(0, parentQueueNameEndIndex) + .trim(); + final String leafQueue = queueName.substring(parentQueueNameEndIndex + 1) + .trim(); + return new QueuePath(parentQueue, leafQueue); + } + + return new QueuePath(queueName); + } + + private static void validateParentQueue(CSQueue parentQueue, + String parentQueueName, String leafQueueName) throws IOException { + if (parentQueue == null) { + throw new IOException( + "mapping contains invalid or non-leaf queue [" + leafQueueName + + "] and invalid parent queue [" + parentQueueName + "]"); + } else if (!(parentQueue instanceof ManagedParentQueue)) { + throw new IOException("mapping contains leaf queue [" + leafQueueName + + "] and invalid parent queue which " + + "does not have auto creation of leaf queues enabled [" + + parentQueueName + "]"); + } else if (!parentQueue.getQueueName().equals(parentQueueName)) { + throw new IOException( + "mapping contains invalid or non-leaf queue [" + leafQueueName + + "] and invalid parent queue " + + "which does not match existing leaf queue's parent : [" + + parentQueueName + "] does not match [ " + parentQueue + .getQueueName() + "]"); + } + } + @VisibleForTesting public List getQueueMappings() { return mappings; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java index ae5f6b487c..85d355fe00 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/rmapp/RMAppImpl.java @@ -68,6 +68,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.security.AMRMTokenIdentifier; import org.apache.hadoop.yarn.security.client.ClientToAMTokenIdentifier; @@ -83,6 +84,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.blacklist.BlacklistManager; import org.apache.hadoop.yarn.server.resourcemanager.blacklist.DisabledBlacklistManager; import org.apache.hadoop.yarn.server.resourcemanager.blacklist.SimpleBlacklistManager; +import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; +import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.Recoverable; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; @@ -158,6 +161,8 @@ public class RMAppImpl implements RMApp, Recoverable { private boolean isNumAttemptsBeyondThreshold = false; + + // Mutable fields private long startTime; private long finishTime = 0; @@ -1073,38 +1078,51 @@ public RMAppState transition(RMAppImpl app, RMAppEvent event) { app.getUser(), BuilderUtils.parseTokensConf(app.submissionContext)); } catch (Exception e) { - String msg = "Failed to fetch user credentials from application:" - + e.getMessage(); + String msg = "Failed to fetch user credentials from application:" + e + .getMessage(); app.diagnostics.append(msg); LOG.error(msg, e); } } - for (Map.Entry timeout : - app.applicationTimeouts.entrySet()) { + for (Map.Entry timeout : app.applicationTimeouts + .entrySet()) { app.rmContext.getRMAppLifetimeMonitor().registerApp(app.applicationId, timeout.getKey(), timeout.getValue()); if (LOG.isDebugEnabled()) { long remainingTime = timeout.getValue() - app.systemClock.getTime(); LOG.debug("Application " + app.applicationId + " is registered for timeout monitor, type=" + timeout.getKey() - + " remaining timeout=" - + (remainingTime > 0 ? remainingTime / 1000 : 0) + " seconds"); + + " remaining timeout=" + (remainingTime > 0 ? + remainingTime / 1000 : + 0) + " seconds"); } } + ApplicationPlacementContext placementContext = null; + try { + placementContext = placeApplication(app.rmContext, + app.submissionContext, app.user); + } catch (Exception e) { + String msg = "Failed to place application to queue :" + e.getMessage(); + app.diagnostics.append(msg); + LOG.error(msg, e); + } + // No existent attempts means the attempt associated with this app was not // started or started but not yet saved. if (app.attempts.isEmpty()) { - app.scheduler.handle(new AppAddedSchedulerEvent(app.user, - app.submissionContext, false, app.applicationPriority)); + app.scheduler.handle( + new AppAddedSchedulerEvent(app.user, app.submissionContext, false, + app.applicationPriority, placementContext)); return RMAppState.SUBMITTED; } // Add application to scheduler synchronously to guarantee scheduler // knows applications before AM or NM re-registers. - app.scheduler.handle(new AppAddedSchedulerEvent(app.user, - app.submissionContext, true, app.applicationPriority)); + app.scheduler.handle( + new AppAddedSchedulerEvent(app.user, app.submissionContext, true, + app.applicationPriority, placementContext)); // recover attempts app.recoverAppAttempts(); @@ -1120,8 +1138,20 @@ private static final class AddApplicationToSchedulerTransition extends RMAppTransition { @Override public void transition(RMAppImpl app, RMAppEvent event) { - app.handler.handle(new AppAddedSchedulerEvent(app.user, - app.submissionContext, false, app.applicationPriority)); + ApplicationPlacementContext placementContext = null; + try { + placementContext = placeApplication(app.rmContext, + app.submissionContext, app.user); + replaceQueueFromPlacementContext(placementContext, + app.submissionContext); + } catch (YarnException e) { + String msg = "Failed to place application to queue :" + e.getMessage(); + app.diagnostics.append(msg); + LOG.error(msg, e); + } + app.handler.handle( + new AppAddedSchedulerEvent(app.user, app.submissionContext, false, + app.applicationPriority, placementContext)); // send the ATS create Event app.sendATSCreateEvent(); } @@ -2013,4 +2043,37 @@ private void clearUnusedFields() { this.submissionContext.setAMContainerSpec(null); this.submissionContext.setLogAggregationContext(null); } + + @VisibleForTesting + static ApplicationPlacementContext placeApplication(RMContext rmContext, + ApplicationSubmissionContext context, String user) throws YarnException { + + ApplicationPlacementContext placementContext = null; + PlacementManager placementManager = rmContext.getQueuePlacementManager(); + + if (placementManager != null) { + placementContext = placementManager.placeApplication(context, user); + } else{ + LOG.error( + "Queue Placement Manager is null. Cannot place application :" + " " + + context.getApplicationId() + " to queue "); + } + + return placementContext; + } + + static void replaceQueueFromPlacementContext( + ApplicationPlacementContext placementContext, + ApplicationSubmissionContext context) { + // Set it to ApplicationSubmissionContext + //apply queue mapping only to new application submissions + if (placementContext != null && !StringUtils.equals(context.getQueue(), + placementContext.getQueue())) { + LOG.info("Placed application=" + context.getApplicationId() + " to queue=" + + placementContext.getQueue() + ", original queue=" + context + .getQueue()); + context.setQueue(placementContext.getQueue()); + } + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 183cb36278..74c85ce2bf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -78,7 +78,7 @@ public abstract class AbstractCSQueue implements CSQueue { final String queueName; private final String queuePath; volatile int numContainers; - + final Resource minimumAllocation; volatile Resource maximumAllocation; private volatile QueueState state = null; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java index b3d1b4738d..46f5cf113e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractManagedParentQueue.java @@ -35,31 +35,13 @@ public abstract class AbstractManagedParentQueue extends ParentQueue { private static final Logger LOG = LoggerFactory.getLogger( AbstractManagedParentQueue.class); - private int maxAppsForAutoCreatedQueues; - private int maxAppsPerUserForAutoCreatedQueues; - private int userLimit; - private float userLimitFactor; + protected AutoCreatedLeafQueueTemplate leafQueueTemplate; public AbstractManagedParentQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); super.setupQueueConfigs(csContext.getClusterResource()); - initializeLeafQueueConfigs(); - - StringBuffer queueInfo = new StringBuffer(); - queueInfo.append("Created Managed Parent Queue: ").append(queueName) - .append("\nof type : [" + getClass()) - .append("]\nwith capacity: [") - .append(super.getCapacity()).append("]\nwith max capacity: [") - .append(super.getMaximumCapacity()).append("\nwith max apps: [") - .append(getMaxApplicationsForAutoCreatedQueues()) - .append("]\nwith max apps per user: [") - .append(getMaxApplicationsPerUserForAutoCreatedQueues()) - .append("]\nwith user limit: [").append(getUserLimit()) - .append("]\nwith user limit factor: [") - .append(getUserLimitFactor()).append("]."); - LOG.info(queueInfo.toString()); } @Override @@ -71,8 +53,6 @@ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) // Set new configs setupQueueConfigs(clusterResource); - initializeLeafQueueConfigs(); - // run reinitialize on each existing queue, to trigger absolute cap // recomputations for (CSQueue res : this.getChildQueues()) { @@ -87,72 +67,29 @@ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) * Initialize leaf queue configs from template configurations specified on * parent queue. */ - protected void initializeLeafQueueConfigs() { + protected AutoCreatedLeafQueueTemplate.Builder initializeLeafQueueConfigs + (String queuePath) { CapacitySchedulerConfiguration conf = csContext.getConfiguration(); - final String queuePath = super.getQueuePath(); + AutoCreatedLeafQueueTemplate.Builder leafQueueTemplateBuilder = new + AutoCreatedLeafQueueTemplate.Builder(); int maxApps = conf.getMaximumApplicationsPerQueue(queuePath); if (maxApps < 0) { maxApps = (int) ( CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS * getAbsoluteCapacity()); } - userLimit = conf.getUserLimit(queuePath); - userLimitFactor = conf.getUserLimitFactor(queuePath); - maxAppsForAutoCreatedQueues = maxApps; - maxAppsPerUserForAutoCreatedQueues = - (int) (maxApps * (userLimit / 100.0f) * userLimitFactor); - } + int userLimit = conf.getUserLimit(queuePath); + float userLimitFactor = conf.getUserLimitFactor(queuePath); + leafQueueTemplateBuilder.userLimit(userLimit) + .userLimitFactor(userLimitFactor) + .maxApps(maxApps) + .maxAppsPerUser( + (int) (maxApps * (userLimit / 100.0f) * userLimitFactor)); - /** - * Number of maximum applications for each of the auto created leaf queues. - * - * @return maxAppsForAutoCreatedQueues - */ - public int getMaxApplicationsForAutoCreatedQueues() { - return maxAppsForAutoCreatedQueues; - } - - /** - * Number of maximum applications per user for each of the auto created - * leaf queues. - * - * @return maxAppsPerUserForAutoCreatedQueues - */ - public int getMaxApplicationsPerUserForAutoCreatedQueues() { - return maxAppsPerUserForAutoCreatedQueues; - } - - /** - * User limit value for each of the auto created leaf queues. - * - * @return userLimit - */ - public int getUserLimitForAutoCreatedQueues() { - return userLimit; - } - - /** - * User limit factor value for each of the auto created leaf queues. - * - * @return userLimitFactor - */ - public float getUserLimitFactor() { - return userLimitFactor; - } - - public int getMaxAppsForAutoCreatedQueues() { - return maxAppsForAutoCreatedQueues; - } - - public int getMaxAppsPerUserForAutoCreatedQueues() { - return maxAppsPerUserForAutoCreatedQueues; - } - - public int getUserLimit() { - return userLimit; + return leafQueueTemplateBuilder; } /** @@ -229,4 +166,111 @@ public CSQueue removeChildQueue(String childQueueName) } return childQueue; } + + protected float sumOfChildCapacities() { + try { + writeLock.lock(); + float ret = 0; + for (CSQueue l : childQueues) { + ret += l.getCapacity(); + } + return ret; + } finally { + writeLock.unlock(); + } + } + + protected float sumOfChildAbsCapacities() { + try { + writeLock.lock(); + float ret = 0; + for (CSQueue l : childQueues) { + ret += l.getAbsoluteCapacity(); + } + return ret; + } finally { + writeLock.unlock(); + } + } + + public static class AutoCreatedLeafQueueTemplate { + + private QueueCapacities queueCapacities; + + private int maxApps; + private int maxAppsPerUser; + private int userLimit; + private float userLimitFactor; + + AutoCreatedLeafQueueTemplate(Builder builder) { + this.maxApps = builder.maxApps; + this.maxAppsPerUser = builder.maxAppsPerUser; + this.userLimit = builder.userLimit; + this.userLimitFactor = builder.userLimitFactor; + this.queueCapacities = builder.queueCapacities; + } + + public static class Builder { + private int maxApps; + private int maxAppsPerUser; + + private int userLimit; + private float userLimitFactor; + + private QueueCapacities queueCapacities; + + Builder maxApps(int maxApplications) { + this.maxApps = maxApplications; + return this; + } + + Builder maxAppsPerUser(int maxApplicationsPerUser) { + this.maxAppsPerUser = maxApplicationsPerUser; + return this; + } + + Builder userLimit(int usrLimit) { + this.userLimit = usrLimit; + return this; + } + + Builder userLimitFactor(float ulf) { + this.userLimitFactor = ulf; + return this; + } + + Builder capacities(QueueCapacities capacities) { + this.queueCapacities = capacities; + return this; + } + + AutoCreatedLeafQueueTemplate build() { + return new AutoCreatedLeafQueueTemplate(this); + } + } + + public int getUserLimit() { + return userLimit; + } + + public float getUserLimitFactor() { + return userLimitFactor; + } + + public QueueCapacities getQueueCapacities() { + return queueCapacities; + } + + public int getMaxApps() { + return maxApps; + } + + public int getMaxAppsPerUser() { + return maxAppsPerUser; + } + } + + public AutoCreatedLeafQueueTemplate getLeafQueueTemplate() { + return leafQueueTemplate; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java index 4eb7cdd9d9..bc206d4152 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AutoCreatedLeafQueue.java @@ -21,6 +21,8 @@ import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity + .AbstractManagedParentQueue.AutoCreatedLeafQueueTemplate; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -42,17 +44,18 @@ public AutoCreatedLeafQueue(CapacitySchedulerContext cs, String queueName, AbstractManagedParentQueue parent) throws IOException { super(cs, queueName, parent, null); - updateApplicationAndUserLimits(parent.getUserLimitForAutoCreatedQueues(), - parent.getUserLimitFactor(), - parent.getMaxApplicationsForAutoCreatedQueues(), - parent.getMaxApplicationsPerUserForAutoCreatedQueues()); - + AutoCreatedLeafQueueTemplate leafQueueTemplate = + parent.getLeafQueueTemplate(); + updateApplicationAndUserLimits(leafQueueTemplate.getUserLimit(), + leafQueueTemplate.getUserLimitFactor(), + leafQueueTemplate.getMaxApps(), + leafQueueTemplate.getMaxAppsPerUser()); this.parent = parent; } @Override - public void reinitialize(CSQueue newlyParsedQueue, - Resource clusterResource) throws IOException { + public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) + throws IOException { try { writeLock.lock(); @@ -62,10 +65,12 @@ public void reinitialize(CSQueue newlyParsedQueue, CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, this, labelManager, null); - updateApplicationAndUserLimits(parent.getUserLimitForAutoCreatedQueues(), - parent.getUserLimitFactor(), - parent.getMaxApplicationsForAutoCreatedQueues(), - parent.getMaxApplicationsPerUserForAutoCreatedQueues()); + AutoCreatedLeafQueueTemplate leafQueueTemplate = + parent.getLeafQueueTemplate(); + updateApplicationAndUserLimits(leafQueueTemplate.getUserLimit(), + leafQueueTemplate.getUserLimitFactor(), + leafQueueTemplate.getMaxApps(), + leafQueueTemplate.getMaxAppsPerUser()); } finally { writeLock.unlock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 6f630f8d7c..ed30ad181e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -41,7 +41,6 @@ import org.apache.hadoop.conf.Configurable; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.security.AccessControlException; -import org.apache.hadoop.security.Groups; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; @@ -64,10 +63,10 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule; import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule; -import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.QueueMapping; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationConstants; @@ -146,6 +145,8 @@ import com.google.common.base.Preconditions; import com.google.common.util.concurrent.SettableFuture; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QUEUE_MAPPING; + @LimitedPrivate("yarn") @Evolving @SuppressWarnings("unchecked") @@ -560,44 +561,17 @@ public int getPendingBacklogs() { } @VisibleForTesting - public UserGroupMappingPlacementRule - getUserGroupMappingPlacementRule() throws IOException { + public PlacementRule getUserGroupMappingPlacementRule() throws IOException { try { readLock.lock(); - boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings(); - LOG.info( - "Initialized queue mappings, override: " + overrideWithQueueMappings); - - // Get new user/group mappings - List newMappings = conf.getQueueMappings(); - // check if mappings refer to valid queues - for (QueueMapping mapping : newMappings) { - String mappingQueue = mapping.getQueue(); - if (!mappingQueue.equals( - UserGroupMappingPlacementRule.CURRENT_USER_MAPPING) && !mappingQueue - .equals(UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING)) { - CSQueue queue = getQueue(mappingQueue); - if (queue == null || !(queue instanceof LeafQueue)) { - throw new IOException( - "mapping contains invalid or non-leaf queue " + mappingQueue); - } - } - } - - // initialize groups if mappings are present - if (newMappings.size() > 0) { - Groups groups = new Groups(conf); - return new UserGroupMappingPlacementRule(overrideWithQueueMappings, - newMappings, groups); - } - - return null; + return UserGroupMappingPlacementRule.get(this); } finally { readLock.unlock(); } } - private void updatePlacementRules() throws IOException { + @VisibleForTesting + void updatePlacementRules() throws IOException { // Initialize placement rules Collection placementRuleStrs = conf.getStringCollection( YarnConfiguration.QUEUE_PLACEMENT_RULES); @@ -731,37 +705,92 @@ private void addApplicationOnRecovery( } } - private void addApplication(ApplicationId applicationId, - String queueName, String user, Priority priority) { + private void addApplication(ApplicationId applicationId, String queueName, + String user, Priority priority, + ApplicationPlacementContext placementContext) { try { writeLock.lock(); if (isSystemAppsLimitReached()) { String message = "Maximum system application limit reached," + "cannot accept submission of application: " + applicationId; - this.rmContext.getDispatcher().getEventHandler().handle(new RMAppEvent( - applicationId, RMAppEventType.APP_REJECTED, message)); + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, + message)); return; } // Sanity checks. CSQueue queue = getQueue(queueName); + + if (queue == null && placementContext != null) { + //Could be a potential auto-created leaf queue + try { + queue = autoCreateLeafQueue(placementContext); + } catch (YarnException | IOException e) { + LOG.error("Could not auto-create leaf queue due to : ", e); + final String message = + "Application " + applicationId + " submission by user : " + user + + " to queue : " + queueName + " failed : " + e.getMessage(); + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, + message)); + } + } + if (queue == null) { - String message = + final String message = "Application " + applicationId + " submitted by user " + user + " to unknown queue: " + queueName; + this.rmContext.getDispatcher().getEventHandler().handle( new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, message)); return; } + if (!(queue instanceof LeafQueue)) { String message = - "Application " + applicationId + " submitted by user " + user - + " to non-leaf queue: " + queueName; + "Application " + applicationId + " submitted by user : " + user + + " to non-leaf queue : " + queueName; this.rmContext.getDispatcher().getEventHandler().handle( new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, message)); return; + } else if (queue instanceof AutoCreatedLeafQueue && queue + .getParent() instanceof ManagedParentQueue) { + + //If queue already exists and auto-queue creation was not required, + //placement context should not be null + if (placementContext == null) { + String message = + "Application " + applicationId + " submission by user : " + user + + " to specified queue : " + queueName + " is prohibited. " + + "Verify automatic queue mapping for user exists in " + + QUEUE_MAPPING; + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, + message)); + return; + // For a queue which exists already and + // not auto-created above, then its parent queue should match + // the parent queue specified in queue mapping + } else if (!queue.getParent().getQueueName().equals( + placementContext.getParentQueue())) { + String message = + "Auto created Leaf queue " + placementContext.getQueue() + " " + + "already exists under queue : " + queue + .getParent().getQueuePath() + + ".But Queue mapping configuration " + + CapacitySchedulerConfiguration.QUEUE_MAPPING + " has been " + + "updated to a different parent queue : " + + placementContext.getParentQueue() + + " for the specified user : " + user; + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, + message)); + return; + } } + // Submit to the queue try { queue.submitApplication(applicationId, user, queueName); @@ -1483,7 +1512,8 @@ public void handle(SchedulerEvent event) { if (queueName != null) { if (!appAddedEvent.getIsAppRecovering()) { addApplication(appAddedEvent.getApplicationId(), queueName, - appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority()); + appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority(), + appAddedEvent.getPlacementContext()); } else { addApplicationOnRecovery(appAddedEvent.getApplicationId(), queueName, appAddedEvent.getUser(), appAddedEvent.getApplicatonPriority()); @@ -2001,7 +2031,8 @@ public void setEntitlement(String inQueue, QueueEntitlement entitlement) try { writeLock.lock(); LeafQueue queue = this.queueManager.getAndCheckLeafQueue(inQueue); - ParentQueue parent = (ParentQueue) queue.getParent(); + AbstractManagedParentQueue parent = (AbstractManagedParentQueue) queue + .getParent(); if (!(queue instanceof AutoCreatedLeafQueue)) { throw new SchedulerDynamicEditException( @@ -2010,7 +2041,8 @@ public void setEntitlement(String inQueue, QueueEntitlement entitlement) } if (parent == null - || !(AbstractManagedParentQueue.class.isAssignableFrom(parent.getClass()))) { + || !(AbstractManagedParentQueue.class.isAssignableFrom( + parent.getClass()))) { throw new SchedulerDynamicEditException( "The parent of AutoCreatedLeafQueue " + inQueue + " must be a PlanQueue/ManagedParentQueue"); @@ -2655,4 +2687,43 @@ public MutableConfigurationProvider getMutableConfProvider() { } return null; } + + private LeafQueue autoCreateLeafQueue( + ApplicationPlacementContext placementContext) + throws IOException, YarnException { + + AutoCreatedLeafQueue autoCreatedLeafQueue = null; + + String leafQueueName = placementContext.getQueue(); + String parentQueueName = placementContext.getParentQueue(); + + if (!StringUtils.isEmpty(parentQueueName)) { + CSQueue parentQueue = getQueue(parentQueueName); + + if (parentQueue != null && conf.isAutoCreateChildQueueEnabled( + parentQueue.getQueuePath())) { + + ManagedParentQueue autoCreateEnabledParentQueue = + (ManagedParentQueue) parentQueue; + autoCreatedLeafQueue = new AutoCreatedLeafQueue(this, leafQueueName, + autoCreateEnabledParentQueue); + + addQueue(autoCreatedLeafQueue); + + //TODO - Set entitlement through capacity management policy + } else{ + throw new SchedulerDynamicEditException( + "Could not auto-create leaf queue for " + leafQueueName + + ". Queue mapping specifies an invalid parent queue " + + "which does not exist " + + parentQueueName); + } + } else{ + throw new SchedulerDynamicEditException( + "Could not auto-create leaf queue for " + leafQueueName + + ". Queue mapping does not specify" + + " which parent queue it needs to be created under."); + } + return autoCreatedLeafQueue; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index bfead35934..4515453c93 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -907,6 +907,12 @@ public boolean getOverrideWithQueueMappings() { DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE); } + @Private + @VisibleForTesting + public void setOverrideWithQueueMappings(boolean overrideWithQueueMappings) { + setBoolean(ENABLE_QUEUE_MAPPING_OVERRIDE, overrideWithQueueMappings); + } + /** * Returns a collection of strings, trimming leading and trailing whitespeace * on each value @@ -981,6 +987,31 @@ public List getQueueMappings() { return mappings; } + @Private + @VisibleForTesting + public void setQueuePlacementRules(Collection queuePlacementRules) { + if (queuePlacementRules == null) { + return; + } + String str = StringUtils.join(",", queuePlacementRules); + setStrings(YarnConfiguration.QUEUE_PLACEMENT_RULES, str); + } + + @Private + @VisibleForTesting + public void setQueueMappings(List queueMappings) { + if (queueMappings == null) { + return; + } + + List queueMappingStrs = new ArrayList<>(); + for (QueueMapping mapping : queueMappings) { + queueMappingStrs.add(mapping.toString()); + } + + setStrings(QUEUE_MAPPING, StringUtils.join(",", queueMappingStrs)); + } + public boolean isReservable(String queue) { boolean isReservable = getBoolean(getQueuePrefix(queue) + IS_RESERVABLE, false); @@ -1523,4 +1554,126 @@ public long getDefaultLifetimePerQueue(String queue) { public void setDefaultLifetimePerQueue(String queue, long defaultLifetime) { setLong(getQueuePrefix(queue) + DEFAULT_LIFETIME_SUFFIX, defaultLifetime); } + + @Private + public static final boolean DEFAULT_AUTO_CREATE_CHILD_QUEUE_ENABLED = false; + + @Private + public static final String AUTO_CREATE_CHILD_QUEUE_ENABLED = + "auto-create-child-queue.enabled"; + + @Private + public static final String AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX = + "leaf-queue-template"; + + @Private + public static final String AUTO_CREATE_QUEUE_MAX_QUEUES = + "auto-create-child-queue.max-queues"; + + @Private + public static final int DEFAULT_AUTO_CREATE_QUEUE_MAX_QUEUES = 1000; + + /** + * If true, this queue will be created as a Parent Queue which Auto Created + * leaf child queues + * + * @param queuePath The queues path + * @return true if auto create is enabled for child queues else false. Default + * is false + */ + @Private + public boolean isAutoCreateChildQueueEnabled(String queuePath) { + boolean isAutoCreateEnabled = getBoolean( + getQueuePrefix(queuePath) + AUTO_CREATE_CHILD_QUEUE_ENABLED, + DEFAULT_AUTO_CREATE_CHILD_QUEUE_ENABLED); + return isAutoCreateEnabled; + } + + @Private + @VisibleForTesting + public void setAutoCreateChildQueueEnabled(String queuePath, + boolean autoCreationEnabled) { + setBoolean(getQueuePrefix(queuePath) + + AUTO_CREATE_CHILD_QUEUE_ENABLED, + autoCreationEnabled); + } + + /** + * Get the auto created leaf queue's template configuration prefix + * Leaf queue's template capacities are configured at the parent queue + * + * @param queuePath parent queue's path + * @return Config prefix for leaf queue template configurations + */ + @Private + public String getAutoCreatedQueueTemplateConfPrefix(String queuePath) { + return queuePath + DOT + AUTO_CREATED_LEAF_QUEUE_TEMPLATE_PREFIX; + } + + @Private + public static final String FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY = + "auto-create-child-queue.fail-on-exceeding-parent-capacity"; + + @Private + public static final boolean DEFAULT_FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY = + false; + + /** + * Fail further auto leaf queue creation when parent's guaranteed capacity is + * exceeded. + * + * @param queuePath the parent queue's path + * @return true if configured to fail else false + */ + @Private + public boolean getShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded( + String queuePath) { + boolean shouldFailAutoQueueCreationOnExceedingGuaranteedCapacity = + getBoolean(getQueuePrefix(queuePath) + + FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY, + DEFAULT_FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY); + return shouldFailAutoQueueCreationOnExceedingGuaranteedCapacity; + } + + @VisibleForTesting + @Private + public void setShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded( + String queuePath, boolean autoCreationEnabled) { + setBoolean( + getQueuePrefix(queuePath) + + FAIL_AUTO_CREATION_ON_EXCEEDING_CAPACITY, + autoCreationEnabled); + } + + /** + * Get the max number of leaf queues that are allowed to be created under + * a parent queue + * + * @param queuePath the paret queue's path + * @return the max number of leaf queues allowed to be auto created + */ + @Private + public int getAutoCreatedQueuesMaxChildQueuesLimit(String queuePath) { + return getInt(getQueuePrefix(queuePath) + + AUTO_CREATE_QUEUE_MAX_QUEUES, + DEFAULT_AUTO_CREATE_QUEUE_MAX_QUEUES); + } + + @Private + @VisibleForTesting + public void setAutoCreatedLeafQueueTemplateCapacity(String queuePath, + float val) { + String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix( + queuePath); + setCapacity(leafQueueConfPrefix, val); + } + + @Private + @VisibleForTesting + public void setAutoCreatedLeafQueueTemplateMaxCapacity(String queuePath, + float val) { + String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix( + queuePath); + setMaximumCapacity(leafQueueConfPrefix, val); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java index 7be2529a43..eb501233b2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java @@ -154,7 +154,7 @@ public void setCapacitySchedulerContext( * @throws IOException if fails to initialize queues */ public void initializeQueues(CapacitySchedulerConfiguration conf) - throws IOException { + throws IOException { root = parseQueue(this.csContext, conf, null, CapacitySchedulerConfiguration.ROOT, queues, queues, NOOP); setQueueAcls(authorizer, appPriorityACLManager, queues); @@ -176,7 +176,7 @@ public void reinitializeQueues(CapacitySchedulerConfiguration newConf) if (!csContext.isConfigurationMutable() || csContext.getRMContext().getHAServiceState() != HAServiceProtocol.HAServiceState.STANDBY) { - // Ensure queue hiearchy in the new XML file is proper. + // Ensure queue hierarchy in the new XML file is proper. validateQueueHierarchy(queues, newQueues); } @@ -216,11 +216,13 @@ static CSQueue parseQueue( Map oldQueues, QueueHook hook) throws IOException { CSQueue queue; - String fullQueueName = - (parent == null) ? queueName - : (parent.getQueuePath() + "." + queueName); + String fullQueueName = (parent == null) ? + queueName : + (parent.getQueuePath() + "." + queueName); String[] childQueueNames = conf.getQueues(fullQueueName); boolean isReservableQueue = conf.isReservable(fullQueueName); + boolean isAutoCreateEnabled = conf.isAutoCreateChildQueueEnabled( + fullQueueName); if (childQueueNames == null || childQueueNames.length == 0) { if (null == parent) { throw new IllegalStateException( @@ -229,9 +231,8 @@ static CSQueue parseQueue( // Check if the queue will be dynamically managed by the Reservation // system if (isReservableQueue) { - queue = - new PlanQueue(csContext, queueName, parent, - oldQueues.get(queueName)); + queue = new PlanQueue(csContext, queueName, parent, + oldQueues.get(queueName)); //initializing the "internal" default queue, for SLS compatibility String defReservationId = @@ -249,38 +250,46 @@ static CSQueue parseQueue( ((PlanQueue) queue).setChildQueues(childQueues); queues.put(defReservationId, resQueue); - } else { - queue = - new LeafQueue(csContext, queueName, parent, - oldQueues.get(queueName)); + } else if (isAutoCreateEnabled) { + queue = new ManagedParentQueue(csContext, queueName, parent, + oldQueues.get(queueName)); + } else{ + queue = new LeafQueue(csContext, queueName, parent, + oldQueues.get(queueName)); // Used only for unit tests queue = hook.hook(queue); } - } else { + } else{ if (isReservableQueue) { throw new IllegalStateException( "Only Leaf Queues can be reservable for " + queueName); } - ParentQueue parentQueue = - new ParentQueue(csContext, queueName, parent, - oldQueues.get(queueName)); + + ParentQueue parentQueue; + if (isAutoCreateEnabled) { + parentQueue = new ManagedParentQueue(csContext, queueName, parent, + oldQueues.get(queueName)); + } else{ + parentQueue = new ParentQueue(csContext, queueName, parent, + oldQueues.get(queueName)); + } // Used only for unit tests queue = hook.hook(parentQueue); List childQueues = new ArrayList<>(); for (String childQueueName : childQueueNames) { - CSQueue childQueue = - parseQueue(csContext, conf, queue, childQueueName, - queues, oldQueues, hook); + CSQueue childQueue = parseQueue(csContext, conf, queue, childQueueName, + queues, oldQueues, hook); childQueues.add(childQueue); } parentQueue.setChildQueues(childQueues); + } - if (queue instanceof LeafQueue && queues.containsKey(queueName) - && queues.get(queueName) instanceof LeafQueue) { + if (queue instanceof LeafQueue && queues.containsKey(queueName) && queues + .get(queueName) instanceof LeafQueue) { throw new IOException("Two leaf queues were named " + queueName + ". Leaf queue names must be distinct"); } @@ -312,27 +321,46 @@ private void validateQueueHierarchy(Map queues, if (oldQueue.getState() == QueueState.STOPPED) { LOG.info("Deleting Queue " + queueName + ", as it is not" + " present in the modified capacity configuration xml"); - } else { + } else{ throw new IOException(oldQueue.getQueuePath() + " is deleted from" + " the new capacity scheduler configuration, but the" - + " queue is not yet in stopped state. " - + "Current State : " + oldQueue.getState()); + + " queue is not yet in stopped state. " + "Current State : " + + oldQueue.getState()); } } else if (!oldQueue.getQueuePath().equals(newQueue.getQueuePath())) { //Queue's cannot be moved from one hierarchy to other - throw new IOException(queueName + " is moved from:" - + oldQueue.getQueuePath() + " to:" + newQueue.getQueuePath() - + " after refresh, which is not allowed."); - } else if (oldQueue instanceof LeafQueue + throw new IOException( + queueName + " is moved from:" + oldQueue.getQueuePath() + " to:" + + newQueue.getQueuePath() + + " after refresh, which is not allowed."); + } else if (oldQueue instanceof ParentQueue + && !(oldQueue instanceof ManagedParentQueue) + && newQueue instanceof ManagedParentQueue) { + throw new IOException( + "Can not convert parent queue: " + oldQueue.getQueuePath() + + " to auto create enabled parent queue since " + + "it could have other pre-configured queues which is not " + + "supported"); + } else if (oldQueue instanceof ManagedParentQueue + && !(newQueue instanceof ManagedParentQueue)) { + throw new IOException( + "Cannot convert auto create enabled parent queue: " + oldQueue + .getQueuePath() + " to leaf queue. Please check " + + " parent queue's configuration " + + CapacitySchedulerConfiguration + .AUTO_CREATE_CHILD_QUEUE_ENABLED + + " is set to true"); + } else if (oldQueue instanceof LeafQueue && newQueue instanceof ParentQueue) { if (oldQueue.getState() == QueueState.STOPPED) { LOG.info("Converting the leaf queue: " + oldQueue.getQueuePath() + " to parent queue."); - } else { - throw new IOException("Can not convert the leaf queue: " - + oldQueue.getQueuePath() + " to parent queue since " - + "it is not yet in stopped state. Current State : " - + oldQueue.getState()); + } else{ + throw new IOException( + "Can not convert the leaf queue: " + oldQueue.getQueuePath() + + " to parent queue since " + + "it is not yet in stopped state. Current State : " + + oldQueue.getState()); } } else if (oldQueue instanceof ParentQueue && newQueue instanceof LeafQueue) { @@ -352,6 +380,7 @@ private void validateQueueHierarchy(Map queues, */ private void updateQueues(Map existingQueues, Map newQueues) { + CapacitySchedulerConfiguration conf = csContext.getConfiguration(); for (Map.Entry e : newQueues.entrySet()) { String queueName = e.getKey(); CSQueue queue = e.getValue(); @@ -363,7 +392,13 @@ private void updateQueues(Map existingQueues, .iterator(); itr.hasNext();) { Map.Entry e = itr.next(); String queueName = e.getKey(); - if (!newQueues.containsKey(queueName)) { + CSQueue existingQueue = e.getValue(); + + //TODO - Handle case when auto create is disabled on parent queues + if (!newQueues.containsKey(queueName) && !( + existingQueue instanceof AutoCreatedLeafQueue && conf + .isAutoCreateChildQueueEnabled( + existingQueue.getParent().getQueuePath()))) { itr.remove(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java new file mode 100644 index 0000000000..ff795e47b2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ManagedParentQueue.java @@ -0,0 +1,158 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .SchedulerDynamicEditException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; + +/** + * Auto Creation enabled Parent queue. This queue initially does not have any + * children to start with and all child + * leaf queues will be auto created. Currently this does not allow other + * pre-configured leaf or parent queues to + * co-exist along with auto-created leaf queues. The auto creation is limited + * to leaf queues currently. + */ +public class ManagedParentQueue extends AbstractManagedParentQueue { + + private boolean shouldFailAutoCreationWhenGuaranteedCapacityExceeded = false; + + private static final Logger LOG = LoggerFactory.getLogger( + ManagedParentQueue.class); + + public ManagedParentQueue(final CapacitySchedulerContext cs, + final String queueName, final CSQueue parent, final CSQueue old) + throws IOException { + super(cs, queueName, parent, old); + String leafQueueTemplateConfPrefix = getLeafQueueConfigPrefix( + csContext.getConfiguration()); + this.leafQueueTemplate = initializeLeafQueueConfigs( + leafQueueTemplateConfPrefix).build(); + + StringBuffer queueInfo = new StringBuffer(); + queueInfo.append("Created Managed Parent Queue: ").append(queueName).append( + "]\nwith capacity: [").append(super.getCapacity()).append( + "]\nwith max capacity: [").append(super.getMaximumCapacity()).append( + "\nwith max apps: [").append(leafQueueTemplate.getMaxApps()).append( + "]\nwith max apps per user: [").append( + leafQueueTemplate.getMaxAppsPerUser()).append("]\nwith user limit: [") + .append(leafQueueTemplate.getUserLimit()).append( + "]\nwith user limit factor: [").append( + leafQueueTemplate.getUserLimitFactor()).append("]."); + LOG.info(queueInfo.toString()); + } + + @Override + public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) + throws IOException { + validate(newlyParsedQueue); + super.reinitialize(newlyParsedQueue, clusterResource); + String leafQueueTemplateConfPrefix = getLeafQueueConfigPrefix( + csContext.getConfiguration()); + this.leafQueueTemplate = initializeLeafQueueConfigs( + leafQueueTemplateConfPrefix).build(); + } + + @Override + protected AutoCreatedLeafQueueTemplate.Builder initializeLeafQueueConfigs( + String queuePath) { + + AutoCreatedLeafQueueTemplate.Builder leafQueueTemplate = + super.initializeLeafQueueConfigs(queuePath); + + CapacitySchedulerConfiguration conf = csContext.getConfiguration(); + String leafQueueTemplateConfPrefix = getLeafQueueConfigPrefix(conf); + QueueCapacities queueCapacities = new QueueCapacities(false); + CSQueueUtils.loadUpdateAndCheckCapacities(leafQueueTemplateConfPrefix, + csContext.getConfiguration(), queueCapacities, getQueueCapacities()); + leafQueueTemplate.capacities(queueCapacities); + + shouldFailAutoCreationWhenGuaranteedCapacityExceeded = + conf.getShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded( + getQueuePath()); + + return leafQueueTemplate; + } + + protected void validate(final CSQueue newlyParsedQueue) throws IOException { + // Sanity check + if (!(newlyParsedQueue instanceof ManagedParentQueue) || !newlyParsedQueue + .getQueuePath().equals(getQueuePath())) { + throw new IOException( + "Trying to reinitialize " + getQueuePath() + " from " + + newlyParsedQueue.getQueuePath()); + } + } + + @Override + public void addChildQueue(CSQueue childQueue) + throws SchedulerDynamicEditException { + try { + writeLock.lock(); + + if (childQueue == null || !(childQueue instanceof AutoCreatedLeafQueue)) { + throw new SchedulerDynamicEditException( + "Expected child queue to be an instance of AutoCreatedLeafQueue"); + } + + CapacitySchedulerConfiguration conf = csContext.getConfiguration(); + ManagedParentQueue parentQueue = + (ManagedParentQueue) childQueue.getParent(); + + String leafQueueName = childQueue.getQueueName(); + int maxQueues = conf.getAutoCreatedQueuesMaxChildQueuesLimit( + parentQueue.getQueuePath()); + + if (parentQueue.getChildQueues().size() >= maxQueues) { + throw new SchedulerDynamicEditException( + "Cannot auto create leaf queue " + leafQueueName + ".Max Child " + + "Queue limit exceeded which is configured as : " + maxQueues + + " and number of child queues is : " + parentQueue + .getChildQueues().size()); + } + + if (shouldFailAutoCreationWhenGuaranteedCapacityExceeded) { + if (getLeafQueueTemplate().getQueueCapacities().getAbsoluteCapacity() + + parentQueue.sumOfChildAbsCapacities() > parentQueue + .getAbsoluteCapacity()) { + throw new SchedulerDynamicEditException( + "Cannot auto create leaf queue " + leafQueueName + ". Child " + + "queues capacities have reached parent queue : " + + parentQueue.getQueuePath() + " guaranteed capacity"); + } + } + + AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) childQueue; + super.addChildQueue(leafQueue); + //TODO - refresh policy queue after capacity management is added + + } finally { + writeLock.unlock(); + } + } + + private String getLeafQueueConfigPrefix(CapacitySchedulerConfiguration conf) { + return conf.getAutoCreatedQueueTemplateConfPrefix(getQueuePath()); + } + +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index d61951bb8f..959ca51eb2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -1081,17 +1081,4 @@ public void stopQueue() { public QueueOrderingPolicy getQueueOrderingPolicy() { return queueOrderingPolicy; } - - protected float sumOfChildCapacities() { - try { - writeLock.lock(); - float ret = 0; - for (CSQueue l : childQueues) { - ret += l.getCapacity(); - } - return ret; - } finally { - writeLock.unlock(); - } - } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java index 4ab2e9f14d..b7f8aa6996 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java @@ -40,6 +40,19 @@ public class PlanQueue extends AbstractManagedParentQueue { public PlanQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); + this.leafQueueTemplate = initializeLeafQueueConfigs(getQueuePath()).build(); + + StringBuffer queueInfo = new StringBuffer(); + queueInfo.append("Created Plan Queue: ").append(queueName).append( + "]\nwith capacity: [").append(super.getCapacity()).append( + "]\nwith max capacity: [").append(super.getMaximumCapacity()).append( + "\nwith max apps: [").append(leafQueueTemplate.getMaxApps()).append( + "]\nwith max apps per user: [").append( + leafQueueTemplate.getMaxAppsPerUser()).append("]\nwith user limit: [") + .append(leafQueueTemplate.getUserLimit()).append( + "]\nwith user limit factor: [").append( + leafQueueTemplate.getUserLimitFactor()).append("]."); + LOG.info(queueInfo.toString()); } @Override @@ -47,17 +60,21 @@ public void reinitialize(CSQueue newlyParsedQueue, Resource clusterResource) throws IOException { validate(newlyParsedQueue); super.reinitialize(newlyParsedQueue, clusterResource); + this.leafQueueTemplate = initializeLeafQueueConfigs(getQueuePath()).build(); } @Override - protected void initializeLeafQueueConfigs() { - String queuePath = super.getQueuePath(); + protected AutoCreatedLeafQueueTemplate.Builder initializeLeafQueueConfigs + (String queuePath) { + AutoCreatedLeafQueueTemplate.Builder leafQueueTemplate = super + .initializeLeafQueueConfigs + (queuePath); showReservationsAsQueues = csContext.getConfiguration() .getShowReservationAsQueues(queuePath); - super.initializeLeafQueueConfigs(); + return leafQueueTemplate; } - private void validate(final CSQueue newlyParsedQueue) throws IOException { + protected void validate(final CSQueue newlyParsedQueue) throws IOException { // Sanity check if (!(newlyParsedQueue instanceof PlanQueue) || !newlyParsedQueue .getQueuePath().equals(getQueuePath())) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java index 0a8d6fe529..80b7f2fb82 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/AppAddedSchedulerEvent.java @@ -6,9 +6,9 @@ * to you under the Apache License, Version 2.0 (the * "License"); you may not use this file except in compliance * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

* Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. @@ -22,6 +22,8 @@ import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ReservationId; +import org.apache.hadoop.yarn.server.resourcemanager.placement + .ApplicationPlacementContext; public class AppAddedSchedulerEvent extends SchedulerEvent { @@ -31,15 +33,23 @@ public class AppAddedSchedulerEvent extends SchedulerEvent { private final ReservationId reservationID; private final boolean isAppRecovering; private final Priority appPriority; + private final ApplicationPlacementContext placementContext; public AppAddedSchedulerEvent(ApplicationId applicationId, String queue, String user) { - this(applicationId, queue, user, false, null, Priority.newInstance(0)); + this(applicationId, queue, user, false, null, Priority.newInstance(0), + null); + } + + public AppAddedSchedulerEvent(ApplicationId applicationId, String queue, + String user, ApplicationPlacementContext placementContext) { + this(applicationId, queue, user, false, null, Priority.newInstance(0), + placementContext); } public AppAddedSchedulerEvent(ApplicationId applicationId, String queue, String user, ReservationId reservationID, Priority appPriority) { - this(applicationId, queue, user, false, reservationID, appPriority); + this(applicationId, queue, user, false, reservationID, appPriority, null); } public AppAddedSchedulerEvent(String user, @@ -47,12 +57,20 @@ public AppAddedSchedulerEvent(String user, Priority appPriority) { this(submissionContext.getApplicationId(), submissionContext.getQueue(), user, isAppRecovering, submissionContext.getReservationID(), - appPriority); + appPriority, null); + } + + public AppAddedSchedulerEvent(String user, + ApplicationSubmissionContext submissionContext, boolean isAppRecovering, + Priority appPriority, ApplicationPlacementContext placementContext) { + this(submissionContext.getApplicationId(), submissionContext.getQueue(), + user, isAppRecovering, submissionContext.getReservationID(), + appPriority, placementContext); } public AppAddedSchedulerEvent(ApplicationId applicationId, String queue, String user, boolean isAppRecovering, ReservationId reservationID, - Priority appPriority) { + Priority appPriority, ApplicationPlacementContext placementContext) { super(SchedulerEventType.APP_ADDED); this.applicationId = applicationId; this.queue = queue; @@ -60,6 +78,7 @@ public AppAddedSchedulerEvent(ApplicationId applicationId, String queue, this.reservationID = reservationID; this.isAppRecovering = isAppRecovering; this.appPriority = appPriority; + this.placementContext = placementContext; } public ApplicationId getApplicationId() { @@ -85,4 +104,8 @@ public ReservationId getReservationID() { public Priority getApplicatonPriority() { return appPriority; } + + public ApplicationPlacementContext getPlacementContext() { + return placementContext; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index 81793217c4..9445fa6efd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -44,7 +44,6 @@ import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.io.DataOutputBuffer; import org.apache.hadoop.security.AccessControlException; -import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.service.Service; import org.apache.hadoop.yarn.MockApps; @@ -70,6 +69,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.ahs.RMApplicationHistoryWriter; import org.apache.hadoop.yarn.server.resourcemanager.metrics.SystemMetricsPublisher; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp; @@ -84,7 +84,6 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.ContainerAllocationExpirer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; import org.apache.hadoop.yarn.server.resourcemanager.timelineservice.RMTimelineCollectorManager; import org.apache.hadoop.yarn.server.security.ApplicationACLsManager; @@ -856,28 +855,32 @@ public void testEscapeApplicationSummary() { Assert.assertTrue(msg.contains("preemptedResources=")); Assert.assertTrue(msg.contains("applicationType=MAPREDUCE")); } - + @Test public void testRMAppSubmitWithQueueChanged() throws Exception { // Setup a PlacementManager returns a new queue PlacementManager placementMgr = mock(PlacementManager.class); - doAnswer(new Answer() { + doAnswer(new Answer() { @Override - public Void answer(InvocationOnMock invocation) throws Throwable { - ApplicationSubmissionContext ctx = - (ApplicationSubmissionContext) invocation.getArguments()[0]; - ctx.setQueue("newQueue"); - return null; + public ApplicationPlacementContext answer(InvocationOnMock invocation) + throws Throwable { + return new ApplicationPlacementContext("newQueue"); } - - }).when(placementMgr).placeApplication(any(ApplicationSubmissionContext.class), - any(String.class)); + + }).when(placementMgr).placeApplication( + any(ApplicationSubmissionContext.class), any(String.class)); rmContext.setQueuePlacementManager(placementMgr); - + asContext.setQueue("oldQueue"); appMonitor.submitApplication(asContext, "test"); + RMApp app = rmContext.getRMApps().get(appId); + RMAppEvent event = new RMAppEvent(appId, RMAppEventType.START); + rmContext.getRMApps().get(appId).handle(event); + event = new RMAppEvent(appId, RMAppEventType.APP_NEW_SAVED); + rmContext.getRMApps().get(appId).handle(event); + Assert.assertNotNull("app is null", app); Assert.assertEquals("newQueue", asContext.getQueue()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java index 61bc8d9be2..864acc3ae5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java @@ -52,14 +52,14 @@ private void verifyQueueMapping(QueueMapping queueMapping, String inputUser, private void verifyQueueMapping(QueueMapping queueMapping, String inputUser, String inputQueue, String expectedQueue, boolean overwrite) throws YarnException { Groups groups = new Groups(conf); - UserGroupMappingPlacementRule rule = - new UserGroupMappingPlacementRule(overwrite, Arrays.asList(queueMapping), - groups); - ApplicationSubmissionContext asc = - Records.newRecord(ApplicationSubmissionContext.class); + UserGroupMappingPlacementRule rule = new UserGroupMappingPlacementRule( + overwrite, Arrays.asList(queueMapping), groups); + ApplicationSubmissionContext asc = Records.newRecord( + ApplicationSubmissionContext.class); asc.setQueue(inputQueue); - String queue = rule.getQueueForApp(asc, inputUser); - Assert.assertEquals(expectedQueue, queue); + ApplicationPlacementContext ctx = rule.getPlacementForApp(asc, inputUser); + Assert.assertEquals(expectedQueue, + ctx != null ? ctx.getQueue() : inputQueue); } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java index cdc67ed60e..cb1f794190 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerUtils.java @@ -803,6 +803,7 @@ public static void waitSchedulerApplicationAttemptStopped( Map> applications, EventHandler handler, String queueName) throws Exception { + ApplicationId appId = ApplicationId.newInstance(System.currentTimeMillis(), 1); AppAddedSchedulerEvent appAddedEvent = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 800d023183..1edb0dab0b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -103,6 +103,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics; @@ -904,7 +905,7 @@ void checkQueueCapacities(CapacityScheduler cs, (B3_CAPACITY/100.0f) * capB, 1.0f, 1.0f); } - private void checkQueueCapacity(CSQueue q, float expectedCapacity, + void checkQueueCapacity(CSQueue q, float expectedCapacity, float expectedAbsCapacity, float expectedMaxCapacity, float expectedAbsMaxCapacity) { final float epsilon = 1e-5f; @@ -917,7 +918,7 @@ private void checkQueueCapacity(CSQueue q, float expectedCapacity, q.getAbsoluteMaximumCapacity(), epsilon); } - private CSQueue findQueue(CSQueue root, String queuePath) { + CSQueue findQueue(CSQueue root, String queuePath) { if (root.getQueuePath().equals(queuePath)) { return root; } @@ -1396,7 +1397,6 @@ public void testAddAndRemoveAppFromCapacityScheduler() throws Exception { AbstractYarnScheduler cs = (AbstractYarnScheduler) rm .getResourceScheduler(); - SchedulerApplication app = TestSchedulerUtils.verifyAppAddedAndRemovedFromScheduler( cs.getSchedulerApplications(), cs, "a1"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java new file mode 100644 index 0000000000..7090bc929f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java @@ -0,0 +1,794 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.QueueState; +import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.event.AsyncDispatcher; +import org.apache.hadoop.yarn.event.Event; +import org.apache.hadoop.yarn.event.EventHandler; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.factories.RecordFactory; +import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; +import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEventType; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ContainerUpdates; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.NMTokenSecretManagerInRM; +import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager; +import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.List; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.LinkedBlockingQueue; +import java.util.concurrent.TimeUnit; + +import static org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.CURRENT_USER_MAPPING; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; +import static org.junit.Assert.assertTrue; +import static org.junit.Assert.fail; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * Tests for creation and reinitilization of auto created leaf queues + * under a ManagedParentQueue. + */ +public class TestCapacitySchedulerAutoQueueCreation { + + private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class); + private final int GB = 1024; + private final static ContainerUpdates NULL_UPDATE_REQUESTS = + new ContainerUpdates(); + + private static final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + private static final String B = CapacitySchedulerConfiguration.ROOT + ".b"; + private static final String C = CapacitySchedulerConfiguration.ROOT + ".c"; + private static final String D = CapacitySchedulerConfiguration.ROOT + ".d"; + private static final String A1 = A + ".a1"; + private static final String A2 = A + ".a2"; + private static final String B1 = B + ".b1"; + private static final String B2 = B + ".b2"; + private static final String B3 = B + ".b3"; + private static final String C1 = C + ".c1"; + private static final String C2 = C + ".c2"; + private static final String C3 = C + ".c3"; + private static float A_CAPACITY = 20f; + private static float B_CAPACITY = 40f; + private static float C_CAPACITY = 20f; + private static float D_CAPACITY = 20f; + private static float A1_CAPACITY = 30; + private static float A2_CAPACITY = 70; + private static float B1_CAPACITY = 60f; + private static float B2_CAPACITY = 20f; + private static float B3_CAPACITY = 20f; + private static float C1_CAPACITY = 20f; + private static float C2_CAPACITY = 20f; + + private static String USER = "user_"; + private static String USER0 = USER + 0; + private static String USER2 = USER + 2; + private static String PARENT_QUEUE = "c"; + + private MockRM mockRM = null; + + private CapacityScheduler cs; + + private final TestCapacityScheduler tcs = new TestCapacityScheduler(); + + private static SpyDispatcher dispatcher; + + private static EventHandler rmAppEventEventHandler; + + private static class SpyDispatcher extends AsyncDispatcher { + + private static BlockingQueue eventQueue = + new LinkedBlockingQueue<>(); + + private static class SpyRMAppEventHandler implements EventHandler { + public void handle(Event event) { + eventQueue.add(event); + } + } + + @Override + protected void dispatch(Event event) { + eventQueue.add(event); + } + + @Override + public EventHandler getEventHandler() { + return rmAppEventEventHandler; + } + + void spyOnNextEvent(Event expectedEvent, long timeout) + throws InterruptedException { + + Event event = eventQueue.poll(timeout, TimeUnit.MILLISECONDS); + assertEquals(expectedEvent.getType(), event.getType()); + assertEquals(expectedEvent.getClass(), event.getClass()); + } + } + + @Before + public void setUp() throws Exception { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(conf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + + List queuePlacementRules = new ArrayList<>(); + queuePlacementRules.add(YarnConfiguration.USER_GROUP_PLACEMENT_RULE); + conf.setQueuePlacementRules(queuePlacementRules); + + setupQueueMappings(conf); + + mockRM = new MockRM(conf); + cs = (CapacityScheduler) mockRM.getResourceScheduler(); + + dispatcher = new SpyDispatcher(); + rmAppEventEventHandler = new SpyDispatcher.SpyRMAppEventHandler(); + dispatcher.register(RMAppEventType.class, rmAppEventEventHandler); + cs.updatePlacementRules(); + mockRM.start(); + + cs.start(); + } + + private CapacitySchedulerConfiguration setupQueueMappings( + CapacitySchedulerConfiguration conf) { + + //set queue mapping + List queueMappings = + new ArrayList<>(); + for (int i = 0; i <= 3; i++) { + //Set C as parent queue name for auto queue creation + UserGroupMappingPlacementRule.QueueMapping userQueueMapping = + new UserGroupMappingPlacementRule.QueueMapping( + UserGroupMappingPlacementRule.QueueMapping.MappingType.USER, + USER + i, getQueueMapping(PARENT_QUEUE, USER + i)); + queueMappings.add(userQueueMapping); + } + + conf.setQueueMappings(queueMappings); + //override with queue mappings + conf.setOverrideWithQueueMappings(true); + return conf; + } + + /** + * @param conf, to be modified + * @return, CS configuration which has C + * as an auto creation enabled parent queue + *

+ * root + * / \ \ \ + * a b c d + * / \ / | \ + * a1 a2 b1 b2 b3 + */ + private CapacitySchedulerConfiguration setupQueueConfiguration( + CapacitySchedulerConfiguration conf) { + + //setup new queues with one of them auto enabled + // Define top-level queues + // Set childQueue for root + conf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] { "a", "b", "c", "d" }); + + conf.setCapacity(A, A_CAPACITY); + conf.setCapacity(B, B_CAPACITY); + conf.setCapacity(C, C_CAPACITY); + conf.setCapacity(D, D_CAPACITY); + + // Define 2nd-level queues + conf.setQueues(A, new String[] { "a1", "a2" }); + conf.setCapacity(A1, A1_CAPACITY); + conf.setUserLimitFactor(A1, 100.0f); + conf.setCapacity(A2, A2_CAPACITY); + conf.setUserLimitFactor(A2, 100.0f); + + conf.setQueues(B, new String[] { "b1", "b2", "b3" }); + conf.setCapacity(B1, B1_CAPACITY); + conf.setUserLimitFactor(B1, 100.0f); + conf.setCapacity(B2, B2_CAPACITY); + conf.setUserLimitFactor(B2, 100.0f); + conf.setCapacity(B3, B3_CAPACITY); + conf.setUserLimitFactor(B3, 100.0f); + + conf.setUserLimitFactor(C, 1.0f); + conf.setAutoCreateChildQueueEnabled(C, true); + + //Setup leaf queue template configs + conf.setAutoCreatedLeafQueueTemplateCapacity(C, 50.0f); + conf.setAutoCreatedLeafQueueTemplateMaxCapacity(C, 100.0f); + + LOG.info("Setup " + C + " as an auto leaf creation enabled parent queue"); + + conf.setUserLimitFactor(D, 1.0f); + conf.setAutoCreateChildQueueEnabled(D, true); + + //Setup leaf queue template configs + conf.setAutoCreatedLeafQueueTemplateCapacity(D, 10.0f); + conf.setAutoCreatedLeafQueueTemplateMaxCapacity(D, 100.0f); + + LOG.info("Setup " + D + " as an auto leaf creation enabled parent queue"); + + return conf; + } + + @After + public void tearDown() throws Exception { + if (mockRM != null) { + mockRM.stop(); + } + } + + @Test(timeout = 10000) + public void testAutoCreateLeafQueueCreation() throws Exception { + + try { + // submit an app + submitApp(cs, USER0, USER0, PARENT_QUEUE); + + // check preconditions + List appsInC = cs.getAppsInQueue(PARENT_QUEUE); + assertEquals(1, appsInC.size()); + assertNotNull(cs.getQueue(USER0)); + + AutoCreatedLeafQueue autoCreatedLeafQueue = + (AutoCreatedLeafQueue) cs.getQueue(USER0); + ManagedParentQueue parentQueue = (ManagedParentQueue) cs.getQueue( + PARENT_QUEUE); + assertEquals(parentQueue, autoCreatedLeafQueue.getParent()); + validateCapacities(autoCreatedLeafQueue); + } finally { + cleanupQueue(USER0); + } + } + + @Test + public void testReinitializeStoppedAutoCreatedLeafQueue() throws Exception { + + try { + String host = "127.0.0.1"; + RMNode node = MockNodes.newNodeInfo(0, MockNodes.newResource(4 * GB), 1, + host); + cs.handle(new NodeAddedSchedulerEvent(node)); + + // submit an app + + RMApp app = mockRM.submitApp(GB, "test-auto-queue-creation-1", USER0, + null, USER0); + // check preconditions + List appsInC = cs.getAppsInQueue(PARENT_QUEUE); + assertEquals(1, appsInC.size()); + + assertNotNull(cs.getQueue(USER0)); + + AutoCreatedLeafQueue autoCreatedLeafQueue = + (AutoCreatedLeafQueue) cs.getQueue(USER0); + ManagedParentQueue parentQueue = (ManagedParentQueue) cs.getQueue( + PARENT_QUEUE); + assertEquals(parentQueue, autoCreatedLeafQueue.getParent()); + validateCapacities(autoCreatedLeafQueue); + + ApplicationAttemptId appAttemptId = appsInC.get(0); + + Priority priority = TestUtils.createMockPriority(1); + RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory( + null); + ResourceRequest r1 = TestUtils.createResourceRequest(ResourceRequest.ANY, + 1 * GB, 1, true, priority, recordFactory); + + cs.allocate(appAttemptId, Collections.singletonList(r1), + Collections.emptyList(), Collections.singletonList(host), + null, NULL_UPDATE_REQUESTS); + + //And this will result in container assignment for app1 + CapacityScheduler.schedule(cs); + + //change state to draining + autoCreatedLeafQueue.stopQueue(); + + cs.killAllAppsInQueue(USER0); + + mockRM.waitForState(appAttemptId, RMAppAttemptState.KILLED); + + mockRM.waitForState(appAttemptId.getApplicationId(), RMAppState.KILLED); + + //change state to stopped + autoCreatedLeafQueue.stopQueue(); + assertEquals(QueueState.STOPPED, + autoCreatedLeafQueue.getQueueInfo().getQueueState()); + + cs.reinitialize(cs.getConf(), mockRM.getRMContext()); + + AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) cs.getQueue( + USER0); + validateCapacities(leafQueue); + + } finally { + cleanupQueue(USER0); + } + } + + @Test + public void testRefreshQueuesWithAutoCreatedLeafQueues() throws Exception { + + MockRM newMockRM = setupSchedulerInstance(); + try { + CapacityScheduler newCS = + (CapacityScheduler) newMockRM.getResourceScheduler(); + CapacitySchedulerConfiguration conf = newCS.getConfiguration(); + + // Test add one auto created queue dynamically and manually modify + // capacity + ManagedParentQueue parentQueue = (ManagedParentQueue) newCS.getQueue("c"); + AutoCreatedLeafQueue c1 = new AutoCreatedLeafQueue(newCS, "c1", + parentQueue); + newCS.addQueue(c1); + c1.setEntitlement(new QueueEntitlement(C1_CAPACITY / 100, 1f)); + + // Test add another auto created queue and use setEntitlement to modify + // capacity + AutoCreatedLeafQueue c2 = new AutoCreatedLeafQueue(newCS, "c2", + (ManagedParentQueue) newCS.getQueue("c")); + newCS.addQueue(c2); + newCS.setEntitlement("c2", new QueueEntitlement(C2_CAPACITY / 100, 1f)); + + // Verify all allocations match + checkQueueCapacities(newCS, C_CAPACITY, D_CAPACITY); + + // Reinitialize and verify all dynamic queued survived + + conf.setCapacity(A, 20f); + conf.setCapacity(B, 20f); + conf.setCapacity(C, 40f); + conf.setCapacity(D, 20f); + newCS.reinitialize(conf, newMockRM.getRMContext()); + + checkQueueCapacities(newCS, 40f, 20f); + + //chnage parent template configs and reinitialize + conf.setAutoCreatedLeafQueueTemplateCapacity(C, 30.0f); + conf.setAutoCreatedLeafQueueTemplateMaxCapacity(C, 100.0f); + newCS.reinitialize(conf, newMockRM.getRMContext()); + + ManagedParentQueue c = (ManagedParentQueue) newCS.getQueue("c"); + AutoCreatedLeafQueue c3 = new AutoCreatedLeafQueue(newCS, "c3", c); + newCS.addQueue(c3); + + AbstractManagedParentQueue.AutoCreatedLeafQueueTemplate + leafQueueTemplate = parentQueue.getLeafQueueTemplate(); + QueueCapacities cap = leafQueueTemplate.getQueueCapacities(); + c3.setEntitlement( + new QueueEntitlement(cap.getCapacity(), cap.getMaximumCapacity())); + newCS.reinitialize(conf, newMockRM.getRMContext()); + + checkQueueCapacities(newCS, 40f, 20f); + } finally { + if (newMockRM != null) { + ((CapacityScheduler) newMockRM.getResourceScheduler()).stop(); + newMockRM.stop(); + } + } + } + + @Test + public void testConvertAutoCreateDisabledOnManagedParentQueueFails() + throws Exception { + CapacityScheduler newCS = new CapacityScheduler(); + try { + CapacitySchedulerConfiguration newConf = + new CapacitySchedulerConfiguration(); + setupQueueConfiguration(newConf); + + newConf.setAutoCreateChildQueueEnabled(C, false); + + newCS.setConf(new YarnConfiguration()); + newCS.setRMContext(mockRM.getRMContext()); + newCS.init(cs.getConf()); + newCS.start(); + + newCS.reinitialize(newConf, + new RMContextImpl(null, null, null, null, null, null, + new RMContainerTokenSecretManager(newConf), + new NMTokenSecretManagerInRM(newConf), + new ClientToAMTokenSecretManagerInRM(), null)); + + } catch (IOException e) { + //expected exception + } finally { + newCS.stop(); + } + } + + @Test + public void testConvertLeafQueueToParentQueueWithAutoCreate() + throws Exception { + CapacityScheduler newCS = new CapacityScheduler(); + try { + CapacitySchedulerConfiguration newConf = + new CapacitySchedulerConfiguration(); + setupQueueConfiguration(newConf); + newConf.setAutoCreatedLeafQueueTemplateCapacity(A1, A1_CAPACITY / 10); + newConf.setAutoCreateChildQueueEnabled(A1, true); + + newCS.setConf(new YarnConfiguration()); + newCS.setRMContext(mockRM.getRMContext()); + newCS.init(cs.getConf()); + newCS.start(); + + final LeafQueue a1Queue = (LeafQueue) newCS.getQueue("a1"); + a1Queue.stopQueue(); + + newCS.reinitialize(newConf, + new RMContextImpl(null, null, null, null, null, null, + new RMContainerTokenSecretManager(newConf), + new NMTokenSecretManagerInRM(newConf), + new ClientToAMTokenSecretManagerInRM(), null)); + + } finally { + newCS.stop(); + } + } + + @Test + public void testConvertFailsFromParentQueueToManagedParentQueue() + throws Exception { + CapacityScheduler newCS = new CapacityScheduler(); + try { + CapacitySchedulerConfiguration newConf = + new CapacitySchedulerConfiguration(); + setupQueueConfiguration(newConf); + newConf.setAutoCreatedLeafQueueTemplateCapacity(A, A_CAPACITY / 10); + newConf.setAutoCreateChildQueueEnabled(A, true); + + newCS.setConf(new YarnConfiguration()); + newCS.setRMContext(mockRM.getRMContext()); + newCS.init(cs.getConf()); + newCS.start(); + + final ParentQueue a1Queue = (ParentQueue) newCS.getQueue("a"); + a1Queue.stopQueue(); + + newCS.reinitialize(newConf, + new RMContextImpl(null, null, null, null, null, null, + new RMContainerTokenSecretManager(newConf), + new NMTokenSecretManagerInRM(newConf), + new ClientToAMTokenSecretManagerInRM(), null)); + + fail("Expected exception while converting a parent queue to" + + " an auto create enabled parent queue"); + } catch (IOException e) { + //expected exception + } finally { + newCS.stop(); + } + } + + @Test(timeout = 10000) + public void testAutoCreateLeafQueueFailsWithNoQueueMapping() + throws Exception { + + final String INVALID_USER = "invalid_user"; + + // submit an app under a different queue name which does not exist + // and queue mapping does not exist for this user + RMApp app = mockRM.submitApp(GB, "app", INVALID_USER, null, INVALID_USER, + false); + mockRM.drainEvents(); + mockRM.waitForState(app.getApplicationId(), RMAppState.FAILED); + assertEquals(RMAppState.FAILED, app.getState()); + } + + private void validateCapacities(AutoCreatedLeafQueue autoCreatedLeafQueue) { + assertEquals(autoCreatedLeafQueue.getCapacity(), 0.0f, EPSILON); + assertEquals(autoCreatedLeafQueue.getAbsoluteCapacity(), 0.0f, EPSILON); + assertEquals(autoCreatedLeafQueue.getMaximumCapacity(), 0.0f, EPSILON); + assertEquals(autoCreatedLeafQueue.getAbsoluteMaximumCapacity(), 0.0f, + EPSILON); + int maxAppsForAutoCreatedQueues = (int) ( + CapacitySchedulerConfiguration.DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS + * autoCreatedLeafQueue.getParent().getAbsoluteCapacity()); + assertEquals(autoCreatedLeafQueue.getMaxApplicationsPerUser(), + maxAppsForAutoCreatedQueues); + assertEquals(autoCreatedLeafQueue.getMaxApplicationsPerUser(), + (int) (maxAppsForAutoCreatedQueues * (cs.getConfiguration() + .getUserLimitFactor( + autoCreatedLeafQueue.getParent().getQueuePath())))); + } + + private void cleanupQueue(String queueName) throws YarnException { + AutoCreatedLeafQueue queue = (AutoCreatedLeafQueue) cs.getQueue(queueName); + if (queue != null) { + queue.setEntitlement(new QueueEntitlement(0.0f, 0.0f)); + ((ManagedParentQueue) queue.getParent()).removeChildQueue( + queue.getQueueName()); + cs.getCapacitySchedulerQueueManager().removeQueue(queue.getQueueName()); + } else{ + throw new YarnException("Queue does not exist " + queueName); + } + } + + String getQueueMapping(String parentQueue, String leafQueue) { + return parentQueue + DOT + leafQueue; + } + + @Test(timeout = 10000) + public void testQueueMappingValidationFailsWithInvalidParentQueueInMapping() + throws Exception { + + MockRM newMockRM = setupSchedulerInstance(); + try { + CapacityScheduler newCS = + (CapacityScheduler) newMockRM.getResourceScheduler(); + + //"a" is not auto create enabled + + //dynamic queue mapping + try { + setupQueueMapping(newCS, CURRENT_USER_MAPPING, "a", + CURRENT_USER_MAPPING); + newCS.updatePlacementRules(); + fail("Expected invalid parent queue mapping failure"); + + } catch (IOException e) { + //expected exception + assertTrue(e.getMessage().contains( + "invalid parent queue which does not have auto creation of leaf " + + "queues enabled [" + + "a" + "]")); + } + + //"a" is not auto create enabled and app_user does not exist as a leaf + // queue + //static queue mapping + try { + setupQueueMapping(newCS, "app_user", "INVALID_PARENT_QUEUE", + "app_user"); + newCS.updatePlacementRules(); + fail("Expected invalid parent queue mapping failure"); + } catch (IOException e) { + //expected exception + assertTrue(e.getMessage() + .contains("invalid parent queue [" + "INVALID_PARENT_QUEUE" + "]")); + } + } finally { + if (newMockRM != null) { + ((CapacityScheduler) newMockRM.getResourceScheduler()).stop(); + newMockRM.stop(); + } + } + } + + @Test(timeout = 10000) + public void testQueueMappingUpdatesFailsOnRemovalOfParentQueueInMapping() + throws Exception { + + MockRM newMockRM = setupSchedulerInstance(); + + try { + CapacityScheduler newCS = + (CapacityScheduler) newMockRM.getResourceScheduler(); + + setupQueueMapping(newCS, CURRENT_USER_MAPPING, "c", CURRENT_USER_MAPPING); + newCS.updatePlacementRules(); + + try { + setupQueueMapping(newCS, CURRENT_USER_MAPPING, "", + CURRENT_USER_MAPPING); + newCS.updatePlacementRules(); + fail("Expected invalid parent queue mapping failure"); + } catch (IOException e) { + //expected exception + assertTrue(e.getMessage().contains("invalid parent queue []")); + } + } finally { + if (newMockRM != null) { + ((CapacityScheduler) newMockRM.getResourceScheduler()).stop(); + newMockRM.stop(); + } + } + } + + @Test + public void testParentQueueUpdateInQueueMappingFailsAfterAutoCreation() + throws Exception { + + MockRM newMockRM = setupSchedulerInstance(); + CapacityScheduler newCS = + (CapacityScheduler) newMockRM.getResourceScheduler(); + + try { + newMockRM.start(); + newCS.start(); + + submitApp(newCS, USER0, USER0, PARENT_QUEUE); + + assertNotNull(newCS.getQueue(USER0)); + + setupQueueMapping(newCS, USER0, "d", USER0); + newCS.updatePlacementRules(); + + RMContext rmContext = mock(RMContext.class); + when(rmContext.getDispatcher()).thenReturn(dispatcher); + newCS.setRMContext(rmContext); + + ApplicationId appId = BuilderUtils.newApplicationId(1, 1); + SchedulerEvent addAppEvent = new AppAddedSchedulerEvent(appId, USER0, + USER0, new ApplicationPlacementContext(USER0, "d")); + newCS.handle(addAppEvent); + + RMAppEvent event = new RMAppEvent(appId, RMAppEventType.APP_REJECTED, + "error"); + dispatcher.spyOnNextEvent(event, 10000); + } finally { + if (newMockRM != null) { + ((CapacityScheduler) newMockRM.getResourceScheduler()).stop(); + newMockRM.stop(); + } + } + } + + @Test + public void testAutoCreationFailsWhenParentCapacityExceeded() + throws IOException, SchedulerDynamicEditException { + MockRM newMockRM = setupSchedulerInstance(); + CapacityScheduler newCS = + (CapacityScheduler) newMockRM.getResourceScheduler(); + + try { + CapacitySchedulerConfiguration conf = newCS.getConfiguration(); + conf.setShouldFailAutoQueueCreationWhenGuaranteedCapacityExceeded(C, + true); + + newCS.reinitialize(conf, newMockRM.getRMContext()); + + // Test add one auto created queue dynamically and manually modify + // capacity + ManagedParentQueue parentQueue = (ManagedParentQueue) newCS.getQueue("c"); + AutoCreatedLeafQueue c1 = new AutoCreatedLeafQueue(newCS, "c1", + parentQueue); + newCS.addQueue(c1); + c1.setEntitlement(new QueueEntitlement(0.5f, 1f)); + + AutoCreatedLeafQueue c2 = new AutoCreatedLeafQueue(newCS, "c2", + parentQueue); + newCS.addQueue(c2); + c2.setEntitlement(new QueueEntitlement(0.5f, 1f)); + + try { + AutoCreatedLeafQueue c3 = new AutoCreatedLeafQueue(newCS, "c3", + parentQueue); + newCS.addQueue(c3); + fail("Expected exception for auto queue creation failure"); + } catch (SchedulerDynamicEditException e) { + //expected exception + } + } finally { + if (newMockRM != null) { + ((CapacityScheduler) newMockRM.getResourceScheduler()).stop(); + newMockRM.stop(); + } + } + } + + private List setupQueueMapping( + CapacityScheduler newCS, String user, String parentQueue, String queue) { + List queueMappings = + new ArrayList<>(); + queueMappings.add(new UserGroupMappingPlacementRule.QueueMapping( + UserGroupMappingPlacementRule.QueueMapping.MappingType.USER, user, + getQueueMapping(parentQueue, queue))); + newCS.getConfiguration().setQueueMappings(queueMappings); + return queueMappings; + } + + private MockRM setupSchedulerInstance() { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(conf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + + List queuePlacementRules = new ArrayList(); + queuePlacementRules.add(YarnConfiguration.USER_GROUP_PLACEMENT_RULE); + conf.setQueuePlacementRules(queuePlacementRules); + + setupQueueMappings(conf); + + MockRM newMockRM = new MockRM(conf); + return newMockRM; + } + + void checkQueueCapacities(CapacityScheduler newCS, float capacityC, + float capacityD) { + CSQueue rootQueue = newCS.getRootQueue(); + CSQueue queueC = tcs.findQueue(rootQueue, C); + CSQueue queueD = tcs.findQueue(rootQueue, D); + CSQueue queueC1 = tcs.findQueue(queueC, C1); + CSQueue queueC2 = tcs.findQueue(queueC, C2); + CSQueue queueC3 = tcs.findQueue(queueC, C3); + + float capC = capacityC / 100.0f; + float capD = capacityD / 100.0f; + + tcs.checkQueueCapacity(queueC, capC, capC, 1.0f, 1.0f); + tcs.checkQueueCapacity(queueD, capD, capD, 1.0f, 1.0f); + tcs.checkQueueCapacity(queueC1, C1_CAPACITY / 100.0f, + (C1_CAPACITY / 100.0f) * capC, 1.0f, 1.0f); + tcs.checkQueueCapacity(queueC2, C2_CAPACITY / 100.0f, + (C2_CAPACITY / 100.0f) * capC, 1.0f, 1.0f); + + if (queueC3 != null) { + ManagedParentQueue parentQueue = (ManagedParentQueue) queueC; + QueueCapacities cap = + parentQueue.getLeafQueueTemplate().getQueueCapacities(); + tcs.checkQueueCapacity(queueC3, cap.getCapacity(), + (cap.getCapacity()) * capC, 1.0f, 1.0f); + } + } + + ApplicationAttemptId submitApp(CapacityScheduler newCS, String user, + String queue, String parentQueue) { + ApplicationId appId = BuilderUtils.newApplicationId(1, 1); + SchedulerEvent addAppEvent = new AppAddedSchedulerEvent(appId, queue, user, + new ApplicationPlacementContext(queue, parentQueue)); + ApplicationAttemptId appAttemptId = BuilderUtils.newApplicationAttemptId( + appId, 1); + SchedulerEvent addAttemptEvent = new AppAttemptAddedSchedulerEvent( + appAttemptId, false); + newCS.handle(addAppEvent); + newCS.handle(addAttemptEvent); + return appAttemptId; + } +}