YARN-10099. FS-CS converter: handle allow-undeclared-pools and user-as-default-queue properly and fix misc issues. Contributed by Peter Bacsko

This commit is contained in:
Szilard Nemeth 2020-01-30 16:03:38 +01:00
parent 5977360878
commit a7d72c523a
8 changed files with 305 additions and 28 deletions

View File

@ -96,7 +96,7 @@ public class AllocationFileLoaderService extends AbstractService {
private Thread reloadThread;
private volatile boolean running = true;
AllocationFileLoaderService(FairScheduler scheduler) {
public AllocationFileLoaderService(FairScheduler scheduler) {
this(SystemClock.getInstance(), scheduler);
}
@ -186,7 +186,7 @@ public void serviceStop() throws Exception {
* classpath, but loaded like a regular File.
*/
@VisibleForTesting
Path getAllocationFile(Configuration conf)
public Path getAllocationFile(Configuration conf)
throws UnsupportedFileSystemException {
String allocFilePath = conf.get(FairSchedulerConfiguration.ALLOCATION_FILE,
FairSchedulerConfiguration.DEFAULT_ALLOCATION_FILE);

View File

@ -26,26 +26,37 @@
import java.util.List;
import java.util.Map;
import javax.xml.parsers.DocumentBuilder;
import javax.xml.parsers.DocumentBuilderFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.security.ConfiguredYarnAuthorizer;
import org.apache.hadoop.yarn.security.YarnAuthorizationProvider;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfigurationException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.ConfigurableResource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocation.AllocationFileParser;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.w3c.dom.Document;
import org.w3c.dom.Element;
import org.w3c.dom.NodeList;
import com.google.common.annotations.VisibleForTesting;
@ -77,7 +88,7 @@ public class FSConfigToCSConfigConverter {
private ConversionOptions conversionOptions;
private boolean drfUsed = false;
private Configuration yarnSiteConfig;
private Configuration convertedYarnSiteConfig;
private Configuration capacitySchedulerConfig;
private FSConfigToCSConfigRuleHandler ruleHandler;
@ -98,11 +109,11 @@ public void convert(FSConfigToCSConfigConverterParams params)
validateParams(params);
prepareOutputFiles(params.getOutputDirectory(), params.isConsole());
loadConversionRules(params.getConversionRulesConfig());
Configuration conf = createConfiguration(params);
handleFairSchedulerConfig(params, conf);
Configuration inputYarnSiteConfig = getInputYarnSiteConfig(params);
handleFairSchedulerConfig(params, inputYarnSiteConfig);
this.clusterResource = getClusterResource(params);
convert(conf);
convert(inputYarnSiteConfig);
}
private void prepareOutputFiles(String outputDirectory, boolean console)
@ -162,13 +173,10 @@ private void loadConversionRules(String rulesFile) throws IOException {
}
}
private Configuration createConfiguration(
private Configuration getInputYarnSiteConfig(
FSConfigToCSConfigConverterParams params) {
Configuration conf = new YarnConfiguration();
conf.addResource(new Path(params.getYarnSiteXmlConfig()));
conf.setBoolean(FairSchedulerConfiguration.MIGRATION_MODE, true);
conf.setBoolean(FairSchedulerConfiguration.NO_TERMINAL_RULE_CHECK,
conversionOptions.isNoRuleTerminalCheck());
return conf;
}
@ -199,7 +207,7 @@ private void handleFairSchedulerConfig(
}
@VisibleForTesting
void convert(Configuration conf) throws Exception {
void convert(Configuration inputYarnSiteConfig) throws Exception {
System.out.println(WARNING_TEXT);
// initialize Fair Scheduler
@ -207,9 +215,20 @@ void convert(Configuration conf) throws Exception {
PlacementManager placementManager = new PlacementManager();
ctx.setQueuePlacementManager(placementManager);
// Prepare a separate config for the FS instance
// to force the use of ConfiguredYarnAuthorizer, otherwise
// it might use that of Ranger
Configuration fsConfig = new Configuration(inputYarnSiteConfig);
fsConfig.setBoolean(FairSchedulerConfiguration.MIGRATION_MODE, true);
fsConfig.setBoolean(FairSchedulerConfiguration.NO_TERMINAL_RULE_CHECK,
conversionOptions.isNoRuleTerminalCheck());
fsConfig.setClass(YarnConfiguration.YARN_AUTHORIZATION_PROVIDER,
ConfiguredYarnAuthorizer.class, YarnAuthorizationProvider.class);
FairScheduler fs = new FairScheduler();
fs.setRMContext(ctx);
fs.init(conf);
fs.init(fsConfig);
boolean havePlacementPolicies =
checkPlacementPoliciesPresent(fs, inputYarnSiteConfig);
drfUsed = isDrfUsed(fs);
@ -217,13 +236,13 @@ void convert(Configuration conf) throws Exception {
queueMaxAppsDefault = allocConf.getQueueMaxAppsDefault();
queueMaxAMShareDefault = allocConf.getQueueMaxAMShareDefault();
yarnSiteConfig = new Configuration(false);
convertedYarnSiteConfig = new Configuration(false);
capacitySchedulerConfig = new Configuration(false);
checkUserMaxApps(allocConf);
checkUserMaxAppsDefault(allocConf);
convertYarnSiteXml(conf);
convertYarnSiteXml(inputYarnSiteConfig, havePlacementPolicies);
convertCapacitySchedulerXml(fs);
if (consoleMode) {
@ -235,7 +254,7 @@ void convert(Configuration conf) throws Exception {
System.out.println();
System.out.println("======= " + YARN_SITE_XML + " =======");
}
yarnSiteConfig.writeXml(yarnSiteOutputStream);
convertedYarnSiteConfig.writeXml(yarnSiteOutputStream);
}
@VisibleForTesting
@ -248,17 +267,23 @@ void setCapacitySchedulerConfigOutputStream(OutputStream out) {
this.capacitySchedulerOutputStream = out;
}
private void convertYarnSiteXml(Configuration conf) {
private void convertYarnSiteXml(Configuration inputYarnSiteConfig,
boolean havePlacementPolicies) {
FSYarnSiteConverter siteConverter =
new FSYarnSiteConverter();
siteConverter.convertSiteProperties(conf, yarnSiteConfig, drfUsed);
siteConverter.convertSiteProperties(inputYarnSiteConfig,
convertedYarnSiteConfig, drfUsed);
autoCreateChildQueues = siteConverter.isAutoCreateChildQueues();
// See docs: "allow-undeclared-pools" and "user-as-default-queue" are
// ignored if we have placement rules
autoCreateChildQueues =
!havePlacementPolicies && siteConverter.isAutoCreateChildQueues();
userAsDefaultQueue =
!havePlacementPolicies && siteConverter.isUserAsDefaultQueue();
preemptionEnabled = siteConverter.isPreemptionEnabled();
sizeBasedWeight = siteConverter.isSizeBasedWeight();
userAsDefaultQueue = siteConverter.isUserAsDefaultQueue();
checkReservationSystem(conf);
checkReservationSystem(inputYarnSiteConfig);
}
private void convertCapacitySchedulerXml(FairScheduler fs) {
@ -402,6 +427,46 @@ FSConfigToCSConfigRuleHandler getRuleHandler() {
@VisibleForTesting
Configuration getYarnSiteConfig() {
return yarnSiteConfig;
return convertedYarnSiteConfig;
}
/**
 * Determines whether {@code <queuePlacementPolicy>} is present
 * in the allocation file or not.
 *
 * <p>Note that checking the number of rules held by the placement manager
 * doesn't work - by default, "allow-undeclared-pools" and
 * "user-as-default-queue" are translated to policies internally
 * inside QueuePlacementPolicy.fromConfiguration().
 *
 * @param scheduler the Fair Scheduler instance handed to the
 *     AllocationFileLoaderService that resolves the allocation file
 * @param inputYarnSiteConfig the input yarn-site configuration, used to
 *     resolve the allocation file path and its file system
 * @return true if the parsed allocation file contains a queue placement
 *     policy, false otherwise
 * @throws PreconditionException if anything fails while locating, opening
 *     or parsing the allocation file (the original exception is the cause)
 */
private boolean checkPlacementPoliciesPresent(FairScheduler scheduler,
    Configuration inputYarnSiteConfig) {
  try (AllocationFileLoaderService loader =
      new AllocationFileLoaderService(scheduler)) {
    Path allocFilePath = loader.getAllocationFile(inputYarnSiteConfig);
    FileSystem fs = allocFilePath.getFileSystem(inputYarnSiteConfig);

    DocumentBuilderFactory docBuilderFactory =
        DocumentBuilderFactory.newInstance();
    // Factory settings only affect builders created afterwards, so this
    // has to happen before newDocumentBuilder() to have any effect.
    docBuilderFactory.setIgnoringComments(true);
    DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();

    Document doc;
    // Close the allocation file stream as soon as the DOM has been built.
    try (java.io.InputStream allocFileStream = fs.open(allocFilePath)) {
      doc = builder.parse(allocFileStream);
    }

    Element root = doc.getDocumentElement();
    NodeList elements = root.getChildNodes();

    AllocationFileParser allocationFileParser =
        new AllocationFileParser(elements);
    allocationFileParser.parse();

    return allocationFileParser.getQueuePlacementPolicy().isPresent();
  } catch (Exception e) {
    throw new PreconditionException("Unable to parse allocation file", e);
  }
}
}

View File

@ -30,6 +30,10 @@ public PreconditionException(String message) {
super(message);
}
/**
 * Creates the exception with a message and the underlying cause.
 *
 * @param message description of the precondition that failed
 * @param cause the exception that triggered this failure; passed on to
 *     the superclass constructor
 */
public PreconditionException(String message, Throwable cause) {
super(message, cause);
}
/**
 * Creates the exception with a message and a MissingArgumentException
 * as the cause.
 *
 * @param message description of the precondition that failed
 * @param ex the MissingArgumentException passed on to the superclass
 *     constructor as the cause
 */
public PreconditionException(String message, MissingArgumentException ex) {
super(message, ex);
}

View File

@ -34,6 +34,8 @@ class QueuePlacementConverter {
private static final String PRIMARY_GROUP = "%primary_group";
private static final String SECONDARY_GROUP = "%secondary_group";
private static final String RULE_SEPARATOR = ",";
Map<String, String> convertPlacementPolicy(PlacementManager placementManager,
FSConfigToCSConfigRuleHandler ruleHandler, boolean userAsDefaultQueue) {
StringBuilder mapping = new StringBuilder();
@ -59,7 +61,7 @@ Map<String, String> convertPlacementPolicy(PlacementManager placementManager,
} else {
if (!userAsDefaultQueue) {
if (mapping.length() > 0) {
mapping.append(";");
mapping.append(RULE_SEPARATOR);
}
mapping.append("u:" + USER + ":" + USER);
}
@ -72,18 +74,18 @@ Map<String, String> convertPlacementPolicy(PlacementManager placementManager,
"yarn.scheduler.capacity.queue-mappings-override.enable", "false");
} else if (rule instanceof PrimaryGroupPlacementRule) {
if (mapping.length() > 0) {
mapping.append(";");
mapping.append(RULE_SEPARATOR);
}
mapping.append("u:" + USER + ":" + PRIMARY_GROUP);
} else if (rule instanceof DefaultPlacementRule) {
DefaultPlacementRule defaultRule = (DefaultPlacementRule) rule;
if (mapping.length() > 0) {
mapping.append(";");
mapping.append(RULE_SEPARATOR);
}
mapping.append("u:" + USER + ":").append(defaultRule.defaultQueueName);
} else if (rule instanceof SecondaryGroupExistingPlacementRule) {
if (mapping.length() > 0) {
mapping.append(";");
mapping.append(RULE_SEPARATOR);
}
mapping.append("u:" + USER + ":" + SECONDARY_GROUP);
} else if (!(rule instanceof RejectPlacementRule)) {
@ -103,7 +105,7 @@ private void handleNestedRule(StringBuilder mapping,
UserPlacementRule userRule) {
PlacementRule pr = userRule.getParentRule();
if (mapping.length() > 0) {
mapping.append(";");
mapping.append(RULE_SEPARATOR);
}
if (pr instanceof PrimaryGroupPlacementRule) {
mapping.append("u:" + USER + ":" + PRIMARY_GROUP + "." + USER);

View File

@ -27,6 +27,7 @@
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.USER_MAX_RUNNING_APPS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.RuleAction.ABORT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
@ -74,6 +75,9 @@ public class TestFSConfigToCSConfigConverter {
prepareFileName("fair-scheduler-onlyfairpolicy.xml");
private static final String FS_MIXED_POLICY_XML =
prepareFileName("fair-scheduler-orderingpolicy-mixed.xml");
private static final String FS_NO_PLACEMENT_RULES_XML =
prepareFileName("fair-scheduler-noplacementrules.xml");
@Mock
private FSConfigToCSConfigRuleHandler ruleHandler;
@ -504,6 +508,112 @@ public void testConversionWhenMixedPolicyIsUsed() throws Exception {
CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, null));
}
/**
 * Converts FAIR_SCHEDULER_XML and checks the generated Capacity Scheduler
 * queue mapping. "user-as-default-queue" is not set explicitly here.
 * NOTE(review): presumably FAIR_SCHEDULER_XML defines placement rules, as
 * the test name suggests - confirm against the fixture.
 */
// Without @Test JUnit 4 silently skips this method, so the assertion
// below was never exercised.
@Test
@SuppressWarnings("checkstyle:linelength")
public void testUserAsDefaultQueueWithPlacementRules() throws Exception {
  config = new Configuration(false);
  config.setBoolean(FairSchedulerConfiguration.MIGRATION_MODE, true);
  config.set(FairSchedulerConfiguration.ALLOCATION_FILE,
      FAIR_SCHEDULER_XML);

  converter.convert(config);

  Configuration convertedConf = getConvertedCSConfig();
  String expectedMappingRules =
      "u:%user:root.admins.devs.%user,u:%user:root.users.%user,u:%user:root.default";
  String mappingRules =
      convertedConf.get(CapacitySchedulerConfiguration.QUEUE_MAPPING);

  assertEquals("Mapping rules", expectedMappingRules, mappingRules);
}
/**
 * Runs the shared "no placement rules" scenario with
 * user-as-default-queue set to true.
 */
@Test
public void testUserAsDefaultQueueTrueWithoutPlacementRules()
throws Exception {
testUserAsDefaultQueueWithoutPlacementRules(true);
}
/**
 * Runs the shared "no placement rules" scenario with
 * user-as-default-queue set to false.
 */
@Test
public void testUserAsDefaultQueueFalseWithoutPlacementRules()
throws Exception {
testUserAsDefaultQueueWithoutPlacementRules(false);
}
/**
 * Converts the FS_NO_PLACEMENT_RULES_XML allocation file with the given
 * user-as-default-queue setting and verifies the resulting queue mapping.
 *
 * @param userAsDefaultQueue value written to
 *     FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE before converting
 * @throws Exception if the conversion fails
 */
private void testUserAsDefaultQueueWithoutPlacementRules(boolean
    userAsDefaultQueue) throws Exception {
  config = new Configuration(false);
  config.setBoolean(FairSchedulerConfiguration.MIGRATION_MODE, true);
  config.set(FairSchedulerConfiguration.ALLOCATION_FILE,
      FS_NO_PLACEMENT_RULES_XML);
  config.setBoolean(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE,
      userAsDefaultQueue);

  converter.convert(config);

  String actualMapping = getConvertedCSConfig()
      .get(CapacitySchedulerConfiguration.QUEUE_MAPPING);
  // true -> apps map to a queue named after the user,
  // false -> everything maps to root.default
  String expectedMapping = userAsDefaultQueue
      ? "u:%user:%user"
      : "u:%user:root.default";

  assertEquals("Mapping rules", expectedMapping, actualMapping);
}
/**
 * Converts FAIR_SCHEDULER_XML with allow-undeclared-pools enabled and
 * expects that the root auto-create-child-queue property is not emitted.
 */
@Test
public void testAutoCreateChildQueuesWithPlacementRules() throws Exception {
  config = new Configuration(false);
  config.setBoolean(FairSchedulerConfiguration.MIGRATION_MODE, true);
  config.set(FairSchedulerConfiguration.ALLOCATION_FILE,
      FAIR_SCHEDULER_XML);
  config.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS,
      true);

  converter.convert(config);

  Configuration csConf = getConvertedCSConfig();
  String autoCreateKey =
      "yarn.scheduler.capacity.root.auto-create-child-queue.enabled";
  String autoCreateValue = csConf.get(autoCreateKey);

  assertNull("Auto-create queue shouldn't be set", autoCreateValue);
}
/**
 * Runs the shared "no placement rules" scenario with
 * allow-undeclared-pools set to true.
 */
@Test
public void testAutoCreateChildQueuesTrueWithoutPlacementRules()
throws Exception {
testAutoCreateChildQueuesWithoutPlacementRules(true);
}
/**
 * Runs the shared "no placement rules" scenario with
 * allow-undeclared-pools set to false.
 */
@Test
public void testAutoCreateChildQueuesFalseWithoutPlacementRules()
throws Exception {
testAutoCreateChildQueuesWithoutPlacementRules(false);
}
/**
 * Converts the FS_NO_PLACEMENT_RULES_XML allocation file with the given
 * allow-undeclared-pools setting and verifies whether the root
 * auto-create-child-queue property shows up in the converted config.
 *
 * @param allowUndeclaredPools value written to
 *     FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS before converting
 * @throws Exception if the conversion fails
 */
private void testAutoCreateChildQueuesWithoutPlacementRules(
    boolean allowUndeclaredPools) throws Exception {
  config = new Configuration(false);
  config.setBoolean(FairSchedulerConfiguration.MIGRATION_MODE, true);
  config.set(FairSchedulerConfiguration.ALLOCATION_FILE,
      FS_NO_PLACEMENT_RULES_XML);
  config.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS,
      allowUndeclaredPools);

  converter.convert(config);

  Configuration csConf = getConvertedCSConfig();
  String autoCreateKey =
      "yarn.scheduler.capacity.root.auto-create-child-queue.enabled";

  if (!allowUndeclaredPools) {
    // Disabled in the input: the property must be absent, not "false".
    assertNull("Auto-create queue shouldn't be set",
        csConf.get(autoCreateKey));
    return;
  }

  assertEquals("Auto-create queue wasn't enabled", true,
      csConf.getBoolean(autoCreateKey, false));
}
private Configuration getConvertedCSConfig() {
ByteArrayInputStream input =
new ByteArrayInputStream(csConfigOut.toByteArray());

View File

@ -334,7 +334,6 @@ public void testQueueMaximumCapacity() {
@Test
public void testQueueAutoCreateChildQueue() {
config.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, true);
converter = builder
.withCapacitySchedulerConfig(csConfig)
.withAutoCreateChildQueues(true)
@ -346,6 +345,19 @@ public void testQueueAutoCreateChildQueue() {
csConfig);
}
/**
 * Builds a converter with auto-create child queues turned off and checks
 * that no ".auto-create-child-queue.enabled" value is set for any of the
 * queues in ALL_QUEUES after converting the hierarchy under rootQueue.
 */
@Test
public void testQueueWithNoAutoCreateChildQueue() {
converter = builder
.withCapacitySchedulerConfig(csConfig)
.withAutoCreateChildQueues(false)
.build();
converter.convertQueueHierarchy(rootQueue);
assertNoValueForQueues(ALL_QUEUES, ".auto-create-child-queue.enabled",
csConfig);
}
@Test
public void testQueueSizeBasedWeightEnabled() {
converter = builder.withSizeBasedWeight(true).build();

View File

@ -224,7 +224,7 @@ public void testConvertMultiplePlacementRules() {
Map<String, String> properties = convert(false);
verifyMapping(properties,
"u:%user:%user;u:%user:%primary_group;u:%user:%secondary_group");
"u:%user:%user,u:%user:%primary_group,u:%user:%secondary_group");
verifyZeroInteractions(ruleHandler);
}

View File

@ -0,0 +1,84 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<allocations>
<!--
Allocation file without any queuePlacementPolicy element: it only declares
the queue hierarchy under root, per-user limits and the global defaults.
NOTE(review): presumably this is the fair-scheduler-noplacementrules.xml
fixture used by TestFSConfigToCSConfigConverter; confirm against the file
name in the commit.
-->
<queue name="root">
<weight>1.0</weight>
<schedulingPolicy>drf</schedulingPolicy>
<aclSubmitApps>alice,bob,joe,john hadoop_users</aclSubmitApps>
<aclAdministerApps>alice,bob,joe,john hadoop_users</aclAdministerApps>
<queue name="default">
<weight>1.0</weight>
<schedulingPolicy>drf</schedulingPolicy>
</queue>
<queue name="users" type="parent">
<weight>1.0</weight>
<schedulingPolicy>drf</schedulingPolicy>
<queue name="john">
<weight>1.0</weight>
<schedulingPolicy>drf</schedulingPolicy>
<aclSubmitApps>john </aclSubmitApps>
<aclAdministerApps>john </aclAdministerApps>
<maxContainerAllocation>vcores=2,memory-mb=8192</maxContainerAllocation>
</queue>
<queue name="joe">
<maxResources>memory-mb=50.0%, vcores=50.0%</maxResources>
<weight>3.0</weight>
<allowPreemptionFrom>false</allowPreemptionFrom>
<schedulingPolicy>drf</schedulingPolicy>
<aclSubmitApps>joe </aclSubmitApps>
<aclAdministerApps>joe </aclAdministerApps>
</queue>
</queue>
<queue name="admins" type="parent">
<maxChildResources>memory-mb=8192, vcores=1</maxChildResources>
<weight>1.0</weight>
<schedulingPolicy>drf</schedulingPolicy>
<maxContainerAllocation>vcores=3,memory-mb=4096</maxContainerAllocation>
<queue name="alice">
<maxResources>memory-mb=16384, vcores=4</maxResources>
<maxRunningApps>2</maxRunningApps>
<weight>3.0</weight>
<allowPreemptionFrom>false</allowPreemptionFrom>
<schedulingPolicy>drf</schedulingPolicy>
<aclSubmitApps>alice </aclSubmitApps>
<aclAdministerApps>alice </aclAdministerApps>
<maxAMShare>0.15</maxAMShare>
<reservation>memory-mb=16384, vcores=4</reservation>
</queue>
<queue name="bob">
<maxResources>memory-mb=8192, vcores=2</maxResources>
<weight>1.0</weight>
<schedulingPolicy>drf</schedulingPolicy>
<aclSubmitApps>bob </aclSubmitApps>
<aclAdministerApps>bob </aclAdministerApps>
<maxAMShare>-1.0</maxAMShare>
</queue>
</queue>
</queue>
<user name="alice">
<maxRunningApps>30</maxRunningApps>
</user>
<userMaxAppsDefault>10</userMaxAppsDefault>
<defaultFairSharePreemptionTimeout>23</defaultFairSharePreemptionTimeout>
<defaultMinSharePreemptionTimeout>24</defaultMinSharePreemptionTimeout>
<defaultFairSharePreemptionThreshold>0.12</defaultFairSharePreemptionThreshold>
<queueMaxAppsDefault>15</queueMaxAppsDefault>
<defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>
<queueMaxAMShareDefault>0.16</queueMaxAMShareDefault>
</allocations>