从“工具”到“协作者”:2026年AI Agent的范式跃迁与技术演进全景图
从简单的对话助手到能够自主规划、执行复杂任务的智能体,AI Agent正经历着从“被动工具”到“主动协作者”的深刻转变。根据Gartner的预测,到2028年,约33%的企业软件应用将内嵌代理型AI,而15%的日常工作决策将由这些智能体自主完成。AI Agent不再仅仅是等待指令的“工具”,而是能够理解意图、规划路径、调用工具并交付成果的“数字伙伴”。实现多智能体协作的关键是建立统一的通信协议。多
从“工具”到“协作者”:2026年AI Agent的范式跃迁与技术演进全景图
引言:AI Agent的时代拐点
当我们回顾2025年,一个清晰的共识正在形成:这一年被广泛认定为“AI智能体元年”。从简单的对话助手到能够自主规划、执行复杂任务的智能体,AI Agent正经历着从“被动工具”到“主动协作者”的深刻转变。根据Gartner的预测,到2028年,约33%的企业软件应用将内嵌代理型AI,而15%的日常工作决策将由这些智能体自主完成。
这种转变不仅仅是技术能力的提升,更是人机交互范式的根本性重构。AI Agent不再仅仅是等待指令的“工具”,而是能够理解意图、规划路径、调用工具并交付成果的“数字伙伴”。本文将深入探讨2026年AI Agent从“工具”到“协作者”的演进路径,涵盖技术架构、实现代码、应用场景和未来趋势。
第一章:AI Agent的定义演进与核心特征
1.1 从传统AI助手到通用型Agent
传统AI助手与通用型AI Agent存在本质区别:
| 维度 | 传统AI助手 | 通用型AI Agent |
|---|---|---|
| 交互模式 | 一问一答 | 目标导向、多轮协作 |
| 能力范围 | 信息检索、内容生成 | 任务规划、工具调用、结果交付 |
| 自主性 | 被动响应 | 主动拆解、自主执行 |
| 输出形式 | 文本、建议 | 可执行成果(文件、代码、数据) |
| 工作流程 | 单线程对话 | 多智能体协作、并行处理 |
通用型AI Agent是指能够理解自然语言指令、自主规划任务路径、调用多种工具和服务、并最终完成复杂目标的智能体系统。这种转变的核心在于AI Agent具备了“感知-决策-行动-记忆”的完整认知闭环。
1.2 AI Agent的四大核心支柱
现代AI Agent的架构围绕四大核心支柱构建:
-
规划(Planning):智能体通过自我反思、自我批评以及思维链进行子目标拆解,在复杂环境中寻找最优行动路径。
-
记忆(Memory):实现逻辑连续性的基石,包括短期记忆和长期记忆,用于优化服务体验。
-
工具使用(Tools):智能体能够精准调用外部API,如Search()、Calculator()、Calendar()以及CodeInterpreter()等,打破知识边界。
-
行动(Action):最终通过对话、任务探索或完成生成对物理或数字世界的正向反馈。
第二章:技术架构演进:从单体到多智能体系统
2.1 单体Agent的局限性
尽管单体AI Agent在创意与代码逻辑上已臻化境,但在面对极端复杂的业务链路时,单兵作战的AI往往暴露出效率瓶颈与容错缺失。根据Google DeepMind的实证研究,当单Agent基线准确率超过45%时,增加Agent反而导致性能下降;在严格顺序依赖任务中,多Agent效率可能暴跌70%。
2.2 多智能体协作系统的崛起
多智能体系统(MAS)通过将任务拆解并交由不同专长的Agent协作完成,实现“1+1>2”的集体智能。其核心优势包括:
- 专业化分工:不同Agent专注于特定领域,形成专家团队
- 任务并行提速:多个子任务可同时处理,大幅提升效率
- 系统灵活鲁棒:局部失败可由其他Agent补位,避免单点故障
- 复杂场景模拟:对交通、供应链等复杂场景的高保真模拟能力
2.3 多智能体架构模式
2025年,业界已形成三类成熟的多智能体架构:
| 架构类型 | 特点 | 适用场景 |
|---|---|---|
| 层级式 | 中央协调器+专业执行器 | 企业级复杂工作流 |
| 平等式 | 对等协商、民主决策 | 去中心化应用 |
| 混合式 | 结合层级与平等优势 | 动态变化环境 |
第三章:核心技术栈与开发框架
3.1 AI Agent技术栈全景图
根据CB Insights的报告,AI Agent技术栈可拆解为6大层级:
# AI Agent技术栈层级结构示例
class AIAgentTechStack:
def __init__(self):
self.layers = {
"基础模型": ["LLM", "部署平台"],
"开发框架": ["Agent构建平台", "低代码工具"],
"工具集成": ["浏览器", "搜索", "API调用"],
"上下文/记忆": ["向量数据库", "记忆管理"],
"编排系统": ["多Agent协作", "任务路由"],
"监督治理": ["安全", "认证", "观测"]
}
def get_companies_by_layer(self, layer):
# 各层级代表公司映射
company_map = {
"基础模型": ["OpenAI", "Anthropic", "Together.ai"],
"开发框架": ["LangChain", "CrewAI", "StackAI"],
"工具集成": ["Composio", "Tavily", "E2B"],
"上下文/记忆": ["Pinecone", "Letta", "Zep"],
"编排系统": ["Fetch.ai", "Magnetic-One"],
"监督治理": ["Langfuse", "Coval", "Larridin"]
}
return company_map.get(layer, [])
3.2 LangChain框架深度解析
LangChain是目前最流行的AI Agent开发框架之一,其核心设计围绕四大组件展开:
from langchain.agents import load_tools, initialize_agent
from langchain_openai import ChatOpenAI
from langchain.tools import tool
from langchain_core.prompts import PromptTemplate
# 1. 初始化语言模型
llm = ChatOpenAI(temperature=0, model="gpt-4o")
# 2. 定义自定义工具
@tool
def search_wikipedia(query: str) -> str:
"""在Wikipedia上搜索信息"""
# 实际的Wikipedia API调用
from wikipedia import summary
return summary(query, sentences=3)
@tool
def calculate(expression: str) -> float:
"""计算数学表达式"""
return eval(expression)
# 3. 加载工具集
tools = load_tools(["serpapi", "llm-math"], llm=llm)
tools.extend([search_wikipedia, calculate])
# 4. 创建ReAct代理
agent = initialize_agent(
tools,
llm,
agent="zero-shot-react-description",
verbose=True
)
# 5. 执行任务
result = agent.run("查询2025年AI Agent市场规模,并计算相对于2024年的增长率")
print(result)
3.3 结构化输出与工具调用
LangChain v1.x引入了更可靠的结构化输出支持,适用于支持原生结构化输出的提供者:
from langchain.agents.structured_output import ToolStrategy
from langchain.agents import create_agent
from pydantic import BaseModel
from typing import List
# 定义响应格式
class ContactInfo(BaseModel):
name: str
email: str
phone: str
interests: List[str]
# 创建支持结构化输出的Agent
structured_agent = create_agent(
model="gpt-4o",
response_format=ToolStrategy(ContactInfo),
tools=[search_wikipedia, calculate]
)
# 调用示例
response = structured_agent.invoke({
"messages": [{
"role": "user",
"content": "提取以下文本中的联系信息:张三,邮箱zhangsan@example.com,电话13800138000,对AI和区块链感兴趣"
}]
})
第四章:从工具到协作者的关键技术突破
4.1 自主规划与决策能力
AI Agent从工具到协作者的核心突破在于自主规划能力。以下是一个基于蒙特卡洛树搜索(MCTS)的规划Agent示例:
import numpy as np
from collections import defaultdict
import random
class MCTSAgent:
"""基于蒙特卡洛树搜索的规划Agent"""
def __init__(self, exploration_weight=1.0):
self.Q = defaultdict(float) # 动作价值
self.N = defaultdict(int) # 访问次数
self.children = dict() # 子节点
self.exploration_weight = exploration_weight
def choose(self, node):
"""选择最优动作"""
if node not in self.children:
return node.find_random_child()
def score(n):
if self.N[n] == 0:
return float("-inf")
return self.Q[n] / self.N[n]
return max(self.children[node], key=score)
def do_rollout(self, node):
"""执行一次模拟"""
path = self._select(node)
leaf = path[-1]
self._expand(leaf)
reward = self._simulate(leaf)
self._backpropagate(path, reward)
def _select(self, node):
"""选择路径"""
path = []
while True:
path.append(node)
if node not in self.children or not self.children[node]:
return path
unexplored = self.children[node] - self.children.keys()
if unexplored:
n = unexplored.pop()
path.append(n)
return path
node = self._uct_select(node)
def _expand(self, node):
"""扩展节点"""
if node in self.children:
return
self.children[node] = node.find_children()
def _simulate(self, node):
"""模拟游戏"""
while True:
if node.is_terminal():
return node.reward()
node = node.find_random_child()
def _backpropagate(self, path, reward):
"""反向传播更新"""
for node in reversed(path):
self.N[node] += 1
self.Q[node] += reward
def _uct_select(self, node):
"""使用UCT公式选择节点"""
log_N_vertex = np.log(self.N[node])
def uct(n):
return self.Q[n] / self.N[n] + self.exploration_weight * np.sqrt(
log_N_vertex / self.N[n]
)
return max(self.children[node], key=uct)
4.2 记忆与上下文管理
长期记忆和上下文管理是AI Agent成为协作者的关键能力。以下是一个基于向量数据库的记忆系统实现:
import numpy as np
from typing import List, Dict, Any
import pickle
from datetime import datetime
class AgentMemory:
"""AI Agent记忆系统"""
def __init__(self, embedding_dim=768, max_memory_size=10000):
self.embedding_dim = embedding_dim
self.max_memory_size = max_memory_size
# 记忆存储
self.short_term_memory = [] # 短期记忆(对话上下文)
self.long_term_memory = [] # 长期记忆(向量化存储)
self.memory_embeddings = [] # 记忆的向量表示
# 元数据
self.memory_metadata = []
self.access_count = defaultdict(int)
self.last_accessed = {}
def add_memory(self, content: str, embedding: np.ndarray,
metadata: Dict[str, Any] = None):
"""添加新记忆"""
memory_id = len(self.long_term_memory)
# 创建记忆对象
memory = {
"id": memory_id,
"content": content,
"embedding": embedding,
"timestamp": datetime.now(),
"metadata": metadata or {},
"importance": 1.0 # 初始重要性
}
# 添加到长期记忆
self.long_term_memory.append(memory)
self.memory_embeddings.append(embedding)
self.memory_metadata.append(metadata or {})
# 管理记忆大小
if len(self.long_term_memory) > self.max_memory_size:
self._prune_memory()
return memory_id
def retrieve_relevant_memories(self, query_embedding: np.ndarray,
top_k: int = 5):
"""检索相关记忆"""
if not self.memory_embeddings:
return []
# 计算相似度
embeddings_array = np.array(self.memory_embeddings)
similarities = np.dot(embeddings_array, query_embedding) / (
np.linalg.norm(embeddings_array, axis=1) * np.linalg.norm(query_embedding)
)
# 获取最相关的记忆
top_indices = np.argsort(similarities)[-top_k:][::-1]
# 更新访问统计
for idx in top_indices:
self.access_count[idx] += 1
self.last_accessed[idx] = datetime.now()
return [self.long_term_memory[idx] for idx in top_indices]
def update_memory_importance(self, memory_id: int,
importance_delta: float):
"""更新记忆重要性"""
if 0 <= memory_id < len(self.long_term_memory):
self.long_term_memory[memory_id]["importance"] += importance_delta
self.long_term_memory[memory_id]["importance"] = max(
0.1, min(10.0, self.long_term_memory[memory_id]["importance"])
)
def _prune_memory(self):
"""修剪记忆,保留最重要的"""
# 计算记忆得分:重要性 * 衰减因子
scores = []
current_time = datetime.now()
for i, memory in enumerate(self.long_term_memory):
# 时间衰减因子(最近访问的记忆更重要)
hours_since_access = (current_time - self.last_accessed.get(i, memory["timestamp"])).total_seconds() / 3600
time_decay = np.exp(-hours_since_access / 168) # 一周衰减
# 访问频率因子
access_factor = np.log(1 + self.access_count.get(i, 0))
# 综合得分
score = memory["importance"] * time_decay * (1 + 0.1 * access_factor)
scores.append((i, score))
# 按得分排序,保留前max_memory_size个
scores.sort(key=lambda x: x[1], reverse=True)
keep_indices = set(idx for idx, _ in scores[:self.max_memory_size])
# 重建记忆存储
new_memories = []
new_embeddings = []
new_metadata = []
for i in range(len(self.long_term_memory)):
if i in keep_indices:
new_memories.append(self.long_term_memory[i])
new_embeddings.append(self.memory_embeddings[i])
new_metadata.append(self.memory_metadata[i])
self.long_term_memory = new_memories
self.memory_embeddings = new_embeddings
self.memory_metadata = new_metadata
def save_memory(self, filepath: str):
"""保存记忆到文件"""
with open(filepath, 'wb') as f:
pickle.dump({
'long_term_memory': self.long_term_memory,
'memory_embeddings': self.memory_embeddings,
'memory_metadata': self.memory_metadata,
'access_count': dict(self.access_count),
'last_accessed': self.last_accessed
}, f)
def load_memory(self, filepath: str):
"""从文件加载记忆"""
with open(filepath, 'rb') as f:
data = pickle.load(f)
self.long_term_memory = data['long_term_memory']
self.memory_embeddings = data['memory_embeddings']
self.memory_metadata = data['memory_metadata']
self.access_count = defaultdict(int, data['access_count'])
self.last_accessed = data['last_accessed']
4.3 工具调用与API集成
AI Agent作为协作者的核心能力之一是能够调用外部工具和API。以下是一个完整的工具调用框架:
from typing import Any, Callable, Dict, List, Optional, Type
from pydantic import BaseModel, Field
import inspect
import json
from datetime import datetime
import requests
class ToolParameter(BaseModel):
"""工具参数定义"""
name: str
type: str
description: str
required: bool = True
default: Any = None
class ToolDefinition(BaseModel):
"""工具定义"""
name: str
description: str
parameters: List[ToolParameter]
return_type: str
examples: List[Dict] = []
class ToolExecutionResult(BaseModel):
"""工具执行结果"""
success: bool
result: Any
error_message: Optional[str] = None
execution_time: float
timestamp: datetime = Field(default_factory=datetime.now)
class ToolRegistry:
"""工具注册与管理中心"""
def __init__(self):
self.tools: Dict[str, Callable] = {}
self.tool_definitions: Dict[str, ToolDefinition] = {}
self.execution_history: List[ToolExecutionResult] = []
def register_tool(self, func: Callable) -> Callable:
"""注册工具函数"""
# 提取函数信息
func_name = func.__name__
func_doc = inspect.getdoc(func) or ""
# 解析参数
signature = inspect.signature(func)
parameters = []
for param_name, param in signature.parameters.items():
if param_name == 'self':
continue
param_type = str(param.annotation) if param.annotation != inspect.Parameter.empty else "Any"
param_desc = ""
# 从docstring中提取参数描述
if 'Args:' in func_doc:
args_section = func_doc.split('Args:')[1].split('\n\n')[0]
for line in args_section.strip().split('\n'):
if f'{param_name}:' in line:
param_desc = line.split(f'{param_name}:')[1].strip()
break
parameters.append(ToolParameter(
name=param_name,
type=param_type,
description=param_desc,
required=param.default == inspect.Parameter.empty,
default=param.default if param.default != inspect.Parameter.empty else None
))
# 创建工具定义
tool_def = ToolDefinition(
name=func_name,
description=func_doc.split('\n\n')[0] if '\n\n' in func_doc else func_doc,
parameters=parameters,
return_type=str(signature.return_annotation) if signature.return_annotation != inspect.Signature.empty else "Any"
)
# 注册工具
self.tools[func_name] = func
self.tool_definitions[func_name] = tool_def
return func
def execute_tool(self, tool_name: str, **kwargs) -> ToolExecutionResult:
"""执行工具"""
if tool_name not in self.tools:
return ToolExecutionResult(
success=False,
result=None,
error_message=f"Tool '{tool_name}' not found"
)
start_time = datetime.now()
try:
# 验证参数
tool_def = self.tool_definitions[tool_name]
for param in tool_def.parameters:
if param.required and param.name not in kwargs:
raise ValueError(f"Missing required parameter: {param.name}")
# 执行工具
result = self.tools**kwargs
execution_time = (datetime.now() - start_time).total_seconds()
execution_result = ToolExecutionResult(
success=True,
result=result,
execution_time=execution_time
)
except Exception as e:
execution_time = (datetime.now() - start_time).total_seconds()
execution_result = ToolExecutionResult(
success=False,
result=None,
error_message=str(e),
execution_time=execution_time
)
# 记录执行历史
self.execution_history.append(execution_result)
# 限制历史记录大小
if len(self.execution_history) > 1000:
self.execution_history = self.execution_history[-1000:]
return execution_result
def get_tool_schema(self, tool_name: str) -> Optional[Dict]:
"""获取工具模式"""
if tool_name not in self.tool_definitions:
return None
tool_def = self.tool_definitions[tool_name]
return tool_def.dict()
def list_tools(self) -> List[Dict]:
"""列出所有可用工具"""
return [
{
"name": name,
"description": defn.description,
"parameters": [p.dict() for p in defn.parameters]
}
for name, defn in self.tool_definitions.items()
]
# 示例工具定义
tool_registry = ToolRegistry()
@tool_registry.register_tool
def search_web(query: str, num_results: int = 5) -> List[Dict]:
"""
在互联网上搜索信息
Args:
query: 搜索关键词
num_results: 返回结果数量,默认为5
Returns:
搜索结果列表,每个结果包含标题、链接和摘要
"""
# 实际实现会调用搜索引擎API
# 这里返回模拟数据
return [
{
"title": f"关于{query}的最新信息",
"url": f"https://example.com/search?q={query}",
"snippet": f"这是关于{query}的搜索结果摘要..."
}
for _ in range(num_results)
]
@tool_registry.register_tool
def calculate_expression(expression: str) -> float:
"""
计算数学表达式
Args:
expression: 数学表达式,如"2 + 3 * 4"
Returns:
计算结果
"""
# 注意:实际应用中需要更安全的方式执行表达式
return eval(expression)
@tool_registry.register_tool
def get_weather(city: str, country: str = "CN") -> Dict[str, Any]:
"""
获取城市天气信息
Args:
city: 城市名称
country: 国家代码,默认为中国
Returns:
天气信息字典
"""
# 模拟天气API调用
return {
"city": city,
"country": country,
"temperature": 25.5,
"condition": "晴朗",
"humidity": 65,
"wind_speed": 12.3
}
@tool_registry.register_tool
def send_email(to: str, subject: str, body: str) -> Dict[str, Any]:
"""
发送电子邮件
Args:
to: 收件人邮箱
subject: 邮件主题
body: 邮件正文
Returns:
发送结果
"""
# 实际实现会调用邮件服务API
return {
"success": True,
"message_id": f"msg_{datetime.now().timestamp()}",
"to": to,
"timestamp": datetime.now().isoformat()
}
第五章:多智能体协作系统实现
5.1 多智能体协作架构设计
多智能体协作系统(MACS)已成为跨越AI“最后一公里”的必然选择。以下是一个完整的多智能体协作系统实现:
from typing import Dict, List, Any, Optional, Callable
from enum import Enum
import asyncio
from dataclasses import dataclass
from datetime import datetime
import uuid
class AgentRole(Enum):
"""智能体角色定义"""
COORDINATOR = "coordinator" # 协调者
EXPERT = "expert" # 领域专家
VALIDATOR = "validator" # 验证者
EXECUTOR = "executor" # 执行者
MONITOR = "monitor" # 监控者
class TaskStatus(Enum):
"""任务状态"""
PENDING = "pending"
IN_PROGRESS = "in_progress"
COMPLETED = "completed"
FAILED = "failed"
CANCELLED = "cancelled"
@dataclass
class Task:
"""任务定义"""
task_id: str
description: str
priority: int = 1
dependencies: List[str] = None
status: TaskStatus = TaskStatus.PENDING
created_at: datetime = None
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
result: Any = None
error: Optional[str] = None
def __post_init__(self):
if self.created_at is None:
self.created_at = datetime.now()
if self.dependencies is None:
self.dependencies = []
@dataclass
class Message:
"""智能体间消息"""
message_id: str
sender_id: str
receiver_id: str
content: Any
message_type: str
timestamp: datetime = None
priority: int = 1
def __post_init__(self):
if self.timestamp is None:
self.timestamp = datetime.now()
class BaseAgent:
"""基础智能体类"""
def __init__(self, agent_id: str, role: AgentRole, capabilities: List[str]):
self.agent_id = agent_id
self.role = role
self.capabilities = capabilities
self.message_queue = asyncio.Queue()
self.is_running = False
self.current_task: Optional[Task] = None
async def start(self):
"""启动智能体"""
self.is_running = True
asyncio.create_task(self._message_loop())
async def stop(self):
"""停止智能体"""
self.is_running = False
async def _message_loop(self):
"""消息处理循环"""
while self.is_running:
try:
message = await self.message_queue.get()
await self.process_message(message)
except Exception as e:
print(f"Agent {self.agent_id} error processing message: {e}")
async def process_message(self, message: Message):
"""处理消息(子类实现)"""
raise NotImplementedError
async def send_message(self, receiver_id: str, content: Any,
message_type: str = "task"):
"""发送消息"""
# 在实际系统中,这里会通过消息总线发送
message = Message(
message_id=str(uuid.uuid4()),
sender_id=self.agent_id,
receiver_id=receiver_id,
content=content,
message_type=message_type
)
# 模拟消息发送
print(f"Agent {self.agent_id} -> {receiver_id}: {message_type}")
def can_handle_task(self, task_description: str) -> bool:
"""判断是否能处理任务"""
# 根据能力和角色判断
return any(cap in task_description.lower() for cap in self.capabilities)
class CoordinatorAgent(BaseAgent):
"""协调者智能体"""
def __init__(self, agent_id: str):
super().__init__(agent_id, AgentRole.COORDINATOR,
["coordinate", "delegate", "monitor"])
self.registered_agents: Dict[str, BaseAgent] = {}
self.task_queue: List[Task] = []
self.task_history: Dict[str, Task] = {}
def register_agent(self, agent: BaseAgent):
"""注册智能体"""
self.registered_agents[agent.agent_id] = agent
async def assign_task(self, task: Task):
"""分配任务"""
# 找到最适合的智能体
best_agent = None
best_score = -1
for agent_id, agent in self.registered_agents.items():
if agent.can_handle_task(task.description):
# 简单的评分机制
score = sum(1 for cap in agent.capabilities
if cap in task.description.lower())
if score > best_score:
best_score = score
best_agent = agent
if best_agent:
task.status = TaskStatus.IN_PROGRESS
task.started_at = datetime.now()
self.task_history[task.task_id] = task
# 发送任务给智能体
await self.send_message(
best_agent.agent_id,
{"task": task, "action": "execute"},
"task_assignment"
)
return True
else:
task.status = TaskStatus.FAILED
task.error = "No suitable agent found"
return False
async def process_message(self, message: Message):
"""处理消息"""
if message.message_type == "task_completed":
task_id = message.content["task_id"]
result = message.content["result"]
if task_id in self.task_history:
task = self.task_history[task_id]
task.status = TaskStatus.COMPLETED
task.completed_at = datetime.now()
task.result = result
print(f"Task {task_id} completed by {message.sender_id}")
elif message.message_type == "task_failed":
task_id = message.content["task_id"]
error = message.content["error"]
if task_id in self.task_history:
task = self.task_history[task_id]
task.status = TaskStatus.FAILED
task.error = error
print(f"Task {task_id} failed: {error}")
class ExpertAgent(BaseAgent):
"""专家智能体"""
def __init__(self, agent_id: str, expertise: str):
capabilities = [expertise.lower(), "analyze", "research"]
super().__init__(agent_id, AgentRole.EXPERT, capabilities)
self.expertise = expertise
async def process_message(self, message: Message):
"""处理消息"""
if message.message_type == "task_assignment":
task = message.content["task"]
self.current_task = task
# 模拟专家处理任务
print(f"Expert {self.agent_id} processing task: {task.description}")
# 执行任务
await asyncio.sleep(1) # 模拟处理时间
# 返回结果
result = {
"analysis": f"Expert analysis of {task.description}",
"recommendations": ["Recommendation 1", "Recommendation 2"],
"confidence": 0.85
}
# 通知协调者
await self.send_message(
message.sender_id,
{
"task_id": task.task_id,
"result": result,
"agent_id": self.agent_id
},
"task_completed"
)
self.current_task = None
class ExecutorAgent(BaseAgent):
"""执行者智能体"""
def __init__(self, agent_id: str):
super().__init__(agent_id, AgentRole.EXECUTOR,
["execute", "implement", "run"])
async def process_message(self, message: Message):
"""处理消息"""
if message.message_type == "task_assignment":
task = message.content["task"]
self.current_task = task
# 模拟执行任务
print(f"Executor {self.agent_id} executing task: {task.description}")
# 执行任务
await asyncio.sleep(0.5) # 模拟执行时间
# 返回结果
result = {
"execution_status": "success",
"output": f"Task {task.task_id} executed successfully",
"timestamp": datetime.now().isoformat()
}
# 通知协调者
await self.send_message(
message.sender_id,
{
"task_id": task.task_id,
"result": result,
"agent_id": self.agent_id
},
"task_completed"
)
self.current_task = None
class MultiAgentSystem:
"""多智能体系统"""
def __init__(self):
self.coordinator = CoordinatorAgent("coordinator_001")
self.agents: Dict[str, BaseAgent] = {
self.coordinator.agent_id: self.coordinator
}
def add_agent(self, agent: BaseAgent):
"""添加智能体"""
self.agents[agent.agent_id] = agent
self.coordinator.register_agent(agent)
async def start(self):
"""启动系统"""
tasks = []
for agent in self.agents.values():
tasks.append(agent.start())
await asyncio.gather(*tasks)
async def submit_task(self, description: str, priority: int = 1) -> str:
"""提交任务"""
task_id = str(uuid.uuid4())
task = Task(
task_id=task_id,
description=description,
priority=priority
)
await self.coordinator.assign_task(task)
return task_id
def get_task_status(self, task_id: str) -> Optional[TaskStatus]:
"""获取任务状态"""
if task_id in self.coordinator.task_history:
return self.coordinator.task_history[task_id].status
return None
# 使用示例
async def demo_multi_agent_system():
"""多智能体系统演示"""
# 创建系统
mas = MultiAgentSystem()
# 添加专家智能体
finance_expert = ExpertAgent("expert_finance", "finance")
tech_expert = ExpertAgent("expert_tech", "technology")
marketing_expert = ExpertAgent("expert_marketing", "marketing")
# 添加执行者智能体
executor1 = ExecutorAgent("executor_001")
executor2 = ExecutorAgent("executor_002")
# 注册智能体
mas.add_agent(finance_expert)
mas.add_agent(tech_expert)
mas.add_agent(marketing_expert)
mas.add_agent(executor1)
mas.add_agent(executor2)
# 启动系统
await mas.start()
# 提交任务
tasks = [
"分析2025年AI Agent市场投资趋势",
"评估最新大语言模型的技术架构",
"制定AI产品的市场营销策略",
"执行数据清洗和预处理任务",
"部署机器学习模型到生产环境"
]
task_ids = []
for task_desc in tasks:
task_id = await mas.submit_task(task_desc)
task_ids.append(task_id)
print(f"Submitted task: {task_desc} (ID: {task_id})")
# 等待任务完成
await asyncio.sleep(3)
# 检查任务状态
print("\nTask Status:")
for task_id in task_ids:
status = mas.get_task_status(task_id)
print(f"Task {task_id}: {status}")
# 停止系统
for agent in mas.agents.values():
await agent.stop()
# 运行演示
# asyncio.run(demo_multi_agent_system())
5.2 智能体间通信协议
实现多智能体协作的关键是建立统一的通信协议。ANP(Agent Network Protocol)和MCP(Model Context Protocol)正在定义智能体的社交准则:
import json
from typing import Any, Dict, List, Optional
from enum import Enum
import hashlib
from datetime import datetime
import uuid
class MessageType(Enum):
"""消息类型枚举"""
TASK_REQUEST = "task_request"
TASK_ASSIGNMENT = "task_assignment"
TASK_RESULT = "task_result"
TASK_FAILURE = "task_failure"
RESOURCE_REQUEST = "resource_request"
RESOURCE_GRANT = "resource_grant"
HEARTBEAT = "heartbeat"
STATUS_UPDATE = "status_update"
class AgentProtocol:
"""智能体通信协议基类"""
def __init__(self, protocol_version: str = "1.0"):
self.protocol_version = protocol_version
self.message_handlers = {}
def register_handler(self, message_type: MessageType, handler: callable):
"""注册消息处理器"""
self.message_handlers[message_type] = handler
async def handle_message(self, message: Dict[str, Any]) -> Optional[Dict[str, Any]]:
"""处理消息"""
msg_type = MessageType(message.get("type"))
if msg_type in self.message_handlers:
return await self.message_handlersmessage
return None
class ANPProtocol(AgentProtocol):
"""Agent Network Protocol实现"""
def __init__(self):
super().__init__("anp-1.0")
self.peer_registry = {} # 对等节点注册表
self.message_queue = []
async def discover_peers(self, network_address: str):
"""发现网络中的对等节点"""
# 实现网络发现逻辑
pass
async def send_to_peer(self, peer_id: str, message: Dict[str, Any]):
"""发送消息给对等节点"""
# 实现点对点消息发送
pass
async def broadcast(self, message: Dict[str, Any], exclude: List[str] = None):
"""广播消息给所有对等节点"""
# 实现广播逻辑
pass
class MCPProtocol(AgentProtocol):
"""Model Context Protocol实现"""
def __init__(self):
super().__init__("mcp-1.0")
self.tool_registry = {}
self.context_manager = ContextManager()
def register_tool(self, tool_name: str, tool_schema: Dict[str, Any],
handler: callable):
"""注册工具"""
self.tool_registry[tool_name] = {
"schema": tool_schema,
"handler": handler
}
async def execute_tool(self, tool_name: str, parameters: Dict[str, Any],
context: Dict[str, Any] = None) -> Dict[str, Any]:
"""执行工具"""
if tool_name not in self.tool_registry:
return {
"success": False,
"error": f"Tool {tool_name} not found"
}
tool_info = self.tool_registry[tool_name]
# 验证参数
if not self._validate_parameters(parameters, tool_info["schema"]):
return {
"success": False,
"error": "Invalid parameters"
}
# 执行工具
try:
result = await tool_infoparameters, context
return {
"success": True,
"result": result
}
except Exception as e:
return {
"success": False,
"error": str(e)
}
def _validate_parameters(self, parameters: Dict[str, Any],
schema: Dict[str, Any]) -> bool:
"""验证参数是否符合模式"""
# 实现参数验证逻辑
return True
class ContextManager:
"""上下文管理器"""
def __init__(self):
self.contexts = {}
self.context_history = []
def create_context(self, context_id: str, initial_data: Dict[str, Any] = None):
"""创建上下文"""
self.contexts[context_id] = {
"data": initial_data or {},
"created_at": datetime.now(),
"updated_at": datetime.now(),
"access_count": 0
}
def update_context(self, context_id: str, updates: Dict[str, Any]):
"""更新上下文"""
if context_id in self.contexts:
self.contexts[context_id]["data"].update(updates)
self.contexts[context_id]["updated_at"] = datetime.now()
self.contexts[context_id]["access_count"] += 1
def get_context(self, context_id: str) -> Optional[Dict[str, Any]]:
"""获取上下文"""
if context_id in self.contexts:
self.contexts[context_id]["access_count"] += 1
return self.contexts[context_id]["data"]
return None
def merge_contexts(self, source_id: str, target_id: str):
"""合并上下文"""
source_data = self.get_context(source_id)
target_data = self.get_context(target_id)
if source_data and target_data:
merged = {**target_data, **source_data}
self.contexts[target_id]["data"] = merged
self.contexts[target_id]["updated_at"] = datetime.now()
第六章:行业应用案例与实践
6.1 金融行业:智能投顾与风险控制
在金融行业,AI Agent已从简单的客服工具进化为能够独立完成投资分析、风险评估的协作者。以下是一个金融分析Agent的实现示例:
from typing import Dict, List, Any, Optional
from datetime import datetime, timedelta
import pandas as pd
import numpy as np
from dataclasses import dataclass
@dataclass
class FinancialData:
"""金融数据结构"""
symbol: str
date: datetime
open_price: float
high_price: float
low_price: float
close_price: float
volume: int
market_cap: Optional[float] = None
@dataclass
class InvestmentRecommendation:
"""投资建议"""
symbol: str
recommendation: str # BUY, SELL, HOLD
confidence: float
target_price: float
current_price: float
upside_potential: float
risk_level: str # LOW, MEDIUM, HIGH
rationale: str
timestamp: datetime
class FinancialAnalysisAgent:
"""金融分析智能体"""
def __init__(self, agent_id: str):
self.agent_id = agent_id
self.data_source = FinancialDataSource()
self.analysis_engine = AnalysisEngine()
self.risk_assessor = RiskAssessor()
async def analyze_stock(self, symbol: str, period: str = "1y") -> InvestmentRecommendation:
"""分析股票"""
# 获取数据
historical_data = await self.data_source.get_historical_data(symbol, period)
financial_statements = await self.data_source.get_financial_statements(symbol)
market_data = await self.data_source.get_market_data(symbol)
# 技术分析
technical_indicators = self.analysis_engine.calculate_technical_indicators(historical_data)
# 基本面分析
fundamental_score = self.analysis_engine.analyze_fundamentals(financial_statements)
# 风险评估
risk_assessment = self.risk_assessor.assess_risk(symbol, historical_data, market_data)
# 生成建议
recommendation = self._generate_recommendation(
symbol,
technical_indicators,
fundamental_score,
risk_assessment,
market_data.current_price
)
return recommendation
def _generate_recommendation(self, symbol: str, technical_indicators: Dict,
fundamental_score: float, risk_assessment: Dict,
current_price: float) -> InvestmentRecommendation:
"""生成投资建议"""
# 综合评分
technical_score = technical_indicators.get('overall_score', 0.5)
fundamental_weight = 0.6
technical_weight = 0.4
total_score = (fundamental_score * fundamental_weight +
technical_score * technical_weight)
# 确定建议
if total_score >= 0.7:
recommendation = "BUY"
confidence = total_score
target_price = current_price * 1.2 # 假设20%上涨空间
elif total_score >= 0.4:
recommendation = "HOLD"
confidence = total_score
target_price = current_price
else:
recommendation = "SELL"
confidence = 1 - total_score
target_price = current_price * 0.8 # 假设20%下跌空间
upside_potential = (target_price - current_price) / current_price
# 风险等级
risk_score = risk_assessment.get('overall_risk', 0.5)
if risk_score < 0.3:
risk_level = "LOW"
elif risk_score < 0.7:
risk_level = "MEDIUM"
else:
risk_level = "HIGH"
rationale = f"""
综合评分: {total_score:.2f}
基本面得分: {fundamental_score:.2f}
技术面得分: {technical_score:.2f}
风险评估: {risk_score:.2f}
更多推荐




所有评论(0)