YARN-11490. Reverting YARN-11211 and eliminating the use of DefaultMetricsSystem during configuration validation (#5644)
This commit is contained in:
parent
a98d15804a
commit
aeb3f6f1a8
@ -78,7 +78,7 @@ public synchronized QueueMetrics getUserMetrics(String userName) {
|
||||
metrics = new PartitionQueueMetrics(this.metricsSystem, this.queueName,
|
||||
null, false, this.conf, this.partition);
|
||||
users.put(userName, metrics);
|
||||
registerMetrics(
|
||||
metricsSystem.register(
|
||||
pSourceName(partitionJMXStr).append(qSourceName(queueName))
|
||||
.append(",user=").append(userName).toString(),
|
||||
"Metrics for user '" + userName + "' in queue '" + queueName + "'",
|
||||
|
@ -117,6 +117,9 @@ public class QueueMetrics implements MetricsSource {
|
||||
@Metric("Reserved CPU in virtual cores") MutableGaugeInt reservedVCores;
|
||||
@Metric("# of reserved containers") MutableGaugeInt reservedContainers;
|
||||
|
||||
// INTERNAL ONLY
|
||||
private static final String CONFIGURATION_VALIDATION = "yarn.configuration-validation";
|
||||
|
||||
private final MutableGaugeInt[] runningTime;
|
||||
private TimeBucketMetrics<ApplicationId> runBuckets;
|
||||
|
||||
@ -289,7 +292,7 @@ public synchronized QueueMetrics getUserMetrics(String userName) {
|
||||
metrics =
|
||||
new QueueMetrics(metricsSystem, queueName, null, false, conf);
|
||||
users.put(userName, metrics);
|
||||
registerMetrics(
|
||||
metricsSystem.register(
|
||||
sourceName(queueName).append(",user=").append(userName).toString(),
|
||||
"Metrics for user '"+ userName +"' in queue '"+ queueName +"'",
|
||||
metrics.tag(QUEUE_INFO, queueName).tag(USER_INFO, userName));
|
||||
@ -334,13 +337,15 @@ public synchronized QueueMetrics getPartitionQueueMetrics(String partition) {
|
||||
QueueMetrics queueMetrics =
|
||||
new PartitionQueueMetrics(metricsSystem, this.queueName, parentQueue,
|
||||
this.enableUserMetrics, this.conf, partition);
|
||||
registerMetrics(
|
||||
metricsSystem.register(
|
||||
pSourceName(partitionJMXStr).append(qSourceName(this.queueName))
|
||||
.toString(),
|
||||
"Metrics for queue: " + this.queueName,
|
||||
queueMetrics.tag(PARTITION_INFO, partitionJMXStr).tag(QUEUE_INFO,
|
||||
this.queueName));
|
||||
getQueueMetrics().put(metricName, queueMetrics);
|
||||
if (!isConfigurationValidationSet(conf)) {
|
||||
getQueueMetrics().put(metricName, queueMetrics);
|
||||
}
|
||||
registerPartitionMetricsCreation(metricName);
|
||||
return queueMetrics;
|
||||
} else {
|
||||
@ -348,6 +353,26 @@ public synchronized QueueMetrics getPartitionQueueMetrics(String partition) {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Check whether we are in a configuration validation mode. INTERNAL ONLY.
|
||||
*
|
||||
* @param conf the configuration to check
|
||||
* @return true if
|
||||
*/
|
||||
public static boolean isConfigurationValidationSet(Configuration conf) {
|
||||
return conf.getBoolean(CONFIGURATION_VALIDATION, false);
|
||||
}
|
||||
|
||||
/**
|
||||
* Set configuration validation mode. INTERNAL ONLY.
|
||||
*
|
||||
* @param conf the configuration to update
|
||||
* @param value the value for the validation mode
|
||||
*/
|
||||
public static void setConfigurationValidation(Configuration conf, boolean value) {
|
||||
conf.setBoolean(CONFIGURATION_VALIDATION, value);
|
||||
}
|
||||
|
||||
/**
|
||||
* Partition Metrics
|
||||
*
|
||||
@ -378,7 +403,7 @@ private QueueMetrics getPartitionMetrics(String partition) {
|
||||
|
||||
// Register with the MetricsSystems
|
||||
if (metricsSystem != null) {
|
||||
registerMetrics(pSourceName(partitionJMXStr).toString(),
|
||||
metricsSystem.register(pSourceName(partitionJMXStr).toString(),
|
||||
"Metrics for partition: " + partitionJMXStr,
|
||||
(PartitionQueueMetrics) metrics.tag(PARTITION_INFO,
|
||||
partitionJMXStr));
|
||||
@ -1359,15 +1384,4 @@ public void setParentQueue(Queue parentQueue) {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
protected void registerMetrics(String sourceName, String desc, QueueMetrics metrics) {
|
||||
MetricsSource source = metricsSystem.getSource(sourceName);
|
||||
// Unregister metrics if a source is already present
|
||||
if (source != null) {
|
||||
LOG.info("Unregistering source " + sourceName);
|
||||
metricsSystem.unregisterSource(sourceName);
|
||||
}
|
||||
|
||||
metricsSystem.register(sourceName, desc, metrics);
|
||||
}
|
||||
}
|
@ -21,11 +21,13 @@
|
||||
import java.util.Map;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.metrics2.MetricsSink;
|
||||
import org.apache.hadoop.metrics2.MetricsSource;
|
||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||
import org.apache.hadoop.metrics2.annotation.Metrics;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.metrics2.lib.MetricsAnnotations;
|
||||
import org.apache.hadoop.metrics2.lib.MutableGaugeFloat;
|
||||
import org.apache.hadoop.metrics2.lib.MutableGaugeInt;
|
||||
import org.apache.hadoop.metrics2.lib.MutableGaugeLong;
|
||||
@ -230,9 +232,74 @@ protected void createQueueMetricsForCustomResources() {
|
||||
}
|
||||
}
|
||||
|
||||
@Metrics(context="dummymetricssystem")
|
||||
public static class DummyMetricsSystemImpl extends MetricsSystem {
|
||||
@Override
|
||||
public MetricsSystem init(String prefix) {
|
||||
return this;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T> T register(String name, String desc, T source) {
|
||||
MetricsAnnotations.newSourceBuilder(source).build();
|
||||
return source;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void unregisterSource(String name) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public MetricsSource getSource(String name) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public <T extends MetricsSink> T register(String name, String desc, T sink) {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void register(Callback callback) {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void publishMetricsNow() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public boolean shutdown() {
|
||||
return false;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void start() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stop() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void startMetricsMBeans() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public void stopMetricsMBeans() {
|
||||
}
|
||||
|
||||
@Override
|
||||
public String currentConfig() {
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
||||
public synchronized static CSQueueMetrics forQueue(String queueName,
|
||||
Queue parent, boolean enableUserMetrics, Configuration conf) {
|
||||
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||
final boolean isConfigValidation = isConfigurationValidationSet(conf);
|
||||
|
||||
MetricsSystem ms = isConfigValidation
|
||||
? new DummyMetricsSystemImpl() : DefaultMetricsSystem.instance();
|
||||
QueueMetrics metrics = getQueueMetrics().get(queueName);
|
||||
if (metrics == null) {
|
||||
metrics =
|
||||
@ -241,15 +308,14 @@ public synchronized static CSQueueMetrics forQueue(String queueName,
|
||||
|
||||
// Register with the MetricsSystems
|
||||
if (ms != null) {
|
||||
MetricsSource source = ms.getSource(sourceName(queueName).toString());
|
||||
if (source != null) {
|
||||
ms.unregisterSource(sourceName(queueName).toString());
|
||||
}
|
||||
metrics =
|
||||
ms.register(sourceName(queueName).toString(), "Metrics for queue: "
|
||||
+ queueName, metrics);
|
||||
}
|
||||
getQueueMetrics().put(queueName, metrics);
|
||||
|
||||
if (!isConfigValidation) {
|
||||
getQueueMetrics().put(queueName, metrics);
|
||||
}
|
||||
}
|
||||
|
||||
return (CSQueueMetrics) metrics;
|
||||
@ -265,7 +331,7 @@ public synchronized QueueMetrics getUserMetrics(String userName) {
|
||||
metrics =
|
||||
new CSQueueMetrics(metricsSystem, queueName, null, false, conf);
|
||||
users.put(userName, metrics);
|
||||
registerMetrics(
|
||||
metricsSystem.register(
|
||||
sourceName(queueName).append(",user=").append(userName).toString(),
|
||||
"Metrics for user '" + userName + "' in queue '" + queueName + "'",
|
||||
((CSQueueMetrics) metrics.tag(QUEUE_INFO, queueName)).tag(USER_INFO,
|
||||
|
@ -41,8 +41,13 @@ private CapacitySchedulerConfigValidator() {
|
||||
}
|
||||
|
||||
public static boolean validateCSConfiguration(
|
||||
final Configuration oldConf, final Configuration newConf,
|
||||
final Configuration oldConfParam, final Configuration newConf,
|
||||
final RMContext rmContext) throws IOException {
|
||||
// ensure that the oldConf is deep copied
|
||||
Configuration oldConf = new Configuration(oldConfParam);
|
||||
QueueMetrics.setConfigurationValidation(oldConf, true);
|
||||
QueueMetrics.setConfigurationValidation(newConf, true);
|
||||
|
||||
CapacityScheduler liveScheduler = (CapacityScheduler) rmContext.getScheduler();
|
||||
CapacityScheduler newCs = new CapacityScheduler();
|
||||
try {
|
||||
@ -56,8 +61,6 @@ public static boolean validateCSConfiguration(
|
||||
return true;
|
||||
} finally {
|
||||
newCs.stop();
|
||||
QueueMetrics.clearQueueMetrics();
|
||||
liveScheduler.resetSchedulerMetrics();
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -19,6 +19,9 @@
|
||||
package org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity;
|
||||
|
||||
import org.apache.hadoop.conf.Configuration;
|
||||
import org.apache.hadoop.metrics2.MetricsSource;
|
||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.collect.ImmutableMap;
|
||||
import org.apache.hadoop.yarn.LocalConfigurationProvider;
|
||||
import org.apache.hadoop.yarn.api.protocolrecords.ResourceTypes;
|
||||
@ -32,6 +35,7 @@
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.placement.PlacementManager;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
|
||||
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.ResourceScheduler;
|
||||
import org.apache.hadoop.yarn.util.YarnVersionInfo;
|
||||
import org.apache.hadoop.yarn.util.resource.DominantResourceCalculator;
|
||||
@ -425,6 +429,98 @@ public void testValidateCSConfigAddALeafQueueValid() throws IOException {
|
||||
Assert.assertTrue(isValidConfig);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testValidateDoesNotModifyTheDefaultMetricsSystem() throws Exception {
|
||||
try {
|
||||
YarnConfiguration conf = new YarnConfiguration(CapacitySchedulerConfigGeneratorForTest
|
||||
.createBasicCSConfiguration());
|
||||
conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class,
|
||||
ResourceScheduler.class);
|
||||
mockRM = new MockRM(conf);
|
||||
|
||||
cs = (CapacityScheduler) mockRM.getResourceScheduler();
|
||||
mockRM.start();
|
||||
cs.start();
|
||||
RMContext rmContext = mockRM.getRMContext();
|
||||
Configuration oldConfig = cs.getConfig();
|
||||
|
||||
final Map<String, QueueMetrics> cache = QueueMetrics.getQueueMetrics();
|
||||
final MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||
|
||||
QueueMetrics origQM1 = cache.get("root.test1");
|
||||
QueueMetrics origQM2 = cache.get("root.test2");
|
||||
Assert.assertNotNull("Original queues should be found in the cache", origQM1);
|
||||
Assert.assertNotNull("Original queues should be found in the cache", origQM2);
|
||||
|
||||
QueueMetrics origPQM1 = cache.get("default.root.test1");
|
||||
QueueMetrics origPQM2 = cache.get("default.root.test2");
|
||||
Assert.assertNotNull("Original queues should be found in the cache (PartitionQueueMetrics)",
|
||||
origPQM1);
|
||||
Assert.assertNotNull("Original queues should be found in the cache (PartitionQueueMetrics)",
|
||||
origPQM2);
|
||||
|
||||
MetricsSource origMS1 =
|
||||
ms.getSource("QueueMetrics,q0=root,q1=test1");
|
||||
MetricsSource origMS2 =
|
||||
ms.getSource("QueueMetrics,q0=root,q1=test2");
|
||||
Assert.assertNotNull("Original queues should be found in the Metrics System",
|
||||
origMS1);
|
||||
Assert.assertNotNull("Original queues should be found in the Metrics System",
|
||||
origMS2);
|
||||
|
||||
MetricsSource origPMS1 = ms
|
||||
.getSource("PartitionQueueMetrics,partition=,q0=root,q1=test1");
|
||||
MetricsSource origPMS2 = ms
|
||||
.getSource("PartitionQueueMetrics,partition=,q0=root,q1=test2");
|
||||
Assert.assertNotNull(
|
||||
"Original queues should be found in Metrics System (PartitionQueueMetrics)", origPMS1);
|
||||
Assert.assertNotNull(
|
||||
"Original queues should be found in Metrics System (PartitionQueueMetrics)", origPMS2);
|
||||
|
||||
Configuration newConfig = new Configuration(oldConfig);
|
||||
newConfig
|
||||
.set("yarn.scheduler.capacity.root.queues", "test1, test2, test3");
|
||||
newConfig
|
||||
.set("yarn.scheduler.capacity.root.test3.state", "RUNNING");
|
||||
newConfig
|
||||
.set("yarn.scheduler.capacity.root.test3.capacity", "30");
|
||||
newConfig
|
||||
.set("yarn.scheduler.capacity.root.test1.capacity", "20");
|
||||
|
||||
boolean isValidConfig = CapacitySchedulerConfigValidator
|
||||
.validateCSConfiguration(oldConfig, newConfig, rmContext);
|
||||
Assert.assertTrue(isValidConfig);
|
||||
|
||||
Assert.assertFalse("Validated new queue should not be in the cache",
|
||||
cache.containsKey("root.test3"));
|
||||
Assert.assertFalse("Validated new queue should not be in the cache (PartitionQueueMetrics)",
|
||||
cache.containsKey("default.root.test3"));
|
||||
Assert.assertNull("Validated new queue should not be in the Metrics System",
|
||||
ms.getSource("QueueMetrics,q0=root,q1=test3"));
|
||||
Assert.assertNull(
|
||||
"Validated new queue should not be in Metrics System (PartitionQueueMetrics)",
|
||||
ms
|
||||
.getSource("PartitionQueueMetrics,partition=,q0=root,q1=test3"));
|
||||
|
||||
// Config validation should not change the existing
|
||||
// objects in the cache and the metrics system
|
||||
Assert.assertEquals(origQM1, cache.get("root.test1"));
|
||||
Assert.assertEquals(origQM2, cache.get("root.test2"));
|
||||
Assert.assertEquals(origPQM1, cache.get("default.root.test1"));
|
||||
Assert.assertEquals(origPQM1, cache.get("default.root.test1"));
|
||||
Assert.assertEquals(origMS1,
|
||||
ms.getSource("QueueMetrics,q0=root,q1=test1"));
|
||||
Assert.assertEquals(origMS2,
|
||||
ms.getSource("QueueMetrics,q0=root,q1=test2"));
|
||||
Assert.assertEquals(origPMS1,
|
||||
ms.getSource("PartitionQueueMetrics,partition=,q0=root,q1=test1"));
|
||||
Assert.assertEquals(origPMS2,
|
||||
ms.getSource("PartitionQueueMetrics,partition=,q0=root,q1=test2"));
|
||||
} finally {
|
||||
mockRM.stop();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Delete a running queue.
|
||||
*/
|
||||
|
Loading…
Reference in New Issue
Block a user