YARN-11211. QueueMetrics leaks Configuration objects when validation API is called multiple times. Contributed by Andras Gyori

This commit is contained in:
Szilard Nemeth 2022-07-21 14:20:34 +02:00
parent bac2219e3c
commit f4b635c4dc
4 changed files with 24 additions and 5 deletions

View File

@ -78,7 +78,7 @@ public synchronized QueueMetrics getUserMetrics(String userName) {
metrics = new PartitionQueueMetrics(this.metricsSystem, this.queueName, metrics = new PartitionQueueMetrics(this.metricsSystem, this.queueName,
null, false, this.conf, this.partition); null, false, this.conf, this.partition);
users.put(userName, metrics); users.put(userName, metrics);
metricsSystem.register( registerMetrics(
pSourceName(partitionJMXStr).append(qSourceName(queueName)) pSourceName(partitionJMXStr).append(qSourceName(queueName))
.append(",user=").append(userName).toString(), .append(",user=").append(userName).toString(),
"Metrics for user '" + userName + "' in queue '" + queueName + "'", "Metrics for user '" + userName + "' in queue '" + queueName + "'",

View File

@ -289,7 +289,7 @@ public synchronized QueueMetrics getUserMetrics(String userName) {
metrics = metrics =
new QueueMetrics(metricsSystem, queueName, null, false, conf); new QueueMetrics(metricsSystem, queueName, null, false, conf);
users.put(userName, metrics); users.put(userName, metrics);
metricsSystem.register( registerMetrics(
sourceName(queueName).append(",user=").append(userName).toString(), sourceName(queueName).append(",user=").append(userName).toString(),
"Metrics for user '"+ userName +"' in queue '"+ queueName +"'", "Metrics for user '"+ userName +"' in queue '"+ queueName +"'",
metrics.tag(QUEUE_INFO, queueName).tag(USER_INFO, userName)); metrics.tag(QUEUE_INFO, queueName).tag(USER_INFO, userName));
@ -334,7 +334,7 @@ public synchronized QueueMetrics getPartitionQueueMetrics(String partition) {
QueueMetrics queueMetrics = QueueMetrics queueMetrics =
new PartitionQueueMetrics(metricsSystem, this.queueName, parentQueue, new PartitionQueueMetrics(metricsSystem, this.queueName, parentQueue,
this.enableUserMetrics, this.conf, partition); this.enableUserMetrics, this.conf, partition);
metricsSystem.register( registerMetrics(
pSourceName(partitionJMXStr).append(qSourceName(this.queueName)) pSourceName(partitionJMXStr).append(qSourceName(this.queueName))
.toString(), .toString(),
"Metrics for queue: " + this.queueName, "Metrics for queue: " + this.queueName,
@ -378,7 +378,7 @@ private QueueMetrics getPartitionMetrics(String partition) {
// Register with the MetricsSystems // Register with the MetricsSystems
if (metricsSystem != null) { if (metricsSystem != null) {
metricsSystem.register(pSourceName(partitionJMXStr).toString(), registerMetrics(pSourceName(partitionJMXStr).toString(),
"Metrics for partition: " + partitionJMXStr, "Metrics for partition: " + partitionJMXStr,
(PartitionQueueMetrics) metrics.tag(PARTITION_INFO, (PartitionQueueMetrics) metrics.tag(PARTITION_INFO,
partitionJMXStr)); partitionJMXStr));
@ -1359,4 +1359,15 @@ public void setParentQueue(Queue parentQueue) {
} }
} }
} }
protected void registerMetrics(String sourceName, String desc, QueueMetrics metrics) {
MetricsSource source = metricsSystem.getSource(sourceName);
// Unregister metrics if a source is already present
if (source != null) {
LOG.info("Unregistering source " + sourceName);
metricsSystem.unregisterSource(sourceName);
}
metricsSystem.register(sourceName, desc, metrics);
}
} }

View File

@ -21,6 +21,7 @@
import java.util.Map; import java.util.Map;
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.MetricsSource;
import org.apache.hadoop.metrics2.MetricsSystem; import org.apache.hadoop.metrics2.MetricsSystem;
import org.apache.hadoop.metrics2.annotation.Metric; import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics; import org.apache.hadoop.metrics2.annotation.Metrics;
@ -240,6 +241,10 @@ public synchronized static CSQueueMetrics forQueue(String queueName,
// Register with the MetricsSystems // Register with the MetricsSystems
if (ms != null) { if (ms != null) {
MetricsSource source = ms.getSource(sourceName(queueName).toString());
if (source != null) {
ms.unregisterSource(sourceName(queueName).toString());
}
metrics = metrics =
ms.register(sourceName(queueName).toString(), "Metrics for queue: " ms.register(sourceName(queueName).toString(), "Metrics for queue: "
+ queueName, metrics); + queueName, metrics);
@ -260,7 +265,7 @@ public synchronized QueueMetrics getUserMetrics(String userName) {
metrics = metrics =
new CSQueueMetrics(metricsSystem, queueName, null, false, conf); new CSQueueMetrics(metricsSystem, queueName, null, false, conf);
users.put(userName, metrics); users.put(userName, metrics);
metricsSystem.register( registerMetrics(
sourceName(queueName).append(",user=").append(userName).toString(), sourceName(queueName).append(",user=").append(userName).toString(),
"Metrics for user '" + userName + "' in queue '" + queueName + "'", "Metrics for user '" + userName + "' in queue '" + queueName + "'",
((CSQueueMetrics) metrics.tag(QUEUE_INFO, queueName)).tag(USER_INFO, ((CSQueueMetrics) metrics.tag(QUEUE_INFO, queueName)).tag(USER_INFO,

View File

@ -23,6 +23,7 @@
import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
import org.apache.hadoop.yarn.server.resourcemanager.RMContext; import org.apache.hadoop.yarn.server.resourcemanager.RMContext;
import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
@ -55,6 +56,8 @@ public static boolean validateCSConfiguration(
return true; return true;
} finally { } finally {
newCs.stop(); newCs.stop();
QueueMetrics.clearQueueMetrics();
liveScheduler.resetSchedulerMetrics();
} }
} }