diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java index ceff6f6881..5cb8b99f8b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/MemoryPlacementConstraintManager.java @@ -24,6 +24,8 @@ import java.util.HashMap; import java.util.Map; import java.util.Set; +import java.util.List; +import java.util.ArrayList; import java.util.concurrent.locks.ReentrantReadWriteLock; import java.util.stream.Collectors; import java.util.stream.Stream; @@ -33,6 +35,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraints; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -236,6 +239,45 @@ public PlacementConstraint getGlobalConstraint(Set sourceTags) { } } + @Override + public PlacementConstraint getMultilevelConstraint(ApplicationId appId, + Set sourceTags, PlacementConstraint schedulingRequestConstraint) { + List constraints = new ArrayList<>(); + // Add scheduling request-level constraint. + if (schedulingRequestConstraint != null) { + constraints.add(schedulingRequestConstraint); + } + // Add app-level constraint if appId is given. + if (appId != null && sourceTags != null + && !sourceTags.isEmpty()) { + constraints.add(getConstraint(appId, sourceTags)); + } + // Add global constraint. + if (sourceTags != null && !sourceTags.isEmpty()) { + constraints.add(getGlobalConstraint(sourceTags)); + } + + // Remove all null or duplicate constraints. + List allConstraints = + constraints.stream() + .filter(placementConstraint -> placementConstraint != null + && placementConstraint.getConstraintExpr() != null) + .map(PlacementConstraint::getConstraintExpr) + .distinct() + .collect(Collectors.toList()); + + // Compose an AND constraint + // When merge request(RC), app(AC) and global constraint(GC), + // we do a merge on them with CC=AND(GC, AC, RC) and returns a + // composite AND constraint. Subsequently we check if CC could + // be satisfied. This ensures that every level of constraint + // is satisfied. + PlacementConstraint.And andConstraint = PlacementConstraints.and( + allConstraints.toArray(new PlacementConstraint + .AbstractConstraint[allConstraints.size()])); + return andConstraint.build(); + } + @Override public void unregisterApplication(ApplicationId appId) { try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManager.java index 7725d0d1a6..bf2365c147 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintManager.java @@ -104,6 +104,19 @@ PlacementConstraint getConstraint(ApplicationId appId, */ PlacementConstraint getGlobalConstraint(Set sourceTags); + /** + * Consider all levels of constraints (scheduling request, app, cluster) and + * return a merged constraint. + * + * @param applicationId application ID + * @param sourceTags a set of source allocation tags + * @param schedulingRequestConstraint placement constraint at scheduling + * request level + * @return a merged placement constraint + */ + PlacementConstraint getMultilevelConstraint(ApplicationId applicationId, + Set sourceTags, PlacementConstraint schedulingRequestConstraint); + /** * Remove the constraints that correspond to a given application. * diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java index 6396e5722b..ab0bbd7f77 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/PlacementConstraintsUtil.java @@ -248,22 +248,14 @@ public static boolean canSatisfyConstraints(ApplicationId applicationId, SchedulingRequest request, SchedulerNode schedulerNode, PlacementConstraintManager pcm, AllocationTagsManager atm) throws InvalidAllocationTagsQueryException { - // TODO do proper merge on different level of constraints, see YARN-7778. - - // Request level constraint - PlacementConstraint constraint = request.getPlacementConstraint(); - if (constraint == null) { - // Application level constraint - constraint = pcm.getConstraint(applicationId, - request.getAllocationTags()); - if (constraint == null) { - // Global level constraint - constraint = pcm.getGlobalConstraint(request.getAllocationTags()); - if (constraint == null) { - return true; - } - } + Set sourceTags = null; + PlacementConstraint pc = null; + if (request != null) { + sourceTags = request.getAllocationTags(); + pc = request.getPlacementConstraint(); } - return canSatisfyConstraints(applicationId, constraint, schedulerNode, atm); + return canSatisfyConstraints(applicationId, + pcm.getMultilevelConstraint(applicationId, sourceTags, pc), + schedulerNode, atm); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintManagerService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintManagerService.java index abcab1a3a0..976906d2a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintManagerService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementConstraintManagerService.java @@ -34,8 +34,11 @@ import java.util.stream.Collectors; import java.util.stream.Stream; +import com.google.common.collect.Sets; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; +import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And; import org.apache.hadoop.yarn.api.resource.PlacementConstraints; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.junit.Assert; @@ -179,4 +182,83 @@ public void testValidateConstraint() { Assert.assertTrue(pcm.validateConstraint(sourceTag1, c1)); Assert.assertFalse(pcm.validateConstraint(sourceTag4, c1)); } + + @Test + public void testGetRequestConstraint() { + // Request Constraint(RC), App Constraint(AC), Global Constraint(GC) + PlacementConstraint constraint; + And mergedConstraint; + SchedulingRequest request; + + // RC = c1 + // AC = null + // GC = null + constraint = pcm.getMultilevelConstraint(appId1, null, c1); + Assert.assertTrue(constraint.getConstraintExpr() instanceof And); + mergedConstraint = (And) constraint.getConstraintExpr(); + Assert.assertEquals(1, mergedConstraint.getChildren().size()); + Assert.assertEquals(c1, mergedConstraint.getChildren().get(0).build()); + + // RC = null + // AC = tag1->c1, tag2->c2 + // GC = null + pcm.registerApplication(appId1, constraintMap1); + // if the source tag in the request is not mapped to any existing + // registered constraint, we should get an empty AND constraint. + constraint = pcm.getMultilevelConstraint(appId1, + Sets.newHashSet("not_exist_tag"), null); + Assert.assertTrue(constraint.getConstraintExpr() instanceof And); + mergedConstraint = (And) constraint.getConstraintExpr(); + // AND() + Assert.assertEquals(0, mergedConstraint.getChildren().size()); + // if a mapping is found for a given source tag + constraint = pcm.getMultilevelConstraint(appId1, sourceTag1, null); + Assert.assertTrue(constraint.getConstraintExpr() instanceof And); + mergedConstraint = (And) constraint.getConstraintExpr(); + // AND(c1) + Assert.assertEquals(1, mergedConstraint.getChildren().size()); + Assert.assertEquals(c1, mergedConstraint.getChildren().get(0).build()); + pcm.unregisterApplication(appId1); + + // RC = null + // AC = null + // GC = tag1->c1 + pcm.addGlobalConstraint(sourceTag1, c1, true); + constraint = pcm.getMultilevelConstraint(appId1, + Sets.newHashSet(sourceTag1), null); + Assert.assertTrue(constraint.getConstraintExpr() instanceof And); + mergedConstraint = (And) constraint.getConstraintExpr(); + // AND(c1) + Assert.assertEquals(1, mergedConstraint.getChildren().size()); + Assert.assertEquals(c1, mergedConstraint.getChildren().get(0).build()); + pcm.removeGlobalConstraint(sourceTag1); + + // RC = c2 + // AC = tag1->c1, tag2->c2 + // GC = tag1->c3 + pcm.addGlobalConstraint(sourceTag1, c3, true); + pcm.registerApplication(appId1, constraintMap1); + // both RC, AC and GC should be respected + constraint = pcm.getMultilevelConstraint(appId1, sourceTag1, c2); + Assert.assertTrue(constraint.getConstraintExpr() instanceof And); + mergedConstraint = (And) constraint.getConstraintExpr(); + // AND(c1, c2, c3) + Assert.assertEquals(3, mergedConstraint.getChildren().size()); + pcm.removeGlobalConstraint(sourceTag1); + pcm.unregisterApplication(appId1); + + // RC = c1 + // AC = tag1->c1, tag2->c2 + // GC = tag1->c2 + pcm.addGlobalConstraint(sourceTag1, c2, true); + pcm.registerApplication(appId1, constraintMap1); + constraint = pcm.getMultilevelConstraint(appId1, + Sets.newHashSet(sourceTag1), c1); + Assert.assertTrue(constraint.getConstraintExpr() instanceof And); + mergedConstraint = (And) constraint.getConstraintExpr(); + // AND(c1, c2) + Assert.assertEquals(2, mergedConstraint.getChildren().size()); + pcm.removeGlobalConstraint(sourceTag1); + pcm.unregisterApplication(appId1); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java index 3485ea8d4e..9be56ff0c4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/placement/TestSingleConstraintAppPlacementAllocator.java @@ -36,6 +36,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.MemoryPlacementConstraintManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager; import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.junit.Assert; import org.junit.Before; @@ -76,10 +78,13 @@ public void setup() throws Exception { // Create allocation tags manager AllocationTagsManager allocationTagsManager = new AllocationTagsManager( rmContext); + PlacementConstraintManager placementConstraintManager = + new MemoryPlacementConstraintManager(); spyAllocationTagsManager = spy(allocationTagsManager); schedulerRequestKey = new SchedulerRequestKey(Priority.newInstance(1), 2L, TestUtils.getMockContainerId(1, 1)); rmContext.setAllocationTagsManager(spyAllocationTagsManager); + rmContext.setPlacementConstraintManager(placementConstraintManager); // Create allocator allocator = new SingleConstraintAppPlacementAllocator();