YARN-9886. Queue mapping based on userid passed through application tag. Contributed by Julia Kinga Marton

This commit is contained in:
Szilard Nemeth 2019-11-19 21:14:15 +01:00
parent ea8ffac121
commit dfdc6d6dd9
6 changed files with 306 additions and 4 deletions

View File

@ -1859,6 +1859,14 @@ public static boolean isAclEnabled(Configuration conf) {
"container-monitor.procfs-tree.smaps-based-rss.enabled"; "container-monitor.procfs-tree.smaps-based-rss.enabled";
public static final boolean DEFAULT_PROCFS_USE_SMAPS_BASED_RSS_ENABLED = public static final boolean DEFAULT_PROCFS_USE_SMAPS_BASED_RSS_ENABLED =
false; false;
private static final String APPLICATION_TAG_BASED_PLACEMENT_PREFIX =
"application-tag-based-placement";
public static final String APPLICATION_TAG_BASED_PLACEMENT_ENABLED =
APPLICATION_TAG_BASED_PLACEMENT_PREFIX + ".enable";
public static final boolean DEFAULT_APPLICATION_TAG_BASED_PLACEMENT_ENABLED =
false;
public static final String APPLICATION_TAG_BASED_PLACEMENT_USER_WHITELIST =
APPLICATION_TAG_BASED_PLACEMENT_PREFIX + ".username.whitelist";
/** Enable/disable container metrics. */ /** Enable/disable container metrics. */
@Private @Private

View File

@ -21,11 +21,11 @@
import java.util.ArrayList; import java.util.ArrayList;
import java.util.Collections; import java.util.Collections;
import java.util.HashMap; import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Private;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
@ -247,7 +247,7 @@ private void initApplicationTags() {
return; return;
} }
ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder; ApplicationSubmissionContextProtoOrBuilder p = viaProto ? proto : builder;
this.applicationTags = new HashSet<String>(); this.applicationTags = new TreeSet<>();
this.applicationTags.addAll(p.getApplicationTagsList()); this.applicationTags.addAll(p.getApplicationTagsList());
} }
@ -305,7 +305,7 @@ public synchronized void setApplicationTags(Set<String> tags) {
} }
checkTags(tags); checkTags(tags);
// Convert applicationTags to lower case and add // Convert applicationTags to lower case and add
this.applicationTags = new HashSet<String>(); this.applicationTags = new TreeSet<>();
for (String tag : tags) { for (String tag : tags) {
this.applicationTags.add(StringUtils.toLowerCase(tag)); this.applicationTags.add(StringUtils.toLowerCase(tag));
} }

View File

@ -1288,6 +1288,26 @@
<value>false</value> <value>false</value>
</property> </property>
<property>
<description>
Whether to enable application placement based on user ID passed via
application tags. When it is enabled, u=&lt;userId&gt; pattern will be checked
and if found, the application will be placed onto the found user's queue,
if the original user has enough rights on the passed user's queue.
</description>
<name>application-tag-based-placement.enable</name>
<value>false</value>
</property>
<property>
<description>
Comma separated list of users who can use the application tag based
placement, if it is enabled.
</description>
<name>application-tag-based-placement.username.whitelist</name>
<value></value>
</property>
<property> <property>
<description>How long to keep aggregation logs before deleting them. -1 disables. <description>How long to keep aggregation logs before deleting them. -1 disables.
Be careful set this too small and you will spam the name node.</description> Be careful set this too small and you will spam the name node.</description>

View File

@ -889,7 +889,10 @@ ApplicationPlacementContext placeApplication(
ApplicationPlacementContext placementContext = null; ApplicationPlacementContext placementContext = null;
if (placementManager != null) { if (placementManager != null) {
try { try {
placementContext = placementManager.placeApplication(context, user); String usernameUsedForPlacement =
getUserNameForPlacement(user, context, placementManager);
placementContext = placementManager
.placeApplication(context, usernameUsedForPlacement);
} catch (YarnException e) { } catch (YarnException e) {
// Placement could also fail if the user doesn't exist in system // Placement could also fail if the user doesn't exist in system
// skip if the user is not found during recovery. // skip if the user is not found during recovery.
@ -916,6 +919,80 @@ ApplicationPlacementContext placeApplication(
return placementContext; return placementContext;
} }
@VisibleForTesting
protected String getUserNameForPlacement(final String user,
final ApplicationSubmissionContext context,
final PlacementManager placementManager) throws YarnException {
boolean applicationTagBasedPlacementEnabled = conf
.getBoolean(YarnConfiguration.APPLICATION_TAG_BASED_PLACEMENT_ENABLED,
YarnConfiguration.DEFAULT_APPLICATION_TAG_BASED_PLACEMENT_ENABLED);
String usernameUsedForPlacement = user;
if (!applicationTagBasedPlacementEnabled) {
return usernameUsedForPlacement;
}
if (!isWhitelistedUser(user, conf)) {
LOG.warn("User '{}' is not allowed to do placement based " +
"on application tag", user);
return usernameUsedForPlacement;
}
LOG.debug("Application tag based placement is enabled, checking for " +
"userId in the application tag");
Set<String> applicationTags = context.getApplicationTags();
String userNameFromAppTag = getUserNameFromApplicationTag(applicationTags);
if (userNameFromAppTag != null) {
LOG.debug("Found userId '{}' in application tag", userNameFromAppTag);
UserGroupInformation callerUGI = UserGroupInformation
.createRemoteUser(userNameFromAppTag);
// check if the actual user has rights to submit application to the
// user's queue from the application tag
String queue = placementManager
.placeApplication(context, usernameUsedForPlacement).getQueue();
if (callerUGI != null && scheduler
.checkAccess(callerUGI, QueueACL.SUBMIT_APPLICATIONS, queue)) {
usernameUsedForPlacement = userNameFromAppTag;
} else {
LOG.warn("User '{}' from application tag does not have access to " +
" queue '{}'. " + "The placement is done for user '{}'",
userNameFromAppTag, queue, user);
}
} else {
LOG.warn("userId was not found in application tags");
}
return usernameUsedForPlacement;
}
private boolean isWhitelistedUser(final String user,
final Configuration config) {
String[] userWhitelist = config.getStrings(YarnConfiguration
.APPLICATION_TAG_BASED_PLACEMENT_USER_WHITELIST);
if (userWhitelist == null || userWhitelist.length == 0) {
return false;
}
for (String s: userWhitelist) {
if (s.equals(user)) {
return true;
}
}
return false;
}
private String getUserNameFromApplicationTag(Set<String> applicationTags) {
String userIdPrefix = "u=";
for (String tag: applicationTags) {
if (tag.startsWith(userIdPrefix)) {
String[] userIdTag = tag.split("=");
if (userIdTag.length == 2) {
return userIdTag[1];
} else {
LOG.warn("Found wrongly qualified username in tag");
}
}
}
return null;
}
private void copyPlacementQueueToSubmissionContext( private void copyPlacementQueueToSubmissionContext(
ApplicationPlacementContext placementContext, ApplicationPlacementContext placementContext,
ApplicationSubmissionContext context) { ApplicationSubmissionContext context) {

View File

@ -29,6 +29,7 @@
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext; import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore; import org.apache.hadoop.yarn.server.resourcemanager.recovery.RMStateStore;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
@ -103,5 +104,11 @@ public void submitApplication(
super.submitApplication(submissionContext, System.currentTimeMillis(), super.submitApplication(submissionContext, System.currentTimeMillis(),
user); user);
} }
public String getUserNameForPlacement(final String user,
final ApplicationSubmissionContext context,
final PlacementManager placementManager) throws YarnException {
return super.getUserNameForPlacement(user, context, placementManager);
}
} }
} }

View File

@ -22,6 +22,7 @@
import com.google.common.collect.Lists; import com.google.common.collect.Lists;
import com.google.common.collect.Maps; import com.google.common.collect.Maps;
import com.google.common.collect.Sets; import com.google.common.collect.Sets;
import org.apache.hadoop.yarn.api.records.QueueACL;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
@ -94,12 +95,14 @@
import java.util.List; import java.util.List;
import java.util.Map; import java.util.Map;
import java.util.Set; import java.util.Set;
import java.util.TreeSet;
import java.util.concurrent.ConcurrentMap; import java.util.concurrent.ConcurrentMap;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
import static org.junit.Assert.assertTrue; import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.any; import static org.mockito.ArgumentMatchers.any;
import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.ArgumentMatchers.eq;
import static org.mockito.ArgumentMatchers.isA; import static org.mockito.ArgumentMatchers.isA;
import static org.mockito.ArgumentMatchers.matches; import static org.mockito.ArgumentMatchers.matches;
import static org.mockito.Mockito.doAnswer; import static org.mockito.Mockito.doAnswer;
@ -1197,4 +1200,191 @@ private static List<ResourceRequest> cloneResourceRequests(
return cloneReqs; return cloneReqs;
} }
@Test
public void testGetUserNameForPlacementTagBasedPlacementDisabled()
throws YarnException {
String user = "user1";
String expectedQueue = "user1Queue";
String userIdTag = "u=user2";
setApplicationTags("tag1", userIdTag, "tag2");
verifyPlacementUsername(expectedQueue, user, user);
}
@Test
/**
* Test case for when the application tag based placement is enabled and
* the submitting user 'user1' is whitelisted and the user from the
* application tag has access to queue.
* Expected behaviour: the placement is done for user from the tag 'user2'
*/
public void testGetUserNameForPlacementTagBasedPlacementEnabled()
throws YarnException {
String user = "user1";
String expectedQueue = "user1Queue";
String expectedUser = "user2";
String userIdTag = "u=" + expectedUser;
setApplicationTags("tag1", userIdTag, "tag2");
enableApplicationTagPlacement(true, user);
verifyPlacementUsername(expectedQueue, user, expectedUser);
}
@Test
/**
* Test case for when the application tag based placement is enabled and
* the submitting user 'user1' is whitelisted and there are multiple valid
* username tags passed
* Expected behaviour: the placement is done for the first valid username
* from the tag 'user2'
*/
public void testGetUserNameForPlacementTagBasedPlacementMultipleUserIds()
throws YarnException {
String user = "user1";
String expectedQueue = "user1Queue";
String expectedUser = "user2";
String userIdTag = "u=" + expectedUser;
String userIdTag2 = "u=user3";
setApplicationTags("tag1", userIdTag, "tag2", userIdTag2);
enableApplicationTagPlacement(true, user);
verifyPlacementUsername(expectedQueue, user, expectedUser);
}
@Test
/**
* Test case for when the application tag based placement is enabled but
* no username is set in the application tag
* Expected behaviour: the placement is done for the submitting user 'user1'
*/
public void testGetUserNameForPlacementTagBasedPlacementNoUserId()
throws YarnException {
String user = "user1";
String expectedQueue = "user1Queue";
setApplicationTags("tag1", "tag2");
enableApplicationTagPlacement(true, user);
verifyPlacementUsername(expectedQueue, user, user);
}
@Test
/**
* Test case for when the application tag based placement is enabled but
* the user from the application tag 'user2' does not have access to the
* queue.
* Expected behaviour: the placement is done for the submitting user 'user1'
*/
public void testGetUserNameForPlacementUserWithoutAccessToQueue()
throws YarnException {
String user = "user1";
String expectedQueue = "user1Queue";
String userIdTag = "u=user2";
setApplicationTags("tag1", userIdTag, "tag2");
enableApplicationTagPlacement(false, user);
verifyPlacementUsername(expectedQueue, user, user);
}
@Test
/**
* Test case for when the application tag based placement is enabled but
* the submitting user 'user1' is not whitelisted and there is a valid
* username tag passed.
* Expected behaviour: the placement is done for the submitting user 'user1'
*/
public void testGetUserNameForPlacementNotWhitelistedUser()
throws YarnException {
String user = "user1";
String expectedQueue = "user1Queue";
String userIdTag = "u=user2";
setApplicationTags("tag1", userIdTag, "tag2");
enableApplicationTagPlacement(true, "someUser");
verifyPlacementUsername(expectedQueue, user, user);
}
@Test
/**
* Test case for when the application tag based placement is enabled but
* there is no whitelisted user.
* Expected behaviour: the placement is done for the submitting user 'user1'
*/
public void testGetUserNameForPlacementEmptyWhiteList()
throws YarnException {
String user = "user1";
String expectedQueue = "user1Queue";
String userIdTag = "u=user2";
setApplicationTags("tag1", userIdTag, "tag2");
enableApplicationTagPlacement(false);
verifyPlacementUsername(expectedQueue, user, user);
}
@Test
/**
* Test case for when the application tag based placement is enabled and
* there is one wrongly qualified user 'u=' and a valid user 'u=user2' passed
* via application tag.
* Expected behaviour: the placement is done for the first valid username
* from the tag 'user2'
*/
public void testGetUserNameForPlacementWronglyQualifiedFirstUserNameInTag()
throws YarnException {
String user = "user1";
String expectedQueue = "user1Queue";
String expectedUser = "user2";
String userIdTag = "u=" + expectedUser;
String wrongUserIdTag = "u=";
setApplicationTags("tag1", wrongUserIdTag, userIdTag, "tag2");
enableApplicationTagPlacement(true, user);
verifyPlacementUsername(expectedQueue, user, expectedUser);
}
@Test
/**
* Test case for when the application tag based placement is enabled and
* there is only one wrongly qualified user 'u=' passed via application tag.
* Expected behaviour: the placement is done for the submitting user 'user1'
*/
public void testGetUserNameForPlacementWronglyQualifiedUserNameInTag()
throws YarnException {
String user = "user1";
String expectedQueue = "user1Queue";
String wrongUserIdTag = "u=";
setApplicationTags("tag1", wrongUserIdTag, "tag2");
enableApplicationTagPlacement(true, user);
verifyPlacementUsername(expectedQueue, user, user);
}
private void enableApplicationTagPlacement(boolean userHasAccessToQueue,
String... whiteListedUsers) {
Configuration conf = new Configuration();
conf.setBoolean(YarnConfiguration
.APPLICATION_TAG_BASED_PLACEMENT_ENABLED, true);
conf.setStrings(YarnConfiguration
.APPLICATION_TAG_BASED_PLACEMENT_USER_WHITELIST, whiteListedUsers);
((RMContextImpl) rmContext).setYarnConfiguration(conf);
ResourceScheduler scheduler = mockResourceScheduler();
when(scheduler.checkAccess(any(UserGroupInformation.class),
eq(QueueACL.SUBMIT_APPLICATIONS), any(String.class)))
.thenReturn(userHasAccessToQueue);
ApplicationMasterService masterService =
new ApplicationMasterService(rmContext, scheduler);
appMonitor = new TestRMAppManager(rmContext,
new ClientToAMTokenSecretManagerInRM(),
scheduler, masterService,
new ApplicationACLsManager(conf), conf);
}
private void verifyPlacementUsername(final String queue,
final String submittingUser, final String expectedUser)
throws YarnException {
PlacementManager placementMgr = mock(PlacementManager.class);
ApplicationPlacementContext appContext
= new ApplicationPlacementContext(queue);
when(placementMgr.placeApplication(asContext, submittingUser))
.thenReturn(appContext);
String userNameForPlacement = appMonitor
.getUserNameForPlacement(submittingUser, asContext, placementMgr);
Assert.assertEquals(expectedUser, userNameForPlacement);
}
private void setApplicationTags(String... tags) {
Set<String> applicationTags = new TreeSet<>();
Collections.addAll(applicationTags, tags);
asContext.setApplicationTags(applicationTags);
}
} }