YARN-10376. Create a class that covers the functionality of UserGroupMappingPlacementRule and AppNameMappingPlacementRule using the new mapping rules. Contributed by Gergely Pollak.
This commit is contained in:
parent
8a3952436c
commit
1841a5bb03
@ -1087,6 +1087,7 @@ public static boolean isAclEnabled(Configuration conf) {
|
|||||||
|
|
||||||
/** Default queue name */
|
/** Default queue name */
|
||||||
public static final String DEFAULT_QUEUE_NAME = "default";
|
public static final String DEFAULT_QUEUE_NAME = "default";
|
||||||
|
public static final String DEFAULT_QUEUE_FULL_NAME = "root.default";
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Buckets (in minutes) for the number of apps running in each queue.
|
* Buckets (in minutes) for the number of apps running in each queue.
|
||||||
|
@ -0,0 +1,413 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.placement;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
import com.google.common.base.Preconditions;
|
||||||
|
import com.google.common.collect.ImmutableSet;
|
||||||
|
import org.apache.hadoop.security.Groups;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.*;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.Iterator;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* This class is responsible for making application submissions to queue
|
||||||
|
* assignments, based on the configured ruleset. This class supports all
|
||||||
|
* features supported by UserGroupMappingPlacementRule and
|
||||||
|
* AppNameMappingPlacementRule classes, also adding some features which are
|
||||||
|
* present in fair scheduler queue placement. This helps to reduce the gap
|
||||||
|
* between the two schedulers.
|
||||||
|
*/
|
||||||
|
public class CSMappingPlacementRule extends PlacementRule {
|
||||||
|
private static final Logger LOG = LoggerFactory
|
||||||
|
.getLogger(CSMappingPlacementRule.class);
|
||||||
|
|
||||||
|
private CapacitySchedulerQueueManager queueManager;
|
||||||
|
private List<MappingRule> mappingRules;
|
||||||
|
|
||||||
|
/**
|
||||||
|
* These are the variables we associate a special meaning, these should be
|
||||||
|
* immutable for each variable context.
|
||||||
|
*/
|
||||||
|
private ImmutableSet<String> immutableVariables = ImmutableSet.of(
|
||||||
|
"%user",
|
||||||
|
"%primary_group",
|
||||||
|
"%secondary_group",
|
||||||
|
"%application",
|
||||||
|
"%specified"
|
||||||
|
);
|
||||||
|
|
||||||
|
private Groups groups;
|
||||||
|
private boolean overrideWithQueueMappings;
|
||||||
|
private boolean failOnConfigError = true;
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
void setGroups(Groups groups) {
|
||||||
|
this.groups = groups;
|
||||||
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
void setFailOnConfigError(boolean failOnConfigError) {
|
||||||
|
this.failOnConfigError = failOnConfigError;
|
||||||
|
}
|
||||||
|
|
||||||
|
private MappingRuleValidationContext buildValidationContext()
|
||||||
|
throws IOException {
|
||||||
|
Preconditions.checkNotNull(queueManager, "Queue manager must be " +
|
||||||
|
"initialized before building validation a context!");
|
||||||
|
|
||||||
|
MappingRuleValidationContext validationContext =
|
||||||
|
new MappingRuleValidationContextImpl(queueManager);
|
||||||
|
|
||||||
|
//Adding all immutable variables to the known variable list
|
||||||
|
for (String var : immutableVariables) {
|
||||||
|
try {
|
||||||
|
validationContext.addImmutableVariable(var);
|
||||||
|
} catch (YarnException e) {
|
||||||
|
LOG.error("Error initializing placement variables, unable to register" +
|
||||||
|
" '{}': {}", var, e.getMessage());
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//Immutables + %default are the only officially supported variables,
|
||||||
|
//We initialize the context with these, and let the rules to extend the list
|
||||||
|
try {
|
||||||
|
validationContext.addVariable("%default");
|
||||||
|
} catch (YarnException e) {
|
||||||
|
LOG.error("Error initializing placement variables, unable to register" +
|
||||||
|
" '%default': " + e.getMessage());
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
return validationContext;
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public boolean initialize(ResourceScheduler scheduler) throws IOException {
|
||||||
|
if (!(scheduler instanceof CapacityScheduler)) {
|
||||||
|
throw new IOException(
|
||||||
|
"CSMappingPlacementRule can be only used with CapacityScheduler");
|
||||||
|
}
|
||||||
|
LOG.info("Initializing {} queue mapping manager.",
|
||||||
|
getClass().getSimpleName());
|
||||||
|
|
||||||
|
CapacitySchedulerContext csContext = (CapacitySchedulerContext) scheduler;
|
||||||
|
queueManager = csContext.getCapacitySchedulerQueueManager();
|
||||||
|
|
||||||
|
CapacitySchedulerConfiguration conf = csContext.getConfiguration();
|
||||||
|
overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
|
||||||
|
|
||||||
|
if (groups == null) {
|
||||||
|
groups = Groups.getUserToGroupsMappingService(conf);
|
||||||
|
}
|
||||||
|
|
||||||
|
MappingRuleValidationContext validationContext = buildValidationContext();
|
||||||
|
|
||||||
|
//Getting and validating mapping rules
|
||||||
|
mappingRules = conf.getMappingRules();
|
||||||
|
for (MappingRule rule : mappingRules) {
|
||||||
|
try {
|
||||||
|
rule.validate(validationContext);
|
||||||
|
} catch (YarnException e) {
|
||||||
|
LOG.error("Error initializing queue mappings, rule '{}' " +
|
||||||
|
"has encountered a validation error: {}", rule, e.getMessage());
|
||||||
|
if (failOnConfigError) {
|
||||||
|
throw new IOException(e);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
LOG.info("Initialized queue mappings, can override user specified " +
|
||||||
|
"queues: {} number of rules: {}", overrideWithQueueMappings,
|
||||||
|
mappingRules.size());
|
||||||
|
|
||||||
|
if (LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("Initialized with the following mapping rules:");
|
||||||
|
mappingRules.forEach(rule -> LOG.debug(rule.toString()));
|
||||||
|
}
|
||||||
|
|
||||||
|
return mappingRules.size() > 0;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String getPrimaryGroup(String user) throws IOException {
|
||||||
|
return groups.getGroupsSet(user).iterator().next();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Traverse all secondary groups (as there could be more than one
|
||||||
|
* and position is not guaranteed) and ensure there is queue with
|
||||||
|
* the same name.
|
||||||
|
* @param user Name of the user
|
||||||
|
* @return Name of the secondary group if found, null otherwise
|
||||||
|
* @throws IOException
|
||||||
|
*/
|
||||||
|
private String getSecondaryGroup(String user) throws IOException {
|
||||||
|
Set<String> groupsSet = groups.getGroupsSet(user);
|
||||||
|
String secondaryGroup = null;
|
||||||
|
Iterator<String> it = groupsSet.iterator();
|
||||||
|
it.next();
|
||||||
|
while (it.hasNext()) {
|
||||||
|
String group = it.next();
|
||||||
|
if (this.queueManager.getQueue(group) != null) {
|
||||||
|
secondaryGroup = group;
|
||||||
|
break;
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
if (secondaryGroup == null && LOG.isDebugEnabled()) {
|
||||||
|
LOG.debug("User {} is not associated with any Secondary " +
|
||||||
|
"Group. Hence it may use the 'default' queue", user);
|
||||||
|
}
|
||||||
|
return secondaryGroup;
|
||||||
|
}
|
||||||
|
|
||||||
|
private VariableContext createVariableContext(
|
||||||
|
ApplicationSubmissionContext asc, String user) throws IOException {
|
||||||
|
VariableContext vctx = new VariableContext();
|
||||||
|
|
||||||
|
vctx.put("%user", user);
|
||||||
|
vctx.put("%specified", asc.getQueue());
|
||||||
|
vctx.put("%application", asc.getApplicationName());
|
||||||
|
vctx.put("%primary_group", getPrimaryGroup(user));
|
||||||
|
vctx.put("%secondary_group", getSecondaryGroup(user));
|
||||||
|
vctx.put("%default", "root.default");
|
||||||
|
|
||||||
|
vctx.setImmutables(immutableVariables);
|
||||||
|
return vctx;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String validateAndNormalizeQueue(String queueName)
|
||||||
|
throws YarnException {
|
||||||
|
MappingQueuePath path = new MappingQueuePath(queueName);
|
||||||
|
String leaf = path.getLeafName();
|
||||||
|
String parent = path.getParent();
|
||||||
|
|
||||||
|
String normalizedName;
|
||||||
|
if (parent != null) {
|
||||||
|
normalizedName = validateAndNormalizeQueueWithParent(parent, leaf);
|
||||||
|
} else {
|
||||||
|
normalizedName = validateAndNormalizeQueueWithNoParent(leaf);
|
||||||
|
}
|
||||||
|
|
||||||
|
CSQueue queue = queueManager.getQueueByFullName(normalizedName);
|
||||||
|
if (queue != null && !(queue instanceof LeafQueue)) {
|
||||||
|
throw new YarnException("Mapping rule returned a non-leaf queue '" +
|
||||||
|
normalizedName + "', cannot place application in it.");
|
||||||
|
}
|
||||||
|
|
||||||
|
return normalizedName;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String validateAndNormalizeQueueWithParent(String parent, String leaf)
|
||||||
|
throws YarnException {
|
||||||
|
CSQueue parentQueue = queueManager.getQueue(parent);
|
||||||
|
//we don't find the specified parent, so the placement rule is invalid
|
||||||
|
//for this case
|
||||||
|
if (parentQueue == null) {
|
||||||
|
if (queueManager.isAmbiguous(parent)) {
|
||||||
|
throw new YarnException("Mapping rule specified a parent queue '" +
|
||||||
|
parent + "', but it is ambiguous.");
|
||||||
|
} else {
|
||||||
|
throw new YarnException("Mapping rule specified a parent queue '" +
|
||||||
|
parent + "', but it does not exist.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//normalizing parent path
|
||||||
|
String parentPath = parentQueue.getQueuePath();
|
||||||
|
String fullPath = parentPath + DOT + leaf;
|
||||||
|
|
||||||
|
//if we have a parent which is not a managed parent, we check if the leaf
|
||||||
|
//queue exists under this parent
|
||||||
|
if (!(parentQueue instanceof ManagedParentQueue)) {
|
||||||
|
CSQueue queue = queueManager.getQueue(fullPath);
|
||||||
|
//if the queue doesn't exit we return null
|
||||||
|
if (queue == null) {
|
||||||
|
throw new YarnException("Mapping rule specified a parent queue '" +
|
||||||
|
parent + "', but it is not a managed parent queue, " +
|
||||||
|
"and no queue exists with name '" + leaf + "' under it.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
//at this point we either have a managed parent or the queue actually
|
||||||
|
//exists so we have a placement context, returning it
|
||||||
|
return fullPath;
|
||||||
|
}
|
||||||
|
|
||||||
|
private String validateAndNormalizeQueueWithNoParent(String leaf)
|
||||||
|
throws YarnException {
|
||||||
|
//in this case we don't have a parent specified so we expect the queue to
|
||||||
|
//exist, otherwise the mapping will not be valid for this case
|
||||||
|
CSQueue queue = queueManager.getQueue(leaf);
|
||||||
|
if (queue == null) {
|
||||||
|
if (queueManager.isAmbiguous(leaf)) {
|
||||||
|
throw new YarnException("Queue '" + leaf + "' specified in mapping" +
|
||||||
|
" rule is ambiguous");
|
||||||
|
} else {
|
||||||
|
throw new YarnException("Queue '" + leaf + "' specified in mapping" +
|
||||||
|
" rule does not exist.");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//normalizing queue path
|
||||||
|
return queue.getQueuePath();
|
||||||
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Evaluates the mapping rule using the provided variable context. For
|
||||||
|
* placement results we check if the placement is valid, and in case of
|
||||||
|
* invalid placements we use the rule's fallback settings to get the result.
|
||||||
|
* @param rule The mapping rule to be evaluated
|
||||||
|
* @param variables The variables and their respective values
|
||||||
|
* @return Evaluation result
|
||||||
|
*/
|
||||||
|
private MappingRuleResult evaluateRule(
|
||||||
|
MappingRule rule, VariableContext variables) {
|
||||||
|
MappingRuleResult result = rule.evaluate(variables);
|
||||||
|
|
||||||
|
if (result.getResult() == MappingRuleResultType.PLACE) {
|
||||||
|
try {
|
||||||
|
result.updateNormalizedQueue(
|
||||||
|
validateAndNormalizeQueue(result.getQueue()));
|
||||||
|
} catch (Exception e) {
|
||||||
|
LOG.info("Cannot place to queue '" + result.getQueue() +
|
||||||
|
"' returned by mapping rule.", e);
|
||||||
|
result = rule.getFallback();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
return result;
|
||||||
|
}
|
||||||
|
|
||||||
|
private ApplicationPlacementContext createPlacementContext(String queueName) {
|
||||||
|
int parentQueueNameEndIndex = queueName.lastIndexOf(DOT);
|
||||||
|
if (parentQueueNameEndIndex > -1) {
|
||||||
|
String parent = queueName.substring(0, parentQueueNameEndIndex).trim();
|
||||||
|
String leaf = queueName.substring(parentQueueNameEndIndex + 1).trim();
|
||||||
|
return new ApplicationPlacementContext(leaf, parent);
|
||||||
|
}
|
||||||
|
|
||||||
|
//this statement is here only for future proofing and consistency.
|
||||||
|
//Currently there is no valid queue name which does not have a parent
|
||||||
|
//and valid for app placement. Since we normalize all paths, the only queue
|
||||||
|
//which can have no parent at this point is 'root', which is neither a
|
||||||
|
//leaf queue nor a managerParent queue. But it might become one, and
|
||||||
|
//it's better to leave the code consistent.
|
||||||
|
return new ApplicationPlacementContext(queueName);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Override
|
||||||
|
public ApplicationPlacementContext getPlacementForApp(
|
||||||
|
ApplicationSubmissionContext asc, String user) throws YarnException {
|
||||||
|
//We only use the mapping rules if overrideWithQueueMappings enabled
|
||||||
|
//or the application is submitted to the default queue, which effectively
|
||||||
|
//means the application doesn't have any specific queue.
|
||||||
|
String appQueue = asc.getQueue();
|
||||||
|
if (appQueue != null &&
|
||||||
|
!appQueue.equals(YarnConfiguration.DEFAULT_QUEUE_NAME) &&
|
||||||
|
!appQueue.equals(YarnConfiguration.DEFAULT_QUEUE_FULL_NAME) &&
|
||||||
|
!overrideWithQueueMappings) {
|
||||||
|
LOG.info("Have no jurisdiction over application submission '{}', " +
|
||||||
|
"moving to next PlacementRule engine", asc.getApplicationName());
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
VariableContext variables;
|
||||||
|
try {
|
||||||
|
variables = createVariableContext(asc, user);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Unable to setup variable context", e);
|
||||||
|
throw new YarnException(e);
|
||||||
|
}
|
||||||
|
|
||||||
|
for (MappingRule rule : mappingRules) {
|
||||||
|
MappingRuleResult result = evaluateRule(rule, variables);
|
||||||
|
switch (result.getResult()) {
|
||||||
|
case PLACE_TO_DEFAULT:
|
||||||
|
return placeToDefault(asc, variables, rule);
|
||||||
|
case PLACE:
|
||||||
|
return placeToQueue(asc, rule, result);
|
||||||
|
case REJECT:
|
||||||
|
LOG.info("Rejecting application '{}', reason: Mapping rule '{}' " +
|
||||||
|
" fallback action is set to REJECT.",
|
||||||
|
asc.getApplicationName(), rule);
|
||||||
|
//We intentionally omit the details, we don't want any server side
|
||||||
|
//config information to leak to the client side
|
||||||
|
throw new YarnException("Application submission have been rejected by" +
|
||||||
|
" a mapping rule. Please see the logs for details");
|
||||||
|
case SKIP:
|
||||||
|
//SKIP means skip to the next rule, which is the default behaviour of
|
||||||
|
//the for loop, so we don't need to take any extra actions
|
||||||
|
break;
|
||||||
|
default:
|
||||||
|
LOG.error("Invalid result '{}'", result);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
//If no rule was applied we return null, to let the engine move onto the
|
||||||
|
//next placementRule class
|
||||||
|
LOG.info("No matching rule found for application '{}', moving to next " +
|
||||||
|
"PlacementRule engine", asc.getApplicationName());
|
||||||
|
return null;
|
||||||
|
}
|
||||||
|
|
||||||
|
private ApplicationPlacementContext placeToQueue(
|
||||||
|
ApplicationSubmissionContext asc,
|
||||||
|
MappingRule rule,
|
||||||
|
MappingRuleResult result) {
|
||||||
|
LOG.debug("Application '{}' have been placed to queue '{}' by " +
|
||||||
|
"rule {}", asc.getApplicationName(), result.getNormalizedQueue(), rule);
|
||||||
|
//evaluateRule will only return a PLACE rule, if it is verified
|
||||||
|
//and normalized, so it is safe here to simply create the placement
|
||||||
|
//context
|
||||||
|
return createPlacementContext(result.getNormalizedQueue());
|
||||||
|
}
|
||||||
|
|
||||||
|
private ApplicationPlacementContext placeToDefault(
|
||||||
|
ApplicationSubmissionContext asc,
|
||||||
|
VariableContext variables,
|
||||||
|
MappingRule rule) throws YarnException {
|
||||||
|
try {
|
||||||
|
String queueName = validateAndNormalizeQueue(
|
||||||
|
variables.replacePathVariables("%default"));
|
||||||
|
LOG.debug("Application '{}' have been placed to queue '{}' by " +
|
||||||
|
"the fallback option of rule {}",
|
||||||
|
asc.getApplicationName(), queueName, rule);
|
||||||
|
return createPlacementContext(queueName);
|
||||||
|
} catch (YarnException e) {
|
||||||
|
LOG.error("Rejecting application due to a failed fallback" +
|
||||||
|
" action '{}'" + ", reason: {}", asc.getApplicationName(),
|
||||||
|
e.getMessage());
|
||||||
|
//We intentionally omit the details, we don't want any server side
|
||||||
|
//config information to leak to the client side
|
||||||
|
throw new YarnException("Application submission have been rejected by a" +
|
||||||
|
" mapping rule. Please see the logs for details");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
}
|
@ -0,0 +1,376 @@
|
|||||||
|
/**
|
||||||
|
* Licensed to the Apache Software Foundation (ASF) under one
|
||||||
|
* or more contributor license agreements. See the NOTICE file
|
||||||
|
* distributed with this work for additional information
|
||||||
|
* regarding copyright ownership. The ASF licenses this file
|
||||||
|
* to you under the Apache License, Version 2.0 (the
|
||||||
|
* "License"); you may not use this file except in compliance
|
||||||
|
* with the License. You may obtain a copy of the License at
|
||||||
|
*
|
||||||
|
* http://www.apache.org/licenses/LICENSE-2.0
|
||||||
|
*
|
||||||
|
* Unless required by applicable law or agreed to in writing, software
|
||||||
|
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||||
|
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||||
|
* See the License for the specific language governing permissions and
|
||||||
|
* limitations under the License.
|
||||||
|
*/
|
||||||
|
|
||||||
|
package org.apache.hadoop.yarn.server.resourcemanager.placement;
|
||||||
|
|
||||||
|
import com.google.common.collect.ImmutableMap;
|
||||||
|
import com.google.common.collect.ImmutableSet;
|
||||||
|
import org.apache.hadoop.security.Groups;
|
||||||
|
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||||
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager;
|
||||||
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
import org.junit.Test;
|
||||||
|
import org.slf4j.Logger;
|
||||||
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
|
import java.util.ArrayList;
|
||||||
|
import java.util.List;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Set;
|
||||||
|
|
||||||
|
import static junit.framework.TestCase.*;
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.placement.FairQueuePlacementUtils.DOT;
|
||||||
|
import static org.mockito.ArgumentMatchers.isNull;
|
||||||
|
import static org.mockito.Mockito.mock;
|
||||||
|
import static org.mockito.Mockito.when;
|
||||||
|
|
||||||
|
public class TestCSMappingPlacementRule {
|
||||||
|
private static final Logger LOG = LoggerFactory
|
||||||
|
.getLogger(TestCSMappingPlacementRule.class);
|
||||||
|
private Map<String, Set<String>> userGroups = ImmutableMap.of(
|
||||||
|
"alice", ImmutableSet.of("p_alice", "user", "developer"),
|
||||||
|
"bob", ImmutableSet.of("p_bob", "user", "developer"),
|
||||||
|
"charlie", ImmutableSet.of("p_charlie", "user", "tester"),
|
||||||
|
"dave", ImmutableSet.of("user", "tester"),
|
||||||
|
"emily", ImmutableSet.of("user", "tester", "developer")
|
||||||
|
);
|
||||||
|
|
||||||
|
private void createQueueHierarchy(CapacitySchedulerQueueManager queueManager) {
|
||||||
|
MockQueueHierarchyBuilder.create()
|
||||||
|
.withQueueManager(queueManager)
|
||||||
|
.withQueue("root.unman")
|
||||||
|
.withQueue("root.default")
|
||||||
|
.withManagedParentQueue("root.man")
|
||||||
|
.withQueue("root.user.alice")
|
||||||
|
.withQueue("root.user.bob")
|
||||||
|
.withQueue("root.ambiguous.user.charlie")
|
||||||
|
.withQueue("root.ambiguous.user.dave")
|
||||||
|
.withQueue("root.ambiguous.user.ambi")
|
||||||
|
.withQueue("root.ambiguous.group.tester")
|
||||||
|
.withManagedParentQueue("root.ambiguous.managed")
|
||||||
|
.withQueue("root.ambiguous.deep.user.charlie")
|
||||||
|
.withQueue("root.ambiguous.deep.user.dave")
|
||||||
|
.withQueue("root.ambiguous.deep.user.ambi")
|
||||||
|
.withQueue("root.ambiguous.deep.group.tester")
|
||||||
|
.withManagedParentQueue("root.ambiguous.deep.managed")
|
||||||
|
.withQueue("root.disambiguous.deep.disambiuser.emily")
|
||||||
|
.withQueue("root.disambiguous.deep.disambiuser.disambi")
|
||||||
|
.withQueue("root.disambiguous.deep.group.developer")
|
||||||
|
.withManagedParentQueue("root.disambiguous.deep.dman")
|
||||||
|
.build();
|
||||||
|
|
||||||
|
when(queueManager.getQueue(isNull())).thenReturn(null);
|
||||||
|
when(queueManager.isAmbiguous("primarygrouponly")).thenReturn(true);
|
||||||
|
}
|
||||||
|
|
||||||
|
private CSMappingPlacementRule setupEngine(
|
||||||
|
boolean overrideUserMappings, List<MappingRule> mappings)
|
||||||
|
throws IOException {
|
||||||
|
return setupEngine(overrideUserMappings, mappings, false);
|
||||||
|
}
|
||||||
|
|
||||||
|
private CSMappingPlacementRule setupEngine(
|
||||||
|
boolean overrideUserMappings, List<MappingRule> mappings,
|
||||||
|
boolean failOnConfigError)
|
||||||
|
throws IOException {
|
||||||
|
|
||||||
|
CapacitySchedulerConfiguration csConf =
|
||||||
|
mock(CapacitySchedulerConfiguration.class);
|
||||||
|
when(csConf.getMappingRules()).thenReturn(mappings);
|
||||||
|
when(csConf.getOverrideWithQueueMappings())
|
||||||
|
.thenReturn(overrideUserMappings);
|
||||||
|
|
||||||
|
CapacitySchedulerQueueManager qm =
|
||||||
|
mock(CapacitySchedulerQueueManager.class);
|
||||||
|
createQueueHierarchy(qm);
|
||||||
|
|
||||||
|
CapacityScheduler cs = mock(CapacityScheduler.class);
|
||||||
|
when(cs.getConfiguration()).thenReturn(csConf);
|
||||||
|
when(cs.getCapacitySchedulerQueueManager()).thenReturn(qm);
|
||||||
|
|
||||||
|
CSMappingPlacementRule engine = new CSMappingPlacementRule();
|
||||||
|
Groups groups = mock(Groups.class);
|
||||||
|
|
||||||
|
//Initializing group provider to return groups specified in the userGroup
|
||||||
|
// map for each respective user
|
||||||
|
for (String user : userGroups.keySet()) {
|
||||||
|
when(groups.getGroupsSet(user)).thenReturn(userGroups.get(user));
|
||||||
|
}
|
||||||
|
engine.setGroups(groups);
|
||||||
|
engine.setFailOnConfigError(failOnConfigError);
|
||||||
|
engine.initialize(cs);
|
||||||
|
|
||||||
|
return engine;
|
||||||
|
}
|
||||||
|
|
||||||
|
private ApplicationSubmissionContext createApp(String name, String queue) {
|
||||||
|
ApplicationSubmissionContext ctx = Records.newRecord(
|
||||||
|
ApplicationSubmissionContext.class);
|
||||||
|
ctx.setApplicationName(name);
|
||||||
|
ctx.setQueue(queue);
|
||||||
|
return ctx;
|
||||||
|
}
|
||||||
|
|
||||||
|
private ApplicationSubmissionContext createApp(String name) {
|
||||||
|
return createApp(name, YarnConfiguration.DEFAULT_QUEUE_NAME);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertReject(String message, CSMappingPlacementRule engine,
|
||||||
|
ApplicationSubmissionContext asc, String user) {
|
||||||
|
try {
|
||||||
|
engine.getPlacementForApp(asc, user);
|
||||||
|
fail(message);
|
||||||
|
} catch (YarnException e) {
|
||||||
|
//To prevent PlacementRule chaining present in PlacementManager
|
||||||
|
//when an application is rejected an exception is thrown to make sure
|
||||||
|
//no other engine will try to place it.
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertPlace(CSMappingPlacementRule engine,
|
||||||
|
ApplicationSubmissionContext asc, String user, String expectedQueue) {
|
||||||
|
assertPlace("Placement should not throw exception!",
|
||||||
|
engine, asc, user, expectedQueue);
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertPlace(String message, CSMappingPlacementRule engine,
|
||||||
|
ApplicationSubmissionContext asc, String user, String expectedQueue) {
|
||||||
|
try {
|
||||||
|
ApplicationPlacementContext apc = engine.getPlacementForApp(asc, user);
|
||||||
|
assertNotNull(message, apc);
|
||||||
|
String queue = apc.getParentQueue() == null ? "" :
|
||||||
|
(apc.getParentQueue() + DOT);
|
||||||
|
queue += apc.getQueue();
|
||||||
|
assertEquals(message, expectedQueue, queue);
|
||||||
|
} catch (YarnException e) {
|
||||||
|
LOG.error(message, e);
|
||||||
|
fail(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
private void assertNullResult(String message, CSMappingPlacementRule engine,
|
||||||
|
ApplicationSubmissionContext asc, String user) {
|
||||||
|
try {
|
||||||
|
assertNull(message, engine.getPlacementForApp(asc, user));
|
||||||
|
} catch (YarnException e) {
|
||||||
|
LOG.error(message, e);
|
||||||
|
fail(message);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLegacyPlacementToExistingQueue() throws IOException {
|
||||||
|
ArrayList<MappingRule> rules = new ArrayList<>();
|
||||||
|
rules.add(MappingRule.createLegacyRule(
|
||||||
|
"u", "alice", "root.ambiguous.user.ambi"));
|
||||||
|
rules.add(MappingRule.createLegacyRule("u", "bob", "ambi"));
|
||||||
|
rules.add(MappingRule.createLegacyRule("u", "dave", "disambi"));
|
||||||
|
rules.add(MappingRule.createLegacyRule("u", "%user", "disambiuser.%user"));
|
||||||
|
|
||||||
|
CSMappingPlacementRule engine = setupEngine(true, rules);
|
||||||
|
ApplicationSubmissionContext asc = createApp("Default");
|
||||||
|
assertPlace(engine, asc, "alice", "root.ambiguous.user.ambi");
|
||||||
|
assertPlace("Should be placed to default because ambi is ambiguous and " +
|
||||||
|
"legacy fallback is default", engine, asc, "bob", "root.default");
|
||||||
|
assertPlace(engine, asc, "emily",
|
||||||
|
"root.disambiguous.deep.disambiuser.emily");
|
||||||
|
assertPlace("Should be placed to default because disambiuser.charlie does" +
|
||||||
|
"not exit and legacy fallback is default", engine, asc, "charlie",
|
||||||
|
"root.default");
|
||||||
|
assertPlace(engine, asc, "dave",
|
||||||
|
"root.disambiguous.deep.disambiuser.disambi");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLegacyPlacementToManagedQueues() throws IOException {
|
||||||
|
ArrayList<MappingRule> rules = new ArrayList<>();
|
||||||
|
rules.add(MappingRule.createLegacyRule(
|
||||||
|
"u", "alice", "root.ambiguous.managed.%user"));
|
||||||
|
rules.add(MappingRule.createLegacyRule(
|
||||||
|
"u", "bob", "managed.%user"));
|
||||||
|
rules.add(MappingRule.createLegacyRule(
|
||||||
|
"u", "charlie", "root.unman.charlie"));
|
||||||
|
rules.add(MappingRule.createLegacyRule(
|
||||||
|
"u", "dave", "non-existent.%user"));
|
||||||
|
rules.add(MappingRule.createLegacyRule(
|
||||||
|
"u", "%user", "root.man.%user"));
|
||||||
|
|
||||||
|
CSMappingPlacementRule engine = setupEngine(true, rules);
|
||||||
|
ApplicationSubmissionContext asc = createApp("Default");
|
||||||
|
assertPlace(engine, asc, "alice", "root.ambiguous.managed.alice");
|
||||||
|
assertPlace("Should be placed to default because managed is ambiguous " +
|
||||||
|
"and legacy fallback is default", engine, asc, "bob", "root.default");
|
||||||
|
assertPlace("Should be placed to default because root.unman is not " +
|
||||||
|
"managed and legacy fallback is default", engine, asc, "charlie",
|
||||||
|
"root.default");
|
||||||
|
assertPlace("Should be placed to default because parent queue does not " +
|
||||||
|
"exist and legacy fallback is default",engine, asc, "dave",
|
||||||
|
"root.default");
|
||||||
|
assertPlace(engine, asc, "emily", "root.man.emily");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLegacyPlacementShortReference() throws IOException {
|
||||||
|
ArrayList<MappingRule> rules = new ArrayList<>();
|
||||||
|
rules.add(MappingRule.createLegacyRule(
|
||||||
|
"u", "alice", "non-existent"));
|
||||||
|
rules.add(MappingRule.createLegacyRule(
|
||||||
|
"u", "bob", "root"));
|
||||||
|
rules.add(MappingRule.createLegacyRule(
|
||||||
|
"u", "charlie", "man"));
|
||||||
|
rules.add(MappingRule.createLegacyRule(
|
||||||
|
"u", "dave", "ambi"));
|
||||||
|
|
||||||
|
CSMappingPlacementRule engine = setupEngine(true, rules);
|
||||||
|
ApplicationSubmissionContext asc = createApp("Default");
|
||||||
|
assertPlace("Should be placed to default: non-existent does not exist and " +
|
||||||
|
"legacy fallback is default", engine, asc, "alice", "root.default");
|
||||||
|
assertPlace("Should be placed to default: root is never managed and " +
|
||||||
|
"legacy fallback is default", engine, asc, "bob", "root.default");
|
||||||
|
assertPlace("Should be placed to default: managed parent is not a leaf " +
|
||||||
|
"queue and legacy fallback is default", engine, asc, "charlie",
|
||||||
|
"root.default");
|
||||||
|
assertPlace("Should be placed to default: ambi is an ambiguous reference " +
|
||||||
|
"and legacy fallback is default", engine, asc, "dave", "root.default");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testRuleFallbackHandling() throws IOException {
|
||||||
|
ArrayList<MappingRule> rules = new ArrayList<>();
|
||||||
|
rules.add(
|
||||||
|
new MappingRule(
|
||||||
|
MappingRuleMatchers.createUserMatcher("alice"),
|
||||||
|
(new MappingRuleActions.PlaceToQueueAction("non-existent"))
|
||||||
|
.setFallbackReject()));
|
||||||
|
rules.add(
|
||||||
|
new MappingRule(
|
||||||
|
MappingRuleMatchers.createUserMatcher("bob"),
|
||||||
|
(new MappingRuleActions.PlaceToQueueAction("non-existent"))
|
||||||
|
.setFallbackSkip()));
|
||||||
|
rules.add(
|
||||||
|
new MappingRule(
|
||||||
|
MappingRuleMatchers.createUserMatcher("bob"),
|
||||||
|
MappingRuleActions.createUpdateDefaultAction("root.invalid")));
|
||||||
|
rules.add(
|
||||||
|
new MappingRule(
|
||||||
|
MappingRuleMatchers.createUserMatcher("bob"),
|
||||||
|
new MappingRuleActions.PlaceToQueueAction("%default")));
|
||||||
|
rules.add(
|
||||||
|
new MappingRule(
|
||||||
|
MappingRuleMatchers.createUserMatcher("charlie"),
|
||||||
|
(new MappingRuleActions.PlaceToQueueAction("non-existent"))
|
||||||
|
.setFallbackDefaultPlacement()));
|
||||||
|
rules.add(
|
||||||
|
new MappingRule(
|
||||||
|
MappingRuleMatchers.createUserMatcher("emily"),
|
||||||
|
MappingRuleActions.createUpdateDefaultAction("root.invalid")));
|
||||||
|
rules.add(
|
||||||
|
new MappingRule(
|
||||||
|
MappingRuleMatchers.createUserMatcher("emily"),
|
||||||
|
(new MappingRuleActions.PlaceToQueueAction("non-existent"))
|
||||||
|
.setFallbackDefaultPlacement()));
|
||||||
|
//This rule is to catch all shouldfail applications, and place them to a
|
||||||
|
// queue, so we can detect they were not rejected nor null-ed
|
||||||
|
rules.add(
|
||||||
|
new MappingRule(
|
||||||
|
MappingRuleMatchers.createApplicationNameMatcher("ShouldFail"),
|
||||||
|
new MappingRuleActions.PlaceToQueueAction("root.default")));
|
||||||
|
|
||||||
|
CSMappingPlacementRule engine = setupEngine(true, rules);
|
||||||
|
ApplicationSubmissionContext fail = createApp("ShouldFail");
|
||||||
|
ApplicationSubmissionContext success = createApp("ShouldSucceed");
|
||||||
|
|
||||||
|
assertReject("Alice has a straight up reject rule, " +
|
||||||
|
"her application should be rejected",
|
||||||
|
engine, fail, "alice");
|
||||||
|
assertReject(
|
||||||
|
"Bob should fail to place to non-existent -> should skip to next rule" +
|
||||||
|
"\nBob should update the %default to root.invalid" +
|
||||||
|
"\nBob should fail to place the app to %default which is root.invalid",
|
||||||
|
engine, fail, "bob");
|
||||||
|
assertPlace(
|
||||||
|
"Charlie should be able to place the app to root.default as the" +
|
||||||
|
"non-existent queue does not exist, but fallback is place to default",
|
||||||
|
engine, success, "charlie", "root.default");
|
||||||
|
assertNullResult(
|
||||||
|
"Dave with success app has no matching rule, so we expect a null",
|
||||||
|
engine, success, "dave");
|
||||||
|
assertReject(
|
||||||
|
"Emily should update the %default to root.invalid" +
|
||||||
|
"\nBob should fail to place the app to non-existent and since the" +
|
||||||
|
" fallback is placeToDefault, it should also fail, because we have" +
|
||||||
|
" just updated default to an invalid value",
|
||||||
|
engine, fail, "emily");
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testConfigValidation() {
|
||||||
|
ArrayList<MappingRule> nonExistentStatic = new ArrayList<>();
|
||||||
|
nonExistentStatic.add(MappingRule.createLegacyRule(
|
||||||
|
"u", "alice", "non-existent"));
|
||||||
|
|
||||||
|
//since the %token is an unknown variable, it will be considered as
|
||||||
|
//a literal string, and since %token queue does not exist, it should fail
|
||||||
|
ArrayList<MappingRule> tokenAsStatic = new ArrayList<>();
|
||||||
|
tokenAsStatic.add(MappingRule.createLegacyRule(
|
||||||
|
"u", "alice", "%token"));
|
||||||
|
|
||||||
|
ArrayList<MappingRule> tokenAsDynamic = new ArrayList<>();
|
||||||
|
//this rule might change the value of the %token, so the validator will be
|
||||||
|
//aware of the %token variable
|
||||||
|
tokenAsDynamic.add(new MappingRule(
|
||||||
|
new MappingRuleMatchers.MatchAllMatcher(),
|
||||||
|
new MappingRuleActions.VariableUpdateAction("%token", "non-existent")
|
||||||
|
));
|
||||||
|
//since %token is an known variable, this rule is considered dynamic
|
||||||
|
//so it cannot be entirely validated, this init should be successful
|
||||||
|
tokenAsDynamic.add(MappingRule.createLegacyRule(
|
||||||
|
"u", "alice", "%token"));
|
||||||
|
|
||||||
|
try {
|
||||||
|
setupEngine(true, nonExistentStatic, true);
|
||||||
|
fail("We expect the setup to fail because we have a static rule " +
|
||||||
|
"referencing a non-existent queue");
|
||||||
|
} catch (IOException e) {
|
||||||
|
//Exception expected
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
setupEngine(true, tokenAsStatic, true);
|
||||||
|
fail("We expect the setup to fail because we have a rule containing an " +
|
||||||
|
"unknown token, which is considered a static rule, with a " +
|
||||||
|
"non-existent queue");
|
||||||
|
} catch (IOException e) {
|
||||||
|
//Exception expected
|
||||||
|
}
|
||||||
|
|
||||||
|
try {
|
||||||
|
setupEngine(true, tokenAsDynamic, true);
|
||||||
|
} catch (IOException e) {
|
||||||
|
fail("We expect the setup to succeed because the %token is a known " +
|
||||||
|
"variable so the rule is considered dynamic without parent, " +
|
||||||
|
"and this always should pass");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
}
|
Loading…
Reference in New Issue
Block a user