YARN-10411. Create an allowCreate flag for MappingRuleAction. Contributed by Gergely Pollak.
This commit is contained in:
parent
dd6c66556e
commit
360bbcd3bc
@ -203,15 +203,16 @@ private VariableContext createVariableContext(
|
||||
return vctx;
|
||||
}
|
||||
|
||||
private String validateAndNormalizeQueue(String queueName)
|
||||
throws YarnException {
|
||||
private String validateAndNormalizeQueue(
|
||||
String queueName, boolean allowCreate) throws YarnException {
|
||||
MappingQueuePath path = new MappingQueuePath(queueName);
|
||||
String leaf = path.getLeafName();
|
||||
String parent = path.getParent();
|
||||
|
||||
String normalizedName;
|
||||
if (parent != null) {
|
||||
normalizedName = validateAndNormalizeQueueWithParent(parent, leaf);
|
||||
normalizedName = validateAndNormalizeQueueWithParent(
|
||||
parent, leaf, allowCreate);
|
||||
} else {
|
||||
normalizedName = validateAndNormalizeQueueWithNoParent(leaf);
|
||||
}
|
||||
@ -225,8 +226,8 @@ private String validateAndNormalizeQueue(String queueName)
|
||||
return normalizedName;
|
||||
}
|
||||
|
||||
private String validateAndNormalizeQueueWithParent(String parent, String leaf)
|
||||
throws YarnException {
|
||||
private String validateAndNormalizeQueueWithParent(
|
||||
String parent, String leaf, boolean allowCreate) throws YarnException {
|
||||
CSQueue parentQueue = queueManager.getQueue(parent);
|
||||
//we don't find the specified parent, so the placement rule is invalid
|
||||
//for this case
|
||||
@ -244,17 +245,26 @@ private String validateAndNormalizeQueueWithParent(String parent, String leaf)
|
||||
String parentPath = parentQueue.getQueuePath();
|
||||
String fullPath = parentPath + DOT + leaf;
|
||||
|
||||
//if we have a parent which is not a managed parent, we check if the leaf
|
||||
//queue exists under this parent
|
||||
if (!(parentQueue instanceof ManagedParentQueue)) {
|
||||
CSQueue queue = queueManager.getQueue(fullPath);
|
||||
//if the queue doesn't exit we return null
|
||||
if (queue == null) {
|
||||
throw new YarnException("Mapping rule specified a parent queue '" +
|
||||
parent + "', but it is not a managed parent queue, " +
|
||||
"and no queue exists with name '" + leaf + "' under it.");
|
||||
}
|
||||
//checking if the queue actually exists
|
||||
CSQueue queue = queueManager.getQueue(fullPath);
|
||||
//if we have a parent which is not a managed parent and the queue doesn't
|
||||
//then it is an invalid target, since the queue won't be auto-created
|
||||
if (!(parentQueue instanceof ManagedParentQueue) && queue == null) {
|
||||
throw new YarnException("Mapping rule specified a parent queue '" +
|
||||
parent + "', but it is not a managed parent queue, " +
|
||||
"and no queue exists with name '" + leaf + "' under it.");
|
||||
}
|
||||
|
||||
//if the queue does not exist but the parent is managed we need to check if
|
||||
//auto-creation is allowed
|
||||
if (parentQueue instanceof ManagedParentQueue
|
||||
&& queue == null
|
||||
&& allowCreate == false) {
|
||||
throw new YarnException("Mapping rule doesn't allow auto-creation of " +
|
||||
"the queue '" + fullPath + "'");
|
||||
}
|
||||
|
||||
|
||||
//at this point we either have a managed parent or the queue actually
|
||||
//exists so we have a placement context, returning it
|
||||
return fullPath;
|
||||
@ -293,11 +303,11 @@ private MappingRuleResult evaluateRule(
|
||||
|
||||
if (result.getResult() == MappingRuleResultType.PLACE) {
|
||||
try {
|
||||
result.updateNormalizedQueue(
|
||||
validateAndNormalizeQueue(result.getQueue()));
|
||||
result.updateNormalizedQueue(validateAndNormalizeQueue(
|
||||
result.getQueue(), result.isCreateAllowed()));
|
||||
} catch (Exception e) {
|
||||
LOG.info("Cannot place to queue '" + result.getQueue() +
|
||||
"' returned by mapping rule.", e);
|
||||
LOG.info("Cannot place to queue '{}' returned by mapping rule. " +
|
||||
"Reason: {}", result.getQueue(), e.getMessage());
|
||||
result = rule.getFallback();
|
||||
}
|
||||
}
|
||||
@ -395,7 +405,7 @@ private ApplicationPlacementContext placeToDefault(
|
||||
MappingRule rule) throws YarnException {
|
||||
try {
|
||||
String queueName = validateAndNormalizeQueue(
|
||||
variables.replacePathVariables("%default"));
|
||||
variables.replacePathVariables("%default"), false);
|
||||
LOG.debug("Application '{}' have been placed to queue '{}' by " +
|
||||
"the fallback option of rule {}",
|
||||
asc.getApplicationName(), queueName, rule);
|
||||
|
@ -98,7 +98,8 @@ public static MappingRule createLegacyRule(String source, String path) {
|
||||
public static MappingRule createLegacyRule(
|
||||
String type, String source, String path) {
|
||||
MappingRuleMatcher matcher;
|
||||
MappingRuleAction action = new MappingRuleActions.PlaceToQueueAction(path);
|
||||
MappingRuleAction action = MappingRuleActions.createPlaceToQueueAction(
|
||||
path, true);
|
||||
//While legacy rule fallback handling is a bit inconsistent, the most cases
|
||||
//it fall back to default queue placement, so this is the best approximation
|
||||
action.setFallbackDefaultPlacement();
|
||||
|
@ -43,13 +43,22 @@ public static class PlaceToQueueAction extends MappingRuleActionBase {
|
||||
*/
|
||||
private String queuePattern;
|
||||
|
||||
/**
|
||||
* This flag indicates whether the target queue can be created if it does
|
||||
* not exist yet.
|
||||
*/
|
||||
private boolean allowCreate;
|
||||
|
||||
/**
|
||||
* Constructor.
|
||||
* @param queuePattern The queue pattern in which the application will be
|
||||
* placed if this action is fired. The pattern may
|
||||
* contain variables. eg. root.%primary_group.%user
|
||||
* @param allowCreate Determines if the target queue should be created if it
|
||||
* does not exist
|
||||
*/
|
||||
PlaceToQueueAction(String queuePattern) {
|
||||
PlaceToQueueAction(String queuePattern, boolean allowCreate) {
|
||||
this.allowCreate = allowCreate;
|
||||
this.queuePattern = queuePattern == null ? "" : queuePattern;
|
||||
}
|
||||
|
||||
@ -63,7 +72,7 @@ public static class PlaceToQueueAction extends MappingRuleActionBase {
|
||||
@Override
|
||||
public MappingRuleResult execute(VariableContext variables) {
|
||||
String substituted = variables.replacePathVariables(queuePattern);
|
||||
return MappingRuleResult.createPlacementResult(substituted);
|
||||
return MappingRuleResult.createPlacementResult(substituted, allowCreate);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -209,11 +218,14 @@ public static MappingRuleAction createUpdateDefaultAction(String queue) {
|
||||
* Convenience method to create an action which places the application to a
|
||||
* queue.
|
||||
* @param queue The name of the queue the application should be placed to
|
||||
* @param allowCreate Determines if the target queue should be created if it
|
||||
* does not exist
|
||||
* @return PlaceToQueueAction which will place the application to the
|
||||
* specified queue on execute
|
||||
*/
|
||||
public static MappingRuleAction createPlaceToQueueAction(String queue) {
|
||||
return new PlaceToQueueAction(queue);
|
||||
public static MappingRuleAction createPlaceToQueueAction(
|
||||
String queue, boolean allowCreate) {
|
||||
return new PlaceToQueueAction(queue, allowCreate);
|
||||
}
|
||||
|
||||
/**
|
||||
@ -223,7 +235,7 @@ public static MappingRuleAction createPlaceToQueueAction(String queue) {
|
||||
* DEFAULT queue on execute
|
||||
*/
|
||||
public static MappingRuleAction createPlaceToDefaultAction() {
|
||||
return createPlaceToQueueAction(DEFAULT_QUEUE_VARIABLE);
|
||||
return createPlaceToQueueAction(DEFAULT_QUEUE_VARIABLE, false);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -28,6 +28,13 @@ public final class MappingRuleResult {
|
||||
*/
|
||||
private final String queue;
|
||||
|
||||
/**
|
||||
* This flag indicates whether the target queue can be created if it does not
|
||||
* exist yet.
|
||||
* Only valid if result == PLACE
|
||||
*/
|
||||
private boolean allowCreate = true;
|
||||
|
||||
/**
|
||||
* The normalized name of the queue, since CS allows users to reference queues
|
||||
* by only their leaf name, we need to normalize those queues to have full
|
||||
@ -78,6 +85,24 @@ private MappingRuleResult(String queue, MappingRuleResultType result) {
|
||||
this.result = result;
|
||||
}
|
||||
|
||||
/**
|
||||
* Constructor is private to force the user to use the predefined generator
|
||||
* methods to create new instances in order to avoid inconsistent states.
|
||||
* @param queue Name of the queue in which the application is supposed to be
|
||||
* placed, only valid if result == PLACE
|
||||
* otherwise it should be null
|
||||
* @param result The type of the result
|
||||
* @param allowCreate Determines if the target queue should be created if it
|
||||
* does not exist
|
||||
*/
|
||||
private MappingRuleResult(
|
||||
String queue, MappingRuleResultType result, boolean allowCreate) {
|
||||
this.queue = queue;
|
||||
this.normalizedQueue = queue;
|
||||
this.result = result;
|
||||
this.allowCreate = allowCreate;
|
||||
}
|
||||
|
||||
/**
|
||||
* This method returns the result queue. Currently only makes sense when
|
||||
* result == PLACE.
|
||||
@ -87,6 +112,15 @@ public String getQueue() {
|
||||
return queue;
|
||||
}
|
||||
|
||||
/**
|
||||
* The method returns true if the result queue should be created when it does
|
||||
* not exist yet.
|
||||
* @return true if non-existent queues should be created
|
||||
*/
|
||||
public boolean isCreateAllowed() {
|
||||
return allowCreate;
|
||||
}
|
||||
|
||||
/**
|
||||
* External interface for setting the normalized version of the queue. This
|
||||
* class cannot normalize on it's own, but provides a way to store the
|
||||
@ -121,8 +155,10 @@ public MappingRuleResultType getResult() {
|
||||
* @param queue The name of the queue in which we shall place the application
|
||||
* @return The generated MappingRuleResult
|
||||
*/
|
||||
public static MappingRuleResult createPlacementResult(String queue) {
|
||||
return new MappingRuleResult(queue, MappingRuleResultType.PLACE);
|
||||
public static MappingRuleResult createPlacementResult(
|
||||
String queue, boolean allowCreate) {
|
||||
return new MappingRuleResult(
|
||||
queue, MappingRuleResultType.PLACE, allowCreate);
|
||||
}
|
||||
|
||||
/**
|
||||
|
@ -260,12 +260,12 @@ public void testRuleFallbackHandling() throws IOException {
|
||||
rules.add(
|
||||
new MappingRule(
|
||||
MappingRuleMatchers.createUserMatcher("alice"),
|
||||
(new MappingRuleActions.PlaceToQueueAction("non-existent"))
|
||||
(new MappingRuleActions.PlaceToQueueAction("non-existent", true))
|
||||
.setFallbackReject()));
|
||||
rules.add(
|
||||
new MappingRule(
|
||||
MappingRuleMatchers.createUserMatcher("bob"),
|
||||
(new MappingRuleActions.PlaceToQueueAction("non-existent"))
|
||||
(new MappingRuleActions.PlaceToQueueAction("non-existent", true))
|
||||
.setFallbackSkip()));
|
||||
rules.add(
|
||||
new MappingRule(
|
||||
@ -274,11 +274,11 @@ public void testRuleFallbackHandling() throws IOException {
|
||||
rules.add(
|
||||
new MappingRule(
|
||||
MappingRuleMatchers.createUserMatcher("bob"),
|
||||
new MappingRuleActions.PlaceToQueueAction("%default")));
|
||||
new MappingRuleActions.PlaceToQueueAction("%default", true)));
|
||||
rules.add(
|
||||
new MappingRule(
|
||||
MappingRuleMatchers.createUserMatcher("charlie"),
|
||||
(new MappingRuleActions.PlaceToQueueAction("non-existent"))
|
||||
(new MappingRuleActions.PlaceToQueueAction("non-existent", true))
|
||||
.setFallbackDefaultPlacement()));
|
||||
rules.add(
|
||||
new MappingRule(
|
||||
@ -287,14 +287,14 @@ public void testRuleFallbackHandling() throws IOException {
|
||||
rules.add(
|
||||
new MappingRule(
|
||||
MappingRuleMatchers.createUserMatcher("emily"),
|
||||
(new MappingRuleActions.PlaceToQueueAction("non-existent"))
|
||||
(new MappingRuleActions.PlaceToQueueAction("non-existent", true))
|
||||
.setFallbackDefaultPlacement()));
|
||||
//This rule is to catch all shouldfail applications, and place them to a
|
||||
// queue, so we can detect they were not rejected nor null-ed
|
||||
rules.add(
|
||||
new MappingRule(
|
||||
MappingRuleMatchers.createApplicationNameMatcher("ShouldFail"),
|
||||
new MappingRuleActions.PlaceToQueueAction("root.default")));
|
||||
new MappingRuleActions.PlaceToQueueAction("root.default", true)));
|
||||
|
||||
CSMappingPlacementRule engine = setupEngine(true, rules);
|
||||
ApplicationSubmissionContext fail = createApp("ShouldFail");
|
||||
@ -373,4 +373,42 @@ public void testConfigValidation() {
|
||||
}
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testAllowCreateFlag() throws IOException {
|
||||
ArrayList<MappingRule> rules = new ArrayList<>();
|
||||
rules.add(
|
||||
new MappingRule(
|
||||
MappingRuleMatchers.createUserMatcher("alice"),
|
||||
(new MappingRuleActions.PlaceToQueueAction("non-existent", true))
|
||||
.setFallbackReject()));
|
||||
rules.add(
|
||||
new MappingRule(
|
||||
MappingRuleMatchers.createUserMatcher("bob"),
|
||||
(new MappingRuleActions.PlaceToQueueAction("non-existent", false))
|
||||
.setFallbackReject()));
|
||||
rules.add(
|
||||
new MappingRule(
|
||||
MappingRuleMatchers.createUserMatcher("charlie"),
|
||||
(new MappingRuleActions.PlaceToQueueAction("root.man.create", true))
|
||||
.setFallbackReject()));
|
||||
rules.add(
|
||||
new MappingRule(
|
||||
MappingRuleMatchers.createUserMatcher("emily"),
|
||||
(new MappingRuleActions.PlaceToQueueAction("root.man.create", false))
|
||||
.setFallbackReject()));
|
||||
|
||||
CSMappingPlacementRule engine = setupEngine(true, rules);
|
||||
ApplicationSubmissionContext app = createApp("app");
|
||||
|
||||
assertReject("Alice should be rejected because the target queue" +
|
||||
" does not exist", engine, app, "alice");
|
||||
assertReject("Bob should be rejected because the target queue" +
|
||||
" does not exist", engine, app, "bob");
|
||||
assertReject("Emily should be rejected because auto queue creation is not" +
|
||||
" allowed for this action", engine, app, "emily");
|
||||
|
||||
assertPlace("Charlie should be able to place since it is allowed to create",
|
||||
engine, app, "charlie", "root.man.create");
|
||||
|
||||
}
|
||||
}
|
@ -62,7 +62,7 @@ public void testMappingRuleEvaluation() {
|
||||
|
||||
MappingRule rule = new MappingRule(
|
||||
MappingRuleMatchers.createUserMatcher("bob"),
|
||||
(new MappingRuleActions.PlaceToQueueAction("%default.%default"))
|
||||
(new MappingRuleActions.PlaceToQueueAction("%default.%default", true))
|
||||
.setFallbackSkip()
|
||||
);
|
||||
|
||||
@ -133,7 +133,7 @@ public void testLegacyEvaluation() {
|
||||
@Test
|
||||
public void testToStrings() {
|
||||
MappingRuleAction action = new MappingRuleActions.PlaceToQueueAction(
|
||||
"queue");
|
||||
"queue", true);
|
||||
MappingRuleMatcher matcher = MappingRuleMatchers.createUserMatcher("bob");
|
||||
MappingRule rule = new MappingRule(matcher, action);
|
||||
|
||||
|
@ -53,7 +53,7 @@ public void testRejectAction() {
|
||||
@Test
|
||||
public void testActionFallbacks() {
|
||||
MappingRuleActionBase action =
|
||||
new MappingRuleActions.PlaceToQueueAction("a");
|
||||
new MappingRuleActions.PlaceToQueueAction("a", true);
|
||||
|
||||
action.setFallbackDefaultPlacement();
|
||||
assertPlaceDefaultResult(action.getFallback());
|
||||
@ -122,28 +122,29 @@ public void testPlaceToQueueAction() {
|
||||
variables.setImmutables("%immutable");
|
||||
|
||||
MappingRuleAction placeToStatic =
|
||||
new MappingRuleActions.PlaceToQueueAction("root.static.queue");
|
||||
new MappingRuleActions.PlaceToQueueAction("root.static.queue", true);
|
||||
|
||||
MappingRuleAction placeToDynamic =
|
||||
new MappingRuleActions.PlaceToQueueAction("root.%sub.%immutable");
|
||||
new MappingRuleActions.PlaceToQueueAction("root.%sub.%immutable", true);
|
||||
|
||||
MappingRuleAction placeToDynamicDoubleSub =
|
||||
MappingRuleActions.createPlaceToQueueAction("root.%sub%sub.%immutable");
|
||||
MappingRuleActions.createPlaceToQueueAction(
|
||||
"root.%sub%sub.%immutable", true);
|
||||
|
||||
MappingRuleAction placeToNull =
|
||||
MappingRuleActions.createPlaceToQueueAction(null);
|
||||
MappingRuleActions.createPlaceToQueueAction(null, true);
|
||||
|
||||
MappingRuleAction placeToEmpty =
|
||||
MappingRuleActions.createPlaceToQueueAction("");
|
||||
MappingRuleActions.createPlaceToQueueAction("", true);
|
||||
|
||||
MappingRuleAction placeToNulRef =
|
||||
new MappingRuleActions.PlaceToQueueAction("%null");
|
||||
new MappingRuleActions.PlaceToQueueAction("%null", true);
|
||||
|
||||
MappingRuleAction placeToEmptyRef =
|
||||
new MappingRuleActions.PlaceToQueueAction("%empty");
|
||||
new MappingRuleActions.PlaceToQueueAction("%empty", true);
|
||||
|
||||
MappingRuleAction placeToDefaultRef =
|
||||
new MappingRuleActions.PlaceToQueueAction("%default");
|
||||
new MappingRuleActions.PlaceToQueueAction("%default", true);
|
||||
|
||||
assertPlaceResult(placeToStatic.execute(variables), "root.static.queue");
|
||||
assertPlaceResult(placeToDynamic.execute(variables), "root.xxx.immutable");
|
||||
@ -160,7 +161,7 @@ public void testPlaceToQueueAction() {
|
||||
@Test
|
||||
public void testToStrings() {
|
||||
MappingRuleAction place = new MappingRuleActions.PlaceToQueueAction(
|
||||
"queue");
|
||||
"queue", true);
|
||||
MappingRuleAction varUpdate = new MappingRuleActions.VariableUpdateAction(
|
||||
"%var", "value");
|
||||
MappingRuleAction reject = new MappingRuleActions.RejectAction();
|
||||
|
Loading…
Reference in New Issue
Block a user