YARN-4519. Potential deadlock of CapacityScheduler between decrease container and assign containers. Contributed by Meng Ding
This commit is contained in:
parent
41da9a0773
commit
7f46636495
@ -170,6 +170,9 @@ Release 2.9.0 - UNRELEASED
|
||||
YARN-4633. Fix random test failure in TestRMRestart#testRMRestartAfterPreemption
|
||||
(Bibin A Chundatt via rohithsharmaks)
|
||||
|
||||
YARN-4519. Potential deadlock of CapacityScheduler between decrease container
|
||||
and assign containers. (Meng Ding via jianhe)
|
||||
|
||||
Release 2.8.0 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -53,9 +53,10 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
||||
.SchedContainerChangeRequest;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler;
|
||||
import org.apache.hadoop.yarn.server.utils.BuilderUtils;
|
||||
@ -114,43 +115,25 @@ public static void normalizeAndValidateRequests(List<ResourceRequest> ask,
|
||||
queueName, scheduler, rmContext, queueInfo);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
* Normalize container increase/decrease request, it will normalize and update
|
||||
* ContainerResourceChangeRequest.targetResource
|
||||
* Validate increase/decrease request. This function must be called under
|
||||
* the queue lock to make sure that the access to container resource is
|
||||
* atomic. Refer to LeafQueue.decreaseContainer() and
|
||||
* CapacityScheduelr.updateIncreaseRequests()
|
||||
*
|
||||
*
|
||||
* <pre>
|
||||
* - Throw exception when any other error happens
|
||||
* </pre>
|
||||
*/
|
||||
public static void checkAndNormalizeContainerChangeRequest(
|
||||
RMContext rmContext, ContainerResourceChangeRequest request,
|
||||
boolean increase) throws InvalidResourceRequestException {
|
||||
public static void checkSchedContainerChangeRequest(
|
||||
SchedContainerChangeRequest request, boolean increase)
|
||||
throws InvalidResourceRequestException {
|
||||
RMContext rmContext = request.getRmContext();
|
||||
ContainerId containerId = request.getContainerId();
|
||||
ResourceScheduler scheduler = rmContext.getScheduler();
|
||||
RMContainer rmContainer = scheduler.getRMContainer(containerId);
|
||||
ResourceCalculator rc = scheduler.getResourceCalculator();
|
||||
|
||||
if (null == rmContainer) {
|
||||
String msg =
|
||||
"Failed to get rmContainer for "
|
||||
+ (increase ? "increase" : "decrease")
|
||||
+ " request, with container-id=" + containerId;
|
||||
throw new InvalidResourceRequestException(msg);
|
||||
}
|
||||
|
||||
if (rmContainer.getState() != RMContainerState.RUNNING) {
|
||||
String msg =
|
||||
"rmContainer's state is not RUNNING, for "
|
||||
+ (increase ? "increase" : "decrease")
|
||||
+ " request, with container-id=" + containerId;
|
||||
throw new InvalidResourceRequestException(msg);
|
||||
}
|
||||
|
||||
Resource targetResource = Resources.normalize(rc, request.getCapability(),
|
||||
scheduler.getMinimumResourceCapability(),
|
||||
scheduler.getMaximumResourceCapability(),
|
||||
scheduler.getMinimumResourceCapability());
|
||||
RMContainer rmContainer = request.getRMContainer();
|
||||
Resource targetResource = request.getTargetCapacity();
|
||||
|
||||
// Compare targetResource and original resource
|
||||
Resource originalResource = rmContainer.getAllocatedResource();
|
||||
@ -181,10 +164,10 @@ public static void checkAndNormalizeContainerChangeRequest(
|
||||
throw new InvalidResourceRequestException(msg);
|
||||
}
|
||||
}
|
||||
|
||||
RMNode rmNode = rmContext.getRMNodes().get(rmContainer.getAllocatedNode());
|
||||
|
||||
|
||||
// Target resource of the increase request is more than NM can offer
|
||||
ResourceScheduler scheduler = rmContext.getScheduler();
|
||||
RMNode rmNode = request.getSchedulerNode().getRMNode();
|
||||
if (!Resources.fitsIn(scheduler.getResourceCalculator(),
|
||||
scheduler.getClusterResource(), targetResource,
|
||||
rmNode.getTotalCapability())) {
|
||||
@ -193,9 +176,6 @@ public static void checkAndNormalizeContainerChangeRequest(
|
||||
+ rmNode.getTotalCapability();
|
||||
throw new InvalidResourceRequestException(msg);
|
||||
}
|
||||
|
||||
// Update normalized target resource
|
||||
request.setCapability(targetResource);
|
||||
}
|
||||
|
||||
/*
|
||||
@ -253,7 +233,8 @@ private static void checkDuplicatedIncreaseDecreaseRequest(
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
// Sanity check and normalize target resource
|
||||
private static void validateIncreaseDecreaseRequest(RMContext rmContext,
|
||||
List<ContainerResourceChangeRequest> requests, Resource maximumAllocation,
|
||||
boolean increase)
|
||||
@ -283,8 +264,23 @@ private static void validateIncreaseDecreaseRequest(RMContext rmContext,
|
||||
+ request.getCapability().getVirtualCores() + ", maxVirtualCores="
|
||||
+ maximumAllocation.getVirtualCores());
|
||||
}
|
||||
|
||||
checkAndNormalizeContainerChangeRequest(rmContext, request, increase);
|
||||
ContainerId containerId = request.getContainerId();
|
||||
ResourceScheduler scheduler = rmContext.getScheduler();
|
||||
RMContainer rmContainer = scheduler.getRMContainer(containerId);
|
||||
if (null == rmContainer) {
|
||||
String msg =
|
||||
"Failed to get rmContainer for "
|
||||
+ (increase ? "increase" : "decrease")
|
||||
+ " request, with container-id=" + containerId;
|
||||
throw new InvalidResourceRequestException(msg);
|
||||
}
|
||||
ResourceCalculator rc = scheduler.getResourceCalculator();
|
||||
Resource targetResource = Resources.normalize(rc, request.getCapability(),
|
||||
scheduler.getMinimumResourceCapability(),
|
||||
scheduler.getMaximumResourceCapability(),
|
||||
scheduler.getMinimumResourceCapability());
|
||||
// Update normalized target resource
|
||||
request.setCapability(targetResource);
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -22,7 +22,6 @@
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.EnumSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
@ -55,13 +54,13 @@
|
||||
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
|
||||
import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent;
|
||||
@ -74,6 +73,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent;
|
||||
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent;
|
||||
@ -618,28 +618,20 @@ protected void releaseContainers(List<ContainerId> containers,
|
||||
SchedulerUtils.RELEASED_CONTAINER), RMContainerEventType.RELEASED);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
protected void decreaseContainers(
|
||||
List<SchedContainerChangeRequest> decreaseRequests,
|
||||
List<ContainerResourceChangeRequest> decreaseRequests,
|
||||
SchedulerApplicationAttempt attempt) {
|
||||
for (SchedContainerChangeRequest request : decreaseRequests) {
|
||||
if (null == decreaseRequests || decreaseRequests.isEmpty()) {
|
||||
return;
|
||||
}
|
||||
// Pre-process decrease requests
|
||||
List<SchedContainerChangeRequest> schedDecreaseRequests =
|
||||
createSchedContainerChangeRequests(decreaseRequests, false);
|
||||
for (SchedContainerChangeRequest request : schedDecreaseRequests) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Processing decrease request:" + request);
|
||||
}
|
||||
|
||||
boolean hasIncreaseRequest =
|
||||
attempt.removeIncreaseRequest(request.getNodeId(),
|
||||
request.getPriority(), request.getContainerId());
|
||||
|
||||
if (hasIncreaseRequest) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("While processing decrease request, found a increase request "
|
||||
+ "for the same container "
|
||||
+ request.getContainerId()
|
||||
+ ", removed the increase request");
|
||||
}
|
||||
}
|
||||
|
||||
// handle decrease request
|
||||
decreaseContainer(request, attempt);
|
||||
}
|
||||
@ -877,7 +869,7 @@ public synchronized void setClusterMaxPriority(Configuration conf)
|
||||
}
|
||||
|
||||
/**
|
||||
* Normalize container increase/decrease request, and return
|
||||
* Sanity check increase/decrease request, and return
|
||||
* SchedulerContainerResourceChangeRequest according to given
|
||||
* ContainerResourceChangeRequest.
|
||||
*
|
||||
@ -886,37 +878,34 @@ public synchronized void setClusterMaxPriority(Configuration conf)
|
||||
* - Throw exception when any other error happens
|
||||
* </pre>
|
||||
*/
|
||||
private SchedContainerChangeRequest
|
||||
checkAndNormalizeContainerChangeRequest(
|
||||
ContainerResourceChangeRequest request, boolean increase)
|
||||
throws YarnException {
|
||||
// We have done a check in ApplicationMasterService, but RMContainer status
|
||||
// / Node resource could change since AMS won't acquire lock of scheduler.
|
||||
RMServerUtils.checkAndNormalizeContainerChangeRequest(rmContext, request,
|
||||
increase);
|
||||
private SchedContainerChangeRequest createSchedContainerChangeRequest(
|
||||
ContainerResourceChangeRequest request, boolean increase)
|
||||
throws YarnException {
|
||||
ContainerId containerId = request.getContainerId();
|
||||
RMContainer rmContainer = getRMContainer(containerId);
|
||||
if (null == rmContainer) {
|
||||
String msg =
|
||||
"Failed to get rmContainer for "
|
||||
+ (increase ? "increase" : "decrease")
|
||||
+ " request, with container-id=" + containerId;
|
||||
throw new InvalidResourceRequestException(msg);
|
||||
}
|
||||
SchedulerNode schedulerNode =
|
||||
getSchedulerNode(rmContainer.getAllocatedNode());
|
||||
|
||||
return new SchedContainerChangeRequest(schedulerNode, rmContainer,
|
||||
request.getCapability());
|
||||
return new SchedContainerChangeRequest(
|
||||
this.rmContext, schedulerNode, rmContainer, request.getCapability());
|
||||
}
|
||||
|
||||
protected List<SchedContainerChangeRequest>
|
||||
checkAndNormalizeContainerChangeRequests(
|
||||
createSchedContainerChangeRequests(
|
||||
List<ContainerResourceChangeRequest> changeRequests,
|
||||
boolean increase) {
|
||||
if (null == changeRequests || changeRequests.isEmpty()) {
|
||||
return Collections.EMPTY_LIST;
|
||||
}
|
||||
|
||||
List<SchedContainerChangeRequest> schedulerChangeRequests =
|
||||
new ArrayList<SchedContainerChangeRequest>();
|
||||
for (ContainerResourceChangeRequest r : changeRequests) {
|
||||
SchedContainerChangeRequest sr = null;
|
||||
try {
|
||||
sr = checkAndNormalizeContainerChangeRequest(r, increase);
|
||||
sr = createSchedContainerChangeRequest(r, increase);
|
||||
} catch (YarnException e) {
|
||||
LOG.warn("Error happens when checking increase request, Ignoring.."
|
||||
+ " exception=", e);
|
||||
@ -924,7 +913,6 @@ public synchronized void setClusterMaxPriority(Configuration conf)
|
||||
}
|
||||
schedulerChangeRequests.add(sr);
|
||||
}
|
||||
|
||||
return schedulerChangeRequests;
|
||||
}
|
||||
}
|
||||
|
@ -44,6 +44,8 @@
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
@ -148,6 +150,18 @@ public synchronized boolean updateIncreaseRequests(
|
||||
boolean resourceUpdated = false;
|
||||
|
||||
for (SchedContainerChangeRequest r : increaseRequests) {
|
||||
if (r.getRMContainer().getState() != RMContainerState.RUNNING) {
|
||||
LOG.warn("rmContainer's state is not RUNNING, for increase request with"
|
||||
+ " container-id=" + r.getContainerId());
|
||||
continue;
|
||||
}
|
||||
try {
|
||||
RMServerUtils.checkSchedContainerChangeRequest(r, true);
|
||||
} catch (YarnException e) {
|
||||
LOG.warn("Error happens when checking increase request, Ignoring.."
|
||||
+ " exception=", e);
|
||||
continue;
|
||||
}
|
||||
NodeId nodeId = r.getRMContainer().getAllocatedNode();
|
||||
|
||||
Map<Priority, Map<ContainerId, SchedContainerChangeRequest>> requestsOnNode =
|
||||
@ -221,7 +235,7 @@ private void insertIncreaseRequest(SchedContainerChangeRequest request) {
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Added increase request:" + request.getContainerId()
|
||||
+ " delta=" + request.getDeltaCapacity());
|
||||
+ " delta=" + delta);
|
||||
}
|
||||
|
||||
// update priorities
|
||||
@ -520,24 +534,20 @@ public synchronized void increaseContainer(
|
||||
NodeId nodeId = increaseRequest.getNodeId();
|
||||
Priority priority = increaseRequest.getPriority();
|
||||
ContainerId containerId = increaseRequest.getContainerId();
|
||||
|
||||
Resource deltaCapacity = increaseRequest.getDeltaCapacity();
|
||||
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("allocated increase request : applicationId=" + applicationId
|
||||
+ " container=" + containerId + " host="
|
||||
+ increaseRequest.getNodeId() + " user=" + user + " resource="
|
||||
+ increaseRequest.getDeltaCapacity());
|
||||
+ deltaCapacity);
|
||||
}
|
||||
|
||||
// Set queue metrics
|
||||
queue.getMetrics().allocateResources(user,
|
||||
increaseRequest.getDeltaCapacity());
|
||||
|
||||
queue.getMetrics().allocateResources(user, deltaCapacity);
|
||||
// remove the increase request from pending increase request map
|
||||
removeIncreaseRequest(nodeId, priority, containerId);
|
||||
|
||||
// update usage
|
||||
appResourceUsage.incUsed(increaseRequest.getNodePartition(),
|
||||
increaseRequest.getDeltaCapacity());
|
||||
appResourceUsage.incUsed(increaseRequest.getNodePartition(), deltaCapacity);
|
||||
}
|
||||
|
||||
public synchronized void decreaseContainer(
|
||||
|
@ -22,6 +22,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.NodeId;
|
||||
import org.apache.hadoop.yarn.api.records.Priority;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
|
||||
@ -32,18 +33,19 @@
|
||||
*/
|
||||
public class SchedContainerChangeRequest implements
|
||||
Comparable<SchedContainerChangeRequest> {
|
||||
RMContainer rmContainer;
|
||||
Resource targetCapacity;
|
||||
SchedulerNode schedulerNode;
|
||||
Resource deltaCapacity;
|
||||
private RMContext rmContext;
|
||||
private RMContainer rmContainer;
|
||||
private Resource targetCapacity;
|
||||
private SchedulerNode schedulerNode;
|
||||
private Resource deltaCapacity;
|
||||
|
||||
public SchedContainerChangeRequest(SchedulerNode schedulerNode,
|
||||
public SchedContainerChangeRequest(
|
||||
RMContext rmContext, SchedulerNode schedulerNode,
|
||||
RMContainer rmContainer, Resource targetCapacity) {
|
||||
this.rmContext = rmContext;
|
||||
this.rmContainer = rmContainer;
|
||||
this.targetCapacity = targetCapacity;
|
||||
this.schedulerNode = schedulerNode;
|
||||
deltaCapacity = Resources.subtract(targetCapacity,
|
||||
rmContainer.getAllocatedResource());
|
||||
}
|
||||
|
||||
public NodeId getNodeId() {
|
||||
@ -58,11 +60,19 @@ public Resource getTargetCapacity() {
|
||||
return this.targetCapacity;
|
||||
}
|
||||
|
||||
public RMContext getRmContext() {
|
||||
return this.rmContext;
|
||||
}
|
||||
/**
|
||||
* Delta capacity = before - target, so if it is a decrease request, delta
|
||||
* Delta capacity = target - before, so if it is a decrease request, delta
|
||||
* capacity will be negative
|
||||
*/
|
||||
public Resource getDeltaCapacity() {
|
||||
public synchronized Resource getDeltaCapacity() {
|
||||
// Only calculate deltaCapacity once
|
||||
if (deltaCapacity == null) {
|
||||
deltaCapacity = Resources.subtract(
|
||||
targetCapacity, rmContainer.getAllocatedResource());
|
||||
}
|
||||
return deltaCapacity;
|
||||
}
|
||||
|
||||
@ -81,7 +91,7 @@ public String getNodePartition() {
|
||||
public SchedulerNode getSchedulerNode() {
|
||||
return schedulerNode;
|
||||
}
|
||||
|
||||
|
||||
@Override
|
||||
public int hashCode() {
|
||||
return (getContainerId().hashCode() << 16) + targetCapacity.hashCode();
|
||||
@ -112,7 +122,6 @@ public int compareTo(SchedContainerChangeRequest other) {
|
||||
@Override
|
||||
public String toString() {
|
||||
return "<container=" + getContainerId() + ", targetCapacity="
|
||||
+ targetCapacity + ", delta=" + deltaCapacity + ", node="
|
||||
+ getNodeId().toString() + ">";
|
||||
+ targetCapacity + ", node=" + getNodeId().toString() + ">";
|
||||
}
|
||||
}
|
||||
|
@ -33,6 +33,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.QueueACL;
|
||||
import org.apache.hadoop.yarn.api.records.QueueState;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
|
||||
import org.apache.hadoop.yarn.security.PrivilegedEntity;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
||||
@ -332,7 +333,7 @@ public void decUsedResource(String nodePartition, Resource resourceToDec,
|
||||
*/
|
||||
public void decreaseContainer(Resource clusterResource,
|
||||
SchedContainerChangeRequest decreaseRequest,
|
||||
FiCaSchedulerApp app);
|
||||
FiCaSchedulerApp app) throws InvalidResourceRequestException;
|
||||
|
||||
/**
|
||||
* Get valid Node Labels for this queue
|
||||
|
@ -21,8 +21,8 @@
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.Comparator;
|
||||
import java.util.EnumSet;
|
||||
import java.util.HashMap;
|
||||
@ -65,6 +65,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.ResourceOption;
|
||||
import org.apache.hadoop.yarn.api.records.ResourceRequest;
|
||||
import org.apache.hadoop.yarn.conf.YarnConfiguration;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnException;
|
||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||
import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes;
|
||||
@ -895,9 +896,36 @@ private synchronized void doneApplicationAttempt(
|
||||
}
|
||||
}
|
||||
|
||||
// It is crucial to acquire leaf queue lock first to prevent:
|
||||
// 1. Race condition when calculating the delta resource in
|
||||
// SchedContainerChangeRequest
|
||||
// 2. Deadlock with the scheduling thread.
|
||||
private LeafQueue updateIncreaseRequests(
|
||||
List<ContainerResourceChangeRequest> increaseRequests,
|
||||
FiCaSchedulerApp app) {
|
||||
if (null == increaseRequests || increaseRequests.isEmpty()) {
|
||||
return null;
|
||||
}
|
||||
// Pre-process increase requests
|
||||
List<SchedContainerChangeRequest> schedIncreaseRequests =
|
||||
createSchedContainerChangeRequests(increaseRequests, true);
|
||||
LeafQueue leafQueue = (LeafQueue) app.getQueue();
|
||||
synchronized(leafQueue) {
|
||||
// make sure we aren't stopping/removing the application
|
||||
// when the allocate comes in
|
||||
if (app.isStopped()) {
|
||||
return null;
|
||||
}
|
||||
// Process increase resource requests
|
||||
if (app.updateIncreaseRequests(schedIncreaseRequests)) {
|
||||
return leafQueue;
|
||||
}
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
// Note: when AM asks to decrease container or release container, we will
|
||||
// acquire scheduler lock
|
||||
// Note: when AM asks to release container, we will acquire scheduler lock
|
||||
@Lock(Lock.NoLock.class)
|
||||
public Allocation allocate(ApplicationAttemptId applicationAttemptId,
|
||||
List<ResourceRequest> ask, List<ContainerId> release,
|
||||
@ -909,26 +937,23 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId,
|
||||
if (application == null) {
|
||||
return EMPTY_ALLOCATION;
|
||||
}
|
||||
|
||||
// Sanity check
|
||||
SchedulerUtils.normalizeRequests(
|
||||
ask, getResourceCalculator(), getClusterResource(),
|
||||
getMinimumResourceCapability(), getMaximumResourceCapability());
|
||||
|
||||
// Pre-process increase requests
|
||||
List<SchedContainerChangeRequest> normalizedIncreaseRequests =
|
||||
checkAndNormalizeContainerChangeRequests(increaseRequests, true);
|
||||
|
||||
// Pre-process decrease requests
|
||||
List<SchedContainerChangeRequest> normalizedDecreaseRequests =
|
||||
checkAndNormalizeContainerChangeRequests(decreaseRequests, false);
|
||||
|
||||
// Release containers
|
||||
releaseContainers(release, application);
|
||||
|
||||
Allocation allocation;
|
||||
// update increase requests
|
||||
LeafQueue updateDemandForQueue =
|
||||
updateIncreaseRequests(increaseRequests, application);
|
||||
|
||||
LeafQueue updateDemandForQueue = null;
|
||||
// Decrease containers
|
||||
decreaseContainers(decreaseRequests, application);
|
||||
|
||||
// Sanity check for new allocation requests
|
||||
SchedulerUtils.normalizeRequests(
|
||||
ask, getResourceCalculator(), getClusterResource(),
|
||||
getMinimumResourceCapability(), getMaximumResourceCapability());
|
||||
|
||||
Allocation allocation;
|
||||
|
||||
synchronized (application) {
|
||||
|
||||
@ -947,7 +972,8 @@ ask, getResourceCalculator(), getClusterResource(),
|
||||
}
|
||||
|
||||
// Update application requests
|
||||
if (application.updateResourceRequests(ask)) {
|
||||
if (application.updateResourceRequests(ask)
|
||||
&& (updateDemandForQueue == null)) {
|
||||
updateDemandForQueue = (LeafQueue) application.getQueue();
|
||||
}
|
||||
|
||||
@ -957,12 +983,6 @@ ask, getResourceCalculator(), getClusterResource(),
|
||||
}
|
||||
}
|
||||
|
||||
// Process increase resource requests
|
||||
if (application.updateIncreaseRequests(normalizedIncreaseRequests)
|
||||
&& (updateDemandForQueue == null)) {
|
||||
updateDemandForQueue = (LeafQueue) application.getQueue();
|
||||
}
|
||||
|
||||
if (application.isWaitingForAMContainer()) {
|
||||
// Allocate is for AM and update AM blacklist for this
|
||||
application.updateAMBlacklist(
|
||||
@ -971,8 +991,6 @@ ask, getResourceCalculator(), getClusterResource(),
|
||||
application.updateBlacklist(blacklistAdditions, blacklistRemovals);
|
||||
}
|
||||
|
||||
// Decrease containers
|
||||
decreaseContainers(normalizedDecreaseRequests, application);
|
||||
|
||||
allocation = application.getAllocation(getResourceCalculator(),
|
||||
clusterResource, getMinimumResourceCapability());
|
||||
@ -1167,7 +1185,8 @@ private void updateSchedulerHealth(long now, FiCaSchedulerNode node,
|
||||
.getAssignmentInformation().getReserved());
|
||||
}
|
||||
|
||||
private synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
|
||||
@VisibleForTesting
|
||||
protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
|
||||
if (rmContext.isWorkPreservingRecoveryEnabled()
|
||||
&& !rmContext.isSchedulerReadyForAllocatingContainers()) {
|
||||
return;
|
||||
@ -1517,48 +1536,30 @@ protected synchronized void completedContainerInternal(
|
||||
}
|
||||
}
|
||||
|
||||
@Lock(CapacityScheduler.class)
|
||||
@Override
|
||||
protected synchronized void decreaseContainer(
|
||||
SchedContainerChangeRequest decreaseRequest,
|
||||
protected void decreaseContainer(SchedContainerChangeRequest decreaseRequest,
|
||||
SchedulerApplicationAttempt attempt) {
|
||||
RMContainer rmContainer = decreaseRequest.getRMContainer();
|
||||
|
||||
// Check container status before doing decrease
|
||||
if (rmContainer.getState() != RMContainerState.RUNNING) {
|
||||
LOG.info("Trying to decrease a container not in RUNNING state, container="
|
||||
+ rmContainer + " state=" + rmContainer.getState().name());
|
||||
return;
|
||||
}
|
||||
|
||||
// Delta capacity of this decrease request is 0, this decrease request may
|
||||
// just to cancel increase request
|
||||
if (Resources.equals(decreaseRequest.getDeltaCapacity(), Resources.none())) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Decrease target resource equals to existing resource for container:"
|
||||
+ decreaseRequest.getContainerId()
|
||||
+ " ignore this decrease request.");
|
||||
}
|
||||
return;
|
||||
}
|
||||
|
||||
// Save resource before decrease
|
||||
Resource resourceBeforeDecrease =
|
||||
Resources.clone(rmContainer.getContainer().getResource());
|
||||
|
||||
FiCaSchedulerApp app = (FiCaSchedulerApp)attempt;
|
||||
LeafQueue queue = (LeafQueue) attempt.getQueue();
|
||||
queue.decreaseContainer(clusterResource, decreaseRequest, app);
|
||||
|
||||
// Notify RMNode the container will be decreased
|
||||
this.rmContext.getDispatcher().getEventHandler()
|
||||
.handle(new RMNodeDecreaseContainerEvent(decreaseRequest.getNodeId(),
|
||||
Arrays.asList(rmContainer.getContainer())));
|
||||
|
||||
LOG.info("Application attempt " + app.getApplicationAttemptId()
|
||||
+ " decreased container:" + decreaseRequest.getContainerId() + " from "
|
||||
+ resourceBeforeDecrease + " to "
|
||||
+ decreaseRequest.getTargetCapacity());
|
||||
try {
|
||||
queue.decreaseContainer(clusterResource, decreaseRequest, app);
|
||||
// Notify RMNode that the container can be pulled by NodeManager in the
|
||||
// next heartbeat
|
||||
this.rmContext.getDispatcher().getEventHandler()
|
||||
.handle(new RMNodeDecreaseContainerEvent(
|
||||
decreaseRequest.getNodeId(),
|
||||
Collections.singletonList(rmContainer.getContainer())));
|
||||
} catch (InvalidResourceRequestException e) {
|
||||
LOG.warn("Error happens when checking decrease request, Ignoring.."
|
||||
+ " exception=", e);
|
||||
}
|
||||
}
|
||||
|
||||
@Lock(Lock.NoLock.class)
|
||||
|
@ -47,10 +47,12 @@
|
||||
import org.apache.hadoop.yarn.api.records.QueueState;
|
||||
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.security.AccessType;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType;
|
||||
@ -1676,11 +1678,17 @@ public synchronized void setOrderingPolicy(
|
||||
public Priority getDefaultApplicationPriority() {
|
||||
return defaultAppPriorityPerQueue;
|
||||
}
|
||||
|
||||
|
||||
/**
|
||||
*
|
||||
* @param clusterResource Total cluster resource
|
||||
* @param decreaseRequest The decrease request
|
||||
* @param app The application of interest
|
||||
*/
|
||||
@Override
|
||||
public void decreaseContainer(Resource clusterResource,
|
||||
SchedContainerChangeRequest decreaseRequest,
|
||||
FiCaSchedulerApp app) {
|
||||
FiCaSchedulerApp app) throws InvalidResourceRequestException {
|
||||
// If the container being decreased is reserved, we need to unreserve it
|
||||
// first.
|
||||
RMContainer rmContainer = decreaseRequest.getRMContainer();
|
||||
@ -1688,25 +1696,62 @@ public void decreaseContainer(Resource clusterResource,
|
||||
unreserveIncreasedContainer(clusterResource, app,
|
||||
(FiCaSchedulerNode)decreaseRequest.getSchedulerNode(), rmContainer);
|
||||
}
|
||||
|
||||
// Delta capacity is negative when it's a decrease request
|
||||
Resource absDelta = Resources.negate(decreaseRequest.getDeltaCapacity());
|
||||
|
||||
boolean resourceDecreased = false;
|
||||
Resource resourceBeforeDecrease;
|
||||
// Grab queue lock to avoid race condition when getting container resource
|
||||
synchronized (this) {
|
||||
// Delta is negative when it's a decrease request
|
||||
releaseResource(clusterResource, app, absDelta,
|
||||
decreaseRequest.getNodePartition(), decreaseRequest.getRMContainer(),
|
||||
true);
|
||||
// Notify application
|
||||
app.decreaseContainer(decreaseRequest);
|
||||
// Notify node
|
||||
decreaseRequest.getSchedulerNode()
|
||||
.decreaseContainer(decreaseRequest.getContainerId(), absDelta);
|
||||
// Make sure the decrease request is valid in terms of current resource
|
||||
// and target resource. This must be done under the leaf queue lock.
|
||||
// Throws exception if the check fails.
|
||||
RMServerUtils.checkSchedContainerChangeRequest(decreaseRequest, false);
|
||||
// Save resource before decrease for debug log
|
||||
resourceBeforeDecrease =
|
||||
Resources.clone(rmContainer.getAllocatedResource());
|
||||
// Do we have increase request for the same container? If so, remove it
|
||||
boolean hasIncreaseRequest =
|
||||
app.removeIncreaseRequest(decreaseRequest.getNodeId(),
|
||||
decreaseRequest.getPriority(), decreaseRequest.getContainerId());
|
||||
if (hasIncreaseRequest) {
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("While processing decrease requests, found an increase"
|
||||
+ " request for the same container "
|
||||
+ decreaseRequest.getContainerId()
|
||||
+ ", removed the increase request");
|
||||
}
|
||||
}
|
||||
// Delta capacity is negative when it's a decrease request
|
||||
Resource absDelta = Resources.negate(decreaseRequest.getDeltaCapacity());
|
||||
if (Resources.equals(absDelta, Resources.none())) {
|
||||
// If delta capacity of this decrease request is 0, this decrease
|
||||
// request serves the purpose of cancelling an existing increase request
|
||||
// if any
|
||||
if (LOG.isDebugEnabled()) {
|
||||
LOG.debug("Decrease target resource equals to existing resource for"
|
||||
+ " container:" + decreaseRequest.getContainerId()
|
||||
+ " ignore this decrease request.");
|
||||
}
|
||||
} else {
|
||||
// Release the delta resource
|
||||
releaseResource(clusterResource, app, absDelta,
|
||||
decreaseRequest.getNodePartition(),
|
||||
decreaseRequest.getRMContainer(),
|
||||
true);
|
||||
// Notify application
|
||||
app.decreaseContainer(decreaseRequest);
|
||||
// Notify node
|
||||
decreaseRequest.getSchedulerNode()
|
||||
.decreaseContainer(decreaseRequest.getContainerId(), absDelta);
|
||||
resourceDecreased = true;
|
||||
}
|
||||
}
|
||||
|
||||
// Notify parent
|
||||
if (getParent() != null) {
|
||||
if (resourceDecreased) {
|
||||
// Notify parent queue outside of leaf queue lock
|
||||
getParent().decreaseContainer(clusterResource, decreaseRequest, app);
|
||||
LOG.info("Application attempt " + app.getApplicationAttemptId()
|
||||
+ " decreased container:" + decreaseRequest.getContainerId()
|
||||
+ " from " + resourceBeforeDecrease + " to "
|
||||
+ decreaseRequest.getTargetCapacity());
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -46,6 +46,7 @@
|
||||
import org.apache.hadoop.yarn.api.records.QueueState;
|
||||
import org.apache.hadoop.yarn.api.records.QueueUserACLInfo;
|
||||
import org.apache.hadoop.yarn.api.records.Resource;
|
||||
import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException;
|
||||
import org.apache.hadoop.yarn.factories.RecordFactory;
|
||||
import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider;
|
||||
import org.apache.hadoop.yarn.security.AccessType;
|
||||
@ -656,7 +657,8 @@ private synchronized void internalReleaseResource(Resource clusterResource,
|
||||
|
||||
@Override
|
||||
public void decreaseContainer(Resource clusterResource,
|
||||
SchedContainerChangeRequest decreaseRequest, FiCaSchedulerApp app) {
|
||||
SchedContainerChangeRequest decreaseRequest, FiCaSchedulerApp app)
|
||||
throws InvalidResourceRequestException {
|
||||
// delta capacity is negative when it's a decrease request
|
||||
Resource absDeltaCapacity =
|
||||
Resources.negate(decreaseRequest.getDeltaCapacity());
|
||||
|
@ -21,8 +21,11 @@
|
||||
import java.util.ArrayList;
|
||||
import java.util.Arrays;
|
||||
import java.util.Collection;
|
||||
import java.util.Collections;
|
||||
import java.util.List;
|
||||
|
||||
import org.apache.commons.logging.Log;
|
||||
import org.apache.commons.logging.LogFactory;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationAttemptId;
|
||||
import org.apache.hadoop.yarn.api.records.ApplicationId;
|
||||
@ -47,8 +50,12 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler
|
||||
.SchedulerApplicationAttempt;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica
|
||||
.FiCaSchedulerNode;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent;
|
||||
import org.apache.hadoop.yarn.util.resource.Resources;
|
||||
@ -57,12 +64,48 @@
|
||||
import org.junit.Test;
|
||||
|
||||
public class TestContainerResizing {
|
||||
private static final Log LOG = LogFactory.getLog(TestContainerResizing.class);
|
||||
private final int GB = 1024;
|
||||
|
||||
private YarnConfiguration conf;
|
||||
|
||||
RMNodeLabelsManager mgr;
|
||||
|
||||
class MyScheduler extends CapacityScheduler {
|
||||
/*
|
||||
* A Mock Scheduler to simulate the potential effect of deadlock between:
|
||||
* 1. The AbstractYarnScheduler.decreaseContainers() call (from
|
||||
* ApplicationMasterService thread)
|
||||
* 2. The CapacityScheduler.allocateContainersToNode() call (from the
|
||||
* scheduler thread)
|
||||
*/
|
||||
MyScheduler() {
|
||||
super();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void decreaseContainers(
|
||||
List<ContainerResourceChangeRequest> decreaseRequests,
|
||||
SchedulerApplicationAttempt attempt) {
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch(InterruptedException e) {
|
||||
LOG.debug("Thread interrupted.");
|
||||
}
|
||||
super.decreaseContainers(decreaseRequests, attempt);
|
||||
}
|
||||
|
||||
@Override
|
||||
public synchronized void allocateContainersToNode(FiCaSchedulerNode node) {
|
||||
try {
|
||||
Thread.sleep(1000);
|
||||
} catch(InterruptedException e) {
|
||||
LOG.debug("Thread interrupted.");
|
||||
}
|
||||
super.allocateContainersToNode(node);
|
||||
}
|
||||
}
|
||||
|
||||
@Before
|
||||
public void setUp() throws Exception {
|
||||
conf = new YarnConfiguration();
|
||||
@ -958,6 +1001,50 @@ public RMNodeLabelsManager createNodeLabelManager() {
|
||||
rm1.close();
|
||||
}
|
||||
|
||||
@Test (timeout = 60000)
|
||||
public void testDecreaseContainerWillNotDeadlockContainerAllocation()
|
||||
throws Exception {
|
||||
// create and start MockRM with our MyScheduler
|
||||
MockRM rm = new MockRM() {
|
||||
@Override
|
||||
public ResourceScheduler createScheduler() {
|
||||
CapacityScheduler cs = new MyScheduler();
|
||||
cs.setConf(conf);
|
||||
return cs;
|
||||
}
|
||||
};
|
||||
rm.start();
|
||||
// register a node
|
||||
MockNM nm = rm.registerNode("h1:1234", 20 * GB);
|
||||
// submit an application -> app1
|
||||
RMApp app1 = rm.submitApp(3 * GB, "app", "user", null, "default");
|
||||
MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm);
|
||||
// making sure resource is allocated
|
||||
checkUsedResource(rm, "default", 3 * GB, null);
|
||||
FiCaSchedulerApp app = getFiCaSchedulerApp(rm, app1.getApplicationId());
|
||||
Assert.assertEquals(3 * GB,
|
||||
app.getAppAttemptResourceUsage().getUsed().getMemory());
|
||||
// making sure container is launched
|
||||
ContainerId containerId1 =
|
||||
ContainerId.newContainerId(am1.getApplicationAttemptId(), 1);
|
||||
sentRMContainerLaunched(rm, containerId1);
|
||||
// submit allocation request for a new container
|
||||
am1.allocate(Collections.singletonList(ResourceRequest.newInstance(
|
||||
Priority.newInstance(1), "*", Resources.createResource(2 * GB), 1)),
|
||||
null);
|
||||
// nm reports status update and triggers container allocation
|
||||
nm.nodeHeartbeat(true);
|
||||
// *In the mean time*, am1 asks to decrease its AM container resource from
|
||||
// 3GB to 1GB
|
||||
AllocateResponse response = am1.sendContainerResizingRequest(null,
|
||||
Collections.singletonList(ContainerResourceChangeRequest
|
||||
.newInstance(containerId1, Resources.createResource(GB))));
|
||||
// verify that the containe resource is decreased
|
||||
verifyContainerDecreased(response, containerId1, GB);
|
||||
|
||||
rm.close();
|
||||
}
|
||||
|
||||
private void checkPendingResource(MockRM rm, String queueName, int memory,
|
||||
String label) {
|
||||
CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();
|
||||
|
Loading…
Reference in New Issue
Block a user