YARN-2991. Fixed DrainDispatcher to reuse the draining code path in AsyncDispatcher. Contributed by Rohith Sharmaks.
This commit is contained in:
parent
0c45946e65
commit
947578c1c1
@ -305,6 +305,9 @@ Release 2.7.0 - UNRELEASED
|
|||||||
YARN-2987. Fixed ClientRMService#getQueueInfo to check against queue and
|
YARN-2987. Fixed ClientRMService#getQueueInfo to check against queue and
|
||||||
app ACLs. (Varun Saxena via jianhe)
|
app ACLs. (Varun Saxena via jianhe)
|
||||||
|
|
||||||
|
YARN-2991. Fixed DrainDispatcher to reuse the draining code path in
|
||||||
|
AsyncDispatcher. (Rohith Sharmaks via zjshen)
|
||||||
|
|
||||||
Release 2.6.0 - 2014-11-18
|
Release 2.6.0 - 2014-11-18
|
||||||
|
|
||||||
INCOMPATIBLE CHANGES
|
INCOMPATIBLE CHANGES
|
||||||
|
@ -34,6 +34,8 @@ import org.apache.hadoop.service.AbstractService;
|
|||||||
import org.apache.hadoop.util.ShutdownHookManager;
|
import org.apache.hadoop.util.ShutdownHookManager;
|
||||||
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
import org.apache.hadoop.yarn.exceptions.YarnRuntimeException;
|
||||||
|
|
||||||
|
import com.google.common.annotations.VisibleForTesting;
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Dispatches {@link Event}s in a separate thread. Currently only single thread
|
* Dispatches {@link Event}s in a separate thread. Currently only single thread
|
||||||
* does that. Potentially there could be multiple channels for each event type
|
* does that. Potentially there could be multiple channels for each event type
|
||||||
@ -282,4 +284,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
|
|||||||
}
|
}
|
||||||
};
|
};
|
||||||
}
|
}
|
||||||
|
|
||||||
|
@VisibleForTesting
|
||||||
|
protected boolean isDrained() {
|
||||||
|
return this.drained;
|
||||||
|
}
|
||||||
}
|
}
|
||||||
|
@ -23,68 +23,20 @@ import java.util.concurrent.LinkedBlockingQueue;
|
|||||||
@SuppressWarnings("rawtypes")
|
@SuppressWarnings("rawtypes")
|
||||||
public class DrainDispatcher extends AsyncDispatcher {
|
public class DrainDispatcher extends AsyncDispatcher {
|
||||||
|
|
||||||
// flagrant initialize abuse throughout, but safe per
|
|
||||||
// http://java.sun.com/docs/books/jls/third_edition/html/typesValues.html#96595
|
|
||||||
// and similar grotesqueries
|
|
||||||
private volatile boolean drained = false;
|
|
||||||
private final BlockingQueue<Event> queue;
|
|
||||||
final Object mutex;
|
|
||||||
|
|
||||||
public DrainDispatcher() {
|
public DrainDispatcher() {
|
||||||
this(new LinkedBlockingQueue<Event>());
|
this(new LinkedBlockingQueue<Event>());
|
||||||
}
|
}
|
||||||
|
|
||||||
private DrainDispatcher(BlockingQueue<Event> eventQueue) {
|
private DrainDispatcher(BlockingQueue<Event> eventQueue) {
|
||||||
super(eventQueue);
|
super(eventQueue);
|
||||||
this.queue = eventQueue;
|
|
||||||
this.mutex = this;
|
|
||||||
}
|
}
|
||||||
|
|
||||||
/**
|
/**
|
||||||
* Busy loop waiting for all queued events to drain.
|
* Busy loop waiting for all queued events to drain.
|
||||||
*/
|
*/
|
||||||
public void await() {
|
public void await() {
|
||||||
while (!drained) {
|
while (!isDrained()) {
|
||||||
Thread.yield();
|
Thread.yield();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@Override
|
|
||||||
Runnable createThread() {
|
|
||||||
return new Runnable() {
|
|
||||||
@Override
|
|
||||||
public void run() {
|
|
||||||
while (!Thread.currentThread().isInterrupted()) {
|
|
||||||
synchronized (mutex) {
|
|
||||||
// !drained if dispatch queued new events on this dispatcher
|
|
||||||
drained = queue.isEmpty();
|
|
||||||
}
|
|
||||||
Event event;
|
|
||||||
try {
|
|
||||||
event = queue.take();
|
|
||||||
} catch(InterruptedException ie) {
|
|
||||||
return;
|
|
||||||
}
|
|
||||||
if (event != null) {
|
|
||||||
dispatch(event);
|
|
||||||
}
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
@Override
|
|
||||||
public EventHandler getEventHandler() {
|
|
||||||
final EventHandler actual = super.getEventHandler();
|
|
||||||
return new EventHandler() {
|
|
||||||
@Override
|
|
||||||
public void handle(Event event) {
|
|
||||||
synchronized (mutex) {
|
|
||||||
actual.handle(event);
|
|
||||||
drained = false;
|
|
||||||
}
|
|
||||||
}
|
|
||||||
};
|
|
||||||
}
|
|
||||||
|
|
||||||
}
|
}
|
||||||
|
Loading…
x
Reference in New Issue
Block a user