YARN-8715. Make allocation tags in the placement spec optional for node-attributes. Contributed by Weiwei Yang.
This commit is contained in:
parent
95231f1749
commit
33d8327cff
@ -17,6 +17,7 @@
|
|||||||
*/
|
*/
|
||||||
package org.apache.hadoop.yarn.util.constraint;
|
package org.apache.hadoop.yarn.util.constraint;
|
||||||
|
|
||||||
|
import com.google.common.base.Strings;
|
||||||
import org.apache.hadoop.classification.InterfaceAudience;
|
import org.apache.hadoop.classification.InterfaceAudience;
|
||||||
import org.apache.hadoop.classification.InterfaceStability;
|
import org.apache.hadoop.classification.InterfaceStability;
|
||||||
import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode;
|
import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode;
|
||||||
@ -589,6 +590,14 @@ private SourceTags(String sourceTag, int number) {
|
|||||||
this.num = number;
|
this.num = number;
|
||||||
}
|
}
|
||||||
|
|
||||||
|
public static SourceTags emptySourceTags() {
|
||||||
|
return new SourceTags("", 0);
|
||||||
|
}
|
||||||
|
|
||||||
|
public boolean isEmpty() {
|
||||||
|
return Strings.isNullOrEmpty(tag) && num == 0;
|
||||||
|
}
|
||||||
|
|
||||||
public String getTag() {
|
public String getTag() {
|
||||||
return this.tag;
|
return this.tag;
|
||||||
}
|
}
|
||||||
@ -692,20 +701,47 @@ public static Map<SourceTags, PlacementConstraint> parsePlacementSpec(
|
|||||||
// foo=4,Pn
|
// foo=4,Pn
|
||||||
String[] splitted = specStr.split(
|
String[] splitted = specStr.split(
|
||||||
String.valueOf(EXPRESSION_VAL_DELIM), 2);
|
String.valueOf(EXPRESSION_VAL_DELIM), 2);
|
||||||
if (splitted.length != 2) {
|
final SourceTags st;
|
||||||
|
final String exprs;
|
||||||
|
if (splitted.length == 1) {
|
||||||
|
// source tags not specified
|
||||||
|
exprs = splitted[0];
|
||||||
|
st = SourceTags.emptySourceTags();
|
||||||
|
} else if (splitted.length == 2) {
|
||||||
|
exprs = splitted[1];
|
||||||
|
String tagAlloc = splitted[0];
|
||||||
|
st = SourceTags.parseFrom(tagAlloc);
|
||||||
|
} else {
|
||||||
throw new PlacementConstraintParseException(
|
throw new PlacementConstraintParseException(
|
||||||
"Unexpected placement constraint expression " + specStr);
|
"Unexpected placement constraint expression " + specStr);
|
||||||
}
|
}
|
||||||
|
|
||||||
String tagAlloc = splitted[0];
|
|
||||||
SourceTags st = SourceTags.parseFrom(tagAlloc);
|
|
||||||
String exprs = splitted[1];
|
|
||||||
AbstractConstraint constraint =
|
AbstractConstraint constraint =
|
||||||
PlacementConstraintParser.parseExpression(exprs);
|
PlacementConstraintParser.parseExpression(exprs);
|
||||||
|
|
||||||
result.put(st, constraint.build());
|
result.put(st, constraint.build());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Validation
|
||||||
|
Set<SourceTags> sourceTagSet = result.keySet();
|
||||||
|
if (sourceTagSet.stream()
|
||||||
|
.filter(sourceTags -> sourceTags.isEmpty())
|
||||||
|
.findAny()
|
||||||
|
.isPresent()) {
|
||||||
|
// Source tags, e.g foo=3, is optional for a node-attribute constraint,
|
||||||
|
// but when source tags is absent, the parser only accept single
|
||||||
|
// constraint expression to avoid ambiguous semantic. This is because
|
||||||
|
// DS AM is requesting number of containers per the number specified
|
||||||
|
// in the source tags, we do overwrite when there is no source tags
|
||||||
|
// with num_containers argument from commandline. If that is partially
|
||||||
|
// missed in the constraints, we don't know if it is ought to
|
||||||
|
// overwritten or not.
|
||||||
|
if (result.size() != 1) {
|
||||||
|
throw new PlacementConstraintParseException(
|
||||||
|
"Source allocation tags is required for a multi placement"
|
||||||
|
+ " constraint expression.");
|
||||||
|
}
|
||||||
|
}
|
||||||
return result;
|
return result;
|
||||||
}
|
}
|
||||||
}
|
}
|
@ -501,5 +501,27 @@ public void testParseNodeAttributeSpec()
|
|||||||
actualPc2 = valueIt.next();
|
actualPc2 = valueIt.next();
|
||||||
Assert.assertEquals(expectedPc1, actualPc1.getConstraintExpr());
|
Assert.assertEquals(expectedPc1, actualPc1.getConstraintExpr());
|
||||||
Assert.assertEquals(expectedPc2, actualPc2.getConstraintExpr());
|
Assert.assertEquals(expectedPc2, actualPc2.getConstraintExpr());
|
||||||
|
|
||||||
|
// A single node attribute constraint w/o source tags
|
||||||
|
result = PlacementConstraintParser
|
||||||
|
.parsePlacementSpec("rm.yarn.io/foo=true");
|
||||||
|
Assert.assertEquals(1, result.size());
|
||||||
|
target = PlacementTargets.nodeAttribute("rm.yarn.io/foo", "true");
|
||||||
|
expectedPc1 = targetNodeAttribute("node", NodeAttributeOpCode.EQ, target);
|
||||||
|
|
||||||
|
SourceTags actualSourceTags = result.keySet().iterator().next();
|
||||||
|
Assert.assertTrue(actualSourceTags.isEmpty());
|
||||||
|
actualPc1 = result.values().iterator().next();
|
||||||
|
Assert.assertEquals(expectedPc1, actualPc1.getConstraintExpr());
|
||||||
|
|
||||||
|
// If source tags is not specified for a node-attribute constraint,
|
||||||
|
// then this expression must be single constraint expression.
|
||||||
|
try {
|
||||||
|
PlacementConstraintParser
|
||||||
|
.parsePlacementSpec("rm.yarn.io/foo=true:xyz=1,notin,node,xyz");
|
||||||
|
Assert.fail("Expected a failure!");
|
||||||
|
} catch (Exception e) {
|
||||||
|
Assert.assertTrue(e instanceof PlacementConstraintParseException);
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
@ -47,6 +47,7 @@
|
|||||||
import java.util.Arrays;
|
import java.util.Arrays;
|
||||||
import java.util.concurrent.atomic.AtomicLong;
|
import java.util.concurrent.atomic.AtomicLong;
|
||||||
|
|
||||||
|
import com.google.common.base.Strings;
|
||||||
import org.apache.commons.cli.CommandLine;
|
import org.apache.commons.cli.CommandLine;
|
||||||
import org.apache.commons.cli.GnuParser;
|
import org.apache.commons.cli.GnuParser;
|
||||||
import org.apache.commons.cli.HelpFormatter;
|
import org.apache.commons.cli.HelpFormatter;
|
||||||
@ -527,7 +528,9 @@ public boolean init(String[] args) throws ParseException, IOException {
|
|||||||
LOG.info("Placement Spec received [{}]", decodedSpec);
|
LOG.info("Placement Spec received [{}]", decodedSpec);
|
||||||
|
|
||||||
this.numTotalContainers = 0;
|
this.numTotalContainers = 0;
|
||||||
parsePlacementSpecs(decodedSpec);
|
int globalNumOfContainers = Integer
|
||||||
|
.parseInt(cliParser.getOptionValue("num_containers", "0"));
|
||||||
|
parsePlacementSpecs(decodedSpec, globalNumOfContainers);
|
||||||
LOG.info("Total num containers requested [{}]", numTotalContainers);
|
LOG.info("Total num containers requested [{}]", numTotalContainers);
|
||||||
|
|
||||||
if (numTotalContainers == 0) {
|
if (numTotalContainers == 0) {
|
||||||
@ -698,11 +701,19 @@ public boolean init(String[] args) throws ParseException, IOException {
|
|||||||
return true;
|
return true;
|
||||||
}
|
}
|
||||||
|
|
||||||
private void parsePlacementSpecs(String decodedSpec) {
|
private void parsePlacementSpecs(String decodedSpec,
|
||||||
|
int globalNumOfContainers) {
|
||||||
Map<String, PlacementSpec> pSpecs =
|
Map<String, PlacementSpec> pSpecs =
|
||||||
PlacementSpec.parse(decodedSpec);
|
PlacementSpec.parse(decodedSpec);
|
||||||
this.placementSpecs = new HashMap<>();
|
this.placementSpecs = new HashMap<>();
|
||||||
for (PlacementSpec pSpec : pSpecs.values()) {
|
for (PlacementSpec pSpec : pSpecs.values()) {
|
||||||
|
// Use global num of containers when the spec doesn't specify
|
||||||
|
// source tags. This is allowed when using node-attribute constraints.
|
||||||
|
if (Strings.isNullOrEmpty(pSpec.sourceTag)
|
||||||
|
&& pSpec.getNumContainers() == 0
|
||||||
|
&& globalNumOfContainers > 0) {
|
||||||
|
pSpec.setNumContainers(globalNumOfContainers);
|
||||||
|
}
|
||||||
this.numTotalContainers += pSpec.getNumContainers();
|
this.numTotalContainers += pSpec.getNumContainers();
|
||||||
this.placementSpecs.put(pSpec.sourceTag, pSpec);
|
this.placementSpecs.put(pSpec.sourceTag, pSpec);
|
||||||
}
|
}
|
||||||
@ -799,8 +810,9 @@ public void run() throws YarnException, IOException, InterruptedException {
|
|||||||
placementConstraintMap = new HashMap<>();
|
placementConstraintMap = new HashMap<>();
|
||||||
for (PlacementSpec spec : this.placementSpecs.values()) {
|
for (PlacementSpec spec : this.placementSpecs.values()) {
|
||||||
if (spec.constraint != null) {
|
if (spec.constraint != null) {
|
||||||
placementConstraintMap.put(
|
Set<String> allocationTags = Strings.isNullOrEmpty(spec.sourceTag) ?
|
||||||
Collections.singleton(spec.sourceTag), spec.constraint);
|
Collections.emptySet() : Collections.singleton(spec.sourceTag);
|
||||||
|
placementConstraintMap.put(allocationTags, spec.constraint);
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
Loading…
Reference in New Issue
Block a user