YARN-7839. Modify PlacementAlgorithm to Check node capacity before placing request on node. (Panagiotis Garefalakis via asuresh)

This commit is contained in:
Arun Suresh 2018-02-02 10:28:22 -08:00
parent 460d77bd64
commit 6e5ba9366f
8 changed files with 215 additions and 36 deletions

View File

@ -2609,10 +2609,6 @@ public class CapacityScheduler extends
" but only 1 will be attempted !!");
}
if (!appAttempt.isStopped()) {
Resource resource =
schedulingRequest.getResourceSizing().getResources();
schedulingRequest.getResourceSizing().setResources(
getNormalizedResource(resource));
ResourceCommitRequest<FiCaSchedulerApp, FiCaSchedulerNode>
resourceCommitRequest = createResourceCommitRequest(
appAttempt, schedulingRequest, schedulerNode);

View File

@ -18,10 +18,15 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.NodeId;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
@ -35,8 +40,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.Co
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutput;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutputCollector;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingRequestWithPlacementAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.BatchedRequests;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor.NodeCandidateSelector;
import org.apache.hadoop.yarn.util.resource.ResourceCalculator;
import org.apache.hadoop.yarn.util.resource.Resources;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -56,25 +64,31 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
private LocalAllocationTagsManager tagsManager;
private PlacementConstraintManager constraintManager;
private NodeCandidateSelector nodeSelector;
private ResourceCalculator resourceCalculator;
@Override
public void init(RMContext rmContext) {
this.tagsManager = new LocalAllocationTagsManager(
rmContext.getAllocationTagsManager());
this.constraintManager = rmContext.getPlacementConstraintManager();
this.resourceCalculator = rmContext.getScheduler().getResourceCalculator();
this.nodeSelector =
filter -> ((AbstractYarnScheduler) (rmContext).getScheduler())
.getNodes(filter);
}
public boolean attemptPlacementOnNode(ApplicationId appId,
SchedulingRequest schedulingRequest, SchedulerNode schedulerNode)
boolean attemptPlacementOnNode(ApplicationId appId,
Resource availableResources, SchedulingRequest schedulingRequest,
SchedulerNode schedulerNode, boolean ignoreResourceCheck)
throws InvalidAllocationTagsQueryException {
if (PlacementConstraintsUtil.canSatisfyConstraints(appId,
schedulingRequest, schedulerNode, constraintManager, tagsManager)) {
return true;
}
return false;
boolean fitsInNode = ignoreResourceCheck ||
Resources.fitsIn(resourceCalculator,
schedulingRequest.getResourceSizing().getResources(),
availableResources);
boolean constraintsSatisfied =
PlacementConstraintsUtil.canSatisfyConstraints(appId,
schedulingRequest, schedulerNode, constraintManager, tagsManager);
return fitsInNode && constraintsSatisfied;
}
@ -82,17 +96,19 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
public void place(ConstraintPlacementAlgorithmInput input,
ConstraintPlacementAlgorithmOutputCollector collector) {
BatchedRequests requests = (BatchedRequests) input;
int placementAttempt = requests.getPlacementAttempt();
ConstraintPlacementAlgorithmOutput resp =
new ConstraintPlacementAlgorithmOutput(requests.getApplicationId());
List<SchedulerNode> allNodes = nodeSelector.selectNodes(null);
List<SchedulingRequest> rejectedRequests = new ArrayList<>();
Map<NodeId, Resource> availResources = new HashMap<>();
int rePlacementCount = RE_ATTEMPT_COUNT;
while (rePlacementCount > 0) {
doPlacement(requests, resp, allNodes, rejectedRequests);
doPlacement(requests, resp, allNodes, rejectedRequests, availResources);
// Double check if placement constraints are really satisfied
validatePlacement(requests.getApplicationId(), resp,
rejectedRequests);
rejectedRequests, availResources);
if (rejectedRequests.size() == 0 || rePlacementCount == 1) {
break;
}
@ -103,7 +119,10 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
rePlacementCount--;
}
resp.getRejectedRequests().addAll(rejectedRequests);
resp.getRejectedRequests().addAll(
rejectedRequests.stream().map(
x -> new SchedulingRequestWithPlacementAttempt(
placementAttempt, x)).collect(Collectors.toList()));
collector.collect(resp);
// Clean current temp-container tags
this.tagsManager.cleanTempContainers(requests.getApplicationId());
@ -112,7 +131,8 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
private void doPlacement(BatchedRequests requests,
ConstraintPlacementAlgorithmOutput resp,
List<SchedulerNode> allNodes,
List<SchedulingRequest> rejectedRequests) {
List<SchedulingRequest> rejectedRequests,
Map<NodeId, Resource> availableResources) {
Iterator<SchedulingRequest> requestIterator = requests.iterator();
Iterator<SchedulerNode> nIter = allNodes.iterator();
SchedulerNode lastSatisfiedNode = null;
@ -135,11 +155,17 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
try {
String tag = schedulingRequest.getAllocationTags() == null ? "" :
schedulingRequest.getAllocationTags().iterator().next();
Resource unallocatedResource =
availableResources.computeIfAbsent(node.getNodeID(),
x -> Resource.newInstance(node.getUnallocatedResource()));
if (!requests.getBlacklist(tag).contains(node.getNodeID()) &&
attemptPlacementOnNode(
requests.getApplicationId(), schedulingRequest, node)) {
requests.getApplicationId(), unallocatedResource,
schedulingRequest, node, false)) {
schedulingRequest.getResourceSizing()
.setNumAllocations(--numAllocs);
Resources.addTo(unallocatedResource,
schedulingRequest.getResourceSizing().getResources());
placedReq.getNodes().add(node);
numAllocs =
schedulingRequest.getResourceSizing().getNumAllocations();
@ -200,10 +226,12 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
* @param applicationId
* @param resp
* @param rejectedRequests
* @param availableResources
*/
private void validatePlacement(ApplicationId applicationId,
ConstraintPlacementAlgorithmOutput resp,
List<SchedulingRequest> rejectedRequests) {
List<SchedulingRequest> rejectedRequests,
Map<NodeId, Resource> availableResources) {
Iterator<PlacedSchedulingRequest> pReqIter =
resp.getPlacedRequests().iterator();
while (pReqIter.hasNext()) {
@ -217,10 +245,13 @@ public class DefaultPlacementAlgorithm implements ConstraintPlacementAlgorithm {
// Remove just the tags for this placement.
this.tagsManager.removeTempTags(node.getNodeID(),
applicationId, pReq.getSchedulingRequest().getAllocationTags());
if (!attemptPlacementOnNode(
applicationId, pReq.getSchedulingRequest(), node)) {
Resource availOnNode = availableResources.get(node.getNodeID());
if (!attemptPlacementOnNode(applicationId, availOnNode,
pReq.getSchedulingRequest(), node, true)) {
nodeIter.remove();
num++;
Resources.subtractFrom(availOnNode,
pReq.getSchedulingRequest().getResourceSizing().getResources());
} else {
// Add back the tags if everything is fine.
this.tagsManager.addTempTags(node.getNodeID(),

View File

@ -18,7 +18,6 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import java.util.ArrayList;
import java.util.List;
@ -41,14 +40,14 @@ public class ConstraintPlacementAlgorithmOutput {
private final List<PlacedSchedulingRequest> placedRequests =
new ArrayList<>();
private final List<SchedulingRequest> rejectedRequests =
private final List<SchedulingRequestWithPlacementAttempt> rejectedRequests =
new ArrayList<>();
public List<PlacedSchedulingRequest> getPlacedRequests() {
return placedRequests;
}
public List<SchedulingRequest> getRejectedRequests() {
public List<SchedulingRequestWithPlacementAttempt> getRejectedRequests() {
return rejectedRequests;
}

View File

@ -0,0 +1,52 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
* <p>
* http://www.apache.org/licenses/LICENSE-2.0
* <p>
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
/**
* Simple holder class encapsulating a SchedulingRequest
* with a placement attempt.
*/
/**
 * Immutable pairing of a {@link SchedulingRequest} with the number of the
 * placement attempt during which it was processed. Lets rejected requests be
 * reported back with enough context to decide whether the request should be
 * retried or finally rejected.
 */
public class SchedulingRequestWithPlacementAttempt {

  private final int placementAttempt;
  private final SchedulingRequest schedulingRequest;

  /**
   * @param placementAttempt the placement attempt number.
   * @param schedulingRequest the scheduling request being tracked.
   */
  public SchedulingRequestWithPlacementAttempt(int placementAttempt,
      SchedulingRequest schedulingRequest) {
    this.placementAttempt = placementAttempt;
    this.schedulingRequest = schedulingRequest;
  }

  /** @return the placement attempt number. */
  public int getPlacementAttempt() {
    return placementAttempt;
  }

  /** @return the wrapped scheduling request. */
  public SchedulingRequest getSchedulingRequest() {
    return schedulingRequest;
  }

  @Override
  public String toString() {
    // Produces exactly the same text as concatenating the fields directly.
    return String.format(
        "SchedulingRequestWithPlacementAttempt{placementAttempt=%d"
            + ", schedulingRequest=%s}",
        placementAttempt, schedulingRequest);
  }
}

View File

@ -109,7 +109,7 @@ public class BatchedRequests
}
public void addToBlacklist(Set<String> tags, SchedulerNode node) {
if (tags != null && !tags.isEmpty()) {
if (tags != null && !tags.isEmpty() && node != null) {
// We are currently assuming a single allocation tag
// per scheduler request currently.
blacklist.computeIfAbsent(tags.iterator().next(),

View File

@ -18,12 +18,12 @@
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.processor;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutput;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithmOutputCollector;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingRequestWithPlacementAttempt;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@ -49,7 +49,7 @@ class PlacementDispatcher implements
private Map<ApplicationId, List<PlacedSchedulingRequest>>
placedRequests = new ConcurrentHashMap<>();
private Map<ApplicationId, List<SchedulingRequest>>
private Map<ApplicationId, List<SchedulingRequestWithPlacementAttempt>>
rejectedRequests = new ConcurrentHashMap<>();
public void init(RMContext rmContext,
@ -90,12 +90,12 @@ class PlacementDispatcher implements
return Collections.EMPTY_LIST;
}
public List<SchedulingRequest> pullRejectedRequests(
public List<SchedulingRequestWithPlacementAttempt> pullRejectedRequests(
ApplicationId applicationId) {
List<SchedulingRequest> rejectedReqs =
List<SchedulingRequestWithPlacementAttempt> rejectedReqs =
this.rejectedRequests.get(applicationId);
if (rejectedReqs != null && !rejectedReqs.isEmpty()) {
List<SchedulingRequest> retList = new ArrayList<>();
List<SchedulingRequestWithPlacementAttempt> retList = new ArrayList<>();
synchronized (rejectedReqs) {
if (rejectedReqs.size() > 0) {
retList.addAll(rejectedReqs);
@ -130,7 +130,7 @@ class PlacementDispatcher implements
}
}
if (!placement.getRejectedRequests().isEmpty()) {
List<SchedulingRequest> rejected =
List<SchedulingRequestWithPlacementAttempt> rejected =
rejectedRequests.computeIfAbsent(
placement.getApplicationId(), k -> new ArrayList());
LOG.warn(

View File

@ -30,6 +30,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.RejectedSchedulingRequest;
import org.apache.hadoop.yarn.api.records.RejectionReason;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceSizing;
import org.apache.hadoop.yarn.api.records.SchedulingRequest;
import org.apache.hadoop.yarn.api.resource.PlacementConstraint;
@ -41,6 +42,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.Placem
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.algorithm.DefaultPlacementAlgorithm;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.ConstraintPlacementAlgorithm;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.PlacedSchedulingRequest;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingRequestWithPlacementAttempt;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.constraint.api.SchedulingResponse;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.AbstractYarnScheduler;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerApplicationAttempt;
@ -208,6 +210,12 @@ public class PlacementProcessor implements ApplicationMasterServiceProcessor {
private void dispatchRequestsForPlacement(ApplicationAttemptId appAttemptId,
List<SchedulingRequest> schedulingRequests) {
if (schedulingRequests != null && !schedulingRequests.isEmpty()) {
// Normalize the Requests before dispatching
schedulingRequests.forEach(req -> {
Resource reqResource = req.getResourceSizing().getResources();
req.getResourceSizing()
.setResources(this.scheduler.getNormalizedResource(reqResource));
});
this.placementDispatcher.dispatch(new BatchedRequests(iteratorType,
appAttemptId.getApplicationId(), schedulingRequests, 1));
}
@ -261,20 +269,28 @@ public class PlacementProcessor implements ApplicationMasterServiceProcessor {
private void handleRejectedRequests(ApplicationAttemptId appAttemptId,
AllocateResponse response) {
List<SchedulingRequest> rejectedRequests =
List<SchedulingRequestWithPlacementAttempt> rejectedAlgoRequests =
this.placementDispatcher.pullRejectedRequests(
appAttemptId.getApplicationId());
if (rejectedRequests != null && !rejectedRequests.isEmpty()) {
if (rejectedAlgoRequests != null && !rejectedAlgoRequests.isEmpty()) {
LOG.warn("Following requests of [{}] were rejected by" +
" the PlacementAlgorithmOutput Algorithm: {}",
appAttemptId.getApplicationId(), rejectedRequests);
appAttemptId.getApplicationId(), rejectedAlgoRequests);
rejectedAlgoRequests.stream()
.filter(req -> req.getPlacementAttempt() < retryAttempts)
.forEach(req -> handleSchedulingResponse(
new Response(false, appAttemptId.getApplicationId(),
req.getSchedulingRequest(), req.getPlacementAttempt(),
null)));
ApplicationMasterServiceUtils.addToRejectedSchedulingRequests(response,
rejectedRequests.stream()
rejectedAlgoRequests.stream()
.filter(req -> req.getPlacementAttempt() >= retryAttempts)
.map(sr -> RejectedSchedulingRequest.newInstance(
RejectionReason.COULD_NOT_PLACE_ON_NODE, sr))
RejectionReason.COULD_NOT_PLACE_ON_NODE,
sr.getSchedulingRequest()))
.collect(Collectors.toList()));
}
rejectedRequests =
List<SchedulingRequest> rejectedRequests =
this.requestsToReject.get(appAttemptId.getApplicationId());
if (rejectedRequests != null && !rejectedRequests.isEmpty()) {
synchronized (rejectedRequests) {

View File

@ -371,6 +371,91 @@ public class TestPlacementProcessor {
/**
 * NOTE(review): verifies that when a placed request is rejected by the
 * scheduler side (queue capacity exhausted), the AM gets back a
 * RejectedSchedulingRequest with reason COULD_NOT_SCHEDULE_ON_NODE, as
 * opposed to COULD_NOT_PLACE_ON_NODE used for algorithm-side rejections.
 */
@Test(timeout = 300000)
public void testSchedulerRejection() throws Exception {
stopRM();
// Two-queue CapacityScheduler: queue "a" gets only 15% of cluster
// capacity so the app submitted to it runs out of headroom.
CapacitySchedulerConfiguration csConf =
new CapacitySchedulerConfiguration();
csConf.setQueues(CapacitySchedulerConfiguration.ROOT,
new String[] {"a", "b"});
csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".a", 15.0f);
csConf.setCapacity(CapacitySchedulerConfiguration.ROOT + ".b", 85.0f);
YarnConfiguration conf = new YarnConfiguration(csConf);
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
ResourceScheduler.class);
conf.setBoolean(
YarnConfiguration.RM_PLACEMENT_CONSTRAINTS_ENABLED, true);
startRM(conf);
// Register four 4GB NodeManagers.
HashMap<NodeId, MockNM> nodes = new HashMap<>();
MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
nodes.put(nm1.getNodeId(), nm1);
MockNM nm2 = new MockNM("h2:1234", 4096, rm.getResourceTrackerService());
nodes.put(nm2.getNodeId(), nm2);
MockNM nm3 = new MockNM("h3:1234", 4096, rm.getResourceTrackerService());
nodes.put(nm3.getNodeId(), nm3);
MockNM nm4 = new MockNM("h4:1234", 4096, rm.getResourceTrackerService());
nodes.put(nm4.getNodeId(), nm4);
nm1.registerNode();
nm2.registerNode();
nm3.registerNode();
nm4.registerNode();
RMApp app1 = rm.submitApp(1 * GB, "app", "user", null, "a");
// Containers with allocationTag 'foo' are restricted to 1 per NODE
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm2,
Collections.singletonMap(
Collections.singleton("foo"),
PlacementConstraints.build(
PlacementConstraints.targetNotIn(NODE, allocationTag("foo")))
));
am1.addSchedulingRequest(
Arrays.asList(
schedulingRequest(1, 1, 1, 512, "foo"),
schedulingRequest(1, 2, 1, 512, "foo"),
schedulingRequest(1, 3, 1, 512, "foo"),
// 4th request is expected to exceed what the scheduler will grant
// (queue "a" headroom) and be rejected by the scheduler — TODO
// confirm: the exact rejection trigger is not visible from here.
schedulingRequest(1, 4, 1, 512, "foo"))
);
AllocateResponse allocResponse = am1.schedule(); // send the request
List<Container> allocatedContainers = new ArrayList<>();
List<RejectedSchedulingRequest> rejectedReqs = new ArrayList<>();
int allocCount = 1;
allocatedContainers.addAll(allocResponse.getAllocatedContainers());
rejectedReqs.addAll(allocResponse.getRejectedSchedulingRequests());
// Heartbeat all NMs (up to 10 rounds) until 3 containers have been
// allocated and at least one request has been rejected.
// kick the scheduler
while (allocCount < 11) {
nm1.nodeHeartbeat(true);
nm2.nodeHeartbeat(true);
nm3.nodeHeartbeat(true);
nm4.nodeHeartbeat(true);
LOG.info("Waiting for containers to be created for app 1...");
sleep(1000);
allocResponse = am1.schedule();
allocatedContainers.addAll(allocResponse.getAllocatedContainers());
rejectedReqs.addAll(allocResponse.getRejectedSchedulingRequests());
allocCount++;
if (rejectedReqs.size() > 0 && allocatedContainers.size() > 2) {
break;
}
}
// Exactly 3 of the 4 requests must have been placed.
Assert.assertEquals(3, allocatedContainers.size());
Set<NodeId> nodeIds = allocatedContainers.stream()
.map(x -> x.getNodeId()).collect(Collectors.toSet());
// Ensure unique nodes
Assert.assertEquals(3, nodeIds.size());
// The 4th request must be rejected with the scheduler-side reason.
RejectedSchedulingRequest rej = rejectedReqs.get(0);
Assert.assertEquals(4, rej.getRequest().getAllocationRequestId());
Assert.assertEquals(RejectionReason.COULD_NOT_SCHEDULE_ON_NODE,
rej.getReason());
QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();
// Verify Metrics
verifyMetrics(metrics, 12288, 12, 4096, 4, 4);
}
@Test(timeout = 300000)
public void testNodeCapacityRejection() throws Exception {
HashMap<NodeId, MockNM> nodes = new HashMap<>();
MockNM nm1 = new MockNM("h1:1234", 4096, rm.getResourceTrackerService());
nodes.put(nm1.getNodeId(), nm1);
@ -432,7 +517,7 @@ public class TestPlacementProcessor {
Assert.assertEquals(3, nodeIds.size());
RejectedSchedulingRequest rej = rejectedReqs.get(0);
Assert.assertEquals(4, rej.getRequest().getAllocationRequestId());
Assert.assertEquals(RejectionReason.COULD_NOT_SCHEDULE_ON_NODE,
Assert.assertEquals(RejectionReason.COULD_NOT_PLACE_ON_NODE,
rej.getReason());
QueueMetrics metrics = rm.getResourceScheduler().getRootQueueMetrics();