本文为个人学习笔记整理,仅供交流参考,非专业教学资料,内容请自行甄别。


前言

  在前篇中,参照Open Manus的源码,利用Spring AI 框架,实现了一个JAVA版本具有主要功能的智能体应用。在本篇中,继续参照源码对智能体应用的功能进行一些扩展:

  • 实现源码base.py中关于循环执行检测与处理的相关逻辑,即is_stuckhandle_stuck_state
  • 实现源码中AskHuman工具的使用,即让AI自主决定什么时候让用户进行辅助输入查询。

  最后还会看一下Open Manus中关于MCP集成的实现。

一、循环执行检测与处理

  参照源码,在base.pyrun方法的循环中,调用了两个方法:

  1. is_stuck是用于判断当前是否出现了循环执行的问题。
  2. handle_stuck_state则是对循环执行的问题进行处理。

在这里插入图片描述
  is_stuck方法的含义:

def is_stuck(self) -> bool:
    """Check if the agent is stuck in a loop by detecting duplicate content"""
    # 检查消息数量是否足够进行循环检测(至少需要2条消息)
    if len(self.memory.messages) < 2:
        return False  # 消息数量不足,无法检测循环,返回False

    # 获取最后一条消息(最新的助手回复)
    last_message = self.memory.messages[-1]
    # 检查最后一条消息是否有内容
    if not last_message.content:
        return False  # 如果最后一条消息内容为空,返回False

    # 统计与最后一条消息内容相同的助手消息数量
    duplicate_count = sum(
        1  # 计数器,每找到一个重复消息就加1
        for msg in reversed(self.memory.messages[:-1])  # 遍历除最后一条外的所有历史消息(倒序)
        if msg.role == "assistant" and msg.content == last_message.content  # 只统计助手角色且内容完全相同的消息
    )

    # 判断重复次数是否达到阈值,如果达到则认为智能体陷入循环
    return duplicate_count >= self.duplicate_threshold

  handle_stuck_state方法的主要作用,当智能体陷入循环时,系统会自动添加提示来引导它改变策略,避免继续重复无效的行为。

  • 定义一个提示信息,告诉智能体检测到了重复响应,建议考虑新策略
  • 将这个提示信息添加到现有的下一步提示前面,让智能体在下次执行时能看到这个提醒
  • 记录警告日志,便于调试和监控智能体的状态
def handle_stuck_state(self):
    """Handle stuck state by adding a prompt to change strategy"""
    # 定义检测到循环状态时的提示信息
    stuck_prompt = "\
    Observed duplicate responses. Consider new strategies and avoid repeating ineffective paths already attempted."
    # 将循环检测提示添加到现有的下一步提示前面,用换行符连接
    self.next_step_prompt = f"{stuck_prompt}\n{self.next_step_prompt}"
    # 记录警告日志,记录智能体检测到循环状态并添加了提示信息
    logger.warning(f"Agent detected stuck state. Added prompt: {stuck_prompt}")

1.1、定义is_stuck

  我们在智能体的BaseAgent中首先定义is_stuck,并且参照源码的逻辑进行实现:


    /**
     * 智能体循环检测阈值
     */
    private Integer duplicateThreshold = 2;

    /**
     * 循环消息检测
     * @return
     */
    public boolean isStuck() {
//        检查消息数量是否足够进行循环检测(至少需要2条消息)
        if (this.memoryMessages.size() < 2) {
            return false;
        }
        // 获取最后一条消息(最新的助手回复)
        Message lastMessage = CollUtil.getLast(this.memoryMessages);
        if (ObjUtil.isEmpty(lastMessage) || lastMessage.getText().isEmpty()){
            return false;
        }

        //统计与最后一条消息内容相同的助手消息数量
        int count = 0;
        //遍历除最后一条外的所有历史消息(倒序)
        for (int i = this.memoryMessages.size() - 2 ; i >=  0; i--) {
            Message message = memoryMessages.get(i);
//            只统计助手角色且内容完全相同的消息
            if (message instanceof AssistantMessage && message.getText().equals(lastMessage.getText())){
                count++;
            }
        }
//        判断重复次数是否达到阈值,如果达到则认为智能体陷入循环
        return count >= duplicateThreshold;
    }

1.2、定义handle_stuck_state

  然后定义出现循环问题的解决方法,相对比较简单:

    /**
     * 处理循环消息
     */
    public void handleStuckState(){
        String stuckPrompt = "Observed duplicate responses. Consider new strategies and avoid repeating ineffective paths already attempted.";
        this.nextStepPrompt = nextStepPrompt + "/n" + stuckPrompt;
        log.info("Agent detected stuck state. Added prompt: {}",stuckPrompt);
    }

1.3、改造run方法

  在run方法的循环中,增加循环检测和处理:

					 //循环条件 小于最大轮次,并且状态不为结束
            for (int i = 0; i < maxStep && agentState != AgentState.FINISHED; i++) {
                //执行具体的操作,子类实现
                String stepResult = step();
						// 扩展点 加入循环检测的逻辑
                if (this.isStuck()){
                    this.handleStuckState();
                }
                //.....
            }

二、智能体交互式执行

  参照源码,在tool目录下定义了一个AskHuman工具,后续在自己的智能体项目中,需要将其转换为Spring AI工具类的形式。

# 定义人机交互工具类
class AskHuman(BaseTool):
    """Add a tool to ask human for help."""

    # 定义工具的唯一标识符名称
    name: str = "ask_human"
    # 定义工具的功能描述,说明何时使用此工具
    description: str = "Use this tool to ask human for help."
    # 定义工具的参数结构,使用JSON Schema格式
    parameters: str = {
        "type": "object",  # 参数类型为对象
        "properties": {  # 定义对象的属性
            "inquire": {  # 定义询问参数
                "type": "string",  # 参数类型为字符串
                "description": "The question you want to ask human.",  # 参数描述
            }
        },
        "required": ["inquire"],  # 指定必需参数为inquire
    }

    # 定义异步执行方法,用于处理人机交互
    async def execute(self, inquire: str) -> str:
        # 使用input函数在控制台显示问题并等待用户输入,然后去除首尾空白字符
        return input(f"""Bot: {inquire}\n\nYou: """).strip()

1.1、AskHuman工具类

/**
 * 定义人机交互工具类
 */
public class AskHuman {

    @Tool(description = "ask_human")
    public String askHuman(@ToolParam(description = "The question you want to ask human.") String inquire) {

        // 使用Scanner获取用户输入
        Scanner scanner = new Scanner(System.in);
        System.out.printf("Bot: %s%n%nYou: ", inquire);
        String userInput = scanner.nextLine();

        return userInput.trim();


    }
}

  然后将其注册到ToolRegistration中:

@Configuration
public class ToolRegistration {

    @Bean
    public ToolCallback[] availableTools(){
        FileOperationTool fileOperationTool = new FileOperationTool();
        PDFGenerationTool pdfGenerationTool = new PDFGenerationTool();
        ResourceDownloadTool resourceDownloadTool = new ResourceDownloadTool();
        TerminalOperationTool terminalOperationTool = new TerminalOperationTool();
        WebScrapingTool webScrapingTool = new WebScrapingTool();
//        WebSearchTool webSearchTool = new WebSearchTool();
        TerminateTool terminateTool = new TerminateTool();
        //人机交互工具类
        AskHumanTool askHumanTool = new AskHumanTool();

        return ToolCallbacks.from(fileOperationTool,pdfGenerationTool,resourceDownloadTool,terminalOperationTool,webScrapingTool,terminateTool,askHumanTool);
    }
}

三、Open Manus 集成MCP

  首先参照源码中的集成方式,在Manus.py中,首先定义了两个属性,表示MCP客户端实例记录已连接的MCP服务器
在这里插入图片描述

  其中还有一些关键的方法:

3.1、源码实现:initialize_mcp_servers

  该方法主要是初始化所有配置的MCP服务器连接

  • 遍历配置中的所有MCP服务器
  • 支持SSE和stdio两种连接方式
  • 包含错误处理和日志记录
    async def initialize_mcp_servers(self) -> None:
        """Initialize connections to configured MCP servers."""
        # 遍历配置中的所有MCP服务器
        for server_id, server_config in config.mcp_config.servers.items():
            try:
                # 检查服务器类型是否为SSE(Server-Sent Events)
                if server_config.type == "sse":
                    # 如果配置了URL,则连接到SSE服务器
                    if server_config.url:
                        await self.connect_mcp_server(server_config.url, server_id)
                        # 记录成功连接日志
                        logger.info(
                            f"Connected to MCP server {server_id} at {server_config.url}"
                        )
                # 检查服务器类型是否为stdio(标准输入输出)
                elif server_config.type == "stdio":
                    # 如果配置了命令,则通过stdio连接到服务器
                    if server_config.command:
                        await self.connect_mcp_server(
                            server_config.command,
                            server_id,
                            use_stdio=True,
                            stdio_args=server_config.args,
                        )
                        # 记录成功连接日志
                        logger.info(
                            f"Connected to MCP server {server_id} using command {server_config.command}"
                        )
            except Exception as e:
                # 记录连接失败的错误日志
                logger.error(f"Failed to connect to MCP server {server_id}: {e}")

3.2、源码实现:connect_mcp_server

  在上一个initialize_mcp_servers方法中,根据服务器类型,最终执行的是connect_mcp_server方法连接到服务器:

  • 检查连接方式(stdio或SSE)
  • 将自身添加到保存已连接MCP的集合中。
  • 将新工具添加到可用工具集合

  可以看到,在连接到服务器之后,会获取到MCP服务对应的工具,然后加入到一个可用工具集合中。
最终在调用工具时,还是会遍历本地的可用工具集合并且执行,而不是每次都去MCP服务器上获取。

    async def connect_mcp_server(
        self,
        server_url: str,
        server_id: str = "",
        use_stdio: bool = False,
        stdio_args: List[str] = None,
    ) -> None:
        """Connect to an MCP server and add its tools."""
        # 检查是否使用stdio连接方式
        if use_stdio:
            # 通过stdio连接到MCP服务器
            await self.mcp_clients.connect_stdio(
                server_url, stdio_args or [], server_id
            )
            # 将服务器信息添加到已连接服务器字典中
            self.connected_servers[server_id or server_url] = server_url
        else:
            # 通过SSE连接到MCP服务器
            await self.mcp_clients.connect_sse(server_url, server_id)
            # 将服务器信息添加到已连接服务器字典中
            self.connected_servers[server_id or server_url] = server_url

        # 更新可用工具集合,只添加来自此服务器的新工具
        new_tools = [
            tool for tool in self.mcp_clients.tools if tool.server_id == server_id
        ]
        # 将新工具添加到可用工具集合中
        self.available_tools.add_tools(*new_tools)

3.3、源码实现:disconnect_mcp_server

  该方法的作用是,断开MCP服务器连接并移除其工具:

  • 断开服务器连接
  • 清理已连接MCP的集合
  • 重置可用工具集合
    async def disconnect_mcp_server(self, server_id: str = "") -> None:
        """Disconnect from an MCP server and remove its tools."""
        # 断开与MCP服务器的连接
        await self.mcp_clients.disconnect(server_id)
        # 如果指定了服务器ID,则只移除该服务器
        if server_id:
            self.connected_servers.pop(server_id, None)
        else:
            # 如果没有指定服务器ID,则清空所有已连接服务器
            self.connected_servers.clear()

        # 重新构建可用工具集合,移除已断开服务器的工具
        base_tools = [
            tool
            for tool in self.available_tools.tools
            if not isinstance(tool, MCPClientTool)
        ]
        # 创建新的工具集合,只包含基础工具
        self.available_tools = ToolCollection(*base_tools)
        # 添加剩余的MCP客户端工具
        self.available_tools.add_tools(*self.mcp_clients.tools)

3.4、源码实现:cleanup

  该方法的主要目的是清理MCP相关资源:

  • 清理浏览器上下文
  • 断开所有MCP服务器连接
    async def cleanup(self):
        """Clean up Manus agent resources."""
        # 清理浏览器上下文助手
        if self.browser_context_helper:
            await self.browser_context_helper.cleanup_browser()
        # 只有在已初始化的情况下才断开所有MCP服务器连接
        if self._initialized:
            await self.disconnect_mcp_server()
            self._initialized = False

3.5、源码实现:think

  最后还对父类的think方法做了增强:

  • 检查初始化状态
  • 自动初始化MCP服务器

在这里插入图片描述

Logo

更多推荐