diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 214c6e7638..8941fdf649 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -2266,12 +2266,18 @@ public void updateApplicationPriority(SchedulerApplication app try { writeLock.lock(); FiCaSchedulerApp attempt = app.getCurrentAppAttempt(); - getOrderingPolicy().removeSchedulableEntity(attempt); - + boolean isActive = orderingPolicy.removeSchedulableEntity(attempt); + if (!isActive) { + pendingOrderingPolicy.removeSchedulableEntity(attempt); + } // Update new priority in SchedulerApplication attempt.setPriority(newAppPriority); - getOrderingPolicy().addSchedulableEntity(attempt); + if (isActive) { + orderingPolicy.addSchedulableEntity(attempt); + } else { + pendingOrderingPolicy.addSchedulableEntity(attempt); + } } finally { writeLock.unlock(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java index 706c274ddf..884e2368f4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestClientRMService.java @@ -1641,6 +1641,25 @@ protected ClientRMService createClientRMService() { rm.close(); } + @Test(timeout = 120000) + public void testUpdatePriorityAndKillAppWithZeroClusterResource() + throws Exception { + int maxPriority = 10; + int appPriority = 5; + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, + maxPriority); + MockRM rm = new MockRM(conf); + rm.init(conf); + rm.start(); + RMApp app1 = rm.submitApp(1024, Priority.newInstance(appPriority)); + ClientRMService rmService = rm.getClientRMService(); + testApplicationPriorityUpdation(rmService, app1, appPriority, appPriority); + rm.killApp(app1.getApplicationId()); + rm.waitForState(app1.getApplicationId(), RMAppState.KILLED); + rm.stop(); + } + @Test(timeout = 120000) public void testUpdateApplicationPriorityRequest() throws Exception { int maxPriority = 10; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java index 523d49d53d..2a346f828c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestApplicationPriority.java @@ -23,6 +23,7 @@ import static org.mockito.Mockito.when; import java.util.ArrayList; +import java.util.Collection; import java.util.Iterator; import java.util.List; import java.util.Map; @@ -36,6 +37,7 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.DrainDispatcher; +import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; @@ -54,6 +56,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.SchedulerEvent; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -733,4 +736,75 @@ protected Dispatcher createDispatcher() { rm2.stop(); rm1.stop(); } + + @Test(timeout = 120000) + public void testUpdatePriorityOnPendingAppAndKillAttempt() throws Exception { + int maxPriority = 10; + int appPriority = 5; + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration.MAX_CLUSTER_LEVEL_APPLICATION_PRIORITY, + maxPriority); + MockRM rm = new MockRM(conf); + rm.init(conf); + rm.start(); + + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + CSQueue defaultQueue = (LeafQueue) cs.getQueue("default"); + + // Update priority and kill application with no resource + RMApp app1 = rm.submitApp(1024, Priority.newInstance(appPriority)); + Collection appsPending = + ((LeafQueue) defaultQueue).getPendingApplications(); + Collection activeApps = + ((LeafQueue) defaultQueue).getOrderingPolicy().getSchedulableEntities(); + + // Verify app is in pending state + Assert.assertEquals("Pending apps should be 1", 1, appsPending.size()); + Assert.assertEquals("Active apps should be 0", 0, activeApps.size()); + + // kill app1 which is pending + killAppAndVerifyOrderingPolicy(rm, defaultQueue, 0, 0, app1); + + // Check ordering policy size when resource is added + MockNM nm1 = + new MockNM("127.0.0.1:1234", 8096, rm.getResourceTrackerService()); + nm1.registerNode(); + RMApp app2 = rm.submitApp(1024, Priority.newInstance(appPriority)); + Assert.assertEquals("Pending apps should be 0", 0, appsPending.size()); + Assert.assertEquals("Active apps should be 1", 1, activeApps.size()); + RMApp app3 = rm.submitApp(1024, Priority.newInstance(appPriority)); + RMApp app4 = rm.submitApp(1024, Priority.newInstance(appPriority)); + Assert.assertEquals("Pending apps should be 2", 2, appsPending.size()); + Assert.assertEquals("Active apps should be 1", 1, activeApps.size()); + // kill app3, pending apps should reduce to 1 + killAppAndVerifyOrderingPolicy(rm, defaultQueue, 1, 1, app3); + // kill app2, running apps is killed and pending added to running + killAppAndVerifyOrderingPolicy(rm, defaultQueue, 0, 1, app2); + // kill app4, all apps are killed and both policy size should be zero + killAppAndVerifyOrderingPolicy(rm, defaultQueue, 0, 0, app4); + rm.stop(); + } + + private void killAppAndVerifyOrderingPolicy(MockRM rm, CSQueue defaultQueue, + int appsPendingExpected, int activeAppsExpected, RMApp app) + throws YarnException { + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + cs.updateApplicationPriority(Priority.newInstance(2), + app.getApplicationId()); + SchedulerEvent removeAttempt; + removeAttempt = new AppAttemptRemovedSchedulerEvent( + app.getCurrentAppAttempt().getAppAttemptId(), RMAppAttemptState.KILLED, + false); + cs.handle(removeAttempt); + rm.drainEvents(); + Collection appsPending = + ((LeafQueue) defaultQueue).getPendingApplications(); + Collection activeApps = + ((LeafQueue) defaultQueue).getApplications(); + Assert.assertEquals("Pending apps should be " + appsPendingExpected, + appsPendingExpected, appsPending.size()); + Assert.assertEquals("Active apps should be " + activeAppsExpected, + activeAppsExpected, activeApps.size()); + } + }