YARN-10642. Race condition: AsyncDispatcher can get stuck by the changes introduced in YARN-8995. Contributed by zhengchenyu.
This commit is contained in:
parent
e19c00925f
commit
e7cad3811f
@ -20,11 +20,11 @@
|
|||||||
|
|
||||||
import java.util.ArrayList;
|
import java.util.ArrayList;
|
||||||
import java.util.HashMap;
|
import java.util.HashMap;
|
||||||
|
import java.util.Iterator;
|
||||||
import java.util.List;
|
import java.util.List;
|
||||||
import java.util.Map;
|
import java.util.Map;
|
||||||
import java.util.concurrent.BlockingQueue;
|
import java.util.concurrent.BlockingQueue;
|
||||||
import java.util.concurrent.LinkedBlockingQueue;
|
import java.util.concurrent.LinkedBlockingQueue;
|
||||||
import java.util.stream.Collectors;
|
|
||||||
|
|
||||||
import org.slf4j.Logger;
|
import org.slf4j.Logger;
|
||||||
import org.slf4j.LoggerFactory;
|
import org.slf4j.LoggerFactory;
|
||||||
@ -268,11 +268,16 @@ public EventHandler<Event> getEventHandler() {
|
|||||||
}
|
}
|
||||||
|
|
||||||
class GenericEventHandler implements EventHandler<Event> {
|
class GenericEventHandler implements EventHandler<Event> {
|
||||||
private void printEventQueueDetails(BlockingQueue<Event> queue) {
|
private void printEventQueueDetails() {
|
||||||
Map<Enum, Long> counterMap = eventQueue.stream().
|
Iterator<Event> iterator = eventQueue.iterator();
|
||||||
collect(Collectors.
|
Map<Enum, Long> counterMap = new HashMap<>();
|
||||||
groupingBy(e -> e.getType(), Collectors.counting())
|
while (iterator.hasNext()) {
|
||||||
);
|
Enum eventType = iterator.next().getType();
|
||||||
|
if (!counterMap.containsKey(eventType)) {
|
||||||
|
counterMap.put(eventType, 0L);
|
||||||
|
}
|
||||||
|
counterMap.put(eventType, counterMap.get(eventType) + 1);
|
||||||
|
}
|
||||||
for (Map.Entry<Enum, Long> entry : counterMap.entrySet()) {
|
for (Map.Entry<Enum, Long> entry : counterMap.entrySet()) {
|
||||||
long num = entry.getValue();
|
long num = entry.getValue();
|
||||||
LOG.info("Event type: " + entry.getKey()
|
LOG.info("Event type: " + entry.getKey()
|
||||||
@ -295,7 +300,7 @@ public void handle(Event event) {
|
|||||||
if (qSize != 0 && qSize % detailsInterval == 0
|
if (qSize != 0 && qSize % detailsInterval == 0
|
||||||
&& lastEventDetailsQueueSizeLogged != qSize) {
|
&& lastEventDetailsQueueSizeLogged != qSize) {
|
||||||
lastEventDetailsQueueSizeLogged = qSize;
|
lastEventDetailsQueueSizeLogged = qSize;
|
||||||
printEventQueueDetails(eventQueue);
|
printEventQueueDetails();
|
||||||
printTrigger = true;
|
printTrigger = true;
|
||||||
}
|
}
|
||||||
int remCapacity = eventQueue.remainingCapacity();
|
int remCapacity = eventQueue.remainingCapacity();
|
||||||
|
@ -97,12 +97,23 @@ private enum DummyType {
|
|||||||
}
|
}
|
||||||
|
|
||||||
private static class TestHandler implements EventHandler<Event> {
|
private static class TestHandler implements EventHandler<Event> {
|
||||||
|
|
||||||
|
private long sleepTime = 1500;
|
||||||
|
|
||||||
|
TestHandler() {
|
||||||
|
}
|
||||||
|
|
||||||
|
TestHandler(long sleepTime) {
|
||||||
|
this.sleepTime = sleepTime;
|
||||||
|
}
|
||||||
|
|
||||||
@Override
|
@Override
|
||||||
public void handle(Event event) {
|
public void handle(Event event) {
|
||||||
try {
|
try {
|
||||||
// As long as 10000 events queued
|
// As long as 10000 events queued
|
||||||
Thread.sleep(1500);
|
Thread.sleep(this.sleepTime);
|
||||||
} catch (InterruptedException e) {}
|
} catch (InterruptedException e) {
|
||||||
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
@ -170,11 +181,54 @@ public void testPrintDispatcherEventDetails() throws Exception {
|
|||||||
//Make sure more than one event to take
|
//Make sure more than one event to take
|
||||||
verify(log, atLeastOnce()).
|
verify(log, atLeastOnce()).
|
||||||
info("Latest dispatch event type: TestEventType");
|
info("Latest dispatch event type: TestEventType");
|
||||||
dispatcher.stop();
|
|
||||||
} finally {
|
} finally {
|
||||||
//... restore logger object
|
//... restore logger object
|
||||||
logger.set(null, oldLog);
|
logger.set(null, oldLog);
|
||||||
|
dispatcher.stop();
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
//Test print dispatcher details when the blocking queue is heavy
|
||||||
|
@Test(timeout = 60000)
|
||||||
|
public void testPrintDispatcherEventDetailsAvoidDeadLoop() throws Exception {
|
||||||
|
for (int i = 0; i < 5; i++) {
|
||||||
|
testPrintDispatcherEventDetailsAvoidDeadLoopInternal();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
|
public void testPrintDispatcherEventDetailsAvoidDeadLoopInternal()
|
||||||
|
throws Exception {
|
||||||
|
YarnConfiguration conf = new YarnConfiguration();
|
||||||
|
conf.setInt(YarnConfiguration.
|
||||||
|
YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD, 10);
|
||||||
|
Logger log = mock(Logger.class);
|
||||||
|
AsyncDispatcher dispatcher = new AsyncDispatcher();
|
||||||
|
dispatcher.init(conf);
|
||||||
|
|
||||||
|
Field logger = AsyncDispatcher.class.getDeclaredField("LOG");
|
||||||
|
logger.setAccessible(true);
|
||||||
|
Field modifiers = Field.class.getDeclaredField("modifiers");
|
||||||
|
modifiers.setAccessible(true);
|
||||||
|
modifiers.setInt(logger, logger.getModifiers() & ~Modifier.FINAL);
|
||||||
|
Object oldLog = logger.get(null);
|
||||||
|
|
||||||
|
try {
|
||||||
|
logger.set(null, log);
|
||||||
|
dispatcher.register(TestEnum.class, new TestHandler(0));
|
||||||
|
dispatcher.start();
|
||||||
|
|
||||||
|
for (int i = 0; i < 10000; ++i) {
|
||||||
|
Event event = mock(Event.class);
|
||||||
|
when(event.getType()).thenReturn(TestEnum.TestEventType);
|
||||||
|
dispatcher.getEventHandler().handle(event);
|
||||||
|
}
|
||||||
|
Thread.sleep(3000);
|
||||||
|
} finally {
|
||||||
|
//... restore logger object
|
||||||
|
logger.set(null, oldLog);
|
||||||
|
dispatcher.stop();
|
||||||
|
}
|
||||||
|
}
|
||||||
|
|
||||||
}
|
}
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user