From 051a5068dd6d505f0b4748f103a13c4938d61cc0 Mon Sep 17 00:00:00 2001 From: Peter Bacsko Date: Tue, 11 May 2021 19:23:45 +0200 Subject: [PATCH] YARN-9615. Add dispatcher metrics to RM. Contributed by Jonathan Hung and Qi Zhu. --- YARN-9615-branch-3.3-001.patch.1 | 732 ++++++++++++++++++ .../hadoop/yarn/event/AsyncDispatcher.java | 26 +- .../hadoop/yarn/event/EventDispatcher.java | 21 +- .../yarn/metrics/DisableEventTypeMetrics.java | 42 + .../hadoop/yarn/metrics/EventTypeMetrics.java | 32 + .../yarn/metrics/GenericEventTypeMetrics.java | 160 ++++ .../yarn/event/TestAsyncDispatcher.java | 182 ++++- .../GenericEventTypeMetricsManager.java | 42 + .../resourcemanager/ResourceManager.java | 11 +- .../resourcemanager/TestResourceManager.java | 2 + .../scheduler/TestSchedulerHealth.java | 2 + .../capacity/TestCapacityScheduler.java | 1 + 12 files changed, 1248 insertions(+), 5 deletions(-) create mode 100644 YARN-9615-branch-3.3-001.patch.1 create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DisableEventTypeMetrics.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/EventTypeMetrics.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/GenericEventTypeMetrics.java create mode 100644 hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/GenericEventTypeMetricsManager.java diff --git a/YARN-9615-branch-3.3-001.patch.1 b/YARN-9615-branch-3.3-001.patch.1 new file mode 100644 index 0000000000..80e9ca92e5 --- /dev/null +++ b/YARN-9615-branch-3.3-001.patch.1 @@ -0,0 +1,732 @@ +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +index f9deab06ff2..667515d00c1 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +@@ -26,6 +26,9 @@ + import java.util.concurrent.BlockingQueue; + import java.util.concurrent.LinkedBlockingQueue; + ++import org.apache.hadoop.yarn.metrics.EventTypeMetrics; ++import org.apache.hadoop.yarn.util.Clock; ++import org.apache.hadoop.yarn.util.MonotonicClock; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + import org.slf4j.Marker; +@@ -85,6 +88,11 @@ + protected final Map, EventHandler> eventDispatchers; + private boolean exitOnDispatchException = true; + ++ private Map, ++ EventTypeMetrics> eventTypeMetricsMap; ++ ++ private Clock clock = new MonotonicClock(); ++ + /** + * The thread name for dispatcher. + */ +@@ -98,6 +106,8 @@ public AsyncDispatcher(BlockingQueue eventQueue) { + super("Dispatcher"); + this.eventQueue = eventQueue; + this.eventDispatchers = new HashMap, EventHandler>(); ++ this.eventTypeMetricsMap = new HashMap, ++ EventTypeMetrics>(); + } + + /** +@@ -135,7 +145,16 @@ public void run() { + return; + } + if (event != null) { +- dispatch(event); ++ if (eventTypeMetricsMap. ++ get(event.getType().getDeclaringClass()) != null) { ++ long startTime = clock.getTime(); ++ dispatch(event); ++ eventTypeMetricsMap.get(event.getType().getDeclaringClass()) ++ .increment(event.getType(), ++ clock.getTime() - startTime); ++ } else { ++ dispatch(event); ++ } + if (printTrigger) { + //Log the latest dispatch event type + // may cause the too many events queued +@@ -369,4 +388,9 @@ protected boolean isDrained() { + protected boolean isStopped() { + return stopped; + } ++ ++ public void addMetrics(EventTypeMetrics metrics, ++ Class eventClass) { ++ eventTypeMetricsMap.put(eventClass, metrics); ++ } + } +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java +index cadb73663a0..849bb402d87 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java +@@ -19,6 +19,9 @@ + package org.apache.hadoop.yarn.event; + + import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; ++import org.apache.hadoop.yarn.metrics.EventTypeMetrics; ++import org.apache.hadoop.yarn.util.Clock; ++import org.apache.hadoop.yarn.util.MonotonicClock; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + import org.slf4j.Marker; +@@ -47,12 +50,15 @@ + private final Thread eventProcessor; + private volatile boolean stopped = false; + private boolean shouldExitOnError = true; ++ private EventTypeMetrics metrics; + + private static final Logger LOG = + LoggerFactory.getLogger(EventDispatcher.class); + private static final Marker FATAL = + MarkerFactory.getMarker("FATAL"); + ++ private Clock clock = new MonotonicClock(); ++ + private final class EventProcessor implements Runnable { + @Override + public void run() { +@@ -68,7 +74,14 @@ public void run() { + } + + try { +- handler.handle(event); ++ if (metrics != null) { ++ long startTime = clock.getTime(); ++ handler.handle(event); ++ metrics.increment(event.getType(), ++ clock.getTime() - startTime); ++ } else { ++ handler.handle(event); ++ } + } catch (Throwable t) { + // An error occurred, but we are shutting down anyway. + // If it was an InterruptedException, the very act of +@@ -136,6 +149,7 @@ public void handle(T event) { + public void disableExitOnError() { + shouldExitOnError = false; + } ++ + protected long getEventProcessorId() { + return this.eventProcessor.getId(); + } +@@ -143,4 +157,9 @@ protected long getEventProcessorId() { + protected boolean isStopped() { + return this.stopped; + } ++ ++ public void setMetrics(EventTypeMetrics metrics) { ++ this.metrics = metrics; ++ } ++ + } +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DisableEventTypeMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DisableEventTypeMetrics.java +new file mode 100644 +index 00000000000..7b4af0c3e09 +--- /dev/null ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DisableEventTypeMetrics.java +@@ -0,0 +1,42 @@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.hadoop.yarn.metrics; ++ ++import org.apache.hadoop.classification.InterfaceAudience; ++import org.apache.hadoop.metrics2.MetricsCollector; ++import org.apache.hadoop.metrics2.annotation.Metrics; ++ ++@InterfaceAudience.Private ++@Metrics(context="yarn") ++public class DisableEventTypeMetrics implements EventTypeMetrics { ++ @Override ++ public void increment(Enum type, long processingTimeUs) { ++ //nop ++ return; ++ } ++ @Override ++ public void getMetrics(MetricsCollector collector, boolean all) { ++ //nop ++ return; ++ } ++ ++ @Override ++ public long get(Enum type) { ++ return 0; ++ } ++} +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/EventTypeMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/EventTypeMetrics.java +new file mode 100644 +index 00000000000..7a7e4f54890 +--- /dev/null ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/EventTypeMetrics.java +@@ -0,0 +1,32 @@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.hadoop.yarn.metrics; ++ ++import org.apache.hadoop.classification.InterfaceAudience; ++import org.apache.hadoop.metrics2.MetricsSource; ++import org.apache.hadoop.metrics2.annotation.Metrics; ++ ++@InterfaceAudience.Private ++@Metrics(context="yarn") ++public interface EventTypeMetrics> ++ extends MetricsSource { ++ ++ void increment(T type, long processingTimeUs); ++ ++ long get(T type); ++} +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/GenericEventTypeMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/GenericEventTypeMetrics.java +new file mode 100644 +index 00000000000..464edb27782 +--- /dev/null ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/GenericEventTypeMetrics.java +@@ -0,0 +1,160 @@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.hadoop.yarn.metrics; ++ ++import org.apache.hadoop.classification.InterfaceAudience; ++import org.apache.hadoop.metrics2.MetricsCollector; ++import org.apache.hadoop.metrics2.MetricsInfo; ++import org.apache.hadoop.metrics2.MetricsSystem; ++import org.apache.hadoop.metrics2.annotation.Metrics; ++import org.apache.hadoop.metrics2.lib.MetricsRegistry; ++import org.apache.hadoop.metrics2.lib.MutableGaugeLong; ++import org.slf4j.Logger; ++import org.slf4j.LoggerFactory; ++ ++import java.util.EnumMap; ++ ++@InterfaceAudience.Private ++@Metrics(context="yarn") ++public class GenericEventTypeMetrics> ++ implements EventTypeMetrics { ++ ++ static final Logger LOG = ++ LoggerFactory.getLogger(GenericEventTypeMetrics.class); ++ ++ private final EnumMap eventCountMetrics; ++ private final EnumMap processingTimeMetrics; ++ private final MetricsRegistry registry; ++ private final MetricsSystem ms; ++ private final MetricsInfo info; ++ private final Class enumClass; ++ ++ private boolean isInitialized = false; ++ ++ public GenericEventTypeMetrics(MetricsInfo info, MetricsSystem ms, ++ final T[] enums, Class enumClass) { ++ this.enumClass = enumClass; ++ this.eventCountMetrics = new EnumMap<>(this.enumClass); ++ this.processingTimeMetrics = new EnumMap<>(this.enumClass); ++ this.ms = ms; ++ this.info = info; ++ this.registry = new MetricsRegistry(this.info); ++ ++ //Initialize enum ++ for (final T type : enums) { ++ String eventCountMetricsName = ++ type.toString() + "_" + "event_count"; ++ String processingTimeMetricsName = ++ type.toString() + "_" + "processing_time"; ++ eventCountMetrics.put(type, this.registry. ++ newGauge(eventCountMetricsName, eventCountMetricsName, 0L)); ++ processingTimeMetrics.put(type, this.registry. ++ newGauge(processingTimeMetricsName, processingTimeMetricsName, 0L)); ++ } ++ } ++ ++ public synchronized GenericEventTypeMetrics registerMetrics() { ++ if (!isInitialized) { ++ // Register with the MetricsSystems ++ if (this.ms != null) { ++ LOG.info("Registering GenericEventTypeMetrics"); ++ ms.register(info.name(), ++ info.description(), this); ++ isInitialized = true; ++ } ++ } ++ return this; ++ } ++ ++ @Override ++ public void increment(T type, long processingTimeUs) { ++ if (eventCountMetrics.get(type) != null) { ++ eventCountMetrics.get(type).incr(); ++ processingTimeMetrics.get(type).incr(processingTimeUs); ++ } ++ } ++ ++ @Override ++ public long get(T type) { ++ return eventCountMetrics.get(type).value(); ++ } ++ ++ public long getTotalProcessingTime(T type) { ++ return processingTimeMetrics.get(type).value(); ++ } ++ ++ public EnumMap getEventCountMetrics() { ++ return eventCountMetrics; ++ } ++ ++ public EnumMap getProcessingTimeMetrics() { ++ return processingTimeMetrics; ++ } ++ ++ public MetricsRegistry getRegistry() { ++ return registry; ++ } ++ ++ public MetricsInfo getInfo() { ++ return info; ++ } ++ ++ @Override ++ public void getMetrics(MetricsCollector collector, boolean all) { ++ registry.snapshot(collector.addRecord(registry.info()), all); ++ } ++ ++ public Class getEnumClass() { ++ return enumClass; ++ } ++ ++ /** Builder class for GenericEventTypeMetrics. */ ++ public static class EventTypeMetricsBuilder>{ ++ public EventTypeMetricsBuilder() { ++ } ++ ++ public EventTypeMetricsBuilder setEnumClass(Class enumClassValue) { ++ this.enumClass = enumClassValue; ++ return this; ++ } ++ ++ public EventTypeMetricsBuilder setEnums(T[] enumsValue) { ++ this.enums = enumsValue.clone(); ++ return this; ++ } ++ ++ public EventTypeMetricsBuilder setInfo(MetricsInfo infoValue) { ++ this.info = infoValue; ++ return this; ++ } ++ ++ public EventTypeMetricsBuilder setMs(MetricsSystem msValue) { ++ this.ms = msValue; ++ return this; ++ } ++ ++ public GenericEventTypeMetrics build() { ++ return new GenericEventTypeMetrics(info, ms, enums, enumClass); ++ } ++ ++ private MetricsSystem ms; ++ private MetricsInfo info; ++ private Class enumClass; ++ private T[] enums; ++ } ++} +\ No newline at end of file +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java +index 55ddd12fce9..7d2572a4c11 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java +@@ -20,9 +20,20 @@ + + import java.lang.reflect.Field; + import java.lang.reflect.Modifier; ++import java.util.HashMap; ++import java.util.HashSet; ++import java.util.Map; ++import java.util.Set; + import java.util.concurrent.BlockingQueue; + import java.util.concurrent.LinkedBlockingQueue; + ++ ++import org.apache.hadoop.metrics2.AbstractMetric; ++import org.apache.hadoop.metrics2.MetricsRecord; ++import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; ++import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; ++import org.apache.hadoop.test.GenericTestUtils; ++import org.apache.hadoop.yarn.metrics.GenericEventTypeMetrics; + import org.slf4j.Logger; + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.yarn.conf.YarnConfiguration; +@@ -30,6 +41,7 @@ + import org.junit.Assert; + import org.junit.Test; + ++import static org.apache.hadoop.metrics2.lib.Interns.info; + import static org.junit.Assert.assertEquals; + import static org.mockito.Mockito.*; + +@@ -118,7 +130,7 @@ public void handle(Event event) { + } + + private enum TestEnum { +- TestEventType ++ TestEventType, TestEventType2 + } + + @SuppressWarnings({ "rawtypes", "unchecked" }) +@@ -230,5 +242,171 @@ public void testPrintDispatcherEventDetailsAvoidDeadLoopInternal() + } + } + +-} ++ @Test ++ public void testMetricsForDispatcher() throws Exception { ++ YarnConfiguration conf = new YarnConfiguration(); ++ AsyncDispatcher dispatcher = null; ++ ++ try { ++ dispatcher = new AsyncDispatcher("RM Event dispatcher"); ++ ++ GenericEventTypeMetrics genericEventTypeMetrics = ++ new GenericEventTypeMetrics.EventTypeMetricsBuilder() ++ .setMs(DefaultMetricsSystem.instance()) ++ .setInfo(info("GenericEventTypeMetrics for " ++ + TestEnum.class.getName(), ++ "Metrics for " + dispatcher.getName())) ++ .setEnumClass(TestEnum.class) ++ .setEnums(TestEnum.class.getEnumConstants()) ++ .build().registerMetrics(); ++ ++ // We can the metrics enabled for TestEnum ++ dispatcher.addMetrics(genericEventTypeMetrics, ++ genericEventTypeMetrics.getEnumClass()); ++ dispatcher.init(conf); ++ ++ // Register handler ++ dispatcher.register(TestEnum.class, new TestHandler()); ++ dispatcher.start(); ++ ++ for (int i = 0; i < 3; ++i) { ++ Event event = mock(Event.class); ++ when(event.getType()).thenReturn(TestEnum.TestEventType); ++ dispatcher.getEventHandler().handle(event); ++ } ++ ++ for (int i = 0; i < 2; ++i) { ++ Event event = mock(Event.class); ++ when(event.getType()).thenReturn(TestEnum.TestEventType2); ++ dispatcher.getEventHandler().handle(event); ++ } ++ ++ // Check event type count. ++ GenericTestUtils.waitFor(() -> genericEventTypeMetrics. ++ get(TestEnum.TestEventType) == 3, 1000, 10000); ++ ++ GenericTestUtils.waitFor(() -> genericEventTypeMetrics. ++ get(TestEnum.TestEventType2) == 2, 1000, 10000); ++ ++ // Check time spend. ++ Assert.assertTrue(genericEventTypeMetrics. ++ getTotalProcessingTime(TestEnum.TestEventType) ++ >= 1500*3); ++ Assert.assertTrue(genericEventTypeMetrics. ++ getTotalProcessingTime(TestEnum.TestEventType) ++ < 1500*4); ++ ++ Assert.assertTrue(genericEventTypeMetrics. ++ getTotalProcessingTime(TestEnum.TestEventType2) ++ >= 1500*2); ++ Assert.assertTrue(genericEventTypeMetrics. ++ getTotalProcessingTime(TestEnum.TestEventType2) ++ < 1500*3); ++ ++ // Make sure metrics consistent. ++ Assert.assertEquals(Long.toString(genericEventTypeMetrics. ++ get(TestEnum.TestEventType)), ++ genericEventTypeMetrics. ++ getRegistry().get("TestEventType_event_count").toString()); ++ Assert.assertEquals(Long.toString(genericEventTypeMetrics. ++ get(TestEnum.TestEventType2)), ++ genericEventTypeMetrics. ++ getRegistry().get("TestEventType2_event_count").toString()); ++ Assert.assertEquals(Long.toString(genericEventTypeMetrics. ++ getTotalProcessingTime(TestEnum.TestEventType)), ++ genericEventTypeMetrics. ++ getRegistry().get("TestEventType_processing_time").toString()); ++ Assert.assertEquals(Long.toString(genericEventTypeMetrics. ++ getTotalProcessingTime(TestEnum.TestEventType2)), ++ genericEventTypeMetrics. ++ getRegistry().get("TestEventType2_processing_time").toString()); ++ ++ } finally { ++ dispatcher.close(); ++ } ++ ++ } ++ ++ @Test ++ public void testDispatcherMetricsHistogram() throws Exception { ++ YarnConfiguration conf = new YarnConfiguration(); ++ AsyncDispatcher dispatcher = null; ++ ++ try { ++ dispatcher = new AsyncDispatcher("RM Event dispatcher"); ++ ++ GenericEventTypeMetrics genericEventTypeMetrics = ++ new GenericEventTypeMetrics.EventTypeMetricsBuilder() ++ .setMs(DefaultMetricsSystem.instance()) ++ .setInfo(info("GenericEventTypeMetrics for " ++ + TestEnum.class.getName(), ++ "Metrics for " + dispatcher.getName())) ++ .setEnumClass(TestEnum.class) ++ .setEnums(TestEnum.class.getEnumConstants()) ++ .build().registerMetrics(); ++ ++ // We can the metrics enabled for TestEnum ++ dispatcher.addMetrics(genericEventTypeMetrics, ++ genericEventTypeMetrics.getEnumClass()); ++ dispatcher.init(conf); ++ ++ // Register handler ++ dispatcher.register(TestEnum.class, new TestHandler()); ++ dispatcher.start(); ++ ++ for (int i = 0; i < 3; ++i) { ++ Event event = mock(Event.class); ++ when(event.getType()).thenReturn(TestEnum.TestEventType); ++ dispatcher.getEventHandler().handle(event); ++ } ++ ++ for (int i = 0; i < 2; ++i) { ++ Event event = mock(Event.class); ++ when(event.getType()).thenReturn(TestEnum.TestEventType2); ++ dispatcher.getEventHandler().handle(event); ++ } ++ ++ // Check event type count. ++ GenericTestUtils.waitFor(() -> genericEventTypeMetrics. ++ get(TestEnum.TestEventType) == 3, 1000, 10000); ++ ++ GenericTestUtils.waitFor(() -> genericEventTypeMetrics. ++ get(TestEnum.TestEventType2) == 2, 1000, 10000); ++ ++ // submit actual values ++ Map expectedValues = new HashMap<>(); ++ expectedValues.put("TestEventType_event_count", ++ genericEventTypeMetrics.get(TestEnum.TestEventType)); ++ expectedValues.put("TestEventType_processing_time", ++ genericEventTypeMetrics. ++ getTotalProcessingTime(TestEnum.TestEventType)); ++ expectedValues.put("TestEventType2_event_count", ++ genericEventTypeMetrics.get(TestEnum.TestEventType2)); ++ expectedValues.put("TestEventType2_processing_time", ++ genericEventTypeMetrics. ++ getTotalProcessingTime(TestEnum.TestEventType2)); ++ Set testResults = new HashSet<>(); + ++ MetricsCollectorImpl collector = new MetricsCollectorImpl(); ++ genericEventTypeMetrics.getMetrics(collector, true); ++ ++ for (MetricsRecord record : collector.getRecords()) { ++ for (AbstractMetric metric : record.metrics()) { ++ String metricName = metric.name(); ++ if (expectedValues.containsKey(metricName)) { ++ Long expectedValue = expectedValues.get(metricName); ++ Assert.assertEquals( ++ "Metric " + metricName + " doesn't have expected value", ++ expectedValue, metric.value()); ++ testResults.add(metricName); ++ } ++ } ++ } ++ Assert.assertEquals(expectedValues.keySet(), testResults); ++ ++ } finally { ++ dispatcher.close(); ++ } ++ ++ } ++} +\ No newline at end of file +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/GenericEventTypeMetricsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/GenericEventTypeMetricsManager.java +new file mode 100644 +index 00000000000..8fda9b7f38a +--- /dev/null ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/GenericEventTypeMetricsManager.java +@@ -0,0 +1,42 @@ ++/** ++ * Licensed to the Apache Software Foundation (ASF) under one ++ * or more contributor license agreements. See the NOTICE file ++ * distributed with this work for additional information ++ * regarding copyright ownership. The ASF licenses this file ++ * to you under the Apache License, Version 2.0 (the ++ * "License"); you may not use this file except in compliance ++ * with the License. You may obtain a copy of the License at ++ * ++ * http://www.apache.org/licenses/LICENSE-2.0 ++ * ++ * Unless required by applicable law or agreed to in writing, software ++ * distributed under the License is distributed on an "AS IS" BASIS, ++ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ++ * See the License for the specific language governing permissions and ++ * limitations under the License. ++ */ ++package org.apache.hadoop.yarn.server.resourcemanager; ++ ++import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; ++import org.apache.hadoop.yarn.metrics.GenericEventTypeMetrics; ++ ++import static org.apache.hadoop.metrics2.lib.Interns.info; ++ ++public final class GenericEventTypeMetricsManager { ++ ++ private GenericEventTypeMetricsManager() { ++ // nothing to do ++ } ++ ++ // Construct a GenericEventTypeMetrics for dispatcher ++ public static > GenericEventTypeMetrics ++ create(String dispatcherName, Class eventTypeClass) { ++ return new GenericEventTypeMetrics.EventTypeMetricsBuilder() ++ .setMs(DefaultMetricsSystem.instance()) ++ .setInfo(info("GenericEventTypeMetrics for " + eventTypeClass.getName(), ++ "Metrics for " + dispatcherName)) ++ .setEnumClass(eventTypeClass) ++ .setEnums(eventTypeClass.getEnumConstants()) ++ .build().registerMetrics(); ++ } ++} +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +index bb8a3ba7db3..ca47fdb648b 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +@@ -21,6 +21,7 @@ + import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; + import com.sun.jersey.spi.container.servlet.ServletContainer; + ++import org.apache.hadoop.yarn.metrics.GenericEventTypeMetrics; + import org.slf4j.Logger; + import org.slf4j.LoggerFactory; + import org.slf4j.Marker; +@@ -63,6 +64,7 @@ + import org.apache.hadoop.yarn.event.Dispatcher; + import org.apache.hadoop.yarn.event.EventDispatcher; + import org.apache.hadoop.yarn.event.EventHandler; ++ + import org.apache.hadoop.yarn.exceptions.YarnException; + import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; + import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; +@@ -470,7 +472,14 @@ protected void setRMStateStore(RMStateStore rmStore) { + } + + protected Dispatcher createDispatcher() { +- return new AsyncDispatcher("RM Event dispatcher"); ++ AsyncDispatcher dispatcher = new AsyncDispatcher("RM Event dispatcher"); ++ GenericEventTypeMetrics genericEventTypeMetrics = ++ GenericEventTypeMetricsManager. ++ create(dispatcher.getName(), NodesListManagerEventType.class); ++ // We can add more ++ dispatcher.addMetrics(genericEventTypeMetrics, ++ genericEventTypeMetrics.getEnumClass()); ++ return dispatcher; + } + + protected ResourceScheduler createScheduler() { +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java +index 1cb5e1d0e76..b9c5500a7d2 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java +@@ -29,6 +29,7 @@ + import org.apache.hadoop.conf.Configuration; + import org.apache.hadoop.fs.CommonConfigurationKeysPublic; + import org.apache.hadoop.http.lib.StaticUserWebFilter; ++import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; + import org.apache.hadoop.net.NetworkTopology; + import org.apache.hadoop.security.AuthenticationFilterInitializer; + import org.apache.hadoop.security.UserGroupInformation; +@@ -73,6 +74,7 @@ + public void setUp() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + UserGroupInformation.setConfiguration(conf); ++ DefaultMetricsSystem.setMiniClusterMode(true); + resourceManager = new ResourceManager(); + resourceManager.init(conf); + resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey(); +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java +index a75be7745fb..933eaf91fe4 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java +@@ -18,6 +18,7 @@ + + package org.apache.hadoop.yarn.server.resourcemanager.scheduler; + ++import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; + import org.apache.hadoop.net.NetworkTopology; + import org.apache.hadoop.util.Time; + import org.apache.hadoop.yarn.api.records.ContainerId; +@@ -52,6 +53,7 @@ + private ResourceManager resourceManager; + + public void setup() { ++ DefaultMetricsSystem.setMiniClusterMode(true); + resourceManager = new ResourceManager() { + @Override + protected RMNodeLabelsManager createNodeLabelManager() { +diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +index 6b0c42f9b16..8bc18481cb2 100644 +--- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java ++++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +@@ -210,6 +210,7 @@ + @Before + public void setUp() throws Exception { + ResourceUtils.resetResourceTypes(new Configuration()); ++ DefaultMetricsSystem.setMiniClusterMode(true); + resourceManager = new ResourceManager() { + @Override + protected RMNodeLabelsManager createNodeLabelManager() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index f9deab06ff..667515d00c 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -26,6 +26,9 @@ import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; +import org.apache.hadoop.yarn.metrics.EventTypeMetrics; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.MonotonicClock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.Marker; @@ -85,6 +88,11 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { protected final Map, EventHandler> eventDispatchers; private boolean exitOnDispatchException = true; + private Map, + EventTypeMetrics> eventTypeMetricsMap; + + private Clock clock = new MonotonicClock(); + /** * The thread name for dispatcher. */ @@ -98,6 +106,8 @@ public AsyncDispatcher(BlockingQueue eventQueue) { super("Dispatcher"); this.eventQueue = eventQueue; this.eventDispatchers = new HashMap, EventHandler>(); + this.eventTypeMetricsMap = new HashMap, + EventTypeMetrics>(); } /** @@ -135,7 +145,16 @@ public void run() { return; } if (event != null) { - dispatch(event); + if (eventTypeMetricsMap. + get(event.getType().getDeclaringClass()) != null) { + long startTime = clock.getTime(); + dispatch(event); + eventTypeMetricsMap.get(event.getType().getDeclaringClass()) + .increment(event.getType(), + clock.getTime() - startTime); + } else { + dispatch(event); + } if (printTrigger) { //Log the latest dispatch event type // may cause the too many events queued @@ -369,4 +388,9 @@ protected boolean isDrained() { protected boolean isStopped() { return stopped; } + + public void addMetrics(EventTypeMetrics metrics, + Class eventClass) { + eventTypeMetricsMap.put(eventClass, metrics); + } } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java index cadb73663a..849bb402d8 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/EventDispatcher.java @@ -19,6 +19,9 @@ package org.apache.hadoop.yarn.event; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; +import org.apache.hadoop.yarn.metrics.EventTypeMetrics; +import org.apache.hadoop.yarn.util.Clock; +import org.apache.hadoop.yarn.util.MonotonicClock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.Marker; @@ -47,12 +50,15 @@ public class EventDispatcher extends private final Thread eventProcessor; private volatile boolean stopped = false; private boolean shouldExitOnError = true; + private EventTypeMetrics metrics; private static final Logger LOG = LoggerFactory.getLogger(EventDispatcher.class); private static final Marker FATAL = MarkerFactory.getMarker("FATAL"); + private Clock clock = new MonotonicClock(); + private final class EventProcessor implements Runnable { @Override public void run() { @@ -68,7 +74,14 @@ public void run() { } try { - handler.handle(event); + if (metrics != null) { + long startTime = clock.getTime(); + handler.handle(event); + metrics.increment(event.getType(), + clock.getTime() - startTime); + } else { + handler.handle(event); + } } catch (Throwable t) { // An error occurred, but we are shutting down anyway. // If it was an InterruptedException, the very act of @@ -136,6 +149,7 @@ public void handle(T event) { public void disableExitOnError() { shouldExitOnError = false; } + protected long getEventProcessorId() { return this.eventProcessor.getId(); } @@ -143,4 +157,9 @@ protected long getEventProcessorId() { protected boolean isStopped() { return this.stopped; } + + public void setMetrics(EventTypeMetrics metrics) { + this.metrics = metrics; + } + } diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DisableEventTypeMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DisableEventTypeMetrics.java new file mode 100644 index 0000000000..7b4af0c3e0 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/DisableEventTypeMetrics.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.metrics; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.annotation.Metrics; + +@InterfaceAudience.Private +@Metrics(context="yarn") +public class DisableEventTypeMetrics implements EventTypeMetrics { + @Override + public void increment(Enum type, long processingTimeUs) { + //nop + return; + } + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + //nop + return; + } + + @Override + public long get(Enum type) { + return 0; + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/EventTypeMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/EventTypeMetrics.java new file mode 100644 index 0000000000..7a7e4f5489 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/EventTypeMetrics.java @@ -0,0 +1,32 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.metrics; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.annotation.Metrics; + +@InterfaceAudience.Private +@Metrics(context="yarn") +public interface EventTypeMetrics> + extends MetricsSource { + + void increment(T type, long processingTimeUs); + + long get(T type); +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/GenericEventTypeMetrics.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/GenericEventTypeMetrics.java new file mode 100644 index 0000000000..464edb2778 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/metrics/GenericEventTypeMetrics.java @@ -0,0 +1,160 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.metrics; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsInfo; +import org.apache.hadoop.metrics2.MetricsSystem; +import org.apache.hadoop.metrics2.annotation.Metrics; +import org.apache.hadoop.metrics2.lib.MetricsRegistry; +import org.apache.hadoop.metrics2.lib.MutableGaugeLong; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.util.EnumMap; + +@InterfaceAudience.Private +@Metrics(context="yarn") +public class GenericEventTypeMetrics> + implements EventTypeMetrics { + + static final Logger LOG = + LoggerFactory.getLogger(GenericEventTypeMetrics.class); + + private final EnumMap eventCountMetrics; + private final EnumMap processingTimeMetrics; + private final MetricsRegistry registry; + private final MetricsSystem ms; + private final MetricsInfo info; + private final Class enumClass; + + private boolean isInitialized = false; + + public GenericEventTypeMetrics(MetricsInfo info, MetricsSystem ms, + final T[] enums, Class enumClass) { + this.enumClass = enumClass; + this.eventCountMetrics = new EnumMap<>(this.enumClass); + this.processingTimeMetrics = new EnumMap<>(this.enumClass); + this.ms = ms; + this.info = info; + this.registry = new MetricsRegistry(this.info); + + //Initialize enum + for (final T type : enums) { + String eventCountMetricsName = + type.toString() + "_" + "event_count"; + String processingTimeMetricsName = + type.toString() + "_" + "processing_time"; + eventCountMetrics.put(type, this.registry. + newGauge(eventCountMetricsName, eventCountMetricsName, 0L)); + processingTimeMetrics.put(type, this.registry. + newGauge(processingTimeMetricsName, processingTimeMetricsName, 0L)); + } + } + + public synchronized GenericEventTypeMetrics registerMetrics() { + if (!isInitialized) { + // Register with the MetricsSystems + if (this.ms != null) { + LOG.info("Registering GenericEventTypeMetrics"); + ms.register(info.name(), + info.description(), this); + isInitialized = true; + } + } + return this; + } + + @Override + public void increment(T type, long processingTimeUs) { + if (eventCountMetrics.get(type) != null) { + eventCountMetrics.get(type).incr(); + processingTimeMetrics.get(type).incr(processingTimeUs); + } + } + + @Override + public long get(T type) { + return eventCountMetrics.get(type).value(); + } + + public long getTotalProcessingTime(T type) { + return processingTimeMetrics.get(type).value(); + } + + public EnumMap getEventCountMetrics() { + return eventCountMetrics; + } + + public EnumMap getProcessingTimeMetrics() { + return processingTimeMetrics; + } + + public MetricsRegistry getRegistry() { + return registry; + } + + public MetricsInfo getInfo() { + return info; + } + + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + registry.snapshot(collector.addRecord(registry.info()), all); + } + + public Class getEnumClass() { + return enumClass; + } + + /** Builder class for GenericEventTypeMetrics. */ + public static class EventTypeMetricsBuilder>{ + public EventTypeMetricsBuilder() { + } + + public EventTypeMetricsBuilder setEnumClass(Class enumClassValue) { + this.enumClass = enumClassValue; + return this; + } + + public EventTypeMetricsBuilder setEnums(T[] enumsValue) { + this.enums = enumsValue.clone(); + return this; + } + + public EventTypeMetricsBuilder setInfo(MetricsInfo infoValue) { + this.info = infoValue; + return this; + } + + public EventTypeMetricsBuilder setMs(MetricsSystem msValue) { + this.ms = msValue; + return this; + } + + public GenericEventTypeMetrics build() { + return new GenericEventTypeMetrics(info, ms, enums, enumClass); + } + + private MetricsSystem ms; + private MetricsInfo info; + private Class enumClass; + private T[] enums; + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java index 55ddd12fce..7d2572a4c1 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java @@ -20,9 +20,20 @@ import java.lang.reflect.Field; import java.lang.reflect.Modifier; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Set; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; + +import org.apache.hadoop.metrics2.AbstractMetric; +import org.apache.hadoop.metrics2.MetricsRecord; +import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.test.GenericTestUtils; +import org.apache.hadoop.yarn.metrics.GenericEventTypeMetrics; import org.slf4j.Logger; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.yarn.conf.YarnConfiguration; @@ -30,6 +41,7 @@ import org.junit.Assert; import org.junit.Test; +import static org.apache.hadoop.metrics2.lib.Interns.info; import static org.junit.Assert.assertEquals; import static org.mockito.Mockito.*; @@ -118,7 +130,7 @@ public void handle(Event event) { } private enum TestEnum { - TestEventType + TestEventType, TestEventType2 } @SuppressWarnings({ "rawtypes", "unchecked" }) @@ -230,5 +242,171 @@ public void testPrintDispatcherEventDetailsAvoidDeadLoopInternal() } } -} + @Test + public void testMetricsForDispatcher() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + AsyncDispatcher dispatcher = null; + try { + dispatcher = new AsyncDispatcher("RM Event dispatcher"); + + GenericEventTypeMetrics genericEventTypeMetrics = + new GenericEventTypeMetrics.EventTypeMetricsBuilder() + .setMs(DefaultMetricsSystem.instance()) + .setInfo(info("GenericEventTypeMetrics for " + + TestEnum.class.getName(), + "Metrics for " + dispatcher.getName())) + .setEnumClass(TestEnum.class) + .setEnums(TestEnum.class.getEnumConstants()) + .build().registerMetrics(); + + // We can the metrics enabled for TestEnum + dispatcher.addMetrics(genericEventTypeMetrics, + genericEventTypeMetrics.getEnumClass()); + dispatcher.init(conf); + + // Register handler + dispatcher.register(TestEnum.class, new TestHandler()); + dispatcher.start(); + + for (int i = 0; i < 3; ++i) { + Event event = mock(Event.class); + when(event.getType()).thenReturn(TestEnum.TestEventType); + dispatcher.getEventHandler().handle(event); + } + + for (int i = 0; i < 2; ++i) { + Event event = mock(Event.class); + when(event.getType()).thenReturn(TestEnum.TestEventType2); + dispatcher.getEventHandler().handle(event); + } + + // Check event type count. + GenericTestUtils.waitFor(() -> genericEventTypeMetrics. + get(TestEnum.TestEventType) == 3, 1000, 10000); + + GenericTestUtils.waitFor(() -> genericEventTypeMetrics. + get(TestEnum.TestEventType2) == 2, 1000, 10000); + + // Check time spend. + Assert.assertTrue(genericEventTypeMetrics. + getTotalProcessingTime(TestEnum.TestEventType) + >= 1500*3); + Assert.assertTrue(genericEventTypeMetrics. + getTotalProcessingTime(TestEnum.TestEventType) + < 1500*4); + + Assert.assertTrue(genericEventTypeMetrics. + getTotalProcessingTime(TestEnum.TestEventType2) + >= 1500*2); + Assert.assertTrue(genericEventTypeMetrics. + getTotalProcessingTime(TestEnum.TestEventType2) + < 1500*3); + + // Make sure metrics consistent. + Assert.assertEquals(Long.toString(genericEventTypeMetrics. + get(TestEnum.TestEventType)), + genericEventTypeMetrics. + getRegistry().get("TestEventType_event_count").toString()); + Assert.assertEquals(Long.toString(genericEventTypeMetrics. + get(TestEnum.TestEventType2)), + genericEventTypeMetrics. + getRegistry().get("TestEventType2_event_count").toString()); + Assert.assertEquals(Long.toString(genericEventTypeMetrics. + getTotalProcessingTime(TestEnum.TestEventType)), + genericEventTypeMetrics. + getRegistry().get("TestEventType_processing_time").toString()); + Assert.assertEquals(Long.toString(genericEventTypeMetrics. + getTotalProcessingTime(TestEnum.TestEventType2)), + genericEventTypeMetrics. + getRegistry().get("TestEventType2_processing_time").toString()); + + } finally { + dispatcher.close(); + } + + } + + @Test + public void testDispatcherMetricsHistogram() throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + AsyncDispatcher dispatcher = null; + + try { + dispatcher = new AsyncDispatcher("RM Event dispatcher"); + + GenericEventTypeMetrics genericEventTypeMetrics = + new GenericEventTypeMetrics.EventTypeMetricsBuilder() + .setMs(DefaultMetricsSystem.instance()) + .setInfo(info("GenericEventTypeMetrics for " + + TestEnum.class.getName(), + "Metrics for " + dispatcher.getName())) + .setEnumClass(TestEnum.class) + .setEnums(TestEnum.class.getEnumConstants()) + .build().registerMetrics(); + + // We can the metrics enabled for TestEnum + dispatcher.addMetrics(genericEventTypeMetrics, + genericEventTypeMetrics.getEnumClass()); + dispatcher.init(conf); + + // Register handler + dispatcher.register(TestEnum.class, new TestHandler()); + dispatcher.start(); + + for (int i = 0; i < 3; ++i) { + Event event = mock(Event.class); + when(event.getType()).thenReturn(TestEnum.TestEventType); + dispatcher.getEventHandler().handle(event); + } + + for (int i = 0; i < 2; ++i) { + Event event = mock(Event.class); + when(event.getType()).thenReturn(TestEnum.TestEventType2); + dispatcher.getEventHandler().handle(event); + } + + // Check event type count. + GenericTestUtils.waitFor(() -> genericEventTypeMetrics. + get(TestEnum.TestEventType) == 3, 1000, 10000); + + GenericTestUtils.waitFor(() -> genericEventTypeMetrics. + get(TestEnum.TestEventType2) == 2, 1000, 10000); + + // submit actual values + Map expectedValues = new HashMap<>(); + expectedValues.put("TestEventType_event_count", + genericEventTypeMetrics.get(TestEnum.TestEventType)); + expectedValues.put("TestEventType_processing_time", + genericEventTypeMetrics. + getTotalProcessingTime(TestEnum.TestEventType)); + expectedValues.put("TestEventType2_event_count", + genericEventTypeMetrics.get(TestEnum.TestEventType2)); + expectedValues.put("TestEventType2_processing_time", + genericEventTypeMetrics. + getTotalProcessingTime(TestEnum.TestEventType2)); + Set testResults = new HashSet<>(); + + MetricsCollectorImpl collector = new MetricsCollectorImpl(); + genericEventTypeMetrics.getMetrics(collector, true); + + for (MetricsRecord record : collector.getRecords()) { + for (AbstractMetric metric : record.metrics()) { + String metricName = metric.name(); + if (expectedValues.containsKey(metricName)) { + Long expectedValue = expectedValues.get(metricName); + Assert.assertEquals( + "Metric " + metricName + " doesn't have expected value", + expectedValue, metric.value()); + testResults.add(metricName); + } + } + } + Assert.assertEquals(expectedValues.keySet(), testResults); + + } finally { + dispatcher.close(); + } + + } +} \ No newline at end of file diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/GenericEventTypeMetricsManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/GenericEventTypeMetricsManager.java new file mode 100644 index 0000000000..8fda9b7f38 --- /dev/null +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/GenericEventTypeMetricsManager.java @@ -0,0 +1,42 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.yarn.server.resourcemanager; + +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.yarn.metrics.GenericEventTypeMetrics; + +import static org.apache.hadoop.metrics2.lib.Interns.info; + +public final class GenericEventTypeMetricsManager { + + private GenericEventTypeMetricsManager() { + // nothing to do + } + + // Construct a GenericEventTypeMetrics for dispatcher + public static > GenericEventTypeMetrics + create(String dispatcherName, Class eventTypeClass) { + return new GenericEventTypeMetrics.EventTypeMetricsBuilder() + .setMs(DefaultMetricsSystem.instance()) + .setInfo(info("GenericEventTypeMetrics for " + eventTypeClass.getName(), + "Metrics for " + dispatcherName)) + .setEnumClass(eventTypeClass) + .setEnums(eventTypeClass.getEnumConstants()) + .build().registerMetrics(); + } +} diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java index bb8a3ba7db..ca47fdb648 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ResourceManager.java @@ -21,6 +21,7 @@ import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import com.sun.jersey.spi.container.servlet.ServletContainer; +import org.apache.hadoop.yarn.metrics.GenericEventTypeMetrics; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.slf4j.Marker; @@ -63,6 +64,7 @@ import org.apache.hadoop.yarn.event.Dispatcher; import org.apache.hadoop.yarn.event.EventDispatcher; import org.apache.hadoop.yarn.event.EventHandler; + import org.apache.hadoop.yarn.exceptions.YarnException; import org.apache.hadoop.yarn.exceptions.YarnRuntimeException; import org.apache.hadoop.yarn.nodelabels.NodeAttributesManager; @@ -470,7 +472,14 @@ protected EventHandler createSchedulerEventDispatcher() { } protected Dispatcher createDispatcher() { - return new AsyncDispatcher("RM Event dispatcher"); + AsyncDispatcher dispatcher = new AsyncDispatcher("RM Event dispatcher"); + GenericEventTypeMetrics genericEventTypeMetrics = + GenericEventTypeMetricsManager. + create(dispatcher.getName(), NodesListManagerEventType.class); + // We can add more + dispatcher.addMetrics(genericEventTypeMetrics, + genericEventTypeMetrics.getEnumClass()); + return dispatcher; } protected ResourceScheduler createScheduler() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java index 1cb5e1d0e7..b9c5500a7d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/TestResourceManager.java @@ -29,6 +29,7 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; import org.apache.hadoop.http.lib.StaticUserWebFilter; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.security.AuthenticationFilterInitializer; import org.apache.hadoop.security.UserGroupInformation; @@ -73,6 +74,7 @@ public class TestResourceManager { public void setUp() throws Exception { YarnConfiguration conf = new YarnConfiguration(); UserGroupInformation.setConfiguration(conf); + DefaultMetricsSystem.setMiniClusterMode(true); resourceManager = new ResourceManager(); resourceManager.init(conf); resourceManager.getRMContext().getContainerTokenSecretManager().rollMasterKey(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java index a75be7745f..933eaf91fe 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/TestSchedulerHealth.java @@ -18,6 +18,7 @@ package org.apache.hadoop.yarn.server.resourcemanager.scheduler; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.net.NetworkTopology; import org.apache.hadoop.util.Time; import org.apache.hadoop.yarn.api.records.ContainerId; @@ -52,6 +53,7 @@ public class TestSchedulerHealth { private ResourceManager resourceManager; public void setup() { + DefaultMetricsSystem.setMiniClusterMode(true); resourceManager = new ResourceManager() { @Override protected RMNodeLabelsManager createNodeLabelManager() { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java index 6b0c42f9b1..8bc18481cb 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/test/java/org/apache/hadoop/yarn/server/resourcemanager/scheduler/capacity/TestCapacityScheduler.java @@ -210,6 +210,7 @@ public class TestCapacityScheduler extends CapacitySchedulerTestBase { @Before public void setUp() throws Exception { ResourceUtils.resetResourceTypes(new Configuration()); + DefaultMetricsSystem.setMiniClusterMode(true); resourceManager = new ResourceManager() { @Override protected RMNodeLabelsManager createNodeLabelManager() {