diff --git a/yarn/job_start.md b/yarn/job_start.md index 4e9c201..7ba3716 100644 --- a/yarn/job_start.md +++ b/yarn/job_start.md @@ -59,7 +59,86 @@ Yarn资源调度过程待完善,后面会单独章节学习。 ### initAndStartAppMaster:启动AppMaster -MRAppMaster是 +MRAppMaster在yarn内部是一个服务,最终启动的时候会调用到serviceStart函数里面,所以我们主要看这个函数里面做了什么。 + + + +#### 1、创建并且初始化Job + +创建Job对象并且将其初始化掉。但是不会启动当前作业。 + +- 初始化JobImpl对象。在JobImpl初始化的时候做了下面几件事: + + - 初始化线程池。 + + - 初始化作业状态机的核心代码如下: + + ```java + protected static final + StateMachineFactory + stateMachineFactory + = new StateMachineFactory + (JobStateInternal.NEW) + // Transitions from NEW state + .addTransition(JobStateInternal.NEW, JobStateInternal.NEW, + JobEventType.JOB_DIAGNOSTIC_UPDATE, + DIAGNOSTIC_UPDATE_TRANSITION) + .addTransition(JobStateInternal.NEW, JobStateInternal.NEW, + JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) + // ....省略... + .addTransition(JobStateInternal.REBOOT, JobStateInternal.REBOOT, + JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION) + // create the topology tables + .installTopology(); + + - 初始化其他配置。 + +- 在中央处理器里面注册JobFinishEvent类型事件以及事件处理的handler。 + +```java +protected Job createJob(Configuration conf, JobStateInternal forcedState, + String diagnostic) { + // create single job + Job newJob = + new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(), + taskAttemptListener, jobTokenSecretManager, jobCredentials, clock, + completedTasksFromPreviousRun, metrics, + committer, newApiCommitter, + currentUser.getUserName(), appSubmitTime, amInfos, context, + forcedState, diagnostic); + ((RunningAppContext) context).jobs.put(newJob.getID(), newJob); + dispatcher.register(JobFinishEvent.Type.class, + createJobFinishEventHandler()); + return newJob; +} +``` + + + +#### 2、发送inited事件 + +发送inited事件的对象主要是下面两个: + +- 通过dispatcher给历史AM发送。 +- 当前AM。代码如下: + +```java +// Send out an MR AM inited event for this AM. +dispatcher.getEventHandler().handle( + new JobHistoryEvent(job.getID(), new AMStartedEvent(amInfo + .getAppAttemptId(), amInfo.getStartTime(), amInfo.getContainerId(), + amInfo.getNodeManagerHost(), amInfo.getNodeManagerPort(), amInfo + .getNodeManagerHttpPort(), this.forcedState == null ? null + : this.forcedState.toString(), appSubmitTime))); +``` + + + +#### 3、创建job init事件,并且处理 + + + +