目录

一 常见的多智能体架构

分布式架构

集中式架构

​编辑

LangGraph学习

核心组件:节点与可控制性

串行控制

分支控制

条件分支和循环

配置(动态切换模型配置)

MapReduce并行执行


一 常见的多智能体架构

分布式架构
  • 特点:智能体完全自治,通过本地交互实现协作。
  • 通信方式:点对点消息传递(如合同网协议)。
  • 优势:高容错性,适合动态环境(如无人机编队)。
集中式架构
  • 特点:存在一个中央控制单元(如主智能体)协调其他智能体的行为。
  • 适用场景:任务需要全局优化或强一致性,如工业流水线控制。
  • 缺点:单点故障风险,扩展性受限。

1.网状结构:任何一个智能体都可以进行决策

2.监督者结构:由主管来决策下一步的操作

3.监管者架构(工具):智能体作为工具,接受一个llm主管的调用

4.分级架构:多级架构每级都由一个监管者

5.自定义只有部分智能体具备决策权

LangGraph学习

langgraph --面向于智能体编排以及工作流创建

核心组件:节点与可控制性

1.节点:

        节点是图中的基本单元,代表一个具体的功能或操作,

        每个节点负责完成一项特定的任务(例如查询数据,生成文本,做决策等)

        节点接受输入,处理后产生输出。

        可以是简单的函数,api调用,LLM调用或者其他复杂操作。

2.graph:

        图是节点及其连接关系的集合,代表整个工作流程。

        定义了信息如何从一个节点流向另一个节点。

        可以是线性的 (A->B->C)或包含分支,循环的复杂结构。

        控制整个应用的执行流程以及逻辑。

        

from langchain_core.messages import AnyMessage, AIMessage
from langgraph.graph import StateGraph
from pydantic import BaseModel

# 定义节点间的通讯格式
class State(BaseModel):
    messages:list[AnyMessage]
    extra_field:int = 0

def node(state:State):
    messages=state.messages
    new_message=AIMessage("你好我是节点1")

    return {
        "messages":messages+[new_message],
        "extra_field":1
    }

#创建一个图包含节点使用state通信
graph=StateGraph(State)
graph.add_node(node)
graph.set_entry_point("node")   # 设置入口点
graph_builder=graph.compile()

# 查看节点与图结构
""" 使用mermaid(是一个基于文本的图表和可视化工具,
它允许用户通过简单的文本语法来创建复杂的图标和流程图。)
"""
from IPython.display import Image, display

display(Image(graph_builder.get_graph().draw_mermaid_png()))

预期输出:

#调用
from langchain_core.messages import HumanMessage
result=graph_builder.invoke({
    "messages":[HumanMessage("你好啊!我是小鱼!")]
})

# 使用pretty_print来格式化显示
for message in result["messages"]:
    message.pretty_print()

预期输出:

串行控制

from langchain_core.messages import AnyMessage, AIMessage
from langgraph.graph import StateGraph,START,END
from pydantic import BaseModel,Field
from IPython.display import Image, display

class State (BaseModel):
    value1:str =Field(None, description="value1")
    value2:str =Field(None, description="value2")
def step_1(state:State):
    return {
        "value1": 'a'
    }
def step_2(state:State):
    current_value_1=state.value1
    return {
        "value1": current_value_1+' '+'b'
    }
def step_3(state:State):
    return {
        "value2": "2"
    }
graph_builder=StateGraph(State)
graph_builder.add_node(step_1)
graph_builder.add_node(step_2)
graph_builder.add_node(step_3)

graph_builder.add_edge(START,"step_1")
graph_builder.add_edge("step_1","step_2")
graph_builder.add_edge("step_2","step_3")
graph_builder.add_edge("step_3",END)
graph = graph_builder.compile() # 编译实际上是将图的方式编译成runnable
display(Image(graph.get_graph().draw_mermaid_png()))

预期输出:

分支控制

import operator
from langgraph.graph import StateGraph,START,END
from pydantic import BaseModel,Field
from IPython.display import Image, display
from typing import Annotated
# Annotated 允许为类型提供额外的元数据,而不影响类型检查对类型本身的理解
class State(BaseModel):
    aggregate : Annotated[list,operator.add]

def a(state:State):
    print(f"添加A到{state.aggregate}")
    return {
        "aggregate": ["A"]
    }
def b(state:State):
    print(f"添加B到{state.aggregate}")
    return {
        "aggregate": ["B"]
    }
def c(state:State):
    print(f"添加C到{state.aggregate}")
    return {
        "aggregate": ["C"]
    }
def d(state:State):
    print(f"添加D到{state.aggregate}")
    return {
        "aggregate": ["D"]
    }
builder=StateGraph(State)
builder.add_node(a)
builder.add_node(b)
builder.add_node(c)
builder.add_node(d)

builder.add_edge(START,"a")
builder.add_edge("a","b")
builder.add_edge("a","c")
builder.add_edge("b","d")
builder.add_edge("c","d")
builder.add_edge("d",END)
graph = builder.compile()
display(Image(graph.get_graph().draw_mermaid_png()))

预期输出:

条件分支和循环

import operator
from langgraph.graph import StateGraph,START,END
from pydantic import BaseModel,Field
from IPython.display import Image, display
from typing import Annotated,Literal
# Annotated 允许为类型提供额外的元数据,而不影响类型检查对类型本身的理解
class State(BaseModel):
    aggregate : Annotated[list,operator.add]

def a(state:State):
    print(f"添加A到{state.aggregate}")
    return {
        "aggregate": ["A"]
    }
def b(state:State):
    print(f"添加B到{state.aggregate}")
    return {
        "aggregate": ["B"]
    }
def c(state:State):
    print(f"添加C到{state.aggregate}")
    return {
        "aggregate": ["C"]
    }
def d(state:State):
    print(f"添加D到{state.aggregate}")
    return {
        "aggregate": ["D"]
    }
def route (state:State)->Literal["b",END]:
    if len(state.aggregate) <5:
        return "b"
    else:
        return END
def route_after_d(state: State) -> Literal["a", END]:
    """节点 d 之后的路由 - 决定是否循环"""
    if len(state.aggregate) < 5:
        print(f"  节点 d 路由: 长度 {len(state.aggregate)} < 5, 回到 a")
        return "a"
    else:
        print(f"  节点 d 路由: 长度 {len(state.aggregate)} >= 5, 结束")
        return END

builder=StateGraph(State)
builder.add_node("a",a)
builder.add_node("b",b)
builder.add_node("c",c)
builder.add_node("d",d)

builder.add_edge(START,"a")
builder.add_conditional_edges("a",route,path_map={"b": "b", END: END})
builder.add_edge("b","c")
builder.add_edge("c","d")
builder.add_conditional_edges("d",route_after_d,path_map={"a": "a", END: END})

graph = builder.compile()
display(Image(graph.get_graph().draw_mermaid_png()))

运行结果:

配置(动态切换模型配置)

import operator
from typing import Annotated,Sequence
from pydantic import BaseModel
from IPython.display import Image, display
from langchain_community.chat_models import ChatTongyi,ChatOpenAI
import os
from langchain_core.messages import BaseMessage,HumanMessage
from  langchain_core.runnables.config import RunnableConfig
from langgraph.graph import END,START,StateGraph
from dotenv import load_dotenv

load_dotenv()
tongyi=ChatTongyi(
    model="qwen-plus",
    api_key=os.environ.get("DASHSCOPE_API_KEY"),
)
deepseek= ChatOpenAI(
    model="deepseek-chat",
    api_key=os.environ.get("DEEPSEEK_API_KEY"),
    base_url="https://api.deepseek.com"
)
models={
    "deepseek":deepseek,
    "tongyi":tongyi,
}

class AgentState(BaseModel):
    """
    状态模型
    """
    messages:Annotated[Sequence[BaseMessage],operator.add]
def _call_model(state:AgentState,config:RunnableConfig):
    model_name=config["configurable"].get("model","deepseek") #没有配置的情况下默认使用deepseek
    model=models[model_name]
    response=model.invoke(state.messages)
    return {
        "messages":[response]
    }
builder = StateGraph(AgentState)
builder.add_node("model",_call_model)
builder.add_edge(START,"model")
builder.add_edge("model",END)
graph = builder.compile()

config={
    "configurable":{
        "model":"tongyi"
    }
}
graph.invoke(AgentState(messages=[HumanMessage(content="你好,你是谁")]),config=config)

MapReduce并行执行

(示例):给定一个来自用户的一般主题,生成相关主题列表,为每个主题生成一个笑话,并从结果列表中选择最佳的笑话。

from langchain_core.prompts import PromptTemplate
import operator
from pydantic import BaseModel,Field
from typing import Annotated
from langchain_community.chat_models import ChatTongyi,ChatOpenAI
from langchain_deepseek import ChatDeepSeek
import os
from langgraph.graph import StateGraph,END,START
from langgraph.types import Send
load_dotenv()
# 模型和提示词
# 定义我们将使用的模型和提示词

subjects_prompt =PromptTemplate.from_template(
    """
        你是一个创意助手,你目前的任务是你需要根据给定的主题,生成2到5个与主题相对应的话题或者场景,确保生成的这些话题和场景能够被用于生成笑话。
        要求:
        1.每一个子话题是主题的某一个具体的方面,场景或者相关概念。
        2.不要重复主题本身
        3.每一个子主题之间尽量范围不要有重叠
        主题:{topic}
        输出格式:返回这些子话题的列表
    """)
joke_prompt = PromptTemplate.from_template(
    "请生成一个关于{subject}的笑话")
best_joke_prompt=PromptTemplate.from_template(
    "以下是关于{topic}的笑话,笑话列表如下:{jokes},请你在其中选择最好的的一个,返回最佳笑话的列表下标,下表是整数且大于等于0,例如:笑话1,笑话2,笑话3,笑话1的下表是0")

class Subjects(BaseModel):
    subjects:list[str]=Field(description="主题列表")
class Joke(BaseModel):
    joke:str
class BestJoke(BaseModel):
    id:int =Field(description="最佳笑话的索引,从0开始",ge=0)
model= ChatDeepSeek(
    model="deepseek-chat",
    api_key=os.environ.get("DEEPSEEK_API_KEY"),
    base_url="https://api.deepseek.com",
    temperature=0.8
)

#主图的整体状态
class OverallState(BaseModel):
    topic:str
    subjects:list=Field(default_factory=list)
    jokes:Annotated[list,operator.add]=Field(default_factory=list)
    best_joke:int=Field(default=-1)
#生成笑话
class JokeState(BaseModel):
    subject:str

#生成笑话主题的函数
def generate_joke_subjects(state:OverallState):
    chain=subjects_prompt|model.with_structured_output(Subjects)
    response=chain.invoke({"topic":state.topic})
    return OverallState(topic=state.topic,subjects=response.subjects)

#生成笑话的函数
def generate_joke(state:JokeState):
    chain=joke_prompt|model.with_structured_output(Joke)
    response=chain.invoke({"subject":state.subject})
    return {"jokes": [response.joke]}

# 定义映射到生成的主题上的逻辑
# 我们将在图中使用这个作为边缘
def continue_to_jokes(state:OverallState):
    # 将会返回一个Send对象列表
    # 每个Send对象将包含图中的节点名称
    # 以及要发发送到该节点的状态
    return [Send("generate_joke",JokeState(subject=s)) for s in state.subjects]

#评判最佳笑话
def evaluate_best_joke(state:OverallState):
    # 检查是否有笑话
    if not state.jokes:
        return {"best_joke": -1}

    chain=best_joke_prompt|model.with_structured_output(BestJoke)
    response=chain.invoke({"topic":state.topic,"jokes":state.jokes})
    return {"best_joke":response.id}

graph = StateGraph(OverallState)
graph.add_node("generate_subjects",generate_joke_subjects)
graph.add_node("generate_joke",generate_joke)
graph.add_node("best_joke",evaluate_best_joke)

graph.add_edge(START,"generate_subjects")
graph.add_conditional_edges("generate_subjects",continue_to_jokes,["generate_joke"])
graph.add_edge("generate_joke","best_joke")
graph.add_edge("best_joke",END)

app=graph.compile()
for s in app.stream(OverallState(topic="动物")):
    print(s)

Logo

小龙虾开发者社区是 CSDN 旗下专注 OpenClaw 生态的官方阵地,聚焦技能开发、插件实践与部署教程,为开发者提供可直接落地的方案、工具与交流平台,助力高效构建与落地 AI 应用

更多推荐