diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index b46a78553e..a594d2be01 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -616,6 +616,9 @@ void updateMetrics(Call call, long processingStartTimeNanos, boolean connDropped deltaNanos -= details.get(Timing.RESPONSE); details.set(Timing.HANDLER, deltaNanos); + long enQueueTime = details.get(Timing.ENQUEUE, rpcMetrics.getMetricsTimeUnit()); + rpcMetrics.addRpcEnQueueTime(enQueueTime); + long queueTime = details.get(Timing.QUEUE, rpcMetrics.getMetricsTimeUnit()); rpcMetrics.addRpcQueueTime(queueTime); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java index ad7aa88d60..5e0ea6228c 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java @@ -69,6 +69,8 @@ public class RpcMetrics { CommonConfigurationKeys.RPC_METRICS_QUANTILE_ENABLE_DEFAULT); metricsTimeUnit = getMetricsTimeUnit(conf); if (rpcQuantileEnable) { + rpcEnQueueTimeQuantiles = + new MutableQuantiles[intervals.length]; rpcQueueTimeQuantiles = new MutableQuantiles[intervals.length]; rpcLockWaitTimeQuantiles = @@ -81,6 +83,9 @@ public class RpcMetrics { new MutableQuantiles[intervals.length]; for (int i = 0; i < intervals.length; i++) { int interval = intervals[i]; + rpcEnQueueTimeQuantiles[i] = registry.newQuantiles("rpcEnQueueTime" + + interval + "s", "rpc enqueue time in " + metricsTimeUnit, "ops", + "latency", interval); rpcQueueTimeQuantiles[i] = registry.newQuantiles("rpcQueueTime" + interval + "s", "rpc queue time in " + metricsTimeUnit, "ops", "latency", interval); @@ -114,6 +119,8 @@ public static RpcMetrics create(Server server, Configuration conf) { @Metric("Number of received bytes") MutableCounterLong receivedBytes; @Metric("Number of sent bytes") MutableCounterLong sentBytes; + @Metric("EQueue time") MutableRate rpcEnQueueTime; + MutableQuantiles[] rpcEnQueueTimeQuantiles; @Metric("Queue time") MutableRate rpcQueueTime; MutableQuantiles[] rpcQueueTimeQuantiles; @Metric("Lock wait time") MutableRate rpcLockWaitTime; @@ -257,6 +264,23 @@ public void incrReceivedBytes(int count) { receivedBytes.incr(count); } + /** + * Sometimes, the request time observed by the client is much longer than + * the queue + process time on the RPC server.Perhaps the RPC request + * 'waiting enQueue' took too long on the RPC server, so we should add + * enQueue time to RpcMetrics. See HADOOP-18840 for details. + * Add an RPC enqueue time sample + * @param enQTime the queue time + */ + public void addRpcEnQueueTime(long enQTime) { + rpcEnQueueTime.add(enQTime); + if (rpcQuantileEnable) { + for (MutableQuantiles q : rpcEnQueueTimeQuantiles) { + q.add(enQTime); + } + } + } + /** * Add an RPC queue time sample * @param qTime the queue time diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index 1373a8a40e..88d9204f69 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -1334,6 +1334,8 @@ public TestRpcService run() { } MetricsRecordBuilder rpcMetrics = getMetrics(server.getRpcMetrics().name()); + assertEquals("Expected correct rpc en queue count", + 3000, getLongCounter("RpcEnQueueTimeNumOps", rpcMetrics)); assertEquals("Expected correct rpc queue count", 3000, getLongCounter("RpcQueueTimeNumOps", rpcMetrics)); assertEquals("Expected correct rpc processing count", @@ -1344,6 +1346,8 @@ public TestRpcService run() { 3000, getLongCounter("RpcResponseTimeNumOps", rpcMetrics)); assertEquals("Expected zero rpc lock wait time", 0, getDoubleGauge("RpcLockWaitTimeAvgTime", rpcMetrics), 0.001); + MetricsAsserts.assertQuantileGauges("RpcEnQueueTime" + interval + "s", + rpcMetrics); MetricsAsserts.assertQuantileGauges("RpcQueueTime" + interval + "s", rpcMetrics); MetricsAsserts.assertQuantileGauges("RpcProcessingTime" + interval + "s", @@ -2007,6 +2011,8 @@ public void testRpcMetricsInNanos() throws Exception { getMetrics(server.getRpcMetrics().name()); assertEquals("Expected zero rpc lock wait time", 0, getDoubleGauge("RpcLockWaitTimeAvgTime", rpcMetrics), 0.001); + MetricsAsserts.assertQuantileGauges("RpcEnQueueTime" + interval + "s", + rpcMetrics); MetricsAsserts.assertQuantileGauges("RpcQueueTime" + interval + "s", rpcMetrics); MetricsAsserts.assertQuantileGauges("RpcProcessingTime" + interval + "s", @@ -2017,12 +2023,15 @@ public void testRpcMetricsInNanos() throws Exception { assertGauge("RpcLockWaitTimeAvgTime", (double)(server.getRpcMetrics().getMetricsTimeUnit().convert(10L, TimeUnit.SECONDS)), rpcMetrics); - LOG.info("RpcProcessingTimeAvgTime: {} , RpcQueueTimeAvgTime: {}", + LOG.info("RpcProcessingTimeAvgTime: {} , RpcEnQueueTimeAvgTime: {} , RpcQueueTimeAvgTime: {}", getDoubleGauge("RpcProcessingTimeAvgTime", rpcMetrics), + getDoubleGauge("RpcEnQueueTimeAvgTime", rpcMetrics), getDoubleGauge("RpcQueueTimeAvgTime", rpcMetrics)); assertTrue(getDoubleGauge("RpcProcessingTimeAvgTime", rpcMetrics) > 4000000D); + assertTrue(getDoubleGauge("RpcEnQueueTimeAvgTime", rpcMetrics) + > 4000D); assertTrue(getDoubleGauge("RpcQueueTimeAvgTime", rpcMetrics) > 4000D); } finally {