YARN-10802. Change Capacity Scheduler minimum-user-limit-percent to accept decimal values. Contributed by Benjamin Teke

This commit is contained in:
Szilard Nemeth 2021-06-14 22:33:04 +02:00
parent ebee2aed00
commit e31d06032b
10 changed files with 170 additions and 48 deletions

View File

@ -636,8 +636,8 @@ public void setMaximumCapacityByLabel(String queue, String label,
absoluteResourceCapacity); absoluteResourceCapacity);
} }
public int getUserLimit(String queue) { public float getUserLimit(String queue) {
int userLimit = getInt(getQueuePrefix(queue) + USER_LIMIT, float userLimit = getFloat(getQueuePrefix(queue) + USER_LIMIT,
DEFAULT_USER_LIMIT); DEFAULT_USER_LIMIT);
return userLimit; return userLimit;
} }
@ -686,8 +686,8 @@ public <S extends SchedulableEntity> OrderingPolicy<S> getAppOrderingPolicy(
return orderingPolicy; return orderingPolicy;
} }
public void setUserLimit(String queue, int userLimit) { public void setUserLimit(String queue, float userLimit) {
setInt(getQueuePrefix(queue) + USER_LIMIT, userLimit); setFloat(getQueuePrefix(queue) + USER_LIMIT, userLimit);
LOG.debug("here setUserLimit: queuePrefix={}, userLimit={}", LOG.debug("here setUserLimit: queuePrefix={}, userLimit={}",
getQueuePrefix(queue), getUserLimit(queue)); getQueuePrefix(queue), getUserLimit(queue));
} }

View File

@ -255,7 +255,7 @@ protected void setupQueueConfigs(Resource clusterResource,
conf.getDefaultApplicationPriorityConfPerQueue(getQueuePath())); conf.getDefaultApplicationPriorityConfPerQueue(getQueuePath()));
// Validate leaf queue's user's weights. // Validate leaf queue's user's weights.
int queueUL = Math.min(100, conf.getUserLimit(getQueuePath())); float queueUL = Math.min(100.0f, conf.getUserLimit(getQueuePath()));
for (Entry<String, Float> e : getUserWeights().entrySet()) { for (Entry<String, Float> e : getUserWeights().entrySet()) {
float val = e.getValue().floatValue(); float val = e.getValue().floatValue();
if (val < 0.0f || val > (100.0f / queueUL)) { if (val < 0.0f || val > (100.0f / queueUL)) {
@ -367,17 +367,17 @@ public List<CSQueue> getChildQueues() {
} }
/** /**
* Set user limit - used only for testing. * Set user limit.
* @param userLimit new user limit * @param userLimit new user limit
*/ */
@VisibleForTesting @VisibleForTesting
void setUserLimit(int userLimit) { void setUserLimit(float userLimit) {
usersManager.setUserLimit(userLimit); usersManager.setUserLimit(userLimit);
usersManager.userLimitNeedsRecompute(); usersManager.userLimitNeedsRecompute();
} }
/** /**
* Set user limit factor - used only for testing. * Set user limit factor.
* @param userLimitFactor new user limit factor * @param userLimitFactor new user limit factor
*/ */
@VisibleForTesting @VisibleForTesting
@ -444,7 +444,7 @@ public int getNumActiveApplications(String user) {
} }
@Private @Private
public int getUserLimit() { public float getUserLimit() {
return usersManager.getUserLimit(); return usersManager.getUserLimit();
} }

View File

@ -38,7 +38,7 @@ public class PlanQueue extends AbstractManagedParentQueue {
private int maxAppsForReservation; private int maxAppsForReservation;
private int maxAppsPerUserForReservation; private int maxAppsPerUserForReservation;
private int userLimit; private float userLimit;
private float userLimitFactor; private float userLimitFactor;
protected CapacitySchedulerContext schedulerContext; protected CapacitySchedulerContext schedulerContext;
private boolean showReservationsAsQueues; private boolean showReservationsAsQueues;
@ -60,15 +60,16 @@ public PlanQueue(CapacitySchedulerContext cs, String queueName,
DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS * super DEFAULT_MAXIMUM_SYSTEM_APPLICATIIONS * super
.getAbsoluteCapacity()); .getAbsoluteCapacity());
} }
int userLimit = conf.getUserLimit(queuePath); float configuredUserLimit = conf.getUserLimit(queuePath);
float userLimitFactor = conf.getUserLimitFactor(queuePath); float configuredUserLimitFactor = conf.getUserLimitFactor(queuePath);
int maxAppsPerUserForReservation = int configuredMaxAppsPerUserForReservation =
(int) (maxAppsForReservation * (userLimit / 100.0f) * userLimitFactor); (int) (maxAppsForReservation * (configuredUserLimit / 100.0f) *
if (userLimitFactor == -1) { configuredUserLimitFactor);
maxAppsPerUserForReservation = maxAppsForReservation; if (configuredUserLimitFactor == -1) {
configuredMaxAppsPerUserForReservation = maxAppsForReservation;
} }
updateQuotas(userLimit, userLimitFactor, maxAppsForReservation, updateQuotas(configuredUserLimit, configuredUserLimitFactor,
maxAppsPerUserForReservation); maxAppsForReservation, configuredMaxAppsPerUserForReservation);
StringBuffer queueInfo = new StringBuffer(); StringBuffer queueInfo = new StringBuffer();
queueInfo.append("Created Plan Queue: ").append(queueName) queueInfo.append("Created Plan Queue: ").append(queueName)
@ -76,9 +77,10 @@ public PlanQueue(CapacitySchedulerContext cs, String queueName,
.append("]\nwith max capacity: [").append(super.getMaximumCapacity()) .append("]\nwith max capacity: [").append(super.getMaximumCapacity())
.append("\nwith max reservation apps: [").append(maxAppsForReservation) .append("\nwith max reservation apps: [").append(maxAppsForReservation)
.append("]\nwith max reservation apps per user: [") .append("]\nwith max reservation apps per user: [")
.append(maxAppsPerUserForReservation).append("]\nwith user limit: [") .append(configuredMaxAppsPerUserForReservation)
.append(userLimit).append("]\nwith user limit factor: [") .append("]\nwith user limit: [")
.append(userLimitFactor).append("]."); .append(configuredUserLimit).append("]\nwith user limit factor: [")
.append(configuredUserLimitFactor).append("].");
LOG.info(queueInfo.toString()); LOG.info(queueInfo.toString());
} }
@ -123,12 +125,12 @@ public void reinitialize(CSQueue newlyParsedQueue,
} }
} }
private void updateQuotas(int userLimit, float userLimitFactor, private void updateQuotas(float newUserLimit, float newUserLimitFactor,
int maxAppsForReservation, int maxAppsPerUserForReservation) { int newMaxAppsForReservation, int newMaxAppsPerUserForReservation) {
this.userLimit = userLimit; this.userLimit = newUserLimit;
this.userLimitFactor = userLimitFactor; this.userLimitFactor = newUserLimitFactor;
this.maxAppsForReservation = maxAppsForReservation; this.maxAppsForReservation = newMaxAppsForReservation;
this.maxAppsPerUserForReservation = maxAppsPerUserForReservation; this.maxAppsPerUserForReservation = newMaxAppsPerUserForReservation;
} }
/** /**
@ -155,7 +157,7 @@ public int getMaxApplicationsPerUserForReservation() {
* *
* @return userLimit * @return userLimit
*/ */
public int getUserLimitForReservation() { public float getUserLimitForReservation() {
return userLimit; return userLimit;
} }

View File

@ -73,7 +73,7 @@ public void reinitialize(CSQueue newlyParsedQueue,
} }
} }
private void updateQuotas(int userLimit, float userLimitFactor, private void updateQuotas(float userLimit, float userLimitFactor,
int maxAppsForReservation, int maxAppsPerUserForReservation) { int maxAppsForReservation, int maxAppsPerUserForReservation) {
setUserLimit(userLimit); setUserLimit(userLimit);
setUserLimitFactor(userLimitFactor); setUserLimitFactor(userLimitFactor);

View File

@ -77,7 +77,7 @@ public class UsersManager implements AbstractUsersManager {
private Map<String, Map<SchedulingMode, Long>> localVersionOfAllUsersState = private Map<String, Map<SchedulingMode, Long>> localVersionOfAllUsersState =
new HashMap<String, Map<SchedulingMode, Long>>(); new HashMap<String, Map<SchedulingMode, Long>>();
private volatile int userLimit; private volatile float userLimit;
private volatile float userLimitFactor; private volatile float userLimitFactor;
private WriteLock writeLock; private WriteLock writeLock;
@ -320,7 +320,7 @@ public UsersManager(QueueMetrics metrics, LeafQueue lQueue,
* Get configured user-limit. * Get configured user-limit.
* @return user limit * @return user limit
*/ */
public int getUserLimit() { public float getUserLimit() {
return userLimit; return userLimit;
} }
@ -328,7 +328,7 @@ public int getUserLimit() {
* Set configured user-limit. * Set configured user-limit.
* @param userLimit user limit * @param userLimit user limit
*/ */
public void setUserLimit(int userLimit) { public void setUserLimit(float userLimit) {
this.userLimit = userLimit; this.userLimit = userLimit;
} }

View File

@ -197,14 +197,21 @@ private void renderQueueCapacityInfo(ResponseInfo ri, String label) {
private void renderCommonLeafQueueInfo(ResponseInfo ri) { private void renderCommonLeafQueueInfo(ResponseInfo ri) {
ri. ri.
__("Num Schedulable Applications:", Integer.toString(lqinfo.getNumActiveApplications())). __("Num Schedulable Applications:",
__("Num Non-Schedulable Applications:", Integer.toString(lqinfo.getNumPendingApplications())). Integer.toString(lqinfo.getNumActiveApplications())).
__("Num Containers:", Integer.toString(lqinfo.getNumContainers())). __("Num Non-Schedulable Applications:",
__("Max Applications:", Integer.toString(lqinfo.getMaxApplications())). Integer.toString(lqinfo.getNumPendingApplications())).
__("Max Applications Per User:", Integer.toString(lqinfo.getMaxApplicationsPerUser())). __("Num Containers:",
__("Configured Minimum User Limit Percent:", Integer.toString(lqinfo.getUserLimit()) + "%"). Integer.toString(lqinfo.getNumContainers())).
__("Max Applications:",
Integer.toString(lqinfo.getMaxApplications())).
__("Max Applications Per User:",
Integer.toString(lqinfo.getMaxApplicationsPerUser())).
__("Configured Minimum User Limit Percent:",
lqinfo.getUserLimit() + "%").
__("Configured User Limit Factor:", lqinfo.getUserLimitFactor()). __("Configured User Limit Factor:", lqinfo.getUserLimitFactor()).
__("Accessible Node Labels:", StringUtils.join(",", lqinfo.getNodeLabels())). __("Accessible Node Labels:",
StringUtils.join(",", lqinfo.getNodeLabels())).
__("Ordering Policy: ", lqinfo.getOrderingPolicyDisplayName()). __("Ordering Policy: ", lqinfo.getOrderingPolicyDisplayName()).
__("Preemption:", __("Preemption:",
lqinfo.getPreemptionDisabled() ? "disabled" : "enabled"). lqinfo.getPreemptionDisabled() ? "disabled" : "enabled").

View File

@ -43,7 +43,7 @@ public class CapacitySchedulerLeafQueueInfo extends CapacitySchedulerQueueInfo {
protected int numContainers; protected int numContainers;
protected int maxApplications; protected int maxApplications;
protected int maxApplicationsPerUser; protected int maxApplicationsPerUser;
protected int userLimit; protected float userLimit;
protected UsersInfo users; // To add another level in the XML protected UsersInfo users; // To add another level in the XML
protected float userLimitFactor; protected float userLimitFactor;
protected float configuredMaxAMResourceLimit; protected float configuredMaxAMResourceLimit;
@ -130,7 +130,7 @@ public int getMaxApplicationsPerUser() {
return maxApplicationsPerUser; return maxApplicationsPerUser;
} }
public int getUserLimit() { public float getUserLimit() {
return userLimit; return userLimit;
} }

View File

@ -1731,6 +1731,111 @@ public void testUserLimits() throws Exception {
1, a.getAbstractUsersManager().getNumActiveUsers()); 1, a.getAbstractUsersManager().getNumActiveUsers());
} }
@Test
public void testDecimalUserLimits() throws Exception {
// Mock the queue
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
//unset maxCapacity
a.setMaxCapacity(1.0f);
when(csContext.getClusterResource())
.thenReturn(Resources.createResource(16 * GB, 32));
// Users
final String user0 = "user_0";
final String user1 = "user_1";
// Submit applications
final ApplicationAttemptId appAttemptId0 =
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app0 =
new FiCaSchedulerApp(appAttemptId0, user0, a,
a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app0, user0);
final ApplicationAttemptId appAttemptId1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app1 =
new FiCaSchedulerApp(appAttemptId1, user1, a,
a.getAbstractUsersManager(), spyRMContext);
a.submitApplicationAttempt(app1, user1); // different user
// Setup some nodes
String host0 = "127.0.0.1";
FiCaSchedulerNode node0 =
TestUtils.getMockNode(host0, DEFAULT_RACK, 0, 8*GB);
String host1 = "127.0.0.2";
FiCaSchedulerNode node1 =
TestUtils.getMockNode(host1, DEFAULT_RACK, 0, 8*GB);
final int numNodes = 2;
Resource clusterResource =
Resources.createResource(numNodes * (8*GB), numNodes * 16);
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// Setup resource-requests
Priority priority = TestUtils.createMockPriority(1);
app0.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 3*GB, 2, true,
priority, recordFactory)));
app1.updateResourceRequests(Collections.singletonList(
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
priority, recordFactory)));
Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
app0.getApplicationAttemptId(), app0, app1.getApplicationAttemptId(),
app1);
Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node0.getNodeID(),
node0, node1.getNodeID(), node1);
/**
* Start testing...
*/
// Set user-limit
a.setUserLimit(50.1f);
a.setUserLimitFactor(2);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
// There're two active users
assertEquals(2, a.getAbstractUsersManager().getNumActiveUsers());
// 1 container to user_0
applyCSAssignment(clusterResource,
a.assignContainers(clusterResource, node0,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
assertEquals(3*GB, a.getUsedResources().getMemorySize());
assertEquals(3*GB, app0.getCurrentConsumption().getMemorySize());
assertEquals(0, app1.getCurrentConsumption().getMemorySize());
// Allocate another container. Since the user limit is 50.1% it isn't
// reached, app_0 will get another container.
applyCSAssignment(clusterResource,
a.assignContainers(clusterResource, node0,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
assertEquals(6*GB, a.getUsedResources().getMemorySize());
assertEquals(6*GB, app0.getCurrentConsumption().getMemorySize());
assertEquals(0, app1.getCurrentConsumption().getMemorySize());
applyCSAssignment(clusterResource,
a.assignContainers(clusterResource, node1,
new ResourceLimits(clusterResource),
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
assertEquals(7*GB, a.getUsedResources().getMemorySize());
assertEquals(6*GB, app0.getCurrentConsumption().getMemorySize());
assertEquals(GB, app1.getCurrentConsumption().getMemorySize());
// app_0 doesn't have outstanding resources, there's only one active user.
assertEquals("There should only be 1 active user!",
1, a.getAbstractUsersManager().getNumActiveUsers());
}
@Test @Test
public void testUserSpecificUserLimits() throws Exception { public void testUserSpecificUserLimits() throws Exception {
// Mock the queue // Mock the queue

View File

@ -1164,19 +1164,27 @@ public void testDeriveCapacityFromAbsoluteConfigurations() throws Exception {
assertEquals(b.getMaxApplications(), b.getMaxApplicationsPerUser()); assertEquals(b.getMaxApplications(), b.getMaxApplicationsPerUser());
// Extra cases for testing maxApplicationsPerUser // Extra cases for testing maxApplicationsPerUser
int halfPercent = 50; float halfPercent = 50f;
int oneAndQuarterPercent = 125; float oneAndQuarterPercent = 125f;
float thirdPercent = 33.3f;
a.getUsersManager().setUserLimit(halfPercent); a.getUsersManager().setUserLimit(halfPercent);
b.getUsersManager().setUserLimit(oneAndQuarterPercent); b.getUsersManager().setUserLimit(oneAndQuarterPercent);
root.updateClusterResource(clusterResource, root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource)); new ResourceLimits(clusterResource));
assertEquals(a.getMaxApplications() * halfPercent / 100, assertEquals((int) (a.getMaxApplications() * halfPercent / 100),
a.getMaxApplicationsPerUser()); a.getMaxApplicationsPerUser());
// Q_B's limit per user shouldn't be greater // Q_B's limit per user shouldn't be greater
// than the whole queue's application limit // than the whole queue's application limit
assertEquals(b.getMaxApplications(), b.getMaxApplicationsPerUser()); assertEquals(b.getMaxApplications(), b.getMaxApplicationsPerUser());
b.getUsersManager().setUserLimit(thirdPercent);
root.updateClusterResource(clusterResource,
new ResourceLimits(clusterResource));
assertEquals((int) (b.getMaxApplications() * thirdPercent / 100),
b.getMaxApplicationsPerUser());
float userLimitFactorQueueA = 0.9f; float userLimitFactorQueueA = 0.9f;
float userLimitFactorQueueB = 1.1f; float userLimitFactorQueueB = 1.1f;
a.getUsersManager().setUserLimit(halfPercent); a.getUsersManager().setUserLimit(halfPercent);

View File

@ -100,7 +100,7 @@ private class LeafQueueInfo extends QueueInfo {
int numContainers; int numContainers;
int maxApplications; int maxApplications;
int maxApplicationsPerUser; int maxApplicationsPerUser;
int userLimit; float userLimit;
float userLimitFactor; float userLimitFactor;
long defaultApplicationLifetime; long defaultApplicationLifetime;
long maxApplicationLifetime; long maxApplicationLifetime;
@ -352,7 +352,7 @@ public void verifySubQueueXML(Element qElem, String q,
WebServicesTestUtils.getXmlInt(qElem, "maxApplications"); WebServicesTestUtils.getXmlInt(qElem, "maxApplications");
lqi.maxApplicationsPerUser = lqi.maxApplicationsPerUser =
WebServicesTestUtils.getXmlInt(qElem, "maxApplicationsPerUser"); WebServicesTestUtils.getXmlInt(qElem, "maxApplicationsPerUser");
lqi.userLimit = WebServicesTestUtils.getXmlInt(qElem, "userLimit"); lqi.userLimit = WebServicesTestUtils.getXmlFloat(qElem, "userLimit");
lqi.userLimitFactor = lqi.userLimitFactor =
WebServicesTestUtils.getXmlFloat(qElem, "userLimitFactor"); WebServicesTestUtils.getXmlFloat(qElem, "userLimitFactor");
lqi.defaultApplicationLifetime = lqi.defaultApplicationLifetime =
@ -477,7 +477,7 @@ private void verifySubQueue(JSONObject info, String q,
lqi.numContainers = info.getInt("numContainers"); lqi.numContainers = info.getInt("numContainers");
lqi.maxApplications = info.getInt("maxApplications"); lqi.maxApplications = info.getInt("maxApplications");
lqi.maxApplicationsPerUser = info.getInt("maxApplicationsPerUser"); lqi.maxApplicationsPerUser = info.getInt("maxApplicationsPerUser");
lqi.userLimit = info.getInt("userLimit"); lqi.userLimit = (float) info.getDouble("userLimit");
lqi.userLimitFactor = (float) info.getDouble("userLimitFactor"); lqi.userLimitFactor = (float) info.getDouble("userLimitFactor");
lqi.defaultApplicationLifetime = lqi.defaultApplicationLifetime =
info.getLong("defaultApplicationLifetime"); info.getLong("defaultApplicationLifetime");
@ -553,7 +553,7 @@ private void verifyLeafQueueGeneric(String q, LeafQueueInfo info)
(float)info.maxApplicationsPerUser, info.userLimitFactor); (float)info.maxApplicationsPerUser, info.userLimitFactor);
assertEquals("userLimit doesn't match", csConf.getUserLimit(q), assertEquals("userLimit doesn't match", csConf.getUserLimit(q),
info.userLimit); info.userLimit, 1e-3f);
assertEquals("userLimitFactor doesn't match", assertEquals("userLimitFactor doesn't match",
csConf.getUserLimitFactor(q), info.userLimitFactor, 1e-3f); csConf.getUserLimitFactor(q), info.userLimitFactor, 1e-3f);