diff --git a/yarn/yarn_event.md b/yarn/yarn_event.md index c9b974b..f74124f 100644 --- a/yarn/yarn_event.md +++ b/yarn/yarn_event.md @@ -3,7 +3,7 @@ Yarn采用了基于事件驱动的并发模型: -- 所有状态机都实现了EventHandler接口,很多服务(类名通常带有Service后缀)也实现了该接口,它们都是事件处理器 +- 所有状态机都实现了EventHandler接口,很多服务(类名通常带有Service后缀)也实现了该接口,它们都是事件处理器。 - 需要异步处理的事件由中央异步调度器(类名通常带有Dispatcher后缀)统一接收/派发,需要同步处理的事件直接交给相应的事件处理器。 ![pic](https://pan.zeekling.cn/zeekling/hadoop/event/state_event_001.png) @@ -32,7 +32,7 @@ public interface EventHandler { # 中央处理器AsyncDispatcher AsyncDispatcher 实现了接口Dispatcher,Dispatcher中定义了事件Dispatcher的接口。主要提供两个功能: -- 注册不同类型的事件。 +- 注册不同类型的事件,主要包含事件类型和事件处理器。 - 获取事件处理器,用来派发事件,等待异步执行真正的EventHandler。 ```java @@ -58,11 +58,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher { # 事件处理器的注册 -事件注册就是将事件写入到eventDispatchers里面,eventDispatchers的定义:`Map, EventHandler> eventDispatchers`, -键是事件类型,value是事件的处理器。 +事件注册就是将事件写入到eventDispatchers里面,eventDispatchers的定义:`Map, EventHandler> eventDispatchers`,键是事件类型,value是事件的处理器。 -对于同一事件类型注册多次handler处理函数时,将使用MultiListenerHandler代替,MultiListenerHandler里面保存了多个handler,调用handler函数时, -会依次调用每个handler。 +对于同一事件类型注册多次handler处理函数时,将使用MultiListenerHandler代替,MultiListenerHandler里面保存了多个handler,调用handler函数时,会依次调用每个handler。 ```java public void register(Class eventType, @@ -93,12 +91,202 @@ public void register(Class eventType, AsyncDispatcher#getEventHandler()是异步派发的关键: ```java +private final EventHandler handlerInstance = new GenericEventHandler(); +// 省略..... + +@Override +public EventHandler getEventHandler() { + return handlerInstance; +} +``` + +## GenericEventHandler:一个特殊的事件处理器 + +GenericEventHandler是一个特殊的事件处理器,用于接受各种事件。由指定线程处理接收到的事件。 + +```java +public void handle(Event event) { + if (blockNewEvents) { + return; + } + drained = false; + /* all this method does is enqueue all the events onto the queue */ + int qSize = eventQueue.size(); + if (qSize != 0 && qSize % 1000 == 0 + && lastEventQueueSizeLogged != qSize) { + lastEventQueueSizeLogged = qSize; + LOG.info("Size of event-queue is " + qSize); + } + if (qSize != 0 && qSize % detailsInterval == 0 + && lastEventDetailsQueueSizeLogged != qSize) { + lastEventDetailsQueueSizeLogged = qSize; + printEventQueueDetails(); + printTrigger = true; + } + int remCapacity = eventQueue.remainingCapacity(); + if (remCapacity < 1000) { + LOG.warn("Very low remaining capacity in the event-queue: " + + remCapacity); + } + try { + eventQueue.put(event); + } catch (InterruptedException e) { + if (!stopped) { + LOG.warn("AsyncDispatcher thread interrupted", e); + } + // Need to reset drained flag to true if event queue is empty, + // otherwise dispatcher will hang on stop. + drained = eventQueue.isEmpty(); + throw new YarnRuntimeException(e); + } +}; ``` -ResourceManager中状态机: +- blockNewEvents: 是否阻塞事件处理,只有当中央处理器停止之后才会停止接受事件。 + +- eventQueue:将接收到的请求放置到当前阻塞队列里面。方便指定线程及时处理。 + + + +## 事件处理线程 + +在服务启动时(serviceStart函数)创建一个线程,会循环处理接受到的事件。核心处理逻辑在函数dispatch里面。 + +```java +Runnable createThread() { + return new Runnable() { + @Override + public void run() { + while (!stopped && !Thread.currentThread().isInterrupted()) { + drained = eventQueue.isEmpty(); + // 省略。。。 + Event event; + try { + event = eventQueue.take(); + } catch(InterruptedException ie) { + if (!stopped) { + LOG.warn("AsyncDispatcher thread interrupted", ie); + } + return; + } + if (event != null) { + // 省略。。。 + dispatch(event); + // 省略。。。 + } + } + } + }; +} +``` + + + +### dispatch详解 + +- 从已经注册的eventDispatchers列表里面查找当前事件对应的处理器,调用当前处理器的handler函数。 +- 如果当前handler处理出现异常时,默认会退出RM。 + +```java +protected void dispatch(Event event) { + //all events go thru this loop + LOG.debug("Dispatching the event {}.{}", event.getClass().getName(), + event); + + Class type = event.getType().getDeclaringClass(); + + try{ + EventHandler handler = eventDispatchers.get(type); + if(handler != null) { + handler.handle(event); + } else { + throw new Exception("No handler for registered for " + type); + } + } catch (Throwable t) { + //TODO Maybe log the state of the queue + LOG.error(FATAL, "Error in dispatcher thread", t); + // If serviceStop is called, we should exit this thread gracefully. + if (exitOnDispatchException + && (ShutdownHookManager.get().isShutdownInProgress()) == false + && stopped == false) { + stopped = true; + Thread shutDownThread = new Thread(createShutDownThread()); + shutDownThread.setName("AsyncDispatcher ShutDown handler"); + shutDownThread.start(); + } + } +} +``` + + + + + +# 状态机 + +状态转换由成员变量StateMachine管理,所有的StateMachine都由StateMachineFactory进行管理。由addTransition函数实现状态机。 + +```java +private static final StateMachineFactory stateMachineFactory + = new StateMachineFactory(RMAppState.NEW) + + + // Transitions from NEW state + .addTransition(RMAppState.NEW, RMAppState.NEW, + RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) + .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING, + RMAppEventType.START, new RMAppNewlySavingTransition()) + .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED, + RMAppState.ACCEPTED, RMAppState.FINISHED, RMAppState.FAILED, + RMAppState.KILLED, RMAppState.FINAL_SAVING), + RMAppEventType.RECOVER, new RMAppRecoveredTransition()) + .addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL, + new AppKilledTransition()) + .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING, + RMAppEventType.APP_REJECTED, + new FinalSavingTransition(new AppRejectedTransition(), + RMAppState.FAILED)) + + .addTransition( + RMAppState.KILLED, + RMAppState.KILLED, + EnumSet.of(RMAppEventType.APP_ACCEPTED, + RMAppEventType.APP_REJECTED, RMAppEventType.KILL, + RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED, + RMAppEventType.NODE_UPDATE, RMAppEventType.START)) + + .installTopology(); +``` + + + +Transition定义了“从一个状态转换到另一个状态”的行为,由转换操作、开始状态、事件类型、事件组成: + +```java +public interface StateMachine + , + EVENTTYPE extends Enum, EVENT> { + public STATE getCurrentState(); + public STATE getPreviousState(); + public STATE doTransition(EVENTTYPE eventType, EVENT event) + throws InvalidStateTransitionException; +} +``` + + + + + +## ResourceManager中状态机 - RMApp:用于维护一个Application的生命周期,实现类 - RMAppImpl - RMAppAttempt:用于维护一次试探运行的生命周期,实现类 - RMAppAttemptImpl @@ -112,5 +300,5 @@ NodeManager中状态机: - LocalizedResource:用于维护节点上资源本地化的生命周期,没有使用接口即实现类 - LocalizedResource -https://monkeysayhi.github.io/2018/11/20/%E6%BA%90%E7%A0%81%7CYarn%E7%9A%84%E4%BA%8B%E4%BB%B6%E9%A9%B1%E5%8A%A8%E6%A8%A1%E5%9E%8B%E4%B8%8E%E7%8A%B6%E6%80%81%E6%9C%BA/ +