From 9485c9aee6e9bb935c3e6ae4da81d70b621781de Mon Sep 17 00:00:00 2001
From: Eric E Payne
Date: Wed, 25 Jul 2018 16:22:04 +0000
Subject: [PATCH] YARN-4606. CapacityScheduler: applications could get starved
 because computation of #activeUsers considers pending apps. Contributed by
 Manikandan R

Users whose applications are all still pending were folded into the
activeUsers count, shrinking the user limit computed for genuinely
active users. Track pending-only users in a separate counter that is
refreshed whenever user limits are recomputed, and report the sum of
both counters from getNumActiveUsers(). (An illustrative before/after
calculation follows the UsersManager.java diff below.)
---
 .../scheduler/capacity/UsersManager.java      |  27 +++-
 .../capacity/TestCapacityScheduler.java       | 128 ++++++++++++++++++
 .../capacity/TestContainerAllocation.java     |  43 ++++++
 3 files changed, 197 insertions(+), 1 deletion(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
index 747a488a5a..83ee6c09fc 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java
@@ -85,6 +85,7 @@ public class UsersManager implements AbstractUsersManager {
 
   private final QueueMetrics metrics;
   private AtomicInteger activeUsers = new AtomicInteger(0);
+  private AtomicInteger activeUsersWithOnlyPendingApps = new AtomicInteger(0);
   private Map<String, Set<ApplicationId>> usersApplications =
       new HashMap<String, Set<ApplicationId>>();
 
@@ -671,9 +672,23 @@ private Map<SchedulingMode, Resource> reComputeUserLimits(String userName,
     // update in local storage
     userLimitPerSchedulingMode.put(schedulingMode, computedUserLimit);
 
+    computeNumActiveUsersWithOnlyPendingApps();
+
     return userLimitPerSchedulingMode;
   }
 
+  // This method is called within the lock.
+  private void computeNumActiveUsersWithOnlyPendingApps() {
+    int numPendingUsers = 0;
+    for (User user : users.values()) {
+      if ((user.getPendingApplications() > 0)
+          && (user.getActiveApplications() <= 0)) {
+        numPendingUsers++;
+      }
+    }
+    activeUsersWithOnlyPendingApps = new AtomicInteger(numPendingUsers);
+  }
+
   private Resource computeUserLimit(String userName, Resource clusterResource,
       String nodePartition, SchedulingMode schedulingMode, boolean activeUser) {
     Resource partitionResource = labelManager.getResourceByLabel(nodePartition,
@@ -839,6 +854,11 @@ public void activateApplication(String user, ApplicationId applicationId) {
     try {
       this.writeLock.lock();
 
+      User userDesc = getUser(user);
+      if (userDesc != null && userDesc.getActiveApplications() <= 0) {
+        return;
+      }
+
       Set<ApplicationId> userApps = usersApplications.get(user);
       if (userApps == null) {
         userApps = new HashSet<ApplicationId>();
@@ -893,7 +913,7 @@ public void deactivateApplication(String user, ApplicationId applicationId) {
 
   @Override
   public int getNumActiveUsers() {
-    return activeUsers.get();
+    return activeUsers.get() + activeUsersWithOnlyPendingApps.get();
   }
 
   float sumActiveUsersTimesWeights() {
@@ -1090,4 +1110,9 @@ public void updateUserWeights() {
       this.writeLock.unlock();
     }
   }
+
+  @VisibleForTesting
+  public int getNumActiveUsersWithOnlyPendingApps() {
+    return activeUsersWithOnlyPendingApps.get();
+  }
 }
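The starvation mode named in the Subject is easiest to see with a toy
calculation. The sketch below is an editor's illustration, not code from
this patch; it assumes a deliberately simplified rule (each user's limit
is queue capacity divided by the number of users counted as active),
whereas the real CapacityScheduler formula also factors in
minimum-user-limit-percent, user weights, and the user-limit factor.

    // Hypothetical, self-contained illustration; not UsersManager code.
    public class UserLimitSketch {
      public static void main(String[] args) {
        int queueCapacityGb = 100;    // assumed queue size
        int usersWithActiveApps = 2;  // u1, u2: AMs running, asking for more
        int usersPendingOnly = 2;     // u3, u4: apps never activated

        // Before YARN-4606: pending-only users inflated the divisor, so
        // every active user's share shrank even while capacity sat idle.
        int limitBefore = queueCapacityGb
            / (usersWithActiveApps + usersPendingOnly);          // 25 GB

        // After YARN-4606: only genuinely active users divide the queue;
        // pending-only users are tracked in a separate counter.
        int limitAfter = queueCapacityGb / usersWithActiveApps;  // 50 GB

        System.out.println(limitBefore + " GB -> " + limitAfter + " GB");
      }
    }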
"test-move-1", "u1", null, "a1"); + MockAM am1 = MockRM.launchAndRegisterAM(app, rm, nm1); + + ApplicationAttemptId appAttemptId = + rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + + RMApp app2 = rm.submitApp(1 * GB, "app", "u2", null, "a1"); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm1); + + RMApp app3 = rm.submitApp(1 * GB, "app", "u3", null, "a1"); + + RMApp app4 = rm.submitApp(1 * GB, "app", "u4", null, "a1"); + + // Each application asks 50 * 1GB containers + am1.allocate("*", 1 * GB, 50, null); + am2.allocate("*", 1 * GB, 50, null); + + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + RMNode rmNode1 = rm.getRMContext().getRMNodes().get(nm1.getNodeId()); + + // check preconditions + List appsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(4, appsInA1.size()); + String queue = + scheduler.getApplicationAttempt(appsInA1.get(0)).getQueue() + .getQueueName(); + Assert.assertEquals("a1", queue); + + List appsInA = scheduler.getAppsInQueue("a"); + assertTrue(appsInA.contains(appAttemptId)); + assertEquals(4, appsInA.size()); + + List appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(4, appsInRoot.size()); + + List appsInB1 = scheduler.getAppsInQueue("b1"); + assertTrue(appsInB1.isEmpty()); + + List appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.isEmpty()); + + UsersManager um = + (UsersManager) scheduler.getQueue("a1").getAbstractUsersManager(); + + assertEquals(4, um.getNumActiveUsers()); + assertEquals(2, um.getNumActiveUsersWithOnlyPendingApps()); + + // now move the app + scheduler.moveAllApps("a1", "b1"); + + //Triggering this event so that user limit computation can + //happen again + for (int i = 0; i < 10; i++) { + cs.handle(new NodeUpdateSchedulerEvent(rmNode1)); + Thread.sleep(500); + } + + // check postconditions + appsInB1 = scheduler.getAppsInQueue("b1"); + + assertEquals(4, appsInB1.size()); + queue = + scheduler.getApplicationAttempt(appsInB1.get(0)).getQueue() + .getQueueName(); + Assert.assertEquals("b1", queue); + + appsInB = scheduler.getAppsInQueue("b"); + assertTrue(appsInB.contains(appAttemptId)); + assertEquals(4, appsInB.size()); + + appsInRoot = scheduler.getAppsInQueue("root"); + assertTrue(appsInRoot.contains(appAttemptId)); + assertEquals(4, appsInRoot.size()); + + List oldAppsInA1 = scheduler.getAppsInQueue("a1"); + assertEquals(0, oldAppsInA1.size()); + + UsersManager um_b1 = + (UsersManager) scheduler.getQueue("b1").getAbstractUsersManager(); + + assertEquals(2, um_b1.getNumActiveUsers()); + assertEquals(2, um_b1.getNumActiveUsersWithOnlyPendingApps()); + + appsInB1 = scheduler.getAppsInQueue("b1"); + assertEquals(4, appsInB1.size()); + rm.close(); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java index 25e535ae8f..b9bfc2aab5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java +++ 
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
index 25e535ae8f..b9bfc2aab5 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerAllocation.java
@@ -941,4 +941,47 @@ public void testUserLimitAllocationMultipleContainers() throws Exception {
     rm1.close();
   }
 
+  @Test
+  public void testActiveUsersWithOnlyPendingApps() throws Exception {
+
+    CapacitySchedulerConfiguration newConf =
+        new CapacitySchedulerConfiguration(conf);
+    newConf.setMaximumAMResourcePercentPerPartition(
+        CapacitySchedulerConfiguration.ROOT + ".default", "", 0.2f);
+    MockRM rm1 = new MockRM(newConf);
+
+    rm1.getRMContext().setNodeLabelManager(mgr);
+    rm1.start();
+    MockNM nm1 = rm1.registerNode("h1:1234", 8 * GB);
+
+    RMApp app1 = rm1.submitApp(1 * GB, "app", "u1", null, "default");
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm1, nm1);
+
+    RMApp app2 = rm1.submitApp(1 * GB, "app", "u2", null, "default");
+    MockAM am2 = MockRM.launchAndRegisterAM(app2, rm1, nm1);
+
+    RMApp app3 = rm1.submitApp(1 * GB, "app", "u3", null, "default");
+
+    RMApp app4 = rm1.submitApp(1 * GB, "app", "u4", null, "default");
+
+    // Each application asks 50 * 1GB containers
+    am1.allocate("*", 1 * GB, 50, null);
+    am2.allocate("*", 1 * GB, 50, null);
+
+    CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
+    RMNode rmNode1 = rm1.getRMContext().getRMNodes().get(nm1.getNodeId());
+
+    for (int i = 0; i < 10; i++) {
+      cs.handle(new NodeUpdateSchedulerEvent(rmNode1));
+      Thread.sleep(1000);
+    }
+
+    LeafQueue lq = (LeafQueue) cs.getQueue("default");
+    UsersManager um = (UsersManager) lq.getAbstractUsersManager();
+
+    Assert.assertEquals(4, um.getNumActiveUsers());
+    Assert.assertEquals(2, um.getNumActiveUsersWithOnlyPendingApps());
+    Assert.assertEquals(2, lq.getMetrics().getAppsPending());
+    rm1.close();
+  }
 }
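Putting the pieces together, the accounting this patch introduces can be
restated as a small self-contained model. This is an editor's sketch whose
field and method names mirror the patch; it is not UsersManager itself.

    // Hypothetical model of the patched accounting (standalone Java).
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.atomic.AtomicInteger;

    public class ActiveUsersModel {
      // Users with at least one active (scheduled) application.
      private final AtomicInteger activeUsers = new AtomicInteger(0);
      // Users whose applications are all still pending; in the real code
      // this is recomputed while the queue's write lock is held.
      private AtomicInteger activeUsersWithOnlyPendingApps =
          new AtomicInteger(0);

      // Per-user {pending, active} app counts, standing in for
      // UsersManager.User.
      private final Map<String, int[]> appCounts = new HashMap<>();

      public void setUser(String name, int pending, int active) {
        appCounts.put(name, new int[] { pending, active });
        computeNumActiveUsersWithOnlyPendingApps();
      }

      // Mirrors the patched computeNumActiveUsersWithOnlyPendingApps().
      private void computeNumActiveUsersWithOnlyPendingApps() {
        int numPendingUsers = 0;
        for (int[] counts : appCounts.values()) {
          if (counts[0] > 0 && counts[1] <= 0) {
            numPendingUsers++;
          }
        }
        activeUsersWithOnlyPendingApps = new AtomicInteger(numPendingUsers);
      }

      // Mirrors the patched getNumActiveUsers(): both populations count.
      public int getNumActiveUsers() {
        return activeUsers.get() + activeUsersWithOnlyPendingApps.get();
      }

      public static void main(String[] args) {
        ActiveUsersModel m = new ActiveUsersModel();
        m.activeUsers.set(2);   // u1 and u2 have running AMs
        m.setUser("u3", 1, 0);  // pending-only, like app3 in the tests
        m.setUser("u4", 1, 0);  // pending-only, like app4 in the tests
        System.out.println(m.getNumActiveUsers()); // 4, as the tests assert
      }
    }

Note the design choice the model mirrors: each recomputation replaces the
whole AtomicInteger rather than calling set(). Per the in-code comment,
the recomputation always runs under the queue's write lock, so writers
stay consistent; concurrent readers of getNumActiveUsers() observe either
the old or the new count, which the scheduler appears to tolerate.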