diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 0a4a14f063..968d971ce1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -146,6 +146,7 @@ protected enum CapacityConfigType { volatile Priority priority = Priority.newInstance(0); private Map userWeights = new HashMap(); + private int maxParallelApps; public AbstractCSQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { @@ -390,6 +391,11 @@ protected void setupQueueConfigs(Resource clusterResource, // and queue setting setupMaximumAllocation(configuration); + // Max parallel apps + int queueMaxParallelApps = + configuration.getMaxParallelAppsForQueue(getQueuePath()); + setMaxParallelApps(queueMaxParallelApps); + // initialized the queue state based on previous state, configured state // and its parent state. QueueState previous = getState(); @@ -1431,4 +1437,14 @@ public long getDefaultApplicationLifetime() { public boolean getDefaultAppLifetimeWasSpecifiedInConfig() { return defaultAppLifetimeWasSpecifiedInConfig; } + + public void setMaxParallelApps(int maxParallelApps) { + this.maxParallelApps = maxParallelApps; + } + + public int getMaxParallelApps() { + return maxParallelApps; + } + + abstract int getNumRunnableApps(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSMaxRunningAppsEnforcer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSMaxRunningAppsEnforcer.java new file mode 100644 index 0000000000..d1a62b4094 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSMaxRunningAppsEnforcer.java @@ -0,0 +1,436 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.Iterator; +import java.util.List; +import java.util.Map; +import java.util.PriorityQueue; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.collect.ArrayListMultimap; +import com.google.common.collect.ListMultimap; + +/** + * Handles tracking and enforcement for user and queue maxRunningApps + * constraints. + */ +public class CSMaxRunningAppsEnforcer { + private static final Logger LOG = LoggerFactory.getLogger( + CSMaxRunningAppsEnforcer.class); + + private final CapacityScheduler scheduler; + + // Tracks the number of running applications by user. + private final Map usersNumRunnableApps; + + private final ListMultimap usersNonRunnableApps; + + public CSMaxRunningAppsEnforcer(CapacityScheduler scheduler) { + this.scheduler = scheduler; + this.usersNumRunnableApps = new HashMap(); + this.usersNonRunnableApps = ArrayListMultimap.create(); + } + + /** + * Checks whether making the application runnable would exceed any + * maxRunningApps limits. Also sets the "runnable" flag on the + * attempt. + * + * @param attempt the app attempt being checked + * @return true if the application is runnable; false otherwise + */ + public boolean checkRunnabilityWithUpdate( + FiCaSchedulerApp attempt) { + boolean attemptCanRun = !exceedUserMaxParallelApps(attempt.getUser()) + && !exceedQueueMaxParallelApps(attempt.getCSLeafQueue()); + + attempt.setRunnable(attemptCanRun); + + return attemptCanRun; + } + + /** + * Checks whether the number of user runnable apps exceeds the limitation. + * + * @param user the user name + * @return true if the number hits the limit; false otherwise + */ + private boolean exceedUserMaxParallelApps(String user) { + Integer userNumRunnable = usersNumRunnableApps.get(user); + if (userNumRunnable == null) { + userNumRunnable = 0; + } + if (userNumRunnable >= getUserMaxParallelApps(user)) { + LOG.info("Maximum runnable apps exceeded for user {}", user); + return true; + } + + return false; + } + + /** + * Recursively checks whether the number of queue runnable apps exceeds the + * limitation. + * + * @param queue the current queue + * @return true if the number hits the limit; false otherwise + */ + private boolean exceedQueueMaxParallelApps(AbstractCSQueue queue) { + // Check queue and all parent queues + while (queue != null) { + if (queue.getNumRunnableApps() >= queue.getMaxParallelApps()) { + LOG.info("Maximum runnable apps exceeded for queue {}", + queue.getQueuePath()); + return true; + } + queue = (AbstractCSQueue) queue.getParent(); + } + + return false; + } + + public void trackApp(FiCaSchedulerApp app) { + if (app.isRunnable()) { + trackRunnableApp(app); + } else { + trackNonRunnableApp(app); + } + } + /** + * Tracks the given new runnable app for purposes of maintaining max running + * app limits. + */ + private void trackRunnableApp(FiCaSchedulerApp app) { + String user = app.getUser(); + AbstractCSQueue queue = (AbstractCSQueue) app.getQueue(); + // Increment running counts for all parent queues + ParentQueue parent = (ParentQueue) queue.getParent(); + while (parent != null) { + parent.incrementRunnableApps(); + parent = (ParentQueue) parent.getParent(); + } + + Integer userNumRunnable = usersNumRunnableApps.get(user); + usersNumRunnableApps.put(user, (userNumRunnable == null ? 0 + : userNumRunnable) + 1); + } + + /** + * Tracks the given new non runnable app so that it can be made runnable when + * it would not violate max running app limits. + */ + private void trackNonRunnableApp(FiCaSchedulerApp app) { + String user = app.getUser(); + usersNonRunnableApps.put(user, app); + } + + /** + * This is called after reloading the allocation configuration when the + * scheduler is reinitialized + * + * Checks to see whether any non-runnable applications become runnable + * now that the max running apps of given queue has been changed + * + * Runs in O(n) where n is the number of apps that are non-runnable and in + * the queues that went from having no slack to having slack. + */ + + public void updateRunnabilityOnReload() { + ParentQueue rootQueue = (ParentQueue) scheduler.getRootQueue(); + List> appsNowMaybeRunnable = + new ArrayList>(); + + gatherPossiblyRunnableAppLists(rootQueue, appsNowMaybeRunnable); + + updateAppsRunnability(appsNowMaybeRunnable, Integer.MAX_VALUE); + } + + /** + * Checks to see whether any other applications runnable now that the given + * application has been removed from the given queue. And makes them so. + * + * Runs in O(n log(n)) where n is the number of queues that are under the + * highest queue that went from having no slack to having slack. + */ + public void updateRunnabilityOnAppRemoval(FiCaSchedulerApp app) { + // childqueueX might have no pending apps itself, but if a queue higher up + // in the hierarchy parentqueueY has a maxRunningApps set, an app completion + // in childqueueX could allow an app in some other distant child of + // parentqueueY to become runnable. + // An app removal will only possibly allow another app to become runnable if + // the queue was already at its max before the removal. + // Thus we find the ancestor queue highest in the tree for which the app + // that was at its maxRunningApps before the removal. + LeafQueue queue = app.getCSLeafQueue(); + AbstractCSQueue highestQueueWithAppsNowRunnable = + (queue.getNumRunnableApps() == queue.getMaxParallelApps() - 1) + ? queue : null; + + ParentQueue parent = (ParentQueue) queue.getParent(); + while (parent != null) { + if (parent.getNumRunnableApps() == parent.getMaxParallelApps() - 1) { + highestQueueWithAppsNowRunnable = parent; + } + parent = (ParentQueue) parent.getParent(); + } + + List> appsNowMaybeRunnable = + new ArrayList>(); + + // Compile lists of apps which may now be runnable + // We gather lists instead of building a set of all non-runnable apps so + // that this whole operation can be O(number of queues) instead of + // O(number of apps) + if (highestQueueWithAppsNowRunnable != null) { + gatherPossiblyRunnableAppLists(highestQueueWithAppsNowRunnable, + appsNowMaybeRunnable); + } + String user = app.getUser(); + Integer userNumRunning = usersNumRunnableApps.get(user); + if (userNumRunning == null) { + userNumRunning = 0; + } + if (userNumRunning == getUserMaxParallelApps(user) - 1) { + List userWaitingApps = usersNonRunnableApps.get(user); + if (userWaitingApps != null) { + appsNowMaybeRunnable.add(userWaitingApps); + } + } + + updateAppsRunnability(appsNowMaybeRunnable, + appsNowMaybeRunnable.size()); + } + + /** + * Checks to see whether applications are runnable now by iterating + * through each one of them and check if the queue and user have slack. + * + * if we know how many apps can be runnable, there is no need to iterate + * through all apps, maxRunnableApps is used to break out of the iteration. + */ + private void updateAppsRunnability(List> + appsNowMaybeRunnable, int maxRunnableApps) { + // Scan through and check whether this means that any apps are now runnable + Iterator iter = new MultiListStartTimeIterator( + appsNowMaybeRunnable); + FiCaSchedulerApp prev = null; + List noLongerPendingApps = new ArrayList<>(); + while (iter.hasNext()) { + FiCaSchedulerApp next = iter.next(); + if (next == prev) { + continue; + } + + if (checkRunnabilityWithUpdate(next)) { + LeafQueue nextQueue = next.getCSLeafQueue(); + LOG.info("{} is now runnable in {}", + next.getApplicationAttemptId(), nextQueue); + trackRunnableApp(next); + FiCaSchedulerApp appSched = next; + nextQueue.submitApplicationAttempt(next, next.getUser()); + noLongerPendingApps.add(appSched); + + if (noLongerPendingApps.size() >= maxRunnableApps) { + break; + } + } + + prev = next; + } + + // We remove the apps from their pending lists afterwards so that we don't + // pull them out from under the iterator. If they are not in these lists + // in the first place, there is a bug. + for (FiCaSchedulerApp appSched : noLongerPendingApps) { + if (!(appSched.getCSLeafQueue().removeNonRunnableApp(appSched))) { + LOG.error("Can't make app runnable that does not already exist in queue" + + " as non-runnable: {}. This should never happen.", + appSched.getApplicationAttemptId()); + } + + if (!usersNonRunnableApps.remove(appSched.getUser(), appSched)) { + LOG.error("Waiting app {} expected to be in " + + "usersNonRunnableApps, but was not. This should never happen.", + appSched.getApplicationAttemptId()); + } + } + } + + public void untrackApp(FiCaSchedulerApp app) { + if (app.isRunnable()) { + untrackRunnableApp(app); + } else { + untrackNonRunnableApp(app); + } + } + + /** + * Updates the relevant tracking variables after a runnable app with the given + * queue and user has been removed. + */ + private void untrackRunnableApp(FiCaSchedulerApp app) { + // Update usersRunnableApps + String user = app.getUser(); + int newUserNumRunning = usersNumRunnableApps.get(user) - 1; + if (newUserNumRunning == 0) { + usersNumRunnableApps.remove(user); + } else { + usersNumRunnableApps.put(user, newUserNumRunning); + } + + // Update runnable app bookkeeping for queues + AbstractCSQueue queue = (AbstractCSQueue) app.getQueue(); + ParentQueue parent = (ParentQueue) queue.getParent(); + while (parent != null) { + parent.decrementRunnableApps(); + parent = (ParentQueue) parent.getParent(); + } + } + + /** + * Stops tracking the given non-runnable app. + */ + private void untrackNonRunnableApp(FiCaSchedulerApp app) { + usersNonRunnableApps.remove(app.getUser(), app); + } + + /** + * Traverses the queue hierarchy under the given queue to gather all lists + * of non-runnable applications. + */ + private void gatherPossiblyRunnableAppLists(AbstractCSQueue queue, + List> appLists) { + if (queue.getNumRunnableApps() < queue.getMaxParallelApps()) { + if (queue instanceof LeafQueue) { + appLists.add( + ((LeafQueue)queue).getCopyOfNonRunnableAppSchedulables()); + } else { + for (CSQueue child : queue.getChildQueues()) { + gatherPossiblyRunnableAppLists((AbstractCSQueue) child, appLists); + } + } + } + } + + private int getUserMaxParallelApps(String user) { + CapacitySchedulerConfiguration conf = scheduler.getConfiguration(); + if (conf == null) { + return Integer.MAX_VALUE; + } + + int userMaxParallelApps = conf.getMaxParallelAppsForUser(user); + + return userMaxParallelApps; + } + + /** + * Takes a list of lists, each of which is ordered by start time, and returns + * their elements in order of start time. + * + * We maintain positions in each of the lists. Each next() call advances + * the position in one of the lists. We maintain a heap that orders lists + * by the start time of the app in the current position in that list. + * This allows us to pick which list to advance in O(log(num lists)) instead + * of O(num lists) time. + */ + static class MultiListStartTimeIterator implements + Iterator { + + private List[] appLists; + private int[] curPositionsInAppLists; + private PriorityQueue appListsByCurStartTime; + + @SuppressWarnings("unchecked") + MultiListStartTimeIterator(List> appListList) { + appLists = appListList.toArray(new List[appListList.size()]); + curPositionsInAppLists = new int[appLists.length]; + appListsByCurStartTime = new PriorityQueue(); + for (int i = 0; i < appLists.length; i++) { + long time = appLists[i].isEmpty() ? Long.MAX_VALUE : appLists[i].get(0) + .getStartTime(); + appListsByCurStartTime.add(new IndexAndTime(i, time)); + } + } + + @Override + public boolean hasNext() { + return !appListsByCurStartTime.isEmpty() + && appListsByCurStartTime.peek().time != Long.MAX_VALUE; + } + + @Override + public FiCaSchedulerApp next() { + IndexAndTime indexAndTime = appListsByCurStartTime.remove(); + int nextListIndex = indexAndTime.index; + FiCaSchedulerApp next = appLists[nextListIndex] + .get(curPositionsInAppLists[nextListIndex]); + curPositionsInAppLists[nextListIndex]++; + + if (curPositionsInAppLists[nextListIndex] < + appLists[nextListIndex].size()) { + indexAndTime.time = appLists[nextListIndex] + .get(curPositionsInAppLists[nextListIndex]).getStartTime(); + } else { + indexAndTime.time = Long.MAX_VALUE; + } + appListsByCurStartTime.add(indexAndTime); + + return next; + } + + @Override + public void remove() { + throw new UnsupportedOperationException("Remove not supported"); + } + + private static class IndexAndTime implements Comparable { + private int index; + private long time; + + IndexAndTime(int index, long time) { + this.index = index; + this.time = time; + } + + @Override + public int compareTo(IndexAndTime o) { + return time < o.time ? -1 : (time > o.time ? 1 : 0); + } + + @Override + public boolean equals(Object o) { + if (!(o instanceof IndexAndTime)) { + return false; + } + IndexAndTime other = (IndexAndTime)o; + return other.time == time; + } + + @Override + public int hashCode() { + return (int)time; + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index a6aa82443c..bd2acd7611 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -242,8 +242,11 @@ public Configuration getConf() { private static final long DEFAULT_ASYNC_SCHEDULER_INTERVAL = 5; private long asyncMaxPendingBacklogs; + private CSMaxRunningAppsEnforcer maxRunningEnforcer; + public CapacityScheduler() { super(CapacityScheduler.class.getName()); + this.maxRunningEnforcer = new CSMaxRunningAppsEnforcer(this); } @Override @@ -483,6 +486,7 @@ public void reinitialize(Configuration newConf, RMContext rmContext, super.reinitialize(newConf, rmContext); } + maxRunningEnforcer.updateRunnabilityOnReload(); } finally { writeLock.unlock(); } @@ -1083,6 +1087,9 @@ private void addApplicationAttempt( // SchedulerApplication#setCurrentAppAttempt. attempt.setPriority(application.getPriority()); + maxRunningEnforcer.checkRunnabilityWithUpdate(attempt); + maxRunningEnforcer.trackApp(attempt); + queue.submitApplicationAttempt(attempt, application.getUser()); LOG.info("Added Application Attempt " + applicationAttemptId + " to scheduler from user " + application.getUser() + " in queue " @@ -1176,8 +1183,13 @@ private void doneApplicationAttempt( LOG.error( "Cannot finish application " + "from non-leaf queue: " + csQueue.getQueuePath()); - } else{ + } else { csQueue.finishApplicationAttempt(attempt, csQueue.getQueuePath()); + + maxRunningEnforcer.untrackApp(attempt); + if (attempt.isRunnable()) { + maxRunningEnforcer.updateRunnabilityOnAppRemoval(attempt); + } } } finally { writeLock.unlock(); @@ -3253,4 +3265,9 @@ public boolean isMultiNodePlacementEnabled() { public int getNumAsyncSchedulerThreads() { return asyncSchedulerThreads == null ? 0 : asyncSchedulerThreads.size(); } + + @VisibleForTesting + public void setMaxRunningAppsEnforcer(CSMaxRunningAppsEnforcer enforcer) { + this.maxRunningEnforcer = enforcer; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 496dd0b290..3bebb44a6f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -378,6 +378,10 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur public static final Pattern RESOURCE_PATTERN = Pattern.compile(PATTERN_FOR_ABSOLUTE_RESOURCE); + public static final String MAX_PARALLEL_APPLICATIONS = "max-parallel-apps"; + + public static final int DEFAULT_MAX_PARALLEL_APPLICATIONS = Integer.MAX_VALUE; + /** * Different resource types supported. */ @@ -412,7 +416,11 @@ static String getQueueOrderingPolicyPrefix(String queue) { String queueName = PREFIX + queue + DOT + ORDERING_POLICY + DOT; return queueName; } - + + static String getUserPrefix(String user) { + return PREFIX + "user." + user + DOT; + } + private String getNodeLabelPrefix(String queue, String label) { if (label.equals(CommonNodeLabelsManager.NO_LABEL)) { return getQueuePrefix(queue); @@ -1392,6 +1400,31 @@ public boolean shouldAppFailFast(Configuration conf) { return conf.getBoolean(APP_FAIL_FAST, DEFAULT_APP_FAIL_FAST); } + public Integer getMaxParallelAppsForQueue(String queue) { + int defaultMaxParallelAppsForQueue = + getInt(PREFIX + MAX_PARALLEL_APPLICATIONS, + DEFAULT_MAX_PARALLEL_APPLICATIONS); + + String maxParallelAppsForQueue = get(getQueuePrefix(queue) + + MAX_PARALLEL_APPLICATIONS); + + return (maxParallelAppsForQueue != null) ? + Integer.parseInt(maxParallelAppsForQueue) + : defaultMaxParallelAppsForQueue; + } + + public Integer getMaxParallelAppsForUser(String user) { + int defaultMaxParallelAppsForUser = + getInt(PREFIX + "user." + MAX_PARALLEL_APPLICATIONS, + DEFAULT_MAX_PARALLEL_APPLICATIONS); + String maxParallelAppsForUser = get(getUserPrefix(user) + + MAX_PARALLEL_APPLICATIONS); + + return (maxParallelAppsForUser != null) ? + Integer.parseInt(maxParallelAppsForUser) + : defaultMaxParallelAppsForUser; + } + private static final String PREEMPTION_CONFIG_PREFIX = "yarn.resourcemanager.monitor.capacity.preemption."; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 9f0caf291e..4d83538c98 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -129,6 +129,9 @@ public class LeafQueue extends AbstractCSQueue { List priorityAcls = new ArrayList(); + private final List runnableApps = new ArrayList<>(); + private final List nonRunnableApps = new ArrayList<>(); + @SuppressWarnings({ "unchecked", "rawtypes" }) public LeafQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { @@ -159,6 +162,7 @@ protected void setupQueueConfigs(Resource clusterResource) setupQueueConfigs(clusterResource, csContext.getConfiguration()); } + @SuppressWarnings("checkstyle:nowhitespaceafter") protected void setupQueueConfigs(Resource clusterResource, CapacitySchedulerConfiguration conf) throws IOException { @@ -289,7 +293,9 @@ protected void setupQueueConfigs(Resource clusterResource, + " (int)(configuredMaximumSystemApplications * absoluteCapacity)]" + "\n" + "maxApplicationsPerUser = " + maxApplicationsPerUser + " [= (int)(maxApplications * (userLimit / 100.0f) * " - + "userLimitFactor) ]" + "\n" + "usedCapacity = " + + "userLimitFactor) ]" + "\n" + + "maxParallelApps = " + getMaxParallelApps() + "\n" + + "usedCapacity = " + + queueCapacities.getUsedCapacity() + " [= usedResourcesMemory / " + "(clusterResourceMemory * absoluteCapacity)]" + "\n" + "absoluteUsedCapacity = " + absoluteUsedCapacity @@ -386,7 +392,8 @@ void setUserLimitFactor(float userLimitFactor) { public int getNumApplications() { readLock.lock(); try { - return getNumPendingApplications() + getNumActiveApplications(); + return getNumPendingApplications() + getNumActiveApplications() + + getNumNonRunnableApps(); } finally { readLock.unlock(); } @@ -887,16 +894,28 @@ protected void activateApplications() { writeLock.unlock(); } } - + private void addApplicationAttempt(FiCaSchedulerApp application, User user) { writeLock.lock(); try { + applicationAttemptMap.put(application.getApplicationAttemptId(), + application); + + if (application.isRunnable()) { + runnableApps.add(application); + LOG.debug("Adding runnable application: {}", + application.getApplicationAttemptId()); + } else { + nonRunnableApps.add(application); + LOG.info("Application attempt {} is not runnable," + + " parallel limit reached", application.getApplicationAttemptId()); + return; + } + // Accept user.submitApplication(); getPendingAppsOrderingPolicy().addSchedulableEntity(application); - applicationAttemptMap.put(application.getApplicationAttemptId(), - application); // Activate applications if (Resources.greaterThan(resourceCalculator, lastClusterResource, @@ -917,7 +936,9 @@ private void addApplicationAttempt(FiCaSchedulerApp application, .getPendingApplications() + " #user-active-applications: " + user .getActiveApplications() + " #queue-pending-applications: " + getNumPendingApplications() + " #queue-active-applications: " - + getNumActiveApplications()); + + getNumActiveApplications() + + " #queue-nonrunnable-applications: " + + getNumNonRunnableApps()); } finally { writeLock.unlock(); } @@ -950,6 +971,15 @@ private void removeApplicationAttempt( // which is caused by wrong invoking order, will fix UT separately User user = usersManager.getUserAndAddIfAbsent(userName); + boolean runnable = runnableApps.remove(application); + if (!runnable) { + // removeNonRunnableApp acquires the write lock again, which is fine + if (!removeNonRunnableApp(application)) { + LOG.error("Given app to remove " + application + + " does not exist in queue " + getQueuePath()); + } + } + String partitionName = application.getAppAMNodePartitionName(); boolean wasActive = orderingPolicy.removeSchedulableEntity(application); if (!wasActive) { @@ -2229,4 +2259,43 @@ private void updateQueuePreemptionMetrics(RMContainer rmc) { usedSeconds); metrics.updatePreemptedForCustomResources(containerResource); } + + @Override + int getNumRunnableApps() { + readLock.lock(); + try { + return runnableApps.size(); + } finally { + readLock.unlock(); + } + } + + int getNumNonRunnableApps() { + readLock.lock(); + try { + return nonRunnableApps.size(); + } finally { + readLock.unlock(); + } + } + + boolean removeNonRunnableApp(FiCaSchedulerApp app) { + writeLock.lock(); + try { + return nonRunnableApps.remove(app); + } finally { + writeLock.unlock(); + } + } + + List getCopyOfNonRunnableAppSchedulables() { + List appsToReturn = new ArrayList<>(); + readLock.lock(); + try { + appsToReturn.addAll(nonRunnableApps); + } finally { + readLock.unlock(); + } + return appsToReturn; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 95f5468eba..bbb80ba733 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -93,6 +93,8 @@ public class ParentQueue extends AbstractCSQueue { private long lastSkipQueueDebugLoggingTimestamp = -1; + private int runnableApps; + public ParentQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { super(cs, queueName, parent, old); @@ -1383,4 +1385,32 @@ public void stopQueue() { public QueueOrderingPolicy getQueueOrderingPolicy() { return queueOrderingPolicy; } + + @Override + int getNumRunnableApps() { + readLock.lock(); + try { + return runnableApps; + } finally { + readLock.unlock(); + } + } + + void incrementRunnableApps() { + writeLock.lock(); + try { + runnableApps++; + } finally { + writeLock.unlock(); + } + } + + void decrementRunnableApps() { + writeLock.lock(); + try { + runnableApps--; + } finally { + writeLock.unlock(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 8f6fb63887..cf6ffd9823 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -112,6 +112,8 @@ public class FiCaSchedulerApp extends SchedulerApplicationAttempt { private AbstractContainerAllocator containerAllocator; + private boolean runnable; + /** * to hold the message if its app doesn't not get container from a node */ @@ -139,6 +141,7 @@ public FiCaSchedulerApp(ApplicationAttemptId applicationAttemptId, RMContext rmContext, Priority appPriority, boolean isAttemptRecovering, ActivitiesManager activitiesManager) { super(applicationAttemptId, user, queue, abstractUsersManager, rmContext); + this.runnable = true; RMApp rmApp = rmContext.getRMApps().get(getApplicationId()); @@ -1219,4 +1222,22 @@ public boolean moveReservation(RMContainer reservedContainer, writeLock.unlock(); } } + + public void setRunnable(boolean runnable) { + writeLock.lock(); + try { + this.runnable = runnable; + } finally { + writeLock.unlock(); + } + } + + public boolean isRunnable() { + readLock.lock(); + try { + return runnable; + } finally { + readLock.unlock(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestReservationSystem.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestReservationSystem.java index ff5738c03a..389dd62e4a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestReservationSystem.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/reservation/TestReservationSystem.java @@ -26,6 +26,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSMaxRunningAppsEnforcer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; @@ -175,6 +176,9 @@ private CapacityScheduler initializeCapacityScheduler() { CapacityScheduler cs = Mockito.spy(new CapacityScheduler()); cs.setConf(conf); + CSMaxRunningAppsEnforcer enforcer = + Mockito.mock(CSMaxRunningAppsEnforcer.class); + cs.setMaxRunningAppsEnforcer(enforcer); mockRMContext = ReservationSystemTestUtil.createRMContext(conf); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java index bad943c0b5..93d8d5a7ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationLimits.java @@ -184,6 +184,7 @@ private FiCaSchedulerApp getMockApplication(int appId, String user, doReturn(amResource).when(application).getAMResource( CommonNodeLabelsManager.NO_LABEL); when(application.compareInputOrderTo(any(FiCaSchedulerApp.class))).thenCallRealMethod(); + when(application.isRunnable()).thenReturn(true); return application; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSMaxRunningAppsEnforcer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSMaxRunningAppsEnforcer.java new file mode 100644 index 0000000000..e3c05a1b7c --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCSMaxRunningAppsEnforcer.java @@ -0,0 +1,278 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX; +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.Iterator; +import java.util.List; +import java.util.concurrent.ConcurrentHashMap; + +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.activities.ActivitiesManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.security.AppPriorityACLsManager; +import org.apache.hadoop.yarn.util.ControlledClock; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.junit.Before; +import org.junit.Test; + +public class TestCSMaxRunningAppsEnforcer { + private CapacitySchedulerQueueManager queueManager; + private CSMaxRunningAppsEnforcer maxAppsEnforcer; + private int appNum; + private ControlledClock clock; + private RMContext rmContext; + private CapacityScheduler scheduler; + private ActivitiesManager activitiesManager; + private CapacitySchedulerConfiguration csConfig; + + @Before + public void setup() throws IOException { + csConfig = new CapacitySchedulerConfiguration(); + rmContext = mock(RMContext.class); + when(rmContext.getYarnConfiguration()).thenReturn(csConfig); + when(rmContext.getRMApps()).thenReturn(new ConcurrentHashMap<>()); + clock = new ControlledClock(); + scheduler = mock(CapacityScheduler.class); + when(rmContext.getScheduler()).thenReturn(scheduler); + when(scheduler.getConf()).thenReturn(csConfig); + when(scheduler.getConfig()).thenReturn(csConfig); + when(scheduler.getConfiguration()).thenReturn(csConfig); + when(scheduler.getResourceCalculator()).thenReturn( + new DefaultResourceCalculator()); + when(scheduler.getRMContext()).thenReturn(rmContext); + when(scheduler.getClusterResource()) + .thenReturn(Resource.newInstance(16384, 8)); + when(scheduler.getMinimumAllocation()) + .thenReturn(Resource.newInstance(1024, 1)); + when(scheduler.getMinimumResourceCapability()) + .thenReturn(Resource.newInstance(1024, 1)); + activitiesManager = mock(ActivitiesManager.class); + maxAppsEnforcer = new CSMaxRunningAppsEnforcer(scheduler); + appNum = 0; + setupQueues(csConfig); + RMNodeLabelsManager labelManager = mock(RMNodeLabelsManager.class); + AppPriorityACLsManager appPriorityACLManager = + mock(AppPriorityACLsManager.class); + when(rmContext.getNodeLabelManager()).thenReturn(labelManager); + when(labelManager.getResourceByLabel(anyString(), any(Resource.class))) + .thenReturn(Resource.newInstance(16384, 8)); + queueManager = new CapacitySchedulerQueueManager(csConfig, labelManager, + appPriorityACLManager); + queueManager.setCapacitySchedulerContext(scheduler); + queueManager.initializeQueues(csConfig); + } + + private void setupQueues(CapacitySchedulerConfiguration config) { + config.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] {"queue1", "queue2"}); + config.setQueues("root.queue1", new String[] {"subqueue1", "subqueue2"}); + config.setQueues("root.queue1.subqueue1", new String[] {"leaf1"}); + config.setQueues("root.queue1.subqueue2", new String[] {"leaf2"}); + config.setFloat(PREFIX + "root.capacity", 100.0f); + config.setFloat(PREFIX + "root.queue1.capacity", 50.0f); + config.setFloat(PREFIX + "root.queue2.capacity", 50.0f); + config.setFloat(PREFIX + "root.queue1.subqueue1.capacity", 50.0f); + config.setFloat(PREFIX + "root.queue1.subqueue2.capacity", 50.0f); + config.setFloat(PREFIX + "root.queue1.subqueue1.leaf1.capacity", 100.0f); + config.setFloat(PREFIX + "root.queue1.subqueue2.leaf2.capacity", 100.0f); + } + + private FiCaSchedulerApp addApp(LeafQueue queue, String user) { + ApplicationId appId = ApplicationId.newInstance(0, appNum++); + ApplicationAttemptId attId = ApplicationAttemptId.newInstance(appId, 0); + + FiCaSchedulerApp attempt = new FiCaSchedulerApp(attId, + user, queue, queue.getAbstractUsersManager(), + rmContext, Priority.newInstance(0), false, + activitiesManager) { + + private final long startTime = clock.getTime(); + + @Override + public long getStartTime() { + return startTime; + } + }; + + maxAppsEnforcer.checkRunnabilityWithUpdate(attempt); + maxAppsEnforcer.trackApp(attempt); + + queue.submitApplicationAttempt(attempt, attempt.getUser()); + + return attempt; + } + + private void removeApp(FiCaSchedulerApp attempt) { + LeafQueue queue = attempt.getCSLeafQueue(); + queue.finishApplicationAttempt(attempt, queue.getQueuePath()); + maxAppsEnforcer.untrackApp(attempt); + maxAppsEnforcer.updateRunnabilityOnAppRemoval(attempt); + } + + @Test + public void testRemoveDoesNotEnableAnyApp() { + ParentQueue root = + (ParentQueue) queueManager.getRootQueue(); + LeafQueue leaf1 = (LeafQueue) queueManager + .getQueueByFullName("root.queue1.subqueue1.leaf1"); + LeafQueue leaf2 = (LeafQueue) queueManager + .getQueueByFullName("root.queue1.subqueue2.leaf2"); + root.setMaxParallelApps(2); + leaf1.setMaxParallelApps(1); + leaf2.setMaxParallelApps(1); + + FiCaSchedulerApp app1 = addApp(leaf1, "user"); + addApp(leaf2, "user"); + addApp(leaf2, "user"); + assertEquals(1, leaf1.getNumRunnableApps()); + assertEquals(1, leaf2.getNumRunnableApps()); + assertEquals(1, leaf2.getNumNonRunnableApps()); + + removeApp(app1); + assertEquals(0, leaf1.getNumRunnableApps()); + assertEquals(1, leaf2.getNumRunnableApps()); + assertEquals(1, leaf2.getNumNonRunnableApps()); + } + + @Test + public void testRemoveEnablesAppOnCousinQueue() { + LeafQueue leaf1 = (LeafQueue) queueManager + .getQueueByFullName("root.queue1.subqueue1.leaf1"); + LeafQueue leaf2 = (LeafQueue) queueManager + .getQueueByFullName("root.queue1.subqueue2.leaf2"); + ParentQueue queue1 = (ParentQueue) queueManager + .getQueueByFullName("root.queue1"); + queue1.setMaxParallelApps(2); + + FiCaSchedulerApp app1 = addApp(leaf1, "user"); + addApp(leaf2, "user"); + addApp(leaf2, "user"); + assertEquals(1, leaf1.getNumRunnableApps()); + assertEquals(1, leaf2.getNumRunnableApps()); + assertEquals(1, leaf2.getNumNonRunnableApps()); + + removeApp(app1); + assertEquals(0, leaf1.getNumRunnableApps()); + assertEquals(2, leaf2.getNumRunnableApps()); + assertEquals(0, leaf2.getNumNonRunnableApps()); + } + + @Test + public void testRemoveEnablesOneByQueueOneByUser() { + LeafQueue leaf1 = (LeafQueue) queueManager + .getQueueByFullName("root.queue1.subqueue1.leaf1"); + LeafQueue leaf2 = (LeafQueue) queueManager + .getQueueByFullName("root.queue1.subqueue2.leaf2"); + leaf1.setMaxParallelApps(2); + //userMaxApps.put("user1", 1); + csConfig.setInt(PREFIX + "user.user1.max-parallel-apps", 1); + + FiCaSchedulerApp app1 = addApp(leaf1, "user1"); + addApp(leaf1, "user2"); + addApp(leaf1, "user3"); + addApp(leaf2, "user1"); + assertEquals(2, leaf1.getNumRunnableApps()); + assertEquals(1, leaf1.getNumNonRunnableApps()); + assertEquals(1, leaf2.getNumNonRunnableApps()); + + removeApp(app1); + assertEquals(2, leaf1.getNumRunnableApps()); + assertEquals(1, leaf2.getNumRunnableApps()); + assertEquals(0, leaf1.getNumNonRunnableApps()); + assertEquals(0, leaf2.getNumNonRunnableApps()); + } + + @Test + public void testRemoveEnablingOrderedByStartTime() { + LeafQueue leaf1 = (LeafQueue) queueManager + .getQueueByFullName("root.queue1.subqueue1.leaf1"); + LeafQueue leaf2 = (LeafQueue) queueManager + .getQueueByFullName("root.queue1.subqueue2.leaf2"); + ParentQueue queue1 = (ParentQueue) queueManager + .getQueueByFullName("root.queue1"); + queue1.setMaxParallelApps(2); + FiCaSchedulerApp app1 = addApp(leaf1, "user"); + addApp(leaf2, "user"); + addApp(leaf2, "user"); + clock.tickSec(20); + addApp(leaf1, "user"); + assertEquals(1, leaf1.getNumRunnableApps()); + assertEquals(1, leaf2.getNumRunnableApps()); + assertEquals(1, leaf1.getNumNonRunnableApps()); + assertEquals(1, leaf2.getNumNonRunnableApps()); + removeApp(app1); + assertEquals(0, leaf1.getNumRunnableApps()); + assertEquals(2, leaf2.getNumRunnableApps()); + assertEquals(0, leaf2.getNumNonRunnableApps()); + } + + @Test + public void testMultipleAppsWaitingOnCousinQueue() { + LeafQueue leaf1 = (LeafQueue) queueManager + .getQueueByFullName("root.queue1.subqueue1.leaf1"); + LeafQueue leaf2 = (LeafQueue) queueManager + .getQueueByFullName("root.queue1.subqueue2.leaf2"); + ParentQueue queue1 = (ParentQueue) queueManager + .getQueueByFullName("root.queue1"); + queue1.setMaxParallelApps(2); + FiCaSchedulerApp app1 = addApp(leaf1, "user"); + addApp(leaf2, "user"); + addApp(leaf2, "user"); + addApp(leaf2, "user"); + assertEquals(1, leaf1.getNumRunnableApps()); + assertEquals(1, leaf2.getNumRunnableApps()); + assertEquals(2, leaf2.getNumNonRunnableApps()); + removeApp(app1); + assertEquals(0, leaf1.getNumRunnableApps()); + assertEquals(2, leaf2.getNumRunnableApps()); + assertEquals(1, leaf2.getNumNonRunnableApps()); + } + + @Test + public void testMultiListStartTimeIteratorEmptyAppLists() { + List> lists = + new ArrayList>(); + lists.add(Arrays.asList(mockAppAttempt(1))); + lists.add(Arrays.asList(mockAppAttempt(2))); + Iterator iter = + new CSMaxRunningAppsEnforcer.MultiListStartTimeIterator(lists); + assertEquals(1, iter.next().getStartTime()); + assertEquals(2, iter.next().getStartTime()); + } + + private FiCaSchedulerApp mockAppAttempt(long startTime) { + FiCaSchedulerApp schedApp = mock(FiCaSchedulerApp.class); + when(schedApp.getStartTime()).thenReturn(startTime); + return schedApp; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMaxParallelApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMaxParallelApps.java new file mode 100644 index 0000000000..d2e3278b0d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerMaxParallelApps.java @@ -0,0 +1,312 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.util.Collections; +import java.util.List; +import java.util.Set; + +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsRequest; +import org.apache.hadoop.yarn.api.protocolrecords.GetApplicationsResponse; +import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; +import org.apache.hadoop.yarn.api.records.ApplicationReport; +import org.apache.hadoop.yarn.api.records.ContainerState; +import org.apache.hadoop.yarn.api.records.YarnApplicationState; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.MockAM; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData; +import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; +import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; +import org.junit.After; +import org.junit.Before; +import org.junit.Test; + +import com.google.common.collect.Sets; + +public class TestCapacitySchedulerMaxParallelApps { + private CapacitySchedulerConfiguration conf; + private MockRM rm; + private MockNM nm1; + + private RMApp app1; + private MockAM am1; + private RMApp app2; + private MockAM am2; + private RMApp app3; + private RMAppAttempt attempt3; + private RMApp app4; + private RMAppAttempt attempt4; + + private ParentQueue rootQueue; + private LeafQueue defaultQueue; + + @Before + public void setUp() { + CapacitySchedulerConfiguration config = + new CapacitySchedulerConfiguration(); + config.set(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, + DominantResourceCalculator.class.getName()); + + conf = new CapacitySchedulerConfiguration(config); + } + + @After + public void after() { + if (rm != null) { + rm.stop(); + } + } + + @Test(timeout = 30000) + public void testMaxParallelAppsExceedsQueueSetting() throws Exception { + conf.setInt("yarn.scheduler.capacity.root.default.max-parallel-apps", 2); + executeCommonStepsAndChecks(); + testWhenSettingsExceeded(); + } + + @Test(timeout = 30000) + public void testMaxParallelAppsExceedsDefaultQueueSetting() + throws Exception { + conf.setInt("yarn.scheduler.capacity.max-parallel-apps", 2); + executeCommonStepsAndChecks(); + testWhenSettingsExceeded(); + } + + @Test(timeout = 30000) + public void testMaxParallelAppsExceedsUserSetting() throws Exception { + conf.setInt("yarn.scheduler.capacity.user.testuser.max-parallel-apps", 2); + executeCommonStepsAndChecks(); + testWhenSettingsExceeded(); + } + + @Test(timeout = 30000) + public void testMaxParallelAppsExceedsDefaultUserSetting() throws Exception { + conf.setInt("yarn.scheduler.capacity.user.max-parallel-apps", 2); + executeCommonStepsAndChecks(); + testWhenSettingsExceeded(); + } + + @Test(timeout = 30000) + public void testMaxParallelAppsWhenReloadingConfig() throws Exception { + conf.setInt("yarn.scheduler.capacity.root.default.max-parallel-apps", 2); + + executeCommonStepsAndChecks(); + + RMContext rmContext = rm.getRMContext(); + // Disable parallel apps setting + max out AM percent + conf.unset("yarn.scheduler.capacity.root.default.max-parallel-apps"); + conf.setFloat(PREFIX + "maximum-am-resource-percent", 1.0f); + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + cs.reinitialize(conf, rmContext); + + // Both app #3 and app #4 should transition to RUNNABLE + launchAMandWaitForRunning(app3, attempt3, nm1); + launchAMandWaitForRunning(app4, attempt4, nm1); + verifyRunningAndAcceptedApps(4, 0); + } + + @Test(timeout = 30000) + public void testMaxAppsReachedWithNonRunnableApps() throws Exception { + conf.setInt("yarn.scheduler.capacity.root.default.max-parallel-apps", 2); + conf.setInt("yarn.scheduler.capacity.root.default.maximum-applications", 4); + executeCommonStepsAndChecks(); + + RMApp app5 = MockRMAppSubmitter.submit(rm, + MockRMAppSubmissionData.Builder.createWithMemory(512, rm) + .withAppName("app5") + .withUser("testuser") + .withQueue("default") + .withWaitForAppAcceptedState(false) + .build()); + + rm.waitForState(app5.getApplicationId(), RMAppState.FAILED); + } + + private void executeCommonStepsAndChecks() throws Exception { + rm = new MockRM(conf); + rm.start(); + + nm1 = rm.registerNode("h1:1234", 4096, 8); + rm.registerNode("h2:1234", 4096, 8); + rm.registerNode("h3:1234", 4096, 8); + + rm.drainEvents(); + + app1 = MockRMAppSubmitter.submit(rm, + MockRMAppSubmissionData.Builder.createWithMemory(512, rm) + .withAppName("app1") + .withUser("testuser") + .withQueue("default") + .build()); + + am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + + app2 = MockRMAppSubmitter.submit(rm, + MockRMAppSubmissionData.Builder.createWithMemory(512, rm) + .withAppName("app2") + .withUser("testuser") + .withQueue("default") + .build()); + am2 = MockRM.launchAndRegisterAM(app2, rm, nm1); + + app3 = MockRMAppSubmitter.submit(rm, + MockRMAppSubmissionData.Builder.createWithMemory(512, rm) + .withAppName("app3") + .withUser("testuser") + .withQueue("default") + .build()); + attempt3 = MockRM.waitForAttemptScheduled(app3, rm); + + app4 = MockRMAppSubmitter.submit(rm, + MockRMAppSubmissionData.Builder.createWithMemory(512, rm) + .withAppName("app4") + .withUser("testuser") + .withQueue("default") + .build()); + attempt4 = MockRM.waitForAttemptScheduled(app4, rm); + + // Check that app attempt #3 and #4 are non-runnable + rootQueue = getRootQueue(); + defaultQueue = getDefaultQueue(); + Set nonRunnables = + Sets.newHashSet( + attempt3.getAppAttemptId(), + attempt4.getAppAttemptId()); + verifyRunnableAppsInParent(rootQueue, 2); + verifyRunnableAppsInLeaf(defaultQueue, 2, nonRunnables); + verifyRunningAndAcceptedApps(2, 2); + } + + private void testWhenSettingsExceeded() throws Exception { + // Stop app #1 + unregisterAMandWaitForFinish(app1, am1, nm1); + + // Launch app #3 + launchAMandWaitForRunning(app3, attempt3, nm1); + + // Check that attempt #4 is still non-runnable + verifyRunnableAppsInParent(rootQueue, 2); + verifyRunnableAppsInLeaf(defaultQueue, 2, + Collections.singleton(attempt4.getAppAttemptId())); + verifyRunningAndAcceptedApps(2, 1); + + // Stop app #2 + unregisterAMandWaitForFinish(app2, am2, nm1); + + // Launch app #4 + launchAMandWaitForRunning(app4, attempt4, nm1); + verifyRunnableAppsInParent(rootQueue, 2); + verifyRunnableAppsInLeaf(defaultQueue, 2, + Collections.emptySet()); + verifyRunningAndAcceptedApps(2, 0); + } + + @SuppressWarnings("checkstyle:hiddenfield") + private LeafQueue getDefaultQueue() { + CSQueue defaultQueue = + ((CapacityScheduler) rm.getResourceScheduler()).getQueue("default"); + + return (LeafQueue) defaultQueue; + } + + private ParentQueue getRootQueue() { + CSQueue root = + ((CapacityScheduler) rm.getResourceScheduler()).getQueue("root"); + + return (ParentQueue) root; + } + + private void verifyRunnableAppsInParent(ParentQueue queue, + int expectedRunnable) { + assertEquals("Num of runnable apps", expectedRunnable, + queue.getNumRunnableApps()); + } + + private void verifyRunnableAppsInLeaf(LeafQueue queue, int expectedRunnable, + Set nonRunnableIds) { + assertEquals("Num of runnable apps", expectedRunnable, + queue.getNumRunnableApps()); + + queue.getCopyOfNonRunnableAppSchedulables() + .stream() + .map(fca -> fca.getApplicationAttemptId()) + .forEach(id -> assertTrue(id + " not found as non-runnable", + nonRunnableIds.contains(id))); + } + + private void verifyRunningAndAcceptedApps(int expectedRunning, + int expectedAccepted) throws YarnException { + GetApplicationsRequest request = GetApplicationsRequest.newInstance(); + + GetApplicationsResponse resp = + rm.getClientRMService().getApplications(request); + + List apps = resp.getApplicationList(); + + long runningCount = apps + .stream() + .filter(report -> + report.getYarnApplicationState() == YarnApplicationState.RUNNING) + .count(); + + long acceptedCount = apps + .stream() + .filter(report -> + report.getYarnApplicationState() == YarnApplicationState.ACCEPTED) + .count(); + + assertEquals("Running apps count", expectedRunning, runningCount); + assertEquals("Accepted apps count", expectedAccepted, acceptedCount); + } + + private void unregisterAMandWaitForFinish(RMApp app, MockAM am, MockNM nm) + throws Exception { + am.unregisterAppAttempt(); + nm.nodeHeartbeat(app.getCurrentAppAttempt().getAppAttemptId(), 1, + ContainerState.COMPLETE); + rm.waitForState(app.getCurrentAppAttempt().getAppAttemptId(), + RMAppAttemptState.FINISHED); + } + + @SuppressWarnings("rawtypes") + private MockAM launchAMandWaitForRunning(RMApp app, RMAppAttempt attempt, + MockNM nm) throws Exception { + nm.nodeHeartbeat(true); + ((AbstractYarnScheduler)rm.getResourceScheduler()).update(); + rm.drainEvents(); + nm.nodeHeartbeat(true); + MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); + am.registerAppAttempt(); + rm.waitForState(app.getApplicationId(), RMAppState.RUNNING); + + return am; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 3353eacbd7..f664e038f1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -456,6 +456,8 @@ public void testPolicyConfiguration() throws Exception { @Test public void testAppAttemptMetrics() throws Exception { + CSMaxRunningAppsEnforcer enforcer = mock(CSMaxRunningAppsEnforcer.class); + cs.setMaxRunningAppsEnforcer(enforcer); // Manipulate queue 'a' LeafQueue a = stubLeafQueue((LeafQueue) queues.get(B)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java index 13957e9b41..aa3b5919fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueState.java @@ -218,6 +218,7 @@ private FiCaSchedulerApp getMockApplication(ApplicationId appId, String user, CommonNodeLabelsManager.NO_LABEL); when(application.compareInputOrderTo(any(FiCaSchedulerApp.class))) .thenCallRealMethod(); + when(application.isRunnable()).thenReturn(true); return application; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java index a4c1300df3..e893717a8d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueStateManager.java @@ -157,6 +157,7 @@ private FiCaSchedulerApp getMockApplication(ApplicationId appId, String user, CommonNodeLabelsManager.NO_LABEL); when(application.compareInputOrderTo(any(FiCaSchedulerApp.class))) .thenCallRealMethod(); + when(application.isRunnable()).thenReturn(true); return application; } }