修改内存模型
This commit is contained in:
parent
9cd725ea3a
commit
521bde7684
@ -18,7 +18,7 @@ Flink使用了堆上内存和堆外内存。
|
|||||||
框架堆外内存、Task堆外内存、网络缓冲内存都在堆外的直接内存里面。
|
框架堆外内存、Task堆外内存、网络缓冲内存都在堆外的直接内存里面。
|
||||||
|
|
||||||
- 管理内存:Flink堆外内存的管理,用于管理排序,hash表,缓冲中间结果以及RocksDb 状态后端的本地内存。
|
- 管理内存:Flink堆外内存的管理,用于管理排序,hash表,缓冲中间结果以及RocksDb 状态后端的本地内存。
|
||||||
- JVM特有内存:JVM本身占用的内存,包括元数据和执行开销,
|
- JVM特有内存:JVM本身占用的内存,包括元数据和执行开销。
|
||||||
|
|
||||||
|
|
||||||
Flink 使用内存 = 框架堆内和堆外内存 + Task堆内和堆外内存 + 网络缓冲内存 + 管理内存。
|
Flink 使用内存 = 框架堆内和堆外内存 + Task堆内和堆外内存 + 网络缓冲内存 + 管理内存。
|
||||||
@ -66,11 +66,12 @@ Task执行用户代码所使用的内存。
|
|||||||
|
|
||||||
` Flink内存*fraction `,如果小于配置的min或者大于配置的max大小,则使用min/max
|
` Flink内存*fraction `,如果小于配置的min或者大于配置的max大小,则使用min/max
|
||||||
|
|
||||||
### 托管内存
|
### 管理内存
|
||||||
|
|
||||||
用于RocksDB 状态后端的本地内存和批的排序、hash、缓冲中间结果。
|
用于RocksDB 状态后端的本地内存和批的排序、hash、缓冲中间结果。
|
||||||
|
|
||||||
堆外:
|
***堆外***:
|
||||||
|
|
||||||
`taskmanager.memory.managed.fraction`,默认0.4。
|
`taskmanager.memory.managed.fraction`,默认0.4。
|
||||||
|
|
||||||
`taskmanager.memory.managed.size` ,默认为none。
|
`taskmanager.memory.managed.size` ,默认为none。
|
||||||
@ -88,31 +89,33 @@ Task执行用户代码所使用的内存。
|
|||||||
# 并行度设置
|
# 并行度设置
|
||||||
并行度的设置和具体的作业强关联。
|
并行度的设置和具体的作业强关联。
|
||||||
|
|
||||||
## 全局并行度
|
|
||||||
|
|
||||||
|
## 并行度设置
|
||||||
|
|
||||||
### 并行度设置:
|
- **flink-conf.yml设置**
|
||||||
|
|
||||||
1. flink-conf.yml 设置
|
|
||||||
在我们提交一个Job的时候如果没有考虑并行度的话,那么Flink会使用默认配置文件中的并行度。配置如下:
|
在我们提交一个Job的时候如果没有考虑并行度的话,那么Flink会使用默认配置文件中的并行度。配置如下:
|
||||||
```conf
|
```conf
|
||||||
parallelism.default: 5
|
parallelism.default: 5
|
||||||
```
|
```
|
||||||
2. env级别
|
- **env级别**
|
||||||
env的级别就是Environment级别。也就是通过Execution Environment来设置整体的Job并行度。
|
|
||||||
|
env的级别就是`Environment` 级别。也就是通过`ExecutionEnvironment` 来设置整体的Job并行度。
|
||||||
|
|
||||||
```java
|
```java
|
||||||
val env = Stream...
|
val env = Stream...
|
||||||
env.setParallelism(5);
|
env.setParallelism(5);
|
||||||
```
|
```
|
||||||
3. 客户端级别
|
- **客户端级别**
|
||||||
|
|
||||||
如果在执行Job时候,发现代码中没有设置并行度而又不修改配置文件的话,可以通过Client来设置Job的并行度。
|
如果在执行Job时候,发现代码中没有设置并行度而又不修改配置文件的话,可以通过Client来设置Job的并行度。
|
||||||
```bash
|
```bash
|
||||||
./bin/flink run -p 5 ../wordCount-java*.jar
|
./bin/flink run -p 5 ../wordCount-java*.jar
|
||||||
```
|
```
|
||||||
-p即设置WordCount的Job并行度为5。
|
-p 即设置WordCount的Job并行度为5。
|
||||||
|
|
||||||
|
- **算子级别**
|
||||||
|
|
||||||
4. 算子级别
|
|
||||||
我们在编写Flink项目时,可能对于不同的Operator设置不同的并行度,例如为了实现读取Kafka的最高效
|
我们在编写Flink项目时,可能对于不同的Operator设置不同的并行度,例如为了实现读取Kafka的最高效
|
||||||
读取需要参考Kafka的partition的数量对并行度进行设置,在Sink时需要对于Sink的介质设置不同的并行
|
读取需要参考Kafka的partition的数量对并行度进行设置,在Sink时需要对于Sink的介质设置不同的并行
|
||||||
度。这样就会存在一个Job需要有多个并行度。这样就需要用到算子级别的并行度设置:
|
度。这样就会存在一个Job需要有多个并行度。这样就需要用到算子级别的并行度设置:
|
||||||
@ -125,7 +128,7 @@ text.keyBy(XXX)
|
|||||||
.addSink(XXXXX).setParallelism(1) //写入数据库时候设置为1
|
.addSink(XXXXX).setParallelism(1) //写入数据库时候设置为1
|
||||||
```
|
```
|
||||||
|
|
||||||
从优先级上来看: 算子级别 > env级别 > Client级别 > 系统默认级别
|
从优先级上来看: **算子级别 > env级别 > Client级别 > 系统默认级别**
|
||||||
|
|
||||||
并行度的高级别会覆盖低级别的配置。例如在算子中设置的策略会覆盖配置文件中的parallelism。
|
并行度的高级别会覆盖低级别的配置。例如在算子中设置的策略会覆盖配置文件中的parallelism。
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user