From b93205fef3bf686aac1222cbe89501dd6438278d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=B0=8F=E4=BB=A4=E7=AB=A5=E9=9E=8B?= Date: Sun, 1 Jan 2023 17:09:26 +0800 Subject: [PATCH] =?UTF-8?q?=E5=B9=B6=E8=A1=8C=E5=BA=A6=E8=B0=83=E4=BC=98?= =?UTF-8?q?=E5=8F=82=E6=95=B0=20(#6)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 9 ++++++++- 调优/Resource.md | 48 +++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 53 insertions(+), 4 deletions(-) diff --git a/README.md b/README.md index 614e4de..0f91573 100644 --- a/README.md +++ b/README.md @@ -1,8 +1,15 @@ # 简介 +

+ +

+ +本项目用于学习Flink所做的笔记,便于以后查看复习。没有在Redis源码中直接写注释的原因时源码太多了,学习成本有点高,不适合我这种虽然比较菜,但是爱学习的人学习。 # 内存调优 - +- [内存等资源调优](./调优/Resource.md) +- [状态和CheckPoint 调优](./调优/CheckPoint.md) +- Flink SQL 调优 diff --git a/调优/Resource.md b/调优/Resource.md index 18b161f..b556c6c 100644 --- a/调优/Resource.md +++ b/调优/Resource.md @@ -82,9 +82,51 @@ Task执行用户代码所使用的内存。 - 单个TaskManager内存大小为2-8G之间。 -# 合理CPU设置 - - # 并行度设置 +并行度的设置和具体的作业强关联。 + +## 全局并行度 + + +### 并行度设置: + +1. flink-conf.yml 设置 +在我们提交一个Job的时候如果没有考虑并行度的话,那么Flink会使用默认配置文件中的并行度。配置如下: +```conf +parallelism.default: 5 +``` +2. env级别 +env的级别就是Environment级别。也就是通过Execution Environment来设置整体的Job并行度。 + +```java +val env = Stream... +env.setParallelism(5); +``` +3. 客户端级别 +如果在执行Job时候,发现代码中没有设置并行度而又不修改配置文件的话,可以通过Client来设置Job的并行度。 +```bash +./bin/flink run -p 5 ../wordCount-java*.jar +``` +-p即设置WordCount的Job并行度为5。 + +4. 算子级别 +我们在编写Flink项目时,可能对于不同的Operator设置不同的并行度,例如为了实现读取Kafka的最高效 +读取需要参考Kafka的partition的数量对并行度进行设置,在Sink时需要对于Sink的介质设置不同的并行 +度。这样就会存在一个Job需要有多个并行度。这样就需要用到算子级别的并行度设置: + +```java +val env = Stream... +val text = ... +text.keyBy(XXX) + .flatMap(XXX).setParallelism(5) //计算时设置为5 + .addSink(XXXXX).setParallelism(1) //写入数据库时候设置为1 +``` + +从优先级上来看: 算子级别 > env级别 > Client级别 > 系统默认级别 + +并行度的高级别会覆盖低级别的配置。例如在算子中设置的策略会覆盖配置文件中的parallelism。 + +在实际的使用中,我们需要设置合理的并行度来保证数据的高效处理,在一般情况下例如source,Sink等 +可能会需要不同的并行度来保证数据的快速读取与写入负载等。