内容参考于:图灵AI大模型全栈

实现一个图RAG

输入文档,文档切分,切分后使用大模型提取知识图谱,生成后保存图数据到Neo4j数据库

输入问题给大模型,大模型生成CQL语法,然后去查询Neo4j的数据,然后输出答案

langchain_neo4j要0.9.0版本

Python -m pip install langchain-neo4j==0.9.0

如下图大模型通过分析文档,转成的知识图谱,通过提示词实现

代码,下方用到的 西游记.txt 里的内容就是西游记小说的第一章和第二章

使用提示词,让大模型分析文章,下方的代码是分析西游记小说,从小说中提取关系,并返回成一个json格式,然后通过json个组装成GraphDocument格式(Neo4j通过它插入数据)

下方代码调用逻辑

1. 初始化(脚本启动自动执行)

加载环境配置 → 初始化大模型 → 连接 Neo4j 数据库 → 写好提取规则 → 组装好图谱提取链

2. 构建知识图谱(调用 ingest_data 触发)

读入本地文本文件 → 把长文切成小块 → 逐块调用大模型提取节点和关系(通过提示词实现) → 批量写入 Neo4j 数据库

3. 问答查询(调用 rag_chain.invoke 触发)

接收用户问题 → 自动转成数据库查询语句 → 去 Neo4j 查结果 → 大模型根据查到的图谱数据生成回答 → 返回最终答案

# 导入操作系统交互模块,用来读取环境变量、操作文件路径等
import os
# 导入类型提示工具 List,用来标注「列表」类型的参数/返回值,让代码更易读,编辑器能做语法提示
from typing import List

# 导入 OpenAI 协议的大模型封装类,用来调用兼容 OpenAI 接口的大模型(这里用来调用通义千问)
from langchain_openai import ChatOpenAI
# 从 langchain_neo4j 导入两个核心工具:
# - Neo4jGraph:Neo4j 图数据库的连接与操作封装
# - GraphCypherQAChain:图数据库问答链,自动把自然语言问题转成 Cypher 查询语句再查库回答
from langchain_neo4j import Neo4jGraph, GraphCypherQAChain
# 导入 LangChain 标准文档对象,每个 Document 包含文本内容和元数据
from langchain_core.documents import Document
# 导入聊天提示词模板类,用来构建结构化的提示词,支持系统提示+用户提示的格式
from langchain_core.prompts import ChatPromptTemplate
# 导入 JSON 输出解析器,自动把大模型返回的文本转成 Python 字典/列表,方便程序处理
from langchain_core.output_parsers import JsonOutputParser
# 导入递归字符文本切分器,用来把长文本按规则切成小块,方便大模型分批处理
from langchain_text_splitters import RecursiveCharacterTextSplitter

# 导入 dotenv 库的 load_dotenv 函数,用来加载 .env 文件里的环境变量(比如 API Key、接口地址)
# 好处:敏感信息不写死在代码里,更安全,换环境也方便
from dotenv import load_dotenv

# 从 langchain 社区工具里导入知识图谱文档相关的三个类:
# - GraphDocument:图文档对象,对应一个文本块提取出的完整图谱片段(包含节点+关系+来源文档)
# - Node as LangChainNode:节点类,代表知识图谱里的实体(比如人物、地点、法宝),起别名是避免和其他类重名
# - Relationship as LangChainRelationship:关系类,代表两个节点之间的关联(比如师徒、拥有、位于)
from langchain_community.graphs.graph_document import (
    GraphDocument,
    Node as LangChainNode,
    Relationship as LangChainRelationship,
)

# 执行加载 .env 文件:把 .env 文件里的键值对读取到系统环境变量中
# 后面 os.getenv() 就能拿到这些配置了
load_dotenv()

# 常量:Neo4j 数据库的连接地址
# bolt 是 Neo4j 专用的二进制通信协议,效率更高;localhost 表示本地服务;7687 是 Bolt 协议默认端口
NEO4J_URI = "bolt://localhost:7687"
# 常量:Neo4j 登录用户名,默认初始用户名为 neo4j
NEO4J_USERNAME = "neo4j"
# 常量:Neo4j 登录密码,使用前需要替换成你自己本地数据库设置的密码
NEO4J_PASSWORD = "11111111"  # 替换为你的密码

# 初始化大语言模型实例 llm,后面所有调用大模型的地方都用这个对象
llm = ChatOpenAI(
    # 注释掉的备选模型:如果有 qwen3.7-plus 权限,可以取消这行注释、注释下面一行来切换模型
    # model="qwen3.7-plus",
    # 指定使用的模型名称,这里是通义千问的 plus 版本
    model="qwen-plus",
    # 从环境变量中读取 API 密钥,对应 .env 文件里的 DASHSCOPE_API_KEY
    api_key=os.getenv("DASHSCOPE_API_KEY"),
    # 从环境变量中读取接口基础地址,因为通义千问兼容 OpenAI 协议,需要指定代理地址
    base_url=os.getenv("DASHSCOPE_BASE_URL"),
    # 温度参数:控制大模型输出的随机性
    # 调低到 0.15 的原因:知识提取需要准确、稳定,不能让大模型自由发挥、编造内容
    # 温度越低,输出越确定、越保守;温度越高,创造性越强、越容易瞎编
    temperature=0.15, # 调低温度从而让大模型根据文档生成内容,不让它乱编乱造
)

# 创建 Neo4j 图数据库连接实例 graph
# 后续所有对图数据库的增删改查都通过这个对象来操作
graph = Neo4jGraph(
    url=NEO4J_URI,        # 数据库连接地址
    username=NEO4J_USERNAME,  # 登录用户名
    password=NEO4J_PASSWORD   # 登录密码
)

# ========== 知识图谱提取提示词模板(核心规则载体) ==========
# 整体定位:整个图谱提取环节的「规则说明书」,直接决定输出质量、格式合法性和入库可用性
# 核心目标:把大模型从「自由聊天的通用模型」约束成「标准化提取工具」
#           让大模型输入一段自然文本,稳定输出可直接解析、可直接入库的纯JSON数据
# 设计逻辑:按「定角色 → 给标准 → 立规则 → 强约束 → 给示例 → 重强调」的顺序层层约束
# 链路配合:本提示词 → llm大模型 → json_parser解析器 → extract_one_document组装 → Neo4j入库
#           提示词的约束强度,直接决定了整条自动化链路能不能稳定跑通
extraction_prompt = ChatPromptTemplate.from_messages([
    # ==================== system 系统消息(全局固定规则) ====================
    # 作用:给大模型设定永久生效的指令、标准、格式要求,所有请求都遵循同一套规则
    # 内部分为7个核心模块,每个模块的设计逻辑如下:
    # 模块1:角色与任务定位
    #   作用:划定领域范围(古典小说+西游记)和核心任务(提取实体+关系),避免输出跑偏
    #   背后逻辑:大模型是通用模型,不限制领域会用通用百科标准提取,明确领域后会自动适配原著场景
    #            比如能识别洞府、法宝、妖怪等原著特有实体,提取精准度更高
    # 模块2:实体类型建议列表
    #   作用:统一节点的 type 字段命名规范,全部采用英文标准类型
    #   背后逻辑:如果不做限定,大模型对同一类实体会随机输出「人物/角色/Person」等多种命名
    #            而Neo4j是按节点类型查询的,类型不统一会导致查不全,图谱直接废掉
    #            英文类型完全兼容Neo4j和LangChain原生规范,避免中文编码、空格问题
    #            加「不强制只用这些」是保留灵活性,遇到特殊实体可自行补充,不会漏掉信息
    # 模块3:关系类型建议列表
    #   作用:统一关系的命名和方向,避免同一种关系出现十几种不同表述
    #   背后逻辑:自然语言里同一种关系有无数种说法(拜师/师徒/是徒弟),不统一会导致关系碎片化
    #            最终图谱里同一种关系有多个type,节点之间连不起来,没法做关联查询
    #            英文大写+下划线是Neo4j标准命名风格,后续写Cypher查询语句更方便
    #            明确关系方向(比如徒弟→师父),避免正反颠倒导致逻辑混乱
    # 模块4:五条核心提取规则(质量控制的核心)
    #   规则1「不脑补」:严格限定信息只能来自当前文本,防止大模型把自身知道的西游记常识加进去
    #                   保证图谱里的每一条数据都能追溯到原文片段,避免虚假信息
    #   规则2「统一实体名」:解决实体对齐问题,同一个实体只用最通用的名称当id
    #                       避免孙悟空/美猴王/齐天大圣变成三个独立节点,导致关系断裂
    #   规则3「跨块一致」:适配文本分块提取的场景,保证不同chunk里的同一个实体命名统一
    #                     因为文本是切成小块独立提取的,大模型跨块没有记忆,必须反复强调一致性
    #                     命名统一后,Neo4j入库会自动合并同id节点,形成完整关联图谱
    #   规则4「纯JSON」:强制输出纯JSON,不能有任何解释、markdown、客套话
    #                   因为后面接了JsonOutputParser自动解析,多一个字都会解析失败,整条链路中断
    #   规则5「空内容返回空数组」:处理边界场景(比如纯环境描写、没有实体的chunk)
    #                             避免大模型返回自然语言描述,导致解析报错
    # 模块5:id字段强制要求
    #   作用:强制符合LangChain Node对象和Neo4j节点的结构规范
    #   背后逻辑:id是节点的唯一标识,漏了id程序会直接报错;别称放properties里是两全设计
    #            既保留了实体的别名信息,又不破坏id的唯一性,查询时可以精确匹配也可以模糊匹配
    # 模块6:输出格式示例
    #   作用:给大模型提供具象的模仿模板,大幅提升格式准确率
    #   背后逻辑:大模型对示例的遵循度远高于纯文字规则,只靠文字说格式很容易出错
    #            给一个完整、字段齐全的示例,大模型会严格照着模板套,格式错误率会大幅下降
    #            语法说明:双大括号{{}}是Python字符串模板的转义写法
    #            因为单大括号{}是模板变量占位符,要输出真正的大括号字符就必须写两个转义
    #            实际传给大模型的是正常的单大括号JSON格式
    # 模块7:末尾重点重申
    #   作用:把最核心的格式、命名要求再重复一遍,避免长提示词导致注意力稀释
    #   背后逻辑:提示词越长,中间的内容越容易被大模型忽略,末尾重复核心要求能显著提升遵循度
    ("system", """你是一个擅长从中文古典小说中提取知识图谱的专家。
请严格从以下文本中提取主要**实体**和**关系**,重点关注《西游记》相关内容。

实体类型建议(但不强制只用这些):
  Person(人、神、妖、仙)、Place(地点、山、洞府、天庭)、Item(法宝、兵器、宝贝)、Event(事件)、Group(组织、派系)

关系类型建议(常用):
  MASTER_OF, DISCIPLE_OF(师徒)、LOCATED_IN(位于)、OWNS(拥有)、USED_BY(使用)、ENEMY_OF(敌人)、BATTLE_WITH(战斗)、FROM(来自)、CREATED_BY(制造)、TRANSFORMED_INTO(变成)等

规则:
1. 只提取文本中明确出现或强烈暗示的信息,不要脑补。
2. 实体名称尽量使用原文最常见的叫法(例如:孙悟空 而非 美猴王,除非上下文只用了美猴王)。
3. 同一个实体在不同chunk中应尽量保持名称一致。
4. 输出**必须**是合法的 JSON,不要包含任何解释、注释、markdown。
5. 如果某段文本实在没有可提取内容,返回空数组。

重要:每个节点 **必须** 有 "id" 字段,且 "id" 是实体的主要名称(例如 "孙悟空"、"菩提祖师"、"斜月三星洞")。
如果有别名或中文名,可放在 properties 里的 "别称" 或 "中文名",但 "id" 必须是最常用的叫法。

输出格式(**严格**遵守,不要多一个字):
{{
  "nodes": [
    {{"id": "孙悟空", "type": "Person"}},
    {{"id": "菩提祖师", "type": "Person"}},
    {{"id": "斜月三星洞", "type": "Place"}}
  ],
  "relationships": [
    {{"source": "孙悟空", "target": "菩提祖师", "type": "DISCIPLE_OF"}},
    {{"source": "孙悟空", "target": "斜月三星洞", "type": "LEARNED_AT"}}
  ]
}}

- "id" 是必须的,且全局唯一(同一个实体不同 chunk 用相同 id)
- type 尽量用:Person, Place, Item, Group
- 关系 type 用英文大写 + 下划线,如 DISCIPLE_OF, LOCATED_IN, OWNS, BATTLE_WITH
只返回纯 JSON。
"""),
    # ==================== human 用户消息(动态输入槽) ====================
    # 作用:每次调用时,把当前待处理的文本块填入 {text} 占位符,传给大模型
    # 为什么分 system/human 两部分:这是大模型对话的标准格式
    #   system 是固定指令,human 是每次变化的输入内容
    #   分开写比混在一个字符串里,大模型的指令遵循度更高,效果更好
    ("human", "文本:\n{text}\n请提取。")
])

# 创建 JSON 输出解析器实例
# 和前面的提示词强绑定配合:提示词保证输出纯JSON,解析器负责把字符串转成Python可用的字典/列表
# 额外作用:自动校验JSON格式合法性,格式错误会直接报错,避免后续程序拿到脏数据崩溃
json_parser = JsonOutputParser()

# 用管道符 | 串联成完整的图谱提取链(LCEL 语法,数据从左到右流动)
# 完整执行流程:
# 1. 把传入的 text 填入提示词模板,生成完整的对话消息
# 2. 调用大模型,按提示词规则提取实体和关系,输出JSON字符串
# 3. 解析器把JSON字符串转成Python字典,供后续代码处理
# 整条链的稳定性,完全依赖前面提示词的约束强度
extract_chain = extraction_prompt | llm | json_parser


def extract_one_document(doc: Document) -> GraphDocument:
    """
    功能:处理单个文本块(Document),调用大模型提取出知识图谱,返回 GraphDocument 对象
    入参说明:
        doc : Document  单个文本块文档对象,包含 page_content(文本内容)和 metadata(元数据)
    返回值:GraphDocument 对象,包含提取到的节点列表、关系列表,以及原始来源文档
    预期数据类型:GraphDocument
    设计配合:严格依赖前面提示词输出的标准JSON格式,按固定字段解析节点和关系
    """

    # 调用提取链,传入当前文档的文本内容,得到大模型返回的解析后字典
    raw = extract_chain.invoke({"text": doc.page_content})

    # 校验返回结果是不是字典格式,不是的话说明提取失败,返回空的图谱文档
    # 防止大模型返回了奇怪的格式,导致后面代码报错
    if not isinstance(raw, dict):
        print("LLM 输出不是 dict,跳过该 chunk")
        return GraphDocument(nodes=[], relationships=[], source=doc)

    nodes = []  # 用来存储构建好的节点对象列表
    node_map = {}  # 字典:id → Node 对象,作用是去重,防止同一个实体被重复创建成多个节点
    # 设计配合:对应提示词里「id全局唯一」的规则,依靠id来去重,保证同一个实体只有一个节点

    # 遍历大模型返回的所有节点数据
    for n in raw.get("nodes", []):
        # 取出节点的 id,也就是实体名称
        nid = n.get("id")
        # 没有 id 的节点是无效的,直接跳过(对应提示词里「id必须有」的规则)
        if not nid:
            continue
        # 如果这个 id 已经存在于映射表里,说明重复了,跳过不重复创建
        if nid in node_map:
            continue
        # 取出节点类型,取不到就默认是 Entity(实体)
        node_type = n.get("type", "Entity")
        # 取出节点的附加属性,取不到就默认空字典
        props = n.get("properties", {})
        # 创建 LangChainNode 节点对象,结构和提示词要求的格式一一对应
        node = LangChainNode(id=nid, type=node_type, properties=props)
        # 存入映射表和结果列表
        node_map[nid] = node
        nodes.append(node)

    relationships = []  # 用来存储构建好的关系对象列表
    # 遍历大模型返回的所有关系数据
    for r in raw.get("relationships", []):
        # 取出关系的起点实体 id、终点实体 id、关系类型
        src_id = r.get("source")
        tgt_id = r.get("target")
        rel_type = r.get("type")
        # 三个字段缺一不可,缺了就跳过这条关系(对应提示词里的格式要求)
        if not (src_id and tgt_id and rel_type):
            continue
        # 如果起点或终点不在节点映射表里,说明节点不存在,这条关系是孤立无效的,跳过
        if src_id not in node_map or tgt_id not in node_map:
            continue  # 防止孤立关系

        # 取出关系的附加属性
        props = r.get("properties", {})
        # 创建 LangChainRelationship 关系对象,必须传入真实的节点对象,而不是字符串 id
        rel = LangChainRelationship(
            source=node_map[src_id],  # 起点节点对象
            target=node_map[tgt_id],  # 终点节点对象
            type=rel_type,            # 关系类型
            properties=props          # 关系属性
        )
        relationships.append(rel)

    # 组装成 GraphDocument 返回,同时带上原始来源文档,方便追溯
    return GraphDocument(
        nodes=nodes,
        relationships=relationships,
        source=doc
    )


def ingest_data(file_path: str):
    """
    功能:完整的数据入库流程:读取本地文本文件 → 切分成小块 → 逐块提取知识图谱 → 批量写入 Neo4j
    入参说明:
        file_path : str  本地文本文件的路径,比如 "西游记.txt"
    返回值:无(直接把数据写入数据库)
    """
    # 以只读模式打开文件,指定 utf-8 编码防止中文乱码,读完自动关闭文件
    with open(file_path, "r", encoding="utf-8") as f:
        text = f.read()

    # 创建递归字符文本切分器,把长文本切成小块
    # 为什么要切分:大模型能处理的 token 数有限,太长的文本一次处理不完,也容易提取不全
    text_splitter = RecursiveCharacterTextSplitter(
        chunk_size=1200,       # 每个文本块的最大字符数,1200 字左右适合提取知识图谱
        chunk_overlap=200,     # 相邻块之间重叠 200 字符,防止关键信息刚好在切割点被截断
        # 分隔符优先级:从左到右依次尝试,优先按大的语义单元切分
        # 先按双换行(段落)切,再按单换行(行)切,再按句号、逗号、空格切,最后硬切
        separators=["\n\n", "\n", "。", ",", " ", ""]
    )
    # 执行切分,得到字符串列表
    chunks = text_splitter.split_text(text)
    # 把每个字符串转成 LangChain 的 Document 对象,方便后续统一处理
    documents = [Document(page_content=c) for c in chunks]

    print(f"共切分为 {len(documents)} 个 chunk,开始提取图结构...")

    # 定义列表,存储所有提取成功的图文档
    graph_documents: List[GraphDocument] = []

    # 遍历每个文本块,逐个提取图谱
    # enumerate 的第二个参数 1 表示序号从 1 开始计数
    for i, doc in enumerate(documents, 1):
        # 每处理 10 个,或者处理到最后一个时,打印进度,让用户知道处理到哪了
        if i % 10 == 0 or i == len(documents):
            print(f"  处理中... {i}/{len(documents)}")
        # 调用提取函数,得到图文档
        gd = extract_one_document(doc)
        # 只有提取到了节点或者关系,才加入结果列表,空的就丢弃
        if gd.nodes or gd.relationships:
            graph_documents.append(gd)

    print(f"提取到 {len(graph_documents)} 个有效 GraphDocument")

    # 如果有有效数据,就批量写入 Neo4j 数据库
    if graph_documents:
        print("正在写入 Neo4j ...")
        # 调用 add_graph_documents 批量插入图谱数据
        graph.add_graph_documents(
            graph_documents,       # 要插入的图文档列表
            baseEntityLabel=True,  # 给所有节点都加上一个通用的 Entity 标签,方便全局查询
            include_source=False   # 不存储原始文本块(小说原文)节点,节省空间;如果需要追溯来源可以设为 True,上方的图中Document就是通过这里写True实现的
        )
        print("数据插入完成。")
    else:
        print("没有提取到任何节点/关系,跳过写入。")


def get_graph_rag_chain():
    """
    功能:构建并返回图数据库问答链(Graph RAG)
    工作原理:用户提问 → 大模型把自然语言转成 Cypher 查询语句 → 去 Neo4j 查数据 → 把查询结果和问题一起交给大模型 → 生成自然语言回答
    入参:无
    返回值:GraphCypherQAChain 链对象,调用 invoke 即可提问
    预期数据类型:GraphCypherQAChain
    """
    # 刷新图数据库的 schema(结构信息),让大模型知道数据库里有哪些节点类型、关系类型
    # 这样大模型才能生成正确的 Cypher 语句,这一步是图问答的前提
    graph.refresh_schema()
    # 从大模型创建图问答链
    chain = GraphCypherQAChain.from_llm(
        llm=llm,                    # 用来生成 Cypher 和最终回答的大模型
        graph=graph,                # 连接好的 Neo4j 图数据库
        verbose=True,               # 开启详细日志,会打印出生成的 Cypher 语句、查询结果,方便调试
        allow_dangerous_requests=True,  # 允许执行生成的 Cypher 语句
                                    # 注意:生产环境要谨慎,因为可能有注入风险;本地测试可以开启
        top_k=30,                   # 查询结果最多返回 30 条,防止结果太多撑爆大模型上下文
    )
    return chain


# 主程序入口:只有直接运行这个文件时才会执行下面的代码,被导入时不执行
if __name__ == "__main__":
    # ========== 第一步:构建知识图谱(只需执行一次,重复执行会重复插入数据) ==========
    # 读取西游记.txt,提取图谱并写入 Neo4j
    ingest_data("西游记.txt")

    # ========== 第二步:创建问答链,开始提问 ==========
    rag_chain = get_graph_rag_chain()
    # 定义要测试的问题列表
    questions = [
        "孙悟空的师父是谁?他在哪里学艺?",
        "花果山在哪里",
        "为什么孙悟空被压在五行山下?",
    ]
    # 遍历每个问题,依次提问并打印结果
    for q in questions:
        print(f"\n问题:{q}")
        # 调用问答链,传入查询问题
        response = rag_chain.invoke({"query": q})
        # 从返回结果中取出最终回答并打印
        print(f"回答:{response['result']}")

img

更多推荐