YARN-9699. Migration tool that help to generate CS config based on FS config [Phase 1]. Contributed by Peter Bacsko

This commit is contained in:
Szilard Nemeth 2019-10-14 16:40:40 +02:00
parent 5f4641a120
commit 5cc7873a47
35 changed files with 4357 additions and 46 deletions

View File

@ -21,6 +21,10 @@
import com.google.common.annotations.VisibleForTesting;
import com.sun.jersey.spi.container.servlet.ServletContainer;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigArgumentHandler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigArgumentHandler.CliOption;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigConverter;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.slf4j.Marker;
@ -136,7 +140,6 @@
import java.net.InetSocketAddress;
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
import java.nio.charset.Charset;
import java.security.PrivilegedExceptionAction;
import java.security.SecureRandom;
@ -226,6 +229,13 @@ public class ResourceManager extends CompositeService
private Configuration conf;
private UserGroupInformation rmLoginUGI;
private static FSConfigToCSConfigArgumentHandler
fsConfigConversionArgumentHandler;
static {
FSConfigToCSConfigConverter converter = initFSConfigConverter();
initFSArgumentHandler(converter);
}
public ResourceManager() {
super("ResourceManager");
@ -1556,6 +1566,22 @@ public static void main(String argv[]) {
} else if (argv[0].equals("-remove-application-from-state-store")
&& argv.length == 2) {
removeApplication(conf, argv[1]);
} else if (argv[0].equals("-convert-fs-configuration")) {
String[] args = Arrays.copyOfRange(argv, 1, argv.length);
try {
int exitCode =
fsConfigConversionArgumentHandler.parseAndConvert(args);
if (exitCode != 0) {
LOG.error(FATAL,
"Error while starting FS configuration conversion, " +
"see previous error messages for details!");
System.exit(exitCode);
}
} catch (Throwable t) {
LOG.error(FATAL,
"Error while starting FS configuration conversion!", t);
System.exit(-1);
}
} else {
printUsage(System.err);
}
@ -1666,6 +1692,13 @@ private static void printUsage(PrintStream out) {
out.println("Usage: yarn resourcemanager [-format-state-store]");
out.println(" "
+ "[-remove-application-from-state-store <appId>]" + "\n");
out.println("[-convert-fs-configuration ");
out.println(FSConfigToCSConfigConverter.WARNING_TEXT);
for (CliOption cliOption : CliOption.values()) {
out.println(" " + cliOption.getAsArgumentString());
}
out.println("]");
}
protected RMAppLifetimeMonitor createRMAppLifetimeMonitor() {
@ -1683,4 +1716,17 @@ private void registerMXBean() {
public boolean isSecurityEnabled() {
return UserGroupInformation.isSecurityEnabled();
}
@VisibleForTesting
static void initFSArgumentHandler(FSConfigToCSConfigConverter converter) {
ResourceManager.fsConfigConversionArgumentHandler =
new FSConfigToCSConfigArgumentHandler(converter);
}
private static FSConfigToCSConfigConverter initFSConfigConverter() {
FSConfigToCSConfigRuleHandler ruleHandler =
new FSConfigToCSConfigRuleHandler();
return new FSConfigToCSConfigConverter(ruleHandler);
}
}

View File

@ -96,6 +96,7 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ReleaseContainerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEventType;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.server.scheduler.OpportunisticContainerContext;
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
@ -182,6 +183,8 @@ public abstract class AbstractYarnScheduler
protected SchedulingMonitorManager schedulingMonitorManager =
new SchedulingMonitorManager();
private boolean migration;
/**
* Construct the service.
*
@ -197,6 +200,9 @@ public AbstractYarnScheduler(String name) {
@Override
public void serviceInit(Configuration conf) throws Exception {
migration =
conf.getBoolean(FairSchedulerConfiguration.MIGRATION_MODE, false);
nmExpireInterval =
conf.getInt(YarnConfiguration.RM_NM_EXPIRY_INTERVAL_MS,
YarnConfiguration.DEFAULT_RM_NM_EXPIRY_INTERVAL_MS);
@ -209,7 +215,10 @@ public void serviceInit(Configuration conf) throws Exception {
nodeTracker.setConfiguredMaxAllocationWaitTime(
configuredMaximumAllocationWaitTime);
maxClusterLevelAppPriority = getMaxPriorityFromConf(conf);
this.releaseCache = new Timer("Pending Container Clear Timer");
if (!migration) {
this.releaseCache = new Timer("Pending Container Clear Timer");
}
autoUpdateContainers =
conf.getBoolean(YarnConfiguration.RM_AUTO_UPDATE_CONTAINERS,
YarnConfiguration.DEFAULT_RM_AUTO_UPDATE_CONTAINERS);
@ -227,11 +236,14 @@ public void serviceInit(Configuration conf) throws Exception {
@Override
protected void serviceStart() throws Exception {
if (updateThread != null) {
updateThread.start();
if (!migration) {
if (updateThread != null) {
updateThread.start();
}
schedulingMonitorManager.startAll();
createReleaseCache();
}
schedulingMonitorManager.startAll();
createReleaseCache();
super.serviceStart();
}

View File

@ -243,18 +243,34 @@ public int getUserMaxApps(String user) {
return (maxApps == null) ? userMaxAppsDefault : maxApps;
}
public Map<String, Integer> getUserMaxApps() {
return userMaxApps;
}
@VisibleForTesting
int getQueueMaxApps(String queue) {
Integer maxApps = queueMaxApps.get(queue);
return (maxApps == null) ? queueMaxAppsDefault : maxApps;
}
public int getQueueMaxAppsDefault() {
return queueMaxAppsDefault;
}
public int getUserMaxAppsDefault() {
return userMaxAppsDefault;
}
@VisibleForTesting
float getQueueMaxAMShare(String queue) {
Float maxAMShare = queueMaxAMShares.get(queue);
return (maxAMShare == null) ? queueMaxAMShareDefault : maxAMShare;
}
public float getQueueMaxAMShareDefault() {
return queueMaxAMShareDefault;
}
/**
* Get the minimum resource allocation for the given queue.
*

View File

@ -137,4 +137,9 @@ void setPercentage(String name, double value) {
}
}
}
public double[] getPercentages() {
return percentages == null ? null :
Arrays.copyOf(percentages, percentages.length);
}
}

View File

@ -184,6 +184,10 @@ public Resource getMaxShare() {
return result;
}
public ConfigurableResource getRawMaxShare() {
return maxShare;
}
public Resource getReservedResource() {
reservedResource.setMemorySize(metrics.getReservedMB());
reservedResource.setVirtualCores(metrics.getReservedVirtualCores());
@ -207,7 +211,7 @@ public int getMaxRunningApps() {
}
@VisibleForTesting
protected float getMaxAMShare() {
public float getMaxAMShare() {
return maxAMShare;
}

View File

@ -213,6 +213,8 @@ public class FairScheduler extends
@VisibleForTesting
Resource reservationThreshold;
private boolean migration;
public FairScheduler() {
super(FairScheduler.class.getName());
context = new FSContext(this);
@ -1428,7 +1430,7 @@ private void initScheduler(Configuration conf) throws IOException {
allocConf = new AllocationConfiguration(this);
queueMgr.initialize();
if (continuousSchedulingEnabled) {
if (continuousSchedulingEnabled && !migration) {
// Continuous scheduling is deprecated log it on startup
LOG.warn("Continuous scheduling is turned ON. It is deprecated " +
"because it can cause scheduler slowness due to locking issues. " +
@ -1441,7 +1443,7 @@ private void initScheduler(Configuration conf) throws IOException {
schedulingThread.setDaemon(true);
}
if (this.conf.getPreemptionEnabled()) {
if (this.conf.getPreemptionEnabled() && !migration) {
createPreemptionThread();
}
} finally {
@ -1495,11 +1497,15 @@ private void startSchedulerThreads() {
@Override
public void serviceInit(Configuration conf) throws Exception {
migration =
conf.getBoolean(FairSchedulerConfiguration.MIGRATION_MODE, false);
initScheduler(conf);
super.serviceInit(conf);
// Initialize SchedulingMonitorManager
schedulingMonitorManager.initialize(rmContext, conf);
if (!migration) {
// Initialize SchedulingMonitorManager
schedulingMonitorManager.initialize(rmContext, conf);
}
}
@Override

View File

@ -85,29 +85,39 @@ public class FairSchedulerConfiguration extends Configuration {
private static final String CONF_PREFIX = "yarn.scheduler.fair.";
/**
* Used during FS->CS conversion. When enabled, background threads are
* not started. This property should NOT be used by end-users!
*/
public static final String MIGRATION_MODE = CONF_PREFIX + "migration.mode";
public static final String ALLOCATION_FILE = CONF_PREFIX + "allocation.file";
protected static final String DEFAULT_ALLOCATION_FILE = "fair-scheduler.xml";
/** Whether pools can be created that were not specified in the FS configuration file
*/
protected static final String ALLOW_UNDECLARED_POOLS = CONF_PREFIX + "allow-undeclared-pools";
protected static final boolean DEFAULT_ALLOW_UNDECLARED_POOLS = true;
public static final String ALLOW_UNDECLARED_POOLS = CONF_PREFIX +
"allow-undeclared-pools";
public static final boolean DEFAULT_ALLOW_UNDECLARED_POOLS = true;
/** Whether to use the user name as the queue name (instead of "default") if
* the request does not specify a queue. */
protected static final String USER_AS_DEFAULT_QUEUE = CONF_PREFIX + "user-as-default-queue";
protected static final boolean DEFAULT_USER_AS_DEFAULT_QUEUE = true;
public static final String USER_AS_DEFAULT_QUEUE = CONF_PREFIX +
"user-as-default-queue";
public static final boolean DEFAULT_USER_AS_DEFAULT_QUEUE = true;
protected static final float DEFAULT_LOCALITY_THRESHOLD = -1.0f;
/** Cluster threshold for node locality. */
protected static final String LOCALITY_THRESHOLD_NODE = CONF_PREFIX + "locality.threshold.node";
protected static final float DEFAULT_LOCALITY_THRESHOLD_NODE =
public static final String LOCALITY_THRESHOLD_NODE = CONF_PREFIX +
"locality.threshold.node";
public static final float DEFAULT_LOCALITY_THRESHOLD_NODE =
DEFAULT_LOCALITY_THRESHOLD;
/** Cluster threshold for rack locality. */
protected static final String LOCALITY_THRESHOLD_RACK = CONF_PREFIX + "locality.threshold.rack";
protected static final float DEFAULT_LOCALITY_THRESHOLD_RACK =
public static final String LOCALITY_THRESHOLD_RACK = CONF_PREFIX +
"locality.threshold.rack";
public static final float DEFAULT_LOCALITY_THRESHOLD_RACK =
DEFAULT_LOCALITY_THRESHOLD;
/**
@ -139,10 +149,10 @@ public class FairSchedulerConfiguration extends Configuration {
* {@link #ASSIGN_MULTIPLE} to improve container allocation ramp up.
*/
@Deprecated
protected static final String CONTINUOUS_SCHEDULING_ENABLED = CONF_PREFIX +
public static final String CONTINUOUS_SCHEDULING_ENABLED = CONF_PREFIX +
"continuous-scheduling-enabled";
@Deprecated
protected static final boolean DEFAULT_CONTINUOUS_SCHEDULING_ENABLED = false;
public static final boolean DEFAULT_CONTINUOUS_SCHEDULING_ENABLED = false;
/**
* Sleep time of each pass in continuous scheduling (5ms in default).
@ -150,21 +160,22 @@ public class FairSchedulerConfiguration extends Configuration {
* Only used when {@link #CONTINUOUS_SCHEDULING_ENABLED} is enabled
*/
@Deprecated
protected static final String CONTINUOUS_SCHEDULING_SLEEP_MS = CONF_PREFIX +
public static final String CONTINUOUS_SCHEDULING_SLEEP_MS = CONF_PREFIX +
"continuous-scheduling-sleep-ms";
@Deprecated
protected static final int DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS = 5;
public static final int DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS = 5;
/** Whether preemption is enabled. */
protected static final String PREEMPTION = CONF_PREFIX + "preemption";
protected static final boolean DEFAULT_PREEMPTION = false;
public static final String PREEMPTION = CONF_PREFIX + "preemption";
public static final boolean DEFAULT_PREEMPTION = false;
protected static final String PREEMPTION_THRESHOLD =
CONF_PREFIX + "preemption.cluster-utilization-threshold";
protected static final float DEFAULT_PREEMPTION_THRESHOLD = 0.8f;
protected static final String WAIT_TIME_BEFORE_KILL = CONF_PREFIX + "waitTimeBeforeKill";
protected static final int DEFAULT_WAIT_TIME_BEFORE_KILL = 15000;
public static final String WAIT_TIME_BEFORE_KILL = CONF_PREFIX +
"waitTimeBeforeKill";
public static final int DEFAULT_WAIT_TIME_BEFORE_KILL = 15000;
/**
* Postfix for resource allocation increments in the
@ -181,18 +192,19 @@ public class FairSchedulerConfiguration extends Configuration {
* This is intended to be a backdoor on production clusters, and hence
* intentionally not documented.
*/
protected static final String WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS =
public static final String WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS =
CONF_PREFIX + "waitTimeBeforeNextStarvationCheck";
protected static final long
public static final long
DEFAULT_WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS = 10000;
/** Whether to assign multiple containers in one check-in. */
public static final String ASSIGN_MULTIPLE = CONF_PREFIX + "assignmultiple";
protected static final boolean DEFAULT_ASSIGN_MULTIPLE = false;
public static final boolean DEFAULT_ASSIGN_MULTIPLE = false;
/** Whether to give more weight to apps requiring many resources. */
protected static final String SIZE_BASED_WEIGHT = CONF_PREFIX + "sizebasedweight";
protected static final boolean DEFAULT_SIZE_BASED_WEIGHT = false;
public static final String SIZE_BASED_WEIGHT = CONF_PREFIX +
"sizebasedweight";
public static final boolean DEFAULT_SIZE_BASED_WEIGHT = false;
/** Maximum number of containers to assign on each check-in. */
public static final String DYNAMIC_MAX_ASSIGN =
@ -203,8 +215,8 @@ public class FairSchedulerConfiguration extends Configuration {
* Specify exact number of containers to assign on each heartbeat, if dynamic
* max assign is turned off.
*/
protected static final String MAX_ASSIGN = CONF_PREFIX + "max.assign";
protected static final int DEFAULT_MAX_ASSIGN = -1;
public static final String MAX_ASSIGN = CONF_PREFIX + "max.assign";
public static final int DEFAULT_MAX_ASSIGN = -1;
/** The update interval for calculating resources in FairScheduler .*/
public static final String UPDATE_INTERVAL_MS =

View File

@ -0,0 +1,37 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
/**
* Thrown when the FS-to-CS converter logic encounters a
* condition from which it cannot recover (eg. unsupported
* settings).
*
*/
public class ConversionException extends RuntimeException {
private static final long serialVersionUID = 4161836727287317835L;
public ConversionException(String message) {
super(message);
}
public ConversionException(String message, Throwable cause) {
super(message, cause);
}
}

View File

@ -0,0 +1,214 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
import org.apache.commons.cli.CommandLine;
import org.apache.commons.cli.GnuParser;
import org.apache.commons.cli.MissingArgumentException;
import org.apache.commons.cli.Option;
import org.apache.commons.cli.Options;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
/**
* Parses arguments passed to the FS->CS converter.
* If the arguments are valid, it calls the converter itself.
*
*/
public class FSConfigToCSConfigArgumentHandler {
private static final Logger LOG =
LoggerFactory.getLogger(FSConfigToCSConfigArgumentHandler.class);
private final FSConfigToCSConfigConverter converter;
public FSConfigToCSConfigArgumentHandler(FSConfigToCSConfigConverter
converter) {
this.converter = converter;
}
/**
* Represents options for the converter CLI.
*
*/
public enum CliOption {
YARN_SITE("yarn-site.xml", "y", "yarnsiteconfig",
"Path to a valid yarn-site.xml config file", true, true),
// fair-scheduler.xml is not mandatory
// if FairSchedulerConfiguration.ALLOCATION_FILE is defined in yarn-site.xml
FAIR_SCHEDULER("fair-scheduler.xml", "f", "fsconfig",
"Path to a valid fair-scheduler.xml config file", false, true),
CONVERSION_RULES("conversion rules config file", "r", "rulesconfig",
"Optional parameter. If given, should specify a valid path to the " +
"conversion rules file (property format).", false, true),
CONSOLE_MODE("console mode", "p", "print",
"If defined, the converted configuration will " +
"only be emitted to the console.", false, false),
CLUSTER_RESOURCE("cluster resource", "c", "cluster-resource",
"Needs to be given if maxResources is defined as percentages " +
"for any queue, otherwise this parameter can be omitted.",
false, true),
OUTPUT_DIR("output directory", "o", "output-directory",
"Output directory for yarn-site.xml and" +
" capacity-scheduler.xml files." +
"Must have write permission for user who is running this script.",
true, true);
private final String name;
private final String shortSwitch;
private final String longSwitch;
private final String description;
private final boolean required;
private final boolean hasArg;
CliOption(String name, String shortSwitch, String longSwitch,
String description, boolean required, boolean hasArg) {
this.name = name;
this.shortSwitch = shortSwitch;
this.longSwitch = longSwitch;
this.description = description;
this.required = required;
this.hasArg = hasArg;
}
public Option createCommonsCliOption() {
Option option = new Option(shortSwitch, longSwitch, hasArg, description);
option.setRequired(required);
return option;
}
public String getAsArgumentString() {
return shortSwitch + "|" + longSwitch + ": " + description;
}
}
public int parseAndConvert(String[] args) throws Exception {
Options opts = createOptions();
try {
CommandLine cliParser = new GnuParser().parse(opts, args);
checkOptionPresent(cliParser, CliOption.YARN_SITE);
checkOptionPresent(cliParser, CliOption.OUTPUT_DIR);
FSConfigToCSConfigConverterParams params = validateInputFiles(cliParser);
converter.convert(params);
} catch (MissingArgumentException e) {
String msg = "Missing argument for options" + e.getMessage();
logAndStdErr(e, msg);
return -1;
} catch (PreconditionException e) {
String msg = "Cannot start FS config conversion due to the following"
+ " precondition error: " + e.getMessage();
logAndStdErr(e, msg);
return -1;
} catch (UnsupportedPropertyException e) {
String msg = "Unsupported property/setting encountered during FS config "
+ "conversion: " + e.getMessage();
logAndStdErr(e, msg);
return -1;
} catch (ConversionException | IllegalArgumentException e) {
String msg = "Fatal error during FS config conversion: " + e.getMessage();
logAndStdErr(e, msg);
return -1;
}
return 0;
}
private void logAndStdErr(Exception e, String msg) {
LOG.error(msg, e);
System.err.println(msg);
}
private Options createOptions() {
Options opts = new Options();
for (CliOption cliOption : CliOption.values()) {
opts.addOption(cliOption.createCommonsCliOption());
}
return opts;
}
private FSConfigToCSConfigConverterParams validateInputFiles(
CommandLine cliParser) {
String yarnSiteXmlFile =
cliParser.getOptionValue(CliOption.YARN_SITE.shortSwitch);
String fairSchedulerXmlFile =
cliParser.getOptionValue(CliOption.FAIR_SCHEDULER.shortSwitch);
String conversionRulesFile =
cliParser.getOptionValue(CliOption.CONVERSION_RULES.shortSwitch);
String outputDir =
cliParser.getOptionValue(CliOption.OUTPUT_DIR.shortSwitch);
checkFile(CliOption.YARN_SITE, yarnSiteXmlFile);
checkFile(CliOption.FAIR_SCHEDULER, fairSchedulerXmlFile);
checkFile(CliOption.CONVERSION_RULES, conversionRulesFile);
checkDirectory(CliOption.OUTPUT_DIR, outputDir);
return FSConfigToCSConfigConverterParams.Builder.create()
.withYarnSiteXmlConfig(yarnSiteXmlFile)
.withFairSchedulerXmlConfig(fairSchedulerXmlFile)
.withConversionRulesConfig(conversionRulesFile)
.withClusterResource(
cliParser.getOptionValue(CliOption.CLUSTER_RESOURCE.shortSwitch))
.withConsole(cliParser.hasOption(CliOption.CONSOLE_MODE.shortSwitch))
.withOutputDirectory(outputDir)
.build();
}
private static void checkOptionPresent(CommandLine cliParser,
CliOption cliOption) {
if (!cliParser.hasOption(cliOption.shortSwitch)) {
throw new PreconditionException(
String.format("Missing %s parameter " + "(switch: %s|%s).",
cliOption.name, cliOption.shortSwitch, cliOption.longSwitch));
}
}
private static void checkFile(CliOption cliOption, String filePath) {
checkFileInternal(cliOption, filePath, true);
}
private static void checkDirectory(CliOption cliOption, String dirPath) {
checkFileInternal(cliOption, dirPath, false);
}
private static void checkFileInternal(CliOption cliOption, String filePath,
boolean isFile) {
//We can safely ignore null here as files / dirs were checked before
if (filePath == null) {
return;
}
File file = new File(filePath);
if (isFile && file.isDirectory()) {
throw new PreconditionException(
String.format("Specified path %s is a directory but should be " +
" a file (As value of parameter %s)", filePath, cliOption.name));
} else if (!isFile && !file.isDirectory()) {
throw new PreconditionException(
String.format("Specified path %s is not a directory " +
"(As value of parameter %s)", filePath, cliOption.name));
} else if (!file.exists()) {
throw new PreconditionException(
String.format("Specified path %s does not exist " +
"(As value of parameter %s)", filePath, cliOption.name));
}
}
}

View File

@ -0,0 +1,368 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.Map;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.authorize.AccessControlList;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.security.AccessType;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfigurationException;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.ConfigurableResource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.common.annotations.VisibleForTesting;
/**
* Converts Fair Scheduler configuration (site and fair-scheduler.xml)
* to Capacity Scheduler. The mapping is not 100% perfect due to
* feature gaps. These will be addressed in the future.
*/
public class FSConfigToCSConfigConverter {
public static final Logger LOG = LoggerFactory.getLogger(
FSConfigToCSConfigConverter.class.getName());
private static final String YARN_SITE_XML = "yarn-site.xml";
private static final String CAPACITY_SCHEDULER_XML =
"capacity-scheduler.xml";
private static final String FAIR_SCHEDULER_XML =
"fair-scheduler.xml";
public static final String WARNING_TEXT =
"WARNING: This feature is experimental and not intended " +
"for production use!";
private Resource clusterResource;
private boolean preemptionEnabled = false;
private int queueMaxAppsDefault;
private float queueMaxAMShareDefault;
private boolean autoCreateChildQueues = false;
private boolean sizeBasedWeight = false;
private boolean userAsDefaultQueue = false;
private Configuration yarnSiteConfig;
private Configuration capacitySchedulerConfig;
private FSConfigToCSConfigRuleHandler ruleHandler;
private OutputStream yarnSiteOutputStream;
private OutputStream capacitySchedulerOutputStream;
private boolean consoleMode = false;
public FSConfigToCSConfigConverter(FSConfigToCSConfigRuleHandler
ruleHandler) {
this.ruleHandler = ruleHandler;
this.yarnSiteOutputStream = System.out;
this.capacitySchedulerOutputStream = System.out;
}
public void convert(FSConfigToCSConfigConverterParams params)
throws Exception {
validateParams(params);
prepareOutputFiles(params.getOutputDirectory(), params.isConsole());
loadConversionRules(params.getConversionRulesConfig());
Configuration conf = createConfiguration(params);
handleFairSchedulerConfig(params, conf);
this.clusterResource = getClusterResource(params);
convert(conf);
}
private void prepareOutputFiles(String outputDirectory, boolean console)
throws FileNotFoundException {
if (console) {
LOG.info("Console mode is enabled, " + YARN_SITE_XML + " and" +
" " + CAPACITY_SCHEDULER_XML + " will be only emitted " +
"to the console!");
this.consoleMode = true;
return;
}
File yarnSiteXmlOutput = new File(outputDirectory,
YARN_SITE_XML);
File schedulerXmlOutput = new File(outputDirectory,
CAPACITY_SCHEDULER_XML);
LOG.info("Output directory for " + YARN_SITE_XML + " and" +
" " + CAPACITY_SCHEDULER_XML + " is: {}", outputDirectory);
this.yarnSiteOutputStream = new FileOutputStream(yarnSiteXmlOutput);
this.capacitySchedulerOutputStream =
new FileOutputStream(schedulerXmlOutput);
}
private void validateParams(FSConfigToCSConfigConverterParams params) {
if (params.getYarnSiteXmlConfig() == null) {
throw new PreconditionException("" + YARN_SITE_XML + " configuration " +
"is not defined but it is mandatory!");
} else if (params.getOutputDirectory() == null && !params.isConsole()) {
throw new PreconditionException("Output directory configuration " +
"is not defined but it is mandatory!");
}
}
private Resource getClusterResource(
FSConfigToCSConfigConverterParams params) {
Resource resource = null;
if (params.getClusterResource() != null) {
ConfigurableResource configurableResource;
try {
configurableResource = FairSchedulerConfiguration
.parseResourceConfigValue(params.getClusterResource());
} catch (AllocationConfigurationException e) {
throw new ConversionException("Error while parsing resource.", e);
}
resource = configurableResource.getResource();
}
return resource;
}
private void loadConversionRules(String rulesFile) throws IOException {
if (rulesFile != null) {
LOG.info("Reading conversion rules file from: " + rulesFile);
this.ruleHandler.loadRulesFromFile(rulesFile);
} else {
LOG.info("Conversion rules file is not defined, " +
"using default conversion config!");
}
}
private Configuration createConfiguration(
FSConfigToCSConfigConverterParams params) {
Configuration conf = new YarnConfiguration();
conf.addResource(new Path(params.getYarnSiteXmlConfig()));
conf.setBoolean(FairSchedulerConfiguration.MIGRATION_MODE, true);
return conf;
}
private void handleFairSchedulerConfig(
FSConfigToCSConfigConverterParams params, Configuration conf) {
String fairSchedulerXmlConfig = params.getFairSchedulerXmlConfig();
// Don't override allocation file in conf yet, as it would ruin the second
// condition here
if (fairSchedulerXmlConfig != null) {
LOG.info("Using explicitly defined " + FAIR_SCHEDULER_XML);
} else if (conf.get(FairSchedulerConfiguration.ALLOCATION_FILE) != null) {
LOG.info("Using " + FAIR_SCHEDULER_XML + " defined in " +
YARN_SITE_XML + " by key: " +
FairSchedulerConfiguration.ALLOCATION_FILE);
} else {
throw new PreconditionException("" + FAIR_SCHEDULER_XML +
" is not defined neither in " + YARN_SITE_XML +
"(with property: " + FairSchedulerConfiguration.ALLOCATION_FILE +
") nor directly with its own parameter!");
}
// We can now safely override allocation file in conf
if (fairSchedulerXmlConfig != null) {
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE,
params.getFairSchedulerXmlConfig());
}
}
@VisibleForTesting
void convert(Configuration conf) throws Exception {
System.out.println(WARNING_TEXT);
// initialize Fair Scheduler
RMContext ctx = new RMContextImpl();
PlacementManager placementManager = new PlacementManager();
ctx.setQueuePlacementManager(placementManager);
FairScheduler fs = new FairScheduler();
fs.setRMContext(ctx);
fs.init(conf);
AllocationConfiguration allocConf = fs.getAllocationConfiguration();
queueMaxAppsDefault = allocConf.getQueueMaxAppsDefault();
queueMaxAMShareDefault = allocConf.getQueueMaxAMShareDefault();
yarnSiteConfig = new Configuration(false);
capacitySchedulerConfig = new Configuration(false);
checkUserMaxApps(allocConf);
checkUserMaxAppsDefault(allocConf);
convertYarnSiteXml(conf);
convertCapacitySchedulerXml(fs);
if (consoleMode) {
System.out.println("======= " + CAPACITY_SCHEDULER_XML + " =======");
}
capacitySchedulerConfig.writeXml(capacitySchedulerOutputStream);
if (consoleMode) {
System.out.println();
System.out.println("======= " + YARN_SITE_XML + " =======");
}
yarnSiteConfig.writeXml(yarnSiteOutputStream);
}
@VisibleForTesting
void setYarnSiteOutputStream(OutputStream out) {
this.yarnSiteOutputStream = out;
}
@VisibleForTesting
void setCapacitySchedulerConfigOutputStream(OutputStream out) {
this.capacitySchedulerOutputStream = out;
}
private void convertYarnSiteXml(Configuration conf) {
FSYarnSiteConverter siteConverter =
new FSYarnSiteConverter();
siteConverter.convertSiteProperties(conf, yarnSiteConfig);
autoCreateChildQueues = siteConverter.isAutoCreateChildQueues();
preemptionEnabled = siteConverter.isPreemptionEnabled();
sizeBasedWeight = siteConverter.isSizeBasedWeight();
userAsDefaultQueue = siteConverter.isUserAsDefaultQueue();
checkReservationSystem(conf);
}
private void convertCapacitySchedulerXml(FairScheduler fs) {
FSParentQueue rootQueue = fs.getQueueManager().getRootQueue();
emitDefaultMaxApplications();
emitDefaultMaxAMShare();
FSQueueConverter queueConverter = new FSQueueConverter(ruleHandler,
capacitySchedulerConfig,
preemptionEnabled,
sizeBasedWeight,
autoCreateChildQueues,
clusterResource,
queueMaxAMShareDefault,
queueMaxAppsDefault);
queueConverter.convertQueueHierarchy(rootQueue);
emitACLs(fs);
PlacementManager placementManager =
fs.getRMContext().getQueuePlacementManager();
if (placementManager.getPlacementRules().size() > 0) {
QueuePlacementConverter placementConverter =
new QueuePlacementConverter();
Map<String, String> properties =
placementConverter.convertPlacementPolicy(placementManager,
ruleHandler, userAsDefaultQueue);
properties.forEach((k, v) -> capacitySchedulerConfig.set(k, v));
}
// Validate ordering policy
if (queueConverter.isDrfPolicyUsedOnQueueLevel()) {
if (queueConverter.isFifoOrFairSharePolicyUsed()) {
throw new ConversionException(
"DRF ordering policy cannot be used together with fifo/fair");
} else {
capacitySchedulerConfig.set(
CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
DominantResourceCalculator.class.getCanonicalName());
}
}
}
private void emitDefaultMaxApplications() {
if (queueMaxAppsDefault != Integer.MAX_VALUE) {
capacitySchedulerConfig.set(
CapacitySchedulerConfiguration.MAXIMUM_SYSTEM_APPLICATIONS,
String.valueOf(queueMaxAppsDefault));
}
}
private void emitDefaultMaxAMShare() {
capacitySchedulerConfig.set(
CapacitySchedulerConfiguration.
MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
String.valueOf(queueMaxAMShareDefault));
}
private void emitACLs(FairScheduler fs) {
fs.getAllocationConfiguration().getQueueAcls()
.forEach(this::generateQueueAcl);
}
private void generateQueueAcl(String queue,
Map<AccessType, AccessControlList> access) {
AccessControlList submitAcls = access.get(AccessType.SUBMIT_APP);
AccessControlList adminAcls = access.get(AccessType.ADMINISTER_QUEUE);
if (!submitAcls.getGroups().isEmpty() ||
!submitAcls.getUsers().isEmpty()) {
capacitySchedulerConfig.set(PREFIX + queue + ".acl_submit_applications",
submitAcls.getAclString());
}
if (!adminAcls.getGroups().isEmpty() ||
!adminAcls.getUsers().isEmpty()) {
capacitySchedulerConfig.set(PREFIX + queue + ".acl_administer_queue",
adminAcls.getAclString());
}
}
private void checkReservationSystem(Configuration conf) {
if (conf.getBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE,
YarnConfiguration.DEFAULT_RM_RESERVATION_SYSTEM_ENABLE)) {
ruleHandler.handleReservationSystem();
}
}
private void checkUserMaxApps(AllocationConfiguration allocConf) {
if (allocConf.getUserMaxApps() != null
&& allocConf.getUserMaxApps().size() > 0) {
ruleHandler.handleUserMaxApps();
}
}
private void checkUserMaxAppsDefault(AllocationConfiguration allocConf) {
if (allocConf.getUserMaxAppsDefault() > 0) {
ruleHandler.handleUserMaxAppsDefault();
}
}
@VisibleForTesting
Resource getClusterResource() {
return clusterResource;
}
@VisibleForTesting
public void setClusterResource(Resource clusterResource) {
this.clusterResource = clusterResource;
}
@VisibleForTesting
FSConfigToCSConfigRuleHandler getRuleHandler() {
return ruleHandler;
}
}

View File

@ -0,0 +1,131 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
/**
* POJO that holds values for the FS->CS converter.
*
*/
public final class FSConfigToCSConfigConverterParams {
private String yarnSiteXmlConfig;
private String fairSchedulerXmlConfig;
private String conversionRulesConfig;
private boolean console;
private String clusterResource;
private String outputDirectory;
private FSConfigToCSConfigConverterParams() {
//must use builder
}
public String getFairSchedulerXmlConfig() {
return fairSchedulerXmlConfig;
}
public String getYarnSiteXmlConfig() {
return yarnSiteXmlConfig;
}
public String getConversionRulesConfig() {
return conversionRulesConfig;
}
public String getClusterResource() {
return clusterResource;
}
public boolean isConsole() {
return console;
}
public String getOutputDirectory() {
return outputDirectory;
}
@Override
public String toString() {
return "FSConfigToCSConfigConverterParams{" +
"yarnSiteXmlConfig='" + yarnSiteXmlConfig + '\'' +
", fairSchedulerXmlConfig='" + fairSchedulerXmlConfig + '\'' +
", conversionRulesConfig='" + conversionRulesConfig + '\'' +
", clusterResource='" + clusterResource + '\'' +
", console=" + console +
'}';
}
/**
* Builder that can construct FSConfigToCSConfigConverterParams objects.
*
*/
public static final class Builder {
private String yarnSiteXmlConfig;
private String fairSchedulerXmlConfig;
private String conversionRulesConfig;
private boolean console;
private String clusterResource;
private String outputDirectory;
private Builder() {
}
public static Builder create() {
return new Builder();
}
public Builder withYarnSiteXmlConfig(String config) {
this.yarnSiteXmlConfig = config;
return this;
}
public Builder withFairSchedulerXmlConfig(String config) {
this.fairSchedulerXmlConfig = config;
return this;
}
public Builder withConversionRulesConfig(String config) {
this.conversionRulesConfig = config;
return this;
}
public Builder withClusterResource(String res) {
this.clusterResource = res;
return this;
}
public Builder withConsole(boolean console) {
this.console = console;
return this;
}
public Builder withOutputDirectory(String outputDir) {
this.outputDirectory = outputDir;
return this;
}
public FSConfigToCSConfigConverterParams build() {
FSConfigToCSConfigConverterParams params =
new FSConfigToCSConfigConverterParams();
params.clusterResource = this.clusterResource;
params.console = this.console;
params.fairSchedulerXmlConfig = this.fairSchedulerXmlConfig;
params.yarnSiteXmlConfig = this.yarnSiteXmlConfig;
params.conversionRulesConfig = this.conversionRulesConfig;
params.outputDirectory = this.outputDirectory;
return params;
}
}
}

View File

@ -0,0 +1,229 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
import static java.lang.String.format;
import java.io.File;
import java.io.FileInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import org.apache.commons.lang3.StringUtils;
import com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* Class that determines what should happen if the FS->CS converter
* encounters a property that is currently not supported.
*
* Acceptable values are either "abort" or "warning".
*/
public class FSConfigToCSConfigRuleHandler {
private static final Logger LOG =
LoggerFactory.getLogger(FSConfigToCSConfigRuleHandler.class);
public static final String MAX_CHILD_QUEUE_LIMIT =
"maxChildQueue.limit";
public static final String MAX_CAPACITY_PERCENTAGE =
"maxCapacityPercentage.action";
public static final String MAX_CHILD_CAPACITY =
"maxChildCapacity.action";
public static final String USER_MAX_RUNNING_APPS =
"userMaxRunningApps.action";
public static final String USER_MAX_APPS_DEFAULT =
"userMaxAppsDefault.action";
public static final String DYNAMIC_MAX_ASSIGN =
"dynamicMaxAssign.action";
public static final String SPECIFIED_NOT_FIRST =
"specifiedNotFirstRule.action";
public static final String RESERVATION_SYSTEM =
"reservationSystem.action";
public static final String QUEUE_AUTO_CREATE =
"queueAutoCreate.action";
@VisibleForTesting
enum RuleAction {
WARNING,
ABORT
}
private Map<String, RuleAction> actions;
private Properties properties;
void loadRulesFromFile(String ruleFile) throws IOException {
if (ruleFile == null) {
throw new IllegalArgumentException("Rule file cannot be null!");
}
properties = new Properties();
try (InputStream is = new FileInputStream(new File(ruleFile))) {
properties.load(is);
}
actions = new HashMap<>();
initPropertyActions();
}
public FSConfigToCSConfigRuleHandler() {
properties = new Properties();
actions = new HashMap<>();
}
@VisibleForTesting
FSConfigToCSConfigRuleHandler(Properties props) {
properties = props;
actions = new HashMap<>();
initPropertyActions();
}
private void initPropertyActions() {
setActionForProperty(MAX_CAPACITY_PERCENTAGE);
setActionForProperty(MAX_CHILD_CAPACITY);
setActionForProperty(USER_MAX_RUNNING_APPS);
setActionForProperty(USER_MAX_APPS_DEFAULT);
setActionForProperty(DYNAMIC_MAX_ASSIGN);
setActionForProperty(SPECIFIED_NOT_FIRST);
setActionForProperty(RESERVATION_SYSTEM);
setActionForProperty(QUEUE_AUTO_CREATE);
}
public void handleMaxCapacityPercentage(String queueName) {
handle(MAX_CAPACITY_PERCENTAGE, null,
format("<maxResources> defined in percentages for queue %s",
queueName));
}
public void handleMaxChildCapacity() {
handle(MAX_CHILD_CAPACITY, "<maxChildResources>", null);
}
public void handleChildQueueCount(String queue, int count) {
String value = properties.getProperty(MAX_CHILD_QUEUE_LIMIT);
if (value != null) {
if (StringUtils.isNumeric(value)) {
int maxChildQueue = Integer.parseInt(value);
if (count > maxChildQueue) {
throw new ConversionException(
format("Queue %s has too many children: %d", queue, count));
}
} else {
throw new ConversionException(
"Rule setting: maxChildQueue.limit is not an integer");
}
}
}
public void handleUserMaxApps() {
handle(USER_MAX_RUNNING_APPS, "<maxRunningApps>", null);
}
public void handleUserMaxAppsDefault() {
handle(USER_MAX_APPS_DEFAULT, "<userMaxAppsDefault>", null);
}
public void handleDynamicMaxAssign() {
handle(DYNAMIC_MAX_ASSIGN,
FairSchedulerConfiguration.DYNAMIC_MAX_ASSIGN, null);
}
public void handleSpecifiedNotFirstRule() {
handle(SPECIFIED_NOT_FIRST,
null,
"The <specified> tag is not the first placement rule, this cannot be"
+ " converted properly");
}
public void handleReservationSystem() {
handle(RESERVATION_SYSTEM,
null,
"Conversion of reservation system is not supported");
}
public void handleQueueAutoCreate(String placementRule) {
handle(QUEUE_AUTO_CREATE,
null,
format(
"Placement rules: queue auto-create is not supported (type: %s)",
placementRule));
}
private void handle(String actionName, String fsSetting, String message) {
RuleAction action = actions.get(actionName);
if (action != null) {
switch (action) {
case ABORT:
String exceptionMessage;
if (message != null) {
exceptionMessage = message;
} else {
exceptionMessage = format("Setting %s is not supported", fsSetting);
}
throw new UnsupportedPropertyException(exceptionMessage);
case WARNING:
if (message != null) {
LOG.warn(message);
} else {
LOG.warn("Setting {} is not supported, ignoring conversion",
fsSetting);
}
break;
default:
throw new IllegalArgumentException(
"Unknown action " + action);
}
}
}
private void setActionForProperty(String property) {
String action = properties.getProperty(property);
if (action == null) {
LOG.info("No rule set for {}, defaulting to WARNING", property);
actions.put(property, RuleAction.WARNING);
} else if (action.equalsIgnoreCase("warning")) {
actions.put(property, RuleAction.WARNING);
} else if (action.equalsIgnoreCase("abort")) {
actions.put(property, RuleAction.ABORT);
} else {
LOG.warn("Unknown action {} set for rule {}, defaulting to WARNING",
action, property);
actions.put(property, RuleAction.WARNING);
}
}
@VisibleForTesting
public Map<String, RuleAction> getActions() {
return actions;
}
}

View File

@ -0,0 +1,493 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.mutable.MutableBoolean;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.ConfigurableResource;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
* Converts a Fair Schedule queue hierarchy to Capacity Scheduler
* configuration.
*
*/
public class FSQueueConverter {
private static final int MAX_RUNNING_APPS_UNSET = Integer.MIN_VALUE;
private final Set<String> leafQueueNames;
private final FSConfigToCSConfigRuleHandler ruleHandler;
private Configuration capacitySchedulerConfig;
private final boolean preemptionEnabled;
private final boolean sizeBasedWeight;
private final Resource clusterResource;
private final float queueMaxAMShareDefault;
private final boolean autoCreateChildQueues;
private final int queueMaxAppsDefault;
private boolean fifoOrFairSharePolicyUsed;
private boolean drfPolicyUsedOnQueueLevel;
@SuppressWarnings("checkstyle:parameternumber")
public FSQueueConverter(FSConfigToCSConfigRuleHandler ruleHandler,
Configuration capacitySchedulerConfig,
boolean preemptionEnabled,
boolean sizeBasedWeight,
boolean autoCreateChildQueues,
Resource clusterResource,
float queueMaxAMShareDefault,
int queueMaxAppsDefault) {
this.leafQueueNames = new HashSet<>();
this.ruleHandler = ruleHandler;
this.capacitySchedulerConfig = capacitySchedulerConfig;
this.preemptionEnabled = preemptionEnabled;
this.sizeBasedWeight = sizeBasedWeight;
this.clusterResource = clusterResource;
this.queueMaxAMShareDefault = queueMaxAMShareDefault;
this.autoCreateChildQueues = autoCreateChildQueues;
this.queueMaxAppsDefault = queueMaxAppsDefault;
}
@SuppressWarnings("checkstyle:linelength")
public void convertQueueHierarchy(FSQueue queue) {
List<FSQueue> children = queue.getChildQueues();
final String queueName = queue.getName();
if (queue instanceof FSLeafQueue) {
String shortName = getQueueShortName(queueName);
if (!leafQueueNames.add(shortName)) {
throw new ConversionException(
"Leaf queues must be unique, "
+ shortName + " is defined at least twice");
}
}
emitChildQueues(queueName, children);
emitMaxAMShare(queueName, queue);
emitMaxRunningApps(queueName, queue);
emitMaxAllocations(queueName, queue);
emitPreemptionDisabled(queueName, queue);
// TODO: COULD BE incorrect! Needs further clarifications
emitChildCapacity(queue);
emitMaximumCapacity(queueName, queue);
emitAutoCreateChildQueue(queueName);
emitSizeBasedWeight(queueName);
emitOrderingPolicy(queueName, queue);
checkMaxChildCapacitySetting(queue);
for (FSQueue childQueue : children) {
convertQueueHierarchy(childQueue);
}
}
public boolean isFifoOrFairSharePolicyUsed() {
return fifoOrFairSharePolicyUsed;
}
public boolean isDrfPolicyUsedOnQueueLevel() {
return drfPolicyUsedOnQueueLevel;
}
/**
* Generates yarn.scheduler.capacity.&lt;queue-name&gt;.queues.
* @param queueName
* @param children
*/
private void emitChildQueues(String queueName, List<FSQueue> children) {
ruleHandler.handleChildQueueCount(queueName, children.size());
if (children.size() > 0) {
String childQueues = children.stream()
.map(child -> getQueueShortName(child.getName()))
.collect(Collectors.joining(","));
capacitySchedulerConfig.set(PREFIX + queueName + ".queues", childQueues);
}
}
/**
* &lt;maxAMShare&gt;
* ==> yarn.scheduler.capacity.&lt;queue-name&gt;.maximum-am-resource-percent.
* @param queueName
* @param queue
*/
private void emitMaxAMShare(String queueName, FSQueue queue) {
float queueMaxAmShare = queue.getMaxAMShare();
// Direct floating point comparison is OK here
if (queueMaxAmShare != 0.0f
&& queueMaxAmShare != queueMaxAMShareDefault
&& queueMaxAmShare != -1.0f) {
capacitySchedulerConfig.set(PREFIX + queueName +
".maximum-am-resource-percent", String.valueOf(queueMaxAmShare));
}
if (queueMaxAmShare == -1.0f) {
capacitySchedulerConfig.set(PREFIX + queueName +
".maximum-am-resource-percent", "1.0");
}
}
/**
* &lt;maxRunningApps&gt;
* ==> yarn.scheduler.capacity.&lt;queue-name&gt;.maximum-applications.
* @param queueName
* @param queue
*/
private void emitMaxRunningApps(String queueName, FSQueue queue) {
if (queue.getMaxRunningApps() != MAX_RUNNING_APPS_UNSET
&& queue.getMaxRunningApps() != queueMaxAppsDefault) {
capacitySchedulerConfig.set(PREFIX + queueName + ".maximum-applications",
String.valueOf(queue.getMaxRunningApps()));
}
}
/**
* &lt;maxResources&gt;
* ==> yarn.scheduler.capacity.&lt;queue-name&gt;.maximum-capacity.
* @param queueName
* @param queue
*/
private void emitMaximumCapacity(String queueName, FSQueue queue) {
ConfigurableResource rawMaxShare = queue.getRawMaxShare();
final Resource maxResource = rawMaxShare.getResource();
long memSize = 0;
long vCores = 0;
boolean defined = false;
if (maxResource == null) {
if (rawMaxShare.getPercentages() != null) {
if (clusterResource == null) {
throw new ConversionException(
String.format("<maxResources> defined in percentages for" +
" queue %s, but cluster resource parameter is not" +
" defined via CLI!", queueName));
}
ruleHandler.handleMaxCapacityPercentage(queueName);
double[] percentages = rawMaxShare.getPercentages();
int memIndex = ResourceUtils.getResourceTypeIndex().get("memory-mb");
int vcoreIndex = ResourceUtils.getResourceTypeIndex().get("vcores");
memSize = (long) (percentages[memIndex] *
clusterResource.getMemorySize());
vCores = (long) (percentages[vcoreIndex] *
clusterResource.getVirtualCores());
defined = true;
} else {
throw new PreconditionException(
"Illegal ConfigurableResource = " + rawMaxShare);
}
} else if (isNotUnboundedResource(maxResource)) {
memSize = maxResource.getMemorySize();
vCores = maxResource.getVirtualCores();
defined = true;
}
if (defined) {
capacitySchedulerConfig.set(PREFIX + queueName + ".maximum-capacity",
String.format("[memory=%d, vcores=%d]", memSize, vCores));
}
}
/**
* &lt;maxContainerAllocation&gt;
* ==> yarn.scheduler.capacity.&lt;queue-name&gt;.maximum-allocation-mb
* / vcores.
* @param queueName
* @param queue
*/
private void emitMaxAllocations(String queueName, FSQueue queue) {
Resource maxAllocation = queue.getMaximumContainerAllocation();
if (isNotUnboundedResource(maxAllocation)) {
long parentMaxVcores = Integer.MIN_VALUE;
long parentMaxMemory = Integer.MIN_VALUE;
if (queue.getParent() != null) {
FSQueue parent = queue.getParent();
Resource parentMaxAllocation = parent.getMaximumContainerAllocation();
if (isNotUnboundedResource(parentMaxAllocation)) {
parentMaxVcores = parentMaxAllocation.getVirtualCores();
parentMaxMemory = parentMaxAllocation.getMemorySize();
}
}
long maxVcores = maxAllocation.getVirtualCores();
long maxMemory = maxAllocation.getMemorySize();
// only emit max allocation if it differs from the parent's setting
if (maxVcores != parentMaxVcores || maxMemory != parentMaxMemory) {
capacitySchedulerConfig.set(PREFIX + queueName +
".maximum-allocation-mb", String.valueOf(maxMemory));
capacitySchedulerConfig.set(PREFIX + queueName +
".maximum-allocation-vcores", String.valueOf(maxVcores));
}
}
}
/**
* &lt;allowPreemptionFrom&gt;
* ==> yarn.scheduler.capacity.&lt;queue-name&gt;.disable_preemption.
* @param queueName
* @param queue
*/
private void emitPreemptionDisabled(String queueName, FSQueue queue) {
if (preemptionEnabled && !queue.isPreemptable()) {
capacitySchedulerConfig.set(PREFIX + queueName + ".disable_preemption",
"true");
}
}
/**
* yarn.scheduler.fair.allow-undeclared-pools
* ==> yarn.scheduler.capacity.&lt;queue-name&gt;
* .auto-create-child-queue.enabled.
* @param queueName
*/
private void emitAutoCreateChildQueue(String queueName) {
if (autoCreateChildQueues) {
capacitySchedulerConfig.setBoolean(PREFIX + queueName +
".auto-create-child-queue.enabled", true);
}
}
/**
* yarn.scheduler.fair.sizebasedweight ==>
* yarn.scheduler.capacity.&lt;queue-path&gt;
* .ordering-policy.fair.enable-size-based-weight.
* @param queueName
*/
private void emitSizeBasedWeight(String queueName) {
if (sizeBasedWeight) {
capacitySchedulerConfig.setBoolean(PREFIX + queueName +
".ordering-policy.fair.enable-size-based-weight", true);
}
}
/**
* &lt;schedulingPolicy&gt;
* ==> yarn.scheduler.capacity.&lt;queue-path&gt;.ordering-policy.
* @param queueName
* @param queue
*/
private void emitOrderingPolicy(String queueName, FSQueue queue) {
String policy = queue.getPolicy().getName();
switch (policy) {
case FairSharePolicy.NAME:
capacitySchedulerConfig.set(PREFIX + queueName
+ ".ordering-policy", FairSharePolicy.NAME);
fifoOrFairSharePolicyUsed = true;
break;
case FifoPolicy.NAME:
capacitySchedulerConfig.set(PREFIX + queueName
+ ".ordering-policy", FifoPolicy.NAME);
fifoOrFairSharePolicyUsed = true;
break;
case DominantResourceFairnessPolicy.NAME:
// DRF is not supported on a queue level,
// it has to be global
drfPolicyUsedOnQueueLevel = true;
break;
default:
throw new ConversionException("Unexpected ordering policy " +
"on queue " + queueName + ": " + policy);
}
}
/**
* weight + minResources
* ==> yarn.scheduler.capacity.&lt;queue-name&gt;.capacity.
* @param queue
*/
private void emitChildCapacity(FSQueue queue) {
List<FSQueue> children = queue.getChildQueues();
int totalWeight = getTotalWeight(children);
Map<String, Capacity> capacities = getCapacities(totalWeight, children);
capacities
.forEach((key, value) -> capacitySchedulerConfig.set(PREFIX + key +
".capacity", value.toString()));
}
/**
* Missing feature, "leaf-queue-template.capacity" only accepts a single
* pct value.
* @param queue
*/
private void checkMaxChildCapacitySetting(FSQueue queue) {
if (queue.getMaxChildQueueResource() != null) {
Resource resource = queue.getMaxChildQueueResource().getResource();
if ((resource != null && isNotUnboundedResource(resource))
|| queue.getMaxChildQueueResource().getPercentages() != null) {
// Maximum child resource is defined
ruleHandler.handleMaxChildCapacity();
}
}
}
private Map<String, Capacity> getCapacities(int totalWeight,
List<FSQueue> children) {
final BigDecimal hundred = new BigDecimal(100).setScale(3);
if (children.size() == 0) {
return new HashMap<>();
} else if (children.size() == 1) {
Map<String, Capacity> capacity = new HashMap<>();
String queueName = children.get(0).getName();
capacity.put(queueName, Capacity.newCapacity(hundred));
return capacity;
} else {
Map<String, Capacity> capacities = new HashMap<>();
Map<String, BigDecimal> bdCapacities = new HashMap<>();
MutableBoolean needVerifySum = new MutableBoolean(true);
children
.stream()
.forEach(queue -> {
BigDecimal total = new BigDecimal(totalWeight);
BigDecimal weight = new BigDecimal(queue.getWeight());
BigDecimal pct = weight
.setScale(5)
.divide(total, RoundingMode.HALF_UP)
.multiply(hundred)
.setScale(3);
// <minResources> defined?
if (Resources.none().compareTo(queue.getMinShare()) != 0) {
needVerifySum.setFalse();
/* TODO: Needs discussion.
*
* Probably it's not entirely correct this way!
* Eg. root.queue1 in FS translates to 33%
* capacity, but minResources is defined as 1vcore,8GB
* which is less than 33%.
*
* Therefore max(calculatedCapacity, minResource) is
* more sound.
*/
Resource minShare = queue.getMinShare();
// TODO: in Phase-2, we have to deal with other resources as well
String capacity = String.format("[memory=%d,vcores=%d]",
minShare.getMemorySize(), minShare.getVirtualCores());
capacities.put(queue.getName(), Capacity.newCapacity(capacity));
} else {
capacities.put(queue.getName(), Capacity.newCapacity(pct));
bdCapacities.put(queue.getName(), pct);
}
});
if (needVerifySum.isTrue()) {
BigDecimal totalPct = new BigDecimal(0);
for (Map.Entry<String, BigDecimal> entry : bdCapacities.entrySet()) {
totalPct = totalPct.add(entry.getValue());
}
// fix last value if total != 100.000
if (!totalPct.equals(hundred)) {
BigDecimal tmp = new BigDecimal(0);
for (int i = 0; i < children.size() - 2; i++) {
tmp = tmp.add(bdCapacities.get(children.get(i).getQueueName()));
}
String lastQueue = children.get(children.size() - 1).getName();
BigDecimal corrected = hundred.subtract(tmp);
capacities.put(lastQueue, Capacity.newCapacity(corrected));
}
}
return capacities;
}
}
private int getTotalWeight(List<FSQueue> children) {
double sum = children
.stream()
.mapToDouble(c -> c.getWeight())
.sum();
return (int) sum;
}
private String getQueueShortName(String queueName) {
int lastDot = queueName.lastIndexOf(".");
return queueName.substring(lastDot + 1);
}
private boolean isNotUnboundedResource(Resource res) {
return Resources.unbounded().compareTo(res) != 0;
}
/*
* Represents a queue capacity in either percentage
* or in absolute resources
*/
private static class Capacity {
private BigDecimal percentage;
private String absoluteResource;
public static Capacity newCapacity(BigDecimal pct) {
Capacity capacity = new Capacity();
capacity.percentage = pct;
capacity.absoluteResource = null;
return capacity;
}
public static Capacity newCapacity(String absoluteResource) {
Capacity capacity = new Capacity();
capacity.percentage = null;
capacity.absoluteResource = absoluteResource;
return capacity;
}
@Override
public String toString() {
if (percentage != null) {
return percentage.toString();
} else {
return absoluteResource;
}
}
}
}

View File

@ -0,0 +1,159 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
/**
* Converts a Fair Scheduler site configuration to Capacity Scheduler
* site configuration.
*
*/
public class FSYarnSiteConverter {
private boolean preemptionEnabled;
private boolean autoCreateChildQueues;
private boolean sizeBasedWeight;
private boolean userAsDefaultQueue;
@SuppressWarnings({"deprecation", "checkstyle:linelength"})
public void convertSiteProperties(Configuration conf,
Configuration yarnSiteConfig) {
yarnSiteConfig.set(YarnConfiguration.RM_SCHEDULER,
CapacityScheduler.class.getCanonicalName());
// TODO: deprecated property, check if necessary
if (conf.getBoolean(
FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED,
FairSchedulerConfiguration.DEFAULT_CONTINUOUS_SCHEDULING_ENABLED)) {
yarnSiteConfig.setBoolean(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, true);
int interval = conf.getInt(
FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_SLEEP_MS,
FairSchedulerConfiguration.DEFAULT_CONTINUOUS_SCHEDULING_SLEEP_MS);
yarnSiteConfig.setInt(PREFIX +
"schedule-asynchronously.scheduling-interval-ms", interval);
}
String mbIncrementAllocation =
conf.get("yarn.resource-types.memory-mb.increment-allocation");
if (mbIncrementAllocation != null) {
yarnSiteConfig.set("yarn.scheduler.minimum-allocation-mb",
mbIncrementAllocation);
}
String vcoreIncrementAllocation =
conf.get("yarn.resource-types.vcores.increment-allocation");
if (vcoreIncrementAllocation != null) {
yarnSiteConfig.set("yarn.scheduler.minimum-allocation-vcores",
vcoreIncrementAllocation);
}
if (conf.getBoolean(FairSchedulerConfiguration.PREEMPTION,
FairSchedulerConfiguration.DEFAULT_PREEMPTION)) {
yarnSiteConfig.setBoolean(
YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS, true);
preemptionEnabled = true;
int waitTimeBeforeKill = conf.getInt(
FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL,
FairSchedulerConfiguration.DEFAULT_WAIT_TIME_BEFORE_KILL);
yarnSiteConfig.setInt(
CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL,
waitTimeBeforeKill);
long waitBeforeNextStarvationCheck = conf.getLong(
FairSchedulerConfiguration.WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS,
FairSchedulerConfiguration.DEFAULT_WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS);
yarnSiteConfig.setLong(
CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
waitBeforeNextStarvationCheck);
}
if (conf.getBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE,
FairSchedulerConfiguration.DEFAULT_ASSIGN_MULTIPLE)) {
yarnSiteConfig.setBoolean(
CapacitySchedulerConfiguration.ASSIGN_MULTIPLE_ENABLED, true);
} else {
yarnSiteConfig.setBoolean(
CapacitySchedulerConfiguration.ASSIGN_MULTIPLE_ENABLED, false);
}
int maxAssign = conf.getInt(FairSchedulerConfiguration.MAX_ASSIGN,
FairSchedulerConfiguration.DEFAULT_MAX_ASSIGN);
if (maxAssign != FairSchedulerConfiguration.DEFAULT_MAX_ASSIGN) {
yarnSiteConfig.setInt(
CapacitySchedulerConfiguration.MAX_ASSIGN_PER_HEARTBEAT,
maxAssign);
}
float localityThresholdNode = conf.getFloat(
FairSchedulerConfiguration.LOCALITY_THRESHOLD_NODE,
FairSchedulerConfiguration.DEFAULT_LOCALITY_THRESHOLD_NODE);
if (localityThresholdNode !=
FairSchedulerConfiguration.DEFAULT_LOCALITY_THRESHOLD_NODE) {
yarnSiteConfig.setFloat(CapacitySchedulerConfiguration.NODE_LOCALITY_DELAY,
localityThresholdNode);
}
float localityThresholdRack = conf.getFloat(
FairSchedulerConfiguration.LOCALITY_THRESHOLD_RACK,
FairSchedulerConfiguration.DEFAULT_LOCALITY_THRESHOLD_RACK);
if (localityThresholdRack !=
FairSchedulerConfiguration.DEFAULT_LOCALITY_THRESHOLD_RACK) {
yarnSiteConfig.setFloat(
CapacitySchedulerConfiguration.RACK_LOCALITY_ADDITIONAL_DELAY,
localityThresholdRack);
}
if (conf.getBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS,
FairSchedulerConfiguration.DEFAULT_ALLOW_UNDECLARED_POOLS)) {
autoCreateChildQueues = true;
}
if (conf.getBoolean(FairSchedulerConfiguration.SIZE_BASED_WEIGHT,
FairSchedulerConfiguration.DEFAULT_SIZE_BASED_WEIGHT)) {
sizeBasedWeight = true;
}
if (conf.getBoolean(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE,
FairSchedulerConfiguration.DEFAULT_USER_AS_DEFAULT_QUEUE)) {
userAsDefaultQueue = true;
}
}
public boolean isPreemptionEnabled() {
return preemptionEnabled;
}
public boolean isAutoCreateChildQueues() {
return autoCreateChildQueues;
}
public boolean isSizeBasedWeight() {
return sizeBasedWeight;
}
public boolean isUserAsDefaultQueue() {
return userAsDefaultQueue;
}
}

View File

@ -0,0 +1,36 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
import org.apache.commons.cli.MissingArgumentException;
/**
* Indicates that some preconditions were not met
* before FS->CS conversion.
*
*/
public class PreconditionException extends RuntimeException {
private static final long serialVersionUID = 7976747724949372164L;
public PreconditionException(String message) {
super(message);
}
public PreconditionException(String message, MissingArgumentException ex) {
super(message, ex);
}
}

View File

@ -0,0 +1,118 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
import java.util.HashMap;
import java.util.Map;
import org.apache.hadoop.yarn.server.resourcemanager.placement.DefaultPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.FSPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PrimaryGroupPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.SecondaryGroupExistingPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.SpecifiedPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserPlacementRule;
class QueuePlacementConverter {
private static final String USER = "%user";
private static final String PRIMARY_GROUP = "%primary_group";
private static final String SECONDARY_GROUP = "%secondary_group";
Map<String, String> convertPlacementPolicy(PlacementManager placementManager,
FSConfigToCSConfigRuleHandler ruleHandler, boolean userAsDefaultQueue) {
StringBuilder mapping = new StringBuilder();
Map<String, String> properties = new HashMap<>();
if (userAsDefaultQueue) {
mapping.append("u:" + USER + ":" + USER);
}
int ruleCount = 0;
for (PlacementRule rule : placementManager.getPlacementRules()) {
if (((FSPlacementRule)rule).getCreateFlag()) {
ruleHandler.handleQueueAutoCreate(rule.getName());
}
ruleCount++;
if (rule instanceof UserPlacementRule) {
UserPlacementRule userRule = (UserPlacementRule) rule;
if (mapping.length() > 0) {
mapping.append(";");
}
// nested rule
if (userRule.getParentRule() != null) {
handleNestedRule(mapping, userRule);
} else {
if (!userAsDefaultQueue) {
mapping.append("u:" + USER + ":" + USER);
}
}
} else if (rule instanceof SpecifiedPlacementRule) {
if (ruleCount > 1) {
ruleHandler.handleSpecifiedNotFirstRule();
}
properties.put(
"yarn.scheduler.capacity.queue-mappings-override.enable", "false");
} else if (rule instanceof PrimaryGroupPlacementRule) {
if (mapping.length() > 0) {
mapping.append(";");
}
mapping.append("u:" + USER + ":" + PRIMARY_GROUP);
} else if (rule instanceof DefaultPlacementRule) {
DefaultPlacementRule defaultRule = (DefaultPlacementRule) rule;
if (mapping.length() > 0) {
mapping.append(";");
}
mapping.append("u:" + USER + ":").append(defaultRule.defaultQueueName);
} else if (rule instanceof SecondaryGroupExistingPlacementRule) {
// TODO: wait for YARN-9840
mapping.append("u:" + USER + ":" + SECONDARY_GROUP);
} else {
throw new IllegalArgumentException("Unknown placement rule: " + rule);
}
}
if (mapping.length() > 0) {
properties.put("yarn.scheduler.capacity.queue-mappings",
mapping.toString());
}
return properties;
}
private void handleNestedRule(StringBuilder mapping,
UserPlacementRule userRule) {
PlacementRule pr = userRule.getParentRule();
if (pr instanceof PrimaryGroupPlacementRule) {
// TODO: wait for YARN-9841
mapping.append("u:" + USER + ":" + PRIMARY_GROUP + "." + USER);
} else if (pr instanceof SecondaryGroupExistingPlacementRule) {
// TODO: wait for YARN-9865
mapping.append("u:" + USER + ":" + SECONDARY_GROUP + "." + USER);
} else if (pr instanceof DefaultPlacementRule) {
DefaultPlacementRule defaultRule = (DefaultPlacementRule) pr;
mapping.append("u:" + USER + ":")
.append(defaultRule.defaultQueueName)
.append("." + USER);
} else {
throw new UnsupportedOperationException("Unsupported nested rule: "
+ pr.getClass().getCanonicalName());
}
}
}

View File

@ -0,0 +1,29 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
/**
* Thrown by the FS->CS converter if it encounters an
* unsupported property.
*/
public class UnsupportedPropertyException extends RuntimeException {
private static final long serialVersionUID = 5468104026818355871L;
public UnsupportedPropertyException(String message) {
super(message);
}
}

View File

@ -0,0 +1,24 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
/**
* This package contains classes related to the Fair Scheduler ->
* Capacity Scheduler conversion.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;

View File

@ -18,17 +18,8 @@
package org.apache.hadoop.yarn.server.resourcemanager;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.TimeoutException;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeysPublic;
import org.apache.hadoop.http.lib.StaticUserWebFilter;
import org.apache.hadoop.net.NetworkTopology;
import org.apache.hadoop.security.AuthenticationFilterInitializer;
@ -47,6 +38,9 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigConverterTestCommons;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigConverter;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigConverterParams;
import org.apache.hadoop.yarn.server.security.http.RMAuthenticationFilterInitializer;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
@ -55,29 +49,51 @@
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.mockito.ArgumentCaptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Collection;
import java.util.concurrent.TimeoutException;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigConverterTestCommons.CONVERSION_RULES_FILE;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigConverterTestCommons.FS_ALLOC_FILE;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigConverterTestCommons.OUTPUT_DIR;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigConverterTestCommons.YARN_SITE_XML;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigConverterTestCommons.setupFSConfigConversionFiles;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.fail;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.verify;
public class TestResourceManager {
private static final Logger LOG =
LoggerFactory.getLogger(TestResourceManager.class);
private ResourceManager resourceManager = null;
@Rule
public ExpectedException thrown = ExpectedException.none();
private FSConfigConverterTestCommons converterTestCommons;
@Before
public void setUp() throws Exception {
Configuration conf = new YarnConfiguration();
YarnConfiguration conf = new YarnConfiguration();
UserGroupInformation.setConfiguration(conf);
resourceManager = new ResourceManager();
resourceManager.init(conf);
resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey();
resourceManager.getRMContext().getNMTokenSecretManager().rollMasterKey();
converterTestCommons = new FSConfigConverterTestCommons();
converterTestCommons.setUp();
}
@After
public void tearDown() throws Exception {
resourceManager.stop();
converterTestCommons.tearDown();
}
private org.apache.hadoop.yarn.server.resourcemanager.NodeManager
@ -357,4 +373,70 @@ public void testUserProvidedUGIConf() throws Exception {
}
}
/**
* Example command: <br>
* opt/hadoop/bin/yarn resourcemanager -convert-fs-configuration<br>
* -o /tmp/output<br>
* -y /opt/hadoop/etc/hadoop/yarn-site.xml<br>
* -f /opt/hadoop/etc/hadoop/fair-scheduler.xml<br>
* -r /home/systest/sample-rules-config.properties<br>
*/
@Test
@SuppressWarnings("checkstyle:javadocstyle")
public void testResourceManagerConvertFSConfigurationDefaults()
throws Exception {
setupFSConfigConversionFiles();
ArgumentCaptor<FSConfigToCSConfigConverterParams> conversionParams =
ArgumentCaptor.forClass(FSConfigToCSConfigConverterParams.class);
final String mainSwitch = "-convert-fs-configuration";
FSConfigToCSConfigConverter mockConverter =
mock(FSConfigToCSConfigConverter.class);
ResourceManager.initFSArgumentHandler(mockConverter);
ResourceManager.main(new String[] {mainSwitch, "-o", OUTPUT_DIR,
"-y", YARN_SITE_XML, "-f", FS_ALLOC_FILE, "-r",
CONVERSION_RULES_FILE});
// validate params
verify(mockConverter).convert(conversionParams.capture());
FSConfigToCSConfigConverterParams params = conversionParams.getValue();
LOG.info("FS config converter parameters: " + params);
assertThat(params.getYarnSiteXmlConfig()).isEqualTo(YARN_SITE_XML);
assertThat(params.getFairSchedulerXmlConfig()).isEqualTo(FS_ALLOC_FILE);
assertThat(params.getConversionRulesConfig())
.isEqualTo(CONVERSION_RULES_FILE);
assertThat(params.isConsole()).isEqualTo(false);
}
@Test
public void testResourceManagerConvertFSConfigurationWithConsoleParam()
throws Exception {
setupFSConfigConversionFiles();
ArgumentCaptor<FSConfigToCSConfigConverterParams> conversionParams =
ArgumentCaptor.forClass(FSConfigToCSConfigConverterParams.class);
final String mainSwitch = "-convert-fs-configuration";
FSConfigToCSConfigConverter mockConverter =
mock(FSConfigToCSConfigConverter.class);
ResourceManager.initFSArgumentHandler(mockConverter);
ResourceManager.main(new String[] {mainSwitch, "-o", OUTPUT_DIR,
"-p", "-y", YARN_SITE_XML, "-f", FS_ALLOC_FILE, "-r",
CONVERSION_RULES_FILE});
// validate params
verify(mockConverter).convert(conversionParams.capture());
FSConfigToCSConfigConverterParams params = conversionParams.getValue();
LOG.info("FS config converter parameters: " + params);
assertThat(params.getYarnSiteXmlConfig()).isEqualTo(YARN_SITE_XML);
assertThat(params.getFairSchedulerXmlConfig()).isEqualTo(FS_ALLOC_FILE);
assertThat(params.getConversionRulesConfig())
.isEqualTo(CONVERSION_RULES_FILE);
assertThat(params.isConsole()).isEqualTo(true);
}
}

View File

@ -0,0 +1,181 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.FileWriter;
import java.io.IOException;
import java.io.PrintStream;
import java.io.PrintWriter;
import static org.junit.Assert.assertTrue;
/**
* Helper methods for FS->CS converter testing.
*
*/
public class FSConfigConverterTestCommons {
private final static String TEST_DIR =
new File(System.getProperty("test.build.data", "/tmp")).getAbsolutePath();
public final static String FS_ALLOC_FILE =
new File(TEST_DIR, "test-fair-scheduler.xml").getAbsolutePath();
public final static String YARN_SITE_XML =
new File(TEST_DIR, "test-yarn-site.xml").getAbsolutePath();
public final static String CONVERSION_RULES_FILE =
new File(TEST_DIR, "test-conversion-rules.properties").getAbsolutePath();
public final static String OUTPUT_DIR =
new File(TEST_DIR, "conversion-output").getAbsolutePath();
private final ByteArrayOutputStream outContent = new ByteArrayOutputStream();
private final ByteArrayOutputStream errContent = new ByteArrayOutputStream();
private PrintStream originalOutStream;
private PrintStream originalErrStream;
public FSConfigConverterTestCommons() {
saveOriginalStreams();
replaceStreamsWithByteArrays();
}
public void setUp() throws IOException {
File d = new File(TEST_DIR, "conversion-output");
if (d.exists()) {
FileUtils.deleteDirectory(d);
}
boolean success = d.mkdirs();
assertTrue("Can't create directory: " + d.getAbsolutePath(), success);
}
public void tearDown() {
deleteTestFiles();
restoreStreams();
}
private void saveOriginalStreams() {
originalOutStream = System.out;
originalErrStream = System.err;
}
private void replaceStreamsWithByteArrays() {
System.setOut(new PrintStream(outContent));
System.setErr(new PrintStream(errContent));
}
private void restoreStreams() {
System.setOut(originalOutStream);
System.setErr(originalErrStream);
}
ByteArrayOutputStream getErrContent() {
return errContent;
}
private void deleteTestFiles() {
//Files may not be created so we are not strict here!
deleteFile(FS_ALLOC_FILE, false);
deleteFile(YARN_SITE_XML, false);
deleteFile(CONVERSION_RULES_FILE, false);
deleteFile(OUTPUT_DIR, false);
}
private static void deleteFile(String f, boolean strict) {
boolean delete = new File(f).delete();
if (strict && !delete) {
throw new RuntimeException("Can't delete test file: " + f);
}
}
public static void setupFSConfigConversionFiles() throws IOException {
configureFairSchedulerXml();
configureYarnSiteXmlWithFsAllocFileDefined();
configureDummyConversionRulesFile();
}
@SuppressWarnings("checkstyle:linelength")
public static void configureFairSchedulerXml() throws IOException {
PrintWriter out = new PrintWriter(new FileWriter(FS_ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations>");
out.println("<queueMaxAMShareDefault>-1.0</queueMaxAMShareDefault>");
out.println("<defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>");
addQueue(out, "");
out.println("</allocations>");
out.close();
}
@SuppressWarnings("checkstyle:linelength")
private static void addQueue(PrintWriter out, String additionalConfig) {
out.println("<queue name=\"root\">");
out.println(" <schedulingPolicy>fair</schedulingPolicy>");
out.println(" <weight>1.0</weight>");
out.println(" <fairSharePreemptionTimeout>100</fairSharePreemptionTimeout>");
out.println(" <minSharePreemptionTimeout>120</minSharePreemptionTimeout>");
out.println(" <fairSharePreemptionThreshold>.5</fairSharePreemptionThreshold>");
if (StringUtils.isNotEmpty(additionalConfig)) {
out.println(additionalConfig);
}
out.println("</queue>");
}
public static void configureEmptyFairSchedulerXml() throws IOException {
PrintWriter out = new PrintWriter(new FileWriter(FS_ALLOC_FILE));
out.println("<?xml version=\"1.0\"?>");
out.println("<allocations></allocations>");
out.close();
}
public static void configureYarnSiteXmlWithFsAllocFileDefined()
throws IOException {
PrintWriter out = new PrintWriter(new FileWriter(YARN_SITE_XML));
out.println("<?xml version=\"1.0\"?>");
out.println("<property>");
out.println("<name>" + FairSchedulerConfiguration.ALLOCATION_FILE +
"</name>");
out.println("<value>" + FS_ALLOC_FILE + "</value>");
out.println("</property>");
out.close();
}
public static void configureEmptyYarnSiteXml() throws IOException {
PrintWriter out = new PrintWriter(new FileWriter(YARN_SITE_XML));
out.println("<?xml version=\"1.0\"?>");
out.println("<property></property>");
out.close();
}
public static void configureDummyConversionRulesFile() throws IOException {
PrintWriter out = new PrintWriter(new FileWriter(CONVERSION_RULES_FILE));
out.println("dummy_key=dummy_value");
out.close();
}
public static void configureInvalidConversionRulesFile() throws IOException {
PrintWriter out = new PrintWriter(new FileWriter(CONVERSION_RULES_FILE));
out.println("bla");
out.close();
}
public static void configureEmptyConversionRulesFile() throws IOException {
PrintWriter out = new PrintWriter(new FileWriter(CONVERSION_RULES_FILE));
out.close();
}
}

View File

@ -0,0 +1,379 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
import com.google.common.collect.Lists;
import org.apache.commons.cli.MissingOptionException;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.ArgumentCaptor;
import org.mockito.ArgumentMatchers;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
/**
* Unit tests for FSConfigToCSConfigArgumentHandler.
*
*/
@RunWith(MockitoJUnitRunner.class)
public class TestFSConfigToCSConfigArgumentHandler {
private static final Logger LOG =
LoggerFactory.getLogger(TestFSConfigToCSConfigArgumentHandler.class);
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Mock
private FSConfigToCSConfigConverter mockConverter;
private FSConfigConverterTestCommons fsTestCommons;
@Before
public void setUp() throws IOException {
fsTestCommons = new FSConfigConverterTestCommons();
fsTestCommons.setUp();
}
@After
public void tearDown() {
fsTestCommons.tearDown();
}
private void setupFSConfigConversionFiles(boolean defineAllocationFile)
throws IOException {
FSConfigConverterTestCommons.configureFairSchedulerXml();
if (defineAllocationFile) {
FSConfigConverterTestCommons.configureYarnSiteXmlWithFsAllocFileDefined();
} else {
FSConfigConverterTestCommons.configureEmptyYarnSiteXml();
}
FSConfigConverterTestCommons.configureDummyConversionRulesFile();
}
private FSConfigToCSConfigArgumentHandler createArgumentHandler() {
return new FSConfigToCSConfigArgumentHandler(mockConverter);
}
private static String[] getDefaultArgumentsAsArray() {
List<String> args = getDefaultArguments();
return args.toArray(new String[0]);
}
private static List<String> getDefaultArguments() {
return Lists.newArrayList("-y", FSConfigConverterTestCommons.YARN_SITE_XML,
"-o", FSConfigConverterTestCommons.OUTPUT_DIR);
}
private String[] getArgumentsAsArrayWithDefaults(String... args) {
List<String> result = getDefaultArguments();
result.addAll(Arrays.asList(args));
return result.toArray(new String[0]);
}
private String[] getArgumentsAsArray(String... args) {
List<String> result = Lists.newArrayList();
result.addAll(Arrays.asList(args));
return result.toArray(new String[0]);
}
@Test
public void testMissingYarnSiteXmlArgument() throws Exception {
setupFSConfigConversionFiles(true);
FSConfigToCSConfigArgumentHandler argumentHandler =
createArgumentHandler();
String[] args = new String[] {"-o",
FSConfigConverterTestCommons.OUTPUT_DIR};
expectedException.expect(MissingOptionException.class);
expectedException.expectMessage("Missing required option: y");
argumentHandler.parseAndConvert(args);
}
@Test
public void testMissingFairSchedulerXmlArgument() throws Exception {
setupFSConfigConversionFiles(true);
FSConfigToCSConfigArgumentHandler argumentHandler =
createArgumentHandler();
argumentHandler.parseAndConvert(getDefaultArgumentsAsArray());
}
@Test
public void testMissingOutputDirArgument() throws Exception {
setupFSConfigConversionFiles(true);
FSConfigToCSConfigArgumentHandler argumentHandler =
createArgumentHandler();
String[] args = new String[] {"-y",
FSConfigConverterTestCommons.YARN_SITE_XML};
expectedException.expect(MissingOptionException.class);
expectedException.expectMessage("Missing required option: o");
argumentHandler.parseAndConvert(args);
}
@Test
public void testMissingRulesConfiguration() throws Exception {
setupFSConfigConversionFiles(true);
FSConfigToCSConfigArgumentHandler argumentHandler =
createArgumentHandler();
argumentHandler.parseAndConvert(getDefaultArgumentsAsArray());
}
@Test
public void testInvalidRulesConfigFile() throws Exception {
FSConfigConverterTestCommons.configureYarnSiteXmlWithFsAllocFileDefined();
FSConfigConverterTestCommons.configureFairSchedulerXml();
FSConfigConverterTestCommons.configureInvalidConversionRulesFile();
FSConfigToCSConfigArgumentHandler argumentHandler =
createArgumentHandler();
String[] args = getArgumentsAsArrayWithDefaults();
argumentHandler.parseAndConvert(args);
}
@Test
public void testInvalidOutputDir() throws Exception {
FSConfigConverterTestCommons.configureYarnSiteXmlWithFsAllocFileDefined();
FSConfigConverterTestCommons.configureFairSchedulerXml();
FSConfigConverterTestCommons.configureDummyConversionRulesFile();
FSConfigToCSConfigArgumentHandler argumentHandler =
createArgumentHandler();
String[] args = getArgumentsAsArray("-y",
FSConfigConverterTestCommons.YARN_SITE_XML, "-o",
FSConfigConverterTestCommons.YARN_SITE_XML);
argumentHandler.parseAndConvert(args);
System.out.println(fsTestCommons.getErrContent());
assertTrue("Error content missing", fsTestCommons.getErrContent()
.toString()
.contains("Cannot start FS config conversion due to the following " +
"precondition error"));
}
@Test
public void testFairSchedulerXmlIsNotDefinedIfItsDefinedInYarnSiteXml()
throws Exception {
setupFSConfigConversionFiles(true);
FSConfigToCSConfigArgumentHandler argumentHandler =
createArgumentHandler();
argumentHandler.parseAndConvert(getDefaultArgumentsAsArray());
}
@Test
public void testEmptyYarnSiteXmlSpecified() throws Exception {
FSConfigConverterTestCommons.configureFairSchedulerXml();
FSConfigConverterTestCommons.configureEmptyYarnSiteXml();
FSConfigConverterTestCommons.configureDummyConversionRulesFile();
FSConfigToCSConfigArgumentHandler argumentHandler =
createArgumentHandler();
String[] args = getArgumentsAsArrayWithDefaults("-f",
FSConfigConverterTestCommons.FS_ALLOC_FILE);
argumentHandler.parseAndConvert(args);
}
@Test
public void testEmptyFairSchedulerXmlSpecified() throws Exception {
FSConfigConverterTestCommons.configureEmptyFairSchedulerXml();
FSConfigConverterTestCommons.configureEmptyYarnSiteXml();
FSConfigConverterTestCommons.configureDummyConversionRulesFile();
FSConfigToCSConfigArgumentHandler argumentHandler =
createArgumentHandler();
String[] args = getArgumentsAsArrayWithDefaults("-f",
FSConfigConverterTestCommons.FS_ALLOC_FILE);
argumentHandler.parseAndConvert(args);
}
@Test
public void testEmptyRulesConfigurationSpecified() throws Exception {
FSConfigConverterTestCommons.configureEmptyFairSchedulerXml();
FSConfigConverterTestCommons.configureEmptyYarnSiteXml();
FSConfigConverterTestCommons.configureEmptyConversionRulesFile();
FSConfigToCSConfigArgumentHandler argumentHandler =
createArgumentHandler();
String[] args = getArgumentsAsArrayWithDefaults("-f",
FSConfigConverterTestCommons.FS_ALLOC_FILE,
"-r", FSConfigConverterTestCommons.CONVERSION_RULES_FILE);
argumentHandler.parseAndConvert(args);
}
@Test
public void testConvertFSConfigurationDefaults() throws Exception {
setupFSConfigConversionFiles(true);
ArgumentCaptor<FSConfigToCSConfigConverterParams> conversionParams =
ArgumentCaptor.forClass(FSConfigToCSConfigConverterParams.class);
FSConfigToCSConfigArgumentHandler argumentHandler =
new FSConfigToCSConfigArgumentHandler(mockConverter);
String[] args = getArgumentsAsArrayWithDefaults("-f",
FSConfigConverterTestCommons.FS_ALLOC_FILE,
"-r", FSConfigConverterTestCommons.CONVERSION_RULES_FILE);
argumentHandler.parseAndConvert(args);
// validate params
Mockito.verify(mockConverter).convert(conversionParams.capture());
FSConfigToCSConfigConverterParams params = conversionParams.getValue();
LOG.info("FS config converter parameters: " + params);
assertEquals("Yarn site config",
FSConfigConverterTestCommons.YARN_SITE_XML,
params.getYarnSiteXmlConfig());
assertEquals("FS xml", FSConfigConverterTestCommons.FS_ALLOC_FILE,
params.getFairSchedulerXmlConfig());
assertEquals("Conversion rules config",
FSConfigConverterTestCommons.CONVERSION_RULES_FILE,
params.getConversionRulesConfig());
assertFalse("Console mode", params.isConsole());
}
@Test
public void testConvertFSConfigurationWithConsoleParam()
throws Exception {
setupFSConfigConversionFiles(true);
ArgumentCaptor<FSConfigToCSConfigConverterParams> conversionParams =
ArgumentCaptor.forClass(FSConfigToCSConfigConverterParams.class);
FSConfigToCSConfigArgumentHandler argumentHandler =
new FSConfigToCSConfigArgumentHandler(mockConverter);
String[] args = getArgumentsAsArrayWithDefaults("-f",
FSConfigConverterTestCommons.FS_ALLOC_FILE,
"-r", FSConfigConverterTestCommons.CONVERSION_RULES_FILE, "-p");
argumentHandler.parseAndConvert(args);
// validate params
Mockito.verify(mockConverter).convert(conversionParams.capture());
FSConfigToCSConfigConverterParams params = conversionParams.getValue();
LOG.info("FS config converter parameters: " + params);
assertEquals("Yarn site config",
FSConfigConverterTestCommons.YARN_SITE_XML,
params.getYarnSiteXmlConfig());
assertEquals("FS xml", FSConfigConverterTestCommons.FS_ALLOC_FILE,
params.getFairSchedulerXmlConfig());
assertEquals("Conversion rules config",
FSConfigConverterTestCommons.CONVERSION_RULES_FILE,
params.getConversionRulesConfig());
assertTrue("Console mode", params.isConsole());
}
@Test
public void testConvertFSConfigurationClusterResource()
throws Exception {
setupFSConfigConversionFiles(true);
ArgumentCaptor<FSConfigToCSConfigConverterParams> conversionParams =
ArgumentCaptor.forClass(FSConfigToCSConfigConverterParams.class);
FSConfigToCSConfigArgumentHandler argumentHandler =
new FSConfigToCSConfigArgumentHandler(mockConverter);
String[] args = getArgumentsAsArrayWithDefaults("-f",
FSConfigConverterTestCommons.FS_ALLOC_FILE,
"-r", FSConfigConverterTestCommons.CONVERSION_RULES_FILE,
"-p", "-c", "vcores=20, memory-mb=240");
argumentHandler.parseAndConvert(args);
// validate params
Mockito.verify(mockConverter).convert(conversionParams.capture());
FSConfigToCSConfigConverterParams params = conversionParams.getValue();
LOG.info("FS config converter parameters: " + params);
assertEquals("Yarn site config",
FSConfigConverterTestCommons.YARN_SITE_XML,
params.getYarnSiteXmlConfig());
assertEquals("FS xml",
FSConfigConverterTestCommons.FS_ALLOC_FILE,
params.getFairSchedulerXmlConfig());
assertEquals("Conversion rules config",
FSConfigConverterTestCommons.CONVERSION_RULES_FILE,
params.getConversionRulesConfig());
assertEquals("Cluster resource", "vcores=20, memory-mb=240",
params.getClusterResource());
assertTrue("Console mode", params.isConsole());
}
@Test
public void testConvertFSConfigurationErrorHandling() throws Exception {
setupFSConfigConversionFiles(true);
String[] args = getArgumentsAsArrayWithDefaults("-f",
FSConfigConverterTestCommons.FS_ALLOC_FILE,
"-r", FSConfigConverterTestCommons.CONVERSION_RULES_FILE, "-p");
FSConfigToCSConfigArgumentHandler argumentHandler =
createArgumentHandler();
Mockito.doThrow(UnsupportedPropertyException.class)
.when(mockConverter)
.convert(ArgumentMatchers.any(FSConfigToCSConfigConverterParams.class));
argumentHandler.parseAndConvert(args);
assertTrue("Error content missing", fsTestCommons.getErrContent()
.toString().contains("Unsupported property/setting encountered"));
}
@Test
public void testConvertFSConfigurationErrorHandling2() throws Exception {
setupFSConfigConversionFiles(true);
String[] args = getArgumentsAsArrayWithDefaults("-f",
FSConfigConverterTestCommons.FS_ALLOC_FILE,
"-r", FSConfigConverterTestCommons.CONVERSION_RULES_FILE, "-p");
FSConfigToCSConfigArgumentHandler argumentHandler =
createArgumentHandler();
Mockito.doThrow(ConversionException.class).when(mockConverter)
.convert(ArgumentMatchers.any(FSConfigToCSConfigConverterParams.class));
argumentHandler.parseAndConvert(args);
assertTrue("Error content missing", fsTestCommons.getErrContent()
.toString().contains("Fatal error during FS config conversion"));
}
}

View File

@ -0,0 +1,460 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.DYNAMIC_MAX_ASSIGN;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.MAX_CAPACITY_PERCENTAGE;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.MAX_CHILD_CAPACITY;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.QUEUE_AUTO_CREATE;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.RESERVATION_SYSTEM;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.SPECIFIED_NOT_FIRST;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.USER_MAX_APPS_DEFAULT;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.USER_MAX_RUNNING_APPS;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.RuleAction.ABORT;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.util.Map;
import org.apache.commons.io.FileUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
/**
* Unit tests for FSConfigToCSConfigConverter.
*
*/
@RunWith(MockitoJUnitRunner.class)
public class TestFSConfigToCSConfigConverter {
private static final Resource CLUSTER_RESOURCE =
Resource.newInstance(16384, 16);
private static final String FILE_PREFIX = "file:";
private static final String FAIR_SCHEDULER_XML =
prepareFileName("fair-scheduler-conversion.xml");
@Mock
private FSConfigToCSConfigRuleHandler ruleHandler;
private FSConfigToCSConfigConverter converter;
private Configuration config;
private ByteArrayOutputStream csConfigOut;
@Rule
public ExpectedException expectedException = ExpectedException.none();
private FSConfigConverterTestCommons converterTestCommons;
private static String prepareFileName(String f) {
return FILE_PREFIX + new File("src/test/resources/" + f).getAbsolutePath();
}
private static final String FAIR_SCHEDULER_XML_INVALID =
prepareFileName("fair-scheduler-invalid.xml");
private static final String YARN_SITE_XML =
prepareFileName("yarn-site-with-allocation-file-ref.xml");
private static final String YARN_SITE_XML_NO_REF_TO_FS_XML =
prepareFileName("yarn-site.xml");
private static final String YARN_SITE_XML_INVALID =
prepareFileName("yarn-site-with-invalid-allocation-file-ref.xml");
private static final String CONVERSION_RULES_FILE =
new File("src/test/resources/conversion-rules.properties")
.getAbsolutePath();
@Before
public void setup() throws IOException {
config = new Configuration(false);
config.set(FairSchedulerConfiguration.ALLOCATION_FILE, FAIR_SCHEDULER_XML);
config.setBoolean(FairSchedulerConfiguration.MIGRATION_MODE, true);
createConverter();
converterTestCommons = new FSConfigConverterTestCommons();
converterTestCommons.setUp();
}
@After
public void tearDown() {
converterTestCommons.tearDown();
}
private void createConverter() {
converter = new FSConfigToCSConfigConverter(ruleHandler);
converter.setClusterResource(CLUSTER_RESOURCE);
ByteArrayOutputStream yarnSiteOut = new ByteArrayOutputStream();
csConfigOut = new ByteArrayOutputStream();
converter.setCapacitySchedulerConfigOutputStream(csConfigOut);
converter.setYarnSiteOutputStream(yarnSiteOut);
}
private FSConfigToCSConfigConverterParams.Builder
createDefaultParamsBuilder() {
return FSConfigToCSConfigConverterParams.Builder.create()
.withYarnSiteXmlConfig(YARN_SITE_XML)
.withOutputDirectory(FSConfigConverterTestCommons.OUTPUT_DIR);
}
private FSConfigToCSConfigConverterParams.Builder
createParamsBuilder(String yarnSiteConfig) {
return FSConfigToCSConfigConverterParams.Builder.create()
.withYarnSiteXmlConfig(yarnSiteConfig)
.withOutputDirectory(FSConfigConverterTestCommons.OUTPUT_DIR);
}
@Test
public void testDefaultMaxApplications() throws Exception {
converter.convert(config);
Configuration conf = getConvertedCSConfig();
int maxApps =
conf.getInt(
CapacitySchedulerConfiguration.MAXIMUM_SYSTEM_APPLICATIONS, -1);
assertEquals("Default max apps", 15, maxApps);
}
@Test
public void testDefaultMaxAMShare() throws Exception {
converter.convert(config);
Configuration conf = getConvertedCSConfig();
String maxAmShare =
conf.get(CapacitySchedulerConfiguration.
MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT);
assertEquals("Default max AM share", "0.16", maxAmShare);
}
@Test
public void testConvertACLs() throws Exception {
converter.convert(config);
Configuration conf = getConvertedCSConfig();
// root
assertEquals("root submit ACL", "alice,bob,joe,john hadoop_users",
conf.get(PREFIX + "root.acl_submit_applications"));
assertEquals("root admin ACL", "alice,bob,joe,john hadoop_users",
conf.get(PREFIX + "root.acl_administer_queue"));
// root.admins.bob
assertEquals("root.admins.bob submit ACL", "bob ",
conf.get(PREFIX + "root.admins.bob.acl_submit_applications"));
assertEquals("root.admins.bob admin ACL", "bob ",
conf.get(PREFIX + "root.admins.bob.acl_administer_queue"));
// root.admins.alice
assertEquals("root.admins.alice submit ACL", "alice ",
conf.get(PREFIX + "root.admins.alice.acl_submit_applications"));
assertEquals("root.admins.alice admin ACL", "alice ",
conf.get(PREFIX + "root.admins.alice.acl_administer_queue"));
// root.users.john
assertEquals("root.users.john submit ACL", "john ",
conf.get(PREFIX + "root.users.john.acl_submit_applications"));
assertEquals("root.users.john admin ACL", "john ",
conf.get(PREFIX + "root.users.john.acl_administer_queue"));
// root.users.joe
assertEquals("root.users.joe submit ACL", "joe ",
conf.get(PREFIX + "root.users.joe.acl_submit_applications"));
assertEquals("root.users.joe admin ACL", "joe ",
conf.get(PREFIX + "root.users.joe.acl_administer_queue"));
}
@Test
public void testDefaultMaxRunningApps() throws Exception {
converter.convert(config);
Configuration conf = getConvertedCSConfig();
// default setting
assertEquals("Default max apps", 15,
conf.getInt(PREFIX + "maximum-applications", -1));
}
@Test
public void testMixedQueueOrderingPolicy() throws Exception {
expectedException.expect(ConversionException.class);
expectedException.expectMessage(
"DRF ordering policy cannot be used together with fifo/fair");
String absolutePath =
new File("src/test/resources/fair-scheduler-orderingpolicy-mixed.xml")
.getAbsolutePath();
config.set(FairSchedulerConfiguration.ALLOCATION_FILE,
FILE_PREFIX + absolutePath);
converter.convert(config);
}
@Test
public void testQueueMaxChildCapacityNotSupported() throws Exception {
expectedException.expect(UnsupportedPropertyException.class);
expectedException.expectMessage("test");
Mockito.doThrow(new UnsupportedPropertyException("test"))
.when(ruleHandler).handleMaxChildCapacity();
converter.convert(config);
}
@Test
public void testReservationSystemNotSupported() throws Exception {
expectedException.expect(UnsupportedPropertyException.class);
expectedException.expectMessage("maxCapacity");
Mockito.doThrow(new UnsupportedPropertyException("maxCapacity"))
.when(ruleHandler).handleMaxChildCapacity();
config.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
converter.convert(config);
}
@Test
public void testUserMaxAppsNotSupported() throws Exception {
expectedException.expect(UnsupportedPropertyException.class);
expectedException.expectMessage("userMaxApps");
Mockito.doThrow(new UnsupportedPropertyException("userMaxApps"))
.when(ruleHandler).handleUserMaxApps();
converter.convert(config);
}
@Test
public void testUserMaxAppsDefaultNotSupported() throws Exception {
expectedException.expect(UnsupportedPropertyException.class);
expectedException.expectMessage("userMaxAppsDefault");
Mockito.doThrow(new UnsupportedPropertyException("userMaxAppsDefault"))
.when(ruleHandler).handleUserMaxAppsDefault();
converter.convert(config);
}
@Test
public void testConvertFSConfigurationClusterResource() throws Exception {
FSConfigToCSConfigConverterParams params = createDefaultParamsBuilder()
.withClusterResource("vcores=20, memory-mb=240")
.build();
converter.convert(params);
assertEquals("Resource", Resource.newInstance(240, 20),
converter.getClusterResource());
}
@Test
public void testConvertFSConfigPctModeUsedAndClusterResourceDefined()
throws Exception {
FSConfigToCSConfigConverterParams params = createDefaultParamsBuilder()
.withClusterResource("vcores=20, memory-mb=240")
.build();
converter.convert(params);
assertEquals("Resource", Resource.newInstance(240, 20),
converter.getClusterResource());
}
@Test
public void testConvertFSConfigPctModeUsedAndClusterResourceNotDefined()
throws Exception {
FSConfigToCSConfigConverterParams params = createDefaultParamsBuilder()
.build();
expectedException.expect(ConversionException.class);
expectedException.expectMessage("cluster resource parameter" +
" is not defined via CLI");
converter.convert(params);
}
@Test
public void testConvertFSConfigurationClusterResourceInvalid()
throws Exception {
FSConfigToCSConfigConverterParams params = createDefaultParamsBuilder()
.withClusterResource("vcores=20, memory-mb=240G")
.build();
expectedException.expect(ConversionException.class);
expectedException.expectMessage("Error while parsing resource");
converter.convert(params);
}
@Test
public void testConvertFSConfigurationClusterResourceInvalid2()
throws Exception {
FSConfigToCSConfigConverterParams params = createDefaultParamsBuilder()
.withClusterResource("vcores=20, memmmm=240")
.build();
expectedException.expect(ConversionException.class);
expectedException.expectMessage("Error while parsing resource");
converter.convert(params);
}
@Test
public void testConvertFSConfigurationRulesFile() throws Exception {
ruleHandler = new FSConfigToCSConfigRuleHandler();
createConverter();
FSConfigToCSConfigConverterParams params =
createDefaultParamsBuilder()
.withConversionRulesConfig(CONVERSION_RULES_FILE)
.withClusterResource("vcores=20, memory-mb=2400")
.build();
try {
converter.convert(params);
fail("Should have thrown UnsupportedPropertyException!");
} catch (UnsupportedPropertyException e) {
//need to catch exception so we can check the rules
}
ruleHandler = converter.getRuleHandler();
Map<String, FSConfigToCSConfigRuleHandler.RuleAction> actions =
ruleHandler.getActions();
assertEquals("maxCapacityPercentage",
ABORT, actions.get(MAX_CAPACITY_PERCENTAGE));
assertEquals("maxChildCapacity",
ABORT, actions.get(MAX_CHILD_CAPACITY));
assertEquals("userMaxRunningApps",
ABORT, actions.get(USER_MAX_RUNNING_APPS));
assertEquals("userMaxAppsDefault",
ABORT, actions.get(USER_MAX_APPS_DEFAULT));
assertEquals("dynamicMaxAssign",
ABORT, actions.get(DYNAMIC_MAX_ASSIGN));
assertEquals("specifiedNotFirstRule",
ABORT, actions.get(SPECIFIED_NOT_FIRST));
assertEquals("reservationSystem",
ABORT, actions.get(RESERVATION_SYSTEM));
assertEquals("queueAutoCreate",
ABORT, actions.get(QUEUE_AUTO_CREATE));
}
@Test
public void testConvertFSConfigurationUndefinedYarnSiteConfig()
throws Exception {
FSConfigToCSConfigConverterParams params =
FSConfigToCSConfigConverterParams.Builder.create()
.withYarnSiteXmlConfig(null)
.withOutputDirectory(FSConfigConverterTestCommons.OUTPUT_DIR)
.build();
expectedException.expect(PreconditionException.class);
expectedException.expectMessage(
"yarn-site.xml configuration is not defined");
converter.convert(params);
}
@Test
public void testConvertCheckOutputDir() throws Exception {
FSConfigToCSConfigConverterParams params = createDefaultParamsBuilder()
.withClusterResource("vcores=20, memory-mb=240")
.build();
converter.convert(params);
Configuration conf =
getConvertedCSConfig(FSConfigConverterTestCommons.OUTPUT_DIR);
File capacityFile = new File(FSConfigConverterTestCommons.OUTPUT_DIR,
"capacity-scheduler.xml");
assertTrue("Capacity file exists", capacityFile.exists());
assertTrue("Capacity file length > 0", capacityFile.length() > 0);
assertTrue("No. of configuration elements > 0", conf.size() > 0);
File yarnSiteFile = new File(FSConfigConverterTestCommons.OUTPUT_DIR,
"yarn-site.xml");
assertTrue("Yarn site exists", yarnSiteFile.exists());
assertTrue("Yarn site length > 0", yarnSiteFile.length() > 0);
}
@Test
public void testFairSchedulerXmlIsNotDefinedNeitherDirectlyNorInYarnSiteXml()
throws Exception {
FSConfigToCSConfigConverterParams params =
createParamsBuilder(YARN_SITE_XML_NO_REF_TO_FS_XML)
.withClusterResource("vcores=20, memory-mb=240")
.build();
expectedException.expect(PreconditionException.class);
expectedException.expectMessage("fair-scheduler.xml is not defined");
converter.convert(params);
}
@Test
public void testInvalidFairSchedulerXml() throws Exception {
FSConfigToCSConfigConverterParams params = createDefaultParamsBuilder()
.withClusterResource("vcores=20, memory-mb=240")
.withFairSchedulerXmlConfig(FAIR_SCHEDULER_XML_INVALID)
.build();
expectedException.expect(RuntimeException.class);
converter.convert(params);
}
@Test
public void testInvalidYarnSiteXml() throws Exception {
FSConfigToCSConfigConverterParams params =
createParamsBuilder(YARN_SITE_XML_INVALID)
.withClusterResource("vcores=20, memory-mb=240")
.build();
expectedException.expect(RuntimeException.class);
converter.convert(params);
}
private Configuration getConvertedCSConfig() {
ByteArrayInputStream input =
new ByteArrayInputStream(csConfigOut.toByteArray());
assertTrue("CS config output has length of 0!",
csConfigOut.toByteArray().length > 0);
Configuration conf = new Configuration(false);
conf.addResource(input);
return conf;
}
private Configuration getConvertedCSConfig(String dir) throws IOException {
File capacityFile = new File(dir, "capacity-scheduler.xml");
ByteArrayInputStream input =
new ByteArrayInputStream(FileUtils.readFileToByteArray(capacityFile));
Configuration conf = new Configuration(false);
conf.addResource(input);
return conf;
}
}

View File

@ -0,0 +1,144 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.DYNAMIC_MAX_ASSIGN;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.MAX_CAPACITY_PERCENTAGE;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.MAX_CHILD_CAPACITY;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.MAX_CHILD_QUEUE_LIMIT;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.QUEUE_AUTO_CREATE;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.RESERVATION_SYSTEM;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.SPECIFIED_NOT_FIRST;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.USER_MAX_APPS_DEFAULT;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.USER_MAX_RUNNING_APPS;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
import java.io.IOException;
import java.util.Properties;
import org.junit.Test;
/**
* Unit tests for FSConfigToCSConfigRuleHandler.
*
*/
public class TestFSConfigToCSConfigRuleHandler {
private static final String ABORT = "abort";
private static final String WARNING = "warning";
private FSConfigToCSConfigRuleHandler ruleHandler;
@Test
public void testInitPropertyActionsToWarning() throws IOException {
ruleHandler = new FSConfigToCSConfigRuleHandler(new Properties());
ruleHandler.handleChildQueueCount("test", 1);
ruleHandler.handleDynamicMaxAssign();
ruleHandler.handleMaxCapacityPercentage("test");
ruleHandler.handleMaxChildCapacity();
ruleHandler.handleQueueAutoCreate("test");
ruleHandler.handleReservationSystem();
ruleHandler.handleSpecifiedNotFirstRule();
ruleHandler.handleUserMaxApps();
ruleHandler.handleUserMaxAppsDefault();
}
@Test
public void testAllRulesWarning() throws IOException {
Properties rules = new Properties();
rules.put(DYNAMIC_MAX_ASSIGN, WARNING);
rules.put(MAX_CAPACITY_PERCENTAGE, WARNING);
rules.put(MAX_CHILD_CAPACITY, WARNING);
rules.put(QUEUE_AUTO_CREATE, WARNING);
rules.put(RESERVATION_SYSTEM, WARNING);
rules.put(SPECIFIED_NOT_FIRST, WARNING);
rules.put(USER_MAX_APPS_DEFAULT, WARNING);
rules.put(USER_MAX_RUNNING_APPS, WARNING);
ruleHandler = new FSConfigToCSConfigRuleHandler(rules);
ruleHandler.handleDynamicMaxAssign();
ruleHandler.handleMaxCapacityPercentage("test");
ruleHandler.handleMaxChildCapacity();
ruleHandler.handleQueueAutoCreate("test");
ruleHandler.handleReservationSystem();
ruleHandler.handleSpecifiedNotFirstRule();
ruleHandler.handleUserMaxApps();
ruleHandler.handleUserMaxAppsDefault();
}
@Test
public void testAllRulesAbort() throws IOException {
Properties rules = new Properties();
rules.put(DYNAMIC_MAX_ASSIGN, ABORT);
rules.put(MAX_CAPACITY_PERCENTAGE, ABORT);
rules.put(MAX_CHILD_CAPACITY, ABORT);
rules.put(QUEUE_AUTO_CREATE, ABORT);
rules.put(RESERVATION_SYSTEM, ABORT);
rules.put(SPECIFIED_NOT_FIRST, ABORT);
rules.put(USER_MAX_APPS_DEFAULT, ABORT);
rules.put(USER_MAX_RUNNING_APPS, ABORT);
rules.put(MAX_CHILD_QUEUE_LIMIT, "1");
ruleHandler = new FSConfigToCSConfigRuleHandler(rules);
expectAbort(() -> ruleHandler.handleChildQueueCount("test", 2),
ConversionException.class);
expectAbort(() -> ruleHandler.handleDynamicMaxAssign());
expectAbort(() -> ruleHandler.handleMaxCapacityPercentage("test"));
expectAbort(() -> ruleHandler.handleMaxChildCapacity());
expectAbort(() -> ruleHandler.handleQueueAutoCreate("test"));
expectAbort(() -> ruleHandler.handleReservationSystem());
expectAbort(() -> ruleHandler.handleSpecifiedNotFirstRule());
expectAbort(() -> ruleHandler.handleUserMaxApps());
expectAbort(() -> ruleHandler.handleUserMaxAppsDefault());
}
@Test(expected = ConversionException.class)
public void testMaxChildQueueCountNotInteger() throws IOException {
Properties rules = new Properties();
rules.put(MAX_CHILD_QUEUE_LIMIT, "abc");
ruleHandler = new FSConfigToCSConfigRuleHandler(rules);
ruleHandler.handleChildQueueCount("test", 1);
}
private void expectAbort(VoidCall call) {
expectAbort(call, UnsupportedPropertyException.class);
}
private void expectAbort(VoidCall call, Class<?> exceptionClass) {
boolean exceptionThrown = false;
Throwable thrown = null;
try {
call.apply();
} catch (Throwable t) {
thrown = t;
exceptionThrown = true;
}
assertTrue("Exception was not thrown", exceptionThrown);
assertEquals("Unexpected exception", exceptionClass, thrown.getClass());
}
@FunctionalInterface
private interface VoidCall {
void apply();
}
}

View File

@ -0,0 +1,427 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import java.io.File;
import java.io.IOException;
import java.util.Set;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.junit.After;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
import org.junit.runner.RunWith;
import org.mockito.Mock;
import org.mockito.Mockito;
import org.mockito.junit.MockitoJUnitRunner;
import com.google.common.collect.Sets;
/**
* Unit tests for FSQueueConverter.
*
*/
@RunWith(MockitoJUnitRunner.class)
public class TestFSQueueConverter {
private static final Resource CLUSTER_RESOURCE =
Resource.newInstance(16384, 16);
private final static Set<String> ALL_QUEUES =
Sets.newHashSet("root",
"root.default",
"root.admins",
"root.users",
"root.admins.alice",
"root.admins.bob",
"root.users.joe",
"root.users.john");
private static final String FILE_PREFIX = "file:";
private static final String FAIR_SCHEDULER_XML =
prepareFileName("fair-scheduler-conversion.xml");
private static String prepareFileName(String f) {
return FILE_PREFIX + new File("src/test/resources/" + f).getAbsolutePath();
}
private FSQueueConverter converter;
private Configuration config;
private Configuration csConfig;
private FairScheduler fs;
private FSQueue rootQueue;
@Mock
private FSConfigToCSConfigRuleHandler ruleHandler;
@Rule
public ExpectedException expectedException = ExpectedException.none();
@Before
public void setup() {
config = new Configuration(false);
config.set(FairSchedulerConfiguration.ALLOCATION_FILE, FAIR_SCHEDULER_XML);
config.setBoolean(FairSchedulerConfiguration.MIGRATION_MODE, true);
csConfig = new Configuration(false);
fs = createFairScheduler();
createConverter();
rootQueue = fs.getQueueManager().getRootQueue();
}
@After
public void tearDown() throws IOException {
if (fs != null) {
fs.close();
}
}
private FairScheduler createFairScheduler() {
RMContext ctx = new RMContextImpl();
PlacementManager placementManager = new PlacementManager();
ctx.setQueuePlacementManager(placementManager);
FairScheduler fairScheduler = new FairScheduler();
fairScheduler.setRMContext(ctx);
fairScheduler.init(config);
return fairScheduler;
}
private void createConverter() {
converter = new FSQueueConverter(ruleHandler,
csConfig,
false,
false,
false,
CLUSTER_RESOURCE,
0.16f,
15);
}
@Test
public void testConvertQueueHierarchy() {
converter.convertQueueHierarchy(rootQueue);
// root children
assertEquals("root children", "default,admins,users",
csConfig.get(PREFIX + "root.queues"));
// root.admins children
assertEquals("root.admins children", "bob,alice",
csConfig.get(PREFIX + "root.admins.queues"));
// root.default children - none
assertNull("root.default children", csConfig.get(PREFIX + "root.default" +
".queues"));
// root.users children
assertEquals("root.users children", "john,joe",
csConfig.get(PREFIX + "root.users.queues"));
Set<String> leafs = Sets.difference(ALL_QUEUES,
Sets.newHashSet("root",
"root.default",
"root.admins",
"root.users"));
assertNoValueForQueues(leafs, ".queues", csConfig);
}
@Test
public void testConvertQueueHierarchyWithSameLeafQueues() throws Exception {
expectedException.expect(ConversionException.class);
expectedException.expectMessage("Leaf queues must be unique");
String absolutePath =
new File("src/test/resources/fair-scheduler-sameleafqueue.xml")
.getAbsolutePath();
config.set(FairSchedulerConfiguration.ALLOCATION_FILE,
FILE_PREFIX + absolutePath);
fs.close();
fs = createFairScheduler();
rootQueue = fs.getQueueManager().getRootQueue();
converter.convertQueueHierarchy(rootQueue);
}
@Test
public void testQueueMaxAMShare() {
converter.convertQueueHierarchy(rootQueue);
// root.admins.bob
assertEquals("root.admins.bob AM share", "1.0",
csConfig.get(PREFIX + "root.admins.bob.maximum-am-resource-percent"));
// root.admins.alice
assertEquals("root.admins.alice AM share", "0.15",
csConfig.get(PREFIX +
"root.admins.alice.maximum-am-resource-percent"));
Set<String> remaining = Sets.difference(ALL_QUEUES,
Sets.newHashSet("root.admins.bob", "root.admins.alice"));
assertNoValueForQueues(remaining, ".maximum-am-resource-percent",
csConfig);
}
@Test
public void testQueueMaxRunningApps() {
converter.convertQueueHierarchy(rootQueue);
assertEquals("root.admins.alice max apps", 2,
csConfig.getInt(PREFIX + "root.admins.alice.maximum-applications",
-1));
Set<String> remaining = Sets.difference(ALL_QUEUES,
Sets.newHashSet("root.admins.alice"));
assertNoValueForQueues(remaining, ".maximum-applications", csConfig);
}
@Test
public void testQueueMaxAllocations() {
converter.convertQueueHierarchy(rootQueue);
// root.admins vcores + mb
assertEquals("root.admins max vcores", 3,
csConfig.getInt(PREFIX + "root.admins.maximum-allocation-vcores", -1));
assertEquals("root.admins max memory", 4096,
csConfig.getInt(PREFIX + "root.admins.maximum-allocation-mb", -1));
// root.users.john max vcores + mb
assertEquals("root.users.john max vcores", 2,
csConfig.getInt(PREFIX + "root.users.john.maximum-allocation-vcores",
-1));
assertEquals("root.users.john max memory", 8192,
csConfig.getInt(PREFIX + "root.users.john.maximum-allocation-mb", -1));
Set<String> remaining = Sets.difference(ALL_QUEUES,
Sets.newHashSet("root.admins", "root.users.john"));
assertNoValueForQueues(remaining, ".maximum-allocation-vcores", csConfig);
assertNoValueForQueues(remaining, ".maximum-allocation-mb", csConfig);
}
@Test
public void testQueuePreemptionDisabled() {
converter = new FSQueueConverter(ruleHandler,
csConfig,
true,
false,
false,
CLUSTER_RESOURCE,
0.16f,
15);
converter.convertQueueHierarchy(rootQueue);
assertTrue("root.admins.alice preemption setting",
csConfig.getBoolean(PREFIX + "root.admins.alice.disable_preemption",
false));
assertTrue("root.users.joe preemption setting",
csConfig.getBoolean(PREFIX + "root.users.joe.disable_preemption",
false));
Set<String> remaining = Sets.difference(ALL_QUEUES,
Sets.newHashSet("root.admins.alice", "root.users.joe"));
assertNoValueForQueues(remaining, ".disable_preemption", csConfig);
}
@Test
public void testQueuePreemptionDisabledWhenGlobalPreemptionDisabled() {
converter.convertQueueHierarchy(rootQueue);
assertNoValueForQueues(ALL_QUEUES, ".disable_preemption", csConfig);
}
@Test
public void testChildCapacity() {
converter.convertQueueHierarchy(rootQueue);
// root
assertEquals("root.default capacity", "33.333",
csConfig.get(PREFIX + "root.default.capacity"));
assertEquals("root.admins capacity", "33.333",
csConfig.get(PREFIX + "root.admins.capacity"));
assertEquals("root.users capacity", "66.667",
csConfig.get(PREFIX + "root.users.capacity"));
// root.users
assertEquals("root.users.john capacity", "25.000",
csConfig.get(PREFIX + "root.users.john.capacity"));
assertEquals("root.users.joe capacity", "75.000",
csConfig.get(PREFIX + "root.users.joe.capacity"));
// root.admins
assertEquals("root.admins.alice capacity", "75.000",
csConfig.get(PREFIX + "root.admins.alice.capacity"));
assertEquals("root.admins.bob capacity", "25.000",
csConfig.get(PREFIX + "root.admins.bob.capacity"));
}
@Test
public void testQueueMaximumCapacity() {
converter.convertQueueHierarchy(rootQueue);
assertEquals("root.users.joe maximum capacity", "[memory=8192, vcores=8]",
csConfig.get(PREFIX + "root.users.joe.maximum-capacity"));
assertEquals("root.admins.bob maximum capacity", "[memory=8192, vcores=2]",
csConfig.get(PREFIX + "root.admins.bob.maximum-capacity"));
assertEquals("root.admins.alice maximum capacity",
"[memory=16384, vcores=4]",
csConfig.get(PREFIX + "root.admins.alice.maximum-capacity"));
Set<String> remaining = Sets.difference(ALL_QUEUES,
Sets.newHashSet("root.users.joe",
"root.admins.bob",
"root.admins.alice"));
assertNoValueForQueues(remaining, ".maximum-capacity", csConfig);
}
@Test
public void testQueueAutoCreateChildQueue() {
config.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, true);
converter = new FSQueueConverter(ruleHandler,
csConfig,
false,
false,
true,
CLUSTER_RESOURCE,
0.16f,
15);
converter.convertQueueHierarchy(rootQueue);
assertTrueForQueues(ALL_QUEUES, ".auto-create-child-queue.enabled",
csConfig);
}
@Test
public void testQueueSizeBasedWeightEnabled() {
converter = new FSQueueConverter(ruleHandler,
csConfig,
false,
true,
false,
CLUSTER_RESOURCE,
0.16f,
15);
converter.convertQueueHierarchy(rootQueue);
assertTrueForQueues(ALL_QUEUES,
".ordering-policy.fair.enable-size-based-weight", csConfig);
}
@Test
public void testQueueSizeBasedWeightDisabled() {
converter.convertQueueHierarchy(rootQueue);
assertNoValueForQueues(ALL_QUEUES,
".ordering-policy.fair.enable-size-based-weight", csConfig);
}
@Test
public void testQueueOrderingPolicy() throws Exception {
String absolutePath =
new File("src/test/resources/fair-scheduler-orderingpolicy.xml")
.getAbsolutePath();
config.set(FairSchedulerConfiguration.ALLOCATION_FILE,
FILE_PREFIX + absolutePath);
fs.close();
fs = createFairScheduler();
rootQueue = fs.getQueueManager().getRootQueue();
converter.convertQueueHierarchy(rootQueue);
// root
assertEquals("root ordering policy", "fair",
csConfig.get(PREFIX + "root.ordering-policy"));
assertEquals("root.default ordering policy", "fair",
csConfig.get(PREFIX + "root.default.ordering-policy"));
assertEquals("root.admins ordering policy", "fair",
csConfig.get(PREFIX + "root.admins.ordering-policy"));
assertEquals("root.users ordering policy", "fair",
csConfig.get(PREFIX + "root.users.ordering-policy"));
// root.users
assertEquals("root.users.joe ordering policy", "fair",
csConfig.get(PREFIX + "root.users.joe.ordering-policy"));
assertEquals("root.users.john ordering policy", "FIFO",
csConfig.get(PREFIX + "root.users.john.ordering-policy"));
// root.admins
assertEquals("root.admins.alice ordering policy", "FIFO",
csConfig.get(PREFIX + "root.admins.alice.ordering-policy"));
assertEquals("root.admins.bob ordering policy", "fair",
csConfig.get(PREFIX + "root.admins.bob.ordering-policy"));
}
@Test
public void testQueueMaxChildCapacityNotSupported() {
expectedException.expect(UnsupportedPropertyException.class);
expectedException.expectMessage("test");
Mockito.doThrow(new UnsupportedPropertyException("test"))
.when(ruleHandler).handleMaxChildCapacity();
converter.convertQueueHierarchy(rootQueue);
}
@Test
public void testReservationSystemNotSupported() {
expectedException.expect(UnsupportedPropertyException.class);
expectedException.expectMessage("maxCapacity");
Mockito.doThrow(new UnsupportedPropertyException("maxCapacity"))
.when(ruleHandler).handleMaxChildCapacity();
config.setBoolean(YarnConfiguration.RM_RESERVATION_SYSTEM_ENABLE, true);
converter.convertQueueHierarchy(rootQueue);
}
private void assertNoValueForQueues(Set<String> queues, String postfix,
Configuration config) {
for (String queue : queues) {
String key = PREFIX + queue + postfix;
assertNull("Key " + key + " has value, but it should be null",
config.get(key));
}
}
private void assertTrueForQueues(Set<String> queues, String postfix,
Configuration config) {
for (String queue : queues) {
String key = PREFIX + queue + postfix;
assertTrue("Key " + key + " is false, should be true",
config.getBoolean(key, false));
}
}
}

View File

@ -0,0 +1,141 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
import org.junit.Before;
import org.junit.Test;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;
/**
* Unit tests for FSYarnSiteConverter.
*
*/
public class TestFSYarnSiteConverter {
private Configuration yarnConfig;
private FSYarnSiteConverter converter;
private Configuration yarnConvertedConfig;
@Before
public void setup() {
yarnConfig = new Configuration(false);
yarnConvertedConfig = new Configuration(false);
converter = new FSYarnSiteConverter();
}
@SuppressWarnings("deprecation")
@Test
public void testSiteContinuousSchedulingConversion() {
yarnConfig.setBoolean(
FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_ENABLED, true);
yarnConfig.setInt(
FairSchedulerConfiguration.CONTINUOUS_SCHEDULING_SLEEP_MS, 666);
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig);
assertTrue("Cont. scheduling", yarnConvertedConfig.getBoolean(
CapacitySchedulerConfiguration.SCHEDULE_ASYNCHRONOUSLY_ENABLE, false));
assertEquals("Scheduling interval", 666,
yarnConvertedConfig.getInt(
"yarn.scheduler.capacity.schedule-asynchronously" +
".scheduling-interval-ms", -1));
}
@Test
public void testSiteMinimumAllocationIncrementConversion() {
yarnConfig.setInt("yarn.resource-types.memory-mb.increment-allocation", 11);
yarnConfig.setInt("yarn.resource-types.vcores.increment-allocation", 5);
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig);
assertEquals("Memory alloc increment", 11,
yarnConvertedConfig.getInt("yarn.scheduler.minimum-allocation-mb",
-1));
assertEquals("Vcore increment", 5,
yarnConvertedConfig.getInt("yarn.scheduler.minimum-allocation-vcores",
-1));
}
@Test
public void testSitePreemptionConversion() {
yarnConfig.setBoolean(FairSchedulerConfiguration.PREEMPTION, true);
yarnConfig.setInt(FairSchedulerConfiguration.WAIT_TIME_BEFORE_KILL, 123);
yarnConfig.setInt(
FairSchedulerConfiguration.WAIT_TIME_BEFORE_NEXT_STARVATION_CHECK_MS,
321);
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig);
assertTrue("Preemption enabled",
yarnConvertedConfig.getBoolean(
YarnConfiguration.RM_SCHEDULER_ENABLE_MONITORS,
false));
assertEquals("Wait time before kill", 123,
yarnConvertedConfig.getInt(
CapacitySchedulerConfiguration.PREEMPTION_WAIT_TIME_BEFORE_KILL,
-1));
assertEquals("Starvation check wait time", 321,
yarnConvertedConfig.getInt(
CapacitySchedulerConfiguration.PREEMPTION_MONITORING_INTERVAL,
-1));
}
@Test
public void testSiteAssignMultipleConversion() {
yarnConfig.setBoolean(FairSchedulerConfiguration.ASSIGN_MULTIPLE, true);
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig);
assertTrue("Assign multiple",
yarnConvertedConfig.getBoolean(
CapacitySchedulerConfiguration.ASSIGN_MULTIPLE_ENABLED,
false));
}
@Test
public void testSiteMaxAssignConversion() {
yarnConfig.setInt(FairSchedulerConfiguration.MAX_ASSIGN, 111);
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig);
assertEquals("Max assign", 111,
yarnConvertedConfig.getInt(
CapacitySchedulerConfiguration.MAX_ASSIGN_PER_HEARTBEAT, -1));
}
@Test
public void testSiteLocalityThresholdConversion() {
yarnConfig.set(FairSchedulerConfiguration.LOCALITY_THRESHOLD_NODE,
"123.123");
yarnConfig.set(FairSchedulerConfiguration.LOCALITY_THRESHOLD_RACK,
"321.321");
converter.convertSiteProperties(yarnConfig, yarnConvertedConfig);
assertEquals("Locality threshold node", "123.123",
yarnConvertedConfig.get(
CapacitySchedulerConfiguration.NODE_LOCALITY_DELAY));
assertEquals("Locality threshold rack", "321.321",
yarnConvertedConfig.get(
CapacitySchedulerConfiguration.RACK_LOCALITY_ADDITIONAL_DELAY));
}
}

View File

@ -0,0 +1,24 @@
#
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License" you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
#
maxCapacityPercentage.action=abort
maxChildCapacity.action=ABORT
userMaxRunningApps.action=abort
userMaxAppsDefault.action=ABORT
dynamicMaxAssign.action=abort
specifiedNotFirstRule.action=ABORT
reservationSystem.action=abort
queueAutoCreate.action=ABORT

View File

@ -0,0 +1,94 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<allocations>
<queue name="root">
<weight>1.0</weight>
<schedulingPolicy>drf</schedulingPolicy>
<aclSubmitApps>alice,bob,joe,john hadoop_users</aclSubmitApps>
<aclAdministerApps>alice,bob,joe,john hadoop_users</aclAdministerApps>
<queue name="default">
<weight>1.0</weight>
<schedulingPolicy>drf</schedulingPolicy>
</queue>
<queue name="users" type="parent">
<weight>1.0</weight>
<schedulingPolicy>drf</schedulingPolicy>
<queue name="john">
<weight>1.0</weight>
<schedulingPolicy>drf</schedulingPolicy>
<aclSubmitApps>john </aclSubmitApps>
<aclAdministerApps>john </aclAdministerApps>
<maxContainerAllocation>vcores=2,memory-mb=8192</maxContainerAllocation>
</queue>
<queue name="joe">
<maxResources>memory-mb=50.0%, vcores=50.0%</maxResources>
<weight>3.0</weight>
<allowPreemptionFrom>false</allowPreemptionFrom>
<schedulingPolicy>drf</schedulingPolicy>
<aclSubmitApps>joe </aclSubmitApps>
<aclAdministerApps>joe </aclAdministerApps>
</queue>
</queue>
<queue name="admins" type="parent">
<maxChildResources>memory-mb=8192, vcores=1</maxChildResources>
<weight>1.0</weight>
<schedulingPolicy>drf</schedulingPolicy>
<maxContainerAllocation>vcores=3,memory-mb=4096</maxContainerAllocation>
<queue name="alice">
<maxResources>memory-mb=16384, vcores=4</maxResources>
<maxRunningApps>2</maxRunningApps>
<weight>3.0</weight>
<allowPreemptionFrom>false</allowPreemptionFrom>
<schedulingPolicy>drf</schedulingPolicy>
<aclSubmitApps>alice </aclSubmitApps>
<aclAdministerApps>alice </aclAdministerApps>
<maxAMShare>0.15</maxAMShare>
<reservation>memory-mb=16384, vcores=4</reservation>
</queue>
<queue name="bob">
<maxResources>memory-mb=8192, vcores=2</maxResources>
<weight>1.0</weight>
<schedulingPolicy>drf</schedulingPolicy>
<aclSubmitApps>bob </aclSubmitApps>
<aclAdministerApps>bob </aclAdministerApps>
<maxAMShare>-1.0</maxAMShare>
</queue>
</queue>
</queue>
<user name="alice">
<maxRunningApps>30</maxRunningApps>
</user>
<userMaxAppsDefault>10</userMaxAppsDefault>
<defaultFairSharePreemptionTimeout>23</defaultFairSharePreemptionTimeout>
<defaultMinSharePreemptionTimeout>24</defaultMinSharePreemptionTimeout>
<defaultFairSharePreemptionThreshold>0.12</defaultFairSharePreemptionThreshold>
<queueMaxAppsDefault>15</queueMaxAppsDefault>
<defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>
<queueMaxAMShareDefault>0.16</queueMaxAMShareDefault>
<queuePlacementPolicy>
<rule name="nestedUserQueue" create="false">
<rule name="default" create="false" queue="admins.devs"/>
</rule>
<rule name="specified" create="true"/>
<rule name="nestedUserQueue" create="true">
<rule name="default" create="false" queue="users"/>
</rule>
<rule name="default"/>
</queuePlacementPolicy>
</allocations>

View File

@ -0,0 +1,21 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<allocations>
</queue>
</allocations>

View File

@ -0,0 +1,90 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<allocations>
<queue name="root">
<weight>1.0</weight>
<schedulingPolicy>drf</schedulingPolicy>
<aclSubmitApps>alice,bob,joe,john hadoop_users</aclSubmitApps>
<aclAdministerApps>alice,bob,joe,john hadoop_users</aclAdministerApps>
<queue name="default">
<weight>1.0</weight>
<schedulingPolicy>drf</schedulingPolicy>
</queue>
<queue name="users" type="parent">
<weight>1.0</weight>
<schedulingPolicy>drf</schedulingPolicy>
<queue name="john">
<weight>1.0</weight>
<schedulingPolicy>drf</schedulingPolicy>
<aclSubmitApps>john </aclSubmitApps>
<aclAdministerApps>john </aclAdministerApps>
<maxContainerAllocation>vcores=2,memory-mb=8192</maxContainerAllocation>
</queue>
<queue name="joe">
<maxResources>memory-mb=50.0%, vcores=50.0%</maxResources>
<weight>3.0</weight>
<allowPreemptionFrom>false</allowPreemptionFrom>
<schedulingPolicy>drf</schedulingPolicy>
<aclSubmitApps>joe </aclSubmitApps>
<aclAdministerApps>joe </aclAdministerApps>
</queue>
</queue>
<queue name="admins" type="parent">
<maxChildResources>memory-mb=8192, vcores=1</maxChildResources>
<weight>1.0</weight>
<schedulingPolicy>drf</schedulingPolicy>
<maxContainerAllocation>vcores=3,memory-mb=4096</maxContainerAllocation>
<queue name="alice">
<maxResources>memory-mb=16384, vcores=4</maxResources>
<maxRunningApps>2</maxRunningApps>
<weight>3.0</weight>
<allowPreemptionFrom>false</allowPreemptionFrom>
<schedulingPolicy>drf</schedulingPolicy>
<aclSubmitApps>alice </aclSubmitApps>
<aclAdministerApps>alice </aclAdministerApps>
<maxAMShare>0.15</maxAMShare>
<reservation>memory-mb=16384, vcores=4</reservation>
</queue>
<queue name="bob">
<maxResources>memory-mb=8192, vcores=2</maxResources>
<weight>1.0</weight>
<schedulingPolicy>drf</schedulingPolicy>
<aclSubmitApps>bob </aclSubmitApps>
<aclAdministerApps>bob </aclAdministerApps>
<maxAMShare>-1.0</maxAMShare>
</queue>
</queue>
</queue>
<defaultFairSharePreemptionTimeout>23</defaultFairSharePreemptionTimeout>
<defaultMinSharePreemptionTimeout>24</defaultMinSharePreemptionTimeout>
<defaultFairSharePreemptionThreshold>0.12</defaultFairSharePreemptionThreshold>
<queueMaxAppsDefault>15</queueMaxAppsDefault>
<defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>
<queueMaxAMShareDefault>0.16</queueMaxAMShareDefault>
<queuePlacementPolicy>
<rule name="nestedUserQueue" create="false">
<rule name="default" create="false" queue="admins.devs"/>
</rule>
<rule name="specified" create="true"/>
<rule name="nestedUserQueue" create="true">
<rule name="default" create="false" queue="users"/>
</rule>
<rule name="default"/>
</queuePlacementPolicy>
</allocations>

View File

@ -0,0 +1,89 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<allocations>
<queue name="root">
<weight>1.0</weight>
<schedulingPolicy>drf</schedulingPolicy>
<aclSubmitApps>alice,bob,joe,john hadoop_users</aclSubmitApps>
<aclAdministerApps>alice,bob,joe,john hadoop_users</aclAdministerApps>
<queue name="default">
<weight>1.0</weight>
<schedulingPolicy>fair</schedulingPolicy>
</queue>
<queue name="users" type="parent">
<weight>1.0</weight>
<schedulingPolicy>drf</schedulingPolicy>
<queue name="john">
<weight>1.0</weight>
<schedulingPolicy>drf</schedulingPolicy>
<aclSubmitApps>john </aclSubmitApps>
<aclAdministerApps>john </aclAdministerApps>
<maxContainerAllocation>vcores=2,memory-mb=8192</maxContainerAllocation>
</queue>
<queue name="joe">
<maxResources>memory-mb=50.0%, vcores=50.0%</maxResources>
<weight>3.0</weight>
<allowPreemptionFrom>false</allowPreemptionFrom>
<schedulingPolicy>drf</schedulingPolicy>
<aclSubmitApps>joe </aclSubmitApps>
<aclAdministerApps>joe </aclAdministerApps>
</queue>
</queue>
<queue name="admins" type="parent">
<maxChildResources>memory-mb=8192, vcores=1</maxChildResources>
<weight>1.0</weight>
<schedulingPolicy>fair</schedulingPolicy>
<maxContainerAllocation>vcores=3,memory-mb=4096</maxContainerAllocation>
<queue name="alice">
<maxResources>memory-mb=16384, vcores=4</maxResources>
<maxRunningApps>2</maxRunningApps>
<weight>3.0</weight>
<allowPreemptionFrom>false</allowPreemptionFrom>
<schedulingPolicy>drf</schedulingPolicy>
<aclSubmitApps>alice </aclSubmitApps>
<aclAdministerApps>alice </aclAdministerApps>
<maxAMShare>0.15</maxAMShare>
</queue>
<queue name="bob">
<maxResources>memory-mb=8192, vcores=2</maxResources>
<weight>1.0</weight>
<schedulingPolicy>drf</schedulingPolicy>
<aclSubmitApps>bob </aclSubmitApps>
<aclAdministerApps>bob </aclAdministerApps>
<maxAMShare>-1.0</maxAMShare>
</queue>
</queue>
</queue>
<defaultFairSharePreemptionTimeout>23</defaultFairSharePreemptionTimeout>
<defaultMinSharePreemptionTimeout>24</defaultMinSharePreemptionTimeout>
<defaultFairSharePreemptionThreshold>0.12</defaultFairSharePreemptionThreshold>
<queueMaxAppsDefault>15</queueMaxAppsDefault>
<defaultQueueSchedulingPolicy>drf</defaultQueueSchedulingPolicy>
<queueMaxAMShareDefault>0.16</queueMaxAMShareDefault>
<queuePlacementPolicy>
<rule name="nestedUserQueue" create="false">
<rule name="default" create="false" queue="admins.devs"/>
</rule>
<rule name="specified" create="true"/>
<rule name="nestedUserQueue" create="true">
<rule name="default" create="false" queue="users"/>
</rule>
<rule name="default"/>
</queuePlacementPolicy>
</allocations>

View File

@ -0,0 +1,89 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<allocations>
<queue name="root">
<weight>1.0</weight>
<schedulingPolicy>fair</schedulingPolicy>
<aclSubmitApps>alice,bob,joe,john hadoop_users</aclSubmitApps>
<aclAdministerApps>alice,bob,joe,john hadoop_users</aclAdministerApps>
<queue name="default">
<weight>1.0</weight>
<schedulingPolicy>fair</schedulingPolicy>
</queue>
<queue name="users" type="parent">
<weight>1.0</weight>
<schedulingPolicy>fair</schedulingPolicy>
<queue name="john">
<weight>1.0</weight>
<schedulingPolicy>fifo</schedulingPolicy>
<aclSubmitApps>john </aclSubmitApps>
<aclAdministerApps>john </aclAdministerApps>
<maxContainerAllocation>vcores=2,memory-mb=8192</maxContainerAllocation>
</queue>
<queue name="joe">
<maxResources>memory-mb=50.0%, vcores=50.0%</maxResources>
<weight>3.0</weight>
<allowPreemptionFrom>false</allowPreemptionFrom>
<schedulingPolicy>fair</schedulingPolicy>
<aclSubmitApps>joe </aclSubmitApps>
<aclAdministerApps>joe </aclAdministerApps>
</queue>
</queue>
<queue name="admins" type="parent">
<maxChildResources>memory-mb=8192, vcores=1</maxChildResources>
<weight>1.0</weight>
<schedulingPolicy>fair</schedulingPolicy>
<maxContainerAllocation>vcores=3,memory-mb=4096</maxContainerAllocation>
<queue name="alice">
<maxResources>memory-mb=16384, vcores=4</maxResources>
<maxRunningApps>2</maxRunningApps>
<weight>3.0</weight>
<allowPreemptionFrom>false</allowPreemptionFrom>
<schedulingPolicy>fifo</schedulingPolicy>
<aclSubmitApps>alice </aclSubmitApps>
<aclAdministerApps>alice </aclAdministerApps>
<maxAMShare>0.15</maxAMShare>
</queue>
<queue name="bob">
<maxResources>memory-mb=8192, vcores=2</maxResources>
<weight>1.0</weight>
<schedulingPolicy>fair</schedulingPolicy>
<aclSubmitApps>bob </aclSubmitApps>
<aclAdministerApps>bob </aclAdministerApps>
<maxAMShare>-1.0</maxAMShare>
</queue>
</queue>
</queue>
<defaultFairSharePreemptionTimeout>23</defaultFairSharePreemptionTimeout>
<defaultMinSharePreemptionTimeout>24</defaultMinSharePreemptionTimeout>
<defaultFairSharePreemptionThreshold>0.12</defaultFairSharePreemptionThreshold>
<queueMaxAppsDefault>15</queueMaxAppsDefault>
<defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>
<queueMaxAMShareDefault>0.16</queueMaxAMShareDefault>
<queuePlacementPolicy>
<rule name="nestedUserQueue" create="false">
<rule name="default" create="false" queue="admins.devs"/>
</rule>
<rule name="specified" create="true"/>
<rule name="nestedUserQueue" create="true">
<rule name="default" create="false" queue="users"/>
</rule>
<rule name="default"/>
</queuePlacementPolicy>
</allocations>

View File

@ -0,0 +1,90 @@
<?xml version="1.0" encoding="UTF-8" standalone="yes"?>
<!--
Licensed to the Apache Software Foundation (ASF) under one
or more contributor license agreements. See the NOTICE file
distributed with this work for additional information
regarding copyright ownership. The ASF licenses this file
to you under the Apache License, Version 2.0 (the
"License"); you may not use this file except in compliance
with the License. You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License.
-->
<allocations>
<queue name="root">
<weight>1.0</weight>
<schedulingPolicy>drf</schedulingPolicy>
<aclSubmitApps>alice,bob,joe,john hadoop_users</aclSubmitApps>
<aclAdministerApps>alice,bob,joe,john hadoop_users</aclAdministerApps>
<queue name="default">
<weight>1.0</weight>
<schedulingPolicy>drf</schedulingPolicy>
</queue>
<queue name="users" type="parent">
<weight>1.0</weight>
<schedulingPolicy>drf</schedulingPolicy>
<queue name="john">
<weight>1.0</weight>
<schedulingPolicy>drf</schedulingPolicy>
<aclSubmitApps>john </aclSubmitApps>
<aclAdministerApps>john </aclAdministerApps>
<maxContainerAllocation>vcores=2,memory-mb=8192</maxContainerAllocation>
</queue>
<queue name="joe">
<maxResources>memory-mb=50.0%, vcores=50.0%</maxResources>
<weight>3.0</weight>
<allowPreemptionFrom>false</allowPreemptionFrom>
<schedulingPolicy>drf</schedulingPolicy>
<aclSubmitApps>joe </aclSubmitApps>
<aclAdministerApps>joe </aclAdministerApps>
</queue>
</queue>
<queue name="admins" type="parent">
<maxChildResources>memory-mb=8192, vcores=1</maxChildResources>
<weight>1.0</weight>
<schedulingPolicy>drf</schedulingPolicy>
<maxContainerAllocation>vcores=3,memory-mb=4096</maxContainerAllocation>
<queue name="john">
<maxResources>memory-mb=16384, vcores=4</maxResources>
<maxRunningApps>2</maxRunningApps>
<weight>3.0</weight>
<allowPreemptionFrom>false</allowPreemptionFrom>
<schedulingPolicy>drf</schedulingPolicy>
<aclSubmitApps>alice </aclSubmitApps>
<aclAdministerApps>alice </aclAdministerApps>
<maxAMShare>0.15</maxAMShare>
<reservation>memory-mb=16384, vcores=4</reservation>
</queue>
<queue name="joe">
<maxResources>memory-mb=8192, vcores=2</maxResources>
<weight>1.0</weight>
<schedulingPolicy>drf</schedulingPolicy>
<aclSubmitApps>bob </aclSubmitApps>
<aclAdministerApps>bob </aclAdministerApps>
<maxAMShare>-1.0</maxAMShare>
</queue>
</queue>
</queue>
<defaultFairSharePreemptionTimeout>23</defaultFairSharePreemptionTimeout>
<defaultMinSharePreemptionTimeout>24</defaultMinSharePreemptionTimeout>
<defaultFairSharePreemptionThreshold>0.12</defaultFairSharePreemptionThreshold>
<queueMaxAppsDefault>15</queueMaxAppsDefault>
<defaultQueueSchedulingPolicy>fair</defaultQueueSchedulingPolicy>
<queueMaxAMShareDefault>0.16</queueMaxAMShareDefault>
<queuePlacementPolicy>
<rule name="nestedUserQueue" create="false">
<rule name="default" create="false" queue="admins.devs"/>
</rule>
<rule name="specified" create="true"/>
<rule name="nestedUserQueue" create="true">
<rule name="default" create="false" queue="users"/>
</rule>
<rule name="default"/>
</queuePlacementPolicy>
</allocations>

View File

@ -0,0 +1,23 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~ http://www.apache.org/licenses/LICENSE-2.0
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<configuration>
<property>
<name>yarn.scheduler.fair.allocation.file</name>
<value>fair-scheduler-conversion.xml</value>
</property>
</configuration>

View File

@ -0,0 +1,37 @@
<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<!--
~ Licensed to the Apache Software Foundation (ASF) under one
~ or more contributor license agreements. See the NOTICE file
~ distributed with this work for additional information
~ regarding copyright ownership. The ASF licenses this file
~ to you under the Apache License, Version 2.0 (the
~ "License"); you may not use this file except in compliance
~ with the License. You may obtain a copy of the License at
~ http://www.apache.org/licenses/LICENSE-2.0
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
-->
<!--
Licensed under the Apache License, Version 2.0 (the "License");
you may not use this file except in compliance with the License.
You may obtain a copy of the License at
http://www.apache.org/licenses/LICENSE-2.0
Unless required by applicable law or agreed to in writing, software
distributed under the License is distributed on an "AS IS" BASIS,
WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
See the License for the specific language governing permissions and
limitations under the License. See accompanying LICENSE file.
-->
<configuration>
<property>
<name>yarn.scheduler.fair.allocation.file
</property>
</configuration>

View File

@ -193,6 +193,7 @@ Usage: `yarn resourcemanager [-format-state-store]`
|:---- |:---- |
| -format-state-store | Formats the RMStateStore. This will clear the RMStateStore and is useful if past applications are no longer needed. This should be run only when the ResourceManager is not running. |
| -remove-application-from-state-store \<appId\> | Remove the application from RMStateStore. This should be run only when the ResourceManager is not running. |
| -convert-fs-configuration [-y&#124;yarnsiteconfig] [-f&#124;fsconfig] [-r&#124;rulesconfig] [-o&#124;output-directory] [-p&#124;print] [-c&#124;cluster-resource] | WARNING: This feature is experimental and not intended for production use! Development is still in progress so the converter should not be considered complete! <br/> Converts the specified Fair Scheduler configuration to Capacity Scheduler configuration. Requires two mandatory input files. First, the yarn-site.xml with the following format: [-y&#124;yarnsiteconfig [\<Path to the yarn-site.xml file\>]. Secondly, the fair-scheduler.xml with the following format: [-f&#124;fsconfig [\<Path to the fair-scheduler.xml file\>]. This config is not mandatory if there is a reference in yarn-site.xml to the fair-scheduler.xml with the property 'yarn.scheduler.fair.allocation.file'. If both are defined, the -f option has precedence. The output directory of the config files should be specified as well, with: \[-o&#124;output-directory\ \<directory\>]. An optional rules config file could be also specified with the following format: [-r&#124;rulesconfig \<Path to the conversion rules file\>]. The rule config file's format is a property file. There's an additional \[-p&#124;print\] parameter, which is optional. If defined, the configuration will be emitted to the console instead. In its normal operation, the output files (yarn-site.xml and capacity-scheduler.xml) of this command is generated to the specified output directory. The cluster resource parameter (\[-c&#124;cluster-resource\] \<resource\>\]) needs to be specified if any queue has a maxResources setting with value as percentage. The format of the resource string is the same as in fair-scheduler.xml.) ] |
Start the ResourceManager