首先,附上 Github 链接LakeSoul:https://github.com/meta-soul/LakeSoul,可搜索公众号元灵数智,在底部菜单了解我们 - 用户交流获取官方技术交流群二维码,进群与业内大佬进行技术交流。
DMetaSoul团队于7月初发布了LakeSoul 2.0版本,对1.0版本进行了多方面升级优化,提高了自身架构设计的灵活性,也更好地适应客户未来业务高速发展的需要。
2.0版本研发升级的主要目标:
-
多计算引擎(Flink、Presto等)支持,重构了Catalog,与Spark解耦;
-
使用Postgres SQL协议支持更高要求的事务性机制,替换Cassandra SQL,同时减少企业Cassandra集群管理成本;
-
支持更多业务生产当中的功能诉求,如版本快照、回滚、Hive对接等;
-
强化生态系统建设,完善上下游链路设计;
团队研发经历了Catalog重构、Spark与Catalog对接改造、新的用户特性开发以及Flink计算引擎支持几个阶段,从而实现设计目标,下面分别介绍各自的功能点。
1.Catalog重构
1.1 支持Postgres SQL协议
在2.0版本中,全面使用Postgres SQL(PG)协议实现元数据与数据库交互,使用原因在https://github.com/meta-soul/LakeSoul/issues/23提到。一方面,Cassandra原生不支持单表多分区事务机制;另一方面,Cassandra集群管理维护成本较高,而Postgres SQL协议在企业使用广泛,维护成本较低。在使用时需要配置PG相关参数,参考https://github.com/meta-soul/LakeSoul/wiki/02.-QuickStart
1.2 独立的Catalog框架
在2.0版本中,Catalog从Spark中进行了分离解耦,实现了独立的元数据存储结构和接口,Spark、Flink、Presto等计算引擎都可以与LakeSoul对接,提供多引擎流批一体能力。
1.3 数据提交冲突检测机制
当多个任务同时往同一张表写入数据时,会引起同一分区数据写入的一致性问题。为解决此问题,LakeSoul先对数据提交类型进行了归类,定义了四种类型:append、compaction、update和merge,这样当两种类型同时提交时会进行一次冲突检测,从而决定数据提交成功或者失败(X)以及成功时数据处理流程。例如若是两个任务进行append类型数据提交操作,当检测到冲突时,冲突任务只需要重复提交即可。
2.Spark与Catalog对接和改造
Catalog重构完成后,与Spark正式解耦。原有Spark中大部分功能受到解耦影响,需结合新的Catalog设计框架,对三大块内容进行改造。
2.1 Spark DDL
Spark SQL中DDL语句(create、drop table等)以及DataSet相关函数(save等)都与Catalog交互紧密,创建表信息会存储到Catalog中。我们重新调整了Spark scala接口,与Catalog进行了对齐。
2.2 Spark DataSink
DataSink操作(insert into、upsert等)不但涉及到元数据信息(如表),还涉及到流批任务写入数据时数据文件记录与分区数据冲突检测等操作,我们同样调整了Spark Job完成后的数据操作接口,同时把分区数据提交冲突检测下移到了Catalog中。
2.3 Spark DataSource
DataSource上主要对Merge On Read(MOR)进行了优化,在1.0版本中数据读取会按照数据文件write version版本号排序,在Merge时默认会使用最新版本数据覆盖老版本数据(DefaultMergeOp)。在2.0版本中,取消了数据文件Write Version属性,转而代替使用文件有序列表,最新的生成文件在列表最后。
在1.0版本中,若是用户使用LakeSoul提供的其他MergeOperator如MergeOpLong,需要先注册然后在读取时指定相关的列名。
LakeSoulTable.registerMergeOperator(spark, "org.apache.spark.sql.lakesoul.MergeOpLong", "longMOp")
LakeSoulTable.forPath("tablepath").toDF.withColumn("value", expr("longMOp(value)")) .select("hash", "value")
若是表中需要使用的列字段数量多甚至全部字段都要使用时,显然一一指定很不方便,而用户不指定时LakeSoul默认在全部字段会使用DefaultMergeOp。
在https://github.com/meta-soul/LakeSoul/issues/30案例中,用户希望全部字段默认就用MergeNonNullOp算子,在1.0版本中显然不满足此需求。在2.0版本中,LakeSoul实现了修改默认MergeOprator操作。
spark-shell --conf defaultMergeOpInfo="org.apache.spark.sql.execution.datasources.v2.merge.parquet.batch.merge_operator.DefaultMergeOp"
3.新的业务使用特性
在2.0版本中,还提供三个新功能,满足实际落地中的业务需求。
3.1 快照
快照提供了用户查看历史数据的能力。在LakeSoul中用户每次upsert、update、insert等操作时相应分区都会产生一个数据版本,随着数据持续更新,历史版本会增多,用户希望查看历史数据,方便数据比对等,https://github.com/meta-soul/LakeSoul/issues/41。2.0版本中提供了用户按照分区进行快照查看某一个版本号的能力。
LakeSoulTable.forPath("tablepath","range=rangeval",2).toDF.show()
3.2 回滚
回滚提供了用户将历史某一个版本的数据作为当前数据的能力,能让数据回到过去。在2.0版本中,同样提供了按照分区进行数据回滚到某一个历史版本能力,
https://github.com/meta-soul/LakeSoul/issues/42
LakeSoulTable.forPath("tablepath").rollbackPartition("range=rangeval",2)
3.3 Hive支持
在金融、互联网等领域,很多企业会选择Hive作为离线数仓。在2.0版中,LakeSoul不但考虑了上游计算引擎的统一性,还考虑了下游数据输出的多样性。而Hive是优先支持对接LakeSoul下游的数仓工具。
用户需先将hive-site.xml相关配置放到Spark conf目录下,并创建Hive外表,在LakeSoul中还需要结合Compaction功能一起使用,使用Compaction主要是因为Hive不支持Merge On Read操作,只支持Copy On Write操作。
LakeSoulTable.forPath("tablepath").compaction("range=rangeval","hiveExternalTableName")
3.4 Flink计算引擎支持
在2.0版本中,LakeSoul支持Flink计算引擎流批一体写入操作(https://github.com/meta-soul/LakeSoul/tree/feature_flinkOrderSink),
并实现了Exactly-once语义。对于Flink CDC等场景能够保证数据准确性,同时结合LakeSoul提供主键的Merge On Read和Schema AutoMerge能力,对于多表合并、分库合并等实际业务场景具有较高商业价值。
结束语
LakeSoul2.0(https://github.com/meta-soul/LakeSoul/releases/tag/v2.0.0-spark-3.1.2)版本围绕生态系统进行进一步地升级迭代,注重上下游工具生态的无缝对接,也注重企业业务应用中的相关特性。我们希望伴随着2.0版本的发布,作为国产唯一的湖仓一体框架,能形成更好的用户生态和业务生态,让数据智能触手可及,释放数据的业务价值红利。
官方资料 GitHub:
LakeSoul:https://github.com/meta-soul/LakeSoul
MetaSpore:https://github.com/meta-soul/MetaSpore
AlphaIDE:https://registry-alphaide.dmetasoul.com/login
官方交流群: 微信群:关注公众号“元灵数智”,点击“了解我们-用户交流”即可获取二维码
Slack: https://join.slack.com/t/dmetasoul-user/shared_invite/zt-1681xagg3-4YouyW0Y4wfhPnvji~OwFg
更多推荐