From 954930e9d97b57bd3f595d2c123f4821f865ca3a Mon Sep 17 00:00:00 2001 From: Akira Ajisaka Date: Fri, 14 Feb 2020 15:20:28 +0900 Subject: [PATCH] HADOOP-16850. Support getting thread info from thread group for JvmMetrics to improve the performance. Contributed by Tao Yang. --- .../hadoop/fs/CommonConfigurationKeys.java | 9 ++ .../hadoop/metrics2/source/JvmMetrics.java | 57 +++++++++++- .../src/main/resources/core-default.xml | 9 ++ .../metrics2/source/TestJvmMetrics.java | 91 ++++++++++++++++++- 4 files changed, 160 insertions(+), 6 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index 8c09db1284..40ddfba3c0 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -426,4 +426,13 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { "dfs.client.ignore.namenode.default.kms.uri"; public static final boolean DFS_CLIENT_IGNORE_NAMENODE_DEFAULT_KMS_URI_DEFAULT = false; + + /** + * Whether to use ThreadMXBean for getting thread info in JvmMetrics. + * The ThreadGroup approach is preferred for better performance. 
+ */ + public static final String HADOOP_METRICS_JVM_USE_THREAD_MXBEAN = + "hadoop.metrics.jvm.use-thread-mxbean"; + public static final boolean HADOOP_METRICS_JVM_USE_THREAD_MXBEAN_DEFAULT = + false; } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/source/JvmMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/source/JvmMetrics.java index 5f9afddc57..f19a2be0b4 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/source/JvmMetrics.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/source/JvmMetrics.java @@ -31,6 +31,8 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.log.metrics.EventCounter; import org.apache.hadoop.metrics2.MetricsCollector; import org.apache.hadoop.metrics2.MetricsInfo; @@ -84,7 +86,7 @@ public class JvmMetrics implements MetricsSource { final MemoryMXBean memoryMXBean = ManagementFactory.getMemoryMXBean(); final List gcBeans = ManagementFactory.getGarbageCollectorMXBeans(); - final ThreadMXBean threadMXBean = ManagementFactory.getThreadMXBean(); + private ThreadMXBean threadMXBean; final String processName, sessionId; private JvmPauseMonitor pauseMonitor = null; final ConcurrentHashMap gcInfoCache = @@ -92,9 +94,12 @@ public class JvmMetrics implements MetricsSource { private GcTimeMonitor gcTimeMonitor = null; @VisibleForTesting - JvmMetrics(String processName, String sessionId) { + JvmMetrics(String processName, String sessionId, boolean useThreadMXBean) { this.processName = processName; this.sessionId = sessionId; + if (useThreadMXBean) { + this.threadMXBean = ManagementFactory.getThreadMXBean(); + } } public void setPauseMonitor(final JvmPauseMonitor 
pauseMonitor) { @@ -108,8 +113,15 @@ public class JvmMetrics implements MetricsSource { public static JvmMetrics create(String processName, String sessionId, MetricsSystem ms) { + // Reloading conf instead of getting from outside since it's redundant in + // code level to update all the callers across lots of modules, + // this method is called at most once for components (NN/DN/RM/NM/...) + // so that the overall cost is not expensive. + boolean useThreadMXBean = new Configuration().getBoolean( + CommonConfigurationKeys.HADOOP_METRICS_JVM_USE_THREAD_MXBEAN, + CommonConfigurationKeys.HADOOP_METRICS_JVM_USE_THREAD_MXBEAN_DEFAULT); return ms.register(JvmMetrics.name(), JvmMetrics.description(), - new JvmMetrics(processName, sessionId)); + new JvmMetrics(processName, sessionId, useThreadMXBean)); } public static void reattach(MetricsSystem ms, JvmMetrics jvmMetrics) { @@ -137,7 +149,11 @@ public class JvmMetrics implements MetricsSource { .tag(SessionId, sessionId); getMemoryUsage(rb); getGcUsage(rb); - getThreadUsage(rb); + if (threadMXBean != null) { + getThreadUsage(rb); + } else { + getThreadUsageFromGroup(rb); + } getEventCounters(rb); } @@ -235,6 +251,39 @@ public class JvmMetrics implements MetricsSource { .addGauge(ThreadsTerminated, threadsTerminated); } + private void getThreadUsageFromGroup(MetricsRecordBuilder rb) { + int threadsNew = 0; + int threadsRunnable = 0; + int threadsBlocked = 0; + int threadsWaiting = 0; + int threadsTimedWaiting = 0; + int threadsTerminated = 0; + ThreadGroup threadGroup = Thread.currentThread().getThreadGroup(); + Thread[] threads = new Thread[threadGroup.activeCount()]; + threadGroup.enumerate(threads); + for (Thread thread : threads) { + if (thread == null) { + // race protection + continue; + } + switch (thread.getState()) { + case NEW: threadsNew++; break; + case RUNNABLE: threadsRunnable++; break; + case BLOCKED: threadsBlocked++; break; + case WAITING: threadsWaiting++; break; + case TIMED_WAITING: threadsTimedWaiting++; 
break; + case TERMINATED: threadsTerminated++; break; + default: + } + } + rb.addGauge(ThreadsNew, threadsNew) + .addGauge(ThreadsRunnable, threadsRunnable) + .addGauge(ThreadsBlocked, threadsBlocked) + .addGauge(ThreadsWaiting, threadsWaiting) + .addGauge(ThreadsTimedWaiting, threadsTimedWaiting) + .addGauge(ThreadsTerminated, threadsTerminated); + } + private void getEventCounters(MetricsRecordBuilder rb) { rb.addCounter(LogFatal, EventCounter.getFatal()) .addCounter(LogError, EventCounter.getError()) diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index fe0f8cc540..b345eb6e69 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -3843,4 +3843,13 @@ Enable Server Name Indication (SNI) host check for HTTPS enabled server. </description> </property> + + <property> + <name>hadoop.metrics.jvm.use-thread-mxbean</name> + <value>false</value> + <description> + Whether to use ThreadMXBean for getting thread info in JvmMetrics. + The ThreadGroup approach is preferred for better performance. 
+ </description> + </property> </configuration> diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/source/TestJvmMetrics.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/source/TestJvmMetrics.java index 37a3a2affc..6fdd64dca7 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/source/TestJvmMetrics.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/source/TestJvmMetrics.java @@ -18,6 +18,7 @@ package org.apache.hadoop.metrics2.source; +import org.apache.hadoop.metrics2.impl.MetricsCollectorImpl; import org.apache.hadoop.util.GcTimeMonitor; import org.junit.After; import org.junit.Assert; @@ -37,6 +38,7 @@ import org.apache.hadoop.test.GenericTestUtils; import org.apache.hadoop.util.JvmPauseMonitor; import java.util.ArrayList; +import java.util.Iterator; import java.util.List; import static org.apache.hadoop.metrics2.source.JvmMetricsInfo.*; @@ -65,7 +67,7 @@ public class TestJvmMetrics { pauseMonitor = new JvmPauseMonitor(); pauseMonitor.init(new Configuration()); pauseMonitor.start(); - JvmMetrics jvmMetrics = new JvmMetrics("test", "test"); + JvmMetrics jvmMetrics = new JvmMetrics("test", "test", false); jvmMetrics.setPauseMonitor(pauseMonitor); MetricsRecordBuilder rb = getMetrics(jvmMetrics); MetricsCollector mc = rb.parent(); @@ -91,7 +93,7 @@ public class TestJvmMetrics { public void testGcTimeMonitorPresence() { gcTimeMonitor = new GcTimeMonitor(60000, 1000, 70, null); gcTimeMonitor.start(); - JvmMetrics jvmMetrics = new JvmMetrics("test", "test"); + JvmMetrics jvmMetrics = new JvmMetrics("test", "test", false); jvmMetrics.setGcTimeMonitor(gcTimeMonitor); MetricsRecordBuilder rb = getMetrics(jvmMetrics); MetricsCollector mc = rb.parent(); @@ -226,4 +228,89 @@ public class TestJvmMetrics { Assert.assertEquals("unexpected process name of the singleton instance", process1Name, jvmMetrics2.processName); } + + /** + * Performance test for 
JvmMetrics#getMetrics, comparing performance of + * getting thread usage from ThreadMXBean with that from ThreadGroup. + */ + @Test + public void testGetMetricsPerf() { + JvmMetrics jvmMetricsUseMXBean = new JvmMetrics("test", "test", true); + JvmMetrics jvmMetrics = new JvmMetrics("test", "test", false); + MetricsCollectorImpl collector = new MetricsCollectorImpl(); + // warm up + jvmMetrics.getMetrics(collector, true); + jvmMetricsUseMXBean.getMetrics(collector, true); + // test cases with different numbers of threads + int[] numThreadsCases = {100, 200, 500, 1000, 2000, 3000}; + List<TestThread> threads = new ArrayList<>(); + for (int numThreads : numThreadsCases) { + updateThreadsAndWait(threads, numThreads); + long startNs = System.nanoTime(); + jvmMetricsUseMXBean.getMetrics(collector, true); + long processingNsFromMXBean = System.nanoTime() - startNs; + startNs = System.nanoTime(); + jvmMetrics.getMetrics(collector, true); + long processingNsFromGroup = System.nanoTime() - startNs; + System.out.println( + "#Threads=" + numThreads + ", ThreadMXBean=" + processingNsFromMXBean + + " ns, ThreadGroup=" + processingNsFromGroup + " ns, ratio: " + ( + (double) processingNsFromMXBean / processingNsFromGroup)); + } + // cleanup + updateThreadsAndWait(threads, 0); + } + + private static void updateThreadsAndWait(List<TestThread> threads, + int expectedNumThreads) { + // add/remove threads according to expected number + int addNum = expectedNumThreads - threads.size(); + if (addNum > 0) { + for (int i = 0; i < addNum; i++) { + TestThread testThread = new TestThread(); + testThread.start(); + threads.add(testThread); + } + } else if (addNum < 0) { + for (int i = 0; i < Math.abs(addNum); i++) { + threads.get(i).exit = true; + } + } else { + return; + } + // wait for threads to reach the expected number + while (true) { + Iterator<TestThread> it = threads.iterator(); + while (it.hasNext()) { + if (it.next().exited) { + it.remove(); + } + } + if (threads.size() == expectedNumThreads) { + break; + } else { + try { 
Thread.sleep(500); + } catch (InterruptedException e) { + // ignored: keep polling for the expected thread count + } + } + } + } + + static class TestThread extends Thread { + private volatile boolean exit = false; // stop request from the test + private volatile boolean exited = false; // set once run() has finished + @Override + public void run() { + while (!exit) { + try { + Thread.sleep(1000); + } catch (InterruptedException e) { + e.printStackTrace(); + } + } + exited = true; + } + } }