diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index bf5058a9d1..733f0eaabc 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -56,6 +56,7 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { // Indicates all the remaining dispatcher's events on stop have been drained // and processed. private volatile boolean drained = true; + private Object waitForDrained = new Object(); // For drainEventsOnStop enabled only, block newly coming events into the // queue while stopping. @@ -82,6 +83,16 @@ Runnable createThread() { public void run() { while (!stopped && !Thread.currentThread().isInterrupted()) { drained = eventQueue.isEmpty(); + // blockNewEvents is only set when dispatcher is draining to stop, + // adding this check is to avoid the overhead of acquiring the lock + // and calling notify every time in the normal run of the loop. + if (blockNewEvents) { + synchronized (waitForDrained) { + if (drained) { + waitForDrained.notify(); + } + } + } Event event; try { event = eventQueue.take(); @@ -125,8 +136,11 @@ protected void serviceStop() throws Exception { if (drainEventsOnStop) { blockNewEvents = true; LOG.info("AsyncDispatcher is draining to stop, igonring any new events."); - while(!drained) { - Thread.yield(); + synchronized (waitForDrained) { + while (!drained && eventHandlingThread.isAlive()) { + waitForDrained.wait(1000); + LOG.info("Waiting for AsyncDispatcher to drain."); + } } } stopped = true;