From e7cad3811fd14ff0b963423a24a9012ce5fb07a9 Mon Sep 17 00:00:00 2001 From: Peter Bacsko Date: Fri, 5 Mar 2021 13:50:45 +0100 Subject: [PATCH] YARN-10642. Race condition: AsyncDispatcher can get stuck by the changes introduced in YARN-8995. Contributed by zhengchenyu. --- .../hadoop/yarn/event/AsyncDispatcher.java | 19 +++--- .../yarn/event/TestAsyncDispatcher.java | 60 ++++++++++++++++++- 2 files changed, 69 insertions(+), 10 deletions(-) diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java index 79ad464200..f9deab06ff 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/main/java/org/apache/hadoop/yarn/event/AsyncDispatcher.java @@ -20,11 +20,11 @@ import java.util.ArrayList; import java.util.HashMap; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.concurrent.BlockingQueue; import java.util.concurrent.LinkedBlockingQueue; -import java.util.stream.Collectors; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -268,11 +268,16 @@ public EventHandler getEventHandler() { } class GenericEventHandler implements EventHandler { - private void printEventQueueDetails(BlockingQueue queue) { - Map counterMap = eventQueue.stream(). - collect(Collectors. - groupingBy(e -> e.getType(), Collectors.counting()) - ); + private void printEventQueueDetails() { + Iterator iterator = eventQueue.iterator(); + Map counterMap = new HashMap<>(); + while (iterator.hasNext()) { + Enum eventType = iterator.next().getType(); + if (!counterMap.containsKey(eventType)) { + counterMap.put(eventType, 0L); + } + counterMap.put(eventType, counterMap.get(eventType) + 1); + } for (Map.Entry entry : counterMap.entrySet()) { long num = entry.getValue(); LOG.info("Event type: " + entry.getKey() @@ -295,7 +300,7 @@ public void handle(Event event) { if (qSize != 0 && qSize % detailsInterval == 0 && lastEventDetailsQueueSizeLogged != qSize) { lastEventDetailsQueueSizeLogged = qSize; - printEventQueueDetails(eventQueue); + printEventQueueDetails(); printTrigger = true; } int remCapacity = eventQueue.remainingCapacity(); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java index 762e2280ca..55ddd12fce 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-common/src/test/java/org/apache/hadoop/yarn/event/TestAsyncDispatcher.java @@ -97,12 +97,23 @@ private enum DummyType { } private static class TestHandler implements EventHandler { + + private long sleepTime = 1500; + + TestHandler() { + } + + TestHandler(long sleepTime) { + this.sleepTime = sleepTime; + } + @Override public void handle(Event event) { try { // As long as 10000 events queued - Thread.sleep(1500); - } catch (InterruptedException e) {} + Thread.sleep(this.sleepTime); + } catch (InterruptedException e) { + } } } @@ -170,11 +181,54 @@ public void testPrintDispatcherEventDetails() throws Exception { //Make sure more than one event to take verify(log, atLeastOnce()). info("Latest dispatch event type: TestEventType"); - dispatcher.stop(); } finally { //... restore logger object logger.set(null, oldLog); + dispatcher.stop(); } } + + //Test print dispatcher details when the blocking queue is heavy + @Test(timeout = 60000) + public void testPrintDispatcherEventDetailsAvoidDeadLoop() throws Exception { + for (int i = 0; i < 5; i++) { + testPrintDispatcherEventDetailsAvoidDeadLoopInternal(); + } + } + + public void testPrintDispatcherEventDetailsAvoidDeadLoopInternal() + throws Exception { + YarnConfiguration conf = new YarnConfiguration(); + conf.setInt(YarnConfiguration. + YARN_DISPATCHER_PRINT_EVENTS_INFO_THRESHOLD, 10); + Logger log = mock(Logger.class); + AsyncDispatcher dispatcher = new AsyncDispatcher(); + dispatcher.init(conf); + + Field logger = AsyncDispatcher.class.getDeclaredField("LOG"); + logger.setAccessible(true); + Field modifiers = Field.class.getDeclaredField("modifiers"); + modifiers.setAccessible(true); + modifiers.setInt(logger, logger.getModifiers() & ~Modifier.FINAL); + Object oldLog = logger.get(null); + + try { + logger.set(null, log); + dispatcher.register(TestEnum.class, new TestHandler(0)); + dispatcher.start(); + + for (int i = 0; i < 10000; ++i) { + Event event = mock(Event.class); + when(event.getType()).thenReturn(TestEnum.TestEventType); + dispatcher.getEventHandler().handle(event); + } + Thread.sleep(3000); + } finally { + //... restore logger object + logger.set(null, oldLog); + dispatcher.stop(); + } + } + }