YARN-8015. Support all types of placement constraints for Capacity Scheduler. Contributed by Weiwei Yang.

Sunil G 2018-08-23 10:05:43 +05:30
parent b021249ac8
commit 1ac01444a2
4 changed files with 513 additions and 221 deletions
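Before this change, the SchedulingRequest path in Capacity Scheduler only accepted single intra-app anti-affinity constraints at node scope; this commit removes that restriction so that affinity, cardinality, inter-app tag namespaces and AND composites can be placed as well. The following sketch (class name, tags and sizes are illustrative, mirroring the new tests below) shows the kind of request that is now supported, together with the configuration that routes placement constraints to the scheduler itself:

import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.and;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.cardinality;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTagWithNamespace;

import java.util.Collections;

import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public final class CompositeConstraintExample {

  // Routes placement-constraint processing to the scheduler (here Capacity
  // Scheduler), as the new tests in this commit do.
  public static YarnConfiguration schedulerHandlerConf() {
    YarnConfiguration conf = new YarnConfiguration();
    conf.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
        YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
    return conf;
  }

  // An AND composite: affinity to any application's "hbase-master"
  // containers, combined with a per-node cardinality cap on this request's
  // own "ws-servant" tag.
  public static SchedulingRequest wsServantRequest() {
    PlacementConstraint pc = and(
        targetIn("node", allocationTagWithNamespace(
            AllocationTagNamespaceType.ALL.toString(), "hbase-master")),
        cardinality("node", 0, 2, "ws-servant")).build();
    return SchedulingRequest.newBuilder()
        .priority(Priority.newInstance(1))
        .allocationRequestId(1L)
        .allocationTags(Collections.singleton("ws-servant"))
        .executionType(
            ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED, true))
        .resourceSizing(
            ResourceSizing.newInstance(10, Resource.newInstance(512, 1)))
        .placementConstraintExpression(pc)
        .build();
  }
}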


@@ -19,18 +19,15 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableSet;
import org.apache.commons.collections.IteratorUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ResourceRequest;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.records.impl.pb.SchedulingRequestPBImpl;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
@@ -48,12 +45,12 @@
import java.util.Collection;
import java.util.Collections;
import java.util.HashSet;
import java.util.Iterator;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE_PARTITION;
/**
@@ -70,7 +67,6 @@ public class SingleConstraintAppPlacementAllocator<N extends SchedulerNode>
private SchedulingRequest schedulingRequest = null;
private String targetNodePartition;
private Set<String> targetAllocationTags;
private AllocationTagsManager allocationTagsManager;
private PlacementConstraintManager placementConstraintManager;
@@ -239,135 +235,55 @@ private void validateAndSetSchedulingRequest(SchedulingRequest
"Only GUARANTEED execution type is supported.");
}
// Node partition
String nodePartition = null;
// Target allocation tags
Set<String> targetAllocationTags = null;
PlacementConstraint constraint =
newSchedulingRequest.getPlacementConstraint();
if (constraint != null) {
// We only accept SingleConstraint
PlacementConstraint.AbstractConstraint ac = constraint
.getConstraintExpr();
if (!(ac instanceof PlacementConstraint.SingleConstraint)) {
throwExceptionWithMetaInfo("Only accepts "
+ PlacementConstraint.SingleConstraint.class.getName()
+ " as constraint-expression. Rejecting the new added "
+ "constraint-expression.class=" + ac.getClass().getName());
}
PlacementConstraint.SingleConstraint singleConstraint =
(PlacementConstraint.SingleConstraint) ac;
// Make sure it is an anti-affinity request (actually this implementation
// should be able to support both affinity / anti-affinity without much
// effort. Considering potential test effort required. Limit to
// anti-affinity to intra-app and scope is node.
if (!singleConstraint.getScope().equals(PlacementConstraints.NODE)) {
throwExceptionWithMetaInfo(
"Only support scope=" + PlacementConstraints.NODE
+ "now. PlacementConstraint=" + singleConstraint);
}
if (singleConstraint.getMinCardinality() != 0
|| singleConstraint.getMaxCardinality() != 0) {
throwExceptionWithMetaInfo(
"Only support anti-affinity, which is: minCardinality=0, "
+ "maxCardinality=1");
}
Set<PlacementConstraint.TargetExpression> targetExpressionSet =
singleConstraint.getTargetExpressions();
if (targetExpressionSet == null || targetExpressionSet.isEmpty()) {
throwExceptionWithMetaInfo(
"TargetExpression should not be null or empty");
}
for (PlacementConstraint.TargetExpression targetExpression :
targetExpressionSet) {
// Handle node partition
if (targetExpression.getTargetType().equals(
PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE)) {
// For node attribute target, we only support Partition now. And once
// YARN-3409 is merged, we will support node attribute.
if (!targetExpression.getTargetKey().equals(NODE_PARTITION)) {
throwExceptionWithMetaInfo("When TargetType="
+ PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE
+ " only " + NODE_PARTITION + " is accepted as TargetKey.");
}
if (nodePartition != null) {
// This means we have duplicated node partition entry
// inside placement constraint, which might be set by mistake.
throwExceptionWithMetaInfo(
"Only one node partition targetExpression is allowed");
}
Set<String> values = targetExpression.getTargetValues();
if (values == null || values.isEmpty()) {
nodePartition = RMNodeLabelsManager.NO_LABEL;
continue;
}
if (values.size() > 1) {
throwExceptionWithMetaInfo("Inside one targetExpression, we only "
+ "support affinity to at most one node partition now");
}
nodePartition = values.iterator().next();
} else if (targetExpression.getTargetType().equals(
PlacementConstraint.TargetExpression.TargetType.ALLOCATION_TAG)) {
// Handle allocation tags
if (targetAllocationTags != null) {
// This means we have duplicated AllocationTag expressions entries
// inside placement constraint, which might be set by mistake.
throwExceptionWithMetaInfo(
"Only one AllocationTag targetExpression is allowed");
}
if (targetExpression.getTargetValues() == null ||
targetExpression.getTargetValues().isEmpty()) {
throwExceptionWithMetaInfo("Failed to find allocation tags from "
+ "TargetExpressions or couldn't find self-app target.");
}
targetAllocationTags = new HashSet<>(
targetExpression.getTargetValues());
}
}
if (targetAllocationTags == null) {
// That means we don't have ALLOCATION_TAG specified
throwExceptionWithMetaInfo(
"Couldn't find target expression with type == ALLOCATION_TAG,"
+ " it is required to include one and only one target"
+ " expression with type == ALLOCATION_TAG");
}
}
// If this scheduling request doesn't contain a placement constraint,
// we set allocation tags an empty set.
if (targetAllocationTags == null) {
targetAllocationTags = ImmutableSet.of();
}
if (nodePartition == null) {
nodePartition = RMNodeLabelsManager.NO_LABEL;
}
// Validation is done. set local results:
this.targetNodePartition = nodePartition;
this.targetAllocationTags = targetAllocationTags;
this.targetNodePartition = validateAndGetTargetNodePartition(
newSchedulingRequest.getPlacementConstraint());
this.schedulingRequest = new SchedulingRequestPBImpl(
((SchedulingRequestPBImpl) newSchedulingRequest).getProto());
LOG.info("Successfully added SchedulingRequest to app=" + appSchedulingInfo
.getApplicationAttemptId() + " targetAllocationTags=[" + StringUtils
.join(",", targetAllocationTags) + "]. nodePartition="
+ targetNodePartition);
LOG.info("Successfully added SchedulingRequest to app="
+ appSchedulingInfo.getApplicationAttemptId()
+ " placementConstraint=["
+ schedulingRequest.getPlacementConstraint()
+ "]. nodePartition=" + targetNodePartition);
}
// Tentatively find out the node partition, if any, that appears in the
// placement constraint and set it as the app's primary node partition.
// Currently only SingleConstraint expressions are handled.
private String validateAndGetTargetNodePartition(
PlacementConstraint placementConstraint) {
String nodePartition = RMNodeLabelsManager.NO_LABEL;
if (placementConstraint != null &&
placementConstraint.getConstraintExpr() != null) {
PlacementConstraint.AbstractConstraint ac =
placementConstraint.getConstraintExpr();
if (ac != null && ac instanceof PlacementConstraint.SingleConstraint) {
PlacementConstraint.SingleConstraint singleConstraint =
(PlacementConstraint.SingleConstraint) ac;
for (PlacementConstraint.TargetExpression targetExpression :
singleConstraint.getTargetExpressions()) {
// Handle node partition
if (targetExpression.getTargetType().equals(NODE_ATTRIBUTE) &&
targetExpression.getTargetKey().equals(NODE_PARTITION)) {
Set<String> values = targetExpression.getTargetValues();
if (values == null || values.isEmpty()) {
continue;
}
if (values.size() > 1) {
throwExceptionWithMetaInfo(
"Inside one targetExpression, we only support"
+ " affinity to at most one node partition now");
}
nodePartition = values.iterator().next();
if (nodePartition != null) {
break;
}
}
}
}
}
return nodePartition;
}
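For illustration, the helper above would return "x" for a constraint shaped like the sketch below (the tag and partition values are made up), and RMNodeLabelsManager.NO_LABEL, the empty string, when the constraint carries no node-partition target expression:

import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.nodePartition;

import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;

public final class NodePartitionConstraintExample {
  public static PlacementConstraint antiAffinityInPartitionX() {
    // Anti-affinity to this app's "hbase-master" containers, restricted to
    // nodes in partition "x"; validateAndGetTargetNodePartition() picks "x"
    // out of the NODE_ATTRIBUTE/NODE_PARTITION target expression.
    return targetNotIn(PlacementConstraints.NODE,
        allocationTag("hbase-master"),
        nodePartition("x")).build();
  }
}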
@Override
@@ -515,11 +431,6 @@ String getTargetNodePartition() {
return targetNodePartition;
}
@VisibleForTesting
Set<String> getTargetAllocationTags() {
return targetAllocationTags;
}
@Override
public void initialize(AppSchedulingInfo appSchedulingInfo,
SchedulerRequestKey schedulerRequestKey, RMContext rmContext) {


@@ -513,6 +513,19 @@ public RMApp submitApp(int masterMemory) throws Exception {
return submitApp(masterMemory, false);
}
public RMApp submitApp(int masterMemory, Set<String> appTags)
throws Exception {
Resource resource = Resource.newInstance(masterMemory, 0);
ResourceRequest amResourceRequest = ResourceRequest.newInstance(
Priority.newInstance(0), ResourceRequest.ANY, resource, 1);
return submitApp(Collections.singletonList(amResourceRequest), "",
UserGroupInformation.getCurrentUser().getShortUserName(), null, false,
null, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS,
YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true,
false, false, null, 0, null, true, Priority.newInstance(0), null,
null, null, appTags);
}
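As a hedged usage sketch (the tag value and surrounding test objects are illustrative), the new overload lets a test attach application tags at submission time; those tags are what the app-tag allocation-tag namespace in the new placement-constraint tests resolves against:

import com.google.common.collect.ImmutableSet;

import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;

public final class SubmitAppWithTagsExample {
  // Submits an app tagged "web-server" so that other apps can later target
  // its containers through the app-tag namespace.
  static MockAM submitTaggedApp(MockRM rm, MockNM nm) throws Exception {
    RMApp app = rm.submitApp(1024, ImmutableSet.of("web-server"));
    return MockRM.launchAndRegisterAM(app, rm, nm);
  }
}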
public RMApp submitApp(int masterMemory, Priority priority) throws Exception {
Resource resource = Resource.newInstance(masterMemory, 0);
return submitApp(resource, "", UserGroupInformation.getCurrentUser()
@@ -732,8 +745,23 @@ public RMApp submitApp(List<ResourceRequest> amResourceRequests, String name,
LogAggregationContext logAggregationContext,
boolean cancelTokensWhenComplete, Priority priority, String amLabel,
Map<ApplicationTimeoutType, Long> applicationTimeouts,
ByteBuffer tokensConf)
throws Exception {
ByteBuffer tokensConf) throws Exception {
return submitApp(amResourceRequests, name, user, acls, unmanaged, queue,
maxAppAttempts, ts, appType, waitForAccepted, keepContainers,
isAppIdProvided, applicationId, attemptFailuresValidityInterval,
logAggregationContext, cancelTokensWhenComplete, priority, amLabel,
applicationTimeouts, tokensConf, null);
}
public RMApp submitApp(List<ResourceRequest> amResourceRequests, String name,
String user, Map<ApplicationAccessType, String> acls, boolean unmanaged,
String queue, int maxAppAttempts, Credentials ts, String appType,
boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided,
ApplicationId applicationId, long attemptFailuresValidityInterval,
LogAggregationContext logAggregationContext,
boolean cancelTokensWhenComplete, Priority priority, String amLabel,
Map<ApplicationTimeoutType, Long> applicationTimeouts,
ByteBuffer tokensConf, Set<String> applicationTags) throws Exception {
ApplicationId appId = isAppIdProvided ? applicationId : null;
ApplicationClientProtocol client = getClientRMService();
if (! isAppIdProvided) {
@@ -749,6 +777,9 @@ public RMApp submitApp(List<ResourceRequest> amResourceRequests, String name,
sub.setApplicationId(appId);
sub.setApplicationName(name);
sub.setMaxAppAttempts(maxAppAttempts);
if (applicationTags != null) {
sub.setApplicationTags(applicationTags);
}
if (applicationTimeouts != null && applicationTimeouts.size() > 0) {
sub.setApplicationTimeouts(applicationTimeouts);
}


@@ -26,12 +26,18 @@
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.TargetApplicationsNamespace;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.test.GenericTestUtils;
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.MockAM;
import org.apache.hadoop.yarn.server.resourcemanager.MockNM;
import org.apache.hadoop.yarn.server.resourcemanager.MockRM;
@@ -46,10 +52,24 @@
import org.junit.Before;
import org.junit.Test;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.*;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.HashSet;
import java.util.concurrent.ConcurrentMap;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTagWithNamespace;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.and;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.cardinality;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn;
import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn;
public class TestSchedulingRequestContainerAllocation {
private final int GB = 1024;
private static final int GB = 1024;
private YarnConfiguration conf;
@@ -435,8 +455,7 @@ public RMNodeLabelsManager createNodeLabelManager() {
CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler();
PlacementConstraint constraint = PlacementConstraints
.targetNotIn("node", allocationTag("t1"))
PlacementConstraint constraint = targetNotIn("node", allocationTag("t1"))
.build();
SchedulingRequest sc = SchedulingRequest
.newInstance(0, Priority.newInstance(1),
@@ -477,4 +496,413 @@ public RMNodeLabelsManager createNodeLabelManager() {
rm1.close();
}
private void doNodeHeartbeat(MockNM... nms) throws Exception {
for (MockNM nm : nms) {
nm.nodeHeartbeat(true);
}
}
private List<Container> waitForAllocation(int allocNum, int timeout,
MockAM am, MockNM... nms) throws Exception {
final List<Container> result = new ArrayList<>();
GenericTestUtils.waitFor(() -> {
try {
AllocateResponse response = am.schedule();
List<Container> allocated = response.getAllocatedContainers();
System.out.println("Expecting allocation: " + allocNum
+ ", actual allocation: " + allocated.size());
for (Container c : allocated) {
System.out.println("Container " + c.getId().toString()
+ " is allocated on node: " + c.getNodeId().toString()
+ ", allocation tags: "
+ String.join(",", c.getAllocationTags()));
}
result.addAll(allocated);
if (result.size() == allocNum) {
return true;
}
doNodeHeartbeat(nms);
} catch (Exception e) {
e.printStackTrace();
}
return false;
}, 500, timeout);
return result;
}
private static SchedulingRequest schedulingRequest(int requestId,
int containers, int cores, int mem, PlacementConstraint constraint,
String... tags) {
return schedulingRequest(1, requestId, containers, cores, mem,
ExecutionType.GUARANTEED, constraint, tags);
}
private static SchedulingRequest schedulingRequest(
int priority, long allocReqId, int containers, int cores, int mem,
ExecutionType execType, PlacementConstraint constraint, String... tags) {
return SchedulingRequest.newBuilder()
.priority(Priority.newInstance(priority))
.allocationRequestId(allocReqId)
.allocationTags(new HashSet<>(Arrays.asList(tags)))
.executionType(ExecutionTypeRequest.newInstance(execType, true))
.resourceSizing(
ResourceSizing.newInstance(containers,
Resource.newInstance(mem, cores)))
.placementConstraintExpression(constraint)
.build();
}
private int getContainerNodesNum(List<Container> containers) {
Set<NodeId> nodes = new HashSet<>();
if (containers != null) {
containers.forEach(c -> nodes.add(c.getNodeId()));
}
return nodes.size();
}
@Test(timeout = 30000L)
public void testInterAppCompositeConstraints() throws Exception {
// This test covers both intra- and inter-app constraints,
// including simple affinity, anti-affinity and cardinality constraints,
// and a simple AND composite constraint.
YarnConfiguration config = new YarnConfiguration();
config.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
config.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
MockRM rm = new MockRM(config);
try {
rm.start();
MockNM nm1 = rm.registerNode("192.168.0.1:1234", 100*GB, 100);
MockNM nm2 = rm.registerNode("192.168.0.2:1234", 100*GB, 100);
MockNM nm3 = rm.registerNode("192.168.0.3:1234", 100*GB, 100);
MockNM nm4 = rm.registerNode("192.168.0.4:1234", 100*GB, 100);
MockNM nm5 = rm.registerNode("192.168.0.5:1234", 100*GB, 100);
RMApp app1 = rm.submitApp(1*GB, ImmutableSet.of("hbase"));
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1);
// App1 (hbase)
// h1: hbase-master(1)
// h2: hbase-master(1)
// h3:
// h4:
// h5:
PlacementConstraint pc = targetNotIn("node",
allocationTag("hbase-master")).build();
am1.addSchedulingRequest(
ImmutableList.of(
schedulingRequest(1, 2, 1, 2048, pc, "hbase-master")));
List<Container> allocated = waitForAllocation(2, 3000, am1, nm1, nm2);
// 2 containers allocated
Assert.assertEquals(2, allocated.size());
// containers should be distributed on 2 different nodes
Assert.assertEquals(2, getContainerNodesNum(allocated));
// App1 (hbase)
// h1: hbase-rs(1), hbase-master(1)
// h2: hbase-rs(1), hbase-master(1)
// h3: hbase-rs(1)
// h4: hbase-rs(1)
// h5:
pc = targetNotIn("node", allocationTag("hbase-rs")).build();
am1.addSchedulingRequest(
ImmutableList.of(
schedulingRequest(2, 4, 1, 1024, pc, "hbase-rs")));
allocated = waitForAllocation(4, 3000, am1, nm1, nm2, nm3, nm4, nm5);
Assert.assertEquals(4, allocated.size());
Assert.assertEquals(4, getContainerNodesNum(allocated));
// App2 (web-server)
// The web server has 2 instances; each must be co-allocated with an
// hbase-master container, and no two instances may share a node.
RMApp app2 = rm.submitApp(1*GB, ImmutableSet.of("web-server"));
MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2);
// App2 (web-server)
// h1: hbase-rs(1), hbase-master(1), ws-inst(1)
// h2: hbase-rs(1), hbase-master(1), ws-inst(1)
// h3: hbase-rs(1)
// h4: hbase-rs(1)
// h5:
pc = and(
targetIn("node", allocationTagWithNamespace(
new TargetApplicationsNamespace.All().toString(),
"hbase-master")),
targetNotIn("node", allocationTag("ws-inst"))).build();
am2.addSchedulingRequest(
ImmutableList.of(
schedulingRequest(1, 2, 1, 2048, pc, "ws-inst")));
allocated = waitForAllocation(2, 3000, am2, nm1, nm2, nm3, nm4, nm5);
Assert.assertEquals(2, allocated.size());
Assert.assertEquals(2, getContainerNodesNum(allocated));
ConcurrentMap<NodeId, RMNode> rmNodes = rm.getRMContext().getRMNodes();
for (Container c : allocated) {
RMNode rmNode = rmNodes.get(c.getNodeId());
Assert.assertNotNull(rmNode);
Assert.assertTrue("If ws-inst is allocated to a node,"
+ " this node should have inherited the ws-inst tag ",
rmNode.getAllocationTagsWithCount().get("ws-inst") == 1);
Assert.assertTrue("ws-inst should be co-allocated to "
+ "hbase-master nodes",
rmNode.getAllocationTagsWithCount().get("hbase-master") == 1);
}
// App3 (ws-servant)
// App3 has multiple instances that must be co-allocated with app2's
// web-server instances, and each node cannot host more than 3 of them.
RMApp app3 = rm.submitApp(1*GB, ImmutableSet.of("ws-servants"));
MockAM am3 = MockRM.launchAndRegisterAM(app3, rm, nm3);
// App3 (ws-servant)
// h1: hbase-rs(1), hbase-master(1), ws-inst(1), ws-servant(3)
// h2: hbase-rs(1), hbase-master(1), ws-inst(1), ws-servant(3)
// h3: hbase-rs(1)
// h4: hbase-rs(1)
// h5:
pc = and(
targetIn("node", allocationTagWithNamespace(
new TargetApplicationsNamespace.AppTag("web-server").toString(),
"ws-inst")),
cardinality("node", 0, 2, "ws-servant")).build();
am3.addSchedulingRequest(
ImmutableList.of(
schedulingRequest(1, 10, 1, 512, pc, "ws-servant")));
// total 6 containers can be allocated due to cardinality constraint
// each round, 2 containers can be allocated
allocated = waitForAllocation(6, 10000, am3, nm1, nm2, nm3, nm4, nm5);
Assert.assertEquals(6, allocated.size());
Assert.assertEquals(2, getContainerNodesNum(allocated));
for (Container c : allocated) {
RMNode rmNode = rmNodes.get(c.getNodeId());
Assert.assertNotNull(rmNode);
Assert.assertTrue("Node has ws-servant allocated must have 3 instances",
rmNode.getAllocationTagsWithCount().get("ws-servant") == 3);
Assert.assertTrue("Every ws-servant container should be co-allocated"
+ " with ws-inst",
rmNode.getAllocationTagsWithCount().get("ws-inst") == 1);
}
} finally {
rm.stop();
}
}
@Test(timeout = 30000L)
public void testMultiAllocationTagsConstraints() throws Exception {
// This test simulates using placement constraints to avoid port conflicts.
YarnConfiguration config = new YarnConfiguration();
config.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
config.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
MockRM rm = new MockRM(config);
try {
rm.start();
MockNM nm1 = rm.registerNode("192.168.0.1:1234", 10*GB, 10);
MockNM nm2 = rm.registerNode("192.168.0.2:1234", 10*GB, 10);
MockNM nm3 = rm.registerNode("192.168.0.3:1234", 10*GB, 10);
MockNM nm4 = rm.registerNode("192.168.0.4:1234", 10*GB, 10);
MockNM nm5 = rm.registerNode("192.168.0.5:1234", 10*GB, 10);
RMApp app1 = rm.submitApp(1*GB, ImmutableSet.of("server1"));
// Allocate AM container on nm1
doNodeHeartbeat(nm1);
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
am1.registerAppAttempt();
// App1 uses ports: 6000, 7000 and 8000
String[] server1Ports =
new String[] {"port_6000", "port_7000", "port_8000"};
PlacementConstraint pc = targetNotIn("node",
allocationTagWithNamespace(AllocationTagNamespaceType.ALL.toString(),
server1Ports))
.build();
am1.addSchedulingRequest(
ImmutableList.of(
schedulingRequest(1, 2, 1, 1024, pc, server1Ports)));
List<Container> allocated = waitForAllocation(2, 3000,
am1, nm1, nm2, nm3, nm4, nm5);
// 2 containers allocated
Assert.assertEquals(2, allocated.size());
// containers should be distributed on 2 different nodes
Assert.assertEquals(2, getContainerNodesNum(allocated));
// App2 uses port: 6000
String[] server2Ports = new String[] {"port_6000"};
RMApp app2 = rm.submitApp(1*GB, ImmutableSet.of("server2"));
// Allocate AM container on nm2
doNodeHeartbeat(nm2);
RMAppAttempt app2attempt1 = app2.getCurrentAppAttempt();
MockAM am2 = rm.sendAMLaunched(app2attempt1.getAppAttemptId());
am2.registerAppAttempt();
pc = targetNotIn("node",
allocationTagWithNamespace(AllocationTagNamespaceType.ALL.toString(),
server2Ports))
.build();
am2.addSchedulingRequest(
ImmutableList.of(
schedulingRequest(1, 3, 1, 1024, pc, server2Ports)));
allocated = waitForAllocation(3, 3000, am2, nm1, nm2, nm3, nm4, nm5);
Assert.assertEquals(3, allocated.size());
Assert.assertEquals(3, getContainerNodesNum(allocated));
ConcurrentMap<NodeId, RMNode> rmNodes = rm.getRMContext().getRMNodes();
for (Container c : allocated) {
RMNode rmNode = rmNodes.get(c.getNodeId());
Assert.assertNotNull(rmNode);
Assert.assertTrue("server2 should not co-allocate to server1 as"
+ " they both need to use port 6000",
rmNode.getAllocationTagsWithCount().get("port_6000") == 1);
Assert.assertFalse(rmNode.getAllocationTagsWithCount()
.containsKey("port_7000"));
Assert.assertFalse(rmNode.getAllocationTagsWithCount()
.containsKey("port_8000"));
}
} finally {
rm.stop();
}
}
@Test(timeout = 30000L)
public void testInterAppConstraintsWithNamespaces() throws Exception {
// This test verifies inter-app constraints with namespaces
// not-self/app-id/app-tag
YarnConfiguration config = new YarnConfiguration();
config.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER,
YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER);
config.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
MockRM rm = new MockRM(config);
try {
rm.start();
MockNM nm1 = rm.registerNode("192.168.0.1:1234", 100*GB, 100);
MockNM nm2 = rm.registerNode("192.168.0.2:1234", 100*GB, 100);
MockNM nm3 = rm.registerNode("192.168.0.3:1234", 100*GB, 100);
MockNM nm4 = rm.registerNode("192.168.0.4:1234", 100*GB, 100);
MockNM nm5 = rm.registerNode("192.168.0.5:1234", 100*GB, 100);
ApplicationId app5Id = null;
Map<ApplicationId, List<Container>> allocMap = new HashMap<>();
// 10 apps and all containers are attached with foo tag
for (int i = 0; i<10; i++) {
// App1 ~ app5 tag "former5"
// App6 ~ app10 tag "latter5"
String applicationTag = i<5 ? "former5" : "latter5";
RMApp app = rm.submitApp(1*GB, ImmutableSet.of(applicationTag));
// Allocate AM container on nm1
doNodeHeartbeat(nm1, nm2, nm3, nm4, nm5);
RMAppAttempt attempt = app.getCurrentAppAttempt();
MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId());
am.registerAppAttempt();
PlacementConstraint pc = targetNotIn("node", allocationTag("foo"))
.build();
am.addSchedulingRequest(
ImmutableList.of(
schedulingRequest(1, 3, 1, 1024, pc, "foo")));
List<Container> allocated = waitForAllocation(3, 3000,
am, nm1, nm2, nm3, nm4, nm5);
// Memorize the containers that carry app5's foo tag
if (i == 5) {
app5Id = am.getApplicationAttemptId().getApplicationId();
}
allocMap.put(am.getApplicationAttemptId().getApplicationId(),
allocated);
}
Assert.assertNotNull(app5Id);
Assert.assertEquals(3, getContainerNodesNum(allocMap.get(app5Id)));
// *** app-id
// Submit another app, use app-id constraint against app5
RMApp app1 = rm.submitApp(1*GB, ImmutableSet.of("xyz"));
// Allocate AM container on nm1
doNodeHeartbeat(nm1);
RMAppAttempt attempt1 = app1.getCurrentAppAttempt();
MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId());
am1.registerAppAttempt();
PlacementConstraint pc = targetIn("node",
allocationTagWithNamespace(
new TargetApplicationsNamespace.AppID(app5Id).toString(),
"foo"))
.build();
am1.addSchedulingRequest(
ImmutableList.of(
schedulingRequest(1, 3, 1, 1024, pc, "foo")));
List<Container> allocated = waitForAllocation(3, 3000,
am1, nm1, nm2, nm3, nm4, nm5);
ConcurrentMap<NodeId, RMNode> rmNodes = rm.getRMContext().getRMNodes();
List<Container> app5Alloc = allocMap.get(app5Id);
for (Container c : allocated) {
RMNode rmNode = rmNodes.get(c.getNodeId());
Assert.assertNotNull(rmNode);
Assert.assertTrue("This app is affinity with app-id/app5/foo "
+ "containers",
app5Alloc.stream().anyMatch(
c5 -> c5.getNodeId() == c.getNodeId()));
}
// *** app-tag
RMApp app2 = rm.submitApp(1*GB);
// Allocate AM container on nm2
doNodeHeartbeat(nm2);
RMAppAttempt app2attempt1 = app2.getCurrentAppAttempt();
MockAM am2 = rm.sendAMLaunched(app2attempt1.getAppAttemptId());
am2.registerAppAttempt();
pc = targetNotIn("node",
allocationTagWithNamespace(
new TargetApplicationsNamespace.AppTag("xyz").toString(),
"foo"))
.build();
am2.addSchedulingRequest(
ImmutableList.of(
schedulingRequest(1, 2, 1, 1024, pc, "foo")));
allocated = waitForAllocation(2, 3000, am2, nm1, nm2, nm3, nm4, nm5);
Assert.assertEquals(2, allocated.size());
// none of them can be allocated to nodes that have app5's foo containers
for (Container c : app5Alloc) {
Assert.assertNotEquals(c.getNodeId(),
allocated.iterator().next().getNodeId());
}
// *** not-self
RMApp app3 = rm.submitApp(1*GB);
// Allocate AM container on nm3
doNodeHeartbeat(nm3);
RMAppAttempt app3attempt1 = app3.getCurrentAppAttempt();
MockAM am3 = rm.sendAMLaunched(app3attempt1.getAppAttemptId());
am3.registerAppAttempt();
pc = cardinality("node",
new TargetApplicationsNamespace.NotSelf().toString(),
1, 1, "foo").build();
am3.addSchedulingRequest(
ImmutableList.of(
schedulingRequest(1, 1, 1, 1024, pc, "foo")));
allocated = waitForAllocation(1, 3000, am3, nm1, nm2, nm3, nm4, nm5);
Assert.assertEquals(1, allocated.size());
// The container must be placed on a node whose foo count from other apps
// satisfies the [1, 1] cardinality, leaving that node with 2 foo tags
Assert.assertTrue(rmNodes.get(allocated.iterator().next().getNodeId())
.getAllocationTagsWithCount().get("foo") == 2);
} finally {
rm.stop();
}
}
}


@@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement;
import com.google.common.collect.ImmutableSet;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTags;
import org.apache.hadoop.yarn.api.records.ExecutionType;
import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest;
@@ -131,8 +130,6 @@ public void testSchedulingRequestValidation() {
.build()).resourceSizing(
ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
.build());
Assert.assertEquals(ImmutableSet.of("mapper", "reducer"),
allocator.getTargetAllocationTags());
Assert.assertEquals("", allocator.getTargetNodePartition());
// Valid (with partition)
@@ -147,8 +144,6 @@
.build()).resourceSizing(
ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
.build());
Assert.assertEquals(ImmutableSet.of("mapper", "reducer"),
allocator.getTargetAllocationTags());
Assert.assertEquals("x", allocator.getTargetNodePartition());
// Valid (without specifying node partition)
@@ -162,8 +157,6 @@
.resourceSizing(
ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
.build());
Assert.assertEquals(ImmutableSet.of("mapper", "reducer"),
allocator.getTargetAllocationTags());
Assert.assertEquals("", allocator.getTargetNodePartition());
// Valid (with application Id target)
@@ -178,8 +171,6 @@
ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
.build());
// Allocation tags should not include application Id
Assert.assertEquals(ImmutableSet.of("mapper", "reducer"),
allocator.getTargetAllocationTags());
Assert.assertEquals("", allocator.getTargetNodePartition());
// Invalid (without sizing)
@@ -200,75 +191,6 @@
.targetNotIn(PlacementConstraints.NODE).build())
.build(), true);
// Invalid (with multiple allocation tags expression specified)
assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
.allocationRequestId(10L).priority(Priority.newInstance(1))
.placementConstraintExpression(PlacementConstraints
.targetNotIn(PlacementConstraints.NODE,
PlacementConstraints.PlacementTargets
.allocationTag("mapper"),
PlacementConstraints.PlacementTargets
.allocationTag("reducer"),
PlacementConstraints.PlacementTargets.nodePartition(""))
.build()).resourceSizing(
ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
.build(), true);
// Invalid (with multiple node partition target expression specified)
assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
.allocationRequestId(10L).priority(Priority.newInstance(1))
.placementConstraintExpression(PlacementConstraints
.targetNotIn(PlacementConstraints.NODE,
PlacementConstraints.PlacementTargets
.allocationTag("mapper"),
PlacementConstraints.PlacementTargets
.allocationTag(""),
PlacementConstraints.PlacementTargets.nodePartition("x"))
.build()).resourceSizing(
ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
.build(), true);
// Invalid (not anti-affinity cardinality)
assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
.allocationRequestId(10L).priority(Priority.newInstance(1))
.placementConstraintExpression(PlacementConstraints
.targetCardinality(PlacementConstraints.NODE, 1, 2,
PlacementConstraints.PlacementTargets
.allocationTag("mapper"),
PlacementConstraints.PlacementTargets.nodePartition(""))
.build()).resourceSizing(
ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
.build(), true);
// Invalid (not anti-affinity cardinality)
assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
.allocationRequestId(10L).priority(Priority.newInstance(1))
.placementConstraintExpression(PlacementConstraints
.targetCardinality(PlacementConstraints.NODE, 0, 2,
PlacementConstraints.PlacementTargets
.allocationTag("mapper"),
PlacementConstraints.PlacementTargets.nodePartition(""))
.build()).resourceSizing(
ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
.build(), true);
// Invalid (not NODE scope)
assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED))
.allocationRequestId(10L).priority(Priority.newInstance(1))
.placementConstraintExpression(PlacementConstraints
.targetNotIn(PlacementConstraints.RACK,
PlacementConstraints.PlacementTargets
.allocationTag("mapper", "reducer"),
PlacementConstraints.PlacementTargets.nodePartition(""))
.build()).resourceSizing(
ResourceSizing.newInstance(1, Resource.newInstance(1024, 1)))
.build(), true);
// Invalid (not GUARANTEED)
assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType(
ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC))