diff --git a/调优/flinkSql.md b/调优/flinkSql.md index e6807d2..68bbffb 100644 --- a/调优/flinkSql.md +++ b/调优/flinkSql.md @@ -42,9 +42,9 @@ configuration.setString(" table.exec.mini batch.size ", 20000); ``` -- table.exec.mini batch.enabled: 开启 miniBatch的参数。 -- table.exec.mini batch.allow latency: 批量输出的间隔时间。 -- table.exec.mini batch.size: 防止 OOM 设置每个批次最多缓存数据的条数 ,可以设为2 万条。 +- `table.exec.mini batch.enabled`: 开启 miniBatch的参数。 +- `table.exec.mini batch.allow-latency`: 批量输出的间隔时间。 +- `table.exec.mini batch.size`: 防止 OOM 设置每个批次最多缓存数据的条数 ,可以设为2 万条。 注意: @@ -106,5 +106,95 @@ config uration.setString("table.optimizer.agg-phase-strategy", "TWO_PHASE"); # 开启Split Distinct +LocalGlobal优化针对普通聚合(例如 SUM 、 COUNT 、 MAX 、 MIN 和 AVG )有较好的效果。对于 DISTINCT 的聚合(如 +COUNT DISTINCT 收效不明显,因为 COUNT DISTINCT 在 Local 聚合时,对于 DISTINCT KEY 的去重率不高,导致在 +Global 节点仍然存在热点。 + +## 原理介绍 + +为了解决COUNT DISTINCT 的热点问题,通常需要手 动改写为两层聚合(增加按 Distinct Key取模的打散层)。 + +从 Flink1.9.0 版本开始,提供了 COUNT DISTINCT 自动打散功能, 通过HASH_CODE(distinct_key) % BUCKET_NUM 打散, +不需要手动重写。Split Distinct 和LocalGlobal 的原理对比参见下图。 + +![pic](./flinksql0002.png) + +Distinct举例 + +```sql +SELECT + a,COUNT(DISTINCT b) +FROM + T +GROUP BY a +``` + +使用SQL语句手动打散: + +```sql +SELECT a,SUM(cnt) +FROM ( + SELECT a,COUNT(DISTINCT b ) as cnt + FROM T + GROUP BY a,MOD(HASH_CODE(b), 1024) +) +GROUP BY a +``` + +## 特性开启 + +默认不开启,使用参数显式开启。 + +- `table.optimizer.distinct agg.split.enabled: true`: 默认 false 。 +- `table.optimizer.distinct agg.split.bucket num: Split Distinct`: 优化在第一层聚合中,被打散的bucket 数目。默认 1024。 + +```java +// 初始化 table environment +TableEnvironment tEnv = ... +// 获取 tableEnv 的配置对象 +Configuration configuration = tEnv.getConfig().getConfiguration(); +// 设置参数:要结合 minibatch 一起 使用 +// 开启 Split Distinct +configuration.setString("table.optimizer.distinct-agg.split.enabled", "true"); +// 第一层 打 散 的 bucket 数目 +configuration.setString("table.optimizer.distinct-agg.split.bucket-num", "1024"); +``` + +### 注意事项 + +- (1)目前不能在包含 UDAF 的 Flink SQL 中使用 Split Distinct 优化方法。 +- (2)拆分出来的两个 GROUP 聚合还可参与 LocalGlobal 优化。 +- (3)该功能在Flink 1.9.0 版本 及以上版本才支持。 +# 多维DISTINCT 使用Filter + +在某些场景下,可能需要从不同维度来统计count distinct )的结果 (比如统计 uv 、app 端的 uv 、 web 端的 uv 可能 +会使用如下 CASE WHEN 语法 。 + +```sql +SELECT + a, + COUNT(DISTINCT b ) AS total_ b, + COUNT(DISTINCT CASE WHEN c IN ('A', 'B') THEN b ELSE NULL END) AS AB b, + COUNT(DISTINCT CASE WHEN c IN ('C', 'D') THEN b ELSE NULL END) AS CD_b +FROM T +GROUP BY a +``` + +在这种情况下,建议使用FILTER 语法 , 目前的 Flink SQL 优化器可以识别同一唯一键上的不同 FILTER 参数。如,在上 +面的示例中,三个 COUNT DISTINCT 都作用在 b 列上。此时,经过优化器识别后,Flink 可以只使用一个共享状态实例, +而不是三个状态实例,可减少状态的大小和对状态的访问。 + +将上边的CASE WHEN 替换成 FILTER 后 ,如下所示: + +```sql +SELECT + a, + COUNT(DISTINCT b ) AS b, + COUNT(DISTINCT b ) FILT ER (WHERE c IN ('A', 'B')) AS AB_b, + COUNT(DISTINCT b ) FILTER (WHERE c IN ('C', 'D')) AS CD b +FROM T +GROUP BY a +``` + diff --git a/调优/flinksql0002.png b/调优/flinksql0002.png new file mode 100644 index 0000000..3bc368c Binary files /dev/null and b/调优/flinksql0002.png differ