AI Agent 编排实战:从工具调用到多 Agent 协作的任务拆解与执行框架
·
AI Agent 编排实战:从工具调用到多 Agent 协作的任务拆解与执行框架
一、单次对话搞不定复杂任务——Agent 编排的工程化刚需
让 AI 一次性完成"分析竞品定价策略、生成报告、发送邮件"这种多步骤任务,结果:要么步骤遗漏、要么上下文丢失、要么中间步骤出错后无法恢复。更常见的情况是,AI 自信地给出了一个看似完整但逻辑断裂的答案。
问题根源:大语言模型是单轮预测引擎,不是任务执行器。它擅长生成文本,不擅长规划步骤、调用工具、处理异常、维护状态。Agent 编排的本质,就是把 LLM 的生成能力包装成一个可控的任务执行框架。
Agent 编排的核心挑战:
- 任务拆解:复杂任务如何分解为可执行的原子步骤
- 工具调用:如何让 LLM 安全、可靠地调用外部 API
- 状态管理:多步骤执行过程中如何维护上下文和中间结果
- 异常恢复:某步骤失败时如何重试、回退或降级
- 多 Agent 协作:不同角色的 Agent 如何分工与通信
二、AI Agent 编排的架构与执行机制
Agent 编排的核心模式:ReAct 循环(Reasoning + Acting)——思考下一步做什么,执行动作,观察结果,再思考。
graph TB
subgraph "Agent 执行循环"
A[接收任务] --> B[推理: 分析当前状态]
B --> C[决策: 选择下一步动作]
C --> D{动作类型?}
D -->|工具调用| E[执行工具]
D -->|子任务委派| F[委派给子 Agent]
D -->|直接回答| G[生成最终结果]
E --> H[观察: 获取执行结果]
F --> H
H --> I{任务完成?}
I -->|否| B
I -->|是| G
end
subgraph "工具注册表"
J[搜索工具]
K[数据库查询]
L[文件读写]
M[API 调用]
end
subgraph "状态管理"
N[对话历史]
O[中间结果]
P[执行日志]
end
E --> J & K & L & M
B --> N & O & P
编排模式对比:
| 模式 | 适用场景 | 复杂度 | 可控性 |
|---|---|---|---|
| 单 Agent + 工具 | 简单任务,步骤 < 5 | 低 | 高 |
| 多 Agent 串行 | 有明确依赖的流水线任务 | 中 | 高 |
| 多 Agent 并行 | 无依赖的子任务可并行 | 中 | 中 |
| 层级 Agent | 复杂任务需分解-委派-汇总 | 高 | 中低 |
三、生产级 Agent 编排框架实现
3.1 工具定义与注册
from dataclasses import dataclass, field
from typing import Callable, Any
import json
@dataclass
class ToolDefinition:
"""工具定义:描述工具的输入输出规范,供 LLM 理解和调用"""
name: str
description: str # 工具功能描述,LLM 据此判断何时调用
parameters: dict # JSON Schema 格式的参数定义
required: list[str] # 必填参数列表
executor: Callable # 实际执行函数
timeout_seconds: int = 30 # 执行超时时间
max_retries: int = 2 # 最大重试次数
class ToolRegistry:
"""工具注册表:集中管理所有可用工具"""
def __init__(self):
self._tools: dict[str, ToolDefinition] = {}
def register(self, tool: ToolDefinition):
"""注册工具"""
self._tools[tool.name] = tool
def get(self, name: str) -> ToolDefinition | None:
return self._tools.get(name)
def get_all_schemas(self) -> list[dict]:
"""获取所有工具的 JSON Schema 描述,供 LLM 函数调用使用"""
return [
{
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.parameters,
"required": tool.required,
},
}
for tool in self._tools.values()
]
async def execute(self, name: str, arguments: dict) -> Any:
"""执行工具调用,带超时和重试"""
tool = self._tools.get(name)
if not tool:
raise ValueError(f"未知工具: {name}")
# 参数校验:检查必填参数是否存在
for param in tool.required:
if param not in arguments:
raise ValueError(f"工具 {name} 缺少必填参数: {param}")
last_error = None
for attempt in range(tool.max_retries + 1):
try:
import asyncio
result = await asyncio.wait_for(
tool.executor(**arguments),
timeout=tool.timeout_seconds,
)
return result
except asyncio.TimeoutError:
last_error = f"工具 {name} 执行超时 ({tool.timeout_seconds}s)"
except Exception as e:
last_error = f"工具 {name} 执行失败: {str(e)}"
raise RuntimeError(f"工具 {name} 重试耗尽: {last_error}")
3.2 ReAct Agent 核心
import asyncio
from enum import Enum
from dataclasses import dataclass, field
class ActionKind(Enum):
TOOL_CALL = "tool_call"
FINAL_ANSWER = "final_answer"
@dataclass
class AgentAction:
"""Agent 的动作决策"""
kind: ActionKind
tool_name: str | None = None
tool_args: dict | None = None
answer: str | None = None
reasoning: str = "" # 推理过程,用于可解释性
@dataclass
class AgentState:
"""Agent 执行状态:维护对话历史和中间结果"""
task: str
messages: list[dict] = field(default_factory=list)
tool_results: list[dict] = field(default_factory=list)
step_count: int = 0
max_steps: int = 15 # 防止无限循环
class ReActAgent:
"""ReAct 模式 Agent:推理-行动-观察循环"""
def __init__(self, llm_client, tool_registry: ToolRegistry):
self.llm = llm_client
self.tools = tool_registry
async def run(self, task: str) -> str:
"""执行任务,返回最终结果"""
state = AgentState(task=task)
# 初始化系统提示词
state.messages.append({
"role": "system",
"content": self._build_system_prompt(),
})
state.messages.append({
"role": "user",
"content": task,
})
while state.step_count < state.max_steps:
state.step_count += 1
# 推理:让 LLM 决定下一步动作
action = await self._decide_action(state)
if action.kind == ActionKind.FINAL_ANSWER:
return action.answer or ""
if action.kind == ActionKind.TOOL_CALL:
# 行动:执行工具调用
result = await self._execute_tool(action)
# 观察:将工具结果加入上下文
state.tool_results.append({
"step": state.step_count,
"tool": action.tool_name,
"args": action.tool_args,
"result": result,
})
state.messages.append({
"role": "assistant",
"content": f"调用工具 {action.tool_name},参数: {json.dumps(action.tool_args, ensure_ascii=False)}",
})
state.messages.append({
"role": "user",
"content": f"工具返回结果: {json.dumps(result, ensure_ascii=False)}",
})
return "任务执行超时:达到最大步骤数限制"
async def _decide_action(self, state: AgentState) -> AgentAction:
"""让 LLM 决定下一步动作"""
response = await self.llm.chat(
messages=state.messages,
tools=self.tools.get_all_schemas(),
tool_choice="auto",
)
# 检查是否有工具调用
if response.tool_calls:
call = response.tool_calls[0]
return AgentAction(
kind=ActionKind.TOOL_CALL,
tool_name=call.function.name,
tool_args=json.loads(call.function.arguments),
reasoning=response.content or "",
)
# 无工具调用,视为最终回答
return AgentAction(
kind=ActionKind.FINAL_ANSWER,
answer=response.content or "",
reasoning="",
)
async def _execute_tool(self, action: AgentAction) -> Any:
"""执行工具调用,捕获异常"""
try:
return await self.tools.execute(action.tool_name, action.tool_args or {})
except Exception as e:
# 工具执行失败,返回错误信息而非抛异常
return {"error": str(e)}
def _build_system_prompt(self) -> str:
return """你是一个任务执行 Agent。根据用户任务,逐步推理并调用工具完成。
规则:
1. 每次只调用一个工具
2. 根据工具返回结果决定下一步
3. 如果工具返回错误,分析原因并尝试其他方案
4. 任务完成后给出最终答案
5. 不要编造工具返回结果"""
3.3 多 Agent 协作:层级编排
@dataclass
class SubTask:
"""子任务定义"""
name: str
description: str
agent_role: str # 执行该任务的 Agent 角色
dependencies: list[str] = field(default_factory=list) # 依赖的子任务
input_from: dict[str, str] = field(default_factory=dict) # 从上游任务获取输入
class MultiAgentOrchestrator:
"""多 Agent 编排器:支持串行、并行和层级任务编排"""
def __init__(self, agents: dict[str, ReActAgent]):
self.agents = agents
async def execute_pipeline(
self,
tasks: list[SubTask],
initial_context: dict,
) -> dict:
"""执行任务流水线,自动处理依赖关系"""
completed: dict[str, Any] = {}
results: dict[str, Any] = {}
# 拓扑排序:按依赖关系确定执行顺序
execution_order = self._topological_sort(tasks)
for task_name in execution_order:
task = next(t for t in tasks if t.name == task_name)
agent = self.agents.get(task.agent_role)
if not agent:
raise ValueError(f"未注册的 Agent 角色: {task.agent_role}")
# 构建子任务输入:从上游结果中提取
task_input = self._build_task_input(task, initial_context, results)
# 执行子任务
result = await agent.run(task_input)
results[task.name] = result
completed[task.name] = True
return results
def _topological_sort(self, tasks: list[SubTask]) -> list[str]:
"""拓扑排序:确保被依赖的任务先执行"""
in_degree: dict[str, int] = {t.name: 0 for t in tasks}
graph: dict[str, list[str]] = {t.name: [] for t in tasks}
for task in tasks:
for dep in task.dependencies:
graph[dep].append(task.name)
in_degree[task.name] += 1
queue = [name for name, deg in in_degree.items() if deg == 0]
order = []
while queue:
current = queue.pop(0)
order.append(current)
for neighbor in graph[current]:
in_degree[neighbor] -= 1
if in_degree[neighbor] == 0:
queue.append(neighbor)
if len(order) != len(tasks):
raise ValueError("任务依赖关系存在环,无法排序")
return order
def _build_task_input(
self,
task: SubTask,
initial_context: dict,
completed_results: dict,
) -> str:
"""构建子任务输入,从上游结果中提取所需数据"""
parts = [f"任务: {task.description}"]
for param_name, source_path in task.input_from.items():
# source_path 格式: "task_name.field_name"
source_task, field = source_path.split(".", 1)
if source_task in completed_results:
parts.append(f"输入参数 {param_name}: {completed_results[source_task]}")
if initial_context:
parts.append(f"初始上下文: {json.dumps(initial_context, ensure_ascii=False)}")
return "\n".join(parts)
四、Agent 编排的架构权衡与边界
ReAct 循环的可靠性问题:
- 无限循环:Agent 可能反复调用同一工具而不收敛。必须设置
max_steps硬限制 - 幻觉工具调用:LLM 可能生成不存在的工具名或参数。需要在执行前校验
- 上下文窗口溢出:多轮工具调用后,对话历史超出模型上下文窗口。需要摘要压缩
多 Agent 协作的通信开销:
- 串行编排延迟叠加:3 个子任务各需 10 秒,总耗时 30 秒
- 并行编排需要无依赖前提:有依赖关系的子任务无法并行
- Agent 间信息传递有损:子 Agent 的输出被压缩为文本传给下游,结构化信息可能丢失
工具调用的安全边界:
- LLM 生成的参数可能包含注入攻击(如 SQL 注入、命令注入)
- 工具执行必须有权限隔离:只允许访问必要的资源
- 敏感操作(删除、支付)需要人工确认环节
禁用场景:
- 确定性任务:规则明确、逻辑固定的任务,用传统代码实现更可靠
- 实时性要求高的任务:LLM 推理延迟 1-5 秒,不适合毫秒级响应
- 安全敏感任务:金融交易、医疗诊断等场景,Agent 的不确定性不可接受
五、总结
AI Agent 编排的核心框架:ReAct 循环(推理-行动-观察)解决单 Agent 的任务执行问题,层级编排(分解-委派-汇总)解决多 Agent 的协作问题。工具注册表提供标准化的工具定义和执行接口,AgentState 维护多步骤执行的状态和上下文。ReAct 循环必须设置最大步骤数防止无限循环,工具调用需要参数校验和权限隔离。多 Agent 编排通过拓扑排序自动处理任务依赖,但串行编排的延迟叠加和 Agent 间信息传递的有损性是架构瓶颈。Agent 编排适用于步骤不确定、需要工具调用的复杂任务,确定性和实时性要求高的场景应使用传统代码实现。
更多推荐




所有评论(0)