HADOOP-9420. Add percentile or max metric for rpcQueueTime, processing time. Contributed by Liang Xie.

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1556983 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Andrew Wang 2014-01-09 23:41:41 +00:00
parent 8b2c8a657e
commit ed3f1454ec
5 changed files with 89 additions and 6 deletions

View File

@ -415,6 +415,9 @@ Release 2.4.0 - UNRELEASED
HADOOP-10208. Remove duplicate initialization in StringUtils.getStringCollection. HADOOP-10208. Remove duplicate initialization in StringUtils.getStringCollection.
(Benoy Antony via jing9) (Benoy Antony via jing9)
HADOOP-9420. Add percentile or max metric for rpcQueueTime, processing time.
(Liang Xie via wang)
OPTIMIZATIONS OPTIMIZATIONS
HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn) HADOOP-9748. Reduce blocking on UGI.ensureInitialized (daryn)

View File

@ -242,4 +242,9 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
public static final String HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS = public static final String HADOOP_USER_GROUP_METRICS_PERCENTILES_INTERVALS =
"hadoop.user.group.metrics.percentiles.intervals"; "hadoop.user.group.metrics.percentiles.intervals";
public static final String RPC_METRICS_QUANTILE_ENABLE =
"rpc.metrics.quantile.enable";
public static final String RPC_METRICS_PERCENTILES_INTERVALS_KEY =
"rpc.metrics.percentiles.intervals";
} }

View File

@ -2193,7 +2193,7 @@ protected Server(String bindAddress, int port,
listener = new Listener(); listener = new Listener();
this.port = listener.getAddress().getPort(); this.port = listener.getAddress().getPort();
connectionManager = new ConnectionManager(); connectionManager = new ConnectionManager();
this.rpcMetrics = RpcMetrics.create(this); this.rpcMetrics = RpcMetrics.create(this, conf);
this.rpcDetailedMetrics = RpcDetailedMetrics.create(this.port); this.rpcDetailedMetrics = RpcDetailedMetrics.create(this.port);
this.tcpNoDelay = conf.getBoolean( this.tcpNoDelay = conf.getBoolean(
CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY, CommonConfigurationKeysPublic.IPC_SERVER_TCPNODELAY_KEY,

View File

@ -19,14 +19,17 @@
import org.apache.commons.logging.Log; import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory; import org.apache.commons.logging.LogFactory;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.ipc.Server; import org.apache.hadoop.ipc.Server;
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.metrics2.annotation.Metric; import org.apache.hadoop.metrics2.annotation.Metric;
import org.apache.hadoop.metrics2.annotation.Metrics; import org.apache.hadoop.metrics2.annotation.Metrics;
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
import org.apache.hadoop.metrics2.lib.MetricsRegistry; import org.apache.hadoop.metrics2.lib.MetricsRegistry;
import org.apache.hadoop.metrics2.lib.MutableCounterInt; import org.apache.hadoop.metrics2.lib.MutableCounterInt;
import org.apache.hadoop.metrics2.lib.MutableCounterLong; import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableQuantiles;
import org.apache.hadoop.metrics2.lib.MutableRate; import org.apache.hadoop.metrics2.lib.MutableRate;
/** /**
@ -41,26 +44,48 @@ public class RpcMetrics {
final Server server; final Server server;
final MetricsRegistry registry; final MetricsRegistry registry;
final String name; final String name;
final boolean rpcQuantileEnable;
RpcMetrics(Server server) { RpcMetrics(Server server, Configuration conf) {
String port = String.valueOf(server.getListenerAddress().getPort()); String port = String.valueOf(server.getListenerAddress().getPort());
name = "RpcActivityForPort"+ port; name = "RpcActivityForPort" + port;
this.server = server; this.server = server;
registry = new MetricsRegistry("rpc").tag("port", "RPC port", port); registry = new MetricsRegistry("rpc").tag("port", "RPC port", port);
LOG.debug("Initialized "+ registry); int[] intervals = conf.getInts(
CommonConfigurationKeys.RPC_METRICS_PERCENTILES_INTERVALS_KEY);
rpcQuantileEnable = (intervals.length > 0) && conf.getBoolean(
CommonConfigurationKeys.RPC_METRICS_QUANTILE_ENABLE, false);
if (rpcQuantileEnable) {
rpcQueueTimeMillisQuantiles =
new MutableQuantiles[intervals.length];
rpcProcessingTimeMillisQuantiles =
new MutableQuantiles[intervals.length];
for (int i = 0; i < intervals.length; i++) {
int interval = intervals[i];
rpcQueueTimeMillisQuantiles[i] = registry.newQuantiles("rpcQueueTime"
+ interval + "s", "rpc queue time in milli second", "ops",
"latency", interval);
rpcProcessingTimeMillisQuantiles[i] = registry.newQuantiles(
"rpcProcessingTime" + interval + "s",
"rpc processing time in milli second", "ops", "latency", interval);
}
}
LOG.debug("Initialized " + registry);
} }
public String name() { return name; } public String name() { return name; }
public static RpcMetrics create(Server server) { public static RpcMetrics create(Server server, Configuration conf) {
RpcMetrics m = new RpcMetrics(server); RpcMetrics m = new RpcMetrics(server, conf);
return DefaultMetricsSystem.instance().register(m.name, null, m); return DefaultMetricsSystem.instance().register(m.name, null, m);
} }
@Metric("Number of received bytes") MutableCounterLong receivedBytes; @Metric("Number of received bytes") MutableCounterLong receivedBytes;
@Metric("Number of sent bytes") MutableCounterLong sentBytes; @Metric("Number of sent bytes") MutableCounterLong sentBytes;
@Metric("Queue time") MutableRate rpcQueueTime; @Metric("Queue time") MutableRate rpcQueueTime;
MutableQuantiles[] rpcQueueTimeMillisQuantiles;
@Metric("Processsing time") MutableRate rpcProcessingTime; @Metric("Processsing time") MutableRate rpcProcessingTime;
MutableQuantiles[] rpcProcessingTimeMillisQuantiles;
@Metric("Number of authentication failures") @Metric("Number of authentication failures")
MutableCounterInt rpcAuthenticationFailures; MutableCounterInt rpcAuthenticationFailures;
@Metric("Number of authentication successes") @Metric("Number of authentication successes")
@ -146,6 +171,11 @@ public void incrReceivedBytes(int count) {
//@Override //@Override
public void addRpcQueueTime(int qTime) { public void addRpcQueueTime(int qTime) {
rpcQueueTime.add(qTime); rpcQueueTime.add(qTime);
if (rpcQuantileEnable) {
for (MutableQuantiles q : rpcQueueTimeMillisQuantiles) {
q.add(qTime);
}
}
} }
/** /**
@ -155,5 +185,10 @@ public void addRpcQueueTime(int qTime) {
//@Override //@Override
public void addRpcProcessingTime(int processingTime) { public void addRpcProcessingTime(int processingTime) {
rpcProcessingTime.add(processingTime); rpcProcessingTime.add(processingTime);
if (rpcQuantileEnable) {
for (MutableQuantiles q : rpcProcessingTimeMillisQuantiles) {
q.add(processingTime);
}
}
} }
} }

View File

@ -20,6 +20,7 @@
import static org.apache.hadoop.test.MetricsAsserts.assertCounter; import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt; import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
import static org.apache.hadoop.test.MetricsAsserts.getLongCounter;
import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.apache.hadoop.test.MetricsAsserts.getMetrics;
import static org.junit.Assert.assertEquals; import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotSame; import static org.junit.Assert.assertNotSame;
@ -67,6 +68,7 @@
import org.apache.hadoop.security.authorize.Service; import org.apache.hadoop.security.authorize.Service;
import org.apache.hadoop.security.token.SecretManager; import org.apache.hadoop.security.token.SecretManager;
import org.apache.hadoop.security.token.TokenIdentifier; import org.apache.hadoop.security.token.TokenIdentifier;
import org.apache.hadoop.test.MetricsAsserts;
import org.apache.hadoop.test.MockitoUtil; import org.apache.hadoop.test.MockitoUtil;
import org.junit.Before; import org.junit.Before;
import org.junit.Test; import org.junit.Test;
@ -961,6 +963,44 @@ public void testConnectionPing() throws Exception {
} }
} }
@Test
public void testRpcMetrics() throws Exception {
Configuration configuration = new Configuration();
final int interval = 1;
configuration.setBoolean(CommonConfigurationKeys.
RPC_METRICS_QUANTILE_ENABLE, true);
configuration.set(CommonConfigurationKeys.
RPC_METRICS_PERCENTILES_INTERVALS_KEY, "" + interval);
final Server server = new RPC.Builder(configuration)
.setProtocol(TestProtocol.class).setInstance(new TestImpl())
.setBindAddress(ADDRESS).setPort(0).setNumHandlers(5).setVerbose(true)
.build();
server.start();
final TestProtocol proxy = RPC.getProxy(TestProtocol.class,
TestProtocol.versionID, server.getListenerAddress(), configuration);
try {
for (int i=0; i<1000; i++) {
proxy.ping();
proxy.echo("" + i);
}
MetricsRecordBuilder rpcMetrics =
getMetrics(server.getRpcMetrics().name());
assertTrue("Expected non-zero rpc queue time",
getLongCounter("RpcQueueTimeNumOps", rpcMetrics) > 0);
assertTrue("Expected non-zero rpc processing time",
getLongCounter("RpcProcessingTimeNumOps", rpcMetrics) > 0);
MetricsAsserts.assertQuantileGauges("RpcQueueTime" + interval + "s",
rpcMetrics);
MetricsAsserts.assertQuantileGauges("RpcProcessingTime" + interval + "s",
rpcMetrics);
} finally {
if (proxy != null) {
RPC.stopProxy(proxy);
}
server.stop();
}
}
public static void main(String[] args) throws IOException { public static void main(String[] args) throws IOException {
new TestRPC().testCallsInternal(conf); new TestRPC().testCallsInternal(conf);