格式调整优化 #23

Merged
zeekling merged 4 commits from basic_001 into main 2023-07-17 15:23:30 +00:00
4 changed files with 36 additions and 22 deletions

View File

@ -41,7 +41,7 @@ Flink SQL学习笔记提纲。持续更新。Hive SQL 离线Join VS Flink SQL
# 基础知识 # 基础知识
## 目录 ## FLink基础知识
- [Flink CEP](./basic/CEP.md) - [Flink CEP](./basic/CEP.md)
- [旁路输出](./basic/旁路输出.md) - [旁路输出](./basic/旁路输出.md)
@ -49,6 +49,10 @@ Flink SQL学习笔记提纲。持续更新。Hive SQL 离线Join VS Flink SQL
- [slot相关](./basic/slot相关.md) - [slot相关](./basic/slot相关.md)
## Flink On Hudi
- [Flink On Hudi 简介](./hudi/README.md)
# Flink 源码 # Flink 源码

View File

@ -1,7 +1,12 @@
## 目录
- [旁路输出](./旁路输出.md) - [旁路输出](./旁路输出.md)
- [Flink CEP](./CEP.md) - [Flink CEP](./CEP.md)
- [Flink Operator Chain](./Flink_Operator_chain.md) - [Flink Operator Chain](./Flink_Operator_chain.md)
- [slot相关](./slot相关.md) - [slot相关](./slot相关.md)
- [Flink基本架构](./Flink基本架构.md)
- [旁路输出](./旁路输出.md)

4
hudi/README.md Normal file
View File

@ -0,0 +1,4 @@
# Hudi 相关知识学习

View File

@ -1,9 +1,7 @@
<a title="Hits" target="_blank" href="https://github.com/zeekling/hits"><img src="https://hits.b3log.org/zeekling/flink_book.svg"></a> # 1. 内存设置
# 内存设置 ## 1.1 TaskManager 内存模型
## TaskManager 内存模型
TaskManager的内存模型如下图所示(1.10之后版本内存模型) TaskManager的内存模型如下图所示(1.10之后版本内存模型)
@ -18,7 +16,7 @@ Flink使用了堆上内存和堆外内存。
框架堆外内存、Task堆外内存、网络缓冲内存都在堆外的直接内存里面。 框架堆外内存、Task堆外内存、网络缓冲内存都在堆外的直接内存里面。
- 管理内存Flink堆外内存的管理用于管理排序hash表缓冲中间结果以及RocksDb 状态后端的本地内存。 - 管理内存Flink堆外内存的管理用于管理排序hash表缓冲中间结果以及RocksDb 状态后端的本地内存。
- JVM特有内存JVM本身占用的内存包括元数据和执行开销 - JVM特有内存JVM本身占用的内存包括元数据和执行开销
Flink 使用内存 = 框架堆内和堆外内存 + Task堆内和堆外内存 + 网络缓冲内存 + 管理内存。 Flink 使用内存 = 框架堆内和堆外内存 + Task堆内和堆外内存 + 网络缓冲内存 + 管理内存。
@ -26,7 +24,7 @@ Flink 使用内存 = 框架堆内和堆外内存 + Task堆内和堆外内存 +
进程内存 - Flink 内存 + JVM特有内存 进程内存 - Flink 内存 + JVM特有内存
### JVM特有内存详解 ### 1.1.1 JVM特有内存详解
JVM特定内存 JVM本身使用的内存包含JVM的metaspace和over-head JVM特定内存 JVM本身使用的内存包含JVM的metaspace和over-head
@ -39,7 +37,7 @@ JVM特定内存 JVM本身使用的内存包含JVM的metaspace和over-head
` 总进程内存*fraction `,如果小于配置的min或者大于配置的max大小则使用min/max ` 总进程内存*fraction `,如果小于配置的min或者大于配置的max大小则使用min/max
### 框架内存 ### 1.1.2 框架内存
Flink框架即TaskManager本身占用的内存不计入Slot的资源中。 Flink框架即TaskManager本身占用的内存不计入Slot的资源中。
堆内:`taskmanager.memory.framework.heap.size` 默认128mb。 堆内:`taskmanager.memory.framework.heap.size` 默认128mb。
@ -47,7 +45,7 @@ Flink框架即TaskManager本身占用的内存不计入Slot的资源中。
堆外:`taskmanager.memory.framework.off-heap.size`默认128mb。 堆外:`taskmanager.memory.framework.off-heap.size`默认128mb。
### Task内存 ### 1.1.3 TaskManager内存
Task执行用户代码所使用的内存。 Task执行用户代码所使用的内存。
堆内:`taskmanager.memory,task,heap.size`默认none由Flink内存扣除掉其他部分内存得到。 堆内:`taskmanager.memory,task,heap.size`默认none由Flink内存扣除掉其他部分内存得到。
@ -55,7 +53,7 @@ Task执行用户代码所使用的内存。
堆外:`taskmanager.memory,task.off-heap.size`默认为0表示不适用堆外内存。 堆外:`taskmanager.memory,task.off-heap.size`默认为0表示不适用堆外内存。
### 网络内存 ### 1.1.4 网络内存
网络数据交换所使用的堆外内存大小,如网络数据交换缓冲区。 网络数据交换所使用的堆外内存大小,如网络数据交换缓冲区。
堆外:`taskmanager.memory.network.fraction`默认0.1。 堆外:`taskmanager.memory.network.fraction`默认0.1。
@ -66,11 +64,12 @@ Task执行用户代码所使用的内存。
` Flink内存*fraction `,如果小于配置的min或者大于配置的max大小则使用min/max ` Flink内存*fraction `,如果小于配置的min或者大于配置的max大小则使用min/max
### 托管内存 ### 1.1.5 管理内存
用于RocksDB 状态后端的本地内存和批的排序、hash、缓冲中间结果。 用于RocksDB 状态后端的本地内存和批的排序、hash、缓冲中间结果。
堆外: ***堆外***
`taskmanager.memory.managed.fraction`,默认0.4。 `taskmanager.memory.managed.fraction`,默认0.4。
`taskmanager.memory.managed.size` 默认为none。 `taskmanager.memory.managed.size` 默认为none。
@ -85,34 +84,36 @@ Task执行用户代码所使用的内存。
- 单个TaskManager内存大小为2-8G之间。 - 单个TaskManager内存大小为2-8G之间。
# 并行度设置 # 2. 并行度设置
并行度的设置和具体的作业强关联。 并行度的设置和具体的作业强关联。
## 全局并行度
## 2.1 并行度设置
### 并行度设置: - 2.1.1 **flink-conf.yml设置**
1. flink-conf.yml 设置
在我们提交一个Job的时候如果没有考虑并行度的话那么Flink会使用默认配置文件中的并行度。配置如下 在我们提交一个Job的时候如果没有考虑并行度的话那么Flink会使用默认配置文件中的并行度。配置如下
```conf ```conf
parallelism.default: 5 parallelism.default: 5
``` ```
2. env级别 - 2.1.2 **env级别**
env的级别就是Environment级别。也就是通过Execution Environment来设置整体的Job并行度。
env的级别就是`Environment` 级别。也就是通过`ExecutionEnvironment` 来设置整体的Job并行度。
```java ```java
val env = Stream... val env = Stream...
env.setParallelism(5); env.setParallelism(5);
``` ```
3. 客户端级别 - 2.1.3 **客户端级别**
如果在执行Job时候发现代码中没有设置并行度而又不修改配置文件的话可以通过Client来设置Job的并行度。 如果在执行Job时候发现代码中没有设置并行度而又不修改配置文件的话可以通过Client来设置Job的并行度。
```bash ```bash
./bin/flink run -p 5 ../wordCount-java*.jar ./bin/flink run -p 5 ../wordCount-java*.jar
``` ```
-p 即设置WordCount的Job并行度为5。 -p 即设置WordCount的Job并行度为5。
4. 算子级别 - 2.1.4 **算子级别**
我们在编写Flink项目时,可能对于不同的Operator设置不同的并行度例如为了实现读取Kafka的最高效 我们在编写Flink项目时,可能对于不同的Operator设置不同的并行度例如为了实现读取Kafka的最高效
读取需要参考Kafka的partition的数量对并行度进行设置在Sink时需要对于Sink的介质设置不同的并行 读取需要参考Kafka的partition的数量对并行度进行设置在Sink时需要对于Sink的介质设置不同的并行
度。这样就会存在一个Job需要有多个并行度。这样就需要用到算子级别的并行度设置: 度。这样就会存在一个Job需要有多个并行度。这样就需要用到算子级别的并行度设置:
@ -125,7 +126,7 @@ text.keyBy(XXX)
.addSink(XXXXX).setParallelism(1) //写入数据库时候设置为1 .addSink(XXXXX).setParallelism(1) //写入数据库时候设置为1
``` ```
从优先级上来看: 算子级别 > env级别 > Client级别 > 系统默认级别 从优先级上来看: **算子级别 > env级别 > Client级别 > 系统默认级别**
并行度的高级别会覆盖低级别的配置。例如在算子中设置的策略会覆盖配置文件中的parallelism。 并行度的高级别会覆盖低级别的配置。例如在算子中设置的策略会覆盖配置文件中的parallelism。