YARN-3101. In Fair Scheduler, fix canceling of reservations for exceeding max share (Anubhav Dhoot via Sandy Ryza)
This commit is contained in:
parent
afbecbb2cc
commit
b6466deac6
@ -482,6 +482,9 @@ Release 2.7.0 - UNRELEASED
|
|||||||
YARN-3058. Fix error message of tokens' activation delay configuration.
|
YARN-3058. Fix error message of tokens' activation delay configuration.
|
||||||
(Yi Liu via ozawa)
|
(Yi Liu via ozawa)
|
||||||
|
|
||||||
|
YARN-3101. In Fair Scheduler, fix canceling of reservations for exceeding
|
||||||
|
max share (Anubhav Dhoot via Sandy Ryza)
|
||||||
|
|
||||||
Release 2.6.0 - 2014-11-18
|
Release 2.6.0 - 2014-11-18
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -530,6 +530,10 @@ private Resource assignContainer(
|
|||||||
|
|
||||||
return container.getResource();
|
return container.getResource();
|
||||||
} else {
|
} else {
|
||||||
|
if (!FairScheduler.fitsInMaxShare(getQueue(), capability)) {
|
||||||
|
return Resources.none();
|
||||||
|
}
|
||||||
|
|
||||||
// The desired container won't fit here, so reserve
|
// The desired container won't fit here, so reserve
|
||||||
reserve(request.getPriority(), node, container, reserved);
|
reserve(request.getPriority(), node, container, reserved);
|
||||||
|
|
||||||
|
@ -1049,7 +1049,8 @@ private synchronized void attemptScheduling(FSSchedulerNode node) {
|
|||||||
FSQueue queue = reservedAppSchedulable.getQueue();
|
FSQueue queue = reservedAppSchedulable.getQueue();
|
||||||
|
|
||||||
if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)
|
if (!reservedAppSchedulable.hasContainerForNode(reservedPriority, node)
|
||||||
|| !fitInMaxShare(queue)) {
|
|| !fitsInMaxShare(queue,
|
||||||
|
node.getReservedContainer().getReservedResource())) {
|
||||||
// Don't hold the reservation if app can no longer use it
|
// Don't hold the reservation if app can no longer use it
|
||||||
LOG.info("Releasing reservation that cannot be satisfied for application "
|
LOG.info("Releasing reservation that cannot be satisfied for application "
|
||||||
+ reservedAppSchedulable.getApplicationAttemptId()
|
+ reservedAppSchedulable.getApplicationAttemptId()
|
||||||
@ -1084,14 +1085,18 @@ private synchronized void attemptScheduling(FSSchedulerNode node) {
|
|||||||
updateRootQueueMetrics();
|
updateRootQueueMetrics();
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean fitInMaxShare(FSQueue queue) {
|
static boolean fitsInMaxShare(FSQueue queue, Resource
|
||||||
if (Resources.fitsIn(queue.getResourceUsage(), queue.getMaxShare())) {
|
additionalResource) {
|
||||||
|
Resource usagePlusAddition =
|
||||||
|
Resources.add(queue.getResourceUsage(), additionalResource);
|
||||||
|
|
||||||
|
if (!Resources.fitsIn(usagePlusAddition, queue.getMaxShare())) {
|
||||||
return false;
|
return false;
|
||||||
}
|
}
|
||||||
|
|
||||||
FSQueue parentQueue = queue.getParent();
|
FSQueue parentQueue = queue.getParent();
|
||||||
if (parentQueue != null) {
|
if (parentQueue != null) {
|
||||||
return fitInMaxShare(parentQueue);
|
return fitsInMaxShare(parentQueue, additionalResource);
|
||||||
}
|
}
|
||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
@ -784,19 +784,18 @@ public void testSimpleContainerReservation() throws Exception {
|
|||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test (timeout = 5000)
|
@Test (timeout = 500000)
|
||||||
public void testContainerReservationNotExceedingQueueMax() throws Exception {
|
public void testContainerReservationAttemptExceedingQueueMax()
|
||||||
|
throws Exception {
|
||||||
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||||
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||||
out.println("<?xml version=\"1.0\"?>");
|
out.println("<?xml version=\"1.0\"?>");
|
||||||
out.println("<allocations>");
|
out.println("<allocations>");
|
||||||
out.println("<queue name=\"root\">");
|
out.println("<queue name=\"root\">");
|
||||||
out.println("<queue name=\"queue1\">");
|
out.println("<queue name=\"queue1\">");
|
||||||
out.println("<minResources>1024mb,5vcores</minResources>");
|
out.println("<maxResources>2048mb,5vcores</maxResources>");
|
||||||
out.println("<maxResources>2048mb,10vcores</maxResources>");
|
|
||||||
out.println("</queue>");
|
out.println("</queue>");
|
||||||
out.println("<queue name=\"queue2\">");
|
out.println("<queue name=\"queue2\">");
|
||||||
out.println("<minResources>1024mb,5vcores</minResources>");
|
|
||||||
out.println("<maxResources>2048mb,10vcores</maxResources>");
|
out.println("<maxResources>2048mb,10vcores</maxResources>");
|
||||||
out.println("</queue>");
|
out.println("</queue>");
|
||||||
out.println("</queue>");
|
out.println("</queue>");
|
||||||
@ -825,7 +824,64 @@ public void testContainerReservationNotExceedingQueueMax() throws Exception {
|
|||||||
getResourceUsage().getMemory());
|
getResourceUsage().getMemory());
|
||||||
|
|
||||||
// Now queue 2 requests likewise
|
// Now queue 2 requests likewise
|
||||||
ApplicationAttemptId attId = createSchedulingRequest(1024, "queue2", "user2", 1);
|
createSchedulingRequest(1024, "queue2", "user2", 1);
|
||||||
|
scheduler.update();
|
||||||
|
scheduler.handle(updateEvent);
|
||||||
|
|
||||||
|
// Make sure queue 2 is allocated app capacity
|
||||||
|
assertEquals(1024, scheduler.getQueueManager().getQueue("queue2").
|
||||||
|
getResourceUsage().getMemory());
|
||||||
|
|
||||||
|
ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", "user1", 1);
|
||||||
|
scheduler.update();
|
||||||
|
scheduler.handle(updateEvent);
|
||||||
|
|
||||||
|
// Ensure the reservation does not get created as allocated memory of
|
||||||
|
// queue1 exceeds max
|
||||||
|
assertEquals(0, scheduler.getSchedulerApp(attId1).
|
||||||
|
getCurrentReservation().getMemory());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test (timeout = 500000)
|
||||||
|
public void testContainerReservationNotExceedingQueueMax() throws Exception {
|
||||||
|
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
|
||||||
|
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||||
|
out.println("<?xml version=\"1.0\"?>");
|
||||||
|
out.println("<allocations>");
|
||||||
|
out.println("<queue name=\"root\">");
|
||||||
|
out.println("<queue name=\"queue1\">");
|
||||||
|
out.println("<maxResources>3072mb,10vcores</maxResources>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("<queue name=\"queue2\">");
|
||||||
|
out.println("<maxResources>2048mb,10vcores</maxResources>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("</allocations>");
|
||||||
|
out.close();
|
||||||
|
|
||||||
|
scheduler.init(conf);
|
||||||
|
scheduler.start();
|
||||||
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||||
|
|
||||||
|
// Add a node
|
||||||
|
RMNode node1 =
|
||||||
|
MockNodes
|
||||||
|
.newNodeInfo(1, Resources.createResource(3072, 5), 1, "127.0.0.1");
|
||||||
|
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
|
||||||
|
scheduler.handle(nodeEvent1);
|
||||||
|
|
||||||
|
// Queue 1 requests full capacity of the queue
|
||||||
|
createSchedulingRequest(2048, "queue1", "user1", 1);
|
||||||
|
scheduler.update();
|
||||||
|
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
|
||||||
|
scheduler.handle(updateEvent);
|
||||||
|
|
||||||
|
// Make sure queue 1 is allocated app capacity
|
||||||
|
assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
|
||||||
|
getResourceUsage().getMemory());
|
||||||
|
|
||||||
|
// Now queue 2 requests likewise
|
||||||
|
createSchedulingRequest(1024, "queue2", "user2", 1);
|
||||||
scheduler.update();
|
scheduler.update();
|
||||||
scheduler.handle(updateEvent);
|
scheduler.handle(updateEvent);
|
||||||
|
|
||||||
@ -841,18 +897,34 @@ public void testContainerReservationNotExceedingQueueMax() throws Exception {
|
|||||||
assertEquals(1024, scheduler.getSchedulerApp(attId1)
|
assertEquals(1024, scheduler.getSchedulerApp(attId1)
|
||||||
.getCurrentReservation().getMemory());
|
.getCurrentReservation().getMemory());
|
||||||
|
|
||||||
// Now remove app of queue2
|
// Exercise checks that reservation fits
|
||||||
AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
|
scheduler.handle(updateEvent);
|
||||||
attId, RMAppAttemptState.FINISHED, false);
|
|
||||||
scheduler.update();
|
|
||||||
scheduler.handle(appRemovedEvent1);
|
|
||||||
|
|
||||||
// Queue should have no apps
|
// Ensure the reservation still exists as allocated memory of queue1 doesn't
|
||||||
assertEquals(0, scheduler.getQueueManager().getQueue("queue2").
|
// exceed max
|
||||||
getResourceUsage().getMemory());
|
assertEquals(1024, scheduler.getSchedulerApp(attId1).
|
||||||
|
getCurrentReservation().getMemory());
|
||||||
|
|
||||||
|
// Now reduce max Resources of queue1 down to 2048
|
||||||
|
out = new PrintWriter(new FileWriter(ALLOC_FILE));
|
||||||
|
out.println("<?xml version=\"1.0\"?>");
|
||||||
|
out.println("<allocations>");
|
||||||
|
out.println("<queue name=\"root\">");
|
||||||
|
out.println("<queue name=\"queue1\">");
|
||||||
|
out.println("<maxResources>2048mb,10vcores</maxResources>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("<queue name=\"queue2\">");
|
||||||
|
out.println("<maxResources>2048mb,10vcores</maxResources>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("</queue>");
|
||||||
|
out.println("</allocations>");
|
||||||
|
out.close();
|
||||||
|
|
||||||
|
scheduler.reinitialize(conf, resourceManager.getRMContext());
|
||||||
|
|
||||||
createSchedulingRequest(1024, "queue2", "user2", 1);
|
createSchedulingRequest(1024, "queue2", "user2", 1);
|
||||||
scheduler.handle(updateEvent);
|
scheduler.handle(updateEvent);
|
||||||
|
|
||||||
// Make sure allocated memory of queue1 doesn't exceed its maximum
|
// Make sure allocated memory of queue1 doesn't exceed its maximum
|
||||||
assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
|
assertEquals(2048, scheduler.getQueueManager().getQueue("queue1").
|
||||||
getResourceUsage().getMemory());
|
getResourceUsage().getMemory());
|
||||||
@ -2257,10 +2329,9 @@ public void testReservationWhileMultiplePriorities() throws IOException {
|
|||||||
scheduler.handle(updateEvent);
|
scheduler.handle(updateEvent);
|
||||||
|
|
||||||
assertEquals(1, app.getLiveContainers().size());
|
assertEquals(1, app.getLiveContainers().size());
|
||||||
// Reserved container should will be at higher priority,
|
// Reserved container should still be at lower priority
|
||||||
// since old reservation cannot be satisfied
|
|
||||||
for (RMContainer container : app.getReservedContainers()) {
|
for (RMContainer container : app.getReservedContainers()) {
|
||||||
assertEquals(1, container.getReservedPriority().getPriority());
|
assertEquals(2, container.getReservedPriority().getPriority());
|
||||||
}
|
}
|
||||||
|
|
||||||
// Complete container
|
// Complete container
|
||||||
@ -2273,11 +2344,12 @@ public void testReservationWhileMultiplePriorities() throws IOException {
|
|||||||
scheduler.update();
|
scheduler.update();
|
||||||
scheduler.handle(updateEvent);
|
scheduler.handle(updateEvent);
|
||||||
|
|
||||||
// Reserved container (at higher priority) should be run
|
// Reserved container (at lower priority) should be run
|
||||||
Collection<RMContainer> liveContainers = app.getLiveContainers();
|
Collection<RMContainer> liveContainers = app.getLiveContainers();
|
||||||
assertEquals(1, liveContainers.size());
|
assertEquals(1, liveContainers.size());
|
||||||
for (RMContainer liveContainer : liveContainers) {
|
for (RMContainer liveContainer : liveContainers) {
|
||||||
Assert.assertEquals(1, liveContainer.getContainer().getPriority().getPriority());
|
Assert.assertEquals(2, liveContainer.getContainer().getPriority()
|
||||||
|
.getPriority());
|
||||||
}
|
}
|
||||||
assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
|
assertEquals(0, scheduler.getRootQueueMetrics().getAvailableMB());
|
||||||
assertEquals(0, scheduler.getRootQueueMetrics().getAvailableVirtualCores());
|
assertEquals(0, scheduler.getRootQueueMetrics().getAvailableVirtualCores());
|
||||||
|
Loading…
Reference in New Issue
Block a user