From 3c9e3d5321134720afe1d7f280ed16dd3ac04b99 Mon Sep 17 00:00:00 2001 From: Tamas Domok Date: Fri, 1 Oct 2021 04:05:52 +0200 Subject: [PATCH] YARN-10960. Extract test queues and related methods from TestCapacityScheduler. Contributed by Tamas Domok Co-authored-by: Tamas Domok --- .../CapacitySchedulerQueueHelpers.java | 342 ++++++++++++++++ .../capacity/CapacitySchedulerTestBase.java | 22 -- .../capacity/TestCapacityScheduler.java | 370 +++--------------- .../TestCapacitySchedulerDynamicBehavior.java | 34 +- ...acitySchedulerWorkflowPriorityMapping.java | 15 + 5 files changed, 433 insertions(+), 350 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueHelpers.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueHelpers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueHelpers.java new file mode 100644 index 0000000000..bc7e2b317a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueHelpers.java @@ -0,0 +1,342 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.junit.Assert; + +import static org.junit.Assert.assertEquals; + +public final class CapacitySchedulerQueueHelpers { + + public static final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + public static final String B = CapacitySchedulerConfiguration.ROOT + ".b"; + public static final String A1 = A + ".a1"; + public static final String A2 = A + ".a2"; + public static final String B1 = B + ".b1"; + public static final String B2 = B + ".b2"; + public static final String B3 = B + ".b3"; + public static final float A_CAPACITY = 10.5f; + public static final float B_CAPACITY = 89.5f; + public static final String P1 = CapacitySchedulerConfiguration.ROOT + ".p1"; + public static final String P2 = CapacitySchedulerConfiguration.ROOT + ".p2"; + public static final String X1 = P1 + ".x1"; + public static final String X2 = P1 + ".x2"; + public static final String Y1 = P2 + ".y1"; + public static final String Y2 = P2 + ".y2"; + public static final float A1_CAPACITY = 30; + public static final float A2_CAPACITY = 70; + public static final float B1_CAPACITY = 79.2f; + public static final float B2_CAPACITY = 0.8f; + public static final float B3_CAPACITY = 20; + + private CapacitySchedulerQueueHelpers() { + throw new IllegalStateException("Utility class"); + } + + /** + * @param conf, to be modified + * @return + * root + * / \ + * a b + * / \ / | \ + * a1 a2 b1 b2 b3 + * + */ + public static CapacitySchedulerConfiguration setupQueueConfiguration( + CapacitySchedulerConfiguration conf) { + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{"a", "b"}); + + conf.setCapacity(A, A_CAPACITY); + conf.setCapacity(B, B_CAPACITY); + + // Define 2nd-level queues + conf.setQueues(A, new String[]{"a1", "a2"}); + conf.setCapacity(A1, A1_CAPACITY); + conf.setUserLimitFactor(A1, 100.0f); + conf.setCapacity(A2, A2_CAPACITY); + conf.setUserLimitFactor(A2, 100.0f); + + conf.setQueues(B, new String[]{"b1", "b2", "b3"}); + conf.setCapacity(B1, B1_CAPACITY); + conf.setUserLimitFactor(B1, 100.0f); + conf.setCapacity(B2, B2_CAPACITY); + conf.setUserLimitFactor(B2, 100.0f); + conf.setCapacity(B3, B3_CAPACITY); + conf.setUserLimitFactor(B3, 100.0f); + + return conf; + } + + /** + * @param conf, to be modified + * @return CS configuration which has deleted all childred of queue(b) + * root + * / \ + * a b + * / \ + * a1 a2 + */ + public static CapacitySchedulerConfiguration setupQueueConfWithoutChildrenOfB( + CapacitySchedulerConfiguration conf) { + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[]{"a", "b"}); + + conf.setCapacity(A, A_CAPACITY); + conf.setCapacity(B, B_CAPACITY); + + // Define 2nd-level queues + conf.setQueues(A, new String[]{"a1", "a2"}); + conf.setCapacity(A1, A1_CAPACITY); + conf.setUserLimitFactor(A1, 100.0f); + conf.setCapacity(A2, A2_CAPACITY); + conf.setUserLimitFactor(A2, 100.0f); + + return conf; + } + + /** + * @param conf, to be modified + * @return CS configuration which has deleted a queue(b1) + * root + * / \ + * a b + * / \ | \ + * a1 a2 b2 b3 + */ + public static CapacitySchedulerConfiguration setupQueueConfigurationWithoutB1( + CapacitySchedulerConfiguration conf) { + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[]{"a", "b"}); + + conf.setCapacity(A, A_CAPACITY); + conf.setCapacity(B, B_CAPACITY); + + // Define 2nd-level queues + conf.setQueues(A, new String[]{"a1", "a2"}); + conf.setCapacity(A1, A1_CAPACITY); + conf.setUserLimitFactor(A1, 100.0f); + conf.setCapacity(A2, A2_CAPACITY); + conf.setUserLimitFactor(A2, 100.0f); + + conf.setQueues(B, new String[]{"b2", "b3"}); + conf.setCapacity(B2, B2_CAPACITY + B1_CAPACITY); //as B1 is deleted + conf.setUserLimitFactor(B2, 100.0f); + conf.setCapacity(B3, B3_CAPACITY); + conf.setUserLimitFactor(B3, 100.0f); + + return conf; + } + + /** + * @param conf, to be modified + * @return CS configuration which has converted b1 to parent queue + * root + * / \ + * a b + * / \ / | \ + * a1 a2 b1 b2 b3 + * | + * b11 + */ + public static CapacitySchedulerConfiguration setupQueueConfigurationWithB1AsParentQueue( + CapacitySchedulerConfiguration conf) { + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[]{"a", "b"}); + + conf.setCapacity(A, A_CAPACITY); + conf.setCapacity(B, B_CAPACITY); + + // Define 2nd-level queues + conf.setQueues(A, new String[]{"a1", "a2"}); + conf.setCapacity(A1, A1_CAPACITY); + conf.setUserLimitFactor(A1, 100.0f); + conf.setCapacity(A2, A2_CAPACITY); + conf.setUserLimitFactor(A2, 100.0f); + + conf.setQueues(B, new String[]{"b1", "b2", "b3"}); + conf.setCapacity(B1, B1_CAPACITY); + conf.setUserLimitFactor(B1, 100.0f); + conf.setCapacity(B2, B2_CAPACITY); + conf.setUserLimitFactor(B2, 100.0f); + conf.setCapacity(B3, B3_CAPACITY); + conf.setUserLimitFactor(B3, 100.0f); + + // Set childQueue for B1 + conf.setQueues(B1, new String[]{"b11"}); + final String b11 = B1 + ".b11"; + conf.setCapacity(b11, 100.0f); + conf.setUserLimitFactor(b11, 100.0f); + + return conf; + } + + /** + * @param conf, to be modified + * @return CS configuration which has deleted a Parent queue(b) + */ + public static CapacitySchedulerConfiguration setupQueueConfigurationWithoutB( + CapacitySchedulerConfiguration conf) { + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{"a"}); + + conf.setCapacity(A, A_CAPACITY + B_CAPACITY); + + // Define 2nd-level queues + conf.setQueues(A, new String[]{"a1", "a2"}); + conf.setCapacity(A1, A1_CAPACITY); + conf.setUserLimitFactor(A1, 100.0f); + conf.setCapacity(A2, A2_CAPACITY); + conf.setUserLimitFactor(A2, 100.0f); + + return conf; + } + + public static CapacitySchedulerConfiguration setupBlockedQueueConfiguration( + CapacitySchedulerConfiguration conf) { + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[]{"a", "b"}); + + conf.setCapacity(A, 80f); + conf.setCapacity(B, 20f); + conf.setUserLimitFactor(A, 100); + conf.setUserLimitFactor(B, 100); + conf.setMaximumCapacity(A, 100); + conf.setMaximumCapacity(B, 100); + return conf; + } + + public static CapacitySchedulerConfiguration setupOtherBlockedQueueConfiguration( + CapacitySchedulerConfiguration conf) { + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[]{"p1", "p2"}); + + conf.setCapacity(P1, 50f); + conf.setMaximumCapacity(P1, 50f); + conf.setCapacity(P2, 50f); + conf.setMaximumCapacity(P2, 100f); + // Define 2nd-level queues + conf.setQueues(P1, new String[]{"x1", "x2"}); + conf.setCapacity(X1, 80f); + conf.setMaximumCapacity(X1, 100f); + conf.setUserLimitFactor(X1, 2f); + conf.setCapacity(X2, 20f); + conf.setMaximumCapacity(X2, 100f); + conf.setUserLimitFactor(X2, 2f); + + conf.setQueues(P2, new String[]{"y1", "y2"}); + conf.setCapacity(Y1, 80f); + conf.setUserLimitFactor(Y1, 2f); + conf.setCapacity(Y2, 20f); + conf.setUserLimitFactor(Y2, 2f); + return conf; + } + + public static class ExpectedCapacities { + private final float capacity; + private final float absCapacity; + + public ExpectedCapacities(float capacity, float parentCapacity) { + this.capacity = capacity; + absCapacity = this.capacity * parentCapacity; + } + + public float getCapacity() { + return capacity; + } + + public float getAbsCapacity() { + return absCapacity; + } + } + + public static Map getDefaultCapacities(float capA, float capB) { + Map capacities = new HashMap<>(); + capacities.put(A, new ExpectedCapacities(capA, 1.0f)); + capacities.put(B, new ExpectedCapacities(capB, 1.0f)); + capacities.put(A1, new ExpectedCapacities((A1_CAPACITY / 100.0f), capA)); + capacities.put(A2, new ExpectedCapacities((A2_CAPACITY / 100.0f), capA)); + capacities.put(B1, new ExpectedCapacities((B1_CAPACITY / 100.0f), capB)); + capacities.put(B2, new ExpectedCapacities((B2_CAPACITY / 100.0f), capB)); + capacities.put(B3, new ExpectedCapacities((B3_CAPACITY / 100.0f), capB)); + return capacities; + } + + public static void checkQueueStructureCapacities(CapacityScheduler cs) { + float capA = A_CAPACITY / 100.0f; + float capB = B_CAPACITY / 100.0f; + checkQueueStructureCapacities(cs, getDefaultCapacities(capA, capB)); + } + + public static void checkQueueStructureCapacities(CapacityScheduler cs, + Map capacities) { + CSQueue rootQueue = cs.getRootQueue(); + for (Map.Entry entry : capacities.entrySet()) { + CSQueue queue = findQueue(rootQueue, entry.getKey()); + Assert.assertNotNull(queue); + assertQueueCapacities(queue, entry.getValue()); + } + } + + public static void assertQueueCapacities(CSQueue q, ExpectedCapacities capacities) { + final float epsilon = 1e-5f; + assertEquals("capacity", capacities.getCapacity(), q.getCapacity(), epsilon); + assertEquals("absolute capacity", capacities.getAbsCapacity(), + q.getAbsoluteCapacity(), epsilon); + assertEquals("maximum capacity", 1.0f, q.getMaximumCapacity(), epsilon); + assertEquals("absolute maximum capacity", 1.0f, q.getAbsoluteMaximumCapacity(), epsilon); + } + + public static CSQueue findQueue(CSQueue root, String queuePath) { + if (root.getQueuePath().equals(queuePath)) { + return root; + } + + List childQueues = root.getChildQueues(); + if (childQueues != null) { + for (CSQueue q : childQueues) { + if (queuePath.startsWith(q.getQueuePath())) { + CSQueue result = findQueue(q, queuePath); + if (result != null) { + return result; + } + } + } + } + + return null; + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java index f36b1e2486..daca16f3ef 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerTestBase.java @@ -29,28 +29,6 @@ public class CapacitySchedulerTestBase { protected final int GB = 1024; - protected static final String A = CapacitySchedulerConfiguration.ROOT + ".a"; - protected static final String B = CapacitySchedulerConfiguration.ROOT + ".b"; - protected static final String A1 = A + ".a1"; - protected static final String A2 = A + ".a2"; - protected static final String B1 = B + ".b1"; - protected static final String B2 = B + ".b2"; - protected static final String B3 = B + ".b3"; - protected static float A_CAPACITY = 10.5f; - protected static float B_CAPACITY = 89.5f; - protected static final String P1 = CapacitySchedulerConfiguration.ROOT + ".p1"; - protected static final String P2 = CapacitySchedulerConfiguration.ROOT + ".p2"; - protected static final String X1 = P1 + ".x1"; - protected static final String X2 = P1 + ".x2"; - protected static final String Y1 = P2 + ".y1"; - protected static final String Y2 = P2 + ".y2"; - protected static float A1_CAPACITY = 30; - protected static float A2_CAPACITY = 70; - protected static float B1_CAPACITY = 79.2f; - protected static float B2_CAPACITY = 0.8f; - protected static float B3_CAPACITY = 20; - - @SuppressWarnings("unchecked") protected Set toSet(E... elements) { Set set = Sets.newHashSet(elements); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index eea017848a..fa2ec08e13 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -19,6 +19,29 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import static org.apache.hadoop.yarn.server.resourcemanager.MockNM.createMockNodeStatus; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.checkQueueStructureCapacities; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.findQueue; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.getDefaultCapacities; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.ExpectedCapacities; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupBlockedQueueConfiguration; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupOtherBlockedQueueConfiguration; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfWithoutChildrenOfB; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfiguration; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A1; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A2; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A_CAPACITY; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B1; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B1_CAPACITY; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B2; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B2_CAPACITY; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B3; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B3_CAPACITY; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B_CAPACITY; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfigurationWithB1AsParentQueue; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfigurationWithoutB; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfigurationWithoutB1; import static org.assertj.core.api.Assertions.assertThat; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_MB; @@ -644,225 +667,6 @@ private void nodeUpdate(NodeManager nm) { resourceManager.getResourceScheduler().handle(nodeUpdate); } - /** - * @param conf - * @return - * root - * / \ - * a b - * / \ / | \ - * a1 a2 b1 b2 b3 - * - */ - private CapacitySchedulerConfiguration setupQueueConfiguration( - CapacitySchedulerConfiguration conf) { - - // Define top-level queues - conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"}); - - conf.setCapacity(A, A_CAPACITY); - conf.setCapacity(B, B_CAPACITY); - - // Define 2nd-level queues - conf.setQueues(A, new String[] {"a1", "a2"}); - conf.setCapacity(A1, A1_CAPACITY); - conf.setUserLimitFactor(A1, 100.0f); - conf.setCapacity(A2, A2_CAPACITY); - conf.setUserLimitFactor(A2, 100.0f); - - conf.setQueues(B, new String[] {"b1", "b2", "b3"}); - conf.setCapacity(B1, B1_CAPACITY); - conf.setUserLimitFactor(B1, 100.0f); - conf.setCapacity(B2, B2_CAPACITY); - conf.setUserLimitFactor(B2, 100.0f); - conf.setCapacity(B3, B3_CAPACITY); - conf.setUserLimitFactor(B3, 100.0f); - - LOG.info("Setup top-level queues a and b"); - return conf; - } - - /** - * @param conf, to be modified - * @return, CS configuration which has deleted all childred of queue(b) - * root - * / \ - * a b - * / \ - * a1 a2 - */ - private CapacitySchedulerConfiguration setupQueueConfWithOutChildrenOfB( - CapacitySchedulerConfiguration conf) { - - // Define top-level queues - conf.setQueues(CapacitySchedulerConfiguration.ROOT, - new String[] {"a","b"}); - - conf.setCapacity(A, A_CAPACITY); - conf.setCapacity(B, B_CAPACITY); - - // Define 2nd-level queues - conf.setQueues(A, new String[] {"a1","a2"}); - conf.setCapacity(A1, A1_CAPACITY); - conf.setUserLimitFactor(A1, 100.0f); - conf.setCapacity(A2, A2_CAPACITY); - conf.setUserLimitFactor(A2, 100.0f); - - LOG.info("Setup top-level queues a and b (without children)"); - return conf; - } - - /** - * @param conf, to be modified - * @return, CS configuration which has deleted a queue(b1) - * root - * / \ - * a b - * / \ | \ - * a1 a2 b2 b3 - */ - private CapacitySchedulerConfiguration setupQueueConfigurationWithOutB1( - CapacitySchedulerConfiguration conf) { - - // Define top-level queues - conf.setQueues(CapacitySchedulerConfiguration.ROOT, - new String[] { "a", "b" }); - - conf.setCapacity(A, A_CAPACITY); - conf.setCapacity(B, B_CAPACITY); - - // Define 2nd-level queues - conf.setQueues(A, new String[] { "a1", "a2" }); - conf.setCapacity(A1, A1_CAPACITY); - conf.setUserLimitFactor(A1, 100.0f); - conf.setCapacity(A2, A2_CAPACITY); - conf.setUserLimitFactor(A2, 100.0f); - - conf.setQueues(B, new String[] { "b2", "b3" }); - conf.setCapacity(B2, B2_CAPACITY + B1_CAPACITY); //as B1 is deleted - conf.setUserLimitFactor(B2, 100.0f); - conf.setCapacity(B3, B3_CAPACITY); - conf.setUserLimitFactor(B3, 100.0f); - - LOG.info("Setup top-level queues a and b (without b3)"); - return conf; - } - - /** - * @param conf, to be modified - * @return, CS configuration which has converted b1 to parent queue - * root - * / \ - * a b - * / \ / | \ - * a1 a2 b1 b2 b3 - * | - * b11 - */ - private CapacitySchedulerConfiguration - setupQueueConfigurationWithB1AsParentQueue( - CapacitySchedulerConfiguration conf) { - - // Define top-level queues - conf.setQueues(CapacitySchedulerConfiguration.ROOT, - new String[] { "a", "b" }); - - conf.setCapacity(A, A_CAPACITY); - conf.setCapacity(B, B_CAPACITY); - - // Define 2nd-level queues - conf.setQueues(A, new String[] { "a1", "a2" }); - conf.setCapacity(A1, A1_CAPACITY); - conf.setUserLimitFactor(A1, 100.0f); - conf.setCapacity(A2, A2_CAPACITY); - conf.setUserLimitFactor(A2, 100.0f); - - conf.setQueues(B, new String[] {"b1","b2", "b3"}); - conf.setCapacity(B1, B1_CAPACITY); - conf.setUserLimitFactor(B1, 100.0f); - conf.setCapacity(B2, B2_CAPACITY); - conf.setUserLimitFactor(B2, 100.0f); - conf.setCapacity(B3, B3_CAPACITY); - conf.setUserLimitFactor(B3, 100.0f); - - // Set childQueue for B1 - conf.setQueues(B1, new String[] {"b11"}); - String B11 = B1 + ".b11"; - conf.setCapacity(B11, 100.0f); - conf.setUserLimitFactor(B11, 100.0f); - - return conf; - } - - /** - * @param conf, to be modified - * @return, CS configuration which has deleted a - * Parent queue(b) - */ - private CapacitySchedulerConfiguration setupQueueConfigurationWithOutB( - CapacitySchedulerConfiguration conf) { - - // Define top-level queues - conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] { "a" }); - - conf.setCapacity(A, A_CAPACITY + B_CAPACITY); - - // Define 2nd-level queues - conf.setQueues(A, new String[] { "a1", "a2" }); - conf.setCapacity(A1, A1_CAPACITY); - conf.setUserLimitFactor(A1, 100.0f); - conf.setCapacity(A2, A2_CAPACITY); - conf.setUserLimitFactor(A2, 100.0f); - - LOG.info("Setup top-level queues a"); - return conf; - } - - - private CapacitySchedulerConfiguration setupBlockedQueueConfiguration( - CapacitySchedulerConfiguration conf) { - - // Define top-level queues - conf.setQueues(CapacitySchedulerConfiguration.ROOT, - new String[]{"a", "b"}); - - conf.setCapacity(A, 80f); - conf.setCapacity(B, 20f); - conf.setUserLimitFactor(A, 100); - conf.setUserLimitFactor(B, 100); - conf.setMaximumCapacity(A, 100); - conf.setMaximumCapacity(B, 100); - LOG.info("Setup top-level queues a and b"); - return conf; - } - - private CapacitySchedulerConfiguration setupOtherBlockedQueueConfiguration( - CapacitySchedulerConfiguration conf) { - - // Define top-level queues - conf.setQueues(CapacitySchedulerConfiguration.ROOT, - new String[]{"p1", "p2"}); - - conf.setCapacity(P1, 50f); - conf.setMaximumCapacity(P1, 50f); - conf.setCapacity(P2, 50f); - conf.setMaximumCapacity(P2, 100f); - // Define 2nd-level queues - conf.setQueues(P1, new String[] {"x1", "x2"}); - conf.setCapacity(X1, 80f); - conf.setMaximumCapacity(X1, 100f); - conf.setUserLimitFactor(X1, 2f); - conf.setCapacity(X2, 20f); - conf.setMaximumCapacity(X2, 100f); - conf.setUserLimitFactor(X2, 2f); - - conf.setQueues(P2, new String[]{"y1", "y2"}); - conf.setCapacity(Y1, 80f); - conf.setUserLimitFactor(Y1, 2f); - conf.setCapacity(Y2, 20f); - conf.setUserLimitFactor(Y2, 2f); - return conf; - } @Test public void testMaximumCapacitySetup() { @@ -925,76 +729,15 @@ null, new RMContainerTokenSecretManager(conf), cs.init(conf); cs.start(); cs.reinitialize(conf, rmContext); - checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); + checkQueueStructureCapacities(cs); conf.setCapacity(A, 80f); conf.setCapacity(B, 20f); cs.reinitialize(conf, mockContext); - checkQueueCapacities(cs, 80f, 20f); + checkQueueStructureCapacities(cs, getDefaultCapacities(80f / 100.0f, 20f / 100.0f)); cs.stop(); } - void checkQueueCapacities(CapacityScheduler cs, - float capacityA, float capacityB) { - CSQueue rootQueue = cs.getRootQueue(); - CSQueue queueA = findQueue(rootQueue, A); - CSQueue queueB = findQueue(rootQueue, B); - CSQueue queueA1 = findQueue(queueA, A1); - CSQueue queueA2 = findQueue(queueA, A2); - CSQueue queueB1 = findQueue(queueB, B1); - CSQueue queueB2 = findQueue(queueB, B2); - CSQueue queueB3 = findQueue(queueB, B3); - - float capA = capacityA / 100.0f; - float capB = capacityB / 100.0f; - - checkQueueCapacity(queueA, capA, capA, 1.0f, 1.0f); - checkQueueCapacity(queueB, capB, capB, 1.0f, 1.0f); - checkQueueCapacity(queueA1, A1_CAPACITY / 100.0f, - (A1_CAPACITY/100.0f) * capA, 1.0f, 1.0f); - checkQueueCapacity(queueA2, A2_CAPACITY / 100.0f, - (A2_CAPACITY/100.0f) * capA, 1.0f, 1.0f); - checkQueueCapacity(queueB1, B1_CAPACITY / 100.0f, - (B1_CAPACITY/100.0f) * capB, 1.0f, 1.0f); - checkQueueCapacity(queueB2, B2_CAPACITY / 100.0f, - (B2_CAPACITY/100.0f) * capB, 1.0f, 1.0f); - checkQueueCapacity(queueB3, B3_CAPACITY / 100.0f, - (B3_CAPACITY/100.0f) * capB, 1.0f, 1.0f); - } - - void checkQueueCapacity(CSQueue q, float expectedCapacity, - float expectedAbsCapacity, float expectedMaxCapacity, - float expectedAbsMaxCapacity) { - final float epsilon = 1e-5f; - assertEquals("capacity", expectedCapacity, q.getCapacity(), epsilon); - assertEquals("absolute capacity", expectedAbsCapacity, - q.getAbsoluteCapacity(), epsilon); - assertEquals("maximum capacity", expectedMaxCapacity, - q.getMaximumCapacity(), epsilon); - assertEquals("absolute maximum capacity", expectedAbsMaxCapacity, - q.getAbsoluteMaximumCapacity(), epsilon); - } - - CSQueue findQueue(CSQueue root, String queuePath) { - if (root.getQueuePath().equals(queuePath)) { - return root; - } - - List childQueues = root.getChildQueues(); - if (childQueues != null) { - for (CSQueue q : childQueues) { - if (queuePath.startsWith(q.getQueuePath())) { - CSQueue result = findQueue(q, queuePath); - if (result != null) { - return result; - } - } - } - } - - return null; - } - private void checkApplicationResourceUsage(int expected, Application application) { Assert.assertEquals(expected, application.getUsedResources().getMemorySize()); @@ -1110,35 +853,40 @@ public void testRefreshQueuesWithNewQueue() throws Exception { cs.init(conf); cs.start(); cs.reinitialize(conf, new RMContextImpl(null, null, null, null, null, - null, new RMContainerTokenSecretManager(conf), - new NMTokenSecretManagerInRM(conf), - new ClientToAMTokenSecretManagerInRM(), null)); - checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); + null, new RMContainerTokenSecretManager(conf), + new NMTokenSecretManagerInRM(conf), + new ClientToAMTokenSecretManagerInRM(), null)); + checkQueueStructureCapacities(cs); // Add a new queue b4 - String B4 = B + ".b4"; - float B4_CAPACITY = 10; + final String b4 = B + ".b4"; + final float b4Capacity = 10; + final float modifiedB3Capacity = B3_CAPACITY - b4Capacity; - B3_CAPACITY -= B4_CAPACITY; try { conf.setCapacity(A, 80f); conf.setCapacity(B, 20f); - conf.setQueues(B, new String[] {"b1", "b2", "b3", "b4"}); + conf.setQueues(B, new String[]{"b1", "b2", "b3", "b4"}); conf.setCapacity(B1, B1_CAPACITY); conf.setCapacity(B2, B2_CAPACITY); - conf.setCapacity(B3, B3_CAPACITY); - conf.setCapacity(B4, B4_CAPACITY); - cs.reinitialize(conf,mockContext); - checkQueueCapacities(cs, 80f, 20f); + conf.setCapacity(B3, modifiedB3Capacity); + conf.setCapacity(b4, b4Capacity); + cs.reinitialize(conf, mockContext); + + final float capA = 80f / 100.0f; + final float capB = 20f / 100.0f; + Map expectedCapacities = getDefaultCapacities(capA, capB); + expectedCapacities.put(B3, new ExpectedCapacities(modifiedB3Capacity / 100.0f, capB)); + expectedCapacities.put(b4, new ExpectedCapacities(b4Capacity / 100.0f, capB)); + checkQueueStructureCapacities(cs, expectedCapacities); // Verify parent for B4 CSQueue rootQueue = cs.getRootQueue(); CSQueue queueB = findQueue(rootQueue, B); - CSQueue queueB4 = findQueue(queueB, B4); + CSQueue queueB4 = findQueue(queueB, b4); assertEquals(queueB, queueB4.getParent()); } finally { - B3_CAPACITY += B4_CAPACITY; cs.stop(); } } @@ -3111,7 +2859,7 @@ public void testRefreshQueuesMaxAllocationRefresh() throws Exception { cs.init(conf); cs.start(); cs.reinitialize(conf, mockContext); - checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); + checkQueueStructureCapacities(cs); assertEquals("max allocation in CS", YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, @@ -3206,7 +2954,7 @@ public void testRefreshQueuesMaxAllocationRefreshLarger() throws Exception { cs.init(conf); cs.start(); cs.reinitialize(conf, mockContext); - checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); + checkQueueStructureCapacities(cs); CSQueue rootQueue = cs.getRootQueue(); CSQueue queueA = findQueue(rootQueue, A); @@ -3277,7 +3025,7 @@ public void testRefreshQueuesMaxAllocationCSError() throws Exception { cs.init(conf); cs.start(); cs.reinitialize(conf, mockContext); - checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); + checkQueueStructureCapacities(cs); assertEquals("max allocation MB in CS", 10240, cs.getMaximumResourceCapability().getMemorySize()); @@ -3323,7 +3071,7 @@ public void testRefreshQueuesMaxAllocationCSLarger() throws Exception { cs.init(conf); cs.start(); cs.reinitialize(conf, mockContext); - checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); + checkQueueStructureCapacities(cs); assertEquals("max allocation MB in CS", 10240, cs.getMaximumResourceCapability().getMemorySize()); @@ -3398,7 +3146,7 @@ public void testQueuesMaxAllocationInheritance() throws Exception { cs.init(conf); cs.start(); cs.reinitialize(conf, mockContext); - checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); + checkQueueStructureCapacities(cs); CSQueue rootQueue = cs.getRootQueue(); CSQueue queueA = findQueue(rootQueue, A); @@ -3509,7 +3257,7 @@ public void testVerifyQueuesMaxAllocationConf() throws Exception { cs.init(conf); cs.start(); cs.reinitialize(conf, mockContext); - checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); + checkQueueStructureCapacities(cs); setMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT, "memory-mb=" + largerMem + ",vcores=2"); @@ -4944,7 +4692,7 @@ null, new RMContainerTokenSecretManager(conf), cs.init(conf); cs.start(); cs.reinitialize(conf, rmContext); - checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); + checkQueueStructureCapacities(cs); // test delete leaf queue when there is application running. Map queues = @@ -4955,7 +4703,7 @@ null, new RMContainerTokenSecretManager(conf), .thenReturn(QueueState.STOPPED); cs.getCapacitySchedulerQueueManager().addQueue(b1QTobeDeleted, csB1Queue); conf = new CapacitySchedulerConfiguration(); - setupQueueConfigurationWithOutB1(conf); + setupQueueConfigurationWithoutB1(conf); try { cs.reinitialize(conf, mockContext); fail("Expected to throw exception when refresh queue tries to delete a" @@ -4966,7 +4714,7 @@ null, new RMContainerTokenSecretManager(conf), // test delete leaf queue(root.b.b1) when there is no application running. conf = new CapacitySchedulerConfiguration(); - setupQueueConfigurationWithOutB1(conf); + setupQueueConfigurationWithoutB1(conf); try { cs.reinitialize(conf, mockContext); } catch (IOException e) { @@ -4986,7 +4734,7 @@ null, new RMContainerTokenSecretManager(conf), conf = new CapacitySchedulerConfiguration(); setupQueueConfiguration(conf); cs.reinitialize(conf, rmContext); - checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); + checkQueueStructureCapacities(cs); // set the configurations such that it fails once but should be successfull // next time @@ -5010,7 +4758,7 @@ null, new RMContainerTokenSecretManager(conf), // test delete Parent queue when there is application running. conf = new CapacitySchedulerConfiguration(); - setupQueueConfigurationWithOutB(conf); + setupQueueConfigurationWithoutB(conf); try { cs.reinitialize(conf, mockContext); fail("Expected to throw exception when refresh queue tries to delete a" @@ -5021,7 +4769,7 @@ null, new RMContainerTokenSecretManager(conf), // test delete Parent queue when there is no application running. conf = new CapacitySchedulerConfiguration(); - setupQueueConfigurationWithOutB(conf); + setupQueueConfigurationWithoutB(conf); try { cs.reinitialize(conf, mockContext); } catch (IOException e) { @@ -5061,7 +4809,7 @@ null, new RMContainerTokenSecretManager(conf), cs.init(conf); cs.start(); cs.reinitialize(conf, rmContext); - checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); + checkQueueStructureCapacities(cs); // test delete all leaf queues when there is no application running. Map queues = @@ -5081,7 +4829,7 @@ null, new RMContainerTokenSecretManager(conf), cs.getCapacitySchedulerQueueManager().addQueue("b3", bQueue); conf = new CapacitySchedulerConfiguration(); - setupQueueConfWithOutChildrenOfB(conf); + setupQueueConfWithoutChildrenOfB(conf); // test convert parent queue to leaf queue(root.b) when there is no // application running. @@ -5138,7 +4886,7 @@ null, new RMContainerTokenSecretManager(conf), cs.init(conf); cs.start(); cs.reinitialize(conf, rmContext); - checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); + checkQueueStructureCapacities(cs); String targetQueue = "b1"; CSQueue b1 = cs.getQueue(targetQueue); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java index eb17c61e8e..4dd537d3a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerDynamicBehavior.java @@ -18,6 +18,20 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A1_CAPACITY; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A2_CAPACITY; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A_CAPACITY; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B1; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B1_CAPACITY; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B2; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B2_CAPACITY; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B3; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B3_CAPACITY; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B_CAPACITY; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.checkQueueStructureCapacities; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.getDefaultCapacities; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertTrue; import static org.junit.Assert.fail; @@ -44,20 +58,6 @@ public class TestCapacitySchedulerDynamicBehavior { private static final Logger LOG = LoggerFactory .getLogger(TestCapacitySchedulerDynamicBehavior.class); - private static final String A = CapacitySchedulerConfiguration.ROOT + ".a"; - private static final String B = CapacitySchedulerConfiguration.ROOT + ".b"; - private static final String B1 = B + ".b1"; - private static final String B2 = B + ".b2"; - private static final String B3 = B + ".b3"; - private static float A_CAPACITY = 10.5f; - private static float B_CAPACITY = 89.5f; - private static float A1_CAPACITY = 30; - private static float A2_CAPACITY = 70; - private static float B1_CAPACITY = 79.2f; - private static float B2_CAPACITY = 0.8f; - private static float B3_CAPACITY = 20; - - private final TestCapacityScheduler tcs = new TestCapacityScheduler(); private int GB = 1024; @@ -98,7 +98,7 @@ public void testRefreshQueuesWithReservations() throws Exception { cs.setEntitlement("a2", new QueueEntitlement(A2_CAPACITY / 100, 1.0f)); // Verify all allocations match - tcs.checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); + checkQueueStructureCapacities(cs); // Reinitialize and verify all dynamic queued survived CapacitySchedulerConfiguration conf = cs.getConfiguration(); @@ -106,7 +106,7 @@ public void testRefreshQueuesWithReservations() throws Exception { conf.setCapacity(B, 20f); cs.reinitialize(conf, rm.getRMContext()); - tcs.checkQueueCapacities(cs, 80f, 20f); + checkQueueStructureCapacities(cs, getDefaultCapacities(80f / 100.0f, 20f / 100.0f)); } @Test @@ -154,7 +154,7 @@ public void testAddQueueFailCases() throws Exception { cs.setEntitlement("a2", new QueueEntitlement(A2_CAPACITY / 100, 1.0f)); // Verify all allocations match - tcs.checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); + checkQueueStructureCapacities(cs); cs.stop(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerWorkflowPriorityMapping.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerWorkflowPriorityMapping.java index 63fc37e997..ba0fb9b64d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerWorkflowPriorityMapping.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerWorkflowPriorityMapping.java @@ -18,6 +18,21 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A1; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A2; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A_CAPACITY; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A1_CAPACITY; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.A2_CAPACITY; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B1; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B1_CAPACITY; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B2; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B2_CAPACITY; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B3; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B3_CAPACITY; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.B_CAPACITY; + import static org.junit.Assert.assertEquals; import java.util.Arrays;