diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java index 4de5eace54..851c9f5587 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/AppSchedulingInfo.java @@ -18,6 +18,19 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import java.util.ArrayList; +import java.util.Collection; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentSkipListSet; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -31,27 +44,12 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.SchedulingMode; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.LocalitySchedulingPlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.ResourceRequestUpdateResult; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; - import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; - -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.util.resource.Resources; - -import java.util.ArrayList; -import java.util.Collection; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.ConcurrentSkipListSet; -import java.util.concurrent.atomic.AtomicBoolean; -import java.util.concurrent.atomic.AtomicLong; -import java.util.concurrent.locks.ReentrantReadWriteLock; /** * This class keeps track of all the consumption of an application. This also * keeps track of current running/completed containers for the application. @@ -260,10 +258,13 @@ private void updatePendingResources(ResourceRequest lastRequest, Resource lastRequestCapability = lastRequest != null ? lastRequest.getCapability() : Resources.none(); - metrics.incrPendingResources(user, + metrics.incrPendingResources(request.getNodeLabelExpression(), user, request.getNumContainers(), request.getCapability()); - metrics.decrPendingResources(user, - lastRequestContainers, lastRequestCapability); + + if(lastRequest != null) { + metrics.decrPendingResources(lastRequest.getNodeLabelExpression(), user, + lastRequestContainers, lastRequestCapability); + } // update queue: Resource increasedResource = @@ -419,7 +420,7 @@ public List allocate(NodeType type, writeLock.lock(); if (null != containerAllocated) { - updateMetricsForAllocatedContainer(type, containerAllocated); + updateMetricsForAllocatedContainer(type, node, containerAllocated); } return schedulerKeyToPlacementSets.get(schedulerKey).allocate( @@ -443,10 +444,12 @@ public void move(Queue newQueue) { for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) { PendingAsk ask = ps.getPendingAsk(ResourceRequest.ANY); if (ask.getCount() > 0) { - oldMetrics.decrPendingResources(user, ask.getCount(), - ask.getPerAllocationResource()); - newMetrics.incrPendingResources(user, ask.getCount(), - ask.getPerAllocationResource()); + oldMetrics.decrPendingResources( + ps.getPrimaryRequestedNodePartition(), + user, ask.getCount(), ask.getPerAllocationResource()); + newMetrics.incrPendingResources( + ps.getPrimaryRequestedNodePartition(), + user, ask.getCount(), ask.getPerAllocationResource()); Resource delta = Resources.multiply(ask.getPerAllocationResource(), ask.getCount()); @@ -476,8 +479,8 @@ public void stop() { for (SchedulingPlacementSet ps : schedulerKeyToPlacementSets.values()) { PendingAsk ask = ps.getPendingAsk(ResourceRequest.ANY); if (ask.getCount() > 0) { - metrics.decrPendingResources(user, ask.getCount(), - ask.getPerAllocationResource()); + metrics.decrPendingResources(ps.getPrimaryRequestedNodePartition(), + user, ask.getCount(), ask.getPerAllocationResource()); // Update Queue queue.decPendingResource( @@ -537,8 +540,8 @@ public void recoverContainer(RMContainer rmContainer) { return; } - metrics.allocateResources(user, 1, rmContainer.getAllocatedResource(), - false); + metrics.allocateResources(rmContainer.getNodeLabelExpression(), + user, 1, rmContainer.getAllocatedResource(), false); } finally { this.writeLock.unlock(); } @@ -562,8 +565,8 @@ public boolean checkAllocation(NodeType type, SchedulerNode node, } } - private void updateMetricsForAllocatedContainer( - NodeType type, Container containerAllocated) { + private void updateMetricsForAllocatedContainer(NodeType type, + SchedulerNode node, Container containerAllocated) { QueueMetrics metrics = queue.getMetrics(); if (pending) { // once an allocation is done we assume the application is @@ -579,8 +582,10 @@ private void updateMetricsForAllocatedContainer( + containerAllocated.getResource() + " type=" + type); } - metrics.allocateResources(user, 1, containerAllocated.getResource(), - true); + if(node != null) { + metrics.allocateResources(node.getPartition(), user, 1, + containerAllocated.getResource(), true); + } metrics.incrNodeTypeAggregations(user, type); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java index 9a57876d54..eafe8edfcc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java @@ -43,6 +43,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.slf4j.Logger; @@ -59,38 +60,45 @@ public class QueueMetrics implements MetricsSource { @Metric("# of apps completed") MutableCounterInt appsCompleted; @Metric("# of apps killed") MutableCounterInt appsKilled; @Metric("# of apps failed") MutableCounterInt appsFailed; - - @Metric("Allocated memory in MB") MutableGaugeLong allocatedMB; - @Metric("Allocated CPU in virtual cores") MutableGaugeInt allocatedVCores; - @Metric("# of allocated containers") MutableGaugeInt allocatedContainers; - @Metric("Aggregate # of allocated containers") MutableCounterLong aggregateContainersAllocated; @Metric("Aggregate # of allocated node-local containers") MutableCounterLong aggregateNodeLocalContainersAllocated; @Metric("Aggregate # of allocated rack-local containers") MutableCounterLong aggregateRackLocalContainersAllocated; @Metric("Aggregate # of allocated off-switch containers") MutableCounterLong aggregateOffSwitchContainersAllocated; - @Metric("Aggregate # of released containers") MutableCounterLong aggregateContainersReleased; @Metric("Aggregate # of preempted containers") MutableCounterLong aggregateContainersPreempted; + @Metric("# of active users") MutableGaugeInt activeUsers; + @Metric("# of active applications") MutableGaugeInt activeApplications; + @Metric("App Attempt First Container Allocation Delay") + MutableRate appAttemptFirstContainerAllocationDelay; + + //Metrics updated only for "default" partition + @Metric("Allocated memory in MB") MutableGaugeLong allocatedMB; + @Metric("Allocated CPU in virtual cores") MutableGaugeInt allocatedVCores; + @Metric("# of allocated containers") MutableGaugeInt allocatedContainers; + @Metric("Aggregate # of allocated containers") + MutableCounterLong aggregateContainersAllocated; + @Metric("Aggregate # of released containers") + MutableCounterLong aggregateContainersReleased; @Metric("Available memory in MB") MutableGaugeLong availableMB; @Metric("Available CPU in virtual cores") MutableGaugeInt availableVCores; @Metric("Pending memory allocation in MB") MutableGaugeLong pendingMB; - @Metric("Pending CPU allocation in virtual cores") MutableGaugeInt pendingVCores; + @Metric("Pending CPU allocation in virtual cores") + MutableGaugeInt pendingVCores; @Metric("# of pending containers") MutableGaugeInt pendingContainers; @Metric("# of reserved memory in MB") MutableGaugeLong reservedMB; @Metric("Reserved CPU in virtual cores") MutableGaugeInt reservedVCores; @Metric("# of reserved containers") MutableGaugeInt reservedContainers; - @Metric("# of active users") MutableGaugeInt activeUsers; - @Metric("# of active applications") MutableGaugeInt activeApplications; - @Metric("App Attempt First Container Allocation Delay") MutableRate appAttemptFirstContainerAllocationDelay; + private final MutableGaugeInt[] runningTime; private TimeBucketMetrics runBuckets; static final Logger LOG = LoggerFactory.getLogger(QueueMetrics.class); static final MetricsInfo RECORD_INFO = info("QueueMetrics", "Metrics for the resource scheduler"); - protected static final MetricsInfo QUEUE_INFO = info("Queue", "Metrics by queue"); + protected static final MetricsInfo QUEUE_INFO = + info("Queue", "Metrics by queue"); protected static final MetricsInfo USER_INFO = info("User", "Metrics by user"); static final Splitter Q_SPLITTER = @@ -334,41 +342,61 @@ public void moveAppTo(AppSchedulingInfo app) { /** * Set available resources. To be called by scheduler periodically as * resources become available. + * @param partition Node Partition * @param limit resource limit */ - public void setAvailableResourcesToQueue(Resource limit) { - availableMB.set(limit.getMemorySize()); - availableVCores.set(limit.getVirtualCores()); + public void setAvailableResourcesToQueue(String partition, Resource limit) { + if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + availableMB.set(limit.getMemorySize()); + availableVCores.set(limit.getVirtualCores()); + } } /** * Set available resources. To be called by scheduler periodically as * resources become available. + * @param limit resource limit + */ + public void setAvailableResourcesToQueue(Resource limit) { + this.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL, limit); + } + + /** + * Set available resources. To be called by scheduler periodically as + * resources become available. + * @param partition Node Partition * @param user * @param limit resource limit */ - public void setAvailableResourcesToUser(String user, Resource limit) { - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.setAvailableResourcesToQueue(limit); + public void setAvailableResourcesToUser(String partition, + String user, Resource limit) { + if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.setAvailableResourcesToQueue(partition, limit); + } } } /** * Increment pending resource metrics + * @param partition Node Partition * @param user * @param containers * @param res the TOTAL delta of resources note this is different from * the other APIs which use per container resource */ - public void incrPendingResources(String user, int containers, Resource res) { - _incrPendingResources(containers, res); - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.incrPendingResources(user, containers, res); - } - if (parent != null) { - parent.incrPendingResources(user, containers, res); + public void incrPendingResources(String partition, String user, + int containers, Resource res) { + if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + _incrPendingResources(containers, res); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.incrPendingResources(partition, user, containers, res); + } + if (parent != null) { + parent.incrPendingResources(partition, user, containers, res); + } } } @@ -378,14 +406,18 @@ private void _incrPendingResources(int containers, Resource res) { pendingVCores.incr(res.getVirtualCores() * containers); } - public void decrPendingResources(String user, int containers, Resource res) { - _decrPendingResources(containers, res); - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.decrPendingResources(user, containers, res); - } - if (parent != null) { - parent.decrPendingResources(user, containers, res); + + public void decrPendingResources(String partition, String user, + int containers, Resource res) { + if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + _decrPendingResources(containers, res); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.decrPendingResources(partition, user, containers, res); + } + if (parent != null) { + parent.decrPendingResources(partition, user, containers, res); + } } } @@ -414,58 +446,66 @@ public void incrNodeTypeAggregations(String user, NodeType type) { } } - public void allocateResources(String user, int containers, Resource res, - boolean decrPending) { - allocatedContainers.incr(containers); - aggregateContainersAllocated.incr(containers); + public void allocateResources(String partition, String user, + int containers, Resource res, boolean decrPending) { + if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + allocatedContainers.incr(containers); + aggregateContainersAllocated.incr(containers); - allocatedMB.incr(res.getMemorySize() * containers); - allocatedVCores.incr(res.getVirtualCores() * containers); - if (decrPending) { - _decrPendingResources(containers, res); - } - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.allocateResources(user, containers, res, decrPending); - } - if (parent != null) { - parent.allocateResources(user, containers, res, decrPending); + allocatedMB.incr(res.getMemorySize() * containers); + allocatedVCores.incr(res.getVirtualCores() * containers); + if (decrPending) { + _decrPendingResources(containers, res); + } + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.allocateResources(partition, user, + containers, res, decrPending); + } + if (parent != null) { + parent.allocateResources(partition, user, containers, res, decrPending); + } } } /** * Allocate Resource for container size change. - * + * @param partition Node Partition * @param user * @param res */ - public void allocateResources(String user, Resource res) { - allocatedMB.incr(res.getMemorySize()); - allocatedVCores.incr(res.getVirtualCores()); + public void allocateResources(String partition, String user, Resource res) { + if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + allocatedMB.incr(res.getMemorySize()); + allocatedVCores.incr(res.getVirtualCores()); - pendingMB.decr(res.getMemorySize()); - pendingVCores.decr(res.getVirtualCores()); + pendingMB.decr(res.getMemorySize()); + pendingVCores.decr(res.getVirtualCores()); - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.allocateResources(user, res); - } - if (parent != null) { - parent.allocateResources(user, res); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.allocateResources(partition, user, res); + } + if (parent != null) { + parent.allocateResources(partition, user, res); + } } } - public void releaseResources(String user, int containers, Resource res) { - allocatedContainers.decr(containers); - aggregateContainersReleased.incr(containers); - allocatedMB.decr(res.getMemorySize() * containers); - allocatedVCores.decr(res.getVirtualCores() * containers); - QueueMetrics userMetrics = getUserMetrics(user); - if (userMetrics != null) { - userMetrics.releaseResources(user, containers, res); - } - if (parent != null) { - parent.releaseResources(user, containers, res); + public void releaseResources(String partition, + String user, int containers, Resource res) { + if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + allocatedContainers.decr(containers); + aggregateContainersReleased.incr(containers); + allocatedMB.decr(res.getMemorySize() * containers); + allocatedVCores.decr(res.getVirtualCores() * containers); + QueueMetrics userMetrics = getUserMetrics(user); + if (userMetrics != null) { + userMetrics.releaseResources(partition, user, containers, res); + } + if (parent != null) { + parent.releaseResources(partition, user, containers, res); + } } } @@ -494,6 +534,12 @@ public void preemptContainer() { } } + public void reserveResource(String partition, String user, Resource res) { + if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + reserveResource(user, res); + } + } + public void reserveResource(String user, Resource res) { reservedContainers.incr(); reservedMB.incr(res.getMemorySize()); @@ -520,6 +566,12 @@ public void unreserveResource(String user, Resource res) { } } + public void unreserveResource(String partition, String user, Resource res) { + if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + unreserveResource(user, res); + } + } + public void incrActiveUsers() { activeUsers.incr(); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java index 294897f5c3..4b0bf91e25 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/SchedulerApplicationAttempt.java @@ -1073,15 +1073,19 @@ public void move(Queue newQueue) { for (RMContainer liveContainer : liveContainers.values()) { Resource resource = liveContainer.getContainer().getResource(); ((RMContainerImpl) liveContainer).setQueueName(newQueueName); - oldMetrics.releaseResources(user, 1, resource); - newMetrics.allocateResources(user, 1, resource, false); + oldMetrics.releaseResources(liveContainer.getNodeLabelExpression(), + user, 1, resource); + newMetrics.allocateResources(liveContainer.getNodeLabelExpression(), + user, 1, resource, false); } for (Map map : reservedContainers.values()) { for (RMContainer reservedContainer : map.values()) { ((RMContainerImpl) reservedContainer).setQueueName(newQueueName); Resource resource = reservedContainer.getReservedResource(); - oldMetrics.unreserveResource(user, resource); - newMetrics.reserveResource(user, resource); + oldMetrics.unreserveResource( + reservedContainer.getNodeLabelExpression(), user, resource); + newMetrics.reserveResource( + reservedContainer.getNodeLabelExpression(), user, resource); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java index c4d19340c1..87fc23458a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java @@ -26,12 +26,14 @@ import org.apache.hadoop.metrics2.lib.MutableGaugeFloat; import org.apache.hadoop.metrics2.lib.MutableGaugeLong; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.Queue; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; @Metrics(context = "yarn") public class CSQueueMetrics extends QueueMetrics { + //Metrics updated only for "default" partition @Metric("AM memory limit in MB") MutableGaugeLong AMResourceLimitMB; @Metric("AM CPU limit in virtual cores") @@ -66,33 +68,40 @@ public long getUsedAMResourceVCores() { return usedAMResourceVCores.value(); } - public void setAMResouceLimit(Resource res) { - AMResourceLimitMB.set(res.getMemorySize()); - AMResourceLimitVCores.set(res.getVirtualCores()); - } - - public void setAMResouceLimitForUser(String user, Resource res) { - CSQueueMetrics userMetrics = (CSQueueMetrics) getUserMetrics(user); - if (userMetrics != null) { - userMetrics.setAMResouceLimit(res); + public void setAMResouceLimit(String partition, Resource res) { + if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + AMResourceLimitMB.set(res.getMemorySize()); + AMResourceLimitVCores.set(res.getVirtualCores()); } } - public void incAMUsed(String user, Resource res) { - usedAMResourceMB.incr(res.getMemorySize()); - usedAMResourceVCores.incr(res.getVirtualCores()); + public void setAMResouceLimitForUser(String partition, + String user, Resource res) { CSQueueMetrics userMetrics = (CSQueueMetrics) getUserMetrics(user); if (userMetrics != null) { - userMetrics.incAMUsed(user, res); + userMetrics.setAMResouceLimit(partition, res); } } - public void decAMUsed(String user, Resource res) { - usedAMResourceMB.decr(res.getMemorySize()); - usedAMResourceVCores.decr(res.getVirtualCores()); - CSQueueMetrics userMetrics = (CSQueueMetrics) getUserMetrics(user); - if (userMetrics != null) { - userMetrics.decAMUsed(user, res); + public void incAMUsed(String partition, String user, Resource res) { + if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + usedAMResourceMB.incr(res.getMemorySize()); + usedAMResourceVCores.incr(res.getVirtualCores()); + CSQueueMetrics userMetrics = (CSQueueMetrics) getUserMetrics(user); + if (userMetrics != null) { + userMetrics.incAMUsed(partition, user, res); + } + } + } + + public void decAMUsed(String partition, String user, Resource res) { + if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + usedAMResourceMB.decr(res.getMemorySize()); + usedAMResourceVCores.decr(res.getVirtualCores()); + CSQueueMetrics userMetrics = (CSQueueMetrics) getUserMetrics(user); + if (userMetrics != null) { + userMetrics.decAMUsed(partition, user, res); + } } } @@ -100,16 +109,21 @@ public float getUsedCapacity() { return usedCapacity.value(); } - public void setUsedCapacity(float usedCapacity) { - this.usedCapacity.set(usedCapacity); + public void setUsedCapacity(String partition, float usedCap) { + if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + this.usedCapacity.set(usedCap); + } } public float getAbsoluteUsedCapacity() { return absoluteUsedCapacity.value(); } - public void setAbsoluteUsedCapacity(Float absoluteUsedCapacity) { - this.absoluteUsedCapacity.set(absoluteUsedCapacity); + public void setAbsoluteUsedCapacity(String partition, + Float absoluteUsedCap) { + if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + this.absoluteUsedCapacity.set(absoluteUsedCap); + } } public synchronized static CSQueueMetrics forQueue(String queueName, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java index ba225413f6..e1014c11fc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueUtils.java @@ -230,13 +230,13 @@ public static void updateUsedCapacity(final ResourceCalculator rc, // QueueMetrics does not support per-label capacities, // so we report values only for the default partition. - if (nodePartition.equals(CommonNodeLabelsManager.NO_LABEL)) { - queueMetrics.setUsedCapacity( - queueCapacities.getUsedCapacity(RMNodeLabelsManager.NO_LABEL)); - queueMetrics.setAbsoluteUsedCapacity( - queueCapacities.getAbsoluteUsedCapacity( - RMNodeLabelsManager.NO_LABEL)); - } + + queueMetrics.setUsedCapacity(nodePartition, + queueCapacities.getUsedCapacity(RMNodeLabelsManager.NO_LABEL)); + queueMetrics.setAbsoluteUsedCapacity(nodePartition, + queueCapacities.getAbsoluteUsedCapacity( + RMNodeLabelsManager.NO_LABEL)); + } private static Resource getMaxAvailableResourceToQueue( @@ -302,7 +302,7 @@ public static void updateQueueStatistics( // Update queue metrics w.r.t node labels. In a generic way, we can // calculate available resource from all labels in cluster. - childQueue.getMetrics().setAvailableResourcesToQueue( + childQueue.getMetrics().setAvailableResourcesToQueue(nodePartition, getMaxAvailableResourceToQueue(rc, nlm, childQueue, cluster)); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java index 104e95e4ed..eb2432ee73 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/LeafQueue.java @@ -696,7 +696,7 @@ public Resource calculateAndGetAMResourceLimitPerPartition( resourceCalculator, queuePartitionUsableResource, amResourcePercent, minimumAllocation); - metrics.setAMResouceLimit(amResouceLimit); + metrics.setAMResouceLimit(nodePartition, amResouceLimit); queueUsage.setAMLimit(nodePartition, amResouceLimit); return amResouceLimit; } finally { @@ -811,9 +811,10 @@ private void activateApplications() { user.getResourceUsage().incAMUsed(partitionName, application.getAMResource(partitionName)); user.getResourceUsage().setAMLimit(partitionName, userAMLimit); - metrics.incAMUsed(application.getUser(), + metrics.incAMUsed(partitionName, application.getUser(), application.getAMResource(partitionName)); - metrics.setAMResouceLimitForUser(application.getUser(), userAMLimit); + metrics.setAMResouceLimitForUser(partitionName, + application.getUser(), userAMLimit); fsApp.remove(); LOG.info("Application " + applicationId + " from user: " + application .getUser() + " activated in queue: " + getQueueName()); @@ -894,7 +895,7 @@ private void removeApplicationAttempt( application.getAMResource(partitionName)); user.getResourceUsage().decAMUsed(partitionName, application.getAMResource(partitionName)); - metrics.decAMUsed(application.getUser(), + metrics.decAMUsed(partitionName, application.getUser(), application.getAMResource(partitionName)); } applicationAttemptMap.remove(application.getApplicationAttemptId()); @@ -1338,7 +1339,7 @@ Resource computeUserLimitAndSetHeadroom(FiCaSchedulerApp application, application.setHeadroomProvider(headroomProvider); - metrics.setAvailableResourcesToUser(user, headroom); + metrics.setAvailableResourcesToUser(nodePartition, user, headroom); return userLimit; } @@ -1583,7 +1584,8 @@ void allocateResource(Resource clusterResource, // Note this is a bit unconventional since it gets the object and modifies // it here, rather then using set routine Resources.subtractFrom(application.getHeadroom(), resource); // headroom - metrics.setAvailableResourcesToUser(userName, application.getHeadroom()); + metrics.setAvailableResourcesToUser(nodePartition, + userName, application.getHeadroom()); if (LOG.isDebugEnabled()) { LOG.debug(getQueueName() + " user=" + userName + " used=" @@ -1622,7 +1624,8 @@ void releaseResource(Resource clusterResource, User user = usersManager.updateUserResourceUsage(userName, resource, nodePartition, false); - metrics.setAvailableResourcesToUser(userName, application.getHeadroom()); + metrics.setAvailableResourcesToUser(nodePartition, + userName, application.getHeadroom()); if (LOG.isDebugEnabled()) { LOG.debug( diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java index 5c0b718e33..331585e03c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/common/fica/FiCaSchedulerApp.java @@ -18,7 +18,15 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.fica; -import com.google.common.annotations.VisibleForTesting; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.locks.ReentrantReadWriteLock; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -65,28 +73,18 @@ import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.AbstractContainerAllocator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.allocator.ContainerAllocator; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ContainerAllocationProposal; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.ResourceCommitRequest; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.SchedulerContainer; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.PlacementSet; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.placement.SchedulingPlacementSet; - import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; - -import org.apache.hadoop.yarn.server.resourcemanager.scheduler.common.PendingAsk; import org.apache.hadoop.yarn.server.utils.BuilderUtils; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.ResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Iterator; -import java.util.List; -import java.util.Map; -import java.util.Set; -import java.util.concurrent.ConcurrentHashMap; -import java.util.concurrent.locks.ReentrantReadWriteLock; +import com.google.common.annotations.VisibleForTesting; /** * Represents an application attempt from the viewpoint of the FIFO or Capacity @@ -198,7 +196,8 @@ public boolean containerCompleted(RMContainer rmContainer, "SchedulerApp", getApplicationId(), containerId, containerResource); // Update usage metrics - queue.getMetrics().releaseResources(getUser(), 1, containerResource); + queue.getMetrics().releaseResources(partition, + getUser(), 1, containerResource); attemptResourceUsage.decUsed(partition, containerResource); // Clear resource utilization metrics cache. @@ -572,8 +571,8 @@ public boolean unreserve(SchedulerRequestKey schedulerKey, node.unreserveResource(this); // Update reserved metrics - queue.getMetrics().unreserveResource(getUser(), - rmContainer.getReservedResource()); + queue.getMetrics().unreserveResource(node.getPartition(), + getUser(), rmContainer.getReservedResource()); queue.decReservedResource(node.getPartition(), rmContainer.getReservedResource()); return true; @@ -782,7 +781,7 @@ public void reserve(SchedulerRequestKey schedulerKey, FiCaSchedulerNode node, // Update reserved metrics if this is the first reservation // rmContainer will be moved to reserved in the super.reserve if (!reReservation) { - queue.getMetrics().reserveResource( + queue.getMetrics().reserveResource(node.getPartition(), getUser(), container.getResource()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java index a5772badf2..a678bb9ec4 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/fair/FSAppAttempt.java @@ -18,6 +18,16 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair; +import java.text.DecimalFormat; +import java.util.ArrayList; +import java.util.Collection; +import java.util.Collections; +import java.util.HashMap; +import java.util.HashSet; +import java.util.List; +import java.util.Map; +import java.util.Set; + import org.apache.commons.logging.Log; import org.apache.commons.logging.LogFactory; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -51,16 +61,6 @@ import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.apache.hadoop.yarn.util.resource.Resources; -import java.text.DecimalFormat; -import java.util.ArrayList; -import java.util.Collection; -import java.util.Collections; -import java.util.HashMap; -import java.util.HashSet; -import java.util.List; -import java.util.Map; -import java.util.Set; - /** * Represents an application attempt from the viewpoint of the Fair Scheduler. */ @@ -169,7 +169,9 @@ void containerCompleted(RMContainer rmContainer, "SchedulerApp", getApplicationId(), containerId, containerResource); // Update usage metrics - queue.getMetrics().releaseResources(getUser(), 1, containerResource); + queue.getMetrics().releaseResources( + rmContainer.getNodeLabelExpression(), + getUser(), 1, containerResource); this.attemptResourceUsage.decUsed(containerResource); // Clear resource utilization metrics cache. @@ -653,7 +655,7 @@ private boolean reserve(Resource perAllocationResource, FSSchedulerNode node, reservedContainer = createContainer(node, perAllocationResource, schedulerKey); - getMetrics().reserveResource(getUser(), + getMetrics().reserveResource(node.getPartition(), getUser(), reservedContainer.getResource()); RMContainer rmContainer = super.reserve(node, schedulerKey, null, reservedContainer); @@ -712,7 +714,7 @@ public void unreserve(SchedulerRequestKey schedulerKey, unreserveInternal(schedulerKey, node); node.unreserveResource(this); clearReservation(node); - getMetrics().unreserveResource( + getMetrics().unreserveResource(node.getPartition(), getUser(), rmContainer.getContainer().getResource()); } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/invariants/TestMetricsInvariantChecker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/invariants/TestMetricsInvariantChecker.java index 35cf1e4e53..5fd4151a2f 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/invariants/TestMetricsInvariantChecker.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/monitor/invariants/TestMetricsInvariantChecker.java @@ -22,6 +22,7 @@ import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.source.JvmMetrics; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.log4j.Logger; import org.junit.Before; @@ -59,7 +60,8 @@ public void testManyRuns() { QueueMetrics qm = QueueMetrics.forQueue(metricsSystem, "root", null, false, conf); - qm.setAvailableResourcesToQueue(Resource.newInstance(1, 1)); + qm.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL, + Resource.newInstance(1, 1)); int numIterations = 1000; long start = System.currentTimeMillis(); @@ -79,7 +81,8 @@ public void testViolation() { // create a "wrong" condition in which the invariants are not respected QueueMetrics qm = QueueMetrics.forQueue(metricsSystem, "root", null, false, conf); - qm.setAvailableResourcesToQueue(Resource.newInstance(-1, -1)); + qm.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL, + Resource.newInstance(-1, -1)); // test with throwing exception turned on try { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java index cb1104bffb..13144e9e04 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java @@ -37,6 +37,7 @@ import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.MockRM; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; import org.apache.hadoop.yarn.server.utils.BuilderUtils; @@ -72,8 +73,10 @@ public void setUp() { metrics.submitAppAttempt(user); checkApps(queueSource, 1, 1, 0, 0, 0, 0, true); - metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100)); - metrics.incrPendingResources(user, 5, Resources.createResource(3*GB, 3)); + metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL, + Resources.createResource(100*GB, 100)); + metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL, + user, 5, Resources.createResource(3*GB, 3)); // Available resources is set externally, as it depends on dynamic // configurable cluster/queue resources checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0); @@ -81,17 +84,21 @@ public void setUp() { metrics.runAppAttempt(app.getApplicationId(), user); checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); - metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2), true); + metrics.allocateResources(RMNodeLabelsManager.NO_LABEL, + user, 3, Resources.createResource(2*GB, 2), true); checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); - metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2)); + metrics.releaseResources(RMNodeLabelsManager.NO_LABEL, + user, 1, Resources.createResource(2*GB, 2)); checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); - metrics.incrPendingResources(user, 0, Resources.createResource(2 * GB, 2)); + metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL, + user, 0, Resources.createResource(2 * GB, 2)); checkResources(queueSource, 4 * GB, 4, 2, 3, 1, 100 * GB, 100, 9 * GB, 9, 2, 0, 0, 0); - metrics.decrPendingResources(user, 0, Resources.createResource(2 * GB, 2)); + metrics.decrPendingResources(RMNodeLabelsManager.NO_LABEL, + user, 0, Resources.createResource(2 * GB, 2)); checkResources(queueSource, 4 * GB, 4, 2, 3, 1, 100 * GB, 100, 9 * GB, 9, 2, 0, 0, 0); @@ -177,9 +184,12 @@ public void testQueueAppMetricsForMultipleFailures() { checkApps(queueSource, 1, 1, 0, 0, 0, 0, true); checkApps(userSource, 1, 1, 0, 0, 0, 0, true); - metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100)); - metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10)); - metrics.incrPendingResources(user, 5, Resources.createResource(3*GB, 3)); + metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL, + Resources.createResource(100*GB, 100)); + metrics.setAvailableResourcesToUser(RMNodeLabelsManager.NO_LABEL, + user, Resources.createResource(10*GB, 10)); + metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL, + user, 5, Resources.createResource(3*GB, 3)); // Available resources is set externally, as it depends on dynamic // configurable cluster/queue resources checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0); @@ -189,11 +199,13 @@ public void testQueueAppMetricsForMultipleFailures() { checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); checkApps(userSource, 1, 0, 1, 0, 0, 0, true); - metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2), true); + metrics.allocateResources(RMNodeLabelsManager.NO_LABEL, + user, 3, Resources.createResource(2*GB, 2), true); checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0); - metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2)); + metrics.releaseResources(RMNodeLabelsManager.NO_LABEL, + user, 1, Resources.createResource(2*GB, 2)); checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0); @@ -283,11 +295,16 @@ public void testQueueAppMetricsForMultipleFailures() { checkApps(userSource, 1, 1, 0, 0, 0, 0, true); checkApps(parentUserSource, 1, 1, 0, 0, 0, 0, true); - parentMetrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100)); - metrics.setAvailableResourcesToQueue(Resources.createResource(100*GB, 100)); - parentMetrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10)); - metrics.setAvailableResourcesToUser(user, Resources.createResource(10*GB, 10)); - metrics.incrPendingResources(user, 5, Resources.createResource(3*GB, 3)); + parentMetrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL, + Resources.createResource(100*GB, 100)); + metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL, + Resources.createResource(100*GB, 100)); + parentMetrics.setAvailableResourcesToUser(RMNodeLabelsManager.NO_LABEL, + user, Resources.createResource(10*GB, 10)); + metrics.setAvailableResourcesToUser(RMNodeLabelsManager.NO_LABEL, + user, Resources.createResource(10*GB, 10)); + metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL, + user, 5, Resources.createResource(3*GB, 3)); checkResources(queueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0); checkResources(parentQueueSource, 0, 0, 0, 0, 0, 100*GB, 100, 15*GB, 15, 5, 0, 0, 0); checkResources(userSource, 0, 0, 0, 0, 0, 10*GB, 10, 15*GB, 15, 5, 0, 0, 0); @@ -297,8 +314,10 @@ public void testQueueAppMetricsForMultipleFailures() { checkApps(queueSource, 1, 0, 1, 0, 0, 0, true); checkApps(userSource, 1, 0, 1, 0, 0, 0, true); - metrics.allocateResources(user, 3, Resources.createResource(2*GB, 2), true); - metrics.reserveResource(user, Resources.createResource(3*GB, 3)); + metrics.allocateResources(RMNodeLabelsManager.NO_LABEL, + user, 3, Resources.createResource(2*GB, 2), true); + metrics.reserveResource(RMNodeLabelsManager.NO_LABEL, + user, Resources.createResource(3*GB, 3)); // Available resources is set externally, as it depends on dynamic // configurable cluster/queue resources checkResources(queueSource, 6*GB, 6, 3, 3, 0, 100*GB, 100, 9*GB, 9, 2, 3*GB, 3, 1); @@ -306,8 +325,10 @@ public void testQueueAppMetricsForMultipleFailures() { checkResources(userSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 3*GB, 3, 1); checkResources(parentUserSource, 6*GB, 6, 3, 3, 0, 10*GB, 10, 9*GB, 9, 2, 3*GB, 3, 1); - metrics.releaseResources(user, 1, Resources.createResource(2*GB, 2)); - metrics.unreserveResource(user, Resources.createResource(3*GB, 3)); + metrics.releaseResources(RMNodeLabelsManager.NO_LABEL, + user, 1, Resources.createResource(2*GB, 2)); + metrics.unreserveResource(RMNodeLabelsManager.NO_LABEL, + user, Resources.createResource(3*GB, 3)); checkResources(queueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); checkResources(parentQueueSource, 4*GB, 4, 2, 3, 1, 100*GB, 100, 9*GB, 9, 2, 0, 0, 0); checkResources(userSource, 4*GB, 4, 2, 3, 1, 10*GB, 10, 9*GB, 9, 2, 0, 0, 0); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java index 06253ff7cc..fa16effd25 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerApplicationAttempt.java @@ -17,6 +17,7 @@ */ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils.toSchedulerKey; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.mock; import static org.mockito.Mockito.when; @@ -41,14 +42,11 @@ import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainer; import org.apache.hadoop.yarn.server.resourcemanager.rmcontainer.RMContainerImpl; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.fifo.FifoScheduler; - import org.apache.hadoop.yarn.server.scheduler.SchedulerRequestKey; import org.apache.hadoop.yarn.util.resource.DefaultResourceCalculator; import org.junit.After; import org.junit.Test; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.TestUtils.toSchedulerKey; - public class TestSchedulerApplicationAttempt { private static final NodeId nodeId = NodeId.newInstance("somehost", 5); @@ -103,7 +101,8 @@ public void testMove() { Map reservations = new HashMap(); reservations.put(node.getNodeID(), container2); app.reservedContainers.put(toSchedulerKey(prio1), reservations); - oldMetrics.reserveResource(user, reservedResource); + oldMetrics.reserveResource(container2.getNodeLabelExpression(), + user, reservedResource); checkQueueMetrics(oldMetrics, 1, 1, 1536, 2, 2048, 3, 3072, 4); checkQueueMetrics(newMetrics, 0, 0, 0, 0, 0, 0, 0, 0); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index bf1f6eb63e..3c6e6dfb67 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -2384,8 +2384,10 @@ public void testRemoveAttemptMoveAdded() throws Exception { sch.getApplicationAttempt(appAttemptId).getLiveContainersMap() .put(newContainerId, rmContainer); QueueMetrics queueA1M = queueA1.getMetrics(); - queueA1M.incrPendingResources("user1", 1, resource); - queueA1M.allocateResources("user1", resource); + queueA1M.incrPendingResources(rmContainer.getNodeLabelExpression(), + "user1", 1, resource); + queueA1M.allocateResources(rmContainer.getNodeLabelExpression(), + "user1", resource); // remove attempt sch.handle(new AppAttemptRemovedSchedulerEvent(appAttemptId, RMAppAttemptState.KILLED, true)); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java index fc1d284eaf..740ef33662 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestNodeLabelContainerAllocation.java @@ -1958,8 +1958,8 @@ public RMNodeLabelsManager createNodeLabelManager() { reportNm2.getAvailableResource().getMemorySize()); LeafQueue leafQueue = (LeafQueue) cs.getQueue("a"); - assertEquals(0 * GB, leafQueue.getMetrics().getAvailableMB()); - assertEquals(5 * GB, leafQueue.getMetrics().getAllocatedMB()); + assertEquals(5 * GB, leafQueue.getMetrics().getAvailableMB()); + assertEquals(0 * GB, leafQueue.getMetrics().getAllocatedMB()); // Kill all apps in queue a cs.killAllAppsInQueue("a"); @@ -2061,8 +2061,8 @@ public RMNodeLabelsManager createNodeLabelManager() { double delta = 0.0001; // 3GB is used from label x quota. 1.5 GB is remaining from default label. // 2GB is remaining from label x. - assertEquals(3.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta); - assertEquals(4 * GB, leafQueue.getMetrics().getAllocatedMB()); + assertEquals(6.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta); + assertEquals(1 * GB, leafQueue.getMetrics().getAllocatedMB()); // app1 asks for 1 default partition container am1.allocate("*", 1 * GB, 5, new ArrayList()); @@ -2079,7 +2079,7 @@ public RMNodeLabelsManager createNodeLabelManager() { // 3GB is used from label x quota. 2GB used from default label. // So total 2.5 GB is remaining. assertEquals(2.5 * GB, leafQueue.getMetrics().getAvailableMB(), delta); - assertEquals(5 * GB, leafQueue.getMetrics().getAllocatedMB()); + assertEquals(2 * GB, leafQueue.getMetrics().getAllocatedMB()); rm1.close(); }