1. Agent通信基础设施概述

Agent通信基础设施是多Agent系统的核心组件,它提供了Agent之间进行可靠通信的机制。通信基础设施通常包括以下组件:

1.1 消息传递系统

  • 负责在Agent之间传递消息
  • 提供可靠的、有序的消息传输
  • 支持同步和异步通信

1.2 Agent目录服务

  • 维护系统中所有Agent的信息
  • 提供Agent发现和定位功能
  • 支持Agent注册和注销

1.3 通信协议栈

  • 实现各种通信协议
  • 提供消息编码和解码
  • 处理错误和异常情况

2. 消息队列系统

消息队列系统是Agent通信基础设施的重要组成部分,它提供了一个异步的消息传递机制。主要特点包括:

2.1 解耦性

发送者和接收者不需要同时在线,降低了系统的耦合度。

2.2 可靠性

消息在传输过程中会被持久化,确保不会丢失。

2.3 异步性

发送者可以继续执行其他任务,无需等待接收者处理消息。

3. Agent目录服务

Agent目录服务(也称为黄页服务)是多Agent系统的命名和发现服务。其主要功能包括:

3.1 Agent注册

新加入系统的Agent需要在目录服务中注册自己。

3.2 Agent发现

Agent可以通过目录服务查找其他Agent的信息。

3.3 服务发现

不仅限于Agent发现,还可以发现特定服务或能力。

设计思路

设计Agent通信基础设施需要考虑:

  1. 消息的可靠传递
  2. Agent的动态注册和发现
  3. 系统的可扩展性
  4. 错误处理和恢复机制

代码实现

下面我们将实现完整的Agent通信基础设施:

import queue
import threading
import time
import json
from datetime import datetime
from typing import Dict, List, Any, Optional, Callable
from enum import Enum
import uuid
import weakref
from concurrent.futures import ThreadPoolExecutor
class MessageType(Enum):
"""消息类型枚举"""
REQUEST = "request"
INFORM = "inform"
CONFIRM = "confirm"
FAILURE = "failure"
CANCEL = "cancel"
CUSTOM = "custom"
class Message:
"""消息类"""
def __init__(self,
msg_id: str,
sender: str,
receiver: str,
msg_type: MessageType,
content: Any,
conversation_id: Optional[str] = None,
reply_to: Optional[str] = None,
timestamp: Optional[datetime] = None,
priority: int = 1):
self.msg_id = msg_id
self.sender = sender
self.receiver = receiver
self.msg_type = msg_type
self.content = content
self.conversation_id = conversation_id or f"conv_{uuid.uuid4().hex[:8]}"
self.reply_to = reply_to
self.timestamp = timestamp or datetime.now()
self.priority = priority
self.attempts = 0
self.max_attempts = 3  # 最大重试次数
def to_dict(self) -> Dict[str, Any]:
"""转换为字典"""
return {
'msg_id': self.msg_id,
'sender': self.sender,
'receiver': self.receiver,
'msg_type': self.msg_type.value,
'content': self.content,
'conversation_id': self.conversation_id,
'reply_to': self.reply_to,
'timestamp': self.timestamp.isoformat(),
'priority': self.priority,
'attempts': self.attempts
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'Message':
"""从字典创建消息"""
msg_type = MessageType(data['msg_type'])
timestamp = datetime.fromisoformat(data['timestamp'])
msg = cls(
msg_id=data['msg_id'],
sender=data['sender'],
receiver=data['receiver'],
msg_type=msg_type,
content=data['content'],
conversation_id=data.get('conversation_id'),
reply_to=data.get('reply_to'),
timestamp=timestamp,
priority=data.get('priority', 1)
)
msg.attempts = data.get('attempts', 0)
return msg
def __str__(self) -> str:
return f"Message(id={self.msg_id}, type={self.msg_type.value}, from={self.sender}, to={self.receiver})"
class MessageQueue:
"""消息队列系统"""
def __init__(self, max_size: int = 1000):
self.queues: Dict[str, queue.PriorityQueue] = {}
self.max_size = max_size
self.lock = threading.RLock()
self.stats = {
'sent_messages': 0,
'received_messages': 0,
'failed_deliveries': 0,
'queue_creations': 0
}
def create_queue(self, agent_id: str) -> bool:
"""为Agent创建消息队列"""
with self.lock:
if agent_id not in self.queues:
self.queues[agent_id] = queue.PriorityQueue(maxsize=self.max_size)
self.stats['queue_creations'] += 1
return True
return False
def destroy_queue(self, agent_id: str) -> bool:
"""销毁Agent的消息队列"""
with self.lock:
if agent_id in self.queues:
del self.queues[agent_id]
return True
return False
def send_message(self, receiver: str, message: Message) -> bool:
"""发送消息到指定Agent的队列"""
with self.lock:
if receiver in self.queues:
try:
# 使用优先级队列,优先级数值越小优先级越高
priority_item = (-message.priority, time.time(), message)
self.queues[receiver].put_nowait(priority_item)
self.stats['sent_messages'] += 1
return True
except queue.Full:
print(f"警告: {receiver}的队列已满,消息发送失败")
self.stats['failed_deliveries'] += 1
return False
else:
print(f"错误: 未找到接收者 {receiver} 的消息队列")
self.stats['failed_deliveries'] += 1
return False
def receive_message(self, agent_id: str, timeout: float = 1.0) -> Optional[Message]:
"""从Agent的队列接收消息"""
with self.lock:
if agent_id in self.queues:
try:
priority, _, message = self.queues[agent_id].get(timeout=timeout)
self.stats['received_messages'] += 1
return message
except queue.Empty:
return None
return None
def get_queue_size(self, agent_id: str) -> int:
"""获取指定队列的大小"""
with self.lock:
if agent_id in self.queues:
return self.queues[agent_id].qsize()
return 0
def get_stats(self) -> Dict[str, Any]:
"""获取统计信息"""
with self.lock:
stats_copy = self.stats.copy()
stats_copy['queue_count'] = len(self.queues)
return stats_copy
class AgentInfo:
"""Agent信息类"""
def __init__(self,
agent_id: str,
name: str,
capabilities: List[str],
status: str = "active",
address: Optional[str] = None,
last_seen: Optional[datetime] = None):
self.agent_id = agent_id
self.name = name
self.capabilities = capabilities
self.status = status
self.address = address
self.last_seen = last_seen or datetime.now()
self.registration_time = datetime.now()
self.properties: Dict[str, Any] = {}
def to_dict(self) -> Dict[str, Any]:
"""转换为字典"""
return {
'agent_id': self.agent_id,
'name': self.name,
'capabilities': self.capabilities,
'status': self.status,
'address': self.address,
'last_seen': self.last_seen.isoformat(),
'registration_time': self.registration_time.isoformat(),
'properties': self.properties
}
@classmethod
def from_dict(cls, data: Dict[str, Any]) -> 'AgentInfo':
"""从字典创建AgentInfo"""
info = cls(
agent_id=data['agent_id'],
name=data['name'],
capabilities=data['capabilities'],
status=data['status'],
address=data.get('address'),
last_seen=datetime.fromisoformat(data['last_seen'])
)
info.registration_time = datetime.fromisoformat(data['registration_time'])
info.properties = data.get('properties', {})
return info
class AgentDirectory:
"""Agent目录服务"""
def __init__(self, cleanup_interval: int = 300):  # 5分钟清理间隔
self.agents: Dict[str, AgentInfo] = {}
self.capability_index: Dict[str, List[str]] = {}
self.name_index: Dict[str, List[str]] = {}
self.lock = threading.RLock()
self.cleanup_interval = cleanup_interval
self.running = True
self.cleanup_timer = threading.Timer(cleanup_interval, self._cleanup_inactive_agents)
self.cleanup_timer.start()
self.stats = {
'registrations': 0,
'unregistrations': 0,
'lookups': 0,
'updates': 0
}
def register_agent(self, agent_info: AgentInfo) -> bool:
"""注册Agent"""
with self.lock:
old_info = self.agents.get(agent_info.agent_id)
self.agents[agent_info.agent_id] = agent_info
# 更新索引
self._update_indices(agent_info, old_info)
self.stats['registrations'] += 1
return True
def unregister_agent(self, agent_id: str) -> bool:
"""注销Agent"""
with self.lock:
if agent_id in self.agents:
agent_info = self.agents[agent_id]
# 从索引中移除
self._remove_from_indices(agent_info)
del self.agents[agent_id]
self.stats['unregistrations'] += 1
return True
return False
def update_agent_status(self, agent_id: str, status: str) -> bool:
"""更新Agent状态"""
with self.lock:
if agent_id in self.agents:
self.agents[agent_id].status = status
self.agents[agent_id].last_seen = datetime.now()
self.stats['updates'] += 1
return True
return False
def get_agent_info(self, agent_id: str) -> Optional[AgentInfo]:
"""获取Agent信息"""
with self.lock:
self.stats['lookups'] += 1
return self.agents.get(agent_id)
def find_agents_by_capability(self, capability: str) -> List[AgentInfo]:
"""根据能力查找Agents"""
with self.lock:
self.stats['lookups'] += 1
agent_ids = self.capability_index.get(capability, [])
return [self.agents[aid] for aid in agent_ids if aid in self.agents]
def find_agents_by_name(self, name: str) -> List[AgentInfo]:
"""根据名称查找Agents"""
with self.lock:
self.stats['lookups'] += 1
agent_ids = self.name_index.get(name, [])
return [self.agents[aid] for aid in agent_ids if aid in self.agents]
def find_agents(self, criteria: Dict[str, Any] = None) -> List[AgentInfo]:
"""根据条件查找Agents"""
with self.lock:
self.stats['lookups'] += 1
result = list(self.agents.values())
if criteria:
for key, value in criteria.items():
if key == 'capability':
result = [agent for agent in result if value in agent.capabilities]
elif key == 'status':
result = [agent for agent in result if agent.status == value]
elif key == 'name':
result = [agent for agent in result if agent.name == value]
return result
def _update_indices(self, new_info: AgentInfo, old_info: Optional[AgentInfo]):
"""更新索引"""
# 更新能力索引
if old_info:
# 移除旧的能力索引
for cap in old_info.capabilities:
if cap in self.capability_index:
if new_info.agent_id in self.capability_index[cap]:
self.capability_index[cap].remove(new_info.agent_id)
for cap in new_info.capabilities:
if cap not in self.capability_index:
self.capability_index[cap] = []
if new_info.agent_id not in self.capability_index[cap]:
self.capability_index[cap].append(new_info.agent_id)
# 更新名称索引
if old_info and old_info.name != new_info.name:
# 从旧名称索引中移除
if old_info.name in self.name_index:
if new_info.agent_id in self.name_index[old_info.name]:
self.name_index[old_info.name].remove(new_info.agent_id)
if new_info.name not in self.name_index:
self.name_index[new_info.name] = []
if new_info.agent_id not in self.name_index[new_info.name]:
self.name_index[new_info.name].append(new_info.agent_id)
def _remove_from_indices(self, agent_info: AgentInfo):
"""从索引中移除Agent"""
# 从能力索引中移除
for cap in agent_info.capabilities:
if cap in self.capability_index:
if agent_info.agent_id in self.capability_index[cap]:
self.capability_index[cap].remove(agent_info.agent_id)
# 从名称索引中移除
if agent_info.name in self.name_index:
if agent_info.agent_id in self.name_index[agent_info.name]:
self.name_index[agent_info.name].remove(agent_info.agent_id)
def _cleanup_inactive_agents(self):
"""清理不活跃的Agents(定时任务)"""
if not self.running:
return
# 标记为1小时前最后活动的Agent为不活跃
cutoff_time = datetime.now() - timedelta(minutes=60)  # 1小时
with self.lock:
inactive_agents = []
for agent_id, agent_info in self.agents.items():
if agent_info.last_seen < cutoff_time and agent_info.status != 'inactive':
inactive_agents.append(agent_id)
for agent_id in inactive_agents:
self.agents[agent_id].status = 'inactive'
print(f"标记Agent {agent_id} 为不活跃状态")
# 重新启动定时器
if self.running:
self.cleanup_timer = threading.Timer(self.cleanup_interval, self._cleanup_inactive_agents)
self.cleanup_timer.start()
def stop_cleanup(self):
"""停止清理定时器"""
self.running = False
if hasattr(self, 'cleanup_timer'):
self.cleanup_timer.cancel()
def get_stats(self) -> Dict[str, Any]:
"""获取统计信息"""
with self.lock:
stats_copy = self.stats.copy()
stats_copy['agent_count'] = len(self.agents)
stats_copy['capability_count'] = len(self.capability_index)
return stats_copy
class CommunicationMiddleware:
"""通信中间件"""
def __init__(self, message_queue: MessageQueue, directory: AgentDirectory):
self.message_queue = message_queue
self.directory = directory
self.handlers: Dict[MessageType, List[Callable]] = {}
self.running = True
self.executor = ThreadPoolExecutor(max_workers=10)
self.stats = {
'processed_messages': 0,
'handler_errors': 0,
'delivery_attempts': 0
}
self.lock = threading.RLock()
def register_handler(self, msg_type: MessageType, handler: Callable):
"""注册消息处理器"""
with self.lock:
if msg_type not in self.handlers:
self.handlers[msg_type] = []
self.handlers[msg_type].append(handler)
def send_message(self, message: Message) -> bool:
"""发送消息"""
with self.lock:
self.stats['delivery_attempts'] += 1
# 检查接收者是否存在
receiver_info = self.directory.get_agent_info(message.receiver)
if not receiver_info:
print(f"错误: 接收者 {message.receiver} 不存在")
return False
# 检查接收者状态
if receiver_info.status != 'active':
print(f"警告: 接收者 {message.receiver} 不处于活跃状态")
# 可以选择排队等待或直接失败
# 这里我们选择直接失败
return False
# 发送到消息队列
success = self.message_queue.send_message(message.receiver, message)
if success:
print(f"消息已发送: {message}")
else:
print(f"消息发送失败: {message}")
return success
def broadcast_message(self, sender: str, msg_type: MessageType, content: Any,
filter_func: Optional[Callable[[AgentInfo], bool]] = None) -> int:
"""广播消息到多个Agents"""
# 获取所有活跃的Agents
all_agents = self.directory.find_agents({'status': 'active'})
# 应用过滤器
if filter_func:
filtered_agents = [agent for agent in all_agents if filter_func(agent)]
else:
filtered_agents = all_agents
sent_count = 0
for agent in filtered_agents:
if agent.agent_id != sender:  # 不发送给自己
msg = Message(
msg_id=f"broadcast_{uuid.uuid4().hex[:8]}",
sender=sender,
receiver=agent.agent_id,
msg_type=msg_type,
content=content
)
if self.send_message(msg):
sent_count += 1
print(f"广播消息发送给 {sent_count} 个Agents")
return sent_count
def start_listening(self, agent_id: str, callback: Callable[[Message], None]):
"""开始监听消息"""
def listener():
while self.running:
try:
message = self.message_queue.receive_message(agent_id, timeout=1.0)
if message:
self._process_message(message, callback)
except Exception as e:
print(f"监听消息时发生错误: {e}")
# 在单独的线程中启动监听
listener_thread = threading.Thread(target=listener, daemon=True)
listener_thread.start()
return listener_thread
def _process_message(self, message: Message, callback: Callable[[Message], None]):
"""处理接收到的消息"""
with self.lock:
self.stats['processed_messages'] += 1
try:
# 调用回调函数处理消息
callback(message)
# 更新发送者状态
self.directory.update_agent_status(message.sender, 'active')
except Exception as e:
with self.lock:
self.stats['handler_errors'] += 1
print(f"处理消息时发生错误: {e}, 消息: {message}")
def get_stats(self) -> Dict[str, Any]:
"""获取统计信息"""
with self.lock:
stats_copy = self.stats.copy()
return stats_copy
def shutdown(self):
"""关闭中间件"""
self.running = False
self.executor.shutdown(wait=True)
print("通信中间件已关闭")
from datetime import timedelta
class AgentCommunicationFramework:
"""Agent通信框架"""
def __init__(self):
self.message_queue = MessageQueue()
self.directory = AgentDirectory()
self.middleware = CommunicationMiddleware(self.message_queue, self.directory)
self.agents: Dict[str, 'BaseAgent'] = {}
self.running = True
def register_agent(self, agent: 'BaseAgent') -> bool:
"""注册Agent到框架"""
# 创建Agent信息
agent_info = AgentInfo(
agent_id=agent.agent_id,
name=agent.name,
capabilities=agent.capabilities,
status='active'
)
# 注册到目录服务
success = self.directory.register_agent(agent_info)
if success:
# 为Agent创建消息队列
self.message_queue.create_queue(agent.agent_id)
# 将Agent添加到框架管理
self.agents[agent.agent_id] = agent
# 启动Agent的消息监听
agent.start_listening(self.middleware)
print(f"Agent {agent.name} ({agent.agent_id}) 已注册")
return success
def unregister_agent(self, agent_id: str) -> bool:
"""从框架注销Agent"""
success = self.directory.unregister_agent(agent_id)
if success:
# 销毁消息队列
self.message_queue.destroy_queue(agent_id)
# 从框架管理中移除
if agent_id in self.agents:
del self.agents[agent_id]
print(f"Agent {agent_id} 已注销")
return success
def send_message(self, message: Message) -> bool:
"""发送消息"""
return self.middleware.send_message(message)
def broadcast_message(self, sender: str, msg_type: MessageType, content: Any,
filter_func: Optional[Callable[[AgentInfo], bool]] = None) -> int:
"""广播消息"""
return self.middleware.broadcast_message(sender, msg_type, content, filter_func)
def find_agents_by_capability(self, capability: str) -> List[AgentInfo]:
"""根据能力查找Agents"""
return self.directory.find_agents_by_capability(capability)
def get_stats(self) -> Dict[str, Any]:
"""获取框架统计信息"""
return {
'message_queue': self.message_queue.get_stats(),
'directory': self.directory.get_stats(),
'middleware': self.middleware.get_stats(),
'registered_agents': len(self.agents)
}
def shutdown(self):
"""关闭框架"""
self.running = False
self.middleware.shutdown()
self.directory.stop_cleanup()
print("Agent通信框架已关闭")
class BaseAgent:
"""基础Agent类"""
def __init__(self, agent_id: str, name: str, capabilities: List[str]):
self.agent_id = agent_id
self.name = name
self.capabilities = capabilities
self.framework: Optional[AgentCommunicationFramework] = None
self.listening_thread = None
self.running = True
def connect_to_framework(self, framework: AgentCommunicationFramework):
"""连接到通信框架"""
self.framework = framework
return framework.register_agent(self)
def disconnect_from_framework(self):
"""断开与通信框架的连接"""
if self.framework:
return self.framework.unregister_agent(self.agent_id)
return False
def send_message(self, receiver: str, msg_type: MessageType, content: Any) -> bool:
"""发送消息"""
if not self.framework:
print("错误: Agent未连接到通信框架")
return False
message = Message(
msg_id=f"msg_{uuid.uuid4().hex[:8]}",
sender=self.agent_id,
receiver=receiver,
msg_type=msg_type,
content=content
)
return self.framework.send_message(message)
def broadcast_message(self, msg_type: MessageType, content: Any,
filter_func: Optional[Callable[[AgentInfo], bool]] = None) -> int:
"""广播消息"""
if not self.framework:
print("错误: Agent未连接到通信框架")
return 0
return self.framework.broadcast_message(self.agent_id, msg_type, content, filter_func)
def start_listening(self, middleware: CommunicationMiddleware):
"""开始监听消息"""
def message_callback(message: Message):
self.handle_message(message)
self.listening_thread = middleware.start_listening(self.agent_id, message_callback)
def handle_message(self, message: Message):
"""处理接收到的消息"""
print(f"[{self.name}] 收到消息: {message.msg_type.value} from {message.sender}")
print(f"  内容: {message.content}")
# 根据消息类型处理
if message.msg_type == MessageType.REQUEST:
self.handle_request(message)
elif message.msg_type == MessageType.INFORM:
self.handle_inform(message)
elif message.msg_type == MessageType.CONFIRM:
self.handle_confirm(message)
else:
print(f"  未知消息类型: {message.msg_type}")
def handle_request(self, message: Message):
"""处理请求消息"""
print(f"  [{self.name}] 处理请求: {message.content}")
# 子类应重写此方法
pass
def handle_inform(self, message: Message):
"""处理通知消息"""
print(f"  [{self.name}] 收到通知: {message.content}")
# 子类应重写此方法
pass
def handle_confirm(self, message: Message):
"""处理确认消息"""
print(f"  [{self.name}] 收到确认: {message.content}")
# 子类应重写此方法
pass
def find_agents_by_capability(self, capability: str) -> List[AgentInfo]:
"""根据能力查找其他Agents"""
if self.framework:
return self.framework.find_agents_by_capability(capability)
return []
class DataProcessingAgent(BaseAgent):
"""数据处理Agent"""
def __init__(self, agent_id: str, name: str):
super().__init__(agent_id, name, ['data_processing', 'analysis'])
self.processed_data_count = 0
def handle_request(self, message: Message):
"""处理数据处理请求"""
if isinstance(message.content, dict) and 'data' in message.content:
data = message.content['data']
operation = message.content.get('operation', 'process')
print(f"  [{self.name}] 开始处理数据,操作: {operation}")
# 模拟数据处理
processed_data = self._simulate_data_processing(data, operation)
# 发送处理结果
result_msg = Message(
msg_id=f"result_{uuid.uuid4().hex[:8]}",
sender=self.agent_id,
receiver=message.sender,
msg_type=MessageType.INFORM,
content={
'status': 'success',
'result': processed_data,
'original_request': message.msg_id
}
)
self.processed_data_count += 1
print(f"  [{self.name}] 数据处理完成,发送结果")
self.send_message(message.sender, MessageType.INFORM, result_msg.content)
else:
error_msg = {
'status': 'error',
'error': '无效的数据处理请求',
'original_request': message.msg_id
}
self.send_message(message.sender, MessageType.FAILURE, error_msg)
def _simulate_data_processing(self, data, operation):
"""模拟数据处理"""
time.sleep(0.5)  # 模拟处理时间
if operation == 'sum':
return sum(data) if isinstance(data, list) else "无法求和"
elif operation == 'count':
return len(data) if isinstance(data, (list, str)) else "无法计数"
else:
return f"处理了{len(str(data))}个字符的数据"
class MonitoringAgent(BaseAgent):
"""监控Agent"""
def __init__(self, agent_id: str, name: str):
super().__init__(agent_id, name, ['monitoring', 'status_reporting'])
self.monitored_agents = set()
self.status_reports = []
def handle_inform(self, message: Message):
"""处理状态报告"""
if isinstance(message.content, dict) and 'status' in message.content:
status_info = message.content
agent_id = message.sender
print(f"  [{self.name}] 收到 {agent_id} 的状态报告: {status_info['status']}")
# 记录状态报告
report = {
'timestamp': datetime.now(),
'agent_id': agent_id,
'status': status_info['status'],
'details': status_info.get('details', {})
}
self.status_reports.append(report)
def start_monitoring(self, agent_id: str):
"""开始监控指定Agent"""
self.monitored_agents.add(agent_id)
print(f"  [{self.name}] 开始监控Agent: {agent_id}")
def request_status(self, agent_id: str):
"""请求指定Agent的状态"""
request_content = {
'request_type': 'status',
'target_agent': agent_id
}
self.send_message(agent_id, MessageType.REQUEST, request_content)
class CoordinatorAgent(BaseAgent):
"""协调Agent"""
def __init__(self, agent_id: str, name: str):
super().__init__(agent_id, name, ['coordination', 'task_management'])
self.assigned_tasks = {}
self.task_results = {}
def assign_task(self, agent_id: str, task: Dict[str, Any]):
"""分配任务给其他Agent"""
task_id = f"task_{uuid.uuid4().hex[:8]}"
self.assigned_tasks[task_id] = {
'agent_id': agent_id,
'task': task,
'assigned_at': datetime.now()
}
print(f"  [{self.name}] 分配任务 {task_id} 给 {agent_id}")
# 发送任务请求
task_request = {
'task_id': task_id,
'task_details': task,
'assigned_by': self.agent_id
}
self.send_message(agent_id, MessageType.REQUEST, task_request)
def handle_inform(self, message: Message):
"""处理结果通知"""
if isinstance(message.content, dict) and 'status' in message.content:
if message.content['status'] == 'success':
original_request = message.content.get('original_request', '')
print(f"  [{self.name}] 收到任务结果: {original_request}")
# 存储结果
self.task_results[original_request] = message.content.get('result')
def demo_agent_communication():
"""演示Agent通信"""
print("=== Agent通信基础设施演示 ===")
# 创建通信框架
framework = AgentCommunicationFramework()
# 创建Agents
data_processor = DataProcessingAgent("processor_001", "数据处理器")
monitor = MonitoringAgent("monitor_001", "系统监控器")
coordinator = CoordinatorAgent("coordinator_001", "任务协调器")
# 注册Agents到框架
print("\n1. 注册Agents:")
data_processor.connect_to_framework(framework)
monitor.connect_to_framework(framework)
coordinator.connect_to_framework(framework)
# 等待注册完成
time.sleep(0.5)
print(f"\n2. 当前注册的Agents:")
for agent_id, agent in framework.agents.items():
print(f"  - {agent.name} ({agent_id}): {agent.capabilities}")
print(f"\n3. 按能力查找Agents:")
processing_agents = framework.find_agents_by_capability('data_processing')
print(f"  数据处理能力的Agents: {[agent.name for agent in processing_agents]}")
monitoring_agents = framework.find_agents_by_capability('monitoring')
print(f"  监控能力的Agents: {[agent.name for agent in monitoring_agents]}")
print(f"\n4. Agent间通信演示:")
# 数据处理器向监控器发送状态报告
print(f"\n4.1 数据处理器发送状态报告:")
status_report = {
'status': 'active',
'details': {
'processed_count': data_processor.processed_data_count,
'uptime': '2 hours',
'load': 'low'
}
}
data_processor.send_message(monitor.agent_id, MessageType.INFORM, status_report)
# 协调器向数据处理器发送任务请求
print(f"\n4.2 协调器发送数据处理任务:")
task_request = {
'data': [1, 2, 3, 4, 5],
'operation': 'sum'
}
coordinator.send_message(data_processor.agent_id, MessageType.REQUEST, task_request)
# 等待处理完成
time.sleep(2)
# 协调器再发送一个任务
print(f"\n4.3 协调器发送计数任务:")
count_request = {
'data': "Hello, Agent Communication!",
'operation': 'count'
}
coordinator.send_message(data_processor.agent_id, MessageType.REQUEST, count_request)
# 等待处理完成
time.sleep(2)
print(f"\n5. 框架统计信息:")
stats = framework.get_stats()
print(f"  注册Agents: {stats['registered_agents']}")
print(f"  消息队列统计: {stats['message_queue']}")
print(f"  目录服务统计: {stats['directory']}")
print(f"\n6. 断开Agents连接:")
data_processor.disconnect_from_framework()
monitor.disconnect_from_framework()
coordinator.disconnect_from_framework()
# 关闭框架
framework.shutdown()
def demo_broadcast_communication():
"""演示广播通信"""
print("\n\n=== 广播通信演示 ===")
# 创建通信框架
framework = AgentCommunicationFramework()
# 创建多个监控Agent
monitors = []
for i in range(3):
monitor = MonitoringAgent(f"monitor_{i+1:03d}", f"监控器{i+1}")
monitor.connect_to_framework(framework)
monitors.append(monitor)
# 创建一个数据Agent
data_agent = DataProcessingAgent("broadcaster_001", "广播器")
data_agent.connect_to_framework(framework)
# 等待注册完成
time.sleep(0.5)
print(f"\n广播系统就绪,{len(monitors)+1}个Agents已连接")
# 广播状态报告
print(f"\n数据Agent广播状态报告:")
status_report = {
'status': 'operational',
'timestamp': datetime.now().isoformat(),
'system_load': 'normal'
}
# 广播给所有监控Agent
broadcast_count = data_agent.broadcast_message(
MessageType.INFORM,
status_report,
filter_func=lambda agent: 'monitoring' in agent.capabilities
)
print(f"广播发送给 {broadcast_count} 个监控Agent")
# 等待处理
time.sleep(1)
# 广播给所有Agent
print(f"\n广播系统公告给所有Agents:")
announcement = {
'type': 'system_announcement',
'message': '系统将在5分钟后进行维护',
'severity': 'medium'
}
announcement_count = data_agent.broadcast_message(MessageType.INFORM, announcement)
print(f"公告发送给 {announcement_count} 个Agents")
# 等待处理
time.sleep(1)
# 断开连接
data_agent.disconnect_from_framework()
for monitor in monitors:
monitor.disconnect_from_framework()
framework.shutdown()
def demo_error_handling():
"""演示错误处理"""
print("\n\n=== 错误处理演示 ===")
# 创建通信框架
framework = AgentCommunicationFramework()
# 创建Agents
sender = BaseAgent("sender_001", "发送者", ['messaging'])
receiver = BaseAgent("receiver_001", "接收者", ['receiving'])
sender.connect_to_framework(framework)
receiver.connect_to_framework(framework)
# 等待注册完成
time.sleep(0.5)
print("1. 正常消息传递:")
success_msg = {
'type': 'test',
'content': '这是一条正常消息'
}
sender.send_message(receiver.agent_id, MessageType.INFORM, success_msg)
# 等待处理
time.sleep(0.5)
print("\n2. 发送到不存在的Agent:")
sender.send_message("nonexistent_agent", MessageType.INFORM, {"test": "message"})
print("\n3. 模拟接收者离线:")
# 注销接收者
receiver.disconnect_from_framework()
# 尝试发送消息到已注销的Agent
sender.send_message(receiver.agent_id, MessageType.REQUEST, {"request": "data"})
print("\n4. 重新连接接收者:")
receiver.connect_to_framework(framework)
time.sleep(0.5)
# 发送消息
sender.send_message(receiver.agent_id, MessageType.INFORM, {"reconnected": "message"})
# 等待处理
time.sleep(0.5)
# 断开连接
sender.disconnect_from_framework()
receiver.disconnect_from_framework()
framework.shutdown()
if __name__ == "__main__":
demo_agent_communication()
demo_broadcast_communication()
demo_error_handling(

实践练习

  1. 扩展MessageQueue类,增加消息持久化功能,确保系统重启后消息不丢失。

  2. 实现一个负载均衡机制,在多个相同能力的Agents之间分配任务。

  3. 添加安全机制,实现消息加密和身份验证功能。

学AI大模型的正确顺序,千万不要搞错了

🤔2026年AI风口已来!各行各业的AI渗透肉眼可见,超多公司要么转型做AI相关产品,要么高薪挖AI技术人才,机遇直接摆在眼前!

有往AI方向发展,或者本身有后端编程基础的朋友,直接冲AI大模型应用开发转岗超合适!

就算暂时不打算转岗,了解大模型、RAG、Prompt、Agent这些热门概念,能上手做简单项目,也绝对是求职加分王🔋

在这里插入图片描述

📝给大家整理了超全最新的AI大模型应用开发学习清单和资料,手把手帮你快速入门!👇👇

学习路线:

✅大模型基础认知—大模型核心原理、发展历程、主流模型(GPT、文心一言等)特点解析
✅核心技术模块—RAG检索增强生成、Prompt工程实战、Agent智能体开发逻辑
✅开发基础能力—Python进阶、API接口调用、大模型开发框架(LangChain等)实操
✅应用场景开发—智能问答系统、企业知识库、AIGC内容生成工具、行业定制化大模型应用
✅项目落地流程—需求拆解、技术选型、模型调优、测试上线、运维迭代
✅面试求职冲刺—岗位JD解析、简历AI项目包装、高频面试题汇总、模拟面经

以上6大模块,看似清晰好上手,实则每个部分都有扎实的核心内容需要吃透!

我把大模型的学习全流程已经整理📚好了!抓住AI时代风口,轻松解锁职业新可能,希望大家都能把握机遇,实现薪资/职业跃迁~

这份完整版的大模型 AI 学习资料已经上传CSDN,朋友们如果需要可以微信扫描下方CSDN官方认证二维码免费领取【保证100%免费

在这里插入图片描述

Logo

小龙虾开发者社区是 CSDN 旗下专注 OpenClaw 生态的官方阵地,聚焦技能开发、插件实践与部署教程,为开发者提供可直接落地的方案、工具与交流平台,助力高效构建与落地 AI 应用

更多推荐