From 9cd725ea3acaec2e618f1d3887ed4587c681a02e Mon Sep 17 00:00:00 2001 From: zeekling Date: Sat, 1 Jul 2023 22:32:35 +0800 Subject: [PATCH 1/4] =?UTF-8?q?=E5=AE=8C=E5=96=84basic=E9=87=8C=E9=9D=A2?= =?UTF-8?q?=E7=9A=84=E7=9B=AE=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- basic/README.md | 5 +++++ 1 file changed, 5 insertions(+) diff --git a/basic/README.md b/basic/README.md index e4f9e3c..d56ee3a 100644 --- a/basic/README.md +++ b/basic/README.md @@ -1,7 +1,12 @@ +## 目录 - [旁路输出](./旁路输出.md) - [Flink CEP](./CEP.md) - [Flink Operator Chain](./Flink_Operator_chain.md) - [slot相关](./slot相关.md) +- [Flink基本架构](./Flink基本架构.md) +- [旁路输出](./旁路输出.md) + + -- 2.45.2 From 521bde768403b7702e49147cbb04c8cbf2fea98a Mon Sep 17 00:00:00 2001 From: zeekling Date: Sun, 2 Jul 2023 01:03:12 +0800 Subject: [PATCH 2/4] =?UTF-8?q?=E4=BF=AE=E6=94=B9=E5=86=85=E5=AD=98?= =?UTF-8?q?=E6=A8=A1=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 调优/Resource.md | 27 +++++++++++++++------------ 1 file changed, 15 insertions(+), 12 deletions(-) diff --git a/调优/Resource.md b/调优/Resource.md index d647042..1f93c39 100644 --- a/调优/Resource.md +++ b/调优/Resource.md @@ -18,7 +18,7 @@ Flink使用了堆上内存和堆外内存。 框架堆外内存、Task堆外内存、网络缓冲内存都在堆外的直接内存里面。 - 管理内存:Flink堆外内存的管理,用于管理排序,hash表,缓冲中间结果以及RocksDb 状态后端的本地内存。 -- JVM特有内存:JVM本身占用的内存,包括元数据和执行开销, +- JVM特有内存:JVM本身占用的内存,包括元数据和执行开销。 Flink 使用内存 = 框架堆内和堆外内存 + Task堆内和堆外内存 + 网络缓冲内存 + 管理内存。 @@ -66,11 +66,12 @@ Task执行用户代码所使用的内存。 ` Flink内存*fraction `,如果小于配置的min或者大于配置的max大小,则使用min/max -### 托管内存 +### 管理内存 用于RocksDB 状态后端的本地内存和批的排序、hash、缓冲中间结果。 -堆外: +***堆外***: + `taskmanager.memory.managed.fraction`,默认0.4。 `taskmanager.memory.managed.size` ,默认为none。 @@ -88,31 +89,33 @@ Task执行用户代码所使用的内存。 # 并行度设置 并行度的设置和具体的作业强关联。 -## 全局并行度 +## 并行度设置 -### 并行度设置: +- **flink-conf.yml设置** -1. flink-conf.yml 设置 在我们提交一个Job的时候如果没有考虑并行度的话,那么Flink会使用默认配置文件中的并行度。配置如下: ```conf parallelism.default: 5 ``` -2. env级别 -env的级别就是Environment级别。也就是通过Execution Environment来设置整体的Job并行度。 +- **env级别** + +env的级别就是`Environment` 级别。也就是通过`ExecutionEnvironment` 来设置整体的Job并行度。 ```java val env = Stream... env.setParallelism(5); ``` -3. 客户端级别 +- **客户端级别** + 如果在执行Job时候,发现代码中没有设置并行度而又不修改配置文件的话,可以通过Client来设置Job的并行度。 ```bash ./bin/flink run -p 5 ../wordCount-java*.jar ``` --p即设置WordCount的Job并行度为5。 +-p 即设置WordCount的Job并行度为5。 + +- **算子级别** -4. 算子级别 我们在编写Flink项目时,可能对于不同的Operator设置不同的并行度,例如为了实现读取Kafka的最高效 读取需要参考Kafka的partition的数量对并行度进行设置,在Sink时需要对于Sink的介质设置不同的并行 度。这样就会存在一个Job需要有多个并行度。这样就需要用到算子级别的并行度设置: @@ -125,7 +128,7 @@ text.keyBy(XXX) .addSink(XXXXX).setParallelism(1) //写入数据库时候设置为1 ``` -从优先级上来看: 算子级别 > env级别 > Client级别 > 系统默认级别 +从优先级上来看: **算子级别 > env级别 > Client级别 > 系统默认级别** 并行度的高级别会覆盖低级别的配置。例如在算子中设置的策略会覆盖配置文件中的parallelism。 -- 2.45.2 From 9853df39487ba769e04318f3c2f611154fc0768b Mon Sep 17 00:00:00 2001 From: zeekling Date: Thu, 6 Jul 2023 00:09:02 +0800 Subject: [PATCH 3/4] =?UTF-8?q?=E5=A2=9E=E5=8A=A0hudi=E7=9B=AE=E5=BD=95?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 6 +++++- hudi/README.md | 4 ++++ 2 files changed, 9 insertions(+), 1 deletion(-) create mode 100644 hudi/README.md diff --git a/README.md b/README.md index 4259857..d2a60c3 100644 --- a/README.md +++ b/README.md @@ -41,7 +41,7 @@ Flink SQL学习笔记提纲。持续更新。Hive SQL 离线Join VS Flink SQL # 基础知识 -## 目录 +## FLink基础知识 - [Flink CEP](./basic/CEP.md) - [旁路输出](./basic/旁路输出.md) @@ -49,6 +49,10 @@ Flink SQL学习笔记提纲。持续更新。Hive SQL 离线Join VS Flink SQL - [slot相关](./basic/slot相关.md) +## Flink On Hudi + +- [Flink On Hudi 简介](./hudi/README.md) + # Flink 源码 diff --git a/hudi/README.md b/hudi/README.md new file mode 100644 index 0000000..516bc3e --- /dev/null +++ b/hudi/README.md @@ -0,0 +1,4 @@ + +# Hudi 相关知识学习 + + -- 2.45.2 From f6db0f7c9af3bd2916712fa4c502d702b65cf9a3 Mon Sep 17 00:00:00 2001 From: zeekling Date: Mon, 17 Jul 2023 23:22:59 +0800 Subject: [PATCH 4/4] =?UTF-8?q?=E8=B0=83=E6=95=B4=E8=B5=84=E6=BA=90?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- 调优/Resource.md | 28 +++++++++++++--------------- 1 file changed, 13 insertions(+), 15 deletions(-) diff --git a/调优/Resource.md b/调优/Resource.md index 1f93c39..ccf8d5b 100644 --- a/调优/Resource.md +++ b/调优/Resource.md @@ -1,9 +1,7 @@ - +# 1. 内存设置 -# 内存设置 - -## TaskManager 内存模型 +## 1.1 TaskManager 内存模型 TaskManager的内存模型如下图所示(1.10之后版本内存模型): @@ -26,7 +24,7 @@ Flink 使用内存 = 框架堆内和堆外内存 + Task堆内和堆外内存 + 进程内存 - Flink 内存 + JVM特有内存 -### JVM特有内存详解 +### 1.1.1 JVM特有内存详解 JVM特定内存: JVM本身使用的内存,包含JVM的metaspace和over-head @@ -39,7 +37,7 @@ JVM特定内存: JVM本身使用的内存,包含JVM的metaspace和over-head ` 总进程内存*fraction `,如果小于配置的min或者大于配置的max大小,则使用min/max -### 框架内存 +### 1.1.2 框架内存 Flink框架,即TaskManager本身占用的内存,不计入Slot的资源中。 堆内:`taskmanager.memory.framework.heap.size` ,默认128mb。 @@ -47,7 +45,7 @@ Flink框架,即TaskManager本身占用的内存,不计入Slot的资源中。 堆外:`taskmanager.memory.framework.off-heap.size`,默认128mb。 -### Task内存 +### 1.1.3 TaskManager内存 Task执行用户代码所使用的内存。 堆内:`taskmanager.memory,task,heap.size`,默认none,由Flink内存扣除掉其他部分内存得到。 @@ -55,7 +53,7 @@ Task执行用户代码所使用的内存。 堆外:`taskmanager.memory,task.off-heap.size`,默认为0,表示不适用堆外内存。 -### 网络内存 +### 1.1.4 网络内存 网络数据交换所使用的堆外内存大小,如网络数据交换缓冲区。 堆外:`taskmanager.memory.network.fraction`,默认0.1。 @@ -66,7 +64,7 @@ Task执行用户代码所使用的内存。 ` Flink内存*fraction `,如果小于配置的min或者大于配置的max大小,则使用min/max -### 管理内存 +### 1.1.5 管理内存 用于RocksDB 状态后端的本地内存和批的排序、hash、缓冲中间结果。 @@ -86,19 +84,19 @@ Task执行用户代码所使用的内存。 - 单个TaskManager内存大小为2-8G之间。 -# 并行度设置 +# 2. 并行度设置 并行度的设置和具体的作业强关联。 -## 并行度设置 +## 2.1 并行度设置 -- **flink-conf.yml设置** +- 2.1.1 **flink-conf.yml设置** 在我们提交一个Job的时候如果没有考虑并行度的话,那么Flink会使用默认配置文件中的并行度。配置如下: ```conf parallelism.default: 5 ``` -- **env级别** +- 2.1.2 **env级别** env的级别就是`Environment` 级别。也就是通过`ExecutionEnvironment` 来设置整体的Job并行度。 @@ -106,7 +104,7 @@ env的级别就是`Environment` 级别。也就是通过`ExecutionEnvironment` val env = Stream... env.setParallelism(5); ``` -- **客户端级别** +- 2.1.3 **客户端级别** 如果在执行Job时候,发现代码中没有设置并行度而又不修改配置文件的话,可以通过Client来设置Job的并行度。 ```bash @@ -114,7 +112,7 @@ env.setParallelism(5); ``` -p 即设置WordCount的Job并行度为5。 -- **算子级别** +- 2.1.4 **算子级别** 我们在编写Flink项目时,可能对于不同的Operator设置不同的并行度,例如为了实现读取Kafka的最高效 读取需要参考Kafka的partition的数量对并行度进行设置,在Sink时需要对于Sink的介质设置不同的并行 -- 2.45.2