YARN-1857. CapacityScheduler headroom doesn't account for other AM's running. Contributed by Chen He and Craig Welch

Jian He 2014-10-07 13:43:12 -07:00
parent 9196db9a08
commit 30d56fdbb4
3 changed files with 168 additions and 12 deletions

View File

@@ -585,6 +585,9 @@ Release 2.6.0 - UNRELEASED
     YARN-2644. Fixed CapacityScheduler to return up-to-date headroom when
     AM allocates. (Craig Welch via jianhe)
 
+    YARN-1857. CapacityScheduler headroom doesn't account for other AM's running.
+    (Chen He and Craig Welch via jianhe)
+
 Release 2.5.1 - 2014-09-05
 
   INCOMPATIBLE CHANGES

View File

@@ -981,11 +981,28 @@ protected Resource getHeadroom(User user, Resource queueMaxCap,
   private Resource getHeadroom(User user, Resource queueMaxCap,
       Resource clusterResource, Resource userLimit) {
+    /**
+     * Headroom is:
+     *    min(
+     *        min(userLimit, queueMaxCap) - userConsumed,
+     *        queueMaxCap - queueUsedResources
+     *       )
+     *
+     * ( which can be expressed as,
+     *  min (userLimit - userConsumed, queueMaxCap - userConsumed,
+     *    queueMaxCap - queueUsedResources)
+     * )
+     *
+     * given that queueUsedResources >= userConsumed, this simplifies to
+     *
+     * >> min (userLimit - userConsumed, queueMaxCap - queueUsedResources) <<
+     *
+     */
     Resource headroom =
-      Resources.subtract(
-        Resources.min(resourceCalculator, clusterResource,
-          userLimit, queueMaxCap),
-        user.getConsumedResources());
+      Resources.min(resourceCalculator, clusterResource,
+        Resources.subtract(userLimit, user.getConsumedResources()),
+        Resources.subtract(queueMaxCap, usedResources)
+        );
     return headroom;
   }
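A minimal arithmetic sketch of what the change above does, using plain longs in place of the scheduler's Resource/Resources types (the HeadroomSketch class and its method names are invented for illustration and are not part of the patch). The figures match the test case added below: queueMaxCap 16 GB, userLimit 13 GB, 4 GB consumed by this user, 8 GB used in the queue overall. The old expression reports 9 GB of headroom even though only 8 GB remain unused in the queue; the new one caps it at 8 GB.

public class HeadroomSketch {

  // Old logic: min(userLimit, queueMaxCap) - userConsumed
  static long oldHeadroom(long userLimit, long queueMaxCap, long userConsumed) {
    return Math.min(userLimit, queueMaxCap) - userConsumed;
  }

  // New logic: min(userLimit - userConsumed, queueMaxCap - queueUsedResources)
  static long newHeadroom(long userLimit, long queueMaxCap,
      long userConsumed, long queueUsed) {
    return Math.min(userLimit - userConsumed, queueMaxCap - queueUsed);
  }

  public static void main(String[] args) {
    final long GB = 1024; // values in MB, as the scheduler counts memory
    long userLimit = 13 * GB, queueMaxCap = 16 * GB;
    long userConsumed = 4 * GB, queueUsed = 8 * GB;
    System.out.println(oldHeadroom(userLimit, queueMaxCap, userConsumed) / GB);            // 9
    System.out.println(newHeadroom(userLimit, queueMaxCap, userConsumed, queueUsed) / GB); // 8
  }
}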
@@ -1051,16 +1068,12 @@ protected synchronized boolean assignToQueue(Resource clusterResource,
   @Lock({LeafQueue.class, FiCaSchedulerApp.class})
-  private Resource computeUserLimitAndSetHeadroom(
+  Resource computeUserLimitAndSetHeadroom(
       FiCaSchedulerApp application, Resource clusterResource, Resource required) {
     String user = application.getUser();
     User queueUser = getUser(user);
-
-    /**
-     * Headroom is min((userLimit, queue-max-cap) - consumed)
-     */
     Resource userLimit =                          // User limit
         computeUserLimit(application, clusterResource, required, queueUser);

View File

@@ -215,6 +215,7 @@ private void setupQueueConfiguration(
     conf.setCapacity(Q_E, 1);
     conf.setMaximumCapacity(Q_E, 1);
     conf.setAcl(Q_E, QueueACL.SUBMIT_APPLICATIONS, "user_e");
   }
 
   static LeafQueue stubLeafQueue(LeafQueue queue) {
@@ -638,7 +639,146 @@ public void testUserLimits() throws Exception {
     assertEquals(2*GB, app_0.getCurrentConsumption().getMemory());
     assertEquals(2*GB, app_1.getCurrentConsumption().getMemory());
   }
 
+  @Test
+  public void testComputeUserLimitAndSetHeadroom(){
+    LeafQueue qb = stubLeafQueue((LeafQueue)queues.get(B));
+    qb.setMaxCapacity(1.0f);
+
+    // Users
+    final String user_0 = "user_0";
+    final String user_1 = "user_1";
+
+    // create nodes
+    String host_0 = "127.0.0.1";
+    FiCaSchedulerNode node_0 = TestUtils.getMockNode(host_0, DEFAULT_RACK, 0, 8*GB);
+    String host_1 = "127.0.0.2";
+    FiCaSchedulerNode node_1 = TestUtils.getMockNode(host_1, DEFAULT_RACK, 0, 8*GB);
+
+    final int numNodes = 2;
+    Resource clusterResource = Resources.createResource(numNodes * (8*GB), 1);
+    when(csContext.getNumClusterNodes()).thenReturn(numNodes);
+
+    // Our test plan contains four cases:
+    // 1. a single user dominates the queue; check the headroom
+    // 2. two users, but user_0 is assigned 100% of the queue resource;
+    //    submit user_1's application, check headroom correctness
+    // 3. two users, each assigned 50% of the queue resource;
+    //    each user submits one application, check their headrooms
+    // 4. similar to 3, but user_0 has no quota left while there are
+    //    free resources left; check headroom
+
+    // test case 1
+    qb.setUserLimit(100);
+    qb.setUserLimitFactor(1);
+    final ApplicationAttemptId appAttemptId_0 =
+        TestUtils.getMockApplicationAttemptId(0, 0);
+    FiCaSchedulerApp app_0 =
+        new FiCaSchedulerApp(appAttemptId_0, user_0, qb,
+            qb.getActiveUsersManager(), rmContext);
+    qb.submitApplicationAttempt(app_0, user_0);
+    Priority u0Priority = TestUtils.createMockPriority(1);
+    app_0.updateResourceRequests(Collections.singletonList(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true,
+            u0Priority, recordFactory)));
+
+    assertEquals("There should only be 1 active user!",
+        1, qb.getActiveUsersManager().getNumActiveUsers());
+    // get headroom
+    qb.assignContainers(clusterResource, node_0, false);
+    qb.computeUserLimitAndSetHeadroom(app_0, clusterResource,
+        app_0.getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability());
+    // maxqueue 16G, userlimit 13G, - 4G used = 9G
+    assertEquals(9*GB, app_0.getHeadroom().getMemory());
+
+    // test case 2
+    final ApplicationAttemptId appAttemptId_2 =
+        TestUtils.getMockApplicationAttemptId(2, 0);
+    FiCaSchedulerApp app_2 =
+        new FiCaSchedulerApp(appAttemptId_2, user_1, qb,
+            qb.getActiveUsersManager(), rmContext);
+    Priority u1Priority = TestUtils.createMockPriority(2);
+    app_2.updateResourceRequests(Collections.singletonList(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 4*GB, 1, true,
+            u1Priority, recordFactory)));
+    qb.submitApplicationAttempt(app_2, user_1);
+    qb.assignContainers(clusterResource, node_1, false);
+    qb.computeUserLimitAndSetHeadroom(app_0, clusterResource,
+        app_0.getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability());
+    assertEquals(8*GB, qb.getUsedResources().getMemory());
+    assertEquals(4*GB, app_0.getCurrentConsumption().getMemory());
+    // maxqueue 16G, userlimit 13G, - 4G used = 9G BUT
+    // maxqueue 16G - used 8G (4G each app/user) = 8G max headroom (the new logic)
+    assertEquals(8*GB, app_0.getHeadroom().getMemory());
+    assertEquals(4*GB, app_2.getCurrentConsumption().getMemory());
+    assertEquals(8*GB, app_2.getHeadroom().getMemory());
+
+    // test case 3
+    qb.finishApplication(app_0.getApplicationId(), user_0);
+    qb.finishApplication(app_2.getApplicationId(), user_1);
+    qb.releaseResource(clusterResource, app_0, app_0.getResource(u0Priority));
+    qb.releaseResource(clusterResource, app_2, app_2.getResource(u1Priority));
+    qb.setUserLimit(50);
+    qb.setUserLimitFactor(1);
+    final ApplicationAttemptId appAttemptId_1 =
+        TestUtils.getMockApplicationAttemptId(1, 0);
+    FiCaSchedulerApp app_1 =
+        new FiCaSchedulerApp(appAttemptId_1, user_0, qb,
+            qb.getActiveUsersManager(), rmContext);
+    final ApplicationAttemptId appAttemptId_3 =
+        TestUtils.getMockApplicationAttemptId(3, 0);
+    FiCaSchedulerApp app_3 =
+        new FiCaSchedulerApp(appAttemptId_3, user_1, qb,
+            qb.getActiveUsersManager(), rmContext);
+    app_1.updateResourceRequests(Collections.singletonList(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
+            u0Priority, recordFactory)));
+    app_3.updateResourceRequests(Collections.singletonList(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 2*GB, 1, true,
+            u1Priority, recordFactory)));
+    qb.submitApplicationAttempt(app_1, user_0);
+    qb.submitApplicationAttempt(app_3, user_1);
+    qb.assignContainers(clusterResource, node_0, false);
+    qb.assignContainers(clusterResource, node_0, false);
+    qb.computeUserLimitAndSetHeadroom(app_3, clusterResource,
+        app_3.getResourceRequest(u1Priority, ResourceRequest.ANY).getCapability());
+    assertEquals(4*GB, qb.getUsedResources().getMemory());
+    // maxqueue 16G, userlimit 7G, used (by each user) 2G, headroom 5G (both)
+    assertEquals(5*GB, app_3.getHeadroom().getMemory());
+    assertEquals(5*GB, app_1.getHeadroom().getMemory());
+
+    // test case 4
+    final ApplicationAttemptId appAttemptId_4 =
+        TestUtils.getMockApplicationAttemptId(4, 0);
+    FiCaSchedulerApp app_4 =
+        new FiCaSchedulerApp(appAttemptId_4, user_0, qb,
+            qb.getActiveUsersManager(), rmContext);
+    qb.submitApplicationAttempt(app_4, user_0);
+    app_4.updateResourceRequests(Collections.singletonList(
+        TestUtils.createResourceRequest(ResourceRequest.ANY, 6*GB, 1, true,
+            u0Priority, recordFactory)));
+    qb.assignContainers(clusterResource, node_1, false);
+    qb.computeUserLimitAndSetHeadroom(app_4, clusterResource,
+        app_4.getResourceRequest(u0Priority, ResourceRequest.ANY).getCapability());
+    qb.computeUserLimitAndSetHeadroom(app_3, clusterResource,
+        app_3.getResourceRequest(u1Priority, ResourceRequest.ANY).getCapability());
+
+    // app_3 is user_1, still active from the last test case
+    // maxqueue 16G, userlimit 13G, used 2G, would be headroom 10G BUT
+    // 10G in use, so max possible headroom is 6G (new logic)
+    assertEquals(6*GB, app_3.getHeadroom().getMemory());
+    // test case 3 is still active - 2+2+6=10
+    assertEquals(10*GB, qb.getUsedResources().getMemory());
+    // app_4 is user_0
+    // maxqueue 16G, userlimit 13G, used 8G, headroom 5G
+    // (8G used is 6G from this test case - app_4, 2G from the last test case - app_1)
+    assertEquals(5*GB, app_4.getHeadroom().getMemory());
+  }
+
   @Test
   public void testUserHeadroomMultiApp() throws Exception {
     // Mock the queue
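As a cross-check on the assertions in the new test above, every expected headroom follows the new form min(userLimit - userConsumed, queueMaxCap - queueUsedResources), with queueMaxCap = 16 GB and the per-case figures taken from the comments in the test itself:

  case 1: min(13 - 4, 16 - 4)  = 9 GB  (app_0)
  case 2: min(13 - 4, 16 - 8)  = 8 GB  (app_0 and app_2)
  case 3: min(7 - 2, 16 - 4)   = 5 GB  (app_1 and app_3)
  case 4: min(13 - 2, 16 - 10) = 6 GB  (app_3); min(13 - 8, 16 - 10) = 5 GB (app_4)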
@@ -787,7 +927,7 @@ public void testHeadroomWithMaxCap() throws Exception {
     // Set user-limit
     a.setUserLimit(50);
     a.setUserLimitFactor(2);
 
     // Now, only user_0 should be active since he is the only one with
     // outstanding requests
     assertEquals("There should only be 1 active user!",
@@ -835,7 +975,7 @@ public void testHeadroomWithMaxCap() throws Exception {
             priority, recordFactory)));
     assertEquals(1, a.getActiveUsersManager().getNumActiveUsers());
     a.assignContainers(clusterResource, node_1, false);
-    assertEquals(1*GB, app_2.getHeadroom().getMemory());   // hit queue max-cap
+    assertEquals(0*GB, app_2.getHeadroom().getMemory());   // hit queue max-cap
   }
 
   @Test