From 77c13c385774c51766fe505397fa916754ac08d4 Mon Sep 17 00:00:00 2001 From: Zhe Zhang Date: Tue, 8 Nov 2016 16:07:36 -0800 Subject: [PATCH] HADOOP-13782. Make MutableRates metrics thread-local write, aggregate-on-read. Contributed by Erik Krogen. --- .../ipc/metrics/RpcDetailedMetrics.java | 4 +- .../metrics2/lib/MutableMetricsFactory.java | 5 + .../hadoop/metrics2/lib/MutableRates.java | 6 + .../lib/MutableRatesWithAggregation.java | 148 ++++++++++++++++++ .../metrics2/lib/TestMutableMetrics.java | 148 +++++++++++++++++- 5 files changed, 306 insertions(+), 5 deletions(-) create mode 100644 hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java index 7414364d36..ad367425e8 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java @@ -24,7 +24,7 @@ import org.apache.hadoop.metrics2.annotation.Metrics; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.MetricsRegistry; -import org.apache.hadoop.metrics2.lib.MutableRates; +import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation; /** * This class is for maintaining RPC method related statistics @@ -34,7 +34,7 @@ @Metrics(about="Per method RPC metrics", context="rpcdetailed") public class RpcDetailedMetrics { - @Metric MutableRates rates; + @Metric MutableRatesWithAggregation rates; static final Log LOG = LogFactory.getLog(RpcDetailedMetrics.class); final MetricsRegistry registry; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableMetricsFactory.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableMetricsFactory.java index ae2cb4f02c..b926c4b9b0 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableMetricsFactory.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableMetricsFactory.java @@ -67,6 +67,11 @@ MutableMetric newForField(Field field, Metric annotation, if (cls == MutableRates.class) { return new MutableRates(registry); } + if (cls == MutableRatesWithAggregation.class) { + MutableRatesWithAggregation rates = new MutableRatesWithAggregation(); + registry.add(info.name(), rates); + return rates; + } if (cls == MutableStat.class) { return registry.newStat(info.name(), info.description(), annotation.sampleName(), annotation.valueName(), diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRates.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRates.java index 121c292f4f..1074e87255 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRates.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRates.java @@ -33,6 +33,12 @@ /** * Helper class to manage a group of mutable rate metrics + * + * This class synchronizes all accesses to the metrics it + * contains, so it should not be used in situations where + * there is high contention on the metrics. + * {@link MutableRatesWithAggregation} is preferable in that + * situation. */ @InterfaceAudience.Public @InterfaceStability.Evolving diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java new file mode 100644 index 0000000000..64eae039f4 --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java @@ -0,0 +1,148 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hadoop.metrics2.lib; + +import com.google.common.collect.Sets; +import java.lang.ref.WeakReference; +import java.lang.reflect.Method; +import java.util.HashMap; +import java.util.Iterator; +import java.util.Map; +import java.util.Set; +import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.ConcurrentLinkedDeque; +import java.util.concurrent.ConcurrentMap; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.util.SampleStat; + + +/** + * Helper class to manage a group of mutable rate metrics. + * + * Each thread will maintain a local rate count, and upon snapshot, + * these values will be aggregated into a global rate. This class + * should only be used for long running threads, as any metrics + * produced between the last snapshot and the death of a thread + * will be lost. This allows for significantly higher concurrency + * than {@link MutableRates}. See HADOOP-24420. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class MutableRatesWithAggregation extends MutableMetric { + static final Log LOG = LogFactory.getLog(MutableRatesWithAggregation.class); + private final Map globalMetrics = new HashMap<>(); + private final Set> protocolCache = Sets.newHashSet(); + + private final ConcurrentLinkedDeque>> + weakReferenceQueue = new ConcurrentLinkedDeque<>(); + private final ThreadLocal> + threadLocalMetricsMap = new ThreadLocal<>(); + + /** + * Initialize the registry with all the methods in a protocol + * so they all show up in the first snapshot. + * Convenient for JMX implementations. + * @param protocol the protocol class + */ + public void init(Class protocol) { + if (protocolCache.contains(protocol)) { + return; + } + protocolCache.add(protocol); + for (Method method : protocol.getDeclaredMethods()) { + String name = method.getName(); + LOG.debug(name); + addMetricIfNotExists(name); + } + } + + /** + * Add a rate sample for a rate metric. + * @param name of the rate metric + * @param elapsed time + */ + public void add(String name, long elapsed) { + ConcurrentMap localStats = + threadLocalMetricsMap.get(); + if (localStats == null) { + localStats = new ConcurrentHashMap<>(); + threadLocalMetricsMap.set(localStats); + weakReferenceQueue.add(new WeakReference<>(localStats)); + } + ThreadSafeSampleStat stat = localStats.get(name); + if (stat == null) { + stat = new ThreadSafeSampleStat(); + localStats.put(name, stat); + } + stat.add(elapsed); + } + + @Override + public synchronized void snapshot(MetricsRecordBuilder rb, boolean all) { + Iterator>> iter = + weakReferenceQueue.iterator(); + while (iter.hasNext()) { + ConcurrentMap map = iter.next().get(); + if (map == null) { + // Thread has died; clean up its state + iter.remove(); + } else { + // Aggregate the thread's local samples into the global metrics + for (Map.Entry entry : map.entrySet()) { + String name = entry.getKey(); + MutableRate globalMetric = addMetricIfNotExists(name); + entry.getValue().snapshotInto(globalMetric); + } + } + } + for (MutableRate globalMetric : globalMetrics.values()) { + globalMetric.snapshot(rb, all); + } + } + + private synchronized MutableRate addMetricIfNotExists(String name) { + MutableRate metric = globalMetrics.get(name); + if (metric == null) { + metric = new MutableRate(name, name, false); + globalMetrics.put(name, metric); + } + return metric; + } + + private static class ThreadSafeSampleStat { + + private SampleStat stat = new SampleStat(); + + synchronized void add(double x) { + stat.add(x); + } + + synchronized void snapshotInto(MutableRate metric) { + if (stat.numSamples() > 0) { + metric.add(stat.numSamples(), Math.round(stat.total())); + stat.reset(); + } + } + } + +} diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMutableMetrics.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMutableMetrics.java index 9161df581c..1faa361def 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMutableMetrics.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMutableMetrics.java @@ -19,9 +19,7 @@ package org.apache.hadoop.metrics2.lib; import static org.apache.hadoop.metrics2.lib.Interns.info; -import static org.apache.hadoop.test.MetricsAsserts.assertCounter; -import static org.apache.hadoop.test.MetricsAsserts.assertGauge; -import static org.apache.hadoop.test.MetricsAsserts.mockMetricsRecordBuilder; +import static org.apache.hadoop.test.MetricsAsserts.*; import static org.mockito.AdditionalMatchers.eq; import static org.mockito.AdditionalMatchers.geq; import static org.mockito.AdditionalMatchers.leq; @@ -29,10 +27,15 @@ import static org.mockito.Matchers.eq; import static org.mockito.Mockito.times; import static org.mockito.Mockito.verify; +import static org.junit.Assert.*; import java.util.Map; import java.util.Map.Entry; +import java.util.Random; +import java.util.concurrent.CountDownLatch; +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.util.Quantile; import org.junit.Test; @@ -42,6 +45,7 @@ */ public class TestMutableMetrics { + private static final Log LOG = LogFactory.getLog(TestMutableMetrics.class); private final double EPSILON = 1e-42; /** @@ -129,6 +133,144 @@ interface TestProtocol { assertGauge("BarAvgTime", 0.0, rb); } + @Test public void testMutableRatesWithAggregationInit() { + MetricsRecordBuilder rb = mockMetricsRecordBuilder(); + MutableRatesWithAggregation rates = new MutableRatesWithAggregation(); + + rates.init(TestProtocol.class); + rates.snapshot(rb, false); + + assertCounter("FooNumOps", 0L, rb); + assertGauge("FooAvgTime", 0.0, rb); + assertCounter("BarNumOps", 0L, rb); + assertGauge("BarAvgTime", 0.0, rb); + } + + @Test public void testMutableRatesWithAggregationSingleThread() { + MutableRatesWithAggregation rates = new MutableRatesWithAggregation(); + + rates.add("foo", 1); + rates.add("bar", 5); + + MetricsRecordBuilder rb = mockMetricsRecordBuilder(); + rates.snapshot(rb, false); + assertCounter("FooNumOps", 1L, rb); + assertGauge("FooAvgTime", 1.0, rb); + assertCounter("BarNumOps", 1L, rb); + assertGauge("BarAvgTime", 5.0, rb); + + rates.add("foo", 1); + rates.add("foo", 3); + rates.add("bar", 6); + + rb = mockMetricsRecordBuilder(); + rates.snapshot(rb, false); + assertCounter("FooNumOps", 3L, rb); + assertGauge("FooAvgTime", 2.0, rb); + assertCounter("BarNumOps", 2L, rb); + assertGauge("BarAvgTime", 6.0, rb); + } + + @Test public void testMutableRatesWithAggregationManyThreads() + throws InterruptedException { + final MutableRatesWithAggregation rates = new MutableRatesWithAggregation(); + + final int n = 10; + long[] opCount = new long[n]; + double[] opTotalTime = new double[n]; + + for (int i = 0; i < n; i++) { + opCount[i] = 0; + opTotalTime[i] = 0; + // Initialize so that the getLongCounter() method doesn't complain + rates.add("metric" + i, 0); + } + + Thread[] threads = new Thread[n]; + final CountDownLatch firstAddsFinished = new CountDownLatch(threads.length); + final CountDownLatch firstSnapshotsFinished = new CountDownLatch(1); + final CountDownLatch secondAddsFinished = + new CountDownLatch(threads.length); + final CountDownLatch secondSnapshotsFinished = new CountDownLatch(1); + long seed = new Random().nextLong(); + LOG.info("Random seed = " + seed); + final Random sleepRandom = new Random(seed); + for (int tIdx = 0; tIdx < threads.length; tIdx++) { + final int threadIdx = tIdx; + threads[threadIdx] = new Thread() { + @Override + public void run() { + try { + for (int i = 0; i < 1000; i++) { + rates.add("metric" + (i % n), (i / n) % 2 == 0 ? 1 : 2); + // Sleep so additions can be interleaved with snapshots + Thread.sleep(sleepRandom.nextInt(5)); + } + firstAddsFinished.countDown(); + + // Make sure all threads stay alive long enough for the first + // snapshot to complete; else their metrics may be lost to GC + firstSnapshotsFinished.await(); + + // Let half the threads continue with more metrics and let half die + if (threadIdx % 2 == 0) { + for (int i = 0; i < 1000; i++) { + rates.add("metric" + (i % n), (i / n) % 2 == 0 ? 1 : 2); + } + secondAddsFinished.countDown(); + secondSnapshotsFinished.await(); + } else { + secondAddsFinished.countDown(); + } + } catch (InterruptedException e) { + // Ignore + } + } + }; + } + for (Thread t : threads) { + t.start(); + } + // Snapshot concurrently with additions but aggregate the totals into + // opCount / opTotalTime + for (int i = 0; i < 100; i++) { + snapshotMutableRatesWithAggregation(rates, opCount, opTotalTime); + Thread.sleep(sleepRandom.nextInt(20)); + } + firstAddsFinished.await(); + // Final snapshot to grab any remaining metrics and then verify that + // the totals are as expected + snapshotMutableRatesWithAggregation(rates, opCount, opTotalTime); + for (int i = 0; i < n; i++) { + assertEquals("metric" + i + " count", 1001, opCount[i]); + assertEquals("metric" + i + " total", 1500, opTotalTime[i], 1.0); + } + firstSnapshotsFinished.countDown(); + + // After half of the threads die, ensure that the remaining ones still + // add metrics correctly and that snapshot occurs correctly + secondAddsFinished.await(); + snapshotMutableRatesWithAggregation(rates, opCount, opTotalTime); + for (int i = 0; i < n; i++) { + assertEquals("metric" + i + " count", 1501, opCount[i]); + assertEquals("metric" + i + " total", 2250, opTotalTime[i], 1.0); + } + secondSnapshotsFinished.countDown(); + } + + private static void snapshotMutableRatesWithAggregation( + MutableRatesWithAggregation rates, long[] opCount, double[] opTotalTime) { + MetricsRecordBuilder rb = mockMetricsRecordBuilder(); + rates.snapshot(rb, true); + for (int i = 0; i < opCount.length; i++) { + long prevOpCount = opCount[i]; + long newOpCount = getLongCounter("Metric" + i + "NumOps", rb); + opCount[i] = newOpCount; + double avgTime = getDoubleGauge("Metric" + i + "AvgTime", rb); + opTotalTime[i] += avgTime * (newOpCount - prevOpCount); + } + } + /** * Tests that when using {@link MutableStat#add(long, long)}, even with a high * sample count, the mean does not lose accuracy.