YARN-10503. Support queue capacity in terms of absolute resources with custom
resourceType. Contributed by Qi Zhu.
This commit is contained in:
parent
0e6cd352b5
commit
1658a5140a
@ -901,4 +901,19 @@ public class ResourceUtils {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static StringBuilder
|
||||||
|
getCustomResourcesStrings(Resource resource) {
|
||||||
|
StringBuilder res = new StringBuilder();
|
||||||
|
if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) {
|
||||||
|
ResourceInformation[] resources =
|
||||||
|
resource.getResources();
|
||||||
|
for (int i = 2; i < resources.length; i++) {
|
||||||
|
ResourceInformation resInfo = resources[i];
|
||||||
|
res.append(","
|
||||||
|
+ resInfo.getName() + "=" + resInfo.getValue());
|
||||||
|
}
|
||||||
|
}
|
||||||
|
return res;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -2127,11 +2127,14 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|||||||
}
|
}
|
||||||
|
|
||||||
StringBuilder resourceString = new StringBuilder();
|
StringBuilder resourceString = new StringBuilder();
|
||||||
|
|
||||||
resourceString
|
resourceString
|
||||||
.append("[" + AbsoluteResourceType.MEMORY.toString().toLowerCase() + "="
|
.append("[" + AbsoluteResourceType.MEMORY.toString().toLowerCase() + "="
|
||||||
+ resource.getMemorySize() + ","
|
+ resource.getMemorySize() + ","
|
||||||
+ AbsoluteResourceType.VCORES.toString().toLowerCase() + "="
|
+ AbsoluteResourceType.VCORES.toString().toLowerCase() + "="
|
||||||
+ resource.getVirtualCores() + "]");
|
+ resource.getVirtualCores()
|
||||||
|
+ ResourceUtils.
|
||||||
|
getCustomResourcesStrings(resource) + "]");
|
||||||
|
|
||||||
String prefix = getQueuePrefix(queue) + type;
|
String prefix = getQueuePrefix(queue) + type;
|
||||||
if (!label.isEmpty()) {
|
if (!label.isEmpty()) {
|
||||||
@ -2191,8 +2194,12 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|||||||
private void updateResourceValuesFromConfig(Set<String> resourceTypes,
|
private void updateResourceValuesFromConfig(Set<String> resourceTypes,
|
||||||
Resource resource, String[] splits) {
|
Resource resource, String[] splits) {
|
||||||
|
|
||||||
|
String resourceName = splits[0].trim();
|
||||||
|
|
||||||
// If key is not a valid type, skip it.
|
// If key is not a valid type, skip it.
|
||||||
if (!resourceTypes.contains(splits[0])) {
|
if (!resourceTypes.contains(resourceName)
|
||||||
|
&& !ResourceUtils.getResourceTypes().containsKey(resourceName)) {
|
||||||
|
LOG.error(resourceName + " not supported.");
|
||||||
return;
|
return;
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -2205,9 +2212,17 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|||||||
resourceValue = UnitsConversionUtil.convert(units, "Mi", resourceValue);
|
resourceValue = UnitsConversionUtil.convert(units, "Mi", resourceValue);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Custom resource type defined by user.
|
||||||
|
// Such as GPU FPGA etc.
|
||||||
|
if (!resourceTypes.contains(resourceName)) {
|
||||||
|
resource.setResourceInformation(resourceName, ResourceInformation
|
||||||
|
.newInstance(resourceName, units, resourceValue));
|
||||||
|
return;
|
||||||
|
}
|
||||||
|
|
||||||
// map it based on key.
|
// map it based on key.
|
||||||
AbsoluteResourceType resType = AbsoluteResourceType
|
AbsoluteResourceType resType = AbsoluteResourceType
|
||||||
.valueOf(StringUtils.toUpperCase(splits[0].trim()));
|
.valueOf(StringUtils.toUpperCase(resourceName));
|
||||||
switch (resType) {
|
switch (resType) {
|
||||||
case MEMORY :
|
case MEMORY :
|
||||||
resource.setMemorySize(resourceValue);
|
resource.setMemorySize(resourceValue);
|
||||||
@ -2216,8 +2231,8 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|||||||
resource.setVirtualCores(resourceValue.intValue());
|
resource.setVirtualCores(resourceValue.intValue());
|
||||||
break;
|
break;
|
||||||
default :
|
default :
|
||||||
resource.setResourceInformation(splits[0].trim(), ResourceInformation
|
resource.setResourceInformation(resourceName, ResourceInformation
|
||||||
.newInstance(splits[0].trim(), units, resourceValue));
|
.newInstance(resourceName, units, resourceValue));
|
||||||
break;
|
break;
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -19,6 +19,7 @@
|
|||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||||
|
|
||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
|
import org.apache.hadoop.thirdparty.com.google.common.collect.Maps;
|
||||||
import org.apache.hadoop.yarn.api.records.ContainerId;
|
import org.apache.hadoop.yarn.api.records.ContainerId;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
@ -50,10 +51,16 @@ import org.junit.Test;
|
|||||||
|
|
||||||
import java.io.File;
|
import java.io.File;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
import java.util.Set;
|
||||||
|
import java.util.Map;
|
||||||
|
import java.util.Arrays;
|
||||||
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.yarn.api.records.ResourceInformation.FPGA_URI;
|
||||||
import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI;
|
import static org.apache.hadoop.yarn.api.records.ResourceInformation.GPU_URI;
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_MB;
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAXIMUM_ALLOCATION_MB;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
@ -248,4 +255,79 @@ public class TestCSAllocateCustomResource {
|
|||||||
.get(GPU_URI)).longValue(), 0);
|
.get(GPU_URI)).longValue(), 0);
|
||||||
ClusterMetrics.destroy();
|
ClusterMetrics.destroy();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
/**
|
||||||
|
* Test CS absolute conf with Custom resource type.
|
||||||
|
* */
|
||||||
|
@Test
|
||||||
|
public void testCapacitySchedulerAbsoluteConfWithCustomResourceType()
|
||||||
|
throws IOException {
|
||||||
|
// reset resource types
|
||||||
|
ResourceUtils.resetResourceTypes();
|
||||||
|
String resourceTypesFileName = "resource-types-test.xml";
|
||||||
|
File source = new File(
|
||||||
|
conf.getClassLoader().getResource(resourceTypesFileName).getFile());
|
||||||
|
resourceTypesFile = new File(source.getParent(), "resource-types.xml");
|
||||||
|
FileUtils.copyFile(source, resourceTypesFile);
|
||||||
|
|
||||||
|
CapacitySchedulerConfiguration newConf =
|
||||||
|
new CapacitySchedulerConfiguration(conf);
|
||||||
|
|
||||||
|
// Only memory vcores for first class.
|
||||||
|
Set<String> resourceTypes = Arrays.
|
||||||
|
stream(CapacitySchedulerConfiguration.
|
||||||
|
AbsoluteResourceType.values()).
|
||||||
|
map(value -> value.toString().toLowerCase()).
|
||||||
|
collect(Collectors.toSet());
|
||||||
|
|
||||||
|
Map<String, Long> valuesMin = Maps.newHashMap();
|
||||||
|
valuesMin.put(GPU_URI, 10L);
|
||||||
|
valuesMin.put(FPGA_URI, 10L);
|
||||||
|
valuesMin.put("testType", 10L);
|
||||||
|
|
||||||
|
Map<String, Long> valuesMax = Maps.newHashMap();
|
||||||
|
valuesMax.put(GPU_URI, 100L);
|
||||||
|
valuesMax.put(FPGA_URI, 100L);
|
||||||
|
valuesMax.put("testType", 100L);
|
||||||
|
|
||||||
|
Resource aMINRES =
|
||||||
|
Resource.newInstance(1000, 10, valuesMin);
|
||||||
|
|
||||||
|
Resource aMAXRES =
|
||||||
|
Resource.newInstance(1000, 10, valuesMax);
|
||||||
|
|
||||||
|
// Define top-level queues
|
||||||
|
newConf.setQueues(CapacitySchedulerConfiguration.ROOT,
|
||||||
|
new String[] {"a", "b", "c"});
|
||||||
|
newConf.setMinimumResourceRequirement("", "root.a",
|
||||||
|
aMINRES);
|
||||||
|
newConf.setMaximumResourceRequirement("", "root.a",
|
||||||
|
aMAXRES);
|
||||||
|
|
||||||
|
newConf.setClass(CapacitySchedulerConfiguration.RESOURCE_CALCULATOR_CLASS,
|
||||||
|
DominantResourceCalculator.class, ResourceCalculator.class);
|
||||||
|
|
||||||
|
//start RM
|
||||||
|
MockRM rm = new MockRM(newConf);
|
||||||
|
rm.start();
|
||||||
|
|
||||||
|
// Check the gpu resource conf is right.
|
||||||
|
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||||
|
Assert.assertEquals(aMINRES,
|
||||||
|
cs.getConfiguration().
|
||||||
|
getMinimumResourceRequirement("", "root.a", resourceTypes));
|
||||||
|
Assert.assertEquals(aMAXRES,
|
||||||
|
cs.getConfiguration().
|
||||||
|
getMaximumResourceRequirement("", "root.a", resourceTypes));
|
||||||
|
|
||||||
|
// Check the gpu resource of queue is right.
|
||||||
|
Assert.assertEquals(aMINRES, cs.getQueue("root.a").
|
||||||
|
getQueueResourceQuotas().getConfiguredMinResource());
|
||||||
|
Assert.assertEquals(aMAXRES, cs.getQueue("root.a").
|
||||||
|
getQueueResourceQuotas().getConfiguredMaxResource());
|
||||||
|
|
||||||
|
rm.close();
|
||||||
|
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
@ -17,6 +17,6 @@
|
|||||||
<configuration>
|
<configuration>
|
||||||
<property>
|
<property>
|
||||||
<name>yarn.resource-types</name>
|
<name>yarn.resource-types</name>
|
||||||
<value>yarn.io/gpu</value>
|
<value>yarn.io/gpu,yarn.io/fpga,testType</value>
|
||||||
</property>
|
</property>
|
||||||
</configuration>
|
</configuration>
|
Loading…
x
Reference in New Issue
Block a user