1. A2A 协议架构概述
A2A 协议是一个标准化的通信框架,使得来自不同平台、组织和框架的智能体能够无缝通信。下文依次介绍其核心组件、通信模式、网络拓扑与生产部署方式。

2. 核心 A2A 组件
2.1 智能体卡片(发现机制)
智能体卡片是描述智能体能力的标准化 JSON 文档:

{
  "name": "金融分析智能体",
  "version": "1.2.0",
  "description": "专注于金融数据分析和报告",
  "protocol_version": "A2A/1.0",
  "capabilities": [
    {
      "name": "analyze_financial_data",
      "description": "分析金融数据集并生成洞察",
      "parameters": {
        "dataset_url": {"type": "string", "required": true},
        "analysis_type": {"type": "string", "enum": ["trend", "forecast", "comparison"]},
        "time_period": {"type": "string", "format": "date-range"}
      },
      "returns": {
        "type": "object",
        "properties": {
          "insights": {"type": "array"},
          "charts": {"type": "array"},
          "recommendations": {"type": "array"}
        }
      }
    },
    {
      "name": "generate_report",
      "description": "生成格式化的财务报告",
      "parameters": {
        "analysis_id": {"type": "string", "required": true},
        "format": {"type": "string", "enum": ["pdf", "html", "excel"]}
      }
    }
  ],
  "endpoints": {
    "chat": "/api/v1/chat",
    "tasks": "/api/v1/tasks",
    "status": "/api/v1/status",
    "health": "/health"
  },
  "authentication": {
    "methods": ["oauth2", "api_key", "mtls"],
    "oauth2": {
      "authorization_url": "https://auth.company.com/oauth/authorize",
      "token_url": "https://auth.company.com/oauth/token",
      "scopes": ["financial_analysis", "report_generation"]
    }
  },
  "security": {
    "encryption": "TLS 1.3",
    "data_retention": "30 days",
    "compliance": ["SOC2", "GDPR", "HIPAA"]
  },
  "metadata": {
    "organization": "FinTech Corp",
    "contact": "api-support@fintech.com",
    "documentation": "https://docs.fintech.com/agents/financial-analysis",
    "pricing": "https://fintech.com/pricing/agents",
    "rate_limits": {
      "requests_per_minute": 100,
      "concurrent_tasks": 5
    }
  }
}

2.2 任务管理系统
A2A 使用结构化的任务对象处理复杂的工作流:

import uuid
from dataclasses import dataclass, field
from datetime import datetime
from enum import Enum
from typing import Any, Dict, List, Optional

class TaskStatus(Enum):
    """Status values an A2ATask can carry; each member's value is its lowercase name."""
    PENDING = "pending"  # pending (the default status of a new A2ATask)
    RUNNING = "running"  # running
    COMPLETED = "completed"  # completed
    FAILED = "failed"  # failed
    CANCELLED = "cancelled"  # cancelled

class TaskPriority(Enum):
    """Priority labels for an A2ATask; each member's value is its lowercase name."""
    LOW = "low"  # low
    NORMAL = "normal"  # normal (the default priority of a new A2ATask)
    HIGH = "high"  # high
    URGENT = "urgent"  # urgent

@dataclass
class A2ATask:
    """Structured task exchanged in A2A communication.

    ``created_at``, ``dependencies`` and ``metadata`` get a fresh value per
    instance (current naive-UTC time, empty list, empty dict) when omitted.
    For backward compatibility, ``id``, ``created_at``, ``dependencies`` and
    ``metadata`` may also be passed explicitly as ``None``; ``__post_init__``
    then substitutes a new UUID4 string / the same per-instance defaults.
    """
    id: str
    agent_id: str
    capability: str
    parameters: Dict[str, Any]
    status: TaskStatus = TaskStatus.PENDING
    priority: TaskPriority = TaskPriority.NORMAL
    # Naive UTC timestamp, matching the datetime.utcnow() usage elsewhere in this file.
    created_at: datetime = field(default_factory=datetime.utcnow)
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    result: Optional[Dict[str, Any]] = None
    error: Optional[str] = None
    dependencies: List[str] = field(default_factory=list)  # IDs of the tasks this task depends on
    metadata: Dict[str, Any] = field(default_factory=dict)

    def __post_init__(self):
        # Callers written against the earlier ``= None`` defaults may still pass
        # None explicitly; normalise those values so the annotations hold.
        if self.id is None:
            self.id = str(uuid.uuid4())
        if self.created_at is None:
            self.created_at = datetime.utcnow()
        if self.dependencies is None:
            self.dependencies = []
        if self.metadata is None:
            self.metadata = {}

# Task workflow example
class A2ATaskManager:
    """In-memory registry that creates A2A tasks and wires workflow dependencies."""

    def __init__(self):
        # Every task created through this manager, keyed by task ID.
        self.tasks: Dict[str, A2ATask] = {}
        # NOTE(review): no method of this class writes to task_dependencies;
        # dependency IDs are stored on each task's ``dependencies`` list.
        self.task_dependencies: Dict[str, List[str]] = {}

    def create_task(self, agent_id: str, capability: str,
                   parameters: Dict[str, Any], **kwargs) -> A2ATask:
        """Create a new A2A task with a fresh UUID4 ID and register it.

        Args:
            agent_id: Identifier of the agent that should run the task.
            capability: Name of the capability to invoke.
            parameters: Arguments for the capability.
            **kwargs: Extra ``A2ATask`` fields (e.g. ``priority``). ``id`` must
                not be passed here because it is always generated.

        Returns:
            The newly created task, also stored in ``self.tasks``.
        """
        task = A2ATask(
            id=str(uuid.uuid4()),
            agent_id=agent_id,
            capability=capability,
            parameters=parameters,
            **kwargs
        )
        self.tasks[task.id] = task
        return task

    def create_workflow(self, workflow_definition: List[Dict]) -> List[A2ATask]:
        """Create one task per workflow step and link their dependencies.

        The whole definition is validated before any task is created, so a
        malformed definition leaves ``self.tasks`` untouched.

        Args:
            workflow_definition: Step dicts with keys ``step_id``,
                ``agent_id``, ``capability``, ``parameters`` and optionally
                ``priority`` (a ``TaskPriority`` value, default ``'normal'``)
                and ``depends_on`` (list of ``step_id`` values).

        Returns:
            The created tasks, in the order of ``workflow_definition``.

        Raises:
            KeyError: A step lacks a required key, or ``depends_on`` names a
                ``step_id`` that is not part of the workflow.
            ValueError: A ``step_id`` is duplicated or ``priority`` is not a
                valid ``TaskPriority`` value.
        """
        # Pass 1: read and validate every step without side effects.
        specs = []
        known_steps = set()
        for step in workflow_definition:
            step_id = step['step_id']
            if step_id in known_steps:
                # A duplicate would overwrite the step -> task mapping and
                # silently attach dependents to the wrong task.
                raise ValueError(f"工作流中存在重复的 step_id: {step_id!r}")
            known_steps.add(step_id)
            specs.append({
                'step_id': step_id,
                'agent_id': step['agent_id'],
                'capability': step['capability'],
                'parameters': step['parameters'],
                'priority': TaskPriority(step.get('priority', 'normal')),
                'depends_on': list(step.get('depends_on') or []),
            })

        for spec in specs:
            for dep in spec['depends_on']:
                if dep not in known_steps:
                    raise KeyError(
                        f"步骤 {spec['step_id']!r} 依赖于未定义的步骤 {dep!r}"
                    )

        # Pass 2: create all tasks first so forward references can be resolved.
        tasks = []
        task_map = {}
        for spec in specs:
            task = self.create_task(
                agent_id=spec['agent_id'],
                capability=spec['capability'],
                parameters=spec['parameters'],
                priority=spec['priority']
            )
            tasks.append(task)
            task_map[spec['step_id']] = task.id

        # Pass 3: translate step IDs into the generated task IDs.
        for task, spec in zip(tasks, specs):
            if spec['depends_on']:
                task.dependencies = [task_map[dep] for dep in spec['depends_on']]

        return tasks

# Example workflow definition: three steps chained through "depends_on".
# Each dict uses the keys read by A2ATaskManager.create_workflow.
financial_analysis_workflow = [
    {
        "step_id": "data_collection",  # data collection
        "agent_id": "data_collector_agent",  # data collector agent
        "capability": "collect_financial_data",  # collect financial data
        "parameters": {"sources": ["yahoo_finance", "bloomberg"], "symbols": ["AAPL", "GOOGL"]},
        "priority": "high"  # high
    },
    {
        "step_id": "data_analysis",  # data analysis
        "agent_id": "analysis_agent",  # analysis agent
        "capability": "analyze_financial_data",  # analyze financial data
        "parameters": {"analysis_type": "trend"},  # trend analysis
        "depends_on": ["data_collection"]  # depends on the data collection step
    },
    {
        "step_id": "report_generation",  # report generation
        "agent_id": "report_agent",  # report agent
        "capability": "generate_report",  # generate report
        "parameters": {"format": "pdf"},  # PDF format
        "depends_on": ["data_analysis"]  # depends on the data analysis step
    }
]

2.3 安全与认证
A2A 提供具有多种认证方法的企业级安全性:

import jwt
import httpx
from typing import Optional
from datetime import datetime, timedelta

class A2ASecurityManager:
    """Handles A2A authentication using the methods an agent card advertises.

    Every ``*_auth`` coroutine returns a plain dict whose ``'method'`` key
    names the scheme, followed by the scheme-specific fields.
    """

    def __init__(self):
        # Dispatch table: method name as listed in an agent card -> handler.
        self.auth_methods = {
            'oauth2': self.oauth2_auth,
            'api_key': self.api_key_auth,
            'mtls': self.mtls_auth,
            'jwt': self.jwt_auth
        }

    async def authenticate_agent(self, agent_card: dict, credentials: dict) -> dict:
        """Authenticate against an A2A agent.

        Methods are tried in the order listed under
        ``agent_card['authentication']['methods']`` (default ``['api_key']``);
        a method is attempted only when ``credentials`` has a key of the same
        name. The first handler that succeeds wins.

        Args:
            agent_card: The agent's card.
            credentials: Mapping of method name to that method's credentials.

        Returns:
            The auth dict produced by the first successful handler.

        Raises:
            Exception: No method succeeded. The last handler failure, if any,
                is attached as ``__cause__``.
        """
        auth_config = agent_card.get('authentication', {})
        supported_methods = auth_config.get('methods', ['api_key'])

        last_error = None
        # Try the methods in the card's order of preference.
        for method in supported_methods:
            if method not in credentials:
                continue
            try:
                # A method without a registered handler raises KeyError here
                # and is reported like any other failed attempt.
                handler = self.auth_methods[method]
                return await handler(agent_card, credentials[method])
            except Exception as e:
                print(f"{method} 认证失败: {e}")
                last_error = e

        raise Exception("所有认证方法均失败") from last_error

    async def oauth2_auth(self, agent_card: dict, oauth_config: dict) -> dict:
        """Exchange an OAuth 2.0 authorization code for an access token.

        The authorization-code step itself is not performed here: the code is
        read from ``oauth_config['authorization_code']``.

        Args:
            agent_card: Card whose ``authentication.oauth2.token_url`` is used.
            oauth_config: Must provide ``client_id``, ``client_secret``,
                ``authorization_code`` and ``redirect_uri``.

        Returns:
            Dict with ``method``, ``access_token``, ``token_type`` (default
            ``'Bearer'``) and ``expires_at`` (naive UTC; default lifetime
            3600 seconds when the response has no ``expires_in``).

        Raises:
            KeyError: A required config or response field is missing.
            httpx.HTTPStatusError: The token endpoint answered with 4xx/5xx.
        """
        auth_config = agent_card['authentication']['oauth2']

        token_data = {
            'grant_type': 'authorization_code',
            'client_id': oauth_config['client_id'],
            'client_secret': oauth_config['client_secret'],
            'code': oauth_config['authorization_code'],
            'redirect_uri': oauth_config['redirect_uri']
        }

        async with httpx.AsyncClient() as client:
            response = await client.post(auth_config['token_url'], data=token_data)
            # Fail with the real HTTP error instead of a later KeyError on
            # 'access_token' when the endpoint rejects the exchange.
            response.raise_for_status()
            token_response = response.json()

        return {
            'method': 'oauth2',
            'access_token': token_response['access_token'],
            'token_type': token_response.get('token_type', 'Bearer'),
            'expires_at': datetime.utcnow() + timedelta(
                seconds=token_response.get('expires_in', 3600)
            )
        }

    async def api_key_auth(self, agent_card: dict, api_key: str) -> dict:
        """Package an API key together with the header it should be sent in.

        The header name comes from ``authentication.api_key_header`` in the
        card and defaults to ``'X-API-Key'``. No network call is made.
        """
        return {
            'method': 'api_key',
            'api_key': api_key,
            'header_name': agent_card.get('authentication', {}).get('api_key_header', 'X-API-Key')
        }

    async def mtls_auth(self, agent_card: dict, mtls_config: dict) -> dict:
        """Package mutual-TLS file paths; no network call is made.

        Raises:
            KeyError: ``cert_file`` or ``key_file`` is missing (``ca_file`` is
                optional and becomes ``None``).
        """
        return {
            'method': 'mtls',
            'cert_file': mtls_config['cert_file'],
            'key_file': mtls_config['key_file'],
            'ca_file': mtls_config.get('ca_file')
        }

    async def jwt_auth(self, agent_card: dict, jwt_config: dict) -> dict:
        """Create an RS256-signed JWT valid for one hour.

        The token carries ``agent_id`` from ``jwt_config``, issue/expiry times
        and the agent card's ``name`` as audience; it is signed with
        ``jwt_config['private_key']``.
        """
        payload = {
            'agent_id': jwt_config['agent_id'],
            'iat': datetime.utcnow(),
            'exp': datetime.utcnow() + timedelta(hours=1),
            'aud': agent_card['name']
        }

        token = jwt.encode(payload, jwt_config['private_key'], algorithm='RS256')

        return {
            'method': 'jwt',
            'token': token,
            'algorithm': 'RS256'
        }

# Usage example
security_manager = A2ASecurityManager()

# Credentials keyed by authentication-method name. authenticate_agent only
# attempts a method when the agent card lists it and a matching key exists here.
credentials = {
    'oauth2': {
        'client_id': 'your_client_id',
        'client_secret': 'your_client_secret',
        'authorization_code': 'auth_code_from_redirect',
        'redirect_uri': 'https://your-app.com/callback'
    },
    'api_key': 'your_api_key_here'
}

# Authenticate with the agent.
# NOTE(review): `agent_card` is not defined in this snippet and must be supplied
# by the surrounding code; `await` at module level also requires an async
# context (e.g. inside a coroutine) — confirm how this example is meant to run.
auth_result = await security_manager.authenticate_agent(agent_card, credentials)

3. A2A 通信模式
3.1 同步请求-响应

class A2ASyncClient:
    """Request/response A2A client: one HTTP round trip per capability call."""

    def build_auth_headers(self, auth: dict) -> dict:
        """Translate an auth dict into HTTP request headers.

        ``Content-Type: application/json`` is always present. ``oauth2`` and
        ``jwt`` add an ``Authorization`` header, ``api_key`` adds the header
        named by ``auth['header_name']``; any other method adds nothing.
        """
        base = {"Content-Type": "application/json"}
        scheme = auth['method']

        if scheme == 'oauth2':
            return {**base, 'Authorization': f"{auth['token_type']} {auth['access_token']}"}
        if scheme == 'api_key':
            return {**base, auth['header_name']: auth['api_key']}
        if scheme == 'jwt':
            return {**base, 'Authorization': f"Bearer {auth['token']}"}
        return base

    async def call_agent_capability(self, agent_url: str, capability: str,
                                  parameters: dict, auth: dict) -> dict:
        """Invoke one capability on an agent and wait for its JSON reply.

        POSTs to ``{agent_url}/api/v1/capabilities/{capability}`` with a
        30-second timeout.

        Returns:
            The decoded JSON body of a 200 response.

        Raises:
            Exception: The agent answered with any status other than 200.
        """
        auth_headers = self.build_auth_headers(auth)
        endpoint = f"{agent_url}/api/v1/capabilities/{capability}"
        body = {
            "capability": capability,
            "parameters": parameters,
            "request_id": str(uuid.uuid4()),
            "timestamp": datetime.utcnow().isoformat(),
            "response_format": "json"
        }

        async with httpx.AsyncClient() as http:
            reply = await http.post(
                endpoint, json=body, headers=auth_headers, timeout=30.0
            )

            # Guard clause: anything but 200 is treated as a failed call.
            if reply.status_code != 200:
                raise Exception(f"智能体调用失败: {reply.status_code} - {reply.text}")
            return reply.json()

3.2 基于任务的异步通信

class A2AAsyncClient:
    """Task-based asynchronous A2A client: submit a task, then poll its status."""

    def build_auth_headers(self, auth: dict) -> dict:
        """Translate an auth dict into HTTP request headers.

        Mirrors ``A2ASyncClient.build_auth_headers`` so this client can be
        used on its own: ``Content-Type`` is always set, ``oauth2``/``jwt`` add
        ``Authorization``, ``api_key`` adds the header named in the auth dict.
        """
        headers = {"Content-Type": "application/json"}

        if auth['method'] == 'oauth2':
            headers['Authorization'] = f"{auth['token_type']} {auth['access_token']}"
        elif auth['method'] == 'api_key':
            headers[auth['header_name']] = auth['api_key']
        elif auth['method'] == 'jwt':
            headers['Authorization'] = f"Bearer {auth['token']}"

        return headers

    async def submit_task(self, agent_url: str, task: "A2ATask", auth: dict) -> str:
        """Submit a task for asynchronous execution.

        POSTs the task to ``{agent_url}/api/v1/tasks``. The payload's
        ``callback_url`` is built from a hard-coded placeholder host.

        Returns:
            The ``task_id`` reported by the agent in its 202 response.

        Raises:
            Exception: The agent answered with any status other than 202.
        """
        headers = self.build_auth_headers(auth)

        task_payload = {
            "task_id": task.id,
            "capability": task.capability,
            "parameters": task.parameters,
            "priority": task.priority.value,
            "callback_url": f"https://your-agent.com/callbacks/{task.id}",
            "metadata": task.metadata
        }

        async with httpx.AsyncClient() as client:
            response = await client.post(
                f"{agent_url}/api/v1/tasks",
                json=task_payload,
                headers=headers
            )

            if response.status_code == 202:  # Accepted
                result = response.json()
                return result['task_id']
            else:
                raise Exception(f"任务提交失败: {response.status_code}")

    async def get_task_status(self, agent_url: str, task_id: str, auth: dict) -> dict:
        """Fetch the status document of a submitted task.

        Returns the decoded JSON body of
        ``GET {agent_url}/api/v1/tasks/{task_id}``; the HTTP status code is
        not inspected.
        """
        headers = self.build_auth_headers(auth)

        async with httpx.AsyncClient() as client:
            response = await client.get(
                f"{agent_url}/api/v1/tasks/{task_id}",
                headers=headers
            )

            return response.json()

    async def wait_for_task_completion(self, agent_url: str, task_id: str,
                                     auth: dict, timeout: int = 300,
                                     poll_interval: float = 5.0) -> dict:
        """Poll a task until it reaches a terminal state.

        Args:
            agent_url: Base URL of the agent.
            task_id: ID returned by ``submit_task``.
            auth: Auth dict used for each status request.
            timeout: Maximum number of seconds to keep polling.
            poll_interval: Seconds to sleep between polls (default 5).

        Returns:
            The first status document whose ``status`` is ``completed``,
            ``failed`` or ``cancelled``.

        Raises:
            TimeoutError: No terminal state was seen within ``timeout`` seconds.
        """
        # Imported locally so the method works even though the surrounding
        # file never imports these modules at module level.
        import asyncio
        import time

        # A monotonic deadline is immune to wall-clock adjustments and, unlike
        # ``timedelta.seconds``, does not wrap around after one day.
        deadline = time.monotonic() + timeout

        while time.monotonic() < deadline:
            status = await self.get_task_status(agent_url, task_id, auth)

            if status['status'] in ('completed', 'failed', 'cancelled'):
                return status

            await asyncio.sleep(poll_interval)

        raise TimeoutError(f"任务 {task_id} 在 {timeout} 秒内未完成")

3.3 流式通信

class A2AStreamingClient:
    """Streaming A2A client that consumes a capability's server-sent events."""

    def build_auth_headers(self, auth: dict) -> dict:
        """Translate an auth dict into HTTP request headers.

        Mirrors ``A2ASyncClient.build_auth_headers`` so this client can be
        used on its own: ``Content-Type`` is always set, ``oauth2``/``jwt`` add
        ``Authorization``, ``api_key`` adds the header named in the auth dict.
        """
        headers = {"Content-Type": "application/json"}

        if auth['method'] == 'oauth2':
            headers['Authorization'] = f"{auth['token_type']} {auth['access_token']}"
        elif auth['method'] == 'api_key':
            headers[auth['header_name']] = auth['api_key']
        elif auth['method'] == 'jwt':
            headers['Authorization'] = f"Bearer {auth['token']}"

        return headers

    @staticmethod
    def _extract_sse_data(line: str):
        """Return the payload text of an SSE ``data:`` line, or ``None``.

        ``None`` is returned for lines that are not ``data:`` fields and for
        ``data:`` lines whose payload is empty or whitespace only.
        """
        prefix = 'data:'
        if not line.startswith(prefix):
            return None
        payload = line[len(prefix):].strip()
        return payload or None

    async def stream_capability(self, agent_url: str, capability: str,
                              parameters: dict, auth: dict):
        """Stream results from an agent capability.

        POSTs to ``{agent_url}/api/v1/capabilities/{capability}/stream`` with
        ``Accept: text/event-stream`` and yields the JSON-decoded payload of
        every non-empty ``data:`` line.

        Each ``data:`` line is decoded on its own; events whose JSON spans
        several ``data:`` lines are not reassembled.

        Yields:
            The decoded JSON value of each data line.

        Raises:
            json.JSONDecodeError: A data line does not contain valid JSON.
        """
        # Imported locally because the surrounding file never imports json.
        import json

        headers = self.build_auth_headers(auth)
        headers['Accept'] = 'text/event-stream'

        request_payload = {
            "capability": capability,
            "parameters": parameters,
            "stream": True
        }

        async with httpx.AsyncClient() as client:
            async with client.stream(
                'POST',
                f"{agent_url}/api/v1/capabilities/{capability}/stream",
                json=request_payload,
                headers=headers
            ) as response:

                # Iterate by line, not by raw text chunk: a network chunk may
                # hold several events or end in the middle of one.
                async for line in response.aiter_lines():
                    data = self._extract_sse_data(line)
                    if data is not None:
                        yield json.loads(data)

4. A2A 网络拓扑
4.1 中心辐射型(协调器模式)

class A2AOrchestrator:
    """Central orchestrator that registers agents and runs workflows across them."""

    def __init__(self):
        # Registered agents keyed by the ``name`` field of their agent card.
        self.registered_agents = {}
        self.task_manager = A2ATaskManager()
        self.security_manager = A2ASecurityManager()

    async def discover_agent(self, agent_url: str) -> dict:
        """Fetch an agent's card.

        NOTE(review): the path ``/.well-known/agent.json`` is taken from the
        readiness probe and Service annotation in this document's deployment
        manifests — confirm it matches the agents being registered.

        Raises:
            httpx.HTTPStatusError: The agent answered with 4xx/5xx.
        """
        async with httpx.AsyncClient() as client:
            response = await client.get(f"{agent_url}/.well-known/agent.json")
            response.raise_for_status()
            return response.json()

    async def register_agent(self, agent_url: str, credentials: dict):
        """Register a new agent in the network.

        Fetches the agent card, authenticates with the given credentials and
        stores URL, card, auth result and a naive-UTC ``last_seen`` timestamp
        under the card's ``name``.
        """
        # Discover the agent's capabilities.
        agent_card = await self.discover_agent(agent_url)

        # Authenticate with the agent.
        auth = await self.security_manager.authenticate_agent(agent_card, credentials)

        # Store the agent information.
        self.registered_agents[agent_card['name']] = {
            'url': agent_url,
            'card': agent_card,
            'auth': auth,
            'last_seen': datetime.utcnow()
        }

        print(f"已注册智能体: {agent_card['name']}")

    async def execute_workflow(self, workflow_definition: List[Dict]) -> Dict[str, Any]:
        """Execute a multi-agent workflow in dependency order.

        A task runs once all of its dependencies have finished. A failed task
        still counts as finished, so its dependents run as well.

        Args:
            workflow_definition: Step dicts as accepted by
                ``A2ATaskManager.create_workflow``. Each ``agent_id`` must be
                a key of ``self.registered_agents``.

        Returns:
            Mapping of task ID to the task's result, or to ``{"error": ...}``
            for tasks that failed or could never be scheduled.

        Raises:
            KeyError: A step names an agent that is not registered. This is
                checked before any task is created or executed.
        """
        # Fail fast: previously an unknown agent surfaced as a KeyError in the
        # middle of execution, after earlier tasks had already run.
        unregistered = [
            step['agent_id'] for step in workflow_definition
            if step['agent_id'] not in self.registered_agents
        ]
        if unregistered:
            raise KeyError(f"未注册的智能体: {unregistered!r}")

        # Create the workflow tasks.
        tasks = self.task_manager.create_workflow(workflow_definition)

        results = {}
        completed_tasks = set()

        while len(completed_tasks) < len(tasks):
            progressed = False

            for task in tasks:
                if task.id in completed_tasks:
                    continue

                # Skip tasks whose dependencies have not all finished yet.
                if not all(dep_id in completed_tasks for dep_id in task.dependencies):
                    continue

                progressed = True
                agent_info = self.registered_agents[task.agent_id]
                task.status = TaskStatus.RUNNING
                task.started_at = datetime.utcnow()

                try:
                    result = await self.execute_task(task, agent_info)
                except Exception as e:
                    task.status = TaskStatus.FAILED
                    task.error = str(e)
                    results[task.id] = {"error": str(e)}
                else:
                    results[task.id] = result
                    task.status = TaskStatus.COMPLETED
                    task.completed_at = datetime.utcnow()

                # Mark as finished even on failure so dependents are not blocked.
                completed_tasks.add(task.id)

            if not progressed:
                # A full pass scheduled nothing: the remaining tasks wait on
                # each other (e.g. a dependency cycle). Fail them instead of
                # spinning forever without yielding to the event loop.
                for task in tasks:
                    if task.id in completed_tasks:
                        continue
                    message = "依赖关系无法满足(可能存在循环依赖)"
                    task.status = TaskStatus.FAILED
                    task.error = message
                    results[task.id] = {"error": message}
                    completed_tasks.add(task.id)

        return results

    async def execute_task(self, task: A2ATask, agent_info: dict) -> dict:
        """Execute a single task on an agent via a synchronous capability call.

        Args:
            task: Task whose ``capability`` and ``parameters`` are sent.
            agent_info: Registry entry providing ``url`` and ``auth``.

        Returns:
            The JSON result returned by the agent.
        """
        client = A2ASyncClient()

        return await client.call_agent_capability(
            agent_url=agent_info['url'],
            capability=task.capability,
            parameters=task.parameters,
            auth=agent_info['auth']
        )

4.2 点对点(网状网络)

class A2APeerNetwork:
    """Peer-to-peer A2A network in which agents talk to each other directly."""

    def __init__(self, agent_id: str):
        self.agent_id = agent_id
        self.peer_registry = {}
        self.message_router = A2AMessageRouter()

    async def discover_peers(self, discovery_service_url: str):
        """Populate the peer registry from a discovery service.

        Reads ``GET {discovery_service_url}/agents`` and stores every listed
        agent except this one, keyed by its ``id``, with ``url``,
        ``capabilities`` and a naive-UTC ``last_seen`` timestamp.
        """
        async with httpx.AsyncClient() as http:
            listing = await http.get(f"{discovery_service_url}/agents")

            for entry in listing.json():
                # Never register ourselves as a peer.
                if entry['id'] == self.agent_id:
                    continue
                self.peer_registry[entry['id']] = {
                    'url': entry['url'],
                    'capabilities': entry['capabilities'],
                    'last_seen': datetime.utcnow()
                }

    async def broadcast_message(self, message: dict, exclude_agents: List[str] = None):
        """Send a message to every known peer, one after another.

        Peers listed in ``exclude_agents`` are skipped. A failure for one peer
        is printed and does not stop delivery to the others.
        """
        skipped = exclude_agents or []

        for peer_id in self.peer_registry:
            if peer_id in skipped:
                continue
            try:
                await self.send_message_to_peer(peer_id, message)
            except Exception as err:
                print(f"向 {peer_id} 发送消息失败: {err}")

    async def send_message_to_peer(self, peer_id: str, message: dict):
        """Send a message to one peer and return its JSON reply.

        The message is wrapped in an envelope carrying sender, recipient, an
        ISO timestamp and a fresh UUID4 message ID, then POSTed to
        ``{peer url}/api/v1/messages``.

        Raises:
            Exception: ``peer_id`` is not in the registry.
        """
        peer = self.peer_registry.get(peer_id)
        if not peer:
            raise Exception(f"在注册表中未找到对等点 {peer_id}")

        envelope = {
            "from_agent": self.agent_id,
            "to_agent": peer_id,
            "message": message,
            "timestamp": datetime.utcnow().isoformat(),
            "message_id": str(uuid.uuid4())
        }
        target = f"{peer['url']}/api/v1/messages"

        async with httpx.AsyncClient() as http:
            reply = await http.post(target, json=envelope)

            return reply.json()

class A2AMessageRouter:
    """Dispatches incoming peer messages to handlers registered per message type."""

    def __init__(self):
        self.routing_table = {}
        self.message_handlers = {}

    def register_handler(self, message_type: str, handler_func):
        """Register (or replace) the handler for one message type."""
        self.message_handlers[message_type] = handler_func

    async def route_message(self, message: dict) -> dict:
        """Pass a message to the handler registered for its ``type``.

        A message without a ``type`` key is treated as type ``'unknown'``.

        Returns:
            Whatever the awaited handler returns, or an ``{"error": ...}``
            dict when no handler is registered for the type.
        """
        kind = message.get('type', 'unknown')

        try:
            handler = self.message_handlers[kind]
        except KeyError:
            return {"error": f"没有针对消息类型 {kind} 的处理程序"}

        # Awaited outside the try so a KeyError raised by the handler itself
        # is not mistaken for a missing registration.
        return await handler(message)

5. A2A 生产部署
5.1 Kubernetes 部署

# a2a-agent-deployment.yaml
# Deployment: three replicas of the financial-analysis agent container, with
# OAuth2 credentials injected from a Secret and TLS certs / agent config mounted
# read-only.
apiVersion: apps/v1
kind: Deployment
metadata:
  name: financial-analysis-agent # financial analysis agent
  labels:
    app: financial-analysis-agent
    protocol: a2a
spec:
  replicas: 3 # number of replicas
  selector:
    matchLabels:
      app: financial-analysis-agent
  template:
    metadata:
      labels:
        app: financial-analysis-agent
      annotations: # annotations
        a2a.protocol/version: "1.0"
        a2a.agent/capabilities: "financial_analysis,report_generation" # capabilities
    spec:
      containers:
      - name: agent
        image: your-registry/financial-analysis-agent:v1.2.0 # container image
        ports:
        - containerPort: 8080
          name: http
        - containerPort: 8443
          name: https
        env: # environment variables
        - name: A2A_AGENT_ID
          value: "financial-analysis-agent"
        - name: A2A_PROTOCOL_VERSION
          value: "1.0"
        - name: OAUTH2_CLIENT_ID
          valueFrom:
            secretKeyRef:
              name: a2a-auth-secrets # authentication secrets
              key: oauth2-client-id
        - name: OAUTH2_CLIENT_SECRET
          valueFrom:
            secretKeyRef:
              name: a2a-auth-secrets
              key: oauth2-client-secret
        volumeMounts: # volume mounts
        - name: tls-certs # TLS certificates
          mountPath: /etc/tls
          readOnly: true
        - name: agent-config # agent configuration
          mountPath: /etc/agent
          readOnly: true
        livenessProbe: # liveness probe
          httpGet:
            path: /health
            port: 8080
          initialDelaySeconds: 30
          periodSeconds: 10
        readinessProbe: # readiness probe: ready once the agent card is served
          httpGet:
            path: /.well-known/agent.json
            port: 8080
          initialDelaySeconds: 5
          periodSeconds: 5
      volumes:
      - name: tls-certs
        secret:
          secretName: a2a-tls-certs
      - name: agent-config
        configMap:
          name: agent-config
---
# Service: exposes the agent pods (selected by the app label) on ports 80/443,
# forwarding to container ports 8080/8443.
apiVersion: v1
kind: Service
metadata:
  name: financial-analysis-agent-service
  annotations:
    a2a.discovery/register: "true" # register with the discovery service
    a2a.agent/card-url: "https://financial-analysis-agent/.well-known/agent.json" # agent card URL
spec:
  selector:
    app: financial-analysis-agent
  ports:
  - name: http
    port: 80
    targetPort: 8080
  - name: https
    port: 443
    targetPort: 8443
  type: LoadBalancer # load balancer service type
---
# ConfigMap: holds the agent card JSON. The Deployment's "agent-config" volume
# references a ConfigMap of this name and mounts it at /etc/agent.
apiVersion: v1
kind: ConfigMap # config map
metadata:
  name: agent-config
data:
  agent-card.json: |
    {
      "name": "Financial Analysis Agent",
      "version": "1.2.0",
      "description": "Specialized in financial data analysis and reporting",
      "protocol_version": "A2A/1.0",
      "capabilities": [
        {
          "name": "analyze_financial_data",
          "description": "Analyze financial datasets and generate insights"
        }
      ]
    }

5.2 服务网格集成

# istio-a2a-config.yaml
# VirtualService: routes agent-card requests (adding protocol/cache headers)
# and API requests (with timeout and retries) to the agent on port 80.
# NOTE(review): the host below is "financial-analysis-agent", while the Service
# manifest in this document is named "financial-analysis-agent-service" —
# confirm the host resolves to the intended Service.
apiVersion: networking.istio.io/v1beta1
kind: VirtualService # virtual service
metadata:
  name: a2a-agent-routing
spec:
  hosts:
  - financial-analysis-agent
  http:
  - match:
    - uri:
        prefix: "/.well-known/agent.json" # agent card path
    route:
    - destination:
        host: financial-analysis-agent
        port:
          number: 80
    headers:
      response:
        add:
          a2a-protocol-version: "1.0"
          cache-control: "public, max-age=300" # cache control
  - match:
    - uri:
        prefix: "/api/v1/" # API path
    route:
    - destination:
        host: financial-analysis-agent
        port:
          number: 80
    timeout: 30s # overall request timeout
    retries: # retry policy
      attempts: 3
      perTryTimeout: 10s
---
# AuthorizationPolicy applied to pods labelled app=financial-analysis-agent.
# NOTE(review): the three entries under "rules" are separate rules; per the
# Istio reference a request is allowed when any one rule matches, so the
# principal check, the GET paths and the POST condition are alternatives, not
# a combined requirement — confirm this is intended.
apiVersion: security.istio.io/v1beta1
kind: AuthorizationPolicy # authorization policy
metadata:
  name: a2a-agent-authz
spec:
  selector:
    matchLabels:
      app: financial-analysis-agent
  rules:
  - from:
    - source:
        principals: ["cluster.local/ns/a2a-system/sa/agent-caller"] # caller identity
  - to:
    - operation:
        methods: ["GET"] # GET method
        paths: ["/.well-known/agent.json", "/health"] # allowed paths
  - to:
    - operation:
        methods: ["POST"] # POST method
        paths: ["/api/v1/*"] # API paths
    when:
    - key: request.headers[authorization] # Authorization header
      values: ["Bearer *"] # bearer token

这个全面的 A2A 方案为构建可扩展、安全且可互操作的多智能体系统提供了坚实的基础。该协议的灵活性允许各种部署模式,同时在不同智能体实现之间保持标准化。

资料来源

  • Open Protocols for Agent Interoperability Part 4: Inter-Agent Communication on A2A | AWS Open Source Blog (智能体互操作开放协议 第 4 部分:基于 A2A 的智能体间通信 | AWS 开源博客)

  • Shaping the future of telco operations with an agentic AI collaboration approach | AWS for Industries (通过智能体 AI 协作方法塑造电信运营的未来 | AWS 行业应用)

  • Agent-to-agent protocols - AWS Prescriptive Guidance (智能体到智能体协议 - AWS 最佳实践指南)

  • Open Protocols for Agent Interoperability Part 1: Inter-Agent Communication on MCP | AWS Open Source Blog (智能体互操作开放协议 第 1 部分:基于 MCP 的智能体间通信 | AWS 开源博客)

  • Agentic protocols - AWS Prescriptive Guidance (智能体协议 - AWS 最佳实践指南)


Logo

更多推荐