diff --git a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java index b2b0df2a2a..9aeab7bfc3 100644 --- a/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java +++ b/hadoop-hdds/framework/src/main/java/org/apache/hadoop/hdds/server/events/EventQueue.java @@ -55,6 +55,8 @@ public class EventQueue implements EventPublisher, AutoCloseable { private final AtomicLong eventCount = new AtomicLong(0); + private boolean isRunning = true; + public > void addHandler( EVENT_TYPE event, EventHandler handler) { this.addHandler(event, handler, generateHandlerName(handler)); @@ -116,6 +118,10 @@ private String generateHandlerName(EventHandler handler) { public > void addHandler( EVENT_TYPE event, EventExecutor executor, EventHandler handler) { + if (!isRunning) { + LOG.warn("Not adding handler for {}, EventQueue is not running", event); + return; + } validateEvent(event); executors.putIfAbsent(event, new HashMap<>()); executors.get(event).putIfAbsent(executor, new ArrayList<>()); @@ -136,6 +142,11 @@ public > void addHandler( public > void fireEvent( EVENT_TYPE event, PAYLOAD payload) { + if (!isRunning) { + LOG.warn("Processing of {} is skipped, EventQueue is not running", event); + return; + } + Map> eventExecutorListMap = this.executors.get(event); @@ -187,6 +198,11 @@ public void processAll(long timeout) { long currentTime = Time.now(); while (true) { + if (!isRunning) { + LOG.warn("Processing of event skipped. EventQueue is not running"); + return; + } + long processed = 0; Stream allExecutor = this.executors.values().stream() @@ -216,6 +232,8 @@ public void processAll(long timeout) { public void close() { + isRunning = false; + Set allExecutors = this.executors.values().stream() .flatMap(handlerMap -> handlerMap.keySet().stream()) .collect(Collectors.toSet());