YARN-10197. FS-CS converter: fix emitted ordering policy string and max-am-resource percent value. Contributed by Peter Bacsko

This commit is contained in:
Szilard Nemeth 2020-03-26 12:35:10 +01:00
parent 348685dcb9
commit 9a297ff31d
6 changed files with 175 additions and 112 deletions

View File

@ -17,6 +17,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSQueueConverter.QUEUE_MAX_AM_SHARE_DISABLED;
import java.io.File;
import java.io.FileNotFoundException;
@ -264,16 +265,6 @@ public class FSConfigToCSConfigConverter {
convertedYarnSiteConfig.writeXml(yarnSiteOutputStream);
}
/**
 * Test hook that replaces the stream stored in {@code yarnSiteOutputStream},
 * i.e. the stream the converted yarn-site configuration is written to via
 * {@code convertedYarnSiteConfig.writeXml(yarnSiteOutputStream)}.
 *
 * @param out stream to use for the yarn-site output
 */
@VisibleForTesting
void setYarnSiteOutputStream(OutputStream out) {
this.yarnSiteOutputStream = out;
}
/**
 * Test hook that replaces the {@code capacitySchedulerOutputStream} field.
 *
 * NOTE(review): presumably this is the destination of the converted
 * capacity scheduler XML; the write itself is not part of this excerpt.
 *
 * @param out stream to store in {@code capacitySchedulerOutputStream}
 */
@VisibleForTesting
void setCapacitySchedulerConfigOutputStream(OutputStream out) {
this.capacitySchedulerOutputStream = out;
}
private void convertYarnSiteXml(Configuration inputYarnSiteConfig,
boolean havePlacementPolicies) {
FSYarnSiteConverter siteConverter =
@ -339,10 +330,17 @@ public class FSConfigToCSConfigConverter {
}
/**
 * Writes the cluster-wide maximum AM resource percent into the Capacity
 * Scheduler configuration.
 *
 * A default equal to {@code QUEUE_MAX_AM_SHARE_DISABLED} is emitted as
 * {@code 1.0}; every other value is copied unchanged. The property is
 * written exactly once, so the raw sentinel value is never stored.
 */
private void emitDefaultMaxAMShare() {
  // Direct floating point comparison is OK here: the sentinel is compared
  // against itself, not against a computed value.
  final float maxAmResourcePercent =
      queueMaxAMShareDefault == QUEUE_MAX_AM_SHARE_DISABLED
          ? 1.0f
          : queueMaxAMShareDefault;

  capacitySchedulerConfig.setFloat(
      CapacitySchedulerConfiguration.
          MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
      maxAmResourcePercent);
}
private void emitACLs(FairScheduler fs) {
@ -440,6 +438,11 @@ public class FSConfigToCSConfigConverter {
return convertedYarnSiteConfig;
}
/**
 * Exposes the {@code capacitySchedulerConfig} field to tests.
 *
 * @return the {@code capacitySchedulerConfig} instance itself; no copy is
 *         made
 */
@VisibleForTesting
Configuration getCapacitySchedulerConfig() {
return capacitySchedulerConfig;
}
@VisibleForTesting
void setConvertPlacementRules(boolean convertPlacementRules) {
this.convertPlacementRules = convertPlacementRules;

View File

@ -21,10 +21,8 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.C
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.hadoop.conf.Configuration;
@ -43,8 +41,11 @@ import org.apache.hadoop.yarn.util.resource.Resources;
*
*/
public class FSQueueConverter {
public static final float QUEUE_MAX_AM_SHARE_DISABLED = -1.0f;
private static final int MAX_RUNNING_APPS_UNSET = Integer.MIN_VALUE;
private final Set<String> leafQueueNames;
private static final String FAIR_POLICY = "fair";
private static final String FIFO_POLICY = "fifo";
private final FSConfigToCSConfigRuleHandler ruleHandler;
private Configuration capacitySchedulerConfig;
private final boolean preemptionEnabled;
@ -59,7 +60,6 @@ public class FSQueueConverter {
private ConversionOptions conversionOptions;
public FSQueueConverter(FSQueueConverterBuilder builder) {
this.leafQueueNames = new HashSet<>();
this.ruleHandler = builder.ruleHandler;
this.capacitySchedulerConfig = builder.capacitySchedulerConfig;
this.preemptionEnabled = builder.preemptionEnabled;
@ -76,15 +76,6 @@ public class FSQueueConverter {
List<FSQueue> children = queue.getChildQueues();
final String queueName = queue.getName();
if (queue instanceof FSLeafQueue) {
String shortName = getQueueShortName(queueName);
if (!leafQueueNames.add(shortName)) {
String msg = String.format("Leaf queues must be unique, "
+ "%s is defined at least twice", shortName);
conversionOptions.handleConversionError(msg);
}
}
emitChildQueues(queueName, children);
emitMaxAMShare(queueName, queue);
emitMaxRunningApps(queueName, queue);
@ -132,14 +123,15 @@ public class FSQueueConverter {
// Direct floating point comparison is OK here
if (queueMaxAmShare != 0.0f
&& queueMaxAmShare != queueMaxAMShareDefault
&& queueMaxAmShare != -1.0f) {
capacitySchedulerConfig.set(PREFIX + queueName +
".maximum-am-resource-percent", String.valueOf(queueMaxAmShare));
&& queueMaxAmShare != QUEUE_MAX_AM_SHARE_DISABLED) {
capacitySchedulerConfig.setFloat(PREFIX + queueName +
".maximum-am-resource-percent", queueMaxAmShare);
}
if (queueMaxAmShare == -1.0f) {
capacitySchedulerConfig.set(PREFIX + queueName +
".maximum-am-resource-percent", "1.0");
if (queueMaxAmShare == QUEUE_MAX_AM_SHARE_DISABLED
&& queueMaxAmShare != queueMaxAMShareDefault) {
capacitySchedulerConfig.setFloat(PREFIX + queueName +
".maximum-am-resource-percent", 1.0f);
}
}
@ -265,18 +257,18 @@ public class FSQueueConverter {
switch (policy) {
case DominantResourceFairnessPolicy.NAME:
capacitySchedulerConfig.set(PREFIX + queueName
+ ".ordering-policy", FairSharePolicy.NAME);
+ ".ordering-policy", FAIR_POLICY);
break;
case FairSharePolicy.NAME:
capacitySchedulerConfig.set(PREFIX + queueName
+ ".ordering-policy", FairSharePolicy.NAME);
+ ".ordering-policy", FAIR_POLICY);
if (drfUsed) {
ruleHandler.handleFairAsDrf(queueName);
}
break;
case FifoPolicy.NAME:
capacitySchedulerConfig.set(PREFIX + queueName
+ ".ordering-policy", FifoPolicy.NAME);
+ ".ordering-policy", FIFO_POLICY);
break;
default:
String msg = String.format("Unexpected ordering policy " +

View File

@ -36,7 +36,6 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.Map;
@ -82,7 +81,8 @@ public class TestFSConfigToCSConfigConverter {
prepareFileName("fair-scheduler-orderingpolicy-mixed.xml");
private static final String FS_NO_PLACEMENT_RULES_XML =
prepareFileName("fair-scheduler-noplacementrules.xml");
private static final String FS_MAX_AM_SHARE_DISABLED_XML =
prepareFileName("fair-scheduler-defaultMaxAmShareDisabled.xml");
@Mock
private FSConfigToCSConfigRuleHandler ruleHandler;
@ -96,8 +96,6 @@ public class TestFSConfigToCSConfigConverter {
private FSConfigToCSConfigConverter converter;
private Configuration config;
private ByteArrayOutputStream csConfigOut;
@Rule
public ExpectedException expectedException = ExpectedException.none();
private FSConfigConverterTestCommons converterTestCommons;
@ -141,11 +139,6 @@ public class TestFSConfigToCSConfigConverter {
converter = new FSConfigToCSConfigConverter(ruleHandler,
createDefaultConversionOptions());
converter.setClusterResource(CLUSTER_RESOURCE);
ByteArrayOutputStream yarnSiteOut = new ByteArrayOutputStream();
csConfigOut = new ByteArrayOutputStream();
converter.setCapacitySchedulerConfigOutputStream(csConfigOut);
converter.setYarnSiteOutputStream(yarnSiteOut);
}
private FSConfigToCSConfigConverterParams.Builder
@ -166,7 +159,7 @@ public class TestFSConfigToCSConfigConverter {
public void testDefaultMaxApplications() throws Exception {
converter.convert(config);
Configuration conf = getConvertedCSConfig();
Configuration conf = converter.getCapacitySchedulerConfig();
int maxApps =
conf.getInt(
CapacitySchedulerConfiguration.MAXIMUM_SYSTEM_APPLICATIONS, -1);
@ -178,19 +171,54 @@ public class TestFSConfigToCSConfigConverter {
/**
 * Verifies the emitted default maximum-am-resource-percent ("0.16") and the
 * per-queue values: root.admins.alice must carry "0.15", while
 * root.users.joe must have no per-queue entry.
 */
public void testDefaultMaxAMShare() throws Exception {
  converter.convert(config);

  Configuration conf = converter.getCapacitySchedulerConfig();
  String maxAmShare =
      conf.get(CapacitySchedulerConfiguration.
          MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT);

  assertEquals("Default max AM share", "0.16", maxAmShare);

  assertEquals("root.admins.alice max-am-resource-percent", "0.15",
      conf.get(PREFIX + "root.admins.alice.maximum-am-resource-percent"));

  // The property name is "<queue path>.maximum-am-resource-percent"; with
  // any other separator the lookup can never match and the assertion would
  // pass regardless of what the converter emitted.
  assertNull("root.users.joe maximum-am-resource-percent should be null",
      conf.get(PREFIX + "root.users.joe.maximum-am-resource-percent"));
}
/**
 * Converts the allocation file referenced by
 * {@code FS_MAX_AM_SHARE_DISABLED_XML} and checks how a default max AM share
 * of -1.0 is reflected in the converted configuration: the global property
 * must read "1.0", and only queues whose own value differs from the default
 * get a per-queue maximum-am-resource-percent entry.
 */
@Test
public void testDefaultMaxAMShareDisabled() throws Exception {
FSConfigToCSConfigConverterParams params = createDefaultParamsBuilder()
.withClusterResource(CLUSTER_RESOURCE_STRING)
.withFairSchedulerXmlConfig(FS_MAX_AM_SHARE_DISABLED_XML)
.build();
converter.convert(params);
Configuration conf = converter.getCapacitySchedulerConfig();
// A default of -1.0 means "disabled", which is expected as 1.0 in CS
assertEquals("Default max-am-resource-percent", "1.0",
conf.get(CapacitySchedulerConfiguration.
MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT));
// root.admins.bob is -1.0, equal to the default -1.0: nothing is emitted
assertNull("root.admins.bob maximum-am-resource-percent should be null",
conf.get(PREFIX + "root.admins.bob.maximum-am-resource-percent"));
// root.admins.alice is 0.15, different from the default -1.0: emitted
assertEquals("root.admins.alice max-am-resource-percent", "0.15",
conf.get(PREFIX + "root.admins.alice.maximum-am-resource-percent"));
// root.users.joe is unset and inherits -1.0: nothing is emitted
assertNull("root.users.joe maximum-am-resource-percent should be null",
conf.get(PREFIX + "root.users.joe.maximum-am-resource-percent"));
}
@Test
public void testConvertACLs() throws Exception {
converter.convert(config);
Configuration conf = getConvertedCSConfig();
Configuration conf = converter.getCapacitySchedulerConfig();
// root
assertEquals("root submit ACL", "alice,bob,joe,john hadoop_users",
@ -227,7 +255,7 @@ public class TestFSConfigToCSConfigConverter {
public void testDefaultMaxRunningApps() throws Exception {
converter.convert(config);
Configuration conf = getConvertedCSConfig();
Configuration conf = converter.getCapacitySchedulerConfig();
// default setting
assertEquals("Default max apps", 15,
@ -547,7 +575,7 @@ public class TestFSConfigToCSConfigConverter {
converter.convert(config);
Configuration convertedConf = getConvertedCSConfig();
Configuration convertedConf = converter.getCapacitySchedulerConfig();
String expectedMappingRules =
"u:%user:root.admins.devs.%user,u:%user:root.users.%user,u:%user:root.default";
@ -580,7 +608,7 @@ public class TestFSConfigToCSConfigConverter {
converter.setConvertPlacementRules(true);
converter.convert(config);
Configuration convertedConf = getConvertedCSConfig();
Configuration convertedConf = converter.getCapacitySchedulerConfig();
String mappingRules =
convertedConf.get(CapacitySchedulerConfiguration.QUEUE_MAPPING);
@ -602,7 +630,7 @@ public class TestFSConfigToCSConfigConverter {
converter.convert(config);
Configuration convertedConf = getConvertedCSConfig();
Configuration convertedConf = converter.getCapacitySchedulerConfig();
String property =
"yarn.scheduler.capacity.root.auto-create-child-queue.enabled";
assertNull("Auto-create queue shouldn't be set",
@ -632,7 +660,7 @@ public class TestFSConfigToCSConfigConverter {
converter.convert(config);
Configuration convertedConf = getConvertedCSConfig();
Configuration convertedConf = converter.getCapacitySchedulerConfig();
String property =
"yarn.scheduler.capacity.root.auto-create-child-queue.enabled";
@ -676,17 +704,6 @@ public class TestFSConfigToCSConfigConverter {
any(Boolean.class));
}
/**
 * Builds a {@code Configuration} from the bytes captured in
 * {@code csConfigOut}.
 *
 * Asserts that the captured output is non-empty before the bytes are added
 * as a resource to a new {@code Configuration(false)}.
 *
 * @return a new Configuration with the captured bytes added as a resource
 */
private Configuration getConvertedCSConfig() {
ByteArrayInputStream input =
new ByteArrayInputStream(csConfigOut.toByteArray());
// Fail early with a readable message if nothing was captured.
assertTrue("CS config output has length of 0!",
csConfigOut.toByteArray().length > 0);
Configuration conf = new Configuration(false);
conf.addResource(input);
return conf;
}
private Configuration getConvertedCSConfig(String dir) throws IOException {
File capacityFile = new File(dir, "capacity-scheduler.xml");
ByteArrayInputStream input =

View File

@ -142,12 +142,6 @@ public class TestFSQueueConverter {
.withConversionOptions(conversionOptions);
}
/**
 * Switches the shared {@code conversionOptions} to dry-run mode, rebuilds
 * the {@code converter} field from {@code builder} with those options and
 * returns it.
 *
 * @return the newly built converter (also stored in the {@code converter}
 *         field)
 */
private FSQueueConverter prepareDryRunConverter() {
conversionOptions.setDryRun(true);
converter = builder.withConversionOptions(conversionOptions).build();
return converter;
}
@Test
public void testConvertQueueHierarchy() {
converter = builder.build();
@ -179,24 +173,6 @@ public class TestFSQueueConverter {
assertNoValueForQueues(leafs, ".queues", csConfig);
}
/**
 * Expects {@code convertQueueHierarchy} to throw a
 * {@code ConversionException} containing "Leaf queues must be unique" when
 * the scheduler is recreated from fair-scheduler-sameleafqueue.xml.
 */
@Test
public void testConvertQueueHierarchyWithSameLeafQueues() throws Exception {
converter = builder.build();
// The expectation must be registered before the call that should throw.
expectedException.expect(ConversionException.class);
expectedException.expectMessage("Leaf queues must be unique");
String absolutePath =
new File("src/test/resources/fair-scheduler-sameleafqueue.xml")
.getAbsolutePath();
yarnConfig.set(FairSchedulerConfiguration.ALLOCATION_FILE,
FILE_PREFIX + absolutePath);
// Close the current scheduler and create a new one after the allocation
// file setting has been changed.
fs.close();
fs = createFairScheduler();
rootQueue = fs.getQueueManager().getRootQueue();
converter.convertQueueHierarchy(rootQueue);
}
@Test
public void testQueueMaxAMShare() {
converter = builder.build();
@ -417,11 +393,11 @@ public class TestFSQueueConverter {
// root.users
assertEquals("root.users.joe ordering policy", "fair",
csConfig.get(PREFIX + "root.users.joe.ordering-policy"));
assertEquals("root.users.john ordering policy", "FIFO",
assertEquals("root.users.john ordering policy", "fifo",
csConfig.get(PREFIX + "root.users.john.ordering-policy"));
// root.admins
assertEquals("root.admins.alice ordering policy", "FIFO",
assertEquals("root.admins.alice ordering policy", "fifo",
csConfig.get(PREFIX + "root.admins.alice.ordering-policy"));
assertEquals("root.admins.bob ordering policy", "fair",
csConfig.get(PREFIX + "root.admins.bob.ordering-policy"));
@ -470,28 +446,6 @@ public class TestFSQueueConverter {
converter.convertQueueHierarchy(rootQueue);
}
/**
 * Dry-run counterpart of the duplicate leaf queue check: converting
 * fair-scheduler-sameleafqueue.xml with a dry-run converter is expected to
 * record exactly one error containing "Leaf queues must be unique" and no
 * warnings in {@code dryRunResultHolder}.
 */
@Test
public void testDryRunWithMultipleLeafQueueNames() throws IOException {
String absolutePath =
new File("src/test/resources/fair-scheduler-sameleafqueue.xml")
.getAbsolutePath();
yarnConfig.set(FairSchedulerConfiguration.ALLOCATION_FILE,
FILE_PREFIX + absolutePath);
// Close the current scheduler and create a new one after the allocation
// file setting has been changed.
fs.close();
fs = createFairScheduler();
rootQueue = fs.getQueueManager().getRootQueue();
prepareDryRunConverter();
converter.convertQueueHierarchy(rootQueue);
assertEquals("Dry run errors", 1, dryRunResultHolder.getErrors().size());
assertEquals("Dry run warnings", 0,
dryRunResultHolder.getWarnings().size());
String error = dryRunResultHolder.getErrors().iterator().next();
assertTrue("Unexpected error message",
error.contains("Leaf queues must be unique"));
}
private void assertNoValueForQueues(Set<String> queues, String postfix,
Configuration config) {
for (String queue : queues) {

View File

@ -45,6 +45,7 @@
<schedulingPolicy>drf</schedulingPolicy>
<aclSubmitApps>joe </aclSubmitApps>
<aclAdministerApps>joe </aclAdministerApps>
<maxAMShare>0.16</maxAMShare>
</queue>
</queue>
<queue name="admins" type="parent">

View File

@ -0,0 +1,96 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<allocations>
<queue name="root">
<weight>1.0</weight>
<schedulingPolicy>drf</schedulingPolicy>
<aclSubmitApps>alice,bob,joe,john hadoop_users</aclSubmitApps>
<aclAdministerApps>alice,bob,joe,john hadoop_users</aclAdministerApps>
<queue name="default">
<weight>1.0</weight>
<schedulingPolicy>drf</schedulingPolicy>
</queue>
<queue name="users" type="parent">
<weight>1.0</weight>
<schedulingPolicy>drf</schedulingPolicy>
<queue name="john">
<weight>1.0</weight>
<minResources>memory-mb=4096, vcores=1</minResources>
<schedulingPolicy>drf</schedulingPolicy>
<aclSubmitApps>john </aclSubmitApps>
<aclAdministerApps>john </aclAdministerApps>
<maxContainerAllocation>vcores=2,memory-mb=8192</maxContainerAllocation>
</queue>
<queue name="joe">
<maxResources>memory-mb=50.0%, vcores=50.0%</maxResources>
<minResources>memory-mb=4096, vcores=1</minResources>
<weight>3.0</weight>
<allowPreemptionFrom>false</allowPreemptionFrom>
<schedulingPolicy>drf</schedulingPolicy>
<aclSubmitApps>joe </aclSubmitApps>
<aclAdministerApps>joe </aclAdministerApps>
</queue>
</queue>
<queue name="admins" type="parent">
<maxChildResources>memory-mb=8192, vcores=1</maxChildResources>
<weight>1.0</weight>
<schedulingPolicy>drf</schedulingPolicy>
<maxContainerAllocation>vcores=3,memory-mb=4096</maxContainerAllocation>
<!-- maxAMShare here (0.15) differs from queueMaxAMShareDefault (-1.0) in
this file; the converter test expects 0.15 as the per-queue
maximum-am-resource-percent of root.admins.alice. -->
<queue name="alice">
<maxResources>memory-mb=16384, vcores=4</maxResources>
<maxRunningApps>2</maxRunningApps>
<weight>3.0</weight>
<allowPreemptionFrom>false</allowPreemptionFrom>
<schedulingPolicy>drf</schedulingPolicy>
<aclSubmitApps>alice </aclSubmitApps>
<aclAdministerApps>alice </aclAdministerApps>
<maxAMShare>0.15</maxAMShare>
<reservation>memory-mb=16384, vcores=4</reservation>
</queue>
<!-- maxAMShare here (-1.0) equals queueMaxAMShareDefault (-1.0) in this
file; the converter test expects no per-queue
maximum-am-resource-percent for root.admins.bob. -->
<queue name="bob">
<maxResources>memory-mb=8192, vcores=2</maxResources>
<weight>1.0</weight>
<schedulingPolicy>drf</schedulingPolicy>
<aclSubmitApps>bob </aclSubmitApps>
<aclAdministerApps>bob </aclAdministerApps>
<maxAMShare>-1.0</maxAMShare>
</queue>
</queue>
</queue>
<user name="alice">
<maxRunningApps>30</maxRunningApps>
</user>
<userMaxAppsDefault>10</userMaxAppsDefault>
<defaultFairSharePreemptionTimeout>23</defaultFairSharePreemptionTimeout>
<defaultMinSharePreemptionTimeout>24</defaultMinSharePreemptionTimeout>
<defaultFairSharePreemptionThreshold>0.12</defaultFairSharePreemptionThreshold>
<queueMaxAppsDefault>15</queueMaxAppsDefault>
<defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>
<!-- -1.0 is the "disabled" value for the default max AM share; the
converter test expects it to be emitted as maximum-am-resource-percent 1.0
in the converted capacity scheduler configuration. -->
<queueMaxAMShareDefault>-1.0</queueMaxAMShareDefault>
<queuePlacementPolicy>
<rule name="nestedUserQueue" create="false">
<rule name="default" create="false" queue="admins.devs"/>
</rule>
<rule name="specified" create="true"/>
<rule name="nestedUserQueue" create="true">
<rule name="default" create="false" queue="users"/>
</rule>
<rule name="default"/>
</queuePlacementPolicy>
</allocations>