hadoop_book/yarn/yarn_event.md
2023-11-28 15:04:45 +00:00

305 lines
9.6 KiB
Markdown
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# 简介
Yarn采用了基于事件驱动的并发模型
- 所有状态机都实现了EventHandler接口很多服务类名通常带有Service后缀也实现了该接口它们都是事件处理器。
- 需要异步处理的事件由中央异步调度器类名通常带有Dispatcher后缀统一接收/派发,需要同步处理的事件直接交给相应的事件处理器。
![pic](https://pan.zeekling.cn/zeekling/hadoop/event/state_event_001.png)
某些事件处理器不仅处理事件,也会向中央异步调度器发送事件。
# 事件处理器定义
事件处理器定义如下:
```java
@SuppressWarnings("rawtypes")
@Public
@Evolving
public interface EventHandler<T extends Event> {
void handle(T event);
}
```
只有一个handler函数如参是事件
# 中央处理器AsyncDispatcher
AsyncDispatcher 实现了接口DispatcherDispatcher中定义了事件Dispatcher的接口。主要提供两个功能
- 注册不同类型的事件,主要包含事件类型和事件处理器。
- 获取事件处理器用来派发事件等待异步执行真正的EventHandler。
```java
@Public
@Evolving
public interface Dispatcher {
EventHandler<Event> getEventHandler();
void register(Class<? extends Enum> eventType, EventHandler handler);
}
```
AsyncDispatcher实现了Dispatcher接口也扩展了AbstractService表明AsyncDispatcher也是一个服务
是一个典型的生产者消费这模型。
```java
public class AsyncDispatcher extends AbstractService implements Dispatcher {
...
}
```
# 事件处理器的注册
事件注册就是将事件写入到eventDispatchers里面eventDispatchers的定义`Map<Class<? extends Enum>, EventHandler> eventDispatchers`键是事件类型value是事件的处理器。
对于同一事件类型注册多次handler处理函数时将使用MultiListenerHandler代替MultiListenerHandler里面保存了多个handler调用handler函数时会依次调用每个handler。
```java
public void register(Class<? extends Enum> eventType,
EventHandler handler) {
/* check to see if we have a listener registered */
EventHandler<Event> registeredHandler = (EventHandler<Event>) eventDispatchers.get(eventType);
LOG.info("Registering " + eventType + " for " + handler.getClass());
if (registeredHandler == null) {
eventDispatchers.put(eventType, handler);
} else if (!(registeredHandler instanceof MultiListenerHandler)){
/* for multiple listeners of an event add the multiple listener handler */
MultiListenerHandler multiHandler = new MultiListenerHandler();
multiHandler.addHandler(registeredHandler);
multiHandler.addHandler(handler);
eventDispatchers.put(eventType, multiHandler);
} else {
/* already a multilistener, just add to it */
MultiListenerHandler multiHandler
= (MultiListenerHandler) registeredHandler;
multiHandler.addHandler(handler);
}
}
```
# 事件处理
AsyncDispatcher#getEventHandler()是异步派发的关键:
```java
private final EventHandler<Event> handlerInstance = new GenericEventHandler();
// 省略.....
@Override
public EventHandler<Event> getEventHandler() {
return handlerInstance;
}
```
## GenericEventHandler一个特殊的事件处理器
GenericEventHandler是一个特殊的事件处理器用于接受各种事件。由指定线程处理接收到的事件。
```java
public void handle(Event event) {
if (blockNewEvents) {
return;
}
drained = false;
/* all this method does is enqueue all the events onto the queue */
int qSize = eventQueue.size();
if (qSize != 0 && qSize % 1000 == 0
&& lastEventQueueSizeLogged != qSize) {
lastEventQueueSizeLogged = qSize;
LOG.info("Size of event-queue is " + qSize);
}
if (qSize != 0 && qSize % detailsInterval == 0
&& lastEventDetailsQueueSizeLogged != qSize) {
lastEventDetailsQueueSizeLogged = qSize;
printEventQueueDetails();
printTrigger = true;
}
int remCapacity = eventQueue.remainingCapacity();
if (remCapacity < 1000) {
LOG.warn("Very low remaining capacity in the event-queue: "
+ remCapacity);
}
try {
eventQueue.put(event);
} catch (InterruptedException e) {
if (!stopped) {
LOG.warn("AsyncDispatcher thread interrupted", e);
}
// Need to reset drained flag to true if event queue is empty,
// otherwise dispatcher will hang on stop.
drained = eventQueue.isEmpty();
throw new YarnRuntimeException(e);
}
};
```
- blockNewEvents: 是否阻塞事件处理,只有当中央处理器停止之后才会停止接受事件。
- eventQueue将接收到的请求放置到当前阻塞队列里面。方便指定线程及时处理。
## 事件处理线程
在服务启动时serviceStart函数创建一个线程会循环处理接受到的事件。核心处理逻辑在函数dispatch里面。
```java
Runnable createThread() {
return new Runnable() {
@Override
public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
drained = eventQueue.isEmpty();
// 省略。。。
Event event;
try {
event = eventQueue.take();
} catch(InterruptedException ie) {
if (!stopped) {
LOG.warn("AsyncDispatcher thread interrupted", ie);
}
return;
}
if (event != null) {
// 省略。。。
dispatch(event);
// 省略。。。
}
}
}
};
}
```
### dispatch详解
- 从已经注册的eventDispatchers列表里面查找当前事件对应的处理器调用当前处理器的handler函数。
- 如果当前handler处理出现异常时默认会退出RM。
```java
protected void dispatch(Event event) {
//all events go thru this loop
LOG.debug("Dispatching the event {}.{}", event.getClass().getName(),
event);
Class<? extends Enum> type = event.getType().getDeclaringClass();
try{
EventHandler handler = eventDispatchers.get(type);
if(handler != null) {
handler.handle(event);
} else {
throw new Exception("No handler for registered for " + type);
}
} catch (Throwable t) {
//TODO Maybe log the state of the queue
LOG.error(FATAL, "Error in dispatcher thread", t);
// If serviceStop is called, we should exit this thread gracefully.
if (exitOnDispatchException
&& (ShutdownHookManager.get().isShutdownInProgress()) == false
&& stopped == false) {
stopped = true;
Thread shutDownThread = new Thread(createShutDownThread());
shutDownThread.setName("AsyncDispatcher ShutDown handler");
shutDownThread.start();
}
}
}
```
# 状态机
状态转换由成员变量StateMachine管理所有的StateMachine都由StateMachineFactory进行管理。由addTransition函数实现状态机。
```java
private static final StateMachineFactory<RMAppImpl,
RMAppState,
RMAppEventType,
RMAppEvent> stateMachineFactory
= new StateMachineFactory<RMAppImpl,
RMAppState,
RMAppEventType,
RMAppEvent>(RMAppState.NEW)
// Transitions from NEW state
.addTransition(RMAppState.NEW, RMAppState.NEW,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
RMAppEventType.START, new RMAppNewlySavingTransition())
.addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
RMAppState.ACCEPTED, RMAppState.FINISHED, RMAppState.FAILED,
RMAppState.KILLED, RMAppState.FINAL_SAVING),
RMAppEventType.RECOVER, new RMAppRecoveredTransition())
.addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL,
new AppKilledTransition())
.addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING,
RMAppEventType.APP_REJECTED,
new FinalSavingTransition(new AppRejectedTransition(),
RMAppState.FAILED))
.addTransition(
RMAppState.KILLED,
RMAppState.KILLED,
EnumSet.of(RMAppEventType.APP_ACCEPTED,
RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
RMAppEventType.NODE_UPDATE, RMAppEventType.START))
.installTopology();
```
Transition定义了“从一个状态转换到另一个状态”的行为由转换操作、开始状态、事件类型、事件组成
```java
public interface StateMachine
<STATE extends Enum<STATE>,
EVENTTYPE extends Enum<EVENTTYPE>, EVENT> {
public STATE getCurrentState();
public STATE getPreviousState();
public STATE doTransition(EVENTTYPE eventType, EVENT event)
throws InvalidStateTransitionException;
}
```
## ResourceManager中状态机
- RMApp用于维护一个Application的生命周期实现类 - RMAppImpl
- RMAppAttempt用于维护一次试探运行的生命周期实现类 - RMAppAttemptImpl
- RMContainer用于维护一个已分配的资源最小单位Container的生命周期实现类 - RMContainerImpl
- RMNode用于维护一个NodeManager的生命周期实现类 - RMNodeImpl
NodeManager中状态机
- Application用于维护节点上一个Application的生命周期实现类 - ApplicationImpl
- Container用于维护节点上一个容器的生命周期实现类 - ContainerImpl
- LocalizedResource用于维护节点上资源本地化的生命周期没有使用接口即实现类 - LocalizedResource