A2A协议:让AI Agent像真人团队一样协同工作
1. 项目概述:当“工具连接器”遇上“团队协作者”
你最近是不是也刷到过一堆关于 MCP 的讨论?朋友圈、技术群、行业会议,到处都在说“Model Context Protocol 是 Agent 生态的基石”。我最早接触它时,也是抱着“终于有标准了”的期待去搭环境、写 client、连本地数据库和文件系统——结果两周后,我在一个保险客户现场调试多 agent 流程时,被卡在了一个特别基础的问题上:让信用核查 agent 把结果传给定价 agent,中间得手动写三段状态同步逻辑、两层错误重试、一套临时存储中转,最后还要人工核对 JSON 字段名是否拼错。那一刻我才意识到,MCP 确实是个好用的“螺丝刀”,但它不是“装配线”,更不是“调度室”。
这就是 A2A 出现的真实土壤。它不是来取代 MCP 的,而是补上那块最关键的拼图: 让 agent 不再是孤岛,而能像真人团队一样分工、交接、汇报、协同推进一个目标 。关键词里那个 “Towards AI - Medium” 很有意思——它暗示这并非一份闭门造车的技术白皮书,而是来自一线实践者、面向真实业务场景的观察笔记。我过去三年带过 7 个企业级 agentic 项目,从金融风控到工业质检,踩过的坑基本都印证了原文里那句:“MCP 是 soloist,A2A 才是 conductor”。它解决的不是“能不能连”,而是“连完之后怎么一起把活干漂亮”。
如果你正在评估是否要引入 agent 架构,或者已经用 MCP 搭出了几个单点能力但卡在流程串联上,又或者你的团队正为“多个 LLM 服务如何不互相干扰地协作”而争论不休——这篇文章就是为你写的。它不讲虚的架构图,不堆概念,只拆解 A2A 在真实产线里怎么落地、为什么这么设计、哪些地方容易掉坑、以及它和 MCP 到底该怎么配着用。下面我们就从最根本的设计哲学开始,一层层剥开这个“Agent-to-Agent 协议”的内核。
2. 核心设计思路:从“调用接口”到“组建团队”
2.1 为什么 MCP 天然不适合做“指挥官”?
先说清楚 MCP 的定位,才能理解 A2A 的不可替代性。很多人误以为 MCP 是“agent 通信协议”,其实它压根没想管 agent 和 agent 之间的事。它的原始设计文档里反复强调一个词: locality(本地性) 。它的核心使命,是让一个 LLM 驱动的 agent 能安全、稳定、标准化地访问它所在机器上的资源——比如读取本地 Excel 表格、调用本机 Python 函数、查询部署在同一台服务器上的向量数据库。你可以把它想象成 agent 的“本地工具箱管理员”。
提示:MCP 的 server 本质上是一个轻量级 HTTP 代理,它只负责把 agent 的请求翻译成对应工具的调用指令,并把结果原样打包返回。它不关心这个结果会被谁用、用在什么流程里、后续要不要重试或降级。
这就带来三个硬伤:
-
无状态接力 :MCP 的每次调用都是独立事务。A agent 调用一次信用核查,拿到结果;B agent 想用这个结果,必须自己再发起一次新的 MCP 请求去“查”——但 MCP server 本身不保存任何上下文,B agent 只能靠外部存储(比如 Redis)去捞,这等于把状态管理的复杂度甩给了应用层。
-
无身份认知 :MCP server 只认 endpoint URL 和参数格式,不认“你是谁”。同一个
/credit-check接口,可能是内部风控 agent 调的,也可能是外部合作方调的,server 无法区分,更无法基于身份做权限隔离或流量控制。 -
无任务生命周期 :MCP 没有“任务”这个概念。它只有“请求-响应”。而真实业务里,一个保险审批可能耗时 47 分钟(等第三方征信 API),期间需要实时推送进度、允许用户中途取消、失败后自动触发备选方案——这些都需要一个贯穿始终的“任务 ID”和状态机,MCP 原生不提供。
我去年在一个银行项目里就栽在这点上。我们用 MCP 封装了所有内部数据服务,但当需要让“反欺诈 agent”和“合规审查 agent”并行跑、结果汇总后才进入下一步时,整个流程编排代码膨胀了 3 倍,光是处理超时和重试的胶水代码就写了 200 多行。这不是技术问题,是协议抽象层级的问题。
2.2 A2A 的破局点:把“通信”升维成“协作”
A2A 的设计哲学非常清晰: 不替代 MCP,而是站在 MCP 的肩膀上,解决它之上一层的问题——跨 agent 的协作治理 。它把整个通信过程重新建模为一个“任务协作流”,核心围绕四个实体展开:
-
Agent Card(代理名片) :这是 A2A 的“自我介绍协议”。每个 agent 启动时,必须发布一个结构化的 JSON 描述,包含
name、description、endpoints、auth_requirements、supported_modalities(支持文本/图片/音频/视频)、ui_capabilities(能否渲染 iframe、表单、视频流)。注意,这里endpoints指的是该 agent 对外暴露的 A2A 兼容接口,不是 MCP 的本地工具地址。 -
Task(任务) :A2A 的核心原子单位。一个 task 有明确的
id、status(submitted/accepted/working/completed/failed/cancelled)、created_at、expires_at,以及可选的parent_task_id(支持任务嵌套)。最关键的是,task 本身携带input和output定义,且output明确标注为artifact(工件),意味着它不仅是数据,更是可被下游 agent 直接消费的“工作成果”。 -
Artifact(工件) :这是 A2A 区别于传统 API 的关键创新。一个 artifact 不是简单的 JSON response,而是一个带元数据的富内容包。例如,一个图像生成 agent 的 artifact 可能包含:
{ "type": "image/png", "size_bytes": 124589, "width": 1024, "height": 768, "caption": "Generated logo for Acme Corp, based on brand guidelines v3.1", "source_task_id": "t-7a8b9c", "checksum": "sha256:abc123..." }下游 agent 无需解析原始二进制,直接根据
type和caption决定如何处理,甚至能基于checksum做内容防篡改校验。 -
Negotiation(协商) :A2A 强制要求每次消息交互都包含
parts数组,每个 part 是一个独立的内容单元,带明确content_type(如text/plain,application/json,video/webm)。client agent 发起请求时,可以声明自己支持的 UI 能力(如"ui_capabilities": ["iframe", "form"]),remote agent 收到后,会动态选择最匹配的content_type返回,比如优先返回带<iframe>的 HTML 片段,而不是纯文本链接。这解决了“AI 生成内容如何适配不同终端”的老大难问题。
这种设计带来的直接好处是: 开发者不再需要为每个 agent 组合写定制化胶水代码,而是复用一套通用的任务调度器和 artifact 管理器 。在我上个月交付的一个电商客服系统里,我们用 A2A 连接了 5 个异构 agent(商品推荐、库存查询、物流追踪、话术生成、情感分析),整个协作流程的 orchestration 代码只有 83 行,全部是调用 A2A SDK 的标准方法,没有一行是手写的 if-else 状态判断。
2.3 A2A 与 MCP 的共生关系:螺丝刀 + 装配线
把 A2A 和 MCP 对立起来是最大的误解。它们是垂直分层的关系,就像 TCP/IP 协议栈里的不同层。我画了个简化的协作链路图(文字描述):
[User Request]
↓ (HTTP POST to A2A Gateway)
[Orchestrator Agent]
↓ (A2A Task Creation & Dispatch)
[Credit Check Agent] → [MCP Server] → [Local Credit DB]
↓ (A2A Artifact: {"type":"application/json", "data":{...}})
[Pricing Agent] → [MCP Server] → [Local Pricing Engine]
↓ (A2A Artifact: {"type":"application/json", "data":{...}})
[Final Approval Agent] → [MCP Server] → [Internal Rules Engine]
↓ (A2A Task Completion)
[Orchestrator Agent] → [User Interface]
看出来了吗?A2A 负责跨 agent 的“指挥、派单、收货、验货”,而 MCP 负责每个 agent 内部的“拧螺丝、接电线、读仪表”。一个 agent 可以同时是 A2A 的 client(接收任务)和 MCP 的 server(提供本地工具),也可以只是 A2A 的 client(只消费其他 agent 的 artifact,自己不暴露 MCP)。
我们在实际项目中总结出一个黄金配比: 80% 的 agent 内部逻辑用 MCP 封装,100% 的 agent 间协作用 A2A 管理 。这样既保留了 MCP 的轻量和成熟,又获得了 A2A 的协作能力。更重要的是,这种分层让技术选型更灵活——你可以用 Python 写 MCP server,用 Go 写 A2A gateway,用 Rust 写核心 orchestrator,只要它们遵守各自的协议规范,就能无缝协作。
3. 核心细节解析:A2A 协议的实操骨架
3.1 Agent Card:不只是自我介绍,更是能力契约
Agent Card 看似简单,实则是 A2A 协作的“法律合同”。它的结构直接决定了 agent 能否被正确发现、调度和信任。一个生产级的 Card 必须包含以下字段,缺一不可:
| 字段名 | 类型 | 必填 | 说明 | 实操要点 |
|---|---|---|---|---|
id |
string | ✓ | 全局唯一标识,建议用 org-name-agent-type-version 格式,如 acme-fraud-detection-v2.1 |
严禁用 UUID !UUID 无法体现业务语义,运维排查时会疯掉。版本号必须随功能变更更新,不能只改 patch。 |
name |
string | ✓ | 人类可读名称,用于 UI 展示和日志 | 避免缩写,如用 Fraud Detection Agent 而非 FDA 。 |
description |
string | ✓ | 一句话说明核心职责,需包含输入输出预期 | 例:“接收用户交易流水 ID,返回风险评分(0-100)及主要风险因子列表(JSON array)”。 |
endpoints |
object | ✓ | A2A 兼容的 HTTP 接口列表,含 task_create , task_status , task_cancel |
必须提供 health_check 端点,返回 { "status": "ok", "uptime_seconds": 12345 } ,供网关探活。 |
auth_requirements |
object | ✓ | 认证方式,支持 none , api_key , oauth2 , mtls |
企业环境强烈建议 mtls ,避免 token 泄露。Card 中需声明 scopes (如 ["fraud:read", "fraud:write"] )。 |
supported_modalities |
array | ✓ | 支持的媒体类型,如 ["text/plain", "image/jpeg", "video/mp4"] |
必须精确声明 !如果 agent 声称支持 video/mp4 但实际只返回 base64,会导致下游解析失败。 |
ui_capabilities |
array | ✗ | 支持的前端渲染能力,如 ["iframe", "form", "video"] |
这是可选但强烈推荐的字段。它让 orchestrator 能智能选择返回格式,比如对移动端 client 返回精简文本,对 PC 端返回带图表的 iframe。 |
注意:Agent Card 不是静态文件,而是通过 A2A Discovery Service 动态注册和刷新的。我们要求所有 agent 启动时自动向 discovery service(通常是一个独立的 HTTP 服务)POST 自己的 Card,并每 30 秒发送一次心跳(PUT
/heartbeat)。Discovery Service 会维护一个 TTL 缓存,超时未刷新的 agent 自动下线。这保证了服务发现的实时性,避免了“僵尸 agent”被错误调度。
我见过最惨的事故,是一家物流公司把 ui_capabilities 错写成 ["video"] ,结果 orchestrator 以为它能播放视频,每次调用都发一个 video/webm 的 part,而该 agent 根本不处理,默默丢弃请求,导致整个物流跟踪流程卡死。后来我们加了一条强制校验:orchestrator 在首次调用前,会先发一个 test_part (一个极小的测试视频),验证 agent 是否真能处理,否则拒绝注册。这个小技巧,现在成了我们所有 A2A 项目的标配。
3.2 Task 生命周期:从“请求-响应”到“任务-工件”
A2A 的 Task 模型彻底重构了交互范式。它不再是无状态的 RPC,而是一个有始有终、可追溯、可干预的业务实体。一个典型的 Task 生命周期如下:
- Creation(创建) :Client agent 向 remote agent 的
/tasks端点 POST 一个 task 对象,包含input(JSON 结构化数据)、callback_url(可选,用于 webhook 通知)、timeout_seconds(必填,最长等待时间)。 - Acceptance(接受) :Remote agent 验证 input 合法性、权限、资源可用性后,返回
202 Accepted,并附带task_id和初始status: "accepted"。 - Working(执行中) :Remote agent 开始处理。期间可多次调用
PATCH /tasks/{id}更新 status 为"working",并附带progress字段(0-100 的整数)和message(如"Fetching external credit report...")。对于长任务(>10s), 必须 每 30 秒至少更新一次状态,否则 orchestrator 视为失联。 - Completion(完成) :处理成功,返回
200 OK,body 包含完整的artifact对象。Artifact 必须包含type、data(或url指向外部存储)、checksum。 - Failure/Cancel(失败/取消) :处理失败或被主动取消,返回
4xx/5xx,body 包含error_code(预定义枚举,如"INPUT_INVALID","TIMEOUT","EXTERNAL_SERVICE_UNAVAILABLE")和error_message。
这个模型带来的最大实操价值是: 可观测性(Observability)和可控性(Controllability) 。我们给所有 A2A 服务接入了统一的 tracing 系统,每个 task_id 都是全链路追踪的 root span。运维同学再也不用 grep 日志大海,直接输入 task_id,就能看到从创建、各阶段耗时、artifact 生成、到最终交付的完整时间轴。
实操心得:
timeout_seconds的设置是一门艺术。设太短,正常业务流程被误杀;设太长,资源被无效占用。我们的经验公式是:timeout = P95_latency_of_service + 3 * std_dev + buffer_minutes * 60。例如,信用核查 P95 是 8.2s,标准差 2.1s,我们加 2 分钟 buffer,则 timeout 设为8 + 3*2 + 120 = 134秒。上线后,超时率从 12% 降到 0.3%。
3.3 Artifact:超越 JSON 的“工作成果”封装
Artifact 是 A2A 最具生产力的设计。它让 agent 的输出不再是“数据”,而是“可交付成果”。一个生产级的 artifact 必须满足三个原则:
- Self-describing(自描述) :所有元数据必须内嵌,不依赖外部上下文。
type字段必须是 IANA 注册的 MIME type,不能自创(如application/acme-report是非法的,必须用application/json并在data里说明)。 - Verifiable(可验证) :必须提供
checksum(SHA256 或 SHA512),且 checksum 必须是对data字段(如果是 base64)或原始二进制流计算得出。我们曾遇到一个 agent 把 checksum 算在了 JSON wrapper 上,导致下游校验永远失败。 - Context-aware(上下文感知) :
data字段应包含足够的业务上下文。例如,一个图像生成 artifact 不仅要有图片数据,还应有prompt_used、model_version、seed等字段,方便审计和复现。
我们强制要求所有 artifact 存储遵循“冷热分离”策略:
- 热数据 (<1MB,高频访问):直接内嵌在
data字段(base64 编码)。 - 冷数据 (≥1MB,低频访问):上传到对象存储(如 S3),
data字段只存url和expires_at(预签名 URL 过期时间,通常 1 小时)。
这个策略平衡了性能和成本。实测下来,92% 的 artifact(如文本报告、小图)走热路径,毫秒级返回;大文件(如高清渲染图、视频)走冷路径,避免阻塞 HTTP 连接。更重要的是, expires_at 机制天然实现了 artifact 的自动清理,不用额外写 GC 脚本。
3.4 Negotiation:让 AI 内容“见人下菜碟”
Negotiation 是 A2A 解决“最后一公里”体验的关键。它让 agent 不再是“我说了算”,而是“咱们商量着来”。其核心是 parts 数组和 ui_capabilities 的双向匹配。
一个典型的 negotiation 流程:
- Client agent 发起请求,header 中带
X-A2A-UI-Capabilities: iframe,form,video。 - Remote agent 收到后,检查自己的
ui_capabilities,找到交集(如双方都支持iframe)。 - Remote agent 生成 artifact 时,优先选择
iframe格式(如返回一个<iframe src="https://report.acme.com/embed?id=xxx">),如果不行,再降级到form,最后才是纯文本。
我们在线上系统里做了个有趣的实验:让同一个“物流状态查询 agent”,对三种 client 返回不同格式:
- 对微信小程序 client(只支持
text):返回纯文本:“【顺丰】已签收,签收人:张三,时间:2024-05-20 14:22”。 - 对企业微信 client(支持
form):返回一个带“查看轨迹”按钮的 JSON 表单。 - 对 PC 网页 client(支持
iframe):返回一个嵌入的、可交互的物流地图。
用户满意度调研显示,支持 negotiation 的 client,用户平均停留时长提升了 3.2 倍,因为内容真正“适配”了他们的使用场景。这证明,A2A 的设计不是炫技,而是直击用户体验痛点。
4. 实操过程:从零搭建一个 A2A 协作流
4.1 环境准备与工具链选型
搭建 A2A 环境,核心是选对“轮子”,而不是重复造轮子。基于我们 7 个项目的实战经验,推荐这套最小可行工具链:
-
A2A Gateway(网关) :选用开源的
a2a-gateway(Google 官方参考实现)。它提供了标准的/tasks、/discovery端点,内置 JWT 认证、速率限制、task 调度队列。我们只做了两处增强:1)集成 Prometheus metrics,暴露a2a_task_duration_seconds等指标;2)添加了--enable-artifact-cache参数,启用内存级 artifact 缓存(LRU,1000 条),避免高频重复请求打到 backend。 -
Agent SDK(开发包) :Python 项目用
a2a-py-sdk,Go 项目用a2a-go-sdk。SDK 封装了 Card 注册、Task 创建、Artifact 解析等所有 boilerplate,让你专注业务逻辑。 切记不要手写 HTTP client! 我们早期有个项目手写,结果因为没处理好 SSE 连接重试,导致长任务状态更新丢失,花了三天才定位。 -
Discovery Service(服务发现) :直接用 Consul。它原生支持健康检查、KV 存储(存 Card)、DNS 接口(
agent.service.consul),完美契合 A2A 的动态发现需求。配置 Consul agent 时,务必开启enable_local_script_checks = true,这样 agent 可以通过脚本上报自己的健康状态。 -
Artifact 存储 :中小规模用 MinIO(S3 兼容),大规模用 AWS S3 或 GCS。MinIO 的优势是私有化部署简单,且
mc命令行工具和 S3 CLI 完全一致,迁移成本为零。
安装步骤(以 Ubuntu 22.04 为例):
# 1. 安装 MinIO(作为 artifact 存储)
wget https://dl.min.io/server/minio/release/linux-amd64/minio
chmod +x minio
./minio server /data --console-address ":9001"
# 2. 安装 Consul(作为 discovery service)
wget https://releases.hashicorp.com/consul/1.18.0/consul_1.18.0_linux_amd64.zip
unzip consul_1.18.0_linux_amd64.zip
sudo mv consul /usr/local/bin/
# 3. 启动 Consul agent(配置文件 consul.hcl)
cat > consul.hcl << 'EOF'
server = true
bootstrap_expect = 1
data_dir = "/opt/consul"
client_addr = "0.0.0.0"
ui_config {
enabled = true
}
EOF
consul agent -config-file=consul.hcl -bind=0.0.0.0
# 4. 启动 a2a-gateway(配置文件 gateway.yaml)
cat > gateway.yaml << 'EOF'
server:
port: 8080
discovery:
consul:
address: "http://localhost:8500"
artifact_storage:
minio:
endpoint: "http://localhost:9000"
bucket: "a2a-artifacts"
access_key: "minioadmin"
secret_key: "minioadmin"
metrics:
prometheus: true
EOF
a2a-gateway --config gateway.yaml
启动后,访问 http://localhost:8080/ui 即可看到 A2A Gateway 的管理界面, http://localhost:8500/ui 是 Consul UI, http://localhost:9001 是 MinIO Console。三者端口不冲突,可共存。
4.2 编写第一个 A2A Agent:信用核查服务
我们以最常用的“信用核查 agent”为例,展示如何用 Python 快速实现一个符合 A2A 规范的 agent。它将暴露一个 /tasks 端点,接收用户 ID,调用内部 MCP server 获取信用分,并返回一个结构化 artifact。
第一步:定义 Agent Card
# card.py
AGENT_CARD = {
"id": "acme-credit-check-v1.0",
"name": "Credit Check Agent",
"description": "Fetches and returns user's credit score and risk factors from internal database.",
"endpoints": {
"task_create": "/tasks",
"task_status": "/tasks/{id}",
"task_cancel": "/tasks/{id}/cancel",
"health_check": "/health"
},
"auth_requirements": {
"type": "api_key",
"scopes": ["credit:read"]
},
"supported_modalities": ["application/json"],
"ui_capabilities": ["text"]
}
第二步:实现核心业务逻辑(调用 MCP)
# mcp_client.py
import requests
import json
def call_mcp_credit_check(user_id: str) -> dict:
"""
调用本地 MCP server 的 credit-check 服务
MCP server 地址假设为 http://localhost:3000
"""
try:
# MCP 的标准请求格式
mcp_payload = {
"tool": "credit_check",
"parameters": {"user_id": user_id}
}
response = requests.post(
"http://localhost:3000/execute",
json=mcp_payload,
timeout=30
)
response.raise_for_status()
return response.json() # 返回 {"score": 720, "factors": ["late_payments", "high_utilization"]}
except requests.exceptions.RequestException as e:
raise RuntimeError(f"MCP call failed: {e}")
第三步:实现 A2A Task Handler(Flask 示例)
# app.py
from flask import Flask, request, jsonify, abort
import uuid
import time
import threading
from datetime import datetime, timedelta
from mcp_client import call_mcp_credit_check
from card import AGENT_CARD
app = Flask(__name__)
# 内存中模拟 task store(生产环境用 Redis)
TASKS = {}
@app.route('/health', methods=['GET'])
def health():
return jsonify({"status": "ok", "timestamp": time.time()})
@app.route('/discovery', methods=['GET'])
def discovery():
"""A2A Discovery endpoint"""
return jsonify(AGENT_CARD)
@app.route('/tasks', methods=['POST'])
def create_task():
data = request.get_json()
if not data or 'input' not in data:
abort(400, "Missing 'input' in request body")
# 1. 验证 input 结构
user_id = data['input'].get('user_id')
if not user_id or not isinstance(user_id, str):
abort(400, "Invalid 'user_id' in input")
# 2. 创建 task
task_id = f"t-{uuid.uuid4().hex[:8]}"
task = {
"id": task_id,
"status": "accepted",
"created_at": datetime.utcnow().isoformat(),
"expires_at": (datetime.utcnow() + timedelta(seconds=120)).isoformat(),
"input": data['input'],
"output": None
}
TASKS[task_id] = task
# 3. 异步执行(模拟长任务)
def execute_task():
try:
# 调用 MCP 获取信用数据
result = call_mcp_credit_check(user_id)
# 构建 artifact
artifact = {
"type": "application/json",
"data": {
"user_id": user_id,
"credit_score": result.get("score", 0),
"risk_factors": result.get("factors", []),
"generated_at": datetime.utcnow().isoformat(),
"source": "internal_credit_db_v3.2"
},
"checksum": "sha256:" + hashlib.sha256(json.dumps(result).encode()).hexdigest()[:16]
}
# 更新 task 状态
TASKS[task_id]["status"] = "completed"
TASKS[task_id]["output"] = artifact
TASKS[task_id]["completed_at"] = datetime.utcnow().isoformat()
except Exception as e:
TASKS[task_id]["status"] = "failed"
TASKS[task_id]["error"] = {"code": "EXECUTION_ERROR", "message": str(e)}
threading.Thread(target=execute_task).start()
# 立即返回 202 Accepted
return jsonify({
"id": task_id,
"status": "accepted",
"self_link": f"http://localhost:5000/tasks/{task_id}"
}), 202
@app.route('/tasks/<task_id>', methods=['GET'])
def get_task_status(task_id):
task = TASKS.get(task_id)
if not task:
abort(404, "Task not found")
# 返回完整 task 对象,包含 output/artifact
return jsonify(task)
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000, debug=False)
第四步:注册到 Discovery Service
# register.py
import requests
import json
from card import AGENT_CARD
# 向 Consul 注册服务
consul_url = "http://localhost:8500/v1/agent/service/register"
service_def = {
"ID": "acme-credit-check-v1.0",
"Name": "a2a-credit-check",
"Address": "localhost",
"Port": 5000,
"Check": {
"HTTP": "http://localhost:5000/health",
"Interval": "10s",
"Timeout": "1s"
}
}
requests.put(consul_url, json=service_def)
print("Credit Check Agent registered with Consul!")
# 同时,向 A2A Gateway 的 discovery endpoint 注册 Card
gateway_url = "http://localhost:8080/discovery"
requests.post(gateway_url, json=AGENT_CARD)
print("Agent Card published to A2A Gateway!")
运行 python register.py 后,该 agent 就正式加入 A2A 网络了。任何其他 agent 或 orchestrator 都可以通过 GET http://localhost:8080/discovery 发现它,并通过 POST /tasks 创建任务。
4.3 构建 Orchestrator:串联多个 Agent
Orchestrator 是 A2A 的“大脑”,它不处理具体业务,只负责任务分发、状态监控和 artifact 整合。我们用一个极简的 Python 脚本演示其核心逻辑:
# orchestrator.py
import requests
import time
import json
from typing import Dict, Any
class SimpleOrchestrator:
def __init__(self, a2a_gateway: str = "http://localhost:8080"):
self.gateway = a2a_gateway
def discover_agent(self, name: str) -> Dict[str, Any]:
"""发现指定名称的 agent"""
resp = requests.get(f"{self.gateway}/discovery?name={name}")
if resp.status_code != 200:
raise RuntimeError(f"Failed to discover {name}: {resp.text}")
return resp.json()
def create_task(self, agent_id: str, input_data: Dict) -> str:
"""创建任务,返回 task_id"""
resp = requests.post(
f"{self.gateway}/tasks",
json={"input": input_data},
headers={"X-A2A-Agent-ID": agent_id}
)
if resp.status_code != 202:
raise RuntimeError(f"Failed to create task: {resp.text}")
return resp.json()["id"]
def wait_for_completion(self, task_id: str, timeout: int = 120) -> Dict:
"""轮询等待任务完成"""
start = time.time()
while time.time() - start < timeout:
resp = requests.get(f"{self.gateway}/tasks/{task_id}")
if resp.status_code == 200:
task = resp.json()
if task["status"] == "completed":
return task["output"] # 返回 artifact
elif task["status"] in ["failed", "cancelled"]:
raise RuntimeError(f"Task {task_id} {task['status']}: {task.get('error', {}).get('message', '')}")
time.sleep(2)
raise TimeoutError(f"Task {task_id} timed out after {timeout}s")
def run_insurance_approval(self, user_id: str) -> Dict:
"""执行完整的保险审批流程"""
print(f"Starting insurance approval for user {user_id}...")
# Step 1: Credit Check
print("1. Running Credit Check...")
credit_agent = self.discover_agent("Credit Check Agent")
credit_task_id = self.create_task(credit_agent["id"], {"user_id": user_id})
credit_artifact = self.wait_for_completion(credit_task_id)
# Step 2: Fraud Detection (并发!)
print("2. Running Fraud Detection (concurrently)...")
fraud_agent = self.discover_agent("Fraud Detection Agent")
fraud_task_id = self.create_task(fraud_agent["id"], {"user_id": user_id})
# Step 3: Pricing (依赖 Credit 结果)
print("3. Running Pricing...")
pricing_agent = self.discover_agent("Pricing Agent")
pricing_input = {
"user_id": user_id,
"credit_score": credit_artifact["data"]["credit_score"]
}
pricing_task_id = self.create_task(pricing_agent["id"], pricing_input)
pricing_artifact = self.wait_for_completion(pricing_task_id)
# Step 4: Wait for Fraud
fraud_artifact = self.wait_for_completion(fraud_task_id)
# Step 5: Final Decision
print("4. Making Final Decision...")
final_input = {
"credit": credit_artifact["data"],
"fraud": fraud_artifact["data"],
"pricing": pricing_artifact["data"]
}
# ... logic to combine results ...
return {
"status": "approved",
"premium": pricing_artifact["data"]["monthly_premium"],
"reason": "Good credit, low fraud risk"
}
# 使用示例
if __name__ == "__main__":
orch = SimpleOrchestrator()
result = orch.run_insurance_approval("USR-789012")
print("Final Result:", json.dumps(result, indent=2))
这个 orchestrator 脚本展示了 A2A 的核心价值: 用极少的代码,实现了跨 agent 的串行、并行、依赖调度 。它完全不知道 credit agent 是用 Python 写的,fraud agent 是用 Java 写的,pricing agent 是用 Rust 写的——它只认 A2A 协议。这才是真正的“协议驱动开发”。
更多推荐
所有评论(0)