YARN-11070. Minimum resource ratio is overridden by subsequent labels. Contributed by Andras Gyori
This commit is contained in:
parent
ad0a1dc897
commit
7dd288ce8c
@ -1298,7 +1298,7 @@ void updateEffectiveResources(Resource clusterResource) {
|
||||
CapacityConfigType.ABSOLUTE_RESOURCE)) {
|
||||
newEffectiveMinResource = createNormalizedMinResource(
|
||||
usageTracker.getQueueResourceQuotas().getConfiguredMinResource(label),
|
||||
((ParentQueue) parent).getEffectiveMinRatioPerResource());
|
||||
((ParentQueue) parent).getEffectiveMinRatio(label));
|
||||
|
||||
// Max resource of a queue should be the minimum of {parent's maxResources,
|
||||
// this queue's maxResources}. Both parent's maxResources and this queue's
|
||||
|
@ -25,6 +25,7 @@
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
|
||||
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableList;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
|
||||
@ -101,11 +102,11 @@ public class ParentQueue extends AbstractCSQueue {
|
||||
|
||||
private AutoCreatedQueueTemplate autoCreatedQueueTemplate;
|
||||
|
||||
// effective min ratio per resource, it is used during updateClusterResource,
|
||||
// leaf queue can use this to calculate effective resources.
|
||||
// This field will not be edited, reference will point to a new immutable map
|
||||
// after every time recalculation
|
||||
private volatile Map<String, Float> effectiveMinRatioPerResource;
|
||||
// A ratio of the queue's effective minimum resource and the summary of the configured
|
||||
// minimum resource of its children grouped by labels and calculated for each resource names
|
||||
// distinctively.
|
||||
private final Map<String, Map<String, Float>> effectiveMinResourceRatio =
|
||||
new ConcurrentHashMap<>();
|
||||
|
||||
public ParentQueue(CapacitySchedulerQueueContext queueContext,
|
||||
String queueName, CSQueue parent, CSQueue old) throws IOException {
|
||||
@ -1328,8 +1329,8 @@ private void calculateEffectiveResourcesAndCapacity(String label,
|
||||
}
|
||||
}
|
||||
|
||||
effectiveMinRatioPerResource = getEffectiveMinRatioPerResource(
|
||||
configuredMinResources, numeratorForMinRatio);
|
||||
effectiveMinResourceRatio.put(label, getEffectiveMinRatio(
|
||||
configuredMinResources, numeratorForMinRatio));
|
||||
|
||||
// Update effective resources for my self;
|
||||
if (rootQueue) {
|
||||
@ -1340,7 +1341,7 @@ private void calculateEffectiveResourcesAndCapacity(String label,
|
||||
}
|
||||
}
|
||||
|
||||
private Map<String, Float> getEffectiveMinRatioPerResource(
|
||||
private Map<String, Float> getEffectiveMinRatio(
|
||||
Resource configuredMinResources, Resource numeratorForMinRatio) {
|
||||
Map<String, Float> effectiveMinRatioPerResource = new HashMap<>();
|
||||
if (numeratorForMinRatio != null) {
|
||||
@ -1637,9 +1638,8 @@ void decrementRunnableApps() {
|
||||
}
|
||||
}
|
||||
|
||||
// This is a locking free method
|
||||
Map<String, Float> getEffectiveMinRatioPerResource() {
|
||||
return effectiveMinRatioPerResource;
|
||||
Map<String, Float> getEffectiveMinRatio(String label) {
|
||||
return effectiveMinResourceRatio.get(label);
|
||||
}
|
||||
|
||||
@Override
|
||||
|
@ -22,12 +22,15 @@
|
||||
import java.util.HashSet;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceLimits;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Test;
|
||||
|
||||
@ -91,6 +94,8 @@ public class TestAbsoluteResourceConfiguration {
|
||||
Resource.newInstance(25 * GB, 5);
|
||||
private static final Resource QUEUE_D_TEMPL_MAXRES =
|
||||
Resource.newInstance(150 * GB, 20);
|
||||
public static final String X_LABEL = "X";
|
||||
public static final String Y_LABEL = "Y";
|
||||
|
||||
private static Set<String> resourceTypes = new HashSet<>(
|
||||
Arrays.asList("memory", "vcores"));
|
||||
@ -141,6 +146,26 @@ private CapacitySchedulerConfiguration setupComplexQueueConfiguration(
|
||||
return csConf;
|
||||
}
|
||||
|
||||
private CapacitySchedulerConfiguration setupLabeledConfiguration(
|
||||
CapacitySchedulerConfiguration csConf) {
|
||||
csConf.setMinimumResourceRequirement("", QUEUEA_FULL, Resource.newInstance(20 * GB, 8));
|
||||
csConf.setMinimumResourceRequirement("", QUEUEB_FULL, Resource.newInstance(10 * GB, 3));
|
||||
csConf.setMinimumResourceRequirement("", QUEUEC_FULL, Resource.newInstance(10 * GB, 2));
|
||||
csConf.setMinimumResourceRequirement("", QUEUED_FULL, Resource.newInstance(10 * GB, 2));
|
||||
|
||||
csConf.setMinimumResourceRequirement(X_LABEL, QUEUEA_FULL, Resource.newInstance(20 * GB, 8));
|
||||
csConf.setMinimumResourceRequirement(X_LABEL, QUEUEB_FULL, Resource.newInstance(10 * GB, 3));
|
||||
csConf.setMinimumResourceRequirement(X_LABEL, QUEUEC_FULL, Resource.newInstance(10 * GB, 2));
|
||||
csConf.setMinimumResourceRequirement(X_LABEL, QUEUED_FULL, Resource.newInstance(10 * GB, 2));
|
||||
|
||||
csConf.setMinimumResourceRequirement(Y_LABEL, QUEUEA_FULL, Resource.newInstance(2 * GB, 1));
|
||||
csConf.setMinimumResourceRequirement(Y_LABEL, QUEUEB_FULL, Resource.newInstance(2 * GB, 1));
|
||||
csConf.setMinimumResourceRequirement(Y_LABEL, QUEUEC_FULL, Resource.newInstance(2 * GB, 1));
|
||||
csConf.setMinimumResourceRequirement(Y_LABEL, QUEUED_FULL, Resource.newInstance(2 * GB, 2));
|
||||
|
||||
return csConf;
|
||||
}
|
||||
|
||||
private CapacitySchedulerConfiguration setupMinMaxResourceConfiguration(
|
||||
CapacitySchedulerConfiguration csConf) {
|
||||
|
||||
@ -576,6 +601,42 @@ public void testValidateAbsoluteResourceConfig() throws Exception {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testDownscalingForLabels() throws Exception {
|
||||
CapacitySchedulerConfiguration csConf = setupSimpleQueueConfiguration(false);
|
||||
setupLabeledConfiguration(csConf);
|
||||
|
||||
csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||
ResourceScheduler.class);
|
||||
|
||||
MockRM rm = new MockRM(csConf);
|
||||
rm.start();
|
||||
|
||||
MockNM nm1 = rm.registerNode("127.0.0.1:1234", 8 * GB, 5);
|
||||
MockNM nm2 = rm.registerNode("127.0.0.2:1234", 8 * GB, 5);
|
||||
MockNM nm3 = rm.registerNode("127.0.0.3:1234", 8 * GB, 5);
|
||||
MockNM nm4 = rm.registerNode("127.0.0.4:1234", 8 * GB, 5);
|
||||
|
||||
rm.getRMContext().getNodeLabelManager().addToCluserNodeLabelsWithDefaultExclusivity(
|
||||
ImmutableSet.of(X_LABEL, Y_LABEL));
|
||||
rm.getRMContext().getNodeLabelManager().addLabelsToNode(
|
||||
ImmutableMap.of(nm1.getNodeId(), ImmutableSet.of(X_LABEL),
|
||||
nm2.getNodeId(), ImmutableSet.of(X_LABEL),
|
||||
nm3.getNodeId(), ImmutableSet.of(X_LABEL),
|
||||
nm4.getNodeId(), ImmutableSet.of(Y_LABEL)));
|
||||
|
||||
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||
CSQueue root = cs.getRootQueue();
|
||||
root.updateClusterResource(cs.getClusterResource(), new ResourceLimits(cs.getClusterResource()));
|
||||
|
||||
Resource childrenResource = root.getChildQueues().stream().map(q -> q.getEffectiveCapacity(
|
||||
X_LABEL)).reduce(Resources::add).orElse(Resource.newInstance(0, 0));
|
||||
|
||||
Assert.assertTrue("Children of root have more resource than overall cluster resource",
|
||||
Resources.greaterThan(cs.getResourceCalculator(), cs.getClusterResource(),
|
||||
root.getEffectiveCapacity(X_LABEL), childrenResource));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testEffectiveResourceAfterReducingClusterResource()
|
||||
throws Exception {
|
||||
|
Loading…
Reference in New Issue
Block a user