From 82ec28f4421c162a505ba5e5b329e4be199878a7 Mon Sep 17 00:00:00 2001 From: Sunil G Date: Wed, 19 Aug 2020 11:54:48 +0530 Subject: [PATCH] YARN-10396. Max applications calculation per queue disregards queue level settings in absolute mode. Contributed by Benjamin Teke. --- .../scheduler/capacity/ParentQueue.java | 14 +- .../scheduler/capacity/TestParentQueue.java | 134 ++++++++++++++++-- 2 files changed, 132 insertions(+), 16 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index bbb80ba733..923e687500 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -1123,8 +1123,18 @@ private void deriveCapacityFromAbsoluteConfigurations(String label, if (childQueue instanceof LeafQueue) { LeafQueue leafQueue = (LeafQueue) childQueue; CapacitySchedulerConfiguration conf = csContext.getConfiguration(); - int maxApplications = (int) (conf.getMaximumSystemApplications() - * childQueue.getQueueCapacities().getAbsoluteCapacity(label)); + int maxApplications = + conf.getMaximumApplicationsPerQueue(childQueue.getQueuePath()); + if (maxApplications < 0) { + int maxGlobalPerQueueApps = conf.getGlobalMaximumApplicationsPerQueue(); + if (maxGlobalPerQueueApps > 0) { + maxApplications = (int) (maxGlobalPerQueueApps * + childQueue.getQueueCapacities().getAbsoluteCapacity(label)); + } else { + maxApplications = (int) (conf.getMaximumSystemApplications() + * childQueue.getQueueCapacities().getAbsoluteCapacity(label)); + } + } leafQueue.setMaxApplications(maxApplications); int maxApplicationsPerUser = Math.min(maxApplications, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java index 0560d595a6..9ed0388aec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestParentQueue.java @@ -108,15 +108,17 @@ public void setUp() throws Exception { private static final String A = "a"; private static final String B = "b"; + private static final String Q_A = + CapacitySchedulerConfiguration.ROOT + "." + A; + private static final String Q_B = + CapacitySchedulerConfiguration.ROOT + "." + B; private void setupSingleLevelQueues(CapacitySchedulerConfiguration conf) { // Define top-level queues conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {A, B}); - final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + A; conf.setCapacity(Q_A, 30); - final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B; conf.setCapacity(Q_B, 70); LOG.info("Setup top-level queues a and b"); @@ -128,11 +130,9 @@ private void setupSingleLevelQueuesWithAbsoluteResource( // Define top-level queues conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{A, B}); - final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + A; conf.setMinimumResourceRequirement("", Q_A, QUEUE_A_RESOURCE); - final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B; conf.setMinimumResourceRequirement("", Q_B, QUEUE_B_RESOURCE); @@ -368,9 +368,7 @@ public void testSingleLevelQueues() throws Exception { public void testSingleLevelQueuesPrecision() throws Exception { // Setup queue configs setupSingleLevelQueues(csConf); - final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + "a"; csConf.setCapacity(Q_A, 30); - final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + "b"; csConf.setCapacity(Q_B, 70.5F); CSQueueStore queues = new CSQueueStore(); @@ -434,10 +432,8 @@ private void setupMultiLevelQueues(CapacitySchedulerConfiguration conf) { // Define top-level queues csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {A, B, C, D}); - final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + A; conf.setCapacity(Q_A, 10); - final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B; conf.setCapacity(Q_B, 50); final String Q_C = CapacitySchedulerConfiguration.ROOT + "." + C; @@ -656,7 +652,6 @@ public void testQueueCapacitySettingChildZero() throws Exception { setupMultiLevelQueues(csConf); // set child queues capacity to 0 when parents not 0 - final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B; csConf.setCapacity(Q_B + "." + B1, 0); csConf.setCapacity(Q_B + "." + B2, 0); csConf.setCapacity(Q_B + "." + B3, 0); @@ -673,9 +668,7 @@ public void testQueueCapacitySettingParentZero() throws Exception { setupMultiLevelQueues(csConf); // set parent capacity to 0 when child not 0 - final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B; csConf.setCapacity(Q_B, 0); - final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + A; csConf.setCapacity(Q_A, 60); CSQueueStore queues = new CSQueueStore(); @@ -690,13 +683,11 @@ public void testQueueCapacityZero() throws Exception { setupMultiLevelQueues(csConf); // set parent and child capacity to 0 - final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B; csConf.setCapacity(Q_B, 0); csConf.setCapacity(Q_B + "." + B1, 0); csConf.setCapacity(Q_B + "." + B2, 0); csConf.setCapacity(Q_B + "." + B3, 0); - final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + A; csConf.setCapacity(Q_A, 60); CSQueueStore queues = new CSQueueStore(); @@ -1029,10 +1020,125 @@ public void testAbsoluteResourceWithChangeInClusterResource() QUEUE_B_RESOURCE_70PERC); } + @Test + public void testDeriveCapacityFromAbsoluteConfigurations() throws Exception { + // Setup queue configs + setupSingleLevelQueuesWithAbsoluteResource(csConf); + + CSQueueStore queues = new CSQueueStore(); + CSQueue root = CapacitySchedulerQueueManager.parseQueue(csContext, csConf, + null, CapacitySchedulerConfiguration.ROOT, queues, queues, + TestUtils.spyHook); + + // Setup some nodes + int numNodes = 2; + final long memoryPerNode = (QUEUE_A_RESOURCE.getMemorySize() + + QUEUE_B_RESOURCE.getMemorySize()) / numNodes; + int coresPerNode = (QUEUE_A_RESOURCE.getVirtualCores() + + QUEUE_B_RESOURCE.getVirtualCores()) / numNodes; + + Resource clusterResource = Resources.createResource( + numNodes * memoryPerNode, numNodes * coresPerNode); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + root.updateClusterResource(clusterResource, + new ResourceLimits(clusterResource)); + + // Start testing + // Only MaximumSystemApplications is set in csConf + LeafQueue a = (LeafQueue) queues.get(A); + LeafQueue b = (LeafQueue) queues.get(B); + + float queueAScale = (float) QUEUE_A_RESOURCE.getMemorySize() / + (float) clusterResource.getMemorySize(); + float queueBScale = (float) QUEUE_B_RESOURCE.getMemorySize() / + (float) clusterResource.getMemorySize(); + + assertEquals(queueAScale, a.getQueueCapacities().getCapacity(), + DELTA); + assertEquals(1f, a.getQueueCapacities().getMaximumCapacity(), + DELTA); + assertEquals(queueAScale, a.getQueueCapacities().getAbsoluteCapacity(), + DELTA); + assertEquals(1f, + a.getQueueCapacities().getAbsoluteMaximumCapacity(), DELTA); + assertEquals((int) (csConf.getMaximumSystemApplications() * queueAScale), + a.getMaxApplications()); + assertEquals(a.getMaxApplications(), a.getMaxApplicationsPerUser()); + + assertEquals(queueBScale, + b.getQueueCapacities().getCapacity(), DELTA); + assertEquals(1f, + b.getQueueCapacities().getMaximumCapacity(), DELTA); + assertEquals(queueBScale, + b.getQueueCapacities().getAbsoluteCapacity(), DELTA); + assertEquals(1f, + b.getQueueCapacities().getAbsoluteMaximumCapacity(), DELTA); + assertEquals((int) (csConf.getMaximumSystemApplications() * queueBScale), + b.getMaxApplications()); + assertEquals(b.getMaxApplications(), b.getMaxApplicationsPerUser()); + + // Set GlobalMaximumApplicationsPerQueue in csConf + csConf.setGlobalMaximumApplicationsPerQueue(20000); + root.updateClusterResource(clusterResource, + new ResourceLimits(clusterResource)); + + assertEquals((int) (csConf.getGlobalMaximumApplicationsPerQueue() * + queueAScale), a.getMaxApplications()); + assertEquals(a.getMaxApplications(), a.getMaxApplicationsPerUser()); + assertEquals((int) (csConf.getGlobalMaximumApplicationsPerQueue() * + queueBScale), b.getMaxApplications()); + assertEquals(b.getMaxApplications(), b.getMaxApplicationsPerUser()); + + // Set MaximumApplicationsPerQueue in csConf + int queueAMaxApplications = 30000; + int queueBMaxApplications = 30000; + csConf.set("yarn.scheduler.capacity." + Q_A + ".maximum-applications", + Integer.toString(queueAMaxApplications)); + csConf.set("yarn.scheduler.capacity." + Q_B + ".maximum-applications", + Integer.toString(queueBMaxApplications)); + root.updateClusterResource(clusterResource, + new ResourceLimits(clusterResource)); + + assertEquals(queueAMaxApplications, a.getMaxApplications()); + assertEquals(a.getMaxApplications(), a.getMaxApplicationsPerUser()); + assertEquals(queueBMaxApplications, b.getMaxApplications()); + assertEquals(b.getMaxApplications(), b.getMaxApplicationsPerUser()); + + // Extra cases for testing maxApplicationsPerUser + int halfPercent = 50; + int oneAndQuarterPercent = 125; + a.getUsersManager().setUserLimit(halfPercent); + b.getUsersManager().setUserLimit(oneAndQuarterPercent); + root.updateClusterResource(clusterResource, + new ResourceLimits(clusterResource)); + + assertEquals(a.getMaxApplications() * halfPercent / 100, + a.getMaxApplicationsPerUser()); + // Q_B's limit per user shouldn't be greater + // than the whole queue's application limit + assertEquals(b.getMaxApplications(), b.getMaxApplicationsPerUser()); + + float userLimitFactorQueueA = 0.9f; + float userLimitFactorQueueB = 1.1f; + a.getUsersManager().setUserLimit(halfPercent); + a.getUsersManager().setUserLimitFactor(userLimitFactorQueueA); + b.getUsersManager().setUserLimit(100); + b.getUsersManager().setUserLimitFactor(userLimitFactorQueueB); + root.updateClusterResource(clusterResource, + new ResourceLimits(clusterResource)); + + assertEquals((int) (a.getMaxApplications() * halfPercent * + userLimitFactorQueueA / 100), a.getMaxApplicationsPerUser()); + // Q_B's limit per user shouldn't be greater + // than the whole queue's application limit + assertEquals(b.getMaxApplications(), b.getMaxApplicationsPerUser()); + + } + @After public void tearDown() throws Exception { } - + private ResourceLimits anyResourceLimits() { return any(ResourceLimits.class); }