并行度调优参数 (#6)
This commit is contained in:
parent
100dae0cca
commit
b93205fef3
@ -1,8 +1,15 @@
|
|||||||
|
|
||||||
# 简介
|
# 简介
|
||||||
|
|
||||||
|
<p align="center"><a title="小令童鞋" target="_blank" href="https://github.com/zeekling/flink_book"><img src="https://img.shields.io/github/last-commit/zeekling/redis_book.svg?style=flat-square&color=FF9900"></a>
|
||||||
|
<a title="GitHub repo size in bytes" target="_blank" href="https://github.com/zeekling/redis_book"><img src="https://img.shields.io/github/repo-size/zeekling/redis_book.svg?style=flat-square"></a>
|
||||||
|
<a title="Hits" target="_blank" href="https://github.com/zeekling/hits"><img src="https://hits.b3log.org/zeekling/redis_book.svg"></a></p>
|
||||||
|
|
||||||
|
本项目用于学习Flink所做的笔记,便于以后查看复习。没有在Redis源码中直接写注释的原因时源码太多了,学习成本有点高,不适合我这种虽然比较菜,但是爱学习的人学习。
|
||||||
|
|
||||||
# 内存调优
|
# 内存调优
|
||||||
|
|
||||||
|
- [内存等资源调优](./调优/Resource.md)
|
||||||
|
- [状态和CheckPoint 调优](./调优/CheckPoint.md)
|
||||||
|
- Flink SQL 调优
|
||||||
|
|
||||||
|
@ -82,9 +82,51 @@ Task执行用户代码所使用的内存。
|
|||||||
- 单个TaskManager内存大小为2-8G之间。
|
- 单个TaskManager内存大小为2-8G之间。
|
||||||
|
|
||||||
|
|
||||||
# 合理CPU设置
|
|
||||||
|
|
||||||
|
|
||||||
# 并行度设置
|
# 并行度设置
|
||||||
|
并行度的设置和具体的作业强关联。
|
||||||
|
|
||||||
|
## 全局并行度
|
||||||
|
|
||||||
|
|
||||||
|
### 并行度设置:
|
||||||
|
|
||||||
|
1. flink-conf.yml 设置
|
||||||
|
在我们提交一个Job的时候如果没有考虑并行度的话,那么Flink会使用默认配置文件中的并行度。配置如下:
|
||||||
|
```conf
|
||||||
|
parallelism.default: 5
|
||||||
|
```
|
||||||
|
2. env级别
|
||||||
|
env的级别就是Environment级别。也就是通过Execution Environment来设置整体的Job并行度。
|
||||||
|
|
||||||
|
```java
|
||||||
|
val env = Stream...
|
||||||
|
env.setParallelism(5);
|
||||||
|
```
|
||||||
|
3. 客户端级别
|
||||||
|
如果在执行Job时候,发现代码中没有设置并行度而又不修改配置文件的话,可以通过Client来设置Job的并行度。
|
||||||
|
```bash
|
||||||
|
./bin/flink run -p 5 ../wordCount-java*.jar
|
||||||
|
```
|
||||||
|
-p即设置WordCount的Job并行度为5。
|
||||||
|
|
||||||
|
4. 算子级别
|
||||||
|
我们在编写Flink项目时,可能对于不同的Operator设置不同的并行度,例如为了实现读取Kafka的最高效
|
||||||
|
读取需要参考Kafka的partition的数量对并行度进行设置,在Sink时需要对于Sink的介质设置不同的并行
|
||||||
|
度。这样就会存在一个Job需要有多个并行度。这样就需要用到算子级别的并行度设置:
|
||||||
|
|
||||||
|
```java
|
||||||
|
val env = Stream...
|
||||||
|
val text = ...
|
||||||
|
text.keyBy(XXX)
|
||||||
|
.flatMap(XXX).setParallelism(5) //计算时设置为5
|
||||||
|
.addSink(XXXXX).setParallelism(1) //写入数据库时候设置为1
|
||||||
|
```
|
||||||
|
|
||||||
|
从优先级上来看: 算子级别 > env级别 > Client级别 > 系统默认级别
|
||||||
|
|
||||||
|
并行度的高级别会覆盖低级别的配置。例如在算子中设置的策略会覆盖配置文件中的parallelism。
|
||||||
|
|
||||||
|
在实际的使用中,我们需要设置合理的并行度来保证数据的高效处理,在一般情况下例如source,Sink等
|
||||||
|
可能会需要不同的并行度来保证数据的快速读取与写入负载等。
|
||||||
|
|
||||||
|
|
||||||
|
Loading…
Reference in New Issue
Block a user