开启Agent MCP 3.0时代:工作流引擎的革命性突破
工作流引擎是Agent MCP 3.0的核心组件,它允许开发者以声明式的方式定义多步骤任务流程,让AI代理能够自动按照预设的逻辑顺序执行一系列工具调用,并自动处理节点间的数据传递。与传统的线性执行模式不同,工作流引擎支持**有向无环图(DAG)**的执行模式,这意味着我们可以构建复杂的依赖关系,让任务执行更加灵活和智能。"tool_name": "get_pic", # 工具名称"idx": "n
在AI代理(Agent)技术快速发展的今天,我们很高兴向大家介绍Agent MCP 3.0的核心创新——工作流引擎工具。这一突破性功能将彻底改变我们构建和部署AI代理的方式,让复杂任务自动化变得更加直观、高效和可靠。
什么是工作流引擎?
工作流引擎是Agent MCP 3.0的核心组件,它允许开发者以声明式的方式定义多步骤任务流程,让AI代理能够自动按照预设的逻辑顺序执行一系列工具调用,并自动处理节点间的数据传递。
与传统的线性执行模式不同,工作流引擎支持**有向无环图(DAG)**的执行模式,这意味着我们可以构建复杂的依赖关系,让任务执行更加灵活和智能。
代码解析:工作流引擎的核心实现
让我们深入分析这个革命性的实现:
@mcp.tool()
async def workflow_tool(workflow_model: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
工作流引擎工具:按拓扑顺序执行工具节点
"""
# ...核心实现...
1. 工作流模型定义
工作流通过一个结构化的JSON数组定义,每个节点包含:
[
{
"tool_name": "get_pic", # 工具名称
"idx": "node1", # 节点ID
"pre_node": [], # 前置依赖节点
"next_node": ["node2"], # 后续节点
"input_params": {"image_id": "abc123"} # 输入参数
},
# ...
]
2. 拓扑排序算法
工作流引擎的核心是拓扑排序算法,它确保节点按照正确的依赖顺序执行:
# 构建图和入度
for node in workflow_model:
idx = node["idx"]
for dep in node.get("pre_node", []):
graph[dep].append(idx)
in_degree[idx] += 1
# 拓扑排序
queue = deque([node["idx"] for node in workflow_model if in_degree[node["idx"]] == 0])
execution_order = []
这一算法自动检测循环依赖,确保工作流的正确执行。
3. 动态参数传递
工作流引擎支持强大的参数模板功能,允许节点间无缝数据传递:
"data": "{node1.url}" # 自动替换为node1节点的url字段
引擎会自动解析这些占位符,并将前驱节点的输出作为当前节点的输入,实现真正的数据流驱动。
实战示例:图片处理工作流
让我们看一个实际的工作流示例,它展示了Agent MCP 3.0的强大能力:
asyncio.run(workflow_tool([
{
"tool_name": "get_pic",
"idx": "node1",
"pre_node": [],
"next_node": ["node2"],
"input_params": {
"image_id": "hero_001"
}
},
{
"tool_name": "simulate_api_call",
"idx": "node2",
"pre_node": ["node1"],
"next_node": [],
"input_params": {
"data": "{node1.url}",
"multiplier": 3
}
}
]))
这个工作流执行以下操作:
- 首先调用
get_pic
工具获取图片信息,返回类似{"image_id": "hero_001", "url": "https://example.com/images/hero_001.jpg", "size": "1024x768"}
- 然后调用
simulate_api_call
工具,将第一步获取的URL作为输入,并执行相应的处理
注意:正如示例中使用的example.com
域名,正如官方说明:“This domain is for use in illustrative examples in documents”,它专为示例文档而设,无需事先协调或申请许可即可使用。
为什么这是Agent MCP 3.0的里程碑?
1. 声明式编程范式
工作流引擎引入了声明式编程范式,开发者只需关注"做什么",而不是"怎么做"。这大大降低了复杂任务的实现难度。
2. 可视化工作流潜力
这种结构化的节点定义为未来实现可视化工作流编辑器奠定了基础,让非技术用户也能构建复杂的AI工作流。
3. 错误隔离与重试机制
工作流引擎天然支持节点级错误处理,单个节点失败不会影响整个流程,且可以针对特定节点实现重试策略。
4. 性能优化空间
通过分析工作流图,系统可以智能地并行执行无依赖关系的节点,显著提升执行效率。
展望未来
工作流引擎只是Agent MCP 3.0的开始。随着这一架构的成熟,我们将看到:
- 自适应工作流:AI代理能够根据上下文动态调整工作流结构
- 跨代理协作:多个Agent可以共享和协调工作流执行
- 工作流市场:可重用、可组合的工作流模板库
- 可视化开发环境:拖放式工作流设计器
结语
Agent MCP 3.0的工作流引擎代表了AI代理技术的一个重要转折点。它不仅解决了复杂任务编排的痛点,更为AI系统的模块化、可组合性和可维护性树立了新标准。
正如我们的示例中使用的example.com
域名所示,简单而强大的示例往往蕴含着无限可能。现在,是时候拥抱这个新的时代,构建更加智能、灵活和强大的AI代理系统了!
准备好迎接Agent MCP 3.0时代了吗?工作流引擎已经就绪,只等你来创造!
import asyncio
from mcp.server.fastmcp import FastMCP
from starlette.requests import Request
from config import mcp_server_host, mcp_server_name, mcp_server_port, mcp_server_log_level
from starlette.responses import JSONResponse
from typing import List, Dict, Any
from collections import defaultdict, deque
mcp = FastMCP(
name=mcp_server_name,
host=mcp_server_host,
port=mcp_server_port,
log_level=mcp_server_log_level
)
# ==================== 测试工具 1: get_pic ====================
@mcp.tool()
async def get_pic(image_id: str = "default") -> Dict[str, Any]:
"""
模拟获取图片信息
:param image_id: 图片ID
:return: 图片信息字典
"""
return {
"image_id": image_id,
"url": f"https://example.com/images/{image_id}.jpg",
"size": "1024x768"
}
# ==================== 测试工具 2: simulate_api_call ====================
@mcp.tool()
async def simulate_api_call(data: str = "", multiplier: int = 2) -> Dict[str, Any]:
"""
模拟API调用,对输入字符串做处理
:param data: 输入数据
:param multiplier: 倍数(用于模拟计算)
:return: 处理结果
"""
return {
"input": data,
"processed": data * multiplier,
"multiplier": multiplier
}
# ==================== 核心工具: workflow_tool ====================
@mcp.tool()
async def workflow_tool(workflow_model: List[Dict[str, Any]]) -> Dict[str, Any]:
"""
工作流引擎工具:按拓扑顺序执行工具节点
参数格式:
[
{
"tool_name": "get_pic",
"idx": "node1",
"pre_node": [],
"next_node": ["node2"],
"input_params": {"image_id": "abc123"},
"output_params": [] # 可选,用于声明输出字段名,这里我们动态传递
},
{
"tool_name": "simulate_api_call",
"idx": "node2",
"pre_node": ["node1"],
"next_node": [],
"input_params": {
"data": "{node1.url}", # 支持 {node_id.field} 占位符
"multiplier": 3
}
}
]
:param workflow_model: 工作流节点配置列表
:return: 最终执行结果,包含每个节点输出
"""
mcp_all_tools={"get_pic":get_pic,"simulate_api_call":simulate_api_call}
if not isinstance(workflow_model, list):
return {"error": "workflow_model must be a list of node dicts"}
# 构建节点索引映射
node_map = {node["idx"]: node for node in workflow_model}
in_degree = defaultdict(int)
graph = defaultdict(list)
# 构建图和入度
for node in workflow_model:
idx = node["idx"]
for dep in node.get("pre_node", []):
graph[dep].append(idx)
in_degree[idx] += 1
# 拓扑排序
queue = deque([node["idx"] for node in workflow_model if in_degree[node["idx"]] == 0])
execution_order = []
while queue:
current = queue.popleft()
execution_order.append(current)
for neighbor in graph[current]:
in_degree[neighbor] -= 1
if in_degree[neighbor] == 0:
queue.append(neighbor)
if len(execution_order) != len(workflow_model):
return {"error": "Cycle detected in workflow graph"}
# 执行节点
results = {}
for node_id in execution_order:
node = node_map[node_id]
tool_name = node["tool_name"]
input_params = node.get("input_params", {}).copy()
# 替换占位符 {node_id.field}
for key, value in input_params.items():
if isinstance(value, str) and "{" in value and "}" in value:
try:
# 简单模板替换:{node_id.field}
if value.count("{") == 1 and value.count("}") == 1:
start = value.find("{") + 1
end = value.find("}")
ref = value[start:end]
if "." in ref:
ref_node, ref_field = ref.split(".", 1)
if ref_node in results and ref_field in results[ref_node]:
input_params[key] = value.replace(f"{{{ref}}}", str(results[ref_node][ref_field]))
except Exception as e:
return {"error": f"Failed to resolve placeholder in {node_id}: {str(e)}"}
# 获取工具函数
tool_func = mcp_all_tools.get(tool_name)
if not tool_func:
return {"error": f"Tool '{tool_name}' not found"}
# 执行工具(假设都是 async)
try:
if callable(tool_func):
result = await tool_func(**input_params)
else:
result = {"error": f"Tool '{tool_name}' is not callable"}
except Exception as e:
result = {"error": f"Tool '{tool_name}' failed: {str(e)}"}
results[node_id] = result
return {
"status": "success",
"execution_order": execution_order,
"results": results
}
# ==================== 自定义路由(非必须,保留原样) ====================
@mcp.custom_route("/api/data", methods=["GET"])
def get_data():
return JSONResponse({"message": "Hello World"})
@mcp.custom_route("/api/post", methods=["POST"])
async def get_optim_api(request: Request):
return JSONResponse({"message": "Hello World"})
# ==================== 启动 ====================
if __name__ == '__main__':
asyncio.run(workflow_tool([
{
"tool_name": "get_pic",
"idx": "node1",
"pre_node": [],
"next_node": ["node2"],
"input_params": {
"image_id": "hero_001"
}
},
{
"tool_name": "simulate_api_call",
"idx": "node2",
"pre_node": ["node1"],
"next_node": [],
"input_params": {
"data": "{node1.url}",
"multiplier": 3
}
}
]))
# mcp.run()
更多推荐
所有评论(0)