增加作业提交的中文注释。

This commit is contained in:
LingZhaoHui 2023-10-28 22:04:00 +08:00
parent 7db9895000
commit e1d4e6e26d
Signed by: zeekling
GPG Key ID: D96E4E75267CA2CC
5 changed files with 16 additions and 4 deletions

View File

@ -202,10 +202,13 @@ private void uploadResourcesInternal(Job job, Path submitJobDir)
Map<String, Boolean> archiveSCUploadPolicies = Map<String, Boolean> archiveSCUploadPolicies =
new LinkedHashMap<String, Boolean>(); new LinkedHashMap<String, Boolean>();
// 将配置文件上传到staging目录并且将上传好的路径添加到配置mapreduce.job.cache.files中
uploadFiles(job, files, submitJobDir, mapredSysPerms, replication, uploadFiles(job, files, submitJobDir, mapredSysPerms, replication,
fileSCUploadPolicies, statCache); fileSCUploadPolicies, statCache);
// 将jar文件上传到staging目录并且将上传好的路径添加到配置mapreduce.job.cache.files中
uploadLibJars(job, libjars, submitJobDir, mapredSysPerms, replication, uploadLibJars(job, libjars, submitJobDir, mapredSysPerms, replication,
fileSCUploadPolicies, statCache); fileSCUploadPolicies, statCache);
// 将归档保信息上传到staging目录将上传好的路径放到配置mapreduce.job.cache.archives中
uploadArchives(job, archives, submitJobDir, mapredSysPerms, replication, uploadArchives(job, archives, submitJobDir, mapredSysPerms, replication,
archiveSCUploadPolicies, statCache); archiveSCUploadPolicies, statCache);
uploadJobJar(job, jobJar, submitJobDir, replication, statCache); uploadJobJar(job, jobJar, submitJobDir, replication, statCache);

View File

@ -143,8 +143,9 @@ JobStatus submitJobInternal(Job job, Cluster cluster)
checkSpecs(job); checkSpecs(job);
Configuration conf = job.getConfiguration(); Configuration conf = job.getConfiguration();
// 分布式缓存优化
addMRFrameworkToDistributedCache(conf); addMRFrameworkToDistributedCache(conf);
// 获取Staging目录
Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf); Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf);
//configure the command line options correctly on the submitting dfs //configure the command line options correctly on the submitting dfs
InetAddress ip = InetAddress.getLocalHost(); InetAddress ip = InetAddress.getLocalHost();
@ -154,6 +155,7 @@ JobStatus submitJobInternal(Job job, Cluster cluster)
conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName); conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName);
conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress); conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress);
} }
// 从yarn上面获取Yarn ApplicationId
JobID jobId = submitClient.getNewJobID(); JobID jobId = submitClient.getNewJobID();
job.setJobID(jobId); job.setJobID(jobId);
Path submitJobDir = new Path(jobStagingArea, jobId.toString()); Path submitJobDir = new Path(jobStagingArea, jobId.toString());
@ -167,6 +169,7 @@ JobStatus submitJobInternal(Job job, Cluster cluster)
LOG.debug("Configuring job " + jobId + " with " + submitJobDir LOG.debug("Configuring job " + jobId + " with " + submitJobDir
+ " as the submit dir"); + " as the submit dir");
// get delegation token for the dir // get delegation token for the dir
// 获取访问hdfs的token
TokenCache.obtainTokensForNamenodes(job.getCredentials(), TokenCache.obtainTokensForNamenodes(job.getCredentials(),
new Path[] { submitJobDir }, conf); new Path[] { submitJobDir }, conf);
@ -190,9 +193,10 @@ JobStatus submitJobInternal(Job job, Cluster cluster)
LOG.warn("Max job attempts set to 1 since encrypted intermediate" + LOG.warn("Max job attempts set to 1 since encrypted intermediate" +
"data spill is enabled"); "data spill is enabled");
} }
// 将需要上传的文件拷贝到submitJobDir下面将上传的结果添加到指定的配置中
copyAndConfigureFiles(job, submitJobDir); copyAndConfigureFiles(job, submitJobDir);
// 获取job.xml的具体路径
Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir); Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir);
// Create the splits for the job // Create the splits for the job
@ -248,6 +252,7 @@ JobStatus submitJobInternal(Job job, Cluster cluster)
// Now, actually submit the job (using the submit name) // Now, actually submit the job (using the submit name)
// //
printTokens(jobId, job.getCredentials()); printTokens(jobId, job.getCredentials());
// 真正提交作业的流程
status = submitClient.submitJob( status = submitClient.submitJob(
jobId, submitJobDir.toString(), job.getCredentials()); jobId, submitJobDir.toString(), job.getCredentials());
if (status != null) { if (status != null) {

View File

@ -327,6 +327,7 @@ public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts)
// Submit to ResourceManager // Submit to ResourceManager
try { try {
// 提交作业
ApplicationId applicationId = ApplicationId applicationId =
resMgrDelegate.submitApplication(appContext); resMgrDelegate.submitApplication(appContext);
@ -580,6 +581,7 @@ public ApplicationSubmissionContext createApplicationSubmissionContext(
ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); ByteBuffer.wrap(dob.getData(), 0, dob.getLength());
// Setup ContainerLaunchContext for AM container // Setup ContainerLaunchContext for AM container
// 构造AM命令
List<String> vargs = setupAMCommand(jobConf); List<String> vargs = setupAMCommand(jobConf);
ContainerLaunchContext amContainer = setupContainerLaunchContextForAM( ContainerLaunchContext amContainer = setupContainerLaunchContextForAM(
jobConf, localResources, securityTokens, vargs); jobConf, localResources, securityTokens, vargs);

View File

@ -315,6 +315,7 @@ public YarnClientApplication createApplication()
} }
//TODO: YARN-1763:Handle RM failovers during the submitApplication call. //TODO: YARN-1763:Handle RM failovers during the submitApplication call.
// 提交作业
rmClient.submitApplication(request); rmClient.submitApplication(request);
int pollCount = 0; int pollCount = 0;
@ -328,6 +329,7 @@ public YarnClientApplication createApplication()
YarnApplicationState.KILLED); YarnApplicationState.KILLED);
while (true) { while (true) {
try { try {
// 等待作业提交的结果
ApplicationReport appReport = getApplicationReport(applicationId); ApplicationReport appReport = getApplicationReport(applicationId);
YarnApplicationState state = appReport.getYarnApplicationState(); YarnApplicationState state = appReport.getYarnApplicationState();
if (!waitingStates.contains(state)) { if (!waitingStates.contains(state)) {
@ -338,7 +340,7 @@ public YarnClientApplication createApplication()
LOG.info("Submitted application " + applicationId); LOG.info("Submitted application " + applicationId);
break; break;
} }
// 作业提交超时
long elapsedMillis = System.currentTimeMillis() - startTime; long elapsedMillis = System.currentTimeMillis() - startTime;
if (enforceAsyncAPITimeout() && if (enforceAsyncAPITimeout() &&
elapsedMillis >= asyncApiPollTimeoutMillis) { elapsedMillis >= asyncApiPollTimeoutMillis) {

View File

@ -683,7 +683,7 @@ public SubmitApplicationResponse submitApplication(
} }
try { try {
// call RMAppManager to submit application directly // call RMAppManager to submit application directly 调用RMAppManager提交作业
rmAppManager.submitApplication(submissionContext, rmAppManager.submitApplication(submissionContext,
System.currentTimeMillis(), userUgi); System.currentTimeMillis(), userUgi);