YARN-9116. Capacity Scheduler: implements queue level maximum-allocation inheritance. Contributed by Aihua Xu.

This commit is contained in:
Weiwei Yang 2019-01-24 17:20:52 +08:00
parent c726445990
commit a4bd64e724
12 changed files with 483 additions and 260 deletions

View File

@ -20,14 +20,18 @@
import com.google.common.annotations.VisibleForTesting;
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
import org.apache.hadoop.yarn.api.records.impl.LightWeightResource;
import org.apache.hadoop.yarn.conf.ConfigurationProvider;
import org.apache.hadoop.yarn.conf.ConfigurationProviderFactory;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
@ -71,6 +75,8 @@ public class ResourceUtils {
"^(((\\p{Alnum}([\\p{Alnum}-]*\\p{Alnum})?\\.)*"
+ "\\p{Alnum}([\\p{Alnum}-]*\\p{Alnum})?)/)?\\p{Alpha}([\\w.-]*)$");
private final static String RES_PATTERN = "^[^=]+=\\d+\\s?\\w*$";
private static volatile boolean initializedResources = false;
private static final Map<String, Integer> RESOURCE_NAME_TO_INDEX =
new ConcurrentHashMap<String, Integer>();
@ -776,4 +782,108 @@ public static ResourceInformation[] createResourceTypesArray(Map<String,
return info;
}
/**
* Return a new {@link Resource} instance with all resource values
* initialized to {@code value}.
* @param value the value to use for all resources
* @return a new {@link Resource} instance
*/
@InterfaceAudience.Private
@InterfaceStability.Unstable
public static Resource createResourceWithSameValue(long value) {
LightWeightResource res = new LightWeightResource(value,
Long.valueOf(value).intValue());
int numberOfResources = getNumberOfKnownResourceTypes();
for (int i = 2; i < numberOfResources; i++) {
res.setResourceValue(i, value);
}
return res;
}
@InterfaceAudience.Private
@InterfaceStability.Unstable
public static Resource createResourceFromString(
String resourceStr,
List<ResourceTypeInfo> resourceTypeInfos) {
Map<String, Long> typeToValue = parseResourcesString(resourceStr);
validateResourceTypes(typeToValue.keySet(), resourceTypeInfos);
Resource resource = Resource.newInstance(0, 0);
for (Entry<String, Long> entry : typeToValue.entrySet()) {
resource.setResourceValue(entry.getKey(), entry.getValue());
}
return resource;
}
private static Map<String, Long> parseResourcesString(String resourcesStr) {
Map<String, Long> resources = new HashMap<>();
String[] pairs = resourcesStr.trim().split(",");
for (String resource : pairs) {
resource = resource.trim();
if (!resource.matches(RES_PATTERN)) {
throw new IllegalArgumentException("\"" + resource + "\" is not a "
+ "valid resource type/amount pair. "
+ "Please provide key=amount pairs separated by commas.");
}
String[] splits = resource.split("=");
String key = splits[0], value = splits[1];
String units = getUnits(value);
String valueWithoutUnit = value.substring(0,
value.length()- units.length()).trim();
long resourceValue = Long.parseLong(valueWithoutUnit);
// Convert commandline unit to standard YARN unit.
if (units.equals("M") || units.equals("m")) {
units = "Mi";
} else if (units.equals("G") || units.equals("g")) {
units = "Gi";
} else if (units.isEmpty()) {
// do nothing;
} else {
throw new IllegalArgumentException("Acceptable units are M/G or empty");
}
// special handle memory-mb and memory
if (key.equals(ResourceInformation.MEMORY_URI)) {
if (!units.isEmpty()) {
resourceValue = UnitsConversionUtil.convert(units, "Mi",
resourceValue);
}
}
if (key.equals("memory")) {
key = ResourceInformation.MEMORY_URI;
resourceValue = UnitsConversionUtil.convert(units, "Mi",
resourceValue);
}
// special handle gpu
if (key.equals("gpu")) {
key = ResourceInformation.GPU_URI;
}
// special handle fpga
if (key.equals("fpga")) {
key = ResourceInformation.FPGA_URI;
}
resources.put(key, resourceValue);
}
return resources;
}
private static void validateResourceTypes(
Iterable<String> resourceNames,
List<ResourceTypeInfo> resourceTypeInfos)
throws ResourceNotFoundException {
for (String resourceName : resourceNames) {
if (!resourceTypeInfos.stream().anyMatch(
e -> e.getName().equals(resourceName))) {
throw new ResourceNotFoundException(
"Unknown resource: " + resourceName);
}
}
}
}

View File

@ -16,23 +16,15 @@
import org.apache.commons.lang3.StringUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters;
import org.apache.hadoop.yarn.submarine.common.exception.SubmarineRuntimeException;
import org.apache.hadoop.yarn.submarine.common.fs.RemoteDirectoryManager;
import org.apache.hadoop.yarn.util.UnitsConversionUtil;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.io.File;
import java.io.IOException;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import static org.apache.hadoop.yarn.submarine.client.cli.CliConstants.KEYTAB;
@ -41,7 +33,6 @@
public class CliUtils {
private static final Logger LOG =
LoggerFactory.getLogger(CliUtils.class);
private final static String RES_PATTERN = "^[^=]+=\\d+\\s?\\w*$";
/**
* Replace patterns inside cli
*
@ -74,86 +65,6 @@ public static String replacePatternsInLaunchCommand(String specifiedCli,
return newCli;
}
private static Map<String, Long> parseResourcesString(String resourcesStr) {
Map<String, Long> resources = new HashMap<>();
String[] pairs = resourcesStr.trim().split(",");
for (String resource : pairs) {
resource = resource.trim();
if (!resource.matches(RES_PATTERN)) {
throw new IllegalArgumentException("\"" + resource + "\" is not a "
+ "valid resource type/amount pair. "
+ "Please provide key=amount pairs separated by commas.");
}
String[] splits = resource.split("=");
String key = splits[0], value = splits[1];
String units = ResourceUtils.getUnits(value);
String valueWithoutUnit = value.substring(0,
value.length()- units.length()).trim();
long resourceValue = Long.parseLong(valueWithoutUnit);
// Convert commandline unit to standard YARN unit.
if (units.equals("M") || units.equals("m")) {
units = "Mi";
} else if (units.equals("G") || units.equals("g")) {
units = "Gi";
} else if (units.isEmpty()) {
// do nothing;
} else {
throw new IllegalArgumentException("Acceptable units are M/G or empty");
}
// special handle memory-mb and memory
if (key.equals(ResourceInformation.MEMORY_URI)) {
if (!units.isEmpty()) {
resourceValue = UnitsConversionUtil.convert(units, "Mi",
resourceValue);
}
}
if (key.equals("memory")) {
key = ResourceInformation.MEMORY_URI;
resourceValue = UnitsConversionUtil.convert(units, "Mi",
resourceValue);
}
// special handle gpu
if (key.equals("gpu")) {
key = ResourceInformation.GPU_URI;
}
// special handle fpga
if (key.equals("fpga")) {
key = ResourceInformation.FPGA_URI;
}
resources.put(key, resourceValue);
}
return resources;
}
private static void validateResourceTypes(Iterable<String> resourceNames,
List<ResourceTypeInfo> resourceTypes) throws IOException, YarnException {
for (String resourceName : resourceNames) {
if (!resourceTypes.stream().anyMatch(
e -> e.getName().equals(resourceName))) {
throw new ResourceNotFoundException(
"Unknown resource: " + resourceName);
}
}
}
public static Resource createResourceFromString(String resourceStr,
List<ResourceTypeInfo> resourceTypes) throws IOException, YarnException {
Map<String, Long> typeToValue = parseResourcesString(resourceStr);
validateResourceTypes(typeToValue.keySet(), resourceTypes);
Resource resource = Resource.newInstance(0, 0);
for (Map.Entry<String, Long> entry : typeToValue.entrySet()) {
resource.setResourceValue(entry.getKey(), entry.getValue());
}
return resource;
}
// Is it for help?
public static boolean argsForHelp(String[] args) {
if (args == null || args.length == 0)

View File

@ -22,6 +22,7 @@
import org.apache.hadoop.yarn.submarine.client.cli.CliConstants;
import org.apache.hadoop.yarn.submarine.client.cli.CliUtils;
import org.apache.hadoop.yarn.submarine.common.ClientContext;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import java.io.IOException;
import java.util.ArrayList;
@ -104,7 +105,7 @@ public void updateParametersByParsedCommandline(CommandLine parsedCommandLine,
throw new ParseException(
"--" + CliConstants.WORKER_RES + " is absent.");
}
workerResource = CliUtils.createResourceFromString(
workerResource = ResourceUtils.createResourceFromString(
workerResourceStr,
clientContext.getOrCreateYarnClient().getResourceTypeInfo());
}
@ -115,7 +116,7 @@ public void updateParametersByParsedCommandline(CommandLine parsedCommandLine,
if (psResourceStr == null) {
throw new ParseException("--" + CliConstants.PS_RES + " is absent.");
}
psResource = CliUtils.createResourceFromString(psResourceStr,
psResource = ResourceUtils.createResourceFromString(psResourceStr,
clientContext.getOrCreateYarnClient().getResourceTypeInfo());
}
@ -127,7 +128,7 @@ public void updateParametersByParsedCommandline(CommandLine parsedCommandLine,
if (tensorboardResourceStr == null || tensorboardResourceStr.isEmpty()) {
tensorboardResourceStr = CliConstants.TENSORBOARD_DEFAULT_RESOURCES;
}
tensorboardResource = CliUtils.createResourceFromString(
tensorboardResource = ResourceUtils.createResourceFromString(
tensorboardResourceStr,
clientContext.getOrCreateYarnClient().getResourceTypeInfo());
tensorboardDockerImage = parsedCommandLine.getOptionValue(

View File

@ -21,9 +21,6 @@
import org.apache.commons.cli.ParseException;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.submarine.client.cli.param.RunJobParameters;
import org.apache.hadoop.yarn.submarine.common.MockClientContext;
@ -32,15 +29,12 @@
import org.apache.hadoop.yarn.submarine.runtimes.common.JobMonitor;
import org.apache.hadoop.yarn.submarine.runtimes.common.JobSubmitter;
import org.apache.hadoop.yarn.submarine.runtimes.common.SubmarineStorage;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import static org.mockito.Matchers.any;
import static org.mockito.Mockito.mock;
@ -204,72 +198,4 @@ public void testLaunchCommandPatternReplace() throws Exception {
"python run-ps.py --input=hdfs://input --model_dir=hdfs://output/model",
runJobCli.getRunJobParameters().getPSLaunchCmd());
}
@Test
public void testResourceUnitParsing() throws Exception {
Resource res = CliUtils.createResourceFromString("memory=20g,vcores=3",
ResourceUtils.getResourcesTypeInfo());
Assert.assertEquals(Resources.createResource(20 * 1024, 3), res);
res = CliUtils.createResourceFromString("memory=20G,vcores=3",
ResourceUtils.getResourcesTypeInfo());
Assert.assertEquals(Resources.createResource(20 * 1024, 3), res);
res = CliUtils.createResourceFromString("memory=20M,vcores=3",
ResourceUtils.getResourcesTypeInfo());
Assert.assertEquals(Resources.createResource(20, 3), res);
res = CliUtils.createResourceFromString("memory=20m,vcores=3",
ResourceUtils.getResourcesTypeInfo());
Assert.assertEquals(Resources.createResource(20, 3), res);
res = CliUtils.createResourceFromString("memory-mb=20,vcores=3",
ResourceUtils.getResourcesTypeInfo());
Assert.assertEquals(Resources.createResource(20, 3), res);
res = CliUtils.createResourceFromString("memory-mb=20m,vcores=3",
ResourceUtils.getResourcesTypeInfo());
Assert.assertEquals(Resources.createResource(20, 3), res);
res = CliUtils.createResourceFromString("memory-mb=20G,vcores=3",
ResourceUtils.getResourcesTypeInfo());
Assert.assertEquals(Resources.createResource(20 * 1024, 3), res);
// W/o unit for memory means bits, and 20 bits will be rounded to 0
res = CliUtils.createResourceFromString("memory=20,vcores=3",
ResourceUtils.getResourcesTypeInfo());
Assert.assertEquals(Resources.createResource(0, 3), res);
// Test multiple resources
List<ResourceTypeInfo> resTypes = new ArrayList<>(
ResourceUtils.getResourcesTypeInfo());
resTypes.add(ResourceTypeInfo.newInstance(ResourceInformation.GPU_URI, ""));
ResourceUtils.reinitializeResources(resTypes);
res = CliUtils.createResourceFromString("memory=2G,vcores=3,gpu=0",
resTypes);
Assert.assertEquals(2 * 1024, res.getMemorySize());
Assert.assertEquals(0, res.getResourceValue(ResourceInformation.GPU_URI));
res = CliUtils.createResourceFromString("memory=2G,vcores=3,gpu=3",
resTypes);
Assert.assertEquals(2 * 1024, res.getMemorySize());
Assert.assertEquals(3, res.getResourceValue(ResourceInformation.GPU_URI));
res = CliUtils.createResourceFromString("memory=2G,vcores=3",
resTypes);
Assert.assertEquals(2 * 1024, res.getMemorySize());
Assert.assertEquals(0, res.getResourceValue(ResourceInformation.GPU_URI));
res = CliUtils.createResourceFromString("memory=2G,vcores=3,yarn.io/gpu=0",
resTypes);
Assert.assertEquals(2 * 1024, res.getMemorySize());
Assert.assertEquals(0, res.getResourceValue(ResourceInformation.GPU_URI));
res = CliUtils.createResourceFromString("memory=2G,vcores=3,yarn.io/gpu=3",
resTypes);
Assert.assertEquals(2 * 1024, res.getMemorySize());
Assert.assertEquals(3, res.getResourceValue(ResourceInformation.GPU_URI));
// TODO, add more negative tests.
}
}

View File

@ -25,7 +25,6 @@
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.impl.LightWeightResource;
import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
/**
@ -39,25 +38,6 @@ public class Resources {
private static final Log LOG =
LogFactory.getLog(Resources.class);
/**
* Return a new {@link Resource} instance with all resource values
* initialized to {@code value}.
* @param value the value to use for all resources
* @return a new {@link Resource} instance
*/
@Private
@Unstable
public static Resource createResourceWithSameValue(long value) {
LightWeightResource res = new LightWeightResource(value,
Long.valueOf(value).intValue());
int numberOfResources = ResourceUtils.getNumberOfKnownResourceTypes();
for (int i = 2; i < numberOfResources; i++) {
res.setResourceValue(i, value);
}
return res;
}
/**
* Helper class to create a resource with a fixed value for all resource
* types. For example, a NONE resource which returns 0 for any resource type.

View File

@ -23,6 +23,7 @@
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.api.records.ResourceTypeInfo;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.junit.After;
@ -31,7 +32,9 @@
import org.junit.Test;
import java.io.File;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
/**
@ -391,6 +394,74 @@ public void testGetResourceInformationWithDiffUnits() throws Exception {
}
}
@Test
public void testResourceUnitParsing() throws Exception {
Resource res = ResourceUtils.createResourceFromString("memory=20g,vcores=3",
ResourceUtils.getResourcesTypeInfo());
Assert.assertEquals(Resources.createResource(20 * 1024, 3), res);
res = ResourceUtils.createResourceFromString("memory=20G,vcores=3",
ResourceUtils.getResourcesTypeInfo());
Assert.assertEquals(Resources.createResource(20 * 1024, 3), res);
res = ResourceUtils.createResourceFromString("memory=20M,vcores=3",
ResourceUtils.getResourcesTypeInfo());
Assert.assertEquals(Resources.createResource(20, 3), res);
res = ResourceUtils.createResourceFromString("memory=20m,vcores=3",
ResourceUtils.getResourcesTypeInfo());
Assert.assertEquals(Resources.createResource(20, 3), res);
res = ResourceUtils.createResourceFromString("memory-mb=20,vcores=3",
ResourceUtils.getResourcesTypeInfo());
Assert.assertEquals(Resources.createResource(20, 3), res);
res = ResourceUtils.createResourceFromString("memory-mb=20m,vcores=3",
ResourceUtils.getResourcesTypeInfo());
Assert.assertEquals(Resources.createResource(20, 3), res);
res = ResourceUtils.createResourceFromString("memory-mb=20G,vcores=3",
ResourceUtils.getResourcesTypeInfo());
Assert.assertEquals(Resources.createResource(20 * 1024, 3), res);
// W/o unit for memory means bits, and 20 bits will be rounded to 0
res = ResourceUtils.createResourceFromString("memory=20,vcores=3",
ResourceUtils.getResourcesTypeInfo());
Assert.assertEquals(Resources.createResource(0, 3), res);
// Test multiple resources
List<ResourceTypeInfo> resTypes = new ArrayList<>(
ResourceUtils.getResourcesTypeInfo());
resTypes.add(ResourceTypeInfo.newInstance(ResourceInformation.GPU_URI, ""));
ResourceUtils.reinitializeResources(resTypes);
res = ResourceUtils.createResourceFromString("memory=2G,vcores=3,gpu=0",
resTypes);
Assert.assertEquals(2 * 1024, res.getMemorySize());
Assert.assertEquals(0, res.getResourceValue(ResourceInformation.GPU_URI));
res = ResourceUtils.createResourceFromString("memory=2G,vcores=3,gpu=3",
resTypes);
Assert.assertEquals(2 * 1024, res.getMemorySize());
Assert.assertEquals(3, res.getResourceValue(ResourceInformation.GPU_URI));
res = ResourceUtils.createResourceFromString("memory=2G,vcores=3",
resTypes);
Assert.assertEquals(2 * 1024, res.getMemorySize());
Assert.assertEquals(0, res.getResourceValue(ResourceInformation.GPU_URI));
res = ResourceUtils.createResourceFromString(
"memory=2G,vcores=3,yarn.io/gpu=0", resTypes);
Assert.assertEquals(2 * 1024, res.getMemorySize());
Assert.assertEquals(0, res.getResourceValue(ResourceInformation.GPU_URI));
res = ResourceUtils.createResourceFromString(
"memory=2G,vcores=3,yarn.io/gpu=3", resTypes);
Assert.assertEquals(2 * 1024, res.getMemorySize());
Assert.assertEquals(3, res.getResourceValue(ResourceInformation.GPU_URI));
// TODO, add more negative tests.
}
public static String setupResourceTypes(Configuration conf, String filename)
throws Exception {
File source = new File(

View File

@ -269,7 +269,7 @@ public void testCreateResourceWithSameLongValue() throws Exception {
unsetExtraResourceType();
setupExtraResourceType();
Resource res = Resources.createResourceWithSameValue(11L);
Resource res = ResourceUtils.createResourceWithSameValue(11L);
assertEquals(11L, res.getMemorySize());
assertEquals(11, res.getVirtualCores());
assertEquals(11L, res.getResourceInformation(EXTRA_RESOURCE_TYPE).getValue());
@ -280,7 +280,7 @@ public void testCreateResourceWithSameIntValue() throws Exception {
unsetExtraResourceType();
setupExtraResourceType();
Resource res = Resources.createResourceWithSameValue(11);
Resource res = ResourceUtils.createResourceWithSameValue(11);
assertEquals(11, res.getMemorySize());
assertEquals(11, res.getVirtualCores());
assertEquals(11, res.getResourceInformation(EXTRA_RESOURCE_TYPE).getValue());
@ -288,14 +288,14 @@ public void testCreateResourceWithSameIntValue() throws Exception {
@Test
public void testCreateSimpleResourceWithSameLongValue() {
Resource res = Resources.createResourceWithSameValue(11L);
Resource res = ResourceUtils.createResourceWithSameValue(11L);
assertEquals(11L, res.getMemorySize());
assertEquals(11, res.getVirtualCores());
}
@Test
public void testCreateSimpleResourceWithSameIntValue() {
Resource res = Resources.createResourceWithSameValue(11);
Resource res = ResourceUtils.createResourceWithSameValue(11);
assertEquals(11, res.getMemorySize());
assertEquals(11, res.getVirtualCores());
}

View File

@ -44,6 +44,7 @@
import org.apache.hadoop.yarn.api.records.QueueState;
import org.apache.hadoop.yarn.api.records.QueueStatistics;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnException;
import org.apache.hadoop.yarn.factories.RecordFactory;
@ -69,10 +70,13 @@
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SimpleCandidateNodeSet;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
import com.google.common.collect.Sets;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.UNDEFINED;
public abstract class AbstractCSQueue implements CSQueue {
private static final Log LOG = LogFactory.getLog(AbstractCSQueue.class);
@ -361,9 +365,9 @@ protected void setupQueueConfigs(Resource clusterResource,
capacityConfigType = CapacityConfigType.NONE;
updateConfigurableResourceRequirement(getQueuePath(), clusterResource);
this.maximumAllocation =
configuration.getMaximumAllocationPerQueue(
getQueuePath());
// Setup queue's maximumAllocation respecting the global setting
// and queue setting
setupMaximumAllocation(configuration);
// initialized the queue state based on previous state, configured state
// and its parent state.
@ -425,6 +429,51 @@ protected void setupQueueConfigs(Resource clusterResource,
}
}
private void setupMaximumAllocation(CapacitySchedulerConfiguration csConf) {
String queue = getQueuePath();
Resource clusterMax = ResourceUtils
.fetchMaximumAllocationFromConfig(csConf);
Resource queueMax = csConf.getQueueMaximumAllocation(queue);
maximumAllocation = Resources.clone(
parent == null ? clusterMax : parent.getMaximumAllocation());
String errMsg =
"Queue maximum allocation cannot be larger than the cluster setting"
+ " for queue " + queue
+ " max allocation per queue: %s"
+ " cluster setting: " + clusterMax;
if (queueMax == Resources.none()) {
// Handle backward compatibility
long queueMemory = csConf.getQueueMaximumAllocationMb(queue);
int queueVcores = csConf.getQueueMaximumAllocationVcores(queue);
if (queueMemory != UNDEFINED) {
maximumAllocation.setMemorySize(queueMemory);
}
if (queueVcores != UNDEFINED) {
maximumAllocation.setVirtualCores(queueVcores);
}
if ((queueMemory != UNDEFINED && queueMemory > clusterMax.getMemorySize()
|| (queueVcores != UNDEFINED
&& queueVcores > clusterMax.getVirtualCores()))) {
throw new IllegalArgumentException(
String.format(errMsg, maximumAllocation));
}
} else {
// Queue level maximum-allocation can't be larger than cluster setting
for (ResourceInformation ri : queueMax.getResources()) {
if (ri.compareTo(clusterMax.getResourceInformation(ri.getName())) > 0) {
throw new IllegalArgumentException(String.format(errMsg, queueMax));
}
maximumAllocation.setResourceInformation(ri.getName(), ri);
}
}
}
private Map<String, Float> getUserWeightsFromHierarchy
(CapacitySchedulerConfiguration configuration) throws
IOException {

View File

@ -86,6 +86,10 @@ public interface CSQueue extends SchedulerQueue<CSQueue> {
public PrivilegedEntity getPrivilegedEntity();
Resource getMaximumAllocation();
Resource getMinimumAllocation();
/**
* Get the configured <em>capacity</em> of the queue.
* @return configured queue capacity

View File

@ -19,6 +19,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
@ -140,13 +141,15 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
@Private
public static final boolean DEFAULT_RESERVE_CONT_LOOK_ALL_NODES = true;
@Private
public static final String MAXIMUM_ALLOCATION = "maximum-allocation";
@Private
public static final String MAXIMUM_ALLOCATION_MB = "maximum-allocation-mb";
@Private
public static final String MAXIMUM_ALLOCATION_VCORES =
"maximum-allocation-vcores";
"maximum-allocation-vcores";
/**
* Ordering policy of queues
*/
@ -889,50 +892,32 @@ public void setQueuePriority(String queue, int priority) {
}
/**
* Get the per queue setting for the maximum limit to allocate to
* each container request.
* Get maximum_allocation setting for the specified queue from the
* configuration.
*
* @param queue
* name of the queue
* @return setting specified per queue else falls back to the cluster setting
* @return Resource object or Resource.none if not set
*/
public Resource getMaximumAllocationPerQueue(String queue) {
// Only support to specify memory and vcores maximum allocation per queue
// for now.
public Resource getQueueMaximumAllocation(String queue) {
String queuePrefix = getQueuePrefix(queue);
long maxAllocationMbPerQueue = getInt(queuePrefix + MAXIMUM_ALLOCATION_MB,
(int)UNDEFINED);
int maxAllocationVcoresPerQueue = getInt(
queuePrefix + MAXIMUM_ALLOCATION_VCORES, (int)UNDEFINED);
if (LOG.isDebugEnabled()) {
LOG.debug("max alloc mb per queue for " + queue + " is "
+ maxAllocationMbPerQueue);
LOG.debug("max alloc vcores per queue for " + queue + " is "
+ maxAllocationVcoresPerQueue);
String rawQueueMaxAllocation = get(queuePrefix + MAXIMUM_ALLOCATION, null);
if (Strings.isNullOrEmpty(rawQueueMaxAllocation)) {
return Resources.none();
} else {
return ResourceUtils.createResourceFromString(rawQueueMaxAllocation,
ResourceUtils.getResourcesTypeInfo());
}
Resource clusterMax = ResourceUtils.fetchMaximumAllocationFromConfig(this);
if (maxAllocationMbPerQueue == (int)UNDEFINED) {
LOG.info("max alloc mb per queue for " + queue + " is undefined");
maxAllocationMbPerQueue = clusterMax.getMemorySize();
}
if (maxAllocationVcoresPerQueue == (int)UNDEFINED) {
LOG.info("max alloc vcore per queue for " + queue + " is undefined");
maxAllocationVcoresPerQueue = clusterMax.getVirtualCores();
}
// Copy from clusterMax and overwrite per-queue's maximum memory/vcore
// allocation.
Resource result = Resources.clone(clusterMax);
result.setMemorySize(maxAllocationMbPerQueue);
result.setVirtualCores(maxAllocationVcoresPerQueue);
if (maxAllocationMbPerQueue > clusterMax.getMemorySize()
|| maxAllocationVcoresPerQueue > clusterMax.getVirtualCores()) {
throw new IllegalArgumentException(
"Queue maximum allocation cannot be larger than the cluster setting"
+ " for queue " + queue
+ " max allocation per queue: " + result
+ " cluster setting: " + clusterMax);
}
return result;
}
public long getQueueMaximumAllocationMb(String queue) {
String queuePrefix = getQueuePrefix(queue);
return getInt(queuePrefix + MAXIMUM_ALLOCATION_MB, (int)UNDEFINED);
}
public int getQueueMaximumAllocationVcores(String queue) {
String queuePrefix = getQueuePrefix(queue);
return getInt(queuePrefix + MAXIMUM_ALLOCATION_VCORES, (int)UNDEFINED);
}
public boolean getEnableUserMetrics() {

View File

@ -25,7 +25,6 @@
import org.apache.hadoop.yarn.api.records.ResourceInformation;
import org.apache.hadoop.yarn.exceptions.ResourceNotFoundException;
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
import org.apache.hadoop.yarn.util.resource.Resources;
/**
* A {@code ConfigurableResource} object represents an entity that is used to
@ -53,7 +52,7 @@ public class ConfigurableResource {
* @param value the value to use for all resources
*/
ConfigurableResource(long value) {
this(Resources.createResourceWithSameValue(value));
this(ResourceUtils.createResourceWithSameValue(value));
}
public ConfigurableResource(Resource resource) {

View File

@ -18,8 +18,10 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_MB;
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_VCORES;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
@ -2996,8 +2998,8 @@ public void testRefreshQueuesMaxAllocationRefresh() throws Exception {
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
cs.getMaximumResourceCapability().getMemorySize());
assertEquals("max allocation for A1",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
conf.getMaximumAllocationPerQueue(A1).getMemorySize());
Resources.none(),
conf.getQueueMaximumAllocation(A1));
assertEquals("max allocation",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
ResourceUtils.fetchMaximumAllocationFromConfig(conf).getMemorySize());
@ -3087,6 +3089,10 @@ public void testRefreshQueuesMaxAllocationRefreshLarger() throws Exception {
cs.reinitialize(conf, mockContext);
checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
CSQueue rootQueue = cs.getRootQueue();
CSQueue queueA = findQueue(rootQueue, A);
CSQueue queueA1 = findQueue(queueA, A1);
assertEquals("max capability MB in CS",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
cs.getMaximumResourceCapability().getMemorySize());
@ -3095,10 +3101,10 @@ public void testRefreshQueuesMaxAllocationRefreshLarger() throws Exception {
cs.getMaximumResourceCapability().getVirtualCores());
assertEquals("max allocation MB A1",
4096,
conf.getMaximumAllocationPerQueue(A1).getMemorySize());
queueA1.getMaximumAllocation().getMemorySize());
assertEquals("max allocation vcores A1",
2,
conf.getMaximumAllocationPerQueue(A1).getVirtualCores());
queueA1.getMaximumAllocation().getVirtualCores());
assertEquals("cluster max allocation MB",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
ResourceUtils.fetchMaximumAllocationFromConfig(conf).getMemorySize());
@ -3106,11 +3112,8 @@ public void testRefreshQueuesMaxAllocationRefreshLarger() throws Exception {
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
ResourceUtils.fetchMaximumAllocationFromConfig(conf).getVirtualCores());
CSQueue rootQueue = cs.getRootQueue();
CSQueue queueA = findQueue(rootQueue, A);
CSQueue queueA1 = findQueue(queueA, A1);
assertEquals("queue max allocation", 4096, ((LeafQueue) queueA1)
.getMaximumAllocation().getMemorySize());
assertEquals("queue max allocation", 4096,
queueA1.getMaximumAllocation().getMemorySize());
setMaxAllocMb(conf, A1, 6144);
setMaxAllocVcores(conf, A1, 3);
@ -3118,9 +3121,9 @@ public void testRefreshQueuesMaxAllocationRefreshLarger() throws Exception {
// conf will have changed but we shouldn't be able to change max allocation
// for the actual queue
assertEquals("max allocation MB A1", 6144,
conf.getMaximumAllocationPerQueue(A1).getMemorySize());
queueA1.getMaximumAllocation().getMemorySize());
assertEquals("max allocation vcores A1", 3,
conf.getMaximumAllocationPerQueue(A1).getVirtualCores());
queueA1.getMaximumAllocation().getVirtualCores());
assertEquals("max allocation MB cluster",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
ResourceUtils.fetchMaximumAllocationFromConfig(conf).getMemorySize());
@ -3128,9 +3131,9 @@ public void testRefreshQueuesMaxAllocationRefreshLarger() throws Exception {
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
ResourceUtils.fetchMaximumAllocationFromConfig(conf).getVirtualCores());
assertEquals("queue max allocation MB", 6144,
((LeafQueue) queueA1).getMaximumAllocation().getMemorySize());
queueA1.getMaximumAllocation().getMemorySize());
assertEquals("queue max allocation vcores", 3,
((LeafQueue) queueA1).getMaximumAllocation().getVirtualCores());
queueA1.getMaximumAllocation().getVirtualCores());
assertEquals("max capability MB cluster",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
cs.getMaximumResourceCapability().getMemorySize());
@ -3216,17 +3219,17 @@ public void testRefreshQueuesMaxAllocationCSLarger() throws Exception {
CSQueue queueB2 = findQueue(queueB, B2);
assertEquals("queue A1 max allocation MB", 4096,
((LeafQueue) queueA1).getMaximumAllocation().getMemorySize());
queueA1.getMaximumAllocation().getMemorySize());
assertEquals("queue A1 max allocation vcores", 4,
((LeafQueue) queueA1).getMaximumAllocation().getVirtualCores());
queueA1.getMaximumAllocation().getVirtualCores());
assertEquals("queue A2 max allocation MB", 10240,
((LeafQueue) queueA2).getMaximumAllocation().getMemorySize());
queueA2.getMaximumAllocation().getMemorySize());
assertEquals("queue A2 max allocation vcores", 10,
((LeafQueue) queueA2).getMaximumAllocation().getVirtualCores());
queueA2.getMaximumAllocation().getVirtualCores());
assertEquals("queue B2 max allocation MB", 10240,
((LeafQueue) queueB2).getMaximumAllocation().getMemorySize());
queueB2.getMaximumAllocation().getMemorySize());
assertEquals("queue B2 max allocation vcores", 10,
((LeafQueue) queueB2).getMaximumAllocation().getVirtualCores());
queueB2.getMaximumAllocation().getVirtualCores());
setMaxAllocMb(conf, 12288);
setMaxAllocVcores(conf, 12);
@ -3238,17 +3241,187 @@ public void testRefreshQueuesMaxAllocationCSLarger() throws Exception {
assertEquals("max allocation vcores in CS", 12,
cs.getMaximumResourceCapability().getVirtualCores());
assertEquals("queue A1 max MB allocation", 4096,
((LeafQueue) queueA1).getMaximumAllocation().getMemorySize());
queueA1.getMaximumAllocation().getMemorySize());
assertEquals("queue A1 max vcores allocation", 4,
((LeafQueue) queueA1).getMaximumAllocation().getVirtualCores());
queueA1.getMaximumAllocation().getVirtualCores());
assertEquals("queue A2 max MB allocation", 12288,
((LeafQueue) queueA2).getMaximumAllocation().getMemorySize());
queueA2.getMaximumAllocation().getMemorySize());
assertEquals("queue A2 max vcores allocation", 12,
((LeafQueue) queueA2).getMaximumAllocation().getVirtualCores());
queueA2.getMaximumAllocation().getVirtualCores());
assertEquals("queue B2 max MB allocation", 12288,
((LeafQueue) queueB2).getMaximumAllocation().getMemorySize());
queueB2.getMaximumAllocation().getMemorySize());
assertEquals("queue B2 max vcores allocation", 12,
((LeafQueue) queueB2).getMaximumAllocation().getVirtualCores());
queueB2.getMaximumAllocation().getVirtualCores());
}
@Test
public void testQueuesMaxAllocationInheritance() throws Exception {
// queue level max allocation is set by the queue configuration explicitly
// or inherits from the parent.
CapacityScheduler cs = new CapacityScheduler();
cs.setConf(new YarnConfiguration());
cs.setRMContext(resourceManager.getRMContext());
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(conf);
setMaxAllocMb(conf,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
setMaxAllocVcores(conf,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
// Test the child queue overrides
setMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT,
"memory-mb=4096,vcores=2");
setMaxAllocation(conf, A1, "memory-mb=6144,vcores=2");
setMaxAllocation(conf, B, "memory-mb=5120, vcores=2");
setMaxAllocation(conf, B2, "memory-mb=1024, vcores=2");
cs.init(conf);
cs.start();
cs.reinitialize(conf, mockContext);
checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
CSQueue rootQueue = cs.getRootQueue();
CSQueue queueA = findQueue(rootQueue, A);
CSQueue queueB = findQueue(rootQueue, B);
CSQueue queueA1 = findQueue(queueA, A1);
CSQueue queueA2 = findQueue(queueA, A2);
CSQueue queueB1 = findQueue(queueB, B1);
CSQueue queueB2 = findQueue(queueB, B2);
assertEquals("max capability MB in CS",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
cs.getMaximumResourceCapability().getMemorySize());
assertEquals("max capability vcores in CS",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
cs.getMaximumResourceCapability().getVirtualCores());
assertEquals("max allocation MB A1",
6144,
queueA1.getMaximumAllocation().getMemorySize());
assertEquals("max allocation vcores A1",
2,
queueA1.getMaximumAllocation().getVirtualCores());
assertEquals("max allocation MB A2", 4096,
queueA2.getMaximumAllocation().getMemorySize());
assertEquals("max allocation vcores A2",
2,
queueA2.getMaximumAllocation().getVirtualCores());
assertEquals("max allocation MB B", 5120,
queueB.getMaximumAllocation().getMemorySize());
assertEquals("max allocation MB B1", 5120,
queueB1.getMaximumAllocation().getMemorySize());
assertEquals("max allocation MB B2", 1024,
queueB2.getMaximumAllocation().getMemorySize());
// Test get the max-allocation from different parent
unsetMaxAllocation(conf, A1);
unsetMaxAllocation(conf, B);
unsetMaxAllocation(conf, B1);
setMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT,
"memory-mb=6144,vcores=2");
setMaxAllocation(conf, A, "memory-mb=8192,vcores=2");
cs.reinitialize(conf, mockContext);
assertEquals("max capability MB in CS",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
cs.getMaximumResourceCapability().getMemorySize());
assertEquals("max capability vcores in CS",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
cs.getMaximumResourceCapability().getVirtualCores());
assertEquals("max allocation MB A1",
8192,
queueA1.getMaximumAllocation().getMemorySize());
assertEquals("max allocation vcores A1",
2,
queueA1.getMaximumAllocation().getVirtualCores());
assertEquals("max allocation MB B1",
6144,
queueB1.getMaximumAllocation().getMemorySize());
assertEquals("max allocation vcores B1",
2,
queueB1.getMaximumAllocation().getVirtualCores());
// Test the default
unsetMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT);
unsetMaxAllocation(conf, A);
unsetMaxAllocation(conf, A1);
cs.reinitialize(conf, mockContext);
assertEquals("max capability MB in CS",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
cs.getMaximumResourceCapability().getMemorySize());
assertEquals("max capability vcores in CS",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
cs.getMaximumResourceCapability().getVirtualCores());
assertEquals("max allocation MB A1",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
queueA1.getMaximumAllocation().getMemorySize());
assertEquals("max allocation vcores A1",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
queueA1.getMaximumAllocation().getVirtualCores());
assertEquals("max allocation MB A2",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB,
queueA2.getMaximumAllocation().getMemorySize());
assertEquals("max allocation vcores A2",
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES,
queueA2.getMaximumAllocation().getVirtualCores());
}
@Test
public void testVerifyQueuesMaxAllocationConf() throws Exception {
// queue level max allocation can't exceed the cluster setting
CapacityScheduler cs = new CapacityScheduler();
cs.setConf(new YarnConfiguration());
cs.setRMContext(resourceManager.getRMContext());
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
setupQueueConfiguration(conf);
setMaxAllocMb(conf,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB);
setMaxAllocVcores(conf,
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES);
long largerMem =
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB + 1024;
long largerVcores =
YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES+10;
cs.init(conf);
cs.start();
cs.reinitialize(conf, mockContext);
checkQueueCapacities(cs, A_CAPACITY, B_CAPACITY);
setMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT,
"memory-mb=" + largerMem + ",vcores=2");
try {
cs.reinitialize(conf, mockContext);
fail("Queue Root maximum allocation can't exceed the cluster setting");
} catch(Exception e) {
assertTrue("maximum allocation exception",
e.getCause().getMessage().contains("maximum allocation"));
}
setMaxAllocation(conf, CapacitySchedulerConfiguration.ROOT,
"memory-mb=4096,vcores=2");
setMaxAllocation(conf, A, "memory-mb=6144,vcores=2");
setMaxAllocation(conf, A1, "memory-mb=" + largerMem + ",vcores=2");
try {
cs.reinitialize(conf, mockContext);
fail("Queue A1 maximum allocation can't exceed the cluster setting");
} catch(Exception e) {
assertTrue("maximum allocation exception",
e.getCause().getMessage().contains("maximum allocation"));
}
setMaxAllocation(conf, A1, "memory-mb=8192" + ",vcores=" + largerVcores);
try {
cs.reinitialize(conf, mockContext);
fail("Queue A1 maximum allocation can't exceed the cluster setting");
} catch(Exception e) {
assertTrue("maximum allocation exception",
e.getCause().getMessage().contains("maximum allocation"));
}
}
private void waitContainerAllocated(MockAM am, int mem, int nContainer,
@ -4103,6 +4276,20 @@ private void setMaxAllocVcores(CapacitySchedulerConfiguration conf,
conf.setInt(propName, maxAllocVcores);
}
private void setMaxAllocation(CapacitySchedulerConfiguration conf,
String queueName, String maxAllocation) {
String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName)
+ MAXIMUM_ALLOCATION;
conf.set(propName, maxAllocation);
}
private void unsetMaxAllocation(CapacitySchedulerConfiguration conf,
String queueName) {
String propName = CapacitySchedulerConfiguration.getQueuePrefix(queueName)
+ MAXIMUM_ALLOCATION;
conf.unset(propName);
}
private void sentRMContainerLaunched(MockRM rm, ContainerId containerId) {
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
RMContainer rmContainer = cs.getRMContainer(containerId);