YARN-5453. FairScheduler#update may skip update demand resource of child queue/app if current demand reached maxResource. (sandflee via kasha)
This commit is contained in:
parent
c8bc7a8475
commit
86ac1ad9fd
@ -268,20 +268,16 @@ public void updateDemand() {
|
|||||||
readLock.lock();
|
readLock.lock();
|
||||||
try {
|
try {
|
||||||
for (FSAppAttempt sched : runnableApps) {
|
for (FSAppAttempt sched : runnableApps) {
|
||||||
if (Resources.equals(demand, maxShare)) {
|
updateDemandForApp(sched);
|
||||||
break;
|
|
||||||
}
|
|
||||||
updateDemandForApp(sched, maxShare);
|
|
||||||
}
|
}
|
||||||
for (FSAppAttempt sched : nonRunnableApps) {
|
for (FSAppAttempt sched : nonRunnableApps) {
|
||||||
if (Resources.equals(demand, maxShare)) {
|
updateDemandForApp(sched);
|
||||||
break;
|
|
||||||
}
|
|
||||||
updateDemandForApp(sched, maxShare);
|
|
||||||
}
|
}
|
||||||
} finally {
|
} finally {
|
||||||
readLock.unlock();
|
readLock.unlock();
|
||||||
}
|
}
|
||||||
|
// Cap demand to maxShare to limit allocation to maxShare
|
||||||
|
demand = Resources.componentwiseMin(demand, maxShare);
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("The updated demand for " + getName() + " is " + demand
|
LOG.debug("The updated demand for " + getName() + " is " + demand
|
||||||
+ "; the max is " + maxShare);
|
+ "; the max is " + maxShare);
|
||||||
@ -290,7 +286,7 @@ public void updateDemand() {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void updateDemandForApp(FSAppAttempt sched, Resource maxRes) {
|
private void updateDemandForApp(FSAppAttempt sched) {
|
||||||
sched.updateDemand();
|
sched.updateDemand();
|
||||||
Resource toAdd = sched.getDemand();
|
Resource toAdd = sched.getDemand();
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
@ -299,7 +295,6 @@ private void updateDemandForApp(FSAppAttempt sched, Resource maxRes) {
|
|||||||
+ demand);
|
+ demand);
|
||||||
}
|
}
|
||||||
demand = Resources.add(demand, toAdd);
|
demand = Resources.add(demand, toAdd);
|
||||||
demand = Resources.componentwiseMin(demand, maxRes);
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -159,16 +159,14 @@ public void updateDemand() {
|
|||||||
childQueue.updateDemand();
|
childQueue.updateDemand();
|
||||||
Resource toAdd = childQueue.getDemand();
|
Resource toAdd = childQueue.getDemand();
|
||||||
demand = Resources.add(demand, toAdd);
|
demand = Resources.add(demand, toAdd);
|
||||||
demand = Resources.componentwiseMin(demand, maxShare);
|
|
||||||
if (LOG.isDebugEnabled()) {
|
if (LOG.isDebugEnabled()) {
|
||||||
LOG.debug("Counting resource from " + childQueue.getName() + " " +
|
LOG.debug("Counting resource from " + childQueue.getName() + " " +
|
||||||
toAdd + "; Total resource demand for " + getName() +
|
toAdd + "; Total resource demand for " + getName() +
|
||||||
" now " + demand);
|
" now " + demand);
|
||||||
}
|
}
|
||||||
if (Resources.equals(demand, maxShare)) {
|
|
||||||
break;
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
// Cap demand to maxShare to limit allocation to maxShare
|
||||||
|
demand = Resources.componentwiseMin(demand, maxShare);
|
||||||
} finally {
|
} finally {
|
||||||
writeLock.unlock();
|
writeLock.unlock();
|
||||||
}
|
}
|
||||||
|
@ -4813,4 +4813,45 @@ public void testReservationMetrics() throws IOException {
|
|||||||
assertEquals(0, metrics.getReservedMB());
|
assertEquals(0, metrics.getReservedMB());
|
||||||
assertEquals(0, metrics.getReservedVirtualCores());
|
assertEquals(0, metrics.getReservedVirtualCores());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testUpdateDemand() throws IOException {
|
||||||
|
scheduler.init(conf);
|
||||||
|
scheduler.start();
|
||||||
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||||
|
|
||||||
|
Resource maxResource = Resources.createResource(1024 * 8);
|
||||||
|
|
||||||
|
FSAppAttempt app1 = mock(FSAppAttempt.class);
|
||||||
|
Mockito.when(app1.getDemand()).thenReturn(maxResource);
|
||||||
|
FSAppAttempt app2 = mock(FSAppAttempt.class);
|
||||||
|
Mockito.when(app2.getDemand()).thenReturn(maxResource);
|
||||||
|
|
||||||
|
QueueManager queueManager = scheduler.getQueueManager();
|
||||||
|
FSParentQueue queue1 = queueManager.getParentQueue("queue1", true);
|
||||||
|
|
||||||
|
FSLeafQueue aQueue =
|
||||||
|
new FSLeafQueue("root.queue1.a", scheduler, queue1);
|
||||||
|
aQueue.setMaxShare(maxResource);
|
||||||
|
aQueue.addAppSchedulable(app1);
|
||||||
|
|
||||||
|
FSLeafQueue bQueue =
|
||||||
|
new FSLeafQueue("root.queue1.b", scheduler, queue1);
|
||||||
|
bQueue.setMaxShare(maxResource);
|
||||||
|
bQueue.addAppSchedulable(app2);
|
||||||
|
|
||||||
|
queue1.setMaxShare(maxResource);
|
||||||
|
queue1.addChildQueue(aQueue);
|
||||||
|
queue1.addChildQueue(bQueue);
|
||||||
|
|
||||||
|
queue1.updateDemand();
|
||||||
|
|
||||||
|
assertTrue("Demand is greater than max allowed ",
|
||||||
|
Resources.equals(queue1.getDemand(), maxResource));
|
||||||
|
assertTrue("Demand of child queue not updated ",
|
||||||
|
Resources.equals(aQueue.getDemand(), maxResource) &&
|
||||||
|
Resources.equals(bQueue.getDemand(), maxResource));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user