From d60c2fa17f1b6ccb412ce25fc9e28b7af4c8c0a5 Mon Sep 17 00:00:00 2001 From: Vinod Kumar Vavilapalli Date: Wed, 27 Mar 2013 03:44:34 +0000 Subject: [PATCH] YARN-474. Fix CapacityScheduler to trigger application-activation when am-resource-percent configuration is refreshed. Contributed by Zhijie Shen. git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1461402 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 + .../scheduler/capacity/LeafQueue.java | 4 ++ .../scheduler/capacity/TestLeafQueue.java | 62 ++++++++++++++++++- 3 files changed, 67 insertions(+), 2 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 4c7bbc7c3e..e70fe9381f 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -143,6 +143,9 @@ Release 2.0.5-beta - UNRELEASED YARN-498. Unmanaged AM launcher does not set various constants in env for an AM, also does not handle failed AMs properly. (Hitesh Shah via bikas) + YARN-474. Fix CapacityScheduler to trigger application-activation when + am-resource-percent configuration is refreshed. (Zhijie Shen via vinodkv) + Release 2.0.4-alpha - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 719cf1e578..1785ec565a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -607,6 +607,10 @@ public synchronized void reinitialize( newlyParsedLeafQueue.getMaximumActiveApplications(), newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(), newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls); + + // queue metrics are updated, more resource may be available + // activate the pending applications if possible + activateApplications(); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index ccf2a47c12..0460b3f6ad 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -138,6 +138,7 @@ public void setUp() throws Exception { private static final String C = "c"; private static final String C1 = "c1"; private static final String D = "d"; + private static final String E = "e"; private void setupQueueConfiguration( CapacitySchedulerConfiguration conf, final String newRoot) { @@ -148,7 +149,7 @@ private void setupQueueConfiguration( conf.setAcl(CapacitySchedulerConfiguration.ROOT, QueueACL.SUBMIT_APPLICATIONS, " "); final String Q_newRoot = CapacitySchedulerConfiguration.ROOT + "." + newRoot; - conf.setQueues(Q_newRoot, new String[] {A, B, C, D}); + conf.setQueues(Q_newRoot, new String[] {A, B, C, D, E}); conf.setCapacity(Q_newRoot, 100); conf.setMaximumCapacity(Q_newRoot, 100); conf.setAcl(Q_newRoot, QueueACL.SUBMIT_APPLICATIONS, " "); @@ -174,10 +175,14 @@ private void setupQueueConfiguration( conf.setCapacity(Q_C1, 100); final String Q_D = Q_newRoot + "." + D; - conf.setCapacity(Q_D, 10); + conf.setCapacity(Q_D, 9); conf.setMaximumCapacity(Q_D, 11); conf.setAcl(Q_D, QueueACL.SUBMIT_APPLICATIONS, "user_d"); + final String Q_E = Q_newRoot + "." + E; + conf.setCapacity(Q_E, 1); + conf.setMaximumCapacity(Q_E, 1); + conf.setAcl(Q_E, QueueACL.SUBMIT_APPLICATIONS, "user_e"); } static LeafQueue stubLeafQueue(LeafQueue queue) { @@ -1567,6 +1572,59 @@ public void testSchedulingConstraints() throws Exception { } + @Test (timeout = 30000) + public void testActivateApplicationAfterQueueRefresh() throws Exception { + + // Manipulate queue 'e' + LeafQueue e = stubLeafQueue((LeafQueue)queues.get(E)); + + // Users + final String user_e = "user_e"; + + // Submit applications + final ApplicationAttemptId appAttemptId_0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app_0 = + new FiCaSchedulerApp(appAttemptId_0, user_e, e, + mock(ActiveUsersManager.class), rmContext); + e.submitApplication(app_0, user_e, E); + + final ApplicationAttemptId appAttemptId_1 = + TestUtils.getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app_1 = + new FiCaSchedulerApp(appAttemptId_1, user_e, e, + mock(ActiveUsersManager.class), rmContext); + e.submitApplication(app_1, user_e, E); // same user + + final ApplicationAttemptId appAttemptId_2 = + TestUtils.getMockApplicationAttemptId(2, 0); + FiCaSchedulerApp app_2 = + new FiCaSchedulerApp(appAttemptId_2, user_e, e, + mock(ActiveUsersManager.class), rmContext); + e.submitApplication(app_2, user_e, E); // same user + + // before reinitialization + assertEquals(2, e.activeApplications.size()); + assertEquals(1, e.pendingApplications.size()); + + csConf.setDouble(CapacitySchedulerConfiguration + .MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT, + CapacitySchedulerConfiguration + .DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT * 2); + Map newQueues = new HashMap(); + CSQueue newRoot = + CapacityScheduler.parseQueue(csContext, csConf, null, + CapacitySchedulerConfiguration.ROOT, + newQueues, queues, + TestUtils.spyHook); + queues = newQueues; + root.reinitialize(newRoot, cs.getClusterResources()); + + // after reinitialization + assertEquals(3, e.activeApplications.size()); + assertEquals(0, e.pendingApplications.size()); + } + public boolean hasQueueACL(List aclInfos, QueueACL acl) { for (QueueUserACLInfo aclInfo : aclInfos) { if (aclInfo.getUserAcls().contains(acl)) {