From ecdeaa7e6ad43555031aed032e6ba7a14a17d7bc Mon Sep 17 00:00:00 2001 From: Pranay Singh Date: Thu, 3 Jan 2019 09:55:00 -0800 Subject: [PATCH] HDFS-14084. Need for more stats in DFSClient. Contributed by Pranay Singh. Signed-off-by: Wei-Chiu Chuang --- .../java/org/apache/hadoop/ipc/Client.java | 25 +++++++++++++++++++ .../apache/hadoop/ipc/ProtobufRpcEngine.java | 16 +++++++++--- .../ipc/metrics/RpcDetailedMetrics.java | 12 ++++++--- 3 files changed, 46 insertions(+), 7 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java index ea79887273..124d068e46 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Client.java @@ -34,6 +34,7 @@ import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; +import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics; import org.apache.hadoop.ipc.RPC.RpcKind; import org.apache.hadoop.ipc.Server.AuthProtocol; import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto; @@ -86,6 +87,7 @@ public class Client implements AutoCloseable { public static final Logger LOG = LoggerFactory.getLogger(Client.class); + private final RpcDetailedMetrics rpcDetailedMetrics; /** A counter for generating call IDs. */ private static final AtomicInteger callIdCounter = new AtomicInteger(); @@ -208,6 +210,24 @@ synchronized ExecutorService unrefAndCleanup() { } }; + /** + * Update a particular metric by recording the processing + * time of the metric. + * + * @param name Metric name + * @param processingTime time spent in processing the metric. + */ + public void updateMetrics(String name, long processingTime) { + rpcDetailedMetrics.addProcessingTime(name, processingTime); + } + + /** + * Get the RpcDetailedMetrics associated with the Client. + */ + public RpcDetailedMetrics getRpcDetailedMetrics() { + return rpcDetailedMetrics; + } + /** * set the ping interval value in configuration * @@ -1314,6 +1334,11 @@ public Client(Class valueClass, Configuration conf, this.maxAsyncCalls = conf.getInt( CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT); + /** + * Create with port of -1, dummy port since the function + * takes default argument. + */ + this.rpcDetailedMetrics = RpcDetailedMetrics.create(-1); } /** diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java index 5548566252..e52dc66d32 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProtobufRpcEngine.java @@ -49,6 +49,8 @@ import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; +import org.apache.hadoop.metrics2.MetricStringBuilder; +import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation; /** * RPC Engine for for protobuf based RPCs. @@ -195,7 +197,7 @@ public Message invoke(Object proxy, final Method method, Object[] args) throws ServiceException { long startTime = 0; if (LOG.isDebugEnabled()) { - startTime = Time.now(); + startTime = System.currentTimeMillis(); } if (args.length != 2) { // RpcController + Message @@ -250,8 +252,16 @@ public Message invoke(Object proxy, final Method method, Object[] args) } if (LOG.isDebugEnabled()) { - long callTime = Time.now() - startTime; - LOG.debug("Call: " + method.getName() + " took " + callTime + "ms"); + long callTime = System.currentTimeMillis() - startTime; + if (callTime > 0) { + MetricStringBuilder rb = + new MetricStringBuilder(null, "", " = ", "\n"); + client.updateMetrics(method.getName(), callTime); + MutableRatesWithAggregation rates = + client.getRpcDetailedMetrics().getMutableRates(); + rates.snapshot(rb, true); + LOG.debug("RPC Client stats: {}", rb); + } } if (Client.isAsynchronousMode()) { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java index 6ed57ec6d9..9be9c5addd 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java @@ -70,12 +70,16 @@ public void init(Class protocol) { * @param processingTime the processing time */ //@Override // some instrumentation interface - public void addProcessingTime(String name, int processingTime) { - rates.add(name, processingTime); + public void addProcessingTime(String metName, long processingTime) { + rates.add(metName, processingTime); } - public void addDeferredProcessingTime(String name, long processingTime) { - deferredRpcRates.add(name, processingTime); + public void addDeferredProcessingTime(String metName, long processingTime) { + deferredRpcRates.add(metName, processingTime); + } + + public MutableRatesWithAggregation getMutableRates() { + return rates; } /**