From 84e22a6af46db2859d7d2caf192861cae9b6a1a8 Mon Sep 17 00:00:00 2001 From: Haibo Chen Date: Tue, 16 Oct 2018 14:12:02 -0700 Subject: [PATCH] YARN-8842. Expose metrics for custom resource types in QueueMetrics. (Contributed by Szilard Nemeth) --- .../ResourceTypesTestHelper.java | 22 + .../scheduler/QueueMetrics.java | 130 +++- .../QueueMetricsForCustomResources.java | 158 +++++ .../scheduler/capacity/CapacityScheduler.java | 5 +- .../resourcemanager/scheduler/QueueInfo.java | 90 +++ .../scheduler/QueueMetricsTestData.java | 105 +++ .../scheduler/ResourceMetricsChecker.java | 88 ++- .../scheduler/TestQueueMetrics.java | 250 +++---- .../TestQueueMetricsForCustomResources.java | 645 ++++++++++++++++++ 9 files changed, 1325 insertions(+), 168 deletions(-) create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsForCustomResources.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueInfo.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsTestData.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetricsForCustomResources.java diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/resourcetypes/ResourceTypesTestHelper.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/resourcetypes/ResourceTypesTestHelper.java index 98a8a003b2..3c3c2cce2d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/resourcetypes/ResourceTypesTestHelper.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/resourcetypes/ResourceTypesTestHelper.java @@ -16,6 +16,7 @@ package org.apache.hadoop.yarn.resourcetypes; +import com.google.common.collect.Maps; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.factories.RecordFactory; @@ -24,6 +25,7 @@ import java.util.Map; import java.util.regex.Matcher; import java.util.regex.Pattern; +import java.util.stream.Collectors; /** * Contains helper methods to create Resource and ResourceInformation objects. @@ -90,4 +92,24 @@ private static ResourceValueAndUnit getResourceValueAndUnit(String val) { return new ResourceValueAndUnit(value, matcher.group(2)); } + public static Map extractCustomResources(Resource res) { + Map customResources = Maps.newHashMap(); + for (int i = 0; i < res.getResources().length; i++) { + ResourceInformation ri = res.getResourceInformation(i); + if (!ri.getName().equals(ResourceInformation.MEMORY_URI) + && !ri.getName().equals(ResourceInformation.VCORES_URI)) { + customResources.put(ri.getName(), ri.getValue()); + } + } + return customResources; + } + + public static Map extractCustomResourcesAsStrings( + Resource res) { + Map resValues = extractCustomResources(res); + return resValues.entrySet().stream() + .collect(Collectors.toMap( + Map.Entry::getKey, e -> String.valueOf(e.getValue()))); + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java index 20a5a1ff79..1315c2e873 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java @@ -24,6 +24,7 @@ import java.util.HashMap; import java.util.Map; +import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Private; import org.apache.hadoop.conf.Configuration; @@ -45,7 +46,9 @@ import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; -import org.apache.hadoop.yarn.server.utils.BuilderUtils; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .QueueMetricsForCustomResources.QueueMetricsCustomResource; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -114,6 +117,7 @@ public class QueueMetrics implements MetricsSource { protected final MetricsSystem metricsSystem; protected final Map users; protected final Configuration conf; + private QueueMetricsForCustomResources queueMetricsForCustomResources; protected QueueMetrics(MetricsSystem ms, String queueName, Queue parent, boolean enableUserMetrics, Configuration conf) { @@ -125,6 +129,11 @@ protected QueueMetrics(MetricsSystem ms, String queueName, Queue parent, metricsSystem = ms; this.conf = conf; runningTime = buildBuckets(conf); + + if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) { + this.queueMetricsForCustomResources = + new QueueMetricsForCustomResources(); + } } protected QueueMetrics tag(MetricsInfo info, String value) { @@ -350,9 +359,12 @@ public void moveAppTo(AppSchedulingInfo app) { * @param limit resource limit */ public void setAvailableResourcesToQueue(String partition, Resource limit) { - if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { availableMB.set(limit.getMemorySize()); availableVCores.set(limit.getVirtualCores()); + if (queueMetricsForCustomResources != null) { + queueMetricsForCustomResources.setAvailable(limit); + } } } @@ -392,7 +404,7 @@ public void setAvailableResourcesToUser(String partition, */ public void incrPendingResources(String partition, String user, int containers, Resource res) { - if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { _incrPendingResources(containers, res); QueueMetrics userMetrics = getUserMetrics(user); if (userMetrics != null) { @@ -408,12 +420,15 @@ private void _incrPendingResources(int containers, Resource res) { pendingContainers.incr(containers); pendingMB.incr(res.getMemorySize() * containers); pendingVCores.incr(res.getVirtualCores() * containers); + if (queueMetricsForCustomResources != null) { + queueMetricsForCustomResources.increasePending(res, containers); + } } public void decrPendingResources(String partition, String user, int containers, Resource res) { - if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { _decrPendingResources(containers, res); QueueMetrics userMetrics = getUserMetrics(user); if (userMetrics != null) { @@ -429,6 +444,9 @@ private void _decrPendingResources(int containers, Resource res) { pendingContainers.decr(containers); pendingMB.decr(res.getMemorySize() * containers); pendingVCores.decr(res.getVirtualCores() * containers); + if (queueMetricsForCustomResources != null) { + queueMetricsForCustomResources.decreasePending(res, containers); + } } public void incrNodeTypeAggregations(String user, NodeType type) { @@ -452,12 +470,16 @@ public void incrNodeTypeAggregations(String user, NodeType type) { public void allocateResources(String partition, String user, int containers, Resource res, boolean decrPending) { - if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { allocatedContainers.incr(containers); aggregateContainersAllocated.incr(containers); allocatedMB.incr(res.getMemorySize() * containers); allocatedVCores.incr(res.getVirtualCores() * containers); + if (queueMetricsForCustomResources != null) { + queueMetricsForCustomResources.increaseAllocated(res, containers); + } + if (decrPending) { _decrPendingResources(containers, res); } @@ -479,12 +501,18 @@ public void allocateResources(String partition, String user, * @param res */ public void allocateResources(String partition, String user, Resource res) { - if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { allocatedMB.incr(res.getMemorySize()); allocatedVCores.incr(res.getVirtualCores()); + if (queueMetricsForCustomResources != null) { + queueMetricsForCustomResources.increaseAllocated(res); + } pendingMB.decr(res.getMemorySize()); pendingVCores.decr(res.getVirtualCores()); + if (queueMetricsForCustomResources != null) { + queueMetricsForCustomResources.decreasePending(res); + } QueueMetrics userMetrics = getUserMetrics(user); if (userMetrics != null) { @@ -498,11 +526,15 @@ public void allocateResources(String partition, String user, Resource res) { public void releaseResources(String partition, String user, int containers, Resource res) { - if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { allocatedContainers.decr(containers); aggregateContainersReleased.incr(containers); allocatedMB.decr(res.getMemorySize() * containers); allocatedVCores.decr(res.getVirtualCores() * containers); + if (queueMetricsForCustomResources != null) { + queueMetricsForCustomResources.decreaseAllocated(res, containers); + } + QueueMetrics userMetrics = getUserMetrics(user); if (userMetrics != null) { userMetrics.releaseResources(partition, user, containers, res); @@ -519,9 +551,13 @@ public void releaseResources(String partition, * @param user * @param res */ - public void releaseResources(String user, Resource res) { + private void releaseResources(String user, Resource res) { allocatedMB.decr(res.getMemorySize()); allocatedVCores.decr(res.getVirtualCores()); + if (queueMetricsForCustomResources != null) { + queueMetricsForCustomResources.decreaseAllocated(res); + } + QueueMetrics userMetrics = getUserMetrics(user); if (userMetrics != null) { userMetrics.releaseResources(user, res); @@ -552,8 +588,19 @@ public void updatePreemptedVcoreSeconds(long vcoreSeconds) { } } + public void updatePreemptedSecondsForCustomResources(Resource res, + long seconds) { + if (queueMetricsForCustomResources != null) { + queueMetricsForCustomResources + .increaseAggregatedPreemptedSeconds(res, seconds); + } + if (parent != null) { + parent.updatePreemptedSecondsForCustomResources(res, seconds); + } + } + public void reserveResource(String partition, String user, Resource res) { - if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { reserveResource(user, res); } } @@ -562,6 +609,9 @@ public void reserveResource(String user, Resource res) { reservedContainers.incr(); reservedMB.incr(res.getMemorySize()); reservedVCores.incr(res.getVirtualCores()); + if (queueMetricsForCustomResources != null) { + queueMetricsForCustomResources.increaseReserved(res); + } QueueMetrics userMetrics = getUserMetrics(user); if (userMetrics != null) { userMetrics.reserveResource(user, res); @@ -571,10 +621,13 @@ public void reserveResource(String user, Resource res) { } } - public void unreserveResource(String user, Resource res) { + private void unreserveResource(String user, Resource res) { reservedContainers.decr(); reservedMB.decr(res.getMemorySize()); reservedVCores.decr(res.getVirtualCores()); + if (queueMetricsForCustomResources != null) { + queueMetricsForCustomResources.decreaseReserved(res); + } QueueMetrics userMetrics = getUserMetrics(user); if (userMetrics != null) { userMetrics.unreserveResource(user, res); @@ -585,7 +638,7 @@ public void unreserveResource(String user, Resource res) { } public void unreserveResource(String partition, String user, Resource res) { - if(partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { + if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { unreserveResource(user, res); } } @@ -647,10 +700,59 @@ public int getAppsKilled() { public int getAppsFailed() { return appsFailed.value(); } - + public Resource getAllocatedResources() { - return BuilderUtils.newResource(allocatedMB.value(), - (int) allocatedVCores.value()); + if (queueMetricsForCustomResources != null) { + return Resource.newInstance(allocatedMB.value(), allocatedVCores.value(), + queueMetricsForCustomResources.getAllocatedValues()); + } + return Resource.newInstance(allocatedMB.value(), + allocatedVCores.value()); + } + + public Resource getAvailableResources() { + if (queueMetricsForCustomResources != null) { + return Resource.newInstance(availableMB.value(), availableVCores.value(), + queueMetricsForCustomResources.getAvailableValues()); + } + return Resource.newInstance(availableMB.value(), availableVCores.value()); + } + + public Resource getPendingResources() { + if (queueMetricsForCustomResources != null) { + return Resource.newInstance(pendingMB.value(), pendingVCores.value(), + queueMetricsForCustomResources.getPendingValues()); + } + return Resource.newInstance(pendingMB.value(), pendingVCores.value()); + } + + public Resource getReservedResources() { + if (queueMetricsForCustomResources != null) { + return Resource.newInstance(reservedMB.value(), reservedVCores.value(), + queueMetricsForCustomResources.getReservedValues()); + } + return Resource.newInstance(reservedMB.value(), reservedVCores.value()); + } + + /** + * Handle this specially as this has a long value and it could be + * truncated when casted into an int parameter of + * Resource.newInstance (vCores). + * @return QueueMetricsCustomResource + */ + @VisibleForTesting + public QueueMetricsCustomResource getAggregatedPreemptedSecondsResources() { + return queueMetricsForCustomResources.getAggregatePreemptedSeconds(); + } + + @VisibleForTesting + public MutableCounterLong getAggregateMemoryMBSecondsPreempted() { + return aggregateMemoryMBSecondsPreempted; + } + + @VisibleForTesting + public MutableCounterLong getAggregateVcoreSecondsPreempted() { + return aggregateVcoreSecondsPreempted; } public long getAllocatedMB() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsForCustomResources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsForCustomResources.java new file mode 100644 index 0000000000..80295846ef --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsForCustomResources.java @@ -0,0 +1,158 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import com.google.common.collect.Maps; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; + +import java.util.Map; +import java.util.function.BiFunction; + +/** + * This class is a main entry-point for any kind of metrics for + * custom resources. + * It provides increase and decrease methods for all types of metrics. + */ +public class QueueMetricsForCustomResources { + /** + * Class that holds metrics values for custom resources in a map keyed with + * the name of the custom resource. + * There are different kinds of values like allocated, available and others. + */ + public static class QueueMetricsCustomResource { + private final Map values = Maps.newHashMap(); + + protected void increase(Resource res) { + update(res, Long::sum); + } + + void increaseWithMultiplier(Resource res, long multiplier) { + update(res, (v1, v2) -> v1 + v2 * multiplier); + } + + protected void decrease(Resource res) { + update(res, (v1, v2) -> v1 - v2); + } + + void decreaseWithMultiplier(Resource res, int containers) { + update(res, (v1, v2) -> v1 - v2 * containers); + } + + protected void set(Resource res) { + update(res, (v1, v2) -> v2); + } + + private void update(Resource res, BiFunction operation) { + if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) { + ResourceInformation[] resources = res.getResources(); + + for (int i = 2; i < resources.length; i++) { + ResourceInformation resource = resources[i]; + + // Map.merge only applies operation if there is + // a value for the key in the map + if (!values.containsKey(resource.getName())) { + values.put(resource.getName(), 0L); + } + values.merge(resource.getName(), + resource.getValue(), operation); + } + } + } + + public Map getValues() { + return values; + } + } + private final QueueMetricsCustomResource aggregatePreemptedSeconds = + new QueueMetricsCustomResource(); + private final QueueMetricsCustomResource allocated = + new QueueMetricsCustomResource(); + private final QueueMetricsCustomResource available = + new QueueMetricsCustomResource(); + private final QueueMetricsCustomResource pending = + new QueueMetricsCustomResource(); + + private final QueueMetricsCustomResource reserved = + new QueueMetricsCustomResource(); + + public void increaseReserved(Resource res) { + reserved.increase(res); + } + + public void decreaseReserved(Resource res) { + reserved.decrease(res); + } + + public void setAvailable(Resource res) { + available.set(res); + } + + public void increasePending(Resource res, int containers) { + pending.increaseWithMultiplier(res, containers); + } + + public void decreasePending(Resource res) { + pending.decrease(res); + } + + public void decreasePending(Resource res, int containers) { + pending.decreaseWithMultiplier(res, containers); + } + + public void increaseAllocated(Resource res) { + allocated.increase(res); + } + + public void increaseAllocated(Resource res, int containers) { + allocated.increaseWithMultiplier(res, containers); + } + + public void decreaseAllocated(Resource res) { + allocated.decrease(res); + } + + public void decreaseAllocated(Resource res, int containers) { + allocated.decreaseWithMultiplier(res, containers); + } + + public void increaseAggregatedPreemptedSeconds(Resource res, long seconds) { + aggregatePreemptedSeconds.increaseWithMultiplier(res, seconds); + } + + Map getAllocatedValues() { + return allocated.getValues(); + } + + Map getAvailableValues() { + return available.getValues(); + } + + Map getPendingValues() { + return pending.getValues(); + } + + Map getReservedValues() { + return reserved.getValues(); + } + + QueueMetricsCustomResource getAggregatePreemptedSeconds() { + return aggregatePreemptedSeconds; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java index fddd361482..c5ad2ce9fa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacityScheduler.java @@ -2106,7 +2106,8 @@ protected void completedContainerInternal( private void updateQueuePreemptionMetrics( CSQueue queue, RMContainer rmc) { QueueMetrics qMetrics = queue.getMetrics(); - long usedMillis = rmc.getFinishTime() - rmc.getCreationTime(); + final long usedMillis = rmc.getFinishTime() - rmc.getCreationTime(); + final long usedSeconds = usedMillis / DateUtils.MILLIS_PER_SECOND; Resource containerResource = rmc.getAllocatedResource(); qMetrics.preemptContainer(); long mbSeconds = (containerResource.getMemorySize() * usedMillis) @@ -2115,6 +2116,8 @@ private void updateQueuePreemptionMetrics( / DateUtils.MILLIS_PER_SECOND; qMetrics.updatePreemptedMemoryMBSeconds(mbSeconds); qMetrics.updatePreemptedVcoreSeconds(vcSeconds); + qMetrics.updatePreemptedSecondsForCustomResources(containerResource, + usedSeconds); } @Lock(Lock.NoLock.class) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueInfo.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueInfo.java new file mode 100644 index 0000000000..0a0f893309 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueInfo.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; + +import java.util.function.Consumer; + +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .TestQueueMetrics.userSource; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + +/** + * This class holds queue and user metrics for a particular queue, + * used for testing metrics. + * Reference for the parent queue is also stored for every queue, + * except if the queue is root. + */ +public final class QueueInfo { + private final QueueInfo parentQueueInfo; + private final Queue queue; + final QueueMetrics queueMetrics; + final MetricsSource queueSource; + final MetricsSource userSource; + + public QueueInfo(QueueInfo parent, String queueName, MetricsSystem ms, + Configuration conf, String user) { + Queue parentQueue = parent == null ? null : parent.queue; + parentQueueInfo = parent; + queueMetrics = + QueueMetrics.forQueue(ms, queueName, parentQueue, true, conf); + queue = mock(Queue.class); + when(queue.getMetrics()).thenReturn(queueMetrics); + queueSource = ms.getSource(QueueMetrics.sourceName(queueName).toString()); + + // need to call getUserMetrics so that a non-null userSource is returned + // with the call to userSource(..) + queueMetrics.getUserMetrics(user); + userSource = userSource(ms, queueName, user); + } + + public QueueInfo getRoot() { + QueueInfo root = this; + while (root.parentQueueInfo != null) { + root = root.parentQueueInfo; + } + return root; + } + + public void checkAllQueueSources(Consumer consumer) { + checkQueueSourcesRecursive(this, consumer); + } + + private void checkQueueSourcesRecursive(QueueInfo queueInfo, + Consumer consumer) { + consumer.accept(queueInfo.queueSource); + if (queueInfo.parentQueueInfo != null) { + checkQueueSourcesRecursive(queueInfo.parentQueueInfo, consumer); + } + } + + public void checkAllQueueMetrics(Consumer consumer) { + checkAllQueueMetricsRecursive(this, consumer); + } + + private void checkAllQueueMetricsRecursive(QueueInfo queueInfo, Consumer + consumer) { + consumer.accept(queueInfo.queueMetrics); + if (queueInfo.parentQueueInfo != null) { + checkAllQueueMetricsRecursive(queueInfo.parentQueueInfo, consumer); + } + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsTestData.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsTestData.java new file mode 100644 index 0000000000..56df7d34ab --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetricsTestData.java @@ -0,0 +1,105 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import org.apache.hadoop.yarn.api.records.Resource; + +import java.util.Map; + +import static org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper + .extractCustomResources; + +/** + * This class is to test standard and custom resource metrics for all types. + * Metrics types can be one of: allocated, pending, reserved + * and other resources. + */ +public final class QueueMetricsTestData { + public static final class Builder { + private int containers; + private Resource resource; + private Resource resourceToDecrease; + private Map customResourceValues; + private int containersToDecrease; + private String user; + private String partition; + private QueueInfo queueInfo; + + private Builder() { + } + + public static Builder create() { + return new Builder(); + } + + public Builder withContainers(int containers) { + this.containers = containers; + return this; + } + + public Builder withResourceToDecrease(Resource res, int containers) { + this.resourceToDecrease = res; + this.containersToDecrease = containers; + return this; + } + + public Builder withResources(Resource res) { + this.resource = res; + return this; + } + + public Builder withUser(String user) { + this.user = user; + return this; + } + + public Builder withPartition(String partition) { + this.partition = partition; + return this; + } + + public Builder withLeafQueue(QueueInfo qInfo) { + this.queueInfo = qInfo; + return this; + } + + public QueueMetricsTestData build() { + this.customResourceValues = extractCustomResources(resource); + return new QueueMetricsTestData(this); + } + } + + final Map customResourceValues; + final int containers; + final Resource resourceToDecrease; + final int containersToDecrease; + final Resource resource; + final String partition; + final QueueInfo leafQueue; + final String user; + + private QueueMetricsTestData(Builder builder) { + this.customResourceValues = builder.customResourceValues; + this.containers = builder.containers; + this.resourceToDecrease = builder.resourceToDecrease; + this.containersToDecrease = builder.containersToDecrease; + this.resource = builder.resource; + this.partition = builder.partition; + this.leafQueue = builder.queueInfo; + this.user = builder.user; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceMetricsChecker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceMetricsChecker.java index cd617d7b9d..05341aab10 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceMetricsChecker.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceMetricsChecker.java @@ -27,34 +27,31 @@ import static org.apache.hadoop.test.MetricsAsserts.assertCounter; import static org.apache.hadoop.test.MetricsAsserts.assertGauge; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler - .ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_ALLOCATED; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler - .ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_RELEASED; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler - .ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CONTAINERS; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler - .ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_MB; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler - .ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_V_CORES; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler - .ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_MB; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler - .ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_V_CORES; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler - .ResourceMetricsChecker.ResourceMetricsKey.PENDING_CONTAINERS; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricType.COUNTER_LONG; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricType.GAUGE_INT; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricType.GAUGE_LONG; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_ALLOCATED; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_RELEASED; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CONTAINERS; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_MB; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_V_CORES; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_MB; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_V_CORES; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_CONTAINERS; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_MB; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_V_CORES; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler - .ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler - .ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_V_CORES; final class ResourceMetricsChecker { private final static Logger LOG = LoggerFactory.getLogger(ResourceMetricsChecker.class); + enum ResourceMetricType { + GAUGE_INT, GAUGE_LONG, COUNTER_INT, COUNTER_LONG + } + private static final ResourceMetricsChecker INITIAL_CHECKER = new ResourceMetricsChecker() .gaugeLong(ALLOCATED_MB, 0) @@ -72,29 +69,41 @@ final class ResourceMetricsChecker { .gaugeInt(RESERVED_CONTAINERS, 0); enum ResourceMetricsKey { - ALLOCATED_MB("AllocatedMB"), - ALLOCATED_V_CORES("AllocatedVCores"), - ALLOCATED_CONTAINERS("AllocatedContainers"), - AGGREGATE_CONTAINERS_ALLOCATED("AggregateContainersAllocated"), - AGGREGATE_CONTAINERS_RELEASED("AggregateContainersReleased"), - AVAILABLE_MB("AvailableMB"), - AVAILABLE_V_CORES("AvailableVCores"), - PENDING_MB("PendingMB"), - PENDING_V_CORES("PendingVCores"), - PENDING_CONTAINERS("PendingContainers"), - RESERVED_MB("ReservedMB"), - RESERVED_V_CORES("ReservedVCores"), - RESERVED_CONTAINERS("ReservedContainers"); + ALLOCATED_MB("AllocatedMB", GAUGE_LONG), + ALLOCATED_V_CORES("AllocatedVCores", GAUGE_INT), + ALLOCATED_CONTAINERS("AllocatedContainers", GAUGE_INT), + AGGREGATE_CONTAINERS_ALLOCATED("AggregateContainersAllocated", + COUNTER_LONG), + AGGREGATE_CONTAINERS_RELEASED("AggregateContainersReleased", + COUNTER_LONG), + AVAILABLE_MB("AvailableMB", GAUGE_LONG), + AVAILABLE_V_CORES("AvailableVCores", GAUGE_INT), + PENDING_MB("PendingMB", GAUGE_LONG), + PENDING_V_CORES("PendingVCores", GAUGE_INT), + PENDING_CONTAINERS("PendingContainers", GAUGE_INT), + RESERVED_MB("ReservedMB", GAUGE_LONG), + RESERVED_V_CORES("ReservedVCores", GAUGE_INT), + RESERVED_CONTAINERS("ReservedContainers", GAUGE_INT), + AGGREGATE_VCORE_SECONDS_PREEMPTED( + "AggregateVcoreSecondsPreempted", COUNTER_LONG), + AGGREGATE_MEMORY_MB_SECONDS_PREEMPTED( + "AggregateMemoryMBSecondsPreempted", COUNTER_LONG); private String value; + private ResourceMetricType type; - ResourceMetricsKey(String value) { + ResourceMetricsKey(String value, ResourceMetricType type) { this.value = value; + this.type = type; } public String getValue() { return value; } + + public ResourceMetricType getType() { + return type; + } } private final Map gaugesLong; @@ -123,20 +132,31 @@ public static ResourceMetricsChecker create() { } ResourceMetricsChecker gaugeLong(ResourceMetricsKey key, long value) { + ensureTypeIsCorrect(key, GAUGE_LONG); gaugesLong.put(key, value); return this; } ResourceMetricsChecker gaugeInt(ResourceMetricsKey key, int value) { + ensureTypeIsCorrect(key, GAUGE_INT); gaugesInt.put(key, value); return this; } ResourceMetricsChecker counter(ResourceMetricsKey key, long value) { + ensureTypeIsCorrect(key, COUNTER_LONG); counters.put(key, value); return this; } + private void ensureTypeIsCorrect(ResourceMetricsKey + key, ResourceMetricType actualType) { + if (key.type != actualType) { + throw new IllegalStateException("Metrics type should be " + key.type + + " instead of " + actualType + " for metrics: " + key.value); + } + } + ResourceMetricsChecker checkAgainst(MetricsSource source) { if (source == null) { throw new IllegalStateException("MetricsSource should not be null!"); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java index c971d655e5..2066f607c5 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java @@ -18,15 +18,6 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; -import static org.apache.hadoop.test.MetricsAsserts.assertCounter; -import static org.apache.hadoop.test.MetricsAsserts.getMetrics; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler - .AppMetricsChecker.AppMetricsKey.*; -import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.*; -import static org.junit.Assert.assertNull; -import static org.mockito.Mockito.mock; -import static org.mockito.Mockito.when; - import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.MetricsSource; @@ -46,8 +37,40 @@ import org.junit.Before; import org.junit.Test; +import static org.apache.hadoop.test.MetricsAsserts.assertCounter; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_COMPLETED; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_FAILED; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_PENDING; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_RUNNING; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.AppMetricsChecker.AppMetricsKey.APPS_SUBMITTED; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_ALLOCATED; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_RELEASED; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CONTAINERS; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_MB; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_V_CORES; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_MB; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_V_CORES; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_CONTAINERS; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_MB; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_V_CORES; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_V_CORES; +import static org.junit.Assert.assertNull; +import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.when; + public class TestQueueMetrics { + private static Queue createMockQueue(QueueMetrics metrics) { + Queue queue = mock(Queue.class); + when(queue.getMetrics()).thenReturn(metrics); + return queue; + } + private static final int GB = 1024; // MB + private static final String USER = "alice"; + private static final String USER_2 = "dodo"; private static final Configuration conf = new Configuration(); private MetricsSystem ms; @@ -60,19 +83,18 @@ public void setUp() { @Test public void testDefaultSingleQueueMetrics() { String queueName = "single"; - String user = "alice"; QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, false, conf); MetricsSource queueSource= queueSource(ms, queueName); - AppSchedulingInfo app = mockApp(user); + AppSchedulingInfo app = mockApp(USER); - metrics.submitApp(user); - MetricsSource userSource = userSource(ms, queueName, user); + metrics.submitApp(USER); + MetricsSource userSource = userSource(ms, queueName, USER); AppMetricsChecker appMetricsChecker = AppMetricsChecker.create() .counter(APPS_SUBMITTED, 1) .checkAgainst(queueSource, true); - metrics.submitAppAttempt(user); + metrics.submitAppAttempt(USER); appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker) .gaugeInt(APPS_PENDING, 1) .checkAgainst(queueSource, true); @@ -80,7 +102,7 @@ public void testDefaultSingleQueueMetrics() { metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL, Resources.createResource(100*GB, 100)); metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL, - user, 5, Resources.createResource(3*GB, 3)); + USER, 5, Resources.createResource(3*GB, 3)); // Available resources is set externally, as it depends on dynamic // configurable cluster/queue resources ResourceMetricsChecker rmChecker = ResourceMetricsChecker.create() @@ -91,14 +113,14 @@ public void testDefaultSingleQueueMetrics() { .gaugeInt(PENDING_CONTAINERS, 5) .checkAgainst(queueSource); - metrics.runAppAttempt(app.getApplicationId(), user); + metrics.runAppAttempt(app.getApplicationId(), USER); appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker) .gaugeInt(APPS_PENDING, 0) .gaugeInt(APPS_RUNNING, 1) .checkAgainst(queueSource, true); metrics.allocateResources(RMNodeLabelsManager.NO_LABEL, - user, 3, Resources.createResource(2*GB, 2), true); + USER, 3, Resources.createResource(2*GB, 2), true); rmChecker = ResourceMetricsChecker.createFromChecker(rmChecker) .gaugeLong(ALLOCATED_MB, 6 * GB) .gaugeInt(ALLOCATED_V_CORES, 6) @@ -110,7 +132,7 @@ public void testDefaultSingleQueueMetrics() { .checkAgainst(queueSource); metrics.releaseResources(RMNodeLabelsManager.NO_LABEL, - user, 1, Resources.createResource(2*GB, 2)); + USER, 1, Resources.createResource(2*GB, 2)); rmChecker = ResourceMetricsChecker.createFromChecker(rmChecker) .gaugeLong(ALLOCATED_MB, 4 * GB) .gaugeInt(ALLOCATED_V_CORES, 4) @@ -119,13 +141,13 @@ public void testDefaultSingleQueueMetrics() { .checkAgainst(queueSource); metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL, - user, 0, Resources.createResource(2 * GB, 2)); + USER, 0, Resources.createResource(2 * GB, 2)); //nothing should change in values rmChecker = ResourceMetricsChecker.createFromChecker(rmChecker) .checkAgainst(queueSource); metrics.decrPendingResources(RMNodeLabelsManager.NO_LABEL, - user, 0, Resources.createResource(2 * GB, 2)); + USER, 0, Resources.createResource(2 * GB, 2)); //nothing should change in values ResourceMetricsChecker.createFromChecker(rmChecker) .checkAgainst(queueSource); @@ -136,7 +158,7 @@ public void testDefaultSingleQueueMetrics() { .counter(APPS_SUBMITTED, 1) .gaugeInt(APPS_RUNNING, 0) .checkAgainst(queueSource, true); - metrics.finishApp(user, RMAppState.FINISHED); + metrics.finishApp(USER, RMAppState.FINISHED); AppMetricsChecker.createFromChecker(appMetricsChecker) .counter(APPS_COMPLETED, 1) .checkAgainst(queueSource, true); @@ -146,24 +168,23 @@ public void testDefaultSingleQueueMetrics() { @Test public void testQueueAppMetricsForMultipleFailures() { String queueName = "single"; - String user = "alice"; QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, false, new Configuration()); MetricsSource queueSource = queueSource(ms, queueName); - AppSchedulingInfo app = mockApp(user); + AppSchedulingInfo app = mockApp(USER); - metrics.submitApp(user); - MetricsSource userSource = userSource(ms, queueName, user); + metrics.submitApp(USER); + MetricsSource userSource = userSource(ms, queueName, USER); AppMetricsChecker appMetricsChecker = AppMetricsChecker.create() .counter(APPS_SUBMITTED, 1) .checkAgainst(queueSource, true); - metrics.submitAppAttempt(user); + metrics.submitAppAttempt(USER); appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker) .gaugeInt(APPS_PENDING, 1) .checkAgainst(queueSource, true); - metrics.runAppAttempt(app.getApplicationId(), user); + metrics.runAppAttempt(app.getApplicationId(), USER); appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker) .gaugeInt(APPS_PENDING, 0) .gaugeInt(APPS_RUNNING, 1) @@ -177,12 +198,12 @@ public void testQueueAppMetricsForMultipleFailures() { // As the application has failed, framework retries the same application // based on configuration - metrics.submitAppAttempt(user); + metrics.submitAppAttempt(USER); appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker) .gaugeInt(APPS_PENDING, 1) .checkAgainst(queueSource, true); - metrics.runAppAttempt(app.getApplicationId(), user); + metrics.runAppAttempt(app.getApplicationId(), USER); appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker) .gaugeInt(APPS_PENDING, 0) .gaugeInt(APPS_RUNNING, 1) @@ -197,12 +218,12 @@ public void testQueueAppMetricsForMultipleFailures() { // As the application has failed, framework retries the same application // based on configuration - metrics.submitAppAttempt(user); + metrics.submitAppAttempt(USER); appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker) .gaugeInt(APPS_PENDING, 1) .checkAgainst(queueSource, true); - metrics.runAppAttempt(app.getApplicationId(), user); + metrics.runAppAttempt(app.getApplicationId(), USER); appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker) .gaugeInt(APPS_PENDING, 0) .gaugeInt(APPS_RUNNING, 1) @@ -215,7 +236,7 @@ public void testQueueAppMetricsForMultipleFailures() { .gaugeInt(APPS_RUNNING, 0) .checkAgainst(queueSource, true); - metrics.finishApp(user, RMAppState.FAILED); + metrics.finishApp(USER, RMAppState.FAILED); AppMetricsChecker.createFromChecker(appMetricsChecker) .gaugeInt(APPS_RUNNING, 0) .counter(APPS_FAILED, 1) @@ -227,15 +248,14 @@ public void testQueueAppMetricsForMultipleFailures() { @Test public void testSingleQueueWithUserMetrics() { String queueName = "single2"; - String user = "dodo"; QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, true, conf); MetricsSource queueSource = queueSource(ms, queueName); - AppSchedulingInfo app = mockApp(user); + AppSchedulingInfo app = mockApp(USER_2); - metrics.submitApp(user); - MetricsSource userSource = userSource(ms, queueName, user); + metrics.submitApp(USER_2); + MetricsSource userSource = userSource(ms, queueName, USER_2); AppMetricsChecker appMetricsQueueSourceChecker = AppMetricsChecker.create() .counter(APPS_SUBMITTED, 1) @@ -244,7 +264,7 @@ public void testSingleQueueWithUserMetrics() { .counter(APPS_SUBMITTED, 1) .checkAgainst(userSource, true); - metrics.submitAppAttempt(user); + metrics.submitAppAttempt(USER_2); appMetricsQueueSourceChecker = AppMetricsChecker .createFromChecker(appMetricsQueueSourceChecker) .gaugeInt(APPS_PENDING, 1) @@ -257,9 +277,9 @@ public void testSingleQueueWithUserMetrics() { metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL, Resources.createResource(100*GB, 100)); metrics.setAvailableResourcesToUser(RMNodeLabelsManager.NO_LABEL, - user, Resources.createResource(10*GB, 10)); + USER_2, Resources.createResource(10*GB, 10)); metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL, - user, 5, Resources.createResource(3*GB, 3)); + USER_2, 5, Resources.createResource(3*GB, 3)); // Available resources is set externally, as it depends on dynamic // configurable cluster/queue resources @@ -280,7 +300,7 @@ public void testSingleQueueWithUserMetrics() { .gaugeInt(PENDING_CONTAINERS, 5) .checkAgainst(userSource); - metrics.runAppAttempt(app.getApplicationId(), user); + metrics.runAppAttempt(app.getApplicationId(), USER_2); appMetricsQueueSourceChecker = AppMetricsChecker .createFromChecker(appMetricsQueueSourceChecker) .gaugeInt(APPS_PENDING, 0) @@ -293,7 +313,7 @@ public void testSingleQueueWithUserMetrics() { .checkAgainst(userSource, true); metrics.allocateResources(RMNodeLabelsManager.NO_LABEL, - user, 3, Resources.createResource(2*GB, 2), true); + USER_2, 3, Resources.createResource(2*GB, 2), true); resMetricsQueueSourceChecker = ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker) .gaugeLong(ALLOCATED_MB, 6 * GB) @@ -316,7 +336,7 @@ public void testSingleQueueWithUserMetrics() { .checkAgainst(userSource); metrics.releaseResources(RMNodeLabelsManager.NO_LABEL, - user, 1, Resources.createResource(2*GB, 2)); + USER_2, 1, Resources.createResource(2*GB, 2)); ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker) .gaugeLong(ALLOCATED_MB, 4 * GB) .gaugeInt(ALLOCATED_V_CORES, 4) @@ -340,7 +360,7 @@ public void testSingleQueueWithUserMetrics() { AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker) .gaugeInt(APPS_RUNNING, 0) .checkAgainst(userSource, true); - metrics.finishApp(user, RMAppState.FINISHED); + metrics.finishApp(USER_2, RMAppState.FINISHED); AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker) .counter(APPS_COMPLETED, 1) .checkAgainst(queueSource, true); @@ -353,7 +373,6 @@ public void testSingleQueueWithUserMetrics() { public void testNodeTypeMetrics() { String parentQueueName = "root"; String leafQueueName = "root.leaf"; - String user = "alice"; QueueMetrics parentMetrics = QueueMetrics.forQueue(ms, parentQueueName, null, true, conf); @@ -365,29 +384,29 @@ public void testNodeTypeMetrics() { MetricsSource queueSource = queueSource(ms, leafQueueName); //AppSchedulingInfo app = mockApp(user); - metrics.submitApp(user); - MetricsSource userSource = userSource(ms, leafQueueName, user); - MetricsSource parentUserSource = userSource(ms, parentQueueName, user); + metrics.submitApp(USER); + MetricsSource userSource = userSource(ms, leafQueueName, USER); + MetricsSource parentUserSource = userSource(ms, parentQueueName, USER); - metrics.incrNodeTypeAggregations(user, NodeType.NODE_LOCAL); + metrics.incrNodeTypeAggregations(USER, NodeType.NODE_LOCAL); checkAggregatedNodeTypes(queueSource, 1L, 0L, 0L); checkAggregatedNodeTypes(parentQueueSource, 1L, 0L, 0L); checkAggregatedNodeTypes(userSource, 1L, 0L, 0L); checkAggregatedNodeTypes(parentUserSource, 1L, 0L, 0L); - metrics.incrNodeTypeAggregations(user, NodeType.RACK_LOCAL); + metrics.incrNodeTypeAggregations(USER, NodeType.RACK_LOCAL); checkAggregatedNodeTypes(queueSource, 1L, 1L, 0L); checkAggregatedNodeTypes(parentQueueSource, 1L, 1L, 0L); checkAggregatedNodeTypes(userSource, 1L, 1L, 0L); checkAggregatedNodeTypes(parentUserSource, 1L, 1L, 0L); - metrics.incrNodeTypeAggregations(user, NodeType.OFF_SWITCH); + metrics.incrNodeTypeAggregations(USER, NodeType.OFF_SWITCH); checkAggregatedNodeTypes(queueSource, 1L, 1L, 1L); checkAggregatedNodeTypes(parentQueueSource, 1L, 1L, 1L); checkAggregatedNodeTypes(userSource, 1L, 1L, 1L); checkAggregatedNodeTypes(parentUserSource, 1L, 1L, 1L); - metrics.incrNodeTypeAggregations(user, NodeType.OFF_SWITCH); + metrics.incrNodeTypeAggregations(USER, NodeType.OFF_SWITCH); checkAggregatedNodeTypes(queueSource, 1L, 1L, 2L); checkAggregatedNodeTypes(parentQueueSource, 1L, 1L, 2L); checkAggregatedNodeTypes(userSource, 1L, 1L, 2L); @@ -396,67 +415,60 @@ public void testNodeTypeMetrics() { @Test public void testTwoLevelWithUserMetrics() { - String parentQueueName = "root"; - String leafQueueName = "root.leaf"; - String user = "alice"; + AppSchedulingInfo app = mockApp(USER); - QueueMetrics parentMetrics = - QueueMetrics.forQueue(ms, parentQueueName, null, true, conf); - Queue parentQueue = mock(Queue.class); - when(parentQueue.getMetrics()).thenReturn(parentMetrics); - QueueMetrics metrics = - QueueMetrics.forQueue(ms, leafQueueName, parentQueue, true, conf); - MetricsSource parentQueueSource = queueSource(ms, parentQueueName); - MetricsSource queueSource = queueSource(ms, leafQueueName); - AppSchedulingInfo app = mockApp(user); - - metrics.submitApp(user); - MetricsSource userSource = userSource(ms, leafQueueName, user); - MetricsSource parentUserSource = userSource(ms, parentQueueName, user); + QueueInfo root = new QueueInfo(null, "root", ms, conf, USER); + QueueInfo leaf = new QueueInfo(root, "root.leaf", ms, conf, USER); + leaf.queueMetrics.submitApp(USER); AppMetricsChecker appMetricsQueueSourceChecker = AppMetricsChecker.create() .counter(APPS_SUBMITTED, 1) - .checkAgainst(queueSource, true); + .checkAgainst(leaf.queueSource, true); AppMetricsChecker appMetricsParentQueueSourceChecker = AppMetricsChecker.create() .counter(APPS_SUBMITTED, 1) - .checkAgainst(parentQueueSource, true); + .checkAgainst(root.queueSource, true); AppMetricsChecker appMetricsUserSourceChecker = AppMetricsChecker.create() .counter(APPS_SUBMITTED, 1) - .checkAgainst(userSource, true); + .checkAgainst(leaf.userSource, true); AppMetricsChecker appMetricsParentUserSourceChecker = AppMetricsChecker.create() .counter(APPS_SUBMITTED, 1) - .checkAgainst(parentUserSource, true); + .checkAgainst(root.userSource, true); - metrics.submitAppAttempt(user); + leaf.queueMetrics.submitAppAttempt(USER); appMetricsQueueSourceChecker = AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker) .gaugeInt(APPS_PENDING, 1) - .checkAgainst(queueSource, true); + .checkAgainst(leaf.queueSource, true); appMetricsParentQueueSourceChecker = AppMetricsChecker.createFromChecker(appMetricsParentQueueSourceChecker) .gaugeInt(APPS_PENDING, 1) - .checkAgainst(parentQueueSource, true); + .checkAgainst(root.queueSource, true); appMetricsUserSourceChecker = AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker) .gaugeInt(APPS_PENDING, 1) - .checkAgainst(userSource, true); + .checkAgainst(leaf.userSource, true); appMetricsParentUserSourceChecker = AppMetricsChecker.createFromChecker(appMetricsParentUserSourceChecker) .gaugeInt(APPS_PENDING, 1) - .checkAgainst(parentUserSource, true); + .checkAgainst(root.userSource, true); - parentMetrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL, + root.queueMetrics.setAvailableResourcesToQueue( + RMNodeLabelsManager.NO_LABEL, Resources.createResource(100*GB, 100)); - metrics.setAvailableResourcesToQueue(RMNodeLabelsManager.NO_LABEL, + leaf.queueMetrics.setAvailableResourcesToQueue( + RMNodeLabelsManager.NO_LABEL, Resources.createResource(100*GB, 100)); - parentMetrics.setAvailableResourcesToUser(RMNodeLabelsManager.NO_LABEL, - user, Resources.createResource(10*GB, 10)); - metrics.setAvailableResourcesToUser(RMNodeLabelsManager.NO_LABEL, - user, Resources.createResource(10*GB, 10)); - metrics.incrPendingResources(RMNodeLabelsManager.NO_LABEL, - user, 5, Resources.createResource(3*GB, 3)); + root.queueMetrics.setAvailableResourcesToUser( + RMNodeLabelsManager.NO_LABEL, + USER, Resources.createResource(10*GB, 10)); + leaf.queueMetrics.setAvailableResourcesToUser( + RMNodeLabelsManager.NO_LABEL, + USER, Resources.createResource(10*GB, 10)); + leaf.queueMetrics.incrPendingResources( + RMNodeLabelsManager.NO_LABEL, + USER, 5, Resources.createResource(3*GB, 3)); ResourceMetricsChecker resMetricsQueueSourceChecker = ResourceMetricsChecker.create() @@ -465,7 +477,7 @@ public void testTwoLevelWithUserMetrics() { .gaugeLong(PENDING_MB, 15 * GB) .gaugeInt(PENDING_V_CORES, 15) .gaugeInt(PENDING_CONTAINERS, 5) - .checkAgainst(queueSource); + .checkAgainst(leaf.queueSource); ResourceMetricsChecker resMetricsParentQueueSourceChecker = ResourceMetricsChecker.create() .gaugeLong(AVAILABLE_MB, 100 * GB) @@ -473,7 +485,7 @@ public void testTwoLevelWithUserMetrics() { .gaugeLong(PENDING_MB, 15 * GB) .gaugeInt(PENDING_V_CORES, 15) .gaugeInt(PENDING_CONTAINERS, 5) - .checkAgainst(parentQueueSource); + .checkAgainst(root.queueSource); ResourceMetricsChecker resMetricsUserSourceChecker = ResourceMetricsChecker.create() .gaugeLong(AVAILABLE_MB, 10 * GB) @@ -481,7 +493,7 @@ public void testTwoLevelWithUserMetrics() { .gaugeLong(PENDING_MB, 15 * GB) .gaugeInt(PENDING_V_CORES, 15) .gaugeInt(PENDING_CONTAINERS, 5) - .checkAgainst(userSource); + .checkAgainst(leaf.userSource); ResourceMetricsChecker resMetricsParentUserSourceChecker = ResourceMetricsChecker.create() .gaugeLong(AVAILABLE_MB, 10 * GB) @@ -489,24 +501,24 @@ public void testTwoLevelWithUserMetrics() { .gaugeLong(PENDING_MB, 15 * GB) .gaugeInt(PENDING_V_CORES, 15) .gaugeInt(PENDING_CONTAINERS, 5) - .checkAgainst(parentUserSource); + .checkAgainst(root.userSource); - metrics.runAppAttempt(app.getApplicationId(), user); + leaf.queueMetrics.runAppAttempt(app.getApplicationId(), USER); appMetricsQueueSourceChecker = AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker) .gaugeInt(APPS_PENDING, 0) .gaugeInt(APPS_RUNNING, 1) - .checkAgainst(queueSource, true); + .checkAgainst(leaf.queueSource, true); appMetricsUserSourceChecker = AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker) .gaugeInt(APPS_PENDING, 0) .gaugeInt(APPS_RUNNING, 1) - .checkAgainst(userSource, true); + .checkAgainst(leaf.userSource, true); - metrics.allocateResources(RMNodeLabelsManager.NO_LABEL, - user, 3, Resources.createResource(2*GB, 2), true); - metrics.reserveResource(RMNodeLabelsManager.NO_LABEL, - user, Resources.createResource(3*GB, 3)); + leaf.queueMetrics.allocateResources(RMNodeLabelsManager.NO_LABEL, + USER, 3, Resources.createResource(2*GB, 2), true); + leaf.queueMetrics.reserveResource(RMNodeLabelsManager.NO_LABEL, + USER, Resources.createResource(3*GB, 3)); // Available resources is set externally, as it depends on dynamic // configurable cluster/queue resources resMetricsQueueSourceChecker = @@ -521,7 +533,7 @@ public void testTwoLevelWithUserMetrics() { .gaugeLong(RESERVED_MB, 3 * GB) .gaugeInt(RESERVED_V_CORES, 3) .gaugeInt(RESERVED_CONTAINERS, 1) - .checkAgainst(queueSource); + .checkAgainst(leaf.queueSource); resMetricsParentQueueSourceChecker = ResourceMetricsChecker .createFromChecker(resMetricsParentQueueSourceChecker) @@ -535,7 +547,7 @@ public void testTwoLevelWithUserMetrics() { .gaugeLong(RESERVED_MB, 3 * GB) .gaugeInt(RESERVED_V_CORES, 3) .gaugeInt(RESERVED_CONTAINERS, 1) - .checkAgainst(parentQueueSource); + .checkAgainst(root.queueSource); resMetricsUserSourceChecker = ResourceMetricsChecker.createFromChecker(resMetricsUserSourceChecker) .gaugeLong(ALLOCATED_MB, 6 * GB) @@ -548,7 +560,7 @@ public void testTwoLevelWithUserMetrics() { .gaugeLong(RESERVED_MB, 3 * GB) .gaugeInt(RESERVED_V_CORES, 3) .gaugeInt(RESERVED_CONTAINERS, 1) - .checkAgainst(userSource); + .checkAgainst(leaf.userSource); resMetricsParentUserSourceChecker = ResourceMetricsChecker .createFromChecker(resMetricsParentUserSourceChecker) .gaugeLong(ALLOCATED_MB, 6 * GB) @@ -561,12 +573,12 @@ public void testTwoLevelWithUserMetrics() { .gaugeLong(RESERVED_MB, 3 * GB) .gaugeInt(RESERVED_V_CORES, 3) .gaugeInt(RESERVED_CONTAINERS, 1) - .checkAgainst(parentUserSource); + .checkAgainst(root.userSource); - metrics.releaseResources(RMNodeLabelsManager.NO_LABEL, - user, 1, Resources.createResource(2*GB, 2)); - metrics.unreserveResource(RMNodeLabelsManager.NO_LABEL, - user, Resources.createResource(3*GB, 3)); + leaf.queueMetrics.releaseResources(RMNodeLabelsManager.NO_LABEL, + USER, 1, Resources.createResource(2*GB, 2)); + leaf.queueMetrics.unreserveResource(RMNodeLabelsManager.NO_LABEL, + USER, Resources.createResource(3*GB, 3)); ResourceMetricsChecker.createFromChecker(resMetricsQueueSourceChecker) .gaugeLong(ALLOCATED_MB, 4 * GB) .gaugeInt(ALLOCATED_V_CORES, 4) @@ -575,7 +587,7 @@ public void testTwoLevelWithUserMetrics() { .gaugeLong(RESERVED_MB, 0) .gaugeInt(RESERVED_V_CORES, 0) .gaugeInt(RESERVED_CONTAINERS, 0) - .checkAgainst(queueSource); + .checkAgainst(leaf.queueSource); ResourceMetricsChecker.createFromChecker(resMetricsParentQueueSourceChecker) .gaugeLong(ALLOCATED_MB, 4 * GB) .gaugeInt(ALLOCATED_V_CORES, 4) @@ -584,7 +596,7 @@ public void testTwoLevelWithUserMetrics() { .gaugeLong(RESERVED_MB, 0) .gaugeInt(RESERVED_V_CORES, 0) .gaugeInt(RESERVED_CONTAINERS, 0) - .checkAgainst(parentQueueSource); + .checkAgainst(root.queueSource); ResourceMetricsChecker.createFromChecker(resMetricsUserSourceChecker) .gaugeLong(ALLOCATED_MB, 4 * GB) .gaugeInt(ALLOCATED_V_CORES, 4) @@ -593,7 +605,7 @@ public void testTwoLevelWithUserMetrics() { .gaugeLong(RESERVED_MB, 0) .gaugeInt(RESERVED_V_CORES, 0) .gaugeInt(RESERVED_CONTAINERS, 0) - .checkAgainst(userSource); + .checkAgainst(leaf.userSource); ResourceMetricsChecker.createFromChecker(resMetricsParentUserSourceChecker) .gaugeLong(ALLOCATED_MB, 4 * GB) .gaugeInt(ALLOCATED_V_CORES, 4) @@ -602,46 +614,46 @@ public void testTwoLevelWithUserMetrics() { .gaugeLong(RESERVED_MB, 0) .gaugeInt(RESERVED_V_CORES, 0) .gaugeInt(RESERVED_CONTAINERS, 0) - .checkAgainst(parentUserSource); + .checkAgainst(root.userSource); - metrics.finishAppAttempt( + leaf.queueMetrics.finishAppAttempt( app.getApplicationId(), app.isPending(), app.getUser()); appMetricsQueueSourceChecker = AppMetricsChecker .createFromChecker(appMetricsQueueSourceChecker) .counter(APPS_SUBMITTED, 1) .gaugeInt(APPS_RUNNING, 0) - .checkAgainst(queueSource, true); + .checkAgainst(leaf.queueSource, true); appMetricsParentQueueSourceChecker = AppMetricsChecker .createFromChecker(appMetricsParentQueueSourceChecker) .counter(APPS_SUBMITTED, 1) .gaugeInt(APPS_PENDING, 0) .gaugeInt(APPS_RUNNING, 0) - .checkAgainst(parentQueueSource, true); + .checkAgainst(root.queueSource, true); appMetricsUserSourceChecker = AppMetricsChecker .createFromChecker(appMetricsUserSourceChecker) .counter(APPS_SUBMITTED, 1) .gaugeInt(APPS_RUNNING, 0) - .checkAgainst(userSource, true); + .checkAgainst(leaf.userSource, true); appMetricsParentUserSourceChecker = AppMetricsChecker .createFromChecker(appMetricsParentUserSourceChecker) .counter(APPS_SUBMITTED, 1) .gaugeInt(APPS_PENDING, 0) .gaugeInt(APPS_RUNNING, 0) - .checkAgainst(parentUserSource, true); + .checkAgainst(root.userSource, true); - metrics.finishApp(user, RMAppState.FINISHED); + leaf.queueMetrics.finishApp(USER, RMAppState.FINISHED); AppMetricsChecker.createFromChecker(appMetricsQueueSourceChecker) .counter(APPS_COMPLETED, 1) - .checkAgainst(queueSource, true); + .checkAgainst(leaf.queueSource, true); AppMetricsChecker.createFromChecker(appMetricsParentQueueSourceChecker) .counter(APPS_COMPLETED, 1) - .checkAgainst(parentQueueSource, true); + .checkAgainst(root.queueSource, true); AppMetricsChecker.createFromChecker(appMetricsUserSourceChecker) .counter(APPS_COMPLETED, 1) - .checkAgainst(userSource, true); + .checkAgainst(leaf.userSource, true); AppMetricsChecker.createFromChecker(appMetricsParentUserSourceChecker) .counter(APPS_COMPLETED, 1) - .checkAgainst(parentUserSource, true); + .checkAgainst(root.userSource, true); } @Test @@ -719,7 +731,7 @@ private static void checkAggregatedNodeTypes(MetricsSource source, assertCounter("AggregateOffSwitchContainersAllocated", offSwitch, rb); } - private static AppSchedulingInfo mockApp(String user) { + static AppSchedulingInfo mockApp(String user) { AppSchedulingInfo app = mock(AppSchedulingInfo.class); when(app.getUser()).thenReturn(user); ApplicationId appId = BuilderUtils.newApplicationId(1, 1); @@ -732,7 +744,7 @@ public static MetricsSource queueSource(MetricsSystem ms, String queue) { return ms.getSource(QueueMetrics.sourceName(queue).toString()); } - private static MetricsSource userSource(MetricsSystem ms, String queue, + public static MetricsSource userSource(MetricsSystem ms, String queue, String user) { return ms.getSource(QueueMetrics.sourceName(queue). append(",user=").append(user).toString()); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetricsForCustomResources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetricsForCustomResources.java new file mode 100644 index 0000000000..76a98490c2 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetricsForCustomResources.java @@ -0,0 +1,645 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + +import com.google.common.collect.ImmutableMap; +import com.google.common.collect.Maps; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.impl.MetricsSystemImpl; +import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; +import org.apache.hadoop.yarn.conf.YarnConfiguration; +import org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper; +import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; + + +import org.apache.hadoop.yarn.server.resourcemanager.scheduler + .QueueMetricsForCustomResources.QueueMetricsCustomResource; +import org.apache.hadoop.yarn.util.resource.ResourceUtils; +import org.junit.Before; +import org.junit.Test; + +import java.util.Collections; +import java.util.HashMap; +import java.util.Map; +import java.util.function.BiFunction; +import java.util.function.Function; + +import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB; +import static org.apache.hadoop.yarn.conf.YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES; +import static org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper + .extractCustomResourcesAsStrings; +import static org.apache.hadoop.yarn.resourcetypes.ResourceTypesTestHelper.newResource; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_ALLOCATED; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_CONTAINERS_RELEASED; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_MEMORY_MB_SECONDS_PREEMPTED; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_VCORE_SECONDS_PREEMPTED; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CONTAINERS; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_MB; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_V_CORES; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_MB; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_V_CORES; + +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .ResourceMetricsChecker.ResourceMetricsKey.PENDING_CONTAINERS; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .ResourceMetricsChecker.ResourceMetricsKey.PENDING_MB; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler + .ResourceMetricsChecker.ResourceMetricsKey.PENDING_V_CORES; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_V_CORES; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetrics.queueSource; +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertNotNull; + +public class TestQueueMetricsForCustomResources { + public enum MetricsForCustomResource { + ALLOCATED, AVAILABLE, PENDING, RESERVED, AGGREGATE_PREEMPTED_SECONDS + } + + public static final long GB = 1024; // MB + private static final Configuration CONF = new Configuration(); + private static final String CUSTOM_RES_1 = "custom_res_1"; + private static final String CUSTOM_RES_2 = "custom_res_2"; + public static final String USER = "alice"; + private Resource defaultResource; + private MetricsSystem ms; + + @Before + public void setUp() { + ms = new MetricsSystemImpl(); + QueueMetrics.clearQueueMetrics(); + initializeResourceTypes(); + createDefaultResource(); + } + + private void createDefaultResource() { + defaultResource = newResource(4 * GB, 4, + ImmutableMap. builder() + .put(CUSTOM_RES_1, String.valueOf(15 * GB)) + .put(CUSTOM_RES_2, String.valueOf(20 * GB)) + .build()); + } + + private void initializeResourceTypes() { + Map riMap = new HashMap<>(); + + ResourceInformation memory = ResourceInformation.newInstance( + ResourceInformation.MEMORY_MB.getName(), + ResourceInformation.MEMORY_MB.getUnits(), + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); + ResourceInformation vcores = ResourceInformation.newInstance( + ResourceInformation.VCORES.getName(), + ResourceInformation.VCORES.getUnits(), + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, + DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); + ResourceInformation res1 = ResourceInformation.newInstance(CUSTOM_RES_1, + ResourceInformation.VCORES.getUnits(), 0, 2000); + ResourceInformation res2 = ResourceInformation.newInstance(CUSTOM_RES_2, + ResourceInformation.VCORES.getUnits(), 0, 2000); + + riMap.put(ResourceInformation.MEMORY_URI, memory); + riMap.put(ResourceInformation.VCORES_URI, vcores); + riMap.put(CUSTOM_RES_1, res1); + riMap.put(CUSTOM_RES_2, res2); + ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); + } + + private static void assertCustomResourceValue(QueueMetrics metrics, + MetricsForCustomResource metricsType, + Function func, + String resourceName, + long expectedValue) { + Resource res = func.apply(metrics); + Long value = res.getResourceValue(resourceName); + assertCustomResourceValueInternal(metricsType, resourceName, + expectedValue, value); + } + + private static void assertCustomResourceValueInternal( + MetricsForCustomResource metricsType, String resourceName, long + expectedValue, Long value) { + assertNotNull( + "QueueMetrics should have custom resource metrics value " + + "for resource: " + resourceName, value); + assertEquals(String.format( + "QueueMetrics should have custom resource metrics value %d " + + "for resource: %s for metrics type %s", + expectedValue, resourceName, metricsType), expectedValue, + (long) value); + } + + private static Map getCustomResourcesWithValue(long value) { + return ImmutableMap.builder() + .put(CUSTOM_RES_1, String.valueOf(value)) + .put(CUSTOM_RES_2, String.valueOf(value)) + .build(); + } + + private QueueInfo createFourLevelQueueHierarchy() { + QueueInfo root = new QueueInfo(null, "root", ms, CONF, USER); + QueueInfo sub = new QueueInfo(root, "root.subQ", ms, CONF, USER); + QueueInfo sub2 = new QueueInfo(sub, "root.subQ2", ms, CONF, USER); + return new QueueInfo(sub2, "root.subQ2.leafQ", ms, CONF, USER); + } + + private QueueInfo createBasicQueueHierarchy() { + QueueInfo root = new QueueInfo(null, "root", ms, CONF, USER); + return new QueueInfo(root, "root.leaf", ms, CONF, USER); + } + + private QueueMetricsTestData.Builder + createQueueMetricsTestDataWithContainers(int containers) { + return createDefaultQueueMetricsTestData() + .withContainers(containers); + } + + private QueueMetricsTestData.Builder createDefaultQueueMetricsTestData() { + return QueueMetricsTestData.Builder.create() + .withUser(USER) + .withPartition(RMNodeLabelsManager.NO_LABEL); + } + + private void testIncreasePendingResources(QueueMetricsTestData testData) { + testIncreasePendingResourcesInternal(testData.containers, testData); + } + + private void testIncreasePendingResourcesWithoutContainer( + QueueMetricsTestData testData) { + testIncreasePendingResourcesInternal(1, testData); + } + + private void testIncreasePendingResourcesInternal(int containers, + QueueMetricsTestData testData) { + testData.leafQueue.queueMetrics.incrPendingResources(testData.partition, + testData.user, containers, testData.resource); + + ResourceMetricsChecker checker = ResourceMetricsChecker + .create() + .gaugeInt(PENDING_CONTAINERS, containers) + .gaugeLong(PENDING_MB, containers * + testData.resource.getMemorySize()) + .gaugeInt(PENDING_V_CORES, containers * + testData.resource.getVirtualCores()); + assertAllMetrics(testData.leafQueue, checker, + QueueMetrics::getPendingResources, + MetricsForCustomResource.PENDING, computeExpectedCustomResourceValues( + testData.customResourceValues, (k, v) -> v * containers)); + } + + private void testAllocateResources(boolean decreasePending, + QueueMetricsTestData testData) { + testData.leafQueue.queueMetrics.allocateResources(testData.partition, + testData.user, testData.containers, testData.resource, decreasePending); + + ResourceMetricsChecker checker = ResourceMetricsChecker + .create() + .gaugeInt(ALLOCATED_CONTAINERS, testData.containers) + .counter(AGGREGATE_CONTAINERS_ALLOCATED, testData.containers) + .gaugeLong(ALLOCATED_MB, testData.containers * + testData.resource.getMemorySize()) + .gaugeInt(ALLOCATED_V_CORES, testData.containers * + testData.resource.getVirtualCores()) + .gaugeInt(PENDING_CONTAINERS, 0) + .gaugeLong(PENDING_MB, 0) + .gaugeInt(PENDING_V_CORES, 0) + .checkAgainst(testData.leafQueue.queueSource); + if (decreasePending) { + assertAllMetrics(testData.leafQueue, checker, + QueueMetrics::getPendingResources, + MetricsForCustomResource.PENDING, + computeExpectedCustomResourceValues(testData.customResourceValues, + (k, v) -> 0L)); + } + if (!testData.customResourceValues.isEmpty()) { + assertAllMetrics(testData.leafQueue, checker, + QueueMetrics::getAllocatedResources, + MetricsForCustomResource.ALLOCATED, + computeExpectedCustomResourceValues(testData.customResourceValues, + (k, v) -> v * testData.containers)); + } + } + + private void testUpdatePreemptedSeconds(QueueMetricsTestData testData, + int seconds) { + testData.leafQueue.queueMetrics.updatePreemptedMemoryMBSeconds( + testData.resource.getMemorySize() * seconds); + testData.leafQueue.queueMetrics.updatePreemptedVcoreSeconds( + testData.resource.getVirtualCores() * seconds); + testData.leafQueue.queueMetrics.updatePreemptedSecondsForCustomResources( + testData.resource, seconds); + + ResourceMetricsChecker checker = ResourceMetricsChecker + .create() + .counter(AGGREGATE_MEMORY_MB_SECONDS_PREEMPTED, + testData.resource.getMemorySize() * seconds) + .counter(AGGREGATE_VCORE_SECONDS_PREEMPTED, + testData.resource.getVirtualCores() * seconds); + + assertQueueMetricsOnly(testData.leafQueue, checker, + this::convertPreemptedSecondsToResource, + MetricsForCustomResource.AGGREGATE_PREEMPTED_SECONDS, + computeExpectedCustomResourceValues(testData.customResourceValues, + (k, v) -> v * seconds)); + } + + private Resource convertPreemptedSecondsToResource(QueueMetrics qm) { + QueueMetricsCustomResource customValues = qm + .getAggregatedPreemptedSecondsResources(); + MutableCounterLong vcoreSeconds = qm + .getAggregateVcoreSecondsPreempted(); + MutableCounterLong memorySeconds = qm + .getAggregateMemoryMBSecondsPreempted(); + return Resource.newInstance( + memorySeconds.value(), (int) vcoreSeconds.value(), + customValues.getValues()); + } + + private void testReserveResources(QueueMetricsTestData testData) { + testData.leafQueue.queueMetrics.reserveResource(testData.partition, + testData.user, testData.resource); + + ResourceMetricsChecker checker = ResourceMetricsChecker + .create() + .gaugeInt(RESERVED_CONTAINERS, 1) + .gaugeLong(RESERVED_MB, testData.resource.getMemorySize()) + .gaugeInt(RESERVED_V_CORES, testData.resource.getVirtualCores()) + .checkAgainst(testData.leafQueue.queueSource); + assertAllMetrics(testData.leafQueue, checker, + QueueMetrics::getReservedResources, + MetricsForCustomResource.RESERVED, + computeExpectedCustomResourceValues( + testData.customResourceValues, (k, v) -> v)); + } + + private void testGetAllocatedResources(QueueMetricsTestData testData) { + testAllocateResources(false, testData); + + Resource res = testData.leafQueue.queueMetrics.getAllocatedResources(); + if (testData.customResourceValues.size() > 0) { + assertCustomResourceValueInternal(MetricsForCustomResource.ALLOCATED, + CUSTOM_RES_1, + testData.customResourceValues.get(CUSTOM_RES_1) * testData.containers, + res.getResourceValue(CUSTOM_RES_1)); + assertCustomResourceValueInternal(MetricsForCustomResource.ALLOCATED, + CUSTOM_RES_2, + testData.customResourceValues.get(CUSTOM_RES_2) * testData.containers, + res.getResourceValue(CUSTOM_RES_2)); + } + } + + private void assertAllMetrics(QueueInfo queueInfo, + ResourceMetricsChecker checker, + Function func, + MetricsForCustomResource metricsType, + Map expectedCustomResourceValues) { + assertAllQueueMetrics(queueInfo, checker, func, metricsType, + expectedCustomResourceValues); + + //assert leaf and root userSources + checker = ResourceMetricsChecker.createFromChecker(checker) + .checkAgainst(queueInfo.userSource); + ResourceMetricsChecker.createFromChecker(checker) + .checkAgainst(queueInfo.getRoot().userSource); + } + + private void assertQueueMetricsOnly(QueueInfo queueInfo, + ResourceMetricsChecker checker, + Function func, + MetricsForCustomResource metricsType, + Map expectedCustomResourceValues) { + assertAllQueueMetrics(queueInfo, checker, func, metricsType, + expectedCustomResourceValues); + } + + private void assertAllQueueMetrics(QueueInfo queueInfo, + ResourceMetricsChecker checker, + Function func, + MetricsForCustomResource metricsType, + Map expectedCustomResourceValues) { + // assert normal resource metrics values + queueInfo.checkAllQueueSources(qs -> ResourceMetricsChecker + .createFromChecker(checker).checkAgainst(qs)); + + // assert custom resource metrics values + queueInfo.checkAllQueueMetrics(qm -> { + assertCustomResourceValue(qm, metricsType, func, CUSTOM_RES_1, + expectedCustomResourceValues.get(CUSTOM_RES_1)); + assertCustomResourceValue(qm, metricsType, func, CUSTOM_RES_2, + expectedCustomResourceValues.get(CUSTOM_RES_2)); + }); + } + + private Map computeExpectedCustomResourceValues( + Map customResourceValues, + BiFunction func) { + Map values = Maps.newHashMap(); + for (Map.Entry res : customResourceValues.entrySet()) { + values.put(res.getKey(), func.apply(res.getKey(), res.getValue())); + } + return values; + } + + @Test + public void testSetAvailableResourcesToQueue1() { + String queueName = "single"; + QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, + false, CONF); + MetricsSource queueSource = queueSource(ms, queueName); + + metrics.setAvailableResourcesToQueue(newResource( + GB, 4, + ImmutableMap. builder() + .put(CUSTOM_RES_1, String.valueOf(5 * GB)) + .put(CUSTOM_RES_2, String.valueOf(6 * GB)) + .build())); + ResourceMetricsChecker.create() + .gaugeLong(AVAILABLE_MB, GB) + .gaugeInt(AVAILABLE_V_CORES, 4) + .checkAgainst(queueSource); + + assertCustomResourceValue(metrics, + MetricsForCustomResource.AVAILABLE, + QueueMetrics::getAvailableResources, CUSTOM_RES_1, 5 * GB); + assertCustomResourceValue(metrics, + MetricsForCustomResource.AVAILABLE, + QueueMetrics::getAvailableResources, CUSTOM_RES_2, 6 * GB); + } + + @Test + public void testSetAvailableResourcesToQueue2() { + String queueName = "single"; + QueueMetrics metrics = QueueMetrics.forQueue(ms, queueName, null, + false, CONF); + MetricsSource queueSource = queueSource(ms, queueName); + + metrics.setAvailableResourcesToQueue(null, + newResource(GB, 4, + ImmutableMap. builder() + .put(CUSTOM_RES_1, String.valueOf(15 * GB)) + .put(CUSTOM_RES_2, String.valueOf(20 * GB)) + .build())); + ResourceMetricsChecker.create() + .gaugeLong(AVAILABLE_MB, GB) + .gaugeInt(AVAILABLE_V_CORES, 4) + .checkAgainst(queueSource); + + assertCustomResourceValue(metrics, + MetricsForCustomResource.AVAILABLE, + QueueMetrics::getAvailableResources, CUSTOM_RES_1, 15 * GB); + assertCustomResourceValue(metrics, + MetricsForCustomResource.AVAILABLE, + QueueMetrics::getAvailableResources, CUSTOM_RES_2, 20 * GB); + } + + @Test + public void testIncreasePendingResources() { + QueueMetricsTestData testData = createQueueMetricsTestDataWithContainers(5) + .withLeafQueue(createBasicQueueHierarchy()) + .withResourceToDecrease( + newResource(GB, 2, getCustomResourcesWithValue(2 * GB)), 2) + .withResources(defaultResource) + .build(); + + testIncreasePendingResources(testData); + } + + @Test + public void testDecreasePendingResources() { + Resource resourceToDecrease = + newResource(GB, 2, getCustomResourcesWithValue(2 * GB)); + int containersToDecrease = 2; + int containers = 5; + QueueMetricsTestData testData = + createQueueMetricsTestDataWithContainers(containers) + .withLeafQueue(createBasicQueueHierarchy()) + .withResourceToDecrease(resourceToDecrease, containers) + .withResources(defaultResource) + .build(); + + //compute expected values + final int vCoresToDecrease = resourceToDecrease.getVirtualCores(); + final long memoryMBToDecrease = resourceToDecrease.getMemorySize(); + final int containersAfterDecrease = containers - containersToDecrease; + final int vcoresAfterDecrease = + (defaultResource.getVirtualCores() * containers) + - (vCoresToDecrease * containersToDecrease); + final long memoryAfterDecrease = + (defaultResource.getMemorySize() * containers) + - (memoryMBToDecrease * containersToDecrease); + + //first, increase resources to be able to decrease some + testIncreasePendingResources(testData); + + //decrease resources + testData.leafQueue.queueMetrics.decrPendingResources(testData.partition, + testData.user, containersToDecrease, + ResourceTypesTestHelper.newResource(memoryMBToDecrease, + vCoresToDecrease, + extractCustomResourcesAsStrings(resourceToDecrease))); + + //check + ResourceMetricsChecker checker = ResourceMetricsChecker + .create() + .gaugeInt(PENDING_CONTAINERS, containersAfterDecrease) + .gaugeLong(PENDING_MB, memoryAfterDecrease) + .gaugeInt(PENDING_V_CORES, vcoresAfterDecrease) + .checkAgainst(testData.leafQueue.queueSource); + + assertAllMetrics(testData.leafQueue, checker, + QueueMetrics::getPendingResources, + MetricsForCustomResource.PENDING, + computeExpectedCustomResourceValues(testData.customResourceValues, + (k, v) -> v * containers - (resourceToDecrease.getResourceValue(k) + * containersToDecrease))); + } + + @Test + public void testAllocateResourcesWithoutDecreasePending() { + QueueMetricsTestData testData = createQueueMetricsTestDataWithContainers(5) + .withLeafQueue(createBasicQueueHierarchy()) + .withResources(defaultResource) + .build(); + + testAllocateResources(false, testData); + } + + @Test + public void testAllocateResourcesWithDecreasePending() { + QueueMetricsTestData testData = createQueueMetricsTestDataWithContainers(5) + .withLeafQueue(createBasicQueueHierarchy()) + .withResourceToDecrease( + newResource(GB, 2, getCustomResourcesWithValue(2 * GB)), 2) + .withResources(defaultResource) + .build(); + + //first, increase pending resources to be able to decrease some + testIncreasePendingResources(testData); + + //then allocate with decrease pending resources + testAllocateResources(true, testData); + } + + @Test + public void testAllocateResourcesWithoutContainer() { + QueueMetricsTestData testData = createDefaultQueueMetricsTestData() + .withLeafQueue(createBasicQueueHierarchy()) + .withResources(defaultResource) + .build(); + + //first, increase pending resources + testIncreasePendingResourcesWithoutContainer(testData); + + Resource resource = testData.resource; + testData.leafQueue.queueMetrics.allocateResources(testData.partition, + testData.user, resource); + + ResourceMetricsChecker checker = ResourceMetricsChecker.create() + .gaugeLong(ALLOCATED_MB, resource.getMemorySize()) + .gaugeInt(ALLOCATED_V_CORES, resource.getVirtualCores()) + .gaugeInt(PENDING_CONTAINERS, 1).gaugeLong(PENDING_MB, 0) + .gaugeInt(PENDING_V_CORES, 0); + + checker.checkAgainst(testData.leafQueue.queueSource); + checker.checkAgainst(testData.leafQueue.getRoot().queueSource); + + assertAllMetrics(testData.leafQueue, checker, + QueueMetrics::getPendingResources, + MetricsForCustomResource.PENDING, computeExpectedCustomResourceValues( + testData.customResourceValues, (k, v) -> 0L)); + assertAllMetrics(testData.leafQueue, checker, + QueueMetrics::getAllocatedResources, + MetricsForCustomResource.ALLOCATED, computeExpectedCustomResourceValues( + testData.customResourceValues, (k, v) -> v)); + } + + @Test + public void testReleaseResources() { + int containers = 5; + QueueMetricsTestData testData = + createQueueMetricsTestDataWithContainers(containers) + .withLeafQueue(createBasicQueueHierarchy()) + .withResourceToDecrease(defaultResource, containers) + .withResources(defaultResource) + .build(); + + //first, allocate some resources so that we can release some + testAllocateResources(false, testData); + + testData.leafQueue.queueMetrics.releaseResources(testData.partition, + testData.user, containers, defaultResource); + + ResourceMetricsChecker checker = ResourceMetricsChecker + .create() + .counter(AGGREGATE_CONTAINERS_ALLOCATED, containers) + .counter(AGGREGATE_CONTAINERS_RELEASED, containers) + .checkAgainst(testData.leafQueue.queueSource); + assertAllMetrics(testData.leafQueue, checker, + QueueMetrics::getAllocatedResources, + MetricsForCustomResource.ALLOCATED, computeExpectedCustomResourceValues( + testData.customResourceValues, (k, v) -> 0L)); + } + + @Test + public void testUpdatePreemptedSecondsForCustomResources() { + QueueMetricsTestData testData = createQueueMetricsTestDataWithContainers(5) + .withLeafQueue(createFourLevelQueueHierarchy()) + .withResources(defaultResource) + .build(); + + final int seconds = 1; + testUpdatePreemptedSeconds(testData, seconds); + } + + @Test + public void testUpdatePreemptedSecondsForCustomResourcesMoreSeconds() { + QueueMetricsTestData testData = createQueueMetricsTestDataWithContainers(5) + .withLeafQueue(createFourLevelQueueHierarchy()) + .withResources(defaultResource) + .build(); + + final int seconds = 15; + testUpdatePreemptedSeconds(testData, seconds); + } + + @Test + public void testReserveResources() { + QueueMetricsTestData testData = createQueueMetricsTestDataWithContainers(5) + .withLeafQueue(createBasicQueueHierarchy()) + .withResources(defaultResource) + .build(); + + testReserveResources(testData); + } + + @Test + public void testUnreserveResources() { + QueueMetricsTestData testData = createQueueMetricsTestDataWithContainers(5) + .withLeafQueue(createBasicQueueHierarchy()) + .withResources(defaultResource) + .build(); + + testReserveResources(testData); + + testData.leafQueue.queueMetrics.unreserveResource(testData.partition, + testData.user, defaultResource); + + ResourceMetricsChecker checker = ResourceMetricsChecker + .create() + .gaugeInt(RESERVED_CONTAINERS, 0) + .gaugeLong(RESERVED_MB, 0) + .gaugeInt(RESERVED_V_CORES, 0) + .checkAgainst(testData.leafQueue.queueSource); + assertAllMetrics(testData.leafQueue, checker, + QueueMetrics::getReservedResources, + MetricsForCustomResource.RESERVED, computeExpectedCustomResourceValues( + testData.customResourceValues, (k, v) -> 0L)); + } + + @Test + public void testGetAllocatedResourcesWithCustomResources() { + QueueMetricsTestData testData = createQueueMetricsTestDataWithContainers(5) + .withLeafQueue(createBasicQueueHierarchy()) + .withResources(defaultResource) + .build(); + + testGetAllocatedResources(testData); + } + + @Test + public void testGetAllocatedResourcesWithoutCustomResources() { + QueueMetricsTestData testData = createQueueMetricsTestDataWithContainers(5) + .withResources(newResource(4 * GB, 4, Collections.emptyMap())) + .withLeafQueue(createBasicQueueHierarchy()) + .build(); + + testGetAllocatedResources(testData); + } + +}