MAPREDUCE-3833. Fixed a bug in reinitiaziling of queues. Contributed by Jason Lowe.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1241659 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
90bc439b66
commit
59b88655bc
@ -755,6 +755,9 @@ Release 0.23.1 - Unreleased
|
||||
requesting containers so that scheduler can give off data local containers
|
||||
correctly. (Siddarth Seth via vinodkv)
|
||||
|
||||
MAPREDUCE-3833. Fixed a bug in reinitiaziling of queues. (Jason Lowe via
|
||||
acmurthy)
|
||||
|
||||
Release 0.23.0 - 2011-11-01
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -368,6 +368,12 @@ public synchronized void reinitialize(CSQueue queue, Resource clusterResource)
|
||||
|
||||
ParentQueue parentQueue = (ParentQueue)queue;
|
||||
|
||||
// Set new configs
|
||||
setupQueueConfigs(clusterResource,
|
||||
parentQueue.capacity, parentQueue.absoluteCapacity,
|
||||
parentQueue.maximumCapacity, parentQueue.absoluteMaxCapacity,
|
||||
parentQueue.state, parentQueue.acls);
|
||||
|
||||
// Re-configure existing child queues and add new ones
|
||||
// The CS has already checked to ensure all existing child queues are present!
|
||||
Map<String, CSQueue> currentChildQueues = getQueues(childQueues);
|
||||
@ -389,12 +395,6 @@ public synchronized void reinitialize(CSQueue queue, Resource clusterResource)
|
||||
// Re-sort all queues
|
||||
childQueues.clear();
|
||||
childQueues.addAll(currentChildQueues.values());
|
||||
|
||||
// Set new configs
|
||||
setupQueueConfigs(clusterResource,
|
||||
parentQueue.capacity, parentQueue.absoluteCapacity,
|
||||
parentQueue.maximumCapacity, parentQueue.absoluteMaxCapacity,
|
||||
parentQueue.state, parentQueue.acls);
|
||||
}
|
||||
|
||||
Map<String, CSQueue> getQueues(Set<CSQueue> queues) {
|
||||
|
@ -18,11 +18,12 @@
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import junit.framework.Assert;
|
||||
import junit.framework.TestCase;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
@ -47,6 +48,21 @@
|
||||
public class TestCapacityScheduler {
|
||||
private static final Log LOG = LogFactory.getLog(TestCapacityScheduler.class);
|
||||
|
||||
private static final String A = CapacitySchedulerConfiguration.ROOT + ".a";
|
||||
private static final String B = CapacitySchedulerConfiguration.ROOT + ".b";
|
||||
private static final String A1 = A + ".a1";
|
||||
private static final String A2 = A + ".a2";
|
||||
private static final String B1 = B + ".b1";
|
||||
private static final String B2 = B + ".b2";
|
||||
private static final String B3 = B + ".b3";
|
||||
private static int A_CAPACITY = 10;
|
||||
private static int B_CAPACITY = 90;
|
||||
private static int A1_CAPACITY = 30;
|
||||
private static int A2_CAPACITY = 70;
|
||||
private static int B1_CAPACITY = 50;
|
||||
private static int B2_CAPACITY = 30;
|
||||
private static int B3_CAPACITY = 20;
|
||||
|
||||
private ResourceManager resourceManager = null;
|
||||
|
||||
@Before
|
||||
@ -200,35 +216,102 @@ private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
|
||||
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"});
|
||||
conf.setCapacity(CapacitySchedulerConfiguration.ROOT, 100);
|
||||
|
||||
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
|
||||
conf.setCapacity(A, 10);
|
||||
|
||||
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
|
||||
conf.setCapacity(B, 90);
|
||||
conf.setCapacity(A, A_CAPACITY);
|
||||
conf.setCapacity(B, B_CAPACITY);
|
||||
|
||||
// Define 2nd-level queues
|
||||
final String A1 = A + ".a1";
|
||||
final String A2 = A + ".a2";
|
||||
conf.setQueues(A, new String[] {"a1", "a2"});
|
||||
conf.setCapacity(A1, 30);
|
||||
conf.setCapacity(A1, A1_CAPACITY);
|
||||
conf.setUserLimitFactor(A1, 100.0f);
|
||||
conf.setCapacity(A2, 70);
|
||||
conf.setCapacity(A2, A2_CAPACITY);
|
||||
conf.setUserLimitFactor(A2, 100.0f);
|
||||
|
||||
final String B1 = B + ".b1";
|
||||
final String B2 = B + ".b2";
|
||||
final String B3 = B + ".b3";
|
||||
conf.setQueues(B, new String[] {"b1", "b2", "b3"});
|
||||
conf.setCapacity(B1, 50);
|
||||
conf.setCapacity(B1, B1_CAPACITY);
|
||||
conf.setUserLimitFactor(B1, 100.0f);
|
||||
conf.setCapacity(B2, 30);
|
||||
conf.setCapacity(B2, B2_CAPACITY);
|
||||
conf.setUserLimitFactor(B2, 100.0f);
|
||||
conf.setCapacity(B3, 20);
|
||||
conf.setCapacity(B3, B3_CAPACITY);
|
||||
conf.setUserLimitFactor(B3, 100.0f);
|
||||
|
||||
LOG.info("Setup top-level queues a and b");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRefreshQueues() throws Exception {
|
||||
CapacityScheduler cs = new CapacityScheduler();
|
||||
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||
setupQueueConfiguration(conf);
|
||||
cs.reinitialize(conf, null, null);
|
||||
checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
|
||||
|
||||
conf.setCapacity(A, 80);
|
||||
conf.setCapacity(B, 20);
|
||||
cs.reinitialize(conf, null,null);
|
||||
checkQueueCapacities(cs, 80, 20);
|
||||
}
|
||||
|
||||
private void checkQueueCapacities(CapacityScheduler cs,
|
||||
int capacityA, int capacityB) {
|
||||
CSQueue rootQueue = cs.getRootQueue();
|
||||
CSQueue queueA = findQueue(rootQueue, A);
|
||||
CSQueue queueB = findQueue(rootQueue, B);
|
||||
CSQueue queueA1 = findQueue(queueA, A1);
|
||||
CSQueue queueA2 = findQueue(queueA, A2);
|
||||
CSQueue queueB1 = findQueue(queueB, B1);
|
||||
CSQueue queueB2 = findQueue(queueB, B2);
|
||||
CSQueue queueB3 = findQueue(queueB, B3);
|
||||
|
||||
float capA = capacityA / 100.0f;
|
||||
float capB = capacityB / 100.0f;
|
||||
|
||||
checkQueueCapacity(queueA, capA, capA, 1.0f, 1.0f);
|
||||
checkQueueCapacity(queueB, capB, capB, 1.0f, 1.0f);
|
||||
checkQueueCapacity(queueA1, A1_CAPACITY / 100.0f,
|
||||
(A1_CAPACITY/100.0f) * capA, 1.0f, 1.0f);
|
||||
checkQueueCapacity(queueA2, (float)A2_CAPACITY / 100.0f,
|
||||
(A2_CAPACITY/100.0f) * capA, 1.0f, 1.0f);
|
||||
checkQueueCapacity(queueB1, (float)B1_CAPACITY / 100.0f,
|
||||
(B1_CAPACITY/100.0f) * capB, 1.0f, 1.0f);
|
||||
checkQueueCapacity(queueB2, (float)B2_CAPACITY / 100.0f,
|
||||
(B2_CAPACITY/100.0f) * capB, 1.0f, 1.0f);
|
||||
checkQueueCapacity(queueB3, (float)B3_CAPACITY / 100.0f,
|
||||
(B3_CAPACITY/100.0f) * capB, 1.0f, 1.0f);
|
||||
}
|
||||
|
||||
private void checkQueueCapacity(CSQueue q, float expectedCapacity,
|
||||
float expectedAbsCapacity, float expectedMaxCapacity,
|
||||
float expectedAbsMaxCapacity) {
|
||||
final float epsilon = 1e-5f;
|
||||
assertEquals("capacity", expectedCapacity, q.getCapacity(), epsilon);
|
||||
assertEquals("absolute capacity", expectedAbsCapacity,
|
||||
q.getAbsoluteCapacity(), epsilon);
|
||||
assertEquals("maximum capacity", expectedMaxCapacity,
|
||||
q.getMaximumCapacity(), epsilon);
|
||||
assertEquals("absolute maximum capacity", expectedAbsMaxCapacity,
|
||||
q.getAbsoluteMaximumCapacity(), epsilon);
|
||||
}
|
||||
|
||||
private CSQueue findQueue(CSQueue root, String queuePath) {
|
||||
if (root.getQueuePath().equals(queuePath)) {
|
||||
return root;
|
||||
}
|
||||
|
||||
List<CSQueue> childQueues = root.getChildQueues();
|
||||
if (childQueues != null) {
|
||||
for (CSQueue q : childQueues) {
|
||||
if (queuePath.startsWith(q.getQueuePath())) {
|
||||
CSQueue result = findQueue(q, queuePath);
|
||||
if (result != null) {
|
||||
return result;
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private void checkApplicationResourceUsage(int expected,
|
||||
Application application) {
|
||||
Assert.assertEquals(expected, application.getUsedResources().getMemory());
|
||||
|
Loading…
Reference in New Issue
Block a user