YARN-10425. Replace the legacy placement engine in CS with the new one. Contributed by Gergely Pollak.
This commit is contained in:
parent
cd0490e8c6
commit
567600fd80
@ -864,9 +864,9 @@ ApplicationPlacementContext placeApplication(
|
||||
if (placementManager != null) {
|
||||
try {
|
||||
String usernameUsedForPlacement =
|
||||
getUserNameForPlacement(user, context, placementManager);
|
||||
getUserNameForPlacement(user, context, placementManager);
|
||||
placementContext = placementManager
|
||||
.placeApplication(context, usernameUsedForPlacement);
|
||||
.placeApplication(context, usernameUsedForPlacement, isRecovery);
|
||||
} catch (YarnException e) {
|
||||
// Placement could also fail if the user doesn't exist in system
|
||||
// skip if the user is not found during recovery.
|
||||
|
@ -1,204 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.placement;
|
||||
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.placement.QueuePlacementRuleUtils.getPlacementContext;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.placement.QueuePlacementRuleUtils.isStaticQueueMapping;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.placement.QueuePlacementRuleUtils.validateAndGetAutoCreatedQueueMapping;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.placement.QueuePlacementRuleUtils.validateAndGetQueueMapping;
|
||||
|
||||
public class AppNameMappingPlacementRule extends PlacementRule {
|
||||
private static final Logger LOG = LoggerFactory
|
||||
.getLogger(AppNameMappingPlacementRule.class);
|
||||
|
||||
public static final String CURRENT_APP_MAPPING = "%application";
|
||||
|
||||
private static final String QUEUE_MAPPING_NAME = "app-name";
|
||||
|
||||
private boolean overrideWithQueueMappings = false;
|
||||
private List<QueueMapping> mappings = null;
|
||||
protected CapacitySchedulerQueueManager queueManager;
|
||||
|
||||
public AppNameMappingPlacementRule() {
|
||||
this(false, null);
|
||||
}
|
||||
|
||||
public AppNameMappingPlacementRule(boolean overrideWithQueueMappings,
|
||||
List<QueueMapping> newMappings) {
|
||||
this.overrideWithQueueMappings = overrideWithQueueMappings;
|
||||
this.mappings = newMappings;
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean initialize(ResourceScheduler scheduler)
|
||||
throws IOException {
|
||||
if (!(scheduler instanceof CapacityScheduler)) {
|
||||
throw new IOException(
|
||||
"AppNameMappingPlacementRule can be configured only for "
|
||||
+ "CapacityScheduler");
|
||||
}
|
||||
CapacitySchedulerContext schedulerContext =
|
||||
(CapacitySchedulerContext) scheduler;
|
||||
CapacitySchedulerConfiguration conf = schedulerContext.getConfiguration();
|
||||
boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
|
||||
LOG.info(
|
||||
"Initialized App Name queue mappings, override: " + overrideWithQueueMappings);
|
||||
|
||||
List<QueueMapping> queueMappings =
|
||||
conf.getQueueMappingEntity(QUEUE_MAPPING_NAME);
|
||||
|
||||
// Get new user mappings
|
||||
List<QueueMapping> newMappings = new ArrayList<>();
|
||||
|
||||
queueManager = schedulerContext.getCapacitySchedulerQueueManager();
|
||||
|
||||
// check if mappings refer to valid queues
|
||||
for (QueueMapping mapping : queueMappings) {
|
||||
if (isStaticQueueMapping(mapping)) {
|
||||
//at this point mapping.getQueueName() return only the queue name, since
|
||||
//the config parsing have been changed making QueueMapping more
|
||||
//consistent
|
||||
|
||||
CSQueue queue = queueManager.getQueue(mapping.getFullPath());
|
||||
if (ifQueueDoesNotExist(queue)) {
|
||||
//Try getting queue by its full path name, if it exists it is a static
|
||||
//leaf queue indeed, without any auto creation magic
|
||||
|
||||
if (queueManager.isAmbiguous(mapping.getFullPath())) {
|
||||
throw new IOException(
|
||||
"mapping contains ambiguous leaf queue reference " + mapping
|
||||
.getFullPath());
|
||||
}
|
||||
|
||||
//if leaf queue does not exist,
|
||||
// this could be a potential auto created leaf queue
|
||||
//validate if parent queue is specified,
|
||||
// then it should exist and
|
||||
// be an instance of AutoCreateEnabledParentQueue
|
||||
QueueMapping newMapping =
|
||||
validateAndGetAutoCreatedQueueMapping(queueManager, mapping);
|
||||
if (newMapping == null) {
|
||||
throw new IOException(
|
||||
"mapping contains invalid or non-leaf queue " + mapping
|
||||
.getQueue());
|
||||
}
|
||||
newMappings.add(newMapping);
|
||||
} else {
|
||||
// if queue exists, validate
|
||||
// if its an instance of leaf queue
|
||||
// if its an instance of auto created leaf queue,
|
||||
// then extract parent queue name and update queue mapping
|
||||
QueueMapping newMapping = validateAndGetQueueMapping(
|
||||
queueManager, queue, mapping);
|
||||
newMappings.add(newMapping);
|
||||
}
|
||||
} else {
|
||||
//If it is a dynamic queue mapping,
|
||||
// we can safely assume leaf queue name does not have '.' in it
|
||||
// validate
|
||||
// if parent queue is specified, then
|
||||
// parent queue exists and an instance of AutoCreateEnabledParentQueue
|
||||
//
|
||||
QueueMapping newMapping = validateAndGetAutoCreatedQueueMapping(
|
||||
queueManager, mapping);
|
||||
if (newMapping != null) {
|
||||
newMappings.add(newMapping);
|
||||
} else{
|
||||
newMappings.add(mapping);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
if (newMappings.size() > 0) {
|
||||
this.mappings = newMappings;
|
||||
this.overrideWithQueueMappings = overrideWithQueueMappings;
|
||||
LOG.info("get valid queue mapping from app name config: " +
|
||||
newMappings.toString() + ", override: " + overrideWithQueueMappings);
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private static boolean ifQueueDoesNotExist(CSQueue queue) {
|
||||
return queue == null;
|
||||
}
|
||||
|
||||
private ApplicationPlacementContext getAppPlacementContext(String user,
|
||||
String applicationName) throws IOException {
|
||||
for (QueueMapping mapping : mappings) {
|
||||
if (mapping.getSource().equals(CURRENT_APP_MAPPING)) {
|
||||
if (mapping.getQueue().equals(CURRENT_APP_MAPPING)) {
|
||||
return getPlacementContext(mapping, applicationName, queueManager);
|
||||
} else {
|
||||
return getPlacementContext(mapping, queueManager);
|
||||
}
|
||||
}
|
||||
if (mapping.getSource().equals(applicationName)) {
|
||||
return getPlacementContext(mapping, queueManager);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApplicationPlacementContext getPlacementForApp(
|
||||
ApplicationSubmissionContext asc, String user) throws YarnException {
|
||||
String queueName = asc.getQueue();
|
||||
String applicationName = asc.getApplicationName();
|
||||
if (mappings != null && mappings.size() > 0) {
|
||||
try {
|
||||
ApplicationPlacementContext mappedQueue = getAppPlacementContext(user,
|
||||
applicationName);
|
||||
if (mappedQueue != null) {
|
||||
// We have a mapping, should we use it?
|
||||
if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)
|
||||
//queueName will be same as mapped queue name in case of recovery
|
||||
|| queueName.equals(mappedQueue.getQueue())
|
||||
|| overrideWithQueueMappings) {
|
||||
LOG.info("Application {} mapping [{}] to [{}] override {}",
|
||||
applicationName, queueName, mappedQueue.getQueue(),
|
||||
overrideWithQueueMappings);
|
||||
return mappedQueue;
|
||||
}
|
||||
}
|
||||
} catch (IOException ioex) {
|
||||
String message = "Failed to submit application " + applicationName +
|
||||
" reason: " + ioex.getMessage();
|
||||
throw new YarnException(message);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
@ -125,7 +125,11 @@ public boolean initialize(ResourceScheduler scheduler) throws IOException {
|
||||
overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
|
||||
|
||||
if (groups == null) {
|
||||
groups = Groups.getUserToGroupsMappingService(conf);
|
||||
//We cannot use Groups#getUserToGroupsMappingService here, because when
|
||||
//tests change the HADOOP_SECURITY_GROUP_MAPPING, Groups won't refresh its
|
||||
//cached instance of groups, so we might get a Group instance which
|
||||
//ignores the HADOOP_SECURITY_GROUP_MAPPING settings.
|
||||
groups = new Groups(conf);
|
||||
}
|
||||
|
||||
MappingRuleValidationContext validationContext = buildValidationContext();
|
||||
@ -145,8 +149,8 @@ public boolean initialize(ResourceScheduler scheduler) throws IOException {
|
||||
}
|
||||
|
||||
LOG.info("Initialized queue mappings, can override user specified " +
|
||||
"queues: {} number of rules: {}", overrideWithQueueMappings,
|
||||
mappingRules.size());
|
||||
"queues: {} number of rules: {} mapping rules: {}",
|
||||
overrideWithQueueMappings, mappingRules.size(), mappingRules);
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Initialized with the following mapping rules:");
|
||||
@ -170,6 +174,12 @@ public boolean initialize(ResourceScheduler scheduler) throws IOException {
|
||||
*/
|
||||
private void setupGroupsForVariableContext(VariableContext vctx, String user)
|
||||
throws IOException {
|
||||
if (groups == null) {
|
||||
LOG.warn(
|
||||
"Group provider hasn't been set, cannot query groups for user {}",
|
||||
user);
|
||||
return;
|
||||
}
|
||||
Set<String> groupsSet = groups.getGroupsSet(user);
|
||||
String secondaryGroup = null;
|
||||
Iterator<String> it = groupsSet.iterator();
|
||||
@ -193,14 +203,18 @@ private void setupGroupsForVariableContext(VariableContext vctx, String user)
|
||||
}
|
||||
|
||||
private VariableContext createVariableContext(
|
||||
ApplicationSubmissionContext asc, String user) throws IOException {
|
||||
ApplicationSubmissionContext asc, String user) {
|
||||
VariableContext vctx = new VariableContext();
|
||||
|
||||
vctx.put("%user", user);
|
||||
vctx.put("%specified", asc.getQueue());
|
||||
vctx.put("%application", asc.getApplicationName());
|
||||
vctx.put("%default", "root.default");
|
||||
setupGroupsForVariableContext(vctx, user);
|
||||
try {
|
||||
setupGroupsForVariableContext(vctx, user);
|
||||
} catch (IOException e) {
|
||||
LOG.warn("Unable to setup groups: {}", e.getMessage());
|
||||
}
|
||||
|
||||
vctx.setImmutables(immutableVariables);
|
||||
return vctx;
|
||||
@ -338,34 +352,43 @@ private ApplicationPlacementContext createPlacementContext(String queueName) {
|
||||
@Override
|
||||
public ApplicationPlacementContext getPlacementForApp(
|
||||
ApplicationSubmissionContext asc, String user) throws YarnException {
|
||||
return getPlacementForApp(asc, user, false);
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApplicationPlacementContext getPlacementForApp(
|
||||
ApplicationSubmissionContext asc, String user, boolean recovery)
|
||||
throws YarnException {
|
||||
//We only use the mapping rules if overrideWithQueueMappings enabled
|
||||
//or the application is submitted to the default queue, which effectively
|
||||
//means the application doesn't have any specific queue.
|
||||
String appQueue = asc.getQueue();
|
||||
LOG.debug("Looking placement for app '{}' originally submitted to queue " +
|
||||
"'{}', with override enabled '{}'",
|
||||
asc.getApplicationName(), appQueue, overrideWithQueueMappings);
|
||||
if (appQueue != null &&
|
||||
!appQueue.equals(YarnConfiguration.DEFAULT_QUEUE_NAME) &&
|
||||
!appQueue.equals(YarnConfiguration.DEFAULT_QUEUE_FULL_NAME) &&
|
||||
!overrideWithQueueMappings) {
|
||||
!overrideWithQueueMappings &&
|
||||
!recovery) {
|
||||
LOG.info("Have no jurisdiction over application submission '{}', " +
|
||||
"moving to next PlacementRule engine", asc.getApplicationName());
|
||||
return null;
|
||||
}
|
||||
|
||||
VariableContext variables;
|
||||
try {
|
||||
variables = createVariableContext(asc, user);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Unable to setup variable context", e);
|
||||
throw new YarnException(e);
|
||||
}
|
||||
variables = createVariableContext(asc, user);
|
||||
|
||||
ApplicationPlacementContext ret = null;
|
||||
for (MappingRule rule : mappingRules) {
|
||||
MappingRuleResult result = evaluateRule(rule, variables);
|
||||
switch (result.getResult()) {
|
||||
case PLACE_TO_DEFAULT:
|
||||
return placeToDefault(asc, variables, rule);
|
||||
ret = placeToDefault(asc, variables, rule);
|
||||
break;
|
||||
case PLACE:
|
||||
return placeToQueue(asc, rule, result);
|
||||
ret = placeToQueue(asc, rule, result);
|
||||
break;
|
||||
case REJECT:
|
||||
LOG.info("Rejecting application '{}', reason: Mapping rule '{}' " +
|
||||
" fallback action is set to REJECT.",
|
||||
@ -377,17 +400,42 @@ public ApplicationPlacementContext getPlacementForApp(
|
||||
case SKIP:
|
||||
//SKIP means skip to the next rule, which is the default behaviour of
|
||||
//the for loop, so we don't need to take any extra actions
|
||||
break;
|
||||
break;
|
||||
default:
|
||||
LOG.error("Invalid result '{}'", result);
|
||||
}
|
||||
|
||||
//If we already have a return value, we can return it!
|
||||
if (ret != null) {
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
//If no rule was applied we return null, to let the engine move onto the
|
||||
//next placementRule class
|
||||
LOG.info("No matching rule found for application '{}', moving to next " +
|
||||
"PlacementRule engine", asc.getApplicationName());
|
||||
return null;
|
||||
if (ret == null) {
|
||||
//If no rule was applied we return null, to let the engine move onto the
|
||||
//next placementRule class
|
||||
LOG.info("No matching rule found for application '{}', moving to next " +
|
||||
"PlacementRule engine", asc.getApplicationName());
|
||||
}
|
||||
|
||||
if (recovery) {
|
||||
//we need this part for backwards compatibility with recovery
|
||||
//the legacy code checked if the placement matches the queue of the
|
||||
//application to be recovered, and if it did, it created an
|
||||
//ApplicationPlacementContext.
|
||||
//However at a later point this is going to be changed, there are two
|
||||
//major issues with this approach:
|
||||
// 1) The recovery only uses LEAF queue names, which must be updated
|
||||
// 2) The ORIGINAL queue which the application was submitted is NOT
|
||||
// stored this might result in different placement evaluation since
|
||||
// now we can have rules which give different result based on what
|
||||
// the user submitted.
|
||||
if (ret == null || !ret.getQueue().equals(asc.getQueue())) {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
return ret;
|
||||
}
|
||||
|
||||
private ApplicationPlacementContext placeToQueue(
|
||||
@ -410,13 +458,13 @@ private ApplicationPlacementContext placeToDefault(
|
||||
String queueName = validateAndNormalizeQueue(
|
||||
variables.replacePathVariables("%default"), false);
|
||||
LOG.debug("Application '{}' have been placed to queue '{}' by " +
|
||||
"the fallback option of rule {}",
|
||||
"the fallback option of rule {}",
|
||||
asc.getApplicationName(), queueName, rule);
|
||||
return createPlacementContext(queueName);
|
||||
} catch (YarnException e) {
|
||||
LOG.error("Rejecting application due to a failed fallback" +
|
||||
" action '{}'" + ", reason: {}", asc.getApplicationName(),
|
||||
e.getMessage());
|
||||
e);
|
||||
//We intentionally omit the details, we don't want any server side
|
||||
//config information to leak to the client side
|
||||
throw new YarnException("Application submission have been rejected by a" +
|
||||
|
@ -154,20 +154,22 @@ private boolean validateDynamicQueuePath(MappingQueuePath path)
|
||||
}
|
||||
|
||||
if (!(parentQueue instanceof ManagedParentQueue)) {
|
||||
for (CSQueue queue : parentQueue.getChildQueues()) {
|
||||
if (queue instanceof LeafQueue) {
|
||||
//if a non managed parent queue has at least one leaf queue, this
|
||||
//mapping can be valid, we cannot do any more checks
|
||||
return true;
|
||||
if (parentQueue.getChildQueues() != null) {
|
||||
for (CSQueue queue : parentQueue.getChildQueues()) {
|
||||
if (queue instanceof LeafQueue) {
|
||||
//if a non managed parent queue has at least one leaf queue, this
|
||||
//mapping can be valid, we cannot do any more checks
|
||||
return true;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
//There is no way we can place anything into the queue referenced by the
|
||||
// rule, because we cannot auto create, and we don't have any leaf queues
|
||||
//Actually this branch is not accessibe with the current queue hierarchy,
|
||||
//Actually this branch is not accessible with the current queue hierarchy,
|
||||
//there should be no parents without any leaf queues. This condition says
|
||||
//for sanity checks
|
||||
throw new YarnException("Target queue path '" + path + "' has" +
|
||||
throw new YarnException("Target queue path '" + path + "' has " +
|
||||
"a non-managed parent queue which has no LeafQueues either.");
|
||||
}
|
||||
|
||||
|
@ -54,7 +54,8 @@ public void updateRules(List<PlacementRule> rules) {
|
||||
}
|
||||
|
||||
public ApplicationPlacementContext placeApplication(
|
||||
ApplicationSubmissionContext asc, String user) throws YarnException {
|
||||
ApplicationSubmissionContext asc, String user, boolean recovery)
|
||||
throws YarnException {
|
||||
readLock.lock();
|
||||
try {
|
||||
if (null == rules || rules.isEmpty()) {
|
||||
@ -63,7 +64,7 @@ public ApplicationPlacementContext placeApplication(
|
||||
|
||||
ApplicationPlacementContext placement = null;
|
||||
for (PlacementRule rule : rules) {
|
||||
placement = rule.getPlacementForApp(asc, user);
|
||||
placement = rule.getPlacementForApp(asc, user, recovery);
|
||||
if (placement != null) {
|
||||
break;
|
||||
}
|
||||
@ -74,6 +75,11 @@ public ApplicationPlacementContext placeApplication(
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public ApplicationPlacementContext placeApplication(
|
||||
ApplicationSubmissionContext asc, String user) throws YarnException {
|
||||
return placeApplication(asc, user, false);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public List<PlacementRule> getPlacementRules() {
|
||||
|
@ -79,4 +79,29 @@ public abstract boolean initialize(ResourceScheduler scheduler)
|
||||
*/
|
||||
public abstract ApplicationPlacementContext getPlacementForApp(
|
||||
ApplicationSubmissionContext asc, String user) throws YarnException;
|
||||
|
||||
|
||||
/**
|
||||
* Return the scheduler queue name the application should be placed in
|
||||
* wrapped in an {@link ApplicationPlacementContext} object.
|
||||
*
|
||||
* A non <code>null</code> return value places the application in a queue,
|
||||
* a <code>null</code> value means the queue is not yet determined. The
|
||||
* next {@link PlacementRule} in the list maintained in the
|
||||
* {@link PlacementManager} will be executed.
|
||||
*
|
||||
* @param asc The context of the application created on submission
|
||||
* @param user The name of the user submitting the application
|
||||
* @param recovery Indicates if the submission is a recovery
|
||||
*
|
||||
* @throws YarnException for any error while executing the rule
|
||||
*
|
||||
* @return The queue name wrapped in {@link ApplicationPlacementContext} or
|
||||
* <code>null</code> if no queue was resolved
|
||||
*/
|
||||
public ApplicationPlacementContext getPlacementForApp(
|
||||
ApplicationSubmissionContext asc, String user, boolean recovery)
|
||||
throws YarnException {
|
||||
return getPlacementForApp(asc, user);
|
||||
}
|
||||
}
|
@ -1,558 +0,0 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.placement;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Iterator;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||
import org.apache.hadoop.security.Groups;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping.MappingType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping.QueueMappingBuilder;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AutoCreatedLeafQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.LeafQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ManagedParentQueue;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.ParentQueue;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
public class UserGroupMappingPlacementRule extends PlacementRule {
|
||||
private static final Logger LOG = LoggerFactory
|
||||
.getLogger(UserGroupMappingPlacementRule.class);
|
||||
|
||||
public static final String CURRENT_USER_MAPPING = "%user";
|
||||
|
||||
public static final String PRIMARY_GROUP_MAPPING = "%primary_group";
|
||||
|
||||
public static final String SECONDARY_GROUP_MAPPING = "%secondary_group";
|
||||
|
||||
private boolean overrideWithQueueMappings = false;
|
||||
private List<QueueMapping> mappings = null;
|
||||
private Groups groups;
|
||||
private CapacitySchedulerQueueManager queueManager;
|
||||
|
||||
public UserGroupMappingPlacementRule(){
|
||||
this(false, null, null);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
UserGroupMappingPlacementRule(boolean overrideWithQueueMappings,
|
||||
List<QueueMapping> newMappings, Groups groups) {
|
||||
this.mappings = newMappings;
|
||||
this.overrideWithQueueMappings = overrideWithQueueMappings;
|
||||
this.groups = groups;
|
||||
}
|
||||
|
||||
private String getPrimaryGroup(String user) throws IOException {
|
||||
return groups.getGroupsSet(user).iterator().next();
|
||||
}
|
||||
|
||||
private String getSecondaryGroup(String user) throws IOException {
|
||||
Set<String> groupsSet = groups.getGroupsSet(user);
|
||||
String secondaryGroup = null;
|
||||
// Traverse all secondary groups (as there could be more than one
|
||||
// and position is not guaranteed) and ensure there is queue with
|
||||
// the same name
|
||||
Iterator<String> it = groupsSet.iterator();
|
||||
it.next();
|
||||
while (it.hasNext()) {
|
||||
String group = it.next();
|
||||
if (this.queueManager.getQueue(group) != null) {
|
||||
secondaryGroup = group;
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
||||
if (secondaryGroup == null && LOG.isDebugEnabled()) {
|
||||
LOG.debug("User {} is not associated with any Secondary "
|
||||
+ "Group. Hence it may use the 'default' queue", user);
|
||||
}
|
||||
return secondaryGroup;
|
||||
}
|
||||
|
||||
private ApplicationPlacementContext getPlacementForUser(String user)
|
||||
throws IOException {
|
||||
for (QueueMapping mapping : mappings) {
|
||||
if (mapping.getType().equals(MappingType.USER)) {
|
||||
if (mapping.getSource().equals(CURRENT_USER_MAPPING)) {
|
||||
if (mapping.getParentQueue() != null
|
||||
&& mapping.getParentQueue().equals(PRIMARY_GROUP_MAPPING)
|
||||
&& mapping.getQueue().equals(CURRENT_USER_MAPPING)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Creating placement context for user {} using " +
|
||||
"primary group current user mapping", user);
|
||||
}
|
||||
return getContextForGroupParent(user, mapping,
|
||||
getPrimaryGroup(user));
|
||||
} else if (mapping.getParentQueue() != null
|
||||
&& mapping.getParentQueue().equals(SECONDARY_GROUP_MAPPING)
|
||||
&& mapping.getQueue().equals(CURRENT_USER_MAPPING)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Creating placement context for user {} using " +
|
||||
"secondary group current user mapping", user);
|
||||
}
|
||||
return getContextForGroupParent(user, mapping,
|
||||
getSecondaryGroup(user));
|
||||
} else if (mapping.getQueue().equals(CURRENT_USER_MAPPING)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Creating placement context for user {} using " +
|
||||
"current user mapping", user);
|
||||
}
|
||||
return getPlacementContext(mapping, user);
|
||||
} else if (mapping.getQueue().equals(PRIMARY_GROUP_MAPPING)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Creating placement context for user {} using " +
|
||||
"primary group mapping", user);
|
||||
}
|
||||
return getPlacementContext(mapping, getPrimaryGroup(user));
|
||||
} else if (mapping.getQueue().equals(SECONDARY_GROUP_MAPPING)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Creating placement context for user {} using " +
|
||||
"secondary group mapping", user);
|
||||
}
|
||||
return getPlacementContext(mapping, getSecondaryGroup(user));
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Creating placement context for user {} using " +
|
||||
"current user static mapping", user);
|
||||
}
|
||||
return getPlacementContext(mapping);
|
||||
}
|
||||
}
|
||||
|
||||
if (user.equals(mapping.getSource())) {
|
||||
if (mapping.getQueue().equals(PRIMARY_GROUP_MAPPING)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Creating placement context for user {} using " +
|
||||
"static user primary group mapping", user);
|
||||
}
|
||||
return getPlacementContext(mapping, getPrimaryGroup(user));
|
||||
} else if (mapping.getQueue().equals(SECONDARY_GROUP_MAPPING)) {
|
||||
String secondaryGroup = getSecondaryGroup(user);
|
||||
if (secondaryGroup != null) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Creating placement context for user {} using " +
|
||||
"static user secondary group mapping", user);
|
||||
}
|
||||
return getPlacementContext(mapping, secondaryGroup);
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Wanted to create placement context for user {}" +
|
||||
" using static user secondary group mapping," +
|
||||
" but user has no secondary group!", user);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
} else {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Creating placement context for user {} using " +
|
||||
"current user static mapping", user);
|
||||
}
|
||||
return getPlacementContext(mapping);
|
||||
}
|
||||
}
|
||||
}
|
||||
if (mapping.getType().equals(MappingType.GROUP)) {
|
||||
for (String userGroups : groups.getGroupsSet(user)) {
|
||||
if (userGroups.equals(mapping.getSource())) {
|
||||
if (mapping.getQueue().equals(CURRENT_USER_MAPPING)) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Creating placement context for user {} using " +
|
||||
"static group current user mapping", user);
|
||||
}
|
||||
return getPlacementContext(mapping, user);
|
||||
}
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Creating placement context for user {} using " +
|
||||
"static group static mapping", user);
|
||||
}
|
||||
return getPlacementContext(mapping);
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
/**
|
||||
* This convenience method allows to change the parent path or a leafName in
|
||||
* a mapping object, by creating a new one, using the builder and copying the
|
||||
* rest of the parameters.
|
||||
* @param mapping The mapping to be changed
|
||||
* @param parentPath The new parentPath of the mapping
|
||||
* @param leafName The new leafQueueName of the mapping
|
||||
* @return The updated NEW mapping
|
||||
*/
|
||||
private QueueMapping alterMapping(
|
||||
QueueMapping mapping, String parentPath, String leafName) {
|
||||
return QueueMappingBuilder.create()
|
||||
.type(mapping.getType())
|
||||
.source(mapping.getSource())
|
||||
.queue(leafName)
|
||||
.parentQueue(parentPath)
|
||||
.build();
|
||||
}
|
||||
|
||||
// invoked for mappings:
|
||||
// u:%user:%primary_group.%user
|
||||
// u:%user:%secondary_group.%user
|
||||
private ApplicationPlacementContext getContextForGroupParent(
|
||||
String user,
|
||||
QueueMapping mapping,
|
||||
String group) throws IOException {
|
||||
|
||||
CSQueue groupQueue = this.queueManager.getQueue(group);
|
||||
if (groupQueue != null) {
|
||||
// replace the group string
|
||||
QueueMapping resolvedGroupMapping = alterMapping(
|
||||
mapping,
|
||||
groupQueue.getQueuePath(),
|
||||
user);
|
||||
validateQueueMapping(resolvedGroupMapping);
|
||||
return getPlacementContext(resolvedGroupMapping, user);
|
||||
} else {
|
||||
if (queueManager.isAmbiguous(group)) {
|
||||
LOG.info("Queue mapping rule expect group queue to exist with name {}" +
|
||||
" but the reference is ambiguous!", group);
|
||||
} else {
|
||||
LOG.info("Queue mapping rule expect group queue to exist with name {}" +
|
||||
" but it does not exist!", group);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public ApplicationPlacementContext getPlacementForApp(
|
||||
ApplicationSubmissionContext asc, String user)
|
||||
throws YarnException {
|
||||
String queueName = asc.getQueue();
|
||||
ApplicationId applicationId = asc.getApplicationId();
|
||||
if (mappings != null && mappings.size() > 0) {
|
||||
try {
|
||||
ApplicationPlacementContext mappedQueue = getPlacementForUser(user);
|
||||
if (mappedQueue != null) {
|
||||
// We have a mapping, should we use it?
|
||||
if (queueName.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)
|
||||
//queueName will be same as mapped queue name in case of recovery
|
||||
|| queueName.equals(mappedQueue.getQueue())
|
||||
|| overrideWithQueueMappings) {
|
||||
LOG.info("Application {} user {} mapping [{}] to [{}] override {}",
|
||||
applicationId, user, queueName, mappedQueue.getQueue(),
|
||||
overrideWithQueueMappings);
|
||||
return mappedQueue;
|
||||
}
|
||||
}
|
||||
} catch (IOException ioex) {
|
||||
String message = "Failed to submit application " + applicationId +
|
||||
" submitted by user " + user + " reason: " + ioex.getMessage();
|
||||
throw new YarnException(message, ioex);
|
||||
}
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
private ApplicationPlacementContext getPlacementContext(
|
||||
QueueMapping mapping) throws IOException {
|
||||
return getPlacementContext(mapping, mapping.getQueue());
|
||||
}
|
||||
|
||||
private ApplicationPlacementContext getPlacementContext(QueueMapping mapping,
|
||||
String leafQueueName) throws IOException {
|
||||
//leafQueue name no longer identifies a queue uniquely checking ambiguity
|
||||
if (!mapping.hasParentQueue() && queueManager.isAmbiguous(leafQueueName)) {
|
||||
throw new IOException("mapping contains ambiguous leaf queue reference " +
|
||||
leafQueueName);
|
||||
}
|
||||
|
||||
if (!StringUtils.isEmpty(mapping.getParentQueue())) {
|
||||
return getPlacementContextWithParent(mapping, leafQueueName);
|
||||
} else {
|
||||
return getPlacementContextNoParent(leafQueueName);
|
||||
}
|
||||
}
|
||||
|
||||
private ApplicationPlacementContext getPlacementContextWithParent(
|
||||
QueueMapping mapping,
|
||||
String leafQueueName) {
|
||||
CSQueue parent = queueManager.getQueue(mapping.getParentQueue());
|
||||
//we don't find the specified parent, so the placement rule is invalid
|
||||
//for this case
|
||||
if (parent == null) {
|
||||
if (queueManager.isAmbiguous(mapping.getParentQueue())) {
|
||||
LOG.warn("Placement rule specified a parent queue {}, but it is" +
|
||||
"ambiguous.", mapping.getParentQueue());
|
||||
} else {
|
||||
LOG.warn("Placement rule specified a parent queue {}, but it does" +
|
||||
"not exist.", mapping.getParentQueue());
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
String parentPath = parent.getQueuePath();
|
||||
|
||||
//if we have a parent which is not a managed parent, we check if the leaf
|
||||
//queue exists under this parent
|
||||
if (!(parent instanceof ManagedParentQueue)) {
|
||||
CSQueue queue = queueManager.getQueue(parentPath + "." + leafQueueName);
|
||||
//if the queue doesn't exit we return null
|
||||
if (queue == null) {
|
||||
LOG.warn("Placement rule specified a parent queue {}, but it is" +
|
||||
" not a managed parent queue, and no queue exists with name {} " +
|
||||
"under it.", mapping.getParentQueue(), leafQueueName);
|
||||
return null;
|
||||
}
|
||||
}
|
||||
//at this point we either have a managed parent or the queue actually
|
||||
//exists so we have a placement context, returning it
|
||||
return new ApplicationPlacementContext(leafQueueName, parentPath);
|
||||
}
|
||||
|
||||
private ApplicationPlacementContext getPlacementContextNoParent(
|
||||
String leafQueueName) {
|
||||
//in this case we don't have a parent specified so we expect the queue to
|
||||
//exist, otherwise the mapping will not be valid for this case
|
||||
CSQueue queue = queueManager.getQueue(leafQueueName);
|
||||
if (queue == null) {
|
||||
if (queueManager.isAmbiguous(leafQueueName)) {
|
||||
LOG.warn("Queue {} specified in placement rule is ambiguous",
|
||||
leafQueueName);
|
||||
} else {
|
||||
LOG.warn("Queue {} specified in placement rule does not exist",
|
||||
leafQueueName);
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
//getting parent path to make sure if the leaf name would become ambiguous
|
||||
//the placement context stays valid.
|
||||
CSQueue parent = queueManager.getQueue(leafQueueName).getParent();
|
||||
return new ApplicationPlacementContext(
|
||||
leafQueueName, parent.getQueuePath());
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
@Override
|
||||
public boolean initialize(ResourceScheduler scheduler)
|
||||
throws IOException {
|
||||
if (!(scheduler instanceof CapacityScheduler)) {
|
||||
throw new IOException(
|
||||
"UserGroupMappingPlacementRule can be configured only for "
|
||||
+ "CapacityScheduler");
|
||||
}
|
||||
CapacitySchedulerContext schedulerContext =
|
||||
(CapacitySchedulerContext) scheduler;
|
||||
CapacitySchedulerConfiguration conf = schedulerContext.getConfiguration();
|
||||
boolean overrideWithQueueMappings = conf.getOverrideWithQueueMappings();
|
||||
LOG.info(
|
||||
"Initialized queue mappings, override: " + overrideWithQueueMappings);
|
||||
|
||||
List<QueueMapping> queueMappings = conf.getQueueMappings();
|
||||
|
||||
// Get new user/group mappings
|
||||
List<QueueMapping> newMappings = new ArrayList<>();
|
||||
|
||||
queueManager = schedulerContext.getCapacitySchedulerQueueManager();
|
||||
|
||||
// check if mappings refer to valid queues
|
||||
for (QueueMapping mapping : queueMappings) {
|
||||
//at this point mapping.getQueueName() return only the queue name, since
|
||||
//the config parsing have been changed making QueueMapping more consistent
|
||||
|
||||
if (isStaticQueueMapping(mapping)) {
|
||||
//Try getting queue by its full path name, if it exists it is a static
|
||||
//leaf queue indeed, without any auto creation magic
|
||||
CSQueue queue = queueManager.getQueue(mapping.getFullPath());
|
||||
if (ifQueueDoesNotExist(queue)) {
|
||||
//We might not be able to find the queue, because the reference was
|
||||
// ambiguous this should only happen if the queue was referenced by
|
||||
// leaf name only
|
||||
if (queueManager.isAmbiguous(mapping.getFullPath())) {
|
||||
throw new IOException(
|
||||
"mapping contains ambiguous leaf queue reference " + mapping
|
||||
.getFullPath());
|
||||
}
|
||||
|
||||
//if leaf queue does not exist,
|
||||
// this could be a potential auto created leaf queue
|
||||
//validate if parent queue is specified,
|
||||
// then it should exist and
|
||||
// be an instance of AutoCreateEnabledParentQueue
|
||||
QueueMapping newMapping = validateAndGetAutoCreatedQueueMapping(
|
||||
queueManager, mapping);
|
||||
if (newMapping == null) {
|
||||
throw new IOException(
|
||||
"mapping contains invalid or non-leaf queue " + mapping
|
||||
.getQueue());
|
||||
}
|
||||
newMappings.add(newMapping);
|
||||
} else {
|
||||
// if queue exists, validate
|
||||
// if its an instance of leaf queue
|
||||
// if its an instance of auto created leaf queue,
|
||||
// then extract parent queue name and update queue mapping
|
||||
QueueMapping newMapping = validateAndGetQueueMapping(queueManager,
|
||||
queue, mapping);
|
||||
newMappings.add(newMapping);
|
||||
}
|
||||
} else{
|
||||
//If it is a dynamic queue mapping,
|
||||
// we can safely assume leaf queue name does not have '.' in it
|
||||
// validate
|
||||
// if parent queue is specified, then
|
||||
// parent queue exists and an instance of AutoCreateEnabledParentQueue
|
||||
//
|
||||
QueueMapping newMapping = validateAndGetAutoCreatedQueueMapping(
|
||||
queueManager, mapping);
|
||||
if (newMapping != null) {
|
||||
newMappings.add(newMapping);
|
||||
} else{
|
||||
newMappings.add(mapping);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// initialize groups if mappings are present
|
||||
if (newMappings.size() > 0) {
|
||||
this.mappings = newMappings;
|
||||
this.groups = Groups.getUserToGroupsMappingService(
|
||||
((CapacityScheduler)scheduler).getConf());
|
||||
this.overrideWithQueueMappings = overrideWithQueueMappings;
|
||||
return true;
|
||||
}
|
||||
return false;
|
||||
}
|
||||
|
||||
private static QueueMapping validateAndGetQueueMapping(
|
||||
CapacitySchedulerQueueManager queueManager, CSQueue queue,
|
||||
QueueMapping mapping) throws IOException {
|
||||
if (!(queue instanceof LeafQueue)) {
|
||||
throw new IOException(
|
||||
"mapping contains invalid or non-leaf queue : " +
|
||||
mapping.getFullPath());
|
||||
}
|
||||
|
||||
if (queue instanceof AutoCreatedLeafQueue && queue
|
||||
.getParent() instanceof ManagedParentQueue) {
|
||||
|
||||
QueueMapping newMapping = validateAndGetAutoCreatedQueueMapping(
|
||||
queueManager, mapping);
|
||||
if (newMapping == null) {
|
||||
throw new IOException(
|
||||
"mapping contains invalid or non-leaf queue "
|
||||
+ mapping.getFullPath());
|
||||
}
|
||||
return newMapping;
|
||||
}
|
||||
return mapping;
|
||||
}
|
||||
|
||||
private static boolean ifQueueDoesNotExist(CSQueue queue) {
|
||||
return queue == null;
|
||||
}
|
||||
|
||||
private static QueueMapping validateAndGetAutoCreatedQueueMapping(
|
||||
CapacitySchedulerQueueManager queueManager, QueueMapping mapping)
|
||||
throws IOException {
|
||||
if (mapping.hasParentQueue()
|
||||
&& (mapping.getParentQueue().equals(PRIMARY_GROUP_MAPPING)
|
||||
|| mapping.getParentQueue().equals(SECONDARY_GROUP_MAPPING))) {
|
||||
// dynamic parent queue
|
||||
return mapping;
|
||||
} else if (mapping.hasParentQueue()) {
|
||||
//if parent queue is specified,
|
||||
// then it should exist and be an instance of ManagedParentQueue
|
||||
QueuePlacementRuleUtils.validateQueueMappingUnderParentQueue(
|
||||
queueManager.getQueue(mapping.getParentQueue()),
|
||||
mapping.getParentQueue(), mapping.getQueue());
|
||||
return mapping;
|
||||
}
|
||||
|
||||
return null;
|
||||
}
|
||||
|
||||
private static boolean isStaticQueueMapping(QueueMapping mapping) {
|
||||
return !mapping.getQueue()
|
||||
.contains(UserGroupMappingPlacementRule.CURRENT_USER_MAPPING)
|
||||
&& !mapping.getQueue()
|
||||
.contains(UserGroupMappingPlacementRule.PRIMARY_GROUP_MAPPING)
|
||||
&& !mapping.getQueue()
|
||||
.contains(UserGroupMappingPlacementRule.SECONDARY_GROUP_MAPPING);
|
||||
}
|
||||
|
||||
private void validateQueueMapping(QueueMapping queueMapping)
|
||||
throws IOException {
|
||||
String parentQueueName = queueMapping.getParentQueue();
|
||||
String leafQueueFullName = queueMapping.getFullPath();
|
||||
CSQueue parentQueue = queueManager.getQueueByFullName(parentQueueName);
|
||||
CSQueue leafQueue = queueManager.getQueue(leafQueueFullName);
|
||||
|
||||
if (leafQueue == null || (!(leafQueue instanceof LeafQueue))) {
|
||||
//this might be confusing, but a mapping is not guaranteed to provide the
|
||||
//parent queue's name, which can result in ambiguous queue references
|
||||
//if no parent queueName is provided mapping.getFullPath() is the same
|
||||
//as mapping.getQueue()
|
||||
if (leafQueue == null && queueManager.isAmbiguous(leafQueueFullName)) {
|
||||
throw new IOException("mapping contains ambiguous leaf queue name: "
|
||||
+ leafQueueFullName);
|
||||
} else if (parentQueue == null ||
|
||||
(!(parentQueue instanceof ManagedParentQueue))) {
|
||||
throw new IOException("mapping contains invalid or non-leaf queue " +
|
||||
" and no managed parent is found: "
|
||||
+ leafQueueFullName);
|
||||
}
|
||||
} else if (parentQueue == null || (!(parentQueue instanceof ParentQueue))) {
|
||||
throw new IOException(
|
||||
"mapping contains invalid parent queue [" + parentQueueName + "]");
|
||||
} else if (!parentQueue.getQueuePath()
|
||||
.equals(leafQueue.getParent().getQueuePath())) {
|
||||
throw new IOException("mapping contains invalid parent queue "
|
||||
+ "which does not match existing leaf queue's parent : ["
|
||||
+ parentQueue.getQueuePath() + "] does not match [ "
|
||||
+ leafQueue.getParent().getQueuePath() + "]");
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public List<QueueMapping> getQueueMappings() {
|
||||
return mappings;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
@Private
|
||||
public void setQueueManager(CapacitySchedulerQueueManager queueManager) {
|
||||
this.queueManager = queueManager;
|
||||
}
|
||||
}
|
@ -35,6 +35,10 @@
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.CSMappingPlacementRule;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.slf4j.Marker;
|
||||
@ -71,11 +75,6 @@
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.AppNameMappingPlacementRule;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementFactory;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore.RMState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.recovery.records.ApplicationStateData;
|
||||
|
||||
@ -679,24 +678,14 @@ public int getPendingBacklogs() {
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public PlacementRule getUserGroupMappingPlacementRule() throws IOException {
|
||||
readLock.lock();
|
||||
try {
|
||||
UserGroupMappingPlacementRule ugRule = new UserGroupMappingPlacementRule();
|
||||
ugRule.initialize(this);
|
||||
return ugRule;
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
public PlacementRule getAppNameMappingPlacementRule() throws IOException {
|
||||
@VisibleForTesting
|
||||
public PlacementRule getCSMappingPlacementRule() throws IOException {
|
||||
readLock.lock();
|
||||
try {
|
||||
AppNameMappingPlacementRule anRule = new AppNameMappingPlacementRule();
|
||||
anRule.initialize(this);
|
||||
return anRule;
|
||||
CSMappingPlacementRule mappingRule = new CSMappingPlacementRule();
|
||||
mappingRule.initialize(this);
|
||||
return mappingRule;
|
||||
} finally {
|
||||
readLock.unlock();
|
||||
}
|
||||
@ -718,19 +707,18 @@ public void updatePlacementRules() throws IOException {
|
||||
}
|
||||
|
||||
placementRuleStrs = new ArrayList<>(distinguishRuleSet);
|
||||
boolean csMappingAdded = false;
|
||||
|
||||
for (String placementRuleStr : placementRuleStrs) {
|
||||
switch (placementRuleStr) {
|
||||
case YarnConfiguration.USER_GROUP_PLACEMENT_RULE:
|
||||
PlacementRule ugRule = getUserGroupMappingPlacementRule();
|
||||
if (null != ugRule) {
|
||||
placementRules.add(ugRule);
|
||||
}
|
||||
break;
|
||||
case YarnConfiguration.APP_NAME_PLACEMENT_RULE:
|
||||
PlacementRule anRule = getAppNameMappingPlacementRule();
|
||||
if (null != anRule) {
|
||||
placementRules.add(anRule);
|
||||
if (!csMappingAdded) {
|
||||
PlacementRule csMappingRule = getCSMappingPlacementRule();
|
||||
if (null != csMappingRule) {
|
||||
placementRules.add(csMappingRule);
|
||||
csMappingAdded = true;
|
||||
}
|
||||
}
|
||||
break;
|
||||
default:
|
||||
|
@ -1229,6 +1229,24 @@ public void setQueueMappings(List<QueueMapping> queueMappings) {
|
||||
setStrings(QUEUE_MAPPING, StringUtils.join(",", queueMappingStrs));
|
||||
}
|
||||
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
public void setAppNameMappings(List<QueueMapping> queueMappings) {
|
||||
if (queueMappings == null) {
|
||||
return;
|
||||
}
|
||||
|
||||
List<String> queueMappingStrs = new ArrayList<>();
|
||||
for (QueueMapping mapping : queueMappings) {
|
||||
String rule = mapping.toString();
|
||||
String[] parts = rule.split(":");
|
||||
queueMappingStrs.add(parts[1] + ":" + parts[2]);
|
||||
}
|
||||
|
||||
setStrings(QUEUE_MAPPING_NAME, StringUtils.join(",", queueMappingStrs));
|
||||
}
|
||||
|
||||
@Private
|
||||
@VisibleForTesting
|
||||
void setWorkflowPriorityMappings(
|
||||
|
@ -1053,7 +1053,9 @@ public ApplicationPlacementContext answer(InvocationOnMock invocation)
|
||||
}
|
||||
|
||||
}).when(placementMgr).placeApplication(
|
||||
any(ApplicationSubmissionContext.class), any(String.class));
|
||||
any(ApplicationSubmissionContext.class),
|
||||
any(String.class),
|
||||
any(Boolean.class));
|
||||
rmContext.setQueuePlacementManager(placementMgr);
|
||||
|
||||
asContext.setQueue("oldQueue");
|
||||
|
@ -127,7 +127,7 @@ public void testQueueSubmitWithHighQueueContainerSize()
|
||||
ApplicationSubmissionContext asContext = createAppSubmitCtx(appId, res);
|
||||
|
||||
// Submit to limited queue
|
||||
when(placementMgr.placeApplication(any(), any()))
|
||||
when(placementMgr.placeApplication(any(), any(), any(Boolean.class)))
|
||||
.thenReturn(new ApplicationPlacementContext("limited"));
|
||||
try {
|
||||
rmAppManager.submitApplication(asContext, "test");
|
||||
@ -138,7 +138,7 @@ public void testQueueSubmitWithHighQueueContainerSize()
|
||||
}
|
||||
|
||||
// submit same app but now place it in the unlimited queue
|
||||
when(placementMgr.placeApplication(any(), any()))
|
||||
when(placementMgr.placeApplication(any(), any(), any(Boolean.class)))
|
||||
.thenReturn(new ApplicationPlacementContext("root.unlimited"));
|
||||
rmAppManager.submitApplication(asContext, "test");
|
||||
}
|
||||
@ -172,7 +172,7 @@ public void testQueueSubmitWithPermissionLimits()
|
||||
ApplicationSubmissionContext asContext = createAppSubmitCtx(appId, res);
|
||||
|
||||
// Submit to no access queue
|
||||
when(placementMgr.placeApplication(any(), any()))
|
||||
when(placementMgr.placeApplication(any(), any(), any(Boolean.class)))
|
||||
.thenReturn(new ApplicationPlacementContext("noaccess"));
|
||||
try {
|
||||
rmAppManager.submitApplication(asContext, "test");
|
||||
@ -182,13 +182,13 @@ public void testQueueSubmitWithPermissionLimits()
|
||||
e.getCause() instanceof AccessControlException);
|
||||
}
|
||||
// Submit to submit access queue
|
||||
when(placementMgr.placeApplication(any(), any()))
|
||||
when(placementMgr.placeApplication(any(), any(), any(Boolean.class)))
|
||||
.thenReturn(new ApplicationPlacementContext("submitonly"));
|
||||
rmAppManager.submitApplication(asContext, "test");
|
||||
// Submit second app to admin access queue
|
||||
appId = MockApps.newAppID(2);
|
||||
asContext = createAppSubmitCtx(appId, res);
|
||||
when(placementMgr.placeApplication(any(), any()))
|
||||
when(placementMgr.placeApplication(any(), any(), any(Boolean.class)))
|
||||
.thenReturn(new ApplicationPlacementContext("adminonly"));
|
||||
rmAppManager.submitApplication(asContext, "test");
|
||||
}
|
||||
@ -245,7 +245,7 @@ public void testQueueSubmitWithAutoCreateQueue()
|
||||
ApplicationSubmissionContext asContext = createAppSubmitCtx(appId, res);
|
||||
|
||||
// Submit to noaccess parent with non existent child queue
|
||||
when(placementMgr.placeApplication(any(), any()))
|
||||
when(placementMgr.placeApplication(any(), any(), any(Boolean.class)))
|
||||
.thenReturn(new ApplicationPlacementContext("root.noaccess.child"));
|
||||
try {
|
||||
rmAppManager.submitApplication(asContext, "test");
|
||||
@ -255,7 +255,7 @@ public void testQueueSubmitWithAutoCreateQueue()
|
||||
e.getCause() instanceof AccessControlException);
|
||||
}
|
||||
// Submit to submitonly parent with non existent child queue
|
||||
when(placementMgr.placeApplication(any(), any()))
|
||||
when(placementMgr.placeApplication(any(), any(), any(Boolean.class)))
|
||||
.thenReturn(new ApplicationPlacementContext("root.submitonly.child"));
|
||||
rmAppManager.submitApplication(asContext, "test");
|
||||
}
|
||||
|
@ -18,6 +18,7 @@
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.placement;
|
||||
|
||||
import org.apache.hadoop.thirdparty.com.google.common.collect.Lists;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
@ -33,7 +34,6 @@
|
||||
import org.junit.Test;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
|
||||
@ -83,9 +83,12 @@ public void testPlaceApplicationWithPlacementRuleChain() throws Exception {
|
||||
USER1))
|
||||
.build();
|
||||
|
||||
UserGroupMappingPlacementRule ugRule = new UserGroupMappingPlacementRule(
|
||||
false, Arrays.asList(userQueueMapping), null);
|
||||
cs.getConfiguration().setQueueMappings(
|
||||
Lists.newArrayList(userQueueMapping));
|
||||
CSMappingPlacementRule ugRule = new CSMappingPlacementRule();
|
||||
ugRule.initialize(cs);
|
||||
queuePlacementRules.add(ugRule);
|
||||
|
||||
pm.updateRules(queuePlacementRules);
|
||||
|
||||
ApplicationSubmissionContext asc = Records.newRecord(
|
||||
@ -102,17 +105,14 @@ public void testPlaceApplicationWithPlacementRuleChain() throws Exception {
|
||||
.parentQueue(PARENT_QUEUE)
|
||||
.build();
|
||||
|
||||
AppNameMappingPlacementRule anRule = new AppNameMappingPlacementRule(false,
|
||||
Arrays.asList(queueMappingEntity));
|
||||
cs.getConfiguration().setAppNameMappings(
|
||||
Lists.newArrayList(queueMappingEntity));
|
||||
CSMappingPlacementRule anRule = new CSMappingPlacementRule();
|
||||
anRule.initialize(cs);
|
||||
queuePlacementRules.add(anRule);
|
||||
pm.updateRules(queuePlacementRules);
|
||||
try {
|
||||
ApplicationPlacementContext pc = pm.placeApplication(asc, USER2);
|
||||
Assert.assertNotNull(pc);
|
||||
} catch (Exception e) {
|
||||
e.printStackTrace();
|
||||
Assert.fail("Exception not expected");
|
||||
}
|
||||
ApplicationPlacementContext pc = pm.placeApplication(asc, USER2);
|
||||
Assert.assertNotNull(pc);
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -121,10 +121,9 @@ public void testPlacementRuleUpdationOrder() throws Exception {
|
||||
QueueMapping userQueueMapping = QueueMappingBuilder.create()
|
||||
.type(MappingType.USER).source(USER1)
|
||||
.queue(getQueueMapping(PARENT_QUEUE, USER1)).build();
|
||||
UserGroupMappingPlacementRule ugRule = new UserGroupMappingPlacementRule(
|
||||
false, Arrays.asList(userQueueMapping), null);
|
||||
|
||||
// Configure placement rule
|
||||
CSMappingPlacementRule ugRule = new CSMappingPlacementRule();
|
||||
|
||||
conf.set(YarnConfiguration.QUEUE_PLACEMENT_RULES, ugRule.getName());
|
||||
queueMappings.add(userQueueMapping);
|
||||
conf.setQueueMappings(queueMappings);
|
||||
@ -135,7 +134,7 @@ public void testPlacementRuleUpdationOrder() throws Exception {
|
||||
PlacementManager pm = cs.getRMContext().getQueuePlacementManager();
|
||||
|
||||
// As we are setting placement rule, It shouldn't update default
|
||||
// placement rule ie user-group. Number of placemnt rules should be 1.
|
||||
// placement rule ie user-group. Number of placement rules should be 1.
|
||||
Assert.assertEquals(1, pm.getPlacementRules().size());
|
||||
// Verifying if placement rule set is same as the one we configured
|
||||
Assert.assertEquals(ugRule.getName(),
|
||||
|
@ -18,7 +18,6 @@
|
||||
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.placement;
|
||||
|
||||
import static org.junit.Assert.fail;
|
||||
import static org.mockito.Mockito.mock;
|
||||
import static org.mockito.Mockito.when;
|
||||
import static org.mockito.Mockito.isNull;
|
||||
@ -157,7 +156,7 @@ public void testSecondaryGroupMapping() throws IOException, YarnException {
|
||||
.build());
|
||||
}
|
||||
|
||||
@Test(expected = YarnException.class)
|
||||
@Test
|
||||
public void testNullGroupMapping() throws IOException, YarnException {
|
||||
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
|
||||
NullGroupsMapping.class, GroupMappingServiceProvider.class);
|
||||
@ -171,7 +170,6 @@ public void testNullGroupMapping() throws IOException, YarnException {
|
||||
.inputUser("a")
|
||||
.expectedQueue("default")
|
||||
.build());
|
||||
fail("No Groups for user 'a'");
|
||||
}
|
||||
|
||||
@Test
|
||||
|
@ -159,6 +159,7 @@ public void testAutoCreateLeafQueueCreation() throws Exception {
|
||||
|
||||
csConf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||
ResourceScheduler.class);
|
||||
csConf.setOverrideWithQueueMappings(true);
|
||||
|
||||
mockRM = new MockRM(csConf);
|
||||
cs = (CapacityScheduler) mockRM.getResourceScheduler();
|
||||
|
@ -114,9 +114,9 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
||||
public static final String C = CapacitySchedulerConfiguration.ROOT + ".c";
|
||||
public static final String D = CapacitySchedulerConfiguration.ROOT + ".d";
|
||||
public static final String E = CapacitySchedulerConfiguration.ROOT + ".e";
|
||||
public static final String ASUBGROUP1 =
|
||||
public static final String ESUBGROUP1 =
|
||||
CapacitySchedulerConfiguration.ROOT + ".esubgroup1";
|
||||
public static final String AGROUP =
|
||||
public static final String FGROUP =
|
||||
CapacitySchedulerConfiguration.ROOT + ".fgroup";
|
||||
public static final String A1 = A + ".a1";
|
||||
public static final String A2 = A + ".a2";
|
||||
@ -124,14 +124,14 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
||||
public static final String B2 = B + ".b2";
|
||||
public static final String B3 = B + ".b3";
|
||||
public static final String B4 = B + ".b4subgroup1";
|
||||
public static final String ASUBGROUP1_A = ASUBGROUP1 + ".e";
|
||||
public static final String AGROUP_A = AGROUP + ".f";
|
||||
public static final String ESUBGROUP1_A = ESUBGROUP1 + ".e";
|
||||
public static final String FGROUP_F = FGROUP + ".f";
|
||||
public static final float A_CAPACITY = 20f;
|
||||
public static final float B_CAPACITY = 20f;
|
||||
public static final float C_CAPACITY = 20f;
|
||||
public static final float D_CAPACITY = 20f;
|
||||
public static final float ASUBGROUP1_CAPACITY = 10f;
|
||||
public static final float AGROUP_CAPACITY = 10f;
|
||||
public static final float ESUBGROUP1_CAPACITY = 10f;
|
||||
public static final float FGROUP_CAPACITY = 10f;
|
||||
|
||||
public static final float A1_CAPACITY = 30;
|
||||
public static final float A2_CAPACITY = 70;
|
||||
@ -371,8 +371,8 @@ public static CapacitySchedulerConfiguration setupQueueConfiguration(
|
||||
conf.setCapacity(B, B_CAPACITY);
|
||||
conf.setCapacity(C, C_CAPACITY);
|
||||
conf.setCapacity(D, D_CAPACITY);
|
||||
conf.setCapacity(ASUBGROUP1, ASUBGROUP1_CAPACITY);
|
||||
conf.setCapacity(AGROUP, AGROUP_CAPACITY);
|
||||
conf.setCapacity(ESUBGROUP1, ESUBGROUP1_CAPACITY);
|
||||
conf.setCapacity(FGROUP, FGROUP_CAPACITY);
|
||||
|
||||
// Define 2nd-level queues
|
||||
conf.setQueues(A, new String[] { "a1", "a2" });
|
||||
@ -391,12 +391,12 @@ public static CapacitySchedulerConfiguration setupQueueConfiguration(
|
||||
conf.setCapacity(B4, B4_CAPACITY);
|
||||
conf.setUserLimitFactor(B4, 100.0f);
|
||||
|
||||
conf.setQueues(ASUBGROUP1, new String[] {"e"});
|
||||
conf.setCapacity(ASUBGROUP1_A, 100f);
|
||||
conf.setUserLimitFactor(ASUBGROUP1_A, 100.0f);
|
||||
conf.setQueues(AGROUP, new String[] {"f"});
|
||||
conf.setCapacity(AGROUP_A, 100f);
|
||||
conf.setUserLimitFactor(AGROUP_A, 100.0f);
|
||||
conf.setQueues(ESUBGROUP1, new String[] {"e"});
|
||||
conf.setCapacity(ESUBGROUP1_A, 100f);
|
||||
conf.setUserLimitFactor(ESUBGROUP1_A, 100.0f);
|
||||
conf.setQueues(FGROUP, new String[] {"f"});
|
||||
conf.setCapacity(FGROUP_F, 100f);
|
||||
conf.setUserLimitFactor(FGROUP_F, 100.0f);
|
||||
|
||||
conf.setUserLimitFactor(C, 1.0f);
|
||||
conf.setAutoCreateChildQueueEnabled(C, true);
|
||||
|
@ -86,7 +86,6 @@
|
||||
|
||||
import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager
|
||||
.NO_LABEL;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.CURRENT_USER_MAPPING;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON;
|
||||
|
||||
import static org.junit.Assert.assertEquals;
|
||||
@ -106,6 +105,8 @@ public class TestCapacitySchedulerAutoQueueCreation
|
||||
private static final Logger LOG = LoggerFactory.getLogger(
|
||||
TestCapacitySchedulerAutoQueueCreation.class);
|
||||
|
||||
private static final String CURRENT_USER_MAPPING = "%user";
|
||||
|
||||
private static final Resource TEMPLATE_MAX_RES = Resource.newInstance(16 *
|
||||
GB,
|
||||
48);
|
||||
@ -424,16 +425,16 @@ public void testQueueMappingValidationFailsWithInvalidParentQueueInMapping()
|
||||
|
||||
//dynamic queue mapping
|
||||
try {
|
||||
setupQueueMapping(newCS, CURRENT_USER_MAPPING, "a",
|
||||
setupQueueMapping(newCS, CURRENT_USER_MAPPING, "a1",
|
||||
CURRENT_USER_MAPPING);
|
||||
newCS.updatePlacementRules();
|
||||
fail("Expected invalid parent queue mapping failure");
|
||||
|
||||
} catch (IOException e) {
|
||||
//expected exception
|
||||
|
||||
assertTrue(e.getMessage().contains(
|
||||
"invalid parent queue which does not have auto creation of leaf "
|
||||
+ "queues enabled [" + "a" + "]"));
|
||||
"Target queue path 'a1.%user' has a non-managed parent queue"));
|
||||
}
|
||||
|
||||
//"a" is not auto create enabled and app_user does not exist as a leaf
|
||||
@ -446,8 +447,8 @@ public void testQueueMappingValidationFailsWithInvalidParentQueueInMapping()
|
||||
fail("Expected invalid parent queue mapping failure");
|
||||
} catch (IOException e) {
|
||||
//expected exception
|
||||
assertTrue(e.getMessage()
|
||||
.contains("invalid parent queue [" + "INVALID_PARENT_QUEUE" + "]"));
|
||||
assertTrue(e.getMessage().contains(
|
||||
"contains an invalid parent queue 'INVALID_PARENT_QUEUE'"));
|
||||
}
|
||||
} finally {
|
||||
if (newMockRM != null) {
|
||||
@ -477,7 +478,7 @@ public void testQueueMappingUpdatesFailsOnRemovalOfParentQueueInMapping()
|
||||
fail("Expected invalid parent queue mapping failure");
|
||||
} catch (IOException e) {
|
||||
//expected exception
|
||||
assertTrue(e.getMessage().contains("invalid parent queue []"));
|
||||
assertTrue(e.getMessage().contains("invalid parent queue"));
|
||||
}
|
||||
} finally {
|
||||
if (newMockRM != null) {
|
||||
|
@ -25,11 +25,11 @@
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.CSMappingPlacementRule;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping.MappingType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping.QueueMappingBuilder;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SimpleGroupsMapping;
|
||||
import org.apache.hadoop.yarn.util.Records;
|
||||
@ -46,10 +46,8 @@
|
||||
public class TestCapacitySchedulerQueueMappingFactory {
|
||||
|
||||
private static final String QUEUE_MAPPING_NAME = "app-name";
|
||||
private static final String QUEUE_MAPPING_RULE_APP_NAME =
|
||||
"org.apache.hadoop.yarn.server.resourcemanager.placement.AppNameMappingPlacementRule";
|
||||
private static final String QUEUE_MAPPING_RULE_USER_GROUP =
|
||||
"org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule";
|
||||
private static final String QUEUE_MAPPING_RULE =
|
||||
CSMappingPlacementRule.class.getCanonicalName();
|
||||
public static final String USER = "user_";
|
||||
public static final String PARENT_QUEUE = "c";
|
||||
|
||||
@ -59,8 +57,7 @@ public static CapacitySchedulerConfiguration setupQueueMappingsForRules(
|
||||
|
||||
List<String> queuePlacementRules = new ArrayList<>();
|
||||
|
||||
queuePlacementRules.add(QUEUE_MAPPING_RULE_USER_GROUP);
|
||||
queuePlacementRules.add(QUEUE_MAPPING_RULE_APP_NAME);
|
||||
queuePlacementRules.add(QUEUE_MAPPING_RULE);
|
||||
|
||||
conf.setQueuePlacementRules(queuePlacementRules);
|
||||
|
||||
@ -134,8 +131,7 @@ public void testUpdatePlacementRulesFactory() throws Exception {
|
||||
}
|
||||
|
||||
// verify both placement rules were added successfully
|
||||
assertThat(placementRuleNames, hasItems(QUEUE_MAPPING_RULE_USER_GROUP));
|
||||
assertThat(placementRuleNames, hasItems(QUEUE_MAPPING_RULE_APP_NAME));
|
||||
assertThat(placementRuleNames, hasItems(QUEUE_MAPPING_RULE));
|
||||
} finally {
|
||||
if(mockRM != null) {
|
||||
mockRM.close();
|
||||
@ -154,7 +150,7 @@ public void testNestedUserQueueWithStaticParentQueue() throws Exception {
|
||||
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
|
||||
|
||||
List<String> queuePlacementRules = new ArrayList<>();
|
||||
queuePlacementRules.add(QUEUE_MAPPING_RULE_USER_GROUP);
|
||||
queuePlacementRules.add(QUEUE_MAPPING_RULE);
|
||||
conf.setQueuePlacementRules(queuePlacementRules);
|
||||
|
||||
List<QueueMapping> existingMappingsForUG = conf.getQueueMappings();
|
||||
@ -200,8 +196,8 @@ public void testNestedUserQueueWithStaticParentQueue() throws Exception {
|
||||
List<PlacementRule> rules =
|
||||
cs.getRMContext().getQueuePlacementManager().getPlacementRules();
|
||||
|
||||
UserGroupMappingPlacementRule r =
|
||||
(UserGroupMappingPlacementRule) rules.get(0);
|
||||
CSMappingPlacementRule r =
|
||||
(CSMappingPlacementRule) rules.get(0);
|
||||
|
||||
ApplicationPlacementContext ctx = r.getPlacementForApp(asc, "user1");
|
||||
assertEquals("Queue", "b1", ctx.getQueue());
|
||||
@ -327,7 +323,7 @@ private void testNestedUserQueueWithDynamicParentQueue(
|
||||
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
|
||||
|
||||
List<String> queuePlacementRules = new ArrayList<>();
|
||||
queuePlacementRules.add(QUEUE_MAPPING_RULE_USER_GROUP);
|
||||
queuePlacementRules.add(QUEUE_MAPPING_RULE);
|
||||
conf.setQueuePlacementRules(queuePlacementRules);
|
||||
|
||||
List<QueueMapping> existingMappingsForUG = conf.getQueueMappings();
|
||||
@ -353,8 +349,8 @@ private void testNestedUserQueueWithDynamicParentQueue(
|
||||
List<PlacementRule> rules =
|
||||
cs.getRMContext().getQueuePlacementManager().getPlacementRules();
|
||||
|
||||
UserGroupMappingPlacementRule r =
|
||||
(UserGroupMappingPlacementRule) rules.get(0);
|
||||
CSMappingPlacementRule r =
|
||||
(CSMappingPlacementRule) rules.get(0);
|
||||
ApplicationPlacementContext ctx = r.getPlacementForApp(asc, user);
|
||||
assertEquals("Queue", user, ctx.getQueue());
|
||||
|
||||
@ -382,7 +378,7 @@ public void testDynamicPrimaryGroupQueue() throws Exception {
|
||||
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
|
||||
|
||||
List<String> queuePlacementRules = new ArrayList<>();
|
||||
queuePlacementRules.add(QUEUE_MAPPING_RULE_USER_GROUP);
|
||||
queuePlacementRules.add(QUEUE_MAPPING_RULE);
|
||||
conf.setQueuePlacementRules(queuePlacementRules);
|
||||
|
||||
List<QueueMapping> existingMappingsForUG = conf.getQueueMappings();
|
||||
@ -426,8 +422,8 @@ public void testDynamicPrimaryGroupQueue() throws Exception {
|
||||
|
||||
List<PlacementRule> rules =
|
||||
cs.getRMContext().getQueuePlacementManager().getPlacementRules();
|
||||
UserGroupMappingPlacementRule r =
|
||||
(UserGroupMappingPlacementRule) rules.get(0);
|
||||
CSMappingPlacementRule r =
|
||||
(CSMappingPlacementRule) rules.get(0);
|
||||
|
||||
ApplicationPlacementContext ctx = r.getPlacementForApp(asc, "user1");
|
||||
assertEquals("Queue", "b1", ctx.getQueue());
|
||||
@ -451,7 +447,7 @@ public void testFixedUserWithDynamicGroupQueue() throws Exception {
|
||||
SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
|
||||
|
||||
List<String> queuePlacementRules = new ArrayList<>();
|
||||
queuePlacementRules.add(QUEUE_MAPPING_RULE_USER_GROUP);
|
||||
queuePlacementRules.add(QUEUE_MAPPING_RULE);
|
||||
conf.setQueuePlacementRules(queuePlacementRules);
|
||||
|
||||
List<QueueMapping> existingMappingsForUG = conf.getQueueMappings();
|
||||
@ -473,11 +469,11 @@ public void testFixedUserWithDynamicGroupQueue() throws Exception {
|
||||
.queue("%primary_group")
|
||||
.build();
|
||||
|
||||
// u:b4:%secondary_group
|
||||
// u:b4:c.%secondary_group
|
||||
QueueMapping userQueueMapping3 = QueueMappingBuilder.create()
|
||||
.type(QueueMapping.MappingType.USER)
|
||||
.source("e")
|
||||
.queue("%secondary_group")
|
||||
.queue("c.%secondary_group")
|
||||
.build();
|
||||
|
||||
queueMappingsForUG.add(userQueueMapping1);
|
||||
@ -503,8 +499,8 @@ public void testFixedUserWithDynamicGroupQueue() throws Exception {
|
||||
|
||||
List<PlacementRule> rules =
|
||||
cs.getRMContext().getQueuePlacementManager().getPlacementRules();
|
||||
UserGroupMappingPlacementRule r =
|
||||
(UserGroupMappingPlacementRule) rules.get(0);
|
||||
CSMappingPlacementRule r =
|
||||
(CSMappingPlacementRule) rules.get(0);
|
||||
|
||||
ApplicationPlacementContext ctx = r.getPlacementForApp(asc, "user1");
|
||||
assertEquals("Queue", "b1", ctx.getQueue());
|
||||
@ -514,6 +510,7 @@ public void testFixedUserWithDynamicGroupQueue() throws Exception {
|
||||
|
||||
ApplicationPlacementContext ctx2 = r.getPlacementForApp(asc, "e");
|
||||
assertEquals("Queue", "esubgroup1", ctx2.getQueue());
|
||||
assertEquals("Queue", "root.c", ctx2.getParentQueue());
|
||||
} finally {
|
||||
if (mockRM != null) {
|
||||
mockRM.close();
|
||||
|
@ -18,15 +18,14 @@
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.MappingRule;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping.MappingType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping.QueueMappingBuilder;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
@ -92,12 +91,13 @@ public void testQueueMappingTrimSpaces() throws IOException {
|
||||
// space trimming
|
||||
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, " u : a : " + Q1);
|
||||
cs.reinitialize(conf, null);
|
||||
checkQMapping(
|
||||
QueueMappingBuilder.create()
|
||||
.type(MappingType.USER)
|
||||
.source("a")
|
||||
.queue(Q1)
|
||||
.build());
|
||||
|
||||
List<MappingRule> rules = cs.getConfiguration().getMappingRules();
|
||||
|
||||
String ruleStr = rules.get(0).toString();
|
||||
assert(ruleStr.contains("variable='%user'"));
|
||||
assert(ruleStr.contains("value='a'"));
|
||||
assert(ruleStr.contains("queueName='q1'"));
|
||||
}
|
||||
|
||||
@Test
|
||||
@ -155,13 +155,4 @@ private void checkInvalidQMapping(YarnConfiguration conf,
|
||||
Assert.assertTrue("invalid mapping did not throw exception for " + reason,
|
||||
fail);
|
||||
}
|
||||
|
||||
private void checkQMapping(QueueMapping expected)
|
||||
throws IOException {
|
||||
UserGroupMappingPlacementRule rule =
|
||||
(UserGroupMappingPlacementRule) cs.getRMContext()
|
||||
.getQueuePlacementManager().getPlacementRules().get(0);
|
||||
QueueMapping queueMapping = rule.getQueueMappings().get(0);
|
||||
Assert.assertEquals(queueMapping, expected);
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user