YARN-11383. Workflow priority mappings is case sensitive (#5171)

Contributed by Aparajita Choudhary
This commit is contained in:
Varun Saxena 2023-03-05 21:25:16 +05:30
parent 6bd2444815
commit 2a0dc2ab2f
2 changed files with 15 additions and 21 deletions

View File

@ -56,8 +56,8 @@ public class WorkflowPriorityMappingsManager {
private boolean overrideWithPriorityMappings = false;
// Map of queue to a map of workflow ID to priority
private Map<String, Map<String, WorkflowPriorityMapping>> priorityMappings =
new HashMap<String, Map<String, WorkflowPriorityMapping>>();
private Map<String, Map<String, Priority>> priorityMappings =
new HashMap<>();
public static class WorkflowPriorityMapping {
String workflowID;
@ -115,10 +115,9 @@ public void initialize(CapacityScheduler scheduler) throws IOException {
*
* @return workflowID to priority mappings for a queue
*/
public Map<String, Map<String, WorkflowPriorityMapping>>
public Map<String, Map<String, Priority>>
getWorkflowPriorityMappings() {
Map<String, Map<String, WorkflowPriorityMapping>> mappings =
new HashMap<String, Map<String, WorkflowPriorityMapping>>();
Map<String, Map<String, Priority>> mappings = new HashMap<>();
Collection<String> workflowMappings = conf.getWorkflowPriorityMappings();
for (String workflowMapping : workflowMappings) {
@ -127,9 +126,9 @@ public void initialize(CapacityScheduler scheduler) throws IOException {
if (mapping != null) {
if (!mappings.containsKey(mapping.queue)) {
mappings.put(mapping.queue,
new HashMap<String, WorkflowPriorityMapping>());
new HashMap<String, Priority>());
}
mappings.get(mapping.queue).put(mapping.workflowID, mapping);
mappings.get(mapping.queue).put(mapping.workflowID, mapping.priority);
}
}
return mappings;
@ -150,8 +149,9 @@ private WorkflowPriorityMapping getWorkflowMappingFromString(
}
WorkflowPriorityMapping mapping;
try {
mapping = new WorkflowPriorityMapping(mappingArray[0], mappingArray[1],
Priority.newInstance(Integer.parseInt(mappingArray[2])));
//Converting workflow id to lowercase as yarn converts application tags also to lowercase
mapping = new WorkflowPriorityMapping(StringUtils.toLowerCase(mappingArray[0]),
mappingArray[1], Priority.newInstance(Integer.parseInt(mappingArray[2])));
} catch (NumberFormatException e) {
throw new IllegalArgumentException(
"Illegal workflow priority for mapping " + mappingString);
@ -168,7 +168,7 @@ public Priority getMappedPriority(String workflowID, CSQueue queue) {
String queuePath = queue.getQueuePath();
if (priorityMappings.containsKey(queuePath)
&& priorityMappings.get(queuePath).containsKey(workflowID)) {
return priorityMappings.get(queuePath).get(workflowID).priority;
return priorityMappings.get(queuePath).get(workflowID);
} else {
queue = queue.getParent();
return getMappedPriority(workflowID, queue);

View File

@ -78,7 +78,7 @@ private static void setWorkFlowPriorityMappings(
List<WorkflowPriorityMapping> mappings = Arrays.asList(
new WorkflowPriorityMapping("workflow1", B, Priority.newInstance(2)),
new WorkflowPriorityMapping("workflow2", A1, Priority.newInstance(3)),
new WorkflowPriorityMapping("workflow3", A, Priority.newInstance(4)));
new WorkflowPriorityMapping("Workflow3", A, Priority.newInstance(4)));
conf.setWorkflowPriorityMappings(mappings);
}
@ -99,16 +99,10 @@ public void testWorkflowPriorityMappings() throws Exception {
mockRM.start();
cs.start();
Map<String, Map<String, Object>> expected = ImmutableMap.of(
A, ImmutableMap.of("workflow3",
new WorkflowPriorityMapping(
"workflow3", A, Priority.newInstance(4))),
B, ImmutableMap.of("workflow1",
new WorkflowPriorityMapping(
"workflow1", B, Priority.newInstance(2))),
A1, ImmutableMap.of("workflow2",
new WorkflowPriorityMapping(
"workflow2", A1, Priority.newInstance(3))));
Map<String, Object> expected = ImmutableMap.of(
A, ImmutableMap.of("workflow3", Priority.newInstance(4)),
B, ImmutableMap.of("workflow1", Priority.newInstance(2)),
A1, ImmutableMap.of("workflow2", Priority.newInstance(3)));
assertEquals(expected, cs.getWorkflowPriorityMappingsManager()
.getWorkflowPriorityMappings());