HDFS-16949 Introduce inverse quantiles for metrics where higher numer… (#5495)
This commit is contained in:
parent
e45451f9c7
commit
3e2ae1da00
@ -227,6 +227,29 @@ public synchronized MutableQuantiles newQuantiles(String name, String desc,
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a mutable inverse metric that estimates inverse quantiles of a stream of values
|
||||
* @param name of the metric
|
||||
* @param desc metric description
|
||||
* @param sampleName of the metric (e.g., "Ops")
|
||||
* @param valueName of the metric (e.g., "Rate")
|
||||
* @param interval rollover interval of estimator in seconds
|
||||
* @return a new inverse quantile estimator object
|
||||
* @throws MetricsException if interval is not a positive integer
|
||||
*/
|
||||
public synchronized MutableQuantiles newInverseQuantiles(String name, String desc,
|
||||
String sampleName, String valueName, int interval) {
|
||||
checkMetricName(name);
|
||||
if (interval <= 0) {
|
||||
throw new MetricsException("Interval should be positive. Value passed" +
|
||||
" is: " + interval);
|
||||
}
|
||||
MutableQuantiles ret =
|
||||
new MutableInverseQuantiles(name, desc, sampleName, valueName, interval);
|
||||
metricsMap.put(name, ret);
|
||||
return ret;
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a mutable metric with stats
|
||||
* @param name of the metric
|
||||
@ -278,7 +301,7 @@ public MutableRate newRate(String name, String description) {
|
||||
}
|
||||
|
||||
/**
|
||||
* Create a mutable rate metric (for throughput measurement)
|
||||
* Create a mutable rate metric (for throughput measurement).
|
||||
* @param name of the metric
|
||||
* @param desc description
|
||||
* @param extended produce extended stat (stdev/min/max etc.) if true
|
||||
|
@ -0,0 +1,89 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
*
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
*
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.metrics2.lib;
|
||||
|
||||
import org.apache.hadoop.classification.InterfaceAudience;
|
||||
import org.apache.hadoop.classification.InterfaceStability;
|
||||
import org.apache.hadoop.classification.VisibleForTesting;
|
||||
import org.apache.hadoop.metrics2.util.Quantile;
|
||||
import org.apache.hadoop.metrics2.util.SampleQuantiles;
|
||||
import java.text.DecimalFormat;
|
||||
import static org.apache.hadoop.metrics2.lib.Interns.info;
|
||||
|
||||
/**
|
||||
* Watches a stream of long values, maintaining online estimates of specific
|
||||
* quantiles with provably low error bounds. Inverse quantiles are meant for
|
||||
* highly accurate low-percentile (e.g. 1st, 5th) metrics.
|
||||
* InverseQuantiles are used for metrics where higher the value better it is.
|
||||
* ( eg: data transfer rate ).
|
||||
* The 1st percentile here corresponds to the 99th inverse percentile metric,
|
||||
* 5th percentile to 95th and so on.
|
||||
*/
|
||||
@InterfaceAudience.Public
|
||||
@InterfaceStability.Evolving
|
||||
public class MutableInverseQuantiles extends MutableQuantiles{
|
||||
|
||||
static class InversePercentile extends Quantile {
|
||||
InversePercentile(double inversePercentile) {
|
||||
super(inversePercentile/100, inversePercentile/1000);
|
||||
}
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public static final Quantile[] INVERSE_QUANTILES = {new InversePercentile(50),
|
||||
new InversePercentile(25), new InversePercentile(10),
|
||||
new InversePercentile(5), new InversePercentile(1)};
|
||||
|
||||
/**
|
||||
* Instantiates a new {@link MutableInverseQuantiles} for a metric that rolls itself
|
||||
* over on the specified time interval.
|
||||
*
|
||||
* @param name of the metric
|
||||
* @param description long-form textual description of the metric
|
||||
* @param sampleName type of items in the stream (e.g., "Ops")
|
||||
* @param valueName type of the values
|
||||
* @param intervalSecs rollover interval (in seconds) of the estimator
|
||||
*/
|
||||
public MutableInverseQuantiles(String name, String description, String sampleName,
|
||||
String valueName, int intervalSecs) {
|
||||
super(name, description, sampleName, valueName, intervalSecs);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets quantileInfo and estimator.
|
||||
*
|
||||
* @param ucName capitalized name of the metric
|
||||
* @param uvName capitalized type of the values
|
||||
* @param desc uncapitalized long-form textual description of the metric
|
||||
* @param lvName uncapitalized type of the values
|
||||
* @param df Number formatter for inverse percentile value
|
||||
*/
|
||||
void setQuantiles(String ucName, String uvName, String desc, String lvName, DecimalFormat df) {
|
||||
// Construct the MetricsInfos for inverse quantiles, converting to inverse percentiles
|
||||
setQuantileInfos(INVERSE_QUANTILES.length);
|
||||
for (int i = 0; i < INVERSE_QUANTILES.length; i++) {
|
||||
double inversePercentile = 100 * (1 - INVERSE_QUANTILES[i].quantile);
|
||||
String nameTemplate = ucName + df.format(inversePercentile) + "thInversePercentile" + uvName;
|
||||
String descTemplate = df.format(inversePercentile) + " inverse percentile " + lvName
|
||||
+ " with " + getInterval() + " second interval for " + desc;
|
||||
addQuantileInfo(i, info(nameTemplate, descTemplate));
|
||||
}
|
||||
|
||||
setEstimator(new SampleQuantiles(INVERSE_QUANTILES));
|
||||
}
|
||||
}
|
@ -20,6 +20,7 @@
|
||||
|
||||
import static org.apache.hadoop.metrics2.lib.Interns.info;
|
||||
|
||||
import java.text.DecimalFormat;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.ScheduledExecutorService;
|
||||
@ -52,9 +53,10 @@ public class MutableQuantiles extends MutableMetric {
|
||||
new Quantile(0.75, 0.025), new Quantile(0.90, 0.010),
|
||||
new Quantile(0.95, 0.005), new Quantile(0.99, 0.001) };
|
||||
|
||||
private final MetricsInfo numInfo;
|
||||
private final MetricsInfo[] quantileInfos;
|
||||
private final int interval;
|
||||
private MetricsInfo numInfo;
|
||||
private MetricsInfo[] quantileInfos;
|
||||
private int intervalSecs;
|
||||
private static DecimalFormat decimalFormat = new DecimalFormat("###.####");
|
||||
|
||||
private QuantileEstimator estimator;
|
||||
private long previousCount = 0;
|
||||
@ -91,26 +93,39 @@ public MutableQuantiles(String name, String description, String sampleName,
|
||||
String lsName = StringUtils.uncapitalize(sampleName);
|
||||
String lvName = StringUtils.uncapitalize(valueName);
|
||||
|
||||
numInfo = info(ucName + "Num" + usName, String.format(
|
||||
"Number of %s for %s with %ds interval", lsName, desc, interval));
|
||||
// Construct the MetricsInfos for the quantiles, converting to percentiles
|
||||
quantileInfos = new MetricsInfo[quantiles.length];
|
||||
String nameTemplate = ucName + "%dthPercentile" + uvName;
|
||||
String descTemplate = "%d percentile " + lvName + " with " + interval
|
||||
+ " second interval for " + desc;
|
||||
for (int i = 0; i < quantiles.length; i++) {
|
||||
int percentile = (int) (100 * quantiles[i].quantile);
|
||||
quantileInfos[i] = info(String.format(nameTemplate, percentile),
|
||||
String.format(descTemplate, percentile));
|
||||
}
|
||||
|
||||
estimator = new SampleQuantiles(quantiles);
|
||||
|
||||
this.interval = interval;
|
||||
setInterval(interval);
|
||||
setNumInfo(info(ucName + "Num" + usName, String.format(
|
||||
"Number of %s for %s with %ds interval", lsName, desc, interval)));
|
||||
scheduledTask = scheduler.scheduleWithFixedDelay(new RolloverSample(this),
|
||||
interval, interval, TimeUnit.SECONDS);
|
||||
setQuantiles(ucName, uvName, desc, lvName, decimalFormat);
|
||||
}
|
||||
|
||||
/**
|
||||
* Sets quantileInfo and estimator.
|
||||
*
|
||||
* @param ucName capitalized name of the metric
|
||||
* @param uvName capitalized type of the values
|
||||
* @param desc uncapitalized long-form textual description of the metric
|
||||
* @param lvName uncapitalized type of the values
|
||||
* @param pDecimalFormat Number formatter for percentile value
|
||||
*/
|
||||
void setQuantiles(String ucName, String uvName, String desc, String lvName, DecimalFormat pDecimalFormat) {
|
||||
// Construct the MetricsInfos for the quantiles, converting to percentiles
|
||||
setQuantileInfos(quantiles.length);
|
||||
for (int i = 0; i < quantiles.length; i++) {
|
||||
double percentile = 100 * quantiles[i].quantile;
|
||||
String nameTemplate = ucName + pDecimalFormat.format(percentile) + "thPercentile" + uvName;
|
||||
String descTemplate = pDecimalFormat.format(percentile) + " percentile " + lvName
|
||||
+ " with " + getInterval() + " second interval for " + desc;
|
||||
addQuantileInfo(i, info(nameTemplate, descTemplate));
|
||||
}
|
||||
|
||||
setEstimator(new SampleQuantiles(quantiles));
|
||||
}
|
||||
|
||||
public MutableQuantiles() {}
|
||||
|
||||
@Override
|
||||
public synchronized void snapshot(MetricsRecordBuilder builder, boolean all) {
|
||||
if (all || changed()) {
|
||||
@ -133,8 +148,50 @@ public synchronized void add(long value) {
|
||||
estimator.insert(value);
|
||||
}
|
||||
|
||||
public int getInterval() {
|
||||
return interval;
|
||||
/**
|
||||
* Set info about the metrics.
|
||||
*
|
||||
* @param pNumInfo info about the metrics.
|
||||
*/
|
||||
public synchronized void setNumInfo(MetricsInfo pNumInfo) {
|
||||
this.numInfo = pNumInfo;
|
||||
}
|
||||
|
||||
/**
|
||||
* Initialize quantileInfos array.
|
||||
*
|
||||
* @param length of the quantileInfos array.
|
||||
*/
|
||||
public synchronized void setQuantileInfos(int length) {
|
||||
this.quantileInfos = new MetricsInfo[length];
|
||||
}
|
||||
|
||||
/**
|
||||
* Add entry to quantileInfos array.
|
||||
*
|
||||
* @param i array index.
|
||||
* @param info info to be added to quantileInfos array.
|
||||
*/
|
||||
public synchronized void addQuantileInfo(int i, MetricsInfo info) {
|
||||
this.quantileInfos[i] = info;
|
||||
}
|
||||
|
||||
/**
|
||||
* Set the rollover interval (in seconds) of the estimator.
|
||||
*
|
||||
* @param pIntervalSecs of the estimator.
|
||||
*/
|
||||
public synchronized void setInterval(int pIntervalSecs) {
|
||||
this.intervalSecs = pIntervalSecs;
|
||||
}
|
||||
|
||||
/**
|
||||
* Get the rollover interval (in seconds) of the estimator.
|
||||
*
|
||||
* @return intervalSecs of the estimator.
|
||||
*/
|
||||
public synchronized int getInterval() {
|
||||
return intervalSecs;
|
||||
}
|
||||
|
||||
public void stop() {
|
||||
|
@ -24,6 +24,7 @@
|
||||
import java.util.Map;
|
||||
import java.util.Random;
|
||||
|
||||
import org.apache.hadoop.metrics2.lib.MutableInverseQuantiles;
|
||||
import org.junit.Before;
|
||||
import org.junit.Test;
|
||||
|
||||
@ -36,6 +37,7 @@ public class TestSampleQuantiles {
|
||||
new Quantile(0.95, 0.005), new Quantile(0.99, 0.001) };
|
||||
|
||||
SampleQuantiles estimator;
|
||||
final static int NUM_REPEATS = 10;
|
||||
|
||||
@Before
|
||||
public void init() {
|
||||
@ -91,28 +93,70 @@ public void testClear() throws IOException {
|
||||
@Test
|
||||
public void testQuantileError() throws IOException {
|
||||
final int count = 100000;
|
||||
Random r = new Random(0xDEADDEAD);
|
||||
Long[] values = new Long[count];
|
||||
Random rnd = new Random(0xDEADDEAD);
|
||||
int[] values = new int[count];
|
||||
for (int i = 0; i < count; i++) {
|
||||
values[i] = (long) (i + 1);
|
||||
values[i] = i + 1;
|
||||
}
|
||||
// Do 10 shuffle/insert/check cycles
|
||||
for (int i = 0; i < 10; i++) {
|
||||
System.out.println("Starting run " + i);
|
||||
Collections.shuffle(Arrays.asList(values), r);
|
||||
|
||||
// Repeat shuffle/insert/check cycles 10 times
|
||||
for (int i = 0; i < NUM_REPEATS; i++) {
|
||||
|
||||
// Shuffle
|
||||
Collections.shuffle(Arrays.asList(values), rnd);
|
||||
estimator.clear();
|
||||
for (int j = 0; j < count; j++) {
|
||||
estimator.insert(values[j]);
|
||||
|
||||
// Insert
|
||||
for (int value : values) {
|
||||
estimator.insert(value);
|
||||
}
|
||||
Map<Quantile, Long> snapshot;
|
||||
snapshot = estimator.snapshot();
|
||||
|
||||
// Check
|
||||
for (Quantile q : quantiles) {
|
||||
long actual = (long) (q.quantile * count);
|
||||
long error = (long) (q.error * count);
|
||||
long estimate = snapshot.get(q);
|
||||
System.out
|
||||
.println(String.format("Expected %d with error %d, estimated %d",
|
||||
actual, error, estimate));
|
||||
assertThat(estimate <= actual + error).isTrue();
|
||||
assertThat(estimate >= actual - error).isTrue();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Correctness test that checks that absolute error of the estimate for inverse quantiles
|
||||
* is within specified error bounds for some randomly permuted streams of items.
|
||||
*/
|
||||
@Test
|
||||
public void testInverseQuantiles() throws IOException {
|
||||
SampleQuantiles inverseQuantilesEstimator =
|
||||
new SampleQuantiles(MutableInverseQuantiles.INVERSE_QUANTILES);
|
||||
final int count = 100000;
|
||||
Random rnd = new Random(0xDEADDEAD);
|
||||
int[] values = new int[count];
|
||||
for (int i = 0; i < count; i++) {
|
||||
values[i] = i + 1;
|
||||
}
|
||||
|
||||
// Repeat shuffle/insert/check cycles 10 times
|
||||
for (int i = 0; i < NUM_REPEATS; i++) {
|
||||
// Shuffle
|
||||
Collections.shuffle(Arrays.asList(values), rnd);
|
||||
inverseQuantilesEstimator.clear();
|
||||
|
||||
// Insert
|
||||
for (int value : values) {
|
||||
inverseQuantilesEstimator.insert(value);
|
||||
}
|
||||
Map<Quantile, Long> snapshot;
|
||||
snapshot = inverseQuantilesEstimator.snapshot();
|
||||
|
||||
// Check
|
||||
for (Quantile q : MutableInverseQuantiles.INVERSE_QUANTILES) {
|
||||
long actual = (long) (q.quantile * count);
|
||||
long error = (long) (q.error * count);
|
||||
long estimate = snapshot.get(q);
|
||||
assertThat(estimate <= actual + error).isTrue();
|
||||
assertThat(estimate >= actual - error).isTrue();
|
||||
}
|
||||
|
@ -392,13 +392,34 @@ public static void assertQuantileGauges(String prefix,
|
||||
*/
|
||||
public static void assertQuantileGauges(String prefix,
|
||||
MetricsRecordBuilder rb, String valueName) {
|
||||
verify(rb).addGauge(eqName(info(prefix + "NumOps", "")), geq(0l));
|
||||
verify(rb).addGauge(eqName(info(prefix + "NumOps", "")), geq(0L));
|
||||
for (Quantile q : MutableQuantiles.quantiles) {
|
||||
String nameTemplate = prefix + "%dthPercentile" + valueName;
|
||||
int percentile = (int) (100 * q.quantile);
|
||||
verify(rb).addGauge(
|
||||
eqName(info(String.format(nameTemplate, percentile), "")),
|
||||
geq(0l));
|
||||
geq(0L));
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Asserts that the NumOps and inverse quantiles for a metric have been changed at
|
||||
* some point to a non-zero value, for the specified value name of the
|
||||
* metrics (e.g., "Rate").
|
||||
*
|
||||
* @param prefix of the metric
|
||||
* @param rb MetricsRecordBuilder with the metric
|
||||
* @param valueName the value name for the metric
|
||||
*/
|
||||
public static void assertInverseQuantileGauges(String prefix,
|
||||
MetricsRecordBuilder rb, String valueName) {
|
||||
verify(rb).addGauge(eqName(info(prefix + "NumOps", "")), geq(0L));
|
||||
for (Quantile q : MutableQuantiles.quantiles) {
|
||||
String nameTemplate = prefix + "%dthInversePercentile" + valueName;
|
||||
int percentile = (int) (100 * q.quantile);
|
||||
verify(rb).addGauge(
|
||||
eqName(info(String.format(nameTemplate, percentile), "")),
|
||||
geq(0L));
|
||||
}
|
||||
}
|
||||
|
||||
|
@ -258,7 +258,7 @@ public DataNodeMetrics(String name, String sessionId, int[] intervals,
|
||||
"ramDiskBlocksLazyPersistWindows" + interval + "s",
|
||||
"Time between the RamDisk block write and disk persist in ms",
|
||||
"ops", "latency", interval);
|
||||
readTransferRateQuantiles[i] = registry.newQuantiles(
|
||||
readTransferRateQuantiles[i] = registry.newInverseQuantiles(
|
||||
"readTransferRate" + interval + "s",
|
||||
"Rate at which bytes are read from datanode calculated in bytes per second",
|
||||
"ops", "rate", interval);
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_HEARTBEAT_INTERVAL_KEY;
|
||||
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
||||
import static org.apache.hadoop.test.MetricsAsserts.assertInverseQuantileGauges;
|
||||
import static org.apache.hadoop.test.MetricsAsserts.assertQuantileGauges;
|
||||
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
|
||||
import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
|
||||
@ -413,7 +414,7 @@ public Boolean get() {
|
||||
final long endWriteValue = getLongCounter("TotalWriteTime", rbNew);
|
||||
final long endReadValue = getLongCounter("TotalReadTime", rbNew);
|
||||
assertCounter("ReadTransferRateNumOps", 1L, rbNew);
|
||||
assertQuantileGauges("ReadTransferRate" + "60s", rbNew, "Rate");
|
||||
assertInverseQuantileGauges("ReadTransferRate60s", rbNew, "Rate");
|
||||
return endWriteValue > startWriteValue
|
||||
&& endReadValue > startReadValue;
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user