6种A2A(智能体到智能体)的协议方案
摘要:本文的示例涉及智能体卡片地址(`a2a.agent/card-url: "https://financial-analysis-agent/.well-known/agent.json"`)、授权策略中允许访问的路径(`paths: ["/.well-known/agent.json", "/health"]`),以及 `analyze_financial_data`(分析金融数据)能力及其参数(`parameters`)定义。
1. A2A 协议架构概述
A2A 协议是一个标准化的通信框架,使得来自不同平台、组织和框架的智能体能够无缝通信。以下是完整的架构:
2. 核心 A2A 组件
2.1 智能体卡片(发现机制)
智能体卡片是描述智能体能力的标准化 JSON 文档:
{ "name": "金融分析智能体", "version": "1.2.0", "description": "专注于金融数据分析和报告", "protocol_version": "A2A/1.0", "capabilities": [ { "name": "analyze_financial_data", "description": "分析金融数据集并生成洞察", "parameters": { "dataset_url": {"type": "string", "required": true}, "analysis_type": {"type": "string", "enum": ["trend", "forecast", "comparison"]}, "time_period": {"type": "string", "format": "date-range"} }, "returns": { "type": "object", "properties": { "insights": {"type": "array"}, "charts": {"type": "array"}, "recommendations": {"type": "array"} } } }, { "name": "generate_report", "description": "生成格式化的财务报告", "parameters": { "analysis_id": {"type": "string", "required": true}, "format": {"type": "string", "enum": ["pdf", "html", "excel"]} } } ], "endpoints": { "chat": "/api/v1/chat", "tasks": "/api/v1/tasks", "status": "/api/v1/status", "health": "/health" }, "authentication": { "methods": ["oauth2", "api_key", "mtls"], "oauth2": { "authorization_url": "https://auth.company.com/oauth/authorize", "token_url": "https://auth.company.com/oauth/token", "scopes": ["financial_analysis", "report_generation"] } }, "security": { "encryption": "TLS 1.3", "data_retention": "30 days", "compliance": ["SOC2", "GDPR", "HIPAA"] }, "metadata": { "organization": "FinTech Corp", "contact": "api-support@fintech.com", "documentation": "https://docs.fintech.com/agents/financial-analysis", "pricing": "https://fintech.com/pricing/agents", "rate_limits": { "requests_per_minute": 100, "concurrent_tasks": 5 } } }
2.2 任务管理系统
A2A 使用结构化的任务对象处理复杂的工作流:
from dataclasses import dataclass, field
from typing import Dict, List, Any, Optional
from enum import Enum
import uuid
from datetime import datetime


class TaskStatus(Enum):
    """Lifecycle states of an A2A task."""

    PENDING = "pending"
    RUNNING = "running"
    COMPLETED = "completed"
    FAILED = "failed"
    CANCELLED = "cancelled"


class TaskPriority(Enum):
    """Priority level carried by an A2A task."""

    LOW = "low"
    NORMAL = "normal"
    HIGH = "high"
    URGENT = "urgent"


@dataclass
class A2ATask:
    """Structured task used for A2A communication.

    ``id``, ``created_at``, ``dependencies`` and ``metadata`` may be passed
    as ``None``; ``__post_init__`` then fills in a fresh UUID, the current
    UTC time, an empty list and an empty dict respectively.
    """

    id: str
    agent_id: str
    capability: str
    parameters: Dict[str, Any]
    status: TaskStatus = TaskStatus.PENDING
    priority: TaskPriority = TaskPriority.NORMAL
    created_at: datetime = field(default_factory=datetime.utcnow)
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    result: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
    # IDs of the tasks this task depends on.
    dependencies: List[str] = field(default_factory=list)
    metadata: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        # Callers may still pass None explicitly; normalise it so the
        # attributes always have their annotated types afterwards.
        if self.id is None:
            self.id = str(uuid.uuid4())
        if self.created_at is None:
            self.created_at = datetime.utcnow()
        if self.dependencies is None:
            self.dependencies = []
        if self.metadata is None:
            self.metadata = {}


# Task workflow example
class A2ATaskManager:
    """In-memory registry of A2A tasks and their dependency edges."""

    def __init__(self):
        # task id -> task
        self.tasks: Dict[str, A2ATask] = {}
        # task id -> ids of the tasks it depends on (filled by create_workflow)
        self.task_dependencies: Dict[str, List[str]] = {}

    def create_task(self, agent_id: str, capability: str,
                    parameters: Dict[str, Any], **kwargs) -> A2ATask:
        """Create a new A2A task and register it under a fresh UUID.

        Extra keyword arguments are forwarded to ``A2ATask`` unchanged.
        """
        task = A2ATask(
            id=str(uuid.uuid4()),
            agent_id=agent_id,
            capability=capability,
            parameters=parameters,
            **kwargs
        )
        self.tasks[task.id] = task
        return task

    def create_workflow(self, workflow_definition: List[Dict]) -> List[A2ATask]:
        """Create one task per workflow step and wire up their dependencies.

        Each step is a dict with ``step_id``, ``agent_id``, ``capability``
        and ``parameters``, plus optional ``priority`` (a ``TaskPriority``
        value, default ``"normal"``) and ``depends_on`` (list of step ids).

        Returns the tasks in step order.

        Raises:
            KeyError: if a step lacks a required key, or if ``depends_on``
                names a step that is not part of the workflow.
        """
        # Check dependency references before creating anything, so a bad
        # definition does not leave half a workflow registered.
        known_steps = {step['step_id'] for step in workflow_definition}
        for step in workflow_definition:
            for dep in step.get('depends_on', []):
                if dep not in known_steps:
                    raise KeyError(
                        f"工作流步骤 {step['step_id']!r} 依赖于未定义的步骤 {dep!r}"
                    )

        tasks = []
        task_map = {}  # step_id -> generated task id

        # First create all tasks.
        for step in workflow_definition:
            task = self.create_task(
                agent_id=step['agent_id'],
                capability=step['capability'],
                parameters=step['parameters'],
                priority=TaskPriority(step.get('priority', 'normal'))
            )
            tasks.append(task)
            task_map[step['step_id']] = task.id

        # Then translate step-level dependencies into task ids.
        for task, step in zip(tasks, workflow_definition):
            dependency_ids = [task_map[dep] for dep in step.get('depends_on', [])]
            task.dependencies = dependency_ids
            # Keep the manager-level index in sync (stored as a copy).
            self.task_dependencies[task.id] = list(dependency_ids)

        return tasks


# Example workflow definition
financial_analysis_workflow = [
    {
        "step_id": "data_collection",
        "agent_id": "data_collector_agent",
        "capability": "collect_financial_data",
        "parameters": {"sources": ["yahoo_finance", "bloomberg"], "symbols": ["AAPL", "GOOGL"]},
        "priority": "high"
    },
    {
        "step_id": "data_analysis",
        "agent_id": "analysis_agent",
        "capability": "analyze_financial_data",
        "parameters": {"analysis_type": "trend"},
        "depends_on": ["data_collection"]
    },
    {
        "step_id": "report_generation",
        "agent_id": "report_agent",
        "capability": "generate_report",
        "parameters": {"format": "pdf"},
        "depends_on": ["data_analysis"]
    }
]
2.3 安全与认证
A2A 提供具有多种认证方法的企业级安全性:
import jwt
import httpx
from typing import Optional
from datetime import datetime, timedelta


class A2ASecurityManager:
    """Handle authentication against A2A agents.

    Each supported method has a coroutine that takes the agent card and the
    method-specific credentials and returns a dict describing the resulting
    auth material (always including a ``'method'`` key).
    """

    def __init__(self):
        # Method name (as listed in the agent card) -> handler coroutine.
        self.auth_methods = {
            'oauth2': self.oauth2_auth,
            'api_key': self.api_key_auth,
            'mtls': self.mtls_auth,
            'jwt': self.jwt_auth
        }

    async def authenticate_agent(self, agent_card: dict, credentials: dict) -> dict:
        """Authenticate with an A2A agent.

        Tries the methods listed under ``authentication.methods`` in the
        agent card (default ``['api_key']``) in order, skipping those for
        which ``credentials`` has no entry, and returns the first result.

        Raises:
            Exception: if no method succeeds; the last underlying error, if
                any, is attached as ``__cause__``.
        """
        auth_config = agent_card.get('authentication', {})
        supported_methods = auth_config.get('methods', ['api_key'])

        last_error: Optional[Exception] = None
        # Try the authentication methods in order of preference.
        for method in supported_methods:
            if method not in credentials:
                continue
            try:
                return await self.auth_methods[method](
                    agent_card, credentials[method]
                )
            except Exception as e:
                # Best effort: report and fall through to the next method.
                print(f"{method} 认证失败: {e}")
                last_error = e

        raise Exception("所有认证方法均失败") from last_error

    async def oauth2_auth(self, agent_card: dict, oauth_config: dict) -> dict:
        """Exchange an OAuth 2.0 authorization code for an access token.

        ``oauth_config`` must provide ``client_id``, ``client_secret``,
        ``authorization_code`` and ``redirect_uri``. Obtaining the
        authorization code itself is out of scope here.

        Raises:
            httpx.HTTPStatusError: if the token endpoint answers with a
                4xx/5xx status.
        """
        auth_config = agent_card['authentication']['oauth2']

        # Exchange the code for a token.
        token_url = auth_config['token_url']
        token_data = {
            'grant_type': 'authorization_code',
            'client_id': oauth_config['client_id'],
            'client_secret': oauth_config['client_secret'],
            'code': oauth_config['authorization_code'],
            'redirect_uri': oauth_config['redirect_uri']
        }

        async with httpx.AsyncClient() as client:
            response = await client.post(token_url, data=token_data)
            # Surface HTTP errors directly instead of failing later with a
            # KeyError on a missing 'access_token'.
            response.raise_for_status()
            token_response = response.json()

        return {
            'method': 'oauth2',
            'access_token': token_response['access_token'],
            'token_type': token_response.get('token_type', 'Bearer'),
            'expires_at': datetime.utcnow() + timedelta(
                seconds=token_response.get('expires_in', 3600)
            )
        }

    async def api_key_auth(self, agent_card: dict, api_key: str) -> dict:
        """Return API-key auth material.

        The header name comes from ``authentication.api_key_header`` in the
        agent card and defaults to ``X-API-Key``.
        """
        return {
            'method': 'api_key',
            'api_key': api_key,
            'header_name': agent_card.get('authentication', {}).get('api_key_header', 'X-API-Key')
        }

    async def mtls_auth(self, agent_card: dict, mtls_config: dict) -> dict:
        """Return mutual-TLS auth material (certificate, key and optional CA paths)."""
        return {
            'method': 'mtls',
            'cert_file': mtls_config['cert_file'],
            'key_file': mtls_config['key_file'],
            'ca_file': mtls_config.get('ca_file')
        }

    async def jwt_auth(self, agent_card: dict, jwt_config: dict) -> dict:
        """Sign a one-hour RS256 JWT whose audience is the agent card's name."""
        payload = {
            'agent_id': jwt_config['agent_id'],
            'iat': datetime.utcnow(),
            'exp': datetime.utcnow() + timedelta(hours=1),
            'aud': agent_card['name']
        }

        token = jwt.encode(payload, jwt_config['private_key'], algorithm='RS256')

        return {
            'method': 'jwt',
            'token': token,
            'algorithm': 'RS256'
        }


# Usage example
security_manager = A2ASecurityManager()
credentials = {
    'oauth2': {
        'client_id': 'your_client_id',
        'client_secret': 'your_client_secret',
        'authorization_code': 'auth_code_from_redirect',
        'redirect_uri': 'https://your-app.com/callback'
    },
    'api_key': 'your_api_key_here'
}


async def authenticate_example(agent_card: dict) -> dict:
    """Authenticate with the agent described by ``agent_card``.

    ``await`` is only valid inside a coroutine, so the call is wrapped here;
    run it with ``asyncio.run(authenticate_example(agent_card))``.
    """
    return await security_manager.authenticate_agent(agent_card, credentials)
3. A2A 通信模式
3.1 同步请求-响应
class A2ASyncClient:
    """Request/response style A2A client."""

    async def call_agent_capability(self, agent_url: str, capability: str,
                                    parameters: dict, auth: dict) -> dict:
        """Invoke one capability on an agent and return its JSON reply.

        POSTs to ``{agent_url}/api/v1/capabilities/{capability}`` with a
        30-second timeout. Any status other than 200 raises ``Exception``
        carrying the status code and response text.
        """
        auth_headers = self.build_auth_headers(auth)

        body = {
            "capability": capability,
            "parameters": parameters,
            "request_id": str(uuid.uuid4()),
            "timestamp": datetime.utcnow().isoformat(),
            "response_format": "json"
        }
        endpoint = f"{agent_url}/api/v1/capabilities/{capability}"

        async with httpx.AsyncClient() as http:
            reply = await http.post(
                endpoint,
                json=body,
                headers=auth_headers,
                timeout=30.0
            )

        if reply.status_code != 200:
            raise Exception(f"智能体调用失败: {reply.status_code} - {reply.text}")
        return reply.json()

    def build_auth_headers(self, auth: dict) -> dict:
        """Build HTTP headers for the given auth result.

        Always sets ``Content-Type: application/json``. Adds an
        ``Authorization`` header for ``oauth2`` and ``jwt``, or the
        configured key header for ``api_key``; any other method adds nothing.
        """
        result = {"Content-Type": "application/json"}
        scheme = auth['method']

        if scheme == 'oauth2':
            token_type, access_token = auth['token_type'], auth['access_token']
            result['Authorization'] = f"{token_type} {access_token}"
        elif scheme == 'api_key':
            result[auth['header_name']] = auth['api_key']
        elif scheme == 'jwt':
            bearer = auth['token']
            result['Authorization'] = f"Bearer {bearer}"

        return result
3.2 基于任务的异步通信
class A2AAsyncClient:
    """Task-based asynchronous A2A client (submit, poll, wait)."""

    def build_auth_headers(self, auth: dict) -> dict:
        """Build HTTP headers for the given auth result.

        Always sets ``Content-Type: application/json``. Adds an
        ``Authorization`` header for ``oauth2`` and ``jwt``, or the
        configured key header for ``api_key``; any other method adds nothing.
        """
        headers = {"Content-Type": "application/json"}

        if auth['method'] == 'oauth2':
            headers['Authorization'] = f"{auth['token_type']} {auth['access_token']}"
        elif auth['method'] == 'api_key':
            headers[auth['header_name']] = auth['api_key']
        elif auth['method'] == 'jwt':
            headers['Authorization'] = f"Bearer {auth['token']}"

        return headers

    async def submit_task(self, agent_url: str, task: "A2ATask", auth: dict) -> str:
        """Submit a task for asynchronous execution and return its task id.

        Raises:
            Exception: if the agent does not answer with 202 Accepted.
        """
        headers = self.build_auth_headers(auth)

        task_payload = {
            "task_id": task.id,
            "capability": task.capability,
            "parameters": task.parameters,
            "priority": task.priority.value,
            "callback_url": f"https://your-agent.com/callbacks/{task.id}",
            "metadata": task.metadata
        }

        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{agent_url}/api/v1/tasks",
                json=task_payload,
                headers=headers
            )

        if response.status_code == 202:  # Accepted
            result = response.json()
            return result['task_id']
        raise Exception(f"任务提交失败: {response.status_code}")

    async def get_task_status(self, agent_url: str, task_id: str, auth: dict) -> dict:
        """Fetch the JSON status document of a previously submitted task."""
        headers = self.build_auth_headers(auth)

        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{agent_url}/api/v1/tasks/{task_id}",
                headers=headers
            )
            return response.json()

    async def wait_for_task_completion(self, agent_url: str, task_id: str,
                                       auth: dict, timeout: int = 300) -> dict:
        """Poll every 5 seconds until the task reaches a terminal state.

        Returns the status document once ``status`` is ``completed``,
        ``failed`` or ``cancelled``.

        Raises:
            TimeoutError: if no terminal state is seen within ``timeout``
                seconds.
        """
        import asyncio
        import time

        # A monotonic deadline measures the full elapsed time; the previous
        # ``timedelta.seconds`` check wrapped around after one day.
        deadline = time.monotonic() + timeout

        while time.monotonic() < deadline:
            status = await self.get_task_status(agent_url, task_id, auth)

            if status['status'] in ['completed', 'failed', 'cancelled']:
                return status

            await asyncio.sleep(5)  # poll every 5 seconds

        raise TimeoutError(f"任务 {task_id} 在 {timeout} 秒内未完成")
3.3 流式通信
class A2AStreamingClient:
    """Streaming A2A client for real-time results (Server-Sent Events)."""

    def build_auth_headers(self, auth: dict) -> dict:
        """Build HTTP headers for the given auth result.

        Always sets ``Content-Type: application/json``. Adds an
        ``Authorization`` header for ``oauth2`` and ``jwt``, or the
        configured key header for ``api_key``; any other method adds nothing.
        """
        headers = {"Content-Type": "application/json"}

        if auth['method'] == 'oauth2':
            headers['Authorization'] = f"{auth['token_type']} {auth['access_token']}"
        elif auth['method'] == 'api_key':
            headers[auth['header_name']] = auth['api_key']
        elif auth['method'] == 'jwt':
            headers['Authorization'] = f"Bearer {auth['token']}"

        return headers

    @staticmethod
    def _extract_sse_data(line: str):
        """Return the payload of an SSE ``data:`` line, or None.

        None is returned for lines that are not ``data:`` fields and for
        ``data:`` fields whose payload is empty or whitespace only.
        """
        if not line.startswith('data:'):
            return None
        payload = line[len('data:'):].strip()
        return payload or None

    async def stream_capability(self, agent_url: str, capability: str,
                                parameters: dict, auth: dict):
        """Stream results from an agent capability.

        POSTs to ``{agent_url}/api/v1/capabilities/{capability}/stream`` and
        yields the JSON-decoded payload of every non-empty ``data:`` line.
        """
        import json

        headers = self.build_auth_headers(auth)
        headers['Accept'] = 'text/event-stream'

        request_payload = {
            "capability": capability,
            "parameters": parameters,
            "stream": True
        }

        async with httpx.AsyncClient() as client:
            async with client.stream(
                'POST',
                f"{agent_url}/api/v1/capabilities/{capability}/stream",
                json=request_payload,
                headers=headers
            ) as response:
                # Iterate by line, not by raw text chunk: network chunks can
                # split an event in two or carry several events at once.
                async for line in response.aiter_lines():
                    data = self._extract_sse_data(line)
                    if data is not None:
                        yield json.loads(data)
4. A2A 网络拓扑
4.1 中心辐射型(协调器模式)
class A2AOrchestrator:
    """Central orchestrator (hub-and-spoke) managing several specialised agents."""

    def __init__(self):
        # agent name -> {'url', 'card', 'auth', 'last_seen'}
        self.registered_agents = {}
        self.task_manager = A2ATaskManager()
        self.security_manager = A2ASecurityManager()

    async def discover_agent(self, agent_url: str) -> dict:
        """Fetch the agent card from ``{agent_url}/.well-known/agent.json``.

        Raises:
            httpx.HTTPStatusError: if the agent answers with a 4xx/5xx status.
        """
        card_url = f"{agent_url.rstrip('/')}/.well-known/agent.json"
        async with httpx.AsyncClient() as client:
            response = await client.get(card_url)
            response.raise_for_status()
            return response.json()

    async def register_agent(self, agent_url: str, credentials: dict):
        """Discover, authenticate and register a new agent.

        The agent is stored under the ``name`` field of its agent card.
        """
        # Discover the agent's capabilities.
        agent_card = await self.discover_agent(agent_url)

        # Authenticate with the agent.
        auth = await self.security_manager.authenticate_agent(agent_card, credentials)

        # Store the agent information.
        self.registered_agents[agent_card['name']] = {
            'url': agent_url,
            'card': agent_card,
            'auth': auth,
            'last_seen': datetime.utcnow()
        }

        print(f"已注册智能体: {agent_card['name']}")

    async def execute_workflow(self, workflow_definition: List[Dict]) -> Dict[str, Any]:
        """Execute a multi-agent workflow in dependency order.

        Returns a dict mapping task id to the task's result, or to
        ``{"error": ...}`` for tasks that failed. A failed task still counts
        as finished, so the tasks depending on it are run afterwards.
        Tasks whose dependencies can never be satisfied are marked failed
        rather than waited on forever.
        """
        # Create the workflow tasks.
        tasks = self.task_manager.create_workflow(workflow_definition)

        results = {}
        completed_tasks = set()

        while len(completed_tasks) < len(tasks):
            progressed = False

            for task in tasks:
                if task.id in completed_tasks:
                    continue

                # Skip tasks whose dependencies have not finished yet.
                if not all(dep_id in completed_tasks for dep_id in task.dependencies):
                    continue

                progressed = True
                task.status = TaskStatus.RUNNING
                task.started_at = datetime.utcnow()

                try:
                    # Looked up inside the try so an unregistered agent fails
                    # this task only, not the whole workflow.
                    agent_info = self.registered_agents.get(task.agent_id)
                    if agent_info is None:
                        raise LookupError(f"未注册的智能体: {task.agent_id}")
                    result = await self.execute_task(task, agent_info)
                except Exception as e:
                    task.status = TaskStatus.FAILED
                    task.error = str(e)
                    results[task.id] = {"error": str(e)}
                else:
                    results[task.id] = result
                    task.status = TaskStatus.COMPLETED
                    task.completed_at = datetime.utcnow()

                # Mark as finished even on failure so dependants can proceed.
                completed_tasks.add(task.id)

            if not progressed:
                # No task was runnable in a full pass: the remaining ones
                # wait on a cycle or on an id outside this workflow.
                message = "依赖关系无法满足(存在循环依赖或未知的依赖任务)"
                for task in tasks:
                    if task.id not in completed_tasks:
                        task.status = TaskStatus.FAILED
                        task.error = message
                        results[task.id] = {"error": message}
                        completed_tasks.add(task.id)

        return results

    async def execute_task(self, task: A2ATask, agent_info: dict) -> dict:
        """Execute a single task on its agent via a synchronous capability call."""
        client = A2ASyncClient()

        result = await client.call_agent_capability(
            agent_url=agent_info['url'],
            capability=task.capability,
            parameters=task.parameters,
            auth=agent_info['auth']
        )

        return result
4.2 点对点(网状网络)
class A2APeerNetwork:
    """Peer-to-peer A2A network in which agents talk to each other directly."""

    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.peer_registry = {}
        self.message_router = A2AMessageRouter()

    async def discover_peers(self, discovery_service_url: str):
        """Populate the peer registry from ``{discovery_service_url}/agents``.

        Every listed agent except this one is stored with its URL,
        capabilities and the time it was seen.
        """
        async with httpx.AsyncClient() as http:
            listing = await http.get(f"{discovery_service_url}/agents")
            for entry in listing.json():
                if entry['id'] == self.agent_id:
                    # Never register ourselves as a peer.
                    continue
                self.peer_registry[entry['id']] = {
                    'url': entry['url'],
                    'capabilities': entry['capabilities'],
                    'last_seen': datetime.utcnow()
                }

    async def broadcast_message(self, message: dict, exclude_agents: List[str] = None):
        """Send ``message`` to every known peer not listed in ``exclude_agents``.

        Delivery failures are printed and do not stop the broadcast.
        """
        skipped = exclude_agents or []

        for peer_id, _info in self.peer_registry.items():
            if peer_id in skipped:
                continue
            try:
                await self.send_message_to_peer(peer_id, message)
            except Exception as err:
                print(f"向 {peer_id} 发送消息失败: {err}")

    async def send_message_to_peer(self, peer_id: str, message: dict):
        """POST ``message`` to one peer and return the peer's JSON reply.

        Raises:
            Exception: if ``peer_id`` has no (non-empty) registry entry.
        """
        target = self.peer_registry.get(peer_id)
        if not target:
            raise Exception(f"在注册表中未找到对等点 {peer_id}")

        envelope = {
            "from_agent": self.agent_id,
            "to_agent": peer_id,
            "message": message,
            "timestamp": datetime.utcnow().isoformat(),
            "message_id": str(uuid.uuid4())
        }

        async with httpx.AsyncClient() as http:
            reply = await http.post(
                f"{target['url']}/api/v1/messages",
                json=envelope
            )
            return reply.json()


class A2AMessageRouter:
    """Dispatch incoming peer messages to handlers keyed by message type."""

    def __init__(self):
        self.routing_table = {}
        self.message_handlers = {}

    def register_handler(self, message_type: str, handler_func):
        """Register (or replace) the handler for ``message_type``."""
        self.message_handlers[message_type] = handler_func

    async def route_message(self, message: dict) -> dict:
        """Await the handler matching ``message['type']`` and return its result.

        A missing ``type`` is treated as ``'unknown'``; when no handler is
        registered for the type an ``{"error": ...}`` dict is returned.
        """
        kind = message.get('type', 'unknown')

        try:
            handle = self.message_handlers[kind]
        except KeyError:
            return {"error": f"没有针对消息类型 {kind} 的处理程序"}

        return await handle(message)
5. A2A 生产部署
5.1 Kubernetes 部署
# a2a-agent-deployment.yaml
# Deployment, Service and ConfigMap for the financial analysis agent.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: financial-analysis-agent
  labels:
    app: financial-analysis-agent
    protocol: a2a
spec:
  replicas: 3
  selector:
    matchLabels:
      app: financial-analysis-agent
  template:
    metadata:
      labels:
        app: financial-analysis-agent
      annotations:
        a2a.protocol/version: "1.0"
        a2a.agent/capabilities: "financial_analysis,report_generation"
    spec:
      containers:
        - name: agent
          image: your-registry/financial-analysis-agent:v1.2.0
          ports:
            - containerPort: 8080
              name: http
            - containerPort: 8443
              name: https
          env:
            - name: A2A_AGENT_ID
              value: "financial-analysis-agent"
            - name: A2A_PROTOCOL_VERSION
              value: "1.0"
            # OAuth2 client credentials are read from a Secret.
            - name: OAUTH2_CLIENT_ID
              valueFrom:
                secretKeyRef:
                  name: a2a-auth-secrets
                  key: oauth2-client-id
            - name: OAUTH2_CLIENT_SECRET
              valueFrom:
                secretKeyRef:
                  name: a2a-auth-secrets
                  key: oauth2-client-secret
          volumeMounts:
            # TLS certificates
            - name: tls-certs
              mountPath: /etc/tls
              readOnly: true
            # Agent configuration (agent card)
            - name: agent-config
              mountPath: /etc/agent
              readOnly: true
          livenessProbe:
            httpGet:
              path: /health
              port: 8080
            initialDelaySeconds: 30
            periodSeconds: 10
          # Ready once the agent card is being served.
          readinessProbe:
            httpGet:
              path: /.well-known/agent.json
              port: 8080
            initialDelaySeconds: 5
            periodSeconds: 5
      volumes:
        - name: tls-certs
          secret:
            secretName: a2a-tls-certs
        - name: agent-config
          configMap:
            name: agent-config
---
apiVersion: v1
kind: Service
metadata:
  name: financial-analysis-agent-service
  annotations:
    # Register with the discovery service.
    a2a.discovery/register: "true"
    # Agent card URL
    a2a.agent/card-url: "https://financial-analysis-agent/.well-known/agent.json"
spec:
  selector:
    app: financial-analysis-agent
  ports:
    - name: http
      port: 80
      targetPort: 8080
    - name: https
      port: 443
      targetPort: 8443
  type: LoadBalancer
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: agent-config
data:
  agent-card.json: |
    {
      "name": "Financial Analysis Agent",
      "version": "1.2.0",
      "description": "Specialized in financial data analysis and reporting",
      "protocol_version": "A2A/1.0",
      "capabilities": [
        {
          "name": "analyze_financial_data",
          "description": "Analyze financial datasets and generate insights"
        }
      ]
    }
5.2 服务网格集成
# istio-a2a-config.yaml
# Istio routing and authorization for the financial analysis agent.
apiVersion: networking.istio.io/v1beta1
kind: VirtualService
metadata:
  name: a2a-agent-routing
spec:
  hosts:
    - financial-analysis-agent
  http:
    # Agent card path
    - match:
        - uri:
            prefix: "/.well-known/agent.json"
      route:
        - destination:
            host: financial-analysis-agent
            port:
              number: 80
      headers:
        response:
          add:
            a2a-protocol-version: "1.0"
            cache-control: "public, max-age=300"
    # API paths
    - match:
        - uri:
            prefix: "/api/v1/"
      route:
        - destination:
            host: financial-analysis-agent
            port:
              number: 80
      timeout: 30s
      retries:
        attempts: 3
        perTryTimeout: 10s
---
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy
metadata:
  name: a2a-agent-authz
spec:
  selector:
    matchLabels:
      app: financial-analysis-agent
  rules:
    # Caller identity
    - from:
        - source:
            principals: ["cluster.local/ns/a2a-system/sa/agent-caller"]
    # GET on the agent card and health endpoints
    - to:
        - operation:
            methods: ["GET"]
            paths: ["/.well-known/agent.json", "/health"]
    # POST on API paths, only with a Bearer authorization header
    - to:
        - operation:
            methods: ["POST"]
            paths: ["/api/v1/*"]
      when:
        - key: request.headers[authorization]
          values: ["Bearer *"]
这个全面的 A2A 方案为构建可扩展、安全且可互操作的多智能体系统提供了坚实的基础。该协议的灵活性允许各种部署模式,同时在不同智能体实现之间保持标准化。
资料来源
- Open Protocols for Agent Interoperability Part 4: Inter-Agent Communication on A2A | AWS Open Source Blog (关于 A2A 智能体间通信的开放协议第 4 部分 | AWS 开源博客)
- Shaping the future of telco operations with an agentic AI collaboration approach | AWS for Industries (通过智能体 AI 协作方法塑造电信运营的未来 | AWS 行业应用)
- Agent-to-agent protocols - AWS Prescriptive Guidance (智能体到智能体协议 - AWS 最佳实践指南)
- Open Protocols for Agent Interoperability Part 1: Inter-Agent Communication on MCP | AWS Open Source Blog (关于 MCP 智能体间通信的开放协议第 1 部分 | AWS 开源博客)
- Agentic protocols - AWS Prescriptive Guidance (智能体协议 - AWS 最佳实践指南)
更多推荐
所有评论(0)