YARN-8350. NPE in service AM related to placement policy. Contributed by Gour Saha
This commit is contained in:
parent
96eefcc84a
commit
778a4a24be
@ -694,6 +694,7 @@ public void requestContainers(long count) {
|
|||||||
// composite constraints then this AND-ed composite constraint is not
|
// composite constraints then this AND-ed composite constraint is not
|
||||||
// used.
|
// used.
|
||||||
PlacementConstraint finalConstraint = null;
|
PlacementConstraint finalConstraint = null;
|
||||||
|
if (placementPolicy != null) {
|
||||||
for (org.apache.hadoop.yarn.service.api.records.PlacementConstraint
|
for (org.apache.hadoop.yarn.service.api.records.PlacementConstraint
|
||||||
yarnServiceConstraint : placementPolicy.getConstraints()) {
|
yarnServiceConstraint : placementPolicy.getConstraints()) {
|
||||||
List<TargetExpression> targetExpressions = new ArrayList<>();
|
List<TargetExpression> targetExpressions = new ArrayList<>();
|
||||||
@ -705,8 +706,9 @@ public void requestContainers(long count) {
|
|||||||
// Add all node attributes
|
// Add all node attributes
|
||||||
for (Map.Entry<String, List<String>> attribute : yarnServiceConstraint
|
for (Map.Entry<String, List<String>> attribute : yarnServiceConstraint
|
||||||
.getNodeAttributes().entrySet()) {
|
.getNodeAttributes().entrySet()) {
|
||||||
targetExpressions.add(PlacementTargets.nodeAttribute(
|
targetExpressions
|
||||||
attribute.getKey(), attribute.getValue().toArray(new String[0])));
|
.add(PlacementTargets.nodeAttribute(attribute.getKey(),
|
||||||
|
attribute.getValue().toArray(new String[0])));
|
||||||
}
|
}
|
||||||
// Add all node partitions
|
// Add all node partitions
|
||||||
if (!yarnServiceConstraint.getNodePartitions().isEmpty()) {
|
if (!yarnServiceConstraint.getNodePartitions().isEmpty()) {
|
||||||
@ -749,7 +751,9 @@ public void requestContainers(long count) {
|
|||||||
finalConstraint = constraint;
|
finalConstraint = constraint;
|
||||||
}
|
}
|
||||||
LOG.debug("[COMPONENT {}] Placement constraint: {}",
|
LOG.debug("[COMPONENT {}] Placement constraint: {}",
|
||||||
componentSpec.getName(), constraint.getConstraintExpr().toString());
|
componentSpec.getName(),
|
||||||
|
constraint.getConstraintExpr().toString());
|
||||||
|
}
|
||||||
}
|
}
|
||||||
ResourceSizing resourceSizing = ResourceSizing.newInstance((int) count,
|
ResourceSizing resourceSizing = ResourceSizing.newInstance((int) count,
|
||||||
resource);
|
resource);
|
||||||
|
@ -91,6 +91,14 @@ public interface RestApiErrorMessages {
|
|||||||
|
|
||||||
String ERROR_QUICKLINKS_FOR_COMP_INVALID = "Quicklinks specified at"
|
String ERROR_QUICKLINKS_FOR_COMP_INVALID = "Quicklinks specified at"
|
||||||
+ " component level, needs corresponding values set at service level";
|
+ " component level, needs corresponding values set at service level";
|
||||||
|
// Note: %sin is not a typo. Constraint name is optional so the error messages
|
||||||
|
// below handle that scenario by adding a space if name is specified.
|
||||||
|
String ERROR_PLACEMENT_POLICY_CONSTRAINT_TYPE_NULL = "Type not specified "
|
||||||
|
+ "for constraint %sin placement policy of component %s.";
|
||||||
|
String ERROR_PLACEMENT_POLICY_CONSTRAINT_SCOPE_NULL = "Scope not specified "
|
||||||
|
+ "for constraint %sin placement policy of component %s.";
|
||||||
|
String ERROR_PLACEMENT_POLICY_CONSTRAINT_TAGS_NULL = "Tag(s) not specified "
|
||||||
|
+ "for constraint %sin placement policy of component %s.";
|
||||||
String ERROR_PLACEMENT_POLICY_TAG_NAME_NOT_SAME = "Invalid target tag %s "
|
String ERROR_PLACEMENT_POLICY_TAG_NAME_NOT_SAME = "Invalid target tag %s "
|
||||||
+ "specified in placement policy of component %s. For now, target tags "
|
+ "specified in placement policy of component %s. For now, target tags "
|
||||||
+ "support self reference only. Specifying anything other than its "
|
+ "support self reference only. Specifying anything other than its "
|
||||||
|
@ -40,6 +40,7 @@
|
|||||||
import org.apache.hadoop.yarn.service.api.records.Configuration;
|
import org.apache.hadoop.yarn.service.api.records.Configuration;
|
||||||
import org.apache.hadoop.yarn.service.api.records.KerberosPrincipal;
|
import org.apache.hadoop.yarn.service.api.records.KerberosPrincipal;
|
||||||
import org.apache.hadoop.yarn.service.api.records.PlacementConstraint;
|
import org.apache.hadoop.yarn.service.api.records.PlacementConstraint;
|
||||||
|
import org.apache.hadoop.yarn.service.api.records.PlacementPolicy;
|
||||||
import org.apache.hadoop.yarn.service.api.records.Resource;
|
import org.apache.hadoop.yarn.service.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.service.exceptions.SliderException;
|
import org.apache.hadoop.yarn.service.exceptions.SliderException;
|
||||||
import org.apache.hadoop.yarn.service.conf.RestApiConstants;
|
import org.apache.hadoop.yarn.service.conf.RestApiConstants;
|
||||||
@ -314,9 +315,28 @@ public static void validateNameFormat(String name,
|
|||||||
private static void validatePlacementPolicy(List<Component> components,
|
private static void validatePlacementPolicy(List<Component> components,
|
||||||
Set<String> componentNames) {
|
Set<String> componentNames) {
|
||||||
for (Component comp : components) {
|
for (Component comp : components) {
|
||||||
if (comp.getPlacementPolicy() != null) {
|
PlacementPolicy placementPolicy = comp.getPlacementPolicy();
|
||||||
for (PlacementConstraint constraint : comp.getPlacementPolicy()
|
if (placementPolicy != null) {
|
||||||
|
for (PlacementConstraint constraint : placementPolicy
|
||||||
.getConstraints()) {
|
.getConstraints()) {
|
||||||
|
if (constraint.getType() == null) {
|
||||||
|
throw new IllegalArgumentException(String.format(
|
||||||
|
RestApiErrorMessages.ERROR_PLACEMENT_POLICY_CONSTRAINT_TYPE_NULL,
|
||||||
|
constraint.getName() == null ? "" : constraint.getName() + " ",
|
||||||
|
comp.getName()));
|
||||||
|
}
|
||||||
|
if (constraint.getScope() == null) {
|
||||||
|
throw new IllegalArgumentException(String.format(
|
||||||
|
RestApiErrorMessages.ERROR_PLACEMENT_POLICY_CONSTRAINT_SCOPE_NULL,
|
||||||
|
constraint.getName() == null ? "" : constraint.getName() + " ",
|
||||||
|
comp.getName()));
|
||||||
|
}
|
||||||
|
if (constraint.getTargetTags().isEmpty()) {
|
||||||
|
throw new IllegalArgumentException(String.format(
|
||||||
|
RestApiErrorMessages.ERROR_PLACEMENT_POLICY_CONSTRAINT_TAGS_NULL,
|
||||||
|
constraint.getName() == null ? "" : constraint.getName() + " ",
|
||||||
|
comp.getName()));
|
||||||
|
}
|
||||||
for (String targetTag : constraint.getTargetTags()) {
|
for (String targetTag : constraint.getTargetTags()) {
|
||||||
if (!comp.getName().equals(targetTag)) {
|
if (!comp.getName().equals(targetTag)) {
|
||||||
throw new IllegalArgumentException(String.format(
|
throw new IllegalArgumentException(String.format(
|
||||||
|
@ -25,6 +25,8 @@
|
|||||||
import org.apache.hadoop.yarn.service.api.records.KerberosPrincipal;
|
import org.apache.hadoop.yarn.service.api.records.KerberosPrincipal;
|
||||||
import org.apache.hadoop.yarn.service.api.records.PlacementConstraint;
|
import org.apache.hadoop.yarn.service.api.records.PlacementConstraint;
|
||||||
import org.apache.hadoop.yarn.service.api.records.PlacementPolicy;
|
import org.apache.hadoop.yarn.service.api.records.PlacementPolicy;
|
||||||
|
import org.apache.hadoop.yarn.service.api.records.PlacementScope;
|
||||||
|
import org.apache.hadoop.yarn.service.api.records.PlacementType;
|
||||||
import org.apache.hadoop.yarn.service.api.records.Resource;
|
import org.apache.hadoop.yarn.service.api.records.Resource;
|
||||||
import org.apache.hadoop.yarn.service.api.records.Service;
|
import org.apache.hadoop.yarn.service.api.records.Service;
|
||||||
import org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages;
|
import org.apache.hadoop.yarn.service.exceptions.RestApiErrorMessages;
|
||||||
@ -503,13 +505,48 @@ public void testPlacementPolicy() throws IOException {
|
|||||||
PlacementPolicy pp = new PlacementPolicy();
|
PlacementPolicy pp = new PlacementPolicy();
|
||||||
PlacementConstraint pc = new PlacementConstraint();
|
PlacementConstraint pc = new PlacementConstraint();
|
||||||
pc.setName("CA1");
|
pc.setName("CA1");
|
||||||
pc.setTargetTags(Collections.singletonList("comp-invalid"));
|
|
||||||
pp.setConstraints(Collections.singletonList(pc));
|
pp.setConstraints(Collections.singletonList(pc));
|
||||||
comp.setPlacementPolicy(pp);
|
comp.setPlacementPolicy(pp);
|
||||||
|
|
||||||
try {
|
try {
|
||||||
ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
|
ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
|
||||||
Assert.fail(EXCEPTION_PREFIX + "service with empty placement");
|
Assert.fail(EXCEPTION_PREFIX + "constraint with no type");
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
assertEquals(String.format(
|
||||||
|
RestApiErrorMessages.ERROR_PLACEMENT_POLICY_CONSTRAINT_TYPE_NULL,
|
||||||
|
"CA1 ", "comp-a"), e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set the type
|
||||||
|
pc.setType(PlacementType.ANTI_AFFINITY);
|
||||||
|
|
||||||
|
try {
|
||||||
|
ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
|
||||||
|
Assert.fail(EXCEPTION_PREFIX + "constraint with no scope");
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
assertEquals(String.format(
|
||||||
|
RestApiErrorMessages.ERROR_PLACEMENT_POLICY_CONSTRAINT_SCOPE_NULL,
|
||||||
|
"CA1 ", "comp-a"), e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set the scope
|
||||||
|
pc.setScope(PlacementScope.NODE);
|
||||||
|
|
||||||
|
try {
|
||||||
|
ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
|
||||||
|
Assert.fail(EXCEPTION_PREFIX + "constraint with no tag(s)");
|
||||||
|
} catch (IllegalArgumentException e) {
|
||||||
|
assertEquals(String.format(
|
||||||
|
RestApiErrorMessages.ERROR_PLACEMENT_POLICY_CONSTRAINT_TAGS_NULL,
|
||||||
|
"CA1 ", "comp-a"), e.getMessage());
|
||||||
|
}
|
||||||
|
|
||||||
|
// Set a target tag - but an invalid one
|
||||||
|
pc.setTargetTags(Collections.singletonList("comp-invalid"));
|
||||||
|
|
||||||
|
try {
|
||||||
|
ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
|
||||||
|
Assert.fail(EXCEPTION_PREFIX + "constraint with invalid tag name");
|
||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
assertEquals(
|
assertEquals(
|
||||||
String.format(
|
String.format(
|
||||||
@ -518,9 +555,10 @@ public void testPlacementPolicy() throws IOException {
|
|||||||
e.getMessage());
|
e.getMessage());
|
||||||
}
|
}
|
||||||
|
|
||||||
|
// Set valid target tags now
|
||||||
pc.setTargetTags(Collections.singletonList("comp-a"));
|
pc.setTargetTags(Collections.singletonList("comp-a"));
|
||||||
|
|
||||||
// now it should succeed
|
// Finally it should succeed
|
||||||
try {
|
try {
|
||||||
ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
|
ServiceApiUtil.validateAndResolveService(app, sfs, CONF_DNS_ENABLED);
|
||||||
} catch (IllegalArgumentException e) {
|
} catch (IllegalArgumentException e) {
|
||||||
|
Loading…
Reference in New Issue
Block a user