YARN-10274. Merge QueueMapping and QueueMappingEntity. Contributed by Gergely Pollak

This commit is contained in:
Szilard Nemeth 2020-06-05 11:38:32 +02:00
parent a4835db95a
commit 8b146c17b3
8 changed files with 74 additions and 146 deletions

View File

@ -48,7 +48,7 @@ public class AppNameMappingPlacementRule extends PlacementRule {
private static final String QUEUE_MAPPING_NAME = "app-name"; private static final String QUEUE_MAPPING_NAME = "app-name";
private boolean overrideWithQueueMappings = false; private boolean overrideWithQueueMappings = false;
private List<QueueMappingEntity> mappings = null; private List<QueueMapping> mappings = null;
protected CapacitySchedulerQueueManager queueManager; protected CapacitySchedulerQueueManager queueManager;
public AppNameMappingPlacementRule() { public AppNameMappingPlacementRule() {
@ -56,7 +56,7 @@ public AppNameMappingPlacementRule() {
} }
public AppNameMappingPlacementRule(boolean overrideWithQueueMappings, public AppNameMappingPlacementRule(boolean overrideWithQueueMappings,
List<QueueMappingEntity> newMappings) { List<QueueMapping> newMappings) {
this.overrideWithQueueMappings = overrideWithQueueMappings; this.overrideWithQueueMappings = overrideWithQueueMappings;
this.mappings = newMappings; this.mappings = newMappings;
} }
@ -76,16 +76,16 @@ public boolean initialize(ResourceScheduler scheduler)
LOG.info( LOG.info(
"Initialized App Name queue mappings, override: " + overrideWithQueueMappings); "Initialized App Name queue mappings, override: " + overrideWithQueueMappings);
List<QueueMappingEntity> queueMappings = List<QueueMapping> queueMappings =
conf.getQueueMappingEntity(QUEUE_MAPPING_NAME); conf.getQueueMappingEntity(QUEUE_MAPPING_NAME);
// Get new user mappings // Get new user mappings
List<QueueMappingEntity> newMappings = new ArrayList<>(); List<QueueMapping> newMappings = new ArrayList<>();
queueManager = schedulerContext.getCapacitySchedulerQueueManager(); queueManager = schedulerContext.getCapacitySchedulerQueueManager();
// check if mappings refer to valid queues // check if mappings refer to valid queues
for (QueueMappingEntity mapping : queueMappings) { for (QueueMapping mapping : queueMappings) {
QueuePath queuePath = mapping.getQueuePath(); QueuePath queuePath = mapping.getQueuePath();
if (isStaticQueueMapping(mapping)) { if (isStaticQueueMapping(mapping)) {
@ -109,7 +109,7 @@ public boolean initialize(ResourceScheduler scheduler)
//validate if parent queue is specified, //validate if parent queue is specified,
// then it should exist and // then it should exist and
// be an instance of AutoCreateEnabledParentQueue // be an instance of AutoCreateEnabledParentQueue
QueueMappingEntity newMapping = QueueMapping newMapping =
validateAndGetAutoCreatedQueueMapping(queueManager, mapping, validateAndGetAutoCreatedQueueMapping(queueManager, mapping,
queuePath); queuePath);
if (newMapping == null) { if (newMapping == null) {
@ -123,7 +123,7 @@ public boolean initialize(ResourceScheduler scheduler)
// if its an instance of leaf queue // if its an instance of leaf queue
// if its an instance of auto created leaf queue, // if its an instance of auto created leaf queue,
// then extract parent queue name and update queue mapping // then extract parent queue name and update queue mapping
QueueMappingEntity newMapping = validateAndGetQueueMapping( QueueMapping newMapping = validateAndGetQueueMapping(
queueManager, queue, mapping, queuePath); queueManager, queue, mapping, queuePath);
newMappings.add(newMapping); newMappings.add(newMapping);
} }
@ -134,7 +134,7 @@ public boolean initialize(ResourceScheduler scheduler)
// if parent queue is specified, then // if parent queue is specified, then
// parent queue exists and an instance of AutoCreateEnabledParentQueue // parent queue exists and an instance of AutoCreateEnabledParentQueue
// //
QueueMappingEntity newMapping = validateAndGetAutoCreatedQueueMapping( QueueMapping newMapping = validateAndGetAutoCreatedQueueMapping(
queueManager, mapping, queuePath); queueManager, mapping, queuePath);
if (newMapping != null) { if (newMapping != null) {
newMappings.add(newMapping); newMappings.add(newMapping);
@ -160,7 +160,7 @@ private static boolean ifQueueDoesNotExist(CSQueue queue) {
private ApplicationPlacementContext getAppPlacementContext(String user, private ApplicationPlacementContext getAppPlacementContext(String user,
String applicationName) throws IOException { String applicationName) throws IOException {
for (QueueMappingEntity mapping : mappings) { for (QueueMapping mapping : mappings) {
if (mapping.getSource().equals(CURRENT_APP_MAPPING)) { if (mapping.getSource().equals(CURRENT_APP_MAPPING)) {
if (mapping.getQueue().equals(CURRENT_APP_MAPPING)) { if (mapping.getQueue().equals(CURRENT_APP_MAPPING)) {
return getPlacementContext(mapping, applicationName, queueManager); return getPlacementContext(mapping, applicationName, queueManager);

View File

@ -82,6 +82,7 @@ private QueueMapping(QueueMappingBuilder builder) {
this.source = builder.source; this.source = builder.source;
this.queue = builder.queue; this.queue = builder.queue;
this.parentQueue = builder.parentQueue; this.parentQueue = builder.parentQueue;
this.fullPath = (parentQueue != null) ? (parentQueue + DOT + queue) : queue;
} }
/** /**
@ -89,8 +90,9 @@ private QueueMapping(QueueMappingBuilder builder) {
* *
*/ */
public enum MappingType { public enum MappingType {
USER("u"),
USER("u"), GROUP("g"); GROUP("g"),
APPLICATION("a");
private final String type; private final String type;
@ -108,6 +110,7 @@ public String toString() {
private String source; private String source;
private String queue; private String queue;
private String parentQueue; private String parentQueue;
private String fullPath;
private final static String DELIMITER = ":"; private final static String DELIMITER = ":";
@ -132,7 +135,7 @@ public String getSource() {
} }
public String getFullPath() { public String getFullPath() {
return (parentQueue != null ? parentQueue + DOT + queue : queue); return fullPath;
} }
public QueuePath getQueuePath() { public QueuePath getQueuePath() {
@ -197,4 +200,10 @@ public String toString() {
return type.toString() + DELIMITER + source + DELIMITER return type.toString() + DELIMITER + source + DELIMITER
+ (parentQueue != null ? parentQueue + "." + queue : queue); + (parentQueue != null ? parentQueue + "." + queue : queue);
} }
public String toTypelessString() {
return source + DELIMITER
+ (parentQueue != null ? parentQueue + "." + queue : queue);
}
} }

View File

@ -1,98 +0,0 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.placement;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
public class QueueMappingEntity {
private String source;
private String queue;
private String parentQueue;
private String fullPath;
public final static String DELIMITER = ":";
public QueueMappingEntity(String source, String queue) {
this.source = source;
this.queue = queue;
this.parentQueue = null;
this.fullPath = queue;
}
public QueueMappingEntity(String source, String queue, String parentQueue) {
this.source = source;
this.queue = queue;
this.parentQueue = parentQueue;
this.fullPath = parentQueue + DOT + queue;
}
public QueueMappingEntity(String source, QueuePath path) {
this.source = source;
this.queue = path.getLeafQueue();
this.parentQueue = path.getParentQueue();
this.fullPath = parentQueue + DOT + queue;
}
public String getQueue() {
return queue;
}
public String getParentQueue() {
return parentQueue;
}
public String getFullPath() {
return fullPath;
}
public String getSource() {
return source;
}
public boolean hasParentQueue() {
return parentQueue != null;
}
public QueuePath getQueuePath() {
//This is to make sure the parsing is the same everywhere, but the
//whole parsing part should be moved to QueuePathConstructor
return QueuePlacementRuleUtils.extractQueuePath(getFullPath());
}
@Override
public int hashCode() {
return super.hashCode();
}
@Override
public boolean equals(Object obj) {
if (obj instanceof QueueMappingEntity) {
QueueMappingEntity other = (QueueMappingEntity) obj;
return (other.source.equals(source) &&
other.queue.equals(queue));
} else {
return false;
}
}
public String toString() {
return source + DELIMITER + (parentQueue != null ?
parentQueue + DOT + queue :
queue);
}
}

View File

@ -65,8 +65,8 @@ public static void validateQueueMappingUnderParentQueue(
} }
} }
public static QueueMappingEntity validateAndGetAutoCreatedQueueMapping( public static QueueMapping validateAndGetAutoCreatedQueueMapping(
CapacitySchedulerQueueManager queueManager, QueueMappingEntity mapping, CapacitySchedulerQueueManager queueManager, QueueMapping mapping,
QueuePath queuePath) throws IOException { QueuePath queuePath) throws IOException {
if (queuePath.hasParentQueue()) { if (queuePath.hasParentQueue()) {
//if parent queue is specified, //if parent queue is specified,
@ -74,16 +74,19 @@ public static QueueMappingEntity validateAndGetAutoCreatedQueueMapping(
validateQueueMappingUnderParentQueue(queueManager.getQueue( validateQueueMappingUnderParentQueue(queueManager.getQueue(
queuePath.getParentQueue()), queuePath.getParentQueue(), queuePath.getParentQueue()), queuePath.getParentQueue(),
queuePath.getFullPath()); queuePath.getFullPath());
return new QueueMappingEntity(mapping.getSource(), return QueueMapping.QueueMappingBuilder.create()
queuePath.getFullPath(), queuePath.getParentQueue()); .type(mapping.getType())
.source(mapping.getSource())
.queuePath(queuePath)
.build();
} }
return null; return null;
} }
public static QueueMappingEntity validateAndGetQueueMapping( public static QueueMapping validateAndGetQueueMapping(
CapacitySchedulerQueueManager queueManager, CSQueue queue, CapacitySchedulerQueueManager queueManager, CSQueue queue,
QueueMappingEntity mapping, QueuePath queuePath) throws IOException { QueueMapping mapping, QueuePath queuePath) throws IOException {
if (!(queue instanceof LeafQueue)) { if (!(queue instanceof LeafQueue)) {
throw new IOException( throw new IOException(
"mapping contains invalid or non-leaf queue : " + "mapping contains invalid or non-leaf queue : " +
@ -93,7 +96,7 @@ public static QueueMappingEntity validateAndGetQueueMapping(
if (queue instanceof AutoCreatedLeafQueue && queue if (queue instanceof AutoCreatedLeafQueue && queue
.getParent() instanceof ManagedParentQueue) { .getParent() instanceof ManagedParentQueue) {
QueueMappingEntity newMapping = validateAndGetAutoCreatedQueueMapping( QueueMapping newMapping = validateAndGetAutoCreatedQueueMapping(
queueManager, mapping, queuePath); queueManager, mapping, queuePath);
if (newMapping == null) { if (newMapping == null) {
throw new IOException( throw new IOException(
@ -105,7 +108,7 @@ public static QueueMappingEntity validateAndGetQueueMapping(
return mapping; return mapping;
} }
public static boolean isStaticQueueMapping(QueueMappingEntity mapping) { public static boolean isStaticQueueMapping(QueueMapping mapping) {
return !mapping.getQueue().contains(CURRENT_USER_MAPPING) && !mapping return !mapping.getQueue().contains(CURRENT_USER_MAPPING) && !mapping
.getQueue().contains(PRIMARY_GROUP_MAPPING) .getQueue().contains(PRIMARY_GROUP_MAPPING)
&& !mapping.getQueue().contains(SECONDARY_GROUP_MAPPING); && !mapping.getQueue().contains(SECONDARY_GROUP_MAPPING);
@ -126,13 +129,13 @@ public static QueuePath extractQueuePath(String queuePath) {
} }
public static ApplicationPlacementContext getPlacementContext( public static ApplicationPlacementContext getPlacementContext(
QueueMappingEntity mapping, CapacitySchedulerQueueManager queueManager) QueueMapping mapping, CapacitySchedulerQueueManager queueManager)
throws IOException { throws IOException {
return getPlacementContext(mapping, mapping.getQueue(), queueManager); return getPlacementContext(mapping, mapping.getQueue(), queueManager);
} }
public static ApplicationPlacementContext getPlacementContext( public static ApplicationPlacementContext getPlacementContext(
QueueMappingEntity mapping, String leafQueueName, QueueMapping mapping, String leafQueueName,
CapacitySchedulerQueueManager queueManager) throws IOException { CapacitySchedulerQueueManager queueManager) throws IOException {
//leafQueue name no longer identifies a queue uniquely checking ambiguity //leafQueue name no longer identifies a queue uniquely checking ambiguity

View File

@ -42,7 +42,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping; import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping;
import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping.QueueMappingBuilder; import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping.QueueMappingBuilder;
import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMappingEntity;
import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AppPriorityACLConfigurationParser.AppPriorityACLKeyType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AppPriorityACLConfigurationParser.AppPriorityACLKeyType;
@ -1039,12 +1038,12 @@ public void setOverrideWithQueueMappings(boolean overrideWithQueueMappings) {
setBoolean(ENABLE_QUEUE_MAPPING_OVERRIDE, overrideWithQueueMappings); setBoolean(ENABLE_QUEUE_MAPPING_OVERRIDE, overrideWithQueueMappings);
} }
public List<QueueMappingEntity> getQueueMappingEntity( public List<QueueMapping> getQueueMappingEntity(
String queueMappingSuffix) { String queueMappingSuffix) {
String queueMappingName = buildQueueMappingRuleProperty(queueMappingSuffix); String queueMappingName = buildQueueMappingRuleProperty(queueMappingSuffix);
List<QueueMappingEntity> mappings = List<QueueMapping> mappings =
new ArrayList<QueueMappingEntity>(); new ArrayList<QueueMapping>();
Collection<String> mappingsString = Collection<String> mappingsString =
getTrimmedStringCollection(queueMappingName); getTrimmedStringCollection(queueMappingName);
for (String mappingValue : mappingsString) { for (String mappingValue : mappingsString) {
@ -1058,10 +1057,11 @@ public List<QueueMappingEntity> getQueueMappingEntity(
//Mappings should be consistent, and have the parent path parsed //Mappings should be consistent, and have the parent path parsed
// from the beginning // from the beginning
QueueMappingEntity m = new QueueMappingEntity( QueueMapping m = QueueMapping.QueueMappingBuilder.create()
mapping[0], .type(QueueMapping.MappingType.APPLICATION)
QueuePlacementRuleUtils.extractQueuePath(mapping[1])); .source(mapping[0])
.queuePath(QueuePlacementRuleUtils.extractQueuePath(mapping[1]))
.build();
mappings.add(m); mappings.add(m);
} }
@ -1076,15 +1076,15 @@ private String buildQueueMappingRuleProperty (String queueMappingSuffix) {
} }
@VisibleForTesting @VisibleForTesting
public void setQueueMappingEntities(List<QueueMappingEntity> queueMappings, public void setQueueMappingEntities(List<QueueMapping> queueMappings,
String queueMappingSuffix) { String queueMappingSuffix) {
if (queueMappings == null) { if (queueMappings == null) {
return; return;
} }
List<String> queueMappingStrs = new ArrayList<>(); List<String> queueMappingStrs = new ArrayList<>();
for (QueueMappingEntity mapping : queueMappings) { for (QueueMapping mapping : queueMappings) {
queueMappingStrs.add(mapping.toString()); queueMappingStrs.add(mapping.toTypelessString());
} }
String mappingRuleProp = buildQueueMappingRuleProperty(queueMappingSuffix); String mappingRuleProp = buildQueueMappingRuleProperty(queueMappingSuffix);

View File

@ -47,13 +47,13 @@ public void setup() {
SimpleGroupsMapping.class, GroupMappingServiceProvider.class); SimpleGroupsMapping.class, GroupMappingServiceProvider.class);
} }
private void verifyQueueMapping(QueueMappingEntity queueMapping, private void verifyQueueMapping(QueueMapping queueMapping,
String user, String expectedQueue) throws YarnException { String user, String expectedQueue) throws YarnException {
verifyQueueMapping(queueMapping, user, verifyQueueMapping(queueMapping, user,
queueMapping.getQueue(), expectedQueue, false); queueMapping.getQueue(), expectedQueue, false);
} }
private void verifyQueueMapping(QueueMappingEntity queueMapping, private void verifyQueueMapping(QueueMapping queueMapping,
String user, String inputQueue, String expectedQueue, String user, String inputQueue, String expectedQueue,
boolean overwrite) throws YarnException { boolean overwrite) throws YarnException {
AppNameMappingPlacementRule rule = new AppNameMappingPlacementRule( AppNameMappingPlacementRule rule = new AppNameMappingPlacementRule(
@ -81,23 +81,31 @@ private void verifyQueueMapping(QueueMappingEntity queueMapping,
ctx != null ? ctx.getQueue() : inputQueue); ctx != null ? ctx.getQueue() : inputQueue);
} }
public QueueMapping queueMappingBuilder(String source, String queue) {
return QueueMapping.QueueMappingBuilder.create()
.type(QueueMapping.MappingType.APPLICATION)
.source(source)
.queue(queue)
.build();
}
@Test @Test
public void testMapping() throws YarnException { public void testMapping() throws YarnException {
// simple base case for mapping user to queue // simple base case for mapping user to queue
verifyQueueMapping(new QueueMappingEntity(APP_NAME, verifyQueueMapping(queueMappingBuilder(APP_NAME,
"q1"), "user_1", "q1"); "q1"), "user_1", "q1");
verifyQueueMapping(new QueueMappingEntity("%application", "q2"), "user_1", verifyQueueMapping(queueMappingBuilder("%application", "q2"), "user_1",
"q2"); "q2");
verifyQueueMapping(new QueueMappingEntity("%application", "%application"), verifyQueueMapping(queueMappingBuilder("%application", "%application"),
"user_1", APP_NAME); "user_1", APP_NAME);
// specify overwritten, and see if user specified a queue, and it will be // specify overwritten, and see if user specified a queue, and it will be
// overridden // overridden
verifyQueueMapping(new QueueMappingEntity(APP_NAME, verifyQueueMapping(queueMappingBuilder(APP_NAME,
"q1"), "1", "q2", "q1", true); "q1"), "1", "q2", "q1", true);
// if overwritten not specified, it should be which user specified // if overwritten not specified, it should be which user specified
verifyQueueMapping(new QueueMappingEntity(APP_NAME, verifyQueueMapping(queueMappingBuilder(APP_NAME,
"q1"), "1", "q2", "q2", false); "q1"), "1", "q2", "q2", false);
} }
} }

View File

@ -95,8 +95,12 @@ public void testPlaceApplicationWithPlacementRuleChain() throws Exception {
Assert.assertNull("Placement should be null", Assert.assertNull("Placement should be null",
pm.placeApplication(asc, USER2)); pm.placeApplication(asc, USER2));
QueueMappingEntity queueMappingEntity = new QueueMappingEntity(APP_NAME, QueueMapping queueMappingEntity = QueueMapping.QueueMappingBuilder.create()
USER1, PARENT_QUEUE); .type(MappingType.APPLICATION)
.source(APP_NAME)
.queue(USER1)
.parentQueue(PARENT_QUEUE)
.build();
AppNameMappingPlacementRule anRule = new AppNameMappingPlacementRule(false, AppNameMappingPlacementRule anRule = new AppNameMappingPlacementRule(false,
Arrays.asList(queueMappingEntity)); Arrays.asList(queueMappingEntity));

View File

@ -29,7 +29,6 @@
import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping; import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping;
import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping.MappingType; import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping.MappingType;
import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping.QueueMappingBuilder; import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMapping.QueueMappingBuilder;
import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMappingEntity;
import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule; import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SimpleGroupsMapping; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SimpleGroupsMapping;
@ -84,17 +83,20 @@ public static CapacitySchedulerConfiguration setupQueueMappingsForRules(
existingMappingsForUG.addAll(queueMappingsForUG); existingMappingsForUG.addAll(queueMappingsForUG);
conf.setQueueMappings(existingMappingsForUG); conf.setQueueMappings(existingMappingsForUG);
List<QueueMappingEntity> existingMappingsForAN = List<QueueMapping> existingMappingsForAN =
conf.getQueueMappingEntity(QUEUE_MAPPING_NAME); conf.getQueueMappingEntity(QUEUE_MAPPING_NAME);
//set queue mapping //set queue mapping
List<QueueMappingEntity> queueMappingsForAN = List<QueueMapping> queueMappingsForAN =
new ArrayList<>(); new ArrayList<>();
for (int i = 0; i < sourceIds.length; i++) { for (int i = 0; i < sourceIds.length; i++) {
//Set C as parent queue name for auto queue creation //Set C as parent queue name for auto queue creation
QueueMappingEntity queueMapping = QueueMapping queueMapping = QueueMapping.QueueMappingBuilder.create()
new QueueMappingEntity(USER + sourceIds[i], .type(MappingType.APPLICATION)
getQueueMapping(parentQueue, USER + sourceIds[i])); .source(USER + sourceIds[i])
.queue(getQueueMapping(parentQueue, USER + sourceIds[i]))
.build();
queueMappingsForAN.add(queueMapping); queueMappingsForAN.add(queueMapping);
} }