YARN-11383. Workflow priority mappings is case sensitive (#5171)
Contributed by Aparajita Choudhary
(cherry picked from commit 2a0dc2ab2f
)
This commit is contained in:
parent
703158c9c6
commit
157af0cb22
@ -56,8 +56,8 @@ public class WorkflowPriorityMappingsManager {
|
|||||||
|
|
||||||
private boolean overrideWithPriorityMappings = false;
|
private boolean overrideWithPriorityMappings = false;
|
||||||
// Map of queue to a map of workflow ID to priority
|
// Map of queue to a map of workflow ID to priority
|
||||||
private Map<String, Map<String, WorkflowPriorityMapping>> priorityMappings =
|
private Map<String, Map<String, Priority>> priorityMappings =
|
||||||
new HashMap<String, Map<String, WorkflowPriorityMapping>>();
|
new HashMap<>();
|
||||||
|
|
||||||
public static class WorkflowPriorityMapping {
|
public static class WorkflowPriorityMapping {
|
||||||
String workflowID;
|
String workflowID;
|
||||||
@ -115,10 +115,9 @@ public void initialize(CapacityScheduler scheduler) throws IOException {
|
|||||||
*
|
*
|
||||||
* @return workflowID to priority mappings for a queue
|
* @return workflowID to priority mappings for a queue
|
||||||
*/
|
*/
|
||||||
public Map<String, Map<String, WorkflowPriorityMapping>>
|
public Map<String, Map<String, Priority>>
|
||||||
getWorkflowPriorityMappings() {
|
getWorkflowPriorityMappings() {
|
||||||
Map<String, Map<String, WorkflowPriorityMapping>> mappings =
|
Map<String, Map<String, Priority>> mappings = new HashMap<>();
|
||||||
new HashMap<String, Map<String, WorkflowPriorityMapping>>();
|
|
||||||
|
|
||||||
Collection<String> workflowMappings = conf.getWorkflowPriorityMappings();
|
Collection<String> workflowMappings = conf.getWorkflowPriorityMappings();
|
||||||
for (String workflowMapping : workflowMappings) {
|
for (String workflowMapping : workflowMappings) {
|
||||||
@ -127,9 +126,9 @@ public void initialize(CapacityScheduler scheduler) throws IOException {
|
|||||||
if (mapping != null) {
|
if (mapping != null) {
|
||||||
if (!mappings.containsKey(mapping.queue)) {
|
if (!mappings.containsKey(mapping.queue)) {
|
||||||
mappings.put(mapping.queue,
|
mappings.put(mapping.queue,
|
||||||
new HashMap<String, WorkflowPriorityMapping>());
|
new HashMap<String, Priority>());
|
||||||
}
|
}
|
||||||
mappings.get(mapping.queue).put(mapping.workflowID, mapping);
|
mappings.get(mapping.queue).put(mapping.workflowID, mapping.priority);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
return mappings;
|
return mappings;
|
||||||
@ -150,8 +149,9 @@ private WorkflowPriorityMapping getWorkflowMappingFromString(
|
|||||||
}
|
}
|
||||||
WorkflowPriorityMapping mapping;
|
WorkflowPriorityMapping mapping;
|
||||||
try {
|
try {
|
||||||
mapping = new WorkflowPriorityMapping(mappingArray[0], mappingArray[1],
|
//Converting workflow id to lowercase as yarn converts application tags also to lowercase
|
||||||
Priority.newInstance(Integer.parseInt(mappingArray[2])));
|
mapping = new WorkflowPriorityMapping(StringUtils.toLowerCase(mappingArray[0]),
|
||||||
|
mappingArray[1], Priority.newInstance(Integer.parseInt(mappingArray[2])));
|
||||||
} catch (NumberFormatException e) {
|
} catch (NumberFormatException e) {
|
||||||
throw new IllegalArgumentException(
|
throw new IllegalArgumentException(
|
||||||
"Illegal workflow priority for mapping " + mappingString);
|
"Illegal workflow priority for mapping " + mappingString);
|
||||||
@ -168,7 +168,7 @@ public Priority getMappedPriority(String workflowID, CSQueue queue) {
|
|||||||
String queuePath = queue.getQueuePath();
|
String queuePath = queue.getQueuePath();
|
||||||
if (priorityMappings.containsKey(queuePath)
|
if (priorityMappings.containsKey(queuePath)
|
||||||
&& priorityMappings.get(queuePath).containsKey(workflowID)) {
|
&& priorityMappings.get(queuePath).containsKey(workflowID)) {
|
||||||
return priorityMappings.get(queuePath).get(workflowID).priority;
|
return priorityMappings.get(queuePath).get(workflowID);
|
||||||
} else {
|
} else {
|
||||||
queue = queue.getParent();
|
queue = queue.getParent();
|
||||||
return getMappedPriority(workflowID, queue);
|
return getMappedPriority(workflowID, queue);
|
||||||
|
@ -64,7 +64,7 @@ private static void setWorkFlowPriorityMappings(
|
|||||||
List<WorkflowPriorityMapping> mappings = Arrays.asList(
|
List<WorkflowPriorityMapping> mappings = Arrays.asList(
|
||||||
new WorkflowPriorityMapping("workflow1", B, Priority.newInstance(2)),
|
new WorkflowPriorityMapping("workflow1", B, Priority.newInstance(2)),
|
||||||
new WorkflowPriorityMapping("workflow2", A1, Priority.newInstance(3)),
|
new WorkflowPriorityMapping("workflow2", A1, Priority.newInstance(3)),
|
||||||
new WorkflowPriorityMapping("workflow3", A, Priority.newInstance(4)));
|
new WorkflowPriorityMapping("Workflow3", A, Priority.newInstance(4)));
|
||||||
conf.setWorkflowPriorityMappings(mappings);
|
conf.setWorkflowPriorityMappings(mappings);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -85,16 +85,10 @@ public void testWorkflowPriorityMappings() throws Exception {
|
|||||||
mockRM.start();
|
mockRM.start();
|
||||||
cs.start();
|
cs.start();
|
||||||
|
|
||||||
Map<String, Map<String, Object>> expected = ImmutableMap.of(
|
Map<String, Object> expected = ImmutableMap.of(
|
||||||
A, ImmutableMap.of("workflow3",
|
A, ImmutableMap.of("workflow3", Priority.newInstance(4)),
|
||||||
new WorkflowPriorityMapping(
|
B, ImmutableMap.of("workflow1", Priority.newInstance(2)),
|
||||||
"workflow3", A, Priority.newInstance(4))),
|
A1, ImmutableMap.of("workflow2", Priority.newInstance(3)));
|
||||||
B, ImmutableMap.of("workflow1",
|
|
||||||
new WorkflowPriorityMapping(
|
|
||||||
"workflow1", B, Priority.newInstance(2))),
|
|
||||||
A1, ImmutableMap.of("workflow2",
|
|
||||||
new WorkflowPriorityMapping(
|
|
||||||
"workflow2", A1, Priority.newInstance(3))));
|
|
||||||
assertEquals(expected, cs.getWorkflowPriorityMappingsManager()
|
assertEquals(expected, cs.getWorkflowPriorityMappingsManager()
|
||||||
.getWorkflowPriorityMappings());
|
.getWorkflowPriorityMappings());
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user