From a4be6d16d8eccc07acfe137a23c5790d14149964 Mon Sep 17 00:00:00 2001 From: zeekling Date: Sun, 26 Nov 2023 17:13:10 +0800 Subject: [PATCH 1/5] =?UTF-8?q?Yarn=E7=8A=B6=E6=80=81=E6=9C=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- yarn/yarn_event.md | 58 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 58 insertions(+) create mode 100644 yarn/yarn_event.md diff --git a/yarn/yarn_event.md b/yarn/yarn_event.md new file mode 100644 index 0000000..9c8bd2a --- /dev/null +++ b/yarn/yarn_event.md @@ -0,0 +1,58 @@ + +# 简介 + +Yarn采用了基于事件驱动的并发模型: + +- 所有状态机都实现了EventHandler接口,很多服务(类名通常带有Service后缀)也实现了该接口,它们都是事件处理器 +- 需要异步处理的事件由中央异步调度器(类名通常带有Dispatcher后缀)统一接收/派发,需要同步处理的事件直接交给相应的事件处理器。 + +![pic](https://pan.zeekling.cn/zeekling/hadoop/event/state_event_001.png) + +某些事件处理器不仅处理事件,也会向中央异步调度器发送事件。 + + +# 事件处理器定义 + +事件处理器定义如下: + +```java +@SuppressWarnings("rawtypes") +@Public +@Evolving +public interface EventHandler { + + void handle(T event); + +} +``` + +只有一个handler函数,如参是事件: + + +# 中央处理器AsyncDispatcher + +AsyncDispatcher 实现了接口Dispatcher,Dispatcher中定义了事件Dispatcher的接口。主要提供两个功能: +- 注册不同类型的事件。 +- 获取所有的事件处理器。 + +```java +@Public +@Evolving +public interface Dispatcher { + + EventHandler getEventHandler(); + + void register(Class eventType, EventHandler handler); + +} +``` + +AsyncDispatcher实现了Dispatcher接口,也扩展了AbstractService,表明AsyncDispatcher也是一个服务: + +```java +public class AsyncDispatcher extends AbstractService implements Dispatcher { + ... +} +``` + + -- 2.45.2 From b0dc8f8c031dbe007ec46198c24c1402098b9e65 Mon Sep 17 00:00:00 2001 From: zeekling Date: Sun, 26 Nov 2023 20:19:25 +0800 Subject: [PATCH 2/5] =?UTF-8?q?Yarn=E7=8A=B6=E6=80=81=E6=9C=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- yarn/yarn_event.md | 45 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 43 insertions(+), 2 deletions(-) diff --git a/yarn/yarn_event.md b/yarn/yarn_event.md index 9c8bd2a..bff1a34 100644 --- a/yarn/yarn_event.md +++ b/yarn/yarn_event.md @@ -33,7 +33,7 @@ public interface EventHandler { AsyncDispatcher 实现了接口Dispatcher,Dispatcher中定义了事件Dispatcher的接口。主要提供两个功能: - 注册不同类型的事件。 -- 获取所有的事件处理器。 +- 获取事件处理器,用来派发事件,等待异步执行真正的EventHandler。 ```java @Public @@ -47,7 +47,8 @@ public interface Dispatcher { } ``` -AsyncDispatcher实现了Dispatcher接口,也扩展了AbstractService,表明AsyncDispatcher也是一个服务: +AsyncDispatcher实现了Dispatcher接口,也扩展了AbstractService,表明AsyncDispatcher也是一个服务, +是一个典型的生产者消费这模型。 ```java public class AsyncDispatcher extends AbstractService implements Dispatcher { @@ -55,4 +56,44 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { } ``` +# 事件处理器的注册 + +事件注册就是将事件写入到eventDispatchers里面,eventDispatchers的定义:`Map, EventHandler> eventDispatchers`, +键是事件类型,value是事件的处理器。 + +对于同一事件类型注册多次handler处理函数时,将使用MultiListenerHandler代替,MultiListenerHandler里面保存了多个handler,调用handler函数时, +会依次调用每个handler。 + +```java +public void register(Class eventType, + EventHandler handler) { + /* check to see if we have a listener registered */ + EventHandler registeredHandler = (EventHandler) eventDispatchers.get(eventType); + LOG.info("Registering " + eventType + " for " + handler.getClass()); + if (registeredHandler == null) { + eventDispatchers.put(eventType, handler); + } else if (!(registeredHandler instanceof MultiListenerHandler)){ + /* for multiple listeners of an event add the multiple listener handler */ + MultiListenerHandler multiHandler = new MultiListenerHandler(); + multiHandler.addHandler(registeredHandler); + multiHandler.addHandler(handler); + eventDispatchers.put(eventType, multiHandler); + } else { + /* already a multilistener, just add to it */ + MultiListenerHandler multiHandler + = (MultiListenerHandler) registeredHandler; + multiHandler.addHandler(handler); + } + } +``` + + +# 事件处理 + +AsyncDispatcher#getEventHandler()是异步派发的关键: + +```java + +``` + -- 2.45.2 From ea8d70a5ef8d001190ed65f9082fa959b3fb4608 Mon Sep 17 00:00:00 2001 From: zeekling Date: Sun, 26 Nov 2023 20:25:45 +0800 Subject: [PATCH 3/5] =?UTF-8?q?Yarn=E7=8A=B6=E6=80=81=E6=9C=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- yarn/yarn_event.md | 2 ++ 1 file changed, 2 insertions(+) diff --git a/yarn/yarn_event.md b/yarn/yarn_event.md index bff1a34..4b7d424 100644 --- a/yarn/yarn_event.md +++ b/yarn/yarn_event.md @@ -97,3 +97,5 @@ AsyncDispatcher#getEventHandler()是异步派发的关键: ``` +https://monkeysayhi.github.io/2018/11/20/%E6%BA%90%E7%A0%81%7CYarn%E7%9A%84%E4%BA%8B%E4%BB%B6%E9%A9%B1%E5%8A%A8%E6%A8%A1%E5%9E%8B%E4%B8%8E%E7%8A%B6%E6%80%81%E6%9C%BA/ + -- 2.45.2 From 23b9b93b4cacdce8e86c23d0d5de2a9934808c1a Mon Sep 17 00:00:00 2001 From: zeekling Date: Sun, 26 Nov 2023 23:17:59 +0800 Subject: [PATCH 4/5] =?UTF-8?q?Yarn=E7=8A=B6=E6=80=81=E6=9C=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- yarn/yarn_event.md | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/yarn/yarn_event.md b/yarn/yarn_event.md index 4b7d424..c9b974b 100644 --- a/yarn/yarn_event.md +++ b/yarn/yarn_event.md @@ -97,5 +97,20 @@ AsyncDispatcher#getEventHandler()是异步派发的关键: ``` + +ResourceManager中状态机: + +- RMApp:用于维护一个Application的生命周期,实现类 - RMAppImpl +- RMAppAttempt:用于维护一次试探运行的生命周期,实现类 - RMAppAttemptImpl +- RMContainer:用于维护一个已分配的资源最小单位Container的生命周期,实现类 - RMContainerImpl +- RMNode:用于维护一个NodeManager的生命周期,实现类 - RMNodeImpl + +NodeManager中状态机: + +- Application:用于维护节点上一个Application的生命周期,实现类 - ApplicationImpl +- Container:用于维护节点上一个容器的生命周期,实现类 - ContainerImpl +- LocalizedResource:用于维护节点上资源本地化的生命周期,没有使用接口即实现类 - LocalizedResource + + https://monkeysayhi.github.io/2018/11/20/%E6%BA%90%E7%A0%81%7CYarn%E7%9A%84%E4%BA%8B%E4%BB%B6%E9%A9%B1%E5%8A%A8%E6%A8%A1%E5%9E%8B%E4%B8%8E%E7%8A%B6%E6%80%81%E6%9C%BA/ -- 2.45.2 From e1ba807faf6825e15c44f053e7ffe5cfc1bc16c4 Mon Sep 17 00:00:00 2001 From: zeekling Date: Tue, 28 Nov 2023 00:43:16 +0800 Subject: [PATCH 5/5] =?UTF-8?q?Yarn=E7=8A=B6=E6=80=81=E6=9C=BA?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- yarn/yarn_event.md | 204 +++++++++++++++++++++++++++++++++++++++++++-- 1 file changed, 196 insertions(+), 8 deletions(-) diff --git a/yarn/yarn_event.md b/yarn/yarn_event.md index c9b974b..f74124f 100644 --- a/yarn/yarn_event.md +++ b/yarn/yarn_event.md @@ -3,7 +3,7 @@ Yarn采用了基于事件驱动的并发模型: -- 所有状态机都实现了EventHandler接口,很多服务(类名通常带有Service后缀)也实现了该接口,它们都是事件处理器 +- 所有状态机都实现了EventHandler接口,很多服务(类名通常带有Service后缀)也实现了该接口,它们都是事件处理器。 - 需要异步处理的事件由中央异步调度器(类名通常带有Dispatcher后缀)统一接收/派发,需要同步处理的事件直接交给相应的事件处理器。 ![pic](https://pan.zeekling.cn/zeekling/hadoop/event/state_event_001.png) @@ -32,7 +32,7 @@ public interface EventHandler { # 中央处理器AsyncDispatcher AsyncDispatcher 实现了接口Dispatcher,Dispatcher中定义了事件Dispatcher的接口。主要提供两个功能: -- 注册不同类型的事件。 +- 注册不同类型的事件,主要包含事件类型和事件处理器。 - 获取事件处理器,用来派发事件,等待异步执行真正的EventHandler。 ```java @@ -58,11 +58,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { # 事件处理器的注册 -事件注册就是将事件写入到eventDispatchers里面,eventDispatchers的定义:`Map, EventHandler> eventDispatchers`, -键是事件类型,value是事件的处理器。 +事件注册就是将事件写入到eventDispatchers里面,eventDispatchers的定义:`Map, EventHandler> eventDispatchers`,键是事件类型,value是事件的处理器。 -对于同一事件类型注册多次handler处理函数时,将使用MultiListenerHandler代替,MultiListenerHandler里面保存了多个handler,调用handler函数时, -会依次调用每个handler。 +对于同一事件类型注册多次handler处理函数时,将使用MultiListenerHandler代替,MultiListenerHandler里面保存了多个handler,调用handler函数时,会依次调用每个handler。 ```java public void register(Class eventType, @@ -93,12 +91,202 @@ public void register(Class eventType, AsyncDispatcher#getEventHandler()是异步派发的关键: ```java +private final EventHandler handlerInstance = new GenericEventHandler(); +// 省略..... + +@Override +public EventHandler getEventHandler() { + return handlerInstance; +} +``` + +## GenericEventHandler:一个特殊的事件处理器 + +GenericEventHandler是一个特殊的事件处理器,用于接受各种事件。由指定线程处理接收到的事件。 + +```java +public void handle(Event event) { + if (blockNewEvents) { + return; + } + drained = false; + /* all this method does is enqueue all the events onto the queue */ + int qSize = eventQueue.size(); + if (qSize != 0 && qSize % 1000 == 0 + && lastEventQueueSizeLogged != qSize) { + lastEventQueueSizeLogged = qSize; + LOG.info("Size of event-queue is " + qSize); + } + if (qSize != 0 && qSize % detailsInterval == 0 + && lastEventDetailsQueueSizeLogged != qSize) { + lastEventDetailsQueueSizeLogged = qSize; + printEventQueueDetails(); + printTrigger = true; + } + int remCapacity = eventQueue.remainingCapacity(); + if (remCapacity < 1000) { + LOG.warn("Very low remaining capacity in the event-queue: " + + remCapacity); + } + try { + eventQueue.put(event); + } catch (InterruptedException e) { + if (!stopped) { + LOG.warn("AsyncDispatcher thread interrupted", e); + } + // Need to reset drained flag to true if event queue is empty, + // otherwise dispatcher will hang on stop. + drained = eventQueue.isEmpty(); + throw new YarnRuntimeException(e); + } +}; ``` -ResourceManager中状态机: +- blockNewEvents: 是否阻塞事件处理,只有当中央处理器停止之后才会停止接受事件。 + +- eventQueue:将接收到的请求放置到当前阻塞队列里面。方便指定线程及时处理。 + + + +## 事件处理线程 + +在服务启动时(serviceStart函数)创建一个线程,会循环处理接受到的事件。核心处理逻辑在函数dispatch里面。 + +```java +Runnable createThread() { + return new Runnable() { + @Override + public void run() { + while (!stopped && !Thread.currentThread().isInterrupted()) { + drained = eventQueue.isEmpty(); + // 省略。。。 + Event event; + try { + event = eventQueue.take(); + } catch(InterruptedException ie) { + if (!stopped) { + LOG.warn("AsyncDispatcher thread interrupted", ie); + } + return; + } + if (event != null) { + // 省略。。。 + dispatch(event); + // 省略。。。 + } + } + } + }; +} +``` + + + +### dispatch详解 + +- 从已经注册的eventDispatchers列表里面查找当前事件对应的处理器,调用当前处理器的handler函数。 +- 如果当前handler处理出现异常时,默认会退出RM。 + +```java +protected void dispatch(Event event) { + //all events go thru this loop + LOG.debug("Dispatching the event {}.{}", event.getClass().getName(), + event); + + Class type = event.getType().getDeclaringClass(); + + try{ + EventHandler handler = eventDispatchers.get(type); + if(handler != null) { + handler.handle(event); + } else { + throw new Exception("No handler for registered for " + type); + } + } catch (Throwable t) { + //TODO Maybe log the state of the queue + LOG.error(FATAL, "Error in dispatcher thread", t); + // If serviceStop is called, we should exit this thread gracefully. + if (exitOnDispatchException + && (ShutdownHookManager.get().isShutdownInProgress()) == false + && stopped == false) { + stopped = true; + Thread shutDownThread = new Thread(createShutDownThread()); + shutDownThread.setName("AsyncDispatcher ShutDown handler"); + shutDownThread.start(); + } + } +} +``` + + + + + +# 状态机 + +状态转换由成员变量StateMachine管理,所有的StateMachine都由StateMachineFactory进行管理。由addTransition函数实现状态机。 + +```java +private static final StateMachineFactory stateMachineFactory + = new StateMachineFactory(RMAppState.NEW) + + + // Transitions from NEW state + .addTransition(RMAppState.NEW, RMAppState.NEW, + RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) + .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING, + RMAppEventType.START, new RMAppNewlySavingTransition()) + .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED, + RMAppState.ACCEPTED, RMAppState.FINISHED, RMAppState.FAILED, + RMAppState.KILLED, RMAppState.FINAL_SAVING), + RMAppEventType.RECOVER, new RMAppRecoveredTransition()) + .addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL, + new AppKilledTransition()) + .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING, + RMAppEventType.APP_REJECTED, + new FinalSavingTransition(new AppRejectedTransition(), + RMAppState.FAILED)) + + .addTransition( + RMAppState.KILLED, + RMAppState.KILLED, + EnumSet.of(RMAppEventType.APP_ACCEPTED, + RMAppEventType.APP_REJECTED, RMAppEventType.KILL, + RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED, + RMAppEventType.NODE_UPDATE, RMAppEventType.START)) + + .installTopology(); +``` + + + +Transition定义了“从一个状态转换到另一个状态”的行为,由转换操作、开始状态、事件类型、事件组成: + +```java +public interface StateMachine + , + EVENTTYPE extends Enum, EVENT> { + public STATE getCurrentState(); + public STATE getPreviousState(); + public STATE doTransition(EVENTTYPE eventType, EVENT event) + throws InvalidStateTransitionException; +} +``` + + + + + +## ResourceManager中状态机 - RMApp:用于维护一个Application的生命周期,实现类 - RMAppImpl - RMAppAttempt:用于维护一次试探运行的生命周期,实现类 - RMAppAttemptImpl @@ -112,5 +300,5 @@ NodeManager中状态机: - LocalizedResource:用于维护节点上资源本地化的生命周期,没有使用接口即实现类 - LocalizedResource -https://monkeysayhi.github.io/2018/11/20/%E6%BA%90%E7%A0%81%7CYarn%E7%9A%84%E4%BA%8B%E4%BB%B6%E9%A9%B1%E5%8A%A8%E6%A8%A1%E5%9E%8B%E4%B8%8E%E7%8A%B6%E6%80%81%E6%9C%BA/ + -- 2.45.2