YARN-10413. Change fs2cs to generate mapping rules in the new format. Contributed by Peter Bacsko
This commit is contained in:
parent
75d10f8499
commit
a7a1f1541a
@ -27,11 +27,7 @@
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import javax.xml.parsers.DocumentBuilder;
|
||||
import javax.xml.parsers.DocumentBuilderFactory;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.FileSystem;
|
||||
import org.apache.hadoop.fs.Path;
|
||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
@ -43,22 +39,21 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.schema.MappingRulesDescription;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfigurationException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationFileLoaderService;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.ConfigurableResource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocation.AllocationFileParser;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.w3c.dom.Document;
|
||||
import org.w3c.dom.Element;
|
||||
import org.w3c.dom.NodeList;
|
||||
|
||||
import com.fasterxml.jackson.core.util.DefaultPrettyPrinter;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.fasterxml.jackson.databind.ObjectWriter;
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
@ -74,6 +69,8 @@ public class FSConfigToCSConfigConverter {
|
||||
"capacity-scheduler.xml";
|
||||
private static final String FAIR_SCHEDULER_XML =
|
||||
"fair-scheduler.xml";
|
||||
private static final String MAPPING_RULES_JSON =
|
||||
"mapping-rules.json";
|
||||
|
||||
public static final String WARNING_TEXT =
|
||||
"WARNING: This feature is experimental and not intended " +
|
||||
@ -86,23 +83,22 @@ public class FSConfigToCSConfigConverter {
|
||||
private Map<String, Integer> userMaxApps;
|
||||
private int userMaxAppsDefault;
|
||||
|
||||
private boolean autoCreateChildQueues = false;
|
||||
private boolean sizeBasedWeight = false;
|
||||
private boolean userAsDefaultQueue = false;
|
||||
private ConversionOptions conversionOptions;
|
||||
private boolean drfUsed = false;
|
||||
|
||||
private Configuration convertedYarnSiteConfig;
|
||||
private Configuration capacitySchedulerConfig;
|
||||
private CapacitySchedulerConfiguration capacitySchedulerConfig;
|
||||
private FSConfigToCSConfigRuleHandler ruleHandler;
|
||||
private QueuePlacementConverter placementConverter;
|
||||
|
||||
private OutputStream yarnSiteOutputStream;
|
||||
private OutputStream capacitySchedulerOutputStream;
|
||||
private OutputStream mappingRulesOutputStream;
|
||||
|
||||
private boolean consoleMode = false;
|
||||
private boolean convertPlacementRules = false;
|
||||
|
||||
|
||||
private boolean convertPlacementRules = true;
|
||||
private String outputDirectory;
|
||||
|
||||
public FSConfigToCSConfigConverter(FSConfigToCSConfigRuleHandler
|
||||
ruleHandler, ConversionOptions conversionOptions) {
|
||||
@ -110,24 +106,25 @@ public FSConfigToCSConfigConverter(FSConfigToCSConfigRuleHandler
|
||||
this.conversionOptions = conversionOptions;
|
||||
this.yarnSiteOutputStream = System.out;
|
||||
this.capacitySchedulerOutputStream = System.out;
|
||||
this.mappingRulesOutputStream = System.out;
|
||||
this.placementConverter = new QueuePlacementConverter();
|
||||
}
|
||||
|
||||
public void convert(FSConfigToCSConfigConverterParams params)
|
||||
throws Exception {
|
||||
validateParams(params);
|
||||
prepareOutputFiles(params.getOutputDirectory(), params.isConsole());
|
||||
this.clusterResource = getClusterResource(params);
|
||||
this.convertPlacementRules = params.isConvertPlacementRules();
|
||||
this.outputDirectory = params.getOutputDirectory();
|
||||
prepareOutputFiles(params.isConsole());
|
||||
loadConversionRules(params.getConversionRulesConfig());
|
||||
Configuration inputYarnSiteConfig = getInputYarnSiteConfig(params);
|
||||
handleFairSchedulerConfig(params, inputYarnSiteConfig);
|
||||
|
||||
this.clusterResource = getClusterResource(params);
|
||||
this.convertPlacementRules = params.isConvertPlacementRules();
|
||||
|
||||
convert(inputYarnSiteConfig);
|
||||
}
|
||||
|
||||
private void prepareOutputFiles(String outputDirectory, boolean console)
|
||||
private void prepareOutputFiles(boolean console)
|
||||
throws FileNotFoundException {
|
||||
if (console) {
|
||||
LOG.info("Console mode is enabled, " + YARN_SITE_XML + " and" +
|
||||
@ -240,8 +237,6 @@ void convert(Configuration inputYarnSiteConfig) throws Exception {
|
||||
FairScheduler fs = new FairScheduler();
|
||||
fs.setRMContext(ctx);
|
||||
fs.init(fsConfig);
|
||||
boolean havePlacementPolicies =
|
||||
checkPlacementPoliciesPresent(fs, inputYarnSiteConfig);
|
||||
|
||||
drfUsed = isDrfUsed(fs);
|
||||
|
||||
@ -252,9 +247,10 @@ void convert(Configuration inputYarnSiteConfig) throws Exception {
|
||||
queueMaxAMShareDefault = allocConf.getQueueMaxAMShareDefault();
|
||||
|
||||
convertedYarnSiteConfig = new Configuration(false);
|
||||
capacitySchedulerConfig = new Configuration(false);
|
||||
capacitySchedulerConfig =
|
||||
new CapacitySchedulerConfiguration(new Configuration(false));
|
||||
|
||||
convertYarnSiteXml(inputYarnSiteConfig, havePlacementPolicies);
|
||||
convertYarnSiteXml(inputYarnSiteConfig);
|
||||
convertCapacitySchedulerXml(fs);
|
||||
|
||||
if (consoleMode) {
|
||||
@ -267,22 +263,19 @@ void convert(Configuration inputYarnSiteConfig) throws Exception {
|
||||
System.out.println("======= " + YARN_SITE_XML + " =======");
|
||||
}
|
||||
convertedYarnSiteConfig.writeXml(yarnSiteOutputStream);
|
||||
|
||||
if (convertPlacementRules) {
|
||||
performRuleConversion(fs);
|
||||
}
|
||||
}
|
||||
|
||||
private void convertYarnSiteXml(Configuration inputYarnSiteConfig,
|
||||
boolean havePlacementPolicies) {
|
||||
private void convertYarnSiteXml(Configuration inputYarnSiteConfig) {
|
||||
FSYarnSiteConverter siteConverter =
|
||||
new FSYarnSiteConverter();
|
||||
siteConverter.convertSiteProperties(inputYarnSiteConfig,
|
||||
convertedYarnSiteConfig, drfUsed,
|
||||
conversionOptions.isEnableAsyncScheduler());
|
||||
|
||||
// See docs: "allow-undeclared-pools" and "user-as-default-queue" are
|
||||
// ignored if we have placement rules
|
||||
autoCreateChildQueues =
|
||||
!havePlacementPolicies && siteConverter.isAutoCreateChildQueues();
|
||||
userAsDefaultQueue =
|
||||
!havePlacementPolicies && siteConverter.isUserAsDefaultQueue();
|
||||
preemptionEnabled = siteConverter.isPreemptionEnabled();
|
||||
sizeBasedWeight = siteConverter.isSizeBasedWeight();
|
||||
|
||||
@ -301,7 +294,6 @@ private void convertCapacitySchedulerXml(FairScheduler fs) {
|
||||
.withCapacitySchedulerConfig(capacitySchedulerConfig)
|
||||
.withPreemptionEnabled(preemptionEnabled)
|
||||
.withSizeBasedWeight(sizeBasedWeight)
|
||||
.withAutoCreateChildQueues(autoCreateChildQueues)
|
||||
.withClusterResource(clusterResource)
|
||||
.withQueueMaxAMShareDefault(queueMaxAMShareDefault)
|
||||
.withQueueMaxAppsDefault(queueMaxAppsDefault)
|
||||
@ -311,20 +303,35 @@ private void convertCapacitySchedulerXml(FairScheduler fs) {
|
||||
|
||||
queueConverter.convertQueueHierarchy(rootQueue);
|
||||
emitACLs(fs);
|
||||
}
|
||||
|
||||
if (convertPlacementRules) {
|
||||
LOG.info("Converting placement rules");
|
||||
PlacementManager placementManager =
|
||||
fs.getRMContext().getQueuePlacementManager();
|
||||
private void performRuleConversion(FairScheduler fs)
|
||||
throws IOException {
|
||||
LOG.info("Converting placement rules");
|
||||
|
||||
if (placementManager.getPlacementRules().size() > 0) {
|
||||
Map<String, String> properties =
|
||||
placementConverter.convertPlacementPolicy(placementManager,
|
||||
ruleHandler, userAsDefaultQueue);
|
||||
properties.forEach((k, v) -> capacitySchedulerConfig.set(k, v));
|
||||
PlacementManager placementManager =
|
||||
fs.getRMContext().getQueuePlacementManager();
|
||||
|
||||
if (placementManager.getPlacementRules().size() > 0) {
|
||||
if (!consoleMode) {
|
||||
File mappingRulesFile = new File(outputDirectory,
|
||||
MAPPING_RULES_JSON);
|
||||
this.mappingRulesOutputStream =
|
||||
new FileOutputStream(mappingRulesFile);
|
||||
} else {
|
||||
System.out.println("======= " + MAPPING_RULES_JSON + " =======");
|
||||
}
|
||||
|
||||
MappingRulesDescription desc =
|
||||
placementConverter.convertPlacementPolicy(placementManager,
|
||||
ruleHandler, capacitySchedulerConfig);
|
||||
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
ObjectWriter writer = mapper.writer(new DefaultPrettyPrinter());
|
||||
|
||||
writer.writeValue(mappingRulesOutputStream, desc);
|
||||
} else {
|
||||
LOG.info("Ignoring the conversion of placement rules");
|
||||
LOG.info("No rules to convert");
|
||||
}
|
||||
}
|
||||
|
||||
@ -431,7 +438,7 @@ Resource getClusterResource() {
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void setClusterResource(Resource clusterResource) {
|
||||
void setClusterResource(Resource clusterResource) {
|
||||
this.clusterResource = clusterResource;
|
||||
}
|
||||
|
||||
@ -459,43 +466,14 @@ void setConvertPlacementRules(boolean convertPlacementRules) {
|
||||
void setPlacementConverter(QueuePlacementConverter converter) {
|
||||
this.placementConverter = converter;
|
||||
}
|
||||
/*
|
||||
* Determines whether <queuePlacementPolicy> is present
|
||||
* in the allocation file or not.
|
||||
*
|
||||
* Note that placementManager.getPlacementRules.size()
|
||||
* doesn't work - by default, "allow-undeclared-pools" and
|
||||
* "user-as-default-queue" are translated to policies internally
|
||||
* inside QueuePlacementPolicy.fromConfiguration().
|
||||
*
|
||||
*/
|
||||
private boolean checkPlacementPoliciesPresent(FairScheduler scheduler,
|
||||
Configuration inputYarnSiteConfig)
|
||||
throws RuntimeException {
|
||||
|
||||
try (AllocationFileLoaderService loader =
|
||||
new AllocationFileLoaderService(scheduler)){
|
||||
@VisibleForTesting
|
||||
void setMappingRulesOutputStream(OutputStream outputStream) {
|
||||
this.mappingRulesOutputStream = outputStream;
|
||||
}
|
||||
|
||||
Path allocFilePath = loader.getAllocationFile(inputYarnSiteConfig);
|
||||
FileSystem fs = allocFilePath.getFileSystem(inputYarnSiteConfig);
|
||||
|
||||
DocumentBuilderFactory docBuilderFactory =
|
||||
DocumentBuilderFactory.newInstance();
|
||||
DocumentBuilder builder = docBuilderFactory.newDocumentBuilder();
|
||||
Document doc = builder.parse(fs.open(allocFilePath));
|
||||
Element root = doc.getDocumentElement();
|
||||
|
||||
NodeList elements = root.getChildNodes();
|
||||
|
||||
AllocationFileParser allocationFileParser =
|
||||
new AllocationFileParser(elements);
|
||||
allocationFileParser.parse();
|
||||
docBuilderFactory.setIgnoringComments(true);
|
||||
|
||||
return
|
||||
allocationFileParser.getQueuePlacementPolicy().isPresent();
|
||||
} catch (Exception e) {
|
||||
throw new PreconditionException("Unable to parse allocation file", e);
|
||||
}
|
||||
@VisibleForTesting
|
||||
void setConsoleMode(boolean console) {
|
||||
this.consoleMode = console;
|
||||
}
|
||||
}
|
||||
|
@ -82,8 +82,14 @@ public class FSConfigToCSConfigRuleHandler {
|
||||
public static final String FAIR_AS_DRF =
|
||||
"fairAsDrf.action";
|
||||
|
||||
public static final String MAPPED_DYNAMIC_QUEUE =
|
||||
"mappedDynamicQueue.action";
|
||||
public static final String QUEUE_DYNAMIC_CREATE =
|
||||
"queueDynamicCreate.action";
|
||||
|
||||
public static final String PARENT_DYNAMIC_CREATE =
|
||||
"parentDynamicCreate.action";
|
||||
|
||||
public static final String CHILD_STATIC_DYNAMIC_CONFLICT =
|
||||
"childStaticDynamicConflict.action";
|
||||
|
||||
@VisibleForTesting
|
||||
enum RuleAction {
|
||||
@ -133,7 +139,9 @@ public void initPropertyActions() {
|
||||
setActionForProperty(RESERVATION_SYSTEM);
|
||||
setActionForProperty(QUEUE_AUTO_CREATE);
|
||||
setActionForProperty(FAIR_AS_DRF);
|
||||
setActionForProperty(MAPPED_DYNAMIC_QUEUE);
|
||||
setActionForProperty(QUEUE_DYNAMIC_CREATE);
|
||||
setActionForProperty(PARENT_DYNAMIC_CREATE);
|
||||
setActionForProperty(CHILD_STATIC_DYNAMIC_CONFLICT);
|
||||
}
|
||||
|
||||
public void handleMaxCapacityPercentage(String queueName) {
|
||||
@ -175,28 +183,12 @@ public void handleDynamicMaxAssign() {
|
||||
FairSchedulerConfiguration.DYNAMIC_MAX_ASSIGN, null);
|
||||
}
|
||||
|
||||
public void handleSpecifiedNotFirstRule() {
|
||||
handle(SPECIFIED_NOT_FIRST,
|
||||
null,
|
||||
"The <specified> tag is not the first placement rule, this cannot be"
|
||||
+ " converted properly");
|
||||
}
|
||||
|
||||
public void handleReservationSystem() {
|
||||
handle(RESERVATION_SYSTEM,
|
||||
null,
|
||||
"Conversion of reservation system is not supported");
|
||||
}
|
||||
|
||||
public void handleQueueAutoCreate(String placementRule) {
|
||||
handle(QUEUE_AUTO_CREATE,
|
||||
null,
|
||||
format(
|
||||
"Placement rules: queue auto-create is not supported (type: %s),"
|
||||
+ " please configure auto-create-child-queue property manually",
|
||||
placementRule));
|
||||
}
|
||||
|
||||
public void handleFairAsDrf(String queueName) {
|
||||
handle(FAIR_AS_DRF,
|
||||
null,
|
||||
@ -205,19 +197,30 @@ public void handleFairAsDrf(String queueName) {
|
||||
queueName));
|
||||
}
|
||||
|
||||
public void handleDynamicMappedQueue(String mapping, boolean create) {
|
||||
String msg = "Mapping rule %s is dynamic - this might cause inconsistent"
|
||||
+ " behaviour compared to FS.";
|
||||
public void handleRuleAutoCreateFlag(String queue) {
|
||||
String msg = format("Placement rules: create=true is enabled for"
|
||||
+ " path %s - you have to make sure that these queues are"
|
||||
+ " managed queues and set auto-create-child-queues=true."
|
||||
+ " Other queues cannot statically exist under this path!", queue);
|
||||
|
||||
if (create) {
|
||||
msg += " Also, setting auto-create-child-queue=true is"
|
||||
+ " necessary, because the create flag was set to true on the"
|
||||
+ " original placement rule.";
|
||||
}
|
||||
handle(QUEUE_DYNAMIC_CREATE, null, msg);
|
||||
}
|
||||
|
||||
handle(MAPPED_DYNAMIC_QUEUE,
|
||||
null,
|
||||
format(msg, mapping));
|
||||
public void handleFSParentCreateFlag(String parentPath) {
|
||||
String msg = format("Placement rules: create=true is enabled for parent"
|
||||
+ " path %s - this is not supported in Capacity Scheduler."
|
||||
+ " The parent must exist as a static queue and cannot be"
|
||||
+ " created automatically", parentPath);
|
||||
|
||||
handle(PARENT_DYNAMIC_CREATE, null, msg);
|
||||
}
|
||||
|
||||
public void handleChildStaticDynamicConflict(String parentPath) {
|
||||
String msg = String.format("Placement rules: rule maps to"
|
||||
+ " path %s, but this queue already contains static queue definitions!"
|
||||
+ " This configuration is invalid and *must* be corrected", parentPath);
|
||||
|
||||
handle(CHILD_STATIC_DYNAMIC_CONFLICT, null, msg);
|
||||
}
|
||||
|
||||
private void handle(String actionName, String fsSetting, String message) {
|
||||
|
@ -27,7 +27,6 @@
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.ConfigurableResource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSLeafQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
|
||||
@ -54,7 +53,6 @@ public class FSQueueConverter {
|
||||
@SuppressWarnings("unused")
|
||||
private final Resource clusterResource;
|
||||
private final float queueMaxAMShareDefault;
|
||||
private final boolean autoCreateChildQueues;
|
||||
private final int queueMaxAppsDefault;
|
||||
private final boolean drfUsed;
|
||||
|
||||
@ -67,7 +65,6 @@ public FSQueueConverter(FSQueueConverterBuilder builder) {
|
||||
this.sizeBasedWeight = builder.sizeBasedWeight;
|
||||
this.clusterResource = builder.clusterResource;
|
||||
this.queueMaxAMShareDefault = builder.queueMaxAMShareDefault;
|
||||
this.autoCreateChildQueues = builder.autoCreateChildQueues;
|
||||
this.queueMaxAppsDefault = builder.queueMaxAppsDefault;
|
||||
this.conversionOptions = builder.conversionOptions;
|
||||
this.drfUsed = builder.drfUsed;
|
||||
@ -85,7 +82,6 @@ public void convertQueueHierarchy(FSQueue queue) {
|
||||
|
||||
emitChildCapacity(queue);
|
||||
emitMaximumCapacity(queueName, queue);
|
||||
emitAutoCreateChildQueue(queueName, queue);
|
||||
emitSizeBasedWeight(queueName);
|
||||
emitOrderingPolicy(queueName, queue);
|
||||
checkMaxChildCapacitySetting(queue);
|
||||
@ -219,20 +215,6 @@ private void emitPreemptionDisabled(String queueName, FSQueue queue) {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* yarn.scheduler.fair.allow-undeclared-pools
|
||||
* ==> yarn.scheduler.capacity.<queue-name>
|
||||
* .auto-create-child-queue.enabled.
|
||||
* @param queueName
|
||||
*/
|
||||
private void emitAutoCreateChildQueue(String queueName, FSQueue queue) {
|
||||
if (autoCreateChildQueues && !queue.getChildQueues().isEmpty()
|
||||
&& !queueName.equals(CapacitySchedulerConfiguration.ROOT)) {
|
||||
capacitySchedulerConfig.setBoolean(PREFIX + queueName +
|
||||
".auto-create-child-queue.enabled", true);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* yarn.scheduler.fair.sizebasedweight ==>
|
||||
* yarn.scheduler.capacity.<queue-path>
|
||||
|
@ -27,7 +27,6 @@ public final class FSQueueConverterBuilder {
|
||||
Configuration capacitySchedulerConfig;
|
||||
boolean preemptionEnabled;
|
||||
boolean sizeBasedWeight;
|
||||
boolean autoCreateChildQueues;
|
||||
Resource clusterResource;
|
||||
float queueMaxAMShareDefault;
|
||||
int queueMaxAppsDefault;
|
||||
@ -65,12 +64,6 @@ public FSQueueConverterBuilder withSizeBasedWeight(
|
||||
return this;
|
||||
}
|
||||
|
||||
public FSQueueConverterBuilder withAutoCreateChildQueues(
|
||||
boolean autoCreateChildQueues) {
|
||||
this.autoCreateChildQueues = autoCreateChildQueues;
|
||||
return this;
|
||||
}
|
||||
|
||||
public FSQueueConverterBuilder withClusterResource(
|
||||
Resource resource) {
|
||||
this.clusterResource = resource;
|
||||
|
@ -32,9 +32,7 @@
|
||||
*/
|
||||
public class FSYarnSiteConverter {
|
||||
private boolean preemptionEnabled;
|
||||
private boolean autoCreateChildQueues;
|
||||
private boolean sizeBasedWeight;
|
||||
private boolean userAsDefaultQueue;
|
||||
|
||||
@SuppressWarnings({"deprecation", "checkstyle:linelength"})
|
||||
public void convertSiteProperties(Configuration conf,
|
||||
@ -111,21 +109,11 @@ public void convertSiteProperties(Configuration conf,
|
||||
localityThresholdRack);
|
||||
}
|
||||
|
||||
if (conf.getBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS,
|
||||
FairSchedulerConfiguration.DEFAULT_ALLOW_UNDECLARED_POOLS)) {
|
||||
autoCreateChildQueues = true;
|
||||
}
|
||||
|
||||
if (conf.getBoolean(FairSchedulerConfiguration.SIZE_BASED_WEIGHT,
|
||||
FairSchedulerConfiguration.DEFAULT_SIZE_BASED_WEIGHT)) {
|
||||
sizeBasedWeight = true;
|
||||
}
|
||||
|
||||
if (conf.getBoolean(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE,
|
||||
FairSchedulerConfiguration.DEFAULT_USER_AS_DEFAULT_QUEUE)) {
|
||||
userAsDefaultQueue = true;
|
||||
}
|
||||
|
||||
if (drfUsed) {
|
||||
yarnSiteConfig.set(
|
||||
CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
|
||||
@ -141,15 +129,7 @@ public boolean isPreemptionEnabled() {
|
||||
return preemptionEnabled;
|
||||
}
|
||||
|
||||
public boolean isAutoCreateChildQueues() {
|
||||
return autoCreateChildQueues;
|
||||
}
|
||||
|
||||
public boolean isSizeBasedWeight() {
|
||||
return sizeBasedWeight;
|
||||
}
|
||||
|
||||
public boolean isUserAsDefaultQueue() {
|
||||
return userAsDefaultQueue;
|
||||
}
|
||||
}
|
@ -15,8 +15,8 @@
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
|
||||
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.DefaultPlacementRule;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.FSPlacementRule;
|
||||
@ -27,106 +27,175 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.SecondaryGroupExistingPlacementRule;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.SpecifiedPlacementRule;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserPlacementRule;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.schema.MappingRulesDescription;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.schema.Rule;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.schema.Rule.FallbackResult;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.schema.Rule.Policy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.schema.Rule.Type;
|
||||
|
||||
class QueuePlacementConverter {
|
||||
private static final FallbackResult SKIP_RESULT = FallbackResult.SKIP;
|
||||
private static final String DEFAULT_QUEUE = "root.default";
|
||||
private static final String MATCH_ALL_USER = "*";
|
||||
|
||||
private static final String USER = "%user";
|
||||
private static final String PRIMARY_GROUP = "%primary_group";
|
||||
private static final String SECONDARY_GROUP = "%secondary_group";
|
||||
MappingRulesDescription convertPlacementPolicy(
|
||||
PlacementManager placementManager,
|
||||
FSConfigToCSConfigRuleHandler ruleHandler,
|
||||
CapacitySchedulerConfiguration convertedCSconfig) {
|
||||
|
||||
private static final String RULE_SEPARATOR = ",";
|
||||
MappingRulesDescription desc = new MappingRulesDescription();
|
||||
List<Rule> rules = new ArrayList<>();
|
||||
|
||||
Map<String, String> convertPlacementPolicy(PlacementManager placementManager,
|
||||
FSConfigToCSConfigRuleHandler ruleHandler, boolean userAsDefaultQueue) {
|
||||
StringBuilder mapping = new StringBuilder();
|
||||
Map<String, String> properties = new HashMap<>();
|
||||
for (final PlacementRule fsRule : placementManager.getPlacementRules()) {
|
||||
boolean create = ((FSPlacementRule)fsRule).getCreateFlag();
|
||||
|
||||
if (userAsDefaultQueue) {
|
||||
mapping.append("u:" + USER + ":" + USER);
|
||||
}
|
||||
|
||||
int ruleCount = 0;
|
||||
for (PlacementRule rule : placementManager.getPlacementRules()) {
|
||||
if (((FSPlacementRule)rule).getCreateFlag()) {
|
||||
ruleHandler.handleQueueAutoCreate(rule.getName());
|
||||
}
|
||||
|
||||
ruleCount++;
|
||||
if (rule instanceof UserPlacementRule) {
|
||||
UserPlacementRule userRule = (UserPlacementRule) rule;
|
||||
if (fsRule instanceof UserPlacementRule) {
|
||||
UserPlacementRule userRule = (UserPlacementRule) fsRule;
|
||||
|
||||
// nested rule
|
||||
if (userRule.getParentRule() != null) {
|
||||
handleNestedRule(mapping, userRule, ruleHandler);
|
||||
handleNestedRule(rules,
|
||||
userRule,
|
||||
ruleHandler,
|
||||
create,
|
||||
convertedCSconfig);
|
||||
} else {
|
||||
if (!userAsDefaultQueue) {
|
||||
if (mapping.length() > 0) {
|
||||
mapping.append(RULE_SEPARATOR);
|
||||
}
|
||||
mapping.append("u:" + USER + ":" + USER);
|
||||
}
|
||||
rules.add(createRule(Policy.USER, create, ruleHandler));
|
||||
}
|
||||
} else if (rule instanceof SpecifiedPlacementRule) {
|
||||
if (ruleCount > 1) {
|
||||
ruleHandler.handleSpecifiedNotFirstRule();
|
||||
} else if (fsRule instanceof SpecifiedPlacementRule) {
|
||||
rules.add(createRule(Policy.SPECIFIED, create, ruleHandler));
|
||||
} else if (fsRule instanceof PrimaryGroupPlacementRule) {
|
||||
rules.add(createRule(Policy.PRIMARY_GROUP, create, ruleHandler));
|
||||
} else if (fsRule instanceof DefaultPlacementRule) {
|
||||
DefaultPlacementRule defaultRule = (DefaultPlacementRule) fsRule;
|
||||
String defaultQueueName = defaultRule.defaultQueueName;
|
||||
|
||||
Rule rule;
|
||||
if (DEFAULT_QUEUE.equals(defaultQueueName)) {
|
||||
rule = createRule(Policy.DEFAULT_QUEUE, create, ruleHandler);
|
||||
} else {
|
||||
rule = createRule(Policy.CUSTOM, create, ruleHandler);
|
||||
rule.setCustomPlacement(defaultQueueName);
|
||||
}
|
||||
properties.put(
|
||||
"yarn.scheduler.capacity.queue-mappings-override.enable", "false");
|
||||
} else if (rule instanceof PrimaryGroupPlacementRule) {
|
||||
if (mapping.length() > 0) {
|
||||
mapping.append(RULE_SEPARATOR);
|
||||
}
|
||||
mapping.append("u:" + USER + ":" + PRIMARY_GROUP);
|
||||
} else if (rule instanceof DefaultPlacementRule) {
|
||||
DefaultPlacementRule defaultRule = (DefaultPlacementRule) rule;
|
||||
if (mapping.length() > 0) {
|
||||
mapping.append(RULE_SEPARATOR);
|
||||
}
|
||||
mapping.append("u:" + USER + ":").append(defaultRule.defaultQueueName);
|
||||
} else if (rule instanceof SecondaryGroupExistingPlacementRule) {
|
||||
if (mapping.length() > 0) {
|
||||
mapping.append(RULE_SEPARATOR);
|
||||
}
|
||||
mapping.append("u:" + USER + ":" + SECONDARY_GROUP);
|
||||
} else if (!(rule instanceof RejectPlacementRule)) {
|
||||
throw new IllegalArgumentException("Unknown placement rule: " + rule);
|
||||
|
||||
rules.add(rule);
|
||||
} else if (fsRule instanceof SecondaryGroupExistingPlacementRule) {
|
||||
Rule rule = createRule(Policy.SECONDARY_GROUP, create, ruleHandler);
|
||||
rules.add(rule);
|
||||
} else if (fsRule instanceof RejectPlacementRule) {
|
||||
rules.add(createRule(Policy.REJECT, false, ruleHandler));
|
||||
} else {
|
||||
throw new IllegalArgumentException("Unknown placement rule: " + fsRule);
|
||||
}
|
||||
}
|
||||
|
||||
if (mapping.length() > 0) {
|
||||
properties.put("yarn.scheduler.capacity.queue-mappings",
|
||||
mapping.toString());
|
||||
}
|
||||
desc.setRules(rules);
|
||||
|
||||
return properties;
|
||||
return desc;
|
||||
}
|
||||
|
||||
private void handleNestedRule(StringBuilder mapping,
|
||||
UserPlacementRule userRule, FSConfigToCSConfigRuleHandler ruleHandler) {
|
||||
PlacementRule pr = userRule.getParentRule();
|
||||
if (mapping.length() > 0) {
|
||||
mapping.append(RULE_SEPARATOR);
|
||||
}
|
||||
if (pr instanceof PrimaryGroupPlacementRule) {
|
||||
String mappingString = "u:" + USER + ":" + PRIMARY_GROUP + "." + USER;
|
||||
ruleHandler.handleDynamicMappedQueue(mappingString,
|
||||
((PrimaryGroupPlacementRule) pr).getCreateFlag());
|
||||
mapping.append(mappingString);
|
||||
} else if (pr instanceof SecondaryGroupExistingPlacementRule) {
|
||||
String mappingString = "u:" + USER + ":" + SECONDARY_GROUP + "." + USER;
|
||||
ruleHandler.handleDynamicMappedQueue(mappingString,
|
||||
((SecondaryGroupExistingPlacementRule) pr).getCreateFlag());
|
||||
mapping.append("u:" + USER + ":" + SECONDARY_GROUP + "." + USER);
|
||||
} else if (pr instanceof DefaultPlacementRule) {
|
||||
DefaultPlacementRule defaultRule = (DefaultPlacementRule) pr;
|
||||
String mappingString =
|
||||
"u:" + USER + ":" + defaultRule.defaultQueueName + "." + USER;
|
||||
ruleHandler.handleDynamicMappedQueue(mappingString,
|
||||
defaultRule.getCreateFlag());
|
||||
mapping.append(mappingString);
|
||||
private void handleNestedRule(List<Rule> rules,
|
||||
UserPlacementRule userRule,
|
||||
FSConfigToCSConfigRuleHandler ruleHandler,
|
||||
boolean create,
|
||||
CapacitySchedulerConfiguration csConf) {
|
||||
PlacementRule parentRule = userRule.getParentRule();
|
||||
boolean parentCreate = ((FSPlacementRule) parentRule).getCreateFlag();
|
||||
Policy policy;
|
||||
String queueName = null;
|
||||
|
||||
if (parentRule instanceof PrimaryGroupPlacementRule) {
|
||||
policy = Policy.PRIMARY_GROUP_USER;
|
||||
} else if (parentRule instanceof SecondaryGroupExistingPlacementRule) {
|
||||
policy = Policy.SECONDARY_GROUP_USER;
|
||||
} else if (parentRule instanceof DefaultPlacementRule) {
|
||||
DefaultPlacementRule defaultRule = (DefaultPlacementRule) parentRule;
|
||||
policy = Policy.USER;
|
||||
queueName = defaultRule.defaultQueueName;
|
||||
} else {
|
||||
throw new UnsupportedOperationException("Unsupported nested rule: "
|
||||
+ pr.getClass().getCanonicalName());
|
||||
throw new IllegalArgumentException(
|
||||
"Unsupported parent nested rule: "
|
||||
+ parentRule.getClass().getCanonicalName());
|
||||
}
|
||||
|
||||
Rule rule = createNestedRule(policy,
|
||||
create,
|
||||
ruleHandler,
|
||||
parentCreate,
|
||||
queueName,
|
||||
csConf);
|
||||
rules.add(rule);
|
||||
}
|
||||
|
||||
private Rule createRule(Policy policy, boolean create,
|
||||
FSConfigToCSConfigRuleHandler ruleHandler) {
|
||||
Rule rule = new Rule();
|
||||
rule.setPolicy(policy);
|
||||
rule.setCreate(create);
|
||||
rule.setMatches(MATCH_ALL_USER);
|
||||
rule.setFallbackResult(SKIP_RESULT);
|
||||
rule.setType(Type.USER);
|
||||
|
||||
if (create) {
|
||||
// display warning that these queues must exist and
|
||||
// cannot be created automatically under "root"
|
||||
if (policy == Policy.PRIMARY_GROUP
|
||||
|| policy == Policy.PRIMARY_GROUP_USER) {
|
||||
ruleHandler.handleRuleAutoCreateFlag("root.<primaryGroup>");
|
||||
} else if (policy == Policy.SECONDARY_GROUP
|
||||
|| policy == Policy.SECONDARY_GROUP_USER) {
|
||||
// in theory, root.<secondaryGroup> must always exist, even in FS,
|
||||
// but display the warning anyway
|
||||
ruleHandler.handleRuleAutoCreateFlag("root.<secondaryGroup>");
|
||||
}
|
||||
}
|
||||
|
||||
return rule;
|
||||
}
|
||||
|
||||
private Rule createNestedRule(Policy policy,
|
||||
boolean create,
|
||||
FSConfigToCSConfigRuleHandler ruleHandler,
|
||||
boolean fsParentCreate,
|
||||
String parentQueue,
|
||||
CapacitySchedulerConfiguration csConf) {
|
||||
|
||||
Rule rule = createRule(policy, create, ruleHandler);
|
||||
|
||||
if (parentQueue != null) {
|
||||
rule.setParentQueue(parentQueue);
|
||||
}
|
||||
|
||||
// create flag for the parent rule is not supported
|
||||
if (fsParentCreate) {
|
||||
if (policy == Policy.PRIMARY_GROUP_USER) {
|
||||
ruleHandler.handleFSParentCreateFlag("root.<primaryGroup>");
|
||||
} else if (policy == Policy.SECONDARY_GROUP_USER) {
|
||||
ruleHandler.handleFSParentCreateFlag("root.<secondaryGroup>");
|
||||
} else {
|
||||
ruleHandler.handleFSParentCreateFlag(parentQueue);
|
||||
}
|
||||
}
|
||||
|
||||
// check if parent conflicts with existing static queues
|
||||
if (create && policy == Policy.USER) {
|
||||
ruleHandler.handleRuleAutoCreateFlag(parentQueue);
|
||||
checkStaticDynamicConflict(parentQueue, csConf, ruleHandler);
|
||||
}
|
||||
|
||||
return rule;
|
||||
}
|
||||
|
||||
private void checkStaticDynamicConflict(String parentPath,
|
||||
CapacitySchedulerConfiguration csConf,
|
||||
FSConfigToCSConfigRuleHandler ruleHandler) {
|
||||
String[] childQueues = csConf.getQueues(parentPath);
|
||||
|
||||
// User must be warned: static + dynamic queues are under the
|
||||
// same parent
|
||||
if (childQueues != null && childQueues.length > 0) {
|
||||
ruleHandler.handleChildStaticDynamicConflict(parentPath);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -36,6 +36,7 @@
|
||||
import static org.mockito.Mockito.verifyZeroInteractions;
|
||||
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.ByteArrayOutputStream;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.util.Map;
|
||||
@ -47,6 +48,7 @@
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.schema.MappingRulesDescription;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
||||
import org.junit.After;
|
||||
@ -59,6 +61,8 @@
|
||||
import org.mockito.Mockito;
|
||||
import org.mockito.junit.MockitoJUnitRunner;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
|
||||
|
||||
/**
|
||||
* Unit tests for FSConfigToCSConfigConverter.
|
||||
@ -125,6 +129,7 @@ public void setup() throws IOException {
|
||||
config = new Configuration(false);
|
||||
config.set(FairSchedulerConfiguration.ALLOCATION_FILE, FAIR_SCHEDULER_XML);
|
||||
config.setBoolean(FairSchedulerConfiguration.MIGRATION_MODE, true);
|
||||
config.setBoolean(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, true);
|
||||
createConverter();
|
||||
converterTestCommons = new FSConfigConverterTestCommons();
|
||||
converterTestCommons.setUp();
|
||||
@ -139,6 +144,7 @@ private void createConverter() {
|
||||
converter = new FSConfigToCSConfigConverter(ruleHandler,
|
||||
createDefaultConversionOptions());
|
||||
converter.setClusterResource(CLUSTER_RESOURCE);
|
||||
converter.setConvertPlacementRules(false);
|
||||
}
|
||||
|
||||
private FSConfigToCSConfigConverterParams.Builder
|
||||
@ -456,6 +462,7 @@ public void testConvertFSConfigurationUndefinedYarnSiteConfig()
|
||||
public void testConvertCheckOutputDir() throws Exception {
|
||||
FSConfigToCSConfigConverterParams params = createDefaultParamsBuilder()
|
||||
.withClusterResource(CLUSTER_RESOURCE_STRING)
|
||||
.withConvertPlacementRules(true)
|
||||
.build();
|
||||
|
||||
converter.convert(params);
|
||||
@ -473,6 +480,11 @@ public void testConvertCheckOutputDir() throws Exception {
|
||||
"yarn-site.xml");
|
||||
assertTrue("Yarn site exists", yarnSiteFile.exists());
|
||||
assertTrue("Yarn site length > 0", yarnSiteFile.length() > 0);
|
||||
|
||||
File mappingRulesFile = new File(FSConfigConverterTestCommons.OUTPUT_DIR,
|
||||
"mapping-rules.json");
|
||||
assertTrue("Mapping rules file exists", mappingRulesFile.exists());
|
||||
assertTrue("Mapping rules file length > 0", mappingRulesFile.length() > 0);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -572,123 +584,52 @@ public void testConversionWhenMixedPolicyIsUsed() throws Exception {
|
||||
CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS, null));
|
||||
}
|
||||
|
||||
@SuppressWarnings("checkstyle:linelength")
|
||||
public void testUserAsDefaultQueueWithPlacementRules() throws Exception {
|
||||
config = new Configuration(false);
|
||||
config.setBoolean(FairSchedulerConfiguration.MIGRATION_MODE, true);
|
||||
config.set(FairSchedulerConfiguration.ALLOCATION_FILE,
|
||||
FAIR_SCHEDULER_XML);
|
||||
|
||||
converter.convert(config);
|
||||
|
||||
Configuration convertedConf = converter.getCapacitySchedulerConfig();
|
||||
|
||||
String expectedMappingRules =
|
||||
"u:%user:root.admins.devs.%user,u:%user:root.users.%user,u:%user:root.default";
|
||||
String mappingRules =
|
||||
convertedConf.get(CapacitySchedulerConfiguration.QUEUE_MAPPING);
|
||||
assertEquals("Mapping rules", expectedMappingRules, mappingRules);
|
||||
@Test
|
||||
public void testUserAsDefaultQueueWithPlacementRules()
|
||||
throws Exception {
|
||||
testUserAsDefaultQueueAndPlacementRules(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUserAsDefaultQueueTrueWithoutPlacementRules()
|
||||
public void testUserAsDefaultQueueWithoutPlacementRules()
|
||||
throws Exception {
|
||||
testUserAsDefaultQueueWithoutPlacementRules(true);
|
||||
testUserAsDefaultQueueAndPlacementRules(false);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testUserAsDefaultQueueFalseWithoutPlacementRules()
|
||||
throws Exception {
|
||||
testUserAsDefaultQueueWithoutPlacementRules(false);
|
||||
}
|
||||
|
||||
private void testUserAsDefaultQueueWithoutPlacementRules(boolean
|
||||
userAsDefaultQueue) throws Exception {
|
||||
private void testUserAsDefaultQueueAndPlacementRules(
|
||||
boolean hasPlacementRules) throws Exception {
|
||||
config = new Configuration(false);
|
||||
config.setBoolean(FairSchedulerConfiguration.MIGRATION_MODE, true);
|
||||
config.set(FairSchedulerConfiguration.ALLOCATION_FILE,
|
||||
FS_NO_PLACEMENT_RULES_XML);
|
||||
config.setBoolean(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE,
|
||||
userAsDefaultQueue);
|
||||
|
||||
converter.setConvertPlacementRules(true);
|
||||
converter.convert(config);
|
||||
|
||||
Configuration convertedConf = converter.getCapacitySchedulerConfig();
|
||||
String mappingRules =
|
||||
convertedConf.get(CapacitySchedulerConfiguration.QUEUE_MAPPING);
|
||||
|
||||
if (userAsDefaultQueue) {
|
||||
assertEquals("Mapping rules", "u:%user:%user", mappingRules);
|
||||
if (hasPlacementRules) {
|
||||
config.set(FairSchedulerConfiguration.ALLOCATION_FILE,
|
||||
FAIR_SCHEDULER_XML);
|
||||
} else {
|
||||
assertEquals("Mapping rules", "u:%user:root.default", mappingRules);
|
||||
config.set(FairSchedulerConfiguration.ALLOCATION_FILE,
|
||||
FS_NO_PLACEMENT_RULES_XML);
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAutoCreateChildQueuesWithPlacementRules() throws Exception {
|
||||
config = new Configuration(false);
|
||||
config.setBoolean(FairSchedulerConfiguration.MIGRATION_MODE, true);
|
||||
config.set(FairSchedulerConfiguration.ALLOCATION_FILE,
|
||||
FAIR_SCHEDULER_XML);
|
||||
config.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS,
|
||||
config.setBoolean(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE,
|
||||
true);
|
||||
|
||||
ByteArrayOutputStream jsonOutStream = new ByteArrayOutputStream();
|
||||
converter.setConvertPlacementRules(true);
|
||||
converter.setMappingRulesOutputStream(jsonOutStream);
|
||||
converter.setConsoleMode(true);
|
||||
converter.convert(config);
|
||||
|
||||
Configuration convertedConf = converter.getCapacitySchedulerConfig();
|
||||
String property =
|
||||
"yarn.scheduler.capacity.root.auto-create-child-queue.enabled";
|
||||
assertNull("Auto-create queue shouldn't be set",
|
||||
convertedConf.get(property));
|
||||
}
|
||||
MappingRulesDescription description =
|
||||
new ObjectMapper()
|
||||
.reader()
|
||||
.forType(MappingRulesDescription.class)
|
||||
.readValue(jsonOutStream.toByteArray());
|
||||
|
||||
@Test
|
||||
public void testAutoCreateChildQueuesTrueWithoutPlacementRules()
|
||||
throws Exception {
|
||||
testAutoCreateChildQueuesWithoutPlacementRules(true);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAutoCreateChildQueuesFalseWithoutPlacementRules()
|
||||
throws Exception {
|
||||
testAutoCreateChildQueuesWithoutPlacementRules(false);
|
||||
}
|
||||
|
||||
@SuppressWarnings("checkstyle:linelength")
|
||||
private void testAutoCreateChildQueuesWithoutPlacementRules(
|
||||
boolean allowUndeclaredPools) throws Exception {
|
||||
config = new Configuration(false);
|
||||
config.setBoolean(FairSchedulerConfiguration.MIGRATION_MODE, true);
|
||||
config.set(FairSchedulerConfiguration.ALLOCATION_FILE,
|
||||
FS_NO_PLACEMENT_RULES_XML);
|
||||
config.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS,
|
||||
allowUndeclaredPools);
|
||||
|
||||
converter.convert(config);
|
||||
|
||||
Configuration convertedConf = converter.getCapacitySchedulerConfig();
|
||||
String rootUserAutoCreate =
|
||||
"yarn.scheduler.capacity.root.users.auto-create-child-queue.enabled";
|
||||
String rootAutoCreate =
|
||||
"yarn.scheduler.capacity.root.auto-create-child-queue.enabled";
|
||||
String leafQueueAutoCreate =
|
||||
"yarn.scheduler.capacity.root.users.joe.auto-create-child-queue.enabled";
|
||||
|
||||
if (allowUndeclaredPools) {
|
||||
assertEquals("Auto-create queue wasn't enabled for root.users", true,
|
||||
convertedConf.getBoolean(rootUserAutoCreate, false));
|
||||
assertNull("Auto-create queue shouldn't be set for root",
|
||||
convertedConf.get(rootAutoCreate));
|
||||
assertNull("Auto-create queue shouldn't be set for leaf",
|
||||
convertedConf.get(leafQueueAutoCreate));
|
||||
if (hasPlacementRules) {
|
||||
// fs.xml defines 5 rules
|
||||
assertEquals("Number of rules", 5, description.getRules().size());
|
||||
} else {
|
||||
assertNull("Auto-create queue shouldn't be set for root.users",
|
||||
convertedConf.get(rootUserAutoCreate));
|
||||
assertNull("Auto-create queue shouldn't be set for root",
|
||||
convertedConf.get(rootAutoCreate));
|
||||
assertNull("Auto-create queue shouldn't be set for leaf",
|
||||
convertedConf.get(leafQueueAutoCreate));
|
||||
// by default, FS internally creates 2 rules
|
||||
assertEquals("Number of rules", 2, description.getRules().size());
|
||||
}
|
||||
}
|
||||
|
||||
@ -720,7 +661,7 @@ public void testPlacementRulesConversionEnabled() throws Exception {
|
||||
verify(placementConverter).convertPlacementPolicy(
|
||||
any(PlacementManager.class),
|
||||
any(FSConfigToCSConfigRuleHandler.class),
|
||||
any(Boolean.class));
|
||||
any(CapacitySchedulerConfiguration.class));
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -93,6 +93,7 @@ public void testConvertFSConfigurationWithConsoleParam()
|
||||
|
||||
FSConfigToCSConfigConverterMain.main(new String[] {
|
||||
"-p",
|
||||
"-m",
|
||||
"-y", YARN_SITE_XML,
|
||||
"-f", FS_ALLOC_FILE,
|
||||
"-r", CONVERSION_RULES_FILE});
|
||||
@ -102,6 +103,8 @@ public void testConvertFSConfigurationWithConsoleParam()
|
||||
stdout.contains("======= yarn-site.xml ======="));
|
||||
assertTrue("Stdout doesn't contain capacity-scheduler.xml",
|
||||
stdout.contains("======= capacity-scheduler.xml ======="));
|
||||
assertTrue("Stdout doesn't contain mapping-rules.json",
|
||||
stdout.contains("======= mapping-rules.json ======="));
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -130,6 +133,7 @@ public void testConvertFSConfigurationWithLongSwitches()
|
||||
|
||||
FSConfigToCSConfigConverterMain.main(new String[] {
|
||||
"--print",
|
||||
"--convert-placement-rules",
|
||||
"--yarnsiteconfig", YARN_SITE_XML,
|
||||
"--fsconfig", FS_ALLOC_FILE,
|
||||
"--rulesconfig", CONVERSION_RULES_FILE});
|
||||
@ -139,6 +143,8 @@ public void testConvertFSConfigurationWithLongSwitches()
|
||||
stdout.contains("======= yarn-site.xml ======="));
|
||||
assertTrue("Stdout doesn't contain capacity-scheduler.xml",
|
||||
stdout.contains("======= capacity-scheduler.xml ======="));
|
||||
assertTrue("Stdout doesn't contain mapping-rules.json",
|
||||
stdout.contains("======= mapping-rules.json ======="));
|
||||
}
|
||||
|
||||
private void verifyHelpText() {
|
||||
|
@ -73,9 +73,7 @@ public void testInitPropertyActionsToWarning() throws IOException {
|
||||
ruleHandler.handleMaxChildCapacity();
|
||||
ruleHandler.handleMinResources();
|
||||
ruleHandler.handleMaxResources();
|
||||
ruleHandler.handleQueueAutoCreate("test");
|
||||
ruleHandler.handleReservationSystem();
|
||||
ruleHandler.handleSpecifiedNotFirstRule();
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -101,9 +99,7 @@ public void testAllRulesWarning() throws IOException {
|
||||
ruleHandler.handleMaxChildCapacity();
|
||||
ruleHandler.handleMinResources();
|
||||
ruleHandler.handleMaxResources();
|
||||
ruleHandler.handleQueueAutoCreate("test");
|
||||
ruleHandler.handleReservationSystem();
|
||||
ruleHandler.handleSpecifiedNotFirstRule();
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -133,9 +129,7 @@ public void testAllRulesAbort() throws IOException {
|
||||
expectAbort(() -> ruleHandler.handleMaxChildCapacity());
|
||||
expectAbort(() -> ruleHandler.handleMaxResources());
|
||||
expectAbort(() -> ruleHandler.handleMinResources());
|
||||
expectAbort(() -> ruleHandler.handleQueueAutoCreate("test"));
|
||||
expectAbort(() -> ruleHandler.handleReservationSystem());
|
||||
expectAbort(() -> ruleHandler.handleSpecifiedNotFirstRule());
|
||||
expectAbort(() -> ruleHandler.handleFairAsDrf("test"));
|
||||
}
|
||||
|
||||
|
@ -135,7 +135,6 @@ private void createBuilder() {
|
||||
.withCapacitySchedulerConfig(csConfig)
|
||||
.withPreemptionEnabled(false)
|
||||
.withSizeBasedWeight(false)
|
||||
.withAutoCreateChildQueues(false)
|
||||
.withClusterResource(CLUSTER_RESOURCE)
|
||||
.withQueueMaxAMShareDefault(MAX_AM_SHARE_DEFAULT)
|
||||
.withQueueMaxAppsDefault(MAX_APPS_DEFAULT)
|
||||
@ -307,37 +306,10 @@ public void testQueueMinimumCapacity() {
|
||||
verify(ruleHandler, times(2)).handleMinResources();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testQueueAutoCreateChildQueue() {
|
||||
converter = builder
|
||||
.withCapacitySchedulerConfig(csConfig)
|
||||
.withAutoCreateChildQueues(true)
|
||||
.build();
|
||||
|
||||
converter.convertQueueHierarchy(rootQueue);
|
||||
|
||||
Set<String> parentQueues = Sets.newHashSet(
|
||||
"root.admins",
|
||||
"root.users");
|
||||
|
||||
Set<String> leafQueues = Sets.newHashSet(
|
||||
"root.default",
|
||||
"root.admins.alice",
|
||||
"root.admins.bob",
|
||||
"root.users.joe",
|
||||
"root.users.john");
|
||||
|
||||
assertTrueForQueues(parentQueues, ".auto-create-child-queue.enabled",
|
||||
csConfig);
|
||||
assertNoValueForQueues(leafQueues, ".auto-create-child-queue.enabled",
|
||||
csConfig);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testQueueWithNoAutoCreateChildQueue() {
|
||||
converter = builder
|
||||
.withCapacitySchedulerConfig(csConfig)
|
||||
.withAutoCreateChildQueues(false)
|
||||
.build();
|
||||
|
||||
converter.convertQueueHierarchy(rootQueue);
|
||||
|
@ -15,19 +15,17 @@
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
|
||||
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.ENABLE_QUEUE_MAPPING_OVERRIDE;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.QUEUE_MAPPING;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
import static org.junit.Assert.assertNotNull;
|
||||
import static org.mockito.ArgumentMatchers.eq;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.verify;
|
||||
import static org.mockito.Mockito.verifyNoMoreInteractions;
|
||||
import static org.mockito.Mockito.verifyZeroInteractions;
|
||||
import static org.mockito.Mockito.when;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
|
||||
@ -40,6 +38,12 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.SecondaryGroupExistingPlacementRule;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.SpecifiedPlacementRule;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserPlacementRule;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.schema.MappingRulesDescription;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.schema.Rule;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.schema.Rule.FallbackResult;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.schema.Rule.Policy;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.schema.Rule.Type;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
import org.junit.runner.RunWith;
|
||||
@ -54,6 +58,8 @@
|
||||
*/
|
||||
@RunWith(MockitoJUnitRunner.class)
|
||||
public class TestQueuePlacementConverter {
|
||||
private static final String DEFAULT_QUEUE = "root.default";
|
||||
|
||||
@Mock
|
||||
private PlacementManager placementManager;
|
||||
|
||||
@ -62,113 +68,107 @@ public class TestQueuePlacementConverter {
|
||||
|
||||
private QueuePlacementConverter converter;
|
||||
|
||||
private CapacitySchedulerConfiguration csConf;
|
||||
|
||||
@Before
|
||||
public void setup() {
|
||||
this.converter = new QueuePlacementConverter();
|
||||
this.csConf = new CapacitySchedulerConfiguration(
|
||||
new Configuration(false));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertUserAsDefaultQueue() {
|
||||
Map<String, String> properties = convert(true);
|
||||
public void testConvertUserRule() {
|
||||
PlacementRule fsRule = mock(UserPlacementRule.class);
|
||||
initPlacementManagerMock(fsRule);
|
||||
|
||||
verifyMapping(properties, "u:%user:%user");
|
||||
MappingRulesDescription description = convert();
|
||||
assertEquals("Number of rules", 1, description.getRules().size());
|
||||
verifyRule(description.getRules().get(0), Policy.USER);
|
||||
verifyZeroInteractions(ruleHandler);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertUserPlacementRuleWithoutUserAsDefaultQueue() {
|
||||
testConvertUserPlacementRule(false);
|
||||
}
|
||||
public void testConvertSpecifiedRule() {
|
||||
PlacementRule fsRule = mock(SpecifiedPlacementRule.class);
|
||||
initPlacementManagerMock(fsRule);
|
||||
|
||||
@Test
|
||||
public void testConvertUserPlacementRuleWithUserAsDefaultQueue() {
|
||||
testConvertUserPlacementRule(true);
|
||||
}
|
||||
|
||||
private void testConvertUserPlacementRule(boolean userAsDefaultQueue) {
|
||||
PlacementRule rule = mock(UserPlacementRule.class);
|
||||
initPlacementManagerMock(rule);
|
||||
|
||||
Map<String, String> properties = convert(userAsDefaultQueue);
|
||||
|
||||
verifyMapping(properties, "u:%user:%user");
|
||||
MappingRulesDescription description = convert();
|
||||
assertEquals("Number of rules", 1, description.getRules().size());
|
||||
verifyRule(description.getRules().get(0), Policy.SPECIFIED);
|
||||
verifyZeroInteractions(ruleHandler);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertSpecifiedPlacementRule() {
|
||||
PlacementRule rule = mock(SpecifiedPlacementRule.class);
|
||||
initPlacementManagerMock(rule);
|
||||
public void testConvertPrimaryGroupRule() {
|
||||
PlacementRule fsRule = mock(PrimaryGroupPlacementRule.class);
|
||||
initPlacementManagerMock(fsRule);
|
||||
|
||||
Map<String, String> properties = convert(false);
|
||||
MappingRulesDescription description = convert();
|
||||
|
||||
verifyMappingNoOverride(properties, 1);
|
||||
assertEquals("Number of rules", 1, description.getRules().size());
|
||||
verifyRule(description.getRules().get(0), Policy.PRIMARY_GROUP);
|
||||
verifyZeroInteractions(ruleHandler);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertSpecifiedPlacementRuleAtSecondPlace() {
|
||||
PlacementRule rule = mock(UserPlacementRule.class);
|
||||
PlacementRule rule2 = mock(SpecifiedPlacementRule.class);
|
||||
initPlacementManagerMock(rule, rule2);
|
||||
public void testConvertSecondaryGroupRule() {
|
||||
PlacementRule fsRule = mock(SecondaryGroupExistingPlacementRule.class);
|
||||
initPlacementManagerMock(fsRule);
|
||||
|
||||
Map<String, String> properties = convert(false);
|
||||
MappingRulesDescription description = convert();
|
||||
|
||||
verifyMappingNoOverride(properties, 2);
|
||||
verify(ruleHandler).handleSpecifiedNotFirstRule();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertPrimaryGroupPlacementRule() {
|
||||
PlacementRule rule = mock(PrimaryGroupPlacementRule.class);
|
||||
initPlacementManagerMock(rule);
|
||||
|
||||
Map<String, String> properties = convert(false);
|
||||
|
||||
verifyMapping(properties, "u:%user:%primary_group");
|
||||
assertEquals("Number of rules", 1, description.getRules().size());
|
||||
verifyRule(description.getRules().get(0), Policy.SECONDARY_GROUP);
|
||||
verifyZeroInteractions(ruleHandler);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertSecondaryGroupPlacementRule() {
|
||||
PlacementRule rule = mock(SecondaryGroupExistingPlacementRule.class);
|
||||
initPlacementManagerMock(rule);
|
||||
public void testConvertDefaultRuleWithQueueName() {
|
||||
DefaultPlacementRule fsRule = mock(DefaultPlacementRule.class);
|
||||
fsRule.defaultQueueName = "abc";
|
||||
initPlacementManagerMock(fsRule);
|
||||
|
||||
Map<String, String> properties = convert(false);
|
||||
MappingRulesDescription description = convert();
|
||||
|
||||
verifyMapping(properties, "u:%user:%secondary_group");
|
||||
assertEquals("Number of rules", 1, description.getRules().size());
|
||||
|
||||
verifyRule(description.getRules().get(0), Policy.CUSTOM);
|
||||
verifyZeroInteractions(ruleHandler);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertDefaultPlacementRule() {
|
||||
DefaultPlacementRule rule = mock(DefaultPlacementRule.class);
|
||||
rule.defaultQueueName = "abc";
|
||||
initPlacementManagerMock(rule);
|
||||
public void testConvertDefaultRule() {
|
||||
DefaultPlacementRule fsRule = mock(DefaultPlacementRule.class);
|
||||
fsRule.defaultQueueName = DEFAULT_QUEUE;
|
||||
initPlacementManagerMock(fsRule);
|
||||
|
||||
Map<String, String> properties = convert(false);
|
||||
MappingRulesDescription description = convert();
|
||||
|
||||
verifyMapping(properties, "u:%user:abc");
|
||||
assertEquals("Number of rules", 1, description.getRules().size());
|
||||
verifyRule(description.getRules().get(0), Policy.DEFAULT_QUEUE);
|
||||
verifyZeroInteractions(ruleHandler);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testConvertUnsupportedPlacementRule() {
|
||||
public void testConvertUnsupportedRule() {
|
||||
PlacementRule rule = mock(TestPlacementRule.class);
|
||||
initPlacementManagerMock(rule);
|
||||
|
||||
// throws exception
|
||||
convert(false);
|
||||
convert();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertRejectPlacementRule() {
|
||||
public void testConvertRejectRule() {
|
||||
PlacementRule rule = mock(RejectPlacementRule.class);
|
||||
initPlacementManagerMock(rule);
|
||||
|
||||
Map<String, String> properties = convert(false);
|
||||
MappingRulesDescription description = convert();
|
||||
|
||||
assertEquals("Map is not empty", 0, properties.size());
|
||||
assertEquals("Number of rules", 1, description.getRules().size());
|
||||
verifyRule(description.getRules().get(0), Policy.REJECT);
|
||||
verifyZeroInteractions(ruleHandler);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -178,11 +178,11 @@ public void testConvertNestedPrimaryGroupRule() {
|
||||
when(rule.getParentRule()).thenReturn(parent);
|
||||
initPlacementManagerMock(rule);
|
||||
|
||||
Map<String, String> properties = convert(false);
|
||||
MappingRulesDescription description = convert();
|
||||
|
||||
verifyMapping(properties, "u:%user:%primary_group.%user");
|
||||
verify(ruleHandler).handleDynamicMappedQueue(
|
||||
eq("u:%user:%primary_group.%user"), eq(false));
|
||||
assertEquals("Number of rules", 1, description.getRules().size());
|
||||
verifyRule(description.getRules().get(0), Policy.PRIMARY_GROUP_USER);
|
||||
verifyZeroInteractions(ruleHandler);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -193,27 +193,41 @@ public void testConvertNestedSecondaryGroupRule() {
|
||||
when(rule.getParentRule()).thenReturn(parent);
|
||||
initPlacementManagerMock(rule);
|
||||
|
||||
Map<String, String> properties = convert(false);
|
||||
MappingRulesDescription description = convert();
|
||||
|
||||
verifyMapping(properties, "u:%user:%secondary_group.%user");
|
||||
verify(ruleHandler).handleDynamicMappedQueue(
|
||||
eq("u:%user:%secondary_group.%user"), eq(false));
|
||||
assertEquals("Number of rules", 1, description.getRules().size());
|
||||
verifyRule(description.getRules().get(0), Policy.SECONDARY_GROUP_USER);
|
||||
verifyZeroInteractions(ruleHandler);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertNestedDefaultRule() {
|
||||
UserPlacementRule rule = mock(UserPlacementRule.class);
|
||||
UserPlacementRule fsRule = mock(UserPlacementRule.class);
|
||||
DefaultPlacementRule parent =
|
||||
mock(DefaultPlacementRule.class);
|
||||
parent.defaultQueueName = "abc";
|
||||
when(rule.getParentRule()).thenReturn(parent);
|
||||
initPlacementManagerMock(rule);
|
||||
parent.defaultQueueName = "root.abc";
|
||||
when(fsRule.getParentRule()).thenReturn(parent);
|
||||
initPlacementManagerMock(fsRule);
|
||||
|
||||
Map<String, String> properties = convert(false);
|
||||
MappingRulesDescription description = convert();
|
||||
|
||||
verifyMapping(properties, "u:%user:abc.%user");
|
||||
verify(ruleHandler).handleDynamicMappedQueue(
|
||||
eq("u:%user:abc.%user"), eq(false));
|
||||
assertEquals("Number of rules", 1, description.getRules().size());
|
||||
Rule rule = description.getRules().get(0);
|
||||
verifyRule(description.getRules().get(0), Policy.USER);
|
||||
assertEquals("Parent path", "root.abc", rule.getParentQueue());
|
||||
verifyZeroInteractions(ruleHandler);
|
||||
}
|
||||
|
||||
@Test(expected = IllegalArgumentException.class)
|
||||
public void testUnsupportedNestedParentRule() {
|
||||
UserPlacementRule fsRule = mock(UserPlacementRule.class);
|
||||
TestPlacementRule parent =
|
||||
mock(TestPlacementRule.class);
|
||||
when(fsRule.getParentRule()).thenReturn(parent);
|
||||
initPlacementManagerMock(fsRule);
|
||||
|
||||
// throws exception
|
||||
convert();
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -225,38 +239,171 @@ public void testConvertMultiplePlacementRules() {
|
||||
mock(SecondaryGroupExistingPlacementRule.class);
|
||||
initPlacementManagerMock(rule1, rule2, rule3);
|
||||
|
||||
Map<String, String> properties = convert(false);
|
||||
MappingRulesDescription description = convert();
|
||||
|
||||
verifyMapping(properties,
|
||||
"u:%user:%user,u:%user:%primary_group,u:%user:%secondary_group");
|
||||
assertEquals("Number of rules", 3, description.getRules().size());
|
||||
verifyRule(description.getRules().get(0), Policy.USER);
|
||||
verifyRule(description.getRules().get(1), Policy.PRIMARY_GROUP);
|
||||
verifyRule(description.getRules().get(2), Policy.SECONDARY_GROUP);
|
||||
verifyZeroInteractions(ruleHandler);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertPrimaryGroupRuleWithCreate() {
|
||||
FSPlacementRule fsRule = mock(PrimaryGroupPlacementRule.class);
|
||||
when(fsRule.getCreateFlag()).thenReturn(true);
|
||||
initPlacementManagerMock(fsRule);
|
||||
|
||||
convert();
|
||||
|
||||
verify(ruleHandler).handleRuleAutoCreateFlag(eq("root.<primaryGroup>"));
|
||||
verifyNoMoreInteractions(ruleHandler);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertSecondaryGroupRuleWithCreate() {
|
||||
FSPlacementRule fsRule = mock(SecondaryGroupExistingPlacementRule.class);
|
||||
when(fsRule.getCreateFlag()).thenReturn(true);
|
||||
initPlacementManagerMock(fsRule);
|
||||
|
||||
convert();
|
||||
|
||||
verify(ruleHandler).handleRuleAutoCreateFlag(eq("root.<secondaryGroup>"));
|
||||
verifyNoMoreInteractions(ruleHandler);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertNestedPrimaryGroupRuleWithCreate() {
|
||||
UserPlacementRule fsRule = mock(UserPlacementRule.class);
|
||||
PrimaryGroupPlacementRule parent = mock(PrimaryGroupPlacementRule.class);
|
||||
when(fsRule.getParentRule()).thenReturn(parent);
|
||||
when(fsRule.getCreateFlag()).thenReturn(true);
|
||||
initPlacementManagerMock(fsRule);
|
||||
|
||||
convert();
|
||||
|
||||
verify(ruleHandler).handleRuleAutoCreateFlag(eq("root.<primaryGroup>"));
|
||||
verifyNoMoreInteractions(ruleHandler);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertNestedSecondaryGroupRuleWithCreate() {
|
||||
UserPlacementRule fsRule = mock(UserPlacementRule.class);
|
||||
SecondaryGroupExistingPlacementRule parent =
|
||||
mock(SecondaryGroupExistingPlacementRule.class);
|
||||
when(fsRule.getParentRule()).thenReturn(parent);
|
||||
when(fsRule.getCreateFlag()).thenReturn(true);
|
||||
initPlacementManagerMock(fsRule);
|
||||
|
||||
convert();
|
||||
|
||||
verify(ruleHandler).handleRuleAutoCreateFlag(eq("root.<secondaryGroup>"));
|
||||
verifyNoMoreInteractions(ruleHandler);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertNestedDefaultGroupWithCreate() {
|
||||
UserPlacementRule fsRule = mock(UserPlacementRule.class);
|
||||
DefaultPlacementRule parent =
|
||||
mock(DefaultPlacementRule.class);
|
||||
parent.defaultQueueName = "root.abc";
|
||||
when(fsRule.getParentRule()).thenReturn(parent);
|
||||
when(fsRule.getCreateFlag()).thenReturn(true);
|
||||
initPlacementManagerMock(fsRule);
|
||||
|
||||
convert();
|
||||
|
||||
verify(ruleHandler).handleRuleAutoCreateFlag(eq("root.abc"));
|
||||
verifyNoMoreInteractions(ruleHandler);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertNestedPrimaryGroupRuleWithParentCreate() {
|
||||
UserPlacementRule fsRule = mock(UserPlacementRule.class);
|
||||
PrimaryGroupPlacementRule parent = mock(PrimaryGroupPlacementRule.class);
|
||||
when(fsRule.getParentRule()).thenReturn(parent);
|
||||
when(parent.getCreateFlag()).thenReturn(true);
|
||||
initPlacementManagerMock(fsRule);
|
||||
|
||||
convert();
|
||||
|
||||
verify(ruleHandler).handleFSParentCreateFlag(eq("root.<primaryGroup>"));
|
||||
verifyNoMoreInteractions(ruleHandler);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertNestedSecondaryGroupRuleWithParentCreate() {
|
||||
UserPlacementRule fsRule = mock(UserPlacementRule.class);
|
||||
SecondaryGroupExistingPlacementRule parent =
|
||||
mock(SecondaryGroupExistingPlacementRule.class);
|
||||
when(fsRule.getParentRule()).thenReturn(parent);
|
||||
when(parent.getCreateFlag()).thenReturn(true);
|
||||
initPlacementManagerMock(fsRule);
|
||||
|
||||
convert();
|
||||
|
||||
verify(ruleHandler).handleFSParentCreateFlag(eq("root.<secondaryGroup>"));
|
||||
verifyNoMoreInteractions(ruleHandler);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertNestedDefaultGroupWithParentCreate() {
|
||||
UserPlacementRule fsRule = mock(UserPlacementRule.class);
|
||||
DefaultPlacementRule parent =
|
||||
mock(DefaultPlacementRule.class);
|
||||
parent.defaultQueueName = "root.abc";
|
||||
when(fsRule.getParentRule()).thenReturn(parent);
|
||||
when(parent.getCreateFlag()).thenReturn(true);
|
||||
initPlacementManagerMock(fsRule);
|
||||
|
||||
convert();
|
||||
|
||||
verify(ruleHandler).handleFSParentCreateFlag(eq("root.abc"));
|
||||
verifyNoMoreInteractions(ruleHandler);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testConvertNestedDefaultWithConflictingQueues() {
|
||||
UserPlacementRule fsRule = mock(UserPlacementRule.class);
|
||||
DefaultPlacementRule parent =
|
||||
mock(DefaultPlacementRule.class);
|
||||
parent.defaultQueueName = "root.users";
|
||||
when(fsRule.getParentRule()).thenReturn(parent);
|
||||
when(fsRule.getCreateFlag()).thenReturn(true);
|
||||
initPlacementManagerMock(fsRule);
|
||||
csConf.setQueues("root.users", new String[] {"hadoop"});
|
||||
|
||||
convert();
|
||||
|
||||
verify(ruleHandler).handleRuleAutoCreateFlag(eq("root.users"));
|
||||
verify(ruleHandler).handleChildStaticDynamicConflict(eq("root.users"));
|
||||
verifyNoMoreInteractions(ruleHandler);
|
||||
}
|
||||
|
||||
private void initPlacementManagerMock(
|
||||
PlacementRule... rules) {
|
||||
List<PlacementRule> listOfRules = Lists.newArrayList(rules);
|
||||
when(placementManager.getPlacementRules()).thenReturn(listOfRules);
|
||||
}
|
||||
|
||||
private Map<String, String> convert(boolean userAsDefaultQueue) {
|
||||
return converter.convertPlacementPolicy(placementManager, ruleHandler,
|
||||
userAsDefaultQueue);
|
||||
private MappingRulesDescription convert() {
|
||||
return converter.convertPlacementPolicy(placementManager,
|
||||
ruleHandler, csConf);
|
||||
}
|
||||
|
||||
private void verifyMapping(Map<String, String> properties,
|
||||
String expectedValue) {
|
||||
assertEquals("Map size", 1, properties.size());
|
||||
String value = properties.get(QUEUE_MAPPING);
|
||||
assertNotNull("No mapping property found", value);
|
||||
assertEquals("Mapping", expectedValue, value);
|
||||
private void verifyRule(Rule rule, Policy expectedPolicy) {
|
||||
assertEquals("Policy type", expectedPolicy, rule.getPolicy());
|
||||
assertEquals("Match string", "*", rule.getMatches());
|
||||
assertEquals("Fallback result",
|
||||
FallbackResult.SKIP, rule.getFallbackResult());
|
||||
assertEquals("Type", Type.USER, rule.getType());
|
||||
}
|
||||
|
||||
private void verifyMappingNoOverride(Map<String, String> properties,
|
||||
int expectedSize) {
|
||||
assertEquals("Map size", expectedSize, properties.size());
|
||||
String value = properties.get(ENABLE_QUEUE_MAPPING_OVERRIDE);
|
||||
assertNotNull("No mapping property found", value);
|
||||
assertEquals("Override mapping", "false", value);
|
||||
private void verifySetDefaultRule(Rule rule, String expectedQueue) {
|
||||
assertEquals("Policy type", Policy.SET_DEFAULT_QUEUE, rule.getPolicy());
|
||||
assertEquals("Queue", expectedQueue, rule.getValue());
|
||||
assertEquals("Fallback result",
|
||||
FallbackResult.SKIP, rule.getFallbackResult());
|
||||
}
|
||||
|
||||
private class TestPlacementRule extends FSPlacementRule {
|
||||
|
@ -88,6 +88,9 @@
|
||||
<rule name="nestedUserQueue" create="false">
|
||||
<rule name="default" create="false" queue="admins.devs"/>
|
||||
</rule>
|
||||
<rule name="nestedUserQueue" create="false">
|
||||
<rule name="primaryGroup" create="false" queue="users.devs"/>
|
||||
</rule>
|
||||
<rule name="specified" create="true"/>
|
||||
<rule name="nestedUserQueue" create="true">
|
||||
<rule name="default" create="false" queue="users"/>
|
||||
|
Loading…
Reference in New Issue
Block a user