From 317fe4584a51cfe553e4098d48170cd2898b9732 Mon Sep 17 00:00:00 2001 From: Erik Krogen Date: Tue, 14 Jul 2020 11:22:16 -0700 Subject: [PATCH] HADOOP-17127. Use RpcMetrics.TIMEUNIT to initialize rpc queueTime and processingTime. Contributed by Jim Brennan. --- .../org/apache/hadoop/ipc/DecayRpcScheduler.java | 5 +++-- .../java/org/apache/hadoop/ipc/RpcScheduler.java | 12 ++++++------ .../src/test/java/org/apache/hadoop/ipc/TestRPC.java | 6 +++++- 3 files changed, 14 insertions(+), 9 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java index 3e952eb63c..45cbd4e99d 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/DecayRpcScheduler.java @@ -43,6 +43,7 @@ import org.apache.commons.lang3.exception.ExceptionUtils; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeys; import org.apache.hadoop.ipc.metrics.DecayRpcSchedulerDetailedMetrics; +import org.apache.hadoop.ipc.metrics.RpcMetrics; import org.apache.hadoop.metrics2.MetricsCollector; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.MetricsSource; @@ -632,8 +633,8 @@ public class DecayRpcScheduler implements RpcScheduler, addCost(user, processingCost); int priorityLevel = schedulable.getPriorityLevel(); - long queueTime = details.get(Timing.QUEUE, TimeUnit.MILLISECONDS); - long processingTime = details.get(Timing.PROCESSING, TimeUnit.MILLISECONDS); + long queueTime = details.get(Timing.QUEUE, RpcMetrics.TIMEUNIT); + long processingTime = details.get(Timing.PROCESSING, RpcMetrics.TIMEUNIT); this.decayRpcSchedulerDetailedMetrics.addQueueTime( priorityLevel, queueTime); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java index 63812f47f2..5202c6b356 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/RpcScheduler.java @@ -18,7 +18,7 @@ package org.apache.hadoop.ipc; -import java.util.concurrent.TimeUnit; +import org.apache.hadoop.ipc.metrics.RpcMetrics; /** * Implement this interface to be used for RPC scheduling and backoff. @@ -62,12 +62,12 @@ public interface RpcScheduler { // this interface, a default implementation is supplied which uses the old // method. All new implementations MUST override this interface and should // NOT use the other addResponseTime method. - int queueTimeMs = (int) - details.get(ProcessingDetails.Timing.QUEUE, TimeUnit.MILLISECONDS); - int processingTimeMs = (int) - details.get(ProcessingDetails.Timing.PROCESSING, TimeUnit.MILLISECONDS); + int queueTime = (int) + details.get(ProcessingDetails.Timing.QUEUE, RpcMetrics.TIMEUNIT); + int processingTime = (int) + details.get(ProcessingDetails.Timing.PROCESSING, RpcMetrics.TIMEUNIT); addResponseTime(callName, schedulable.getPriorityLevel(), - queueTimeMs, processingTimeMs); + queueTime, processingTime); } void stop(); diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index 640ca3d2b8..cd2433a8af 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -29,6 +29,7 @@ import org.apache.hadoop.io.retry.RetryProxy; import org.apache.hadoop.ipc.Client.ConnectionId; import org.apache.hadoop.ipc.Server.Call; import org.apache.hadoop.ipc.Server.Connection; +import org.apache.hadoop.ipc.metrics.RpcMetrics; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcErrorCodeProto; import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto; import org.apache.hadoop.ipc.protobuf.TestProtos; @@ -81,6 +82,7 @@ import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicReference; @@ -1095,7 +1097,9 @@ public class TestRPC extends TestRpcBase { proxy.lockAndSleep(null, newSleepRequest(5)); rpcMetrics = getMetrics(server.getRpcMetrics().name()); - assertGauge("RpcLockWaitTimeAvgTime", 10000.0, rpcMetrics); + assertGauge("RpcLockWaitTimeAvgTime", + (double)(RpcMetrics.TIMEUNIT.convert(10L, TimeUnit.SECONDS)), + rpcMetrics); } finally { if (proxy2 != null) { RPC.stopProxy(proxy2);