6、读取方式

6.1、流读(Streaming Query)

        当前表默认是快照读取,即读取最新的全量快照数据并一次性返回。通过参数 read.streaming.enabled 参数开启流读模式,通过 read.start-commit 参数指定起始消费位置,支持指定 earliest 从最早消费。

1、WITH参数

名称

Required

默认值

说明

read.streaming.enabled

false

false

设置 true 开启流读模式

read.start-commit

false

最新 commit

指定 'yyyyMMddHHmmss' 格式的起始 commit(闭区间)

read.streaming.skip_compaction

false

false

流读时是否跳过 compaction 的 commits,跳过 compaction 有两个用途:

1)避免 upsert 语义下重复消费 (compaction 的 instant 为重复数据,如果不跳过,有小概率会重复消费)

2) changelog 模式下保证语义正确性

0.11开始,以上两个问题已经通过保留 compaction instant time 修复

clean.retain_commits

false

10

cleaner最多保留的历史commits数,大于此数量的历史commits会被清理掉,changelog模式下,这个参数可以控制changelog的保留时间,例如checkpoint周期为5分钟一次,默认最少保留50分钟的时间。

        注意:当参数read.streaming.skip_compaction打开并且streaming reader消费落后于clean.retain_commits数时,流读可能会丢失数据。从0.11开始,compaction不会再变更record的 instant time,因此理论上数据不会再重复消费,但是还是会重复读取并丢弃,因此额外的开销还是无法避免,对性能有要求的话还是可以开启此参数。 

CREATE TABLE t5(
  uuid VARCHAR(20) PRIMARY KEY NOT ENFORCED,
  name VARCHAR(10),
  age INT,
  ts TIMESTAMP(3),
  `partition` VARCHAR(20)
) WITH (
  'connector' = 'hudi',
  'path' = '/tmp/hudi_flink/t5',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '4'   -- 默认60s
);


insert into t5 select * from sourceT;

select * from t5;

6.2、增量读取(Incremental Query)

        从 0.10.0 开始支持。

        如果有增量读取 batch 数据的需求,增量读取包含三种场景。

        (1)Stream 增量消费,通过参数 read.start-commit 指定起始消费位置;

        (2)Batch 增量消费,通过参数 read.start-commit 指定起始消费位置,通过参数 read.end-commit 指定结束消费位置,区间为闭区间,即包含起始、结束的 commit

        (3)TimeTravel:Batch 消费某个时间点的数据:通过参数 read.end-commit 指定结束消费位置即可(由于起始位置默认从最新,所以无需重复声明)

        WITH 参数

名称

Required

默认值

说明

read.start-commit

false

默认从最新 commit

支持 earliest 从最早消费

read.end-commit

false

默认到最新 commit

7、限流

        如果将全量数据(百亿数量级)和增量先同步到kafka,再通过flink流式消费的方式将库表数据直接导成hoodie表,因为直接消费全量部分数据:量大(吞吐高)、乱序严重(写入的partition随机),会导致写入性能退化,出现吞吐毛刺,这时候可以开启限速参数,保证流量平稳写入。
        WITH参数

名称

Required

默认值

说明

write.rate.limit

false

0

默认关闭限速

8、写入方式

8.1、CDC数据同步

        CDC数据保存了完整的数据库变更,当前可通过两种途径将数据导入hudi:

        第一种:通过cdc-connector直接对接DB的binlog将数据导入hudi,优点是不依赖消息队列,缺点是对db server造成压力。
        第二种:对接cdc format消费kafka数据导入hudi,优点是可扩展性强,缺点是依赖kafka。
        注意:如果上游数据无法保证顺序,需要指定write.precombine.field字段

8.2、离线批量导入

        如果存量数据来源于其他数据源,可以使用批量导入功能,快速将存量数据导成 Hoodie 表格式。

1、原理

  1. 批量导入省去了avro的序列化以及数据的merge过程,后续不会再有去重操作,数据的唯一性需要自己来保证。
  2. bulk_insert需要在Batch Execuiton Mode下执行更高效,Batch模式默认会按照partition path排序输入消息再写入Hoodie,避免file handle频繁切换导致性能下降。
    1. SET execution.runtime-mode=batch;
    2. SET execution.checkpointing.interval=0;
  3. bulk_insert write task的并发通过参数write.tasks指定,并发的数量会影响到小文件的数量,理论上,bulk_insert write task的并发数就是划分的bucket数,当然每个bucket在写到文件大小上限(parquet 120MB)的时候会rollover到新的文件句柄,所以最后:写文件数量>=bulk_insert write task数。

2、WITH参数

名称

Required

默认值

说明

write.operation

true

upsert

配置 bulk_insert 开启该功能

write.tasks

false

4

bulk_insert 写 task 的并发,最后的文件数 >=write.tasks

write.bulk_insert.shuffle_by_partition

write.bulk_insert.shuffle_input

(从 0.11 开始)

false

true

是否将数据按照 partition 字段 shuffle 再通过 write task 写入,开启该参数将减少小文件的数量 但是可能有数据倾斜风险

write.bulk_insert.sort_by_partition

write.bulk_insert.sort_input

(从 0.11 开始)

false

true

是否将数据线按照 partition 字段排序再写入,当一个 write task 写多个 partition,开启可以减少小文件数量

write.sort.memory

128

sort 算子的可用 managed memory(单位 MB)

8.3、全量接增量

        如果已经有全量的离线Hoodie表,需要接上实时写入,并且保证数据不重复,可以开启index bootstrap功能。
        如果觉得流程冗长,可以在写入全量数据的时候资源调大直接走流模式写,全量走完接新数据再将资源调小(或者开启限流功能)。

        WITH参数

名称

Required

默认值

说明

index.bootstrap.enabled

true

false

开启索引加载,会将已存表的最新数据一次性加载到 state 中

index.partition.regex

false

*

设置正则表达式进行分区筛选,默认为加载全部分区

9、写入模式

9.1、Changelog模式

        如果希望Hoodie保留消息的所有变更(I/-U/U/D),之后接上Flink引擎的有状态计算实现全链路近实时数仓生产(增量计算),Hoodie的MOR表通过行存原生支持保留消息的所有变更(format层面的集成),通过流读MOR表可以消费到所有的变更记录。

1、WITH参数

名称

Required

默认值

说明

changelog.enabled

false

false

默认是关闭状态,即 UPSERT 语义,所有的消息仅保证最后一条合并消息,中间的变更可能会被 merge 掉;改成 true 支持消费所有变更。

        批(快照)读仍然会合并所有的中间结果,不管format是否已存储中间状态。

        开启changelog.enabled参数后,中间的变更也只是Best Effort:异步的压缩任务会将中间变更合并成1条,所以如果流读消费不够及时,被压缩后只能读到最后一条记录。当然,通过调整压缩的buffer时间可以预留一定的时间buffer给reader,比如调整压缩的两个参数:

        compaction.delta_commits:5 

        compaction.delta_seconds: 3600

        说明:
        Changelog 模式开启流读的话,要在 sql-client 里面设置参数:
        set sql-client.execution.result-mode=tableau; 
        或者
        set sql-client.execution.result-mode=changelog;
        否则中间结果在读的时候会被直接合并。(参考:https://nightlies.apache.org/flink/flink-docs-release-1.13/docs/dev/table/sqlclient/#running-sql-queries)

2、流读 changelog

        仅在0.10.0支持,本feature为实验性。

        开启changelog模式后,hudi会保留一段时间的changelog供下游consumer消费,我们可以通过流读ODS层changelog接上ETL逻辑写入到DWD层,如下图的pipeline:

        流读的时候我们要注意changelog有可能会被compaction合并掉,中间记录会消除,可能会影响计算结果,需要关注sql-client的属性(result-mode)同上。

3、案例演示

-- 使用changelog
CREATE TABLE t6(
  id int PRIMARY KEY NOT ENFORCED,
  age INT
) WITH (
  'connector' = 'hudi',
  'path' = '/tmp/hudi_flink/t6',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '4',
  'changelog.enabled' = 'true'
);


insert into t6 values(1,1);
insert into t6 values(1,2);

select * from t6;  可以获取最新的数据,一条
select * from t6/*+ OPTIONS('read.start-commit'='earliest')*/; 可以获取2条


-- 不使用changelog
CREATE TABLE t6_v(
  id int PRIMARY KEY NOT ENFORCED,
  age INT
) WITH (
  'connector' = 'hudi',
  'path' = '/tmp/hudi_flink/t6',
  'table.type' = 'MERGE_ON_READ',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '4'
);
select * from t6/*+ OPTIONS('read.start-commit'='earliest')*/; 只会获取1条,读时合并

9.2、Append模式

        从0.10开始支持
        对于INSERT模式:

  • MOR默认会apply小文件策略:会追加写avro log文件
  • COW每次直接写新的parquet文件,没有小文件策略

        Hudi支持丰富的Clustering策略,优化INSERT模式下的小文件问题:

1、Inline Clustering

        只有Copy On Write表支持该模式

名称

Required

默认值

说明

write.insert.cluster

false

false

是否在写入时合并小文件,COW 表默认 insert 写不合并小文件,开启该参数后,每次写入会优先合并之前的小文件(不会去重),吞吐会受影响

2、Async Clustering

        从 0.12 开始支持

(1)WITH参数

名称

Required

默认值

说明

clustering.schedule.enabled

false

false

是否在写入时定时异步调度 clustering plan,默认关闭

clustering.delta_commits

false

4

调度 clsutering plan 的间隔 commits,

clustering.schedule.enabled 为 true 时生效

clustering.async.enabled

false

false

是否异步执行 clustering plan,默认关闭

clustering.tasks

false

4

Clustering task 执行并发

clustering.plan.strategy.target.file.max.bytes

false

1024 * 1024 * 1024

Clustering 单文件目标大小,默认 1GB

clustering.plan.strategy.small.file.limit

false

600

小于该大小的文件才会参与 clustering,默认600MB

clustering.plan.strategy.sort.columns

false

N/A

支持指定特殊的排序字段

clustering.plan.partition.filter.mode

false

NONE

支持

NONE:不做限制

RECENT_DAYS:按时间(天)回溯

SELECTED_PARTITIONS:指定固定的 partition

clustering.plan.strategy.daybased.lookback.partitions

false

2

RECENT_DAYS 生效,默认 2 天

(2)Clustering Plan Strategy

        支持定制化的 clustering 策略。

名称

Required

默认值

说明

clustering.plan.partition.filter.mode

false

NONE

支持

  • NONE:不做限制
  • RECENT_DAYS:按时间(天)回溯
  • SELECTED_PARTITIONS:指定固定的 partition

clustering.plan.strategy.daybased.lookback.partitions

false

2

RECENT_DAYS 生效,默认 2 天

clustering.plan.strategy.cluster.begin.partition

false

N/A

SELECTED_PARTITIONS 生效,

指定开始 partition(inclusive)

clustering.plan.strategy.cluster.end.partition

false

N/A

SELECTED_PARTITIONS 生效,

指定结束 partition(incluseve)

clustering.plan.strategy.partition.regex.pattern

false

N/A

正则表达式过滤 partitions

clustering.plan.strategy.partition.selected

false

N/A

显示指定目标 partitions,支持逗号 , 分割多个 partition

10、Bucket索引

        从0.11开始支持

        默认的flink流式写入使用state存储索引信息:primarykey到fileId的映射关系。当数据量比较大的时候,state的存储开销可能成为瓶颈,bucket索引通过固定的hash策略,将相同key的数据分配到同一个fileGroup中,避免了索引的存储和查询开销。

1、WITH参数

名称

Required

默认值

说明

index.type

false

FLINK_STATE

设置BUCKET开启Bucket索引功能

hoodie.bucket.index.hash.field

false

主键

可以设置成主键的子集

hoodie.bucket.index.num.buckets

false

4

默认每个partition的bucket数,当前设置后则不可再变更。

2、和state索引的对比:
(1)bucket index没有state的存储计算开销,性能较好。
(2)bucket index无法扩buckets,state index则可以依据文件的大小动态扩容。
(3)bucket index不支持跨partition的变更(如果输入是cdc流则没有这个限制),state index没有限制。

11、Hudi Catalog

        将表的元数据持久化。从0.12.0开始支持,通过catalog可以管理flink创建的表,避免重复建表操作,另外hms模式的catalog支持自动补全hive同步参数。

        DFS模式Catalog SQL样例:

CREATE CATALOG hoodie_catalog
  WITH (
    'type'='hudi',
    'catalog.path' = '${catalog 的默认路径}',
    'mode'='dfs' 
  );

         Hms模式Catalog SQL样例:

CREATE CATALOG hoodie_catalog
  WITH (
    'type'='hudi',
    'catalog.path' = '${catalog 的默认路径}',
    'hive.conf.dir' = '${hive-site.xml 所在的目录}',
    'mode'='hms' -- 支持 'dfs' 模式通过文件系统管理表属性
  );	

1、With参数

名称

Required

默认值

说明

catalog.path

true

--

默认的 catalog 根路径,用作表路径的自动推导,默认的表路径:${catalog.path}/${db_name}/${table_name}

default-database

false

default

默认的 database 名

hive.conf.dir

false

--

hive-site.xml 所在的目录,只在 hms 模式下生效

mode

false

dfs

支持 hms模式通过 hive 管理元数据

table.external

false

false

是否创建外部表,只在 hms 模式下生效

2、使用dfs方式 

1、创建sql-client初始化sql文件

vim /opt/module/flink-1.13.2/conf/sql-client-init.sql

CREATE CATALOG hoodie_catalog
  WITH (
    'type'='hudi',
    'catalog.path' = '/tmp/hudi_catalog',
    'mode'='dfs' 
  );

USE CATALOG hoodie_catalog;

2、 指定sql-client启动时加载sql文件

hadoop fs -mkdir /tmp/hudi_catalog

bin/sql-client.sh embedded -i conf/sql-client-init.sql -s yarn-session

需要先创建库 

3、建库建表插入

create database test;
use test;

create table t2(
  uuid varchar(20),
  name varchar(10),
  age int,
  ts timestamp(3),
  `partition` varchar(20),
primary key (uuid) not enforced
)
with (
  'connector' = 'hudi',
  'path' = '/tmp/hudi_catalog/default/t2',
  'table.type' = 'MERGE_ON_READ'
);

insert into t2 values('1','zs',18,TIMESTAMP '1970-01-01 00:00:01','a');

4、退出sql-client,重新进入,表信息还在

use test;
show tables;
select * from t2;

Logo

大数据从业者之家,一起探索大数据的无限可能!

更多推荐