AI Agent 系统设计:从单链推理到多智能体协作的架构演进
AI Agent 系统设计:从单链推理到多智能体协作的架构演进

一、从 Chatbot 到 Agent:大模型应用的范式跃迁
大模型的能力边界正在从"对话生成"向"自主决策"迁移。一个仅能回答问题的 Chatbot,本质上是无状态的文本映射器;而一个 Agent 能够感知环境、制定计划、调用工具、根据反馈调整策略,形成闭环决策。这个跃迁的核心,在于将大模型从被动的"应答器"升级为主动的"决策引擎"。
在工程实践中,单 Agent 系统面临三个瓶颈:单一模型的推理能力有限,复杂任务容易在长链推理中出错;单点执行无法并行处理子任务,效率低下;单一角色的知识覆盖面有限,跨领域任务力不从心。多智能体协作(Multi-Agent)通过角色分工、消息传递和任务分解,突破了这些瓶颈,但同时也引入了协调成本和一致性挑战。
二、Agent 架构演进:从 ReAct 到多智能体编排
Agent 的架构经历了从单链推理到多智能体协作的演进。理解每种架构的适用场景,是系统设计的基础。
flowchart TB
subgraph ReAct单链架构
direction TB
R1[观察 Observation] --> R2[思考 Thought]
R2 --> R3[行动 Action]
R3 --> R4[观察 Observation]
R4 --> R2
end
subgraph Plan-Execute架构
direction TB
P1[任务输入] --> P2[规划器 Planner]
P2 --> P3[步骤1]
P2 --> P4[步骤2]
P2 --> P5[步骤3]
P3 --> P6[执行器 Executor]
P4 --> P6
P5 --> P6
P6 --> P7[重新规划 Replan]
P7 --> P2
end
subgraph 多智能体架构
direction TB
M1[协调者 Orchestrator] --> M2[搜索Agent]
M1 --> M3[编码Agent]
M1 --> M4[评审Agent]
M2 --> M5[消息总线]
M3 --> M5
M4 --> M5
M5 --> M1
end
style R1 fill:#ff6b6b,color:#fff
style P2 fill:#4ecdc4,color:#fff
style M1 fill:#45b7d1,color:#fff
ReAct 架构适合简单任务,每步决策后立即执行;Plan-Execute 架构适合可分解的复杂任务,先规划后执行;多智能体架构适合需要多角色协作的开放式任务,各 Agent 独立运行、通过消息总线通信。
三、生产级多智能体系统:从框架到实现的完整方案
3.1 Agent 基类与消息协议
from abc import ABC, abstractmethod
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Dict, List, Optional
import asyncio
import uuid
import time
class MessageType(Enum):
"""消息类型枚举,定义 Agent 间通信的语义"""
TASK_ASSIGN = "task_assign" # 任务分配
TASK_RESULT = "task_result" # 任务结果
COLLABORATION = "collaboration" # 协作请求
FEEDBACK = "feedback" # 反馈信息
ERROR = "error" # 错误报告
@dataclass
class AgentMessage:
"""Agent 间通信的标准消息格式"""
sender: str # 发送者 ID
receiver: str # 接收者 ID(广播时为 "all")
msg_type: MessageType # 消息类型
content: Any # 消息内容
task_id: str = field(default_factory=lambda: str(uuid.uuid4())[:8])
timestamp: float = field(default_factory=time.time)
metadata: Dict[str, Any] = field(default_factory=dict)
class BaseAgent(ABC):
"""Agent 基类:定义智能体的核心接口与生命周期"""
def __init__(self, agent_id: str, role: str, capabilities: List[str]):
self.agent_id = agent_id
self.role = role
self.capabilities = capabilities
self.message_queue: asyncio.Queue = asyncio.Queue()
self._running = False
@abstractmethod
async def process_message(self, message: AgentMessage) -> Optional[AgentMessage]:
"""处理收到的消息,返回响应消息或 None"""
pass
@abstractmethod
async def execute_task(self, task: Dict[str, Any]) -> Dict[str, Any]:
"""执行具体任务,返回结果"""
pass
async def send_message(self, receiver: str, msg_type: MessageType,
content: Any, bus: "MessageBus") -> None:
"""通过消息总线发送消息"""
msg = AgentMessage(
sender=self.agent_id,
receiver=receiver,
msg_type=msg_type,
content=content,
)
await bus.publish(msg)
async def run(self, bus: "MessageBus"):
"""Agent 主循环:持续监听并处理消息"""
self._running = True
while self._running:
try:
# 带超时的消息等待,避免永久阻塞
message = await asyncio.wait_for(
self.message_queue.get(), timeout=1.0
)
response = await self.process_message(message)
if response:
await bus.publish(response)
except asyncio.TimeoutError:
continue
except Exception as e:
# 错误不中断主循环,通过消息总线报告
error_msg = AgentMessage(
sender=self.agent_id,
receiver="orchestrator",
msg_type=MessageType.ERROR,
content={"error": str(e), "original_task": message.content},
)
await bus.publish(error_msg)
3.2 消息总线与协调器
class MessageBus:
"""异步消息总线:支持点对点和广播通信"""
def __init__(self):
self._subscribers: Dict[str, asyncio.Queue] = {}
self._history: List[AgentMessage] = []
self._max_history = 1000 # 保留最近 1000 条消息用于审计
def register(self, agent_id: str) -> asyncio.Queue:
"""注册 Agent,返回其专属消息队列"""
queue = asyncio.Queue()
self._subscribers[agent_id] = queue
return queue
async def publish(self, message: AgentMessage) -> None:
"""发布消息到目标 Agent 或广播"""
self._history.append(message)
if len(self._history) > self._max_history:
self._history = self._history[-self._max_history:]
if message.receiver == "all":
# 广播:发送给所有已注册的 Agent
for agent_id, queue in self._subscribers.items():
if agent_id != message.sender:
await queue.put(message)
elif message.receiver in self._subscribers:
await self._subscribers[message.receiver].put(message)
class Orchestrator:
"""多智能体协调器:负责任务分解、分配和结果聚合"""
def __init__(self, bus: MessageBus, max_retries: int = 2):
self.bus = bus
self.agents: Dict[str, BaseAgent] = {}
self.max_retries = max_retries
self._task_results: Dict[str, List[Dict]] = {}
def register_agent(self, agent: BaseAgent):
"""注册 Agent 并连接消息总线"""
queue = self.bus.register(agent.agent_id)
agent.message_queue = queue
self.agents[agent.agent_id] = agent
async def decompose_task(self, task: Dict[str, Any]) -> List[Dict[str, Any]]:
"""
任务分解:根据 Agent 能力将复杂任务拆分为子任务
这里使用基于规则的分解策略,生产中可替换为 LLM 驱动的分解
"""
subtasks = []
task_type = task.get("type", "")
if task_type == "research_and_report":
# 研究报告类任务:搜索 + 分析 + 撰写 + 评审
subtasks = [
{"type": "search", "query": task["query"],
"assigned_to": "search_agent"},
{"type": "analyze", "depends_on": "search",
"assigned_to": "analysis_agent"},
{"type": "write", "depends_on": "analyze",
"assigned_to": "writer_agent"},
{"type": "review", "depends_on": "write",
"assigned_to": "reviewer_agent"},
]
elif task_type == "code_review":
subtasks = [
{"type": "static_analysis", "code": task["code"],
"assigned_to": "code_agent"},
{"type": "security_check", "depends_on": "static_analysis",
"assigned_to": "security_agent"},
]
return subtasks
async def execute_pipeline(self, task: Dict[str, Any]) -> Dict[str, Any]:
"""执行完整的任务管线,处理依赖关系和错误重试"""
subtasks = await self.decompose_task(task)
results = {}
retry_count = {}
for subtask in subtasks:
# 等待依赖任务完成
depends_on = subtask.get("depends_on")
if depends_on and depends_on not in results:
return {"error": f"依赖任务 {depends_on} 未完成"}
# 将依赖结果注入当前子任务
if depends_on:
subtask["context"] = results[depends_on]
agent_id = subtask["assigned_to"]
if agent_id not in self.agents:
return {"error": f"Agent {agent_id} 未注册"}
# 带重试的任务执行
agent = self.agents[agent_id]
attempt = retry_count.get(agent_id, 0)
try:
result = await asyncio.wait_for(
agent.execute_task(subtask),
timeout=60.0,
)
results[subtask["type"]] = result
except asyncio.TimeoutError:
if attempt < self.max_retries:
retry_count[agent_id] = attempt + 1
continue
results[subtask["type"]] = {"error": "执行超时,重试耗尽"}
except Exception as e:
results[subtask["type"]] = {"error": str(e)}
return {"status": "completed", "results": results}
四、多智能体系统的代价:协调开销与一致性困境
多智能体架构并非银弹,引入的复杂性可能超过收益。
协调开销:任务分解、消息传递和结果聚合都需要额外的计算和延迟。一个简单的查询任务,单 Agent 直接处理可能只需 2 秒,而经过多 Agent 协作可能需要 8 秒。当子任务间的依赖关系复杂时,串行等待时间会显著拉长。只有当任务确实需要多角色协作时,多 Agent 架构才值得使用。
一致性风险:多个 Agent 可能对同一问题给出矛盾的回答。例如搜索 Agent 找到的信息与分析 Agent 的结论冲突,此时协调者需要额外的仲裁逻辑。在缺乏明确优先级规则的情况下,仲裁本身可能引入新的不确定性。
调试困难:多 Agent 系统的行为由大量异步消息交互决定,错误可能出现在消息传递的任何环节。一条消息的延迟可能导致后续 Agent 超时,而超时又触发重试,形成级联效应。追踪问题根因需要完整的消息日志和因果链分析。
成本膨胀:每个 Agent 调用都需要消耗 LLM Token。4 个 Agent 的系统,Token 消耗是单 Agent 的 4 倍以上。在成本敏感的生产环境中,必须严格评估多 Agent 的收益是否覆盖额外成本。
五、总结
AI Agent 系统的架构选择取决于任务复杂度。简单任务用 ReAct 单链即可,可分解任务用 Plan-Execute 架构,需要多角色协作的开放式任务才考虑多智能体架构。架构越复杂,协调成本越高,必须确保收益覆盖成本。
落地路线建议:从单 Agent 的 ReAct 循环起步,验证核心推理能力;当任务链过长导致错误累积时,升级为 Plan-Execute 架构,增加重新规划机制;当单角色知识不足时,引入多智能体协作,但严格控制 Agent 数量(3-5 个为宜);始终保留完整的消息日志,用于调试和审计;设置合理的超时和重试策略,防止级联故障。
更多推荐


所有评论(0)