AI Agent 编排引擎实战:从单次调用到多步推理的调度架构

一、引言痛点:Agent 编排的工程化挑战

单个 LLM 调用谁都会写,但把多个 Agent 串成一条能跑的生产链路,那是另一回事。现实中的 Agent 编排面临一堆工程问题:步骤间上下文怎么传递、中间结果挂了怎么重试、长链路超时怎么处理、多 Agent 并行怎么聚合、Token 消耗怎么控制。很多人拿 LangChain 拼个 demo 就以为搞定了 Agent 编排,结果一上生产就各种翻车——链路一长就超时、重试逻辑缺失导致状态不一致、Token 爆炸直接把预算烧光。

Agent 编排的本质是一个分布式调度问题,跟微服务编排是同一类挑战。本文直接上方案,从编排引擎设计、状态管理、错误处理三个维度,讲清楚怎么构建一个能在生产环境跑的 Agent 编排系统。

二、编排引擎架构设计

2.1 DAG 驱动的编排模型

flowchart TD
    A[用户输入] --> B[意图识别 Agent]
    B --> C[信息抽取 Agent]
    B --> D[上下文检索 Agent]
    C --> E[推理 Agent]
    D --> E
    E --> F{结果校验}
    F -->|通过| G[输出格式化 Agent]
    F -->|不通过| E
    G --> H[最终输出]

    style F fill:#f9f,stroke:#333
    style E fill:#bbf,stroke:#333

2.2 编排引擎核心实现

// AI Agent 编排引擎
// 设计原则:DAG 定义流程,状态机驱动执行,失败可重试

package agent

import (
	"context"
	"fmt"
	"sync"
	"time"
)

// Step 定义编排中的一个步骤
type Step struct {
	Name        string         // 步骤名称
	Agent       Agent          // 执行该步骤的 Agent
	DependsOn   []string       // 依赖的步骤名
	Timeout     time.Duration  // 超时时间
	MaxRetries  int            // 最大重试次数
	RetryDelay  time.Duration  // 重试间隔
}

// Agent 接口定义
type Agent interface {
	Name() string
	Execute(ctx context.Context, input map[string]interface{}) (map[string]interface{}, error)
}

// StepResult 步骤执行结果
type StepResult struct {
	StepName  string
	Output    map[string]interface{}
	Duration  time.Duration
	Retries   int
	TokenUsed int
	Error     error
}

// Pipeline 编排管线
type Pipeline struct {
	steps      map[string]*Step
	order      []string       // 拓扑排序后的执行顺序
	resultStore *ResultStore  // 中间结果存储
}

// NewPipeline 创建编排管线
func NewPipeline(steps ...*Step) (*Pipeline, error) {
	p := &Pipeline{
		steps:       make(map[string]*Step),
		resultStore: NewResultStore(),
	}

	for _, s := range steps {
		p.steps[s.Name] = s
	}

	// 拓扑排序
	order, err := p.topologicalSort()
	if err != nil {
		return nil, fmt.Errorf("拓扑排序失败: %w", err)
	}
	p.order = order

	return p, nil
}

// Execute 执行编排管线
func (p *Pipeline) Execute(ctx context.Context, initialInput map[string]interface{}) (*PipelineResult, error) {
	ctx, cancel := context.WithCancel(ctx)
	defer cancel()

	results := make(map[string]*StepResult)
	var mu sync.Mutex

	// 按拓扑序执行
	for _, stepName := range p.order {
		step := p.steps[stepName]

		// 收集上游依赖的输出
		input := p.collectInputs(step, results)
		// 合并初始输入
		for k, v := range initialInput {
			if _, exists := input[k]; !exists {
				input[k] = v
			}
		}

		// 执行步骤(带重试)
		result := p.executeStepWithRetry(ctx, step, input)

		mu.Lock()
		results[stepName] = result
		mu.Unlock()

		// 存储中间结果
		if result.Error == nil {
			p.resultStore.Store(stepName, result.Output)
		} else {
			// 关键步骤失败,终止管线
			return &PipelineResult{
				StepResults: results,
				Error:       fmt.Errorf("步骤 %s 执行失败: %w", stepName, result.Error),
			}, result.Error
		}
	}

	return &PipelineResult{StepResults: results}, nil
}

// executeStepWithRetry 带重试的步骤执行
func (p *Pipeline) executeStepWithRetry(ctx context.Context, step *Step, input map[string]interface{}) *StepResult {
	var result *StepResult

	for attempt := 0; attempt <= step.MaxRetries; attempt++ {
		if attempt > 0 {
			select {
			case <-ctx.Done():
				return &StepResult{StepName: step.Name, Error: ctx.Err()}
			case <-time.After(step.RetryDelay):
			}
		}

		start := time.Now()

		// 带超时的执行
		stepCtx, cancel := context.WithTimeout(ctx, step.Timeout)
		output, err := step.Agent.Execute(stepCtx, input)
		cancel()

		duration := time.Since(start)

		result = &StepResult{
			StepName: step.Name,
			Output:   output,
			Duration: duration,
			Retries:  attempt,
			Error:    err,
		}

		if err == nil {
			return result
		}

		// 不可重试的错误直接返回
		if !isRetryableError(err) {
			return result
		}
	}

	return result
}

// collectInputs 收集上游步骤的输出作为当前步骤的输入
func (p *Pipeline) collectInputs(step *Step, results map[string]*StepResult) map[string]interface{} {
	input := make(map[string]interface{})

	for _, dep := range step.DependsOn {
		if r, ok := results[dep]; ok && r.Error == nil {
			for k, v := range r.Output {
				input[fmt.Sprintf("%s.%s", dep, k)] = v
			}
		}
	}

	return input
}

// topologicalSort 拓扑排序
func (p *Pipeline) topologicalSort() ([]string, error) {
	inDegree := make(map[string]int)
	for name := range p.steps {
		inDegree[name] = 0
	}
	for _, step := range p.steps {
		for _, dep := range step.DependsOn {
			inDegree[step.Name]++
		}
	}

	var queue []string
	for name, degree := range inDegree {
		if degree == 0 {
			queue = append(queue, name)
		}
	}

	var order []string
	for len(queue) > 0 {
		name := queue[0]
		queue = queue[1:]
		order = append(order, name)

		for _, step := range p.steps {
			for _, dep := range step.DependsOn {
				if dep == name {
					inDegree[step.Name]--
					if inDegree[step.Name] == 0 {
						queue = append(queue, step.Name)
					}
				}
			}
		}
	}

	if len(order) != len(p.steps) {
		return nil, fmt.Errorf("存在循环依赖")
	}

	return order, nil
}

func isRetryableError(err error) bool {
	// 超时、限流、网络错误可重试
	return true
}

// PipelineResult 管线执行结果
type PipelineResult struct {
	StepResults map[string]*StepResult
	Error       error
}

// ResultStore 中间结果存储
type ResultStore struct {
	mu     sync.RWMutex
	store  map[string]map[string]interface{}
}

func NewResultStore() *ResultStore {
	return &ResultStore{
		store: make(map[string]map[string]interface{}),
	}
}

func (rs *ResultStore) Store(stepName string, output map[string]interface{}) {
	rs.mu.Lock()
	defer rs.mu.Unlock()
	rs.store[stepName] = output
}

func (rs *ResultStore) Get(stepName string) (map[string]interface{}, bool) {
	rs.mu.RLock()
	defer rs.mu.RUnlock()
	out, ok := rs.store[stepName]
	return out, ok
}

三、上下文管理与 Token 控制

3.1 上下文压缩策略

flowchart LR
    A[原始上下文<br/>~8000 tokens] --> B[关键信息提取<br/>保留决策相关内容]
    B --> C[摘要压缩<br/>~2000 tokens]
    C --> D[滑动窗口<br/>保留最近 N 轮]
    D --> E[压缩后上下文<br/>~3000 tokens]

    A --> F[Token 预算分配]
    F --> F1[系统提示: 500]
    F --> F2[上下文: 3000]
    F --> F3[当前输入: 1000]
    F --> F4[输出预留: 1500]

3.2 Token 预算管理器

// Token 预算管理器
// 核心思路:给每一步分配 Token 预算,超了就压缩

package agent

import "fmt"

type TokenBudget struct {
	Total       int
	System      int
	Context     int
	Input       int
	Output      int
	Used        int
}

func NewTokenBudget(total int) *TokenBudget {
	return &TokenBudget{
		Total:   total,
		System:  int(float64(total) * 0.1),   // 10% 给系统提示
		Context: int(float64(total) * 0.4),    // 40% 给上下文
		Input:   int(float64(total) * 0.2),    // 20% 给当前输入
		Output:  int(float64(total) * 0.3),    // 30% 给输出
	}
}

func (tb *TokenBudget) Allocate(stepName string, estimatedTokens int) (int, error) {
	remaining := tb.Total - tb.Used
	if estimatedTokens > remaining {
		// 压缩上下文释放空间
		released := tb.compressContext(estimatedTokens - remaining)
		remaining += released
	}

	if estimatedTokens > remaining {
		return 0, fmt.Errorf("Token 预算不足: 需要 %d, 剩余 %d", estimatedTokens, remaining)
	}

	tb.Used += estimatedTokens
	return estimatedTokens, nil
}

func (tb *TokenBudget) compressContext(needed int) int {
	// 实际实现:摘要压缩、滑动窗口、关键信息提取
	// 这里返回模拟值
	released := needed
	if released > tb.Context/2 {
		released = tb.Context / 2
	}
	tb.Context -= released
	return released
}

func (tb *TokenBudget) Report() string {
	return fmt.Sprintf(
		"Token 预算: 总计=%d, 已用=%d(%0.1f%%), 剩余=%d",
		tb.Total, tb.Used, float64(tb.Used)/float64(tb.Total)*100, tb.Total-tb.Used,
	)
}

四、错误处理与可观测性

4.1 错误分类与处理策略

错误类型 示例 处理策略
可重试 API 限流、网络超时 指数退避重试
可降级 某步骤输出质量低 跳过或用默认值替代
不可恢复 API Key 失效、模型不存在 立即终止,返回错误
部分失败 并行步骤中部分失败 聚合成功结果,标记失败步骤

4.2 链路追踪集成

// Agent 编排的链路追踪
// 接入 OpenTelemetry,每一步都是一次 Span

package agent

import (
	"context"
	"go.opentelemetry.io/otel"
	"go.opentelemetry.io/otel/attribute"
	"go.opentelemetry.io/otel/trace"
)

func (p *Pipeline) executeStepWithTracing(ctx context.Context, step *Step, input map[string]interface{}) (*StepResult, error) {
	tracer := otel.Tracer("agent-pipeline")
	ctx, span := tracer.Start(ctx, fmt.Sprintf("step.%s", step.Name),
		trace.WithAttributes(
			attribute.String("agent.name", step.Agent.Name()),
			attribute.Int("step.max_retries", step.MaxRetries),
			attribute.String("step.timeout", step.Timeout.String()),
		),
	)
	defer span.End()

	result := p.executeStepWithRetry(ctx, step, input)

	// 记录结果到 Span
	span.SetAttributes(
		attribute.Int("step.retries", result.Retries),
		attribute.Int("step.tokens_used", result.TokenUsed),
		attribute.String("step.duration", result.Duration.String()),
	)

	if result.Error != nil {
		span.RecordError(result.Error)
		span.SetAttributes(attribute.Bool("step.success", false))
	} else {
		span.SetAttributes(attribute.Bool("step.success", true))
	}

	return result, nil
}

五、边界分析与架构权衡

5.1 编排模式对比

模式 适用场景 复杂度 灵活性
线性链 简单的 A→B→C 流程
DAG 有依赖的多步流程
状态机 需要条件分支和循环 最高
ReAct 循环 Agent 自主决策 最高但不可控

生产环境推荐 DAG + 状态机混合:主流程用 DAG 定义,条件分支用状态机处理。ReAct 循环在可控场景下可用,但需要设置最大步数和 Token 上限。

5.2 关键设计决策

同步 vs 异步执行。 步骤间有依赖时必须同步,无依赖时可并行。并行执行的步骤需要聚合机制,推荐用 WaitGroup + 超时控制。

上下文传递方式。 全量传递简单但 Token 浪费严重,按需传递高效但实现复杂。推荐按需传递 + 压缩策略:每一步只接收依赖步骤的输出,长上下文自动摘要。

失败策略。 全部回滚太重,部分降级更实用。推荐策略:关键步骤失败终止管线,非关键步骤失败降级继续。

六、总结

Agent 编排的核心不是调 API,是工程化。三个要点:

第一,DAG 驱动,拓扑排序确定执行顺序。依赖关系必须显式声明,不能靠调用顺序隐式保证。拓扑排序 + 依赖输入收集,这是编排引擎的骨架。

第二,Token 预算必须管。不管 Token 预算的 Agent 编排,跟不管内存的程序一样,迟早爆。每一步分配预算,超了就压缩上下文,这是生产环境的基本操作。

第三,可观测性不是锦上添花,是必需品。链路追踪、Token 统计、步骤耗时,这些数据是排查问题的基础。没有可观测性的编排系统,出了问题就是黑盒。

别整虚的,把编排引擎当基础设施来建设,Agent 才能真正跑起来。

五、总结

围绕“AI Agent 编排引擎实战:从单次调用到多步推理的调度架构”,更稳妥的落地方式不是一次性追求完整平台,而是先确定核心路径,再把复杂能力逐步收敛到可验证的模块。第一步,明确输入、输出和失败边界,避免把不稳定因素藏在默认配置里。第二步,优先实现最小闭环,用真实数据验证性能、稳定性和维护成本。第三步,把监控、告警和回滚策略前置到设计阶段,而不是上线后再补。

后续迭代可以从三个方向推进:补齐自动化测试,覆盖正常路径、边界路径和异常路径;建立基准数据,持续比较版本变化带来的收益和副作用;沉淀操作手册,把排障步骤、指标含义和禁用场景写清楚。只要这些基础工作到位,方案就不会停留在概念层,而能成为团队可以长期维护的工程资产。

Logo

小龙虾开发者社区是 CSDN 旗下专注 OpenClaw 生态的官方阵地,聚焦技能开发、插件实践与部署教程,为开发者提供可直接落地的方案、工具与交流平台,助力高效构建与落地 AI 应用

更多推荐