Yarn状态机

This commit is contained in:
LingZhaoHui 2023-11-28 00:43:16 +08:00
parent 23b9b93b4c
commit e1ba807faf
Signed by: zeekling
GPG Key ID: D96E4E75267CA2CC
1 changed files with 196 additions and 8 deletions

View File

@ -3,7 +3,7 @@
Yarn采用了基于事件驱动的并发模型
- 所有状态机都实现了EventHandler接口很多服务类名通常带有Service后缀也实现了该接口它们都是事件处理器
- 所有状态机都实现了EventHandler接口很多服务类名通常带有Service后缀也实现了该接口它们都是事件处理器
- 需要异步处理的事件由中央异步调度器类名通常带有Dispatcher后缀统一接收/派发,需要同步处理的事件直接交给相应的事件处理器。
![pic](https://pan.zeekling.cn/zeekling/hadoop/event/state_event_001.png)
@ -32,7 +32,7 @@ public interface EventHandler<T extends Event> {
# 中央处理器AsyncDispatcher
AsyncDispatcher 实现了接口DispatcherDispatcher中定义了事件Dispatcher的接口。主要提供两个功能
- 注册不同类型的事件。
- 注册不同类型的事件,主要包含事件类型和事件处理器
- 获取事件处理器用来派发事件等待异步执行真正的EventHandler。
```java
@ -58,11 +58,9 @@ public class AsyncDispatcher extends AbstractService implements Dispatcher {
# 事件处理器的注册
事件注册就是将事件写入到eventDispatchers里面eventDispatchers的定义`Map<Class<? extends Enum>, EventHandler> eventDispatchers`
键是事件类型value是事件的处理器。
事件注册就是将事件写入到eventDispatchers里面eventDispatchers的定义`Map<Class<? extends Enum>, EventHandler> eventDispatchers`键是事件类型value是事件的处理器。
对于同一事件类型注册多次handler处理函数时将使用MultiListenerHandler代替MultiListenerHandler里面保存了多个handler调用handler函数时
会依次调用每个handler。
对于同一事件类型注册多次handler处理函数时将使用MultiListenerHandler代替MultiListenerHandler里面保存了多个handler调用handler函数时会依次调用每个handler。
```java
public void register(Class<? extends Enum> eventType,
@ -93,12 +91,202 @@ public void register(Class<? extends Enum> eventType,
AsyncDispatcher#getEventHandler()是异步派发的关键:
```java
private final EventHandler<Event> handlerInstance = new GenericEventHandler();
// 省略.....
@Override
public EventHandler<Event> getEventHandler() {
return handlerInstance;
}
```
## GenericEventHandler一个特殊的事件处理器
GenericEventHandler是一个特殊的事件处理器用于接受各种事件。由指定线程处理接收到的事件。
```java
public void handle(Event event) {
if (blockNewEvents) {
return;
}
drained = false;
/* all this method does is enqueue all the events onto the queue */
int qSize = eventQueue.size();
if (qSize != 0 && qSize % 1000 == 0
&& lastEventQueueSizeLogged != qSize) {
lastEventQueueSizeLogged = qSize;
LOG.info("Size of event-queue is " + qSize);
}
if (qSize != 0 && qSize % detailsInterval == 0
&& lastEventDetailsQueueSizeLogged != qSize) {
lastEventDetailsQueueSizeLogged = qSize;
printEventQueueDetails();
printTrigger = true;
}
int remCapacity = eventQueue.remainingCapacity();
if (remCapacity < 1000) {
LOG.warn("Very low remaining capacity in the event-queue: "
+ remCapacity);
}
try {
eventQueue.put(event);
} catch (InterruptedException e) {
if (!stopped) {
LOG.warn("AsyncDispatcher thread interrupted", e);
}
// Need to reset drained flag to true if event queue is empty,
// otherwise dispatcher will hang on stop.
drained = eventQueue.isEmpty();
throw new YarnRuntimeException(e);
}
};
```
ResourceManager中状态机
- blockNewEvents: 是否阻塞事件处理,只有当中央处理器停止之后才会停止接受事件。
- eventQueue将接收到的请求放置到当前阻塞队列里面。方便指定线程及时处理。
## 事件处理线程
在服务启动时serviceStart函数创建一个线程会循环处理接受到的事件。核心处理逻辑在函数dispatch里面。
```java
Runnable createThread() {
return new Runnable() {
@Override
public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
drained = eventQueue.isEmpty();
// 省略。。。
Event event;
try {
event = eventQueue.take();
} catch(InterruptedException ie) {
if (!stopped) {
LOG.warn("AsyncDispatcher thread interrupted", ie);
}
return;
}
if (event != null) {
// 省略。。。
dispatch(event);
// 省略。。。
}
}
}
};
}
```
### dispatch详解
- 从已经注册的eventDispatchers列表里面查找当前事件对应的处理器调用当前处理器的handler函数。
- 如果当前handler处理出现异常时默认会退出RM。
```java
protected void dispatch(Event event) {
//all events go thru this loop
LOG.debug("Dispatching the event {}.{}", event.getClass().getName(),
event);
Class<? extends Enum> type = event.getType().getDeclaringClass();
try{
EventHandler handler = eventDispatchers.get(type);
if(handler != null) {
handler.handle(event);
} else {
throw new Exception("No handler for registered for " + type);
}
} catch (Throwable t) {
//TODO Maybe log the state of the queue
LOG.error(FATAL, "Error in dispatcher thread", t);
// If serviceStop is called, we should exit this thread gracefully.
if (exitOnDispatchException
&& (ShutdownHookManager.get().isShutdownInProgress()) == false
&& stopped == false) {
stopped = true;
Thread shutDownThread = new Thread(createShutDownThread());
shutDownThread.setName("AsyncDispatcher ShutDown handler");
shutDownThread.start();
}
}
}
```
# 状态机
状态转换由成员变量StateMachine管理所有的StateMachine都由StateMachineFactory进行管理。由addTransition函数实现状态机。
```java
private static final StateMachineFactory<RMAppImpl,
RMAppState,
RMAppEventType,
RMAppEvent> stateMachineFactory
= new StateMachineFactory<RMAppImpl,
RMAppState,
RMAppEventType,
RMAppEvent>(RMAppState.NEW)
// Transitions from NEW state
.addTransition(RMAppState.NEW, RMAppState.NEW,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
RMAppEventType.START, new RMAppNewlySavingTransition())
.addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
RMAppState.ACCEPTED, RMAppState.FINISHED, RMAppState.FAILED,
RMAppState.KILLED, RMAppState.FINAL_SAVING),
RMAppEventType.RECOVER, new RMAppRecoveredTransition())
.addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL,
new AppKilledTransition())
.addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING,
RMAppEventType.APP_REJECTED,
new FinalSavingTransition(new AppRejectedTransition(),
RMAppState.FAILED))
.addTransition(
RMAppState.KILLED,
RMAppState.KILLED,
EnumSet.of(RMAppEventType.APP_ACCEPTED,
RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
RMAppEventType.NODE_UPDATE, RMAppEventType.START))
.installTopology();
```
Transition定义了“从一个状态转换到另一个状态”的行为由转换操作、开始状态、事件类型、事件组成
```java
public interface StateMachine
<STATE extends Enum<STATE>,
EVENTTYPE extends Enum<EVENTTYPE>, EVENT> {
public STATE getCurrentState();
public STATE getPreviousState();
public STATE doTransition(EVENTTYPE eventType, EVENT event)
throws InvalidStateTransitionException;
}
```
## ResourceManager中状态机
- RMApp用于维护一个Application的生命周期实现类 - RMAppImpl
- RMAppAttempt用于维护一次试探运行的生命周期实现类 - RMAppAttemptImpl
@ -112,5 +300,5 @@ NodeManager中状态机
- LocalizedResource用于维护节点上资源本地化的生命周期没有使用接口即实现类 - LocalizedResource
https://monkeysayhi.github.io/2018/11/20/%E6%BA%90%E7%A0%81%7CYarn%E7%9A%84%E4%BA%8B%E4%BB%B6%E9%A9%B1%E5%8A%A8%E6%A8%A1%E5%9E%8B%E4%B8%8E%E7%8A%B6%E6%80%81%E6%9C%BA/