Compare commits

...

4 Commits

Author SHA1 Message Date
f08af56544 Merge branch 'master' into upload_file 2024-03-31 11:02:11 +00:00
6085eaec1c Yarn事件处理机制 (#22)
#21
Reviewed-on: #22
2023-12-07 14:55:02 +00:00
da39f0dc40 Yarn状态机 (#20)
#21
Reviewed-on: #20
2023-11-28 15:04:45 +00:00
eb1d736834 ResourceManager学习 (#19)
#18
Reviewed-on: #19
2023-11-24 15:23:55 +00:00
4 changed files with 830 additions and 0 deletions

373
yarn/job_start.md Normal file
View File

@ -0,0 +1,373 @@
# 作业启动
作业提交的客户端比较核心的类是Job.java看作业启动的源码需要从这个类开始看。
## Job.java
作业启动的入口函数为waitForCompletion函数。当前函数的核心函数为submit(),主要如下:
```java
public void submit()
throws IOException, InterruptedException, ClassNotFoundException {
ensureState(JobState.DEFINE);
setUseNewAPI();
connect();
final JobSubmitter submitter =
getJobSubmitter(cluster.getFileSystem(), cluster.getClient());
status = ugi.doAs(new PrivilegedExceptionAction<JobStatus>() {
public JobStatus run() throws IOException, InterruptedException,
ClassNotFoundException {
return submitter.submitJobInternal(Job.this, cluster);
}
});
state = JobState.RUNNING;
LOG.info("The url to track the job: " + getTrackingURL());
}
```
其中connect主要为连接ResourceManager。核心提交类为submitJobInternal在submitJobInternal中主要包含
- 检查是否开启分布式缓存,核心函数为:`addMRFrameworkToDistributedCache(conf);`
- 从yarn上面获取Yarn ApplicationId。
- 将需要上传的文件拷贝到submitJobDir下面将上传的结果添加到指定的配置中。主要实现在函数`copyAndConfigureFiles(job, submitJobDir);`里面主要上传当前作业需要的jar包等信息到staging目录。当上传Jar包比较频繁的时候可以考虑开启分布式缓存。
- 初始化核心配置,主要实现在函数:`writeConf(conf, submitJobFile);`里面。
- 最后才是真正提交作业的部分:`status = submitClient.submitJob(jobId, submitJobDir.toString(), job.getCredentials());`通过submitClient.submitJob之后是远程调用到ResourceManager的类YARNRunner.java开始作业提交。
## YARNRunner.java
在当前类中,处理逻辑主要包含下面几步:
- 创建上下问信息ApplicationSubmissionContext当前这一步当中主要是构造AM相关参数比如AM的启动命令等。在AM的启动命令中会设置AM的启动主函数MRAppMaster在资源调度到当前作业时会先启动AM的主函数MRAppMaster
- 提交作业。最后会调用到`rmClient.submitApplication(request);`发送启动作业的请求在发送请求之后会一直等到作业启动完成。启动成功之后会返回appilicationId
## 资源调度
Yarn资源调度过程待完善后面会单独章节学习。
## MRAppMaster.java
当前类是启动AM的入口函数所以要从main函数开始读代码。main函数里面主要做了下面几件事
- 初始化MRAppMaster实例。
- 加载job.xml信息。
- 初始化web信息。主要包含 MR history server、MR Server。
- 启动APPMaster。
### initAndStartAppMaster启动AppMaster
MRAppMaster在yarn内部是一个服务最终启动的时候会调用到serviceStart函数里面所以我们主要看这个函数里面做了什么。
#### 1、创建并且初始化Job
创建Job对象并且将其初始化掉。但是不会启动当前作业。
- 初始化JobImpl对象。在JobImpl初始化的时候做了下面几件事
- 初始化线程池。
- 初始化作业状态机的核心代码如下:
```java
protected static final
StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
stateMachineFactory
= new StateMachineFactory<JobImpl, JobStateInternal, JobEventType, JobEvent>
(JobStateInternal.NEW)
// Transitions from NEW state
.addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
JobEventType.JOB_DIAGNOSTIC_UPDATE,
DIAGNOSTIC_UPDATE_TRANSITION)
.addTransition(JobStateInternal.NEW, JobStateInternal.NEW,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
// ....省略...
.addTransition(JobStateInternal.REBOOT, JobStateInternal.REBOOT,
JobEventType.JOB_COUNTER_UPDATE, COUNTER_UPDATE_TRANSITION)
// create the topology tables
.installTopology();
- 初始化其他配置。
- 在中央处理器里面注册JobFinishEvent类型事件以及事件处理的handler。
```java
protected Job createJob(Configuration conf, JobStateInternal forcedState,
String diagnostic) {
// create single job
Job newJob =
new JobImpl(jobId, appAttemptID, conf, dispatcher.getEventHandler(),
taskAttemptListener, jobTokenSecretManager, jobCredentials, clock,
completedTasksFromPreviousRun, metrics,
committer, newApiCommitter,
currentUser.getUserName(), appSubmitTime, amInfos, context,
forcedState, diagnostic);
((RunningAppContext) context).jobs.put(newJob.getID(), newJob);
dispatcher.register(JobFinishEvent.Type.class,
createJobFinishEventHandler());
return newJob;
}
```
#### 2、发送inited事件
发送inited事件的对象主要是下面两个
- 通过dispatcher给历史AM发送。
- 当前AM。代码如下
```java
// Send out an MR AM inited event for this AM.
dispatcher.getEventHandler().handle(
new JobHistoryEvent(job.getID(), new AMStartedEvent(amInfo
.getAppAttemptId(), amInfo.getStartTime(), amInfo.getContainerId(),
amInfo.getNodeManagerHost(), amInfo.getNodeManagerPort(), amInfo
.getNodeManagerHttpPort(), this.forcedState == null ? null
: this.forcedState.toString(), appSubmitTime)));
```
#### 3、创建job init事件并且处理
创建init事件核心代码如下
```java
JobEvent initJobEvent = new JobEvent(job.getID(), JobEventType.JOB_INIT);
jobEventDispatcher.handle(initJobEvent);
```
事件处理的核心类为InitTransition核心代码如下
```java
public JobStateInternal transition(JobImpl job, JobEvent event) {
job.metrics.submittedJob(job);
job.metrics.preparingJob(job);
// 初始化上下文。
if (job.newApiCommitter) {
job.jobContext = new JobContextImpl(job.conf,
job.oldJobId);
} else {
job.jobContext = new org.apache.hadoop.mapred.JobContextImpl(
job.conf, job.oldJobId);
}
try {
// 初始化token等信息。
setup(job);
job.fs = job.getFileSystem(job.conf);
//log to job history
JobSubmittedEvent jse = new JobSubmittedEvent(job.oldJobId,
job.conf.get(MRJobConfig.JOB_NAME, "test"),
job.conf.get(MRJobConfig.USER_NAME, "mapred"),
job.appSubmitTime,
job.remoteJobConfFile.toString(),
job.jobACLs, job.queueName,
job.conf.get(MRJobConfig.WORKFLOW_ID, ""),
job.conf.get(MRJobConfig.WORKFLOW_NAME, ""),
job.conf.get(MRJobConfig.WORKFLOW_NODE_NAME, ""),
getWorkflowAdjacencies(job.conf),
job.conf.get(MRJobConfig.WORKFLOW_TAGS, ""), job.conf);
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jse));
//TODO JH Verify jobACLs, UserName via UGI?
// 初始化并行度等信息。
TaskSplitMetaInfo[] taskSplitMetaInfo = createSplits(job, job.jobId);
job.numMapTasks = taskSplitMetaInfo.length;
job.numReduceTasks = job.conf.getInt(MRJobConfig.NUM_REDUCES, 0);
if (job.numMapTasks == 0 && job.numReduceTasks == 0) {
job.addDiagnostic("No of maps and reduces are 0 " + job.jobId);
} else if (job.numMapTasks == 0) {
job.reduceWeight = 0.9f;
} else if (job.numReduceTasks == 0) {
job.mapWeight = 0.9f;
} else {
job.mapWeight = job.reduceWeight = 0.45f;
}
checkTaskLimits();
// 加载其他参数,具体代码省略。。
cleanupSharedCacheUploadPolicies(job.conf);
// create the Tasks but don't start them yet 创建map task
createMapTasks(job, inputLength, taskSplitMetaInfo);
// 创建reduce tasks
createReduceTasks(job);
job.metrics.endPreparingJob(job);
return JobStateInternal.INITED;
} catch (Exception e) {
LOG.warn("Job init failed", e);
job.metrics.endPreparingJob(job);
job.addDiagnostic("Job init failed : "
+ StringUtils.stringifyException(e));
// Leave job in the NEW state. The MR AM will detect that the state is
// not INITED and send a JOB_INIT_FAILED event.
return JobStateInternal.NEW;
}
}
```
#### 4、检查初始化结果并且启动作业
当init成功时handler返回的结果是JobStateInternal.INITED如果是失败了则返回的结果是JobStateInternal.NEW。
对于初始化失败的作业会触发JobEventType.JOB_INIT_FAILED事件。
对于初始化成功的作业会调用函数startJobs继续启动作业。触发
```java
protected void startJobs() {
/** create a job-start event to get this ball rolling */
JobEvent startJobEvent = new JobStartEvent(job.getID(),
recoveredJobStartTime);
/** send the job-start event. this triggers the job execution. */
dispatcher.getEventHandler().handle(startJobEvent);
}
```
核心处理逻辑如下,主要是触发了几个事件:
- JobHistoryEvent事件处理的handler为JobHistoryEventHandler。
- JobInfoChangeEvent
- CommitterJobSetupEvent作业启动的事件核心处理逻辑在EventProcessor中的函数handleJobSetup中。
```java
public void transition(JobImpl job, JobEvent event) {
JobStartEvent jse = (JobStartEvent) event;
if (jse.getRecoveredJobStartTime() != -1L) {
job.startTime = jse.getRecoveredJobStartTime();
} else {
job.startTime = job.clock.getTime();
}
JobInitedEvent jie =
new JobInitedEvent(job.oldJobId,
job.startTime,
job.numMapTasks, job.numReduceTasks,
job.getState().toString(),
job.isUber());
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jie));
JobInfoChangeEvent jice = new JobInfoChangeEvent(job.oldJobId,
job.appSubmitTime, job.startTime);
job.eventHandler.handle(new JobHistoryEvent(job.jobId, jice));
job.metrics.runningJob(job);
job.eventHandler.handle(new CommitterJobSetupEvent(
job.jobId, job.jobContext));
}
```
handleJobSetup的核心处理逻辑
- 创建attempt路径。
- 触发JobSetupCompletedEvent事件。从事件实现来看会触发JobImpl里面的JOB_SETUP_COMPLETED事件类型由SetupCompletedTransition来处理当前事件。在当前函数里面会触发JOB_COMPLETED事件。最终会走到JobImpl的checkReadyForCommit函数里面。
```java
protected void handleJobSetup(CommitterJobSetupEvent event) {
try {
// 主要是创建attempt路径
committer.setupJob(event.getJobContext());
context.getEventHandler().handle(
new JobSetupCompletedEvent(event.getJobID()));
} catch (Exception e) {
LOG.warn("Job setup failed", e);
context.getEventHandler().handle(new JobSetupFailedEvent(
event.getJobID(), StringUtils.stringifyException(e)));
}
}
```
SetupCompletedTransition的处理逻辑如下可以看到会定时启动MapTask和ReduceTask。
```java
public void transition(JobImpl job, JobEvent event) {
job.setupProgress = 1.0f;
job.scheduleTasks(job.mapTasks, job.numReduceTasks == 0);
job.scheduleTasks(job.reduceTasks, true);
// If we have no tasks, just transition to job completed
if (job.numReduceTasks == 0 && job.numMapTasks == 0) {
job.eventHandler.handle(new JobEvent(job.jobId,
JobEventType.JOB_COMPLETED));
}
}
```
checkReadyForCommit函数的实现如下可以看到在触发了CommitterJobCommitEvent事件,在CommitterJobCommitEvent里面会触发JOB_COMMIT事件。主要处理逻辑在handleJobCommit里面。
```java
protected JobStateInternal checkReadyForCommit() {
JobStateInternal currentState = getInternalState();
if (completedTaskCount == tasks.size()
&& currentState == JobStateInternal.RUNNING) {
eventHandler.handle(new CommitterJobCommitEvent(jobId, getJobContext()));
return JobStateInternal.COMMITTING;
}
// return the current state as job not ready to commit yet
return getInternalState();
}
```
handleJobCommit处理逻辑如下
```java
protected void handleJobCommit(CommitterJobCommitEvent event) {
boolean commitJobIsRepeatable = false;
try {
// 检查作业是否重复。
commitJobIsRepeatable = committer.isCommitJobRepeatable(
event.getJobContext());
} catch (IOException e) {
LOG.warn("Exception in committer.isCommitJobRepeatable():", e);
}
try {
// 创建文件:/tmp/hadoop-yarn/staging//user/.staging/{jobid}/COMMIT_STARTED
touchz(startCommitFile, commitJobIsRepeatable);
jobCommitStarted();
// 检查和RM的心跳。
waitForValidCommitWindow();
// 提交作业核心处理函数在commitJobInternal里面
committer.commitJob(event.getJobContext());
// 创建文件:/tmp/hadoop-yarn/staging//user/.staging/{jobid}/COMMIT_SUCCESS
touchz(endCommitSuccessFile, commitJobIsRepeatable);
context.getEventHandler().handle(
new JobCommitCompletedEvent(event.getJobID()));
} catch (Exception e) {
LOG.error("Could not commit job", e);
try {
// 失败之后创建:/tmp/hadoop-yarn/staging//user/.staging/{jobid}/COMMIT_FAIL
touchz(endCommitFailureFile, commitJobIsRepeatable);
} catch (Exception e2) {
LOG.error("could not create failure file.", e2);
}
context.getEventHandler().handle(
new JobCommitFailedEvent(event.getJobID(),
StringUtils.stringifyException(e)));
} finally {
jobCommitEnded();
}
}
```
##### CommitSucceededTransition
提交成功的事件处理handler为CommitSucceededTransition核心处理逻辑如下
```java
job.logJobHistoryFinishedEvent();
job.finished(JobStateInternal.SUCCEEDED);
```

124
yarn/resourcemanager.md Normal file
View File

@ -0,0 +1,124 @@
# 简介
ResourceManager(RM)RM是全局的资源管理器负责整个系统的资源管理和分配。主要由以下两部分组成
- 调度器:根据容量、队列限制条件将系统资源分配给各个应用。
- 资源分配的单位是containercontainer是一个动态资源单位它将内存、CPU、磁盘、网络等资源封装在一起从而限定了资源使用量。
- 调度器是一个可插拔的组件用户可以自己定制也可以选择Fair或Capacity调度器.
- 应用程序管理器:负责管理所有应用程序的以下内容:
- 应用提交
- 与调度器协商资源以启动AM.
- 监控AM运行状态并在失败时重启它
# RM内部架构
- 交互模块RM对普通用户、管理员、Web提供了三种对外服务
- ClientRMService:为普通用户提供服务它处理来自客户端的各种RPC比如:
- 应用提交
- 终止应用
- 获取应用状态等
- AdminService:为管理员提供的独立接口,主要目的是为了防止大量普通用户请求阻塞管理员通道,提供如下功能:
- 动态更新节点列表
- 更新ACL列表
- 更新队列信息等
- WebApp:提供一个Web界面来让用户更友好的获知集群和应用的状态
- NM管理模块用来管理NM的模块主要包含以下三个组件
- ResourceTrackerService:处理来自NodeManager的请求主要包括
- 注册注册是NM启动时发生的行为NM提供的信息包括
- 节点ID、可用资源上限信息等.
- 心跳:心跳是周期行为
- NM提供的信息包括
- 各个Container运行状态、运行的Application列表、节点健康状态等.
- RM返回的信息包括
- 等待释放的Container列表、Application列表等.
- NMLivelinessMonitor:监控NM是否活着如果NM在一定时间(默认10m)内未上报心跳,则认为它死掉,需要移除.
- NodesListManager:维护正常节点和异常节点列表管理exclude(类似黑名单)和include(类似白名单)节点列表,
这两个列表均是在配置文件中设置的,可以动态加载。
- AM管理模块主要是用来管理所有AM主要包括
- ApplicationMasterService(AMS):处理来自AM的请求包括
- 注册是AM启动时发生的行为信息包括
- AM的启动节点、对外RPC端口、tracking URL等.
- 心跳:是周期行为
- AM提供的信息包括所需资源的描述、待释放Container列表、黑名单列表等.
- AMS返回的信息包括新分配的Container、失败的Container、待抢占的Container列表等
- AMLivelinessMonitor:监控AM是否活着如果AM在一定时间(默认10m)内未上报心路,
则认为它死掉它上面正在运行的Container将会被置为失败状态而AM本身会被分配到另一个节点上(用户可以指定重试次数默认5)
- ApplicationMasterLauncher与某个NM通信要求它为某个应用程序启动AM.
- 应用管理模块:主要是各个应用外围的管理,并不涉及到应用内部
- ApplicationACLsManager:管理应用程序访问权限,包含两部分:
- 查看权限:主要用于查看应用程序基本信息
- 修改权限:主要用于修改应用程序优先级、杀死应用程序等
- RMAppManager:管理应用程序的启动和关闭.
- ContainerAllocationExpirer:当AM收到RM新分配的Container后必须在一定时间(默认10m)内在对应的NM上启动该Container
否则RM将强制回收该Container而一个已经分配的Container是否该被回收则是由ContainerAllocationExpirer决定和执行的
- 状态机管理模块RM使用有限状态机维护有状态对象的生命周期状态机的引入使得Yarn的架构设计清晰RM内部的状态机有
- RMApp:维护一个应用程序的整个运行周期,包括从启动到运行结束的整个过程
- 由于一个APP的生命周期可能会启动多个运行实例(Attempt)RMApp维护的是所有的这些Attempt
- RMAppAttempt:一次应用程序的运行实例的整个生命周期可以理解为APP的一次尝试运行
- RMContainer:一个Container的运行周期包括从创建到运行结束的整个过程。
- RM将资源封装成Container发送给应用程序的AMAM在Container描述的运行环境中启动任务
- Yarn不支持Container重用一个Container用完后会立刻释放
- RMNode:维护了一个NM的生命周期包括从启动到运行结束的整个过程
- 安全模块RM自带了非常全面的权限管理机制主要包括
- ClientToAMSecretManager
- ContainerTokenSecretManager
- ApplicationTokenSecretManager
- 调度模块主要包含一个组件ResourceScheduler。
- 资源调度器,它按照一定的约束条件(比如队列容量限制等)将集群中的资源分配给各个应用程序目前主要考虑内存和CPU。
- ResourceScheduler是一个可插拔式的模块自带三个调度器用户可以自己定制。
- FIFO先进先出单用户。
- Fair Scheduler:公平调度器(FairScheduler基本上具备其它两种的所有功能)
- Capacity Scheduler:容量调度器
# RM事件与事件处理器
Yarn采用了事件驱动机制而RM是的实现则是最好的例证。所有服务和组件均是通过中央异步调度器组织在一起的
不同组件之间通过事件交互,从而实现了一个异步并行的高效系统。
## 服务
|组件名称 | 输出事件类型| 用途 |
|-----|------|-------|
| ClientRMService | RMAppAttemptEvent <br> RMAppEvent <br> RMNodeEvent | |
| NMLivelinessMonitor | RMNodeEvent | |
| ResourceTrackerService | RMNodeEvent <br> RMAppAttemptEvent | |
| AMLivelinessMonitor | RMAppAttemptEvent | |
| ContainerAllocationExpirer | SchedulerEvent | |
## 事件处理器
|组件名称 | 处理的事件类型 | 输出事件类型 | 用途 |
|-----|------|-------|-------|
| ApplicationMasterLauncher | AMLauncherEvent | - | |
| RMAppManager | RMAppManagerEvent | RMAppEvent | |
| NodesListManager | NodesListManagerEvent | RMNodeEvent <br> RMAppEvent | |
| RMApp | RMAppEvent | RMAppAttemptEvent <br> RMNodeEvent <br> SchedulerEvent <br> RMAppManagerEvent | |
| RMAppAttempt | RMAppAttemptEvent | SchedulerEvent <br> RMAppAttemptEvent <br> RMAppEvent <br> AMLauncherEvent <br> RMNodeEvent | |
| RMNode | RMNodeEvent | RMAppEvent <br> SchedulerEvent <br> NodesListManagerEvent <br> RMNodeEvent | |
| ResourceScheduler | SchedulerEvent | RMAppEvent <br> RMAppAttemptEvent | |
| RMContainer | RMContainerEvent | RMAppEvent <br> RMAppAttemptEvent <br> RMNodeEvent | |
### 事件处理器实现类
- RMApp 实现类:
- ApplicationEventDispatcher
- RMAppImpl
- RMAppAttempt 实现类
- ApplicationAttemptEventDispatcher
- RMAppAttemptImpl
- RMNode实现类
- NodeEventDispatcher
- RMNodeImpl
- ResourceScheduler实现类
- EventDispatcher
- FairScheduler
- RMContainer实现类
- RMContainerImpl

304
yarn/yarn_event.md Normal file
View File

@ -0,0 +1,304 @@
# 简介
Yarn采用了基于事件驱动的并发模型
- 所有状态机都实现了EventHandler接口很多服务类名通常带有Service后缀也实现了该接口它们都是事件处理器。
- 需要异步处理的事件由中央异步调度器类名通常带有Dispatcher后缀统一接收/派发,需要同步处理的事件直接交给相应的事件处理器。
![pic](https://pan.zeekling.cn/zeekling/hadoop/event/state_event_001.png)
某些事件处理器不仅处理事件,也会向中央异步调度器发送事件。
# 事件处理器定义
事件处理器定义如下:
```java
@SuppressWarnings("rawtypes")
@Public
@Evolving
public interface EventHandler<T extends Event> {
void handle(T event);
}
```
只有一个handler函数如参是事件
# 中央处理器AsyncDispatcher
AsyncDispatcher 实现了接口DispatcherDispatcher中定义了事件Dispatcher的接口。主要提供两个功能
- 注册不同类型的事件,主要包含事件类型和事件处理器。
- 获取事件处理器用来派发事件等待异步执行真正的EventHandler。
```java
@Public
@Evolving
public interface Dispatcher {
EventHandler<Event> getEventHandler();
void register(Class<? extends Enum> eventType, EventHandler handler);
}
```
AsyncDispatcher实现了Dispatcher接口也扩展了AbstractService表明AsyncDispatcher也是一个服务
是一个典型的生产者消费这模型。
```java
public class AsyncDispatcher extends AbstractService implements Dispatcher {
...
}
```
# 事件处理器的注册
事件注册就是将事件写入到eventDispatchers里面eventDispatchers的定义`Map<Class<? extends Enum>, EventHandler> eventDispatchers`键是事件类型value是事件的处理器。
对于同一事件类型注册多次handler处理函数时将使用MultiListenerHandler代替MultiListenerHandler里面保存了多个handler调用handler函数时会依次调用每个handler。
```java
public void register(Class<? extends Enum> eventType,
EventHandler handler) {
/* check to see if we have a listener registered */
EventHandler<Event> registeredHandler = (EventHandler<Event>) eventDispatchers.get(eventType);
LOG.info("Registering " + eventType + " for " + handler.getClass());
if (registeredHandler == null) {
eventDispatchers.put(eventType, handler);
} else if (!(registeredHandler instanceof MultiListenerHandler)){
/* for multiple listeners of an event add the multiple listener handler */
MultiListenerHandler multiHandler = new MultiListenerHandler();
multiHandler.addHandler(registeredHandler);
multiHandler.addHandler(handler);
eventDispatchers.put(eventType, multiHandler);
} else {
/* already a multilistener, just add to it */
MultiListenerHandler multiHandler
= (MultiListenerHandler) registeredHandler;
multiHandler.addHandler(handler);
}
}
```
# 事件处理
AsyncDispatcher#getEventHandler()是异步派发的关键:
```java
private final EventHandler<Event> handlerInstance = new GenericEventHandler();
// 省略.....
@Override
public EventHandler<Event> getEventHandler() {
return handlerInstance;
}
```
## GenericEventHandler一个特殊的事件处理器
GenericEventHandler是一个特殊的事件处理器用于接受各种事件。由指定线程处理接收到的事件。
```java
public void handle(Event event) {
if (blockNewEvents) {
return;
}
drained = false;
/* all this method does is enqueue all the events onto the queue */
int qSize = eventQueue.size();
if (qSize != 0 && qSize % 1000 == 0
&& lastEventQueueSizeLogged != qSize) {
lastEventQueueSizeLogged = qSize;
LOG.info("Size of event-queue is " + qSize);
}
if (qSize != 0 && qSize % detailsInterval == 0
&& lastEventDetailsQueueSizeLogged != qSize) {
lastEventDetailsQueueSizeLogged = qSize;
printEventQueueDetails();
printTrigger = true;
}
int remCapacity = eventQueue.remainingCapacity();
if (remCapacity < 1000) {
LOG.warn("Very low remaining capacity in the event-queue: "
+ remCapacity);
}
try {
eventQueue.put(event);
} catch (InterruptedException e) {
if (!stopped) {
LOG.warn("AsyncDispatcher thread interrupted", e);
}
// Need to reset drained flag to true if event queue is empty,
// otherwise dispatcher will hang on stop.
drained = eventQueue.isEmpty();
throw new YarnRuntimeException(e);
}
};
```
- blockNewEvents: 是否阻塞事件处理,只有当中央处理器停止之后才会停止接受事件。
- eventQueue将接收到的请求放置到当前阻塞队列里面。方便指定线程及时处理。
## 事件处理线程
在服务启动时serviceStart函数创建一个线程会循环处理接受到的事件。核心处理逻辑在函数dispatch里面。
```java
Runnable createThread() {
return new Runnable() {
@Override
public void run() {
while (!stopped && !Thread.currentThread().isInterrupted()) {
drained = eventQueue.isEmpty();
// 省略。。。
Event event;
try {
event = eventQueue.take();
} catch(InterruptedException ie) {
if (!stopped) {
LOG.warn("AsyncDispatcher thread interrupted", ie);
}
return;
}
if (event != null) {
// 省略。。。
dispatch(event);
// 省略。。。
}
}
}
};
}
```
### dispatch详解
- 从已经注册的eventDispatchers列表里面查找当前事件对应的处理器调用当前处理器的handler函数。
- 如果当前handler处理出现异常时默认会退出RM。
```java
protected void dispatch(Event event) {
//all events go thru this loop
LOG.debug("Dispatching the event {}.{}", event.getClass().getName(),
event);
Class<? extends Enum> type = event.getType().getDeclaringClass();
try{
EventHandler handler = eventDispatchers.get(type);
if(handler != null) {
handler.handle(event);
} else {
throw new Exception("No handler for registered for " + type);
}
} catch (Throwable t) {
//TODO Maybe log the state of the queue
LOG.error(FATAL, "Error in dispatcher thread", t);
// If serviceStop is called, we should exit this thread gracefully.
if (exitOnDispatchException
&& (ShutdownHookManager.get().isShutdownInProgress()) == false
&& stopped == false) {
stopped = true;
Thread shutDownThread = new Thread(createShutDownThread());
shutDownThread.setName("AsyncDispatcher ShutDown handler");
shutDownThread.start();
}
}
}
```
# 状态机
状态转换由成员变量StateMachine管理所有的StateMachine都由StateMachineFactory进行管理。由addTransition函数实现状态机。
```java
private static final StateMachineFactory<RMAppImpl,
RMAppState,
RMAppEventType,
RMAppEvent> stateMachineFactory
= new StateMachineFactory<RMAppImpl,
RMAppState,
RMAppEventType,
RMAppEvent>(RMAppState.NEW)
// Transitions from NEW state
.addTransition(RMAppState.NEW, RMAppState.NEW,
RMAppEventType.NODE_UPDATE, new RMAppNodeUpdateTransition())
.addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
RMAppEventType.START, new RMAppNewlySavingTransition())
.addTransition(RMAppState.NEW, EnumSet.of(RMAppState.SUBMITTED,
RMAppState.ACCEPTED, RMAppState.FINISHED, RMAppState.FAILED,
RMAppState.KILLED, RMAppState.FINAL_SAVING),
RMAppEventType.RECOVER, new RMAppRecoveredTransition())
.addTransition(RMAppState.NEW, RMAppState.KILLED, RMAppEventType.KILL,
new AppKilledTransition())
.addTransition(RMAppState.NEW, RMAppState.FINAL_SAVING,
RMAppEventType.APP_REJECTED,
new FinalSavingTransition(new AppRejectedTransition(),
RMAppState.FAILED))
.addTransition(
RMAppState.KILLED,
RMAppState.KILLED,
EnumSet.of(RMAppEventType.APP_ACCEPTED,
RMAppEventType.APP_REJECTED, RMAppEventType.KILL,
RMAppEventType.ATTEMPT_FINISHED, RMAppEventType.ATTEMPT_FAILED,
RMAppEventType.NODE_UPDATE, RMAppEventType.START))
.installTopology();
```
Transition定义了“从一个状态转换到另一个状态”的行为由转换操作、开始状态、事件类型、事件组成
```java
public interface StateMachine
<STATE extends Enum<STATE>,
EVENTTYPE extends Enum<EVENTTYPE>, EVENT> {
public STATE getCurrentState();
public STATE getPreviousState();
public STATE doTransition(EVENTTYPE eventType, EVENT event)
throws InvalidStateTransitionException;
}
```
## ResourceManager中状态机
- RMApp用于维护一个Application的生命周期实现类 - RMAppImpl
- RMAppAttempt用于维护一次试探运行的生命周期实现类 - RMAppAttemptImpl
- RMContainer用于维护一个已分配的资源最小单位Container的生命周期实现类 - RMContainerImpl
- RMNode用于维护一个NodeManager的生命周期实现类 - RMNodeImpl
NodeManager中状态机
- Application用于维护节点上一个Application的生命周期实现类 - ApplicationImpl
- Container用于维护节点上一个容器的生命周期实现类 - ContainerImpl
- LocalizedResource用于维护节点上资源本地化的生命周期没有使用接口即实现类 - LocalizedResource

29
yarn/yarn_event_detail.md Normal file
View File

@ -0,0 +1,29 @@
# 简介
Yarn状态机的基础部分参见[Yarn 状态机以及事件机制](./yarn_event.md)
本章主要将Yarn当中详细的事件以及处理过程。
AsyncDispatcher是中央处理器的核心线程。通过使用AsyncDispatcher的对象可以分析Yarn里面有多少个中央处理器每个处理器都由什么用途。
## Component dispatcher
当前的处理器注册了下面几个事件:
- ServiceEventHandler:
- ComponentEventHandler:
- ComponentInstanceEventHandler:
```java
dispatcher.register(ServiceEventType.class, new ServiceEventHandler());
dispatcher.register(ComponentEventType.class, new ComponentEventHandler());
dispatcher.register(ComponentInstanceEventType.class,
new ComponentInstanceEventHandler());
```