From aeb3f6f1a84f6ad45e90551fb4c217d837c1bc0d Mon Sep 17 00:00:00 2001 From: Tamas Domok Date: Tue, 23 May 2023 10:36:37 +0200 Subject: [PATCH] YARN-11490. Reverting YARN-11211 and eliminating the use of DefaultMetricsSystem during configuration validation (#5644) --- .../scheduler/PartitionQueueMetrics.java | 2 +- .../scheduler/QueueMetrics.java | 44 ++++++--- .../scheduler/capacity/CSQueueMetrics.java | 80 ++++++++++++++-- .../CapacitySchedulerConfigValidator.java | 9 +- .../TestCapacitySchedulerConfigValidator.java | 96 +++++++++++++++++++ 5 files changed, 205 insertions(+), 26 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java index 150abdf51d..02eaa7bd9b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java @@ -78,7 +78,7 @@ public class PartitionQueueMetrics extends QueueMetrics { metrics = new PartitionQueueMetrics(this.metricsSystem, this.queueName, null, false, this.conf, this.partition); users.put(userName, metrics); - registerMetrics( + metricsSystem.register( pSourceName(partitionJMXStr).append(qSourceName(queueName)) .append(",user=").append(userName).toString(), "Metrics for user '" + userName + "' in queue '" + queueName + "'", diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java index 82bff453aa..11672439c6 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java @@ -117,6 +117,9 @@ public class QueueMetrics implements MetricsSource { @Metric("Reserved CPU in virtual cores") MutableGaugeInt reservedVCores; @Metric("# of reserved containers") MutableGaugeInt reservedContainers; + // INTERNAL ONLY + private static final String CONFIGURATION_VALIDATION = "yarn.configuration-validation"; + private final MutableGaugeInt[] runningTime; private TimeBucketMetrics runBuckets; @@ -289,7 +292,7 @@ public class QueueMetrics implements MetricsSource { metrics = new QueueMetrics(metricsSystem, queueName, null, false, conf); users.put(userName, metrics); - registerMetrics( + metricsSystem.register( sourceName(queueName).append(",user=").append(userName).toString(), "Metrics for user '"+ userName +"' in queue '"+ queueName +"'", metrics.tag(QUEUE_INFO, queueName).tag(USER_INFO, userName)); @@ -334,13 +337,15 @@ public class QueueMetrics implements MetricsSource { QueueMetrics queueMetrics = new PartitionQueueMetrics(metricsSystem, this.queueName, parentQueue, this.enableUserMetrics, this.conf, partition); - registerMetrics( + metricsSystem.register( pSourceName(partitionJMXStr).append(qSourceName(this.queueName)) .toString(), "Metrics for queue: " + this.queueName, queueMetrics.tag(PARTITION_INFO, 
partitionJMXStr).tag(QUEUE_INFO, this.queueName)); - getQueueMetrics().put(metricName, queueMetrics); + if (!isConfigurationValidationSet(conf)) { + getQueueMetrics().put(metricName, queueMetrics); + } registerPartitionMetricsCreation(metricName); return queueMetrics; } else { @@ -348,6 +353,26 @@ public class QueueMetrics implements MetricsSource { } } + /** + * Check whether we are in a configuration validation mode. INTERNAL ONLY. + * + * @param conf the configuration to check + * @return true if the configuration validation flag is set to true in conf, false otherwise + */ + public static boolean isConfigurationValidationSet(Configuration conf) { + return conf.getBoolean(CONFIGURATION_VALIDATION, false); + } + + /** + * Set configuration validation mode. INTERNAL ONLY. + * + * @param conf the configuration to update + * @param value the value for the validation mode + */ + public static void setConfigurationValidation(Configuration conf, boolean value) { + conf.setBoolean(CONFIGURATION_VALIDATION, value); + } + /** * Partition Metrics * @@ -378,7 +403,7 @@ public class QueueMetrics implements MetricsSource { // Register with the MetricsSystems if (metricsSystem != null) { - registerMetrics(pSourceName(partitionJMXStr).toString(), + metricsSystem.register(pSourceName(partitionJMXStr).toString(), "Metrics for partition: " + partitionJMXStr, (PartitionQueueMetrics) metrics.tag(PARTITION_INFO, partitionJMXStr)); @@ -1359,15 +1384,4 @@ public class QueueMetrics implements MetricsSource { } } } - - protected void registerMetrics(String sourceName, String desc, QueueMetrics metrics) { - MetricsSource source = metricsSystem.getSource(sourceName); - // Unregister metrics if a source is already present - if (source != null) { - LOG.info("Unregistering source " + sourceName); - metricsSystem.unregisterSource(sourceName); - } - - metricsSystem.register(sourceName, desc, metrics); - } } \ No newline at end of file diff --git 
a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java index 16ebc15512..cd779a86b8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CSQueueMetrics.java @@ -21,11 +21,13 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import java.util.Map; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.MetricsSink; import org.apache.hadoop.metrics2.MetricsSource; import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.annotation.Metric; import org.apache.hadoop.metrics2.annotation.Metrics; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.MetricsAnnotations; import org.apache.hadoop.metrics2.lib.MutableGaugeFloat; import org.apache.hadoop.metrics2.lib.MutableGaugeInt; import org.apache.hadoop.metrics2.lib.MutableGaugeLong; @@ -230,9 +232,74 @@ public class CSQueueMetrics extends QueueMetrics { } } + @Metrics(context="dummymetricssystem") + public static class DummyMetricsSystemImpl extends MetricsSystem { + @Override + public MetricsSystem init(String prefix) { + return this; + } + + @Override + public <T> T register(String name, String desc, T source) { + MetricsAnnotations.newSourceBuilder(source).build(); + return source; + } + + @Override + public void unregisterSource(String name) { + } + + @Override + public 
MetricsSource getSource(String name) { + return null; + } + + @Override + public <T extends MetricsSink> T register(String name, String desc, T sink) { + return null; + } + + @Override + public void register(Callback callback) { + } + + @Override + public void publishMetricsNow() { + } + + @Override + public boolean shutdown() { + return false; + } + + @Override + public void start() { + } + + @Override + public void stop() { + } + + @Override + public void startMetricsMBeans() { + } + + @Override + public void stopMetricsMBeans() { + } + + @Override + public String currentConfig() { + return null; + } + } + public synchronized static CSQueueMetrics forQueue(String queueName, Queue parent, boolean enableUserMetrics, Configuration conf) { - MetricsSystem ms = DefaultMetricsSystem.instance(); + final boolean isConfigValidation = isConfigurationValidationSet(conf); + + MetricsSystem ms = isConfigValidation + ? new DummyMetricsSystemImpl() : DefaultMetricsSystem.instance(); QueueMetrics metrics = getQueueMetrics().get(queueName); if (metrics == null) { metrics = @@ -241,15 +308,14 @@ public class CSQueueMetrics extends QueueMetrics { // Register with the MetricsSystems if (ms != null) { - MetricsSource source = ms.getSource(sourceName(queueName).toString()); - if (source != null) { - ms.unregisterSource(sourceName(queueName).toString()); - } metrics = ms.register(sourceName(queueName).toString(), "Metrics for queue: " + queueName, metrics); } - getQueueMetrics().put(queueName, metrics); + + if (!isConfigValidation) { + getQueueMetrics().put(queueName, metrics); + } } return (CSQueueMetrics) metrics; @@ -265,7 +331,7 @@ public class CSQueueMetrics extends QueueMetrics { metrics = new CSQueueMetrics(metricsSystem, queueName, null, false, conf); users.put(userName, metrics); - registerMetrics( + metricsSystem.register( sourceName(queueName).append(",user=").append(userName).toString(), "Metrics for user '" + userName + "' in queue '" + queueName + "'", ((CSQueueMetrics) metrics.tag(QUEUE_INFO, 
queueName)).tag(USER_INFO, diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigValidator.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigValidator.java index 396527aabb..e40e978a80 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigValidator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerConfigValidator.java @@ -41,8 +41,13 @@ public final class CapacitySchedulerConfigValidator { } public static boolean validateCSConfiguration( - final Configuration oldConf, final Configuration newConf, + final Configuration oldConfParam, final Configuration newConf, final RMContext rmContext) throws IOException { + // ensure that the oldConf is deep copied + Configuration oldConf = new Configuration(oldConfParam); + QueueMetrics.setConfigurationValidation(oldConf, true); + QueueMetrics.setConfigurationValidation(newConf, true); + CapacityScheduler liveScheduler = (CapacityScheduler) rmContext.getScheduler(); CapacityScheduler newCs = new CapacityScheduler(); try { @@ -56,8 +61,6 @@ public final class CapacitySchedulerConfigValidator { return true; } finally { newCs.stop(); - QueueMetrics.clearQueueMetrics(); - liveScheduler.resetSchedulerMetrics(); } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerConfigValidator.java 
b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerConfigValidator.java index 1bee66eb9d..84ad3b9f16 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerConfigValidator.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacitySchedulerConfigValidator.java @@ -19,6 +19,9 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity; import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap; import org.apache.hadoop.yarn.LocalConfigurationProvider; import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes; @@ -32,6 +35,7 @@ import org.apache.hadoop.yarn.server.resourcemanager.MockRM; import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler; import org.apache.hadoop.yarn.util.YarnVersionInfo; import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator; @@ -425,6 +429,98 @@ public class TestCapacitySchedulerConfigValidator { Assert.assertTrue(isValidConfig); } + @Test + public void testValidateDoesNotModifyTheDefaultMetricsSystem() throws Exception { + try { + YarnConfiguration conf = 
new YarnConfiguration(CapacitySchedulerConfigGeneratorForTest + .createBasicCSConfiguration()); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + mockRM = new MockRM(conf); + + cs = (CapacityScheduler) mockRM.getResourceScheduler(); + mockRM.start(); + cs.start(); + RMContext rmContext = mockRM.getRMContext(); + Configuration oldConfig = cs.getConfig(); + + final Map<String, QueueMetrics> cache = QueueMetrics.getQueueMetrics(); + final MetricsSystem ms = DefaultMetricsSystem.instance(); + + QueueMetrics origQM1 = cache.get("root.test1"); + QueueMetrics origQM2 = cache.get("root.test2"); + Assert.assertNotNull("Original queues should be found in the cache", origQM1); + Assert.assertNotNull("Original queues should be found in the cache", origQM2); + + QueueMetrics origPQM1 = cache.get("default.root.test1"); + QueueMetrics origPQM2 = cache.get("default.root.test2"); + Assert.assertNotNull("Original queues should be found in the cache (PartitionQueueMetrics)", + origPQM1); + Assert.assertNotNull("Original queues should be found in the cache (PartitionQueueMetrics)", + origPQM2); + + MetricsSource origMS1 = + ms.getSource("QueueMetrics,q0=root,q1=test1"); + MetricsSource origMS2 = + ms.getSource("QueueMetrics,q0=root,q1=test2"); + Assert.assertNotNull("Original queues should be found in the Metrics System", + origMS1); + Assert.assertNotNull("Original queues should be found in the Metrics System", + origMS2); + + MetricsSource origPMS1 = ms + .getSource("PartitionQueueMetrics,partition=,q0=root,q1=test1"); + MetricsSource origPMS2 = ms + .getSource("PartitionQueueMetrics,partition=,q0=root,q1=test2"); + Assert.assertNotNull( + "Original queues should be found in Metrics System (PartitionQueueMetrics)", origPMS1); + Assert.assertNotNull( + "Original queues should be found in Metrics System (PartitionQueueMetrics)", origPMS2); + + Configuration newConfig = new Configuration(oldConfig); + newConfig + 
.set("yarn.scheduler.capacity.root.queues", "test1, test2, test3"); + newConfig + .set("yarn.scheduler.capacity.root.test3.state", "RUNNING"); + newConfig + .set("yarn.scheduler.capacity.root.test3.capacity", "30"); + newConfig + .set("yarn.scheduler.capacity.root.test1.capacity", "20"); + + boolean isValidConfig = CapacitySchedulerConfigValidator + .validateCSConfiguration(oldConfig, newConfig, rmContext); + Assert.assertTrue(isValidConfig); + + Assert.assertFalse("Validated new queue should not be in the cache", + cache.containsKey("root.test3")); + Assert.assertFalse("Validated new queue should not be in the cache (PartitionQueueMetrics)", + cache.containsKey("default.root.test3")); + Assert.assertNull("Validated new queue should not be in the Metrics System", + ms.getSource("QueueMetrics,q0=root,q1=test3")); + Assert.assertNull( + "Validated new queue should not be in Metrics System (PartitionQueueMetrics)", + ms + .getSource("PartitionQueueMetrics,partition=,q0=root,q1=test3")); + + // Config validation should not change the existing + // objects in the cache and the metrics system + Assert.assertEquals(origQM1, cache.get("root.test1")); + Assert.assertEquals(origQM2, cache.get("root.test2")); + Assert.assertEquals(origPQM1, cache.get("default.root.test1")); + Assert.assertEquals(origPQM2, cache.get("default.root.test2")); + Assert.assertEquals(origMS1, + ms.getSource("QueueMetrics,q0=root,q1=test1")); + Assert.assertEquals(origMS2, + ms.getSource("QueueMetrics,q0=root,q1=test2")); + Assert.assertEquals(origPMS1, + ms.getSource("PartitionQueueMetrics,partition=,q0=root,q1=test1")); + Assert.assertEquals(origPMS2, + ms.getSource("PartitionQueueMetrics,partition=,q0=root,q1=test2")); + } finally { + mockRM.stop(); + } + } + /** * Delete a running queue. */