MAPREDUCE-3126. Fixed a corner case in CapacityScheduler where headroom wasn't updated on changes to cluster size.
git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1182000 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
parent
1c358b0796
commit
b8f0836f94
@ -1565,7 +1565,7 @@ Release 0.23.0 - Unreleased
|
||||
job submission files to fail fast. (Abhijit Suresh Shingate via acmurthy)
|
||||
|
||||
MAPREDUCE-3158. Fix test failures in MRv1 due to default framework being
|
||||
set to yarn. (Hitesh Shah via acmurhty)
|
||||
set to yarn. (Hitesh Shah via acmurthy)
|
||||
|
||||
MAPREDUCE-3167. container-executor is not being packaged with the assembly
|
||||
target. (mahadev)
|
||||
@ -1576,6 +1576,9 @@ Release 0.23.0 - Unreleased
|
||||
MAPREDUCE-2668. Fixed AuxServices to send a signal on application-finish
|
||||
to all the services. (Thomas Graves via vinodkv)
|
||||
|
||||
MAPREDUCE-3126. Fixed a corner case in CapacityScheduler where headroom
|
||||
wasn't updated on changes to cluster size. (acmurthy)
|
||||
|
||||
Release 0.22.0 - Unreleased
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -166,6 +166,12 @@ public int getUserLimit(String queue) {
|
||||
return userLimit;
|
||||
}
|
||||
|
||||
public void setUserLimit(String queue, int userLimit) {
|
||||
setInt(getQueuePrefix(queue) + USER_LIMIT, userLimit);
|
||||
LOG.info("here setUserLimit: queuePrefix=" + getQueuePrefix(queue) +
|
||||
", userLimit=" + getUserLimit(queue));
|
||||
}
|
||||
|
||||
public float getUserLimitFactor(String queue) {
|
||||
float userLimitFactor =
|
||||
getFloat(getQueuePrefix(queue) + USER_LIMIT_FACTOR,
|
||||
|
@ -698,9 +698,7 @@ private synchronized SchedulerApp getApplication(
|
||||
application.showRequests();
|
||||
|
||||
synchronized (application) {
|
||||
Resource userLimit =
|
||||
computeUserLimit(application, clusterResource, Resources.none());
|
||||
setUserResourceLimit(application, userLimit);
|
||||
computeAndSetUserResourceLimit(application, clusterResource);
|
||||
|
||||
for (Priority priority : application.getPriorities()) {
|
||||
// Required resource
|
||||
@ -719,7 +717,7 @@ private synchronized SchedulerApp getApplication(
|
||||
}
|
||||
|
||||
// User limits
|
||||
userLimit =
|
||||
Resource userLimit =
|
||||
computeUserLimit(application, clusterResource, required);
|
||||
if (!assignToUser(application.getUser(), userLimit)) {
|
||||
break;
|
||||
@ -807,10 +805,13 @@ private synchronized boolean assignToQueue(Resource clusterResource,
|
||||
return true;
|
||||
}
|
||||
|
||||
private void setUserResourceLimit(SchedulerApp application,
|
||||
Resource resourceLimit) {
|
||||
application.setAvailableResourceLimit(resourceLimit);
|
||||
metrics.setAvailableResourcesToUser(application.getUser(), application.getHeadroom());
|
||||
private void computeAndSetUserResourceLimit(SchedulerApp application,
|
||||
Resource clusterResource) {
|
||||
Resource userLimit =
|
||||
computeUserLimit(application, clusterResource, Resources.none());
|
||||
application.setAvailableResourceLimit(userLimit);
|
||||
metrics.setAvailableResourcesToUser(application.getUser(),
|
||||
application.getHeadroom());
|
||||
}
|
||||
|
||||
private int roundUp(int memory) {
|
||||
@ -1270,12 +1271,18 @@ synchronized void releaseResource(Resource clusterResource,
|
||||
|
||||
@Override
|
||||
public synchronized void updateClusterResource(Resource clusterResource) {
|
||||
// Update queue properties
|
||||
maxActiveApplications =
|
||||
computeMaxActiveApplications(clusterResource, maxAMResourcePercent,
|
||||
absoluteCapacity);
|
||||
maxActiveApplicationsPerUser =
|
||||
computeMaxActiveApplicationsPerUser(maxActiveApplications, userLimit,
|
||||
userLimitFactor);
|
||||
|
||||
// Update application properties
|
||||
for (SchedulerApp application : activeApplications) {
|
||||
computeAndSetUserResourceLimit(application, clusterResource);
|
||||
}
|
||||
}
|
||||
|
||||
private synchronized void updateResource(Resource clusterResource) {
|
||||
|
@ -14,6 +14,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
||||
import org.junit.After;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
@ -101,8 +102,10 @@ public void testLimitsComputation() throws Exception {
|
||||
|
||||
CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
|
||||
when(csContext.getConfiguration()).thenReturn(csConf);
|
||||
when(csContext.getMinimumResourceCapability()).thenReturn(Resources.createResource(GB));
|
||||
when(csContext.getMaximumResourceCapability()).thenReturn(Resources.createResource(16*GB));
|
||||
when(csContext.getMinimumResourceCapability()).
|
||||
thenReturn(Resources.createResource(GB));
|
||||
when(csContext.getMaximumResourceCapability()).
|
||||
thenReturn(Resources.createResource(16*GB));
|
||||
|
||||
// Say cluster has 100 nodes of 16G each
|
||||
Resource clusterResource = Resources.createResource(100 * 16 * GB);
|
||||
@ -227,6 +230,76 @@ public void testActiveApplicationLimits() throws Exception {
|
||||
assertEquals(0, queue.getNumPendingApplications(user_1));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testHeadroom() throws Exception {
|
||||
CapacitySchedulerConfiguration csConf =
|
||||
new CapacitySchedulerConfiguration();
|
||||
csConf.setUserLimit(CapacityScheduler.ROOT + "." + A, 25);
|
||||
setupQueueConfiguration(csConf);
|
||||
|
||||
CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
|
||||
when(csContext.getConfiguration()).thenReturn(csConf);
|
||||
when(csContext.getMinimumResourceCapability()).
|
||||
thenReturn(Resources.createResource(GB));
|
||||
when(csContext.getMaximumResourceCapability()).
|
||||
thenReturn(Resources.createResource(16*GB));
|
||||
|
||||
// Say cluster has 100 nodes of 16G each
|
||||
Resource clusterResource = Resources.createResource(100 * 16 * GB);
|
||||
when(csContext.getClusterResources()).thenReturn(clusterResource);
|
||||
|
||||
Map<String, CSQueue> queues = new HashMap<String, CSQueue>();
|
||||
CapacityScheduler.parseQueue(csContext, csConf, null, "root",
|
||||
queues, queues,
|
||||
CapacityScheduler.queueComparator,
|
||||
CapacityScheduler.applicationComparator,
|
||||
TestUtils.spyHook);
|
||||
|
||||
// Manipulate queue 'a'
|
||||
LeafQueue queue = TestLeafQueue.stubLeafQueue((LeafQueue)queues.get(A));
|
||||
|
||||
String host_0 = "host_0";
|
||||
String rack_0 = "rack_0";
|
||||
SchedulerNode node_0 = TestUtils.getMockNode(host_0, rack_0, 0, 16*GB);
|
||||
|
||||
final String user_0 = "user_0";
|
||||
final String user_1 = "user_1";
|
||||
|
||||
int APPLICATION_ID = 0;
|
||||
|
||||
// Submit first application from user_0, check headroom
|
||||
SchedulerApp app_0_0 = getMockApplication(APPLICATION_ID++, user_0);
|
||||
queue.submitApplication(app_0_0, user_0, A);
|
||||
queue.assignContainers(clusterResource, node_0); // Schedule to compute
|
||||
Resource expectedHeadroom = Resources.createResource(10*16*GB);
|
||||
verify(app_0_0).setAvailableResourceLimit(eq(expectedHeadroom));
|
||||
|
||||
// Submit second application from user_0, check headroom
|
||||
SchedulerApp app_0_1 = getMockApplication(APPLICATION_ID++, user_0);
|
||||
queue.submitApplication(app_0_1, user_0, A);
|
||||
queue.assignContainers(clusterResource, node_0); // Schedule to compute
|
||||
verify(app_0_0, times(2)).setAvailableResourceLimit(eq(expectedHeadroom));
|
||||
verify(app_0_1).setAvailableResourceLimit(eq(expectedHeadroom));// no change
|
||||
|
||||
// Submit first application from user_1, check for new headroom
|
||||
SchedulerApp app_1_0 = getMockApplication(APPLICATION_ID++, user_1);
|
||||
queue.submitApplication(app_1_0, user_1, A);
|
||||
queue.assignContainers(clusterResource, node_0); // Schedule to compute
|
||||
expectedHeadroom = Resources.createResource(10*16*GB / 2); // changes
|
||||
verify(app_0_0).setAvailableResourceLimit(eq(expectedHeadroom));
|
||||
verify(app_0_1).setAvailableResourceLimit(eq(expectedHeadroom));
|
||||
verify(app_1_0).setAvailableResourceLimit(eq(expectedHeadroom));
|
||||
|
||||
// Now reduce cluster size and check for the smaller headroom
|
||||
clusterResource = Resources.createResource(90*16*GB);
|
||||
queue.assignContainers(clusterResource, node_0); // Schedule to compute
|
||||
expectedHeadroom = Resources.createResource(9*16*GB / 2); // changes
|
||||
verify(app_0_0).setAvailableResourceLimit(eq(expectedHeadroom));
|
||||
verify(app_0_1).setAvailableResourceLimit(eq(expectedHeadroom));
|
||||
verify(app_1_0).setAvailableResourceLimit(eq(expectedHeadroom));
|
||||
}
|
||||
|
||||
|
||||
@After
|
||||
public void tearDown() {
|
||||
|
||||
|
@ -117,7 +117,7 @@ private void setupQueueConfiguration(CapacitySchedulerConfiguration conf) {
|
||||
LOG.info("Setup top-level queues a and b");
|
||||
}
|
||||
|
||||
private LeafQueue stubLeafQueue(LeafQueue queue) {
|
||||
static LeafQueue stubLeafQueue(LeafQueue queue) {
|
||||
|
||||
// Mock some methods for ease in these unit tests
|
||||
|
||||
|
Loading…
Reference in New Issue
Block a user