From bf08f4abae43d706a305af3f14e00f01c00dba7c Mon Sep 17 00:00:00 2001 From: Chen Liang Date: Fri, 11 Jan 2019 14:01:23 -0800 Subject: [PATCH] HADOOP-15481. Emit FairCallQueue stats as metrics. Contributed by Christopher Gregorian. --- .../org/apache/hadoop/ipc/FairCallQueue.java | 32 +++++++++++++++-- .../src/site/markdown/Metrics.md | 10 ++++++ .../apache/hadoop/ipc/TestFairCallQueue.java | 36 +++++++++++++++++++ 3 files changed, 76 insertions(+), 2 deletions(-) diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java index 3a8c83dea7..380426fe5b 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/ipc/FairCallQueue.java @@ -35,6 +35,11 @@ import org.apache.commons.lang3.NotImplementedException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.ipc.CallQueueManager.CallQueueOverflowException; +import org.apache.hadoop.metrics2.MetricsCollector; +import org.apache.hadoop.metrics2.MetricsRecordBuilder; +import org.apache.hadoop.metrics2.MetricsSource; +import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem; +import org.apache.hadoop.metrics2.lib.Interns; import org.apache.hadoop.metrics2.util.MBeans; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,7 +48,7 @@ * A queue with multiple levels for each priority. */ public class FairCallQueue extends AbstractQueue - implements BlockingQueue { + implements BlockingQueue { @Deprecated public static final int IPC_CALLQUEUE_PRIORITY_LEVELS_DEFAULT = 4; @Deprecated @@ -335,7 +340,8 @@ public int remainingCapacity() { * MetricsProxy is a singleton because we may init multiple * FairCallQueues, but the metrics system cannot unregister beans cleanly. */ - private static final class MetricsProxy implements FairCallQueueMXBean { + private static final class MetricsProxy implements FairCallQueueMXBean, + MetricsSource { // One singleton per namespace private static final HashMap INSTANCES = new HashMap(); @@ -346,8 +352,13 @@ private static final class MetricsProxy implements FairCallQueueMXBean { // Keep track of how many objects we registered private int revisionNumber = 0; + private String namespace; + private MetricsProxy(String namespace) { + this.namespace = namespace; MBeans.register(namespace, "FairCallQueue", this); + final String name = namespace + ".FairCallQueue"; + DefaultMetricsSystem.instance().register(name, name, this); } public static synchronized MetricsProxy getInstance(String namespace) { @@ -389,6 +400,23 @@ public long[] getOverflowedCalls() { @Override public int getRevision() { return revisionNumber; } + + @Override + public void getMetrics(MetricsCollector collector, boolean all) { + MetricsRecordBuilder rb = collector.addRecord("FairCallQueue") + .setContext("rpc") + .tag(Interns.info("namespace", "Namespace"), namespace); + + final int[] currentQueueSizes = getQueueSizes(); + final long[] currentOverflowedCalls = getOverflowedCalls(); + + for (int i = 0; i < currentQueueSizes.length; i++) { + rb.addGauge(Interns.info("FairCallQueueSize_p" + i, "FCQ Queue Size"), + currentQueueSizes[i]); + rb.addCounter(Interns.info("FairCallQueueOverflowedCalls_p" + i, + "FCQ Overflowed Calls"), currentOverflowedCalls[i]); + } + } } // FairCallQueueMXBean diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md index 1e219405ad..1ef2b44b6e 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md @@ -104,6 +104,16 @@ RetryCache metrics is useful to monitor NameNode fail-over. Each metrics record | `CacheCleared` | Total number of RetryCache cleared | | `CacheUpdated` | Total number of RetryCache updated | +FairCallQueue +------------- + +FairCallQueue metrics will only exist if FairCallQueue is enabled. Each metric exists for each level of priority. + +| Name | Description | +|:---- |:---- | +| `FairCallQueueSize_p`*Priority* | Current number of calls in priority queue | +| `FairCallQueueOverflowedCalls_p`*Priority* | Total number of overflowed calls in priority queue | + rpcdetailed context =================== diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java index d82a2f1a2d..776db5e002 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/ipc/TestFairCallQueue.java @@ -18,6 +18,9 @@ package org.apache.hadoop.ipc; +import static org.apache.hadoop.test.MetricsAsserts.assertGauge; +import static org.apache.hadoop.test.MetricsAsserts.assertCounter; +import static org.apache.hadoop.test.MetricsAsserts.getMetrics; import static org.mockito.Mockito.anyInt; import static org.mockito.Mockito.anyObject; import static org.mockito.Mockito.doThrow; @@ -637,4 +640,37 @@ public void testFairCallQueueMXBean() throws Exception { assertEquals(0, queueSizes[0]); assertEquals(0, queueSizes[1]); } + + @Test + public void testFairCallQueueMetrics() throws Exception { + final String fcqMetrics = "ns.FairCallQueue"; + Schedulable p0 = mockCall("a", 0); + Schedulable p1 = mockCall("b", 1); + + assertGauge("FairCallQueueSize_p0", 0, getMetrics(fcqMetrics)); + assertGauge("FairCallQueueSize_p1", 0, getMetrics(fcqMetrics)); + assertCounter("FairCallQueueOverflowedCalls_p0", 0L, + getMetrics(fcqMetrics)); + assertCounter("FairCallQueueOverflowedCalls_p1", 0L, + getMetrics(fcqMetrics)); + + for (int i = 0; i < 5; i++) { + fcq.add(p0); + fcq.add(p1); + } + + try { + fcq.add(p1); + fail("didn't overflow"); + } catch (IllegalStateException ise) { + // Expected exception + } + + assertGauge("FairCallQueueSize_p0", 5, getMetrics(fcqMetrics)); + assertGauge("FairCallQueueSize_p1", 5, getMetrics(fcqMetrics)); + assertCounter("FairCallQueueOverflowedCalls_p0", 0L, + getMetrics(fcqMetrics)); + assertCounter("FairCallQueueOverflowedCalls_p1", 1L, + getMetrics(fcqMetrics)); + } }