构建AI智能体:四十七、Agent2Agent多智能体系统:基础通信与任务协作实现
摘要:Agent2Agent(A2A)是一个促进多智能体间通信与协作的框架,通过标准化协议实现复杂问题求解。智能体具备自主性、社交能力等特征,分为反应型、慎思型等类型。框架提供四种协作模式和多种冲突解决机制,采用消息传递方式进行通信,包含传输层、任务分配器等组件。演示案例展示了任务管理器与工作者智能体之间的任务分配与执行流程,包括问候交互、任务创建、结果反馈等环节。该框架适用于分布式系统管理、多模
一、基本概述
Agent2Agent(A2A)是一个专注于智能体间通信、协作与协调的框架,旨在促进多个智能体(Agent)之间的通信与协作。在人工智能领域,智能体是自主的软件实体,能够感知环境、执行动作、与其他智能体通信,单个智能体的能力有限,而多个智能体通过有效的交互可以解决更复杂的问题。A2A框架提供了标准化的通信协议、消息传递机制和协作模式,使智能体能够像人类团队一样协同工作,解决单个智能体难以处理的复杂问题。
二、智能体的特征和类型
1. 智能体的特征
智能体是A2A框架中的基本单位。每个智能体具有以下特性:
- 自主性:智能体能够在没有直接外部干预的情况下运作,控制自身内部状态和行为。自主性程度可以从完全自主到半自主不等,取决于系统设计需求。
- 社交能力:智能体能够通过某种通信语言与其他智能体(或人类)进行交互。这种交互不仅仅是数据交换,还包括意图传达、协商协作等复杂社交行为。
- 反应性:智能体能够感知所处环境(包括其他智能体),并对环境变化做出及时、适当的响应。这种响应不是简单的刺激-反应,而是基于内部状态的决策。
- 主动性:智能体不仅能够对环境变化做出反应,还能够主动追求目标,表现出目标导向的行为。它们能够主动发起行动,而不仅仅是被动响应。
- 知识库与推理能力:智能体拥有内部知识表示和推理机制,能够基于已有知识进行决策。知识库可以包括领域知识、交互历史、信任模型等。
- 学习与适应能力:高级智能体具备从经验中学习的能力,能够根据历史交互结果调整行为策略,适应环境变化。
2. 智能体的类型
- 反应型智能体:基于条件-动作规则运作,不维护复杂内部状态。它们对环境刺激做出直接反应,决策速度快但灵活性有限。适用于环境相对稳定、决策逻辑简单的场景。
- 慎思型智能体:基于符号人工智能原理,拥有复杂的内部状态模型(信念、愿望、意图,即BDI架构)。它们能够进行规划推理,适合处理复杂、需要深思熟虑的决策任务。
- 混合型智能体:结合反应型和慎思型的优点,在不同情境下采用不同的决策模式。简单情况使用快速反应机制,复杂情况启用深度推理能力。
- 移动智能体:能够在网络中的不同节点间迁移,携带代码和数据在远程主机上执行。适用于分布式信息检索、网络管理等场景
在A2A框架中,智能体通过消息进行通信。每个智能体有一个唯一的标识符(Agent ID),并可以注册多种能力(Capabilities),这些能力描述了智能体能够执行的任务类型。
三. 智能体的协作机制
1. 协作模式
- 主从协调模式:一个主智能体(协调者)负责任务分解、分配和结果整合,多个从智能体(工作者)执行具体子任务。适用于任务结构清晰、需要集中控制的场景。
- 对等协作模式:所有智能体地位平等,通过协商和共识机制协调行动。没有中心控制节点,系统更具鲁棒性但协调复杂度高。
- 市场协作模式:基于经济学原理,智能体通过虚拟货币、竞价等市场机制进行资源分配和任务协调。适合资源稀缺、需要优化分配的场景。
- 团队协作模式:智能体形成具有共同目标的团队,通过共享计划、联合承诺和相互意识实现深度协作。团队成员对团队目标有共同承诺。
2. 协作策略
- 利他策略:智能体总是优先考虑其他智能体或整体系统的利益,即使可能损害自身短期利益。促进系统整体效能但可能被自私智能体利用。
- 自私策略:智能体只考虑自身利益最大化,可能拒绝合作或提供虚假信息。在竞争性环境中常见,但可能降低系统整体效率。
- 互惠策略:基于"以牙还牙"原则,智能体根据历史交互记录决定合作程度。对方过去合作则积极回应,对方背叛则相应报复。
- 信任基础策略:基于信任模型评估合作伙伴的可靠性,根据信任等级调整合作深度。信任模型考虑历史表现、声誉推荐等多维度信息。
3. 冲突解决机制
- 冲突检测
- 资源冲突:多个智能体竞争有限资源
- 目标冲突:智能体目标相互矛盾或竞争
- 计划冲突:行动序列在时间或空间上冲突
- 冲突解决策略
- 协商:各方通过多轮提议-反提议寻求共赢方案
- 调解:第三方中立智能体协助冲突各方达成协议
- 仲裁:权威智能体强制制定解决方案
- 投票:多数决定原则解决分歧
四、智能体的通信协议
1. 消息(Message)
消息是智能体之间通信的基本单元。A2A框架定义了标准的消息格式,包括:
- 消息ID:唯一标识一条消息。
- 发送者ID:发送消息的智能体标识符。
- 接收者ID:接收消息的智能体标识符。
- 消息类型:描述消息的目的,例如任务请求、任务响应、协商等。
- 内容:消息的具体内容,格式由消息类型和具体协议决定。
- 时间戳:消息发送的时间。
- 会话ID:可选字段,用于将多条消息关联到一个会话中。
- 优先级:消息的紧急程度。
2. 传输层(Transport)
传输层负责消息的实际传递。它可以是本地的(在同一进程内)或分布式的(通过网络)。传输层的主要职责是:
- 连接管理:建立和维护智能体之间的连接。
- 消息路由:将消息从发送者传递到正确的接收者。
- 可靠性:确保消息的可靠传递(例如,通过重试机制)。
A2A框架可以支持多种传输实现,例如本地内存传输、基于消息队列(如RabbitMQ)的分布式传输等。
3. 任务分配器
任务分配器负责将任务分配给合适的智能体。它根据智能体的能力和任务的需求进行匹配。任务分配算法可以是简单的轮询,也可以是基于能力匹配的复杂算法。
4. 协商协议
协商协议定义了智能体之间如何通过多轮交互达成一致。常见的协商协议包括合同网协议和拍卖协议。
五. 组织结构与协调机制
1. 智能体组织结构
- 层次结构:明确的上下级关系,决策权集中在上层智能体。协调效率高但单点故障风险大,适合任务结构固定的场景。
- 扁平结构:智能体间地位平等,通过对等协商协调行动。系统鲁棒性强但协调成本高,适合动态变化的环境。
- 混合结构:结合层次和平扁结构的优点,在局部采用层次协调,在全局采用对等协商。平衡效率与灵活性。
- 市场结构:基于供求关系和价格机制进行协调,智能体通过买卖服务实现资源分配。适合开放、动态的环境。
2. 基于角色的协调
2.1 角色定义
角色是一组职责、权限和期望行为的封装。智能体通过扮演不同角色参与协作:
- 角色职责: 该角色需要完成的任务和承担的责任
- 角色权限: 角色在组织中的决策权力和资源访问权限
- 角色关系: 不同角色间的协作关系和交互协议
2.2 动态角色分配
根据任务需求和组织状态动态分配角色,使系统能够适应环境变化。考虑智能体能力、负载状态等因素。
六、智能体的协作流程
1. 整体协作流程图
流程说明:
阶段1:协作准备阶段
- 任务识别与分解:分析任务需求,识别任务边界,将复杂任务分解为可管理的子任务。考虑任务间的依赖关系和执行顺序。
- 智能体发现与能力匹配:在智能体网络中寻找具备所需能力的参与者,评估各智能体的当前负载、历史表现和信誉度。
- 协作模式选择:根据任务特性、参与智能体关系和环境条件选择最合适的协作模式。
阶段2:通信与协商阶段
- 任务请求与分配:明确任务要求、质量标准和验收条件,建立任务分配协议。
- 能力协商与匹配:确认各智能体的具体能力参数,协商服务等级协议(SLA)。
- 资源协商与分配:分配计算、存储、网络等资源,建立资源使用配额和优先级。
- 计划协调与同步:制定详细执行计划,建立里程碑和检查点,同步各参与方的时间表。
阶段3:任务执行与监控阶段
- 并行任务执行:按照预定计划执行任务,处理任务间的依赖和同步。
- 进度监控与报告:实时跟踪执行状态,收集性能指标,生成进度报告。
- 异常检测与处理:监控系统健康状态,检测异常情况,启动应急预案。
- 动态调整与重分配:根据执行情况动态优化资源分配,调整任务优先级。
阶段4:结果整合与评估阶段
- 子结果收集:汇总各智能体的输出结果,验证数据的完整性和一致性。
- 冲突解决与融合:处理结果间的冲突和不一致,采用合适的融合策略。
- 质量评估与验证:评估最终结果的质量,验证是否满足原始需求。
- 经验学习与更新:总结协作经验,更新智能体的知识库和策略。
阶段5:协作终止与总结
- 释放所有分配的资源
- 生成协作总结报告
- 更新智能体信誉和性能记录
- 归档协作过程和结果数据
2. 协作模式选择决策树
详细说明:
决策因素分析:
- 任务复杂度评估:
- 简单任务:目标明确、步骤清晰、结果可预测
- 复杂任务:多目标、不确定性高、需要动态调整
- 执行者数量考虑:
- 单个执行者:任务规模小、专业要求集中
- 多个执行者:需要分工协作、资源共享
- 任务依赖性分析:
- 强依赖:严格的执行顺序、中间结果依赖
- 弱依赖:可并行执行、独立性较强
- 资源竞争程度:
- 高竞争:稀缺资源、需要优化分配
- 低竞争:资源充足、分配简单
协作模式特性:
- 直接分配模式:
- 适用场景:简单任务、单个专业执行者
- 优势:决策简单、执行高效
- 劣势:单点故障风险、缺乏冗余
- 主从协作模式:
- 适用场景:需要集中协调的多参与者任务
- 优势:职责明确、协调高效
- 劣势:中心节点压力大、灵活性较低
- 流水线协作模式:
- 适用场景:强依赖的序列化任务
- 优势:专业化分工、质量控制容易
- 劣势:瓶颈效应、整体效率受限于最慢环节
- 市场竞标模式:
- 适用场景:资源竞争激烈的环境
- 优势:资源优化配置、激励相容
- 劣势:协商成本高、可能产生投机行为
- 对等协作模式:
- 适用场景:平等参与者间的弱依赖协作
- 优势:鲁棒性强、灵活性高
- 劣势:协调复杂、决策效率较低
3. 主从协作模式流程图
流程说明:
任务分解过程:
- 主智能体分析任务的复杂性和结构特征
- 识别子任务间的数据依赖和执行顺序约束
- 选择基于功能、数据或流程的分解策略
- 制定详细的子任务规格说明书
任务分配机制:
- 基于从智能体的能力专长进行任务匹配
- 考虑当前负载均衡和资源可用性
- 建立任务接受确认机制
- 设定任务执行的约束条件和质量标准
并行执行管理:
- 各从智能体独立执行分配的子任务
- 处理任务执行过程中的资源申请和释放
- 监控子任务执行进度和资源消耗
- 管理子任务间的数据交换和同步
结果收集与整合:
- 主智能体按预定协议收集子任务结果
- 验证子结果的完整性和一致性
- 处理可能的结果冲突和不匹配
- 采用合适的算法融合各子结果
- 生成最终的协作输出
反馈与学习:
- 向所有参与者反馈最终结果
- 记录协作过程中的关键指标
- 更新各智能体的性能评价
- 总结经验用于未来协作优化
4. 任务分解与分配流程
流程说明:
任务接收与分析:
- 详细理解任务需求和目标
- 识别任务的技术约束和业务约束
- 评估任务的规模、复杂性和紧急程度
- 确定任务的优先级和质量要求
分解决策过程:
- 基于任务特性决定是否需要分解
- 考虑团队的执行能力和资源状况
- 评估分解带来的管理和协调成本
- 权衡分解粒度与执行效率的平衡
分解策略选择:
- 功能分解:
- 按不同的功能模块划分任务
- 每个模块具有明确的功能边界
- 定义清晰的模块接口和交互协议
- 适合功能独立性强的复杂系统
- 数据分解:
- 将大数据集划分为可并行处理的子集
- 各子任务处理不同的数据分区
- 需要设计有效的结果合并算法
- 适合数据密集型计算任务
- 时间分解:
- 按时间顺序将任务划分为多个阶段
- 每个阶段产生可验证的中间结果
- 建立阶段间的交付和验收机制
- 适合具有明显阶段性特征的项目
子任务规格制定:
- 明确每个子任务的目标和交付物
- 定义输入输出格式和数据标准
- 设定质量验收标准和完成条件
- 建立进度监控和报告机制
智能体能力匹配:
- 评估各智能体的技术能力和专业领域
- 考虑智能体的当前工作负载和可用性
- 分析智能体的历史表现和信誉评级
- 优化整体系统的负载均衡
任务分配与依赖管理:
- 基于能力匹配结果分配子任务
- 建立任务间的依赖关系图
- 识别关键路径和瓶颈环节
- 制定风险缓解和应急计划
5. 智能体能力匹配矩阵
详细说明:
任务需求维度分析:
- 计算能力:
- 处理复杂算法的能力
- 并行计算和分布式处理支持
- 实时计算性能要求
- 计算精度和准确性标准
- 存储能力:
- 数据存储容量需求
- 存储访问速度和IO性能
- 数据持久化和备份要求
- 存储安全性和隐私保护
- 专业知识:
- 领域特定的知识库
- 专业算法和模型掌握
- 行业标准和规范理解
- 问题诊断和解决经验
- 响应速度:
- 任务响应的实时性要求
- 处理延迟的容忍度
- 并发请求处理能力
- 系统吞吐量性能
- 可靠性:
- 系统可用性和稳定性
- 故障恢复和容错能力
- 服务质量保证级别
- 安全性和可信度评级
智能体能力评估:
- 智能体A:
- 优势:强大的计算能力、深厚的专业知识、高可靠性
- 适用场景:复杂计算任务、专业问题解决、关键业务处理
- 限制:可能成本较高、资源占用大
- 智能体B:
- 优势:大容量存储能力、专业领域知识
- 适用场景:数据密集型任务、知识检索和处理
- 限制:计算能力相对有限、响应速度中等
- 智能体C:
- 优势:高性能计算、快速响应能力
- 适用场景:实时计算、高吞吐量处理
- 限制:专业知识相对缺乏、可靠性中等
- 智能体D:
- 优势:均衡的存储和响应能力、高可靠性
- 适用场景:综合型任务、需要稳定服务的场景
- 限制:专业深度可能不足、计算能力有限
匹配策略:
- 基于任务需求的优先级进行加权匹配
- 考虑智能体的当前负载和可用性
- 评估历史合作表现和信誉评级
- 优化整体系统的资源利用效率
七、Agent2Agent 框架通信示例
示例实现一个简单的Agent2Agent通信框架,展示了多智能体系统的基本工作原理。下面我将逐部分分解代码并说明其设计意图。
1. 基础定义
1.1 消息类型枚举
class MessageType(Enum):
TASK_REQUEST = "task_request" # 任务请求
TASK_RESPONSE = "task_response" # 任务响应
GREETING = "greeting" # 问候消息
- 定义标准化的消息类型,确保智能体间通信的一致性
- 便于消息路由和处理逻辑的分发
- 支持系统扩展,可以轻松添加新的消息类型
1.2 消息数据结构
@dataclass
class A2AMessage:
message_id: str # 消息唯一ID
sender_id: str # 发送者ID
receiver_id: str # 接收者ID
message_type: MessageType # 消息类型
content: Dict[str, Any] # 消息内容
timestamp: float # 时间戳
- 提供统一的消息格式,包含必要的元数据
- 支持消息追踪和调试(message_id, timestamp)
- 灵活的内容结构,适应不同类型的通信需求
- 便于序列化和网络传输
2. 通信传输层:SimpleTransport类
class SimpleTransport:
def __init__(self):
self.agents: Dict[str, callable] = {} # 智能体注册表
- 模拟网络通信层,解耦智能体间的直接依赖
- 提供消息路由功能,根据接收者ID分发消息
- 支持异步消息传递,模拟真实网络环境
- 作为智能体注册中心,管理参与协作的智能体
2.1 注册智能体:
def register_agent(self, agent_id: str, message_handler: callable):
self.agents[agent_id] = message_handler
- 建立智能体ID到消息处理函数的映射
- 支持动态加入和退出协作系统
2.2 发送消息:
async def send_message(self, message: A2AMessage):
# 检查接收者是否存在
# 模拟网络延迟
# 调用接收者的消息处理函数
- 提供可靠的消息传递机制
- 包含基本的错误检查(接收者存在性验证)
- 模拟真实网络环境(延迟、异步处理)
3. 智能体基类:SimpleAgent类
class SimpleAgent:
def __init__(self, agent_id: str, name: str, transport: SimpleTransport):
self.agent_id = agent_id
self.name = name
self.transport = transport
self.conversation_history: List[A2AMessage] = []
- 提供智能体的共同属性和行为
- 封装消息发送和接收的基础功能
- 维护对话历史,支持调试和分析
- 实现模板方法模式,子类可以重写特定行为
3.1 消息发送:
async def send_message(self, receiver_id: str, message_type: MessageType, content: Dict[str, Any]):
# 创建标准化消息
# 记录到对话历史
# 通过传输层发送
- 封装消息创建过程,确保格式一致性
- 自动维护对话历史
- 提供简单的发送接口,隐藏底层复杂性
3.2 消息接收:
async def receive_message(self, message: A2AMessage):
# 记录接收到的消息
# 提供基础处理(子类可扩展)
- 作为消息处理的入口点
- 提供基础的日志记录功能
- 采用模板方法模式,支持子类定制化处理
4. 专用智能体实现:TaskManagerAgent任务管理器
class TaskManagerAgent(SimpleAgent):
def __init__(self, agent_id: str, name: str, transport: SimpleTransport):
super().__init__(agent_id, name, transport)
self.pending_tasks: Dict[str, Dict] = {} # 待处理任务
self.completed_tasks: Dict[str, Any] = {} # 已完成任务
- 实现任务协调者的角色
- 负责任务生命周期管理(创建、分配、跟踪)
- 维护任务状态,支持进度监控
- 处理工作者的响应和结果收集
4.1 任务创建与分配:
async def create_task(self, task_description: str, worker_id: str):
# 生成唯一任务ID
# 保存任务信息到待处理列表
# 发送任务请求给指定工作者
- 封装任务分配逻辑
- 自动生成任务标识,便于跟踪
- 维护任务状态,确保不丢失
4.2 消息处理扩展:
async def receive_message(self, message: A2AMessage):
await super().receive_message(message) # 基础处理
if message.message_type == MessageType.TASK_RESPONSE:
await self.handle_task_response(message)
elif message.message_type == MessageType.GREETING:
await self.handle_greeting(message)
- 继承基础消息处理功能
- 根据消息类型分发到专门的处理方法
- 展示多态性的应用
5. WorkerAgent类:工作者智能体
class WorkerAgent(SimpleAgent):
def __init__(self, agent_id: str, name: str, transport: SimpleTransport, skills: List[str]):
super().__init__(agent_id, name, transport)
self.skills = skills
self.tasks_completed = 0
- 实现任务执行者的角色
- 封装特定领域的能力和技能
- 处理任务请求并生成结果
- 维护工作统计和性能指标
5.1 任务处理:
async def handle_task_request(self, message: A2AMessage):
# 解析任务信息
# 模拟任务执行(异步等待)
# 生成任务结果
# 发送任务完成响应
- 展示异步任务执行模式
- 模拟真实的工作负载和处理时间
- 封装结果生成逻辑
5.2 技能匹配处理:
def process_task(self, description: str) -> str:
if "计算" in description or "数学" in description:
return f"计算完成: 42 (使用 {self.skills} 技能)"
# 其他技能分支...
- 根据任务描述匹配相应的处理逻辑
- 展示基于内容的决策机制
- 集成智能体的技能信息到结果中
6. 程序启动运行
6.1 基础演示
async def demo_agent_communication():
# 创建传输层和智能体实例
# 演示问候消息交换
# 演示任务分配和执行流程
# 展示统计信息
- 展示最基本的A2A通信场景
- 验证消息传递的正确性
- 演示任务生命周期的完整流程
- 提供可视化的执行跟踪
6.2 高级演示
async def advanced_demo():
# 创建多个专业智能体
# 并行分配不同类型任务
# 展示跨领域协作
- 展示多智能体协作的复杂性
- 演示基于技能的智能体专业化
- 验证系统的可扩展性
- 展示并行任务处理能力
运行结果:
==================================================
Agent2Agent 通信演示开始
==================================================
传输层: 智能体 manager_001 已注册
智能体初始化: 任务管理器 (ID: manager_001)
传输层: 智能体 worker_001 已注册
智能体初始化: AI工作者 (ID: worker_001)
============================== 场景1: 简单问候 ==============================
传输层: manager_001 -> worker_001 [greeting]
AI工作者 收到来自 manager_001 的消息: greeting
============================== 场景2: 任务分配 ==============================
--- 任务 1 ---
任务管理器 创建任务: 计算复杂的数学公式
传输层: manager_001 -> worker_001 [task_request]
AI工作者 收到来自 manager_001 的消息: task_request
AI工作者 开始处理任务: 计算复杂的数学公式
传输层: worker_001 -> manager_001 [task_response]
任务管理器 收到来自 worker_001 的消息: task_response
任务管理器 收到任务完成通知: task_1759064250
结果: 计算完成: 42 (使用 ['Python编程', '数据分析', '机器学习'] 技能)
AI工作者 完成任务: 计算复杂的数学公式
--- 任务 2 ---
任务管理器 创建任务: 分析销售数据趋势
传输层: manager_001 -> worker_001 [task_request]
AI工作者 收到来自 manager_001 的消息: task_request
AI工作者 开始处理任务: 分析销售数据趋势
传输层: worker_001 -> manager_001 [task_response]
任务管理器 收到来自 worker_001 的消息: task_response
任务管理器 收到任务完成通知: task_1759064253
结果: 分析报告: 数据趋势良好
AI工作者 完成任务: 分析销售数据趋势
--- 任务 3 ---
任务管理器 创建任务: 编写Python数据处理代码
传输层: manager_001 -> worker_001 [task_request]
AI工作者 收到来自 manager_001 的消息: task_request
AI工作者 开始处理任务: 编写Python数据处理代码
传输层: worker_001 -> manager_001 [task_response]
任务管理器 收到来自 worker_001 的消息: task_response
任务管理器 收到任务完成通知: task_1759064256
结果: 代码已编写: def solution(): return 'Hello World'
AI工作者 完成任务: 编写Python数据处理代码
--- 任务 4 ---
任务管理器 创建任务: 生成月度报告摘要
传输层: manager_001 -> worker_001 [task_request]
AI工作者 收到来自 manager_001 的消息: task_request
AI工作者 开始处理任务: 生成月度报告摘要
传输层: worker_001 -> manager_001 [task_response]
任务管理器 收到来自 worker_001 的消息: task_response
任务管理器 收到任务完成通知: task_1759064259
结果: 任务 '生成月度报告摘要' 已处理完成
AI工作者 完成任务: 生成月度报告摘要
============================== 结果统计 ==============================
任务管理器统计:
总消息数: 9
完成任务: 4
待处理任务: 0
工作者统计:
总消息数: 9
完成任务: 4
==================================================
Agent2Agent 通信演示完成!
==================================================
******************** 高级演示 ********************
传输层: 智能体 manager_1 已注册
智能体初始化: 高级管理器 (ID: manager_1)
传输层: 智能体 worker_1 已注册
智能体初始化: 开发专家 (ID: worker_1)
传输层: 智能体 worker_2 已注册
智能体初始化: 数据分析师 (ID: worker_2)
传输层: 智能体 worker_3 已注册
智能体初始化: 技术作家 (ID: worker_3)
高级管理器 创建任务: 开发新的数据处理模块
传输层: manager_1 -> worker_1 [task_request]
高级管理器 创建任务: 分析用户行为数据
传输层: manager_1 -> worker_2 [task_request]
高级管理器 创建任务: 编写项目文档
传输层: manager_1 -> worker_3 [task_request]
高级管理器 创建任务: 优化算法性能
传输层: manager_1 -> worker_1 [task_request]
开发专家 收到来自 manager_1 的消息: task_request
开发专家 开始处理任务: 开发新的数据处理模块
技术作家 收到来自 manager_1 的消息: task_request
传输层: manager_1 -> worker_1 [task_request]
开发专家 收到来自 manager_1 的消息: task_request
开发专家 开始处理任务: 开发新的数据处理模块
技术作家 收到来自 manager_1 的消息: task_request
开发专家 收到来自 manager_1 的消息: task_request
开发专家 开始处理任务: 开发新的数据处理模块
技术作家 收到来自 manager_1 的消息: task_request
技术作家 收到来自 manager_1 的消息: task_request
技术作家 开始处理任务: 编写项目文档
数据分析师 收到来自 manager_1 的消息: task_request
数据分析师 收到来自 manager_1 的消息: task_request
数据分析师 开始处理任务: 分析用户行为数据
数据分析师 开始处理任务: 分析用户行为数据
开发专家 收到来自 manager_1 的消息: task_request
开发专家 开始处理任务: 优化算法性能
传输层: worker_1 -> manager_1 [task_response]
传输层: worker_2 -> manager_1 [task_response]
传输层: worker_3 -> manager_1 [task_response]
传输层: worker_1 -> manager_1 [task_response]
高级管理器 收到来自 worker_2 的消息: task_response
高级管理器 收到任务完成通知: task_1759064261
结果: 分析报告: 数据趋势良好
数据分析师 完成任务: 分析用户行为数据
高级管理器 收到来自 worker_3 的消息: task_response
技术作家 完成任务: 编写项目文档
高级管理器 收到来自 worker_1 的消息: task_response
开发专家 完成任务: 开发新的数据处理模块
高级管理器 收到来自 worker_1 的消息: task_response
开发专家 完成任务: 优化算法性能
高级演示完成!
管理器处理了 8 条消息
完成了 1 个跨领域任务
八、应用场景
- 复杂问题求解:多个专业智能体协作解决单一智能体难以处理的复杂问题
- 分布式系统管理:智能体协同管理分布式资源和服务
- 多模态AI系统:不同模态的AI智能体(文本、图像、语音)协同工作
- 自动化工作流:智能体协作完成端到端的自动化流程
九、总结
Agent2Agent框架为构建智能体间通信与协作系统提供了坚实的基础。通过标准化的消息格式、灵活的传输层和丰富的协作模式,开发者可以快速构建能够解决复杂问题的多智能体系统。随着人工智能技术的不断发展,A2A框架将在构建更智能、更自治的AI系统中发挥重要作用。
该框架具有良好的扩展性,可以根据具体应用场景添加新的通信协议、协作算法和容错机制,满足不同领域的需求。
附录:示例完整代码
import asyncio
import time
import uuid
from dataclasses import dataclass
from typing import Dict, Any, List, Optional
from enum import Enum
# ==================== 基础定义 ====================
class MessageType(Enum):
"""消息类型枚举"""
TASK_REQUEST = "task_request" # 任务请求
TASK_RESPONSE = "task_response" # 任务响应
GREETING = "greeting" # 问候消息
@dataclass
class A2AMessage:
"""
A2A消息类
定义智能体间通信的标准消息格式
"""
message_id: str # 消息唯一ID
sender_id: str # 发送者ID
receiver_id: str # 接收者ID
message_type: MessageType # 消息类型
content: Dict[str, Any] # 消息内容
timestamp: float # 时间戳
def to_dict(self) -> Dict[str, Any]:
"""将消息转换为字典格式,便于传输"""
return {
"message_id": self.message_id,
"sender_id": self.sender_id,
"receiver_id": self.receiver_id,
"message_type": self.message_type.value,
"content": self.content,
"timestamp": self.timestamp
}
class SimpleTransport:
"""
简单传输层
负责智能体间的消息传递(模拟网络通信)
"""
def __init__(self):
# 存储注册的智能体和它们的消息处理函数
self.agents: Dict[str, callable] = {}
def register_agent(self, agent_id: str, message_handler: callable):
"""
注册智能体到传输层
Args:
agent_id: 智能体ID
message_handler: 消息处理函数
"""
self.agents[agent_id] = message_handler
print(f" 传输层: 智能体 {agent_id} 已注册")
async def send_message(self, message: A2AMessage):
"""
发送消息到目标智能体
Args:
message: 要发送的消息
"""
print(f" 传输层: {message.sender_id} -> {message.receiver_id} "
f"[{message.message_type.value}]")
# 检查接收者是否存在
if message.receiver_id not in self.agents:
print(f" 错误: 智能体 {message.receiver_id} 未注册")
return
# 获取接收者的消息处理函数
receiver_handler = self.agents[message.receiver_id]
# 模拟网络延迟
await asyncio.sleep(0.1)
# 将消息传递给接收者
await receiver_handler(message)
# ==================== 智能体定义 ====================
class SimpleAgent:
"""
简单智能体基类
所有智能体的共同属性和行为
"""
def __init__(self, agent_id: str, name: str, transport: SimpleTransport):
"""
初始化智能体
Args:
agent_id: 智能体唯一标识
name: 智能体名称
transport: 传输层实例
"""
self.agent_id = agent_id
self.name = name
self.transport = transport
self.conversation_history: List[A2AMessage] = []
# 向传输层注册自己
self.transport.register_agent(self.agent_id, self.receive_message)
print(f" 智能体初始化: {self.name} (ID: {self.agent_id})")
async def send_message(self, receiver_id: str, message_type: MessageType, content: Dict[str, Any]):
"""
发送消息给其他智能体
Args:
receiver_id: 接收者ID
message_type: 消息类型
content: 消息内容
"""
# 创建消息对象
message = A2AMessage(
message_id=str(uuid.uuid4()), # 生成唯一ID
sender_id=self.agent_id,
receiver_id=receiver_id,
message_type=message_type,
content=content,
timestamp=time.time()
)
# 保存到对话历史
self.conversation_history.append(message)
# 通过传输层发送消息
await self.transport.send_message(message)
async def receive_message(self, message: A2AMessage):
"""
接收消息的通用处理(子类可以重写)
Args:
message: 接收到的消息
"""
print(f" {self.name} 收到来自 {message.sender_id} 的消息: {message.message_type.value}")
# 保存到对话历史
self.conversation_history.append(message)
def get_conversation_summary(self):
"""获取对话摘要"""
return f"{self.name} 进行了 {len(self.conversation_history)} 次消息交换"
class TaskManagerAgent(SimpleAgent):
"""
任务管理器智能体
负责创建和分配任务
"""
def __init__(self, agent_id: str, name: str, transport: SimpleTransport):
super().__init__(agent_id, name, transport)
self.pending_tasks: Dict[str, Dict] = {} # 待处理任务
self.completed_tasks: Dict[str, Any] = {} # 已完成任务
async def create_task(self, task_description: str, worker_id: str):
"""
创建新任务并分配给工作者
Args:
task_description: 任务描述
worker_id: 工作者智能体ID
"""
task_id = f"task_{int(time.time())}"
print(f" {self.name} 创建任务: {task_description}")
# 保存任务信息
self.pending_tasks[task_id] = {
"description": task_description,
"worker": worker_id,
"status": "assigned",
"created_time": time.time()
}
# 发送任务请求
await self.send_message(
receiver_id=worker_id,
message_type=MessageType.TASK_REQUEST,
content={
"task_id": task_id,
"description": task_description,
"priority": "normal"
}
)
async def receive_message(self, message: A2AMessage):
"""
处理接收到的消息(重写父类方法)
"""
await super().receive_message(message) # 调用父类基础处理
if message.message_type == MessageType.TASK_RESPONSE:
# 处理任务响应
await self.handle_task_response(message)
elif message.message_type == MessageType.GREETING:
# 处理问候消息
await self.handle_greeting(message)
async def handle_task_response(self, message: A2AMessage):
"""
处理任务完成响应
Args:
message: 任务响应消息
"""
task_id = message.content.get("task_id")
result = message.content.get("result")
status = message.content.get("status")
if task_id in self.pending_tasks:
if status == "completed":
# 任务完成
self.completed_tasks[task_id] = {
**self.pending_tasks[task_id],
"result": result,
"completion_time": time.time()
}
del self.pending_tasks[task_id]
print(f" {self.name} 收到任务完成通知: {task_id}")
print(f" 结果: {result}")
else:
print(f" 任务 {task_id} 失败: {result}")
async def handle_greeting(self, message: A2AMessage):
"""
处理问候消息
Args:
message: 问候消息
"""
greeting_text = message.content.get("text", "")
print(f" {self.name} 收到问候: {greeting_text}")
class WorkerAgent(SimpleAgent):
"""
工作者智能体
负责执行具体任务
"""
def __init__(self, agent_id: str, name: str, transport: SimpleTransport, skills: List[str]):
"""
初始化工作者智能体
Args:
skills: 技能列表
"""
super().__init__(agent_id, name, transport)
self.skills = skills
self.tasks_completed = 0
async def receive_message(self, message: A2AMessage):
"""
处理接收到的消息
"""
await super().receive_message(message)
if message.message_type == MessageType.TASK_REQUEST:
# 处理任务请求
await self.handle_task_request(message)
async def handle_task_request(self, message: A2AMessage):
"""
处理任务请求并执行任务
Args:
message: 任务请求消息
"""
task_id = message.content.get("task_id")
description = message.content.get("description")
print(f" {self.name} 开始处理任务: {description}")
# 模拟任务执行时间
await asyncio.sleep(1)
# 生成任务结果(根据任务描述和技能)
result = self.process_task(description)
# 发送任务完成响应
await self.send_message(
receiver_id=message.sender_id,
message_type=MessageType.TASK_RESPONSE,
content={
"task_id": task_id,
"status": "completed",
"result": result,
"worker_skills_used": self.skills
}
)
self.tasks_completed += 1
print(f" {self.name} 完成任务: {description}")
def process_task(self, description: str) -> str:
"""
处理具体任务(根据描述生成结果)
Args:
description: 任务描述
Returns:
任务结果
"""
if "计算" in description or "数学" in description:
return f"计算完成: 42 (使用 {self.skills} 技能)"
elif "分析" in description:
return f"分析报告: 数据趋势良好"
elif "编写" in description or "代码" in description:
return f"代码已编写: def solution(): return 'Hello World'"
else:
return f"任务 '{description}' 已处理完成"
# ==================== 演示程序 ====================
async def demo_agent_communication():
"""
演示智能体间通信的主函数
"""
print("=" * 50)
print(" Agent2Agent 通信演示开始")
print("=" * 50)
# 创建传输层实例
transport = SimpleTransport()
# 创建智能体实例
task_manager = TaskManagerAgent(
agent_id="manager_001",
name="任务管理器",
transport=transport
)
worker_agent = WorkerAgent(
agent_id="worker_001",
name="AI工作者",
transport=transport,
skills=["Python编程", "数据分析", "机器学习"]
)
# 等待一下让注册完成
await asyncio.sleep(0.1)
print("\n" + "="*30 + " 场景1: 简单问候 " + "="*30)
# 场景1: 简单问候
await task_manager.send_message(
receiver_id=worker_agent.agent_id,
message_type=MessageType.GREETING,
content={"text": "你好,我是任务管理器!"}
)
await asyncio.sleep(0.5) # 等待消息处理
print("\n" + "="*30 + " 场景2: 任务分配 " + "="*30)
# 场景2: 任务分配和执行
tasks = [
"计算复杂的数学公式",
"分析销售数据趋势",
"编写Python数据处理代码",
"生成月度报告摘要"
]
for i, task_desc in enumerate(tasks, 1):
print(f"\n--- 任务 {i} ---")
await task_manager.create_task(task_desc, worker_agent.agent_id)
await asyncio.sleep(1.5) # 等待任务完成
print("\n" + "="*30 + " 结果统计 " + "="*30)
# 显示结果统计
print(f" 任务管理器统计:")
print(f" 总消息数: {len(task_manager.conversation_history)}")
print(f" 完成任务: {len(task_manager.completed_tasks)}")
print(f" 待处理任务: {len(task_manager.pending_tasks)}")
print(f"\n 工作者统计:")
print(f" 总消息数: {len(worker_agent.conversation_history)}")
print(f" 完成任务: {worker_agent.tasks_completed}")
print("\n" + "="*50)
print(" Agent2Agent 通信演示完成!")
print("=" * 50)
async def advanced_demo():
"""
高级演示:多个智能体协作
"""
print("\n" + "*"*20 + " 高级演示 " + "*"*20)
transport = SimpleTransport()
# 创建多个智能体
manager = TaskManagerAgent("manager_1", "高级管理器", transport)
# 创建具有不同技能的多个工作者
developer = WorkerAgent("worker_1", "开发专家", transport,
["Python", "Java", "系统设计"])
analyst = WorkerAgent("worker_2", "数据分析师", transport,
["统计分析", "数据可视化", "机器学习"])
writer = WorkerAgent("worker_3", "技术作家", transport,
["文档编写", "技术说明", "API文档"])
await asyncio.sleep(0.1)
# 并行分配任务给不同的工作者
tasks = [
("开发新的数据处理模块", developer.agent_id),
("分析用户行为数据", analyst.agent_id),
("编写项目文档", writer.agent_id),
("优化算法性能", developer.agent_id),
]
# 同时启动所有任务
task_coroutines = [
manager.create_task(desc, worker_id)
for desc, worker_id in tasks
]
await asyncio.gather(*task_coroutines)
# 等待所有任务完成
await asyncio.sleep(3)
print(f"\n 高级演示完成!")
print(f" 管理器处理了 {len(manager.conversation_history)} 条消息")
print(f" 完成了 {len(manager.completed_tasks)} 个跨领域任务")
if __name__ == "__main__":
# 运行基础演示
asyncio.run(demo_agent_communication())
# 运行高级演示
asyncio.run(advanced_demo())
更多推荐
所有评论(0)