YARN-3231. FairScheduler: Changing queueMaxRunningApps interferes with pending jobs. (Siqi Li via kasha)

This commit is contained in:
Karthik Kambatla 2015-03-04 18:06:36 -08:00
parent 430b537188
commit 22426a1c9f
4 changed files with 348 additions and 6 deletions

View File

@ -698,6 +698,9 @@ Release 2.7.0 - UNRELEASED
YARN-3131. YarnClientImpl should check FAILED and KILLED state in
submitApplication (Chang Li via jlowe)
YARN-3231. FairScheduler: Changing queueMaxRunningApps interferes with pending
jobs. (Siqi Li via kasha)
Release 2.6.0 - 2014-11-18
INCOMPATIBLE CHANGES

View File

@ -1477,6 +1477,7 @@ public void onReload(AllocationConfiguration queueInfo) {
allocConf = queueInfo;
allocConf.getDefaultSchedulingPolicy().initialize(clusterResource);
queueMgr.updateAllocationConfiguration(allocConf);
maxRunningEnforcer.updateRunnabilityOnReload();
}
}
}

View File

@ -104,6 +104,26 @@ public void trackNonRunnableApp(FSAppAttempt app) {
usersNonRunnableApps.put(user, app);
}
/**
* This is called after reloading the allocation configuration when the
* scheduler is reinitilized
*
* Checks to see whether any non-runnable applications become runnable
* now that the max running apps of given queue has been changed
*
* Runs in O(n) where n is the number of apps that are non-runnable and in
* the queues that went from having no slack to having slack.
*/
public void updateRunnabilityOnReload() {
FSParentQueue rootQueue = scheduler.getQueueManager().getRootQueue();
List<List<FSAppAttempt>> appsNowMaybeRunnable =
new ArrayList<List<FSAppAttempt>>();
gatherPossiblyRunnableAppLists(rootQueue, appsNowMaybeRunnable);
updateAppsRunnability(appsNowMaybeRunnable, Integer.MAX_VALUE);
}
/**
* Checks to see whether any other applications runnable now that the given
* application has been removed from the given queue. And makes them so.
@ -156,6 +176,19 @@ public void updateRunnabilityOnAppRemoval(FSAppAttempt app, FSLeafQueue queue) {
}
}
updateAppsRunnability(appsNowMaybeRunnable,
appsNowMaybeRunnable.size());
}
/**
* Checks to see whether applications are runnable now by iterating
* through each one of them and check if the queue and user have slack
*
* if we know how many apps can be runnable, there is no need to iterate
* through all apps, maxRunnableApps is used to break out of the iteration
*/
private void updateAppsRunnability(List<List<FSAppAttempt>>
appsNowMaybeRunnable, int maxRunnableApps) {
// Scan through and check whether this means that any apps are now runnable
Iterator<FSAppAttempt> iter = new MultiListStartTimeIterator(
appsNowMaybeRunnable);
@ -173,9 +206,7 @@ public void updateRunnabilityOnAppRemoval(FSAppAttempt app, FSLeafQueue queue) {
next.getQueue().addApp(appSched, true);
noLongerPendingApps.add(appSched);
// No more than one app per list will be able to be made runnable, so
// we can stop looking after we've found that many
if (noLongerPendingApps.size() >= appsNowMaybeRunnable.size()) {
if (noLongerPendingApps.size() >= maxRunnableApps) {
break;
}
}
@ -194,11 +225,10 @@ public void updateRunnabilityOnAppRemoval(FSAppAttempt app, FSLeafQueue queue) {
if (!usersNonRunnableApps.remove(appSched.getUser(), appSched)) {
LOG.error("Waiting app " + appSched + " expected to be in "
+ "usersNonRunnableApps, but was not. This should never happen.");
+ "usersNonRunnableApps, but was not. This should never happen.");
}
}
}
/**
* Updates the relevant tracking variables after a runnable app with the given
* queue and user has been removed.

View File

@ -2289,6 +2289,314 @@ public void testUserMaxRunningApps() throws Exception {
assertEquals(2, scheduler.getSchedulerApp(attId1).getLiveContainers().size());
}
@Test (timeout = 5000)
public void testIncreaseQueueMaxRunningAppsOnTheFly() throws Exception {
String allocBefore = "<?xml version=\"1.0\"?>" +
"<allocations>" +
"<queue name=\"root\">" +
"<queue name=\"queue1\">" +
"<maxRunningApps>1</maxRunningApps>" +
"</queue>" +
"</queue>" +
"</allocations>";
String allocAfter = "<?xml version=\"1.0\"?>" +
"<allocations>" +
"<queue name=\"root\">" +
"<queue name=\"queue1\">" +
"<maxRunningApps>3</maxRunningApps>" +
"</queue>" +
"</queue>" +
"</allocations>";
testIncreaseQueueSettingOnTheFlyInternal(allocBefore, allocAfter);
}
@Test (timeout = 5000)
public void testIncreaseUserMaxRunningAppsOnTheFly() throws Exception {
String allocBefore = "<?xml version=\"1.0\"?>"+
"<allocations>"+
"<queue name=\"root\">"+
"<queue name=\"queue1\">"+
"<maxRunningApps>10</maxRunningApps>"+
"</queue>"+
"</queue>"+
"<user name=\"user1\">"+
"<maxRunningApps>1</maxRunningApps>"+
"</user>"+
"</allocations>";
String allocAfter = "<?xml version=\"1.0\"?>"+
"<allocations>"+
"<queue name=\"root\">"+
"<queue name=\"queue1\">"+
"<maxRunningApps>10</maxRunningApps>"+
"</queue>"+
"</queue>"+
"<user name=\"user1\">"+
"<maxRunningApps>3</maxRunningApps>"+
"</user>"+
"</allocations>";
testIncreaseQueueSettingOnTheFlyInternal(allocBefore, allocAfter);
}
private void testIncreaseQueueSettingOnTheFlyInternal(String allocBefore,
String allocAfter) throws Exception {
// Set max running apps
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println(allocBefore);
out.close();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add a node
RMNode node1 =
MockNodes
.newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
// Request for app 1
ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1",
"user1", 1);
scheduler.update();
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(updateEvent);
// App 1 should be running
assertEquals(1, scheduler.getSchedulerApp(attId1).getLiveContainers().size());
ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1",
"user1", 1);
scheduler.update();
scheduler.handle(updateEvent);
ApplicationAttemptId attId3 = createSchedulingRequest(1024, "queue1",
"user1", 1);
scheduler.update();
scheduler.handle(updateEvent);
ApplicationAttemptId attId4 = createSchedulingRequest(1024, "queue1",
"user1", 1);
scheduler.update();
scheduler.handle(updateEvent);
// App 2 should not be running
assertEquals(0, scheduler.getSchedulerApp(attId2).getLiveContainers().size());
// App 3 should not be running
assertEquals(0, scheduler.getSchedulerApp(attId3).getLiveContainers().size());
// App 4 should not be running
assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println(allocAfter);
out.close();
scheduler.reinitialize(conf, resourceManager.getRMContext());
scheduler.update();
scheduler.handle(updateEvent);
// App 2 should be running
assertEquals(1, scheduler.getSchedulerApp(attId2).getLiveContainers().size());
scheduler.update();
scheduler.handle(updateEvent);
// App 3 should be running
assertEquals(1, scheduler.getSchedulerApp(attId3).getLiveContainers().size());
scheduler.update();
scheduler.handle(updateEvent);
// App 4 should not be running
assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
// Now remove app 1
AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
attId1, RMAppAttemptState.FINISHED, false);
scheduler.handle(appRemovedEvent1);
scheduler.update();
scheduler.handle(updateEvent);
// App 4 should be running
assertEquals(1, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
}
@Test (timeout = 5000)
public void testDecreaseQueueMaxRunningAppsOnTheFly() throws Exception {
String allocBefore = "<?xml version=\"1.0\"?>" +
"<allocations>" +
"<queue name=\"root\">" +
"<queue name=\"queue1\">" +
"<maxRunningApps>3</maxRunningApps>" +
"</queue>" +
"</queue>" +
"</allocations>";
String allocAfter = "<?xml version=\"1.0\"?>" +
"<allocations>" +
"<queue name=\"root\">" +
"<queue name=\"queue1\">" +
"<maxRunningApps>1</maxRunningApps>" +
"</queue>" +
"</queue>" +
"</allocations>";
testDecreaseQueueSettingOnTheFlyInternal(allocBefore, allocAfter);
}
@Test (timeout = 5000)
public void testDecreaseUserMaxRunningAppsOnTheFly() throws Exception {
String allocBefore = "<?xml version=\"1.0\"?>"+
"<allocations>"+
"<queue name=\"root\">"+
"<queue name=\"queue1\">"+
"<maxRunningApps>10</maxRunningApps>"+
"</queue>"+
"</queue>"+
"<user name=\"user1\">"+
"<maxRunningApps>3</maxRunningApps>"+
"</user>"+
"</allocations>";
String allocAfter = "<?xml version=\"1.0\"?>"+
"<allocations>"+
"<queue name=\"root\">"+
"<queue name=\"queue1\">"+
"<maxRunningApps>10</maxRunningApps>"+
"</queue>"+
"</queue>"+
"<user name=\"user1\">"+
"<maxRunningApps>1</maxRunningApps>"+
"</user>"+
"</allocations>";
testDecreaseQueueSettingOnTheFlyInternal(allocBefore, allocAfter);
}
private void testDecreaseQueueSettingOnTheFlyInternal(String allocBefore,
String allocAfter) throws Exception {
// Set max running apps
conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE);
PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println(allocBefore);
out.close();
scheduler.init(conf);
scheduler.start();
scheduler.reinitialize(conf, resourceManager.getRMContext());
// Add a node
RMNode node1 =
MockNodes
.newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1");
NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1);
scheduler.handle(nodeEvent1);
// Request for app 1
ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1",
"user1", 1);
scheduler.update();
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(updateEvent);
// App 1 should be running
assertEquals(1, scheduler.getSchedulerApp(attId1).getLiveContainers().size());
ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1",
"user1", 1);
scheduler.update();
scheduler.handle(updateEvent);
ApplicationAttemptId attId3 = createSchedulingRequest(1024, "queue1",
"user1", 1);
scheduler.update();
scheduler.handle(updateEvent);
ApplicationAttemptId attId4 = createSchedulingRequest(1024, "queue1",
"user1", 1);
scheduler.update();
scheduler.handle(updateEvent);
// App 2 should be running
assertEquals(1, scheduler.getSchedulerApp(attId2).getLiveContainers().size());
// App 3 should be running
assertEquals(1, scheduler.getSchedulerApp(attId3).getLiveContainers().size());
// App 4 should not be running
assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
out = new PrintWriter(new FileWriter(ALLOC_FILE));
out.println(allocAfter);
out.close();
scheduler.reinitialize(conf, resourceManager.getRMContext());
scheduler.update();
scheduler.handle(updateEvent);
// App 2 should still be running
assertEquals(1, scheduler.getSchedulerApp(attId2).getLiveContainers().size());
scheduler.update();
scheduler.handle(updateEvent);
// App 3 should still be running
assertEquals(1, scheduler.getSchedulerApp(attId3).getLiveContainers().size());
scheduler.update();
scheduler.handle(updateEvent);
// App 4 should not be running
assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
// Now remove app 1
AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
attId1, RMAppAttemptState.FINISHED, false);
scheduler.handle(appRemovedEvent1);
scheduler.update();
scheduler.handle(updateEvent);
// App 4 should not be running
assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
// Now remove app 2
appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
attId2, RMAppAttemptState.FINISHED, false);
scheduler.handle(appRemovedEvent1);
scheduler.update();
scheduler.handle(updateEvent);
// App 4 should not be running
assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
// Now remove app 3
appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent(
attId3, RMAppAttemptState.FINISHED, false);
scheduler.handle(appRemovedEvent1);
scheduler.update();
scheduler.handle(updateEvent);
// App 4 should be running now
assertEquals(1, scheduler.getSchedulerApp(attId4).getLiveContainers().size());
}
@Test (timeout = 5000)
public void testReservationWhileMultiplePriorities() throws IOException {
scheduler.init(conf);