From f0699a740666fef907db80d5ce9d06e0a9fd0e08 Mon Sep 17 00:00:00 2001 From: Szilard Nemeth Date: Fri, 18 Oct 2019 11:19:45 +0200 Subject: [PATCH] YARN-9841. Capacity scheduler: add support for combined %user + %primary_group mapping. Contributed by Manikandan R --- .../UserGroupMappingPlacementRule.java | 16 ++- .../TestUserGroupMappingPlacementRule.java | 43 ++++-- ...tCapacitySchedulerQueueMappingFactory.java | 127 +++++++++++++++++- .../scheduler/fair/SimpleGroupsMapping.java | 3 - .../src/site/markdown/CapacityScheduler.md | 8 +- 5 files changed, 174 insertions(+), 23 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java index 71541e3538..b3c0da185b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/placement/UserGroupMappingPlacementRule.java @@ -162,7 +162,14 @@ private ApplicationPlacementContext getPlacementForUser(String user) for (QueueMapping mapping : mappings) { if (mapping.type == MappingType.USER) { if (mapping.source.equals(CURRENT_USER_MAPPING)) { - if (mapping.queue.equals(CURRENT_USER_MAPPING)) { + if (mapping.getParentQueue() != null + && mapping.getParentQueue().equals(PRIMARY_GROUP_MAPPING) + && mapping.getQueue().equals(CURRENT_USER_MAPPING)) { + return getPlacementContext( + new QueueMapping(mapping.getType(), mapping.getSource(), + CURRENT_USER_MAPPING, groups.getGroups(user).get(0)), + user); + } else if (mapping.queue.equals(CURRENT_USER_MAPPING)) { return getPlacementContext(mapping, user); } else if (mapping.queue.equals(PRIMARY_GROUP_MAPPING)) { return getPlacementContext(mapping, groups.getGroups(user).get(0)); @@ -375,7 +382,12 @@ private static boolean ifQueueDoesNotExist(CSQueue queue) { private static QueueMapping validateAndGetAutoCreatedQueueMapping( CapacitySchedulerQueueManager queueManager, QueueMapping mapping, QueuePath queuePath) throws IOException { - if (queuePath.hasParentQueue()) { + if (queuePath.hasParentQueue() + && queuePath.getParentQueue().equals(PRIMARY_GROUP_MAPPING)) { + // dynamic parent queue + return new QueueMapping(mapping.getType(), mapping.getSource(), + queuePath.getLeafQueue(), queuePath.getParentQueue()); + } else if (queuePath.hasParentQueue()) { //if parent queue is specified, // then it should exist and be an instance of ManagedParentQueue validateParentQueue(queueManager.getQueue(queuePath.getParentQueue()), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java index 589e180ec3..43218a94df 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/placement/TestUserGroupMappingPlacementRule.java @@ -57,6 +57,13 @@ private void verifyQueueMapping(QueueMapping queueMapping, String inputUser, private void verifyQueueMapping(QueueMapping queueMapping, String inputUser, String inputQueue, String expectedQueue, boolean overwrite) throws YarnException { + verifyQueueMapping(queueMapping, inputUser, inputQueue, expectedQueue, + overwrite, null); + } + + private void verifyQueueMapping(QueueMapping queueMapping, String inputUser, + String inputQueue, String expectedQueue, boolean overwrite, + String expectedParentQueue) throws YarnException { Groups groups = new Groups(conf); UserGroupMappingPlacementRule rule = new UserGroupMappingPlacementRule( overwrite, Arrays.asList(queueMapping), groups); @@ -68,8 +75,12 @@ private void verifyQueueMapping(QueueMapping queueMapping, String inputUser, ApplicationSubmissionContext.class); asc.setQueue(inputQueue); ApplicationPlacementContext ctx = rule.getPlacementForApp(asc, inputUser); - Assert.assertEquals(expectedQueue, + Assert.assertEquals("Queue", expectedQueue, ctx != null ? ctx.getQueue() : inputQueue); + if (expectedParentQueue != null) { + Assert.assertEquals("Parent Queue", expectedParentQueue, + ctx.getParentQueue()); + } } @Test @@ -97,35 +108,39 @@ public void testMapping() throws YarnException { "q2"); verifyQueueMapping(new QueueMapping(MappingType.USER, "%user", "%user"), "a", "a"); - verifyQueueMapping(new QueueMapping(MappingType.USER, "%user", - "%primary_group"), "a", "agroup"); + verifyQueueMapping( + new QueueMapping(MappingType.USER, "%user", "%primary_group"), "a", + "agroup"); + verifyQueueMapping( + new QueueMapping(MappingType.USER, "%user", "%user", "%primary_group"), + "a", YarnConfiguration.DEFAULT_QUEUE_NAME, "a", false, "agroup"); verifyQueueMapping(new QueueMapping(MappingType.GROUP, "asubgroup1", "q1"), "a", "q1"); // specify overwritten, and see if user specified a queue, and it will be // overridden - verifyQueueMapping(new QueueMapping(MappingType.USER, "user", "q1"), - "user", "q2", "q1", true); + verifyQueueMapping(new QueueMapping(MappingType.USER, "user", "q1"), "user", + "q2", "q1", true); // if overwritten not specified, it should be which user specified - verifyQueueMapping(new QueueMapping(MappingType.USER, "user", "q1"), - "user", "q2", "q2", false); + verifyQueueMapping(new QueueMapping(MappingType.USER, "user", "q1"), "user", + "q2", "q2", false); // if overwritten not specified, it should be which user specified - verifyQueueMapping(new QueueMapping(MappingType.GROUP, "usergroup", - "%user", "usergroup"), + verifyQueueMapping( + new QueueMapping(MappingType.GROUP, "usergroup", "%user", "usergroup"), "user", "default", "user", false); // if overwritten not specified, it should be which user specified - verifyQueueMapping(new QueueMapping(MappingType.GROUP, "usergroup", - "%user", "usergroup"), + verifyQueueMapping( + new QueueMapping(MappingType.GROUP, "usergroup", "%user", "usergroup"), "user", "agroup", "user", true); //If user specific queue is enabled for a specified group under a given // parent queue - verifyQueueMapping(new QueueMapping(MappingType.GROUP, "agroup", - "%user", "parent1"), - "a", "a"); + verifyQueueMapping( + new QueueMapping(MappingType.GROUP, "agroup", "%user", "parent1"), "a", + "a"); //If user specific queue is enabled for a specified group without parent // queue diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueueMappingFactory.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueueMappingFactory.java index b4f97123c0..e1eebc4952 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueueMappingFactory.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueueMappingFactory.java @@ -18,12 +18,18 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.security.GroupMappingServiceProvider; +import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.placement.ApplicationPlacementContext; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementRule; import org.apache.hadoop.yarn.server.resourcemanager.placement.QueueMappingEntity; import org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SimpleGroupsMapping; +import org.apache.hadoop.yarn.util.Records; import org.junit.Test; import java.util.ArrayList; @@ -32,7 +38,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerAutoCreatedQueueBase.getQueueMapping; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestCapacitySchedulerAutoCreatedQueueBase.setupQueueConfiguration; import static org.hamcrest.CoreMatchers.hasItems; -import static org.junit.Assert.assertThat; +import static org.junit.Assert.*; public class TestCapacitySchedulerQueueMappingFactory { @@ -125,4 +131,123 @@ public void testUpdatePlacementRulesFactory() throws Exception { assertThat(placementRuleNames, hasItems(QUEUE_MAPPING_RULE_USER_GROUP)); assertThat(placementRuleNames, hasItems(QUEUE_MAPPING_RULE_APP_NAME)); } + + @Test + public void testNestedUserQueueWithDynamicParentQueue() throws Exception { + + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(conf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING, + SimpleGroupsMapping.class, GroupMappingServiceProvider.class); + + List queuePlacementRules = new ArrayList<>(); + queuePlacementRules.add(QUEUE_MAPPING_RULE_USER_GROUP); + conf.setQueuePlacementRules(queuePlacementRules); + + List existingMappingsForUG = + conf.getQueueMappings(); + + // set queue mapping + List queueMappingsForUG = + new ArrayList<>(); + + // u:%user:%primary_group.%user + UserGroupMappingPlacementRule.QueueMapping userQueueMapping = + new UserGroupMappingPlacementRule.QueueMapping( + UserGroupMappingPlacementRule.QueueMapping.MappingType.USER, + "%user", getQueueMapping("%primary_group", "%user")); + queueMappingsForUG.add(userQueueMapping); + + existingMappingsForUG.addAll(queueMappingsForUG); + conf.setQueueMappings(existingMappingsForUG); + + // override with queue mappings + conf.setOverrideWithQueueMappings(true); + + mockRM = new MockRM(conf); + CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler(); + cs.updatePlacementRules(); + mockRM.start(); + cs.start(); + + ApplicationSubmissionContext asc = + Records.newRecord(ApplicationSubmissionContext.class); + asc.setQueue("default"); + String inputUser = "a"; + + List rules = + cs.getRMContext().getQueuePlacementManager().getPlacementRules(); + + UserGroupMappingPlacementRule r = + (UserGroupMappingPlacementRule) rules.get(0); + ApplicationPlacementContext ctx = r.getPlacementForApp(asc, inputUser); + assertEquals("Queue", "a", ctx.getQueue()); + assertEquals("Group", "agroup", ctx.getParentQueue()); + } + + @Test + public void testNestedUserQueueWithStaticParentQueue() throws Exception { + + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + setupQueueConfiguration(conf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING, + SimpleGroupsMapping.class, GroupMappingServiceProvider.class); + + List queuePlacementRules = new ArrayList<>(); + queuePlacementRules.add(QUEUE_MAPPING_RULE_USER_GROUP); + conf.setQueuePlacementRules(queuePlacementRules); + + List existingMappingsForUG = + conf.getQueueMappings(); + + // set queue mapping + List queueMappingsForUG = + new ArrayList<>(); + + // u:user1:b1 + UserGroupMappingPlacementRule.QueueMapping userQueueMapping1 = + new UserGroupMappingPlacementRule.QueueMapping( + UserGroupMappingPlacementRule.QueueMapping.MappingType.USER, "user1", + "b1"); + // u:%user:parentqueue.%user + UserGroupMappingPlacementRule.QueueMapping userQueueMapping2 = + new UserGroupMappingPlacementRule.QueueMapping( + UserGroupMappingPlacementRule.QueueMapping.MappingType.USER, "%user", + getQueueMapping("c", "%user")); + queueMappingsForUG.add(userQueueMapping1); + queueMappingsForUG.add(userQueueMapping2); + + existingMappingsForUG.addAll(queueMappingsForUG); + conf.setQueueMappings(existingMappingsForUG); + + // override with queue mappings + conf.setOverrideWithQueueMappings(true); + + mockRM = new MockRM(conf); + CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler(); + cs.updatePlacementRules(); + mockRM.start(); + cs.start(); + + ApplicationSubmissionContext asc = + Records.newRecord(ApplicationSubmissionContext.class); + asc.setQueue("default"); + + List rules = + cs.getRMContext().getQueuePlacementManager().getPlacementRules(); + + UserGroupMappingPlacementRule r = + (UserGroupMappingPlacementRule) rules.get(0); + + ApplicationPlacementContext ctx = r.getPlacementForApp(asc, "user1"); + assertEquals("Queue", "b1", ctx.getQueue()); + + ApplicationPlacementContext ctx2 = r.getPlacementForApp(asc, "user2"); + assertEquals("Queue", "user2", ctx2.getQueue()); + assertEquals("Queue", "c", ctx2.getParentQueue()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SimpleGroupsMapping.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SimpleGroupsMapping.java index 47a33d8a19..f7648c86d4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SimpleGroupsMapping.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SimpleGroupsMapping.java @@ -33,12 +33,9 @@ public List getGroups(String user) { @Override public void cacheGroupsRefresh() throws IOException { - throw new UnsupportedOperationException(); } @Override public void cacheGroupsAdd(List groups) throws IOException { - throw new UnsupportedOperationException(); } - } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/CapacityScheduler.md b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/CapacityScheduler.md index 511a485798..5a339cbf46 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/CapacityScheduler.md +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-site/src/site/markdown/CapacityScheduler.md @@ -170,12 +170,14 @@ Example: ``` yarn.scheduler.capacity.queue-mappings - u:user1:queue1,g:group1:queue2,u:%user:%user,u:user2:%primary_group,u:user3:%secondary_group + u:user1:queue1,g:group1:queue2,u:%user:%user,u:user2:%primary_group,u:user3:%secondary_group,u:%user:%primary_group.%user Here, is mapped to , is mapped to , maps users to queues with the same name as user, is mapped - to queue name same as respectively. The mappings will be - evaluated from left to right, and the first valid mapping will be used. + to queue name same as , maps users to queue with the + same name as user but parent queue name should be same as + of the user respectively. The mappings will be evaluated from left to + right, and the first valid mapping will be used.