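Enterprise AI Agent Architecture: Engineering Task Decomposition, Tool Calling, and Multi-Turn Dialogue
1. Introduction
Since 2024, large language models (LLMs) such as GPT-4 and Claude have kept improving, and AI agents have moved from proof of concept to production deployment. Embedding LLM capabilities into enterprise business systems, however, takes far more than calling a Chat Completion API. A usable enterprise agent needs a robust engineering implementation along three core dimensions: task decomposition, tool calling, and multi-turn dialogue.
This article walks through the design of all three and uses Python code to assemble an enterprise agent framework that can serve as a direct reference.
2. Overall Architecture
An enterprise AI agent typically uses a layered architecture with four core layers: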
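┌────────────────────────────────────────────────────────────────────────┐
│ Access Layer (API Gateway)                                             │
│ REST / WebSocket / gRPC                                                │
├────────────────────────────────────────────────────────────────────────┤
│ Orchestration Layer (Orchestrator)                                     │
│ Task decomposition → Tool routing → Dialogue management → Aggregation  │
├────────────────────────────────────────────────────────────────────────┤
│ Capability Layer (Capabilities)                                        │
│ LLM inference │ Tool registry │ Memory management │ Security audit     │
├────────────────────────────────────────────────────────────────────────┤
│ Infrastructure Layer (Infra)                                           │
│ Vector database │ Message queue │ Config center │ Monitoring/alerts    │
└────────────────────────────────────────────────────────────────────────┘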
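The orchestration layer is the agent's core brain, and the rest of this article focuses on it.
3. Task Decomposition: From Natural Language to a DAG
3.1 Design Approach
In enterprise scenarios, a single user instruction often implies several steps. For example, "Analyze last week's sales data, generate a report, and send it to Manager Zhang" involves at least four subtasks: data query, analysis, report generation, and email delivery.
The core of task decomposition is turning a vague instruction into a directed acyclic graph (DAG) in which nodes are atomic operations and edges are dependencies.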
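To make the DAG idea concrete, here is a minimal sketch that groups the sales-report example into "waves" of tasks that could run in parallel. The task names are illustrative, and `lookup_recipient` is a hypothetical extra step added only to show two independent tasks sharing a wave; none of these names come from a real system.

```python
from typing import Dict, List


def execution_waves(deps: Dict[str, List[str]]) -> List[List[str]]:
    """Group tasks into waves; each wave depends only on earlier waves."""
    remaining = {task: set(d) for task, d in deps.items()}
    waves: List[List[str]] = []
    while remaining:
        # A task is ready once all of its dependencies have been scheduled.
        ready = sorted(t for t, d in remaining.items() if not d)
        if not ready:
            # Nothing is ready but tasks remain: a cycle or an unknown dependency.
            raise ValueError(f"unresolvable dependencies among: {sorted(remaining)}")
        waves.append(ready)
        for t in ready:
            del remaining[t]
        for d in remaining.values():
            d.difference_update(ready)
    return waves


# Mapping: task -> tasks it depends on (illustrative names).
sales_dag = {
    "query_data": [],
    "analyze": ["query_data"],
    "generate_report": ["analyze"],
    "lookup_recipient": [],  # hypothetical step, independent of the data pipeline
    "send_email": ["generate_report", "lookup_recipient"],
}
waves = execution_waves(sales_dag)
print(waves)
```

Tasks inside one wave have no dependencies on each other, which is exactly the property a scheduler can exploit to run them concurrently.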
3.2 Core Data Structures
from __future__ import annotations

import asyncio
import json
from dataclasses import dataclass, field
from enum import Enum
from typing import Any, Callable, Dict, List, Optional


class TaskStatus(Enum):
    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    SKIPPED = "skipped"


@dataclass
class SubTask:
    """A single subtask in the plan."""
    task_id: str
    name: str
    description: str
    tool_name: str  # name of the tool this subtask calls
    arguments: Dict[str, Any] = field(default_factory=dict)
    depends_on: List[str] = field(default_factory=list)  # IDs of prerequisite tasks
    status: TaskStatus = TaskStatus.PENDING
    result: Any = None
    error: Optional[str] = None
    retry_count: int = 0
    max_retries: int = 2


@dataclass
class TaskPlan:
    """A task plan generated by the LLM."""
    original_query: str
    sub_tasks: List[SubTask]
    reasoning: str = ""
3.3 LLM-Based Task Planner
We have the LLM emit a structured JSON plan rather than free text:
import re

TASK_PLANNER_PROMPT = """You are a task-planning expert. Break the user's request down into executable subtasks.
Requirements:
- Each subtask maps to one concrete tool call
- State the dependencies between subtasks explicitly
- Output strict JSON
Available tools:
{tools_description}
User request: {user_query}
Output JSON in the following format (and nothing else):
{{
  "reasoning": "how the request was decomposed...",
  "sub_tasks": [
    {{
      "task_id": "task_1",
      "name": "subtask name",
      "description": "detailed description",
      "tool_name": "name of the tool",
      "arguments": {{}},
      "depends_on": []
    }}
  ]
}}
"""


class TaskPlanner:
    """LLM-based task planner."""

    def __init__(self, llm_client, tool_registry: "ToolRegistry"):
        self.llm = llm_client
        self.tool_registry = tool_registry

    def _build_tools_description(self) -> str:
        """Build the tool description text that tells the LLM what is available."""
        descriptions = []
        for tool in self.tool_registry.list_tools():
            descriptions.append(
                f"- {tool.name}: {tool.description}\n"
                f"  Parameters: {json.dumps(tool.parameters_schema, ensure_ascii=False)}"
            )
        return "\n".join(descriptions)

    async def plan(self, user_query: str) -> TaskPlan:
        """Call the LLM to generate a task plan."""
        prompt = TASK_PLANNER_PROMPT.format(
            tools_description=self._build_tools_description(),
            user_query=user_query,
        )
        response = await self.llm.chat(prompt)
        plan_dict = self._parse_response(response)
        sub_tasks = [
            SubTask(
                task_id=t["task_id"],
                name=t["name"],
                description=t["description"],
                tool_name=t["tool_name"],
                arguments=t.get("arguments", {}),
                depends_on=t.get("depends_on", []),
            )
            for t in plan_dict["sub_tasks"]
        ]
        return TaskPlan(
            original_query=user_query,
            sub_tasks=sub_tasks,
            reasoning=plan_dict.get("reasoning", ""),
        )

    def _parse_response(self, response: str) -> dict:
        """Extract JSON from the LLM response, tolerating a fenced block."""
        # First try to parse the response as-is
        try:
            return json.loads(response)
        except json.JSONDecodeError:
            pass
        # Then try the contents of a ```json ... ``` block
        match = re.search(r"```(?:json)?\s*([\s\S]*?)```", response)
        if match:
            try:
                return json.loads(match.group(1))
            except json.JSONDecodeError:
                pass  # fall through to the error below
        raise ValueError(f"Could not parse the task plan returned by the LLM: {response[:200]}")
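An LLM-generated plan can be syntactically valid JSON and still be unusable, for example when it references an unregistered tool or contains a dependency cycle. The following is a minimal sketch of a pre-execution check, assuming the plan dictionary has the shape requested by the prompt above; `validate_plan` and `known_tools` are illustrative names, not part of the article's framework. It relies on the standard-library `graphlib` module (Python 3.9+).

```python
from graphlib import CycleError, TopologicalSorter
from typing import Any, Dict, List, Set


def validate_plan(plan: Dict[str, Any], known_tools: Set[str]) -> List[str]:
    """Return task IDs in a valid execution order, or raise ValueError."""
    tasks = plan.get("sub_tasks")
    if not isinstance(tasks, list) or not tasks:
        raise ValueError("plan has no sub_tasks")
    ids = [t["task_id"] for t in tasks]
    if len(ids) != len(set(ids)):
        raise ValueError("duplicate task_id in plan")
    graph: Dict[str, Set[str]] = {}
    for t in tasks:
        if t["tool_name"] not in known_tools:
            raise ValueError(f"unknown tool: {t['tool_name']}")
        deps = set(t.get("depends_on", []))
        missing = deps - set(ids)
        if missing:
            raise ValueError(f"{t['task_id']} depends on unknown tasks: {sorted(missing)}")
        graph[t["task_id"]] = deps  # node -> its predecessors
    try:
        return list(TopologicalSorter(graph).static_order())
    except CycleError as exc:
        # args[1] holds the list of nodes that form the cycle
        raise ValueError(f"cyclic dependencies: {exc.args[1]}") from exc


example_plan = {
    "sub_tasks": [
        {"task_id": "task_1", "tool_name": "query_sales_data", "depends_on": []},
        {"task_id": "task_2", "tool_name": "send_email", "depends_on": ["task_1"]},
    ]
}
order = validate_plan(example_plan, {"query_sales_data", "send_email"})
print(order)
```

Rejecting a bad plan before execution is usually cheaper than discovering the problem halfway through a run; on failure the planner could re-prompt the LLM with the error message.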
3.4 DAG Execution Engine
With a task plan in hand, we need an engine that schedules execution according to the dependencies:
class DAGExecutor:
    """DAG execution engine: runs dependency-free subtasks in parallel, in topological order."""

    def __init__(self, tool_registry: "ToolRegistry"):
        self.tool_registry = tool_registry

    async def execute(self, plan: TaskPlan) -> Dict[str, Any]:
        """Run the whole plan and return {task_id: result} for completed tasks."""
        task_map: Dict[str, SubTask] = {t.task_id: t for t in plan.sub_tasks}
        completed: Dict[str, Any] = {}  # task_id → result
        in_flight: Dict[str, asyncio.Task] = {}
        dead = (TaskStatus.FAILED, TaskStatus.SKIPPED)

        while True:
            # Mark tasks whose dependencies can never complete as SKIPPED (repeat until stable)
            changed = True
            while changed:
                changed = False
                for task in plan.sub_tasks:
                    if task.status == TaskStatus.PENDING and any(
                        task_map.get(dep) is None or task_map[dep].status in dead
                        for dep in task.depends_on
                    ):
                        task.status = TaskStatus.SKIPPED
                        task.error = "A dependency failed, was skipped, or does not exist"
                        changed = True

            # Find every pending task whose dependencies are all satisfied
            ready_tasks = [
                t for t in plan.sub_tasks
                if t.status == TaskStatus.PENDING
                and all(dep in completed for dep in t.depends_on)
            ]

            # Start all ready tasks in parallel
            for task in ready_tasks:
                task.status = TaskStatus.RUNNING
                coro = self._execute_single_task(task, completed)
                in_flight[task.task_id] = asyncio.create_task(coro)

            if not in_flight:
                # Nothing ready and nothing running: finished, or blocked by a cycle
                break

            # Wait for at least one task to finish
            done, _ = await asyncio.wait(
                in_flight.values(), return_when=asyncio.FIRST_COMPLETED
            )
            for finished in done:
                tid, result = finished.result()
                del in_flight[tid]
                # Only successful tasks unblock their dependents
                if task_map[tid].status == TaskStatus.COMPLETED:
                    completed[tid] = result
        return completed

    async def _execute_single_task(
        self, task: SubTask, context: Dict[str, Any]
    ) -> tuple:
        """Run one subtask with retries and context injection."""
        tool = self.tool_registry.get(task.tool_name)
        if not tool:
            task.status = TaskStatus.FAILED
            task.error = f"Tool {task.tool_name} is not registered"
            return task.task_id, None
        # Inject upstream results into the arguments (template variables such as {{task_1.result}})
        resolved_args = self._resolve_arguments(task.arguments, context)
        for attempt in range(task.max_retries + 1):
            try:
                # ToolDefinition exposes `handler`; support both async and sync handlers
                if asyncio.iscoroutinefunction(tool.handler):
                    result = await tool.handler(**resolved_args)
                else:
                    result = await asyncio.to_thread(tool.handler, **resolved_args)
                task.status = TaskStatus.COMPLETED
                task.result = result
                return task.task_id, result
            except Exception as e:
                task.retry_count = attempt + 1
                if attempt >= task.max_retries:
                    task.status = TaskStatus.FAILED
                    task.error = str(e)
                    return task.task_id, None
                await asyncio.sleep(2 ** attempt)  # exponential backoff
        return task.task_id, None

    @staticmethod
    def _resolve_arguments(
        arguments: Dict[str, Any], context: Dict[str, Any]
    ) -> Dict[str, Any]:
        """Resolve template variables in the arguments, injecting upstream results."""
        resolved = {}
        for key, value in arguments.items():
            if isinstance(value, str) and value.startswith("{{") and value.endswith("}}"):
                ref = value[2:-2].strip()
                # Supports the {{task_1.result.field}} syntax
                task_id, *path = ref.split(".")
                current = context.get(task_id)  # context maps task_id → result
                if path and path[0] == "result":
                    path = path[1:]  # "result" refers to the task's output itself
                for part in path:
                    if isinstance(current, dict):
                        current = current.get(part)
                    else:
                        current = getattr(current, part, None)
                resolved[key] = current
            else:
                resolved[key] = value
        return resolved
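The template syntax used for passing results between tasks can be illustrated in isolation. Below is a minimal standalone sketch of a resolver for references such as `{{task_1.result.total_revenue}}`. Unlike a lenient lookup that injects `None`, this variant raises on a missing task or field, which is a design choice made for the example rather than something the article prescribes; `resolve_value` is a hypothetical helper name, and the pattern assumes task IDs consist of letters, digits, and underscores.

```python
import re
from typing import Any, Dict

# Matches a value that is exactly one reference, e.g. "{{ task_1.result.field }}"
_REF = re.compile(r"^\{\{\s*([\w.]+)\s*\}\}$")


def resolve_value(value: Any, results: Dict[str, Any]) -> Any:
    """Replace a {{task_id.result.path}} reference with the referenced value."""
    if not isinstance(value, str):
        return value
    match = _REF.match(value)
    if not match:
        return value  # ordinary string, passed through unchanged
    task_id, *path = match.group(1).split(".")
    if task_id not in results:
        raise KeyError(f"no result recorded for task {task_id!r}")
    current = results[task_id]
    if path and path[0] == "result":
        path = path[1:]  # "result" names the task output itself
    for part in path:
        # Dict results use key lookup; other objects use attribute access.
        current = current[part] if isinstance(current, dict) else getattr(current, part)
    return current


results = {"task_1": {"total_revenue": 1285000, "orders": 342}}
revenue = resolve_value("{{task_1.result.total_revenue}}", results)
whole = resolve_value("{{ task_1.result }}", results)
literal = resolve_value("weekly report", results)
print(revenue, literal)
```

Failing loudly here means a typo in the LLM-generated plan surfaces as an error on the dependent task instead of a tool silently receiving `None`.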
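4. Tool Calling: Engineering Function Calling
4.1 Tool Registry
Enterprise environments have many tools of many kinds (internal APIs, database queries, third-party services, and so on), so a unified registration and discovery mechanism is needed: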
@dataclass
class ToolDefinition:
    """Definition of a tool."""
    name: str
    description: str
    parameters_schema: Dict[str, Any]  # JSON Schema
    handler: Callable  # the function that actually runs
    require_confirmation: bool = False  # whether the user must confirm (sensitive operations)
    timeout_seconds: int = 30
    category: str = "general"  # category: data / communication / system


class ToolRegistry:
    """Tool registry."""

    def __init__(self):
        self._tools: Dict[str, ToolDefinition] = {}

    def register(self, tool: ToolDefinition) -> None:
        """Register a tool."""
        if tool.name in self._tools:
            raise ValueError(f"Tool {tool.name} already exists")
        self._tools[tool.name] = tool

    def register_from_function(
        self,
        func: Callable,
        name: Optional[str] = None,
        description: Optional[str] = None,
        parameters_schema: Optional[Dict[str, Any]] = None,
        **kwargs,
    ) -> None:
        """Register a plain function as a tool, defaulting to its name and docstring."""
        tool = ToolDefinition(
            name=name or func.__name__,
            description=description or func.__doc__ or "",
            parameters_schema=parameters_schema or {},
            handler=func,
            **kwargs,
        )
        self.register(tool)

    def get(self, name: str) -> Optional[ToolDefinition]:
        return self._tools.get(name)

    def list_tools(self) -> List[ToolDefinition]:
        return list(self._tools.values())

    def to_openai_tools(self) -> List[Dict[str, Any]]:
        """Convert to the OpenAI function-calling format."""
        tools = []
        for t in self._tools.values():
            tools.append({
                "type": "function",
                "function": {
                    "name": t.name,
                    "description": t.description,
                    "parameters": t.parameters_schema,
                },
            })
        return tools
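Writing JSON Schema by hand for every tool gets tedious. One possible convenience, sketched below under simplifying assumptions, is a decorator that derives a basic schema from a function's type hints. The `tool` decorator, `schema_from_signature`, and the module-level `TOOLS` dictionary are hypothetical names for this example only; the type mapping covers a handful of primitive types and falls back to `"string"` for anything else.

```python
import inspect
from typing import Any, Callable, Dict, get_type_hints

# Minimal Python-type → JSON Schema type mapping (an assumption of this sketch)
_JSON_TYPES = {str: "string", int: "integer", float: "number",
               bool: "boolean", list: "array", dict: "object"}


def schema_from_signature(func: Callable) -> Dict[str, Any]:
    """Derive a basic JSON Schema object from a function signature."""
    hints = get_type_hints(func)
    properties: Dict[str, Any] = {}
    required = []
    for name, param in inspect.signature(func).parameters.items():
        properties[name] = {"type": _JSON_TYPES.get(hints.get(name), "string")}
        if param.default is inspect.Parameter.empty:
            required.append(name)  # parameters without defaults are required
    return {"type": "object", "properties": properties, "required": required}


TOOLS: Dict[str, Dict[str, Any]] = {}  # stand-in for a real registry


def tool(description: str) -> Callable[[Callable], Callable]:
    """Decorator that records the function and its derived schema in TOOLS."""
    def decorator(func: Callable) -> Callable:
        TOOLS[func.__name__] = {
            "description": description,
            "parameters": schema_from_signature(func),
            "handler": func,
        }
        return func
    return decorator


@tool("Query sales data for a date range")
def query_sales(start_date: str, end_date: str, limit: int = 100) -> dict:
    return {"start": start_date, "end": end_date, "limit": limit}


print(TOOLS["query_sales"]["parameters"])
```

A derived schema lacks per-parameter descriptions, so for tools where the LLM needs guidance on argument formats a hand-written schema may still be the better option.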
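4.2 Tool-Call Executor
When calling the LLM in practice, the full function-calling loop has to be handled: the LLM returns tool_calls → the tools run → the results go back to the LLM → reasoning continues: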
import json
from typing import AsyncGenerator


class ToolExecutor:
    """Tool-call executor: drives the ReAct loop."""

    MAX_TOOL_CALL_ROUNDS = 10  # guard against infinite loops

    def __init__(self, llm_client, tool_registry: ToolRegistry):
        self.llm = llm_client
        self.registry = tool_registry

    async def run_with_tools(
        self,
        messages: List[Dict[str, Any]],
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """
        Run a tool-calling conversation and stream its progress.
        Every step is yielded as an event so the caller can display progress.
        """
        round_count = 0
        while round_count < self.MAX_TOOL_CALL_ROUNDS:
            round_count += 1
            # Call the LLM
            response = await self.llm.chat(
                messages=messages,
                tools=self.registry.to_openai_tools(),
                tool_choice="auto",
            )
            choice = response["choices"][0]
            assistant_msg = choice["message"]
            # The LLM decided to call tools
            if assistant_msg.get("tool_calls"):
                tool_calls = assistant_msg["tool_calls"]
                yield {
                    "type": "tool_calls_start",
                    "tool_calls": [
                        {"name": tc["function"]["name"], "arguments": tc["function"]["arguments"]}
                        for tc in tool_calls
                    ],
                }
                # Add the assistant message to the history
                messages.append(assistant_msg)
                # Run all tool calls in parallel
                tool_results = await asyncio.gather(
                    *[self._execute_tool_call(tc) for tc in tool_calls],
                    return_exceptions=True,
                )
                # Append each tool result to the conversation as a tool message
                for tc, result in zip(tool_calls, tool_results):
                    if isinstance(result, Exception):
                        content = json.dumps({"error": str(result)}, ensure_ascii=False)
                    else:
                        content = json.dumps(result, ensure_ascii=False, default=str)
                    messages.append({
                        "role": "tool",
                        "tool_call_id": tc["id"],
                        "content": content,
                    })
                    yield {
                        "type": "tool_result",
                        "tool_name": tc["function"]["name"],
                        "result": content,
                    }
            else:
                # The LLM produced its final text answer
                content = assistant_msg.get("content") or ""
                messages.append(assistant_msg)
                yield {"type": "final_answer", "content": content}
                return
        yield {
            "type": "error",
            "content": f"Tool calls exceeded the maximum number of rounds ({self.MAX_TOOL_CALL_ROUNDS})",
        }

    async def _execute_tool_call(self, tool_call: dict) -> Any:
        """Run a single tool call with a timeout."""
        func_name = tool_call["function"]["name"]
        tool = self.registry.get(func_name)
        if not tool:
            return {"error": f"Unknown tool: {func_name}"}
        try:
            arguments = json.loads(tool_call["function"]["arguments"])
        except json.JSONDecodeError:
            return {"error": f"Failed to parse arguments: {tool_call['function']['arguments']}"}
        try:
            # Async handlers are awaited directly; sync handlers run in a worker thread
            result = await asyncio.wait_for(
                tool.handler(**arguments) if asyncio.iscoroutinefunction(tool.handler)
                else asyncio.to_thread(tool.handler, **arguments),
                timeout=tool.timeout_seconds,
            )
            return result
        except asyncio.TimeoutError:
            return {"error": f"Tool {func_name} timed out ({tool.timeout_seconds}s)"}
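5. Multi-Turn Dialogue: Context Management and State Transitions
5.1 Conversation Memory Management
The core challenge of multi-turn dialogue is balancing a limited context window against long-term memory. The solution is layered memory management: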
from collections import deque
from datetime import datetime


@dataclass
class ConversationTurn:
    """A single conversation turn."""
    role: str  # "user" | "assistant" | "tool"
    content: str
    timestamp: float = field(default_factory=lambda: datetime.now().timestamp())
    metadata: Dict[str, Any] = field(default_factory=dict)


class ConversationMemory:
    """Layered conversation memory."""

    def __init__(
        self,
        max_recent_turns: int = 20,    # number of recent turns kept verbatim
        max_summary_turns: int = 100,  # number of historical turns covered by the summary
        token_budget: int = 8000,      # token budget
    ):
        self.recent_turns: deque[ConversationTurn] = deque(maxlen=max_recent_turns)
        self.summary: str = ""  # compressed summary of earlier turns
        self.max_recent_turns = max_recent_turns
        self.max_summary_turns = max_summary_turns
        self.token_budget = token_budget
        self._total_turns = 0

    def add_turn(self, turn: ConversationTurn) -> None:
        """Add a turn; turns pushed out of the window are folded into the summary."""
        self._total_turns += 1
        if len(self.recent_turns) >= self.max_recent_turns:
            # Move the oldest turn into the summary
            evicted = self.recent_turns.popleft()
            self.summary = self._update_summary(self.summary, evicted)
        self.recent_turns.append(turn)

    def to_messages(self) -> List[Dict[str, str]]:
        """Build the message list sent to the LLM."""
        messages = []
        # 1. System message, including the summary
        system_content = "You are a helpful assistant."
        if self.summary:
            system_content += f"\n\n[Conversation summary]\n{self.summary}"
        messages.append({"role": "system", "content": system_content})
        # 2. Recent turns, verbatim
        for turn in self.recent_turns:
            messages.append({"role": turn.role, "content": turn.content})
        return messages

    def _update_summary(self, current_summary: str, turn: ConversationTurn) -> str:
        """Incrementally update the summary (simplified; a real project should summarize with an LLM)."""
        snippet = f"[{turn.role}]: {turn.content[:100]}..."
        if current_summary:
            return current_summary + "\n" + snippet
        return snippet

    def estimate_tokens(self) -> int:
        """Estimate the token count of the current context."""
        total = len(self.summary) // 2  # rough estimate: about 2 characters per token for Chinese
        for turn in self.recent_turns:
            total += len(turn.content) // 2
        return total
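5.2 Conversation State Machine
An enterprise agent often needs to maintain session-level state, such as waiting for user confirmation or running a long workflow. A finite state machine manages this: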
class SessionState(Enum):
    IDLE = "idle"                                    # idle, waiting for user input
    PLANNING = "planning"                            # decomposing the task
    EXECUTING = "executing"                          # executing
    AWAITING_CONFIRMATION = "awaiting_confirmation"  # waiting for user confirmation
    COMPLETED = "completed"                          # current task finished


@dataclass
class Session:
    """Session context."""
    session_id: str
    state: SessionState = SessionState.IDLE
    memory: ConversationMemory = field(default_factory=ConversationMemory)
    current_plan: Optional[TaskPlan] = None
    pending_confirmation: Optional[str] = None  # description of the operation awaiting confirmation
    created_at: float = field(default_factory=lambda: datetime.now().timestamp())
    metadata: Dict[str, Any] = field(default_factory=dict)
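A state enum alone does not prevent illegal jumps, such as moving straight from idle to completed. One way to enforce the state machine is an explicit transition table, sketched below. The table itself is an assumption chosen for illustration (the article does not specify which transitions are legal), and the enum is redefined here only so that the example is self-contained.

```python
from enum import Enum
from typing import Dict, Set


class SessionState(Enum):
    IDLE = "idle"
    PLANNING = "planning"
    EXECUTING = "executing"
    AWAITING_CONFIRMATION = "awaiting_confirmation"
    COMPLETED = "completed"


# Assumed transition table: current state → states it may move to
ALLOWED: Dict[SessionState, Set[SessionState]] = {
    SessionState.IDLE: {SessionState.PLANNING, SessionState.EXECUTING},
    SessionState.PLANNING: {SessionState.EXECUTING, SessionState.IDLE},
    SessionState.EXECUTING: {
        SessionState.AWAITING_CONFIRMATION,
        SessionState.COMPLETED,
        SessionState.IDLE,
    },
    SessionState.AWAITING_CONFIRMATION: {SessionState.EXECUTING, SessionState.IDLE},
    SessionState.COMPLETED: {SessionState.IDLE},
}


def transition(current: SessionState, target: SessionState) -> SessionState:
    """Return the new state, or raise if the move is not in the table."""
    if target is current:
        return current  # staying in the same state is treated as a no-op
    if target not in ALLOWED[current]:
        raise ValueError(f"illegal transition: {current.value} -> {target.value}")
    return target


state = SessionState.IDLE
state = transition(state, SessionState.PLANNING)
state = transition(state, SessionState.EXECUTING)
state = transition(state, SessionState.AWAITING_CONFIRMATION)
print(state)
```

Routing every state change through one function also gives a single place to emit audit events or metrics when a session changes state.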
5.3 Context Compression Strategy
When the token budget is tight, historical messages need to be compressed proactively:
class ContextCompressor:
    """Context compressor: compresses early turns automatically when the token budget runs short."""

    COMPRESSION_PROMPT = (
        "Compress the following conversation history into a concise summary. "
        "Keep the key information (decisions, data, conclusions) and drop redundant detail."
    )

    def __init__(self, llm_client, target_ratio: float = 0.5):
        self.llm = llm_client
        self.target_ratio = target_ratio  # target size as a fraction of the original length

    async def compress(self, memory: ConversationMemory) -> None:
        """Compress the early turns held in memory."""
        if memory.estimate_tokens() < memory.token_budget:
            return  # no compression needed
        # Compress the oldest 50% of the history
        turns_list = list(memory.recent_turns)
        split_point = len(turns_list) // 2
        old_turns = turns_list[:split_point]
        new_turns = turns_list[split_point:]
        if not old_turns:
            return  # nothing old enough to compress
        # Build the text to compress
        to_compress = "\n".join(
            f"[{t.role}]: {t.content}" for t in old_turns
        )
        # Ask the LLM to compress it
        response = await self.llm.chat(
            f"{self.COMPRESSION_PROMPT}\n\nConversation history:\n{to_compress}"
        )
        compressed_summary = response.strip()
        # Update memory: summary + recent turns
        memory.summary = (
            f"{memory.summary}\n\n[Compressed history]\n{compressed_summary}"
            if memory.summary
            else compressed_summary
        )
        memory.recent_turns = deque(new_turns, maxlen=memory.max_recent_turns)
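LLM-based compression costs an extra model call and can itself fail, so a cheap fallback is worth having. The sketch below keeps the system message and as many of the newest turns as fit in a token budget, dropping older ones. It reuses the same rough two-characters-per-token heuristic; `trim_to_budget` is a hypothetical helper, and a production system would more likely count tokens with the model's actual tokenizer.

```python
from typing import Dict, List


def estimate_tokens(text: str) -> int:
    """Rough heuristic: about two characters per token."""
    return max(1, len(text) // 2)


def trim_to_budget(messages: List[Dict[str, str]], budget: int) -> List[Dict[str, str]]:
    """Keep system messages plus the newest turns that fit within `budget` tokens."""
    system = [m for m in messages if m["role"] == "system"]
    others = [m for m in messages if m["role"] != "system"]
    used = sum(estimate_tokens(m["content"]) for m in system)
    kept: List[Dict[str, str]] = []
    # Walk backwards from the newest turn and stop at the first one that does not fit.
    for msg in reversed(others):
        cost = estimate_tokens(msg["content"])
        if used + cost > budget:
            break
        kept.append(msg)
        used += cost
    return system + kept[::-1]  # restore chronological order


history = [
    {"role": "system", "content": "s" * 20},     # about 10 tokens
    {"role": "user", "content": "a" * 40},       # about 20 tokens
    {"role": "assistant", "content": "b" * 40},  # about 20 tokens
    {"role": "user", "content": "c" * 40},       # about 20 tokens
]
trimmed = trim_to_budget(history, budget=55)
print([m["role"] for m in trimmed])
```

Dropping turns loses information that a summary would retain, so this is best treated as a safety net rather than a replacement for summarization.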
6. Full Integration: EnterpriseAgent
The three modules above are assembled into a complete enterprise agent:
class EnterpriseAgent:
    """Enterprise AI agent: integrates task decomposition, tool calling, and multi-turn dialogue."""

    def __init__(
        self,
        llm_client,
        tool_registry: ToolRegistry,
        config: Optional[Dict[str, Any]] = None,
    ):
        self.llm = llm_client
        self.tool_registry = tool_registry
        self.config = config or {}
        # Initialize the core components
        self.planner = TaskPlanner(llm_client, tool_registry)
        self.executor = DAGExecutor(tool_registry)
        self.tool_executor = ToolExecutor(llm_client, tool_registry)
        # Apply the optional round limit from config (falls back to the class default)
        self.tool_executor.MAX_TOOL_CALL_ROUNDS = self.config.get(
            "max_tool_rounds", ToolExecutor.MAX_TOOL_CALL_ROUNDS
        )
        self.compressor = ContextCompressor(llm_client)
        # Session management
        self._sessions: Dict[str, Session] = {}

    async def chat(
        self,
        session_id: str,
        user_message: str,
    ) -> AsyncGenerator[Dict[str, Any], None]:
        """Main entry point: handle a user message and stream back the results."""
        # Get or create the session
        session = self._get_or_create_session(session_id)
        # Add the user message to memory
        session.memory.add_turn(ConversationTurn(
            role="user", content=user_message
        ))
        # Check the token budget and compress if needed
        await self.compressor.compress(session.memory)
        yield {"type": "status", "content": "Analyzing your request..."}
        # Step 1: decide whether task decomposition is needed
        if await self._requires_planning(user_message):
            session.state = SessionState.PLANNING
            yield {"type": "status", "content": "Decomposing the task..."}
            plan = await self.planner.plan(user_message)
            session.current_plan = plan
            yield {
                "type": "plan",
                "reasoning": plan.reasoning,
                "sub_tasks": [
                    {"id": t.task_id, "name": t.name, "description": t.description}
                    for t in plan.sub_tasks
                ],
            }
            # Step 2: execute the task DAG
            session.state = SessionState.EXECUTING
            yield {"type": "status", "content": "Executing the task..."}
            results = await self.executor.execute(plan)
            yield {"type": "task_results", "results": results}
            # Inject the results into the conversation so the LLM can summarize them
            session.memory.add_turn(ConversationTurn(
                role="assistant",
                content=f"[system] Task execution finished. Results: {json.dumps(results, ensure_ascii=False, default=str)}",
                metadata={"type": "task_execution"},
            ))
        # Step 3: standard tool-calling conversation
        session.state = SessionState.EXECUTING
        messages = session.memory.to_messages()
        async for event in self.tool_executor.run_with_tools(messages):
            yield event
            # Store the assistant reply in memory
            if event["type"] == "final_answer":
                session.memory.add_turn(ConversationTurn(
                    role="assistant", content=event["content"]
                ))
        session.state = SessionState.IDLE

    async def _requires_planning(self, user_message: str) -> bool:
        """Decide whether the request is complex enough to need decomposition."""
        plan_check_prompt = (
            "Decide whether the following user request needs to be broken down "
            "into multiple subtasks (answer true or false):\n"
            f"User request: {user_message}\n"
            "Criteria:\n"
            "- Involves several independent steps or operations → true\n"
            "- Requires calling several different tools → true\n"
            "- A simple question or a single-step operation → false\n"
            'Answer only "true" or "false".'
        )
        response = await self.llm.chat(plan_check_prompt)
        return "true" in response.strip().lower()

    def _get_or_create_session(self, session_id: str) -> Session:
        """Get or create a session."""
        if session_id not in self._sessions:
            self._sessions[session_id] = Session(session_id=session_id)
        return self._sessions[session_id]

    def close_session(self, session_id: str) -> None:
        """Close a session and release its resources."""
        self._sessions.pop(session_id, None)
7. Usage Example
async def main():
    # Initialize the LLM client (an OpenAI-compatible interface is used as the example).
    # NOTE: the components above call `llm_client.chat(...)`, while AsyncOpenAI exposes
    # `chat.completions.create(...)`, so in practice the client needs a thin adapter.
    from openai import AsyncOpenAI
    llm_client = AsyncOpenAI(api_key="your-api-key", base_url="https://api.openai.com/v1")
    # Initialize the tool registry and register the business tools
    registry = ToolRegistry()
    # Register the data-query tool
    registry.register(ToolDefinition(
        name="query_sales_data",
        description="Query sales data, with filtering by time range and region",
        parameters_schema={
            "type": "object",
            "properties": {
                "start_date": {"type": "string", "description": "Start date, format YYYY-MM-DD"},
                "end_date": {"type": "string", "description": "End date, format YYYY-MM-DD"},
                "region": {"type": "string", "description": "Region, optional"},
            },
            "required": ["start_date", "end_date"],
        },
        handler=query_sales_data_impl,
        category="data",
    ))
    # Register the email tool
    registry.register(ToolDefinition(
        name="send_email",
        description="Send an email to the given recipient",
        parameters_schema={
            "type": "object",
            "properties": {
                "to": {"type": "string", "description": "Recipient email address"},
                "subject": {"type": "string", "description": "Email subject"},
                "body": {"type": "string", "description": "Email body"},
            },
            "required": ["to", "subject", "body"],
        },
        handler=send_email_impl,
        require_confirmation=True,  # sensitive operation, needs user confirmation
        category="communication",
    ))
    # Create the agent
    agent = EnterpriseAgent(
        llm_client=llm_client,
        tool_registry=registry,
        config={"max_tool_rounds": 8},
    )
    # User conversation
    session_id = "user-001-session-20250101"
    user_query = "Analyze last week's sales data for Beijing, generate a summary report, and send it to zhang@company.com"
    async for event in agent.chat(session_id, user_query):
        print(f"[{event['type']}] {event.get('content', '')}")
    # Clean up
    agent.close_session(session_id)


async def query_sales_data_impl(start_date: str, end_date: str, region: Optional[str] = None):
    """The actual data-query implementation."""
    # Connect to the real data source here (database / API)
    return {
        "total_revenue": 1285000,
        "orders": 342,
        "top_product": "Smart Sensor X3",
        "period": f"{start_date} ~ {end_date}",
    }


async def send_email_impl(to: str, subject: str, body: str):
    """The actual email-sending implementation."""
    # Call SMTP or an email service API
    return {"status": "sent", "to": to, "subject": subject}


if __name__ == "__main__":
    asyncio.run(main())
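The components in this article call the LLM in two ways: `chat(prompt)` returning plain text, and `chat(messages=..., tools=..., tool_choice=...)` returning a dictionary with a `choices` list. An OpenAI-compatible async client does not expose that interface directly, so a thin adapter is one way to bridge the gap. The sketch below is an assumption-laden illustration: `ChatAdapter` and the model name are hypothetical, and the client is assumed to provide `chat.completions.create(...)` returning an object with `choices[0].message.content` and `model_dump()`, as the current `openai` Python SDK does. A fake client stands in for the real one so the example runs offline.

```python
import asyncio
from types import SimpleNamespace
from typing import Any, Dict, List, Optional, Union


class ChatAdapter:
    """Adapts an OpenAI-style async client to the `chat(...)` interface used above."""

    def __init__(self, client: Any, model: str):
        self.client = client
        self.model = model  # placeholder; use whatever model your deployment offers

    async def chat(
        self,
        prompt: Optional[str] = None,
        *,
        messages: Optional[List[Dict[str, Any]]] = None,
        tools: Optional[List[Dict[str, Any]]] = None,
        tool_choice: Optional[str] = None,
    ) -> Union[str, Dict[str, Any]]:
        if messages is None:
            messages = [{"role": "user", "content": prompt}]
        kwargs: Dict[str, Any] = {"model": self.model, "messages": messages}
        if tools:  # only send tool fields when tools are actually registered
            kwargs["tools"] = tools
            if tool_choice:
                kwargs["tool_choice"] = tool_choice
        response = await self.client.chat.completions.create(**kwargs)
        if prompt is not None:
            return response.choices[0].message.content or ""  # text mode
        return response.model_dump(exclude_none=True)  # dict mode


class _FakeCompletions:
    """Offline stand-in that mimics the response shape for this demo."""

    async def create(self, **kwargs: Any) -> Any:
        payload = {"choices": [{"message": {"role": "assistant", "content": "true"}}]}
        message = SimpleNamespace(content="true")
        return SimpleNamespace(
            choices=[SimpleNamespace(message=message)],
            model_dump=lambda exclude_none=True: payload,
        )


fake_client = SimpleNamespace(chat=SimpleNamespace(completions=_FakeCompletions()))
adapter = ChatAdapter(fake_client, model="placeholder-model")
text_reply = asyncio.run(adapter.chat("Does this request need planning?"))
dict_reply = asyncio.run(adapter.chat(messages=[{"role": "user", "content": "hi"}]))
print(text_reply, dict_reply["choices"][0]["message"]["content"])
```

With a real client, the adapter instance rather than the raw client would be passed as `llm_client`.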
8. Production Considerations
8.1 Security Auditing
Every tool call made by an enterprise agent should be recorded in an audit log:
import logging
from datetime import datetime, timezone


class AuditLogger:
    """Audit log recorder."""

    def __init__(self, log_file: str = "agent_audit.log"):
        self.logger = logging.getLogger("agent_audit")
        if not self.logger.handlers:  # avoid duplicate handlers when instantiated more than once
            handler = logging.FileHandler(log_file)
            handler.setFormatter(logging.Formatter(
                '%(asctime)s | %(levelname)s | %(message)s'
            ))
            self.logger.addHandler(handler)
        self.logger.setLevel(logging.INFO)

    def log_tool_call(self, session_id: str, tool_name: str,
                      arguments: dict, result: Any, duration_ms: float):
        """Record a tool call."""
        self.logger.info(
            f"session={session_id} | tool={tool_name} | "
            f"args={json.dumps(arguments, ensure_ascii=False, default=str)} | "
            f"result={str(result)[:200]} | duration={duration_ms:.0f}ms"
        )

    def log_llm_call(self, session_id: str, prompt_tokens: int,
                     completion_tokens: int, duration_ms: float):
        """Record an LLM call."""
        self.logger.info(
            f"session={session_id} | llm_call | "
            f"prompt_tokens={prompt_tokens} | completion_tokens={completion_tokens} | "
            f"duration={duration_ms:.0f}ms"
        )
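The audit methods take a `duration_ms` argument, which has to be measured somewhere. One possible approach, sketched here, is a small wrapper that times an async tool handler and writes the audit line whether the call succeeds or fails. `audited_call`, the logger name, and the in-memory `_ListHandler` are hypothetical and exist only to keep the example self-contained; a real deployment would log to durable storage.

```python
import asyncio
import json
import logging
import time
from typing import Any, Awaitable, Callable, Dict, List


class _ListHandler(logging.Handler):
    """Collects formatted log messages in memory (demo only)."""

    def __init__(self) -> None:
        super().__init__()
        self.lines: List[str] = []

    def emit(self, record: logging.LogRecord) -> None:
        self.lines.append(record.getMessage())


audit_log = logging.getLogger("agent_audit_demo")
audit_log.setLevel(logging.INFO)
memory_handler = _ListHandler()
audit_log.addHandler(memory_handler)


async def audited_call(
    session_id: str,
    tool_name: str,
    handler: Callable[..., Awaitable[Any]],
    arguments: Dict[str, Any],
) -> Any:
    """Run an async tool handler and log outcome and duration, even on failure."""
    start = time.perf_counter()
    outcome = "ok"
    try:
        return await handler(**arguments)
    except Exception as exc:
        outcome = f"error:{type(exc).__name__}"
        raise
    finally:
        duration_ms = (time.perf_counter() - start) * 1000
        audit_log.info(
            "session=%s | tool=%s | args=%s | outcome=%s | duration=%.0fms",
            session_id, tool_name,
            json.dumps(arguments, ensure_ascii=False, default=str),
            outcome, duration_ms,
        )


async def echo(text: str) -> Dict[str, str]:
    return {"echo": text}


result = asyncio.run(audited_call("demo-session", "echo", echo, {"text": "hi"}))
print(result, memory_handler.lines[0])
```

Logging in a `finally` block ensures failed calls leave an audit trail too, which is often the case that matters most during incident review.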
8.2 Rate Limiting and Circuit Breaking
import time
from collections import deque


class RateLimiter:
    """A simple sliding-window rate limiter."""

    def __init__(self, max_calls: int, window_seconds: float):
        self.max_calls = max_calls
        self.window = window_seconds
        self._calls: deque[float] = deque()

    async def acquire(self) -> bool:
        """Try to obtain permission for one call."""
        now = time.monotonic()  # monotonic clock, unaffected by wall-clock adjustments
        # Drop expired records
        while self._calls and self._calls[0] < now - self.window:
            self._calls.popleft()
        if len(self._calls) >= self.max_calls:
            return False
        self._calls.append(now)
        return True
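The section title also mentions circuit breaking, which the rate limiter alone does not cover. As a rough illustration, a minimal circuit breaker might look like the sketch below: after a number of consecutive failures it blocks calls for a cool-down period, then lets calls through again to probe whether the downstream service has recovered. The class, its parameter names, and its defaults are assumptions for this example; the injectable `clock` exists only to make the demo deterministic.

```python
import time
from typing import Callable, Optional


class CircuitBreaker:
    """Minimal circuit breaker: opens after consecutive failures, retries after a cool-down."""

    def __init__(
        self,
        failure_threshold: int = 3,
        reset_timeout: float = 30.0,
        clock: Callable[[], float] = time.monotonic,
    ):
        self.failure_threshold = failure_threshold
        self.reset_timeout = reset_timeout
        self._clock = clock
        self._failures = 0
        self._opened_at: Optional[float] = None  # None means the circuit is closed

    def allow(self) -> bool:
        """Return True if a call may proceed."""
        if self._opened_at is None:
            return True
        # Open circuit: allow calls again only after the cool-down has elapsed.
        return self._clock() - self._opened_at >= self.reset_timeout

    def record_success(self) -> None:
        """A successful call closes the circuit and clears the failure count."""
        self._failures = 0
        self._opened_at = None

    def record_failure(self) -> None:
        """Count a failure; open (or re-open) the circuit at the threshold."""
        self._failures += 1
        if self._failures >= self.failure_threshold:
            self._opened_at = self._clock()


# Demo with a fake clock so the timing is deterministic.
now = [0.0]
breaker = CircuitBreaker(failure_threshold=2, reset_timeout=10.0, clock=lambda: now[0])
breaker.record_failure()
breaker.record_failure()        # threshold reached: circuit opens at t=0
blocked = breaker.allow()       # still inside the cool-down
now[0] = 10.0
probe_allowed = breaker.allow() # cool-down elapsed: probing is allowed
breaker.record_success()        # the probe succeeded: circuit closes
print(blocked, probe_allowed, breaker.allow())
```

A typical call site checks `allow()` before invoking a tool or the LLM, then reports the outcome with `record_success()` or `record_failure()`.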
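8.3 Observability
We recommend instrumenting the agent with OpenTelemetry to trace every LLM call and tool call, and monitoring key metrics through Prometheus + Grafana: latency distribution per stage, tool-call success rate, token consumption trends, DAG execution time, and so on.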
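9. Summary
Building an enterprise AI agent is far more than prompt engineering. This article took an engineering view of three core pieces: task decomposition (DAG planning and parallel execution), tool calling (the function-calling ReAct loop), and multi-turn dialogue (layered memory and a state machine).
A strong enterprise agent is the product of careful engineering governance across all three. We hope the architecture and code here offer a useful reference for teams bringing agents into production. Natural next steps include human-in-the-loop approval flows, vectorized long-term memory, and multi-agent orchestration.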