OpenAI Agents 记忆管理示例
本文档演示了 OpenAI Agents 框架中的各种记忆管理方案,包括 SQLite、SQLAlchemy 和 Redis 会话存储。通过这些示例,您将了解如何在多轮对话中保持上下文记忆,实现持久化存储,以及不同存储方案的特点和适用场景。首先,我们需要配置必要的环境变量和依赖项:SQLite 会话存储示例SQLite 是一个轻量级的嵌入式数据库,非常适合本地开发和小型应用。OpenAI Agen
OpenAI Agents 记忆管理示例
本文档演示了 OpenAI Agents 框架中的各种记忆管理方案,包括 SQLite、SQLAlchemy 和 Redis 会话存储。通过这些示例,您将了解如何在多轮对话中保持上下文记忆,实现持久化存储,以及不同存储方案的特点和适用场景。
环境配置
首先,我们需要配置必要的环境变量和依赖项:
import os
from openai import AsyncOpenAI
from agents import Agent, OpenAIChatCompletionsModel, Runner, SQLiteSession
from dotenv import load_dotenv
load_dotenv()
QWEN_API_KEY = os.getenv("QWEN_API_KEY")
QWEN_BASE_URL = os.getenv("QWEN_BASE_URL")
QWEN_MODEL_NAME = os.getenv("QWEN_MODEL_NAME")
import base64
os.environ["LANGFUSE_PUBLIC_KEY"] = "pk-lf-276a7628-a121-43b1-a533-bc7c46bdb412"
os.environ["LANGFUSE_SECRET_KEY"] = "sk-lf-2039c6c4-2af8-41cc-9508-7d01df1be3d3"
os.environ["LANGFUSE_HOST"] = "http://localhost:3000"
LANGFUSE_AUTH = base64.b64encode(
f"{os.environ.get('LANGFUSE_PUBLIC_KEY')}:{os.environ.get('LANGFUSE_SECRET_KEY')}".encode()
).decode()
os.environ["OTEL_EXPORTER_OTLP_ENDPOINT"] = os.environ.get("LANGFUSE_HOST") + "/api/public/otel"
os.environ["OTEL_EXPORTER_OTLP_HEADERS"] = f"Authorization=Basic {LANGFUSE_AUTH}"
client = AsyncOpenAI(base_url=QWEN_BASE_URL, api_key=QWEN_API_KEY)
model=OpenAIChatCompletionsModel(model="qwen-turbo", openai_client=client)
import logfire
logfire.configure(
service_name='my_agent_service',
send_to_logfire=False,
)
logfire.instrument_openai_agents()
SQLite 会话存储示例
SQLite 是一个轻量级的嵌入式数据库,非常适合本地开发和小型应用。OpenAI Agents 提供了 SQLiteSession
类来实现基于 SQLite 的会话存储。
基本用法 - 内存数据库
下面的示例展示了如何使用内存数据库进行会话管理:
async def main():
# 创建一个Agent
agent = Agent(
name="助手",
instructions="请简洁地回复。",
model=model
)
# 创建一个会话实例,它将在多次运行中持续存在
session_id = "conversation_123"
session = SQLiteSession(session_id)
# 创建持久化的数据库文件
# session = SQLiteSession(session_id, db_path="conversation.db")
print("=== 会话示例 ===")
print("Agent将自动记住之前的消息。\n")
# 第一轮对话
print("第一轮对话:")
print("用户: 金门大桥在哪个城市?")
result = await Runner.run(
agent,
"金门大桥在哪个城市?",
session=session,
)
print(f"助手: {result.final_output}")
print()
# 第二轮对话 - Agent将记住之前的对话
print("第二轮对话:")
print("用户: 它在哪个州?")
result = await Runner.run(agent, "它在哪个州?", session=session)
print(f"助手: {result.final_output}")
print()
# 第三轮对话 - 继续对话
print("第三轮对话:")
print("用户: 那个州的人口是多少?")
result = await Runner.run(
agent,
"那个州的人口是多少?",
session=session,
)
print(f"助手: {result.final_output}")
print()
print("=== 对话完成 ===")
print("注意Agent是如何记住之前轮次的上下文的!")
print("会话自动处理对话历史。")
# 演示limit参数 - 只获取最新的2个项目
print("\n=== 最新项目演示 ===")
latest_items = await session.get_items(limit=2)
print("最新的2个项目:")
for i, msg in enumerate(latest_items, 1):
role = msg.get("role", "未知")
content = msg.get("content", "")
print(f" {i}. {role}: {content}")
print(f"\n从总对话历史中获取了 {len(latest_items)} 个项目。")
# 获取所有项目以显示差异
all_items = await session.get_items()
print(f"会话中的总项目数: {len(all_items)}")
await main()
=== 会话示例 ===
Agent将自动记住之前的消息。
第一轮对话:
用户: 金门大桥在哪个城市?
15:03:05.695 OpenAI Agents trace: Agent workflow
15:03:05.697 Agent run: '助手'
15:03:05.698 Chat completion with 'qwen-turbo' [LLM]
助手: 金门大桥位于美国加利福尼亚州的旧金山。
第二轮对话:
用户: 它在哪个州?
15:03:06.407 OpenAI Agents trace: Agent workflow
15:03:06.408 Agent run: '助手'
15:03:06.408 Chat completion with 'qwen-turbo' [LLM]
助手: 金门大桥位于美国加利福尼亚州。
第三轮对话:
用户: 那个州的人口是多少?
15:03:06.815 OpenAI Agents trace: Agent workflow
15:03:06.815 Agent run: '助手'
15:03:06.815 Chat completion with 'qwen-turbo' [LLM]
助手: 加利福尼亚州的人口约为3900万(2023年数据)。
=== 对话完成 ===
注意Agent是如何记住之前轮次的上下文的!
会话自动处理对话历史。
=== 最新项目演示 ===
最新的2个项目:
1. user: 那个州的人口是多少?
2. assistant: [{'annotations': [], 'text': '加利福尼亚州的人口约为3900万(2023年数据)。', 'type': 'output_text'}]
从总对话历史中获取了 2 个项目。
会话中的总项目数: 6
从上面的输出可以看到,Agent 成功地记住了之前的对话内容。在第二轮对话中,当用户问"它在哪个州?"时,Agent 知道"它"指的是金门大桥。这展示了会话存储的核心功能。
持久化存储 - 文件数据库
接下来我们演示如何使用文件数据库实现数据持久化:
async def main():
# 创建一个Agent
agent = Agent(
name="助手",
instructions="请简洁地回复。",
model=model
)
# 创建一个会话实例,它将在多次运行中持续存在
session_id = "conversation_123"
# session = SQLiteSession(session_id) # 内存数据库
# 创建持久化的数据库文件
session = SQLiteSession(session_id, db_path="conversation.db")
print("=== 会话示例 ===")
print("Agent将自动记住之前的消息。\n")
# 第一轮对话
print("第一轮对话:")
print("用户: 金门大桥在哪个城市?")
result = await Runner.run(
agent,
"金门大桥在哪个城市??",
session=session,
)
print(f"助手: {result.final_output}")
print()
# 第二轮对话 - Agent将记住之前的对话
print("第二轮对话:")
print("用户: 它在哪个州?")
result = await Runner.run(agent, "它在哪个州?", session=session)
print(f"助手: {result.final_output}")
print()
# 第三轮对话 - 继续对话
print("第三轮对话:")
print("用户: 那个州的人口是多少?")
result = await Runner.run(
agent,
"那个州的人口是多少?",
session=session,
)
print(f"助手: {result.final_output}")
print()
print("=== 对话完成 ===")
print("注意Agent是如何记住之前轮次的上下文的!")
print("会话自动处理对话历史。")
# 演示limit参数 - 只获取最新的2个项目
print("\n=== 最新项目演示 ===")
latest_items = await session.get_items(limit=2)
print("最新的2个项目:")
for i, msg in enumerate(latest_items, 1):
role = msg.get("role", "未知")
content = msg.get("content", "")
print(f" {i}. {role}: {content}")
print(f"\n从总对话历史中获取了 {len(latest_items)} 个项目。")
# 获取所有项目以显示差异
all_items = await session.get_items()
print(f"会话中的总项目数: {len(all_items)}")
await main()
=== 会话示例 ===
Agent将自动记住之前的消息。
第一轮对话:
用户: 金门大桥在哪个城市?
15:03:10.460 OpenAI Agents trace: Agent workflow
15:03:10.465 Agent run: '助手'
15:03:10.466 Chat completion with 'qwen-turbo' [LLM]
助手: 金门大桥位于美国加利福尼亚州的旧金山。
第二轮对话:
用户: 它在哪个州?
15:03:10.986 OpenAI Agents trace: Agent workflow
15:03:10.988 Agent run: '助手'
15:03:10.988 Chat completion with 'qwen-turbo' [LLM]
助手: 金门大桥位于美国加利福尼亚州。
第三轮对话:
用户: 那个州的人口是多少?
15:03:11.340 OpenAI Agents trace: Agent workflow
15:03:11.342 Agent run: '助手'
15:03:11.342 Chat completion with 'qwen-turbo' [LLM]
助手: 加利福尼亚州的人口约在4000万左右(具体数据会随时间略有变化)。
=== 对话完成 ===
注意Agent是如何记住之前轮次的上下文的!
会话自动处理对话历史。
=== 最新项目演示 ===
最新的2个项目:
1. user: 那个州的人口是多少?
2. assistant: [{'annotations': [], 'text': '加利福尼亚州的人口约在4000万左右(具体数据会随时间略有变化)。', 'type': 'output_text'}]
从总对话历史中获取了 2 个项目。
会话中的总项目数: 6
与内存数据库不同,这次我们使用了 db_path="conversation.db"
参数,将数据保存到文件中。这意味着即使程序重启,对话历史也会被保留。
数据库表结构分析
为了更好地理解 SQLite 会话存储的工作原理,让我们分析一下数据库的表结构:
表的数量:3张表
- agent_sessions - 会话表
- agent_messages - 消息表
- sqlite_sequence - SQLite系统表(用于自增ID管理)
表之间的关系
- agent_sessions (会话表)
- 主键 : session_id (TEXT)
- 字段 :
- session_id : 会话唯一标识符
- created_at : 创建时间 (默认当前时间)
- updated_at : 更新时间 (默认当前时间)
- 作用 : 存储每个对话会话的基本信息
- agent_messages (消息表)
- 主键 : id (INTEGER, 自增)
- 字段 :
- id : 消息唯一ID
- session_id : 关联的会话ID (外键)
- message_data : 消息内容 (JSON格式)
- created_at : 消息创建时间
- 外键关系 : session_id → agent_sessions.session_id
- 索引 : idx_agent_messages_session_id (提高查询性能)
- 作用 : 存储具体的对话消息内容
- sqlite_sequence (系统表)
- SQLite自动创建的系统表
- 用于管理自增字段的序列号
- 记录 agent_messages 表的当前最大ID值
表关系图
agent_sessions (1) ←──── (N)
agent_messages
↑ ↑
session_id id
created_at session_id
(FK)
updated_at message_data
created_at
关系说明
- 一对多关系 : 一个会话( agent_sessions )可以包含多条消息( agent_messages )
- 外键约束 : agent_messages.session_id 必须存在于 agent_sessions.session_id 中
- 数据完整性 : 通过外键关系确保消息必须属于某个有效的会话
当前数据状态
- 会话数 : 1个会话 ( conversation_123 )
- 消息数 : 6条消息
- 消息内容 : 包含用户问题和AI助手回答,以JSON格式存储
这种设计是典型的会话-消息关系模型,适合存储对话历史和实现会话管理功能。
SQLAlchemy 会话存储示例
SQLAlchemy 是 Python 中最流行的 ORM(对象关系映射)框架,提供了更强大和灵活的数据库操作能力。SQLAlchemySession
支持多种数据库后端,包括 PostgreSQL、MySQL、SQLite 等。
使用内存数据库
下面的示例展示了如何使用 SQLAlchemy 进行会话管理:
from agents.extensions.memory.sqlalchemy_session import SQLAlchemySession
async def main():
# 创建一个Agent
agent = Agent(
name="助手",
instructions="请简洁地回复。",
model=model
)
session_id = "conversation_123"
session = SQLAlchemySession.from_url(
session_id,
url="sqlite+aiosqlite:///:memory:",
create_tables=True,
)
print("=== 会话示例 ===")
print("Agent将自动记住之前的消息。\n")
# 第一轮对话
print("第一轮对话:")
print("用户: 金门大桥在哪个城市?")
result = await Runner.run(
agent,
"金门大桥在哪个城市??",
session=session,
)
print(f"助手: {result.final_output}")
print()
# 第二轮对话 - Agent将记住之前的对话
print("第二轮对话:")
print("用户: 它在哪个州?")
result = await Runner.run(agent, "它在哪个州?", session=session)
print(f"助手: {result.final_output}")
print()
# 第三轮对话 - 继续对话
print("第三轮对话:")
print("用户: 那个州的人口是多少?")
result = await Runner.run(
agent,
"那个州的人口是多少?",
session=session,
)
print(f"助手: {result.final_output}")
print()
print("=== 对话完成 ===")
print("注意Agent是如何记住之前轮次的上下文的!")
print("会话自动处理对话历史。")
# 演示limit参数 - 只获取最新的2个项目
print("\n=== 最新项目演示 ===")
latest_items = await session.get_items(limit=2)
print("最新的2个项目:")
for i, msg in enumerate(latest_items, 1):
role = msg.get("role", "未知")
content = msg.get("content", "")
print(f" {i}. {role}: {content}")
print(f"\n从总对话历史中获取了 {len(latest_items)} 个项目。")
# 获取所有项目以显示差异
all_items = await session.get_items()
print(f"会话中的总项目数: {len(all_items)}")
await main()
=== 会话示例 ===
Agent将自动记住之前的消息。
第一轮对话:
用户: 金门大桥在哪个城市?
15:10:14.737 OpenAI Agents trace: Agent workflow
15:10:14.747 Agent run: '助手'
15:10:14.747 Chat completion with 'qwen-turbo' [LLM]
助手: 金门大桥位于美国加利福尼亚州的旧金山市。
第二轮对话:
用户: 它在哪个州?
15:10:15.350 OpenAI Agents trace: Agent workflow
15:10:15.351 Agent run: '助手'
15:10:15.352 Chat completion with 'qwen-turbo' [LLM]
助手: 金门大桥位于美国加利福尼亚州。
第三轮对话:
用户: 那个州的人口是多少?
15:10:15.818 OpenAI Agents trace: Agent workflow
15:10:15.818 Agent run: '助手'
15:10:15.818 Chat completion with 'qwen-turbo' [LLM]
助手: 加利福尼亚州的人口约为3900万(2023年数据)。
=== 对话完成 ===
注意Agent是如何记住之前轮次的上下文的!
会话自动处理对话历史。
=== 最新项目演示 ===
最新的2个项目:
1. user: 那个州的人口是多少?
2. assistant: [{'annotations': [], 'text': '加利福尼亚州的人口约为3900万(2023年数据)。', 'type': 'output_text'}]
从总对话历史中获取了 2 个项目。
会话中的总项目数: 6
SQLAlchemy 会话的行为与 SQLite 会话类似,但提供了更多的灵活性。通过 from_url
方法,您可以轻松切换到不同的数据库后端,只需更改连接字符串即可。
SQLAlchemy 的优势
- 多数据库支持:支持 PostgreSQL、MySQL、SQLite、Oracle 等多种数据库
- 连接池管理:自动管理数据库连接池,提高性能
- 事务支持:提供完整的事务管理功能
- 异步支持:通过
aiosqlite
、asyncpg
等驱动支持异步操作
Redis 会话存储示例
Redis 是一个高性能的内存数据库,特别适合需要快速访问和高并发的场景。Redis 会话存储提供了出色的性能和丰富的功能。
启动 Redis 服务
在使用 Redis 会话之前,需要先启动 Redis 服务。以下是 Docker 命令:
启动带数据持久化的 Redis
docker run -d --name redis-server -p 6380:6379 -v redis-data:/data redis:latest redis-server --appendonly yes
基本 Redis 会话使用
下面的示例展示了 Redis 会话的完整功能:
from agents.extensions.memory import RedisSession
async def main():
# 创建一个代理
agent = Agent(
name="助手",
instructions="回复要非常简洁。",
model=model
)
print("=== Redis 会话示例 ===")
print("此示例需要 Redis 在 localhost:6380 上运行")
print("启动 Redis:redis-server")
print()
# 创建 Redis 会话实例
session_id = "redis_conversation_123"
try:
session = RedisSession.from_url(
session_id,
url="redis://localhost:6380/0", # 使用数据库 0
)
# 测试 Redis 连接性
if not await session.ping():
print("Redis 服务器不可用!")
print("请启动 Redis 服务器后重试。")
return
print("成功连接到 Redis!")
print(f"会话 ID:{session_id}")
# 清除任何现有的会话数据以获得干净的开始
await session.clear_session()
print("会话已清除以进行干净的演示。")
print("代理将自动记住之前的消息。\n")
# 第一轮
print("第一轮:")
print("用户:金门大桥在哪个城市?")
result = await Runner.run(
agent,
"金门大桥在哪个城市?",
session=session,
)
print(f"助手:{result.final_output}")
print()
# 第二轮 - 代理将记住之前的对话
print("第二轮:")
print("用户:它在哪个州?")
result = await Runner.run(agent, "它在哪个州?", session=session)
print(f"助手:{result.final_output}")
print()
# 第三轮 - 继续对话
print("第三轮:")
print("用户:那个州的人口是多少?")
result = await Runner.run(
agent,
"那个州的人口是多少?",
session=session,
)
print(f"助手:{result.final_output}")
print()
print("=== 对话完成 ===")
print("注意代理如何记住之前轮次的上下文!")
print("Redis 会话自动处理具有持久性的对话历史记录。")
# 演示会话持久性
print("\n=== 会话持久性演示 ===")
all_items = await session.get_items()
print(f"存储在 Redis 中的总消息数:{len(all_items)}")
# 演示限制参数
print("\n=== 最新项目演示 ===")
latest_items = await session.get_items(limit=2)
print("最新的 2 个项目:")
for i, msg in enumerate(latest_items, 1):
role = msg.get("role", "unknown")
content = msg.get("content", "")
print(f" {i}. {role}:{content}")
# 使用新会话演示会话隔离
print("\n=== 会话隔离演示 ===")
new_session = RedisSession.from_url(
"different_conversation_456",
url="redis://localhost:6380/0",
)
print("创建具有不同 ID 的新会话...")
result = await Runner.run(
agent,
"你好,这是一个新的对话!",
session=new_session,
)
print(f"新会话响应:{result.final_output}")
# 显示会话是隔离的
original_items = await session.get_items()
new_items = await new_session.get_items()
print(f"原始会话有 {len(original_items)} 个项目")
print(f"新会话有 {len(new_items)} 个项目")
print("会话完全隔离!")
# 清理新会话
await new_session.clear_session()
await new_session.close()
# 可选:演示 TTL(生存时间)功能
print("\n=== TTL 演示 ===")
ttl_session = RedisSession.from_url(
"ttl_demo_session",
url="redis://localhost:6380/0",
ttl=3600, # 1 小时 TTL
)
await Runner.run(
agent,
"此消息将在 1 小时后过期",
session=ttl_session,
)
print("创建了具有 1 小时 TTL 的会话 - 消息将自动过期")
await ttl_session.close()
# 关闭主会话
await session.close()
except Exception as e:
print(f"错误:{e}")
print("确保 Redis 在 localhost:6379 上运行")
await main()
=== Redis 会话示例 ===
此示例需要 Redis 在 localhost:6380 上运行
启动 Redis:redis-server
成功连接到 Redis!
会话 ID:redis_conversation_123
会话已清除以进行干净的演示。
代理将自动记住之前的消息。
第一轮:
用户:金门大桥在哪个城市?
16:22:31.200 OpenAI Agents trace: Agent workflow
16:22:31.200 Agent run: '助手'
16:22:31.200 Chat completion with 'qwen-turbo' [LLM]
助手:美国旧金山。
第二轮:
用户:它在哪个州?
16:22:32.056 OpenAI Agents trace: Agent workflow
16:22:32.056 Agent run: '助手'
16:22:32.056 Chat completion with 'qwen-turbo' [LLM]
助手:加利福尼亚州。
第三轮:
用户:那个州的人口是多少?
16:22:32.598 OpenAI Agents trace: Agent workflow
16:22:32.598 Agent run: '助手'
16:22:32.598 Chat completion with 'qwen-turbo' [LLM]
助手:约3950万。
=== 对话完成 ===
注意代理如何记住之前轮次的上下文!
Redis 会话自动处理具有持久性的对话历史记录。
=== 会话持久性演示 ===
存储在 Redis 中的总消息数:6
=== 最新项目演示 ===
最新的 2 个项目:
1. user:那个州的人口是多少?
2. assistant:[{'annotations': [], 'text': '约3950万。', 'type': 'output_text'}]
=== 会话隔离演示 ===
创建具有不同 ID 的新会话...
16:22:33.117 OpenAI Agents trace: Agent workflow
16:22:33.117 Agent run: '助手'
16:22:33.117 Chat completion with 'qwen-turbo' [LLM]
新会话响应:你好!
原始会话有 6 个项目
新会话有 2 个项目
会话完全隔离!
=== TTL 演示 ===
16:22:33.621 OpenAI Agents trace: Agent workflow
16:22:33.623 Agent run: '助手'
16:22:33.623 Chat completion with 'qwen-turbo' [LLM]
创建了具有 1 小时 TTL 的会话 - 消息将自动过期
从上面的输出可以看到,Redis 会话提供了丰富的功能:
- 连接检测:通过
ping()
方法检测 Redis 服务器连接状态 - 会话隔离:不同的 session_id 创建完全独立的会话空间
- TTL 支持:可以设置消息的生存时间,自动过期清理
- 高性能:Redis 的内存存储提供极快的读写速度
Redis 会话的优势
- 高性能:内存存储,毫秒级响应时间
- 持久化:支持 RDB 和 AOF 持久化机制
- 分布式:支持集群模式,可水平扩展
- TTL 支持:自动过期机制,适合临时会话
- 丰富的数据结构:支持字符串、哈希、列表等多种数据类型
高级功能演示
除了基本的会话存储功能,OpenAI Agents 还提供了一些高级特性:
async def demonstrate_advanced_features():
"""演示高级 Redis 会话功能。"""
print("\n=== 高级功能演示 ===")
# 用于多租户的自定义键前缀
tenant_session = RedisSession.from_url(
"user_123",
url="redis://localhost:6380/0",
key_prefix="tenant_abc:sessions", # 用于隔离的自定义前缀
)
try:
if await tenant_session.ping():
print("自定义键前缀演示:")
await Runner.run(
Agent(name="支持", instructions="要有帮助",model=model),
"来自租户 ABC 的问候",
session=tenant_session,
)
print("成功创建了具有自定义键前缀的会话")
await tenant_session.close()
except Exception as e:
print(f"高级功能错误:{e}")
await demonstrate_advanced_features()
多租户支持
上面的示例展示了如何使用自定义键前缀实现多租户隔离。通过设置不同的 key_prefix
,可以在同一个 Redis 实例中为不同的租户或应用创建完全隔离的存储空间。
=== 高级功能演示 ===
自定义键前缀演示:
16:26:27.403 OpenAI Agents trace: Agent workflow
16:26:27.405 Agent run: '支持'
16:26:27.406 Chat completion with 'qwen-turbo' [LLM]
成功创建了具有自定义键前缀的会话
这个功能特别适用于:
- SaaS 应用:为不同客户提供数据隔离
- 多环境部署:开发、测试、生产环境使用同一 Redis 实例
- 微服务架构:不同服务使用不同的键前缀避免冲突
总结
本文档展示了 OpenAI Agents 框架中三种主要的会话存储方案:
存储方案对比
特性 | SQLite | SQLAlchemy | Redis |
---|---|---|---|
性能 | 中等 | 中等-高 | 极高 |
持久化 | 文件存储 | 多种后端 | 内存+持久化 |
扩展性 | 单机 | 支持集群 | 支持集群 |
复杂度 | 简单 | 中等 | 中等 |
适用场景 | 小型应用 | 企业应用 | 高并发应用 |
选择建议
- SQLite:适合原型开发、小型应用或单机部署
- SQLAlchemy:适合需要关系型数据库特性的企业级应用
- Redis:适合高并发、低延迟要求的生产环境
无论选择哪种存储方案,OpenAI Agents 都提供了一致的 API 接口,让您可以轻松地在不同方案之间切换,满足不同场景的需求。
更多推荐
所有评论(0)