diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java index ac5c8a1516..08fedb578c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractLeafQueue.java @@ -578,6 +578,8 @@ public void submitApplicationAttempt(FiCaSchedulerApp application, public void submitApplicationAttempt(FiCaSchedulerApp application, String userName, boolean isMoveApp) { // Careful! Locking order is important! + boolean isAppAlreadySubmitted = applicationAttemptMap.containsKey( + application.getApplicationAttemptId()); writeLock.lock(); try { // TODO, should use getUser, use this method just to avoid UT failure @@ -591,7 +593,7 @@ public void submitApplicationAttempt(FiCaSchedulerApp application, } // We don't want to update metrics for move app - if (!isMoveApp) { + if (!isMoveApp && !isAppAlreadySubmitted) { boolean unmanagedAM = application.getAppSchedulingInfo() != null && application.getAppSchedulingInfo().isUnmanagedAM(); usageTracker.getMetrics().submitAppAttempt(userName, unmanagedAM); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerApps.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerApps.java index ea22c24b35..d192e7dcc6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerApps.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerApps.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; +import java.util.ArrayList; import java.util.List; import org.apache.hadoop.conf.Configuration; @@ -116,6 +117,8 @@ public class TestCapacitySchedulerApps { + public static final int MAX_PARALLEL_APPS = 5; + public static final String USER_0 = "user_0"; private ResourceManager resourceManager = null; private RMContext mockContext; @@ -237,18 +240,7 @@ public void testKillAllAppsInvalidSource() throws Exception { YarnScheduler scheduler = rm.getResourceScheduler(); // submit an app - MockRMAppSubmissionData data = - MockRMAppSubmissionData.Builder.createWithMemory(GB, rm) - .withAppName("test-move-1") - .withUser("user_0") - .withAcls(null) - .withQueue("a1") - .withUnmanagedAM(false) - .build(); - RMApp app = MockRMAppSubmitter.submit(rm, data); - ApplicationAttemptId appAttemptId = - rm.getApplicationReport(app.getApplicationId()) - .getCurrentApplicationAttemptId(); + ApplicationAttemptId appAttemptId = submitApp(rm); // check preconditions List appsInA1 = scheduler.getAppsInQueue("a1"); @@ -1020,18 +1012,7 @@ public void testMoveAllApps() throws Exception { (AbstractYarnScheduler) rm.getResourceScheduler(); // submit an app - MockRMAppSubmissionData data = - MockRMAppSubmissionData.Builder.createWithMemory(GB, rm) - .withAppName("test-move-1") - .withUser("user_0") - .withAcls(null) - .withQueue("a1") - .withUnmanagedAM(false) - .build(); - RMApp app = MockRMAppSubmitter.submit(rm, data); - ApplicationAttemptId appAttemptId = - rm.getApplicationReport(app.getApplicationId()) - .getCurrentApplicationAttemptId(); + ApplicationAttemptId appAttemptId = submitApp(rm); // check preconditions assertOneAppInQueue(scheduler, "a1"); @@ -1057,23 +1038,61 @@ public void testMoveAllApps() throws Exception { } @Test - public void testMoveAllAppsInvalidDestination() throws Exception { + public void testMaxParallelAppsPendingQueueMetrics() throws Exception { MockRM rm = setUpMove(); ResourceScheduler scheduler = rm.getResourceScheduler(); + CapacityScheduler cs = (CapacityScheduler) scheduler; + cs.getQueueContext().getConfiguration().setInt(CapacitySchedulerConfiguration.getQueuePrefix(A1) + + CapacitySchedulerConfiguration.MAX_PARALLEL_APPLICATIONS, MAX_PARALLEL_APPS); + cs.reinitialize(cs.getQueueContext().getConfiguration(), mockContext); + List attemptIds = new ArrayList<>(); + for (int i = 0; i < 2 * MAX_PARALLEL_APPS; i++) { + attemptIds.add(submitApp(rm)); + } + + // Finish first batch to allow the other batch to run + for (int i = 0; i < MAX_PARALLEL_APPS; i++) { + cs.handle(new AppAttemptRemovedSchedulerEvent(attemptIds.get(i), + RMAppAttemptState.FINISHED, true)); + } + + // Finish the remaining apps + for (int i = MAX_PARALLEL_APPS; i < 2 * MAX_PARALLEL_APPS; i++) { + cs.handle(new AppAttemptRemovedSchedulerEvent(attemptIds.get(i), + RMAppAttemptState.FINISHED, true)); + } + + Assert.assertEquals("No pending app should remain for root queue", 0, + cs.getRootQueueMetrics().getAppsPending()); + Assert.assertEquals("No running application should remain for root queue", 0, + cs.getRootQueueMetrics().getAppsRunning()); + + rm.stop(); + } + + private ApplicationAttemptId submitApp(MockRM rm) throws Exception { // submit an app MockRMAppSubmissionData data = MockRMAppSubmissionData.Builder.createWithMemory(GB, rm) .withAppName("test-move-1") - .withUser("user_0") + .withUser(USER_0) .withAcls(null) .withQueue("a1") .withUnmanagedAM(false) .build(); RMApp app = MockRMAppSubmitter.submit(rm, data); - ApplicationAttemptId appAttemptId = - rm.getApplicationReport(app.getApplicationId()) - .getCurrentApplicationAttemptId(); + return rm.getApplicationReport(app.getApplicationId()) + .getCurrentApplicationAttemptId(); + } + + @Test + public void testMoveAllAppsInvalidDestination() throws Exception { + MockRM rm = setUpMove(); + ResourceScheduler scheduler = rm.getResourceScheduler(); + + // submit an app + ApplicationAttemptId appAttemptId = submitApp(rm); // check preconditions assertApps(scheduler, "root", appAttemptId);