我作为算法工程师在落地大模型应用时,不得不搭建的一套数据闭环管线。核心围绕一个念头——让 Agent 产生的每一条对话、RAG 召回的每一篇文档,都能流回训练和检索系统,让应用越用越聪明。

目录

  • 为什么算法工程师要去折腾数据管道

  • 选型:把已有的轮子拼起来

  • 一条对话日志的完整旅程

  • Token 计量与日志标准化——让对话可被计算

  • Agent 图谱拆解——把黑盒打开

  • RAG 知识库的增量推送——检索永远需要最新鲜的数据

  • AI 质检——用大模型给大模型的回答打分

  • 标注飞轮——让人工反馈自动回流

  • 多模态附件对齐——让训练数据带上“眼睛”和“耳朵”

  • 一些不吐不快的工程代价

  • 围绕这套系统的技术速查清单


为什么算法工程师要去折腾数据管道

在做 Agent 和 RAG 的初期,我和很多人一样,把精力全扑在模型选型、Prompt 调优、工具定义上。上线跑了一段时间后发现,真正制约迭代速度的,是数据。

Agent 每天产出的对话记录,就那么在日志里躺尸。RAG 知识库的文档,更新全凭人工上传,两个月前的制度还在给用户当答案。更头疼的是,想用真实对话做微调,光是清洗格式、统计 Token、剔除脏数据就要花掉大半天。

市面上不是没有数据处理工具,但大部分偏 ETL 或者纯后端工程,懂 NLP 和模型需求的人往往不参与数据管道的设计。结果就是,算法工程师拿到手里的“清洗后数据”,常常需要二次加工。

我决定自己来。基于常用的开源组件,我搭了一套专为 Agent 和 RAG 设计的数据闭环管线。它不做离线批处理,而是时刻在线,把对话日志、检索反馈、质检结果、人工标注全部串在一起,形成一条能自己转起来的飞轮。


选型:把已有的轮子拼起来

没有引入什么新奇的技术。存储层用的是 Elasticsearch 存全量数据行和向量,MinIO 存原始文件,MySQL 存任务状态。调度用 Quartz,分布式锁用 Redis,跨线程上下文传递靠 TTL。编程语言是 Java,生态里这些组件都是现成的。

唯一特意引入的“算法”依赖是 jtokkit——OpenAI tiktoken 的 Java 移植版。我需要离线计算每条对话的 Token 数,不能总依赖调用 API 时返回的 usage 字段,因为很多日志是从中间件转发过来的,usage 早就丢掉了。


一条对话日志的完整旅程

平台的工作流很直觉:上游应用(无论是自研 Agent、Dify 工作流,还是 Langfuse 埋点)把数据推过来。数据进来后,经历标准化、Token 计量、可选的多模态附件关联,然后进入质检和标注环节。最终,高质量数据被推送到下游的训练平台或 RAG 知识库。

整个过程有两个核心设计。第一,数据从不以原始文件形式经过后端服务——文件通过预签名 URL 直接上传到 MinIO,后端只处理元数据。第二,每条数据都带有丰富的“AI 上下文”:不仅是对话文本,还有这次请求用了哪些工具、召回了哪些文档、模型最终采纳了哪几条、用户有没有点踩。

这些上下文,是后面分析 Agent 行为和优化 RAG 召回的关键。


Token 计量与日志标准化——让对话可被计算

Agent 对话日志在进入系统前,格式五花八门。有的用 ChatML,有的用 ShareGPT,有的只存了 user 和 assistant 两个字段,system prompt 和工具调用全都丢掉。

我定义了一套内部标准格式,类似 OpenAI Chat Completions 的消息结构,但额外扩充了几个字段:

  • retrieved_docs:本次请求从知识库召回的全部文档 ID 列表

  • adopted_docs:模型在回答中真正引用到的文档 ID(通过事后相似度比对或规则判定)

  • tool_calls:工具调用的完整记录,包括函数名、参数、返回值

  • user_feedback:如果业务系统有反馈按钮,就带上

其中 adopted_docs 是后来补上的。我发现很多 RAG 应用,召回了 5 条文档,模型可能只看了一条甚至一条都没看。只记录“召回了什么”而不记录“采纳了什么”,后续优化检索策略时根本抓不住重点。

Token 计量方面,我用 jtokkit 对每条消息的 content 做计数,区分 prompt_tokens 和 completion_tokens。这为后面的成本核算、数据筛选(比如只保留 Token 数在合理区间的长对话)打下了基础。

Agent 图谱拆解——把黑盒打开

很多 Agent 平台(比如 Dify)会导出 DSL 文件,描述了工作流的节点和连线。Langfuse 这类可观测性工具则记录每次执行的 Trace,包含每个节点的实际输入输出和耗时。

我把这两种数据都接入了进来。DSL 告诉我“设计上 Agent 长什么样”,Trace 告诉我“实际上它跑得怎么样”。

每条 Trace 会被拆解成节点级数据,存入 ES。一个典型的意图识别-检索-生成-校验四步 Agent,会产生四条记录,每条记录都有独立的 node_type、input、output、start_time、end_time。

这些数据积累下来后,我可以轻松回答以前很难回答的问题:

  • 检索节点的 P99 延迟是多少?是不是因为向量数据库没有预热?

  • 最近一周工具调用的失败率有没有上升?是不是某个外部 API 变更了?

  • 用户问“退货流程”时,Agent 是先检索知识库,还是直接调了订单查询工具?是否符合预期?

更重要的是,这些节点数据可以直接用来构造训练样本。比如“检索节点”的 query 和最终被采纳的文档,就是一对优质的检索训练对;“工具调用节点”的输入和输出,是训练 function calling 的天然素材。


RAG 知识库的增量推送——检索永远需要最新鲜的数据

RAG 应用最大的痛点不是召回率,是数据过期。业务文档一天改好几版,知识库如果一周才全量重建一次,用户搜到的内容早就不对了。

我在管线里接入了定时增量推送。核心逻辑不复杂:用 ES 的序列号记录每次推送的位置,下一轮只拉取增量文档,打成 JSONL 上传到知识库的存储目录,调一次索引刷新接口。

这里有两个反复验证过的选择。一,为什么用序列号而不是时间戳?时间戳依赖系统时钟,并发写入时可能乱序,容易漏数据。序列号单调递增,天然支持断点续传。二,为什么不全量推送?全量推送在小规模(万条以内)时简单可靠,但文档量上十万后,每次全量重建索引可能要几十分钟,增量推送则只需要几十秒。

知识库更新后,效果怎么量化?我留了一套标准测试集,每次推送前后各跑一遍,对比 top-k 召回准确率和最终回答的正确率。这个数据可以直观看到“这次更新是正向还是负向”。


AI 质检——用大模型给大模型的回答打分

对话数据入库前,必须过一道质检。人工一条条看太慢,完全靠规则又太死板,我的做法是用大模型自己当裁判。

给模型一段精心设计的 Prompt,让它从准确性、完整性、流畅性几个维度给回答打分。Prompt 里不写“请打分”这种模糊指令,而是给每个分数段配上具体的行为描述。比如:

  • 准确性 5 分:回答完全正确,引用的知识来源准确无误

  • 准确性 1 分:回答存在明显的事实错误,或完全答非所问

这种“锚点描述”极大提升了评分的一致性。早期没加锚点的时候,同一对话两次评分能差两分;加上之后一致性从 60% 左右提到了 85% 以上。

分数低于阈值的,自动标记为“待人工复核”,进入标注队列。分数达标的直接入库成为训练候选。阈值不是拍脑袋,是用一批人工标注的金标数据跑了一遍“阈值-准确率-召回率”曲线,选了个平衡点。


标注飞轮——让人工反馈自动回流

质检筛出的低分样本,会自动创建标注任务,推送到标注工具。标注人员修正后,结果回写到 ES,同时触发一个事件:下游的训练数据筛选流程会感知到“有新一批高质量标注数据入库了”。

这个设计让标注不再是离线的一次性行为,而是持续在线的反馈循环。标注质量怎么控制?我用了三层:AI 初筛减少人工量,多人交叉标注避免单人偏差,定期混入金标数据评估标注人员稳定性。

另一个我后来加上的改进是,标注任务会带上“上下文”——不只给标注人员看单条回答,还会附上当时的对话历史、召回文档片段、甚至 Agent 的节点执行路径。这让标注人员能更准确地判断“这条回答到底合不合理”。

最终,这个闭环会回流到模型训练。训练平台用新标注的高质量数据微调模型,部署上线后产生新的对话日志,日志再流入这套管线——飞轮就这样转起来了。


多模态附件对齐——让训练数据带上“眼睛”和“耳朵”

很多业务场景不只有纯文本。客服系统里,用户传一张截图比打一百个字管用。做多模态模型或者给纯文本模型构造图文训练对时,需要把图片、音频这些附件,和它们对应的描述文本对齐。

上游给过来的往往是一个表格加一堆附件文件。表格里某列是“产品图片”,但实际值写的是“图片1.png”,真实文件名却是“产品图_20240101.png”。这种情况靠规则匹配很脆弱。

我做了三层自适应匹配。有显式字段声明就优先在声明的字段里匹配,没有就全字段扫描。匹配算法从精确匹配开始,逐步降级到去扩展名匹配、子串包含。全字段扫描时,先给所有附件名建一个 HashMap 索引,避免 O(n*m) 的遍历。

没匹配上的附件不会丢掉,会单独生成一条告警,并在结果数据集里保留原始文件名,等人工二次确认。原则是宁可不关联,也不误关联——训练数据里一张图配错描述,比漏配更影响模型效果。


一些不吐不快的工程代价

做这套东西,踩过的坑比写出来的代码还多。

大文件解析曾经把服务搞挂好几次。一个十万行的 Excel,用老版 POI 全量加载,JVM 堆直接打满。后来切到流式解析,门口再加文件大小和行数限制,再也没崩过。

增量推送早期用时间戳,一次并发写入导致十几条文档漏推,知识库里用户搜出来的还是旧版本。换成序列号方案后,再没漏过一条。

分布式锁的 key 设计也有过教训。最早只用了数据集名称,两个不同项目各建了一个“测试数据集”,后创建的那个直接被当成重复请求拦掉。后来 key 里加上了项目 ID 和业务线标识。

AI 质检的 Prompt 调整了不下十版。最开始用“好/坏”二元判断,同一个对话两次评估结果完全相反。后来改多维度打分,配锚点描述,再用金标数据校准,才稳定下来。

围绕这套系统的技术速查清单

以下是我在面试和复盘时,觉得最有价值讨论的几个技术点。

Agent 图谱数据对优化智能体有什么实际价值?
图谱拆解后的节点级数据,是优化 Agent 的数据基础。可以按节点类型统计延迟、成功率、工具调用频次,快速定位瓶颈。检索节点的 query-文档对可以直接用于训练检索模型,工具调用节点的输入输出可用于训练 function calling。

RAG 知识库增量推送如何保证不漏不重?
用数据存储内部的序列号而非时间戳。序列号单调递增,即使推送失败,重试时从上次记录的序列号位置继续,天然支持断点续传。时间戳依赖系统时钟,并发写入时可能乱序漏数据。

用大模型评估对话质量,一致性怎么保证?
一是 Prompt 里给出明确的分级标准,每个分数段有具体行为描述;二是定期用人工标注的金标数据做校准;三是可以考虑多个模型打分取平均,降低单模型偏差。AI 初筛的目标是高召回——宁可多送人工复核,不要漏掉差样本。

数据飞轮和普通数据管道的本质区别是什么?
普通管道是单向的,数据进去、处理、出来。飞轮是闭环的,下游模型的性能指标能反哺给上游的数据筛选和标注策略。反馈链路越短,模型迭代越快。

Token 计量为什么不用 API 返回的 usage 字段?
很多日志场景拿不到 API 原始返回,离线计算 Token 数更通用,不受厂商和网关限制。同时可以在数据入库前就做 Token 预算控制和筛选,不需要等下游消费时才判断。

附件与结构化字段的自动对齐怎么兼顾准确率和性能?
给附件文件名建索引,把全字段遍历的 O(n*m) 降到 O(n)。匹配策略从精确到模糊逐级降级,未匹配的告警而非静默丢弃。宁可漏关联,不要错关联,因为错误关联对训练数据的污染更大。

为什么要记录 RAG 中“模型采纳了哪些文档”?
只记录召回而不记录采纳,后续优化检索策略时只能靠猜。“召回但未采纳”比例过高,说明检索虽然捞到了文档,但文档质量或相关性不够好;或者 Prompt 没有引导模型充分参考检索结果。这个指标是优化 RAG 管线的关键信号。


这平台从第一条脚本到最后一条流水线,迭代了大半年。每一次改动背后,几乎都是某个实际需求或者线上故障逼的。写这些觉得做 Agent 和 RAG 的同行,迟早会碰到数据闭环的问题。如果这篇能让你少踩几个坑,少加几个班,那就算没白写。

更多推荐