From 4b1b6b858ac6584c376b9810a5b93e0c023124bc Mon Sep 17 00:00:00 2001 From: 9uapaw Date: Thu, 7 Oct 2021 17:09:38 +0200 Subject: [PATCH] =?UTF-8?q?YARN-10953.=20Make=20CapacityScheduler#getOrCre?= =?UTF-8?q?ateQueueFromPlacementConte=E2=80=A6=20Contributed=20by=20Andras?= =?UTF-8?q?=20Gyori?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../scheduler/capacity/CapacityScheduler.java | 148 ++++++++++-------- .../CapacitySchedulerConfiguration.java | 2 +- .../CapacitySchedulerQueueManager.java | 39 +++-- .../capacity/TestCapacityScheduler.java | 40 +++++ ...CapacitySchedulerNewQueueAutoCreation.java | 3 +- ...WebServicesCapacitySchedDynamicConfig.java | 5 +- 6 files changed, 148 insertions(+), 89 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 30b00b139c..8ef0a23390 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -234,6 +234,7 @@ public Configuration getConf() { private boolean multiNodePlacementEnabled; private boolean printedVerboseLoggingForAsyncScheduling; + private boolean appShouldFailFast; /** * EXPERT @@ -355,6 +356,9 @@ void initScheduler(Configuration configuration) throws this.assignMultipleEnabled = this.conf.getAssignMultipleEnabled(); this.maxAssignPerHeartbeat = this.conf.getMaxAssignPerHeartbeat(); + this.appShouldFailFast = CapacitySchedulerConfiguration.shouldAppFailFast( + getConfig()); + // number of threads for async scheduling int maxAsyncSchedulingThreads = this.conf.getInt( CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_MAXIMUM_THREAD, @@ -491,6 +495,8 @@ public void reinitialize(Configuration newConf, RMContext rmContext, assignMultipleEnabled = this.conf.getAssignMultipleEnabled(); maxAssignPerHeartbeat = this.conf.getMaxAssignPerHeartbeat(); offswitchPerHeartbeatLimit = this.conf.getOffSwitchPerHeartbeatLimit(); + appShouldFailFast = CapacitySchedulerConfiguration.shouldAppFailFast( + getConfig()); LOG.info("assignMultipleEnabled = " + assignMultipleEnabled + "\n" + "maxAssignPerHeartbeat = " + maxAssignPerHeartbeat + "\n" + @@ -880,7 +886,7 @@ private void addApplicationOnRecovery(ApplicationId applicationId, if (queue == null) { //During a restart, this indicates a queue was removed, which is //not presently supported - if (!getConfiguration().shouldAppFailFast(getConfig())) { + if (!appShouldFailFast) { this.rmContext.getDispatcher().getEventHandler().handle( new RMAppEvent(applicationId, RMAppEventType.KILL, "Application killed on recovery as it" @@ -901,7 +907,7 @@ private void addApplicationOnRecovery(ApplicationId applicationId, if (!(queue instanceof LeafQueue)) { // During RM restart, this means leaf queue was converted to a parent // queue, which is not supported for running apps. - if (!getConfiguration().shouldAppFailFast(getConfig())) { + if (!appShouldFailFast) { this.rmContext.getDispatcher().getEventHandler().handle( new RMAppEvent(applicationId, RMAppEventType.KILL, "Application killed on recovery as it was " @@ -951,73 +957,83 @@ private CSQueue getOrCreateQueueFromPlacementContext(ApplicationId applicationId, String user, String queueName, ApplicationPlacementContext placementContext, boolean isRecovery) { - CSQueue queue = getQueue(queueName); - ApplicationPlacementContext fallbackContext = placementContext; + QueuePath queuePath = new QueuePath(queueName); - if (queue == null) { - // Even if placement rules are turned off, we still have the opportunity - // to auto create a queue. - if (placementContext == null) { - fallbackContext = CSQueueUtils.extractQueuePath(queueName); - } - - //we need to make sure there is no empty path parts present - String path = fallbackContext.getFullQueuePath(); - String[] pathParts = path.split("\\."); - for (int i = 0; i < pathParts.length; i++) { - if ("".equals(pathParts[i])) { - LOG.error("Application submitted to invalid path: '{}'", path); - return null; - } - } - - if (fallbackContext.hasParentQueue()) { - try { - writeLock.lock(); - return queueManager.createQueue(fallbackContext); - } catch (YarnException | IOException e) { - // A null queue is expected if the placementContext is null. In order - // not to disrupt the control flow, if we fail to auto create a queue, - // we fall back to the original logic. - if (placementContext == null) { - LOG.error("Could not auto-create leaf queue " + queueName + - " due to : ", e); - return null; - } - if (isRecovery) { - if (!getConfiguration().shouldAppFailFast(getConfig())) { - LOG.error("Could not auto-create leaf queue " + queueName + - " due to : ", e); - this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppEvent(applicationId, RMAppEventType.KILL, - "Application killed on recovery" - + " as it was submitted to queue " + queueName - + " which could not be auto-created")); - } else{ - String queueErrorMsg = - "Queue named " + queueName + " could not be " - + "auto-created during application recovery."; - LOG.error(FATAL, queueErrorMsg, e); - throw new QueueInvalidException(queueErrorMsg); - } - } else{ - LOG.error("Could not auto-create leaf queue due to : ", e); - final String message = - "Application " + applicationId + " submission by user : " - + user - + " to queue : " + queueName + " failed : " + e - .getMessage(); - this.rmContext.getDispatcher().getEventHandler().handle( - new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, - message)); - } - } finally { - writeLock.unlock(); - } - } + if (queue != null) { + return queue; + } + + if (isAmbiguous(queueName)) { + return null; + } + + if (placementContext != null) { + queuePath = new QueuePath(placementContext.getFullQueuePath()); + } + + //we need to make sure there are no empty path parts present + if (queuePath.hasEmptyPart()) { + LOG.error("Application submitted to invalid path due to empty parts: " + + "'{}'", queuePath); + return null; + } + + if (!queuePath.hasParent()) { + LOG.error("Application submitted to a queue without parent" + + " '{}'", queuePath); + return null; + } + + try { + writeLock.lock(); + return queueManager.createQueue(queuePath); + } catch (YarnException | IOException e) { + // A null queue is expected if the placementContext is null. In order + // not to disrupt the control flow, if we fail to auto create a queue, + // we fall back to the original logic. + if (placementContext == null) { + LOG.error("Could not auto-create leaf queue " + queueName + + " due to : ", e); + return null; + } + handleQueueCreationError(applicationId, user, queueName, isRecovery, e); + } finally { + writeLock.unlock(); + } + return null; + } + + private void handleQueueCreationError( + ApplicationId applicationId, String user, String queueName, + boolean isRecovery, Exception e) { + if (isRecovery) { + if (!appShouldFailFast) { + LOG.error("Could not auto-create leaf queue " + queueName + + " due to : ", e); + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.KILL, + "Application killed on recovery" + + " as it was submitted to queue " + queueName + + " which did not exist and could not be auto-created")); + } else { + String queueErrorMsg = + "Queue named " + queueName + " could not be " + + "auto-created during application recovery."; + LOG.error(FATAL, queueErrorMsg, e); + throw new QueueInvalidException(queueErrorMsg); + } + } else { + LOG.error("Could not auto-create leaf queue due to : ", e); + final String message = + "Application " + applicationId + " submission by user : " + + user + + " to queue : " + queueName + " failed : " + e + .getMessage(); + this.rmContext.getDispatcher().getEventHandler().handle( + new RMAppEvent(applicationId, RMAppEventType.APP_REJECTED, + message)); } - return queue; } private void addApplication(ApplicationId applicationId, String queueName, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 60e55275e2..670f19bfa9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -1672,7 +1672,7 @@ public boolean getLazyPreemptionEnabled() { return getBoolean(LAZY_PREEMPTION_ENABLED, DEFAULT_LAZY_PREEMPTION_ENABLED); } - public boolean shouldAppFailFast(Configuration conf) { + public static boolean shouldAppFailFast(Configuration conf) { return conf.getBoolean(APP_FAIL_FAST, DEFAULT_APP_FAIL_FAST); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java index a26fadf434..c719231723 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueManager.java @@ -29,7 +29,6 @@ import java.util.Set; import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -528,10 +527,10 @@ public void addLegacyDynamicQueue(Queue queue) * @throws YarnException if the given path is not eligible to be auto created * @throws IOException if the given path can not be added to the parent */ - public LeafQueue createQueue(ApplicationPlacementContext queue) + public LeafQueue createQueue(QueuePath queue) throws YarnException, IOException { - String leafQueueName = queue.getQueue(); - String parentQueueName = queue.getParentQueue(); + String leafQueueName = queue.getLeafName(); + String parentQueueName = queue.getParent(); if (!StringUtils.isEmpty(parentQueueName)) { CSQueue parentQueue = getQueue(parentQueueName); @@ -563,16 +562,22 @@ public LeafQueue createQueue(ApplicationPlacementContext queue) * to be auto created */ public List determineMissingParents( - ApplicationPlacementContext queue) throws SchedulerDynamicEditException { - if (!queue.hasParentQueue()) { + QueuePath queue) throws SchedulerDynamicEditException { + if (!queue.hasParent()) { throw new SchedulerDynamicEditException("Can not auto create queue " - + queue.getFullQueuePath() + " due to missing ParentQueue path."); + + queue.getFullPath() + " due to missing ParentQueue path."); + } + + if (isAmbiguous(queue.getParent())) { + throw new SchedulerDynamicEditException("Could not auto-create queue " + + queue + " due to ParentQueue " + queue.getParent() + + " being ambiguous."); } // Start from the first parent int firstStaticParentDistance = 1; - StringBuilder parentCandidate = new StringBuilder(queue.getParentQueue()); + StringBuilder parentCandidate = new StringBuilder(queue.getParent()); LinkedList parentsToCreate = new LinkedList<>(); CSQueue firstExistingParent = getQueue(parentCandidate.toString()); @@ -584,7 +589,7 @@ public List determineMissingParents( if (firstStaticParentDistance > MAXIMUM_DYNAMIC_QUEUE_DEPTH) { throw new SchedulerDynamicEditException( - "Could not auto create queue " + queue.getFullQueuePath() + "Could not auto create queue " + queue.getFullPath() + ". The distance of the LeafQueue from the first static " + "ParentQueue is " + firstStaticParentDistance + ", which is " + "above the limit."); @@ -607,7 +612,7 @@ public List determineMissingParents( if (!(firstExistingParent instanceof ParentQueue)) { throw new SchedulerDynamicEditException( "Could not auto create hierarchy of " - + queue.getFullQueuePath() + ". Queue " + queue.getParentQueue() + + + queue.getFullPath() + ". Queue " + queue.getParent() + " is not a ParentQueue." ); } @@ -616,7 +621,7 @@ public List determineMissingParents( if (!existingParentQueue.isEligibleForAutoQueueCreation()) { throw new SchedulerDynamicEditException("Auto creation of queue " + - queue.getFullQueuePath() + " is not enabled under parent " + queue.getFullPath() + " is not enabled under parent " + existingParentQueue.getQueuePath()); } @@ -637,12 +642,12 @@ public void reinitConfiguredNodeLabels(CapacitySchedulerConfiguration conf) { this.configuredNodeLabels = new ConfiguredNodeLabels(conf); } - private LeafQueue createAutoQueue(ApplicationPlacementContext queue) + private LeafQueue createAutoQueue(QueuePath queue) throws SchedulerDynamicEditException { List parentsToCreate = determineMissingParents(queue); // First existing parent is either the parent of the last missing parent // or the parent of the given path - String existingParentName = queue.getParentQueue(); + String existingParentName = queue.getParent(); if (!parentsToCreate.isEmpty()) { existingParentName = parentsToCreate.get(0).substring( 0, parentsToCreate.get(0).lastIndexOf(".")); @@ -657,21 +662,21 @@ private LeafQueue createAutoQueue(ApplicationPlacementContext queue) } LeafQueue leafQueue = existingParentQueue.addDynamicLeafQueue( - queue.getFullQueuePath()); + queue.getFullPath()); addQueue(leafQueue.getQueuePath(), leafQueue); return leafQueue; } - private LeafQueue createLegacyAutoQueue(ApplicationPlacementContext queue) + private LeafQueue createLegacyAutoQueue(QueuePath queue) throws IOException, SchedulerDynamicEditException { - CSQueue parentQueue = getQueue(queue.getParentQueue()); + CSQueue parentQueue = getQueue(queue.getParent()); // Case 1: Handle ManagedParentQueue ManagedParentQueue autoCreateEnabledParentQueue = (ManagedParentQueue) parentQueue; AutoCreatedLeafQueue autoCreatedLeafQueue = new AutoCreatedLeafQueue( - csContext, queue.getQueue(), autoCreateEnabledParentQueue); + csContext, queue.getLeafName(), autoCreateEnabledParentQueue); addLegacyDynamicQueue(autoCreatedLeafQueue); return autoCreatedLeafQueue; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index b62c237147..efa736d53c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -1566,6 +1566,46 @@ private MockRM setUpMove(Configuration config) { return rm; } + @Test + public void testAppSubmission() throws Exception { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(conf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setQueues(A, new String[] {"a1", "a2", "b"}); + conf.setCapacity(A1, 20); + conf.setCapacity("root.a.b", 10); + MockRM rm = new MockRM(conf); + rm.start(); + + RMApp noParentQueueApp = submitAppAndWaitForState(rm, "q", RMAppState.FAILED); + Assert.assertEquals(RMAppState.FAILED, noParentQueueApp.getState()); + + RMApp ambiguousQueueApp = submitAppAndWaitForState(rm, "b", RMAppState.FAILED); + Assert.assertEquals(RMAppState.FAILED, ambiguousQueueApp.getState()); + + RMApp emptyPartQueueApp = submitAppAndWaitForState(rm, "root..a1", RMAppState.FAILED); + Assert.assertEquals(RMAppState.FAILED, emptyPartQueueApp.getState()); + + RMApp failedAutoQueue = submitAppAndWaitForState(rm, "root.a.b.c.d", RMAppState.FAILED); + Assert.assertEquals(RMAppState.FAILED, failedAutoQueue.getState()); + } + + private RMApp submitAppAndWaitForState(MockRM rm, String b, RMAppState state) throws Exception { + MockRMAppSubmissionData ambiguousQueueAppData = + MockRMAppSubmissionData.Builder.createWithMemory(GB, rm) + .withWaitForAppAcceptedState(false) + .withAppName("app") + .withUser("user") + .withAcls(null) + .withQueue(b) + .withUnmanagedAM(false) + .build(); + RMApp app1 = MockRMAppSubmitter.submit(rm, ambiguousQueueAppData); + rm.waitForState(app1.getApplicationId(), state); + return app1; + } + @Test public void testMoveAppBasic() throws Exception { MockRM rm = setUpMove(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java index 053a1354d1..9b23fdd903 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java @@ -1209,8 +1209,7 @@ public void testParentQueueDynamicChildRemoval() throws Exception { protected LeafQueue createQueue(String queuePath) throws YarnException, IOException { - return autoQueueHandler.createQueue( - CSQueueUtils.extractQueuePath(queuePath)); + return autoQueueHandler.createQueue(new QueuePath(queuePath)); } private void assertQueueMinResource(CSQueue queue, float expected) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySchedDynamicConfig.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySchedDynamicConfig.java index 6938519c73..afd73621d3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySchedDynamicConfig.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/webapp/TestRMWebServicesCapacitySchedDynamicConfig.java @@ -42,11 +42,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedQueueTemplate; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath; import org.apache.hadoop.yarn.webapp.GenericExceptionHandler; import org.apache.hadoop.yarn.webapp.GuiceServletConfig; import org.apache.hadoop.yarn.webapp.JerseyTestBase; @@ -359,8 +359,7 @@ private void initAutoQueueHandler() throws Exception { private LeafQueue createQueue(String queuePath) throws YarnException, IOException { - return autoQueueHandler.createQueue( - CSQueueUtils.extractQueuePath(queuePath)); + return autoQueueHandler.createQueue(new QueuePath(queuePath)); } private JSONObject sendRequestToSchedulerEndpoint() throws Exception {