本文详细介绍了一个基于LangGraph的GraphRAG多智能体系统,该系统结合语义搜索和Cypher查询,能在Neo4j知识图谱上执行多步骤推理。文章通过食物助手案例展示了如何构建完整的工作流程,包括查询分析、研究计划生成、研究执行和答案生成,解决了Naive RAG在结构化关系建模、多步推理和可解释性方面的局限性。系统可处理复杂场景,如根据饮食限制发现食谱、生成购物清单等,并提供了完整的实现代码和部署指南。


这个系统能处理三种关键领域的复杂场景:

  • • 根据饮食限制发现食谱
  • • 为特定食谱生成购物清单
  • • 在超市内映射商店产品的位置

通过结合语义搜索(semantic search)进行模糊匹配和精确的 Cypher 查询进行结构化数据检索,这个助手能在 Neo4j 知识图谱上执行多步骤推理,为复杂的查询提供语境相关的回答。

Github 仓库地址: https://github.com/PulsarPioneers/meal-planner-graphrag

1. 引言 — Naive RAG vs Graph RAG

为了这个项目,Naive RAG 方法不够用,原因如下:

  • 缺乏结构化关系建模:Naive RAG 从非结构化文本中检索信息,无法表示和推理实体之间的明确关系。这限制了它处理需要理解信息之间连接的查询的效果。
  • 有限的多步推理:它仅在单一层面处理查询,难以回答需要遍历多个数据点或结合结构化语境中各种来源信息的复杂问题。
  • 缺乏可解释性:由于检索仅基于文本相似度,很难追踪答案是如何构建的,也难以提供透明的推理路径。

因此,我们实现了 Graph RAG 系统来解决这些问题。基于图的框架具有以下优势:

  • 明确的实体关系表示:实体及其连接直接在知识图谱中建模,使系统能够理解和利用数据的结构。
  • 多跳和语境推理:系统可以遍历图谱,执行多步骤推理,结合相关节点的信息来回答复杂查询。
  • 基于模式的检索:通过利用图谱的模式(schema),可以精确地制定查询,检索结果与底层数据模型一致。
  • 提升的可解释性:每个答案的推理路径都可以通过图谱追踪,提供清晰的解释和更高的透明度。

这些功能使 Graph RAG 系统成为需要结构化数据、复杂关系和可解释性的应用的更强大且可靠的解决方案。


项目概览

Agentic Graph RAG 图示

GraphRAG 工作流程步骤:

    1. 查询分析与路由:用户的请求首先被分析和分类,系统会根据查询将其路由到适当的工作流程节点。根据查询内容,系统可能进入下一步(生成研究计划)、提示用户提供更多信息,或者如果请求超出范围则立即回复。
    1. 研究计划生成:系统会根据用户查询的复杂性,构建一个详细的、逐步的研究计划,列出满足请求所需的具体行动。
    1. 研究图谱执行:针对研究计划中的每一步,系统会调用一个专门的子图。通过 LLM 生成 Cypher 查询,针对 Neo4j 知识图谱进行检索。使用语义搜索和结构化图查询的混合方法,检索相关节点和关系,确保结果的广度和精确度。
    1. 答案生成:利用检索到的图谱数据,系统通过 LLM 综合生成全面的回答,根据需要整合多个来源的信息。

在创建图谱时,可以根据需求选择不同的方法。我为了加快速度,自己用样本数据构建了图谱,但也可以使用各种工具。
下面我们来看一种使用 LLM 和 LangChain 构建 Neo4j 图谱的技术。

使用 LLM 构建 Neo4j 图谱

LLM 模型的选择会显著影响输出的准确性和细微差别。

import os
from langchain_openai import ChatOpenAI

os.environ["OPENAI_API_KEY"] = "your-openai-api-key"
llm = ChatOpenAI(temperature=0, model_name="gpt-4o")

LLMGraphTransformer 通过 LLM 解析和分类实体及其关系,将文本文档转换为结构化的图文档。
我们可以根据需求灵活定义需要提取的节点和关系类型。

例如,我们可能需要以下节点:

  • • Recipe
  • • Foodproduct

以及以下关系:

  • • CONTAINS

可以通过以下方式指定:

from langchain_experimental.graph_transformers import LLMGraphTransformer

llm_transformer_filtered = LLMGraphTransformer(
    llm=llm,
    allowed_nodes=["Recipe", "Foodproduct"],
    allowed_relationships=["CONTAINS"],
)

现在,我们可以传入示例文本并检查结果:

from langchain_core.documents import Document

text = """
我最喜欢的烹饪创作是让人无法抗拒的 Vegan Chocolate Cake Recipe。这个美味的甜点以其浓郁的可可风味和柔软湿润的口感而闻名。它完全是素食、无乳制品的,并且由于使用了特殊的无麸质面粉混合物,也是无麸质的。
要制作这个蛋糕,食谱包含以下食品及其相应数量:250克无麸质面粉混合物、80克高品质可可粉、200克砂糖和10克发酵粉。为了丰富口感和确保完美发酵,食谱还包含5克香草精。在液体成分中,需要240毫升杏仁奶和60毫升植物油。
这个食谱可以制作一个巧克力蛋糕,被视为类型为甜点的 Foodproduct。
"""
documents = [Document(page_content=text)]
graph_documents_filtered = await llm_transformer_filtered.aconvert_to_graph_documents(
    documents
)
print(f"Nodes:{graph_documents_filtered[0].nodes}")
print(f"Relationships:{graph_documents_filtered[0].relationships}")

输出结果如下:

Nodes:[Node(id='Vegan Chocolate Cake Recipe', type='Recipe', properties={}), Node(id='Gluten-Free Flour Blend', type='Foodproduct', properties={}), Node(id='High-Quality Cocoa Powder', type='Foodproduct', properties={}), Node(id='Granulated Sugar', type='Foodproduct', properties={}), Node(id='Baking Powder', type='Foodproduct', properties={}), Node(id='Vanilla Extract', type='Foodproduct', properties={}), Node(id='Almond Milk', type='Foodproduct', properties={}), Node(id='Vegetable Oil', type='Foodproduct', properties={}), Node(id='Chocolate Cake', type='Foodproduct', properties={})]
Relationships:[Relationship(source=Node(id='Vegan Chocolate Cake Recipe', type='Recipe', properties={}), target=Node(id='Gluten-Free Flour Blend', type='Foodproduct', properties={}), type='CONTAINS', properties={}), Relationship(source=Node(id='Vegan Chocolate Cake Recipe', type='Recipe', properties={}), target=Node(id='High-Quality Cocoa Powder', type='Foodproduct', properties={}), type='CONTAINS', properties={}), Relationship(source=Node(id='Vegan Chocolate Cake Recipe', type='Recipe', properties={}), target=Node(id='Granulated Sugar', type='Foodproduct', properties={}), type='CONTAINS', properties={}), Relationship(source=Node(id='Vegan Chocolate Cake Recipe', type='Recipe', properties={}), target=Node(id='Baking Powder', type='Foodproduct', properties={}), type='CONTAINS', properties={}), Relationship(source=Node(id='Vegan Chocolate Cake Recipe', type='Recipe', properties={}), target=Node(id='Vanilla Extract', type='Foodproduct', properties={}), type='CONTAINS', properties={}), Relationship(source=Node(id='Vegan Chocolate Cake Recipe', type='Recipe', properties={}), target=Node(id='Almond Milk', type='Foodproduct', properties={}), type='CONTAINS', properties={}), Relationship(source=Node(id='Vegan Chocolate Cake Recipe', type='Recipe', properties={}), target=Node(id='Vegetable Oil', type='Foodproduct', properties={}), type='CONTAINS', properties={}), Relationship(source=Node(id='Vegan Chocolate Cake Recipe', type='Recipe', properties={}), target=Node(id='Chocolate Cake', type='Foodproduct', properties={}), type='CONTAINS', properties={})]

最后,生成的图文档可以存储到 Neo4j 图数据库中,通过 Neo4jGraphadd_graph_documents 方法初始化:

import os
from langchain_neo4j import Neo4jGraph

os.environ["NEO4J_URI"] = "bolt://localhost:7687"
os.environ["NEO4J_USERNAME"] = "neo4j"
os.environ["NEO4J_PASSWORD"] = "password"

graph = Neo4jGraph(refresh_schema=False)
graph.add_graph_documents(graph_documents_filtered)

然后,我们可以直接从 Neo4j 控制台查询图谱内容:

MATCH p=(r:Recipe)-[:CONTAINS]->(fp:Foodproduct) RETURN p LIMIT 25;

添加节点嵌入

为了更好地理解和消除用户输入的歧义,我们可以在需要时通过语义搜索增强图谱搜索。下面是一个使用 OpenAI 嵌入的示例。

例如,如果用户问:
“给我一个素食巧克力蛋糕食谱的所有原料”

我们需要找到图谱中与查询语义最接近的 Recipe 节点。为此,我们为每个 Recipe 节点存储一个基于其 ID 计算的嵌入。

以下是如何在 Neo4j 中生成和存储嵌入:

import openai
from neo4j import GraphDatabase

driver = GraphDatabase.driver("bolt://localhost:7687", auth=("neo4j", "password"))

recipe_id = "Vegan Chocolate Cake Recipe"
recipe_embedding = openai.embeddings.create(model="text-embedding-3-small", input=recipe_id).data[0].embedding

with driver.session() as session:
# 创建嵌入字段
  session.run(
"MATCH (r:Recipe {id: $recipe_id}) SET r.embedding = $embedding",
      recipe_id=recipe_id,
      embedding=recipe_embedding
  )
# 创建向量索引
  session.run(
"CREATE VECTOR INDEX recipe_index IF NOT EXISTS FOR (r:Recipe) ON (r.embedding) OPTIONS {indexConfig: {`vector.dimensions`: 1536, `vector.similarity_function`: 'cosine'}}"
  )

之后,我们就可以执行语义搜索:

query = "a chocolate cake recipe that is vegan"
query_embedding = openai.embeddings.create(
    model="text-embedding-3-small",
input=query
).data[0].embedding

with driver.session() as session:
    result = session.run(
"""
        CALL db.index.vector.queryNodes('recipe_index', 1, $embedding)
        YIELD node, score
        RETURN node.id AS name, score
        ORDER BY score DESC
        """,
        embedding=query_embedding
    )
for record in result:
print(record["name"], "=>", record["score"])

输出:

Vegan Chocolate Cake Recipe => 0.9284169673919678

这只是一个简要概述,想了解更多技术细节,请查看 LangChain 文档,或者探索其他工具,如官方的 Neo4j LLM Knowledge Graph Builder。

正如我所说,我通过迭代引入样本数据创建了图谱。你可以在 Github 仓库中找到我使用的图谱数据转储!

这份完整版的大模型 AI 学习资料已经上传CSDN,朋友们如果需要可以微信扫描下方CSDN官方认证二维码免费领取【保证100%免费

在这里插入图片描述

设计工作流程

实现系统包括两个图谱:

  • 研究子图:负责生成多个 Cypher 查询,用于从 Neo4j 知识图谱中检索相关节点和关系。
  • 主图:包含主要工作流程,包括分析用户查询、生成完成任务所需的步骤,以及生成最终回答。

主图结构

LangGraph 图谱预览
LangGraph 的核心概念之一是状态(state)。每次图谱执行都会创建一个状态,在图谱节点执行时在节点之间传递,每个节点在执行后会用其返回值更新这个内部状态。

让我们从构建图谱状态开始。为此,我们定义了两个类:

Router:包含用户查询的分类结果,分为“more-info”、“valid”或“general”。

from typing importLiteral
from pydantic import BaseModel

classRouter(BaseModel):
"""Classify user query."""
    logic: str
type: Literal["more-info", "valid", "general"]

定义的图谱状态包括:

InputState:包含用户和智能体之间交换的消息列表。

from dataclasses import dataclass
from typing import Annotated
from langchain_core.messages import AnyMessage
from langgraph.graph import add_messages

@dataclass(kw_only=True)
classInputState:
"""
    表示包含消息列表的输入状态。

    属性:
        messages (list[AnyMessage]):与状态相关联的消息列表,通过 add_messages 函数处理。
    """
    messages: Annotated[list[AnyMessage], add_messages]

AgentState:包含 Router 对用户查询的分类、研究计划中要执行的步骤列表,以及智能体可以参考的检索到的图谱知识列表。

from dataclasses import dataclass, field
from typing import Annotated
from utils.utils import update_knowledge
from core.state_graph.states.main_graph.input_state import InputState
from core.state_graph.states.main_graph.router import Router
from core.state_graph.states.step import Step

@dataclass(kw_only=True)
classAgentState(InputState):
"""
    表示主状态图中智能体的状态。

    属性:
        router (Router):智能体的路由逻辑。
        steps (list[Step]):智能体执行的步骤序列。
        knowledge (list[dict]):智能体累积的知识,通过 update_knowledge 函数更新。
    """
    router: Router = field(default_factory=lambda: Router(type="general", logic=""))
    steps: list[Step] = field(default_factory=list)
    knowledge: Annotated[list[dict], update_knowledge] = field(default_factory=list)

步骤 1:分析和路由查询

analyze_and_route_query 函数返回并更新状态 AgentStaterouter 变量。route_query 函数根据之前的查询分类决定下一步。

具体来说,这一步会用一个 Router 对象更新状态,该对象的 type 变量包含以下值之一:“more-info”、“valid”或“general”。根据这些信息,工作流程将被路由到相应的节点(“create_research_plan”、“ask_for_more_info”或“respond_to_general_query”之一)。

asyncdefanalyze_and_route_query(state: AgentState, *, config: RunnableConfig) -> dict[str, Router]:
"""
    分析当前智能体状态并确定下一步的路由逻辑。

    参数:
        state (AgentState):智能体的当前状态,包括消息和上下文。
        config (RunnableConfig):运行配置。

    返回:
        dict[str, Router]:包含更新后的路由对象的字典。
    """
    model = init_chat_model(
        name="analyze_and_route_query", **app_config["inference_model_params"]
    )
    messages = [{"role": "system", "content": ROUTER_SYSTEM_PROMPT}] + state.messages
print("---ANALYZE AND ROUTE QUERY---")
print(f"MESSAGES: {state.messages}")
    response = cast(
        Router, await model.with_structured_output(Router).ainvoke(messages)
    )
return {"router": response}

defroute_query(state: AgentState) -> Literal["create_research_plan", "ask_for_more_info", "respond_to_general_query"]:
"""
    根据当前状态的路由类型确定智能体的下一步行动。

    参数:
        state (AgentState):智能体的当前状态,包括路由类型。

    返回:
        Literal["create_research_plan", "ask_for_more_info", "respond_to_general_query"]:
            状态图中要执行的下一个节点/行动。

    抛出:
        ValueError:如果路由类型未知。
    """
    _type = state.router.type
if _type == "valid":
return"create_research_plan"
elif _type == "more-info":
return"ask_for_more_info"
elif _type == "general":
return"respond_to_general_query"
else:
raise ValueError(f"Unknown router type {_type}")

对问题“推荐一些甜的食谱!”的输出示例:

{
"logic":"虽然提供了‘甜’的口味信息,但缺少其他强制性约束(饮食要求、用餐时间、食谱复杂性、餐点类型、烹饪时间和热量含量)。因此,需要更多信息才能推荐食谱。",
"type":"more-info"
}

请求被分类为“more-info”,因为它不包含提示中插入的所有强制性约束。

步骤 1.1:超出范围/需要更多信息

我们定义了 ask_for_more_inforespond_to_general_query 函数,它们通过调用 LLM 直接为用户生成回答:第一个函数在路由器确定需要更多用户信息时执行,第二个函数则为与主题无关的一般查询生成回答。在这种情况下,需要将生成的回答连接到消息列表中,更新状态中的 messages 变量。

asyncdefask_for_more_info(state: AgentState, *, config: RunnableConfig) -> dict[str, list[BaseMessage]]:
"""
    根据当前路由逻辑向用户请求更多信息。

    参数:
        state (AgentState):智能体的当前状态,包括路由逻辑和消息。
        config (RunnableConfig):运行配置。

    返回:
        dict[str, list[BaseMessage]]:包含请求更多信息的新消息的字典。
    """
    model = init_chat_model(
        name="ask_for_more_info", **app_config["inference_model_params"]
    )
    system_prompt = MORE_INFO_SYSTEM_PROMPT.format(logic=state.router.logic)
    messages = [{"role": "system", "content": system_prompt}] + state.messages
    response = await model.ainvoke(messages)
return {"messages": [response]}

asyncdefrespond_to_general_query(state: AgentState, *, config: RunnableConfig) -> dict[str, list[BaseMessage]]:
"""
    根据智能体的当前状态和路由逻辑,为一般用户查询生成回答。

    参数:
        state (AgentState):智能体的当前状态,包括路由逻辑和消息。
        config (RunnableConfig):运行配置。

    返回:
        dict[str, list[BaseMessage]]:包含生成的回答消息的字典。
    """
    model = init_chat_model(
        name="respond_to_general_query", **app_config["inference_model_params"]
    )
    system_prompt = GENERAL_SYSTEM_PROMPT.format(logic=state.router.logic)
print("---RESPONSE GENERATION---")
    messages = [{"role": "system", "content": system_prompt}] + state.messages
    response = await model.ainvoke(messages)
return {"messages": [response]}

对问题“慕尼黑的天气如何?”的输出示例:

{
"logic":"请求是关于慕尼黑当前天气的,与食谱、购物清单或超市产品位置用例无关。因此被分类为一般问题。",
"type":"general"
}
# ---RESPONSE GENERATION---
“我知道你想了解慕尼黑的天气,但我只能帮助处理食谱、食谱购物清单和超市中产品的位置。”

步骤 2:创建研究计划

如果查询分类返回“valid”,用户的请求与文档范围一致,工作流程将到达 create_research_plan 节点,该节点的函数会为与食物相关的查询创建一个逐步研究计划。

  • review_research_plan:检查并改进研究计划的质量和相关性。
  • reduce_research_plan:简化或压缩计划步骤,使其更高效。
  • create_research_plan:协调整个过程,生成计划、压缩计划、审查计划并返回最终步骤。
asyncdefreview_research_plan(plan: Plan) -> Plan:
"""
    审查研究计划以确保其质量和相关性。

    参数:
        plan (Plan):要审查的研究计划。

    返回:
        Plan:审查并可能修改后的研究计划。
    """
    formatted_plan = ""
for i, step inenumerate(plan["steps"]):
        formatted_plan += f"{i+1}. ({step['type']}): {step['question']}\n"

    model = init_chat_model(
        name="create_research_plan", **app_config["inference_model_params"]
    )
    system_prompt = REVIEW_RESEARCH_PLAN_SYSTEM_PROMPT.format(
        schema=neo4j_graph.get_structured_schema, plan=formatted_plan
    )

    reviewed_plan = cast(
        Plan, await model.with_structured_output(Plan).ainvoke(system_prompt)
    )
return reviewed_plan

asyncdefreduce_research_plan(plan: Plan) -> Plan:
"""
    通过简化或压缩步骤来减少研究计划。

    参数:
        plan (Plan):要减少的研究计划。

    返回:
        Plan:减少后的研究计划。
    """
    formatted_plan = ""
for i, step inenumerate(plan["steps"]):
        formatted_plan += f"{i+1}. ({step['type']}): {step['question']}\n"

    model = init_chat_model(
        name="reduce_research_plan", **app_config["inference_model_params"]
    )
    system_prompt = REDUCE_RESEARCH_PLAN_SYSTEM_PROMPT.format(
        schema=neo4j_graph.get_structured_schema, plan=formatted_plan
    )

    reduced_plan = cast(
        Plan, await model.with_structured_output(Plan).ainvoke(system_prompt)
    )
return reduced_plan

asyncdefcreate_research_plan(
    state: AgentState, *, config: RunnableConfig
) -> dict[str, list[str] | str]:
"""
    根据智能体的当前知识和消息创建、减少和审查研究计划。

    参数:
        state (AgentState):智能体的当前状态,包括知识和消息。
        config (RunnableConfig):运行配置。

    返回:
        dict[str, list[str] | str]:包含审查计划的最终步骤和空知识列表的字典。
    """
    formatted_knowledge = "\n".join([item["content"] for item in state.knowledge])
    model = init_chat_model(
        name="create_research_plan", **app_config["inference_model_params"]
    )
    system_prompt = RESEARCH_PLAN_SYSTEM_PROMPT.format(
        schema=neo4j_graph.get_structured_schema, context=formatted_knowledge
    )
    messages = [{"role": "system", "content": system_prompt}] + state.messages
print("---PLAN GENERATION---")

# 生成计划
    plan = cast(Plan, await model.with_structured_output(Plan).ainvoke(messages))
print("Plan")
for i, step inenumerate(plan["steps"]):
print(f"{i+1}. ({step['type']}): {step['question']}")

# 减少计划
    reduced_plan = cast(Plan, await reduce_research_plan(plan=plan))
print("Reduced Plan")
for i, step inenumerate(reduced_plan["steps"]):
print(f"{i+1}. ({step['type']}): {step['question']}")

# 审查计划
    reviewed_plan = cast(Plan, await review_research_plan(plan=reduced_plan))

print("Reviewed Plan")
for i, step inenumerate(reviewed_plan["steps"]):
print(f"{i+1}. ({step['type']}): {step['question']}")

return {"steps": reviewed_plan["steps"], "knowledge": []}

对问题“推荐一些食谱。我是素食者,不知道早餐吃什么。热量要低于1000卡路里。没有其他偏好。”的输出示例:

{
"steps":
[
{"type":"semantic-search","question":"通过在 Diet 节点的 name 属性中搜索‘Vegetarian’来查找适合素食的食谱。"},
{"type":"semantic-search","question":"通过在 MealMoment 节点的 name 属性中搜索‘Breakfast’来查找适合早餐的食谱。"},
{"type":"query-search","question":"检索既是素食又在早餐时段提供的食谱,方法是取步骤1和步骤2结果的交集。过滤这些食谱,确保其包含的原料总热量低于1000卡路里。使用 CONTAINS 关系计算 FoodProduct 节点的总热量。限制50个。"}
]
}

在这个例子中,用户的请求需要三个步骤来检索信息。

步骤 3:进行研究

这个函数从研究计划中取第一个步骤并用它进行研究。研究过程中,函数调用 researcher_graph 子图,返回所有新收集的知识,我们将在下一节探讨。最后,我们通过移除刚执行的步骤来更新状态中的 steps 变量。

asyncdefconduct_research(state: AgentState) -> dict[str, Any]:
"""
    使用研究图执行研究步骤并更新智能体的知识。

    参数:
        state (AgentState):智能体的当前状态,包括步骤和知识。

    返回:
        dict[str, Any]:包含更新后的知识和剩余步骤的字典。
    """
    response = await research_graph.ainvoke(
        {"step": state.steps[0], "knowledge": state.knowledge}
    )
    knowledge = response["knowledge"]
    step = state.steps[0]
print(
f"\n{len(knowledge)} pieces of knowledge retrieved in total for the step: {step}."
    )
return {"knowledge": knowledge, "steps": state.steps[1:]}

步骤 4:构建研究子图

研究图示

如上图所示,图谱包括:

  • • 查询生成和执行步骤,或
  • • 语义搜索步骤

与主图一样,我们继续定义状态 QueryState(研究图中 execute_query 节点的私有状态)和 ResearcherState(研究图的状态)。

@dataclass(kw_only=True)
classQueryState:
"""研究图中管理研究查询的状态类。"""
    query: str

classStep(TypedDict):
"""单个研究步骤"""
    question: str
type: Literal["semantic_search", "query_search"]

@dataclass(kw_only=True)
classResearcherState:
"""研究图的状态。"""
    step: Step
    queries: list[str] = field(default_factory=list)
    knowledge: Annotated[list[dict], update_knowledge] = field(default_factory=list)
步骤 4.1:语义搜索

这一步骤在 Neo4j 图数据库上执行基于向量的语义搜索,根据相似性而非精确匹配来查找相关节点。

它由两个函数组成:

  • semantic_search:使用 LLM 确定搜索参数并协调语义搜索的执行。
  • execute_semantic_search:使用 OpenAI 嵌入和 Neo4j 的向量索引执行实际的向量相似性搜索。
defexecute_semantic_search(node_label: str, attribute_name: str, query: str):
"""在 Neo4j 向量索引上执行语义搜索。

    此函数使用 OpenAI 嵌入执行基于向量的相似性搜索,查找与提供的查询语义相似的 Neo4j 图数据库中的节点。它将查询转换为嵌入向量,并在相应的向量索引中搜索最相似的节点。

    参数:
        node_label (str):要搜索的节点类型标签(例如,‘Recipe’,‘FoodProduct’)。
        attribute_name (str):要在节点中搜索的属性(例如,‘name’,‘description’)。
        query (str):查找语义相似内容的搜索查询。

    返回:
        list:包含匹配节点的属性字典列表,按相似性得分排序(从高到低)。
    """
    index_name = f"{node_label.lower()}_{attribute_name}_index"
    top_k = 1
    query_embedding = (
        openai.embeddings.create(model=app_config["embedding_model"], input=query)
        .data[0]
        .embedding
    )

    nodes = (
f"node.name as name, node.{attribute_name} as {attribute_name}"
if attribute_name != "name"
elsef"node.{{attribute_name}} as name"
    )
    response = neo4j_graph.query(
f"""
        CALL db.index.vector.queryNodes('{index_name}', {top_k}, {query_embedding})
        YIELD node, score
        RETURN {nodes}
        ORDER BY score DESC"""
    )
print(
f"Semantic Search Tool invoked with parameters: node_label: '{node_label}', attribute_name: '{attribute_name}', query: '{query}'"
    )
print(f"Semantic Search response: {response}")
return response

asyncdefsemantic_search(state: ResearcherState, *, config: RunnableConfig):
"""在研究图中执行语义搜索以查找相关节点。

    此函数分析研究问题以确定最佳搜索参数,并在 Neo4j 图数据库上执行语义搜索。它使用 LLM 确定应搜索的节点类型和属性,然后执行基于向量的相似性搜索,查找可以帮助回答问题的语义相关内容。

    参数:
        state (ResearcherState):当前研究者状态,包含研究步骤问题和累积的知识。
        config (RunnableConfig):运行配置。

    返回:
        dict[str, list]:包含语义搜索结果的“knowledge”键的字典,格式化为知识项。
    """
classResponse(TypedDict):
        node_label: str
        attribute_name: str
        query: str

    model = init_chat_model(
        name="semantic_search", **app_config["inference_model_params"]
    )

    vector_indexes = neo4j_graph.query("SHOW VECTOR INDEXES YIELD name RETURN name;")
print(f"vector_indexes: {vector_indexes}")

    system_prompt = SEMANTIC_SEARCH_SYSTEM_PROMPT.format(
        schema=neo4j_graph.get_structured_schema,
        vector_indexes=str(vector_indexes)
    )
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "human", "content": state.step["question"]},
    ]
    response = cast(
        Response, await model.with_structured_output(Response).ainvoke(messages)
    )
    sem_search_response = execute_semantic_search(
        node_label=response["node_label"],
        attribute_name=response["attribute_name"],
        query=response["query"],
    )

    search_names = [f"'{record['name']}'"for record in sem_search_response]
    joined_search_names = ", ".join(search_names)
    knowledge = {
"id": new_uuid(),
"content": f"在 {response['node_label']}.{response['attribute_name']} 上执行语义搜索,查找与‘{response['query']}’相似的值\n结果:{joined_search_names}",
    }

return {"knowledge": [knowledge]}

对生成步骤的输出示例:

[
{"type":"semantic_search","question":"通过在 Diet 节点的 name 属性中搜索‘Vegetarian’来查找适合素食的食谱。"},
{"type":"semantic_search","question":"通过在 MealMoment 节点的 name 属性中搜索‘Breakfast’来查找适合早餐的食谱。"}
]
# -- 新知识 --
Semantic Search Tool invoked with parameters: node_label: 'Diet', attribute_name: 'name', query: 'Vegetarian'
Semantic Search response: [{'name': 'Vegetarian'}]

Semantic Search Tool invoked with parameters: node_label: 'MealMoment', attribute_name: 'name', query: 'Breakfast'
Semantic Search response: [{'name': 'Breakfast'}]
步骤 4.2:生成查询

这一步骤根据研究计划中的问题(一个步骤)生成搜索查询。此函数使用 LLM 生成多样化的 Cypher 查询来帮助回答问题。它由三个函数组成:

  • generate_queries:主函数,生成初始查询并应用两种校正方法。
  • correct_query_by_llm:使用具有模式感知的语言模型校正 Cypher 查询。
  • correct_query_by_parser:使用基于解析器的查询校正器进行结构校正。
asyncdefcorrect_query_by_llm(query: str) -> str:
"""使用语言模型校正 Cypher 查询。

    此函数使用 LLM 根据 Neo4j 图谱模式审查和校正 Cypher 查询。它提供模式感知校正,确保查询格式正确并使用有效的关系和节点。

    参数:
        query (str):要校正的 Cypher 查询。

    返回:
        str:校正后的 Cypher 查询。
    """
    model = init_chat_model(
        name="correct_query_by_llm", **app_config["inference_model_params"]
    )
    system_prompt = FIX_QUERY_SYSTEM_PROMPT.format(
        schema=neo4j_graph.get_structured_schema
    )
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "human", "content": query},
    ]
    response = await model.ainvoke(messages)
return response.content

defcorrect_query_by_parser(query: str) -> str:
"""使用基于解析器的校正器校正 Cypher 查询。

    此函数使用 CypherQueryCorrector 基于图谱模式解析和校正 Cypher 查询。它从文本中提取 Cypher 查询并应用结构校正。

    参数:
        query (str):包含要校正的 Cypher 查询的文本。

    返回:
        str:校正后的 Cypher 查询。
    """
    corrector_schema = [
        Schema(el["start"], el["type"], el["end"])
for el in neo4j_graph.get_structured_schema.get("relationships", [])
    ]
    cypher_query_corrector = CypherQueryCorrector(corrector_schema)

    extracted_query = extract_cypher(text=query)
    corrected_query = cypher_query_corrector(extracted_query)
return corrected_query

asyncdefgenerate_queries(
    state: ResearcherState, *, config: RunnableConfig
) -> dict[str, list[str]]:
"""为研究步骤生成和校正 Cypher 查询。

    此函数根据研究问题和现有知识上下文生成多个 Cypher 查询。它使用 LLM 生成初始查询,然后应用基于 LLM 和基于解析器的校正,确保查询对 Neo4j 图数据库有效且格式正确。

    参数:
        state (ResearcherState):当前研究者状态,包含研究步骤问题和累积的知识。
        config (RunnableConfig):运行配置。

    返回:
        dict[str, list[str]]:包含校正后 Cypher 查询列表的“queries”键的字典。
    """

classResponse(TypedDict):
        queries: list[str]

print("---GENERATE QUERIES---")
    formatted_knowledge = "\n\n".join(
        [f"{i+1}. {item['content']}"for i, item inenumerate(state.knowledge)]
    )
    model = init_chat_model(
        name="generate_queries", **app_config["inference_model_params"]
    )
    system_prompt = GENERATE_QUERIES_SYSTEM_PROMPT.format(
        schema=neo4j_graph.get_schema, context=formatted_knowledge
    )
    messages = [
        {"role": "system", "content": system_prompt},
        {"role": "human", "content": state.step["question"]},
    ]
    response = cast(
        Response, await model.with_structured_output(Response).ainvoke(messages)
    )
    response["queries"] = [
await correct_query_by_llm(query=q) for q in response["queries"]
    ]
    response["queries"] = [
        correct_query_by_parser(query=q) for q in response["queries"]
    ]

print(f"Queries: {response['queries']}")
return {"queries": response["queries"]}

对问题(在语义搜索查询执行后)的输出示例:

“推荐一些食谱。我是素食者,不知道早餐吃什么。热量要低于1000卡路里。没有其他偏好。”

MATCH (r:Recipe)-[:FITS_DIET]->(:Diet {name: 'Vegetarian'}),
      (r)-[:SERVED_DURING]->(:MealMoment {name: 'Breakfast'}),
      (r)-[c:CONTAINS]->(fp:FoodProduct)
WITH r, SUM(c.grams * (fp.calories / 100.0)) AS total_calories
WHERE total_calories < 1000
RETURN r.name AS recipe_name, total_calories
LIMIT 5

执行后的输出:

# -- 新知识 --
╒════════════════════════════╤══════════════════╕
│recipe_name                 │total_calories
╞════════════════════════════╪══════════════════╡
│"Mascarpone Dessert"        │945.8000000000001
├────────────────────────────┼──────────────────┤
│"Buffalo Mozzarella Salad"  │668.88
├────────────────────────────┼──────────────────┤
│"Raisin and Almond Snack"   │374.69999999999993
├────────────────────────────┼──────────────────┤
│"Mozzarella and Basil Salad"│528.4
└────────────────────────────┴──────────────────┘
步骤 4.3:构建子图
defbuild_research_graph():
    builder = StateGraph(ResearcherState)
    builder.add_node(generate_queries)
    builder.add_node(execute_query)
    builder.add_node(semantic_search)

    builder.add_conditional_edges(
        START,
        route_step,
        {"generate_queries": "generate_queries", "semantic_search": "semantic_search"},
    )
    builder.add_conditional_edges(
"generate_queries",
        query_in_parallel,  # type: ignore
        path_map=["execute_query"],
    )
    builder.add_edge("execute_query", END)
    builder.add_edge("semantic_search", END)

return builder.compile()

research_graph = build_research_graph()

步骤 5:检查完成

使用条件边(conditional_edge),我们构建了一个循环,其结束条件由 check_finished 函数的返回值决定。此函数检查由 create_research_plan 节点创建的步骤列表中是否还有步骤需要处理。一旦所有步骤完成,流程将进入 respond 节点。

defcheck_finished(state: AgentState) -> Literal["respond", "conduct_research"]:
"""
    根据已执行的步骤确定智能体是应该回答还是继续研究。

    参数:
        state (AgentState):智能体的当前状态,包括已执行的步骤。

    返回:
        Literal["respond", "conduct_research"]:
            如果还有步骤,则为“conduct_research”,否则为“respond”。
    """
iflen(state.steps or []) > 0:
return"conduct_research"
else:
return"respond"

步骤 6:回答

根据进行的研究生成对用户查询的最终回答。此函数使用对话历史和研究者智能体检索的文档,制定全面的回答。

asyncdefrespond(
    state: AgentState, *, config: RunnableConfig
) -> dict[str, list[BaseMessage]]:
"""
    根据智能体的累积知识和消息为用户生成最终回答。

    参数:
        state (AgentState):智能体的当前状态,包括知识和消息。
        config (RunnableConfig):运行配置。

    返回:
        dict[str, list[BaseMessage]]:包含生成的回答消息的字典。
    """
print("--- RESPONSE GENERATION STEP ---")
    model = init_chat_model(name="respond", **app_config["inference_model_params"])
    formatted_knowledge = "\n\n".join([item["content"] for item in state.knowledge])
    prompt = RESPONSE_SYSTEM_PROMPT.format(context=formatted_knowledge)
    messages = [{"role": "system", "content": prompt}] + state.messages
    response = await model.ainvoke(messages)

return {"messages": [response]}

步骤 7:构建主图

defbuild_main_graph():
    builder = StateGraph(AgentState, input=InputState)
    builder.add_node(analyze_and_route_query)
    builder.add_node(ask_for_more_info)
    builder.add_node(respond_to_general_query)
    builder.add_node(create_research_plan)
    builder.add_node(conduct_research)
    builder.add_node("respond", respond)

    builder.add_edge("create_research_plan", "conduct_research")
    builder.add_edge(START, "analyze_and_route_query")
    builder.add_conditional_edges("analyze_and_route_query", route_query)
    builder.add_conditional_edges("conduct_research", check_finished)
    builder.add_edge("respond", END)

return builder.compile()

结果

我们可以通过以下问题测试其性能:

“给我‘pasta alla carbonara’食谱的购物清单。”

控制台日志
从控制台日志中可以看到,主图创建了以下审查计划:

    1. semantic_search:通过在 Recipe 节点的‘name’属性上进行语义搜索,查找名称类似于‘pasta alla carbonara’的 Recipe 节点。
    1. query_search:检索通过步骤1识别的 Recipe 节点,通过‘CONTAINS’和‘IS_INSTANCE_OF’关系连接的 StoreProduct 节点,并列出其详细信息,如名称、品牌、价格和数量,形成购物清单。限制50个。

执行第一步后,我们得知与‘pasta alla carbonara’对应的 Recipe 节点的准确名称是‘Classic Carbonara’。

app_service-1  | Semantic Search Tool invoked with parameters: node_label: 'Recipe', attribute_name: 'name', query: 'pasta alla carbonara'
app_service-1  | Semantic Search response: [{'name': 'Classic Carbonara'}]

然后执行第二步,使用以下 Cypher 查询:

MATCH (r:Recipe {name: 'Classic Carbonara'})-[:CONTAINS]->(fp:FoodProduct)<-[:IS_INSTANCE_OF]-(sp:StoreProduct)
RETURN sp.name, sp.brand, sp.price, sp.quantity, sp.quantity_unit
LIMIT 50

然后我们得到最终回答。

外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传

实时演示 — 使用 Chainlit 制作的 UI

通过检查图谱内容,我们看到完整的结果是正确的。


结论

Graph RAG:技术挑战与考虑

尽管性能有所提升,实施 Graph RAG 并非没有挑战:

  • 延迟:智能体交互的复杂性增加通常会导致响应时间更长。在速度和准确性之间找到平衡是一个关键挑战。
  • 评估与可观察性:随着 Agentic RAG 系统变得更加复杂,持续的评估和可观察性变得必要。

总之,Graph RAG 在 AI 领域标志着重大突破。通过将大语言模型的能力与自主推理和信息检索相结合,Graph RAG 引入了新的智能和灵活性标准。随着 AI 的持续发展,Graph RAG 将在各行各业中扮演重要角色,改变我们使用技术的方式。

大模型未来如何发展?普通人能从中受益吗?

在科技日新月异的今天,大模型已经展现出了令人瞩目的能力,从编写代码到医疗诊断,再到自动驾驶,它们的应用领域日益广泛。那么,未来大模型将如何发展?普通人又能从中获得哪些益处呢?

通用人工智能(AGI)的曙光:未来,我们可能会见证通用人工智能(AGI)的出现,这是一种能够像人类一样思考的超级模型。它们有可能帮助人类解决气候变化、癌症等全球性难题。这样的发展将极大地推动科技进步,改善人类生活。

个人专属大模型的崛起:想象一下,未来的某一天,每个人的手机里都可能拥有一个私人AI助手。这个助手了解你的喜好,记得你的日程,甚至能模仿你的语气写邮件、回微信。这样的个性化服务将使我们的生活变得更加便捷。

脑机接口与大模型的融合:脑机接口技术的发展,使得大模型与人类的思维直接连接成为可能。未来,你可能只需戴上头盔,心中想到写一篇工作总结”,大模型就能将文字直接投影到屏幕上,实现真正的心想事成。

大模型的多领域应用:大模型就像一个超级智能的多面手,在各个领域都展现出了巨大的潜力和价值。随着技术的不断发展,相信未来大模型还会给我们带来更多的惊喜。赶紧把这篇文章分享给身边的朋友,一起感受大模型的魅力吧!

那么,如何学习AI大模型?

在一线互联网企业工作十余年里,我指导过不少同行后辈,帮助他们得到了学习和成长。我意识到有很多经验和知识值得分享给大家,也可以通过我们的能力和经验解答大家在人工智能学习中的很多困惑。因此,我坚持整理和分享各种AI大模型资料,包括AI大模型入门学习思维导图、精品AI大模型学习书籍手册、视频教程、实战学习等录播视频。在这里插入图片描述

学习阶段包括:

1.大模型系统设计
从大模型系统设计入手,讲解大模型的主要方法。包括模型架构、训练过程、优化策略等,让读者对大模型有一个全面的认识。

在这里插入图片描述

2.大模型提示词工程
通过大模型提示词工程,从Prompts角度入手,更好发挥模型的作用。包括提示词的构造、优化、应用等,让读者学会如何更好地利用大模型。

在这里插入图片描述

3.大模型平台应用开发
借助阿里云PAI平台,构建电商领域虚拟试衣系统。从需求分析、方案设计、到具体实现,详细讲解如何利用大模型构建实际应用。

在这里插入图片描述

4.大模型知识库应用开发
以LangChain框架为例,构建物流行业咨询智能问答系统。包括知识库的构建、问答系统的设计、到实际应用,让读者了解如何利用大模型构建智能问答系统。
在这里插入图片描述

5.大模型微调开发
借助以大健康、新零售、新媒体领域,构建适合当前领域的大模型。包括微调的方法、技巧、到实际应用,让读者学会如何针对特定领域进行大模型的微调。
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述

6.SD多模态大模型
以SD多模态大模型为主,搭建文生图小程序案例。从模型选择、到小程序的设计、到实际应用,让读者了解如何利用大模型构建多模态应用。
在这里插入图片描述

7.大模型平台应用与开发
通过星火大模型、文心大模型等成熟大模型,构建大模型行业应用。包括行业需求分析、方案设计、到实际应用,让读者了解如何利用大模型构建行业应用。

在这里插入图片描述
在这里插入图片描述

学成之后的收获👈

全栈工程实现能力:通过学习,你将掌握从前端到后端,从产品经理到设计,再到数据分析等一系列技能,实现全方位的技术提升。

解决实际项目需求:在大数据时代,企业和机构面临海量数据处理的需求。掌握大模型应用开发技能,将使你能够更准确地分析数据,更有效地做出决策,更好地应对各种实际项目挑战。

AI应用开发实战技能:你将学习如何基于大模型和企业数据开发AI应用,包括理论掌握、GPU算力运用、硬件知识、LangChain开发框架应用,以及项目实战经验。此外,你还将学会如何进行Fine-tuning垂直训练大模型,包括数据准备、数据蒸馏和大模型部署等一站式技能。

提升编码能力:大模型应用开发需要掌握机器学习算法、深度学习框架等技术,这些技术的掌握将提升你的编码能力和分析能力,使你能够编写更高质量的代码。

学习资源📚

  1. AI大模型学习路线图:为你提供清晰的学习路径,助你系统地掌握AI大模型知识。
  2. 100套AI大模型商业化落地方案:学习如何将AI大模型技术应用于实际商业场景,实现技术的商业化价值。
  3. 100集大模型视频教程:通过视频教程,你将更直观地学习大模型的技术细节和应用方法。
  4. 200本大模型PDF书籍:丰富的书籍资源,供你深入阅读和研究,拓宽你的知识视野。
  5. LLM面试题合集:准备面试,了解大模型领域的常见问题,提升你的面试通过率。
  6. AI产品经理资源合集:为你提供AI产品经理的实用资源,帮助你更好地管理和推广AI产品。

👉获取方式: 😝有需要的小伙伴,可以保存图片到wx扫描二v码免费领取【保证100%免费

在这里插入图片描述

Logo

更多推荐