HADOOP-16290. Enable RpcMetrics units to be configurable (#3198)
Signed-off-by: Akira Ajisaka <aajisaka@apache.org>
This commit is contained in:
parent
17bf2fcbc5
commit
e1d00addb5
@ -382,6 +382,8 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic {
|
||||
public static final String RPC_METRICS_PERCENTILES_INTERVALS_KEY =
|
||||
"rpc.metrics.percentiles.intervals";
|
||||
|
||||
public static final String RPC_METRICS_TIME_UNIT = "rpc.metrics.timeunit";
|
||||
|
||||
/** Allowed hosts for nfs exports */
|
||||
public static final String NFS_EXPORTS_ALLOWED_HOSTS_SEPARATOR = ";";
|
||||
public static final String NFS_EXPORTS_ALLOWED_HOSTS_KEY = "nfs.exports.allowed.hosts";
|
||||
|
@ -193,6 +193,7 @@ public class DecayRpcScheduler implements RpcScheduler,
|
||||
private final String namespace;
|
||||
private final int topUsersCount; // e.g., report top 10 users' metrics
|
||||
private static final double PRECISION = 0.0001;
|
||||
private final TimeUnit metricsTimeUnit;
|
||||
private MetricsProxy metricsProxy;
|
||||
private final CostProvider costProvider;
|
||||
private final Map<String, Integer> staticPriorities = new HashMap<>();
|
||||
@ -266,6 +267,8 @@ public DecayRpcScheduler(int numLevels, String ns, Configuration conf) {
|
||||
DecayRpcSchedulerDetailedMetrics.create(ns);
|
||||
decayRpcSchedulerDetailedMetrics.init(numLevels);
|
||||
|
||||
metricsTimeUnit = RpcMetrics.getMetricsTimeUnit(conf);
|
||||
|
||||
// Setup delay timer
|
||||
Timer timer = new Timer(true);
|
||||
DecayTask task = new DecayTask(this, timer);
|
||||
@ -725,8 +728,9 @@ public void addResponseTime(String callName, Schedulable schedulable,
|
||||
addCost(user, processingCost);
|
||||
|
||||
int priorityLevel = schedulable.getPriorityLevel();
|
||||
long queueTime = details.get(Timing.QUEUE, RpcMetrics.TIMEUNIT);
|
||||
long processingTime = details.get(Timing.PROCESSING, RpcMetrics.TIMEUNIT);
|
||||
long queueTime = details.get(Timing.QUEUE, metricsTimeUnit);
|
||||
long processingTime = details.get(Timing.PROCESSING,
|
||||
metricsTimeUnit);
|
||||
|
||||
this.decayRpcSchedulerDetailedMetrics.addQueueTime(
|
||||
priorityLevel, queueTime);
|
||||
|
@ -62,10 +62,10 @@ default void addResponseTime(String callName, Schedulable schedulable,
|
||||
// this interface, a default implementation is supplied which uses the old
|
||||
// method. All new implementations MUST override this interface and should
|
||||
// NOT use the other addResponseTime method.
|
||||
int queueTime = (int)
|
||||
details.get(ProcessingDetails.Timing.QUEUE, RpcMetrics.TIMEUNIT);
|
||||
int processingTime = (int)
|
||||
details.get(ProcessingDetails.Timing.PROCESSING, RpcMetrics.TIMEUNIT);
|
||||
int queueTime = (int) details.get(ProcessingDetails.Timing.QUEUE,
|
||||
RpcMetrics.DEFAULT_METRIC_TIME_UNIT);
|
||||
int processingTime = (int) details.get(ProcessingDetails.Timing.PROCESSING,
|
||||
RpcMetrics.DEFAULT_METRIC_TIME_UNIT);
|
||||
addResponseTime(callName, schedulable.getPriorityLevel(),
|
||||
queueTime, processingTime);
|
||||
}
|
||||
|
@ -544,13 +544,13 @@ void logSlowRpcCalls(String methodName, Call call,
|
||||
(rpcMetrics.getProcessingStdDev() * deviation);
|
||||
|
||||
long processingTime =
|
||||
details.get(Timing.PROCESSING, RpcMetrics.TIMEUNIT);
|
||||
details.get(Timing.PROCESSING, rpcMetrics.getMetricsTimeUnit());
|
||||
if ((rpcMetrics.getProcessingSampleCount() > minSampleSize) &&
|
||||
(processingTime > threeSigma)) {
|
||||
LOG.warn(
|
||||
"Slow RPC : {} took {} {} to process from client {},"
|
||||
+ " the processing detail is {}",
|
||||
methodName, processingTime, RpcMetrics.TIMEUNIT, call,
|
||||
methodName, processingTime, rpcMetrics.getMetricsTimeUnit(), call,
|
||||
details.toString());
|
||||
rpcMetrics.incrSlowRpc();
|
||||
}
|
||||
@ -570,7 +570,7 @@ void updateMetrics(Call call, long startTime, boolean connDropped) {
|
||||
deltaNanos -= details.get(Timing.RESPONSE);
|
||||
details.set(Timing.HANDLER, deltaNanos);
|
||||
|
||||
long queueTime = details.get(Timing.QUEUE, RpcMetrics.TIMEUNIT);
|
||||
long queueTime = details.get(Timing.QUEUE, rpcMetrics.getMetricsTimeUnit());
|
||||
rpcMetrics.addRpcQueueTime(queueTime);
|
||||
|
||||
if (call.isResponseDeferred() || connDropped) {
|
||||
@ -579,9 +579,9 @@ void updateMetrics(Call call, long startTime, boolean connDropped) {
|
||||
}
|
||||
|
||||
long processingTime =
|
||||
details.get(Timing.PROCESSING, RpcMetrics.TIMEUNIT);
|
||||
details.get(Timing.PROCESSING, rpcMetrics.getMetricsTimeUnit());
|
||||
long waitTime =
|
||||
details.get(Timing.LOCKWAIT, RpcMetrics.TIMEUNIT);
|
||||
details.get(Timing.LOCKWAIT, rpcMetrics.getMetricsTimeUnit());
|
||||
rpcMetrics.addRpcLockWaitTime(waitTime);
|
||||
rpcMetrics.addRpcProcessingTime(processingTime);
|
||||
// don't include lock wait for detailed metrics.
|
||||
|
@ -19,6 +19,7 @@
|
||||
|
||||
import java.util.concurrent.TimeUnit;
|
||||
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting;
|
||||
import org.apache.hadoop.fs.CommonConfigurationKeys;
|
||||
import org.apache.hadoop.ipc.Server;
|
||||
@ -48,8 +49,11 @@ public class RpcMetrics {
|
||||
final MetricsRegistry registry;
|
||||
final String name;
|
||||
final boolean rpcQuantileEnable;
|
||||
|
||||
public static final TimeUnit DEFAULT_METRIC_TIME_UNIT =
|
||||
TimeUnit.MILLISECONDS;
|
||||
/** The time unit used when storing/accessing time durations. */
|
||||
public final static TimeUnit TIMEUNIT = TimeUnit.MILLISECONDS;
|
||||
private final TimeUnit metricsTimeUnit;
|
||||
|
||||
RpcMetrics(Server server, Configuration conf) {
|
||||
String port = String.valueOf(server.getListenerAddress().getPort());
|
||||
@ -63,6 +67,7 @@ public class RpcMetrics {
|
||||
rpcQuantileEnable = (intervals.length > 0) && conf.getBoolean(
|
||||
CommonConfigurationKeys.RPC_METRICS_QUANTILE_ENABLE,
|
||||
CommonConfigurationKeys.RPC_METRICS_QUANTILE_ENABLE_DEFAULT);
|
||||
metricsTimeUnit = getMetricsTimeUnit(conf);
|
||||
if (rpcQuantileEnable) {
|
||||
rpcQueueTimeQuantiles =
|
||||
new MutableQuantiles[intervals.length];
|
||||
@ -75,19 +80,19 @@ public class RpcMetrics {
|
||||
for (int i = 0; i < intervals.length; i++) {
|
||||
int interval = intervals[i];
|
||||
rpcQueueTimeQuantiles[i] = registry.newQuantiles("rpcQueueTime"
|
||||
+ interval + "s", "rpc queue time in " + TIMEUNIT, "ops",
|
||||
+ interval + "s", "rpc queue time in " + metricsTimeUnit, "ops",
|
||||
"latency", interval);
|
||||
rpcLockWaitTimeQuantiles[i] = registry.newQuantiles(
|
||||
"rpcLockWaitTime" + interval + "s",
|
||||
"rpc lock wait time in " + TIMEUNIT, "ops",
|
||||
"rpc lock wait time in " + metricsTimeUnit, "ops",
|
||||
"latency", interval);
|
||||
rpcProcessingTimeQuantiles[i] = registry.newQuantiles(
|
||||
"rpcProcessingTime" + interval + "s",
|
||||
"rpc processing time in " + TIMEUNIT, "ops",
|
||||
"rpc processing time in " + metricsTimeUnit, "ops",
|
||||
"latency", interval);
|
||||
deferredRpcProcessingTimeQuantiles[i] = registry.newQuantiles(
|
||||
"deferredRpcProcessingTime" + interval + "s",
|
||||
"deferred rpc processing time in " + TIMEUNIT, "ops",
|
||||
"deferred rpc processing time in " + metricsTimeUnit, "ops",
|
||||
"latency", interval);
|
||||
}
|
||||
}
|
||||
@ -141,6 +146,27 @@ public String numOpenConnectionsPerUser() {
|
||||
return server.getNumDroppedConnections();
|
||||
}
|
||||
|
||||
public TimeUnit getMetricsTimeUnit() {
|
||||
return metricsTimeUnit;
|
||||
}
|
||||
|
||||
public static TimeUnit getMetricsTimeUnit(Configuration conf) {
|
||||
TimeUnit metricsTimeUnit = RpcMetrics.DEFAULT_METRIC_TIME_UNIT;
|
||||
String timeunit = conf.get(CommonConfigurationKeys.RPC_METRICS_TIME_UNIT);
|
||||
if (StringUtils.isNotEmpty(timeunit)) {
|
||||
try {
|
||||
metricsTimeUnit = TimeUnit.valueOf(timeunit);
|
||||
} catch (IllegalArgumentException e) {
|
||||
LOG.info("Config key {} 's value {} does not correspond to enum values"
|
||||
+ " of java.util.concurrent.TimeUnit. Hence default unit"
|
||||
+ " {} will be used",
|
||||
CommonConfigurationKeys.RPC_METRICS_TIME_UNIT, timeunit,
|
||||
RpcMetrics.DEFAULT_METRIC_TIME_UNIT);
|
||||
}
|
||||
}
|
||||
return metricsTimeUnit;
|
||||
}
|
||||
|
||||
// Public instrumentation methods that could be extracted to an
|
||||
// abstract class if we decide to do custom instrumentation classes a la
|
||||
// JobTrackerInstrumentation. The methods with //@Override comment are
|
||||
|
@ -3345,6 +3345,21 @@
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>rpc.metrics.timeunit</name>
|
||||
<value>MILLISECONDS</value>
|
||||
<description>
|
||||
This property is used to configure timeunit for various RPC Metrics
|
||||
e.g rpcQueueTime, rpcLockWaitTime, rpcProcessingTime,
|
||||
deferredRpcProcessingTime. In the absence of this property,
|
||||
default timeunit used is milliseconds.
|
||||
The value of this property should match to any one value of enum:
|
||||
java.util.concurrent.TimeUnit.
|
||||
Some of the valid values: NANOSECONDS, MICROSECONDS, MILLISECONDS,
|
||||
SECONDS etc.
|
||||
</description>
|
||||
</property>
|
||||
|
||||
<property>
|
||||
<name>rpc.metrics.percentiles.intervals</name>
|
||||
<value></value>
|
||||
|
@ -65,6 +65,8 @@ rpc
|
||||
---
|
||||
|
||||
Each metrics record contains tags such as Hostname and port (number to which server is bound) as additional information along with metrics.
|
||||
`rpc.metrics.timeunit` config can be used to configure timeunit for RPC metrics.
|
||||
The default timeunit used for RPC metrics is milliseconds (as per the below description).
|
||||
|
||||
| Name | Description |
|
||||
|:---- |:---- |
|
||||
|
@ -29,7 +29,6 @@
|
||||
import org.apache.hadoop.ipc.Client.ConnectionId;
|
||||
import org.apache.hadoop.ipc.Server.Call;
|
||||
import org.apache.hadoop.ipc.Server.Connection;
|
||||
import org.apache.hadoop.ipc.metrics.RpcMetrics;
|
||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto;
|
||||
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
|
||||
import org.apache.hadoop.ipc.protobuf.TestProtos;
|
||||
@ -1098,8 +1097,8 @@ public TestRpcService run() {
|
||||
proxy.lockAndSleep(null, newSleepRequest(5));
|
||||
rpcMetrics = getMetrics(server.getRpcMetrics().name());
|
||||
assertGauge("RpcLockWaitTimeAvgTime",
|
||||
(double)(RpcMetrics.TIMEUNIT.convert(10L, TimeUnit.SECONDS)),
|
||||
rpcMetrics);
|
||||
(double)(server.getRpcMetrics().getMetricsTimeUnit().convert(10L,
|
||||
TimeUnit.SECONDS)), rpcMetrics);
|
||||
} finally {
|
||||
if (proxy2 != null) {
|
||||
RPC.stopProxy(proxy2);
|
||||
@ -1603,6 +1602,70 @@ public void testSetProtocolEngine() {
|
||||
assertTrue(rpcEngine instanceof StoppedRpcEngine);
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testRpcMetricsInNanos() throws Exception {
|
||||
final Server server;
|
||||
TestRpcService proxy = null;
|
||||
|
||||
final int interval = 1;
|
||||
conf.setBoolean(CommonConfigurationKeys.
|
||||
RPC_METRICS_QUANTILE_ENABLE, true);
|
||||
conf.set(CommonConfigurationKeys.
|
||||
RPC_METRICS_PERCENTILES_INTERVALS_KEY, "" + interval);
|
||||
conf.set(CommonConfigurationKeys.RPC_METRICS_TIME_UNIT, "NANOSECONDS");
|
||||
|
||||
server = setupTestServer(conf, 5);
|
||||
String testUser = "testUserInNanos";
|
||||
UserGroupInformation anotherUser =
|
||||
UserGroupInformation.createRemoteUser(testUser);
|
||||
TestRpcService proxy2 =
|
||||
anotherUser.doAs((PrivilegedAction<TestRpcService>) () -> {
|
||||
try {
|
||||
return RPC.getProxy(TestRpcService.class, 0,
|
||||
server.getListenerAddress(), conf);
|
||||
} catch (IOException e) {
|
||||
LOG.error("Something went wrong.", e);
|
||||
}
|
||||
return null;
|
||||
});
|
||||
try {
|
||||
proxy = getClient(addr, conf);
|
||||
for (int i = 0; i < 100; i++) {
|
||||
proxy.ping(null, newEmptyRequest());
|
||||
proxy.echo(null, newEchoRequest("" + i));
|
||||
proxy2.echo(null, newEchoRequest("" + i));
|
||||
}
|
||||
MetricsRecordBuilder rpcMetrics =
|
||||
getMetrics(server.getRpcMetrics().name());
|
||||
assertEquals("Expected zero rpc lock wait time",
|
||||
0, getDoubleGauge("RpcLockWaitTimeAvgTime", rpcMetrics), 0.001);
|
||||
MetricsAsserts.assertQuantileGauges("RpcQueueTime" + interval + "s",
|
||||
rpcMetrics);
|
||||
MetricsAsserts.assertQuantileGauges("RpcProcessingTime" + interval + "s",
|
||||
rpcMetrics);
|
||||
|
||||
proxy.lockAndSleep(null, newSleepRequest(5));
|
||||
rpcMetrics = getMetrics(server.getRpcMetrics().name());
|
||||
assertGauge("RpcLockWaitTimeAvgTime",
|
||||
(double)(server.getRpcMetrics().getMetricsTimeUnit().convert(10L,
|
||||
TimeUnit.SECONDS)), rpcMetrics);
|
||||
LOG.info("RpcProcessingTimeAvgTime: {} , RpcQueueTimeAvgTime: {}",
|
||||
getDoubleGauge("RpcProcessingTimeAvgTime", rpcMetrics),
|
||||
getDoubleGauge("RpcQueueTimeAvgTime", rpcMetrics));
|
||||
|
||||
assertTrue(getDoubleGauge("RpcProcessingTimeAvgTime", rpcMetrics)
|
||||
> 4000000D);
|
||||
assertTrue(getDoubleGauge("RpcQueueTimeAvgTime", rpcMetrics)
|
||||
> 4000D);
|
||||
} finally {
|
||||
if (proxy2 != null) {
|
||||
RPC.stopProxy(proxy2);
|
||||
}
|
||||
stop(server, proxy);
|
||||
}
|
||||
}
|
||||
|
||||
|
||||
public static void main(String[] args) throws Exception {
|
||||
new TestRPC().testCallsInternal(conf);
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user