flink_book/source_code/作业提交源码分析.md

4.1 KiB
Raw Blame History

作业提交流程

作业提交脚本

  • flink-conf.yml获取Flink运行的环境信息
    • taskmanager.compute.numa:
    • env.pid.dir: Flink pid保存路径
    • env.log.dir: Flink日志保存路径
    • env.log.max: Flink日志归档数目
    • env.yarn.conf.dir: Yarn配置文件所在目录
    • env.hadoop.conf.dir: Hadoop配置文件路径
    • env.hbase.conf.dir: Hbase配置文件路径
    • env.java.home: Java_home
    • env.java.opts.all: Java 参数
    • env.java.opts.jobmanager JobManager JVM相关参数
    • env.java.opts.taskmanagerTaskManager JVM相关参数
    • env.java.opts.historyserver HistoryServer JVM参数
    • env.java.opts.clientFlink 客户端 JVM参数
    • env.ssh.opts 定义Flink PID文件目录的配置参数启动或停止JobManager时传递给SSH客户端的其他命令行选项。
    • high-availability.type Flink运行模式
    • zookeeper.heap.mb
  • 启动类:CliFrontend

作业提交主函数

作业提交的入口函数为CliFrontend处理流程如下

  • 查找配置文件夹路径
    • 判断环境变量FLINK_CONF_DIR所在的文件夹是否存在。
    • 如果FLINK_CONF_DIR所在的文件夹不存在则,判断脚本所在的相对路径../conf是否存在。
    • 如果上述路径不存在,则判断相对路径conf是否存在。
  • 加载全局配置
    • 从配置文件夹路径下面的flink-conf.yaml文件当中读取配置信息
  • 加载命令行参数
  • 启动Flink任务提交
    • run: 使用除了application之外的其他模式提交作业。
      • 参数解析:
        • 解析主函数,参数名称:-c--class
        • 解析运行jar文件参数名称:-j--jarfile
        • 解析jar文件运行参数参数名称-a--arguments
        • 解析classpath信息参数名称-C--classpath
        • 解析并发度,参数名称:-p--parallelism
        • 解析运行模式,是否在后台运行,参数名称:-d--detached
        • 解析savepoint参数参数名称-s--fromSavepoint传了当前参数表示需要从指定的savepoint恢复作业。
      • 尝试加载flink 作业jar包并且判断依赖。判断主函数。
      • 运行程序。
        • 设置类加载器。
        • ContextEnvironment.setAsContext初始化远程客户端执行环境
        • StreamContextEnvironment.setAsContextExecutionEnvironment是执行程序的上下文。提供了控制作业执行例 如设置并行度)和与外部世界交互(数据访问)的方法。
        • 调用主函数。
    • run-application: 使用application模式提交作业。
      • 参数解析:
        • 解析savepoint参数参数名称-s--fromSavepoint传了当前参数表示需要从指定的savepoint恢复作业。
        • 参数名称:-n--allowNonRestoredState表示是否允许跳过不可以restore的savepoint状态。
        • 参数名称:-rm--restoreMode表示定义如何从给定的保存点恢复支持claim、subsumed。
        • 解析主函数,参数名称:-c--class
        • 解析运行jar文件参数名称:-j--jarfile
        • 解析classpath信息参数名称-C--classpath
        • 解析并发度,参数名称:-p--parallelism
        • 解析并发度,参数名称:-p--parallelism
      • 解析Flink jar包。
      • 启动Flink作业,调用ApplicationDeployer.run(),具体实现类为:ApplicationClusterDeployer
        • 初始化clientFactory根据提供的配置查找适当的ClusterClientFactory。
        • 执行deployApplicationCluster触发应用程序集群的部署。支持yarn、Kubernetes、Standalone三种模式。
    • info: 使用info [OPTIONS] 主要查看class信息。
    • list: 显示正在运行或调度的程序。
    • cancel: 取消正在运行的作业。
    • stop: 停止正在运行的作业。
    • savepoint: 触发一个正在运行的应用生成savepoint
    • -h--help: 帮助信息。
    • -v--version 显示版本信息。