12、离线compaction

        MOR表的compaction默认是自动打开的,策略是5个commits执行一次压缩。因为压缩操作比较耗费内存,和写流程放在同一个pipeline,在数据量比较大的时候(10w+/sqps),容易干扰写流程,此时采用离线定时任务的方式执行compaction任务更稳定。

12.1、设置参数

  • compaction.async.enabled为false,关闭在线compaction。
  • compaction.schedule.enabled仍然保持开启,由写任务阶段性触发压缩plan。

12.2、原理

        一个compaction的任务的执行包括两部分:

  •  schedule压缩plan

        该过程推荐由写任务定时触发,写参数compaction.schedule.enabled默认开启

  • 执行对应的压缩plan

12.3、使用方式

1、执行命令

        离线compaction需要手动执行Java程序,程序入口:

  • hudi-flink1.13-bundle-0.12.0.jar
  • org.apache.hudi.sink.compact.HoodieFlinkCompactor
// 命令行的方式
./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://hadoop3:9000/table

2、参数配置

参数名

required

默认值

备注

--path

true

--

目标表的路径

--compaction-tasks

false

-1

压缩task的并发,默认是待压缩file group的数量

--compaction-max-memory

false

100 (单位 MB

压缩时log数据的索引map,默认100MB,内存足够可以开大些

--schedule

false

false

是否要执行schedule compaction的操作,当写流程还在持续写入表数据的时候,开启这个参数有丢失查询数据的风险,所以开启该参数一定要保证当前没有任务往表里写数据, 写任务的compaction plan默认是一直 schedule的,除非手动关闭(默认 5 commits 一次压缩)

--seq

false

LIFO

执行压缩任务的顺序,默认是从最新的压缩 plan 开始执行,可选值:

LIFO: 从最新的 plan 开始执行;

FIFO: 从最老的 plan 开始执行

--service

false

false

是否开启service模式,service模式会打开常驻进程,一直监听压缩任务并提交到集群执行(从 0.11 开始执行)

--min-compaction-interval-seconds

false

600 (单位 秒)

service模式下的执行间隔,默认10分钟

3、案例演示

create table t7(
  id int,
  ts int,
  primary key (id) not enforced
)
with (
  'connector' = 'hudi',
  'path' = '/tmp/hudi_catalog/default/t7',
  'compaction.async.enabled' = 'false',
  'compaction.schedule.enabled' = 'true',
  'table.type' = 'MERGE_ON_READ'
);


insert into t7 values(1,1);
insert into t7 values(2,2);
insert into t7 values(3,3);
insert into t7 values(4,4);
insert into t7 values(5,5);


// 命令行的方式
./bin/flink run -c org.apache.hudi.sink.compact.HoodieFlinkCompactor lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://hadoop1:9000/tmp/hudi_catalog/default/t7

13、离线Clustering

        异步的clustering相对于online的async clustering资源隔离,从而更加稳定。

13.1、设置参数

  • clustering.async.enabled为false,关闭在线clustering。
  • clustering.schedule.enabled仍然保持开启,由写任务阶段性触发clustering plan。

13.2、原理

        一个 clustering 的任务的执行包括两部分:

  • schedule plan 

    推荐由写任务定时触发,写参数 clustering.schedule.enabled 默认开启。

  • 执行对应的 plan

13.3、使用方式

1、执行命令

        离线clustering需要手动执行Java程序,程序入口:

  • hudi-flink1.13-bundle-0.12.0.jar
  • org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob

        注意:必须是分区表,否则报错空指针异常。

// 命令行的方式
./bin/flink run -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://hadoop1:9000/table

2、参数配置

参数名

required

默认值

备注

--path

true

--

目标表的路径。

--clustering-tasks

false

-1

Clustering task的并发,默认是待压缩file group的数量。

--schedule

false

false

是否要执行schedule clustering plan的操作,当写流程还在持续写入表数据的时候,开启这个参数有丢失查询数据的风险,所以开启该参数一定要保证当前没有任务往表里写数据, 写任务的clustering plan默认是一直 schedule 的,除非手动关闭(默认 4 commits 一次 clustering)。

--seq

false

FIFO

执行压缩任务的顺序,默认是从最老的clustering plan 开始执行,可选值:

LIFO: 从最新的plan开始执行;

FIFO: 从最老的plan开始执行

--target-file-max-bytes

false

1024 * 1024 * 1024

最大目标文件,默认1GB

--small-file-limit

false

600

小于该大小的文件会参与clustering,默认600MB

--sort-columns

false

N/A

Clustering可选排序列。

--service

false

false

是否开启service模式,service模式会打开常驻进程,一直监听压缩任务并提交到集群执行(从0.11开始执行)。

--min-compaction-interval-seconds

false

600 (单位 秒)

service模式下的执行间隔,默认10分钟。

3、案例演示

 

create table t8(
  id int,
  age int,
  ts int,
  primary key (id) not enforced
) partitioned by (age)
with (
  'connector' = 'hudi',
  'path' = '/tmp/hudi_catalog/default/t8',
  'clustering.async.enabled' = 'false',
  'clustering.schedule.enabled' = 'true',
  'table.type' = 'COPY_ON_WRITE'
);


insert into t8 values(1,18,1);
insert into t8 values(2,18,2);
insert into t8 values(3,18,3);
insert into t8 values(4,18,4);
insert into t8 values(5,18,5);


// 命令行的方式
./bin/flink run -c org.apache.hudi.sink.clustering.HoodieFlinkClusteringJob lib/hudi-flink1.13-bundle-0.12.0.jar --path hdfs://hadoop1:9000/tmp/hudi_catalog/default/t8

最终会生成新的文件。历史文件最终而不是立即被clean掉。

14、常见基础问题

14.1、存储一直看不到数据

        如果是streaming写,请确保开启checkpoint,Flink的writer有3种刷数据到磁盘的策略:

  • 当某个bucket在内存积攒到一定大小(可配,默认64MB)
  • 当总的buffer大小积攒到一定大小(可配,默认1GB)
  • 当checkpoint触发,将内存里的数据全部flush出去

14.2、数据有重复

        如果是COW写,需要开启参数write.insert.drop.duplicates,COW写每个bucket的第一个文件默认是不去重的,只有增量的数据会去重,全局去重需要开启该参数;MOR写不需要开启任何参数,定义好primary key后默认全局去重。(注意:从0.10版本开始,该属性改名write.precombine并且默认为true。)
        如果需要多partition去重,需要开启参数:index.global.enabled为true。(注意:从0.10版本开始,该属性默认为true。)
        索引index是判断数据重复的核心数据结构,index.state.ttl设置了索引保存的时间,默认为1.5天,对于长时间周期的更新,比如更新一个月前的数据,需要将index.state.ttl调大(单位天),设置小于0代表永久保存。(注意:从0.10版本开始,该属性默认为0。)

14.3、Merge On Read写只有log文件

        Merge On Read默认开启了异步的compaction,策略是5个commits压缩一次,当条件满足参会触发压缩任务,另外,压缩本身因为耗费资源,所以不一定能跟上写入效率,可能会有滞后。
        可以先观察log,搜索compaction关键词,看是否有compact任务调度:
        After filtering, Nothing to compact for关键词说明本次compaction strategy是不做压缩。

Logo

大数据从业者之家,一起探索大数据的无限可能!

更多推荐