HDDS-224. Create metrics for Event Watcher.
Contributed by Elek, Marton.
This commit is contained in:
parent
895845e9b0
commit
e12d93bfc1
@ -39,6 +39,11 @@ http://maven.apache.org/xsd/maven-4.0.0.xsd">
|
||||
<artifactId>hadoop-hdds-common</artifactId>
|
||||
<scope>provided</scope>
|
||||
</dependency>
|
||||
<dependency>
|
||||
<groupId>org.mockito</groupId>
|
||||
<artifactId>mockito-all</artifactId>
|
||||
<scope>test</scope>
|
||||
</dependency>
|
||||
</dependencies>
|
||||
|
||||
<build>
|
||||
|
@ -26,12 +26,17 @@
|
||||
import java.util.function.Predicate;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.ozone.lease.Lease;
|
||||
import org.apache.hadoop.ozone.lease.LeaseAlreadyExistException;
|
||||
import org.apache.hadoop.ozone.lease.LeaseExpiredException;
|
||||
import org.apache.hadoop.ozone.lease.LeaseManager;
|
||||
import org.apache.hadoop.ozone.lease.LeaseNotFoundException;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
import com.google.common.base.Preconditions;
|
||||
import org.apache.commons.collections.map.HashedMap;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@ -58,18 +63,39 @@ public abstract class EventWatcher<TIMEOUT_PAYLOAD extends
|
||||
|
||||
private final LeaseManager<UUID> leaseManager;
|
||||
|
||||
private final EventWatcherMetrics metrics;
|
||||
|
||||
private final String name;
|
||||
|
||||
protected final Map<UUID, TIMEOUT_PAYLOAD> trackedEventsByUUID =
|
||||
new ConcurrentHashMap<>();
|
||||
|
||||
protected final Set<TIMEOUT_PAYLOAD> trackedEvents = new HashSet<>();
|
||||
|
||||
public EventWatcher(Event<TIMEOUT_PAYLOAD> startEvent,
|
||||
private final Map<UUID, Long> startTrackingTimes = new HashedMap();
|
||||
|
||||
public EventWatcher(String name, Event<TIMEOUT_PAYLOAD> startEvent,
|
||||
Event<COMPLETION_PAYLOAD> completionEvent,
|
||||
LeaseManager<UUID> leaseManager) {
|
||||
this.startEvent = startEvent;
|
||||
this.completionEvent = completionEvent;
|
||||
this.leaseManager = leaseManager;
|
||||
this.metrics = new EventWatcherMetrics();
|
||||
Preconditions.checkNotNull(name);
|
||||
if (name.equals("")) {
|
||||
name = getClass().getSimpleName();
|
||||
}
|
||||
if (name.equals("")) {
|
||||
//for anonymous inner classes
|
||||
name = getClass().getName();
|
||||
}
|
||||
this.name = name;
|
||||
}
|
||||
|
||||
public EventWatcher(Event<TIMEOUT_PAYLOAD> startEvent,
|
||||
Event<COMPLETION_PAYLOAD> completionEvent,
|
||||
LeaseManager<UUID> leaseManager) {
|
||||
this("", startEvent, completionEvent, leaseManager);
|
||||
}
|
||||
|
||||
public void start(EventQueue queue) {
|
||||
@ -87,11 +113,16 @@ public void start(EventQueue queue) {
|
||||
}
|
||||
});
|
||||
|
||||
MetricsSystem ms = DefaultMetricsSystem.instance();
|
||||
ms.register(name, "EventWatcher metrics", metrics);
|
||||
}
|
||||
|
||||
private synchronized void handleStartEvent(TIMEOUT_PAYLOAD payload,
|
||||
EventPublisher publisher) {
|
||||
metrics.incrementTrackedEvents();
|
||||
UUID identifier = payload.getUUID();
|
||||
startTrackingTimes.put(identifier, System.currentTimeMillis());
|
||||
|
||||
trackedEventsByUUID.put(identifier, payload);
|
||||
trackedEvents.add(payload);
|
||||
try {
|
||||
@ -112,16 +143,21 @@ private synchronized void handleStartEvent(TIMEOUT_PAYLOAD payload,
|
||||
|
||||
private synchronized void handleCompletion(UUID uuid,
|
||||
EventPublisher publisher) throws LeaseNotFoundException {
|
||||
metrics.incrementCompletedEvents();
|
||||
leaseManager.release(uuid);
|
||||
TIMEOUT_PAYLOAD payload = trackedEventsByUUID.remove(uuid);
|
||||
trackedEvents.remove(payload);
|
||||
long originalTime = startTrackingTimes.remove(uuid);
|
||||
metrics.updateFinishingTime(System.currentTimeMillis() - originalTime);
|
||||
onFinished(publisher, payload);
|
||||
}
|
||||
|
||||
private synchronized void handleTimeout(EventPublisher publisher,
|
||||
UUID identifier) {
|
||||
metrics.incrementTimedOutEvents();
|
||||
TIMEOUT_PAYLOAD payload = trackedEventsByUUID.remove(identifier);
|
||||
trackedEvents.remove(payload);
|
||||
startTrackingTimes.remove(payload.getUUID());
|
||||
onTimeout(publisher, payload);
|
||||
}
|
||||
|
||||
@ -154,4 +190,9 @@ public List<TIMEOUT_PAYLOAD> getTimeoutEvents(
|
||||
return trackedEventsByUUID.values().stream().filter(predicate)
|
||||
.collect(Collectors.toList());
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
protected EventWatcherMetrics getMetrics() {
|
||||
return metrics;
|
||||
}
|
||||
}
|
||||
|
@ -0,0 +1,79 @@
|
||||
/**
|
||||
* Licensed to the Apache Software Foundation (ASF) under one
|
||||
* or more contributor license agreements. See the NOTICE file
|
||||
* distributed with this work for additional information
|
||||
* regarding copyright ownership. The ASF licenses this file
|
||||
* to you under the Apache License, Version 2.0 (the
|
||||
* "License"); you may not use this file except in compliance
|
||||
* with the License. You may obtain a copy of the License at
|
||||
* <p>
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* <p>
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.apache.hadoop.hdds.server.events;
|
||||
|
||||
import org.apache.hadoop.metrics2.annotation.Metric;
|
||||
import org.apache.hadoop.metrics2.lib.MutableCounterLong;
|
||||
import org.apache.hadoop.metrics2.lib.MutableRate;
|
||||
|
||||
import com.google.common.annotations.VisibleForTesting;
|
||||
|
||||
/**
|
||||
* Metrics for any event watcher.
|
||||
*/
|
||||
public class EventWatcherMetrics {
|
||||
|
||||
@Metric()
|
||||
private MutableCounterLong trackedEvents;
|
||||
|
||||
@Metric()
|
||||
private MutableCounterLong timedOutEvents;
|
||||
|
||||
@Metric()
|
||||
private MutableCounterLong completedEvents;
|
||||
|
||||
@Metric()
|
||||
private MutableRate completionTime;
|
||||
|
||||
public void incrementTrackedEvents() {
|
||||
trackedEvents.incr();
|
||||
}
|
||||
|
||||
public void incrementTimedOutEvents() {
|
||||
timedOutEvents.incr();
|
||||
}
|
||||
|
||||
public void incrementCompletedEvents() {
|
||||
completedEvents.incr();
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public void updateFinishingTime(long duration) {
|
||||
completionTime.add(duration);
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public MutableCounterLong getTrackedEvents() {
|
||||
return trackedEvents;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public MutableCounterLong getTimedOutEvents() {
|
||||
return timedOutEvents;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public MutableCounterLong getCompletedEvents() {
|
||||
return completedEvents;
|
||||
}
|
||||
|
||||
@VisibleForTesting
|
||||
public MutableRate getCompletionTime() {
|
||||
return completionTime;
|
||||
}
|
||||
}
|
@ -21,8 +21,13 @@
|
||||
import java.util.Objects;
|
||||
import java.util.UUID;
|
||||
|
||||
import org.apache.hadoop.metrics2.MetricsSource;
|
||||
import org.apache.hadoop.metrics2.MetricsSystem;
|
||||
import org.apache.hadoop.metrics2.lib.DefaultMetricsSystem;
|
||||
import org.apache.hadoop.ozone.lease.LeaseManager;
|
||||
import org.apache.hadoop.test.MetricsAsserts;
|
||||
|
||||
import static org.apache.hadoop.test.MetricsAsserts.assertCounter;
|
||||
import org.junit.After;
|
||||
import org.junit.Assert;
|
||||
import org.junit.Before;
|
||||
@ -46,6 +51,7 @@ public class TestEventWatcher {
|
||||
|
||||
@Before
|
||||
public void startLeaseManager() {
|
||||
DefaultMetricsSystem.instance();
|
||||
leaseManager = new LeaseManager<>(2000l);
|
||||
leaseManager.start();
|
||||
}
|
||||
@ -53,12 +59,12 @@ public void startLeaseManager() {
|
||||
@After
|
||||
public void stopLeaseManager() {
|
||||
leaseManager.shutdown();
|
||||
DefaultMetricsSystem.shutdown();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
public void testEventHandling() throws InterruptedException {
|
||||
|
||||
EventQueue queue = new EventQueue();
|
||||
|
||||
EventWatcher<UnderreplicatedEvent, ReplicationCompletedEvent>
|
||||
@ -139,26 +145,101 @@ public void testInprogressFilter() throws InterruptedException {
|
||||
Assert.assertEquals(0, c1todo.size());
|
||||
Assert.assertFalse(replicationWatcher.contains(event1));
|
||||
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testMetrics() throws InterruptedException {
|
||||
|
||||
DefaultMetricsSystem.initialize("test");
|
||||
|
||||
EventQueue queue = new EventQueue();
|
||||
|
||||
EventWatcher<UnderreplicatedEvent, ReplicationCompletedEvent>
|
||||
replicationWatcher = createEventWatcher();
|
||||
|
||||
EventHandlerStub<UnderreplicatedEvent> underReplicatedEvents =
|
||||
new EventHandlerStub<>();
|
||||
|
||||
queue.addHandler(UNDER_REPLICATED, underReplicatedEvents);
|
||||
|
||||
replicationWatcher.start(queue);
|
||||
|
||||
//send 3 event to track 3 in-progress activity
|
||||
UnderreplicatedEvent event1 =
|
||||
new UnderreplicatedEvent(UUID.randomUUID(), "C1");
|
||||
|
||||
UnderreplicatedEvent event2 =
|
||||
new UnderreplicatedEvent(UUID.randomUUID(), "C2");
|
||||
|
||||
UnderreplicatedEvent event3 =
|
||||
new UnderreplicatedEvent(UUID.randomUUID(), "C1");
|
||||
|
||||
queue.fireEvent(WATCH_UNDER_REPLICATED, event1);
|
||||
|
||||
queue.fireEvent(WATCH_UNDER_REPLICATED, event2);
|
||||
|
||||
queue.fireEvent(WATCH_UNDER_REPLICATED, event3);
|
||||
|
||||
//1st event is completed, don't need to track any more
|
||||
ReplicationCompletedEvent event1Completed =
|
||||
new ReplicationCompletedEvent(event1.UUID, "C1", "D1");
|
||||
|
||||
queue.fireEvent(REPLICATION_COMPLETED, event1Completed);
|
||||
|
||||
|
||||
Thread.sleep(2200l);
|
||||
|
||||
//until now: 3 in-progress activities are tracked with three
|
||||
// UnderreplicatedEvents. The first one is completed, the remaining two
|
||||
// are timed out (as the timeout -- defined in the leasmanager -- is 2000ms.
|
||||
|
||||
EventWatcherMetrics metrics = replicationWatcher.getMetrics();
|
||||
|
||||
//3 events are received
|
||||
Assert.assertEquals(3, metrics.getTrackedEvents().value());
|
||||
|
||||
//one is finished. doesn't need to be resent
|
||||
Assert.assertEquals(1, metrics.getCompletedEvents().value());
|
||||
|
||||
//Other two are timed out and resent
|
||||
Assert.assertEquals(2, metrics.getTimedOutEvents().value());
|
||||
|
||||
DefaultMetricsSystem.shutdown();
|
||||
}
|
||||
|
||||
private EventWatcher<UnderreplicatedEvent, ReplicationCompletedEvent>
|
||||
createEventWatcher() {
|
||||
return new EventWatcher<UnderreplicatedEvent, ReplicationCompletedEvent>(
|
||||
WATCH_UNDER_REPLICATED, REPLICATION_COMPLETED, leaseManager) {
|
||||
|
||||
@Override
|
||||
void onTimeout(EventPublisher publisher, UnderreplicatedEvent payload) {
|
||||
publisher.fireEvent(UNDER_REPLICATED, payload);
|
||||
}
|
||||
|
||||
@Override
|
||||
void onFinished(EventPublisher publisher, UnderreplicatedEvent payload) {
|
||||
//Good job. We did it.
|
||||
}
|
||||
};
|
||||
return new CommandWatcherExample(WATCH_UNDER_REPLICATED,
|
||||
REPLICATION_COMPLETED, leaseManager);
|
||||
}
|
||||
|
||||
private class CommandWatcherExample
|
||||
extends EventWatcher<UnderreplicatedEvent, ReplicationCompletedEvent> {
|
||||
|
||||
public CommandWatcherExample(Event<UnderreplicatedEvent> startEvent,
|
||||
Event<ReplicationCompletedEvent> completionEvent,
|
||||
LeaseManager<UUID> leaseManager) {
|
||||
super("TestCommandWatcher", startEvent, completionEvent, leaseManager);
|
||||
}
|
||||
|
||||
@Override
|
||||
void onTimeout(EventPublisher publisher, UnderreplicatedEvent payload) {
|
||||
publisher.fireEvent(UNDER_REPLICATED, payload);
|
||||
}
|
||||
|
||||
@Override
|
||||
void onFinished(EventPublisher publisher, UnderreplicatedEvent payload) {
|
||||
//Good job. We did it.
|
||||
}
|
||||
|
||||
@Override
|
||||
public EventWatcherMetrics getMetrics() {
|
||||
return super.getMetrics();
|
||||
}
|
||||
}
|
||||
|
||||
;
|
||||
|
||||
private static class ReplicationCompletedEvent
|
||||
implements IdentifiableEventPayload {
|
||||
|
||||
@ -217,4 +298,4 @@ public UUID getUUID() {
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
}
|
||||
|
Loading…
Reference in New Issue
Block a user