目录

一、拉链表概念

二、拉链表对应的业务需求

三、代码实现

3.1 数据初始化:

 3.2 创建ods层增量表:

 3.3 创建dwd层拉链表

 3.4 数据更新 ,将数据日期为2023-3-4的日期添加到拉链表中

 3.4.1 先追加数据到ods层表

3.4.2 更新dwd层表数据


一、拉链表概念

        拉链表是一种数据模型,主要是针对数据仓库设计中表存储数据的方式而定义的,顾名思义,所谓拉链,就是记录历史。记录一个事物从开始,一直到当前状态的所有变化的信息。拉链表可以避免按每一天存储所有记录造成的海量存储问题,同时也是处理缓慢变化数据(SCD2)的一种常见方式。

      百度百科的解释:拉链表是维护历史状态,以及最新状态数据的一种表,拉链表根据拉链粒度的不同,实际上相当于快照,只不过做了优化,去除了一部分不变的记录,通过拉链表可以很方便的还原出拉链时点的客户记录。

二、拉链表对应的业务需求

  1. 数据量比较大;
  2. 表中的部分字段会被update, 如用户的地址,产品的描述信息,订单的状态等等;
  3. 需要查看某一个时间点或者时间段的历史快照信息,比如,查看某一个订单在历史某一个时间点的状态,比如,查看某一个用户在过去某一段时间内,更新过几次等等;
  4. 变化的比例和频率不是很大,比如,总共有1000万的会员,每天新增和发生变化的有10万左右;
  5. 如果对这边表每天都保留一份全量,那么每次全量中会保存很多不变的信息,对存储是极大的浪费。
  6. 拉链历史表,既能满足反应数据的历史状态,又可以最大程度的节省存储;

缺点:

  • 开发/使用 成本略高,大量记录频繁变更会导致存储压缩效果降低
  • 不分区的话,数据量大的时候,查询效率低
  • 如果某个日期同步的数据出现问题需要重跑数据,则需要重跑从出问题的日期到当前日期的每一天数据

三、代码实现

       拉链表主要用在dwd层,用来及时记录每个事务状态的。加入ods层数据发生的新增或者更新,相应的dwd层的数据也会改变。拉链表数据生成的思路是:ods更新或者新增的数据 + union +dwd拉链表历史数据(要更改历史数据中状态发生改变的字段)。
方法有两种: 窗口函数和union all 。

3.1 数据初始化:

导入数据到一张初始表 (外部表)

-----------拉链---------------
create database lalian;
use lalian;
drop table if exists orders;
create external table  orders(
    orderid int,
    createdate string,
    modifiedtime string,
    status string
)
row format delimited fields terminated by '\t'
location '/tmp/lalian/orders';
-- [root@reagan180 ~]# vim /opt/aa.txt
-- [root@reagan180 ~]# hdfs dfs -mkdir -p /tmp/lalian/orders/
-- [root@reagan180 ~]# hdfs dfs -put /opt/aa.txt /tmp/lalian/orders/
select * from orders;

 3.2 创建ods层增量表:(按日期分区)

将初始表添加覆盖到ods层表中,数据日期为2023-3-3

create table  ods_orders_inc(
    orderid int,
    createdate string,
    modifiedtime string,
    status string
)partitioned by (day string)
row format delimited fields terminated by '\t';

insert overwrite table ods_orders_inc partition (day='2023-03-03')
select orderid, createdate, modifiedtime, status from orders;

 3.3 创建dwd层拉链表

将ods层数据添加覆盖到dw层,dw表增加  start_time 和 end_time 两列数据用来记录时间动态。

其实默认end_time 为时间极限值 '9999-12-31'

create table dws_orders_his(
    orderid int,
    createdate string,
    modifiedtime string,
    status string,
    start_time string,
    end_time string
)row format delimited fields terminated by '\t';
insert overwrite  table  dws_orders_his
select orderid, createdate, modifiedtime, status, modifiedtime,'9999-12-31' from ods_orders_inc where day='2023-03-03';

 

 3.4 数据更新 ,将数据日期为2023-3-4的日期添加到拉链表中

 3.4.1 先追加数据到ods层表

1、删除 hdfs 路径的aa.txt 文件  :[root@reagan180 ~]# hdfs dfs -rm -f /tmp/lalian/orders/*

2、将更新的数据重新传入hdfs路径:

               [root@reagan180 ~]# hdfs dfs -put /opt/aa.txt /tmp/lalian/orders/

where条件,不会覆盖日期2023-3-3的数据

insert overwrite table ods_orders_inc partition (day='2023-03-04')
select orderid, createdate, modifiedtime, status from orders 
where modifiedtime='2023-3-4';

3.4.2 更新dwd层表数据

可以采用union all 和窗口函数

union all :

insert overwrite  table  dws_orders_his
select tb.orderid,
       tb.createdate,
       tb.modifiedtime,
       tb.status,
       tb.start_time ,
       tb.end_time
from(
(select orderid, createdate, modifiedtime, status, modifiedtime as start_time,'9999-12-31' as end_time
             from ods_orders_inc where day='2023-03-04')
union all
(select t1.orderid,
        t1.createdate,
        t1.modifiedtime,
        t1.status,
        t1.start_time,
        if(t2.orderid is not null and t1.end_time>'2023-03-04', t2.modifiedtime, '9999-12-31') endtime
from dws_orders_his t1
left join
    (select orderid from ods_orders_inc where day='2023-03-04') t2 on t1.orderid=t2.orderid  ))tb
     order by tb.orderid,tb.start_time;

窗口函数:

lead(start_time, 1) over (partition by orderid order by start_time)

补充一下每日的用户更新表该怎么获取:

  1. 我们可以监听Mysql数据的变化,比如说用Canal,最后合并每日的变化,获取到最后的一个状态。
  2. 假设我们每天都会获得一份切片数据,我们可以通过取两天切片数据的不同来作为每日更新表。
  3. 每日的变更流水表。
  4. 通过etl工具对操作型数据库按照时间字段增量抽取到ods或者数据仓库(每天抽取前一天的数据),形成每天的增量数据(实际中使用最多的情形)。

Logo

大数据从业者之家,一起探索大数据的无限可能!

更多推荐