YARN-8013. Support application tags when defining application namespaces for placement constraints. Contributed by Weiwei Yang.

This commit is contained in:
Konstantinos Karanasos 2018-04-04 10:51:58 -07:00
parent 42cd367c93
commit 7853ec8d2f
12 changed files with 363 additions and 151 deletions

View File

@ -26,7 +26,7 @@ public enum AllocationTagNamespaceType {
SELF("self"),
NOT_SELF("not-self"),
APP_ID("app-id"),
APP_LABEL("app-label"),
APP_TAG("app-tag"),
ALL("all");
private String typeKeyword;

View File

@ -19,7 +19,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import java.util.Set;
@ -29,22 +28,34 @@
*/
public final class AllocationTags {
private AllocationTagNamespace ns;
private TargetApplicationsNamespace ns;
private Set<String> tags;
private ApplicationId applicationId;
private AllocationTags(AllocationTagNamespace namespace,
private AllocationTags(TargetApplicationsNamespace namespace,
Set<String> allocationTags) {
this.ns = namespace;
this.tags = allocationTags;
}
private AllocationTags(TargetApplicationsNamespace namespace,
Set<String> allocationTags, ApplicationId currentAppId) {
this.ns = namespace;
this.tags = allocationTags;
this.applicationId = currentAppId;
}
/**
* @return the namespace of these tags.
*/
public AllocationTagNamespace getNamespace() {
public TargetApplicationsNamespace getNamespace() {
return this.ns;
}
public ApplicationId getCurrentApplicationId() {
return this.applicationId;
}
/**
* @return the allocation tags.
*/
@ -55,28 +66,31 @@ public Set<String> getTags() {
@VisibleForTesting
public static AllocationTags createSingleAppAllocationTags(
ApplicationId appId, Set<String> tags) {
AllocationTagNamespace namespace = new AllocationTagNamespace.AppID(appId);
TargetApplicationsNamespace namespace =
new TargetApplicationsNamespace.AppID(appId);
return new AllocationTags(namespace, tags);
}
@VisibleForTesting
public static AllocationTags createGlobalAllocationTags(Set<String> tags) {
AllocationTagNamespace namespace = new AllocationTagNamespace.All();
TargetApplicationsNamespace namespace =
new TargetApplicationsNamespace.All();
return new AllocationTags(namespace, tags);
}
@VisibleForTesting
public static AllocationTags createOtherAppAllocationTags(
ApplicationId currentApp, Set<ApplicationId> allIds, Set<String> tags)
throws InvalidAllocationTagsQueryException {
AllocationTagNamespace namespace = new AllocationTagNamespace.NotSelf();
TargetApplications ta = new TargetApplications(currentApp, allIds);
namespace.evaluate(ta);
return new AllocationTags(namespace, tags);
ApplicationId currentApp, Set<String> tags) {
TargetApplicationsNamespace namespace =
new TargetApplicationsNamespace.NotSelf();
return new AllocationTags(namespace, tags, currentApp);
}
public static AllocationTags newAllocationTags(
AllocationTagNamespace namespace, Set<String> tags) {
return new AllocationTags(namespace, tags);
public static AllocationTags createAllocationTags(
ApplicationId currentApplicationId, String namespaceString,
Set<String> tags) throws InvalidAllocationTagsQueryException {
TargetApplicationsNamespace namespace = TargetApplicationsNamespace
.parse(namespaceString);
return new AllocationTags(namespace, tags, currentApplicationId);
}
}

View File

@ -21,7 +21,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Maps;
import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.classification.InterfaceAudience;
@ -32,12 +31,14 @@
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.log4j.Logger;
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.LongBinaryOperator;
@ -292,13 +293,21 @@ public AllocationTagsManager(RMContext context) {
/**
* Aggregates multiple {@link TypeToCountedTags} to a single one based on
* a given set of application IDs, the values are properly merged.
* the scope defined in the allocation tags, the values are properly merged.
*
* @param appIds a set of application IDs.
* @param allocationTags {@link AllocationTags}.
* @return an aggregated {@link TypeToCountedTags}.
*/
private TypeToCountedTags aggregateAllocationTags(Set<ApplicationId> appIds,
Map<ApplicationId, TypeToCountedTags> mapping) {
private TypeToCountedTags aggregateAllocationTags(
AllocationTags allocationTags,
Map<ApplicationId, TypeToCountedTags> mapping)
throws InvalidAllocationTagsQueryException {
// Based on the namespace type of the given allocation tags
TargetApplicationsNamespace namespace = allocationTags.getNamespace();
TargetApplications ta = new TargetApplications(
allocationTags.getCurrentApplicationId(), getApplicationIdToTags());
namespace.evaluate(ta);
Set<ApplicationId> appIds = namespace.getNamespaceScope();
TypeToCountedTags result = new TypeToCountedTags();
if (appIds != null) {
if (appIds.size() == 1) {
@ -571,9 +580,7 @@ public long getNodeCardinalityByOp(NodeId nodeId, AllocationTags tags,
mapping = globalNodeMapping;
} else {
// Aggregate app tags cardinality by applications.
mapping = aggregateAllocationTags(
tags.getNamespace().getNamespaceScope(),
perAppNodeMappings);
mapping = aggregateAllocationTags(tags, perAppNodeMappings);
}
return mapping == null ? 0 :
@ -618,9 +625,7 @@ public long getRackCardinalityByOp(String rack, AllocationTags tags,
mapping = globalRackMapping;
} else {
// Aggregates cardinality by rack.
mapping = aggregateAllocationTags(
tags.getNamespace().getNamespaceScope(),
perAppRackMappings);
mapping = aggregateAllocationTags(tags, perAppRackMappings);
}
return mapping == null ? 0 :
@ -642,10 +647,22 @@ public Map<String, Long> getAllocationTagsWithCount(NodeId nodeId) {
}
/**
* @return all application IDs in a set that currently visible by
* the allocation tags manager.
* @return all applications that is known to the
* {@link AllocationTagsManager}, along with their application tags.
* The result is a map, where key is an application ID, and value is the
* application-tags attached to this application. If there is no
* application-tag exists for the application, the value is an empty set.
*/
public Set<ApplicationId> getAllApplicationIds() {
return ImmutableSet.copyOf(perAppNodeMappings.keySet());
private Map<ApplicationId, Set<String>> getApplicationIdToTags() {
Map<ApplicationId, Set<String>> result = new HashMap<>();
ConcurrentMap<ApplicationId, RMApp> allApps = rmContext.getRMApps();
if (allApps != null) {
for (Map.Entry<ApplicationId, RMApp> app : allApps.entrySet()) {
if (perAppNodeMappings.containsKey(app.getKey())) {
result.put(app.getKey(), app.getValue().getApplicationTags());
}
}
}
return result;
}
}

View File

@ -24,7 +24,6 @@
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.classification.InterfaceAudience.Public;
import org.apache.hadoop.classification.InterfaceStability.Unstable;
import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
@ -57,35 +56,6 @@ public final class PlacementConstraintsUtil {
private PlacementConstraintsUtil() {
}
/**
* Try to the namespace of the allocation tags from the given target key.
*
* @param targetKey
* @return allocation tag namespace.
* @throws InvalidAllocationTagsQueryException
* if fail to parse the target key to a valid namespace.
*/
private static AllocationTagNamespace getAllocationTagNamespace(
ApplicationId currentAppId, String targetKey, AllocationTagsManager atm)
throws InvalidAllocationTagsQueryException {
// Parse to a valid namespace.
AllocationTagNamespace namespace = AllocationTagNamespace.parse(targetKey);
// TODO Complete remove this check once we support app-label.
if (AllocationTagNamespaceType.APP_LABEL
.equals(namespace.getNamespaceType())) {
throw new InvalidAllocationTagsQueryException(
namespace.toString() + " is not supported yet!");
}
// Evaluate the namespace according to the given target
// before it can be consumed.
TargetApplications ta =
new TargetApplications(currentAppId, atm.getAllApplicationIds());
namespace.evaluate(ta);
return namespace;
}
/**
* Returns true if <b>single</b> placement constraint with associated
* allocationTags and scope is satisfied by a specific scheduler Node.
@ -104,13 +74,10 @@ private static boolean canSatisfySingleConstraintExpression(
ApplicationId targetApplicationId, SingleConstraint sc,
TargetExpression te, SchedulerNode node, AllocationTagsManager tm)
throws InvalidAllocationTagsQueryException {
// Parse the allocation tag's namespace from the given target key,
// then evaluate the namespace and get its scope,
// which is represented by one or more application IDs.
AllocationTagNamespace namespace = getAllocationTagNamespace(
targetApplicationId, te.getTargetKey(), tm);
AllocationTags allocationTags = AllocationTags
.newAllocationTags(namespace, te.getTargetValues());
// Creates AllocationTags that will be further consumed by allocation
// tags manager for cardinality check.
AllocationTags allocationTags = AllocationTags.createAllocationTags(
targetApplicationId, te.getTargetKey(), te.getTargetValues());
long minScopeCardinality = 0;
long maxScopeCardinality = 0;

View File

@ -18,34 +18,77 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint;
import com.google.common.base.Strings;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
/**
* This class is used by
* {@link AllocationTagNamespace#evaluate(TargetApplications)} to evaluate
* {@link TargetApplicationsNamespace#evaluate(TargetApplications)} to evaluate
* a namespace.
*/
public class TargetApplications {
private ApplicationId currentAppId;
private Set<ApplicationId> allAppIds;
private Map<ApplicationId, Set<String>> allApps;
public TargetApplications(ApplicationId currentApplicationId,
Set<ApplicationId> allApplicationIds) {
this.currentAppId = currentApplicationId;
this.allAppIds = allApplicationIds;
allApps = new HashMap<>();
if (allApplicationIds != null) {
allApplicationIds.forEach(appId ->
allApps.put(appId, ImmutableSet.of()));
}
}
public TargetApplications(ApplicationId currentApplicationId,
Map<ApplicationId, Set<String>> allApplicationIds) {
this.currentAppId = currentApplicationId;
this.allApps = allApplicationIds;
}
public ApplicationId getCurrentApplicationId() {
return this.currentAppId;
}
public Set<ApplicationId> getAllApplicationIds() {
return this.allApps == null ?
ImmutableSet.of() : allApps.keySet();
}
public Set<ApplicationId> getOtherApplicationIds() {
return allAppIds == null ? null : allAppIds.stream().filter(appId ->
!appId.equals(getCurrentApplicationId()))
if (getAllApplicationIds() == null
|| getAllApplicationIds().isEmpty()) {
return ImmutableSet.of();
}
return getAllApplicationIds()
.stream()
.filter(appId -> !appId.equals(getCurrentApplicationId()))
.collect(Collectors.toSet());
}
public Set<ApplicationId> getApplicationIdsByTag(String applicationTag) {
Set<ApplicationId> result = new HashSet<>();
if (Strings.isNullOrEmpty(applicationTag)
|| this.allApps == null) {
return result;
}
for (Map.Entry<ApplicationId, Set<String>> app
: this.allApps.entrySet()) {
if (app.getValue() != null
&& app.getValue().contains(applicationTag)) {
result.add(app.getKey());
}
}
return result;
}
}

View File

@ -31,17 +31,18 @@
import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.SELF;
import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.NOT_SELF;
import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_LABEL;
import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_TAG;
import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.APP_ID;
import static org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType.ALL;
/**
* Class to describe the namespace of an allocation tag.
* Each namespace can be evaluated against a set of applications.
* After evaluation, the namespace should have an implicit set of
* applications which defines its scope.
* Class to describe the namespace of allocation tags, used by
* {@link AllocationTags}. Each namespace can be evaluated against
* a target set applications, represented by {@link TargetApplications}.
* After evaluation, the namespace is interpreted to be a set of
* applications based on the namespace type.
*/
public abstract class AllocationTagNamespace implements
public abstract class TargetApplicationsNamespace implements
Evaluable<TargetApplications> {
public final static String NAMESPACE_DELIMITER = "/";
@ -50,7 +51,7 @@ public abstract class AllocationTagNamespace implements
// Namespace scope value will be delay binding by eval method.
private Set<ApplicationId> nsScope;
public AllocationTagNamespace(AllocationTagNamespaceType
public TargetApplicationsNamespace(AllocationTagNamespaceType
allocationTagNamespaceType) {
this.nsType = allocationTagNamespaceType;
}
@ -107,7 +108,7 @@ public String toString() {
/**
* Namespace within application itself.
*/
public static class Self extends AllocationTagNamespace {
public static class Self extends TargetApplicationsNamespace {
public Self() {
super(SELF);
@ -128,7 +129,7 @@ public void evaluate(TargetApplications target)
/**
* Namespace to all applications except itself.
*/
public static class NotSelf extends AllocationTagNamespace {
public static class NotSelf extends TargetApplicationsNamespace {
private ApplicationId applicationId;
@ -160,7 +161,7 @@ public void evaluate(TargetApplications target) {
/**
* Namespace to all applications in the cluster.
*/
public static class All extends AllocationTagNamespace {
public static class All extends TargetApplicationsNamespace {
public All() {
super(ALL);
@ -168,24 +169,32 @@ public All() {
}
/**
* Namespace to all applications in the cluster.
* Namespace to applications that attached with a certain application tag.
*/
public static class AppLabel extends AllocationTagNamespace {
public static class AppTag extends TargetApplicationsNamespace {
public AppLabel() {
super(APP_LABEL);
private String applicationTag;
public AppTag(String appTag) {
super(APP_TAG);
this.applicationTag = appTag;
}
@Override
public void evaluate(TargetApplications target) {
// TODO Implement app-label namespace evaluation
setScopeIfNotNull(target.getApplicationIdsByTag(applicationTag));
}
@Override
public String toString() {
return APP_TAG.toString() + NAMESPACE_DELIMITER + this.applicationTag;
}
}
/**
* Namespace defined by a certain application ID.
*/
public static class AppID extends AllocationTagNamespace {
public static class AppID extends TargetApplicationsNamespace {
private ApplicationId targetAppId;
// app-id namespace requires an extra value of an application id.
@ -206,11 +215,11 @@ public String toString() {
* defined by each {@link AllocationTagNamespaceType}.
*
* @param namespaceStr namespace string.
* @return an instance of {@link AllocationTagNamespace}.
* @return an instance of {@link TargetApplicationsNamespace}.
* @throws InvalidAllocationTagsQueryException
* if given string is not in valid format
*/
public static AllocationTagNamespace parse(String namespaceStr)
public static TargetApplicationsNamespace parse(String namespaceStr)
throws InvalidAllocationTagsQueryException {
// Return the default namespace if no valid string is given.
if (Strings.isNullOrEmpty(namespaceStr)) {
@ -238,8 +247,13 @@ public static AllocationTagNamespace parse(String namespaceStr)
}
String appIDStr = nsValues.get(1);
return parseAppID(appIDStr);
case APP_LABEL:
return new AppLabel();
case APP_TAG:
if (nsValues.size() != 2) {
throw new InvalidAllocationTagsQueryException(
"Missing the application tag in the namespace string: "
+ namespaceStr);
}
return new AppTag(nsValues.get(1));
default:
throw new InvalidAllocationTagsQueryException(
"Invalid namespace string " + namespaceStr);
@ -263,7 +277,7 @@ private static AllocationTagNamespaceType fromString(String prefix) throws
+ ", valid values are: " + String.join(",", values));
}
private static AllocationTagNamespace parseAppID(String appIDStr)
private static TargetApplicationsNamespace parseAppID(String appIDStr)
throws InvalidAllocationTagsQueryException {
try {
ApplicationId applicationId = ApplicationId.fromString(appIDStr);

View File

@ -23,8 +23,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagNamespace;
import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
@ -334,25 +332,6 @@ private void validateAndSetSchedulingRequest(SchedulingRequest
targetAllocationTags = new HashSet<>(
targetExpression.getTargetValues());
try {
AllocationTagNamespace tagNS =
AllocationTagNamespace.parse(targetExpression.getTargetKey());
if (AllocationTagNamespaceType.APP_LABEL
.equals(tagNS.getNamespaceType())) {
throwExceptionWithMetaInfo(
"As of now, allocation tag namespace ["
+ AllocationTagNamespaceType.APP_LABEL.toString()
+ "] is not supported. Please make changes to placement "
+ "constraints accordingly. If this is null, it will be "
+ "set to "
+ AllocationTagNamespaceType.SELF.toString()
+ " by default.");
}
} catch (InvalidAllocationTagsQueryException e) {
throwExceptionWithMetaInfo(
"Invalid allocation tag namespace, message: " + e.getMessage());
}
}
}

View File

@ -68,6 +68,7 @@ public class MockRMApp implements RMApp {
RMAppAttempt attempt;
int maxAppAttempts = 1;
List<ResourceRequest> amReqs;
private Set<String> applicationTags = null;
public MockRMApp(int newid, long time, RMAppState newState) {
finish = time;
@ -82,6 +83,12 @@ public MockRMApp(int newid, long time, RMAppState newState, String userName) {
user = userName;
}
public MockRMApp(int newid, long time, RMAppState newState,
String userName, Set<String> appTags) {
this(newid, time, newState, userName);
this.applicationTags = appTags;
}
public MockRMApp(int newid, long time, RMAppState newState, String userName, String diag) {
this(newid, time, newState, userName);
this.diagnostics = new StringBuilder(diag);
@ -248,7 +255,7 @@ public String getApplicationType() {
@Override
public Set<String> getApplicationTags() {
return null;
return this.applicationTags;
}
@Override

View File

@ -20,7 +20,7 @@
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagNamespace;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.TargetApplicationsNamespace;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
@ -294,7 +294,8 @@ public RMNodeLabelsManager createNodeLabelManager() {
// App2 asks for 3 containers that anti-affinity with any mapper,
// since 3 out of 4 nodes already have mapper containers, all 3
// containers will be allocated on the other node.
AllocationTagNamespace.All allNs = new AllocationTagNamespace.All();
TargetApplicationsNamespace.All allNs =
new TargetApplicationsNamespace.All();
am2.allocateAppAntiAffinity(
ResourceSizing.newInstance(3, Resource.newInstance(1024, 1)),
Priority.newInstance(1), 1L, allNs.toString(),

View File

@ -27,13 +27,19 @@
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
/**
* Test functionality of AllocationTagsManager.
@ -468,15 +474,27 @@ public void testQueryCardinalityWithIllegalParameters() {
@Test
public void testNodeAllocationTagsAggregation()
throws InvalidAllocationTagsQueryException {
RMContext mockContext = Mockito.spy(rmContext);
AllocationTagsManager atm = new AllocationTagsManager(rmContext);
ApplicationId app1 = TestUtils.getMockApplicationId(1);
ApplicationId app2 = TestUtils.getMockApplicationId(2);
ApplicationId app3 = TestUtils.getMockApplicationId(3);
NodeId host1 = NodeId.fromString("host1:123");
NodeId host2 = NodeId.fromString("host2:123");
NodeId host3 = NodeId.fromString("host3:123");
ConcurrentMap<ApplicationId, RMApp> allApps = new ConcurrentHashMap<>();
allApps.put(app1, new MockRMApp(123, 1000,
RMAppState.NEW, "userA", ImmutableSet.of("")));
allApps.put(app2, new MockRMApp(124, 1001,
RMAppState.NEW, "userA", ImmutableSet.of("")));
allApps.put(app3, new MockRMApp(125, 1002,
RMAppState.NEW, "userA", ImmutableSet.of("")));
Mockito.when(mockContext.getRMApps()).thenReturn(allApps);
AllocationTagsManager atm = new AllocationTagsManager(mockContext);
/**
* Node1 (rack0)
* app1/A(2)
@ -561,7 +579,7 @@ public void testNodeAllocationTagsAggregation()
*
*/
tags = AllocationTags.createOtherAppAllocationTags(app1,
ImmutableSet.of(app1, app2, app3), ImmutableSet.of("A", "B"));
ImmutableSet.of("A", "B"));
Assert.assertEquals(4, atm.getNodeCardinalityByOp(host1, tags, Long::max));
Assert.assertEquals(0, atm.getNodeCardinalityByOp(host1, tags, Long::min));

View File

@ -16,50 +16,67 @@
* limitations under the License.
*/
import com.google.common.collect.ImmutableSet;
import com.google.common.collect.Sets;
import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.junit.Assert;
import org.junit.Test;
import java.util.HashMap;
import java.util.Map;
import java.util.Set;
/**
* Test class for {@link AllocationTagNamespace}.
* Test class for {@link TargetApplicationsNamespace}.
*/
public class TestAllocationTagsNamespace {
@Test
public void testNamespaceParse() throws InvalidAllocationTagsQueryException {
AllocationTagNamespace namespace;
TargetApplicationsNamespace namespace;
String namespaceStr = "self";
namespace = AllocationTagNamespace.parse(namespaceStr);
namespace = TargetApplicationsNamespace.parse(namespaceStr);
Assert.assertEquals(AllocationTagNamespaceType.SELF,
namespace.getNamespaceType());
namespaceStr = "not-self";
namespace = AllocationTagNamespace.parse(namespaceStr);
namespace = TargetApplicationsNamespace.parse(namespaceStr);
Assert.assertEquals(AllocationTagNamespaceType.NOT_SELF,
namespace.getNamespaceType());
namespaceStr = "all";
namespace = AllocationTagNamespace.parse(namespaceStr);
namespace = TargetApplicationsNamespace.parse(namespaceStr);
Assert.assertEquals(AllocationTagNamespaceType.ALL,
namespace.getNamespaceType());
namespaceStr = "app-label";
namespace = AllocationTagNamespace.parse(namespaceStr);
Assert.assertEquals(AllocationTagNamespaceType.APP_LABEL,
namespaceStr = "app-tag/spark-jobs";
namespace = TargetApplicationsNamespace.parse(namespaceStr);
Assert.assertEquals(AllocationTagNamespaceType.APP_TAG,
namespace.getNamespaceType());
// Invalid app-tag namespace syntax
try {
namespaceStr = "app-tag/tag123/tag234";
TargetApplicationsNamespace.parse(namespaceStr);
Assert.fail("Parsing should fail as the given namespace is invalid");
} catch (Exception e) {
e.printStackTrace();
Assert.assertTrue(e instanceof InvalidAllocationTagsQueryException);
Assert.assertTrue(e.getMessage().startsWith(
"Invalid namespace string"));
}
ApplicationId applicationId = ApplicationId.newInstance(12345, 1);
namespaceStr = "app-id/" + applicationId.toString();
namespace = AllocationTagNamespace.parse(namespaceStr);
namespace = TargetApplicationsNamespace.parse(namespaceStr);
Assert.assertEquals(AllocationTagNamespaceType.APP_ID,
namespace.getNamespaceType());
// Invalid app-id namespace syntax, invalid app ID.
try {
namespaceStr = "app-id/apppppp_12345_99999";
AllocationTagNamespace.parse(namespaceStr);
TargetApplicationsNamespace.parse(namespaceStr);
Assert.fail("Parsing should fail as the given app ID is invalid");
} catch (Exception e) {
Assert.assertTrue(e instanceof InvalidAllocationTagsQueryException);
@ -70,7 +87,7 @@ public void testNamespaceParse() throws InvalidAllocationTagsQueryException {
// Invalid app-id namespace syntax, missing app ID.
try {
namespaceStr = "app-id";
AllocationTagNamespace.parse(namespaceStr);
TargetApplicationsNamespace.parse(namespaceStr);
Assert.fail("Parsing should fail as the given namespace"
+ " is missing application ID");
} catch (Exception e) {
@ -82,7 +99,7 @@ public void testNamespaceParse() throws InvalidAllocationTagsQueryException {
// Invalid namespace type.
try {
namespaceStr = "non_exist_ns";
AllocationTagNamespace.parse(namespaceStr);
TargetApplicationsNamespace.parse(namespaceStr);
Assert.fail("Parsing should fail as the giving type is not supported.");
} catch (Exception e) {
Assert.assertTrue(e instanceof InvalidAllocationTagsQueryException);
@ -94,7 +111,7 @@ public void testNamespaceParse() throws InvalidAllocationTagsQueryException {
@Test
public void testNamespaceEvaluation() throws
InvalidAllocationTagsQueryException {
AllocationTagNamespace namespace;
TargetApplicationsNamespace namespace;
TargetApplications targetApplications;
ApplicationId app1 = ApplicationId.newInstance(10000, 1);
ApplicationId app2 = ApplicationId.newInstance(10000, 2);
@ -104,7 +121,7 @@ public void testNamespaceEvaluation() throws
// Ensure eval is called before using the scope.
String namespaceStr = "self";
namespace = AllocationTagNamespace.parse(namespaceStr);
namespace = TargetApplicationsNamespace.parse(namespaceStr);
try {
namespace.getNamespaceScope();
Assert.fail("Call getNamespaceScope before evaluate is not allowed.");
@ -115,14 +132,14 @@ public void testNamespaceEvaluation() throws
}
namespaceStr = "self";
namespace = AllocationTagNamespace.parse(namespaceStr);
namespace = TargetApplicationsNamespace.parse(namespaceStr);
targetApplications = new TargetApplications(app1, ImmutableSet.of(app1));
namespace.evaluate(targetApplications);
Assert.assertEquals(1, namespace.getNamespaceScope().size());
Assert.assertEquals(app1, namespace.getNamespaceScope().iterator().next());
namespaceStr = "not-self";
namespace = AllocationTagNamespace.parse(namespaceStr);
namespace = TargetApplicationsNamespace.parse(namespaceStr);
targetApplications = new TargetApplications(app1, ImmutableSet.of(app1));
namespace.evaluate(targetApplications);
Assert.assertEquals(0, namespace.getNamespaceScope().size());
@ -134,16 +151,52 @@ public void testNamespaceEvaluation() throws
Assert.assertFalse(namespace.getNamespaceScope().contains(app1));
namespaceStr = "all";
namespace = AllocationTagNamespace.parse(namespaceStr);
namespace = TargetApplicationsNamespace.parse(namespaceStr);
Assert.assertEquals(AllocationTagNamespaceType.ALL,
namespace.getNamespaceType());
namespaceStr = "app-id/" + app2.toString();
namespace = AllocationTagNamespace.parse(namespaceStr);
namespace = TargetApplicationsNamespace.parse(namespaceStr);
targetApplications = new TargetApplications(app1,
ImmutableSet.of(app1, app2, app3, app4, app5));
namespace.evaluate(targetApplications);
Assert.assertEquals(1, namespace.getNamespaceScope().size());
Assert.assertEquals(app2, namespace.getNamespaceScope().iterator().next());
/**
* App to Application Tags
* app1: A, B
* app2: A
* app3:
* app4: C
* app5: A, B, C
*/
Map<ApplicationId, Set<String>> appsWithTags = new HashMap<>();
appsWithTags.put(app1, ImmutableSet.of("A", "B"));
appsWithTags.put(app2, ImmutableSet.of("A"));
appsWithTags.put(app3, ImmutableSet.of());
appsWithTags.put(app4, ImmutableSet.of("C"));
appsWithTags.put(app5, ImmutableSet.of("A", "B", "C"));
namespaceStr = "app-tag/A";
namespace = TargetApplicationsNamespace.parse(namespaceStr);
targetApplications = new TargetApplications(app1, appsWithTags);
namespace.evaluate(targetApplications);
Assert.assertEquals(3, namespace.getNamespaceScope().size());
Assert.assertTrue(Sets.difference(namespace.getNamespaceScope(),
ImmutableSet.of(app1, app2, app5)).isEmpty());
namespaceStr = "app-tag/B";
namespace = TargetApplicationsNamespace.parse(namespaceStr);
namespace.evaluate(targetApplications);
Assert.assertEquals(2, namespace.getNamespaceScope().size());
Assert.assertTrue(Sets.difference(namespace.getNamespaceScope(),
ImmutableSet.of(app1, app5)).isEmpty());
// Not exist
namespaceStr = "app-tag/xyz";
namespace = TargetApplicationsNamespace.parse(namespaceStr);
namespace.evaluate(targetApplications);
Assert.assertEquals(0, namespace.getNamespaceScope().size());
}
}

View File

@ -37,6 +37,8 @@
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import java.util.concurrent.atomic.AtomicLong;
@ -52,6 +54,9 @@
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.MockRMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
import org.apache.hadoop.yarn.server.resourcemanager.MockNodes;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@ -63,6 +68,7 @@
import org.junit.Test;
import com.google.common.collect.ImmutableSet;
import org.mockito.Mockito;
/**
* Test the PlacementConstraint Utility class functionality.
@ -562,8 +568,8 @@ public void testGlobalAppConstraints()
SchedulerNode schedulerNode3 = newSchedulerNode(n3r2.getHostName(),
n3r2.getRackName(), n3r2.getNodeID());
AllocationTagNamespace namespaceAll =
new AllocationTagNamespace.All();
TargetApplicationsNamespace namespaceAll =
new TargetApplicationsNamespace.All();
//***************************
// 1) all, anti-affinity
@ -648,17 +654,28 @@ application3, createSchedulingRequest(srcTags3),
@Test
public void testNotSelfAppConstraints()
throws InvalidAllocationTagsQueryException {
AllocationTagsManager tm = new AllocationTagsManager(rmContext);
PlacementConstraintManagerService pcm =
new MemoryPlacementConstraintManager();
rmContext.setAllocationTagsManager(tm);
rmContext.setPlacementConstraintManager(pcm);
long ts = System.currentTimeMillis();
ApplicationId application1 = BuilderUtils.newApplicationId(ts, 100);
ApplicationId application2 = BuilderUtils.newApplicationId(ts, 101);
ApplicationId application3 = BuilderUtils.newApplicationId(ts, 102);
ConcurrentMap<ApplicationId, RMApp> allApps = new ConcurrentHashMap<>();
allApps.put(application1, new MockRMApp(123, 1000,
RMAppState.NEW, "userA", ImmutableSet.of("")));
allApps.put(application2, new MockRMApp(124, 1001,
RMAppState.NEW, "userA", ImmutableSet.of("")));
allApps.put(application3, new MockRMApp(125, 1002,
RMAppState.NEW, "userA", ImmutableSet.of("")));
RMContext mockedContext = Mockito.spy(rmContext);
when(mockedContext.getRMApps()).thenReturn(allApps);
AllocationTagsManager tm = new AllocationTagsManager(mockedContext);
PlacementConstraintManagerService pcm =
new MemoryPlacementConstraintManager();
mockedContext.setAllocationTagsManager(tm);
mockedContext.setPlacementConstraintManager(pcm);
// Register App1 with anti-affinity constraint map.
RMNode n0r1 = rmNodes.get(0);
RMNode n1r1 = rmNodes.get(1);
@ -696,8 +713,8 @@ public void testNotSelfAppConstraints()
SchedulerNode schedulerNode3 = newSchedulerNode(n3r2.getHostName(),
n3r2.getRackName(), n3r2.getNodeID());
AllocationTagNamespace notSelf =
new AllocationTagNamespace.NotSelf();
TargetApplicationsNamespace notSelf =
new TargetApplicationsNamespace.NotSelf();
//***************************
// 1) not-self, app1
@ -800,8 +817,8 @@ public void testInterAppConstraintsByAppID()
SchedulerNode schedulerNode3 =newSchedulerNode(n3r2.getHostName(),
n3r2.getRackName(), n3r2.getNodeID());
AllocationTagNamespace namespace =
new AllocationTagNamespace.AppID(application1);
TargetApplicationsNamespace namespace =
new TargetApplicationsNamespace.AppID(application1);
Map<Set<String>, PlacementConstraint> constraintMap = new HashMap<>();
PlacementConstraint constraint2 = PlacementConstraints
.targetNotIn(NODE, allocationTagWithNamespace(namespace.toString(),
@ -832,7 +849,7 @@ application2, createSchedulingRequest(srcTags2),
// Intra-app constraint
// Test with default and empty namespace
AllocationTagNamespace self = new AllocationTagNamespace.Self();
TargetApplicationsNamespace self = new TargetApplicationsNamespace.Self();
PlacementConstraint constraint3 = PlacementConstraints
.targetNotIn(NODE, allocationTagWithNamespace(self.toString(),
"hbase-m"))
@ -872,6 +889,88 @@ application2, createSchedulingRequest(srcTags2),
pcm.unregisterApplication(application3);
}
@Test
public void testInterAppConstriantsByAppTag()
throws InvalidAllocationTagsQueryException {
ApplicationId application1 = BuilderUtils.newApplicationId(1000, 123);
ApplicationId application2 = BuilderUtils.newApplicationId(1001, 124);
// app1: test-tag
// app2: N/A
RMContext mockedContext = Mockito.spy(rmContext);
ConcurrentMap<ApplicationId, RMApp> allApps = new ConcurrentHashMap<>();
allApps.put(application1, new MockRMApp(123, 1000,
RMAppState.NEW, "userA", ImmutableSet.of("test-tag")));
allApps.put(application2, new MockRMApp(124, 1001,
RMAppState.NEW, "userA", ImmutableSet.of("")));
when(mockedContext.getRMApps()).thenReturn(allApps);
AllocationTagsManager tm = new AllocationTagsManager(mockedContext);
PlacementConstraintManagerService pcm =
new MemoryPlacementConstraintManager();
mockedContext.setAllocationTagsManager(tm);
mockedContext.setPlacementConstraintManager(pcm);
// Register App1 with anti-affinity constraint map.
RMNode n0r1 = rmNodes.get(0);
RMNode n1r1 = rmNodes.get(1);
RMNode n2r2 = rmNodes.get(2);
RMNode n3r2 = rmNodes.get(3);
/**
* Place container:
* n0: app1/hbase-m(1)
* n1: ""
* n2: app1/hbase-m(1)
* n3: ""
*/
tm.addContainer(n0r1.getNodeID(),
newContainerId(application1), ImmutableSet.of("hbase-m"));
tm.addContainer(n2r2.getNodeID(),
newContainerId(application1), ImmutableSet.of("hbase-m"));
SchedulerNode schedulerNode0 = newSchedulerNode(n0r1.getHostName(),
n0r1.getRackName(), n0r1.getNodeID());
SchedulerNode schedulerNode1 = newSchedulerNode(n1r1.getHostName(),
n1r1.getRackName(), n1r1.getNodeID());
SchedulerNode schedulerNode2 = newSchedulerNode(n2r2.getHostName(),
n2r2.getRackName(), n2r2.getNodeID());
SchedulerNode schedulerNode3 = newSchedulerNode(n3r2.getHostName(),
n3r2.getRackName(), n3r2.getNodeID());
TargetApplicationsNamespace namespace =
new TargetApplicationsNamespace.AppTag("test-tag");
Map<Set<String>, PlacementConstraint> constraintMap = new HashMap<>();
PlacementConstraint constraint2 = PlacementConstraints
.targetNotIn(NODE, allocationTagWithNamespace(namespace.toString(),
"hbase-m"))
.build();
Set<String> srcTags2 = ImmutableSet.of("app2");
constraintMap.put(srcTags2, constraint2);
pcm.registerApplication(application2, constraintMap);
// Anti-affinity with app-tag/test-tag/hbase-m,
// app1 has tag "test-tag" so the constraint is equally to work on app1
// onto n1 and n3 as they don't have "hbase-m" from app1.
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
application2, createSchedulingRequest(srcTags2),
schedulerNode0, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
application2, createSchedulingRequest(srcTags2),
schedulerNode1, pcm, tm));
Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(
application2, createSchedulingRequest(srcTags2),
schedulerNode2, pcm, tm));
Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(
application2, createSchedulingRequest(srcTags2),
schedulerNode3, pcm, tm));
pcm.unregisterApplication(application1);
pcm.unregisterApplication(application2);
}
@Test
public void testInvalidAllocationTagNamespace() {
AllocationTagsManager tm = new AllocationTagsManager(rmContext);