YARN-10507. Add the capability to fs2cs to write the converted placement rules inside capacity-scheduler.xml. Contributed by Peter Bacsko

This commit is contained in:
Szilard Nemeth 2021-01-08 13:40:33 +01:00
parent 580a6a75a3
commit 4176759909
8 changed files with 177 additions and 47 deletions

View File

@ -112,6 +112,9 @@ public enum CliOption {
ENABLE_ASYNC_SCHEDULER("enable asynchronous scheduler", "a", "enable-async-scheduler",
"Enables the Asynchronous scheduler which decouples the CapacityScheduler" +
" scheduling from Node Heartbeats.", false),
RULES_TO_FILE("rules to external file", "e", "rules-to-file",
"Generates the converted placement rules to an external JSON file " +
"called mapping-rules.json", false),
HELP("help", "h", "help", "Displays the list of options", false);
private final String name;
@ -254,6 +257,13 @@ private FSConfigToCSConfigConverterParams validateInputFiles(
checkDirectory(CliOption.OUTPUT_DIR, outputDir);
checkOutputDirDoesNotContainXmls(yarnSiteXmlFile, outputDir);
// check mapping-rules.json if we intend to generate it
if (!cliParser.hasOption(CliOption.CONSOLE_MODE.shortSwitch) &&
cliParser.hasOption(CliOption.RULES_TO_FILE.shortSwitch)) {
checkFileNotInOutputDir(new File(outputDir),
FSConfigToCSConfigConverter.MAPPING_RULES_JSON);
}
return FSConfigToCSConfigConverterParams.Builder.create()
.withYarnSiteXmlConfig(yarnSiteXmlFile)
.withFairSchedulerXmlConfig(fairSchedulerXmlFile)
@ -263,6 +273,8 @@ private FSConfigToCSConfigConverterParams validateInputFiles(
.withConsole(cliParser.hasOption(CliOption.CONSOLE_MODE.shortSwitch))
.withOutputDirectory(outputDir)
.withConvertPlacementRules(convertPlacementRules)
.withPlacementRulesToFile(
cliParser.hasOption(CliOption.RULES_TO_FILE.shortSwitch))
.build();
}

View File

@ -17,13 +17,18 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAPPING_RULE_JSON;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT_JSON;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSQueueConverter.QUEUE_MAX_AM_SHARE_DISABLED;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
@ -55,6 +60,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.ObjectWriter;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import com.fasterxml.jackson.core.JsonGenerator;
/**
* Converts Fair Scheduler configuration (site and fair-scheduler.xml)
@ -64,13 +70,13 @@
public class FSConfigToCSConfigConverter {
public static final Logger LOG = LoggerFactory.getLogger(
FSConfigToCSConfigConverter.class.getName());
public static final String MAPPING_RULES_JSON =
"mapping-rules.json";
private static final String YARN_SITE_XML = "yarn-site.xml";
private static final String CAPACITY_SCHEDULER_XML =
"capacity-scheduler.xml";
private static final String FAIR_SCHEDULER_XML =
"fair-scheduler.xml";
private static final String MAPPING_RULES_JSON =
"mapping-rules.json";
public static final String WARNING_TEXT =
"WARNING: This feature is experimental and not intended " +
@ -99,6 +105,7 @@ public class FSConfigToCSConfigConverter {
private boolean consoleMode = false;
private boolean convertPlacementRules = true;
private String outputDirectory;
private boolean rulesToFile;
public FSConfigToCSConfigConverter(FSConfigToCSConfigRuleHandler
ruleHandler, ConversionOptions conversionOptions) {
@ -106,7 +113,6 @@ public FSConfigToCSConfigConverter(FSConfigToCSConfigRuleHandler
this.conversionOptions = conversionOptions;
this.yarnSiteOutputStream = System.out;
this.capacitySchedulerOutputStream = System.out;
this.mappingRulesOutputStream = System.out;
this.placementConverter = new QueuePlacementConverter();
}
@ -116,6 +122,7 @@ public void convert(FSConfigToCSConfigConverterParams params)
this.clusterResource = getClusterResource(params);
this.convertPlacementRules = params.isConvertPlacementRules();
this.outputDirectory = params.getOutputDirectory();
this.rulesToFile = params.isPlacementRulesToFile();
prepareOutputFiles(params.isConsole());
loadConversionRules(params.getConversionRulesConfig());
Configuration inputYarnSiteConfig = getInputYarnSiteConfig(params);
@ -127,9 +134,9 @@ public void convert(FSConfigToCSConfigConverterParams params)
private void prepareOutputFiles(boolean console)
throws FileNotFoundException {
if (console) {
LOG.info("Console mode is enabled, " + YARN_SITE_XML + " and" +
" " + CAPACITY_SCHEDULER_XML + " will be only emitted " +
"to the console!");
LOG.info("Console mode is enabled, {}, {} and {} will be only emitted " +
"to the console!",
YARN_SITE_XML, CAPACITY_SCHEDULER_XML, MAPPING_RULES_JSON);
this.consoleMode = true;
return;
}
@ -253,6 +260,10 @@ void convert(Configuration inputYarnSiteConfig) throws Exception {
convertYarnSiteXml(inputYarnSiteConfig);
convertCapacitySchedulerXml(fs);
if (convertPlacementRules) {
performRuleConversion(fs);
}
if (consoleMode) {
System.out.println("======= " + CAPACITY_SCHEDULER_XML + " =======");
}
@ -263,10 +274,6 @@ void convert(Configuration inputYarnSiteConfig) throws Exception {
System.out.println("======= " + YARN_SITE_XML + " =======");
}
convertedYarnSiteConfig.writeXml(yarnSiteOutputStream);
if (convertPlacementRules) {
performRuleConversion(fs);
}
}
private void convertYarnSiteXml(Configuration inputYarnSiteConfig) {
@ -313,28 +320,58 @@ private void performRuleConversion(FairScheduler fs)
fs.getRMContext().getQueuePlacementManager();
if (placementManager.getPlacementRules().size() > 0) {
if (!consoleMode) {
File mappingRulesFile = new File(outputDirectory,
MAPPING_RULES_JSON);
this.mappingRulesOutputStream =
new FileOutputStream(mappingRulesFile);
} else {
System.out.println("======= " + MAPPING_RULES_JSON + " =======");
}
mappingRulesOutputStream = getOutputStreamForJson();
MappingRulesDescription desc =
placementConverter.convertPlacementPolicy(placementManager,
ruleHandler, capacitySchedulerConfig);
ObjectMapper mapper = new ObjectMapper();
// close output stream if we write to a file, leave it open otherwise
if (!consoleMode && rulesToFile) {
mapper.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, true);
} else {
mapper.configure(JsonGenerator.Feature.AUTO_CLOSE_TARGET, false);
}
ObjectWriter writer = mapper.writer(new DefaultPrettyPrinter());
if (consoleMode && rulesToFile) {
System.out.println("======= " + MAPPING_RULES_JSON + " =======");
}
writer.writeValue(mappingRulesOutputStream, desc);
capacitySchedulerConfig.set(MAPPING_RULE_FORMAT,
MAPPING_RULE_FORMAT_JSON);
if (!rulesToFile) {
String json =
((ByteArrayOutputStream)mappingRulesOutputStream)
.toString(StandardCharsets.UTF_8.displayName());
capacitySchedulerConfig.set(MAPPING_RULE_JSON, json);
}
} else {
LOG.info("No rules to convert");
}
}
/*
* Console RulesToFile OutputStream
* true true System.out / PrintStream
* true false ByteArrayOutputStream
* false true FileOutputStream
* false false ByteArrayOutputStream
*/
private OutputStream getOutputStreamForJson() throws FileNotFoundException {
if (consoleMode && rulesToFile) {
return System.out;
} else if (rulesToFile) {
File mappingRulesFile = new File(outputDirectory,
MAPPING_RULES_JSON);
return new FileOutputStream(mappingRulesFile);
} else {
return new ByteArrayOutputStream();
}
}
private void emitDefaultQueueMaxParallelApplications() {
if (queueMaxAppsDefault != Integer.MAX_VALUE) {
capacitySchedulerConfig.set(
@ -467,11 +504,6 @@ void setPlacementConverter(QueuePlacementConverter converter) {
this.placementConverter = converter;
}
@VisibleForTesting
void setMappingRulesOutputStream(OutputStream outputStream) {
this.mappingRulesOutputStream = outputStream;
}
@VisibleForTesting
void setConsoleMode(boolean console) {
this.consoleMode = console;

View File

@ -16,6 +16,9 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
import java.util.function.Consumer;
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
@ -25,12 +28,16 @@
* Main class that invokes the FS->CS converter.
*
*/
@SuppressWarnings("checkstyle:hideutilityclassconstructor")
public class FSConfigToCSConfigConverterMain {
public final class FSConfigToCSConfigConverterMain {
private FSConfigToCSConfigConverterMain() {
// no instances
}
private static final Logger LOG =
LoggerFactory.getLogger(FSConfigToCSConfigConverterMain.class);
private static final Marker FATAL =
MarkerFactory.getMarker("FATAL");
private static Consumer<Integer> exitFunction = System::exit;
public static void main(String[] args) {
try {
@ -44,11 +51,16 @@ public static void main(String[] args) {
"see previous error messages for details!");
}
System.exit(exitCode);
exitFunction.accept(exitCode);
} catch (Throwable t) {
LOG.error(FATAL,
"Error while starting FS configuration conversion!", t);
System.exit(-1);
exitFunction.accept(-1);
}
}
@VisibleForTesting
static void setExit(Consumer<Integer> exitFunc) {
exitFunction = exitFunc;
}
}

View File

@ -28,7 +28,7 @@ public final class FSConfigToCSConfigConverterParams {
private String clusterResource;
private String outputDirectory;
private boolean convertPlacementRules;
private boolean placementRulesToFile;
private FSConfigToCSConfigConverterParams() {
@ -63,6 +63,10 @@ public boolean isConvertPlacementRules() {
return convertPlacementRules;
}
public boolean isPlacementRulesToFile() {
return placementRulesToFile;
}
@Override
public String toString() {
return "FSConfigToCSConfigConverterParams{" +
@ -72,6 +76,7 @@ public String toString() {
", clusterResource='" + clusterResource + '\'' +
", console=" + console + '\'' +
", convertPlacementRules=" + convertPlacementRules +
", placementRulesToFile=" + placementRulesToFile +
'}';
}
@ -87,6 +92,7 @@ public static final class Builder {
private String clusterResource;
private String outputDirectory;
private boolean convertPlacementRules;
private boolean placementRulesToFile;
private Builder() {
}
@ -130,6 +136,11 @@ public Builder withConvertPlacementRules(boolean convertPlacementRules) {
return this;
}
public Builder withPlacementRulesToFile(boolean rulesToFile) {
this.placementRulesToFile = rulesToFile;
return this;
}
public FSConfigToCSConfigConverterParams build() {
FSConfigToCSConfigConverterParams params =
new FSConfigToCSConfigConverterParams();
@ -140,6 +151,7 @@ public FSConfigToCSConfigConverterParams build() {
params.conversionRulesConfig = this.conversionRulesConfig;
params.outputDirectory = this.outputDirectory;
params.convertPlacementRules = this.convertPlacementRules;
params.placementRulesToFile = this.placementRulesToFile;
return params;
}
}

View File

@ -28,4 +28,7 @@ public class VerificationException extends RuntimeException {
public VerificationException(String message, Throwable cause) {
super(message, cause);
}
public VerificationException() {
}
}

View File

@ -19,7 +19,9 @@
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.doThrow;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.verifyZeroInteractions;
@ -29,6 +31,7 @@
import java.util.List;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.junit.After;
@ -45,8 +48,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
/**
* Unit tests for FSConfigToCSConfigArgumentHandler.
*
@ -229,6 +230,28 @@ public void testInvalidOutputDir() throws Exception {
"precondition error"));
}
@Test
public void testVerificationException() throws Exception {
setupFSConfigConversionFiles(true);
ConversionOptions mockOptions = Mockito.mock(ConversionOptions.class);
FSConfigToCSConfigArgumentHandler argumentHandler =
new FSConfigToCSConfigArgumentHandler(mockOptions, mockValidator);
argumentHandler.setConverterSupplier(this::getMockConverter);
String[] args = getArgumentsAsArrayWithDefaults("-f",
FSConfigConverterTestCommons.FS_ALLOC_FILE,
"-r", FSConfigConverterTestCommons.CONVERSION_RULES_FILE);
doThrow(new VerificationException("test", new Exception("test")))
.when(mockConverter)
.convert(any(FSConfigToCSConfigConverterParams.class));
argumentHandler.parseAndConvert(args);
verify(mockOptions).handleVerificationFailure(any(Exception.class),
any(String.class));
}
@Test
public void testFairSchedulerXmlIsNotDefinedIfItsDefinedInYarnSiteXml()
throws Exception {
@ -534,7 +557,8 @@ private void testFileExistsInOutputFolder(String file) throws Exception {
String[] args = new String[] {
"-y", FSConfigConverterTestCommons.YARN_SITE_XML,
"-o", FSConfigConverterTestCommons.OUTPUT_DIR};
"-o", FSConfigConverterTestCommons.OUTPUT_DIR,
"-e"};
int retVal = argumentHandler.parseAndConvert(args);
assertEquals("Return value", -1, retVal);
@ -564,6 +588,13 @@ public void testCapacitySchedulerXmlExistsInOutputFolder()
YarnConfiguration.CS_CONFIGURATION_FILE);
}
@Test
public void testMappingRulesJsonExistsInOutputFolder()
throws Exception {
testFileExistsInOutputFolder(
"mapping-rules.json");
}
@Test
public void testPlacementRulesConversionEnabled() throws Exception {
testPlacementRuleConversion(true);

View File

@ -36,7 +36,6 @@
import static org.mockito.Mockito.verifyZeroInteractions;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.Map;
@ -463,6 +462,7 @@ public void testConvertCheckOutputDir() throws Exception {
FSConfigToCSConfigConverterParams params = createDefaultParamsBuilder()
.withClusterResource(CLUSTER_RESOURCE_STRING)
.withConvertPlacementRules(true)
.withPlacementRulesToFile(true)
.build();
converter.convert(params);
@ -612,17 +612,17 @@ private void testUserAsDefaultQueueAndPlacementRules(
config.setBoolean(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE,
true);
ByteArrayOutputStream jsonOutStream = new ByteArrayOutputStream();
converter.setConvertPlacementRules(true);
converter.setMappingRulesOutputStream(jsonOutStream);
converter.setConsoleMode(true);
converter.convert(config);
String json = converter.getCapacitySchedulerConfig()
.get(CapacitySchedulerConfiguration.MAPPING_RULE_JSON);
MappingRulesDescription description =
new ObjectMapper()
.reader()
.forType(MappingRulesDescription.class)
.readValue(jsonOutStream.toByteArray());
.readValue(json);
if (hasPlacementRules) {
// fs.xml defines 5 rules

View File

@ -21,17 +21,17 @@
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigConverterTestCommons.OUTPUT_DIR;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigConverterTestCommons.YARN_SITE_XML;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigConverterTestCommons.setupFSConfigConversionFiles;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.function.Consumer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.contrib.java.lang.system.ExpectedSystemExit;
/**
@ -40,19 +40,20 @@
*/
public class TestFSConfigToCSConfigConverterMain {
private FSConfigConverterTestCommons converterTestCommons;
@Rule
public final ExpectedSystemExit exit = ExpectedSystemExit.none();
private ExitFunc exitFunc;
@Before
public void setUp() throws Exception {
exitFunc = new ExitFunc();
converterTestCommons = new FSConfigConverterTestCommons();
converterTestCommons.setUp();
FSConfigToCSConfigConverterMain.setExit(exitFunc);
}
@After
public void tearDown() throws Exception {
QueueMetrics.clearQueueMetrics();
FSConfigToCSConfigConverterMain.setExit(System::exit);
converterTestCommons.tearDown();
}
@ -68,7 +69,6 @@ public void tearDown() throws Exception {
public void testConvertFSConfigurationDefaults()
throws Exception {
setupFSConfigConversionFiles();
exit.expectSystemExitWithStatus(0);
FSConfigToCSConfigConverterMain.main(new String[] {
"-o", OUTPUT_DIR,
@ -83,17 +83,18 @@ public void testConvertFSConfigurationDefaults()
assertTrue("capacity-scheduler.xml was not generated", csConfigExists);
assertTrue("yarn-site.xml was not generated", yarnSiteConfigExists);
assertEquals("Exit code", 0, exitFunc.exitCode);
}
@Test
public void testConvertFSConfigurationWithConsoleParam()
throws Exception {
setupFSConfigConversionFiles();
exit.expectSystemExitWithStatus(0);
FSConfigToCSConfigConverterMain.main(new String[] {
"-p",
"-m",
"-e",
"-y", YARN_SITE_XML,
"-f", FS_ALLOC_FILE,
"-r", CONVERSION_RULES_FILE});
@ -105,35 +106,42 @@ public void testConvertFSConfigurationWithConsoleParam()
stdout.contains("======= capacity-scheduler.xml ======="));
assertTrue("Stdout doesn't contain mapping-rules.json",
stdout.contains("======= mapping-rules.json ======="));
assertEquals("Exit code", 0, exitFunc.exitCode);
}
@Test
public void testShortHelpSwitch() {
exit.expectSystemExitWithStatus(0);
FSConfigToCSConfigConverterMain.main(new String[] {"-h"});
verifyHelpText();
assertEquals("Exit code", 0, exitFunc.exitCode);
}
@Test
public void testLongHelpSwitch() {
exit.expectSystemExitWithStatus(0);
FSConfigToCSConfigConverterMain.main(new String[] {"--help"});
verifyHelpText();
assertEquals("Exit code", 0, exitFunc.exitCode);
}
@Test
public void testHelpDisplayedWithoutArgs() {
FSConfigToCSConfigConverterMain.main(new String[] {});
verifyHelpText();
assertEquals("Exit code", 0, exitFunc.exitCode);
}
@Test
public void testConvertFSConfigurationWithLongSwitches()
throws IOException {
exit.expectSystemExitWithStatus(0);
setupFSConfigConversionFiles();
FSConfigToCSConfigConverterMain.main(new String[] {
"--print",
"--convert-placement-rules",
"--rules-to-file",
"--yarnsiteconfig", YARN_SITE_XML,
"--fsconfig", FS_ALLOC_FILE,
"--rulesconfig", CONVERSION_RULES_FILE});
@ -145,6 +153,16 @@ public void testConvertFSConfigurationWithLongSwitches()
stdout.contains("======= capacity-scheduler.xml ======="));
assertTrue("Stdout doesn't contain mapping-rules.json",
stdout.contains("======= mapping-rules.json ======="));
assertEquals("Exit code", 0, exitFunc.exitCode);
}
@Test
public void testNegativeReturnValueOnError() {
FSConfigToCSConfigConverterMain.main(new String[] {
"--print",
"--yarnsiteconfig"});
assertEquals("Exit code", -1, exitFunc.exitCode);
}
private void verifyHelpText() {
@ -152,4 +170,14 @@ private void verifyHelpText() {
assertTrue("Help was not displayed",
stdout.contains("General options are:"));
}
@SuppressWarnings("checkstyle:visibilitymodifier")
class ExitFunc implements Consumer<Integer> {
int exitCode;
@Override
public void accept(Integer t) {
this.exitCode = t.intValue();
}
}
}