From 67ae81f0e0ac7f107261ee15f2eb4d189e3b1983 Mon Sep 17 00:00:00 2001 From: Naganarasimha Date: Mon, 27 Aug 2018 10:27:33 +0800 Subject: [PATCH] YARN-7863. Modify placement constraints to support node attributes. Contributed by Sunil Govindan. --- .../yarn/api/records/NodeAttributeOpCode.java | 43 ++++++ .../api/resource/PlacementConstraint.java | 40 +++++- .../api/resource/PlacementConstraints.java | 19 +++ .../constraint/PlacementConstraintParser.java | 112 ++++++++++++++-- .../src/main/proto/yarn_protos.proto | 7 + .../TestPlacementConstraintParser.java | 61 ++++++++- .../distributedshell/ApplicationMaster.java | 37 +++-- .../applications/distributedshell/Client.java | 5 +- .../distributedshell/PlacementSpec.java | 19 ++- ...PlacementConstraintFromProtoConverter.java | 10 +- .../PlacementConstraintToProtoConverter.java | 11 ++ .../resourcemanager/ResourceManager.java | 7 +- .../nodelabels/NodeAttributesManagerImpl.java | 24 +++- .../scheduler/SchedulerNode.java | 11 ++ .../scheduler/capacity/CapacityScheduler.java | 36 ++++- .../constraint/PlacementConstraintsUtil.java | 126 +++++++++++++++--- .../NodeAttributesUpdateSchedulerEvent.java | 41 ++++++ .../scheduler/event/SchedulerEventType.java | 1 + .../LocalityAppPlacementAllocator.java | 4 + 19 files changed, 568 insertions(+), 46 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeAttributeOpCode.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAttributesUpdateSchedulerEvent.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeAttributeOpCode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeAttributeOpCode.java new file mode 100644 index 0000000000..76db063eed --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/NodeAttributeOpCode.java @@ -0,0 +1,43 @@ +/** +* Licensed to the Apache Software Foundation (ASF) under one +* or more contributor license agreements. See the NOTICE file +* distributed with this work for additional information +* regarding copyright ownership. The ASF licenses this file +* to you under the Apache License, Version 2.0 (the +* "License"); you may not use this file except in compliance +* with the License. You may obtain a copy of the License at +* +* http://www.apache.org/licenses/LICENSE-2.0 +* +* Unless required by applicable law or agreed to in writing, software +* distributed under the License is distributed on an "AS IS" BASIS, +* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +* See the License for the specific language governing permissions and +* limitations under the License. +*/ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.classification.InterfaceAudience.Public; +import org.apache.hadoop.classification.InterfaceStability.Evolving; + +/** + * Enumeration of various node attribute op codes. + */ +@Public +@Evolving +public enum NodeAttributeOpCode { + /** + * Default as No OP. + */ + NO_OP, + /** + * EQUALS op code for Attribute. + */ + EQ, + + /** + * NOT EQUALS op code for Attribute. + */ + NE +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraint.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraint.java index 0fe8273e6d..79196fbf85 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraint.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraint.java @@ -29,6 +29,7 @@ import java.util.Iterator; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode; /** * {@code PlacementConstraint} represents a placement constraint for a resource @@ -155,13 +156,22 @@ public class PlacementConstraint { private int minCardinality; private int maxCardinality; private Set targetExpressions; + private NodeAttributeOpCode attributeOpCode; public SingleConstraint(String scope, int minCardinality, - int maxCardinality, Set targetExpressions) { + int maxCardinality, NodeAttributeOpCode opCode, + Set targetExpressions) { this.scope = scope; this.minCardinality = minCardinality; this.maxCardinality = maxCardinality; this.targetExpressions = targetExpressions; + this.attributeOpCode = opCode; + } + + public SingleConstraint(String scope, int minCardinality, + int maxCardinality, Set targetExpressions) { + this(scope, minCardinality, maxCardinality, NodeAttributeOpCode.NO_OP, + targetExpressions); } public SingleConstraint(String scope, int minC, int maxC, @@ -169,6 +179,13 @@ public class PlacementConstraint { this(scope, minC, maxC, new HashSet<>(Arrays.asList(targetExpressions))); } + public SingleConstraint(String scope, int minC, int maxC, + NodeAttributeOpCode opCode, + TargetExpression... targetExpressions) { + this(scope, minC, maxC, opCode, + new HashSet<>(Arrays.asList(targetExpressions))); + } + /** * Get the scope of the constraint. * @@ -205,6 +222,15 @@ public class PlacementConstraint { return targetExpressions; } + /** + * Get the NodeAttributeOpCode of the constraint. + * + * @return nodeAttribute Op Code + */ + public NodeAttributeOpCode getNodeAttributeOpCode() { + return attributeOpCode; + } + @Override public boolean equals(Object o) { if (this == o) { @@ -225,6 +251,10 @@ public class PlacementConstraint { if (!getScope().equals(that.getScope())) { return false; } + if (getNodeAttributeOpCode() != null && !getNodeAttributeOpCode() + .equals(that.getNodeAttributeOpCode())) { + return false; + } return getTargetExpressions().equals(that.getTargetExpressions()); } @@ -233,6 +263,7 @@ public class PlacementConstraint { int result = getScope().hashCode(); result = 31 * result + getMinCardinality(); result = 31 * result + getMaxCardinality(); + result = 31 * result + getNodeAttributeOpCode().hashCode(); result = 31 * result + getTargetExpressions().hashCode(); return result; } @@ -259,6 +290,13 @@ public class PlacementConstraint { .append(getScope()).append(",") .append(targetExpr) .toString()); + } else if (min == -1 && max == -1) { + // node attribute + targetConstraints.add(new StringBuilder() + .append(getScope()).append(",") + .append(getNodeAttributeOpCode()).append(",") + .append(targetExpr) + .toString()); } else { // cardinality targetConstraints.add(new StringBuilder() diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java index d22a6bd90c..73fa328833 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java @@ -23,6 +23,7 @@ import java.util.concurrent.TimeUnit; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType; +import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.DelayedOr; @@ -85,6 +86,24 @@ public final class PlacementConstraints { return new SingleConstraint(scope, 0, 0, targetExpressions); } + /** + * Creates a constraint that requires allocations to be placed on nodes that + * belong to a scope (e.g., node or rack) that satisfy any of the + * target expressions based on node attribute op code. + * + * @param scope the scope within which the target expressions should not be + * true + * @param opCode Node Attribute code which could be equals, not equals. + * @param targetExpressions the expressions that need to not be true within + * the scope + * @return the resulting placement constraint + */ + public static AbstractConstraint targetNodeAttribute(String scope, + NodeAttributeOpCode opCode, + TargetExpression... targetExpressions) { + return new SingleConstraint(scope, -1, -1, opCode, targetExpressions); + } + /** * Creates a constraint that restricts the number of allocations within a * given scope (e.g., node or rack). diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java index 2926c9d1de..93fd706b0c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/util/constraint/PlacementConstraintParser.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.util.constraint; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraints; @@ -44,11 +45,12 @@ import java.util.regex.Pattern; @InterfaceStability.Unstable public final class PlacementConstraintParser { + public static final char EXPRESSION_VAL_DELIM = ','; private static final char EXPRESSION_DELIM = ':'; private static final char KV_SPLIT_DELIM = '='; - private static final char EXPRESSION_VAL_DELIM = ','; private static final char BRACKET_START = '('; private static final char BRACKET_END = ')'; + private static final String KV_NE_DELIM = "!="; private static final String IN = "in"; private static final String NOT_IN = "notin"; private static final String AND = "and"; @@ -349,6 +351,91 @@ public final class PlacementConstraintParser { } } + /** + * Constraint parser used to parse a given target expression. + */ + public static class NodeConstraintParser extends ConstraintParser { + + public NodeConstraintParser(String expression) { + super(new BaseStringTokenizer(expression, + String.valueOf(EXPRESSION_VAL_DELIM))); + } + + @Override + public AbstractConstraint parse() + throws PlacementConstraintParseException { + PlacementConstraint.AbstractConstraint placementConstraints = null; + String attributeName = ""; + NodeAttributeOpCode opCode = NodeAttributeOpCode.EQ; + String scope = SCOPE_NODE; + + Set constraintEntities = new TreeSet<>(); + while (hasMoreTokens()) { + String currentTag = nextToken(); + StringTokenizer attributeKV = getAttributeOpCodeTokenizer(currentTag); + + // Usually there will be only one k=v pair. However in case when + // multiple values are present for same attribute, it will also be + // coming as next token. for example, java=1.8,1.9 or python!=2. + if (attributeKV.countTokens() > 1) { + opCode = getAttributeOpCode(currentTag); + attributeName = attributeKV.nextToken(); + currentTag = attributeKV.nextToken(); + } + constraintEntities.add(currentTag); + } + + if(attributeName.isEmpty()) { + throw new PlacementConstraintParseException( + "expecting valid expression like k=v or k!=v, but get " + + constraintEntities); + } + + PlacementConstraint.TargetExpression target = null; + if (!constraintEntities.isEmpty()) { + target = PlacementConstraints.PlacementTargets + .nodeAttribute(attributeName, + constraintEntities + .toArray(new String[constraintEntities.size()])); + } + + placementConstraints = PlacementConstraints + .targetNodeAttribute(scope, opCode, target); + return placementConstraints; + } + + private StringTokenizer getAttributeOpCodeTokenizer(String currentTag) { + StringTokenizer attributeKV = new StringTokenizer(currentTag, + KV_NE_DELIM); + + // Try with '!=' delim as well. + if (attributeKV.countTokens() < 2) { + attributeKV = new StringTokenizer(currentTag, + String.valueOf(KV_SPLIT_DELIM)); + } + return attributeKV; + } + + /** + * Below conditions are validated. + * java=8 : OpCode = EQUALS + * java!=8 : OpCode = NEQUALS + * @param currentTag tag + * @return Attribute op code. + */ + private NodeAttributeOpCode getAttributeOpCode(String currentTag) + throws PlacementConstraintParseException { + if (currentTag.contains(KV_NE_DELIM)) { + return NodeAttributeOpCode.NE; + } else if (currentTag.contains(String.valueOf(KV_SPLIT_DELIM))) { + return NodeAttributeOpCode.EQ; + } + throw new PlacementConstraintParseException( + "expecting valid expression like k=v or k!=v, but get " + + currentTag); + } + } + /** * Constraint parser used to parse a given target expression, such as * "NOTIN, NODE, foo, bar". @@ -363,20 +450,23 @@ public final class PlacementConstraintParser { @Override public AbstractConstraint parse() throws PlacementConstraintParseException { - PlacementConstraint.AbstractConstraint placementConstraints; + PlacementConstraint.AbstractConstraint placementConstraints = null; String op = nextToken(); if (op.equalsIgnoreCase(IN) || op.equalsIgnoreCase(NOT_IN)) { String scope = nextToken(); scope = parseScope(scope); - Set allocationTags = new TreeSet<>(); + Set constraintEntities = new TreeSet<>(); while(hasMoreTokens()) { String tag = nextToken(); - allocationTags.add(tag); + constraintEntities.add(tag); + } + PlacementConstraint.TargetExpression target = null; + if(!constraintEntities.isEmpty()) { + target = PlacementConstraints.PlacementTargets.allocationTag( + constraintEntities + .toArray(new String[constraintEntities.size()])); } - PlacementConstraint.TargetExpression target = - PlacementConstraints.PlacementTargets.allocationTag( - allocationTags.toArray(new String[allocationTags.size()])); if (op.equalsIgnoreCase(IN)) { placementConstraints = PlacementConstraints .targetIn(scope, target); @@ -550,6 +640,11 @@ public final class PlacementConstraintParser { new ConjunctionConstraintParser(constraintStr); constraintOptional = Optional.ofNullable(jp.tryParse()); } + if (!constraintOptional.isPresent()) { + NodeConstraintParser np = + new NodeConstraintParser(constraintStr); + constraintOptional = Optional.ofNullable(np.tryParse()); + } if (!constraintOptional.isPresent()) { throw new PlacementConstraintParseException( "Invalid constraint expression " + constraintStr); @@ -584,12 +679,13 @@ public final class PlacementConstraintParser { */ public static Map parsePlacementSpec( String expression) throws PlacementConstraintParseException { + // Continue handling for application tag based constraint otherwise. // Respect insertion order. Map result = new LinkedHashMap<>(); PlacementConstraintParser.ConstraintTokenizer tokenizer = new PlacementConstraintParser.MultipleConstraintsTokenizer(expression); tokenizer.validate(); - while(tokenizer.hasMoreElements()) { + while (tokenizer.hasMoreElements()) { String specStr = tokenizer.nextElement(); // each spec starts with sourceAllocationTag=numOfContainers and // followed by a constraint expression. diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto index 10b36c75f5..5fe2cc9455 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/proto/yarn_protos.proto @@ -646,11 +646,18 @@ message PlacementConstraintProto { optional CompositePlacementConstraintProto compositeConstraint = 2; } +enum NodeAttributeOpCodeProto { + NO_OP = 1; + EQ = 2; + NE = 3; +} + message SimplePlacementConstraintProto { required string scope = 1; repeated PlacementConstraintTargetProto targetExpressions = 2; optional int32 minCardinality = 3; optional int32 maxCardinality = 4; + optional NodeAttributeOpCodeProto attributeOpCode = 5; } message PlacementConstraintTargetProto { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java index a69571c5c8..9806ba4ac9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintParser.java @@ -22,6 +22,8 @@ import com.google.common.collect.Sets; import java.util.Iterator; import java.util.Map; import java.util.Set; + +import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.Or; @@ -38,8 +40,14 @@ import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.Multiple import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.SourceTagsTokenizer; import org.apache.hadoop.yarn.util.constraint.PlacementConstraintParser.ConstraintTokenizer; -import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.*; import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.and; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.cardinality; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.or; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNodeAttribute; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn; import org.junit.Assert; import org.junit.Test; @@ -443,4 +451,55 @@ public class TestPlacementConstraintParser { + constrainExpr + ", caused by: " + e.getMessage()); } } + + @Test + public void testParseNodeAttributeSpec() + throws PlacementConstraintParseException { + Map result; + PlacementConstraint.AbstractConstraint expectedPc1, expectedPc2; + PlacementConstraint actualPc1, actualPc2; + + // A single node attribute constraint + result = PlacementConstraintParser + .parsePlacementSpec("xyz=4,rm.yarn.io/foo=true"); + Assert.assertEquals(1, result.size()); + TargetExpression target = PlacementTargets + .nodeAttribute("rm.yarn.io/foo", "true"); + expectedPc1 = targetNodeAttribute("node", NodeAttributeOpCode.EQ, target); + + actualPc1 = result.values().iterator().next(); + Assert.assertEquals(expectedPc1, actualPc1.getConstraintExpr()); + + // A single node attribute constraint + result = PlacementConstraintParser + .parsePlacementSpec("xyz=3,rm.yarn.io/foo!=abc"); + Assert.assertEquals(1, result.size()); + target = PlacementTargets + .nodeAttribute("rm.yarn.io/foo", "abc"); + expectedPc1 = targetNodeAttribute("node", NodeAttributeOpCode.NE, target); + + actualPc1 = result.values().iterator().next(); + Assert.assertEquals(expectedPc1, actualPc1.getConstraintExpr()); + + actualPc1 = result.values().iterator().next(); + Assert.assertEquals(expectedPc1, actualPc1.getConstraintExpr()); + + // A single node attribute constraint + result = PlacementConstraintParser + .parsePlacementSpec( + "xyz=1,rm.yarn.io/foo!=abc:zxy=1,rm.yarn.io/bar=true"); + Assert.assertEquals(2, result.size()); + target = PlacementTargets + .nodeAttribute("rm.yarn.io/foo", "abc"); + expectedPc1 = targetNodeAttribute("node", NodeAttributeOpCode.NE, target); + target = PlacementTargets + .nodeAttribute("rm.yarn.io/bar", "true"); + expectedPc2 = targetNodeAttribute("node", NodeAttributeOpCode.EQ, target); + + Iterator valueIt = result.values().iterator(); + actualPc1 = valueIt.next(); + actualPc2 = valueIt.next(); + Assert.assertEquals(expectedPc1, actualPc1.getConstraintExpr()); + Assert.assertEquals(expectedPc2, actualPc2.getConstraintExpr()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java index ecf07b184d..09a796e4e6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/ApplicationMaster.java @@ -523,9 +523,13 @@ public class ApplicationMaster { if (cliParser.hasOption("placement_spec")) { String placementSpec = cliParser.getOptionValue("placement_spec"); - LOG.info("Placement Spec received [{}]", placementSpec); - parsePlacementSpecs(placementSpec); + String decodedSpec = getDecodedPlacementSpec(placementSpec); + LOG.info("Placement Spec received [{}]", decodedSpec); + + this.numTotalContainers = 0; + parsePlacementSpecs(decodedSpec); LOG.info("Total num containers requested [{}]", numTotalContainers); + if (numTotalContainers == 0) { throw new IllegalArgumentException( "Cannot run distributed shell with no containers"); @@ -694,21 +698,23 @@ public class ApplicationMaster { return true; } - private void parsePlacementSpecs(String placementSpecifications) { - // Client sends placement spec in encoded format + private void parsePlacementSpecs(String decodedSpec) { + Map pSpecs = + PlacementSpec.parse(decodedSpec); + this.placementSpecs = new HashMap<>(); + for (PlacementSpec pSpec : pSpecs.values()) { + this.numTotalContainers += pSpec.getNumContainers(); + this.placementSpecs.put(pSpec.sourceTag, pSpec); + } + } + + private String getDecodedPlacementSpec(String placementSpecifications) { Base64.Decoder decoder = Base64.getDecoder(); byte[] decodedBytes = decoder.decode( placementSpecifications.getBytes(StandardCharsets.UTF_8)); String decodedSpec = new String(decodedBytes, StandardCharsets.UTF_8); LOG.info("Decode placement spec: " + decodedSpec); - Map pSpecs = - PlacementSpec.parse(decodedSpec); - this.placementSpecs = new HashMap<>(); - this.numTotalContainers = 0; - for (PlacementSpec pSpec : pSpecs.values()) { - this.numTotalContainers += pSpec.numContainers; - this.placementSpecs.put(pSpec.sourceTag, pSpec); - } + return decodedSpec; } /** @@ -798,6 +804,7 @@ public class ApplicationMaster { } } } + RegisterApplicationMasterResponse response = amRMClient .registerApplicationMaster(appMasterHostname, appMasterRpcPort, appMasterTrackingUrl, placementConstraintMap); @@ -845,14 +852,18 @@ public class ApplicationMaster { // Keep looping until all the containers are launched and shell script // executed on them ( regardless of success/failure). if (this.placementSpecs == null) { + LOG.info("placementSpecs null"); for (int i = 0; i < numTotalContainersToRequest; ++i) { ContainerRequest containerAsk = setupContainerAskForRM(); amRMClient.addContainerRequest(containerAsk); } } else { + LOG.info("placementSpecs to create req:" + placementSpecs); List schedReqs = new ArrayList<>(); for (PlacementSpec pSpec : this.placementSpecs.values()) { - for (int i = 0; i < pSpec.numContainers; i++) { + LOG.info("placementSpec :" + pSpec + ", container:" + pSpec + .getNumContainers()); + for (int i = 0; i < pSpec.getNumContainers(); i++) { SchedulingRequest sr = setupSchedulingRequest(pSpec); schedReqs.add(sr); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java index 9da92885ce..e8b69fe186 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/Client.java @@ -103,7 +103,7 @@ import org.slf4j.LoggerFactory; * the provided shell command on a set of containers.

* *

This client is meant to act as an example on how to write yarn-based applications.

- * + * *

To submit an application, a client first needs to connect to the ResourceManager * aka ApplicationsManager or ASM via the {@link ApplicationClientProtocol}. The {@link ApplicationClientProtocol} * provides a way for the client to get access to cluster information and to request for a @@ -192,6 +192,8 @@ public class Client { // Placement specification private String placementSpec = ""; + // Node Attribute specification + private String nodeAttributeSpec = ""; // log4j.properties file // if available, add to local resources and set into classpath private String log4jPropFile = ""; @@ -448,6 +450,7 @@ public class Client { // Check if it is parsable PlacementSpec.parse(this.placementSpec); } + appName = cliParser.getOptionValue("appname", "DistributedShell"); amPriority = Integer.parseInt(cliParser.getOptionValue("priority", "0")); amQueue = cliParser.getOptionValue("queue", "default"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java index 290925980a..ceaa37d587 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-applications/hadoop-yarn-applications-distributedshell/src/main/java/org/apache/hadoop/yarn/applications/distributedshell/PlacementSpec.java @@ -37,8 +37,8 @@ public class PlacementSpec { LoggerFactory.getLogger(PlacementSpec.class); public final String sourceTag; - public final int numContainers; public final PlacementConstraint constraint; + private int numContainers; public PlacementSpec(String sourceTag, int numContainers, PlacementConstraint constraint) { @@ -47,6 +47,22 @@ public class PlacementSpec { this.constraint = constraint; } + /** + * Get the number of container for this spec. + * @return container count + */ + public int getNumContainers() { + return numContainers; + } + + /** + * Set number of containers for this spec. + * @param numContainers number of containers. + */ + public void setNumContainers(int numContainers) { + this.numContainers = numContainers; + } + // Placement specification should be of the form: // PlacementSpec => ""|KeyVal;PlacementSpec // KeyVal => SourceTag=Constraint @@ -71,6 +87,7 @@ public class PlacementSpec { public static Map parse(String specs) throws IllegalArgumentException { LOG.info("Parsing Placement Specs: [{}]", specs); + Map pSpecs = new HashMap<>(); Map parsed; try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/pb/PlacementConstraintFromProtoConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/pb/PlacementConstraintFromProtoConverter.java index 926b6fa279..447905e2b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/pb/PlacementConstraintFromProtoConverter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/pb/PlacementConstraintFromProtoConverter.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint; @@ -37,6 +38,7 @@ import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TimedPlacementConstraint; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.proto.YarnProtos.CompositePlacementConstraintProto; +import org.apache.hadoop.yarn.proto.YarnProtos.NodeAttributeOpCodeProto; import org.apache.hadoop.yarn.proto.YarnProtos.PlacementConstraintProto; import org.apache.hadoop.yarn.proto.YarnProtos.PlacementConstraintTargetProto; import org.apache.hadoop.yarn.proto.YarnProtos.SimplePlacementConstraintProto; @@ -73,7 +75,8 @@ public class PlacementConstraintFromProtoConverter { } return new SingleConstraint(proto.getScope(), proto.getMinCardinality(), - proto.getMaxCardinality(), targets); + proto.getMaxCardinality(), + convertFromProtoFormat(proto.getAttributeOpCode()), targets); } private TargetExpression convert(PlacementConstraintTargetProto proto) { @@ -113,4 +116,9 @@ public class PlacementConstraintFromProtoConverter { return new TimedPlacementConstraint(pConstraint, proto.getSchedulingDelay(), ProtoUtils.convertFromProtoFormat(proto.getDelayUnit())); } + + private static NodeAttributeOpCode convertFromProtoFormat( + NodeAttributeOpCodeProto p) { + return NodeAttributeOpCode.valueOf(p.name()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/pb/PlacementConstraintToProtoConverter.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/pb/PlacementConstraintToProtoConverter.java index 7816e181dd..30f774136d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/pb/PlacementConstraintToProtoConverter.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/api/pb/PlacementConstraintToProtoConverter.java @@ -19,6 +19,7 @@ package org.apache.hadoop.yarn.api.pb; import org.apache.hadoop.classification.InterfaceAudience.Private; +import org.apache.hadoop.yarn.api.records.NodeAttributeOpCode; import org.apache.hadoop.yarn.api.records.impl.pb.ProtoUtils; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint; @@ -34,6 +35,7 @@ import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TimedPlacementCon import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.proto.YarnProtos.CompositePlacementConstraintProto; import org.apache.hadoop.yarn.proto.YarnProtos.CompositePlacementConstraintProto.CompositeType; +import org.apache.hadoop.yarn.proto.YarnProtos.NodeAttributeOpCodeProto; import org.apache.hadoop.yarn.proto.YarnProtos.PlacementConstraintProto; import org.apache.hadoop.yarn.proto.YarnProtos.PlacementConstraintTargetProto; import org.apache.hadoop.yarn.proto.YarnProtos.SimplePlacementConstraintProto; @@ -72,6 +74,10 @@ public class PlacementConstraintToProtoConverter } sb.setMinCardinality(constraint.getMinCardinality()); sb.setMaxCardinality(constraint.getMaxCardinality()); + if (constraint.getNodeAttributeOpCode() != null) { + sb.setAttributeOpCode( + convertToProtoFormat(constraint.getNodeAttributeOpCode())); + } if (constraint.getTargetExpressions() != null) { for (TargetExpression target : constraint.getTargetExpressions()) { sb.addTargetExpressions( @@ -171,4 +177,9 @@ public class PlacementConstraintToProtoConverter return tb.build(); } + + private static NodeAttributeOpCodeProto convertToProtoFormat( + NodeAttributeOpCode p) { + return NodeAttributeOpCodeProto.valueOf(p.name()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index 81ef337a24..16f019f3c8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -519,9 +519,10 @@ public class ResourceManager extends CompositeService return new RMNodeLabelsManager(); } - protected NodeAttributesManager createNodeAttributesManager() - throws InstantiationException, IllegalAccessException { - return new NodeAttributesManagerImpl(); + protected NodeAttributesManager createNodeAttributesManager() { + NodeAttributesManagerImpl namImpl = new NodeAttributesManagerImpl(); + namImpl.setRMContext(rmContext); + return namImpl; } protected AllocationTagsManager createAllocationTagsManager() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java index fac2dfd72c..9111d0f9a0 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/nodelabels/NodeAttributesManagerImpl.java @@ -57,6 +57,8 @@ import org.apache.hadoop.yarn.nodelabels.RMNodeAttribute; import org.apache.hadoop.yarn.nodelabels.StringAttributeValue; import org.apache.hadoop.yarn.server.api.protocolrecords.AttributeMappingOperationType; import org.apache.hadoop.yarn.server.api.protocolrecords.NodeToAttributes; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAttributesUpdateSchedulerEvent; import com.google.common.base.Strings; @@ -92,6 +94,7 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager { private final ReadLock readLock; private final WriteLock writeLock; + private RMContext rmContext = null; public NodeAttributesManagerImpl() { super("NodeAttributesManagerImpl"); @@ -131,7 +134,7 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager { } protected void initNodeAttributeStore(Configuration conf) throws Exception { - this.store =getAttributeStoreClass(conf); + this.store = getAttributeStoreClass(conf); this.store.init(conf, this); this.store.recover(); } @@ -206,6 +209,21 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager { .handle(new NodeAttributesStoreEvent(nodeAttributeMapping, op)); } + // Map used to notify RM + Map> newNodeToAttributesMap = + new HashMap>(); + nodeAttributeMapping.forEach((k, v) -> { + Host node = nodeCollections.get(k); + newNodeToAttributesMap.put(k, node.attributes.keySet()); + }); + + // Notify RM + if (rmContext != null && rmContext.getDispatcher() != null) { + LOG.info("Updated NodeAttribute event to RM:" + newNodeToAttributesMap + .values()); + rmContext.getDispatcher().getEventHandler().handle( + new NodeAttributesUpdateSchedulerEvent(newNodeToAttributesMap)); + } } finally { writeLock.unlock(); } @@ -703,4 +721,8 @@ public class NodeAttributesManagerImpl extends NodeAttributesManager { store.close(); } } + + public void setRMContext(RMContext context) { + this.rmContext = context; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java index 59771fdef7..b35aeba83b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerNode.java @@ -34,6 +34,7 @@ import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.Container; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.NodeAttribute; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceUtilization; @@ -79,6 +80,8 @@ public abstract class SchedulerNode { private volatile Set labels = null; + private volatile Set nodeAttributes = null; + // Last updated time private volatile long lastHeartbeatMonotonicTime; @@ -503,6 +506,14 @@ public abstract class SchedulerNode { return getNodeID().hashCode(); } + public Set getNodeAttributes() { + return nodeAttributes; + } + + public void updateNodeAttributes(Set attributes) { + this.nodeAttributes = attributes; + } + private static class ContainerInfo { private final RMContainer container; private boolean launchedOnNode; diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index 81dcf86e03..a1d3f60049 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -52,6 +52,7 @@ import org.apache.hadoop.yarn.api.records.ContainerExitStatus; import org.apache.hadoop.yarn.api.records.ContainerId; import org.apache.hadoop.yarn.api.records.ContainerStatus; import org.apache.hadoop.yarn.api.records.ExecutionType; +import org.apache.hadoop.yarn.api.records.NodeAttribute; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.QueueACL; @@ -137,6 +138,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppRemovedS import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerExpiredSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.ContainerPreemptEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAddedSchedulerEvent; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeAttributesUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeLabelsUpdateSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeResourceUpdateSchedulerEvent; @@ -1767,6 +1769,14 @@ public class CapacityScheduler extends updateNodeLabelsAndQueueResource(labelUpdateEvent); } break; + case NODE_ATTRIBUTES_UPDATE: + { + NodeAttributesUpdateSchedulerEvent attributeUpdateEvent = + (NodeAttributesUpdateSchedulerEvent) event; + + updateNodeAttributes(attributeUpdateEvent); + } + break; case NODE_UPDATE: { NodeUpdateSchedulerEvent nodeUpdatedEvent = (NodeUpdateSchedulerEvent)event; @@ -1900,6 +1910,30 @@ public class CapacityScheduler extends } } + private void updateNodeAttributes( + NodeAttributesUpdateSchedulerEvent attributeUpdateEvent) { + try { + writeLock.lock(); + for (Entry> entry : attributeUpdateEvent + .getUpdatedNodeToAttributes().entrySet()) { + String hostname = entry.getKey(); + Set attributes = entry.getValue(); + List nodeIds = nodeTracker.getNodeIdsByResourceName(hostname); + updateAttributesOnNode(nodeIds, attributes); + } + } finally { + writeLock.unlock(); + } + } + + private void updateAttributesOnNode(List nodeIds, + Set attributes) { + nodeIds.forEach((k) -> { + SchedulerNode node = nodeTracker.getNode(k); + node.updateNodeAttributes(attributes); + }); + } + /** * Process node labels update. */ @@ -2768,7 +2802,7 @@ public class CapacityScheduler extends schedulingRequest, schedulerNode, rmContext.getPlacementConstraintManager(), rmContext.getAllocationTagsManager())) { - LOG.debug("Failed to allocate container for application " + LOG.info("Failed to allocate container for application " + appAttempt.getApplicationId() + " on node " + schedulerNode.getNodeName() + " because this allocation violates the" diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java index f47e1d4889..ccd334cd6a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java @@ -24,8 +24,7 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; -import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.api.records.*; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And; @@ -114,22 +113,92 @@ public final class PlacementConstraintsUtil { || maxScopeCardinality <= desiredMaxCardinality); } - private static boolean canSatisfyNodePartitionConstraintExpresssion( - TargetExpression targetExpression, SchedulerNode schedulerNode) { + private static boolean canSatisfyNodeConstraintExpresssion( + SingleConstraint sc, TargetExpression targetExpression, + SchedulerNode schedulerNode) { Set values = targetExpression.getTargetValues(); - if (values == null || values.isEmpty()) { - return schedulerNode.getPartition().equals( - RMNodeLabelsManager.NO_LABEL); - } else{ - String nodePartition = values.iterator().next(); - if (!nodePartition.equals(schedulerNode.getPartition())) { + + if (targetExpression.getTargetKey().equals(NODE_PARTITION)) { + if (values == null || values.isEmpty()) { + return schedulerNode.getPartition() + .equals(RMNodeLabelsManager.NO_LABEL); + } else { + String nodePartition = values.iterator().next(); + if (!nodePartition.equals(schedulerNode.getPartition())) { + return false; + } + } + } else { + NodeAttributeOpCode opCode = sc.getNodeAttributeOpCode(); + // compare attributes. + String inputAttribute = values.iterator().next(); + NodeAttribute requestAttribute = getNodeConstraintFromRequest( + targetExpression.getTargetKey(), inputAttribute); + if (requestAttribute == null) { + return true; + } + + if (schedulerNode.getNodeAttributes() == null || + !schedulerNode.getNodeAttributes().contains(requestAttribute)) { + if(LOG.isDebugEnabled()) { + LOG.debug("Incoming requestAttribute:" + requestAttribute + + "is not present in " + schedulerNode.getNodeID()); + } + return false; + } + boolean found = false; + for (Iterator it = schedulerNode.getNodeAttributes() + .iterator(); it.hasNext();) { + NodeAttribute nodeAttribute = it.next(); + if (LOG.isDebugEnabled()) { + LOG.debug("Starting to compare Incoming requestAttribute :" + + requestAttribute + + " with requestAttribute value= " + requestAttribute + .getAttributeValue() + + ", stored nodeAttribute value=" + nodeAttribute + .getAttributeValue()); + } + if (requestAttribute.equals(nodeAttribute)) { + if (isOpCodeMatches(requestAttribute, nodeAttribute, opCode)) { + if (LOG.isDebugEnabled()) { + LOG.debug( + "Incoming requestAttribute:" + requestAttribute + + " matches with node:" + schedulerNode.getNodeID()); + } + found = true; + return found; + } + } + } + if (!found) { + if(LOG.isDebugEnabled()) { + LOG.info("skip this node:" + schedulerNode.getNodeID() + + " for requestAttribute:" + requestAttribute); + } return false; } } - return true; } + private static boolean isOpCodeMatches(NodeAttribute requestAttribute, + NodeAttribute nodeAttribute, NodeAttributeOpCode opCode) { + boolean retCode = false; + switch (opCode) { + case EQ: + retCode = requestAttribute.getAttributeValue() + .equals(nodeAttribute.getAttributeValue()); + break; + case NE: + retCode = !(requestAttribute.getAttributeValue() + .equals(nodeAttribute.getAttributeValue())); + break; + default: + break; + } + return retCode; + } + private static boolean canSatisfySingleConstraint(ApplicationId applicationId, SingleConstraint singleConstraint, SchedulerNode schedulerNode, AllocationTagsManager tagsManager) @@ -146,10 +215,12 @@ public final class PlacementConstraintsUtil { singleConstraint, currentExp, schedulerNode, tagsManager)) { return false; } - } else if (currentExp.getTargetType().equals(TargetType.NODE_ATTRIBUTE) - && currentExp.getTargetKey().equals(NODE_PARTITION)) { - // This is a node partition expression, check it. - canSatisfyNodePartitionConstraintExpresssion(currentExp, schedulerNode); + } else if (currentExp.getTargetType().equals(TargetType.NODE_ATTRIBUTE)) { + // This is a node attribute expression, check it. + if (!canSatisfyNodeConstraintExpresssion(singleConstraint, currentExp, + schedulerNode)) { + return false; + } } } // return true if all targetExpressions are satisfied @@ -203,6 +274,11 @@ public final class PlacementConstraintsUtil { AllocationTagsManager atm) throws InvalidAllocationTagsQueryException { if (constraint == null) { + if(LOG.isDebugEnabled()) { + LOG.debug( + "Constraint is found empty during constraint validation for app:" + + appId); + } return true; } @@ -263,4 +339,24 @@ public final class PlacementConstraintsUtil { pcm.getMultilevelConstraint(applicationId, sourceTags, pc), schedulerNode, atm); } + + private static NodeAttribute getNodeConstraintFromRequest(String attrKey, + String attrString) { + NodeAttribute nodeAttribute = null; + if(LOG.isDebugEnabled()) { + LOG.debug("Incoming node attribute: " + attrKey + "=" + attrString); + } + + // Input node attribute could be like 1.8 + String[] name = attrKey.split("/"); + if (name == null || name.length == 1) { + nodeAttribute = NodeAttribute + .newInstance(attrKey, NodeAttributeType.STRING, attrString); + } else { + nodeAttribute = NodeAttribute + .newInstance(name[0], name[1], NodeAttributeType.STRING, attrString); + } + + return nodeAttribute; + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAttributesUpdateSchedulerEvent.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAttributesUpdateSchedulerEvent.java new file mode 100644 index 0000000000..cdc0b69e6e --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/NodeAttributesUpdateSchedulerEvent.java @@ -0,0 +1,41 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.event; + +import java.util.Map; +import java.util.Set; + +import org.apache.hadoop.yarn.api.records.NodeAttribute; + +/** + * Event handler class for Node Attributes which sends events to Scheduler. + */ +public class NodeAttributesUpdateSchedulerEvent extends SchedulerEvent { + private Map> nodeToAttributes; + + public NodeAttributesUpdateSchedulerEvent( + Map> newNodeToAttributesMap) { + super(SchedulerEventType.NODE_ATTRIBUTES_UPDATE); + this.nodeToAttributes = newNodeToAttributesMap; + } + + public Map> getUpdatedNodeToAttributes() { + return nodeToAttributes; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java index b107cf4ee6..869bf0ed9e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/event/SchedulerEventType.java @@ -26,6 +26,7 @@ public enum SchedulerEventType { NODE_UPDATE, NODE_RESOURCE_UPDATE, NODE_LABELS_UPDATE, + NODE_ATTRIBUTES_UPDATE, // Source: RMApp APP_ADDED, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java index 9d30e9065e..45573505c2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/LocalityAppPlacementAllocator.java @@ -396,6 +396,10 @@ public class LocalityAppPlacementAllocator SchedulingMode schedulingMode) { // We will only look at node label = nodeLabelToLookAt according to // schedulingMode and partition of node. + if(LOG.isDebugEnabled()) { + LOG.debug("precheckNode is invoked for " + schedulerNode.getNodeID() + "," + + schedulingMode); + } String nodePartitionToLookAt; if (schedulingMode == SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY) { nodePartitionToLookAt = schedulerNode.getPartition();