基于LangGraph的工业智能体开发实践

🤖 本篇深入探讨LangGraph框架在工业智能体开发中的应用,带你构建灵活、强大的工作流编排系统。


📖 引言:从Chain到Graph的范式升级

LangChain作为大模型应用开发框架,通过Chain的概念实现了线性流程编排。然而,工业场景的智能决策往往需要复杂的分支判断、循环迭代、并行执行,这些能力超出了Chain的范畴。

💡 LangGraph的核心优势

LangGraph应运而生,它将Agent建模为有向图(Graph),通过节点(Node)和边(Edge)的定义,实现了更灵活的工作流编排。

传统Chain:
❌ 只能线性执行
❌ 缺乏循环和分支能力
❌ 状态管理有限
❌ 难以处理复杂场景

LangGraph:
✅ 支持复杂的图结构
✅ 强大的状态管理
✅ 灵活的分支和循环
✅ 适合工业复杂决策场景

🏗️ 一、LangGraph核心概念解析

1.1 🔄 从Chain到Graph

定义对比
特性 LangChain Chain LangGraph Graph
执行模式 线性执行 图结构执行
分支能力 ❌ 不支持 ✅ 支持
循环能力 ❌ 不支持 ✅ 支持
状态管理 基础 强大
调试能力 中等 优秀
工业适用性 ⭐⭐⭐ ⭐⭐⭐⭐⭐
代码对比

LangChain Chain(线性)

# 传统Chain:线性执行
chain = (
    prompt
    | llm
    | parser
)
result = chain.invoke({"query": "设备状态如何?"})

LangGraph Graph(图结构)

# LangGraph:图结构执行
graph = StateGraph(AgentState)
graph.add_node("understand", understand_node)
graph.add_node("retrieve", retrieve_node)
graph.add_node("reason", reason_node)
graph.add_edge("understand", "retrieve")
graph.add_conditional_edges(
    "retrieve",
    should_continue,
    {"continue": "reason", "end": END}
)

1.2 🧩 核心组件详解

State(状态)

定义:State是Agent在执行过程中传递的数据结构,包含输入、中间结果、输出等信息。

规则

✅ 必须使用TypedDict定义
✅ 使用Annotated标注reducer
✅ 支持嵌套结构
✅ 可选字段使用Optional
Node(节点)

定义:Node是图中的处理单元,执行具体的业务逻辑。

使用场景

  • 意图理解
  • 数据检索
  • 推理决策
  • 工具调用
  • 答案生成
Edge(边)

定义:Edge连接不同的节点,定义执行流程。

类型

🔗 普通边:顺序执行(add_edge)
🔀 条件边:根据条件分支(add_conditional_edges)
🔄 循环边:实现循环执行
Conditional Edge(条件边)

功能:根据当前状态决定下一个执行的节点。

示例

def should_retrieve(state):
    if state["confidence"] < 0.7:
        return "retrieve"  # 置信度低,重新检索
    else:
        return END          # 置信度高,结束
Checkpointer(检查点)

定义:Checkpointer负责保存Agent的历史状态,支持记忆和恢复。

类型

  • MemorySaver:内存存储(开发测试)
  • PostgresSaver:PostgreSQL存储(生产环境)

🎯 二、工业Agent的状态设计

2.1 📋 状态类型定义

from typing import TypedDict, List, Dict, Optional, Annotated
from langgraph.graph import add_messages

class IndustrialAgentState(TypedDict):
    """
    工业智能体状态定义

    该状态在Agent的各个节点之间传递,保存所有中间结果
    """

    # 👤 用户输入
    user_query: str                       # 用户原始查询
    query_type: str                       # 查询类型(问答/诊断/调度)

    # 🌐 中间处理结果
    intent: Optional[Dict]                # 意图识别结果
    context: Optional[Dict]               # 上下文信息(设备、参数等)
    knowledge: Optional[Dict]             # 知识图谱查询结果
    sensor_data: Optional[Dict]           # 传感器数据
    historical_data: Optional[List]       # 历史数据

    # 🔧 工具执行
    tool_calls: List[Dict]                # 工具调用记录
    tool_results: List[Dict]              # 工具执行结果

    # 🤔 推理过程
    reasoning_steps: List[str]            # 推理步骤记录
    confidence: float                     # 置信度

    # ✅ 最终输出
    final_answer: Optional[str]           # 最终答案
    action_plan: Optional[List[Dict]]     # 行动计划
    visualizations: Optional[Dict]        # 可视化数据

    # 💬 对话历史(使用add_messages reducer)
    messages: Annotated[List, add_messages]

    # 📋 元信息
    session_id: str                       # 会话ID
    timestamp: str                        # 时间戳

2.2 🔄 状态更新规则

from langgraph.graph import StateGraph, END

# 消息累加器(保留最近N条)
MAX_MESSAGES = 20

def message_reducer(old_messages, new_messages):
    """
    消息累加器:保留最近N条消息

    参数:
        old_messages: 历史消息列表
        new_messages: 新消息列表

    返回:
        List: 合并后的消息列表(最多N条)
    """
    combined = old_messages + new_messages
    return combined[-MAX_MESSAGES:]

# 状态聚合器
class StateUpdater:
    """状态更新工具类"""

    @staticmethod
    def update_context(state: IndustrialAgentState, new_context: Dict) -> IndustrialAgentState:
        """
        更新上下文信息

        使用规则:
        - 合并新旧上下文
        - 新上下文覆盖旧值
        """
        if state["context"] is None:
            state["context"] = {}
        state["context"].update(new_context)
        return state

    @staticmethod
    def add_reasoning_step(state: IndustrialAgentState, step: str) -> IndustrialAgentState:
        """
        添加推理步骤

        使用规则:
        - 追加到推理步骤列表
        - 记录思考过程
        """
        if state["reasoning_steps"] is None:
            state["reasoning_steps"] = []
        state["reasoning_steps"].append(step)
        return state

    @staticmethod
    def calculate_confidence(state: IndustrialAgentState) -> float:
        """
        计算综合置信度

        计算规则:
        - 工具成功率:40%权重
        - 知识匹配度:30%权重
        - 基础分:30%权重
        """
        confidence = 0.0
        if state["tool_results"]:
            success_rate = sum(1 for r in state["tool_results"] if r["success"]) / len(state["tool_results"])
            confidence += success_rate * 0.4

        if state["knowledge"]:
            confidence += 0.3  # 知识图谱有匹配

        confidence += 0.3  # 基础分

        return min(confidence, 1.0)

🔧 三、节点定义与实现

3.1 🧠 意图理解节点

from langchain_openai import ChatOpenAI
from langchain.prompts import ChatPromptTemplate
from langchain.output_parsers import JsonOutputParser

# 初始化大模型
llm = ChatOpenAI(
    model="doubao-seed-1-6-251015",
    temperature=0.1,
    max_tokens=1000
)

# 意图理解Prompt
INTENT_PROMPT = ChatPromptTemplate.from_messages([
    ("system", """你是一个工业场景意图识别专家。
请从用户查询中识别以下信息:

1. query_type: 查询类型(可选值):
   - "qa": 知识问答
   - "diagnosis": 故障诊断
   - "status": 设备状态查询
   - "schedule": 生产调度
   - "analysis": 数据分析

2. entities: 提取的实体(设备ID、参数名称、故障代码等)

3. urgency: 紧急程度(low/medium/high)

输出JSON格式:
{{
  "query_type": "...",
  "entities": {{"device_id": "...", "parameter": "..."}},
  "urgency": "...",
  "requires_data": true/false
}}"""),
    ("user", "用户查询:{query}")
])

def intent_understanding_node(state: IndustrialAgentState) -> IndustrialAgentState:
    """
    意图理解节点

    功能:
    - 使用LLM识别用户查询意图
    - 提取实体信息
    - 判断紧急程度

    输入:
        state.user_query: 用户查询文本

    输出:
        state.intent: 意图识别结果
        state.query_type: 查询类型
        state.context: 提取的实体
        state.reasoning_steps: 推理步骤
    """
    # 构建Prompt
    parser = JsonOutputParser()
    chain = INTENT_PROMPT | llm | parser

    try:
        intent_result = chain.invoke({"query": state["user_query"]})

        # 更新状态
        state["intent"] = intent_result
        state["query_type"] = intent_result.get("query_type", "unknown")

        # 提取实体到上下文
        if intent_result.get("entities"):
            if state["context"] is None:
                state["context"] = {}
            state["context"].update(intent_result["entities"])

        # 记录推理步骤
        state["reasoning_steps"].append(
            f"识别查询类型:{state['query_type']},"
            f"提取实体:{intent_result.get('entities', {})}"
        )

        print(f"[意图理解] {state['query_type']}")

    except Exception as e:
        state["intent"] = {"error": str(e)}
        state["reasoning_steps"].append(f"意图理解失败:{e}")

    return state

3.2 🔍 数据检索节点

from neo4j import GraphDatabase
import requests

class DataRetriever:
    """数据检索工具类"""

    def __init__(self, neo4j_uri, neo4j_user, neo4j_password):
        """
        初始化数据检索器

        参数:
            neo4j_uri: Neo4j数据库地址
            neo4j_user: 用户名
            neo4j_password: 密码
        """
        self.neo4j_driver = GraphDatabase.driver(
            neo4j_uri,
            auth=(neo4j_user, neo4j_password)
        )

    def retrieve_knowledge(self, device_id: str, query_type: str) -> Dict:
        """
        从知识图谱检索知识

        参数:
            device_id: 设备ID
            query_type: 查询类型

        返回:
            Dict: 检索到的知识
        """
        with self.neo4j_driver.session() as session:
            if query_type == "diagnosis":
                query = """
                MATCH (d:Device {id: $device_id})-[:HAS_FAULT]->(f:Fault)
                OPTIONAL MATCH (f)-[:CAUSED_BY]->(cause:Fault)
                RETURN f.name as fault, f.code as code,
                       collect(DISTINCT cause.name) as possible_causes
                """
            else:
                query = """
                MATCH (d:Device {id: $device_id})
                RETURN d.name as name, d.status as status,
                       d.last_maintenance as last_maint
                """

            result = session.run(query, device_id=device_id)
            records = [record.data() for record in result]

            return {"knowledge": records}

    def retrieve_sensor_data(self, device_id: str, hours: int = 24) -> Dict:
        """
        检索传感器数据

        参数:
            device_id: 设备ID
            hours: 查询最近多少小时的数据

        返回:
            Dict: 传感器数据
        """
        # 实际应用中调用IoT平台API
        # url = f"http://iot-platform/api/devices/{device_id}/data"
        # response = requests.get(url, params={"hours": hours})
        # return response.json()

        # 模拟数据
        return {
            "device_id": device_id,
            "timestamp": "2024-01-15T10:00:00Z",
            "vibration": 5.2,
            "temperature": 75.3,
            "pressure": 2.1
        }

def data_retrieval_node(state: IndustrialAgentState) -> IndustrialAgentState:
    """
    数据检索节点

    功能:
    - 根据意图检索知识图谱
    - 获取实时传感器数据
    - 更新状态
    """
    retriever = DataRetriever(
        neo4j_uri="bolt://localhost:7687",
        neo4j_user="neo4j",
        neo4j_password="password"
    )

    # 根据意图决定检索策略
    if state["query_type"] in ["diagnosis", "status"]:
        # 知识图谱检索
        device_id = state["context"].get("device_id")
        if device_id:
            knowledge_result = retriever.retrieve_knowledge(device_id, state["query_type"])
            state["knowledge"] = knowledge_result
            state["reasoning_steps"].append(f"从知识图谱检索设备{device_id}的信息")

    # 检索实时传感器数据
    if state["query_type"] in ["diagnosis", "status", "analysis"]:
        device_id = state["context"].get("device_id")
        if device_id:
            sensor_data = retriever.retrieve_sensor_data(device_id)
            state["sensor_data"] = sensor_data
            state["reasoning_steps"].append("检索实时传感器数据")

    return state

🏗️ 四、构建完整Agent

4.1 📊 图结构定义

from langgraph.graph import StateGraph, END

def build_industrial_agent():
    """
    构建工业智能体

    功能:
    - 创建状态图
    - 添加处理节点
    - 定义边和条件分支
    - 编译Agent

    返回:
        编译后的Agent实例
    """
    # 1. 创建状态图
    workflow = StateGraph(IndustrialAgentState)

    # 2. 添加节点
    workflow.add_node("intent", intent_understanding_node)
    workflow.add_node("retrieve", data_retrieval_node)
    workflow.add_node("reason", reasoning_node)
    workflow.add_node("tools", tool_execution_node)

    # 3. 定义入口
    workflow.set_entry_point("intent")

    # 4. 添加条件分支
    workflow.add_conditional_edges(
        "intent",
        should_retrieve,
        {
            "retrieve": "retrieve",
            "reason": "reason"
        }
    )

    # 5. 添加顺序边
    workflow.add_edge("retrieve", "reason")
    workflow.add_edge("reason", "tools")

    # 6. 添加循环分支
    workflow.add_conditional_edges(
        "tools",
        has_confident_answer,
        {
            "retrieve": "retrieve",  # 重新检索
            END: END  # 结束
        }
    )

    # 7. 编译Agent(添加记忆)
    from langgraph.checkpoint.memory import MemorySaver
    checkpointer = MemorySaver()

    agent = workflow.compile(checkpointer=checkpointer)

    return agent

4.2 🚀 Agent执行示例

# 初始化Agent
agent = build_industrial_agent()

# 执行查询
initial_state: IndustrialAgentState = {
    "user_query": "3号产线电机振动异常,请诊断故障原因",
    "query_type": "",
    "intent": None,
    "context": {},
    "knowledge": None,
    "sensor_data": None,
    "tool_calls": [],
    "tool_results": [],
    "reasoning_steps": [],
    "confidence": 0.0,
    "final_answer": None,
    "action_plan": None,
    "visualizations": None,
    "messages": [],
    "session_id": "session-001",
    "timestamp": "2024-01-15T10:00:00Z"
}

# 执行Agent
config = {"configurable": {"thread_id": "session-001"}}
result = agent.invoke(initial_state, config)

# 输出结果
print("=== 推理过程 ===")
for step in result["reasoning_steps"]:
    print(f"  - {step}")

print("\n=== 诊断结论 ===")
print(result["final_answer"])

print(f"\n置信度: {result['confidence']:.2f}")

📊 五、最佳实践

5.1 ✅ 容错设计

def safe_retrieve_node(state: IndustrialAgentState) -> IndustrialAgentState:
    """
    带容错的数据检索节点

    容错策略:
    - 捕获异常不中断流程
    - 降低置信度
    - 记录错误信息
    """
    try:
        return data_retrieval_node(state)
    except Exception as e:
        # 记录错误但不中断流程
        state["reasoning_steps"].append(f"检索失败,使用默认处理:{e}")
        state["confidence"] *= 0.5  # 降低置信度
        return state

5.2 ⏱️ 超时控制

import signal
from contextlib import contextmanager

@contextmanager
def timeout(seconds):
    """
    超时上下文管理器

    使用场景:
    - LLM调用超时
    - 外部API调用超时
    - 数据库查询超时

    参数:
        seconds: 超时时间(秒)
    """
    def timeout_handler(signum, frame):
        raise TimeoutError(f"操作超时({seconds}秒)")

    signal.signal(signal.SIGALRM, timeout_handler)
    signal.alarm(seconds)
    try:
        yield
    finally:
        signal.alarm(0)

def llm_with_timeout(state: IndustrialAgentState) -> IndustrialAgentState:
    """带超时的LLM调用"""
    try:
        with timeout(30):  # 30秒超时
            return reasoning_node(state)
    except TimeoutError:
        state["final_answer"] = "推理超时,请重试"
        state["confidence"] = 0.3
        return state

5.3 🚀 性能优化

🔧 节点并行化:
   - 使用concurrency参数
   - 并行执行独立节点

💾 缓存机制:
   - 缓存知识图谱查询结果
   - 缓存LLM响应

📦 批处理:
   - 批量处理数据
   - 减少数据库往返

📝 六、总结

6.1 本篇回顾

本篇介绍了基于LangGraph的工业智能体开发:

🏗️ LangGraph核心概念
    ↓
🎯 状态设计
    ↓
🔧 节点实现
    ↓
📊 Agent构建
    ↓
✅ 最佳实践

6.2 技术要点

✅ 使用StateGraph定义Agent结构
✅ 实现条件分支和循环
✅ 集成Checkpointer支持记忆
✅ 添加容错和超时控制
✅ 优化性能和并发

🚀 下篇预告

《RAG技术在工业问答系统中的应用》

敬请期待!🎉


📚 参考资源


感谢阅读!如有问题欢迎在评论区交流讨论 💬

版权归作者所有,未经许可请勿抄袭,套用,商用(或其它具有利益性行为)

⭐ 如果觉得有帮助,请点赞、收藏、分享!⭐

Logo

小龙虾开发者社区是 CSDN 旗下专注 OpenClaw 生态的官方阵地,聚焦技能开发、插件实践与部署教程,为开发者提供可直接落地的方案、工具与交流平台,助力高效构建与落地 AI 应用

更多推荐