diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java index 914f35d7bf..54e4666729 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/SingleConstraintAppPlacementAllocator.java @@ -19,18 +19,15 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; import com.google.common.annotations.VisibleForTesting; -import com.google.common.collect.ImmutableSet; import org.apache.commons.collections.IteratorUtils; import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; -import org.apache.hadoop.util.StringUtils; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.api.records.ResourceSizing; import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.records.impl.pb.SchedulingRequestPBImpl; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; -import org.apache.hadoop.yarn.api.resource.PlacementConstraints; import org.apache.hadoop.yarn.exceptions.SchedulerInvalidResoureRequestException; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; @@ -48,12 +45,12 @@ import java.util.Collection; import java.util.Collections; -import java.util.HashSet; import java.util.Iterator; import java.util.Map; import java.util.Set; import java.util.concurrent.locks.ReentrantReadWriteLock; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE; import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE_PARTITION; /** @@ -70,7 +67,6 @@ public class SingleConstraintAppPlacementAllocator private SchedulingRequest schedulingRequest = null; private String targetNodePartition; - private Set targetAllocationTags; private AllocationTagsManager allocationTagsManager; private PlacementConstraintManager placementConstraintManager; @@ -239,135 +235,55 @@ private void validateAndSetSchedulingRequest(SchedulingRequest "Only GUARANTEED execution type is supported."); } - // Node partition - String nodePartition = null; - // Target allocation tags - Set targetAllocationTags = null; - - PlacementConstraint constraint = - newSchedulingRequest.getPlacementConstraint(); - - if (constraint != null) { - // We only accept SingleConstraint - PlacementConstraint.AbstractConstraint ac = constraint - .getConstraintExpr(); - if (!(ac instanceof PlacementConstraint.SingleConstraint)) { - throwExceptionWithMetaInfo("Only accepts " - + PlacementConstraint.SingleConstraint.class.getName() - + " as constraint-expression. Rejecting the new added " - + "constraint-expression.class=" + ac.getClass().getName()); - } - - PlacementConstraint.SingleConstraint singleConstraint = - (PlacementConstraint.SingleConstraint) ac; - - // Make sure it is an anti-affinity request (actually this implementation - // should be able to support both affinity / anti-affinity without much - // effort. Considering potential test effort required. Limit to - // anti-affinity to intra-app and scope is node. - if (!singleConstraint.getScope().equals(PlacementConstraints.NODE)) { - throwExceptionWithMetaInfo( - "Only support scope=" + PlacementConstraints.NODE - + "now. PlacementConstraint=" + singleConstraint); - } - - if (singleConstraint.getMinCardinality() != 0 - || singleConstraint.getMaxCardinality() != 0) { - throwExceptionWithMetaInfo( - "Only support anti-affinity, which is: minCardinality=0, " - + "maxCardinality=1"); - } - - Set targetExpressionSet = - singleConstraint.getTargetExpressions(); - if (targetExpressionSet == null || targetExpressionSet.isEmpty()) { - throwExceptionWithMetaInfo( - "TargetExpression should not be null or empty"); - } - - for (PlacementConstraint.TargetExpression targetExpression : - targetExpressionSet) { - // Handle node partition - if (targetExpression.getTargetType().equals( - PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE)) { - // For node attribute target, we only support Partition now. And once - // YARN-3409 is merged, we will support node attribute. - if (!targetExpression.getTargetKey().equals(NODE_PARTITION)) { - throwExceptionWithMetaInfo("When TargetType=" - + PlacementConstraint.TargetExpression.TargetType.NODE_ATTRIBUTE - + " only " + NODE_PARTITION + " is accepted as TargetKey."); - } - - if (nodePartition != null) { - // This means we have duplicated node partition entry - // inside placement constraint, which might be set by mistake. - throwExceptionWithMetaInfo( - "Only one node partition targetExpression is allowed"); - } - - Set values = targetExpression.getTargetValues(); - if (values == null || values.isEmpty()) { - nodePartition = RMNodeLabelsManager.NO_LABEL; - continue; - } - - if (values.size() > 1) { - throwExceptionWithMetaInfo("Inside one targetExpression, we only " - + "support affinity to at most one node partition now"); - } - - nodePartition = values.iterator().next(); - } else if (targetExpression.getTargetType().equals( - PlacementConstraint.TargetExpression.TargetType.ALLOCATION_TAG)) { - // Handle allocation tags - if (targetAllocationTags != null) { - // This means we have duplicated AllocationTag expressions entries - // inside placement constraint, which might be set by mistake. - throwExceptionWithMetaInfo( - "Only one AllocationTag targetExpression is allowed"); - } - - if (targetExpression.getTargetValues() == null || - targetExpression.getTargetValues().isEmpty()) { - throwExceptionWithMetaInfo("Failed to find allocation tags from " - + "TargetExpressions or couldn't find self-app target."); - } - - targetAllocationTags = new HashSet<>( - targetExpression.getTargetValues()); - } - } - - if (targetAllocationTags == null) { - // That means we don't have ALLOCATION_TAG specified - throwExceptionWithMetaInfo( - "Couldn't find target expression with type == ALLOCATION_TAG," - + " it is required to include one and only one target" - + " expression with type == ALLOCATION_TAG"); - } - } - - // If this scheduling request doesn't contain a placement constraint, - // we set allocation tags an empty set. - if (targetAllocationTags == null) { - targetAllocationTags = ImmutableSet.of(); - } - - if (nodePartition == null) { - nodePartition = RMNodeLabelsManager.NO_LABEL; - } - - // Validation is done. set local results: - this.targetNodePartition = nodePartition; - this.targetAllocationTags = targetAllocationTags; - + this.targetNodePartition = validateAndGetTargetNodePartition( + newSchedulingRequest.getPlacementConstraint()); this.schedulingRequest = new SchedulingRequestPBImpl( ((SchedulingRequestPBImpl) newSchedulingRequest).getProto()); - LOG.info("Successfully added SchedulingRequest to app=" + appSchedulingInfo - .getApplicationAttemptId() + " targetAllocationTags=[" + StringUtils - .join(",", targetAllocationTags) + "]. nodePartition=" - + targetNodePartition); + + LOG.info("Successfully added SchedulingRequest to app=" + + appSchedulingInfo.getApplicationAttemptId() + + " placementConstraint=[" + + schedulingRequest.getPlacementConstraint() + + "]. nodePartition=" + targetNodePartition); + } + + // Tentatively find out potential exist node-partition in the placement + // constraint and set as the app's primary node-partition. + // Currently only single constraint is handled. + private String validateAndGetTargetNodePartition( + PlacementConstraint placementConstraint) { + String nodePartition = RMNodeLabelsManager.NO_LABEL; + if (placementConstraint != null && + placementConstraint.getConstraintExpr() != null) { + PlacementConstraint.AbstractConstraint ac = + placementConstraint.getConstraintExpr(); + if (ac != null && ac instanceof PlacementConstraint.SingleConstraint) { + PlacementConstraint.SingleConstraint singleConstraint = + (PlacementConstraint.SingleConstraint) ac; + for (PlacementConstraint.TargetExpression targetExpression : + singleConstraint.getTargetExpressions()) { + // Handle node partition + if (targetExpression.getTargetType().equals(NODE_ATTRIBUTE) && + targetExpression.getTargetKey().equals(NODE_PARTITION)) { + Set values = targetExpression.getTargetValues(); + if (values == null || values.isEmpty()) { + continue; + } + if (values.size() > 1) { + throwExceptionWithMetaInfo( + "Inside one targetExpression, we only support" + + " affinity to at most one node partition now"); + } + nodePartition = values.iterator().next(); + if (nodePartition != null) { + break; + } + } + } + } + } + return nodePartition; } @Override @@ -515,11 +431,6 @@ String getTargetNodePartition() { return targetNodePartition; } - @VisibleForTesting - Set getTargetAllocationTags() { - return targetAllocationTags; - } - @Override public void initialize(AppSchedulingInfo appSchedulingInfo, SchedulerRequestKey schedulerRequestKey, RMContext rmContext) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java index eb4c626f95..2ad439189f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/MockRM.java @@ -513,6 +513,19 @@ public RMApp submitApp(int masterMemory) throws Exception { return submitApp(masterMemory, false); } + public RMApp submitApp(int masterMemory, Set appTags) + throws Exception { + Resource resource = Resource.newInstance(masterMemory, 0); + ResourceRequest amResourceRequest = ResourceRequest.newInstance( + Priority.newInstance(0), ResourceRequest.ANY, resource, 1); + return submitApp(Collections.singletonList(amResourceRequest), "", + UserGroupInformation.getCurrentUser().getShortUserName(), null, false, + null, super.getConfig().getInt(YarnConfiguration.RM_AM_MAX_ATTEMPTS, + YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS), null, null, true, + false, false, null, 0, null, true, Priority.newInstance(0), null, + null, null, appTags); + } + public RMApp submitApp(int masterMemory, Priority priority) throws Exception { Resource resource = Resource.newInstance(masterMemory, 0); return submitApp(resource, "", UserGroupInformation.getCurrentUser() @@ -732,8 +745,23 @@ public RMApp submitApp(List amResourceRequests, String name, LogAggregationContext logAggregationContext, boolean cancelTokensWhenComplete, Priority priority, String amLabel, Map applicationTimeouts, - ByteBuffer tokensConf) - throws Exception { + ByteBuffer tokensConf) throws Exception { + return submitApp(amResourceRequests, name, user, acls, unmanaged, queue, + maxAppAttempts, ts, appType, waitForAccepted, keepContainers, + isAppIdProvided, applicationId, attemptFailuresValidityInterval, + logAggregationContext, cancelTokensWhenComplete, priority, amLabel, + applicationTimeouts, tokensConf, null); + } + + public RMApp submitApp(List amResourceRequests, String name, + String user, Map acls, boolean unmanaged, + String queue, int maxAppAttempts, Credentials ts, String appType, + boolean waitForAccepted, boolean keepContainers, boolean isAppIdProvided, + ApplicationId applicationId, long attemptFailuresValidityInterval, + LogAggregationContext logAggregationContext, + boolean cancelTokensWhenComplete, Priority priority, String amLabel, + Map applicationTimeouts, + ByteBuffer tokensConf, Set applicationTags) throws Exception { ApplicationId appId = isAppIdProvided ? applicationId : null; ApplicationClientProtocol client = getClientRMService(); if (! isAppIdProvided) { @@ -749,6 +777,9 @@ public RMApp submitApp(List amResourceRequests, String name, sub.setApplicationId(appId); sub.setApplicationName(name); sub.setMaxAppAttempts(maxAppAttempts); + if (applicationTags != null) { + sub.setApplicationTags(applicationTags); + } if (applicationTimeouts != null && applicationTimeouts.size() > 0) { sub.setApplicationTimeouts(applicationTimeouts); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java index f23fd8f06f..26c709f12a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestSchedulingRequestContainerAllocation.java @@ -26,12 +26,18 @@ import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; -import org.apache.hadoop.yarn.api.resource.PlacementConstraints; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.TargetApplicationsNamespace; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceSizing; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; +import org.apache.hadoop.yarn.api.records.AllocationTagNamespaceType; +import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.Container; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttempt; import org.apache.hadoop.yarn.server.resourcemanager.MockAM; import org.apache.hadoop.yarn.server.resourcemanager.MockNM; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; @@ -46,10 +52,24 @@ import org.junit.Before; import org.junit.Test; -import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.*; +import java.util.ArrayList; +import java.util.Arrays; +import java.util.HashMap; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.HashSet; +import java.util.concurrent.ConcurrentMap; + +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTagWithNamespace; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.and; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.cardinality; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn; public class TestSchedulingRequestContainerAllocation { - private final int GB = 1024; + private static final int GB = 1024; private YarnConfiguration conf; @@ -435,8 +455,7 @@ public RMNodeLabelsManager createNodeLabelManager() { CapacityScheduler cs = (CapacityScheduler) rm1.getResourceScheduler(); - PlacementConstraint constraint = PlacementConstraints - .targetNotIn("node", allocationTag("t1")) + PlacementConstraint constraint = targetNotIn("node", allocationTag("t1")) .build(); SchedulingRequest sc = SchedulingRequest .newInstance(0, Priority.newInstance(1), @@ -477,4 +496,413 @@ public RMNodeLabelsManager createNodeLabelManager() { rm1.close(); } + + private void doNodeHeartbeat(MockNM... nms) throws Exception { + for (MockNM nm : nms) { + nm.nodeHeartbeat(true); + } + } + + private List waitForAllocation(int allocNum, int timeout, + MockAM am, MockNM... nms) throws Exception { + final List result = new ArrayList<>(); + GenericTestUtils.waitFor(() -> { + try { + AllocateResponse response = am.schedule(); + List allocated = response.getAllocatedContainers(); + System.out.println("Expecting allocation: " + allocNum + + ", actual allocation: " + allocated.size()); + for (Container c : allocated) { + System.out.println("Container " + c.getId().toString() + + " is allocated on node: " + c.getNodeId().toString() + + ", allocation tags: " + + String.join(",", c.getAllocationTags())); + } + result.addAll(allocated); + if (result.size() == allocNum) { + return true; + } + doNodeHeartbeat(nms); + } catch (Exception e) { + e.printStackTrace(); + } + return false; + }, 500, timeout); + return result; + } + + private static SchedulingRequest schedulingRequest(int requestId, + int containers, int cores, int mem, PlacementConstraint constraint, + String... tags) { + return schedulingRequest(1, requestId, containers, cores, mem, + ExecutionType.GUARANTEED, constraint, tags); + } + + private static SchedulingRequest schedulingRequest( + int priority, long allocReqId, int containers, int cores, int mem, + ExecutionType execType, PlacementConstraint constraint, String... tags) { + return SchedulingRequest.newBuilder() + .priority(Priority.newInstance(priority)) + .allocationRequestId(allocReqId) + .allocationTags(new HashSet<>(Arrays.asList(tags))) + .executionType(ExecutionTypeRequest.newInstance(execType, true)) + .resourceSizing( + ResourceSizing.newInstance(containers, + Resource.newInstance(mem, cores))) + .placementConstraintExpression(constraint) + .build(); + } + + private int getContainerNodesNum(List containers) { + Set nodes = new HashSet<>(); + if (containers != null) { + containers.forEach(c -> nodes.add(c.getNodeId())); + } + return nodes.size(); + } + + @Test(timeout = 30000L) + public void testInterAppCompositeConstraints() throws Exception { + // This test both intra and inter app constraints. + // Including simple affinity, anti-affinity, cardinality constraints, + // and simple AND composite constraints. + YarnConfiguration config = new YarnConfiguration(); + config.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER); + config.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + MockRM rm = new MockRM(config); + try { + rm.start(); + + MockNM nm1 = rm.registerNode("192.168.0.1:1234", 100*GB, 100); + MockNM nm2 = rm.registerNode("192.168.0.2:1234", 100*GB, 100); + MockNM nm3 = rm.registerNode("192.168.0.3:1234", 100*GB, 100); + MockNM nm4 = rm.registerNode("192.168.0.4:1234", 100*GB, 100); + MockNM nm5 = rm.registerNode("192.168.0.5:1234", 100*GB, 100); + + RMApp app1 = rm.submitApp(1*GB, ImmutableSet.of("hbase")); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm1); + + // App1 (hbase) + // h1: hbase-master(1) + // h2: hbase-master(1) + // h3: + // h4: + // h5: + PlacementConstraint pc = targetNotIn("node", + allocationTag("hbase-master")).build(); + am1.addSchedulingRequest( + ImmutableList.of( + schedulingRequest(1, 2, 1, 2048, pc, "hbase-master"))); + List allocated = waitForAllocation(2, 3000, am1, nm1, nm2); + + // 2 containers allocated + Assert.assertEquals(2, allocated.size()); + // containers should be distributed on 2 different nodes + Assert.assertEquals(2, getContainerNodesNum(allocated)); + + // App1 (hbase) + // h1: hbase-rs(1), hbase-master(1) + // h2: hbase-rs(1), hbase-master(1) + // h3: hbase-rs(1) + // h4: hbase-rs(1) + // h5: + pc = targetNotIn("node", allocationTag("hbase-rs")).build(); + am1.addSchedulingRequest( + ImmutableList.of( + schedulingRequest(2, 4, 1, 1024, pc, "hbase-rs"))); + allocated = waitForAllocation(4, 3000, am1, nm1, nm2, nm3, nm4, nm5); + + Assert.assertEquals(4, allocated.size()); + Assert.assertEquals(4, getContainerNodesNum(allocated)); + + // App2 (web-server) + // Web server instance has 2 instance and non of them can be co-allocated + // with hbase-master. + RMApp app2 = rm.submitApp(1*GB, ImmutableSet.of("web-server")); + MockAM am2 = MockRM.launchAndRegisterAM(app2, rm, nm2); + + // App2 (web-server) + // h1: hbase-rs(1), hbase-master(1) + // h2: hbase-rs(1), hbase-master(1) + // h3: hbase-rs(1), ws-inst(1) + // h4: hbase-rs(1), ws-inst(1) + // h5: + pc = and( + targetIn("node", allocationTagWithNamespace( + new TargetApplicationsNamespace.All().toString(), + "hbase-master")), + targetNotIn("node", allocationTag("ws-inst"))).build(); + am2.addSchedulingRequest( + ImmutableList.of( + schedulingRequest(1, 2, 1, 2048, pc, "ws-inst"))); + allocated = waitForAllocation(2, 3000, am2, nm1, nm2, nm3, nm4, nm5); + Assert.assertEquals(2, allocated.size()); + Assert.assertEquals(2, getContainerNodesNum(allocated)); + + ConcurrentMap rmNodes = rm.getRMContext().getRMNodes(); + for (Container c : allocated) { + RMNode rmNode = rmNodes.get(c.getNodeId()); + Assert.assertNotNull(rmNode); + Assert.assertTrue("If ws-inst is allocated to a node," + + " this node should have inherited the ws-inst tag ", + rmNode.getAllocationTagsWithCount().get("ws-inst") == 1); + Assert.assertTrue("ws-inst should be co-allocated to " + + "hbase-master nodes", + rmNode.getAllocationTagsWithCount().get("hbase-master") == 1); + } + + // App3 (ws-servant) + // App3 has multiple instances that must be co-allocated + // with app2 server instance, and each node cannot have more than + // 3 instances. + RMApp app3 = rm.submitApp(1*GB, ImmutableSet.of("ws-servants")); + MockAM am3 = MockRM.launchAndRegisterAM(app3, rm, nm3); + + + // App3 (ws-servant) + // h1: hbase-rs(1), hbase-master(1) + // h2: hbase-rs(1), hbase-master(1) + // h3: hbase-rs(1), ws-inst(1), ws-servant(3) + // h4: hbase-rs(1), ws-inst(1), ws-servant(3) + // h5: + pc = and( + targetIn("node", allocationTagWithNamespace( + new TargetApplicationsNamespace.AppTag("web-server").toString(), + "ws-inst")), + cardinality("node", 0, 2, "ws-servant")).build(); + am3.addSchedulingRequest( + ImmutableList.of( + schedulingRequest(1, 10, 1, 512, pc, "ws-servant"))); + // total 6 containers can be allocated due to cardinality constraint + // each round, 2 containers can be allocated + allocated = waitForAllocation(6, 10000, am3, nm1, nm2, nm3, nm4, nm5); + Assert.assertEquals(6, allocated.size()); + Assert.assertEquals(2, getContainerNodesNum(allocated)); + + for (Container c : allocated) { + RMNode rmNode = rmNodes.get(c.getNodeId()); + Assert.assertNotNull(rmNode); + Assert.assertTrue("Node has ws-servant allocated must have 3 instances", + rmNode.getAllocationTagsWithCount().get("ws-servant") == 3); + Assert.assertTrue("Every ws-servant container should be co-allocated" + + " with ws-inst", + rmNode.getAllocationTagsWithCount().get("ws-inst") == 1); + } + } finally { + rm.stop(); + } + } + + @Test(timeout = 30000L) + public void testMultiAllocationTagsConstraints() throws Exception { + // This test simulates to use PC to avoid port conflicts + YarnConfiguration config = new YarnConfiguration(); + config.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER); + config.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + MockRM rm = new MockRM(config); + try { + rm.start(); + + MockNM nm1 = rm.registerNode("192.168.0.1:1234", 10*GB, 10); + MockNM nm2 = rm.registerNode("192.168.0.2:1234", 10*GB, 10); + MockNM nm3 = rm.registerNode("192.168.0.3:1234", 10*GB, 10); + MockNM nm4 = rm.registerNode("192.168.0.4:1234", 10*GB, 10); + MockNM nm5 = rm.registerNode("192.168.0.5:1234", 10*GB, 10); + + RMApp app1 = rm.submitApp(1*GB, ImmutableSet.of("server1")); + // Allocate AM container on nm1 + doNodeHeartbeat(nm1); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + + // App1 uses ports: 7000, 8000 and 9000 + String[] server1Ports = + new String[] {"port_6000", "port_7000", "port_8000"}; + PlacementConstraint pc = targetNotIn("node", + allocationTagWithNamespace(AllocationTagNamespaceType.ALL.toString(), + server1Ports)) + .build(); + am1.addSchedulingRequest( + ImmutableList.of( + schedulingRequest(1, 2, 1, 1024, pc, server1Ports))); + List allocated = waitForAllocation(2, 3000, + am1, nm1, nm2, nm3, nm4, nm5); + + // 2 containers allocated + Assert.assertEquals(2, allocated.size()); + // containers should be distributed on 2 different nodes + Assert.assertEquals(2, getContainerNodesNum(allocated)); + + // App1 uses ports: 6000 + String[] server2Ports = new String[] {"port_6000"}; + RMApp app2 = rm.submitApp(1*GB, ImmutableSet.of("server2")); + // Allocate AM container on nm1 + doNodeHeartbeat(nm2); + RMAppAttempt app2attempt1 = app2.getCurrentAppAttempt(); + MockAM am2 = rm.sendAMLaunched(app2attempt1.getAppAttemptId()); + am2.registerAppAttempt(); + + pc = targetNotIn("node", + allocationTagWithNamespace(AllocationTagNamespaceType.ALL.toString(), + server2Ports)) + .build(); + am2.addSchedulingRequest( + ImmutableList.of( + schedulingRequest(1, 3, 1, 1024, pc, server2Ports))); + allocated = waitForAllocation(3, 3000, am2, nm1, nm2, nm3, nm4, nm5); + Assert.assertEquals(3, allocated.size()); + Assert.assertEquals(3, getContainerNodesNum(allocated)); + + ConcurrentMap rmNodes = rm.getRMContext().getRMNodes(); + for (Container c : allocated) { + RMNode rmNode = rmNodes.get(c.getNodeId()); + Assert.assertNotNull(rmNode); + Assert.assertTrue("server2 should not co-allocate to server1 as" + + " they both need to use port 6000", + rmNode.getAllocationTagsWithCount().get("port_6000") == 1); + Assert.assertFalse(rmNode.getAllocationTagsWithCount() + .containsKey("port_7000")); + Assert.assertFalse(rmNode.getAllocationTagsWithCount() + .containsKey("port_8000")); + } + } finally { + rm.stop(); + } + } + + @Test(timeout = 30000L) + public void testInterAppConstraintsWithNamespaces() throws Exception { + // This test verifies inter-app constraints with namespaces + // not-self/app-id/app-tag + YarnConfiguration config = new YarnConfiguration(); + config.set(YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_HANDLER, + YarnConfiguration.SCHEDULER_RM_PLACEMENT_CONSTRAINTS_HANDLER); + config.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + MockRM rm = new MockRM(config); + try { + rm.start(); + + MockNM nm1 = rm.registerNode("192.168.0.1:1234:", 100*GB, 100); + MockNM nm2 = rm.registerNode("192.168.0.2:1234", 100*GB, 100); + MockNM nm3 = rm.registerNode("192.168.0.3:1234", 100*GB, 100); + MockNM nm4 = rm.registerNode("192.168.0.4:1234", 100*GB, 100); + MockNM nm5 = rm.registerNode("192.168.0.5:1234", 100*GB, 100); + + ApplicationId app5Id = null; + Map> allocMap = new HashMap<>(); + // 10 apps and all containers are attached with foo tag + for (int i = 0; i<10; i++) { + // App1 ~ app5 tag "former5" + // App6 ~ app10 tag "latter5" + String applicationTag = i<5 ? "former5" : "latter5"; + RMApp app = rm.submitApp(1*GB, ImmutableSet.of(applicationTag)); + // Allocate AM container on nm1 + doNodeHeartbeat(nm1, nm2, nm3, nm4, nm5); + RMAppAttempt attempt = app.getCurrentAppAttempt(); + MockAM am = rm.sendAMLaunched(attempt.getAppAttemptId()); + am.registerAppAttempt(); + + PlacementConstraint pc = targetNotIn("node", allocationTag("foo")) + .build(); + am.addSchedulingRequest( + ImmutableList.of( + schedulingRequest(1, 3, 1, 1024, pc, "foo"))); + List allocated = waitForAllocation(3, 3000, + am, nm1, nm2, nm3, nm4, nm5); + // Memorize containers that has app5 foo + if (i == 5) { + app5Id = am.getApplicationAttemptId().getApplicationId(); + } + allocMap.put(am.getApplicationAttemptId().getApplicationId(), + allocated); + } + + Assert.assertNotNull(app5Id); + Assert.assertEquals(3, getContainerNodesNum(allocMap.get(app5Id))); + + // *** app-id + // Submit another app, use app-id constraint against app5 + RMApp app1 = rm.submitApp(1*GB, ImmutableSet.of("xyz")); + // Allocate AM container on nm1 + doNodeHeartbeat(nm1); + RMAppAttempt attempt1 = app1.getCurrentAppAttempt(); + MockAM am1 = rm.sendAMLaunched(attempt1.getAppAttemptId()); + am1.registerAppAttempt(); + + PlacementConstraint pc = targetIn("node", + allocationTagWithNamespace( + new TargetApplicationsNamespace.AppID(app5Id).toString(), + "foo")) + .build(); + am1.addSchedulingRequest( + ImmutableList.of( + schedulingRequest(1, 3, 1, 1024, pc, "foo"))); + List allocated = waitForAllocation(3, 3000, + am1, nm1, nm2, nm3, nm4, nm5); + + ConcurrentMap rmNodes = rm.getRMContext().getRMNodes(); + List app5Alloc = allocMap.get(app5Id); + for (Container c : allocated) { + RMNode rmNode = rmNodes.get(c.getNodeId()); + Assert.assertNotNull(rmNode); + Assert.assertTrue("This app is affinity with app-id/app5/foo " + + "containers", + app5Alloc.stream().anyMatch( + c5 -> c5.getNodeId() == c.getNodeId())); + } + + // *** app-tag + RMApp app2 = rm.submitApp(1*GB); + // Allocate AM container on nm1 + doNodeHeartbeat(nm2); + RMAppAttempt app2attempt1 = app2.getCurrentAppAttempt(); + MockAM am2 = rm.sendAMLaunched(app2attempt1.getAppAttemptId()); + am2.registerAppAttempt(); + + pc = targetNotIn("node", + allocationTagWithNamespace( + new TargetApplicationsNamespace.AppTag("xyz").toString(), + "foo")) + .build(); + am2.addSchedulingRequest( + ImmutableList.of( + schedulingRequest(1, 2, 1, 1024, pc, "foo"))); + allocated = waitForAllocation(2, 3000, am2, nm1, nm2, nm3, nm4, nm5); + Assert.assertEquals(2, allocated.size()); + + // none of them can be allocated to nodes that has app5 foo containers + for (Container c : app5Alloc) { + Assert.assertNotEquals(c.getNodeId(), + allocated.iterator().next().getNodeId()); + } + + // *** not-self + RMApp app3 = rm.submitApp(1*GB); + // Allocate AM container on nm1 + doNodeHeartbeat(nm3); + RMAppAttempt app3attempt1 = app3.getCurrentAppAttempt(); + MockAM am3 = rm.sendAMLaunched(app3attempt1.getAppAttemptId()); + am3.registerAppAttempt(); + + pc = cardinality("node", + new TargetApplicationsNamespace.NotSelf().toString(), + 1, 1, "foo").build(); + am3.addSchedulingRequest( + ImmutableList.of( + schedulingRequest(1, 1, 1, 1024, pc, "foo"))); + allocated = waitForAllocation(1, 3000, am3, nm1, nm2, nm3, nm4, nm5); + Assert.assertEquals(1, allocated.size()); + // All 5 containers should be allocated + Assert.assertTrue(rmNodes.get(allocated.iterator().next().getNodeId()) + .getAllocationTagsWithCount().get("foo") == 2); + } finally { + rm.stop(); + } + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java index ccf428143d..902c6d5a55 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement; -import com.google.common.collect.ImmutableSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTags; import org.apache.hadoop.yarn.api.records.ExecutionType; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; @@ -131,8 +130,6 @@ public void testSchedulingRequestValidation() { .build()).resourceSizing( ResourceSizing.newInstance(1, Resource.newInstance(1024, 1))) .build()); - Assert.assertEquals(ImmutableSet.of("mapper", "reducer"), - allocator.getTargetAllocationTags()); Assert.assertEquals("", allocator.getTargetNodePartition()); // Valid (with partition) @@ -147,8 +144,6 @@ public void testSchedulingRequestValidation() { .build()).resourceSizing( ResourceSizing.newInstance(1, Resource.newInstance(1024, 1))) .build()); - Assert.assertEquals(ImmutableSet.of("mapper", "reducer"), - allocator.getTargetAllocationTags()); Assert.assertEquals("x", allocator.getTargetNodePartition()); // Valid (without specifying node partition) @@ -162,8 +157,6 @@ public void testSchedulingRequestValidation() { .resourceSizing( ResourceSizing.newInstance(1, Resource.newInstance(1024, 1))) .build()); - Assert.assertEquals(ImmutableSet.of("mapper", "reducer"), - allocator.getTargetAllocationTags()); Assert.assertEquals("", allocator.getTargetNodePartition()); // Valid (with application Id target) @@ -178,8 +171,6 @@ public void testSchedulingRequestValidation() { ResourceSizing.newInstance(1, Resource.newInstance(1024, 1))) .build()); // Allocation tags should not include application Id - Assert.assertEquals(ImmutableSet.of("mapper", "reducer"), - allocator.getTargetAllocationTags()); Assert.assertEquals("", allocator.getTargetNodePartition()); // Invalid (without sizing) @@ -200,75 +191,6 @@ public void testSchedulingRequestValidation() { .targetNotIn(PlacementConstraints.NODE).build()) .build(), true); - // Invalid (with multiple allocation tags expression specified) - assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType( - ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED)) - .allocationRequestId(10L).priority(Priority.newInstance(1)) - .placementConstraintExpression(PlacementConstraints - .targetNotIn(PlacementConstraints.NODE, - PlacementConstraints.PlacementTargets - .allocationTag("mapper"), - PlacementConstraints.PlacementTargets - .allocationTag("reducer"), - PlacementConstraints.PlacementTargets.nodePartition("")) - .build()).resourceSizing( - ResourceSizing.newInstance(1, Resource.newInstance(1024, 1))) - .build(), true); - - // Invalid (with multiple node partition target expression specified) - assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType( - ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED)) - .allocationRequestId(10L).priority(Priority.newInstance(1)) - .placementConstraintExpression(PlacementConstraints - .targetNotIn(PlacementConstraints.NODE, - PlacementConstraints.PlacementTargets - .allocationTag("mapper"), - PlacementConstraints.PlacementTargets - .allocationTag(""), - PlacementConstraints.PlacementTargets.nodePartition("x")) - .build()).resourceSizing( - ResourceSizing.newInstance(1, Resource.newInstance(1024, 1))) - .build(), true); - - // Invalid (not anti-affinity cardinality) - assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType( - ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED)) - .allocationRequestId(10L).priority(Priority.newInstance(1)) - .placementConstraintExpression(PlacementConstraints - .targetCardinality(PlacementConstraints.NODE, 1, 2, - PlacementConstraints.PlacementTargets - .allocationTag("mapper"), - PlacementConstraints.PlacementTargets.nodePartition("")) - .build()).resourceSizing( - ResourceSizing.newInstance(1, Resource.newInstance(1024, 1))) - .build(), true); - - // Invalid (not anti-affinity cardinality) - assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType( - ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED)) - .allocationRequestId(10L).priority(Priority.newInstance(1)) - .placementConstraintExpression(PlacementConstraints - .targetCardinality(PlacementConstraints.NODE, 0, 2, - PlacementConstraints.PlacementTargets - .allocationTag("mapper"), - PlacementConstraints.PlacementTargets.nodePartition("")) - .build()).resourceSizing( - ResourceSizing.newInstance(1, Resource.newInstance(1024, 1))) - .build(), true); - - // Invalid (not NODE scope) - assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType( - ExecutionTypeRequest.newInstance(ExecutionType.GUARANTEED)) - .allocationRequestId(10L).priority(Priority.newInstance(1)) - .placementConstraintExpression(PlacementConstraints - .targetNotIn(PlacementConstraints.RACK, - PlacementConstraints.PlacementTargets - .allocationTag("mapper", "reducer"), - PlacementConstraints.PlacementTargets.nodePartition("")) - .build()).resourceSizing( - ResourceSizing.newInstance(1, Resource.newInstance(1024, 1))) - .build(), true); - // Invalid (not GUARANTEED) assertInvalidSchedulingRequest(SchedulingRequest.newBuilder().executionType( ExecutionTypeRequest.newInstance(ExecutionType.OPPORTUNISTIC))