YARN-10396. Max applications calculation per queue disregards queue level settings in absolute mode. Contributed by Benjamin Teke.
This commit is contained in:
parent
b65e43fe38
commit
82ec28f442
@ -1123,8 +1123,18 @@ private void deriveCapacityFromAbsoluteConfigurations(String label,
|
|||||||
if (childQueue instanceof LeafQueue) {
|
if (childQueue instanceof LeafQueue) {
|
||||||
LeafQueue leafQueue = (LeafQueue) childQueue;
|
LeafQueue leafQueue = (LeafQueue) childQueue;
|
||||||
CapacitySchedulerConfiguration conf = csContext.getConfiguration();
|
CapacitySchedulerConfiguration conf = csContext.getConfiguration();
|
||||||
int maxApplications = (int) (conf.getMaximumSystemApplications()
|
int maxApplications =
|
||||||
|
conf.getMaximumApplicationsPerQueue(childQueue.getQueuePath());
|
||||||
|
if (maxApplications < 0) {
|
||||||
|
int maxGlobalPerQueueApps = conf.getGlobalMaximumApplicationsPerQueue();
|
||||||
|
if (maxGlobalPerQueueApps > 0) {
|
||||||
|
maxApplications = (int) (maxGlobalPerQueueApps *
|
||||||
|
childQueue.getQueueCapacities().getAbsoluteCapacity(label));
|
||||||
|
} else {
|
||||||
|
maxApplications = (int) (conf.getMaximumSystemApplications()
|
||||||
* childQueue.getQueueCapacities().getAbsoluteCapacity(label));
|
* childQueue.getQueueCapacities().getAbsoluteCapacity(label));
|
||||||
|
}
|
||||||
|
}
|
||||||
leafQueue.setMaxApplications(maxApplications);
|
leafQueue.setMaxApplications(maxApplications);
|
||||||
|
|
||||||
int maxApplicationsPerUser = Math.min(maxApplications,
|
int maxApplicationsPerUser = Math.min(maxApplications,
|
||||||
|
@ -108,15 +108,17 @@ public void setUp() throws Exception {
|
|||||||
|
|
||||||
private static final String A = "a";
|
private static final String A = "a";
|
||||||
private static final String B = "b";
|
private static final String B = "b";
|
||||||
|
private static final String Q_A =
|
||||||
|
CapacitySchedulerConfiguration.ROOT + "." + A;
|
||||||
|
private static final String Q_B =
|
||||||
|
CapacitySchedulerConfiguration.ROOT + "." + B;
|
||||||
private void setupSingleLevelQueues(CapacitySchedulerConfiguration conf) {
|
private void setupSingleLevelQueues(CapacitySchedulerConfiguration conf) {
|
||||||
|
|
||||||
// Define top-level queues
|
// Define top-level queues
|
||||||
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {A, B});
|
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {A, B});
|
||||||
|
|
||||||
final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + A;
|
|
||||||
conf.setCapacity(Q_A, 30);
|
conf.setCapacity(Q_A, 30);
|
||||||
|
|
||||||
final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B;
|
|
||||||
conf.setCapacity(Q_B, 70);
|
conf.setCapacity(Q_B, 70);
|
||||||
|
|
||||||
LOG.info("Setup top-level queues a and b");
|
LOG.info("Setup top-level queues a and b");
|
||||||
@ -128,11 +130,9 @@ private void setupSingleLevelQueuesWithAbsoluteResource(
|
|||||||
// Define top-level queues
|
// Define top-level queues
|
||||||
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{A, B});
|
conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{A, B});
|
||||||
|
|
||||||
final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + A;
|
|
||||||
conf.setMinimumResourceRequirement("", Q_A,
|
conf.setMinimumResourceRequirement("", Q_A,
|
||||||
QUEUE_A_RESOURCE);
|
QUEUE_A_RESOURCE);
|
||||||
|
|
||||||
final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B;
|
|
||||||
conf.setMinimumResourceRequirement("", Q_B,
|
conf.setMinimumResourceRequirement("", Q_B,
|
||||||
QUEUE_B_RESOURCE);
|
QUEUE_B_RESOURCE);
|
||||||
|
|
||||||
@ -368,9 +368,7 @@ public void testSingleLevelQueues() throws Exception {
|
|||||||
public void testSingleLevelQueuesPrecision() throws Exception {
|
public void testSingleLevelQueuesPrecision() throws Exception {
|
||||||
// Setup queue configs
|
// Setup queue configs
|
||||||
setupSingleLevelQueues(csConf);
|
setupSingleLevelQueues(csConf);
|
||||||
final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + "a";
|
|
||||||
csConf.setCapacity(Q_A, 30);
|
csConf.setCapacity(Q_A, 30);
|
||||||
final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + "b";
|
|
||||||
csConf.setCapacity(Q_B, 70.5F);
|
csConf.setCapacity(Q_B, 70.5F);
|
||||||
|
|
||||||
CSQueueStore queues = new CSQueueStore();
|
CSQueueStore queues = new CSQueueStore();
|
||||||
@ -434,10 +432,8 @@ private void setupMultiLevelQueues(CapacitySchedulerConfiguration conf) {
|
|||||||
// Define top-level queues
|
// Define top-level queues
|
||||||
csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {A, B, C, D});
|
csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {A, B, C, D});
|
||||||
|
|
||||||
final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + A;
|
|
||||||
conf.setCapacity(Q_A, 10);
|
conf.setCapacity(Q_A, 10);
|
||||||
|
|
||||||
final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B;
|
|
||||||
conf.setCapacity(Q_B, 50);
|
conf.setCapacity(Q_B, 50);
|
||||||
|
|
||||||
final String Q_C = CapacitySchedulerConfiguration.ROOT + "." + C;
|
final String Q_C = CapacitySchedulerConfiguration.ROOT + "." + C;
|
||||||
@ -656,7 +652,6 @@ public void testQueueCapacitySettingChildZero() throws Exception {
|
|||||||
setupMultiLevelQueues(csConf);
|
setupMultiLevelQueues(csConf);
|
||||||
|
|
||||||
// set child queues capacity to 0 when parents not 0
|
// set child queues capacity to 0 when parents not 0
|
||||||
final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B;
|
|
||||||
csConf.setCapacity(Q_B + "." + B1, 0);
|
csConf.setCapacity(Q_B + "." + B1, 0);
|
||||||
csConf.setCapacity(Q_B + "." + B2, 0);
|
csConf.setCapacity(Q_B + "." + B2, 0);
|
||||||
csConf.setCapacity(Q_B + "." + B3, 0);
|
csConf.setCapacity(Q_B + "." + B3, 0);
|
||||||
@ -673,9 +668,7 @@ public void testQueueCapacitySettingParentZero() throws Exception {
|
|||||||
setupMultiLevelQueues(csConf);
|
setupMultiLevelQueues(csConf);
|
||||||
|
|
||||||
// set parent capacity to 0 when child not 0
|
// set parent capacity to 0 when child not 0
|
||||||
final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B;
|
|
||||||
csConf.setCapacity(Q_B, 0);
|
csConf.setCapacity(Q_B, 0);
|
||||||
final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + A;
|
|
||||||
csConf.setCapacity(Q_A, 60);
|
csConf.setCapacity(Q_A, 60);
|
||||||
|
|
||||||
CSQueueStore queues = new CSQueueStore();
|
CSQueueStore queues = new CSQueueStore();
|
||||||
@ -690,13 +683,11 @@ public void testQueueCapacityZero() throws Exception {
|
|||||||
setupMultiLevelQueues(csConf);
|
setupMultiLevelQueues(csConf);
|
||||||
|
|
||||||
// set parent and child capacity to 0
|
// set parent and child capacity to 0
|
||||||
final String Q_B = CapacitySchedulerConfiguration.ROOT + "." + B;
|
|
||||||
csConf.setCapacity(Q_B, 0);
|
csConf.setCapacity(Q_B, 0);
|
||||||
csConf.setCapacity(Q_B + "." + B1, 0);
|
csConf.setCapacity(Q_B + "." + B1, 0);
|
||||||
csConf.setCapacity(Q_B + "." + B2, 0);
|
csConf.setCapacity(Q_B + "." + B2, 0);
|
||||||
csConf.setCapacity(Q_B + "." + B3, 0);
|
csConf.setCapacity(Q_B + "." + B3, 0);
|
||||||
|
|
||||||
final String Q_A = CapacitySchedulerConfiguration.ROOT + "." + A;
|
|
||||||
csConf.setCapacity(Q_A, 60);
|
csConf.setCapacity(Q_A, 60);
|
||||||
|
|
||||||
CSQueueStore queues = new CSQueueStore();
|
CSQueueStore queues = new CSQueueStore();
|
||||||
@ -1029,6 +1020,121 @@ public void testAbsoluteResourceWithChangeInClusterResource()
|
|||||||
QUEUE_B_RESOURCE_70PERC);
|
QUEUE_B_RESOURCE_70PERC);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testDeriveCapacityFromAbsoluteConfigurations() throws Exception {
|
||||||
|
// Setup queue configs
|
||||||
|
setupSingleLevelQueuesWithAbsoluteResource(csConf);
|
||||||
|
|
||||||
|
CSQueueStore queues = new CSQueueStore();
|
||||||
|
CSQueue root = CapacitySchedulerQueueManager.parseQueue(csContext, csConf,
|
||||||
|
null, CapacitySchedulerConfiguration.ROOT, queues, queues,
|
||||||
|
TestUtils.spyHook);
|
||||||
|
|
||||||
|
// Setup some nodes
|
||||||
|
int numNodes = 2;
|
||||||
|
final long memoryPerNode = (QUEUE_A_RESOURCE.getMemorySize() +
|
||||||
|
QUEUE_B_RESOURCE.getMemorySize()) / numNodes;
|
||||||
|
int coresPerNode = (QUEUE_A_RESOURCE.getVirtualCores() +
|
||||||
|
QUEUE_B_RESOURCE.getVirtualCores()) / numNodes;
|
||||||
|
|
||||||
|
Resource clusterResource = Resources.createResource(
|
||||||
|
numNodes * memoryPerNode, numNodes * coresPerNode);
|
||||||
|
when(csContext.getNumClusterNodes()).thenReturn(numNodes);
|
||||||
|
root.updateClusterResource(clusterResource,
|
||||||
|
new ResourceLimits(clusterResource));
|
||||||
|
|
||||||
|
// Start testing
|
||||||
|
// Only MaximumSystemApplications is set in csConf
|
||||||
|
LeafQueue a = (LeafQueue) queues.get(A);
|
||||||
|
LeafQueue b = (LeafQueue) queues.get(B);
|
||||||
|
|
||||||
|
float queueAScale = (float) QUEUE_A_RESOURCE.getMemorySize() /
|
||||||
|
(float) clusterResource.getMemorySize();
|
||||||
|
float queueBScale = (float) QUEUE_B_RESOURCE.getMemorySize() /
|
||||||
|
(float) clusterResource.getMemorySize();
|
||||||
|
|
||||||
|
assertEquals(queueAScale, a.getQueueCapacities().getCapacity(),
|
||||||
|
DELTA);
|
||||||
|
assertEquals(1f, a.getQueueCapacities().getMaximumCapacity(),
|
||||||
|
DELTA);
|
||||||
|
assertEquals(queueAScale, a.getQueueCapacities().getAbsoluteCapacity(),
|
||||||
|
DELTA);
|
||||||
|
assertEquals(1f,
|
||||||
|
a.getQueueCapacities().getAbsoluteMaximumCapacity(), DELTA);
|
||||||
|
assertEquals((int) (csConf.getMaximumSystemApplications() * queueAScale),
|
||||||
|
a.getMaxApplications());
|
||||||
|
assertEquals(a.getMaxApplications(), a.getMaxApplicationsPerUser());
|
||||||
|
|
||||||
|
assertEquals(queueBScale,
|
||||||
|
b.getQueueCapacities().getCapacity(), DELTA);
|
||||||
|
assertEquals(1f,
|
||||||
|
b.getQueueCapacities().getMaximumCapacity(), DELTA);
|
||||||
|
assertEquals(queueBScale,
|
||||||
|
b.getQueueCapacities().getAbsoluteCapacity(), DELTA);
|
||||||
|
assertEquals(1f,
|
||||||
|
b.getQueueCapacities().getAbsoluteMaximumCapacity(), DELTA);
|
||||||
|
assertEquals((int) (csConf.getMaximumSystemApplications() * queueBScale),
|
||||||
|
b.getMaxApplications());
|
||||||
|
assertEquals(b.getMaxApplications(), b.getMaxApplicationsPerUser());
|
||||||
|
|
||||||
|
// Set GlobalMaximumApplicationsPerQueue in csConf
|
||||||
|
csConf.setGlobalMaximumApplicationsPerQueue(20000);
|
||||||
|
root.updateClusterResource(clusterResource,
|
||||||
|
new ResourceLimits(clusterResource));
|
||||||
|
|
||||||
|
assertEquals((int) (csConf.getGlobalMaximumApplicationsPerQueue() *
|
||||||
|
queueAScale), a.getMaxApplications());
|
||||||
|
assertEquals(a.getMaxApplications(), a.getMaxApplicationsPerUser());
|
||||||
|
assertEquals((int) (csConf.getGlobalMaximumApplicationsPerQueue() *
|
||||||
|
queueBScale), b.getMaxApplications());
|
||||||
|
assertEquals(b.getMaxApplications(), b.getMaxApplicationsPerUser());
|
||||||
|
|
||||||
|
// Set MaximumApplicationsPerQueue in csConf
|
||||||
|
int queueAMaxApplications = 30000;
|
||||||
|
int queueBMaxApplications = 30000;
|
||||||
|
csConf.set("yarn.scheduler.capacity." + Q_A + ".maximum-applications",
|
||||||
|
Integer.toString(queueAMaxApplications));
|
||||||
|
csConf.set("yarn.scheduler.capacity." + Q_B + ".maximum-applications",
|
||||||
|
Integer.toString(queueBMaxApplications));
|
||||||
|
root.updateClusterResource(clusterResource,
|
||||||
|
new ResourceLimits(clusterResource));
|
||||||
|
|
||||||
|
assertEquals(queueAMaxApplications, a.getMaxApplications());
|
||||||
|
assertEquals(a.getMaxApplications(), a.getMaxApplicationsPerUser());
|
||||||
|
assertEquals(queueBMaxApplications, b.getMaxApplications());
|
||||||
|
assertEquals(b.getMaxApplications(), b.getMaxApplicationsPerUser());
|
||||||
|
|
||||||
|
// Extra cases for testing maxApplicationsPerUser
|
||||||
|
int halfPercent = 50;
|
||||||
|
int oneAndQuarterPercent = 125;
|
||||||
|
a.getUsersManager().setUserLimit(halfPercent);
|
||||||
|
b.getUsersManager().setUserLimit(oneAndQuarterPercent);
|
||||||
|
root.updateClusterResource(clusterResource,
|
||||||
|
new ResourceLimits(clusterResource));
|
||||||
|
|
||||||
|
assertEquals(a.getMaxApplications() * halfPercent / 100,
|
||||||
|
a.getMaxApplicationsPerUser());
|
||||||
|
// Q_B's limit per user shouldn't be greater
|
||||||
|
// than the whole queue's application limit
|
||||||
|
assertEquals(b.getMaxApplications(), b.getMaxApplicationsPerUser());
|
||||||
|
|
||||||
|
float userLimitFactorQueueA = 0.9f;
|
||||||
|
float userLimitFactorQueueB = 1.1f;
|
||||||
|
a.getUsersManager().setUserLimit(halfPercent);
|
||||||
|
a.getUsersManager().setUserLimitFactor(userLimitFactorQueueA);
|
||||||
|
b.getUsersManager().setUserLimit(100);
|
||||||
|
b.getUsersManager().setUserLimitFactor(userLimitFactorQueueB);
|
||||||
|
root.updateClusterResource(clusterResource,
|
||||||
|
new ResourceLimits(clusterResource));
|
||||||
|
|
||||||
|
assertEquals((int) (a.getMaxApplications() * halfPercent *
|
||||||
|
userLimitFactorQueueA / 100), a.getMaxApplicationsPerUser());
|
||||||
|
// Q_B's limit per user shouldn't be greater
|
||||||
|
// than the whole queue's application limit
|
||||||
|
assertEquals(b.getMaxApplications(), b.getMaxApplicationsPerUser());
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
@After
|
@After
|
||||||
public void tearDown() throws Exception {
|
public void tearDown() throws Exception {
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user