一、为什么需要多智能体?

        传统的单体 Chatbot 把所有业务逻辑塞进一个 Prompt 里,当功能膨胀(商品查询、下单、取消、支付、物流),Prompt 会变得臃肿、难调、不可维护。多智能体架构将不同职责拆分为独立的 Agent,每个 Agent 拥有专属的工具集和系统提示,由“主管”负责路由——就像公司里不同部门各司其职。

本文带你实现一个电商智能助手,包含:

  • 商品智能体(查询列表、详情)

  • 订单操作智能体(创建订单、取消订单)——需要人工确认

  • 订单查询智能体(查询单个订单、订单列表)——无中断,直接返回

  • 通用搜索智能体(处理其他问题)

并且通过 FastAPI 发布为 Web 服务,支持多会话、中断恢复、历史查询。

关键设计:

  • 读写分离:订单写操作(创建/取消)需要人工确认,读操作(查询)直接返回。

  • 中断机制:利用 LangGraph 的 interrupt_before 在写操作前暂停,等待用户通过 /confirm 接口确认。

  • 用户ID注入:通过 RunnableConfig 传递,工具内用 get_config() 获取,用户无需手动输入。

  • 消息摘要:自动压缩长对话历史,避免 token 超

核心代码解析

状态定义

class CustomState(TypedDict):
    messages: Annotated[list, add_messages]
    intent: str
    sub_intent: str

模拟数据库工具

class InMemoryDB:
    def __init__(self):
        self.products = {
            "p001": {"id": "p001", "name": "智能手机X", "price": 2999.00, "stock": 50},
            "p002": {"id": "p002", "name": "无线耳机Pro", "price": 499.00, "stock": 120},
            "p003": {"id": "p003", "name": "智能手表", "price": 1299.00, "stock": 30}
        }
        self.orders = {}
        self.order_counter = 0

    def get_all_products(self):
        return list(self.products.values())

    def get_product(self, product_id):
        return self.products.get(product_id)

    def reduce_stock(self, product_id, quantity):
        product = self.products.get(product_id)
        if product and product["stock"] >= quantity:
            product["stock"] -= quantity
            return True
        return False

    def create_order(self, user_id, product_id, quantity, total):
        self.order_counter += 1
        order = {
            "order_id": f"ORD{self.order_counter}",
            "user_id": user_id,
            "product_id": product_id,
            "quantity": quantity,
            "total_amount": total,
            "status": "pending",
            "created_at": datetime.now().isoformat()
        }
        self.orders[order["order_id"]] = order
        return order

    def get_order(self, order_id):
        return self.orders.get(order_id)

    def get_all_orders(self, user_id):
        return [order for order in self.orders.values() if order["user_id"] == user_id]

    def cancel_order(self, order_id, user_id):
        order = self.orders.get(order_id)
        if not order:
            return f"❌ 订单{order_id}不存在"
        if order["user_id"] != user_id:
            return f"❌ 订单{order_id}不属于您"
        if order["status"] == "pending":
            # 恢复库存
            product = self.products.get(order["product_id"])
            if product:
                product["stock"] += order["quantity"]
            order["status"] = "canceled"
            return f"✅ 订单{order_id}已取消"
        return f"❌ 订单{order_id}不存在或已发货不能取消"


db = InMemoryDB()

工具定义

@tool
def list_products_tool() -> str:
    """查询所有商品列表"""
    products = db.get_all_products()
    if not products:
        return "暂无商品"
    result = "【商品列表】\n"
    for p in products:
        result += f"- {p['name']} (ID:{p['id']}) | ¥{p['price']} | 库存:{p['stock']}\n"
    return result


@tool
def get_product_tool(product_id: str) -> str:
    """根据商品ID查询单个商品详情。"""
    product = db.get_product(product_id)
    if not product:
        return f"未找到商品Id为{product_id}的商品"
    return f"【商品详情】\n名称:{product['name']}\n价格:¥{product['price']}\n库存:{product['stock']}"


@tool
def create_order_tool(product_id: str, quantity: int) -> str:
    """根据商品ID和数量创建订单,并返回订单详情。"""
    config = get_config()
    user_id = config.get("configurable", {}).get("user_id")
    if not user_id:
        return "❌ 无法获取用户ID,请重新登录"
    product = db.get_product(product_id)
    if not product:
        return f"订单创建失败,商品{product_id}不存在"
    if product["stock"] < quantity:
        return f"订单创建失败,{product['name']}库存不足,当前库存:{product['stock']}"
    total = product["price"] * quantity
    db.reduce_stock(product_id, quantity)
    order = db.create_order(user_id, product_id, quantity, total)
    return f"✅ 订单创建成功!订单号:{order['order_id']},总金额:¥{total},状态:{order['status']}"


@tool
def get_order_tool(order_id: str) -> str:
    """根据订单id查询订单详情"""
    order = db.get_order(order_id)
    if not order:
        return f"未找到订单号{order_id}的订单"
    product = db.get_product(order["product_id"])
    product_name = product["name"] if product else order["product_id"]
    return f"""【订单详情】
        订单号:{order['order_id']}
        商品:{product_name}
        数量:{order['quantity']}
        总金额:¥{order['total_amount']}
        状态:{order['status']}
        下单时间:{order['created_at']}"""

@tool
def get_all_orders_tool() -> str:
    """查询用户所有订单"""
    config = get_config()
    user_id = config.get("configurable", {}).get("user_id")
    if not user_id:
        return "❌ 无法获取用户ID,请重新登录"
    orders = db.get_all_orders(user_id)
    if not orders:
        return "暂无订单"
    result = "【订单列表】\n"
    for order in orders:
        result += f"-  {order['order_id']} | {order['product_id']} | {order['quantity']} | ¥{order['total_amount']} | {order['status']}\n"
    return result

@tool
def cancel_order_tool(order_id: str) ->str:
    """取消订单"""
    config = get_config()
    user_id = config.get("configurable", {}).get("user_id")
    if not user_id:
        return "❌ 无法获取用户ID,请重新登录"
    return db.cancel_order(order_id, user_id)

@tool
def search_tool(query: str):
    """互联网搜索工具"""
    try:
        response = zhipuai_client.web_search.web_search(search_engine="search_pro", search_query=query)
        if response.search_result:
            return "\n\n".join([result.content for result in response.search_result])
        return "没有搜索到任何内容"
    except Exception as e:
        print(e)
        return "搜索出现错误"

消息摘要(防止消息过长超过大模型限制)

def summarize_messages(messages:list, max_tokens: int = 2000) -> list:
    """如果消息总长度超过阈值,将旧消息压缩为摘要,保留最近几条原始消息"""
    total_len = sum(len(m.content) for m in messages if hasattr(m, "content"))
    if total_len < max_tokens:
        return messages
    #保留最近3条完整消息其余做摘要
    keep_recent = 3
    recent = messages[-keep_recent:]
    old = messages[:-keep_recent]
    prompt = f"""请将以下对话历史压缩成一段简洁的中文摘要(保留关键实体:商品名称、订单号、用户意图、已完成的操作):
    {chr(10).join(f"{'用户' if isinstance(m, HumanMessage) else 'AI'}:{m.content}" for m in old)}
    摘要:
    """
    summary = llm.invoke(prompt).content
    summary_message = SystemMessage(content=f"[对话历史摘要]{summary}")
    return [summary_message] + recent

创建智能体

product_agent = create_agent(
    llm,
    tools=[list_products_tool, get_product_tool],
    system_prompt="你是商品智能体,负责回答所有与商品相关的问题,如查询列表、价格或库存。当用户询问商品列表、价格、库存时,你必须调用工具查询最新的数据。",
    name="product_expert"
)

handle_order_agent = create_agent(
    llm,
    tools=[create_order_tool, cancel_order_tool,list_products_tool],
    system_prompt="""你是订单操作智能体,负责处理订单创建和取消等相关操作。
重要:用户ID会自动从系统获取,你不需要询问用户ID。
如果用户提供了商品名称(如“智能手表”),你必须先调用 list_products_tool 获取商品ID。
如果用户未提供数量,默认数量为1。""",
    name="order_expert",
)

order_query_agent = create_agent(
    llm,
    tools=[get_order_tool, get_all_orders_tool],
    system_prompt="你是订单查询智能体,只负责查询订单详情或订单列表。无需询问用户ID,直接调用工具查询并返回结果。",
    name="order_read_expert"
)

search_agent = create_agent(
    llm,
    tools=[search_tool],
    system_prompt="你是一个智能助手,负责回答用户关于商品和订单以外的所有问题",
    name="search_export"
)

图节点

def supervisor_node(state: CustomState) -> CustomState:
    last_msg = state["messages"][-1]
    intent_prompt = """根据用户输入,判断应该由哪个智能体处理:
        - 如果用户询问商品信息(查询商品列表、商品详情、价格、库存),回复 "product"
        - 如果用户想下单购买或查询订单(创建订单、查询订单状态),回复 "order"
        - 其它都回复 "other"
        用户输入: {user_input}
        只回复 product、other 或 order,不要有其他内容。"""
    response = llm.invoke(intent_prompt.format(user_input=last_msg.content))
    intent = response.content.strip().lower()
    state["intent"] = intent
    return state

def order_supervisor_node(state: CustomState) -> CustomState:
    """订单总入口:判断是操作订单(下单/取消)还是查询订单"""
    last_msg = state["messages"][-1]
    intent_prompt = """判断用户意图属于以下哪一类:
    - 如果用户想创建订单(买、下单、购买),回复 "write"
    - 如果用户想取消订单(取消订单、退单),回复 "write"
    - 如果用户想查询订单(查订单、订单详情、订单列表),回复 "read"
    只回复 write 或 read。
    用户输入: {user_input}
    """
    response = llm.invoke(intent_prompt.format(user_input=last_msg.content))
    intent = response.content.strip().lower()
    state["sub_intent"] = intent
    return state

def product_agent_node(state: CustomState, config: RunnableConfig) -> CustomState:
    compressed = summarize_messages(state["messages"], max_tokens=2000)
    result = product_agent.invoke({"messages": compressed}, config=config)
    state["messages"] = result["messages"]
    return state

def order_handle_agent_node(state: CustomState, config: RunnableConfig) -> CustomState:
    compressed = summarize_messages(state["messages"], max_tokens=2000)
    result = handle_order_agent.invoke({"messages": compressed}, config=config)
    state["messages"] = result["messages"]
    return state

def order_query_agent_node(state: CustomState, config: RunnableConfig) -> CustomState:
    compressed = summarize_messages(state["messages"], max_tokens=2000)
    result = order_query_agent.invoke({"messages": compressed}, config=config)
    state["messages"] = result["messages"]
    return state

def other_agent_node(state: CustomState, config: RunnableConfig) -> CustomState:
    compressed = summarize_messages(state["messages"], max_tokens=2000)
    result = search_agent.invoke({"messages": compressed}, config=config)
    state["messages"] = result["messages"]
    return state

def route_after_supervisor(state: CustomState) -> Literal["product_agent", "order_supervisor", "other_agent", END]:
    intent = state.get("intent", "")
    if intent == "product":
        return "product_agent"
    elif intent == "order":
        return "order_supervisor"
    elif intent == "other":
        return "other_agent"
    else:
        return END

def route_after_order_supervisor(state: CustomState) -> Literal["order_handle_agent", "order_query_agent", END]:
    sub_intent = state.get("sub_intent", "")
    if sub_intent == "write":
        return "order_handle_agent"
    elif sub_intent == "read":
        return "order_query_agent"
    else:
        return END

定义图

workflow = StateGraph(CustomState)
workflow.add_node("supervisor", supervisor_node)
workflow.add_node("product_agent", product_agent_node)
workflow.add_node("order_supervisor", order_supervisor_node)
workflow.add_node("order_handle_agent", order_handle_agent_node)
workflow.add_node("order_query_agent", order_query_agent_node)
workflow.add_node("other_agent", other_agent_node)



workflow.set_entry_point("supervisor")
workflow.add_conditional_edges(
    "supervisor",
    route_after_supervisor,
    {
        "product_agent": "product_agent",
        "order_supervisor": "order_supervisor",
        "other_agent": "other_agent",
        END: END
    }
)
workflow.add_conditional_edges(
    "order_supervisor",
    route_after_order_supervisor,
    {
        "order_handle_agent": "order_handle_agent",
        "order_query_agent": "order_query_agent",
        END: END
    }
)

workflow.add_edge("order_handle_agent", END)
workflow.add_edge("order_query_agent", END)
workflow.add_edge("other_agent", END)
workflow.add_edge("product_agent", END)
memory = MemorySaver()
agent_app = workflow.compile(checkpointer=memory, interrupt_before=["order_handle_agent"])

测试

if __name__ == "__main__":
    config = {"configurable": {"thread_id": str(uuid.uuid4()), "user_id": "123"}}
    while True:
        content = input("用户:")
        if content in ('q', 'quit'):
            print("AI: bye bye")
            break
        agent_app.invoke({"messages": [HumanMessage(content=content)]}, config=config)
        state_after = agent_app.get_state(config)
        if state_after.next == ("order_handle_agent",):
            choice = input("确认对订单操作?(y/n): ")
            if choice.lower() == 'y':
                agent_app.invoke(Command(resume=True), config=config)
                final_state = agent_app.get_state(config)
                if final_state.values.get("messages"):
                    print("AI:", final_state.values["messages"][-1].content)
            else:
                # 1. 添加取消消息到状态中
                cancel_msg = AIMessage(content="订单已取消。")
                agent_app.update_state(config, {"messages": [cancel_msg]})
                # 2. 终止中断,让图结束执行(不进入 order_agent)
                agent_app.invoke(Command(resume=False), config=config)
                # 3. 可选:打印给用户看(也可以不打印,因为消息已在状态中)
                print("AI:", cancel_msg.content)
                continue  # 跳过本轮后续打印
        else:
            final_state = agent_app.get_state(config)
            if final_state.values.get("messages"):
                print("AI:", final_state.values["messages"][-1].content)

测试验证:

使用fastapi发布智能体

import uuid
from typing import Optional, Dict, Any

import uvicorn
from fastapi import FastAPI, HTTPException
from langchain_core.messages import HumanMessage, AIMessage
from langgraph.types import Command
from pydantic import BaseModel

from 多agent案例.multi_agent_demo4 import agent_app

api = FastAPI()

#------Pydantic模型------
class ChtRequest(BaseModel):
    message: str
    thread_id: Optional[str] = None

class ChatResponse(BaseModel):
    thread_id: str
    response: str
    need_confirmation:bool = False
    order_preview:Optional[Dict[str,Any]] = None

class ConfirmRequest(BaseModel):
    confirm: bool
    thread_id: str

class HistoryResponse(BaseModel):
    thread_id: str
    messages: list[Dict]

#-------辅助函数-------

def _get_user_id() -> str:
    return "123"
def _get_config(thread_id: str):
    return {"configurable": {"thread_id": thread_id, "user_id": _get_user_id()}}



@api.post("/chat", response_model=ChatResponse)
def chat(req: ChtRequest):
    thread_id = req.thread_id or str(uuid.uuid4())
    config = _get_config(thread_id)

    state = agent_app.get_state(config)
    if state.next == ("order_handle_agent",):
        raise HTTPException(status_code=409, detail=f"会话 {thread_id} 有未确认的订单操作,请先调用 /confirm 接口")
    agent_app.invoke({"messages":[HumanMessage(content=req.message)]}, config=config)
    state_after = agent_app.get_state(config)
    if state_after.next == ("order_handle_agent",):
        preview = {"message": req.message}
        return ChatResponse(
            thread_id=thread_id,
            response="请确认是否执行此订单操作?",
            need_confirmation=True,
            order_preview=preview
        )
    else:
        last_msg = state_after.values["messages"][-1]
        content = last_msg.content if hasattr(last_msg, "content") else str(last_msg)
        return ChatResponse(thread_id=thread_id, response=content, need_confirmation=False)



@api.post("/confirm", response_model=ChatResponse)
def confirm(req: ConfirmRequest):
    config = _get_config(req.thread_id)
    state = agent_app.get_state(config)

    if state.next != ("order_handle_agent",):
        raise HTTPException(status_code=400, detail=f"会话 {req.thread_id} 没有未确认的订单操作")

    if req.confirm:
        agent_app.invoke(Command(resume=True), config=config)
        final_state = agent_app.get_state(config)
        last_msg = final_state.values["messages"][-1]
        content = last_msg.content if hasattr(last_msg, "content") else str(last_msg)
        return ChatResponse(thread_id=req.thread_id, response=content, need_confirmation=False)
    else:
        cancel_msg = AIMessage(content="订单操作已取消。")
        agent_app.update_state(config, {"messages": [cancel_msg]})
        agent_app.invoke(Command(resume=False), config=config)
        return ChatResponse(thread_id=req.thread_id, response="订单操作已取消。", need_confirmation=False)

@api.get("/history/{thread_id}", response_model=HistoryResponse)
def history(thread_id: str):
    config = _get_config(thread_id)
    state = agent_app.get_state(config)
    messages = []
    for m in state.values.get("messages", []):
        role = "user" if m.type == "human" else "ai"
        messages.append({"role": role, "content": m.content})
    return HistoryResponse(thread_id=thread_id, messages=messages)

if __name__ == "__main__":
    uvicorn.run("start_demo4:api", host="127.0.0.1", port=8000, reload=True)

使用postman测试验证

Logo

小龙虾开发者社区是 CSDN 旗下专注 OpenClaw 生态的官方阵地,聚焦技能开发、插件实践与部署教程,为开发者提供可直接落地的方案、工具与交流平台,助力高效构建与落地 AI 应用

更多推荐