YARN-10573. Enhance placement rule conversion in fs2cs in weight mode and enable it by default. Contributed by Peter Bacsko

This commit is contained in:
Szilard Nemeth 2021-01-19 09:42:40 +01:00
parent 6abdb148e4
commit a326f22606
9 changed files with 155 additions and 87 deletions

View File

@ -102,13 +102,12 @@ public enum CliOption {
"Disables checking whether a placement rule is terminal to maintain" +
" backward compatibility with configs that were made before YARN-8967.",
false),
CONVERT_PLACEMENT_RULES("convert placement rules",
"m", "convert-placement-rules",
"Convert Fair Scheduler placement rules to Capacity" +
" Scheduler mapping rules", false),
SKIP_VERIFICATION("skip verification", "s",
"skip-verification",
"Skips the verification of the converted configuration", false),
SKIP_PLACEMENT_RULES_CONVERSION("skip placement rules conversion",
"sp", "skip-convert-placement-rules",
"Do not convert placement rules", false),
ENABLE_ASYNC_SCHEDULER("enable asynchronous scheduler", "a", "enable-async-scheduler",
"Enables the Asynchronous scheduler which decouples the CapacityScheduler" +
" scheduling from Node Heartbeats.", false),
@ -253,7 +252,8 @@ private FSConfigToCSConfigConverterParams validateInputFiles(
String outputDir =
cliParser.getOptionValue(CliOption.OUTPUT_DIR.shortSwitch);
boolean convertPlacementRules =
cliParser.hasOption(CliOption.CONVERT_PLACEMENT_RULES.shortSwitch);
!cliParser.hasOption(
CliOption.SKIP_PLACEMENT_RULES_CONVERSION.shortSwitch);
checkFile(CliOption.YARN_SITE, yarnSiteXmlFile);
checkFile(CliOption.FAIR_SCHEDULER, fairSchedulerXmlFile);

View File

@ -321,7 +321,7 @@ private void performRuleConversion(FairScheduler fs)
MappingRulesDescription desc =
placementConverter.convertPlacementPolicy(placementManager,
ruleHandler, capacitySchedulerConfig);
ruleHandler, capacitySchedulerConfig, usePercentages);
ObjectMapper mapper = new ObjectMapper();
// close output stream if we write to a file, leave it open otherwise

View File

@ -29,7 +29,7 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.schema.Rule.Policy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -61,18 +61,9 @@ public class FSConfigToCSConfigRuleHandler {
public static final String MIN_RESOURCES =
"minResources.action";
public static final String USER_MAX_RUNNING_APPS =
"userMaxRunningApps.action";
public static final String USER_MAX_APPS_DEFAULT =
"userMaxAppsDefault.action";
public static final String DYNAMIC_MAX_ASSIGN =
"dynamicMaxAssign.action";
public static final String SPECIFIED_NOT_FIRST =
"specifiedNotFirstRule.action";
public static final String RESERVATION_SYSTEM =
"reservationSystem.action";
@ -91,6 +82,9 @@ public class FSConfigToCSConfigRuleHandler {
public static final String CHILD_STATIC_DYNAMIC_CONFLICT =
"childStaticDynamicConflict.action";
public static final String PARENT_CHILD_CREATE_DIFFERS =
"parentChildCreateDiff.action";
@VisibleForTesting
enum RuleAction {
WARNING,
@ -132,16 +126,14 @@ public void initPropertyActions() {
setActionForProperty(MAX_CHILD_CAPACITY);
setActionForProperty(MAX_RESOURCES);
setActionForProperty(MIN_RESOURCES);
setActionForProperty(USER_MAX_RUNNING_APPS);
setActionForProperty(USER_MAX_APPS_DEFAULT);
setActionForProperty(DYNAMIC_MAX_ASSIGN);
setActionForProperty(SPECIFIED_NOT_FIRST);
setActionForProperty(RESERVATION_SYSTEM);
setActionForProperty(QUEUE_AUTO_CREATE);
setActionForProperty(FAIR_AS_DRF);
setActionForProperty(QUEUE_DYNAMIC_CREATE);
setActionForProperty(PARENT_DYNAMIC_CREATE);
setActionForProperty(CHILD_STATIC_DYNAMIC_CONFLICT);
setActionForProperty(PARENT_CHILD_CREATE_DIFFERS);
}
public void handleMaxCapacityPercentage(String queueName) {
@ -223,6 +215,15 @@ public void handleChildStaticDynamicConflict(String parentPath) {
handle(CHILD_STATIC_DYNAMIC_CONFLICT, null, msg);
}
/**
 * Reports a nested placement rule whose parent and child "create" settings
 * differ on the Fair Scheduler side.
 *
 * <p>Formats a message containing the policy name and passes it to
 * {@code handle} under the {@link #PARENT_CHILD_CREATE_DIFFERS} action key,
 * with no Fair Scheduler setting name attached.
 *
 * @param policy the mapping rule policy whose name is put into the message
 */
public void handleFSParentAndChildCreateFlagDiff(Policy policy) {
  final String template =
      "Placement rules: the policy %s originally uses"
      + " true/false or false/true \"create\" settings on the Fair Scheduler"
      + " side. This is not supported and create flag will be set"
      + " to *true* in the generated JSON rule chain";
  final String policyName = policy.name();

  handle(PARENT_CHILD_CREATE_DIFFERS, null,
      String.format(template, policyName));
}
private void handle(String actionName, String fsSetting, String message) {
RuleAction action = actions.get(actionName);

View File

@ -42,7 +42,8 @@ class QueuePlacementConverter {
MappingRulesDescription convertPlacementPolicy(
PlacementManager placementManager,
FSConfigToCSConfigRuleHandler ruleHandler,
CapacitySchedulerConfiguration convertedCSconfig) {
CapacitySchedulerConfiguration convertedCSconfig,
boolean usePercentages) {
MappingRulesDescription desc = new MappingRulesDescription();
List<Rule> rules = new ArrayList<>();
@ -59,32 +60,40 @@ MappingRulesDescription convertPlacementPolicy(
userRule,
ruleHandler,
create,
convertedCSconfig);
convertedCSconfig,
usePercentages);
} else {
rules.add(createRule(Policy.USER, create, ruleHandler));
rules.add(createRule(Policy.USER, create, ruleHandler,
usePercentages));
}
} else if (fsRule instanceof SpecifiedPlacementRule) {
rules.add(createRule(Policy.SPECIFIED, create, ruleHandler));
rules.add(createRule(Policy.SPECIFIED, create, ruleHandler,
usePercentages));
} else if (fsRule instanceof PrimaryGroupPlacementRule) {
rules.add(createRule(Policy.PRIMARY_GROUP, create, ruleHandler));
rules.add(createRule(Policy.PRIMARY_GROUP, create, ruleHandler,
usePercentages));
} else if (fsRule instanceof DefaultPlacementRule) {
DefaultPlacementRule defaultRule = (DefaultPlacementRule) fsRule;
String defaultQueueName = defaultRule.defaultQueueName;
Rule rule;
if (DEFAULT_QUEUE.equals(defaultQueueName)) {
rule = createRule(Policy.DEFAULT_QUEUE, create, ruleHandler);
rule = createRule(Policy.DEFAULT_QUEUE, create, ruleHandler,
usePercentages);
} else {
rule = createRule(Policy.CUSTOM, create, ruleHandler);
rule = createRule(Policy.CUSTOM, create, ruleHandler,
usePercentages);
rule.setCustomPlacement(defaultQueueName);
}
rules.add(rule);
} else if (fsRule instanceof SecondaryGroupExistingPlacementRule) {
Rule rule = createRule(Policy.SECONDARY_GROUP, create, ruleHandler);
Rule rule = createRule(Policy.SECONDARY_GROUP, create, ruleHandler,
usePercentages);
rules.add(rule);
} else if (fsRule instanceof RejectPlacementRule) {
rules.add(createRule(Policy.REJECT, false, ruleHandler));
rules.add(createRule(Policy.REJECT, false, ruleHandler,
usePercentages));
} else {
throw new IllegalArgumentException("Unknown placement rule: " + fsRule);
}
@ -99,7 +108,8 @@ private void handleNestedRule(List<Rule> rules,
UserPlacementRule userRule,
FSConfigToCSConfigRuleHandler ruleHandler,
boolean create,
CapacitySchedulerConfiguration csConf) {
CapacitySchedulerConfiguration csConf,
boolean usePercentages) {
PlacementRule parentRule = userRule.getParentRule();
boolean parentCreate = ((FSPlacementRule) parentRule).getCreateFlag();
Policy policy;
@ -124,12 +134,13 @@ private void handleNestedRule(List<Rule> rules,
ruleHandler,
parentCreate,
queueName,
csConf);
csConf,
usePercentages);
rules.add(rule);
}
private Rule createRule(Policy policy, boolean create,
FSConfigToCSConfigRuleHandler ruleHandler) {
FSConfigToCSConfigRuleHandler ruleHandler, boolean usePercentages) {
Rule rule = new Rule();
rule.setPolicy(policy);
rule.setCreate(create);
@ -137,7 +148,7 @@ private Rule createRule(Policy policy, boolean create,
rule.setFallbackResult(SKIP_RESULT);
rule.setType(Type.USER);
if (create) {
if (usePercentages && create) {
// display warning that these queues must exist and
// cannot be created automatically under "root"
if (policy == Policy.PRIMARY_GROUP
@ -159,29 +170,41 @@ private Rule createNestedRule(Policy policy,
FSConfigToCSConfigRuleHandler ruleHandler,
boolean fsParentCreate,
String parentQueue,
CapacitySchedulerConfiguration csConf) {
CapacitySchedulerConfiguration csConf,
boolean usePercentages) {
Rule rule = createRule(policy, create, ruleHandler);
Rule rule = createRule(policy, create, ruleHandler, usePercentages);
if (parentQueue != null) {
rule.setParentQueue(parentQueue);
}
// create flag for the parent rule is not supported
if (fsParentCreate) {
if (policy == Policy.PRIMARY_GROUP_USER) {
ruleHandler.handleFSParentCreateFlag("root.<primaryGroup>");
} else if (policy == Policy.SECONDARY_GROUP_USER) {
ruleHandler.handleFSParentCreateFlag("root.<secondaryGroup>");
} else {
ruleHandler.handleFSParentCreateFlag(parentQueue);
if (usePercentages) {
// create flag for the parent rule is not supported
if (fsParentCreate) {
if (policy == Policy.PRIMARY_GROUP_USER) {
ruleHandler.handleFSParentCreateFlag("root.<primaryGroup>");
} else if (policy == Policy.SECONDARY_GROUP_USER) {
ruleHandler.handleFSParentCreateFlag("root.<secondaryGroup>");
} else {
ruleHandler.handleFSParentCreateFlag(parentQueue);
}
}
}
// check if parent conflicts with existing static queues
if (create && policy == Policy.USER) {
ruleHandler.handleRuleAutoCreateFlag(parentQueue);
checkStaticDynamicConflict(parentQueue, csConf, ruleHandler);
// check if parent conflicts with existing static queues
if (create && policy == Policy.USER) {
ruleHandler.handleRuleAutoCreateFlag(parentQueue);
checkStaticDynamicConflict(parentQueue, csConf, ruleHandler);
}
} else {
// weight mode, we have only minor limitations
rule.setCreate(fsParentCreate || create);
// nested create flags are not supported yet, so mixed "true/false" and
// "false/true" settings are collapsed into the single flag set above
// and reported through the rule handler
if (fsParentCreate ^ create) {
ruleHandler.handleFSParentAndChildCreateFlagDiff(policy);
}
}
return rule;

View File

@ -612,11 +612,11 @@ private void testPlacementRuleConversion(boolean enabled) throws Exception {
if (enabled) {
args = getArgumentsAsArrayWithDefaults("-f",
FSConfigConverterTestCommons.FS_ALLOC_FILE,
"-p", "-m");
"-p");
} else {
args = getArgumentsAsArrayWithDefaults("-f",
FSConfigConverterTestCommons.FS_ALLOC_FILE,
"-p");
"-p", "-sp");
}
FSConfigToCSConfigArgumentHandler argumentHandler =
new FSConfigToCSConfigArgumentHandler(conversionOptions,
@ -631,9 +631,10 @@ private void testPlacementRuleConversion(boolean enabled) throws Exception {
FSConfigToCSConfigConverterParams params = captor.getValue();
if (enabled) {
assertTrue("-m switch had no effect", params.isConvertPlacementRules());
assertTrue("Conversion should be enabled by default",
params.isConvertPlacementRules());
} else {
assertFalse("Placement rule conversion was enabled",
assertFalse("-sp switch had no effect",
params.isConvertPlacementRules());
}
}

View File

@ -22,9 +22,13 @@
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.MAX_CHILD_CAPACITY;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.QUEUE_AUTO_CREATE;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.RESERVATION_SYSTEM;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.SPECIFIED_NOT_FIRST;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.USER_MAX_APPS_DEFAULT;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.USER_MAX_RUNNING_APPS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.CHILD_STATIC_DYNAMIC_CONFLICT;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.PARENT_CHILD_CREATE_DIFFERS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.FAIR_AS_DRF;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.MAX_RESOURCES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.MIN_RESOURCES;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.PARENT_DYNAMIC_CREATE;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.RuleAction.ABORT;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.RuleAction.WARNING;
import static org.junit.Assert.assertEquals;
@ -32,6 +36,7 @@
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyBoolean;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
@ -392,14 +397,8 @@ public void testConvertFSConfigurationRulesFile() throws Exception {
ABORT, actions.get(MAX_CAPACITY_PERCENTAGE));
assertEquals("maxChildCapacity",
ABORT, actions.get(MAX_CHILD_CAPACITY));
assertEquals("userMaxRunningApps",
ABORT, actions.get(USER_MAX_RUNNING_APPS));
assertEquals("userMaxAppsDefault",
ABORT, actions.get(USER_MAX_APPS_DEFAULT));
assertEquals("dynamicMaxAssign",
ABORT, actions.get(DYNAMIC_MAX_ASSIGN));
assertEquals("specifiedNotFirstRule",
ABORT, actions.get(SPECIFIED_NOT_FIRST));
assertEquals("reservationSystem",
ABORT, actions.get(RESERVATION_SYSTEM));
assertEquals("queueAutoCreate",
@ -427,18 +426,24 @@ public void testConvertFSConfigurationWithoutRulesFile() throws Exception {
WARNING, actions.get(MAX_CAPACITY_PERCENTAGE));
assertEquals("maxChildCapacity",
WARNING, actions.get(MAX_CHILD_CAPACITY));
assertEquals("userMaxRunningApps",
WARNING, actions.get(USER_MAX_RUNNING_APPS));
assertEquals("userMaxAppsDefault",
WARNING, actions.get(USER_MAX_APPS_DEFAULT));
assertEquals("dynamicMaxAssign",
WARNING, actions.get(DYNAMIC_MAX_ASSIGN));
assertEquals("specifiedNotFirstRule",
WARNING, actions.get(SPECIFIED_NOT_FIRST));
assertEquals("reservationSystem",
WARNING, actions.get(RESERVATION_SYSTEM));
assertEquals("queueAutoCreate",
WARNING, actions.get(QUEUE_AUTO_CREATE));
assertEquals("childStaticDynamicConflict",
WARNING, actions.get(CHILD_STATIC_DYNAMIC_CONFLICT));
assertEquals("parentChildCreateDiffers",
WARNING, actions.get(PARENT_CHILD_CREATE_DIFFERS));
assertEquals("fairAsDrf",
WARNING, actions.get(FAIR_AS_DRF));
assertEquals("maxResources",
WARNING, actions.get(MAX_RESOURCES));
assertEquals("minResources",
WARNING, actions.get(MIN_RESOURCES));
assertEquals("parentDynamicCreate",
WARNING, actions.get(PARENT_DYNAMIC_CREATE));
}
@Test
@ -661,7 +666,8 @@ public void testPlacementRulesConversionEnabled() throws Exception {
verify(placementConverter).convertPlacementPolicy(
any(PlacementManager.class),
any(FSConfigToCSConfigRuleHandler.class),
any(CapacitySchedulerConfiguration.class));
any(CapacitySchedulerConfiguration.class),
anyBoolean());
}
@Test

View File

@ -120,7 +120,6 @@ public void testConvertFSConfigurationWithConsoleParam()
FSConfigToCSConfigConverterMain.main(new String[] {
"-p",
"-m",
"-e",
"-y", YARN_SITE_XML,
"-f", FS_ALLOC_FILE,
@ -167,7 +166,6 @@ public void testConvertFSConfigurationWithLongSwitches()
FSConfigToCSConfigConverterMain.main(new String[] {
"--print",
"--convert-placement-rules",
"--rules-to-file",
"--percentage",
"--yarnsiteconfig", YARN_SITE_XML,

View File

@ -24,9 +24,6 @@
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.MAX_CHILD_QUEUE_LIMIT;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.QUEUE_AUTO_CREATE;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.RESERVATION_SYSTEM;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.SPECIFIED_NOT_FIRST;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.USER_MAX_APPS_DEFAULT;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.USER_MAX_RUNNING_APPS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.FAIR_AS_DRF;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
@ -86,9 +83,6 @@ public void testAllRulesWarning() throws IOException {
rules.put(MAX_CHILD_CAPACITY, WARNING);
rules.put(QUEUE_AUTO_CREATE, WARNING);
rules.put(RESERVATION_SYSTEM, WARNING);
rules.put(SPECIFIED_NOT_FIRST, WARNING);
rules.put(USER_MAX_APPS_DEFAULT, WARNING);
rules.put(USER_MAX_RUNNING_APPS, WARNING);
rules.put(FAIR_AS_DRF, WARNING);
ruleHandler = new FSConfigToCSConfigRuleHandler(rules,
@ -112,10 +106,6 @@ public void testAllRulesAbort() throws IOException {
rules.put(MIN_RESOURCES, ABORT);
rules.put(QUEUE_AUTO_CREATE, ABORT);
rules.put(RESERVATION_SYSTEM, ABORT);
rules.put(SPECIFIED_NOT_FIRST, ABORT);
rules.put(USER_MAX_APPS_DEFAULT, ABORT);
rules.put(USER_MAX_RUNNING_APPS, ABORT);
rules.put(USER_MAX_RUNNING_APPS, ABORT);
rules.put(FAIR_AS_DRF, ABORT);
rules.put(MAX_CHILD_QUEUE_LIMIT, "1");

View File

@ -17,6 +17,7 @@
import static org.junit.Assert.assertEquals;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyNoMoreInteractions;
@ -317,6 +318,56 @@ public void testConvertNestedDefaultGroupWithCreate() {
verifyNoMoreInteractions(ruleHandler);
}
// Weight mode: parent create=false, child create=false.
@Test
public void testConvertNestedRuleCreateFalseFalseInWeightMode() {
  final boolean parentCreate = false;
  final boolean childCreate = false;
  // the flags agree, so create stays off and no handler call is expected
  final boolean expectedCreateOnRule = false;
  final boolean expectHandlerCall = false;

  testConvertNestedRuleCreateFlagInWeightMode(parentCreate, childCreate,
      expectedCreateOnRule, expectHandlerCall);
}
// Weight mode: parent create=false, child create=true.
@Test
public void testConvertNestedRuleCreateFalseTrueInWeightMode() {
  final boolean parentCreate = false;
  final boolean childCreate = true;
  // the flags differ, so create is expected on and the handler is notified
  final boolean expectedCreateOnRule = true;
  final boolean expectHandlerCall = true;

  testConvertNestedRuleCreateFlagInWeightMode(parentCreate, childCreate,
      expectedCreateOnRule, expectHandlerCall);
}
// Weight mode: parent create=true, child create=false.
@Test
public void testConvertNestedRuleCreateTrueFalseInWeightMode() {
  final boolean parentCreate = true;
  final boolean childCreate = false;
  // the flags differ, so create is expected on and the handler is notified
  final boolean expectedCreateOnRule = true;
  final boolean expectHandlerCall = true;

  testConvertNestedRuleCreateFlagInWeightMode(parentCreate, childCreate,
      expectedCreateOnRule, expectHandlerCall);
}
// Weight mode: parent create=true, child create=true.
@Test
public void testConvertNestedRuleCreateTrueTrueInWeightMode() {
  final boolean parentCreate = true;
  final boolean childCreate = true;
  // the flags agree, so create is expected on and no handler call is expected
  final boolean expectedCreateOnRule = true;
  final boolean expectHandlerCall = false;

  testConvertNestedRuleCreateFlagInWeightMode(parentCreate, childCreate,
      expectedCreateOnRule, expectHandlerCall);
}
/**
 * Converts a mocked user rule nested under a mocked primary group rule via
 * {@code convertInWeightMode()} and checks the first generated rule.
 *
 * @param parentCreate value returned by the parent rule's create flag
 * @param childCreate value returned by the nested user rule's create flag
 * @param expectedFlagOnRule create flag expected on the converted rule
 * @param ruleHandlerShouldBeInvoked whether exactly one
 *     {@code handleFSParentAndChildCreateFlagDiff} call is expected on the
 *     rule handler; if false, no interaction at all is expected
 */
private void testConvertNestedRuleCreateFlagInWeightMode(
    boolean parentCreate,
    boolean childCreate,
    boolean expectedFlagOnRule,
    boolean ruleHandlerShouldBeInvoked) {
  PrimaryGroupPlacementRule parentRule =
      mock(PrimaryGroupPlacementRule.class);
  when(parentRule.getCreateFlag()).thenReturn(parentCreate);

  UserPlacementRule nestedUserRule = mock(UserPlacementRule.class);
  when(nestedUserRule.getParentRule()).thenReturn(parentRule);
  when(nestedUserRule.getCreateFlag()).thenReturn(childCreate);
  initPlacementManagerMock(nestedUserRule);

  Rule convertedRule = convertInWeightMode().getRules().get(0);

  assertEquals("Expected create flag", expectedFlagOnRule,
      convertedRule.getCreate());

  if (!ruleHandlerShouldBeInvoked) {
    verifyZeroInteractions(ruleHandler);
    return;
  }

  // exactly one notification about the differing flags, nothing else
  verify(ruleHandler).handleFSParentAndChildCreateFlagDiff(
      any(Policy.class));
  verifyNoMoreInteractions(ruleHandler);
}
@Test
public void testConvertNestedPrimaryGroupRuleWithParentCreate() {
UserPlacementRule fsRule = mock(UserPlacementRule.class);
@ -388,7 +439,12 @@ private void initPlacementManagerMock(
private MappingRulesDescription convert() {
return converter.convertPlacementPolicy(placementManager,
ruleHandler, csConf);
ruleHandler, csConf, true);
}
/**
 * Runs the placement converter with the percentage flag switched off,
 * i.e. in weight mode.
 *
 * @return the mapping rules produced by the converter
 */
private MappingRulesDescription convertInWeightMode() {
  final boolean usePercentages = false;
  return converter.convertPlacementPolicy(
      placementManager, ruleHandler, csConf, usePercentages);
}
private void verifyRule(Rule rule, Policy expectedPolicy) {
@ -399,13 +455,6 @@ private void verifyRule(Rule rule, Policy expectedPolicy) {
assertEquals("Type", Type.USER, rule.getType());
}
/**
 * Asserts that the given rule has the SET_DEFAULT_QUEUE policy, carries the
 * expected queue as its value, and uses the SKIP fallback result.
 *
 * @param rule the converted rule to inspect
 * @param expectedQueue the queue name the rule value must equal
 */
private void verifySetDefaultRule(Rule rule, String expectedQueue) {
  final Policy actualPolicy = rule.getPolicy();
  assertEquals("Policy type", Policy.SET_DEFAULT_QUEUE, actualPolicy);

  assertEquals("Queue", expectedQueue, rule.getValue());

  assertEquals(
      "Fallback result", FallbackResult.SKIP, rule.getFallbackResult());
}
private class TestPlacementRule extends FSPlacementRule {
@Override
public ApplicationPlacementContext getPlacementForApp(