YARN-2264. Fixed a race condition in DrainDispatcher which may cause random test failures. Contributed by Li Lu

git-svn-id: https://svn.apache.org/repos/asf/hadoop/common/trunk@1611126 13f79535-47bb-0310-9956-ffa450edef68
This commit is contained in:
Jian He 2014-07-16 18:25:05 +00:00
parent 43fe48d9e2
commit 40e1bb9d31
2 changed files with 13 additions and 4 deletions

View File

@ -59,6 +59,9 @@ Release 2.6.0 - UNRELEASED
YARN-2260. Fixed ResourceManager's RMNode to correctly remember containers YARN-2260. Fixed ResourceManager's RMNode to correctly remember containers
when nodes resync during work-preserving RM restart. (Jian He via vinodkv) when nodes resync during work-preserving RM restart. (Jian He via vinodkv)
YARN-2264. Fixed a race condition in DrainDispatcher which may cause random
test failures. (Li Lu via jianhe)
Release 2.5.0 - UNRELEASED Release 2.5.0 - UNRELEASED
INCOMPATIBLE CHANGES INCOMPATIBLE CHANGES

View File

@ -28,6 +28,7 @@ public class DrainDispatcher extends AsyncDispatcher {
// and similar grotesqueries // and similar grotesqueries
private volatile boolean drained = false; private volatile boolean drained = false;
private final BlockingQueue<Event> queue; private final BlockingQueue<Event> queue;
final Object mutex;
public DrainDispatcher() { public DrainDispatcher() {
this(new LinkedBlockingQueue<Event>()); this(new LinkedBlockingQueue<Event>());
@ -36,6 +37,7 @@ public DrainDispatcher() {
private DrainDispatcher(BlockingQueue<Event> eventQueue) { private DrainDispatcher(BlockingQueue<Event> eventQueue) {
super(eventQueue); super(eventQueue);
this.queue = eventQueue; this.queue = eventQueue;
this.mutex = this;
} }
/** /**
@ -53,8 +55,10 @@ Runnable createThread() {
@Override @Override
public void run() { public void run() {
while (!Thread.currentThread().isInterrupted()) { while (!Thread.currentThread().isInterrupted()) {
// !drained if dispatch queued new events on this dispatcher synchronized (mutex) {
drained = queue.isEmpty(); // !drained if dispatch queued new events on this dispatcher
drained = queue.isEmpty();
}
Event event; Event event;
try { try {
event = queue.take(); event = queue.take();
@ -75,8 +79,10 @@ public EventHandler getEventHandler() {
return new EventHandler() { return new EventHandler() {
@Override @Override
public void handle(Event event) { public void handle(Event event) {
drained = false; synchronized (mutex) {
actual.handle(event); actual.handle(event);
drained = false;
}
} }
}; };
} }