学习记录claude-cookbooks。
今日学习:https://github.com/anthropics/claude-cookbooks/tree/main/patterns/agents

basic_workflows.ipynb

Basic Multi-LLM Workflows

本笔记本展示了三种简单的多大语言模型(Multi-LLM)工作流。它们在成本或延迟方面做出权衡,以期获得潜在的任务性能提升:

  • 提示链(Prompt-Chaining):将任务分解为一系列顺序执行的子任务,每一步都基于前一步的结果进行构建。
  • 并行化(Parallelization):将相互独立的子任务分发给多个大语言模型,实现并发处理。
  • 路由(Routing):根据输入特征动态选择专门优化的大语言模型路径。

注意:这些是用于演示核心概念的示例实现,并非生产级代码。

from concurrent.futures import ThreadPoolExecutor
from typing import List, Dict, Callable
from util import llm_call, extract_xml

def chain(input: str, prompts: List[str]) -> str:
    """依次串联多次大语言模型(LLM)调用,将前一步的结果作为下一步的输入。"""
    result = input
    for i, prompt in enumerate(prompts, 1):
        print(f"\n步骤 {i}:")
        result = llm_call(f"{prompt}\n输入: {result}")
        print(result)
    return result


def parallel(prompt: str, inputs: List[str], n_workers: int = 3) -> List[str]:
    """使用相同的提示(prompt),并发处理多个输入。"""
    with ThreadPoolExecutor(max_workers=n_workers) as executor:
        # 为每个输入提交一个异步 LLM 调用任务
        futures = [executor.submit(llm_call, f"{prompt}\n输入: {x}") for x in inputs]
        # 等待所有任务完成并收集结果
        return [f.result() for f in futures]


def route(input: str, routes: Dict[str, str]) -> str:
    """根据输入内容自动选择最合适的专用处理路径(prompt)。"""
    # 第一步:使用大语言模型对输入进行分类,决定应路由到哪个路径
    print(f"\n可用路由: {list(routes.keys())}")
    selector_prompt = f"""
    请分析以下输入,并从下列选项中选择最合适的支持团队:{list(routes.keys())}
    请先说明你的推理过程,然后在以下 XML 格式中给出你的选择:

    <reasoning>
    简要解释为何该工单应分配给某个特定团队。
    请考虑关键词、用户意图和紧急程度等因素。
    </reasoning>

    <selection>
    所选团队名称
    </selection>

    输入: {input}""".strip()
    
    # 调用 LLM 进行路由决策
    route_response = llm_call(selector_prompt)
    reasoning = extract_xml(route_response, 'reasoning')      # 提取推理理由
    route_key = extract_xml(route_response, 'selection').strip().lower()  # 提取所选路径名称(转为小写)
    
    print("路由分析:")
    print(reasoning)
    print(f"\n选定路由: {route_key}")
    
    # 使用选定的专用提示处理原始输入
    selected_prompt = routes[route_key]
    return llm_call(f"{selected_prompt}\n输入: {input}")

Example Usage

以下是每种工作流的实用示例说明:

  1. 链式工作流(Chain Workflow):用于结构化数据提取与格式化
  2. 并行化工作流(Parallelization Workflow):用于利益相关方影响分析
  3. 路由工作流(Route Workflow):用于客服工单处理
# Example 1: Chain workflow for structured data extraction and formatting
# Each step progressively transforms raw text into a formatted table

data_processing_steps = [
    """从文本中仅提取数值及其关联指标。
    每条数据以'数值: 指标'的格式换行显示。
    示例格式:
    92: 客户满意度
    45%: 收入增长率""",
    
    """将所有数值尽可能转换为百分比形式。
    若非百分比或点数,则转换为小数(例如:92 points → 92%)。
    每行保留一个数值。
    示例格式:
    92%: 客户满意度
    45%: 收入增长率""",
    
    """按数值从高到低对所有行进行降序排序。
    保持每行'数值: 指标'的格式。
    示例:
    92%: 客户满意度
    87%: 员工满意度""",
    
    """将排序后的数据格式化为Markdown表格,包含列:
    | 指标 | 数值 |
    |:--|--:|
    | 客户满意度 | 92% |"""
]

report = """
第三季度绩效总结:
我们的客户满意度得分本季度升至92 points。
收入较去年增长45%。
主要市场份额现已达到23%。
客户流失率从8%降至5%。
新用户获取成本为每用户43美元。
产品使用率提升至78%。
员工满意度为87分。
营业利润率改善至34%。
"""

print("\n输入文本:")
print(report)
formatted_result = chain(report, data_processing_steps)
# Example 2: Parallelization workflow for stakeholder impact analysis
# Process impact analysis for multiple stakeholder groups concurrently

stakeholders = [
    """客户:
    - 对价格敏感
    - 希望更好的技术
    - 关注环境问题""",
    
    """员工:
    - 担心工作保障
    - 需要新技能
    - 希望明确方向""",
    
    """投资者:
    - 期望增长
    - 希望控制成本
    - 关注风险""",
    
    """供应商:
    - 产能限制
    - 价格压力
    - 技术转型"""
]

impact_results = parallel(
    """分析市场变化将如何影响此利益相关方群体。
    提供具体影响和推荐行动。
    格式要求清晰的章节和优先级。""",
    stakeholders
)

for result in impact_results:
    print(result)
    print('+' * 80)
# Example 3: Route workflow for customer support ticket handling
# Route support tickets to appropriate teams based on content analysis

support_routes = {
    "billing": """您是一名计费服务专家。请遵循以下指南:
    1. 始终以"计费支持回复:"开头
    2. 首先确认具体的计费问题
    3. 清晰地解释任何费用或差异
    4. 列出具体的后续步骤和时间安排
    5. 如相关,以付款选项结尾
    
    保持回复专业但友好。
    
    输入:""",
    
    "technical": """您是一名技术支持工程师。请遵循以下指南:
    1. 始终以"技术支持回复:"开头
    2. 列出解决问题的确切步骤
    3. 如相关,包含系统要求
    4. 提供常见问题的临时解决方案
    5. 如需要,提供升级路径
    
    使用清晰的编号步骤和技术细节。
    
    输入:""",
    
    "account": """您是一名账户安全专家。请遵循以下指南:
    1. 始终以"账户支持回复:"开头
    2. 优先处理账户安全和验证
    3. 提供账户恢复/更改的清晰步骤
    4. 包含安全提示和警告
    5. 设定明确的解决时间预期
    
    保持严肃、以安全为重点的语气。
    
    输入:""",
    
    "product": """您是一名产品专家。请遵循以下指南:
    1. 始终以"产品支持回复:"开头
    2. 专注于功能教育和最佳实践
    3. 包含具体的使用示例
    4. 链接到相关文档部分
    5. 建议可能有帮助的相关功能
    
    语气要有教育性和鼓励性。
    
    输入:"""
}

# Test with different support tickets
tickets = [
    """主题:无法访问我的账户
    消息:您好,我过去一小时一直在尝试登录,但一直收到"密码错误"的提示。
    我确定我使用的是正确的密码。您能帮我重新获得访问权限吗?这很紧急,因为我需要在今天下班前提交报告。
    - John""",
    
    """主题:信用卡上的意外收费
    消息:您好,我刚刚注意到我的信用卡上有您公司的一笔49.99美元收费,但我以为我使用的是29.99美元的计划。
    您能解释这笔费用吗?如果是错误,请进行调整。
    谢谢,
    Sarah""",
    
    """主题:如何导出数据?
    消息:我需要将所有项目数据导出到Excel。我查阅了文档,但不知道如何进行批量导出。
    这可能吗?如果可以,您能带我一步步完成吗?
    诚挚问候,
    Mike"""
]

print("正在处理支持工单...\n")
for i, ticket in enumerate(tickets, 1):
    print(f"\n工单 {i}:")
    print("-" * 40)
    print(ticket)
    print("\n回复:")
    print("-" * 40)
    response = route(ticket, support_routes)
    print(response)
    print("+" * 80)

evaluator_optimizer.ipynb

Evaluator-Optimizer Workflow

该工作流通过双模型协作实现:一个LLM生成回答,另一个则通过循环机制进行评估与反馈。

该工作流在以下场景中尤为有效:

  • 具备明确的评估标准
  • 需要通过迭代优化提升价值
    其适用性主要体现在两个特征:
  • 当提供反馈后,LLM的响应质量能够获得显著提升
  • LLM自身能够提供具有实质意义的反馈建议
from util import llm_call, extract_xml

def generate(prompt: str, task: str, context: str = "") -> tuple[str, str]:
    """生成解决方案并根据反馈进行改进"""
    full_prompt = f"{prompt}\n{context}\n任务: {task}" if context else f"{prompt}\n任务: {task}"
    response = llm_call(full_prompt)
    thoughts = extract_xml(response, "thoughts")
    result = extract_xml(response, "response")
    
    print("\n=== 生成开始 ===")
    print(f"思考过程:\n{thoughts}\n")
    print(f"生成结果:\n{result}")
    print("=== 生成结束 ===\n")
    
    return thoughts, result

def evaluate(prompt: str, content: str, task: str) -> tuple[str, str]:
    """评估解决方案是否满足要求"""
    full_prompt = f"{prompt}\n原始任务: {task}\n待评估内容: {content}"
    response = llm_call(full_prompt)
    evaluation = extract_xml(response, "evaluation")
    feedback = extract_xml(response, "feedback")
    
    print("=== 评估开始 ===")
    print(f"评估状态: {evaluation}")
    print(f"反馈意见: {feedback}")
    print("=== 评估结束 ===\n")
    
    return evaluation, feedback

def loop(task: str, evaluator_prompt: str, generator_prompt: str) -> tuple[str, list[dict]]:
    """持续生成和评估,直到满足要求为止"""
    memory = []
    chain_of_thought = []
    
    thoughts, result = generate(generator_prompt, task)
    memory.append(result)
    chain_of_thought.append({"thoughts": thoughts, "result": result})
    
    while True:
        evaluation, feedback = evaluate(evaluator_prompt, result, task)
        if evaluation == "PASS":
            return result, chain_of_thought
            
        context = "\n".join([
            "先前尝试:",
            *[f"- {m}" for m in memory],
            f"\n反馈意见: {feedback}"
        ])
        
        thoughts, result = generate(generator_prompt, task, context)
        memory.append(result)
        chain_of_thought.append({"thoughts": thoughts, "result": result})

应用场景示例:迭代式代码开发循环:

evaluator_prompt = """
请对以下代码进行评估:
1. 代码正确性
2. 时间复杂度
3. 代码风格和最佳实践

你只应该进行评估,不要尝试解决任务。
只有当所有标准都满足且没有改进建议时,才输出"PASS"。
请按以下格式简洁地输出评估结果。

<evaluation>通过, 需要改进, 或不通过</evaluation>
<feedback>
需要改进的内容及原因。
</feedback>
"""

generator_prompt = """
你的目标是根据<用户输入>完成任务。如果之前生成的结果有反馈意见,
你应该反思这些反馈以改进你的解决方案。

请按以下格式简洁地输出答案:

<thoughts>
[你对任务和反馈的理解,以及你计划如何改进]
</thoughts>

<response>
[你的代码实现放在这里]
</response>
"""

task = """
<用户输入>
实现一个栈,包含以下功能:
1. push(x) - 入栈
2. pop() - 出栈  
3. getMin() - 获取最小值
所有操作都应该是O(1)时间复杂度。
</用户输入>
"""

loop(task, evaluator_prompt, generator_prompt)

orchestrator_workers.ipynb

Orchestrator-Workers Workflow

协调者-工作者工作流
在此工作流中,一个作为“协调者”的核心LLM会动态解析总任务,将其分派给多个“工作者”LLM去执行,并最终整合各工作者的输出结果。

适用场景
该工作流尤其适用于任务复杂、无法预先确定所有子任务的场景。其与简单并行处理的核心区别在于灵活性——子任务并非固定不变,而是由协调者根据当前输入动态生成。

from typing import Dict, List, Optional
from util import llm_call, extract_xml

def parse_tasks(tasks_xml: str) -> List[Dict]:
    """解析XML格式的任务列表,返回任务字典列表"""
    tasks = []
    current_task = {}
    
    for line in tasks_xml.split('\n'):
        line = line.strip()
        if not line:
            continue
            
        if line.startswith("<task>"):
            current_task = {}
        elif line.startswith("<type>"):
            current_task["type"] = line[6:-7].strip()
        elif line.startswith("<description>"):
            current_task["description"] = line[12:-13].strip()
        elif line.startswith("</task>"):
            if "description" in current_task:
                if "type" not in current_task:
                    current_task["type"] = "default"  # 默认类型
                tasks.append(current_task)
    
    return tasks

class FlexibleOrchestrator:
    """动态分解任务并使用工作者LLM并行执行的协调器"""
    
    def __init__(
        self,
        orchestrator_prompt: str,
        worker_prompt: str,
    ):
        """使用提示模板初始化协调器"""
        self.orchestrator_prompt = orchestrator_prompt
        self.worker_prompt = worker_prompt

    def _format_prompt(self, template: str, **kwargs) -> str:
        """使用变量格式化提示模板"""
        try:
            return template.format(**kwargs)
        except KeyError as e:
            raise ValueError(f"缺少必要的提示变量: {e}")

    def process(self, task: str, context: Optional[Dict] = None) -> Dict:
        """处理任务:分解任务并并行执行子任务"""
        context = context or {}
        
        # 第一步:获取协调器响应
        orchestrator_input = self._format_prompt(
            self.orchestrator_prompt,
            task=task,
            **context
        )
        orchestrator_response = llm_call(orchestrator_input)
        
        # 解析协调器响应
        analysis = extract_xml(orchestrator_response, "analysis")
        tasks_xml = extract_xml(orchestrator_response, "tasks")
        tasks = parse_tasks(tasks_xml)
        
        print("\n=== 协调器输出 ===")
        print(f"\n分析结果:\n{analysis}")
        print(f"\n任务列表:\n{tasks}")
        
        # 第二步:处理每个子任务
        worker_results = []
        for task_info in tasks:
            worker_input = self._format_prompt(
                self.worker_prompt,
                original_task=task,
                task_type=task_info['type'],
                task_description=task_info['description'],
                **context
            )
            
            worker_response = llm_call(worker_input)
            result = extract_xml(worker_response, "response")
            
            worker_results.append({
                "type": task_info["type"],
                "description": task_info["description"],
                "result": result
            })
            
            print(f"\n=== 工作者结果 ({task_info['type']}) ===\n{result}\n")
        
        return {
            "analysis": analysis,
            "worker_results": worker_results,
        }

示例应用场景:营销素材多版本生成

ORCHESTRATOR_PROMPT = """
分析以下任务并将其分解为2-3种不同的创作方向:

任务:{task}

请按以下格式返回响应:

<analysis>
阐述你对任务的理解,并说明每种创作方向的价值所在。
重点说明每种方法如何服务于任务的不同方面。
</analysis>

<tasks>
    <task>
    <type>正式版</type>
    <description>撰写精确、专业的版本,重点突出产品规格参数</description>
    </task>
    <task>
    <type>口语版</type>
    <description>撰写生动有趣的版本,与读者建立情感连接</description>
    </task>
</tasks>
"""

WORKER_PROMPT = """
根据以下要求生成内容:
原始任务:{original_task}
文案风格:{task_type}
创作指南:{task_description}

请按以下格式返回响应:

<response>
请在此处填写生成的内容,保持指定风格并完整满足所有要求。
</response>
"""


orchestrator = FlexibleOrchestrator(
    orchestrator_prompt=ORCHESTRATOR_PROMPT,
    worker_prompt=WORKER_PROMPT,
)

results = orchestrator.process(
    task="为新型环保水壶撰写产品描述",
    context={
        "target_audience": "注重环保的千禧一代",
        "key_features": ["无塑料材质", "保温功能", "终身保修"]
    }
)
Logo

更多推荐