YARN-10659. Improve CS MappingRule %secondary_group evaluation. Contributed by Gergely Pollak
This commit is contained in:
parent
d7eeca4d0c
commit
a5745711dd
@ -37,6 +37,7 @@
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
@ -184,6 +185,10 @@ private void setupGroupsForVariableContext(VariableContext vctx, String user)
|
||||
LOG.warn(
|
||||
"Group provider hasn't been set, cannot query groups for user {}",
|
||||
user);
|
||||
//enforcing empty primary group instead of null, which would be considered
|
||||
//as unknown variable and would evaluate to '%primary_group'
|
||||
vctx.put("%primary_group", "");
|
||||
vctx.put("%secondary_group", "");
|
||||
return;
|
||||
}
|
||||
Set<String> groupsSet = groups.getGroupsSet(user);
|
||||
@ -192,24 +197,32 @@ private void setupGroupsForVariableContext(VariableContext vctx, String user)
|
||||
vctx.putExtraDataset("groups", groupsSet);
|
||||
return;
|
||||
}
|
||||
String secondaryGroup = null;
|
||||
Iterator<String> it = groupsSet.iterator();
|
||||
String primaryGroup = it.next();
|
||||
|
||||
ArrayList<String> secondaryGroupList = new ArrayList<>();
|
||||
|
||||
while (it.hasNext()) {
|
||||
String group = it.next();
|
||||
if (this.queueManager.getQueue(group) != null) {
|
||||
secondaryGroup = group;
|
||||
break;
|
||||
}
|
||||
secondaryGroupList.add(it.next());
|
||||
}
|
||||
|
||||
if (secondaryGroup == null && LOG.isDebugEnabled()) {
|
||||
LOG.debug("User {} is not associated with any Secondary group", user);
|
||||
if (secondaryGroupList.size() == 0) {
|
||||
//if we have no chance to have a secondary group to speed up evaluation
|
||||
//we simply register it as a regular variable with "" as a value
|
||||
vctx.put("%secondary_group", "");
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("User {} does not have any potential Secondary group", user);
|
||||
}
|
||||
} else {
|
||||
vctx.putConditional(
|
||||
MappingRuleConditionalVariables.SecondaryGroupVariable.VARIABLE_NAME,
|
||||
new MappingRuleConditionalVariables.SecondaryGroupVariable(
|
||||
this.queueManager,
|
||||
secondaryGroupList
|
||||
));
|
||||
}
|
||||
|
||||
vctx.put("%primary_group", primaryGroup);
|
||||
vctx.put("%secondary_group", secondaryGroup);
|
||||
vctx.putExtraDataset("groups", groupsSet);
|
||||
}
|
||||
|
||||
|
@ -19,6 +19,7 @@
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.placement;
|
||||
|
||||
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.csmappingrule.MappingRuleConditionalVariable;
|
||||
|
||||
import java.util.Arrays;
|
||||
import java.util.HashMap;
|
||||
@ -38,6 +39,13 @@ public class VariableContext {
|
||||
* This is our actual variable store.
|
||||
*/
|
||||
private Map<String, String> variables = new HashMap<>();
|
||||
|
||||
/**
|
||||
* This is our conditional variable store.
|
||||
*/
|
||||
private Map<String, MappingRuleConditionalVariable> conditionalVariables =
|
||||
new HashMap<>();
|
||||
|
||||
/**
|
||||
* This set contains the names of the immutable variables if null it is
|
||||
* ignored.
|
||||
@ -106,10 +114,32 @@ public VariableContext put(String name, String value) {
|
||||
throw new IllegalStateException(
|
||||
"Variable '" + name + "' is immutable, cannot update it's value!");
|
||||
}
|
||||
|
||||
if (conditionalVariables.containsKey(name)) {
|
||||
throw new IllegalStateException(
|
||||
"Variable '" + name + "' is already defined as a conditional" +
|
||||
" variable, cannot change it's value!");
|
||||
}
|
||||
variables.put(name, value);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method is used to add a conditional variable to the variable context.
|
||||
* @param name Name of the variable
|
||||
* @param variable The conditional variable evaluator
|
||||
* @return VariableContext for daisy chaining
|
||||
*/
|
||||
public VariableContext putConditional(String name,
|
||||
MappingRuleConditionalVariable variable) {
|
||||
if (conditionalVariables.containsKey(name)) {
|
||||
throw new IllegalStateException(
|
||||
"Variable '" + name + "' is conditional, cannot update it's value!");
|
||||
}
|
||||
conditionalVariables.put(name, variable);
|
||||
return this;
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns the value of a variable, null values are replaced with "".
|
||||
* @param name Name of the variable
|
||||
@ -213,10 +243,21 @@ public String replacePathVariables(String input) {
|
||||
|
||||
String[] parts = input.split("\\.");
|
||||
for (int i = 0; i < parts.length; i++) {
|
||||
//if the part is a variable it should be in the map, otherwise we keep
|
||||
//it's original value. This means undefined variables will return the
|
||||
//name of the variable, but this is working as intended.
|
||||
String newVal = variables.getOrDefault(parts[i], parts[i]);
|
||||
String newVal = parts[i];
|
||||
//if the part is a variable it should be in either the variable or the
|
||||
//conditional variable map, otherwise we keep it's original value.
|
||||
//This means undefined variables will return the name of the variable,
|
||||
//but this is working as intended.
|
||||
if (variables.containsKey(parts[i])) {
|
||||
newVal = variables.get(parts[i]);
|
||||
} else if (conditionalVariables.containsKey(parts[i])) {
|
||||
MappingRuleConditionalVariable condVariable =
|
||||
conditionalVariables.get(parts[i]);
|
||||
if (condVariable != null) {
|
||||
newVal = condVariable.evaluateInPath(parts, i);
|
||||
}
|
||||
}
|
||||
|
||||
//if a variable's value is null, we use empty string instead
|
||||
if (newVal == null) {
|
||||
newVal = "";
|
||||
|
@ -0,0 +1,22 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.placement.csmappingrule;
|
||||
|
||||
public interface MappingRuleConditionalVariable {
|
||||
String evaluateInPath(String[] parts, int currentIndex);
|
||||
}
|
@ -0,0 +1,123 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.placement.csmappingrule;
|
||||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager;
|
||||
|
||||
import java.util.List;
|
||||
|
||||
public class MappingRuleConditionalVariables {
|
||||
/**
|
||||
* Utility class, hiding constructor.
|
||||
*/
|
||||
private MappingRuleConditionalVariables() {}
|
||||
|
||||
/**
|
||||
* SecondaryGroupVariable represents a conditional variable which is supposed
|
||||
* to evaluate path parts with "%secondary_group". The evaluation depends on
|
||||
* if parent path is provided.
|
||||
* If there was no parent path provided, the %secondary_group variable will be
|
||||
* equal to the first non-primary group of the user which has a matching queue
|
||||
* in the queue hierarchy. This means the queue name must be disambiguous as
|
||||
* well.
|
||||
* If there is a parent provided (the %secondary_group variable is not the
|
||||
* first element in the path), the %secondary_group variable will be
|
||||
* equal to the first non-primary group of the user which has a matching queue
|
||||
* UNDER the parent path. The parent path must be a full path, to avoid
|
||||
* ambiguity problems.
|
||||
*/
|
||||
public static class SecondaryGroupVariable implements
|
||||
MappingRuleConditionalVariable {
|
||||
/**
|
||||
* This is the name of the variable we are replacing.
|
||||
*/
|
||||
public final static String VARIABLE_NAME = "%secondary_group";
|
||||
|
||||
/**
|
||||
* We need an instance of queue manager in order to look for queues under
|
||||
* the parent path.
|
||||
*/
|
||||
private CapacitySchedulerQueueManager queueManager;
|
||||
/**
|
||||
* We store the potential secondary_groups candidates in this list, it must
|
||||
* not contain the primary group.
|
||||
*/
|
||||
private List<String> potentialGroups;
|
||||
|
||||
/**
|
||||
* Constructor requires a queue manager instance and a list of potential
|
||||
* secondary groups.
|
||||
* @param qm The queue manager which will be used to check which potential
|
||||
* secondary group should be used.
|
||||
* @param groups List of potential secondary groups.
|
||||
*/
|
||||
public SecondaryGroupVariable(CapacitySchedulerQueueManager qm,
|
||||
List<String> groups) {
|
||||
queueManager = qm;
|
||||
potentialGroups = groups;
|
||||
}
|
||||
|
||||
/**
|
||||
* Method used to evaluate the variable when used in a path.
|
||||
* @param parts Split representation of the path.
|
||||
* @param currentIndex The index of the evaluation in the path. This shows
|
||||
* which part is currently being evaluated.
|
||||
* @return Substituted queue path part, this method will only return the
|
||||
* value of the conditional variable, not the whole path.
|
||||
*/
|
||||
public String evaluateInPath(String[] parts, int currentIndex) {
|
||||
//First we need to determine the parent path (if any)
|
||||
StringBuilder parentBuilder = new StringBuilder();
|
||||
//Building the parent prefix, if we don't have any parent path
|
||||
//in case of currentIndex == 0 we will have an empty prefix.
|
||||
for (int i = 0; i < currentIndex; i++) {
|
||||
parentBuilder.append(parts[i]);
|
||||
//Generally this is not a good idea, we would need a condition, to not
|
||||
//append a '.' after the last part, however we are generating parent
|
||||
//prefix paths, so we need paths prefixes, like 'root.group.something.'
|
||||
parentBuilder.append(".");
|
||||
}
|
||||
|
||||
//We'll use this prefix to lookup the groups, when we have a parent
|
||||
//provided we need to find a queue under that parent, which matches the
|
||||
//name of the secondaryGroup, if we don't have a parent the prefix is
|
||||
//empty
|
||||
String lookupPrefix = parentBuilder.toString();
|
||||
|
||||
//Going through the potential groups to check if there is a matching queue
|
||||
for (String group : potentialGroups) {
|
||||
String path = lookupPrefix + group;
|
||||
if (queueManager.getQueue(path) != null) {
|
||||
return group;
|
||||
}
|
||||
}
|
||||
|
||||
//No valid group found
|
||||
return "";
|
||||
}
|
||||
|
||||
@Override
|
||||
public String toString() {
|
||||
return "SecondaryGroupVariable{" +
|
||||
"variableName='" + VARIABLE_NAME + "'," +
|
||||
"groups=" + potentialGroups +
|
||||
"}";
|
||||
}
|
||||
}
|
||||
|
||||
}
|
@ -153,6 +153,8 @@ public MappingRuleResultType getResult() {
|
||||
/**
|
||||
* Generator method for place results.
|
||||
* @param queue The name of the queue in which we shall place the application
|
||||
* @param allowCreate Flag to indicate if the placement rule is allowed to
|
||||
* create a queue if possible.
|
||||
* @return The generated MappingRuleResult
|
||||
*/
|
||||
public static MappingRuleResult createPlacementResult(
|
||||
|
@ -64,7 +64,7 @@ public class TestCSMappingPlacementRule {
|
||||
public TemporaryFolder folder = new TemporaryFolder();
|
||||
|
||||
private Map<String, Set<String>> userGroups = ImmutableMap.of(
|
||||
"alice", ImmutableSet.of("p_alice", "user", "developer"),
|
||||
"alice", ImmutableSet.of("p_alice", "unique", "user"),
|
||||
"bob", ImmutableSet.of("p_bob", "user", "developer"),
|
||||
"charlie", ImmutableSet.of("p_charlie", "user", "tester"),
|
||||
"dave", ImmutableSet.of("user"),
|
||||
@ -79,6 +79,8 @@ private void createQueueHierarchy(CapacitySchedulerQueueManager queueManager) {
|
||||
.withManagedParentQueue("root.man")
|
||||
.withQueue("root.user.alice")
|
||||
.withQueue("root.user.bob")
|
||||
.withQueue("root.secondaryTests.unique")
|
||||
.withQueue("root.secondaryTests.user")
|
||||
.withQueue("root.ambiguous.user.charlie")
|
||||
.withQueue("root.ambiguous.user.dave")
|
||||
.withQueue("root.ambiguous.user.ambi")
|
||||
@ -91,13 +93,12 @@ private void createQueueHierarchy(CapacitySchedulerQueueManager queueManager) {
|
||||
.withManagedParentQueue("root.ambiguous.deep.managed")
|
||||
.withQueue("root.disambiguous.deep.disambiuser.emily")
|
||||
.withQueue("root.disambiguous.deep.disambiuser.disambi")
|
||||
.withQueue("root.disambiguous.deep.group.developer")
|
||||
.withManagedParentQueue("root.disambiguous.deep.group.developer")
|
||||
.withManagedParentQueue("root.disambiguous.deep.dman")
|
||||
.withDynamicParentQueue("root.dynamic")
|
||||
.build();
|
||||
|
||||
when(queueManager.getQueue(isNull())).thenReturn(null);
|
||||
when(queueManager.isAmbiguous("primarygrouponly")).thenReturn(true);
|
||||
}
|
||||
|
||||
private CSMappingPlacementRule setupEngine(
|
||||
@ -479,14 +480,6 @@ public void testSpecified() throws IOException {
|
||||
"queue 'root.user.bob'", engine, appBob, "alice", "root.user.bob");
|
||||
}
|
||||
|
||||
private MappingRule createGroupMapping(String group, String queue) {
|
||||
MappingRuleMatcher matcher = MappingRuleMatchers.createUserGroupMatcher(group);
|
||||
MappingRuleAction action =
|
||||
(new MappingRuleActions.PlaceToQueueAction(queue, true))
|
||||
.setFallbackReject();
|
||||
return new MappingRule(matcher, action);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGroupTargetMatching() throws IOException {
|
||||
ArrayList<MappingRule> rules = new ArrayList<>();
|
||||
@ -502,7 +495,7 @@ public void testGroupTargetMatching() throws IOException {
|
||||
new MappingRule(
|
||||
MappingRuleMatchers.createUserMatcher("bob"),
|
||||
(new MappingRuleActions.PlaceToQueueAction(
|
||||
"root.dynamic.%secondary_group.%user", true))
|
||||
"root.disambiguous.deep.group.%secondary_group.%user", true))
|
||||
.setFallbackReject()));
|
||||
|
||||
rules.add(
|
||||
@ -526,10 +519,11 @@ public void testGroupTargetMatching() throws IOException {
|
||||
"Alice should be placed to root.man.p_alice based on her primary group",
|
||||
engine, app, "alice", "root.man.p_alice");
|
||||
assertPlace(
|
||||
"Bob should be placed to root.dynamic.developer.bob based on his " +
|
||||
"secondary group, since we have a queue named 'developer', bob " +
|
||||
"Bob should be placed to root.disambiguous.deep.group.developer.bob" +
|
||||
"based on his secondary group, since we have a queue named" +
|
||||
"'developer', under the path 'root.disambiguous.deep.group' bob " +
|
||||
"identifies as a user with secondary_group 'developer'", engine, app,
|
||||
"bob", "root.dynamic.developer.bob");
|
||||
"bob", "root.disambiguous.deep.group.developer.bob");
|
||||
assertReject("Charlie should get rejected because he neither of his" +
|
||||
"groups have an ambiguous queue, so effectively he has no secondary " +
|
||||
"group", engine, app, "charlie");
|
||||
@ -537,6 +531,93 @@ public void testGroupTargetMatching() throws IOException {
|
||||
engine, app, "dave");
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testSecondaryGroupWithoutParent() throws IOException {
|
||||
ArrayList<MappingRule> rules = new ArrayList<>();
|
||||
|
||||
rules.add(
|
||||
new MappingRule(
|
||||
MappingRuleMatchers.createUserMatcher("alice"),
|
||||
(new MappingRuleActions.PlaceToQueueAction(
|
||||
"%secondary_group", false))
|
||||
.setFallbackReject()));
|
||||
|
||||
rules.add(
|
||||
new MappingRule(
|
||||
MappingRuleMatchers.createUserMatcher("bob"),
|
||||
(new MappingRuleActions.PlaceToQueueAction(
|
||||
"%secondary_group.%user", true))
|
||||
.setFallbackReject()));
|
||||
|
||||
rules.add(
|
||||
new MappingRule(
|
||||
MappingRuleMatchers.createUserMatcher("charlie"),
|
||||
(new MappingRuleActions.PlaceToQueueAction(
|
||||
"%secondary_group", true))
|
||||
.setFallbackReject()));
|
||||
CSMappingPlacementRule engine = setupEngine(true, rules);
|
||||
ApplicationSubmissionContext app = createApp("app");
|
||||
|
||||
assertPlace(
|
||||
"Alice should be placed to root.secondaryTests.unique because " +
|
||||
"'unique' is a globally unique queue, and she has a matching group",
|
||||
engine, app, "alice", "root.secondaryTests.unique");
|
||||
assertPlace(
|
||||
"Bob should be placed to root.disambiguous.deep.group.developer.bob " +
|
||||
"because 'developer' is a globally unique PARENT queue, and he " +
|
||||
"has a matching group name, and can create a queue with '%user' " +
|
||||
"under it", engine, app, "bob",
|
||||
"root.disambiguous.deep.group.developer.bob");
|
||||
assertReject("Charlie should get rejected because neither of his" +
|
||||
"groups have a disambiguous queue, so effectively he has no " +
|
||||
"secondary group", engine, app, "charlie");
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testSecondaryGroupWithParent() throws IOException {
|
||||
ArrayList<MappingRule> rules = new ArrayList<>();
|
||||
|
||||
rules.add(
|
||||
new MappingRule(
|
||||
MappingRuleMatchers.createUserMatcher("alice"),
|
||||
(new MappingRuleActions.PlaceToQueueAction(
|
||||
"root.secondaryTests.%secondary_group", false))
|
||||
.setFallbackReject()));
|
||||
|
||||
rules.add(
|
||||
new MappingRule(
|
||||
MappingRuleMatchers.createUserMatcher("bob"),
|
||||
(new MappingRuleActions.PlaceToQueueAction(
|
||||
"root.secondaryTests.%secondary_group", true))
|
||||
.setFallbackReject()));
|
||||
|
||||
rules.add(
|
||||
new MappingRule(
|
||||
MappingRuleMatchers.createUserMatcher("charlie"),
|
||||
(new MappingRuleActions.PlaceToQueueAction(
|
||||
"root.%secondary_group", true))
|
||||
.setFallbackReject()));
|
||||
CSMappingPlacementRule engine = setupEngine(true, rules);
|
||||
ApplicationSubmissionContext app = createApp("app");
|
||||
|
||||
assertPlace(
|
||||
"Alice should be placed to root.secondaryTests.unique because " +
|
||||
"both her secondary groups 'user' and 'unique' are eligible " +
|
||||
"for being a secondary group under root.secondaryTests, but " +
|
||||
"'unique' precedes 'user' in the group list.",
|
||||
engine, app, "alice", "root.secondaryTests.unique");
|
||||
assertPlace(
|
||||
"Bob should be placed to root.secondaryTests.user " +
|
||||
"bob is member of group 'user' and while 'user' is globally not " +
|
||||
"unique it is a valid secondary group target under queue " +
|
||||
"root.secondaryTests.",
|
||||
engine, app, "bob", "root.secondaryTests.user");
|
||||
assertReject("Charlie should get rejected because neither of his" +
|
||||
"groups have a matching queue under root.", engine, app, "charlie");
|
||||
}
|
||||
|
||||
|
||||
void assertConfigTestResult(List<MappingRule> rules) {
|
||||
assertEquals("We only specified one rule", 1, rules.size());
|
||||
MappingRule rule = rules.get(0);
|
||||
|
@ -472,8 +472,8 @@ public void testFixedUserWithDynamicGroupQueue() throws Exception {
|
||||
// u:b4:c.%secondary_group
|
||||
QueueMapping userQueueMapping3 = QueueMappingBuilder.create()
|
||||
.type(QueueMapping.MappingType.USER)
|
||||
.source("e")
|
||||
.queue("c.%secondary_group")
|
||||
.source("b4")
|
||||
.queue("root.b.%secondary_group")
|
||||
.build();
|
||||
|
||||
queueMappingsForUG.add(userQueueMapping1);
|
||||
@ -508,9 +508,9 @@ public void testFixedUserWithDynamicGroupQueue() throws Exception {
|
||||
ApplicationPlacementContext ctx1 = r.getPlacementForApp(asc, "a1");
|
||||
assertEquals("Queue", "a1group", ctx1.getQueue());
|
||||
|
||||
ApplicationPlacementContext ctx2 = r.getPlacementForApp(asc, "e");
|
||||
assertEquals("Queue", "esubgroup1", ctx2.getQueue());
|
||||
assertEquals("Queue", "root.c", ctx2.getParentQueue());
|
||||
ApplicationPlacementContext ctx2 = r.getPlacementForApp(asc, "b4");
|
||||
assertEquals("Queue", "b4subgroup1", ctx2.getQueue());
|
||||
assertEquals("Queue", "root.b", ctx2.getParentQueue());
|
||||
} finally {
|
||||
if (mockRM != null) {
|
||||
mockRM.close();
|
||||
|
Loading…
Reference in New Issue
Block a user