可观测性:Langfuse、Langsmith 集成
系列「企业级 AI Agent 实现拆解」E28 篇。上一篇讲了调试工具:Eino Dev 交互式调试——开发期在浏览器里单步执行图。这篇讲生产期可观测性——把每次 Agent 调用的完整链路、Token 消耗、模型参数发到 Langfuse 或 Langsmith。
读完这篇你会知道
callbacks.Handler接口:5 个方法,覆盖所有执行时机- 三种注册方式:全局 / 单次调用 / 指定节点
- Langfuse 接入:
NewLangfuseHandler+ 批量异步上报 +SetTrace设置请求元数据- Langsmith 接入:
NewLangsmithHandler+ run tree 结构HandlerBuilder:不依赖任何外部平台,自建轻量追踪- Langfuse vs DeepFlux OTel:两种观测维度,可以同时跑
一、问题:LLM 应用的可观测性缺口
传统分布式追踪(OTel)能告诉你每个服务的耗时、错误率、调用关系。但 LLM 应用有它特有的信息:
- 模型收到的 prompt 是什么?
- 输出的 completion 是什么?
- 用了多少 Token?哪个模型?temperature 是多少?
- 同一个用户的多次对话怎么归到一条 session?
这些信息放在 OTel span 里不自然,放在专为 LLM 设计的平台(Langfuse、Langsmith)里才好用。
Eino 把两者都留了接入口——同一个 callbacks.Handler 接口。
二、Handler 接口:5 个时机
type Handler interface {
OnStart(ctx context.Context, info *RunInfo, input CallbackInput) context.Context
OnEnd(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context
OnError(ctx context.Context, info *RunInfo, err error) context.Context
OnStartWithStreamInput(ctx context.Context, info *RunInfo,
input *schema.StreamReader[CallbackInput]) context.Context
OnEndWithStreamOutput(ctx context.Context, info *RunInfo,
output *schema.StreamReader[CallbackOutput]) context.Context
}
每个节点执行前调 OnStart(或流式版),执行后调 OnEnd,出错调 OnError。
RunInfo 告诉你是哪个节点:
type RunInfo struct {
Name string // 节点名(compose.WithNodeName 指定)
Type string // 实现类型,如 "OpenAI"
Component components.Component // 节点类别,如 ComponentOfChatModel
}
关键设计:每个方法返回的 context.Context 会传给同一个 Handler 的下一个方法。这样 OnStart 里存入 traceID,OnEnd 里就能取到——用 context.WithValue 在 Handler 内部传状态,不依赖全局变量。
三、三种注册方式
// 1. 全局注册:对所有节点、所有调用都生效
callbacks.AppendGlobalHandlers(myHandler)
// 2. 单次调用:仅对这次 Invoke/Stream 生效
runner.Invoke(ctx, input, compose.WithCallbacks(myHandler))
// 3. 指定节点:仅对名为 "model" 的节点生效
runner.Invoke(ctx, input, compose.WithCallbacks(myHandler).DesignateNode("model"))
全局注册比 per-invocation 优先级高——先执行全局 Handler,再执行调用时传入的 Handler。
四、Langfuse 接入
安装
go get github.com/cloudwego/eino-ext/callbacks/langfuse
初始化
import cbLangfuse "github.com/cloudwego/eino-ext/callbacks/langfuse"
handler, flusher := cbLangfuse.NewLangfuseHandler(&cbLangfuse.Config{
Host: "https://cloud.langfuse.com",
PublicKey: os.Getenv("LANGFUSE_PUBLIC_KEY"),
SecretKey: os.Getenv("LANGFUSE_SECRET_KEY"),
// 批量上报配置(可选)
Threads: 5, // 并发上报 worker 数,默认 1
FlushAt: 50, // 攒够 50 条就发,默认 15
FlushInterval: 10 * time.Second,// 每 10 秒刷一次,默认 500ms
MaxTaskQueueSize: 1000, // 内存队列上限,默认 100
// Trace 元数据(请求级别可覆盖)
Name: "my-agent",
UserID: "default-user",
SessionID: "default-session",
})
// 进程退出前确保所有缓冲事件发出去
defer flusher()
callbacks.AppendGlobalHandlers(handler)
节点映射规则
Langfuse 有两种观测单元:
| Eino 节点类型 | Langfuse 对象 | 包含的额外信息 |
|---|---|---|
ComponentOfChatModel |
Generation | model name、model params、prompt messages、completion message、token 用量 |
| 其他节点(Lambda、Tool 等) | Span | 节点输入(JSON)、节点输出(JSON)、耗时 |
这个区分是自动的,Handler 内部用 info.Component 判断分支:ChatModel 走 cli.CreateGeneration/EndGeneration,其他走 cli.CreateSpan/EndSpan。
流式处理
流式响应(OnEndWithStreamOutput)会启一个 goroutine 消费 StreamReader,收集完所有 chunks 之后再调 EndGeneration,所以 Langfuse 上看到的 completion 是完整的,不是一块一块的。
go func() {
defer output.Close()
var outs []callbacks.CallbackOutput
for {
chunk, err := output.Recv()
if err == io.EOF { break }
outs = append(outs, chunk)
}
// 合并后上报
c.cli.EndGeneration(body)
}()
请求级别的 Trace 元数据
全局配置的 UserID、SessionID 是默认值。每个请求可以覆盖:
ctx = cbLangfuse.SetTrace(ctx,
cbLangfuse.WithUserID("user-123"),
cbLangfuse.WithSessionID("session-456"),
cbLangfuse.WithTags("production", "v2"),
cbLangfuse.WithRelease("v1.2.3"),
)
runner.Invoke(ctx, input)
SetTrace 在 context 里存一个 traceOptions,Handler 的 getOrInitState 检测到有这个 key 就用它覆盖默认值。
五、Langsmith 接入
go get github.com/cloudwego/eino-ext/callbacks/langsmith
import cbLangsmith "github.com/cloudwego/eino-ext/callbacks/langsmith"
handler, err := cbLangsmith.NewLangsmithHandler(&cbLangsmith.Config{
APIKey: os.Getenv("LANGSMITH_API_KEY"),
APIURL: "https://api.smith.langchain.com", // 默认值,可不填
})
if err != nil {
log.Fatal(err)
}
callbacks.AppendGlobalHandlers(handler)
Langsmith 在 context 里维护一个 LangsmithState,记录 TraceID、ParentRunID、ParentDottedOrder——这是 Langsmith 用来重建 run tree 层级的核心数据。每个节点的运行对应一个 run,嵌套节点的 run 挂在父 run 下面,Langsmith 界面上展示成树形。
六、Cozeloop 接入
字节跳动 Coze 平台的可观测工具,接入方式类似:
go get github.com/cloudwego/eino-ext/callbacks/cozeloop
import (
"github.com/cloudwego/eino-ext/callbacks/cozeloop"
cozeloopcli "github.com/coze-dev/cozeloop-go"
)
client := cozeloopcli.New(cozeloopcli.WithAPIToken(os.Getenv("COZELOOP_TOKEN")))
handler := cozeloop.NewLoopHandler(client, cozeloop.WithTracing(true))
callbacks.AppendGlobalHandlers(handler)
七、HandlerBuilder:自建轻量追踪
不想依赖外部平台,只想在本地打日志或推 metrics?用 HandlerBuilder:
handler := callbacks.NewHandlerBuilder().
OnStartFn(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
mi := model.ConvCallbackInput(input)
if mi != nil {
log.Printf("[%s] model call: %d messages", info.Name, len(mi.Messages))
}
return context.WithValue(ctx, startTimeKey{}, time.Now())
}).
OnEndFn(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
start, _ := ctx.Value(startTimeKey{}).(time.Time)
mo := model.ConvCallbackOutput(output)
if mo != nil && mo.Message.ResponseMeta != nil {
usage := mo.Message.ResponseMeta.Usage
log.Printf("[%s] done in %v, tokens: %d+%d",
info.Name,
time.Since(start),
usage.PromptTokens,
usage.CompletionTokens,
)
}
return ctx
}).
Build()
callbacks.AppendGlobalHandlers(handler)
model.ConvCallbackInput(input) 把 CallbackInput 安全转型为 *model.CallbackInput,如果不是 ChatModel 节点就返回 nil,可以直接 nil-check 跳过。
八、Langfuse vs DeepFlux OTel:两个维度
DeepFlux 用 OTel(server/internal/observability)做全平台追踪,Langfuse 用于 LLM 质量监控——两者不冲突,可以同时运行:
| 维度 | Langfuse / Langsmith | DeepFlux OTel |
|---|---|---|
| 关注点 | LLM 语义:prompt / completion / token | 基础设施:延迟 / 错误率 / 服务依赖 |
| 存储 | 专属后端(云端或自托管) | Tempo(trace)+ Prometheus(metrics) |
| 调试场景 | “这个回答为什么差” | “这个请求为什么慢” |
| 告警 | Langfuse score / evaluation | Grafana alertmanager |
| 接入成本 | 两行代码 | OTel SDK 初始化 + 中间件 |
生产环境两套都跑:OTel 负责 SLO 监控和告警,Langfuse 负责 prompt 质量分析和 session 回放。
小结
Eino 的 Callback 接口是可观测性的统一入口。5 个时机(OnStart / OnEnd / OnError / stream 版本),任何实现了接口的对象都能插进去。
Langfuse 接入只需 NewLangfuseHandler(cfg) + defer flusher(),ChatModel 节点自动上报为 Generation(带 token 用量),其他节点上报为 Span。流式响应在 goroutine 里收集完再上报。SetTrace(ctx, ...) 在请求粒度覆盖 userID 和 sessionID。
Langsmith 同理,核心差异是用 DottedOrder 维护 run tree 层级。
不依赖外部平台的场景用 HandlerBuilder 自建,几行代码实现自定义追踪逻辑。
下篇继续。
代码来源:cloudwego/eino-ext · cloudwego/eino
更多推荐




所有评论(0)