From a4c539fcdba817e313b2375abf2c4c9a1d13a4fd Mon Sep 17 00:00:00 2001
From: Arun Suresh <asuresh@apache.org>
Date: Tue, 23 Jan 2018 08:15:58 -0800
Subject: [PATCH] YARN-7783. Add validation step to ensure constraints are not
 violated due to order in which a request is processed. (asuresh)

---
 .../algorithm/DefaultPlacementAlgorithm.java | 119 ++++++++++++++++--
 .../constraint/TestPlacementProcessor.java   |  49 ++++++++
 2 files changed, 155 insertions(+), 13 deletions(-)

diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
index 9887749e3a..4e6473f56c 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java
@@ -22,6 +22,7 @@ import java.util.Iterator;
 import java.util.List;
 
 import org.apache.hadoop.yarn.api.records.ApplicationId;
+import org.apache.hadoop.yarn.api.records.ResourceSizing;
 import org.apache.hadoop.yarn.api.records.SchedulingRequest;
 import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
 import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
@@ -69,13 +70,9 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
   public boolean attemptPlacementOnNode(ApplicationId appId,
       SchedulingRequest schedulingRequest, SchedulerNode schedulerNode)
       throws InvalidAllocationTagsQueryException {
-    int numAllocs = schedulingRequest.getResourceSizing().getNumAllocations();
-    if (numAllocs > 0) {
-      if (PlacementConstraintsUtil.canSatisfyConstraints(appId,
-          schedulingRequest, schedulerNode,
-          constraintManager, tagsManager)) {
-        return true;
-      }
+    if (PlacementConstraintsUtil.canSatisfyConstraints(appId,
+        schedulingRequest, schedulerNode, constraintManager, tagsManager)) {
+      return true;
     }
     return false;
   }
@@ -93,6 +90,9 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
     int rePlacementCount = RE_ATTEMPT_COUNT;
     while (rePlacementCount > 0) {
       doPlacement(requests, resp, allNodes, rejectedRequests);
+      // Double check if placement constraints are really satisfied
+      validatePlacement(requests.getApplicationId(), resp,
+          rejectedRequests);
       if (rejectedRequests.size() == 0 || rePlacementCount == 1) {
         break;
       }
@@ -122,9 +122,14 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
         break;
       }
       SchedulingRequest schedulingRequest = requestIterator.next();
+      PlacedSchedulingRequest placedReq =
+          new PlacedSchedulingRequest(schedulingRequest);
+      placedReq.setPlacementAttempt(requests.getPlacementAttempt());
+      resp.getPlacedRequests().add(placedReq);
       CircularIterator<SchedulerNode> nodeIter =
           new CircularIterator(lastSatisfiedNode, nIter, allNodes);
-      int numAllocs = schedulingRequest.getResourceSizing().getNumAllocations();
+      int numAllocs =
+          schedulingRequest.getResourceSizing().getNumAllocations();
       while (nodeIter.hasNext() && numAllocs > 0) {
         SchedulerNode node = nodeIter.next();
         try {
@@ -135,11 +140,7 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
               requests.getApplicationId(), schedulingRequest, node)) {
             schedulingRequest.getResourceSizing()
                 .setNumAllocations(--numAllocs);
-            PlacedSchedulingRequest placedReq =
-                new PlacedSchedulingRequest(schedulingRequest);
-            placedReq.setPlacementAttempt(requests.getPlacementAttempt());
             placedReq.getNodes().add(node);
-            resp.getPlacedRequests().add(placedReq);
             numAllocs =
                 schedulingRequest.getResourceSizing().getNumAllocations();
             // Add temp-container tags for current placement cycle
@@ -156,6 +157,98 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
     // Add all requests whose numAllocations still > 0 to rejected list.
     requests.getSchedulingRequests().stream()
         .filter(sReq -> sReq.getResourceSizing().getNumAllocations() > 0)
-        .forEach(rejReq -> rejectedRequests.add(rejReq));
+        .forEach(rejReq -> rejectedRequests.add(cloneReq(rejReq)));
   }
+
+  /**
+   * During the placement phase, allocation tags are added to the node if the
+   * constraint is satisfied. But depending on the order in which the
+   * algorithm sees the requests, it is possible that a constraint that
+   * happened to be valid during placement of an earlier-seen request might
+   * no longer be valid after all subsequent requests have been placed.
+   *
+   * For example:
+   * Assume nodes n1, n2, n3, n4 and n5.
+   *
+   * Consider the 2 constraints:
+   * 1) "foo", anti-affinity with "foo"
+   * 2) "bar", anti-affinity with "foo"
+   *
+   * And 2 requests:
+   * req1: NumAllocations = 4, allocTags = [foo]
+   * req2: NumAllocations = 1, allocTags = [bar]
+   *
+   * If "req1" is seen first, the algorithm can place its 4 containers on
+   * n1, n2, n3 and n4. When it gets to "req2", it will see that 4 nodes
+   * already have the "foo" tag and will place it on n5.
+   * But if "req2" is seen first, then "bar" will be placed on any node,
+   * since no node currently has "foo", and when it gets to "req1", since
+   * "foo" has no anti-affinity with "bar", the algorithm can end up placing
+   * "foo" on the node with "bar", violating the second constraint.
+   *
+   * To prevent the above, we need a validation step: after the placements
+   * for a batch of requests are made, for each request, we remove its tags
+   * from the node and check whether the constraints would still be satisfied
+   * if the tags were added back on the node.
+   *
+   * When applied to the example above, after "req2" and "req1" are placed,
+   * we remove the "bar" tag from the node and try to add it back.
+   * This time, constraint satisfaction will fail, since there is now a
+   * "foo" tag on the node and "bar" cannot be added. The algorithm will
+   * then retry placing "req2" on another node.
+   *
+   * @param applicationId
+   * @param resp
+   * @param rejectedRequests
+   */
+  private void validatePlacement(ApplicationId applicationId,
+      ConstraintPlacementAlgorithmOutput resp,
+      List<SchedulingRequest> rejectedRequests) {
+    Iterator<PlacedSchedulingRequest> pReqIter =
+        resp.getPlacedRequests().iterator();
+    while (pReqIter.hasNext()) {
+      PlacedSchedulingRequest pReq = pReqIter.next();
+      Iterator<SchedulerNode> nodeIter = pReq.getNodes().iterator();
+      // Assuming all reqs were satisfied.
+      int num = 0;
+      while (nodeIter.hasNext()) {
+        SchedulerNode node = nodeIter.next();
+        try {
+          // Remove just the tags for this placement.
+          this.tagsManager.removeTempTags(node.getNodeID(),
+              applicationId, pReq.getSchedulingRequest().getAllocationTags());
+          if (!attemptPlacementOnNode(
+              applicationId, pReq.getSchedulingRequest(), node)) {
+            nodeIter.remove();
+            num++;
+          } else {
+            // Add back the tags if everything is fine.
+            this.tagsManager.addTempTags(node.getNodeID(),
+                applicationId, pReq.getSchedulingRequest().getAllocationTags());
+          }
+        } catch (InvalidAllocationTagsQueryException e) {
+          LOG.warn("Got exception from TagManager !", e);
+        }
+      }
+      if (num > 0) {
+        SchedulingRequest sReq = cloneReq(pReq.getSchedulingRequest());
+        sReq.getResourceSizing().setNumAllocations(num);
+        rejectedRequests.add(sReq);
+      }
+      if (pReq.getNodes().isEmpty()) {
+        pReqIter.remove();
+      }
+    }
+  }
+
+  private static SchedulingRequest cloneReq(SchedulingRequest sReq) {
+    return SchedulingRequest.newInstance(
+        sReq.getAllocationRequestId(), sReq.getPriority(),
+        sReq.getExecutionType(), sReq.getAllocationTags(),
+        ResourceSizing.newInstance(
+            sReq.getResourceSizing().getNumAllocations(),
+            sReq.getResourceSizing().getResources()),
+        sReq.getPlacementConstraint());
+  }
+
 }
diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
index 65daeb8ff4..8426b20b79 100644
--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
+++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java
@@ -150,6 +150,55 @@ public class TestPlacementProcessor {
     Assert.assertEquals(4, nodeIds.size());
   }
 
+  @Test(timeout = 300000)
+  public void testMutualAntiAffinityPlacement() throws Exception {
+    HashMap<NodeId, MockNM> nodes = new HashMap<>();
+    MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm1.getNodeId(), nm1);
+    MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm2.getNodeId(), nm2);
+    MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm3.getNodeId(), nm3);
+    MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm4.getNodeId(), nm4);
+    MockNM nm5 = new MockNM("h5:1234", 4096, rm.getResourceTrackerService());
+    nodes.put(nm5.getNodeId(), nm5);
+    nm1.registerNode();
+    nm2.registerNode();
+    nm3.registerNode();
+    nm4.registerNode();
+    nm5.registerNode();
+
+    RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "default");
+    // Containers with allocationTag 'foo' are restricted to 1 per NODE
+    Map<Set<String>, PlacementConstraint> pcMap = new HashMap<>();
+    pcMap.put(Collections.singleton("foo"),
+        PlacementConstraints.build(
+            PlacementConstraints.targetNotIn(NODE, allocationTag("foo"))));
+    pcMap.put(Collections.singleton("bar"),
+        PlacementConstraints.build(
+            PlacementConstraints.targetNotIn(NODE, allocationTag("foo"))));
+    MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2, pcMap);
+    am1.addSchedulingRequest(
+        Arrays.asList(schedulingRequest(1, 1, 1, 512, "bar"),
+            schedulingRequest(1, 2, 1, 512, "foo"),
+            schedulingRequest(1, 3, 1, 512, "foo"),
+            schedulingRequest(1, 4, 1, 512, "foo"),
+            schedulingRequest(1, 5, 1, 512, "foo")));
+    AllocateResponse allocResponse = am1.schedule(); // send the request
+    List<Container> allocatedContainers = new ArrayList<>();
+    allocatedContainers.addAll(allocResponse.getAllocatedContainers());
+
+    // kick the scheduler
+    waitForContainerAllocation(nodes.values(), am1, allocatedContainers, 5);
+
+    Assert.assertEquals(5, allocatedContainers.size());
+    Set<NodeId> nodeIds = allocatedContainers.stream().map(x -> x.getNodeId())
+        .collect(Collectors.toSet());
+    // Ensure unique nodes (antiaffinity)
+    Assert.assertEquals(5, nodeIds.size());
+  }
+
   @Test(timeout = 300000)
   public void testCardinalityPlacement() throws Exception {
     HashMap<NodeId, MockNM> nodes = new HashMap<>();
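
Reviewer note, not part of the patch: the sketch below is a minimal, self-contained Java illustration of the ordering problem described in the validatePlacement() javadoc and of the remove/re-check validation pass. The node names, tag names and the tiny constraint model are illustrative assumptions only; they are not YARN APIs.

import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashSet;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

public class AntiAffinityOrderingSketch {

  // Each node is modeled simply as the set of allocation tags placed on it.
  private static final Map<String, Set<String>> NODES = new LinkedHashMap<>();

  // A tag with anti-affinity against targetTag may only go on a node that
  // does not already hold targetTag.
  private static boolean canPlace(String node, String targetTag) {
    return !NODES.get(node).contains(targetTag);
  }

  // Greedy placement in node order, mirroring a single placement pass.
  private static String place(String tag, String antiAffinityTarget) {
    for (String n : NODES.keySet()) {
      if (canPlace(n, antiAffinityTarget)) {
        NODES.get(n).add(tag);
        return n;
      }
    }
    return null;
  }

  public static void main(String[] args) {
    for (String n : Arrays.asList("n1", "n2", "n3", "n4", "n5")) {
      NODES.put(n, new HashSet<>());
    }

    // req2 ("bar", anti-affinity with "foo") happens to be seen first;
    // no node has "foo" yet, so "bar" lands on n1.
    String barNode = place("bar", "foo");

    // req1 (4 x "foo", anti-affinity with "foo") is seen next; "foo" has no
    // constraint against "bar", so one "foo" also lands on n1.
    List<String> fooNodes = new ArrayList<>();
    for (int i = 0; i < 4; i++) {
      fooNodes.add(place("foo", "foo"));
    }
    System.out.println("bar -> " + barNode + ", foo -> " + fooNodes);

    // Validation pass (what validatePlacement does with temp tags): remove
    // "bar" from its node and re-check its constraint. It now fails because
    // the node has picked up "foo", so "bar" is rejected and retried.
    NODES.get(barNode).remove("bar");
    System.out.println("bar still valid on " + barNode + "? "
        + canPlace(barNode, "foo"));
  }
}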