From 40e1bb9d316bce5cd720d3c091d651b3586ddb94 Mon Sep 17 00:00:00 2001 From: Jian He Date: Wed, 16 Jul 2014 18:25:05 +0000 Subject: [PATCH] YARN-2264. Fixed a race condition in DrainDispatcher which may cause random test failures. Contributed by Li Lu git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1611126 13f79535-47bb-0310-9956-ffa450edef68 --- hadoop-yarn-project/CHANGES.txt | 3 +++ .../apache/hadoop/yarn/event/DrainDispatcher.java | 14 ++++++++++---- 2 files changed, 13 insertions(+), 4 deletions(-) diff --git a/hadoop-yarn-project/CHANGES.txt b/hadoop-yarn-project/CHANGES.txt index 41fd5bfd5a..6f8567e205 100644 --- a/hadoop-yarn-project/CHANGES.txt +++ b/hadoop-yarn-project/CHANGES.txt @@ -59,6 +59,9 @@ Release 2.6.0 - UNRELEASED YARN-2260. Fixed ResourceManager's RMNode to correctly remember containers when nodes resync during work-preserving RM restart. (Jian He via vinodkv) + YARN-2264. Fixed a race condition in DrainDispatcher which may cause random + test failures. (Li Lu via jianhe) + Release 2.5.0 - UNRELEASED INCOMPATIBLE CHANGES diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java index e79e7b360e..803b2bb2b3 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/DrainDispatcher.java @@ -28,6 +28,7 @@ public class DrainDispatcher extends AsyncDispatcher { // and similar grotesqueries private volatile boolean drained = false; private final BlockingQueue queue; + final Object mutex; public DrainDispatcher() { this(new LinkedBlockingQueue()); @@ -36,6 +37,7 @@ public DrainDispatcher() { private DrainDispatcher(BlockingQueue eventQueue) { super(eventQueue); this.queue = eventQueue; + this.mutex = this; } /** @@ -53,8 +55,10 @@ Runnable createThread() { @Override public void run() { while (!Thread.currentThread().isInterrupted()) { - // !drained if dispatch queued new events on this dispatcher - drained = queue.isEmpty(); + synchronized (mutex) { + // !drained if dispatch queued new events on this dispatcher + drained = queue.isEmpty(); + } Event event; try { event = queue.take(); @@ -75,8 +79,10 @@ public EventHandler getEventHandler() { return new EventHandler() { @Override public void handle(Event event) { - drained = false; - actual.handle(event); + synchronized (mutex) { + actual.handle(event); + drained = false; + } } }; }