From 025f97c8c2d6ecf4d87b084f17ab0b8966fa2875 Mon Sep 17 00:00:00 2001 From: Szilard Nemeth <954799+szilard-nemeth@users.noreply.github.com> Date: Tue, 19 Oct 2021 12:24:58 +0200 Subject: [PATCH] YARN-10942. Move AbstractCSQueue fields to separate objects that are tracking usage. Contributed by Szilard Nemeth --- .../scheduler/capacity/AbstractCSQueue.java | 95 ++++------ .../capacity/CSQueueUsageTracker.java | 78 ++++++++ .../scheduler/capacity/LeafQueue.java | 71 ++++---- .../scheduler/capacity/ParentQueue.java | 40 ++-- .../TestAbsoluteResourceConfiguration.java | 172 +++++++++--------- 5 files changed, 258 insertions(+), 198 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUsageTracker.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 5c520b783b..e25c6941a3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -87,12 +87,10 @@ public abstract class AbstractCSQueue implements CSQueue { volatile CSQueue parent; final String queueName; private final String queuePath; - volatile int numContainers; final Resource minimumAllocation; volatile Resource maximumAllocation; private volatile QueueState state = null; - final CSQueueMetrics metrics; protected final PrivilegedEntity queueEntity; final ResourceCalculator resourceCalculator; @@ -107,16 +105,11 @@ public abstract class AbstractCSQueue implements CSQueue { new HashMap(); volatile boolean reservationsContinueLooking; - // Track resource usage-by-label like used-resource/pending-resource, etc. - volatile ResourceUsage queueUsage; - // Track capacities like // used-capacity/abs-used-capacity/capacity/abs-capacity, // etc. QueueCapacities queueCapacities; - QueueResourceQuotas queueResourceQuotas; - // -1 indicates lifetime is disabled private volatile long maxApplicationLifetime = -1; @@ -127,6 +120,8 @@ public abstract class AbstractCSQueue implements CSQueue { private volatile boolean defaultAppLifetimeWasSpecifiedInConfig = false; private CSQueuePreemption preemptionSettings; + CSQueueUsageTracker usageTracker; + public enum CapacityConfigType { // FIXME, from what I can see, Percentage mode can almost apply to weighted // and percentage mode at the same time, there's only small area need to be @@ -153,10 +148,6 @@ public enum CapacityConfigType { // is it a dynamic queue? private boolean dynamicQueue = false; - // The timestamp of the last submitted application to this queue. - // Only applies to dynamic queues. - private long lastSubmittedTimestamp; - public AbstractCSQueue(CapacitySchedulerContext cs, String queueName, CSQueue parent, CSQueue old) throws IOException { this(cs, cs.getConfiguration(), queueName, parent, old); @@ -175,24 +166,15 @@ public AbstractCSQueue(CapacitySchedulerContext cs, this.activitiesManager = cs.getActivitiesManager(); // must be called after parent and queueName is set - this.metrics = old != null ? + CSQueueMetrics metrics = old != null ? (CSQueueMetrics) old.getMetrics() : CSQueueMetrics.forQueue(getQueuePath(), parent, cs.getConfiguration().getEnableUserMetrics(), configuration); - + usageTracker = new CSQueueUsageTracker(metrics); this.csContext = cs; this.minimumAllocation = csContext.getMinimumResourceCapability(); - - // initialize ResourceUsage - queueUsage = new ResourceUsage(); queueEntity = new PrivilegedEntity(EntityType.QUEUE, getQueuePath()); - - // initialize QueueCapacities queueCapacities = new QueueCapacities(parent == null); - - // initialize queueResourceQuotas - queueResourceQuotas = new QueueResourceQuotas(); - ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); readLock = lock.readLock(); writeLock = lock.writeLock(); @@ -246,11 +228,11 @@ public float getUsedCapacity() { @Override public Resource getUsedResources() { - return queueUsage.getUsed(); + return usageTracker.getQueueUsage().getUsed(); } public int getNumContainers() { - return numContainers; + return usageTracker.getNumContainers(); } @Override @@ -260,7 +242,7 @@ public QueueState getState() { @Override public CSQueueMetrics getMetrics() { - return metrics; + return usageTracker.getMetrics(); } @Override @@ -650,8 +632,8 @@ protected void updateConfigurableResourceLimits(Resource clusterResource) { + " minResource={} and maxResource={}", getQueuePath(), minResource, maxResource); - queueResourceQuotas.setConfiguredMinResource(label, minResource); - queueResourceQuotas.setConfiguredMaxResource(label, maxResource); + usageTracker.getQueueResourceQuotas().setConfiguredMinResource(label, minResource); + usageTracker.getQueueResourceQuotas().setConfiguredMaxResource(label, maxResource); } } @@ -815,6 +797,7 @@ public QueueStatistics getQueueStatistics() { public Map getQueueConfigurations() { Map queueConfigurations = new HashMap<>(); Set nodeLabels = getNodeLabelsForQueue(); + QueueResourceQuotas queueResourceQuotas = usageTracker.getQueueResourceQuotas(); for (String nodeLabel : nodeLabels) { QueueConfigurations queueConfiguration = recordFactory.newRecordInstance(QueueConfigurations.class); @@ -857,10 +840,8 @@ void allocateResource(Resource clusterResource, Resource resource, String nodePartition) { writeLock.lock(); try { - queueUsage.incUsed(nodePartition, resource); - - ++numContainers; - + usageTracker.getQueueUsage().incUsed(nodePartition, resource); + usageTracker.increaseNumContainers(); CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, this, labelManager, nodePartition); } finally { @@ -872,12 +853,12 @@ protected void releaseResource(Resource clusterResource, Resource resource, String nodePartition) { writeLock.lock(); try { - queueUsage.decUsed(nodePartition, resource); + usageTracker.getQueueUsage().decUsed(nodePartition, resource); CSQueueUtils.updateQueueStatistics(resourceCalculator, clusterResource, this, labelManager, nodePartition); - --numContainers; + usageTracker.decreaseNumContainers(); } finally { writeLock.unlock(); } @@ -921,12 +902,12 @@ public QueueCapacities getQueueCapacities() { @Private public ResourceUsage getQueueResourceUsage() { - return queueUsage; + return usageTracker.getQueueUsage(); } @Override public QueueResourceQuotas getQueueResourceQuotas() { - return queueResourceQuotas; + return usageTracker.getQueueResourceQuotas(); } @Override @@ -1056,7 +1037,7 @@ boolean canAssignToThisQueue(Resource clusterResource, Resource currentLimitResource = getCurrentLimitResource(nodePartition, clusterResource, currentResourceLimits, schedulingMode); - Resource nowTotalUsed = queueUsage.getUsed(nodePartition); + Resource nowTotalUsed = usageTracker.getQueueUsage().getUsed(nodePartition); // Set headroom for currentResourceLimits: // When queue is a parent queue: Headroom = limit - used + killable @@ -1088,7 +1069,7 @@ boolean canAssignToThisQueue(Resource clusterResource, newTotalWithoutReservedResource, currentLimitResource)) { if (LOG.isDebugEnabled()) { LOG.debug("try to use reserved: " + getQueuePath() - + " usedResources: " + queueUsage.getUsed() + + " usedResources: " + usageTracker.getQueueUsage().getUsed() + ", clusterResources: " + clusterResource + ", reservedResources: " + resourceCouldBeUnreserved + ", capacity-without-reserved: " @@ -1103,7 +1084,7 @@ boolean canAssignToThisQueue(Resource clusterResource, if (LOG.isDebugEnabled()) { LOG.debug("Failed to assign to queue: " + getQueuePath() + " nodePartition: " + nodePartition - + ", usedResources: " + queueUsage.getUsed(nodePartition) + + ", usedResources: " + usageTracker.getQueueUsage().getUsed(nodePartition) + ", clusterResources: " + clusterResource + ", reservedResources: " + resourceCouldBeUnreserved + ", maxLimitCapacity: " + currentLimitResource @@ -1114,11 +1095,11 @@ boolean canAssignToThisQueue(Resource clusterResource, if (LOG.isDebugEnabled()) { LOG.debug("Check assign to queue: " + getQueuePath() + " nodePartition: " + nodePartition - + ", usedResources: " + queueUsage.getUsed(nodePartition) + + ", usedResources: " + usageTracker.getQueueUsage().getUsed(nodePartition) + ", clusterResources: " + clusterResource + ", currentUsedCapacity: " + Resources .divide(resourceCalculator, clusterResource, - queueUsage.getUsed(nodePartition), labelManager + usageTracker.getQueueUsage().getUsed(nodePartition), labelManager .getResourceByLabel(nodePartition, clusterResource)) + ", max-capacity: " + queueCapacities .getAbsoluteMaximumCapacity(nodePartition)); @@ -1162,39 +1143,39 @@ private void countAndUpdate(String partition, Resource resource, @Override public void incReservedResource(String partition, Resource reservedRes) { - count(partition, reservedRes, queueUsage::incReserved, + count(partition, reservedRes, usageTracker.getQueueUsage()::incReserved, parent == null ? null : parent::incReservedResource); } @Override public void decReservedResource(String partition, Resource reservedRes) { - count(partition, reservedRes, queueUsage::decReserved, + count(partition, reservedRes, usageTracker.getQueueUsage()::decReserved, parent == null ? null : parent::decReservedResource); } @Override public void incPendingResource(String nodeLabel, Resource resourceToInc) { - count(nodeLabel, resourceToInc, queueUsage::incPending, + count(nodeLabel, resourceToInc, usageTracker.getQueueUsage()::incPending, parent == null ? null : parent::incPendingResource); } @Override public void decPendingResource(String nodeLabel, Resource resourceToDec) { - count(nodeLabel, resourceToDec, queueUsage::decPending, + count(nodeLabel, resourceToDec, usageTracker.getQueueUsage()::decPending, parent == null ? null : parent::decPendingResource); } @Override public void incUsedResource(String nodeLabel, Resource resourceToInc, SchedulerApplicationAttempt application) { - countAndUpdate(nodeLabel, resourceToInc, queueUsage::incUsed, + countAndUpdate(nodeLabel, resourceToInc, usageTracker.getQueueUsage()::incUsed, parent == null ? null : parent::incUsedResource); } @Override public void decUsedResource(String nodeLabel, Resource resourceToDec, SchedulerApplicationAttempt application) { - countAndUpdate(nodeLabel, resourceToDec, queueUsage::decUsed, + countAndUpdate(nodeLabel, resourceToDec, usageTracker.getQueueUsage()::decUsed, parent == null ? null : parent::decUsedResource); } @@ -1205,7 +1186,7 @@ public void decUsedResource(String nodeLabel, Resource resourceToDec, boolean hasPendingResourceRequest(String nodePartition, Resource cluster, SchedulingMode schedulingMode) { return SchedulerUtils.hasPendingResourceRequest(resourceCalculator, - queueUsage, nodePartition, cluster, schedulingMode); + usageTracker.getQueueUsage(), nodePartition, cluster, schedulingMode); } public boolean accessibleToPartition(String nodePartition) { @@ -1304,10 +1285,10 @@ public boolean accept(Resource cluster, schedulerContainer.getNodePartition(), cluster); } if (!Resources.fitsIn(resourceCalculator, - Resources.add(queueUsage.getUsed(partition), netAllocated), + Resources.add(usageTracker.getQueueUsage().getUsed(partition), netAllocated), maxResourceLimit)) { if (LOG.isDebugEnabled()) { - LOG.debug("Used resource=" + queueUsage.getUsed(partition) + LOG.debug("Used resource=" + usageTracker.getQueueUsage().getUsed(partition) + " exceeded maxResourceLimit of the queue =" + maxResourceLimit); } @@ -1534,7 +1515,7 @@ void deriveCapacityFromAbsoluteConfigurations(String label, // and the recently changed queue minResources. // capacity = effectiveMinResource / {parent's effectiveMinResource} float result = resourceCalculator.divide(clusterResource, - queueResourceQuotas.getEffectiveMinResource(label), + usageTracker.getQueueResourceQuotas().getEffectiveMinResource(label), parent.getQueueResourceQuotas().getEffectiveMinResource(label)); queueCapacities.setCapacity(label, Float.isInfinite(result) ? 0 : result); @@ -1543,7 +1524,7 @@ void deriveCapacityFromAbsoluteConfigurations(String label, // and the recently changed queue maxResources. // maxCapacity = effectiveMaxResource / parent's effectiveMaxResource result = resourceCalculator.divide(clusterResource, - queueResourceQuotas.getEffectiveMaxResource(label), + usageTracker.getQueueResourceQuotas().getEffectiveMaxResource(label), parent.getQueueResourceQuotas().getEffectiveMaxResource(label)); queueCapacities.setMaximumCapacity(label, Float.isInfinite(result) ? 0 : result); @@ -1577,7 +1558,7 @@ void updateEffectiveResources(Resource clusterResource) { if (getCapacityConfigType().equals( CapacityConfigType.ABSOLUTE_RESOURCE)) { newEffectiveMinResource = createNormalizedMinResource( - queueResourceQuotas.getConfiguredMinResource(label), + usageTracker.getQueueResourceQuotas().getConfiguredMinResource(label), ((ParentQueue) parent).getEffectiveMinRatioPerResource()); // Max resource of a queue should be the minimum of {parent's maxResources, @@ -1597,9 +1578,9 @@ void updateEffectiveResources(Resource clusterResource) { } // Update the effective min - queueResourceQuotas.setEffectiveMinResource(label, + usageTracker.getQueueResourceQuotas().setEffectiveMinResource(label, newEffectiveMinResource); - queueResourceQuotas.setEffectiveMaxResource(label, + usageTracker.getQueueResourceQuotas().setEffectiveMaxResource(label, newEffectiveMaxResource); if (LOG.isDebugEnabled()) { @@ -1667,7 +1648,7 @@ public boolean isInactiveDynamicQueue() { public void updateLastSubmittedTimeStamp() { writeLock.lock(); try { - this.lastSubmittedTimestamp = Time.monotonicNow(); + usageTracker.setLastSubmittedTimestamp(Time.monotonicNow()); } finally { writeLock.unlock(); } @@ -1677,7 +1658,7 @@ public long getLastSubmittedTimestamp() { readLock.lock(); try { - return lastSubmittedTimestamp; + return usageTracker.getLastSubmittedTimestamp(); } finally { readLock.unlock(); } @@ -1687,7 +1668,7 @@ public long getLastSubmittedTimestamp() { public void setLastSubmittedTimestamp(long lastSubmittedTimestamp) { writeLock.lock(); try { - this.lastSubmittedTimestamp = lastSubmittedTimestamp; + usageTracker.setLastSubmittedTimestamp(lastSubmittedTimestamp); } finally { writeLock.unlock(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUsageTracker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUsageTracker.java new file mode 100644 index 0000000000..0f18e944e9 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUsageTracker.java @@ -0,0 +1,78 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueResourceQuotas; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceUsage; + +public class CSQueueUsageTracker { + private final CSQueueMetrics metrics; + private int numContainers; + + /** + * The timestamp of the last submitted application to this queue. + * Only applies to dynamic queues. + */ + private long lastSubmittedTimestamp; + + /** + * Tracks resource usage by label like used-resource / pending-resource. + */ + private final ResourceUsage queueUsage; + + private final QueueResourceQuotas queueResourceQuotas; + + public CSQueueUsageTracker(CSQueueMetrics metrics) { + this.metrics = metrics; + this.queueUsage = new ResourceUsage(); + this.queueResourceQuotas = new QueueResourceQuotas(); + } + + public int getNumContainers() { + return numContainers; + } + + public synchronized void increaseNumContainers() { + numContainers++; + } + + public synchronized void decreaseNumContainers() { + numContainers--; + } + + public CSQueueMetrics getMetrics() { + return metrics; + } + + public long getLastSubmittedTimestamp() { + return lastSubmittedTimestamp; + } + + public void setLastSubmittedTimestamp(long lastSubmittedTimestamp) { + this.lastSubmittedTimestamp = lastSubmittedTimestamp; + } + + public ResourceUsage getQueueUsage() { + return queueUsage; + } + + public QueueResourceQuotas getQueueResourceQuotas() { + return queueResourceQuotas; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index cc282ef59a..5a3784dae6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -161,7 +161,7 @@ public LeafQueue(CapacitySchedulerContext cs, super(cs, configuration, queueName, parent, old); setDynamicQueue(isDynamic); - this.usersManager = new UsersManager(metrics, this, labelManager, csContext, + this.usersManager = new UsersManager(usageTracker.getMetrics(), this, labelManager, csContext, resourceCalculator); // One time initialization is enough since it is static ordering policy @@ -299,7 +299,7 @@ protected void setupQueueConfigs(Resource clusterResource, + " [= (float)(maximumAllocationMemory - minimumAllocationMemory) / " + "maximumAllocationMemory ]" + "\n" + "maximumAllocation = " + maximumAllocation + " [= configuredMaxAllocation ]" + "\n" - + "numContainers = " + numContainers + + "numContainers = " + usageTracker.getNumContainers() + " [= currentNumContainers ]" + "\n" + "state = " + getState() + " [= configuredState ]" + "\n" + "acls = " + aclsString + " [= configuredAcls ]" + "\n" @@ -483,7 +483,7 @@ public String toString() { try { return getQueuePath() + ": " + getCapacityOrWeightString() + ", " + "absoluteCapacity=" + queueCapacities.getAbsoluteCapacity() - + ", " + "usedResources=" + queueUsage.getUsed() + ", " + + ", " + "usedResources=" + usageTracker.getQueueUsage().getUsed() + ", " + "usedCapacity=" + getUsedCapacity() + ", " + "absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + ", " + "numApps=" + getNumApplications() + ", " + "numContainers=" + getNumContainers() + ", " @@ -600,7 +600,7 @@ public void submitApplicationAttempt(FiCaSchedulerApp application, if (!isMoveApp) { boolean unmanagedAM = application.getAppSchedulingInfo() != null && application.getAppSchedulingInfo().isUnmanagedAM(); - metrics.submitAppAttempt(userName, unmanagedAM); + usageTracker.getMetrics().submitAppAttempt(userName, unmanagedAM); } getParent().submitApplicationAttempt(application, userName); @@ -674,11 +674,11 @@ public void validateSubmitApplication(ApplicationId applicationId, } public Resource getAMResourceLimit() { - return queueUsage.getAMLimit(); + return usageTracker.getQueueUsage().getAMLimit(); } public Resource getAMResourceLimitPerPartition(String nodePartition) { - return queueUsage.getAMLimit(nodePartition); + return usageTracker.getQueueUsage().getAMLimit(nodePartition); } @VisibleForTesting @@ -751,7 +751,7 @@ public Resource getUserAMResourceLimitPerPartition( Resources.min(resourceCalculator, lastClusterResource, preWeighteduserAMLimit, Resources.clone(getAMResourceLimitPerPartition(nodePartition))); - queueUsage.setUserAMLimit(nodePartition, preWeighteduserAMLimit); + usageTracker.getQueueUsage().setUserAMLimit(nodePartition, preWeighteduserAMLimit); LOG.debug("Effective user AM limit for \"{}\":{}. Effective weighted" + " user AM limit: {}. User weight: {}", userName, @@ -803,8 +803,8 @@ public Resource calculateAndGetAMResourceLimitPerPartition( resourceCalculator, queuePartitionUsableResource, amResourcePercent, minimumAllocation); - metrics.setAMResouceLimit(nodePartition, amResouceLimit); - queueUsage.setAMLimit(nodePartition, amResouceLimit); + usageTracker.getMetrics().setAMResouceLimit(nodePartition, amResouceLimit); + usageTracker.getQueueUsage().setAMLimit(nodePartition, amResouceLimit); LOG.debug("Queue: {}, node label : {}, queue partition resource : {}," + " queue current limit : {}, queue partition usable resource : {}," + " amResourceLimit : {}", getQueuePath(), nodePartition, @@ -849,7 +849,7 @@ protected void activateApplications() { // Check am resource limit. Resource amIfStarted = Resources.add( application.getAMResource(partitionName), - queueUsage.getAMUsed(partitionName)); + usageTracker.getQueueUsage().getAMUsed(partitionName)); if (LOG.isDebugEnabled()) { LOG.debug("application " + application.getId() + " AMResource " @@ -863,7 +863,7 @@ protected void activateApplications() { if (!resourceCalculator.fitsIn(amIfStarted, amLimit)) { if (getNumActiveApplications() < 1 || (Resources.lessThanOrEqual( resourceCalculator, lastClusterResource, - queueUsage.getAMUsed(partitionName), Resources.none()))) { + usageTracker.getQueueUsage().getAMUsed(partitionName), Resources.none()))) { LOG.warn("maximum-am-resource-percent is insufficient to start a" + " single application in queue, it is likely set too low." + " skipping enforcement to allow at least one application" @@ -895,7 +895,7 @@ protected void activateApplications() { if (!resourceCalculator.fitsIn(userAmIfStarted, userAMLimit)) { if (getNumActiveApplications() < 1 || (Resources.lessThanOrEqual( resourceCalculator, lastClusterResource, - queueUsage.getAMUsed(partitionName), Resources.none()))) { + usageTracker.getQueueUsage().getAMUsed(partitionName), Resources.none()))) { LOG.warn("maximum-am-resource-percent is insufficient to start a" + " single application in queue for user, it is likely set too" + " low. skipping enforcement to allow at least one application" @@ -913,14 +913,14 @@ protected void activateApplications() { orderingPolicy.addSchedulableEntity(application); application.updateAMContainerDiagnostics(AMState.ACTIVATED, null); - queueUsage.incAMUsed(partitionName, + usageTracker.getQueueUsage().incAMUsed(partitionName, application.getAMResource(partitionName)); user.getResourceUsage().incAMUsed(partitionName, application.getAMResource(partitionName)); user.getResourceUsage().setAMLimit(partitionName, userAMLimit); - metrics.incAMUsed(partitionName, application.getUser(), + usageTracker.getMetrics().incAMUsed(partitionName, application.getUser(), application.getAMResource(partitionName)); - metrics.setAMResouceLimitForUser(partitionName, + usageTracker.getMetrics().setAMResouceLimitForUser(partitionName, application.getUser(), userAMLimit); fsApp.remove(); LOG.info("Application " + applicationId + " from user: " + application @@ -1021,11 +1021,11 @@ private void removeApplicationAttempt( if (!wasActive) { pendingOrderingPolicy.removeSchedulableEntity(application); } else{ - queueUsage.decAMUsed(partitionName, + usageTracker.getQueueUsage().decAMUsed(partitionName, application.getAMResource(partitionName)); user.getResourceUsage().decAMUsed(partitionName, application.getAMResource(partitionName)); - metrics.decAMUsed(partitionName, application.getUser(), + usageTracker.getMetrics().decAMUsed(partitionName, application.getUser(), application.getAMResource(partitionName)); } applicationAttemptMap.remove(application.getApplicationAttemptId()); @@ -1058,11 +1058,11 @@ private FiCaSchedulerApp getApplication( private void setPreemptionAllowed(ResourceLimits limits, String nodePartition) { // Set preemption-allowed: // For leaf queue, only under-utilized queue is allowed to preempt resources from other queues - if (!queueResourceQuotas.getEffectiveMinResource(nodePartition) + if (!usageTracker.getQueueResourceQuotas().getEffectiveMinResource(nodePartition) .equals(Resources.none())) { limits.setIsAllowPreemption(Resources.lessThan(resourceCalculator, - csContext.getClusterResource(), queueUsage.getUsed(nodePartition), - queueResourceQuotas.getEffectiveMinResource(nodePartition))); + csContext.getClusterResource(), usageTracker.getQueueUsage().getUsed(nodePartition), + usageTracker.getQueueResourceQuotas().getEffectiveMinResource(nodePartition))); return; } @@ -1514,7 +1514,7 @@ private Resource getHeadroom(User user, Resources.subtractNonNegative(userLimitResource, user.getUsed(partition)), Resources.subtractNonNegative(currentPartitionResourceLimit, - queueUsage.getUsed(partition))); + usageTracker.getQueueUsage().getUsed(partition))); // Normalize it before return headroom = Resources.roundDown(resourceCalculator, headroom, minimumAllocation); @@ -1560,7 +1560,7 @@ Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application, setQueueResourceLimitsInfo(clusterResource); Resource headroom = - metrics.getUserMetrics(user) == null ? Resources.none() : + usageTracker.getMetrics().getUserMetrics(user) == null ? Resources.none() : getHeadroom(queueUser, cachedResourceLimitsForHeadroom.getLimit(), clusterResource, userLimit, nodePartition); @@ -1577,7 +1577,7 @@ Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application, application.setHeadroomProvider(headroomProvider); - metrics.setAvailableResourcesToUser(nodePartition, user, headroom); + usageTracker.getMetrics().setAvailableResourcesToUser(nodePartition, user, headroom); return userLimit; } @@ -1844,20 +1844,20 @@ void allocateResource(Resource clusterResource, nodePartition, true); Resource partitionHeadroom = Resources.createResource(0, 0); - if (metrics.getUserMetrics(userName) != null) { + if (usageTracker.getMetrics().getUserMetrics(userName) != null) { partitionHeadroom = getHeadroom(user, cachedResourceLimitsForHeadroom.getLimit(), clusterResource, getResourceLimitForActiveUsers(userName, clusterResource, nodePartition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodePartition); } - metrics.setAvailableResourcesToUser(nodePartition, userName, + usageTracker.getMetrics().setAvailableResourcesToUser(nodePartition, userName, partitionHeadroom); if (LOG.isDebugEnabled()) { LOG.debug(getQueuePath() + " user=" + userName + " used=" - + queueUsage.getUsed(nodePartition) + " numContainers=" - + numContainers + " headroom = " + application.getHeadroom() + + usageTracker.getQueueUsage().getUsed(nodePartition) + " numContainers=" + + usageTracker.getNumContainers() + " headroom = " + application.getHeadroom() + " user-resources=" + user.getUsed()); } } finally { @@ -1892,20 +1892,20 @@ void releaseResource(Resource clusterResource, nodePartition, false); Resource partitionHeadroom = Resources.createResource(0, 0); - if (metrics.getUserMetrics(userName) != null) { + if (usageTracker.getMetrics().getUserMetrics(userName) != null) { partitionHeadroom = getHeadroom(user, cachedResourceLimitsForHeadroom.getLimit(), clusterResource, getResourceLimitForActiveUsers(userName, clusterResource, nodePartition, SchedulingMode.RESPECT_PARTITION_EXCLUSIVITY), nodePartition); } - metrics.setAvailableResourcesToUser(nodePartition, userName, + usageTracker.getMetrics().setAvailableResourcesToUser(nodePartition, userName, partitionHeadroom); if (LOG.isDebugEnabled()) { LOG.debug( - getQueuePath() + " used=" + queueUsage.getUsed() + " numContainers=" - + numContainers + " user=" + userName + " user-resources=" + getQueuePath() + " used=" + usageTracker.getQueueUsage().getUsed() + " numContainers=" + + usageTracker.getNumContainers() + " user=" + userName + " user-resources=" + user.getUsed()); } } finally { @@ -2001,7 +2001,7 @@ public void incAMUsedResource(String nodeLabel, Resource resourceToInc, getUser(application.getUser()).getResourceUsage().incAMUsed(nodeLabel, resourceToInc); // ResourceUsage has its own lock, no addition lock needs here. - queueUsage.incAMUsed(nodeLabel, resourceToInc); + usageTracker.getQueueUsage().incAMUsed(nodeLabel, resourceToInc); } public void decAMUsedResource(String nodeLabel, Resource resourceToDec, @@ -2009,7 +2009,7 @@ public void decAMUsedResource(String nodeLabel, Resource resourceToDec, getUser(application.getUser()).getResourceUsage().decAMUsed(nodeLabel, resourceToDec); // ResourceUsage has its own lock, no addition lock needs here. - queueUsage.decAMUsed(nodeLabel, resourceToDec); + usageTracker.getQueueUsage().decAMUsed(nodeLabel, resourceToDec); } @Override @@ -2159,7 +2159,7 @@ public void attachContainer(Resource clusterResource, + " resource=" + rmContainer.getContainer().getResource() + " queueMoveIn=" + this + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" - + queueUsage.getUsed() + " cluster=" + clusterResource); + + usageTracker.getQueueUsage().getUsed() + " cluster=" + clusterResource); // Inform the parent queue getParent().attachContainer(clusterResource, application, rmContainer); } @@ -2179,7 +2179,7 @@ public void detachContainer(Resource clusterResource, + " resource=" + rmContainer.getContainer().getResource() + " queueMoveOut=" + this + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" - + queueUsage.getUsed() + " cluster=" + clusterResource); + + usageTracker.getQueueUsage().getUsed() + " cluster=" + clusterResource); // Inform the parent queue getParent().detachContainer(clusterResource, application, rmContainer); } @@ -2347,6 +2347,7 @@ static class CachedUserLimit { private void updateQueuePreemptionMetrics(RMContainer rmc) { final long usedMillis = rmc.getFinishTime() - rmc.getCreationTime(); final long usedSeconds = usedMillis / DateUtils.MILLIS_PER_SECOND; + CSQueueMetrics metrics = usageTracker.getMetrics(); Resource containerResource = rmc.getAllocatedResource(); metrics.preemptContainer(); long mbSeconds = (containerResource.getMemorySize() * usedMillis) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java index 3a25a17d97..8ef5ba2b1e 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/ParentQueue.java @@ -328,7 +328,7 @@ void setChildQueues(Collection childQueues) throws IOException { Resource resourceByLabel = labelManager.getResourceByLabel(nodeLabel, scheduler.getClusterResource()); Resource parentMinResource = - queueResourceQuotas.getConfiguredMinResource(nodeLabel); + usageTracker.getQueueResourceQuotas().getConfiguredMinResource(nodeLabel); if (!parentMinResource.equals(Resources.none()) && Resources.lessThan( resourceCalculator, resourceByLabel, parentMinResource, minRes)) { throw new IOException( @@ -475,9 +475,9 @@ public String toString() { "numChildQueue= " + childQueues.size() + ", " + getCapacityOrWeightString() + ", " + "absoluteCapacity=" + queueCapacities.getAbsoluteCapacity() + ", " + - "usedResources=" + queueUsage.getUsed() + - "usedCapacity=" + getUsedCapacity() + ", " + - "numApps=" + getNumApplications() + ", " + + "usedResources=" + usageTracker.getQueueUsage().getUsed() + + "usedCapacity=" + getUsedCapacity() + ", " + + "numApps=" + getNumApplications() + ", " + "numContainers=" + getNumContainers(); } @@ -992,7 +992,7 @@ public CSAssignment assignContainers(Resource clusterResource, LOG.debug("assignedContainer reserved=" + isReserved + " queue=" + getQueuePath() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" + getAbsoluteUsedCapacity() + " used=" - + queueUsage.getUsed() + " cluster=" + clusterResource); + + usageTracker.getQueueUsage().getUsed() + " cluster=" + clusterResource); LOG.debug( "ParentQ=" + getQueuePath() + " assignedSoFarInThisIteration=" @@ -1065,7 +1065,7 @@ private ResourceLimits getResourceLimitsOfChild(CSQueue child, // First, cap parent limit by parent's max parentLimits.setLimit(Resources.min(resourceCalculator, clusterResource, parentLimits.getLimit(), - queueResourceQuotas.getEffectiveMaxResource(nodePartition))); + usageTracker.getQueueResourceQuotas().getEffectiveMaxResource(nodePartition))); // Parent available resource = parent-limit - parent-used-resource Resource limit = parentLimits.getLimit(); @@ -1073,7 +1073,7 @@ private ResourceLimits getResourceLimitsOfChild(CSQueue child, limit = parentLimits.getNetLimit(); } Resource parentMaxAvailableResource = Resources.subtract( - limit, queueUsage.getUsed(nodePartition)); + limit, usageTracker.getQueueUsage().getUsed(nodePartition)); // Deduct killable from used Resources.addTo(parentMaxAvailableResource, @@ -1332,9 +1332,9 @@ private void calculateEffectiveResourcesAndCapacity(String label, } } else { if (Resources.lessThan(resourceCalculator, clusterResource, - queueResourceQuotas.getEffectiveMinResource(label), + usageTracker.getQueueResourceQuotas().getEffectiveMinResource(label), configuredMinResources)) { - numeratorForMinRatio = queueResourceQuotas + numeratorForMinRatio = usageTracker.getQueueResourceQuotas() .getEffectiveMinResource(label); } } @@ -1344,8 +1344,8 @@ private void calculateEffectiveResourcesAndCapacity(String label, // Update effective resources for my self; if (rootQueue) { - queueResourceQuotas.setEffectiveMinResource(label, resourceByLabel); - queueResourceQuotas.setEffectiveMaxResource(label, resourceByLabel); + usageTracker.getQueueResourceQuotas().setEffectiveMinResource(label, resourceByLabel); + usageTracker.getQueueResourceQuotas().setEffectiveMaxResource(label, resourceByLabel); } else{ super.updateEffectiveResources(clusterResource); } @@ -1442,8 +1442,8 @@ public void attachContainer(Resource clusterResource, .getResource(), node.getPartition()); LOG.info("movedContainer" + " queueMoveIn=" + getQueuePath() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" - + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster=" - + clusterResource); + + getAbsoluteUsedCapacity() + " used=" + usageTracker.getQueueUsage().getUsed() + + " cluster=" + clusterResource); // Inform the parent if (parent != null) { parent.attachContainer(clusterResource, application, rmContainer); @@ -1462,8 +1462,8 @@ public void detachContainer(Resource clusterResource, node.getPartition()); LOG.info("movedContainer" + " queueMoveOut=" + getQueuePath() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" - + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + " cluster=" - + clusterResource); + + getAbsoluteUsedCapacity() + " used=" + usageTracker.getQueueUsage().getUsed() + + " cluster=" + clusterResource); // Inform the parent if (parent != null) { parent.detachContainer(clusterResource, application, rmContainer); @@ -1507,11 +1507,11 @@ void allocateResource(Resource clusterResource, * When this happens, we have to preempt killable container (on same or different * nodes) of parent queue to avoid violating parent's max resource. */ - if (!queueResourceQuotas.getEffectiveMaxResource(nodePartition) + if (!usageTracker.getQueueResourceQuotas().getEffectiveMaxResource(nodePartition) .equals(Resources.none())) { if (Resources.lessThan(resourceCalculator, clusterResource, - queueResourceQuotas.getEffectiveMaxResource(nodePartition), - queueUsage.getUsed(nodePartition))) { + usageTracker.getQueueResourceQuotas().getEffectiveMaxResource(nodePartition), + usageTracker.getQueueUsage().getUsed(nodePartition))) { killContainersToEnforceMaxQueueCapacity(nodePartition, clusterResource); } @@ -1541,7 +1541,7 @@ private void killContainersToEnforceMaxQueueCapacity(String partition, Resource maxResource = getEffectiveMaxCapacity(partition); while (Resources.greaterThan(resourceCalculator, partitionResource, - queueUsage.getUsed(partition), maxResource)) { + usageTracker.getQueueUsage().getUsed(partition), maxResource)) { RMContainer toKillContainer = killableContainerIter.next(); FiCaSchedulerApp attempt = csContext.getApplicationAttempt( toKillContainer.getContainerId().getApplicationAttemptId()); @@ -1584,7 +1584,7 @@ public void apply(Resource cluster, LOG.info("assignedContainer" + " queue=" + getQueuePath() + " usedCapacity=" + getUsedCapacity() + " absoluteUsedCapacity=" - + getAbsoluteUsedCapacity() + " used=" + queueUsage.getUsed() + + getAbsoluteUsedCapacity() + " used=" + usageTracker.getQueueUsage().getUsed() + " cluster=" + cluster); } finally { writeLock.unlock(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceConfiguration.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceConfiguration.java index 97f9a652c1..7114728fc6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceConfiguration.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestAbsoluteResourceConfiguration.java @@ -222,13 +222,13 @@ public void testSimpleMinMaxResourceConfigurartionPerQueue() new ResourceLimits(cs.getClusterResource())); Assert.assertEquals(QUEUE_D_TEMPL_MINRES, - d1.queueResourceQuotas.getConfiguredMinResource()); + d1.usageTracker.getQueueResourceQuotas().getConfiguredMinResource()); Assert.assertEquals(QUEUE_D_TEMPL_MINRES, - d1.queueResourceQuotas.getEffectiveMinResource()); + d1.usageTracker.getQueueResourceQuotas().getEffectiveMinResource()); Assert.assertEquals(QUEUE_D_TEMPL_MAXRES, - d1.queueResourceQuotas.getConfiguredMaxResource()); + d1.usageTracker.getQueueResourceQuotas().getConfiguredMaxResource()); Assert.assertEquals(QUEUE_D_TEMPL_MAXRES, - d1.queueResourceQuotas.getEffectiveMaxResource()); + d1.usageTracker.getQueueResourceQuotas().getEffectiveMaxResource()); /** * After adding child queue d2, d1 + d2 > resource @@ -243,22 +243,22 @@ public void testSimpleMinMaxResourceConfigurartionPerQueue() new ResourceLimits(cs.getClusterResource())); Assert.assertEquals(Resource.newInstance(0, 0), - d2.queueResourceQuotas.getConfiguredMinResource()); + d2.usageTracker.getQueueResourceQuotas().getConfiguredMinResource()); Assert.assertEquals(Resource.newInstance(0, 0), - d2.queueResourceQuotas.getEffectiveMinResource()); + d2.usageTracker.getQueueResourceQuotas().getEffectiveMinResource()); Assert.assertEquals(QUEUE_D_TEMPL_MAXRES, - d2.queueResourceQuotas.getConfiguredMaxResource()); + d2.usageTracker.getQueueResourceQuotas().getConfiguredMaxResource()); Assert.assertEquals(QUEUE_D_TEMPL_MAXRES, - d2.queueResourceQuotas.getEffectiveMaxResource()); + d2.usageTracker.getQueueResourceQuotas().getEffectiveMaxResource()); Assert.assertEquals(QUEUE_D_TEMPL_MINRES, - d1.queueResourceQuotas.getConfiguredMinResource()); + d1.usageTracker.getQueueResourceQuotas().getConfiguredMinResource()); Assert.assertEquals(QUEUE_D_TEMPL_MINRES, - d1.queueResourceQuotas.getEffectiveMinResource()); + d1.usageTracker.getQueueResourceQuotas().getEffectiveMinResource()); Assert.assertEquals(QUEUE_D_TEMPL_MAXRES, - d1.queueResourceQuotas.getConfiguredMaxResource()); + d1.usageTracker.getQueueResourceQuotas().getConfiguredMaxResource()); Assert.assertEquals(QUEUE_D_TEMPL_MAXRES, - d1.queueResourceQuotas.getEffectiveMaxResource()); + d1.usageTracker.getQueueResourceQuotas().getEffectiveMaxResource()); rm.close(); } @@ -287,35 +287,35 @@ public void testEffectiveMinMaxResourceConfigurartionPerQueue() LeafQueue qA = (LeafQueue) cs.getQueue(QUEUEA); Assert.assertNotNull(qA); Assert.assertEquals("Min resource configured for QUEUEA is not correct", - QUEUE_A_MINRES, qA.queueResourceQuotas.getConfiguredMinResource()); + QUEUE_A_MINRES, qA.usageTracker.getQueueResourceQuotas().getConfiguredMinResource()); Assert.assertEquals("Max resource configured for QUEUEA is not correct", - QUEUE_A_MAXRES, qA.queueResourceQuotas.getConfiguredMaxResource()); + QUEUE_A_MAXRES, qA.usageTracker.getQueueResourceQuotas().getConfiguredMaxResource()); Assert.assertEquals("Effective Min resource for QUEUEA is not correct", - QUEUE_A_MINRES, qA.queueResourceQuotas.getEffectiveMinResource()); + QUEUE_A_MINRES, qA.usageTracker.getQueueResourceQuotas().getEffectiveMinResource()); Assert.assertEquals("Effective Max resource for QUEUEA is not correct", - QUEUE_A_MAXRES, qA.queueResourceQuotas.getEffectiveMaxResource()); + QUEUE_A_MAXRES, qA.usageTracker.getQueueResourceQuotas().getEffectiveMaxResource()); LeafQueue qB = (LeafQueue) cs.getQueue(QUEUEB); Assert.assertNotNull(qB); Assert.assertEquals("Min resource configured for QUEUEB is not correct", - QUEUE_B_MINRES, qB.queueResourceQuotas.getConfiguredMinResource()); + QUEUE_B_MINRES, qB.usageTracker.getQueueResourceQuotas().getConfiguredMinResource()); Assert.assertEquals("Max resource configured for QUEUEB is not correct", - QUEUE_B_MAXRES, qB.queueResourceQuotas.getConfiguredMaxResource()); + QUEUE_B_MAXRES, qB.usageTracker.getQueueResourceQuotas().getConfiguredMaxResource()); Assert.assertEquals("Effective Min resource for QUEUEB is not correct", - QUEUE_B_MINRES, qB.queueResourceQuotas.getEffectiveMinResource()); + QUEUE_B_MINRES, qB.usageTracker.getQueueResourceQuotas().getEffectiveMinResource()); Assert.assertEquals("Effective Max resource for QUEUEB is not correct", - QUEUE_B_MAXRES, qB.queueResourceQuotas.getEffectiveMaxResource()); + QUEUE_B_MAXRES, qB.usageTracker.getQueueResourceQuotas().getEffectiveMaxResource()); LeafQueue qC = (LeafQueue) cs.getQueue(QUEUEC); Assert.assertNotNull(qC); Assert.assertEquals("Min resource configured for QUEUEC is not correct", - QUEUE_C_MINRES, qC.queueResourceQuotas.getConfiguredMinResource()); + QUEUE_C_MINRES, qC.usageTracker.getQueueResourceQuotas().getConfiguredMinResource()); Assert.assertEquals("Max resource configured for QUEUEC is not correct", - QUEUE_C_MAXRES, qC.queueResourceQuotas.getConfiguredMaxResource()); + QUEUE_C_MAXRES, qC.usageTracker.getQueueResourceQuotas().getConfiguredMaxResource()); Assert.assertEquals("Effective Min resource for QUEUEC is not correct", - QUEUE_C_MINRES, qC.queueResourceQuotas.getEffectiveMinResource()); + QUEUE_C_MINRES, qC.usageTracker.getQueueResourceQuotas().getEffectiveMinResource()); Assert.assertEquals("Effective Max resource for QUEUEC is not correct", - QUEUE_C_MAXRES, qC.queueResourceQuotas.getEffectiveMaxResource()); + QUEUE_C_MAXRES, qC.usageTracker.getQueueResourceQuotas().getEffectiveMaxResource()); rm.stop(); } @@ -378,37 +378,37 @@ public void testSimpleValidateAbsoluteResourceConfig() throws Exception { LeafQueue qA1 = (LeafQueue) cs.getQueue(QUEUEA1); Assert.assertEquals("Effective Min resource for QUEUEA1 is not correct", - QUEUE_A1_MINRES, qA1.queueResourceQuotas.getEffectiveMinResource()); + QUEUE_A1_MINRES, qA1.usageTracker.getQueueResourceQuotas().getEffectiveMinResource()); Assert.assertEquals("Effective Max resource for QUEUEA1 is not correct", - QUEUE_A_MAXRES, qA1.queueResourceQuotas.getEffectiveMaxResource()); + QUEUE_A_MAXRES, qA1.usageTracker.getQueueResourceQuotas().getEffectiveMaxResource()); LeafQueue qA2 = (LeafQueue) cs.getQueue(QUEUEA2); Assert.assertEquals("Effective Min resource for QUEUEA2 is not correct", - QUEUE_A2_MINRES, qA2.queueResourceQuotas.getEffectiveMinResource()); + QUEUE_A2_MINRES, qA2.usageTracker.getQueueResourceQuotas().getEffectiveMinResource()); Assert.assertEquals("Effective Max resource for QUEUEA2 is not correct", - QUEUE_A_MAXRES, qA2.queueResourceQuotas.getEffectiveMaxResource()); + QUEUE_A_MAXRES, qA2.usageTracker.getQueueResourceQuotas().getEffectiveMaxResource()); LeafQueue qB1 = (LeafQueue) cs.getQueue(QUEUEB1); Assert.assertNotNull(qB1); Assert.assertEquals("Min resource configured for QUEUEB1 is not correct", - QUEUE_B1_MINRES, qB1.queueResourceQuotas.getConfiguredMinResource()); + QUEUE_B1_MINRES, qB1.usageTracker.getQueueResourceQuotas().getConfiguredMinResource()); Assert.assertEquals("Max resource configured for QUEUEB1 is not correct", - QUEUE_B_MAXRES, qB1.queueResourceQuotas.getConfiguredMaxResource()); + QUEUE_B_MAXRES, qB1.usageTracker.getQueueResourceQuotas().getConfiguredMaxResource()); Assert.assertEquals("Effective Min resource for QUEUEB1 is not correct", - QUEUE_B1_MINRES, qB1.queueResourceQuotas.getEffectiveMinResource()); + QUEUE_B1_MINRES, qB1.usageTracker.getQueueResourceQuotas().getEffectiveMinResource()); Assert.assertEquals("Effective Max resource for QUEUEB1 is not correct", - QUEUE_B_MAXRES, qB1.queueResourceQuotas.getEffectiveMaxResource()); + QUEUE_B_MAXRES, qB1.usageTracker.getQueueResourceQuotas().getEffectiveMaxResource()); LeafQueue qC = (LeafQueue) cs.getQueue(QUEUEC); Assert.assertNotNull(qC); Assert.assertEquals("Min resource configured for QUEUEC is not correct", - QUEUE_C_MINRES, qC.queueResourceQuotas.getConfiguredMinResource()); + QUEUE_C_MINRES, qC.usageTracker.getQueueResourceQuotas().getConfiguredMinResource()); Assert.assertEquals("Max resource configured for QUEUEC is not correct", - QUEUE_C_MAXRES, qC.queueResourceQuotas.getConfiguredMaxResource()); + QUEUE_C_MAXRES, qC.usageTracker.getQueueResourceQuotas().getConfiguredMaxResource()); Assert.assertEquals("Effective Min resource for QUEUEC is not correct", - QUEUE_C_MINRES, qC.queueResourceQuotas.getEffectiveMinResource()); + QUEUE_C_MINRES, qC.usageTracker.getQueueResourceQuotas().getEffectiveMinResource()); Assert.assertEquals("Effective Max resource for QUEUEC is not correct", - QUEUE_C_MAXRES, qC.queueResourceQuotas.getEffectiveMaxResource()); + QUEUE_C_MAXRES, qC.usageTracker.getQueueResourceQuotas().getEffectiveMaxResource()); // 3. Create a new config and make sure one queue's min resource is more // than its max resource configured. @@ -597,35 +597,35 @@ public void testEffectiveResourceAfterReducingClusterResource() LeafQueue qA = (LeafQueue) cs.getQueue(QUEUEA); Assert.assertNotNull(qA); Assert.assertEquals("Min resource configured for QUEUEA is not correct", - QUEUE_A_MINRES, qA.queueResourceQuotas.getConfiguredMinResource()); + QUEUE_A_MINRES, qA.usageTracker.getQueueResourceQuotas().getConfiguredMinResource()); Assert.assertEquals("Max resource configured for QUEUEA is not correct", - QUEUE_A_MAXRES, qA.queueResourceQuotas.getConfiguredMaxResource()); + QUEUE_A_MAXRES, qA.usageTracker.getQueueResourceQuotas().getConfiguredMaxResource()); Assert.assertEquals("Effective Min resource for QUEUEA is not correct", - QUEUE_A_MINRES, qA.queueResourceQuotas.getEffectiveMinResource()); + QUEUE_A_MINRES, qA.usageTracker.getQueueResourceQuotas().getEffectiveMinResource()); Assert.assertEquals("Effective Max resource for QUEUEA is not correct", - QUEUE_A_MAXRES, qA.queueResourceQuotas.getEffectiveMaxResource()); + QUEUE_A_MAXRES, qA.usageTracker.getQueueResourceQuotas().getEffectiveMaxResource()); LeafQueue qB = (LeafQueue) cs.getQueue(QUEUEB); Assert.assertNotNull(qB); Assert.assertEquals("Min resource configured for QUEUEB is not correct", - QUEUE_B_MINRES, qB.queueResourceQuotas.getConfiguredMinResource()); + QUEUE_B_MINRES, qB.usageTracker.getQueueResourceQuotas().getConfiguredMinResource()); Assert.assertEquals("Max resource configured for QUEUEB is not correct", - QUEUE_B_MAXRES, qB.queueResourceQuotas.getConfiguredMaxResource()); + QUEUE_B_MAXRES, qB.usageTracker.getQueueResourceQuotas().getConfiguredMaxResource()); Assert.assertEquals("Effective Min resource for QUEUEB is not correct", - QUEUE_B_MINRES, qB.queueResourceQuotas.getEffectiveMinResource()); + QUEUE_B_MINRES, qB.usageTracker.getQueueResourceQuotas().getEffectiveMinResource()); Assert.assertEquals("Effective Max resource for QUEUEB is not correct", - QUEUE_B_MAXRES, qB.queueResourceQuotas.getEffectiveMaxResource()); + QUEUE_B_MAXRES, qB.usageTracker.getQueueResourceQuotas().getEffectiveMaxResource()); LeafQueue qC = (LeafQueue) cs.getQueue(QUEUEC); Assert.assertNotNull(qC); Assert.assertEquals("Min resource configured for QUEUEC is not correct", - QUEUE_C_MINRES, qC.queueResourceQuotas.getConfiguredMinResource()); + QUEUE_C_MINRES, qC.usageTracker.getQueueResourceQuotas().getConfiguredMinResource()); Assert.assertEquals("Max resource configured for QUEUEC is not correct", - QUEUE_C_MAXRES, qC.queueResourceQuotas.getConfiguredMaxResource()); + QUEUE_C_MAXRES, qC.usageTracker.getQueueResourceQuotas().getConfiguredMaxResource()); Assert.assertEquals("Effective Min resource for QUEUEC is not correct", - QUEUE_C_MINRES, qC.queueResourceQuotas.getEffectiveMinResource()); + QUEUE_C_MINRES, qC.usageTracker.getQueueResourceQuotas().getEffectiveMinResource()); Assert.assertEquals("Effective Max resource for QUEUEC is not correct", - QUEUE_C_MAXRES, qC.queueResourceQuotas.getEffectiveMaxResource()); + QUEUE_C_MAXRES, qC.usageTracker.getQueueResourceQuotas().getEffectiveMaxResource()); // unregister one NM. rm.unRegisterNode(nm1); @@ -634,19 +634,19 @@ public void testEffectiveResourceAfterReducingClusterResource() // above half. Hence A's min will be 60Gi and 6 cores and max will be // 128GB and 20 cores. Assert.assertEquals("Effective Min resource for QUEUEA is not correct", - QUEUEA_REDUCED, qA.queueResourceQuotas.getEffectiveMinResource()); + QUEUEA_REDUCED, qA.usageTracker.getQueueResourceQuotas().getEffectiveMinResource()); Assert.assertEquals("Effective Max resource for QUEUEA is not correct", - QUEUEMAX_REDUCED, qA.queueResourceQuotas.getEffectiveMaxResource()); + QUEUEMAX_REDUCED, qA.usageTracker.getQueueResourceQuotas().getEffectiveMaxResource()); Assert.assertEquals("Effective Min resource for QUEUEB is not correct", - QUEUEB_REDUCED, qB.queueResourceQuotas.getEffectiveMinResource()); + QUEUEB_REDUCED, qB.usageTracker.getQueueResourceQuotas().getEffectiveMinResource()); Assert.assertEquals("Effective Max resource for QUEUEB is not correct", - QUEUEMAX_REDUCED, qB.queueResourceQuotas.getEffectiveMaxResource()); + QUEUEMAX_REDUCED, qB.usageTracker.getQueueResourceQuotas().getEffectiveMaxResource()); Assert.assertEquals("Effective Min resource for QUEUEC is not correct", - QUEUEC_REDUCED, qC.queueResourceQuotas.getEffectiveMinResource()); + QUEUEC_REDUCED, qC.usageTracker.getQueueResourceQuotas().getEffectiveMinResource()); Assert.assertEquals("Effective Max resource for QUEUEC is not correct", - QUEUEMAX_REDUCED, qC.queueResourceQuotas.getEffectiveMaxResource()); + QUEUEMAX_REDUCED, qC.usageTracker.getQueueResourceQuotas().getEffectiveMaxResource()); rm.stop(); } @@ -676,13 +676,13 @@ public void testEffectiveResourceAfterIncreasingClusterResource() ParentQueue qA = (ParentQueue) cs.getQueue(QUEUEA); Assert.assertNotNull(qA); Assert.assertEquals("Min resource configured for QUEUEA is not correct", - QUEUE_A_MINRES, qA.queueResourceQuotas.getConfiguredMinResource()); + QUEUE_A_MINRES, qA.usageTracker.getQueueResourceQuotas().getConfiguredMinResource()); Assert.assertEquals("Max resource configured for QUEUEA is not correct", - QUEUE_A_MAXRES, qA.queueResourceQuotas.getConfiguredMaxResource()); + QUEUE_A_MAXRES, qA.usageTracker.getQueueResourceQuotas().getConfiguredMaxResource()); Assert.assertEquals("Effective Min resource for QUEUEA is not correct", - QUEUE_A_MINRES, qA.queueResourceQuotas.getEffectiveMinResource()); + QUEUE_A_MINRES, qA.usageTracker.getQueueResourceQuotas().getEffectiveMinResource()); Assert.assertEquals("Effective Max resource for QUEUEA is not correct", - QUEUE_A_MAXRES, qA.queueResourceQuotas.getEffectiveMaxResource()); + QUEUE_A_MAXRES, qA.usageTracker.getQueueResourceQuotas().getEffectiveMaxResource()); Assert.assertEquals("Absolute capacity for QUEUEA is not correct", 0.4, qA.getAbsoluteCapacity(), DELTA); Assert.assertEquals("Absolute Max capacity for QUEUEA is not correct", @@ -691,13 +691,13 @@ public void testEffectiveResourceAfterIncreasingClusterResource() ParentQueue qB = (ParentQueue) cs.getQueue(QUEUEB); Assert.assertNotNull(qB); Assert.assertEquals("Min resource configured for QUEUEB is not correct", - QUEUE_B_MINRES, qB.queueResourceQuotas.getConfiguredMinResource()); + QUEUE_B_MINRES, qB.usageTracker.getQueueResourceQuotas().getConfiguredMinResource()); Assert.assertEquals("Max resource configured for QUEUEB is not correct", - QUEUE_B_MAXRES, qB.queueResourceQuotas.getConfiguredMaxResource()); + QUEUE_B_MAXRES, qB.usageTracker.getQueueResourceQuotas().getConfiguredMaxResource()); Assert.assertEquals("Effective Min resource for QUEUEB is not correct", - QUEUE_B_MINRES, qB.queueResourceQuotas.getEffectiveMinResource()); + QUEUE_B_MINRES, qB.usageTracker.getQueueResourceQuotas().getEffectiveMinResource()); Assert.assertEquals("Effective Max resource for QUEUEB is not correct", - QUEUE_B_MAXRES, qB.queueResourceQuotas.getEffectiveMaxResource()); + QUEUE_B_MAXRES, qB.usageTracker.getQueueResourceQuotas().getEffectiveMaxResource()); Assert.assertEquals("Absolute capacity for QUEUEB is not correct", 0.2, qB.getAbsoluteCapacity(), DELTA); Assert.assertEquals("Absolute Max capacity for QUEUEB is not correct", @@ -706,13 +706,13 @@ public void testEffectiveResourceAfterIncreasingClusterResource() LeafQueue qC = (LeafQueue) cs.getQueue(QUEUEC); Assert.assertNotNull(qC); Assert.assertEquals("Min resource configured for QUEUEC is not correct", - QUEUE_C_MINRES, qC.queueResourceQuotas.getConfiguredMinResource()); + QUEUE_C_MINRES, qC.usageTracker.getQueueResourceQuotas().getConfiguredMinResource()); Assert.assertEquals("Max resource configured for QUEUEC is not correct", - QUEUE_C_MAXRES, qC.queueResourceQuotas.getConfiguredMaxResource()); + QUEUE_C_MAXRES, qC.usageTracker.getQueueResourceQuotas().getConfiguredMaxResource()); Assert.assertEquals("Effective Min resource for QUEUEC is not correct", - QUEUE_C_MINRES, qC.queueResourceQuotas.getEffectiveMinResource()); + QUEUE_C_MINRES, qC.usageTracker.getQueueResourceQuotas().getEffectiveMinResource()); Assert.assertEquals("Effective Max resource for QUEUEC is not correct", - QUEUE_C_MAXRES, qC.queueResourceQuotas.getEffectiveMaxResource()); + QUEUE_C_MAXRES, qC.usageTracker.getQueueResourceQuotas().getEffectiveMaxResource()); Assert.assertEquals("Absolute capacity for QUEUEC is not correct", 0.1, qC.getAbsoluteCapacity(), DELTA); Assert.assertEquals("Absolute Max capacity for QUEUEC is not correct", @@ -720,9 +720,9 @@ public void testEffectiveResourceAfterIncreasingClusterResource() LeafQueue qA1 = (LeafQueue) cs.getQueue(QUEUEA1); Assert.assertEquals("Effective Min resource for QUEUEA1 is not correct", - QUEUE_A1_MINRES, qA1.queueResourceQuotas.getEffectiveMinResource()); + QUEUE_A1_MINRES, qA1.usageTracker.getQueueResourceQuotas().getEffectiveMinResource()); Assert.assertEquals("Effective Max resource for QUEUEA1 is not correct", - QUEUE_A_MAXRES, qA1.queueResourceQuotas.getEffectiveMaxResource()); + QUEUE_A_MAXRES, qA1.usageTracker.getQueueResourceQuotas().getEffectiveMaxResource()); Assert.assertEquals("Absolute capacity for QUEUEA1 is not correct", 0.2, qA1.getAbsoluteCapacity(), DELTA); Assert.assertEquals("Absolute Max capacity for QUEUEA1 is not correct", @@ -730,9 +730,9 @@ public void testEffectiveResourceAfterIncreasingClusterResource() LeafQueue qA2 = (LeafQueue) cs.getQueue(QUEUEA2); Assert.assertEquals("Effective Min resource for QUEUEA2 is not correct", - QUEUE_A2_MINRES, qA2.queueResourceQuotas.getEffectiveMinResource()); + QUEUE_A2_MINRES, qA2.usageTracker.getQueueResourceQuotas().getEffectiveMinResource()); Assert.assertEquals("Effective Max resource for QUEUEA2 is not correct", - QUEUE_A_MAXRES, qA2.queueResourceQuotas.getEffectiveMaxResource()); + QUEUE_A_MAXRES, qA2.usageTracker.getQueueResourceQuotas().getEffectiveMaxResource()); Assert.assertEquals("Absolute capacity for QUEUEA2 is not correct", 0.2, qA2.getAbsoluteCapacity(), DELTA); Assert.assertEquals("Absolute Max capacity for QUEUEA2 is not correct", @@ -740,13 +740,13 @@ public void testEffectiveResourceAfterIncreasingClusterResource() LeafQueue qB1 = (LeafQueue) cs.getQueue(QUEUEB1); Assert.assertEquals("Min resource configured for QUEUEB1 is not correct", - QUEUE_B1_MINRES, qB1.queueResourceQuotas.getConfiguredMinResource()); + QUEUE_B1_MINRES, qB1.usageTracker.getQueueResourceQuotas().getConfiguredMinResource()); Assert.assertEquals("Max resource configured for QUEUEB1 is not correct", - QUEUE_B_MAXRES, qB1.queueResourceQuotas.getConfiguredMaxResource()); + QUEUE_B_MAXRES, qB1.usageTracker.getQueueResourceQuotas().getConfiguredMaxResource()); Assert.assertEquals("Effective Min resource for QUEUEB1 is not correct", - QUEUE_B1_MINRES, qB1.queueResourceQuotas.getEffectiveMinResource()); + QUEUE_B1_MINRES, qB1.usageTracker.getQueueResourceQuotas().getEffectiveMinResource()); Assert.assertEquals("Effective Max resource for QUEUEB1 is not correct", - QUEUE_B_MAXRES, qB1.queueResourceQuotas.getEffectiveMaxResource()); + QUEUE_B_MAXRES, qB1.usageTracker.getQueueResourceQuotas().getEffectiveMaxResource()); Assert.assertEquals("Absolute capacity for QUEUEB1 is not correct", 0.16, qB1.getAbsoluteCapacity(), DELTA); Assert.assertEquals("Absolute Max capacity for QUEUEB1 is not correct", @@ -759,54 +759,54 @@ public void testEffectiveResourceAfterIncreasingClusterResource() // Since configured capacity was based on initial node capacity, a // re configurations is needed to use this added capacity. Assert.assertEquals("Effective Min resource for QUEUEA is not correct", - QUEUE_A_MINRES, qA.queueResourceQuotas.getEffectiveMinResource()); + QUEUE_A_MINRES, qA.usageTracker.getQueueResourceQuotas().getEffectiveMinResource()); Assert.assertEquals("Effective Max resource for QUEUEA is not correct", - QUEUE_A_MAXRES, qA.queueResourceQuotas.getEffectiveMaxResource()); + QUEUE_A_MAXRES, qA.usageTracker.getQueueResourceQuotas().getEffectiveMaxResource()); Assert.assertEquals("Absolute capacity for QUEUEA is not correct", 0.266, qA.getAbsoluteCapacity(), DELTA); Assert.assertEquals("Absolute Max capacity for QUEUEA is not correct", 0.533, qA.getAbsoluteMaximumCapacity(), DELTA); Assert.assertEquals("Effective Min resource for QUEUEB is not correct", - QUEUE_B_MINRES, qB.queueResourceQuotas.getEffectiveMinResource()); + QUEUE_B_MINRES, qB.usageTracker.getQueueResourceQuotas().getEffectiveMinResource()); Assert.assertEquals("Effective Max resource for QUEUEB is not correct", - QUEUE_B_MAXRES, qB.queueResourceQuotas.getEffectiveMaxResource()); + QUEUE_B_MAXRES, qB.usageTracker.getQueueResourceQuotas().getEffectiveMaxResource()); Assert.assertEquals("Absolute capacity for QUEUEB is not correct", 0.133, qB.getAbsoluteCapacity(), DELTA); Assert.assertEquals("Absolute Max capacity for QUEUEB is not correct", 0.4, qB.getAbsoluteMaximumCapacity(), DELTA); Assert.assertEquals("Effective Min resource for QUEUEC is not correct", - QUEUE_C_MINRES, qC.queueResourceQuotas.getEffectiveMinResource()); + QUEUE_C_MINRES, qC.usageTracker.getQueueResourceQuotas().getEffectiveMinResource()); Assert.assertEquals("Effective Max resource for QUEUEC is not correct", - QUEUE_C_MAXRES, qC.queueResourceQuotas.getEffectiveMaxResource()); + QUEUE_C_MAXRES, qC.usageTracker.getQueueResourceQuotas().getEffectiveMaxResource()); Assert.assertEquals("Absolute capacity for QUEUEC is not correct", 0.066, qC.getAbsoluteCapacity(), DELTA); Assert.assertEquals("Absolute Max capacity for QUEUEC is not correct", 0.4, qC.getAbsoluteMaximumCapacity(), DELTA); Assert.assertEquals("Effective Min resource for QUEUEB1 is not correct", - QUEUE_B1_MINRES, qB1.queueResourceQuotas.getEffectiveMinResource()); + QUEUE_B1_MINRES, qB1.usageTracker.getQueueResourceQuotas().getEffectiveMinResource()); Assert.assertEquals("Effective Max resource for QUEUEB1 is not correct", - QUEUE_B_MAXRES, qB1.queueResourceQuotas.getEffectiveMaxResource()); + QUEUE_B_MAXRES, qB1.usageTracker.getQueueResourceQuotas().getEffectiveMaxResource()); Assert.assertEquals("Absolute capacity for QUEUEB1 is not correct", 0.106, qB1.getAbsoluteCapacity(), DELTA); Assert.assertEquals("Absolute Max capacity for QUEUEB1 is not correct", 0.4, qB1.getAbsoluteMaximumCapacity(), DELTA); Assert.assertEquals("Effective Min resource for QUEUEA1 is not correct", - QUEUE_A1_MINRES, qA1.queueResourceQuotas.getEffectiveMinResource()); + QUEUE_A1_MINRES, qA1.usageTracker.getQueueResourceQuotas().getEffectiveMinResource()); Assert.assertEquals("Effective Max resource for QUEUEA1 is not correct", - QUEUE_A_MAXRES, qA1.queueResourceQuotas.getEffectiveMaxResource()); + QUEUE_A_MAXRES, qA1.usageTracker.getQueueResourceQuotas().getEffectiveMaxResource()); Assert.assertEquals("Absolute capacity for QUEUEA1 is not correct", 0.133, qA1.getAbsoluteCapacity(), DELTA); Assert.assertEquals("Absolute Max capacity for QUEUEA1 is not correct", 0.533, qA1.getAbsoluteMaximumCapacity(), DELTA); Assert.assertEquals("Effective Min resource for QUEUEA2 is not correct", - QUEUE_A2_MINRES, qA2.queueResourceQuotas.getEffectiveMinResource()); + QUEUE_A2_MINRES, qA2.usageTracker.getQueueResourceQuotas().getEffectiveMinResource()); Assert.assertEquals("Effective Max resource for QUEUEA2 is not correct", - QUEUE_A_MAXRES, qA2.queueResourceQuotas.getEffectiveMaxResource()); + QUEUE_A_MAXRES, qA2.usageTracker.getQueueResourceQuotas().getEffectiveMaxResource()); Assert.assertEquals("Absolute capacity for QUEUEA2 is not correct", 0.133, qA2.getAbsoluteCapacity(), DELTA); Assert.assertEquals("Absolute Max capacity for QUEUEA2 is not correct",