diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintTransformations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintTransformations.java index 62da092201..aa92d7a373 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintTransformations.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/api/resource/TestPlacementConstraintTransformations.java @@ -156,7 +156,7 @@ public void testCompositeConstraint() { SingleConstraintTransformer singleTransformer = new SingleConstraintTransformer(specConstraint); PlacementConstraint simConstraint = singleTransformer.transform(); - Assert.assertTrue(constraintExpr instanceof Or); + Assert.assertTrue(simConstraint.getConstraintExpr() instanceof Or); Or simOrExpr = (Or) specConstraint.getConstraintExpr(); for (AbstractConstraint child : simOrExpr.getChildren()) { Assert.assertTrue(child instanceof SingleConstraint); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java index 199dd62c21..6396e5722b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java @@ -28,6 +28,8 @@ import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.AbstractConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint.Or; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.SingleConstraint; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression; import org.apache.hadoop.yarn.api.resource.PlacementConstraint.TargetExpression.TargetType; @@ -149,6 +151,48 @@ private static boolean canSatisfySingleConstraint(ApplicationId applicationId, return true; } + /** + * Returns true if all child constraints are satisfied. + * @param appId application id + * @param constraint Or constraint + * @param node node + * @param atm allocation tags manager + * @return true if all child constraints are satisfied, false otherwise + * @throws InvalidAllocationTagsQueryException + */ + private static boolean canSatisfyAndConstraint(ApplicationId appId, + And constraint, SchedulerNode node, AllocationTagsManager atm) + throws InvalidAllocationTagsQueryException { + // Iterate over the constraints tree, if found any child constraint + // isn't satisfied, return false. + for (AbstractConstraint child : constraint.getChildren()) { + if(!canSatisfyConstraints(appId, child.build(), node, atm)) { + return false; + } + } + return true; + } + + /** + * Returns true as long as any of child constraint is satisfied. + * @param appId application id + * @param constraint Or constraint + * @param node node + * @param atm allocation tags manager + * @return true if any child constraint is satisfied, false otherwise + * @throws InvalidAllocationTagsQueryException + */ + private static boolean canSatisfyOrConstraint(ApplicationId appId, + Or constraint, SchedulerNode node, AllocationTagsManager atm) + throws InvalidAllocationTagsQueryException { + for (AbstractConstraint child : constraint.getChildren()) { + if (canSatisfyConstraints(appId, child.build(), node, atm)) { + return true; + } + } + return false; + } + private static boolean canSatisfyConstraints(ApplicationId appId, PlacementConstraint constraint, SchedulerNode node, AllocationTagsManager atm) @@ -167,9 +211,16 @@ private static boolean canSatisfyConstraints(ApplicationId appId, if (sConstraintExpr instanceof SingleConstraint) { SingleConstraint single = (SingleConstraint) sConstraintExpr; return canSatisfySingleConstraint(appId, single, node, atm); + } else if (sConstraintExpr instanceof And) { + And and = (And) sConstraintExpr; + return canSatisfyAndConstraint(appId, and, node, atm); + } else if (sConstraintExpr instanceof Or) { + Or or = (Or) sConstraintExpr; + return canSatisfyOrConstraint(appId, or, node, atm); } else { throw new InvalidAllocationTagsQueryException( - "Unsupported type of constraint."); + "Unsupported type of constraint: " + + sConstraintExpr.getClass().getSimpleName()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java index a5460c2d7c..5135f636dc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintsUtil.java @@ -21,7 +21,12 @@ import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.RACK; import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetIn; import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetNotIn; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.maxCardinality; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.and; +import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.or; import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; import java.util.AbstractMap; import java.util.Arrays; @@ -34,6 +39,7 @@ import java.util.stream.Stream; import java.util.concurrent.atomic.AtomicLong; +import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -44,12 +50,11 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.ExecutionTypeRequest; import org.apache.hadoop.yarn.api.records.ResourceSizing; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.MockNodes; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerNode; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.Assert; import org.junit.Before; @@ -66,9 +71,10 @@ public class TestPlacementConstraintsUtil { private RMContext rmContext; private static final int GB = 1024; private ApplicationId appId1; - private PlacementConstraint c1, c2, c3, c4; + private PlacementConstraint c1, c2, c3, c4, c5, c6, c7; private Set sourceTag1, sourceTag2; - private Map, PlacementConstraint> constraintMap1, constraintMap2; + private Map, PlacementConstraint> constraintMap1, + constraintMap2, constraintMap3, constraintMap4; private AtomicLong requestID = new AtomicLong(0); @Before @@ -92,6 +98,16 @@ public void setup() { .build(targetNotIn(NODE, allocationTag("hbase-m"))); c4 = PlacementConstraints .build(targetNotIn(RACK, allocationTag("hbase-rs"))); + c5 = PlacementConstraints + .build(and(targetNotIn(NODE, allocationTag("hbase-m")), + maxCardinality(NODE, 3, "spark"))); + c6 = PlacementConstraints + .build(or(targetIn(NODE, allocationTag("hbase-m")), + targetIn(NODE, allocationTag("hbase-rs")))); + c7 = PlacementConstraints + .build(or(targetIn(NODE, allocationTag("hbase-m")), + and(targetIn(NODE, allocationTag("hbase-rs")), + targetIn(NODE, allocationTag("spark"))))); sourceTag1 = new HashSet<>(Arrays.asList("spark")); sourceTag2 = new HashSet<>(Arrays.asList("zk")); @@ -106,6 +122,15 @@ public void setup() { new AbstractMap.SimpleEntry<>(sourceTag2, c4)) .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, AbstractMap.SimpleEntry::getValue)); + constraintMap3 = Stream + .of(new AbstractMap.SimpleEntry<>(sourceTag1, c5)) + .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, + AbstractMap.SimpleEntry::getValue)); + constraintMap4 = Stream + .of(new AbstractMap.SimpleEntry<>(sourceTag1, c6), + new AbstractMap.SimpleEntry<>(sourceTag2, c7)) + .collect(Collectors.toMap(AbstractMap.SimpleEntry::getKey, + AbstractMap.SimpleEntry::getValue)); } private SchedulingRequest createSchedulingRequest(Set allocationTags, @@ -124,6 +149,20 @@ private SchedulingRequest createSchedulingRequest(Set return createSchedulingRequest(allocationTags, null); } + private ContainerId newContainerId(ApplicationId appId) { + return ContainerId.newContainerId( + ApplicationAttemptId.newInstance(appId, 0), 0); + } + + private SchedulerNode newSchedulerNode(String hostname, String rackName, + NodeId nodeId) { + SchedulerNode node = mock(SchedulerNode.class); + when(node.getNodeName()).thenReturn(hostname); + when(node.getRackName()).thenReturn(rackName); + when(node.getNodeID()).thenReturn(nodeId); + return node; + } + @Test public void testNodeAffinityAssignment() throws InvalidAllocationTagsQueryException { @@ -137,8 +176,9 @@ public void testNodeAffinityAssignment() Iterator nodeIterator = rmNodes.iterator(); while (nodeIterator.hasNext()) { RMNode currentNode = nodeIterator.next(); - FiCaSchedulerNode schedulerNode = TestUtils.getMockNode( - currentNode.getHostName(), currentNode.getRackName(), 123, 4 * GB); + SchedulerNode schedulerNode =newSchedulerNode(currentNode.getHostName(), + currentNode.getRackName(), currentNode.getNodeID()); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, createSchedulingRequest(sourceTag1), schedulerNode, pcm, tm)); Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, @@ -153,14 +193,15 @@ public void testNodeAffinityAssignment() RMNode n1_r1 = rmNodes.get(1); RMNode n2_r2 = rmNodes.get(2); RMNode n3_r2 = rmNodes.get(3); - FiCaSchedulerNode schedulerNode0 = TestUtils - .getMockNode(n0_r1.getHostName(), n0_r1.getRackName(), 123, 4 * GB); - FiCaSchedulerNode schedulerNode1 = TestUtils - .getMockNode(n1_r1.getHostName(), n1_r1.getRackName(), 123, 4 * GB); - FiCaSchedulerNode schedulerNode2 = TestUtils - .getMockNode(n2_r2.getHostName(), n2_r2.getRackName(), 123, 4 * GB); - FiCaSchedulerNode schedulerNode3 = TestUtils - .getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB); + SchedulerNode schedulerNode0 =newSchedulerNode(n0_r1.getHostName(), + n0_r1.getRackName(), n0_r1.getNodeID()); + SchedulerNode schedulerNode1 =newSchedulerNode(n1_r1.getHostName(), + n1_r1.getRackName(), n1_r1.getNodeID()); + SchedulerNode schedulerNode2 =newSchedulerNode(n2_r2.getHostName(), + n2_r2.getRackName(), n2_r2.getNodeID()); + SchedulerNode schedulerNode3 =newSchedulerNode(n3_r2.getHostName(), + n3_r2.getRackName(), n3_r2.getNodeID()); + // 1 Containers on node 0 with allocationTag 'hbase-m' ContainerId hbase_m = ContainerId .newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0); @@ -200,14 +241,15 @@ public void testRackAffinityAssignment() .newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0); tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-rs")); - FiCaSchedulerNode schedulerNode0 = TestUtils - .getMockNode(n0_r1.getHostName(), n0_r1.getRackName(), 123, 4 * GB); - FiCaSchedulerNode schedulerNode1 = TestUtils - .getMockNode(n1_r1.getHostName(), n1_r1.getRackName(), 123, 4 * GB); - FiCaSchedulerNode schedulerNode2 = TestUtils - .getMockNode(n2_r2.getHostName(), n2_r2.getRackName(), 123, 4 * GB); - FiCaSchedulerNode schedulerNode3 = TestUtils - .getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB); + SchedulerNode schedulerNode0 =newSchedulerNode(n0_r1.getHostName(), + n0_r1.getRackName(), n0_r1.getNodeID()); + SchedulerNode schedulerNode1 =newSchedulerNode(n1_r1.getHostName(), + n1_r1.getRackName(), n1_r1.getNodeID()); + SchedulerNode schedulerNode2 =newSchedulerNode(n2_r2.getHostName(), + n2_r2.getRackName(), n2_r2.getNodeID()); + SchedulerNode schedulerNode3 =newSchedulerNode(n3_r2.getHostName(), + n3_r2.getRackName(), n3_r2.getNodeID()); + // 'zk' placement on Rack1 should now SUCCEED Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, createSchedulingRequest(sourceTag2), schedulerNode0, pcm, tm)); @@ -238,14 +280,16 @@ public void testNodeAntiAffinityAssignment() RMNode n1_r1 = rmNodes.get(1); RMNode n2_r2 = rmNodes.get(2); RMNode n3_r2 = rmNodes.get(3); - FiCaSchedulerNode schedulerNode0 = TestUtils - .getMockNode(n0_r1.getHostName(), n0_r1.getRackName(), 123, 4 * GB); - FiCaSchedulerNode schedulerNode1 = TestUtils - .getMockNode(n1_r1.getHostName(), n1_r1.getRackName(), 123, 4 * GB); - FiCaSchedulerNode schedulerNode2 = TestUtils - .getMockNode(n2_r2.getHostName(), n2_r2.getRackName(), 123, 4 * GB); - FiCaSchedulerNode schedulerNode3 = TestUtils - .getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB); + + SchedulerNode schedulerNode0 =newSchedulerNode(n0_r1.getHostName(), + n0_r1.getRackName(), n0_r1.getNodeID()); + SchedulerNode schedulerNode1 =newSchedulerNode(n1_r1.getHostName(), + n1_r1.getRackName(), n1_r1.getNodeID()); + SchedulerNode schedulerNode2 =newSchedulerNode(n2_r2.getHostName(), + n2_r2.getRackName(), n2_r2.getNodeID()); + SchedulerNode schedulerNode3 =newSchedulerNode(n3_r2.getHostName(), + n3_r2.getRackName(), n3_r2.getNodeID()); + // 1 Containers on node 0 with allocationTag 'hbase-m' ContainerId hbase_m = ContainerId .newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0); @@ -285,14 +329,14 @@ public void testRackAntiAffinityAssignment() .newContainerId(ApplicationAttemptId.newInstance(appId1, 0), 0); tm.addContainer(n0_r1.getNodeID(), hbase_m, ImmutableSet.of("hbase-rs")); - FiCaSchedulerNode schedulerNode0 = TestUtils - .getMockNode(n0_r1.getHostName(), n0_r1.getRackName(), 123, 4 * GB); - FiCaSchedulerNode schedulerNode1 = TestUtils - .getMockNode(n1_r1.getHostName(), n1_r1.getRackName(), 123, 4 * GB); - FiCaSchedulerNode schedulerNode2 = TestUtils - .getMockNode(n2_r2.getHostName(), n2_r2.getRackName(), 123, 4 * GB); - FiCaSchedulerNode schedulerNode3 = TestUtils - .getMockNode(n3_r2.getHostName(), n3_r2.getRackName(), 123, 4 * GB); + SchedulerNode schedulerNode0 =newSchedulerNode(n0_r1.getHostName(), + n0_r1.getRackName(), n0_r1.getNodeID()); + SchedulerNode schedulerNode1 =newSchedulerNode(n1_r1.getHostName(), + n1_r1.getRackName(), n1_r1.getNodeID()); + SchedulerNode schedulerNode2 =newSchedulerNode(n2_r2.getHostName(), + n2_r2.getRackName(), n2_r2.getNodeID()); + SchedulerNode schedulerNode3 =newSchedulerNode(n3_r2.getHostName(), + n3_r2.getRackName(), n3_r2.getNodeID()); // 'zk' placement on Rack1 should FAIL Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, @@ -306,4 +350,162 @@ public void testRackAntiAffinityAssignment() Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, createSchedulingRequest(sourceTag2), schedulerNode3, pcm, tm)); } + + @Test + public void testORConstraintAssignment() + throws InvalidAllocationTagsQueryException { + AllocationTagsManager tm = new AllocationTagsManager(rmContext); + PlacementConstraintManagerService pcm = + new MemoryPlacementConstraintManager(); + // Register App1 with anti-affinity constraint map. + pcm.registerApplication(appId1, constraintMap4); + RMNode n0r1 = rmNodes.get(0); + RMNode n1r1 = rmNodes.get(1); + RMNode n2r2 = rmNodes.get(2); + RMNode n3r2 = rmNodes.get(3); + + /** + * Place container: + * n0: hbase-m(1) + * n1: "" + * n2: hbase-rs(1) + * n3: "" + */ + tm.addContainer(n0r1.getNodeID(), + newContainerId(appId1), ImmutableSet.of("hbase-m")); + tm.addContainer(n2r2.getNodeID(), + newContainerId(appId1), ImmutableSet.of("hbase-rs")); + Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n0r1.getNodeID()) + .get("hbase-m").longValue()); + Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n2r2.getNodeID()) + .get("hbase-rs").longValue()); + + SchedulerNode schedulerNode0 =newSchedulerNode(n0r1.getHostName(), + n0r1.getRackName(), n0r1.getNodeID()); + SchedulerNode schedulerNode1 =newSchedulerNode(n1r1.getHostName(), + n1r1.getRackName(), n1r1.getNodeID()); + SchedulerNode schedulerNode2 =newSchedulerNode(n2r2.getHostName(), + n2r2.getRackName(), n2r2.getNodeID()); + SchedulerNode schedulerNode3 =newSchedulerNode(n3r2.getHostName(), + n3r2.getRackName(), n3r2.getNodeID()); + + // n0 and n2 should be qualified for allocation as + // they either have hbase-m or hbase-rs tag + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + createSchedulingRequest(sourceTag1), schedulerNode0, pcm, tm)); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + createSchedulingRequest(sourceTag1), schedulerNode1, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + createSchedulingRequest(sourceTag1), schedulerNode2, pcm, tm)); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm)); + + /** + * Place container: + * n0: hbase-m(1) + * n1: "" + * n2: hbase-rs(1) + * n3: hbase-rs(1) + */ + tm.addContainer(n3r2.getNodeID(), + newContainerId(appId1), ImmutableSet.of("hbase-rs")); + // n3 is qualified now because it is allocated with hbase-rs tag + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm)); + + /** + * Place container: + * n0: hbase-m(1) + * n1: "" + * n2: hbase-rs(1), spark(1) + * n3: hbase-rs(1) + */ + // Place + tm.addContainer(n2r2.getNodeID(), + newContainerId(appId1), ImmutableSet.of("spark")); + // According to constraint, "zk" is allowed to be placed on a node + // has "hbase-m" tag OR a node has both "hbase-rs" and "spark" tags. + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + createSchedulingRequest(sourceTag2), schedulerNode0, pcm, tm)); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + createSchedulingRequest(sourceTag2), schedulerNode1, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + createSchedulingRequest(sourceTag2), schedulerNode2, pcm, tm)); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + createSchedulingRequest(sourceTag2), schedulerNode3, pcm, tm)); + } + + @Test + public void testANDConstraintAssignment() + throws InvalidAllocationTagsQueryException { + AllocationTagsManager tm = new AllocationTagsManager(rmContext); + PlacementConstraintManagerService pcm = + new MemoryPlacementConstraintManager(); + // Register App1 with anti-affinity constraint map. + pcm.registerApplication(appId1, constraintMap3); + RMNode n0r1 = rmNodes.get(0); + RMNode n1r1 = rmNodes.get(1); + RMNode n2r2 = rmNodes.get(2); + RMNode n3r2 = rmNodes.get(3); + + /** + * Place container: + * n0: hbase-m(1) + * n1: "" + * n2: hbase-m(1) + * n3: "" + */ + tm.addContainer(n0r1.getNodeID(), + newContainerId(appId1), ImmutableSet.of("hbase-m")); + tm.addContainer(n2r2.getNodeID(), + newContainerId(appId1), ImmutableSet.of("hbase-m")); + Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n0r1.getNodeID()) + .get("hbase-m").longValue()); + Assert.assertEquals(1L, tm.getAllocationTagsWithCount(n2r2.getNodeID()) + .get("hbase-m").longValue()); + + SchedulerNode schedulerNode0 =newSchedulerNode(n0r1.getHostName(), + n0r1.getRackName(), n0r1.getNodeID()); + SchedulerNode schedulerNode1 =newSchedulerNode(n1r1.getHostName(), + n1r1.getRackName(), n1r1.getNodeID()); + SchedulerNode schedulerNode2 =newSchedulerNode(n2r2.getHostName(), + n2r2.getRackName(), n2r2.getNodeID()); + SchedulerNode schedulerNode3 =newSchedulerNode(n3r2.getHostName(), + n3r2.getRackName(), n3r2.getNodeID()); + + // Anti-affinity with hbase-m so it should not be able to be placed + // onto n0 and n2 as they already have hbase-m allocated. + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + createSchedulingRequest(sourceTag1), schedulerNode0, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + createSchedulingRequest(sourceTag1), schedulerNode1, pcm, tm)); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + createSchedulingRequest(sourceTag1), schedulerNode2, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm)); + + /** + * Place container: + * n0: hbase-m(1) + * n1: spark(3) + * n2: hbase-m(1) + * n3: "" + */ + for (int i=0; i<4; i++) { + tm.addContainer(n1r1.getNodeID(), + newContainerId(appId1), ImmutableSet.of("spark")); + } + Assert.assertEquals(4L, tm.getAllocationTagsWithCount(n1r1.getNodeID()) + .get("spark").longValue()); + + // Violate cardinality constraint + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + createSchedulingRequest(sourceTag1), schedulerNode0, pcm, tm)); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + createSchedulingRequest(sourceTag1), schedulerNode1, pcm, tm)); + Assert.assertFalse(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + createSchedulingRequest(sourceTag1), schedulerNode2, pcm, tm)); + Assert.assertTrue(PlacementConstraintsUtil.canSatisfyConstraints(appId1, + createSchedulingRequest(sourceTag1), schedulerNode3, pcm, tm)); + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java index 698c17b257..a530230404 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java @@ -60,6 +60,7 @@ import java.util.stream.Collectors; import static java.lang.Thread.sleep; +import static org.apache.hadoop.yarn.api.records.RejectionReason.COULD_NOT_PLACE_ON_NODE; import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.NODE; import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.PlacementTargets.allocationTag; import static org.apache.hadoop.yarn.api.resource.PlacementConstraints.targetCardinality; @@ -142,7 +143,8 @@ public void testAntiAffinityPlacement() throws Exception { allocatedContainers.addAll(allocResponse.getAllocatedContainers()); // kick the scheduler - waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 4); + waitForContainerAllocation(nodes.values(), am1, + allocatedContainers, new ArrayList<>(), 4); Assert.assertEquals(4, allocatedContainers.size()); Set nodeIds = allocatedContainers.stream().map(x -> x.getNodeId()) @@ -195,7 +197,8 @@ public void testMutualAntiAffinityPlacement() throws Exception { allocatedContainers.addAll(allocResponse.getAllocatedContainers()); // kick the scheduler - waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 5); + waitForContainerAllocation(nodes.values(), am1, + allocatedContainers, new ArrayList<>(), 5); Assert.assertEquals(5, allocatedContainers.size()); Set nodeIds = allocatedContainers.stream().map(x -> x.getNodeId()) @@ -244,7 +247,8 @@ public void testCardinalityPlacement() throws Exception { allocatedContainers.addAll(allocResponse.getAllocatedContainers()); // kick the scheduler - waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 8); + waitForContainerAllocation(nodes.values(), am1, + allocatedContainers, new ArrayList<>(), 8); Assert.assertEquals(8, allocatedContainers.size()); Map nodeIdContainerIdMap = @@ -294,7 +298,8 @@ public void testAffinityPlacement() throws Exception { allocatedContainers.addAll(allocResponse.getAllocatedContainers()); // kick the scheduler - waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 5); + waitForContainerAllocation(nodes.values(), am1, + allocatedContainers, new ArrayList<>(), 5); Assert.assertEquals(5, allocatedContainers.size()); Set nodeIds = allocatedContainers.stream().map(x -> x.getNodeId()) @@ -347,7 +352,8 @@ public void testComplexPlacement() throws Exception { allocatedContainers.addAll(allocResponse.getAllocatedContainers()); // kick the scheduler - waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 6); + waitForContainerAllocation(nodes.values(), am1, + allocatedContainers, new ArrayList<>(), 6); Assert.assertEquals(6, allocatedContainers.size()); Map nodeIdContainerIdMap = @@ -584,7 +590,7 @@ public void testPlacementRejection() throws Exception { // Ensure unique nodes Assert.assertEquals(4, nodeIds.size()); RejectedSchedulingRequest rej = rejectedReqs.get(0); - Assert.assertEquals(RejectionReason.COULD_NOT_PLACE_ON_NODE, + Assert.assertEquals(COULD_NOT_PLACE_ON_NODE, rej.getReason()); QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics(); @@ -592,9 +598,145 @@ public void testPlacementRejection() throws Exception { verifyMetrics(metrics, 11264, 11, 5120, 5, 5); } + @Test(timeout = 300000) + public void testAndOrPlacement() throws Exception { + HashMap nodes = new HashMap<>(); + MockNM nm1 = new MockNM("h1:1234", 40960, 100, + rm.getResourceTrackerService()); + nodes.put(nm1.getNodeId(), nm1); + MockNM nm2 = new MockNM("h2:1234", 40960, 100, + rm.getResourceTrackerService()); + nodes.put(nm2.getNodeId(), nm2); + MockNM nm3 = new MockNM("h3:1234", 40960, 100, + rm.getResourceTrackerService()); + nodes.put(nm3.getNodeId(), nm3); + MockNM nm4 = new MockNM("h4:1234", 40960, 100, + rm.getResourceTrackerService()); + nodes.put(nm4.getNodeId(), nm4); + nm1.registerNode(); + nm2.registerNode(); + nm3.registerNode(); + nm4.registerNode(); + + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default"); + + // Register app1 with following constraints + // 1) foo anti-affinity with foo on node + // 2) bar anti-affinity with foo on node AND maxCardinality = 2 + // 3) moo affinity with foo OR bar + Map, PlacementConstraint> app1Constraints = new HashMap<>(); + app1Constraints.put(Collections.singleton("foo"), + PlacementConstraints.build( + PlacementConstraints.targetNotIn(NODE, allocationTag("foo")))); + app1Constraints.put(Collections.singleton("bar"), + PlacementConstraints.build( + PlacementConstraints.and( + PlacementConstraints.targetNotIn(NODE, allocationTag("foo")), + PlacementConstraints.maxCardinality(NODE, 2, "bar")))); + app1Constraints.put(Collections.singleton("moo"), + PlacementConstraints.build( + PlacementConstraints.or( + PlacementConstraints.targetIn(NODE, allocationTag("foo")), + PlacementConstraints.targetIn(NODE, allocationTag("bar"))))); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2, app1Constraints); + + // Allocates 3 foo containers on 3 different nodes, + // in anti-affinity fashion. + am1.addSchedulingRequest( + Arrays.asList( + schedulingRequest(1, 1, 1, 512, "foo"), + schedulingRequest(1, 2, 1, 512, "foo"), + schedulingRequest(1, 3, 1, 512, "foo") + )); + List allocatedContainers = new ArrayList<>(); + waitForContainerAllocation(nodes.values(), am1, + allocatedContainers, new ArrayList<>(), 3); + printTags(nodes.values(), rm.getRMContext().getAllocationTagsManager()); + Assert.assertEquals(3, allocatedContainers.size()); + + /** Testing AND placement constraint**/ + // Now allocates a bar container, as restricted by the AND constraint, + // bar could be only allocated to the node without foo + am1.addSchedulingRequest( + Arrays.asList( + schedulingRequest(1, 1, 1, 512, "bar") + )); + allocatedContainers.clear(); + waitForContainerAllocation(nodes.values(), am1, + allocatedContainers, new ArrayList<>(), 1); + printTags(nodes.values(), rm.getRMContext().getAllocationTagsManager()); + Assert.assertEquals(1, allocatedContainers.size()); + NodeId barNode = allocatedContainers.get(0).getNodeId(); + + // Sends another 3 bar request, 2 of them can be allocated + // as maxCardinality is 2, for placed containers, they should be all + // on the node where the last bar was placed. + allocatedContainers.clear(); + List rejectedContainers = new ArrayList<>(); + am1.addSchedulingRequest( + Arrays.asList( + schedulingRequest(1, 2, 1, 512, "bar"), + schedulingRequest(1, 3, 1, 512, "bar"), + schedulingRequest(1, 4, 1, 512, "bar") + )); + waitForContainerAllocation(nodes.values(), am1, + allocatedContainers, rejectedContainers, 2); + printTags(nodes.values(), rm.getRMContext().getAllocationTagsManager()); + Assert.assertEquals(2, allocatedContainers.size()); + Assert.assertTrue(allocatedContainers.stream().allMatch( + container -> container.getNodeId().equals(barNode))); + + // The third request could not be satisfied because it violates + // the cardinality constraint. Validate rejected request correctly + // capture this. + Assert.assertEquals(1, rejectedContainers.size()); + Assert.assertEquals(COULD_NOT_PLACE_ON_NODE, + rejectedContainers.get(0).getReason()); + + /** Testing OR placement constraint**/ + // Register one more NM for testing + MockNM nm5 = new MockNM("h5:1234", 4096, 100, + rm.getResourceTrackerService()); + nodes.put(nm5.getNodeId(), nm5); + nm5.registerNode(); + nm5.nodeHeartbeat(true); + + List mooRequests = new ArrayList<>(); + for (int i=5; i<25; i++) { + mooRequests.add(schedulingRequest(1, i, 1, 100, "moo")); + } + am1.addSchedulingRequest(mooRequests); + allocatedContainers.clear(); + waitForContainerAllocation(nodes.values(), am1, + allocatedContainers, new ArrayList<>(), 20); + + // All 20 containers should be allocated onto nodes besides nm5, + // because moo affinity to foo or bar which only exists on rest of nodes. + Assert.assertEquals(20, allocatedContainers.size()); + for (Container mooContainer : allocatedContainers) { + // nm5 has no moo allocated containers. + Assert.assertFalse(mooContainer.getNodeId().equals(nm5.getNodeId())); + } + } + + private static void printTags(Collection nodes, + AllocationTagsManager atm){ + for (MockNM nm : nodes) { + Map nmTags = atm + .getAllocationTagsWithCount(nm.getNodeId()); + StringBuffer sb = new StringBuffer(); + if (nmTags != null) { + nmTags.forEach((tag, count) -> + sb.append(tag + "(" + count + "),")); + LOG.info("nm_" + nm.getNodeId() + ": " + sb.toString()); + } + } + } + private static void waitForContainerAllocation(Collection nodes, - MockAM am, List allocatedContainers, int containerNum) - throws Exception { + MockAM am, List allocatedContainers, + List rejectedRequests, + int containerNum) throws Exception { int attemptCount = 10; while (allocatedContainers.size() < containerNum && attemptCount > 0) { for (MockNM node : nodes) { @@ -605,6 +747,7 @@ private static void waitForContainerAllocation(Collection nodes, sleep(1000); AllocateResponse allocResponse = am.schedule(); allocatedContainers.addAll(allocResponse.getAllocatedContainers()); + rejectedRequests.addAll(allocResponse.getRejectedSchedulingRequests()); attemptCount--; } }