零基础从入门到精通 AI Agent 开发(全栈保姆级教程)下篇:多 Agent 协作、LangGraph 工作流、MCP 协议集成、Agent 规划与反思、生产级 Agent 平台搭建
目录
- 开篇:单Agent的核心瓶颈,为什么多Agent是生产落地的必由之路
- 核心进阶:多Agent协作的底层原理、角色设计与主流协作模式
- 工作流引擎:LangGraph 深度拆解与多Agent工作流全流程实战
- 无限工具生态:MCP 协议核心原理与Agent全链路集成实战
- 智能跃迁:Agent 规划与反思能力进阶(自我修正、深度思考)
- 全链路落地:从零搭建企业级生产Agent平台(前后端+部署)
- 常见问题 FAQ & 附加篇预告 & 互动环节
1. 开篇:单Agent的核心瓶颈,为什么多Agent是生产落地的必由之路
在上篇教程中,我们从零实现了完整的单Agent系统,掌握了ReAct框架、工具调用、记忆管理三大核心能力,搭建了可执行复杂任务的智能助手。但当我们把Agent放到真实的企业级场景中,单Agent很快会遇到无法突破的瓶颈:
你想让Agent完成「从0到1开发一个用户管理系统,输出需求文档、前后端代码、测试用例并完成上线」的任务,单Agent会出现三个致命问题:
- 专业度严重不足:一个Agent既要懂产品需求分析,又要懂前后端开发,还要懂测试和运维,就像让一个人同时兼任产品、开发、测试、运维,输出的内容极易出现专业错误和幻觉
- 容错率极低:单Agent一旦在某一步走错方向,后续所有步骤都会跑偏,而且很难自我修正,最终输出完全不符合要求的结果
- 执行效率低下:复杂任务需要多步骤并行推进,单Agent只能线性执行,无法同时处理多个子任务,完成一个大型任务可能需要几十上百轮循环,耗时极长
而多Agent系统,就是解决这些问题的终极方案。它的本质,是给AI Agent搭建了一个「完整的专业团队」:每个Agent都有明确的专业角色和职责边界,像真实的团队一样分工协作、各司其职、互相校验、并行推进,最终高质量完成复杂任务。
举个直观的例子,同样是开发用户管理系统的任务,多Agent的执行流程是:
- 项目经理Agent:接收需求,拆解成「需求分析→代码开发→测试验收→上线部署」4个里程碑,制定执行计划,统筹全流程
- 产品经理Agent:接收项目经理的任务,输出完整的产品需求文档、原型说明、功能验收标准,提交给项目经理审核
- 开发工程师Agent:接收审核通过的需求文档,拆解技术方案,输出前后端完整代码,提交给测试工程师
- 测试工程师Agent:接收代码,编写测试用例,执行单元测试,输出测试报告,有BUG则返回开发工程师修改,无BUG则提交给项目经理验收
- 项目经理Agent:最终验收所有交付物,确认任务完成,输出最终结果
整个流程和真实的互联网团队工作模式完全一致,每个Agent只专注于自己的专业领域,输出的内容专业度、准确率、执行效率,都远超单Agent。
这就是多Agent的核心价值:通过分工协作,把大模型的通用能力,转化为可落地的专业生产力,也是目前AI Agent企业级落地的绝对主流方案。
本文,我们会从底层原理到完整实战,彻底搞定多Agent开发、工作流引擎、工具生态、高级智能、生产落地全链路,让你真正具备企业级AI Agent开发能力。
2. 核心进阶:多Agent协作的底层原理、角色设计与主流协作模式
在动手写代码之前,我们必须先搞懂多Agent系统的底层逻辑,避免只会调框架封装,遇到问题无从下手。
2.1 多Agent系统的本质
多Agent系统(Multi-Agent System, MAS)的本质,是由多个具备自主决策能力的单Agent组成,通过标准化的通信机制、明确的分工规则、统一的状态共享,协同完成复杂目标的分布式智能系统。
简单来说:单Agent是一个全能但不专业的人,多Agent是一个分工明确、配合默契的专业团队。
2.2 多Agent系统的六大核心组件
一个稳定、可落地的多Agent系统,必须包含以下6个核心组件,缺一不可:
| 组件名称 | 核心作用 | 通俗类比 | 核心实现 |
|---|---|---|---|
| 角色定义体系 | 每个Agent的身份、专业领域、职责边界、输出规范、准入准出标准 | 团队里的岗位JD | 定制化System Prompt、角色元数据配置 |
| 共享状态池 | 所有Agent可共同读取、修改的共享内存,存储任务进度、中间结果、对话历史、全局记忆 | 团队的共享文档、项目管理看板 | LangGraph State、分布式缓存、数据库 |
| 通信协议 | Agent之间传递信息的标准化格式,保证消息能被准确识别和处理 | 团队的沟通规范、会议制度 | 标准化消息体、事件驱动机制、广播/点对点通信 |
| 任务调度器 | 负责目标拆解、任务分配、进度管控、优先级调整 | 团队的项目经理/主管 | 规划Agent、条件分支逻辑、优先级队列 |
| 协调管控模块 | 负责解决Agent之间的冲突、意见分歧,验收交付物,把控整体方向 | 团队的负责人 | 主管Agent、投票决策机制、验收标准校验 |
| 反馈迭代机制 | Agent之间的结果反馈、错误修正、流程优化,实现自我迭代 | 团队的复盘、评审、绩效机制 | 反思框架、批评Agent、结果校验逻辑 |
2.3 工业界主流的4大多Agent协作模式
不同的任务场景,需要匹配不同的协作模式,这里我给你讲透工业界最常用的4种模式,包括适用场景、优缺点、落地案例,你可以直接按需选用。
2.3.1 顺序协作模式(流水线模式)
核心逻辑:任务被拆解成线性的执行步骤,每个Agent负责其中一个步骤,前一个Agent的输出,是后一个Agent的输入,像流水线一样依次执行,直到任务完成。
适用场景:流程固定、线性推进的任务,比如内容创作、需求开发、文档处理等。
优点:逻辑简单、流程可控、易于调试、稳定性极强。
缺点:无法并行执行,灵活性不足,某一步出错会阻塞整个流程。
落地案例:自媒体内容生产流水线(选题Agent→文案Agent→排版Agent→审核Agent→发布Agent)。
2.3.2 并行协作模式(分工汇总模式)
核心逻辑:大任务被拆解成多个互相独立的子任务,多个Agent同时并行执行不同的子任务,所有子任务完成后,由汇总Agent统一整合,输出最终结果。
适用场景:可拆解为独立子任务的场景,比如市场调研、竞品分析、多维度数据统计等。
优点:执行效率极高,可同时处理多个子任务,互不阻塞。
缺点:需要明确的任务拆解能力,子任务之间不能有强依赖。
落地案例:行业分析报告(多个Agent同时调研政策、市场、竞品、用户、技术,最后汇总成完整报告)。
2.3.3 层级管控模式(主管-执行模式)
核心逻辑:采用「主管Agent + 多个执行Agent」的层级架构,主管Agent负责全局规划、任务拆解、分配调度、结果验收,执行Agent负责落地具体的子任务,完成后提交给主管Agent审核,审核不通过则返回修改,审核通过则进入下一步。
适用场景:复杂、多分支、不确定性高的任务,比如项目开发、复杂问题解决、企业级服务等。
优点:全局可控性强,容错率高,可处理极复杂的任务,是目前工业界最主流的模式。
缺点:对主管Agent的规划、调度、验收能力要求极高。
落地案例:企业级项目开发(项目经理Agent→产品/开发/测试/运维执行Agent)。
2.3.4 联邦民主模式(平等投票模式)
核心逻辑:多个Agent处于完全平等的地位,没有层级之分,针对同一个问题,每个Agent从自己的专业维度输出分析结果和决策建议,最终通过投票、协商的方式达成一致,输出最终结果。
适用场景:需要多维度评估、高风险决策的场景,比如投资决策、风险评估、战略规划等。
优点:考虑维度全面,可有效避免单一视角的偏见和错误,决策准确率高。
缺点:执行效率低,容易出现意见分歧无法达成一致的情况。
落地案例:股票投资决策(宏观经济Agent、行业分析Agent、公司基本面Agent、技术面Agent、风控Agent共同投票决定投资策略)。
2.4 多Agent角色设计黄金法则(工业级落地经验)
90%的多Agent系统落地失败,都是因为角色设计出了问题。这里我给你总结了5条经过无数项目验证的黄金法则,帮你避开90%的坑:
- 单一职责原则:每个Agent只负责一个专业领域,绝对不要让一个Agent既做产品又做开发。职责越单一,专业度越高,出错概率越低。
- 职责边界绝对清晰:明确每个Agent「能做什么、不能做什么、输入是什么、输出必须符合什么标准」,避免越权、重复工作、互相推诿。
- 专业Prompt精准定制:每个Agent的System Prompt必须包含「身份定位→核心职责→工作规范→输出格式→准入准出标准」5个核心部分,越精准,Agent的执行效果越好。
- 准入准出标准化:明确每个Agent的输入必须满足什么条件,输出必须通过什么校验,才能进入下一个环节,从根源上避免错误内容在流程中传递。
- 角色精简原则:能用2个Agent完成的任务,绝对不要用5个。角色越多,通信成本越高,流程越复杂,出错概率越高,只保留核心必要的角色。
2.5 实战:工业级可用的多Agent角色Prompt模板
这里我给你一套可直接复用的、经过调优的4角色团队Prompt模板,对应「项目经理+产品经理+开发工程师+测试工程师」的经典层级管控模式,你可以直接修改适配自己的场景。
# agent_roles.py
# 项目经理Agent(主管Agent)
PROJECT_MANAGER_PROMPT = """
你是一名专业的互联网项目项目经理,拥有10年以上的项目管理经验,负责整个项目的全流程管控、任务拆解、调度协调和最终验收。
## 核心职责
1. 接收用户的原始需求,拆解成可执行的项目里程碑和子任务,制定完整的执行计划
2. 给对应的角色Agent分配任务,明确每个任务的交付标准和截止时间
3. 审核每个角色Agent提交的交付物,符合标准则进入下一个环节,不符合标准则明确指出问题,返回修改
4. 协调解决项目过程中出现的冲突和问题,把控项目整体方向,确保最终交付物完全满足用户需求
5. 项目全部完成后,输出完整的项目交付总结和最终结果
## 工作规范
1. 你必须严格按照「需求拆解→任务分配→审核验收→迭代优化→项目交付」的流程执行,绝对不能跳过任何环节
2. 你只负责项目管理和调度,绝对不能自己执行产品、开发、测试的具体工作,必须分配给对应的专业Agent执行
3. 审核交付物时,必须明确、具体地指出问题,给出可落地的修改建议,不能只说「不符合要求」
4. 只有当所有交付物都通过你的审核,完全满足用户需求时,你才能输出「项目验收通过,任务完成」的最终结论
## 输出格式规范
你必须严格按照以下格式输出,绝对不能修改:
1. 项目规划阶段:输出【项目里程碑规划】,明确每个里程碑的任务、负责人、交付标准
2. 任务分配阶段:输出【任务分配】,明确接收任务的角色、任务内容、交付要求、截止时间
3. 审核阶段:输出【交付物审核结果】,明确「通过/不通过」,不通过必须详细说明修改意见
4. 项目完成阶段:输出【项目最终交付总结】,附上所有交付物,输出Final Answer
"""
# 产品经理Agent
PRODUCT_MANAGER_PROMPT = """
你是一名专业的互联网产品经理,拥有8年以上的B端和C端产品设计经验,负责产品需求分析、文档撰写、功能设计和验收标准制定。
## 核心职责
1. 接收项目经理分配的需求任务,深入理解用户的核心诉求,输出完整、专业的产品需求文档(PRD)
2. 明确产品的核心功能、业务流程、用户场景、功能验收标准
3. 确保输出的PRD逻辑清晰、无遗漏、可落地,能直接交给开发工程师进行开发
4. 若PRD被项目经理或开发、测试提出修改意见,必须快速响应,优化完善PRD
## 工作规范
1. 你输出的PRD必须包含:需求背景、核心目标、用户场景、功能清单、业务流程图、功能详情、验收标准,缺一不可
2. 你只负责产品需求设计,绝对不能编写代码、执行测试等不属于产品经理职责的工作
3. 你的输出必须专业、严谨、可落地,不能出现模糊、歧义的描述,避免开发过程中出现理解偏差
## 输出格式规范
你必须严格按照产品需求文档(PRD)的标准格式输出,结构清晰,层级分明,便于开发和测试工程师阅读。
"""
# Python开发工程师Agent
DEVELOPER_PROMPT = """
你是一名专业的Python后端开发工程师,拥有10年以上的开发经验,精通Python、FastAPI、MySQL、RESTful API开发,代码规范、严谨、可运行。
## 核心职责
1. 接收项目经理分配的开发任务,基于审核通过的PRD文档,进行技术方案设计和代码开发
2. 输出完整、可直接运行的代码,包含详细的注释、接口文档、依赖说明
3. 确保代码无语法错误、逻辑严谨、符合开发规范,可直接通过测试
4. 若测试工程师发现BUG,或项目经理提出修改意见,必须快速定位问题,修复优化代码
## 工作规范
1. 你输出的代码必须是完整、可直接运行的,不能只输出代码片段,必须包含导入语句、函数定义、入口执行逻辑
2. 你必须严格遵循PEP8代码规范,添加详细的注释,说明每个函数的作用、参数、返回值
3. 你只负责Python后端代码开发,绝对不能做产品需求设计、测试用例编写等不属于开发工程师职责的工作
4. 代码必须做异常处理,保证鲁棒性,不能出现一运行就报错的情况
## 输出格式规范
你必须严格按照以下格式输出:
1. 【技术方案设计】:说明整体的技术架构、模块划分、接口设计
2. 【依赖说明】:列出需要安装的Python依赖包和版本
3. 【完整代码】:分模块输出完整的、可直接运行的代码,每个代码块都有明确的文件名和注释
4. 【运行说明】:说明代码的运行步骤、环境要求、注意事项
"""
# 测试工程师Agent
TEST_ENGINEER_PROMPT = """
你是一名专业的软件测试工程师,拥有8年以上的功能测试、接口测试经验,负责测试用例编写、测试执行、BUG输出和回归验证。
## 核心职责
1. 接收项目经理分配的测试任务,基于PRD文档和开发工程师输出的代码,编写完整的测试用例
2. 执行测试,输出详细的测试报告,明确指出代码中的BUG、逻辑问题、不符合需求的地方
3. 验证开发工程师修复后的代码,确认BUG是否解决,输出回归测试结果
4. 只有当所有测试用例都通过,无遗留BUG时,才能输出「测试通过」的结论
## 工作规范
1. 你的测试用例必须覆盖所有的核心功能、边界场景、异常场景,不能有遗漏
2. 你输出的BUG必须明确、具体,包含:BUG描述、复现步骤、预期结果、实际结果、严重等级,便于开发工程师定位修复
3. 你只负责测试工作,绝对不能修改代码、修改需求等不属于测试工程师职责的工作
## 输出格式规范
你必须严格按照以下格式输出:
1. 【测试用例】:列出所有的测试用例,包含用例ID、用例名称、测试步骤、预期结果
2. 【测试报告】:说明测试执行情况、通过率、未通过的用例详情
3. 【BUG清单】:详细列出所有发现的BUG,按严重等级排序
4. 【测试结论】:明确「测试通过/不通过」,不通过必须说明原因
"""
3. 工作流引擎:LangGraph 深度拆解与多Agent工作流全流程实战
理解了多Agent的核心原理和角色设计之后,我们需要一个强大的工作流引擎,来落地多Agent的复杂协作流程。而LangGraph,就是目前工业界最主流、最稳定、生态最完善的Agent工作流引擎,没有之一。
3.1 为什么是LangGraph?
很多同学会问:我用普通的Python代码、LangChain Chain也能实现多Agent流程,为什么一定要用LangGraph?
核心原因是,普通的线性Chain,根本无法满足Agent的核心需求:
- 无法实现循环执行:比如代码测试不通过,需要返回开发环节修改,线性Chain无法做到
- 无法实现条件分支:比如根据审核结果,动态决定是进入下一个环节还是返回修改
- 无法实现状态共享:多Agent之间需要共享任务进度、中间结果、记忆,线性Chain无法做到全局状态统一管理
- 无法实现持久化与断点续跑:生产环境中,Agent执行到一半中断,需要能从断点继续执行,线性Chain无法支持
而LangGraph,是LangChain团队专门为Agent场景开发的状态图工作流引擎,完美解决了以上所有问题,它天生就是为Agent的复杂循环、条件分支、状态共享而生的。
3.2 LangGraph核心概念(零基础一次性讲透)
LangGraph的核心概念非常简单,我用最通俗的类比,让你一眼看懂,零基础也能完全理解:
| 核心概念 | 核心作用 | 通俗类比 |
|---|---|---|
| State(状态) | 整个工作流的全局共享数据池,所有节点都可以读取和修改,存储任务进度、中间结果、对话历史、全局记忆等所有数据 | 团队的共享项目文档、在线协作文档,所有人都能看、能改,所有修改都同步给所有人 |
| Node(节点) | 工作流的执行单元,每个节点对应一个具体的执行逻辑,可以是一个Agent、一个工具调用、一个数据处理函数 | 团队里的每个员工,负责完成自己的具体工作,处理完后把结果更新到共享文档里 |
| Edge(边) | 连接两个节点的流程线,定义执行顺序,A节点执行完成后,自动执行B节点 | 团队的工作流程,A员工做完自己的工作,交给B员工继续做 |
| Conditional Edge(条件边) | 工作流的分支逻辑核心,根据当前State的内容,动态决定下一个要执行的节点 | 团队的审批流程,主管审核通过,就进入下一个环节;审核不通过,就返回修改 |
| Entry Point(入口点) | 工作流的开始节点,整个流程从这里启动 | 项目启动会,项目正式开始 |
| Finish Point(结束点) | 工作流的终止节点,流程执行到这里就结束 | 项目验收完成,正式结项 |
一句话总结LangGraph的核心逻辑:我们定义好全局共享的State,然后搭建节点和边,组成一个完整的状态图,工作流启动后,节点会按照边的规则执行,不断修改State,直到走到结束节点。
3.3 环境准备与基础初始化
首先,我们安装必要的依赖,和上篇的环境完全兼容:
# 安装核心依赖
pip install langgraph langchain langchain-openai python-dotenv
然后,我们复用上篇的llm.py文件,封装了LLM调用函数,无需重复开发。接下来,我们先从最简单的Hello World例子开始,快速上手LangGraph的核心用法。
3.4 入门实战:从零实现第一个LangGraph工作流
我们先实现一个最简单的工作流,理解State、Node、Edge的核心用法,代码可直接复制运行:
# 01_langgraph_hello_world.py
import os
from dotenv import load_dotenv
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from operator import add
# 加载环境变量
load_dotenv()
# ===================== 第一步:定义全局State(共享数据池) =====================
# 用TypedDict定义State的结构,所有字段都有明确的类型
# Annotated[list, add] 表示这个字段是列表类型,每次修改会追加内容,而不是覆盖
class AgentState(TypedDict):
user_input: str # 用户的原始输入
messages: Annotated[list, add] # 对话历史消息列表
current_step: str # 当前执行步骤
final_result: str # 最终结果
# ===================== 第二步:初始化LLM和工作流 =====================
# 初始化LLM,复用上篇的配置
llm = ChatOpenAI(
model="gpt-3.5-turbo",
api_key=os.getenv("OPENAI_API_KEY"),
base_url=os.getenv("OPENAI_BASE_URL"),
temperature=0
)
# 初始化状态图,传入我们定义的State类型
workflow = StateGraph(AgentState)
# ===================== 第三步:定义Node节点(执行单元) =====================
# 节点1:需求分析节点
def requirement_analysis(state: AgentState):
print("===== 执行需求分析节点 =====")
user_input = state["user_input"]
# 调用LLM进行需求分析
prompt = f"请分析用户的需求,提炼核心目标和关键要求,用户输入:{user_input}"
response = llm.invoke(prompt).content
# 返回要更新的State字段,LangGraph会自动合并到全局State中
return {
"messages": [{"role": "assistant", "content": f"需求分析结果:{response}"}],
"current_step": "需求分析完成"
}
# 节点2:内容生成节点
def content_generation(state: AgentState):
print("===== 执行内容生成节点 =====")
# 从全局State中读取需求分析结果
messages = state["messages"]
requirement_result = messages[-1]["content"]
# 调用LLM生成内容
prompt = f"基于以下需求分析结果,生成对应的完整内容:{requirement_result}"
response = llm.invoke(prompt).content
# 更新State
return {
"messages": [{"role": "assistant", "content": f"生成的内容:{response}"}],
"current_step": "内容生成完成",
"final_result": response
}
# 节点3:结果审核节点
def result_review(state: AgentState):
print("===== 执行结果审核节点 =====")
final_result = state["final_result"]
user_input = state["user_input"]
# 调用LLM审核内容是否符合需求
prompt = f"请审核以下生成的内容是否符合用户的原始需求,只需要输出「通过」或「不通过」,用户需求:{user_input},生成内容:{final_result}"
review_result = llm.invoke(prompt).content.strip()
# 更新State
return {
"messages": [{"role": "assistant", "content": f"审核结果:{review_result}"}],
"current_step": f"审核{review_result}"
}
# ===================== 第四步:定义条件分支逻辑 =====================
# 审核结果判断函数,根据State的内容,决定下一个执行的节点
def review_router(state: AgentState):
review_result = state["messages"][-1]["content"]
if "通过" in review_result:
# 审核通过,走到结束节点
return END
else:
# 审核不通过,返回内容生成节点重新生成
return "content_generation"
# ===================== 第五步:搭建工作流,添加节点和边 =====================
# 1. 把所有节点添加到工作流中
workflow.add_node("requirement_analysis", requirement_analysis)
workflow.add_node("content_generation", content_generation)
workflow.add_node("result_review", result_review)
# 2. 设置入口点,工作流从需求分析节点开始
workflow.set_entry_point("requirement_analysis")
# 3. 添加普通边,定义执行顺序
workflow.add_edge("requirement_analysis", "content_generation")
workflow.add_edge("content_generation", "result_review")
# 4. 添加条件边,根据审核结果决定流程走向
workflow.add_conditional_edges(
"result_review", # 条件边的起点
review_router, # 路由判断函数
# 路由函数返回值和目标节点的映射
{
END: END,
"content_generation": "content_generation"
}
)
# ===================== 第六步:编译工作流,启动执行 =====================
# 编译工作流,生成可执行的应用
app = workflow.compile()
# 启动工作流,传入初始State
initial_state = {
"user_input": "写一篇500字左右的关于AI Agent发展趋势的文章,要求逻辑清晰,观点明确",
"messages": [],
"current_step": "项目启动",
"final_result": ""
}
# 执行工作流,获取最终结果
result = app.invoke(initial_state)
# 输出最终结果
print("\n===== 工作流执行完成 =====")
print(f"最终执行步骤:{result['current_step']}")
print(f"最终生成的文章:\n{result['final_result']}")
运行这段代码,你会看到工作流的完整执行过程:
- 先执行需求分析,提炼用户的核心需求
- 执行内容生成,基于需求分析生成文章
- 执行结果审核,判断文章是否符合要求
- 如果审核不通过,自动返回内容生成节点重新生成;如果审核通过,流程结束
恭喜你!你已经掌握了LangGraph的核心用法,这就是所有复杂Agent工作流的基础。
3.5 进阶实战:从零实现完整的多Agent协作工作流
现在,我们用LangGraph实现上一节定义的「项目经理+产品经理+开发工程师+测试工程师」完整的多Agent团队工作流,这是工业级可直接落地的代码。
3.5.1 第一步:定义State结构和角色Prompt
# 02_multi_agent_workflow.py
import os
from dotenv import load_dotenv
from typing import TypedDict, Annotated
from langgraph.graph import StateGraph, END
from langchain_openai import ChatOpenAI
from operator import add
from agent_roles import (
PROJECT_MANAGER_PROMPT,
PRODUCT_MANAGER_PROMPT,
DEVELOPER_PROMPT,
TEST_ENGINEER_PROMPT
)
# 加载环境变量
load_dotenv()
# 定义全局State结构
class MultiAgentState(TypedDict):
user_requirement: str # 用户的原始需求
messages: Annotated[list, add] # 全局对话历史
current_role: str # 当前执行的角色
prd_document: str # 产品需求文档
project_plan: str # 项目规划
code_content: str # 开发的代码内容
test_report: str # 测试报告
review_result: str # 审核结果
final_delivery: str # 最终交付物
# 初始化LLM
llm = ChatOpenAI(
model="gpt-4o", # 复杂任务推荐使用GPT-4o,效果更好
api_key=os.getenv("OPENAI_API_KEY"),
base_url=os.getenv("OPENAI_BASE_URL"),
temperature=0
)
# 初始化工作流
workflow = StateGraph(MultiAgentState)
3.5.2 第二步:实现每个角色的Agent节点
# 节点1:项目经理Agent-项目规划节点
def project_manager_planning(state: MultiAgentState):
print("\n===== 【项目经理】执行项目规划 =====")
user_requirement = state["user_requirement"]
# 构建Prompt,调用LLM生成项目规划
messages = [
{"role": "system", "content": PROJECT_MANAGER_PROMPT},
{"role": "user", "content": f"用户的原始需求是:{user_requirement},请你输出完整的项目里程碑规划和任务分配计划"}
]
project_plan = llm.invoke(messages).content
return {
"messages": [{"role": "assistant", "content": f"【项目经理】项目规划:\n{project_plan}"}],
"current_role": "项目经理",
"project_plan": project_plan
}
# 节点2:产品经理Agent-PRD编写节点
def product_manager_prd(state: MultiAgentState):
print("\n===== 【产品经理】编写PRD文档 =====")
user_requirement = state["user_requirement"]
project_plan = state["project_plan"]
messages = [
{"role": "system", "content": PRODUCT_MANAGER_PROMPT},
{"role": "user", "content": f"用户原始需求:{user_requirement}\n项目规划:{project_plan}\n请你输出完整的产品需求文档(PRD)"}
]
prd_document = llm.invoke(messages).content
return {
"messages": [{"role": "assistant", "content": f"【产品经理】PRD文档:\n{prd_document}"}],
"current_role": "产品经理",
"prd_document": prd_document
}
# 节点3:项目经理Agent-PRD审核节点
def project_manager_prd_review(state: MultiAgentState):
print("\n===== 【项目经理】审核PRD文档 =====")
user_requirement = state["user_requirement"]
prd_document = state["prd_document"]
messages = [
{"role": "system", "content": PROJECT_MANAGER_PROMPT},
{"role": "user", "content": f"用户原始需求:{user_requirement}\n产品经理输出的PRD文档:{prd_document}\n请你审核这份PRD文档是否符合用户需求,输出审核结果,只允许输出「PRD审核通过」或「PRD审核不通过」,并详细说明修改意见"}
]
review_result = llm.invoke(messages).content
return {
"messages": [{"role": "assistant", "content": f"【项目经理】PRD审核结果:\n{review_result}"}],
"current_role": "项目经理",
"review_result": review_result
}
# 节点4:开发工程师Agent-代码开发节点
def developer_coding(state: MultiAgentState):
print("\n===== 【开发工程师】进行代码开发 =====")
prd_document = state["prd_document"]
messages = [
{"role": "system", "content": DEVELOPER_PROMPT},
{"role": "user", "content": f"产品需求文档(PRD):{prd_document}\n请你基于PRD,输出完整的、可直接运行的Python代码"}
]
code_content = llm.invoke(messages).content
return {
"messages": [{"role": "assistant", "content": f"【开发工程师】开发完成的代码:\n{code_content}"}],
"current_role": "开发工程师",
"code_content": code_content
}
# 节点5:测试工程师Agent-测试执行节点
def test_engineer_testing(state: MultiAgentState):
print("\n===== 【测试工程师】执行测试 =====")
prd_document = state["prd_document"]
code_content = state["code_content"]
messages = [
{"role": "system", "content": TEST_ENGINEER_PROMPT},
{"role": "user", "content": f"产品需求文档(PRD):{prd_document}\n开发工程师输出的代码:{code_content}\n请你编写测试用例,执行测试,输出完整的测试报告和BUG清单"}
]
test_report = llm.invoke(messages).content
return {
"messages": [{"role": "assistant", "content": f"【测试工程师】测试报告:\n{test_report}"}],
"current_role": "测试工程师",
"test_report": test_report
}
# 节点6:项目经理Agent-最终验收节点
def project_manager_final_acceptance(state: MultiAgentState):
print("\n===== 【项目经理】最终项目验收 =====")
user_requirement = state["user_requirement"]
prd_document = state["prd_document"]
code_content = state["code_content"]
test_report = state["test_report"]
messages = [
{"role": "system", "content": PROJECT_MANAGER_PROMPT},
{"role": "user", "content": f"用户原始需求:{user_requirement}\nPRD文档:{prd_document}\n开发代码:{code_content}\n测试报告:{test_report}\n请你对整个项目进行最终验收,确认是否完全满足用户需求,输出验收结果和项目交付总结"}
]
final_delivery = llm.invoke(messages).content
return {
"messages": [{"role": "assistant", "content": f"【项目经理】最终交付总结:\n{final_delivery}"}],
"current_role": "项目经理",
"final_delivery": final_delivery,
"review_result": "项目验收通过"
}
3.5.3 第三步:定义条件路由与工作流搭建
# ===================== 定义条件路由函数 =====================
# PRD审核结果路由
def prd_review_router(state: MultiAgentState):
review_result = state["review_result"]
if "审核通过" in review_result:
# PRD审核通过,进入代码开发环节
return "developer_coding"
else:
# 审核不通过,返回产品经理修改PRD
return "product_manager_prd"
# 测试结果路由
def test_report_router(state: MultiAgentState):
test_report = state["test_report"]
if "测试通过" in test_report:
# 测试通过,进入最终验收环节
return "project_manager_final_acceptance"
else:
# 测试不通过,返回开发工程师修复代码
return "developer_coding"
# ===================== 搭建工作流 =====================
# 1. 添加所有节点
workflow.add_node("project_manager_planning", project_manager_planning)
workflow.add_node("product_manager_prd", product_manager_prd)
workflow.add_node("project_manager_prd_review", project_manager_prd_review)
workflow.add_node("developer_coding", developer_coding)
workflow.add_node("test_engineer_testing", test_engineer_testing)
workflow.add_node("project_manager_final_acceptance", project_manager_final_acceptance)
# 2. 设置入口点
workflow.set_entry_point("project_manager_planning")
# 3. 添加普通边
workflow.add_edge("project_manager_planning", "product_manager_prd")
workflow.add_edge("product_manager_prd", "project_manager_prd_review")
workflow.add_edge("developer_coding", "test_engineer_testing")
workflow.add_edge("project_manager_final_acceptance", END)
# 4. 添加条件边
workflow.add_conditional_edges(
"project_manager_prd_review",
prd_review_router,
{
"product_manager_prd": "product_manager_prd",
"developer_coding": "developer_coding"
}
)
workflow.add_conditional_edges(
"test_engineer_testing",
test_report_router,
{
"developer_coding": "developer_coding",
"project_manager_final_acceptance": "project_manager_final_acceptance"
}
)
# ===================== 编译并运行工作流 =====================
app = workflow.compile()
# 初始状态,传入用户需求
initial_state = {
"user_requirement": "用Python开发一个用户管理系统的后端接口,包含用户注册、登录、信息查询、修改、删除功能,基于FastAPI框架,使用内存存储数据,无需数据库",
"messages": [],
"current_role": "项目启动",
"prd_document": "",
"project_plan": "",
"code_content": "",
"test_report": "",
"review_result": "",
"final_delivery": ""
}
# 执行工作流
result = app.invoke(initial_state)
# 输出最终结果
print("\n\n===== 多Agent项目执行完成 =====")
print(f"最终验收结果:{result['review_result']}")
print(f"项目最终交付总结:\n{result['final_delivery']}")
3.5.4 运行效果
运行这段代码,你会看到一个完整的多Agent团队,像真实的项目团队一样,自动完成从项目规划→PRD编写→PRD审核→代码开发→测试→最终验收的全流程,审核不通过会自动返回修改,直到项目完成。
这就是目前企业级AI Agent开发的标准模式,你只需要修改角色Prompt和节点逻辑,就能适配任何复杂的业务场景。
3.6 LangGraph生产级高级特性
这里给你介绍几个LangGraph生产级必备的高级特性,帮你解决落地过程中的核心问题:
- 状态持久化与断点续跑:通过
Checkpointer可以把工作流的State持久化到数据库,执行中断后可以从断点继续执行,生产环境必备 - 子图嵌套:可以把复杂的工作流拆分成多个子图,模块化管理,比如把产品开发流程拆成PRD子图、开发子图、测试子图,便于维护
- 并行节点执行:支持并行执行多个节点,比如多个Agent同时做市场调研,大幅提升执行效率
- 人机交互:支持在工作流中加入人工审核环节,流程执行到对应节点会暂停,等待人工输入后继续执行,适合高风险场景
- 流式输出:支持流式输出工作流的执行过程,实时展示Agent的执行进度,适合前端界面展示
4. 无限工具生态:MCP 协议核心原理与Agent全链路集成实战
我们在上篇实现了基础的工具调用,但传统的工具开发模式有一个致命的痛点:每新增一个工具,都要单独开发代码、写Prompt、适配接口、维护更新,开发成本极高,无法快速扩展工具生态。
而MCP协议(Model Context Protocol),就是解决这个问题的终极方案,它是目前AI Agent工具生态的绝对主流标准,能让你的Agent一键接入无限的工具能力,无需单独开发。
4.1 MCP协议到底是什么?
MCP协议是由Anthropic牵头,联合OpenAI、Google等大厂共同推出的标准化大模型上下文交互协议,它的核心定位是「大模型和外部世界之间的统一桥梁」。
通俗来说,MCP协议就像手机上的「应用商店」:
- 工具提供方只需要按照MCP协议,开发一个MCP Server,就能把自己的工具/服务开放给所有支持MCP的Agent使用
- 你的Agent只需要实现一个MCP Client,就能一键接入所有支持MCP协议的工具,无需关心工具的底层实现,不用写任何适配代码
它彻底解决了传统工具调用的三大痛点:
- 开发成本极高:传统模式每个工具都要单独开发适配,MCP协议一键接入所有工具
- 生态碎片化:不同的Agent框架、不同的大模型,工具调用格式不统一,MCP是全行业统一的标准
- 维护成本极高:工具更新迭代,不需要修改你的Agent代码,MCP Server会自动同步更新
目前,MCP协议已经有了极其丰富的生态,包括文件系统、搜索引擎、Git、数据库、Slack、飞书、AWS、阿里云等上百种工具,只要接入MCP协议,你的Agent就能立刻拥有这些能力。
4.2 MCP协议的核心架构
MCP协议采用客户端-服务端(C/S)架构,核心分为两个部分:
- MCP Client:运行在你的Agent系统中,负责和MCP Server通信,获取工具列表、调用工具、接收返回结果,相当于你的Agent的「工具管理器」
- MCP Server:由工具提供方开发,负责实现具体的工具逻辑,暴露标准化的接口给MCP Client调用,相当于「工具应用」
两者之间通过标准化的JSON-RPC协议通信,支持三种核心能力:
- 工具调用:Client获取Server提供的工具列表,调用指定的工具,获取执行结果
- 上下文推送:Server主动向Client推送上下文信息,比如文件更新、实时消息
- 实时通知:Server主动推送事件通知,比如代码提交、消息提醒
4.3 环境准备与基础初始化
首先,我们安装MCP协议的官方SDK,和我们之前的Agent系统完全兼容:
# 安装MCP核心SDK
pip install mcp openai python-dotenv
4.4 入门实战:从零实现MCP Client与Server
我们先从零实现一个最简单的MCP Server和MCP Client,理解MCP协议的核心工作流程,代码可直接复制运行。
4.4.1 第一步:实现一个简单的MCP Server
我们实现一个提供「计算器工具」和「天气查询工具」的MCP Server,保存为mcp_server.py:
# mcp_server.py
from mcp.server import Server
from mcp.types import Tool, TextContent
import asyncio
# 初始化MCP Server,设置名称
server = Server("demo-mcp-server")
# ===================== 注册工具1:计算器工具 =====================
@server.tool()
async def calculator(expression: str) -> list[TextContent]:
"""
执行数学计算,支持加减乘除、括号等四则运算
:param expression: 数学计算表达式,比如 "100*20+50"
"""
try:
result = eval(expression, {"__builtins__": None}, {})
return [
TextContent(
type="text",
text=f"计算结果:{expression} = {result}"
)
]
except Exception as e:
return [
TextContent(
type="text",
text=f"计算错误:{str(e)}"
)
]
# ===================== 注册工具2:天气查询工具 =====================
@server.tool()
async def get_weather(city: str, date: str) -> list[TextContent]:
"""
查询指定城市指定日期的天气信息
:param city: 要查询的城市,比如 "广州番禺"
:param date: 要查询的日期,格式为 "YYYY-MM-DD"
"""
# 这里模拟天气查询,实际场景可以对接天气API
mock_weather_data = {
"广州番禺": {
"2026-04-18": "多云,气温22-28℃,南风3级,湿度65%",
"2026-04-19": "小雨,气温20-25℃,东风2级,湿度80%"
},
"北京朝阳": {
"2026-04-18": "晴,气温10-22℃,北风4级,湿度30%",
"2026-04-19": "多云,气温12-24℃,南风3级,湿度35%"
}
}
weather = mock_weather_data.get(city, {}).get(date, "暂无该城市该日期的天气数据")
return [
TextContent(
type="text",
text=f"{city} {date} 天气:{weather}"
)
]
# ===================== 启动MCP Server =====================
async def main():
# 使用stdio模式运行Server,和Client通过标准输入输出通信
await server.run_stdio()
if __name__ == "__main__":
asyncio.run(main())
4.4.2 第二步:实现MCP Client,接入Server并调用工具
现在,我们实现MCP Client,对接我们刚才开发的MCP Server,获取工具列表,调用工具,并且集成到我们的LLM中,让大模型能自动调用MCP工具,保存为mcp_client.py:
# mcp_client.py
import os
import asyncio
from dotenv import load_dotenv
from openai import AsyncOpenAI
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client
# 加载环境变量
load_dotenv()
# 初始化AsyncOpenAI客户端
client = AsyncOpenAI(
api_key=os.getenv("OPENAI_API_KEY"),
base_url=os.getenv("OPENAI_BASE_URL")
)
# ===================== 核心函数:MCP工具调用与LLM集成 =====================
async def run_mcp_agent(user_query):
# 1. 配置MCP Server参数,指定我们刚才开发的Server脚本
server_params = StdioServerParameters(
command="python",
args=["mcp_server.py"] # 我们的MCP Server脚本路径
)
# 2. 创建stdio客户端,连接MCP Server
async with stdio_client(server_params) as (read, write):
# 3. 创建Client会话,初始化连接
async with ClientSession(read, write) as session:
# 初始化会话,握手连接
await session.initialize()
# 4. 获取MCP Server提供的所有工具列表
tools_result = await session.list_tools()
available_tools = tools_result.tools
print(f"成功获取MCP工具列表:{[tool.name for tool in available_tools]}")
# 5. 把MCP工具转换成OpenAI支持的Function Calling格式
openai_tools = []
for tool in available_tools:
openai_tools.append({
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.inputSchema
}
})
# 6. 调用LLM,让大模型自动判断是否需要调用工具
messages = [
{"role": "system", "content": "你是一个智能AI助手,你可以通过调用MCP工具完成用户的需求,必须严格按照工具的参数要求调用,严禁编造参数。如果不需要调用工具,直接回答用户的问题。"},
{"role": "user", "content": user_query}
]
response = await client.chat.completions.create(
model="gpt-3.5-turbo",
messages=messages,
tools=openai_tools,
temperature=0
)
response_message = response.choices[0].message
# 7. 如果大模型决定调用工具,执行工具调用
if response_message.tool_calls:
# 把大模型的回复加入上下文
messages.append(response_message)
# 遍历所有工具调用,依次执行
for tool_call in response_message.tool_calls:
tool_name = tool_call.function.name
tool_args = tool_call.function.arguments
call_id = tool_call.id
print(f"\n大模型调用工具:{tool_name},参数:{tool_args}")
# 调用MCP Server的工具
tool_result = await session.call_tool(
tool_name=tool_name,
arguments=eval(tool_args)
)
# 提取工具返回的内容
tool_content = "\n".join([content.text for content in tool_result.content])
print(f"工具执行结果:{tool_content}")
# 把工具执行结果加入上下文
messages.append({
"role": "tool",
"tool_call_id": call_id,
"content": tool_content
})
# 8. 把工具执行结果传给LLM,生成最终回答
final_response = await client.chat.completions.create(
model="gpt-3.5-turbo",
messages=messages,
temperature=0
)
final_answer = final_response.choices[0].message.content
return final_answer
# 不需要调用工具,直接返回大模型的回答
else:
return response_message.content
# ===================== 运行测试 =====================
async def main():
# 测试1:计算器工具调用
query1 = "请问 234*567+890 的结果是多少?"
answer1 = await run_mcp_agent(query1)
print(f"\n用户问题:{query1}")
print(f"最终回答:{answer1}")
# 测试2:天气查询工具调用
query2 = "2026年4月18日广州番禺的天气是多少?"
answer2 = await run_mcp_agent(query2)
print(f"\n用户问题:{query2}")
print(f"最终回答:{answer2}")
if __name__ == "__main__":
asyncio.run(main())
4.4.3 运行效果
先运行mcp_server.py,再运行mcp_client.py,你会看到:
- Client自动连接MCP Server,获取工具列表
- 大模型根据用户的问题,自动判断需要调用的工具,生成正确的参数
- Client调用MCP Server的工具,获取执行结果
- 大模型基于工具执行结果,生成最终的回答
恭喜你!你已经掌握了MCP协议的核心用法,现在,你可以接入任何支持MCP协议的工具,无需修改任何核心代码。
4.5 进阶实战:把MCP协议集成到LangGraph多Agent系统中
现在,我们把MCP协议集成到我们之前开发的LangGraph多Agent系统中,让所有的Agent都能一键使用MCP提供的无限工具能力。
核心修改非常简单,我们只需要在Agent的节点中,加入MCP Client的工具调用逻辑,就能让Agent拥有MCP的所有工具能力。这里我们实现一个集成了MCP工具的代码开发Agent,能自动调用MCP的文件系统工具,把生成的代码写入本地文件:
# 03_mcp_langgraph_integration.py
# 核心修改:在开发工程师节点中集成MCP工具调用,把生成的代码写入本地文件
async def developer_coding_with_mcp(state: MultiAgentState):
print("\n===== 【开发工程师】进行代码开发(集成MCP工具) =====")
prd_document = state["prd_document"]
# 1. 连接MCP Server,获取文件系统工具
server_params = StdioServerParameters(
command="npx",
args=["@modelcontextprotocol/server-filesystem", "./"] # 官方文件系统MCP Server,访问当前目录
)
async with stdio_client(server_params) as (read, write):
async with ClientSession(read, write) as session:
await session.initialize()
# 获取工具列表
tools_result = await session.list_tools()
available_tools = tools_result.tools
# 转换成OpenAI Function格式
openai_tools = []
for tool in available_tools:
openai_tools.append({
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.inputSchema
}
})
# 2. 调用LLM生成代码,自动调用MCP工具写入文件
messages = [
{"role": "system", "content": DEVELOPER_PROMPT + "\n你可以调用文件系统工具,把生成的代码写入到本地文件中,文件名建议为user_management.py"},
{"role": "user", "content": f"产品需求文档(PRD):{prd_document}\n请你基于PRD,输出完整的Python代码,并且调用工具把代码写入到本地文件中"}
]
response = await client.chat.completions.create(
model="gpt-4o",
messages=messages,
tools=openai_tools,
temperature=0
)
response_message = response.choices[0].message
code_content = response_message.content
# 处理工具调用
if response_message.tool_calls:
messages.append(response_message)
for tool_call in response_message.tool_calls:
tool_name = tool_call.function.name
tool_args = eval(tool_call.function.arguments)
print(f"调用MCP工具:{tool_name},参数:{tool_args}")
tool_result = await session.call_tool(tool_name=tool_name, arguments=tool_args)
tool_content = "\n".join([content.text for content in tool_result.content])
print(f"工具执行结果:{tool_content}")
messages.append({
"role": "tool",
"tool_call_id": tool_call.id,
"content": tool_content
})
# 获取最终结果
final_response = await client.chat.completions.create(
model="gpt-4o",
messages=messages,
temperature=0
)
code_content = final_response.choices[0].message.content
return {
"messages": [{"role": "assistant", "content": f"【开发工程师】开发完成的代码:\n{code_content}"}],
"current_role": "开发工程师",
"code_content": code_content
}
4.6 MCP协议生态推荐
这里给你推荐几个官方维护的、生产级可用的MCP Server,你可以直接接入,无需自己开发:
- 文件系统Server:
@modelcontextprotocol/server-filesystem,访问本地文件系统,读写文件 - GitHub Server:
@modelcontextprotocol/server-github,操作GitHub仓库,提交代码、创建PR、查询Issue - PostgreSQL Server:
@modelcontextprotocol/server-postgres,连接PostgreSQL数据库,执行SQL查询 - 搜索引擎Server:
@modelcontextprotocol/server-brave-search,调用Brave搜索引擎,获取实时网络信息 - 飞书/钉钉Server:官方提供的企业协作工具Server,发送消息、查询文档、处理审批
只需要修改MCP Client中的Server启动参数,就能一键接入这些工具,让你的Agent拥有无限的能力扩展。
5. 智能跃迁:Agent 规划与反思能力进阶(自我修正、深度思考)
在上篇中,我们实现了基础的ReAct框架,让Agent具备了「思考-行动-观察」的基础能力。但面对复杂任务,基础的ReAct很容易出现「规划跑偏、错误无法修正、陷入死循环」的问题,核心原因是Agent缺乏长期规划能力和自我反思修正能力。
这一节,我们就给Agent装上「深度思考的大脑」,实现工业级的规划与反思能力,让Agent从「只会按步骤执行」,跃迁到「会做全局规划、会自我复盘、会修正错误、会优化方案」的高级智能体。
5.1 规划与反思的核心价值
人类解决复杂问题的核心逻辑是:先做全局规划,拆解成可执行的步骤,执行过程中不断复盘反思,修正错误,优化方案,直到完成目标。
而规划与反思能力,就是让Agent模拟人类的这种思考模式,解决三大核心痛点:
- 解决规划跑偏问题:面对复杂任务,先做全局规划,拆解成里程碑,避免执行到一半发现方向完全错误
- 解决错误无法修正问题:执行过程中不断复盘,发现错误立刻修正,避免错误传递下去,导致最终结果完全不符合要求
- 解决死循环问题:通过反思,判断当前的方案是否可行,及时更换思路,避免陷入重复执行相同错误的死循环
5.2 核心框架1:Tree of Thoughts(ToT,思维树)深度规划框架
5.2.1 核心原理
基础的ReAct框架是「单一路径的线性思考」,就像人走迷宫,一条路走到黑,走错了只能从头再来。而ToT思维树框架,是「多路径的树状思考」,它会把复杂任务拆解成多个思考分支,每个分支评估可行性,选择最优的路径继续推进,就像人走迷宫时,会先看所有的岔路,评估哪条路最可能通向出口,再走进去。
ToT的核心执行流程:
- 任务拆解:把复杂任务拆解成多个思考步骤
- 分支生成:每个步骤生成多个可能的思考方向/解决方案
- 状态评估:对每个分支进行评估,打分判断可行性,淘汰无效分支
- 路径选择:选择评分最高的最优分支,继续推进
- 迭代优化:重复以上步骤,直到任务完成
5.2.2 从零实现ToT框架
# tot_framework.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
# 加载环境变量
load_dotenv()
# 初始化LLM
llm = ChatOpenAI(
model="gpt-4o",
api_key=os.getenv("OPENAI_API_KEY"),
base_url=os.getenv("OPENAI_BASE_URL"),
temperature=0.7
)
class TreeOfThoughts:
def __init__(self, max_depth=3, branch_num=3, pass_threshold=0.7):
self.max_depth = max_depth # 思维树的最大深度
self.branch_num = branch_num # 每个节点生成的分支数量
self.pass_threshold = pass_threshold # 分支通过的评分阈值
def generate_thought_branches(self, current_state, task_goal, depth):
"""生成当前步骤的多个思考分支"""
prompt = f"""
你是一个专业的问题解决专家,当前正在处理任务:{task_goal}
当前执行深度:{depth},当前任务状态:{current_state}
请你生成{self.branch_num}个不同的、可行的下一步执行方案,每个方案必须明确、具体、可落地,能推进任务完成。
每个方案单独一行,用数字编号,不要有多余的内容。
"""
response = llm.invoke(prompt).content.strip()
branches = [line.strip() for line in response.split("\n") if line.strip()]
return branches[:self.branch_num]
def evaluate_branch(self, branch, current_state, task_goal):
"""评估每个分支的可行性,给出0-1的评分"""
prompt = f"""
你是一个专业的方案评估专家,当前任务目标:{task_goal}
当前任务状态:{current_state}
待评估的方案:{branch}
请你评估这个方案的可行性、合理性、对任务的推进作用,给出0-1的评分,0分表示完全不可行,1分表示非常优秀的方案。
你只需要输出评分数字,不要有任何多余的内容。
"""
response = llm.invoke(prompt).content.strip()
try:
score = float(response)
return max(0, min(1, score)) # 限制评分在0-1之间
except:
return 0
def select_best_branches(self, branches, current_state, task_goal):
"""筛选出评分高于阈值的最优分支"""
scored_branches = []
for branch in branches:
score = self.evaluate_branch(branch, current_state, task_goal)
if score >= self.pass_threshold:
scored_branches.append((score, branch))
# 按评分从高到低排序
scored_branches.sort(reverse=True, key=lambda x: x[0])
return [branch for score, branch in scored_branches]
def solve_task(self, task_goal):
"""执行ToT框架,解决任务"""
print(f"===== 启动思维树框架,任务目标:{task_goal} =====")
# 初始化根节点
current_state = "任务刚刚启动,还没有执行任何步骤"
depth = 0
final_solution = ""
while depth < self.max_depth:
depth += 1
print(f"\n----- 思维树深度 {depth}/{self.max_depth} -----")
# 1. 生成思考分支
branches = self.generate_thought_branches(current_state, task_goal, depth)
print(f"生成的{len(branches)}个思考分支:")
for i, branch in enumerate(branches, 1):
print(f"{i}. {branch}")
# 2. 评估并筛选最优分支
best_branches = self.select_best_branches(branches, current_state, task_goal)
if not best_branches:
print("所有分支均未通过评估,任务失败")
break
print(f"\n通过评估的最优分支:")
for i, branch in enumerate(best_branches, 1):
print(f"{i}. {branch}")
# 3. 选择评分最高的分支执行
selected_branch = best_branches[0]
print(f"\n选择执行的最优分支:{selected_branch}")
# 4. 执行分支,更新当前状态
prompt = f"""
任务目标:{task_goal}
当前状态:{current_state}
执行方案:{selected_branch}
请你执行这个方案,输出执行后的任务状态,明确说明当前任务的进展、已完成的内容、下一步需要做的事情。
如果任务已经完成,请输出「任务完成」,并附上最终的解决方案。
"""
execution_result = llm.invoke(prompt).content.strip()
current_state = execution_result
# 5. 判断任务是否完成
if "任务完成" in execution_result:
final_solution = execution_result
print(f"\n===== 任务完成,最终解决方案:\n{final_solution} =====")
return final_solution
# 达到最大深度,输出最终状态
if not final_solution:
final_solution = current_state
print(f"\n===== 达到最大深度,任务最终状态:\n{final_solution} =====")
return final_solution
# 测试ToT框架
if __name__ == "__main__":
tot = TreeOfThoughts(max_depth=4, branch_num=3, pass_threshold=0.6)
tot.solve_task("用Python开发一个简单的学生成绩管理系统,要求有增删改查功能,基于控制台交互,代码可直接运行")
5.3 核心框架2:Reflexion 自我反思修正框架
5.3.1 核心原理
Reflexion框架是由普林斯顿大学和Google DeepMind联合提出的,核心是让Agent模拟人类的「复盘反思」能力:在执行完一个步骤后,对结果进行复盘反思,判断是否有错误、哪里做得不好、怎么优化修正,然后基于反思结果,重新执行,迭代优化,直到输出符合要求的结果。
Reflexion的核心执行流程:
- 任务执行:Agent基于需求,生成初始结果
- 结果评估:对生成的结果进行评估,找出错误、不足、不符合要求的地方
- 反思总结:基于评估结果,总结反思,生成具体的修改优化方案
- 迭代修正:基于反思结果,重新生成/修改结果
- 循环终止:重复以上步骤,直到结果符合要求,或达到最大迭代次数
5.3.2 从零实现Reflexion框架
# reflexion_framework.py
import os
from dotenv import load_dotenv
from langchain_openai import ChatOpenAI
# 加载环境变量
load_dotenv()
# 初始化LLM
llm = ChatOpenAI(
model="gpt-4o",
api_key=os.getenv("OPENAI_API_KEY"),
base_url=os.getenv("OPENAI_BASE_URL"),
temperature=0
)
class ReflexionAgent:
def __init__(self, max_iterations=3):
self.max_iterations = max_iterations # 最大迭代次数
self.reflection_history = [] # 反思历史
def generate_initial_result(self, task_requirement):
"""生成初始结果"""
prompt = f"""
请你完成以下任务,输出完整的结果。
任务要求:{task_requirement}
"""
return llm.invoke(prompt).content.strip()
def evaluate_result(self, task_requirement, current_result):
"""评估结果,找出错误和不足"""
prompt = f"""
你是一个专业的结果评估专家,请你严格按照任务要求,评估当前的结果,找出所有的错误、不足、不符合要求的地方。
任务要求:{task_requirement}
当前结果:{current_result}
请你输出详细的评估报告,明确指出:
1. 符合要求的地方
2. 存在的错误、不足、不符合要求的地方,每个问题必须具体、明确
3. 具体的修改优化建议
如果结果完全符合要求,请输出「评估通过」,不要有多余的内容。
"""
return llm.invoke(prompt).content.strip()
def generate_reflection(self, evaluation_report):
"""基于评估报告,生成反思总结"""
prompt = f"""
请你基于以下评估报告,进行深刻的反思总结,明确说明:
1. 之前的结果存在的核心问题是什么
2. 导致这些问题的原因是什么
3. 接下来的修改优化的具体步骤和方案
评估报告:{evaluation_report}
"""
reflection = llm.invoke(prompt).content.strip()
self.reflection_history.append(reflection)
return reflection
def revise_result(self, task_requirement, current_result, reflection):
"""基于反思结果,修改优化结果"""
prompt = f"""
请你基于以下反思总结,修改优化当前的结果,严格按照任务要求,修正所有错误,弥补不足。
任务要求:{task_requirement}
当前结果:{current_result}
反思总结:{reflection}
之前的反思历史:{self.reflection_history}
请你输出修改优化后的完整结果,不要只输出修改的部分。
"""
return llm.invoke(prompt).content.strip()
def run(self, task_requirement):
"""启动Reflexion Agent,执行任务"""
print(f"===== 启动Reflexion反思Agent,任务要求:{task_requirement} =====")
# 1. 生成初始结果
current_result = self.generate_initial_result(task_requirement)
iteration = 0
print(f"\n----- 第{iteration}次迭代,初始结果生成完成 -----")
print(f"初始结果:\n{current_result}")
while iteration < self.max_iterations:
iteration += 1
print(f"\n----- 第{iteration}次迭代,开始评估结果 -----")
# 2. 评估结果
evaluation_report = self.evaluate_result(task_requirement, current_result)
print(f"评估报告:\n{evaluation_report}")
# 3. 判断是否评估通过
if "评估通过" in evaluation_report:
print(f"\n===== 任务完成,结果评估通过,最终结果:\n{current_result} =====")
return current_result
# 4. 生成反思总结
reflection = self.generate_reflection(evaluation_report)
print(f"\n反思总结:\n{reflection}")
# 5. 修改优化结果
current_result = self.revise_result(task_requirement, current_result, reflection)
print(f"\n修改优化后的结果:\n{current_result}")
# 达到最大迭代次数,输出最终结果
print(f"\n===== 达到最大迭代次数,输出最终结果:\n{current_result} =====")
return current_result
# 测试Reflexion Agent
if __name__ == "__main__":
agent = ReflexionAgent(max_iterations=3)
agent.run("写一个Python函数,实现快速排序算法,要求有详细的注释,处理边界情况,包含测试用例,代码可直接运行")
5.4 框架集成:把规划与反思能力集成到LangGraph工作流中
现在,我们把ToT和Reflexion框架集成到我们之前开发的LangGraph多Agent系统中,让Agent具备深度规划和自我修正能力。核心修改非常简单:
- 在项目经理节点中,集成ToT框架,做全局的项目规划,选择最优的执行方案
- 在每个角色的节点中,集成Reflexion框架,对输出的结果进行自我反思和修正,确保交付物的质量
- 在审核节点中,加入多轮反思修正逻辑,审核不通过时,自动触发反思和修改,直到通过审核
6. 全链路落地:从零搭建企业级生产Agent平台(前后端+部署)
到这里,我们已经掌握了多Agent协作、工作流引擎、工具生态、高级智能所有的核心能力。现在,我们要把这些能力整合起来,从零搭建一个完整的、企业级可落地的AI Agent平台,实现从Demo到生产的最终跃迁。
6.1 生产级Agent平台的核心架构
我们采用业界标准的分层架构,保证系统的稳定性、可扩展性、可维护性,分为5个核心层级:
| 层级 | 核心作用 | 技术选型 |
|---|---|---|
| 接入层 | 用户认证、权限控制、API网关、限流熔断 | FastAPI、JWT认证、接口限流 |
| 应用层 | Agent管理、会话管理、任务调度、工作流引擎 | LangGraph、异步任务调度 |
| 能力层 | LLM接入、工具管理、MCP协议集成、记忆管理 | OpenAI API、MCP Client、Chroma向量数据库 |
| 存储层 | 关系型数据库、向量数据库、缓存 | SQLite/MySQL、Chroma、Redis |
| 前端层 | 用户交互界面、会话窗口、Agent配置、日志监控 | Streamlit(零基础快速开发) |
6.2 环境准备
安装生产级平台所需的所有依赖:
pip install fastapi uvicorn streamlit sqlalchemy python-multipart python-jose[cryptography] passlib python-dotenv langgraph langchain-openai chromadb mcp
6.3 第一步:后端API开发(FastAPI)
我们先开发后端API,实现用户认证、会话管理、Agent执行、日志查询等核心功能,创建backend/main.py:
# backend/main.py
import os
from dotenv import load_dotenv
from datetime import datetime, timedelta
from typing import Optional
from fastapi import FastAPI, Depends, HTTPException, status
from fastapi.security import OAuth2PasswordBearer, OAuth2PasswordRequestForm
from jose import JWTError, jwt
from passlib.context import CryptContext
from pydantic import BaseModel
from sqlalchemy import create_engine, Column, Integer, String, Text, DateTime, ForeignKey
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker, Session
from langchain_openai import ChatOpenAI
from agent import IntelligentAgent
# 加载环境变量
load_dotenv()
# ===================== 配置项 =====================
SECRET_KEY = os.getenv("SECRET_KEY", "your-secret-key-here-change-in-production")
ALGORITHM = "HS256"
ACCESS_TOKEN_EXPIRE_MINUTES = 30
# 数据库配置
SQLALCHEMY_DATABASE_URL = "sqlite:///./agent_platform.db"
engine = create_engine(SQLALCHEMY_DATABASE_URL, connect_args={"check_same_thread": False})
SessionLocal = sessionmaker(autocommit=False, autoflush=False, bind=engine)
Base = declarative_base()
# 密码加密上下文
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
oauth2_scheme = OAuth2PasswordBearer(tokenUrl="token")
# 初始化FastAPI应用
app = FastAPI(title="企业级AI Agent平台", version="1.0.0")
# ===================== 数据库模型定义 =====================
class DBUser(Base):
__tablename__ = "users"
id = Column(Integer, primary_key=True, index=True)
username = Column(String, unique=True, index=True, nullable=False)
hashed_password = Column(String, nullable=False)
email = Column(String, unique=True, index=True)
created_at = Column(DateTime, default=datetime.now)
class DBSession(Base):
__tablename__ = "sessions"
id = Column(Integer, primary_key=True, index=True)
user_id = Column(Integer, ForeignKey("users.id"), nullable=False)
session_name = Column(String, nullable=False, default="新对话")
created_at = Column(DateTime, default=datetime.now)
updated_at = Column(DateTime, default=datetime.now, onupdate=datetime.now)
class DBMessage(Base):
__tablename__ = "messages"
id = Column(Integer, primary_key=True, index=True)
session_id = Column(Integer, ForeignKey("sessions.id"), nullable=False)
role = Column(String, nullable=False) # user/assistant
content = Column(Text, nullable=False)
created_at = Column(DateTime, default=datetime.now)
# 创建数据库表
Base.metadata.create_all(bind=engine)
# ===================== Pydantic模型定义 =====================
class UserCreate(BaseModel):
username: str
password: str
email: Optional[str] = None
class Token(BaseModel):
access_token: str
token_type: str
class TokenData(BaseModel):
username: Optional[str] = None
class SessionCreate(BaseModel):
session_name: Optional[str] = "新对话"
class MessageCreate(BaseModel):
session_id: int
content: str
# ===================== 工具函数 =====================
# 数据库依赖
def get_db():
db = SessionLocal()
try:
yield db
finally:
db.close()
# 密码校验
def verify_password(plain_password, hashed_password):
return pwd_context.verify(plain_password, hashed_password)
# 密码加密
def get_password_hash(password):
return pwd_context.hash(password)
# 获取用户
def get_user(db: Session, username: str):
return db.query(DBUser).filter(DBUser.username == username).first()
# 认证用户
def authenticate_user(db: Session, username: str, password: str):
user = get_user(db, username)
if not user:
return False
if not verify_password(password, user.hashed_password):
return False
return user
# 创建访问令牌
def create_access_token(data: dict, expires_delta: Optional[timedelta] = None):
to_encode = data.copy()
if expires_delta:
expire = datetime.utcnow() + expires_delta
else:
expire = datetime.utcnow() + timedelta(minutes=15)
to_encode.update({"exp": expire})
encoded_jwt = jwt.encode(to_encode, SECRET_KEY, algorithm=ALGORITHM)
return encoded_jwt
# 获取当前登录用户
async def get_current_user(token: str = Depends(oauth2_scheme), db: Session = Depends(get_db)):
credentials_exception = HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="无法验证凭据",
headers={"WWW-Authenticate": "Bearer"},
)
try:
payload = jwt.decode(token, SECRET_KEY, algorithms=[ALGORITHM])
username: str = payload.get("sub")
if username is None:
raise credentials_exception
token_data = TokenData(username=username)
except JWTError:
raise credentials_exception
user = get_user(db, username=token_data.username)
if user is None:
raise credentials_exception
return user
# ===================== API接口 =====================
# 用户注册
@app.post("/register", status_code=status.HTTP_201_CREATED)
def register(user: UserCreate, db: Session = Depends(get_db)):
db_user = get_user(db, username=user.username)
if db_user:
raise HTTPException(status_code=400, detail="用户名已存在")
hashed_password = get_password_hash(user.password)
db_user = DBUser(
username=user.username,
hashed_password=hashed_password,
email=user.email
)
db.add(db_user)
db.commit()
db.refresh(db_user)
return {"message": "用户注册成功", "username": db_user.username}
# 用户登录,获取令牌
@app.post("/token", response_model=Token)
def login(form_data: OAuth2PasswordRequestForm = Depends(), db: Session = Depends(get_db)):
user = authenticate_user(db, form_data.username, form_data.password)
if not user:
raise HTTPException(
status_code=status.HTTP_401_UNAUTHORIZED,
detail="用户名或密码错误",
headers={"WWW-Authenticate": "Bearer"},
)
access_token_expires = timedelta(minutes=ACCESS_TOKEN_EXPIRE_MINUTES)
access_token = create_access_token(
data={"sub": user.username}, expires_delta=access_token_expires
)
return {"access_token": access_token, "token_type": "bearer"}
# 获取当前用户信息
@app.get("/users/me")
def read_users_me(current_user: DBUser = Depends(get_current_user)):
return {"username": current_user.username, "email": current_user.email, "created_at": current_user.created_at}
# 创建新的对话会话
@app.post("/sessions")
def create_session(
session: SessionCreate,
current_user: DBUser = Depends(get_current_user),
db: Session = Depends(get_db)
):
db_session = DBSession(
user_id=current_user.id,
session_name=session.session_name
)
db.add(db_session)
db.commit()
db.refresh(db_session)
return {"session_id": db_session.id, "session_name": db_session.session_name, "created_at": db_session.created_at}
# 获取用户的所有会话
@app.get("/sessions")
def get_user_sessions(
current_user: DBUser = Depends(get_current_user),
db: Session = Depends(get_db)
):
sessions = db.query(DBSession).filter(DBSession.user_id == current_user.id).order_by(DBSession.updated_at.desc()).all()
return [{"session_id": s.id, "session_name": s.session_name, "created_at": s.created_at, "updated_at": s.updated_at} for s in sessions]
# 获取会话的所有消息
@app.get("/sessions/{session_id}/messages")
def get_session_messages(
session_id: int,
current_user: DBUser = Depends(get_current_user),
db: Session = Depends(get_db)
):
# 校验会话归属
session = db.query(DBSession).filter(DBSession.id == session_id, DBSession.user_id == current_user.id).first()
if not session:
raise HTTPException(status_code=404, detail="会话不存在")
# 获取消息
messages = db.query(DBMessage).filter(DBMessage.session_id == session_id).order_by(DBMessage.created_at.asc()).all()
return [{"id": m.id, "role": m.role, "content": m.content, "created_at": m.created_at} for m in messages]
# 发送消息,执行Agent
@app.post("/messages")
def send_message(
message: MessageCreate,
current_user: DBUser = Depends(get_current_user),
db: Session = Depends(get_db)
):
# 校验会话归属
session = db.query(DBSession).filter(DBSession.id == message.session_id, DBSession.user_id == current_user.id).first()
if not session:
raise HTTPException(status_code=404, detail="会话不存在")
# 保存用户消息
user_message = DBMessage(
session_id=message.session_id,
role="user",
content=message.content
)
db.add(user_message)
db.commit()
# 初始化Agent,执行用户请求
agent = IntelligentAgent(user_id=f"user_{current_user.id}")
assistant_content = agent.run(message.content)
# 保存助手消息
assistant_message = DBMessage(
session_id=message.session_id,
role="assistant",
content=assistant_content
)
db.add(assistant_message)
db.commit()
# 更新会话的更新时间
session.updated_at = datetime.now()
db.commit()
return {
"user_message": {"id": user_message.id, "content": user_message.content, "created_at": user_message.created_at},
"assistant_message": {"id": assistant_message.id, "content": assistant_message.content, "created_at": assistant_message.created_at}
}
# 启动后端服务
if __name__ == "__main__":
import uvicorn
uvicorn.run(app, host="0.0.0.0", port=8000)
6.4 第二步:前端界面开发(Streamlit)
我们用Streamlit开发一个简单易用的前端界面,实现用户登录、会话管理、聊天交互等功能,创建frontend/app.py:
# frontend/app.py
import streamlit as st
import requests
from datetime import datetime
# 后端API地址
BACKEND_URL = "http://localhost:8000"
# ===================== 页面配置 =====================
st.set_page_config(
page_title="企业级AI Agent平台",
page_icon="🤖",
layout="wide"
)
# ===================== 状态管理 =====================
if "access_token" not in st.session_state:
st.session_state.access_token = None
if "current_session_id" not in st.session_state:
st.session_state.current_session_id = None
if "username" not in st.session_state:
st.session_state.username = None
# ===================== 工具函数 =====================
def get_headers():
return {"Authorization": f"Bearer {st.session_state.access_token}"}
def register_user(username, password, email):
try:
response = requests.post(
f"{BACKEND_URL}/register",
json={"username": username, "password": password, "email": email}
)
response.raise_for_status()
return True, response.json()["message"]
except Exception as e:
return False, str(e)
def login_user(username, password):
try:
response = requests.post(
f"{BACKEND_URL}/token",
data={"username": username, "password": password}
)
response.raise_for_status()
data = response.json()
st.session_state.access_token = data["access_token"]
st.session_state.username = username
return True, "登录成功"
except Exception as e:
return False, str(e)
def create_session(session_name="新对话"):
try:
response = requests.post(
f"{BACKEND_URL}/sessions",
json={"session_name": session_name},
headers=get_headers()
)
response.raise_for_status()
data = response.json()
st.session_state.current_session_id = data["session_id"]
return True, data
except Exception as e:
return False, str(e)
def get_user_sessions():
try:
response = requests.get(
f"{BACKEND_URL}/sessions",
headers=get_headers()
)
response.raise_for_status()
return response.json()
except Exception as e:
return []
def get_session_messages(session_id):
try:
response = requests.get(
f"{BACKEND_URL}/sessions/{session_id}/messages",
headers=get_headers()
)
response.raise_for_status()
return response.json()
except Exception as e:
return []
def send_message(session_id, content):
try:
response = requests.post(
f"{BACKEND_URL}/messages",
json={"session_id": session_id, "content": content},
headers=get_headers()
)
response.raise_for_status()
return True, response.json()
except Exception as e:
return False, str(e)
# ===================== 页面渲染 =====================
# 未登录状态,显示登录/注册页面
if not st.session_state.access_token:
st.title("🤖 企业级AI Agent平台")
tab1, tab2 = st.tabs(["登录", "注册"])
with tab1:
st.header("用户登录")
login_username = st.text_input("用户名", key="login_username")
login_password = st.text_input("密码", type="password", key="login_password")
if st.button("登录", type="primary", use_container_width=True):
success, msg = login_user(login_username, login_password)
if success:
st.success(msg)
st.rerun()
else:
st.error(msg)
with tab2:
st.header("用户注册")
reg_username = st.text_input("用户名", key="reg_username")
reg_password = st.text_input("密码", type="password", key="reg_password")
reg_email = st.text_input("邮箱(可选)", key="reg_email")
if st.button("注册", type="secondary", use_container_width=True):
success, msg = register_user(reg_username, reg_password, reg_email)
if success:
st.success(msg)
else:
st.error(msg)
# 已登录状态,显示主界面
else:
# 侧边栏:会话管理
with st.sidebar:
st.title(f"🤖 欢迎,{st.session_state.username}")
# 新建对话按钮
if st.button("➕ 新建对话", type="primary", use_container_width=True):
success, _ = create_session()
if success:
st.rerun()
st.divider()
# 会话列表
st.subheader("我的对话")
sessions = get_user_sessions()
for session in sessions:
if st.button(
session["session_name"],
key=f"session_{session['session_id']}",
use_container_width=True,
type="primary" if session["session_id"] == st.session_state.current_session_id else "secondary"
):
st.session_state.current_session_id = session["session_id"]
st.rerun()
st.divider()
# 退出登录
if st.button("退出登录", type="secondary", use_container_width=True):
for key in st.session_state.keys():
del st.session_state[key]
st.rerun()
# 主界面:聊天窗口
if not st.session_state.current_session_id:
# 没有选中会话,显示欢迎页
st.title("🤖 企业级AI Agent平台")
st.markdown("### 欢迎使用AI Agent平台,这里有你需要的所有能力:")
st.markdown("""
- ✅ 多Agent智能协作,完成复杂任务
- ✅ 无限工具生态,基于MCP协议一键接入
- ✅ 长期记忆管理,记住你的偏好和历史
- ✅ 深度规划与反思,自我修正优化
- ✅ 企业级安全架构,数据隔离与权限控制
""")
if st.button("开始你的第一个对话", type="primary"):
success, _ = create_session()
if success:
st.rerun()
else:
# 选中会话,显示聊天窗口
messages = get_session_messages(st.session_state.current_session_id)
# 显示所有消息
for msg in messages:
with st.chat_message(msg["role"]):
st.markdown(msg["content"])
# 输入框
user_input = st.chat_input("请输入你的需求...")
if user_input:
# 显示用户消息
with st.chat_message("user"):
st.markdown(user_input)
# 发送消息,执行Agent
with st.chat_message("assistant"):
with st.spinner("Agent正在思考和执行中..."):
success, result = send_message(st.session_state.current_session_id, user_input)
if success:
st.markdown(result["assistant_message"]["content"])
else:
st.error(f"执行失败:{result}")
6.5 第三步:启动平台,本地运行
- 先启动后端服务:
cd backend
python main.py
后端服务会在 http://localhost:8000 启动,你可以访问 http://localhost:8000/docs 查看API文档。
- 再启动前端服务:
cd frontend
streamlit run app.py
前端服务会在 http://localhost:8501 启动,你可以在浏览器中访问这个地址,注册账号,登录使用,体验完整的AI Agent平台。
6.6 生产级部署指南
6.6.1 Docker容器化部署
我们可以把前后端打包成Docker镜像,实现一键部署,这里给你核心的Dockerfile配置:
后端Dockerfile(backend/Dockerfile)
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8000
CMD ["python", "main.py"]
前端Dockerfile(frontend/Dockerfile)
FROM python:3.11-slim
WORKDIR /app
COPY requirements.txt .
RUN pip install --no-cache-dir -r requirements.txt
COPY . .
EXPOSE 8501
CMD ["streamlit", "run", "app.py", "--server.address=0.0.0.0"]
6.6.2 生产级优化要点
- 数据库替换:把SQLite替换为MySQL/PostgreSQL,保证数据的稳定性和并发能力
- 缓存优化:接入Redis,实现会话缓存、接口限流、LLM结果缓存
- 安全加固:HTTPS加密、密钥管理、输入校验、SQL注入防护
- 监控告警:接入Prometheus+Grafana,监控接口响应时间、LLM调用成本、系统资源占用
- 异步优化:把Agent执行改成异步任务,避免接口超时,用Celery实现任务调度
- 高可用部署:用K8s实现容器编排,多副本部署,保证服务的高可用性
7. 常见问题 FAQ & 附加篇预告 & 互动环节
7.1 常见问题 FAQ
Q1:多Agent之间经常出现通信混乱、职责重叠、互相推诿的情况,怎么办?
A1:核心解决方法:
- 严格遵循单一职责原则,每个Agent的职责边界必须绝对清晰,在System Prompt中明确「能做什么、不能做什么」
- 制定严格的准入准出标准,每个Agent的输出必须符合固定的格式和标准,才能传递给下一个Agent
- 设立唯一的主管Agent,所有的任务分配、协调、冲突解决都由主管Agent负责,避免多Agent平等协商导致的混乱
- 精简Agent角色,能用2个Agent完成的任务,绝对不要用5个,角色越多,通信成本越高,越容易混乱
Q2:多Agent调用的LLM成本很高,怎么优化成本?
A2:成本优化核心方案:
- 模型分级使用:简单的任务(比如格式校验、简单分类)用轻量模型(GPT-3.5-turbo、Qwen-7B),复杂的任务(代码开发、需求分析)用强模型(GPT-4o、Claude 3)
- 上下文优化:用我们上篇实现的短期记忆管理器,过滤掉无用的上下文,只保留核心信息,减少Token消耗
- 结果缓存:相同的问题、相同的工具调用结果,做缓存处理,避免重复调用LLM
- 限制迭代次数:给每个Agent设置最大迭代次数,避免死循环导致的无限Token消耗
- 批量处理:把多个小的请求合并成一个批量请求,减少调用次数
Q3:Agent执行延迟很高,用户体验不好,怎么优化?
A3:延迟优化核心方案:
- 流式输出:所有的LLM调用都采用流式输出,前端实时展示,不用等完整结果返回,大幅提升用户体验
- 并行执行:没有依赖关系的子任务,用LangGraph的并行节点同时执行,减少总执行时间
- 异步执行:把Agent的长耗时任务改成异步执行,前端先返回任务ID,轮询查询执行进度,避免接口超时
- 模型选型:在效果满足要求的前提下,优先选择推理速度快的模型,比如GPT-4o比GPT-4快很多
- 工具调用优化:减少不必要的工具调用,工具返回的结果做精简,只保留核心信息,减少LLM处理时间
Q4:没有OpenAI API Key,能用开源大模型适配这套系统吗?
A4:完全可以!这套系统的所有模块,都和底层LLM解耦,你只需要修改llm.py中的LLM调用函数,换成支持Function Calling的开源大模型即可,比如Llama 3、Qwen 2、GLM 4、通义千问开源版,所有的上层代码都不用修改。目前主流的开源大模型都已经原生支持Function Calling,完全可以满足多Agent开发的需求。
Q5:生产环境中,怎么保证用户数据的安全和隐私?
A5:生产级数据安全方案:
- 数据隔离:每个用户的记忆、会话、数据完全隔离,通过用户ID严格控制访问权限
- 敏感信息脱敏:用户输入中的手机号、身份证号、银行卡号等敏感信息,先做脱敏处理,再传给LLM
- 加密存储:用户的敏感数据、对话历史、长期记忆,都做加密存储,避免数据泄露
- 私有化部署:如果对数据安全要求极高,可以把大模型、向量数据库、整个平台都做私有化部署,所有数据都留在自己的服务器上
- 合规审计:所有的LLM调用、工具调用、用户操作,都做完整的日志审计,可追溯、可排查
7.2 附加篇预告
恭喜你!看到这里,你已经完整掌握了从单Agent开发、多Agent协作、工作流引擎、工具生态、高级智能到生产级平台搭建的全链路能力,已经具备了企业级AI Agent全栈开发能力,超过了90%的AI Agent从业者!
在附加篇中,我会给你带来《AI Agent面试八股文全集》,覆盖AI Agent面试的所有核心考点,包括:
- 基础原理篇:AI Agent的核心定义、与普通大模型的区别、六大核心组件、ReAct框架核心原理
- 框架篇:LangChain、LangGraph、AutoGPT等主流框架的核心原理、区别、优缺点、落地场景
- 工程落地篇:多Agent协作模式、工具调用优化、记忆管理方案、生产级部署架构、成本优化
- 算法进阶篇:ToT、Reflexion、CRITIC等高级框架的核心原理、实现方式、优缺点
- 场景题篇:互联网大厂高频面试场景题,从零设计一个企业级Agent系统、解决Agent落地的核心问题
- HR面篇:AI Agent项目经历的梳理、亮点提炼、难点拆解,帮你拿下心仪的Offer
附加篇会在近期更新,关注我,更新第一时间通知!
7.3 互动环节
学习过程中遇到任何问题,都可以在评论区留言,我会一一解答!
同时,欢迎在评论区告诉我:
- 你在多Agent开发过程中,遇到的最大的坑是什么?
- 你想基于这套系统,开发一个什么样的AI Agent产品?
- 附加篇的面试八股文,你最想先看哪部分的内容?
- 关注我,后续会带来更多AI Agent的深度教程和实战项目,带你从零基础到AI Agent全栈专家!
版权声明:本文为原创内容,未经授权禁止转载,商用必究。
更多推荐




所有评论(0)