YARN-10219. Fix YARN Native Service Placement Constraints with Node Attributes.
Contributed by Eric Yang.
This commit is contained in:
parent
3edbe8708a
commit
c791b0e90e
@ -26,6 +26,7 @@
|
||||
import static org.apache.hadoop.yarn.service.api.records.Component
|
||||
.RestartPolicyEnum;
|
||||
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
|
||||
import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceSizing;
|
||||
@ -811,16 +812,12 @@ public void requestContainers(long count) {
|
||||
PlacementConstraint constraint = null;
|
||||
switch (yarnServiceConstraint.getType()) {
|
||||
case AFFINITY:
|
||||
constraint = PlacementConstraints
|
||||
.targetIn(yarnServiceConstraint.getScope().getValue(),
|
||||
targetExpressions.toArray(new TargetExpression[0]))
|
||||
.build();
|
||||
constraint = getAffinityConstraint(yarnServiceConstraint,
|
||||
targetExpressions);
|
||||
break;
|
||||
case ANTI_AFFINITY:
|
||||
constraint = PlacementConstraints
|
||||
.targetNotIn(yarnServiceConstraint.getScope().getValue(),
|
||||
targetExpressions.toArray(new TargetExpression[0]))
|
||||
.build();
|
||||
constraint = getAntiAffinityConstraint(yarnServiceConstraint,
|
||||
targetExpressions);
|
||||
break;
|
||||
case AFFINITY_WITH_CARDINALITY:
|
||||
constraint = PlacementConstraints.targetCardinality(
|
||||
@ -865,6 +862,46 @@ public void requestContainers(long count) {
|
||||
}
|
||||
}
|
||||
|
||||
private PlacementConstraint getAffinityConstraint(
|
||||
org.apache.hadoop.yarn.service.api.records.PlacementConstraint
|
||||
yarnServiceConstraint, List<TargetExpression> targetExpressions) {
|
||||
PlacementConstraint constraint = null;
|
||||
if (!yarnServiceConstraint.getTargetTags().isEmpty() ||
|
||||
!yarnServiceConstraint.getNodePartitions().isEmpty()) {
|
||||
constraint = PlacementConstraints
|
||||
.targetIn(yarnServiceConstraint.getScope().getValue(),
|
||||
targetExpressions.toArray(new TargetExpression[0]))
|
||||
.build();
|
||||
}
|
||||
if (!yarnServiceConstraint.getNodeAttributes().isEmpty()) {
|
||||
constraint = PlacementConstraints
|
||||
.targetNodeAttribute(yarnServiceConstraint.getScope().getValue(),
|
||||
NodeAttributeOpCode.EQ, targetExpressions.toArray(
|
||||
new TargetExpression[0])).build();
|
||||
}
|
||||
return constraint;
|
||||
}
|
||||
|
||||
private PlacementConstraint getAntiAffinityConstraint(
|
||||
org.apache.hadoop.yarn.service.api.records.PlacementConstraint
|
||||
yarnServiceConstraint, List<TargetExpression> targetExpressions) {
|
||||
PlacementConstraint constraint = null;
|
||||
if (!yarnServiceConstraint.getTargetTags().isEmpty() ||
|
||||
!yarnServiceConstraint.getNodePartitions().isEmpty()) {
|
||||
constraint = PlacementConstraints
|
||||
.targetNotIn(yarnServiceConstraint.getScope().getValue(),
|
||||
targetExpressions.toArray(new TargetExpression[0]))
|
||||
.build();
|
||||
}
|
||||
if (!yarnServiceConstraint.getNodeAttributes().isEmpty()) {
|
||||
constraint = PlacementConstraints
|
||||
.targetNodeAttribute(yarnServiceConstraint.getScope().getValue(),
|
||||
NodeAttributeOpCode.NE, targetExpressions.toArray(
|
||||
new TargetExpression[0])).build();
|
||||
}
|
||||
return constraint;
|
||||
}
|
||||
|
||||
private void setDesiredContainers(int n) {
|
||||
int delta = n - scheduler.getServiceMetrics().containersDesired.value();
|
||||
if (delta != 0) {
|
||||
|
@ -354,19 +354,6 @@ private static void validatePlacementPolicy(List<Component> components,
|
||||
constraint.getName() == null ? "" : constraint.getName() + " ",
|
||||
comp.getName()));
|
||||
}
|
||||
if (constraint.getTargetTags().isEmpty()) {
|
||||
throw new IllegalArgumentException(String.format(
|
||||
RestApiErrorMessages.ERROR_PLACEMENT_POLICY_CONSTRAINT_TAGS_NULL,
|
||||
constraint.getName() == null ? "" : constraint.getName() + " ",
|
||||
comp.getName()));
|
||||
}
|
||||
for (String targetTag : constraint.getTargetTags()) {
|
||||
if (!comp.getName().equals(targetTag)) {
|
||||
throw new IllegalArgumentException(String.format(
|
||||
RestApiErrorMessages.ERROR_PLACEMENT_POLICY_TAG_NAME_NOT_SAME,
|
||||
targetTag, comp.getName(), comp.getName(), comp.getName()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
@ -734,6 +734,7 @@ public void testComponentHealthThresholdMonitor() throws Exception {
|
||||
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
|
||||
conf.setInt(YarnConfiguration.RM_MAX_COMPLETED_APPLICATIONS,
|
||||
YarnConfiguration.DEFAULT_RM_MAX_COMPLETED_APPLICATIONS);
|
||||
conf.setInt(YarnConfiguration.NM_VCORES, 1);
|
||||
setConf(conf);
|
||||
setupInternal(3);
|
||||
ServiceClient client = createClient(getConf());
|
||||
|
@ -554,33 +554,11 @@ public void testPlacementPolicy() throws IOException {
|
||||
// Set the scope
|
||||
pc.setScope(PlacementScope.NODE);
|
||||
|
||||
try {
|
||||
ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
|
||||
Assert.fail(EXCEPTION_PREFIX + "constraint with no tag(s)");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertEquals(String.format(
|
||||
RestApiErrorMessages.ERROR_PLACEMENT_POLICY_CONSTRAINT_TAGS_NULL,
|
||||
"CA1 ", "comp-a"), e.getMessage());
|
||||
}
|
||||
|
||||
// Set a target tag - but an invalid one
|
||||
pc.setTargetTags(Collections.singletonList("comp-invalid"));
|
||||
|
||||
try {
|
||||
ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
|
||||
Assert.fail(EXCEPTION_PREFIX + "constraint with invalid tag name");
|
||||
} catch (IllegalArgumentException e) {
|
||||
assertEquals(
|
||||
String.format(
|
||||
RestApiErrorMessages.ERROR_PLACEMENT_POLICY_TAG_NAME_NOT_SAME,
|
||||
"comp-invalid", "comp-a", "comp-a", "comp-a"),
|
||||
e.getMessage());
|
||||
}
|
||||
|
||||
// Set valid target tags now
|
||||
// Target tag is optional.
|
||||
pc.setTargetTags(Collections.singletonList("comp-a"));
|
||||
|
||||
// Finally it should succeed
|
||||
// Validation can succeed for any arbitrary target, only scheduler knows
|
||||
// if the target tag is valid.
|
||||
try {
|
||||
ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
|
||||
} catch (IllegalArgumentException e) {
|
||||
|
@ -778,9 +778,6 @@ POST URL - http://localhost:8088/app/v1/services
|
||||
"node_partitions": [
|
||||
"gpu",
|
||||
"fast-disk"
|
||||
],
|
||||
"target_tags": [
|
||||
"hello"
|
||||
]
|
||||
}
|
||||
]
|
||||
@ -797,11 +794,12 @@ GET URL - http://localhost:8088/app/v1/services/hello-world
|
||||
|
||||
Note, for an anti-affinity component no more than 1 container will be allocated
|
||||
in a specific node. In this example, 3 containers have been requested by
|
||||
component "hello". All 3 containers were allocated because the cluster had 3 or
|
||||
more NMs. If the cluster had less than 3 NMs then less than 3 containers would
|
||||
be allocated. In cases when the number of allocated containers is less than the
number of requested containers, the component and the service will be in
non-STABLE state.
|
||||
component "hello". All 3 containers were allocated on separate centos7 nodes
because the node attribute constraint expects containers to run on centos7 nodes.
If the cluster had fewer than 3 NMs then fewer than
|
||||
3 containers would be allocated. In cases when the number of allocated containers
is less than the number of requested containers, the component and the service
will be in non-STABLE state.
|
||||
|
||||
```json
|
||||
{
|
||||
@ -822,16 +820,19 @@ non-STABLE state.
|
||||
"placement_policy": {
|
||||
"constraints": [
|
||||
{
|
||||
"type": "ANTI_AFFINITY",
|
||||
"type": "AFFINITY",
|
||||
"scope": "NODE",
|
||||
"node_attributes": {
|
||||
"os": ["centos6", "centos7"],
|
||||
"fault_domain": ["fd1", "fd2"]
|
||||
"os": ["centos7"]
|
||||
},
|
||||
"node_partitions": [
|
||||
"gpu",
|
||||
"fast-disk"
|
||||
],
|
||||
]
|
||||
},
|
||||
{
|
||||
"type": "ANTI_AFFINITY",
|
||||
"scope": "NODE",
|
||||
"target_tags": [
|
||||
"hello"
|
||||
]
|
||||
|
Loading…
Reference in New Issue
Block a user