zl程序教程

您现在的位置是:首页 >  后端

当前栏目

Hudi(14):Hudi集成Flink之核心参数设置

集成flink 核心 14 hudi 参数设置
2023-09-14 09:01:25 时间

目录

0. 相关文章链接

1. 去重参数

2. 并发参数

2.1. 参数说明

2.2. 案例演示

3. 压缩参数

3.1. 参数说明

3.2. 案例演示

4. 文件大小

4.1. 参数说明

4.2. 案例演示

5. Hadoop 参数


Flink可配参数官网地址:All Configurations | Apache Hudi

0. 相关文章链接

 Hudi文章汇总 

1. 去重参数

通过如下语法设置主键:

-- 设置单个主键
create table hoodie_table (
  f0 int primary key not enforced,
  f1 varchar(20),
  ...
) with (
  'connector' = 'hudi',
  ...
)

-- 设置联合主键
create table hoodie_table (
  f0 int,
  f1 varchar(20),
  ...
  primary key(f0, f1) not enforced
) with (
  'connector' = 'hudi',
  ...
)

名称

说明

默认值

备注

hoodie.datasource.write.recordkey.field

主键字段

--

支持主键语法 PRIMARY KEY 设置,支持逗号分隔的多个字段

precombine.field

(0.13.0 之前版本为

 write.precombine.field)

去重时间字段

--

record 合并的时候会按照该字段排序,选值较大的 record 为合并结果;不指定则为处理序:选择后到的 record

2. 并发参数

2.1. 参数说明

名称

说明

默认值

备注

write.tasks

writer 的并发,每个 writer 顺序写 1~N buckets

4

增加并发对小文件个数没影响

write.bucket_assign.tasks

bucket assigner 的并发

Flink的并行度

增加并发同时增加了并发写的 bucekt 数,也就变相增加了小文件( bucket)

write.index_bootstrap.tasks

Index bootstrap 算子的并发,增加并发可以加快 bootstrap 阶段的效率,bootstrap 阶段会阻塞 checkpoint,因此需要设置多一些的 checkpoint 失败容忍次数

Flink的并行度

只在 index.bootstrap.enabled true 时生效

read.tasks

读算子的并发(batch stream

4

compaction.tasks

online compaction 算子的并发

writer 的并发

online compaction 比较耗费资源,建议走 offline compaction

2.2. 案例演示

可以flink建表时在with中指定,或Hints临时指定参数的方式:在需要调整的表名后面加上 /*+ OPTIONS() */

insert into t2 /*+ OPTIONS('write.tasks'='2','write.bucket_assign.tasks'='3','compaction.tasks'='4') */
select * from sourceT;

3. 压缩参数

3.1. 参数说明

        在线压缩的参数,通过设置 compaction.async.enabled =false关闭在线压缩执行,但是调度compaction.schedule.enabled 仍然建议开启,之后通过离线压缩直接执行 在线压缩任务 阶段性调度的压缩 plan。

名称

说明

默认值

备注

compaction.schedule.enabled

是否阶段性生成压缩 plan

true

建议开启,即使compaction.async.enabled 关闭的情况下

compaction.async.enabled

是否开启异步压缩

true

通过关闭此参数关闭在线压缩

compaction.tasks

压缩 task 并发

4

compaction.trigger.strategy

压缩策略

num_commits

支持四种策略:num_commitstime_elapsednum_and_time

num_or_time

compaction.delta_commits

默认策略,5 commits 压缩一次

5

compaction.delta_seconds

3600

compaction.max_memory

压缩去重的 hash map 可用内存

100MB

资源够用的话建议调整到 1GB

compaction.target_io

每个压缩 plan IO 上限,默认 5GB

500GB

3.2. 案例演示

CREATE TABLE t3(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/t3',
  'compaction.async.enabled' = 'true',
  'compaction.tasks' = '1',
  'compaction.schedule.enabled' = 'true',
  'compaction.trigger.strategy' = 'num_commits',
  'compaction.delta_commits' = '2',

  'table.type' = 'MERGE_ON_READ'
);

set table.dynamic-table-options.enabled=true;
insert into t3
select * from sourceT/*+ OPTIONS('rows-per-second' = '5')*/;

        注意:如果没有按照 Hudi(12):Hudi集成Flink之sql-client方式 中的 1.3章节 yarn-session模式解决hadoop依赖冲突问题,那么无法compaction生成parquet文件,报错很隐晦,在Exception中看不到,要搜索TaskManager中关于compaction才能看到报错。

4. 文件大小

4.1. 参数说明

        Hudi会自管理文件大小,避免向查询引擎暴露小文件,其中自动处理文件大小起很大作用。在进行insert/upsert操作时,Hudi可以将文件大小维护在一个指定文件大小。目前只有 log 文件的写入大小可以做到精确控制,parquet 文件大小按照估算值。

名称

说明

默认值

备注

hoodie.parquet.max.file.size

最大可写入的 parquet 文件大小

120 * 1024 * 1024

默认 120MB

(单位 byte)

超过该大小切新的 file group

hoodie.logfile.to.parquet.compression.ratio

log文件大小转 parquet 的比率

0.35

hoodie 统一依据 parquet 大小来评估小文件策略

hoodie.parquet.small.file.limit

在写入时,hudi 会尝试先追加写已存小文件,该参数设置了小文件的大小阈值,小于该参数的文件被认为是小文件

104857600

默认 100MB

(单位 byte)

大于 100MB,小于 120MB 的文件会被忽略,避免写过度放大

hoodie.copyonwrite.record.size.estimate

预估的 record 大小,hoodie 会依据历史的 commits 动态估算 record 的大小,但是前提是之前有单次写入超过

hoodie.parquet.small.file.limit 大小,在未达到这个大小时会使用这个参数

1024

默认 1KB

(单位 byte)

如果作业流量比较小,可以设置下这个参数

hoodie.logfile.max.size

LogFile最大大小。这是在将Log滚转到下一个版本之前允许的最大大小。

1073741824

默认1GB

(单位 byte)

4.2. 案例演示

CREATE TABLE t4(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
)
WITH (
  'connector' = 'hudi',
  'path' = 'hdfs://hadoop1:8020/tmp/hudi_flink/t4',
  'compaction.tasks' = '1',
  'hoodie.parquet.max.file.size'= '10000',
  'hoodie.parquet.small.file.limit'='5000',

  'table.type' = 'MERGE_ON_READ'
);

set table.dynamic-table-options.enabled=true;
insert into t4
select * from sourceT /*+ OPTIONS('rows-per-second' = '5')*/;

5. Hadoop 参数

从 0.12.0 开始支持,如果有跨集群提交执行的需求,可以通过 sql 的 ddl 指定 per-job 级别的 hadoop 配置。

名称

说明

默认值

备注

hadoop.${you option key}

通过 hadoop.前缀指定 hadoop 配置项

--

支持同时指定多个 hadoop 配置项


注:其他Hudi相关文章链接由此进 ->  Hudi文章汇总