📌 一句话速览

本文将深入剖析如何通过 LangGraph、MCP 协议与 ReactAgent 三者融合,构建一个支持多轮对话、工具调用、状态持久化与任务中断的生产级智能代理系统。


运行效果

智能问答

📌 智能代理系统的演进需求

随着大模型能力的不断增强,单纯的“问答机器人”已无法满足企业级应用场景。用户期望的是:

  • 多轮对话记忆 —— 能记住上下文,支持连续追问
  • 工具调用能力 —— 可连接数据库、API、业务系统
  • 状态可持久化 —— 对话中断后能恢复现场
  • 任务可中断 —— 用户可随时取消长耗时操作
  • 流式输出体验 —— 实时响应,提升交互感

传统单次 Prompt 调用模式已力不从心。我们需要一个状态驱动、工具感知、可中断、可恢复的智能代理架构 —— 这正是 LangGraph + MCP + ReactAgent 技术组合的价值所在。


技术组合全景图

技术组件 角色定位 解决的问题
LangGraph 状态图引擎 对话状态管理、流程控制、持久化
MCP协议 工具调用标准协议 统一接入外部工具,解耦工具实现
ReactAgent 推理-行动循环框架 实现“思考→调用→观察→再思考”闭环
LangChain LLM与工具抽象层 统一模型调用、消息封装、工具集成

💡 本系统通过 create_react_agent 创建基于状态图的代理,使用 MultiServerMCPClient 动态加载工具,通过 InMemorySaver 实现对话状态持久化,最终形成一个高内聚、低耦合、易扩展的智能代理系统。


核心代码结构详解

1. 初始化与环境配置

```python
def __init__(self):
# 校验环境变量,确保关键配置不缺失
    required_env_vars = [
"MODEL_NAME", "MODEL_TEMPERATURE", "MODEL_BASE_URL",
"MODEL_API_KEY", "MCP_HUB_COMMON_QA_GROUP_URL",
    ]
for var in required_env_vars:
if not os.getenv(var):
            raise ValueError(f"Missing required environment variable: {var}")

# 初始化LLM客户端,支持流式、重试、超时等生产级配置
    self.llm = ChatOpenAI(...)

# 初始化MCP客户端,支持多工具服务器动态接入
    self.client = MultiServerMCPClient({
"mcp-hub": {
"url": os.getenv("MCP_HUB_COMMON_QA_GROUP_URL"),
"transport": "streamable_http",
        },
    })

# 使用内存检查点保存对话状态(生产环境建议替换为Redis)
    self.checkpointer = InMemorySaver()

# 运行中任务字典,用于支持任务取消
    self.running_tasks = {}

🛠️ 设计要点:环境变量校验前置,避免运行时崩溃;工具配置支持热插拔;状态存储可替换,便于扩展。

2. 流式响应封装

@staticmethod
def _create_response(content: str, message_type: str = "continue", data_type: str = DataTypeEnum.ANSWER.value[0]) -> str:
"""
    封装SSE格式响应,支持前端区分消息类型(继续/结束/错误/信息)
    """
    res = {
"data": {"messageType": message_type, "content": content},
"dataType": data_type,
    }
return"data:" + json.dumps(res, ensure_ascii=False) + "\n\n"

🌊 作用:实现真正的流式输出,前端可实时渲染模型思考过程、工具调用、最终答案,大幅提升用户体验。

3. 上下文记忆管理

@staticmethod
def short_trim_messages(state):
"""
    模型调用前自动修剪历史消息,防止上下文溢出
    保留系统消息 + 最新的人类消息序列,确保对话连贯性
    """
    trimmed_messages = trim_messages(
        messages=state["messages"],
        max_tokens=20000,
        token_counter=lambda msgs: sum(len(m.content or "") for m in msgs),
        strategy="last",      # 保留最新消息
        start_on="human",     # 从用户消息开始保留
        include_system=True,  # 必须保留系统提示
    )
return {"llm_input_messages": trimmed_messages}

🧠 智能裁剪:避免因历史消息过长导致模型性能下降或API报错,同时保持对话逻辑连贯。

4. 主运行逻辑:run_agent

async def run_agent(self, query: str, response, session_id: Optional[str] = None, uuid_str: str = None, user_token=None):
"""
    核心执行方法:启动智能代理,处理用户查询,流式返回结果
    支持:身份验证、任务取消、工具调用监听、对话状态持久化、记录存储
    """
# 解码用户Token,获取用户ID作为任务标识
    user_dict = await decode_jwt_token(user_token)
    task_id = user_dict["id"]
    task_context = {"cancelled": False}
    self.running_tasks[task_id] = task_context

    try:
        t02_answer_data = []  # 收集完整回答用于后续存储

# 动态获取可用工具列表
        tools = await self.client.get_tools()

# 使用session_id作为thread_id,实现多轮对话状态隔离
        thread_id = session_id if session_id else"default_thread"
        config = {"configurable": {"thread_id": thread_id}}

# 定义系统提示词,约束模型行为与输出格式
        system_message = SystemMessage(content="""...""")  # 内容略,见完整代码

# 创建React代理,绑定模型、工具、状态管理器和预处理钩子
        agent = create_react_agent(
            model=self.llm,
            tools=tools,
            prompt=system_message,
            checkpointer=self.checkpointer,
            pre_model_hook=self.short_trim_messages,  # 注册消息修剪钩子
        )

# 流式执行代理,监听每一步输出
        async for message_chunk, metadata in agent.astream(
            input={"messages": [HumanMessage(content=query)]},
            config=config,
            stream_mode="messages",
        ):
# 检查任务是否被取消
if self.running_tasks[task_id]["cancelled"]:
                await response.write(self._create_response("\n> 这条消息已停止", "info"))
                await response.write(self._create_response("", "end", DataTypeEnum.STREAM_END.value[0]))
break

# 处理工具调用节点
if metadata["langgraph_node"] == "tools":
                tool_name = message_chunk.name or "未知工具"
                tool_use = "> 调用工具:" + tool_name + "\n\n"
                await response.write(self._create_response(tool_use))
                t02_answer_data.append(tool_use)
continue

# 输出模型生成内容
if message_chunk.content:
                content = message_chunk.content
                t02_answer_data.append(content)
                await response.write(self._create_response(content))
if hasattr(response, "flush"):
                    await response.flush()
                await asyncio.sleep(0)  # 让出事件循环

# 仅在未取消时保存对话记录
if not self.running_tasks[task_id]["cancelled"]:
            await add_user_record(
                uuid_str, session_id, query, t02_answer_data, {},
                DiFyAppEnum.COMMON_QA.value[0], user_token
            )

    except asyncio.CancelledError:
        ... # 处理取消异常
    except Exception as e:
        ... # 处理运行时异常
    finally:
# 清理任务记录
if task_id in self.running_tasks:
            del self.running_tasks[task_id]

⚙️ 核心价值: 状态隔离:通过 thread_id 区分不同用户/会话 工具透明:实时输出工具调用过程,增强可信度 可中断:支持用户主动取消长时间运行任务 可审计:完整记录对话过程,便于复盘与优化

5. 任务取消与状态管理


async def cancel_task(self, task_id: str) -> bool:
"""取消指定任务,通过设置标志位实现优雅中断"""
if task_id in self.running_tasks:
        self.running_tasks[task_id]["cancelled"] = True
return True
return False

def get_running_tasks(self):
"""获取当前所有运行中任务ID,用于监控与管理"""
return list(self.running_tasks.keys())

🚫 用户体验:当模型“思考太久”,用户可点击“停止”按钮,系统立即响应,避免资源浪费与体验卡顿。

6. MCP使用姿势

  • streamable_http方式调用
self.client = MultiServerMCPClient({
"mcp-hub": {
"url": "http://xxxx.com",
"transport": "streamable_http",
  }
}
  • 本地子进程方式调用三方开源工具
self.client = MultiServerMCPClient({
"undoom-douyin-data-analysis": {
"command": "uvx",
"transport": "stdio",
"args": [
"--index-url",
"https://mirrors.aliyun.com/pypi/simple/",
"--from",
"undoom-douyin-data-analysis",
"undoom-douyin-mcp",
           ],
  },
}
  • 本地子进程方式调用本地开发的工具
current_dir = os.path.dirname(os.path.abspath(__file__))
mcp_tool_path = os.path.join(current_dir, "mcp", "query_db_tool.py")
self.client = MultiServerMCPClient({
"query_qa_record": {
"command": "python",
"args": [mcp_tool_path],
"transport": "stdio",
     }
}

总结

LangGraph + MCP + ReactAgent 的组合,不是简单的技术堆砌,而是面向复杂、真实、生产环境的智能代理架构解决方案。

它解决了传统 Agent 的三大痛点:

❌ 状态丢失 → ✅ LangGraph 状态图持久化 ❌ 工具混乱 → ✅ MCP 协议标准化接入 ❌ 黑盒执行 → ✅ ReactAgent 透明化推理过程

普通人如何抓住AI大模型的风口?

领取方式在文末

为什么要学习大模型?

目前AI大模型的技术岗位与能力培养随着人工智能技术的迅速发展和应用 , 大模型作为其中的重要组成部分 , 正逐渐成为推动人工智能发展的重要引擎 。大模型以其强大的数据处理和模式识别能力, 广泛应用于自然语言处理 、计算机视觉 、 智能推荐等领域 ,为各行各业带来了革命性的改变和机遇 。

目前,开源人工智能大模型已应用于医疗、政务、法律、汽车、娱乐、金融、互联网、教育、制造业、企业服务等多个场景,其中,应用于金融、企业服务、制造业和法律领域的大模型在本次调研中占比超过 30%。
在这里插入图片描述

随着AI大模型技术的迅速发展,相关岗位的需求也日益增加。大模型产业链催生了一批高薪新职业:
在这里插入图片描述

人工智能大潮已来,不加入就可能被淘汰。如果你是技术人,尤其是互联网从业者,现在就开始学习AI大模型技术,真的是给你的人生一个重要建议!

最后

只要你真心想学习AI大模型技术,这份精心整理的学习资料我愿意无偿分享给你,但是想学技术去乱搞的人别来找我!

在当前这个人工智能高速发展的时代,AI大模型正在深刻改变各行各业。我国对高水平AI人才的需求也日益增长,真正懂技术、能落地的人才依旧紧缺。我也希望通过这份资料,能够帮助更多有志于AI领域的朋友入门并深入学习。

真诚无偿分享!!!
vx扫描下方二维码即可
加上后会一个个给大家发

在这里插入图片描述

大模型全套学习资料展示

自我们与MoPaaS魔泊云合作以来,我们不断打磨课程体系与技术内容,在细节上精益求精,同时在技术层面也新增了许多前沿且实用的内容,力求为大家带来更系统、更实战、更落地的大模型学习体验。

图片

希望这份系统、实用的大模型学习路径,能够帮助你从零入门,进阶到实战,真正掌握AI时代的核心技能!

01 教学内容

图片

  • 从零到精通完整闭环:【基础理论 →RAG开发 → Agent设计 → 模型微调与私有化部署调→热门技术】5大模块,内容比传统教材更贴近企业实战!

  • 大量真实项目案例: 带你亲自上手搞数据清洗、模型调优这些硬核操作,把课本知识变成真本事‌!

02适学人群

应届毕业生‌: 无工作经验但想要系统学习AI大模型技术,期待通过实战项目掌握核心技术。

零基础转型‌: 非技术背景但关注AI应用场景,计划通过低代码工具实现“AI+行业”跨界‌。

业务赋能突破瓶颈: 传统开发者(Java/前端等)学习Transformer架构与LangChain框架,向AI全栈工程师转型‌。

image.png

vx扫描下方二维码即可
在这里插入图片描述

本教程比较珍贵,仅限大家自行学习,不要传播!更严禁商用!

03 入门到进阶学习路线图

大模型学习路线图,整体分为5个大的阶段:
图片

04 视频和书籍PDF合集

图片

从0到掌握主流大模型技术视频教程(涵盖模型训练、微调、RAG、LangChain、Agent开发等实战方向)

图片

新手必备的大模型学习PDF书单来了!全是硬核知识,帮你少走弯路(不吹牛,真有用)
图片

05 行业报告+白皮书合集

收集70+报告与白皮书,了解行业最新动态!
图片

06 90+份面试题/经验

AI大模型岗位面试经验总结(谁学技术不是为了赚$呢,找个好的岗位很重要)图片
在这里插入图片描述

07 deepseek部署包+技巧大全

在这里插入图片描述

由于篇幅有限

只展示部分资料

并且还在持续更新中…

真诚无偿分享!!!
vx扫描下方二维码即可
加上后会一个个给大家发

在这里插入图片描述

Logo

更多推荐