HDFS-17042 Add rpcCallSuccesses and OverallRpcProcessingTime to RpcMetrics for Namenode (#5730)

This commit is contained in:
Xing Lin 2023-06-15 13:59:58 -07:00 committed by GitHub
parent 02027c8dcc
commit 427366b73b
No known key found for this signature in database
GPG Key ID: 4AEE18F83AFDEB23
9 changed files with 176 additions and 9 deletions

View File

@ -20,13 +20,15 @@
import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceAudience;
import org.apache.hadoop.classification.InterfaceStability; import org.apache.hadoop.classification.InterfaceStability;
import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import java.util.concurrent.TimeUnit; import java.util.concurrent.TimeUnit;
/** /**
* Stores the times that a call takes to be processed through each step. * Stores the times that a call takes to be processed through each step and
* its response status.
*/ */
@InterfaceStability.Unstable @InterfaceStability.Unstable
@InterfaceAudience.Private @InterfaceAudience.Private
@ -53,6 +55,9 @@ public enum Timing {
private long[] timings = new long[Timing.values().length]; private long[] timings = new long[Timing.values().length];
// Rpc return status of this call
private RpcStatusProto returnStatus = RpcStatusProto.SUCCESS;
ProcessingDetails(TimeUnit timeUnit) { ProcessingDetails(TimeUnit timeUnit) {
this.valueTimeUnit = timeUnit; this.valueTimeUnit = timeUnit;
} }
@ -81,6 +86,14 @@ public void add(Timing type, long value, TimeUnit timeUnit) {
timings[type.ordinal()] += valueTimeUnit.convert(value, timeUnit); timings[type.ordinal()] += valueTimeUnit.convert(value, timeUnit);
} }
/**
 * Records the RPC return status of this call.
 * @param status the status to store for this call
 */
public void setReturnStatus(RpcStatusProto status) {
  returnStatus = status;
}
/**
 * Returns the RPC return status recorded for this call.
 * @return the stored status; the field is initialized to SUCCESS
 */
public RpcStatusProto getReturnStatus() {
  return this.returnStatus;
}
@Override @Override
public String toString() { public String toString() {
StringBuilder sb = new StringBuilder(256); StringBuilder sb = new StringBuilder(256);

View File

@ -600,17 +600,18 @@ void logSlowRpcCalls(String methodName, Call call,
} }
} }
void updateMetrics(Call call, long startTime, boolean connDropped) { void updateMetrics(Call call, long processingStartTimeNanos, boolean connDropped) {
totalRequests.increment(); totalRequests.increment();
// delta = handler + processing + response // delta = handler + processing + response
long deltaNanos = Time.monotonicNowNanos() - startTime; long completionTimeNanos = Time.monotonicNowNanos();
long timestampNanos = call.timestampNanos; long deltaNanos = completionTimeNanos - processingStartTimeNanos;
long arrivalTimeNanos = call.timestampNanos;
ProcessingDetails details = call.getProcessingDetails(); ProcessingDetails details = call.getProcessingDetails();
// queue time is the delta between when the call first arrived and when it // queue time is the delta between when the call first arrived and when it
// began being serviced, minus the time it took to be put into the queue // began being serviced, minus the time it took to be put into the queue
details.set(Timing.QUEUE, details.set(Timing.QUEUE,
startTime - timestampNanos - details.get(Timing.ENQUEUE)); processingStartTimeNanos - arrivalTimeNanos - details.get(Timing.ENQUEUE));
deltaNanos -= details.get(Timing.PROCESSING); deltaNanos -= details.get(Timing.PROCESSING);
deltaNanos -= details.get(Timing.RESPONSE); deltaNanos -= details.get(Timing.RESPONSE);
details.set(Timing.HANDLER, deltaNanos); details.set(Timing.HANDLER, deltaNanos);
@ -636,10 +637,17 @@ void updateMetrics(Call call, long startTime, boolean connDropped) {
processingTime -= waitTime; processingTime -= waitTime;
String name = call.getDetailedMetricsName(); String name = call.getDetailedMetricsName();
rpcDetailedMetrics.addProcessingTime(name, processingTime); rpcDetailedMetrics.addProcessingTime(name, processingTime);
// Overall processing time is from arrival to completion.
long overallProcessingTime = rpcMetrics.getMetricsTimeUnit()
.convert(completionTimeNanos - arrivalTimeNanos, TimeUnit.NANOSECONDS);
rpcDetailedMetrics.addOverallProcessingTime(name, overallProcessingTime);
callQueue.addResponseTime(name, call, details); callQueue.addResponseTime(name, call, details);
if (isLogSlowRPC()) { if (isLogSlowRPC()) {
logSlowRpcCalls(name, call, details); logSlowRpcCalls(name, call, details);
} }
if (details.getReturnStatus() == RpcStatusProto.SUCCESS) {
rpcMetrics.incrRpcCallSuccesses();
}
} }
void updateDeferredMetrics(String name, long processingTime) { void updateDeferredMetrics(String name, long processingTime) {
@ -1237,6 +1245,7 @@ public Void run() throws Exception {
setResponseFields(value, responseParams); setResponseFields(value, responseParams);
sendResponse(); sendResponse();
details.setReturnStatus(responseParams.returnStatus);
deltaNanos = Time.monotonicNowNanos() - startNanos; deltaNanos = Time.monotonicNowNanos() - startNanos;
details.set(Timing.RESPONSE, deltaNanos, TimeUnit.NANOSECONDS); details.set(Timing.RESPONSE, deltaNanos, TimeUnit.NANOSECONDS);
} else { } else {

View File

@ -33,14 +33,27 @@
@InterfaceAudience.Private @InterfaceAudience.Private
@Metrics(about="Per method RPC metrics", context="rpcdetailed") @Metrics(about="Per method RPC metrics", context="rpcdetailed")
public class RpcDetailedMetrics { public class RpcDetailedMetrics {
static final String DEFERRED_PREFIX = "Deferred";
static final String OVERALL_PROCESSING_PREFIX = "Overall";
// per-method RPC processing time
@Metric MutableRatesWithAggregation rates; @Metric MutableRatesWithAggregation rates;
@Metric MutableRatesWithAggregation deferredRpcRates; @Metric MutableRatesWithAggregation deferredRpcRates;
/**
* per-method overall RPC processing time, from request arrival to when the
* response is sent back.
*/
@Metric MutableRatesWithAggregation overallRpcProcessingRates;
static final Logger LOG = LoggerFactory.getLogger(RpcDetailedMetrics.class); static final Logger LOG = LoggerFactory.getLogger(RpcDetailedMetrics.class);
final MetricsRegistry registry; final MetricsRegistry registry;
final String name; final String name;
/**
 * Exposes the per-method overall RPC processing rates, mainly to
 * facilitate testing in TestRPC.java.
 * @return the overall RPC processing rates
 */
public MutableRatesWithAggregation getOverallRpcProcessingRates() {
  return this.overallRpcProcessingRates;
}
RpcDetailedMetrics(int port) { RpcDetailedMetrics(int port) {
name = "RpcDetailedActivityForPort"+ port; name = "RpcDetailedActivityForPort"+ port;
registry = new MetricsRegistry("rpcdetailed") registry = new MetricsRegistry("rpcdetailed")
@ -61,7 +74,8 @@ public static RpcDetailedMetrics create(int port) {
*/ */
public void init(Class<?> protocol) { public void init(Class<?> protocol) {
rates.init(protocol); rates.init(protocol);
deferredRpcRates.init(protocol, "Deferred"); deferredRpcRates.init(protocol, DEFERRED_PREFIX);
overallRpcProcessingRates.init(protocol, OVERALL_PROCESSING_PREFIX);
} }
/** /**
@ -78,6 +92,15 @@ public void addDeferredProcessingTime(String name, long processingTime) {
deferredRpcRates.add(name, processingTime); deferredRpcRates.add(name, processingTime);
} }
/**
 * Records one overall RPC processing time sample for the given RPC call.
 * @param rpcCallName name of the RPC call the sample belongs to
 * @param overallProcessingTime the overall RPC processing time to record
 */
public void addOverallProcessingTime(String rpcCallName, long overallProcessingTime) {
  this.overallRpcProcessingRates.add(rpcCallName, overallProcessingTime);
}
/** /**
* Shutdown the instrumentation for the process * Shutdown the instrumentation for the process
*/ */

View File

@ -138,6 +138,8 @@ public static RpcMetrics create(Server server, Configuration conf) {
MutableCounterLong rpcSlowCalls; MutableCounterLong rpcSlowCalls;
@Metric("Number of requeue calls") @Metric("Number of requeue calls")
MutableCounterLong rpcRequeueCalls; MutableCounterLong rpcRequeueCalls;
@Metric("Number of successful RPC calls")
MutableCounterLong rpcCallSuccesses;
@Metric("Number of open connections") public int numOpenConnections() { @Metric("Number of open connections") public int numOpenConnections() {
return server.getNumOpenConnections(); return server.getNumOpenConnections();
@ -330,6 +332,13 @@ public void incrRequeueCalls() {
rpcRequeueCalls.incr(); rpcRequeueCalls.incr();
} }
/**
 * Counts one successful RPC call by incrementing the rpcCallSuccesses
 * counter.
 */
public void incrRpcCallSuccesses() {
  this.rpcCallSuccesses.incr();
}
/** /**
* Returns a MutableRate Counter. * Returns a MutableRate Counter.
* @return Mutable Rate * @return Mutable Rate

View File

@ -33,6 +33,7 @@
import org.apache.hadoop.metrics2.util.SampleStat; import org.apache.hadoop.metrics2.util.SampleStat;
import org.slf4j.Logger; import org.slf4j.Logger;
import org.slf4j.LoggerFactory; import org.slf4j.LoggerFactory;
import static org.apache.commons.lang3.StringUtils.capitalize;
/** /**
@ -162,7 +163,8 @@ Map<String, MutableRate> getGlobalMetrics() {
private synchronized MutableRate addMetricIfNotExists(String name) { private synchronized MutableRate addMetricIfNotExists(String name) {
MutableRate metric = globalMetrics.get(name); MutableRate metric = globalMetrics.get(name);
if (metric == null) { if (metric == null) {
metric = new MutableRate(name + typePrefix, name + typePrefix, false); String metricName = typePrefix + capitalize(name);
metric = new MutableRate(metricName, metricName, false);
metric.setUpdateTimeStamp(true); metric.setUpdateTimeStamp(true);
globalMetrics.put(name, metric); globalMetrics.put(name, metric);
} }

View File

@ -82,6 +82,9 @@ The default timeunit used for RPC metrics is milliseconds (as per the below desc
| `RpcAuthenticationSuccesses` | Total number of authentication successes | | `RpcAuthenticationSuccesses` | Total number of authentication successes |
| `RpcAuthorizationFailures` | Total number of authorization failures | | `RpcAuthorizationFailures` | Total number of authorization failures |
| `RpcAuthorizationSuccesses` | Total number of authorization successes | | `RpcAuthorizationSuccesses` | Total number of authorization successes |
| `RpcClientBackoff` | Total number of client backoff requests |
| `RpcSlowCalls` | Total number of slow RPC calls |
| `RpcCallSuccesses` | Total number of RPC calls that are successfully processed |
| `NumOpenConnections` | Current number of open connections | | `NumOpenConnections` | Current number of open connections |
| `NumInProcessHandler` | Current number of handlers on working | | `NumInProcessHandler` | Current number of handlers on working |
| `CallQueueLength` | Current length of the call queue | | `CallQueueLength` | Current length of the call queue |
@ -142,8 +145,10 @@ to FairCallQueue metrics. For each level of priority, rpcqueue and rpcprocessing
rpcdetailed context rpcdetailed context
=================== ===================
Metrics of rpcdetailed context are exposed in unified manner by RPC layer. Two metrics are exposed for each RPC based on its name. Metrics named "(RPC method name)NumOps" indicates total number of method calls, and metrics named "(RPC method name)AvgTime" shows average turn around time for method calls in milliseconds. Metrics of rpcdetailed context are exposed in unified manner by RPC layer. Two metrics are exposed for each RPC based on its name. Metrics named "(RPC method name)NumOps" indicates total number of method calls, and metrics named "(RPC method name)AvgTime" shows average processing time for method calls in milliseconds.
Please note that the AvgTime metrics do not include time spent waiting to acquire locks on data structures (see RpcLockWaitTimeAvgTime). Please note that the AvgTime metrics do not include time spent waiting to acquire locks on data structures (see RpcLockWaitTimeAvgTime).
Metrics named "Overall(RPC method name)AvgTime" show the average overall processing time for method calls
in milliseconds. It is measured from request arrival to when the response is sent back to the client.
rpcdetailed rpcdetailed
----------- -----------

View File

@ -39,6 +39,7 @@
import org.apache.hadoop.ipc.protobuf.TestProtos; import org.apache.hadoop.ipc.protobuf.TestProtos;
import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.MetricsRecordBuilder;
import org.apache.hadoop.metrics2.lib.MutableCounterLong; import org.apache.hadoop.metrics2.lib.MutableCounterLong;
import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation;
import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.net.NetUtils;
import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.AccessControlException;
import org.apache.hadoop.security.SecurityUtil; import org.apache.hadoop.security.SecurityUtil;
@ -95,6 +96,9 @@
import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock; import java.util.concurrent.locks.ReentrantLock;
import static org.apache.hadoop.test.MetricsAsserts.assertGaugeGt;
import static org.apache.hadoop.test.MetricsAsserts.assertGaugeGte;
import static org.apache.hadoop.test.MetricsAsserts.mockMetricsRecordBuilder;
import static org.assertj.core.api.Assertions.assertThat; import static org.assertj.core.api.Assertions.assertThat;
import static org.apache.hadoop.test.MetricsAsserts.assertCounter; import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt; import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt;
@ -1397,6 +1401,82 @@ public void testNumInProcessHandlerMetrics() throws Exception {
} }
} }
/**
 * Test the rpcCallSuccesses metric in RpcMetrics.
 */
@Test
public void testRpcCallSuccessesMetric() throws Exception {
  final Server server = setupTestServer(conf, 5);
  TestRpcService proxy = null;
  try {
    proxy = getClient(addr, conf);

    // Issue 10 pings; each one is expected to get a successful response.
    for (int remaining = 10; remaining > 0; remaining--) {
      proxy.ping(null, newEmptyRequest());
    }
    MetricsRecordBuilder rpcMetrics =
        getMetrics(server.getRpcMetrics().name());
    assertCounter("RpcCallSuccesses", 10L, rpcMetrics);
    // rpcQueueTimeNumOps equals the total number of RPC calls.
    assertCounter("RpcQueueTimeNumOps", 10L, rpcMetrics);

    // Issue 2 calls that fail with ERROR status, then 1 more successful ping.
    for (int remaining = 2; remaining > 0; remaining--) {
      try {
        proxy.error(null, newEmptyRequest());
      } catch (ServiceException ignored) {
        // The failure is the point of this call; nothing to handle.
      }
    }
    proxy.ping(null, newEmptyRequest());

    // Only the extra ping counts as a success; all 13 calls count as ops.
    rpcMetrics = getMetrics(server.getRpcMetrics().name());
    assertCounter("RpcCallSuccesses", 11L, rpcMetrics);
    assertCounter("RpcQueueTimeNumOps", 13L, rpcMetrics);
  } finally {
    stop(server, proxy);
  }
}
/**
 * Test per-type overall RPC processing time metric.
 */
@Test
public void testOverallRpcProcessingTimeMetric() throws Exception {
  final Server server = setupTestServer(conf, 5);
  TestRpcService proxy = null;
  try {
    proxy = getClient(addr, conf);

    // Send 1 ping request and 2 lockAndSleep requests.
    proxy.ping(null, newEmptyRequest());
    proxy.lockAndSleep(null, newSleepRequest(10));
    proxy.lockAndSleep(null, newSleepRequest(12));

    // Snapshot the overall processing rates into a mock record builder.
    MetricsRecordBuilder recordBuilder = mockMetricsRecordBuilder();
    MutableRatesWithAggregation overallRates =
        server.rpcDetailedMetrics.getOverallRpcProcessingRates();
    overallRates.snapshot(recordBuilder, true);

    // Verify the ping request.
    // Overall processing time for ping is zero when this test is run together
    // with the rest of tests. Thus, we use assertGaugeGte() for
    // OverallPingAvgTime.
    assertCounter("OverallPingNumOps", 1L, recordBuilder);
    assertGaugeGte("OverallPingAvgTime", 0.0, recordBuilder);

    // Verify lockAndSleep requests. AvgTime should be greater than 10 ms,
    // since we sleep for 10 and 12 ms respectively.
    assertCounter("OverallLockAndSleepNumOps", 2L, recordBuilder);
    assertGaugeGt("OverallLockAndSleepAvgTime", 10.0, recordBuilder);
  } finally {
    stop(server, proxy);
  }
}
/** /**
* Test RPC backoff by queue full. * Test RPC backoff by queue full.
*/ */

View File

@ -328,7 +328,20 @@ public void testDuplicateMetrics() {
verify(rb, times(1)) verify(rb, times(1))
.addCounter(info("GetLongNumOps", "Number of ops for getLong"), 0L); .addCounter(info("GetLongNumOps", "Number of ops for getLong"), 0L);
verify(rb, times(1)).addCounter( verify(rb, times(1)).addCounter(
info("GetLongDeferredNumOps", "Number of ops for getLongDeferred"), 0L); info("DeferredGetLongNumOps", "Number of ops for deferredGetLong"), 0L);
// Add some samples and verify
rb = mockMetricsRecordBuilder();
rates.add("testRpcMethod", 10);
deferredRpcRates.add("testRpcMethod", 100);
deferredRpcRates.add("testRpcMethod", 500);
rates.snapshot(rb, true);
deferredRpcRates.snapshot(rb, true);
assertCounter("TestRpcMethodNumOps", 1L, rb);
assertGauge("TestRpcMethodAvgTime", 10.0, rb);
assertCounter("DeferredTestRpcMethodNumOps", 2L, rb);
assertGauge("DeferredTestRpcMethodAvgTime", 300.0, rb);
} }
/** /**

View File

@ -358,6 +358,19 @@ public static void assertGaugeGt(String name, double greater,
getDoubleGauge(name, rb) > greater); getDoubleGauge(name, rb) > greater);
} }
/**
 * Assert that a double gauge metric is greater than or equal to a value.
 * @param name of the metric
 * @param greater lower bound (inclusive) that the metric value must reach
 * @param rb the record builder mock used to getMetrics
 */
public static void assertGaugeGte(String name, double greater,
    MetricsRecordBuilder rb) {
  final double actual = getDoubleGauge(name, rb);
  final boolean reachesBound = actual >= greater;
  Assert.assertTrue("Bad value for metric " + name, reachesBound);
}
/** /**
* Assert that a double gauge metric is greater than a value * Assert that a double gauge metric is greater than a value
* @param name of the metric * @param name of the metric