diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java index d19ced9369..cf7b04ab61 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/util/StringUtils.java @@ -424,6 +424,32 @@ public static Collection getStringCollection(String str, String delim) { return values; } + /** + * Returns a collection of strings, trimming leading and trailing whitespace + * on each value. Duplicates are not removed. + * + * @param str + * String separated by delim. + * @param delim + * Delimiter to separate the values in str. + * @return Collection of string values. + */ + public static Collection getTrimmedStringCollection(String str, + String delim) { + List values = new ArrayList(); + if (str == null) + return values; + StringTokenizer tokenizer = new StringTokenizer(str, delim); + while (tokenizer.hasMoreTokens()) { + String next = tokenizer.nextToken(); + if (next == null || next.trim().isEmpty()) { + continue; + } + values.add(next.trim()); + } + return values; + } + /** * Splits a comma separated value String, trimming leading and * trailing whitespace on each value. Duplicate and empty values are removed. diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java index c6cf117e86..43d3abe4f8 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/test/java/org/apache/hadoop/mapreduce/v2/TestMRJobs.java @@ -91,10 +91,14 @@ import org.apache.hadoop.util.Shell; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; +import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.WorkflowPriorityMappingsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.WorkflowPriorityMappingsManager.WorkflowPriorityMapping; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler; -import org.apache.hadoop.yarn.util.ConverterUtils; import org.apache.log4j.Level; import org.junit.After; import org.junit.AfterClass; @@ -452,6 +456,53 @@ public void testJobWithChangePriority() throws Exception { Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState()); } + @Test(timeout = 300000) + public void testJobWithWorkflowPriority() throws Exception { + Configuration sleepConf = new Configuration(mrCluster.getConfig()); + if (!(new File(MiniMRYarnCluster.APPJAR)).exists()) { + LOG.info("MRAppJar " + MiniMRYarnCluster.APPJAR + + " not found. Not running test."); + return; + } + CapacityScheduler scheduler = (CapacityScheduler) mrCluster + .getResourceManager().getResourceScheduler(); + CapacitySchedulerConfiguration csConf = scheduler.getConfiguration(); + csConf.set(CapacitySchedulerConfiguration.WORKFLOW_PRIORITY_MAPPINGS, + WorkflowPriorityMappingsManager.getWorkflowPriorityMappingStr( + Arrays.asList(new WorkflowPriorityMapping( + "wf1", "root.default", Priority.newInstance(1))))); + csConf.setBoolean(CapacitySchedulerConfiguration. + ENABLE_WORKFLOW_PRIORITY_MAPPINGS_OVERRIDE, true); + scheduler.reinitialize(csConf, scheduler.getRMContext()); + + // set master address to local to test that local mode applied if framework + // equals local + sleepConf.set(MRConfig.MASTER_ADDRESS, "local"); + sleepConf + .setInt("yarn.app.mapreduce.am.scheduler.heartbeat.interval-ms", 5); + sleepConf.set(MRJobConfig.JOB_TAGS, + YarnConfiguration.DEFAULT_YARN_WORKFLOW_ID_TAG_PREFIX + "wf1"); + + SleepJob sleepJob = new SleepJob(); + sleepJob.setConf(sleepConf); + Job job = sleepJob.createJob(1, 1, 1000, 20, 50, 1); + + job.addFileToClassPath(APP_JAR); // The AppMaster jar itself. + job.setJarByClass(SleepJob.class); + job.setMaxMapAttempts(1); // speed up failures + // VERY_HIGH priority should get overwritten by workflow priority mapping + job.setPriority(JobPriority.VERY_HIGH); + job.submit(); + + waitForPriorityToUpdate(job, JobPriority.VERY_LOW); + // Verify the priority from job itself + Assert.assertEquals(JobPriority.VERY_LOW, job.getPriority()); + + boolean succeeded = job.waitForCompletion(true); + Assert.assertTrue(succeeded); + Assert.assertEquals(JobStatus.State.SUCCEEDED, job.getJobState()); + } + private void waitForPriorityToUpdate(Job job, JobPriority expectedStatus) throws IOException, InterruptedException { // Max wait time to get the priority update can be kept as 20sec (100 * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java index 83871a5bf5..49d2ff0580 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/conf/YarnConfiguration.java @@ -4137,6 +4137,13 @@ public static boolean areNodeLabelsEnabled( public static final String NM_CONTAINERS_LAUNCHER_CLASS = NM_PREFIX + "containers-launcher.class"; + // Configuration for the prefix of the tag which contains workflow ID, + // followed by the prefix. + public static final String YARN_WORKFLOW_ID_TAG_PREFIX = + YARN_PREFIX + "workflow-id.tag-prefix"; + public static final String DEFAULT_YARN_WORKFLOW_ID_TAG_PREFIX = + "workflowid:"; + public YarnConfiguration() { super(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml index 4393792d03..62f237f99a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/resources/yarn-default.xml @@ -4310,4 +4310,15 @@ yarn.node-labels.exclusive-enforced-partitions + + + + Prefix used to identify the YARN tag which contains workflow ID. If a tag coming in application + submission context has this prefix, whatever follows the prefix will be considered as workflow ID + associated with the application. This configuration is used by features such as workflow priority + for identifying the workflow associated with an application. + + yarn.workflow-id.tag-prefix + workflowid: + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml index 38526d18db..e8712525ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/conf/capacity-scheduler.xml @@ -217,4 +217,28 @@ + + yarn.scheduler.capacity.workflow-priority-mappings + + + A list of mappings that will be used to override application priority. + The syntax for this list is + [workflowId]:[full_queue_name]:[priority][,next mapping]* + where an application submitted (or mapped to) queue "full_queue_name" + and workflowId "workflowId" (as specified in application submission + context) will be given priority "priority". + + + + + yarn.scheduler.capacity.workflow-priority-mappings-override.enable + false + + If a priority mapping is present, will it override the value specified + by the user? This can be used by administrators to give applications a + priority that is different than the one specified by the user. + The default is false. + + + diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index fc020f73f0..2121a1f322 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -183,6 +183,8 @@ public class CapacityScheduler extends private CapacitySchedulerQueueManager queueManager; + private WorkflowPriorityMappingsManager workflowPriorityMappingsMgr; + // timeout to join when we stop this service protected final long THREAD_JOIN_TIMEOUT_MS = 1000; @@ -364,6 +366,8 @@ void initScheduler(Configuration configuration) throws this.labelManager, this.appPriorityACLManager); this.queueManager.setCapacitySchedulerContext(this); + this.workflowPriorityMappingsMgr = new WorkflowPriorityMappingsManager(); + this.activitiesManager = new ActivitiesManager(rmContext); activitiesManager.init(conf); initializeQueues(this.conf); @@ -770,6 +774,8 @@ private void initializeQueues(CapacitySchedulerConfiguration conf) updatePlacementRules(); + this.workflowPriorityMappingsMgr.initialize(this); + // Notify Preemption Manager preemptionManager.refreshQueues(null, this.getRootQueue()); } @@ -780,6 +786,8 @@ private void reinitializeQueues(CapacitySchedulerConfiguration newConf) this.queueManager.reinitializeQueues(newConf); updatePlacementRules(); + this.workflowPriorityMappingsMgr.initialize(this); + // Notify Preemption Manager preemptionManager.refreshQueues(null, this.getRootQueue()); } @@ -987,6 +995,17 @@ private void addApplication(ApplicationId applicationId, String queueName, } } + try { + priority = workflowPriorityMappingsMgr.mapWorkflowPriorityForApp( + applicationId, queue, user, priority); + } catch (YarnException e) { + String message = "Failed to submit application " + applicationId + + " submitted by user " + user + " reason: " + e.getMessage(); + this.rmContext.getDispatcher().getEventHandler().handle(new RMAppEvent( + applicationId, RMAppEventType.APP_REJECTED, message)); + return; + } + // Submit to the queue try { queue.submitApplication(applicationId, user, queueName); @@ -3045,6 +3064,10 @@ public CapacitySchedulerQueueManager getCapacitySchedulerQueueManager() { return this.queueManager; } + public WorkflowPriorityMappingsManager getWorkflowPriorityMappingsManager() { + return this.workflowPriorityMappingsMgr; + } + /** * Try to move a reserved container to a targetNode. * If the targetNode is reserved by another application (other than this one). diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java index 15b1ef9312..a72e8070f5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfiguration.java @@ -44,6 +44,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.reservation.ReservationSchedulerConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.AppPriorityACLConfigurationParser.AppPriorityACLKeyType; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.WorkflowPriorityMappingsManager.WorkflowPriorityMapping; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.PriorityUtilizationQueueOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.policy.QueueOrderingPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.MultiNodeLookupPolicy; @@ -71,7 +72,6 @@ import java.util.regex.Matcher; import java.util.regex.Pattern; import java.util.Set; -import java.util.StringTokenizer; public class CapacitySchedulerConfiguration extends ReservationSchedulerConfiguration { @@ -280,6 +280,17 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur @Private public static final boolean DEFAULT_ENABLE_QUEUE_MAPPING_OVERRIDE = false; + @Private + public static final String WORKFLOW_PRIORITY_MAPPINGS = + PREFIX + "workflow-priority-mappings"; + + @Private + public static final String ENABLE_WORKFLOW_PRIORITY_MAPPINGS_OVERRIDE = + WORKFLOW_PRIORITY_MAPPINGS + "-override.enable"; + + @Private + public static final boolean DEFAULT_ENABLE_WORKFLOW_PRIORITY_MAPPINGS_OVERRIDE = false; + @Private public static final String QUEUE_PREEMPTION_DISABLED = "disable_preemption"; @@ -1022,7 +1033,7 @@ public List getQueueMappingEntity( getTrimmedStringCollection(queueMappingName); for (String mappingValue : mappingsString) { String[] mapping = - getTrimmedStringCollection(mappingValue, ":") + StringUtils.getTrimmedStringCollection(mappingValue, ":") .toArray(new String[] {}); if (mapping.length != 2 || mapping[1].length() == 0) { throw new IllegalArgumentException( @@ -1060,30 +1071,13 @@ public void setQueueMappingEntities(List queueMappings, setStrings(mappingRuleProp, StringUtils.join(",", queueMappingStrs)); } - /** - * Returns a collection of strings, trimming leading and trailing whitespeace - * on each value - * - * @param str - * String to parse - * @param delim - * delimiter to separate the values - * @return Collection of parsed elements. - */ - private static Collection getTrimmedStringCollection(String str, - String delim) { - List values = new ArrayList(); - if (str == null) - return values; - StringTokenizer tokenizer = new StringTokenizer(str, delim); - while (tokenizer.hasMoreTokens()) { - String next = tokenizer.nextToken(); - if (next == null || next.trim().isEmpty()) { - continue; - } - values.add(next.trim()); - } - return values; + public boolean getOverrideWithWorkflowPriorityMappings() { + return getBoolean(ENABLE_WORKFLOW_PRIORITY_MAPPINGS_OVERRIDE, + DEFAULT_ENABLE_WORKFLOW_PRIORITY_MAPPINGS_OVERRIDE); + } + + public Collection getWorkflowPriorityMappings() { + return getTrimmedStringCollection(WORKFLOW_PRIORITY_MAPPINGS); } /** @@ -1098,7 +1092,7 @@ public List getQueueMappings() { getTrimmedStringCollection(QUEUE_MAPPING); for (String mappingValue : mappingsString) { String[] mapping = - getTrimmedStringCollection(mappingValue, ":") + StringUtils.getTrimmedStringCollection(mappingValue, ":") .toArray(new String[] {}); if (mapping.length != 3 || mapping[1].length() == 0 || mapping[2].length() == 0) { @@ -1159,6 +1153,14 @@ public void setQueueMappings(List queueMappings) { setStrings(QUEUE_MAPPING, StringUtils.join(",", queueMappingStrs)); } + @Private + @VisibleForTesting + void setWorkflowPriorityMappings( + List workflowPriorityMappings) { + setStrings(WORKFLOW_PRIORITY_MAPPINGS, WorkflowPriorityMappingsManager + .getWorkflowPriorityMappingStr(workflowPriorityMappings)); + } + public boolean isReservable(String queue) { boolean isReservable = getBoolean(getQueuePrefix(queue) + IS_RESERVABLE, false); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/WorkflowPriorityMappingsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/WorkflowPriorityMappingsManager.java new file mode 100644 index 0000000000..6b6d4faee1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/WorkflowPriorityMappingsManager.java @@ -0,0 +1,230 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import java.io.IOException; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.security.UserGroupInformation; +import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppImpl; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import com.google.common.annotations.VisibleForTesting; + +@Private +@VisibleForTesting +public class WorkflowPriorityMappingsManager { + + private static final Logger LOG = + LoggerFactory.getLogger(WorkflowPriorityMappingsManager.class); + + private static final String WORKFLOW_PART_SEPARATOR = ":"; + + private static final String WORKFLOW_SEPARATOR = ","; + + private CapacityScheduler scheduler; + + private CapacitySchedulerConfiguration conf; + + private boolean overrideWithPriorityMappings = false; + // Map of queue to a map of workflow ID to priority + private Map> priorityMappings = + new HashMap>(); + + public static class WorkflowPriorityMapping { + String workflowID; + String queue; + Priority priority; + + public WorkflowPriorityMapping(String workflowID, String queue, + Priority priority) { + this.workflowID = workflowID; + this.queue = queue; + this.priority = priority; + } + + public Priority getPriority() { + return this.priority; + } + + @Override + public int hashCode() { + return super.hashCode(); + } + + @Override + public boolean equals(Object obj) { + if (obj instanceof WorkflowPriorityMapping) { + WorkflowPriorityMapping other = (WorkflowPriorityMapping) obj; + return (other.workflowID.equals(workflowID) && + other.queue.equals(queue) && + other.priority.equals(priority)); + } else { + return false; + } + } + + public String toString() { + return workflowID + WORKFLOW_PART_SEPARATOR + queue + + WORKFLOW_PART_SEPARATOR + priority.getPriority(); + } + } + + @VisibleForTesting + public void initialize(CapacityScheduler scheduler) throws IOException { + this.scheduler = scheduler; + this.conf = scheduler.getConfiguration(); + boolean overrideWithWorkflowPriorityMappings = + conf.getOverrideWithWorkflowPriorityMappings(); + LOG.info("Initialized workflow priority mappings, override: " + + overrideWithWorkflowPriorityMappings); + this.overrideWithPriorityMappings = overrideWithWorkflowPriorityMappings; + this.priorityMappings = getWorkflowPriorityMappings(); + } + + /** + * Get workflow ID to priority mappings for a queue. + * + * @return workflowID to priority mappings for a queue + */ + public Map> + getWorkflowPriorityMappings() { + Map> mappings = + new HashMap>(); + + Collection workflowMappings = conf.getWorkflowPriorityMappings(); + for (String workflowMapping : workflowMappings) { + WorkflowPriorityMapping mapping = + getWorkflowMappingFromString(workflowMapping); + if (mapping != null) { + if (!mappings.containsKey(mapping.queue)) { + mappings.put(mapping.queue, + new HashMap()); + } + mappings.get(mapping.queue).put(mapping.workflowID, mapping); + } + } + return mappings; + } + + private WorkflowPriorityMapping getWorkflowMappingFromString( + String mappingString) { + if (mappingString == null) { + return null; + } + String[] mappingArray = StringUtils + .getTrimmedStringCollection(mappingString, WORKFLOW_PART_SEPARATOR) + .toArray(new String[] {}); + if (mappingArray.length != 3 || mappingArray[0].length() == 0 + || mappingArray[1].length() == 0 || mappingArray[2].length() == 0) { + throw new IllegalArgumentException( + "Illegal workflow priority mapping " + mappingString); + } + WorkflowPriorityMapping mapping; + try { + mapping = new WorkflowPriorityMapping(mappingArray[0], mappingArray[1], + Priority.newInstance(Integer.parseInt(mappingArray[2]))); + } catch (NumberFormatException e) { + throw new IllegalArgumentException( + "Illegal workflow priority for mapping " + mappingString); + } + return mapping; + } + + public Priority getMappedPriority(String workflowID, CSQueue queue) { + // Recursively fetch the priority mapping for the given workflow tracing + // up the queue hierarchy until the first match. + if (queue.equals(scheduler.getRootQueue())) { + return null; + } + String queuePath = queue.getQueuePath(); + if (priorityMappings.containsKey(queuePath) + && priorityMappings.get(queuePath).containsKey(workflowID)) { + return priorityMappings.get(queuePath).get(workflowID).priority; + } else { + queue = queue.getParent(); + return getMappedPriority(workflowID, queue); + } + } + + public Priority mapWorkflowPriorityForApp(ApplicationId applicationId, + CSQueue queue, String user, Priority priority) throws YarnException { + if (overrideWithPriorityMappings) { + // Set the correct workflow priority + RMApp rmApp = scheduler.getRMContext().getRMApps().get(applicationId); + if (rmApp != null && rmApp.getApplicationTags() != null + && rmApp.getApplicationSubmissionContext() != null) { + String workflowTagPrefix = scheduler.getConf().get( + YarnConfiguration.YARN_WORKFLOW_ID_TAG_PREFIX, + YarnConfiguration.DEFAULT_YARN_WORKFLOW_ID_TAG_PREFIX); + String workflowID = null; + for(String tag : rmApp.getApplicationTags()) { + if (tag.trim().startsWith(workflowTagPrefix)) { + workflowID = tag.trim().substring(workflowTagPrefix.length()); + } + } + if (workflowID != null && !workflowID.isEmpty() + && priorityMappings != null && priorityMappings.size() > 0) { + Priority mappedPriority = getMappedPriority(workflowID, queue); + if (mappedPriority != null) { + LOG.info("Application " + applicationId + " user " + user + + " workflow " + workflowID + " queue " + queue.getQueueName() + + " mapping [" + priority + "] to [" + mappedPriority + + "] override " + overrideWithPriorityMappings); + + // If workflow ID exists in workflow mapping, change this + // application's priority to mapped value. Else, use queue + // default priority. + priority = mappedPriority; + priority = scheduler.checkAndGetApplicationPriority( + priority, UserGroupInformation.createRemoteUser(user), + queue.getQueueName(), applicationId); + rmApp.getApplicationSubmissionContext().setPriority(priority); + ((RMAppImpl)rmApp).setApplicationPriority(priority); + } + } + } + } + return priority; + } + + public static String getWorkflowPriorityMappingStr( + List workflowPriorityMappings) { + if (workflowPriorityMappings == null) { + return ""; + } + List workflowPriorityMappingStrs = new ArrayList<>(); + for (WorkflowPriorityMapping mapping : workflowPriorityMappings) { + workflowPriorityMappingStrs.add(mapping.toString()); + } + return StringUtils.join(WORKFLOW_SEPARATOR, workflowPriorityMappingStrs); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index a0dad817ce..9bbd148ff4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -528,14 +528,21 @@ public RMApp submitApp(int masterMemory, String queue) throws Exception { public RMApp submitApp(int masterMemory, Set appTags) throws Exception { + return submitApp(masterMemory, null, false, null, Priority.newInstance(0), + appTags); + } + + public RMApp submitApp(int masterMemory, String queue, + boolean isAppIdProvided, ApplicationId appId, Priority priority, + Set appTags) throws Exception { Resource resource = Resource.newInstance(masterMemory, 0); ResourceRequest amResourceRequest = ResourceRequest.newInstance( Priority.newInstance(0), ResourceRequest.ANY, resource, 1); return submitApp(Collections.singletonList(amResourceRequest), "", UserGroupInformation.getCurrentUser().getShortUserName(), null, false, - null, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + queue, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, - false, false, null, 0, null, true, Priority.newInstance(0), null, + false, isAppIdProvided, appId, 0, null, true, priority, null, null, null, appTags); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerWorkflowPriorityMapping.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerWorkflowPriorityMapping.java new file mode 100644 index 0000000000..2a679a15e0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerWorkflowPriorityMapping.java @@ -0,0 +1,151 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import static org.junit.Assert.assertEquals; + +import java.util.Arrays; +import java.util.List; +import java.util.Map; + +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Priority; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.WorkflowPriorityMappingsManager.WorkflowPriorityMapping; +import org.junit.Test; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.ImmutableSet; + +public class TestCapacitySchedulerWorkflowPriorityMapping + extends CapacitySchedulerTestBase { + private MockRM mockRM = null; + + private static void setWorkFlowPriorityMappings( + CapacitySchedulerConfiguration conf) { + // Define top-level queues + conf.setQueues( + CapacitySchedulerConfiguration.ROOT, new String[] {"a", "b"}); + + conf.setCapacity(A, A_CAPACITY); + conf.setCapacity(B, B_CAPACITY); + + // Define 2nd-level queues + conf.setQueues(A, new String[] {"a1", "a2"}); + conf.setCapacity(A1, A1_CAPACITY); + conf.setCapacity(A2, A2_CAPACITY); + + conf.setQueues(B, new String[] {"b1", "b2", "b3"}); + conf.setCapacity(B1, B1_CAPACITY); + conf.setCapacity(B2, B2_CAPACITY); + conf.setCapacity(B3, B3_CAPACITY); + + List mappings = Arrays.asList( + new WorkflowPriorityMapping("workflow1", B, Priority.newInstance(2)), + new WorkflowPriorityMapping("workflow2", A1, Priority.newInstance(3)), + new WorkflowPriorityMapping("workflow3", A, Priority.newInstance(4))); + conf.setWorkflowPriorityMappings(mappings); + } + + @Test + public void testWorkflowPriorityMappings() throws Exception { + CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration(); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean(CapacitySchedulerConfiguration + .ENABLE_WORKFLOW_PRIORITY_MAPPINGS_OVERRIDE, true); + conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, 10); + + // Initialize workflow priority mappings. + setWorkFlowPriorityMappings(conf); + + mockRM = new MockRM(conf); + CapacityScheduler cs = (CapacityScheduler) mockRM.getResourceScheduler(); + mockRM.start(); + cs.start(); + + Map> expected = ImmutableMap.of( + A, ImmutableMap.of("workflow3", + new WorkflowPriorityMapping( + "workflow3", A, Priority.newInstance(4))), + B, ImmutableMap.of("workflow1", + new WorkflowPriorityMapping( + "workflow1", B, Priority.newInstance(2))), + A1, ImmutableMap.of("workflow2", + new WorkflowPriorityMapping( + "workflow2", A1, Priority.newInstance(3)))); + assertEquals(expected, cs.getWorkflowPriorityMappingsManager() + .getWorkflowPriorityMappings()); + + // Maps to rule corresponding to parent queue "a" for workflow3. + mockRM.submitApp(1, "a2", true, ApplicationId.newInstance(0,1), + Priority.newInstance(0), ImmutableSet.of( + YarnConfiguration.DEFAULT_YARN_WORKFLOW_ID_TAG_PREFIX + + "workflow3")); + RMApp app = + mockRM.getRMContext().getRMApps().get(ApplicationId.newInstance(0,1)); + assertEquals(4, app.getApplicationSubmissionContext().getPriority() + .getPriority()); + + // Does not match any rule as rule for queue + workflow does not exist. + // Priority passed in the app is taken up. + mockRM.submitApp(1, "a1", true, ApplicationId.newInstance(0,2), + Priority.newInstance(6), ImmutableSet.of( + YarnConfiguration.DEFAULT_YARN_WORKFLOW_ID_TAG_PREFIX + + "workflow1")); + app = + mockRM.getRMContext().getRMApps().get(ApplicationId.newInstance(0,2)); + assertEquals(6, app.getApplicationSubmissionContext().getPriority() + .getPriority()); + + // Maps to rule corresponding to parent queue "a1" for workflow2. + mockRM.submitApp(1, "a1", true, ApplicationId.newInstance(0,3), + Priority.newInstance(0), ImmutableSet.of( + YarnConfiguration.DEFAULT_YARN_WORKFLOW_ID_TAG_PREFIX + + "workflow2")); + app = + mockRM.getRMContext().getRMApps().get(ApplicationId.newInstance(0,3)); + assertEquals(3, app.getApplicationSubmissionContext().getPriority() + .getPriority()); + + // Maps to rule corresponding to parent queue "b" for workflow1. + mockRM.submitApp(1, "b3", true, ApplicationId.newInstance(0,4), + Priority.newInstance(0), ImmutableSet.of( + YarnConfiguration.DEFAULT_YARN_WORKFLOW_ID_TAG_PREFIX + + "workflow1")); + app = mockRM.getRMContext().getRMApps().get(ApplicationId.newInstance(0,4)); + assertEquals(2, app.getApplicationSubmissionContext().getPriority() + .getPriority()); + + // Disable workflow priority mappings override and reinitialize scheduler. + conf.setBoolean(CapacitySchedulerConfiguration + .ENABLE_WORKFLOW_PRIORITY_MAPPINGS_OVERRIDE, false); + cs.reinitialize(conf, mockRM.getRMContext()); + mockRM.submitApp(1, "a2", true, ApplicationId.newInstance(0,5), + Priority.newInstance(0), ImmutableSet.of( + YarnConfiguration.DEFAULT_YARN_WORKFLOW_ID_TAG_PREFIX + + "workflow3")); + app = mockRM.getRMContext().getRMApps().get(ApplicationId.newInstance(0,5)); + assertEquals(0, app.getApplicationSubmissionContext().getPriority() + .getPriority()); + } +}