From a5034c7988b6bf54bb7dab208100a2d205e3929e Mon Sep 17 00:00:00 2001 From: Eric E Payne Date: Wed, 16 Oct 2019 21:10:08 +0000 Subject: [PATCH] YARN-9773: Add QueueMetrics for Custom Resources. Contributed by Manikandan R. --- .../scheduler/QueueMetrics.java | 112 +++++++++++++++++- .../scheduler/ResourceMetricsChecker.java | 67 ++++++++--- .../scheduler/TestQueueMetrics.java | 60 ++++------ .../TestQueueMetricsForCustomResources.java | 57 ++++++++- 4 files changed, 240 insertions(+), 56 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java index ed5f5464e2..8f60c6b48c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java @@ -23,6 +23,7 @@ import java.util.ArrayList; import java.util.HashMap; import java.util.Map; +import java.util.Map.Entry; import com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.classification.InterfaceAudience; @@ -43,6 +44,7 @@ import org.apache.hadoop.metrics2.lib.MutableRate; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Resource; +import org.apache.hadoop.yarn.api.records.ResourceInformation; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; @@ -121,6 +123,31 @@ public class QueueMetrics implements MetricsSource { protected final Configuration conf; private QueueMetricsForCustomResources queueMetricsForCustomResources; + private static final String ALLOCATED_RESOURCE_METRIC_PREFIX = + "AllocatedResource."; + private static final String ALLOCATED_RESOURCE_METRIC_DESC = + "Allocated NAME"; + + private static final String AVAILABLE_RESOURCE_METRIC_PREFIX = + "AvailableResource."; + private static final String AVAILABLE_RESOURCE_METRIC_DESC = + "Available NAME"; + + private static final String PENDING_RESOURCE_METRIC_PREFIX = + "PendingResource."; + private static final String PENDING_RESOURCE_METRIC_DESC = + "Pending NAME"; + + private static final String RESERVED_RESOURCE_METRIC_PREFIX = + "ReservedResource."; + private static final String RESERVED_RESOURCE_METRIC_DESC = + "Reserved NAME"; + + private static final String AGGREGATE_PREEMPTED_SECONDS_METRIC_PREFIX = + "AggregatePreemptedSeconds."; + private static final String AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC = + "Aggregate Preempted Seconds for NAME"; + protected QueueMetrics(MetricsSystem ms, String queueName, Queue parent, boolean enableUserMetrics, Configuration conf) { registry = new MetricsRegistry(RECORD_INFO); @@ -135,6 +162,7 @@ protected QueueMetrics(MetricsSystem ms, String queueName, Queue parent, if (ResourceUtils.getNumberOfKnownResourceTypes() > 2) { this.queueMetricsForCustomResources = new QueueMetricsForCustomResources(); + registerCustomResources(); } } @@ -366,6 +394,9 @@ public void setAvailableResourcesToQueue(String partition, Resource limit) { availableVCores.set(limit.getVirtualCores()); if (queueMetricsForCustomResources != null) { queueMetricsForCustomResources.setAvailable(limit); + registerCustomResources( + queueMetricsForCustomResources.getAvailableValues(), + AVAILABLE_RESOURCE_METRIC_PREFIX, AVAILABLE_RESOURCE_METRIC_DESC); } } } @@ -418,16 +449,67 @@ public void incrPendingResources(String partition, String user, } } + /** + * Register all custom resources metrics as part of initialization. As and + * when this metric object construction happens for any queue, all custom + * resource metrics value would be initialized with '0' like any other + * mandatory resources metrics + */ + private void registerCustomResources() { + Map customResources = + new HashMap(); + ResourceInformation[] resources = + ResourceUtils.getResourceTypesArray(); + + for (int i = + 2; i < resources.length; i++) { + ResourceInformation resource = + resources[i]; + customResources.put(resource.getName(), new Long(0)); + } + + registerCustomResources(customResources, ALLOCATED_RESOURCE_METRIC_PREFIX, + ALLOCATED_RESOURCE_METRIC_DESC); + registerCustomResources(customResources, AVAILABLE_RESOURCE_METRIC_PREFIX, + AVAILABLE_RESOURCE_METRIC_DESC); + registerCustomResources(customResources, PENDING_RESOURCE_METRIC_PREFIX, + PENDING_RESOURCE_METRIC_DESC); + registerCustomResources(customResources, RESERVED_RESOURCE_METRIC_PREFIX, + RESERVED_RESOURCE_METRIC_DESC); + registerCustomResources(customResources, + AGGREGATE_PREEMPTED_SECONDS_METRIC_PREFIX, + AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC); + } + + private void registerCustomResources(Map customResources, + String metricPrefix, String metricDesc) { + for (Entry entry : customResources.entrySet()) { + String resourceName = entry.getKey(); + Long resourceValue = entry.getValue(); + + MutableGaugeLong resourceMetric = + (MutableGaugeLong) this.registry.get(metricPrefix + resourceName); + + if (resourceMetric == null) { + resourceMetric = + this.registry.newGauge(metricPrefix + resourceName, + metricDesc.replace("NAME", resourceName), 0L); + } + resourceMetric.set(resourceValue); + } + } + private void _incrPendingResources(int containers, Resource res) { pendingContainers.incr(containers); pendingMB.incr(res.getMemorySize() * containers); pendingVCores.incr(res.getVirtualCores() * containers); if (queueMetricsForCustomResources != null) { queueMetricsForCustomResources.increasePending(res, containers); + registerCustomResources(queueMetricsForCustomResources.getPendingValues(), + PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC); } } - public void decrPendingResources(String partition, String user, int containers, Resource res) { if (partition == null || partition.equals(RMNodeLabelsManager.NO_LABEL)) { @@ -448,6 +530,8 @@ private void _decrPendingResources(int containers, Resource res) { pendingVCores.decr(res.getVirtualCores() * containers); if (queueMetricsForCustomResources != null) { queueMetricsForCustomResources.decreasePending(res, containers); + registerCustomResources(queueMetricsForCustomResources.getPendingValues(), + PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC); } } @@ -480,6 +564,9 @@ public void allocateResources(String partition, String user, allocatedVCores.incr(res.getVirtualCores() * containers); if (queueMetricsForCustomResources != null) { queueMetricsForCustomResources.increaseAllocated(res, containers); + registerCustomResources( + queueMetricsForCustomResources.getAllocatedValues(), + ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC); } if (decrPending) { @@ -508,12 +595,18 @@ public void allocateResources(String partition, String user, Resource res) { allocatedVCores.incr(res.getVirtualCores()); if (queueMetricsForCustomResources != null) { queueMetricsForCustomResources.increaseAllocated(res); + registerCustomResources( + queueMetricsForCustomResources.getAllocatedValues(), + ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC); } pendingMB.decr(res.getMemorySize()); pendingVCores.decr(res.getVirtualCores()); if (queueMetricsForCustomResources != null) { queueMetricsForCustomResources.decreasePending(res); + registerCustomResources( + queueMetricsForCustomResources.getPendingValues(), + PENDING_RESOURCE_METRIC_PREFIX, PENDING_RESOURCE_METRIC_DESC); } QueueMetrics userMetrics = getUserMetrics(user); @@ -535,6 +628,9 @@ public void releaseResources(String partition, allocatedVCores.decr(res.getVirtualCores() * containers); if (queueMetricsForCustomResources != null) { queueMetricsForCustomResources.decreaseAllocated(res, containers); + registerCustomResources( + queueMetricsForCustomResources.getAllocatedValues(), + ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC); } QueueMetrics userMetrics = getUserMetrics(user); @@ -558,6 +654,9 @@ private void releaseResources(String user, Resource res) { allocatedVCores.decr(res.getVirtualCores()); if (queueMetricsForCustomResources != null) { queueMetricsForCustomResources.decreaseAllocated(res); + registerCustomResources( + queueMetricsForCustomResources.getAllocatedValues(), + ALLOCATED_RESOURCE_METRIC_PREFIX, ALLOCATED_RESOURCE_METRIC_DESC); } QueueMetrics userMetrics = getUserMetrics(user); @@ -612,6 +711,11 @@ public void updatePreemptedSecondsForCustomResources(Resource res, if (queueMetricsForCustomResources != null) { queueMetricsForCustomResources .increaseAggregatedPreemptedSeconds(res, seconds); + registerCustomResources( + queueMetricsForCustomResources.getAggregatePreemptedSeconds() + .getValues(), + AGGREGATE_PREEMPTED_SECONDS_METRIC_PREFIX, + AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC); } if (parent != null) { parent.updatePreemptedSecondsForCustomResources(res, seconds); @@ -630,6 +734,9 @@ public void reserveResource(String user, Resource res) { reservedVCores.incr(res.getVirtualCores()); if (queueMetricsForCustomResources != null) { queueMetricsForCustomResources.increaseReserved(res); + registerCustomResources( + queueMetricsForCustomResources.getReservedValues(), + RESERVED_RESOURCE_METRIC_PREFIX, RESERVED_RESOURCE_METRIC_DESC); } QueueMetrics userMetrics = getUserMetrics(user); if (userMetrics != null) { @@ -646,6 +753,9 @@ private void unreserveResource(String user, Resource res) { reservedVCores.decr(res.getVirtualCores()); if (queueMetricsForCustomResources != null) { queueMetricsForCustomResources.decreaseReserved(res); + registerCustomResources( + queueMetricsForCustomResources.getReservedValues(), + RESERVED_RESOURCE_METRIC_PREFIX, RESERVED_RESOURCE_METRIC_DESC); } QueueMetrics userMetrics = getUserMetrics(user); if (userMetrics != null) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceMetricsChecker.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceMetricsChecker.java index 05341aab10..b49b125a97 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceMetricsChecker.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/ResourceMetricsChecker.java @@ -43,6 +43,16 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_V_CORES; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CUSTOM_RES1; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CUSTOM_RES2; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_CUSTOM_RES1; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_CUSTOM_RES2; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_CUSTOM_RES1; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_CUSTOM_RES2; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CUSTOM_RES1; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CUSTOM_RES2; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES1; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES2; final class ResourceMetricsChecker { private final static Logger LOG = @@ -52,21 +62,33 @@ enum ResourceMetricType { GAUGE_INT, GAUGE_LONG, COUNTER_INT, COUNTER_LONG } + private static final ResourceMetricsChecker INITIAL_MANDATORY_RES_CHECKER = + new ResourceMetricsChecker().gaugeLong(ALLOCATED_MB, 0) + .gaugeInt(ALLOCATED_V_CORES, 0).gaugeInt(ALLOCATED_CONTAINERS, 0) + .counter(AGGREGATE_CONTAINERS_ALLOCATED, 0) + .counter(AGGREGATE_CONTAINERS_RELEASED, 0).gaugeLong(AVAILABLE_MB, 0) + .gaugeInt(AVAILABLE_V_CORES, 0).gaugeLong(PENDING_MB, 0) + .gaugeInt(PENDING_V_CORES, 0).gaugeInt(PENDING_CONTAINERS, 0) + .gaugeLong(RESERVED_MB, 0).gaugeInt(RESERVED_V_CORES, 0) + .gaugeInt(RESERVED_CONTAINERS, 0); + private static final ResourceMetricsChecker INITIAL_CHECKER = - new ResourceMetricsChecker() - .gaugeLong(ALLOCATED_MB, 0) - .gaugeInt(ALLOCATED_V_CORES, 0) - .gaugeInt(ALLOCATED_CONTAINERS, 0) - .counter(AGGREGATE_CONTAINERS_ALLOCATED, 0) - .counter(AGGREGATE_CONTAINERS_RELEASED, 0) - .gaugeLong(AVAILABLE_MB, 0) - .gaugeInt(AVAILABLE_V_CORES, 0) - .gaugeLong(PENDING_MB, 0) - .gaugeInt(PENDING_V_CORES, 0) - .gaugeInt(PENDING_CONTAINERS, 0) - .gaugeLong(RESERVED_MB, 0) - .gaugeInt(RESERVED_V_CORES, 0) - .gaugeInt(RESERVED_CONTAINERS, 0); + new ResourceMetricsChecker().gaugeLong(ALLOCATED_MB, 0) + .gaugeInt(ALLOCATED_V_CORES, 0).gaugeInt(ALLOCATED_CONTAINERS, 0) + .counter(AGGREGATE_CONTAINERS_ALLOCATED, 0) + .counter(AGGREGATE_CONTAINERS_RELEASED, 0).gaugeLong(AVAILABLE_MB, 0) + .gaugeInt(AVAILABLE_V_CORES, 0).gaugeLong(PENDING_MB, 0) + .gaugeInt(PENDING_V_CORES, 0).gaugeInt(PENDING_CONTAINERS, 0) + .gaugeLong(RESERVED_MB, 0).gaugeInt(RESERVED_V_CORES, 0) + .gaugeInt(RESERVED_CONTAINERS, 0).gaugeLong(ALLOCATED_CUSTOM_RES1, 0) + .gaugeLong(ALLOCATED_CUSTOM_RES2, 0).gaugeLong(AVAILABLE_CUSTOM_RES1, 0) + .gaugeLong(AVAILABLE_CUSTOM_RES2, 0).gaugeLong(PENDING_CUSTOM_RES1, 0) + .gaugeLong(PENDING_CUSTOM_RES2, 0).gaugeLong(RESERVED_CUSTOM_RES1, 0) + .gaugeLong(RESERVED_CUSTOM_RES2, 0) + .gaugeLong(AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES1, 0) + .gaugeLong(AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES2, 0); + + enum ResourceMetricsKey { ALLOCATED_MB("AllocatedMB", GAUGE_LONG), @@ -87,7 +109,18 @@ enum ResourceMetricsKey { AGGREGATE_VCORE_SECONDS_PREEMPTED( "AggregateVcoreSecondsPreempted", COUNTER_LONG), AGGREGATE_MEMORY_MB_SECONDS_PREEMPTED( - "AggregateMemoryMBSecondsPreempted", COUNTER_LONG); + "AggregateMemoryMBSecondsPreempted", COUNTER_LONG), + ALLOCATED_CUSTOM_RES1("AllocatedResource.custom_res_1", GAUGE_LONG), + ALLOCATED_CUSTOM_RES2("AllocatedResource.custom_res_2", GAUGE_LONG), + AVAILABLE_CUSTOM_RES1("AvailableResource.custom_res_1", GAUGE_LONG), + AVAILABLE_CUSTOM_RES2("AvailableResource.custom_res_2", GAUGE_LONG), + PENDING_CUSTOM_RES1("PendingResource.custom_res_1",GAUGE_LONG), + PENDING_CUSTOM_RES2("PendingResource.custom_res_2",GAUGE_LONG), + RESERVED_CUSTOM_RES1("ReservedResource.custom_res_1",GAUGE_LONG), + RESERVED_CUSTOM_RES2("ReservedResource.custom_res_2", GAUGE_LONG), + AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES1("AggregatePreemptedSeconds.custom_res_1", GAUGE_LONG), + AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES2("AggregatePreemptedSeconds.custom_res_2", GAUGE_LONG); + private String value; private ResourceMetricType type; @@ -131,6 +164,10 @@ public static ResourceMetricsChecker create() { return new ResourceMetricsChecker(INITIAL_CHECKER); } + public static ResourceMetricsChecker createMandatoryResourceChecker() { + return new ResourceMetricsChecker(INITIAL_MANDATORY_RES_CHECKER); + } + ResourceMetricsChecker gaugeLong(ResourceMetricsKey key, long value) { ensureTypeIsCorrect(key, GAUGE_LONG); gaugesLong.put(key, value); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java index 2066f607c5..33c39290de 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetrics.java @@ -105,13 +105,11 @@ public void testDefaultSingleQueueMetrics() { USER, 5, Resources.createResource(3*GB, 3)); // Available resources is set externally, as it depends on dynamic // configurable cluster/queue resources - ResourceMetricsChecker rmChecker = ResourceMetricsChecker.create() - .gaugeLong(AVAILABLE_MB, 100 * GB) - .gaugeInt(AVAILABLE_V_CORES, 100) - .gaugeLong(PENDING_MB, 15 * GB) - .gaugeInt(PENDING_V_CORES, 15) - .gaugeInt(PENDING_CONTAINERS, 5) - .checkAgainst(queueSource); + ResourceMetricsChecker rmChecker = + ResourceMetricsChecker.createMandatoryResourceChecker() + .gaugeLong(AVAILABLE_MB, 100 * GB).gaugeInt(AVAILABLE_V_CORES, 100) + .gaugeLong(PENDING_MB, 15 * GB).gaugeInt(PENDING_V_CORES, 15) + .gaugeInt(PENDING_CONTAINERS, 5).checkAgainst(queueSource); metrics.runAppAttempt(app.getApplicationId(), USER); appMetricsChecker = AppMetricsChecker.createFromChecker(appMetricsChecker) @@ -284,7 +282,7 @@ public void testSingleQueueWithUserMetrics() { // Available resources is set externally, as it depends on dynamic // configurable cluster/queue resources ResourceMetricsChecker resMetricsQueueSourceChecker = - ResourceMetricsChecker.create() + ResourceMetricsChecker.createMandatoryResourceChecker() .gaugeLong(AVAILABLE_MB, 100 * GB) .gaugeInt(AVAILABLE_V_CORES, 100) .gaugeLong(PENDING_MB, 15 * GB) @@ -292,7 +290,7 @@ public void testSingleQueueWithUserMetrics() { .gaugeInt(PENDING_CONTAINERS, 5) .checkAgainst(queueSource); ResourceMetricsChecker resMetricsUserSourceChecker = - ResourceMetricsChecker.create() + ResourceMetricsChecker.createMandatoryResourceChecker() .gaugeLong(AVAILABLE_MB, 10 * GB) .gaugeInt(AVAILABLE_V_CORES, 10) .gaugeLong(PENDING_MB, 15 * GB) @@ -471,37 +469,25 @@ public void testTwoLevelWithUserMetrics() { USER, 5, Resources.createResource(3*GB, 3)); ResourceMetricsChecker resMetricsQueueSourceChecker = - ResourceMetricsChecker.create() - .gaugeLong(AVAILABLE_MB, 100 * GB) - .gaugeInt(AVAILABLE_V_CORES, 100) - .gaugeLong(PENDING_MB, 15 * GB) - .gaugeInt(PENDING_V_CORES, 15) - .gaugeInt(PENDING_CONTAINERS, 5) - .checkAgainst(leaf.queueSource); + ResourceMetricsChecker.createMandatoryResourceChecker() + .gaugeLong(AVAILABLE_MB, 100 * GB).gaugeInt(AVAILABLE_V_CORES, 100) + .gaugeLong(PENDING_MB, 15 * GB).gaugeInt(PENDING_V_CORES, 15) + .gaugeInt(PENDING_CONTAINERS, 5).checkAgainst(leaf.queueSource); ResourceMetricsChecker resMetricsParentQueueSourceChecker = - ResourceMetricsChecker.create() - .gaugeLong(AVAILABLE_MB, 100 * GB) - .gaugeInt(AVAILABLE_V_CORES, 100) - .gaugeLong(PENDING_MB, 15 * GB) - .gaugeInt(PENDING_V_CORES, 15) - .gaugeInt(PENDING_CONTAINERS, 5) - .checkAgainst(root.queueSource); + ResourceMetricsChecker.createMandatoryResourceChecker() + .gaugeLong(AVAILABLE_MB, 100 * GB).gaugeInt(AVAILABLE_V_CORES, 100) + .gaugeLong(PENDING_MB, 15 * GB).gaugeInt(PENDING_V_CORES, 15) + .gaugeInt(PENDING_CONTAINERS, 5).checkAgainst(root.queueSource); ResourceMetricsChecker resMetricsUserSourceChecker = - ResourceMetricsChecker.create() - .gaugeLong(AVAILABLE_MB, 10 * GB) - .gaugeInt(AVAILABLE_V_CORES, 10) - .gaugeLong(PENDING_MB, 15 * GB) - .gaugeInt(PENDING_V_CORES, 15) - .gaugeInt(PENDING_CONTAINERS, 5) - .checkAgainst(leaf.userSource); + ResourceMetricsChecker.createMandatoryResourceChecker() + .gaugeLong(AVAILABLE_MB, 10 * GB).gaugeInt(AVAILABLE_V_CORES, 10) + .gaugeLong(PENDING_MB, 15 * GB).gaugeInt(PENDING_V_CORES, 15) + .gaugeInt(PENDING_CONTAINERS, 5).checkAgainst(leaf.userSource); ResourceMetricsChecker resMetricsParentUserSourceChecker = - ResourceMetricsChecker.create() - .gaugeLong(AVAILABLE_MB, 10 * GB) - .gaugeInt(AVAILABLE_V_CORES, 10) - .gaugeLong(PENDING_MB, 15 * GB) - .gaugeInt(PENDING_V_CORES, 15) - .gaugeInt(PENDING_CONTAINERS, 5) - .checkAgainst(root.userSource); + ResourceMetricsChecker.createMandatoryResourceChecker() + .gaugeLong(AVAILABLE_MB, 10 * GB).gaugeInt(AVAILABLE_V_CORES, 10) + .gaugeLong(PENDING_MB, 15 * GB).gaugeInt(PENDING_V_CORES, 15) + .gaugeInt(PENDING_CONTAINERS, 5).checkAgainst(root.userSource); leaf.queueMetrics.runAppAttempt(app.getApplicationId(), USER); appMetricsQueueSourceChecker = diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetricsForCustomResources.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetricsForCustomResources.java index 0758afaeb9..a18f2d2faa 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetricsForCustomResources.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestQueueMetricsForCustomResources.java @@ -71,6 +71,17 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CONTAINERS; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_MB; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_V_CORES; + +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CUSTOM_RES1; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.ALLOCATED_CUSTOM_RES2; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_CUSTOM_RES1; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AVAILABLE_CUSTOM_RES2; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_CUSTOM_RES1; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.PENDING_CUSTOM_RES2; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CUSTOM_RES1; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.RESERVED_CUSTOM_RES2; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES1; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceMetricsChecker.ResourceMetricsKey.AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES2; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.TestQueueMetrics.queueSource; import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertNotNull; @@ -204,7 +215,11 @@ private void testIncreasePendingResourcesInternal(int containers, .gaugeLong(PENDING_MB, containers * testData.resource.getMemorySize()) .gaugeInt(PENDING_V_CORES, containers * - testData.resource.getVirtualCores()); + testData.resource.getVirtualCores()) + .gaugeLong(PENDING_CUSTOM_RES1, + containers * testData.customResourceValues.get(CUSTOM_RES_1)) + .gaugeLong(PENDING_CUSTOM_RES2, + containers * testData.customResourceValues.get(CUSTOM_RES_2)); assertAllMetrics(testData.leafQueue, checker, QueueMetrics::getPendingResources, MetricsForCustomResource.PENDING, computeExpectedCustomResourceValues( @@ -227,6 +242,12 @@ private void testAllocateResources(boolean decreasePending, .gaugeInt(PENDING_CONTAINERS, 0) .gaugeLong(PENDING_MB, 0) .gaugeInt(PENDING_V_CORES, 0) + .gaugeLong(ALLOCATED_CUSTOM_RES1, + testData.containers + * testData.customResourceValues.get(CUSTOM_RES_1)) + .gaugeLong(ALLOCATED_CUSTOM_RES2, + testData.containers + * testData.customResourceValues.get(CUSTOM_RES_2)) .checkAgainst(testData.leafQueue.queueSource); if (decreasePending) { assertAllMetrics(testData.leafQueue, checker, @@ -258,7 +279,11 @@ private void testUpdatePreemptedSeconds(QueueMetricsTestData testData, .counter(AGGREGATE_MEMORY_MB_SECONDS_PREEMPTED, testData.resource.getMemorySize() * seconds) .counter(AGGREGATE_VCORE_SECONDS_PREEMPTED, - testData.resource.getVirtualCores() * seconds); + testData.resource.getVirtualCores() * seconds) + .gaugeLong(AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES1, + testData.customResourceValues.get(CUSTOM_RES_1) * seconds) + .gaugeLong(AGGREGATE_PREEMPTED_SECONDS_CUSTOM_RES2, + testData.customResourceValues.get(CUSTOM_RES_2) * seconds); assertQueueMetricsOnly(testData.leafQueue, checker, this::convertPreemptedSecondsToResource, @@ -288,6 +313,10 @@ private void testReserveResources(QueueMetricsTestData testData) { .gaugeInt(RESERVED_CONTAINERS, 1) .gaugeLong(RESERVED_MB, testData.resource.getMemorySize()) .gaugeInt(RESERVED_V_CORES, testData.resource.getVirtualCores()) + .gaugeLong(RESERVED_CUSTOM_RES1, + testData.customResourceValues.get(CUSTOM_RES_1)) + .gaugeLong(RESERVED_CUSTOM_RES2, + testData.customResourceValues.get(CUSTOM_RES_2)) .checkAgainst(testData.leafQueue.queueSource); assertAllMetrics(testData.leafQueue, checker, QueueMetrics::getReservedResources, @@ -380,6 +409,8 @@ ImmutableMap. builder() ResourceMetricsChecker.create() .gaugeLong(AVAILABLE_MB, GB) .gaugeInt(AVAILABLE_V_CORES, 4) + .gaugeLong(AVAILABLE_CUSTOM_RES1, 5 * GB) + .gaugeLong(AVAILABLE_CUSTOM_RES2, 6 * GB) .checkAgainst(queueSource); assertCustomResourceValue(metrics, @@ -406,6 +437,8 @@ ImmutableMap. builder() ResourceMetricsChecker.create() .gaugeLong(AVAILABLE_MB, GB) .gaugeInt(AVAILABLE_V_CORES, 4) + .gaugeLong(AVAILABLE_CUSTOM_RES1, 15 * GB) + .gaugeLong(AVAILABLE_CUSTOM_RES2, 20 * GB) .checkAgainst(queueSource); assertCustomResourceValue(metrics, @@ -445,12 +478,23 @@ public void testDecreasePendingResources() { final int vCoresToDecrease = resourceToDecrease.getVirtualCores(); final long memoryMBToDecrease = resourceToDecrease.getMemorySize(); final int containersAfterDecrease = containers - containersToDecrease; + final long customRes1ToDecrease = + resourceToDecrease.getResourceValue(CUSTOM_RES_1); + final long customRes2ToDecrease = + resourceToDecrease.getResourceValue(CUSTOM_RES_2); + final int vcoresAfterDecrease = (defaultResource.getVirtualCores() * containers) - (vCoresToDecrease * containersToDecrease); final long memoryAfterDecrease = (defaultResource.getMemorySize() * containers) - (memoryMBToDecrease * containersToDecrease); + final long customResource1AfterDecrease = + (testData.customResourceValues.get(CUSTOM_RES_1) * containers) + - (customRes1ToDecrease * containersToDecrease); + final long customResource2AfterDecrease = + (testData.customResourceValues.get(CUSTOM_RES_2) * containers) + - (customRes2ToDecrease * containersToDecrease); //first, increase resources to be able to decrease some testIncreasePendingResources(testData); @@ -468,6 +512,8 @@ public void testDecreasePendingResources() { .gaugeInt(PENDING_CONTAINERS, containersAfterDecrease) .gaugeLong(PENDING_MB, memoryAfterDecrease) .gaugeInt(PENDING_V_CORES, vcoresAfterDecrease) + .gaugeLong(PENDING_CUSTOM_RES1, customResource1AfterDecrease) + .gaugeLong(PENDING_CUSTOM_RES2, customResource2AfterDecrease) .checkAgainst(testData.leafQueue.queueSource); assertAllMetrics(testData.leafQueue, checker, @@ -522,7 +568,11 @@ public void testAllocateResourcesWithoutContainer() { .gaugeLong(ALLOCATED_MB, resource.getMemorySize()) .gaugeInt(ALLOCATED_V_CORES, resource.getVirtualCores()) .gaugeInt(PENDING_CONTAINERS, 1).gaugeLong(PENDING_MB, 0) - .gaugeInt(PENDING_V_CORES, 0); + .gaugeInt(PENDING_V_CORES, 0) + .gaugeLong(ALLOCATED_CUSTOM_RES1, + testData.customResourceValues.get(CUSTOM_RES_1)) + .gaugeLong(ALLOCATED_CUSTOM_RES2, + testData.customResourceValues.get(CUSTOM_RES_2)); checker.checkAgainst(testData.leafQueue.queueSource); checker.checkAgainst(testData.leafQueue.getRoot().queueSource); @@ -613,6 +663,7 @@ public void testUnreserveResources() { .gaugeInt(RESERVED_CONTAINERS, 0) .gaugeLong(RESERVED_MB, 0) .gaugeInt(RESERVED_V_CORES, 0) + .gaugeLong(RESERVED_CUSTOM_RES1, 0).gaugeLong(RESERVED_CUSTOM_RES2, 0) .checkAgainst(testData.leafQueue.queueSource); assertAllMetrics(testData.leafQueue, checker, QueueMetrics::getReservedResources,