YARN-1423. Support queue placement by secondary group in the Fair Scheduler (Ted Malaska via Sandy Ryza)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1545157 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Sanford Ryza 2013-11-25 06:04:26 +00:00
parent 4ef4425825
commit 38f94dc16d
6 changed files with 53 additions and 9 deletions

View File

@ -117,6 +117,9 @@ Release 2.3.0 - UNRELEASED
YARN-1303. Fixed DistributedShell to not fail with multiple commands separated YARN-1303. Fixed DistributedShell to not fail with multiple commands separated
by a semi-colon as shell-command. (Xuan Gong via vinodkv) by a semi-colon as shell-command. (Xuan Gong via vinodkv)
YARN-1423. Support queue placement by secondary group in the Fair Scheduler
(Ted Malaska via Sandy Ryza)
OPTIMIZATIONS OPTIMIZATIONS
BUG FIXES BUG FIXES

View File

@ -39,6 +39,8 @@ public class QueuePlacementPolicy {
new HashMap<String, Class<? extends QueuePlacementRule>>(); new HashMap<String, Class<? extends QueuePlacementRule>>();
map.put("user", QueuePlacementRule.User.class); map.put("user", QueuePlacementRule.User.class);
map.put("primaryGroup", QueuePlacementRule.PrimaryGroup.class); map.put("primaryGroup", QueuePlacementRule.PrimaryGroup.class);
map.put("secondaryGroupExistingQueue",
QueuePlacementRule.SecondaryGroupExistingQueue.class);
map.put("specified", QueuePlacementRule.Specified.class); map.put("specified", QueuePlacementRule.Specified.class);
map.put("default", QueuePlacementRule.Default.class); map.put("default", QueuePlacementRule.Default.class);
map.put("reject", QueuePlacementRule.Reject.class); map.put("reject", QueuePlacementRule.Reject.class);

View File

@ -20,6 +20,7 @@
import java.io.IOException; import java.io.IOException;
import java.util.Collection; import java.util.Collection;
import java.util.HashMap; import java.util.HashMap;
import java.util.List;
import java.util.Map; import java.util.Map;
import org.apache.hadoop.security.Groups; import org.apache.hadoop.security.Groups;
@ -58,7 +59,7 @@ public QueuePlacementRule initialize(boolean create, Map<String, String> args) {
*/ */
public String assignAppToQueue(String requestedQueue, String user, public String assignAppToQueue(String requestedQueue, String user,
Groups groups, Collection<String> configuredQueues) throws IOException { Groups groups, Collection<String> configuredQueues) throws IOException {
String queue = getQueueForApp(requestedQueue, user, groups); String queue = getQueueForApp(requestedQueue, user, groups, configuredQueues);
if (create || configuredQueues.contains(queue)) { if (create || configuredQueues.contains(queue)) {
return queue; return queue;
} else { } else {
@ -103,7 +104,7 @@ public void initializeFromXml(Element el) {
* continue to the next rule. * continue to the next rule.
*/ */
protected abstract String getQueueForApp(String requestedQueue, String user, protected abstract String getQueueForApp(String requestedQueue, String user,
Groups groups) throws IOException; Groups groups, Collection<String> configuredQueues) throws IOException;
/** /**
* Places apps in queues by username of the submitter * Places apps in queues by username of the submitter
@ -111,7 +112,7 @@ protected abstract String getQueueForApp(String requestedQueue, String user,
public static class User extends QueuePlacementRule { public static class User extends QueuePlacementRule {
@Override @Override
protected String getQueueForApp(String requestedQueue, protected String getQueueForApp(String requestedQueue,
String user, Groups groups) { String user, Groups groups, Collection<String> configuredQueues) {
return "root." + user; return "root." + user;
} }
@ -127,7 +128,8 @@ public boolean isTerminal() {
public static class PrimaryGroup extends QueuePlacementRule { public static class PrimaryGroup extends QueuePlacementRule {
@Override @Override
protected String getQueueForApp(String requestedQueue, protected String getQueueForApp(String requestedQueue,
String user, Groups groups) throws IOException { String user, Groups groups,
Collection<String> configuredQueues) throws IOException {
return "root." + groups.getGroups(user).get(0); return "root." + groups.getGroups(user).get(0);
} }
@ -136,6 +138,33 @@ public boolean isTerminal() {
return create; return create;
} }
} }
/**
* Places apps in queues by secondary group of the submitter
*
* Match will be made on first secondary group that exist in
* queues
*/
public static class SecondaryGroupExistingQueue extends QueuePlacementRule {
@Override
protected String getQueueForApp(String requestedQueue,
String user, Groups groups,
Collection<String> configuredQueues) throws IOException {
List<String> groupNames = groups.getGroups(user);
for (int i = 1; i < groupNames.size(); i++) {
if (configuredQueues.contains("root." + groupNames.get(i))) {
return "root." + groupNames.get(i);
}
}
return "";
}
@Override
public boolean isTerminal() {
return create;
}
}
/** /**
* Places apps in queues by requested queue of the submitter * Places apps in queues by requested queue of the submitter
@ -143,7 +172,7 @@ public boolean isTerminal() {
public static class Specified extends QueuePlacementRule { public static class Specified extends QueuePlacementRule {
@Override @Override
protected String getQueueForApp(String requestedQueue, protected String getQueueForApp(String requestedQueue,
String user, Groups groups) { String user, Groups groups, Collection<String> configuredQueues) {
if (requestedQueue.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)) { if (requestedQueue.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)) {
return ""; return "";
} else { } else {
@ -166,7 +195,7 @@ public boolean isTerminal() {
public static class Default extends QueuePlacementRule { public static class Default extends QueuePlacementRule {
@Override @Override
protected String getQueueForApp(String requestedQueue, String user, protected String getQueueForApp(String requestedQueue, String user,
Groups groups) { Groups groups, Collection<String> configuredQueues) {
return "root." + YarnConfiguration.DEFAULT_QUEUE_NAME; return "root." + YarnConfiguration.DEFAULT_QUEUE_NAME;
} }
@ -188,7 +217,7 @@ public String assignAppToQueue(String requestedQueue, String user,
@Override @Override
protected String getQueueForApp(String requestedQueue, String user, protected String getQueueForApp(String requestedQueue, String user,
Groups groups) { Groups groups, Collection<String> configuredQueues) {
throw new UnsupportedOperationException(); throw new UnsupportedOperationException();
} }

View File

@ -28,7 +28,7 @@ public class SimpleGroupsMapping implements GroupMappingServiceProvider {
@Override @Override
public List<String> getGroups(String user) { public List<String> getGroups(String user) {
return Arrays.asList(user + "group"); return Arrays.asList(user + "group", user + "subgroup1", user + "subgroup2");
} }
@Override @Override

View File

@ -682,8 +682,10 @@ public void testQueuePlacementWithPolicy() throws Exception {
rules.add(new QueuePlacementRule.Specified().initialize(true, null)); rules.add(new QueuePlacementRule.Specified().initialize(true, null));
rules.add(new QueuePlacementRule.User().initialize(false, null)); rules.add(new QueuePlacementRule.User().initialize(false, null));
rules.add(new QueuePlacementRule.PrimaryGroup().initialize(false, null)); rules.add(new QueuePlacementRule.PrimaryGroup().initialize(false, null));
rules.add(new QueuePlacementRule.SecondaryGroupExistingQueue().initialize(false, null));
rules.add(new QueuePlacementRule.Default().initialize(true, null)); rules.add(new QueuePlacementRule.Default().initialize(true, null));
Set<String> queues = Sets.newHashSet("root.user1", "root.user3group"); Set<String> queues = Sets.newHashSet("root.user1", "root.user3group",
"root.user4subgroup1", "root.user4subgroup2" , "root.user5subgroup2");
scheduler.getQueueManager().placementPolicy = new QueuePlacementPolicy( scheduler.getQueueManager().placementPolicy = new QueuePlacementPolicy(
rules, queues, conf); rules, queues, conf);
appId = createSchedulingRequest(1024, "somequeue", "user1"); appId = createSchedulingRequest(1024, "somequeue", "user1");
@ -692,6 +694,10 @@ public void testQueuePlacementWithPolicy() throws Exception {
assertEquals("root.user1", apps.get(appId).getQueueName()); assertEquals("root.user1", apps.get(appId).getQueueName());
appId = createSchedulingRequest(1024, "default", "user3"); appId = createSchedulingRequest(1024, "default", "user3");
assertEquals("root.user3group", apps.get(appId).getQueueName()); assertEquals("root.user3group", apps.get(appId).getQueueName());
appId = createSchedulingRequest(1024, "default", "user4");
assertEquals("root.user4subgroup1", apps.get(appId).getQueueName());
appId = createSchedulingRequest(1024, "default", "user5");
assertEquals("root.user5subgroup2", apps.get(appId).getQueueName());
appId = createSchedulingRequest(1024, "default", "otheruser"); appId = createSchedulingRequest(1024, "default", "otheruser");
assertEquals("root.default", apps.get(appId).getQueueName()); assertEquals("root.default", apps.get(appId).getQueueName());

View File

@ -287,6 +287,10 @@ Allocation file format
* primaryGroup: the app is placed into a queue with the name of the * primaryGroup: the app is placed into a queue with the name of the
primary group of the user who submitted it. primary group of the user who submitted it.
* secondaryGroupExistingQueue: the app is placed into a queue with a name
that matches a secondary group of the user who submitted it. The first
secondary group that matches a configured queue will be selected.
* default: the app is placed into the queue named "default". * default: the app is placed into the queue named "default".
* reject: the app is rejected. * reject: the app is rejected.