多 Agent 协作的状态地狱:AI Agent 编排框架中的上下文管理与故障恢复

一、三个 Agent 丢了一个包:多步编排中的状态丢失问题

构建一个"需求分析 -> 代码生成 -> 测试验证"的三 Agent 编排流水线,看起来逻辑清晰。实际运行时却发现:代码生成 Agent 偶尔丢失需求分析 Agent 传递的关键约束,测试验证 Agent 有时拿不到代码生成 Agent 的完整输出。更严重的是,当测试 Agent 发现代码错误需要回退到代码生成 Agent 重新生成时,整个流水线的上下文状态已经丢失,只能从头开始。

这不是 Agent 能力不足,而是编排框架缺少可靠的状态管理机制。多 Agent 编排的核心挑战不是"让每个 Agent 做好本职工作",而是"在 Agent 之间可靠地传递和维护上下文状态"。

本文从状态管理模型、故障恢复策略、上下文压缩三个维度,拆解 AI Agent 编排框架的工程实践。

二、多 Agent 编排的状态流转与故障模式

stateDiagram-v2
    [*] --> INIT: 创建编排实例
    INIT --> ANALYZING: 启动需求分析 Agent
    ANALYZING --> CODE_GEN: 分析完成,传递需求文档
    ANALYZING --> ANALYZING_FAILED: 分析超时/格式错误
    ANALYZING_FAILED --> ANALYZING: 重试(保留已有上下文)

    CODE_GEN --> TESTING: 代码生成完成,传递代码 + 需求
    CODE_GEN --> CODE_GEN_FAILED: 生成超时/语法错误
    CODE_GEN_FAILED --> CODE_GEN: 重试(保留需求文档)

    TESTING --> COMPLETED: 测试通过
    TESTING --> CODE_GEN: 测试失败,回退到代码生成
    TESTING --> TESTING_FAILED: 测试框架异常

    COMPLETED --> [*]
    TESTING_FAILED --> [*]: 超过最大重试次数

    note right of CODE_GEN: 关键:回退时必须保留<br/>需求分析阶段的上下文
    note right of TESTING: 关键:测试失败时<br/>必须传递错误详情给代码生成

三种故障模式:

  1. 上下文丢失:Agent 之间传递的消息被截断或遗漏,下游 Agent 缺少关键信息。
  2. 状态不一致:回退到上游 Agent 时,上游 Agent 的内部状态已丢失,无法从断点继续。
  3. 上下文膨胀:每一步都把前序所有 Agent 的完整输出拼接到 Prompt 中,导致 Token 数爆炸,超出模型上下文窗口。

三、生产级 Agent 编排框架实现

3.1 持久化状态管理器

"""
Agent 编排状态管理器——确保上下文在 Agent 间可靠传递
设计意图:Agent 之间的消息传递不能依赖内存,必须持久化以支持故障恢复
"""
import json
from dataclasses import dataclass, field, asdict
from datetime import datetime
from enum import Enum
from typing import Any, Optional
from pathlib import Path

class AgentStep(Enum):
    ANALYZE = "analyze"
    CODE_GEN = "code_gen"
    TEST = "test"

class StepStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"

@dataclass
class StepContext:
    """单个步骤的上下文数据"""
    step: AgentStep
    status: StepStatus
    input_data: dict[str, Any]     # 该步骤的输入
    output_data: Optional[dict] = None  # 该步骤的输出
    error: Optional[str] = None
    started_at: Optional[str] = None
    completed_at: Optional[str] = None
    retry_count: int = 0

@dataclass
class OrchestrationState:
    """编排实例的全局状态——所有 Agent 共享"""
    session_id: str
    current_step: AgentStep
    steps: dict[AgentStep, StepContext] = field(default_factory=dict)
    max_retries: int = 3
    created_at: str = field(default_factory=lambda: datetime.now().isoformat())

    def get_context_for_step(self, step: AgentStep) -> dict[str, Any]:
        """
        获取指定步骤所需的上下文——只包含该步骤需要的信息
        设计意图:不是把所有前序步骤的输出都塞进 Prompt,
        而是只传递当前步骤真正需要的字段,控制 Token 消耗
        """
        context = {}

        if step == AgentStep.CODE_GEN:
            # 代码生成只需要需求文档,不需要分析过程的中间产物
            analyze_ctx = self.steps.get(AgentStep.ANALYZE)
            if analyze_ctx and analyze_ctx.output_data:
                context["requirements"] = analyze_ctx.output_data.get("requirements_doc")
                context["constraints"] = analyze_ctx.output_data.get("constraints")

        elif step == AgentStep.TEST:
            # 测试需要代码和需求文档,用于验证代码是否满足需求
            code_ctx = self.steps.get(AgentStep.CODE_GEN)
            analyze_ctx = self.steps.get(AgentStep.ANALYZE)
            if code_ctx and code_ctx.output_data:
                context["code"] = code_ctx.output_data.get("generated_code")
                context["test_hints"] = code_ctx.output_data.get("test_hints")
            if analyze_ctx and analyze_ctx.output_data:
                context["acceptance_criteria"] = analyze_ctx.output_data.get(
                    "acceptance_criteria"
                )

        return context

class StateManager:
    """持久化状态管理器——将编排状态写入磁盘,支持故障恢复"""

    def __init__(self, state_dir: Path):
        self.state_dir = state_dir
        self.state_dir.mkdir(parents=True, exist_ok=True)

    def save(self, state: OrchestrationState) -> None:
        """持久化当前状态——每次步骤变更后必须调用"""
        state_file = self.state_dir / f"{state.session_id}.json"
        # 使用原子写入,避免写入中断导致状态文件损坏
        temp_file = state_file.with_suffix(".tmp")
        with open(temp_file, "w") as f:
            json.dump(asdict(state), f, ensure_ascii=False, indent=2)
        temp_file.rename(state_file)  # 原子操作

    def load(self, session_id: str) -> Optional[OrchestrationState]:
        """从磁盘恢复状态——故障重启后调用"""
        state_file = self.state_dir / f"{session_id}.json"
        if not state_file.exists():
            return None
        with open(state_file) as f:
            data = json.load(f)
        # 反序列化为 OrchestrationState 对象
        steps = {}
        for step_name, ctx_data in data["steps"].items():
            steps[AgentStep(step_name)] = StepContext(**ctx_data)
        return OrchestrationState(
            session_id=data["session_id"],
            current_step=AgentStep(data["current_step"]),
            steps=steps,
            max_retries=data["max_retries"],
            created_at=data["created_at"],
        )

3.2 故障恢复与回退策略

"""
编排执行器——处理 Agent 执行失败时的回退和重试
设计意图:Agent 调用 LLM 天然存在不确定性,必须从架构层面处理失败
"""
import logging

logger = logging.getLogger(__name__)

class OrchestrationExecutor:
    def __init__(self, state_manager: StateManager):
        self.state_manager = state_manager
        self.agents = {
            AgentStep.ANALYZE: AnalyzeAgent(),
            AgentStep.CODE_GEN: CodeGenAgent(),
            AgentStep.TEST: TestAgent(),
        }

    def execute(self, state: OrchestrationState) -> OrchestrationState:
        """执行编排流水线,支持故障恢复"""
        while state.current_step != AgentStep.TEST or \
              state.steps.get(AgentStep.TEST, StepContext(
                  step=AgentStep.TEST, status=StepStatus.PENDING, input_data={}
              )).status != StepStatus.COMPLETED:

            step = state.current_step
            step_ctx = state.steps.get(step) or StepContext(
                step=step, status=StepStatus.PENDING, input_data={}
            )

            # 获取当前步骤所需的上下文
            context = state.get_context_for_step(step)

            try:
                step_ctx.status = StepStatus.RUNNING
                step_ctx.started_at = datetime.now().isoformat()
                self.state_manager.save(state)

                # 执行 Agent
                agent = self.agents[step]
                result = agent.execute(context)

                step_ctx.output_data = result
                step_ctx.status = StepStatus.COMPLETED
                step_ctx.completed_at = datetime.now().isoformat()
                state.steps[step] = step_ctx

                # 推进到下一步
                state.current_step = self._next_step(step)

            except AgentExecutionError as e:
                step_ctx.status = StepStatus.FAILED
                step_ctx.error = str(e)
                step_ctx.retry_count += 1
                state.steps[step] = step_ctx

                if step_ctx.retry_count > state.max_retries:
                    logger.error(
                        f"步骤 {step.value} 超过最大重试次数 "
                        f"({state.max_retries}),编排终止"
                    )
                    break

                # 测试失败时回退到代码生成,而非重试测试
                if step == AgentStep.TEST:
                    logger.info("测试失败,回退到代码生成阶段")
                    state.current_step = AgentStep.CODE_GEN
                    # 将测试错误信息注入代码生成的上下文
                    code_ctx = state.steps[AgentStep.CODE_GEN]
                    code_ctx.input_data["test_failure"] = str(e)
                    code_ctx.status = StepStatus.PENDING  # 重置状态
                else:
                    logger.info(f"步骤 {step.value} 失败,重试中 "
                                f"({step_ctx.retry_count}/{state.max_retries})")

            finally:
                self.state_manager.save(state)

        return state

    def _next_step(self, current: AgentStep) -> AgentStep:
        step_order = [AgentStep.ANALYZE, AgentStep.CODE_GEN, AgentStep.TEST]
        idx = step_order.index(current)
        if idx + 1 < len(step_order):
            return step_order[idx + 1]
        return current

3.3 上下文压缩:控制 Token 消耗

"""
上下文压缩器——在 Agent 间传递时压缩上下文,控制 Token 消耗
设计意图:原始输出可能包含大量冗余信息,压缩后只保留下游 Agent 需要的关键字段
"""
from typing import Any

class ContextCompressor:
    # 每个步骤的输出模板——定义必须保留的字段
    OUTPUT_SCHEMAS = {
        AgentStep.ANALYZE: {
            "requirements_doc": str,      # 需求文档摘要(非原文)
            "constraints": list[str],     # 约束条件列表
            "acceptance_criteria": list[str],  # 验收标准
        },
        AgentStep.CODE_GEN: {
            "generated_code": str,        # 生成的代码
            "test_hints": list[str],      # 测试建议
            "assumptions": list[str],     # 代码中的假设(需测试验证)
        },
    }

    def compress(self, step: AgentStep, raw_output: dict[str, Any]) -> dict[str, Any]:
        """
        压缩 Agent 输出——只保留下游需要的字段
        原始输出可能包含思维过程、中间结果等冗余信息,
        压缩后只保留结构化的关键输出
        """
        schema = self.OUTPUT_SCHEMAS.get(step, {})
        compressed = {}
        for field_name, field_type in schema.items():
            value = raw_output.get(field_name)
            if value is None:
                continue
            # 对字符串类型做长度截断,防止单个字段过长
            if field_type == str and isinstance(value, str):
                max_len = 2000  # 单字段最大 2000 字符
                if len(value) > max_len:
                    value = value[:max_len] + "\n...[已截断]"
            compressed[field_name] = value
        return compressed

四、Agent 编排框架的架构权衡

4.1 持久化存储的延迟代价

每次步骤变更都写入磁盘,增加了约 5-10ms 的延迟。对于短链路编排(3-5 步),总延迟增加可忽略。但对于高频编排(每秒 100+ 实例),磁盘 I/O 可能成为瓶颈。解决方案:使用 Redis 替代文件系统,将延迟降到 1ms 以下。

4.2 上下文压缩的信息损失

压缩会丢弃 Agent 的思维过程和中间推理步骤。当下游 Agent 需要理解"为什么得出这个结论"时,压缩后的上下文可能不够。解决方案:在压缩时保留 reasoning_summary 字段,用 2-3 句话概括推理逻辑。

4.3 回退策略的循环风险

测试失败回退到代码生成,代码生成后再次测试失败,形成循环。必须设置最大回退次数(如 3 次),超过后终止编排并输出诊断报告。

4.4 适用场景与禁用场景

场景 是否适用 原因
代码生成流水线 适用 步骤间有明确依赖,回退逻辑清晰
数据分析流水线 适用 多步骤处理,中间结果需要持久化
实时对话 Agent 不适用 单步交互,无需复杂状态管理
简单工具调用链 不适用 开销大于收益,直接串联调用即可

五、总结

AI Agent 编排的核心工程挑战是状态管理,而非 Agent 本身的能力:

  1. 持久化状态:Agent 之间的上下文不能只存在内存中,必须持久化以支持故障恢复。使用原子写入避免状态文件损坏。
  2. 按需传递上下文:不是把所有前序输出都塞进 Prompt,而是只传递当前步骤需要的字段。用输出模板约束每个步骤的产出结构。
  3. 故障恢复与回退:Agent 调用 LLM 天然存在不确定性,必须从架构层面处理失败。测试失败时回退到代码生成,而非盲目重试。
  4. 上下文压缩:对长输出做截断和摘要,控制 Token 消耗,避免超出模型上下文窗口。

落地路线:先实现持久化状态管理器,确保上下文不丢失;再实现故障恢复和回退策略;最后加入上下文压缩。每一步用端到端测试验证:随机注入 Agent 失败,确认状态能正确恢复。

Logo

小龙虾开发者社区是 CSDN 旗下专注 OpenClaw 生态的官方阵地,聚焦技能开发、插件实践与部署教程,为开发者提供可直接落地的方案、工具与交流平台,助力高效构建与落地 AI 应用

更多推荐