学习作业启动

This commit is contained in:
LingZhaoHui 2023-12-07 00:36:04 +08:00
parent a3980b8012
commit 9a5a33e0b2
Signed by: zeekling
GPG Key ID: D96E4E75267CA2CC
1 changed files with 57 additions and 1 deletions

View File

@ -271,7 +271,7 @@ public void transition(JobImpl job, JobEvent event) {
handleJobSetup的核心处理逻辑
- 创建attempt路径。
- 触发JobSetupCompletedEvent事件。从事件实现来看会触发JobImpl里面的JOB_SETUP_COMPLETED事件类型由SetupCompletedTransition来处理当前事件。
- 触发JobSetupCompletedEvent事件。从事件实现来看会触发JobImpl里面的JOB_SETUP_COMPLETED事件类型由SetupCompletedTransition来处理当前事件。在当前函数里面会触发JOB_COMPLETED事件。最终会走到JobImpl的checkReadyForCommit函数里面。
```java
protected void handleJobSetup(CommitterJobSetupEvent event) {
@ -288,3 +288,59 @@ protected void handleJobSetup(CommitterJobSetupEvent event) {
}
```
checkReadyForCommit函数的实现如下可以看到在触发了CommitterJobCommitEvent事件,在CommitterJobCommitEvent里面会触发JOB_COMMIT事件。主要处理逻辑在handleJobCommit里面。
```java
protected JobStateInternal checkReadyForCommit() {
JobStateInternal currentState = getInternalState();
if (completedTaskCount == tasks.size()
&& currentState == JobStateInternal.RUNNING) {
eventHandler.handle(new CommitterJobCommitEvent(jobId, getJobContext()));
return JobStateInternal.COMMITTING;
}
// return the current state as job not ready to commit yet
return getInternalState();
}
```
handleJobCommit处理逻辑如下
```java
protected void handleJobCommit(CommitterJobCommitEvent event) {
boolean commitJobIsRepeatable = false;
try {
// 检查作业是否重复。
commitJobIsRepeatable = committer.isCommitJobRepeatable(
event.getJobContext());
} catch (IOException e) {
LOG.warn("Exception in committer.isCommitJobRepeatable():", e);
}
try {
// 创建文件:/tmp/hadoop-yarn/staging//user/.staging/{jobid}/COMMIT_STARTED
touchz(startCommitFile, commitJobIsRepeatable);
jobCommitStarted();
// 检查和RM的心跳。
waitForValidCommitWindow();
// 提交作业核心处理函数在commitJobInternal里面
committer.commitJob(event.getJobContext());
// 创建文件:/tmp/hadoop-yarn/staging//user/.staging/{jobid}/COMMIT_SUCCESS
touchz(endCommitSuccessFile, commitJobIsRepeatable);
context.getEventHandler().handle(
new JobCommitCompletedEvent(event.getJobID()));
} catch (Exception e) {
LOG.error("Could not commit job", e);
try {
// 失败之后创建:/tmp/hadoop-yarn/staging//user/.staging/{jobid}/COMMIT_FAIL
touchz(endCommitFailureFile, commitJobIsRepeatable);
} catch (Exception e2) {
LOG.error("could not create failure file.", e2);
}
context.getEventHandler().handle(
new JobCommitFailedEvent(event.getJobID(),
StringUtils.stringifyException(e)));
} finally {
jobCommitEnded();
}
}
```