AI Agent 编排实战:从工具调用到多 Agent 协作的任务拆解与执行框架

一、单次对话搞不定复杂任务——Agent 编排的工程化刚需

让 AI 一次性完成"分析竞品定价策略、生成报告、发送邮件"这种多步骤任务,结果:要么步骤遗漏、要么上下文丢失、要么中间步骤出错后无法恢复。更常见的情况是,AI 自信地给出了一个看似完整但逻辑断裂的答案。

问题根源:大语言模型是单轮预测引擎,不是任务执行器。它擅长生成文本,不擅长规划步骤、调用工具、处理异常、维护状态。Agent 编排的本质,就是把 LLM 的生成能力包装成一个可控的任务执行框架。

Agent 编排的核心挑战:

  • 任务拆解:复杂任务如何分解为可执行的原子步骤
  • 工具调用:如何让 LLM 安全、可靠地调用外部 API
  • 状态管理:多步骤执行过程中如何维护上下文和中间结果
  • 异常恢复:某步骤失败时如何重试、回退或降级
  • 多 Agent 协作:不同角色的 Agent 如何分工与通信

二、AI Agent 编排的架构与执行机制

Agent 编排的核心模式:ReAct 循环(Reasoning + Acting)——思考下一步做什么,执行动作,观察结果,再思考。

graph TB
    subgraph "Agent 执行循环"
        A[接收任务] --> B[推理: 分析当前状态]
        B --> C[决策: 选择下一步动作]
        C --> D{动作类型?}
        D -->|工具调用| E[执行工具]
        D -->|子任务委派| F[委派给子 Agent]
        D -->|直接回答| G[生成最终结果]
        E --> H[观察: 获取执行结果]
        F --> H
        H --> I{任务完成?}
        I -->|否| B
        I -->|是| G
    end
    subgraph "工具注册表"
        J[搜索工具]
        K[数据库查询]
        L[文件读写]
        M[API 调用]
    end
    subgraph "状态管理"
        N[对话历史]
        O[中间结果]
        P[执行日志]
    end
    E --> J & K & L & M
    B --> N & O & P

编排模式对比:

模式 适用场景 复杂度 可控性
单 Agent + 工具 简单任务,步骤 < 5
多 Agent 串行 有明确依赖的流水线任务
多 Agent 并行 无依赖的子任务可并行
层级 Agent 复杂任务需分解-委派-汇总 中低

三、生产级 Agent 编排框架实现

3.1 工具定义与注册

from dataclasses import dataclass, field
from typing import Callable, Any
import json

@dataclass
class ToolDefinition:
    """工具定义:描述工具的输入输出规范,供 LLM 理解和调用"""

    name: str
    description: str                    # 工具功能描述,LLM 据此判断何时调用
    parameters: dict                    # JSON Schema 格式的参数定义
    required: list[str]                 # 必填参数列表
    executor: Callable                  # 实际执行函数
    timeout_seconds: int = 30           # 执行超时时间
    max_retries: int = 2                # 最大重试次数

class ToolRegistry:
    """工具注册表:集中管理所有可用工具"""

    def __init__(self):
        self._tools: dict[str, ToolDefinition] = {}

    def register(self, tool: ToolDefinition):
        """注册工具"""
        self._tools[tool.name] = tool

    def get(self, name: str) -> ToolDefinition | None:
        return self._tools.get(name)

    def get_all_schemas(self) -> list[dict]:
        """获取所有工具的 JSON Schema 描述,供 LLM 函数调用使用"""
        return [
            {
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.parameters,
                    "required": tool.required,
                },
            }
            for tool in self._tools.values()
        ]

    async def execute(self, name: str, arguments: dict) -> Any:
        """执行工具调用,带超时和重试"""
        tool = self._tools.get(name)
        if not tool:
            raise ValueError(f"未知工具: {name}")

        # 参数校验:检查必填参数是否存在
        for param in tool.required:
            if param not in arguments:
                raise ValueError(f"工具 {name} 缺少必填参数: {param}")

        last_error = None
        for attempt in range(tool.max_retries + 1):
            try:
                import asyncio
                result = await asyncio.wait_for(
                    tool.executor(**arguments),
                    timeout=tool.timeout_seconds,
                )
                return result
            except asyncio.TimeoutError:
                last_error = f"工具 {name} 执行超时 ({tool.timeout_seconds}s)"
            except Exception as e:
                last_error = f"工具 {name} 执行失败: {str(e)}"

        raise RuntimeError(f"工具 {name} 重试耗尽: {last_error}")

3.2 ReAct Agent 核心

import asyncio
from enum import Enum
from dataclasses import dataclass, field

class ActionKind(Enum):
    TOOL_CALL = "tool_call"
    FINAL_ANSWER = "final_answer"

@dataclass
class AgentAction:
    """Agent 的动作决策"""
    kind: ActionKind
    tool_name: str | None = None
    tool_args: dict | None = None
    answer: str | None = None
    reasoning: str = ""                # 推理过程,用于可解释性

@dataclass
class AgentState:
    """Agent 执行状态:维护对话历史和中间结果"""
    task: str
    messages: list[dict] = field(default_factory=list)
    tool_results: list[dict] = field(default_factory=list)
    step_count: int = 0
    max_steps: int = 15                # 防止无限循环

class ReActAgent:
    """ReAct 模式 Agent:推理-行动-观察循环"""

    def __init__(self, llm_client, tool_registry: ToolRegistry):
        self.llm = llm_client
        self.tools = tool_registry

    async def run(self, task: str) -> str:
        """执行任务,返回最终结果"""
        state = AgentState(task=task)
        # 初始化系统提示词
        state.messages.append({
            "role": "system",
            "content": self._build_system_prompt(),
        })
        state.messages.append({
            "role": "user",
            "content": task,
        })

        while state.step_count < state.max_steps:
            state.step_count += 1

            # 推理:让 LLM 决定下一步动作
            action = await self._decide_action(state)

            if action.kind == ActionKind.FINAL_ANSWER:
                return action.answer or ""

            if action.kind == ActionKind.TOOL_CALL:
                # 行动:执行工具调用
                result = await self._execute_tool(action)

                # 观察:将工具结果加入上下文
                state.tool_results.append({
                    "step": state.step_count,
                    "tool": action.tool_name,
                    "args": action.tool_args,
                    "result": result,
                })
                state.messages.append({
                    "role": "assistant",
                    "content": f"调用工具 {action.tool_name},参数: {json.dumps(action.tool_args, ensure_ascii=False)}",
                })
                state.messages.append({
                    "role": "user",
                    "content": f"工具返回结果: {json.dumps(result, ensure_ascii=False)}",
                })

        return "任务执行超时:达到最大步骤数限制"

    async def _decide_action(self, state: AgentState) -> AgentAction:
        """让 LLM 决定下一步动作"""
        response = await self.llm.chat(
            messages=state.messages,
            tools=self.tools.get_all_schemas(),
            tool_choice="auto",
        )

        # 检查是否有工具调用
        if response.tool_calls:
            call = response.tool_calls[0]
            return AgentAction(
                kind=ActionKind.TOOL_CALL,
                tool_name=call.function.name,
                tool_args=json.loads(call.function.arguments),
                reasoning=response.content or "",
            )

        # 无工具调用,视为最终回答
        return AgentAction(
            kind=ActionKind.FINAL_ANSWER,
            answer=response.content or "",
            reasoning="",
        )

    async def _execute_tool(self, action: AgentAction) -> Any:
        """执行工具调用,捕获异常"""
        try:
            return await self.tools.execute(action.tool_name, action.tool_args or {})
        except Exception as e:
            # 工具执行失败,返回错误信息而非抛异常
            return {"error": str(e)}

    def _build_system_prompt(self) -> str:
        return """你是一个任务执行 Agent。根据用户任务,逐步推理并调用工具完成。

规则:
1. 每次只调用一个工具
2. 根据工具返回结果决定下一步
3. 如果工具返回错误,分析原因并尝试其他方案
4. 任务完成后给出最终答案
5. 不要编造工具返回结果"""

3.3 多 Agent 协作:层级编排

@dataclass
class SubTask:
    """子任务定义"""
    name: str
    description: str
    agent_role: str                    # 执行该任务的 Agent 角色
    dependencies: list[str] = field(default_factory=list)  # 依赖的子任务
    input_from: dict[str, str] = field(default_factory=dict)  # 从上游任务获取输入

class MultiAgentOrchestrator:
    """多 Agent 编排器:支持串行、并行和层级任务编排"""

    def __init__(self, agents: dict[str, ReActAgent]):
        self.agents = agents

    async def execute_pipeline(
        self,
        tasks: list[SubTask],
        initial_context: dict,
    ) -> dict:
        """执行任务流水线,自动处理依赖关系"""

        completed: dict[str, Any] = {}
        results: dict[str, Any] = {}

        # 拓扑排序:按依赖关系确定执行顺序
        execution_order = self._topological_sort(tasks)

        for task_name in execution_order:
            task = next(t for t in tasks if t.name == task_name)
            agent = self.agents.get(task.agent_role)
            if not agent:
                raise ValueError(f"未注册的 Agent 角色: {task.agent_role}")

            # 构建子任务输入:从上游结果中提取
            task_input = self._build_task_input(task, initial_context, results)

            # 执行子任务
            result = await agent.run(task_input)
            results[task.name] = result
            completed[task.name] = True

        return results

    def _topological_sort(self, tasks: list[SubTask]) -> list[str]:
        """拓扑排序:确保被依赖的任务先执行"""
        in_degree: dict[str, int] = {t.name: 0 for t in tasks}
        graph: dict[str, list[str]] = {t.name: [] for t in tasks}

        for task in tasks:
            for dep in task.dependencies:
                graph[dep].append(task.name)
                in_degree[task.name] += 1

        queue = [name for name, deg in in_degree.items() if deg == 0]
        order = []

        while queue:
            current = queue.pop(0)
            order.append(current)
            for neighbor in graph[current]:
                in_degree[neighbor] -= 1
                if in_degree[neighbor] == 0:
                    queue.append(neighbor)

        if len(order) != len(tasks):
            raise ValueError("任务依赖关系存在环,无法排序")

        return order

    def _build_task_input(
        self,
        task: SubTask,
        initial_context: dict,
        completed_results: dict,
    ) -> str:
        """构建子任务输入,从上游结果中提取所需数据"""
        parts = [f"任务: {task.description}"]

        for param_name, source_path in task.input_from.items():
            # source_path 格式: "task_name.field_name"
            source_task, field = source_path.split(".", 1)
            if source_task in completed_results:
                parts.append(f"输入参数 {param_name}: {completed_results[source_task]}")

        if initial_context:
            parts.append(f"初始上下文: {json.dumps(initial_context, ensure_ascii=False)}")

        return "\n".join(parts)

四、Agent 编排的架构权衡与边界

ReAct 循环的可靠性问题

  • 无限循环:Agent 可能反复调用同一工具而不收敛。必须设置 max_steps 硬限制
  • 幻觉工具调用:LLM 可能生成不存在的工具名或参数。需要在执行前校验
  • 上下文窗口溢出:多轮工具调用后,对话历史超出模型上下文窗口。需要摘要压缩

多 Agent 协作的通信开销

  • 串行编排延迟叠加:3 个子任务各需 10 秒,总耗时 30 秒
  • 并行编排需要无依赖前提:有依赖关系的子任务无法并行
  • Agent 间信息传递有损:子 Agent 的输出被压缩为文本传给下游,结构化信息可能丢失

工具调用的安全边界

  • LLM 生成的参数可能包含注入攻击(如 SQL 注入、命令注入)
  • 工具执行必须有权限隔离:只允许访问必要的资源
  • 敏感操作(删除、支付)需要人工确认环节

禁用场景

  • 确定性任务:规则明确、逻辑固定的任务,用传统代码实现更可靠
  • 实时性要求高的任务:LLM 推理延迟 1-5 秒,不适合毫秒级响应
  • 安全敏感任务:金融交易、医疗诊断等场景,Agent 的不确定性不可接受

五、总结

AI Agent 编排的核心框架:ReAct 循环(推理-行动-观察)解决单 Agent 的任务执行问题,层级编排(分解-委派-汇总)解决多 Agent 的协作问题。工具注册表提供标准化的工具定义和执行接口,AgentState 维护多步骤执行的状态和上下文。ReAct 循环必须设置最大步骤数防止无限循环,工具调用需要参数校验和权限隔离。多 Agent 编排通过拓扑排序自动处理任务依赖,但串行编排的延迟叠加和 Agent 间信息传递的有损性是架构瓶颈。Agent 编排适用于步骤不确定、需要工具调用的复杂任务,确定性和实时性要求高的场景应使用传统代码实现。

Logo

小龙虾开发者社区是 CSDN 旗下专注 OpenClaw 生态的官方阵地,聚焦技能开发、插件实践与部署教程,为开发者提供可直接落地的方案、工具与交流平台,助力高效构建与落地 AI 应用

更多推荐