diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index 1af325023d..d507e53543 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -176,6 +176,15 @@ public void submitApplication(ApplicationId applicationId, String user, public void submitApplicationAttempt(FiCaSchedulerApp application, String userName); + /** + * Submit an application attempt to the queue. + * @param application application whose attempt is being submitted + * @param userName user who submitted the application attempt + * @param isMoveApp is application being moved across the queue + */ + public void submitApplicationAttempt(FiCaSchedulerApp application, + String userName, boolean isMoveApp); + /** * An application submitted to this queue has finished. * @param applicationId diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index b8fdd423b9..7cd2c1c08e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -2431,7 +2431,7 @@ public String moveApplication(ApplicationId appId, if (!app.isStopped()) { source.finishApplicationAttempt(app, sourceQueueName); // Submit to a new queue - dest.submitApplicationAttempt(app, user); + dest.submitApplicationAttempt(app, user, true); } // Finish app & update metrics app.move(dest); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 340354495b..9d8e1e35e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -578,6 +578,12 @@ public void reinitialize( @Override public void submitApplicationAttempt(FiCaSchedulerApp application, String userName) { + submitApplicationAttempt(application, userName, false); + } + + @Override + public void submitApplicationAttempt(FiCaSchedulerApp application, + String userName, boolean isMoveApp) { // Careful! Locking order is important! writeLock.lock(); try { @@ -592,7 +598,7 @@ public void submitApplicationAttempt(FiCaSchedulerApp application, } // We don't want to update metrics for move app - if (application.isPending()) { + if (!isMoveApp) { metrics.submitAppAttempt(userName); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 53e8fd2429..8a7acd6438 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -478,6 +478,13 @@ public void submitApplicationAttempt(FiCaSchedulerApp application, // submit attempt logic. } + @Override + public void submitApplicationAttempt(FiCaSchedulerApp application, + String userName, boolean isMoveApp) { + throw new UnsupportedOperationException("Submission of application attempt" + + " to parent queue is not supported"); + } + @Override public void finishApplicationAttempt(FiCaSchedulerApp application, String queue) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index ae4009ab8f..855c793edc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -57,6 +57,7 @@ import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.security.Credentials; import org.apache.hadoop.security.ShellBasedUnixGroupsMapping; @@ -222,6 +223,8 @@ protected RMNodeLabelsManager createNodeLabelManager() { @After public void tearDown() throws Exception { if (resourceManager != null) { + QueueMetrics.clearQueueMetrics(); + DefaultMetricsSystem.shutdown(); resourceManager.stop(); } } @@ -1859,13 +1862,13 @@ public void testMoveAppBasic() throws Exception { MockRM rm = setUpMove(); AbstractYarnScheduler scheduler = (AbstractYarnScheduler) rm.getResourceScheduler(); - + QueueMetrics metrics = scheduler.getRootQueueMetrics(); + Assert.assertEquals(0, metrics.getAppsPending()); // submit an app RMApp app = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); ApplicationAttemptId appAttemptId = rm.getApplicationReport(app.getApplicationId()) .getCurrentApplicationAttemptId(); - // check preconditions List appsInA1 = scheduler.getAppsInQueue("a1"); assertEquals(1, appsInA1.size()); @@ -1882,6 +1885,8 @@ public void testMoveAppBasic() throws Exception { assertTrue(appsInRoot.contains(appAttemptId)); assertEquals(1, appsInRoot.size()); + assertEquals(1, metrics.getAppsPending()); + List appsInB1 = scheduler.getAppsInQueue("b1"); assertTrue(appsInB1.isEmpty()); @@ -1907,6 +1912,8 @@ public void testMoveAppBasic() throws Exception { assertTrue(appsInRoot.contains(appAttemptId)); assertEquals(1, appsInRoot.size()); + assertEquals(1, metrics.getAppsPending()); + appsInA1 = scheduler.getAppsInQueue("a1"); assertTrue(appsInA1.isEmpty()); @@ -1916,6 +1923,67 @@ public void testMoveAppBasic() throws Exception { rm.stop(); } + @Test + public void testMoveAppPendingMetrics() throws Exception { + MockRM rm = setUpMove(); + AbstractYarnScheduler scheduler = + (AbstractYarnScheduler) rm.getResourceScheduler(); + QueueMetrics metrics = scheduler.getRootQueueMetrics(); + List appsInA1 = scheduler.getAppsInQueue("a1"); + List appsInB1 = scheduler.getAppsInQueue("b1"); + + assertEquals(0, appsInA1.size()); + assertEquals(0, appsInB1.size()); + Assert.assertEquals(0, metrics.getAppsPending()); + + // submit two apps in a1 + RMApp app1 = rm.submitApp(GB, "test-move-1", "user_0", null, "a1"); + RMApp app2 = rm.submitApp(GB, "test-move-2", "user_0", null, "a1"); + + appsInA1 = scheduler.getAppsInQueue("a1"); + appsInB1 = scheduler.getAppsInQueue("b1"); + assertEquals(2, appsInA1.size()); + assertEquals(0, appsInB1.size()); + assertEquals(2, metrics.getAppsPending()); + + // submit one app in b1 + RMApp app3 = rm.submitApp(GB, "test-move-2", "user_0", null, "b1"); + + appsInA1 = scheduler.getAppsInQueue("a1"); + appsInB1 = scheduler.getAppsInQueue("b1"); + assertEquals(2, appsInA1.size()); + assertEquals(1, appsInB1.size()); + assertEquals(3, metrics.getAppsPending()); + + // now move the app1 from a1 to b1 + scheduler.moveApplication(app1.getApplicationId(), "b1"); + + appsInA1 = scheduler.getAppsInQueue("a1"); + appsInB1 = scheduler.getAppsInQueue("b1"); + assertEquals(1, appsInA1.size()); + assertEquals(2, appsInB1.size()); + assertEquals(3, metrics.getAppsPending()); + + // now move the app2 from a1 to b1 + scheduler.moveApplication(app2.getApplicationId(), "b1"); + + appsInA1 = scheduler.getAppsInQueue("a1"); + appsInB1 = scheduler.getAppsInQueue("b1"); + assertEquals(0, appsInA1.size()); + assertEquals(3, appsInB1.size()); + assertEquals(3, metrics.getAppsPending()); + + // now move the app3 from b1 to a1 + scheduler.moveApplication(app3.getApplicationId(), "a1"); + + appsInA1 = scheduler.getAppsInQueue("a1"); + appsInB1 = scheduler.getAppsInQueue("b1"); + assertEquals(1, appsInA1.size()); + assertEquals(2, appsInB1.size()); + assertEquals(3, metrics.getAppsPending()); + rm.stop(); + } + @Test public void testMoveAppSameParent() throws Exception { MockRM rm = setUpMove();