From 99ffe1b8e11614d18e2cc49b61f64fcb7e7395b0 Mon Sep 17 00:00:00 2001 From: zeekling Date: Sun, 8 Oct 2023 15:15:14 +0000 Subject: [PATCH] =?UTF-8?q?Hudi=E5=85=83=E6=95=B0=E6=8D=AE=20(#34)?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. 增加Hudi学习网站。 2. 增加Hudi参数调优章节。 引用:https://git.zeekling.cn/big-data/flink_book/issues/25 Reviewed-on: https://git.zeekling.cn/big-data/flink_book/pulls/34 --- hudi/Hudi参数调优.md | 118 +++++++++++++++++++++++++++++++++++++++++++ hudi/reference.md | 1 + 2 files changed, 119 insertions(+) create mode 100644 hudi/Hudi参数调优.md diff --git a/hudi/Hudi参数调优.md b/hudi/Hudi参数调优.md new file mode 100644 index 0000000..a6b4b85 --- /dev/null +++ b/hudi/Hudi参数调优.md @@ -0,0 +1,118 @@ + +## 内存 + +|参数名称 | 描述 | 默认值 | 备注 | +|----|----|----|----| +| write.task.max.size | 写任务的最大内存(以MB为单位),当达到阈值时,它刷新最大大小的数据桶以避免OOM。 | 1024 | 为写缓冲区预留的内存为write.task.max.size - compact .max_memory。当写任务的总缓冲区达到阈值时,将刷新内存中最大的缓冲区 | +| write.batch.size | Flink支持到达一定阈值之后,将数据写入到Hudi | 64 | 推荐使用默认值 | +| write.log_block.size | hudi的日志写入器接收到消息后不会立刻flush数据,写入器以LogBlock为单位将数据刷新到磁盘。 | 128 | 推荐使用默认值 | +| write.merge.max_memory | 对于COW表,Hudi将会合并增量数据和base文件数据。增量数据将会被缓存和溢写磁盘。 | 100 | 推荐使用默认值 | +| compaction.max_memory | Compaction期间占用的最大内存 | 100 | 如果是在线压缩,则可以在资源足够时打开它,例如设置为1024MB | + + +## 并行度 + +|参数名称 | 描述 | 默认值 | 备注 | +|----|----|----|----| +| write.tasks | 写入器任务的并行度,每个任务依次向1到N个桶写。 | 4 | 增加并行度对小文件的数量没有影响| +| write.bucket_assign.tasks | 桶分配操作符的并行性。
无默认值,使用Flink parallelism.default | parallelism.default | 增加并行度也会增加桶的数量,
从而增加小文件(小桶)的数量。 | +| write.index_boostrap.tasks | index bootstrap的并行度,增加并行度可以提高bootstarp阶段的效率。 | parallelism.default | 只有当index.bootstrap .enabled为true时才生效 | +| read.tasks | 读操作的并行度(批和流) | / | /| +| compaction.tasks | 实时compaction的并行度,默认为10 | 10 | Online compaction 会占用写任务的资源,推荐使用offline compaction | + + +## Compaction + +通过设置compaction.async.enabled = false关闭在线压缩,但我们仍然建议对写作业启用compaction.schedule.enable。可以通过离线压缩来执行压缩计划。 + + +|参数名称 | 描述 | 默认值 | 备注 | +|----|----|----|----| +| compaction.schedule.enabled | 是否定期生成compaction计划 | true | 即使compaction.async.enabled = false,也建议打开 | +| compaction.async.enabled | 异步压缩,MOR默认启用 | true | 通过关闭此选项来关闭offline compaction | +| compaction.trigger.strategy | 触发compaction的策略 | num_commits | -- | +| compaction.delta_commits | 触发压缩所需的最大delte提交,默认为5次提交 | 5 | -- | +| compaction.delta_seconds | 触发压缩所需的最大增量秒数,默认为1小时 | 3600 | -- | +| compaction.max_memory | compaction溢出映射的最大内存(以MB为单位),默认为100MB | 100 | 有足够的资源,建议调整到1024MB | +| compaction.target_io | 每次压缩的目标IO(读和写),默认为5GB | 5120 | offline compaction 的默认值是500GB | + + +### 触发compaction的策略 + +- num_commits: 当达到N个delta提交时触发压缩。 +- time_elapsed:当距离上次压缩时间> N秒时触发压缩。 +- num_and_time:当满足NUM_COMMITS和TIME_ELAPSED时,进行trigger压缩。 +- num_or_time: 在满足NUM_COMMITS或TIME_ELAPSED时触发压缩。 + +## Memory Optimization + +### MOR 表 +- Flink的状态后端设置为RocksDB,默认为内存。 +- 如果有足够的内存,compaction.max_memory可以设置大于100MB建议调整到1024MB。 +- 在配置TM内存的时候,需要保证每个写任务都能分配到write.task.max.size对应的内存。需要保留一部分内存给TM的网络缓冲区和其他类型的任务(如bucketAssignFunctio)使用。 +- 需要注意compaction内存的变化,compaction.max_memory控制的是压缩任务读取日志时可以使用每个任务的最大内存,ompaction.tasks控制的是压缩任务的并行性。 + + +### COW表 + +- Flink的状态后端设置为RocksDB,默认为内存。 +- 增大write.task.max.size和write.merge.max_memory(默认1024MB和100MB,调整为2014MB和1024MB) +- 在配置TM内存的时候,需要保证每个写任务都能分配到write.task.max.size对应的内存。需要保留一部分内存给TM的网络缓冲区和其他类型的任务(如bucketAssignFunctio)使用。 + + +## Bulk Insert + +- 用于快照数据导入。如果快照数据来自其他数据源,可以使用bulk_insert模式将快照数据快速导入到Hudi中。 +- bulk_insert消除了序列化和数据合并。用户无需重复数据删除,因此需要保证数据的唯一性。 +- bulk_insert在批处理执行模式下效率更高。批处理执行方式根据分区路径对输入记录进行排序,并将这些记录写入Hudi,避免了频繁切换文件句柄导致的写性能下降。有序写入一个分区中不会频繁写换对应的数据分区。 +- bulk_insert的并行度由write.tasks指定。并行度会影响小文件的数量。 + 从理论上讲,bulk_insert的并行性是bucket的数量(特别是,当每个bucket写到最大文件大小时,它将转到新的文件句柄。最后,文件的数量>= write.bucket_assign.tasks)。 + + +|参数名称 | 描述 | 默认值 | 备注 | +|----|----|----|----| +| write.operation | | upsert | | +| write.bulk_insert.shuffle_by_partition | 写入前是否根据分区字段进行shuffle。启用此选项将减少小文件的数量,但可能存在数据倾斜的风险 | true | | +| write.bulk_insert.sort_by_partition | 写入前是否根据分区字段对数据进行排序。启用此选项将在写任务写多个分区时减少小文件的数量 | true | | +| write.sort.memory | Sort时使用的最大内存 | 128 | | + + + +## Index Bootstrap + +- 用于snapshot data+incremental data导入的需求。如果snapshot data已经通过bulk insert插入到Hudi中。通过Index Bootstrap功能,用户可以实时插入incremental data,保证数据不重复,构造离线数据indexState +- 可以在写入快照数据的同时增加资源以流模式写入,然后减少资源以写入增量数据(或打开速率限制函数)。 + +|参数名称 | 描述 | 默认值 | 备注 | +|----|----|----|----| +| index.bootstrap.enabled | 启index.bootstrap.enabled时,Hudi表中的剩余记录将一次性加载到Flink状态 | false | | +| index.partition.regex | 优化选择。设置正则表达式来过滤分区。默认情况下,所有分区都被加载到flink状态 | * | | + +## 模式 + + +### Changelog Mode + +Hudi可以保留消息的所有中间变化(+I / -U / +U / -D),然后通过flink的状态计算消费,从而拥有一个接近实时的数据仓库ETL管道(增量计算)。 + +|参数名称 | 描述 | 默认值 | +|----|----|----| +| changelog.enabled | 它在默认情况下是关闭的,为了拥有upsert语义,只有合并的消息被确保保留,中间的更改可以被合并。设置为true以支持使用所有更改 | true | + +- 批处理(快照)读取仍然合并所有中间更改,不管格式是否存储了中间更改日志消息。 +- changelog.enable设置为true后,更改日志记录的保留只是最好的工作:异步压缩任务将更改日志记录合并到一个记录中,因此,如果流源不及时使用,则压缩后只能读取每个键的合并记录。 + 解决方案是通过调整compact策略,比如压缩选项:compress.delta_commits和compression .delta_seconds,为读取器保留一些缓冲时间。 + + + +### Insert Mode + +- 默认情况下,Hudi对插入模式采用小文件策略:MOR将增量记录追加到日志文件中,COW合并 base parquet文件(增量数据集将被重复数据删除)。这种策略会导致性能下降。 +- 如果要禁止文件合并行为,可将write.insert.deduplicate设置为false,则跳过重复数据删除。每次刷新行为直接写入一个new parquet文件(MOR表也直接写入parquet文件)。 +- 适用于能否在外部保证写入hudi cow表的数据是单调递增的或者可以不在乎重复数据的情况,但是可能会存在小文件问题。 + +|参数名称 | 描述 | 默认值 | +|----|----|----| +| write.insert.deduplicate | Insert mode 默认启用重复数据删除功能。关闭此选项后,每次刷新行为直接写入一个now parquet文件 | true | + + diff --git a/hudi/reference.md b/hudi/reference.md index 08fbf1e..8d20ace 100644 --- a/hudi/reference.md +++ b/hudi/reference.md @@ -5,4 +5,5 @@ |----|----|----| | hudi-resources | https://github.com/zeekling/hudi-resources | 汇总Apache Hudi相关资料 | | 官网 | https://hudi.apache.org/cn/docs/0.13.0/overview | 中文官网| +| Hudi官方文档翻译 | https://hudi.apachecn.org/docs/master/concepts.html |