HDFS-14084. Need for more stats in DFSClient. Contributed by Pranay Singh.

Signed-off-by: Wei-Chiu Chuang <weichiu@apache.org>
This commit is contained in:
Pranay Singh 2019-01-03 09:55:00 -08:00 committed by Wei-Chiu Chuang
parent 14d232c0fe
commit ecdeaa7e6a
3 changed files with 46 additions and 7 deletions

View File

@ -34,6 +34,7 @@
import org.apache.hadoop.io.retry.RetryPolicies; import org.apache.hadoop.io.retry.RetryPolicies;
import org.apache.hadoop.io.retry.RetryPolicy; import org.apache.hadoop.io.retry.RetryPolicy;
import org.apache.hadoop.io.retry.RetryPolicy.RetryAction; import org.apache.hadoop.io.retry.RetryPolicy.RetryAction;
import org.apache.hadoop.ipc.metrics.RpcDetailedMetrics;
import org.apache.hadoop.ipc.RPC.RpcKind; import org.apache.hadoop.ipc.RPC.RpcKind;
import org.apache.hadoop.ipc.Server.AuthProtocol; import org.apache.hadoop.ipc.Server.AuthProtocol;
import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto; import org.apache.hadoop.ipc.protobuf.IpcConnectionContextProtos.IpcConnectionContextProto;
@ -86,6 +87,7 @@
public class Client implements AutoCloseable { public class Client implements AutoCloseable {
public static final Logger LOG = LoggerFactory.getLogger(Client.class); public static final Logger LOG = LoggerFactory.getLogger(Client.class);
private final RpcDetailedMetrics rpcDetailedMetrics;
/** A counter for generating call IDs. */ /** A counter for generating call IDs. */
private static final AtomicInteger callIdCounter = new AtomicInteger(); private static final AtomicInteger callIdCounter = new AtomicInteger();
@ -208,6 +210,24 @@ synchronized ExecutorService unrefAndCleanup() {
} }
}; };
/**
* Update a particular metric by recording the processing
* time of the metric.
*
* @param name Metric name
* @param processingTime time spent in processing the metric.
*/
public void updateMetrics(String name, long processingTime) {
rpcDetailedMetrics.addProcessingTime(name, processingTime);
}
/**
* Get the RpcDetailedMetrics associated with the Client.
*/
public RpcDetailedMetrics getRpcDetailedMetrics() {
return rpcDetailedMetrics;
}
/** /**
* set the ping interval value in configuration * set the ping interval value in configuration
* *
@ -1314,6 +1334,11 @@ public Client(Class<? extends Writable> valueClass, Configuration conf,
this.maxAsyncCalls = conf.getInt( this.maxAsyncCalls = conf.getInt(
CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY, CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_KEY,
CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT); CommonConfigurationKeys.IPC_CLIENT_ASYNC_CALLS_MAX_DEFAULT);
/**
* Create with port of -1, dummy port since the function
* takes default argument.
*/
this.rpcDetailedMetrics = RpcDetailedMetrics.create(-1);
} }
/** /**

View File

@ -49,6 +49,8 @@
import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.hadoop.metrics2.MetricStringBuilder;
import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation;
/** /**
* RPC Engine for for protobuf based RPCs. * RPC Engine for for protobuf based RPCs.
@ -195,7 +197,7 @@ public Message invoke(Object proxy, final Method method, Object[] args)
throws ServiceException { throws ServiceException {
long startTime = 0; long startTime = 0;
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
startTime = Time.now(); startTime = System.currentTimeMillis();
} }
if (args.length != 2) { // RpcController + Message if (args.length != 2) { // RpcController + Message
@ -250,8 +252,16 @@ public Message invoke(Object proxy, final Method method, Object[] args)
} }
if (LOG.isDebugEnabled()) { if (LOG.isDebugEnabled()) {
long callTime = Time.now() - startTime; long callTime = System.currentTimeMillis() - startTime;
LOG.debug("Call: " + method.getName() + " took " + callTime + "ms"); if (callTime > 0) {
MetricStringBuilder rb =
new MetricStringBuilder(null, "", " = ", "\n");
client.updateMetrics(method.getName(), callTime);
MutableRatesWithAggregation rates =
client.getRpcDetailedMetrics().getMutableRates();
rates.snapshot(rb, true);
LOG.debug("RPC Client stats: {}", rb);
}
} }
if (Client.isAsynchronousMode()) { if (Client.isAsynchronousMode()) {

View File

@ -70,12 +70,16 @@ public void init(Class<?> protocol) {
* @param processingTime the processing time * @param processingTime the processing time
*/ */
//@Override // some instrumentation interface //@Override // some instrumentation interface
public void addProcessingTime(String name, int processingTime) { public void addProcessingTime(String metName, long processingTime) {
rates.add(name, processingTime); rates.add(metName, processingTime);
} }
public void addDeferredProcessingTime(String name, long processingTime) { public void addDeferredProcessingTime(String metName, long processingTime) {
deferredRpcRates.add(name, processingTime); deferredRpcRates.add(metName, processingTime);
}
public MutableRatesWithAggregation getMutableRates() {
return rates;
} }
/** /**