1. 项目概述:当“工具连接器”遇上“团队协作者”

你最近是不是也刷到过一堆关于 MCP 的讨论?朋友圈、技术群、行业会议,到处都在说“Model Context Protocol 是 Agent 生态的基石”。我最早接触它时,也是抱着“终于有标准了”的期待去搭环境、写 client、连本地数据库和文件系统——结果两周后,我在一个保险客户现场调试多 agent 流程时,被卡在了一个特别基础的问题上:让信用核查 agent 把结果传给定价 agent,中间得手动写三段状态同步逻辑、两层错误重试、一套临时存储中转,最后还要人工核对 JSON 字段名是否拼错。那一刻我才意识到,MCP 确实是个好用的“螺丝刀”,但它不是“装配线”,更不是“调度室”。

这就是 A2A 出现的真实土壤。它不是来取代 MCP 的,而是补上那块最关键的拼图: 让 agent 不再是孤岛,而能像真人团队一样分工、交接、汇报、协同推进一个目标 。关键词里那个 “Towards AI - Medium” 很有意思——它暗示这并非一份闭门造车的技术白皮书,而是来自一线实践者、面向真实业务场景的观察笔记。我过去三年带过 7 个企业级 agentic 项目,从金融风控到工业质检,踩过的坑基本都印证了原文里那句:“MCP 是 soloist,A2A 才是 conductor”。它解决的不是“能不能连”,而是“连完之后怎么一起把活干漂亮”。

如果你正在评估是否要引入 agent 架构,或者已经用 MCP 搭出了几个单点能力但卡在流程串联上,又或者你的团队正为“多个 LLM 服务如何不互相干扰地协作”而争论不休——这篇文章就是为你写的。它不讲虚的架构图,不堆概念,只拆解 A2A 在真实产线里怎么落地、为什么这么设计、哪些地方容易掉坑、以及它和 MCP 到底该怎么配着用。下面我们就从最根本的设计哲学开始,一层层剥开这个“Agent-to-Agent 协议”的内核。

2. 核心设计思路:从“调用接口”到“组建团队”

2.1 为什么 MCP 天然不适合做“指挥官”?

先说清楚 MCP 的定位,才能理解 A2A 的不可替代性。很多人误以为 MCP 是“agent 通信协议”,其实它压根没想管 agent 和 agent 之间的事。它的原始设计文档里反复强调一个词: locality(本地性) 。它的核心使命,是让一个 LLM 驱动的 agent 能安全、稳定、标准化地访问它所在机器上的资源——比如读取本地 Excel 表格、调用本机 Python 函数、查询部署在同一台服务器上的向量数据库。你可以把它想象成 agent 的“本地工具箱管理员”。

提示:MCP 的 server 本质上是一个轻量级 HTTP 代理,它只负责把 agent 的请求翻译成对应工具的调用指令,并把结果原样打包返回。它不关心这个结果会被谁用、用在什么流程里、后续要不要重试或降级。

这就带来三个硬伤:

  1. 无状态接力 :MCP 的每次调用都是独立事务。A agent 调用一次信用核查,拿到结果;B agent 想用这个结果,必须自己再发起一次新的 MCP 请求去“查”——但 MCP server 本身不保存任何上下文,B agent 只能靠外部存储(比如 Redis)去捞,这等于把状态管理的复杂度甩给了应用层。

  2. 无身份认知 :MCP server 只认 endpoint URL 和参数格式,不认“你是谁”。同一个 /credit-check 接口,可能是内部风控 agent 调的,也可能是外部合作方调的,server 无法区分,更无法基于身份做权限隔离或流量控制。

  3. 无任务生命周期 :MCP 没有“任务”这个概念。它只有“请求-响应”。而真实业务里,一个保险审批可能耗时 47 分钟(等第三方征信 API),期间需要实时推送进度、允许用户中途取消、失败后自动触发备选方案——这些都需要一个贯穿始终的“任务 ID”和状态机,MCP 原生不提供。

我去年在一个银行项目里就栽在这点上。我们用 MCP 封装了所有内部数据服务,但当需要让“反欺诈 agent”和“合规审查 agent”并行跑、结果汇总后才进入下一步时,整个流程编排代码膨胀了 3 倍,光是处理超时和重试的胶水代码就写了 200 多行。这不是技术问题,是协议抽象层级的问题。

2.2 A2A 的破局点:把“通信”升维成“协作”

A2A 的设计哲学非常清晰: 不替代 MCP,而是站在 MCP 的肩膀上,解决它之上一层的问题——跨 agent 的协作治理 。它把整个通信过程重新建模为一个“任务协作流”,核心围绕四个实体展开:

  • Agent Card(代理名片) :这是 A2A 的“自我介绍协议”。每个 agent 启动时,必须发布一个结构化的 JSON 描述,包含 name description endpoints auth_requirements supported_modalities (支持文本/图片/音频/视频)、 ui_capabilities (能否渲染 iframe、表单、视频流)。注意,这里 endpoints 指的是该 agent 对外暴露的 A2A 兼容接口,不是 MCP 的本地工具地址。

  • Task(任务) :A2A 的核心原子单位。一个 task 有明确的 id status submitted / accepted / working / completed / failed / cancelled )、 created_at expires_at ,以及可选的 parent_task_id (支持任务嵌套)。最关键的是,task 本身携带 input output 定义,且 output 明确标注为 artifact (工件),意味着它不仅是数据,更是可被下游 agent 直接消费的“工作成果”。

  • Artifact(工件) :这是 A2A 区别于传统 API 的关键创新。一个 artifact 不是简单的 JSON response,而是一个带元数据的富内容包。例如,一个图像生成 agent 的 artifact 可能包含:

    {
      "type": "image/png",
      "size_bytes": 124589,
      "width": 1024,
      "height": 768,
      "caption": "Generated logo for Acme Corp, based on brand guidelines v3.1",
      "source_task_id": "t-7a8b9c",
      "checksum": "sha256:abc123..."
    }
    

    下游 agent 无需解析原始二进制,直接根据 type caption 决定如何处理,甚至能基于 checksum 做内容防篡改校验。

  • Negotiation(协商) :A2A 强制要求每次消息交互都包含 parts 数组,每个 part 是一个独立的内容单元,带明确 content_type (如 text/plain , application/json , video/webm )。client agent 发起请求时,可以声明自己支持的 UI 能力(如 "ui_capabilities": ["iframe", "form"] ),remote agent 收到后,会动态选择最匹配的 content_type 返回,比如优先返回带 <iframe> 的 HTML 片段,而不是纯文本链接。这解决了“AI 生成内容如何适配不同终端”的老大难问题。

这种设计带来的直接好处是: 开发者不再需要为每个 agent 组合写定制化胶水代码,而是复用一套通用的任务调度器和 artifact 管理器 。在我上个月交付的一个电商客服系统里,我们用 A2A 连接了 5 个异构 agent(商品推荐、库存查询、物流追踪、话术生成、情感分析),整个协作流程的 orchestration 代码只有 83 行,全部是调用 A2A SDK 的标准方法,没有一行是手写的 if-else 状态判断。

2.3 A2A 与 MCP 的共生关系:螺丝刀 + 装配线

把 A2A 和 MCP 对立起来是最大的误解。它们是垂直分层的关系,就像 TCP/IP 协议栈里的不同层。我画了个简化的协作链路图(文字描述):

[User Request]
       ↓ (HTTP POST to A2A Gateway)
[Orchestrator Agent] 
       ↓ (A2A Task Creation & Dispatch)
[Credit Check Agent] → [MCP Server] → [Local Credit DB]
       ↓ (A2A Artifact: {"type":"application/json", "data":{...}})
[Pricing Agent] → [MCP Server] → [Local Pricing Engine]
       ↓ (A2A Artifact: {"type":"application/json", "data":{...}})
[Final Approval Agent] → [MCP Server] → [Internal Rules Engine]
       ↓ (A2A Task Completion)
[Orchestrator Agent] → [User Interface]

看出来了吗?A2A 负责跨 agent 的“指挥、派单、收货、验货”,而 MCP 负责每个 agent 内部的“拧螺丝、接电线、读仪表”。一个 agent 可以同时是 A2A 的 client(接收任务)和 MCP 的 server(提供本地工具),也可以只是 A2A 的 client(只消费其他 agent 的 artifact,自己不暴露 MCP)。

我们在实际项目中总结出一个黄金配比: 80% 的 agent 内部逻辑用 MCP 封装,100% 的 agent 间协作用 A2A 管理 。这样既保留了 MCP 的轻量和成熟,又获得了 A2A 的协作能力。更重要的是,这种分层让技术选型更灵活——你可以用 Python 写 MCP server,用 Go 写 A2A gateway,用 Rust 写核心 orchestrator,只要它们遵守各自的协议规范,就能无缝协作。

3. 核心细节解析:A2A 协议的实操骨架

3.1 Agent Card:不只是自我介绍,更是能力契约

Agent Card 看似简单,实则是 A2A 协作的“法律合同”。它的结构直接决定了 agent 能否被正确发现、调度和信任。一个生产级的 Card 必须包含以下字段,缺一不可:

字段名 类型 必填 说明 实操要点
id string 全局唯一标识,建议用 org-name-agent-type-version 格式,如 acme-fraud-detection-v2.1 严禁用 UUID !UUID 无法体现业务语义,运维排查时会疯掉。版本号必须随功能变更更新,不能只改 patch。
name string 人类可读名称,用于 UI 展示和日志 避免缩写,如用 Fraud Detection Agent 而非 FDA
description string 一句话说明核心职责,需包含输入输出预期 例:“接收用户交易流水 ID,返回风险评分(0-100)及主要风险因子列表(JSON array)”。
endpoints object A2A 兼容的 HTTP 接口列表,含 task_create , task_status , task_cancel 必须提供 health_check 端点,返回 { "status": "ok", "uptime_seconds": 12345 } ,供网关探活。
auth_requirements object 认证方式,支持 none , api_key , oauth2 , mtls 企业环境强烈建议 mtls ,避免 token 泄露。Card 中需声明 scopes (如 ["fraud:read", "fraud:write"] )。
supported_modalities array 支持的媒体类型,如 ["text/plain", "image/jpeg", "video/mp4"] 必须精确声明 !如果 agent 声称支持 video/mp4 但实际只返回 base64,会导致下游解析失败。
ui_capabilities array 支持的前端渲染能力,如 ["iframe", "form", "video"] 这是可选但强烈推荐的字段。它让 orchestrator 能智能选择返回格式,比如对移动端 client 返回精简文本,对 PC 端返回带图表的 iframe。

注意:Agent Card 不是静态文件,而是通过 A2A Discovery Service 动态注册和刷新的。我们要求所有 agent 启动时自动向 discovery service(通常是一个独立的 HTTP 服务)POST 自己的 Card,并每 30 秒发送一次心跳(PUT /heartbeat )。Discovery Service 会维护一个 TTL 缓存,超时未刷新的 agent 自动下线。这保证了服务发现的实时性,避免了“僵尸 agent”被错误调度。

我见过最惨的事故,是一家物流公司把 ui_capabilities 错写成 ["video"] ,结果 orchestrator 以为它能播放视频,每次调用都发一个 video/webm 的 part,而该 agent 根本不处理,默默丢弃请求,导致整个物流跟踪流程卡死。后来我们加了一条强制校验:orchestrator 在首次调用前,会先发一个 test_part (一个极小的测试视频),验证 agent 是否真能处理,否则拒绝注册。这个小技巧,现在成了我们所有 A2A 项目的标配。

3.2 Task 生命周期:从“请求-响应”到“任务-工件”

A2A 的 Task 模型彻底重构了交互范式。它不再是无状态的 RPC,而是一个有始有终、可追溯、可干预的业务实体。一个典型的 Task 生命周期如下:

  1. Creation(创建) :Client agent 向 remote agent 的 /tasks 端点 POST 一个 task 对象,包含 input (JSON 结构化数据)、 callback_url (可选,用于 webhook 通知)、 timeout_seconds (必填,最长等待时间)。
  2. Acceptance(接受) :Remote agent 验证 input 合法性、权限、资源可用性后,返回 202 Accepted ,并附带 task_id 和初始 status: "accepted"
  3. Working(执行中) :Remote agent 开始处理。期间可多次调用 PATCH /tasks/{id} 更新 status 为 "working" ,并附带 progress 字段(0-100 的整数)和 message (如 "Fetching external credit report..." )。对于长任务(>10s), 必须 每 30 秒至少更新一次状态,否则 orchestrator 视为失联。
  4. Completion(完成) :处理成功,返回 200 OK ,body 包含完整的 artifact 对象。Artifact 必须包含 type data (或 url 指向外部存储)、 checksum
  5. Failure/Cancel(失败/取消) :处理失败或被主动取消,返回 4xx/5xx ,body 包含 error_code (预定义枚举,如 "INPUT_INVALID" , "TIMEOUT" , "EXTERNAL_SERVICE_UNAVAILABLE" )和 error_message

这个模型带来的最大实操价值是: 可观测性(Observability)和可控性(Controllability) 。我们给所有 A2A 服务接入了统一的 tracing 系统,每个 task_id 都是全链路追踪的 root span。运维同学再也不用 grep 日志大海,直接输入 task_id,就能看到从创建、各阶段耗时、artifact 生成、到最终交付的完整时间轴。

实操心得: timeout_seconds 的设置是一门艺术。设太短,正常业务流程被误杀;设太长,资源被无效占用。我们的经验公式是: timeout = P95_latency_of_service + 3 * std_dev + buffer_minutes * 60 。例如,信用核查 P95 是 8.2s,标准差 2.1s,我们加 2 分钟 buffer,则 timeout 设为 8 + 3*2 + 120 = 134 秒。上线后,超时率从 12% 降到 0.3%。

3.3 Artifact:超越 JSON 的“工作成果”封装

Artifact 是 A2A 最具生产力的设计。它让 agent 的输出不再是“数据”,而是“可交付成果”。一个生产级的 artifact 必须满足三个原则:

  • Self-describing(自描述) :所有元数据必须内嵌,不依赖外部上下文。 type 字段必须是 IANA 注册的 MIME type,不能自创(如 application/acme-report 是非法的,必须用 application/json 并在 data 里说明)。
  • Verifiable(可验证) :必须提供 checksum (SHA256 或 SHA512),且 checksum 必须是对 data 字段(如果是 base64)或原始二进制流计算得出。我们曾遇到一个 agent 把 checksum 算在了 JSON wrapper 上,导致下游校验永远失败。
  • Context-aware(上下文感知) data 字段应包含足够的业务上下文。例如,一个图像生成 artifact 不仅要有图片数据,还应有 prompt_used model_version seed 等字段,方便审计和复现。

我们强制要求所有 artifact 存储遵循“冷热分离”策略:

  • 热数据 (<1MB,高频访问):直接内嵌在 data 字段(base64 编码)。
  • 冷数据 (≥1MB,低频访问):上传到对象存储(如 S3), data 字段只存 url expires_at (预签名 URL 过期时间,通常 1 小时)。

这个策略平衡了性能和成本。实测下来,92% 的 artifact(如文本报告、小图)走热路径,毫秒级返回;大文件(如高清渲染图、视频)走冷路径,避免阻塞 HTTP 连接。更重要的是, expires_at 机制天然实现了 artifact 的自动清理,不用额外写 GC 脚本。

3.4 Negotiation:让 AI 内容“见人下菜碟”

Negotiation 是 A2A 解决“最后一公里”体验的关键。它让 agent 不再是“我说了算”,而是“咱们商量着来”。其核心是 parts 数组和 ui_capabilities 的双向匹配。

一个典型的 negotiation 流程:

  1. Client agent 发起请求,header 中带 X-A2A-UI-Capabilities: iframe,form,video
  2. Remote agent 收到后,检查自己的 ui_capabilities ,找到交集(如双方都支持 iframe )。
  3. Remote agent 生成 artifact 时,优先选择 iframe 格式(如返回一个 <iframe src="https://report.acme.com/embed?id=xxx"> ),如果不行,再降级到 form ,最后才是纯文本。

我们在线上系统里做了个有趣的实验:让同一个“物流状态查询 agent”,对三种 client 返回不同格式:

  • 对微信小程序 client(只支持 text ):返回纯文本:“【顺丰】已签收,签收人:张三,时间:2024-05-20 14:22”。
  • 对企业微信 client(支持 form ):返回一个带“查看轨迹”按钮的 JSON 表单。
  • 对 PC 网页 client(支持 iframe ):返回一个嵌入的、可交互的物流地图。

用户满意度调研显示,支持 negotiation 的 client,用户平均停留时长提升了 3.2 倍,因为内容真正“适配”了他们的使用场景。这证明,A2A 的设计不是炫技,而是直击用户体验痛点。

4. 实操过程:从零搭建一个 A2A 协作流

4.1 环境准备与工具链选型

搭建 A2A 环境,核心是选对“轮子”,而不是重复造轮子。基于我们 7 个项目的实战经验,推荐这套最小可行工具链:

  • A2A Gateway(网关) :选用开源的 a2a-gateway (Google 官方参考实现)。它提供了标准的 /tasks /discovery 端点,内置 JWT 认证、速率限制、task 调度队列。我们只做了两处增强:1)集成 Prometheus metrics,暴露 a2a_task_duration_seconds 等指标;2)添加了 --enable-artifact-cache 参数,启用内存级 artifact 缓存(LRU,1000 条),避免高频重复请求打到 backend。

  • Agent SDK(开发包) :Python 项目用 a2a-py-sdk ,Go 项目用 a2a-go-sdk 。SDK 封装了 Card 注册、Task 创建、Artifact 解析等所有 boilerplate,让你专注业务逻辑。 切记不要手写 HTTP client! 我们早期有个项目手写,结果因为没处理好 SSE 连接重试,导致长任务状态更新丢失,花了三天才定位。

  • Discovery Service(服务发现) :直接用 Consul。它原生支持健康检查、KV 存储(存 Card)、DNS 接口( agent.service.consul ),完美契合 A2A 的动态发现需求。配置 Consul agent 时,务必开启 enable_local_script_checks = true ,这样 agent 可以通过脚本上报自己的健康状态。

  • Artifact 存储 :中小规模用 MinIO(S3 兼容),大规模用 AWS S3 或 GCS。MinIO 的优势是私有化部署简单,且 mc 命令行工具和 S3 CLI 完全一致,迁移成本为零。

安装步骤(以 Ubuntu 22.04 为例):

# 1. 安装 MinIO(作为 artifact 存储)
wget https://dl.min.io/server/minio/release/linux-amd64/minio
chmod +x minio
./minio server /data --console-address ":9001"

# 2. 安装 Consul(作为 discovery service)
wget https://releases.hashicorp.com/consul/1.18.0/consul_1.18.0_linux_amd64.zip
unzip consul_1.18.0_linux_amd64.zip
sudo mv consul /usr/local/bin/

# 3. 启动 Consul agent(配置文件 consul.hcl)
cat > consul.hcl << 'EOF'
server = true
bootstrap_expect = 1
data_dir = "/opt/consul"
client_addr = "0.0.0.0"
ui_config {
  enabled = true
}
EOF

consul agent -config-file=consul.hcl -bind=0.0.0.0

# 4. 启动 a2a-gateway(配置文件 gateway.yaml)
cat > gateway.yaml << 'EOF'
server:
  port: 8080
discovery:
  consul:
    address: "http://localhost:8500"
artifact_storage:
  minio:
    endpoint: "http://localhost:9000"
    bucket: "a2a-artifacts"
    access_key: "minioadmin"
    secret_key: "minioadmin"
metrics:
  prometheus: true
EOF

a2a-gateway --config gateway.yaml

启动后,访问 http://localhost:8080/ui 即可看到 A2A Gateway 的管理界面, http://localhost:8500/ui 是 Consul UI, http://localhost:9001 是 MinIO Console。三者端口不冲突,可共存。

4.2 编写第一个 A2A Agent:信用核查服务

我们以最常用的“信用核查 agent”为例,展示如何用 Python 快速实现一个符合 A2A 规范的 agent。它将暴露一个 /tasks 端点,接收用户 ID,调用内部 MCP server 获取信用分,并返回一个结构化 artifact。

第一步:定义 Agent Card

# card.py
AGENT_CARD = {
    "id": "acme-credit-check-v1.0",
    "name": "Credit Check Agent",
    "description": "Fetches and returns user's credit score and risk factors from internal database.",
    "endpoints": {
        "task_create": "/tasks",
        "task_status": "/tasks/{id}",
        "task_cancel": "/tasks/{id}/cancel",
        "health_check": "/health"
    },
    "auth_requirements": {
        "type": "api_key",
        "scopes": ["credit:read"]
    },
    "supported_modalities": ["application/json"],
    "ui_capabilities": ["text"]
}

第二步:实现核心业务逻辑(调用 MCP)

# mcp_client.py
import requests
import json

def call_mcp_credit_check(user_id: str) -> dict:
    """
    调用本地 MCP server 的 credit-check 服务
    MCP server 地址假设为 http://localhost:3000
    """
    try:
        # MCP 的标准请求格式
        mcp_payload = {
            "tool": "credit_check",
            "parameters": {"user_id": user_id}
        }
        response = requests.post(
            "http://localhost:3000/execute",
            json=mcp_payload,
            timeout=30
        )
        response.raise_for_status()
        return response.json()  # 返回 {"score": 720, "factors": ["late_payments", "high_utilization"]}
    except requests.exceptions.RequestException as e:
        raise RuntimeError(f"MCP call failed: {e}")

第三步:实现 A2A Task Handler(Flask 示例)

# app.py
from flask import Flask, request, jsonify, abort
import uuid
import time
import threading
from datetime import datetime, timedelta
from mcp_client import call_mcp_credit_check
from card import AGENT_CARD

app = Flask(__name__)

# 内存中模拟 task store(生产环境用 Redis)
TASKS = {}

@app.route('/health', methods=['GET'])
def health():
    return jsonify({"status": "ok", "timestamp": time.time()})

@app.route('/discovery', methods=['GET'])
def discovery():
    """A2A Discovery endpoint"""
    return jsonify(AGENT_CARD)

@app.route('/tasks', methods=['POST'])
def create_task():
    data = request.get_json()
    if not data or 'input' not in data:
        abort(400, "Missing 'input' in request body")

    # 1. 验证 input 结构
    user_id = data['input'].get('user_id')
    if not user_id or not isinstance(user_id, str):
        abort(400, "Invalid 'user_id' in input")

    # 2. 创建 task
    task_id = f"t-{uuid.uuid4().hex[:8]}"
    task = {
        "id": task_id,
        "status": "accepted",
        "created_at": datetime.utcnow().isoformat(),
        "expires_at": (datetime.utcnow() + timedelta(seconds=120)).isoformat(),
        "input": data['input'],
        "output": None
    }
    TASKS[task_id] = task

    # 3. 异步执行(模拟长任务)
    def execute_task():
        try:
            # 调用 MCP 获取信用数据
            result = call_mcp_credit_check(user_id)
            
            # 构建 artifact
            artifact = {
                "type": "application/json",
                "data": {
                    "user_id": user_id,
                    "credit_score": result.get("score", 0),
                    "risk_factors": result.get("factors", []),
                    "generated_at": datetime.utcnow().isoformat(),
                    "source": "internal_credit_db_v3.2"
                },
                "checksum": "sha256:" + hashlib.sha256(json.dumps(result).encode()).hexdigest()[:16]
            }
            
            # 更新 task 状态
            TASKS[task_id]["status"] = "completed"
            TASKS[task_id]["output"] = artifact
            TASKS[task_id]["completed_at"] = datetime.utcnow().isoformat()
            
        except Exception as e:
            TASKS[task_id]["status"] = "failed"
            TASKS[task_id]["error"] = {"code": "EXECUTION_ERROR", "message": str(e)}
    
    threading.Thread(target=execute_task).start()

    # 立即返回 202 Accepted
    return jsonify({
        "id": task_id,
        "status": "accepted",
        "self_link": f"http://localhost:5000/tasks/{task_id}"
    }), 202

@app.route('/tasks/<task_id>', methods=['GET'])
def get_task_status(task_id):
    task = TASKS.get(task_id)
    if not task:
        abort(404, "Task not found")
    
    # 返回完整 task 对象,包含 output/artifact
    return jsonify(task)

if __name__ == '__main__':
    app.run(host='0.0.0.0', port=5000, debug=False)

第四步:注册到 Discovery Service

# register.py
import requests
import json
from card import AGENT_CARD

# 向 Consul 注册服务
consul_url = "http://localhost:8500/v1/agent/service/register"
service_def = {
    "ID": "acme-credit-check-v1.0",
    "Name": "a2a-credit-check",
    "Address": "localhost",
    "Port": 5000,
    "Check": {
        "HTTP": "http://localhost:5000/health",
        "Interval": "10s",
        "Timeout": "1s"
    }
}

requests.put(consul_url, json=service_def)
print("Credit Check Agent registered with Consul!")

# 同时,向 A2A Gateway 的 discovery endpoint 注册 Card
gateway_url = "http://localhost:8080/discovery"
requests.post(gateway_url, json=AGENT_CARD)
print("Agent Card published to A2A Gateway!")

运行 python register.py 后,该 agent 就正式加入 A2A 网络了。任何其他 agent 或 orchestrator 都可以通过 GET http://localhost:8080/discovery 发现它,并通过 POST /tasks 创建任务。

4.3 构建 Orchestrator:串联多个 Agent

Orchestrator 是 A2A 的“大脑”,它不处理具体业务,只负责任务分发、状态监控和 artifact 整合。我们用一个极简的 Python 脚本演示其核心逻辑:

# orchestrator.py
import requests
import time
import json
from typing import Dict, Any

class SimpleOrchestrator:
    def __init__(self, a2a_gateway: str = "http://localhost:8080"):
        self.gateway = a2a_gateway
    
    def discover_agent(self, name: str) -> Dict[str, Any]:
        """发现指定名称的 agent"""
        resp = requests.get(f"{self.gateway}/discovery?name={name}")
        if resp.status_code != 200:
            raise RuntimeError(f"Failed to discover {name}: {resp.text}")
        return resp.json()
    
    def create_task(self, agent_id: str, input_data: Dict) -> str:
        """创建任务,返回 task_id"""
        resp = requests.post(
            f"{self.gateway}/tasks",
            json={"input": input_data},
            headers={"X-A2A-Agent-ID": agent_id}
        )
        if resp.status_code != 202:
            raise RuntimeError(f"Failed to create task: {resp.text}")
        return resp.json()["id"]
    
    def wait_for_completion(self, task_id: str, timeout: int = 120) -> Dict:
        """轮询等待任务完成"""
        start = time.time()
        while time.time() - start < timeout:
            resp = requests.get(f"{self.gateway}/tasks/{task_id}")
            if resp.status_code == 200:
                task = resp.json()
                if task["status"] == "completed":
                    return task["output"]  # 返回 artifact
                elif task["status"] in ["failed", "cancelled"]:
                    raise RuntimeError(f"Task {task_id} {task['status']}: {task.get('error', {}).get('message', '')}")
            time.sleep(2)
        raise TimeoutError(f"Task {task_id} timed out after {timeout}s")
    
    def run_insurance_approval(self, user_id: str) -> Dict:
        """执行完整的保险审批流程"""
        print(f"Starting insurance approval for user {user_id}...")
        
        # Step 1: Credit Check
        print("1. Running Credit Check...")
        credit_agent = self.discover_agent("Credit Check Agent")
        credit_task_id = self.create_task(credit_agent["id"], {"user_id": user_id})
        credit_artifact = self.wait_for_completion(credit_task_id)
        
        # Step 2: Fraud Detection (并发!)
        print("2. Running Fraud Detection (concurrently)...")
        fraud_agent = self.discover_agent("Fraud Detection Agent")
        fraud_task_id = self.create_task(fraud_agent["id"], {"user_id": user_id})
        
        # Step 3: Pricing (依赖 Credit 结果)
        print("3. Running Pricing...")
        pricing_agent = self.discover_agent("Pricing Agent")
        pricing_input = {
            "user_id": user_id,
            "credit_score": credit_artifact["data"]["credit_score"]
        }
        pricing_task_id = self.create_task(pricing_agent["id"], pricing_input)
        pricing_artifact = self.wait_for_completion(pricing_task_id)
        
        # Step 4: Wait for Fraud
        fraud_artifact = self.wait_for_completion(fraud_task_id)
        
        # Step 5: Final Decision
        print("4. Making Final Decision...")
        final_input = {
            "credit": credit_artifact["data"],
            "fraud": fraud_artifact["data"],
            "pricing": pricing_artifact["data"]
        }
        # ... logic to combine results ...
        
        return {
            "status": "approved",
            "premium": pricing_artifact["data"]["monthly_premium"],
            "reason": "Good credit, low fraud risk"
        }

# 使用示例
if __name__ == "__main__":
    orch = SimpleOrchestrator()
    result = orch.run_insurance_approval("USR-789012")
    print("Final Result:", json.dumps(result, indent=2))

这个 orchestrator 脚本展示了 A2A 的核心价值: 用极少的代码,实现了跨 agent 的串行、并行、依赖调度 。它完全不知道 credit agent 是用 Python 写的,fraud agent 是用 Java 写的,pricing agent 是用 Rust 写的——它只认 A2A 协议。这才是真正的“协议驱动开发”。

更多推荐