【大模型+知识图谱+工业智能体技术架构】~系列文章03:基于LangGraph的工业智能体开发实践!!!
文章摘要: 本文深入探讨了LangGraph框架在工业智能体开发中的应用。相比传统LangChain的线性执行模式,LangGraph通过图结构实现了更复杂的业务流程编排,支持分支判断、循环迭代和并行执行等关键能力。文章详细解析了LangGraph的核心概念,包括状态管理(State)、节点(Node)、边(Edge)和检查点(Checkpointer),并提供了工业级Agent的状态设计规范。特
基于LangGraph的工业智能体开发实践
🤖 本篇深入探讨LangGraph框架在工业智能体开发中的应用,带你构建灵活、强大的工作流编排系统。
📖 引言:从Chain到Graph的范式升级
LangChain作为大模型应用开发框架,通过Chain的概念实现了线性流程编排。然而,工业场景的智能决策往往需要复杂的分支判断、循环迭代、并行执行,这些能力超出了Chain的范畴。
💡 LangGraph的核心优势
LangGraph应运而生,它将Agent建模为有向图(Graph),通过节点(Node)和边(Edge)的定义,实现了更灵活的工作流编排。
传统Chain:
❌ 只能线性执行
❌ 缺乏循环和分支能力
❌ 状态管理有限
❌ 难以处理复杂场景
LangGraph:
✅ 支持复杂的图结构
✅ 强大的状态管理
✅ 灵活的分支和循环
✅ 适合工业复杂决策场景
🏗️ 一、LangGraph核心概念解析
1.1 🔄 从Chain到Graph
定义对比
| 特性 | LangChain Chain | LangGraph Graph |
|---|---|---|
| 执行模式 | 线性执行 | 图结构执行 |
| 分支能力 | ❌ 不支持 | ✅ 支持 |
| 循环能力 | ❌ 不支持 | ✅ 支持 |
| 状态管理 | 基础 | 强大 |
| 调试能力 | 中等 | 优秀 |
| 工业适用性 | ⭐⭐⭐ | ⭐⭐⭐⭐⭐ |
代码对比
LangChain Chain(线性):
# 传统Chain:线性执行
chain = (
prompt
| llm
| parser
)
result = chain.invoke({"query": "设备状态如何?"})
LangGraph Graph(图结构):
# LangGraph:图结构执行
graph = StateGraph(AgentState)
graph.add_node("understand", understand_node)
graph.add_node("retrieve", retrieve_node)
graph.add_node("reason", reason_node)
graph.add_edge("understand", "retrieve")
graph.add_conditional_edges(
"retrieve",
should_continue,
{"continue": "reason", "end": END}
)
1.2 🧩 核心组件详解
State(状态)
定义:State是Agent在执行过程中传递的数据结构,包含输入、中间结果、输出等信息。
规则:
✅ 必须使用TypedDict定义
✅ 使用Annotated标注reducer
✅ 支持嵌套结构
✅ 可选字段使用Optional
Node(节点)
定义:Node是图中的处理单元,执行具体的业务逻辑。
使用场景:
- 意图理解
- 数据检索
- 推理决策
- 工具调用
- 答案生成
Edge(边)
定义:Edge连接不同的节点,定义执行流程。
类型:
🔗 普通边:顺序执行(add_edge)
🔀 条件边:根据条件分支(add_conditional_edges)
🔄 循环边:实现循环执行
Conditional Edge(条件边)
功能:根据当前状态决定下一个执行的节点。
示例:
def should_retrieve(state):
if state["confidence"] < 0.7:
return "retrieve" # 置信度低,重新检索
else:
return END # 置信度高,结束
Checkpointer(检查点)
定义:Checkpointer负责保存Agent的历史状态,支持记忆和恢复。
类型:
MemorySaver:内存存储(开发测试)PostgresSaver:PostgreSQL存储(生产环境)
🎯 二、工业Agent的状态设计
2.1 📋 状态类型定义
from typing import TypedDict, List, Dict, Optional, Annotated
from langgraph.graph import add_messages
class IndustrialAgentState(TypedDict):
"""
工业智能体状态定义
该状态在Agent的各个节点之间传递,保存所有中间结果
"""
# 👤 用户输入
user_query: str # 用户原始查询
query_type: str # 查询类型(问答/诊断/调度)
# 🌐 中间处理结果
intent: Optional[Dict] # 意图识别结果
context: Optional[Dict] # 上下文信息(设备、参数等)
knowledge: Optional[Dict] # 知识图谱查询结果
sensor_data: Optional[Dict] # 传感器数据
historical_data: Optional[List] # 历史数据
# 🔧 工具执行
tool_calls: List[Dict] # 工具调用记录
tool_results: List[Dict] # 工具执行结果
# 🤔 推理过程
reasoning_steps: List[str] # 推理步骤记录
confidence: float # 置信度
# ✅ 最终输出
final_answer: Optional[str] # 最终答案
action_plan: Optional[List[Dict]] # 行动计划
visualizations: Optional[Dict] # 可视化数据
# 💬 对话历史(使用add_messages reducer)
messages: Annotated[List, add_messages]
# 📋 元信息
session_id: str # 会话ID
timestamp: str # 时间戳
2.2 🔄 状态更新规则
from langgraph.graph import StateGraph, END
# 消息累加器(保留最近N条)
MAX_MESSAGES = 20
def message_reducer(old_messages, new_messages):
"""
消息累加器:保留最近N条消息
参数:
old_messages: 历史消息列表
new_messages: 新消息列表
返回:
List: 合并后的消息列表(最多N条)
"""
combined = old_messages + new_messages
return combined[-MAX_MESSAGES:]
# 状态聚合器
class StateUpdater:
"""状态更新工具类"""
@staticmethod
def update_context(state: IndustrialAgentState, new_context: Dict) -> IndustrialAgentState:
"""
更新上下文信息
使用规则:
- 合并新旧上下文
- 新上下文覆盖旧值
"""
if state["context"] is None:
state["context"] = {}
state["context"].update(new_context)
return state
@staticmethod
def add_reasoning_step(state: IndustrialAgentState, step: str) -> IndustrialAgentState:
"""
添加推理步骤
使用规则:
- 追加到推理步骤列表
- 记录思考过程
"""
if state["reasoning_steps"] is None:
state["reasoning_steps"] = []
state["reasoning_steps"].append(step)
return state
@staticmethod
def calculate_confidence(state: IndustrialAgentState) -> float:
"""
计算综合置信度
计算规则:
- 工具成功率:40%权重
- 知识匹配度:30%权重
- 基础分:30%权重
"""
confidence = 0.0
if state["tool_results"]:
success_rate = sum(1 for r in state["tool_results"] if r["success"]) / len(state["tool_results"])
confidence += success_rate * 0.4
if state["knowledge"]:
confidence += 0.3 # 知识图谱有匹配
confidence += 0.3 # 基础分
return min(confidence, 1.0)
🔧 三、节点定义与实现
3.1 🧠 意图理解节点
from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.output_parsers import JsonOutputParser
# 初始化大模型
llm = ChatOpenAI(
model="doubao-seed-1-6-251015",
temperature=0.1,
max_tokens=1000
)
# 意图理解Prompt
INTENT_PROMPT = ChatPromptTemplate.from_messages([
("system", """你是一个工业场景意图识别专家。
请从用户查询中识别以下信息:
1. query_type: 查询类型(可选值):
- "qa": 知识问答
- "diagnosis": 故障诊断
- "status": 设备状态查询
- "schedule": 生产调度
- "analysis": 数据分析
2. entities: 提取的实体(设备ID、参数名称、故障代码等)
3. urgency: 紧急程度(low/medium/high)
输出JSON格式:
{{
"query_type": "...",
"entities": {{"device_id": "...", "parameter": "..."}},
"urgency": "...",
"requires_data": true/false
}}"""),
("user", "用户查询:{query}")
])
def intent_understanding_node(state: IndustrialAgentState) -> IndustrialAgentState:
"""
意图理解节点
功能:
- 使用LLM识别用户查询意图
- 提取实体信息
- 判断紧急程度
输入:
state.user_query: 用户查询文本
输出:
state.intent: 意图识别结果
state.query_type: 查询类型
state.context: 提取的实体
state.reasoning_steps: 推理步骤
"""
# 构建Prompt
parser = JsonOutputParser()
chain = INTENT_PROMPT | llm | parser
try:
intent_result = chain.invoke({"query": state["user_query"]})
# 更新状态
state["intent"] = intent_result
state["query_type"] = intent_result.get("query_type", "unknown")
# 提取实体到上下文
if intent_result.get("entities"):
if state["context"] is None:
state["context"] = {}
state["context"].update(intent_result["entities"])
# 记录推理步骤
state["reasoning_steps"].append(
f"识别查询类型:{state['query_type']},"
f"提取实体:{intent_result.get('entities', {})}"
)
print(f"[意图理解] {state['query_type']}")
except Exception as e:
state["intent"] = {"error": str(e)}
state["reasoning_steps"].append(f"意图理解失败:{e}")
return state
3.2 🔍 数据检索节点
from neo4j import GraphDatabase
import requests
class DataRetriever:
"""数据检索工具类"""
def __init__(self, neo4j_uri, neo4j_user, neo4j_password):
"""
初始化数据检索器
参数:
neo4j_uri: Neo4j数据库地址
neo4j_user: 用户名
neo4j_password: 密码
"""
self.neo4j_driver = GraphDatabase.driver(
neo4j_uri,
auth=(neo4j_user, neo4j_password)
)
def retrieve_knowledge(self, device_id: str, query_type: str) -> Dict:
"""
从知识图谱检索知识
参数:
device_id: 设备ID
query_type: 查询类型
返回:
Dict: 检索到的知识
"""
with self.neo4j_driver.session() as session:
if query_type == "diagnosis":
query = """
MATCH (d:Device {id: $device_id})-[:HAS_FAULT]->(f:Fault)
OPTIONAL MATCH (f)-[:CAUSED_BY]->(cause:Fault)
RETURN f.name as fault, f.code as code,
collect(DISTINCT cause.name) as possible_causes
"""
else:
query = """
MATCH (d:Device {id: $device_id})
RETURN d.name as name, d.status as status,
d.last_maintenance as last_maint
"""
result = session.run(query, device_id=device_id)
records = [record.data() for record in result]
return {"knowledge": records}
def retrieve_sensor_data(self, device_id: str, hours: int = 24) -> Dict:
"""
检索传感器数据
参数:
device_id: 设备ID
hours: 查询最近多少小时的数据
返回:
Dict: 传感器数据
"""
# 实际应用中调用IoT平台API
# url = f"http://iot-platform/api/devices/{device_id}/data"
# response = requests.get(url, params={"hours": hours})
# return response.json()
# 模拟数据
return {
"device_id": device_id,
"timestamp": "2024-01-15T10:00:00Z",
"vibration": 5.2,
"temperature": 75.3,
"pressure": 2.1
}
def data_retrieval_node(state: IndustrialAgentState) -> IndustrialAgentState:
"""
数据检索节点
功能:
- 根据意图检索知识图谱
- 获取实时传感器数据
- 更新状态
"""
retriever = DataRetriever(
neo4j_uri="bolt://localhost:7687",
neo4j_user="neo4j",
neo4j_password="password"
)
# 根据意图决定检索策略
if state["query_type"] in ["diagnosis", "status"]:
# 知识图谱检索
device_id = state["context"].get("device_id")
if device_id:
knowledge_result = retriever.retrieve_knowledge(device_id, state["query_type"])
state["knowledge"] = knowledge_result
state["reasoning_steps"].append(f"从知识图谱检索设备{device_id}的信息")
# 检索实时传感器数据
if state["query_type"] in ["diagnosis", "status", "analysis"]:
device_id = state["context"].get("device_id")
if device_id:
sensor_data = retriever.retrieve_sensor_data(device_id)
state["sensor_data"] = sensor_data
state["reasoning_steps"].append("检索实时传感器数据")
return state
🏗️ 四、构建完整Agent
4.1 📊 图结构定义
from langgraph.graph import StateGraph, END
def build_industrial_agent():
"""
构建工业智能体
功能:
- 创建状态图
- 添加处理节点
- 定义边和条件分支
- 编译Agent
返回:
编译后的Agent实例
"""
# 1. 创建状态图
workflow = StateGraph(IndustrialAgentState)
# 2. 添加节点
workflow.add_node("intent", intent_understanding_node)
workflow.add_node("retrieve", data_retrieval_node)
workflow.add_node("reason", reasoning_node)
workflow.add_node("tools", tool_execution_node)
# 3. 定义入口
workflow.set_entry_point("intent")
# 4. 添加条件分支
workflow.add_conditional_edges(
"intent",
should_retrieve,
{
"retrieve": "retrieve",
"reason": "reason"
}
)
# 5. 添加顺序边
workflow.add_edge("retrieve", "reason")
workflow.add_edge("reason", "tools")
# 6. 添加循环分支
workflow.add_conditional_edges(
"tools",
has_confident_answer,
{
"retrieve": "retrieve", # 重新检索
END: END # 结束
}
)
# 7. 编译Agent(添加记忆)
from langgraph.checkpoint.memory import MemorySaver
checkpointer = MemorySaver()
agent = workflow.compile(checkpointer=checkpointer)
return agent
4.2 🚀 Agent执行示例
# 初始化Agent
agent = build_industrial_agent()
# 执行查询
initial_state: IndustrialAgentState = {
"user_query": "3号产线电机振动异常,请诊断故障原因",
"query_type": "",
"intent": None,
"context": {},
"knowledge": None,
"sensor_data": None,
"tool_calls": [],
"tool_results": [],
"reasoning_steps": [],
"confidence": 0.0,
"final_answer": None,
"action_plan": None,
"visualizations": None,
"messages": [],
"session_id": "session-001",
"timestamp": "2024-01-15T10:00:00Z"
}
# 执行Agent
config = {"configurable": {"thread_id": "session-001"}}
result = agent.invoke(initial_state, config)
# 输出结果
print("=== 推理过程 ===")
for step in result["reasoning_steps"]:
print(f" - {step}")
print("\n=== 诊断结论 ===")
print(result["final_answer"])
print(f"\n置信度: {result['confidence']:.2f}")
📊 五、最佳实践
5.1 ✅ 容错设计
def safe_retrieve_node(state: IndustrialAgentState) -> IndustrialAgentState:
"""
带容错的数据检索节点
容错策略:
- 捕获异常不中断流程
- 降低置信度
- 记录错误信息
"""
try:
return data_retrieval_node(state)
except Exception as e:
# 记录错误但不中断流程
state["reasoning_steps"].append(f"检索失败,使用默认处理:{e}")
state["confidence"] *= 0.5 # 降低置信度
return state
5.2 ⏱️ 超时控制
import signal
from contextlib import contextmanager
@contextmanager
def timeout(seconds):
"""
超时上下文管理器
使用场景:
- LLM调用超时
- 外部API调用超时
- 数据库查询超时
参数:
seconds: 超时时间(秒)
"""
def timeout_handler(signum, frame):
raise TimeoutError(f"操作超时({seconds}秒)")
signal.signal(signal.SIGALRM, timeout_handler)
signal.alarm(seconds)
try:
yield
finally:
signal.alarm(0)
def llm_with_timeout(state: IndustrialAgentState) -> IndustrialAgentState:
"""带超时的LLM调用"""
try:
with timeout(30): # 30秒超时
return reasoning_node(state)
except TimeoutError:
state["final_answer"] = "推理超时,请重试"
state["confidence"] = 0.3
return state
5.3 🚀 性能优化
🔧 节点并行化:
- 使用concurrency参数
- 并行执行独立节点
💾 缓存机制:
- 缓存知识图谱查询结果
- 缓存LLM响应
📦 批处理:
- 批量处理数据
- 减少数据库往返
📝 六、总结
6.1 本篇回顾
本篇介绍了基于LangGraph的工业智能体开发:
🏗️ LangGraph核心概念
↓
🎯 状态设计
↓
🔧 节点实现
↓
📊 Agent构建
↓
✅ 最佳实践
6.2 技术要点
✅ 使用StateGraph定义Agent结构
✅ 实现条件分支和循环
✅ 集成Checkpointer支持记忆
✅ 添加容错和超时控制
✅ 优化性能和并发
🚀 下篇预告
《RAG技术在工业问答系统中的应用》
敬请期待!🎉
📚 参考资源
感谢阅读!如有问题欢迎在评论区交流讨论 💬
版权归作者所有,未经许可请勿抄袭,套用,商用(或其它具有利益性行为)。
⭐ 如果觉得有帮助,请点赞、收藏、分享!⭐
更多推荐




所有评论(0)