学习作业启动

This commit is contained in:
LingZhaoHui 2023-12-03 22:55:59 +08:00
parent ccc687cf68
commit 1d7b0176a9
Signed by: zeekling
GPG Key ID: D96E4E75267CA2CC
1 changed files with 80 additions and 1 deletions

View File

@ -59,7 +59,86 @@ Yarn资源调度过程待完善后面会单独章节学习。
### initAndStartAppMaster启动AppMaster
MRAppMaster是
MRAppMaster在yarn内部是一个服务最终启动的时候会调用到serviceStart函数里面所以我们主要看这个函数里面做了什么。
#### 1、创建并且初始化Job
创建Job对象并且将其初始化掉。但是不会启动当前作业。
- 初始化JobImpl对象。在JobImpl初始化的时候做了下面几件事
- 初始化线程池。
- 初始化作业状态机的核心代码如下:
```java
protected static final
StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
stateMachineFactory
= new StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
(JobStateInternal.NEW)
// Transitions from NEW state
.addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
// ....省略...
.addTransition(JobStateInternal.REBOOT, JobStateInternal.REBOOT,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
// create the topology tables
.installTopology();
- 初始化其他配置。
- 在中央处理器里面注册JobFinishEvent类型事件以及事件处理的handler。
```java
protected Job createJob(Configuration conf, JobStateInternal forcedState,
String diagnostic) {
// create single job
Job newJob =
new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
taskAttemptListener, jobTokenSecretManager, jobCredentials, clock,
completedTasksFromPreviousRun, metrics,
committer, newApiCommitter,
currentUser.getUserName(), appSubmitTime, amInfos, context,
forcedState, diagnostic);
((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
dispatcher.register(JobFinishEvent.Type.class,
createJobFinishEventHandler());
return newJob;
}
```
#### 2、发送inited事件
发送inited事件的对象主要是下面两个
- 通过dispatcher给历史AM发送。
- 当前AM。代码如下
```java
// Send out an MR AM inited event for this AM.
dispatcher.getEventHandler().handle(
new JobHistoryEvent(job.getID(), new AMStartedEvent(amInfo
.getAppAttemptId(), amInfo.getStartTime(), amInfo.getContainerId(),
amInfo.getNodeManagerHost(), amInfo.getNodeManagerPort(), amInfo
.getNodeManagerHttpPort(), this.forcedState == null ? null
: this.forcedState.toString(), appSubmitTime)));
```
#### 3、创建job init事件并且处理