YARN-11416. FS2CS should use CapacitySchedulerConfiguration in FSQueueConverterBuilder (#5320)
Co-authored-by: Susheel Gupta <38013283+susheelgupta7@users.noreply.github.com>
This commit is contained in:
parent
600dd45e47
commit
8eb58630ec
@ -501,6 +501,10 @@ public int getMaximumSystemApplications() {
|
|||||||
return maxApplications;
|
return maxApplications;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setMaximumApplicationMasterResourcePercent(float percent) {
|
||||||
|
setFloat(PREFIX + MAXIMUM_AM_RESOURCE_SUFFIX, percent);
|
||||||
|
}
|
||||||
|
|
||||||
public float getMaximumApplicationMasterResourcePercent() {
|
public float getMaximumApplicationMasterResourcePercent() {
|
||||||
return getFloat(MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
|
return getFloat(MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
|
||||||
DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT);
|
DEFAULT_MAXIMUM_APPLICATIONMASTERS_RESOURCE_PERCENT);
|
||||||
@ -1222,6 +1226,16 @@ public void reinitializeConfigurationProperties() {
|
|||||||
configurationProperties = new ConfigurationProperties(props);
|
configurationProperties = new ConfigurationProperties(props);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setQueueMaximumAllocationMb(String queue, int value) {
|
||||||
|
String queuePrefix = getQueuePrefix(queue);
|
||||||
|
setInt(queuePrefix + MAXIMUM_ALLOCATION_MB, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setQueueMaximumAllocationVcores(String queue, int value) {
|
||||||
|
String queuePrefix = getQueuePrefix(queue);
|
||||||
|
setInt(queuePrefix + MAXIMUM_ALLOCATION_VCORES, value);
|
||||||
|
}
|
||||||
|
|
||||||
public long getQueueMaximumAllocationMb(String queue) {
|
public long getQueueMaximumAllocationMb(String queue) {
|
||||||
String queuePrefix = getQueuePrefix(queue);
|
String queuePrefix = getQueuePrefix(queue);
|
||||||
return getInt(queuePrefix + MAXIMUM_ALLOCATION_MB, (int)UNDEFINED);
|
return getInt(queuePrefix + MAXIMUM_ALLOCATION_MB, (int)UNDEFINED);
|
||||||
@ -1496,6 +1510,14 @@ public List<MappingRule> parseJSONMappingRules() throws IOException {
|
|||||||
return new ArrayList<>();
|
return new ArrayList<>();
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setMappingRuleFormat(String format) {
|
||||||
|
set(MAPPING_RULE_FORMAT, format);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMappingRuleJson(String json) {
|
||||||
|
set(MAPPING_RULE_JSON, json);
|
||||||
|
}
|
||||||
|
|
||||||
public List<MappingRule> getMappingRules() throws IOException {
|
public List<MappingRule> getMappingRules() throws IOException {
|
||||||
String mappingFormat =
|
String mappingFormat =
|
||||||
get(MAPPING_RULE_FORMAT, MAPPING_RULE_FORMAT_DEFAULT);
|
get(MAPPING_RULE_FORMAT, MAPPING_RULE_FORMAT_DEFAULT);
|
||||||
@ -1712,6 +1734,14 @@ public boolean getIntraQueuePreemptionDisabled(String queue,
|
|||||||
+ QUEUE_PREEMPTION_DISABLED, defaultVal);
|
+ QUEUE_PREEMPTION_DISABLED, defaultVal);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public void setPreemptionObserveOnly(boolean value) {
|
||||||
|
setBoolean(PREEMPTION_OBSERVE_ONLY, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean getPreemptionObserveOnly() {
|
||||||
|
return getBoolean(PREEMPTION_OBSERVE_ONLY, DEFAULT_PREEMPTION_OBSERVE_ONLY);
|
||||||
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Get configured node labels in a given queuePath.
|
* Get configured node labels in a given queuePath.
|
||||||
*
|
*
|
||||||
@ -1816,29 +1846,48 @@ public static boolean shouldAppFailFast(Configuration conf) {
|
|||||||
return conf.getBoolean(APP_FAIL_FAST, DEFAULT_APP_FAIL_FAST);
|
return conf.getBoolean(APP_FAIL_FAST, DEFAULT_APP_FAIL_FAST);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Integer getMaxParallelAppsForQueue(String queue) {
|
public void setDefaultMaxParallelApps(int value) {
|
||||||
int defaultMaxParallelAppsForQueue =
|
setInt(PREFIX + MAX_PARALLEL_APPLICATIONS, value);
|
||||||
getInt(PREFIX + MAX_PARALLEL_APPLICATIONS,
|
}
|
||||||
|
|
||||||
|
public Integer getDefaultMaxParallelApps() {
|
||||||
|
return getInt(PREFIX + MAX_PARALLEL_APPLICATIONS,
|
||||||
DEFAULT_MAX_PARALLEL_APPLICATIONS);
|
DEFAULT_MAX_PARALLEL_APPLICATIONS);
|
||||||
|
}
|
||||||
|
|
||||||
String maxParallelAppsForQueue = get(getQueuePrefix(queue)
|
public void setDefaultMaxParallelAppsPerUser(int value) {
|
||||||
+ MAX_PARALLEL_APPLICATIONS);
|
setInt(PREFIX + "user." + MAX_PARALLEL_APPLICATIONS, value);
|
||||||
|
}
|
||||||
|
|
||||||
return (maxParallelAppsForQueue != null) ?
|
public Integer getDefaultMaxParallelAppsPerUser() {
|
||||||
Integer.parseInt(maxParallelAppsForQueue)
|
return getInt(PREFIX + "user." + MAX_PARALLEL_APPLICATIONS,
|
||||||
: defaultMaxParallelAppsForQueue;
|
DEFAULT_MAX_PARALLEL_APPLICATIONS);
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxParallelAppsForUser(String user, int value) {
|
||||||
|
setInt(getUserPrefix(user) + MAX_PARALLEL_APPLICATIONS, value);
|
||||||
}
|
}
|
||||||
|
|
||||||
public Integer getMaxParallelAppsForUser(String user) {
|
public Integer getMaxParallelAppsForUser(String user) {
|
||||||
int defaultMaxParallelAppsForUser =
|
|
||||||
getInt(PREFIX + "user." + MAX_PARALLEL_APPLICATIONS,
|
|
||||||
DEFAULT_MAX_PARALLEL_APPLICATIONS);
|
|
||||||
String maxParallelAppsForUser = get(getUserPrefix(user)
|
String maxParallelAppsForUser = get(getUserPrefix(user)
|
||||||
+ MAX_PARALLEL_APPLICATIONS);
|
+ MAX_PARALLEL_APPLICATIONS);
|
||||||
|
|
||||||
return (maxParallelAppsForUser != null) ?
|
return (maxParallelAppsForUser != null) ?
|
||||||
Integer.parseInt(maxParallelAppsForUser)
|
Integer.valueOf(maxParallelAppsForUser)
|
||||||
: defaultMaxParallelAppsForUser;
|
: getDefaultMaxParallelAppsPerUser();
|
||||||
|
}
|
||||||
|
|
||||||
|
public void setMaxParallelAppsForQueue(String queue, String value) {
|
||||||
|
set(getQueuePrefix(queue) + MAX_PARALLEL_APPLICATIONS, value);
|
||||||
|
}
|
||||||
|
|
||||||
|
public Integer getMaxParallelAppsForQueue(String queue) {
|
||||||
|
String maxParallelAppsForQueue = get(getQueuePrefix(queue)
|
||||||
|
+ MAX_PARALLEL_APPLICATIONS);
|
||||||
|
|
||||||
|
return (maxParallelAppsForQueue != null) ?
|
||||||
|
Integer.valueOf(maxParallelAppsForQueue)
|
||||||
|
: getDefaultMaxParallelApps();
|
||||||
}
|
}
|
||||||
|
|
||||||
public boolean getAllowZeroCapacitySum(String queue) {
|
public boolean getAllowZeroCapacitySum(String queue) {
|
||||||
|
@ -16,9 +16,6 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
|
||||||
|
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
|
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT;
|
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAPPING_RULE_JSON;
|
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT_JSON;
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT_JSON;
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSQueueConverter.QUEUE_MAX_AM_SHARE_DISABLED;
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSQueueConverter.QUEUE_MAX_AM_SHARE_DISABLED;
|
||||||
|
|
||||||
@ -35,6 +32,7 @@
|
|||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.fs.Path;
|
import org.apache.hadoop.fs.Path;
|
||||||
import org.apache.hadoop.security.authorize.AccessControlList;
|
import org.apache.hadoop.security.authorize.AccessControlList;
|
||||||
|
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.security.AccessType;
|
import org.apache.hadoop.yarn.security.AccessType;
|
||||||
@ -342,14 +340,13 @@ private void performRuleConversion(FairScheduler fs)
|
|||||||
}
|
}
|
||||||
writer.writeValue(mappingRulesOutputStream, desc);
|
writer.writeValue(mappingRulesOutputStream, desc);
|
||||||
|
|
||||||
capacitySchedulerConfig.set(MAPPING_RULE_FORMAT,
|
capacitySchedulerConfig.setMappingRuleFormat(MAPPING_RULE_FORMAT_JSON);
|
||||||
MAPPING_RULE_FORMAT_JSON);
|
|
||||||
capacitySchedulerConfig.setOverrideWithQueueMappings(true);
|
capacitySchedulerConfig.setOverrideWithQueueMappings(true);
|
||||||
if (!rulesToFile) {
|
if (!rulesToFile) {
|
||||||
String json =
|
String json =
|
||||||
((ByteArrayOutputStream)mappingRulesOutputStream)
|
((ByteArrayOutputStream)mappingRulesOutputStream)
|
||||||
.toString(StandardCharsets.UTF_8.displayName());
|
.toString(StandardCharsets.UTF_8.displayName());
|
||||||
capacitySchedulerConfig.set(MAPPING_RULE_JSON, json);
|
capacitySchedulerConfig.setMappingRuleJson(json);
|
||||||
}
|
}
|
||||||
} else {
|
} else {
|
||||||
LOG.info("No rules to convert");
|
LOG.info("No rules to convert");
|
||||||
@ -377,38 +374,31 @@ private OutputStream getOutputStreamForJson() throws FileNotFoundException {
|
|||||||
|
|
||||||
private void emitDefaultQueueMaxParallelApplications() {
|
private void emitDefaultQueueMaxParallelApplications() {
|
||||||
if (queueMaxAppsDefault != Integer.MAX_VALUE) {
|
if (queueMaxAppsDefault != Integer.MAX_VALUE) {
|
||||||
capacitySchedulerConfig.set(
|
capacitySchedulerConfig.setDefaultMaxParallelApps(
|
||||||
PREFIX + "max-parallel-apps",
|
queueMaxAppsDefault);
|
||||||
String.valueOf(queueMaxAppsDefault));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void emitDefaultUserMaxParallelApplications() {
|
private void emitDefaultUserMaxParallelApplications() {
|
||||||
if (userMaxAppsDefault != Integer.MAX_VALUE) {
|
if (userMaxAppsDefault != Integer.MAX_VALUE) {
|
||||||
capacitySchedulerConfig.set(
|
capacitySchedulerConfig.setDefaultMaxParallelAppsPerUser(
|
||||||
PREFIX + "user.max-parallel-apps",
|
userMaxAppsDefault);
|
||||||
String.valueOf(userMaxAppsDefault));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private void emitUserMaxParallelApplications() {
|
private void emitUserMaxParallelApplications() {
|
||||||
userMaxApps
|
userMaxApps
|
||||||
.forEach((user, apps) -> {
|
.forEach((user, apps) -> {
|
||||||
capacitySchedulerConfig.setInt(
|
capacitySchedulerConfig.setMaxParallelAppsForUser(user, apps);
|
||||||
PREFIX + "user." + user + ".max-parallel-apps", apps);
|
|
||||||
});
|
});
|
||||||
}
|
}
|
||||||
|
|
||||||
private void emitDefaultMaxAMShare() {
|
private void emitDefaultMaxAMShare() {
|
||||||
if (queueMaxAMShareDefault == QUEUE_MAX_AM_SHARE_DISABLED) {
|
if (queueMaxAMShareDefault == QUEUE_MAX_AM_SHARE_DISABLED) {
|
||||||
capacitySchedulerConfig.setFloat(
|
capacitySchedulerConfig.setMaximumApplicationMasterResourcePercent(
|
||||||
CapacitySchedulerConfiguration.
|
|
||||||
MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
|
|
||||||
1.0f);
|
1.0f);
|
||||||
} else {
|
} else {
|
||||||
capacitySchedulerConfig.setFloat(
|
capacitySchedulerConfig.setMaximumApplicationMasterResourcePercent(
|
||||||
CapacitySchedulerConfiguration.
|
|
||||||
MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT,
|
|
||||||
queueMaxAMShareDefault);
|
queueMaxAMShareDefault);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -416,8 +406,7 @@ private void emitDisablePreemptionForObserveOnlyMode() {
|
|||||||
if (preemptionMode == FSConfigToCSConfigConverterParams
|
if (preemptionMode == FSConfigToCSConfigConverterParams
|
||||||
.PreemptionMode.OBSERVE_ONLY) {
|
.PreemptionMode.OBSERVE_ONLY) {
|
||||||
capacitySchedulerConfig.
|
capacitySchedulerConfig.
|
||||||
setBoolean(CapacitySchedulerConfiguration.
|
setPreemptionObserveOnly(true);
|
||||||
PREEMPTION_OBSERVE_ONLY, true);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -433,13 +422,13 @@ private void generateQueueAcl(String queue,
|
|||||||
|
|
||||||
if (!submitAcls.getGroups().isEmpty() ||
|
if (!submitAcls.getGroups().isEmpty() ||
|
||||||
!submitAcls.getUsers().isEmpty() || submitAcls.isAllAllowed()) {
|
!submitAcls.getUsers().isEmpty() || submitAcls.isAllAllowed()) {
|
||||||
capacitySchedulerConfig.set(PREFIX + queue + ".acl_submit_applications",
|
capacitySchedulerConfig.setAcl(queue, QueueACL.SUBMIT_APPLICATIONS,
|
||||||
submitAcls.getAclString());
|
submitAcls.getAclString());
|
||||||
}
|
}
|
||||||
|
|
||||||
if (!adminAcls.getGroups().isEmpty() ||
|
if (!adminAcls.getGroups().isEmpty() ||
|
||||||
!adminAcls.getUsers().isEmpty() || adminAcls.isAllAllowed()) {
|
!adminAcls.getUsers().isEmpty() || adminAcls.isAllAllowed()) {
|
||||||
capacitySchedulerConfig.set(PREFIX + queue + ".acl_administer_queue",
|
capacitySchedulerConfig.setAcl(queue, QueueACL.ADMINISTER_QUEUE,
|
||||||
adminAcls.getAclString());
|
adminAcls.getAclString());
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -501,7 +490,7 @@ Configuration getYarnSiteConfig() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
Configuration getCapacitySchedulerConfig() {
|
CapacitySchedulerConfiguration getCapacitySchedulerConfig() {
|
||||||
return capacitySchedulerConfig;
|
return capacitySchedulerConfig;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -16,16 +16,11 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
|
||||||
|
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.AUTO_QUEUE_CREATION_V2_ENABLED;
|
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DEFAULT_AUTO_QUEUE_CREATION_ENABLED;
|
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DOT;
|
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.USER_LIMIT_FACTOR;
|
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.ConfigurableResource;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.ConfigurableResource;
|
||||||
@ -50,7 +45,7 @@ public class FSQueueConverter {
|
|||||||
private static final String FIFO_POLICY = "fifo";
|
private static final String FIFO_POLICY = "fifo";
|
||||||
|
|
||||||
private final FSConfigToCSConfigRuleHandler ruleHandler;
|
private final FSConfigToCSConfigRuleHandler ruleHandler;
|
||||||
private Configuration capacitySchedulerConfig;
|
private CapacitySchedulerConfiguration capacitySchedulerConfig;
|
||||||
private final boolean preemptionEnabled;
|
private final boolean preemptionEnabled;
|
||||||
private final boolean sizeBasedWeight;
|
private final boolean sizeBasedWeight;
|
||||||
@SuppressWarnings("unused")
|
@SuppressWarnings("unused")
|
||||||
@ -106,11 +101,10 @@ private void emitChildQueues(String queueName, List<FSQueue> children) {
|
|||||||
ruleHandler.handleChildQueueCount(queueName, children.size());
|
ruleHandler.handleChildQueueCount(queueName, children.size());
|
||||||
|
|
||||||
if (children.size() > 0) {
|
if (children.size() > 0) {
|
||||||
String childQueues = children.stream()
|
List<String> childQueues = children.stream()
|
||||||
.map(child -> getQueueShortName(child.getName()))
|
.map(child -> getQueueShortName(child.getName()))
|
||||||
.collect(Collectors.joining(","));
|
.collect(Collectors.toList());
|
||||||
|
capacitySchedulerConfig.setQueues(queueName, childQueues.toArray(new String[0]));
|
||||||
capacitySchedulerConfig.set(PREFIX + queueName + ".queues", childQueues);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -127,14 +121,14 @@ private void emitMaxAMShare(String queueName, FSQueue queue) {
|
|||||||
if (queueMaxAmShare != 0.0f
|
if (queueMaxAmShare != 0.0f
|
||||||
&& queueMaxAmShare != queueMaxAMShareDefault
|
&& queueMaxAmShare != queueMaxAMShareDefault
|
||||||
&& queueMaxAmShare != QUEUE_MAX_AM_SHARE_DISABLED) {
|
&& queueMaxAmShare != QUEUE_MAX_AM_SHARE_DISABLED) {
|
||||||
capacitySchedulerConfig.setFloat(PREFIX + queueName +
|
capacitySchedulerConfig.setMaximumApplicationMasterResourcePerQueuePercent(
|
||||||
".maximum-am-resource-percent", queueMaxAmShare);
|
queueName, queueMaxAmShare);
|
||||||
}
|
}
|
||||||
|
|
||||||
if (queueMaxAmShare == QUEUE_MAX_AM_SHARE_DISABLED
|
if (queueMaxAmShare == QUEUE_MAX_AM_SHARE_DISABLED
|
||||||
&& queueMaxAmShare != queueMaxAMShareDefault) {
|
&& queueMaxAmShare != queueMaxAMShareDefault) {
|
||||||
capacitySchedulerConfig.setFloat(PREFIX + queueName +
|
capacitySchedulerConfig.setMaximumApplicationMasterResourcePerQueuePercent(
|
||||||
".maximum-am-resource-percent", 1.0f);
|
queueName, 1.0f);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -147,7 +141,7 @@ private void emitMaxAMShare(String queueName, FSQueue queue) {
|
|||||||
private void emitMaxParallelApps(String queueName, FSQueue queue) {
|
private void emitMaxParallelApps(String queueName, FSQueue queue) {
|
||||||
if (queue.getMaxRunningApps() != MAX_RUNNING_APPS_UNSET
|
if (queue.getMaxRunningApps() != MAX_RUNNING_APPS_UNSET
|
||||||
&& queue.getMaxRunningApps() != queueMaxAppsDefault) {
|
&& queue.getMaxRunningApps() != queueMaxAppsDefault) {
|
||||||
capacitySchedulerConfig.set(PREFIX + queueName + ".max-parallel-apps",
|
capacitySchedulerConfig.setMaxParallelAppsForQueue(queueName,
|
||||||
String.valueOf(queue.getMaxRunningApps()));
|
String.valueOf(queue.getMaxRunningApps()));
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -167,8 +161,8 @@ private void emitMaximumCapacity(String queueName, FSQueue queue) {
|
|||||||
ruleHandler.handleMaxResources();
|
ruleHandler.handleMaxResources();
|
||||||
}
|
}
|
||||||
|
|
||||||
capacitySchedulerConfig.set(PREFIX + queueName + ".maximum-capacity",
|
capacitySchedulerConfig.setMaximumCapacity(queueName,
|
||||||
"100");
|
100.0f);
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
@ -182,7 +176,7 @@ private void emitMaxAllocations(String queueName, FSQueue queue) {
|
|||||||
Resource maxAllocation = queue.getMaximumContainerAllocation();
|
Resource maxAllocation = queue.getMaximumContainerAllocation();
|
||||||
|
|
||||||
if (isNotUnboundedResource(maxAllocation)) {
|
if (isNotUnboundedResource(maxAllocation)) {
|
||||||
long parentMaxVcores = Integer.MIN_VALUE;
|
int parentMaxVcores = Integer.MIN_VALUE;
|
||||||
long parentMaxMemory = Integer.MIN_VALUE;
|
long parentMaxMemory = Integer.MIN_VALUE;
|
||||||
|
|
||||||
if (queue.getParent() != null) {
|
if (queue.getParent() != null) {
|
||||||
@ -194,16 +188,16 @@ private void emitMaxAllocations(String queueName, FSQueue queue) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
long maxVcores = maxAllocation.getVirtualCores();
|
int maxVcores = maxAllocation.getVirtualCores();
|
||||||
long maxMemory = maxAllocation.getMemorySize();
|
long maxMemory = maxAllocation.getMemorySize();
|
||||||
|
|
||||||
// only emit max allocation if it differs from the parent's setting
|
// only emit max allocation if it differs from the parent's setting
|
||||||
if (maxVcores != parentMaxVcores || maxMemory != parentMaxMemory) {
|
if (maxVcores != parentMaxVcores || maxMemory != parentMaxMemory) {
|
||||||
capacitySchedulerConfig.set(PREFIX + queueName +
|
capacitySchedulerConfig.setQueueMaximumAllocationMb(
|
||||||
".maximum-allocation-mb", String.valueOf(maxMemory));
|
queueName, (int) maxMemory);
|
||||||
|
|
||||||
capacitySchedulerConfig.set(PREFIX + queueName +
|
capacitySchedulerConfig.setQueueMaximumAllocationVcores(
|
||||||
".maximum-allocation-vcores", String.valueOf(maxVcores));
|
queueName, maxVcores);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@ -216,17 +210,14 @@ private void emitMaxAllocations(String queueName, FSQueue queue) {
|
|||||||
*/
|
*/
|
||||||
private void emitPreemptionDisabled(String queueName, FSQueue queue) {
|
private void emitPreemptionDisabled(String queueName, FSQueue queue) {
|
||||||
if (preemptionEnabled && !queue.isPreemptable()) {
|
if (preemptionEnabled && !queue.isPreemptable()) {
|
||||||
capacitySchedulerConfig.set(PREFIX + queueName + ".disable_preemption",
|
capacitySchedulerConfig.setPreemptionDisabled(queueName, true);
|
||||||
"true");
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
public void emitDefaultUserLimitFactor(String queueName, List<FSQueue> children) {
|
public void emitDefaultUserLimitFactor(String queueName, List<FSQueue> children) {
|
||||||
if (children.isEmpty() && checkAutoQueueCreationV2Disabled(queueName)) {
|
if (children.isEmpty() &&
|
||||||
capacitySchedulerConfig.setFloat(
|
!capacitySchedulerConfig.isAutoQueueCreationV2Enabled(queueName)) {
|
||||||
CapacitySchedulerConfiguration.
|
capacitySchedulerConfig.setUserLimitFactor(queueName, -1.0f);
|
||||||
PREFIX + queueName + DOT + USER_LIMIT_FACTOR,
|
|
||||||
-1.0f);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -255,19 +246,16 @@ private void emitOrderingPolicy(String queueName, FSQueue queue) {
|
|||||||
|
|
||||||
switch (policy) {
|
switch (policy) {
|
||||||
case DominantResourceFairnessPolicy.NAME:
|
case DominantResourceFairnessPolicy.NAME:
|
||||||
capacitySchedulerConfig.set(PREFIX + queueName
|
capacitySchedulerConfig.setOrderingPolicy(queueName, FAIR_POLICY);
|
||||||
+ ".ordering-policy", FAIR_POLICY);
|
|
||||||
break;
|
break;
|
||||||
case FairSharePolicy.NAME:
|
case FairSharePolicy.NAME:
|
||||||
capacitySchedulerConfig.set(PREFIX + queueName
|
capacitySchedulerConfig.setOrderingPolicy(queueName, FAIR_POLICY);
|
||||||
+ ".ordering-policy", FAIR_POLICY);
|
|
||||||
if (drfUsed) {
|
if (drfUsed) {
|
||||||
ruleHandler.handleFairAsDrf(queueName);
|
ruleHandler.handleFairAsDrf(queueName);
|
||||||
}
|
}
|
||||||
break;
|
break;
|
||||||
case FifoPolicy.NAME:
|
case FifoPolicy.NAME:
|
||||||
capacitySchedulerConfig.set(PREFIX + queueName
|
capacitySchedulerConfig.setOrderingPolicy(queueName, FIFO_POLICY);
|
||||||
+ ".ordering-policy", FIFO_POLICY);
|
|
||||||
break;
|
break;
|
||||||
default:
|
default:
|
||||||
String msg = String.format("Unexpected ordering policy " +
|
String msg = String.format("Unexpected ordering policy " +
|
||||||
@ -311,12 +299,6 @@ private void checkMaxChildCapacitySetting(FSQueue queue) {
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean checkAutoQueueCreationV2Disabled(String queueName) {
|
|
||||||
return !capacitySchedulerConfig.getBoolean(
|
|
||||||
PREFIX + queueName + DOT + AUTO_QUEUE_CREATION_V2_ENABLED,
|
|
||||||
DEFAULT_AUTO_QUEUE_CREATION_ENABLED);
|
|
||||||
}
|
|
||||||
|
|
||||||
private String getQueueShortName(String queueName) {
|
private String getQueueShortName(String queueName) {
|
||||||
int lastDot = queueName.lastIndexOf(".");
|
int lastDot = queueName.lastIndexOf(".");
|
||||||
return queueName.substring(lastDot + 1);
|
return queueName.substring(lastDot + 1);
|
||||||
|
@ -18,13 +18,13 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
|
|
||||||
@SuppressWarnings({"checkstyle:visibilitymodifier", "checkstyle:hiddenfield"})
|
@SuppressWarnings({"checkstyle:visibilitymodifier", "checkstyle:hiddenfield"})
|
||||||
public final class FSQueueConverterBuilder {
|
public final class FSQueueConverterBuilder {
|
||||||
FSConfigToCSConfigRuleHandler ruleHandler;
|
FSConfigToCSConfigRuleHandler ruleHandler;
|
||||||
Configuration capacitySchedulerConfig;
|
CapacitySchedulerConfiguration capacitySchedulerConfig;
|
||||||
boolean preemptionEnabled;
|
boolean preemptionEnabled;
|
||||||
boolean sizeBasedWeight;
|
boolean sizeBasedWeight;
|
||||||
Resource clusterResource;
|
Resource clusterResource;
|
||||||
@ -48,8 +48,8 @@ public FSQueueConverterBuilder withRuleHandler(
|
|||||||
}
|
}
|
||||||
|
|
||||||
public FSQueueConverterBuilder withCapacitySchedulerConfig(
|
public FSQueueConverterBuilder withCapacitySchedulerConfig(
|
||||||
Configuration config) {
|
CapacitySchedulerConfiguration capacitySchedulerConfig) {
|
||||||
this.capacitySchedulerConfig = config;
|
this.capacitySchedulerConfig = capacitySchedulerConfig;
|
||||||
return this;
|
return this;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -18,9 +18,9 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.weightconversion;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.weightconversion;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
|
||||||
|
|
||||||
public interface CapacityConverter {
|
public interface CapacityConverter {
|
||||||
void convertWeightsForChildQueues(FSQueue queue, Configuration csConfig);
|
void convertWeightsForChildQueues(FSQueue queue, CapacitySchedulerConfiguration csConfig);
|
||||||
}
|
}
|
||||||
|
@ -18,8 +18,6 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.weightconversion;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.weightconversion;
|
||||||
|
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
|
|
||||||
|
|
||||||
import java.math.BigDecimal;
|
import java.math.BigDecimal;
|
||||||
import java.math.RoundingMode;
|
import java.math.RoundingMode;
|
||||||
import java.util.Comparator;
|
import java.util.Comparator;
|
||||||
@ -29,8 +27,8 @@
|
|||||||
import java.util.stream.Collectors;
|
import java.util.stream.Collectors;
|
||||||
|
|
||||||
import org.apache.commons.lang3.tuple.Pair;
|
import org.apache.commons.lang3.tuple.Pair;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
|
||||||
import org.apache.hadoop.classification.VisibleForTesting;
|
import org.apache.hadoop.classification.VisibleForTesting;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
|
||||||
|
|
||||||
public class WeightToPercentConverter
|
public class WeightToPercentConverter
|
||||||
@ -41,7 +39,7 @@ public class WeightToPercentConverter
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void convertWeightsForChildQueues(FSQueue queue,
|
public void convertWeightsForChildQueues(FSQueue queue,
|
||||||
Configuration csConfig) {
|
CapacitySchedulerConfiguration csConfig) {
|
||||||
List<FSQueue> children = queue.getChildQueues();
|
List<FSQueue> children = queue.getChildQueues();
|
||||||
|
|
||||||
int totalWeight = getTotalWeight(children);
|
int totalWeight = getTotalWeight(children);
|
||||||
@ -52,13 +50,11 @@ public void convertWeightsForChildQueues(FSQueue queue,
|
|||||||
boolean shouldAllowZeroSumCapacity = result.getRight();
|
boolean shouldAllowZeroSumCapacity = result.getRight();
|
||||||
|
|
||||||
capacities
|
capacities
|
||||||
.forEach((key, value) -> csConfig.set(PREFIX + key +
|
.forEach((key, value) -> csConfig.setCapacity(key, value.toString()));
|
||||||
".capacity", value.toString()));
|
|
||||||
|
|
||||||
if (shouldAllowZeroSumCapacity) {
|
if (shouldAllowZeroSumCapacity) {
|
||||||
String queueName = queue.getName();
|
String queueName = queue.getName();
|
||||||
csConfig.setBoolean(
|
csConfig.setAllowZeroCapacitySum(queueName, true);
|
||||||
PREFIX + queueName + ".allow-zero-capacity-sum", true);
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -18,11 +18,9 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.weightconversion;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.weightconversion;
|
||||||
|
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
|
|
||||||
|
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSParentQueue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
|
||||||
|
|
||||||
@ -32,29 +30,17 @@ public class WeightToWeightConverter
|
|||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void convertWeightsForChildQueues(FSQueue queue,
|
public void convertWeightsForChildQueues(FSQueue queue,
|
||||||
Configuration csConfig) {
|
CapacitySchedulerConfiguration csConfig) {
|
||||||
List<FSQueue> children = queue.getChildQueues();
|
List<FSQueue> children = queue.getChildQueues();
|
||||||
|
|
||||||
if (queue instanceof FSParentQueue || !children.isEmpty()) {
|
if (queue instanceof FSParentQueue || !children.isEmpty()) {
|
||||||
if (queue.getName().equals(ROOT_QUEUE)) {
|
if (queue.getName().equals(ROOT_QUEUE)) {
|
||||||
csConfig.set(getProperty(queue), getWeightString(queue));
|
csConfig.setNonLabeledQueueWeight(queue.getName(), queue.getWeight());
|
||||||
}
|
}
|
||||||
|
|
||||||
children.forEach(fsQueue -> csConfig.set(
|
children.forEach(fsQueue -> csConfig.setNonLabeledQueueWeight(
|
||||||
getProperty(fsQueue), getWeightString(fsQueue)));
|
fsQueue.getName(), fsQueue.getWeight()));
|
||||||
csConfig.setBoolean(getAutoCreateV2EnabledProperty(queue), true);
|
csConfig.setAutoQueueCreationV2Enabled(queue.getName(), true);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
private String getProperty(FSQueue queue) {
|
|
||||||
return PREFIX + queue.getName() + ".capacity";
|
|
||||||
}
|
|
||||||
|
|
||||||
private String getAutoCreateV2EnabledProperty(FSQueue queue) {
|
|
||||||
return PREFIX + queue.getName() + ".auto-queue-creation-v2.enabled";
|
|
||||||
}
|
|
||||||
|
|
||||||
private String getWeightString(FSQueue queue) {
|
|
||||||
return Float.toString(queue.getWeight()) + "w";
|
|
||||||
}
|
|
||||||
}
|
}
|
||||||
|
@ -16,8 +16,6 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
|
||||||
|
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
|
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.USER_LIMIT_FACTOR;
|
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.DYNAMIC_MAX_ASSIGN;
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.DYNAMIC_MAX_ASSIGN;
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.MAX_CAPACITY_PERCENTAGE;
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.MAX_CAPACITY_PERCENTAGE;
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.MAX_CHILD_CAPACITY;
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.MAX_CHILD_CAPACITY;
|
||||||
@ -33,7 +31,6 @@
|
|||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.RuleAction.ABORT;
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.RuleAction.ABORT;
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.RuleAction.WARNING;
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter.FSConfigToCSConfigRuleHandler.RuleAction.WARNING;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNull;
|
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
import static org.junit.Assert.fail;
|
import static org.junit.Assert.fail;
|
||||||
import static org.mockito.ArgumentMatchers.any;
|
import static org.mockito.ArgumentMatchers.any;
|
||||||
@ -49,6 +46,7 @@
|
|||||||
import org.apache.commons.io.FileUtils;
|
import org.apache.commons.io.FileUtils;
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
import org.apache.hadoop.service.ServiceStateException;
|
import org.apache.hadoop.service.ServiceStateException;
|
||||||
|
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||||
import org.apache.hadoop.yarn.api.records.Resource;
|
import org.apache.hadoop.yarn.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
|
||||||
@ -172,41 +170,44 @@ private void createConverter() {
|
|||||||
public void testDefaultMaxAMShare() throws Exception {
|
public void testDefaultMaxAMShare() throws Exception {
|
||||||
converter.convert(config);
|
converter.convert(config);
|
||||||
|
|
||||||
Configuration conf = converter.getCapacitySchedulerConfig();
|
CapacitySchedulerConfiguration conf = converter.getCapacitySchedulerConfig();
|
||||||
String maxAmShare =
|
Float maxAmShare =
|
||||||
conf.get(CapacitySchedulerConfiguration.
|
conf.getMaximumApplicationMasterResourcePercent();
|
||||||
MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT);
|
|
||||||
|
|
||||||
assertEquals("Default max AM share", "0.16", maxAmShare);
|
assertEquals("Default max AM share", 0.16f, maxAmShare, 0.0f);
|
||||||
|
|
||||||
assertEquals("root.admins.alice max-am-resource-percent", "0.15",
|
assertEquals("root.admins.alice max-am-resource-percent", 0.15f,
|
||||||
conf.get(PREFIX + "root.admins.alice.maximum-am-resource-percent"));
|
conf.getMaximumApplicationMasterResourcePerQueuePercent("root.admins.alice"),
|
||||||
|
0.0f);
|
||||||
|
|
||||||
assertNull("root.users.joe maximum-am-resource-percent should be null",
|
//root.users.joe don’t have maximum-am-resource-percent set
|
||||||
conf.get(PREFIX + "root.users.joe.maximum-am-resource-percent"));
|
// so falling back to the global value
|
||||||
|
assertEquals("root.users.joe maximum-am-resource-percent", 0.16f,
|
||||||
|
conf.getMaximumApplicationMasterResourcePerQueuePercent("root.users.joe"),
|
||||||
|
0.0f);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDefaultUserLimitFactor() throws Exception {
|
public void testDefaultUserLimitFactor() throws Exception {
|
||||||
converter.convert(config);
|
converter.convert(config);
|
||||||
|
|
||||||
Configuration conf = converter.getCapacitySchedulerConfig();
|
CapacitySchedulerConfiguration conf = converter.getCapacitySchedulerConfig();
|
||||||
|
|
||||||
assertNull("root.users user-limit-factor should be null",
|
assertEquals("root.users user-limit-factor", 1.0f,
|
||||||
conf.get(PREFIX + "root.users." + USER_LIMIT_FACTOR));
|
conf.getUserLimitFactor("root.users"), 0.0f);
|
||||||
assertEquals("root.users auto-queue-creation-v2.enabled", "true",
|
assertEquals("root.users auto-queue-creation-v2.enabled", true,
|
||||||
conf.get(PREFIX + "root.users.auto-queue-creation-v2.enabled"));
|
conf.isAutoQueueCreationV2Enabled("root.users"));
|
||||||
|
|
||||||
assertEquals("root.default user-limit-factor", "-1.0",
|
assertEquals("root.default user-limit-factor", -1.0f,
|
||||||
conf.get(PREFIX + "root.default.user-limit-factor"));
|
conf.getUserLimitFactor("root.default"), 0.0f);
|
||||||
|
|
||||||
assertEquals("root.users.joe user-limit-factor", "-1.0",
|
assertEquals("root.users.joe user-limit-factor", -1.0f,
|
||||||
conf.get(PREFIX + "root.users.joe.user-limit-factor"));
|
conf.getUserLimitFactor("root.users.joe"), 0.0f);
|
||||||
|
|
||||||
assertEquals("root.admins.bob user-limit-factor", "-1.0",
|
assertEquals("root.admins.bob user-limit-factor", -1.0f,
|
||||||
conf.get(PREFIX + "root.admins.bob.user-limit-factor"));
|
conf.getUserLimitFactor("root.admins.bob"), 0.0f);
|
||||||
assertNull("root.admin.bob auto-queue-creation-v2.enabled should be null",
|
assertEquals("root.admin.bob auto-queue-creation-v2.enabled", false,
|
||||||
conf.get(PREFIX + "root.admin.bob.auto-queue-creation-v2.enabled"));
|
conf.isAutoQueueCreationV2Enabled("root.admin.bob"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -218,110 +219,109 @@ public void testDefaultMaxAMShareDisabled() throws Exception {
|
|||||||
|
|
||||||
converter.convert(params);
|
converter.convert(params);
|
||||||
|
|
||||||
Configuration conf = converter.getCapacitySchedulerConfig();
|
CapacitySchedulerConfiguration conf = converter.getCapacitySchedulerConfig();
|
||||||
|
|
||||||
// -1.0 means disabled ==> 1.0 in CS
|
// -1.0 means disabled ==> 1.0 in CS
|
||||||
assertEquals("Default max-am-resource-percent", "1.0",
|
assertEquals("Default max-am-resource-percent", 1.0f,
|
||||||
conf.get(CapacitySchedulerConfiguration.
|
conf.getMaximumApplicationMasterResourcePercent(), 0.0f);
|
||||||
MAXIMUM_APPLICATION_MASTERS_RESOURCE_PERCENT));
|
|
||||||
|
|
||||||
// root.admins.bob -1.0 equals to the default -1.0
|
// root.admins.bob is unset,so falling back to the global value
|
||||||
assertNull("root.admins.bob maximum-am-resource-percent should be null",
|
assertEquals("root.admins.bob maximum-am-resource-percent", 1.0f,
|
||||||
conf.get(PREFIX + "root.admins.bob.maximum-am-resource-percent"));
|
conf.getMaximumApplicationMasterResourcePerQueuePercent("root.admins.bob"), 0.0f);
|
||||||
|
|
||||||
// root.admins.alice 0.15 != -1.0
|
// root.admins.alice 0.15 != -1.0
|
||||||
assertEquals("root.admins.alice max-am-resource-percent", "0.15",
|
assertEquals("root.admins.alice max-am-resource-percent", 0.15f,
|
||||||
conf.get(PREFIX + "root.admins.alice.maximum-am-resource-percent"));
|
conf.getMaximumApplicationMasterResourcePerQueuePercent("root.admins.alice"), 0.0f);
|
||||||
|
|
||||||
// root.users.joe is unset, inherits -1.0
|
// root.users.joe is unset,so falling back to the global value
|
||||||
assertNull("root.users.joe maximum-am-resource-percent should be null",
|
assertEquals("root.users.joe maximum-am-resource-percent", 1.0f,
|
||||||
conf.get(PREFIX + "root.users.joe.maximum-am-resource-percent"));
|
conf.getMaximumApplicationMasterResourcePerQueuePercent("root.users.joe"), 0.0f);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testConvertACLs() throws Exception {
|
public void testConvertACLs() throws Exception {
|
||||||
converter.convert(config);
|
converter.convert(config);
|
||||||
|
|
||||||
Configuration conf = converter.getCapacitySchedulerConfig();
|
CapacitySchedulerConfiguration conf = converter.getCapacitySchedulerConfig();
|
||||||
|
|
||||||
// root
|
// root
|
||||||
assertEquals("root submit ACL", "alice,bob,joe,john hadoop_users",
|
assertEquals("root submit ACL", "alice,bob,joe,john hadoop_users",
|
||||||
conf.get(PREFIX + "root.acl_submit_applications"));
|
conf.getAcl("root", QueueACL.SUBMIT_APPLICATIONS).getAclString());
|
||||||
assertEquals("root admin ACL", "alice,bob,joe,john hadoop_users",
|
assertEquals("root admin ACL", "alice,bob,joe,john hadoop_users",
|
||||||
conf.get(PREFIX + "root.acl_administer_queue"));
|
conf.getAcl("root", QueueACL.ADMINISTER_QUEUE).getAclString());
|
||||||
|
|
||||||
// root.admins.bob
|
// root.admins.bob
|
||||||
assertEquals("root.admins.bob submit ACL", "bob ",
|
assertEquals("root.admins.bob submit ACL", "bob ",
|
||||||
conf.get(PREFIX + "root.admins.bob.acl_submit_applications"));
|
conf.getAcl("root.admins.bob", QueueACL.SUBMIT_APPLICATIONS).getAclString());
|
||||||
assertEquals("root.admins.bob admin ACL", "bob ",
|
assertEquals("root.admins.bob admin ACL", "bob ",
|
||||||
conf.get(PREFIX + "root.admins.bob.acl_administer_queue"));
|
conf.getAcl("root.admins.bob", QueueACL.ADMINISTER_QUEUE).getAclString());
|
||||||
|
|
||||||
// root.admins.alice
|
// root.admins.alice
|
||||||
assertEquals("root.admins.alice submit ACL", "alice ",
|
assertEquals("root.admins.alice submit ACL", "alice ",
|
||||||
conf.get(PREFIX + "root.admins.alice.acl_submit_applications"));
|
conf.getAcl("root.admins.alice", QueueACL.SUBMIT_APPLICATIONS).getAclString());
|
||||||
assertEquals("root.admins.alice admin ACL", "alice ",
|
assertEquals("root.admins.alice admin ACL", "alice ",
|
||||||
conf.get(PREFIX + "root.admins.alice.acl_administer_queue"));
|
conf.getAcl("root.admins.alice", QueueACL.ADMINISTER_QUEUE).getAclString());
|
||||||
|
|
||||||
// root.users.john
|
// root.users.john
|
||||||
assertEquals("root.users.john submit ACL", "*",
|
assertEquals("root.users.john submit ACL", "*",
|
||||||
conf.get(PREFIX + "root.users.john.acl_submit_applications"));
|
conf.getAcl("root.users.john", QueueACL.SUBMIT_APPLICATIONS).getAclString());
|
||||||
assertEquals("root.users.john admin ACL", "*",
|
assertEquals("root.users.john admin ACL", "*",
|
||||||
conf.get(PREFIX + "root.users.john.acl_administer_queue"));
|
conf.getAcl("root.users.john", QueueACL.ADMINISTER_QUEUE).getAclString());
|
||||||
|
|
||||||
// root.users.joe
|
// root.users.joe
|
||||||
assertEquals("root.users.joe submit ACL", "joe ",
|
assertEquals("root.users.joe submit ACL", "joe ",
|
||||||
conf.get(PREFIX + "root.users.joe.acl_submit_applications"));
|
conf.getAcl("root.users.joe", QueueACL.SUBMIT_APPLICATIONS).getAclString());
|
||||||
assertEquals("root.users.joe admin ACL", "joe ",
|
assertEquals("root.users.joe admin ACL", "joe ",
|
||||||
conf.get(PREFIX + "root.users.joe.acl_administer_queue"));
|
conf.getAcl("root.users.joe", QueueACL.ADMINISTER_QUEUE).getAclString());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDefaultQueueMaxParallelApps() throws Exception {
|
public void testDefaultQueueMaxParallelApps() throws Exception {
|
||||||
converter.convert(config);
|
converter.convert(config);
|
||||||
|
|
||||||
Configuration conf = converter.getCapacitySchedulerConfig();
|
CapacitySchedulerConfiguration conf = converter.getCapacitySchedulerConfig();
|
||||||
|
|
||||||
assertEquals("Default max parallel apps", 15,
|
assertEquals("Default max parallel apps", 15,
|
||||||
conf.getInt(PREFIX + "max-parallel-apps", -1));
|
conf.getDefaultMaxParallelApps(), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSpecificQueueMaxParallelApps() throws Exception {
|
public void testSpecificQueueMaxParallelApps() throws Exception {
|
||||||
converter.convert(config);
|
converter.convert(config);
|
||||||
|
|
||||||
Configuration conf = converter.getCapacitySchedulerConfig();
|
CapacitySchedulerConfiguration conf = converter.getCapacitySchedulerConfig();
|
||||||
|
|
||||||
assertEquals("root.admins.alice max parallel apps", 2,
|
assertEquals("root.admins.alice max parallel apps", 2,
|
||||||
conf.getInt(PREFIX + "root.admins.alice.max-parallel-apps", -1));
|
conf.getMaxParallelAppsForQueue("root.admins.alice"), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testDefaultUserMaxParallelApps() throws Exception {
|
public void testDefaultUserMaxParallelApps() throws Exception {
|
||||||
converter.convert(config);
|
converter.convert(config);
|
||||||
|
|
||||||
Configuration conf = converter.getCapacitySchedulerConfig();
|
CapacitySchedulerConfiguration conf = converter.getCapacitySchedulerConfig();
|
||||||
int userMaxParallelApps =
|
|
||||||
conf.getInt(
|
|
||||||
PREFIX + "user.max-parallel-apps", -1);
|
|
||||||
|
|
||||||
assertEquals("Default user max parallel apps", 10,
|
assertEquals("Default user max parallel apps", 10,
|
||||||
userMaxParallelApps);
|
conf.getDefaultMaxParallelAppsPerUser(), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSpecificUserMaxParallelApps() throws Exception {
|
public void testSpecificUserMaxParallelApps() throws Exception {
|
||||||
converter.convert(config);
|
converter.convert(config);
|
||||||
|
|
||||||
Configuration conf = converter.getCapacitySchedulerConfig();
|
CapacitySchedulerConfiguration conf = converter.getCapacitySchedulerConfig();
|
||||||
|
|
||||||
assertEquals("Max parallel apps for alice", 30,
|
assertEquals("Max parallel apps for alice", 30,
|
||||||
conf.getInt(PREFIX + "user.alice.max-parallel-apps", -1));
|
conf.getMaxParallelAppsForUser("alice"), 0);
|
||||||
assertNull("Max parallel apps should be undefined for user bob",
|
|
||||||
conf.get(PREFIX + "user.bob.max-parallel-apps"));
|
//users.bob, user.joe, user.john don’t have max-parallel-app set
|
||||||
assertNull("Max parallel apps should be undefined for user joe",
|
// so falling back to the global value for .user to 10
|
||||||
conf.get(PREFIX + "user.joe.max-parallel-apps"));
|
assertEquals("Max parallel apps for user bob", 10,
|
||||||
assertNull("Max parallel apps should be undefined for user john",
|
conf.getMaxParallelAppsForUser("bob"), 0);
|
||||||
conf.get(PREFIX + "user.john.max-parallel-apps"));
|
assertEquals("Max parallel apps for user joe", 10,
|
||||||
|
conf.getMaxParallelAppsForUser("joe"), 0);
|
||||||
|
assertEquals("Max parallel apps for user john", 10,
|
||||||
|
conf.getMaxParallelAppsForUser("john"), 0);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -722,8 +722,7 @@ public void testSiteDisabledPreemptionWithObserveOnlyConversion()
|
|||||||
converter.convert(params);
|
converter.convert(params);
|
||||||
assertTrue("The observe only should be true",
|
assertTrue("The observe only should be true",
|
||||||
converter.getCapacitySchedulerConfig().
|
converter.getCapacitySchedulerConfig().
|
||||||
getBoolean(CapacitySchedulerConfiguration.
|
getPreemptionObserveOnly());
|
||||||
PREEMPTION_OBSERVE_ONLY, false));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
private boolean testConversionWithAsyncSchedulingOption(boolean enabled) throws Exception {
|
private boolean testConversionWithAsyncSchedulingOption(boolean enabled) throws Exception {
|
||||||
|
@ -16,7 +16,9 @@
|
|||||||
|
|
||||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
|
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.converter;
|
||||||
|
|
||||||
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.DEFAULT_MAX_PARALLEL_APPLICATIONS;
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
|
import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration.PREFIX;
|
||||||
|
import static org.junit.Assert.assertArrayEquals;
|
||||||
import static org.junit.Assert.assertEquals;
|
import static org.junit.Assert.assertEquals;
|
||||||
import static org.junit.Assert.assertNull;
|
import static org.junit.Assert.assertNull;
|
||||||
import static org.junit.Assert.assertTrue;
|
import static org.junit.Assert.assertTrue;
|
||||||
@ -35,6 +37,8 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
import org.apache.hadoop.yarn.server.resourcemanager.RMContextImpl;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairSchedulerConfiguration;
|
||||||
@ -81,12 +85,13 @@ private static String prepareFileName(String f) {
|
|||||||
|
|
||||||
private FSQueueConverter converter;
|
private FSQueueConverter converter;
|
||||||
private Configuration yarnConfig;
|
private Configuration yarnConfig;
|
||||||
private Configuration csConfig;
|
private CapacitySchedulerConfiguration csConfig;
|
||||||
private FairScheduler fs;
|
private FairScheduler fs;
|
||||||
private FSQueue rootQueue;
|
private FSQueue rootQueue;
|
||||||
private ConversionOptions conversionOptions;
|
private ConversionOptions conversionOptions;
|
||||||
private DryRunResultHolder dryRunResultHolder;
|
private DryRunResultHolder dryRunResultHolder;
|
||||||
private FSQueueConverterBuilder builder;
|
private FSQueueConverterBuilder builder;
|
||||||
|
private String key;
|
||||||
|
|
||||||
@Mock
|
@Mock
|
||||||
private FSConfigToCSConfigRuleHandler ruleHandler;
|
private FSConfigToCSConfigRuleHandler ruleHandler;
|
||||||
@ -100,7 +105,8 @@ public void setup() {
|
|||||||
yarnConfig.set(FairSchedulerConfiguration.ALLOCATION_FILE,
|
yarnConfig.set(FairSchedulerConfiguration.ALLOCATION_FILE,
|
||||||
FAIR_SCHEDULER_XML);
|
FAIR_SCHEDULER_XML);
|
||||||
yarnConfig.setBoolean(FairSchedulerConfiguration.MIGRATION_MODE, true);
|
yarnConfig.setBoolean(FairSchedulerConfiguration.MIGRATION_MODE, true);
|
||||||
csConfig = new Configuration(false);
|
csConfig = new CapacitySchedulerConfiguration(
|
||||||
|
new Configuration(false));
|
||||||
dryRunResultHolder = new DryRunResultHolder();
|
dryRunResultHolder = new DryRunResultHolder();
|
||||||
conversionOptions =
|
conversionOptions =
|
||||||
new ConversionOptions(dryRunResultHolder, false);
|
new ConversionOptions(dryRunResultHolder, false);
|
||||||
@ -149,20 +155,20 @@ public void testConvertQueueHierarchy() {
|
|||||||
converter.convertQueueHierarchy(rootQueue);
|
converter.convertQueueHierarchy(rootQueue);
|
||||||
|
|
||||||
// root children
|
// root children
|
||||||
assertEquals("root children", "admins,users,misc,default",
|
assertArrayEquals("root children", new String[]{"admins", "users", "misc", "default"},
|
||||||
csConfig.get(PREFIX + "root.queues"));
|
csConfig.getQueues("root"));
|
||||||
|
|
||||||
// root.admins children
|
// root.admins children
|
||||||
assertEquals("root.admins children", "bob,alice",
|
assertArrayEquals("root.admins children", new String[]{"bob", "alice"},
|
||||||
csConfig.get(PREFIX + "root.admins.queues"));
|
csConfig.getQueues("root.admins"));
|
||||||
|
|
||||||
// root.default children - none
|
// root.default children - none
|
||||||
assertNull("root.default children", csConfig.get(PREFIX + "root.default" +
|
assertNull("root.default children",
|
||||||
".queues"));
|
csConfig.getQueues("root.default"));
|
||||||
|
|
||||||
// root.users children
|
// root.users children
|
||||||
assertEquals("root.users children", "john,joe",
|
assertArrayEquals("root.users children", new String[]{"john", "joe"},
|
||||||
csConfig.get(PREFIX + "root.users.queues"));
|
csConfig.getQueues("root.users"));
|
||||||
|
|
||||||
Set<String> leafs = Sets.difference(ALL_QUEUES,
|
Set<String> leafs = Sets.difference(ALL_QUEUES,
|
||||||
Sets.newHashSet("root",
|
Sets.newHashSet("root",
|
||||||
@ -171,7 +177,12 @@ public void testConvertQueueHierarchy() {
|
|||||||
"root.users",
|
"root.users",
|
||||||
"root.misc"));
|
"root.misc"));
|
||||||
|
|
||||||
assertNoValueForQueues(leafs, ".queues", csConfig);
|
for (String queue : leafs) {
|
||||||
|
key = PREFIX + queue + ".queues";
|
||||||
|
assertNull("Key " + key + " has value, but it should be null",
|
||||||
|
csConfig.getQueues(queue));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -181,18 +192,24 @@ public void testQueueMaxAMShare() {
|
|||||||
converter.convertQueueHierarchy(rootQueue);
|
converter.convertQueueHierarchy(rootQueue);
|
||||||
|
|
||||||
// root.admins.bob
|
// root.admins.bob
|
||||||
assertEquals("root.admins.bob AM share", "1.0",
|
assertEquals("root.admins.bob AM share", 1.0f,
|
||||||
csConfig.get(PREFIX + "root.admins.bob.maximum-am-resource-percent"));
|
csConfig.getMaximumApplicationMasterResourcePerQueuePercent(
|
||||||
|
"root.admins.bob"), 0.0f);
|
||||||
|
|
||||||
// root.admins.alice
|
// root.admins.alice
|
||||||
assertEquals("root.admins.alice AM share", "0.15",
|
assertEquals("root.admins.alice AM share", 0.15f,
|
||||||
csConfig.get(PREFIX +
|
csConfig.getMaximumApplicationMasterResourcePerQueuePercent(
|
||||||
"root.admins.alice.maximum-am-resource-percent"));
|
"root.admins.alice"), 0.0f);
|
||||||
|
|
||||||
Set<String> remaining = Sets.difference(ALL_QUEUES,
|
Set<String> remaining = Sets.difference(ALL_QUEUES,
|
||||||
Sets.newHashSet("root.admins.bob", "root.admins.alice"));
|
Sets.newHashSet("root.admins.bob", "root.admins.alice"));
|
||||||
assertNoValueForQueues(remaining, ".maximum-am-resource-percent",
|
|
||||||
csConfig);
|
for (String queue : remaining) {
|
||||||
|
key = PREFIX + queue + ".maximum-am-resource-percent";
|
||||||
|
assertEquals("Key " + key + " has different value",
|
||||||
|
0.1f, csConfig
|
||||||
|
.getMaximumApplicationMasterResourcePerQueuePercent(queue), 0.0f);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -202,12 +219,17 @@ public void testQueueMaxParallelApps() {
|
|||||||
converter.convertQueueHierarchy(rootQueue);
|
converter.convertQueueHierarchy(rootQueue);
|
||||||
|
|
||||||
assertEquals("root.admins.alice max apps", 2,
|
assertEquals("root.admins.alice max apps", 2,
|
||||||
csConfig.getInt(PREFIX + "root.admins.alice.max-parallel-apps",
|
csConfig.getMaxParallelAppsForQueue("root.admins.alice"), 0);
|
||||||
-1));
|
|
||||||
|
|
||||||
Set<String> remaining = Sets.difference(ALL_QUEUES,
|
Set<String> remaining = Sets.difference(ALL_QUEUES,
|
||||||
Sets.newHashSet("root.admins.alice"));
|
Sets.newHashSet("root.admins.alice"));
|
||||||
assertNoValueForQueues(remaining, ".max-parallel-apps", csConfig);
|
|
||||||
|
for (String queue : remaining) {
|
||||||
|
key = PREFIX + queue + ".max-parallel-apps";
|
||||||
|
assertEquals("Key " + key + " has different value",
|
||||||
|
DEFAULT_MAX_PARALLEL_APPLICATIONS, csConfig
|
||||||
|
.getMaxParallelAppsForQueue(queue), 0);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -218,21 +240,30 @@ public void testQueueMaxAllocations() {
|
|||||||
|
|
||||||
// root.admins vcores + mb
|
// root.admins vcores + mb
|
||||||
assertEquals("root.admins max vcores", 3,
|
assertEquals("root.admins max vcores", 3,
|
||||||
csConfig.getInt(PREFIX + "root.admins.maximum-allocation-vcores", -1));
|
csConfig.getQueueMaximumAllocationVcores("root.admins"));
|
||||||
assertEquals("root.admins max memory", 4096,
|
assertEquals("root.admins max memory", 4096,
|
||||||
csConfig.getInt(PREFIX + "root.admins.maximum-allocation-mb", -1));
|
csConfig.getQueueMaximumAllocationMb("root.admins"));
|
||||||
|
|
||||||
// root.users.john max vcores + mb
|
// root.users.john max vcores + mb
|
||||||
assertEquals("root.users.john max vcores", 2,
|
assertEquals("root.users.john max vcores", 2,
|
||||||
csConfig.getInt(PREFIX + "root.users.john.maximum-allocation-vcores",
|
csConfig.getQueueMaximumAllocationVcores("root.users.john"));
|
||||||
-1));
|
|
||||||
assertEquals("root.users.john max memory", 8192,
|
assertEquals("root.users.john max memory", 8192,
|
||||||
csConfig.getInt(PREFIX + "root.users.john.maximum-allocation-mb", -1));
|
csConfig.getQueueMaximumAllocationMb("root.users.john"));
|
||||||
|
|
||||||
Set<String> remaining = Sets.difference(ALL_QUEUES,
|
Set<String> remaining = Sets.difference(ALL_QUEUES,
|
||||||
Sets.newHashSet("root.admins", "root.users.john"));
|
Sets.newHashSet("root.admins", "root.users.john"));
|
||||||
assertNoValueForQueues(remaining, ".maximum-allocation-vcores", csConfig);
|
|
||||||
assertNoValueForQueues(remaining, ".maximum-allocation-mb", csConfig);
|
for (String queue : remaining) {
|
||||||
|
key = PREFIX + queue + ".maximum-allocation-vcores";
|
||||||
|
assertEquals("Key " + key + " has different value",
|
||||||
|
-1.0, csConfig
|
||||||
|
.getQueueMaximumAllocationVcores(queue), 0.0f);
|
||||||
|
|
||||||
|
key = PREFIX + queue + ".maximum-allocation-mb";
|
||||||
|
assertEquals("Key " + key + " has different value",
|
||||||
|
-1.0, csConfig
|
||||||
|
.getQueueMaximumAllocationMb(queue), 0.0f);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -242,15 +273,20 @@ public void testQueuePreemptionDisabled() {
|
|||||||
converter.convertQueueHierarchy(rootQueue);
|
converter.convertQueueHierarchy(rootQueue);
|
||||||
|
|
||||||
assertTrue("root.admins.alice preemption setting",
|
assertTrue("root.admins.alice preemption setting",
|
||||||
csConfig.getBoolean(PREFIX + "root.admins.alice.disable_preemption",
|
csConfig.getPreemptionDisabled(
|
||||||
false));
|
"root.admins.alice", false));
|
||||||
assertTrue("root.users.joe preemption setting",
|
assertTrue("root.users.joe preemption setting",
|
||||||
csConfig.getBoolean(PREFIX + "root.users.joe.disable_preemption",
|
csConfig.getPreemptionDisabled(
|
||||||
false));
|
"root.users.joe", false));
|
||||||
|
|
||||||
Set<String> remaining = Sets.difference(ALL_QUEUES,
|
Set<String> remaining = Sets.difference(ALL_QUEUES,
|
||||||
Sets.newHashSet("root.admins.alice", "root.users.joe"));
|
Sets.newHashSet("root.admins.alice", "root.users.joe"));
|
||||||
assertNoValueForQueues(remaining, ".disable_preemption", csConfig);
|
|
||||||
|
for (String queue : remaining) {
|
||||||
|
key = PREFIX + queue + ".disable_preemption";
|
||||||
|
assertEquals("Key " + key + " has different value",
|
||||||
|
false, csConfig.getPreemptionDisabled(queue, false));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -259,7 +295,11 @@ public void testQueuePreemptionDisabledWhenGlobalPreemptionDisabled() {
|
|||||||
|
|
||||||
converter.convertQueueHierarchy(rootQueue);
|
converter.convertQueueHierarchy(rootQueue);
|
||||||
|
|
||||||
assertNoValueForQueues(ALL_QUEUES, ".disable_preemption", csConfig);
|
for (String queue : ALL_QUEUES) {
|
||||||
|
key = PREFIX + queue + ".disable_preemption";
|
||||||
|
assertEquals("Key " + key + " has different value",
|
||||||
|
false, csConfig.getPreemptionDisabled(queue, false));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -269,32 +309,42 @@ public void testChildCapacityInCapacityMode() {
|
|||||||
converter.convertQueueHierarchy(rootQueue);
|
converter.convertQueueHierarchy(rootQueue);
|
||||||
|
|
||||||
// root
|
// root
|
||||||
assertEquals("root.default capacity", "33.333",
|
assertEquals("root.default capacity", 33.333f,
|
||||||
csConfig.get(PREFIX + "root.default.capacity"));
|
csConfig.getNonLabeledQueueCapacity(
|
||||||
assertEquals("root.admins capacity", "33.333",
|
new QueuePath("root.default")), 0.0f);
|
||||||
csConfig.get(PREFIX + "root.admins.capacity"));
|
assertEquals("root.admins capacity", 33.333f,
|
||||||
assertEquals("root.users capacity", "33.334",
|
csConfig.getNonLabeledQueueCapacity(
|
||||||
csConfig.get(PREFIX + "root.users.capacity"));
|
new QueuePath("root.admins")), 0.0f);
|
||||||
|
assertEquals("root.users capacity", 33.334f,
|
||||||
|
csConfig.getNonLabeledQueueCapacity(
|
||||||
|
new QueuePath("root.users")), 0.0f);
|
||||||
|
|
||||||
// root.users
|
// root.users
|
||||||
assertEquals("root.users.john capacity", "25.000",
|
assertEquals("root.users.john capacity", 25.000f,
|
||||||
csConfig.get(PREFIX + "root.users.john.capacity"));
|
csConfig.getNonLabeledQueueCapacity(
|
||||||
assertEquals("root.users.joe capacity", "75.000",
|
new QueuePath("root.users.john")), 0.0f);
|
||||||
csConfig.get(PREFIX + "root.users.joe.capacity"));
|
assertEquals("root.users.joe capacity", 75.000f,
|
||||||
|
csConfig.getNonLabeledQueueCapacity(
|
||||||
|
new QueuePath("root.users.joe")), 0.0f);
|
||||||
|
|
||||||
// root.admins
|
// root.admins
|
||||||
assertEquals("root.admins.alice capacity", "75.000",
|
assertEquals("root.admins.alice capacity", 75.000f,
|
||||||
csConfig.get(PREFIX + "root.admins.alice.capacity"));
|
csConfig.getNonLabeledQueueCapacity(
|
||||||
assertEquals("root.admins.bob capacity", "25.000",
|
new QueuePath("root.admins.alice")), 0.0f);
|
||||||
csConfig.get(PREFIX + "root.admins.bob.capacity"));
|
assertEquals("root.admins.bob capacity", 25.000f,
|
||||||
|
csConfig.getNonLabeledQueueCapacity(
|
||||||
|
new QueuePath("root.admins.bob")), 0.0f);
|
||||||
|
|
||||||
// root.misc
|
// root.misc
|
||||||
assertEquals("root.misc capacity", "0.000",
|
assertEquals("root.misc capacity", 0.000f,
|
||||||
csConfig.get(PREFIX + "root.misc.capacity"));
|
csConfig.getNonLabeledQueueCapacity(
|
||||||
assertEquals("root.misc.a capacity", "0.000",
|
new QueuePath("root.misc")), 0.000f);
|
||||||
csConfig.get(PREFIX + "root.misc.a.capacity"));
|
assertEquals("root.misc.a capacity", 0.000f,
|
||||||
assertEquals("root.misc.b capacity", "0.000",
|
csConfig.getNonLabeledQueueCapacity(
|
||||||
csConfig.get(PREFIX + "root.misc.b.capacity"));
|
new QueuePath("root.misc.a")), 0.000f);
|
||||||
|
assertEquals("root.misc.b capacity", 0.000f,
|
||||||
|
csConfig.getNonLabeledQueueCapacity(
|
||||||
|
new QueuePath("root.misc.b")), 0.000f);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -304,32 +354,32 @@ public void testChildCapacityInWeightMode() {
|
|||||||
converter.convertQueueHierarchy(rootQueue);
|
converter.convertQueueHierarchy(rootQueue);
|
||||||
|
|
||||||
// root
|
// root
|
||||||
assertEquals("root.default weight", "1.0w",
|
assertEquals("root.default weight", 1.0f,
|
||||||
csConfig.get(PREFIX + "root.default.capacity"));
|
csConfig.getNonLabeledQueueWeight("root.default"), 0.01f);
|
||||||
assertEquals("root.admins weight", "1.0w",
|
assertEquals("root.admins weight", 1.0f,
|
||||||
csConfig.get(PREFIX + "root.admins.capacity"));
|
csConfig.getNonLabeledQueueWeight("root.admins"), 0.01f);
|
||||||
assertEquals("root.users weight", "1.0w",
|
assertEquals("root.users weight", 1.0f,
|
||||||
csConfig.get(PREFIX + "root.users.capacity"));
|
csConfig.getNonLabeledQueueWeight("root.users"), 0.01f);
|
||||||
|
|
||||||
// root.users
|
// root.users
|
||||||
assertEquals("root.users.john weight", "1.0w",
|
assertEquals("root.users.john weight", 1.0f,
|
||||||
csConfig.get(PREFIX + "root.users.john.capacity"));
|
csConfig.getNonLabeledQueueWeight("root.users.john"), 0.01f);
|
||||||
assertEquals("root.users.joe weight", "3.0w",
|
assertEquals("root.users.joe weight", 3.0f,
|
||||||
csConfig.get(PREFIX + "root.users.joe.capacity"));
|
csConfig.getNonLabeledQueueWeight("root.users.joe"), 0.01f);
|
||||||
|
|
||||||
// root.admins
|
// root.admins
|
||||||
assertEquals("root.admins.alice weight", "3.0w",
|
assertEquals("root.admins.alice weight", 3.0f,
|
||||||
csConfig.get(PREFIX + "root.admins.alice.capacity"));
|
csConfig.getNonLabeledQueueWeight("root.admins.alice"), 0.01f);
|
||||||
assertEquals("root.admins.bob weight", "1.0w",
|
assertEquals("root.admins.bob weight", 1.0f,
|
||||||
csConfig.get(PREFIX + "root.admins.bob.capacity"));
|
csConfig.getNonLabeledQueueWeight("root.admins.bob"), 0.01f);
|
||||||
|
|
||||||
// root.misc
|
// root.misc
|
||||||
assertEquals("root.misc weight", "0.0w",
|
assertEquals("root.misc weight", 0.0f,
|
||||||
csConfig.get(PREFIX + "root.misc.capacity"));
|
csConfig.getNonLabeledQueueWeight("root.misc"), 0.00f);
|
||||||
assertEquals("root.misc.a weight", "0.0w",
|
assertEquals("root.misc.a weight", 0.0f,
|
||||||
csConfig.get(PREFIX + "root.misc.a.capacity"));
|
csConfig.getNonLabeledQueueWeight("root.misc.a"), 0.00f);
|
||||||
assertEquals("root.misc.b weight", "0.0w",
|
assertEquals("root.misc.b weight", 0.0f,
|
||||||
csConfig.get(PREFIX + "root.misc.b.capacity"));
|
csConfig.getNonLabeledQueueWeight("root.misc.b"), 0.00f);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -339,21 +389,15 @@ public void testAutoCreateV2FlagsInWeightMode() {
|
|||||||
converter.convertQueueHierarchy(rootQueue);
|
converter.convertQueueHierarchy(rootQueue);
|
||||||
|
|
||||||
assertTrue("root autocreate v2 flag",
|
assertTrue("root autocreate v2 flag",
|
||||||
csConfig.getBoolean(
|
csConfig.isAutoQueueCreationV2Enabled("root"));
|
||||||
PREFIX + "root.auto-queue-creation-v2.enabled", false));
|
|
||||||
assertTrue("root.admins autocreate v2 flag",
|
assertTrue("root.admins autocreate v2 flag",
|
||||||
csConfig.getBoolean(
|
csConfig.isAutoQueueCreationV2Enabled("root.admins"));
|
||||||
PREFIX + "root.admins.auto-queue-creation-v2.enabled", false));
|
|
||||||
assertTrue("root.admins.alice autocreate v2 flag",
|
assertTrue("root.admins.alice autocreate v2 flag",
|
||||||
csConfig.getBoolean(
|
csConfig.isAutoQueueCreationV2Enabled("root.admins.alice"));
|
||||||
PREFIX + "root.admins.alice.auto-queue-creation-v2.enabled",
|
|
||||||
false));
|
|
||||||
assertTrue("root.users autocreate v2 flag",
|
assertTrue("root.users autocreate v2 flag",
|
||||||
csConfig.getBoolean(
|
csConfig.isAutoQueueCreationV2Enabled("root.users"));
|
||||||
PREFIX + "root.users.auto-queue-creation-v2.enabled", false));
|
|
||||||
assertTrue("root.misc autocreate v2 flag",
|
assertTrue("root.misc autocreate v2 flag",
|
||||||
csConfig.getBoolean(
|
csConfig.isAutoQueueCreationV2Enabled("root.misc"));
|
||||||
PREFIX + "root.misc.auto-queue-creation-v2.enabled", false));
|
|
||||||
|
|
||||||
//leaf queue root.admins.alice is removed from the below list
|
//leaf queue root.admins.alice is removed from the below list
|
||||||
//adding reservation to a leaf, it's queueType changes to FSParentQueue
|
//adding reservation to a leaf, it's queueType changes to FSParentQueue
|
||||||
@ -363,8 +407,14 @@ public void testAutoCreateV2FlagsInWeightMode() {
|
|||||||
"root.users",
|
"root.users",
|
||||||
"root.misc",
|
"root.misc",
|
||||||
"root.admins.alice"));
|
"root.admins.alice"));
|
||||||
assertNoValueForQueues(leafs, ".auto-queue-creation-v2.enabled",
|
|
||||||
csConfig);
|
for (String queue : leafs) {
|
||||||
|
key = PREFIX + queue + ".auto-queue-creation-v2.enabled";
|
||||||
|
assertEquals("Key " + key + " has different value",
|
||||||
|
false, csConfig
|
||||||
|
.isAutoQueueCreationV2Enabled(queue));
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -375,11 +425,16 @@ public void testZeroSumCapacityValidation() {
|
|||||||
|
|
||||||
Set<String> noZeroSumAllowedQueues = Sets.difference(ALL_QUEUES,
|
Set<String> noZeroSumAllowedQueues = Sets.difference(ALL_QUEUES,
|
||||||
Sets.newHashSet("root.misc"));
|
Sets.newHashSet("root.misc"));
|
||||||
assertNoValueForQueues(noZeroSumAllowedQueues, ".allow-zero-capacity-sum",
|
|
||||||
csConfig);
|
|
||||||
|
|
||||||
assertTrue("root.misc allow zero capacities", csConfig.getBoolean(
|
for (String queue : noZeroSumAllowedQueues) {
|
||||||
PREFIX + "root.misc.allow-zero-capacity-sum", false));
|
key = PREFIX + queue + ".allow-zero-capacity-sum";
|
||||||
|
assertEquals("Key " + key + " has different value",
|
||||||
|
false, csConfig
|
||||||
|
.getAllowZeroCapacitySum(queue));
|
||||||
|
}
|
||||||
|
|
||||||
|
assertTrue("root.misc allow zero capacities",
|
||||||
|
csConfig.getAllowZeroCapacitySum("root.misc"));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -388,7 +443,12 @@ public void testQueueMaximumCapacity() {
|
|||||||
|
|
||||||
converter.convertQueueHierarchy(rootQueue);
|
converter.convertQueueHierarchy(rootQueue);
|
||||||
|
|
||||||
assertValueForQueues(ALL_QUEUES, ".maximum-capacity", csConfig, "100");
|
for (String queue : ALL_QUEUES) {
|
||||||
|
key = PREFIX + queue + ".maximum-capacity";
|
||||||
|
assertEquals("Key " + key + " has different value",
|
||||||
|
100.0, csConfig
|
||||||
|
.getNonLabeledQueueMaximumCapacity(new QueuePath(queue)), 0.0f);
|
||||||
|
}
|
||||||
verify(ruleHandler, times(3)).handleMaxResources();
|
verify(ruleHandler, times(3)).handleMaxResources();
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -409,8 +469,11 @@ public void testQueueWithNoAutoCreateChildQueue() {
|
|||||||
|
|
||||||
converter.convertQueueHierarchy(rootQueue);
|
converter.convertQueueHierarchy(rootQueue);
|
||||||
|
|
||||||
assertNoValueForQueues(ALL_QUEUES, ".auto-create-child-queue.enabled",
|
for (String queue : ALL_QUEUES) {
|
||||||
csConfig);
|
key = PREFIX + queue + ".auto-create-child-queue.enabled";
|
||||||
|
assertEquals("Key " + key + " has different value",
|
||||||
|
false, csConfig.isAutoCreateChildQueueEnabled(queue));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -419,8 +482,11 @@ public void testQueueSizeBasedWeightEnabled() {
|
|||||||
|
|
||||||
converter.convertQueueHierarchy(rootQueue);
|
converter.convertQueueHierarchy(rootQueue);
|
||||||
|
|
||||||
assertTrueForQueues(ALL_QUEUES,
|
for (String queue : ALL_QUEUES) {
|
||||||
".ordering-policy.fair.enable-size-based-weight", csConfig);
|
key = PREFIX + queue + ".ordering-policy.fair.enable-size-based-weight";
|
||||||
|
assertTrue("Key " + key + " has different value",
|
||||||
|
csConfig.getBoolean(key, false));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -429,8 +495,11 @@ public void testQueueSizeBasedWeightDisabled() {
|
|||||||
|
|
||||||
converter.convertQueueHierarchy(rootQueue);
|
converter.convertQueueHierarchy(rootQueue);
|
||||||
|
|
||||||
assertNoValueForQueues(ALL_QUEUES,
|
for (String queue : ALL_QUEUES) {
|
||||||
".ordering-policy.fair.enable-size-based-weight", csConfig);
|
key = PREFIX + queue + ".ordering-policy.fair.enable-size-based-weight";
|
||||||
|
assertNull("Key " + key + " has different value",
|
||||||
|
csConfig.get(key));
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -446,28 +515,27 @@ public void testQueueOrderingPolicy() throws Exception {
|
|||||||
rootQueue = fs.getQueueManager().getRootQueue();
|
rootQueue = fs.getQueueManager().getRootQueue();
|
||||||
|
|
||||||
converter.convertQueueHierarchy(rootQueue);
|
converter.convertQueueHierarchy(rootQueue);
|
||||||
|
|
||||||
// root
|
// root
|
||||||
assertEquals("root ordering policy", null,
|
assertEquals("root ordering policy", "fifo",
|
||||||
csConfig.get(PREFIX + "root.ordering-policy"));
|
csConfig.getAppOrderingPolicy("root").getConfigName());
|
||||||
assertEquals("root.default ordering policy", "fair",
|
assertEquals("root.default ordering policy", "fair",
|
||||||
csConfig.get(PREFIX + "root.default.ordering-policy"));
|
csConfig.getAppOrderingPolicy("root.default").getConfigName());
|
||||||
assertEquals("root.admins ordering policy", null,
|
assertEquals("root.admins ordering policy", "fifo",
|
||||||
csConfig.get(PREFIX + "root.admins.ordering-policy"));
|
csConfig.getAppOrderingPolicy("root.admins").getConfigName());
|
||||||
assertEquals("root.users ordering policy", null,
|
assertEquals("root.users ordering policy", "fifo",
|
||||||
csConfig.get(PREFIX + "root.users.ordering-policy"));
|
csConfig.getAppOrderingPolicy("root.users").getConfigName());
|
||||||
|
|
||||||
// root.users
|
// root.users
|
||||||
assertEquals("root.users.joe ordering policy", "fair",
|
assertEquals("root.users.joe ordering policy", "fair",
|
||||||
csConfig.get(PREFIX + "root.users.joe.ordering-policy"));
|
csConfig.getAppOrderingPolicy("root.users.joe").getConfigName());
|
||||||
assertEquals("root.users.john ordering policy", "fifo",
|
assertEquals("root.users.john ordering policy", "fifo",
|
||||||
csConfig.get(PREFIX + "root.users.john.ordering-policy"));
|
csConfig.getAppOrderingPolicy("root.users.john").getConfigName());
|
||||||
|
|
||||||
// root.admins
|
// root.admins
|
||||||
assertEquals("root.admins.alice ordering policy", "fifo",
|
assertEquals("root.admins.alice ordering policy", "fifo",
|
||||||
csConfig.get(PREFIX + "root.admins.alice.ordering-policy"));
|
csConfig.getAppOrderingPolicy("root.admins.alice.").getConfigName());
|
||||||
assertEquals("root.admins.bob ordering policy", "fair",
|
assertEquals("root.admins.bob ordering policy", "fair",
|
||||||
csConfig.get(PREFIX + "root.admins.bob.ordering-policy"));
|
csConfig.getAppOrderingPolicy("root.admins.bob").getConfigName());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
@ -512,31 +580,4 @@ public void testReservationSystemNotSupported() {
|
|||||||
|
|
||||||
converter.convertQueueHierarchy(rootQueue);
|
converter.convertQueueHierarchy(rootQueue);
|
||||||
}
|
}
|
||||||
|
|
||||||
private void assertNoValueForQueues(Set<String> queues, String postfix,
|
|
||||||
Configuration config) {
|
|
||||||
for (String queue : queues) {
|
|
||||||
String key = PREFIX + queue + postfix;
|
|
||||||
assertNull("Key " + key + " has value, but it should be null",
|
|
||||||
config.get(key));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void assertValueForQueues(Set<String> queues, String postfix,
|
|
||||||
Configuration config, String expectedValue) {
|
|
||||||
for (String queue : queues) {
|
|
||||||
String key = PREFIX + queue + postfix;
|
|
||||||
assertEquals("Key " + key + " has different value",
|
|
||||||
expectedValue, config.get(key));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
|
|
||||||
private void assertTrueForQueues(Set<String> queues, String postfix,
|
|
||||||
Configuration config) {
|
|
||||||
for (String queue : queues) {
|
|
||||||
String key = PREFIX + queue + postfix;
|
|
||||||
assertTrue("Key " + key + " is false, should be true",
|
|
||||||
config.getBoolean(key, false));
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
}
|
@ -28,6 +28,8 @@
|
|||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueuePath;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
@ -35,104 +37,102 @@
|
|||||||
public class TestWeightToPercentageConverter
|
public class TestWeightToPercentageConverter
|
||||||
extends WeightConverterTestBase {
|
extends WeightConverterTestBase {
|
||||||
private WeightToPercentConverter converter;
|
private WeightToPercentConverter converter;
|
||||||
private Configuration config;
|
private CapacitySchedulerConfiguration csConfig;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() {
|
public void setup() {
|
||||||
converter = new WeightToPercentConverter();
|
converter = new WeightToPercentConverter();
|
||||||
config = new Configuration(false);
|
csConfig = new CapacitySchedulerConfiguration(
|
||||||
|
new Configuration(false));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSingleWeightConversion() {
|
public void testSingleWeightConversion() {
|
||||||
FSQueue root = createFSQueues(1);
|
FSQueue root = createFSQueues(1);
|
||||||
converter.convertWeightsForChildQueues(root, config);
|
converter.convertWeightsForChildQueues(root, csConfig);
|
||||||
|
|
||||||
assertFalse("Capacity zerosum allowed",
|
assertFalse("Capacity zerosum allowed",
|
||||||
config.getBoolean(PREFIX + "root.allow-zero-capacity-sum",
|
csConfig.getAllowZeroCapacitySum("root"));
|
||||||
false));
|
assertEquals("root.a capacity", 100.000f,
|
||||||
assertEquals("root.a capacity", "100.000",
|
csConfig.getNonLabeledQueueCapacity(new QueuePath("root.a")), 0.0f);
|
||||||
config.get(PREFIX + "root.a.capacity"));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNoChildQueueConversion() {
|
public void testNoChildQueueConversion() {
|
||||||
FSQueue root = createFSQueues();
|
FSQueue root = createFSQueues();
|
||||||
converter.convertWeightsForChildQueues(root, config);
|
converter.convertWeightsForChildQueues(root, csConfig);
|
||||||
|
|
||||||
assertEquals("Converted items", 0,
|
assertEquals("Converted items", 19,
|
||||||
config.getPropsWithPrefix(PREFIX).size());
|
csConfig.getPropsWithPrefix(PREFIX).size());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMultiWeightConversion() {
|
public void testMultiWeightConversion() {
|
||||||
FSQueue root = createFSQueues(1, 2, 3);
|
FSQueue root = createFSQueues(1, 2, 3);
|
||||||
|
|
||||||
converter.convertWeightsForChildQueues(root, config);
|
converter.convertWeightsForChildQueues(root, csConfig);
|
||||||
|
|
||||||
assertEquals("Number of properties", 3,
|
assertEquals("Number of properties", 22,
|
||||||
config.getPropsWithPrefix(PREFIX).size());
|
csConfig.getPropsWithPrefix(PREFIX).size());
|
||||||
// this is no fixing - it's the result of BigDecimal rounding
|
// this is no fixing - it's the result of BigDecimal rounding
|
||||||
assertEquals("root.a capacity", "16.667",
|
assertEquals("root.a capacity", 16.667f,
|
||||||
config.get(PREFIX + "root.a.capacity"));
|
csConfig.getNonLabeledQueueCapacity(new QueuePath("root.a")), 0.0f);
|
||||||
assertEquals("root.b capacity", "33.333",
|
assertEquals("root.b capacity", 33.333f,
|
||||||
config.get(PREFIX + "root.b.capacity"));
|
csConfig.getNonLabeledQueueCapacity(new QueuePath("root.b")), 0.0f);
|
||||||
assertEquals("root.c capacity", "50.000",
|
assertEquals("root.c capacity", 50.000f,
|
||||||
config.get(PREFIX + "root.c.capacity"));
|
csConfig.getNonLabeledQueueCapacity(new QueuePath("root.c")), 0.0f);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMultiWeightConversionWhenOfThemIsZero() {
|
public void testMultiWeightConversionWhenOfThemIsZero() {
|
||||||
FSQueue root = createFSQueues(0, 1, 1);
|
FSQueue root = createFSQueues(0, 1, 1);
|
||||||
|
|
||||||
converter.convertWeightsForChildQueues(root, config);
|
converter.convertWeightsForChildQueues(root, csConfig);
|
||||||
|
|
||||||
assertFalse("Capacity zerosum allowed",
|
assertFalse("Capacity zerosum allowed",
|
||||||
config.getBoolean(PREFIX + "root.allow-zero-capacity-sum",
|
csConfig.getAllowZeroCapacitySum("root"));
|
||||||
false));
|
assertEquals("Number of properties", 22,
|
||||||
assertEquals("Number of properties", 3,
|
csConfig.getPropsWithPrefix(PREFIX).size());
|
||||||
config.getPropsWithPrefix(PREFIX).size());
|
assertEquals("root.a capacity", 0.000f,
|
||||||
assertEquals("root.a capacity", "0.000",
|
csConfig.getNonLabeledQueueCapacity(new QueuePath("root.a")), 0.0f);
|
||||||
config.get(PREFIX + "root.a.capacity"));
|
assertEquals("root.b capacity", 50.000f,
|
||||||
assertEquals("root.b capacity", "50.000",
|
csConfig.getNonLabeledQueueCapacity(new QueuePath("root.b")), 0.0f);
|
||||||
config.get(PREFIX + "root.b.capacity"));
|
assertEquals("root.c capacity", 50.000f,
|
||||||
assertEquals("root.c capacity", "50.000",
|
csConfig.getNonLabeledQueueCapacity(new QueuePath("root.c")), 0.0f);
|
||||||
config.get(PREFIX + "root.c.capacity"));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMultiWeightConversionWhenAllOfThemAreZero() {
|
public void testMultiWeightConversionWhenAllOfThemAreZero() {
|
||||||
FSQueue root = createFSQueues(0, 0, 0);
|
FSQueue root = createFSQueues(0, 0, 0);
|
||||||
|
|
||||||
converter.convertWeightsForChildQueues(root, config);
|
converter.convertWeightsForChildQueues(root, csConfig);
|
||||||
|
|
||||||
assertEquals("Number of properties", 4,
|
assertEquals("Number of properties", 23,
|
||||||
config.getPropsWithPrefix(PREFIX).size());
|
csConfig.getPropsWithPrefix(PREFIX).size());
|
||||||
assertTrue("Capacity zerosum allowed",
|
assertTrue("Capacity zerosum allowed",
|
||||||
config.getBoolean(PREFIX + "root.allow-zero-capacity-sum",
|
csConfig.getAllowZeroCapacitySum("root"));
|
||||||
false));
|
assertEquals("root.a capacity", 0.000f,
|
||||||
assertEquals("root.a capacity", "0.000",
|
csConfig.getNonLabeledQueueCapacity(new QueuePath("root.a")), 0.0f);
|
||||||
config.get(PREFIX + "root.a.capacity"));
|
assertEquals("root.b capacity", 0.000f,
|
||||||
assertEquals("root.b capacity", "0.000",
|
csConfig.getNonLabeledQueueCapacity(new QueuePath("root.b")), 0.0f);
|
||||||
config.get(PREFIX + "root.b.capacity"));
|
assertEquals("root.c capacity", 0.000f,
|
||||||
assertEquals("root.c capacity", "0.000",
|
csConfig.getNonLabeledQueueCapacity(new QueuePath("root.c")), 0.0f);
|
||||||
config.get(PREFIX + "root.c.capacity"));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testCapacityFixingWithThreeQueues() {
|
public void testCapacityFixingWithThreeQueues() {
|
||||||
FSQueue root = createFSQueues(1, 1, 1);
|
FSQueue root = createFSQueues(1, 1, 1);
|
||||||
|
|
||||||
converter.convertWeightsForChildQueues(root, config);
|
converter.convertWeightsForChildQueues(root, csConfig);
|
||||||
|
|
||||||
assertEquals("Number of properties", 3,
|
assertEquals("Number of properties", 22,
|
||||||
config.getPropsWithPrefix(PREFIX).size());
|
csConfig.getPropsWithPrefix(PREFIX).size());
|
||||||
assertEquals("root.a capacity", "33.334",
|
assertEquals("root.a capacity", 33.334f,
|
||||||
config.get(PREFIX + "root.a.capacity"));
|
csConfig.getNonLabeledQueueCapacity(new QueuePath("root.a")), 0.0f);
|
||||||
assertEquals("root.b capacity", "33.333",
|
assertEquals("root.b capacity", 33.333f,
|
||||||
config.get(PREFIX + "root.b.capacity"));
|
csConfig.getNonLabeledQueueCapacity(new QueuePath("root.b")), 0.0f);
|
||||||
assertEquals("root.c capacity", "33.333",
|
assertEquals("root.c capacity", 33.333f,
|
||||||
config.get(PREFIX + "root.c.capacity"));
|
csConfig.getNonLabeledQueueCapacity(new QueuePath("root.c")), 0.0f);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
|
@ -25,81 +25,81 @@
|
|||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
|
|
||||||
import org.apache.hadoop.conf.Configuration;
|
import org.apache.hadoop.conf.Configuration;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FSQueue;
|
||||||
import org.junit.Before;
|
import org.junit.Before;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
|
||||||
public class TestWeightToWeightConverter extends WeightConverterTestBase {
|
public class TestWeightToWeightConverter extends WeightConverterTestBase {
|
||||||
private WeightToWeightConverter converter;
|
private WeightToWeightConverter converter;
|
||||||
private Configuration config;
|
private CapacitySchedulerConfiguration csConfig;
|
||||||
|
|
||||||
@Before
|
@Before
|
||||||
public void setup() {
|
public void setup() {
|
||||||
converter = new WeightToWeightConverter();
|
converter = new WeightToWeightConverter();
|
||||||
config = new Configuration(false);
|
csConfig = new CapacitySchedulerConfiguration(
|
||||||
|
new Configuration(false));
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testNoChildQueueConversion() {
|
public void testNoChildQueueConversion() {
|
||||||
FSQueue root = createFSQueues();
|
FSQueue root = createFSQueues();
|
||||||
converter.convertWeightsForChildQueues(root, config);
|
converter.convertWeightsForChildQueues(root, csConfig);
|
||||||
|
|
||||||
assertEquals("root weight", "1.0w",
|
assertEquals("root weight", 1.0f,
|
||||||
config.get(PREFIX + "root.capacity"));
|
csConfig.getNonLabeledQueueWeight("root"), 0.0f);
|
||||||
assertEquals("Converted items", 2,
|
assertEquals("Converted items", 21,
|
||||||
config.getPropsWithPrefix(PREFIX).size());
|
csConfig.getPropsWithPrefix(PREFIX).size());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testSingleWeightConversion() {
|
public void testSingleWeightConversion() {
|
||||||
FSQueue root = createFSQueues(1);
|
FSQueue root = createFSQueues(1);
|
||||||
converter.convertWeightsForChildQueues(root, config);
|
converter.convertWeightsForChildQueues(root, csConfig);
|
||||||
|
|
||||||
assertEquals("root weight", "1.0w",
|
assertEquals("root weight", 1.0f,
|
||||||
config.get(PREFIX + "root.capacity"));
|
csConfig.getNonLabeledQueueWeight("root"), 0.0f);
|
||||||
assertEquals("root.a weight", "1.0w",
|
assertEquals("root.a weight", 1.0f,
|
||||||
config.get(PREFIX + "root.a.capacity"));
|
csConfig.getNonLabeledQueueWeight("root.a"), 0.0f);
|
||||||
assertEquals("Number of properties", 3,
|
assertEquals("Number of properties", 22,
|
||||||
config.getPropsWithPrefix(PREFIX).size());
|
csConfig.getPropsWithPrefix(PREFIX).size());
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testMultiWeightConversion() {
|
public void testMultiWeightConversion() {
|
||||||
FSQueue root = createFSQueues(1, 2, 3);
|
FSQueue root = createFSQueues(1, 2, 3);
|
||||||
|
|
||||||
converter.convertWeightsForChildQueues(root, config);
|
converter.convertWeightsForChildQueues(root, csConfig);
|
||||||
|
|
||||||
assertEquals("Number of properties", 5,
|
assertEquals("Number of properties", 24,
|
||||||
config.getPropsWithPrefix(PREFIX).size());
|
csConfig.getPropsWithPrefix(PREFIX).size());
|
||||||
assertEquals("root weight", "1.0w",
|
assertEquals("root weight", 1.0f,
|
||||||
config.get(PREFIX + "root.capacity"));
|
csConfig.getNonLabeledQueueWeight("root"), 0.0f);
|
||||||
assertEquals("root.a weight", "1.0w",
|
assertEquals("root.a weight", 1.0f,
|
||||||
config.get(PREFIX + "root.a.capacity"));
|
csConfig.getNonLabeledQueueWeight("root.a"), 0.0f);
|
||||||
assertEquals("root.b weight", "2.0w",
|
assertEquals("root.b weight", 2.0f,
|
||||||
config.get(PREFIX + "root.b.capacity"));
|
csConfig.getNonLabeledQueueWeight("root.b"), 0.0f);
|
||||||
assertEquals("root.c weight", "3.0w",
|
assertEquals("root.c weight", 3.0f,
|
||||||
config.get(PREFIX + "root.c.capacity"));
|
csConfig.getNonLabeledQueueWeight("root.c"), 0.0f);
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAutoCreateV2FlagOnParent() {
|
public void testAutoCreateV2FlagOnParent() {
|
||||||
FSQueue root = createFSQueues(1);
|
FSQueue root = createFSQueues(1);
|
||||||
converter.convertWeightsForChildQueues(root, config);
|
converter.convertWeightsForChildQueues(root, csConfig);
|
||||||
|
|
||||||
assertTrue("root autocreate v2 enabled",
|
assertTrue("root autocreate v2 enabled",
|
||||||
config.getBoolean(PREFIX + "root.auto-queue-creation-v2.enabled",
|
csConfig.isAutoQueueCreationV2Enabled("root"));
|
||||||
false));
|
|
||||||
}
|
}
|
||||||
|
|
||||||
@Test
|
@Test
|
||||||
public void testAutoCreateV2FlagOnParentWithoutChildren() {
|
public void testAutoCreateV2FlagOnParentWithoutChildren() {
|
||||||
FSQueue root = createParent(new ArrayList<>());
|
FSQueue root = createParent(new ArrayList<>());
|
||||||
converter.convertWeightsForChildQueues(root, config);
|
converter.convertWeightsForChildQueues(root, csConfig);
|
||||||
|
|
||||||
assertEquals("Number of properties", 2,
|
assertEquals("Number of properties", 21,
|
||||||
config.getPropsWithPrefix(PREFIX).size());
|
csConfig.getPropsWithPrefix(PREFIX).size());
|
||||||
assertTrue("root autocreate v2 enabled",
|
assertTrue("root autocreate v2 enabled",
|
||||||
config.getBoolean(PREFIX + "root.auto-queue-creation-v2.enabled",
|
csConfig.isAutoQueueCreationV2Enabled("root"));
|
||||||
false));
|
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user