diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 5d7cd985ef..dea4c0a83e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -1859,6 +1859,14 @@ public static boolean isAclEnabled(Configuration conf) { "container-monitor.procfs-tree.smaps-based-rss.enabled"; public static final boolean DEFAULT_PROCFS_USE_SMAPS_BASED_RSS_ENABLED = false; + private static final String APPLICATION_TAG_BASED_PLACEMENT_PREFIX = + "application-tag-based-placement"; + public static final String APPLICATION_TAG_BASED_PLACEMENT_ENABLED = + APPLICATION_TAG_BASED_PLACEMENT_PREFIX + ".enable"; + public static final boolean DEFAULT_APPLICATION_TAG_BASED_PLACEMENT_ENABLED = + false; + public static final String APPLICATION_TAG_BASED_PLACEMENT_USER_WHITELIST = + APPLICATION_TAG_BASED_PLACEMENT_PREFIX + ".username.whitelist"; /** Enable/disable container metrics. */ @Private diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java index b30224e78f..440d731b4e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/records/impl/pb/ApplicationSubmissionContextPBImpl.java @@ -21,11 +21,11 @@ import java.util.ArrayList; import java.util.Collections; import java.util.HashMap; -import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeSet; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceStability.Unstable; @@ -247,7 +247,7 @@ private void initApplicationTags() { return; } ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; - this.applicationTags = new HashSet(); + this.applicationTags = new TreeSet<>(); this.applicationTags.addAll(p.getApplicationTagsList()); } @@ -305,7 +305,7 @@ public synchronized void setApplicationTags(Set tags) { } checkTags(tags); // Convert applicationTags to lower case and add - this.applicationTags = new HashSet(); + this.applicationTags = new TreeSet<>(); for (String tag : tags) { this.applicationTags.add(StringUtils.toLowerCase(tag)); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 11d712340e..0e8063e9fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -1288,6 +1288,26 @@ false + + + Whether to enable application placement based on user ID passed via + application tags. When it is enabled, u=<userId> pattern will be checked + and if found, the application will be placed onto the found user's queue, + if the original user has enough rights on the passed user's queue. + + application-tag-based-placement.enable + false + + + + + Comma separated list of users who can use the application tag based + placement, if it is enabled. + + application-tag-based-placement.username.whitelist + + + How long to keep aggregation logs before deleting them. -1 disables. Be careful set this too small and you will spam the name node. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java index f4f97936c5..30c5a8dbec 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMAppManager.java @@ -889,7 +889,10 @@ ApplicationPlacementContext placeApplication( ApplicationPlacementContext placementContext = null; if (placementManager != null) { try { - placementContext = placementManager.placeApplication(context, user); + String usernameUsedForPlacement = + getUserNameForPlacement(user, context, placementManager); + placementContext = placementManager + .placeApplication(context, usernameUsedForPlacement); } catch (YarnException e) { // Placement could also fail if the user doesn't exist in system // skip if the user is not found during recovery. @@ -916,6 +919,80 @@ ApplicationPlacementContext placeApplication( return placementContext; } + @VisibleForTesting + protected String getUserNameForPlacement(final String user, + final ApplicationSubmissionContext context, + final PlacementManager placementManager) throws YarnException { + + boolean applicationTagBasedPlacementEnabled = conf + .getBoolean(YarnConfiguration.APPLICATION_TAG_BASED_PLACEMENT_ENABLED, + YarnConfiguration.DEFAULT_APPLICATION_TAG_BASED_PLACEMENT_ENABLED); + + String usernameUsedForPlacement = user; + if (!applicationTagBasedPlacementEnabled) { + return usernameUsedForPlacement; + } + if (!isWhitelistedUser(user, conf)) { + LOG.warn("User '{}' is not allowed to do placement based " + + "on application tag", user); + return usernameUsedForPlacement; + } + LOG.debug("Application tag based placement is enabled, checking for " + + "userId in the application tag"); + Set applicationTags = context.getApplicationTags(); + String userNameFromAppTag = getUserNameFromApplicationTag(applicationTags); + if (userNameFromAppTag != null) { + LOG.debug("Found userId '{}' in application tag", userNameFromAppTag); + UserGroupInformation callerUGI = UserGroupInformation + .createRemoteUser(userNameFromAppTag); + // check if the actual user has rights to submit application to the + // user's queue from the application tag + String queue = placementManager + .placeApplication(context, usernameUsedForPlacement).getQueue(); + if (callerUGI != null && scheduler + .checkAccess(callerUGI, QueueACL.SUBMIT_APPLICATIONS, queue)) { + usernameUsedForPlacement = userNameFromAppTag; + } else { + LOG.warn("User '{}' from application tag does not have access to " + + " queue '{}'. " + "The placement is done for user '{}'", + userNameFromAppTag, queue, user); + } + } else { + LOG.warn("userId was not found in application tags"); + } + return usernameUsedForPlacement; + } + + private boolean isWhitelistedUser(final String user, + final Configuration config) { + String[] userWhitelist = config.getStrings(YarnConfiguration + .APPLICATION_TAG_BASED_PLACEMENT_USER_WHITELIST); + if (userWhitelist == null || userWhitelist.length == 0) { + return false; + } + for (String s: userWhitelist) { + if (s.equals(user)) { + return true; + } + } + return false; + } + + private String getUserNameFromApplicationTag(Set applicationTags) { + String userIdPrefix = "u="; + for (String tag: applicationTags) { + if (tag.startsWith(userIdPrefix)) { + String[] userIdTag = tag.split("="); + if (userIdTag.length == 2) { + return userIdTag[1]; + } else { + LOG.warn("Found wrongly qualified username in tag"); + } + } + } + return null; + } + private void copyPlacementQueueToSubmissionContext( ApplicationPlacementContext placementContext, ApplicationSubmissionContext context) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/AppManagerTestBase.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/AppManagerTestBase.java index 36258b431f..63b4e44f0e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/AppManagerTestBase.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/AppManagerTestBase.java @@ -29,6 +29,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; @@ -103,5 +104,11 @@ public void submitApplication( super.submitApplication(submissionContext, System.currentTimeMillis(), user); } + + public String getUserNameForPlacement(final String user, + final ApplicationSubmissionContext context, + final PlacementManager placementManager) throws YarnException { + return super.getUserNameForPlacement(user, context, placementManager); + } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java index 906f1162b3..07dea743be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestAppManager.java @@ -22,6 +22,7 @@ import com.google.common.collect.Lists; import com.google.common.collect.Maps; import com.google.common.collect.Sets; +import org.apache.hadoop.yarn.api.records.QueueACL; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -94,12 +95,14 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.TreeSet; import java.util.concurrent.ConcurrentMap; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX; import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.anyString; +import static org.mockito.ArgumentMatchers.eq; import static org.mockito.ArgumentMatchers.isA; import static org.mockito.ArgumentMatchers.matches; import static org.mockito.Mockito.doAnswer; @@ -1197,4 +1200,191 @@ private static List cloneResourceRequests( return cloneReqs; } + @Test + public void testGetUserNameForPlacementTagBasedPlacementDisabled() + throws YarnException { + String user = "user1"; + String expectedQueue = "user1Queue"; + String userIdTag = "u=user2"; + setApplicationTags("tag1", userIdTag, "tag2"); + verifyPlacementUsername(expectedQueue, user, user); + } + + @Test + /** + * Test case for when the application tag based placement is enabled and + * the submitting user 'user1' is whitelisted and the user from the + * application tag has access to queue. + * Expected behaviour: the placement is done for user from the tag 'user2' + */ + public void testGetUserNameForPlacementTagBasedPlacementEnabled() + throws YarnException { + String user = "user1"; + String expectedQueue = "user1Queue"; + String expectedUser = "user2"; + String userIdTag = "u=" + expectedUser; + setApplicationTags("tag1", userIdTag, "tag2"); + enableApplicationTagPlacement(true, user); + verifyPlacementUsername(expectedQueue, user, expectedUser); + } + + @Test + /** + * Test case for when the application tag based placement is enabled and + * the submitting user 'user1' is whitelisted and there are multiple valid + * username tags passed + * Expected behaviour: the placement is done for the first valid username + * from the tag 'user2' + */ + public void testGetUserNameForPlacementTagBasedPlacementMultipleUserIds() + throws YarnException { + String user = "user1"; + String expectedQueue = "user1Queue"; + String expectedUser = "user2"; + String userIdTag = "u=" + expectedUser; + String userIdTag2 = "u=user3"; + setApplicationTags("tag1", userIdTag, "tag2", userIdTag2); + enableApplicationTagPlacement(true, user); + verifyPlacementUsername(expectedQueue, user, expectedUser); + } + + @Test + /** + * Test case for when the application tag based placement is enabled but + * no username is set in the application tag + * Expected behaviour: the placement is done for the submitting user 'user1' + */ + public void testGetUserNameForPlacementTagBasedPlacementNoUserId() + throws YarnException { + String user = "user1"; + String expectedQueue = "user1Queue"; + setApplicationTags("tag1", "tag2"); + enableApplicationTagPlacement(true, user); + verifyPlacementUsername(expectedQueue, user, user); + } + + @Test + /** + * Test case for when the application tag based placement is enabled but + * the user from the application tag 'user2' does not have access to the + * queue. + * Expected behaviour: the placement is done for the submitting user 'user1' + */ + public void testGetUserNameForPlacementUserWithoutAccessToQueue() + throws YarnException { + String user = "user1"; + String expectedQueue = "user1Queue"; + String userIdTag = "u=user2"; + setApplicationTags("tag1", userIdTag, "tag2"); + enableApplicationTagPlacement(false, user); + verifyPlacementUsername(expectedQueue, user, user); + } + + @Test + /** + * Test case for when the application tag based placement is enabled but + * the submitting user 'user1' is not whitelisted and there is a valid + * username tag passed. + * Expected behaviour: the placement is done for the submitting user 'user1' + */ + public void testGetUserNameForPlacementNotWhitelistedUser() + throws YarnException { + String user = "user1"; + String expectedQueue = "user1Queue"; + String userIdTag = "u=user2"; + setApplicationTags("tag1", userIdTag, "tag2"); + enableApplicationTagPlacement(true, "someUser"); + verifyPlacementUsername(expectedQueue, user, user); + } + + @Test + /** + * Test case for when the application tag based placement is enabled but + * there is no whitelisted user. + * Expected behaviour: the placement is done for the submitting user 'user1' + */ + public void testGetUserNameForPlacementEmptyWhiteList() + throws YarnException { + String user = "user1"; + String expectedQueue = "user1Queue"; + String userIdTag = "u=user2"; + setApplicationTags("tag1", userIdTag, "tag2"); + enableApplicationTagPlacement(false); + verifyPlacementUsername(expectedQueue, user, user); + } + + @Test + /** + * Test case for when the application tag based placement is enabled and + * there is one wrongly qualified user 'u=' and a valid user 'u=user2' passed + * via application tag. + * Expected behaviour: the placement is done for the first valid username + * from the tag 'user2' + */ + public void testGetUserNameForPlacementWronglyQualifiedFirstUserNameInTag() + throws YarnException { + String user = "user1"; + String expectedQueue = "user1Queue"; + String expectedUser = "user2"; + String userIdTag = "u=" + expectedUser; + String wrongUserIdTag = "u="; + setApplicationTags("tag1", wrongUserIdTag, userIdTag, "tag2"); + enableApplicationTagPlacement(true, user); + verifyPlacementUsername(expectedQueue, user, expectedUser); + } + + @Test + /** + * Test case for when the application tag based placement is enabled and + * there is only one wrongly qualified user 'u=' passed via application tag. + * Expected behaviour: the placement is done for the submitting user 'user1' + */ + public void testGetUserNameForPlacementWronglyQualifiedUserNameInTag() + throws YarnException { + String user = "user1"; + String expectedQueue = "user1Queue"; + String wrongUserIdTag = "u="; + setApplicationTags("tag1", wrongUserIdTag, "tag2"); + enableApplicationTagPlacement(true, user); + verifyPlacementUsername(expectedQueue, user, user); + } + + private void enableApplicationTagPlacement(boolean userHasAccessToQueue, + String... whiteListedUsers) { + Configuration conf = new Configuration(); + conf.setBoolean(YarnConfiguration + .APPLICATION_TAG_BASED_PLACEMENT_ENABLED, true); + conf.setStrings(YarnConfiguration + .APPLICATION_TAG_BASED_PLACEMENT_USER_WHITELIST, whiteListedUsers); + ((RMContextImpl) rmContext).setYarnConfiguration(conf); + ResourceScheduler scheduler = mockResourceScheduler(); + when(scheduler.checkAccess(any(UserGroupInformation.class), + eq(QueueACL.SUBMIT_APPLICATIONS), any(String.class))) + .thenReturn(userHasAccessToQueue); + ApplicationMasterService masterService = + new ApplicationMasterService(rmContext, scheduler); + appMonitor = new TestRMAppManager(rmContext, + new ClientToAMTokenSecretManagerInRM(), + scheduler, masterService, + new ApplicationACLsManager(conf), conf); + } + + private void verifyPlacementUsername(final String queue, + final String submittingUser, final String expectedUser) + throws YarnException { + PlacementManager placementMgr = mock(PlacementManager.class); + ApplicationPlacementContext appContext + = new ApplicationPlacementContext(queue); + when(placementMgr.placeApplication(asContext, submittingUser)) + .thenReturn(appContext); + String userNameForPlacement = appMonitor + .getUserNameForPlacement(submittingUser, asContext, placementMgr); + Assert.assertEquals(expectedUser, userNameForPlacement); + } + + private void setApplicationTags(String... tags) { + Set applicationTags = new TreeSet<>(); + Collections.addAll(applicationTags, tags); + asContext.setApplicationTags(applicationTags); + } }