From cd0490e8c6dd502c639de2d26980556b78eeeb06 Mon Sep 17 00:00:00 2001 From: Peter Bacsko Date: Wed, 11 Nov 2020 17:06:12 +0100 Subject: [PATCH] Revert "YARN-10425. Replace the legacy placement engine in CS with the new one. Contributed by Gergely Pollak." This reverts commit b0ab222a6c0073c31c285e89e43508b3e0ca9576. --- .../RefreshMountTableEntriesRequest.java | 34 ++ .../server/resourcemanager/RMAppManager.java | 4 +- .../AppNameMappingPlacementRule.java | 204 +++++++ .../placement/CSMappingPlacementRule.java | 92 +-- .../MappingRuleValidationContextImpl.java | 16 +- .../placement/PlacementManager.java | 10 +- .../placement/PlacementRule.java | 25 - .../UserGroupMappingPlacementRule.java | 558 ++++++++++++++++++ .../scheduler/capacity/CapacityScheduler.java | 44 +- .../CapacitySchedulerConfiguration.java | 18 - .../resourcemanager/TestAppManager.java | 4 +- .../TestAppManagerWithFairScheduler.java | 14 +- .../placement/TestPlacementManager.java | 31 +- .../TestUserGroupMappingPlacementRule.java | 4 +- .../TestAbsoluteResourceWithAutoQueue.java | 1 - ...CapacitySchedulerAutoCreatedQueueBase.java | 28 +- ...estCapacitySchedulerAutoQueueCreation.java | 15 +- ...tCapacitySchedulerQueueMappingFactory.java | 43 +- .../scheduler/capacity/TestQueueMappings.java | 29 +- 19 files changed, 947 insertions(+), 227 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/AppNameMappingPlacementRule.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java diff --git a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/RefreshMountTableEntriesRequest.java b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/RefreshMountTableEntriesRequest.java index e69de29bb2..899afe75c0 100644 --- a/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/RefreshMountTableEntriesRequest.java +++ b/hadoop-hdfs-project/hadoop-hdfs-rbf/src/main/java/org/apache/hadoop/hdfs/server/federation/store/protocol/RefreshMountTableEntriesRequest.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hdfs.server.federation.store.protocol; + +import java.io.IOException; + +import org.apache.hadoop.hdfs.server.federation.store.driver.StateStoreSerializer; + +/** + * API request for refreshing mount table cached entries from state store. + */ +public abstract class RefreshMountTableEntriesRequest { + + public static RefreshMountTableEntriesRequest newInstance() + throws IOException { + return StateStoreSerializer + .newRecord(RefreshMountTableEntriesRequest.class); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 13c2ec7de4..fe18d8252d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -864,9 +864,9 @@ ApplicationPlacementContext placeApplication( if (placementManager != null) { try { String usernameUsedForPlacement = - getUserNameForPlacement(user, context, placementManager); + getUserNameForPlacement(user, context, placementManager); placementContext = placementManager - .placeApplication(context, usernameUsedForPlacement, isRecovery); + .placeApplication(context, usernameUsedForPlacement); } catch (YarnException e) { // Placement could also fail if the user doesn't exist in system // skip if the user is not found during recovery. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/AppNameMappingPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/AppNameMappingPlacementRule.java new file mode 100644 index 0000000000..63d98ba6c4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/AppNameMappingPlacementRule.java @@ -0,0 +1,204 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.placement; + +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.List; + +import static org.apache.hadoop.yarn.server.resourcemanager.placement.QueuePlacementRuleUtils.getPlacementContext; +import static org.apache.hadoop.yarn.server.resourcemanager.placement.QueuePlacementRuleUtils.isStaticQueueMapping; +import static org.apache.hadoop.yarn.server.resourcemanager.placement.QueuePlacementRuleUtils.validateAndGetAutoCreatedQueueMapping; +import static org.apache.hadoop.yarn.server.resourcemanager.placement.QueuePlacementRuleUtils.validateAndGetQueueMapping; + +public class AppNameMappingPlacementRule extends PlacementRule { + private static final Logger LOG = LoggerFactory + .getLogger(AppNameMappingPlacementRule.class); + + public static final String CURRENT_APP_MAPPING = "%application"; + + private static final String QUEUE_MAPPING_NAME = "app-name"; + + private boolean overrideWithQueueMappings = false; + private List mappings = null; + protected CapacitySchedulerQueueManager queueManager; + + public AppNameMappingPlacementRule() { + this(false, null); + } + + public AppNameMappingPlacementRule(boolean overrideWithQueueMappings, + List newMappings) { + this.overrideWithQueueMappings = overrideWithQueueMappings; + this.mappings = newMappings; + } + + @Override + public boolean initialize(ResourceScheduler scheduler) + throws IOException { + if (!(scheduler instanceof CapacityScheduler)) { + throw new IOException( + "AppNameMappingPlacementRule can be configured only for " + + "CapacityScheduler"); + } + CapacitySchedulerContext schedulerContext = + (CapacitySchedulerContext) scheduler; + CapacitySchedulerConfiguration conf = schedulerContext.getConfiguration(); + boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings(); + LOG.info( + "Initialized App Name queue mappings, override: " + overrideWithQueueMappings); + + List queueMappings = + conf.getQueueMappingEntity(QUEUE_MAPPING_NAME); + + // Get new user mappings + List newMappings = new ArrayList<>(); + + queueManager = schedulerContext.getCapacitySchedulerQueueManager(); + + // check if mappings refer to valid queues + for (QueueMapping mapping : queueMappings) { + if (isStaticQueueMapping(mapping)) { + //at this point mapping.getQueueName() return only the queue name, since + //the config parsing have been changed making QueueMapping more + //consistent + + CSQueue queue = queueManager.getQueue(mapping.getFullPath()); + if (ifQueueDoesNotExist(queue)) { + //Try getting queue by its full path name, if it exists it is a static + //leaf queue indeed, without any auto creation magic + + if (queueManager.isAmbiguous(mapping.getFullPath())) { + throw new IOException( + "mapping contains ambiguous leaf queue reference " + mapping + .getFullPath()); + } + + //if leaf queue does not exist, + // this could be a potential auto created leaf queue + //validate if parent queue is specified, + // then it should exist and + // be an instance of AutoCreateEnabledParentQueue + QueueMapping newMapping = + validateAndGetAutoCreatedQueueMapping(queueManager, mapping); + if (newMapping == null) { + throw new IOException( + "mapping contains invalid or non-leaf queue " + mapping + .getQueue()); + } + newMappings.add(newMapping); + } else { + // if queue exists, validate + // if its an instance of leaf queue + // if its an instance of auto created leaf queue, + // then extract parent queue name and update queue mapping + QueueMapping newMapping = validateAndGetQueueMapping( + queueManager, queue, mapping); + newMappings.add(newMapping); + } + } else { + //If it is a dynamic queue mapping, + // we can safely assume leaf queue name does not have '.' in it + // validate + // if parent queue is specified, then + // parent queue exists and an instance of AutoCreateEnabledParentQueue + // + QueueMapping newMapping = validateAndGetAutoCreatedQueueMapping( + queueManager, mapping); + if (newMapping != null) { + newMappings.add(newMapping); + } else{ + newMappings.add(mapping); + } + } + } + + if (newMappings.size() > 0) { + this.mappings = newMappings; + this.overrideWithQueueMappings = overrideWithQueueMappings; + LOG.info("get valid queue mapping from app name config: " + + newMappings.toString() + ", override: " + overrideWithQueueMappings); + return true; + } + return false; + } + + private static boolean ifQueueDoesNotExist(CSQueue queue) { + return queue == null; + } + + private ApplicationPlacementContext getAppPlacementContext(String user, + String applicationName) throws IOException { + for (QueueMapping mapping : mappings) { + if (mapping.getSource().equals(CURRENT_APP_MAPPING)) { + if (mapping.getQueue().equals(CURRENT_APP_MAPPING)) { + return getPlacementContext(mapping, applicationName, queueManager); + } else { + return getPlacementContext(mapping, queueManager); + } + } + if (mapping.getSource().equals(applicationName)) { + return getPlacementContext(mapping, queueManager); + } + } + return null; + } + + @Override + public ApplicationPlacementContext getPlacementForApp( + ApplicationSubmissionContext asc, String user) throws YarnException { + String queueName = asc.getQueue(); + String applicationName = asc.getApplicationName(); + if (mappings != null && mappings.size() > 0) { + try { + ApplicationPlacementContext mappedQueue = getAppPlacementContext(user, + applicationName); + if (mappedQueue != null) { + // We have a mapping, should we use it? + if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME) + //queueName will be same as mapped queue name in case of recovery + || queueName.equals(mappedQueue.getQueue()) + || overrideWithQueueMappings) { + LOG.info("Application {} mapping [{}] to [{}] override {}", + applicationName, queueName, mappedQueue.getQueue(), + overrideWithQueueMappings); + return mappedQueue; + } + } + } catch (IOException ioex) { + String message = "Failed to submit application " + applicationName + + " reason: " + ioex.getMessage(); + throw new YarnException(message); + } + } + return null; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/CSMappingPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/CSMappingPlacementRule.java index aff75bae34..cad7f85349 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/CSMappingPlacementRule.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/CSMappingPlacementRule.java @@ -125,11 +125,7 @@ public boolean initialize(ResourceScheduler scheduler) throws IOException { overrideWithQueueMappings = conf.getOverrideWithQueueMappings(); if (groups == null) { - //We cannot use Groups#getUserToGroupsMappingService here, because when - //tests change the HADOOP_SECURITY_GROUP_MAPPING, Groups won't refresh its - //cached instance of groups, so we might get a Group instance which - //ignores the HADOOP_SECURITY_GROUP_MAPPING settings. - groups = new Groups(conf); + groups = Groups.getUserToGroupsMappingService(conf); } MappingRuleValidationContext validationContext = buildValidationContext(); @@ -149,8 +145,8 @@ public boolean initialize(ResourceScheduler scheduler) throws IOException { } LOG.info("Initialized queue mappings, can override user specified " + - "queues: {} number of rules: {} mapping rules: {}", - overrideWithQueueMappings, mappingRules.size(), mappingRules); + "queues: {} number of rules: {}", overrideWithQueueMappings, + mappingRules.size()); if (LOG.isDebugEnabled()) { LOG.debug("Initialized with the following mapping rules:"); @@ -174,12 +170,6 @@ public boolean initialize(ResourceScheduler scheduler) throws IOException { */ private void setupGroupsForVariableContext(VariableContext vctx, String user) throws IOException { - if (groups == null) { - LOG.warn( - "Group provider hasn't been set, cannot query groups for user {}", - user); - return; - } Set groupsSet = groups.getGroupsSet(user); String secondaryGroup = null; Iterator it = groupsSet.iterator(); @@ -203,18 +193,14 @@ private void setupGroupsForVariableContext(VariableContext vctx, String user) } private VariableContext createVariableContext( - ApplicationSubmissionContext asc, String user) { + ApplicationSubmissionContext asc, String user) throws IOException { VariableContext vctx = new VariableContext(); vctx.put("%user", user); vctx.put("%specified", asc.getQueue()); vctx.put("%application", asc.getApplicationName()); vctx.put("%default", "root.default"); - try { - setupGroupsForVariableContext(vctx, user); - } catch (IOException e) { - LOG.warn("Unable to setup groups: {}", e.getMessage()); - } + setupGroupsForVariableContext(vctx, user); vctx.setImmutables(immutableVariables); return vctx; @@ -352,43 +338,34 @@ private ApplicationPlacementContext createPlacementContext(String queueName) { @Override public ApplicationPlacementContext getPlacementForApp( ApplicationSubmissionContext asc, String user) throws YarnException { - return getPlacementForApp(asc, user, false); - } - - @Override - public ApplicationPlacementContext getPlacementForApp( - ApplicationSubmissionContext asc, String user, boolean recovery) - throws YarnException { //We only use the mapping rules if overrideWithQueueMappings enabled //or the application is submitted to the default queue, which effectively //means the application doesn't have any specific queue. String appQueue = asc.getQueue(); - LOG.debug("Looking placement for app '{}' originally submitted to queue " + - "'{}', with override enabled '{}'", - asc.getApplicationName(), appQueue, overrideWithQueueMappings); if (appQueue != null && !appQueue.equals(YarnConfiguration.DEFAULT_QUEUE_NAME) && !appQueue.equals(YarnConfiguration.DEFAULT_QUEUE_FULL_NAME) && - !overrideWithQueueMappings && - !recovery) { + !overrideWithQueueMappings) { LOG.info("Have no jurisdiction over application submission '{}', " + "moving to next PlacementRule engine", asc.getApplicationName()); return null; } VariableContext variables; - variables = createVariableContext(asc, user); + try { + variables = createVariableContext(asc, user); + } catch (IOException e) { + LOG.error("Unable to setup variable context", e); + throw new YarnException(e); + } - ApplicationPlacementContext ret = null; for (MappingRule rule : mappingRules) { MappingRuleResult result = evaluateRule(rule, variables); switch (result.getResult()) { case PLACE_TO_DEFAULT: - ret = placeToDefault(asc, variables, rule); - break; + return placeToDefault(asc, variables, rule); case PLACE: - ret = placeToQueue(asc, rule, result); - break; + return placeToQueue(asc, rule, result); case REJECT: LOG.info("Rejecting application '{}', reason: Mapping rule '{}' " + " fallback action is set to REJECT.", @@ -400,42 +377,17 @@ public ApplicationPlacementContext getPlacementForApp( case SKIP: //SKIP means skip to the next rule, which is the default behaviour of //the for loop, so we don't need to take any extra actions - break; + break; default: LOG.error("Invalid result '{}'", result); } - - //If we already have a return value, we can return it! - if (ret != null) { - break; - } } - if (ret == null) { - //If no rule was applied we return null, to let the engine move onto the - //next placementRule class - LOG.info("No matching rule found for application '{}', moving to next " + - "PlacementRule engine", asc.getApplicationName()); - } - - if (recovery) { - //we need this part for backwards compatibility with recovery - //the legacy code checked if the placement matches the queue of the - //application to be recovered, and if it did, it created an - //ApplicationPlacementContext. - //However at a later point this is going to be changed, there are two - //major issues with this approach: - // 1) The recovery only uses LEAF queue names, which must be updated - // 2) The ORIGINAL queue which the application was submitted is NOT - // stored this might result in different placement evaluation since - // now we can have rules which give different result based on what - // the user submitted. - if (ret == null || !ret.getQueue().equals(asc.getQueue())) { - return null; - } - } - - return ret; + //If no rule was applied we return null, to let the engine move onto the + //next placementRule class + LOG.info("No matching rule found for application '{}', moving to next " + + "PlacementRule engine", asc.getApplicationName()); + return null; } private ApplicationPlacementContext placeToQueue( @@ -458,13 +410,13 @@ private ApplicationPlacementContext placeToDefault( String queueName = validateAndNormalizeQueue( variables.replacePathVariables("%default"), false); LOG.debug("Application '{}' have been placed to queue '{}' by " + - "the fallback option of rule {}", + "the fallback option of rule {}", asc.getApplicationName(), queueName, rule); return createPlacementContext(queueName); } catch (YarnException e) { LOG.error("Rejecting application due to a failed fallback" + " action '{}'" + ", reason: {}", asc.getApplicationName(), - e); + e.getMessage()); //We intentionally omit the details, we don't want any server side //config information to leak to the client side throw new YarnException("Application submission have been rejected by a" + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/MappingRuleValidationContextImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/MappingRuleValidationContextImpl.java index 80bf9293d4..cbde33f19f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/MappingRuleValidationContextImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/MappingRuleValidationContextImpl.java @@ -154,22 +154,20 @@ private boolean validateDynamicQueuePath(MappingQueuePath path) } if (!(parentQueue instanceof ManagedParentQueue)) { - if (parentQueue.getChildQueues() != null) { - for (CSQueue queue : parentQueue.getChildQueues()) { - if (queue instanceof LeafQueue) { - //if a non managed parent queue has at least one leaf queue, this - //mapping can be valid, we cannot do any more checks - return true; - } + for (CSQueue queue : parentQueue.getChildQueues()) { + if (queue instanceof LeafQueue) { + //if a non managed parent queue has at least one leaf queue, this + //mapping can be valid, we cannot do any more checks + return true; } } //There is no way we can place anything into the queue referenced by the // rule, because we cannot auto create, and we don't have any leaf queues - //Actually this branch is not accessible with the current queue hierarchy, + //Actually this branch is not accessibe with the current queue hierarchy, //there should be no parents without any leaf queues. This condition says //for sanity checks - throw new YarnException("Target queue path '" + path + "' has " + + throw new YarnException("Target queue path '" + path + "' has" + "a non-managed parent queue which has no LeafQueues either."); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java index efde8f9bb2..4e9195d15b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementManager.java @@ -54,8 +54,7 @@ public void updateRules(List rules) { } public ApplicationPlacementContext placeApplication( - ApplicationSubmissionContext asc, String user, boolean recovery) - throws YarnException { + ApplicationSubmissionContext asc, String user) throws YarnException { readLock.lock(); try { if (null == rules || rules.isEmpty()) { @@ -64,7 +63,7 @@ public ApplicationPlacementContext placeApplication( ApplicationPlacementContext placement = null; for (PlacementRule rule : rules) { - placement = rule.getPlacementForApp(asc, user, recovery); + placement = rule.getPlacementForApp(asc, user); if (placement != null) { break; } @@ -75,11 +74,6 @@ public ApplicationPlacementContext placeApplication( readLock.unlock(); } } - - public ApplicationPlacementContext placeApplication( - ApplicationSubmissionContext asc, String user) throws YarnException { - return placeApplication(asc, user, false); - } @VisibleForTesting public List getPlacementRules() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java index 50d686ad37..dde632e4db 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/PlacementRule.java @@ -79,29 +79,4 @@ public abstract boolean initialize(ResourceScheduler scheduler) */ public abstract ApplicationPlacementContext getPlacementForApp( ApplicationSubmissionContext asc, String user) throws YarnException; - - - /** - * Return the scheduler queue name the application should be placed in - * wrapped in an {@link ApplicationPlacementContext} object. - * - * A non null return value places the application in a queue, - * a null value means the queue is not yet determined. The - * next {@link PlacementRule} in the list maintained in the - * {@link PlacementManager} will be executed. - * - * @param asc The context of the application created on submission - * @param user The name of the user submitting the application - * @param recovery Indicates if the submission is a recovery - * - * @throws YarnException for any error while executing the rule - * - * @return The queue name wrapped in {@link ApplicationPlacementContext} or - * null if no queue was resolved - */ - public ApplicationPlacementContext getPlacementForApp( - ApplicationSubmissionContext asc, String user, boolean recovery) - throws YarnException { - return getPlacementForApp(asc, user); - } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java new file mode 100644 index 0000000000..46de0f8909 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java @@ -0,0 +1,558 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.placement; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Iterator; +import java.util.List; +import java.util.Set; + +import org.apache.commons.lang3.StringUtils; +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.security.Groups; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping.MappingType; +import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping.QueueMappingBuilder; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedLeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ManagedParentQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; + +public class UserGroupMappingPlacementRule extends PlacementRule { + private static final Logger LOG = LoggerFactory + .getLogger(UserGroupMappingPlacementRule.class); + + public static final String CURRENT_USER_MAPPING = "%user"; + + public static final String PRIMARY_GROUP_MAPPING = "%primary_group"; + + public static final String SECONDARY_GROUP_MAPPING = "%secondary_group"; + + private boolean overrideWithQueueMappings = false; + private List mappings = null; + private Groups groups; + private CapacitySchedulerQueueManager queueManager; + + public UserGroupMappingPlacementRule(){ + this(false, null, null); + } + + @VisibleForTesting + UserGroupMappingPlacementRule(boolean overrideWithQueueMappings, + List newMappings, Groups groups) { + this.mappings = newMappings; + this.overrideWithQueueMappings = overrideWithQueueMappings; + this.groups = groups; + } + + private String getPrimaryGroup(String user) throws IOException { + return groups.getGroupsSet(user).iterator().next(); + } + + private String getSecondaryGroup(String user) throws IOException { + Set groupsSet = groups.getGroupsSet(user); + String secondaryGroup = null; + // Traverse all secondary groups (as there could be more than one + // and position is not guaranteed) and ensure there is queue with + // the same name + Iterator it = groupsSet.iterator(); + it.next(); + while (it.hasNext()) { + String group = it.next(); + if (this.queueManager.getQueue(group) != null) { + secondaryGroup = group; + break; + } + } + + if (secondaryGroup == null && LOG.isDebugEnabled()) { + LOG.debug("User {} is not associated with any Secondary " + + "Group. Hence it may use the 'default' queue", user); + } + return secondaryGroup; + } + + private ApplicationPlacementContext getPlacementForUser(String user) + throws IOException { + for (QueueMapping mapping : mappings) { + if (mapping.getType().equals(MappingType.USER)) { + if (mapping.getSource().equals(CURRENT_USER_MAPPING)) { + if (mapping.getParentQueue() != null + && mapping.getParentQueue().equals(PRIMARY_GROUP_MAPPING) + && mapping.getQueue().equals(CURRENT_USER_MAPPING)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Creating placement context for user {} using " + + "primary group current user mapping", user); + } + return getContextForGroupParent(user, mapping, + getPrimaryGroup(user)); + } else if (mapping.getParentQueue() != null + && mapping.getParentQueue().equals(SECONDARY_GROUP_MAPPING) + && mapping.getQueue().equals(CURRENT_USER_MAPPING)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Creating placement context for user {} using " + + "secondary group current user mapping", user); + } + return getContextForGroupParent(user, mapping, + getSecondaryGroup(user)); + } else if (mapping.getQueue().equals(CURRENT_USER_MAPPING)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Creating placement context for user {} using " + + "current user mapping", user); + } + return getPlacementContext(mapping, user); + } else if (mapping.getQueue().equals(PRIMARY_GROUP_MAPPING)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Creating placement context for user {} using " + + "primary group mapping", user); + } + return getPlacementContext(mapping, getPrimaryGroup(user)); + } else if (mapping.getQueue().equals(SECONDARY_GROUP_MAPPING)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Creating placement context for user {} using " + + "secondary group mapping", user); + } + return getPlacementContext(mapping, getSecondaryGroup(user)); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Creating placement context for user {} using " + + "current user static mapping", user); + } + return getPlacementContext(mapping); + } + } + + if (user.equals(mapping.getSource())) { + if (mapping.getQueue().equals(PRIMARY_GROUP_MAPPING)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Creating placement context for user {} using " + + "static user primary group mapping", user); + } + return getPlacementContext(mapping, getPrimaryGroup(user)); + } else if (mapping.getQueue().equals(SECONDARY_GROUP_MAPPING)) { + String secondaryGroup = getSecondaryGroup(user); + if (secondaryGroup != null) { + if (LOG.isDebugEnabled()) { + LOG.debug("Creating placement context for user {} using " + + "static user secondary group mapping", user); + } + return getPlacementContext(mapping, secondaryGroup); + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Wanted to create placement context for user {}" + + " using static user secondary group mapping," + + " but user has no secondary group!", user); + } + return null; + } + } else { + if (LOG.isDebugEnabled()) { + LOG.debug("Creating placement context for user {} using " + + "current user static mapping", user); + } + return getPlacementContext(mapping); + } + } + } + if (mapping.getType().equals(MappingType.GROUP)) { + for (String userGroups : groups.getGroupsSet(user)) { + if (userGroups.equals(mapping.getSource())) { + if (mapping.getQueue().equals(CURRENT_USER_MAPPING)) { + if (LOG.isDebugEnabled()) { + LOG.debug("Creating placement context for user {} using " + + "static group current user mapping", user); + } + return getPlacementContext(mapping, user); + } + if (LOG.isDebugEnabled()) { + LOG.debug("Creating placement context for user {} using " + + "static group static mapping", user); + } + return getPlacementContext(mapping); + } + } + } + } + return null; + } + + /** + * This convenience method allows to change the parent path or a leafName in + * a mapping object, by creating a new one, using the builder and copying the + * rest of the parameters. + * @param mapping The mapping to be changed + * @param parentPath The new parentPath of the mapping + * @param leafName The new leafQueueName of the mapping + * @return The updated NEW mapping + */ + private QueueMapping alterMapping( + QueueMapping mapping, String parentPath, String leafName) { + return QueueMappingBuilder.create() + .type(mapping.getType()) + .source(mapping.getSource()) + .queue(leafName) + .parentQueue(parentPath) + .build(); + } + + // invoked for mappings: + // u:%user:%primary_group.%user + // u:%user:%secondary_group.%user + private ApplicationPlacementContext getContextForGroupParent( + String user, + QueueMapping mapping, + String group) throws IOException { + + CSQueue groupQueue = this.queueManager.getQueue(group); + if (groupQueue != null) { + // replace the group string + QueueMapping resolvedGroupMapping = alterMapping( + mapping, + groupQueue.getQueuePath(), + user); + validateQueueMapping(resolvedGroupMapping); + return getPlacementContext(resolvedGroupMapping, user); + } else { + if (queueManager.isAmbiguous(group)) { + LOG.info("Queue mapping rule expect group queue to exist with name {}" + + " but the reference is ambiguous!", group); + } else { + LOG.info("Queue mapping rule expect group queue to exist with name {}" + + " but it does not exist!", group); + } + return null; + } + } + + @Override + public ApplicationPlacementContext getPlacementForApp( + ApplicationSubmissionContext asc, String user) + throws YarnException { + String queueName = asc.getQueue(); + ApplicationId applicationId = asc.getApplicationId(); + if (mappings != null && mappings.size() > 0) { + try { + ApplicationPlacementContext mappedQueue = getPlacementForUser(user); + if (mappedQueue != null) { + // We have a mapping, should we use it? + if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME) + //queueName will be same as mapped queue name in case of recovery + || queueName.equals(mappedQueue.getQueue()) + || overrideWithQueueMappings) { + LOG.info("Application {} user {} mapping [{}] to [{}] override {}", + applicationId, user, queueName, mappedQueue.getQueue(), + overrideWithQueueMappings); + return mappedQueue; + } + } + } catch (IOException ioex) { + String message = "Failed to submit application " + applicationId + + " submitted by user " + user + " reason: " + ioex.getMessage(); + throw new YarnException(message, ioex); + } + } + return null; + } + + private ApplicationPlacementContext getPlacementContext( + QueueMapping mapping) throws IOException { + return getPlacementContext(mapping, mapping.getQueue()); + } + + private ApplicationPlacementContext getPlacementContext(QueueMapping mapping, + String leafQueueName) throws IOException { + //leafQueue name no longer identifies a queue uniquely checking ambiguity + if (!mapping.hasParentQueue() && queueManager.isAmbiguous(leafQueueName)) { + throw new IOException("mapping contains ambiguous leaf queue reference " + + leafQueueName); + } + + if (!StringUtils.isEmpty(mapping.getParentQueue())) { + return getPlacementContextWithParent(mapping, leafQueueName); + } else { + return getPlacementContextNoParent(leafQueueName); + } + } + + private ApplicationPlacementContext getPlacementContextWithParent( + QueueMapping mapping, + String leafQueueName) { + CSQueue parent = queueManager.getQueue(mapping.getParentQueue()); + //we don't find the specified parent, so the placement rule is invalid + //for this case + if (parent == null) { + if (queueManager.isAmbiguous(mapping.getParentQueue())) { + LOG.warn("Placement rule specified a parent queue {}, but it is" + + "ambiguous.", mapping.getParentQueue()); + } else { + LOG.warn("Placement rule specified a parent queue {}, but it does" + + "not exist.", mapping.getParentQueue()); + } + return null; + } + + String parentPath = parent.getQueuePath(); + + //if we have a parent which is not a managed parent, we check if the leaf + //queue exists under this parent + if (!(parent instanceof ManagedParentQueue)) { + CSQueue queue = queueManager.getQueue(parentPath + "." + leafQueueName); + //if the queue doesn't exit we return null + if (queue == null) { + LOG.warn("Placement rule specified a parent queue {}, but it is" + + " not a managed parent queue, and no queue exists with name {} " + + "under it.", mapping.getParentQueue(), leafQueueName); + return null; + } + } + //at this point we either have a managed parent or the queue actually + //exists so we have a placement context, returning it + return new ApplicationPlacementContext(leafQueueName, parentPath); + } + + private ApplicationPlacementContext getPlacementContextNoParent( + String leafQueueName) { + //in this case we don't have a parent specified so we expect the queue to + //exist, otherwise the mapping will not be valid for this case + CSQueue queue = queueManager.getQueue(leafQueueName); + if (queue == null) { + if (queueManager.isAmbiguous(leafQueueName)) { + LOG.warn("Queue {} specified in placement rule is ambiguous", + leafQueueName); + } else { + LOG.warn("Queue {} specified in placement rule does not exist", + leafQueueName); + } + return null; + } + + //getting parent path to make sure if the leaf name would become ambiguous + //the placement context stays valid. + CSQueue parent = queueManager.getQueue(leafQueueName).getParent(); + return new ApplicationPlacementContext( + leafQueueName, parent.getQueuePath()); + } + + @VisibleForTesting + @Override + public boolean initialize(ResourceScheduler scheduler) + throws IOException { + if (!(scheduler instanceof CapacityScheduler)) { + throw new IOException( + "UserGroupMappingPlacementRule can be configured only for " + + "CapacityScheduler"); + } + CapacitySchedulerContext schedulerContext = + (CapacitySchedulerContext) scheduler; + CapacitySchedulerConfiguration conf = schedulerContext.getConfiguration(); + boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings(); + LOG.info( + "Initialized queue mappings, override: " + overrideWithQueueMappings); + + List queueMappings = conf.getQueueMappings(); + + // Get new user/group mappings + List newMappings = new ArrayList<>(); + + queueManager = schedulerContext.getCapacitySchedulerQueueManager(); + + // check if mappings refer to valid queues + for (QueueMapping mapping : queueMappings) { + //at this point mapping.getQueueName() return only the queue name, since + //the config parsing have been changed making QueueMapping more consistent + + if (isStaticQueueMapping(mapping)) { + //Try getting queue by its full path name, if it exists it is a static + //leaf queue indeed, without any auto creation magic + CSQueue queue = queueManager.getQueue(mapping.getFullPath()); + if (ifQueueDoesNotExist(queue)) { + //We might not be able to find the queue, because the reference was + // ambiguous this should only happen if the queue was referenced by + // leaf name only + if (queueManager.isAmbiguous(mapping.getFullPath())) { + throw new IOException( + "mapping contains ambiguous leaf queue reference " + mapping + .getFullPath()); + } + + //if leaf queue does not exist, + // this could be a potential auto created leaf queue + //validate if parent queue is specified, + // then it should exist and + // be an instance of AutoCreateEnabledParentQueue + QueueMapping newMapping = validateAndGetAutoCreatedQueueMapping( + queueManager, mapping); + if (newMapping == null) { + throw new IOException( + "mapping contains invalid or non-leaf queue " + mapping + .getQueue()); + } + newMappings.add(newMapping); + } else { + // if queue exists, validate + // if its an instance of leaf queue + // if its an instance of auto created leaf queue, + // then extract parent queue name and update queue mapping + QueueMapping newMapping = validateAndGetQueueMapping(queueManager, + queue, mapping); + newMappings.add(newMapping); + } + } else{ + //If it is a dynamic queue mapping, + // we can safely assume leaf queue name does not have '.' in it + // validate + // if parent queue is specified, then + // parent queue exists and an instance of AutoCreateEnabledParentQueue + // + QueueMapping newMapping = validateAndGetAutoCreatedQueueMapping( + queueManager, mapping); + if (newMapping != null) { + newMappings.add(newMapping); + } else{ + newMappings.add(mapping); + } + } + } + + // initialize groups if mappings are present + if (newMappings.size() > 0) { + this.mappings = newMappings; + this.groups = Groups.getUserToGroupsMappingService( + ((CapacityScheduler)scheduler).getConf()); + this.overrideWithQueueMappings = overrideWithQueueMappings; + return true; + } + return false; + } + + private static QueueMapping validateAndGetQueueMapping( + CapacitySchedulerQueueManager queueManager, CSQueue queue, + QueueMapping mapping) throws IOException { + if (!(queue instanceof LeafQueue)) { + throw new IOException( + "mapping contains invalid or non-leaf queue : " + + mapping.getFullPath()); + } + + if (queue instanceof AutoCreatedLeafQueue && queue + .getParent() instanceof ManagedParentQueue) { + + QueueMapping newMapping = validateAndGetAutoCreatedQueueMapping( + queueManager, mapping); + if (newMapping == null) { + throw new IOException( + "mapping contains invalid or non-leaf queue " + + mapping.getFullPath()); + } + return newMapping; + } + return mapping; + } + + private static boolean ifQueueDoesNotExist(CSQueue queue) { + return queue == null; + } + + private static QueueMapping validateAndGetAutoCreatedQueueMapping( + CapacitySchedulerQueueManager queueManager, QueueMapping mapping) + throws IOException { + if (mapping.hasParentQueue() + && (mapping.getParentQueue().equals(PRIMARY_GROUP_MAPPING) + || mapping.getParentQueue().equals(SECONDARY_GROUP_MAPPING))) { + // dynamic parent queue + return mapping; + } else if (mapping.hasParentQueue()) { + //if parent queue is specified, + // then it should exist and be an instance of ManagedParentQueue + QueuePlacementRuleUtils.validateQueueMappingUnderParentQueue( + queueManager.getQueue(mapping.getParentQueue()), + mapping.getParentQueue(), mapping.getQueue()); + return mapping; + } + + return null; + } + + private static boolean isStaticQueueMapping(QueueMapping mapping) { + return !mapping.getQueue() + .contains(UserGroupMappingPlacementRule.CURRENT_USER_MAPPING) + && !mapping.getQueue() + .contains(UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING) + && !mapping.getQueue() + .contains(UserGroupMappingPlacementRule.SECONDARY_GROUP_MAPPING); + } + + private void validateQueueMapping(QueueMapping queueMapping) + throws IOException { + String parentQueueName = queueMapping.getParentQueue(); + String leafQueueFullName = queueMapping.getFullPath(); + CSQueue parentQueue = queueManager.getQueueByFullName(parentQueueName); + CSQueue leafQueue = queueManager.getQueue(leafQueueFullName); + + if (leafQueue == null || (!(leafQueue instanceof LeafQueue))) { + //this might be confusing, but a mapping is not guaranteed to provide the + //parent queue's name, which can result in ambiguous queue references + //if no parent queueName is provided mapping.getFullPath() is the same + //as mapping.getQueue() + if (leafQueue == null && queueManager.isAmbiguous(leafQueueFullName)) { + throw new IOException("mapping contains ambiguous leaf queue name: " + + leafQueueFullName); + } else if (parentQueue == null || + (!(parentQueue instanceof ManagedParentQueue))) { + throw new IOException("mapping contains invalid or non-leaf queue " + + " and no managed parent is found: " + + leafQueueFullName); + } + } else if (parentQueue == null || (!(parentQueue instanceof ParentQueue))) { + throw new IOException( + "mapping contains invalid parent queue [" + parentQueueName + "]"); + } else if (!parentQueue.getQueuePath() + .equals(leafQueue.getParent().getQueuePath())) { + throw new IOException("mapping contains invalid parent queue " + + "which does not match existing leaf queue's parent : [" + + parentQueue.getQueuePath() + "] does not match [ " + + leafQueue.getParent().getQueuePath() + "]"); + } + } + + @VisibleForTesting + public List getQueueMappings() { + return mappings; + } + + @VisibleForTesting + @Private + public void setQueueManager(CapacitySchedulerQueueManager queueManager) { + this.queueManager = queueManager; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index e25301bb17..259cd5c3ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -35,10 +35,6 @@ import java.util.concurrent.atomic.AtomicBoolean; import org.apache.commons.lang3.StringUtils; -import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; -import org.apache.hadoop.yarn.server.resourcemanager.placement.CSMappingPlacementRule; -import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory; -import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.Marker; @@ -75,6 +71,11 @@ import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.placement.AppNameMappingPlacementRule; +import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; +import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory; +import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule; +import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState; import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData; @@ -678,14 +679,24 @@ public int getPendingBacklogs() { } } - @VisibleForTesting - public PlacementRule getCSMappingPlacementRule() throws IOException { + public PlacementRule getUserGroupMappingPlacementRule() throws IOException { readLock.lock(); try { - CSMappingPlacementRule mappingRule = new CSMappingPlacementRule(); - mappingRule.initialize(this); - return mappingRule; + UserGroupMappingPlacementRule ugRule = new UserGroupMappingPlacementRule(); + ugRule.initialize(this); + return ugRule; + } finally { + readLock.unlock(); + } + } + + public PlacementRule getAppNameMappingPlacementRule() throws IOException { + readLock.lock(); + try { + AppNameMappingPlacementRule anRule = new AppNameMappingPlacementRule(); + anRule.initialize(this); + return anRule; } finally { readLock.unlock(); } @@ -707,18 +718,19 @@ public void updatePlacementRules() throws IOException { } placementRuleStrs = new ArrayList<>(distinguishRuleSet); - boolean csMappingAdded = false; for (String placementRuleStr : placementRuleStrs) { switch (placementRuleStr) { case YarnConfiguration.USER_GROUP_PLACEMENT_RULE: + PlacementRule ugRule = getUserGroupMappingPlacementRule(); + if (null != ugRule) { + placementRules.add(ugRule); + } + break; case YarnConfiguration.APP_NAME_PLACEMENT_RULE: - if (!csMappingAdded) { - PlacementRule csMappingRule = getCSMappingPlacementRule(); - if (null != csMappingRule) { - placementRules.add(csMappingRule); - csMappingAdded = true; - } + PlacementRule anRule = getAppNameMappingPlacementRule(); + if (null != anRule) { + placementRules.add(anRule); } break; default: diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index aa78c21d86..0b74e506c0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -1229,24 +1229,6 @@ public void setQueueMappings(List queueMappings) { setStrings(QUEUE_MAPPING, StringUtils.join(",", queueMappingStrs)); } - - @Private - @VisibleForTesting - public void setAppNameMappings(List queueMappings) { - if (queueMappings == null) { - return; - } - - List queueMappingStrs = new ArrayList<>(); - for (QueueMapping mapping : queueMappings) { - String rule = mapping.toString(); - String[] parts = rule.split(":"); - queueMappingStrs.add(parts[1] + ":" + parts[2]); - } - - setStrings(QUEUE_MAPPING_NAME, StringUtils.join(",", queueMappingStrs)); - } - @Private @VisibleForTesting void setWorkflowPriorityMappings( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index 8e53b1a209..e8b4105e9b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -1053,9 +1053,7 @@ public ApplicationPlacementContext answer(InvocationOnMock invocation) } }).when(placementMgr).placeApplication( - any(ApplicationSubmissionContext.class), - any(String.class), - any(Boolean.class)); + any(ApplicationSubmissionContext.class), any(String.class)); rmContext.setQueuePlacementManager(placementMgr); asContext.setQueue("oldQueue"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManagerWithFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManagerWithFairScheduler.java index b3ed5ef58c..37ff02d503 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManagerWithFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManagerWithFairScheduler.java @@ -127,7 +127,7 @@ public void testQueueSubmitWithHighQueueContainerSize() ApplicationSubmissionContext asContext = createAppSubmitCtx(appId, res); // Submit to limited queue - when(placementMgr.placeApplication(any(), any(), any(Boolean.class))) + when(placementMgr.placeApplication(any(), any())) .thenReturn(new ApplicationPlacementContext("limited")); try { rmAppManager.submitApplication(asContext, "test"); @@ -138,7 +138,7 @@ public void testQueueSubmitWithHighQueueContainerSize() } // submit same app but now place it in the unlimited queue - when(placementMgr.placeApplication(any(), any(), any(Boolean.class))) + when(placementMgr.placeApplication(any(), any())) .thenReturn(new ApplicationPlacementContext("root.unlimited")); rmAppManager.submitApplication(asContext, "test"); } @@ -172,7 +172,7 @@ public void testQueueSubmitWithPermissionLimits() ApplicationSubmissionContext asContext = createAppSubmitCtx(appId, res); // Submit to no access queue - when(placementMgr.placeApplication(any(), any(), any(Boolean.class))) + when(placementMgr.placeApplication(any(), any())) .thenReturn(new ApplicationPlacementContext("noaccess")); try { rmAppManager.submitApplication(asContext, "test"); @@ -182,13 +182,13 @@ public void testQueueSubmitWithPermissionLimits() e.getCause() instanceof AccessControlException); } // Submit to submit access queue - when(placementMgr.placeApplication(any(), any(), any(Boolean.class))) + when(placementMgr.placeApplication(any(), any())) .thenReturn(new ApplicationPlacementContext("submitonly")); rmAppManager.submitApplication(asContext, "test"); // Submit second app to admin access queue appId = MockApps.newAppID(2); asContext = createAppSubmitCtx(appId, res); - when(placementMgr.placeApplication(any(), any(), any(Boolean.class))) + when(placementMgr.placeApplication(any(), any())) .thenReturn(new ApplicationPlacementContext("adminonly")); rmAppManager.submitApplication(asContext, "test"); } @@ -245,7 +245,7 @@ public void testQueueSubmitWithAutoCreateQueue() ApplicationSubmissionContext asContext = createAppSubmitCtx(appId, res); // Submit to noaccess parent with non existent child queue - when(placementMgr.placeApplication(any(), any(), any(Boolean.class))) + when(placementMgr.placeApplication(any(), any())) .thenReturn(new ApplicationPlacementContext("root.noaccess.child")); try { rmAppManager.submitApplication(asContext, "test"); @@ -255,7 +255,7 @@ public void testQueueSubmitWithAutoCreateQueue() e.getCause() instanceof AccessControlException); } // Submit to submitonly parent with non existent child queue - when(placementMgr.placeApplication(any(), any(), any(Boolean.class))) + when(placementMgr.placeApplication(any(), any())) .thenReturn(new ApplicationPlacementContext("root.submitonly.child")); rmAppManager.submitApplication(asContext, "test"); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestPlacementManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestPlacementManager.java index 720024779d..22a9125576 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestPlacementManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestPlacementManager.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.placement; -import org.apache.hadoop.thirdparty.com.google.common.collect.Lists; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; @@ -34,6 +33,7 @@ import org.junit.Test; import java.util.ArrayList; +import java.util.Arrays; import java.util.List; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT; @@ -83,12 +83,9 @@ public void testPlaceApplicationWithPlacementRuleChain() throws Exception { USER1)) .build(); - cs.getConfiguration().setQueueMappings( - Lists.newArrayList(userQueueMapping)); - CSMappingPlacementRule ugRule = new CSMappingPlacementRule(); - ugRule.initialize(cs); + UserGroupMappingPlacementRule ugRule = new UserGroupMappingPlacementRule( + false, Arrays.asList(userQueueMapping), null); queuePlacementRules.add(ugRule); - pm.updateRules(queuePlacementRules); ApplicationSubmissionContext asc = Records.newRecord( @@ -105,14 +102,17 @@ public void testPlaceApplicationWithPlacementRuleChain() throws Exception { .parentQueue(PARENT_QUEUE) .build(); - cs.getConfiguration().setAppNameMappings( - Lists.newArrayList(queueMappingEntity)); - CSMappingPlacementRule anRule = new CSMappingPlacementRule(); - anRule.initialize(cs); + AppNameMappingPlacementRule anRule = new AppNameMappingPlacementRule(false, + Arrays.asList(queueMappingEntity)); queuePlacementRules.add(anRule); pm.updateRules(queuePlacementRules); - ApplicationPlacementContext pc = pm.placeApplication(asc, USER2); - Assert.assertNotNull(pc); + try { + ApplicationPlacementContext pc = pm.placeApplication(asc, USER2); + Assert.assertNotNull(pc); + } catch (Exception e) { + e.printStackTrace(); + Assert.fail("Exception not expected"); + } } @Test @@ -121,9 +121,10 @@ public void testPlacementRuleUpdationOrder() throws Exception { QueueMapping userQueueMapping = QueueMappingBuilder.create() .type(MappingType.USER).source(USER1) .queue(getQueueMapping(PARENT_QUEUE, USER1)).build(); + UserGroupMappingPlacementRule ugRule = new UserGroupMappingPlacementRule( + false, Arrays.asList(userQueueMapping), null); - CSMappingPlacementRule ugRule = new CSMappingPlacementRule(); - + // Configure placement rule conf.set(YarnConfiguration.QUEUE_PLACEMENT_RULES, ugRule.getName()); queueMappings.add(userQueueMapping); conf.setQueueMappings(queueMappings); @@ -134,7 +135,7 @@ public void testPlacementRuleUpdationOrder() throws Exception { PlacementManager pm = cs.getRMContext().getQueuePlacementManager(); // As we are setting placement rule, It shouldn't update default - // placement rule ie user-group. Number of placement rules should be 1. + // placement rule ie user-group. Number of placemnt rules should be 1. Assert.assertEquals(1, pm.getPlacementRules().size()); // Verifying if placement rule set is same as the one we configured Assert.assertEquals(ugRule.getName(), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java index d93496ba01..79f1d406e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.placement; +import static org.junit.Assert.fail; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; import static org.mockito.Mockito.isNull; @@ -156,7 +157,7 @@ public void testSecondaryGroupMapping() throws IOException, YarnException { .build()); } - @Test + @Test(expected = YarnException.class) public void testNullGroupMapping() throws IOException, YarnException { conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING, NullGroupsMapping.class, GroupMappingServiceProvider.class); @@ -170,6 +171,7 @@ public void testNullGroupMapping() throws IOException, YarnException { .inputUser("a") .expectedQueue("default") .build()); + fail("No Groups for user 'a'"); } @Test diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceWithAutoQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceWithAutoQueue.java index 683e9fcf38..84d3756377 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceWithAutoQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceWithAutoQueue.java @@ -159,7 +159,6 @@ public void testAutoCreateLeafQueueCreation() throws Exception { csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, ResourceScheduler.class); - csConf.setOverrideWithQueueMappings(true); mockRM = new MockRM(csConf); cs = (CapacityScheduler) mockRM.getResourceScheduler(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java index 4757cd79a0..a414ab44dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoCreatedQueueBase.java @@ -114,9 +114,9 @@ public class TestCapacitySchedulerAutoCreatedQueueBase { public static final String C = CapacitySchedulerConfiguration.ROOT + ".c"; public static final String D = CapacitySchedulerConfiguration.ROOT + ".d"; public static final String E = CapacitySchedulerConfiguration.ROOT + ".e"; - public static final String ESUBGROUP1 = + public static final String ASUBGROUP1 = CapacitySchedulerConfiguration.ROOT + ".esubgroup1"; - public static final String FGROUP = + public static final String AGROUP = CapacitySchedulerConfiguration.ROOT + ".fgroup"; public static final String A1 = A + ".a1"; public static final String A2 = A + ".a2"; @@ -124,14 +124,14 @@ public class TestCapacitySchedulerAutoCreatedQueueBase { public static final String B2 = B + ".b2"; public static final String B3 = B + ".b3"; public static final String B4 = B + ".b4subgroup1"; - public static final String ESUBGROUP1_A = ESUBGROUP1 + ".e"; - public static final String FGROUP_F = FGROUP + ".f"; + public static final String ASUBGROUP1_A = ASUBGROUP1 + ".e"; + public static final String AGROUP_A = AGROUP + ".f"; public static final float A_CAPACITY = 20f; public static final float B_CAPACITY = 20f; public static final float C_CAPACITY = 20f; public static final float D_CAPACITY = 20f; - public static final float ESUBGROUP1_CAPACITY = 10f; - public static final float FGROUP_CAPACITY = 10f; + public static final float ASUBGROUP1_CAPACITY = 10f; + public static final float AGROUP_CAPACITY = 10f; public static final float A1_CAPACITY = 30; public static final float A2_CAPACITY = 70; @@ -371,8 +371,8 @@ public static CapacitySchedulerConfiguration setupQueueConfiguration( conf.setCapacity(B, B_CAPACITY); conf.setCapacity(C, C_CAPACITY); conf.setCapacity(D, D_CAPACITY); - conf.setCapacity(ESUBGROUP1, ESUBGROUP1_CAPACITY); - conf.setCapacity(FGROUP, FGROUP_CAPACITY); + conf.setCapacity(ASUBGROUP1, ASUBGROUP1_CAPACITY); + conf.setCapacity(AGROUP, AGROUP_CAPACITY); // Define 2nd-level queues conf.setQueues(A, new String[] { "a1", "a2" }); @@ -391,12 +391,12 @@ public static CapacitySchedulerConfiguration setupQueueConfiguration( conf.setCapacity(B4, B4_CAPACITY); conf.setUserLimitFactor(B4, 100.0f); - conf.setQueues(ESUBGROUP1, new String[] {"e"}); - conf.setCapacity(ESUBGROUP1_A, 100f); - conf.setUserLimitFactor(ESUBGROUP1_A, 100.0f); - conf.setQueues(FGROUP, new String[] {"f"}); - conf.setCapacity(FGROUP_F, 100f); - conf.setUserLimitFactor(FGROUP_F, 100.0f); + conf.setQueues(ASUBGROUP1, new String[] {"e"}); + conf.setCapacity(ASUBGROUP1_A, 100f); + conf.setUserLimitFactor(ASUBGROUP1_A, 100.0f); + conf.setQueues(AGROUP, new String[] {"f"}); + conf.setCapacity(AGROUP_A, 100f); + conf.setUserLimitFactor(AGROUP_A, 100.0f); conf.setUserLimitFactor(C, 1.0f); conf.setAutoCreateChildQueueEnabled(C, true); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java index 084a177048..596cca1402 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java @@ -86,6 +86,7 @@ import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager .NO_LABEL; +import static org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.CURRENT_USER_MAPPING; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON; import static org.junit.Assert.assertEquals; @@ -105,8 +106,6 @@ public class TestCapacitySchedulerAutoQueueCreation private static final Logger LOG = LoggerFactory.getLogger( TestCapacitySchedulerAutoQueueCreation.class); - private static final String CURRENT_USER_MAPPING = "%user"; - private static final Resource TEMPLATE_MAX_RES = Resource.newInstance(16 * GB, 48); @@ -425,16 +424,16 @@ public void testQueueMappingValidationFailsWithInvalidParentQueueInMapping() //dynamic queue mapping try { - setupQueueMapping(newCS, CURRENT_USER_MAPPING, "a1", + setupQueueMapping(newCS, CURRENT_USER_MAPPING, "a", CURRENT_USER_MAPPING); newCS.updatePlacementRules(); fail("Expected invalid parent queue mapping failure"); } catch (IOException e) { //expected exception - assertTrue(e.getMessage().contains( - "Target queue path 'a1.%user' has a non-managed parent queue")); + "invalid parent queue which does not have auto creation of leaf " + + "queues enabled [" + "a" + "]")); } //"a" is not auto create enabled and app_user does not exist as a leaf @@ -447,8 +446,8 @@ public void testQueueMappingValidationFailsWithInvalidParentQueueInMapping() fail("Expected invalid parent queue mapping failure"); } catch (IOException e) { //expected exception - assertTrue(e.getMessage().contains( - "contains an invalid parent queue 'INVALID_PARENT_QUEUE'")); + assertTrue(e.getMessage() + .contains("invalid parent queue [" + "INVALID_PARENT_QUEUE" + "]")); } } finally { if (newMockRM != null) { @@ -478,7 +477,7 @@ public void testQueueMappingUpdatesFailsOnRemovalOfParentQueueInMapping() fail("Expected invalid parent queue mapping failure"); } catch (IOException e) { //expected exception - assertTrue(e.getMessage().contains("invalid parent queue")); + assertTrue(e.getMessage().contains("invalid parent queue []")); } } finally { if (newMockRM != null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueueMappingFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueueMappingFactory.java index 6a478a06e4..5beda25225 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueueMappingFactory.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueueMappingFactory.java @@ -25,11 +25,11 @@ import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; -import org.apache.hadoop.yarn.server.resourcemanager.placement.CSMappingPlacementRule; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule; import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping; import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping.MappingType; import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping.QueueMappingBuilder; +import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SimpleGroupsMapping; import org.apache.hadoop.yarn.util.Records; @@ -46,8 +46,10 @@ public class TestCapacitySchedulerQueueMappingFactory { private static final String QUEUE_MAPPING_NAME = "app-name"; - private static final String QUEUE_MAPPING_RULE = - CSMappingPlacementRule.class.getCanonicalName(); + private static final String QUEUE_MAPPING_RULE_APP_NAME = + "org.apache.hadoop.yarn.server.resourcemanager.placement.AppNameMappingPlacementRule"; + private static final String QUEUE_MAPPING_RULE_USER_GROUP = + "org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule"; public static final String USER = "user_"; public static final String PARENT_QUEUE = "c"; @@ -57,7 +59,8 @@ public static CapacitySchedulerConfiguration setupQueueMappingsForRules( List queuePlacementRules = new ArrayList<>(); - queuePlacementRules.add(QUEUE_MAPPING_RULE); + queuePlacementRules.add(QUEUE_MAPPING_RULE_USER_GROUP); + queuePlacementRules.add(QUEUE_MAPPING_RULE_APP_NAME); conf.setQueuePlacementRules(queuePlacementRules); @@ -131,7 +134,8 @@ public void testUpdatePlacementRulesFactory() throws Exception { } // verify both placement rules were added successfully - assertThat(placementRuleNames, hasItems(QUEUE_MAPPING_RULE)); + assertThat(placementRuleNames, hasItems(QUEUE_MAPPING_RULE_USER_GROUP)); + assertThat(placementRuleNames, hasItems(QUEUE_MAPPING_RULE_APP_NAME)); } finally { if(mockRM != null) { mockRM.close(); @@ -150,7 +154,7 @@ public void testNestedUserQueueWithStaticParentQueue() throws Exception { SimpleGroupsMapping.class, GroupMappingServiceProvider.class); List queuePlacementRules = new ArrayList<>(); - queuePlacementRules.add(QUEUE_MAPPING_RULE); + queuePlacementRules.add(QUEUE_MAPPING_RULE_USER_GROUP); conf.setQueuePlacementRules(queuePlacementRules); List existingMappingsForUG = conf.getQueueMappings(); @@ -196,8 +200,8 @@ public void testNestedUserQueueWithStaticParentQueue() throws Exception { List rules = cs.getRMContext().getQueuePlacementManager().getPlacementRules(); - CSMappingPlacementRule r = - (CSMappingPlacementRule) rules.get(0); + UserGroupMappingPlacementRule r = + (UserGroupMappingPlacementRule) rules.get(0); ApplicationPlacementContext ctx = r.getPlacementForApp(asc, "user1"); assertEquals("Queue", "b1", ctx.getQueue()); @@ -323,7 +327,7 @@ private void testNestedUserQueueWithDynamicParentQueue( SimpleGroupsMapping.class, GroupMappingServiceProvider.class); List queuePlacementRules = new ArrayList<>(); - queuePlacementRules.add(QUEUE_MAPPING_RULE); + queuePlacementRules.add(QUEUE_MAPPING_RULE_USER_GROUP); conf.setQueuePlacementRules(queuePlacementRules); List existingMappingsForUG = conf.getQueueMappings(); @@ -349,8 +353,8 @@ private void testNestedUserQueueWithDynamicParentQueue( List rules = cs.getRMContext().getQueuePlacementManager().getPlacementRules(); - CSMappingPlacementRule r = - (CSMappingPlacementRule) rules.get(0); + UserGroupMappingPlacementRule r = + (UserGroupMappingPlacementRule) rules.get(0); ApplicationPlacementContext ctx = r.getPlacementForApp(asc, user); assertEquals("Queue", user, ctx.getQueue()); @@ -378,7 +382,7 @@ public void testDynamicPrimaryGroupQueue() throws Exception { SimpleGroupsMapping.class, GroupMappingServiceProvider.class); List queuePlacementRules = new ArrayList<>(); - queuePlacementRules.add(QUEUE_MAPPING_RULE); + queuePlacementRules.add(QUEUE_MAPPING_RULE_USER_GROUP); conf.setQueuePlacementRules(queuePlacementRules); List existingMappingsForUG = conf.getQueueMappings(); @@ -422,8 +426,8 @@ public void testDynamicPrimaryGroupQueue() throws Exception { List rules = cs.getRMContext().getQueuePlacementManager().getPlacementRules(); - CSMappingPlacementRule r = - (CSMappingPlacementRule) rules.get(0); + UserGroupMappingPlacementRule r = + (UserGroupMappingPlacementRule) rules.get(0); ApplicationPlacementContext ctx = r.getPlacementForApp(asc, "user1"); assertEquals("Queue", "b1", ctx.getQueue()); @@ -447,7 +451,7 @@ public void testFixedUserWithDynamicGroupQueue() throws Exception { SimpleGroupsMapping.class, GroupMappingServiceProvider.class); List queuePlacementRules = new ArrayList<>(); - queuePlacementRules.add(QUEUE_MAPPING_RULE); + queuePlacementRules.add(QUEUE_MAPPING_RULE_USER_GROUP); conf.setQueuePlacementRules(queuePlacementRules); List existingMappingsForUG = conf.getQueueMappings(); @@ -469,11 +473,11 @@ public void testFixedUserWithDynamicGroupQueue() throws Exception { .queue("%primary_group") .build(); - // u:b4:c.%secondary_group + // u:b4:%secondary_group QueueMapping userQueueMapping3 = QueueMappingBuilder.create() .type(QueueMapping.MappingType.USER) .source("e") - .queue("c.%secondary_group") + .queue("%secondary_group") .build(); queueMappingsForUG.add(userQueueMapping1); @@ -499,8 +503,8 @@ public void testFixedUserWithDynamicGroupQueue() throws Exception { List rules = cs.getRMContext().getQueuePlacementManager().getPlacementRules(); - CSMappingPlacementRule r = - (CSMappingPlacementRule) rules.get(0); + UserGroupMappingPlacementRule r = + (UserGroupMappingPlacementRule) rules.get(0); ApplicationPlacementContext ctx = r.getPlacementForApp(asc, "user1"); assertEquals("Queue", "b1", ctx.getQueue()); @@ -510,7 +514,6 @@ public void testFixedUserWithDynamicGroupQueue() throws Exception { ApplicationPlacementContext ctx2 = r.getPlacementForApp(asc, "e"); assertEquals("Queue", "esubgroup1", ctx2.getQueue()); - assertEquals("Queue", "root.c", ctx2.getParentQueue()); } finally { if (mockRM != null) { mockRM.close(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java index dcd0fe0bc3..039b9da8aa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestQueueMappings.java @@ -18,14 +18,15 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.io.IOException; -import java.util.List; -import org.apache.hadoop.yarn.server.resourcemanager.placement.MappingRule; -import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping; +import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping.MappingType; +import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping.QueueMappingBuilder; +import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule; import org.junit.Assert; import org.junit.Before; import org.junit.Test; @@ -91,13 +92,12 @@ public void testQueueMappingTrimSpaces() throws IOException { // space trimming conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, " u : a : " + Q1); cs.reinitialize(conf, null); - - List rules = cs.getConfiguration().getMappingRules(); - - String ruleStr = rules.get(0).toString(); - assert(ruleStr.contains("variable='%user'")); - assert(ruleStr.contains("value='a'")); - assert(ruleStr.contains("queueName='q1'")); + checkQMapping( + QueueMappingBuilder.create() + .type(MappingType.USER) + .source("a") + .queue(Q1) + .build()); } @Test @@ -155,4 +155,13 @@ private void checkInvalidQMapping(YarnConfiguration conf, Assert.assertTrue("invalid mapping did not throw exception for " + reason, fail); } + + private void checkQMapping(QueueMapping expected) + throws IOException { + UserGroupMappingPlacementRule rule = + (UserGroupMappingPlacementRule) cs.getRMContext() + .getQueuePlacementManager().getPlacementRules().get(0); + QueueMapping queueMapping = rule.getQueueMappings().get(0); + Assert.assertEquals(queueMapping, expected); + } }