YARN-7544. Use queue-path.capacity/maximum-capacity to specify absolute min/max resources. (Sunil G via wangda)
Change-Id: I685341be213eee500f51e02f01c91def89391c17
This commit is contained in:
parent
b7b8cd5324
commit
1012b901c8
@ -329,7 +329,9 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
||||
|
||||
public static final String DEFAULT_RESOURCE_TYPES = "memory,vcores";
|
||||
|
||||
public static final String PATTERN_FOR_ABSOLUTE_RESOURCE = "\\[([^\\]]+)";
|
||||
public static final String PATTERN_FOR_ABSOLUTE_RESOURCE = "^\\[[\\w\\.,\\-_=\\ /]+\\]$";
|
||||
|
||||
private static final Pattern RESOURCE_PATTERN = Pattern.compile(PATTERN_FOR_ABSOLUTE_RESOURCE);
|
||||
|
||||
public enum AbsoluteResourceType {
|
||||
MEMORY, VCORES;
|
||||
@ -411,14 +413,29 @@ public void setMaximumApplicationMasterResourcePerQueuePercent(String queue,
|
||||
}
|
||||
|
||||
public float getNonLabeledQueueCapacity(String queue) {
|
||||
float capacity = queue.equals("root") ? 100.0f : getFloat(
|
||||
getQueuePrefix(queue) + CAPACITY, 0f);
|
||||
if (capacity < MINIMUM_CAPACITY_VALUE || capacity > MAXIMUM_CAPACITY_VALUE) {
|
||||
throw new IllegalArgumentException("Illegal " +
|
||||
"capacity of " + capacity + " for queue " + queue);
|
||||
String configuredCapacity = get(getQueuePrefix(queue) + CAPACITY);
|
||||
boolean matcher = (configuredCapacity != null)
|
||||
&& RESOURCE_PATTERN.matcher(configuredCapacity).find();
|
||||
if (matcher) {
|
||||
// Return capacity in percentage as 0 for non-root queues and 100 for
|
||||
// root.From AbstractCSQueue, absolute resource will be parsed and
|
||||
// updated. Once nodes are added/removed in cluster, capacity in
|
||||
// percentage will also be re-calculated.
|
||||
return queue.equals("root") ? 100.0f : 0f;
|
||||
}
|
||||
LOG.debug("CSConf - getCapacity: queuePrefix=" + getQueuePrefix(queue) +
|
||||
", capacity=" + capacity);
|
||||
|
||||
float capacity = queue.equals("root")
|
||||
? 100.0f
|
||||
: (configuredCapacity == null)
|
||||
? 0f
|
||||
: Float.parseFloat(configuredCapacity);
|
||||
if (capacity < MINIMUM_CAPACITY_VALUE
|
||||
|| capacity > MAXIMUM_CAPACITY_VALUE) {
|
||||
throw new IllegalArgumentException(
|
||||
"Illegal " + "capacity of " + capacity + " for queue " + queue);
|
||||
}
|
||||
LOG.debug("CSConf - getCapacity: queuePrefix=" + getQueuePrefix(queue)
|
||||
+ ", capacity=" + capacity);
|
||||
return capacity;
|
||||
}
|
||||
|
||||
@ -433,10 +450,23 @@ public void setCapacity(String queue, float capacity) {
|
||||
}
|
||||
|
||||
public float getNonLabeledQueueMaximumCapacity(String queue) {
|
||||
float maxCapacity = getFloat(getQueuePrefix(queue) + MAXIMUM_CAPACITY,
|
||||
MAXIMUM_CAPACITY_VALUE);
|
||||
maxCapacity = (maxCapacity == DEFAULT_MAXIMUM_CAPACITY_VALUE) ?
|
||||
MAXIMUM_CAPACITY_VALUE : maxCapacity;
|
||||
String configuredCapacity = get(getQueuePrefix(queue) + MAXIMUM_CAPACITY);
|
||||
boolean matcher = (configuredCapacity != null)
|
||||
&& RESOURCE_PATTERN.matcher(configuredCapacity).find();
|
||||
if (matcher) {
|
||||
// Return capacity in percentage as 0 for non-root queues and 100 for
|
||||
// root.From AbstractCSQueue, absolute resource will be parsed and
|
||||
// updated. Once nodes are added/removed in cluster, capacity in
|
||||
// percentage will also be re-calculated.
|
||||
return 100.0f;
|
||||
}
|
||||
|
||||
float maxCapacity = (configuredCapacity == null)
|
||||
? MAXIMUM_CAPACITY_VALUE
|
||||
: Float.parseFloat(configuredCapacity);
|
||||
maxCapacity = (maxCapacity == DEFAULT_MAXIMUM_CAPACITY_VALUE)
|
||||
? MAXIMUM_CAPACITY_VALUE
|
||||
: maxCapacity;
|
||||
return maxCapacity;
|
||||
}
|
||||
|
||||
@ -590,6 +620,16 @@ public Set<String> getAccessibleNodeLabels(String queue) {
|
||||
private float internalGetLabeledQueueCapacity(String queue, String label, String suffix,
|
||||
float defaultValue) {
|
||||
String capacityPropertyName = getNodeLabelPrefix(queue, label) + suffix;
|
||||
boolean matcher = (capacityPropertyName != null)
|
||||
&& RESOURCE_PATTERN.matcher(capacityPropertyName).find();
|
||||
if (matcher) {
|
||||
// Return capacity in percentage as 0 for non-root queues and 100 for
|
||||
// root.From AbstractCSQueue, absolute resource will be parsed and
|
||||
// updated. Once nodes are added/removed in cluster, capacity in
|
||||
// percentage will also be re-calculated.
|
||||
return defaultValue;
|
||||
}
|
||||
|
||||
float capacity = getFloat(capacityPropertyName, defaultValue);
|
||||
if (capacity < MINIMUM_CAPACITY_VALUE
|
||||
|| capacity > MAXIMUM_CAPACITY_VALUE) {
|
||||
@ -1722,7 +1762,7 @@ public static String getUnits(String resourceValue) {
|
||||
public Resource getMinimumResourceRequirement(String label, String queue,
|
||||
Set<String> resourceTypes) {
|
||||
return internalGetLabeledResourceRequirementForQueue(queue, label,
|
||||
resourceTypes, MINIMUM_RESOURCE);
|
||||
resourceTypes, CAPACITY);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -1739,19 +1779,19 @@ public Resource getMinimumResourceRequirement(String label, String queue,
|
||||
public Resource getMaximumResourceRequirement(String label, String queue,
|
||||
Set<String> resourceTypes) {
|
||||
return internalGetLabeledResourceRequirementForQueue(queue, label,
|
||||
resourceTypes, MAXIMUM_RESOURCE);
|
||||
resourceTypes, MAXIMUM_CAPACITY);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setMinimumResourceRequirement(String label, String queue,
|
||||
Resource resource) {
|
||||
updateMinMaxResourceToConf(label, queue, resource, MINIMUM_RESOURCE);
|
||||
updateMinMaxResourceToConf(label, queue, resource, CAPACITY);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setMaximumResourceRequirement(String label, String queue,
|
||||
Resource resource) {
|
||||
updateMinMaxResourceToConf(label, queue, resource, MAXIMUM_RESOURCE);
|
||||
updateMinMaxResourceToConf(label, queue, resource, MAXIMUM_CAPACITY);
|
||||
}
|
||||
|
||||
private void updateMinMaxResourceToConf(String label, String queue,
|
||||
@ -1786,8 +1826,8 @@ private Resource internalGetLabeledResourceRequirementForQueue(String queue,
|
||||
|
||||
// Define resource here.
|
||||
Resource resource = Resource.newInstance(0l, 0);
|
||||
Matcher matcher = Pattern.compile(PATTERN_FOR_ABSOLUTE_RESOURCE)
|
||||
.matcher(resourceString);
|
||||
Matcher matcher = RESOURCE_PATTERN.matcher(resourceString);
|
||||
|
||||
/*
|
||||
* Absolute resource configuration for a queue will be grouped by "[]".
|
||||
* Syntax of absolute resource config could be like below
|
||||
@ -1795,11 +1835,12 @@ private Resource internalGetLabeledResourceRequirementForQueue(String queue,
|
||||
*/
|
||||
if (matcher.find()) {
|
||||
// Get the sub-group.
|
||||
String subGroup = matcher.group(1);
|
||||
String subGroup = matcher.group(0);
|
||||
if (subGroup.trim().isEmpty()) {
|
||||
return Resources.none();
|
||||
}
|
||||
|
||||
subGroup = subGroup.substring(1, subGroup.length() - 1);
|
||||
for (String kvPair : subGroup.trim().split(",")) {
|
||||
String[] splits = kvPair.split("=");
|
||||
|
||||
|
@ -27,7 +27,6 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
@ -233,7 +232,12 @@ public void testEffectiveMinMaxResourceConfigurartionPerQueue()
|
||||
@Test
|
||||
public void testSimpleValidateAbsoluteResourceConfig() throws Exception {
|
||||
/**
|
||||
* Queue structure is as follows. root / | \ a b c / \ | a1 a2 b1
|
||||
* Queue structure is as follows.
|
||||
* root
|
||||
* / | \
|
||||
* a b c
|
||||
* / \ |
|
||||
* a1 a2 b1
|
||||
*
|
||||
* Test below cases 1) Configure percentage based capacity and absolute
|
||||
* resource together. 2) As per above tree structure, ensure all values
|
||||
@ -258,21 +262,15 @@ public void testSimpleValidateAbsoluteResourceConfig() throws Exception {
|
||||
// Get queue object to verify min/max resource configuration.
|
||||
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||
|
||||
// 1. Create a new config with capcity and min/max together. Ensure an
|
||||
// exception is thrown.
|
||||
// 1. Create a new config with min/max.
|
||||
CapacitySchedulerConfiguration csConf1 = setupSimpleQueueConfiguration(
|
||||
true);
|
||||
setupMinMaxResourceConfiguration(csConf1);
|
||||
|
||||
try {
|
||||
cs.reinitialize(csConf1, rm.getRMContext());
|
||||
Assert.fail();
|
||||
} catch (IOException e) {
|
||||
Assert.assertTrue(e instanceof IOException);
|
||||
Assert.assertEquals(
|
||||
"Failed to re-init queues : Queue 'queueA' should use either"
|
||||
+ " percentage based capacity configuration or absolute resource.",
|
||||
e.getMessage());
|
||||
Assert.fail();
|
||||
}
|
||||
rm.stop();
|
||||
|
||||
@ -368,7 +366,12 @@ public void testSimpleValidateAbsoluteResourceConfig() throws Exception {
|
||||
@Test
|
||||
public void testComplexValidateAbsoluteResourceConfig() throws Exception {
|
||||
/**
|
||||
* Queue structure is as follows. root / | \ a b c / \ | a1 a2 b1
|
||||
* Queue structure is as follows.
|
||||
* root
|
||||
* / | \
|
||||
* a b c
|
||||
* / \ |
|
||||
* a1 a2 b1
|
||||
*
|
||||
* Test below cases: 1) Parent and its child queues must use either
|
||||
* percentage based or absolute resource configuration. 2) Parent's min
|
||||
@ -396,11 +399,6 @@ public void testComplexValidateAbsoluteResourceConfig() throws Exception {
|
||||
csConf.setCapacity(QUEUEB_FULL, 25f);
|
||||
csConf.setCapacity(QUEUEC_FULL, 25f);
|
||||
|
||||
// Also unset resource based config.
|
||||
csConf.setMinimumResourceRequirement("", QUEUEA_FULL, Resources.none());
|
||||
csConf.setMinimumResourceRequirement("", QUEUEB_FULL, Resources.none());
|
||||
csConf.setMinimumResourceRequirement("", QUEUEC_FULL, Resources.none());
|
||||
|
||||
// Get queue object to verify min/max resource configuration.
|
||||
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||
try {
|
||||
|
Loading…
Reference in New Issue
Block a user