YARN-4066. Large number of queues choke fair scheduler. (Johan Gustavsson via kasha)
This commit is contained in:
parent
715dbddf77
commit
a0b5a0a419
@ -509,6 +509,9 @@ Release 2.8.0 - UNRELEASED
|
||||
YARN-3635. Refactored current queue mapping implementation in CapacityScheduler
|
||||
to use a generic PlacementManager framework. (Wangda Tan via jianhe)
|
||||
|
||||
YARN-4066. Large number of queues choke fair scheduler.
|
||||
(Johan Gustavsson via kasha)
|
||||
|
||||
BUG FIXES
|
||||
|
||||
YARN-3197. Confusing log generated by CapacityScheduler. (Varun Saxena
|
||||
|
@ -87,7 +87,19 @@ public void initialize(Configuration conf) throws IOException,
|
||||
* could be referred to as just "parent1.queue2".
|
||||
*/
|
||||
public FSLeafQueue getLeafQueue(String name, boolean create) {
|
||||
FSQueue queue = getQueue(name, create, FSQueueType.LEAF);
|
||||
return getLeafQueue(name, create, true);
|
||||
}
|
||||
|
||||
public FSLeafQueue getLeafQueue(
|
||||
String name,
|
||||
boolean create,
|
||||
boolean recomputeSteadyShares) {
|
||||
FSQueue queue = getQueue(
|
||||
name,
|
||||
create,
|
||||
FSQueueType.LEAF,
|
||||
recomputeSteadyShares
|
||||
);
|
||||
if (queue instanceof FSParentQueue) {
|
||||
return null;
|
||||
}
|
||||
@ -117,29 +129,47 @@ public boolean removeLeafQueue(String name) {
|
||||
* could be referred to as just "parent1.queue2".
|
||||
*/
|
||||
public FSParentQueue getParentQueue(String name, boolean create) {
|
||||
FSQueue queue = getQueue(name, create, FSQueueType.PARENT);
|
||||
return getParentQueue(name, create, true);
|
||||
}
|
||||
|
||||
public FSParentQueue getParentQueue(
|
||||
String name,
|
||||
boolean create,
|
||||
boolean recomputeSteadyShares) {
|
||||
FSQueue queue = getQueue(
|
||||
name,
|
||||
create,
|
||||
FSQueueType.PARENT,
|
||||
recomputeSteadyShares
|
||||
);
|
||||
if (queue instanceof FSLeafQueue) {
|
||||
return null;
|
||||
}
|
||||
return (FSParentQueue) queue;
|
||||
}
|
||||
|
||||
private FSQueue getQueue(String name, boolean create, FSQueueType queueType) {
|
||||
private FSQueue getQueue(
|
||||
String name,
|
||||
boolean create,
|
||||
FSQueueType queueType,
|
||||
boolean recomputeSteadyShares) {
|
||||
boolean recompute = recomputeSteadyShares;
|
||||
name = ensureRootPrefix(name);
|
||||
FSQueue queue;
|
||||
synchronized (queues) {
|
||||
FSQueue queue = queues.get(name);
|
||||
queue = queues.get(name);
|
||||
if (queue == null && create) {
|
||||
// if the queue doesn't exist,create it and return
|
||||
queue = createQueue(name, queueType);
|
||||
|
||||
// Update steady fair share for all queues
|
||||
if (queue != null) {
|
||||
} else {
|
||||
recompute = false;
|
||||
}
|
||||
}
|
||||
if (recompute) {
|
||||
rootQueue.recomputeSteadyShares();
|
||||
}
|
||||
}
|
||||
return queue;
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Creates a leaf or parent queue based on what is specified in 'queueType'
|
||||
@ -376,20 +406,24 @@ private String ensureRootPrefix(String name) {
|
||||
|
||||
public void updateAllocationConfiguration(AllocationConfiguration queueConf) {
|
||||
// Create leaf queues and the parent queues in a leaf's ancestry if they do not exist
|
||||
for (String name : queueConf.getConfiguredQueues().get(FSQueueType.LEAF)) {
|
||||
synchronized (queues) {
|
||||
for (String name : queueConf.getConfiguredQueues().get(
|
||||
FSQueueType.LEAF)) {
|
||||
if (removeEmptyIncompatibleQueues(name, FSQueueType.LEAF)) {
|
||||
getLeafQueue(name, true);
|
||||
getLeafQueue(name, true, false);
|
||||
}
|
||||
}
|
||||
|
||||
// At this point all leaves and 'parents with at least one child' would have been created.
|
||||
// At this point all leaves and 'parents with
|
||||
// at least one child' would have been created.
|
||||
// Now create parents with no configured leaf.
|
||||
for (String name : queueConf.getConfiguredQueues().get(
|
||||
FSQueueType.PARENT)) {
|
||||
if (removeEmptyIncompatibleQueues(name, FSQueueType.PARENT)) {
|
||||
getParentQueue(name, true);
|
||||
getParentQueue(name, true, false);
|
||||
}
|
||||
}
|
||||
}
|
||||
rootQueue.recomputeSteadyShares();
|
||||
|
||||
for (FSQueue queue : queues.values()) {
|
||||
// Update queue metrics
|
||||
|
Loading…
Reference in New Issue
Block a user