多 Agent 协作的状态地狱:AI Agent 编排框架中的上下文管理与故障恢复
多 Agent 协作的状态地狱:AI Agent 编排框架中的上下文管理与故障恢复
一、三个 Agent 丢了一个包:多步编排中的状态丢失问题
构建一个"需求分析 -> 代码生成 -> 测试验证"的三 Agent 编排流水线,看起来逻辑清晰。实际运行时却发现:代码生成 Agent 偶尔丢失需求分析 Agent 传递的关键约束,测试验证 Agent 有时拿不到代码生成 Agent 的完整输出。更严重的是,当测试 Agent 发现代码错误需要回退到代码生成 Agent 重新生成时,整个流水线的上下文状态已经丢失,只能从头开始。
这不是 Agent 能力不足,而是编排框架缺少可靠的状态管理机制。多 Agent 编排的核心挑战不是"让每个 Agent 做好本职工作",而是"在 Agent 之间可靠地传递和维护上下文状态"。
本文从状态管理模型、故障恢复策略、上下文压缩三个维度,拆解 AI Agent 编排框架的工程实践。
二、多 Agent 编排的状态流转与故障模式
stateDiagram-v2
[*] --> INIT: 创建编排实例
INIT --> ANALYZING: 启动需求分析 Agent
ANALYZING --> CODE_GEN: 分析完成,传递需求文档
ANALYZING --> ANALYZING_FAILED: 分析超时/格式错误
ANALYZING_FAILED --> ANALYZING: 重试(保留已有上下文)
CODE_GEN --> TESTING: 代码生成完成,传递代码 + 需求
CODE_GEN --> CODE_GEN_FAILED: 生成超时/语法错误
CODE_GEN_FAILED --> CODE_GEN: 重试(保留需求文档)
TESTING --> COMPLETED: 测试通过
TESTING --> CODE_GEN: 测试失败,回退到代码生成
TESTING --> TESTING_FAILED: 测试框架异常
COMPLETED --> [*]
TESTING_FAILED --> [*]: 超过最大重试次数
note right of CODE_GEN: 关键:回退时必须保留<br/>需求分析阶段的上下文
note right of TESTING: 关键:测试失败时<br/>必须传递错误详情给代码生成
三种故障模式:
- 上下文丢失:Agent 之间传递的消息被截断或遗漏,下游 Agent 缺少关键信息。
- 状态不一致:回退到上游 Agent 时,上游 Agent 的内部状态已丢失,无法从断点继续。
- 上下文膨胀:每一步都把前序所有 Agent 的完整输出拼接到 Prompt 中,导致 Token 数爆炸,超出模型上下文窗口。
三、生产级 Agent 编排框架实现
3.1 持久化状态管理器
"""
Agent 编排状态管理器——确保上下文在 Agent 间可靠传递
设计意图:Agent 之间的消息传递不能依赖内存,必须持久化以支持故障恢复
"""
import json
from dataclasses import dataclass, field, asdict
from datetime import datetime
from enum import Enum
from typing import Any, Optional
from pathlib import Path
class AgentStep(Enum):
ANALYZE = "analyze"
CODE_GEN = "code_gen"
TEST = "test"
class StepStatus(Enum):
PENDING = "pending"
RUNNING = "running"
COMPLETED = "completed"
FAILED = "failed"
@dataclass
class StepContext:
"""单个步骤的上下文数据"""
step: AgentStep
status: StepStatus
input_data: dict[str, Any] # 该步骤的输入
output_data: Optional[dict] = None # 该步骤的输出
error: Optional[str] = None
started_at: Optional[str] = None
completed_at: Optional[str] = None
retry_count: int = 0
@dataclass
class OrchestrationState:
"""编排实例的全局状态——所有 Agent 共享"""
session_id: str
current_step: AgentStep
steps: dict[AgentStep, StepContext] = field(default_factory=dict)
max_retries: int = 3
created_at: str = field(default_factory=lambda: datetime.now().isoformat())
def get_context_for_step(self, step: AgentStep) -> dict[str, Any]:
"""
获取指定步骤所需的上下文——只包含该步骤需要的信息
设计意图:不是把所有前序步骤的输出都塞进 Prompt,
而是只传递当前步骤真正需要的字段,控制 Token 消耗
"""
context = {}
if step == AgentStep.CODE_GEN:
# 代码生成只需要需求文档,不需要分析过程的中间产物
analyze_ctx = self.steps.get(AgentStep.ANALYZE)
if analyze_ctx and analyze_ctx.output_data:
context["requirements"] = analyze_ctx.output_data.get("requirements_doc")
context["constraints"] = analyze_ctx.output_data.get("constraints")
elif step == AgentStep.TEST:
# 测试需要代码和需求文档,用于验证代码是否满足需求
code_ctx = self.steps.get(AgentStep.CODE_GEN)
analyze_ctx = self.steps.get(AgentStep.ANALYZE)
if code_ctx and code_ctx.output_data:
context["code"] = code_ctx.output_data.get("generated_code")
context["test_hints"] = code_ctx.output_data.get("test_hints")
if analyze_ctx and analyze_ctx.output_data:
context["acceptance_criteria"] = analyze_ctx.output_data.get(
"acceptance_criteria"
)
return context
class StateManager:
"""持久化状态管理器——将编排状态写入磁盘,支持故障恢复"""
def __init__(self, state_dir: Path):
self.state_dir = state_dir
self.state_dir.mkdir(parents=True, exist_ok=True)
def save(self, state: OrchestrationState) -> None:
"""持久化当前状态——每次步骤变更后必须调用"""
state_file = self.state_dir / f"{state.session_id}.json"
# 使用原子写入,避免写入中断导致状态文件损坏
temp_file = state_file.with_suffix(".tmp")
with open(temp_file, "w") as f:
json.dump(asdict(state), f, ensure_ascii=False, indent=2)
temp_file.rename(state_file) # 原子操作
def load(self, session_id: str) -> Optional[OrchestrationState]:
"""从磁盘恢复状态——故障重启后调用"""
state_file = self.state_dir / f"{session_id}.json"
if not state_file.exists():
return None
with open(state_file) as f:
data = json.load(f)
# 反序列化为 OrchestrationState 对象
steps = {}
for step_name, ctx_data in data["steps"].items():
steps[AgentStep(step_name)] = StepContext(**ctx_data)
return OrchestrationState(
session_id=data["session_id"],
current_step=AgentStep(data["current_step"]),
steps=steps,
max_retries=data["max_retries"],
created_at=data["created_at"],
)
3.2 故障恢复与回退策略
"""
编排执行器——处理 Agent 执行失败时的回退和重试
设计意图:Agent 调用 LLM 天然存在不确定性,必须从架构层面处理失败
"""
import logging
logger = logging.getLogger(__name__)
class OrchestrationExecutor:
def __init__(self, state_manager: StateManager):
self.state_manager = state_manager
self.agents = {
AgentStep.ANALYZE: AnalyzeAgent(),
AgentStep.CODE_GEN: CodeGenAgent(),
AgentStep.TEST: TestAgent(),
}
def execute(self, state: OrchestrationState) -> OrchestrationState:
"""执行编排流水线,支持故障恢复"""
while state.current_step != AgentStep.TEST or \
state.steps.get(AgentStep.TEST, StepContext(
step=AgentStep.TEST, status=StepStatus.PENDING, input_data={}
)).status != StepStatus.COMPLETED:
step = state.current_step
step_ctx = state.steps.get(step) or StepContext(
step=step, status=StepStatus.PENDING, input_data={}
)
# 获取当前步骤所需的上下文
context = state.get_context_for_step(step)
try:
step_ctx.status = StepStatus.RUNNING
step_ctx.started_at = datetime.now().isoformat()
self.state_manager.save(state)
# 执行 Agent
agent = self.agents[step]
result = agent.execute(context)
step_ctx.output_data = result
step_ctx.status = StepStatus.COMPLETED
step_ctx.completed_at = datetime.now().isoformat()
state.steps[step] = step_ctx
# 推进到下一步
state.current_step = self._next_step(step)
except AgentExecutionError as e:
step_ctx.status = StepStatus.FAILED
step_ctx.error = str(e)
step_ctx.retry_count += 1
state.steps[step] = step_ctx
if step_ctx.retry_count > state.max_retries:
logger.error(
f"步骤 {step.value} 超过最大重试次数 "
f"({state.max_retries}),编排终止"
)
break
# 测试失败时回退到代码生成,而非重试测试
if step == AgentStep.TEST:
logger.info("测试失败,回退到代码生成阶段")
state.current_step = AgentStep.CODE_GEN
# 将测试错误信息注入代码生成的上下文
code_ctx = state.steps[AgentStep.CODE_GEN]
code_ctx.input_data["test_failure"] = str(e)
code_ctx.status = StepStatus.PENDING # 重置状态
else:
logger.info(f"步骤 {step.value} 失败,重试中 "
f"({step_ctx.retry_count}/{state.max_retries})")
finally:
self.state_manager.save(state)
return state
def _next_step(self, current: AgentStep) -> AgentStep:
step_order = [AgentStep.ANALYZE, AgentStep.CODE_GEN, AgentStep.TEST]
idx = step_order.index(current)
if idx + 1 < len(step_order):
return step_order[idx + 1]
return current
3.3 上下文压缩:控制 Token 消耗
"""
上下文压缩器——在 Agent 间传递时压缩上下文,控制 Token 消耗
设计意图:原始输出可能包含大量冗余信息,压缩后只保留下游 Agent 需要的关键字段
"""
from typing import Any
class ContextCompressor:
# 每个步骤的输出模板——定义必须保留的字段
OUTPUT_SCHEMAS = {
AgentStep.ANALYZE: {
"requirements_doc": str, # 需求文档摘要(非原文)
"constraints": list[str], # 约束条件列表
"acceptance_criteria": list[str], # 验收标准
},
AgentStep.CODE_GEN: {
"generated_code": str, # 生成的代码
"test_hints": list[str], # 测试建议
"assumptions": list[str], # 代码中的假设(需测试验证)
},
}
def compress(self, step: AgentStep, raw_output: dict[str, Any]) -> dict[str, Any]:
"""
压缩 Agent 输出——只保留下游需要的字段
原始输出可能包含思维过程、中间结果等冗余信息,
压缩后只保留结构化的关键输出
"""
schema = self.OUTPUT_SCHEMAS.get(step, {})
compressed = {}
for field_name, field_type in schema.items():
value = raw_output.get(field_name)
if value is None:
continue
# 对字符串类型做长度截断,防止单个字段过长
if field_type == str and isinstance(value, str):
max_len = 2000 # 单字段最大 2000 字符
if len(value) > max_len:
value = value[:max_len] + "\n...[已截断]"
compressed[field_name] = value
return compressed
四、Agent 编排框架的架构权衡
4.1 持久化存储的延迟代价
每次步骤变更都写入磁盘,增加了约 5-10ms 的延迟。对于短链路编排(3-5 步),总延迟增加可忽略。但对于高频编排(每秒 100+ 实例),磁盘 I/O 可能成为瓶颈。解决方案:使用 Redis 替代文件系统,将延迟降到 1ms 以下。
4.2 上下文压缩的信息损失
压缩会丢弃 Agent 的思维过程和中间推理步骤。当下游 Agent 需要理解"为什么得出这个结论"时,压缩后的上下文可能不够。解决方案:在压缩时保留 reasoning_summary 字段,用 2-3 句话概括推理逻辑。
4.3 回退策略的循环风险
测试失败回退到代码生成,代码生成后再次测试失败,形成循环。必须设置最大回退次数(如 3 次),超过后终止编排并输出诊断报告。
4.4 适用场景与禁用场景
| 场景 | 是否适用 | 原因 |
|---|---|---|
| 代码生成流水线 | 适用 | 步骤间有明确依赖,回退逻辑清晰 |
| 数据分析流水线 | 适用 | 多步骤处理,中间结果需要持久化 |
| 实时对话 Agent | 不适用 | 单步交互,无需复杂状态管理 |
| 简单工具调用链 | 不适用 | 开销大于收益,直接串联调用即可 |
五、总结
AI Agent 编排的核心工程挑战是状态管理,而非 Agent 本身的能力:
- 持久化状态:Agent 之间的上下文不能只存在内存中,必须持久化以支持故障恢复。使用原子写入避免状态文件损坏。
- 按需传递上下文:不是把所有前序输出都塞进 Prompt,而是只传递当前步骤需要的字段。用输出模板约束每个步骤的产出结构。
- 故障恢复与回退:Agent 调用 LLM 天然存在不确定性,必须从架构层面处理失败。测试失败时回退到代码生成,而非盲目重试。
- 上下文压缩:对长输出做截断和摘要,控制 Token 消耗,避免超出模型上下文窗口。
落地路线:先实现持久化状态管理器,确保上下文不丢失;再实现故障恢复和回退策略;最后加入上下文压缩。每一步用端到端测试验证:随机注入 Agent 失败,确认状态能正确恢复。
更多推荐


所有评论(0)