HDDS-513. Check if the EventQueue is not closed before executing handlers. Contributed by Nanda Kumar.

This commit is contained in:
Bharat Viswanadham 2018-09-19 14:35:29 -07:00
parent 2eb597b151
commit f6bb1ca3c1

View File

@ -55,6 +55,8 @@ public class EventQueue implements EventPublisher, AutoCloseable {
private final AtomicLong eventCount = new AtomicLong(0);
private boolean isRunning = true;
public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
EVENT_TYPE event, EventHandler<PAYLOAD> handler) {
this.addHandler(event, handler, generateHandlerName(handler));
@ -116,6 +118,10 @@ private <PAYLOAD> String generateHandlerName(EventHandler<PAYLOAD> handler) {
public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
EVENT_TYPE event, EventExecutor<PAYLOAD> executor,
EventHandler<PAYLOAD> handler) {
if (!isRunning) {
LOG.warn("Not adding handler for {}, EventQueue is not running", event);
return;
}
validateEvent(event);
executors.putIfAbsent(event, new HashMap<>());
executors.get(event).putIfAbsent(executor, new ArrayList<>());
@ -136,6 +142,11 @@ public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void addHandler(
public <PAYLOAD, EVENT_TYPE extends Event<PAYLOAD>> void fireEvent(
EVENT_TYPE event, PAYLOAD payload) {
if (!isRunning) {
LOG.warn("Processing of {} is skipped, EventQueue is not running", event);
return;
}
Map<EventExecutor, List<EventHandler>> eventExecutorListMap =
this.executors.get(event);
@ -187,6 +198,11 @@ public void processAll(long timeout) {
long currentTime = Time.now();
while (true) {
if (!isRunning) {
LOG.warn("Processing of event skipped. EventQueue is not running");
return;
}
long processed = 0;
Stream<EventExecutor> allExecutor = this.executors.values().stream()
@ -216,6 +232,8 @@ public void processAll(long timeout) {
public void close() {
isRunning = false;
Set<EventExecutor> allExecutors = this.executors.values().stream()
.flatMap(handlerMap -> handlerMap.keySet().stream())
.collect(Collectors.toSet());