YARN-10457. Add a configuration switch to change between legacy and JSON placement rule format. Contributed by Gergely Pollak
This commit is contained in:
parent
5ff70a59c4
commit
0d3155a687
@ -23,6 +23,7 @@
|
|||||||
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
|
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableSet;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.MappingRule;
|
import org.apache.hadoop.yarn.server.resourcemanager.placement.MappingRule;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.QueuePlacementRuleUtils;
|
import org.apache.hadoop.yarn.server.resourcemanager.placement.QueuePlacementRuleUtils;
|
||||||
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.placement.MappingRuleCreator;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
import org.apache.hadoop.classification.InterfaceAudience.Private;
|
||||||
@ -63,6 +64,7 @@
|
|||||||
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
import org.apache.hadoop.yarn.util.resource.ResourceUtils;
|
||||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||||
|
|
||||||
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.Collection;
|
import java.util.Collection;
|
||||||
import java.util.Collections;
|
import java.util.Collections;
|
||||||
@ -391,6 +393,19 @@ public class CapacitySchedulerConfiguration extends ReservationSchedulerConfigur
|
|||||||
"allow-zero-capacity-sum";
|
"allow-zero-capacity-sum";
|
||||||
|
|
||||||
public static final boolean DEFAULT_ALLOW_ZERO_CAPACITY_SUM = false;
|
public static final boolean DEFAULT_ALLOW_ZERO_CAPACITY_SUM = false;
|
||||||
|
public static final String MAPPING_RULE_FORMAT =
|
||||||
|
PREFIX + "mapping-rule-format";
|
||||||
|
public static final String MAPPING_RULE_JSON =
|
||||||
|
PREFIX + "mapping-rule-json";
|
||||||
|
public static final String MAPPING_RULE_JSON_FILE =
|
||||||
|
PREFIX + "mapping-rule-json-file";
|
||||||
|
|
||||||
|
public static final String MAPPING_RULE_FORMAT_LEGACY = "legacy";
|
||||||
|
public static final String MAPPING_RULE_FORMAT_JSON = "json";
|
||||||
|
|
||||||
|
public static final String MAPPING_RULE_FORMAT_DEFAULT =
|
||||||
|
MAPPING_RULE_FORMAT_LEGACY;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Different resource types supported.
|
* Different resource types supported.
|
||||||
*/
|
*/
|
||||||
@ -1168,7 +1183,7 @@ public List<QueueMapping> getQueueMappings() {
|
|||||||
return mappings;
|
return mappings;
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<MappingRule> getMappingRules() {
|
public List<MappingRule> parseLegacyMappingRules() {
|
||||||
List<MappingRule> mappings = new ArrayList<MappingRule>();
|
List<MappingRule> mappings = new ArrayList<MappingRule>();
|
||||||
Collection<String> mappingsString =
|
Collection<String> mappingsString =
|
||||||
getTrimmedStringCollection(QUEUE_MAPPING);
|
getTrimmedStringCollection(QUEUE_MAPPING);
|
||||||
@ -1208,6 +1223,53 @@ public List<MappingRule> getMappingRules() {
|
|||||||
return mappings;
|
return mappings;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public List<MappingRule> parseJSONMappingRules() throws IOException {
|
||||||
|
String mappingJson = get(MAPPING_RULE_JSON, "");
|
||||||
|
String mappingJsonFile = get(MAPPING_RULE_JSON_FILE, "");
|
||||||
|
MappingRuleCreator creator = new MappingRuleCreator();
|
||||||
|
|
||||||
|
if (!mappingJson.equals("")) {
|
||||||
|
LOG.info("Reading mapping rules from provided inline JSON '{}'.",
|
||||||
|
mappingJson);
|
||||||
|
try {
|
||||||
|
return creator.getMappingRulesFromString(mappingJson);
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Error parsing mapping rule inline JSON.");
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
} else if (!mappingJsonFile.equals("")) {
|
||||||
|
LOG.info("Reading mapping rules from JSON file '{}'.",
|
||||||
|
mappingJsonFile);
|
||||||
|
try {
|
||||||
|
return creator.getMappingRulesFromFile(mappingJsonFile.trim());
|
||||||
|
} catch (IOException e) {
|
||||||
|
LOG.error("Error reading or parsing mapping rule JSON file '{}'.",
|
||||||
|
mappingJsonFile);
|
||||||
|
throw e;
|
||||||
|
}
|
||||||
|
} else {
|
||||||
|
LOG.warn("Mapping rule is set to JSON, but no inline JSON nor a JSON " +
|
||||||
|
"file was provided! Starting with no mapping rules!");
|
||||||
|
}
|
||||||
|
|
||||||
|
return new ArrayList<>();
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<MappingRule> getMappingRules() throws IOException {
|
||||||
|
String mappingFormat =
|
||||||
|
get(MAPPING_RULE_FORMAT, MAPPING_RULE_FORMAT_DEFAULT);
|
||||||
|
if (mappingFormat.equals(MAPPING_RULE_FORMAT_LEGACY)) {
|
||||||
|
return parseLegacyMappingRules();
|
||||||
|
} else if (mappingFormat.equals(MAPPING_RULE_FORMAT_JSON)) {
|
||||||
|
return parseJSONMappingRules();
|
||||||
|
} else {
|
||||||
|
throw new IllegalArgumentException(
|
||||||
|
"Illegal queue mapping format '" + mappingFormat + "' please use '" +
|
||||||
|
MAPPING_RULE_FORMAT_LEGACY + "' or '" + MAPPING_RULE_FORMAT_JSON +
|
||||||
|
"'");
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
@Private
|
@Private
|
||||||
@VisibleForTesting
|
@VisibleForTesting
|
||||||
public void setQueuePlacementRules(Collection<String> queuePlacementRules) {
|
public void setQueuePlacementRules(Collection<String> queuePlacementRules) {
|
||||||
|
@ -50,9 +50,9 @@ public class MappingRuleCreator {
|
|||||||
private static final String ALL_USER = "*";
|
private static final String ALL_USER = "*";
|
||||||
private static Logger LOG = LoggerFactory.getLogger(MappingRuleCreator.class);
|
private static Logger LOG = LoggerFactory.getLogger(MappingRuleCreator.class);
|
||||||
|
|
||||||
public MappingRulesDescription getMappingRulesFromJson(String jsonPath)
|
public MappingRulesDescription getMappingRulesFromJsonFile(String filePath)
|
||||||
throws IOException {
|
throws IOException {
|
||||||
byte[] fileContents = Files.readAllBytes(Paths.get(jsonPath));
|
byte[] fileContents = Files.readAllBytes(Paths.get(filePath));
|
||||||
return getMappingRulesFromJson(fileContents);
|
return getMappingRulesFromJson(fileContents);
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -62,8 +62,21 @@ MappingRulesDescription getMappingRulesFromJson(byte[] contents)
|
|||||||
return objectMapper.readValue(contents, MappingRulesDescription.class);
|
return objectMapper.readValue(contents, MappingRulesDescription.class);
|
||||||
}
|
}
|
||||||
|
|
||||||
public List<MappingRule> getMappingRules(String jsonPath) throws IOException {
|
MappingRulesDescription getMappingRulesFromJson(String contents)
|
||||||
MappingRulesDescription desc = getMappingRulesFromJson(jsonPath);
|
throws IOException {
|
||||||
|
ObjectMapper objectMapper = new ObjectMapper();
|
||||||
|
return objectMapper.readValue(contents, MappingRulesDescription.class);
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<MappingRule> getMappingRulesFromFile(String jsonPath)
|
||||||
|
throws IOException {
|
||||||
|
MappingRulesDescription desc = getMappingRulesFromJsonFile(jsonPath);
|
||||||
|
return getMappingRules(desc);
|
||||||
|
}
|
||||||
|
|
||||||
|
public List<MappingRule> getMappingRulesFromString(String json)
|
||||||
|
throws IOException {
|
||||||
|
MappingRulesDescription desc = getMappingRulesFromJson(json);
|
||||||
return getMappingRules(desc);
|
return getMappingRules(desc);
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@ -28,17 +28,26 @@
|
|||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfiguration;
|
||||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager;
|
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueManager;
|
||||||
import org.apache.hadoop.yarn.util.Records;
|
import org.apache.hadoop.yarn.util.Records;
|
||||||
|
import org.junit.Rule;
|
||||||
import org.junit.Test;
|
import org.junit.Test;
|
||||||
|
import org.junit.rules.TemporaryFolder;
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
|
|
||||||
|
import java.io.BufferedWriter;
|
||||||
|
import java.io.File;
|
||||||
|
import java.io.FileWriter;
|
||||||
import java.io.IOException;
|
import java.io.IOException;
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.Set;
|
import java.util.Set;
|
||||||
|
|
||||||
import static junit.framework.TestCase.*;
|
import static junit.framework.TestCase.assertEquals;
|
||||||
|
import static junit.framework.TestCase.assertNotNull;
|
||||||
|
import static junit.framework.TestCase.assertNull;
|
||||||
|
import static junit.framework.TestCase.assertTrue;
|
||||||
|
import static junit.framework.TestCase.fail;
|
||||||
import static org.apache.hadoop.yarn.server.resourcemanager.placement.FairQueuePlacementUtils.DOT;
|
import static org.apache.hadoop.yarn.server.resourcemanager.placement.FairQueuePlacementUtils.DOT;
|
||||||
import static org.mockito.ArgumentMatchers.isNull;
|
import static org.mockito.ArgumentMatchers.isNull;
|
||||||
import static org.mockito.Mockito.mock;
|
import static org.mockito.Mockito.mock;
|
||||||
@ -47,6 +56,10 @@
|
|||||||
public class TestCSMappingPlacementRule {
|
public class TestCSMappingPlacementRule {
|
||||||
private static final Logger LOG = LoggerFactory
|
private static final Logger LOG = LoggerFactory
|
||||||
.getLogger(TestCSMappingPlacementRule.class);
|
.getLogger(TestCSMappingPlacementRule.class);
|
||||||
|
|
||||||
|
@Rule
|
||||||
|
public TemporaryFolder folder = new TemporaryFolder();
|
||||||
|
|
||||||
private Map<String, Set<String>> userGroups = ImmutableMap.of(
|
private Map<String, Set<String>> userGroups = ImmutableMap.of(
|
||||||
"alice", ImmutableSet.of("p_alice", "user", "developer"),
|
"alice", ImmutableSet.of("p_alice", "user", "developer"),
|
||||||
"bob", ImmutableSet.of("p_bob", "user", "developer"),
|
"bob", ImmutableSet.of("p_bob", "user", "developer"),
|
||||||
@ -444,4 +457,105 @@ public void testGroupMatching() throws IOException {
|
|||||||
"developer nor in the p_alice group", engine, app, "charlie",
|
"developer nor in the p_alice group", engine, app, "charlie",
|
||||||
"root.man.user");
|
"root.man.user");
|
||||||
}
|
}
|
||||||
|
|
||||||
|
void assertConfigTestResult(List<MappingRule> rules) {
|
||||||
|
assertEquals("We only specified one rule", 1, rules.size());
|
||||||
|
MappingRule rule = rules.get(0);
|
||||||
|
String ruleStr = rule.toString();
|
||||||
|
assertTrue("Rule's matcher variable should be %user",
|
||||||
|
ruleStr.contains("variable='%user'"));
|
||||||
|
assertTrue("Rule's match value should be bob",
|
||||||
|
ruleStr.contains("value='bob'"));
|
||||||
|
assertTrue("Rule's action should be place to queue", ruleStr.contains(
|
||||||
|
"action=PlaceToQueueAction{queueName='%primary_group'}"));
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testLegacyConfiguration() throws IOException {
|
||||||
|
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||||
|
conf.set(CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT,
|
||||||
|
CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT_LEGACY);
|
||||||
|
conf.set(CapacitySchedulerConfiguration.QUEUE_MAPPING,
|
||||||
|
"u:bob:%primary_group");
|
||||||
|
|
||||||
|
List<MappingRule> rules = conf.getMappingRules();
|
||||||
|
assertConfigTestResult(rules);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testJSONConfiguration() throws IOException {
|
||||||
|
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||||
|
conf.set(CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT,
|
||||||
|
CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT_JSON);
|
||||||
|
conf.set(CapacitySchedulerConfiguration.MAPPING_RULE_JSON,
|
||||||
|
"{\"rules\": [{" +
|
||||||
|
" \"type\": \"user\"," +
|
||||||
|
" \"matches\": \"bob\"," +
|
||||||
|
" \"policy\": \"custom\"," +
|
||||||
|
" \"customPlacement\": \"%primary_group\"," +
|
||||||
|
" \"fallbackResult\":\"skip\"" +
|
||||||
|
"}]}");
|
||||||
|
|
||||||
|
List<MappingRule> rules = conf.getMappingRules();
|
||||||
|
assertConfigTestResult(rules);
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testEmptyJSONConfiguration() throws IOException {
|
||||||
|
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||||
|
conf.set(CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT,
|
||||||
|
CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT_JSON);
|
||||||
|
conf.set(CapacitySchedulerConfiguration.MAPPING_RULE_JSON, "");
|
||||||
|
|
||||||
|
List<MappingRule> rules = conf.getMappingRules();
|
||||||
|
assertEquals("We expect no rules", 0, rules.size());
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = IOException.class)
|
||||||
|
public void testInvalidJSONConfiguration() throws IOException {
|
||||||
|
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||||
|
conf.set(CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT,
|
||||||
|
CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT_JSON);
|
||||||
|
conf.set(CapacitySchedulerConfiguration.MAPPING_RULE_JSON,
|
||||||
|
"I'm a bad JSON, since I'm not a JSON.");
|
||||||
|
List<MappingRule> rules = conf.getMappingRules();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test(expected = IOException.class)
|
||||||
|
public void testMissingJSONFileConfiguration() throws IOException {
|
||||||
|
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||||
|
conf.set(CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT,
|
||||||
|
CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT_JSON);
|
||||||
|
conf.set(CapacitySchedulerConfiguration.MAPPING_RULE_JSON_FILE,
|
||||||
|
"/dev/null/nofile");
|
||||||
|
List<MappingRule> rules = conf.getMappingRules();
|
||||||
|
}
|
||||||
|
|
||||||
|
@Test
|
||||||
|
public void testJSONFileConfiguration() throws IOException {
|
||||||
|
CapacitySchedulerConfiguration conf = new CapacitySchedulerConfiguration();
|
||||||
|
conf.set(CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT,
|
||||||
|
CapacitySchedulerConfiguration.MAPPING_RULE_FORMAT_JSON);
|
||||||
|
|
||||||
|
File jsonFile = folder.newFile("testJSONFileConfiguration.json");
|
||||||
|
|
||||||
|
BufferedWriter writer = new BufferedWriter(new FileWriter(jsonFile));
|
||||||
|
try {
|
||||||
|
writer.write("{\"rules\": [{" +
|
||||||
|
" \"type\": \"user\"," +
|
||||||
|
" \"matches\": \"bob\"," +
|
||||||
|
" \"policy\": \"custom\"," +
|
||||||
|
" \"customPlacement\": \"%primary_group\"," +
|
||||||
|
" \"fallbackResult\":\"skip\"" +
|
||||||
|
"}]}");
|
||||||
|
} finally {
|
||||||
|
writer.close();
|
||||||
|
}
|
||||||
|
|
||||||
|
conf.set(CapacitySchedulerConfiguration.MAPPING_RULE_JSON_FILE,
|
||||||
|
jsonFile.getAbsolutePath());
|
||||||
|
List<MappingRule> rules = conf.getMappingRules();
|
||||||
|
|
||||||
|
assertConfigTestResult(rules);
|
||||||
|
}
|
||||||
}
|
}
|
Loading…
Reference in New Issue
Block a user