AIGC入门,手搓大模型客户端与MCP交互第2集

在上一篇文章中,我们初步实现了将大语言模型(LLM)与 Model Context Protocol(MCP)服务进行整合,使模型能够根据用户查询调用相应的工具(如获取当前时间、列出时区等)。然而,实践中我们发现,模型输出的 JSON 结构经常不规范,导致解析失败,严重影响系统可靠性。

本文在此基础上,提出并实现了一套更加鲁棒的参数提取机制,显著提升了工具调用的成功率。

一、背景与问题

最初的实现中,我们假设大模型(如 Phi-3)总是返回格式完美的 JSON,例如:

{
  "action": "call_tool",
  "tool_name": "get_current_time",
  "arguments": {"timezone": "Asia/Shanghai"}
}

但实际上,模型常常返回包含多余文本、注释、甚至格式错误的响应,例如:

我应该调用工具来获取时间。代码如下:
{
  "action": "call_tool",
  "tool_name": "get_current_time", // 这是工具名
  "arguments": {
    "timezone": "New York"
  }
}

这种非纯 JSON 响应会导致 json.loads() 解析失败,进而导致整个工具调用流程中断。

  • 有多余的中文内容

  • json字符串中间有注释内容

二、改进方案:鲁棒的 JSON 提取与验证

我们引入了两个关键函数来增强系统的容错能力:

extract_json_from_response(response_text)

该函数用于从模型响应中提取 JSON 对象,具备以下特性:

  • 移除注释:使用正则表达式清除单行(//)和多行(/* */)注释;

  • 多层解析策略:

    • 首先尝试直接解析整个响应;

    • 若失败,则使用递归正则表达式匹配最内层的完整 JSON 对象;

  • 支持嵌套结构:使用 regex 库的 (?R) 递归模式匹配嵌套的 JSON。

validate_tool_call(tool_call_data)

对提取出的 JSON 进行结构验证,确保包含必要的字段且类型正确:

  • 必须包含 “action”: “call_tool”;

  • 必须包含字符串类型的 tool_name;

  • arguments 为可选,但若存在则必须为字典类型。

三、完整代码解析

以下是改进后的核心代码段(省略部分重复内容):

def extract_json_from_response(response_text):
    # 清除注释
    cleaned_text = regex.sub(r'//.*?$', '', response_text, flags=regex.MULTILINE)
    cleaned_text = regex.sub(r'/\*.*?\*/', '', cleaned_text, flags=regex.DOTALL)
    
    try:
        return json.loads(cleaned_text)
    except json.JSONDecodeError:
        # 使用递归正则匹配嵌套JSON
        pattern = r'\{(?:[^{}]|(?R))*\}'
        json_match = regex.search(pattern, cleaned_text)
        if json_match:
            try:
                return json.loads(json_match.group())
            except:
                pass
    return None

def validate_tool_call(tool_call_data):
    if not isinstance(tool_call_data, dict):
        return False
    if tool_call_data.get("action") != "call_tool":
        return False
    if not isinstance(tool_call_data.get("tool_name"), str):
        return False
    if "arguments" in tool_call_data and not isinstance(tool_call_data["arguments"], dict):
        return False
    return True

完整的改进后的测试脚本如下

import asyncio
import sys
import traceback
import json
import regex

import ollama
from mcp import ClientSession, StdioServerParameters
from mcp.client.stdio import stdio_client


class MCPTimeClient:
    def __init__(self):
        self.server_params = StdioServerParameters(
            command=sys.executable,
            args=["time_server.py"],
            env=None
        )
        self.session = None
        self.tools_available = []

    async def __aenter__(self):
        """进入异步上下文管理器"""
        self._stdio_client = stdio_client(self.server_params)
        self._stdio_client_context = self._stdio_client.__aenter__()
        read, write = await self._stdio_client_context
        
        self.session = ClientSession(read, write)
        await self.session.__aenter__()
        
        # 初始化连接
        await self.session.initialize()

        # 获取可用工具
        tools_response = await self.session.list_tools()
        self.tools_available = [tool.name for tool in tools_response.tools]
        print(f"已连接到时间服务器,可用工具: {self.tools_available}")
        
        return self

    async def __aexit__(self, exc_type, exc_val, exc_tb):
        """退出异步上下文管理器"""
        if self.session:
            await self.session.__aexit__(exc_type, exc_val, exc_tb)
        await self._stdio_client.__aexit__(exc_type, exc_val, exc_tb)

    async def call_tool(self, tool_name, arguments):
        """调用指定的 MCP 工具"""
        if not self.session:
            return "错误: 未连接到时间服务器"

        if tool_name not in self.tools_available:
            return f"错误: 工具 '{tool_name}' 不可用"

        try:
            result = await self.session.call_tool(tool_name, arguments=arguments)
            # return result.content[0].text
            return result
        except Exception as e:
            # 打印错误堆栈
            print(f"错误堆栈:")
            traceback.print_exc()
            return f"调用工具时出错: {e}"


def extract_json_from_response(response_text):
    """
    从大模型响应中提取JSON内容,提供更强的容错能力
    
    参数:
        response_text (str): 大模型的原始响应文本
        
    返回:
        dict: 解析出的JSON对象,如果解析失败则返回None
    """
    if not response_text:
        return None

    # 移除单行注释 (//...)
    cleaned_text = regex.sub(r'//.*?$', '', response_text, flags=regex.MULTILINE)

    # 移除多行注释 (/*...*/)
    cleaned_text = regex.sub(r'/\*.*?\*/', '', cleaned_text, flags=regex.DOTALL)

    try:
        # 方法1: 直接尝试解析整个响应
        return json.loads(cleaned_text)
    except json.JSONDecodeError:
        # 如果解析失败,尝试提取第一个完整的 JSON 对象
        # json_match = regex.search(r'\{.*?\}', cleaned_text)
        pattern = r'\{(?:[^{}]|(?R))*\}'
        json_match = regex.search(pattern, cleaned_text)
        if json_match:
            try:
                print("正则匹配到字符串:   ", json_match.group())
                return json.loads(json_match.group())
            except json.JSONDecodeError:
                pass
    

    # 所有方法都失败
    return None


def validate_tool_call(tool_call_data):
    """
    验证工具调用数据的有效性
    
    参数:
        tool_call_data (dict): 解析出的工具调用数据
        
    返回:
        bool: 数据是否有效
    """
    if not isinstance(tool_call_data, dict):
        return False
        
    if tool_call_data.get("action") != "call_tool":
        return False
        
    if "tool_name" not in tool_call_data:
        return False
        
    # 工具名称必须是字符串
    if not isinstance(tool_call_data["tool_name"], str):
        return False
        
    # 参数字段可选,但如果存在必须是字典类型
    if "arguments" in tool_call_data and not isinstance(tool_call_data["arguments"], dict):
        return False
        
    return True


async def ask_llm_with_mcp(user_query):
    """
    使用大模型分析用户查询,并决定是否需要调用 MCP 服务

    参数:
        user_query (str): 用户查询

    返回:
        str: 最终响应
    """
    # 初始化 MCP 客户端
    async with MCPTimeClient() as mcp_client:
        # 构建系统提示,告诉大模型可用的工具和调用方式
        system_prompt = f"""
        你是一个AI助手,可以回答用户问题并决定是否需要调用时间服务。

        你可以使用的工具:
        - get_current_time: 获取指定时区的当前时间,参数: timezone (时区名称)
        - list_common_timezones: 获取常见时区列表,无参数

        调用格式:
        如果需要调用工具,请以以下JSON格式回复:
        {{
            "action": "call_tool",
            "tool_name": "工具名称",
            "arguments": {{参数键: 参数值}}
        }}

        如果不需要调用工具,请直接回复答案。

        当前可用工具: {mcp_client.tools_available}
        """

        # 第一次询问大模型
        print(f"大模型提示词: {system_prompt}")
        print(f"用户查询: {user_query}")
        print("询问大模型是否需要调用工具...")

        response = ollama.chat(
            model="phi3:mini",
            messages=[
                {"role": "system", "content": system_prompt},
                {"role": "user", "content": user_query}
            ]
        )

        model_response = response['message']['content']
        print(f"大模型初始响应: {model_response}")

        # 尝试解析大模型的响应,看是否要调用工具
        try:
            # 使用改进的解析方法
            tool_call = extract_json_from_response(model_response)
            
            if tool_call and validate_tool_call(tool_call):
                tool_name = tool_call.get("tool_name")
                arguments = tool_call.get("arguments", {})

                print(f"大模型决定调用工具: {tool_name}, 参数: {arguments}")

                # 调用MCP工具
                tool_result = await mcp_client.call_tool(tool_name, arguments)
                print(f"工具调用结果: {tool_result}")
                return tool_result

        except Exception as e:
            print(f"解析大模型响应时出错: {e}")
            # 打印异常堆栈以便调试
            traceback.print_exc()
            # 如果解析失败,直接返回大模型的原始响应
            pass

        # 如果不需要调用工具或解析失败,直接返回大模型的响应
        return model_response


async def main():
    """主函数,处理多个示例查询"""
    examples = [
        "现在几点了?",
        "纽约现在是什么时间?",
        "给我列出一些常见的时区",
        "Invalid/Timezone 现在的时间是多少?",
        "讲一个关于时间旅行的故事"
    ]

    for query in examples:
        print("\n" + "=" * 60)
        print(f"处理查询: {query}")

        response = await ask_llm_with_mcp(query)

        print("\n最终响应:")
        print(response)
        print("=" * 60)


if __name__ == "__main__":
    # 运行主函数
    asyncio.run(main())

四、测试与效果

我们使用以下示例查询进行测试:

examples = [
    "现在几点了?",
    "纽约现在是什么时间?",
    "给我列出一些常见的时区",
    "Invalid/Timezone 现在的时间是多少?",
    "讲一个关于时间旅行的故事"
]

改进后,系统能够:

  • 正确解析带注释的 JSON;

  • 从非结构化文本中提取 JSON 对象;

  • 在无法提取时降级返回模型原始响应,避免中断;

  • 对无效工具调用进行验证和过滤。

五、总结与展望

本文通过引入更强的 JSON 提取和验证机制,显著提升了大模型与 MCP 服务交互的可靠性。下一步可能的优化包括:

  • 支持多工具调用(multi-tool calling);

  • 引入上下文记忆,支持多轮对话中的工具调用;

  • 整合更多类型的 MCP 服务(如数据库、API 等)。

通过不断优化解析策略和扩展工具集,我们可以构建更强大、更可靠的 AI 代理系统,更好地服务于实际应用场景。

Logo

更多推荐