From 200eec8f2eaab29e2feddc7c3d38ab8f215bd36d Mon Sep 17 00:00:00 2001 From: Szilard Nemeth Date: Thu, 3 Jun 2021 13:27:36 +0200 Subject: [PATCH] YARN-10796. Capacity Scheduler: dynamic queue cannot scale out properly if its capacity is 0%. Contributed by Peter Bacsko --- .../scheduler/capacity/UsersManager.java | 24 ++-- .../scheduler/capacity/TestUsersManager.java | 109 ++++++++++++++++++ 2 files changed, 126 insertions(+), 7 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUsersManager.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java index 6f7d8f6155..cf9dead183 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/UsersManager.java @@ -702,7 +702,8 @@ public class UsersManager implements AbstractUsersManager { activeUsersWithOnlyPendingApps = new AtomicInteger(numPendingUsers); } - private Resource computeUserLimit(String userName, Resource clusterResource, + @VisibleForTesting + Resource computeUserLimit(String userName, Resource clusterResource, String nodePartition, SchedulingMode schedulingMode, boolean activeUser) { Resource partitionResource = labelManager.getResourceByLabel(nodePartition, clusterResource); @@ -716,6 +717,7 @@ public class UsersManager implements AbstractUsersManager { * (which extra resources we are allocating) */ Resource queueCapacity = lQueue.getEffectiveCapacity(nodePartition); + Resource originalCapacity = queueCapacity; /* * Assume we have required resource equals to minimumAllocation, this can @@ -791,16 +793,19 @@ public class UsersManager implements AbstractUsersManager { // IGNORE_PARTITION_EXCLUSIVITY allocation. Resource maxUserLimit = Resources.none(); if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) { - // If user-limit-factor set to -1, we should disabled user limit. - if (getUserLimitFactor() != -1) { - maxUserLimit = Resources.multiplyAndRoundDown(queueCapacity, - getUserLimitFactor()); - } else { + if (getUserLimitFactor() == -1 || + originalCapacity.equals(Resources.none())) { + // If user-limit-factor set to -1, we should disable user limit. + // + // Also prevent incorrect maxUserLimit due to low queueCapacity + // Can happen if dynamic queue has capacity = 0% maxUserLimit = lQueue. getEffectiveMaxCapacityDown( nodePartition, lQueue.getMinimumAllocation()); + } else { + maxUserLimit = Resources.multiplyAndRoundDown(queueCapacity, + getUserLimitFactor()); } - } else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) { maxUserLimit = partitionResource; } @@ -1131,4 +1136,9 @@ public class UsersManager implements AbstractUsersManager { public int getNumActiveUsersWithOnlyPendingApps() { return activeUsersWithOnlyPendingApps.get(); } + + @VisibleForTesting + void setUsageRatio(String label, float usage) { + qUsageRatios.usageRatios.put(label, usage); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUsersManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUsersManager.java new file mode 100644 index 0000000000..5b79ee2e25 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestUsersManager.java @@ -0,0 +1,109 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import static org.mockito.Mockito.when; +import static org.mockito.ArgumentMatchers.any; +import static org.mockito.ArgumentMatchers.anyString; +import static org.junit.Assert.assertEquals; + +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; +import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; +import org.junit.Before; +import org.junit.Test; +import org.junit.runner.RunWith; +import org.mockito.Mock; +import org.mockito.junit.MockitoJUnitRunner; + +@RunWith(MockitoJUnitRunner.class) +public class TestUsersManager { + private static final Resource CLUSTER_RESOURCE = + Resource.newInstance(16384, 16); + private static final Resource MINIMUM_ALLOCATION = + Resource.newInstance(1024, 1); + private static final Resource MAX_RESOURCE_LIMIT = + Resource.newInstance(9216, 1); + private static final Resource NON_ZERO_CAPACITY = + Resource.newInstance(8192, 1); + private static final String TEST_USER = "test"; + + private UsersManager usersManager; + + @Mock + private AutoCreatedLeafQueue lQueue; + + @Mock + private RMNodeLabelsManager labelMgr; + + @Mock + private QueueMetrics metrics; + + @Mock + private CapacitySchedulerContext context; + + @Before + public void setup() { + usersManager = new UsersManager(metrics, + lQueue, + labelMgr, + context, + new DefaultResourceCalculator()); + + when(lQueue.getMinimumAllocation()).thenReturn(MINIMUM_ALLOCATION); + when(lQueue.getEffectiveMaxCapacityDown(anyString(), any(Resource.class))) + .thenReturn(MAX_RESOURCE_LIMIT); + when(labelMgr.getResourceByLabel(anyString(), any(Resource.class))) + .thenReturn(CLUSTER_RESOURCE); + usersManager.setUsageRatio(CommonNodeLabelsManager.NO_LABEL, 0.5f); + usersManager.setUserLimit( + CapacitySchedulerConfiguration.DEFAULT_USER_LIMIT); + usersManager.setUserLimitFactor( + CapacitySchedulerConfiguration.DEFAULT_USER_LIMIT_FACTOR); + } + + @Test + public void testComputeUserLimitWithZeroCapacityQueue() { + when(lQueue.getEffectiveCapacity(anyString())) + .thenReturn(Resources.none()); + + checkLimit(MAX_RESOURCE_LIMIT); + } + + @Test + public void testComputeUserLimitWithNonZeroCapacityQueue() { + when(lQueue.getEffectiveCapacity(anyString())) + .thenReturn(NON_ZERO_CAPACITY); + + checkLimit(NON_ZERO_CAPACITY); + } + + private void checkLimit(Resource expectedLimit) { + Resource limit = usersManager.computeUserLimit(TEST_USER, + CLUSTER_RESOURCE, + CommonNodeLabelsManager.NO_LABEL, + SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY, + true); + + assertEquals("User limit", expectedLimit, limit); + } +}