YARN-474. Fix CapacityScheduler to trigger application-activation when am-resource-percent configuration is refreshed. Contributed by Zhijie Shen.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1461402 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Vinod Kumar Vavilapalli 2013-03-27 03:44:34 +00:00
parent 5319818487
commit d60c2fa17f
3 changed files with 67 additions and 2 deletions

View File

@ -143,6 +143,9 @@ Release 2.0.5-beta - UNRELEASED
YARN-498. Unmanaged AM launcher does not set various constants in env for YARN-498. Unmanaged AM launcher does not set various constants in env for
an AM, also does not handle failed AMs properly. (Hitesh Shah via bikas) an AM, also does not handle failed AMs properly. (Hitesh Shah via bikas)
YARN-474. Fix CapacityScheduler to trigger application-activation when
am-resource-percent configuration is refreshed. (Zhijie Shen via vinodkv)
Release 2.0.4-alpha - UNRELEASED Release 2.0.4-alpha - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -607,6 +607,10 @@ public synchronized void reinitialize(
newlyParsedLeafQueue.getMaximumActiveApplications(), newlyParsedLeafQueue.getMaximumActiveApplications(),
newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(), newlyParsedLeafQueue.getMaximumActiveApplicationsPerUser(),
newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls); newlyParsedLeafQueue.state, newlyParsedLeafQueue.acls);
// queue metrics are updated, more resource may be available
// activate the pending applications if possible
activateApplications();
} }
@Override @Override

View File

@ -138,6 +138,7 @@ public void setUp() throws Exception {
private static final String C = "c"; private static final String C = "c";
private static final String C1 = "c1"; private static final String C1 = "c1";
private static final String D = "d"; private static final String D = "d";
private static final String E = "e";
private void setupQueueConfiguration( private void setupQueueConfiguration(
CapacitySchedulerConfiguration conf, CapacitySchedulerConfiguration conf,
final String newRoot) { final String newRoot) {
@ -148,7 +149,7 @@ private void setupQueueConfiguration(
conf.setAcl(CapacitySchedulerConfiguration.ROOT, QueueACL.SUBMIT_APPLICATIONS, " "); conf.setAcl(CapacitySchedulerConfiguration.ROOT, QueueACL.SUBMIT_APPLICATIONS, " ");
final String Q_newRoot = CapacitySchedulerConfiguration.ROOT + "." + newRoot; final String Q_newRoot = CapacitySchedulerConfiguration.ROOT + "." + newRoot;
conf.setQueues(Q_newRoot, new String[] {A, B, C, D}); conf.setQueues(Q_newRoot, new String[] {A, B, C, D, E});
conf.setCapacity(Q_newRoot, 100); conf.setCapacity(Q_newRoot, 100);
conf.setMaximumCapacity(Q_newRoot, 100); conf.setMaximumCapacity(Q_newRoot, 100);
conf.setAcl(Q_newRoot, QueueACL.SUBMIT_APPLICATIONS, " "); conf.setAcl(Q_newRoot, QueueACL.SUBMIT_APPLICATIONS, " ");
@ -174,10 +175,14 @@ private void setupQueueConfiguration(
conf.setCapacity(Q_C1, 100); conf.setCapacity(Q_C1, 100);
final String Q_D = Q_newRoot + "." + D; final String Q_D = Q_newRoot + "." + D;
conf.setCapacity(Q_D, 10); conf.setCapacity(Q_D, 9);
conf.setMaximumCapacity(Q_D, 11); conf.setMaximumCapacity(Q_D, 11);
conf.setAcl(Q_D, QueueACL.SUBMIT_APPLICATIONS, "user_d"); conf.setAcl(Q_D, QueueACL.SUBMIT_APPLICATIONS, "user_d");
final String Q_E = Q_newRoot + "." + E;
conf.setCapacity(Q_E, 1);
conf.setMaximumCapacity(Q_E, 1);
conf.setAcl(Q_E, QueueACL.SUBMIT_APPLICATIONS, "user_e");
} }
static LeafQueue stubLeafQueue(LeafQueue queue) { static LeafQueue stubLeafQueue(LeafQueue queue) {
@ -1567,6 +1572,59 @@ public void testSchedulingConstraints() throws Exception {
} }
@Test (timeout = 30000)
public void testActivateApplicationAfterQueueRefresh() throws Exception {
// Manipulate queue 'e'
LeafQueue e = stubLeafQueue((LeafQueue)queues.get(E));
// Users
final String user_e = "user_e";
// Submit applications
final ApplicationAttemptId appAttemptId_0 =
TestUtils.getMockApplicationAttemptId(0, 0);
FiCaSchedulerApp app_0 =
new FiCaSchedulerApp(appAttemptId_0, user_e, e,
mock(ActiveUsersManager.class), rmContext);
e.submitApplication(app_0, user_e, E);
final ApplicationAttemptId appAttemptId_1 =
TestUtils.getMockApplicationAttemptId(1, 0);
FiCaSchedulerApp app_1 =
new FiCaSchedulerApp(appAttemptId_1, user_e, e,
mock(ActiveUsersManager.class), rmContext);
e.submitApplication(app_1, user_e, E); // same user
final ApplicationAttemptId appAttemptId_2 =
TestUtils.getMockApplicationAttemptId(2, 0);
FiCaSchedulerApp app_2 =
new FiCaSchedulerApp(appAttemptId_2, user_e, e,
mock(ActiveUsersManager.class), rmContext);
e.submitApplication(app_2, user_e, E); // same user
// before reinitialization
assertEquals(2, e.activeApplications.size());
assertEquals(1, e.pendingApplications.size());
csConf.setDouble(CapacitySchedulerConfiguration
.MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
CapacitySchedulerConfiguration
.DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT * 2);
Map<String, CSQueue> newQueues = new HashMap<String, CSQueue>();
CSQueue newRoot =
CapacityScheduler.parseQueue(csContext, csConf, null,
CapacitySchedulerConfiguration.ROOT,
newQueues, queues,
TestUtils.spyHook);
queues = newQueues;
root.reinitialize(newRoot, cs.getClusterResources());
// after reinitialization
assertEquals(3, e.activeApplications.size());
assertEquals(0, e.pendingApplications.size());
}
public boolean hasQueueACL(List<QueueUserACLInfo> aclInfos, QueueACL acl) { public boolean hasQueueACL(List<QueueUserACLInfo> aclInfos, QueueACL acl) {
for (QueueUserACLInfo aclInfo : aclInfos) { for (QueueUserACLInfo aclInfo : aclInfos) {
if (aclInfo.getUserAcls().contains(acl)) { if (aclInfo.getUserAcls().contains(acl)) {