YARN-10531. Be able to disable user limit factor for CapacityScheduler Leaf Queue. (Qi Zhu via wangda)
Change-Id: I670e5525619b320745254609c48e7e1afb084835
This commit is contained in:
parent
f1766e5bb4
commit
b7384a8d02
@ -1535,8 +1535,13 @@ private void deriveCapacityFromAbsoluteConfigurations(String label,
|
|||||||
leafQueue.setMaxApplications(maxApplications);
|
leafQueue.setMaxApplications(maxApplications);
|
||||||
|
|
||||||
int maxApplicationsPerUser = Math.min(maxApplications,
|
int maxApplicationsPerUser = Math.min(maxApplications,
|
||||||
(int) (maxApplications * (leafQueue.getUsersManager().getUserLimit()
|
(int) (maxApplications
|
||||||
/ 100.0f) * leafQueue.getUsersManager().getUserLimitFactor()));
|
* (leafQueue.getUsersManager().getUserLimit() / 100.0f)
|
||||||
|
* leafQueue.getUsersManager().getUserLimitFactor()));
|
||||||
|
if (leafQueue.getUsersManager().getUserLimitFactor() == -1) {
|
||||||
|
maxApplicationsPerUser = maxApplications;
|
||||||
|
}
|
||||||
|
|
||||||
leafQueue.setMaxApplicationsPerUser(maxApplicationsPerUser);
|
leafQueue.setMaxApplicationsPerUser(maxApplicationsPerUser);
|
||||||
LOG.info("LeafQueue:" + leafQueue.getQueuePath() + ", maxApplications="
|
LOG.info("LeafQueue:" + leafQueue.getQueuePath() + ", maxApplications="
|
||||||
+ maxApplications + ", maxApplicationsPerUser="
|
+ maxApplications + ", maxApplicationsPerUser="
|
||||||
|
@ -708,16 +708,33 @@ public Resource getUserAMResourceLimitPerPartition(
|
|||||||
queueCapacities.getMaxAMResourcePercentage(nodePartition)
|
queueCapacities.getMaxAMResourcePercentage(nodePartition)
|
||||||
* effectiveUserLimit * usersManager.getUserLimitFactor(),
|
* effectiveUserLimit * usersManager.getUserLimitFactor(),
|
||||||
minimumAllocation);
|
minimumAllocation);
|
||||||
|
|
||||||
|
if (getUserLimitFactor() == -1) {
|
||||||
|
userAMLimit = Resources.multiplyAndNormalizeUp(
|
||||||
|
resourceCalculator, queuePartitionResource,
|
||||||
|
queueCapacities.getMaxAMResourcePercentage(nodePartition),
|
||||||
|
minimumAllocation);
|
||||||
|
}
|
||||||
|
|
||||||
userAMLimit =
|
userAMLimit =
|
||||||
Resources.min(resourceCalculator, lastClusterResource,
|
Resources.min(resourceCalculator, lastClusterResource,
|
||||||
userAMLimit,
|
userAMLimit,
|
||||||
Resources.clone(getAMResourceLimitPerPartition(nodePartition)));
|
Resources.clone(getAMResourceLimitPerPartition(nodePartition)));
|
||||||
|
|
||||||
Resource preWeighteduserAMLimit = Resources.multiplyAndNormalizeUp(
|
Resource preWeighteduserAMLimit =
|
||||||
|
Resources.multiplyAndNormalizeUp(
|
||||||
resourceCalculator, queuePartitionResource,
|
resourceCalculator, queuePartitionResource,
|
||||||
queueCapacities.getMaxAMResourcePercentage(nodePartition)
|
queueCapacities.getMaxAMResourcePercentage(nodePartition)
|
||||||
* preWeightedUserLimit * usersManager.getUserLimitFactor(),
|
* preWeightedUserLimit * usersManager.getUserLimitFactor(),
|
||||||
minimumAllocation);
|
minimumAllocation);
|
||||||
|
|
||||||
|
if (getUserLimitFactor() == -1) {
|
||||||
|
preWeighteduserAMLimit = Resources.multiplyAndNormalizeUp(
|
||||||
|
resourceCalculator, queuePartitionResource,
|
||||||
|
queueCapacities.getMaxAMResourcePercentage(nodePartition),
|
||||||
|
minimumAllocation);
|
||||||
|
}
|
||||||
|
|
||||||
preWeighteduserAMLimit =
|
preWeighteduserAMLimit =
|
||||||
Resources.min(resourceCalculator, lastClusterResource,
|
Resources.min(resourceCalculator, lastClusterResource,
|
||||||
preWeighteduserAMLimit,
|
preWeighteduserAMLimit,
|
||||||
@ -1896,9 +1913,14 @@ private void updateAbsoluteCapacitiesAndRelatedFields() {
|
|||||||
maxApplications =
|
maxApplications =
|
||||||
(int) (maxSystemApps * queueCapacities.getAbsoluteCapacity());
|
(int) (maxSystemApps * queueCapacities.getAbsoluteCapacity());
|
||||||
}
|
}
|
||||||
maxApplicationsPerUser = Math.min(maxApplications,
|
maxApplicationsPerUser =
|
||||||
|
Math.min(maxApplications,
|
||||||
(int) (maxApplications * (usersManager.getUserLimit() / 100.0f)
|
(int) (maxApplications * (usersManager.getUserLimit() / 100.0f)
|
||||||
* usersManager.getUserLimitFactor()));
|
* usersManager.getUserLimitFactor()));
|
||||||
|
|
||||||
|
if (getUserLimitFactor() == -1) {
|
||||||
|
maxApplicationsPerUser = maxApplications;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
|
@ -478,8 +478,8 @@ private CapacitySchedulerConfiguration getConfForAutoCreatedQueue(
|
|||||||
new CapacitySchedulerConfiguration(
|
new CapacitySchedulerConfiguration(
|
||||||
csContext.getConfiguration(), false);
|
csContext.getConfiguration(), false);
|
||||||
if (isLeaf) {
|
if (isLeaf) {
|
||||||
// FIXME: Ideally we should disable user limit factor, see YARN-10531
|
// Set user-limit-factor to -1 to disable it for the auto-created leaf queue
|
||||||
// dupCSConfig.setUserLimitFactor(childQueuePath, );
|
dupCSConfig.setUserLimitFactor(childQueuePath, -1);
|
||||||
|
|
||||||
// Set Max AM percentage to a higher value
|
// Set Max AM percentage to a higher value
|
||||||
dupCSConfig.setMaximumApplicationMasterResourcePerQueuePercent(
|
dupCSConfig.setMaximumApplicationMasterResourcePerQueuePercent(
|
||||||
|
@ -64,6 +64,9 @@ public PlanQueue(CapacitySchedulerContext cs, String queueName,
|
|||||||
float userLimitFactor = conf.getUserLimitFactor(queuePath);
|
float userLimitFactor = conf.getUserLimitFactor(queuePath);
|
||||||
int maxAppsPerUserForReservation =
|
int maxAppsPerUserForReservation =
|
||||||
(int) (maxAppsForReservation * (userLimit / 100.0f) * userLimitFactor);
|
(int) (maxAppsForReservation * (userLimit / 100.0f) * userLimitFactor);
|
||||||
|
if (userLimitFactor == -1) {
|
||||||
|
maxAppsPerUserForReservation = maxAppsForReservation;
|
||||||
|
}
|
||||||
updateQuotas(userLimit, userLimitFactor, maxAppsForReservation,
|
updateQuotas(userLimit, userLimitFactor, maxAppsForReservation,
|
||||||
maxAppsPerUserForReservation);
|
maxAppsPerUserForReservation);
|
||||||
|
|
||||||
|
@ -791,8 +791,16 @@ partitionResource, getUsageRatio(nodePartition),
|
|||||||
// IGNORE_PARTITION_EXCLUSIVITY allocation.
|
// IGNORE_PARTITION_EXCLUSIVITY allocation.
|
||||||
Resource maxUserLimit = Resources.none();
|
Resource maxUserLimit = Resources.none();
|
||||||
if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
|
if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) {
|
||||||
|
// If user-limit-factor is set to -1, it is disabled: use the queue's effective max capacity instead.
|
||||||
|
if (getUserLimitFactor() != -1) {
|
||||||
maxUserLimit = Resources.multiplyAndRoundDown(queueCapacity,
|
maxUserLimit = Resources.multiplyAndRoundDown(queueCapacity,
|
||||||
getUserLimitFactor());
|
getUserLimitFactor());
|
||||||
|
} else {
|
||||||
|
maxUserLimit = lQueue.
|
||||||
|
getEffectiveMaxCapacityDown(
|
||||||
|
nodePartition, lQueue.getMinimumAllocation());
|
||||||
|
}
|
||||||
|
|
||||||
} else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
|
} else if (schedulingMode == SchedulingMode.IGNORE_PARTITION_EXCLUSIVITY) {
|
||||||
maxUserLimit = partitionResource;
|
maxUserLimit = partitionResource;
|
||||||
}
|
}
|
||||||
|
@ -462,6 +462,41 @@ public void testChildlessParentQueueWhenAutoQueueCreationEnabled()
|
|||||||
((ParentQueue)empty).isEligibleForAutoQueueCreation());
|
((ParentQueue)empty).isEligibleForAutoQueueCreation());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void testAutoCreateQueueUserLimitDisabled() throws Exception {
|
||||||
|
startScheduler();
|
||||||
|
createBasicQueueStructureAndValidate();
|
||||||
|
|
||||||
|
submitApp(cs, USER0, USER0, "root.e-auto");
|
||||||
|
|
||||||
|
AbstractCSQueue e = (AbstractCSQueue) cs.getQueue("root.e-auto");
|
||||||
|
Assert.assertNotNull(e);
|
||||||
|
Assert.assertTrue(e.isDynamicQueue());
|
||||||
|
|
||||||
|
AbstractCSQueue user0 = (AbstractCSQueue) cs.getQueue(
|
||||||
|
"root.e-auto." + USER0);
|
||||||
|
Assert.assertNotNull(user0);
|
||||||
|
Assert.assertTrue(user0.isDynamicQueue());
|
||||||
|
Assert.assertTrue(user0 instanceof LeafQueue);
|
||||||
|
|
||||||
|
LeafQueue user0LeafQueue = (LeafQueue)user0;
|
||||||
|
|
||||||
|
// Assert user limit factor is -1
|
||||||
|
Assert.assertTrue(user0LeafQueue.getUserLimitFactor() == -1);
|
||||||
|
|
||||||
|
// Assert user max applications not limited
|
||||||
|
Assert.assertEquals(user0LeafQueue.getMaxApplicationsPerUser(),
|
||||||
|
user0LeafQueue.getMaxApplications());
|
||||||
|
|
||||||
|
// Assert AM Resource
|
||||||
|
Assert.assertEquals(user0LeafQueue.getAMResourceLimit().getMemorySize(),
|
||||||
|
user0LeafQueue.getMaxAMResourcePerQueuePercent()*MAX_MEMORY*GB, 1e-6);
|
||||||
|
|
||||||
|
// Assert user limit (no limit) when limit factor is -1
|
||||||
|
Assert.assertEquals(MAX_MEMORY*GB,
|
||||||
|
user0LeafQueue.getEffectiveMaxCapacityDown("",
|
||||||
|
user0LeafQueue.getMinimumAllocation()).getMemorySize(), 1e-6);
|
||||||
|
}
|
||||||
|
|
||||||
private LeafQueue createQueue(String queuePath) throws YarnException {
|
private LeafQueue createQueue(String queuePath) throws YarnException {
|
||||||
return autoQueueHandler.autoCreateQueue(
|
return autoQueueHandler.autoCreateQueue(
|
||||||
CSQueueUtils.extractQueuePath(queuePath));
|
CSQueueUtils.extractQueuePath(queuePath));
|
||||||
|
@ -1436,6 +1436,114 @@ public void testUserLimitCacheActiveUsersChanged() throws Exception {
|
|||||||
.get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY).getMemorySize());
|
.get(SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY).getMemorySize());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDisabledUserLimitFactor() throws Exception {
|
||||||
|
// Mock the queue
|
||||||
|
LeafQueue a = stubLeafQueue((LeafQueue)queues.get(A));
|
||||||
|
//unset maxCapacity
|
||||||
|
a.setMaxCapacity(1.0f);
|
||||||
|
|
||||||
|
when(csContext.getClusterResource())
|
||||||
|
.thenReturn(Resources.createResource(16 * GB, 32));
|
||||||
|
|
||||||
|
// Users
|
||||||
|
final String user0 = "user0";
|
||||||
|
final String user1 = "user1";
|
||||||
|
|
||||||
|
// Submit applications
|
||||||
|
final ApplicationAttemptId appAttemptId0 =
|
||||||
|
TestUtils.getMockApplicationAttemptId(0, 0);
|
||||||
|
FiCaSchedulerApp app0 =
|
||||||
|
new FiCaSchedulerApp(appAttemptId0, user0, a,
|
||||||
|
a.getAbstractUsersManager(), spyRMContext);
|
||||||
|
a.submitApplicationAttempt(app0, user0);
|
||||||
|
|
||||||
|
final ApplicationAttemptId appAttemptId1 =
|
||||||
|
TestUtils.getMockApplicationAttemptId(1, 0);
|
||||||
|
FiCaSchedulerApp app1 =
|
||||||
|
new FiCaSchedulerApp(appAttemptId1, user1, a,
|
||||||
|
a.getAbstractUsersManager(), spyRMContext);
|
||||||
|
a.submitApplicationAttempt(app1, user1); // different user
|
||||||
|
|
||||||
|
// Setup some nodes
|
||||||
|
String host0 = "127.0.0.1";
|
||||||
|
FiCaSchedulerNode node0 =
|
||||||
|
TestUtils.getMockNode(host0, DEFAULT_RACK, 0, 8*GB);
|
||||||
|
String host1 = "127.0.0.2";
|
||||||
|
FiCaSchedulerNode node1 =
|
||||||
|
TestUtils.getMockNode(host1, DEFAULT_RACK, 0, 8*GB);
|
||||||
|
|
||||||
|
final int numNodes = 2;
|
||||||
|
Resource clusterResource =
|
||||||
|
Resources.createResource(numNodes * (8*GB), numNodes * 16);
|
||||||
|
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
|
||||||
|
root.updateClusterResource(clusterResource,
|
||||||
|
new ResourceLimits(clusterResource));
|
||||||
|
|
||||||
|
// Setup resource-requests
|
||||||
|
Priority priority = TestUtils.createMockPriority(1);
|
||||||
|
app0.updateResourceRequests(Collections.singletonList(
|
||||||
|
TestUtils.createResourceRequest(ResourceRequest.ANY, 3*GB, 2, true,
|
||||||
|
priority, recordFactory)));
|
||||||
|
|
||||||
|
app1.updateResourceRequests(Collections.singletonList(
|
||||||
|
TestUtils.createResourceRequest(ResourceRequest.ANY, 1*GB, 2, true,
|
||||||
|
priority, recordFactory)));
|
||||||
|
|
||||||
|
Map<ApplicationAttemptId, FiCaSchedulerApp> apps = ImmutableMap.of(
|
||||||
|
app0.getApplicationAttemptId(), app0, app1.getApplicationAttemptId(),
|
||||||
|
app1);
|
||||||
|
Map<NodeId, FiCaSchedulerNode> nodes = ImmutableMap.of(node0.getNodeID(),
|
||||||
|
node0, node1.getNodeID(), node1);
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Start testing ...
|
||||||
|
*/
|
||||||
|
a.setUserLimitFactor(1);
|
||||||
|
a.setUserLimit(50);
|
||||||
|
|
||||||
|
root.updateClusterResource(clusterResource,
|
||||||
|
new ResourceLimits(clusterResource));
|
||||||
|
|
||||||
|
// There're two active users
|
||||||
|
assertEquals(2, a.getAbstractUsersManager().getNumActiveUsers());
|
||||||
|
|
||||||
|
// 1 container to user0
|
||||||
|
applyCSAssignment(clusterResource,
|
||||||
|
a.assignContainers(clusterResource, node0,
|
||||||
|
new ResourceLimits(clusterResource),
|
||||||
|
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
|
||||||
|
assertEquals(3*GB, a.getUsedResources().getMemorySize());
|
||||||
|
assertEquals(3*GB, app0.getCurrentConsumption().getMemorySize());
|
||||||
|
assertEquals(0*GB, app1.getCurrentConsumption().getMemorySize());
|
||||||
|
|
||||||
|
// Allocate one container to app1. Even if app0
|
||||||
|
// submit earlier, it cannot get this container assigned since user0
|
||||||
|
// exceeded user-limit already.
|
||||||
|
applyCSAssignment(clusterResource,
|
||||||
|
a.assignContainers(clusterResource, node0,
|
||||||
|
new ResourceLimits(clusterResource),
|
||||||
|
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
|
||||||
|
assertEquals(4*GB, a.getUsedResources().getMemorySize());
|
||||||
|
assertEquals(3*GB, app0.getCurrentConsumption().getMemorySize());
|
||||||
|
assertEquals(1*GB, app1.getCurrentConsumption().getMemorySize());
|
||||||
|
|
||||||
|
// Set to -1 , disabled user limit factor
|
||||||
|
// There will be not limited
|
||||||
|
a.setUserLimitFactor(-1);
|
||||||
|
root.updateClusterResource(clusterResource,
|
||||||
|
new ResourceLimits(clusterResource));
|
||||||
|
|
||||||
|
applyCSAssignment(clusterResource,
|
||||||
|
a.assignContainers(clusterResource, node1,
|
||||||
|
new ResourceLimits(clusterResource),
|
||||||
|
SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), a, nodes, apps);
|
||||||
|
assertEquals(7*GB, a.getUsedResources().getMemorySize());
|
||||||
|
assertEquals(6*GB, app0.getCurrentConsumption().getMemorySize());
|
||||||
|
assertEquals(1*GB, app1.getCurrentConsumption().getMemorySize());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testUserLimits() throws Exception {
|
public void testUserLimits() throws Exception {
|
||||||
// Mock the queue
|
// Mock the queue
|
||||||
|
Loading…
Reference in New Issue
Block a user