flink_book/source_code/作业提交源码分析.md

80 lines
4.1 KiB
Markdown
Raw Normal View History

2023-02-21 14:59:21 +00:00
# 作业提交流程
## 作业提交脚本
### flink 脚本
- 从`flink-conf.yml`获取Flink运行的环境信息
- `taskmanager.compute.numa`:
- `env.pid.dir`: Flink pid保存路径
- `env.log.dir`: Flink日志保存路径
- `env.log.max`: Flink日志归档数目
- `env.yarn.conf.dir`: Yarn配置文件所在目录
- `env.hadoop.conf.dir`: Hadoop配置文件路径
- `env.hbase.conf.dir`: Hbase配置文件路径
- `env.java.home`: Java_home
- `env.java.opts.all`: Java 参数
- `env.java.opts.jobmanager` JobManager JVM相关参数
- `env.java.opts.taskmanager`TaskManager JVM相关参数
- `env.java.opts.historyserver` HistoryServer JVM参数
- `env.java.opts.client`Flink 客户端 JVM参数
- `env.ssh.opts` 定义Flink PID文件目录的配置参数启动或停止JobManager时传递给SSH客户端的其他命令行选项。
- `high-availability.type` Flink运行模式
- `zookeeper.heap.mb`
- 启动类:`CliFrontend`
## 作业提交主函数
作业提交的入口函数为CliFrontend处理流程如下
- 查找配置文件夹路径
- 判断环境变量`FLINK_CONF_DIR`所在的文件夹是否存在。
- 如果`FLINK_CONF_DIR`所在的文件夹不存在则,判断脚本所在的相对路径`../conf`是否存在。
- 如果上述路径不存在,则判断相对路径`conf`是否存在。
- 加载全局配置
- 从配置文件夹路径下面的`flink-conf.yaml`文件当中读取配置信息
- 加载命令行参数
- 启动Flink任务提交
- `run`: 使用除了application之外的其他模式提交作业。
- 参数解析:
- 解析主函数,参数名称:`-c`、`--class`。
- 解析运行jar文件参数名称:`-j`、`--jarfile`。
- 解析jar文件运行参数参数名称`-a`、`--arguments`。
- 解析classpath信息参数名称`-C`、`--classpath`。
- 解析并发度,参数名称:`-p`、`--parallelism`。
- 解析运行模式,是否在后台运行,参数名称:`-d`、`--detached`。
- 解析savepoint参数参数名称`-s`、`--fromSavepoint`传了当前参数表示需要从指定的savepoint恢复作业。
- 尝试加载flink 作业jar包并且判断依赖。判断主函数。
- 运行程序。
- 设置类加载器。
- ContextEnvironment.setAsContext初始化远程客户端执行环境
- StreamContextEnvironment.setAsContextExecutionEnvironment是执行程序的上下文。提供了控制作业执行
如设置并行度)和与外部世界交互(数据访问)的方法。
- 调用主函数。
- `run-application`: 使用application模式提交作业。
- 参数解析:
- 解析savepoint参数参数名称`-s`、`--fromSavepoint`传了当前参数表示需要从指定的savepoint恢复作业。
- 参数名称:`-n`、`--allowNonRestoredState`表示是否允许跳过不可以restore的savepoint状态。
- 参数名称:`-rm`、`--restoreMode`表示定义如何从给定的保存点恢复支持claim、subsumed。
- 解析主函数,参数名称:`-c`、`--class`。
- 解析运行jar文件参数名称:`-j`、`--jarfile`。
- 解析classpath信息参数名称`-C`、`--classpath`。
- 解析并发度,参数名称:`-p`、`--parallelism`。
- 解析并发度,参数名称:`-p`、`--parallelism`。
- 解析Flink jar包。
- 启动Flink作业,调用`ApplicationDeployer.run()`,具体实现类为:`ApplicationClusterDeployer`
- 初始化clientFactory根据提供的配置查找适当的ClusterClientFactory。
- 执行deployApplicationCluster触发应用程序集群的部署。支持yarn、Kubernetes、Standalone三种模式。
- `info`: 使用info [OPTIONS] <jar-file> <arguments>主要查看class信息。
- `list`: 显示正在运行或调度的程序。
- `cancel`: 取消正在运行的作业。
- `stop`: 停止正在运行的作业。
- `savepoint`: 触发一个正在运行的应用生成savepoint
- `-h`、`--help`: 帮助信息。
- `-v`、`--version` 显示版本信息。