aws strands agents 的多agent时的会话管理方案
if user_confirmation.lower() in ["yes", "y", "start", "begin", "proceed"]: # 确认词。self.conversation_state["phase"] = "ready_to_work" # 转换到准备就绪阶段。"ready_to_work": state["phase"] == "ready_to_work" # 是否准
·
希望智能体在开始“工作”之前进行多轮对话。有几种方法可以实现这一点。让我为您展示最佳模式:
方法 1:具有会话状态管理的单智能体
这是最简单、最优雅的方法——使用一个在内部管理会话状态的智能体:
from strands import Agent, tool from strands.multiagent.a2a import A2AServer from strands.session.s3_session_manager import S3SessionManager import json from typing import Dict, Any class ConversationalWorkflowAgent: def __init__(self, agent_id: str): self.agent_id = agent_id # 用于持久化会话记忆的会话管理器 self.session_manager = S3SessionManager( session_id=f"conversational_{agent_id}", bucket="agent-conversation-memory", # 智能体会话记忆存储桶 prefix=f"conversations/{agent_id}" # 前缀 ) # 会话状态 self.conversation_state = { "phase": "gathering_info", # 阶段: gathering_info -> ready_to_work -> working "collected_info": {}, # 收集的信息 "conversation_count": 0, # 会话次数 "requirements_met": False # 需求是否满足 } # 创建具有会话管理工具的智能体 self.agent = Agent( name=f"Conversational {agent_id}", # 智能体名称 session_manager=self.session_manager, prompt=self.get_conversation_prompt(), # 获取提示词 tools=[ self.update_conversation_state, # 更新会话状态 self.check_readiness_to_work, # 检查是否准备好工作 self.start_working, # 开始工作 *self.get_work_tools() # 获取工作工具 ] ) def get_conversation_prompt(self): return """ 您是一个对话式智能体,需要在开始工作前通过对话收集信息。 会话阶段 (CONVERSATION PHASES): 1. GATHERING_INFO (收集信息): 提问以了解用户需求 (最少 3 次交流) 2. READY_TO_WORK (准备就绪): 确认您已拥有足够信息 3. WORKING (工作中): 执行实际任务 规则 (RULES): - 在回应前始终检查会话状态 - 在提供开始工作之前至少收集 3 条信息 - 在信息收集期间保持对话性和吸引力 - 只有在用户明确确认准备好后才开始工作 使用工具来管理会话状态和检查准备情况。 """ @tool def update_conversation_state(self, key: str, value: str, phase: str = None) -> str: """使用新信息更新会话状态""" self.conversation_state["collected_info"][key] = value # 存储信息 self.conversation_state["conversation_count"] += 1 # 增加计数 if phase: self.conversation_state["phase"] = phase # 更新阶段 # 检查是否有足够的信息继续 info_count = len(self.conversation_state["collected_info"]) # 信息数量 min_exchanges = 3 # 最小交流次数 if (info_count >= min_exchanges and self.conversation_state["conversation_count"] >= min_exchanges and self.conversation_state["phase"] == "gathering_info"): self.conversation_state["phase"] = "ready_to_work" # 转换到准备就绪阶段 return f"状态已更新。收集了 {info_count} 条信息。准备转换到工作阶段。" return f"状态已更新。收集了 {info_count} 条信息。还需要 {min_exchanges - info_count} 条。" @tool def check_readiness_to_work(self) -> str: """检查智能体是否准备好开始工作""" state = self.conversation_state info = { "phase": state["phase"], # 当前阶段 "conversation_count": state["conversation_count"], # 会话次数 "collected_info_count": len(state["collected_info"]), # 收集的信息数量 "collected_info": state["collected_info"], # 收集的信息 "ready_to_work": state["phase"] == "ready_to_work" # 是否准备好工作 } return json.dumps(info, indent=2) # 返回 JSON 格式信息 @tool def start_working(self, user_confirmation: str = "yes") -> str: """开始实际的工作阶段""" if user_confirmation.lower() in ["yes", "y", "start", "begin", "proceed"]: # 确认词 self.conversation_state["phase"] = "working" # 进入工作阶段 return "开始工作阶段。我现在将使用收集到的信息执行您的请求。" else: return "工作阶段未开始。请告诉我何时准备好继续。" def get_work_tools(self): """在子类中重写以提供特定的工作工具""" @tool def perform_analysis(task_description: str) -> str: """执行实际的工作任务""" if self.conversation_state["phase"] != "working": # 检查阶段 return "尚无法执行分析。仍在对话阶段。" collected_info = self.conversation_state["collected_info"] # 获取收集的信息 return f"基于收集的信息执行分析: {collected_info}" # 返回结果 return [perform_analysis] # 使用示例 class DataAnalysisAgent(ConversationalWorkflowAgent): # 数据分析智能体 def get_work_tools(self): @tool def analyze_data(dataset_info: str) -> str: """根据会话需求分析数据""" if self.conversation_state["phase"] != "working": return "尚无法分析数据。请先完成对话。" requirements = self.conversation_state["collected_info"] # 需求信息 return f""" 数据分析结果: - 数据集: {requirements.get('dataset', '未指定')} - 分析类型: {requirements.get('analysis_type', '未指定')} - 重点领域: {requirements.get('focus_areas', '未指定')} - 时间线: {requirements.get('timeline', '未指定')} [此处将执行详细分析] """ @tool def generate_report(report_format: str = "summary") -> str: """生成分析报告""" if self.conversation_state["phase"] != "working": return "尚无法生成报告。分析未开始。" return f"基于分析需求生成了 {report_format} 报告。" return [analyze_data, generate_report] # 部署对话式智能体 if __name__ == "__main__": agent = DataAnalysisAgent("data_analyst") # 创建实例 server = A2AServer(agent=agent.agent, port=9000) # 创建服务器 server.serve() # 启动服务
方法 2:双智能体交接模式
使用一个对话智能体,其将任务交接给一个工作智能体:
from strands import Agent, tool from strands.multiagent.handoffs import handoff_to_agent # 导入交接函数 import asyncio class ConversationManager: # 会话管理器 def __init__(self): # 用于对话的会话智能体 self.conversation_agent = Agent( name="Conversation Manager", # 名称 prompt=""" 您是一名会话管理器。您的工作是: 1. 与用户进行对话以了解他们的需求 2. 收集至少 3 条重要信息 3. 准备就绪时,交接给工作智能体 跟踪会话进度,只有在拥有足够信息时才进行交接。 """, tools=[ self.collect_information, # 收集信息 self.check_conversation_progress, # 检查会话进度 self.handoff_to_worker # 交接给工作器 ] ) # 用于实际任务的工作智能体 self.work_agent = Agent( name="Work Executor", # 工作执行器 prompt=""" 您是一名工作执行器。您接收来自会话管理器的详细需求 并执行实际任务。您可以访问所有收集到的信息。 """, tools=[ self.execute_task, # 执行任务 self.generate_deliverable # 生成交付物 ] ) # 智能体间的共享内存 self.shared_context = { "conversation_history": [], # 会话历史 "collected_requirements": {}, # 收集的需求 "conversation_count": 0, # 会话次数 "ready_for_handoff": False # 是否准备好交接 } @tool def collect_information(self, key: str, value: str) -> str: """在会话期间收集信息""" self.shared_context["collected_requirements"][key] = value # 存储信息 self.shared_context["conversation_count"] += 1 # 增加计数 # 检查是否准备好交接 if (len(self.shared_context["collected_requirements"]) >= 3 and self.shared_context["conversation_count"] >= 3): self.shared_context["ready_for_handoff"] = True # 标记为准备好 return f"已收集: {key} = {value}。准备交接给工作智能体。" return f"已收集: {key} = {value}。交接前需要更多信息。" @tool def check_conversation_progress(self) -> str: """检查当前会话进度""" context = self.shared_context return f""" 会话进度: - 交流次数: {context['conversation_count']} - 已收集需求: {len(context['collected_requirements'])} - 准备好交接: {context['ready_for_handoff']} - 需求: {context['collected_requirements']} """ @tool def handoff_to_worker(self, user_message: str) -> str: """将收集的上下文交接给工作智能体""" if not self.shared_context["ready_for_handoff"]: # 检查是否准备好 return "尚未准备好交接。需要更多对话。" # 为工作智能体准备上下文 context_message = f""" 来自会话管理器的交接: 用户请求: {user_message} 收集的需求: {self.shared_context['collected_requirements']} 会话历史: {self.shared_context['conversation_history']} 请基于此信息执行工作。 """ # 交接给工作智能体 return handoff_to_agent(self.work_agent, context_message) # 调用交接函数 @tool def execute_task(self, task_details: str) -> str: """执行实际的工作任务""" requirements = self.shared_context["collected_requirements"] # 获取需求 return f""" 任务执行结果: 基于需求: {requirements} [实际工作将在此处执行] 任务成功完成。 """ @tool def generate_deliverable(self, format_type: str = "report") -> str: """生成最终交付物""" return f"基于收集的需求和执行的任务生成了 {format_type}。" # 使用 A2A 部署 conversation_manager = ConversationManager() server = A2AServer(agent=conversation_manager.conversation_agent, port=9000) server.serve()
推荐
对于您的用例,我推荐方法 1(具有状态管理的单智能体),因为:
-
简单性 (Simplicity): 一个智能体处理所有事情
-
会话持久性 (Session Persistence): 跨会话的自动记忆
-
清晰的状态跟踪 (Clear State Tracking): 明确的会话阶段
-
A2A 兼容 (A2A Compatible): 与 ClientFactory 完美配合
-
可扩展 (Scalable): 易于扩展更复杂的工作流
更多推荐
所有评论(0)