MAPREDUCE-3329. Fixed CapacityScheduler to ensure maximum-capacity cannot be lesser than capacity for any queue.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1205260 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
865cd32885
commit
41b1e3ffec
@ -170,6 +170,9 @@ Release 0.23.1 - Unreleased
|
||||
MAPREDUCE-3408. yarn-daemon.sh unconditionnaly sets yarn.root.logger
|
||||
(Bruno Mahe via mahadev)
|
||||
|
||||
MAPREDUCE-3329. Fixed CapacityScheduler to ensure maximum-capacity cannot
|
||||
be lesser than capacity for any queue. (acmurthy)
|
||||
|
||||
Release 0.23.0 - 2011-11-01
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -0,0 +1,17 @@
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||
|
||||
class CSQueueUtils {
|
||||
|
||||
public static void checkMaxCapacity(String queueName,
|
||||
float capacity, float maximumCapacity) {
|
||||
if (maximumCapacity != CapacitySchedulerConfiguration.UNDEFINED &&
|
||||
maximumCapacity < capacity) {
|
||||
throw new IllegalArgumentException(
|
||||
"Illegal call to setMaxCapacity. " +
|
||||
"Queue '" + queueName + "' has " +
|
||||
"capacity (" + capacity + ") greater than " +
|
||||
"maximumCapacity (" + maximumCapacity + ")" );
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -160,6 +160,10 @@ public int getMaximumCapacity(String queue) {
|
||||
return maxCapacity;
|
||||
}
|
||||
|
||||
public void setMaximumCapacity(String queue, int maxCapacity) {
|
||||
setInt(getQueuePrefix(queue) + MAXIMUM_CAPACITY, maxCapacity);
|
||||
}
|
||||
|
||||
public int getUserLimit(String queue) {
|
||||
int userLimit =
|
||||
getInt(getQueuePrefix(queue) + USER_LIMIT, DEFAULT_USER_LIMIT);
|
||||
|
@ -211,16 +211,19 @@ private int computeMaxActiveApplicationsPerUser(int maxActiveApplications,
|
||||
|
||||
private synchronized void setupQueueConfigs(
|
||||
float capacity, float absoluteCapacity,
|
||||
float maxCapacity, float absoluteMaxCapacity,
|
||||
float maximumCapacity, float absoluteMaxCapacity,
|
||||
int userLimit, float userLimitFactor,
|
||||
int maxApplications, int maxApplicationsPerUser,
|
||||
int maxActiveApplications, int maxActiveApplicationsPerUser,
|
||||
QueueState state, Map<QueueACL, AccessControlList> acls)
|
||||
{
|
||||
// Sanity check
|
||||
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
|
||||
|
||||
this.capacity = capacity;
|
||||
this.absoluteCapacity = parent.getAbsoluteCapacity() * capacity;
|
||||
|
||||
this.maximumCapacity = maxCapacity;
|
||||
this.maximumCapacity = maximumCapacity;
|
||||
this.absoluteMaxCapacity = absoluteMaxCapacity;
|
||||
|
||||
this.userLimit = userLimit;
|
||||
@ -236,9 +239,9 @@ private synchronized void setupQueueConfigs(
|
||||
|
||||
this.acls = acls;
|
||||
|
||||
this.queueInfo.setCapacity(capacity);
|
||||
this.queueInfo.setMaximumCapacity(maximumCapacity);
|
||||
this.queueInfo.setQueueState(state);
|
||||
this.queueInfo.setCapacity(this.capacity);
|
||||
this.queueInfo.setMaximumCapacity(this.maximumCapacity);
|
||||
this.queueInfo.setQueueState(this.state);
|
||||
|
||||
StringBuilder aclsString = new StringBuilder();
|
||||
for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
|
||||
@ -250,7 +253,7 @@ private synchronized void setupQueueConfigs(
|
||||
" [= (float) configuredCapacity / 100 ]" + "\n" +
|
||||
"asboluteCapacity = " + absoluteCapacity +
|
||||
" [= parentAbsoluteCapacity * capacity ]" + "\n" +
|
||||
"maxCapacity = " + maxCapacity +
|
||||
"maxCapacity = " + maximumCapacity +
|
||||
" [= configuredMaxCapacity ]" + "\n" +
|
||||
"absoluteMaxCapacity = " + absoluteMaxCapacity +
|
||||
" [= Float.MAX_VALUE if maximumCapacity undefined, " +
|
||||
@ -394,6 +397,9 @@ synchronized void setUsedCapacity(float usedCapacity) {
|
||||
* @param maximumCapacity new max capacity
|
||||
*/
|
||||
synchronized void setMaxCapacity(float maximumCapacity) {
|
||||
// Sanity check
|
||||
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
|
||||
|
||||
this.maximumCapacity = maximumCapacity;
|
||||
this.absoluteMaxCapacity =
|
||||
(maximumCapacity == CapacitySchedulerConfiguration.UNDEFINED) ?
|
||||
|
@ -153,6 +153,9 @@ private synchronized void setupQueueConfigs(
|
||||
float maximumCapacity, float absoluteMaxCapacity,
|
||||
QueueState state, Map<QueueACL, AccessControlList> acls
|
||||
) {
|
||||
// Sanity check
|
||||
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
|
||||
|
||||
this.capacity = capacity;
|
||||
this.absoluteCapacity = absoluteCapacity;
|
||||
this.maximumCapacity = maximumCapacity;
|
||||
@ -162,9 +165,9 @@ private synchronized void setupQueueConfigs(
|
||||
|
||||
this.acls = acls;
|
||||
|
||||
this.queueInfo.setCapacity(capacity);
|
||||
this.queueInfo.setMaximumCapacity(maximumCapacity);
|
||||
this.queueInfo.setQueueState(state);
|
||||
this.queueInfo.setCapacity(this.capacity);
|
||||
this.queueInfo.setMaximumCapacity(this.maximumCapacity);
|
||||
this.queueInfo.setQueueState(this.state);
|
||||
|
||||
StringBuilder aclsString = new StringBuilder();
|
||||
for (Map.Entry<QueueACL, AccessControlList> e : acls.entrySet()) {
|
||||
@ -484,6 +487,9 @@ synchronized void setUtilization(float utilization) {
|
||||
* @param maximumCapacity new max capacity
|
||||
*/
|
||||
synchronized void setMaxCapacity(float maximumCapacity) {
|
||||
// Sanity check
|
||||
CSQueueUtils.checkMaxCapacity(getQueueName(), capacity, maximumCapacity);
|
||||
|
||||
this.maximumCapacity = maximumCapacity;
|
||||
float parentAbsoluteCapacity =
|
||||
(rootQueue) ? 100.0f : parent.getAbsoluteCapacity();
|
||||
|
@ -18,6 +18,8 @@
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||
|
||||
import junit.framework.Assert;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
@ -35,7 +37,6 @@ public void testQueueParsing() throws Exception {
|
||||
|
||||
CapacityScheduler capacityScheduler = new CapacityScheduler();
|
||||
capacityScheduler.reinitialize(conf, null, null);
|
||||
//capacityScheduler.g
|
||||
}
|
||||
|
||||
private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
|
||||
@ -104,4 +105,48 @@ public void testRootQueueParsing() throws Exception {
|
||||
CapacityScheduler capacityScheduler = new CapacityScheduler();
|
||||
capacityScheduler.reinitialize(conf, null, null);
|
||||
}
|
||||
|
||||
public void testMaxCapacity() throws Exception {
|
||||
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||
|
||||
conf.setQueues(CapacityScheduler.ROOT, new String[] {"a", "b", "c"});
|
||||
conf.setCapacity(CapacityScheduler.ROOT, 100);
|
||||
|
||||
final String A = CapacityScheduler.ROOT + ".a";
|
||||
conf.setCapacity(A, 50);
|
||||
conf.setMaximumCapacity(A, 60);
|
||||
|
||||
final String B = CapacityScheduler.ROOT + ".b";
|
||||
conf.setCapacity(B, 50);
|
||||
conf.setMaximumCapacity(B, 45); // Should throw an exception
|
||||
|
||||
|
||||
boolean fail = false;
|
||||
CapacityScheduler capacityScheduler;
|
||||
try {
|
||||
capacityScheduler = new CapacityScheduler();
|
||||
capacityScheduler.reinitialize(conf, null, null);
|
||||
} catch (IllegalArgumentException iae) {
|
||||
fail = true;
|
||||
}
|
||||
Assert.assertTrue("Didn't throw IllegalArgumentException for wrong maxCap",
|
||||
fail);
|
||||
|
||||
conf.setMaximumCapacity(B, 60);
|
||||
|
||||
// Now this should work
|
||||
capacityScheduler = new CapacityScheduler();
|
||||
capacityScheduler.reinitialize(conf, null, null);
|
||||
|
||||
fail = false;
|
||||
try {
|
||||
LeafQueue a = (LeafQueue)capacityScheduler.getQueue(A);
|
||||
a.setMaxCapacity(45);
|
||||
} catch (IllegalArgumentException iae) {
|
||||
fail = true;
|
||||
}
|
||||
Assert.assertTrue("Didn't throw IllegalArgumentException for wrong " +
|
||||
"setMaxCap", fail);
|
||||
}
|
||||
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user