Spring AI alibaba 智能体扩展
本文扩展了智能体应用功能,主要实现两部分内容:循环执行检测处理和人机交互工具集成。 循环检测与处理 实现is_stuck方法检测重复消息 通过handle_stuck_state添加提示信息 在run方法中集成循环检测逻辑 交互式AskHuman工具 定义Java版AskHuman工具类 使用@Tool注解标记方法 实现用户输入查询功能 最后还提到后续将参照Open Manus实现MCP集成。整体
本文为个人学习笔记整理,仅供交流参考,非专业教学资料,内容请自行甄别。
文章目录
前言
在前篇中,参照Open Manus的源码,利用Spring AI 框架,实现了一个JAVA版本具有主要功能的智能体应用。在本篇中,继续参照源码对智能体应用的功能进行一些扩展:
- 实现源码
base.py中关于循环执行检测与处理的相关逻辑,即is_stuck和handle_stuck_state - 实现源码中
AskHuman工具的使用,即让AI自主决定什么时候让用户进行辅助输入查询。
最后还会看一下Open Manus中关于MCP集成的实现。
一、循环执行检测与处理
参照源码,在base.py的run方法的循环中,调用了两个方法:
- is_stuck是用于判断当前是否出现了循环执行的问题。
- handle_stuck_state则是对循环执行的问题进行处理。

is_stuck方法的含义:
def is_stuck(self) -> bool:
"""Check if the agent is stuck in a loop by detecting duplicate content"""
# 检查消息数量是否足够进行循环检测(至少需要2条消息)
if len(self.memory.messages) < 2:
return False # 消息数量不足,无法检测循环,返回False
# 获取最后一条消息(最新的助手回复)
last_message = self.memory.messages[-1]
# 检查最后一条消息是否有内容
if not last_message.content:
return False # 如果最后一条消息内容为空,返回False
# 统计与最后一条消息内容相同的助手消息数量
duplicate_count = sum(
1 # 计数器,每找到一个重复消息就加1
for msg in reversed(self.memory.messages[:-1]) # 遍历除最后一条外的所有历史消息(倒序)
if msg.role == "assistant" and msg.content == last_message.content # 只统计助手角色且内容完全相同的消息
)
# 判断重复次数是否达到阈值,如果达到则认为智能体陷入循环
return duplicate_count >= self.duplicate_threshold
handle_stuck_state方法的主要作用,当智能体陷入循环时,系统会自动添加提示来引导它改变策略,避免继续重复无效的行为。
- 定义一个提示信息,告诉智能体检测到了重复响应,建议考虑新策略
- 将这个提示信息添加到现有的下一步提示前面,让智能体在下次执行时能看到这个提醒
- 记录警告日志,便于调试和监控智能体的状态
def handle_stuck_state(self):
"""Handle stuck state by adding a prompt to change strategy"""
# 定义检测到循环状态时的提示信息
stuck_prompt = "\
Observed duplicate responses. Consider new strategies and avoid repeating ineffective paths already attempted."
# 将循环检测提示添加到现有的下一步提示前面,用换行符连接
self.next_step_prompt = f"{stuck_prompt}\n{self.next_step_prompt}"
# 记录警告日志,记录智能体检测到循环状态并添加了提示信息
logger.warning(f"Agent detected stuck state. Added prompt: {stuck_prompt}")
1.1、定义is_stuck
我们在智能体的BaseAgent中首先定义is_stuck,并且参照源码的逻辑进行实现:
/**
* 智能体循环检测阈值
*/
private Integer duplicateThreshold = 2;
/**
* 循环消息检测
* @return
*/
public boolean isStuck() {
// 检查消息数量是否足够进行循环检测(至少需要2条消息)
if (this.memoryMessages.size() < 2) {
return false;
}
// 获取最后一条消息(最新的助手回复)
Message lastMessage = CollUtil.getLast(this.memoryMessages);
if (ObjUtil.isEmpty(lastMessage) || lastMessage.getText().isEmpty()){
return false;
}
//统计与最后一条消息内容相同的助手消息数量
int count = 0;
//遍历除最后一条外的所有历史消息(倒序)
for (int i = this.memoryMessages.size() - 2 ; i >= 0; i--) {
Message message = memoryMessages.get(i);
// 只统计助手角色且内容完全相同的消息
if (message instanceof AssistantMessage && message.getText().equals(lastMessage.getText())){
count++;
}
}
// 判断重复次数是否达到阈值,如果达到则认为智能体陷入循环
return count >= duplicateThreshold;
}
1.2、定义handle_stuck_state
然后定义出现循环问题的解决方法,相对比较简单:
/**
* 处理循环消息
*/
public void handleStuckState(){
String stuckPrompt = "Observed duplicate responses. Consider new strategies and avoid repeating ineffective paths already attempted.";
this.nextStepPrompt = nextStepPrompt + "/n" + stuckPrompt;
log.info("Agent detected stuck state. Added prompt: {}",stuckPrompt);
}
1.3、改造run方法
在run方法的循环中,增加循环检测和处理:
//循环条件 小于最大轮次,并且状态不为结束
for (int i = 0; i < maxStep && agentState != AgentState.FINISHED; i++) {
//执行具体的操作,子类实现
String stepResult = step();
// 扩展点 加入循环检测的逻辑
if (this.isStuck()){
this.handleStuckState();
}
//.....
}
二、智能体交互式执行
参照源码,在tool目录下定义了一个AskHuman工具,后续在自己的智能体项目中,需要将其转换为Spring AI工具类的形式。
# 定义人机交互工具类
class AskHuman(BaseTool):
"""Add a tool to ask human for help."""
# 定义工具的唯一标识符名称
name: str = "ask_human"
# 定义工具的功能描述,说明何时使用此工具
description: str = "Use this tool to ask human for help."
# 定义工具的参数结构,使用JSON Schema格式
parameters: str = {
"type": "object", # 参数类型为对象
"properties": { # 定义对象的属性
"inquire": { # 定义询问参数
"type": "string", # 参数类型为字符串
"description": "The question you want to ask human.", # 参数描述
}
},
"required": ["inquire"], # 指定必需参数为inquire
}
# 定义异步执行方法,用于处理人机交互
async def execute(self, inquire: str) -> str:
# 使用input函数在控制台显示问题并等待用户输入,然后去除首尾空白字符
return input(f"""Bot: {inquire}\n\nYou: """).strip()
1.1、AskHuman工具类
/**
* 定义人机交互工具类
*/
public class AskHuman {
@Tool(description = "ask_human")
public String askHuman(@ToolParam(description = "The question you want to ask human.") String inquire) {
// 使用Scanner获取用户输入
Scanner scanner = new Scanner(System.in);
System.out.printf("Bot: %s%n%nYou: ", inquire);
String userInput = scanner.nextLine();
return userInput.trim();
}
}
然后将其注册到ToolRegistration中:
@Configuration
public class ToolRegistration {
@Bean
public ToolCallback[] availableTools(){
FileOperationTool fileOperationTool = new FileOperationTool();
PDFGenerationTool pdfGenerationTool = new PDFGenerationTool();
ResourceDownloadTool resourceDownloadTool = new ResourceDownloadTool();
TerminalOperationTool terminalOperationTool = new TerminalOperationTool();
WebScrapingTool webScrapingTool = new WebScrapingTool();
// WebSearchTool webSearchTool = new WebSearchTool();
TerminateTool terminateTool = new TerminateTool();
//人机交互工具类
AskHumanTool askHumanTool = new AskHumanTool();
return ToolCallbacks.from(fileOperationTool,pdfGenerationTool,resourceDownloadTool,terminalOperationTool,webScrapingTool,terminateTool,askHumanTool);
}
}
三、Open Manus 集成MCP
首先参照源码中的集成方式,在Manus.py中,首先定义了两个属性,表示MCP客户端实例和记录已连接的MCP服务器:
其中还有一些关键的方法:
3.1、源码实现:initialize_mcp_servers
该方法主要是初始化所有配置的MCP服务器连接:
- 遍历配置中的所有MCP服务器
- 支持SSE和stdio两种连接方式
- 包含错误处理和日志记录
async def initialize_mcp_servers(self) -> None:
"""Initialize connections to configured MCP servers."""
# 遍历配置中的所有MCP服务器
for server_id, server_config in config.mcp_config.servers.items():
try:
# 检查服务器类型是否为SSE(Server-Sent Events)
if server_config.type == "sse":
# 如果配置了URL,则连接到SSE服务器
if server_config.url:
await self.connect_mcp_server(server_config.url, server_id)
# 记录成功连接日志
logger.info(
f"Connected to MCP server {server_id} at {server_config.url}"
)
# 检查服务器类型是否为stdio(标准输入输出)
elif server_config.type == "stdio":
# 如果配置了命令,则通过stdio连接到服务器
if server_config.command:
await self.connect_mcp_server(
server_config.command,
server_id,
use_stdio=True,
stdio_args=server_config.args,
)
# 记录成功连接日志
logger.info(
f"Connected to MCP server {server_id} using command {server_config.command}"
)
except Exception as e:
# 记录连接失败的错误日志
logger.error(f"Failed to connect to MCP server {server_id}: {e}")
3.2、源码实现:connect_mcp_server
在上一个initialize_mcp_servers方法中,根据服务器类型,最终执行的是connect_mcp_server方法连接到服务器:
- 检查连接方式(stdio或SSE)
- 将自身添加到保存已连接MCP的集合中。
- 将新工具添加到可用工具集合
可以看到,在连接到服务器之后,会获取到MCP服务对应的工具,然后加入到一个可用工具集合中。
最终在调用工具时,还是会遍历本地的可用工具集合并且执行,而不是每次都去MCP服务器上获取。
async def connect_mcp_server(
self,
server_url: str,
server_id: str = "",
use_stdio: bool = False,
stdio_args: List[str] = None,
) -> None:
"""Connect to an MCP server and add its tools."""
# 检查是否使用stdio连接方式
if use_stdio:
# 通过stdio连接到MCP服务器
await self.mcp_clients.connect_stdio(
server_url, stdio_args or [], server_id
)
# 将服务器信息添加到已连接服务器字典中
self.connected_servers[server_id or server_url] = server_url
else:
# 通过SSE连接到MCP服务器
await self.mcp_clients.connect_sse(server_url, server_id)
# 将服务器信息添加到已连接服务器字典中
self.connected_servers[server_id or server_url] = server_url
# 更新可用工具集合,只添加来自此服务器的新工具
new_tools = [
tool for tool in self.mcp_clients.tools if tool.server_id == server_id
]
# 将新工具添加到可用工具集合中
self.available_tools.add_tools(*new_tools)
3.3、源码实现:disconnect_mcp_server
该方法的作用是,断开MCP服务器连接并移除其工具:
- 断开服务器连接
- 清理已连接MCP的集合
- 重置可用工具集合
async def disconnect_mcp_server(self, server_id: str = "") -> None:
"""Disconnect from an MCP server and remove its tools."""
# 断开与MCP服务器的连接
await self.mcp_clients.disconnect(server_id)
# 如果指定了服务器ID,则只移除该服务器
if server_id:
self.connected_servers.pop(server_id, None)
else:
# 如果没有指定服务器ID,则清空所有已连接服务器
self.connected_servers.clear()
# 重新构建可用工具集合,移除已断开服务器的工具
base_tools = [
tool
for tool in self.available_tools.tools
if not isinstance(tool, MCPClientTool)
]
# 创建新的工具集合,只包含基础工具
self.available_tools = ToolCollection(*base_tools)
# 添加剩余的MCP客户端工具
self.available_tools.add_tools(*self.mcp_clients.tools)
3.4、源码实现:cleanup
该方法的主要目的是清理MCP相关资源:
- 清理浏览器上下文
- 断开所有MCP服务器连接
async def cleanup(self):
"""Clean up Manus agent resources."""
# 清理浏览器上下文助手
if self.browser_context_helper:
await self.browser_context_helper.cleanup_browser()
# 只有在已初始化的情况下才断开所有MCP服务器连接
if self._initialized:
await self.disconnect_mcp_server()
self._initialized = False
3.5、源码实现:think
最后还对父类的think方法做了增强:
- 检查初始化状态
- 自动初始化MCP服务器

更多推荐



所有评论(0)