YARN-7778. Merging of placement constraints defined at different levels. Contributed by Weiwei Yang.
This commit is contained in:
parent
b6e50fad53
commit
50723889cc
@ -24,6 +24,8 @@ import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.List;
|
||||
import java.util.ArrayList;
|
||||
import java.util.concurrent.locks.ReentrantReadWriteLock;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
@ -33,6 +35,7 @@ import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
|
||||
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -236,6 +239,45 @@ public class MemoryPlacementConstraintManager
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public PlacementConstraint getMultilevelConstraint(ApplicationId appId,
|
||||
Set<String> sourceTags, PlacementConstraint schedulingRequestConstraint) {
|
||||
List<PlacementConstraint> constraints = new ArrayList<>();
|
||||
// Add scheduling request-level constraint.
|
||||
if (schedulingRequestConstraint != null) {
|
||||
constraints.add(schedulingRequestConstraint);
|
||||
}
|
||||
// Add app-level constraint if appId is given.
|
||||
if (appId != null && sourceTags != null
|
||||
&& !sourceTags.isEmpty()) {
|
||||
constraints.add(getConstraint(appId, sourceTags));
|
||||
}
|
||||
// Add global constraint.
|
||||
if (sourceTags != null && !sourceTags.isEmpty()) {
|
||||
constraints.add(getGlobalConstraint(sourceTags));
|
||||
}
|
||||
|
||||
// Remove all null or duplicate constraints.
|
||||
List<PlacementConstraint.AbstractConstraint> allConstraints =
|
||||
constraints.stream()
|
||||
.filter(placementConstraint -> placementConstraint != null
|
||||
&& placementConstraint.getConstraintExpr() != null)
|
||||
.map(PlacementConstraint::getConstraintExpr)
|
||||
.distinct()
|
||||
.collect(Collectors.toList());
|
||||
|
||||
// Compose an AND constraint
|
||||
// When merge request(RC), app(AC) and global constraint(GC),
|
||||
// we do a merge on them with CC=AND(GC, AC, RC) and returns a
|
||||
// composite AND constraint. Subsequently we check if CC could
|
||||
// be satisfied. This ensures that every level of constraint
|
||||
// is satisfied.
|
||||
PlacementConstraint.And andConstraint = PlacementConstraints.and(
|
||||
allConstraints.toArray(new PlacementConstraint
|
||||
.AbstractConstraint[allConstraints.size()]));
|
||||
return andConstraint.build();
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterApplication(ApplicationId appId) {
|
||||
try {
|
||||
|
@ -104,6 +104,19 @@ public interface PlacementConstraintManager {
|
||||
*/
|
||||
PlacementConstraint getGlobalConstraint(Set<String> sourceTags);
|
||||
|
||||
/**
|
||||
* Consider all levels of constraints (scheduling request, app, cluster) and
|
||||
* return a merged constraint.
|
||||
*
|
||||
* @param applicationId application ID
|
||||
* @param sourceTags a set of source allocation tags
|
||||
* @param schedulingRequestConstraint placement constraint at scheduling
|
||||
* request level
|
||||
* @return a merged placement constraint
|
||||
*/
|
||||
PlacementConstraint getMultilevelConstraint(ApplicationId applicationId,
|
||||
Set<String> sourceTags, PlacementConstraint schedulingRequestConstraint);
|
||||
|
||||
/**
|
||||
* Remove the constraints that correspond to a given application.
|
||||
*
|
||||
|
@ -248,22 +248,14 @@ public final class PlacementConstraintsUtil {
|
||||
SchedulingRequest request, SchedulerNode schedulerNode,
|
||||
PlacementConstraintManager pcm, AllocationTagsManager atm)
|
||||
throws InvalidAllocationTagsQueryException {
|
||||
// TODO do proper merge on different level of constraints, see YARN-7778.
|
||||
|
||||
// Request level constraint
|
||||
PlacementConstraint constraint = request.getPlacementConstraint();
|
||||
if (constraint == null) {
|
||||
// Application level constraint
|
||||
constraint = pcm.getConstraint(applicationId,
|
||||
request.getAllocationTags());
|
||||
if (constraint == null) {
|
||||
// Global level constraint
|
||||
constraint = pcm.getGlobalConstraint(request.getAllocationTags());
|
||||
if (constraint == null) {
|
||||
return true;
|
||||
}
|
||||
}
|
||||
Set<String> sourceTags = null;
|
||||
PlacementConstraint pc = null;
|
||||
if (request != null) {
|
||||
sourceTags = request.getAllocationTags();
|
||||
pc = request.getPlacementConstraint();
|
||||
}
|
||||
return canSatisfyConstraints(applicationId, constraint, schedulerNode, atm);
|
||||
return canSatisfyConstraints(applicationId,
|
||||
pcm.getMultilevelConstraint(applicationId, sourceTags, pc),
|
||||
schedulerNode, atm);
|
||||
}
|
||||
}
|
||||
|
@ -34,8 +34,11 @@ import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import java.util.stream.Stream;
|
||||
|
||||
import com.google.common.collect.Sets;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
|
||||
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
|
||||
import org.apache.hadoop.yarn.api.resource.PlacementConstraint.And;
|
||||
import org.apache.hadoop.yarn.api.resource.PlacementConstraints;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
import org.junit.Assert;
|
||||
@ -179,4 +182,83 @@ public class TestPlacementConstraintManagerService {
|
||||
Assert.assertTrue(pcm.validateConstraint(sourceTag1, c1));
|
||||
Assert.assertFalse(pcm.validateConstraint(sourceTag4, c1));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testGetRequestConstraint() {
|
||||
// Request Constraint(RC), App Constraint(AC), Global Constraint(GC)
|
||||
PlacementConstraint constraint;
|
||||
And mergedConstraint;
|
||||
SchedulingRequest request;
|
||||
|
||||
// RC = c1
|
||||
// AC = null
|
||||
// GC = null
|
||||
constraint = pcm.getMultilevelConstraint(appId1, null, c1);
|
||||
Assert.assertTrue(constraint.getConstraintExpr() instanceof And);
|
||||
mergedConstraint = (And) constraint.getConstraintExpr();
|
||||
Assert.assertEquals(1, mergedConstraint.getChildren().size());
|
||||
Assert.assertEquals(c1, mergedConstraint.getChildren().get(0).build());
|
||||
|
||||
// RC = null
|
||||
// AC = tag1->c1, tag2->c2
|
||||
// GC = null
|
||||
pcm.registerApplication(appId1, constraintMap1);
|
||||
// if the source tag in the request is not mapped to any existing
|
||||
// registered constraint, we should get an empty AND constraint.
|
||||
constraint = pcm.getMultilevelConstraint(appId1,
|
||||
Sets.newHashSet("not_exist_tag"), null);
|
||||
Assert.assertTrue(constraint.getConstraintExpr() instanceof And);
|
||||
mergedConstraint = (And) constraint.getConstraintExpr();
|
||||
// AND()
|
||||
Assert.assertEquals(0, mergedConstraint.getChildren().size());
|
||||
// if a mapping is found for a given source tag
|
||||
constraint = pcm.getMultilevelConstraint(appId1, sourceTag1, null);
|
||||
Assert.assertTrue(constraint.getConstraintExpr() instanceof And);
|
||||
mergedConstraint = (And) constraint.getConstraintExpr();
|
||||
// AND(c1)
|
||||
Assert.assertEquals(1, mergedConstraint.getChildren().size());
|
||||
Assert.assertEquals(c1, mergedConstraint.getChildren().get(0).build());
|
||||
pcm.unregisterApplication(appId1);
|
||||
|
||||
// RC = null
|
||||
// AC = null
|
||||
// GC = tag1->c1
|
||||
pcm.addGlobalConstraint(sourceTag1, c1, true);
|
||||
constraint = pcm.getMultilevelConstraint(appId1,
|
||||
Sets.newHashSet(sourceTag1), null);
|
||||
Assert.assertTrue(constraint.getConstraintExpr() instanceof And);
|
||||
mergedConstraint = (And) constraint.getConstraintExpr();
|
||||
// AND(c1)
|
||||
Assert.assertEquals(1, mergedConstraint.getChildren().size());
|
||||
Assert.assertEquals(c1, mergedConstraint.getChildren().get(0).build());
|
||||
pcm.removeGlobalConstraint(sourceTag1);
|
||||
|
||||
// RC = c2
|
||||
// AC = tag1->c1, tag2->c2
|
||||
// GC = tag1->c3
|
||||
pcm.addGlobalConstraint(sourceTag1, c3, true);
|
||||
pcm.registerApplication(appId1, constraintMap1);
|
||||
// both RC, AC and GC should be respected
|
||||
constraint = pcm.getMultilevelConstraint(appId1, sourceTag1, c2);
|
||||
Assert.assertTrue(constraint.getConstraintExpr() instanceof And);
|
||||
mergedConstraint = (And) constraint.getConstraintExpr();
|
||||
// AND(c1, c2, c3)
|
||||
Assert.assertEquals(3, mergedConstraint.getChildren().size());
|
||||
pcm.removeGlobalConstraint(sourceTag1);
|
||||
pcm.unregisterApplication(appId1);
|
||||
|
||||
// RC = c1
|
||||
// AC = tag1->c1, tag2->c2
|
||||
// GC = tag1->c2
|
||||
pcm.addGlobalConstraint(sourceTag1, c2, true);
|
||||
pcm.registerApplication(appId1, constraintMap1);
|
||||
constraint = pcm.getMultilevelConstraint(appId1,
|
||||
Sets.newHashSet(sourceTag1), c1);
|
||||
Assert.assertTrue(constraint.getConstraintExpr() instanceof And);
|
||||
mergedConstraint = (And) constraint.getConstraintExpr();
|
||||
// AND(c1, c2)
|
||||
Assert.assertEquals(2, mergedConstraint.getChildren().size());
|
||||
pcm.removeGlobalConstraint(sourceTag1);
|
||||
pcm.unregisterApplication(appId1);
|
||||
}
|
||||
}
|
||||
|
@ -36,6 +36,8 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.Scheduli
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.AllocationTagsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.InvalidAllocationTagsQueryException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.MemoryPlacementConstraintManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.PlacementConstraintManager;
|
||||
import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
@ -76,10 +78,13 @@ public class TestSingleConstraintAppPlacementAllocator {
|
||||
// Create allocation tags manager
|
||||
AllocationTagsManager allocationTagsManager = new AllocationTagsManager(
|
||||
rmContext);
|
||||
PlacementConstraintManager placementConstraintManager =
|
||||
new MemoryPlacementConstraintManager();
|
||||
spyAllocationTagsManager = spy(allocationTagsManager);
|
||||
schedulerRequestKey = new SchedulerRequestKey(Priority.newInstance(1), 2L,
|
||||
TestUtils.getMockContainerId(1, 1));
|
||||
rmContext.setAllocationTagsManager(spyAllocationTagsManager);
|
||||
rmContext.setPlacementConstraintManager(placementConstraintManager);
|
||||
|
||||
// Create allocator
|
||||
allocator = new SingleConstraintAppPlacementAllocator();
|
||||
|
Loading…
x
Reference in New Issue
Block a user