YARN-10503. Support queue capacity in terms of absolute resources with custom
resourceType. Contributed by Qi Zhu.
This commit is contained in:
parent
bf66116407
commit
213d3deb26
@ -902,4 +902,19 @@ private static void validateResourceTypes(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static StringBuilder
|
||||
getCustomResourcesStrings(Resource resource) {
|
||||
StringBuilder res = new StringBuilder();
|
||||
if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
|
||||
ResourceInformation[] resources =
|
||||
resource.getResources();
|
||||
for (int i = 2; i < resources.length; i++) {
|
||||
ResourceInformation resInfo = resources[i];
|
||||
res.append(","
|
||||
+ resInfo.getName() + "=" + resInfo.getValue());
|
||||
}
|
||||
}
|
||||
return res;
|
||||
}
|
||||
}
|
||||
|
@ -2350,11 +2350,14 @@ public void setAutoCreatedLeafQueueTemplateCapacityByLabel(String queuePath,
|
||||
getAutoCreatedQueueTemplateConfPrefix(queuePath);
|
||||
|
||||
StringBuilder resourceString = new StringBuilder();
|
||||
|
||||
resourceString
|
||||
.append("[" + AbsoluteResourceType.MEMORY.toString().toLowerCase() + "="
|
||||
+ resource.getMemorySize() + ","
|
||||
+ AbsoluteResourceType.VCORES.toString().toLowerCase() + "="
|
||||
+ resource.getVirtualCores() + "]");
|
||||
+ resource.getVirtualCores()
|
||||
+ ResourceUtils.
|
||||
getCustomResourcesStrings(resource) + "]");
|
||||
|
||||
setCapacityByLabel(leafQueueConfPrefix, label, resourceString.toString());
|
||||
}
|
||||
@ -2385,11 +2388,14 @@ public void setAutoCreatedLeafQueueTemplateMaxCapacity(String queuePath,
|
||||
queuePath);
|
||||
|
||||
StringBuilder resourceString = new StringBuilder();
|
||||
|
||||
resourceString
|
||||
.append("[" + AbsoluteResourceType.MEMORY.toString().toLowerCase() + "="
|
||||
+ resource.getMemorySize() + ","
|
||||
+ AbsoluteResourceType.VCORES.toString().toLowerCase() + "="
|
||||
+ resource.getVirtualCores() + "]");
|
||||
+ resource.getVirtualCores()
|
||||
+ ResourceUtils.
|
||||
getCustomResourcesStrings(resource) + "]");
|
||||
|
||||
setMaximumCapacityByLabel(leafQueueConfPrefix, label, resourceString.toString());
|
||||
}
|
||||
@ -2489,11 +2495,14 @@ private void updateMinMaxResourceToConf(String label, String queue,
|
||||
}
|
||||
|
||||
StringBuilder resourceString = new StringBuilder();
|
||||
|
||||
resourceString
|
||||
.append("[" + AbsoluteResourceType.MEMORY.toString().toLowerCase() + "="
|
||||
+ resource.getMemorySize() + ","
|
||||
+ AbsoluteResourceType.VCORES.toString().toLowerCase() + "="
|
||||
+ resource.getVirtualCores() + "]");
|
||||
+ resource.getVirtualCores()
|
||||
+ ResourceUtils.
|
||||
getCustomResourcesStrings(resource) + "]");
|
||||
|
||||
String prefix = getQueuePrefix(queue) + type;
|
||||
if (!label.isEmpty()) {
|
||||
@ -2567,8 +2576,12 @@ private Resource internalGetLabeledResourceRequirementForQueue(String queue,
|
||||
private void updateResourceValuesFromConfig(Set<String> resourceTypes,
|
||||
Resource resource, String[] splits) {
|
||||
|
||||
String resourceName = splits[0].trim();
|
||||
|
||||
// If key is not a valid type, skip it.
|
||||
if (!resourceTypes.contains(splits[0])) {
|
||||
if (!resourceTypes.contains(resourceName)
|
||||
&& !ResourceUtils.getResourceTypes().containsKey(resourceName)) {
|
||||
LOG.error(resourceName + " not supported.");
|
||||
return;
|
||||
}
|
||||
|
||||
@ -2581,9 +2594,17 @@ private void updateResourceValuesFromConfig(Set<String> resourceTypes,
|
||||
resourceValue = UnitsConversionUtil.convert(units, "Mi", resourceValue);
|
||||
}
|
||||
|
||||
// Custom resource type defined by user.
|
||||
// Such as GPU FPGA etc.
|
||||
if (!resourceTypes.contains(resourceName)) {
|
||||
resource.setResourceInformation(resourceName, ResourceInformation
|
||||
.newInstance(resourceName, units, resourceValue));
|
||||
return;
|
||||
}
|
||||
|
||||
// map it based on key.
|
||||
AbsoluteResourceType resType = AbsoluteResourceType
|
||||
.valueOf(StringUtils.toUpperCase(splits[0].trim()));
|
||||
.valueOf(StringUtils.toUpperCase(resourceName));
|
||||
switch (resType) {
|
||||
case MEMORY :
|
||||
resource.setMemorySize(resourceValue);
|
||||
@ -2592,8 +2613,8 @@ private void updateResourceValuesFromConfig(Set<String> resourceTypes,
|
||||
resource.setVirtualCores(resourceValue.intValue());
|
||||
break;
|
||||
default :
|
||||
resource.setResourceInformation(splits[0].trim(), ResourceInformation
|
||||
.newInstance(splits[0].trim(), units, resourceValue));
|
||||
resource.setResourceInformation(resourceName, ResourceInformation
|
||||
.newInstance(resourceName, units, resourceValue));
|
||||
break;
|
||||
}
|
||||
}
|
||||
|
@ -19,6 +19,7 @@
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
|
||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
@ -50,10 +51,16 @@
|
||||
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
import java.util.Set;
|
||||
import java.util.Map;
|
||||
import java.util.Arrays;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static org.apache.hadoop.yarn.api.records.ResourceInformation.FPGA_URI;
|
||||
import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI;
|
||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_MB;
|
||||
import static org.junit.Assert.assertEquals;
|
||||
@ -248,4 +255,79 @@ public void testClusterMetricsWithGPU()
|
||||
.get(GPU_URI)).longValue(), 0);
|
||||
ClusterMetrics.destroy();
|
||||
}
|
||||
|
||||
/**
|
||||
* Test CS absolute conf with Custom resource type.
|
||||
* */
|
||||
@Test
|
||||
public void testCapacitySchedulerAbsoluteConfWithCustomResourceType()
|
||||
throws IOException {
|
||||
// reset resource types
|
||||
ResourceUtils.resetResourceTypes();
|
||||
String resourceTypesFileName = "resource-types-test.xml";
|
||||
File source = new File(
|
||||
conf.getClassLoader().getResource(resourceTypesFileName).getFile());
|
||||
resourceTypesFile = new File(source.getParent(), "resource-types.xml");
|
||||
FileUtils.copyFile(source, resourceTypesFile);
|
||||
|
||||
CapacitySchedulerConfiguration newConf =
|
||||
new CapacitySchedulerConfiguration(conf);
|
||||
|
||||
// Only memory vcores for first class.
|
||||
Set<String> resourceTypes = Arrays.
|
||||
stream(CapacitySchedulerConfiguration.
|
||||
AbsoluteResourceType.values()).
|
||||
map(value -> value.toString().toLowerCase()).
|
||||
collect(Collectors.toSet());
|
||||
|
||||
Map<String, Long> valuesMin = Maps.newHashMap();
|
||||
valuesMin.put(GPU_URI, 10L);
|
||||
valuesMin.put(FPGA_URI, 10L);
|
||||
valuesMin.put("testType", 10L);
|
||||
|
||||
Map<String, Long> valuesMax = Maps.newHashMap();
|
||||
valuesMax.put(GPU_URI, 100L);
|
||||
valuesMax.put(FPGA_URI, 100L);
|
||||
valuesMax.put("testType", 100L);
|
||||
|
||||
Resource aMINRES =
|
||||
Resource.newInstance(1000, 10, valuesMin);
|
||||
|
||||
Resource aMAXRES =
|
||||
Resource.newInstance(1000, 10, valuesMax);
|
||||
|
||||
// Define top-level queues
|
||||
newConf.setQueues(CapacitySchedulerConfiguration.ROOT,
|
||||
new String[] {"a", "b", "c"});
|
||||
newConf.setMinimumResourceRequirement("", "root.a",
|
||||
aMINRES);
|
||||
newConf.setMaximumResourceRequirement("", "root.a",
|
||||
aMAXRES);
|
||||
|
||||
newConf.setClass(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
|
||||
DominantResourceCalculator.class, ResourceCalculator.class);
|
||||
|
||||
//start RM
|
||||
MockRM rm = new MockRM(newConf);
|
||||
rm.start();
|
||||
|
||||
// Check the gpu resource conf is right.
|
||||
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||
Assert.assertEquals(aMINRES,
|
||||
cs.getConfiguration().
|
||||
getMinimumResourceRequirement("", "root.a", resourceTypes));
|
||||
Assert.assertEquals(aMAXRES,
|
||||
cs.getConfiguration().
|
||||
getMaximumResourceRequirement("", "root.a", resourceTypes));
|
||||
|
||||
// Check the gpu resource of queue is right.
|
||||
Assert.assertEquals(aMINRES, cs.getQueue("root.a").
|
||||
getQueueResourceQuotas().getConfiguredMinResource());
|
||||
Assert.assertEquals(aMAXRES, cs.getQueue("root.a").
|
||||
getQueueResourceQuotas().getConfiguredMaxResource());
|
||||
|
||||
rm.close();
|
||||
|
||||
}
|
||||
|
||||
}
|
||||
|
@ -17,6 +17,6 @@
|
||||
<configuration>
|
||||
<property>
|
||||
<name>yarn.resource-types</name>
|
||||
<value>yarn.io/gpu, yarn.io/fpga</value>
|
||||
<value>yarn.io/gpu, yarn.io/fpga, testType</value>
|
||||
</property>
|
||||
</configuration>
|
Loading…
Reference in New Issue
Block a user