2.7 KiB
2.7 KiB
作业启动
作业提交的客户端比较核心的类是Job.java,看作业启动的源码需要从这个类开始看。
Job.java
作业启动的入口函数为waitForCompletion函数。当前函数的核心函数为submit(),主要如下:
public void submit()
throws IOException, InterruptedException, ClassNotFoundException {
ensureState(JobState.DEFINE);
setUseNewAPI();
connect();
final JobSubmitter submitter =
getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException,
ClassNotFoundException {
return submitter.submitJobInternal(Job.this, cluster);
}
});
state = JobState.RUNNING;
LOG.info("The url to track the job: " + getTrackingURL());
}
其中,connect主要为连接ResourceManager。核心提交类为submitJobInternal,在submitJobInternal中主要包含:
- 检查是否开启分布式缓存,核心函数为:
addMRFrameworkToDistributedCache(conf);
- 从yarn上面获取Yarn ApplicationId。
- 将需要上传的文件拷贝到submitJobDir下面,将上传的结果添加到指定的配置中。主要实现在函数
copyAndConfigureFiles(job, submitJobDir);
里面,主要上传当前作业需要的jar包等信息到staging目录。当上传Jar包比较频繁的时候可以考虑开启分布式缓存。 - 初始化核心配置,主要实现在函数:
writeConf(conf, submitJobFile);
里面。 - 最后才是真正提交作业的部分:
status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());
通过submitClient.submitJob之后是远程调用到ResourceManager的类:YARNRunner.java,开始作业提交。
YARNRunner.java
在当前类中,处理逻辑主要包含下面几步:
- 创建上下问信息:ApplicationSubmissionContext,当前这一步当中主要是构造AM相关参数,比如AM的启动命令等。在AM的启动命令中会设置AM的启动主函数MRAppMaster,在资源调度到当前作业时,会先启动AM的主函数MRAppMaster
- 提交作业。最后会调用到
rmClient.submitApplication(request);
发送启动作业的请求,在发送请求之后会一直等到作业启动完成。启动成功之后会返回appilicationId
资源调度
Yarn资源调度过程待完善,后面会单独章节学习。
MRAppMaster.java
当前类是启动AM的入口函数,所以要从main函数开始读代码。main函数里面主要做了下面几件事:
- 初始化MRAppMaster实例。
- 加载job.xml信息。
- 初始化web信息。主要包含: MR history server、MR Server。
- 启动APPMaster。
initAndStartAppMaster:启动AppMaster
MRAppMaster是