Callback 系统:给 Agent 管道装上“监听器“
系列「企业级 AI Agent 实现拆解」E23 篇。前面拆解了 Batch 处理:并发控制与可中断批处理。这篇聚焦一个横切关注点:Callback 系统——不改一行业务代码,就能在任何组件的开始、结束、错误时刻插入自定义逻辑。生产 tracing、token 计量、进度推送全靠它。
读完这篇你会知道
- Callback 的五个时机:OnStart / OnEnd / OnError / OnStartWithStreamInput / OnEndWithStreamOutput
RunInfo:怎么知道当前回调是哪个节点、哪种组件触发的HandlerBuilder:一行链式调用注册你关心的时机- 全局 Handler vs. 单次 Handler:两种注入方式的区别
- 流式 Handler 的必须关闭约定:为什么忘了 Close 会泄漏 goroutine
- 真实案例:Langsmith tracing 怎么用 context 在 OnStart/OnEnd 之间传状态
- 组件开发者视角:怎么在自己的组件里埋 callback 点
一、为什么需要 Callback
一个 ReAct Agent 运行时可能调用 5 个节点、3 个工具、1 个模型——每次调用用了多少 token?哪个节点耗时最长?工具返回了什么?
如果在每个节点里手动加日志,代码会被监控逻辑污染,而且每次换监控平台(LangSmith → CozeLoop → APMPlus)就要改业务代码。
Callback 的思路是把观测逻辑和执行逻辑彻底分开:框架在固定时刻触发回调,你只需要注册一个 Handler,业务代码不感知 Handler 的存在。
二、接口:五个时机
// callbacks/interface.go
type Handler interface {
OnStart(ctx context.Context, info *RunInfo, input CallbackInput) context.Context
OnEnd(ctx context.Context, info *RunInfo, output CallbackOutput) context.Context
OnError(ctx context.Context, info *RunInfo, err error) context.Context
OnStartWithStreamInput(ctx context.Context, info *RunInfo, input *schema.StreamReader[CallbackInput]) context.Context
OnEndWithStreamOutput(ctx context.Context, info *RunInfo, output *schema.StreamReader[CallbackOutput]) context.Context
}
| 时机 | 触发点 | 输入参数 |
|---|---|---|
OnStart |
组件开始处理(非流式输入) | CallbackInput(具体类型由组件定义) |
OnEnd |
组件成功返回(非流式输出) | CallbackOutput |
OnError |
组件返回 error | error |
OnStartWithStreamInput |
组件接收流式输入(Collect/Transform) | *schema.StreamReader[CallbackInput] |
OnEndWithStreamOutput |
组件产生流式输出(Stream/Transform) | *schema.StreamReader[CallbackOutput] |
重要约束:
- OnEnd 和 OnError 互斥——同一次调用只会触发其中一个
- 流式 Handler 收到的是框架已复制好的独立 StreamReader,必须在处理完后
Close(),否则 goroutine 泄漏 - 同一个 Handler 的 ctx 在 OnStart → OnEnd 之间是连续的(可以传状态),不同 Handler 之间的 ctx 不互通
三、RunInfo:知道是谁在说话
每次回调都带着 *RunInfo,告诉 Handler 当前是哪个组件、哪个节点触发的:
type RunInfo struct {
Name string // 节点名(compose.WithNodeName 设置的)
Type string // 实现类型,如 "OpenAI"、"DeepSeek"
Component components.Component // 组件类别常量
}
三个字段的来源:
- Name:Graph 里的节点名;独立组件需要手动调
InitCallbacks设置 - Type:组件自己实现
Typer接口返回的字符串,没实现就反射取结构体名 - Component:框架定义的常量,如
ComponentOfChatModel、ComponentOfToolsNode、"Lambda"
为什么要有 Component? 因为 CallbackInput 和 CallbackOutput 是 any(interface{}),你需要先判断是什么类型的组件,再做类型断言:
handler := callbacks.NewHandlerBuilder().
OnStartFn(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
// 只处理 ChatModel 的回调
modelInput := model.ConvCallbackInput(input)
if modelInput == nil {
return ctx // 不是 ChatModel,跳过
}
log.Printf("[%s] 发送 %d 条消息给模型", info.Name, len(modelInput.Messages))
return ctx
}).Build()
四、HandlerBuilder:只注册你关心的时机
实现全部五个方法太啰嗦。HandlerBuilder 允许你只设置需要的:
handler := callbacks.NewHandlerBuilder().
OnStartFn(func(ctx context.Context, info *callbacks.RunInfo, input callbacks.CallbackInput) context.Context {
log.Printf("[%s/%s] 开始", info.Component, info.Name)
return context.WithValue(ctx, startTimeKey{}, time.Now()) // 存开始时间
}).
OnEndFn(func(ctx context.Context, info *callbacks.RunInfo, output callbacks.CallbackOutput) context.Context {
start, _ := ctx.Value(startTimeKey{}).(time.Time)
log.Printf("[%s/%s] 完成,耗时 %v", info.Component, info.Name, time.Since(start))
return ctx
}).
OnErrorFn(func(ctx context.Context, info *callbacks.RunInfo, err error) context.Context {
log.Printf("[%s/%s] 出错: %v", info.Component, info.Name, err)
return ctx
}).
Build()
框架优化:HandlerBuilder 构建的 Handler 自动实现 TimingChecker 接口,未注册的时机返回 false,框架就跳过那个时机的 stream 复制和 goroutine 分配——没注册 OnEndWithStreamOutput 就不为它开销。
五、注入方式:全局 vs. 单次
全局 Handler(进程级)
// main.go 里调用一次,进程生命周期内所有调用都生效
callbacks.AppendGlobalHandlers(metricsHandler, tracingHandler)
全局 Handler 适合"监控一切"的场景(分布式追踪、Token 计费)。调用时序:全局 Handler 先于单次 Handler 执行。
注意:
AppendGlobalHandlers不是线程安全的,只能在main函数初始化时调用一次。
单次 Handler(调用级)
// 每次 Invoke 时按需注入
runner.Invoke(ctx, input, compose.WithCallbacks(progressHandler))
// 或者编译期绑定(对所有节点生效)
graph.Compile(ctx, compose.WithGraphCompileCallbacks(debugHandler))
单次 Handler 只对当次调用生效,适合调试、用户级进度推送。
六、流式 Handler:关闭是必须的
OnStartWithStreamInput 和 OnEndWithStreamOutput 收到的是框架提前复制的 StreamReader。如果不关闭,框架无法回收底层 goroutine:
// 错误写法:忘了 Close → goroutine 泄漏
OnEndWithStreamOutputFn(func(ctx context.Context, info *callbacks.RunInfo,
output *schema.StreamReader[callbacks.CallbackOutput]) context.Context {
chunk, _ := output.Recv()
log.Printf("first chunk: %v", chunk)
return ctx // 只读了一个,没 Close!
})
// 正确写法:defer output.Close() 兜底
OnEndWithStreamOutputFn(func(ctx context.Context, info *callbacks.RunInfo,
output *schema.StreamReader[callbacks.CallbackOutput]) context.Context {
go func() {
defer output.Close() // 无论如何都要关
for {
chunk, err := output.Recv()
if err != nil { break } // io.EOF 也会在这里返回
// 处理 chunk...
}
}()
return ctx
})
Langsmith 的实现就是这个模式——在 goroutine 里用 defer output.Close() 兜底,就算中途 panic 也不泄漏。
七、context 传状态:OnStart → OnEnd 的标准模式
同一个 Handler 的 OnStart 返回的 context 会流入它的 OnEnd/OnError。这是跨时机传状态的唯一正确方式。
Langsmith Handler 的完整模式(langsmith.go:75):
// OnStart:创建 run,把 runID 存入 context
func (c *CallbackHandler) OnStart(ctx context.Context, info *callbacks.RunInfo,
input callbacks.CallbackInput) context.Context {
runID := uuid.NewString()
c.cli.CreateRun(ctx, &Run{ID: runID, Name: info.Name, StartTime: time.Now()})
return context.WithValue(ctx, langsmithStateKey{}, &LangsmithState{
ParentRunID: runID, // 存起来,OnEnd 用
})
}
// OnEnd:从 context 取出 runID,更新 run(关联同一次调用)
func (c *CallbackHandler) OnEnd(ctx context.Context, info *callbacks.RunInfo,
output callbacks.CallbackOutput) context.Context {
state, _ := ctx.Value(langsmithStateKey{}).(*LangsmithState)
endTime := time.Now()
c.cli.UpdateRun(ctx, state.ParentRunID, &RunPatch{EndTime: &endTime})
return ctx
}
注意:每个 Handler 只能看到自己的 ctx 链,不同 Handler 之间的 ctx 不共享——不要依赖其他 Handler 存进去的 context value。
八、组件开发者视角:怎么在自己的组件里埋点
如果你在实现自定义组件(如自定义 ChatModel),用 callbacks/aspect_inject.go 里的工具函数:
// 非流式组件
func (m *MyModel) Generate(ctx context.Context, input []*schema.Message, ...) (*schema.Message, error) {
ctx = callbacks.OnStart(ctx, &model.CallbackInput{Messages: input})
resp, err := m.doGenerate(ctx, input)
if err != nil {
callbacks.OnError(ctx, err)
return nil, err
}
callbacks.OnEnd(ctx, &model.CallbackOutput{Message: resp})
return resp, nil
}
// 流式组件
func (m *MyModel) Stream(ctx context.Context, input []*schema.Message, ...) (*schema.StreamReader[*schema.Message], error) {
ctx = callbacks.OnStart(ctx, &model.CallbackInput{Messages: input})
stream, err := m.doStream(ctx, input)
if err != nil {
callbacks.OnError(ctx, err)
return nil, err
}
// 框架自动复制 stream 给各 Handler
ctx, stream = callbacks.OnEndWithStreamOutput(ctx, stream)
return stream, nil
}
独立运行(没有 Graph 管理 RunInfo)时,需要先初始化:
ctx = callbacks.InitCallbacks(ctx, &callbacks.RunInfo{
Type: "MyModel",
Component: components.ComponentOfChatModel,
Name: "my-model",
}, myHandler)
九、三个现成的 tracing 集成
eino-ext/callbacks/ 提供了开箱即用的 Handler:
| 平台 | 包路径 | 接入代码 |
|---|---|---|
| LangSmith | callbacks/langsmith |
langsmith.NewLangsmithHandler(&Config{APIKey: "..."}) |
| Langfuse | callbacks/langfuse |
langfuse.NewHandler(&Config{...}) |
| APMPlus | callbacks/apmplus |
apmplus.NewHandler(&Config{...}) |
接入 LangSmith 只需:
handler, _ := langsmith.NewLangsmithHandler(&langsmith.Config{
APIKey: os.Getenv("LANGSMITH_API_KEY"),
})
callbacks.AppendGlobalHandlers(handler)
之后所有 Graph 的所有节点的调用自动上报,不需要改一行业务代码。
小结
Callback 系统的设计哲学是侵入性为零:框架统一在五个时机触发回调,Handler 只关心自己的逻辑,业务代码不感知 Handler 的存在。HandlerBuilder 让你跳过不关心的时机(节省 stream 复制开销);RunInfo.Component 让一个 Handler 同时处理多种组件类型;context 链让 OnStart 到 OnEnd 之间安全传递状态。流式 Handler 必须 Close 是最容易踩到的坑——defer output.Close() 是标准写法。
下篇继续。
代码来源:cloudwego/eino · cloudwego/eino-ext · cloudwego/eino-examples
更多推荐



所有评论(0)