diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java index 90b9c8a41a..0c45ad8ca2 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobResourceUploader.java @@ -202,10 +202,13 @@ private void uploadResourcesInternal(Job job, Path submitJobDir) Map archiveSCUploadPolicies = new LinkedHashMap(); + // 将-files指定的文件上传到staging目录,并且将上传好的路径添加到配置mapreduce.job.cache.files中 uploadFiles(job, files, submitJobDir, mapredSysPerms, replication, fileSCUploadPolicies, statCache); + // 将jar文件上传到staging目录,并且将上传好的路径添加到配置mapreduce.job.cache.files中 uploadLibJars(job, libjars, submitJobDir, mapredSysPerms, replication, fileSCUploadPolicies, statCache); + // 将归档包信息上传到staging目录,将上传好的路径放到配置mapreduce.job.cache.archives中 uploadArchives(job, archives, submitJobDir, mapredSysPerms, replication, archiveSCUploadPolicies, statCache); uploadJobJar(job, jobJar, submitJobDir, replication, statCache); diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java index 4c983178a7..748bc2f34e 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-core/src/main/java/org/apache/hadoop/mapreduce/JobSubmitter.java @@ -143,8 +143,9 @@ 
JobStatus submitJobInternal(Job job, Cluster cluster) checkSpecs(job); Configuration conf = job.getConfiguration(); + // 分布式缓存优化 addMRFrameworkToDistributedCache(conf); - + // 获取Staging目录 Path jobStagingArea = JobSubmissionFiles.getStagingDir(cluster, conf); //configure the command line options correctly on the submitting dfs InetAddress ip = InetAddress.getLocalHost(); @@ -154,6 +155,7 @@ JobStatus submitJobInternal(Job job, Cluster cluster) conf.set(MRJobConfig.JOB_SUBMITHOST,submitHostName); conf.set(MRJobConfig.JOB_SUBMITHOSTADDR,submitHostAddress); } + // 从yarn上面获取Yarn ApplicationId JobID jobId = submitClient.getNewJobID(); job.setJobID(jobId); Path submitJobDir = new Path(jobStagingArea, jobId.toString()); @@ -167,6 +169,7 @@ JobStatus submitJobInternal(Job job, Cluster cluster) LOG.debug("Configuring job " + jobId + " with " + submitJobDir + " as the submit dir"); // get delegation token for the dir + // 获取访问hdfs的token TokenCache.obtainTokensForNamenodes(job.getCredentials(), new Path[] { submitJobDir }, conf); @@ -190,9 +193,10 @@ JobStatus submitJobInternal(Job job, Cluster cluster) LOG.warn("Max job attempts set to 1 since encrypted intermediate" + "data spill is enabled"); } - + // 将需要上传的文件拷贝到submitJobDir下面,将上传的结果添加到指定的配置中。 copyAndConfigureFiles(job, submitJobDir); + // 获取job.xml的具体路径 Path submitJobFile = JobSubmissionFiles.getJobConfPath(submitJobDir); // Create the splits for the job @@ -248,6 +252,7 @@ JobStatus submitJobInternal(Job job, Cluster cluster) // Now, actually submit the job (using the submit name) // printTokens(jobId, job.getCredentials()); + // 真正提交作业的流程 status = submitClient.submitJob( jobId, submitJobDir.toString(), job.getCredentials()); if (status != null) { diff --git a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java 
b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java index 0fb3f77651..44ec5b479a 100644 --- a/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java +++ b/hadoop-mapreduce-project/hadoop-mapreduce-client/hadoop-mapreduce-client-jobclient/src/main/java/org/apache/hadoop/mapred/YARNRunner.java @@ -327,6 +327,7 @@ public JobStatus submitJob(JobID jobId, String jobSubmitDir, Credentials ts) // Submit to ResourceManager try { + // 提交作业 ApplicationId applicationId = resMgrDelegate.submitApplication(appContext); @@ -580,6 +581,7 @@ public ApplicationSubmissionContext createApplicationSubmissionContext( ByteBuffer.wrap(dob.getData(), 0, dob.getLength()); // Setup ContainerLaunchContext for AM container + // 构造AM命令。 List vargs = setupAMCommand(jobConf); ContainerLaunchContext amContainer = setupContainerLaunchContextForAM( jobConf, localResources, securityTokens, vargs); diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java index 655adb589d..52e54c0d9d 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-client/src/main/java/org/apache/hadoop/yarn/client/api/impl/YarnClientImpl.java @@ -315,6 +315,7 @@ public YarnClientApplication createApplication() } //TODO: YARN-1763:Handle RM failovers during the submitApplication call. 
+ // 提交作业。 rmClient.submitApplication(request); int pollCount = 0; @@ -328,6 +329,7 @@ public YarnClientApplication createApplication() YarnApplicationState.KILLED); while (true) { try { + // 等待作业提交的结果。 ApplicationReport appReport = getApplicationReport(applicationId); YarnApplicationState state = appReport.getYarnApplicationState(); if (!waitingStates.contains(state)) { @@ -338,7 +340,7 @@ public YarnClientApplication createApplication() LOG.info("Submitted application " + applicationId); break; } - + // 作业提交超时。 long elapsedMillis = System.currentTimeMillis() - startTime; if (enforceAsyncAPITimeout() && elapsedMillis >= asyncApiPollTimeoutMillis) { diff --git a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java index 7861a6b3e5..f68518d238 100644 --- a/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java +++ b/hadoop-yarn-project/hadoop-yarn/hadoop-yarn-server/hadoop-yarn-server-resourcemanager/src/main/java/org/apache/hadoop/yarn/server/resourcemanager/ClientRMService.java @@ -683,7 +683,7 @@ public SubmitApplicationResponse submitApplication( } try { - // call RMAppManager to submit application directly + // call RMAppManager to submit application directly 调用RMAppManager提交作业。 rmAppManager.submitApplication(submissionContext, System.currentTimeMillis(), userUgi);