HADOOP-9087. Queue size metric for metric sinks isn't actually maintained. Contributed by Akira AJISAKA
This commit is contained in:
parent
2fd02afeca
commit
f0f2992686
@ -973,6 +973,9 @@ Release 2.7.0 - UNRELEASED
|
||||
HADOOP-11595. Add default implementation for AbstractFileSystem#truncate.
|
||||
(yliu)
|
||||
|
||||
HADOOP-9087. Queue size metric for metric sinks isn't actually maintained
|
||||
(Akira AJISAKA via jlowe)
|
||||
|
||||
Release 2.6.1 - UNRELEASED
|
||||
|
||||
INCOMPATIBLE CHANGES
|
||||
|
@ -95,7 +95,10 @@ class MetricsSinkAdapter implements SinkQueue.Consumer<MetricsBuffer> {
|
||||
boolean putMetrics(MetricsBuffer buffer, long logicalTime) {
|
||||
if (logicalTime % period == 0) {
|
||||
LOG.debug("enqueue, logicalTime="+ logicalTime);
|
||||
if (queue.enqueue(buffer)) return true;
|
||||
if (queue.enqueue(buffer)) {
|
||||
refreshQueueSizeGauge();
|
||||
return true;
|
||||
}
|
||||
dropped.incr();
|
||||
return false;
|
||||
}
|
||||
@ -105,7 +108,9 @@ boolean putMetrics(MetricsBuffer buffer, long logicalTime) {
|
||||
public boolean putMetricsImmediate(MetricsBuffer buffer) {
|
||||
WaitableMetricsBuffer waitableBuffer =
|
||||
new WaitableMetricsBuffer(buffer);
|
||||
if (!queue.enqueue(waitableBuffer)) {
|
||||
if (queue.enqueue(waitableBuffer)) {
|
||||
refreshQueueSizeGauge();
|
||||
} else {
|
||||
LOG.warn(name + " has a full queue and can't consume the given metrics.");
|
||||
dropped.incr();
|
||||
return false;
|
||||
@ -127,6 +132,7 @@ void publishMetricsFromQueue() {
|
||||
while (!stopping) {
|
||||
try {
|
||||
queue.consumeAll(this);
|
||||
refreshQueueSizeGauge();
|
||||
retryDelay = firstRetryDelay;
|
||||
n = retryCount;
|
||||
inError = false;
|
||||
@ -151,12 +157,17 @@ void publishMetricsFromQueue() {
|
||||
"suppressing further error messages", e);
|
||||
}
|
||||
queue.clear();
|
||||
refreshQueueSizeGauge();
|
||||
inError = true; // Don't keep complaining ad infinitum
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void refreshQueueSizeGauge() {
|
||||
qsize.set(queue.size());
|
||||
}
|
||||
|
||||
@Override
|
||||
public void consume(MetricsBuffer buffer) {
|
||||
long ts = 0;
|
||||
|
@ -434,7 +434,7 @@ MetricsSystem shows the statistics for metrics snapshots and publishes. Each met
|
||||
| `Sink_`*instance*`NumOps` | Total number of sink operations for the *instance* |
|
||||
| `Sink_`*instance*`AvgTime` | Average time in milliseconds of sink operations for the *instance* |
|
||||
| `Sink_`*instance*`Dropped` | Total number of dropped sink operations for the *instance* |
|
||||
| `Sink_`*instance*`Qsize` | Current queue length of sink operations (BUT always set to 0 because nothing to increment this metrics, see [HADOOP-9941](https://issues.apache.org/jira/browse/HADOOP-9941)) |
|
||||
| `Sink_`*instance*`Qsize` | Current queue length of sink operations |
|
||||
|
||||
default context
|
||||
===============
|
||||
|
@ -29,7 +29,9 @@
|
||||
|
||||
import org.mockito.ArgumentCaptor;
|
||||
import org.mockito.Captor;
|
||||
import org.mockito.invocation.InvocationOnMock;
|
||||
import org.mockito.runners.MockitoJUnitRunner;
|
||||
import org.mockito.stubbing.Answer;
|
||||
|
||||
import static org.junit.Assert.*;
|
||||
import static org.mockito.Mockito.*;
|
||||
@ -434,6 +436,54 @@ private void checkMetricsRecords(List<MetricsRecord> recs) {
|
||||
new MetricGaugeInt(MsInfo.NumActiveSinks, 3)));
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testQSize() throws Exception {
|
||||
new ConfigBuilder().add("*.period", 8)
|
||||
.add("test.sink.test.class", TestSink.class.getName())
|
||||
.save(TestMetricsConfig.getTestFilename("hadoop-metrics2-test"));
|
||||
MetricsSystemImpl ms = new MetricsSystemImpl("Test");
|
||||
final CountDownLatch proceedSignal = new CountDownLatch(1);
|
||||
final CountDownLatch reachedPutMetricSignal = new CountDownLatch(1);
|
||||
ms.start();
|
||||
try {
|
||||
MetricsSink slowSink = mock(MetricsSink.class);
|
||||
MetricsSink dataSink = mock(MetricsSink.class);
|
||||
ms.registerSink("slowSink",
|
||||
"The sink that will wait on putMetric", slowSink);
|
||||
ms.registerSink("dataSink",
|
||||
"The sink I'll use to get info about slowSink", dataSink);
|
||||
doAnswer(new Answer() {
|
||||
@Override
|
||||
public Object answer(InvocationOnMock invocation) throws Throwable {
|
||||
reachedPutMetricSignal.countDown();
|
||||
proceedSignal.await();
|
||||
return null;
|
||||
}
|
||||
}).when(slowSink).putMetrics(any(MetricsRecord.class));
|
||||
|
||||
// trigger metric collection first time
|
||||
ms.onTimerEvent();
|
||||
assertTrue(reachedPutMetricSignal.await(1, TimeUnit.SECONDS));
|
||||
// Now that the slow sink is still processing the first metric,
|
||||
// its queue length should be 1 for the second collection.
|
||||
ms.onTimerEvent();
|
||||
verify(dataSink, timeout(500).times(2)).putMetrics(r1.capture());
|
||||
List<MetricsRecord> mr = r1.getAllValues();
|
||||
Number qSize = Iterables.find(mr.get(1).metrics(),
|
||||
new Predicate<AbstractMetric>() {
|
||||
@Override
|
||||
public boolean apply(@Nullable AbstractMetric input) {
|
||||
assert input != null;
|
||||
return input.name().equals("Sink_slowSinkQsize");
|
||||
}
|
||||
}).value();
|
||||
assertEquals(1, qSize);
|
||||
} finally {
|
||||
proceedSignal.countDown();
|
||||
ms.stop();
|
||||
}
|
||||
}
|
||||
|
||||
@Metrics(context="test")
|
||||
private static class TestSource {
|
||||
@Metric("C1 desc") MutableCounterLong c1;
|
||||
|
Loading…
Reference in New Issue
Block a user