从三层数仓建模、LOAD DATA 批量入库、增量归档到 Power BI 看板,完整复盘一个可复用的数据分析模块。

一、项目背景与优化目标

这套项目的原始数据主要包含两类:一类是 1200 万级商品维表,另一类是搜索候选行为和历史行为明细。最初的痛点主要有三个:

  1. 图形化工具导入超大 TXT 文件速度慢
  2. 行为字段里存在大量下划线拼接的多值列,无法直接落库查询
  3. 增量导入后还要继续支持 BI 看板查看累计结果

因此这次优化的目标很明确:

  1. 第一次全量导入要快
  2. 后续增量导入不能覆盖历史行为数据
  3. 导入完成后要能直接重建漏斗分析、用户分层和类目转化三类看板

二、MySQL 表设计:DWD → DWS → ADS 三层架构

为了避免千万级大表在 BI 查询时直接 JOINGROUP BY,这里采用了 DWD 明细层、DWS 汇总层、ADS 分析层三层架构。核心思想是:把重计算前移到入库完成后执行,而不是把计算压力留给报表。

2.1 DWD 明细层

DWD 层保留最完整的行级事实,当前项目有 3 张核心表:

  • dim_product:商品维度表,保存商品名称、品牌、类目、店铺等属性
  • fact_search_candidate:候选商品行为事实表,按 sample_id + candidate_rank 拆成一行一商品
  • fact_history_behavior:历史行为事实表,按 sample_id + history_rank 拆成一行一行为
CREATE TABLE dim_product (
    wid         VARCHAR(50)  NOT NULL PRIMARY KEY,
    name        VARCHAR(500),
    brand_id    VARCHAR(50),
    brand_name  VARCHAR(255),
    cate_id_1   VARCHAR(50),
    cate_name_1 VARCHAR(255),
    cate_id_2   VARCHAR(50),
    cate_name_2 VARCHAR(255),
    cate_id_3   VARCHAR(50),
    cate_name_3 VARCHAR(255),
    cate_id_4   VARCHAR(50),
    cate_name_4 VARCHAR(255),
    shop_id     VARCHAR(50)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE fact_search_candidate (
    id              INT AUTO_INCREMENT PRIMARY KEY,
    sample_id       INT NOT NULL,
    query           VARCHAR(2000),
    candidate_rank  INT,
    wid             VARCHAR(50) NOT NULL,
    label           TINYINT,
    label_name      VARCHAR(50)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

CREATE TABLE fact_history_behavior (
    id              INT AUTO_INCREMENT PRIMARY KEY,
    sample_id       INT NOT NULL,
    history_rank    INT,
    history_query   VARCHAR(2000),
    wid             VARCHAR(50) NOT NULL,
    behavior_type   VARCHAR(50),
    behavior_name   VARCHAR(50),
    history_time    VARCHAR(50)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;

原始行为文件中的 candidate_wid_listcandidate_label_listhistory_wid_list 等字段都是下划线拼接的多值列,因此在入库前必须先由 Python 拆成标准明细行。

2.2 DWS 汇总层

DWS 层把高频分析维度提前聚合,当前主要用了 4 张表:

  • dim_product_category:从商品维表中提取轻量类目字段
  • tmp_candidate_wid_summary:商品级曝光、点击、加购、购买汇总
  • ads_current_sample_summary:样本级当前候选行为汇总
  • tmp_history_sample_summary:样本级历史行为汇总
TRUNCATE TABLE tmp_candidate_wid_summary;
INSERT INTO tmp_candidate_wid_summary (wid, exposure_count, click_count, cart_count, purchase_count)
SELECT wid,
       COUNT(*) AS exposure_count,
       SUM(CASE WHEN label IN (1,2,3) THEN 1 ELSE 0 END) AS click_count,
       SUM(CASE WHEN label IN (2,3)   THEN 1 ELSE 0 END) AS cart_count,
       SUM(CASE WHEN label = 3        THEN 1 ELSE 0 END) AS purchase_count
FROM fact_search_candidate
GROUP BY wid;

2.3 ADS 分析层

ADS 层直接服务 BI,这里最终落了 4 张表:

  • ads_candidate_label_summary:候选标签分布
  • ads_search_funnel:曝光 → 点击 → 加购 → 购买漏斗
  • ads_category_conversion:一级类目转化表现
  • ads_user_segment:用户价值分层
INSERT INTO ads_search_funnel (step_name, step_count, step_rate)
SELECT '曝光', SUM(total_candidates), 1.0000 FROM ads_current_sample_summary
UNION ALL
SELECT '点击', SUM(click_count), ROUND(SUM(click_count)/SUM(total_candidates),4) FROM ads_current_sample_summary
UNION ALL
SELECT '加购', SUM(cart_count), ROUND(SUM(cart_count)/SUM(total_candidates),4) FROM ads_current_sample_summary
UNION ALL
SELECT '购买', SUM(purchase_count), ROUND(SUM(purchase_count)/SUM(total_candidates),4) FROM ads_current_sample_summary;

三、自动化增量 ETL 管道实现

当前主程序已经统一收敛到 run_incremental_v2.py,整体流程是:

incoming/ 目录扫描
→ detect_file_type() 判断 product / behavior
→ clean_product_to_csv() 或 clean_behavior_to_csv()
→ LOAD DATA LOCAL INFILE 写入明细表
→ 01_build_indexes.sql
→ 02_rebuild_summary.sql
→ 03_rebuild_ads.sql
→ 归档到 processed/

3.1 这版脚本的几个关键点

相比最初版本,这一版脚本主要做了 4 个关键改进:

  • 支持无表头的增量行为文件识别,不再因为首列不是 query 被误跳过
  • 行为数据会读取当前最大 sample_id,保证增量导入时历史数据不丢
  • 商品维表按“全量快照”处理,先灌入临时表,再原子切换到 dim_product
  • 统一清洗回车符和特殊字符,避免 label_namehistory_time 出现脏数据

3.2 文件类型识别

def detect_file_type(fp: Path) -> str:
    sep = detect_separator(fp)
    with open(fp, "r", encoding="utf-8-sig", errors="replace") as f:
        first_line = f.readline().strip()

    parts = [part.strip() for part in first_line.split(sep)]
    first_cell = parts[0].lower() if parts else ""

    if first_cell == "query":
        return "behavior"
    if first_cell == "wid":
        return "product"
    if len(parts) == len(USER_BEHAVIOR_COLUMNS):
        return "behavior"

    return "unknown"

这一步很重要,因为后续我们测试 5 万伪造行为增量时,用的就是无表头 CSV。如果没有这个兜底逻辑,脚本会直接把文件判成 unknown 然后跳过。

3.3 增量行为数据如何避免覆盖历史数据

行为数据导入前,会先查当前事实表中的最大 sample_id

cur.execute("SELECT COALESCE(MAX(sample_id), 0) FROM fact_search_candidate")
max_sample_id = cur.fetchone()[0]

这样第二天的新行为数据就会从第一天的最大 sample_id 继续往后编号,因此行为事实表是追加而不是覆盖

3.4 批量入库为什么快

真正让导入速度提升的关键,不是 Python 循环插入,而是先把清洗结果写成 TSV,再用 LOAD DATA LOCAL INFILE 一次性批量写入:

def load_csv_to_table(csv_path: Path, table: str, columns: str, replace: bool = False):
    cur.execute("SET autocommit = 0")
    cur.execute("SET foreign_key_checks = 0")
    cur.execute("SET unique_checks = 0")
    cur.execute(
        f"LOAD DATA LOCAL INFILE '{escaped_path}' "
        f"INTO TABLE {table} "
        f"CHARACTER SET utf8mb4 "
        f"FIELDS TERMINATED BY '\\t' "
        f"LINES TERMINATED BY '\\n' "
        f"({columns})"
    )
    conn.commit()

这种方式比逐行 INSERT 快得多,尤其适合千万级明细入库。


四、性能优化点总结

原始数据

导入新数据后#只测试导入了5万的行为数据

这套方案能跑起来,主要靠下面几个优化点:

  • 行为事实表导入前先删二级索引,导完再统一建索引,比边导入边维护索引快很多
  • 商品表采用全量快照 + 临时表切换,既保留速度,也避免直接 TRUNCATE 后失败导致空表
  • 行为清洗按 chunk 分块读取,避免一次性把超大明细展开到内存里
  • DWS 预计算把高频 JOIN / GROUP BY 从 BI 查询时前移到入库完成后执行
  • 所有已处理文件统一归档到 processed,支持重复运行和增量追加

五、踩坑与修复记录

这次优化过程中,踩坑不少,下面是几个最典型的问题。

5.1 表头误导入

问题:CSV 首次写入带表头,LOAD DATA 又没跳过头行,导致 sample_id=0 的脏数据进库。
修复:行为和商品 TSV 一律写无表头

5.2 Windows 回车符污染字段

问题:Windows 默认换行符会把 \r 残留到最后一列,最后出现 无行为\r0\r 这种值。
修复:

  • 写 TSV 时统一 lineterminator="\n"
  • 在清洗函数中统一移除 \r\n

5.3 旧版标签展开跨行串位

问题:旧版行为标签展开逻辑会把不同样本的 candidate_label_list 扁平拼在一起,造成标签错位。
修复:按单行对齐拆分 candidate_wid_listcandidate_label_list,不再做跨块扁平映射。

5.4 无表头增量文件被跳过

问题:无表头行为文件的首列不是 query,会被判定成 unknown
修复:增加“7 列即行为文件”的兜底判断逻辑。

5.5 伪造增量数据无法关联商品维表

问题:早期生成的 5 万伪造行为数据里,widP000001 这种虚构编码,导致类目分析全部失真。
修复:从真实商品文件中抽样 wid,再生成测试行为数据。


六、实测结果

6.1 首次全量导入

首次全量导入实测结果如下:

  • 商品维表 12141247 行,商品清洗 + 导入约 14 分钟
  • 候选行为表 15510012 行,历史行为表 26667260 行,行为清洗 + 导入约 14 分钟
  • 建索引 3 分 09 秒
  • 汇总 4 分 54 秒
  • ADS 2 分 34 秒
  • 全链路首次导入总耗时 38 分 56 秒

6.2 5 万行为增量导入

后续又测试了一次 5 万行为增量文件,结果如下:

  • 新增候选明细 225070
  • 新增历史明细 249582
  • 增量导入总耗时 12 分 51 秒
  • 原始历史行为数据不会清空,而是在 sample_id=173831 之后继续追加

这说明当前行为增量逻辑已经满足“第二天数据追加、第一天数据保留”的要求。


七、代码结构

data/test/
  run_incremental_v2.py      # 主流程:扫描、清洗、导入、重建汇总、归档
  clean_behavior.py          # 行为字段拆分与清洗逻辑
  gen_5w.py                  # 真实 wid 抽样的增量测试数据生成器
  config.py                  # MySQL 连接配置
  sql/
    00_rebuild_all_tables.sql
    01_build_indexes.sql
    02_rebuild_summary.sql
    03_rebuild_ads.sql

链接:https://pan.quark.cn/s/03c47705aecd?pwd=r4Rj

如果后续还要继续扩展留存分析、用户分层或者更多 BI 看板,建议继续沿用“明细层追加、汇总层重建、分析层直连”的思路,把复杂计算尽量放到导入完成后统一执行。


八、总结

这次优化的核心不是单纯“把导入跑通”,而是把它做成一个可以重复跑、可以增量追加、可以直接对接 BI 的数据分析模块。

最终这套方案解决了三个关键问题:

  1. 首次全量导入速度慢
  2. 后续增量导入不能保留历史
  3. BI 查询直接打明细表性能差

通过 Python 清洗 + LOAD DATA 批量入库 + 三层数仓建模 + 增量归档,这套链路已经具备了较强的复用价值。后续无论是继续扩展留存分析,还是接入更多 BI 看板,都可以在这套基础上继续演进。

更多推荐