YARN-9258. Support to specify allocation tags without constraint in distributed shell CLI. Contributed by Prabhu Joseph.

This commit is contained in:
Weiwei Yang 2019-02-22 00:18:07 +08:00
parent 2bc3cfe28f
commit 6c4ab0312b
5 changed files with 134 additions and 72 deletions

View File

@ -255,7 +255,7 @@ public String nextElement() {
/**
* Tokenizer used to parse allocation tags expression, which should be
* in tag=numOfAllocations syntax.
* in tag(numOfAllocations) syntax.
*/
public static class SourceTagsTokenizer implements ConstraintTokenizer {
@ -264,22 +264,24 @@ public static class SourceTagsTokenizer implements ConstraintTokenizer {
private Iterator<String> iterator;
public SourceTagsTokenizer(String expr) {
this.expression = expr;
st = new StringTokenizer(expr, String.valueOf(KV_SPLIT_DELIM));
st = new StringTokenizer(expr, String.valueOf(BRACKET_START));
}
@Override
public void validate() throws PlacementConstraintParseException {
ArrayList<String> parsedValues = new ArrayList<>();
if (st.countTokens() != 2) {
if (st.countTokens() != 2 || !this.expression.
endsWith(String.valueOf(BRACKET_END))) {
throw new PlacementConstraintParseException(
"Expecting source allocation tag to be specified"
+ " sourceTag=numOfAllocations syntax,"
+ " but met " + expression);
+ " sourceTag(numOfAllocations) syntax,"
+ " but met " + expression);
}
String sourceTag = st.nextToken();
parsedValues.add(sourceTag);
String num = st.nextToken();
String str = st.nextToken();
String num = str.substring(0, str.length() - 1);
try {
Integer.parseInt(num);
parsedValues.add(num);
@ -630,7 +632,7 @@ public int getNumOfAllocations() {
}
/**
* Parses source tags from expression "sourceTags=numOfAllocations".
* Parses source tags from expression "sourceTags(numOfAllocations)".
* @param expr
* @return source tags, see {@link SourceTags}
* @throws PlacementConstraintParseException
@ -690,12 +692,14 @@ public static AbstractConstraint parseExpression(String constraintStr)
* is a composite expression which is composed by multiple sub constraint
* expressions delimited by ":". With following syntax:
*
* <p>Tag1=N1,P1:Tag2=N2,P2:...:TagN=Nn,Pn</p>
* <p>Tag1(N1),P1:Tag2(N2),P2:...:TagN(Nn),Pn</p>
*
* where <b>TagN=Nn</b> is a key value pair to determine the source
* where <b>TagN(Nn)</b> is a key value pair to determine the source
* allocation tag and the number of allocations, such as:
*
* <p>foo=3</p>
* <p>foo(3)</p>
*
* Optional when using NodeAttribute Constraint.
*
* and where <b>Pn</b> can be any form of a valid constraint expression,
* such as:
@ -705,6 +709,13 @@ public static AbstractConstraint parseExpression(String constraintStr)
* <li>notin,node,foo,bar,1,2</li>
* <li>and(notin,node,foo:notin,node,bar)</li>
* </ul>
*
* and NodeAttribute Constraint such as
*
* <ul>
* <li>yarn.rm.io/foo=true</li>
* <li>java=1.7,1.8</li>
* </ul>
* @param expression expression string.
* @return a map of source tags to placement constraint mapping.
* @throws PlacementConstraintParseException
@ -719,30 +730,35 @@ public static Map<SourceTags, PlacementConstraint> parsePlacementSpec(
tokenizer.validate();
while (tokenizer.hasMoreElements()) {
String specStr = tokenizer.nextElement();
// each spec starts with sourceAllocationTag=numOfContainers and
// each spec starts with sourceAllocationTag(numOfContainers) and
// followed by a constraint expression.
// foo=4,Pn
String[] splitted = specStr.split(
String.valueOf(EXPRESSION_VAL_DELIM), 2);
// foo(4),Pn
final SourceTags st;
final String exprs;
if (splitted.length == 1) {
// source tags not specified
exprs = splitted[0];
st = SourceTags.emptySourceTags();
} else if (splitted.length == 2) {
exprs = splitted[1];
String tagAlloc = splitted[0];
st = SourceTags.parseFrom(tagAlloc);
PlacementConstraint constraint;
String delimiter = new String(new char[]{'[', BRACKET_END, ']',
EXPRESSION_VAL_DELIM});
String[] splitted = specStr.split(delimiter, 2);
if (splitted.length == 2) {
st = SourceTags.parseFrom(splitted[0] + String.valueOf(BRACKET_END));
constraint = PlacementConstraintParser.parseExpression(splitted[1]).
build();
} else if (splitted.length == 1) {
// Either Node Attribute Constraint or Source Allocation Tag alone
NodeConstraintParser np = new NodeConstraintParser(specStr);
Optional<AbstractConstraint> constraintOptional =
Optional.ofNullable(np.tryParse());
if (constraintOptional.isPresent()) {
st = SourceTags.emptySourceTags();
constraint = constraintOptional.get().build();
} else {
st = SourceTags.parseFrom(specStr);
constraint = null;
}
} else {
throw new PlacementConstraintParseException(
"Unexpected placement constraint expression " + specStr);
}
AbstractConstraint constraint =
PlacementConstraintParser.parseExpression(exprs);
result.put(st, constraint.build());
result.put(st, constraint);
}
// Validation
@ -751,7 +767,7 @@ public static Map<SourceTags, PlacementConstraint> parsePlacementSpec(
.filter(sourceTags -> sourceTags.isEmpty())
.findAny()
.isPresent()) {
// Source tags, e.g foo=3, is optional for a node-attribute constraint,
// Source tags, e.g foo(3), is optional for a node-attribute constraint,
// but when source tags is absent, the parser only accept single
// constraint expression to avoid ambiguous semantic. This is because
// DS AM is requesting number of containers per the number specified

View File

@ -22,6 +22,7 @@
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.TreeSet;
import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
@ -274,15 +275,15 @@ public void testMultipleConstraintsTokenizer()
TokenizerTester mp;
ct = new MultipleConstraintsTokenizer(
"foo=1,A1,A2,A3:bar=2,B1,B2:moo=3,C1,C2");
"foo(1),A1,A2,A3:bar(2),B1,B2:moo(3),C1,C2");
mp = new TokenizerTester(ct,
"foo=1,A1,A2,A3", "bar=2,B1,B2", "moo=3,C1,C2");
"foo(1),A1,A2,A3", "bar(2),B1,B2", "moo(3),C1,C2");
mp.verify();
ct = new MultipleConstraintsTokenizer(
"foo=1,AND(A2:A3):bar=2,OR(B1:AND(B2:B3)):moo=3,C1,C2");
"foo(1),AND(A2:A3):bar(2),OR(B1:AND(B2:B3)):moo(3),C1,C2");
mp = new TokenizerTester(ct,
"foo=1,AND(A2:A3)", "bar=2,OR(B1:AND(B2:B3))", "moo=3,C1,C2");
"foo(1),AND(A2:A3)", "bar(2),OR(B1:AND(B2:B3))", "moo(3),C1,C2");
mp.verify();
ct = new MultipleConstraintsTokenizer("A:B:C");
@ -301,12 +302,12 @@ public void testMultipleConstraintsTokenizer()
mp = new TokenizerTester(ct, "A", "AND(B:OR(C:D))", "E");
mp.verify();
st = new SourceTagsTokenizer("A=4");
st = new SourceTagsTokenizer("A(4)");
mp = new TokenizerTester(st, "A", "4");
mp.verify();
try {
st = new SourceTagsTokenizer("A=B");
st = new SourceTagsTokenizer("A(B)");
mp = new TokenizerTester(st, "A", "B");
mp.verify();
Assert.fail("Expecting a parsing failure");
@ -348,9 +349,20 @@ public void testParsePlacementSpec()
PlacementConstraint actualPc1, actualPc2;
SourceTags tag1, tag2;
// Only Source Tag without constraint
result = PlacementConstraintParser
.parsePlacementSpec("foo(3)");
Assert.assertEquals(1, result.size());
tag1 = result.keySet().iterator().next();
Assert.assertEquals("foo", tag1.getTag());
Assert.assertEquals(3, tag1.getNumOfAllocations());
expectedPc1 = null;
actualPc1 = result.values().iterator().next();
Assert.assertEquals(expectedPc1, actualPc1);
// A single anti-affinity constraint
result = PlacementConstraintParser
.parsePlacementSpec("foo=3,notin,node,foo");
.parsePlacementSpec("foo(3),notin,node,foo");
Assert.assertEquals(1, result.size());
tag1 = result.keySet().iterator().next();
Assert.assertEquals("foo", tag1.getTag());
@ -361,7 +373,7 @@ public void testParsePlacementSpec()
// Upper case
result = PlacementConstraintParser
.parsePlacementSpec("foo=3,NOTIN,NODE,foo");
.parsePlacementSpec("foo(3),NOTIN,NODE,foo");
Assert.assertEquals(1, result.size());
tag1 = result.keySet().iterator().next();
Assert.assertEquals("foo", tag1.getTag());
@ -372,7 +384,7 @@ public void testParsePlacementSpec()
// A single cardinality constraint
result = PlacementConstraintParser
.parsePlacementSpec("foo=10,cardinality,node,foo,bar,0,100");
.parsePlacementSpec("foo(10),cardinality,node,foo,bar,0,100");
Assert.assertEquals(1, result.size());
tag1 = result.keySet().iterator().next();
Assert.assertEquals("foo", tag1.getTag());
@ -386,7 +398,7 @@ public void testParsePlacementSpec()
// Two constraint expressions
result = PlacementConstraintParser
.parsePlacementSpec("foo=3,notin,node,foo:bar=2,in,node,foo");
.parsePlacementSpec("foo(3),notin,node,foo:bar(2),in,node,foo");
Assert.assertEquals(2, result.size());
Iterator<SourceTags> keyIt = result.keySet().iterator();
tag1 = keyIt.next();
@ -403,7 +415,7 @@ public void testParsePlacementSpec()
// And constraint
result = PlacementConstraintParser
.parsePlacementSpec("foo=1000,and(notin,node,bar:in,node,foo)");
.parsePlacementSpec("foo(1000),and(notin,node,bar:in,node,foo)");
Assert.assertEquals(1, result.size());
keyIt = result.keySet().iterator();
tag1 = keyIt.next();
@ -416,8 +428,8 @@ public void testParsePlacementSpec()
// Multiple constraints with nested forms.
result = PlacementConstraintParser.parsePlacementSpec(
"foo=1000,and(notin,node,bar:or(in,node,foo:in,node,moo))"
+ ":bar=200,notin,node,foo");
"foo(1000),and(notin,node,bar:or(in,node,foo:in,node,moo))"
+ ":bar(200),notin,node,foo");
Assert.assertEquals(2, result.size());
keyIt = result.keySet().iterator();
tag1 = keyIt.next();
@ -436,6 +448,17 @@ public void testParsePlacementSpec()
Assert.assertEquals(actualPc1, expectedPc1);
expectedPc2 = targetNotIn("node", allocationTag("foo")).build();
Assert.assertEquals(expectedPc2, actualPc2);
// Failure Cases
String[] invalidSpecs = {"foo(3", "foo),bar", "foobar", "),java=1.7,1.8"};
for (String spec : invalidSpecs) {
try {
result = PlacementConstraintParser.parsePlacementSpec(spec);
Assert.fail("Expected a failure!");
} catch (Exception e) {
Assert.assertTrue(e instanceof PlacementConstraintParseException);
}
}
}
// We verify the toString result by parsing it again
@ -466,7 +489,7 @@ public void testParseNodeAttributeSpec()
// A single node attribute constraint
result = PlacementConstraintParser
.parsePlacementSpec("xyz=4,rm.yarn.io/foo=true");
.parsePlacementSpec("xyz(4),rm.yarn.io/foo=true");
Assert.assertEquals(1, result.size());
TargetExpression target = PlacementTargets
.nodeAttribute("rm.yarn.io/foo", "true");
@ -477,7 +500,7 @@ public void testParseNodeAttributeSpec()
// A single node attribute constraint
result = PlacementConstraintParser
.parsePlacementSpec("xyz=3,rm.yarn.io/foo!=abc");
.parsePlacementSpec("xyz(3),rm.yarn.io/foo!=abc");
Assert.assertEquals(1, result.size());
target = PlacementTargets
.nodeAttribute("rm.yarn.io/foo", "abc");
@ -492,7 +515,7 @@ public void testParseNodeAttributeSpec()
// A single node attribute constraint
result = PlacementConstraintParser
.parsePlacementSpec(
"xyz=1,rm.yarn.io/foo!=abc:zxy=1,rm.yarn.io/bar=true");
"xyz(1),rm.yarn.io/foo!=abc:zxy(1),rm.yarn.io/bar=true");
Assert.assertEquals(2, result.size());
target = PlacementTargets
.nodeAttribute("rm.yarn.io/foo", "abc");
@ -519,11 +542,27 @@ public void testParseNodeAttributeSpec()
actualPc1 = result.values().iterator().next();
Assert.assertEquals(expectedPc1, actualPc1.getConstraintExpr());
// Node Attribute Constraint With Multiple Values
result = PlacementConstraintParser
.parsePlacementSpec("java=1.7,1.8");
Assert.assertEquals(1, result.size());
Set<String> constraintEntities = new TreeSet<>();
constraintEntities.add("1.7");
constraintEntities.add("1.8");
target = PlacementConstraints.PlacementTargets.nodeAttribute("java",
constraintEntities.toArray(new String[constraintEntities.size()]));
expectedPc1 = targetNodeAttribute("node", NodeAttributeOpCode.EQ, target);
actualSourceTags = result.keySet().iterator().next();
Assert.assertTrue(actualSourceTags.isEmpty());
actualPc1 = result.values().iterator().next();
Assert.assertEquals(expectedPc1, actualPc1.getConstraintExpr());
// If source tags is not specified for a node-attribute constraint,
// then this expression must be single constraint expression.
try {
PlacementConstraintParser
.parsePlacementSpec("rm.yarn.io/foo=true:xyz=1,notin,node,xyz");
.parsePlacementSpec("rm.yarn.io/foo=true:xyz(1),notin,node,xyz");
Assert.fail("Expected a failure!");
} catch (Exception e) {
Assert.assertTrue(e instanceof PlacementConstraintParseException);
@ -537,7 +576,7 @@ public void testParseAllocationTagNameSpace()
// Constraint with Two Different NameSpaces
result = PlacementConstraintParser
.parsePlacementSpec("foo=2,notin,node,not-self/bar,all/moo");
.parsePlacementSpec("foo(2),notin,node,not-self/bar,all/moo");
Assert.assertEquals(1, result.size());
Set<TargetExpression> expectedTargetExpressions = Sets.newHashSet(
PlacementTargets.allocationTagWithNamespace("not-self", "bar"),
@ -552,7 +591,7 @@ public void testParseAllocationTagNameSpace()
// Constraint With Default NameSpace SELF
result = PlacementConstraintParser
.parsePlacementSpec("foo=2,notin,node,moo");
.parsePlacementSpec("foo(2),notin,node,moo");
Assert.assertEquals(1, result.size());
TargetExpression expectedTargetExpression = PlacementTargets.
allocationTagWithNamespace("self", "moo");
@ -567,7 +606,7 @@ public void testParseAllocationTagNameSpace()
boolean caughtException = false;
try {
result = PlacementConstraintParser
.parsePlacementSpec("foo=2,notin,node,bar/moo");
.parsePlacementSpec("foo(2),notin,node,bar/moo");
} catch(PlacementConstraintParseException e) {
caughtException = true;
}

View File

@ -96,8 +96,12 @@ public static Map<String, PlacementSpec> parse(String specs)
parsed.entrySet()) {
LOG.info("Parsed source tag: {}, number of allocations: {}",
entry.getKey().getTag(), entry.getKey().getNumOfAllocations());
LOG.info("Parsed constraint: {}", entry.getValue()
.getConstraintExpr().getClass().getSimpleName());
if (entry.getValue() != null) {
LOG.info("Parsed constraint: {}", entry.getValue()
.getConstraintExpr().getClass().getSimpleName());
} else {
LOG.info("Parsed constraint Empty");
}
pSpecs.put(entry.getKey().getTag(), new PlacementSpec(
entry.getKey().getTag(),
entry.getKey().getNumOfAllocations(),

View File

@ -156,7 +156,7 @@ public void testDistributedShellWithPlacementConstraint()
"--shell_command",
distShellTest.getSleepCommand(15),
"--placement_spec",
"zk=1,NOTIN,NODE,zk:spark=1,NOTIN,NODE,zk"
"zk(1),NOTIN,NODE,zk:spark(1),NOTIN,NODE,zk"
};
LOG.info("Initializing DS Client");
final Client client =
@ -202,7 +202,7 @@ public void testDistributedShellWithAllocationTagNamespace()
"--shell_command",
distShellTest.getSleepCommand(30),
"--placement_spec",
"bar=1,notin,node,bar"
"bar(1),notin,node,bar"
};
final Client clientA =
new Client(new Configuration(distShellTest.yarnCluster.getConfig()));
@ -275,7 +275,7 @@ public void run() {
"--shell_command",
Shell.WINDOWS ? "dir" : "ls",
"--placement_spec",
"foo=3,notin,node,all/bar"
"foo(3),notin,node,all/bar"
};
final Client clientB = new Client(new Configuration(distShellTest.
yarnCluster.getConfig()));

View File

@ -65,26 +65,29 @@ $ yarn org.apache.hadoop.yarn.applications.distributedshell.Client -jar share/ha
where **PlacementSpec** is of the form:
```
PlacementSpec => "" | KeyVal;PlacementSpec
KeyVal => SourceTag=ConstraintExpr
SourceTag => String
ConstraintExpr => NumContainers | NumContainers, Constraint
Constraint => SingleConstraint | CompositeConstraint
SingleConstraint => "IN",Scope,TargetTag | "NOTIN",Scope,TargetTag | "CARDINALITY",Scope,TargetTag,MinCard,MaxCard
CompositeConstraint => AND(ConstraintList) | OR(ConstraintList)
ConstraintList => Constraint | Constraint:ConstraintList
NumContainers => int
Scope => "NODE" | "RACK"
TargetTag => String
MinCard => int
MaxCard => int
PlacementSpec => "" | PlacementExpr;PlacementSpec
PlacementExpr => SourceTag,ConstraintExpr
SourceTag => String(NumContainers)
ConstraintExpr => SingleConstraint | CompositeConstraint
SingleConstraint => "IN",Scope,TargetTag | "NOTIN",Scope,TargetTag | "CARDINALITY",Scope,TargetTag,MinCard,MaxCard | NodeAttributeConstraintExpr
NodeAttributeConstraintExpr => NodeAttributeName=Value, NodeAttributeName!=Value
CompositeConstraint => AND(ConstraintList) | OR(ConstraintList)
ConstraintList => Constraint | Constraint:ConstraintList
NumContainers => int
Scope => "NODE" | "RACK"
TargetTag => String
MinCard => int
MaxCard => int
```
Note that when the `-placement_spec` argument is specified in the distributed shell command, the `-num-containers` argument should not be used. In case `-num-containers` argument is used in conjunction with `-placement-spec`, the former is ignored. This is because in PlacementSpec, we determine the number of containers per tag, making the `-num-containers` redundant and possibly conflicting. Moreover, if `-placement_spec` is used, all containers will be requested with GUARANTEED execution type.
Note:
* When the `-placement_spec` argument is specified (except NodeAttributeConstraintExpr) in the distributed shell command, the `-num-containers` argument should not be used. In case `-num-containers` argument is used in conjunction with `-placement-spec`, the former is ignored. This is because in PlacementSpec, we determine the number of containers per tag, making the `-num-containers` redundant and possibly conflicting. Moreover, if `-placement_spec` is used, all containers will be requested with GUARANTEED execution type.
* When the `NodeAttributeConstraintExpr` is specified, `SourceTag(NumContainers)` is optional and the value of `-num-containers` will be considered for the number of containers to request.
An example of PlacementSpec is the following:
```
zk=3,NOTIN,NODE,zk:hbase=5,IN,RACK,zk:spark=7,CARDINALITY,NODE,hbase,1,3
zk(3),NOTIN,NODE,zk:hbase(5),IN,RACK,zk:spark(7),CARDINALITY,NODE,hbase,1,3
```
The above encodes three constraints:
@ -94,7 +97,7 @@ The above encodes three constraints:
Another example below demonstrates a composite form of constraint:
```
zk=5,AND(IN,RACK,hbase:NOTIN,NODE,zk)
zk(5),AND(IN,RACK,hbase:NOTIN,NODE,zk)
```
The above constraint uses the conjunction operator `AND` to combine two constraints. The AND constraint is satisfied when both its children constraints are satisfied. The specific PlacementSpec requests to place 5 "zk" containers in a rack where at least one "hbase" container is running, and on a node that no "zk" container is running.
Similarly, an `OR` operator can be used to define a constraint that is satisfied when at least one of its children constraints is satisfied.
@ -130,7 +133,7 @@ To attach an allocation tag namespace `ns` to a target tag `targetTag`, we use t
The example constraints used above could be extended with namespaces as follows:
```
zk=3,NOTIN,NODE,not-self/zk:hbase=5,IN,RACK,all/zk:spark=7,CARDINALITY,NODE,app-id/appID_0023/hbase,1,3
zk(3),NOTIN,NODE,not-self/zk:hbase(5),IN,RACK,all/zk:spark(7),CARDINALITY,NODE,app-id/appID_0023/hbase,1,3
```
The semantics of these constraints are the following:
@ -175,4 +178,4 @@ Applications have to specify the containers for which each constraint will be en
When using the `placement-processor` handler (see [Enabling placement constraints](#Enabling_placement_constraints)), this constraint mapping is specified within the `RegisterApplicationMasterRequest`.
When using the `scheduler` handler, the constraints can also be added at each `SchedulingRequest` object. Each such constraint is valid for the tag of that scheduling request. In case constraints are specified both at the `RegisterApplicationMasterRequest` and the scheduling requests, the latter override the former.
When using the `scheduler` handler, the constraints can also be added at each `SchedulingRequest` object. Each such constraint is valid for the tag of that scheduling request. In case constraints are specified both at the `RegisterApplicationMasterRequest` and the scheduling requests, the latter override the former.