本文是「AI 应用开发进阶实战」系列的扩展篇。一个有记忆的 Agent 和没有记忆的 Agent,差距就像金鱼和人类。


一、Agent 为什么需要记忆?

没有记忆的 Agent:

User: 我叫张三,是一名后端工程师。
Agent: 好的张三,有什么可以帮你?

User: 帮我审查这段 Go 代码。
Agent: [审查完成]

User: 上次说的那个并发问题,你再帮我看看这段代码。
Agent: 什么并发问题?您能再说一下吗?  ← 忘光了!

有记忆的 Agent:

User: 我叫张三,是一名后端工程师。
Agent: [记住:用户=张三,角色=后端工程师,语言偏好=Go]

User: 帮我审查这段 Go 代码。
Agent: [审查] [记住:上次讨论了并发安全问题]

User: 上次说的那个并发问题,你再帮我看看这段代码。
Agent: 好的张三,结合我们上次讨论的 goroutine 泄漏问题,
      我来检查这段代码的并发安全性...  ← 有上下文!

二、三层记忆架构

借鉴认知科学,Agent 记忆分三层:

┌─────────────────────────────────────────────┐
│              工作记忆 (Working)               │
│  当前对话的上下文,窗口内可见,瞬时的          │
│  实现:messages array, context window        │
├─────────────────────────────────────────────┤
│              情节记忆 (Episodic)              │
│  过去的对话片段,可检索,中期的                │
│  实现:向量检索 + 摘要                        │
├─────────────────────────────────────────────┤
│              语义记忆 (Semantic)              │
│  持久化的事实、偏好、知识,长期的              │
│  实现:结构化存储 + 规则提取                   │
└─────────────────────────────────────────────┘

三、短期记忆:超越简单的消息列表

3.1 滑动窗口 + 摘要压缩

最基础的方案是保留最近 N 轮对话。但当对话超过上下文窗口时,需要压缩。

# memory/short_term.py
from typing import List, Dict
from openai import OpenAI
import tiktoken


class ShortTermMemory:
    """短期记忆:滑动窗口 + 自动摘要压缩"""
    
    def __init__(
        self,
        max_tokens: int = 8000,
        reserve_for_response: int = 2000,
        model: str = "gpt-4o-mini",
        api_key: str = None,
    ):
        self.max_tokens = max_tokens
        self.reserve = reserve_for_response
        self.messages: List[Dict] = []
        self.summary: str = ""
        self.client = OpenAI(api_key=api_key)
        self.tokenizer = tiktoken.encoding_for_model(model)
    
    def add_message(self, role: str, content: str):
        """添加消息,自动触发压缩"""
        self.messages.append({"role": role, "content": content})
        
        if self._token_count() > self.max_tokens:
            self._compress()
    
    def get_context(self) -> List[Dict]:
        """获取当前上下文"""
        ctx = []
        
        if self.summary:
            ctx.append({
                "role": "system",
                "content": f"[对话历史摘要]\n{self.summary}"
            })
        
        ctx.extend(self.messages)
        return ctx
    
    def _token_count(self) -> int:
        total = len(self.tokenizer.encode(self.summary)) if self.summary else 0
        for msg in self.messages:
            total += len(self.tokenizer.encode(msg["content"]))
        return total + self.reserve
    
    def _compress(self):
        """压缩旧消息为摘要"""
        # 取前 60% 的消息生成摘要
        split_idx = int(len(self.messages) * 0.6)
        old_messages = self.messages[:split_idx]
        self.messages = self.messages[split_idx:]
        
        # 用 LLM 生成摘要
        conversation_text = "\n".join([
            f"{m['role']}: {m['content'][:200]}"
            for m in old_messages
        ])
        
        response = self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{
                "role": "system",
                "content": "将以下对话压缩为 200 字以内的摘要,保留关键信息和决策。"
            }, {
                "role": "user",
                "content": conversation_text
            }],
            max_tokens=300,
        )
        
        new_summary = response.choices[0].message.content
        
        # 合并新旧摘要
        if self.summary:
            self.summary = self._merge_summaries(self.summary, new_summary)
        else:
            self.summary = new_summary
        
        print(f"[Memory] Compressed {len(old_messages)} messages → "
              f"{len(self.tokenizer.encode(self.summary))} tokens summary")
    
    def _merge_summaries(self, old: str, new: str) -> str:
        """合并两个摘要"""
        response = self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{
                "role": "system",
                "content": "将以下两段对话摘要合并为一段连贯的摘要(200字以内)。"
            }, {
                "role": "user",
                "content": f"摘要1: {old}\n\n摘要2: {new}"
            }],
            max_tokens=300,
        )
        return response.choices[0].message.content


# 测试
if __name__ == "__main__":
    mem = ShortTermMemory(max_tokens=2000)
    
    for i in range(30):
        mem.add_message("user", f"这是第{i}条很长的测试消息," * 5)
        print(f"消息 {i}: {len(mem.messages)} 条在窗口, "
              f"摘要: {len(mem.summary)} 字符")

四、情节记忆:可检索的历史对话

4.1 设计思路

对话结束 → 切分为片段 → 生成嵌入 → 存入向量库
                                    ↓
新对话来了 → 相关历史检索 → 注入上下文

4.2 完整实现

# memory/episodic.py
import json
import time
import uuid
from typing import List, Dict, Optional
from datetime import datetime
import chromadb
from openai import OpenAI


class Episode:
    """一个对话片段"""
    def __init__(
        self,
        session_id: str,
        content: str,
        summary: str,
        entities: List[str],
        timestamp: float = None,
    ):
        self.id = str(uuid.uuid4())[:8]
        self.session_id = session_id
        self.content = content
        self.summary = summary
        self.entities = entities
        self.timestamp = timestamp or time.time()
    
    def to_dict(self) -> dict:
        return {
            "id": self.id,
            "session_id": self.session_id,
            "summary": self.summary,
            "entities": ",".join(self.entities),
            "timestamp": self.timestamp,
            "date": datetime.fromtimestamp(self.timestamp).isoformat(),
        }


class EpisodicMemory:
    """情节记忆:向量检索 + 结构化过滤"""
    
    def __init__(
        self,
        api_key: str,
        persist_dir: str = "./episodic_memory",
        embedding_model: str = "text-embedding-3-small",
    ):
        self.client = OpenAI(api_key=api_key)
        self.embedding_model = embedding_model
        
        # ChromaDB
        self.chroma = chromadb.PersistentClient(path=persist_dir)
        self.collection = self.chroma.get_or_create_collection(
            name="episodes",
            metadata={"hnsw:space": "cosine"},
        )
    
    def store(
        self,
        session_id: str,
        messages: List[Dict],
    ) -> Episode:
        """存储一个对话片段"""
        # 提取对话文本
        conversation = "\n".join([
            f"{m['role']}: {m['content']}"
            for m in messages
        ])
        
        # 生成摘要和实体提取
        summary = self._summarize(conversation)
        entities = self._extract_entities(conversation)
        
        episode = Episode(
            session_id=session_id,
            content=conversation,
            summary=summary,
            entities=entities,
        )
        
        # 生成嵌入(用摘要而非全文,更精准)
        embedding = self._embed(summary)
        
        # 存入 ChromaDB
        self.collection.add(
            ids=[episode.id],
            embeddings=[embedding],
            metadatas=[episode.to_dict()],
            documents=[summary],
        )
        
        print(f"[Episodic] Stored episode {episode.id}: "
              f"{summary[:60]}... ({len(entities)} entities)")
        
        return episode
    
    def recall(
        self,
        query: str,
        top_k: int = 5,
        entity_filter: List[str] = None,
        time_range_days: int = None,
    ) -> List[Episode]:
        """检索相关记忆"""
        query_embedding = self._embed(query)
        
        # 构建过滤条件
        where = {}
        if time_range_days:
            cutoff = time.time() - time_range_days * 86400
            where["timestamp"] = {"$gte": cutoff}
        
        results = self.collection.query(
            query_embeddings=[query_embedding],
            n_results=top_k * 2,  # 多取一些,后面做重排序
            where=where if where else None,
            include=["metadatas", "documents", "distances"],
        )
        
        episodes = []
        for i in range(len(results["ids"][0])):
            meta = results["metadatas"][0][i]
            
            # 实体过滤
            if entity_filter:
                episode_entities = set(meta.get("entities", "").split(","))
                if not episode_entities & set(entity_filter):
                    continue  # 没有匹配实体,跳过
            
            episodes.append(Episode(
                session_id=meta["session_id"],
                content="",
                summary=meta["summary"],
                entities=meta.get("entities", "").split(","),
                timestamp=meta["timestamp"],
            ))
        
        # 按时间二次排序(更近的优先)
        episodes.sort(key=lambda e: e.timestamp, reverse=True)
        
        return episodes[:top_k]
    
    def _embed(self, text: str) -> List[float]:
        response = self.client.embeddings.create(
            model=self.embedding_model,
            input=text[:8000],
        )
        return response.data[0].embedding
    
    def _summarize(self, conversation: str) -> str:
        """生成对话摘要(100字以内)"""
        response = self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{
                "role": "system",
                "content": "用一句话(100字以内)总结这段对话的核心内容和结论。"
            }, {
                "role": "user",
                "content": conversation[:3000]
            }],
            max_tokens=200,
        )
        return response.choices[0].message.content
    
    def _extract_entities(self, text: str) -> List[str]:
        """提取关键实体(人名、技术、项目名等)"""
        response = self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{
                "role": "system",
                "content": "从对话中提取关键实体(人名、技术、产品、项目),"
                          "每行一个,最多10个。只输出实体名。"
            }, {
                "role": "user",
                "content": text[:3000]
            }],
            max_tokens=200,
        )
        entities = response.choices[0].message.content.strip().split("\n")
        return [e.strip("- *").strip() for e in entities if e.strip()]


# 使用示例
episodic = EpisodicMemory(api_key="sk-xxx")

# 存储一段对话
episodic.store("session_001", [
    {"role": "user", "content": "帮我优化这个 SQL 查询,太慢了"},
    {"role": "assistant", "content": "发现缺少索引,建议在 user_id 和 created_at 上建联合索引"},
    {"role": "user", "content": "好的,加索引后从 3s 降到了 50ms"},
])

# 后续对话中召回
memories = episodic.recall(
    query="数据库查询优化",
    top_k=3,
    time_range_days=30,  # 只召回30天内的
)
for mem in memories:
    print(f"[{mem.id}] {mem.summary}")

五、语义记忆:持久化的事实和偏好

5.1 自动提取用户画像

# memory/semantic.py
import json
import sqlite3
from typing import Dict, Any, Optional
from datetime import datetime


class SemanticMemory:
    """语义记忆:结构化的事实和偏好存储"""
    
    def __init__(self, db_path: str = "./semantic_memory.db"):
        self.conn = sqlite3.connect(db_path)
        self.conn.row_factory = sqlite3.Row
        self._init_tables()
    
    def _init_tables(self):
        self.conn.executescript("""
            CREATE TABLE IF NOT EXISTS facts (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                category TEXT NOT NULL,      -- 'preference', 'identity', 'knowledge'
                key TEXT NOT NULL,
                value TEXT NOT NULL,
                confidence REAL DEFAULT 1.0,
                source TEXT,                 -- 从哪条对话中提取的
                created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP,
                UNIQUE(category, key)
            );
            
            CREATE TABLE IF NOT EXISTS fact_history (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                fact_id INTEGER,
                old_value TEXT,
                new_value TEXT,
                changed_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
            );
            
            CREATE INDEX IF NOT EXISTS idx_facts_category ON facts(category);
        """)
        self.conn.commit()
    
    def set_fact(
        self,
        category: str,
        key: str,
        value: str,
        confidence: float = 1.0,
        source: str = None,
    ):
        """设置一个事实"""
        # 查旧值
        old = self.conn.execute(
            "SELECT id, value FROM facts WHERE category=? AND key=?",
            (category, key)
        ).fetchone()
        
        if old:
            # 只有置信度更高或值不同时才更新
            if old["value"] != value:
                self.conn.execute(
                    "UPDATE facts SET value=?, confidence=?, source=?, updated_at=? WHERE id=?",
                    (value, confidence, source, datetime.now(), old["id"])
                )
                # 记录变更历史
                self.conn.execute(
                    "INSERT INTO fact_history (fact_id, old_value, new_value) VALUES (?,?,?)",
                    (old["id"], old["value"], value)
                )
                print(f"[Semantic] Updated: {category}.{key} = {value}")
        else:
            self.conn.execute(
                "INSERT INTO facts (category, key, value, confidence, source) VALUES (?,?,?,?,?)",
                (category, key, value, confidence, source)
            )
            print(f"[Semantic] New: {category}.{key} = {value}")
        
        self.conn.commit()
    
    def get_fact(self, category: str, key: str) -> Optional[str]:
        """获取一个事实"""
        row = self.conn.execute(
            "SELECT value FROM facts WHERE category=? AND key=?",
            (category, key)
        ).fetchone()
        return row["value"] if row else None
    
    def get_all_facts(self, category: str = None) -> Dict[str, str]:
        """获取所有事实"""
        if category:
            rows = self.conn.execute(
                "SELECT key, value FROM facts WHERE category=? ORDER BY updated_at DESC",
                (category,)
            ).fetchall()
        else:
            rows = self.conn.execute(
                "SELECT category, key, value FROM facts ORDER BY category, key"
            ).fetchall()
        
        if category:
            return {r["key"]: r["value"] for r in rows}
        else:
            result = {}
            for r in rows:
                if r["category"] not in result:
                    result[r["category"]] = {}
                result[r["category"]][r["key"]] = r["value"]
            return result
    
    def to_context_string(self) -> str:
        """转换为可注入 LLM 的上下文字符串"""
        facts = self.get_all_facts()
        lines = ["[用户信息]"]
        
        for category, items in facts.items():
            if category == "identity":
                for k, v in items.items():
                    lines.append(f"- {k}: {v}")
        
        lines.append("\n[用户偏好]")
        for category, items in facts.items():
            if category == "preference":
                for k, v in items.items():
                    lines.append(f"- {k}: {v}")
        
        return "\n".join(lines)


# 自动从对话中提取事实
class FactExtractor:
    """从对话中自动提取可存储的事实"""
    
    def __init__(self, memory: SemanticMemory, api_key: str):
        self.memory = memory
        self.client = OpenAI(api_key=api_key)
    
    def extract_from_turn(self, user_message: str, assistant_response: str):
        """从一轮对话中提取事实"""
        response = self.client.chat.completions.create(
            model="gpt-4o-mini",
            messages=[{
                "role": "system",
                "content": """从用户消息中提取可长期存储的信息。输出严格 JSON:

{
  "identity": {"name": "用户姓名", "role": "职业", ...},
  "preferences": {"language": "编程语言偏好", "code_style": "代码风格偏好", ...},
  "knowledge": {"project_x": "关于项目X的信息", ...}
}

规则:
- identity: 用户是谁(姓名、职业、技能等)
- preferences: 用户的偏好和习惯
- knowledge: 用户分享的领域知识
- 不输出 null,空对象用 {}
- 只提取新信息,不要重复已知信息"""
            }, {
                "role": "user",
                "content": f"用户消息: {user_message}"
            }],
            temperature=0.1,
        )
        
        try:
            raw = response.choices[0].message.content
            if raw.startswith("```"):
                raw = raw.split("\n", 1)[1].rsplit("```", 1)[0]
            facts = json.loads(raw)
            
            for category in ["identity", "preferences", "knowledge"]:
                for key, value in facts.get(category, {}).items():
                    if value and value != "null":
                        self.memory.set_fact(category, key, value,
                                            source=user_message[:100])
        except json.JSONDecodeError:
            pass

六、记忆管理器:三合一

# memory/manager.py
class MemoryManager:
    """统一记忆管理器"""
    
    def __init__(self, api_key: str):
        self.short_term = ShortTermMemory(api_key=api_key)
        self.episodic = EpisodicMemory(api_key=api_key)
        self.semantic = SemanticMemory()
        self.extractor = FactExtractor(self.semantic, api_key)
    
    def build_context(self, user_message: str) -> List[Dict]:
        """构建完整的 LLM 上下文"""
        system_additions = []
        
        # 1. 语义记忆:用户画像
        profile = self.semantic.to_context_string()
        system_additions.append(profile)
        
        # 2. 情节记忆:相关历史
        episodes = self.episodic.recall(
            query=user_message,
            top_k=3,
            time_range_days=30,
        )
        if episodes:
            history = "\n".join([
                f"- [{datetime.fromtimestamp(e.timestamp).strftime('%m/%d')}] {e.summary}"
                for e in episodes
            ])
            system_additions.append(f"\n[相关历史对话]\n{history}")
        
        # 3. 注入到 system prompt
        if system_additions:
            system_msg = "\n\n".join(system_additions)
            self.short_term.messages.insert(0, {
                "role": "system",
                "content": system_msg
            })
        
        return self.short_term.get_context()
    
    def process_turn(self, user_msg: str, assistant_msg: str):
        """处理一轮对话"""
        # 更新短期记忆
        self.short_term.add_message("user", user_msg)
        self.short_term.add_message("assistant", assistant_msg)
        
        # 提取语义事实
        self.extractor.extract_from_turn(user_msg, assistant_msg)
    
    def end_session(self, session_id: str):
        """会话结束:存储情节记忆"""
        self.episodic.store(
            session_id=session_id,
            messages=self.short_term.messages,
        )
        print(f"[Memory] Session {session_id} archived to episodic memory")


# === 完整使用 ===
if __name__ == "__main__":
    manager = MemoryManager(api_key="sk-xxx")
    
    # 用户对话
    turns = [
        ("我叫张三,做了5年后端开发,主要用 Go 和 Python。个人偏好简洁的代码风格。",
         "好的张三,已记住你的技术栈和代码偏好。"),
        ("我们在做一个微服务项目,用 gRPC 做服务间通信。最近发现有些接口延迟很高。",
         "gRPC 延迟高可能有几个原因:连接池配置、序列化开销、网络抖动。能提供更多细节吗?"),
        ("就是那个订单服务的 CreateOrder 接口,有时候要 3 秒才返回。",
         "3秒对 gRPC 来说确实异常。建议先用 pprof + trace 定位瓶颈点。"),
    ]
    
    session_id = "session_2024_001"
    
    for user_msg, assistant_msg in turns:
        context = manager.build_context(user_msg)
        print(f"\n上下文长度: {len(context)} 条消息")
        manager.process_turn(user_msg, assistant_msg)
    
    # 结束会话
    manager.end_session(session_id)
    
    # 打印提取的用户画像
    print("\n=== 用户画像 ===")
    print(manager.semantic.to_context_string())

七、记忆系统的工程考量

7.1 存储成本

记忆层 存储 单次查询成本 1000 次/天
短期 内存 $0 $0
情节 ChromaDB ~$0.001(embedding) ~$1
语义 SQLite ~$0.001(提取+嵌入) ~$2

7.2 遗忘策略

时间衰减:
  30天前的记忆 → 权重 × 0.5
  90天前的记忆 → 权重 × 0.1
  180天前的记忆 → 归档或删除

重复强化:
  被检索 3 次以上的记忆 → 提升权重
  用户主动提及的记忆 → 标记为重要,不过期

冲突解决:
  语义记忆值冲突 → 保留最新 + 记录历史
  偏好改变 → 更新 fact,旧值进 history 表

7.3 隐私与安全

class PrivacyMemory(SemanticMemory):
    """带隐私控制的语义记忆"""
    
    SENSITIVE_KEYS = {"password", "token", "secret", "api_key", "phone", "email_raw"}
    
    def set_fact(self, category, key, value, **kwargs):
        # 过滤敏感信息
        if key.lower() in self.SENSITIVE_KEYS:
            print(f"[Privacy] Blocked sensitive key: {key}")
            return
        
        # 脱敏处理
        if key == "email" and "@" in value:
            parts = value.split("@")
            value = f"{parts[0][:2]}***@{parts[1]}"
        
        super().set_fact(category, key, value, **kwargs)

八、总结

三层记忆 = 短期(上下文) + 情节(检索) + 语义(事实)

关键设计:
  短期 → 滑动窗口 + 自动摘要压缩
  情节 → 向量检索 + 实体过滤 + 时间衰减
  语义 → 结构化存储 + 自动提取 + 冲突解决

工程要点:
  成本可控(向量检索用轻量 embedding)
  隐私保护(敏感字段过滤 + 脱敏)
  自动维护(提取 → 存储 → 过期 → 遗忘)

一个带记忆的 Agent 会让用户感觉"它真的记得我",这种体验的差距是 Prompt Engineering 无法弥补的。

Logo

小龙虾开发者社区是 CSDN 旗下专注 OpenClaw 生态的官方阵地,聚焦技能开发、插件实践与部署教程,为开发者提供可直接落地的方案、工具与交流平台,助力高效构建与落地 AI 应用

更多推荐