diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index ac1a26ccef..329f7de09e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -296,6 +296,10 @@ protected void setupQueueConfigs(Resource clusterResource, + queueCapacities.getAbsoluteMaximumCapacity() + " [= 1.0 maximumCapacity undefined, " + "(parentAbsoluteMaxCapacity * maximumCapacity) / 100 otherwise ]" + + "\n" + "effectiveMinResource=" + + getEffectiveCapacity(CommonNodeLabelsManager.NO_LABEL) + "\n" + + " , effectiveMaxResource=" + + getEffectiveMaxCapacity(CommonNodeLabelsManager.NO_LABEL) + "\n" + "userLimit = " + usersManager.getUserLimit() + " [= configuredUserLimit ]" + "\n" + "userLimitFactor = " + usersManager.getUserLimitFactor() @@ -502,7 +506,11 @@ public String toString() { + ", " + "usedResources=" + queueUsage.getUsed() + ", " + "usedCapacity=" + getUsedCapacity() + ", " + "absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + ", " + "numApps=" + getNumApplications() - + ", " + "numContainers=" + getNumContainers(); + + ", " + "numContainers=" + getNumContainers() + ", " + + "effectiveMinResource=" + + getEffectiveCapacity(CommonNodeLabelsManager.NO_LABEL) + + " , effectiveMaxResource=" + + getEffectiveMaxCapacity(CommonNodeLabelsManager.NO_LABEL); } finally { readLock.unlock(); } diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java index 035c460c83..d8d71c71ea 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java @@ -184,7 +184,7 @@ public void setUp() throws Exception { conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class); - setupQueueMappings(conf, PARENT_QUEUE, true, new int[] {0, 1, 2, 3}); + setupQueueMappings(conf, PARENT_QUEUE, true, new int[] { 0, 1, 2, 3 }); dispatcher = new SpyDispatcher(); rmAppEventEventHandler = new SpyDispatcher.SpyRMAppEventHandler(); @@ -233,8 +233,8 @@ public static CapacitySchedulerConfiguration setupQueueMappings( queuePlacementRules.add(YarnConfiguration.USER_GROUP_PLACEMENT_RULE); conf.setQueuePlacementRules(queuePlacementRules); - List existingMappings = conf - .getQueueMappings(); + List existingMappings = + conf.getQueueMappings(); //set queue mapping List queueMappings = @@ -244,8 +244,8 @@ public static CapacitySchedulerConfiguration setupQueueMappings( UserGroupMappingPlacementRule.QueueMapping userQueueMapping = new UserGroupMappingPlacementRule.QueueMapping( UserGroupMappingPlacementRule.QueueMapping.MappingType.USER, - USER + userIds[i], getQueueMapping(parentQueue, USER + - userIds[i])); + USER + 
userIds[i], + getQueueMapping(parentQueue, USER + userIds[i])); queueMappings.add(userQueueMapping); } @@ -439,34 +439,6 @@ protected RMNodeLabelsManager createNodeLabelManager() { return newMockRM; } - protected void checkQueueCapacities(CapacityScheduler newCS, float capacityC, - float capacityD) { - CSQueue rootQueue = newCS.getRootQueue(); - CSQueue queueC = tcs.findQueue(rootQueue, C); - CSQueue queueD = tcs.findQueue(rootQueue, D); - CSQueue queueC1 = tcs.findQueue(queueC, C1); - CSQueue queueC2 = tcs.findQueue(queueC, C2); - CSQueue queueC3 = tcs.findQueue(queueC, C3); - - float capC = capacityC / 100.0f; - float capD = capacityD / 100.0f; - - tcs.checkQueueCapacity(queueC, capC, capC, 1.0f, 1.0f); - tcs.checkQueueCapacity(queueD, capD, capD, 1.0f, 1.0f); - tcs.checkQueueCapacity(queueC1, C1_CAPACITY / 100.0f, - (C1_CAPACITY / 100.0f) * capC, 1.0f, 1.0f); - tcs.checkQueueCapacity(queueC2, C2_CAPACITY / 100.0f, - (C2_CAPACITY / 100.0f) * capC, 1.0f, 1.0f); - - if (queueC3 != null) { - ManagedParentQueue parentQueue = (ManagedParentQueue) queueC; - QueueCapacities cap = - parentQueue.getLeafQueueTemplate().getQueueCapacities(); - tcs.checkQueueCapacity(queueC3, cap.getCapacity(), - (cap.getCapacity()) * capC, 1.0f, 1.0f); - } - } - static String getQueueMapping(String parentQueue, String leafQueue) { return parentQueue + DOT + leafQueue; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueuePreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueuePreemption.java new file mode 100644 index 0000000000..a025f1ed7d --- /dev/null +++ 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueuePreemption.java @@ -0,0 +1,202 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.junit.Before; +import org.junit.Test; + +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .capacity.TestCapacitySchedulerAutoCreatedQueueBase.C; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .capacity.TestCapacitySchedulerAutoCreatedQueueBase.D; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .capacity.TestCapacitySchedulerAutoCreatedQueueBase.E; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .capacity.TestCapacitySchedulerAutoCreatedQueueBase.USER0; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .capacity.TestCapacitySchedulerAutoCreatedQueueBase.USER1; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .capacity.TestCapacitySchedulerAutoCreatedQueueBase.USER2; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .capacity.TestCapacitySchedulerAutoCreatedQueueBase.USER3; + +/** + * Tests various preemption cases on auto-created leaf queues. All + * auto-created leaf queues will end up having same priority since they are set + * from template. Priority on ManagedParent Queues can be set however and + * priority based preemption cases are based on that. 
+ */ +public class TestCapacitySchedulerAutoCreatedQueuePreemption + extends TestCapacitySchedulerSurgicalPreemption { + + @Override + @Before + public void setUp() throws Exception { + super.setUp(); + } + + public static CapacitySchedulerConfiguration + setupQueueConfigurationForSimpleSurgicalPreemption( + CapacitySchedulerConfiguration conf) { + + //set up auto created queue configs + TestCapacitySchedulerAutoCreatedQueueBase.setupQueueMappings(conf, "c", + true, new int[] { 1, 2 }); + //setup new queues with one of them auto enabled + // Define top-level queues + // Set childQueue for root + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "c" }); + conf.setCapacity(C, 100f); + + conf.setUserLimitFactor(C, 1.0f); + conf.setAutoCreateChildQueueEnabled(C, true); + + //Setup leaf queue template configs + conf.setAutoCreatedLeafQueueConfigCapacity(C, 30.0f); + conf.setAutoCreatedLeafQueueConfigMaxCapacity(C, 100.0f); + conf.setAutoCreatedLeafQueueConfigUserLimit(C, 100); + conf.setAutoCreatedLeafQueueConfigUserLimitFactor(C, 3.0f); + + return conf; + } + + protected CapacitySchedulerConfiguration + setupQueueConfigurationForPriorityBasedPreemption( + CapacitySchedulerConfiguration conf) { + + //set up auto created queue configs + TestCapacitySchedulerAutoCreatedQueueBase.setupQueueMappings(conf, "c", + true, new int[] { 1, 2 }); + + TestCapacitySchedulerAutoCreatedQueueBase.setupQueueMappings(conf, "d", + true, new int[] { 3, 4 }); + + TestCapacitySchedulerAutoCreatedQueueBase.setupQueueMappings(conf, "e", + true, new int[] { 0 }); + //setup new queues with one of them auto enabled + // Define top-level queues + // Set childQueue for root + conf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] { "c", "d", "e" }); + conf.setCapacity(C, 45f); + conf.setCapacity(D, 45f); + conf.setCapacity(E, 10f); + + conf.setUserLimitFactor(E, 3.0f); + conf.setUserLimitFactor(C, 3.0f); + conf.setUserLimitFactor(D, 3.0f); + 
conf.setAutoCreateChildQueueEnabled(C, true); + conf.setAutoCreateChildQueueEnabled(D, true); + conf.setAutoCreateChildQueueEnabled(E, true); + + //Setup leaf queue template configs + conf.setAutoCreatedLeafQueueConfigCapacity(C, 100f); + conf.setAutoCreatedLeafQueueConfigMaxCapacity(C, 100.0f); + conf.setAutoCreatedLeafQueueConfigUserLimit(C, 100); + conf.setAutoCreatedLeafQueueConfigUserLimitFactor(C, 3.0f); + + conf.setAutoCreatedLeafQueueConfigCapacity(D, 100.0f); + conf.setAutoCreatedLeafQueueConfigMaxCapacity(D, 100.0f); + conf.setAutoCreatedLeafQueueConfigUserLimit(D, 100); + conf.setAutoCreatedLeafQueueConfigUserLimitFactor(D, 3.0f); + + conf.setAutoCreatedLeafQueueConfigCapacity(E, 100.0f); + conf.setAutoCreatedLeafQueueConfigMaxCapacity(E, 100.0f); + conf.setAutoCreatedLeafQueueConfigUserLimit(E, 100); + conf.setAutoCreatedLeafQueueConfigUserLimitFactor(E, 3.0f); + + conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".c", 1); + conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".d", 2); + + return conf; + } + + @Test(timeout = 60000) + public void testSimpleSurgicalPreemptionOnAutoCreatedLeafQueues() + throws Exception { + /** + * Test case: Submit two application (app1/app2) to different queues, queue + * structure: + * + *

+     *                    C
+     *            /       |     \
+     *           USER1   USER2   USER3
+     *          30      30        30
+     * 
+ * + * 1) Two nodes (n1/n2) in the cluster, each of them has 20G. + * + * 2) app1 submit to queue-USER1 first, it asked 32 * 1G containers + * We will allocate 16 on n1 and 16 on n2. + * + * 3) app2 submit to queue-USER2, ask for one 1G container (for AM) + * + * 4) app2 asks for another 6G container, it will be reserved on n1 + * + * Now: we have: + * n1: 17 from app1, 1 from app2, and 1 reserved from app2 + * n2: 16 from app1. + * + * After preemption, we should expect: + * Preempt 4 containers from app1 on n1. + */ + setupQueueConfigurationForSimpleSurgicalPreemption(conf); + testSimpleSurgicalPreemption(USER1, USER2, USER1, USER2); + } + + @Test(timeout = 600000) + public void + testPreemptionFromHighestPriorityManagedParentQueueAndOldestContainer() + throws Exception { + /** + * Test case: Submit three applications (app1/app2/app3) to different queues, queue + * structure: + * + *
+     *             Root
+     *            /  |  \
+     *           c   d   e
+     *          45  45  10
+     * 
+ * + * Priority of queue_c = 1 + * Priority of queue_d = 2 + * + * 1) 5 nodes (n0-n4) in the cluster, each of them has 4G. + * + * 2) app1 submit to queue-e first (AM=1G), it asked 4 * 1G containers + * We will allocate 1 container on each of n0-n4. AM on n4. + * + * 3) app2 submit to queue-c, AM container=0.5G, allocated on n0 + * Ask for 2 * 3.5G containers. (Reserved on n0/n1) + * + * 4) app3 submit to queue-d, AM container=0.5G, allocated on n2 + * Ask for 2 * 3.5G containers. (Reserved on n2/n3) + * + * First we will preempt container on n2 since it is the oldest container of + * Highest priority queue (d) + */ + + // Total preemption = 1G per round, which is 5% of cluster resource (20G) + setupQueueConfigurationForPriorityBasedPreemption(conf); + testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer( + new String[] { USER1, USER3, USER0 }, + new String[] { USER1, USER3, USER0 }); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java index 8a7e03f856..c20e0914b1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerSurgicalPreemption.java @@ -29,6 +29,9 @@ import org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitor; import 
org.apache.hadoop.yarn.server.resourcemanager.monitor.SchedulingMonitorManager; import org.apache.hadoop.yarn.server.resourcemanager.monitor.capacity.ProportionalCapacityPreemptionPolicy; + +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels + .RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; @@ -86,8 +89,19 @@ public void testSimpleSurgicalPreemption() * After preemption, we should expect: * Preempt 4 containers from app1 on n1. */ - MockRM rm1 = new MockRM(conf); - rm1.getRMContext().setNodeLabelManager(mgr); + testSimpleSurgicalPreemption("a", "c", "user", "user"); + } + + protected void testSimpleSurgicalPreemption(String queue1, String queue2, + String user1, String user2) + throws Exception { + + MockRM rm1 = new MockRM(conf) { + protected RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + rm1.start(); MockNM nm1 = rm1.registerNode("h1:1234", 20 * GB); @@ -97,7 +111,7 @@ public void testSimpleSurgicalPreemption() RMNode rmNode2 = rm1.getRMContext().getRMNodes().get(nm2.getNodeId()); // launch an app to queue, AM container should be launched in nm1 - RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "a"); + RMApp app1 = rm1.submitApp(1 * GB, "app", user1, null, queue1); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1); am1.allocate("*", 1 * GB, 32, new ArrayList()); @@ -120,7 +134,7 @@ public void testSimpleSurgicalPreemption() // Submit app2 to queue-c and asks for a 1G container for AM - RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "c"); + RMApp app2 = rm1.submitApp(1 * GB, "app", user2, null, queue2); MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1); // NM1/NM2 has available resource = 2G/4G @@ -632,6 +646,21 @@ public void testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer() * Highest priority 
queue (b) */ + // A/B has higher priority + conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".a" , 1); + conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".b", 2); + conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a", 45f); + conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".b", 45f); + conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".c", 10f); + + testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer(new + String[] {"a", "b", "c"}, new String[] {"user", "user", "user"}); + + } + + protected void + testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer(String[] + queues, String[] users) throws Exception { // Total preemption = 1G per round, which is 5% of cluster resource (20G) conf.setFloat(CapacitySchedulerConfiguration.TOTAL_PREEMPTION_PER_ROUND, 0.05f); @@ -641,15 +670,11 @@ public void testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer() conf.setQueueOrderingPolicy(CapacitySchedulerConfiguration.ROOT, CapacitySchedulerConfiguration.QUEUE_PRIORITY_UTILIZATION_ORDERING_POLICY); - // A/B has higher priority - conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".a", 1); - conf.setQueuePriority(CapacitySchedulerConfiguration.ROOT + ".b", 2); - conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a", 45f); - conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".b", 45f); - conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".c", 10f); - - MockRM rm1 = new MockRM(conf); - rm1.getRMContext().setNodeLabelManager(mgr); + MockRM rm1 = new MockRM(conf) { + protected RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; rm1.start(); MockNM[] mockNMs = new MockNM[5]; @@ -665,7 +690,7 @@ public void testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer() } // launch an app to queue, AM container should be launched in nm1 - RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c"); + RMApp app1 = rm1.submitApp(1 * GB, "app", users[2], null, 
queues[2]); MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, mockNMs[4]); am1.allocate("*", 1 * GB, 4, new ArrayList<>()); @@ -685,7 +710,7 @@ public void testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer() } // Submit app2 to queue-a and asks for a 0.5G container for AM (on n0) - RMApp app2 = rm1.submitApp(512, "app", "user", null, "a"); + RMApp app2 = rm1.submitApp(512, "app", users[0], null, queues[0]); MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, mockNMs[0]); FiCaSchedulerApp schedulerApp2 = cs.getApplicationAttempt( ApplicationAttemptId.newInstance(app2.getApplicationId(), 1)); @@ -703,11 +728,11 @@ public void testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer() Assert.assertNotNull("Should reserve on nm-" + i, cs.getNode(rmNodes[i].getNodeID()).getReservedContainer()); Assert.assertEquals(cs.getNode(rmNodes[i].getNodeID()) - .getReservedContainer().getQueueName(), "a"); + .getReservedContainer().getQueueName(), queues[0]); } // Submit app3 to queue-b and asks for a 0.5G container for AM (on n2) - RMApp app3 = rm1.submitApp(512, "app", "user", null, "b"); + RMApp app3 = rm1.submitApp(512, "app", users[1], null, queues[1]); MockAM am3 = MockRM.launchAndRegisterAM(app3, rm1, mockNMs[2]); FiCaSchedulerApp schedulerApp3 = cs.getApplicationAttempt( ApplicationAttemptId.newInstance(app3.getApplicationId(), 1)); @@ -725,7 +750,7 @@ public void testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer() Assert.assertNotNull("Should reserve on nm-" + i, cs.getNode(rmNodes[i].getNodeID()).getReservedContainer()); Assert.assertEquals(cs.getNode(rmNodes[i].getNodeID()) - .getReservedContainer().getQueueName(), "b"); + .getReservedContainer().getQueueName(), queues[1]); } // Sleep the timeout interval, we should be able to see 1 container selected @@ -831,6 +856,7 @@ public void testPriorityPreemptionFromHighestPriorityQueueAndOldestContainer() rm1.close(); } + @Test(timeout = 60000) public void 
testPreemptionForFragmentatedCluster() throws Exception { // Set additional_balance_queue_based_on_reserved_res to true to get