本文介绍如何通过LangGraph、MCP协议与ReactAgent技术融合,构建支持多轮对话、工具调用、状态持久化和任务中断的生产级智能代理系统。利用LangGraph管理对话状态,MCP协议统一接入外部工具,ReactAgent实现推理-行动循环,并通过InMemorySaver实现状态持久化。该架构解决了传统Agent的状态丢失、工具混乱和黑盒执行三大痛点,形成高内聚、低耦合、易扩展的智能代理解决方案。

运行效果

智能问答

📌 智能代理系统的演进需求

随着大模型能力的不断增强,单纯的“问答机器人”已无法满足企业级应用场景。用户期望的是:

  • 多轮对话记忆 —— 能记住上下文,支持连续追问
  • 工具调用能力 —— 可连接数据库、API、业务系统
  • 状态可持久化 —— 对话中断后能恢复现场
  • 任务可中断 —— 用户可随时取消长耗时操作
  • 流式输出体验 —— 实时响应,提升交互感

传统单次 Prompt 调用模式已力不从心。我们需要一个状态驱动、工具感知、可中断、可恢复的智能代理架构 —— 这正是 LangGraph + MCP + ReactAgent 技术组合的价值所在。


技术组合全景图

技术组件 角色定位 解决的问题
LangGraph 状态图引擎 对话状态管理、流程控制、持久化
MCP协议 工具调用标准协议 统一接入外部工具,解耦工具实现
ReactAgent 推理-行动循环框架 实现“思考→调用→观察→再思考”闭环
LangChain LLM与工具抽象层 统一模型调用、消息封装、工具集成

💡 本系统通过 create_react_agent 创建基于状态图的代理,使用 MultiServerMCPClient 动态加载工具,通过 InMemorySaver 实现对话状态持久化,最终形成一个高内聚、低耦合、易扩展的智能代理系统。


核心代码结构详解

1. 初始化与环境配置

```python
def __init__(self):
# 校验环境变量,确保关键配置不缺失
    required_env_vars = [
"MODEL_NAME", "MODEL_TEMPERATURE", "MODEL_BASE_URL",
"MODEL_API_KEY", "MCP_HUB_COMMON_QA_GROUP_URL",
    ]
for var in required_env_vars:
if not os.getenv(var):
            raise ValueError(f"Missing required environment variable: {var}")

# 初始化LLM客户端,支持流式、重试、超时等生产级配置
    self.llm = ChatOpenAI(...)

# 初始化MCP客户端,支持多工具服务器动态接入
    self.client = MultiServerMCPClient({
"mcp-hub": {
"url": os.getenv("MCP_HUB_COMMON_QA_GROUP_URL"),
"transport": "streamable_http",
        },
    })

# 使用内存检查点保存对话状态(生产环境建议替换为Redis)
    self.checkpointer = InMemorySaver()

# 运行中任务字典,用于支持任务取消
    self.running_tasks = {}

🛠️ 设计要点:环境变量校验前置,避免运行时崩溃;工具配置支持热插拔;状态存储可替换,便于扩展。

2. 流式响应封装

@staticmethod
def _create_response(content: str, message_type: str = "continue", data_type: str = DataTypeEnum.ANSWER.value[0]) -> str:
"""
    封装SSE格式响应,支持前端区分消息类型(继续/结束/错误/信息)
    """
    res = {
"data": {"messageType": message_type, "content": content},
"dataType": data_type,
    }
return"data:" + json.dumps(res, ensure_ascii=False) + "\n\n"

🌊 作用:实现真正的流式输出,前端可实时渲染模型思考过程、工具调用、最终答案,大幅提升用户体验。

3. 上下文记忆管理

@staticmethod
def short_trim_messages(state):
"""
    模型调用前自动修剪历史消息,防止上下文溢出
    保留系统消息 + 最新的人类消息序列,确保对话连贯性
    """
    trimmed_messages = trim_messages(
        messages=state["messages"],
        max_tokens=20000,
        token_counter=lambda msgs: sum(len(m.content or "") for m in msgs),
        strategy="last",      # 保留最新消息
        start_on="human",     # 从用户消息开始保留
        include_system=True,  # 必须保留系统提示
    )
return {"llm_input_messages": trimmed_messages}

🧠 智能裁剪:避免因历史消息过长导致模型性能下降或API报错,同时保持对话逻辑连贯。

4. 主运行逻辑:run_agent

async def run_agent(self, query: str, response, session_id: Optional[str] = None, uuid_str: str = None, user_token=None):
"""
    核心执行方法:启动智能代理,处理用户查询,流式返回结果
    支持:身份验证、任务取消、工具调用监听、对话状态持久化、记录存储
    """
# 解码用户Token,获取用户ID作为任务标识
    user_dict = await decode_jwt_token(user_token)
    task_id = user_dict["id"]
    task_context = {"cancelled": False}
    self.running_tasks[task_id] = task_context

    try:
        t02_answer_data = []  # 收集完整回答用于后续存储

# 动态获取可用工具列表
        tools = await self.client.get_tools()

# 使用session_id作为thread_id,实现多轮对话状态隔离
        thread_id = session_id if session_id else"default_thread"
        config = {"configurable": {"thread_id": thread_id}}

# 定义系统提示词,约束模型行为与输出格式
        system_message = SystemMessage(content="""...""")  # 内容略,见完整代码

# 创建React代理,绑定模型、工具、状态管理器和预处理钩子
        agent = create_react_agent(
            model=self.llm,
            tools=tools,
            prompt=system_message,
            checkpointer=self.checkpointer,
            pre_model_hook=self.short_trim_messages,  # 注册消息修剪钩子
        )

# 流式执行代理,监听每一步输出
        async for message_chunk, metadata in agent.astream(
            input={"messages": [HumanMessage(content=query)]},
            config=config,
            stream_mode="messages",
        ):
# 检查任务是否被取消
if self.running_tasks[task_id]["cancelled"]:
                await response.write(self._create_response("\n> 这条消息已停止", "info"))
                await response.write(self._create_response("", "end", DataTypeEnum.STREAM_END.value[0]))
break

# 处理工具调用节点
if metadata["langgraph_node"] == "tools":
                tool_name = message_chunk.name or "未知工具"
                tool_use = "> 调用工具:" + tool_name + "\n\n"
                await response.write(self._create_response(tool_use))
                t02_answer_data.append(tool_use)
continue

# 输出模型生成内容
if message_chunk.content:
                content = message_chunk.content
                t02_answer_data.append(content)
                await response.write(self._create_response(content))
if hasattr(response, "flush"):
                    await response.flush()
                await asyncio.sleep(0)  # 让出事件循环

# 仅在未取消时保存对话记录
if not self.running_tasks[task_id]["cancelled"]:
            await add_user_record(
                uuid_str, session_id, query, t02_answer_data, {},
                DiFyAppEnum.COMMON_QA.value[0], user_token
            )

    except asyncio.CancelledError:
        ... # 处理取消异常
    except Exception as e:
        ... # 处理运行时异常
    finally:
# 清理任务记录
if task_id in self.running_tasks:
            del self.running_tasks[task_id]

⚙️ 核心价值: 状态隔离:通过 thread_id 区分不同用户/会话 工具透明:实时输出工具调用过程,增强可信度 可中断:支持用户主动取消长时间运行任务 可审计:完整记录对话过程,便于复盘与优化

5. 任务取消与状态管理


async def cancel_task(self, task_id: str) -> bool:
"""取消指定任务,通过设置标志位实现优雅中断"""
if task_id in self.running_tasks:
        self.running_tasks[task_id]["cancelled"] = True
return True
return False

def get_running_tasks(self):
"""获取当前所有运行中任务ID,用于监控与管理"""
return list(self.running_tasks.keys())

🚫 用户体验:当模型“思考太久”,用户可点击“停止”按钮,系统立即响应,避免资源浪费与体验卡顿。

6. MCP使用姿势

  • streamable_http方式调用
self.client = MultiServerMCPClient({
"mcp-hub": {
"url": "http://xxxx.com",
"transport": "streamable_http",
  }
}
  • 本地子进程方式调用三方开源工具
self.client = MultiServerMCPClient({
"undoom-douyin-data-analysis": {
"command": "uvx",
"transport": "stdio",
"args": [
"--index-url",
"https://mirrors.aliyun.com/pypi/simple/",
"--from",
"undoom-douyin-data-analysis",
"undoom-douyin-mcp",
           ],
  },
}
  • 本地子进程方式调用本地开发的工具
current_dir = os.path.dirname(os.path.abspath(__file__))
mcp_tool_path = os.path.join(current_dir, "mcp", "query_db_tool.py")
self.client = MultiServerMCPClient({
"query_qa_record": {
"command": "python",
"args": [mcp_tool_path],
"transport": "stdio",
     }
}

总结

LangGraph + MCP + ReactAgent 的组合,不是简单的技术堆砌,而是面向复杂、真实、生产环境的智能代理架构解决方案。

它解决了传统 Agent 的三大痛点:

❌ 状态丢失 → ✅ LangGraph 状态图持久化 ❌ 工具混乱 → ✅ MCP 协议标准化接入 ❌ 黑盒执行 → ✅ ReactAgent 透明化推理过程

📚 完整代码

参考我的开源项目: git@github.com:apconw/sanic-web.git

🌈 项目亮点

  • ✅ 集成 MCP 多智能体架构
  • ✅ 支持 Dify / LangChain / LlamaIndex / Ollama / vLLM / Neo4j
  • ✅ 前端采用 Vue3 + TypeScript + Vite5,现代化交互体验
  • ✅ 内置 ECharts / AntV 图表问答 + CSV 表格问答
  • ✅ 支持对接主流 RAG 系统 与 Text2SQL 引擎
  • ✅ 轻量级 Sanic 后端,适合快速部署与二次开发

运行效果:

数据问答

读者福利大放送:如果你对大模型感兴趣,想更加深入的学习大模型**,那么这份精心整理的大模型学习资料,绝对能帮你少走弯路、快速入门**

如果你是零基础小白,别担心——大模型入门真的没那么难,你完全可以学得会

👉 不用你懂任何算法和数学知识,公式推导、复杂原理这些都不用操心;
👉 也不挑电脑配置,普通家用电脑完全能 hold 住,不用额外花钱升级设备;
👉 更不用你提前学 Python 之类的编程语言,零基础照样能上手。

你要做的特别简单:跟着我的讲解走,照着教程里的步骤一步步操作就行。

包括:大模型学习线路汇总、学习阶段,大模型实战案例,大模型学习视频,人工智能、机器学习、大模型书籍PDF。带你从零基础系统性的学好大模型!

现在这份资料免费分享给大家,有需要的小伙伴,直接VX扫描下方二维码就能领取啦😝↓↓↓
在这里插入图片描述

为什么要学习大模型?

数据显示,2023 年我国大模型相关人才缺口已突破百万,这一数字直接暴露了人才培养体系的严重滞后与供给不足。而随着人工智能技术的飞速迭代,产业对专业人才的需求将呈爆发式增长,据预测,到 2025 年这一缺口将急剧扩大至 400 万!!
在这里插入图片描述

大模型学习路线汇总

整体的学习路线分成L1到L4四个阶段,一步步带你从入门到进阶,从理论到实战,跟着学习路线一步步打卡,小白也能轻松学会!
在这里插入图片描述

大模型实战项目&配套源码

光学理论可不够,这套学习资料还包含了丰富的实战案例,让你在实战中检验成果巩固所学知识
在这里插入图片描述

大模型学习必看书籍PDF

我精选了一系列大模型技术的书籍和学习文档(电子版),它们由领域内的顶尖专家撰写,内容全面、深入、详尽,为你学习大模型提供坚实的理论基础。
在这里插入图片描述

大模型超全面试题汇总

在面试过程中可能遇到的问题,我都给大家汇总好了,能让你们在面试中游刃有余
在这里插入图片描述

这些资料真的有用吗?

这份资料由我和鲁为民博士(北京清华大学学士和美国加州理工学院博士)共同整理,现任上海殷泊信息科技CEO,其创立的MoPaaS云平台获Forrester全球’强劲表现者’认证,服务航天科工、国家电网等1000+企业,以第一作者在IEEE Transactions发表论文50+篇,获NASA JPL火星探测系统强化学习专利等35项中美专利。本套AI大模型课程由清华大学-加州理工双料博士、吴文俊人工智能奖得主鲁为民教授领衔研发。

资料内容涵盖了从入门到进阶的各类视频教程和实战项目,无论你是小白还是有些技术基础的技术人员,这份资料都绝对能帮助你提升薪资待遇,转行大模型岗位。
在这里插入图片描述
👉获取方式

😝有需要的小伙伴,可以保存图片到VX扫描下方二维码免费领取【保证100%免费】
在这里插入图片描述
相信我,这套大模型系统教程将会是全网最齐全 最适合零基础的!!

Logo

更多推荐