diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java index b5dc29cea9..1ea44df503 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/fs/CommonConfigurationKeys.java @@ -381,7 +381,9 @@ public class CommonConfigurationKeys extends CommonConfigurationKeysPublic { public static final boolean RPC_METRICS_QUANTILE_ENABLE_DEFAULT = false; public static final String RPC_METRICS_PERCENTILES_INTERVALS_KEY = "rpc.metrics.percentiles.intervals"; - + + public static final String RPC_METRICS_TIME_UNIT = "rpc.metrics.timeunit"; + /** Allowed hosts for nfs exports */ public static final String NFS_EXPORTS_ALLOWED_HOSTS_SEPARATOR = ";"; public static final String NFS_EXPORTS_ALLOWED_HOSTS_KEY = "nfs.exports.allowed.hosts"; diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java index e69064d8fb..7f6f0c4723 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java @@ -193,6 +193,7 @@ public class DecayRpcScheduler implements RpcScheduler, private final String namespace; private final int topUsersCount; // e.g., report top 10 users' metrics private static final double PRECISION = 0.0001; + private final TimeUnit metricsTimeUnit; private MetricsProxy metricsProxy; private final CostProvider costProvider; private final Map staticPriorities = new HashMap<>(); @@ -266,6 +267,8 @@ public DecayRpcScheduler(int numLevels, String ns, Configuration conf) { DecayRpcSchedulerDetailedMetrics.create(ns); decayRpcSchedulerDetailedMetrics.init(numLevels); + metricsTimeUnit = RpcMetrics.getMetricsTimeUnit(conf); + // Setup delay timer Timer timer = new Timer(true); DecayTask task = new DecayTask(this, timer); @@ -725,8 +728,9 @@ public void addResponseTime(String callName, Schedulable schedulable, addCost(user, processingCost); int priorityLevel = schedulable.getPriorityLevel(); - long queueTime = details.get(Timing.QUEUE, RpcMetrics.TIMEUNIT); - long processingTime = details.get(Timing.PROCESSING, RpcMetrics.TIMEUNIT); + long queueTime = details.get(Timing.QUEUE, metricsTimeUnit); + long processingTime = details.get(Timing.PROCESSING, + metricsTimeUnit); this.decayRpcSchedulerDetailedMetrics.addQueueTime( priorityLevel, queueTime); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java index 5202c6b356..8c423b8e5e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java @@ -62,10 +62,10 @@ default void addResponseTime(String callName, Schedulable schedulable, // this interface, a default implementation is supplied which uses the old // method. All new implementations MUST override this interface and should // NOT use the other addResponseTime method. - int queueTime = (int) - details.get(ProcessingDetails.Timing.QUEUE, RpcMetrics.TIMEUNIT); - int processingTime = (int) - details.get(ProcessingDetails.Timing.PROCESSING, RpcMetrics.TIMEUNIT); + int queueTime = (int) details.get(ProcessingDetails.Timing.QUEUE, + RpcMetrics.DEFAULT_METRIC_TIME_UNIT); + int processingTime = (int) details.get(ProcessingDetails.Timing.PROCESSING, + RpcMetrics.DEFAULT_METRIC_TIME_UNIT); addResponseTime(callName, schedulable.getPriorityLevel(), queueTime, processingTime); } diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index 243b3c776f..8acdc0a99b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -544,13 +544,13 @@ void logSlowRpcCalls(String methodName, Call call, (rpcMetrics.getProcessingStdDev() * deviation); long processingTime = - details.get(Timing.PROCESSING, RpcMetrics.TIMEUNIT); + details.get(Timing.PROCESSING, rpcMetrics.getMetricsTimeUnit()); if ((rpcMetrics.getProcessingSampleCount() > minSampleSize) && (processingTime > threeSigma)) { LOG.warn( "Slow RPC : {} took {} {} to process from client {}," + " the processing detail is {}", - methodName, processingTime, RpcMetrics.TIMEUNIT, call, + methodName, processingTime, rpcMetrics.getMetricsTimeUnit(), call, details.toString()); rpcMetrics.incrSlowRpc(); } @@ -570,7 +570,7 @@ void updateMetrics(Call call, long startTime, boolean connDropped) { deltaNanos -= details.get(Timing.RESPONSE); details.set(Timing.HANDLER, deltaNanos); - long queueTime = details.get(Timing.QUEUE, RpcMetrics.TIMEUNIT); + long queueTime = details.get(Timing.QUEUE, rpcMetrics.getMetricsTimeUnit()); rpcMetrics.addRpcQueueTime(queueTime); if (call.isResponseDeferred() || connDropped) { @@ -579,9 +579,9 @@ void updateMetrics(Call call, long startTime, boolean connDropped) { } long processingTime = - details.get(Timing.PROCESSING, RpcMetrics.TIMEUNIT); + details.get(Timing.PROCESSING, rpcMetrics.getMetricsTimeUnit()); long waitTime = - details.get(Timing.LOCKWAIT, RpcMetrics.TIMEUNIT); + details.get(Timing.LOCKWAIT, rpcMetrics.getMetricsTimeUnit()); rpcMetrics.addRpcLockWaitTime(waitTime); rpcMetrics.addRpcProcessingTime(processingTime); // don't include lock wait for detailed metrics. diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java index 439b87326c..a0351ee357 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java @@ -19,6 +19,7 @@ import java.util.concurrent.TimeUnit; +import org.apache.commons.lang3.StringUtils; import org.apache.hadoop.thirdparty.com.google.common.annotations.VisibleForTesting; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.ipc.Server; @@ -48,9 +49,12 @@ public class RpcMetrics { final MetricsRegistry registry; final String name; final boolean rpcQuantileEnable; + + public static final TimeUnit DEFAULT_METRIC_TIME_UNIT = + TimeUnit.MILLISECONDS; /** The time unit used when storing/accessing time durations. */ - public final static TimeUnit TIMEUNIT = TimeUnit.MILLISECONDS; - + private final TimeUnit metricsTimeUnit; + RpcMetrics(Server server, Configuration conf) { String port = String.valueOf(server.getListenerAddress().getPort()); name = "RpcActivityForPort" + port; @@ -63,6 +67,7 @@ public class RpcMetrics { rpcQuantileEnable = (intervals.length > 0) && conf.getBoolean( CommonConfigurationKeys.RPC_METRICS_QUANTILE_ENABLE, CommonConfigurationKeys.RPC_METRICS_QUANTILE_ENABLE_DEFAULT); + metricsTimeUnit = getMetricsTimeUnit(conf); if (rpcQuantileEnable) { rpcQueueTimeQuantiles = new MutableQuantiles[intervals.length]; @@ -75,19 +80,19 @@ public class RpcMetrics { for (int i = 0; i < intervals.length; i++) { int interval = intervals[i]; rpcQueueTimeQuantiles[i] = registry.newQuantiles("rpcQueueTime" - + interval + "s", "rpc queue time in " + TIMEUNIT, "ops", + + interval + "s", "rpc queue time in " + metricsTimeUnit, "ops", "latency", interval); rpcLockWaitTimeQuantiles[i] = registry.newQuantiles( "rpcLockWaitTime" + interval + "s", - "rpc lock wait time in " + TIMEUNIT, "ops", + "rpc lock wait time in " + metricsTimeUnit, "ops", "latency", interval); rpcProcessingTimeQuantiles[i] = registry.newQuantiles( "rpcProcessingTime" + interval + "s", - "rpc processing time in " + TIMEUNIT, "ops", + "rpc processing time in " + metricsTimeUnit, "ops", "latency", interval); deferredRpcProcessingTimeQuantiles[i] = registry.newQuantiles( "deferredRpcProcessingTime" + interval + "s", - "deferred rpc processing time in " + TIMEUNIT, "ops", + "deferred rpc processing time in " + metricsTimeUnit, "ops", "latency", interval); } } @@ -141,6 +146,27 @@ public String numOpenConnectionsPerUser() { return server.getNumDroppedConnections(); } + public TimeUnit getMetricsTimeUnit() { + return metricsTimeUnit; + } + + public static TimeUnit getMetricsTimeUnit(Configuration conf) { + TimeUnit metricsTimeUnit = RpcMetrics.DEFAULT_METRIC_TIME_UNIT; + String timeunit = conf.get(CommonConfigurationKeys.RPC_METRICS_TIME_UNIT); + if (StringUtils.isNotEmpty(timeunit)) { + try { + metricsTimeUnit = TimeUnit.valueOf(timeunit); + } catch (IllegalArgumentException e) { + LOG.info("Config key {} 's value {} does not correspond to enum values" + + " of java.util.concurrent.TimeUnit. Hence default unit" + + " {} will be used", + CommonConfigurationKeys.RPC_METRICS_TIME_UNIT, timeunit, + RpcMetrics.DEFAULT_METRIC_TIME_UNIT); + } + } + return metricsTimeUnit; + } + // Public instrumentation methods that could be extracted to an // abstract class if we decide to do custom instrumentation classes a la // JobTrackerInstrumentation. The methods with //@Override comment are diff --git a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml index 4fa8429cb1..c01c2f8f41 100644 --- a/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml +++ b/hadoop-common-project/hadoop-common/src/main/resources/core-default.xml @@ -3345,6 +3345,21 @@ + + rpc.metrics.timeunit + MILLISECONDS + + This property is used to configure timeunit for various RPC Metrics + e.g rpcQueueTime, rpcLockWaitTime, rpcProcessingTime, + deferredRpcProcessingTime. In the absence of this property, + default timeunit used is milliseconds. + The value of this property should match to any one value of enum: + java.util.concurrent.TimeUnit. + Some of the valid values: NANOSECONDS, MICROSECONDS, MILLISECONDS, + SECONDS etc. + + + rpc.metrics.percentiles.intervals diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md index 6cec030ee2..9e747631ca 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md @@ -65,6 +65,8 @@ rpc --- Each metrics record contains tags such as Hostname and port (number to which server is bound) as additional information along with metrics. +`rpc.metrics.timeunit` config can be used to configure timeunit for RPC metrics. +The default timeunit used for RPC metrics is milliseconds (as per the below description). | Name | Description | |:---- |:---- | diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index 9fbb865c6e..b78900b609 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -29,7 +29,6 @@ import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.Server.Call; import org.apache.hadoop.ipc.Server.Connection; -import org.apache.hadoop.ipc.metrics.RpcMetrics; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto; import org.apache.hadoop.ipc.protobuf.TestProtos; @@ -1098,8 +1097,8 @@ public TestRpcService run() { proxy.lockAndSleep(null, newSleepRequest(5)); rpcMetrics = getMetrics(server.getRpcMetrics().name()); assertGauge("RpcLockWaitTimeAvgTime", - (double)(RpcMetrics.TIMEUNIT.convert(10L, TimeUnit.SECONDS)), - rpcMetrics); + (double)(server.getRpcMetrics().getMetricsTimeUnit().convert(10L, + TimeUnit.SECONDS)), rpcMetrics); } finally { if (proxy2 != null) { RPC.stopProxy(proxy2); @@ -1603,6 +1602,70 @@ public void testSetProtocolEngine() { assertTrue(rpcEngine instanceof StoppedRpcEngine); } + @Test + public void testRpcMetricsInNanos() throws Exception { + final Server server; + TestRpcService proxy = null; + + final int interval = 1; + conf.setBoolean(CommonConfigurationKeys. + RPC_METRICS_QUANTILE_ENABLE, true); + conf.set(CommonConfigurationKeys. + RPC_METRICS_PERCENTILES_INTERVALS_KEY, "" + interval); + conf.set(CommonConfigurationKeys.RPC_METRICS_TIME_UNIT, "NANOSECONDS"); + + server = setupTestServer(conf, 5); + String testUser = "testUserInNanos"; + UserGroupInformation anotherUser = + UserGroupInformation.createRemoteUser(testUser); + TestRpcService proxy2 = + anotherUser.doAs((PrivilegedAction) () -> { + try { + return RPC.getProxy(TestRpcService.class, 0, + server.getListenerAddress(), conf); + } catch (IOException e) { + LOG.error("Something went wrong.", e); + } + return null; + }); + try { + proxy = getClient(addr, conf); + for (int i = 0; i < 100; i++) { + proxy.ping(null, newEmptyRequest()); + proxy.echo(null, newEchoRequest("" + i)); + proxy2.echo(null, newEchoRequest("" + i)); + } + MetricsRecordBuilder rpcMetrics = + getMetrics(server.getRpcMetrics().name()); + assertEquals("Expected zero rpc lock wait time", + 0, getDoubleGauge("RpcLockWaitTimeAvgTime", rpcMetrics), 0.001); + MetricsAsserts.assertQuantileGauges("RpcQueueTime" + interval + "s", + rpcMetrics); + MetricsAsserts.assertQuantileGauges("RpcProcessingTime" + interval + "s", + rpcMetrics); + + proxy.lockAndSleep(null, newSleepRequest(5)); + rpcMetrics = getMetrics(server.getRpcMetrics().name()); + assertGauge("RpcLockWaitTimeAvgTime", + (double)(server.getRpcMetrics().getMetricsTimeUnit().convert(10L, + TimeUnit.SECONDS)), rpcMetrics); + LOG.info("RpcProcessingTimeAvgTime: {} , RpcQueueTimeAvgTime: {}", + getDoubleGauge("RpcProcessingTimeAvgTime", rpcMetrics), + getDoubleGauge("RpcQueueTimeAvgTime", rpcMetrics)); + + assertTrue(getDoubleGauge("RpcProcessingTimeAvgTime", rpcMetrics) + > 4000000D); + assertTrue(getDoubleGauge("RpcQueueTimeAvgTime", rpcMetrics) + > 4000D); + } finally { + if (proxy2 != null) { + RPC.stopProxy(proxy2); + } + stop(server, proxy); + } + } + + public static void main(String[] args) throws Exception { new TestRPC().testCallsInternal(conf); }