YARN-10154. CS Dynamic Queues cannot be configured with absolute resources. Contributed by Manikandan R.

This commit is contained in:
Sunil G 2020-04-17 18:51:05 +05:30
parent 56350664a7
commit 2fe122e322
11 changed files with 573 additions and 26 deletions

View File

@ -17,6 +17,7 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common
@ -67,6 +68,22 @@ public void setEntitlement(QueueEntitlement entitlement)
setEntitlement(NO_LABEL, entitlement);
}
@Override
protected Resource getMinimumAbsoluteResource(String queuePath,
String label) {
return super.getMinimumAbsoluteResource(csContext.getConfiguration()
.getAutoCreatedQueueTemplateConfPrefix(this.getParent().getQueuePath()),
label);
}
@Override
protected Resource getMaximumAbsoluteResource(String queuePath,
String label) {
return super.getMaximumAbsoluteResource(csContext.getConfiguration()
.getAutoCreatedQueueTemplateConfPrefix(this.getParent().getQueuePath()),
label);
}
/**
* This methods to change capacity for a queue and adjusts its
* absoluteCapacity

View File

@ -524,16 +524,26 @@ private void setupMaximumAllocation(CapacitySchedulerConfiguration csConf) {
return unionInheritedWeights;
}
protected Resource getMinimumAbsoluteResource(String queuePath, String label) {
Resource minResource = csContext.getConfiguration()
.getMinimumResourceRequirement(label, queuePath, resourceTypes);
return minResource;
}
protected Resource getMaximumAbsoluteResource(String queuePath, String label) {
Resource maxResource = csContext.getConfiguration()
.getMaximumResourceRequirement(label, queuePath, resourceTypes);
return maxResource;
}
protected void updateConfigurableResourceRequirement(String queuePath,
Resource clusterResource) {
CapacitySchedulerConfiguration conf = csContext.getConfiguration();
Set<String> configuredNodelabels = conf.getConfiguredNodeLabels(queuePath);
for (String label : configuredNodelabels) {
Resource minResource = conf.getMinimumResourceRequirement(label,
queuePath, resourceTypes);
Resource maxResource = conf.getMaximumResourceRequirement(label,
queuePath, resourceTypes);
Resource minResource = getMinimumAbsoluteResource(queuePath, label);
Resource maxResource = getMaximumAbsoluteResource(queuePath, label);
LOG.debug("capacityConfigType is '{}' for queue {}",
capacityConfigType, getQueuePath());

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.QueueEntitlement;
@ -74,6 +75,9 @@ public void reinitializeFromTemplate(AutoCreatedLeafQueueConfig
writeLock.lock();
try {
this.getParent().updateClusterResource(this.csContext.getClusterResource(),
new ResourceLimits(this.csContext.getClusterResource()));
// TODO:
// reinitialize only capacities for now since 0 capacity updates
// can cause
@ -100,7 +104,7 @@ public void reinitializeFromTemplate(AutoCreatedLeafQueueConfig
}
}
private void mergeCapacities(QueueCapacities capacities) {
public void mergeCapacities(QueueCapacities capacities) {
for ( String nodeLabel : capacities.getExistingNodeLabels()) {
queueCapacities.setCapacity(nodeLabel,
capacities.getCapacity(nodeLabel));

View File

@ -557,6 +557,12 @@ public void setMaximumCapacityByLabel(String queue, String label,
float capacity) {
setFloat(getNodeLabelPrefix(queue, label) + MAXIMUM_CAPACITY, capacity);
}
public void setMaximumCapacityByLabel(String queue, String label,
String absoluteResourceCapacity) {
set(getNodeLabelPrefix(queue, label) + MAXIMUM_CAPACITY,
absoluteResourceCapacity);
}
public int getUserLimit(String queue) {
int userLimit = getInt(getQueuePrefix(queue) + USER_LIMIT,
@ -1960,11 +1966,29 @@ public void setAutoCreatedLeafQueueConfigCapacity(String queuePath,
@Private
public void setAutoCreatedLeafQueueTemplateCapacityByLabel(String queuePath,
String label, float val) {
String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
queuePath);
String leafQueueConfPrefix =
getAutoCreatedQueueTemplateConfPrefix(queuePath);
setCapacityByLabel(leafQueueConfPrefix, label, val);
}
@VisibleForTesting
@Private
public void setAutoCreatedLeafQueueTemplateCapacityByLabel(String queuePath,
String label, Resource resource) {
String leafQueueConfPrefix =
getAutoCreatedQueueTemplateConfPrefix(queuePath);
StringBuilder resourceString = new StringBuilder();
resourceString
.append("[" + AbsoluteResourceType.MEMORY.toString().toLowerCase() + "="
+ resource.getMemorySize() + ","
+ AbsoluteResourceType.VCORES.toString().toLowerCase() + "="
+ resource.getVirtualCores() + "]");
setCapacityByLabel(leafQueueConfPrefix, label, resourceString.toString());
}
@Private
@VisibleForTesting
public void setAutoCreatedLeafQueueConfigMaxCapacity(String queuePath,
@ -1983,6 +2007,23 @@ public void setAutoCreatedLeafQueueTemplateMaxCapacity(String queuePath,
setMaximumCapacityByLabel(leafQueueConfPrefix, label, val);
}
@Private
@VisibleForTesting
public void setAutoCreatedLeafQueueTemplateMaxCapacity(String queuePath,
String label, Resource resource) {
String leafQueueConfPrefix = getAutoCreatedQueueTemplateConfPrefix(
queuePath);
StringBuilder resourceString = new StringBuilder();
resourceString
.append("[" + AbsoluteResourceType.MEMORY.toString().toLowerCase() + "="
+ resource.getMemorySize() + ","
+ AbsoluteResourceType.VCORES.toString().toLowerCase() + "="
+ resource.getVirtualCores() + "]");
setMaximumCapacityByLabel(leafQueueConfPrefix, label, resourceString.toString());
}
@VisibleForTesting
@Private
public void setAutoCreatedLeafQueueConfigUserLimit(String queuePath,
@ -2115,7 +2156,6 @@ private Resource internalGetLabeledResourceRequirementForQueue(String queue,
if (subGroup.trim().isEmpty()) {
return Resources.none();
}
subGroup = subGroup.substring(1, subGroup.length() - 1);
for (String kvPair : subGroup.trim().split(",")) {
String[] splits = kvPair.split("=");

View File

@ -22,9 +22,10 @@
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AbstractCSQueue.CapacityConfigType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica
.FiCaSchedulerApp;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -150,7 +151,7 @@ private void reinitializeQueueManagementPolicy() throws IOException {
}
}
protected AutoCreatedLeafQueueConfig.Builder initializeLeafQueueConfigs() {
protected AutoCreatedLeafQueueConfig.Builder initializeLeafQueueConfigs() throws IOException {
AutoCreatedLeafQueueConfig.Builder builder =
new AutoCreatedLeafQueueConfig.Builder();
@ -158,16 +159,70 @@ protected AutoCreatedLeafQueueConfig.Builder initializeLeafQueueConfigs() {
String leafQueueTemplateConfPrefix = getLeafQueueConfigPrefix(
csContext.getConfiguration());
//Load template configuration
builder.configuration(
super.initializeLeafQueueConfigs(leafQueueTemplateConfPrefix));
CapacitySchedulerConfiguration conf =
super.initializeLeafQueueConfigs(leafQueueTemplateConfPrefix);
builder.configuration(conf);
for (String nodeLabel : conf
.getConfiguredNodeLabels(csContext.getConfiguration()
.getAutoCreatedQueueTemplateConfPrefix(getQueuePath()))) {
Resource templateMinResource = conf.getMinimumResourceRequirement(
nodeLabel, csContext.getConfiguration()
.getAutoCreatedQueueTemplateConfPrefix(getQueuePath()),
resourceTypes);
if (this.capacityConfigType.equals(CapacityConfigType.PERCENTAGE)
&& !templateMinResource.equals(Resources.none())) {
throw new IOException("Managed Parent Queue " + this.getQueuePath()
+ " config type is different from leaf queue template config type");
}
}
//Load template capacities
QueueCapacities queueCapacities = new QueueCapacities(false);
CSQueueUtils.loadUpdateAndCheckCapacities(csContext.getConfiguration()
.getAutoCreatedQueueTemplateConfPrefix(getQueuePath()),
csContext.getConfiguration(), queueCapacities, getQueueCapacities());
builder.capacities(queueCapacities);
/**
* Populate leaf queue template (of Parent resources configured in
* ABSOLUTE_RESOURCE) capacities with actual values for which configured has
* been defined in ABSOLUTE_RESOURCE format.
*
*/
if (this.capacityConfigType.equals(CapacityConfigType.ABSOLUTE_RESOURCE)) {
for (String label : queueCapacities.getExistingNodeLabels()) {
queueCapacities.setCapacity(label,
this.csContext.getResourceCalculator().divide(
this.csContext.getClusterResource(),
this.csContext.getConfiguration().getMinimumResourceRequirement(
label,
this.csContext.getConfiguration()
.getAutoCreatedQueueTemplateConfPrefix(getQueuePath()),
resourceTypes),
getQueueResourceQuotas().getConfiguredMinResource(label)));
queueCapacities.setMaximumCapacity(label,
this.csContext.getResourceCalculator().divide(
this.csContext.getClusterResource(),
this.csContext.getConfiguration().getMaximumResourceRequirement(
label,
this.csContext.getConfiguration()
.getAutoCreatedQueueTemplateConfPrefix(getQueuePath()),
resourceTypes),
getQueueResourceQuotas().getConfiguredMaxResource(label)));
queueCapacities.setAbsoluteCapacity(label,
queueCapacities.getCapacity(label)
* getQueueCapacities().getAbsoluteCapacity(label));
queueCapacities.setAbsoluteMaximumCapacity(label,
queueCapacities.getMaximumCapacity(label)
* getQueueCapacities().getAbsoluteMaximumCapacity(label));
}
}
builder.capacities(queueCapacities);
return builder;
}

View File

@ -933,7 +933,6 @@ public boolean hasChildQueues() {
private void calculateEffectiveResourcesAndCapacity(String label,
Resource clusterResource) {
// For root queue, ensure that max/min resource is updated to latest
// cluster resource.
Resource resourceByLabel = labelManager.getResourceByLabel(label,
@ -1134,7 +1133,9 @@ private void deriveCapacityFromAbsoluteConfigurations(String label,
LOG.info("LeafQueue:" + leafQueue.getQueuePath() + ", maxApplications="
+ maxApplications + ", maxApplicationsPerUser="
+ maxApplicationsPerUser + ", Abs Cap:"
+ childQueue.getQueueCapacities().getAbsoluteCapacity(label));
+ childQueue.getQueueCapacities().getAbsoluteCapacity(label) + ", Cap: "
+ childQueue.getQueueCapacities().getCapacity(label) + ", MaxCap : "
+ childQueue.getQueueCapacities().getMaximumCapacity(label));
}
}

View File

@ -24,6 +24,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
.SchedulerDynamicEditException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity
@ -51,6 +52,7 @@
.FiCaSchedulerApp;
import org.apache.hadoop.yarn.util.Clock;
import org.apache.hadoop.yarn.util.MonotonicClock;
import org.apache.hadoop.yarn.util.resource.Resources;
import java.io.IOException;
import java.util.ArrayList;
@ -679,6 +681,19 @@ public void commitQueueManagementChanges(
LOG.debug("Queue is already de-activated. Skipping "
+ "de-activation : {}", leafQueue.getQueuePath());
} else{
/**
* While deactivating queues of type ABSOLUTE_RESOURCE, configured
* min resource has to be set based on updated capacity (which is
* again based on updated queue entitlements). Otherwise,
* ParentQueue#calculateEffectiveResourcesAndCapacity calculations
* leads to incorrect results.
*/
leafQueue
.mergeCapacities(updatedQueueTemplate.getQueueCapacities());
leafQueue.getQueueResourceQuotas()
.setConfiguredMinResource(Resources.multiply(
this.scheduler.getClusterResource(), updatedQueueTemplate
.getQueueCapacities().getCapacity(nodeLabel)));
deactivate(leafQueue, nodeLabel);
}
}

View File

@ -26,6 +26,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.junit.Assert;
import org.junit.Test;
@ -38,6 +39,7 @@ public class TestAbsoluteResourceConfiguration {
private static final String QUEUEA = "queueA";
private static final String QUEUEB = "queueB";
private static final String QUEUEC = "queueC";
private static final String QUEUED = "queueD";
private static final String QUEUEA1 = "queueA1";
private static final String QUEUEA2 = "queueA2";
private static final String QUEUEB1 = "queueB1";
@ -48,6 +50,9 @@ public class TestAbsoluteResourceConfiguration {
+ "." + QUEUEB;
private static final String QUEUEC_FULL = CapacitySchedulerConfiguration.ROOT
+ "." + QUEUEC;
private static final String QUEUED_FULL = CapacitySchedulerConfiguration.ROOT
+ "." + QUEUED;
private static final String QUEUEA1_FULL = QUEUEA_FULL + "." + QUEUEA1;
private static final String QUEUEA2_FULL = QUEUEA_FULL + "." + QUEUEA2;
private static final String QUEUEB1_FULL = QUEUEB_FULL + "." + QUEUEB1;
@ -66,15 +71,23 @@ public class TestAbsoluteResourceConfiguration {
10);
private static final Resource QUEUE_B_MAXRES = Resource.newInstance(150 * GB,
30);
private static final Resource QUEUE_C_MINRES = Resource.newInstance(50 * GB,
10);
private static final Resource QUEUE_C_MINRES = Resource.newInstance(25 * GB,
5);
private static final Resource QUEUE_C_MAXRES = Resource.newInstance(150 * GB,
20);
private static final Resource QUEUE_D_MINRES = Resource.newInstance(25 * GB,
5);
private static final Resource QUEUE_D_MAXRES = Resource.newInstance(150 * GB,
20);
private static final Resource QUEUEA_REDUCED = Resource.newInstance(64000, 6);
private static final Resource QUEUEB_REDUCED = Resource.newInstance(32000, 6);
private static final Resource QUEUEC_REDUCED = Resource.newInstance(32000, 6);
private static final Resource QUEUEC_REDUCED = Resource.newInstance(16000, 3);
private static final Resource QUEUEMAX_REDUCED = Resource.newInstance(128000,
20);
private static final Resource QUEUE_D_TEMPL_MINRES =
Resource.newInstance(25 * GB, 5);
private static final Resource QUEUE_D_TEMPL_MAXRES =
Resource.newInstance(150 * GB, 20);
private static Set<String> resourceTypes = new HashSet<>(
Arrays.asList("memory", "vcores"));
@ -83,15 +96,24 @@ private CapacitySchedulerConfiguration setupSimpleQueueConfiguration(
boolean isCapacityNeeded) {
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[]{QUEUEA, QUEUEB, QUEUEC});
new String[]{QUEUEA, QUEUEB, QUEUEC, QUEUED});
// Set default capacities like normal configuration.
if (isCapacityNeeded) {
csConf.setCapacity(QUEUEA_FULL, 50f);
csConf.setCapacity(QUEUEB_FULL, 25f);
csConf.setCapacity(QUEUEC_FULL, 25f);
csConf.setCapacity(QUEUED_FULL, 25f);
}
csConf.setAutoCreateChildQueueEnabled(QUEUED_FULL, true);
// Setup leaf queue template configs
csConf.setAutoCreatedLeafQueueTemplateCapacityByLabel(QUEUED_FULL, "",
QUEUE_D_TEMPL_MINRES);
csConf.setAutoCreatedLeafQueueTemplateMaxCapacity(QUEUED_FULL, "",
QUEUE_D_TEMPL_MAXRES);
return csConf;
}
@ -122,10 +144,12 @@ private CapacitySchedulerConfiguration setupMinMaxResourceConfiguration(
csConf.setMinimumResourceRequirement("", QUEUEA_FULL, QUEUE_A_MINRES);
csConf.setMinimumResourceRequirement("", QUEUEB_FULL, QUEUE_B_MINRES);
csConf.setMinimumResourceRequirement("", QUEUEC_FULL, QUEUE_C_MINRES);
csConf.setMinimumResourceRequirement("", QUEUED_FULL, QUEUE_D_MINRES);
csConf.setMaximumResourceRequirement("", QUEUEA_FULL, QUEUE_A_MAXRES);
csConf.setMaximumResourceRequirement("", QUEUEB_FULL, QUEUE_B_MAXRES);
csConf.setMaximumResourceRequirement("", QUEUEC_FULL, QUEUE_C_MAXRES);
csConf.setMaximumResourceRequirement("", QUEUED_FULL, QUEUE_D_MAXRES);
return csConf;
}
@ -148,9 +172,10 @@ private CapacitySchedulerConfiguration setupComplexMinMaxResourceConfig(
}
@Test
public void testSimpleMinMaxResourceConfigurartionPerQueue() {
public void testSimpleMinMaxResourceConfigurartionPerQueue()
throws Exception {
CapacitySchedulerConfiguration csConf = setupSimpleQueueConfiguration(true);
CapacitySchedulerConfiguration csConf = setupSimpleQueueConfiguration(false);
setupMinMaxResourceConfiguration(csConf);
Assert.assertEquals("Min resource configured for QUEUEA is not correct",
@ -171,6 +196,69 @@ public void testSimpleMinMaxResourceConfigurartionPerQueue() {
Assert.assertEquals("Max resource configured for QUEUEC is not correct",
QUEUE_C_MAXRES,
csConf.getMaximumResourceRequirement("", QUEUEC_FULL, resourceTypes));
csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
@SuppressWarnings("resource")
MockRM rm = new MockRM(csConf);
rm.start();
// Add few nodes
rm.registerNode("127.0.0.1:1234", 250 * GB, 40);
// Get queue object to verify min/max resource configuration.
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
ManagedParentQueue parentQueue = (ManagedParentQueue) cs.getQueue(QUEUED);
AutoCreatedLeafQueue d1 = new AutoCreatedLeafQueue(cs, "d1", parentQueue);
cs.addQueue(d1);
/**
* After adding child queue d1, it occupies all entire resource
* of Managed Parent queue
*/
cs.getRootQueue().updateClusterResource(cs.getClusterResource(),
new ResourceLimits(cs.getClusterResource()));
Assert.assertEquals(QUEUE_D_TEMPL_MINRES,
d1.queueResourceQuotas.getConfiguredMinResource());
Assert.assertEquals(QUEUE_D_TEMPL_MINRES,
d1.queueResourceQuotas.getEffectiveMinResource());
Assert.assertEquals(QUEUE_D_TEMPL_MAXRES,
d1.queueResourceQuotas.getConfiguredMaxResource());
Assert.assertEquals(QUEUE_D_TEMPL_MAXRES,
d1.queueResourceQuotas.getEffectiveMaxResource());
/**
* After adding child queue d2, adjustment happens and both d1 and d2 shares
* resource of Managed Parent Queue
*/
AutoCreatedLeafQueue d2 = new AutoCreatedLeafQueue(cs, "d2", parentQueue);
cs.addQueue(d2);
cs.getRootQueue().updateClusterResource(cs.getClusterResource(),
new ResourceLimits(cs.getClusterResource()));
Assert.assertEquals(QUEUE_D_TEMPL_MINRES,
d2.queueResourceQuotas.getConfiguredMinResource());
Assert.assertEquals(Resource.newInstance(12800, 2),
d2.queueResourceQuotas.getEffectiveMinResource());
Assert.assertEquals(QUEUE_D_TEMPL_MAXRES,
d2.queueResourceQuotas.getConfiguredMaxResource());
Assert.assertEquals(QUEUE_D_TEMPL_MAXRES,
d2.queueResourceQuotas.getEffectiveMaxResource());
Assert.assertEquals(QUEUE_D_TEMPL_MINRES,
d1.queueResourceQuotas.getConfiguredMinResource());
Assert.assertEquals(Resource.newInstance(12800, 2),
d1.queueResourceQuotas.getEffectiveMinResource());
Assert.assertEquals(QUEUE_D_TEMPL_MAXRES,
d1.queueResourceQuotas.getConfiguredMaxResource());
Assert.assertEquals(QUEUE_D_TEMPL_MAXRES,
d1.queueResourceQuotas.getEffectiveMaxResource());
rm.close();
}
@Test
@ -578,7 +666,7 @@ public void testEffectiveResourceAfterIncreasingClusterResource()
Assert.assertEquals("Effective Max resource for QUEUEC is not correct",
QUEUE_C_MAXRES, qC.queueResourceQuotas.getEffectiveMaxResource());
Assert.assertEquals("Absolute capacity for QUEUEC is not correct",
0.2, qC.getAbsoluteCapacity(), DELTA);
0.1, qC.getAbsoluteCapacity(), DELTA);
Assert.assertEquals("Absolute Max capacity for QUEUEC is not correct",
0.6, qC.getAbsoluteMaximumCapacity(), DELTA);
@ -645,7 +733,7 @@ public void testEffectiveResourceAfterIncreasingClusterResource()
Assert.assertEquals("Effective Max resource for QUEUEC is not correct",
QUEUE_C_MAXRES, qC.queueResourceQuotas.getEffectiveMaxResource());
Assert.assertEquals("Absolute capacity for QUEUEC is not correct",
0.133, qC.getAbsoluteCapacity(), DELTA);
0.066, qC.getAbsoluteCapacity(), DELTA);
Assert.assertEquals("Absolute Max capacity for QUEUEC is not correct",
0.4, qC.getAbsoluteMaximumCapacity(), DELTA);

View File

@ -0,0 +1,277 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager.NO_LABEL;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.queuemanagement.GuaranteedOrZeroCapacityOverTimePolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.policy.FifoOrderingPolicy;
import org.junit.Before;
import org.junit.Test;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class TestAbsoluteResourceWithAutoQueue
extends TestCapacitySchedulerAutoCreatedQueueBase {
private static final Logger LOG =
LoggerFactory.getLogger(TestAbsoluteResourceWithAutoQueue.class);
private static final String QUEUEA = "queueA";
private static final String QUEUEB = "queueB";
private static final String QUEUEC = "queueC";
private static final String QUEUED = "queueD";
private static final String QUEUEA_FULL =
CapacitySchedulerConfiguration.ROOT + "." + QUEUEA;
private static final String QUEUEB_FULL =
CapacitySchedulerConfiguration.ROOT + "." + QUEUEB;
private static final String QUEUEC_FULL =
CapacitySchedulerConfiguration.ROOT + "." + QUEUEC;
private static final String QUEUED_FULL =
CapacitySchedulerConfiguration.ROOT + "." + QUEUED;
private static final Resource QUEUE_A_MINRES =
Resource.newInstance(100 * GB, 10);
private static final Resource QUEUE_A_MAXRES =
Resource.newInstance(200 * GB, 30);
private static final Resource QUEUE_B_MINRES =
Resource.newInstance(50 * GB, 10);
private static final Resource QUEUE_B_MAXRES =
Resource.newInstance(150 * GB, 30);
private static final Resource QUEUE_C_MINRES =
Resource.newInstance(25 * GB, 5);
private static final Resource QUEUE_C_MAXRES =
Resource.newInstance(150 * GB, 20);
private static final Resource QUEUE_D_MINRES =
Resource.newInstance(25 * GB, 5);
private static final Resource QUEUE_D_MAXRES =
Resource.newInstance(150 * GB, 20);
@Before
public void setUp() throws Exception {
accessibleNodeLabelsOnC.add(NO_LABEL);
}
private CapacitySchedulerConfiguration setupMinMaxResourceConfiguration(
CapacitySchedulerConfiguration csConf) {
// Update min/max resource to queueA/B/C
csConf.setMinimumResourceRequirement("", QUEUEA_FULL, QUEUE_A_MINRES);
csConf.setMinimumResourceRequirement("", QUEUEB_FULL, QUEUE_B_MINRES);
csConf.setMinimumResourceRequirement("", QUEUEC_FULL, QUEUE_C_MINRES);
csConf.setMinimumResourceRequirement("", QUEUED_FULL, QUEUE_D_MINRES);
csConf.setMaximumResourceRequirement("", QUEUEA_FULL, QUEUE_A_MAXRES);
csConf.setMaximumResourceRequirement("", QUEUEB_FULL, QUEUE_B_MAXRES);
csConf.setMaximumResourceRequirement("", QUEUEC_FULL, QUEUE_C_MAXRES);
csConf.setMaximumResourceRequirement("", QUEUED_FULL, QUEUE_D_MAXRES);
return csConf;
}
public static CapacitySchedulerConfiguration setupQueueConfiguration(
CapacitySchedulerConfiguration conf) {
return conf;
}
private CapacitySchedulerConfiguration setupSimpleQueueConfiguration(
boolean isCapacityNeeded) {
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[] { QUEUEA, QUEUEB, QUEUEC, QUEUED });
// Set default capacities like normal configuration.
if (isCapacityNeeded) {
csConf.setCapacity(QUEUEA_FULL, 50f);
csConf.setCapacity(QUEUEB_FULL, 25f);
csConf.setCapacity(QUEUEC_FULL, 25f);
csConf.setCapacity(QUEUED_FULL, 25f);
}
csConf.setAutoCreateChildQueueEnabled(QUEUEC_FULL, true);
csConf.setAutoCreatedLeafQueueTemplateCapacityByLabel(QUEUEC_FULL, "",
QUEUE_C_MINRES);
csConf.setAutoCreatedLeafQueueTemplateMaxCapacity(QUEUEC_FULL, "",
QUEUE_C_MAXRES);
csConf.setAutoCreateChildQueueEnabled(QUEUED_FULL, true);
// Setup leaf queue template configs
csConf.setAutoCreatedLeafQueueTemplateCapacityByLabel(QUEUED_FULL, "",
Resource.newInstance(10 * GB, 2));
csConf.setAutoCreatedLeafQueueTemplateMaxCapacity(QUEUED_FULL, "",
QUEUE_D_MAXRES);
return csConf;
}
@Test(timeout = 20000)
public void testAutoCreateLeafQueueCreation() throws Exception {
try {
CapacitySchedulerConfiguration csConf =
setupSimpleQueueConfiguration(false);
setupMinMaxResourceConfiguration(csConf);
csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
mockRM = new MockRM(csConf);
cs = (CapacityScheduler) mockRM.getResourceScheduler();
mockRM.start();
cs.start();
// Add few nodes
mockRM.registerNode("127.0.0.1:1234", 250 * GB, 40);
setupGroupQueueMappings(QUEUED, cs.getConfiguration(), "%user");
cs.reinitialize(cs.getConfiguration(), mockRM.getRMContext());
submitApp(mockRM, cs.getQueue(QUEUED), TEST_GROUPUSER, TEST_GROUPUSER, 1,
1);
AutoCreatedLeafQueue autoCreatedLeafQueue =
(AutoCreatedLeafQueue) cs.getQueue(TEST_GROUPUSER);
ManagedParentQueue parentQueue = (ManagedParentQueue) cs.getQueue(QUEUED);
assertEquals(parentQueue, autoCreatedLeafQueue.getParent());
validateCapacities((AutoCreatedLeafQueue) autoCreatedLeafQueue, 0.4f,
0.04f, 1f, 0.6f);
validateCapacitiesByLabel((ManagedParentQueue) parentQueue,
(AutoCreatedLeafQueue) autoCreatedLeafQueue, NO_LABEL);
Map<String, Float> expectedChildQueueAbsCapacity =
new HashMap<String, Float>() {
{
put(NO_LABEL, 0.04f);
}
};
validateInitialQueueEntitlement(parentQueue, TEST_GROUPUSER,
expectedChildQueueAbsCapacity, new HashSet<String>() {
{
add(NO_LABEL);
}
});
validateUserAndAppLimits(autoCreatedLeafQueue, 400, 400);
assertTrue(autoCreatedLeafQueue
.getOrderingPolicy() instanceof FifoOrderingPolicy);
ApplicationId user1AppId = submitApp(mockRM, cs.getQueue(QUEUED),
TEST_GROUPUSER1, TEST_GROUPUSER1, 2, 1);
AutoCreatedLeafQueue autoCreatedLeafQueue1 =
(AutoCreatedLeafQueue) cs.getQueue(TEST_GROUPUSER1);
validateCapacities((AutoCreatedLeafQueue) autoCreatedLeafQueue1, 0.4f,
0.04f, 1f, 0.6f);
validateCapacitiesByLabel((ManagedParentQueue) parentQueue,
(AutoCreatedLeafQueue) autoCreatedLeafQueue1, NO_LABEL);
assertEquals(parentQueue, autoCreatedLeafQueue1.getParent());
Map<String, Float> expectedChildQueueAbsCapacity1 =
new HashMap<String, Float>() {
{
put(NO_LABEL, 0.08f);
}
};
validateInitialQueueEntitlement(parentQueue, TEST_GROUPUSER1,
expectedChildQueueAbsCapacity1, new HashSet<String>() {
{
add(NO_LABEL);
}
});
submitApp(mockRM, cs.getQueue(QUEUED), TEST_GROUPUSER2, TEST_GROUPUSER2,
3, 1);
final CSQueue autoCreatedLeafQueue2 = cs.getQueue(TEST_GROUPUSER2);
validateCapacities((AutoCreatedLeafQueue) autoCreatedLeafQueue2, 0.0f,
0.0f, 1f, 0.6f);
GuaranteedOrZeroCapacityOverTimePolicy autoCreatedQueueManagementPolicy =
(GuaranteedOrZeroCapacityOverTimePolicy) ((ManagedParentQueue) parentQueue)
.getAutoCreatedQueueManagementPolicy();
assertEquals(0.08f, autoCreatedQueueManagementPolicy
.getAbsoluteActivatedChildQueueCapacity(NO_LABEL), EPSILON);
cs.killAllAppsInQueue(TEST_GROUPUSER1);
mockRM.waitForState(user1AppId, RMAppState.KILLED);
List<QueueManagementChange> queueManagementChanges =
autoCreatedQueueManagementPolicy.computeQueueManagementChanges();
ManagedParentQueue managedParentQueue = (ManagedParentQueue) parentQueue;
managedParentQueue
.validateAndApplyQueueManagementChanges(queueManagementChanges);
validateDeactivatedQueueEntitlement(parentQueue, TEST_GROUPUSER1,
expectedChildQueueAbsCapacity1, queueManagementChanges);
Set<String> expectedNodeLabelsUpdated = new HashSet<>();
expectedNodeLabelsUpdated.add(NO_LABEL);
validateActivatedQueueEntitlement(parentQueue, TEST_GROUPUSER2,
expectedChildQueueAbsCapacity1, queueManagementChanges,
expectedNodeLabelsUpdated);
} finally {
cleanupQueue(TEST_GROUPUSER);
cleanupQueue(TEST_GROUPUSER1);
cleanupQueue(TEST_GROUPUSER2);
}
}
@Test(expected = Exception.class)
public void testValidateLeafQueueTemplateConfigurations() {
CapacitySchedulerConfiguration csConf = setupSimpleQueueConfiguration(true);
csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
mockRM = new MockRM(csConf);
fail("Exception should be thrown as leaf queue template configuration is "
+ "not same as Parent configuration");
}
}

View File

@ -146,6 +146,10 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
public static final String TEST_GROUP = "testusergroup";
public static final String TEST_GROUPUSER = "testuser";
public static final String TEST_GROUP1 = "testusergroup1";
public static final String TEST_GROUPUSER1 = "testuser1";
public static final String TEST_GROUP2 = "testusergroup2";
public static final String TEST_GROUPUSER2 = "testuser2";
public static final String USER = "user_";
public static final String USER0 = USER + 0;
public static final String USER1 = USER + 1;
@ -304,7 +308,8 @@ public static CapacitySchedulerConfiguration setupQueueMappings(
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
TestGroupsCaching.FakeunPrivilegedGroupMapping.class, ShellBasedUnixGroupsMapping.class);
conf.set(CommonConfigurationKeys.HADOOP_USER_GROUP_STATIC_OVERRIDES,
TEST_GROUPUSER +"=" + TEST_GROUP + ";invalid_user=invalid_group");
TEST_GROUPUSER +"=" + TEST_GROUP + ";" + TEST_GROUPUSER1 +"="
+ TEST_GROUP1 + ";" + TEST_GROUPUSER2 + "=" + TEST_GROUP2 + ";invalid_user=invalid_group");
Groups.getUserToGroupsMappingServiceWithLoadedConfiguration(conf);
QueueMapping userQueueMapping = QueueMappingBuilder.create()
@ -315,7 +320,25 @@ public static CapacitySchedulerConfiguration setupQueueMappings(
leafQueueName))
.build();
QueueMapping userQueueMapping1 = QueueMappingBuilder.create()
.type(QueueMapping.MappingType.GROUP)
.source(TEST_GROUP1)
.queue(
getQueueMapping(parentQueue,
leafQueueName))
.build();
QueueMapping userQueueMapping2 = QueueMappingBuilder.create()
.type(QueueMapping.MappingType.GROUP)
.source(TEST_GROUP2)
.queue(
getQueueMapping(parentQueue,
leafQueueName))
.build();
queueMappings.add(userQueueMapping);
queueMappings.add(userQueueMapping1);
queueMappings.add(userQueueMapping2);
existingMappings.addAll(queueMappings);
conf.setQueueMappings(existingMappings);
return conf;

View File

@ -377,10 +377,11 @@ The parent queue which has been enabled for auto leaf queue creation,supports
| Property | Description |
|:---- |:---- |
| `yarn.scheduler.capacity.<queue-path>.leaf-queue-template.capacity` | *Mandatory* parameter: Specifies the minimum guaranteed capacity for the auto-created leaf queues. Currently *Absolute Resource* configurations are not supported on auto-created leaf queues |
| `yarn.scheduler.capacity.<queue-path>.leaf-queue-template.capacity` | *Mandatory* parameter: Specifies the minimum guaranteed capacity for the auto-created leaf queues. |
| `yarn.scheduler.capacity.<queue-path>.leaf-queue-template.maximum-capacity` | *Optional* parameter: Specifies the maximum capacity for the auto-created leaf queues. This value must be smaller than or equal to the cluster maximum. |
| `yarn.scheduler.capacity.<queue-path>.leaf-queue-template.<leaf-queue-property>` | *Optional* parameter: For other queue parameters that can be configured on auto-created leaf queues like maximum-capacity, user-limit-factor, maximum-am-resource-percent ... - Refer **Queue Properties** section |
Example:
Example 1:
```
<property>
@ -421,6 +422,22 @@ Example:
</property>
```
Example 2:
```
<property>
<name>yarn.scheduler.capacity.root.parent2.auto-create-child-queue.enabled</name>
<value>true</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.parent2.leaf-queue-template.capacity</name>
<value>[memory=1024,vcores=1]</value>
</property>
<property>
<name>yarn.scheduler.capacity.root.parent2.leaf-queue-template.maximum-capacity</name>
<value>[memory=10240,vcores=10]</value>
</property>
```
* Scheduling Edit Policy configuration for auto-created queue management
Admins need to specify an additional `org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueManagementDynamicEditPolicy` scheduling edit policy to the