diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 0b71beee10..9a52325c42 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -697,6 +697,9 @@ Release 2.7.0 - UNRELEASED YARN-3131. YarnClientImpl should check FAILED and KILLED state in submitApplication (Chang Li via jlowe) + + YARN-3231. FairScheduler: Changing queueMaxRunningApps interferes with pending + jobs. (Siqi Li via kasha) Release 2.6.0 - 2014-11-18 diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java index 2b597164ba..e8a955534e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FairScheduler.java @@ -1477,6 +1477,7 @@ public void onReload(AllocationConfiguration queueInfo) { allocConf = queueInfo; allocConf.getDefaultSchedulingPolicy().initialize(clusterResource); queueMgr.updateAllocationConfiguration(allocConf); + maxRunningEnforcer.updateRunnabilityOnReload(); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java index 2c90edd400..f75043814d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/MaxRunningAppsEnforcer.java @@ -104,6 +104,26 @@ public void trackNonRunnableApp(FSAppAttempt app) { usersNonRunnableApps.put(user, app); } + /** + * This is called after reloading the allocation configuration when the + * scheduler is reinitilized + * + * Checks to see whether any non-runnable applications become runnable + * now that the max running apps of given queue has been changed + * + * Runs in O(n) where n is the number of apps that are non-runnable and in + * the queues that went from having no slack to having slack. + */ + public void updateRunnabilityOnReload() { + FSParentQueue rootQueue = scheduler.getQueueManager().getRootQueue(); + List> appsNowMaybeRunnable = + new ArrayList>(); + + gatherPossiblyRunnableAppLists(rootQueue, appsNowMaybeRunnable); + + updateAppsRunnability(appsNowMaybeRunnable, Integer.MAX_VALUE); + } + /** * Checks to see whether any other applications runnable now that the given * application has been removed from the given queue. And makes them so. @@ -156,6 +176,19 @@ public void updateRunnabilityOnAppRemoval(FSAppAttempt app, FSLeafQueue queue) { } } + updateAppsRunnability(appsNowMaybeRunnable, + appsNowMaybeRunnable.size()); + } + + /** + * Checks to see whether applications are runnable now by iterating + * through each one of them and check if the queue and user have slack + * + * if we know how many apps can be runnable, there is no need to iterate + * through all apps, maxRunnableApps is used to break out of the iteration + */ + private void updateAppsRunnability(List> + appsNowMaybeRunnable, int maxRunnableApps) { // Scan through and check whether this means that any apps are now runnable Iterator iter = new MultiListStartTimeIterator( appsNowMaybeRunnable); @@ -173,9 +206,7 @@ public void updateRunnabilityOnAppRemoval(FSAppAttempt app, FSLeafQueue queue) { next.getQueue().addApp(appSched, true); noLongerPendingApps.add(appSched); - // No more than one app per list will be able to be made runnable, so - // we can stop looking after we've found that many - if (noLongerPendingApps.size() >= appsNowMaybeRunnable.size()) { + if (noLongerPendingApps.size() >= maxRunnableApps) { break; } } @@ -194,11 +225,10 @@ public void updateRunnabilityOnAppRemoval(FSAppAttempt app, FSLeafQueue queue) { if (!usersNonRunnableApps.remove(appSched.getUser(), appSched)) { LOG.error("Waiting app " + appSched + " expected to be in " - + "usersNonRunnableApps, but was not. This should never happen."); + + "usersNonRunnableApps, but was not. This should never happen."); } } } - /** * Updates the relevant tracking variables after a runnable app with the given * queue and user has been removed. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java index c29dbfc149..9fadba97ed 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/TestFairScheduler.java @@ -2288,7 +2288,315 @@ public void testUserMaxRunningApps() throws Exception { // Request should be fulfilled assertEquals(2, scheduler.getSchedulerApp(attId1).getLiveContainers().size()); } - + + @Test (timeout = 5000) + public void testIncreaseQueueMaxRunningAppsOnTheFly() throws Exception { + String allocBefore = "" + + "" + + "" + + "" + + "1" + + "" + + "" + + ""; + + String allocAfter = "" + + "" + + "" + + "" + + "3" + + "" + + "" + + ""; + + testIncreaseQueueSettingOnTheFlyInternal(allocBefore, allocAfter); + } + + @Test (timeout = 5000) + public void testIncreaseUserMaxRunningAppsOnTheFly() throws Exception { + String allocBefore = ""+ + ""+ + ""+ + ""+ + "10"+ + ""+ + ""+ + ""+ + "1"+ + ""+ + ""; + + String allocAfter = ""+ + ""+ + ""+ + ""+ + "10"+ + ""+ + ""+ + ""+ + "3"+ + ""+ + ""; + + testIncreaseQueueSettingOnTheFlyInternal(allocBefore, allocAfter); + } + + private void testIncreaseQueueSettingOnTheFlyInternal(String allocBefore, + String allocAfter) throws Exception { + // Set max running apps + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(allocBefore); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node + RMNode node1 = + MockNodes + .newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + // Request for app 1 + ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", + "user1", 1); + + scheduler.update(); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(updateEvent); + + // App 1 should be running + assertEquals(1, scheduler.getSchedulerApp(attId1).getLiveContainers().size()); + + ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1", + "user1", 1); + + scheduler.update(); + scheduler.handle(updateEvent); + + ApplicationAttemptId attId3 = createSchedulingRequest(1024, "queue1", + "user1", 1); + + scheduler.update(); + scheduler.handle(updateEvent); + + ApplicationAttemptId attId4 = createSchedulingRequest(1024, "queue1", + "user1", 1); + + scheduler.update(); + scheduler.handle(updateEvent); + + // App 2 should not be running + assertEquals(0, scheduler.getSchedulerApp(attId2).getLiveContainers().size()); + // App 3 should not be running + assertEquals(0, scheduler.getSchedulerApp(attId3).getLiveContainers().size()); + // App 4 should not be running + assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size()); + + out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(allocAfter); + out.close(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + scheduler.update(); + scheduler.handle(updateEvent); + + // App 2 should be running + assertEquals(1, scheduler.getSchedulerApp(attId2).getLiveContainers().size()); + + scheduler.update(); + scheduler.handle(updateEvent); + + // App 3 should be running + assertEquals(1, scheduler.getSchedulerApp(attId3).getLiveContainers().size()); + + scheduler.update(); + scheduler.handle(updateEvent); + + // App 4 should not be running + assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size()); + + // Now remove app 1 + AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent( + attId1, RMAppAttemptState.FINISHED, false); + + scheduler.handle(appRemovedEvent1); + scheduler.update(); + scheduler.handle(updateEvent); + + // App 4 should be running + assertEquals(1, scheduler.getSchedulerApp(attId4).getLiveContainers().size()); + } + + @Test (timeout = 5000) + public void testDecreaseQueueMaxRunningAppsOnTheFly() throws Exception { + String allocBefore = "" + + "" + + "" + + "" + + "3" + + "" + + "" + + ""; + + String allocAfter = "" + + "" + + "" + + "" + + "1" + + "" + + "" + + ""; + + testDecreaseQueueSettingOnTheFlyInternal(allocBefore, allocAfter); + } + + @Test (timeout = 5000) + public void testDecreaseUserMaxRunningAppsOnTheFly() throws Exception { + String allocBefore = ""+ + ""+ + ""+ + ""+ + "10"+ + ""+ + ""+ + ""+ + "3"+ + ""+ + ""; + + String allocAfter = ""+ + ""+ + ""+ + ""+ + "10"+ + ""+ + ""+ + ""+ + "1"+ + ""+ + ""; + + testDecreaseQueueSettingOnTheFlyInternal(allocBefore, allocAfter); + } + + private void testDecreaseQueueSettingOnTheFlyInternal(String allocBefore, + String allocAfter) throws Exception { + // Set max running apps + conf.set(FairSchedulerConfiguration.ALLOCATION_FILE, ALLOC_FILE); + + PrintWriter out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(allocBefore); + out.close(); + + scheduler.init(conf); + scheduler.start(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + // Add a node + RMNode node1 = + MockNodes + .newNodeInfo(1, Resources.createResource(8192, 8), 1, "127.0.0.1"); + NodeAddedSchedulerEvent nodeEvent1 = new NodeAddedSchedulerEvent(node1); + scheduler.handle(nodeEvent1); + + // Request for app 1 + ApplicationAttemptId attId1 = createSchedulingRequest(1024, "queue1", + "user1", 1); + + scheduler.update(); + NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); + scheduler.handle(updateEvent); + + // App 1 should be running + assertEquals(1, scheduler.getSchedulerApp(attId1).getLiveContainers().size()); + + ApplicationAttemptId attId2 = createSchedulingRequest(1024, "queue1", + "user1", 1); + + scheduler.update(); + scheduler.handle(updateEvent); + + ApplicationAttemptId attId3 = createSchedulingRequest(1024, "queue1", + "user1", 1); + + scheduler.update(); + scheduler.handle(updateEvent); + + ApplicationAttemptId attId4 = createSchedulingRequest(1024, "queue1", + "user1", 1); + + scheduler.update(); + scheduler.handle(updateEvent); + + // App 2 should be running + assertEquals(1, scheduler.getSchedulerApp(attId2).getLiveContainers().size()); + // App 3 should be running + assertEquals(1, scheduler.getSchedulerApp(attId3).getLiveContainers().size()); + // App 4 should not be running + assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size()); + + out = new PrintWriter(new FileWriter(ALLOC_FILE)); + out.println(allocAfter); + out.close(); + scheduler.reinitialize(conf, resourceManager.getRMContext()); + + scheduler.update(); + scheduler.handle(updateEvent); + + // App 2 should still be running + assertEquals(1, scheduler.getSchedulerApp(attId2).getLiveContainers().size()); + + scheduler.update(); + scheduler.handle(updateEvent); + + // App 3 should still be running + assertEquals(1, scheduler.getSchedulerApp(attId3).getLiveContainers().size()); + + scheduler.update(); + scheduler.handle(updateEvent); + + // App 4 should not be running + assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size()); + + // Now remove app 1 + AppAttemptRemovedSchedulerEvent appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent( + attId1, RMAppAttemptState.FINISHED, false); + + scheduler.handle(appRemovedEvent1); + scheduler.update(); + scheduler.handle(updateEvent); + + // App 4 should not be running + assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size()); + + // Now remove app 2 + appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent( + attId2, RMAppAttemptState.FINISHED, false); + + scheduler.handle(appRemovedEvent1); + scheduler.update(); + scheduler.handle(updateEvent); + + // App 4 should not be running + assertEquals(0, scheduler.getSchedulerApp(attId4).getLiveContainers().size()); + + // Now remove app 3 + appRemovedEvent1 = new AppAttemptRemovedSchedulerEvent( + attId3, RMAppAttemptState.FINISHED, false); + + scheduler.handle(appRemovedEvent1); + scheduler.update(); + scheduler.handle(updateEvent); + + // App 4 should be running now + assertEquals(1, scheduler.getSchedulerApp(attId4).getLiveContainers().size()); + } + @Test (timeout = 5000) public void testReservationWhileMultiplePriorities() throws IOException { scheduler.init(conf);