YARN-2008. Fixed CapacityScheduler to calculate headroom based on max available capacity instead of configured max capacity. Contributed by Craig Welch

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1616580 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jian He 2014-08-07 20:00:04 +00:00
parent b98400df7b
commit 8437df8ba9
8 changed files with 354 additions and 2 deletions

View File

@ -145,6 +145,9 @@ Release 2.6.0 - UNRELEASED
YARN-2388. Fixed TestTimelineWebServices failure due to HADOOP-10791. (zjshen)
YARN-2008. Fixed CapacityScheduler to calculate headroom based on max available
capacity instead of configured max capacity. (Craig Welch via jianhe)
Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES

View File

@ -42,6 +42,13 @@ public float divide(Resource unused,
Resource numerator, Resource denominator) {
return ratio(numerator, denominator);
}
public boolean isInvalidDivisor(Resource r) {
if (r.getMemory() == 0.0f) {
return true;
}
return false;
}
@Override
public float ratio(Resource a, Resource b) {

View File

@ -109,6 +109,14 @@ public float divide(Resource clusterResource,
getResourceAsValue(clusterResource, numerator, true) /
getResourceAsValue(clusterResource, denominator, true);
}
@Override
public boolean isInvalidDivisor(Resource r) {
if (r.getMemory() == 0.0f || r.getVirtualCores() == 0.0f) {
return true;
}
return false;
}
@Override
public float ratio(Resource a, Resource b) {

View File

@ -149,6 +149,15 @@ public abstract Resource normalize(Resource r, Resource minimumResource,
public abstract float divide(
Resource clusterResource, Resource numerator, Resource denominator);
/**
* Determine if a resource is not suitable for use as a divisor
* (will result in divide by 0, etc)
*
* @param r resource
* @return true if divisor is invalid (should not be used), false else
*/
public abstract boolean isInvalidDivisor(Resource r);
/**
* Ratio of resource <code>a</code> to resource <code>b</code>.
*

View File

@ -184,6 +184,11 @@ public static Resource roundDown(
return calculator.roundDown(lhs, factor);
}
public static boolean isInvalidDivisor(
ResourceCalculator resourceCalculator, Resource divisor) {
return resourceCalculator.isInvalidDivisor(divisor);
}
public static float ratio(
ResourceCalculator resourceCalculator, Resource lhs, Resource rhs) {
return resourceCalculator.ratio(lhs, rhs);

View File

@ -17,6 +17,9 @@
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.server.utils.Lock;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
@ -24,6 +27,8 @@
class CSQueueUtils {
private static final Log LOG = LogFactory.getLog(CSQueueUtils.class);
final static float EPSILON = 0.0001f;
public static void checkMaxCapacity(String queueName,
@ -113,4 +118,52 @@ public static void updateQueueStatistics(
)
);
}
public static float getAbsoluteMaxAvailCapacity(
ResourceCalculator resourceCalculator, Resource clusterResource, CSQueue queue) {
CSQueue parent = queue.getParent();
if (parent == null) {
return queue.getAbsoluteMaximumCapacity();
}
//Get my parent's max avail, needed to determine my own
float parentMaxAvail = getAbsoluteMaxAvailCapacity(
resourceCalculator, clusterResource, parent);
//...and as a resource
Resource parentResource = Resources.multiply(clusterResource, parentMaxAvail);
//check for no resources parent before dividing, if so, max avail is none
if (Resources.isInvalidDivisor(resourceCalculator, parentResource)) {
return 0.0f;
}
//sibling used is parent used - my used...
float siblingUsedCapacity = Resources.ratio(
resourceCalculator,
Resources.subtract(parent.getUsedResources(), queue.getUsedResources()),
parentResource);
//my max avail is the lesser of my max capacity and what is unused from my parent
//by my siblings (if they are beyond their base capacity)
float maxAvail = Math.min(
queue.getMaximumCapacity(),
1.0f - siblingUsedCapacity);
//and, mutiply by parent to get absolute (cluster relative) value
float absoluteMaxAvail = maxAvail * parentMaxAvail;
if (LOG.isDebugEnabled()) {
LOG.debug("qpath " + queue.getQueuePath());
LOG.debug("parentMaxAvail " + parentMaxAvail);
LOG.debug("siblingUsedCapacity " + siblingUsedCapacity);
LOG.debug("getAbsoluteMaximumCapacity " + queue.getAbsoluteMaximumCapacity());
LOG.debug("maxAvail " + maxAvail);
LOG.debug("absoluteMaxAvail " + absoluteMaxAvail);
}
if (absoluteMaxAvail < 0.0f) {
absoluteMaxAvail = 0.0f;
} else if (absoluteMaxAvail > 1.0f) {
absoluteMaxAvail = 1.0f;
}
return absoluteMaxAvail;
}
}

View File

@ -976,13 +976,18 @@ private Resource computeUserLimitAndSetHeadroom(
Resource userLimit = // User limit
computeUserLimit(application, clusterResource, required);
//Max avail capacity needs to take into account usage by ancestor-siblings
//which are greater than their base capacity, so we are interested in "max avail"
//capacity
float absoluteMaxAvailCapacity = CSQueueUtils.getAbsoluteMaxAvailCapacity(
resourceCalculator, clusterResource, this);
Resource queueMaxCap = // Queue Max-Capacity
Resources.multiplyAndNormalizeDown(
resourceCalculator,
clusterResource,
absoluteMaxCapacity,
absoluteMaxAvailCapacity,
minimumAllocation);
Resource userConsumed = getUser(user).getConsumedResources();

View File

@ -0,0 +1,262 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.mockito.Matchers.any;
import static org.mockito.Matchers.eq;
import static org.mockito.Mockito.doAnswer;
import static org.mockito.Mockito.doReturn;
import static org.mockito.Mockito.inOrder;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.reset;
import static org.mockito.Mockito.when;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.junit.Assert;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator;
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import org.mockito.InOrder;
import org.mockito.invocation.InvocationOnMock;
import org.mockito.stubbing.Answer;
public class TestCSQueueUtils {
private static final Log LOG = LogFactory.getLog(TestCSQueueUtils.class);
final static int GB = 1024;
@Test
public void testAbsoluteMaxAvailCapacityInvalidDivisor() throws Exception {
runInvalidDivisorTest(false);
runInvalidDivisorTest(true);
}
public void runInvalidDivisorTest(boolean useDominant) throws Exception {
ResourceCalculator resourceCalculator;
Resource clusterResource;
if (useDominant) {
resourceCalculator = new DominantResourceCalculator();
clusterResource = Resources.createResource(10, 0);
} else {
resourceCalculator = new DefaultResourceCalculator();
clusterResource = Resources.createResource(0, 99);
}
YarnConfiguration conf = new YarnConfiguration();
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
when(csContext.getConf()).thenReturn(conf);
when(csContext.getConfiguration()).thenReturn(csConf);
when(csContext.getClusterResource()).thenReturn(clusterResource);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
when(csContext.getMinimumResourceCapability()).
thenReturn(Resources.createResource(GB, 1));
when(csContext.getMaximumResourceCapability()).
thenReturn(Resources.createResource(0, 0));
final String L1Q1 = "L1Q1";
csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {L1Q1});
final String L1Q1P = CapacitySchedulerConfiguration.ROOT + "." + L1Q1;
csConf.setCapacity(L1Q1P, 90);
csConf.setMaximumCapacity(L1Q1P, 90);
ParentQueue root = new ParentQueue(csContext,
CapacitySchedulerConfiguration.ROOT, null, null);
LeafQueue l1q1 = new LeafQueue(csContext, L1Q1, root, null);
LOG.info("t1 root " + CSQueueUtils.getAbsoluteMaxAvailCapacity(
resourceCalculator, clusterResource, root));
LOG.info("t1 l1q1 " + CSQueueUtils.getAbsoluteMaxAvailCapacity(
resourceCalculator, clusterResource, l1q1));
assertEquals(0.0f, CSQueueUtils.getAbsoluteMaxAvailCapacity(
resourceCalculator, clusterResource, l1q1), 0.000001f);
}
@Test
public void testAbsoluteMaxAvailCapacityNoUse() throws Exception {
ResourceCalculator resourceCalculator = new DefaultResourceCalculator();
Resource clusterResource = Resources.createResource(100 * 16 * GB, 100 * 32);
YarnConfiguration conf = new YarnConfiguration();
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
when(csContext.getConf()).thenReturn(conf);
when(csContext.getConfiguration()).thenReturn(csConf);
when(csContext.getClusterResource()).thenReturn(clusterResource);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
when(csContext.getMinimumResourceCapability()).
thenReturn(Resources.createResource(GB, 1));
when(csContext.getMaximumResourceCapability()).
thenReturn(Resources.createResource(16*GB, 32));
final String L1Q1 = "L1Q1";
csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {L1Q1});
final String L1Q1P = CapacitySchedulerConfiguration.ROOT + "." + L1Q1;
csConf.setCapacity(L1Q1P, 90);
csConf.setMaximumCapacity(L1Q1P, 90);
ParentQueue root = new ParentQueue(csContext,
CapacitySchedulerConfiguration.ROOT, null, null);
LeafQueue l1q1 = new LeafQueue(csContext, L1Q1, root, null);
LOG.info("t1 root " + CSQueueUtils.getAbsoluteMaxAvailCapacity(
resourceCalculator, clusterResource, root));
LOG.info("t1 l1q1 " + CSQueueUtils.getAbsoluteMaxAvailCapacity(
resourceCalculator, clusterResource, l1q1));
assertEquals(1.0f, CSQueueUtils.getAbsoluteMaxAvailCapacity(
resourceCalculator, clusterResource, root), 0.000001f);
assertEquals(0.9f, CSQueueUtils.getAbsoluteMaxAvailCapacity(
resourceCalculator, clusterResource, l1q1), 0.000001f);
}
@Test
public void testAbsoluteMaxAvailCapacityWithUse() throws Exception {
ResourceCalculator resourceCalculator = new DefaultResourceCalculator();
Resource clusterResource = Resources.createResource(100 * 16 * GB, 100 * 32);
YarnConfiguration conf = new YarnConfiguration();
CapacitySchedulerConfiguration csConf = new CapacitySchedulerConfiguration();
CapacitySchedulerContext csContext = mock(CapacitySchedulerContext.class);
when(csContext.getConf()).thenReturn(conf);
when(csContext.getConfiguration()).thenReturn(csConf);
when(csContext.getClusterResource()).thenReturn(clusterResource);
when(csContext.getResourceCalculator()).thenReturn(resourceCalculator);
when(csContext.getMinimumResourceCapability()).
thenReturn(Resources.createResource(GB, 1));
when(csContext.getMaximumResourceCapability()).
thenReturn(Resources.createResource(16*GB, 32));
final String L1Q1 = "L1Q1";
final String L1Q2 = "L1Q2";
final String L2Q1 = "L2Q1";
final String L2Q2 = "L2Q2";
csConf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[] {L1Q1, L1Q2,
L2Q1, L2Q2});
final String L1Q1P = CapacitySchedulerConfiguration.ROOT + "." + L1Q1;
csConf.setCapacity(L1Q1P, 80);
csConf.setMaximumCapacity(L1Q1P, 80);
final String L1Q2P = CapacitySchedulerConfiguration.ROOT + "." + L1Q2;
csConf.setCapacity(L1Q2P, 20);
csConf.setMaximumCapacity(L1Q2P, 100);
final String L2Q1P = L1Q1P + "." + L2Q1;
csConf.setCapacity(L2Q1P, 50);
csConf.setMaximumCapacity(L2Q1P, 50);
final String L2Q2P = L1Q1P + "." + L2Q2;
csConf.setCapacity(L2Q2P, 50);
csConf.setMaximumCapacity(L2Q2P, 50);
float result;
ParentQueue root = new ParentQueue(csContext,
CapacitySchedulerConfiguration.ROOT, null, null);
LeafQueue l1q1 = new LeafQueue(csContext, L1Q1, root, null);
LeafQueue l1q2 = new LeafQueue(csContext, L1Q2, root, null);
LeafQueue l2q2 = new LeafQueue(csContext, L2Q2, l1q1, null);
LeafQueue l2q1 = new LeafQueue(csContext, L2Q1, l1q1, null);
//no usage, all based on maxCapacity (prior behavior)
result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
resourceCalculator, clusterResource, l2q2);
assertEquals( 0.4f, result, 0.000001f);
LOG.info("t2 l2q2 " + result);
//some usage, but below the base capacity
Resources.addTo(root.getUsedResources(), Resources.multiply(clusterResource, 0.1f));
Resources.addTo(l1q2.getUsedResources(), Resources.multiply(clusterResource, 0.1f));
result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
resourceCalculator, clusterResource, l2q2);
assertEquals( 0.4f, result, 0.000001f);
LOG.info("t2 l2q2 " + result);
//usage gt base on parent sibling
Resources.addTo(root.getUsedResources(), Resources.multiply(clusterResource, 0.3f));
Resources.addTo(l1q2.getUsedResources(), Resources.multiply(clusterResource, 0.3f));
result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
resourceCalculator, clusterResource, l2q2);
assertEquals( 0.3f, result, 0.000001f);
LOG.info("t2 l2q2 " + result);
//same as last, but with usage also on direct parent
Resources.addTo(root.getUsedResources(), Resources.multiply(clusterResource, 0.1f));
Resources.addTo(l1q1.getUsedResources(), Resources.multiply(clusterResource, 0.1f));
result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
resourceCalculator, clusterResource, l2q2);
assertEquals( 0.3f, result, 0.000001f);
LOG.info("t2 l2q2 " + result);
//add to direct sibling, below the threshold of effect at present
Resources.addTo(root.getUsedResources(), Resources.multiply(clusterResource, 0.2f));
Resources.addTo(l1q1.getUsedResources(), Resources.multiply(clusterResource, 0.2f));
Resources.addTo(l2q1.getUsedResources(), Resources.multiply(clusterResource, 0.2f));
result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
resourceCalculator, clusterResource, l2q2);
assertEquals( 0.3f, result, 0.000001f);
LOG.info("t2 l2q2 " + result);
//add to direct sibling, now above the threshold of effect
//(it's cumulative with prior tests)
Resources.addTo(root.getUsedResources(), Resources.multiply(clusterResource, 0.2f));
Resources.addTo(l1q1.getUsedResources(), Resources.multiply(clusterResource, 0.2f));
Resources.addTo(l2q1.getUsedResources(), Resources.multiply(clusterResource, 0.2f));
result = CSQueueUtils.getAbsoluteMaxAvailCapacity(
resourceCalculator, clusterResource, l2q2);
assertEquals( 0.1f, result, 0.000001f);
LOG.info("t2 l2q2 " + result);
}
}