From 814d701d46b4ff87f6ec94ba39667c80475c38d7 Mon Sep 17 00:00:00 2001 From: Robert Kanter Date: Tue, 6 Feb 2018 14:36:49 -0800 Subject: [PATCH] YARN-7841. Cleanup AllocationFileLoaderService's reloadAllocations method (snemeth via rkanter) --- .../fair/AllocationConfiguration.java | 99 ++-- .../fair/AllocationFileLoaderService.java | 491 ++++-------------- .../fair/allocation/AllocationFileParser.java | 258 +++++++++ .../allocation/AllocationFileQueueParser.java | 268 ++++++++++ .../fair/allocation/QueueProperties.java | 280 ++++++++++ .../fair/TestAllocationFileLoaderService.java | 187 ++++--- .../allocationfile/AllocationFileQueue.java | 82 +++ .../AllocationFileQueueBuilder.java | 115 ++++ .../AllocationFileQueueProperties.java | 202 +++++++ .../AllocationFileSimpleQueueBuilder.java | 64 +++ .../AllocationFileSubQueueBuilder.java | 54 ++ .../allocationfile/AllocationFileWriter.java | 175 +++++++ .../fair/allocationfile/UserSettings.java | 80 +++ 13 files changed, 1803 insertions(+), 552 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/AllocationFileParser.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/AllocationFileQueueParser.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/QueueProperties.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/AllocationFileQueue.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/AllocationFileQueueBuilder.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/AllocationFileQueueProperties.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/AllocationFileSimpleQueueBuilder.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/AllocationFileSubQueueBuilder.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/AllocationFileWriter.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/UserSettings.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java index 3505bca68f..c98aadccf8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationConfiguration.java @@ -31,6 +31,8 @@ import org.apache.hadoop.yarn.security.AccessType; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocation.AllocationFileParser; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocation.QueueProperties; import org.apache.hadoop.yarn.util.resource.Resources; import com.google.common.annotations.VisibleForTesting; @@ -47,7 +49,7 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration { private final Map maxChildQueueResources; // Sharing weights for each queue private final Map queueWeights; - + // Max concurrent running applications for each queue and for each user; in addition, // for users that have no max specified, we use the userMaxJobsDefault. @VisibleForTesting @@ -88,13 +90,13 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration { private final Set reservableQueues; private final Map schedulingPolicies; - + private final SchedulingPolicy defaultSchedulingPolicy; - + // Policy for mapping apps to queues @VisibleForTesting QueuePlacementPolicy placementPolicy; - + //Configured queues in the alloc xml @VisibleForTesting Map> configuredQueues; @@ -104,53 +106,42 @@ public class AllocationConfiguration extends ReservationSchedulerConfiguration { private final Set nonPreemptableQueues; - public AllocationConfiguration(Map minQueueResources, - Map maxQueueResources, - Map maxChildQueueResources, - Map queueMaxApps, - Map userMaxApps, - Map queueWeights, - Map queueMaxAMShares, int userMaxAppsDefault, - int queueMaxAppsDefault, - ConfigurableResource queueMaxResourcesDefault, - float queueMaxAMShareDefault, - Map schedulingPolicies, - SchedulingPolicy defaultSchedulingPolicy, - Map minSharePreemptionTimeouts, - Map fairSharePreemptionTimeouts, - Map fairSharePreemptionThresholds, - Map> queueAcls, - Map> resAcls, - QueuePlacementPolicy placementPolicy, - Map> configuredQueues, - ReservationQueueConfiguration globalReservationQueueConfig, - Set reservableQueues, - Set nonPreemptableQueues) { - this.minQueueResources = minQueueResources; - this.maxQueueResources = maxQueueResources; - this.maxChildQueueResources = maxChildQueueResources; - this.queueMaxApps = queueMaxApps; - this.userMaxApps = userMaxApps; - this.queueMaxAMShares = queueMaxAMShares; - this.queueWeights = queueWeights; - this.userMaxAppsDefault = userMaxAppsDefault; - this.queueMaxResourcesDefault = queueMaxResourcesDefault; - this.queueMaxAppsDefault = queueMaxAppsDefault; - this.queueMaxAMShareDefault = queueMaxAMShareDefault; - this.defaultSchedulingPolicy = defaultSchedulingPolicy; - this.schedulingPolicies = schedulingPolicies; - this.minSharePreemptionTimeouts = minSharePreemptionTimeouts; - this.fairSharePreemptionTimeouts = fairSharePreemptionTimeouts; - this.fairSharePreemptionThresholds = fairSharePreemptionThresholds; - this.queueAcls = queueAcls; - this.resAcls = resAcls; - this.reservableQueues = reservableQueues; + public AllocationConfiguration(QueueProperties queueProperties, + AllocationFileParser allocationFileParser, + QueuePlacementPolicy newPlacementPolicy, + ReservationQueueConfiguration globalReservationQueueConfig) + throws AllocationConfigurationException { + this.minQueueResources = queueProperties.getMinQueueResources(); + this.maxQueueResources = queueProperties.getMaxQueueResources(); + this.maxChildQueueResources = queueProperties.getMaxChildQueueResources(); + this.queueMaxApps = queueProperties.getQueueMaxApps(); + this.userMaxApps = allocationFileParser.getUserMaxApps(); + this.queueMaxAMShares = queueProperties.getQueueMaxAMShares(); + this.queueWeights = queueProperties.getQueueWeights(); + this.userMaxAppsDefault = allocationFileParser.getUserMaxAppsDefault(); + this.queueMaxResourcesDefault = + allocationFileParser.getQueueMaxResourcesDefault(); + this.queueMaxAppsDefault = allocationFileParser.getQueueMaxAppsDefault(); + this.queueMaxAMShareDefault = + allocationFileParser.getQueueMaxAMShareDefault(); + this.defaultSchedulingPolicy = + allocationFileParser.getDefaultSchedulingPolicy(); + this.schedulingPolicies = queueProperties.getQueuePolicies(); + this.minSharePreemptionTimeouts = + queueProperties.getMinSharePreemptionTimeouts(); + this.fairSharePreemptionTimeouts = + queueProperties.getFairSharePreemptionTimeouts(); + this.fairSharePreemptionThresholds = + queueProperties.getFairSharePreemptionThresholds(); + this.queueAcls = queueProperties.getQueueAcls(); + this.resAcls = queueProperties.getReservationAcls(); + this.reservableQueues = queueProperties.getReservableQueues(); this.globalReservationQueueConfig = globalReservationQueueConfig; - this.placementPolicy = placementPolicy; - this.configuredQueues = configuredQueues; - this.nonPreemptableQueues = nonPreemptableQueues; + this.placementPolicy = newPlacementPolicy; + this.configuredQueues = queueProperties.getConfiguredQueues(); + this.nonPreemptableQueues = queueProperties.getNonPreemptableQueues(); } - + public AllocationConfiguration(Configuration conf) { minQueueResources = new HashMap<>(); maxChildQueueResources = new HashMap<>(); @@ -179,7 +170,7 @@ public AllocationConfiguration(Configuration conf) { QueuePlacementPolicy.fromConfiguration(conf, configuredQueues); nonPreemptableQueues = new HashSet<>(); } - + /** * Get the ACLs associated with this queue. If a given ACL is not explicitly * configured, include the default value for that ACL. The default for the @@ -300,7 +291,7 @@ ConfigurableResource getMaxResources(String queue) { } return maxQueueResource; } - + /** * Get the maximum resource allocation for children of the given queue. * @@ -317,15 +308,15 @@ SchedulingPolicy getSchedulingPolicy(String queueName) { SchedulingPolicy policy = schedulingPolicies.get(queueName); return (policy == null) ? defaultSchedulingPolicy : policy; } - + public SchedulingPolicy getDefaultSchedulingPolicy() { return defaultSchedulingPolicy; } - + public Map> getConfiguredQueues() { return configuredQueues; } - + public QueuePlacementPolicy getPlacementPolicy() { return placementPolicy; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java index f73e05ff33..d8d9051830 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/AllocationFileLoaderService.java @@ -29,24 +29,20 @@ import org.apache.hadoop.security.authorize.AccessControlList; import org.apache.hadoop.service.AbstractService; import org.apache.hadoop.yarn.api.records.QueueACL; -import org.apache.hadoop.yarn.api.records.ReservationACL; -import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.security.AccessType; import org.apache.hadoop.yarn.security.Permission; import org.apache.hadoop.yarn.security.PrivilegedEntity; import org.apache.hadoop.yarn.security.PrivilegedEntity.EntityType; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocation.AllocationFileParser; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocation.AllocationFileQueueParser; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocation.QueueProperties; import org.apache.hadoop.yarn.util.Clock; import org.apache.hadoop.yarn.util.SystemClock; -import org.apache.hadoop.yarn.util.resource.Resources; import org.w3c.dom.Document; import org.w3c.dom.Element; -import org.w3c.dom.Node; import org.w3c.dom.NodeList; -import org.w3c.dom.Text; import org.xml.sax.SAXException; - import javax.xml.parsers.DocumentBuilder; import javax.xml.parsers.DocumentBuilderFactory; import javax.xml.parsers.ParserConfigurationException; @@ -54,18 +50,18 @@ import java.net.URL; import java.util.ArrayList; import java.util.HashMap; -import java.util.HashSet; import java.util.List; import java.util.Map; -import java.util.Set; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocation.AllocationFileQueueParser.EVERYBODY_ACL; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocation.AllocationFileQueueParser.ROOT; @Public @Unstable public class AllocationFileLoaderService extends AbstractService { - + public static final Log LOG = LogFactory.getLog( AllocationFileLoaderService.class.getName()); - + /** Time to wait between checks of the allocation file */ public static final long ALLOC_RELOAD_INTERVAL_MS = 10 * 1000; @@ -80,11 +76,6 @@ public class AllocationFileLoaderService extends AbstractService { //Permitted allocation file filesystems (case insensitive) private static final String SUPPORTED_FS_REGEX = "(?i)(hdfs)|(file)|(s3a)|(viewfs)"; - private static final String ROOT = "root"; - private static final AccessControlList EVERYBODY_ACL = - new AccessControlList("*"); - private static final AccessControlList NOBODY_ACL = - new AccessControlList(" "); private final Clock clock; @@ -97,10 +88,10 @@ public class AllocationFileLoaderService extends AbstractService { private FileSystem fs; private Listener reloadListener; - + @VisibleForTesting long reloadIntervalMs = ALLOC_RELOAD_INTERVAL_MS; - + private Thread reloadThread; private volatile boolean running = true; @@ -114,7 +105,7 @@ public AllocationFileLoaderService(Clock clock) { super(AllocationFileLoaderService.class.getName()); this.clock = clock; } - + @Override public void serviceInit(Configuration conf) throws Exception { this.allocFile = getAllocationFile(conf); @@ -161,7 +152,7 @@ public void serviceInit(Configuration conf) throws Exception { } super.serviceInit(conf); } - + @Override public void serviceStart() throws Exception { if (reloadThread != null) { @@ -169,7 +160,7 @@ public void serviceStart() throws Exception { } super.serviceStart(); } - + @Override public void serviceStop() throws Exception { running = false; @@ -183,7 +174,7 @@ public void serviceStop() throws Exception { } super.serviceStop(); } - + /** * Path to XML file containing allocations. If the * path is relative, it is searched for in the @@ -215,11 +206,11 @@ public Path getAllocationFile(Configuration conf) } return allocPath; } - + public synchronized void setReloadListener(Listener reloadListener) { this.reloadListener = reloadListener; } - + /** * Updates the allocation list from the allocation config file. This file is * expected to be in the XML format specified in the design doc. @@ -229,417 +220,111 @@ public synchronized void setReloadListener(Listener reloadListener) { * @throws ParserConfigurationException if XML parser is misconfigured. * @throws SAXException if config file is malformed. */ - public synchronized void reloadAllocations() throws IOException, - ParserConfigurationException, SAXException, + public synchronized void reloadAllocations() + throws IOException, ParserConfigurationException, SAXException, AllocationConfigurationException { if (allocFile == null) { reloadListener.onReload(null); return; } LOG.info("Loading allocation file " + allocFile); - // Create some temporary hashmaps to hold the new allocs, and we only save - // them in our fields if we have parsed the entire allocs file successfully. - Map minQueueResources = new HashMap<>(); - Map maxQueueResources = new HashMap<>(); - Map maxChildQueueResources = new HashMap<>(); - Map queueMaxApps = new HashMap<>(); - Map userMaxApps = new HashMap<>(); - Map queueMaxAMShares = new HashMap<>(); - Map queueWeights = new HashMap<>(); - Map queuePolicies = new HashMap<>(); - Map minSharePreemptionTimeouts = new HashMap<>(); - Map fairSharePreemptionTimeouts = new HashMap<>(); - Map fairSharePreemptionThresholds = new HashMap<>(); - Map> queueAcls = - new HashMap<>(); - Map> reservationAcls = - new HashMap<>(); - Set reservableQueues = new HashSet<>(); - Set nonPreemptableQueues = new HashSet<>(); - int userMaxAppsDefault = Integer.MAX_VALUE; - int queueMaxAppsDefault = Integer.MAX_VALUE; - ConfigurableResource queueMaxResourcesDefault = - new ConfigurableResource(Resources.unbounded()); - float queueMaxAMShareDefault = 0.5f; - long defaultFairSharePreemptionTimeout = Long.MAX_VALUE; - long defaultMinSharePreemptionTimeout = Long.MAX_VALUE; - float defaultFairSharePreemptionThreshold = 0.5f; - SchedulingPolicy defaultSchedPolicy = SchedulingPolicy.DEFAULT_POLICY; - - // Reservation global configuration knobs - String planner = null; - String reservationAgent = null; - String reservationAdmissionPolicy = null; - - QueuePlacementPolicy newPlacementPolicy = null; - - // Remember all queue names so we can display them on web UI, etc. - // configuredQueues is segregated based on whether it is a leaf queue - // or a parent queue. This information is used for creating queues - // and also for making queue placement decisions(QueuePlacementRule.java). - Map> configuredQueues = new HashMap<>(); - - for (FSQueueType queueType : FSQueueType.values()) { - configuredQueues.put(queueType, new HashSet<>()); - } // Read and parse the allocations file. DocumentBuilderFactory docBuilderFactory = - DocumentBuilderFactory.newInstance(); + DocumentBuilderFactory.newInstance(); docBuilderFactory.setIgnoringComments(true); DocumentBuilder builder = docBuilderFactory.newDocumentBuilder(); Document doc = builder.parse(fs.open(allocFile)); Element root = doc.getDocumentElement(); - if (!"allocations".equals(root.getTagName())) - throw new AllocationConfigurationException("Bad fair scheduler config " + - "file: top-level element not "); + if (!"allocations".equals(root.getTagName())) { + throw new AllocationConfigurationException("Bad fair scheduler config " + + "file: top-level element not "); + } NodeList elements = root.getChildNodes(); - List queueElements = new ArrayList<>(); - Element placementPolicyElement = null; - for (int i = 0; i < elements.getLength(); i++) { - Node node = elements.item(i); - if (node instanceof Element) { - Element element = (Element)node; - if ("queue".equals(element.getTagName()) || - "pool".equals(element.getTagName())) { - queueElements.add(element); - } else if ("user".equals(element.getTagName())) { - String userName = element.getAttribute("name"); - NodeList fields = element.getChildNodes(); - for (int j = 0; j < fields.getLength(); j++) { - Node fieldNode = fields.item(j); - if (!(fieldNode instanceof Element)) { - continue; - } - Element field = (Element) fieldNode; - if ("maxRunningApps".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData().trim(); - int val = Integer.parseInt(text); - userMaxApps.put(userName, val); - } - } - } else if ("queueMaxResourcesDefault".equals(element.getTagName())) { - String text = ((Text)element.getFirstChild()).getData().trim(); - ConfigurableResource val = - FairSchedulerConfiguration.parseResourceConfigValue(text); - queueMaxResourcesDefault = val; - } else if ("userMaxAppsDefault".equals(element.getTagName())) { - String text = ((Text)element.getFirstChild()).getData().trim(); - int val = Integer.parseInt(text); - userMaxAppsDefault = val; - } else if ("defaultFairSharePreemptionTimeout" - .equals(element.getTagName())) { - String text = ((Text)element.getFirstChild()).getData().trim(); - long val = Long.parseLong(text) * 1000L; - defaultFairSharePreemptionTimeout = val; - } else if ("fairSharePreemptionTimeout".equals(element.getTagName())) { - if (defaultFairSharePreemptionTimeout == Long.MAX_VALUE) { - String text = ((Text)element.getFirstChild()).getData().trim(); - long val = Long.parseLong(text) * 1000L; - defaultFairSharePreemptionTimeout = val; - } - } else if ("defaultMinSharePreemptionTimeout" - .equals(element.getTagName())) { - String text = ((Text)element.getFirstChild()).getData().trim(); - long val = Long.parseLong(text) * 1000L; - defaultMinSharePreemptionTimeout = val; - } else if ("defaultFairSharePreemptionThreshold" - .equals(element.getTagName())) { - String text = ((Text)element.getFirstChild()).getData().trim(); - float val = Float.parseFloat(text); - val = Math.max(Math.min(val, 1.0f), 0.0f); - defaultFairSharePreemptionThreshold = val; - } else if ("queueMaxAppsDefault".equals(element.getTagName())) { - String text = ((Text)element.getFirstChild()).getData().trim(); - int val = Integer.parseInt(text); - queueMaxAppsDefault = val; - } else if ("queueMaxAMShareDefault".equals(element.getTagName())) { - String text = ((Text)element.getFirstChild()).getData().trim(); - float val = Float.parseFloat(text); - val = Math.min(val, 1.0f); - queueMaxAMShareDefault = val; - } else if ("defaultQueueSchedulingPolicy".equals(element.getTagName()) - || "defaultQueueSchedulingMode".equals(element.getTagName())) { - String text = ((Text)element.getFirstChild()).getData().trim(); - if (text.equalsIgnoreCase(FifoPolicy.NAME)) { - throw new AllocationConfigurationException("Bad fair scheduler " - + "config file: defaultQueueSchedulingPolicy or " - + "defaultQueueSchedulingMode can't be FIFO."); - } - defaultSchedPolicy = SchedulingPolicy.parse(text); - } else if ("queuePlacementPolicy".equals(element.getTagName())) { - placementPolicyElement = element; - } else if ("reservation-planner".equals(element.getTagName())) { - String text = ((Text) element.getFirstChild()).getData().trim(); - planner = text; - } else if ("reservation-agent".equals(element.getTagName())) { - String text = ((Text) element.getFirstChild()).getData().trim(); - reservationAgent = text; - } else if ("reservation-policy".equals(element.getTagName())) { - String text = ((Text) element.getFirstChild()).getData().trim(); - reservationAdmissionPolicy = text; - } else { - LOG.warn("Bad element in allocations file: " + element.getTagName()); - } - } - } - // Load queue elements. A root queue can either be included or omitted. If - // it's included, all other queues must be inside it. - for (Element element : queueElements) { - String parent = "root"; - if (element.getAttribute("name").equalsIgnoreCase("root")) { - if (queueElements.size() > 1) { - throw new AllocationConfigurationException("If configuring root queue," - + " no other queues can be placed alongside it."); - } - parent = null; - } - loadQueue(parent, element, minQueueResources, maxQueueResources, - maxChildQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares, - queueWeights, queuePolicies, minSharePreemptionTimeouts, - fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls, - reservationAcls, configuredQueues, reservableQueues, - nonPreemptableQueues); - } + AllocationFileParser allocationFileParser = + new AllocationFileParser(elements); + allocationFileParser.parse(); + + AllocationFileQueueParser queueParser = + new AllocationFileQueueParser(allocationFileParser.getQueueElements()); + QueueProperties queueProperties = queueParser.parse(); // Load placement policy and pass it configured queues Configuration conf = getConfig(); - if (placementPolicyElement != null) { - newPlacementPolicy = QueuePlacementPolicy.fromXml(placementPolicyElement, - configuredQueues, conf); - } else { - newPlacementPolicy = QueuePlacementPolicy.fromConfiguration(conf, - configuredQueues); - } + QueuePlacementPolicy newPlacementPolicy = + getQueuePlacementPolicy(allocationFileParser, queueProperties, conf); + setupRootQueueProperties(allocationFileParser, queueProperties); - // Set the min/fair share preemption timeout for the root queue - if (!minSharePreemptionTimeouts.containsKey(QueueManager.ROOT_QUEUE)){ - minSharePreemptionTimeouts.put(QueueManager.ROOT_QUEUE, - defaultMinSharePreemptionTimeout); - } - if (!fairSharePreemptionTimeouts.containsKey(QueueManager.ROOT_QUEUE)) { - fairSharePreemptionTimeouts.put(QueueManager.ROOT_QUEUE, - defaultFairSharePreemptionTimeout); - } + ReservationQueueConfiguration globalReservationQueueConfig = + createReservationQueueConfig(allocationFileParser); - // Set the fair share preemption threshold for the root queue - if (!fairSharePreemptionThresholds.containsKey(QueueManager.ROOT_QUEUE)) { - fairSharePreemptionThresholds.put(QueueManager.ROOT_QUEUE, - defaultFairSharePreemptionThreshold); - } - - ReservationQueueConfiguration globalReservationQueueConfig = new - ReservationQueueConfiguration(); - if (planner != null) { - globalReservationQueueConfig.setPlanner(planner); - } - if (reservationAdmissionPolicy != null) { - globalReservationQueueConfig.setReservationAdmissionPolicy - (reservationAdmissionPolicy); - } - if (reservationAgent != null) { - globalReservationQueueConfig.setReservationAgent(reservationAgent); - } - - AllocationConfiguration info = - new AllocationConfiguration(minQueueResources, maxQueueResources, - maxChildQueueResources, queueMaxApps, userMaxApps, queueWeights, - queueMaxAMShares, userMaxAppsDefault, queueMaxAppsDefault, - queueMaxResourcesDefault, queueMaxAMShareDefault, queuePolicies, - defaultSchedPolicy, minSharePreemptionTimeouts, - fairSharePreemptionTimeouts, fairSharePreemptionThresholds, queueAcls, - reservationAcls, newPlacementPolicy, configuredQueues, - globalReservationQueueConfig, reservableQueues, nonPreemptableQueues); + AllocationConfiguration info = new AllocationConfiguration(queueProperties, + allocationFileParser, newPlacementPolicy, globalReservationQueueConfig); lastSuccessfulReload = clock.getTime(); lastReloadAttemptFailed = false; reloadListener.onReload(info); } - - /** - * Loads a queue from a queue element in the configuration file - */ - private void loadQueue(String parentName, Element element, - Map minQueueResources, - Map maxQueueResources, - Map maxChildQueueResources, - Map queueMaxApps, - Map userMaxApps, - Map queueMaxAMShares, - Map queueWeights, - Map queuePolicies, - Map minSharePreemptionTimeouts, - Map fairSharePreemptionTimeouts, - Map fairSharePreemptionThresholds, - Map> queueAcls, - Map> resAcls, - Map> configuredQueues, - Set reservableQueues, - Set nonPreemptableQueues) + + private QueuePlacementPolicy getQueuePlacementPolicy( + AllocationFileParser allocationFileParser, + QueueProperties queueProperties, Configuration conf) throws AllocationConfigurationException { - String queueName = FairSchedulerUtilities.trimQueueName( - element.getAttribute("name")); - - if (queueName.contains(".")) { - throw new AllocationConfigurationException("Bad fair scheduler config " - + "file: queue name (" + queueName + ") shouldn't contain period."); - } - - if (queueName.isEmpty()) { - throw new AllocationConfigurationException("Bad fair scheduler config " - + "file: queue name shouldn't be empty or " - + "consist only of whitespace."); - } - - if (parentName != null) { - queueName = parentName + "." + queueName; - } - - Map acls = new HashMap<>(); - Map racls = new HashMap<>(); - NodeList fields = element.getChildNodes(); - boolean isLeaf = true; - boolean isReservable = false; - - for (int j = 0; j < fields.getLength(); j++) { - Node fieldNode = fields.item(j); - if (!(fieldNode instanceof Element)) { - continue; - } - Element field = (Element) fieldNode; - if ("minResources".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData().trim(); - ConfigurableResource val = - FairSchedulerConfiguration.parseResourceConfigValue(text); - minQueueResources.put(queueName, val.getResource()); - } else if ("maxResources".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData().trim(); - ConfigurableResource val = - FairSchedulerConfiguration.parseResourceConfigValue(text); - maxQueueResources.put(queueName, val); - } else if ("maxChildResources".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData().trim(); - ConfigurableResource val = - FairSchedulerConfiguration.parseResourceConfigValue(text); - maxChildQueueResources.put(queueName, val); - } else if ("maxRunningApps".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData().trim(); - int val = Integer.parseInt(text); - queueMaxApps.put(queueName, val); - } else if ("maxAMShare".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData().trim(); - float val = Float.parseFloat(text); - val = Math.min(val, 1.0f); - queueMaxAMShares.put(queueName, val); - } else if ("weight".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData().trim(); - double val = Double.parseDouble(text); - queueWeights.put(queueName, (float)val); - } else if ("minSharePreemptionTimeout".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData().trim(); - long val = Long.parseLong(text) * 1000L; - minSharePreemptionTimeouts.put(queueName, val); - } else if ("fairSharePreemptionTimeout".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData().trim(); - long val = Long.parseLong(text) * 1000L; - fairSharePreemptionTimeouts.put(queueName, val); - } else if ("fairSharePreemptionThreshold".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData().trim(); - float val = Float.parseFloat(text); - val = Math.max(Math.min(val, 1.0f), 0.0f); - fairSharePreemptionThresholds.put(queueName, val); - } else if ("schedulingPolicy".equals(field.getTagName()) - || "schedulingMode".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData().trim(); - SchedulingPolicy policy = SchedulingPolicy.parse(text); - queuePolicies.put(queueName, policy); - } else if ("aclSubmitApps".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData(); - acls.put(AccessType.SUBMIT_APP, new AccessControlList(text)); - } else if ("aclAdministerApps".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData(); - acls.put(AccessType.ADMINISTER_QUEUE, new AccessControlList(text)); - } else if ("aclAdministerReservations".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData(); - racls.put(ReservationACL.ADMINISTER_RESERVATIONS, - new AccessControlList(text)); - } else if ("aclListReservations".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData(); - racls.put(ReservationACL.LIST_RESERVATIONS, new AccessControlList( - text)); - } else if ("aclSubmitReservations".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData(); - racls.put(ReservationACL.SUBMIT_RESERVATIONS, - new AccessControlList(text)); - } else if ("reservation".equals(field.getTagName())) { - isReservable = true; - reservableQueues.add(queueName); - configuredQueues.get(FSQueueType.PARENT).add(queueName); - } else if ("allowPreemptionFrom".equals(field.getTagName())) { - String text = ((Text)field.getFirstChild()).getData().trim(); - if (!Boolean.parseBoolean(text)) { - nonPreemptableQueues.add(queueName); - } - } else if ("queue".endsWith(field.getTagName()) || - "pool".equals(field.getTagName())) { - loadQueue(queueName, field, minQueueResources, maxQueueResources, - maxChildQueueResources, queueMaxApps, userMaxApps, queueMaxAMShares, - queueWeights, queuePolicies, minSharePreemptionTimeouts, - fairSharePreemptionTimeouts, fairSharePreemptionThresholds, - queueAcls, resAcls, configuredQueues, reservableQueues, - nonPreemptableQueues); - isLeaf = false; - } - } - - // if a leaf in the alloc file is marked as type='parent' - // then store it as a parent queue - if (isLeaf && !"parent".equals(element.getAttribute("type"))) { - configuredQueues.get(FSQueueType.LEAF).add(queueName); + if (allocationFileParser.getQueuePlacementPolicy().isPresent()) { + return QueuePlacementPolicy.fromXml( + allocationFileParser.getQueuePlacementPolicy().get(), + queueProperties.getConfiguredQueues(), conf); } else { - if (isReservable) { - throw new AllocationConfigurationException("The configuration settings" - + " for " + queueName + " are invalid. A queue element that " - + "contains child queue elements or that has the type='parent' " - + "attribute cannot also include a reservation element."); - } - configuredQueues.get(FSQueueType.PARENT).add(queueName); + return QueuePlacementPolicy.fromConfiguration(conf, + queueProperties.getConfiguredQueues()); } - - // Set default acls if not defined - // The root queue defaults to all access - for (QueueACL acl : QueueACL.values()) { - AccessType accessType = SchedulerUtils.toAccessType(acl); - if (acls.get(accessType) == null){ - AccessControlList defaultAcl = queueName.equals(ROOT) ? - EVERYBODY_ACL : NOBODY_ACL; - acls.put(accessType, defaultAcl); - } - } - - queueAcls.put(queueName, acls); - resAcls.put(queueName, racls); - checkMinAndMaxResource(minQueueResources, maxQueueResources, queueName); } - private void checkMinAndMaxResource(Map minResources, - Map maxResources, String queueName) { - - ConfigurableResource maxConfigurableResource = maxResources.get(queueName); - Resource minResource = minResources.get(queueName); - - if (maxConfigurableResource != null && minResource != null) { - Resource maxResource = maxConfigurableResource.getResource(); - - // check whether max resource is bigger or equals to min resource when max - // resource are absolute values - if (maxResource != null && !Resources.fitsIn(minResource, maxResource)) { - LOG.warn(String.format("Queue %s has max resources %s less than " - + "min resources %s", queueName, maxResource, minResource)); - } + private void setupRootQueueProperties( + AllocationFileParser allocationFileParser, + QueueProperties queueProperties) { + // Set the min/fair share preemption timeout for the root queue + if (!queueProperties.getMinSharePreemptionTimeouts() + .containsKey(QueueManager.ROOT_QUEUE)) { + queueProperties.getMinSharePreemptionTimeouts().put( + QueueManager.ROOT_QUEUE, + allocationFileParser.getDefaultMinSharePreemptionTimeout()); } + if (!queueProperties.getFairSharePreemptionTimeouts() + .containsKey(QueueManager.ROOT_QUEUE)) { + queueProperties.getFairSharePreemptionTimeouts().put( + QueueManager.ROOT_QUEUE, + allocationFileParser.getDefaultFairSharePreemptionTimeout()); + } + + // Set the fair share preemption threshold for the root queue + if (!queueProperties.getFairSharePreemptionThresholds() + .containsKey(QueueManager.ROOT_QUEUE)) { + queueProperties.getFairSharePreemptionThresholds().put( + QueueManager.ROOT_QUEUE, + allocationFileParser.getDefaultFairSharePreemptionThreshold()); + } + } + + private ReservationQueueConfiguration createReservationQueueConfig( + AllocationFileParser allocationFileParser) { + ReservationQueueConfiguration globalReservationQueueConfig = + new ReservationQueueConfiguration(); + if (allocationFileParser.getReservationPlanner().isPresent()) { + globalReservationQueueConfig + .setPlanner(allocationFileParser.getReservationPlanner().get()); + } + if (allocationFileParser.getReservationAdmissionPolicy().isPresent()) { + globalReservationQueueConfig.setReservationAdmissionPolicy( + allocationFileParser.getReservationAdmissionPolicy().get()); + } + if (allocationFileParser.getReservationAgent().isPresent()) { + globalReservationQueueConfig.setReservationAgent( + allocationFileParser.getReservationAgent().get()); + } + return globalReservationQueueConfig; } /** diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/AllocationFileParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/AllocationFileParser.java new file mode 100644 index 0000000000..161405b765 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/AllocationFileParser.java @@ -0,0 +1,258 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocation; + +import com.google.common.collect.Maps; +import com.google.common.collect.Sets; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.AllocationConfigurationException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.ConfigurableResource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.w3c.dom.Element; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; +import org.w3c.dom.Text; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Optional; +import java.util.Set; + +/** + * Responsible for parsing allocation.xml config file. + * All node's text value is stored to textValues if {@link #VALID_TAG_NAMES} + * contains the tag name. + * Other meaningful fields are also saved in {@link #parse()}. + */ +public class AllocationFileParser { + private static final Logger LOG = + LoggerFactory.getLogger(AllocationFileParser.class); + + private static final String QUEUE_MAX_RESOURCES_DEFAULT = + "queueMaxResourcesDefault"; + private static final String USER_MAX_APPS_DEFAULT = "userMaxAppsDefault"; + private static final String DEFAULT_FAIR_SHARE_PREEMPTION_TIMEOUT = + "defaultFairSharePreemptionTimeout"; + private static final String FAIR_SHARE_PREEMPTION_TIMEOUT = + "fairSharePreemptionTimeout"; + private static final String DEFAULT_MIN_SHARE_PREEMPTION_TIMEOUT = + "defaultMinSharePreemptionTimeout"; + private static final String QUEUE_MAX_APPS_DEFAULT = "queueMaxAppsDefault"; + private static final String DEFAULT_FAIR_SHARE_PREEMPTION_THRESHOLD = + "defaultFairSharePreemptionThreshold"; + private static final String QUEUE_MAX_AM_SHARE_DEFAULT = + "queueMaxAMShareDefault"; + private static final String RESERVATION_PLANNER = "reservation-planner"; + private static final String RESERVATION_AGENT = "reservation-agent"; + private static final String RESERVATION_ADMISSION_POLICY = + "reservation-policy"; + private static final String QUEUE_PLACEMENT_POLICY = "queuePlacementPolicy"; + private static final String QUEUE = "queue"; + private static final String POOL = "pool"; + private static final String USER = "user"; + private static final String USERNAME = "name"; + private static final String MAX_RUNNING_APPS = "maxRunningApps"; + private static final String DEFAULT_QUEUE_SCHEDULING_POLICY = + "defaultQueueSchedulingPolicy"; + private static final String DEFAULT_QUEUE_SCHEDULING_MODE = + "defaultQueueSchedulingMode"; + + private static final Set VALID_TAG_NAMES = + Sets.newHashSet(QUEUE_MAX_RESOURCES_DEFAULT, USER_MAX_APPS_DEFAULT, + DEFAULT_FAIR_SHARE_PREEMPTION_TIMEOUT, FAIR_SHARE_PREEMPTION_TIMEOUT, + DEFAULT_MIN_SHARE_PREEMPTION_TIMEOUT, QUEUE_MAX_APPS_DEFAULT, + DEFAULT_FAIR_SHARE_PREEMPTION_THRESHOLD, QUEUE_MAX_AM_SHARE_DEFAULT, + RESERVATION_PLANNER, RESERVATION_AGENT, RESERVATION_ADMISSION_POLICY, + QUEUE_PLACEMENT_POLICY, QUEUE, POOL, USER, + DEFAULT_QUEUE_SCHEDULING_POLICY, DEFAULT_QUEUE_SCHEDULING_MODE); + + private final NodeList elements; + private final Map textValues = Maps.newHashMap(); + private Element queuePlacementPolicyElement; + private final List queueElements = new ArrayList<>(); + private final Map userMaxApps = new HashMap<>(); + private SchedulingPolicy defaultSchedulingPolicy; + + public AllocationFileParser(NodeList elements) { + this.elements = elements; + } + + public void parse() throws AllocationConfigurationException { + for (int i = 0; i < elements.getLength(); i++) { + Node node = elements.item(i); + if (node instanceof Element) { + Element element = (Element) node; + final String tagName = element.getTagName(); + if (VALID_TAG_NAMES.contains(tagName)) { + if (tagName.equals(QUEUE_PLACEMENT_POLICY)) { + queuePlacementPolicyElement = element; + } else if (isSchedulingPolicy(element)) { + defaultSchedulingPolicy = extractSchedulingPolicy(element); + } else if (isQueue(element)) { + queueElements.add(element); + } else if (tagName.equals(USER)) { + extractUserData(element); + } else { + textValues.put(tagName, getTrimmedTextData(element)); + } + } else { + LOG.warn("Bad element in allocations file: " + tagName); + } + } + } + } + + private boolean isSchedulingPolicy(Element element) { + return DEFAULT_QUEUE_SCHEDULING_POLICY.equals(element.getTagName()) + || DEFAULT_QUEUE_SCHEDULING_MODE.equals(element.getTagName()); + } + + private void extractUserData(Element element) { + final String userName = element.getAttribute(USERNAME); + final NodeList fields = element.getChildNodes(); + for (int j = 0; j < fields.getLength(); j++) { + final Node fieldNode = fields.item(j); + if (!(fieldNode instanceof Element)) { + continue; + } + final Element field = (Element) fieldNode; + if (MAX_RUNNING_APPS.equals(field.getTagName())) { + final String text = getTrimmedTextData(field); + final int val = Integer.parseInt(text); + userMaxApps.put(userName, val); + } + } + } + + private SchedulingPolicy extractSchedulingPolicy(Element element) + throws AllocationConfigurationException { + String text = getTrimmedTextData(element); + if (text.equalsIgnoreCase(FifoPolicy.NAME)) { + throw new AllocationConfigurationException("Bad fair scheduler " + + "config file: defaultQueueSchedulingPolicy or " + + "defaultQueueSchedulingMode can't be FIFO."); + } + return SchedulingPolicy.parse(text); + } + + private boolean isQueue(Element element) { + return element.getTagName().equals(QUEUE) + || element.getTagName().equals(POOL); + } + + private String getTrimmedTextData(Element element) { + return ((Text) element.getFirstChild()).getData().trim(); + } + + public ConfigurableResource getQueueMaxResourcesDefault() + throws AllocationConfigurationException { + Optional value = getTextValue(QUEUE_MAX_RESOURCES_DEFAULT); + if (value.isPresent()) { + return FairSchedulerConfiguration.parseResourceConfigValue(value.get()); + } + return new ConfigurableResource(Resources.unbounded()); + } + + public int getUserMaxAppsDefault() { + Optional value = getTextValue(USER_MAX_APPS_DEFAULT); + return value.map(Integer::parseInt).orElse(Integer.MAX_VALUE); + } + + public long getDefaultFairSharePreemptionTimeout() { + Optional value = getTextValue(FAIR_SHARE_PREEMPTION_TIMEOUT); + Optional defaultValue = + getTextValue(DEFAULT_FAIR_SHARE_PREEMPTION_TIMEOUT); + + if (value.isPresent() && !defaultValue.isPresent()) { + return Long.parseLong(value.get()) * 1000L; + } else if (defaultValue.isPresent()) { + return Long.parseLong(defaultValue.get()) * 1000L; + } + return Long.MAX_VALUE; + } + + public long getDefaultMinSharePreemptionTimeout() { + Optional value = getTextValue(DEFAULT_MIN_SHARE_PREEMPTION_TIMEOUT); + return value.map(v -> Long.parseLong(v) * 1000L).orElse(Long.MAX_VALUE); + } + + public int getQueueMaxAppsDefault() { + Optional value = getTextValue(QUEUE_MAX_APPS_DEFAULT); + return value.map(Integer::parseInt).orElse(Integer.MAX_VALUE); + } + + public float getDefaultFairSharePreemptionThreshold() { + Optional value = + getTextValue(DEFAULT_FAIR_SHARE_PREEMPTION_THRESHOLD); + if (value.isPresent()) { + float floatValue = Float.parseFloat(value.get()); + return Math.max(Math.min(floatValue, 1.0f), 0.0f); + } + return 0.5f; + } + + public float getQueueMaxAMShareDefault() { + Optional value = getTextValue(QUEUE_MAX_AM_SHARE_DEFAULT); + if (value.isPresent()) { + float val = Float.parseFloat(value.get()); + return Math.min(val, 1.0f); + } + return 0.5f; + } + + // Reservation global configuration knobs + public Optional getReservationPlanner() { + return getTextValue(RESERVATION_PLANNER); + } + + public Optional getReservationAgent() { + return getTextValue(RESERVATION_AGENT); + } + + public Optional getReservationAdmissionPolicy() { + return getTextValue(RESERVATION_ADMISSION_POLICY); + } + + public Optional getQueuePlacementPolicy() { + return Optional.ofNullable(queuePlacementPolicyElement); + } + + private Optional getTextValue(String key) { + return Optional.ofNullable(textValues.get(key)); + } + + public List getQueueElements() { + return queueElements; + } + + public Map getUserMaxApps() { + return userMaxApps; + } + + public SchedulingPolicy getDefaultSchedulingPolicy() { + if (defaultSchedulingPolicy != null) { + return defaultSchedulingPolicy; + } + return SchedulingPolicy.DEFAULT_POLICY; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/AllocationFileQueueParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/AllocationFileQueueParser.java new file mode 100644 index 0000000000..ec7e4a4033 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/AllocationFileQueueParser.java @@ -0,0 +1,268 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocation; + +import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.yarn.api.records.QueueACL; +import org.apache.hadoop.yarn.api.records.ReservationACL; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.security.AccessType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.*; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import org.w3c.dom.Element; +import org.w3c.dom.Node; +import org.w3c.dom.NodeList; +import org.w3c.dom.Text; + +import java.util.List; +import java.util.Map; + +/** + * Responsible for loading queue configuration properties + * from a list of {@link Element}s containing queues. + */ +public class AllocationFileQueueParser { + private static final Logger LOG = + LoggerFactory.getLogger(AllocationFileQueueParser.class); + + public static final String ROOT = "root"; + public static final AccessControlList EVERYBODY_ACL = + new AccessControlList("*"); + static final AccessControlList NOBODY_ACL = new AccessControlList(" "); + private static final String MIN_RESOURCES = "minResources"; + private static final String MAX_RESOURCES = "maxResources"; + private static final String MAX_CHILD_RESOURCES = "maxChildResources"; + private static final String MAX_RUNNING_APPS = "maxRunningApps"; + private static final String MAX_AMSHARE = "maxAMShare"; + private static final String WEIGHT = "weight"; + private static final String MIN_SHARE_PREEMPTION_TIMEOUT = + "minSharePreemptionTimeout"; + private static final String FAIR_SHARE_PREEMPTION_TIMEOUT = + "fairSharePreemptionTimeout"; + private static final String FAIR_SHARE_PREEMPTION_THRESHOLD = + "fairSharePreemptionThreshold"; + private static final String SCHEDULING_POLICY = "schedulingPolicy"; + private static final String SCHEDULING_MODE = "schedulingMode"; + private static final String ACL_SUBMIT_APPS = "aclSubmitApps"; + private static final String ACL_ADMINISTER_APPS = "aclAdministerApps"; + private static final String ACL_ADMINISTER_RESERVATIONS = + "aclAdministerReservations"; + private static final String ACL_LIST_RESERVATIONS = "aclListReservations"; + private static final String ACL_SUBMIT_RESERVATIONS = "aclSubmitReservations"; + private static final String RESERVATION = "reservation"; + private static final String ALLOW_PREEMPTION_FROM = "allowPreemptionFrom"; + private static final String QUEUE = "queue"; + private static final String POOL = "pool"; + + private final List elements; + + public AllocationFileQueueParser(List elements) { + this.elements = elements; + } + + // Load queue elements. A root queue can either be included or omitted. If + // it's included, all other queues must be inside it. + public QueueProperties parse() throws AllocationConfigurationException { + QueueProperties.Builder queuePropertiesBuilder = + new QueueProperties.Builder(); + for (Element element : elements) { + String parent = ROOT; + if (element.getAttribute("name").equalsIgnoreCase(ROOT)) { + if (elements.size() > 1) { + throw new AllocationConfigurationException( + "If configuring root queue," + + " no other queues can be placed alongside it."); + } + parent = null; + } + loadQueue(parent, element, queuePropertiesBuilder); + } + + return queuePropertiesBuilder.build(); + } + + /** + * Loads a queue from a queue element in the configuration file. + */ + private void loadQueue(String parentName, Element element, + QueueProperties.Builder builder) throws AllocationConfigurationException { + String queueName = + FairSchedulerUtilities.trimQueueName(element.getAttribute("name")); + + if (queueName.contains(".")) { + throw new AllocationConfigurationException("Bad fair scheduler config " + + "file: queue name (" + queueName + ") shouldn't contain period."); + } + + if (queueName.isEmpty()) { + throw new AllocationConfigurationException("Bad fair scheduler config " + + "file: queue name shouldn't be empty or " + + "consist only of whitespace."); + } + + if (parentName != null) { + queueName = parentName + "." + queueName; + } + + NodeList fields = element.getChildNodes(); + boolean isLeaf = true; + boolean isReservable = false; + + for (int j = 0; j < fields.getLength(); j++) { + Node fieldNode = fields.item(j); + if (!(fieldNode instanceof Element)) { + continue; + } + Element field = (Element) fieldNode; + if (MIN_RESOURCES.equals(field.getTagName())) { + String text = getTrimmedTextData(field); + ConfigurableResource val = + FairSchedulerConfiguration.parseResourceConfigValue(text); + builder.minQueueResources(queueName, val.getResource()); + } else if (MAX_RESOURCES.equals(field.getTagName())) { + String text = getTrimmedTextData(field); + ConfigurableResource val = + FairSchedulerConfiguration.parseResourceConfigValue(text); + builder.maxQueueResources(queueName, val); + } else if (MAX_CHILD_RESOURCES.equals(field.getTagName())) { + String text = getTrimmedTextData(field); + ConfigurableResource val = + FairSchedulerConfiguration.parseResourceConfigValue(text); + builder.maxChildQueueResources(queueName, val); + } else if (MAX_RUNNING_APPS.equals(field.getTagName())) { + String text = getTrimmedTextData(field); + int val = Integer.parseInt(text); + builder.queueMaxApps(queueName, val); + } else if (MAX_AMSHARE.equals(field.getTagName())) { + String text = getTrimmedTextData(field); + float val = Float.parseFloat(text); + val = Math.min(val, 1.0f); + builder.queueMaxAMShares(queueName, val); + } else if (WEIGHT.equals(field.getTagName())) { + String text = getTrimmedTextData(field); + double val = Double.parseDouble(text); + builder.queueWeights(queueName, (float) val); + } else if (MIN_SHARE_PREEMPTION_TIMEOUT.equals(field.getTagName())) { + String text = getTrimmedTextData(field); + long val = Long.parseLong(text) * 1000L; + builder.minSharePreemptionTimeouts(queueName, val); + } else if (FAIR_SHARE_PREEMPTION_TIMEOUT.equals(field.getTagName())) { + String text = getTrimmedTextData(field); + long val = Long.parseLong(text) * 1000L; + builder.fairSharePreemptionTimeouts(queueName, val); + } else if (FAIR_SHARE_PREEMPTION_THRESHOLD.equals(field.getTagName())) { + String text = getTrimmedTextData(field); + float val = Float.parseFloat(text); + val = Math.max(Math.min(val, 1.0f), 0.0f); + builder.fairSharePreemptionThresholds(queueName, val); + } else if (SCHEDULING_POLICY.equals(field.getTagName()) + || SCHEDULING_MODE.equals(field.getTagName())) { + String text = getTrimmedTextData(field); + SchedulingPolicy policy = SchedulingPolicy.parse(text); + builder.queuePolicies(queueName, policy); + } else if (ACL_SUBMIT_APPS.equals(field.getTagName())) { + String text = ((Text) field.getFirstChild()).getData(); + builder.queueAcls(queueName, AccessType.SUBMIT_APP, + new AccessControlList(text)); + } else if (ACL_ADMINISTER_APPS.equals(field.getTagName())) { + String text = ((Text) field.getFirstChild()).getData(); + builder.queueAcls(queueName, AccessType.ADMINISTER_QUEUE, + new AccessControlList(text)); + } else if (ACL_ADMINISTER_RESERVATIONS.equals(field.getTagName())) { + String text = ((Text) field.getFirstChild()).getData(); + builder.reservationAcls(queueName, + ReservationACL.ADMINISTER_RESERVATIONS, + new AccessControlList(text)); + } else if (ACL_LIST_RESERVATIONS.equals(field.getTagName())) { + String text = ((Text) field.getFirstChild()).getData(); + builder.reservationAcls(queueName, ReservationACL.LIST_RESERVATIONS, + new AccessControlList(text)); + } else if (ACL_SUBMIT_RESERVATIONS.equals(field.getTagName())) { + String text = ((Text) field.getFirstChild()).getData(); + builder.reservationAcls(queueName, ReservationACL.SUBMIT_RESERVATIONS, + new AccessControlList(text)); + } else if (RESERVATION.equals(field.getTagName())) { + isReservable = true; + builder.reservableQueues(queueName); + builder.configuredQueues(FSQueueType.PARENT, queueName); + } else if (ALLOW_PREEMPTION_FROM.equals(field.getTagName())) { + String text = getTrimmedTextData(field); + if (!Boolean.parseBoolean(text)) { + builder.nonPreemptableQueues(queueName); + } + } else if (QUEUE.endsWith(field.getTagName()) + || POOL.equals(field.getTagName())) { + loadQueue(queueName, field, builder); + isLeaf = false; + } + } + + // if a leaf in the alloc file is marked as type='parent' + // then store it as a parent queue + if (isLeaf && !"parent".equals(element.getAttribute("type"))) { + builder.configuredQueues(FSQueueType.LEAF, queueName); + } else { + if (isReservable) { + throw new AllocationConfigurationException("The configuration settings" + + " for " + queueName + " are invalid. A queue element that " + + "contains child queue elements or that has the type='parent' " + + "attribute cannot also include a reservation element."); + } + builder.configuredQueues(FSQueueType.PARENT, queueName); + } + + // Set default acls if not defined + // The root queue defaults to all access + for (QueueACL acl : QueueACL.values()) { + AccessType accessType = SchedulerUtils.toAccessType(acl); + if (!builder.isAclDefinedForAccessType(queueName, accessType)) { + AccessControlList defaultAcl = + queueName.equals(ROOT) ? EVERYBODY_ACL : NOBODY_ACL; + builder.queueAcls(queueName, accessType, defaultAcl); + } + } + + checkMinAndMaxResource(builder.getMinQueueResources(), + builder.getMaxQueueResources(), queueName); + } + + private String getTrimmedTextData(Element element) { + return ((Text) element.getFirstChild()).getData().trim(); + } + + private void checkMinAndMaxResource(Map minResources, + Map maxResources, String queueName) { + + ConfigurableResource maxConfigurableResource = maxResources.get(queueName); + Resource minResource = minResources.get(queueName); + + if (maxConfigurableResource != null && minResource != null) { + Resource maxResource = maxConfigurableResource.getResource(); + + // check whether max resource is greater or equals to min resource when + // max resource are absolute values + if (maxResource != null && !Resources.fitsIn(minResource, maxResource)) { + LOG.warn(String.format( + "Queue %s has max resources %s less than " + "min resources %s", + queueName, maxResource, minResource)); + } + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/QueueProperties.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/QueueProperties.java new file mode 100644 index 0000000000..ee5f179023 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocation/QueueProperties.java @@ -0,0 +1,280 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocation; + +import org.apache.hadoop.security.authorize.AccessControlList; +import org.apache.hadoop.yarn.api.records.ReservationACL; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.security.AccessType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.ConfigurableResource; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueueType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.SchedulingPolicy; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; + +/** + * This class is a value class, storing queue properties parsed from the + * allocation.xml config file. Since there are a bunch of properties, properties + * should be added via QueueProperties.Builder. + */ +public class QueueProperties { + // Create some temporary hashmaps to hold the new allocs, and we only save + // them in our fields if we have parsed the entire allocations file + // successfully. + private final Map minQueueResources; + private final Map maxQueueResources; + private final Map maxChildQueueResources; + private final Map queueMaxApps; + private final Map queueMaxAMShares; + private final Map queueWeights; + private final Map queuePolicies; + private final Map minSharePreemptionTimeouts; + private final Map fairSharePreemptionTimeouts; + private final Map fairSharePreemptionThresholds; + private final Map> queueAcls; + private final Map> + reservationAcls; + private final Set reservableQueues; + private final Set nonPreemptableQueues; + private final Map> configuredQueues; + + QueueProperties(Builder builder) { + this.reservableQueues = builder.reservableQueues; + this.minQueueResources = builder.minQueueResources; + this.fairSharePreemptionTimeouts = builder.fairSharePreemptionTimeouts; + this.queueWeights = builder.queueWeights; + this.nonPreemptableQueues = builder.nonPreemptableQueues; + this.configuredQueues = builder.configuredQueues; + this.queueMaxAMShares = builder.queueMaxAMShares; + this.queuePolicies = builder.queuePolicies; + this.fairSharePreemptionThresholds = builder.fairSharePreemptionThresholds; + this.queueMaxApps = builder.queueMaxApps; + this.minSharePreemptionTimeouts = builder.minSharePreemptionTimeouts; + this.maxQueueResources = builder.maxQueueResources; + this.maxChildQueueResources = builder.maxChildQueueResources; + this.reservationAcls = builder.reservationAcls; + this.queueAcls = builder.queueAcls; + } + + public Map> getConfiguredQueues() { + return configuredQueues; + } + + public Map getMinSharePreemptionTimeouts() { + return minSharePreemptionTimeouts; + } + + public Map getFairSharePreemptionTimeouts() { + return fairSharePreemptionTimeouts; + } + + public Map getFairSharePreemptionThresholds() { + return fairSharePreemptionThresholds; + } + + public Map getMinQueueResources() { + return minQueueResources; + } + + public Map getMaxQueueResources() { + return maxQueueResources; + } + + public Map getMaxChildQueueResources() { + return maxChildQueueResources; + } + + public Map getQueueMaxApps() { + return queueMaxApps; + } + + public Map getQueueWeights() { + return queueWeights; + } + + public Map getQueueMaxAMShares() { + return queueMaxAMShares; + } + + public Map getQueuePolicies() { + return queuePolicies; + } + + public Map> getQueueAcls() { + return queueAcls; + } + + public Map> + getReservationAcls() { + return reservationAcls; + } + + public Set getReservableQueues() { + return reservableQueues; + } + + public Set getNonPreemptableQueues() { + return nonPreemptableQueues; + } + + /** + * Builder class for {@link QueueProperties}. + * All methods are adding queue properties to the maps of this builder + * keyed by the queue's name except some methods + * like {@link #isAclDefinedForAccessType(String, AccessType)} or + * {@link #getMinQueueResources()}. + * + */ + public static final class Builder { + private Map minQueueResources = new HashMap<>(); + private Map maxQueueResources = + new HashMap<>(); + private Map maxChildQueueResources = + new HashMap<>(); + private Map queueMaxApps = new HashMap<>(); + private Map queueMaxAMShares = new HashMap<>(); + private Map queueWeights = new HashMap<>(); + private Map queuePolicies = new HashMap<>(); + private Map minSharePreemptionTimeouts = new HashMap<>(); + private Map fairSharePreemptionTimeouts = new HashMap<>(); + private Map fairSharePreemptionThresholds = new HashMap<>(); + private Map> queueAcls = + new HashMap<>(); + private Map> + reservationAcls = new HashMap<>(); + private Set reservableQueues = new HashSet<>(); + private Set nonPreemptableQueues = new HashSet<>(); + // Remember all queue names so we can display them on web UI, etc. + // configuredQueues is segregated based on whether it is a leaf queue + // or a parent queue. This information is used for creating queues + // and also for making queue placement decisions(QueuePlacementRule.java). + private Map> configuredQueues = new HashMap<>(); + + Builder() { + for (FSQueueType queueType : FSQueueType.values()) { + configuredQueues.put(queueType, new HashSet<>()); + } + } + + public static Builder create() { + return new Builder(); + } + + public Builder minQueueResources(String queueName, Resource resource) { + this.minQueueResources.put(queueName, resource); + return this; + } + + public Builder maxQueueResources(String queueName, + ConfigurableResource resource) { + this.maxQueueResources.put(queueName, resource); + return this; + } + + public Builder maxChildQueueResources(String queueName, + ConfigurableResource resource) { + this.maxChildQueueResources.put(queueName, resource); + return this; + } + + public Builder queueMaxApps(String queueName, int value) { + this.queueMaxApps.put(queueName, value); + return this; + } + + public Builder queueMaxAMShares(String queueName, float value) { + this.queueMaxAMShares.put(queueName, value); + return this; + } + + public Builder queueWeights(String queueName, float value) { + this.queueWeights.put(queueName, value); + return this; + } + + public Builder queuePolicies(String queueName, SchedulingPolicy policy) { + this.queuePolicies.put(queueName, policy); + return this; + } + + public Builder minSharePreemptionTimeouts(String queueName, long value) { + this.minSharePreemptionTimeouts.put(queueName, value); + return this; + } + + public Builder fairSharePreemptionTimeouts(String queueName, long value) { + this.fairSharePreemptionTimeouts.put(queueName, value); + return this; + } + + public Builder fairSharePreemptionThresholds(String queueName, + float value) { + this.fairSharePreemptionThresholds.put(queueName, value); + return this; + } + + public Builder queueAcls(String queueName, AccessType accessType, + AccessControlList acls) { + this.queueAcls.putIfAbsent(queueName, new HashMap<>()); + this.queueAcls.get(queueName).put(accessType, acls); + return this; + } + + public Builder reservationAcls(String queueName, + ReservationACL reservationACL, AccessControlList acls) { + this.reservationAcls.putIfAbsent(queueName, new HashMap<>()); + this.reservationAcls.get(queueName).put(reservationACL, acls); + return this; + } + + public Builder reservableQueues(String queue) { + this.reservableQueues.add(queue); + return this; + } + + public Builder nonPreemptableQueues(String queue) { + this.nonPreemptableQueues.add(queue); + return this; + } + + public void configuredQueues(FSQueueType queueType, String queueName) { + this.configuredQueues.get(queueType).add(queueName); + } + + public boolean isAclDefinedForAccessType(String queueName, + AccessType accessType) { + Map aclsForQueue = + this.queueAcls.get(queueName); + return aclsForQueue != null && aclsForQueue.get(accessType) != null; + } + + public Map getMinQueueResources() { + return minQueueResources; + } + + public Map getMaxQueueResources() { + return maxQueueResources; + } + + public QueueProperties build() { + return new QueueProperties(this); + } + + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java index c46ecd9789..4a7461da79 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestAllocationFileLoaderService.java @@ -27,12 +27,12 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.QueuePlacementRule.NestedUserQueue; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile.AllocationFileWriter; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FairSharePolicy; import org.apache.hadoop.yarn.util.ControlledClock; import org.apache.hadoop.yarn.util.resource.Resources; import org.junit.Test; - import java.io.File; import java.io.FileOutputStream; import java.io.FileWriter; @@ -43,7 +43,6 @@ import java.net.URL; import java.nio.charset.StandardCharsets; import java.util.List; - import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertFalse; import static org.junit.Assert.assertNull; @@ -51,7 +50,7 @@ import static org.junit.Assert.fail; public class TestAllocationFileLoaderService { - + final static String TEST_DIR = new File(System.getProperty("test.build.data", "/tmp")).getAbsolutePath(); @@ -112,7 +111,7 @@ public void testGetAllocationFileFromClasspath() { fail("Unable to access allocation file from classpath: " + e); } } - + @Test (timeout = 10000) public void testReload() throws Exception { PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); @@ -127,7 +126,7 @@ public void testReload() throws Exception { out.println(" "); out.println(""); out.close(); - + ControlledClock clock = new ControlledClock(); clock.setTime(0); Configuration conf = new Configuration(); @@ -141,7 +140,7 @@ public void testReload() throws Exception { allocLoader.setReloadListener(confHolder); allocLoader.reloadAllocations(); AllocationConfiguration allocConf = confHolder.allocConf; - + // Verify conf QueuePlacementPolicy policy = allocConf.getPlacementPolicy(); List rules = policy.getRules(); @@ -154,9 +153,9 @@ public void testReload() throws Exception { .contains("root.queueA")); assertTrue(allocConf.getConfiguredQueues().get(FSQueueType.LEAF) .contains("root.queueB")); - + confHolder.allocConf = null; - + // Modify file and advance the clock out = new PrintWriter(new FileWriter(ALLOC_FILE)); out.println(""); @@ -166,22 +165,22 @@ public void testReload() throws Exception { out.println(" "); out.println(" "); out.println(" "); - out.println(" "); + out.println(" "); out.println(" "); out.println(" "); out.println(" "); out.println(" "); out.println(""); out.close(); - + clock.tickMsec(System.currentTimeMillis() + AllocationFileLoaderService.ALLOC_RELOAD_WAIT_MS + 10000); allocLoader.start(); - + while (confHolder.allocConf == null) { Thread.sleep(20); } - + // Verify conf allocConf = confHolder.allocConf; policy = allocConf.getPlacementPolicy(); @@ -199,91 +198,89 @@ public void testReload() throws Exception { assertTrue(allocConf.getConfiguredQueues().get(FSQueueType.LEAF) .contains("root.queueB")); } - + @Test public void testAllocationFileParsing() throws Exception { Configuration conf = new Configuration(); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); - PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); - out.println(""); - out.println(""); - // Give queue A a minimum of 1024 M - out.println(""); - out.println("1024mb,0vcores"); - out.println("2048mb,10vcores"); - out.println(""); - // Give queue B a minimum of 2048 M - out.println(""); - out.println("2048mb,0vcores"); - out.println("5120mb,110vcores"); - out.println("alice,bob admins"); - out.println("fair"); - out.println(""); - // Give queue C no minimum - out.println(""); - out.println("5120mb,0vcores"); - out.println("alice,bob admins"); - out.println(""); - // Give queue D a limit of 3 running apps and 0.4f maxAMShare - out.println(""); - out.println("3"); - out.println("0.4"); - out.println(""); - // Give queue E a preemption timeout of one minute - out.println(""); - out.println("60"); - out.println(""); - // Make queue F a parent queue without configured leaf queues using the - // 'type' attribute - out.println(""); - out.println("2048mb,64vcores"); - out.println(""); - // Create hierarchical queues G,H, with different min/fair share preemption - // timeouts and preemption thresholds. Also add a child default to make sure - // it doesn't impact queue H. - out.println(""); - out.println("2048mb,64vcores"); - out.println("120"); - out.println("50"); - out.println("0.6"); - out.println(" "); - out.println(" 180"); - out.println(" 40"); - out.println(" 0.7"); - out.println(" "); - out.println(""); - // Set default limit of apps per queue to 15 - out.println("15"); - // Set default limit of max resource per queue to 4G and 100 cores - out.println("4096mb,100vcores"); - // Set default limit of apps per user to 5 - out.println("5"); - // Set default limit of AMResourceShare to 0.5f - out.println("0.5f"); - // Give user1 a limit of 10 jobs - out.println(""); - out.println("10"); - out.println(""); - // Set default min share preemption timeout to 2 minutes - out.println("120" - + ""); - // Set default fair share preemption timeout to 5 minutes - out.println("300"); - // Set default fair share preemption threshold to 0.4 - out.println("0.4"); - // Set default scheduling policy to DRF - out.println("drf"); - out.println(""); - out.close(); - + AllocationFileWriter + .create() + // Give queue A a minimum of 1024 M + .queue("queueA") + .minResources("1024mb,0vcores") + .maxResources("2048mb,10vcores") + .buildQueue() + // Give queue B a minimum of 2048 M + .queue("queueB") + .minResources("2048mb,0vcores") + .maxResources("5120mb,110vcores") + .aclAdministerApps("alice,bob admins") + .schedulingPolicy("fair") + .buildQueue() + // Give queue C no minimum + .queue("queueC") + .minResources("5120mb,0vcores") + .aclSubmitApps("alice,bob admins") + .buildQueue() + // Give queue D a limit of 3 running apps and 0.4f maxAMShare + .queue("queueD") + .maxRunningApps(3) + .maxAMShare(0.4) + .buildQueue() + // Give queue E a preemption timeout of one minute + .queue("queueE") + .minSharePreemptionTimeout(60) + .buildQueue() + // Make queue F a parent queue without configured leaf queues + // using the 'type' attribute + .queue("queueF") + .parent(true) + .maxChildResources("2048mb,64vcores") + .buildQueue() + .queue("queueG") + .maxChildResources("2048mb,64vcores") + .fairSharePreemptionTimeout(120) + .minSharePreemptionTimeout(50) + .fairSharePreemptionThreshold(0.6) + // Create hierarchical queues G,H, with different min/fair + // share preemption timeouts and preemption thresholds. + // Also add a child default to make sure it doesn't impact queue H. + .subQueue("queueH") + .fairSharePreemptionTimeout(180) + .minSharePreemptionTimeout(40) + .fairSharePreemptionThreshold(0.7) + .buildSubQueue() + .buildQueue() + // Set default limit of apps per queue to 15 + .queueMaxAppsDefault(15) + // Set default limit of max resource per queue to 4G and 100 cores + .queueMaxResourcesDefault("4096mb,100vcores") + // Set default limit of apps per user to 5 + .userMaxAppsDefault(5) + // Set default limit of AMResourceShare to 0.5f + .queueMaxAMShareDefault(0.5) + // Set default min share preemption timeout to 2 minutes + .defaultMinSharePreemptionTimeout(120) + // Set default fair share preemption timeout to 5 minutes + .defaultFairSharePreemptionTimeout(300) + // Set default fair share preemption threshold to 0.4 + .defaultFairSharePreemptionThreshold(0.4) + // Set default scheduling policy to DRF + .defaultQueueSchedulingPolicy("drf") + // Give user1 a limit of 10 jobs + .userSettings("user1") + .maxRunningApps(10) + .build() + .writeToFile(ALLOC_FILE); + allocLoader.init(conf); ReloadListener confHolder = new ReloadListener(); allocLoader.setReloadListener(confHolder); allocLoader.reloadAllocations(); AllocationConfiguration queueConf = confHolder.allocConf; - + assertEquals(6, queueConf.getConfiguredQueues().get(FSQueueType.LEAF).size()); assertEquals(Resources.createResource(0), queueConf.getMinResources("root." + YarnConfiguration.DEFAULT_QUEUE_NAME)); @@ -443,7 +440,7 @@ public void testAllocationFileParsing() throws Exception { assertEquals(DominantResourceFairnessPolicy.NAME, queueConf.getSchedulingPolicy("root.newqueue").getName()); } - + @Test public void testBackwardsCompatibleAllocationFileParsing() throws Exception { Configuration conf = new Configuration(); @@ -492,7 +489,7 @@ public void testBackwardsCompatibleAllocationFileParsing() throws Exception { out.println("0.6"); out.println(""); out.close(); - + allocLoader.init(conf); ReloadListener confHolder = new ReloadListener(); allocLoader.setReloadListener(confHolder); @@ -571,27 +568,27 @@ public void testBackwardsCompatibleAllocationFileParsing() throws Exception { assertEquals(.3f, queueConf.getFairSharePreemptionThreshold("root.queueE"), 0.01); } - + @Test public void testSimplePlacementPolicyFromConf() throws Exception { Configuration conf = new Configuration(); conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); conf.setBoolean(FairSchedulerConfiguration.ALLOW_UNDECLARED_POOLS, false); conf.setBoolean(FairSchedulerConfiguration.USER_AS_DEFAULT_QUEUE, false); - + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); out.println(""); out.println(""); out.println(""); out.close(); - + AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); allocLoader.init(conf); ReloadListener confHolder = new ReloadListener(); allocLoader.setReloadListener(confHolder); allocLoader.reloadAllocations(); AllocationConfiguration allocConf = confHolder.allocConf; - + QueuePlacementPolicy placementPolicy = allocConf.getPlacementPolicy(); List rules = placementPolicy.getRules(); assertEquals(2, rules.size()); @@ -599,7 +596,7 @@ public void testSimplePlacementPolicyFromConf() throws Exception { assertEquals(false, rules.get(0).create); assertEquals(QueuePlacementRule.Default.class, rules.get(1).getClass()); } - + /** * Verify that you can't place queues at the same level as the root queue in * the allocations file. @@ -618,7 +615,7 @@ public void testQueueAlongsideRoot() throws Exception { out.println(""); out.println(""); out.close(); - + AllocationFileLoaderService allocLoader = new AllocationFileLoaderService(); allocLoader.init(conf); ReloadListener confHolder = new ReloadListener(); @@ -890,7 +887,7 @@ public void testReservableCannotBeCombinedWithDynamicUserQueue() private class ReloadListener implements AllocationFileLoaderService.Listener { public AllocationConfiguration allocConf; - + @Override public void onReload(AllocationConfiguration info) { allocConf = info; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/AllocationFileQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/AllocationFileQueue.java new file mode 100644 index 0000000000..f1afe6979f --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/AllocationFileQueue.java @@ -0,0 +1,82 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile; + +import java.io.PrintWriter; +import java.io.StringWriter; +import java.util.List; + +class AllocationFileQueue { + private final AllocationFileQueueProperties properties; + private final List subQueues; + + AllocationFileQueue(AllocationFileQueueProperties properties, + List subQueues) { + this.properties = properties; + this.subQueues = subQueues; + } + + String render() { + StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw); + printStartTag(pw); + AllocationFileWriter.printQueues(pw, subQueues); + AllocationFileWriter.addIfPresent(pw, "minResources", + properties::getMinResources); + AllocationFileWriter.addIfPresent(pw, "maxResources", + properties::getMaxResources); + AllocationFileWriter.addIfPresent(pw, "aclAdministerApps", + properties::getAclAdministerApps); + AllocationFileWriter.addIfPresent(pw, "aclSubmitApps", + properties::getAclSubmitApps); + AllocationFileWriter.addIfPresent(pw, "schedulingPolicy", + properties::getSchedulingPolicy); + AllocationFileWriter.addIfPresent(pw, "maxRunningApps", + () -> AllocationFileWriter + .createNumberSupplier(properties.getMaxRunningApps())); + AllocationFileWriter.addIfPresent(pw, "maxAMShare", + () -> AllocationFileWriter.createNumberSupplier(properties + .getMaxAMShare())); + AllocationFileWriter.addIfPresent(pw, "minSharePreemptionTimeout", + () -> AllocationFileWriter + .createNumberSupplier(properties.getMinSharePreemptionTimeout())); + AllocationFileWriter.addIfPresent(pw, "maxChildResources", + properties::getMaxChildResources); + AllocationFileWriter.addIfPresent(pw, "fairSharePreemptionTimeout", + () -> AllocationFileWriter + .createNumberSupplier(properties.getFairSharePreemptionTimeout())); + AllocationFileWriter.addIfPresent(pw, "fairSharePreemptionThreshold", + () -> AllocationFileWriter + .createNumberSupplier( + properties.getFairSharePreemptionThreshold())); + printEndTag(pw); + pw.close(); + return sw.toString(); + } + + private void printStartTag(PrintWriter pw) { + pw.print(""); + } + + private void printEndTag(PrintWriter pw) { + pw.println(""); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/AllocationFileQueueBuilder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/AllocationFileQueueBuilder.java new file mode 100644 index 0000000000..a2faf1da31 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/AllocationFileQueueBuilder.java @@ -0,0 +1,115 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile; + +/** + * Abstract base class for building simple queues and subqueues for testcases. + * Currently there are two concrete types subclassed from this class: + * {@link AllocationFileSimpleQueueBuilder} and + * {@link AllocationFileSubQueueBuilder}. + * The intention of having this class to group the common properties of + * simple queues and subqueues by methods delegating calls to a + * queuePropertiesBuilder instance. + */ +public abstract class AllocationFileQueueBuilder { + final AllocationFileQueueProperties.Builder queuePropertiesBuilder; + + AllocationFileQueueBuilder() { + this.queuePropertiesBuilder = + AllocationFileQueueProperties.Builder.create(); + } + + public AllocationFileQueueBuilder parent(boolean parent) { + this.queuePropertiesBuilder.parent(parent); + return this; + } + + public AllocationFileQueueBuilder minResources(String value) { + this.queuePropertiesBuilder.minResources(value); + return this; + } + + public AllocationFileQueueBuilder maxResources(String value) { + this.queuePropertiesBuilder.maxResources(value); + return this; + } + + public AllocationFileQueueBuilder aclAdministerApps(String value) { + this.queuePropertiesBuilder.aclAdministerApps(value); + return this; + } + + public AllocationFileQueueBuilder aclSubmitApps(String value) { + this.queuePropertiesBuilder.aclSubmitApps(value); + return this; + } + + public AllocationFileQueueBuilder schedulingPolicy(String value) { + this.queuePropertiesBuilder.schedulingPolicy(value); + return this; + } + + public AllocationFileQueueBuilder maxRunningApps(int value) { + this.queuePropertiesBuilder.maxRunningApps(value); + return this; + } + + public AllocationFileQueueBuilder maxAMShare(double value) { + this.queuePropertiesBuilder.maxAMShare(value); + return this; + } + + public AllocationFileQueueBuilder minSharePreemptionTimeout(int value) { + this.queuePropertiesBuilder.minSharePreemptionTimeout(value); + return this; + } + + public AllocationFileQueueBuilder maxChildResources(String value) { + this.queuePropertiesBuilder.maxChildResources(value); + return this; + } + + public AllocationFileQueueBuilder fairSharePreemptionTimeout(Integer value) { + this.queuePropertiesBuilder.fairSharePreemptionTimeout(value); + return this; + } + + public AllocationFileQueueBuilder fairSharePreemptionThreshold( + double value) { + this.queuePropertiesBuilder.fairSharePreemptionThreshold(value); + return this; + } + + public AllocationFileQueueBuilder subQueue(String queueName) { + if (this instanceof AllocationFileSimpleQueueBuilder) { + return new AllocationFileSubQueueBuilder( + (AllocationFileSimpleQueueBuilder) this, queueName); + } else { + throw new IllegalStateException( + "subQueue can only be invoked on instances of " + + AllocationFileSimpleQueueBuilder.class); + } + } + + public abstract AllocationFileWriter buildQueue(); + + public abstract AllocationFileSimpleQueueBuilder buildSubQueue(); + + AllocationFileQueueProperties.Builder getqueuePropertiesBuilder() { + return queuePropertiesBuilder; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/AllocationFileQueueProperties.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/AllocationFileQueueProperties.java new file mode 100644 index 0000000000..2c01144a15 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/AllocationFileQueueProperties.java @@ -0,0 +1,202 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile; + +/** + * The purpose of this class is to store all properties of a queue. + */ +public class AllocationFileQueueProperties { + private final String queueName; + private final String minResources; + private final String maxResources; + private final String aclAdministerApps; + private final String aclSubmitApps; + private final String schedulingPolicy; + private final Integer maxRunningApps; + private final Double maxAMShare; + private final Integer minSharePreemptionTimeout; + private final Boolean parent; + private final String maxChildResources; + private final Integer fairSharePreemptionTimeout; + private final Double fairSharePreemptionThreshold; + + AllocationFileQueueProperties(Builder builder) { + this.queueName = builder.queueName; + this.parent = builder.parent; + this.minResources = builder.minResources; + this.maxResources = builder.maxResources; + this.aclAdministerApps = builder.aclAdministerApps; + this.aclSubmitApps = builder.aclSubmitApps; + this.schedulingPolicy = builder.schedulingPolicy; + this.maxRunningApps = builder.maxRunningApps; + this.maxAMShare = builder.maxAMShare; + this.minSharePreemptionTimeout = builder.minSharePreemptionTimeout; + this.maxChildResources = builder.maxChildResources; + this.fairSharePreemptionTimeout = builder.fairSharePreemptionTimeout; + this.fairSharePreemptionThreshold = builder.fairSharePreemptionThreshold; + } + + public String getQueueName() { + return queueName; + } + + public String getMinResources() { + return minResources; + } + + public String getMaxResources() { + return maxResources; + } + + public String getAclAdministerApps() { + return aclAdministerApps; + } + + public String getAclSubmitApps() { + return aclSubmitApps; + } + + public String getSchedulingPolicy() { + return schedulingPolicy; + } + + public Integer getMaxRunningApps() { + return maxRunningApps; + } + + public Double getMaxAMShare() { + return maxAMShare; + } + + public Integer getMinSharePreemptionTimeout() { + return minSharePreemptionTimeout; + } + + public Boolean getParent() { + return parent; + } + + public String getMaxChildResources() { + return maxChildResources; + } + + public Integer getFairSharePreemptionTimeout() { + return fairSharePreemptionTimeout; + } + + public Double getFairSharePreemptionThreshold() { + return fairSharePreemptionThreshold; + } + + /** + * Builder class for {@link AllocationFileQueueProperties}. + */ + public static final class Builder { + private String queueName; + private Boolean parent = false; + private String minResources; + private String maxResources; + private String aclAdministerApps; + private String aclSubmitApps; + private String schedulingPolicy; + private Integer maxRunningApps; + private Double maxAMShare; + private Integer minSharePreemptionTimeout; + private String maxChildResources; + private Integer fairSharePreemptionTimeout; + private Double fairSharePreemptionThreshold; + + Builder() { + } + + public static Builder create() { + return new Builder(); + } + + public Builder queueName(String queueName) { + this.queueName = queueName; + return this; + } + + public Builder minResources(String minResources) { + this.minResources = minResources; + return this; + } + + public Builder maxResources(String maxResources) { + this.maxResources = maxResources; + return this; + } + + public Builder aclAdministerApps(String aclAdministerApps) { + this.aclAdministerApps = aclAdministerApps; + return this; + } + + public Builder aclSubmitApps(String aclSubmitApps) { + this.aclSubmitApps = aclSubmitApps; + return this; + } + + public Builder schedulingPolicy(String schedulingPolicy) { + this.schedulingPolicy = schedulingPolicy; + return this; + } + + public Builder maxRunningApps(Integer maxRunningApps) { + this.maxRunningApps = maxRunningApps; + return this; + } + + public Builder maxAMShare(Double maxAMShare) { + this.maxAMShare = maxAMShare; + return this; + } + + public Builder minSharePreemptionTimeout( + Integer minSharePreemptionTimeout) { + this.minSharePreemptionTimeout = minSharePreemptionTimeout; + return this; + } + + public Builder parent(Boolean parent) { + this.parent = parent; + return this; + } + + public Builder maxChildResources(String maxChildResources) { + this.maxChildResources = maxChildResources; + return this; + } + + public Builder fairSharePreemptionTimeout( + Integer fairSharePreemptionTimeout) { + this.fairSharePreemptionTimeout = fairSharePreemptionTimeout; + return this; + } + + public Builder fairSharePreemptionThreshold( + Double fairSharePreemptionThreshold) { + this.fairSharePreemptionThreshold = fairSharePreemptionThreshold; + return this; + } + + public AllocationFileQueueProperties build() { + return new AllocationFileQueueProperties(this); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/AllocationFileSimpleQueueBuilder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/AllocationFileSimpleQueueBuilder.java new file mode 100644 index 0000000000..93d100ed35 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/AllocationFileSimpleQueueBuilder.java @@ -0,0 +1,64 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile; + +import java.util.ArrayList; +import java.util.List; + +/** + * Queue builder that can build a simple queue with its properties. + * Subqueues can be added with {@link #addSubQueue(AllocationFileQueue)}. + */ +public class AllocationFileSimpleQueueBuilder + extends AllocationFileQueueBuilder { + private final AllocationFileWriter allocationFileWriter; + private final List subQueues = new ArrayList<>(); + + AllocationFileSimpleQueueBuilder(AllocationFileWriter allocationFileWriter, + String queueName) { + this.allocationFileWriter = allocationFileWriter; + getqueuePropertiesBuilder().queueName(queueName); + } + + void addSubQueue(AllocationFileQueue queue) { + subQueues.add(queue); + } + + @Override + public AllocationFileWriter buildQueue() { + AllocationFileQueueProperties queueProperties = + getqueuePropertiesBuilder().build(); + AllocationFileQueue queue = + new AllocationFileQueue(queueProperties, subQueues); + + if (allocationFileWriter != null) { + allocationFileWriter.addQueue(queue); + } else { + throw new IllegalStateException( + "allocationFileWriter field has to be set on a " + getClass()); + } + + return allocationFileWriter; + } + + @Override + public AllocationFileSimpleQueueBuilder buildSubQueue() { + throw new IllegalStateException( + "buildSubQueue is not supported in " + getClass()); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/AllocationFileSubQueueBuilder.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/AllocationFileSubQueueBuilder.java new file mode 100644 index 0000000000..728aedcbc8 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/AllocationFileSubQueueBuilder.java @@ -0,0 +1,54 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile; + +import com.google.common.collect.Lists; + + +/** + * Queue builder that can build a subqueue with its properties. + */ +public class AllocationFileSubQueueBuilder extends AllocationFileQueueBuilder { + private AllocationFileSimpleQueueBuilder parentQueueBuilder; + + AllocationFileSubQueueBuilder( + AllocationFileSimpleQueueBuilder parentQueueBuilder, String queueName) { + getqueuePropertiesBuilder().queueName(queueName); + this.parentQueueBuilder = parentQueueBuilder; + } + + @Override + public AllocationFileWriter buildQueue() { + throw new IllegalStateException( + "BuildQueue is not supported in " + getClass()); + } + + public AllocationFileSimpleQueueBuilder buildSubQueue() { + AllocationFileQueueProperties queueProperties = + getqueuePropertiesBuilder().build(); + AllocationFileQueue queue = + new AllocationFileQueue(queueProperties, Lists.newArrayList()); + + if (parentQueueBuilder != null) { + parentQueueBuilder.addSubQueue(queue); + return parentQueueBuilder; + } else { + throw new IllegalStateException( + "parentQueueBuilder field has to be set on a " + getClass()); + } + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/AllocationFileWriter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/AllocationFileWriter.java new file mode 100644 index 0000000000..df1cc53d84 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/AllocationFileWriter.java @@ -0,0 +1,175 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile; + +import java.io.FileWriter; +import java.io.IOException; +import java.io.PrintWriter; +import java.util.ArrayList; +import java.util.List; +import java.util.function.Supplier; + +/** + * This class is capable of serializing allocation file data to a file + * in XML format. + * See {@link #writeToFile(String)} method for the implementation. + */ +public final class AllocationFileWriter { + private Integer queueMaxAppsDefault; + private String queueMaxResourcesDefault; + private Integer userMaxAppsDefault; + private Double queueMaxAMShareDefault; + private Integer defaultMinSharePreemptionTimeout; + private Integer defaultFairSharePreemptionTimeout; + private Double defaultFairSharePreemptionThreshold; + private String defaultQueueSchedulingPolicy; + private List queues = new ArrayList<>(); + private UserSettings userSettings; + + private AllocationFileWriter() { + } + + public static AllocationFileWriter create() { + return new AllocationFileWriter(); + } + + public AllocationFileSimpleQueueBuilder queue(String queueName) { + return new AllocationFileSimpleQueueBuilder(this, queueName); + } + + public AllocationFileWriter queueMaxAppsDefault(int value) { + this.queueMaxAppsDefault = value; + return this; + } + + public AllocationFileWriter queueMaxResourcesDefault(String value) { + this.queueMaxResourcesDefault = value; + return this; + } + + public AllocationFileWriter userMaxAppsDefault(int value) { + this.userMaxAppsDefault = value; + return this; + } + + public AllocationFileWriter queueMaxAMShareDefault(double value) { + this.queueMaxAMShareDefault = value; + return this; + } + + public AllocationFileWriter defaultMinSharePreemptionTimeout(int value) { + this.defaultMinSharePreemptionTimeout = value; + return this; + } + + public AllocationFileWriter defaultFairSharePreemptionTimeout(int value) { + this.defaultFairSharePreemptionTimeout = value; + return this; + } + + public AllocationFileWriter defaultFairSharePreemptionThreshold( + double value) { + this.defaultFairSharePreemptionThreshold = value; + return this; + } + + public AllocationFileWriter defaultQueueSchedulingPolicy(String value) { + this.defaultQueueSchedulingPolicy = value; + return this; + } + + public UserSettings.Builder userSettings(String username) { + return new UserSettings.Builder(this, username); + } + + void addQueue(AllocationFileQueue queue) { + this.queues.add(queue); + } + + void setUserSettings(UserSettings userSettings) { + this.userSettings = userSettings; + } + + static void printQueues(PrintWriter pw, List queues) { + for (AllocationFileQueue queue : queues) { + pw.println(queue.render()); + } + } + + private void printUserSettings(PrintWriter pw) { + pw.println(userSettings.render()); + } + + static void addIfPresent(PrintWriter pw, String tag, + Supplier supplier) { + if (supplier.get() != null) { + pw.println("<" + tag + ">" + supplier.get() + ""); + } + } + + static String createNumberSupplier(Object number) { + if (number != null) { + return number.toString(); + } + return null; + } + + private void writeHeader(PrintWriter pw) { + pw.println(""); + pw.println(""); + } + + private void writeFooter(PrintWriter pw) { + pw.println(""); + } + + public void writeToFile(String filename) { + PrintWriter pw; + try { + pw = new PrintWriter(new FileWriter(filename)); + } catch (IOException e) { + throw new RuntimeException(e); + } + writeHeader(pw); + if (!queues.isEmpty()) { + printQueues(pw, queues); + } + if (userSettings != null) { + printUserSettings(pw); + } + + addIfPresent(pw, "queueMaxAppsDefault", + () -> createNumberSupplier(queueMaxAppsDefault)); + addIfPresent(pw, "queueMaxResourcesDefault", + () -> queueMaxResourcesDefault); + addIfPresent(pw, "userMaxAppsDefault", + () -> createNumberSupplier(userMaxAppsDefault)); + addIfPresent(pw, "queueMaxAMShareDefault", + () -> createNumberSupplier(queueMaxAMShareDefault)); + addIfPresent(pw, "defaultMinSharePreemptionTimeout", + () -> createNumberSupplier(defaultMinSharePreemptionTimeout)); + addIfPresent(pw, "defaultFairSharePreemptionTimeout", + () -> createNumberSupplier(defaultFairSharePreemptionTimeout)); + addIfPresent(pw, "defaultFairSharePreemptionThreshold", + () -> createNumberSupplier(defaultFairSharePreemptionThreshold)); + addIfPresent(pw, "defaultQueueSchedulingPolicy", + () -> defaultQueueSchedulingPolicy); + writeFooter(pw); + pw.close(); + } + +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/UserSettings.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/UserSettings.java new file mode 100644 index 0000000000..7a5656e5f3 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/allocationfile/UserSettings.java @@ -0,0 +1,80 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.allocationfile; + +import java.io.PrintWriter; +import java.io.StringWriter; + +/** + * Value class that stores user settings and can render data in XML format, + * see {@link #render()}. + */ +class UserSettings { + private final String username; + private final Integer maxRunningApps; + + UserSettings(Builder builder) { + this.username = builder.username; + this.maxRunningApps = builder.maxRunningApps; + } + + public String render() { + StringWriter sw = new StringWriter(); + PrintWriter pw = new PrintWriter(sw); + addStartTag(pw); + AllocationFileWriter.addIfPresent(pw, "maxRunningApps", + () -> AllocationFileWriter.createNumberSupplier(maxRunningApps)); + addEndTag(pw); + pw.close(); + + return sw.toString(); + } + + private void addStartTag(PrintWriter pw) { + pw.println(""); + } + + private void addEndTag(PrintWriter pw) { + pw.println(""); + } + + /** + * Builder class for {@link UserSettings} + */ + public static class Builder { + private final AllocationFileWriter allocationFileWriter; + private final String username; + private Integer maxRunningApps; + + Builder(AllocationFileWriter allocationFileWriter, String username) { + this.allocationFileWriter = allocationFileWriter; + this.username = username; + } + + public Builder maxRunningApps(int value) { + this.maxRunningApps = value; + return this; + } + + public AllocationFileWriter build() { + UserSettings userSettings = new UserSettings(this); + allocationFileWriter.setUserSettings(userSettings); + + return allocationFileWriter; + } + } +}