From f0f299268625af275522f55d5bfc43118c31bdd8 Mon Sep 17 00:00:00 2001 From: Jason Lowe Date: Thu, 19 Feb 2015 17:30:07 +0000 Subject: [PATCH] HADOOP-9087. Queue size metric for metric sinks isn't actually maintained. Contributed by Akira AJISAKA --- .../hadoop-common/CHANGES.txt | 3 ++ .../metrics2/impl/MetricsSinkAdapter.java | 15 +++++- .../src/site/markdown/Metrics.md | 2 +- .../metrics2/impl/TestMetricsSystemImpl.java | 50 +++++++++++++++++++ 4 files changed, 67 insertions(+), 3 deletions(-) diff --git a/hadoop-common-project/hadoop-common/CHANGES.txt b/hadoop-common-project/hadoop-common/CHANGES.txt index c01e3d64e7..8d3f9f5784 100644 --- a/hadoop-common-project/hadoop-common/CHANGES.txt +++ b/hadoop-common-project/hadoop-common/CHANGES.txt @@ -973,6 +973,9 @@ Release 2.7.0 - UNRELEASED HADOOP-11595. Add default implementation for AbstractFileSystem#truncate. (yliu) + HADOOP-9087. Queue size metric for metric sinks isn't actually maintained + (Akira AJISAKA via jlowe) + Release 2.6.1 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java index 9add494165..ed52317538 100644 --- a/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java +++ b/hadoop-common-project/hadoop-common/src/main/java/org/apache/hadoop/metrics2/impl/MetricsSinkAdapter.java @@ -95,7 +95,10 @@ class MetricsSinkAdapter implements SinkQueue.Consumer { boolean putMetrics(MetricsBuffer buffer, long logicalTime) { if (logicalTime % period == 0) { LOG.debug("enqueue, logicalTime="+ logicalTime); - if (queue.enqueue(buffer)) return true; + if (queue.enqueue(buffer)) { + refreshQueueSizeGauge(); + return true; + } dropped.incr(); return false; } @@ -105,7 +108,9 @@ boolean putMetrics(MetricsBuffer buffer, long logicalTime) { public boolean putMetricsImmediate(MetricsBuffer buffer) { WaitableMetricsBuffer waitableBuffer = new WaitableMetricsBuffer(buffer); - if (!queue.enqueue(waitableBuffer)) { + if (queue.enqueue(waitableBuffer)) { + refreshQueueSizeGauge(); + } else { LOG.warn(name + " has a full queue and can't consume the given metrics."); dropped.incr(); return false; @@ -127,6 +132,7 @@ void publishMetricsFromQueue() { while (!stopping) { try { queue.consumeAll(this); + refreshQueueSizeGauge(); retryDelay = firstRetryDelay; n = retryCount; inError = false; @@ -151,12 +157,17 @@ void publishMetricsFromQueue() { "suppressing further error messages", e); } queue.clear(); + refreshQueueSizeGauge(); inError = true; // Don't keep complaining ad infinitum } } } } + private void refreshQueueSizeGauge() { + qsize.set(queue.size()); + } + @Override public void consume(MetricsBuffer buffer) { long ts = 0; diff --git a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md index dbcf0d8c6f..6953c3bbc9 100644 --- a/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md +++ b/hadoop-common-project/hadoop-common/src/site/markdown/Metrics.md @@ -434,7 +434,7 @@ MetricsSystem shows the statistics for metrics snapshots and publishes. Each met | `Sink_`*instance*`NumOps` | Total number of sink operations for the *instance* | | `Sink_`*instance*`AvgTime` | Average time in milliseconds of sink operations for the *instance* | | `Sink_`*instance*`Dropped` | Total number of dropped sink operations for the *instance* | -| `Sink_`*instance*`Qsize` | Current queue length of sink operations  (BUT always set to 0 because nothing to increment this metrics, see [HADOOP-9941](https://issues.apache.org/jira/browse/HADOOP-9941)) | +| `Sink_`*instance*`Qsize` | Current queue length of sink operations | default context =============== diff --git a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java index 4c2ebc8e0e..0f7b15f2ef 100644 --- a/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java +++ b/hadoop-common-project/hadoop-common/src/test/java/org/apache/hadoop/metrics2/impl/TestMetricsSystemImpl.java @@ -29,7 +29,9 @@ import org.mockito.ArgumentCaptor; import org.mockito.Captor; +import org.mockito.invocation.InvocationOnMock; import org.mockito.runners.MockitoJUnitRunner; +import org.mockito.stubbing.Answer; import static org.junit.Assert.*; import static org.mockito.Mockito.*; @@ -434,6 +436,54 @@ private void checkMetricsRecords(List recs) { new MetricGaugeInt(MsInfo.NumActiveSinks, 3))); } + @Test + public void testQSize() throws Exception { + new ConfigBuilder().add("*.period", 8) + .add("test.sink.test.class", TestSink.class.getName()) + .save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test")); + MetricsSystemImpl ms = new MetricsSystemImpl("Test"); + final CountDownLatch proceedSignal = new CountDownLatch(1); + final CountDownLatch reachedPutMetricSignal = new CountDownLatch(1); + ms.start(); + try { + MetricsSink slowSink = mock(MetricsSink.class); + MetricsSink dataSink = mock(MetricsSink.class); + ms.registerSink("slowSink", + "The sink that will wait on putMetric", slowSink); + ms.registerSink("dataSink", + "The sink I'll use to get info about slowSink", dataSink); + doAnswer(new Answer() { + @Override + public Object answer(InvocationOnMock invocation) throws Throwable { + reachedPutMetricSignal.countDown(); + proceedSignal.await(); + return null; + } + }).when(slowSink).putMetrics(any(MetricsRecord.class)); + + // trigger metric collection first time + ms.onTimerEvent(); + assertTrue(reachedPutMetricSignal.await(1, TimeUnit.SECONDS)); + // Now that the slow sink is still processing the first metric, + // its queue length should be 1 for the second collection. + ms.onTimerEvent(); + verify(dataSink, timeout(500).times(2)).putMetrics(r1.capture()); + List mr = r1.getAllValues(); + Number qSize = Iterables.find(mr.get(1).metrics(), + new Predicate() { + @Override + public boolean apply(@Nullable AbstractMetric input) { + assert input != null; + return input.name().equals("Sink_slowSinkQsize"); + } + }).value(); + assertEquals(1, qSize); + } finally { + proceedSignal.countDown(); + ms.stop(); + } + } + @Metrics(context="test") private static class TestSource { @Metric("C1 desc") MutableCounterLong c1;