diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java new file mode 100644 index 0000000000..402b47a0e4 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementPolicy.java @@ -0,0 +1,121 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collections; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.security.Groups; +import org.apache.hadoop.util.ReflectionUtils; +import org.w3c.dom.Element; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; + +public class QueuePlacementPolicy { + private static final Map> ruleClasses; + static { + Map> map = + new HashMap>(); + map.put("user", QueuePlacementRule.User.class); + map.put("primaryGroup", QueuePlacementRule.PrimaryGroup.class); + map.put("specified", QueuePlacementRule.Specified.class); + map.put("default", QueuePlacementRule.Default.class); + map.put("reject", QueuePlacementRule.Reject.class); + ruleClasses = Collections.unmodifiableMap(map); + } + + private final List rules; + private final Set configuredQueues; + private final Groups groups; + + public QueuePlacementPolicy(List rules, + Set configuredQueues, Configuration conf) + throws AllocationConfigurationException { + for (int i = 0; i < rules.size()-1; i++) { + if (rules.get(i).isTerminal()) { + throw new AllocationConfigurationException("Rules after rule " + + i + " in queue placement policy can never be reached"); + } + } + if (!rules.get(rules.size()-1).isTerminal()) { + throw new AllocationConfigurationException( + "Could get past last queue placement rule without assigning"); + } + this.rules = rules; + this.configuredQueues = configuredQueues; + groups = new Groups(conf); + } + + /** + * Builds a QueuePlacementPolicy from an xml element. + */ + public static QueuePlacementPolicy fromXml(Element el, Set configuredQueues, + Configuration conf) throws AllocationConfigurationException { + List rules = new ArrayList(); + NodeList elements = el.getChildNodes(); + for (int i = 0; i < elements.getLength(); i++) { + Node node = elements.item(i); + if (node instanceof Element) { + Element element = (Element)node; + String ruleName = element.getTagName(); + Class clazz = ruleClasses.get(ruleName); + if (clazz == null) { + throw new AllocationConfigurationException("No rule class found for " + + ruleName); + } + QueuePlacementRule rule = ReflectionUtils.newInstance(clazz, null); + rule.initializeFromXml(element); + rules.add(rule); + } + } + return new QueuePlacementPolicy(rules, configuredQueues, conf); + } + + /** + * Applies this rule to an app with the given requested queue and user/group + * information. + * + * @param requestedQueue + * The queue specified in the ApplicationSubmissionContext + * @param user + * The user submitting the app + * @return + * The name of the queue to assign the app to. Or null if the app should + * be rejected. + * @throws IOException + * If an exception is encountered while getting the user's groups + */ + public String assignAppToQueue(String requestedQueue, String user) + throws IOException { + for (QueuePlacementRule rule : rules) { + String queue = rule.assignAppToQueue(requestedQueue, user, groups, + configuredQueues); + if (queue == null || !queue.isEmpty()) { + return queue; + } + } + throw new IllegalStateException("Should have applied a rule before " + + "reaching here"); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java new file mode 100644 index 0000000000..95acdcae04 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/QueuePlacementRule.java @@ -0,0 +1,200 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import java.io.IOException; +import java.util.Collection; +import java.util.HashMap; +import java.util.Map; + +import org.apache.hadoop.security.Groups; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.w3c.dom.Element; +import org.w3c.dom.NamedNodeMap; +import org.w3c.dom.Node; + +public abstract class QueuePlacementRule { + protected boolean create; + + /** + * Initializes the rule with any arguments. + * + * @param args + * Additional attributes of the rule's xml element other than create. + */ + public QueuePlacementRule initialize(boolean create, Map args) { + this.create = create; + return this; + } + + /** + * + * @param requestedQueue + * The queue explicitly requested. + * @param user + * The user submitting the app. + * @param groups + * The groups of the user submitting the app. + * @param configuredQueues + * The queues specified in the scheduler configuration. + * @return + * The queue to place the app into. An empty string indicates that we should + * continue to the next rule, and null indicates that the app should be rejected. + */ + public String assignAppToQueue(String requestedQueue, String user, + Groups groups, Collection configuredQueues) throws IOException { + String queue = getQueueForApp(requestedQueue, user, groups); + if (create || configuredQueues.contains(queue)) { + return queue; + } else { + return ""; + } + } + + public void initializeFromXml(Element el) { + boolean create = true; + NamedNodeMap attributes = el.getAttributes(); + Map args = new HashMap(); + for (int i = 0; i < attributes.getLength(); i++) { + Node node = attributes.item(i); + String key = node.getNodeName(); + String value = node.getNodeValue(); + if (key.equals("create")) { + create = Boolean.parseBoolean(value); + } else { + args.put(key, value); + } + } + initialize(create, args); + } + + /** + * Returns true if this rule never tells the policy to continue. + */ + public abstract boolean isTerminal(); + + /** + * Applies this rule to an app with the given requested queue and user/group + * information. + * + * @param requestedQueue + * The queue specified in the ApplicationSubmissionContext + * @param user + * The user submitting the app. + * @param groups + * The groups of the user submitting the app. + * @return + * The name of the queue to assign the app to, or null to empty string + * continue to the next rule. + */ + protected abstract String getQueueForApp(String requestedQueue, String user, + Groups groups) throws IOException; + + /** + * Places apps in queues by username of the submitter + */ + public static class User extends QueuePlacementRule { + @Override + protected String getQueueForApp(String requestedQueue, + String user, Groups groups) { + return "root." + user; + } + + @Override + public boolean isTerminal() { + return create; + } + } + + /** + * Places apps in queues by primary group of the submitter + */ + public static class PrimaryGroup extends QueuePlacementRule { + @Override + protected String getQueueForApp(String requestedQueue, + String user, Groups groups) throws IOException { + return "root." + groups.getGroups(user).get(0); + } + + @Override + public boolean isTerminal() { + return create; + } + } + + /** + * Places apps in queues by requested queue of the submitter + */ + public static class Specified extends QueuePlacementRule { + @Override + protected String getQueueForApp(String requestedQueue, + String user, Groups groups) { + if (requestedQueue.equals(YarnConfiguration.DEFAULT_QUEUE_NAME)) { + return ""; + } else { + if (!requestedQueue.startsWith("root.")) { + requestedQueue = "root." + requestedQueue; + } + return requestedQueue; + } + } + + @Override + public boolean isTerminal() { + return false; + } + } + + /** + * Places all apps in the default queue + */ + public static class Default extends QueuePlacementRule { + @Override + protected String getQueueForApp(String requestedQueue, String user, + Groups groups) { + return "root." + YarnConfiguration.DEFAULT_QUEUE_NAME; + } + + @Override + public boolean isTerminal() { + return create; + } + } + + /** + * Rejects all apps + */ + public static class Reject extends QueuePlacementRule { + @Override + public String assignAppToQueue(String requestedQueue, String user, + Groups groups, Collection configuredQueues) { + return null; + } + + @Override + protected String getQueueForApp(String requestedQueue, String user, + Groups groups) { + throw new UnsupportedOperationException(); + } + + @Override + public boolean isTerminal() { + return true; + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SimpleGroupsMapping.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SimpleGroupsMapping.java new file mode 100644 index 0000000000..24dd65d7f5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/SimpleGroupsMapping.java @@ -0,0 +1,44 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import java.io.IOException; +import java.util.Arrays; +import java.util.List; + +import org.apache.hadoop.security.GroupMappingServiceProvider; + +public class SimpleGroupsMapping implements GroupMappingServiceProvider { + + @Override + public List getGroups(String user) { + return Arrays.asList(user + "group"); + } + + @Override + public void cacheGroupsRefresh() throws IOException { + throw new UnsupportedOperationException(); + } + + @Override + public void cacheGroupsAdd(List groups) throws IOException { + throw new UnsupportedOperationException(); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueuePlacementPolicy.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueuePlacementPolicy.java new file mode 100644 index 0000000000..130fe35327 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestQueuePlacementPolicy.java @@ -0,0 +1,119 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; + +import static org.junit.Assert.assertEquals; + +import java.util.Set; + +import javax.xml.parsers.DocumentBuilder; +import javax.xml.parsers.DocumentBuilderFactory; + +import org.apache.commons.io.IOUtils; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.security.GroupMappingServiceProvider; +import org.junit.BeforeClass; +import org.junit.Test; +import org.w3c.dom.Document; +import org.w3c.dom.Element; + +import com.google.common.collect.Sets; + +public class TestQueuePlacementPolicy { + private final static Configuration conf = new Configuration(); + private final static Set configuredQueues = Sets.newHashSet("root.someuser"); + + @BeforeClass + public static void setup() { + conf.setClass(CommonConfigurationKeys.HADOOP_SECURITY_GROUP_MAPPING, + SimpleGroupsMapping.class, GroupMappingServiceProvider.class); + } + + @Test + public void testSpecifiedUserPolicy() throws Exception { + StringBuffer sb = new StringBuffer(); + sb.append(""); + sb.append(" "); + sb.append(" "); + sb.append(""); + QueuePlacementPolicy policy = parse(sb.toString()); + assertEquals("root.specifiedq",policy.assignAppToQueue("specifiedq", "someuser")); + assertEquals("root.someuser", policy.assignAppToQueue("default", "someuser")); + assertEquals("root.otheruser", policy.assignAppToQueue("default", "otheruser")); + } + + @Test + public void testNoCreate() throws Exception { + StringBuffer sb = new StringBuffer(); + sb.append(""); + sb.append(" "); + sb.append(" "); + sb.append(" "); + sb.append(""); + QueuePlacementPolicy policy = parse(sb.toString()); + assertEquals("root.specifiedq", policy.assignAppToQueue("specifiedq", "someuser")); + assertEquals("root.someuser", policy.assignAppToQueue("default", "someuser")); + assertEquals("root.specifiedq", policy.assignAppToQueue("specifiedq", "otheruser")); + assertEquals("root.default", policy.assignAppToQueue("default", "otheruser")); + } + + @Test + public void testSpecifiedThenReject() throws Exception { + StringBuffer sb = new StringBuffer(); + sb.append(""); + sb.append(" "); + sb.append(" "); + sb.append(""); + QueuePlacementPolicy policy = parse(sb.toString()); + assertEquals("root.specifiedq", policy.assignAppToQueue("specifiedq", "someuser")); + assertEquals(null, policy.assignAppToQueue("default", "someuser")); + } + + @Test (expected = AllocationConfigurationException.class) + public void testOmittedTerminalRule() throws Exception { + StringBuffer sb = new StringBuffer(); + sb.append(""); + sb.append(" "); + sb.append(" "); + sb.append(""); + parse(sb.toString()); + } + + @Test (expected = AllocationConfigurationException.class) + public void testTerminalRuleInMiddle() throws Exception { + StringBuffer sb = new StringBuffer(); + sb.append(""); + sb.append(" "); + sb.append(" "); + sb.append(" "); + sb.append(""); + parse(sb.toString()); + } + + private QueuePlacementPolicy parse(String str) throws Exception { + // Read and parse the allocations file. + DocumentBuilderFactory docBuilderFactory = + DocumentBuilderFactory.newInstance(); + docBuilderFactory.setIgnoringComments(true); + DocumentBuilder builder = docBuilderFactory.newDocumentBuilder(); + Document doc = builder.parse(IOUtils.toInputStream(str)); + Element root = doc.getDocumentElement(); + return QueuePlacementPolicy.fromXml(root, configuredQueues, conf); + } +}