diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index 1e62f250ab..fe18d8252d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -927,6 +927,10 @@ protected String getUserNameForPlacement(final String user, return usernameUsedForPlacement; } String queue = appPlacementContext.getQueue(); + String parent = appPlacementContext.getParentQueue(); + if (scheduler instanceof CapacityScheduler && parent != null) { + queue = parent + "." + queue; + } if (callerUGI != null && scheduler .checkAccess(callerUGI, QueueACL.SUBMIT_APPLICATIONS, queue)) { usernameUsedForPlacement = userNameFromAppTag; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index a632bfa765..259cd5c3ce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -2290,6 +2290,21 @@ private void markContainerForNonKillable( public boolean checkAccess(UserGroupInformation callerUGI, QueueACL acl, String queueName) { CSQueue queue = getQueue(queueName); + + if (queueName.startsWith("root.")) { + // can only check proper ACLs if the path is fully qualified + while (queue == null) { + int sepIndex = queueName.lastIndexOf("."); + String parentName = queueName.substring(0, sepIndex); + if (LOG.isDebugEnabled()) { + LOG.debug("Queue {} does not exist, checking parent {}", + queueName, parentName); + } + queueName = parentName; + queue = queueManager.getQueue(queueName); + } + } + if (queue == null) { LOG.debug("ACL not found for queue access-type {} for queue {}", acl, queueName); @@ -3307,4 +3322,9 @@ public boolean placementConstraintEnabled() { public void setActivitiesManagerEnabled(boolean enabled) { this.activitiesManagerEnabled = enabled; } + + @VisibleForTesting + public void setQueueManager(CapacitySchedulerQueueManager qm) { + this.queueManager = qm; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index bc8991787c..e8b4105e9b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -84,11 +84,17 @@ import org.junit.After; import org.junit.Assert; import org.junit.Before; +import org.junit.Rule; import org.junit.Test; +import org.junit.rules.TestWatcher; +import org.junit.runner.Description; +import org.mockito.ArgumentCaptor; import org.mockito.invocation.InvocationOnMock; import org.mockito.stubbing.Answer; import java.io.IOException; +import java.lang.annotation.Retention; +import java.lang.annotation.RetentionPolicy; import java.nio.ByteBuffer; import java.util.ArrayList; import java.util.Collections; @@ -102,6 +108,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX; import static org.junit.Assert.assertTrue; +import static org.junit.Assert.assertEquals; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.eq; @@ -120,12 +127,16 @@ */ public class TestAppManager extends AppManagerTestBase{ + @Rule + public UseCapacitySchedulerRule shouldUseCs = new UseCapacitySchedulerRule(); + private static final Logger LOG = LoggerFactory.getLogger(TestAppManager.class); private static RMAppEventType appEventType = RMAppEventType.KILL; private static String USER = "user_"; private static String USER0 = USER + 0; + private ResourceScheduler scheduler; private static final String USER_ID_PREFIX = "userid="; @@ -227,7 +238,13 @@ public void setUp() throws IOException { rmContext = mockRMContext(1, now - 10); rmContext .setRMTimelineCollectorManager(mock(RMTimelineCollectorManager.class)); - ResourceScheduler scheduler = mockResourceScheduler(); + + if (shouldUseCs.useCapacityScheduler()) { + scheduler = mockResourceScheduler(CapacityScheduler.class); + } else { + scheduler = mockResourceScheduler(); + } + ((RMContextImpl)rmContext).setScheduler(scheduler); Configuration conf = new Configuration(); @@ -880,7 +897,7 @@ public void testRMAppSubmitMaxAppAttempts() throws Exception { new int[]{ 1, 1, 1, 1 }}; for (int i = 0; i < globalMaxAppAttempts.length; ++i) { for (int j = 0; j < individualMaxAppAttempts.length; ++j) { - ResourceScheduler scheduler = mockResourceScheduler(); + scheduler = mockResourceScheduler(); Configuration conf = new Configuration(); conf.setInt(YarnConfiguration.GLOBAL_RM_AM_MAX_ATTEMPTS, globalMaxAppAttempts[i]); @@ -1061,7 +1078,12 @@ public ApplicationPlacementContext answer(InvocationOnMock invocation) } private static ResourceScheduler mockResourceScheduler() { - ResourceScheduler scheduler = mock(ResourceScheduler.class); + return mockResourceScheduler(ResourceScheduler.class); + } + + private static ResourceScheduler + mockResourceScheduler(Class schedulerClass) { + ResourceScheduler scheduler = mock(schedulerClass); when(scheduler.getMinimumResourceCapability()).thenReturn( Resources.createResource( YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB)); @@ -1299,6 +1321,51 @@ public void testGetUserNameForPlacementNoRuleDefined() Assert.assertEquals(expectedUser, userNameForPlacement); } + @Test + @UseMockCapacityScheduler + public void testCheckAccessFullPathWithCapacityScheduler() + throws YarnException { + // make sure we only combine "parent + queue" if CS is selected + testCheckAccess("root.users", "hadoop"); + } + + @Test + @UseMockCapacityScheduler + public void testCheckAccessLeafQueueOnlyWithCapacityScheduler() + throws YarnException { + // make sure we that NPE is avoided if there's no parent defined + testCheckAccess(null, "hadoop"); + } + + private void testCheckAccess(String parent, String queue) + throws YarnException { + enableApplicationTagPlacement(true, "hadoop"); + String userIdTag = USER_ID_PREFIX + "hadoop"; + setApplicationTags("tag1", userIdTag, "tag2"); + PlacementManager placementMgr = mock(PlacementManager.class); + ApplicationPlacementContext appContext; + String expectedQueue; + if (parent == null) { + appContext = new ApplicationPlacementContext(queue); + expectedQueue = queue; + } else { + appContext = new ApplicationPlacementContext(queue, parent); + expectedQueue = parent + "." + queue; + } + + when(placementMgr.placeApplication(asContext, "hadoop")) + .thenReturn(appContext); + appMonitor.getUserNameForPlacement("hadoop", asContext, placementMgr); + + ArgumentCaptor queueNameCaptor = + ArgumentCaptor.forClass(String.class); + verify(scheduler).checkAccess(any(UserGroupInformation.class), + any(QueueACL.class), queueNameCaptor.capture()); + + assertEquals("Expected access check for queue", + expectedQueue, queueNameCaptor.getValue()); + } + private void enableApplicationTagPlacement(boolean userHasAccessToQueue, String... whiteListedUsers) { Configuration conf = new Configuration(); @@ -1307,7 +1374,6 @@ private void enableApplicationTagPlacement(boolean userHasAccessToQueue, conf.setStrings(YarnConfiguration .APPLICATION_TAG_BASED_PLACEMENT_USER_WHITELIST, whiteListedUsers); ((RMContextImpl) rmContext).setYarnConfiguration(conf); - ResourceScheduler scheduler = mockResourceScheduler(); when(scheduler.checkAccess(any(UserGroupInformation.class), eq(QueueACL.SUBMIT_APPLICATIONS), any(String.class))) .thenReturn(userHasAccessToQueue); @@ -1338,4 +1404,24 @@ private void setApplicationTags(String... tags) { Collections.addAll(applicationTags, tags); asContext.setApplicationTags(applicationTags); } + + private class UseCapacitySchedulerRule extends TestWatcher { + private boolean useCapacityScheduler; + + @Override + protected void starting(Description d) { + useCapacityScheduler = + d.getAnnotation(UseMockCapacityScheduler.class) != null; + } + + public boolean useCapacityScheduler() { + return useCapacityScheduler; + } + } + + @Retention(RetentionPolicy.RUNTIME) + public @interface UseMockCapacityScheduler { + // mark test cases with this which require + // the scheduler type to be CapacityScheduler + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java index caf1df4d02..596cca1402 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerAutoQueueCreation.java @@ -19,22 +19,27 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.apache.hadoop.thirdparty.com.google.common.collect.Sets; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; +import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmissionData; import org.apache.hadoop.yarn.server.resourcemanager.MockRMAppSubmitter; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.NullRMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.placement .ApplicationPlacementContext; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; @@ -84,7 +89,6 @@ import static org.apache.hadoop.yarn.server.resourcemanager.placement.UserGroupMappingPlacementRule.CURRENT_USER_MAPPING; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CSQueueUtils.EPSILON; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; import static org.junit.Assert.assertTrue; @@ -839,4 +843,62 @@ public void testReinitializeQueuesWithAutoCreatedLeafQueues() } } } + + @Test + public void testDynamicAutoQueueCreationWithTags() + throws Exception { + MockRM rm = null; + try { + CapacitySchedulerConfiguration csConf + = new CapacitySchedulerConfiguration(); + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] {"a", "b"}); + csConf.setCapacity("root.a", 90); + csConf.setCapacity("root.b", 10); + csConf.setAutoCreateChildQueueEnabled("root.a", true); + csConf.setAutoCreatedLeafQueueConfigCapacity("root.a", 50); + csConf.setAutoCreatedLeafQueueConfigMaxCapacity("root.a", 100); + csConf.setAcl("root.a", QueueACL.ADMINISTER_QUEUE, "*"); + csConf.setAcl("root.a", QueueACL.SUBMIT_APPLICATIONS, "*"); + csConf.setBoolean(YarnConfiguration + .APPLICATION_TAG_BASED_PLACEMENT_ENABLED, true); + csConf.setStrings(YarnConfiguration + .APPLICATION_TAG_BASED_PLACEMENT_USER_WHITELIST, "hadoop"); + csConf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING, + "u:%user:root.a.%user"); + + RMNodeLabelsManager mgr = new NullRMNodeLabelsManager(); + mgr.init(csConf); + rm = new MockRM(csConf) { + @Override + public RMNodeLabelsManager createNodeLabelManager() { + return mgr; + } + }; + rm.start(); + MockNM nm = rm.registerNode("127.0.0.1:1234", 16 * GB); + + MockRMAppSubmissionData data = + MockRMAppSubmissionData.Builder.createWithMemory(GB, rm) + .withAppName("apptodynamicqueue") + .withUser("hadoop") + .withAcls(null) + .withUnmanagedAM(false) + .withApplicationTags(Sets.newHashSet("userid=testuser")) + .build(); + RMApp app = MockRMAppSubmitter.submit(rm, data); + MockRM.launchAndRegisterAM(app, rm, nm); + nm.nodeHeartbeat(true); + + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + CSQueue queue = cs.getQueue("root.a.testuser"); + assertNotNull("Leaf queue has not been auto-created", queue); + assertEquals("Number of running applications", 1, + queue.getNumApplications()); + } finally { + if (rm != null) { + rm.close(); + } + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueueACLs.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueueACLs.java index 9eeb9b4acf..bb5b790fe2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueueACLs.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerQueueACLs.java @@ -17,15 +17,22 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import static org.junit.Assert.assertEquals; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + import java.io.IOException; import java.util.HashMap; import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.QueueACLsTestBase; +import org.junit.Test; public class TestCapacitySchedulerQueueACLs extends QueueACLsTestBase { @Override @@ -132,6 +139,7 @@ public void updateConfigWithDAndD1Queues(String rootAcl, String queueDAcl, .reinitialize(csConf, resourceManager.getRMContext()); } + private void setQueueCapacity(CapacitySchedulerConfiguration csConf, float capacity, String queuePath) { csConf.setCapacity(queuePath, capacity); @@ -142,4 +150,38 @@ private void setAdminAndSubmitACL(CapacitySchedulerConfiguration csConf, csConf.setAcl(queuePath, QueueACL.ADMINISTER_QUEUE, queueAcl); csConf.setAcl(queuePath, QueueACL.SUBMIT_APPLICATIONS, queueAcl); } + + @Test + public void testCheckAccessForUserWithOnlyLeafNameProvided() { + testCheckAccess(false, "dynamicQueue"); + } + + @Test + public void testCheckAccessForUserWithFullPathProvided() { + testCheckAccess(true, "root.users.dynamicQueue"); + } + + @Test + public void testCheckAccessForRootQueue() { + testCheckAccess(false, "root"); + } + + private void testCheckAccess(boolean expectedResult, String queueName) { + CapacitySchedulerQueueManager qm = + mock(CapacitySchedulerQueueManager.class); + CSQueue root = mock(ParentQueue.class); + CSQueue users = mock(ManagedParentQueue.class); + when(qm.getQueue("root")).thenReturn(root); + when(qm.getQueue("root.users")).thenReturn(users); + when(users.hasAccess(any(QueueACL.class), + any(UserGroupInformation.class))).thenReturn(true); + UserGroupInformation mockUGI = mock(UserGroupInformation.class); + + CapacityScheduler cs = + (CapacityScheduler) resourceManager.getResourceScheduler(); + cs.setQueueManager(qm); + + assertEquals("checkAccess() failed", expectedResult, + cs.checkAccess(mockUGI, QueueACL.ADMINISTER_QUEUE, queueName)); + } }