YARN-7972. Support inter-app placement constraints for allocation tags by application ID. (Weiwei Yang via asuresh)

This commit is contained in:
Arun Suresh 2018-03-05 11:24:17 -08:00
parent 2e1e049bd0
commit 1054b48c27
13 changed files with 1001 additions and 23 deletions

View File

@ -0,0 +1,336 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException;
import java.util.ArrayList;
import java.util.List;
import java.util.Set;
import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.SELF;
import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.NOT_SELF;
import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_LABEL;
import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_ID;
import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.ALL;
import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.fromString;
/**
* Class to describe the namespace of an allocation tag.
* Each namespace can be evaluated against a set of applications.
* After evaluation, the namespace should have an implicit set of
* applications which defines its scope.
*/
public abstract class AllocationTagNamespace implements
Evaluable<TargetApplications> {
public final static String NAMESPACE_DELIMITER = "/";
private AllocationTagNamespaceType nsType;
// Namespace scope value will be delay binding by eval method.
private Set<ApplicationId> nsScope;
public AllocationTagNamespace(AllocationTagNamespaceType
allocationTagNamespaceType) {
this.nsType = allocationTagNamespaceType;
}
protected void setScopeIfNotNull(Set<ApplicationId> appIds) {
if (appIds != null) {
this.nsScope = appIds;
}
}
/**
* Get the type of the namespace.
* @return namespace type.
*/
public AllocationTagNamespaceType getNamespaceType() {
return nsType;
}
/**
* Get the scope of the namespace, in form of a set of applications.
* Before calling this method, {@link #evaluate(TargetApplications)}
* must be called in prior to ensure the scope is proper evaluated.
*
* @return a set of applications.
*/
public Set<ApplicationId> getNamespaceScope() {
if (this.nsScope == null) {
throw new IllegalStateException("Invalid namespace scope,"
+ " it is not initialized. Evaluate must be called before"
+ " a namespace can be consumed.");
}
return this.nsScope;
}
@Override
public abstract void evaluate(TargetApplications target)
throws InvalidAllocationTagException;
/**
* @return true if the namespace is effective in all applications
* in this cluster. Specifically the namespace prefix should be
* "all".
*/
public boolean isGlobal() {
return AllocationTagNamespaceType.ALL.equals(getNamespaceType());
}
/**
* @return true if the namespace is effective within a single application
* by its application ID, the namespace prefix should be "app-id";
* false otherwise.
*/
public boolean isSingleInterApp() {
return AllocationTagNamespaceType.APP_ID.equals(getNamespaceType());
}
/**
* @return true if the namespace is effective to the application itself,
* the namespace prefix should be "self"; false otherwise.
*/
public boolean isIntraApp() {
return AllocationTagNamespaceType.SELF.equals(getNamespaceType());
}
/**
* @return true if the namespace is effective to all applications except
* itself, the namespace prefix should be "not-self"; false otherwise.
*/
public boolean isNotSelf() {
return AllocationTagNamespaceType.NOT_SELF.equals(getNamespaceType());
}
/**
* @return true if the namespace is effective to a group of applications
* identified by a application label, the namespace prefix should be
* "app-label"; false otherwise.
*/
public boolean isAppLabel() {
return AllocationTagNamespaceType.APP_LABEL.equals(getNamespaceType());
}
@Override
public String toString() {
return this.nsType.toString();
}
/**
* Namespace within application itself.
*/
public static class Self extends AllocationTagNamespace {
public Self() {
super(SELF);
}
@Override
public void evaluate(TargetApplications target)
throws InvalidAllocationTagException {
if (target == null || target.getCurrentApplicationId() == null) {
throw new InvalidAllocationTagException("Namespace Self must"
+ " be evaluated against a single application ID.");
}
ApplicationId applicationId = target.getCurrentApplicationId();
setScopeIfNotNull(ImmutableSet.of(applicationId));
}
}
/**
* Namespace to all applications except itself.
*/
public static class NotSelf extends AllocationTagNamespace {
private ApplicationId applicationId;
public NotSelf() {
super(NOT_SELF);
}
/**
* The scope of self namespace is to an application itself,
* the application ID can be delay binding to the namespace.
*
* @param appId application ID.
*/
public void setApplicationId(ApplicationId appId) {
this.applicationId = appId;
}
public ApplicationId getApplicationId() {
return this.applicationId;
}
@Override
public void evaluate(TargetApplications target) {
Set<ApplicationId> otherAppIds = target.getOtherApplicationIds();
setScopeIfNotNull(otherAppIds);
}
}
/**
* Namespace to all applications in the cluster.
*/
public static class All extends AllocationTagNamespace {
public All() {
super(ALL);
}
@Override
public void evaluate(TargetApplications target) {
Set<ApplicationId> allAppIds = target.getAllApplicationIds();
setScopeIfNotNull(allAppIds);
}
}
/**
* Namespace to all applications in the cluster.
*/
public static class AppLabel extends AllocationTagNamespace {
public AppLabel() {
super(APP_LABEL);
}
@Override
public void evaluate(TargetApplications target) {
// TODO Implement app-label namespace evaluation
}
}
/**
* Namespace defined by a certain application ID.
*/
public static class AppID extends AllocationTagNamespace {
private ApplicationId targetAppId;
// app-id namespace requires an extra value of an application id.
public AppID(ApplicationId applicationId) {
super(APP_ID);
this.targetAppId = applicationId;
}
@Override
public void evaluate(TargetApplications target) {
setScopeIfNotNull(ImmutableSet.of(targetAppId));
}
@Override
public String toString() {
return APP_ID.toString() + NAMESPACE_DELIMITER + this.targetAppId;
}
}
/**
* Parse namespace from a string. The string must be in legal format
* defined by each {@link AllocationTagNamespaceType}.
*
* @param namespaceStr namespace string.
* @return an instance of {@link AllocationTagNamespace}.
* @throws InvalidAllocationTagException
* if given string is not in valid format
*/
public static AllocationTagNamespace parse(String namespaceStr)
throws InvalidAllocationTagException {
// Return the default namespace if no valid string is given.
if (Strings.isNullOrEmpty(namespaceStr)) {
return new Self();
}
// Normalize the input, escape additional chars.
List<String> nsValues = normalize(namespaceStr);
// The first string should be the prefix.
String nsPrefix = nsValues.get(0);
AllocationTagNamespaceType allocationTagNamespaceType =
fromString(nsPrefix);
switch (allocationTagNamespaceType) {
case SELF:
return new Self();
case NOT_SELF:
return new NotSelf();
case ALL:
return new All();
case APP_ID:
if (nsValues.size() != 2) {
throw new InvalidAllocationTagException(
"Missing the application ID in the namespace string: "
+ namespaceStr);
}
String appIDStr = nsValues.get(1);
return parseAppID(appIDStr);
case APP_LABEL:
return new AppLabel();
default:
throw new InvalidAllocationTagException(
"Invalid namespace string " + namespaceStr);
}
}
private static AllocationTagNamespace parseAppID(String appIDStr)
throws InvalidAllocationTagException {
try {
ApplicationId applicationId = ApplicationId.fromString(appIDStr);
return new AppID(applicationId);
} catch (IllegalArgumentException e) {
throw new InvalidAllocationTagException(
"Invalid application ID for "
+ APP_ID.getTypeKeyword() + ": " + appIDStr);
}
}
/**
* Valid given namespace string and parse it to a list of sub-strings
* that can be consumed by the parser according to the type of the
* namespace. Currently the size of return list should be either 1 or 2.
* Extra slash is escaped during the normalization.
*
* @param namespaceStr namespace string.
* @return a list of parsed strings.
* @throws InvalidAllocationTagException
* if namespace format is unexpected.
*/
private static List<String> normalize(String namespaceStr)
throws InvalidAllocationTagException {
List<String> result = new ArrayList<>();
if (namespaceStr == null) {
return result;
}
String[] nsValues = namespaceStr.split(NAMESPACE_DELIMITER);
for (String str : nsValues) {
if (!Strings.isNullOrEmpty(str)) {
result.add(str);
}
}
// Currently we only allow 1 or 2 values for a namespace string
if (result.size() == 0 || result.size() > 2) {
throw new InvalidAllocationTagException("Invalid namespace string: "
+ namespaceStr + ", the syntax is <namespace_prefix> or"
+ " <namespace_prefix>/<namespace_value>");
}
return result;
}
}

View File

@ -0,0 +1,74 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException;
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;
/**
* Class to describe all supported forms of namespaces for an allocation tag.
*/
public enum AllocationTagNamespaceType {
SELF("self"),
NOT_SELF("not-self"),
APP_ID("app-id"),
APP_LABEL("app-label"),
ALL("all");
private String typeKeyword;
AllocationTagNamespaceType(String keyword) {
this.typeKeyword = keyword;
}
public String getTypeKeyword() {
return this.typeKeyword;
}
/**
* Parses the namespace type from a given string.
* @param prefix namespace prefix.
* @return namespace type.
* @throws InvalidAllocationTagException
*/
public static AllocationTagNamespaceType fromString(String prefix) throws
InvalidAllocationTagException {
for (AllocationTagNamespaceType type :
AllocationTagNamespaceType.values()) {
if(type.getTypeKeyword().equals(prefix)) {
return type;
}
}
Set<String> values = Arrays.stream(AllocationTagNamespaceType.values())
.map(AllocationTagNamespaceType::toString)
.collect(Collectors.toSet());
throw new InvalidAllocationTagException(
"Invalid namespace prefix: " + prefix
+ ", valid values are: " + String.join(",", values));
}
@Override
public String toString() {
return this.getTypeKeyword();
}
}

View File

@ -0,0 +1,50 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
import java.util.Set;
/**
* Allocation tags under same namespace.
*/
public class AllocationTags {
private AllocationTagNamespace ns;
private Set<String> tags;
public AllocationTags(AllocationTagNamespace namespace,
Set<String> allocationTags) {
this.ns = namespace;
this.tags = allocationTags;
}
/**
* @return the namespace of these tags.
*/
public AllocationTagNamespace getNamespace() {
return this.ns;
}
/**
* @return the allocation tags.
*/
public Set<String> getTags() {
return this.tags;
}
}

View File

@ -0,0 +1,38 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
import org.apache.hadoop.yarn.exceptions.YarnException;
/**
* A class implements Evaluable interface represents the internal state
* of the class can be changed against a given target.
* @param <T> a target to evaluate against
*/
public interface Evaluable<T> {
/**
* Evaluate against a given target, this process changes the internal state
* of current class.
*
* @param target a generic type target that impacts this evaluation.
* @throws YarnException
*/
void evaluate(T target) throws YarnException;
}

View File

@ -0,0 +1,53 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.api.records;
import java.util.Set;
import java.util.stream.Collectors;
/**
* This class is used by
* {@link AllocationTagNamespace#evaluate(TargetApplications)} to evaluate
* a namespace.
*/
public class TargetApplications {
private ApplicationId currentAppId;
private Set<ApplicationId> allAppIds;
public TargetApplications(ApplicationId currentApplicationId,
Set<ApplicationId> allApplicationIds) {
this.currentAppId = currentApplicationId;
this.allAppIds = allApplicationIds;
}
public Set<ApplicationId> getAllApplicationIds() {
return this.allAppIds;
}
public ApplicationId getCurrentApplicationId() {
return this.currentAppId;
}
public Set<ApplicationId> getOtherApplicationIds() {
return allAppIds == null ? null : allAppIds.stream().filter(appId ->
!appId.equals(getCurrentApplicationId()))
.collect(Collectors.toSet());
}
}

View File

@ -20,9 +20,9 @@
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.AllocationTagNamespace;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.DelayedOr; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.DelayedOr;
@ -50,13 +50,6 @@ private PlacementConstraints() {
public static final String RACK = PlacementConstraint.RACK_SCOPE; public static final String RACK = PlacementConstraint.RACK_SCOPE;
public static final String NODE_PARTITION = "yarn_node_partition/"; public static final String NODE_PARTITION = "yarn_node_partition/";
private static final String APPLICATION_LABEL_PREFIX =
"yarn_application_label/";
@InterfaceAudience.Private
public static final String APPLICATION_LABEL_INTRA_APPLICATION =
APPLICATION_LABEL_PREFIX + "%intra_app%";
/** /**
* Creates a constraint that requires allocations to be placed on nodes that * Creates a constraint that requires allocations to be placed on nodes that
* satisfy all target expressions within the given scope (e.g., node or rack). * satisfy all target expressions within the given scope (e.g., node or rack).
@ -223,6 +216,20 @@ public static TargetExpression allocationTag(String... allocationTags) {
allocationTags); allocationTags);
} }
/**
* Constructs a target expression on a set of allocation tags under
* a certain namespace.
*
* @param namespace namespace of the allocation tags
* @param allocationTags allocation tags
* @return a target expression
*/
public static TargetExpression allocationTagWithNamespace(String namespace,
String... allocationTags) {
return new TargetExpression(TargetType.ALLOCATION_TAG,
namespace, allocationTags);
}
/** /**
* Constructs a target expression on an allocation tag. It is satisfied if * Constructs a target expression on an allocation tag. It is satisfied if
* there are allocations with one of the given tags. Comparing to * there are allocations with one of the given tags. Comparing to
@ -235,8 +242,9 @@ public static TargetExpression allocationTag(String... allocationTags) {
*/ */
public static TargetExpression allocationTagToIntraApp( public static TargetExpression allocationTagToIntraApp(
String... allocationTags) { String... allocationTags) {
AllocationTagNamespace selfNs = new AllocationTagNamespace.Self();
return new TargetExpression(TargetType.ALLOCATION_TAG, return new TargetExpression(TargetType.ALLOCATION_TAG,
APPLICATION_LABEL_INTRA_APPLICATION, allocationTags); selfNs.toString(), allocationTags);
} }
} }

View File

@ -0,0 +1,34 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.exceptions;
/**
* This exception is thrown by
* {@link
* org.apache.hadoop.yarn.api.records.AllocationTagNamespace#parse(String)}
* when it fails to parse a namespace.
*/
public class InvalidAllocationTagException extends YarnException {
private static final long serialVersionUID = 1L;
public InvalidAllocationTagException(String message) {
super(message);
}
}

View File

@ -21,6 +21,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
import com.google.common.annotations.VisibleForTesting; import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
@ -561,4 +562,12 @@ public long getRackCardinalityByOp(String rack, ApplicationId applicationId,
public Map<String, Long> getAllocationTagsWithCount(NodeId nodeId) { public Map<String, Long> getAllocationTagsWithCount(NodeId nodeId) {
return globalNodeMapping.getTypeToTagsWithCount().get(nodeId); return globalNodeMapping.getTypeToTagsWithCount().get(nodeId);
} }
/**
* @return all application IDs in a set that currently visible by
* the allocation tags manager.
*/
public Set<ApplicationId> getAllApplicationIds() {
return ImmutableSet.copyOf(perAppNodeMappings.keySet());
}
} }

View File

@ -32,4 +32,8 @@ public class InvalidAllocationTagsQueryException extends YarnException {
public InvalidAllocationTagsQueryException(String msg) { public InvalidAllocationTagsQueryException(String msg) {
super(msg); super(msg);
} }
public InvalidAllocationTagsQueryException(YarnException e) {
super(e);
}
} }

View File

@ -24,8 +24,11 @@
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public; import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable; import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.AllocationTagNamespace;
import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.records.TargetApplications;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And;
@ -35,6 +38,7 @@
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression.TargetType; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression.TargetType;
import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations.SingleConstraintTransformer; import org.apache.hadoop.yarn.api.resource.PlacementConstraintTransformations.SingleConstraintTransformer;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints; import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm;
@ -56,6 +60,53 @@ public final class PlacementConstraintsUtil {
private PlacementConstraintsUtil() { private PlacementConstraintsUtil() {
} }
/**
* Try to the namespace of the allocation tags from the given target key.
*
* @param targetKey
* @return allocation tag namespace.
* @throws InvalidAllocationTagsQueryException
* if fail to parse the target key to a valid namespace.
*/
private static AllocationTagNamespace getAllocationTagNamespace(
ApplicationId currentAppId, String targetKey, AllocationTagsManager atm)
throws InvalidAllocationTagException{
// Parse to a valid namespace.
AllocationTagNamespace namespace = AllocationTagNamespace.parse(targetKey);
// TODO remove such check once we support all forms of namespaces
if (!namespace.isIntraApp() && !namespace.isSingleInterApp()) {
throw new InvalidAllocationTagException(
"Only support " + AllocationTagNamespaceType.SELF.toString()
+ " and "+ AllocationTagNamespaceType.APP_ID + " now,"
+ namespace.toString() + " is not supported yet!");
}
// Evaluate the namespace according to the given target
// before it can be consumed.
TargetApplications ta = new TargetApplications(currentAppId,
atm.getAllApplicationIds());
namespace.evaluate(ta);
return namespace;
}
// We return a single app Id now, because at present,
// only self and app-id namespace is supported. But moving on,
// this will return a set of application IDs.
// TODO support other forms of namespaces
private static ApplicationId getNamespaceScope(
AllocationTagNamespace namespace)
throws InvalidAllocationTagException {
if (namespace.getNamespaceScope() == null
|| namespace.getNamespaceScope().size() != 1) {
throw new InvalidAllocationTagException(
"Invalid allocation tag namespace " + namespace.toString()
+ ", expecting it is not null and only 1 application"
+ " ID in the scope.");
}
return namespace.getNamespaceScope().iterator().next();
}
/** /**
* Returns true if <b>single</b> placement constraint with associated * Returns true if <b>single</b> placement constraint with associated
* allocationTags and scope is satisfied by a specific scheduler Node. * allocationTags and scope is satisfied by a specific scheduler Node.
@ -74,6 +125,18 @@ private static boolean canSatisfySingleConstraintExpression(
ApplicationId targetApplicationId, SingleConstraint sc, ApplicationId targetApplicationId, SingleConstraint sc,
TargetExpression te, SchedulerNode node, AllocationTagsManager tm) TargetExpression te, SchedulerNode node, AllocationTagsManager tm)
throws InvalidAllocationTagsQueryException { throws InvalidAllocationTagsQueryException {
// Parse the allocation tag's namespace from the given target key,
// then evaluate the namespace and get its scope,
// which is represented by one or more application IDs.
ApplicationId effectiveAppID;
try {
AllocationTagNamespace namespace = getAllocationTagNamespace(
targetApplicationId, te.getTargetKey(), tm);
effectiveAppID = getNamespaceScope(namespace);
} catch (InvalidAllocationTagException e) {
throw new InvalidAllocationTagsQueryException(e);
}
long minScopeCardinality = 0; long minScopeCardinality = 0;
long maxScopeCardinality = 0; long maxScopeCardinality = 0;
@ -86,20 +149,20 @@ private static boolean canSatisfySingleConstraintExpression(
if (sc.getScope().equals(PlacementConstraints.NODE)) { if (sc.getScope().equals(PlacementConstraints.NODE)) {
if (checkMinCardinality) { if (checkMinCardinality) {
minScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), minScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(),
targetApplicationId, te.getTargetValues(), Long::max); effectiveAppID, te.getTargetValues(), Long::max);
} }
if (checkMaxCardinality) { if (checkMaxCardinality) {
maxScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(), maxScopeCardinality = tm.getNodeCardinalityByOp(node.getNodeID(),
targetApplicationId, te.getTargetValues(), Long::min); effectiveAppID, te.getTargetValues(), Long::min);
} }
} else if (sc.getScope().equals(PlacementConstraints.RACK)) { } else if (sc.getScope().equals(PlacementConstraints.RACK)) {
if (checkMinCardinality) { if (checkMinCardinality) {
minScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), minScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(),
targetApplicationId, te.getTargetValues(), Long::max); effectiveAppID, te.getTargetValues(), Long::max);
} }
if (checkMaxCardinality) { if (checkMaxCardinality) {
maxScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(), maxScopeCardinality = tm.getRackCardinalityByOp(node.getRackName(),
targetApplicationId, te.getTargetValues(), Long::min); effectiveAppID, te.getTargetValues(), Long::min);
} }
} }

View File

@ -23,6 +23,8 @@
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.AllocationTagNamespace;
import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.ResourceSizing; import org.apache.hadoop.yarn.api.records.ResourceSizing;
@ -30,6 +32,7 @@
import org.apache.hadoop.yarn.api.records.impl.pb.SchedulingRequestPBImpl; import org.apache.hadoop.yarn.api.records.impl.pb.SchedulingRequestPBImpl;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints; import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException;
import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException; import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
@ -53,7 +56,6 @@
import java.util.Set; import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.APPLICATION_LABEL_INTRA_APPLICATION;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE_PARTITION; import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE_PARTITION;
/** /**
@ -220,7 +222,8 @@ private String throwExceptionWithMetaInfo(String message) {
throw new SchedulerInvalidResoureRequestException(sb.toString()); throw new SchedulerInvalidResoureRequestException(sb.toString());
} }
private void validateAndSetSchedulingRequest(SchedulingRequest newSchedulingRequest) private void validateAndSetSchedulingRequest(SchedulingRequest
newSchedulingRequest)
throws SchedulerInvalidResoureRequestException { throws SchedulerInvalidResoureRequestException {
// Check sizing exists // Check sizing exists
if (newSchedulingRequest.getResourceSizing() == null if (newSchedulingRequest.getResourceSizing() == null
@ -333,15 +336,23 @@ private void validateAndSetSchedulingRequest(SchedulingRequest newSchedulingRequ
targetAllocationTags = new HashSet<>( targetAllocationTags = new HashSet<>(
targetExpression.getTargetValues()); targetExpression.getTargetValues());
if (targetExpression.getTargetKey() != null && !targetExpression try {
.getTargetKey().equals(APPLICATION_LABEL_INTRA_APPLICATION)) { AllocationTagNamespace tagNS =
AllocationTagNamespace.parse(targetExpression.getTargetKey());
if (!AllocationTagNamespaceType.SELF
.equals(tagNS.getNamespaceType())) {
throwExceptionWithMetaInfo(
"As of now, the only accepted target key for targetKey of "
+ "allocation_tag target expression is: ["
+ AllocationTagNamespaceType.SELF.toString()
+ "]. Please make changes to placement constraints "
+ "accordingly. If this is null, it will be set to "
+ AllocationTagNamespaceType.SELF.toString()
+ " by default.");
}
} catch (InvalidAllocationTagException e) {
throwExceptionWithMetaInfo( throwExceptionWithMetaInfo(
"As of now, the only accepted target key for targetKey of " "Invalid allocation tag namespace, message: " + e.getMessage());
+ "allocation_tag target expression is: ["
+ APPLICATION_LABEL_INTRA_APPLICATION
+ "]. Please make changes to placement constraints "
+ "accordingly. If this is null, it will be set to "
+ APPLICATION_LABEL_INTRA_APPLICATION + " by default.");
} }
} }
} }

View File

@ -0,0 +1,147 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; /**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.yarn.api.records.AllocationTagNamespace;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.TargetApplications;
import org.apache.hadoop.yarn.exceptions.InvalidAllocationTagException;
import org.junit.Assert;
import org.junit.Test;
/**
* Test class for {@link AllocationTagNamespace}.
*/
public class TestAllocationTagsNamespace {
@Test
public void testNamespaceParse() throws InvalidAllocationTagException {
AllocationTagNamespace namespace;
String namespaceStr = "self";
namespace = AllocationTagNamespace.parse(namespaceStr);
Assert.assertTrue(namespace.isIntraApp());
namespaceStr = "not-self";
namespace = AllocationTagNamespace.parse(namespaceStr);
Assert.assertTrue(namespace.isNotSelf());
namespaceStr = "all";
namespace = AllocationTagNamespace.parse(namespaceStr);
Assert.assertTrue(namespace.isGlobal());
namespaceStr = "app-label";
namespace = AllocationTagNamespace.parse(namespaceStr);
Assert.assertTrue(namespace.isAppLabel());
ApplicationId applicationId = ApplicationId.newInstance(12345, 1);
namespaceStr = "app-id/" + applicationId.toString();
namespace = AllocationTagNamespace.parse(namespaceStr);
Assert.assertTrue(namespace.isSingleInterApp());
// Invalid app-id namespace syntax, invalid app ID.
try {
namespaceStr = "app-id/apppppp_12345_99999";
AllocationTagNamespace.parse(namespaceStr);
Assert.fail("Parsing should fail as the given app ID is invalid");
} catch (Exception e) {
Assert.assertTrue(e instanceof InvalidAllocationTagException);
Assert.assertTrue(e.getMessage().startsWith(
"Invalid application ID for app-id"));
}
// Invalid app-id namespace syntax, missing app ID.
try {
namespaceStr = "app-id";
AllocationTagNamespace.parse(namespaceStr);
Assert.fail("Parsing should fail as the given namespace"
+ " is missing application ID");
} catch (Exception e) {
Assert.assertTrue(e instanceof InvalidAllocationTagException);
Assert.assertTrue(e.getMessage().startsWith(
"Missing the application ID in the namespace string"));
}
// Invalid namespace type.
try {
namespaceStr = "non_exist_ns";
AllocationTagNamespace.parse(namespaceStr);
Assert.fail("Parsing should fail as the giving type is not supported.");
} catch (Exception e) {
Assert.assertTrue(e instanceof InvalidAllocationTagException);
Assert.assertTrue(e.getMessage().startsWith(
"Invalid namespace prefix"));
}
}
@Test
public void testNamespaceEvaluation() throws InvalidAllocationTagException {
AllocationTagNamespace namespace;
TargetApplications targetApplications;
ApplicationId app1 = ApplicationId.newInstance(10000, 1);
ApplicationId app2 = ApplicationId.newInstance(10000, 2);
ApplicationId app3 = ApplicationId.newInstance(10000, 3);
ApplicationId app4 = ApplicationId.newInstance(10000, 4);
ApplicationId app5 = ApplicationId.newInstance(10000, 5);
// Ensure eval is called before using the scope.
String namespaceStr = "self";
namespace = AllocationTagNamespace.parse(namespaceStr);
try {
namespace.getNamespaceScope();
Assert.fail("Call getNamespaceScope before evaluate is not allowed.");
} catch (Exception e) {
Assert.assertTrue(e instanceof IllegalStateException);
Assert.assertTrue(e.getMessage().contains(
"Evaluate must be called before a namespace can be consumed."));
}
namespaceStr = "self";
namespace = AllocationTagNamespace.parse(namespaceStr);
targetApplications = new TargetApplications(app1, ImmutableSet.of(app1));
namespace.evaluate(targetApplications);
Assert.assertEquals(1, namespace.getNamespaceScope().size());
Assert.assertEquals(app1, namespace.getNamespaceScope().iterator().next());
namespaceStr = "not-self";
namespace = AllocationTagNamespace.parse(namespaceStr);
targetApplications = new TargetApplications(app1, ImmutableSet.of(app1));
namespace.evaluate(targetApplications);
Assert.assertEquals(0, namespace.getNamespaceScope().size());
targetApplications = new TargetApplications(app1,
ImmutableSet.of(app1, app2, app3));
namespace.evaluate(targetApplications);
Assert.assertEquals(2, namespace.getNamespaceScope().size());
Assert.assertFalse(namespace.getNamespaceScope().contains(app1));
namespaceStr = "all";
namespace = AllocationTagNamespace.parse(namespaceStr);
targetApplications = new TargetApplications(null,
ImmutableSet.of(app1, app2));
namespace.evaluate(targetApplications);
Assert.assertEquals(2, namespace.getNamespaceScope().size());
namespaceStr = "app-id/" + app2.toString();
namespace = AllocationTagNamespace.parse(namespaceStr);
targetApplications = new TargetApplications(app1,
ImmutableSet.of(app1, app2, app3, app4, app5));
namespace.evaluate(targetApplications);
Assert.assertEquals(1, namespace.getNamespaceScope().size());
Assert.assertEquals(app2, namespace.getNamespaceScope().iterator().next());
}
}

View File

@ -18,6 +18,7 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint; package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE; import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTagWithNamespace;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.RACK; import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.RACK;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn; import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn; import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn;
@ -30,6 +31,7 @@
import java.util.AbstractMap; import java.util.AbstractMap;
import java.util.Arrays; import java.util.Arrays;
import java.util.HashMap;
import java.util.HashSet; import java.util.HashSet;
import java.util.Iterator; import java.util.Iterator;
import java.util.List; import java.util.List;
@ -39,6 +41,7 @@
import java.util.stream.Stream; import java.util.stream.Stream;
import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicLong;
import org.apache.hadoop.yarn.api.records.AllocationTagNamespace;
import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ApplicationId;
@ -508,4 +511,152 @@ public void testANDConstraintAssignment()
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1,
createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm)); createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm));
} }
@Test
public void testInterAppConstraintsByAppID()
throws InvalidAllocationTagsQueryException {
AllocationTagsManager tm = new AllocationTagsManager(rmContext);
PlacementConstraintManagerService pcm =
new MemoryPlacementConstraintManager();
rmContext.setAllocationTagsManager(tm);
rmContext.setPlacementConstraintManager(pcm);
long ts = System.currentTimeMillis();
ApplicationId application1 = BuilderUtils.newApplicationId(ts, 123);
// Register App1 with anti-affinity constraint map.
RMNode n0r1 = rmNodes.get(0);
RMNode n1r1 = rmNodes.get(1);
RMNode n2r2 = rmNodes.get(2);
RMNode n3r2 = rmNodes.get(3);
/**
* Place container:
* n0: app1/hbase-m(1)
* n1: ""
* n2: app1/hbase-m(1)
* n3: ""
*/
tm.addContainer(n0r1.getNodeID(),
newContainerId(application1), ImmutableSet.of("hbase-m"));
tm.addContainer(n2r2.getNodeID(),
newContainerId(application1), ImmutableSet.of("hbase-m"));
Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n0r1.getNodeID())
.get("hbase-m").longValue());
Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n2r2.getNodeID())
.get("hbase-m").longValue());
SchedulerNode schedulerNode0 =newSchedulerNode(n0r1.getHostName(),
n0r1.getRackName(), n0r1.getNodeID());
SchedulerNode schedulerNode1 =newSchedulerNode(n1r1.getHostName(),
n1r1.getRackName(), n1r1.getNodeID());
SchedulerNode schedulerNode2 =newSchedulerNode(n2r2.getHostName(),
n2r2.getRackName(), n2r2.getNodeID());
SchedulerNode schedulerNode3 =newSchedulerNode(n3r2.getHostName(),
n3r2.getRackName(), n3r2.getNodeID());
AllocationTagNamespace namespace =
new AllocationTagNamespace.AppID(application1);
Map<Set<String>, PlacementConstraint> constraintMap = new HashMap<>();
PlacementConstraint constraint2 = PlacementConstraints
.targetNotIn(NODE, allocationTagWithNamespace(namespace.toString(),
"hbase-m"))
.build();
Set<String> srcTags2 = new HashSet<>();
srcTags2.add("app2");
constraintMap.put(srcTags2, constraint2);
ts = System.currentTimeMillis();
ApplicationId application2 = BuilderUtils.newApplicationId(ts, 124);
pcm.registerApplication(application2, constraintMap);
// Anti-affinity with app1/hbase-m so it should not be able to be placed
// onto n0 and n2 as they already have hbase-m allocated.
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
application2, createSchedulingRequest(srcTags2),
schedulerNode0, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
application2, createSchedulingRequest(srcTags2),
schedulerNode1, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
application2, createSchedulingRequest(srcTags2),
schedulerNode2, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
application2, createSchedulingRequest(srcTags2),
schedulerNode3, pcm, tm));
// Intra-app constraint
// Test with default and empty namespace
AllocationTagNamespace self = new AllocationTagNamespace.Self();
PlacementConstraint constraint3 = PlacementConstraints
.targetNotIn(NODE, allocationTagWithNamespace(self.toString(),
"hbase-m"))
.build();
Set<String> srcTags3 = new HashSet<>();
srcTags3.add("app3");
constraintMap.put(srcTags3, constraint3);
ts = System.currentTimeMillis();
ApplicationId application3 = BuilderUtils.newApplicationId(ts, 124);
pcm.registerApplication(application3, constraintMap);
/**
* Place container:
* n0: app1/hbase-m(1), app3/hbase-m
* n1: ""
* n2: app1/hbase-m(1)
* n3: ""
*/
tm.addContainer(n0r1.getNodeID(),
newContainerId(application3), ImmutableSet.of("hbase-m"));
// Anti-affinity to self/hbase-m
Assert.assertFalse(PlacementConstraintsUtil
.canSatisfyConstraints(application3, createSchedulingRequest(srcTags3),
schedulerNode0, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil
.canSatisfyConstraints(application3, createSchedulingRequest(srcTags3),
schedulerNode1, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil
.canSatisfyConstraints(application3, createSchedulingRequest(srcTags3),
schedulerNode2, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil
.canSatisfyConstraints(application3, createSchedulingRequest(srcTags3),
schedulerNode3, pcm, tm));
pcm.unregisterApplication(application3);
}
@Test
public void testInvalidAllocationTagNamespace() {
AllocationTagsManager tm = new AllocationTagsManager(rmContext);
PlacementConstraintManagerService pcm =
new MemoryPlacementConstraintManager();
rmContext.setAllocationTagsManager(tm);
rmContext.setPlacementConstraintManager(pcm);
long ts = System.currentTimeMillis();
ApplicationId application1 = BuilderUtils.newApplicationId(ts, 123);
RMNode n0r1 = rmNodes.get(0);
SchedulerNode schedulerNode0 = newSchedulerNode(n0r1.getHostName(),
n0r1.getRackName(), n0r1.getNodeID());
PlacementConstraint constraint1 = PlacementConstraints
.targetNotIn(NODE, allocationTagWithNamespace("unknown_namespace",
"hbase-m"))
.build();
Set<String> srcTags1 = new HashSet<>();
srcTags1.add("app1");
try {
PlacementConstraintsUtil.canSatisfyConstraints(application1,
createSchedulingRequest(srcTags1, constraint1), schedulerNode0,
pcm, tm);
Assert.fail("This should fail because we gave an invalid namespace");
} catch (Exception e) {
Assert.assertTrue(e instanceof InvalidAllocationTagsQueryException);
Assert.assertTrue(e.getMessage()
.contains("Invalid namespace prefix: unknown_namespace"));
}
}
} }