From 60ae030016f6641ea8c021b2f967378c58ce6536 Mon Sep 17 00:00:00 2001 From: zeekling Date: Mon, 4 Dec 2023 23:54:21 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AD=A6=E4=B9=A0=E4=BD=9C=E4=B8=9A=E5=90=AF?= =?UTF-8?q?=E5=8A=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- yarn/job_start.md | 120 ++++++++++++++++++++++++++++++++++++++++++++++ 1 file changed, 120 insertions(+) diff --git a/yarn/job_start.md b/yarn/job_start.md index 7ba3716..64fd8dd 100644 --- a/yarn/job_start.md +++ b/yarn/job_start.md @@ -136,15 +136,135 @@ dispatcher.getEventHandler().handle( #### 3、创建job init事件,并且处理 +创建init事件,核心代码如下: + +```java +JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT); +jobEventDispatcher.handle(initJobEvent); +``` + +事件处理的核心类为InitTransition,核心代码如下: + +```java +public JobStateInternal transition(JobImpl job, JobEvent event) { + job.metrics.submittedJob(job); + job.metrics.preparingJob(job); + // 初始化上下文。 + if (job.newApiCommitter) { + job.jobContext = new JobContextImpl(job.conf, + job.oldJobId); + } else { + job.jobContext = new org.apache.hadoop.mapred.JobContextImpl( + job.conf, job.oldJobId); + } + + try { + // 初始化token等信息。 + setup(job); + job.fs = job.getFileSystem(job.conf); + + //log to job history + JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId, + job.conf.get(MRJobConfig.JOB_NAME, "test"), + job.conf.get(MRJobConfig.USER_NAME, "mapred"), + job.appSubmitTime, + job.remoteJobConfFile.toString(), + job.jobACLs, job.queueName, + job.conf.get(MRJobConfig.WORKFLOW_ID, ""), + job.conf.get(MRJobConfig.WORKFLOW_NAME, ""), + job.conf.get(MRJobConfig.WORKFLOW_NODE_NAME, ""), + getWorkflowAdjacencies(job.conf), + job.conf.get(MRJobConfig.WORKFLOW_TAGS, ""), job.conf); + job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse)); + //TODO JH Verify jobACLs, UserName via UGI? + // 初始化并行度等信息。 + TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId); + job.numMapTasks = taskSplitMetaInfo.length; + job.numReduceTasks = job.conf.getInt(MRJobConfig.NUM_REDUCES, 0); + + if (job.numMapTasks == 0 && job.numReduceTasks == 0) { + job.addDiagnostic("No of maps and reduces are 0 " + job.jobId); + } else if (job.numMapTasks == 0) { + job.reduceWeight = 0.9f; + } else if (job.numReduceTasks == 0) { + job.mapWeight = 0.9f; + } else { + job.mapWeight = job.reduceWeight = 0.45f; + } + + checkTaskLimits(); + + // 加载其他参数,具体代码省略。。 + + cleanupSharedCacheUploadPolicies(job.conf); + + // create the Tasks but don't start them yet,, 创建map task + createMapTasks(job, inputLength, taskSplitMetaInfo); + // 创建reduce tasks + createReduceTasks(job); + + job.metrics.endPreparingJob(job); + return JobStateInternal.INITED; + } catch (Exception e) { + LOG.warn("Job init failed", e); + job.metrics.endPreparingJob(job); + job.addDiagnostic("Job init failed : " + + StringUtils.stringifyException(e)); + // Leave job in the NEW state. The MR AM will detect that the state is + // not INITED and send a JOB_INIT_FAILED event. + return JobStateInternal.NEW; + } +} +``` +#### 4、检查初始化结果并且启动作业 +当init成功时,handler返回的结果是JobStateInternal.INITED;如果是失败了则返回的结果是JobStateInternal.NEW。 +对于初始化失败的作业会触发JobEventType.JOB_INIT_FAILED事件。 +对于初始化成功的作业会调用函数startJobs,继续启动作业。触发 +```java +protected void startJobs() { + /** create a job-start event to get this ball rolling */ + JobEvent startJobEvent = new JobStartEvent(job.getID(), + recoveredJobStartTime); + /** send the job-start event. this triggers the job execution. */ + dispatcher.getEventHandler().handle(startJobEvent); +} +``` +核心处理逻辑如下,主要是触发了几个事件: +- JobHistoryEvent: +- JobInfoChangeEvent: +- CommitterJobSetupEvent: +```java +public void transition(JobImpl job, JobEvent event) { + JobStartEvent jse = (JobStartEvent) event; + if (jse.getRecoveredJobStartTime() != -1L) { + job.startTime = jse.getRecoveredJobStartTime(); + } else { + job.startTime = job.clock.getTime(); + } + JobInitedEvent jie = + new JobInitedEvent(job.oldJobId, + job.startTime, + job.numMapTasks, job.numReduceTasks, + job.getState().toString(), + job.isUber()); + job.eventHandler.handle(new JobHistoryEvent(job.jobId, jie)); + JobInfoChangeEvent jice = new JobInfoChangeEvent(job.oldJobId, + job.appSubmitTime, job.startTime); + job.eventHandler.handle(new JobHistoryEvent(job.jobId, jice)); + job.metrics.runningJob(job); + job.eventHandler.handle(new CommitterJobSetupEvent( + job.jobId, job.jobContext)); +} +```