YARN-7657. Queue Mapping could provide options to provide 'user' specific auto-created queues under a specified group parent queue. (Suma Shivaprasad via wangda)
Change-Id: I32d566e8727840e43c0d66e39a77edef017e3a83
This commit is contained in:
parent
a5b27b3c67
commit
b167d60763
@ -104,6 +104,10 @@ public String getParentQueue() {
|
||||
return parentQueue;
|
||||
}
|
||||
|
||||
public boolean hasParentQueue() {
|
||||
return parentQueue != null;
|
||||
}
|
||||
|
||||
public MappingType getType() {
|
||||
return type;
|
||||
}
|
||||
@ -164,6 +168,9 @@ private ApplicationPlacementContext getPlacementForUser(String user)
|
||||
if (mapping.type == MappingType.GROUP) {
|
||||
for (String userGroups : groups.getGroups(user)) {
|
||||
if (userGroups.equals(mapping.source)) {
|
||||
if (mapping.queue.equals(CURRENT_USER_MAPPING)) {
|
||||
return getPlacementContext(mapping, user);
|
||||
}
|
||||
return getPlacementContext(mapping);
|
||||
}
|
||||
}
|
||||
|
@ -410,12 +410,12 @@ public ApplicationPlacementContext answer(InvocationOnMock invocation)
|
||||
new ClientToAMTokenSecretManagerInRM(), newMockRMContext.getScheduler(),
|
||||
masterService, new ApplicationACLsManager(conf), conf);
|
||||
|
||||
//only user test has permission to submit to 'test' queue
|
||||
//only user test has permission to submit to 'user1' queue
|
||||
newAppMonitor.submitApplication(asContext, "user1");
|
||||
|
||||
try {
|
||||
//should fail since user does not have permission to submit to queue
|
||||
// 'test'
|
||||
// 'managedparent'
|
||||
asContext.setApplicationId(appId = MockApps.newAppID(2));
|
||||
newAppMonitor.submitApplication(asContext, "user2");
|
||||
} catch (YarnException e) {
|
||||
|
@ -85,5 +85,26 @@ public void testMapping() throws YarnException {
|
||||
// if overwritten not specified, it should be which user specified
|
||||
verifyQueueMapping(new QueueMapping(MappingType.USER, "user", "q1"),
|
||||
"user", "q2", "q2", false);
|
||||
|
||||
// if overwritten not specified, it should be which user specified
|
||||
verifyQueueMapping(new QueueMapping(MappingType.GROUP, "usergroup",
|
||||
"%user", "usergroup"),
|
||||
"user", "default", "user", false);
|
||||
|
||||
// if overwritten not specified, it should be which user specified
|
||||
verifyQueueMapping(new QueueMapping(MappingType.GROUP, "usergroup",
|
||||
"%user", "usergroup"),
|
||||
"user", "agroup", "user", true);
|
||||
|
||||
//If user specific queue is enabled for a specified group under a given
|
||||
// parent queue
|
||||
verifyQueueMapping(new QueueMapping(MappingType.GROUP, "agroup",
|
||||
"%user", "parent1"),
|
||||
"a", "a");
|
||||
|
||||
//If user specific queue is enabled for a specified group without parent
|
||||
// queue
|
||||
verifyQueueMapping(new QueueMapping(MappingType.GROUP, "agroup", "%user"),
|
||||
"a", "a");
|
||||
}
|
||||
}
|
||||
|
@ -43,8 +43,11 @@
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.net.NetworkTopology;
|
||||
import org.apache.hadoop.security.Credentials;
|
||||
import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
|
||||
import org.apache.hadoop.security.TestGroupsCaching;
|
||||
import org.apache.hadoop.security.UserGroupInformation;
|
||||
import org.apache.hadoop.security.token.Token;
|
||||
import org.apache.hadoop.security.token.TokenIdentifier;
|
||||
@ -102,6 +105,9 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.TestAMAuthorization.MyContainerManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement
|
||||
.UserGroupMappingPlacementRule;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppMetrics;
|
||||
@ -4722,6 +4728,34 @@ public void testAMLimitDouble() throws Exception {
|
||||
rm.stop();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testQueueMappingWithCurrentUserQueueMappingForaGroup() throws
|
||||
Exception {
|
||||
|
||||
CapacitySchedulerConfiguration config =
|
||||
new CapacitySchedulerConfiguration();
|
||||
config.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||
ResourceScheduler.class);
|
||||
setupQueueConfiguration(config);
|
||||
|
||||
config.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
|
||||
TestGroupsCaching.FakeunPrivilegedGroupMapping.class, ShellBasedUnixGroupsMapping.class);
|
||||
config.set(CommonConfigurationKeys.HADOOP_USER_GROUP_STATIC_OVERRIDES,
|
||||
"a1" +"=" + "agroup" + "");
|
||||
config.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
|
||||
"g:agroup:%user");
|
||||
|
||||
MockRM rm = new MockRM(config);
|
||||
rm.start();
|
||||
CapacityScheduler cs = ((CapacityScheduler) rm.getResourceScheduler());
|
||||
cs.start();
|
||||
|
||||
RMApp app = rm.submitApp(GB, "appname", "a1", null, "default");
|
||||
List<ApplicationAttemptId> appsInA1 = cs.getAppsInQueue("a1");
|
||||
assertEquals(1, appsInA1.size());
|
||||
}
|
||||
|
||||
@Test(timeout = 30000)
|
||||
public void testcheckAndGetApplicationLifetime() throws Exception {
|
||||
long maxLifetime = 10;
|
||||
|
@ -22,6 +22,10 @@
|
||||
import org.apache.commons.lang.math.RandomUtils;
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.security.GroupMappingServiceProvider;
|
||||
import org.apache.hadoop.security.ShellBasedUnixGroupsMapping;
|
||||
import org.apache.hadoop.security.TestGroupsCaching;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
@ -58,6 +62,8 @@
|
||||
.AppAttemptAddedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event
|
||||
.SchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair
|
||||
.SimpleGroupsMapping;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
import org.junit.After;
|
||||
@ -127,6 +133,8 @@ public class TestCapacitySchedulerAutoCreatedQueueBase {
|
||||
public static final int NODE2_VCORES = 32;
|
||||
public static final int NODE3_VCORES = 48;
|
||||
|
||||
public static final String TEST_GROUP = "testusergroup";
|
||||
public static final String TEST_GROUPUSER = "testuser";
|
||||
public static final String USER = "user_";
|
||||
public static final String USER0 = USER + 0;
|
||||
public static final String USER1 = USER + 1;
|
||||
@ -256,6 +264,35 @@ public static CapacitySchedulerConfiguration setupQueueMappings(
|
||||
return conf;
|
||||
}
|
||||
|
||||
public static CapacitySchedulerConfiguration setupGroupQueueMappings
|
||||
(String parentQueue, CapacitySchedulerConfiguration conf, String
|
||||
leafQueueName) {
|
||||
|
||||
List<UserGroupMappingPlacementRule.QueueMapping> existingMappings =
|
||||
conf.getQueueMappings();
|
||||
|
||||
//set queue mapping
|
||||
List<UserGroupMappingPlacementRule.QueueMapping> queueMappings =
|
||||
new ArrayList<>();
|
||||
|
||||
//setup group mapping
|
||||
conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING,
|
||||
TestGroupsCaching.FakeunPrivilegedGroupMapping.class, ShellBasedUnixGroupsMapping.class);
|
||||
conf.set(CommonConfigurationKeys.HADOOP_USER_GROUP_STATIC_OVERRIDES,
|
||||
TEST_GROUPUSER +"=" + TEST_GROUP + ";invalid_user=invalid_group");
|
||||
|
||||
UserGroupMappingPlacementRule.QueueMapping userQueueMapping =
|
||||
new UserGroupMappingPlacementRule.QueueMapping(
|
||||
UserGroupMappingPlacementRule.QueueMapping.MappingType.GROUP,
|
||||
TEST_GROUP,
|
||||
getQueueMapping(parentQueue, leafQueueName));
|
||||
|
||||
queueMappings.add(userQueueMapping);
|
||||
existingMappings.addAll(queueMappings);
|
||||
conf.setQueueMappings(existingMappings);
|
||||
return conf;
|
||||
}
|
||||
|
||||
/**
|
||||
* @param conf, to be modified
|
||||
* @return, CS configuration which has C as an auto creation enabled parent
|
||||
@ -505,15 +542,17 @@ protected void validateUserAndAppLimits(
|
||||
}
|
||||
|
||||
protected void validateInitialQueueEntitlement(CSQueue parentQueue,
|
||||
String leafQueueName, float expectedTotalChildQueueAbsCapacity)
|
||||
String leafQueueName, float expectedTotalChildQueueAbsCapacity,
|
||||
Set<String> nodeLabels)
|
||||
throws SchedulerDynamicEditException {
|
||||
validateInitialQueueEntitlement(cs, parentQueue, leafQueueName,
|
||||
expectedTotalChildQueueAbsCapacity);
|
||||
expectedTotalChildQueueAbsCapacity, nodeLabels);
|
||||
}
|
||||
|
||||
protected void validateInitialQueueEntitlement(
|
||||
CapacityScheduler capacityScheduler, CSQueue parentQueue,
|
||||
String leafQueueName, float expectedTotalChildQueueAbsCapacity)
|
||||
String leafQueueName, float expectedTotalChildQueueAbsCapacity,
|
||||
Set<String> nodeLabels)
|
||||
throws SchedulerDynamicEditException {
|
||||
ManagedParentQueue autoCreateEnabledParentQueue =
|
||||
(ManagedParentQueue) parentQueue;
|
||||
@ -532,7 +571,7 @@ protected void validateInitialQueueEntitlement(
|
||||
QueueCapacities cap = autoCreateEnabledParentQueue.getLeafQueueTemplate()
|
||||
.getQueueCapacities();
|
||||
|
||||
for (String label : accessibleNodeLabelsOnC) {
|
||||
for (String label : nodeLabels) {
|
||||
validateCapacitiesByLabel(autoCreateEnabledParentQueue, leafQueue, label);
|
||||
|
||||
QueueEntitlement expectedEntitlement = new QueueEntitlement(
|
||||
|
@ -70,9 +70,13 @@
|
||||
import org.junit.Test;
|
||||
|
||||
import java.io.IOException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
|
||||
import static org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager
|
||||
.NO_LABEL;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.placement
|
||||
.UserGroupMappingPlacementRule.CURRENT_USER_MAPPING;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
||||
@ -118,13 +122,26 @@ public void testAutoCreateLeafQueueCreation() throws Exception {
|
||||
ManagedParentQueue parentQueue = (ManagedParentQueue) cs.getQueue(
|
||||
PARENT_QUEUE);
|
||||
assertEquals(parentQueue, autoCreatedLeafQueue.getParent());
|
||||
validateInitialQueueEntitlement(parentQueue, USER0, 0.1f);
|
||||
validateInitialQueueEntitlement(parentQueue, USER0, 0.1f, accessibleNodeLabelsOnC);
|
||||
validateUserAndAppLimits(autoCreatedLeafQueue, 1000, 1000);
|
||||
|
||||
assertTrue(autoCreatedLeafQueue
|
||||
.getOrderingPolicy() instanceof FairOrderingPolicy);
|
||||
|
||||
setupGroupQueueMappings("d", cs.getConfiguration(), "%user");
|
||||
cs.reinitialize(cs.getConfiguration(), mockRM.getRMContext());
|
||||
|
||||
submitApp(mockRM, cs.getQueue("d"), TEST_GROUPUSER, TEST_GROUPUSER, 1, 1);
|
||||
autoCreatedLeafQueue =
|
||||
(AutoCreatedLeafQueue) cs.getQueue(TEST_GROUPUSER);
|
||||
parentQueue = (ManagedParentQueue) cs.getQueue("d");
|
||||
assertEquals(parentQueue, autoCreatedLeafQueue.getParent());
|
||||
validateInitialQueueEntitlement(parentQueue, TEST_GROUPUSER, 0.02f,
|
||||
new HashSet<String>() {{ add(NO_LABEL); }});
|
||||
|
||||
} finally {
|
||||
cleanupQueue(USER0);
|
||||
cleanupQueue(TEST_GROUPUSER);
|
||||
}
|
||||
}
|
||||
|
||||
@ -158,8 +175,8 @@ public void testReinitializeStoppedAutoCreatedLeafQueue() throws Exception {
|
||||
PARENT_QUEUE);
|
||||
assertEquals(parentQueue, user0Queue.getParent());
|
||||
assertEquals(parentQueue, user1Queue.getParent());
|
||||
validateInitialQueueEntitlement(parentQueue, USER0, 0.2f);
|
||||
validateInitialQueueEntitlement(parentQueue, USER1, 0.2f);
|
||||
validateInitialQueueEntitlement(parentQueue, USER0, 0.2f, accessibleNodeLabelsOnC);
|
||||
validateInitialQueueEntitlement(parentQueue, USER1, 0.2f, accessibleNodeLabelsOnC);
|
||||
|
||||
ApplicationAttemptId appAttemptId = appsInC.get(0);
|
||||
|
||||
@ -200,7 +217,7 @@ public void testReinitializeStoppedAutoCreatedLeafQueue() throws Exception {
|
||||
AutoCreatedLeafQueue leafQueue = (AutoCreatedLeafQueue) cs.getQueue(
|
||||
USER1);
|
||||
validateInitialQueueEntitlement(parentQueue, leafQueue.getQueueName(),
|
||||
0.1f);
|
||||
0.1f, accessibleNodeLabelsOnC);
|
||||
|
||||
} finally {
|
||||
cleanupQueue(USER0);
|
||||
@ -482,12 +499,12 @@ public void testAutoCreatedQueueActivationDeactivation() throws Exception {
|
||||
|
||||
//submit app1 as USER1
|
||||
submitApp(mockRM, parentQueue, USER1, USER1, 1, 1);
|
||||
validateInitialQueueEntitlement(parentQueue, USER1, 0.1f);
|
||||
validateInitialQueueEntitlement(parentQueue, USER1, 0.1f, accessibleNodeLabelsOnC);
|
||||
|
||||
//submit another app2 as USER2
|
||||
ApplicationId user2AppId = submitApp(mockRM, parentQueue, USER2, USER2, 2,
|
||||
1);
|
||||
validateInitialQueueEntitlement(parentQueue, USER2, 0.2f);
|
||||
validateInitialQueueEntitlement(parentQueue, USER2, 0.2f, accessibleNodeLabelsOnC);
|
||||
|
||||
//submit another app3 as USER1
|
||||
submitApp(mockRM, parentQueue, USER1, USER1, 3, 2);
|
||||
@ -548,12 +565,12 @@ public void testClusterResourceUpdationOnAutoCreatedLeafQueues() throws
|
||||
|
||||
//submit app1 as USER1
|
||||
submitApp(newMockRM, parentQueue, USER1, USER1, 1, 1);
|
||||
validateInitialQueueEntitlement(newCS, parentQueue, USER1, 0.1f);
|
||||
validateInitialQueueEntitlement(newCS, parentQueue, USER1, 0.1f, accessibleNodeLabelsOnC);
|
||||
CSQueue user1LeafQueue = newCS.getQueue(USER1);
|
||||
|
||||
//submit another app2 as USER2
|
||||
submitApp(newMockRM, parentQueue, USER2, USER2, 2, 1);
|
||||
validateInitialQueueEntitlement(newCS, parentQueue, USER2, 0.2f);
|
||||
validateInitialQueueEntitlement(newCS, parentQueue, USER2, 0.2f, accessibleNodeLabelsOnC);
|
||||
CSQueue user2LeafQueue = newCS.getQueue(USER2);
|
||||
|
||||
//validate total activated abs capacity remains the same
|
||||
@ -642,7 +659,7 @@ public void testAutoCreatedQueueInheritsNodeLabels() throws Exception {
|
||||
|
||||
submitApp(USER1, USER1, NODEL_LABEL_GPU);
|
||||
//submit app1 as USER1
|
||||
validateInitialQueueEntitlement(parentQueue, USER1, 0.1f);
|
||||
validateInitialQueueEntitlement(parentQueue, USER1, 0.1f, accessibleNodeLabelsOnC);
|
||||
} finally {
|
||||
cleanupQueue(USER1);
|
||||
}
|
||||
@ -662,12 +679,12 @@ public void testReinitializeQueuesWithAutoCreatedLeafQueues()
|
||||
|
||||
//submit app1 as USER1
|
||||
submitApp(newMockRM, parentQueue, USER1, USER1, 1, 1);
|
||||
validateInitialQueueEntitlement(newCS, parentQueue, USER1, 0.1f);
|
||||
validateInitialQueueEntitlement(newCS, parentQueue, USER1, 0.1f, accessibleNodeLabelsOnC);
|
||||
|
||||
//submit another app2 as USER2
|
||||
ApplicationId user2AppId = submitApp(newMockRM, parentQueue, USER2, USER2,
|
||||
2, 1);
|
||||
validateInitialQueueEntitlement(newCS, parentQueue, USER2, 0.2f);
|
||||
validateInitialQueueEntitlement(newCS, parentQueue, USER2, 0.2f, accessibleNodeLabelsOnC);
|
||||
|
||||
//update parent queue capacity
|
||||
conf.setCapacity(C, 30f);
|
||||
@ -692,7 +709,7 @@ public void testReinitializeQueuesWithAutoCreatedLeafQueues()
|
||||
|
||||
//submit app1 as USER3
|
||||
submitApp(newMockRM, parentQueue, USER3, USER3, 3, 1);
|
||||
validateInitialQueueEntitlement(newCS, parentQueue, USER3, 0.27f);
|
||||
validateInitialQueueEntitlement(newCS, parentQueue, USER3, 0.27f, accessibleNodeLabelsOnC);
|
||||
AutoCreatedLeafQueue user3Queue = (AutoCreatedLeafQueue) newCS.getQueue(
|
||||
USER1);
|
||||
validateCapacities(user3Queue, 0.3f, 0.09f, 0.4f, 0.2f);
|
||||
@ -701,7 +718,7 @@ public void testReinitializeQueuesWithAutoCreatedLeafQueues()
|
||||
//submit app1 as USER1 - is already activated. there should be no diff
|
||||
// in capacities
|
||||
submitApp(newMockRM, parentQueue, USER3, USER3, 4, 2);
|
||||
validateInitialQueueEntitlement(newCS, parentQueue, USER3, 0.27f);
|
||||
validateInitialQueueEntitlement(newCS, parentQueue, USER3, 0.27f, accessibleNodeLabelsOnC);
|
||||
validateCapacities(user3Queue, 0.3f, 0.09f, 0.4f, 0.2f);
|
||||
validateUserAndAppLimits(user3Queue, 900, 900);
|
||||
|
||||
|
@ -59,12 +59,12 @@ public void testEditSchedule() throws Exception {
|
||||
//submit app1 as USER1
|
||||
ApplicationId user1AppId = submitApp(mockRM, parentQueue, USER1, USER1, 1,
|
||||
1);
|
||||
validateInitialQueueEntitlement(parentQueue, USER1, 0.1f);
|
||||
validateInitialQueueEntitlement(parentQueue, USER1, 0.1f, accessibleNodeLabelsOnC);
|
||||
|
||||
//submit another app2 as USER2
|
||||
ApplicationId user2AppId = submitApp(mockRM, parentQueue, USER2, USER2, 2,
|
||||
1);
|
||||
validateInitialQueueEntitlement(parentQueue, USER2, 0.2f);
|
||||
validateInitialQueueEntitlement(parentQueue, USER2, 0.2f, accessibleNodeLabelsOnC);
|
||||
|
||||
//validate total activated abs capacity
|
||||
assertEquals(0.2f, autoCreatedQueueManagementPolicy
|
||||
|
Loading…
Reference in New Issue
Block a user