From 6e5ba9366fc05719906ff2789b1a0fd26001182b Mon Sep 17 00:00:00 2001 From: Arun Suresh Date: Fri, 2 Feb 2018 10:28:22 -0800 Subject: [PATCH] YARN-7839. Modify PlacementAlgorithm to Check node capacity before placing request on node. (Panagiotis Garefalakis via asuresh) --- .../scheduler/capacity/CapacityScheduler.java | 4 - .../algorithm/DefaultPlacementAlgorithm.java | 61 +++++++++---- .../ConstraintPlacementAlgorithmOutput.java | 5 +- ...SchedulingRequestWithPlacementAttempt.java | 52 +++++++++++ .../constraint/processor/BatchedRequests.java | 2 +- .../processor/PlacementDispatcher.java | 12 +-- .../processor/PlacementProcessor.java | 28 ++++-- .../constraint/TestPlacementProcessor.java | 87 ++++++++++++++++++- 8 files changed, 215 insertions(+), 36 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/SchedulingRequestWithPlacementAttempt.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index cb01351718..d3aa5cbe72 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -2609,10 +2609,6 @@ public class CapacityScheduler extends " but only 1 will be attempted !!"); } if (!appAttempt.isStopped()) { - Resource resource = - 
schedulingRequest.getResourceSizing().getResources(); - schedulingRequest.getResourceSizing().setResources( - getNormalizedResource(resource)); ResourceCommitRequest resourceCommitRequest = createResourceCommitRequest( appAttempt, schedulingRequest, schedulerNode); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java index 4e6473f56c..710e6c0863 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/algorithm/DefaultPlacementAlgorithm.java @@ -18,10 +18,15 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm; import java.util.ArrayList; +import java.util.HashMap; import java.util.Iterator; import java.util.List; +import java.util.Map; +import java.util.stream.Collectors; import org.apache.hadoop.yarn.api.records.ApplicationId; +import org.apache.hadoop.yarn.api.records.NodeId; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceSizing; import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; @@ -35,8 +40,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.Co import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutput; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutputCollector; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingRequestWithPlacementAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.BatchedRequests; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.NodeCandidateSelector; +import org.apache.hadoop.yarn.util.resource.ResourceCalculator; +import org.apache.hadoop.yarn.util.resource.Resources; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -56,25 +64,31 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm { private LocalAllocationTagsManager tagsManager; private PlacementConstraintManager constraintManager; private NodeCandidateSelector nodeSelector; + private ResourceCalculator resourceCalculator; @Override public void init(RMContext rmContext) { this.tagsManager = new LocalAllocationTagsManager( rmContext.getAllocationTagsManager()); this.constraintManager = rmContext.getPlacementConstraintManager(); + this.resourceCalculator = rmContext.getScheduler().getResourceCalculator(); this.nodeSelector = filter -> ((AbstractYarnScheduler) (rmContext).getScheduler()) .getNodes(filter); } - public boolean attemptPlacementOnNode(ApplicationId appId, - SchedulingRequest schedulingRequest, SchedulerNode schedulerNode) + boolean attemptPlacementOnNode(ApplicationId appId, + Resource availableResources, SchedulingRequest schedulingRequest, + SchedulerNode schedulerNode, boolean ignoreResourceCheck) throws InvalidAllocationTagsQueryException { - if (PlacementConstraintsUtil.canSatisfyConstraints(appId, - schedulingRequest, schedulerNode, constraintManager, tagsManager)) { - return true; - } - return false; + boolean fitsInNode = ignoreResourceCheck || + 
Resources.fitsIn(resourceCalculator, + schedulingRequest.getResourceSizing().getResources(), + availableResources); + boolean constraintsSatisfied = + PlacementConstraintsUtil.canSatisfyConstraints(appId, + schedulingRequest, schedulerNode, constraintManager, tagsManager); + return fitsInNode && constraintsSatisfied; } @@ -82,17 +96,19 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm { public void place(ConstraintPlacementAlgorithmInput input, ConstraintPlacementAlgorithmOutputCollector collector) { BatchedRequests requests = (BatchedRequests) input; + int placementAttempt = requests.getPlacementAttempt(); ConstraintPlacementAlgorithmOutput resp = new ConstraintPlacementAlgorithmOutput(requests.getApplicationId()); List allNodes = nodeSelector.selectNodes(null); List rejectedRequests = new ArrayList<>(); + Map availResources = new HashMap<>(); int rePlacementCount = RE_ATTEMPT_COUNT; while (rePlacementCount > 0) { - doPlacement(requests, resp, allNodes, rejectedRequests); + doPlacement(requests, resp, allNodes, rejectedRequests, availResources); // Double check if placement constraints are really satisfied validatePlacement(requests.getApplicationId(), resp, - rejectedRequests); + rejectedRequests, availResources); if (rejectedRequests.size() == 0 || rePlacementCount == 1) { break; } @@ -103,7 +119,10 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm { rePlacementCount--; } - resp.getRejectedRequests().addAll(rejectedRequests); + resp.getRejectedRequests().addAll( + rejectedRequests.stream().map( + x -> new SchedulingRequestWithPlacementAttempt( + placementAttempt, x)).collect(Collectors.toList())); collector.collect(resp); // Clean current temp-container tags this.tagsManager.cleanTempContainers(requests.getApplicationId()); @@ -112,7 +131,8 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm { private void doPlacement(BatchedRequests requests, 
ConstraintPlacementAlgorithmOutput resp, List allNodes, - List rejectedRequests) { + List rejectedRequests, + Map availableResources) { Iterator requestIterator = requests.iterator(); Iterator nIter = allNodes.iterator(); SchedulerNode lastSatisfiedNode = null; @@ -135,11 +155,17 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm { try { String tag = schedulingRequest.getAllocationTags() == null ? "" : schedulingRequest.getAllocationTags().iterator().next(); + Resource unallocatedResource = + availableResources.computeIfAbsent(node.getNodeID(), + x -> Resource.newInstance(node.getUnallocatedResource())); if (!requests.getBlacklist(tag).contains(node.getNodeID()) && attemptPlacementOnNode( - requests.getApplicationId(), schedulingRequest, node)) { + requests.getApplicationId(), unallocatedResource, + schedulingRequest, node, false)) { schedulingRequest.getResourceSizing() .setNumAllocations(--numAllocs); + Resources.addTo(unallocatedResource, + schedulingRequest.getResourceSizing().getResources()); placedReq.getNodes().add(node); numAllocs = schedulingRequest.getResourceSizing().getNumAllocations(); @@ -200,10 +226,12 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm { * @param applicationId * @param resp * @param rejectedRequests + * @param availableResources */ private void validatePlacement(ApplicationId applicationId, ConstraintPlacementAlgorithmOutput resp, - List rejectedRequests) { + List rejectedRequests, + Map availableResources) { Iterator pReqIter = resp.getPlacedRequests().iterator(); while (pReqIter.hasNext()) { @@ -217,10 +245,13 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm { // Remove just the tags for this placement. 
this.tagsManager.removeTempTags(node.getNodeID(), applicationId, pReq.getSchedulingRequest().getAllocationTags()); - if (!attemptPlacementOnNode( - applicationId, pReq.getSchedulingRequest(), node)) { + Resource availOnNode = availableResources.get(node.getNodeID()); + if (!attemptPlacementOnNode(applicationId, availOnNode, + pReq.getSchedulingRequest(), node, true)) { nodeIter.remove(); num++; + Resources.subtractFrom(availOnNode, + pReq.getSchedulingRequest().getResourceSizing().getResources()); } else { // Add back the tags if everything is fine. this.tagsManager.addTempTags(node.getNodeID(), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/ConstraintPlacementAlgorithmOutput.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/ConstraintPlacementAlgorithmOutput.java index 9571f0e650..952bfbf01f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/ConstraintPlacementAlgorithmOutput.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/ConstraintPlacementAlgorithmOutput.java @@ -18,7 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.SchedulingRequest; import java.util.ArrayList; import java.util.List; @@ -41,14 +40,14 @@ public class ConstraintPlacementAlgorithmOutput { private final List placedRequests = new ArrayList<>(); - private final List rejectedRequests = + private final List rejectedRequests = new 
ArrayList<>(); public List getPlacedRequests() { return placedRequests; } - public List getRejectedRequests() { + public List getRejectedRequests() { return rejectedRequests; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/SchedulingRequestWithPlacementAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/SchedulingRequestWithPlacementAttempt.java new file mode 100644 index 0000000000..e14d23557a --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/api/SchedulingRequestWithPlacementAttempt.java @@ -0,0 +1,52 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + *

+ * http://www.apache.org/licenses/LICENSE-2.0 + *

+ * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api; + +import org.apache.hadoop.yarn.api.records.SchedulingRequest; + +/** + * Holder pairing a SchedulingRequest with an int placement attempt; + * both are fixed at construction and read back through the getters. + */ +public class SchedulingRequestWithPlacementAttempt { + + private final int placementAttempt; + private final SchedulingRequest schedulingRequest; + + public SchedulingRequestWithPlacementAttempt(int placementAttempt, + SchedulingRequest schedulingRequest) { + this.placementAttempt = placementAttempt; + this.schedulingRequest = schedulingRequest; + } + + public int getPlacementAttempt() { + return placementAttempt; + } + + public SchedulingRequest getSchedulingRequest() { + return schedulingRequest; + } + + @Override + public String toString() { + return "SchedulingRequestWithPlacementAttempt{" + + "placementAttempt=" + placementAttempt + + ", schedulingRequest=" + schedulingRequest + + '}'; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java index 8e39b638c5..6badfee03c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java +++
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/BatchedRequests.java @@ -109,7 +109,7 @@ public class BatchedRequests } public void addToBlacklist(Set tags, SchedulerNode node) { - if (tags != null && !tags.isEmpty()) { + if (tags != null && !tags.isEmpty() && node != null) { // We are currently assuming a single allocation tag // per scheduler request currently. blacklist.computeIfAbsent(tags.iterator().next(), diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementDispatcher.java index 6a00ba82c2..849eb210b1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementDispatcher.java @@ -18,12 +18,12 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor; import org.apache.hadoop.yarn.api.records.ApplicationId; -import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutput; import 
org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutputCollector; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingRequestWithPlacementAttempt; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -49,7 +49,7 @@ class PlacementDispatcher implements private Map> placedRequests = new ConcurrentHashMap<>(); - private Map> + private Map> rejectedRequests = new ConcurrentHashMap<>(); public void init(RMContext rmContext, @@ -90,12 +90,12 @@ class PlacementDispatcher implements return Collections.EMPTY_LIST; } - public List pullRejectedRequests( + public List pullRejectedRequests( ApplicationId applicationId) { - List rejectedReqs = + List rejectedReqs = this.rejectedRequests.get(applicationId); if (rejectedReqs != null && !rejectedReqs.isEmpty()) { - List retList = new ArrayList<>(); + List retList = new ArrayList<>(); synchronized (rejectedReqs) { if (rejectedReqs.size() > 0) { retList.addAll(rejectedReqs); @@ -130,7 +130,7 @@ class PlacementDispatcher implements } } if (!placement.getRejectedRequests().isEmpty()) { - List rejected = + List rejected = rejectedRequests.computeIfAbsent( placement.getApplicationId(), k -> new ArrayList()); LOG.warn( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java index 2a6b889c8b..9ce38f4fdd 100644 --- 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/processor/PlacementProcessor.java @@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest; import org.apache.hadoop.yarn.api.records.RejectionReason; +import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceSizing; import org.apache.hadoop.yarn.api.records.SchedulingRequest; import org.apache.hadoop.yarn.api.resource.PlacementConstraint; @@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.Placem import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingRequestWithPlacementAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingResponse; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt; @@ -208,6 +210,12 @@ public class PlacementProcessor implements ApplicationMasterServiceProcessor { private void dispatchRequestsForPlacement(ApplicationAttemptId appAttemptId, List schedulingRequests) { if (schedulingRequests != null && !schedulingRequests.isEmpty()) { 
+ // Normalize the Requests before dispatching + schedulingRequests.forEach(req -> { + Resource reqResource = req.getResourceSizing().getResources(); + req.getResourceSizing() + .setResources(this.scheduler.getNormalizedResource(reqResource)); + }); this.placementDispatcher.dispatch(new BatchedRequests(iteratorType, appAttemptId.getApplicationId(), schedulingRequests, 1)); } @@ -261,20 +269,28 @@ public class PlacementProcessor implements ApplicationMasterServiceProcessor { private void handleRejectedRequests(ApplicationAttemptId appAttemptId, AllocateResponse response) { - List rejectedRequests = + List rejectedAlgoRequests = this.placementDispatcher.pullRejectedRequests( appAttemptId.getApplicationId()); - if (rejectedRequests != null && !rejectedRequests.isEmpty()) { + if (rejectedAlgoRequests != null && !rejectedAlgoRequests.isEmpty()) { LOG.warn("Following requests of [{}] were rejected by" + " the PlacementAlgorithmOutput Algorithm: {}", - appAttemptId.getApplicationId(), rejectedRequests); + appAttemptId.getApplicationId(), rejectedAlgoRequests); + rejectedAlgoRequests.stream() + .filter(req -> req.getPlacementAttempt() < retryAttempts) + .forEach(req -> handleSchedulingResponse( + new Response(false, appAttemptId.getApplicationId(), + req.getSchedulingRequest(), req.getPlacementAttempt(), + null))); ApplicationMasterServiceUtils.addToRejectedSchedulingRequests(response, - rejectedRequests.stream() + rejectedAlgoRequests.stream() + .filter(req -> req.getPlacementAttempt() >= retryAttempts) .map(sr -> RejectedSchedulingRequest.newInstance( - RejectionReason.COULD_NOT_PLACE_ON_NODE, sr)) + RejectionReason.COULD_NOT_PLACE_ON_NODE, + sr.getSchedulingRequest())) .collect(Collectors.toList())); } - rejectedRequests = + List rejectedRequests = this.requestsToReject.get(appAttemptId.getApplicationId()); if (rejectedRequests != null && !rejectedRequests.isEmpty()) { synchronized (rejectedRequests) { diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java index a530230404..c4c0b5df47 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/constraint/TestPlacementProcessor.java @@ -371,6 +371,91 @@ public class TestPlacementProcessor { @Test(timeout = 300000) public void testSchedulerRejection() throws Exception { + stopRM(); + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + csConf.setQueues(CapacitySchedulerConfiguration.ROOT, + new String[] {"a", "b"}); + csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a", 15.0f); + csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".b", 85.0f); + YarnConfiguration conf = new YarnConfiguration(csConf); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + conf.setBoolean( + YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true); + startRM(conf); + + HashMap nodes = new HashMap<>(); + MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm1.getNodeId(), nm1); + MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm2.getNodeId(), nm2); + MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService()); + nodes.put(nm3.getNodeId(), nm3); + MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService()); +
nodes.put(nm4.getNodeId(), nm4); + nm1.registerNode(); + nm2.registerNode(); + nm3.registerNode(); + nm4.registerNode(); + + RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a"); + // Containers with allocationTag 'foo' are restricted to 1 per NODE + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2, + Collections.singletonMap( + Collections.singleton("foo"), + PlacementConstraints.build( + PlacementConstraints.targetNotIn(NODE, allocationTag("foo"))) + )); + am1.addSchedulingRequest( + Arrays.asList( + schedulingRequest(1, 1, 1, 512, "foo"), + schedulingRequest(1, 2, 1, 512, "foo"), + schedulingRequest(1, 3, 1, 512, "foo"), + // Same size as above; presumably over queue "a" limits (confirm) + schedulingRequest(1, 4, 1, 512, "foo")) + ); + AllocateResponse allocResponse = am1.schedule(); // send the request + List allocatedContainers = new ArrayList<>(); + List rejectedReqs = new ArrayList<>(); + int allocCount = 1; + allocatedContainers.addAll(allocResponse.getAllocatedContainers()); + rejectedReqs.addAll(allocResponse.getRejectedSchedulingRequests()); + + // kick the scheduler + while (allocCount < 11) { + nm1.nodeHeartbeat(true); + nm2.nodeHeartbeat(true); + nm3.nodeHeartbeat(true); + nm4.nodeHeartbeat(true); + LOG.info("Waiting for containers to be created for app 1..."); + sleep(1000); + allocResponse = am1.schedule(); + allocatedContainers.addAll(allocResponse.getAllocatedContainers()); + rejectedReqs.addAll(allocResponse.getRejectedSchedulingRequests()); + allocCount++; + if (rejectedReqs.size() > 0 && allocatedContainers.size() > 2) { + break; + } + } + + Assert.assertEquals(3, allocatedContainers.size()); + Set nodeIds = allocatedContainers.stream() + .map(x -> x.getNodeId()).collect(Collectors.toSet()); + // Ensure unique nodes + Assert.assertEquals(3, nodeIds.size()); + RejectedSchedulingRequest rej = rejectedReqs.get(0); + Assert.assertEquals(4, rej.getRequest().getAllocationRequestId()); + Assert.assertEquals(RejectionReason.COULD_NOT_SCHEDULE_ON_NODE, +
rej.getReason()); + + QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics(); + // Verify Metrics + verifyMetrics(metrics, 12288, 12, 4096, 4, 4); + } + + @Test(timeout = 300000) + public void testNodeCapacityRejection() throws Exception { HashMap nodes = new HashMap<>(); MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService()); nodes.put(nm1.getNodeId(), nm1); @@ -432,7 +517,7 @@ public class TestPlacementProcessor { Assert.assertEquals(3, nodeIds.size()); RejectedSchedulingRequest rej = rejectedReqs.get(0); Assert.assertEquals(4, rej.getRequest().getAllocationRequestId()); - Assert.assertEquals(RejectionReason.COULD_NOT_SCHEDULE_ON_NODE, + Assert.assertEquals(RejectionReason.COULD_NOT_PLACE_ON_NODE, rej.getReason()); QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();