AI多智能体协作系统的状态管理工程:让Agent记住正在做什么
考虑这样的场景:一个AI Agent被委托"分析我们公司Q1的销售数据,找出TOP10客户,生成详细报告,并制定下季度的客户维护策略"。这个任务需要:- 多个步骤的顺序执行- 在步骤之间传递数据- 在某步骤失败时恢复,不从头开始- 支持人工在某步骤介入审查- 任务可能跨越多个会话(今天开始,明天继续)## 用LangGraph实现有状态的Agent工作流LangGraph是目前最成熟的有状态Age
为什么Agent需要状态管理?
一个处理单次用户问题的AI Assistant不需要复杂的状态管理——请求来了,调用LLM,返回答案,结束。每次请求都是独立的。但当AI系统承担的任务从"回答一个问题"升级为"完成一个项目",状态管理就变成了工程核心。考虑这样的场景:一个AI Agent被委托"分析我们公司Q1的销售数据,找出TOP10客户,生成详细报告,并制定下季度的客户维护策略"。这个任务需要:- 多个步骤的顺序执行- 在步骤之间传递数据- 在某步骤失败时恢复,不从头开始- 支持人工在某步骤介入审查- 任务可能跨越多个会话(今天开始,明天继续)这就是Agent状态管理要解决的核心问题:让一个有多个步骤的长任务,在时间和错误面前保持连贯性。## Agent状态的分类Agent状态不是一个整体,而是分层的:┌─────────────────────────────────────┐│ 会话级状态(Session) ││ 当前对话的上下文和短期记忆 ││ ││ ┌─────────────────────────────┐ ││ │ 任务级状态(Task) │ ││ │ 当前正在执行的任务进度 │ ││ │ │ ││ │ ┌───────────────────────┐ │ ││ │ │ 步骤级状态(Step) │ │ ││ │ │ 每个执行步骤的输入 │ │ ││ │ │ 输出和中间结果 │ │ ││ │ └───────────────────────┘ │ ││ └─────────────────────────────┘ ││ ││ ┌─────────────────────────────┐ ││ │ 长期记忆(Memory) │ ││ │ 跨会话的用户偏好和知识库 │ ││ └─────────────────────────────┘ │└─────────────────────────────────────┘不同层次的状态需要不同的存储策略和生命周期管理。## 用LangGraph实现有状态的Agent工作流LangGraph是目前最成熟的有状态Agent工作流框架,它将Agent的执行建模为一个有向图,每个节点是一个处理步骤,图的状态在节点间流转。### 基础状态机pythonfrom typing import TypedDict, Annotated, List, Optionalfrom langgraph.graph import StateGraph, ENDfrom langgraph.checkpoint.sqlite import SqliteSaverimport operatorimport json# 定义状态结构(TypedDict确保类型安全)class ResearchAgentState(TypedDict): # 任务基本信息 task_id: str task_description: str created_at: str # 执行状态 current_step: str completed_steps: List[str] is_complete: bool # 数据流 search_results: List[dict] analysis_results: dict final_report: Optional[str] # 错误处理 errors: Annotated[List[str], operator.add] # 追加语义 retry_count: int # 人工介入 requires_human_approval: bool human_feedback: Optional[str]def search_step(state: ResearchAgentState) -> ResearchAgentState: """步骤1:搜索收集信息""" print(f"[步骤1] 搜索任务:{state['task_description']}") # 实际搜索逻辑 # results = search_engine.search(state["task_description"]) results = [ {"title": "相关文章1", "content": "..."}, {"title": "相关文章2", "content": "..."} ] return { **state, "search_results": results, "current_step": "analysis", "completed_steps": state["completed_steps"] + ["search"] }def analysis_step(state: ResearchAgentState) -> ResearchAgentState: """步骤2:分析搜索结果""" print(f"[步骤2] 分析 {len(state['search_results'])} 条搜索结果") if not state["search_results"]: return { **state, "errors": ["搜索结果为空,无法分析"], "current_step": "error" } # 分析逻辑... analysis = { "key_findings": ["发现1", "发现2"], "confidence": 0.85, "requires_verification": False } # 如果置信度低,需要人工审查 needs_approval = analysis["confidence"] < 0.7 return { **state, "analysis_results": analysis, "requires_human_approval": needs_approval, "current_step": "human_review" if needs_approval else "report", "completed_steps": state["completed_steps"] + ["analysis"] }def human_review_step(state: ResearchAgentState) -> ResearchAgentState: """步骤3(可选):等待人工审查""" print("[步骤3] 等待人工审查...") print(f"分析结果:{json.dumps(state['analysis_results'], ensure_ascii=False, indent=2)}") # 在实际系统中,这里会发送通知并等待 # 使用LangGraph的interrupt功能实现 feedback = input("请输入审查意见(直接回车表示通过):") return { **state, "human_feedback": feedback or "approved", "current_step": "report", "completed_steps": state["completed_steps"] + ["human_review"] }def report_generation_step(state: ResearchAgentState) -> ResearchAgentState: """步骤4:生成最终报告""" print("[步骤4] 生成报告...") # 整合所有信息生成报告 report = f"""# 任务报告:{state['task_description']}## 执行摘要- 搜索到 {len(state['search_results'])} 条相关资料- 关键发现:{', '.join(state['analysis_results'].get('key_findings', []))}{f"- 人工反馈:{state['human_feedback']}" if state.get('human_feedback') else ''}## 详细分析{json.dumps(state['analysis_results'], ensure_ascii=False, indent=2)}## 执行步骤记录{' → '.join(state['completed_steps'])}""" return { **state, "final_report": report, "is_complete": True, "current_step": "complete", "completed_steps": state["completed_steps"] + ["report"] }def route_after_analysis(state: ResearchAgentState) -> str: """条件路由:根据分析结果决定下一步""" if state.get("errors"): return "error_handler" if state["requires_human_approval"]: return "human_review" return "report"def build_research_agent(): """构建研究Agent的状态图""" # 创建状态图 workflow = StateGraph(ResearchAgentState) # 添加节点 workflow.add_node("search", search_step) workflow.add_node("analysis", analysis_step) workflow.add_node("human_review", human_review_step) workflow.add_node("report", report_generation_step) # 定义边(流转关系) workflow.set_entry_point("search") workflow.add_edge("search", "analysis") # 条件边:分析后根据结果路由 workflow.add_conditional_edges( "analysis", route_after_analysis, { "human_review": "human_review", "report": "report", "error_handler": END } ) workflow.add_edge("human_review", "report") workflow.add_edge("report", END) # 添加检查点(持久化状态) memory = SqliteSaver.from_conn_string("agent_state.db") return workflow.compile(checkpointer=memory)# 执行Agentagent = build_research_agent()initial_state = { "task_id": "task_001", "task_description": "分析2026年AI市场趋势", "created_at": "2026-04-29", "current_step": "search", "completed_steps": [], "is_complete": False, "search_results": [], "analysis_results": {}, "final_report": None, "errors": [], "retry_count": 0, "requires_human_approval": False, "human_feedback": None}# 配置:thread_id用于恢复执行config = {"configurable": {"thread_id": "task_001"}}result = agent.invoke(initial_state, config=config)print(result["final_report"])### 断点续执行(Checkpointing)LangGraph的检查点机制允许任务在中断后恢复:python# 场景:任务执行到一半,进程被强制终止# 重新启动后,可以从最后的检查点恢复agent = build_research_agent()# 使用相同的thread_id恢复执行config = {"configurable": {"thread_id": "task_001"}}# 获取当前状态current_state = agent.get_state(config)print(f"当前步骤:{current_state.values.get('current_step')}")print(f"已完成步骤:{current_state.values.get('completed_steps')}")# 如果任务未完成,继续执行(None表示从检查点恢复,不需要重新传入初始状态)if not current_state.values.get('is_complete'): result = agent.invoke(None, config=config) print("任务恢复完成!")## 多智能体状态协调当多个Agent需要协作完成任务时,状态管理变得更加复杂。pythonfrom dataclasses import dataclass, field, asdictfrom typing import Dict, List, Any, Optionalimport jsonimport timeimport uuidfrom enum import Enumclass AgentStatus(Enum): IDLE = "idle" WORKING = "working" WAITING = "waiting" # 等待其他Agent的结果 BLOCKED = "blocked" # 等待人工介入 COMPLETED = "completed" FAILED = "failed"@dataclassclass AgentTask: """分配给单个Agent的子任务""" task_id: str agent_id: str task_type: str inputs: Dict[str, Any] dependencies: List[str] # 依赖的其他task_id status: AgentStatus = AgentStatus.IDLE outputs: Optional[Dict[str, Any]] = None error: Optional[str] = None started_at: Optional[float] = None completed_at: Optional[float] = Noneclass MultiAgentCoordinator: """ 多智能体协调器 管理多个Agent的任务分配、状态追踪和结果汇总 """ def __init__(self): self.tasks: Dict[str, AgentTask] = {} self.agents: Dict[str, Any] = {} self.global_context: Dict[str, Any] = {} def register_agent(self, agent_id: str, agent_instance: Any, capabilities: List[str]): """注册Agent及其能力""" self.agents[agent_id] = { "instance": agent_instance, "capabilities": capabilities, "status": AgentStatus.IDLE, "current_task": None } def submit_task( self, task_type: str, inputs: Dict[str, Any], dependencies: List[str] = None, agent_id: str = None ) -> str: """提交任务,返回task_id""" task_id = str(uuid.uuid4())[:8] # 自动分配Agent(如果没有指定) if not agent_id: agent_id = self._find_capable_agent(task_type) task = AgentTask( task_id=task_id, agent_id=agent_id, task_type=task_type, inputs=inputs, dependencies=dependencies or [] ) self.tasks[task_id] = task return task_id def _find_capable_agent(self, task_type: str) -> str: """找到有能力且空闲的Agent""" for agent_id, info in self.agents.items(): if (task_type in info["capabilities"] and info["status"] == AgentStatus.IDLE): return agent_id raise RuntimeError(f"没有可用的Agent处理任务类型:{task_type}") def _are_dependencies_met(self, task: AgentTask) -> bool: """检查任务的所有依赖是否已完成""" for dep_id in task.dependencies: dep_task = self.tasks.get(dep_id) if not dep_task or dep_task.status != AgentStatus.COMPLETED: return False return True def _collect_dependency_outputs(self, task: AgentTask) -> Dict[str, Any]: """收集依赖任务的输出,注入到当前任务的输入""" dep_outputs = {} for dep_id in task.dependencies: dep_task = self.tasks[dep_id] dep_outputs[dep_id] = dep_task.outputs return dep_outputs async def execute_ready_tasks(self): """执行所有就绪的任务(依赖已满足)""" import asyncio ready_tasks = [ task for task in self.tasks.values() if task.status == AgentStatus.IDLE and self._are_dependencies_met(task) ] if not ready_tasks: return # 并行执行所有就绪任务 async def run_task(task: AgentTask): agent_info = self.agents[task.agent_id] agent = agent_info["instance"] task.status = AgentStatus.WORKING task.started_at = time.time() agent_info["status"] = AgentStatus.WORKING agent_info["current_task"] = task.task_id # 注入依赖输出 enriched_inputs = { **task.inputs, "_dependency_outputs": self._collect_dependency_outputs(task) } try: # 执行任务 result = await agent.execute(task.task_type, enriched_inputs) task.outputs = result task.status = AgentStatus.COMPLETED task.completed_at = time.time() print(f" [完成] 任务 {task.task_id} ({task.task_type})") except Exception as e: task.error = str(e) task.status = AgentStatus.FAILED print(f" [失败] 任务 {task.task_id}: {e}") finally: agent_info["status"] = AgentStatus.IDLE agent_info["current_task"] = None await asyncio.gather(*[run_task(task) for task in ready_tasks]) async def run_until_complete(self, timeout: float = 300) -> Dict[str, Any]: """ 运行协调器直到所有任务完成或超时 返回所有任务的执行结果 """ start_time = time.time() while True: pending = [t for t in self.tasks.values() if t.status not in (AgentStatus.COMPLETED, AgentStatus.FAILED)] if not pending: break if time.time() - start_time > timeout: raise TimeoutError(f"任务执行超时 ({timeout}s)") await self.execute_ready_tasks() # 检查是否有死锁(所有pending任务都在等待) still_pending = [t for t in self.tasks.values() if t.status not in (AgentStatus.COMPLETED, AgentStatus.FAILED)] if still_pending and all(t.status != AgentStatus.IDLE for t in still_pending): # 可能存在循环依赖 break return { "completed": {tid: asdict(t) for tid, t in self.tasks.items() if t.status == AgentStatus.COMPLETED}, "failed": {tid: asdict(t) for tid, t in self.tasks.items() if t.status == AgentStatus.FAILED}, "pending": len([t for t in self.tasks.values() if t.status not in (AgentStatus.COMPLETED, AgentStatus.FAILED)]) }## 状态持久化策略pythonimport sqlite3import jsonfrom contextlib import contextmanagerclass AgentStateStore: """ Agent状态的持久化存储 支持任务断点续执行 """ def __init__(self, db_path: str = "agent_states.db"): self.db_path = db_path self._init_db() def _init_db(self): with self._conn() as conn: conn.execute(""" CREATE TABLE IF NOT EXISTS agent_states ( task_id TEXT PRIMARY KEY, state_json TEXT NOT NULL, checkpoint_step TEXT, updated_at REAL, is_complete INTEGER DEFAULT 0 ) """) conn.execute(""" CREATE INDEX IF NOT EXISTS idx_complete ON agent_states(is_complete) """) @contextmanager def _conn(self): conn = sqlite3.connect(self.db_path) try: yield conn conn.commit() except Exception: conn.rollback() raise finally: conn.close() def save_state(self, task_id: str, state: dict, checkpoint_step: str): with self._conn() as conn: conn.execute(""" INSERT OR REPLACE INTO agent_states (task_id, state_json, checkpoint_step, updated_at, is_complete) VALUES (?, ?, ?, ?, ?) """, ( task_id, json.dumps(state, ensure_ascii=False), checkpoint_step, time.time(), 1 if state.get("is_complete") else 0 )) def load_state(self, task_id: str) -> Optional[dict]: with self._conn() as conn: row = conn.execute( "SELECT state_json, checkpoint_step FROM agent_states WHERE task_id = ?", (task_id,) ).fetchone() if row: state = json.loads(row[0]) return state return None def get_incomplete_tasks(self) -> List[dict]: """获取所有未完成的任务(用于系统重启后恢复)""" with self._conn() as conn: rows = conn.execute( "SELECT task_id, state_json, checkpoint_step FROM agent_states WHERE is_complete = 0" ).fetchall() return [ {"task_id": r[0], "state": json.loads(r[1]), "checkpoint": r[2]} for r in rows ]## 总结Agent状态管理是让AI系统从"玩具"走向"生产"的关键工程能力:1. 状态分层:会话、任务、步骤、长期记忆各自独立管理,不要混淆2. LangGraph是当前最佳选择:内置检查点、条件路由、人工介入支持3. 持久化是必须:任务状态必须持久化,支持断点续执行4. 多Agent协调需要DAG:用有向无环图管理任务依赖,避免死锁5. 错误隔离:单个Agent失败不应影响整个系统,要有降级和重试策略状态管理复杂,但它是Agent工程能力的核心体现。做好状态管理,就做好了Agent可靠性的基础。
更多推荐




所有评论(0)