diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProcessingDetails.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProcessingDetails.java index 5b97eec9c1..eba0da5c0e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProcessingDetails.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/ProcessingDetails.java @@ -20,13 +20,15 @@ import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification.InterfaceStability; +import org.apache.hadoop.ipc.protobuf.RpcHeaderProtos.RpcResponseHeaderProto.RpcStatusProto; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import java.util.concurrent.TimeUnit; /** - * Stores the times that a call takes to be processed through each step. + * Stores the times that a call takes to be processed through each step and + * its response status. */ @InterfaceStability.Unstable @InterfaceAudience.Private @@ -53,6 +55,9 @@ public enum Timing { private long[] timings = new long[Timing.values().length]; + // Rpc return status of this call + private RpcStatusProto returnStatus = RpcStatusProto.SUCCESS; + ProcessingDetails(TimeUnit timeUnit) { this.valueTimeUnit = timeUnit; } @@ -81,6 +86,14 @@ public void add(Timing type, long value, TimeUnit timeUnit) { timings[type.ordinal()] += valueTimeUnit.convert(value, timeUnit); } + public void setReturnStatus(RpcStatusProto status) { + this.returnStatus = status; + } + + public RpcStatusProto getReturnStatus() { + return returnStatus; + } + @Override public String toString() { StringBuilder sb = new StringBuilder(256); diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java index e9a605a004..b46a78553e 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java +++ 
b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/Server.java @@ -600,17 +600,18 @@ void logSlowRpcCalls(String methodName, Call call, } } - void updateMetrics(Call call, long startTime, boolean connDropped) { + void updateMetrics(Call call, long processingStartTimeNanos, boolean connDropped) { totalRequests.increment(); // delta = handler + processing + response - long deltaNanos = Time.monotonicNowNanos() - startTime; - long timestampNanos = call.timestampNanos; + long completionTimeNanos = Time.monotonicNowNanos(); + long deltaNanos = completionTimeNanos - processingStartTimeNanos; + long arrivalTimeNanos = call.timestampNanos; ProcessingDetails details = call.getProcessingDetails(); // queue time is the delta between when the call first arrived and when it // began being serviced, minus the time it took to be put into the queue details.set(Timing.QUEUE, - startTime - timestampNanos - details.get(Timing.ENQUEUE)); + processingStartTimeNanos - arrivalTimeNanos - details.get(Timing.ENQUEUE)); deltaNanos -= details.get(Timing.PROCESSING); deltaNanos -= details.get(Timing.RESPONSE); details.set(Timing.HANDLER, deltaNanos); @@ -636,10 +637,17 @@ void updateMetrics(Call call, long startTime, boolean connDropped) { processingTime -= waitTime; String name = call.getDetailedMetricsName(); rpcDetailedMetrics.addProcessingTime(name, processingTime); + // Overall processing time is from arrival to completion. 
+ long overallProcessingTime = rpcMetrics.getMetricsTimeUnit() + .convert(completionTimeNanos - arrivalTimeNanos, TimeUnit.NANOSECONDS); + rpcDetailedMetrics.addOverallProcessingTime(name, overallProcessingTime); callQueue.addResponseTime(name, call, details); if (isLogSlowRPC()) { logSlowRpcCalls(name, call, details); } + if (details.getReturnStatus() == RpcStatusProto.SUCCESS) { + rpcMetrics.incrRpcCallSuccesses(); + } } void updateDeferredMetrics(String name, long processingTime) { @@ -1237,6 +1245,7 @@ public Void run() throws Exception { setResponseFields(value, responseParams); sendResponse(); + details.setReturnStatus(responseParams.returnStatus); deltaNanos = Time.monotonicNowNanos() - startNanos; details.set(Timing.RESPONSE, deltaNanos, TimeUnit.NANOSECONDS); } else { diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java index 98b9f262b8..e768079fd6 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcDetailedMetrics.java @@ -33,14 +33,27 @@ @InterfaceAudience.Private @Metrics(about="Per method RPC metrics", context="rpcdetailed") public class RpcDetailedMetrics { + static final String DEFERRED_PREFIX = "Deferred"; + static final String OVERALL_PROCESSING_PREFIX = "Overall"; + // per-method RPC processing time @Metric MutableRatesWithAggregation rates; @Metric MutableRatesWithAggregation deferredRpcRates; + /** + * per-method overall RPC processing time, from request arrival to when the + * response is sent back. 
+ */ + @Metric MutableRatesWithAggregation overallRpcProcessingRates; static final Logger LOG = LoggerFactory.getLogger(RpcDetailedMetrics.class); final MetricsRegistry registry; final String name; + // Mainly to facilitate testing in TestRPC.java + public MutableRatesWithAggregation getOverallRpcProcessingRates() { + return overallRpcProcessingRates; + } + RpcDetailedMetrics(int port) { name = "RpcDetailedActivityForPort"+ port; registry = new MetricsRegistry("rpcdetailed") @@ -61,7 +74,8 @@ public static RpcDetailedMetrics create(int port) { */ public void init(Class protocol) { rates.init(protocol); - deferredRpcRates.init(protocol, "Deferred"); + deferredRpcRates.init(protocol, DEFERRED_PREFIX); + overallRpcProcessingRates.init(protocol, OVERALL_PROCESSING_PREFIX); } /** @@ -78,6 +92,15 @@ public void addDeferredProcessingTime(String name, long processingTime) { deferredRpcRates.add(name, processingTime); } + /** + * Add an overall RPC processing time sample. + * @param rpcCallName of the RPC call + * @param overallProcessingTime the overall RPC processing time + */ + public void addOverallProcessingTime(String rpcCallName, long overallProcessingTime) { + overallRpcProcessingRates.add(rpcCallName, overallProcessingTime); + } + /** * Shutdown the instrumentation for the process */ diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java index c18562441f..ad7aa88d60 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/metrics/RpcMetrics.java @@ -138,6 +138,8 @@ public static RpcMetrics create(Server server, Configuration conf) { MutableCounterLong rpcSlowCalls; @Metric("Number of requeue calls") MutableCounterLong rpcRequeueCalls; + @Metric("Number of successful RPC calls") + 
MutableCounterLong rpcCallSuccesses; @Metric("Number of open connections") public int numOpenConnections() { return server.getNumOpenConnections(); @@ -330,6 +332,13 @@ public void incrRequeueCalls() { rpcRequeueCalls.incr(); } + /** + * One RPC call success event. + */ + public void incrRpcCallSuccesses() { + rpcCallSuccesses.incr(); + } + /** * Returns a MutableRate Counter. * @return Mutable Rate diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java index 60b33a84b5..72dc797683 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/lib/MutableRatesWithAggregation.java @@ -33,6 +33,7 @@ import org.apache.hadoop.metrics2.util.SampleStat; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import static org.apache.commons.lang3.StringUtils.capitalize; /** @@ -162,7 +163,8 @@ Map getGlobalMetrics() { private synchronized MutableRate addMetricIfNotExists(String name) { MutableRate metric = globalMetrics.get(name); if (metric == null) { - metric = new MutableRate(name + typePrefix, name + typePrefix, false); + String metricName = typePrefix + capitalize(name); + metric = new MutableRate(metricName, metricName, false); metric.setUpdateTimeStamp(true); globalMetrics.put(name, metric); } diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md index 0777fc42ab..e8d3cc4552 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md @@ -82,6 +82,9 @@ The default timeunit used for RPC metrics is milliseconds (as per the below desc | 
`RpcAuthenticationSuccesses` | Total number of authentication successes | | `RpcAuthorizationFailures` | Total number of authorization failures | | `RpcAuthorizationSuccesses` | Total number of authorization successes | +| `RpcClientBackoff` | Total number of client backoff requests | +| `RpcSlowCalls` | Total number of slow RPC calls | +| `RpcCallSuccesses` | Total number of RPC calls that are successfully processed | | `NumOpenConnections` | Current number of open connections | | `NumInProcessHandler` | Current number of handlers on working | | `CallQueueLength` | Current length of the call queue | @@ -142,8 +145,10 @@ to FairCallQueue metrics. For each level of priority, rpcqueue and rpcprocessing rpcdetailed context =================== -Metrics of rpcdetailed context are exposed in unified manner by RPC layer. Two metrics are exposed for each RPC based on its name. Metrics named "(RPC method name)NumOps" indicates total number of method calls, and metrics named "(RPC method name)AvgTime" shows average turn around time for method calls in milliseconds. +Metrics of rpcdetailed context are exposed in unified manner by RPC layer. Two metrics are exposed for each RPC based on its name. Metrics named "(RPC method name)NumOps" indicates total number of method calls, and metrics named "(RPC method name)AvgTime" shows average processing time for method calls in milliseconds. Please note that the AvgTime metrics do not include time spent waiting to acquire locks on data structures (see RpcLockWaitTimeAvgTime). +Metrics named "Overall(RPC method name)AvgTime" shows the average overall processing time for method calls +in milliseconds. It is measured from request arrival to when the response is sent back to the client. 
rpcdetailed ----------- diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java index bbc241a420..1373a8a40e 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestRPC.java @@ -39,6 +39,7 @@ import org.apache.hadoop.ipc.protobuf.TestProtos; import org.apache.hadoop.metrics2.MetricsRecordBuilder; import org.apache.hadoop.metrics2.lib.MutableCounterLong; +import org.apache.hadoop.metrics2.lib.MutableRatesWithAggregation; import org.apache.hadoop.net.NetUtils; import org.apache.hadoop.security.AccessControlException; import org.apache.hadoop.security.SecurityUtil; @@ -95,6 +96,9 @@ import java.util.concurrent.atomic.AtomicReference; import java.util.concurrent.locks.ReentrantLock; +import static org.apache.hadoop.test.MetricsAsserts.assertGaugeGt; +import static org.apache.hadoop.test.MetricsAsserts.assertGaugeGte; +import static org.apache.hadoop.test.MetricsAsserts.mockMetricsRecordBuilder; import static org.assertj.core.api.Assertions.assertThat; import static org.apache.hadoop.test.MetricsAsserts.assertCounter; import static org.apache.hadoop.test.MetricsAsserts.assertCounterGt; @@ -1397,6 +1401,82 @@ public void testNumInProcessHandlerMetrics() throws Exception { } } + /** + * Test the rpcCallSuccesses metric in RpcMetrics. 
+ */ + @Test + public void testRpcCallSuccessesMetric() throws Exception { + final Server server; + TestRpcService proxy = null; + + server = setupTestServer(conf, 5); + try { + proxy = getClient(addr, conf); + + // 10 successful responses + for (int i = 0; i < 10; i++) { + proxy.ping(null, newEmptyRequest()); + } + MetricsRecordBuilder rpcMetrics = + getMetrics(server.getRpcMetrics().name()); + assertCounter("RpcCallSuccesses", 10L, rpcMetrics); + // rpcQueueTimeNumOps equals total number of RPC calls. + assertCounter("RpcQueueTimeNumOps", 10L, rpcMetrics); + + // 2 failed responses with ERROR status and 1 more successful response. + for (int i = 0; i < 2; i++) { + try { + proxy.error(null, newEmptyRequest()); + } catch (ServiceException ignored) { + } + } + proxy.ping(null, newEmptyRequest()); + + rpcMetrics = getMetrics(server.getRpcMetrics().name()); + assertCounter("RpcCallSuccesses", 11L, rpcMetrics); + assertCounter("RpcQueueTimeNumOps", 13L, rpcMetrics); + } finally { + stop(server, proxy); + } + } + + /** + * Test per-type overall RPC processing time metric. + */ + @Test + public void testOverallRpcProcessingTimeMetric() throws Exception { + final Server server; + TestRpcService proxy = null; + + server = setupTestServer(conf, 5); + try { + proxy = getClient(addr, conf); + + // Sent 1 ping request and 2 lockAndSleep requests + proxy.ping(null, newEmptyRequest()); + proxy.lockAndSleep(null, newSleepRequest(10)); + proxy.lockAndSleep(null, newSleepRequest(12)); + + MetricsRecordBuilder rb = mockMetricsRecordBuilder(); + MutableRatesWithAggregation rates = + server.rpcDetailedMetrics.getOverallRpcProcessingRates(); + rates.snapshot(rb, true); + + // Verify the ping request. + // Overall processing time for ping is zero when this test is run together with + // the rest of tests. Thus, we use assertGaugeGte() for OverallPingAvgTime. 
+ assertCounter("OverallPingNumOps", 1L, rb); + assertGaugeGte("OverallPingAvgTime", 0.0, rb); + + // Verify lockAndSleep requests. AvgTime should be greater than 10 ms, + // since we sleep for 10 and 12 ms respectively. + assertCounter("OverallLockAndSleepNumOps", 2L, rb); + assertGaugeGt("OverallLockAndSleepAvgTime", 10.0, rb); + + } finally { + stop(server, proxy); + } + } /** * Test RPC backoff by queue full. */ diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMutableMetrics.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMutableMetrics.java index 85635e01e1..2cc3d706f0 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMutableMetrics.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/lib/TestMutableMetrics.java @@ -328,7 +328,20 @@ public void testDuplicateMetrics() { verify(rb, times(1)) .addCounter(info("GetLongNumOps", "Number of ops for getLong"), 0L); verify(rb, times(1)).addCounter( - info("GetLongDeferredNumOps", "Number of ops for getLongDeferred"), 0L); + info("DeferredGetLongNumOps", "Number of ops for deferredGetLong"), 0L); + + // Add some samples and verify + rb = mockMetricsRecordBuilder(); + rates.add("testRpcMethod", 10); + deferredRpcRates.add("testRpcMethod", 100); + deferredRpcRates.add("testRpcMethod", 500); + rates.snapshot(rb, true); + deferredRpcRates.snapshot(rb, true); + + assertCounter("TestRpcMethodNumOps", 1L, rb); + assertGauge("TestRpcMethodAvgTime", 10.0, rb); + assertCounter("DeferredTestRpcMethodNumOps", 2L, rb); + assertGauge("DeferredTestRpcMethodAvgTime", 300.0, rb); } /** diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java index 38b475a277..60f752d160 100644 --- 
a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/test/MetricsAsserts.java @@ -358,6 +358,19 @@ public static void assertGaugeGt(String name, double greater, getDoubleGauge(name, rb) > greater); } + /** + * Assert that a double gauge metric is greater than or equal to a value. + * @param name of the metric + * @param greater value of the metric should be greater than or equal to this + * @param rb the record builder mock used to getMetrics + */ + public static void assertGaugeGte(String name, double greater, + MetricsRecordBuilder rb) { + double curValue = getDoubleGauge(name, rb); + Assert.assertTrue("Bad value for metric " + name, + curValue >= greater); + } + /** * Assert that a double gauge metric is greater than a value * @param name of the metric