2023-11-26 09:13:10 +00:00
|
|
|
|
|
|
|
|
|
# 简介
|
|
|
|
|
|
|
|
|
|
Yarn采用了基于事件驱动的并发模型:
|
|
|
|
|
|
|
|
|
|
- 所有状态机都实现了EventHandler接口,很多服务(类名通常带有Service后缀)也实现了该接口,它们都是事件处理器
|
|
|
|
|
- 需要异步处理的事件由中央异步调度器(类名通常带有Dispatcher后缀)统一接收/派发,需要同步处理的事件直接交给相应的事件处理器。
|
|
|
|
|
|
|
|
|
|
![pic](https://pan.zeekling.cn/zeekling/hadoop/event/state_event_001.png)
|
|
|
|
|
|
|
|
|
|
某些事件处理器不仅处理事件,也会向中央异步调度器发送事件。
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 事件处理器定义
|
|
|
|
|
|
|
|
|
|
事件处理器定义如下:
|
|
|
|
|
|
|
|
|
|
```java
|
|
|
|
|
@SuppressWarnings("rawtypes")
|
|
|
|
|
@Public
|
|
|
|
|
@Evolving
|
|
|
|
|
public interface EventHandler<T extends Event> {
|
|
|
|
|
|
|
|
|
|
void handle(T event);
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
只有一个handler函数,如参是事件:
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 中央处理器AsyncDispatcher
|
|
|
|
|
|
|
|
|
|
AsyncDispatcher 实现了接口Dispatcher,Dispatcher中定义了事件Dispatcher的接口。主要提供两个功能:
|
|
|
|
|
- 注册不同类型的事件。
|
2023-11-26 12:19:25 +00:00
|
|
|
|
- 获取事件处理器,用来派发事件,等待异步执行真正的EventHandler。
|
2023-11-26 09:13:10 +00:00
|
|
|
|
|
|
|
|
|
```java
|
|
|
|
|
@Public
|
|
|
|
|
@Evolving
|
|
|
|
|
public interface Dispatcher {
|
|
|
|
|
|
|
|
|
|
EventHandler<Event> getEventHandler();
|
|
|
|
|
|
|
|
|
|
void register(Class<? extends Enum> eventType, EventHandler handler);
|
|
|
|
|
|
|
|
|
|
}
|
|
|
|
|
```
|
|
|
|
|
|
2023-11-26 12:19:25 +00:00
|
|
|
|
AsyncDispatcher实现了Dispatcher接口,也扩展了AbstractService,表明AsyncDispatcher也是一个服务,
|
|
|
|
|
是一个典型的生产者消费这模型。
|
2023-11-26 09:13:10 +00:00
|
|
|
|
|
|
|
|
|
```java
|
|
|
|
|
public class AsyncDispatcher extends AbstractService implements Dispatcher {
|
|
|
|
|
...
|
|
|
|
|
}
|
|
|
|
|
```
|
|
|
|
|
|
2023-11-26 12:19:25 +00:00
|
|
|
|
# 事件处理器的注册
|
|
|
|
|
|
|
|
|
|
事件注册就是将事件写入到eventDispatchers里面,eventDispatchers的定义:`Map<Class<? extends Enum>, EventHandler> eventDispatchers`,
|
|
|
|
|
键是事件类型,value是事件的处理器。
|
|
|
|
|
|
|
|
|
|
对于同一事件类型注册多次handler处理函数时,将使用MultiListenerHandler代替,MultiListenerHandler里面保存了多个handler,调用handler函数时,
|
|
|
|
|
会依次调用每个handler。
|
|
|
|
|
|
|
|
|
|
```java
|
|
|
|
|
public void register(Class<? extends Enum> eventType,
|
|
|
|
|
EventHandler handler) {
|
|
|
|
|
/* check to see if we have a listener registered */
|
|
|
|
|
EventHandler<Event> registeredHandler = (EventHandler<Event>) eventDispatchers.get(eventType);
|
|
|
|
|
LOG.info("Registering " + eventType + " for " + handler.getClass());
|
|
|
|
|
if (registeredHandler == null) {
|
|
|
|
|
eventDispatchers.put(eventType, handler);
|
|
|
|
|
} else if (!(registeredHandler instanceof MultiListenerHandler)){
|
|
|
|
|
/* for multiple listeners of an event add the multiple listener handler */
|
|
|
|
|
MultiListenerHandler multiHandler = new MultiListenerHandler();
|
|
|
|
|
multiHandler.addHandler(registeredHandler);
|
|
|
|
|
multiHandler.addHandler(handler);
|
|
|
|
|
eventDispatchers.put(eventType, multiHandler);
|
|
|
|
|
} else {
|
|
|
|
|
/* already a multilistener, just add to it */
|
|
|
|
|
MultiListenerHandler multiHandler
|
|
|
|
|
= (MultiListenerHandler) registeredHandler;
|
|
|
|
|
multiHandler.addHandler(handler);
|
|
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
```
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# 事件处理
|
|
|
|
|
|
|
|
|
|
AsyncDispatcher#getEventHandler()是异步派发的关键:
|
|
|
|
|
|
|
|
|
|
```java
|
|
|
|
|
|
|
|
|
|
```
|
|
|
|
|
|
2023-11-26 09:13:10 +00:00
|
|
|
|
|