From b7384a8d02ae3cace8c7b5eae84f8f3916a0b177 Mon Sep 17 00:00:00 2001 From: Wangda Tan Date: Thu, 21 Jan 2021 09:27:37 -0800 Subject: [PATCH] YARN-10531. Be able to disable user limit factor for CapacityScheduler Leaf Queue. (Qi Zhu via wangda) Change-Id: I670e5525619b320745254609c48e7e1afb084835 --- .../scheduler/capacity/AbstractCSQueue.java | 9 +- .../scheduler/capacity/LeafQueue.java | 26 ++++- .../scheduler/capacity/ParentQueue.java | 4 +- .../scheduler/capacity/PlanQueue.java | 3 + .../scheduler/capacity/UsersManager.java | 12 +- ...CapacitySchedulerNewQueueAutoCreation.java | 35 ++++++ .../scheduler/capacity/TestLeafQueue.java | 110 +++++++++++++++++- 7 files changed, 190 insertions(+), 9 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index bc3ff2294f..12ce05f279 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -1535,8 +1535,13 @@ private void deriveCapacityFromAbsoluteConfigurations(String label, leafQueue.setMaxApplications(maxApplications); int maxApplicationsPerUser = Math.min(maxApplications, - (int) (maxApplications * (leafQueue.getUsersManager().getUserLimit() - / 100.0f) * leafQueue.getUsersManager().getUserLimitFactor())); + (int) (maxApplications + * (leafQueue.getUsersManager().getUserLimit() / 100.0f) + * leafQueue.getUsersManager().getUserLimitFactor())); + if (leafQueue.getUsersManager().getUserLimitFactor() == -1) { + maxApplicationsPerUser = maxApplications; + } + leafQueue.setMaxApplicationsPerUser(maxApplicationsPerUser); LOG.info("LeafQueue:" + leafQueue.getQueuePath() + ", maxApplications=" + maxApplications + ", maxApplicationsPerUser=" diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 15c321fca0..6bf8d0a471 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -708,16 +708,33 @@ public Resource getUserAMResourceLimitPerPartition( queueCapacities.getMaxAMResourcePercentage(nodePartition) * effectiveUserLimit * usersManager.getUserLimitFactor(), minimumAllocation); + + if (getUserLimitFactor() == -1) { + userAMLimit = Resources.multiplyAndNormalizeUp( + resourceCalculator, queuePartitionResource, + queueCapacities.getMaxAMResourcePercentage(nodePartition), + minimumAllocation); + } + userAMLimit = Resources.min(resourceCalculator, lastClusterResource, userAMLimit, Resources.clone(getAMResourceLimitPerPartition(nodePartition))); - Resource preWeighteduserAMLimit = Resources.multiplyAndNormalizeUp( + Resource preWeighteduserAMLimit = + Resources.multiplyAndNormalizeUp( resourceCalculator, queuePartitionResource, queueCapacities.getMaxAMResourcePercentage(nodePartition) * preWeightedUserLimit * usersManager.getUserLimitFactor(), minimumAllocation); + + if (getUserLimitFactor() == -1) { + preWeighteduserAMLimit = Resources.multiplyAndNormalizeUp( + resourceCalculator, queuePartitionResource, + queueCapacities.getMaxAMResourcePercentage(nodePartition), + minimumAllocation); + } + preWeighteduserAMLimit = Resources.min(resourceCalculator, lastClusterResource, preWeighteduserAMLimit, @@ -1896,9 +1913,14 @@ private void updateAbsoluteCapacitiesAndRelatedFields() { maxApplications = (int) (maxSystemApps * queueCapacities.getAbsoluteCapacity()); } - maxApplicationsPerUser = Math.min(maxApplications, + maxApplicationsPerUser = + Math.min(maxApplications, (int) (maxApplications * (usersManager.getUserLimit() / 100.0f) * usersManager.getUserLimitFactor())); + + if (getUserLimitFactor() == -1) { + maxApplicationsPerUser = maxApplications; + } } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 0a2f082007..b412e8a1dd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -478,8 +478,8 @@ private CapacitySchedulerConfiguration getConfForAutoCreatedQueue( new CapacitySchedulerConfiguration( csContext.getConfiguration(), false); if (isLeaf) { - // FIXME: Ideally we should disable user limit factor, see YARN-10531 - // dupCSConfig.setUserLimitFactor(childQueuePath, ); + // set to -1, to disable it + dupCSConfig.setUserLimitFactor(childQueuePath, -1); // Set Max AM percentage to a higher value dupCSConfig.setMaximumApplicationMasterResourcePerQueuePercent( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java index 4dd3317e3e..f2b0e5aaf6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/PlanQueue.java @@ -64,6 +64,9 @@ public PlanQueue(CapacitySchedulerContext cs, String queueName, float userLimitFactor = conf.getUserLimitFactor(queuePath); int maxAppsPerUserForReservation = (int) (maxAppsForReservation * (userLimit / 100.0f) * userLimitFactor); + if (userLimitFactor == -1) { + maxAppsPerUserForReservation = maxAppsForReservation; + } updateQuotas(userLimit, userLimitFactor, maxAppsForReservation, maxAppsPerUserForReservation); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java index 14766e9953..6f7d8f6155 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java @@ -791,8 +791,16 @@ partitionResource, getUsageRatio(nodePartition), // IGNORE_PARTITION_EXCLUSIVITY allocation. Resource maxUserLimit = Resources.none(); if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) { - maxUserLimit = Resources.multiplyAndRoundDown(queueCapacity, - getUserLimitFactor()); + // If user-limit-factor set to -1, we should disabled user limit. + if (getUserLimitFactor() != -1) { + maxUserLimit = Resources.multiplyAndRoundDown(queueCapacity, + getUserLimitFactor()); + } else { + maxUserLimit = lQueue. + getEffectiveMaxCapacityDown( + nodePartition, lQueue.getMinimumAllocation()); + } + } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { maxUserLimit = partitionResource; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java index eada112039..0c5375e83e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerNewQueueAutoCreation.java @@ -461,6 +461,41 @@ public void testChildlessParentQueueWhenAutoQueueCreationEnabled() "for auto queue creation", ((ParentQueue)empty).isEligibleForAutoQueueCreation()); } + + public void testAutoCreateQueueUserLimitDisabled() throws Exception { + startScheduler(); + createBasicQueueStructureAndValidate(); + + submitApp(cs, USER0, USER0, "root.e-auto"); + + AbstractCSQueue e = (AbstractCSQueue) cs.getQueue("root.e-auto"); + Assert.assertNotNull(e); + Assert.assertTrue(e.isDynamicQueue()); + + AbstractCSQueue user0 = (AbstractCSQueue) cs.getQueue( + "root.e-auto." + USER0); + Assert.assertNotNull(user0); + Assert.assertTrue(user0.isDynamicQueue()); + Assert.assertTrue(user0 instanceof LeafQueue); + + LeafQueue user0LeafQueue = (LeafQueue)user0; + + // Assert user limit factor is -1 + Assert.assertTrue(user0LeafQueue.getUserLimitFactor() == -1); + + // Assert user max applications not limited + Assert.assertEquals(user0LeafQueue.getMaxApplicationsPerUser(), + user0LeafQueue.getMaxApplications()); + + // Assert AM Resource + Assert.assertEquals(user0LeafQueue.getAMResourceLimit().getMemorySize(), + user0LeafQueue.getMaxAMResourcePerQueuePercent()*MAX_MEMORY*GB, 1e-6); + + // Assert user limit (no limit) when limit factor is -1 + Assert.assertEquals(MAX_MEMORY*GB, + user0LeafQueue.getEffectiveMaxCapacityDown("", + user0LeafQueue.getMinimumAllocation()).getMemorySize(), 1e-6); + } private LeafQueue createQueue(String queuePath) throws YarnException { return autoQueueHandler.autoCreateQueue( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java index 0c9799d932..889da07add 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestLeafQueue.java @@ -1436,6 +1436,114 @@ public void testUserLimitCacheActiveUsersChanged() throws Exception { .get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY).getMemorySize()); } + @Test + public void testDisabledUserLimitFactor() throws Exception { + // Mock the queue + LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A)); + //unset maxCapacity + a.setMaxCapacity(1.0f); + + when(csContext.getClusterResource()) + .thenReturn(Resources.createResource(16 * GB, 32)); + + // Users + final String user0 = "user0"; + final String user1 = "user1"; + + // Submit applications + final ApplicationAttemptId appAttemptId0 = + TestUtils.getMockApplicationAttemptId(0, 0); + FiCaSchedulerApp app0 = + new FiCaSchedulerApp(appAttemptId0, user0, a, + a.getAbstractUsersManager(), spyRMContext); + a.submitApplicationAttempt(app0, user0); + + final ApplicationAttemptId appAttemptId1 = + TestUtils.getMockApplicationAttemptId(1, 0); + FiCaSchedulerApp app1 = + new FiCaSchedulerApp(appAttemptId1, user1, a, + a.getAbstractUsersManager(), spyRMContext); + a.submitApplicationAttempt(app1, user1); // different user + + // Setup some nodes + String host0 = "127.0.0.1"; + FiCaSchedulerNode node0 = + TestUtils.getMockNode(host0, DEFAULT_RACK, 0, 8*GB); + String host1 = "127.0.0.2"; + FiCaSchedulerNode node1 = + TestUtils.getMockNode(host1, DEFAULT_RACK, 0, 8*GB); + + final int numNodes = 2; + Resource clusterResource = + Resources.createResource(numNodes * (8*GB), numNodes * 16); + when(csContext.getNumClusterNodes()).thenReturn(numNodes); + root.updateClusterResource(clusterResource, + new ResourceLimits(clusterResource)); + + // Setup resource-requests + Priority priority = TestUtils.createMockPriority(1); + app0.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 3*GB, 2, true, + priority, recordFactory))); + + app1.updateResourceRequests(Collections.singletonList( + TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true, + priority, recordFactory))); + + Map apps = ImmutableMap.of( + app0.getApplicationAttemptId(), app0, app1.getApplicationAttemptId(), + app1); + Map nodes = ImmutableMap.of(node0.getNodeID(), + node0, node1.getNodeID(), node1); + + /** + * Start testing ... + */ + a.setUserLimitFactor(1); + a.setUserLimit(50); + + root.updateClusterResource(clusterResource, + new ResourceLimits(clusterResource)); + + // There're two active users + assertEquals(2, a.getAbstractUsersManager().getNumActiveUsers()); + + // 1 container to user0 + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); + assertEquals(3*GB, a.getUsedResources().getMemorySize()); + assertEquals(3*GB, app0.getCurrentConsumption().getMemorySize()); + assertEquals(0*GB, app1.getCurrentConsumption().getMemorySize()); + + // Allocate one container to app1. Even if app0 + // submit earlier, it cannot get this container assigned since user0 + // exceeded user-limit already. + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node0, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); + assertEquals(4*GB, a.getUsedResources().getMemorySize()); + assertEquals(3*GB, app0.getCurrentConsumption().getMemorySize()); + assertEquals(1*GB, app1.getCurrentConsumption().getMemorySize()); + + // Set to -1 , disabled user limit factor + // There will be not limited + a.setUserLimitFactor(-1); + root.updateClusterResource(clusterResource, + new ResourceLimits(clusterResource)); + + applyCSAssignment(clusterResource, + a.assignContainers(clusterResource, node1, + new ResourceLimits(clusterResource), + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps); + assertEquals(7*GB, a.getUsedResources().getMemorySize()); + assertEquals(6*GB, app0.getCurrentConsumption().getMemorySize()); + assertEquals(1*GB, app1.getCurrentConsumption().getMemorySize()); + + } + @Test public void testUserLimits() throws Exception { // Mock the queue @@ -1497,7 +1605,7 @@ public void testUserLimits() throws Exception { /** * Start testing... */ - + // Set user-limit a.setUserLimit(50); a.setUserLimitFactor(2);