From 59b88655bc15535d7bf765987cb9b82f17e16b80 Mon Sep 17 00:00:00 2001 From: Arun Murthy Date: Tue, 7 Feb 2012 22:08:55 +0000 Subject: [PATCH] MAPREDUCE-3833. Fixed a bug in reinitiaziling of queues. Contributed by Jason Lowe. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1241659 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-mapreduce-project/CHANGES.txt | 3 + .../scheduler/capacity/ParentQueue.java | 12 +- .../capacity/TestCapacityScheduler.java | 115 +++++++++++++++--- 3 files changed, 108 insertions(+), 22 deletions(-) diff --git a/hadoop-mapreduce-project/CHANGES.txt b/hadoop-mapreduce-project/CHANGES.txt index 82a7331a9f..11063761f7 100644 --- a/hadoop-mapreduce-project/CHANGES.txt +++ b/hadoop-mapreduce-project/CHANGES.txt @@ -755,6 +755,9 @@ Release 0.23.1 - Unreleased requesting containers so that scheduler can give off data local containers correctly. (Siddarth Seth via vinodkv) + MAPREDUCE-3833. Fixed a bug in reinitiaziling of queues. (Jason Lowe via + acmurthy) + Release 0.23.0 - 2011-11-01 INCOMPATIBLE CHANGES diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 4010aa0ce0..da7b195889 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -368,6 +368,12 @@ public synchronized void reinitialize(CSQueue queue, Resource clusterResource) ParentQueue parentQueue = (ParentQueue)queue; + // Set new configs + setupQueueConfigs(clusterResource, + parentQueue.capacity, parentQueue.absoluteCapacity, + parentQueue.maximumCapacity, parentQueue.absoluteMaxCapacity, + parentQueue.state, parentQueue.acls); + // Re-configure existing child queues and add new ones // The CS has already checked to ensure all existing child queues are present! Map currentChildQueues = getQueues(childQueues); @@ -389,12 +395,6 @@ public synchronized void reinitialize(CSQueue queue, Resource clusterResource) // Re-sort all queues childQueues.clear(); childQueues.addAll(currentChildQueues.values()); - - // Set new configs - setupQueueConfigs(clusterResource, - parentQueue.capacity, parentQueue.absoluteCapacity, - parentQueue.maximumCapacity, parentQueue.absoluteMaxCapacity, - parentQueue.state, parentQueue.acls); } Map getQueues(Set queues) { diff --git a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 57d62aaa46..bcfd09d3c8 100644 --- a/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-mapreduce-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -18,11 +18,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import static org.junit.Assert.assertEquals; import java.io.IOException; +import java.util.List; import junit.framework.Assert; -import junit.framework.TestCase; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; @@ -47,6 +48,21 @@ public class TestCapacityScheduler { private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class); + private static final String A = CapacitySchedulerConfiguration.ROOT + ".a"; + private static final String B = CapacitySchedulerConfiguration.ROOT + ".b"; + private static final String A1 = A + ".a1"; + private static final String A2 = A + ".a2"; + private static final String B1 = B + ".b1"; + private static final String B2 = B + ".b2"; + private static final String B3 = B + ".b3"; + private static int A_CAPACITY = 10; + private static int B_CAPACITY = 90; + private static int A1_CAPACITY = 30; + private static int A2_CAPACITY = 70; + private static int B1_CAPACITY = 50; + private static int B2_CAPACITY = 30; + private static int B3_CAPACITY = 20; + private ResourceManager resourceManager = null; @Before @@ -200,35 +216,102 @@ private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) { conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"}); conf.setCapacity(CapacitySchedulerConfiguration.ROOT, 100); - final String A = CapacitySchedulerConfiguration.ROOT + ".a"; - conf.setCapacity(A, 10); - - final String B = CapacitySchedulerConfiguration.ROOT + ".b"; - conf.setCapacity(B, 90); + conf.setCapacity(A, A_CAPACITY); + conf.setCapacity(B, B_CAPACITY); // Define 2nd-level queues - final String A1 = A + ".a1"; - final String A2 = A + ".a2"; conf.setQueues(A, new String[] {"a1", "a2"}); - conf.setCapacity(A1, 30); + conf.setCapacity(A1, A1_CAPACITY); conf.setUserLimitFactor(A1, 100.0f); - conf.setCapacity(A2, 70); + conf.setCapacity(A2, A2_CAPACITY); conf.setUserLimitFactor(A2, 100.0f); - final String B1 = B + ".b1"; - final String B2 = B + ".b2"; - final String B3 = B + ".b3"; conf.setQueues(B, new String[] {"b1", "b2", "b3"}); - conf.setCapacity(B1, 50); + conf.setCapacity(B1, B1_CAPACITY); conf.setUserLimitFactor(B1, 100.0f); - conf.setCapacity(B2, 30); + conf.setCapacity(B2, B2_CAPACITY); conf.setUserLimitFactor(B2, 100.0f); - conf.setCapacity(B3, 20); + conf.setCapacity(B3, B3_CAPACITY); conf.setUserLimitFactor(B3, 100.0f); LOG.info("Setup top-level queues a and b"); } + @Test + public void testRefreshQueues() throws Exception { + CapacityScheduler cs = new CapacityScheduler(); + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(conf); + cs.reinitialize(conf, null, null); + checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY); + + conf.setCapacity(A, 80); + conf.setCapacity(B, 20); + cs.reinitialize(conf, null,null); + checkQueueCapacities(cs, 80, 20); + } + + private void checkQueueCapacities(CapacityScheduler cs, + int capacityA, int capacityB) { + CSQueue rootQueue = cs.getRootQueue(); + CSQueue queueA = findQueue(rootQueue, A); + CSQueue queueB = findQueue(rootQueue, B); + CSQueue queueA1 = findQueue(queueA, A1); + CSQueue queueA2 = findQueue(queueA, A2); + CSQueue queueB1 = findQueue(queueB, B1); + CSQueue queueB2 = findQueue(queueB, B2); + CSQueue queueB3 = findQueue(queueB, B3); + + float capA = capacityA / 100.0f; + float capB = capacityB / 100.0f; + + checkQueueCapacity(queueA, capA, capA, 1.0f, 1.0f); + checkQueueCapacity(queueB, capB, capB, 1.0f, 1.0f); + checkQueueCapacity(queueA1, A1_CAPACITY / 100.0f, + (A1_CAPACITY/100.0f) * capA, 1.0f, 1.0f); + checkQueueCapacity(queueA2, (float)A2_CAPACITY / 100.0f, + (A2_CAPACITY/100.0f) * capA, 1.0f, 1.0f); + checkQueueCapacity(queueB1, (float)B1_CAPACITY / 100.0f, + (B1_CAPACITY/100.0f) * capB, 1.0f, 1.0f); + checkQueueCapacity(queueB2, (float)B2_CAPACITY / 100.0f, + (B2_CAPACITY/100.0f) * capB, 1.0f, 1.0f); + checkQueueCapacity(queueB3, (float)B3_CAPACITY / 100.0f, + (B3_CAPACITY/100.0f) * capB, 1.0f, 1.0f); + } + + private void checkQueueCapacity(CSQueue q, float expectedCapacity, + float expectedAbsCapacity, float expectedMaxCapacity, + float expectedAbsMaxCapacity) { + final float epsilon = 1e-5f; + assertEquals("capacity", expectedCapacity, q.getCapacity(), epsilon); + assertEquals("absolute capacity", expectedAbsCapacity, + q.getAbsoluteCapacity(), epsilon); + assertEquals("maximum capacity", expectedMaxCapacity, + q.getMaximumCapacity(), epsilon); + assertEquals("absolute maximum capacity", expectedAbsMaxCapacity, + q.getAbsoluteMaximumCapacity(), epsilon); + } + + private CSQueue findQueue(CSQueue root, String queuePath) { + if (root.getQueuePath().equals(queuePath)) { + return root; + } + + List childQueues = root.getChildQueues(); + if (childQueues != null) { + for (CSQueue q : childQueues) { + if (queuePath.startsWith(q.getQueuePath())) { + CSQueue result = findQueue(q, queuePath); + if (result != null) { + return result; + } + } + } + } + + return null; + } + private void checkApplicationResourceUsage(int expected, Application application) { Assert.assertEquals(expected, application.getUsedResources().getMemory());