YARN-11185. Pending app metrics are increased doubly when a queue reaches its max-parallel-apps limit. Contributed by Andras Gyori

This commit is contained in:
Szilard Nemeth 2022-06-20 15:03:58 +02:00
parent 5d08ffa769
commit 3a66348fda
2 changed files with 51 additions and 30 deletions

View File

@ -578,6 +578,8 @@ public void submitApplicationAttempt(FiCaSchedulerApp application,
public void submitApplicationAttempt(FiCaSchedulerApp application,
String userName, boolean isMoveApp) {
// Careful! Locking order is important!
boolean isAppAlreadySubmitted = applicationAttemptMap.containsKey(
application.getApplicationAttemptId());
writeLock.lock();
try {
// TODO, should use getUser, use this method just to avoid UT failure
@ -591,7 +593,7 @@ public void submitApplicationAttempt(FiCaSchedulerApp application,
}
// We don't want to update metrics for move app
if (!isMoveApp) {
if (!isMoveApp && !isAppAlreadySubmitted) {
boolean unmanagedAM = application.getAppSchedulingInfo() != null &&
application.getAppSchedulingInfo().isUnmanagedAM();
usageTracker.getMetrics().submitAppAttempt(userName, unmanagedAM);

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import java.util.ArrayList;
import java.util.List;
import org.apache.hadoop.conf.Configuration;
@ -116,6 +117,8 @@
public class TestCapacitySchedulerApps {
public static final int MAX_PARALLEL_APPS = 5;
public static final String USER_0 = "user_0";
private ResourceManager resourceManager = null;
private RMContext mockContext;
@ -237,18 +240,7 @@ public void testKillAllAppsInvalidSource() throws Exception {
YarnScheduler scheduler = rm.getResourceScheduler();
// submit an app
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
.withAppName("test-move-1")
.withUser("user_0")
.withAcls(null)
.withQueue("a1")
.withUnmanagedAM(false)
.build();
RMApp app = MockRMAppSubmitter.submit(rm, data);
ApplicationAttemptId appAttemptId =
rm.getApplicationReport(app.getApplicationId())
.getCurrentApplicationAttemptId();
ApplicationAttemptId appAttemptId = submitApp(rm);
// check preconditions
List<ApplicationAttemptId> appsInA1 = scheduler.getAppsInQueue("a1");
@ -1020,18 +1012,7 @@ public void testMoveAllApps() throws Exception {
(AbstractYarnScheduler) rm.getResourceScheduler();
// submit an app
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
.withAppName("test-move-1")
.withUser("user_0")
.withAcls(null)
.withQueue("a1")
.withUnmanagedAM(false)
.build();
RMApp app = MockRMAppSubmitter.submit(rm, data);
ApplicationAttemptId appAttemptId =
rm.getApplicationReport(app.getApplicationId())
.getCurrentApplicationAttemptId();
ApplicationAttemptId appAttemptId = submitApp(rm);
// check preconditions
assertOneAppInQueue(scheduler, "a1");
@ -1057,23 +1038,61 @@ public void testMoveAllApps() throws Exception {
}
@Test
public void testMoveAllAppsInvalidDestination() throws Exception {
/**
 * Regression test for YARN-11185: checks that the root queue's pending and
 * running application metrics both return to zero after every submitted
 * application attempt has been removed, when more applications were
 * submitted than the queue's max-parallel-apps limit allows.
 *
 * @throws Exception if setting up the RM, submitting an application or
 *                   reinitializing the scheduler fails
 */
public void testMaxParallelAppsPendingQueueMetrics() throws Exception {
MockRM rm = setUpMove();
ResourceScheduler scheduler = rm.getResourceScheduler();
CapacityScheduler cs = (CapacityScheduler) scheduler;
// Limit queue A1 to MAX_PARALLEL_APPS parallel applications, then
// reinitialize the scheduler with the modified configuration.
cs.getQueueContext().getConfiguration().setInt(CapacitySchedulerConfiguration.getQueuePrefix(A1)
+ CapacitySchedulerConfiguration.MAX_PARALLEL_APPLICATIONS, MAX_PARALLEL_APPS);
cs.reinitialize(cs.getQueueContext().getConfiguration(), mockContext);
// Submit twice as many applications as the configured limit, remembering
// each attempt id so the attempts can be removed below.
List<ApplicationAttemptId> attemptIds = new ArrayList<>();
for (int i = 0; i < 2 * MAX_PARALLEL_APPS; i++) {
attemptIds.add(submitApp(rm));
}
// Finish first batch to allow the other batch to run
for (int i = 0; i < MAX_PARALLEL_APPS; i++) {
cs.handle(new AppAttemptRemovedSchedulerEvent(attemptIds.get(i),
RMAppAttemptState.FINISHED, true));
}
// Finish the remaining apps
for (int i = MAX_PARALLEL_APPS; i < 2 * MAX_PARALLEL_APPS; i++) {
cs.handle(new AppAttemptRemovedSchedulerEvent(attemptIds.get(i),
RMAppAttemptState.FINISHED, true));
}
// With every attempt removed, neither counter on the root queue may be
// left above zero.
Assert.assertEquals("No pending app should remain for root queue", 0,
cs.getRootQueueMetrics().getAppsPending());
Assert.assertEquals("No running application should remain for root queue", 0,
cs.getRootQueueMetrics().getAppsRunning());
rm.stop();
}
/**
 * Submits one application named "test-move-1" to queue "a1" through the
 * given RM and returns the id of its current application attempt.
 *
 * NOTE(review): this span appears to interleave removed and added lines of
 * a diff (two consecutive withUser(...) calls, and a local variable
 * assignment followed by a return of the same expression) - confirm which
 * lines belong to the final version before treating it as compilable code.
 *
 * @param rm the RM the application is submitted to
 * @return the current attempt id taken from the application report
 * @throws Exception if the submission or the report lookup fails
 */
private ApplicationAttemptId submitApp(MockRM rm) throws Exception {
// submit an app
MockRMAppSubmissionData data =
MockRMAppSubmissionData.Builder.createWithMemory(GB, rm)
.withAppName("test-move-1")
.withUser("user_0")
.withUser(USER_0)
.withAcls(null)
.withQueue("a1")
.withUnmanagedAM(false)
.build();
RMApp app = MockRMAppSubmitter.submit(rm, data);
ApplicationAttemptId appAttemptId =
rm.getApplicationReport(app.getApplicationId())
.getCurrentApplicationAttemptId();
// Look up the attempt id through the application report of the app that
// was just submitted.
return rm.getApplicationReport(app.getApplicationId())
.getCurrentApplicationAttemptId();
}
@Test
public void testMoveAllAppsInvalidDestination() throws Exception {
MockRM rm = setUpMove();
ResourceScheduler scheduler = rm.getResourceScheduler();
// submit an app
ApplicationAttemptId appAttemptId = submitApp(rm);
// check preconditions
assertApps(scheduler, "root", appAttemptId);