diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespace.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespace.java new file mode 100644 index 0000000000..25f876156b --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespace.java @@ -0,0 +1,336 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import com.google.common.base.Strings; +import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException; + +import java.util.ArrayList; +import java.util.List; +import java.util.Set; + +import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.SELF; +import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.NOT_SELF; +import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_LABEL; +import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_ID; +import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.ALL; +import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.fromString; + +/** + * Class to describe the namespace of an allocation tag. + * Each namespace can be evaluated against a set of applications. + * After evaluation, the namespace should have an implicit set of + * applications which defines its scope. + */ +public abstract class AllocationTagNamespace implements + Evaluable { + + public final static String NAMESPACE_DELIMITER = "/"; + + private AllocationTagNamespaceType nsType; + // Namespace scope value will be delay binding by eval method. + private Set nsScope; + + public AllocationTagNamespace(AllocationTagNamespaceType + allocationTagNamespaceType) { + this.nsType = allocationTagNamespaceType; + } + + protected void setScopeIfNotNull(Set appIds) { + if (appIds != null) { + this.nsScope = appIds; + } + } + + /** + * Get the type of the namespace. + * @return namespace type. + */ + public AllocationTagNamespaceType getNamespaceType() { + return nsType; + } + + /** + * Get the scope of the namespace, in form of a set of applications. + * Before calling this method, {@link #evaluate(TargetApplications)} + * must be called in prior to ensure the scope is proper evaluated. + * + * @return a set of applications. + */ + public Set getNamespaceScope() { + if (this.nsScope == null) { + throw new IllegalStateException("Invalid namespace scope," + + " it is not initialized. Evaluate must be called before" + + " a namespace can be consumed."); + } + return this.nsScope; + } + + @Override + public abstract void evaluate(TargetApplications target) + throws InvalidAllocationTagException; + + /** + * @return true if the namespace is effective in all applications + * in this cluster. Specifically the namespace prefix should be + * "all". + */ + public boolean isGlobal() { + return AllocationTagNamespaceType.ALL.equals(getNamespaceType()); + } + + /** + * @return true if the namespace is effective within a single application + * by its application ID, the namespace prefix should be "app-id"; + * false otherwise. + */ + public boolean isSingleInterApp() { + return AllocationTagNamespaceType.APP_ID.equals(getNamespaceType()); + } + + /** + * @return true if the namespace is effective to the application itself, + * the namespace prefix should be "self"; false otherwise. + */ + public boolean isIntraApp() { + return AllocationTagNamespaceType.SELF.equals(getNamespaceType()); + } + + /** + * @return true if the namespace is effective to all applications except + * itself, the namespace prefix should be "not-self"; false otherwise. + */ + public boolean isNotSelf() { + return AllocationTagNamespaceType.NOT_SELF.equals(getNamespaceType()); + } + + /** + * @return true if the namespace is effective to a group of applications + * identified by a application label, the namespace prefix should be + * "app-label"; false otherwise. + */ + public boolean isAppLabel() { + return AllocationTagNamespaceType.APP_LABEL.equals(getNamespaceType()); + } + + @Override + public String toString() { + return this.nsType.toString(); + } + + /** + * Namespace within application itself. + */ + public static class Self extends AllocationTagNamespace { + + public Self() { + super(SELF); + } + + @Override + public void evaluate(TargetApplications target) + throws InvalidAllocationTagException { + if (target == null || target.getCurrentApplicationId() == null) { + throw new InvalidAllocationTagException("Namespace Self must" + + " be evaluated against a single application ID."); + } + ApplicationId applicationId = target.getCurrentApplicationId(); + setScopeIfNotNull(ImmutableSet.of(applicationId)); + } + } + + /** + * Namespace to all applications except itself. + */ + public static class NotSelf extends AllocationTagNamespace { + + private ApplicationId applicationId; + + public NotSelf() { + super(NOT_SELF); + } + + /** + * The scope of self namespace is to an application itself, + * the application ID can be delay binding to the namespace. + * + * @param appId application ID. + */ + public void setApplicationId(ApplicationId appId) { + this.applicationId = appId; + } + + public ApplicationId getApplicationId() { + return this.applicationId; + } + + @Override + public void evaluate(TargetApplications target) { + Set otherAppIds = target.getOtherApplicationIds(); + setScopeIfNotNull(otherAppIds); + } + } + + /** + * Namespace to all applications in the cluster. + */ + public static class All extends AllocationTagNamespace { + + public All() { + super(ALL); + } + + @Override + public void evaluate(TargetApplications target) { + Set allAppIds = target.getAllApplicationIds(); + setScopeIfNotNull(allAppIds); + } + } + + /** + * Namespace to all applications in the cluster. + */ + public static class AppLabel extends AllocationTagNamespace { + + public AppLabel() { + super(APP_LABEL); + } + + @Override + public void evaluate(TargetApplications target) { + // TODO Implement app-label namespace evaluation + } + } + + /** + * Namespace defined by a certain application ID. + */ + public static class AppID extends AllocationTagNamespace { + + private ApplicationId targetAppId; + // app-id namespace requires an extra value of an application id. + public AppID(ApplicationId applicationId) { + super(APP_ID); + this.targetAppId = applicationId; + } + + @Override + public void evaluate(TargetApplications target) { + setScopeIfNotNull(ImmutableSet.of(targetAppId)); + } + + @Override + public String toString() { + return APP_ID.toString() + NAMESPACE_DELIMITER + this.targetAppId; + } + } + + /** + * Parse namespace from a string. The string must be in legal format + * defined by each {@link AllocationTagNamespaceType}. + * + * @param namespaceStr namespace string. + * @return an instance of {@link AllocationTagNamespace}. + * @throws InvalidAllocationTagException + * if given string is not in valid format + */ + public static AllocationTagNamespace parse(String namespaceStr) + throws InvalidAllocationTagException { + // Return the default namespace if no valid string is given. + if (Strings.isNullOrEmpty(namespaceStr)) { + return new Self(); + } + + // Normalize the input, escape additional chars. + List nsValues = normalize(namespaceStr); + // The first string should be the prefix. + String nsPrefix = nsValues.get(0); + AllocationTagNamespaceType allocationTagNamespaceType = + fromString(nsPrefix); + switch (allocationTagNamespaceType) { + case SELF: + return new Self(); + case NOT_SELF: + return new NotSelf(); + case ALL: + return new All(); + case APP_ID: + if (nsValues.size() != 2) { + throw new InvalidAllocationTagException( + "Missing the application ID in the namespace string: " + + namespaceStr); + } + String appIDStr = nsValues.get(1); + return parseAppID(appIDStr); + case APP_LABEL: + return new AppLabel(); + default: + throw new InvalidAllocationTagException( + "Invalid namespace string " + namespaceStr); + } + } + + private static AllocationTagNamespace parseAppID(String appIDStr) + throws InvalidAllocationTagException { + try { + ApplicationId applicationId = ApplicationId.fromString(appIDStr); + return new AppID(applicationId); + } catch (IllegalArgumentException e) { + throw new InvalidAllocationTagException( + "Invalid application ID for " + + APP_ID.getTypeKeyword() + ": " + appIDStr); + } + } + + /** + * Valid given namespace string and parse it to a list of sub-strings + * that can be consumed by the parser according to the type of the + * namespace. Currently the size of return list should be either 1 or 2. + * Extra slash is escaped during the normalization. + * + * @param namespaceStr namespace string. + * @return a list of parsed strings. + * @throws InvalidAllocationTagException + * if namespace format is unexpected. + */ + private static List normalize(String namespaceStr) + throws InvalidAllocationTagException { + List result = new ArrayList<>(); + if (namespaceStr == null) { + return result; + } + + String[] nsValues = namespaceStr.split(NAMESPACE_DELIMITER); + for (String str : nsValues) { + if (!Strings.isNullOrEmpty(str)) { + result.add(str); + } + } + + // Currently we only allow 1 or 2 values for a namespace string + if (result.size() == 0 || result.size() > 2) { + throw new InvalidAllocationTagException("Invalid namespace string: " + + namespaceStr + ", the syntax is or" + + " /"); + } + + return result; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java new file mode 100644 index 0000000000..5e46cd0f3d --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTagNamespaceType.java @@ -0,0 +1,74 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException; + +import java.util.Arrays; +import java.util.Set; +import java.util.stream.Collectors; + +/** + * Class to describe all supported forms of namespaces for an allocation tag. + */ +public enum AllocationTagNamespaceType { + + SELF("self"), + NOT_SELF("not-self"), + APP_ID("app-id"), + APP_LABEL("app-label"), + ALL("all"); + + private String typeKeyword; + AllocationTagNamespaceType(String keyword) { + this.typeKeyword = keyword; + } + + public String getTypeKeyword() { + return this.typeKeyword; + } + + /** + * Parses the namespace type from a given string. + * @param prefix namespace prefix. + * @return namespace type. + * @throws InvalidAllocationTagException + */ + public static AllocationTagNamespaceType fromString(String prefix) throws + InvalidAllocationTagException { + for (AllocationTagNamespaceType type : + AllocationTagNamespaceType.values()) { + if(type.getTypeKeyword().equals(prefix)) { + return type; + } + } + + Set values = Arrays.stream(AllocationTagNamespaceType.values()) + .map(AllocationTagNamespaceType::toString) + .collect(Collectors.toSet()); + throw new InvalidAllocationTagException( + "Invalid namespace prefix: " + prefix + + ", valid values are: " + String.join(",", values)); + } + + @Override + public String toString() { + return this.getTypeKeyword(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTags.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTags.java new file mode 100644 index 0000000000..50bffc3551 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/AllocationTags.java @@ -0,0 +1,50 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import java.util.Set; + +/** + * Allocation tags under same namespace. + */ +public class AllocationTags { + + private AllocationTagNamespace ns; + private Set tags; + + public AllocationTags(AllocationTagNamespace namespace, + Set allocationTags) { + this.ns = namespace; + this.tags = allocationTags; + } + + /** + * @return the namespace of these tags. + */ + public AllocationTagNamespace getNamespace() { + return this.ns; + } + + /** + * @return the allocation tags. + */ + public Set getTags() { + return this.tags; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Evaluable.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Evaluable.java new file mode 100644 index 0000000000..7a74002ea5 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/Evaluable.java @@ -0,0 +1,38 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import org.apache.hadoop.yarn.exceptions.YarnException; + +/** + * A class implements Evaluable interface represents the internal state + * of the class can be changed against a given target. + * @param a target to evaluate against + */ +public interface Evaluable { + + /** + * Evaluate against a given target, this process changes the internal state + * of current class. + * + * @param target a generic type target that impacts this evaluation. + * @throws YarnException + */ + void evaluate(T target) throws YarnException; +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/TargetApplications.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/TargetApplications.java new file mode 100644 index 0000000000..de0ea268b1 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/records/TargetApplications.java @@ -0,0 +1,53 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.api.records; + +import java.util.Set; +import java.util.stream.Collectors; + +/** + * This class is used by + * {@link AllocationTagNamespace#evaluate(TargetApplications)} to evaluate + * a namespace. + */ +public class TargetApplications { + + private ApplicationId currentAppId; + private Set allAppIds; + + public TargetApplications(ApplicationId currentApplicationId, + Set allApplicationIds) { + this.currentAppId = currentApplicationId; + this.allAppIds = allApplicationIds; + } + + public Set getAllApplicationIds() { + return this.allAppIds; + } + + public ApplicationId getCurrentApplicationId() { + return this.currentAppId; + } + + public Set getOtherApplicationIds() { + return allAppIds == null ? null : allAppIds.stream().filter(appId -> + !appId.equals(getCurrentApplicationId())) + .collect(Collectors.toSet()); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java index c1549c54db..af70e2a747 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/api/resource/PlacementConstraints.java @@ -20,9 +20,9 @@ import java.util.concurrent.TimeUnit; -import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.AllocationTagNamespace; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.DelayedOr; @@ -50,13 +50,6 @@ private PlacementConstraints() { public static final String RACK = PlacementConstraint.RACK_SCOPE; public static final String NODE_PARTITION = "yarn_node_partition/"; - private static final String APPLICATION_LABEL_PREFIX = - "yarn_application_label/"; - - @InterfaceAudience.Private - public static final String APPLICATION_LABEL_INTRA_APPLICATION = - APPLICATION_LABEL_PREFIX + "%intra_app%"; - /** * Creates a constraint that requires allocations to be placed on nodes that * satisfy all target expressions within the given scope (e.g., node or rack). @@ -223,6 +216,20 @@ public static TargetExpression allocationTag(String... allocationTags) { allocationTags); } + /** + * Constructs a target expression on a set of allocation tags under + * a certain namespace. + * + * @param namespace namespace of the allocation tags + * @param allocationTags allocation tags + * @return a target expression + */ + public static TargetExpression allocationTagWithNamespace(String namespace, + String... allocationTags) { + return new TargetExpression(TargetType.ALLOCATION_TAG, + namespace, allocationTags); + } + /** * Constructs a target expression on an allocation tag. It is satisfied if * there are allocations with one of the given tags. Comparing to @@ -235,8 +242,9 @@ public static TargetExpression allocationTag(String... allocationTags) { */ public static TargetExpression allocationTagToIntraApp( String... allocationTags) { + AllocationTagNamespace selfNs = new AllocationTagNamespace.Self(); return new TargetExpression(TargetType.ALLOCATION_TAG, - APPLICATION_LABEL_INTRA_APPLICATION, allocationTags); + selfNs.toString(), allocationTags); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidAllocationTagException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidAllocationTagException.java new file mode 100644 index 0000000000..be8d881d7a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-api/src/main/java/org/apache/hadoop/yarn/exceptions/InvalidAllocationTagException.java @@ -0,0 +1,34 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.exceptions; + +/** + * This exception is thrown by + * {@link + * org.apache.hadoop.yarn.api.records.AllocationTagNamespace#parse(String)} + * when it fails to parse a namespace. + */ +public class InvalidAllocationTagException extends YarnException { + + private static final long serialVersionUID = 1L; + + public InvalidAllocationTagException(String message) { + super(message); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java index 8ef9999da2..fb2619afcf 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/AllocationTagsManager.java @@ -21,6 +21,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableSet; import org.apache.commons.lang.StringUtils; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; @@ -561,4 +562,12 @@ public long getRackCardinalityByOp(String rack, ApplicationId applicationId, public Map getAllocationTagsWithCount(NodeId nodeId) { return globalNodeMapping.getTypeToTagsWithCount().get(nodeId); } + + /** + * @return all application IDs in a set that currently visible by + * the allocation tags manager. + */ + public Set getAllApplicationIds() { + return ImmutableSet.copyOf(perAppNodeMappings.keySet()); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/InvalidAllocationTagsQueryException.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/InvalidAllocationTagsQueryException.java index 29483a2a0a..d2bc4d87ae 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/InvalidAllocationTagsQueryException.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/InvalidAllocationTagsQueryException.java @@ -32,4 +32,8 @@ public class InvalidAllocationTagsQueryException extends YarnException { public InvalidAllocationTagsQueryException(String msg) { super(msg); } + + public InvalidAllocationTagsQueryException(YarnException e) { + super(e); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java index ab0bbd7f77..2d0e95a9b9 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java @@ -24,8 +24,11 @@ import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceStability.Unstable; +import org.apache.hadoop.yarn.api.records.AllocationTagNamespace; +import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.SchedulingRequest; +import org.apache.hadoop.yarn.api.records.TargetApplications; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And; @@ -35,6 +38,7 @@ import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression.TargetType; import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations.SingleConstraintTransformer; import org.apache.hadoop.yarn.api.resource.PlacementConstraints; +import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm; @@ -56,6 +60,53 @@ public final class PlacementConstraintsUtil { private PlacementConstraintsUtil() { } + /** + * Try to the namespace of the allocation tags from the given target key. + * + * @param targetKey + * @return allocation tag namespace. + * @throws InvalidAllocationTagsQueryException + * if fail to parse the target key to a valid namespace. + */ + private static AllocationTagNamespace getAllocationTagNamespace( + ApplicationId currentAppId, String targetKey, AllocationTagsManager atm) + throws InvalidAllocationTagException{ + // Parse to a valid namespace. + AllocationTagNamespace namespace = AllocationTagNamespace.parse(targetKey); + + // TODO remove such check once we support all forms of namespaces + if (!namespace.isIntraApp() && !namespace.isSingleInterApp()) { + throw new InvalidAllocationTagException( + "Only support " + AllocationTagNamespaceType.SELF.toString() + + " and "+ AllocationTagNamespaceType.APP_ID + " now," + + namespace.toString() + " is not supported yet!"); + } + + // Evaluate the namespace according to the given target + // before it can be consumed. + TargetApplications ta = new TargetApplications(currentAppId, + atm.getAllApplicationIds()); + namespace.evaluate(ta); + return namespace; + } + + // We return a single app Id now, because at present, + // only self and app-id namespace is supported. But moving on, + // this will return a set of application IDs. + // TODO support other forms of namespaces + private static ApplicationId getNamespaceScope( + AllocationTagNamespace namespace) + throws InvalidAllocationTagException { + if (namespace.getNamespaceScope() == null + || namespace.getNamespaceScope().size() != 1) { + throw new InvalidAllocationTagException( + "Invalid allocation tag namespace " + namespace.toString() + + ", expecting it is not null and only 1 application" + + " ID in the scope."); + } + return namespace.getNamespaceScope().iterator().next(); + } + /** * Returns true if single placement constraint with associated * allocationTags and scope is satisfied by a specific scheduler Node. @@ -74,6 +125,18 @@ private static boolean canSatisfySingleConstraintExpression( ApplicationId targetApplicationId, SingleConstraint sc, TargetExpression te, SchedulerNode node, AllocationTagsManager tm) throws InvalidAllocationTagsQueryException { + // Parse the allocation tag's namespace from the given target key, + // then evaluate the namespace and get its scope, + // which is represented by one or more application IDs. + ApplicationId effectiveAppID; + try { + AllocationTagNamespace namespace = getAllocationTagNamespace( + targetApplicationId, te.getTargetKey(), tm); + effectiveAppID = getNamespaceScope(namespace); + } catch (InvalidAllocationTagException e) { + throw new InvalidAllocationTagsQueryException(e); + } + long minScopeCardinality = 0; long maxScopeCardinality = 0; @@ -86,20 +149,20 @@ private static boolean canSatisfySingleConstraintExpression( if (sc.getScope().equals(PlacementConstraints.NODE)) { if (checkMinCardinality) { minScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), - targetApplicationId, te.getTargetValues(), Long::max); + effectiveAppID, te.getTargetValues(), Long::max); } if (checkMaxCardinality) { maxScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), - targetApplicationId, te.getTargetValues(), Long::min); + effectiveAppID, te.getTargetValues(), Long::min); } } else if (sc.getScope().equals(PlacementConstraints.RACK)) { if (checkMinCardinality) { minScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), - targetApplicationId, te.getTargetValues(), Long::max); + effectiveAppID, te.getTargetValues(), Long::max); } if (checkMaxCardinality) { maxScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), - targetApplicationId, te.getTargetValues(), Long::min); + effectiveAppID, te.getTargetValues(), Long::min); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java index ed0734503f..7e5506efdd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java @@ -23,6 +23,8 @@ import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.util.StringUtils; +import org.apache.hadoop.yarn.api.records.AllocationTagNamespace; +import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceSizing; @@ -30,6 +32,7 @@ import org.apache.hadoop.yarn.api.records.impl.pb.SchedulingRequestPBImpl; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraints; +import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException; import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; @@ -53,7 +56,6 @@ import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; -import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.APPLICATION_LABEL_INTRA_APPLICATION; import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE_PARTITION; /** @@ -220,7 +222,8 @@ private String throwExceptionWithMetaInfo(String message) { throw new SchedulerInvalidResoureRequestException(sb.toString()); } - private void validateAndSetSchedulingRequest(SchedulingRequest newSchedulingRequest) + private void validateAndSetSchedulingRequest(SchedulingRequest + newSchedulingRequest) throws SchedulerInvalidResoureRequestException { // Check sizing exists if (newSchedulingRequest.getResourceSizing() == null @@ -333,15 +336,23 @@ private void validateAndSetSchedulingRequest(SchedulingRequest newSchedulingRequ targetAllocationTags = new HashSet<>( targetExpression.getTargetValues()); - if (targetExpression.getTargetKey() != null && !targetExpression - .getTargetKey().equals(APPLICATION_LABEL_INTRA_APPLICATION)) { + try { + AllocationTagNamespace tagNS = + AllocationTagNamespace.parse(targetExpression.getTargetKey()); + if (!AllocationTagNamespaceType.SELF + .equals(tagNS.getNamespaceType())) { + throwExceptionWithMetaInfo( + "As of now, the only accepted target key for targetKey of " + + "allocation_tag target expression is: [" + + AllocationTagNamespaceType.SELF.toString() + + "]. Please make changes to placement constraints " + + "accordingly. If this is null, it will be set to " + + AllocationTagNamespaceType.SELF.toString() + + " by default."); + } + } catch (InvalidAllocationTagException e) { throwExceptionWithMetaInfo( - "As of now, the only accepted target key for targetKey of " - + "allocation_tag target expression is: [" - + APPLICATION_LABEL_INTRA_APPLICATION - + "]. Please make changes to placement constraints " - + "accordingly. If this is null, it will be set to " - + APPLICATION_LABEL_INTRA_APPLICATION + " by default."); + "Invalid allocation tag namespace, message: " + e.getMessage()); } } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsNamespace.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsNamespace.java new file mode 100644 index 0000000000..67a3901e64 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestAllocationTagsNamespace.java @@ -0,0 +1,147 @@ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; /** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +import com.google.common.collect.ImmutableSet; +import org.apache.hadoop.yarn.api.records.AllocationTagNamespace; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.TargetApplications; +import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException; +import org.junit.Assert; +import org.junit.Test; + +/** + * Test class for {@link AllocationTagNamespace}. + */ +public class TestAllocationTagsNamespace { + + @Test + public void testNamespaceParse() throws InvalidAllocationTagException { + AllocationTagNamespace namespace; + + String namespaceStr = "self"; + namespace = AllocationTagNamespace.parse(namespaceStr); + Assert.assertTrue(namespace.isIntraApp()); + + namespaceStr = "not-self"; + namespace = AllocationTagNamespace.parse(namespaceStr); + Assert.assertTrue(namespace.isNotSelf()); + + namespaceStr = "all"; + namespace = AllocationTagNamespace.parse(namespaceStr); + Assert.assertTrue(namespace.isGlobal()); + + namespaceStr = "app-label"; + namespace = AllocationTagNamespace.parse(namespaceStr); + Assert.assertTrue(namespace.isAppLabel()); + + ApplicationId applicationId = ApplicationId.newInstance(12345, 1); + namespaceStr = "app-id/" + applicationId.toString(); + namespace = AllocationTagNamespace.parse(namespaceStr); + Assert.assertTrue(namespace.isSingleInterApp()); + + // Invalid app-id namespace syntax, invalid app ID. + try { + namespaceStr = "app-id/apppppp_12345_99999"; + AllocationTagNamespace.parse(namespaceStr); + Assert.fail("Parsing should fail as the given app ID is invalid"); + } catch (Exception e) { + Assert.assertTrue(e instanceof InvalidAllocationTagException); + Assert.assertTrue(e.getMessage().startsWith( + "Invalid application ID for app-id")); + } + + // Invalid app-id namespace syntax, missing app ID. + try { + namespaceStr = "app-id"; + AllocationTagNamespace.parse(namespaceStr); + Assert.fail("Parsing should fail as the given namespace" + + " is missing application ID"); + } catch (Exception e) { + Assert.assertTrue(e instanceof InvalidAllocationTagException); + Assert.assertTrue(e.getMessage().startsWith( + "Missing the application ID in the namespace string")); + } + + // Invalid namespace type. + try { + namespaceStr = "non_exist_ns"; + AllocationTagNamespace.parse(namespaceStr); + Assert.fail("Parsing should fail as the giving type is not supported."); + } catch (Exception e) { + Assert.assertTrue(e instanceof InvalidAllocationTagException); + Assert.assertTrue(e.getMessage().startsWith( + "Invalid namespace prefix")); + } + } + + @Test + public void testNamespaceEvaluation() throws InvalidAllocationTagException { + AllocationTagNamespace namespace; + TargetApplications targetApplications; + ApplicationId app1 = ApplicationId.newInstance(10000, 1); + ApplicationId app2 = ApplicationId.newInstance(10000, 2); + ApplicationId app3 = ApplicationId.newInstance(10000, 3); + ApplicationId app4 = ApplicationId.newInstance(10000, 4); + ApplicationId app5 = ApplicationId.newInstance(10000, 5); + + // Ensure eval is called before using the scope. + String namespaceStr = "self"; + namespace = AllocationTagNamespace.parse(namespaceStr); + try { + namespace.getNamespaceScope(); + Assert.fail("Call getNamespaceScope before evaluate is not allowed."); + } catch (Exception e) { + Assert.assertTrue(e instanceof IllegalStateException); + Assert.assertTrue(e.getMessage().contains( + "Evaluate must be called before a namespace can be consumed.")); + } + + namespaceStr = "self"; + namespace = AllocationTagNamespace.parse(namespaceStr); + targetApplications = new TargetApplications(app1, ImmutableSet.of(app1)); + namespace.evaluate(targetApplications); + Assert.assertEquals(1, namespace.getNamespaceScope().size()); + Assert.assertEquals(app1, namespace.getNamespaceScope().iterator().next()); + + namespaceStr = "not-self"; + namespace = AllocationTagNamespace.parse(namespaceStr); + targetApplications = new TargetApplications(app1, ImmutableSet.of(app1)); + namespace.evaluate(targetApplications); + Assert.assertEquals(0, namespace.getNamespaceScope().size()); + + targetApplications = new TargetApplications(app1, + ImmutableSet.of(app1, app2, app3)); + namespace.evaluate(targetApplications); + Assert.assertEquals(2, namespace.getNamespaceScope().size()); + Assert.assertFalse(namespace.getNamespaceScope().contains(app1)); + + namespaceStr = "all"; + namespace = AllocationTagNamespace.parse(namespaceStr); + targetApplications = new TargetApplications(null, + ImmutableSet.of(app1, app2)); + namespace.evaluate(targetApplications); + Assert.assertEquals(2, namespace.getNamespaceScope().size()); + + namespaceStr = "app-id/" + app2.toString(); + namespace = AllocationTagNamespace.parse(namespaceStr); + targetApplications = new TargetApplications(app1, + ImmutableSet.of(app1, app2, app3, app4, app5)); + namespace.evaluate(targetApplications); + Assert.assertEquals(1, namespace.getNamespaceScope().size()); + Assert.assertEquals(app2, namespace.getNamespaceScope().iterator().next()); + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java index 5135f636dc..5ba89488af 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTagWithNamespace; import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.RACK; import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn; import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn; @@ -30,6 +31,7 @@ import java.util.AbstractMap; import java.util.Arrays; +import java.util.HashMap; import java.util.HashSet; import java.util.Iterator; import java.util.List; @@ -39,6 +41,7 @@ import java.util.stream.Stream; import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.yarn.api.records.AllocationTagNamespace; import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -508,4 +511,152 @@ public void testANDConstraintAssignment() Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm)); } + + @Test + public void testInterAppConstraintsByAppID() + throws InvalidAllocationTagsQueryException { + AllocationTagsManager tm = new AllocationTagsManager(rmContext); + PlacementConstraintManagerService pcm = + new MemoryPlacementConstraintManager(); + rmContext.setAllocationTagsManager(tm); + rmContext.setPlacementConstraintManager(pcm); + + long ts = System.currentTimeMillis(); + ApplicationId application1 = BuilderUtils.newApplicationId(ts, 123); + + // Register App1 with anti-affinity constraint map. + RMNode n0r1 = rmNodes.get(0); + RMNode n1r1 = rmNodes.get(1); + RMNode n2r2 = rmNodes.get(2); + RMNode n3r2 = rmNodes.get(3); + + /** + * Place container: + * n0: app1/hbase-m(1) + * n1: "" + * n2: app1/hbase-m(1) + * n3: "" + */ + tm.addContainer(n0r1.getNodeID(), + newContainerId(application1), ImmutableSet.of("hbase-m")); + tm.addContainer(n2r2.getNodeID(), + newContainerId(application1), ImmutableSet.of("hbase-m")); + Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n0r1.getNodeID()) + .get("hbase-m").longValue()); + Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n2r2.getNodeID()) + .get("hbase-m").longValue()); + + SchedulerNode schedulerNode0 =newSchedulerNode(n0r1.getHostName(), + n0r1.getRackName(), n0r1.getNodeID()); + SchedulerNode schedulerNode1 =newSchedulerNode(n1r1.getHostName(), + n1r1.getRackName(), n1r1.getNodeID()); + SchedulerNode schedulerNode2 =newSchedulerNode(n2r2.getHostName(), + n2r2.getRackName(), n2r2.getNodeID()); + SchedulerNode schedulerNode3 =newSchedulerNode(n3r2.getHostName(), + n3r2.getRackName(), n3r2.getNodeID()); + + AllocationTagNamespace namespace = + new AllocationTagNamespace.AppID(application1); + Map, PlacementConstraint> constraintMap = new HashMap<>(); + PlacementConstraint constraint2 = PlacementConstraints + .targetNotIn(NODE, allocationTagWithNamespace(namespace.toString(), + "hbase-m")) + .build(); + Set srcTags2 = new HashSet<>(); + srcTags2.add("app2"); + constraintMap.put(srcTags2, constraint2); + + ts = System.currentTimeMillis(); + ApplicationId application2 = BuilderUtils.newApplicationId(ts, 124); + pcm.registerApplication(application2, constraintMap); + + // Anti-affinity with app1/hbase-m so it should not be able to be placed + // onto n0 and n2 as they already have hbase-m allocated. + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints( + application2, createSchedulingRequest(srcTags2), + schedulerNode0, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints( + application2, createSchedulingRequest(srcTags2), + schedulerNode1, pcm, tm)); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints( + application2, createSchedulingRequest(srcTags2), + schedulerNode2, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints( + application2, createSchedulingRequest(srcTags2), + schedulerNode3, pcm, tm)); + + // Intra-app constraint + // Test with default and empty namespace + AllocationTagNamespace self = new AllocationTagNamespace.Self(); + PlacementConstraint constraint3 = PlacementConstraints + .targetNotIn(NODE, allocationTagWithNamespace(self.toString(), + "hbase-m")) + .build(); + Set srcTags3 = new HashSet<>(); + srcTags3.add("app3"); + constraintMap.put(srcTags3, constraint3); + + ts = System.currentTimeMillis(); + ApplicationId application3 = BuilderUtils.newApplicationId(ts, 124); + pcm.registerApplication(application3, constraintMap); + + /** + * Place container: + * n0: app1/hbase-m(1), app3/hbase-m + * n1: "" + * n2: app1/hbase-m(1) + * n3: "" + */ + tm.addContainer(n0r1.getNodeID(), + newContainerId(application3), ImmutableSet.of("hbase-m")); + + // Anti-affinity to self/hbase-m + Assert.assertFalse(PlacementConstraintsUtil + .canSatisfyConstraints(application3, createSchedulingRequest(srcTags3), + schedulerNode0, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil + .canSatisfyConstraints(application3, createSchedulingRequest(srcTags3), + schedulerNode1, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil + .canSatisfyConstraints(application3, createSchedulingRequest(srcTags3), + schedulerNode2, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil + .canSatisfyConstraints(application3, createSchedulingRequest(srcTags3), + schedulerNode3, pcm, tm)); + + pcm.unregisterApplication(application3); + } + + @Test + public void testInvalidAllocationTagNamespace() { + AllocationTagsManager tm = new AllocationTagsManager(rmContext); + PlacementConstraintManagerService pcm = + new MemoryPlacementConstraintManager(); + rmContext.setAllocationTagsManager(tm); + rmContext.setPlacementConstraintManager(pcm); + + long ts = System.currentTimeMillis(); + ApplicationId application1 = BuilderUtils.newApplicationId(ts, 123); + RMNode n0r1 = rmNodes.get(0); + SchedulerNode schedulerNode0 = newSchedulerNode(n0r1.getHostName(), + n0r1.getRackName(), n0r1.getNodeID()); + + PlacementConstraint constraint1 = PlacementConstraints + .targetNotIn(NODE, allocationTagWithNamespace("unknown_namespace", + "hbase-m")) + .build(); + Set srcTags1 = new HashSet<>(); + srcTags1.add("app1"); + + try { + PlacementConstraintsUtil.canSatisfyConstraints(application1, + createSchedulingRequest(srcTags1, constraint1), schedulerNode0, + pcm, tm); + Assert.fail("This should fail because we gave an invalid namespace"); + } catch (Exception e) { + Assert.assertTrue(e instanceof InvalidAllocationTagsQueryException); + Assert.assertTrue(e.getMessage() + .contains("Invalid namespace prefix: unknown_namespace")); + } + } } \ No newline at end of file