diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MetricsRegistry.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MetricsRegistry.java index b71f7f8cc5..31031b808e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MetricsRegistry.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MetricsRegistry.java @@ -227,6 +227,29 @@ public synchronized MutableQuantiles newQuantiles(String name, String desc, return ret; } + /** + * Create a mutable inverse metric that estimates inverse quantiles of a stream of values + * @param name of the metric + * @param desc metric description + * @param sampleName of the metric (e.g., "Ops") + * @param valueName of the metric (e.g., "Rate") + * @param interval rollover interval of estimator in seconds + * @return a new inverse quantile estimator object + * @throws MetricsException if interval is not a positive integer + */ + public synchronized MutableQuantiles newInverseQuantiles(String name, String desc, + String sampleName, String valueName, int interval) { + checkMetricName(name); + if (interval <= 0) { + throw new MetricsException("Interval should be positive. Value passed" + + " is: " + interval); + } + MutableQuantiles ret = + new MutableInverseQuantiles(name, desc, sampleName, valueName, interval); + metricsMap.put(name, ret); + return ret; + } + /** * Create a mutable metric with stats * @param name of the metric @@ -278,7 +301,7 @@ public MutableRate newRate(String name, String description) { } /** - * Create a mutable rate metric (for throughput measurement) + * Create a mutable rate metric (for throughput measurement). * @param name of the metric * @param desc description * @param extended produce extended stat (stdev/min/max etc.) if true diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableInverseQuantiles.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableInverseQuantiles.java new file mode 100644 index 0000000000..a3d579cb9e --- /dev/null +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableInverseQuantiles.java @@ -0,0 +1,89 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.metrics2.lib; + +import org.apache.hadoop.classification.InterfaceAudience; +import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.classification.VisibleForTesting; +import org.apache.hadoop.metrics2.util.Quantile; +import org.apache.hadoop.metrics2.util.SampleQuantiles; +import java.text.DecimalFormat; +import static org.apache.hadoop.metrics2.lib.Interns.info; + +/** + * Watches a stream of long values, maintaining online estimates of specific + * quantiles with provably low error bounds. Inverse quantiles are meant for + * highly accurate low-percentile (e.g. 1st, 5th) metrics. + * InverseQuantiles are used for metrics where higher the value better it is. + * ( eg: data transfer rate ). + * The 1st percentile here corresponds to the 99th inverse percentile metric, + * 5th percentile to 95th and so on. + */ +@InterfaceAudience.Public +@InterfaceStability.Evolving +public class MutableInverseQuantiles extends MutableQuantiles{ + + static class InversePercentile extends Quantile { + InversePercentile(double inversePercentile) { + super(inversePercentile/100, inversePercentile/1000); + } + } + + @VisibleForTesting + public static final Quantile[] INVERSE_QUANTILES = {new InversePercentile(50), + new InversePercentile(25), new InversePercentile(10), + new InversePercentile(5), new InversePercentile(1)}; + + /** + * Instantiates a new {@link MutableInverseQuantiles} for a metric that rolls itself + * over on the specified time interval. + * + * @param name of the metric + * @param description long-form textual description of the metric + * @param sampleName type of items in the stream (e.g., "Ops") + * @param valueName type of the values + * @param intervalSecs rollover interval (in seconds) of the estimator + */ + public MutableInverseQuantiles(String name, String description, String sampleName, + String valueName, int intervalSecs) { + super(name, description, sampleName, valueName, intervalSecs); + } + + /** + * Sets quantileInfo and estimator. + * + * @param ucName capitalized name of the metric + * @param uvName capitalized type of the values + * @param desc uncapitalized long-form textual description of the metric + * @param lvName uncapitalized type of the values + * @param df Number formatter for inverse percentile value + */ + void setQuantiles(String ucName, String uvName, String desc, String lvName, DecimalFormat df) { + // Construct the MetricsInfos for inverse quantiles, converting to inverse percentiles + setQuantileInfos(INVERSE_QUANTILES.length); + for (int i = 0; i < INVERSE_QUANTILES.length; i++) { + double inversePercentile = 100 * (1 - INVERSE_QUANTILES[i].quantile); + String nameTemplate = ucName + df.format(inversePercentile) + "thInversePercentile" + uvName; + String descTemplate = df.format(inversePercentile) + " inverse percentile " + lvName + + " with " + getInterval() + " second interval for " + desc; + addQuantileInfo(i, info(nameTemplate, descTemplate)); + } + + setEstimator(new SampleQuantiles(INVERSE_QUANTILES)); + } +} diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableQuantiles.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableQuantiles.java index f7dfaffb3f..edb2159f17 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableQuantiles.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableQuantiles.java @@ -20,6 +20,7 @@ import static org.apache.hadoop.metrics2.lib.Interns.info; +import java.text.DecimalFormat; import java.util.Map; import java.util.concurrent.Executors; import java.util.concurrent.ScheduledExecutorService; @@ -52,9 +53,10 @@ public class MutableQuantiles extends MutableMetric { new Quantile(0.75, 0.025), new Quantile(0.90, 0.010), new Quantile(0.95, 0.005), new Quantile(0.99, 0.001) }; - private final MetricsInfo numInfo; - private final MetricsInfo[] quantileInfos; - private final int interval; + private MetricsInfo numInfo; + private MetricsInfo[] quantileInfos; + private int intervalSecs; + private static DecimalFormat decimalFormat = new DecimalFormat("###.####"); private QuantileEstimator estimator; private long previousCount = 0; @@ -91,26 +93,39 @@ public MutableQuantiles(String name, String description, String sampleName, String lsName = StringUtils.uncapitalize(sampleName); String lvName = StringUtils.uncapitalize(valueName); - numInfo = info(ucName + "Num" + usName, String.format( - "Number of %s for %s with %ds interval", lsName, desc, interval)); - // Construct the MetricsInfos for the quantiles, converting to percentiles - quantileInfos = new MetricsInfo[quantiles.length]; - String nameTemplate = ucName + "%dthPercentile" + uvName; - String descTemplate = "%d percentile " + lvName + " with " + interval - + " second interval for " + desc; - for (int i = 0; i < quantiles.length; i++) { - int percentile = (int) (100 * quantiles[i].quantile); - quantileInfos[i] = info(String.format(nameTemplate, percentile), - String.format(descTemplate, percentile)); - } - - estimator = new SampleQuantiles(quantiles); - - this.interval = interval; + setInterval(interval); + setNumInfo(info(ucName + "Num" + usName, String.format( + "Number of %s for %s with %ds interval", lsName, desc, interval))); scheduledTask = scheduler.scheduleWithFixedDelay(new RolloverSample(this), interval, interval, TimeUnit.SECONDS); + setQuantiles(ucName, uvName, desc, lvName, decimalFormat); } + /** + * Sets quantileInfo and estimator. + * + * @param ucName capitalized name of the metric + * @param uvName capitalized type of the values + * @param desc uncapitalized long-form textual description of the metric + * @param lvName uncapitalized type of the values + * @param pDecimalFormat Number formatter for percentile value + */ + void setQuantiles(String ucName, String uvName, String desc, String lvName, DecimalFormat pDecimalFormat) { + // Construct the MetricsInfos for the quantiles, converting to percentiles + setQuantileInfos(quantiles.length); + for (int i = 0; i < quantiles.length; i++) { + double percentile = 100 * quantiles[i].quantile; + String nameTemplate = ucName + pDecimalFormat.format(percentile) + "thPercentile" + uvName; + String descTemplate = pDecimalFormat.format(percentile) + " percentile " + lvName + + " with " + getInterval() + " second interval for " + desc; + addQuantileInfo(i, info(nameTemplate, descTemplate)); + } + + setEstimator(new SampleQuantiles(quantiles)); + } + + public MutableQuantiles() {} + @Override public synchronized void snapshot(MetricsRecordBuilder builder, boolean all) { if (all || changed()) { @@ -133,8 +148,50 @@ public synchronized void add(long value) { estimator.insert(value); } - public int getInterval() { - return interval; + /** + * Set info about the metrics. + * + * @param pNumInfo info about the metrics. + */ + public synchronized void setNumInfo(MetricsInfo pNumInfo) { + this.numInfo = pNumInfo; + } + + /** + * Initialize quantileInfos array. + * + * @param length of the quantileInfos array. + */ + public synchronized void setQuantileInfos(int length) { + this.quantileInfos = new MetricsInfo[length]; + } + + /** + * Add entry to quantileInfos array. + * + * @param i array index. + * @param info info to be added to quantileInfos array. + */ + public synchronized void addQuantileInfo(int i, MetricsInfo info) { + this.quantileInfos[i] = info; + } + + /** + * Set the rollover interval (in seconds) of the estimator. + * + * @param pIntervalSecs of the estimator. + */ + public synchronized void setInterval(int pIntervalSecs) { + this.intervalSecs = pIntervalSecs; + } + + /** + * Get the rollover interval (in seconds) of the estimator. + * + * @return intervalSecs of the estimator. + */ + public synchronized int getInterval() { + return intervalSecs; } public void stop() { diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/util/TestSampleQuantiles.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/util/TestSampleQuantiles.java index c7d8f60b18..aefd7a264b 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/util/TestSampleQuantiles.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/util/TestSampleQuantiles.java @@ -24,6 +24,7 @@ import java.util.Map; import java.util.Random; +import org.apache.hadoop.metrics2.lib.MutableInverseQuantiles; import org.junit.Before; import org.junit.Test; @@ -36,6 +37,7 @@ public class TestSampleQuantiles { new Quantile(0.95, 0.005), new Quantile(0.99, 0.001) }; SampleQuantiles estimator; + final static int NUM_REPEATS = 10; @Before public void init() { @@ -91,28 +93,70 @@ public void testClear() throws IOException { @Test public void testQuantileError() throws IOException { final int count = 100000; - Random r = new Random(0xDEADDEAD); - Long[] values = new Long[count]; + Random rnd = new Random(0xDEADDEAD); + int[] values = new int[count]; for (int i = 0; i < count; i++) { - values[i] = (long) (i + 1); + values[i] = i + 1; } - // Do 10 shuffle/insert/check cycles - for (int i = 0; i < 10; i++) { - System.out.println("Starting run " + i); - Collections.shuffle(Arrays.asList(values), r); + + // Repeat shuffle/insert/check cycles 10 times + for (int i = 0; i < NUM_REPEATS; i++) { + + // Shuffle + Collections.shuffle(Arrays.asList(values), rnd); estimator.clear(); - for (int j = 0; j < count; j++) { - estimator.insert(values[j]); + + // Insert + for (int value : values) { + estimator.insert(value); } Map snapshot; snapshot = estimator.snapshot(); + + // Check for (Quantile q : quantiles) { long actual = (long) (q.quantile * count); long error = (long) (q.error * count); long estimate = snapshot.get(q); - System.out - .println(String.format("Expected %d with error %d, estimated %d", - actual, error, estimate)); + assertThat(estimate <= actual + error).isTrue(); + assertThat(estimate >= actual - error).isTrue(); + } + } + } + + /** + * Correctness test that checks that absolute error of the estimate for inverse quantiles + * is within specified error bounds for some randomly permuted streams of items. + */ + @Test + public void testInverseQuantiles() throws IOException { + SampleQuantiles inverseQuantilesEstimator = + new SampleQuantiles(MutableInverseQuantiles.INVERSE_QUANTILES); + final int count = 100000; + Random rnd = new Random(0xDEADDEAD); + int[] values = new int[count]; + for (int i = 0; i < count; i++) { + values[i] = i + 1; + } + + // Repeat shuffle/insert/check cycles 10 times + for (int i = 0; i < NUM_REPEATS; i++) { + // Shuffle + Collections.shuffle(Arrays.asList(values), rnd); + inverseQuantilesEstimator.clear(); + + // Insert + for (int value : values) { + inverseQuantilesEstimator.insert(value); + } + Map snapshot; + snapshot = inverseQuantilesEstimator.snapshot(); + + // Check + for (Quantile q : MutableInverseQuantiles.INVERSE_QUANTILES) { + long actual = (long) (q.quantile * count); + long error = (long) (q.error * count); + long estimate = snapshot.get(q); assertThat(estimate <= actual + error).isTrue(); assertThat(estimate >= actual - error).isTrue(); } diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java index 9132e20210..8210322f8f 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java @@ -392,13 +392,34 @@ public static void assertQuantileGauges(String prefix, */ public static void assertQuantileGauges(String prefix, MetricsRecordBuilder rb, String valueName) { - verify(rb).addGauge(eqName(info(prefix + "NumOps", "")), geq(0l)); + verify(rb).addGauge(eqName(info(prefix + "NumOps", "")), geq(0L)); for (Quantile q : MutableQuantiles.quantiles) { String nameTemplate = prefix + "%dthPercentile" + valueName; int percentile = (int) (100 * q.quantile); verify(rb).addGauge( eqName(info(String.format(nameTemplate, percentile), "")), - geq(0l)); + geq(0L)); + } + } + + /** + * Asserts that the NumOps and inverse quantiles for a metric have been changed at + * some point to a non-zero value, for the specified value name of the + * metrics (e.g., "Rate"). + * + * @param prefix of the metric + * @param rb MetricsRecordBuilder with the metric + * @param valueName the value name for the metric + */ + public static void assertInverseQuantileGauges(String prefix, + MetricsRecordBuilder rb, String valueName) { + verify(rb).addGauge(eqName(info(prefix + "NumOps", "")), geq(0L)); + for (Quantile q : MutableQuantiles.quantiles) { + String nameTemplate = prefix + "%dthInversePercentile" + valueName; + int percentile = (int) (100 * q.quantile); + verify(rb).addGauge( + eqName(info(String.format(nameTemplate, percentile), "")), + geq(0L)); } } diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java index 675dbbff4c..c3aa3c3a45 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/main/java/org/apache/hadoop/hdfs/server/datanode/metrics/DataNodeMetrics.java @@ -258,7 +258,7 @@ public DataNodeMetrics(String name, String sessionId, int[] intervals, "ramDiskBlocksLazyPersistWindows" + interval + "s", "Time between the RamDisk block write and disk persist in ms", "ops", "latency", interval); - readTransferRateQuantiles[i] = registry.newQuantiles( + readTransferRateQuantiles[i] = registry.newInverseQuantiles( "readTransferRate" + interval + "s", "Rate at which bytes are read from datanode calculated in bytes per second", "ops", "rate", interval); diff --git a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java index de5c985a4f..35f7924be1 100644 --- a/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java +++ b/hadoop-hdfs-project/hadoop-hdfs/src/test/java/org/apache/hadoop/hdfs/server/datanode/TestDataNodeMetrics.java @@ -19,6 +19,7 @@ import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY; import static org.apache.hadoop.test.MetricsAsserts.assertCounter; +import static org.apache.hadoop.test.MetricsAsserts.assertInverseQuantileGauges; import static org.apache.hadoop.test.MetricsAsserts.assertQuantileGauges; import static org.apache.hadoop.test.MetricsAsserts.getLongCounter; import static org.apache.hadoop.test.MetricsAsserts.getMetrics; @@ -413,7 +414,7 @@ public Boolean get() { final long endWriteValue = getLongCounter("TotalWriteTime", rbNew); final long endReadValue = getLongCounter("TotalReadTime", rbNew); assertCounter("ReadTransferRateNumOps", 1L, rbNew); - assertQuantileGauges("ReadTransferRate" + "60s", rbNew, "Rate"); + assertInverseQuantileGauges("ReadTransferRate60s", rbNew, "Rate"); return endWriteValue > startWriteValue && endReadValue > startReadValue; }