【Agent开发】第二阶段:赋予 Agent “海马体” —— 持久化记忆与断点续聊
文章摘要: 本文介绍了如何为AI Agent添加持久化记忆功能,实现会话的断点续聊。通过LangGraph框架的Checkpointer机制,可将对话状态自动保存到数据库,下次运行时通过thread_id恢复上下文。文章详细演示了使用MemorySaver实现本地内存存储的步骤,包括引入检查点模块、修改编译配置和运行时传入thread_id等关键代码实现。同时预留了切换到Redis存储的接口,为后
【Agent开发】第二阶段:赋予 Agent “海马体” —— 持久化记忆与断点续聊 – pd的AI Agent开发笔记
文章目录
前置环境:当前环境是基于WSL2 + Ubuntu 24.04 + Docker Desktop构建的云原生开发平台,所有服务(MySQL、Redis、Qwen)均以独立容器形式运行并通过Docker Compose统一编排。如何配置请参考我的博客 WSL2 + Ubuntu 24.04 + Docker Desktop 配置双内核环境
第 1 讲 —— 会话持久化
我们先用最简单的 MemorySaver (本地内存) 来演示 Checkpointer 的原理,因为它不需要额外配置 Docker,能让你专注于代码逻辑。理解后,我们再花 5 分钟切换到 Redis。
1. 什么是 Checkpointer?
在 LangGraph 中,Checkpointer 就像一个存档点。
- 每次 State (状态) 发生变化(比如模型说了一句话,或者工具执行完了),Checkpointer 就会把当前的 messages 列表保存到数据库。
- 下次运行时,只要传入相同的 thread_id,LangGraph 会自动从数据库加载之前的消息,恢复上下文。
2. 代码实战:添加记忆功能
修改之前的 agent_framework.py:
第一步:引入 MemorySaver
from langgraph.checkpoint.memory import MemorySaver
# 如果用 Redis (稍后切换): from langgraph.checkpoint.redis import RedisSaver
第二步:编译时传入 Checkpointer
找到 app = workflow.compile() 这一行,修改为:
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)
print("✅ 记忆系统已加载,内存记忆")
第三步:运行时的关键 —— 传入 thread_id
这是最重要的一步!如果不传 thread_id,LangGraph 会认为每次都是新对话。
# 如果不传 thread_id,LangGraph 会认为每次都是新对话。
def run_framework_agent_memory(query: str, thread_id: str = "user_123"):
print(f"\n👤 用户 (Thread: {thread_id}): {query}")
# 配置运行时参数,必须包含 thread_id
config = {"configurable": {"thread_id": thread_id}}
inputs = {"messages": [HumanMessage(content=query)]}
# 传入 config 参数
for event in app.stream(inputs, config, stream_mode="values"):
last_msg = event["messages"][-1]
role = last_msg.type
content = last_msg.content
# 简单格式化输出
if role == "ai":
if last_msg.tool_calls:
print(f"🧠 思考: 准备调用工具 {[t['name'] for t in last_msg.tool_calls]}")
else:
print(f"🤖 Agent: {content}")
elif role == "tool":
print(f"📝 工具结果: {content}")
完整代码:
# --- 1. 定义工具集 ---
# 不再需要手动写 JSON Schema,框架会自动从函数签名和文档字符串生成。
from langchain_core.tools import tool
from datetime import datetime
@tool
def get_current_date() -> str:
"""获取当前的日期和星期几。适用于询问今天周几、日期的问题。"""
now = datetime.now()
# 直接返回包含星期的字符串,杜绝幻觉
return f"日期:{now.strftime('%Y-%m-%d')}, 星期:{now.strftime('%A')}"
@tool
def get_current_time() -> str:
"""获取当前的具体时间(时:分:秒)。适用于询问几点、剩余时间的问题。"""
return f"时间:{datetime.now().strftime('%H:%M:%S')}"
tools = [get_current_date, get_current_time]
# # --- 调试:打印工具信息 ---
# print("\n🔍 已注册的工具列表:")
# for t in tools:
# print(f" - 名称:{t.name}")
# print(f" - 描述:{t.description}")
# print(f" - 参数:{t.args_schema}")
# print("-" * 30)
# # -----------------------
# --- 2. 定义状态 ---
# 这是 LangGraph 的核心。我们需要定义一个“容器”,用来在节点之间传递数据。
from typing import Annotated, Sequence, TypedDict
from langchain_core.messages import BaseMessage
import operator
# 定义状态结构
class AgentState(TypedDict):
# messages 是一个消息列表,operator.add 表示每次更新状态时,新消息会追加到列表后面(而不是覆盖)
messages: Annotated[Sequence[BaseMessage], operator.add]
# --- 3. 构建节点 (Nodes) ---
# 节点就是具体的执行逻辑。我们需要两个主要节点:模型节点 和 工具执行节点。
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import ToolNode
# 1. 初始化模型 (指向你的本地 Qwen)
# base_url 指向你的 Docker 服务
llm = ChatOpenAI(
model="qwen-3-4b", # 替换为你之前查到的真实模型名
base_url="http://localhost:7575/v1", # 注意加上 /v1
api_key="not-needed", # 本地通常不需要 key
temperature=0.05
)
# 将工具绑定到模型上,这样模型就知道自己有这些能力了
llm_with_tools = llm.bind_tools(tools)
import re,json
from langchain_core.messages import AIMessage
def extract_tool_calls_from_content(content: str):
"""从 content 中提取所有被<tool_call>包围的工具调用 JSON"""
tool_calls = []
matches = re.findall(r"<tool_call>\s*({.*?})\s*</tool_call>", content, re.DOTALL)
for i, match in enumerate(matches):
try:
data = json.loads(match.strip())
name = data.get("name")
arguments = data.get("arguments", {})
if name:
# 注意:LangChain 内部使用 'args' 而不是 'arguments'
tool_calls.append({
"name": name,
"args": arguments, # ⚠️ 关键:用 'args'
"id": f"call_{i}_{name}", # 必须有唯一 id
"type": "tool_call"
})
except Exception as e:
print(f"❌ 解析工具调用失败: {e}")
return tool_calls
# 2. 定义“思考”节点
def call_model(state: AgentState):
messages = state["messages"]
raw_response = llm_with_tools.invoke(messages)
# 提取工具调用
extracted_tool_calls = extract_tool_calls_from_content(raw_response.content)
# 创建新的 AIMessage,注入 tool_calls
response = AIMessage(
content=raw_response.content, # 保留原始思考过程(可选)
tool_calls=extracted_tool_calls # 👈 核心:手动注入
)
print("🔧 注入的 tool_calls:", extracted_tool_calls) # 调试用
return {"messages": [response]}
# 3. 定义“工具执行”节点
# LangGraph 内置了 ToolNode,自动处理工具查找和参数解析,不用我们自己写 registry 了!
tool_node = ToolNode(tools)
# --- 4. 构建图 (Graph) & 路由逻辑 ---
# 这是替代 while 循环的部分。我们需要定义流程怎么走。
from langgraph.graph import StateGraph, START, END
from langgraph.graph.message import add_messages
from langchain_core.messages import AIMessage
from langgraph.checkpoint.memory import MemorySaver
# 路由函数:决定下一步是去“调工具”还是“直接结束”
def should_continue(state: AgentState):
last_message = state["messages"][-1]
# 如果最后一条消息是 AI 发出的,且包含工具调用请求
if isinstance(last_message, AIMessage) and last_message.tool_calls:
return "tools" # 去工具节点
return END # 否则结束
# 1. 初始化图
workflow = StateGraph(AgentState)
# 2. 添加节点
workflow.add_node("agent", call_model)
workflow.add_node("tools", tool_node)
# 3. 添加边 (Edges)
workflow.add_edge(START, "agent") # 从开始直接进入 agent 思考
workflow.add_conditional_edges(
"agent",
should_continue, # 根据思考结果决定去向
{
"tools": "tools", # 如果有工具调用,去 tools 节点
END: END # 如果没有,结束
}
)
workflow.add_edge("tools", "agent") # 工具执行完后,必须回到 agent 节点进行下一轮思考(形成闭环!)
# 4. 编译成可执行的应用
# 内存初始化
memory = MemorySaver()
app = workflow.compile(checkpointer=memory)
print("✅ 记忆系统已加载,内存记忆")
# --- 5. 运行 Agent ---
from langchain_core.messages import HumanMessage
# 如果不传 thread_id,LangGraph 会认为每次都是新对话。
def run_framework_agent_memory(query: str, thread_id: str = "user_123"):
print(f"\n👤 用户 (Thread: {thread_id}): {query}")
# 配置运行时参数,必须包含 thread_id
config = {"configurable": {"thread_id": thread_id}}
inputs = {"messages": [HumanMessage(content=query)]}
# 传入 config 参数
for event in app.stream(inputs, config, stream_mode="values"):
last_msg = event["messages"][-1]
role = last_msg.type
content = last_msg.content
# 简单格式化输出
if role == "ai":
if last_msg.tool_calls:
print(f"🧠 思考: 准备调用工具 {[t['name'] for t in last_msg.tool_calls]}")
else:
# 匹配思考部分 输出内容去掉思考
think_match = re.search(r"<think>(.*?)</think>", content, re.DOTALL)
content = re.sub(r'<think>.*?</think>', '', content, flags=re.DOTALL)
elif role == "tool":
print(f"📝 工具结果: {content}")
if __name__ == "__main__":
# 第一轮对话
run_framework_agent_memory("我叫张三,记住我的名字。", thread_id="session_001")
print("\n--- 模拟程序重启,开启新对话 ---\n")
# 第二轮对话 (使用相同的 thread_id)
run_framework_agent_memory("我叫什么名字?", thread_id="session_001")
# 第三轮对话 (使用不同的 thread_id,应该是失忆状态)
run_framework_agent_memory("我叫什么名字?", thread_id="session_002")
输出:
✅ 记忆系统已加载,内存记忆
👤 用户 (Thread: session_001): 我叫张三,记住我的名字。
🤖 Agent:已记住您的名字是张三。请问还有其他需要帮助的吗?
--- 模拟程序重启,开启新对话 ---
👤 用户 (Thread: session_001): 我叫什么名字?
🤖 Agent: 您叫张三。
👤 用户 (Thread: session_002): 我叫什么名字?
🤖 Agent:我无法获取您的姓名信息。您需要直接告诉我是谁在提问哦!
3. 进阶:切换到 Redis
安装依赖,langgraph-checkpoint-redis会自动安装Redis依赖,注意:Redis版本需要>=8.0
pip install langgraph-checkpoint-redis
写一个基类设置Redis连接
# src/service/redis_client.py
import redis
from langgraph.checkpoint.redis import RedisSaver
# 单例模式
class RedisClient:
_instance = None
def __new__(cls, *args, **kwargs):
# 如果实例不存在,则创建新实例
if not cls._instance:
cls._instance = super().__new__(cls)
print("创建新的单例实例")
else:
print("返回已存在的单例实例")
return cls._instance
def __init__(self):
self.host = "127.0.0.1"
self.port = 6379
self.db = 0
# TTL 配置,包含:
# default_ttl:检查点存活时间(分钟),默认 60 分钟。
# refresh_on_read:读取时是否刷新 TTL,默认 True。
self.ttl_config = {"default_ttl": 120, "refresh_on_read": False}
self.redis = redis.Redis(host=self.host, port=self.port, db=self.db)
self.redis_saver = RedisSaver(redis_client=self.redis,ttl=self.ttl_config)
修改记忆注入:
# src/Mini-Agent/agent_framework.py
import sys
import os
# 添加父目录到系统路径
sys.path.append(os.path.dirname(os.path.dirname(os.path.abspath(__file__))))
# 现在可以直接导入
from service.redis_client import RedisClient
# redis记忆初始化
memory = RedisClient().redis_saver
memory.setup()
print("✅ redis记忆系统已加载")
app = workflow.compile(checkpointer=memory)
输出:
创建新的单例实例
✅ redis记忆系统已加载
👤 用户 (Thread: session_001): 我叫张三,记住我的名字。
🤖 Agent: 已记住您的名字是张三。请问还有其他需要帮助的吗?
--- 模拟程序重启,开启新对话 ---
👤 用户 (Thread: session_001): 我叫什么名字?
🤖 Agent: 您叫张三。
👤 用户 (Thread: session_002): 我叫什么名字?
🤖 Agent: 我无法获取您的姓名信息。您需要直接告诉我是谁在提问哦!
🕵️♂️ 实战演练:Agent 记忆追踪行动
如果上面的对话成功了,现在我们要看看数据到底存在哪了。
连接上redis,并查看数据
// checkpoint > checkpoint:session_001 > checkpoint:session_001:__empty__:1f113db2-0629-6e28-8001-35b18e366769
[
{
"thread_id": "session_001",
"run_id": "",
"checkpoint_ns": "__empty__",
"checkpoint_id": "1f113db2-0629-6e28-8001-35b18e366769",
"parent_checkpoint_id": "1f113db1-c5a9-63de-8000-bf8851e7566b",
"checkpoint_ts": 1772196728729.553,
"checkpoint": {
"type": "json",
"v": 4,
"ts": "2026-02-27T12:52:08.729552+00:00",
"id": "1f113db2-0629-6e28-8001-35b18e366769",
"channel_values": {
"messages": [
{
"lc": 1,
"type": "constructor",
"id": [
"langchain",
"schema",
"messages",
"HumanMessage"
],
"kwargs": {
"content": "我叫张三,记住我的名字。",
"type": "human"
}
},
{
"lc": 1,
"type": "constructor",
"id": [
"langchain",
"schema",
"messages",
"AIMessage"
],
"kwargs": {
"content": "<think>\n好的,用户说“我叫张三,记住我的名字。”首先,我需要确认用户的要求。用户希望我记住他的名字,张三。接下来,我需要检查是否有需要调用的工具来处理这个请求。\n\n查看提供的工具,有get_current_date和get_current_time,这两个函数分别用于获取当前日期和时间。但用户的需求是记住名字,这并不涉及日期或时间的获取。因此,这两个工具在这里并不适用。\n\n用户可能是在测试我是否能处理简单的指令,或者希望后续对话中能使用这个名字。作为AI,我需要确认是否已经正确记录了用户的名字。根据我的设计,我应该能够记住对话中的关键信息,如用户提到的名字。因此,不需要调用任何工具,直接回应用户即可。\n\n接下来,我应该回复用户,确认已经记住了他的名字,并询问是否有其他需要帮助的地方。这样既符合用户的要求,也保持了对话的流畅性。不需要使用任何工具函数,因为任务已经完成,只需自然语言回应即可。\n</think>\n\n已记住您的名字是张三。请问还有其他需要帮助的吗?",
"type": "ai",
"tool_calls": [],
"invalid_tool_calls": []
}
}
]
},
"channel_versions": {
"__start__": "00000000000000000000000000000002.0.35019554440721345",
"messages": "00000000000000000000000000000003.0.1628457894508898",
"branch:to:agent": "00000000000000000000000000000003.0.1628457894508898"
},
"versions_seen": {
"__input__": {},
"__start__": {
"__start__": "00000000000000000000000000000001.0.3573929798569724"
},
"agent": {
"branch:to:agent": "00000000000000000000000000000002.0.35019554440721345"
}
},
"updated_channels": [
"messages"
],
"pending_sends": []
},
"metadata": "{\"source\":\"loop\",\"step\":1,\"parents\":{}}",
"has_writes": false,
"source": "loop",
"step": 1
}
]
使用 Python 脚本查看 (更直观)
import redis
import json
# 连接 Redis
r = redis.Redis(host="127.0.0.1", port=6379, db=0, decode_responses=True)
# 查找所有包含 session_001 的 Key
pattern = "*session_001*"
keys = r.keys(pattern)
print(f"🔍 找到 {len(keys)} 个相关 Key:")
for key in keys:
print(f"\n📂 Key: {key}")
# 获取类型
k_type = r.type(key)
print(f" 类型:{k_type}")
# 获取 TTL
ttl = r.ttl(key)
print(f" 剩余寿命:{ttl} 秒")
# 获取内容
if k_type == 'string':
value = r.get(key)
# 尝试格式化 JSON
try:
data = json.loads(value)
# 简单打印消息内容
if 'channel_values' in data and 'messages' in data['channel_values']:
msgs = data['channel_values']['messages']
print(f" 📝 内容预览 (最后一条): {msgs[-1]['content'][:50]}...")
else:
print(f" 📝 内容预览:{value[:100]}...")
except:
print(f" 📝 内容:{value[:100]}...")
🧠 深度解析:LangGraph Redis 存储结构揭秘
能注意到Redis中总共有4个储存部分
- checkpoint
- checkpoint_latest
- checkpoint_write
- write_keys_zset
然后里面才是session_001、session_002的数据
四大存储组件的作用
看到的四个 Key 前缀,分别承担了不同的职责,共同构成了一个高可用、可回溯的记忆系统:
| Key 前缀 | 角色比喻 | 核心作用 |
|---|---|---|
checkpoint |
📚 历史档案库 | 存储每一个步骤的完整状态快照。就像游戏的每一个存档点。如果程序崩溃,可以从这里恢复。 |
checkpoint_latest |
🚀 快速传送门 | 指向当前线程最新的那个 Checkpoint ID。避免每次都要遍历所有历史记录去找最新状态,极大提升读取速度。 |
checkpoint_writes |
✍️ 增量日志本 | 存储单次执行中产生的新数据(比如刚生成的回复)。采用“写时复制”策略,只有当一步彻底成功后,才合并到主 checkpoint 中。保证原子性。 |
write_keys_zset |
🗂️ 索引目录 | (ZSet = Sorted Set) 维护所有待处理写入操作的有序列表,确保在并发环境下,写入操作按顺序执行,不丢数据。 |
💡 架构智慧:
这种设计采用了 CQRS (命令查询职责分离) 的思想。
- 写路径:先写 checkpoint_writes (轻量),成功后再合并。
- 读路径:通过 checkpoint_latest 快速定位,读取 checkpoint 中的完整状态。
🎬 情景设定:用户 “IronMan” 的对话流
这是一个非常深刻的问题!你触及到了 LangGraph Checkpoint Redis 实现的核心架构。
很多开发者只把 Redis 当作一个“存字符串的字典”,但 LangGraph 团队在这里利用了 Redis 的高级特性(ZSet、原子写)来实现分布式状态机最关键的三个需求:
- 并发安全:多个用户同时对话,或者同一个用户快速发送多条消息,数据不能乱。
- 断点续传:程序崩了,重启后能精确知道哪一步没做完。
- 时间旅行:随时回滚到之前的任意一步状态。
你看到的四个 Key 结构,正是为了实现这些目标而设计的"写时复制 (Copy-On-Write) + 日志追加"模式。
我们用"银行记账"和"游戏存档"的例子来通俗解释这四个部分。
🎬 情景设定:用户 “IronMan” 的对话流
假设用户 “IronMan” 正在进行多轮对话:
- Step 0: 用户说:“你好”。
- Step 1: Agent 回复:“你好,我是助手”。
- Step 2: 用户说:“我叫 IronMan”。
- Step 3: Agent 思考中… (此时程序突然崩溃/断电了!)
- Step 3 (恢复): 程序重启,继续执行 Step 3,回复:“好的,记住了”。
我们来看看在这个过程中,Redis 里的四个 Key 是如何协作的。
1. checkpoint: 完整的历史快照 (Full Snapshots)
类比:游戏的“存档文件” (.sav)
每一个存档文件都记录了那一刻游戏的完整状态(位置、血量、背包)。
- 结构:
checkpoint:{thread_id}:{ns}:{checkpoint_id} - 内容:包含该时刻完整的
channel_values(所有消息列表)。 - 行为:
- 当 Step 1 完成时,生成
checkpoint_..._id_1,里面存了[Msg0, Msg1]。 - 当 Step 2 完成时,生成
checkpoint_..._id_2,里面存了[Msg0, Msg1, Msg2]。
- 当 Step 1 完成时,生成
- 特点:
- 不可变 (Immutable):一旦写入,永不修改。这是“写时复制”的体现——新的状态是旧状态的副本 + 增量,而不是直接覆盖旧文件。
- 作用:支持“时间旅行”。如果你想回滚到 Step 1,只需读取
id_1的那个 Key 即可。
2. checkpoint_writes: 增量日志 (Write-Ahead Log / Delta)
类比:银行的“交易流水单”
银行不会每发生一笔交易就重写一次你的总余额文件。它只是记一笔流水:“+100 元”。
- 结构:
checkpoint_writes:{thread_id}:{ns}:{checkpoint_id}:{task_id}:{write_index}- 注意这里多了
task_id和write_index,说明它是更细粒度的。
- 注意这里多了
- 内容:只存变化的部分 (Delta)。比如 Step 3 中,Agent 调用工具产生的那个新消息。
- 行为:
- 在 Step 3 执行过程中,Agent 生成了回复。在它准备更新主存档之前,先把这条新消息写到
checkpoint_writes里。 - 关键点:这个写入是原子性的。即使写完这一条就断电了,这条数据也是安全的。
- 在 Step 3 执行过程中,Agent 生成了回复。在它准备更新主存档之前,先把这条新消息写到
- 为什么需要它?(解决崩溃问题):
- 如果 Step 3 执行到一半断电了:
checkpoint_id_2(Step 2 的存档) 是完整的,没坏。checkpoint_writes里可能有一条未提交的记录。
- 重启后:LangGraph 会检查
checkpoint_writes。发现有一条属于checkpoint_id_2的未完成写入。它会重放这条日志,把数据补全,然后再生成新的checkpoint_id_3。 - 如果没有它:断电瞬间,内存里的新消息丢了,且没有落盘,Agent 就会丢失刚才的思考结果,甚至陷入死循环。
- 如果 Step 3 执行到一半断电了:
3. checkpoint_latest: 指针 (The Pointer)
类比:游戏主菜单上的“当前存档槽位”
你硬盘里有 10 个存档文件 (checkpoint_1…checkpoint_10),但主菜单只需要知道哪一个是最新的,以便你点击“继续游戏”。
- 结构:
checkpoint_latest:{thread_id}:{ns} - 内容:存储的是最新那个 Checkpoint 的 ID (例如
"1f113db2-...")。 - 行为:
- 每次成功生成一个新的 Checkpoint,就更新这个 Key 的值。
- 这是一个可变的 Key。
- 作用:O(1) 快速定位。
- 如果不存这个指针,每次启动 Agent,你都得去遍历所有
checkpoint:*的 Key,比较它们的时间戳来找最新的,效率极低。 - 有了它,LangGraph 只要读这一个 Key,拿到 ID,再去读对应的
checkpoint大文件,瞬间恢复状态。
- 如果不存这个指针,每次启动 Agent,你都得去遍历所有
4. write_keys_zset: 调度器与垃圾回收 (The Scheduler & GC)
类比:机场的“航班起飞时间表” + “过期行李清理清单”
这是一个 Sorted Set (ZSet)。ZSet 的特点是:每个成员都有一个 Score (分数),通常是时间戳。
-
结构:
write_keys_zset:{thread_id}(或者是全局的,取决于配置) -
内容:
- Member:
checkpoint_writes的 Key 名称。 - Score: 该写入发生的时间戳。
- Member:
-
核心作用 1:崩溃恢复的“待办清单”
- 当系统重启时,它不仅看
checkpoint_latest,还会查这个 ZSet。 - 它会问:“有没有哪些
checkpoint_writes是发生在最新 Checkpoint 之后,但还没被合并进主存档的?” - 通过 Score (时间戳) 排序,它能按顺序重放这些未完成的写入。
- 当系统重启时,它不仅看
-
核心作用 2:TTL 与 垃圾回收 (GC) ⭐ 这是 ZSet 最妙的地方
- 你之前配置了
ttl=3600(1 小时过期)。 - Redis 的 Key 过期是异步的,有时候不靠谱。而且
checkpoint是一串链表,删了一个中间的,后面的怎么办? - LangGraph 利用 ZSet 的 Score 来做主动清理:
- 定期(或每次启动时)查询 ZSet:
ZRANGEBYSCORE ... 0 (当前时间 - TTL)。 - 找出所有超过 1 小时的
checkpoint_writesKey。 - 批量删除这些 Key,并从 ZSet 中移除它们。
- 同时,如果某个旧的
checkpoint不再被任何未完成的 write 引用,也可以被安全删除。
- 定期(或每次启动时)查询 ZSet:
- 为什么不用 Redis 原生 TTL? 因为我们需要事务性。删除数据和更新索引必须在逻辑上保持一致,ZSet 提供了这种有序的管理能力。
- 你之前配置了
🔄 综合流程演示:一次完整的“写时复制”
让我们把刚才的 Step 2 -> Step 3 过程串联起来:
-
初始状态:
checkpoint_latest指向CP_2(Step 2 完成)。checkpoint:CP_2存有[Msg0, Msg1, Msg2]。write_keys_zset是空的 (假设之前的都清理了)。
-
开始 Step 3 (Agent 思考并生成回复):
- LangGraph 读取
CP_2到内存。 - Agent 生成新消息
Msg3。
- LangGraph 读取
-
预写日志 (Write-Ahead):
- 系统生成一个唯一的
write_id。 - 将
Msg3写入checkpoint_writes:...:write_id。 - 关键:同时将这个 Key 加入
write_keys_zset,Score 设为当前时间戳。 - (此时如果断电,数据已在 Redis,下次可恢复)
- 系统生成一个唯一的
-
创建新快照 (Copy-On-Write):
- 系统在内存中合并
CP_2的数据 +Msg3,形成新状态State_3。 - 生成新的 ID
CP_3。 - 将
State_3写入新的 Keycheckpoint:CP_3。 - (注意:
checkpoint:CP_2依然原封不动地留在那里,这就是“写时复制”)
- 系统在内存中合并
-
提交与清理 (Commit):
- 更新
checkpoint_latest指向CP_3。 - 从
write_keys_zset中移除刚才那个write_id(因为已经合并进主存档了,不需要再作为“待办事项”了)。 - (可选) 删除
checkpoint_writes:...:write_id以节省空间。
- 更新
-
垃圾回收 (GC):
- 后台进程扫描
write_keys_zset。 - 发现
CP_1相关的 writes 已经是 2 小时前的了 (Score 很小)。 - 删除
CP_1的存档和相关 writes,释放内存。
- 后台进程扫描
💡 为什么要这么复杂?(设计哲学)
你可能会想:“直接 SET messages [...] 覆盖不就行了吗?”
-
并发冲突:
- 如果是微服务架构,两个容器同时处理同一个用户的请求(虽然少见,但在重试机制下可能发生)。
- 直接覆盖会导致丢失更新 (Lost Update)。
- 使用
checkpoint_writes+ZSet+ 版本号机制,可以检测到冲突,确保只有合法的写入被合并。
-
Exactly-Once 语义:
- 在分布式系统中,网络抖动可能导致请求重发。
- 通过
write_index和 ZSet 的记录,LangGraph 能保证同一条消息只被处理一次,不会重复回复用户。
-
人类介入 (Human-in-the-loop):
- 因为保留了所有历史
checkpoint(直到 GC),你可以随时让时间倒流。 - 比如用户说“不对,重来”,你可以直接把
checkpoint_latest指回CP_2,Agent 就会忘记CP_3发生的一切,重新生成。
- 因为保留了所有历史
| Key 前缀 | 角色 | 可变性 | 核心作用 | 类比 |
|---|---|---|---|---|
checkpoint |
存档文件 | ❌ 不可变 | 存储完整状态快照,支持时间旅行 | 游戏 .sav 文件 |
checkpoint_writes |
事务日志 | ✅ 追加写 | 防止崩溃丢数据,实现原子写入 | 银行流水单 |
checkpoint_latest |
指针 | ✅ 可变 | 快速找到最新存档,O(1) 加载 | 游戏“继续”按钮 |
write_keys_zset |
调度器 | ✅ 增删改 | 管理待处理写入,基于时间戳做 GC | 航班时刻表 + 清理清单 |
JSON 数据结构逐字段拆解
聚焦checkpoint内容,这是最核心的部分。
A. 身份标识 (Identity)
"thread_id": "session_001",
"checkpoint_ns": "__empty__",
"checkpoint_id": "1f113db2-0629-6e28-8001-35b18e366769",
"parent_checkpoint_id": "1f113db1-c5a9-63de-8000-bf8851e7566b"
thread_id: 你的会话 ID。这是隔离不同用户数据的钥匙。checkpoint_ns: 命名空间。empty 表示这是主流程。如果你以后搞多 Agent 协作(子图),这里会出现子图的名字(如 agent_router),实现嵌套状态管理。checkpoint_id: 当前这一步的唯一 UUID。parent_checkpoint_id: 关键! 它指向上一步的 ID。这就形成了一条链表。- 作用:支持时间旅行。你可以拿着这个 ID 去加载“上一秒”的状态,实现“撤销”、“回滚”或“分支剧情”功能。
B. 核心数据:channel_values (记忆的真相)
"channel_values": {
"messages": [
{
"lc": 1,
"type": "constructor",
"id": ["langchain", "schema", "messages", "HumanMessage"],
"kwargs": {
"content": "我叫张三,记住我的名字。",
"type": "human"
}
},
{
// ... AIMessage ...
"kwargs": {
"content": "<think>...</think>\n\n已记住您的名字...",
"type": "ai",
"tool_calls": [],
"invalid_tool_calls": []
}
}
]
}
channel_values: 这里存放着你定义的AgentState中的所有变量。因为我只定义了messages,所以这里只有messages键。如果你定义了user_name: str,这里也会多一个user_name键。- LangChain 序列化格式 (
lc,type,id):- 这不是普通的 JSON,这是 LangChain 特定的序列化协议。
lc: 1: 协议版本。id: 告诉反序列化器,这个对象对应 Python 里的哪个类 (langchain.schema.messages.HumanMessage)。kwargs: 真正的构造函数参数。- 意义:无论你的 Python 代码怎么变,只要遵循这个协议,存进去的对象就能原样还原出来,包括复杂的对象方法(如果支持的话)。
C. 版本控制:channel_versions & versions_seen
"channel_versions": {
"messages": "00000000000000000000000000000003.0.1628457894508898",
"branch:to:agent": "..."
},
"versions_seen": {
"agent": {
"branch:to:agent": "..."
}
}
- 这是什么? 这是 LangGraph 状态机引擎的“心跳”。
- 作用:
- 每个 Channel (如
messages) 都有一个版本号。每次有新消息加入,版本号递增。 versions_seen记录每个节点(Node)最后看到的版本。- 核心逻辑:当 Agent 运行时,它会检查:“我上次处理到版本 3 了,现在最新版本是 4 吗?如果是,我就需要再次运行;如果还是 3,说明没新事,我可以休息。”
- 这防止了无限循环和重复处理,是 ReAct 循环能自动停止的底层保障。
- 每个 Channel (如
D. 更新标记:updated_channels
"updated_channels": ["messages"]
- 告诉系统:在这一步操作中,只有 messages 通道发生了变化。其他通道(如果有)保持不变。这有助于优化增量更新。
🔍 实战验证:利用结构特性“篡改”记忆
if __name__ == "__main__":
run_framework_agent_memory("我叫张三,记住我的名字。", thread_id="session_003")
# 然后手动去修改相关记忆内容 checkpoint:session_003:__empty__:{checkpoint_id} 把"张三"改成"pd",然后再运行下面
# 第二轮对话 (使用相同的 thread_id)
run_framework_agent_memory("我叫什么名字?", thread_id="session_003")
运行结果如下:
创建新的单例实例
✅ redis记忆系统已加载
👤 用户 (Thread: session_003): 我叫什么名字?
🤖 Agent: 您叫pd。
构建“长期记忆” —— MySQL 用户画像系统
本讲目标:利用 MySQL 存储用户的关键事实(如名字、偏好、职业),实现跨会话的永久记忆。
核心区别:
- Redis (短期):存完整的对话流,有 TTL,会过期,适合上下文理解。
- MySQL (长期):存结构化的“事实”,永久保存,适合用户画像和个性化服务。
目录结构:
./src
├── Mini_Agent
│ ├── __init__.py
│ ├── agent_framework.py
│ ├── agent_v1.py
│ ├── graph.py
│ ├── state.py
│ └── tools
├── core
│ ├── __init__.py
│ ├── config.py
│ ├── db_session.py
│ └── models.py
├── main.py
├── service
│ ├── __init__.py
│ └── redis_client.py
├── test
│ ├── __init__.py
│ └── redis_look.py
└── utils
├── __init__.py
└── xml_parser.py
1. 数据库设计:用户画像表 (user_profiles)
我们需要一张表来存储 Key-Value 形式的用户信息。为了支持扩展,我们设计得灵活一些。
🗄️ SQL 建表语句
执行以下 SQL:
-- 创建数据库(如果还没创建)
CREATE DATABASE IF NOT EXISTS agent_memory CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci;
USE agent_memory;
-- 创建用户画像表
CREATE TABLE IF NOT EXISTS user_profiles (
id INT AUTO_INCREMENT PRIMARY KEY,
thread_id VARCHAR(255) NOT NULL, -- 关联会话 ID (可选,也可以按 user_id)
user_key VARCHAR(100) NOT NULL, -- 记忆的键,如 "name", "favorite_food"
user_value TEXT NOT NULL, -- 记忆的值,如 "IronMan", "Pizza"
confidence_score FLOAT DEFAULT 1.0, -- 置信度 (0.0-1.0),Agent 不确定时可以存低分
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
-- 索引优化:快速查找某个会话的所有记忆,或特定键
UNIQUE KEY uk_thread_key (thread_id, user_key),
INDEX idx_thread (thread_id)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
💡 设计思路解析
thread_id:我们将记忆绑定到会话。未来可以扩展为 user_id(如果做了用户登录系统)。user_key/user_value:采用 EAV (Entity-Attribute-Value) 模型的简化版。这样不需要为每个属性(名字、年龄、喜好)单独建列,扩展性极强。UNIQUE KEY:确保同一个会话下,同一个 key(如 “name”)只有一条记录。更新时直接ON DUPLICATE KEY UPDATE。confidence_score:这是一个高级字段。如果 Agent 听不清或不确定用户说的是不是事实,可以存 0.5。查询时只查 > 0.8 的高置信度数据。
2. 开发记忆工具:Save & Get
我们需要两个工具让 Agent 操作数据库。为了代码整洁,我们先封装一个 DB 连接类。
🛠️ 步骤 A:安装依赖、定义模型 (Model)
pip install pymysql sqlalchemy
(使用 SQLAlchemy 可以让代码更通用,避免原生 SQL 注入风险)
我们需要一个 Python 类来映射数据库表。
# src/core/models.py
from sqlalchemy import Column, Integer, String, Text, Float, DateTime, UniqueConstraint, Index
from sqlalchemy.orm import declarative_base
from datetime import datetime
Base = declarative_base()
class UserProfile(Base):
__tablename__ = 'user_profiles'
id = Column(Integer, primary_key=True, autoincrement=True)
thread_id = Column(String(255), nullable=False, index=True) # 会话ID
user_key = Column(String(100), nullable=False) # 键:name, preference
user_value = Column(Text, nullable=False) # 值:IronMan, Pizza
confidence_score = Column(Float, default=1.0) # 置信度
created_at = Column(DateTime, default=datetime.utcnow)
updated_at = Column(DateTime, default=datetime.utcnow, onupdate=datetime.utcnow)
# 定义唯一约束:同一个 session 下,key 不能重复
__table_args__ = (
UniqueConstraint('thread_id', 'user_key', name='uk_thread_key'),
Index('idx_thread', 'thread_id'),
)
def __repr__(self):
return f"<UserProfile(thread_id={self.thread_id}, key={self.user_key}, value={self.user_value})>"
💡 优势:
- 如果你以后想换 PostgreSQL,只需改连接字符串,这个类完全不用动。
__table_args__直接在代码里定义了索引和约束,不需要去数据库手动敲 SQL。datetime.utcnow自动处理时间戳。
设定配置文件
import os
from dotenv import load_dotenv
load_dotenv() # 加载 .env 文件
class Settings:
# LLM 配置
LLM_MODEL_NAME = os.getenv("LLM_MODEL", "qwen-3-4b")
LLM_BASE_URL = os.getenv("LLM_BASE_URL", "http://localhost:7575/v1")
LLM_API_KEY = os.getenv("LLM_API_KEY", "not-needed")
LLM_TEMPERATURE = float(os.getenv("LLM_TEMP", "0.05"))
# Redis 配置
REDIS_HOST = os.getenv("REDIS_HOST", "127.0.0.1")
REDIS_PORT = int(os.getenv("REDIS_PORT", "6379"))
# MySQL 配置
DATABASE_URL = os.getenv("DATABASE_URL", "mysql+pymysql://root:xxxxxx@localhost:3307/agent_memory")
settings = Settings()
🛠️ 步骤 B:封装数据库会话 (Session Manager)
为了保证连接池的高效利用,我们用一个依赖注入风格的上下文管理器。
# src/core/db_session.py
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from src.core.models import Base
from src.core.config import settings
# 创建引擎
engine = create_engine(
settings.DATABASE_URL,
pool_pre_ping=True, # 自动检测断连并重连
pool_size=10, # 连接池大小
max_overflow=20, # 最大溢出连接数
echo=False # 生产环境关闭 SQL 日志,调试时设为 True
)
# 创建 Session 工厂
# scoped_session 确保在多线程/异步环境下每个线程有独立的 Session
SessionLocal = scoped_session(sessionmaker(bind=engine))
def init_db():
"""初始化数据库:如果表不存在则自动创建"""
Base.metadata.create_all(bind=engine)
print("✅ 数据库表结构已同步 (ORM 模式)")
def get_db_session():
"""获取数据库会话"""
return SessionLocal()
🛠️ 步骤 C:编写 memory_tools.py
# src\Mini_Agent\tools\memory_tools.py
# src\Mini_Agent\tools\memory_tools.py
from langchain_core.tools import tool
from src.core.db_session import get_db_session
from src.core.models import UserProfile
from sqlalchemy import select
from typing import List
@tool
def save_user_memory(key: str, value: str, thread_id: str) -> str:
"""
保存用户的关键信息到长期记忆。
参数:
key: 信息类别 (如 'name', 'job')
value: 具体内容
thread_id: 当前会话 ID
"""
session = get_db_session()
try:
# 1. 尝试查询是否已存在
stmt = select(UserProfile).where(
UserProfile.thread_id == thread_id,
UserProfile.user_key == key
)
existing_profile = session.execute(stmt).scalars().first()
if existing_profile:
# 2. 如果存在,更新值
existing_profile.user_value = value
existing_profile.confidence_score = 1.0
# updated_at 会自动由 onupdate 触发
else:
# 3. 如果不存在,插入新记录
new_profile = UserProfile(
thread_id=thread_id,
user_key=key,
user_value=value,
confidence_score=1.0
)
session.add(new_profile)
session.commit()
return f"✅ 成功记住:{key} = {value}"
except Exception as e:
session.rollback()
return f"❌ 保存记忆失败:{str(e)}"
finally:
session.close()
@tool
def get_user_memory(keys: List[str], thread_id: str) -> str:
"""
查询用户的长期记忆。
参数:
keys: 想要查询的键列表 (如 ['name', 'job'])。为空则查所有。
thread_id: 当前会话 ID
"""
session = get_db_session()
try:
query = select(UserProfile).where(UserProfile.thread_id == thread_id)
if keys:
query = query.where(UserProfile.user_key.in_(keys))
# 按创建时间排序,确保拿到最新的(虽然 unique 约束保证只有一个)
query = query.order_by(UserProfile.created_at.desc())
results = session.execute(query).scalars().all()
if not results:
return "ℹ️ 未找到相关记忆。"
# 格式化为易读字符串
memory_list = [f"{p.user_key}: {p.user_value}" for p in results]
return "🧠 长期记忆加载:" + "; ".join(memory_list)
except Exception as e:
return f"❌ 查询记忆失败:{str(e)}"
finally:
session.close()
memory_tools = [save_user_memory, get_user_memory]
✨ 代码亮点解析
- 类型安全:
UserProfile是强类型的 Python 对象。IDE 会自动提示 user_value, thread_id 等属性,写错字段名直接报错,不用等到运行时。
- 防 SQL 注入:
session.execute(stmt)内部会自动处理参数转义。你再也不用担心'; DROP TABLE users; --这种攻击了。
- 逻辑清晰:
select(...).where(...)的链式调用非常像自然语言,可读性远超拼接 SQL 字符串。scalars().first()和scalars().all()是直接取出模型对象,不需要再手动解析元组。
- 事务管理:
commit()和rollback()显式控制事务,确保数据一致性。
把之前的获取时间的函数也放到tools下
# src\Mini_Agent\tools\base_tools.py
from langchain_core.tools import tool
from datetime import datetime
@tool
def get_current_date() -> str:
"""获取当前的日期和星期几。适用于询问今天周几、日期的问题。"""
now = datetime.now()
# 直接返回包含星期的字符串,杜绝幻觉
return f"日期:{now.strftime('%Y-%m-%d')}, 星期:{now.strftime('%A')}"
@tool
def get_current_time() -> str:
"""获取当前的具体时间(时:分:秒)。适用于询问几点、剩余时间的问题。"""
return f"时间:{datetime.now().strftime('%H:%M:%S')}"
# 导出列表
base_tools = [get_current_date, get_current_time]
收集导出的工具列表
# src\Mini_Agent\tools\__init__.py
from .base_tools import base_tools
from .memory_tools import memory_tools
# 统一导出所有工具
ALL_TOOLS = base_tools + memory_tools
🛠️ 步骤 D:重构智能体编排
分层架构 (Layered Architecture) 进行重构:
- Tools Layer: 独立管理所有工具。
- State Layer: 独立定义状态结构。
- Graph Layer: 专注构建工作流(节点、边、编译)。
- Entry Point: 只负责运行和交互。
状态定义 (AgentState)
# src\Mini_Agent\state.py
# --- 2. 定义状态 ---
# 这是 LangGraph 的核心。我们需要定义一个“容器”,用来在节点之间传递数据。
from typing import Annotated, Sequence, TypedDict
from langchain_core.messages import BaseMessage
import operator
# 定义状态结构
class AgentState(TypedDict):
# messages 是一个消息列表,operator.add 表示每次更新状态时,新消息会追加到列表后面(而不是覆盖)
messages: Annotated[Sequence[BaseMessage], operator.add]
LangGraph 构建逻辑 (Nodes, Edges, Compile)
# src\Mini_Agent\graph.py
from langchain_openai import ChatOpenAI
from langgraph.prebuilt import ToolNode
from langgraph.graph import StateGraph, START, END
from langchain_core.messages import AIMessage
from src.core.config import settings
from .state import AgentState
from .tools import ALL_TOOLS
from src.utils.xml_parser import extract_tool_calls_from_content
from src.service.redis_client import RedisClient
# 1. 初始化模型
llm = ChatOpenAI(
model=settings.LLM_MODEL_NAME,
base_url=settings.LLM_BASE_URL,
api_key=settings.LLM_API_KEY,
temperature=settings.LLM_TEMPERATURE
)
llm_with_tools = llm.bind_tools(ALL_TOOLS)
# 2. 定义节点
def call_model(state: AgentState):
messages = state["messages"]
raw_response = llm_with_tools.invoke(messages)
extracted_tool_calls = extract_tool_calls_from_content(raw_response.content)
response = AIMessage(
content=raw_response.content,
tool_calls=extracted_tool_calls
)
return {"messages": [response]}
tool_node = ToolNode(ALL_TOOLS)
# 3. 路由逻辑
def should_continue(state: AgentState):
last_message = state["messages"][-1]
if isinstance(last_message, AIMessage) and last_message.tool_calls:
return "tools"
return END
# 4. 构建图
workflow = StateGraph(AgentState)
workflow.add_node("agent", call_model)
workflow.add_node("tools", tool_node)
workflow.add_edge(START, "agent")
workflow.add_conditional_edges("agent", should_continue, {
"tools": "tools",
END: END
})
workflow.add_edge("tools", "agent")
# 5. 编译 (带 Checkpointer)
redis_client_instance = RedisClient()
memory = redis_client_instance.redis_saver
memory.setup()
app = workflow.compile(checkpointer=memory)
🛠️ 步骤 E:编写main.py
import re
from langchain_core.messages import HumanMessage
from src.Mini_Agent.graph import app
def run_agent(query: str, thread_id: str = "default_session"):
print(f"\n👤 用户 (Thread: {thread_id}): {query}")
config = {"configurable": {"thread_id": thread_id}}
inputs = {"messages": [HumanMessage(content=query)]}
for event in app.stream(inputs, config, stream_mode="values"):
last_msg = event["messages"][-1]
role = last_msg.type
content = last_msg.content
if role == "ai":
if last_msg.tool_calls:
print(f"🧠 思考:准备调用工具 {[t['name'] for t in last_msg.tool_calls]}")
else:
# 清理输出
clean_content = re.sub(r'<think>.*?</think>', '', content, flags=re.DOTALL)
clean_content = re.sub(r'\n+', '\n', clean_content).strip()
print(f"🤖 Agent: {clean_content}")
elif role == "tool":
print(f"📝 工具结果:{content}")
# python -m src.main
if __name__ == "__main__":
# 示例运行
run_agent("我叫pd,记住我。", thread_id="session_004")
run_agent("我是谁?", thread_id="session_004")
在控制台中输入:
python -m src.main
控制台输出:
用户 (Thread: session_004): 我叫pd,记住我。
🧠 思考:准备调用工具 ['save_user_memory']
📝 工具结果:✅ 成功记住:name = pd
🤖 Agent: 已记住您的名字是pd。如果您有其他需要保存的信息,也可以随时告诉我哦!😊
👤 用户 (Thread: session_004): 我是谁?
🧠 思考:准备调用工具 ['get_user_memory']
📝 工具结果:🧠 长期记忆加载:name: pd
🤖 Agent: 您是pd。如果您有其他问题或需要帮助,随时告诉我哦!😊
可以在mysql数据库中查看结果:

✅ 现在的架构全貌
你的 AI Agent 平台现在已经非常完善了:
| 组件 | 技术栈 | 作用 | 数据形式 |
|---|---|---|---|
| 短期记忆 | Redis + LangGraph Checkpointer | 对话上下文、状态快照、断点续传 | JSON Blob (Checkpoints) |
| 长期记忆 | MySQL + SQLAlchemy ORM | 用户画像、永久事实、偏好设置 | Structured Rows (ORM Objects) |
| 大脑 | Qwen (Docker) + LangGraph | 推理、规划、工具调用 | Prompts & Tool Calls |
| 工具链 | Python Functions | 时间、数据库读写、API 调用 | Functions |
这才是真正的云原生、工程化的 AI Agent 架构!
更多推荐



所有评论(0)