增加ContainerManager
This commit is contained in:
parent
9a3c708486
commit
c9a5063b58
@ -19,5 +19,76 @@ ContainerManagerImpl实例的构造函数和serviceInit函数。
|
|||||||
|
|
||||||
## serviceInit函数
|
## serviceInit函数
|
||||||
|
|
||||||
|
主要是服务启动时的初始化函数,ContainerManager在NodeManager内部属于一个服务。所以初始化的时候会调用这个函数初始化一些服务相关的东西。
|
||||||
|
|
||||||
|
在这个函数里面总结下来主要做了几件事:
|
||||||
|
- 继续初始化一些事件,主要包含LogHandlerEventType、SharedCacheUploadEventType。
|
||||||
|
- 初始化AMRMProxyService。
|
||||||
|
- 从本地的LevelDB恢复Container信息。
|
||||||
|
|
||||||
|
### 恢复当前NodeManager的所有作业信息
|
||||||
|
|
||||||
|
|
||||||
|
#### 第一步:恢复Application 信息
|
||||||
|
|
||||||
|
首先是从LevelDB里面加载Application信息。循环加载。
|
||||||
|
|
||||||
|
```java
|
||||||
|
RecoveredApplicationsState appsState = stateStore.loadApplicationsState();
|
||||||
|
try (RecoveryIterator<ContainerManagerApplicationProto> rasIterator =
|
||||||
|
appsState.getIterator()) {
|
||||||
|
while (rasIterator.hasNext()) {
|
||||||
|
ContainerManagerApplicationProto proto = rasIterator.next();
|
||||||
|
LOG.debug("Recovering application with state: {}", proto);
|
||||||
|
recoverApplication(proto);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
加载Application的时候会将Application的上下文信息从LevelDB里面读出来,通过上下文信息等初始化新的ApplicationImpl,并且触发ApplicationInitEvent事件。
|
||||||
|
会根据当前作业上下文中实际的状态等信息跳转到实际的状态。
|
||||||
|
|
||||||
|
```java
|
||||||
|
ApplicationImpl app = new ApplicationImpl(dispatcher, p.getUser(), fc,
|
||||||
|
appId, creds, context, p.getAppLogAggregationInitedTime());
|
||||||
|
context.getApplications().put(appId, app);
|
||||||
|
metrics.runningApplication();
|
||||||
|
app.handle(new ApplicationInitEvent(appId, acls, logAggregationContext));
|
||||||
|
```
|
||||||
|
|
||||||
|
#### 第二步:恢复所有的Container信息
|
||||||
|
|
||||||
|
第二步是从LevelDB里面加载Container信息。循环加载。
|
||||||
|
|
||||||
|
```java
|
||||||
|
try (RecoveryIterator<RecoveredContainerState> rcsIterator =
|
||||||
|
stateStore.getContainerStateIterator()) {
|
||||||
|
while (rcsIterator.hasNext()) {
|
||||||
|
RecoveredContainerState rcs = rcsIterator.next();
|
||||||
|
LOG.debug("Recovering container with state: {}", rcs);
|
||||||
|
recoverContainer(rcs);
|
||||||
|
}
|
||||||
|
}
|
||||||
|
```
|
||||||
|
|
||||||
|
recoverContainer函数用于恢复单个Container信息。对于已经存在的Application对应的Container会通过LevelDB里面加载到的信息初始化Container对象,
|
||||||
|
将其加到所有Container的列表里面并且触发ApplicationContainerInitEvent,后续会根据实际状态信息跳转到指定状态继续处理。
|
||||||
|
|
||||||
|
```java
|
||||||
|
Container container = new ContainerImpl(getConfig(), dispatcher,
|
||||||
|
launchContext, credentials, metrics, token, context, rcs);
|
||||||
|
context.getContainers().put(token.getContainerID(), container);
|
||||||
|
containerScheduler.recoverActiveContainer(container, rcs);
|
||||||
|
app.handle(new ApplicationContainerInitEvent(container));
|
||||||
|
```
|
||||||
|
|
||||||
|
如果发现作业的状态为KILL状态,则会为当前Container重新触发KILL事件,保证Container已经停止。
|
||||||
|
|
||||||
|
对于Application找不见的Container,认为作业已经结束了,直接标记为已经完成。
|
||||||
|
|
||||||
|
|
||||||
|
在恢复完成之后会触发事件: ContainerSchedulerEventType.RECOVERY_COMPLETED
|
||||||
|
|
||||||
|
此状态会重新拉起所有的Container。
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user