MAPREDUCE-4311. Capacity scheduler.xml does not accept decimal values for capacity and maximum-capacity settings (Karthik Kambatla via tgraves)
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1351700 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
277b3dd736
commit
01b61c76a8
@ -581,6 +581,9 @@ Release 0.23.3 - UNRELEASED
|
|||||||
MAPREDUCE-3927. Shuffle hang when set map.failures.percent
|
MAPREDUCE-3927. Shuffle hang when set map.failures.percent
|
||||||
(Bhallamudi Venkata Siva Kamesh via tgraves)
|
(Bhallamudi Venkata Siva Kamesh via tgraves)
|
||||||
|
|
||||||
|
MAPREDUCE-4311. Capacity scheduler.xml does not accept decimal values for
|
||||||
|
capacity and maximum-capacity settings (Karthik Kambatla via tgraves)
|
||||||
|
|
||||||
Release 0.23.2 - UNRELEASED
|
Release 0.23.2 - UNRELEASED
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -80,13 +80,13 @@ public class CapacitySchedulerConfiguration extends Configuration {
|
|||||||
DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT = 0.1f;
|
DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT = 0.1f;
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
public static final int UNDEFINED = -1;
|
public static final float UNDEFINED = -1;
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
public static final int MINIMUM_CAPACITY_VALUE = 1;
|
public static final float MINIMUM_CAPACITY_VALUE = 1;
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
public static final int MAXIMUM_CAPACITY_VALUE = 100;
|
public static final float MAXIMUM_CAPACITY_VALUE = 100;
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
public static final int DEFAULT_USER_LIMIT = 100;
|
public static final int DEFAULT_USER_LIMIT = 100;
|
||||||
@ -132,8 +132,8 @@ public float getMaximumApplicationMasterResourcePercent() {
|
|||||||
DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT);
|
DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT);
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getCapacity(String queue) {
|
public float getCapacity(String queue) {
|
||||||
int capacity = getInt(getQueuePrefix(queue) + CAPACITY, UNDEFINED);
|
float capacity = getFloat(getQueuePrefix(queue) + CAPACITY, UNDEFINED);
|
||||||
if (capacity < MINIMUM_CAPACITY_VALUE || capacity > MAXIMUM_CAPACITY_VALUE) {
|
if (capacity < MINIMUM_CAPACITY_VALUE || capacity > MAXIMUM_CAPACITY_VALUE) {
|
||||||
throw new IllegalArgumentException("Illegal " +
|
throw new IllegalArgumentException("Illegal " +
|
||||||
"capacity of " + capacity + " for queue " + queue);
|
"capacity of " + capacity + " for queue " + queue);
|
||||||
@ -143,31 +143,31 @@ public int getCapacity(String queue) {
|
|||||||
return capacity;
|
return capacity;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setCapacity(String queue, int capacity) {
|
public void setCapacity(String queue, float capacity) {
|
||||||
setInt(getQueuePrefix(queue) + CAPACITY, capacity);
|
setFloat(getQueuePrefix(queue) + CAPACITY, capacity);
|
||||||
LOG.debug("CSConf - setCapacity: queuePrefix=" + getQueuePrefix(queue) +
|
LOG.debug("CSConf - setCapacity: queuePrefix=" + getQueuePrefix(queue) +
|
||||||
", capacity=" + capacity);
|
", capacity=" + capacity);
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getMaximumCapacity(String queue) {
|
public float getMaximumCapacity(String queue) {
|
||||||
int maxCapacity =
|
float maxCapacity = getFloat(getQueuePrefix(queue) + MAXIMUM_CAPACITY,
|
||||||
getInt(getQueuePrefix(queue) + MAXIMUM_CAPACITY, MAXIMUM_CAPACITY_VALUE);
|
MAXIMUM_CAPACITY_VALUE);
|
||||||
return maxCapacity;
|
return maxCapacity;
|
||||||
}
|
}
|
||||||
|
|
||||||
public void setMaximumCapacity(String queue, int maxCapacity) {
|
public void setMaximumCapacity(String queue, float maxCapacity) {
|
||||||
if (maxCapacity > MAXIMUM_CAPACITY_VALUE) {
|
if (maxCapacity > MAXIMUM_CAPACITY_VALUE) {
|
||||||
throw new IllegalArgumentException("Illegal " +
|
throw new IllegalArgumentException("Illegal " +
|
||||||
"maximum-capacity of " + maxCapacity + " for queue " + queue);
|
"maximum-capacity of " + maxCapacity + " for queue " + queue);
|
||||||
}
|
}
|
||||||
setInt(getQueuePrefix(queue) + MAXIMUM_CAPACITY, maxCapacity);
|
setFloat(getQueuePrefix(queue) + MAXIMUM_CAPACITY, maxCapacity);
|
||||||
LOG.debug("CSConf - setMaxCapacity: queuePrefix=" + getQueuePrefix(queue) +
|
LOG.debug("CSConf - setMaxCapacity: queuePrefix=" + getQueuePrefix(queue) +
|
||||||
", maxCapacity=" + maxCapacity);
|
", maxCapacity=" + maxCapacity);
|
||||||
}
|
}
|
||||||
|
|
||||||
public int getUserLimit(String queue) {
|
public int getUserLimit(String queue) {
|
||||||
int userLimit =
|
int userLimit = getInt(getQueuePrefix(queue) + USER_LIMIT,
|
||||||
getInt(getQueuePrefix(queue) + USER_LIMIT, DEFAULT_USER_LIMIT);
|
DEFAULT_USER_LIMIT);
|
||||||
return userLimit;
|
return userLimit;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -111,7 +111,7 @@ public ParentQueue(CapacitySchedulerContext cs,
|
|||||||
cs.getConfiguration().getEnableUserMetrics(),
|
cs.getConfiguration().getEnableUserMetrics(),
|
||||||
cs.getConf());
|
cs.getConf());
|
||||||
|
|
||||||
int rawCapacity = cs.getConfiguration().getCapacity(getQueuePath());
|
float rawCapacity = cs.getConfiguration().getCapacity(getQueuePath());
|
||||||
|
|
||||||
if (rootQueue &&
|
if (rootQueue &&
|
||||||
(rawCapacity != CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE)) {
|
(rawCapacity != CapacitySchedulerConfiguration.MAXIMUM_CAPACITY_VALUE)) {
|
||||||
|
@ -59,13 +59,13 @@ public class TestCapacityScheduler {
|
|||||||
private static final String B1 = B + ".b1";
|
private static final String B1 = B + ".b1";
|
||||||
private static final String B2 = B + ".b2";
|
private static final String B2 = B + ".b2";
|
||||||
private static final String B3 = B + ".b3";
|
private static final String B3 = B + ".b3";
|
||||||
private static int A_CAPACITY = 10;
|
private static float A_CAPACITY = 10.5f;
|
||||||
private static int B_CAPACITY = 90;
|
private static float B_CAPACITY = 89.5f;
|
||||||
private static int A1_CAPACITY = 30;
|
private static float A1_CAPACITY = 30;
|
||||||
private static int A2_CAPACITY = 70;
|
private static float A2_CAPACITY = 70;
|
||||||
private static int B1_CAPACITY = 50;
|
private static float B1_CAPACITY = 50;
|
||||||
private static int B2_CAPACITY = 30;
|
private static float B2_CAPACITY = 30;
|
||||||
private static int B3_CAPACITY = 20;
|
private static float B3_CAPACITY = 20;
|
||||||
|
|
||||||
private ResourceManager resourceManager = null;
|
private ResourceManager resourceManager = null;
|
||||||
|
|
||||||
@ -250,14 +250,14 @@ public void testRefreshQueues() throws Exception {
|
|||||||
cs.reinitialize(conf, null, null);
|
cs.reinitialize(conf, null, null);
|
||||||
checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
|
checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
|
||||||
|
|
||||||
conf.setCapacity(A, 80);
|
conf.setCapacity(A, 80f);
|
||||||
conf.setCapacity(B, 20);
|
conf.setCapacity(B, 20f);
|
||||||
cs.reinitialize(conf, null,null);
|
cs.reinitialize(conf, null,null);
|
||||||
checkQueueCapacities(cs, 80, 20);
|
checkQueueCapacities(cs, 80f, 20f);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void checkQueueCapacities(CapacityScheduler cs,
|
private void checkQueueCapacities(CapacityScheduler cs,
|
||||||
int capacityA, int capacityB) {
|
float capacityA, float capacityB) {
|
||||||
CSQueue rootQueue = cs.getRootQueue();
|
CSQueue rootQueue = cs.getRootQueue();
|
||||||
CSQueue queueA = findQueue(rootQueue, A);
|
CSQueue queueA = findQueue(rootQueue, A);
|
||||||
CSQueue queueB = findQueue(rootQueue, B);
|
CSQueue queueB = findQueue(rootQueue, B);
|
||||||
@ -274,13 +274,13 @@ private void checkQueueCapacities(CapacityScheduler cs,
|
|||||||
checkQueueCapacity(queueB, capB, capB, 1.0f, 1.0f);
|
checkQueueCapacity(queueB, capB, capB, 1.0f, 1.0f);
|
||||||
checkQueueCapacity(queueA1, A1_CAPACITY / 100.0f,
|
checkQueueCapacity(queueA1, A1_CAPACITY / 100.0f,
|
||||||
(A1_CAPACITY/100.0f) * capA, 1.0f, 1.0f);
|
(A1_CAPACITY/100.0f) * capA, 1.0f, 1.0f);
|
||||||
checkQueueCapacity(queueA2, (float)A2_CAPACITY / 100.0f,
|
checkQueueCapacity(queueA2, A2_CAPACITY / 100.0f,
|
||||||
(A2_CAPACITY/100.0f) * capA, 1.0f, 1.0f);
|
(A2_CAPACITY/100.0f) * capA, 1.0f, 1.0f);
|
||||||
checkQueueCapacity(queueB1, (float)B1_CAPACITY / 100.0f,
|
checkQueueCapacity(queueB1, B1_CAPACITY / 100.0f,
|
||||||
(B1_CAPACITY/100.0f) * capB, 1.0f, 1.0f);
|
(B1_CAPACITY/100.0f) * capB, 1.0f, 1.0f);
|
||||||
checkQueueCapacity(queueB2, (float)B2_CAPACITY / 100.0f,
|
checkQueueCapacity(queueB2, B2_CAPACITY / 100.0f,
|
||||||
(B2_CAPACITY/100.0f) * capB, 1.0f, 1.0f);
|
(B2_CAPACITY/100.0f) * capB, 1.0f, 1.0f);
|
||||||
checkQueueCapacity(queueB3, (float)B3_CAPACITY / 100.0f,
|
checkQueueCapacity(queueB3, B3_CAPACITY / 100.0f,
|
||||||
(B3_CAPACITY/100.0f) * capB, 1.0f, 1.0f);
|
(B3_CAPACITY/100.0f) * capB, 1.0f, 1.0f);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -340,7 +340,7 @@ public void testParseQueue() throws IOException {
|
|||||||
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||||
setupQueueConfiguration(conf);
|
setupQueueConfiguration(conf);
|
||||||
conf.setQueues(CapacitySchedulerConfiguration.ROOT + ".a.a1", new String[] {"b1"} );
|
conf.setQueues(CapacitySchedulerConfiguration.ROOT + ".a.a1", new String[] {"b1"} );
|
||||||
conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a1.b1", 100);
|
conf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a.a1.b1", 100.0f);
|
||||||
conf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".a.a1.b1", 100.0f);
|
conf.setUserLimitFactor(CapacitySchedulerConfiguration.ROOT + ".a.a1.b1", 100.0f);
|
||||||
|
|
||||||
cs.reinitialize(conf, null, null);
|
cs.reinitialize(conf, null, null);
|
||||||
|
@ -135,7 +135,7 @@ private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
|
|||||||
conf.setAcl(CapacitySchedulerConfiguration.ROOT, QueueACL.SUBMIT_APPLICATIONS, " ");
|
conf.setAcl(CapacitySchedulerConfiguration.ROOT, QueueACL.SUBMIT_APPLICATIONS, " ");
|
||||||
|
|
||||||
final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + A;
|
final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + A;
|
||||||
conf.setCapacity(Q_A, 9);
|
conf.setCapacity(Q_A, 8.5f);
|
||||||
conf.setMaximumCapacity(Q_A, 20);
|
conf.setMaximumCapacity(Q_A, 20);
|
||||||
conf.setAcl(Q_A, QueueACL.SUBMIT_APPLICATIONS, "*");
|
conf.setAcl(Q_A, QueueACL.SUBMIT_APPLICATIONS, "*");
|
||||||
|
|
||||||
@ -145,7 +145,7 @@ private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
|
|||||||
conf.setAcl(Q_B, QueueACL.SUBMIT_APPLICATIONS, "*");
|
conf.setAcl(Q_B, QueueACL.SUBMIT_APPLICATIONS, "*");
|
||||||
|
|
||||||
final String Q_C = CapacitySchedulerConfiguration.ROOT + "." + C;
|
final String Q_C = CapacitySchedulerConfiguration.ROOT + "." + C;
|
||||||
conf.setCapacity(Q_C, 1);
|
conf.setCapacity(Q_C, 1.5f);
|
||||||
conf.setMaximumCapacity(Q_C, 10);
|
conf.setMaximumCapacity(Q_C, 10);
|
||||||
conf.setAcl(Q_C, QueueACL.SUBMIT_APPLICATIONS, " ");
|
conf.setAcl(Q_C, QueueACL.SUBMIT_APPLICATIONS, " ");
|
||||||
|
|
||||||
@ -208,8 +208,8 @@ public void testInitializeQueue() throws Exception {
|
|||||||
//can add more sturdy test with 3-layer queues
|
//can add more sturdy test with 3-layer queues
|
||||||
//once MAPREDUCE:3410 is resolved
|
//once MAPREDUCE:3410 is resolved
|
||||||
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
|
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
|
||||||
assertEquals(0.09, a.getCapacity(), epsilon);
|
assertEquals(0.085, a.getCapacity(), epsilon);
|
||||||
assertEquals(0.09, a.getAbsoluteCapacity(), epsilon);
|
assertEquals(0.085, a.getAbsoluteCapacity(), epsilon);
|
||||||
assertEquals(0.2, a.getMaximumCapacity(), epsilon);
|
assertEquals(0.2, a.getMaximumCapacity(), epsilon);
|
||||||
assertEquals(0.2, a.getAbsoluteMaximumCapacity(), epsilon);
|
assertEquals(0.2, a.getAbsoluteMaximumCapacity(), epsilon);
|
||||||
|
|
||||||
@ -220,8 +220,8 @@ public void testInitializeQueue() throws Exception {
|
|||||||
assertEquals(0.99, b.getAbsoluteMaximumCapacity(), epsilon);
|
assertEquals(0.99, b.getAbsoluteMaximumCapacity(), epsilon);
|
||||||
|
|
||||||
ParentQueue c = (ParentQueue)queues.get(C);
|
ParentQueue c = (ParentQueue)queues.get(C);
|
||||||
assertEquals(0.01, c.getCapacity(), epsilon);
|
assertEquals(0.015, c.getCapacity(), epsilon);
|
||||||
assertEquals(0.01, c.getAbsoluteCapacity(), epsilon);
|
assertEquals(0.015, c.getAbsoluteCapacity(), epsilon);
|
||||||
assertEquals(0.1, c.getMaximumCapacity(), epsilon);
|
assertEquals(0.1, c.getMaximumCapacity(), epsilon);
|
||||||
assertEquals(0.1, c.getAbsoluteMaximumCapacity(), epsilon);
|
assertEquals(0.1, c.getAbsoluteMaximumCapacity(), epsilon);
|
||||||
}
|
}
|
||||||
|
@ -18,16 +18,27 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
import static org.junit.Assert.*;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.mockito.Mockito.*;
|
import static org.junit.Assert.assertFalse;
|
||||||
|
import static org.junit.Assert.assertTrue;
|
||||||
|
import static org.mockito.Matchers.any;
|
||||||
|
import static org.mockito.Matchers.eq;
|
||||||
|
import static org.mockito.Mockito.doAnswer;
|
||||||
|
import static org.mockito.Mockito.doReturn;
|
||||||
|
import static org.mockito.Mockito.inOrder;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.reset;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
import java.util.Map;
|
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.commons.logging.Log;
|
import org.apache.commons.logging.Log;
|
||||||
import org.apache.commons.logging.LogFactory;
|
import org.apache.commons.logging.LogFactory;
|
||||||
|
import org.apache.hadoop.security.UserGroupInformation;
|
||||||
|
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||||
|
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
@ -35,10 +46,6 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.NodeType;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
||||||
import org.apache.hadoop.security.UserGroupInformation;
|
|
||||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
|
||||||
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
|
||||||
|
|
||||||
import org.junit.After;
|
import org.junit.After;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
@ -289,10 +296,10 @@ private void setupMultiLevelQueues(CapacitySchedulerConfiguration conf) {
|
|||||||
conf.setCapacity(Q_B, 50);
|
conf.setCapacity(Q_B, 50);
|
||||||
|
|
||||||
final String Q_C = CapacitySchedulerConfiguration.ROOT + "." + C;
|
final String Q_C = CapacitySchedulerConfiguration.ROOT + "." + C;
|
||||||
conf.setCapacity(Q_C, 20);
|
conf.setCapacity(Q_C, 19.5f);
|
||||||
|
|
||||||
final String Q_D = CapacitySchedulerConfiguration.ROOT + "." + D;
|
final String Q_D = CapacitySchedulerConfiguration.ROOT + "." + D;
|
||||||
conf.setCapacity(Q_D, 20);
|
conf.setCapacity(Q_D, 20.5f);
|
||||||
|
|
||||||
// Define 2-nd level queues
|
// Define 2-nd level queues
|
||||||
conf.setQueues(Q_A, new String[] {A1, A2});
|
conf.setQueues(Q_A, new String[] {A1, A2});
|
||||||
|
@ -124,11 +124,11 @@ private static void setupQueueConfiguration(
|
|||||||
conf.setCapacity(CapacitySchedulerConfiguration.ROOT, 100);
|
conf.setCapacity(CapacitySchedulerConfiguration.ROOT, 100);
|
||||||
|
|
||||||
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
|
final String A = CapacitySchedulerConfiguration.ROOT + ".a";
|
||||||
conf.setCapacity(A, 10);
|
conf.setCapacity(A, 10.5f);
|
||||||
conf.setMaximumCapacity(A, 50);
|
conf.setMaximumCapacity(A, 50);
|
||||||
|
|
||||||
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
|
final String B = CapacitySchedulerConfiguration.ROOT + ".b";
|
||||||
conf.setCapacity(B, 90);
|
conf.setCapacity(B, 89.5f);
|
||||||
|
|
||||||
// Define 2nd-level queues
|
// Define 2nd-level queues
|
||||||
final String A1 = A + ".a1";
|
final String A1 = A + ".a1";
|
||||||
|
Loading…
Reference in New Issue
Block a user