AI Agent 记忆系统设计:短期、长期与情节记忆的工程实现
·
本文是「AI 应用开发进阶实战」系列的扩展篇。一个有记忆的 Agent 和没有记忆的 Agent,差距就像金鱼和人类。
一、Agent 为什么需要记忆?
没有记忆的 Agent:
User: 我叫张三,是一名后端工程师。
Agent: 好的张三,有什么可以帮你?
User: 帮我审查这段 Go 代码。
Agent: [审查完成]
User: 上次说的那个并发问题,你再帮我看看这段代码。
Agent: 什么并发问题?您能再说一下吗? ← 忘光了!
有记忆的 Agent:
User: 我叫张三,是一名后端工程师。
Agent: [记住:用户=张三,角色=后端工程师]
User: 帮我审查这段 Go 代码。
Agent: [审查] [记住:语言偏好=Go,本次讨论了 goroutine 泄漏等并发安全问题]
User: 上次说的那个并发问题,你再帮我看看这段代码。
Agent: 好的张三,结合我们上次讨论的 goroutine 泄漏问题,
我来检查这段代码的并发安全性... ← 有上下文!
二、三层记忆架构
借鉴认知科学,Agent 记忆分三层:
┌─────────────────────────────────────────────┐
│ 工作记忆 (Working) │
│ 当前对话的上下文,窗口内可见,瞬时的 │
│ 实现:messages array, context window │
├─────────────────────────────────────────────┤
│ 情节记忆 (Episodic) │
│ 过去的对话片段,可检索,中期的 │
│ 实现:向量检索 + 摘要 │
├─────────────────────────────────────────────┤
│ 语义记忆 (Semantic) │
│ 持久化的事实、偏好、知识,长期的 │
│ 实现:结构化存储 + 规则提取 │
└─────────────────────────────────────────────┘
三、短期记忆:超越简单的消息列表
3.1 滑动窗口 + 摘要压缩
最基础的方案是保留最近 N 轮对话。但当对话超过上下文窗口时,需要压缩。
# memory/short_term.py
from typing import List, Dict
from openai import OpenAI
import tiktoken
class ShortTermMemory:
    """Short-term memory: a sliding window of messages plus a rolling summary.

    Messages are appended verbatim. Whenever the estimated token usage
    (summary + window + the reply reserve) exceeds ``max_tokens``, the oldest
    ~60% of the window is folded into ``summary`` through an LLM call.

    Args:
        max_tokens: Total token budget for the context.
        reserve_for_response: Tokens counted against the budget up front so
            the model still has room to answer.
        model: Model name used both to pick the tokenizer and to generate
            summaries.
        api_key: Passed straight through to ``OpenAI(api_key=...)``.
    """

    # Share of the window (oldest first) folded into the summary per pass.
    COMPRESS_RATIO = 0.6
    # Each archived message is clipped to this many characters in the prompt.
    SNIPPET_CHARS = 200
    # Tokenizer used when tiktoken does not know the requested model name.
    FALLBACK_ENCODING = "cl100k_base"

    def __init__(
        self,
        max_tokens: int = 8000,
        reserve_for_response: int = 2000,
        model: str = "gpt-4o-mini",
        api_key: str = None,
    ):
        self.max_tokens = max_tokens
        self.reserve = reserve_for_response
        self.model = model
        self.messages: List[Dict] = []
        self.summary: str = ""
        self.client = OpenAI(api_key=api_key)
        try:
            self.tokenizer = tiktoken.encoding_for_model(model)
        except KeyError:
            # Unknown / custom model name: an approximate count is better
            # than failing construction.
            self.tokenizer = tiktoken.get_encoding(self.FALLBACK_ENCODING)

    def add_message(self, role: str, content: str):
        """Append a message and compress the window if the budget is exceeded."""
        self.messages.append({"role": role, "content": content})
        if self._token_count() > self.max_tokens:
            self._compress()

    def get_context(self) -> List[Dict]:
        """Return the summary (as a system message, if any) followed by the window."""
        ctx = []
        if self.summary:
            ctx.append({
                "role": "system",
                "content": f"[对话历史摘要]\n{self.summary}"
            })
        ctx.extend(self.messages)
        return ctx

    def _count(self, text: str) -> int:
        """Count tokens in *text*, treating special-token markers as plain text."""
        # disallowed_special=() stops tiktoken from raising when user text
        # happens to contain something like "<|endoftext|>".
        return len(self.tokenizer.encode(text, disallowed_special=()))

    def _token_count(self) -> int:
        """Estimate the budget in use: summary + message contents + reply reserve."""
        total = self._count(self.summary) if self.summary else 0
        total += sum(self._count(msg["content"]) for msg in self.messages)
        return total + self.reserve

    def _chat(self, instruction: str, text: str) -> str:
        """Run one system+user completion and return its text ('' if none)."""
        response = self.client.chat.completions.create(
            model=self.model,
            messages=[{
                "role": "system",
                "content": instruction
            }, {
                "role": "user",
                "content": text
            }],
            max_tokens=300,
        )
        return response.choices[0].message.content or ""

    def _compress(self):
        """Fold the oldest part of the window into the rolling summary."""
        split_idx = int(len(self.messages) * self.COMPRESS_RATIO)
        if split_idx < 1:
            # A window of 0-1 messages has nothing "old" to archive; calling
            # the LLM on an empty transcript would only produce noise.
            return
        old_messages = self.messages[:split_idx]

        conversation_text = "\n".join(
            f"{m['role']}: {m['content'][:self.SNIPPET_CHARS]}"
            for m in old_messages
        )
        new_summary = self._chat(
            "将以下对话压缩为 200 字以内的摘要,保留关键信息和决策。",
            conversation_text,
        )
        if self.summary:
            merged = self._merge_summaries(self.summary, new_summary)
        else:
            merged = new_summary

        # Commit only after every LLM call succeeded, so a failed request
        # leaves the window intact instead of silently dropping messages.
        self.messages = self.messages[split_idx:]
        self.summary = merged
        print(f"[Memory] Compressed {len(old_messages)} messages → "
              f"{len(self.tokenizer.encode(self.summary))} tokens summary")

    def _merge_summaries(self, old: str, new: str) -> str:
        """Ask the LLM to merge the previous summary with a new one."""
        return self._chat(
            "将以下两段对话摘要合并为一段连贯的摘要(200字以内)。",
            f"摘要1: {old}\n\n摘要2: {new}",
        )
# Smoke test (performs real LLM calls once compression kicks in)
if __name__ == "__main__":
    # Keep the reply reserve below max_tokens: with the default 2000-token
    # reserve, max_tokens=2000 leaves no room for history at all and every
    # single message would trigger a compression pass.
    mem = ShortTermMemory(max_tokens=2000, reserve_for_response=500)
    for i in range(30):
        mem.add_message("user", f"这是第{i}条很长的测试消息," * 5)
        print(f"消息 {i}: {len(mem.messages)} 条在窗口, "
              f"摘要: {len(mem.summary)} 字符")
四、情节记忆:可检索的历史对话
4.1 设计思路
对话结束 → 切分为片段 → 生成嵌入 → 存入向量库
↓
新对话来了 → 相关历史检索 → 注入上下文
4.2 完整实现
# memory/episodic.py
import json
import time
import uuid
from typing import List, Dict, Optional
from datetime import datetime
import chromadb
from openai import OpenAI
class Episode:
    """One archived conversation fragment.

    Args:
        session_id: Identifier of the session the fragment came from.
        content: Full conversation text.
        summary: Short summary of the fragment.
        entities: Key entities mentioned in the fragment.
        timestamp: Creation time in seconds since the epoch; defaults to now
            when omitted.
    """

    def __init__(
        self,
        session_id: str,
        content: str,
        summary: str,
        entities: List[str],
        timestamp: Optional[float] = None,
    ):
        # Short random identifier (first 8 characters of a UUID4).
        self.id = str(uuid.uuid4())[:8]
        self.session_id = session_id
        self.content = content
        self.summary = summary
        self.entities = entities
        # Compare against None explicitly: 0.0 is a valid epoch timestamp and
        # must not be replaced by the current time.
        self.timestamp = time.time() if timestamp is None else timestamp

    def to_dict(self) -> dict:
        """Return flat metadata for the episode (the full content is not included).

        Entities are joined with commas; ``date`` is the ISO form of
        ``timestamp`` as produced by ``datetime.fromtimestamp``.
        """
        return {
            "id": self.id,
            "session_id": self.session_id,
            "summary": self.summary,
            "entities": ",".join(self.entities),
            "timestamp": self.timestamp,
            "date": datetime.fromtimestamp(self.timestamp).isoformat(),
        }
class EpisodicMemory:
    """Episodic memory: vector retrieval over past conversations plus metadata filters.

    Each stored episode is summarised, its summary is embedded, and the
    embedding is written to a persistent ChromaDB collection together with
    the episode metadata.

    Args:
        api_key: Passed to ``OpenAI(api_key=...)``.
        persist_dir: Directory handed to ``chromadb.PersistentClient``.
        embedding_model: Model name used for embeddings.
        chat_model: Model name used for summarisation and entity extraction.
    """

    # Upper bound on entities kept per episode (the prompt asks for at most 10).
    MAX_ENTITIES = 10

    def __init__(
        self,
        api_key: str,
        persist_dir: str = "./episodic_memory",
        embedding_model: str = "text-embedding-3-small",
        chat_model: str = "gpt-4o-mini",
    ):
        self.client = OpenAI(api_key=api_key)
        self.embedding_model = embedding_model
        self.chat_model = chat_model
        # ChromaDB collection using cosine distance.
        self.chroma = chromadb.PersistentClient(path=persist_dir)
        self.collection = self.chroma.get_or_create_collection(
            name="episodes",
            metadata={"hnsw:space": "cosine"},
        )

    def store(
        self,
        session_id: str,
        messages: List[Dict],
    ) -> "Episode":
        """Summarise *messages*, embed the summary and persist the episode.

        Returns:
            The ``Episode`` that was written to the collection.
        """
        conversation = "\n".join(
            f"{m['role']}: {m['content']}" for m in messages
        )
        summary = self._summarize(conversation)
        entities = self._extract_entities(conversation)
        episode = Episode(
            session_id=session_id,
            content=conversation,
            summary=summary,
            entities=entities,
        )
        # The summary (not the full text) is what gets embedded and stored
        # as the document.
        embedding = self._embed(summary)
        self.collection.add(
            ids=[episode.id],
            embeddings=[embedding],
            metadatas=[episode.to_dict()],
            documents=[summary],
        )
        print(f"[Episodic] Stored episode {episode.id}: "
              f"{summary[:60]}... ({len(entities)} entities)")
        return episode

    def recall(
        self,
        query: str,
        top_k: int = 5,
        entity_filter: Optional[List[str]] = None,
        time_range_days: Optional[int] = None,
    ) -> "List[Episode]":
        """Retrieve episodes related to *query*.

        Args:
            query: Text to embed and search with.
            top_k: Maximum number of episodes returned.
            entity_filter: When non-empty, keep only episodes sharing at
                least one entity with this list.
            time_range_days: When set (and non-zero), only episodes newer
                than this many days are searched.

        Returns:
            Up to ``top_k`` episodes, most recent first. ``content`` is empty
            because only metadata is stored in the collection.
        """
        query_embedding = self._embed(query)

        where = {}
        if time_range_days:
            cutoff = time.time() - time_range_days * 86400
            where["timestamp"] = {"$gte": cutoff}

        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=top_k * 2,  # over-fetch; filtered and re-sorted below
            where=where if where else None,
            include=["metadatas", "documents", "distances"],
        )

        wanted = set(entity_filter) if entity_filter else None
        episodes = []
        for episode_id, meta in zip(results["ids"][0], results["metadatas"][0]):
            entities = self._split_entities(meta.get("entities", ""))
            if wanted and not wanted.intersection(entities):
                continue  # no entity in common with the filter
            episode = Episode(
                session_id=meta["session_id"],
                content="",
                summary=meta["summary"],
                entities=entities,
                timestamp=meta["timestamp"],
            )
            # Episode() always mints a fresh random id; restore the stored
            # one so callers can refer back to the persisted record.
            episode.id = episode_id
            episodes.append(episode)

        # Secondary ordering by recency (newest first).
        episodes.sort(key=lambda e: e.timestamp, reverse=True)
        return episodes[:top_k]

    @staticmethod
    def _split_entities(joined: str) -> List[str]:
        """Split the comma-joined metadata field, dropping empty pieces."""
        # "".split(",") yields [""], which must not count as an entity.
        pieces = (part.strip() for part in (joined or "").split(","))
        return [part for part in pieces if part]

    @classmethod
    def _parse_entity_lines(cls, raw: str) -> List[str]:
        """Turn a one-entity-per-line LLM reply into a clean, capped list."""
        cleaned = (
            line.strip("- *").strip() for line in (raw or "").strip().split("\n")
        )
        # Filter AFTER removing bullet characters so a bare "-" or "*" line
        # does not become an empty entity.
        return [name for name in cleaned if name][:cls.MAX_ENTITIES]

    def _embed(self, text: str) -> List[float]:
        """Embed *text* (clipped to its first 8000 characters)."""
        response = self.client.embeddings.create(
            model=self.embedding_model,
            input=text[:8000],
        )
        return response.data[0].embedding

    def _summarize(self, conversation: str) -> str:
        """Ask the LLM for a one-sentence summary of the first 3000 characters."""
        response = self.client.chat.completions.create(
            model=self.chat_model,
            messages=[{
                "role": "system",
                "content": "用一句话(100字以内)总结这段对话的核心内容和结论。"
            }, {
                "role": "user",
                "content": conversation[:3000]
            }],
            max_tokens=200,
        )
        return response.choices[0].message.content or ""

    def _extract_entities(self, text: str) -> List[str]:
        """Ask the LLM for key entities (people, technologies, products, projects)."""
        response = self.client.chat.completions.create(
            model=self.chat_model,
            messages=[{
                "role": "system",
                "content": "从对话中提取关键实体(人名、技术、产品、项目),"
                           "每行一个,最多10个。只输出实体名。"
            }, {
                "role": "user",
                "content": text[:3000]
            }],
            max_tokens=200,
        )
        return self._parse_entity_lines(response.choices[0].message.content)
# Usage example. Guarded so that importing this module does not create a
# database directory or issue API calls as a side effect.
if __name__ == "__main__":
    episodic = EpisodicMemory(api_key="sk-xxx")  # placeholder key

    # Archive one conversation.
    episodic.store("session_001", [
        {"role": "user", "content": "帮我优化这个 SQL 查询,太慢了"},
        {"role": "assistant", "content": "发现缺少索引,建议在 user_id 和 created_at 上建联合索引"},
        {"role": "user", "content": "好的,加索引后从 3s 降到了 50ms"},
    ])

    # Recall it later from a related query.
    memories = episodic.recall(
        query="数据库查询优化",
        top_k=3,
        time_range_days=30,  # only episodes from the last 30 days
    )
    for mem in memories:
        print(f"[{mem.id}] {mem.summary}")
五、语义记忆:持久化的事实和偏好
5.1 自动提取用户画像
# memory/semantic.py
import json
import sqlite3
from typing import Dict, Any, Optional
from datetime import datetime
class SemanticMemory:
    """Semantic memory: structured facts and preferences stored in SQLite.

    Facts are unique per ``(category, key)``. Every value change is recorded
    in the ``fact_history`` table. The instance can be used as a context
    manager to make sure the connection is closed.
    """

    def __init__(self, db_path: str = "./semantic_memory.db"):
        self.conn = sqlite3.connect(db_path)
        self.conn.row_factory = sqlite3.Row
        self._init_tables()

    def __enter__(self):
        return self

    def __exit__(self, exc_type, exc, tb):
        self.close()

    def close(self) -> None:
        """Close the underlying SQLite connection."""
        self.conn.close()

    def _init_tables(self):
        """Create the ``facts`` / ``fact_history`` tables and index if missing."""
        self.conn.executescript("""
            CREATE TABLE IF NOT EXISTS facts (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            category TEXT NOT NULL, -- 'preference', 'identity', 'knowledge'
            key TEXT NOT NULL,
            value TEXT NOT NULL,
            confidence REAL DEFAULT 1.0,
            source TEXT, -- 从哪条对话中提取的
            created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
            UNIQUE(category, key)
            );
            CREATE TABLE IF NOT EXISTS fact_history (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            fact_id INTEGER,
            old_value TEXT,
            new_value TEXT,
            changed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
            CREATE INDEX IF NOT EXISTS idx_facts_category ON facts(category);
        """)
        self.conn.commit()

    def set_fact(
        self,
        category: str,
        key: str,
        value: str,
        confidence: float = 1.0,
        source: str = None,
    ):
        """Insert or update one fact.

        An existing fact is rewritten when the value differs (the latest
        value wins and the change is logged to ``fact_history``), or when the
        value is the same but the new confidence is higher (confidence and
        source are refreshed, no history row). Otherwise nothing changes.
        """
        old = self.conn.execute(
            "SELECT id, value, confidence FROM facts WHERE category=? AND key=?",
            (category, key)
        ).fetchone()

        # "with conn" commits on success and rolls back on error, so the fact
        # update and its history row are written atomically.
        with self.conn:
            if old is None:
                self.conn.execute(
                    "INSERT INTO facts (category, key, value, confidence, source) VALUES (?,?,?,?,?)",
                    (category, key, value, confidence, source)
                )
                print(f"[Semantic] New: {category}.{key} = {value}")
            elif old["value"] != value:
                # CURRENT_TIMESTAMP keeps updated_at in the same format as the
                # column defaults, so ORDER BY updated_at compares like with like.
                self.conn.execute(
                    "UPDATE facts SET value=?, confidence=?, source=?, "
                    "updated_at=CURRENT_TIMESTAMP WHERE id=?",
                    (value, confidence, source, old["id"])
                )
                self.conn.execute(
                    "INSERT INTO fact_history (fact_id, old_value, new_value) VALUES (?,?,?)",
                    (old["id"], old["value"], value)
                )
                print(f"[Semantic] Updated: {category}.{key} = {value}")
            elif confidence is not None and confidence > (old["confidence"] or 0.0):
                self.conn.execute(
                    "UPDATE facts SET confidence=?, source=?, "
                    "updated_at=CURRENT_TIMESTAMP WHERE id=?",
                    (confidence, source, old["id"])
                )

    def get_fact(self, category: str, key: str) -> Optional[str]:
        """Return the stored value for ``(category, key)`` or ``None``."""
        row = self.conn.execute(
            "SELECT value FROM facts WHERE category=? AND key=?",
            (category, key)
        ).fetchone()
        return row["value"] if row else None

    def get_all_facts(self, category: str = None) -> Dict[str, Any]:
        """Return stored facts.

        With *category*: ``{key: value}`` for that category, most recently
        updated first. Without: ``{category: {key: value}}`` for everything.
        """
        if category:
            rows = self.conn.execute(
                "SELECT key, value FROM facts WHERE category=? ORDER BY updated_at DESC",
                (category,)
            ).fetchall()
            return {r["key"]: r["value"] for r in rows}

        rows = self.conn.execute(
            "SELECT category, key, value FROM facts ORDER BY category, key"
        ).fetchall()
        result = {}
        for r in rows:
            result.setdefault(r["category"], {})[r["key"]] = r["value"]
        return result

    def to_context_string(self) -> str:
        """Render identity and preference facts as text for an LLM prompt."""
        facts = self.get_all_facts()
        lines = ["[用户信息]"]
        lines.extend(
            f"- {k}: {v}" for k, v in facts.get("identity", {}).items()
        )
        lines.append("\n[用户偏好]")
        # Accept both spellings: the schema comment says 'preference', while
        # the extraction prompt in this file names the section 'preferences'.
        for category in ("preference", "preferences"):
            lines.extend(
                f"- {k}: {v}" for k, v in facts.get(category, {}).items()
            )
        return "\n".join(lines)
# Automatic fact extraction from conversations
class FactExtractor:
    """Extract storable facts from a conversation turn via an LLM call.

    Args:
        memory: Store that receives the extracted facts through ``set_fact``.
        api_key: Passed to ``OpenAI(api_key=...)``.
    """

    # JSON section emitted by the LLM -> category the facts are stored under.
    # "preferences" is stored as "preference", the category name used by the
    # facts schema comment and by SemanticMemory.to_context_string().
    SECTION_TO_CATEGORY = {
        "identity": "identity",
        "preferences": "preference",
        "knowledge": "knowledge",
    }

    SYSTEM_PROMPT = """从用户消息中提取可长期存储的信息。输出严格 JSON:
{
"identity": {"name": "用户姓名", "role": "职业", ...},
"preferences": {"language": "编程语言偏好", "code_style": "代码风格偏好", ...},
"knowledge": {"project_x": "关于项目X的信息", ...}
}
规则:
- identity: 用户是谁(姓名、职业、技能等)
- preferences: 用户的偏好和习惯
- knowledge: 用户分享的领域知识
- 不输出 null,空对象用 {}
- 只提取新信息,不要重复已知信息"""

    def __init__(self, memory: "SemanticMemory", api_key: str):
        self.memory = memory
        self.client = OpenAI(api_key=api_key)

    def extract_from_turn(self, user_message: str, assistant_response: str):
        """Extract facts from one turn and store them in ``self.memory``.

        Only *user_message* is sent to the model; *assistant_response* is
        accepted for interface compatibility and is not used. Extraction is
        best-effort: an unparsable reply is reported and skipped.
        """
        response = self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{
                "role": "system",
                "content": self.SYSTEM_PROMPT
            }, {
                "role": "user",
                "content": f"用户消息: {user_message}"
            }],
            temperature=0.1,
        )
        facts = self._parse_facts(response.choices[0].message.content)
        if facts is None:
            print("[FactExtractor] Skipped turn: model reply was not a JSON object")
            return
        for category, key, value in facts:
            self.memory.set_fact(category, key, value,
                                 source=user_message[:100])

    @classmethod
    def _parse_facts(cls, raw) -> Optional[list]:
        """Parse the model reply into ``(category, key, value)`` tuples.

        Returns ``None`` when the reply is missing or not a JSON object, and
        a (possibly empty) list otherwise. Empty/"null" values are dropped and
        non-string values are JSON-encoded so they can be bound as TEXT.
        """
        if not raw:
            return None
        text = raw.strip()
        if text.startswith("```"):
            # Drop the opening fence line (``` or ```json) and the closing
            # fence; partition() is safe even when there is no newline.
            _, _, text = text.partition("\n")
            text = text.rsplit("```", 1)[0]
        try:
            data = json.loads(text)
        except json.JSONDecodeError:
            return None
        if not isinstance(data, dict):
            return None

        facts = []
        for section, category in cls.SECTION_TO_CATEGORY.items():
            items = data.get(section)
            if not isinstance(items, dict):
                continue  # model put something other than an object here
            for key, value in items.items():
                if not value or value == "null":
                    continue
                if not isinstance(value, str):
                    value = json.dumps(value, ensure_ascii=False)
                facts.append((category, key, value))
        return facts
六、记忆管理器:三合一
# memory/manager.py
class MemoryManager:
    """Facade combining short-term, episodic and semantic memory.

    Args:
        api_key: Shared by the short-term memory, the episodic memory and the
            fact extractor.
    """

    def __init__(self, api_key: str):
        self.short_term = ShortTermMemory(api_key=api_key)
        self.episodic = EpisodicMemory(api_key=api_key)
        self.semantic = SemanticMemory()
        self.extractor = FactExtractor(self.semantic, api_key)

    def build_context(self, user_message: str) -> List[Dict]:
        """Assemble the LLM context for *user_message*.

        The result is one system message (user profile, plus related episodes
        from the last 30 days when any are recalled) followed by the
        short-term context. The short-term window itself is not modified, so
        calling this repeatedly does not pile up system messages, inflate the
        token count, or leak the injected text into archived episodes.
        """
        # 1. Semantic memory: user profile.
        system_additions = [self.semantic.to_context_string()]

        # 2. Episodic memory: related history.
        episodes = self.episodic.recall(
            query=user_message,
            top_k=3,
            time_range_days=30,
        )
        if episodes:
            history = "\n".join(
                f"- [{datetime.fromtimestamp(e.timestamp).strftime('%m/%d')}] {e.summary}"
                for e in episodes
            )
            system_additions.append(f"\n[相关历史对话]\n{history}")

        # 3. Prepend as a fresh system message on a new list.
        system_msg = {
            "role": "system",
            "content": "\n\n".join(system_additions)
        }
        return [system_msg, *self.short_term.get_context()]

    def process_turn(self, user_msg: str, assistant_msg: str):
        """Record one completed turn and extract semantic facts from it."""
        self.short_term.add_message("user", user_msg)
        self.short_term.add_message("assistant", assistant_msg)
        self.extractor.extract_from_turn(user_msg, assistant_msg)

    def end_session(self, session_id: str):
        """Archive the session into episodic memory."""
        # get_context() includes the rolling summary, so history that was
        # already compressed out of the window is archived too.
        transcript = self.short_term.get_context()
        if not transcript:
            print(f"[Memory] Session {session_id} is empty; nothing archived")
            return
        self.episodic.store(
            session_id=session_id,
            messages=transcript,
        )
        print(f"[Memory] Session {session_id} archived to episodic memory")
# === End-to-end walkthrough ===
if __name__ == "__main__":
    session_id = "session_2024_001"
    manager = MemoryManager(api_key="sk-xxx")

    # Scripted conversation: (user message, assistant reply) pairs.
    turns = [
        (
            "我叫张三,做了5年后端开发,主要用 Go 和 Python。个人偏好简洁的代码风格。",
            "好的张三,已记住你的技术栈和代码偏好。",
        ),
        (
            "我们在做一个微服务项目,用 gRPC 做服务间通信。最近发现有些接口延迟很高。",
            "gRPC 延迟高可能有几个原因:连接池配置、序列化开销、网络抖动。能提供更多细节吗?",
        ),
        (
            "就是那个订单服务的 CreateOrder 接口,有时候要 3 秒才返回。",
            "3秒对 gRPC 来说确实异常。建议先用 pprof + trace 定位瓶颈点。",
        ),
    ]

    for exchange in turns:
        user_msg, assistant_msg = exchange
        # Build the prompt context first, then record the finished turn.
        context = manager.build_context(user_msg)
        print(f"\n上下文长度: {len(context)} 条消息")
        manager.process_turn(user_msg, assistant_msg)

    # Close the session so it is archived.
    manager.end_session(session_id)

    # Show the profile extracted along the way.
    print("\n=== 用户画像 ===")
    profile_text = manager.semantic.to_context_string()
    print(profile_text)
七、记忆系统的工程考量
7.1 存储成本
| 记忆层 | 存储 | 单次查询成本 | 1000 次/天 |
|---|---|---|---|
| 短期 | 内存 | $0 | $0 |
| 情节 | ChromaDB | ~$0.001(embedding) | ~$1 |
| 语义 | SQLite | ~$0.002(LLM 提取) | ~$2 |
7.2 遗忘策略
时间衰减:
30天前的记忆 → 权重 × 0.5
90天前的记忆 → 权重 × 0.1
180天前的记忆 → 归档或删除
重复强化:
被检索 3 次以上的记忆 → 提升权重
用户主动提及的记忆 → 标记为重要,不过期
冲突解决:
语义记忆值冲突 → 保留最新 + 记录历史
偏好改变 → 更新 fact,旧值进 history 表
7.3 隐私与安全
class PrivacyMemory(SemanticMemory):
    """Semantic memory that refuses sensitive keys and masks e-mail addresses."""

    # Keys that are never stored (compared case-insensitively, exact match).
    SENSITIVE_KEYS = {"password", "token", "secret", "api_key", "phone", "email_raw"}

    def set_fact(self, category, key, value, confidence=1.0, source=None):
        """Store a fact unless its key is sensitive; mask ``email`` values.

        The signature mirrors ``SemanticMemory.set_fact`` so this class is a
        drop-in replacement, including for positional ``confidence``/``source``.
        """
        # Normalise once so the blocklist and the e-mail rule agree on case
        # ("Email" / "EMAIL" are masked just like "email").
        normalized = key.strip().lower()

        if normalized in self.SENSITIVE_KEYS:
            print(f"[Privacy] Blocked sensitive key: {key}")
            return

        # Mask the local part: keep its first two characters and the domain.
        # The isinstance check avoids a TypeError on non-string values.
        if normalized == "email" and isinstance(value, str) and "@" in value:
            local, _, domain = value.rpartition("@")
            value = f"{local[:2]}***@{domain}"

        super().set_fact(category, key, value, confidence=confidence, source=source)
八、总结
三层记忆 = 短期(上下文) + 情节(检索) + 语义(事实)
关键设计:
短期 → 滑动窗口 + 自动摘要压缩
情节 → 向量检索 + 实体过滤 + 时间衰减
语义 → 结构化存储 + 自动提取 + 冲突解决
工程要点:
成本可控(向量检索用轻量 embedding)
隐私保护(敏感字段过滤 + 脱敏)
自动维护(提取 → 存储 → 过期 → 遗忘)
一个带记忆的 Agent 会让用户感觉"它真的记得我",这种体验的差距是 Prompt Engineering 无法弥补的。
更多推荐

所有评论(0)