anthropics-claude-cookbooks学习记录01
学习记录claude-cookbooks。今日学习:https://github.com/anthropics/claude-cookbooks/tree/main/patterns/agents。
·
文章目录
学习记录claude-cookbooks。
今日学习:https://github.com/anthropics/claude-cookbooks/tree/main/patterns/agents
basic_workflows.ipynb
Basic Multi-LLM Workflows
本笔记本展示了三种简单的多大语言模型(Multi-LLM)工作流。它们在成本或延迟方面做出权衡,以期获得潜在的任务性能提升:
- 提示链(Prompt-Chaining):将任务分解为一系列顺序执行的子任务,每一步都基于前一步的结果进行构建。
- 并行化(Parallelization):将相互独立的子任务分发给多个大语言模型,实现并发处理。
- 路由(Routing):根据输入特征动态选择专门优化的大语言模型路径。
注意:这些是用于演示核心概念的示例实现,并非生产级代码。
from concurrent.futures import ThreadPoolExecutor
from typing import List, Dict, Callable
from util import llm_call, extract_xml
def chain(input: str, prompts: List[str]) -> str:
"""依次串联多次大语言模型(LLM)调用,将前一步的结果作为下一步的输入。"""
result = input
for i, prompt in enumerate(prompts, 1):
print(f"\n步骤 {i}:")
result = llm_call(f"{prompt}\n输入: {result}")
print(result)
return result
def parallel(prompt: str, inputs: List[str], n_workers: int = 3) -> List[str]:
"""使用相同的提示(prompt),并发处理多个输入。"""
with ThreadPoolExecutor(max_workers=n_workers) as executor:
# 为每个输入提交一个异步 LLM 调用任务
futures = [executor.submit(llm_call, f"{prompt}\n输入: {x}") for x in inputs]
# 等待所有任务完成并收集结果
return [f.result() for f in futures]
def route(input: str, routes: Dict[str, str]) -> str:
"""根据输入内容自动选择最合适的专用处理路径(prompt)。"""
# 第一步:使用大语言模型对输入进行分类,决定应路由到哪个路径
print(f"\n可用路由: {list(routes.keys())}")
selector_prompt = f"""
请分析以下输入,并从下列选项中选择最合适的支持团队:{list(routes.keys())}
请先说明你的推理过程,然后在以下 XML 格式中给出你的选择:
<reasoning>
简要解释为何该工单应分配给某个特定团队。
请考虑关键词、用户意图和紧急程度等因素。
</reasoning>
<selection>
所选团队名称
</selection>
输入: {input}""".strip()
# 调用 LLM 进行路由决策
route_response = llm_call(selector_prompt)
reasoning = extract_xml(route_response, 'reasoning') # 提取推理理由
route_key = extract_xml(route_response, 'selection').strip().lower() # 提取所选路径名称(转为小写)
print("路由分析:")
print(reasoning)
print(f"\n选定路由: {route_key}")
# 使用选定的专用提示处理原始输入
selected_prompt = routes[route_key]
return llm_call(f"{selected_prompt}\n输入: {input}")
Example Usage
以下是每种工作流的实用示例说明:
- 链式工作流(Chain Workflow):用于结构化数据提取与格式化
- 并行化工作流(Parallelization Workflow):用于利益相关方影响分析
- 路由工作流(Route Workflow):用于客服工单处理
# Example 1: Chain workflow for structured data extraction and formatting
# Each step progressively transforms raw text into a formatted table
data_processing_steps = [
"""从文本中仅提取数值及其关联指标。
每条数据以'数值: 指标'的格式换行显示。
示例格式:
92: 客户满意度
45%: 收入增长率""",
"""将所有数值尽可能转换为百分比形式。
若非百分比或点数,则转换为小数(例如:92 points → 92%)。
每行保留一个数值。
示例格式:
92%: 客户满意度
45%: 收入增长率""",
"""按数值从高到低对所有行进行降序排序。
保持每行'数值: 指标'的格式。
示例:
92%: 客户满意度
87%: 员工满意度""",
"""将排序后的数据格式化为Markdown表格,包含列:
| 指标 | 数值 |
|:--|--:|
| 客户满意度 | 92% |"""
]
report = """
第三季度绩效总结:
我们的客户满意度得分本季度升至92 points。
收入较去年增长45%。
主要市场份额现已达到23%。
客户流失率从8%降至5%。
新用户获取成本为每用户43美元。
产品使用率提升至78%。
员工满意度为87分。
营业利润率改善至34%。
"""
print("\n输入文本:")
print(report)
formatted_result = chain(report, data_processing_steps)
# Example 2: Parallelization workflow for stakeholder impact analysis
# Process impact analysis for multiple stakeholder groups concurrently
stakeholders = [
"""客户:
- 对价格敏感
- 希望更好的技术
- 关注环境问题""",
"""员工:
- 担心工作保障
- 需要新技能
- 希望明确方向""",
"""投资者:
- 期望增长
- 希望控制成本
- 关注风险""",
"""供应商:
- 产能限制
- 价格压力
- 技术转型"""
]
impact_results = parallel(
"""分析市场变化将如何影响此利益相关方群体。
提供具体影响和推荐行动。
格式要求清晰的章节和优先级。""",
stakeholders
)
for result in impact_results:
print(result)
print('+' * 80)
# Example 3: Route workflow for customer support ticket handling
# Route support tickets to appropriate teams based on content analysis
support_routes = {
"billing": """您是一名计费服务专家。请遵循以下指南:
1. 始终以"计费支持回复:"开头
2. 首先确认具体的计费问题
3. 清晰地解释任何费用或差异
4. 列出具体的后续步骤和时间安排
5. 如相关,以付款选项结尾
保持回复专业但友好。
输入:""",
"technical": """您是一名技术支持工程师。请遵循以下指南:
1. 始终以"技术支持回复:"开头
2. 列出解决问题的确切步骤
3. 如相关,包含系统要求
4. 提供常见问题的临时解决方案
5. 如需要,提供升级路径
使用清晰的编号步骤和技术细节。
输入:""",
"account": """您是一名账户安全专家。请遵循以下指南:
1. 始终以"账户支持回复:"开头
2. 优先处理账户安全和验证
3. 提供账户恢复/更改的清晰步骤
4. 包含安全提示和警告
5. 设定明确的解决时间预期
保持严肃、以安全为重点的语气。
输入:""",
"product": """您是一名产品专家。请遵循以下指南:
1. 始终以"产品支持回复:"开头
2. 专注于功能教育和最佳实践
3. 包含具体的使用示例
4. 链接到相关文档部分
5. 建议可能有帮助的相关功能
语气要有教育性和鼓励性。
输入:"""
}
# Test with different support tickets
tickets = [
"""主题:无法访问我的账户
消息:您好,我过去一小时一直在尝试登录,但一直收到"密码错误"的提示。
我确定我使用的是正确的密码。您能帮我重新获得访问权限吗?这很紧急,因为我需要在今天下班前提交报告。
- John""",
"""主题:信用卡上的意外收费
消息:您好,我刚刚注意到我的信用卡上有您公司的一笔49.99美元收费,但我以为我使用的是29.99美元的计划。
您能解释这笔费用吗?如果是错误,请进行调整。
谢谢,
Sarah""",
"""主题:如何导出数据?
消息:我需要将所有项目数据导出到Excel。我查阅了文档,但不知道如何进行批量导出。
这可能吗?如果可以,您能带我一步步完成吗?
诚挚问候,
Mike"""
]
print("正在处理支持工单...\n")
for i, ticket in enumerate(tickets, 1):
print(f"\n工单 {i}:")
print("-" * 40)
print(ticket)
print("\n回复:")
print("-" * 40)
response = route(ticket, support_routes)
print(response)
print("+" * 80)
evaluator_optimizer.ipynb
Evaluator-Optimizer Workflow
该工作流通过双模型协作实现:一个LLM生成回答,另一个则通过循环机制进行评估与反馈。
该工作流在以下场景中尤为有效:
- 具备明确的评估标准
- 需要通过迭代优化提升价值
其适用性主要体现在两个特征: - 当提供反馈后,LLM的响应质量能够获得显著提升
- LLM自身能够提供具有实质意义的反馈建议
from util import llm_call, extract_xml
def generate(prompt: str, task: str, context: str = "") -> tuple[str, str]:
"""生成解决方案并根据反馈进行改进"""
full_prompt = f"{prompt}\n{context}\n任务: {task}" if context else f"{prompt}\n任务: {task}"
response = llm_call(full_prompt)
thoughts = extract_xml(response, "thoughts")
result = extract_xml(response, "response")
print("\n=== 生成开始 ===")
print(f"思考过程:\n{thoughts}\n")
print(f"生成结果:\n{result}")
print("=== 生成结束 ===\n")
return thoughts, result
def evaluate(prompt: str, content: str, task: str) -> tuple[str, str]:
"""评估解决方案是否满足要求"""
full_prompt = f"{prompt}\n原始任务: {task}\n待评估内容: {content}"
response = llm_call(full_prompt)
evaluation = extract_xml(response, "evaluation")
feedback = extract_xml(response, "feedback")
print("=== 评估开始 ===")
print(f"评估状态: {evaluation}")
print(f"反馈意见: {feedback}")
print("=== 评估结束 ===\n")
return evaluation, feedback
def loop(task: str, evaluator_prompt: str, generator_prompt: str) -> tuple[str, list[dict]]:
"""持续生成和评估,直到满足要求为止"""
memory = []
chain_of_thought = []
thoughts, result = generate(generator_prompt, task)
memory.append(result)
chain_of_thought.append({"thoughts": thoughts, "result": result})
while True:
evaluation, feedback = evaluate(evaluator_prompt, result, task)
if evaluation == "PASS":
return result, chain_of_thought
context = "\n".join([
"先前尝试:",
*[f"- {m}" for m in memory],
f"\n反馈意见: {feedback}"
])
thoughts, result = generate(generator_prompt, task, context)
memory.append(result)
chain_of_thought.append({"thoughts": thoughts, "result": result})
应用场景示例:迭代式代码开发循环:
evaluator_prompt = """
请对以下代码进行评估:
1. 代码正确性
2. 时间复杂度
3. 代码风格和最佳实践
你只应该进行评估,不要尝试解决任务。
只有当所有标准都满足且没有改进建议时,才输出"PASS"。
请按以下格式简洁地输出评估结果。
<evaluation>通过, 需要改进, 或不通过</evaluation>
<feedback>
需要改进的内容及原因。
</feedback>
"""
generator_prompt = """
你的目标是根据<用户输入>完成任务。如果之前生成的结果有反馈意见,
你应该反思这些反馈以改进你的解决方案。
请按以下格式简洁地输出答案:
<thoughts>
[你对任务和反馈的理解,以及你计划如何改进]
</thoughts>
<response>
[你的代码实现放在这里]
</response>
"""
task = """
<用户输入>
实现一个栈,包含以下功能:
1. push(x) - 入栈
2. pop() - 出栈
3. getMin() - 获取最小值
所有操作都应该是O(1)时间复杂度。
</用户输入>
"""
loop(task, evaluator_prompt, generator_prompt)
orchestrator_workers.ipynb
Orchestrator-Workers Workflow
协调者-工作者工作流
在此工作流中,一个作为“协调者”的核心LLM会动态解析总任务,将其分派给多个“工作者”LLM去执行,并最终整合各工作者的输出结果。
适用场景
该工作流尤其适用于任务复杂、无法预先确定所有子任务的场景。其与简单并行处理的核心区别在于灵活性——子任务并非固定不变,而是由协调者根据当前输入动态生成。
from typing import Dict, List, Optional
from util import llm_call, extract_xml
def parse_tasks(tasks_xml: str) -> List[Dict]:
"""解析XML格式的任务列表,返回任务字典列表"""
tasks = []
current_task = {}
for line in tasks_xml.split('\n'):
line = line.strip()
if not line:
continue
if line.startswith("<task>"):
current_task = {}
elif line.startswith("<type>"):
current_task["type"] = line[6:-7].strip()
elif line.startswith("<description>"):
current_task["description"] = line[12:-13].strip()
elif line.startswith("</task>"):
if "description" in current_task:
if "type" not in current_task:
current_task["type"] = "default" # 默认类型
tasks.append(current_task)
return tasks
class FlexibleOrchestrator:
"""动态分解任务并使用工作者LLM并行执行的协调器"""
def __init__(
self,
orchestrator_prompt: str,
worker_prompt: str,
):
"""使用提示模板初始化协调器"""
self.orchestrator_prompt = orchestrator_prompt
self.worker_prompt = worker_prompt
def _format_prompt(self, template: str, **kwargs) -> str:
"""使用变量格式化提示模板"""
try:
return template.format(**kwargs)
except KeyError as e:
raise ValueError(f"缺少必要的提示变量: {e}")
def process(self, task: str, context: Optional[Dict] = None) -> Dict:
"""处理任务:分解任务并并行执行子任务"""
context = context or {}
# 第一步:获取协调器响应
orchestrator_input = self._format_prompt(
self.orchestrator_prompt,
task=task,
**context
)
orchestrator_response = llm_call(orchestrator_input)
# 解析协调器响应
analysis = extract_xml(orchestrator_response, "analysis")
tasks_xml = extract_xml(orchestrator_response, "tasks")
tasks = parse_tasks(tasks_xml)
print("\n=== 协调器输出 ===")
print(f"\n分析结果:\n{analysis}")
print(f"\n任务列表:\n{tasks}")
# 第二步:处理每个子任务
worker_results = []
for task_info in tasks:
worker_input = self._format_prompt(
self.worker_prompt,
original_task=task,
task_type=task_info['type'],
task_description=task_info['description'],
**context
)
worker_response = llm_call(worker_input)
result = extract_xml(worker_response, "response")
worker_results.append({
"type": task_info["type"],
"description": task_info["description"],
"result": result
})
print(f"\n=== 工作者结果 ({task_info['type']}) ===\n{result}\n")
return {
"analysis": analysis,
"worker_results": worker_results,
}
示例应用场景:营销素材多版本生成
ORCHESTRATOR_PROMPT = """
分析以下任务并将其分解为2-3种不同的创作方向:
任务:{task}
请按以下格式返回响应:
<analysis>
阐述你对任务的理解,并说明每种创作方向的价值所在。
重点说明每种方法如何服务于任务的不同方面。
</analysis>
<tasks>
<task>
<type>正式版</type>
<description>撰写精确、专业的版本,重点突出产品规格参数</description>
</task>
<task>
<type>口语版</type>
<description>撰写生动有趣的版本,与读者建立情感连接</description>
</task>
</tasks>
"""
WORKER_PROMPT = """
根据以下要求生成内容:
原始任务:{original_task}
文案风格:{task_type}
创作指南:{task_description}
请按以下格式返回响应:
<response>
请在此处填写生成的内容,保持指定风格并完整满足所有要求。
</response>
"""
orchestrator = FlexibleOrchestrator(
orchestrator_prompt=ORCHESTRATOR_PROMPT,
worker_prompt=WORKER_PROMPT,
)
results = orchestrator.process(
task="为新型环保水壶撰写产品描述",
context={
"target_audience": "注重环保的千禧一代",
"key_features": ["无塑料材质", "保温功能", "终身保修"]
}
)
更多推荐

所有评论(0)