From da39f0dc4090f9b5edb5b0be991f8a79f8fe719b Mon Sep 17 00:00:00 2001 From: zeekling Date: Tue, 28 Nov 2023 15:04:45 +0000 Subject: [PATCH] =?UTF-8?q?Yarn=E7=8A=B6=E6=80=81=E6=9C=BA=20(#20)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit https://git.zeekling.cn/big-data/hadoop_book/issues/21 Reviewed-on: https://git.zeekling.cn/big-data/hadoop_book/pulls/20 --- yarn/yarn_event.md | 304 +++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 304 insertions(+) create mode 100644 yarn/yarn_event.md diff --git a/yarn/yarn_event.md b/yarn/yarn_event.md new file mode 100644 index 0000000..f74124f --- /dev/null +++ b/yarn/yarn_event.md @@ -0,0 +1,304 @@ + +# 简介 + +Yarn采用了基于事件驱动的并发模型: + +- 所有状态机都实现了EventHandler接口,很多服务(类名通常带有Service后缀)也实现了该接口,它们都是事件处理器。 +- 需要异步处理的事件由中央异步调度器(类名通常带有Dispatcher后缀)统一接收/派发,需要同步处理的事件直接交给相应的事件处理器。 + +![pic](https://pan.zeekling.cn/zeekling/hadoop/event/state_event_001.png) + +某些事件处理器不仅处理事件,也会向中央异步调度器发送事件。 + + +# 事件处理器定义 + +事件处理器定义如下: + +```java +@SuppressWarnings("rawtypes") +@Public +@Evolving +public interface EventHandler { + + void handle(T event); + +} +``` + +只有一个handler函数,如参是事件: + + +# 中央处理器AsyncDispatcher + +AsyncDispatcher 实现了接口Dispatcher,Dispatcher中定义了事件Dispatcher的接口。主要提供两个功能: +- 注册不同类型的事件,主要包含事件类型和事件处理器。 +- 获取事件处理器,用来派发事件,等待异步执行真正的EventHandler。 + +```java +@Public +@Evolving +public interface Dispatcher { + + EventHandler getEventHandler(); + + void register(Class eventType, EventHandler handler); + +} +``` + +AsyncDispatcher实现了Dispatcher接口,也扩展了AbstractService,表明AsyncDispatcher也是一个服务, +是一个典型的生产者消费这模型。 + +```java +public class AsyncDispatcher extends AbstractService implements Dispatcher { + ... +} +``` + +# 事件处理器的注册 + +事件注册就是将事件写入到eventDispatchers里面,eventDispatchers的定义:`Map, EventHandler> eventDispatchers`,键是事件类型,value是事件的处理器。 + +对于同一事件类型注册多次handler处理函数时,将使用MultiListenerHandler代替,MultiListenerHandler里面保存了多个handler,调用handler函数时,会依次调用每个handler。 + +```java +public void register(Class eventType, + EventHandler handler) { + /* check to see if we have a listener registered */ + EventHandler registeredHandler = (EventHandler) eventDispatchers.get(eventType); + LOG.info("Registering " + eventType + " for " + handler.getClass()); + if (registeredHandler == null) { + eventDispatchers.put(eventType, handler); + } else if (!(registeredHandler instanceof MultiListenerHandler)){ + /* for multiple listeners of an event add the multiple listener handler */ + MultiListenerHandler multiHandler = new MultiListenerHandler(); + multiHandler.addHandler(registeredHandler); + multiHandler.addHandler(handler); + eventDispatchers.put(eventType, multiHandler); + } else { + /* already a multilistener, just add to it */ + MultiListenerHandler multiHandler + = (MultiListenerHandler) registeredHandler; + multiHandler.addHandler(handler); + } + } +``` + + +# 事件处理 + +AsyncDispatcher#getEventHandler()是异步派发的关键: + +```java +private final EventHandler handlerInstance = new GenericEventHandler(); + +// 省略..... + +@Override +public EventHandler getEventHandler() { + return handlerInstance; +} +``` + +## GenericEventHandler:一个特殊的事件处理器 + +GenericEventHandler是一个特殊的事件处理器,用于接受各种事件。由指定线程处理接收到的事件。 + +```java +public void handle(Event event) { + if (blockNewEvents) { + return; + } + drained = false; + /* all this method does is enqueue all the events onto the queue */ + int qSize = eventQueue.size(); + if (qSize != 0 && qSize % 1000 == 0 + && lastEventQueueSizeLogged != qSize) { + lastEventQueueSizeLogged = qSize; + LOG.info("Size of event-queue is " + qSize); + } + if (qSize != 0 && qSize % detailsInterval == 0 + && lastEventDetailsQueueSizeLogged != qSize) { + lastEventDetailsQueueSizeLogged = qSize; + printEventQueueDetails(); + printTrigger = true; + } + int remCapacity = eventQueue.remainingCapacity(); + if (remCapacity < 1000) { + LOG.warn("Very low remaining capacity in the event-queue: " + + remCapacity); + } + try { + eventQueue.put(event); + } catch (InterruptedException e) { + if (!stopped) { + LOG.warn("AsyncDispatcher thread interrupted", e); + } + // Need to reset drained flag to true if event queue is empty, + // otherwise dispatcher will hang on stop. + drained = eventQueue.isEmpty(); + throw new YarnRuntimeException(e); + } +}; +``` + + + +- blockNewEvents: 是否阻塞事件处理,只有当中央处理器停止之后才会停止接受事件。 + +- eventQueue:将接收到的请求放置到当前阻塞队列里面。方便指定线程及时处理。 + + + +## 事件处理线程 + +在服务启动时(serviceStart函数)创建一个线程,会循环处理接受到的事件。核心处理逻辑在函数dispatch里面。 + +```java +Runnable createThread() { + return new Runnable() { + @Override + public void run() { + while (!stopped && !Thread.currentThread().isInterrupted()) { + drained = eventQueue.isEmpty(); + // 省略。。。 + Event event; + try { + event = eventQueue.take(); + } catch(InterruptedException ie) { + if (!stopped) { + LOG.warn("AsyncDispatcher thread interrupted", ie); + } + return; + } + if (event != null) { + // 省略。。。 + dispatch(event); + // 省略。。。 + } + } + } + }; +} +``` + + + +### dispatch详解 + +- 从已经注册的eventDispatchers列表里面查找当前事件对应的处理器,调用当前处理器的handler函数。 +- 如果当前handler处理出现异常时,默认会退出RM。 + +```java +protected void dispatch(Event event) { + //all events go thru this loop + LOG.debug("Dispatching the event {}.{}", event.getClass().getName(), + event); + + Class type = event.getType().getDeclaringClass(); + + try{ + EventHandler handler = eventDispatchers.get(type); + if(handler != null) { + handler.handle(event); + } else { + throw new Exception("No handler for registered for " + type); + } + } catch (Throwable t) { + //TODO Maybe log the state of the queue + LOG.error(FATAL, "Error in dispatcher thread", t); + // If serviceStop is called, we should exit this thread gracefully. + if (exitOnDispatchException + && (ShutdownHookManager.get().isShutdownInProgress()) == false + && stopped == false) { + stopped = true; + Thread shutDownThread = new Thread(createShutDownThread()); + shutDownThread.setName("AsyncDispatcher ShutDown handler"); + shutDownThread.start(); + } + } +} +``` + + + + + +# 状态机 + +状态转换由成员变量StateMachine管理,所有的StateMachine都由StateMachineFactory进行管理。由addTransition函数实现状态机。 + +```java +private static final StateMachineFactory stateMachineFactory + = new StateMachineFactory(RMAppState.NEW) + + + // Transitions from NEW state + .addTransition(RMAppState.NEW, RMAppState.NEW, + RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition()) + .addTransition(RMAppState.NEW, RMAppState.NEW_SAVING, + RMAppEventType.START, new RMAppNewlySavingTransition()) + .addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED, + RMAppState.ACCEPTED, RMAppState.FINISHED, RMAppState.FAILED, + RMAppState.KILLED, RMAppState.FINAL_SAVING), + RMAppEventType.RECOVER, new RMAppRecoveredTransition()) + .addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL, + new AppKilledTransition()) + .addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING, + RMAppEventType.APP_REJECTED, + new FinalSavingTransition(new AppRejectedTransition(), + RMAppState.FAILED)) + + .addTransition( + RMAppState.KILLED, + RMAppState.KILLED, + EnumSet.of(RMAppEventType.APP_ACCEPTED, + RMAppEventType.APP_REJECTED, RMAppEventType.KILL, + RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED, + RMAppEventType.NODE_UPDATE, RMAppEventType.START)) + + .installTopology(); +``` + + + +Transition定义了“从一个状态转换到另一个状态”的行为,由转换操作、开始状态、事件类型、事件组成: + +```java +public interface StateMachine + , + EVENTTYPE extends Enum, EVENT> { + public STATE getCurrentState(); + public STATE getPreviousState(); + public STATE doTransition(EVENTTYPE eventType, EVENT event) + throws InvalidStateTransitionException; +} +``` + + + + + +## ResourceManager中状态机 + +- RMApp:用于维护一个Application的生命周期,实现类 - RMAppImpl +- RMAppAttempt:用于维护一次试探运行的生命周期,实现类 - RMAppAttemptImpl +- RMContainer:用于维护一个已分配的资源最小单位Container的生命周期,实现类 - RMContainerImpl +- RMNode:用于维护一个NodeManager的生命周期,实现类 - RMNodeImpl + +NodeManager中状态机: + +- Application:用于维护节点上一个Application的生命周期,实现类 - ApplicationImpl +- Container:用于维护节点上一个容器的生命周期,实现类 - ContainerImpl +- LocalizedResource:用于维护节点上资源本地化的生命周期,没有使用接口即实现类 - LocalizedResource + + + +