From 9a5a33e0b2ac4ce443e2787bdeec55c75d257dd9 Mon Sep 17 00:00:00 2001 From: zeekling Date: Thu, 7 Dec 2023 00:36:04 +0800 Subject: [PATCH] =?UTF-8?q?=E5=AD=A6=E4=B9=A0=E4=BD=9C=E4=B8=9A=E5=90=AF?= =?UTF-8?q?=E5=8A=A8?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- yarn/job_start.md | 58 ++++++++++++++++++++++++++++++++++++++++++++++- 1 file changed, 57 insertions(+), 1 deletion(-) diff --git a/yarn/job_start.md b/yarn/job_start.md index 879d01e..4bdd466 100644 --- a/yarn/job_start.md +++ b/yarn/job_start.md @@ -271,7 +271,7 @@ public void transition(JobImpl job, JobEvent event) { handleJobSetup的核心处理逻辑: - 创建attempt路径。 -- 触发JobSetupCompletedEvent事件。从事件实现来看会触发JobImpl里面的JOB_SETUP_COMPLETED事件类型,由SetupCompletedTransition来处理当前事件。 +- 触发JobSetupCompletedEvent事件。从事件实现来看会触发JobImpl里面的JOB_SETUP_COMPLETED事件类型,由SetupCompletedTransition来处理当前事件。在当前函数里面会触发JOB_COMPLETED事件。最终会走到JobImpl的checkReadyForCommit函数里面。 ```java protected void handleJobSetup(CommitterJobSetupEvent event) { @@ -288,3 +288,59 @@ protected void handleJobSetup(CommitterJobSetupEvent event) { } ``` +checkReadyForCommit函数的实现如下,可以看到在触发了CommitterJobCommitEvent事件,在CommitterJobCommitEvent里面会触发JOB_COMMIT事件。主要处理逻辑在handleJobCommit里面。 + +```java +protected JobStateInternal checkReadyForCommit() { + JobStateInternal currentState = getInternalState(); + if (completedTaskCount == tasks.size() + && currentState == JobStateInternal.RUNNING) { + eventHandler.handle(new CommitterJobCommitEvent(jobId, getJobContext())); + return JobStateInternal.COMMITTING; + } + // return the current state as job not ready to commit yet + return getInternalState(); +} +``` + +handleJobCommit处理逻辑如下, + +```java +protected void handleJobCommit(CommitterJobCommitEvent event) { + boolean commitJobIsRepeatable = false; + try { + // 检查作业是否重复。 + commitJobIsRepeatable = committer.isCommitJobRepeatable( + event.getJobContext()); + } catch (IOException e) { + LOG.warn("Exception in committer.isCommitJobRepeatable():", e); + } + + try { + // 创建文件:/tmp/hadoop-yarn/staging//user/.staging/{jobid}/COMMIT_STARTED + touchz(startCommitFile, commitJobIsRepeatable); + jobCommitStarted(); + // 检查和RM的心跳。 + waitForValidCommitWindow(); + // 提交作业,核心处理函数在commitJobInternal里面 + committer.commitJob(event.getJobContext()); + // 创建文件:/tmp/hadoop-yarn/staging//user/.staging/{jobid}/COMMIT_SUCCESS + touchz(endCommitSuccessFile, commitJobIsRepeatable); + context.getEventHandler().handle( + new JobCommitCompletedEvent(event.getJobID())); + } catch (Exception e) { + LOG.error("Could not commit job", e); + try { + // 失败之后创建:/tmp/hadoop-yarn/staging//user/.staging/{jobid}/COMMIT_FAIL + touchz(endCommitFailureFile, commitJobIsRepeatable); + } catch (Exception e2) { + LOG.error("could not create failure file.", e2); + } + context.getEventHandler().handle( + new JobCommitFailedEvent(event.getJobID(), + StringUtils.stringifyException(e))); + } finally { + jobCommitEnded(); + } +} +```