YARN-8804. resourceLimits may be wrongly calculated when leaf-queue is blocked in cluster with 3+ level queues. Contributed by Tao Yang

This commit is contained in:
Jason Lowe 2018-09-26 14:43:00 -07:00
parent 913f87dada
commit 6b988d821e
3 changed files with 112 additions and 6 deletions

View File

@ -38,6 +38,9 @@ public class ResourceLimits {
// containers. // containers.
private volatile Resource headroom; private volatile Resource headroom;
// How much resource should be reserved for high-priority blocked queues
private Resource blockedHeadroom;
private boolean allowPreempt = false; private boolean allowPreempt = false;
public ResourceLimits(Resource limit) { public ResourceLimits(Resource limit) {
@ -81,4 +84,25 @@ public boolean isAllowPreemption() {
public void setIsAllowPreemption(boolean allowPreempt) { public void setIsAllowPreemption(boolean allowPreempt) {
this.allowPreempt = allowPreempt; this.allowPreempt = allowPreempt;
} }
public void addBlockedHeadroom(Resource resource) {
if (blockedHeadroom == null) {
blockedHeadroom = Resource.newInstance(0, 0);
}
Resources.addTo(blockedHeadroom, resource);
}
public Resource getBlockedHeadroom() {
if (blockedHeadroom == null) {
return Resources.none();
}
return blockedHeadroom;
}
public Resource getNetLimit() {
if (blockedHeadroom != null) {
return Resources.subtract(limit, blockedHeadroom);
}
return limit;
}
} }

View File

@ -776,7 +776,6 @@ private CSAssignment assignContainersToChildQueues(Resource cluster,
SchedulingMode schedulingMode) { SchedulingMode schedulingMode) {
CSAssignment assignment = CSAssignment.NULL_ASSIGNMENT; CSAssignment assignment = CSAssignment.NULL_ASSIGNMENT;
Resource parentLimits = limits.getLimit();
printChildQueues(); printChildQueues();
// Try to assign to most 'under-served' sub-queue // Try to assign to most 'under-served' sub-queue
@ -790,7 +789,7 @@ private CSAssignment assignContainersToChildQueues(Resource cluster,
// Get ResourceLimits of child queue before assign containers // Get ResourceLimits of child queue before assign containers
ResourceLimits childLimits = ResourceLimits childLimits =
getResourceLimitsOfChild(childQueue, cluster, parentLimits, getResourceLimitsOfChild(childQueue, cluster, limits.getNetLimit(),
candidates.getPartition()); candidates.getPartition());
CSAssignment childAssignment = childQueue.assignContainers(cluster, CSAssignment childAssignment = childQueue.assignContainers(cluster,
@ -812,16 +811,21 @@ private CSAssignment assignContainersToChildQueues(Resource cluster,
CSAssignment.SkippedType.QUEUE_LIMIT) { CSAssignment.SkippedType.QUEUE_LIMIT) {
assignment = childAssignment; assignment = childAssignment;
} }
Resource blockedHeadroom = null;
if (childQueue instanceof LeafQueue) {
blockedHeadroom = childLimits.getHeadroom();
} else {
blockedHeadroom = childLimits.getBlockedHeadroom();
}
Resource resourceToSubtract = Resources.max(resourceCalculator, Resource resourceToSubtract = Resources.max(resourceCalculator,
cluster, childLimits.getHeadroom(), Resources.none()); cluster, blockedHeadroom, Resources.none());
limits.addBlockedHeadroom(resourceToSubtract);
if(LOG.isDebugEnabled()) { if(LOG.isDebugEnabled()) {
LOG.debug("Decrease parentLimits " + parentLimits + LOG.debug("Decrease parentLimits " + limits.getLimit() +
" for " + this.getQueueName() + " by " + " for " + this.getQueueName() + " by " +
resourceToSubtract + " as childQueue=" + resourceToSubtract + " as childQueue=" +
childQueue.getQueueName() + " is blocked"); childQueue.getQueueName() + " is blocked");
} }
parentLimits = Resources.subtract(parentLimits,
resourceToSubtract);
} }
} }

View File

@ -1052,4 +1052,82 @@ public void testUnreserveWhenClusterResourceHasEmptyResourceType()
rm1.close(); rm1.close();
} }
@Test(timeout = 60000)
public void testAllocationCannotBeBlockedWhenFormerQueueReachedItsLimit()
throws Exception {
/**
* Queue structure:
* <pre>
* Root
* / | \
* a b c
* 10 20 70
* | \
* c1 c2
* 10(max=10) 90
* </pre>
* Test case:
* Create a cluster with two nodes whose node resource both are
* <10GB, 10core>, create queues as above, among them max-capacity of "c1"
* is 10 and others are all 100, so that max-capacity of queue "c1" is
* <2GB, 2core>,
* submit app1 to queue "c1" and launch am1(resource=<1GB, 1 core>) on nm1,
* submit app2 to queue "b" and launch am2(resource=<1GB, 1 core>) on nm1,
* app1 and app2 both ask one <2GB, 1core> containers
*
* Now queue "c" has lower capacity percentage than queue "b", the
* allocation sequence will be "a" -> "c" -> "b", queue "c1" has reached
* queue limit so that requests of app1 should be pending
*
* After nm1 do 1 heartbeat, scheduler should allocate one container for
* app2 on nm1.
*/
CapacitySchedulerConfiguration newConf =
(CapacitySchedulerConfiguration) TestUtils
.getConfigurationWithMultipleQueues(conf);
newConf.setQueues(CapacitySchedulerConfiguration.ROOT + ".c",
new String[] { "c1", "c2" });
newConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".c.c1", 10);
newConf
.setMaximumCapacity(CapacitySchedulerConfiguration.ROOT + ".c.c1", 10);
newConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".c.c2", 90);
newConf.setClass(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
DominantResourceCalculator.class, ResourceCalculator.class);
MockRM rm1 = new MockRM(newConf);
RMNodeLabelsManager nodeLabelsManager = new NullRMNodeLabelsManager();
nodeLabelsManager.init(newConf);
rm1.getRMContext().setNodeLabelManager(nodeLabelsManager);
rm1.start();
MockNM nm1 = rm1.registerNode("h1:1234", 10 * GB);
MockNM nm2 = rm1.registerNode("h2:1234", 10 * GB);
// launch an app to queue "c1", AM container should be launched on nm1
RMApp app1 = rm1.submitApp(1 * GB, "app", "user", null, "c1");
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
// launch another app to queue "b", AM container should be launched on nm1
RMApp app2 = rm1.submitApp(1 * GB, "app", "user", null, "b");
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
am1.allocate("*", 2 * GB, 1, new ArrayList<ContainerId>());
am2.allocate("*", 2 * GB, 1, new ArrayList<ContainerId>());
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
FiCaSchedulerApp schedulerApp1 =
cs.getApplicationAttempt(am1.getApplicationAttemptId());
FiCaSchedulerApp schedulerApp2 =
cs.getApplicationAttempt(am2.getApplicationAttemptId());
// Do nm1 heartbeats 1 times, will allocate a container on nm1 for app2
cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
rm1.drainEvents();
Assert.assertEquals(1, schedulerApp1.getLiveContainers().size());
Assert.assertEquals(2, schedulerApp2.getLiveContainers().size());
rm1.close();
}
} }