YARN-789. Enable zero capabilities resource requests in fair scheduler. (tucu)

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1493219 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Alejandro Abdelnur 2013-06-14 19:16:40 +00:00
parent 079ed1c9e1
commit b9753e509c
9 changed files with 164 additions and 31 deletions

View File

@ -346,6 +346,9 @@ Release 2.1.0-beta - UNRELEASED
YARN-803. factor out scheduler config validation from the ResourceManager YARN-803. factor out scheduler config validation from the ResourceManager
to each scheduler implementation. (tucu) to each scheduler implementation. (tucu)
YARN-789. Enable zero capabilities resource requests in fair scheduler.
(tucu)
OPTIMIZATIONS OPTIMIZATIONS
YARN-512. Log aggregation root directory check is more expensive than it YARN-512. Log aggregation root directory check is more expensive than it

View File

@ -54,19 +54,25 @@ public Resource divideAndCeil(Resource numerator, int denominator) {
@Override @Override
public Resource normalize(Resource r, Resource minimumResource, public Resource normalize(Resource r, Resource minimumResource,
Resource maximumResource) { Resource maximumResource, Resource stepFactor) {
int normalizedMemory = Math.min( int normalizedMemory = Math.min(
roundUp( roundUp(
Math.max(r.getMemory(), minimumResource.getMemory()), Math.max(r.getMemory(), minimumResource.getMemory()),
minimumResource.getMemory()), stepFactor.getMemory()),
maximumResource.getMemory()); maximumResource.getMemory());
return Resources.createResource(normalizedMemory); return Resources.createResource(normalizedMemory);
} }
@Override @Override
public Resource roundUp(Resource r, Resource minimumResource) { public Resource normalize(Resource r, Resource minimumResource,
Resource maximumResource) {
return normalize(r, minimumResource, maximumResource, minimumResource);
}
@Override
public Resource roundUp(Resource r, Resource stepFactor) {
return Resources.createResource( return Resources.createResource(
roundUp(r.getMemory(),minimumResource.getMemory()) roundUp(r.getMemory(), stepFactor.getMemory())
); );
} }

View File

@ -124,26 +124,26 @@ public Resource divideAndCeil(Resource numerator, int denominator) {
@Override @Override
public Resource normalize(Resource r, Resource minimumResource, public Resource normalize(Resource r, Resource minimumResource,
Resource maximumResource) { Resource maximumResource, Resource stepFactor) {
int normalizedMemory = Math.min( int normalizedMemory = Math.min(
roundUp( roundUp(
Math.max(r.getMemory(), minimumResource.getMemory()), Math.max(r.getMemory(), minimumResource.getMemory()),
minimumResource.getMemory()), stepFactor.getMemory()),
maximumResource.getMemory()); maximumResource.getMemory());
int normalizedCores = Math.min( int normalizedCores = Math.min(
roundUp( roundUp(
Math.max(r.getVirtualCores(), minimumResource.getVirtualCores()), Math.max(r.getVirtualCores(), minimumResource.getVirtualCores()),
minimumResource.getVirtualCores()), stepFactor.getVirtualCores()),
maximumResource.getVirtualCores()); maximumResource.getVirtualCores());
return Resources.createResource(normalizedMemory, return Resources.createResource(normalizedMemory,
normalizedCores); normalizedCores);
} }
@Override @Override
public Resource roundUp(Resource r, Resource minimumResource) { public Resource roundUp(Resource r, Resource stepFactor) {
return Resources.createResource( return Resources.createResource(
roundUp(r.getMemory(), minimumResource.getMemory()), roundUp(r.getMemory(), stepFactor.getMemory()),
roundUp(r.getVirtualCores(), minimumResource.getVirtualCores()) roundUp(r.getVirtualCores(), stepFactor.getVirtualCores())
); );
} }

View File

@ -96,8 +96,26 @@ public abstract Resource multiplyAndNormalizeDown(
* @param maximumResource the upper bound of the resource to be allocated * @param maximumResource the upper bound of the resource to be allocated
* @return normalized resource * @return normalized resource
*/ */
public Resource normalize(Resource r, Resource minimumResource,
Resource maximumResource) {
return normalize(r, minimumResource, maximumResource, minimumResource);
}
/**
* Normalize resource <code>r</code> given the base
* <code>minimumResource</code> and verify against max allowed
* <code>maximumResource</code> using a step factor for hte normalization.
*
* @param r resource
* @param minimumResource minimum value
* @param maximumResource the upper bound of the resource to be allocated
* @param stepFactor the increment for resources to be allocated
* @return normalized resource
*/
public abstract Resource normalize(Resource r, Resource minimumResource, public abstract Resource normalize(Resource r, Resource minimumResource,
Resource maximumResource); Resource maximumResource,
Resource stepFactor);
/** /**
* Round-up resource <code>r</code> given factor <code>stepFactor</code>. * Round-up resource <code>r</code> given factor <code>stepFactor</code>.

View File

@ -132,9 +132,9 @@ public static Resource multiplyAndRoundDown(Resource lhs, double by) {
} }
public static Resource normalize( public static Resource normalize(
ResourceCalculator calculator, Resource lhs, Resource factor, ResourceCalculator calculator, Resource lhs, Resource min,
Resource limit) { Resource max, Resource increment) {
return calculator.normalize(lhs, factor, limit); return calculator.normalize(lhs, min, max, increment);
} }
public static Resource roundUp( public static Resource roundUp(

View File

@ -95,7 +95,7 @@ public static void normalizeRequests(
for (ResourceRequest ask : asks) { for (ResourceRequest ask : asks) {
normalizeRequest( normalizeRequest(
ask, resourceCalculator, clusterResource, minimumResource, ask, resourceCalculator, clusterResource, minimumResource,
maximumResource); maximumResource, minimumResource);
} }
} }
@ -112,7 +112,43 @@ public static void normalizeRequest(
Resource normalized = Resource normalized =
Resources.normalize( Resources.normalize(
resourceCalculator, ask.getCapability(), minimumResource, resourceCalculator, ask.getCapability(), minimumResource,
maximumResource); maximumResource, minimumResource);
ask.setCapability(normalized);
}
/**
* Utility method to normalize a list of resource requests, by insuring that
* the memory for each request is a multiple of minMemory and is not zero.
*/
public static void normalizeRequests(
List<ResourceRequest> asks,
ResourceCalculator resourceCalculator,
Resource clusterResource,
Resource minimumResource,
Resource maximumResource,
Resource incrementResource) {
for (ResourceRequest ask : asks) {
normalizeRequest(
ask, resourceCalculator, clusterResource, minimumResource,
maximumResource, incrementResource);
}
}
/**
* Utility method to normalize a resource request, by insuring that the
* requested memory is a multiple of minMemory and is not zero.
*/
public static void normalizeRequest(
ResourceRequest ask,
ResourceCalculator resourceCalculator,
Resource clusterResource,
Resource minimumResource,
Resource maximumResource,
Resource incrementResource) {
Resource normalized =
Resources.normalize(
resourceCalculator, ask.getCapability(), minimumResource,
maximumResource, incrementResource);
ask.setCapability(normalized); ask.setCapability(normalized);
} }

View File

@ -116,6 +116,7 @@ public class FairScheduler implements ResourceScheduler {
private RMContext rmContext; private RMContext rmContext;
private Resource minimumAllocation; private Resource minimumAllocation;
private Resource maximumAllocation; private Resource maximumAllocation;
private Resource incrAllocation;
private QueueManager queueMgr; private QueueManager queueMgr;
private Clock clock; private Clock clock;
@ -560,6 +561,10 @@ public Resource getMinimumResourceCapability() {
return minimumAllocation; return minimumAllocation;
} }
public Resource getIncrementResourceCapability() {
return incrAllocation;
}
@Override @Override
public Resource getMaximumResourceCapability() { public Resource getMaximumResourceCapability() {
return maximumAllocation; return maximumAllocation;
@ -769,7 +774,7 @@ public Allocation allocate(ApplicationAttemptId appAttemptId,
// Sanity check // Sanity check
SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(), SchedulerUtils.normalizeRequests(ask, new DominantResourceCalculator(),
clusterCapacity, minimumAllocation, maximumAllocation); clusterCapacity, minimumAllocation, maximumAllocation, incrAllocation);
// Release containers // Release containers
for (ContainerId releasedContainerId : release) { for (ContainerId releasedContainerId : release) {
@ -1028,6 +1033,7 @@ public synchronized void reinitialize(Configuration conf, RMContext rmContext)
validateConf(this.conf); validateConf(this.conf);
minimumAllocation = this.conf.getMinimumAllocation(); minimumAllocation = this.conf.getMinimumAllocation();
maximumAllocation = this.conf.getMaximumAllocation(); maximumAllocation = this.conf.getMaximumAllocation();
incrAllocation = this.conf.getIncrementAllocation();
userAsDefaultQueue = this.conf.getUserAsDefaultQueue(); userAsDefaultQueue = this.conf.getUserAsDefaultQueue();
nodeLocalityThreshold = this.conf.getLocalityThresholdNode(); nodeLocalityThreshold = this.conf.getLocalityThresholdNode();
rackLocalityThreshold = this.conf.getLocalityThresholdRack(); rackLocalityThreshold = this.conf.getLocalityThresholdRack();

View File

@ -32,6 +32,16 @@
@Private @Private
@Evolving @Evolving
public class FairSchedulerConfiguration extends Configuration { public class FairSchedulerConfiguration extends Configuration {
/** Increment request grant-able by the RM scheduler.
* These properties are looked up in the yarn-site.xml */
public static final String RM_SCHEDULER_INCREMENT_ALLOCATION_MB =
YarnConfiguration.YARN_PREFIX + "scheduler.increment-allocation-mb";
public static final int DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB = 1024;
public static final String RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES =
YarnConfiguration.YARN_PREFIX + "scheduler.increment-allocation-vcores";
public static final int DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES = 1;
public static final String FS_CONFIGURATION_FILE = "fair-scheduler.xml"; public static final String FS_CONFIGURATION_FILE = "fair-scheduler.xml";
private static final String CONF_PREFIX = "yarn.scheduler.fair."; private static final String CONF_PREFIX = "yarn.scheduler.fair.";
@ -102,6 +112,16 @@ public Resource getMaximumAllocation() {
return Resources.createResource(mem, cpu); return Resources.createResource(mem, cpu);
} }
public Resource getIncrementAllocation() {
int incrementMemory = getInt(
RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB);
int incrementCores = getInt(
RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES,
DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES);
return Resources.createResource(incrementMemory, incrementCores);
}
public boolean getUserAsDefaultQueue() { public boolean getUserAsDefaultQueue() {
return getBoolean(USER_AS_DEFAULT_QUEUE, DEFAULT_USER_AS_DEFAULT_QUEUE); return getBoolean(USER_AS_DEFAULT_QUEUE, DEFAULT_USER_AS_DEFAULT_QUEUE);
} }

View File

@ -62,6 +62,7 @@
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService; import org.apache.hadoop.yarn.server.resourcemanager.ApplicationMasterService;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources; import org.apache.hadoop.yarn.server.resourcemanager.resource.Resources;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
@ -81,6 +82,8 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.DominantResourceFairnessPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.policies.FifoPolicy;
import org.apache.hadoop.yarn.server.resourcemanager.security.ClientToAMTokenSecretManagerInRM;
import org.apache.hadoop.yarn.server.resourcemanager.security.RMContainerTokenSecretManager;
import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.server.utils.BuilderUtils;
import org.junit.After; import org.junit.After;
import org.junit.Before; import org.junit.Before;
@ -120,7 +123,9 @@ public void tick(int seconds) {
public void setUp() throws IOException { public void setUp() throws IOException {
scheduler = new FairScheduler(); scheduler = new FairScheduler();
Configuration conf = createConfiguration(); Configuration conf = createConfiguration();
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 1024); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
1024);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10240); conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 10240);
// All tests assume only one assignment per node update // All tests assume only one assignment per node update
conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false"); conf.set(FairSchedulerConfiguration.ASSIGN_MULTIPLE, "false");
@ -281,6 +286,8 @@ public void testLoadConfigurationOnInitialize() throws IOException {
conf.setDouble(FairSchedulerConfiguration.LOCALITY_THRESHOLD_RACK, .7); conf.setDouble(FairSchedulerConfiguration.LOCALITY_THRESHOLD_RACK, .7);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024); conf.setInt(YarnConfiguration.RM_SCHEDULER_MAXIMUM_ALLOCATION_MB, 1024);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512); conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 512);
conf.setInt(FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
128);
scheduler.reinitialize(conf, resourceManager.getRMContext()); scheduler.reinitialize(conf, resourceManager.getRMContext());
Assert.assertEquals(true, scheduler.assignMultiple); Assert.assertEquals(true, scheduler.assignMultiple);
Assert.assertEquals(3, scheduler.maxAssign); Assert.assertEquals(3, scheduler.maxAssign);
@ -289,6 +296,42 @@ public void testLoadConfigurationOnInitialize() throws IOException {
Assert.assertEquals(.7, scheduler.rackLocalityThreshold, .01); Assert.assertEquals(.7, scheduler.rackLocalityThreshold, .01);
Assert.assertEquals(1024, scheduler.getMaximumResourceCapability().getMemory()); Assert.assertEquals(1024, scheduler.getMaximumResourceCapability().getMemory());
Assert.assertEquals(512, scheduler.getMinimumResourceCapability().getMemory()); Assert.assertEquals(512, scheduler.getMinimumResourceCapability().getMemory());
Assert.assertEquals(128,
scheduler.getIncrementResourceCapability().getMemory());
}
@Test
public void testNonMinZeroResourcesSettings() throws IOException {
FairScheduler fs = new FairScheduler();
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 256);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 1);
conf.setInt(
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512);
conf.setInt(
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2);
fs.reinitialize(conf, null);
Assert.assertEquals(256, fs.getMinimumResourceCapability().getMemory());
Assert.assertEquals(1, fs.getMinimumResourceCapability().getVirtualCores());
Assert.assertEquals(512, fs.getIncrementResourceCapability().getMemory());
Assert.assertEquals(2, fs.getIncrementResourceCapability().getVirtualCores());
}
@Test
public void testMinZeroResourcesSettings() throws IOException {
FairScheduler fs = new FairScheduler();
YarnConfiguration conf = new YarnConfiguration();
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_MB, 0);
conf.setInt(YarnConfiguration.RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, 0);
conf.setInt(
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_MB, 512);
conf.setInt(
FairSchedulerConfiguration.RM_SCHEDULER_INCREMENT_ALLOCATION_VCORES, 2);
fs.reinitialize(conf, null);
Assert.assertEquals(0, fs.getMinimumResourceCapability().getMemory());
Assert.assertEquals(0, fs.getMinimumResourceCapability().getVirtualCores());
Assert.assertEquals(512, fs.getIncrementResourceCapability().getMemory());
Assert.assertEquals(2, fs.getIncrementResourceCapability().getVirtualCores());
} }
@Test @Test
@ -412,8 +455,8 @@ public void testSimpleContainerAllocation() {
NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1); NodeUpdateSchedulerEvent updateEvent = new NodeUpdateSchedulerEvent(node1);
scheduler.handle(updateEvent); scheduler.handle(updateEvent);
// Asked for less than min_allocation. // Asked for less than increment allocation.
assertEquals(YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, assertEquals(FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB,
scheduler.getQueueManager().getQueue("queue1"). scheduler.getQueueManager().getQueue("queue1").
getResourceUsage().getMemory()); getResourceUsage().getMemory());
@ -571,7 +614,8 @@ public void testQueueDemandCalculation() throws Exception {
ApplicationAttemptId id22 = createAppAttemptId(2, 2); ApplicationAttemptId id22 = createAppAttemptId(2, 2);
scheduler.addApplication(id22, "root.queue2", "user1"); scheduler.addApplication(id22, "root.queue2", "user1");
int minReqSize = YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB; int minReqSize =
FairSchedulerConfiguration.DEFAULT_RM_SCHEDULER_INCREMENT_ALLOCATION_MB;
// First ask, queue1 requests 1 large (minReqSize * 2). // First ask, queue1 requests 1 large (minReqSize * 2).
List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>(); List<ResourceRequest> ask1 = new ArrayList<ResourceRequest>();