From 189a63a719c63b67a1783a280bfc2f72dcb55277 Mon Sep 17 00:00:00 2001 From: tgraves Date: Thu, 23 Apr 2015 14:39:25 +0000 Subject: [PATCH] YARN-3434. Interaction between reservations and userlimit can result in significant ULF violation --- hadoop-yarn-project/CHANGES.txt | 3 + .../scheduler/ResourceLimits.java | 28 ++- .../scheduler/capacity/AbstractCSQueue.java | 94 +++++----- .../scheduler/capacity/LeafQueue.java | 162 ++++++++---------- .../scheduler/capacity/TestReservations.java | 65 ++++--- 5 files changed, 186 insertions(+), 166 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index f4413a891d..d335389fdb 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -252,6 +252,9 @@ Release 2.8.0 - UNRELEASED YARN-3495. Confusing log generated by FairScheduler. (Brahma Reddy Battula via ozawa) + YARN-3434. Interaction between reservations and userlimit can result in + significant ULF violation (tgraves) + Release 2.7.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java index 12333e877b..8074794645 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceLimits.java @@ -19,22 +19,44 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.util.resource.Resources; /** * Resource limits for queues/applications, this means max overall (please note * that, it's not "extra") resource you can get. */ public class ResourceLimits { + volatile Resource limit; + + // This is special limit that goes with the RESERVE_CONT_LOOK_ALL_NODES + // config. This limit indicates how much we need to unreserve to allocate + // another container. + private volatile Resource amountNeededUnreserve; + public ResourceLimits(Resource limit) { + this.amountNeededUnreserve = Resources.none(); this.limit = limit; } - - volatile Resource limit; + + public ResourceLimits(Resource limit, Resource amountNeededUnreserve) { + this.amountNeededUnreserve = amountNeededUnreserve; + this.limit = limit; + } + public Resource getLimit() { return limit; } - + + public Resource getAmountNeededUnreserve() { + return amountNeededUnreserve; + } + public void setLimit(Resource limit) { this.limit = limit; } + + public void setAmountNeededUnreserve(Resource amountNeededUnreserve) { + this.amountNeededUnreserve = amountNeededUnreserve; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 9233e015ac..47cea19dbd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -85,7 +85,7 @@ public abstract class AbstractCSQueue implements CSQueue { // Track capacities like used-capcity/abs-used-capacity/capacity/abs-capacity, // etc. QueueCapacities queueCapacities; - + private final RecordFactory recordFactory = RecordFactoryProvider.getRecordFactory(null); protected CapacitySchedulerContext csContext; @@ -473,55 +473,55 @@ synchronized boolean canAssignToThisQueue(Resource clusterResource, getCurrentLimitResource(nodePartition, clusterResource, currentResourceLimits, schedulingMode); - // if reservation continous looking enabled, check to see if could we - // potentially use this node instead of a reserved node if the application - // has reserved containers. - // TODO, now only consider reservation cases when the node has no label - if (this.reservationsContinueLooking - && nodePartition.equals(RMNodeLabelsManager.NO_LABEL) - && Resources.greaterThan(resourceCalculator, clusterResource, - resourceCouldBeUnreserved, Resources.none())) { - // resource-without-reserved = used - reserved - Resource newTotalWithoutReservedResource = - Resources.subtract(newTotalResource, resourceCouldBeUnreserved); - - // when total-used-without-reserved-resource < currentLimit, we still - // have chance to allocate on this node by unreserving some containers - if (Resources.lessThan(resourceCalculator, clusterResource, - newTotalWithoutReservedResource, currentLimitResource)) { - if (LOG.isDebugEnabled()) { - LOG.debug("try to use reserved: " + getQueueName() - + " usedResources: " + queueUsage.getUsed() - + ", clusterResources: " + clusterResource - + ", reservedResources: " + resourceCouldBeUnreserved - + ", capacity-without-reserved: " - + newTotalWithoutReservedResource + ", maxLimitCapacity: " - + currentLimitResource); - } - return true; - } - } - - // Check if we over current-resource-limit computed. if (Resources.greaterThan(resourceCalculator, clusterResource, newTotalResource, currentLimitResource)) { - return false; - } - if (LOG.isDebugEnabled()) { - LOG.debug(getQueueName() - + "Check assign to queue, nodePartition=" - + nodePartition - + " usedResources: " - + queueUsage.getUsed(nodePartition) - + " clusterResources: " - + clusterResource - + " currentUsedCapacity " - + Resources.divide(resourceCalculator, clusterResource, - queueUsage.getUsed(nodePartition), - labelManager.getResourceByLabel(nodePartition, clusterResource)) - + " max-capacity: " - + queueCapacities.getAbsoluteMaximumCapacity(nodePartition) + ")"); + // if reservation continous looking enabled, check to see if could we + // potentially use this node instead of a reserved node if the application + // has reserved containers. + // TODO, now only consider reservation cases when the node has no label + if (this.reservationsContinueLooking + && nodePartition.equals(RMNodeLabelsManager.NO_LABEL) + && Resources.greaterThan(resourceCalculator, clusterResource, + resourceCouldBeUnreserved, Resources.none())) { + // resource-without-reserved = used - reserved + Resource newTotalWithoutReservedResource = + Resources.subtract(newTotalResource, resourceCouldBeUnreserved); + + // when total-used-without-reserved-resource < currentLimit, we still + // have chance to allocate on this node by unreserving some containers + if (Resources.lessThan(resourceCalculator, clusterResource, + newTotalWithoutReservedResource, currentLimitResource)) { + if (LOG.isDebugEnabled()) { + LOG.debug("try to use reserved: " + getQueueName() + + " usedResources: " + queueUsage.getUsed() + + ", clusterResources: " + clusterResource + + ", reservedResources: " + resourceCouldBeUnreserved + + ", capacity-without-reserved: " + + newTotalWithoutReservedResource + ", maxLimitCapacity: " + + currentLimitResource); + } + currentResourceLimits.setAmountNeededUnreserve(Resources.subtract(newTotalResource, + currentLimitResource)); + return true; + } + } + if (LOG.isDebugEnabled()) { + LOG.debug(getQueueName() + + "Check assign to queue, nodePartition=" + + nodePartition + + " usedResources: " + + queueUsage.getUsed(nodePartition) + + " clusterResources: " + + clusterResource + + " currentUsedCapacity " + + Resources.divide(resourceCalculator, clusterResource, + queueUsage.getUsed(nodePartition), + labelManager.getResourceByLabel(nodePartition, clusterResource)) + + " max-capacity: " + + queueCapacities.getAbsoluteMaximumCapacity(nodePartition) + ")"); + } + return false; } return true; } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 69e7e535b7..22aafaaaaa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -119,8 +119,8 @@ public class LeafQueue extends AbstractCSQueue { private final QueueResourceLimitsInfo queueResourceLimitsInfo = new QueueResourceLimitsInfo(); - private volatile ResourceLimits currentResourceLimits = null; - + private volatile ResourceLimits cachedResourceLimitsForHeadroom = null; + private OrderingPolicy orderingPolicy = new FifoOrderingPolicy(); @@ -151,7 +151,7 @@ protected synchronized void setupQueueConfigs(Resource clusterResource) this.lastClusterResource = clusterResource; updateAbsoluteCapacityResource(clusterResource); - this.currentResourceLimits = new ResourceLimits(clusterResource); + this.cachedResourceLimitsForHeadroom = new ResourceLimits(clusterResource); // Initialize headroom info, also used for calculating application // master resource limits. Since this happens during queue initialization @@ -715,14 +715,14 @@ public synchronized void removeApplicationAttempt( activateApplications(); LOG.info("Application removed -" + - " appId: " + application.getApplicationId() + - " user: " + application.getUser() + + " appId: " + application.getApplicationId() + + " user: " + application.getUser() + " queue: " + getQueueName() + " #user-pending-applications: " + user.getPendingApplications() + " #user-active-applications: " + user.getActiveApplications() + " #queue-pending-applications: " + getNumPendingApplications() + " #queue-active-applications: " + getNumActiveApplications() - ); + ); } private synchronized FiCaSchedulerApp getApplication( @@ -854,18 +854,18 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, // before all higher priority ones are serviced. Resource userLimit = computeUserLimitAndSetHeadroom(application, clusterResource, - required, node.getPartition(), schedulingMode); - + required, node.getPartition(), schedulingMode); + // Check queue max-capacity limit if (!super.canAssignToThisQueue(clusterResource, node.getPartition(), - this.currentResourceLimits, required, + currentResourceLimits, required, application.getCurrentReservation(), schedulingMode)) { return NULL_ASSIGNMENT; } // Check user limit if (!canAssignToUser(clusterResource, application.getUser(), userLimit, - application, true, node.getPartition())) { + application, node.getPartition(), currentResourceLimits)) { break; } @@ -906,9 +906,9 @@ public synchronized CSAssignment assignContainers(Resource clusterResource, } // Try to schedule - CSAssignment assignment = - assignContainersOnNode(clusterResource, node, application, priority, - null, schedulingMode); + CSAssignment assignment = + assignContainersOnNode(clusterResource, node, application, priority, + null, schedulingMode, currentResourceLimits); // Did the application skip this node? if (assignment.getSkipped()) { @@ -975,7 +975,7 @@ private synchronized CSAssignment assignReservedContainer( // Try to assign if we have sufficient resources CSAssignment tmp = assignContainersOnNode(clusterResource, node, application, priority, - rmContainer, schedulingMode); + rmContainer, schedulingMode, new ResourceLimits(Resources.none())); // Doesn't matter... since it's already charged for at time of reservation // "re-reservation" is *free* @@ -1026,7 +1026,7 @@ private Resource getHeadroom(User user, Resource currentResourceLimit, private void setQueueResourceLimitsInfo( Resource clusterResource) { synchronized (queueResourceLimitsInfo) { - queueResourceLimitsInfo.setQueueCurrentLimit(currentResourceLimits + queueResourceLimitsInfo.setQueueCurrentLimit(cachedResourceLimitsForHeadroom .getLimit()); queueResourceLimitsInfo.setClusterResource(clusterResource); } @@ -1048,13 +1048,13 @@ Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application, setQueueResourceLimitsInfo(clusterResource); Resource headroom = - getHeadroom(queueUser, currentResourceLimits.getLimit(), + getHeadroom(queueUser, cachedResourceLimitsForHeadroom.getLimit(), clusterResource, userLimit); if (LOG.isDebugEnabled()) { LOG.debug("Headroom calculation for user " + user + ": " + " userLimit=" + userLimit + - " queueMaxAvailRes=" + currentResourceLimits.getLimit() + + " queueMaxAvailRes=" + cachedResourceLimitsForHeadroom.getLimit() + " consumed=" + queueUser.getUsed() + " headroom=" + headroom); } @@ -1169,7 +1169,7 @@ private Resource computeUserLimit(FiCaSchedulerApp application, @Private protected synchronized boolean canAssignToUser(Resource clusterResource, String userName, Resource limit, FiCaSchedulerApp application, - boolean checkReservations, String nodePartition) { + String nodePartition, ResourceLimits currentResoureLimits) { User user = getUser(userName); // Note: We aren't considering the current request since there is a fixed @@ -1180,13 +1180,13 @@ protected synchronized boolean canAssignToUser(Resource clusterResource, limit)) { // if enabled, check to see if could we potentially use this node instead // of a reserved node if the application has reserved containers - if (this.reservationsContinueLooking && checkReservations - && nodePartition.equals(CommonNodeLabelsManager.NO_LABEL)) { + if (this.reservationsContinueLooking && + nodePartition.equals(CommonNodeLabelsManager.NO_LABEL)) { if (Resources.lessThanOrEqual( resourceCalculator, clusterResource, - Resources.subtract(user.getUsed(), - application.getCurrentReservation()), limit)) { + Resources.subtract(user.getUsed(),application.getCurrentReservation()), + limit)) { if (LOG.isDebugEnabled()) { LOG.debug("User " + userName + " in queue " + getQueueName() @@ -1194,6 +1194,13 @@ protected synchronized boolean canAssignToUser(Resource clusterResource, + user.getUsed() + " reserved: " + application.getCurrentReservation() + " limit: " + limit); } + Resource amountNeededToUnreserve = Resources.subtract(user.getUsed(nodePartition), limit); + // we can only acquire a new container if we unreserve first since we ignored the + // user limit. Choose the max of user limit or what was previously set by max + // capacity. + currentResoureLimits.setAmountNeededUnreserve( + Resources.max(resourceCalculator, clusterResource, + currentResoureLimits.getAmountNeededUnreserve(), amountNeededToUnreserve)); return true; } } @@ -1240,7 +1247,8 @@ resourceCalculator, required, getMaximumAllocation() private CSAssignment assignContainersOnNode(Resource clusterResource, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - RMContainer reservedContainer, SchedulingMode schedulingMode) { + RMContainer reservedContainer, SchedulingMode schedulingMode, + ResourceLimits currentResoureLimits) { CSAssignment assigned; @@ -1254,7 +1262,7 @@ private CSAssignment assignContainersOnNode(Resource clusterResource, assigned = assignNodeLocalContainers(clusterResource, nodeLocalResourceRequest, node, application, priority, reservedContainer, - allocatedContainer, schedulingMode); + allocatedContainer, schedulingMode, currentResoureLimits); if (Resources.greaterThan(resourceCalculator, clusterResource, assigned.getResource(), Resources.none())) { @@ -1283,7 +1291,7 @@ private CSAssignment assignContainersOnNode(Resource clusterResource, assigned = assignRackLocalContainers(clusterResource, rackLocalResourceRequest, node, application, priority, reservedContainer, - allocatedContainer, schedulingMode); + allocatedContainer, schedulingMode, currentResoureLimits); if (Resources.greaterThan(resourceCalculator, clusterResource, assigned.getResource(), Resources.none())) { @@ -1312,7 +1320,7 @@ private CSAssignment assignContainersOnNode(Resource clusterResource, assigned = assignOffSwitchContainers(clusterResource, offSwitchResourceRequest, node, application, priority, reservedContainer, - allocatedContainer, schedulingMode); + allocatedContainer, schedulingMode, currentResoureLimits); // update locality statistics if (allocatedContainer.getValue() != null) { @@ -1324,19 +1332,11 @@ private CSAssignment assignContainersOnNode(Resource clusterResource, return SKIP_ASSIGNMENT; } - - private Resource getMinimumResourceNeedUnreserved(Resource askedResource) { - // First we need to get minimum resource we need unreserve - // minimum-resource-need-unreserve = used + asked - limit - return Resources.subtract( - Resources.add(queueUsage.getUsed(), askedResource), - currentResourceLimits.getLimit()); - } @Private protected boolean findNodeToUnreserve(Resource clusterResource, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, - Resource askedResource, Resource minimumUnreservedResource) { + Resource minimumUnreservedResource) { // need to unreserve some other container first NodeId idToUnreserve = application.getNodeIdToUnreserve(priority, minimumUnreservedResource, @@ -1357,7 +1357,7 @@ protected boolean findNodeToUnreserve(Resource clusterResource, LOG.debug("unreserving for app: " + application.getApplicationId() + " on nodeId: " + idToUnreserve + " in order to replace reserved application and place it on node: " - + node.getNodeID() + " needing: " + askedResource); + + node.getNodeID() + " needing: " + minimumUnreservedResource); } // headroom @@ -1376,47 +1376,16 @@ protected boolean findNodeToUnreserve(Resource clusterResource, return true; } - @Private - protected boolean checkLimitsToReserve(Resource clusterResource, - FiCaSchedulerApp application, Resource capability, String nodePartition, - SchedulingMode schedulingMode) { - // we can't reserve if we got here based on the limit - // checks assuming we could unreserve!!! - Resource userLimit = computeUserLimitAndSetHeadroom(application, - clusterResource, capability, nodePartition, schedulingMode); - - // Check queue max-capacity limit, - // TODO: Consider reservation on labels - if (!canAssignToThisQueue(clusterResource, RMNodeLabelsManager.NO_LABEL, - this.currentResourceLimits, capability, Resources.none(), schedulingMode)) { - if (LOG.isDebugEnabled()) { - LOG.debug("was going to reserve but hit queue limit"); - } - return false; - } - - // Check user limit - if (!canAssignToUser(clusterResource, application.getUser(), userLimit, - application, false, nodePartition)) { - if (LOG.isDebugEnabled()) { - LOG.debug("was going to reserve but hit user limit"); - } - return false; - } - return true; - } - - private CSAssignment assignNodeLocalContainers(Resource clusterResource, ResourceRequest nodeLocalResourceRequest, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, RMContainer reservedContainer, MutableObject allocatedContainer, - SchedulingMode schedulingMode) { + SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { if (canAssign(application, priority, node, NodeType.NODE_LOCAL, reservedContainer)) { return assignContainer(clusterResource, node, application, priority, nodeLocalResourceRequest, NodeType.NODE_LOCAL, reservedContainer, - allocatedContainer, schedulingMode); + allocatedContainer, schedulingMode, currentResoureLimits); } return new CSAssignment(Resources.none(), NodeType.NODE_LOCAL); @@ -1426,12 +1395,12 @@ private CSAssignment assignRackLocalContainers(Resource clusterResource, ResourceRequest rackLocalResourceRequest, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, RMContainer reservedContainer, MutableObject allocatedContainer, - SchedulingMode schedulingMode) { + SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { if (canAssign(application, priority, node, NodeType.RACK_LOCAL, reservedContainer)) { return assignContainer(clusterResource, node, application, priority, rackLocalResourceRequest, NodeType.RACK_LOCAL, reservedContainer, - allocatedContainer, schedulingMode); + allocatedContainer, schedulingMode, currentResoureLimits); } return new CSAssignment(Resources.none(), NodeType.RACK_LOCAL); @@ -1441,12 +1410,12 @@ private CSAssignment assignOffSwitchContainers(Resource clusterResource, ResourceRequest offSwitchResourceRequest, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, RMContainer reservedContainer, MutableObject allocatedContainer, - SchedulingMode schedulingMode) { + SchedulingMode schedulingMode, ResourceLimits currentResoureLimits) { if (canAssign(application, priority, node, NodeType.OFF_SWITCH, reservedContainer)) { return assignContainer(clusterResource, node, application, priority, offSwitchResourceRequest, NodeType.OFF_SWITCH, reservedContainer, - allocatedContainer, schedulingMode); + allocatedContainer, schedulingMode, currentResoureLimits); } return new CSAssignment(Resources.none(), NodeType.OFF_SWITCH); @@ -1529,7 +1498,8 @@ Container createContainer(FiCaSchedulerApp application, FiCaSchedulerNode node, private CSAssignment assignContainer(Resource clusterResource, FiCaSchedulerNode node, FiCaSchedulerApp application, Priority priority, ResourceRequest request, NodeType type, RMContainer rmContainer, - MutableObject createdContainer, SchedulingMode schedulingMode) { + MutableObject createdContainer, SchedulingMode schedulingMode, + ResourceLimits currentResoureLimits) { if (LOG.isDebugEnabled()) { LOG.debug("assignContainers: node=" + node.getNodeName() + " application=" + application.getApplicationId() @@ -1573,13 +1543,17 @@ private CSAssignment assignContainer(Resource clusterResource, FiCaSchedulerNode LOG.warn("Couldn't get container for allocation!"); return new CSAssignment(Resources.none(), type); } - + boolean shouldAllocOrReserveNewContainer = shouldAllocOrReserveNewContainer( application, priority, capability); // Can we allocate a container on this node? int availableContainers = resourceCalculator.computeAvailableContainers(available, capability); + + boolean needToUnreserve = Resources.greaterThan(resourceCalculator,clusterResource, + currentResoureLimits.getAmountNeededUnreserve(), Resources.none()); + if (availableContainers > 0) { // Allocate... @@ -1588,20 +1562,24 @@ private CSAssignment assignContainer(Resource clusterResource, FiCaSchedulerNode unreserve(application, priority, node, rmContainer); } else if (this.reservationsContinueLooking && node.getLabels().isEmpty()) { // when reservationsContinueLooking is set, we may need to unreserve - // some containers to meet this queue and its parents' resource limits + // some containers to meet this queue, its parents', or the users' resource limits. // TODO, need change here when we want to support continuous reservation // looking for labeled partitions. - Resource minimumUnreservedResource = - getMinimumResourceNeedUnreserved(capability); - if (!shouldAllocOrReserveNewContainer - || Resources.greaterThan(resourceCalculator, clusterResource, - minimumUnreservedResource, Resources.none())) { + if (!shouldAllocOrReserveNewContainer || needToUnreserve) { + // If we shouldn't allocate/reserve new container then we should unreserve one the same + // size we are asking for since the currentResoureLimits.getAmountNeededUnreserve + // could be zero. If the limit was hit then use the amount we need to unreserve to be + // under the limit. + Resource amountToUnreserve = capability; + if (needToUnreserve) { + amountToUnreserve = currentResoureLimits.getAmountNeededUnreserve(); + } boolean containerUnreserved = findNodeToUnreserve(clusterResource, node, application, priority, - capability, minimumUnreservedResource); + amountToUnreserve); // When (minimum-unreserved-resource > 0 OR we cannot allocate new/reserved // container (That means we *have to* unreserve some resource to - // continue)). If we failed to unreserve some resource, + // continue)). If we failed to unreserve some resource, we can't continue. if (!containerUnreserved) { return new CSAssignment(Resources.none(), type); } @@ -1642,13 +1620,13 @@ private CSAssignment assignContainer(Resource clusterResource, FiCaSchedulerNode if (shouldAllocOrReserveNewContainer || rmContainer != null) { if (reservationsContinueLooking && rmContainer == null) { - // we could possibly ignoring parent queue capacity limits when - // reservationsContinueLooking is set. - // If we're trying to reserve a container here, not container will be - // unreserved for reserving the new one. Check limits again before - // reserve the new container - if (!checkLimitsToReserve(clusterResource, - application, capability, node.getPartition(), schedulingMode)) { + // we could possibly ignoring queue capacity or user limits when + // reservationsContinueLooking is set. Make sure we didn't need to unreserve + // one. + if (needToUnreserve) { + if (LOG.isDebugEnabled()) { + LOG.debug("we needed to unreserve to be able to allocate"); + } return new CSAssignment(Resources.none(), type); } } @@ -1811,14 +1789,14 @@ private void updateCurrentResourceLimits( // Even if ParentQueue will set limits respect child's max queue capacity, // but when allocating reserved container, CapacityScheduler doesn't do // this. So need cap limits by queue's max capacity here. - this.currentResourceLimits = currentResourceLimits; + this.cachedResourceLimitsForHeadroom = new ResourceLimits(currentResourceLimits.getLimit()); Resource queueMaxResource = Resources.multiplyAndNormalizeDown(resourceCalculator, labelManager .getResourceByLabel(RMNodeLabelsManager.NO_LABEL, clusterResource), queueCapacities .getAbsoluteMaximumCapacity(RMNodeLabelsManager.NO_LABEL), minimumAllocation); - this.currentResourceLimits.setLimit(Resources.min(resourceCalculator, + this.cachedResourceLimitsForHeadroom.setLimit(Resources.min(resourceCalculator, clusterResource, queueMaxResource, currentResourceLimits.getLimit())); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java index fc546ee30c..44845cfd35 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestReservations.java @@ -748,14 +748,14 @@ public void testFindNodeToUnreserve() throws Exception { // nothing reserved boolean res = a.findNodeToUnreserve(csContext.getClusterResource(), - node_1, app_0, priorityMap, capability, capability); + node_1, app_0, priorityMap, capability); assertFalse(res); // reserved but scheduler doesn't know about that node. app_0.reserve(node_1, priorityMap, rmContainer, container); node_1.reserveResource(app_0, priorityMap, rmContainer); res = a.findNodeToUnreserve(csContext.getClusterResource(), node_1, app_0, - priorityMap, capability, capability); + priorityMap, capability); assertFalse(res); } @@ -858,12 +858,13 @@ public void testAssignToQueue() throws Exception { // allocate to queue so that the potential new capacity is greater then // absoluteMaxCapacity Resource capability = Resources.createResource(32 * GB, 0); + ResourceLimits limits = new ResourceLimits(clusterResource); boolean res = a.canAssignToThisQueue(clusterResource, - RMNodeLabelsManager.NO_LABEL, new ResourceLimits( - clusterResource), capability, Resources.none(), + RMNodeLabelsManager.NO_LABEL, limits, capability, Resources.none(), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertFalse(res); + assertEquals(limits.getAmountNeededUnreserve(), Resources.none()); // now add in reservations and make sure it continues if config set // allocate to queue so that the potential new capacity is greater then @@ -880,38 +881,43 @@ RMNodeLabelsManager.NO_LABEL, new ResourceLimits( assertEquals(3 * GB, node_1.getUsedResource().getMemory()); capability = Resources.createResource(5 * GB, 0); + limits = new ResourceLimits(clusterResource); res = a.canAssignToThisQueue(clusterResource, - RMNodeLabelsManager.NO_LABEL, new ResourceLimits( - clusterResource), capability, Resources.createResource(5 * GB), + RMNodeLabelsManager.NO_LABEL, limits, capability, Resources.createResource(5 * GB), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertTrue(res); + // 16GB total, 13GB consumed (8 allocated, 5 reserved). asking for 5GB so we would have to + // unreserve 2GB to get the total 5GB needed. + // also note vcore checks not enabled + assertEquals(Resources.createResource(2 * GB, 3), limits.getAmountNeededUnreserve()); // tell to not check reservations + limits = new ResourceLimits(clusterResource); res = a.canAssignToThisQueue(clusterResource, - RMNodeLabelsManager.NO_LABEL, new ResourceLimits( - clusterResource), capability, Resources.none(), + RMNodeLabelsManager.NO_LABEL,limits, capability, Resources.none(), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertFalse(res); + assertEquals(Resources.none(), limits.getAmountNeededUnreserve()); refreshQueuesTurnOffReservationsContLook(a, csConf); - // should return false no matter what checkReservations is passed - // in since feature is off + // should return false since reservations continue look is off. + limits = new ResourceLimits(clusterResource); res = a.canAssignToThisQueue(clusterResource, - RMNodeLabelsManager.NO_LABEL, new ResourceLimits( - clusterResource), capability, Resources.none(), + RMNodeLabelsManager.NO_LABEL, limits, capability, Resources.none(), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertFalse(res); - + assertEquals(limits.getAmountNeededUnreserve(), Resources.none()); + limits = new ResourceLimits(clusterResource); res = a.canAssignToThisQueue(clusterResource, - RMNodeLabelsManager.NO_LABEL, new ResourceLimits( - clusterResource), capability, Resources.createResource(5 * GB), + RMNodeLabelsManager.NO_LABEL, limits, capability, Resources.createResource(5 * GB), SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY); assertFalse(res); + assertEquals(Resources.none(), limits.getAmountNeededUnreserve()); } public void refreshQueuesTurnOffReservationsContLook(LeafQueue a, @@ -1059,22 +1065,33 @@ public void testAssignToUser() throws Exception { assertEquals(5 * GB, node_0.getUsedResource().getMemory()); assertEquals(3 * GB, node_1.getUsedResource().getMemory()); - // set limit so subtrace reservations it can continue - Resource limit = Resources.createResource(12 * GB, 0); - boolean res = a.canAssignToUser(clusterResource, user_0, limit, app_0, - true, ""); + // not over the limit + Resource limit = Resources.createResource(14 * GB, 0); + ResourceLimits userResourceLimits = new ResourceLimits(clusterResource); + boolean res = a.canAssignToUser(clusterResource, user_0, limit, app_0, "", userResourceLimits); assertTrue(res); + assertEquals(Resources.none(), userResourceLimits.getAmountNeededUnreserve()); - // tell it not to check for reservations and should fail as already over - // limit - res = a.canAssignToUser(clusterResource, user_0, limit, app_0, false, ""); - assertFalse(res); + + // set limit so it subtracts reservations and it can continue + limit = Resources.createResource(12 * GB, 0); + userResourceLimits = new ResourceLimits(clusterResource); + res = a.canAssignToUser(clusterResource, user_0, limit, app_0, + "", userResourceLimits); + assertTrue(res); + // limit set to 12GB, we are using 13GB (8 allocated, 5 reserved), to get under limit + // we need to unreserve 1GB + // also note vcore checks not enabled + assertEquals(Resources.createResource(1 * GB, 4), + userResourceLimits.getAmountNeededUnreserve()); refreshQueuesTurnOffReservationsContLook(a, csConf); + userResourceLimits = new ResourceLimits(clusterResource); // should now return false since feature off - res = a.canAssignToUser(clusterResource, user_0, limit, app_0, true, ""); + res = a.canAssignToUser(clusterResource, user_0, limit, app_0, "", userResourceLimits); assertFalse(res); + assertEquals(Resources.none(), userResourceLimits.getAmountNeededUnreserve()); } @Test