YARN-10292. FS-CS converter: add an option to enable asynchronous scheduling in CapacityScheduler. Contributed by Benjamin Teke

This commit is contained in:
Szilard Nemeth 2020-06-16 18:01:39 +02:00
parent d73cdb1c86
commit 52efe48d79
7 changed files with 123 additions and 10 deletions

View File

@ -22,6 +22,7 @@ public class ConversionOptions {
private DryRunResultHolder dryRunResultHolder;
private boolean dryRun;
private boolean noTerminalRuleCheck;
private boolean enableAsyncScheduler;
public ConversionOptions(DryRunResultHolder dryRunResultHolder,
boolean dryRun) {
@ -41,6 +42,14 @@ public boolean isNoRuleTerminalCheck() {
return noTerminalRuleCheck;
}
public void setEnableAsyncScheduler(boolean enableAsyncScheduler) {
this.enableAsyncScheduler = enableAsyncScheduler;
}
public boolean isEnableAsyncScheduler() {
return enableAsyncScheduler;
}
public void handleWarning(String msg, Logger log) {
if (dryRun) {
dryRunResultHolder.addDryRunWarning(msg);

View File

@ -109,6 +109,9 @@ public enum CliOption {
SKIP_VERIFICATION("skip verification", "s",
"skip-verification",
"Skips the verification of the converted configuration", false),
ENABLE_ASYNC_SCHEDULER("enable asynchronous scheduler", "a", "enable-async-scheduler",
"Enables the Asynchronous scheduler which decouples the CapacityScheduler" +
" scheduling from Node Heartbeats.", false),
HELP("help", "h", "help", "Displays the list of options", false);
private final String name;
@ -220,6 +223,8 @@ private FSConfigToCSConfigConverter prepareAndGetConverter(
conversionOptions.setDryRun(dryRun);
conversionOptions.setNoTerminalRuleCheck(
cliParser.hasOption(CliOption.NO_TERMINAL_RULE_CHECK.shortSwitch));
conversionOptions.setEnableAsyncScheduler(
cliParser.hasOption(CliOption.ENABLE_ASYNC_SCHEDULER.shortSwitch));
checkOptionPresent(cliParser, CliOption.YARN_SITE);
checkOutputDefined(cliParser, dryRun);

View File

@ -270,7 +270,8 @@ private void convertYarnSiteXml(Configuration inputYarnSiteConfig,
FSYarnSiteConverter siteConverter =
new FSYarnSiteConverter();
siteConverter.convertSiteProperties(inputYarnSiteConfig,
convertedYarnSiteConfig, drfUsed);
convertedYarnSiteConfig, drfUsed,
conversionOptions.isEnableAsyncScheduler());
// See docs: "allow-undeclared-pools" and "user-as-default-queue" are
// ignored if we have placement rules

View File

@ -38,7 +38,7 @@ public class FSYarnSiteConverter {
@SuppressWarnings({"deprecation", "checkstyle:linelength"})
public void convertSiteProperties(Configuration conf,
Configuration yarnSiteConfig, boolean drfUsed) {
Configuration yarnSiteConfig, boolean drfUsed, boolean enableAsyncScheduler) {
yarnSiteConfig.set(YarnConfiguration.RM_SCHEDULER,
CapacityScheduler.class.getCanonicalName());
@ -146,6 +146,10 @@ public void convertSiteProperties(Configuration conf,
CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
DominantResourceCalculator.class.getCanonicalName());
}
if (enableAsyncScheduler) {
yarnSiteConfig.setBoolean(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, true);
}
}
public boolean isPreemptionEnabled() {

View File

@ -651,4 +651,35 @@ public void testValidationSkippedWhenOutputIsConsole() throws Exception {
verifyZeroInteractions(mockValidator);
}
@Test
public void testEnabledAsyncScheduling() throws Exception {
setupFSConfigConversionFiles(true);
FSConfigToCSConfigArgumentHandler argumentHandler =
new FSConfigToCSConfigArgumentHandler(conversionOptions, mockValidator);
String[] args = getArgumentsAsArrayWithDefaults("-f",
FSConfigConverterTestCommons.FS_ALLOC_FILE, "-p",
"-a");
argumentHandler.parseAndConvert(args);
assertTrue("-a switch had no effect",
conversionOptions.isEnableAsyncScheduler());
}
@Test
public void testDisabledAsyncScheduling() throws Exception {
setupFSConfigConversionFiles(true);
FSConfigToCSConfigArgumentHandler argumentHandler =
new FSConfigToCSConfigArgumentHandler(conversionOptions, mockValidator);
String[] args = getArgumentsAsArrayWithDefaults("-f",
FSConfigConverterTestCommons.FS_ALLOC_FILE, "-p");
argumentHandler.parseAndConvert(args);
assertFalse("-a switch wasn't provided but async scheduling option is true",
conversionOptions.isEnableAsyncScheduler());
}
}

View File

@ -717,6 +717,41 @@ public void testPlacementRulesConversionEnabled() throws Exception {
any(Boolean.class));
}
@Test
public void testConversionWhenAsyncSchedulingIsEnabled()
throws Exception {
boolean schedulingEnabledValue = testConversionWithAsyncSchedulingOption(true);
assertTrue("Asynchronous scheduling should be true", schedulingEnabledValue);
}
@Test
public void testConversionWhenAsyncSchedulingIsDisabled() throws Exception {
boolean schedulingEnabledValue = testConversionWithAsyncSchedulingOption(false);
assertEquals("Asynchronous scheduling should be the default value",
CapacitySchedulerConfiguration.DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE,
schedulingEnabledValue);
}
private boolean testConversionWithAsyncSchedulingOption(boolean enabled) throws Exception {
FSConfigToCSConfigConverterParams params = createDefaultParamsBuilder()
.withClusterResource(CLUSTER_RESOURCE_STRING)
.withFairSchedulerXmlConfig(FAIR_SCHEDULER_XML)
.build();
ConversionOptions conversionOptions = createDefaultConversionOptions();
conversionOptions.setEnableAsyncScheduler(enabled);
converter = new FSConfigToCSConfigConverter(ruleHandler,
conversionOptions);
converter.convert(params);
Configuration convertedConfig = converter.getYarnSiteConfig();
return convertedConfig.getBoolean(CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE,
CapacitySchedulerConfiguration.DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE);
}
private Configuration getConvertedCSConfig(String dir) throws IOException {
File capacityFile = new File(dir, "capacity-scheduler.xml");
ByteArrayInputStream input =

View File

@ -26,6 +26,7 @@
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
@ -52,7 +53,8 @@ public void testSiteContinuousSchedulingConversion() {
yarnConfig.setInt(
FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_SLEEP_MS, 666);
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false);
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
false);
assertTrue("Cont. scheduling", yarnConvertedConfig.getBoolean(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false));
@ -67,7 +69,7 @@ public void testSiteMinimumAllocationIncrementConversion() {
yarnConfig.setInt("yarn.resource-types.memory-mb.increment-allocation", 11);
yarnConfig.setInt("yarn.resource-types.vcores.increment-allocation", 5);
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false);
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false, false);
assertEquals("Memory alloc increment", 11,
yarnConvertedConfig.getInt("yarn.scheduler.minimum-allocation-mb",
@ -85,7 +87,8 @@ public void testSitePreemptionConversion() {
FairSchedulerConfiguration.WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS,
321);
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false);
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
false);
assertTrue("Preemption enabled",
yarnConvertedConfig.getBoolean(
@ -105,7 +108,8 @@ public void testSitePreemptionConversion() {
public void testSiteAssignMultipleConversion() {
yarnConfig.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true);
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false);
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
false);
assertTrue("Assign multiple",
yarnConvertedConfig.getBoolean(
@ -117,7 +121,8 @@ public void testSiteAssignMultipleConversion() {
public void testSiteMaxAssignConversion() {
yarnConfig.setInt(FairSchedulerConfiguration.MAX_ASSIGN, 111);
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false);
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
false);
assertEquals("Max assign", 111,
yarnConvertedConfig.getInt(
@ -131,7 +136,8 @@ public void testSiteLocalityThresholdConversion() {
yarnConfig.set(FairSchedulerConfiguration.LOCALITY_THRESHOLD_RACK,
"321.321");
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false);
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
false);
assertEquals("Locality threshold node", "123.123",
yarnConvertedConfig.get(
@ -143,7 +149,8 @@ public void testSiteLocalityThresholdConversion() {
@Test
public void testSiteDrfEnabledConversion() {
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, true);
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, true,
false);
assertEquals("Resource calculator type", DominantResourceCalculator.class,
yarnConvertedConfig.getClass(
@ -152,11 +159,32 @@ public void testSiteDrfEnabledConversion() {
@Test
public void testSiteDrfDisabledConversion() {
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false);
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
false);
assertEquals("Resource calculator type", DefaultResourceCalculator.class,
yarnConvertedConfig.getClass(
CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
CapacitySchedulerConfiguration.DEFAULT_RESOURCE_CALCULATOR_CLASS));
}
@Test
public void testAsyncSchedulingEnabledConversion() {
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, true,
true);
assertTrue("Asynchronous scheduling", yarnConvertedConfig.getBoolean(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE,
CapacitySchedulerConfiguration.DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE));
}
@Test
public void testAsyncSchedulingDisabledConversion() {
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig, false,
false);
assertFalse("Asynchronous scheduling", yarnConvertedConfig.getBoolean(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE,
CapacitySchedulerConfiguration.DEFAULT_SCHEDULE_ASYNCHRONOUSLY_ENABLE));
}
}