diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java index f43131809a..02eaa7bd9b 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/PartitionQueueMetrics.java @@ -40,6 +40,7 @@ protected PartitionQueueMetrics(MetricsSystem ms, String queueName, String parentMetricName = partition + METRIC_NAME_DELIMITER + newQueueName; setParent(getQueueMetrics().get(parentMetricName)); + storedPartitionMetrics = null; } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java index 6f9b1ab47b..3e6a1d7d71 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/QueueMetrics.java @@ -22,7 +22,9 @@ import java.util.ArrayList; import java.util.HashMap; +import java.util.HashSet; import java.util.Map; +import java.util.Set; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience.Private; @@ -40,12 +42,14 @@ import org.apache.hadoop.metrics2.lib.MutableGaugeInt; import org.apache.hadoop.metrics2.lib.MutableGaugeLong; import org.apache.hadoop.metrics2.lib.MutableRate; +import org.apache.hadoop.util.Sets; import org.apache.hadoop.yarn.api.records.ApplicationId; import org.apache.hadoop.yarn.api.records.Resource; import org.apache.hadoop.yarn.conf.YarnConfiguration; import org.apache.hadoop.yarn.metrics.CustomResourceMetricValue; import org.apache.hadoop.yarn.server.resourcemanager.nodelabels.RMNodeLabelsManager; import org.apache.hadoop.yarn.server.resourcemanager.rmapp.RMAppState; +import org.apache.hadoop.yarn.server.resourcemanager.webapp.dao.AppAttemptsInfo; import org.apache.hadoop.yarn.util.resource.ResourceUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -133,7 +137,7 @@ public class QueueMetrics implements MetricsSource { protected final MetricsRegistry registry; protected final String queueName; private QueueMetrics parent; - private final Queue parentQueue; + private Queue parentQueue; protected final MetricsSystem metricsSystem; protected final Map users; protected final Configuration conf; @@ -177,6 +181,7 @@ public class QueueMetrics implements MetricsSource { "AggregatePreemptedSeconds."; private static final String AGGREGATE_PREEMPTED_SECONDS_METRIC_DESC = "Aggregate Preempted Seconds for NAME"; + protected Set storedPartitionMetrics = Sets.newConcurrentHashSet(); public QueueMetrics(MetricsSystem ms, String queueName, Queue parent, boolean enableUserMetrics, Configuration conf) { @@ -338,6 +343,7 @@ public synchronized QueueMetrics getPartitionQueueMetrics(String partition) { queueMetrics.tag(PARTITION_INFO, partitionJMXStr).tag(QUEUE_INFO, this.queueName)); getQueueMetrics().put(metricName, queueMetrics); + registerPartitionMetricsCreation(metricName); return queueMetrics; } else { return metrics; @@ -380,6 +386,7 @@ private QueueMetrics getPartitionMetrics(String partition) { partitionJMXStr)); } getQueueMetrics().put(metricName, metrics); + registerPartitionMetricsCreation(metricName); } return metrics; } @@ -1332,4 +1339,26 @@ public void setParent(QueueMetrics parent) { public Queue getParentQueue() { return parentQueue; } + + protected void registerPartitionMetricsCreation(String metricName) { + if (storedPartitionMetrics != null) { + storedPartitionMetrics.add(metricName); + } + } + + public void setParentQueue(Queue parentQueue) { + this.parentQueue = parentQueue; + + if (storedPartitionMetrics == null) { + return; + } + + for (String partitionMetric : storedPartitionMetrics) { + QueueMetrics metric = getQueueMetrics().get(partitionMetric); + + if (metric != null && metric.parentQueue != null) { + metric.parentQueue = parentQueue; + } + } + } } \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java index 124131391e..1a5a1ce0fd 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/AbstractCSQueue.java @@ -262,6 +262,7 @@ public CSQueue getParent() { @Override public void setParent(CSQueue newParentQueue) { this.parent = newParentQueue; + getMetrics().setParentQueue(newParentQueue); } @Override diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueHelpers.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueHelpers.java index 7e362731f8..1adde7d8d1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueHelpers.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/CapacitySchedulerQueueHelpers.java @@ -90,6 +90,35 @@ public static CapacitySchedulerConfiguration setupQueueConfiguration( return conf; } + public static CapacitySchedulerConfiguration setupAdditionalQueues( + CapacitySchedulerConfiguration conf) { + + // Define top-level queues + conf.setQueues(CapacitySchedulerConfiguration.ROOT, new String[]{"a", "b"}); + + conf.setCapacity(A, A_CAPACITY); + conf.setCapacity(B, B_CAPACITY); + + // Define 2nd-level queues + conf.setQueues(A, new String[]{"a1", "a2", "a3"}); + conf.setCapacity(A1, 30.0f); + conf.setUserLimitFactor(A1, 100.0f); + conf.setCapacity(A2, 30.0f); + conf.setUserLimitFactor(A2, 100.0f); + conf.setCapacity("root.a.a3", 40.0f); + conf.setUserLimitFactor("root.a.a3", 100.0f); + + conf.setQueues(B, new String[]{"b1", "b2", "b3"}); + conf.setCapacity(B1, B1_CAPACITY); + conf.setUserLimitFactor(B1, 100.0f); + conf.setCapacity(B2, B2_CAPACITY); + conf.setUserLimitFactor(B2, 100.0f); + conf.setCapacity(B3, B3_CAPACITY); + conf.setUserLimitFactor(B3, 100.0f); + + return conf; + } + /** * @param conf, to be modified * @return CS configuration which has deleted all children of queue(b) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 4a9e45e756..7c407dfd4a 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -24,6 +24,7 @@ import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMinAllocMb; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerConfigGeneratorForTest.setMinAllocVcores; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.findQueue; +import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupAdditionalQueues; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupBlockedQueueConfiguration; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupOtherBlockedQueueConfiguration; import static org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacitySchedulerQueueHelpers.setupQueueConfiguration; @@ -81,6 +82,7 @@ import org.apache.hadoop.util.Sets; import org.apache.hadoop.service.ServiceStateException; import org.apache.hadoop.yarn.server.api.records.NodeStatus; +import org.apache.hadoop.yarn.server.resourcemanager.scheduler.QueueMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.apache.hadoop.conf.Configuration; @@ -2663,6 +2665,64 @@ public void testContainerAllocationLocalitySkipped() throws Exception { ContainerAllocation.QUEUE_SKIPPED.getAllocationState()); } + /** + * Tests + * @throws Exception + */ + @Test + public void testCSQueueMetricsDoesNotLeakOnReinit() throws Exception { + // Initialize resource map + Map riMap = new HashMap<>(); + + // Initialize mandatory resources + ResourceInformation memory = + ResourceInformation.newInstance(ResourceInformation.MEMORY_MB.getName(), + ResourceInformation.MEMORY_MB.getUnits(), + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_MB, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_MB); + ResourceInformation vcores = + ResourceInformation.newInstance(ResourceInformation.VCORES.getName(), + ResourceInformation.VCORES.getUnits(), + YarnConfiguration.DEFAULT_RM_SCHEDULER_MINIMUM_ALLOCATION_VCORES, + YarnConfiguration.DEFAULT_RM_SCHEDULER_MAXIMUM_ALLOCATION_VCORES); + riMap.put(ResourceInformation.MEMORY_URI, memory); + riMap.put(ResourceInformation.VCORES_URI, vcores); + + ResourceUtils.initializeResourcesFromResourceInformationMap(riMap); + + CapacitySchedulerConfiguration csConf = + new CapacitySchedulerConfiguration(); + csConf.setResourceComparator(DominantResourceCalculator.class); + + setupQueueConfiguration(csConf); + + YarnConfiguration conf = new YarnConfiguration(csConf); + + // Don't reset resource types since we have already configured resource + // types + conf.setBoolean(TestResourceProfiles.TEST_CONF_RESET_RESOURCE_TYPES, false); + conf.setClass(YarnConfiguration.RM_SCHEDULER, CapacityScheduler.class, + ResourceScheduler.class); + + MockRM rm = new MockRM(conf); + rm.start(); + + CapacityScheduler cs = (CapacityScheduler) rm.getResourceScheduler(); + csConf = new CapacitySchedulerConfiguration(); + setupAdditionalQueues(csConf); + cs.reinitialize(csConf, cs.getRMContext()); + QueueMetrics a3DefaultPartitionMetrics = QueueMetrics.getQueueMetrics().get( + "default.root.a.a3"); + + Assert.assertSame("Different ParentQueue of siblings is a sign of a memory leak", + QueueMetrics.getQueueMetrics().get("root.a.a1").getParentQueue(), + QueueMetrics.getQueueMetrics().get("root.a.a3").getParentQueue()); + + Assert.assertSame("Different ParentQueue of partition metrics is a sign of a memory leak", + QueueMetrics.getQueueMetrics().get("root.a.a1").getParentQueue(), + a3DefaultPartitionMetrics.getParentQueue()); + } + @Test public void testCSQueueMetrics() throws Exception {