diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 8eaed428c3..ee57e4b036 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -170,6 +170,9 @@ Release 2.9.0 - UNRELEASED YARN-4633. Fix random test failure in TestRMRestart#testRMRestartAfterPreemption (Bibin A Chundatt via rohithsharmaks) + YARN-4519. Potential deadlock of CapacityScheduler between decrease container + and assign containers. (Meng Ding via jianhe) + Release 2.8.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java index cc305931dc..e19d55ee81 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/RMServerUtils.java @@ -53,9 +53,10 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.attempt.RMAppAttemptState; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; -import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .SchedContainerChangeRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerUtils; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.YarnScheduler; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -114,43 +115,25 @@ public static void normalizeAndValidateRequests(List ask, queueName, scheduler, rmContext, queueInfo); } } - + /** - * Normalize container increase/decrease request, it will normalize and update - * ContainerResourceChangeRequest.targetResource + * Validate increase/decrease request. This function must be called under + * the queue lock to make sure that the access to container resource is + * atomic. Refer to LeafQueue.decreaseContainer() and + * CapacityScheduelr.updateIncreaseRequests() + * * *
    * - Throw exception when any other error happens
    * 
*/ - public static void checkAndNormalizeContainerChangeRequest( - RMContext rmContext, ContainerResourceChangeRequest request, - boolean increase) throws InvalidResourceRequestException { + public static void checkSchedContainerChangeRequest( + SchedContainerChangeRequest request, boolean increase) + throws InvalidResourceRequestException { + RMContext rmContext = request.getRmContext(); ContainerId containerId = request.getContainerId(); - ResourceScheduler scheduler = rmContext.getScheduler(); - RMContainer rmContainer = scheduler.getRMContainer(containerId); - ResourceCalculator rc = scheduler.getResourceCalculator(); - - if (null == rmContainer) { - String msg = - "Failed to get rmContainer for " - + (increase ? "increase" : "decrease") - + " request, with container-id=" + containerId; - throw new InvalidResourceRequestException(msg); - } - - if (rmContainer.getState() != RMContainerState.RUNNING) { - String msg = - "rmContainer's state is not RUNNING, for " - + (increase ? "increase" : "decrease") - + " request, with container-id=" + containerId; - throw new InvalidResourceRequestException(msg); - } - - Resource targetResource = Resources.normalize(rc, request.getCapability(), - scheduler.getMinimumResourceCapability(), - scheduler.getMaximumResourceCapability(), - scheduler.getMinimumResourceCapability()); + RMContainer rmContainer = request.getRMContainer(); + Resource targetResource = request.getTargetCapacity(); // Compare targetResource and original resource Resource originalResource = rmContainer.getAllocatedResource(); @@ -181,10 +164,10 @@ public static void checkAndNormalizeContainerChangeRequest( throw new InvalidResourceRequestException(msg); } } - - RMNode rmNode = rmContext.getRMNodes().get(rmContainer.getAllocatedNode()); - + // Target resource of the increase request is more than NM can offer + ResourceScheduler scheduler = rmContext.getScheduler(); + RMNode rmNode = request.getSchedulerNode().getRMNode(); if (!Resources.fitsIn(scheduler.getResourceCalculator(), scheduler.getClusterResource(), targetResource, rmNode.getTotalCapability())) { @@ -193,9 +176,6 @@ public static void checkAndNormalizeContainerChangeRequest( + rmNode.getTotalCapability(); throw new InvalidResourceRequestException(msg); } - - // Update normalized target resource - request.setCapability(targetResource); } /* @@ -253,7 +233,8 @@ private static void checkDuplicatedIncreaseDecreaseRequest( } } } - + + // Sanity check and normalize target resource private static void validateIncreaseDecreaseRequest(RMContext rmContext, List requests, Resource maximumAllocation, boolean increase) @@ -283,8 +264,23 @@ private static void validateIncreaseDecreaseRequest(RMContext rmContext, + request.getCapability().getVirtualCores() + ", maxVirtualCores=" + maximumAllocation.getVirtualCores()); } - - checkAndNormalizeContainerChangeRequest(rmContext, request, increase); + ContainerId containerId = request.getContainerId(); + ResourceScheduler scheduler = rmContext.getScheduler(); + RMContainer rmContainer = scheduler.getRMContainer(containerId); + if (null == rmContainer) { + String msg = + "Failed to get rmContainer for " + + (increase ? "increase" : "decrease") + + " request, with container-id=" + containerId; + throw new InvalidResourceRequestException(msg); + } + ResourceCalculator rc = scheduler.getResourceCalculator(); + Resource targetResource = Resources.normalize(rc, request.getCapability(), + scheduler.getMinimumResourceCapability(), + scheduler.getMaximumResourceCapability(), + scheduler.getMinimumResourceCapability()); + // Update normalized target resource + request.setCapability(targetResource); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java index 41a04f23da..27d4f91bc6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AbstractYarnScheduler.java @@ -22,7 +22,6 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; -import java.util.Collections; import java.util.EnumSet; import java.util.List; import java.util.Map; @@ -55,13 +54,13 @@ import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; import org.apache.hadoop.yarn.server.api.protocolrecords.NMContainerStatus; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger; import org.apache.hadoop.yarn.server.resourcemanager.RMAuditLogger.AuditConstants; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; -import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.ResourceManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMApp; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppEvent; @@ -74,6 +73,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerFinishedEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerRecoverEvent; + import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeCleanContainerEvent; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeDecreaseContainerEvent; @@ -618,28 +618,20 @@ protected void releaseContainers(List containers, SchedulerUtils.RELEASED_CONTAINER), RMContainerEventType.RELEASED); } } - + protected void decreaseContainers( - List decreaseRequests, + List decreaseRequests, SchedulerApplicationAttempt attempt) { - for (SchedContainerChangeRequest request : decreaseRequests) { + if (null == decreaseRequests || decreaseRequests.isEmpty()) { + return; + } + // Pre-process decrease requests + List schedDecreaseRequests = + createSchedContainerChangeRequests(decreaseRequests, false); + for (SchedContainerChangeRequest request : schedDecreaseRequests) { if (LOG.isDebugEnabled()) { LOG.debug("Processing decrease request:" + request); } - - boolean hasIncreaseRequest = - attempt.removeIncreaseRequest(request.getNodeId(), - request.getPriority(), request.getContainerId()); - - if (hasIncreaseRequest) { - if (LOG.isDebugEnabled()) { - LOG.debug("While processing decrease request, found a increase request " - + "for the same container " - + request.getContainerId() - + ", removed the increase request"); - } - } - // handle decrease request decreaseContainer(request, attempt); } @@ -877,7 +869,7 @@ public synchronized void setClusterMaxPriority(Configuration conf) } /** - * Normalize container increase/decrease request, and return + * Sanity check increase/decrease request, and return * SchedulerContainerResourceChangeRequest according to given * ContainerResourceChangeRequest. * @@ -886,37 +878,34 @@ public synchronized void setClusterMaxPriority(Configuration conf) * - Throw exception when any other error happens * */ - private SchedContainerChangeRequest - checkAndNormalizeContainerChangeRequest( - ContainerResourceChangeRequest request, boolean increase) - throws YarnException { - // We have done a check in ApplicationMasterService, but RMContainer status - // / Node resource could change since AMS won't acquire lock of scheduler. - RMServerUtils.checkAndNormalizeContainerChangeRequest(rmContext, request, - increase); + private SchedContainerChangeRequest createSchedContainerChangeRequest( + ContainerResourceChangeRequest request, boolean increase) + throws YarnException { ContainerId containerId = request.getContainerId(); RMContainer rmContainer = getRMContainer(containerId); + if (null == rmContainer) { + String msg = + "Failed to get rmContainer for " + + (increase ? "increase" : "decrease") + + " request, with container-id=" + containerId; + throw new InvalidResourceRequestException(msg); + } SchedulerNode schedulerNode = getSchedulerNode(rmContainer.getAllocatedNode()); - - return new SchedContainerChangeRequest(schedulerNode, rmContainer, - request.getCapability()); + return new SchedContainerChangeRequest( + this.rmContext, schedulerNode, rmContainer, request.getCapability()); } protected List - checkAndNormalizeContainerChangeRequests( + createSchedContainerChangeRequests( List changeRequests, boolean increase) { - if (null == changeRequests || changeRequests.isEmpty()) { - return Collections.EMPTY_LIST; - } - List schedulerChangeRequests = new ArrayList(); for (ContainerResourceChangeRequest r : changeRequests) { SchedContainerChangeRequest sr = null; try { - sr = checkAndNormalizeContainerChangeRequest(r, increase); + sr = createSchedContainerChangeRequest(r, increase); } catch (YarnException e) { LOG.warn("Error happens when checking increase request, Ignoring.." + " exception=", e); @@ -924,7 +913,6 @@ public synchronized void setClusterMaxPriority(Configuration conf) } schedulerChangeRequests.add(sr); } - return schedulerChangeRequests; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 07f3d8baae..a61001e8be 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -44,6 +44,8 @@ import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceRequest; +import org.apache.hadoop.yarn.exceptions.YarnException; +import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.util.resource.Resources; @@ -148,6 +150,18 @@ public synchronized boolean updateIncreaseRequests( boolean resourceUpdated = false; for (SchedContainerChangeRequest r : increaseRequests) { + if (r.getRMContainer().getState() != RMContainerState.RUNNING) { + LOG.warn("rmContainer's state is not RUNNING, for increase request with" + + " container-id=" + r.getContainerId()); + continue; + } + try { + RMServerUtils.checkSchedContainerChangeRequest(r, true); + } catch (YarnException e) { + LOG.warn("Error happens when checking increase request, Ignoring.." + + " exception=", e); + continue; + } NodeId nodeId = r.getRMContainer().getAllocatedNode(); Map> requestsOnNode = @@ -221,7 +235,7 @@ private void insertIncreaseRequest(SchedContainerChangeRequest request) { if (LOG.isDebugEnabled()) { LOG.debug("Added increase request:" + request.getContainerId() - + " delta=" + request.getDeltaCapacity()); + + " delta=" + delta); } // update priorities @@ -520,24 +534,20 @@ public synchronized void increaseContainer( NodeId nodeId = increaseRequest.getNodeId(); Priority priority = increaseRequest.getPriority(); ContainerId containerId = increaseRequest.getContainerId(); - + Resource deltaCapacity = increaseRequest.getDeltaCapacity(); + if (LOG.isDebugEnabled()) { LOG.debug("allocated increase request : applicationId=" + applicationId + " container=" + containerId + " host=" + increaseRequest.getNodeId() + " user=" + user + " resource=" - + increaseRequest.getDeltaCapacity()); + + deltaCapacity); } - // Set queue metrics - queue.getMetrics().allocateResources(user, - increaseRequest.getDeltaCapacity()); - + queue.getMetrics().allocateResources(user, deltaCapacity); // remove the increase request from pending increase request map removeIncreaseRequest(nodeId, priority, containerId); - // update usage - appResourceUsage.incUsed(increaseRequest.getNodePartition(), - increaseRequest.getDeltaCapacity()); + appResourceUsage.incUsed(increaseRequest.getNodePartition(), deltaCapacity); } public synchronized void decreaseContainer( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java index ea109fddf1..e4ab3a28d2 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedContainerChangeRequest.java @@ -22,6 +22,7 @@ import org.apache.hadoop.yarn.api.records.NodeId; import org.apache.hadoop.yarn.api.records.Priority; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.util.resource.Resources; @@ -32,18 +33,19 @@ */ public class SchedContainerChangeRequest implements Comparable { - RMContainer rmContainer; - Resource targetCapacity; - SchedulerNode schedulerNode; - Resource deltaCapacity; + private RMContext rmContext; + private RMContainer rmContainer; + private Resource targetCapacity; + private SchedulerNode schedulerNode; + private Resource deltaCapacity; - public SchedContainerChangeRequest(SchedulerNode schedulerNode, + public SchedContainerChangeRequest( + RMContext rmContext, SchedulerNode schedulerNode, RMContainer rmContainer, Resource targetCapacity) { + this.rmContext = rmContext; this.rmContainer = rmContainer; this.targetCapacity = targetCapacity; this.schedulerNode = schedulerNode; - deltaCapacity = Resources.subtract(targetCapacity, - rmContainer.getAllocatedResource()); } public NodeId getNodeId() { @@ -58,11 +60,19 @@ public Resource getTargetCapacity() { return this.targetCapacity; } + public RMContext getRmContext() { + return this.rmContext; + } /** - * Delta capacity = before - target, so if it is a decrease request, delta + * Delta capacity = target - before, so if it is a decrease request, delta * capacity will be negative */ - public Resource getDeltaCapacity() { + public synchronized Resource getDeltaCapacity() { + // Only calculate deltaCapacity once + if (deltaCapacity == null) { + deltaCapacity = Resources.subtract( + targetCapacity, rmContainer.getAllocatedResource()); + } return deltaCapacity; } @@ -81,7 +91,7 @@ public String getNodePartition() { public SchedulerNode getSchedulerNode() { return schedulerNode; } - + @Override public int hashCode() { return (getContainerId().hashCode() << 16) + targetCapacity.hashCode(); @@ -112,7 +122,6 @@ public int compareTo(SchedContainerChangeRequest other) { @Override public String toString() { return ""; + + targetCapacity + ", node=" + getNodeId().toString() + ">"; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java index 6ffba02439..daf77903fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueue.java @@ -33,6 +33,7 @@ import org.apache.hadoop.yarn.api.records.QueueACL; import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.security.PrivilegedEntity; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; @@ -332,7 +333,7 @@ public void decUsedResource(String nodePartition, Resource resourceToDec, */ public void decreaseContainer(Resource clusterResource, SchedContainerChangeRequest decreaseRequest, - FiCaSchedulerApp app); + FiCaSchedulerApp app) throws InvalidResourceRequestException; /** * Get valid Node Labels for this queue diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index e773384d89..dcb60fcc5e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -21,8 +21,8 @@ import java.io.IOException; import java.io.InputStream; import java.util.ArrayList; -import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.Comparator; import java.util.EnumSet; import java.util.HashMap; @@ -65,6 +65,7 @@ import org.apache.hadoop.yarn.api.records.ResourceOption; import org.apache.hadoop.yarn.api.records.ResourceRequest; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.proto.YarnServiceProtos.SchedulerResourceTypes; @@ -895,9 +896,36 @@ private synchronized void doneApplicationAttempt( } } + // It is crucial to acquire leaf queue lock first to prevent: + // 1. Race condition when calculating the delta resource in + // SchedContainerChangeRequest + // 2. Deadlock with the scheduling thread. + private LeafQueue updateIncreaseRequests( + List increaseRequests, + FiCaSchedulerApp app) { + if (null == increaseRequests || increaseRequests.isEmpty()) { + return null; + } + // Pre-process increase requests + List schedIncreaseRequests = + createSchedContainerChangeRequests(increaseRequests, true); + LeafQueue leafQueue = (LeafQueue) app.getQueue(); + synchronized(leafQueue) { + // make sure we aren't stopping/removing the application + // when the allocate comes in + if (app.isStopped()) { + return null; + } + // Process increase resource requests + if (app.updateIncreaseRequests(schedIncreaseRequests)) { + return leafQueue; + } + return null; + } + } + @Override - // Note: when AM asks to decrease container or release container, we will - // acquire scheduler lock + // Note: when AM asks to release container, we will acquire scheduler lock @Lock(Lock.NoLock.class) public Allocation allocate(ApplicationAttemptId applicationAttemptId, List ask, List release, @@ -909,26 +937,23 @@ public Allocation allocate(ApplicationAttemptId applicationAttemptId, if (application == null) { return EMPTY_ALLOCATION; } - - // Sanity check - SchedulerUtils.normalizeRequests( - ask, getResourceCalculator(), getClusterResource(), - getMinimumResourceCapability(), getMaximumResourceCapability()); - - // Pre-process increase requests - List normalizedIncreaseRequests = - checkAndNormalizeContainerChangeRequests(increaseRequests, true); - - // Pre-process decrease requests - List normalizedDecreaseRequests = - checkAndNormalizeContainerChangeRequests(decreaseRequests, false); // Release containers releaseContainers(release, application); - Allocation allocation; + // update increase requests + LeafQueue updateDemandForQueue = + updateIncreaseRequests(increaseRequests, application); - LeafQueue updateDemandForQueue = null; + // Decrease containers + decreaseContainers(decreaseRequests, application); + + // Sanity check for new allocation requests + SchedulerUtils.normalizeRequests( + ask, getResourceCalculator(), getClusterResource(), + getMinimumResourceCapability(), getMaximumResourceCapability()); + + Allocation allocation; synchronized (application) { @@ -947,7 +972,8 @@ ask, getResourceCalculator(), getClusterResource(), } // Update application requests - if (application.updateResourceRequests(ask)) { + if (application.updateResourceRequests(ask) + && (updateDemandForQueue == null)) { updateDemandForQueue = (LeafQueue) application.getQueue(); } @@ -957,12 +983,6 @@ ask, getResourceCalculator(), getClusterResource(), } } - // Process increase resource requests - if (application.updateIncreaseRequests(normalizedIncreaseRequests) - && (updateDemandForQueue == null)) { - updateDemandForQueue = (LeafQueue) application.getQueue(); - } - if (application.isWaitingForAMContainer()) { // Allocate is for AM and update AM blacklist for this application.updateAMBlacklist( @@ -971,8 +991,6 @@ ask, getResourceCalculator(), getClusterResource(), application.updateBlacklist(blacklistAdditions, blacklistRemovals); } - // Decrease containers - decreaseContainers(normalizedDecreaseRequests, application); allocation = application.getAllocation(getResourceCalculator(), clusterResource, getMinimumResourceCapability()); @@ -1167,7 +1185,8 @@ private void updateSchedulerHealth(long now, FiCaSchedulerNode node, .getAssignmentInformation().getReserved()); } - private synchronized void allocateContainersToNode(FiCaSchedulerNode node) { + @VisibleForTesting + protected synchronized void allocateContainersToNode(FiCaSchedulerNode node) { if (rmContext.isWorkPreservingRecoveryEnabled() && !rmContext.isSchedulerReadyForAllocatingContainers()) { return; @@ -1517,48 +1536,30 @@ protected synchronized void completedContainerInternal( } } - @Lock(CapacityScheduler.class) @Override - protected synchronized void decreaseContainer( - SchedContainerChangeRequest decreaseRequest, + protected void decreaseContainer(SchedContainerChangeRequest decreaseRequest, SchedulerApplicationAttempt attempt) { RMContainer rmContainer = decreaseRequest.getRMContainer(); - // Check container status before doing decrease if (rmContainer.getState() != RMContainerState.RUNNING) { LOG.info("Trying to decrease a container not in RUNNING state, container=" + rmContainer + " state=" + rmContainer.getState().name()); return; } - - // Delta capacity of this decrease request is 0, this decrease request may - // just to cancel increase request - if (Resources.equals(decreaseRequest.getDeltaCapacity(), Resources.none())) { - if (LOG.isDebugEnabled()) { - LOG.debug("Decrease target resource equals to existing resource for container:" - + decreaseRequest.getContainerId() - + " ignore this decrease request."); - } - return; - } - - // Save resource before decrease - Resource resourceBeforeDecrease = - Resources.clone(rmContainer.getContainer().getResource()); - FiCaSchedulerApp app = (FiCaSchedulerApp)attempt; LeafQueue queue = (LeafQueue) attempt.getQueue(); - queue.decreaseContainer(clusterResource, decreaseRequest, app); - - // Notify RMNode the container will be decreased - this.rmContext.getDispatcher().getEventHandler() - .handle(new RMNodeDecreaseContainerEvent(decreaseRequest.getNodeId(), - Arrays.asList(rmContainer.getContainer()))); - - LOG.info("Application attempt " + app.getApplicationAttemptId() - + " decreased container:" + decreaseRequest.getContainerId() + " from " - + resourceBeforeDecrease + " to " - + decreaseRequest.getTargetCapacity()); + try { + queue.decreaseContainer(clusterResource, decreaseRequest, app); + // Notify RMNode that the container can be pulled by NodeManager in the + // next heartbeat + this.rmContext.getDispatcher().getEventHandler() + .handle(new RMNodeDecreaseContainerEvent( + decreaseRequest.getNodeId(), + Collections.singletonList(rmContainer.getContainer()))); + } catch (InvalidResourceRequestException e) { + LOG.warn("Error happens when checking decrease request, Ignoring.." + + " exception=", e); + } } @Lock(Lock.NoLock.class) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 9e64b42ff9..56e450247d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -47,10 +47,12 @@ import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.nodelabels.CommonNodeLabelsManager; import org.apache.hadoop.yarn.security.AccessType; +import org.apache.hadoop.yarn.server.resourcemanager.RMServerUtils; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerEventType; @@ -1676,11 +1678,17 @@ public synchronized void setOrderingPolicy( public Priority getDefaultApplicationPriority() { return defaultAppPriorityPerQueue; } - + + /** + * + * @param clusterResource Total cluster resource + * @param decreaseRequest The decrease request + * @param app The application of interest + */ @Override public void decreaseContainer(Resource clusterResource, SchedContainerChangeRequest decreaseRequest, - FiCaSchedulerApp app) { + FiCaSchedulerApp app) throws InvalidResourceRequestException { // If the container being decreased is reserved, we need to unreserve it // first. RMContainer rmContainer = decreaseRequest.getRMContainer(); @@ -1688,25 +1696,62 @@ public void decreaseContainer(Resource clusterResource, unreserveIncreasedContainer(clusterResource, app, (FiCaSchedulerNode)decreaseRequest.getSchedulerNode(), rmContainer); } - - // Delta capacity is negative when it's a decrease request - Resource absDelta = Resources.negate(decreaseRequest.getDeltaCapacity()); - + boolean resourceDecreased = false; + Resource resourceBeforeDecrease; + // Grab queue lock to avoid race condition when getting container resource synchronized (this) { - // Delta is negative when it's a decrease request - releaseResource(clusterResource, app, absDelta, - decreaseRequest.getNodePartition(), decreaseRequest.getRMContainer(), - true); - // Notify application - app.decreaseContainer(decreaseRequest); - // Notify node - decreaseRequest.getSchedulerNode() - .decreaseContainer(decreaseRequest.getContainerId(), absDelta); + // Make sure the decrease request is valid in terms of current resource + // and target resource. This must be done under the leaf queue lock. + // Throws exception if the check fails. + RMServerUtils.checkSchedContainerChangeRequest(decreaseRequest, false); + // Save resource before decrease for debug log + resourceBeforeDecrease = + Resources.clone(rmContainer.getAllocatedResource()); + // Do we have increase request for the same container? If so, remove it + boolean hasIncreaseRequest = + app.removeIncreaseRequest(decreaseRequest.getNodeId(), + decreaseRequest.getPriority(), decreaseRequest.getContainerId()); + if (hasIncreaseRequest) { + if (LOG.isDebugEnabled()) { + LOG.debug("While processing decrease requests, found an increase" + + " request for the same container " + + decreaseRequest.getContainerId() + + ", removed the increase request"); + } + } + // Delta capacity is negative when it's a decrease request + Resource absDelta = Resources.negate(decreaseRequest.getDeltaCapacity()); + if (Resources.equals(absDelta, Resources.none())) { + // If delta capacity of this decrease request is 0, this decrease + // request serves the purpose of cancelling an existing increase request + // if any + if (LOG.isDebugEnabled()) { + LOG.debug("Decrease target resource equals to existing resource for" + + " container:" + decreaseRequest.getContainerId() + + " ignore this decrease request."); + } + } else { + // Release the delta resource + releaseResource(clusterResource, app, absDelta, + decreaseRequest.getNodePartition(), + decreaseRequest.getRMContainer(), + true); + // Notify application + app.decreaseContainer(decreaseRequest); + // Notify node + decreaseRequest.getSchedulerNode() + .decreaseContainer(decreaseRequest.getContainerId(), absDelta); + resourceDecreased = true; + } } - // Notify parent - if (getParent() != null) { + if (resourceDecreased) { + // Notify parent queue outside of leaf queue lock getParent().decreaseContainer(clusterResource, decreaseRequest, app); + LOG.info("Application attempt " + app.getApplicationAttemptId() + + " decreased container:" + decreaseRequest.getContainerId() + + " from " + resourceBeforeDecrease + " to " + + decreaseRequest.getTargetCapacity()); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index badab723af..a7d8796eb1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -46,6 +46,7 @@ import org.apache.hadoop.yarn.api.records.QueueState; import org.apache.hadoop.yarn.api.records.QueueUserACLInfo; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.exceptions.InvalidResourceRequestException; import org.apache.hadoop.yarn.factories.RecordFactory; import org.apache.hadoop.yarn.factory.providers.RecordFactoryProvider; import org.apache.hadoop.yarn.security.AccessType; @@ -656,7 +657,8 @@ private synchronized void internalReleaseResource(Resource clusterResource, @Override public void decreaseContainer(Resource clusterResource, - SchedContainerChangeRequest decreaseRequest, FiCaSchedulerApp app) { + SchedContainerChangeRequest decreaseRequest, FiCaSchedulerApp app) + throws InvalidResourceRequestException { // delta capacity is negative when it's a decrease request Resource absDeltaCapacity = Resources.negate(decreaseRequest.getDeltaCapacity()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java index 672af64557..c08af9d843 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestContainerResizing.java @@ -21,8 +21,11 @@ import java.util.ArrayList; import java.util.Arrays; import java.util.Collection; +import java.util.Collections; import java.util.List; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.yarn.api.protocolrecords.AllocateResponse; import org.apache.hadoop.yarn.api.records.ApplicationAttemptId; import org.apache.hadoop.yarn.api.records.ApplicationId; @@ -47,8 +50,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNode; import org.apache.hadoop.yarn.server.resourcemanager.rmnode.RMNodeImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .SchedulerApplicationAttempt; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.SchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica.FiCaSchedulerApp; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica + .FiCaSchedulerNode; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.AppAttemptRemovedSchedulerEvent; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.event.NodeUpdateSchedulerEvent; import org.apache.hadoop.yarn.util.resource.Resources; @@ -57,12 +64,48 @@ import org.junit.Test; public class TestContainerResizing { + private static final Log LOG = LogFactory.getLog(TestContainerResizing.class); private final int GB = 1024; private YarnConfiguration conf; RMNodeLabelsManager mgr; + class MyScheduler extends CapacityScheduler { + /* + * A Mock Scheduler to simulate the potential effect of deadlock between: + * 1. The AbstractYarnScheduler.decreaseContainers() call (from + * ApplicationMasterService thread) + * 2. The CapacityScheduler.allocateContainersToNode() call (from the + * scheduler thread) + */ + MyScheduler() { + super(); + } + + @Override + protected void decreaseContainers( + List decreaseRequests, + SchedulerApplicationAttempt attempt) { + try { + Thread.sleep(1000); + } catch(InterruptedException e) { + LOG.debug("Thread interrupted."); + } + super.decreaseContainers(decreaseRequests, attempt); + } + + @Override + public synchronized void allocateContainersToNode(FiCaSchedulerNode node) { + try { + Thread.sleep(1000); + } catch(InterruptedException e) { + LOG.debug("Thread interrupted."); + } + super.allocateContainersToNode(node); + } + } + @Before public void setUp() throws Exception { conf = new YarnConfiguration(); @@ -958,6 +1001,50 @@ public RMNodeLabelsManager createNodeLabelManager() { rm1.close(); } + @Test (timeout = 60000) + public void testDecreaseContainerWillNotDeadlockContainerAllocation() + throws Exception { + // create and start MockRM with our MyScheduler + MockRM rm = new MockRM() { + @Override + public ResourceScheduler createScheduler() { + CapacityScheduler cs = new MyScheduler(); + cs.setConf(conf); + return cs; + } + }; + rm.start(); + // register a node + MockNM nm = rm.registerNode("h1:1234", 20 * GB); + // submit an application -> app1 + RMApp app1 = rm.submitApp(3 * GB, "app", "user", null, "default"); + MockAM am1 = MockRM.launchAndRegisterAM(app1, rm, nm); + // making sure resource is allocated + checkUsedResource(rm, "default", 3 * GB, null); + FiCaSchedulerApp app = getFiCaSchedulerApp(rm, app1.getApplicationId()); + Assert.assertEquals(3 * GB, + app.getAppAttemptResourceUsage().getUsed().getMemory()); + // making sure container is launched + ContainerId containerId1 = + ContainerId.newContainerId(am1.getApplicationAttemptId(), 1); + sentRMContainerLaunched(rm, containerId1); + // submit allocation request for a new container + am1.allocate(Collections.singletonList(ResourceRequest.newInstance( + Priority.newInstance(1), "*", Resources.createResource(2 * GB), 1)), + null); + // nm reports status update and triggers container allocation + nm.nodeHeartbeat(true); + // *In the mean time*, am1 asks to decrease its AM container resource from + // 3GB to 1GB + AllocateResponse response = am1.sendContainerResizingRequest(null, + Collections.singletonList(ContainerResourceChangeRequest + .newInstance(containerId1, Resources.createResource(GB)))); + // verify that the containe resource is decreased + verifyContainerDecreased(response, containerId1, GB); + + rm.close(); + } + private void checkPendingResource(MockRM rm, String queueName, int memory, String label) { CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler();