From 524a7523c427b55273133078898ae3535897bada Mon Sep 17 00:00:00 2001 From: Haibo Chen Date: Thu, 8 Nov 2018 16:02:48 -0800 Subject: [PATCH] YARN-8990. Fix fair scheduler race condition in app submit and queue cleanup. (Contributed by Wilfred Spiegelenburg) --- .../scheduler/fair/FSLeafQueue.java | 14 +++ .../scheduler/fair/FairScheduler.java | 19 ++- .../scheduler/fair/QueueManager.java | 113 ++++++++++++------ 3 files changed, 104 insertions(+), 42 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java index 7e4dab8088..a038887327 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSLeafQueue.java @@ -651,4 +651,18 @@ public void addAssignedApp(ApplicationId applicationId) { writeLock.unlock(); } } + + /** + * This method is called when an application is removed from this queue + * during the submit process. + * @param applicationId the application's id + */ + public void removeAssignedApp(ApplicationId applicationId) { + writeLock.lock(); + try { + assignedApps.remove(applicationId); + } finally { + writeLock.unlock(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index da5e4c9347..e5d2a066c4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -473,7 +473,7 @@ protected void addApplication(ApplicationId applicationId, writeLock.lock(); try { RMApp rmApp = rmContext.getRMApps().get(applicationId); - FSLeafQueue queue = assignToQueue(rmApp, queueName, user); + FSLeafQueue queue = assignToQueue(rmApp, queueName, user, applicationId); if (queue == null) { return; } @@ -499,6 +499,7 @@ protected void addApplication(ApplicationId applicationId, applicationId, queue.getName(), invalidAMResourceRequests, queue.getMaxShare()); rejectApplicationWithMessage(applicationId, msg); + queue.removeAssignedApp(applicationId); return; } } @@ -513,6 +514,7 @@ protected void addApplication(ApplicationId applicationId, + " cannot submit applications to queue " + queue.getName() + "(requested queuename is " + queueName + ")"; rejectApplicationWithMessage(applicationId, msg); + queue.removeAssignedApp(applicationId); return; } @@ -520,7 +522,6 @@ protected void addApplication(ApplicationId applicationId, new SchedulerApplication(queue, user); applications.put(applicationId, application); queue.getMetrics().submitApp(user); - queue.addAssignedApp(applicationId); LOG.info("Accepted application " + applicationId + " from user: " + user + ", in queue: " + queue.getName() @@ -597,11 +598,19 @@ protected void addApplicationAttempt( } /** - * Helper method that attempts to assign the app to a queue. The method is - * responsible to call the appropriate event-handler if the app is rejected. + * Helper method for the tests to assign the app to a queue. */ @VisibleForTesting FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) { + return assignToQueue(rmApp, queueName, user, null); + } + + /** + * Helper method that attempts to assign the app to a queue. The method is + * responsible to call the appropriate event-handler if the app is rejected. + */ + private FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user, + ApplicationId applicationId) { FSLeafQueue queue = null; String appRejectMsg = null; @@ -611,7 +620,7 @@ FSLeafQueue assignToQueue(RMApp rmApp, String queueName, String user) { if (queueName == null) { appRejectMsg = "Application rejected by queue placement policy"; } else { - queue = queueMgr.getLeafQueue(queueName, true); + queue = queueMgr.getLeafQueue(queueName, true, applicationId); if (queue == null) { appRejectMsg = queueName + " is not a leaf queue"; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java index 83717657c1..2ca32c3018 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueueManager.java @@ -38,6 +38,7 @@ import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; import org.xml.sax.SAXException; @@ -71,7 +72,7 @@ private void execute() { Boolean removed = removeEmptyIncompatibleQueues(queueToCreate, queueType).orElse(null); if (Boolean.TRUE.equals(removed)) { - FSQueue queue = getQueue(queueToCreate, true, queueType, false); + FSQueue queue = getQueue(queueToCreate, true, queueType, false, null); if (queue != null && // if queueToCreate is present in the allocation config, set it // to static @@ -124,30 +125,49 @@ public void initialize(Configuration conf) throws IOException, /** * Get a leaf queue by name, creating it if the create param is - * true and is necessary. - * If the queue is not or can not be a leaf queue, i.e. it already exists as a - * parent queue, or one of the parents in its name is already a leaf queue, - * null is returned. + * true and the queue does not exist. + * If the queue is not or can not be a leaf queue, i.e. it already exists as + * a parent queue, or one of the parents in its name is already a leaf queue, + * null is returned. * * The root part of the name is optional, so a queue underneath the root * named "queue1" could be referred to as just "queue1", and a queue named * "queue2" underneath a parent named "parent1" that is underneath the root * could be referred to as just "parent1.queue2". + * @param name name of the queue + * @param create true if the queue must be created if it does + * not exist, false otherwise + * @return the leaf queue or null if the queue cannot be found */ public FSLeafQueue getLeafQueue(String name, boolean create) { - return getLeafQueue(name, create, true); + return getLeafQueue(name, create, null, true); } - private FSLeafQueue getLeafQueue( - String name, - boolean create, - boolean recomputeSteadyShares) { - FSQueue queue = getQueue( - name, - create, - FSQueueType.LEAF, - recomputeSteadyShares - ); + /** + * Get a leaf queue by name, creating it if the create param is + * true and the queue does not exist. + * If the queue is not or can not be a leaf queue, i.e. it already exists as + * a parent queue, or one of the parents in its name is already a leaf queue, + * null is returned. + * + * If the application will be assigned to the queue if the applicationId is + * not null + * @param name name of the queue + * @param create true if the queue must be created if it does + * not exist, false otherwise + * @param applicationId the application ID to assign to the queue + * @return the leaf queue or null if teh queue cannot be found + */ + public FSLeafQueue getLeafQueue(String name, boolean create, + ApplicationId applicationId) { + return getLeafQueue(name, create, applicationId, true); + } + + private FSLeafQueue getLeafQueue(String name, boolean create, + ApplicationId applicationId, + boolean recomputeSteadyShares) { + FSQueue queue = getQueue(name, create, FSQueueType.LEAF, + recomputeSteadyShares, applicationId); if (queue instanceof FSParentQueue) { return null; } @@ -168,42 +188,55 @@ public boolean removeLeafQueue(String name) { /** * Get a parent queue by name, creating it if the create param is - * true and is necessary. - * If the queue is not or can not be a parent queue, - * i.e. it already exists as a - * leaf queue, or one of the parents in its name is already a leaf queue, - * null is returned. + * true and the queue does not exist. + * If the queue is not or can not be a parent queue, i.e. it already exists + * as a leaf queue, or one of the parents in its name is already a leaf + * queue, null is returned. * * The root part of the name is optional, so a queue underneath the root * named "queue1" could be referred to as just "queue1", and a queue named * "queue2" underneath a parent named "parent1" that is underneath the root * could be referred to as just "parent1.queue2". + * @param name name of the queue + * @param create true if the queue must be created if it does + * not exist, false otherwise + * @return the parent queue or null if the queue cannot be found */ public FSParentQueue getParentQueue(String name, boolean create) { return getParentQueue(name, create, true); } - public FSParentQueue getParentQueue( - String name, - boolean create, + /** + * Get a parent queue by name, creating it if the create param is + * true and the queue does not exist. + * If the queue is not or can not be a parent queue, i.e. it already exists + * as a leaf queue, or one of the parents in its name is already a leaf + * queue, null is returned. + * + * The root part of the name is optional, so a queue underneath the root + * named "queue1" could be referred to as just "queue1", and a queue named + * "queue2" underneath a parent named "parent1" that is underneath the root + * could be referred to as just "parent1.queue2". + * @param name name of the queue + * @param create true if the queue must be created if it does + * not exist, false otherwise + * @param recomputeSteadyShares true if the steady fair share + * should be recalculated when a queue is added, + * false otherwise + * @return the parent queue or null if the queue cannot be found + */ + public FSParentQueue getParentQueue(String name, boolean create, boolean recomputeSteadyShares) { - FSQueue queue = getQueue( - name, - create, - FSQueueType.PARENT, - recomputeSteadyShares - ); + FSQueue queue = getQueue(name, create, FSQueueType.PARENT, + recomputeSteadyShares, null); if (queue instanceof FSLeafQueue) { return null; } return (FSParentQueue) queue; } - private FSQueue getQueue( - String name, - boolean create, - FSQueueType queueType, - boolean recomputeSteadyShares) { + private FSQueue getQueue(String name, boolean create, FSQueueType queueType, + boolean recomputeSteadyShares, ApplicationId applicationId) { boolean recompute = recomputeSteadyShares; name = ensureRootPrefix(name); FSQueue queue; @@ -215,8 +248,14 @@ private FSQueue getQueue( } else { recompute = false; } + // At this point the queue exists and we need to assign the app if to the + // but only to a leaf queue + if (applicationId != null && queue instanceof FSLeafQueue) { + ((FSLeafQueue)queue).addAssignedApp(applicationId); + } } - if (recompute) { + // Don't recompute if it is an existing queue or no change was made + if (recompute && queue != null) { rootQueue.recomputeSteadyShares(); } return queue; @@ -614,7 +653,7 @@ private void ensureQueueExistsAndIsCompatibleAndIsStatic( incompatibleQueuesPendingRemoval.add( new IncompatibleQueueRemovalTask(name, queueType)); } else { - FSQueue queue = getQueue(name, true, queueType, false); + FSQueue queue = getQueue(name, true, queueType, false, null); if (queue != null) { queue.setDynamic(false); }