每天10分钟轻松掌握MCP 40天学习计划 - 第10天

各位各位,我又来啦!在第一部分我们配备了基础装备,现在该拿出放大镜,深入分析那些在网络中飞来飞去的JSON-RPC消息了。就像福尔摩斯分析案发现场一样,我们要从每个细节中找出线索,揭开MCP系统异常的真相!

MCP通信过程追踪与调试技能训练(第二部分)

五、JSON-RPC消息深度分析

JSON-RPC消息就像邮件一样,有固定的格式和规范。掌握这些格式,就像学会看懂邮件的信封、内容和签名,能够快速识别消息的来源、目的和内容。

JSON-RPC消息类型对照表

| 消息类型 | 必需字段 | 可选字段 | 用途说明 | 生活比喻 |
| --- | --- | --- | --- | --- |
| 请求消息 | jsonrpc, method, id | params | 客户端向服务端发起调用 | 寄信给朋友请求帮助 |
| 响应消息 | jsonrpc, id | result 或 error | 服务端回复请求结果 | 朋友回信告诉你结果 |
| 通知消息 | jsonrpc, method | params | 单向通知,不需要响应 | 发广播通知大家 |
| 错误响应 | jsonrpc, id, error | - | 告知请求处理失败 | 回信说"办不了" |

JSON-RPC消息分析器实现

import json
import time
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional, Union
from dataclasses import dataclass
from enum import Enum

class MessageType(Enum):
    """Categories of JSON-RPC message recognised by JSONRPCAnalyzer."""
    REQUEST = "request"            # carries "method" and "id"
    RESPONSE = "response"          # carries "id" but neither "method" nor "error"
    NOTIFICATION = "notification"  # carries "method" but no "id"
    ERROR = "error"                # error response, malformed or unparseable message

@dataclass
class MCPMessage:
    """One parsed MCP (JSON-RPC) message together with analysis metadata."""
    message_type: MessageType
    timestamp: datetime            # observation time supplied to (or taken by) the parser
    raw_data: str                  # message text exactly as received
    parsed_data: Dict[str, Any]    # decoded JSON, or {"parse_error": ...} on failure

    # Fields copied from requests / notifications
    method: Optional[str] = None
    params: Optional[Any] = None
    message_id: Optional[Union[str, int]] = None

    # Fields copied from responses
    result: Optional[Any] = None
    error: Optional[Dict[str, Any]] = None

    # Performance data
    size_bytes: int = 0                      # UTF-8 encoded length of raw_data
    processing_time: Optional[float] = None  # ms since the matching request (replies only)

class JSONRPCAnalyzer:
    """Parse JSON-RPC 2.0 messages, pair replies with requests and keep statistics.

    Attributes:
        message_history: Every successfully parsed message, in arrival order.
        request_response_pairs: Maps a request id to a dict with the keys
            "request", "response" and "response_time_ms" for each reply that
            was matched to an earlier request.
        performance_stats: Running counters plus the mean response time in ms.
    """

    def __init__(self):
        self.message_history: List[MCPMessage] = []
        self.request_response_pairs: Dict[Union[str, int], Dict[str, Any]] = {}
        self.performance_stats = {
            "total_messages": 0,
            "total_requests": 0,
            "total_responses": 0,
            "total_errors": 0,
            "average_response_time": 0.0
        }

    def parse_message(self, raw_message: str, timestamp: Optional[datetime] = None) -> MCPMessage:
        """Parse one raw JSON-RPC message, record it and pair it if it is a reply.

        Args:
            raw_message: JSON text of a single message.
            timestamp: Observation time; defaults to ``datetime.now()``.

        Returns:
            The parsed message. When the text is not valid JSON-RPC 2.0 (or
            processing fails for any other reason) an ERROR message carrying
            code -32700 is returned instead; a message that fails decoding or
            validation is not added to the history or the statistics.
        """
        if timestamp is None:
            timestamp = datetime.now()

        try:
            parsed = json.loads(raw_message)

            if not self._is_valid_jsonrpc(parsed):
                raise ValueError("不是有效的JSON-RPC消息")

            msg_type = self._determine_message_type(parsed)

            message = MCPMessage(
                message_type=msg_type,
                timestamp=timestamp,
                raw_data=raw_message,
                parsed_data=parsed,
                size_bytes=len(raw_message.encode('utf-8'))
            )
            self._populate_message_fields(message, parsed)

            self.message_history.append(message)
            self._update_stats(message)

            # Successful replies AND error replies both answer a request, so
            # both are paired. Compare the id against None explicitly: 0 and
            # "" are legal JSON-RPC ids but falsy in Python.
            is_reply = msg_type in (MessageType.RESPONSE, MessageType.ERROR)
            if is_reply and message.message_id is not None:
                self._match_request_response(message)

            return message

        except Exception as e:
            # Deliberate best-effort boundary: the analyzer is fed arbitrary
            # traffic and must report bad input rather than crash on it.
            return MCPMessage(
                message_type=MessageType.ERROR,
                timestamp=timestamp,
                raw_data=raw_message,
                parsed_data={"parse_error": str(e)},
                size_bytes=len(raw_message.encode('utf-8')),
                error={"code": -32700, "message": f"解析错误: {str(e)}"}
            )

    def _is_valid_jsonrpc(self, data: Any) -> bool:
        """Return True if *data* is a JSON object tagged ``"jsonrpc": "2.0"``.

        An ``id``, when present, must be a string, a number or null as the
        JSON-RPC 2.0 specification requires; this also guarantees that ids are
        hashable and can be used as keys of ``request_response_pairs``.
        """
        if not isinstance(data, dict) or data.get("jsonrpc") != "2.0":
            return False
        return isinstance(data.get("id"), (str, int, float, type(None)))

    def _determine_message_type(self, data: Dict[str, Any]) -> MessageType:
        """Classify a validated message by which of method/id/error it carries."""
        has_id = "id" in data
        if "method" in data:
            return MessageType.REQUEST if has_id else MessageType.NOTIFICATION
        if has_id:
            return MessageType.ERROR if "error" in data else MessageType.RESPONSE
        # Neither "method" nor "id": not a well-formed message.
        return MessageType.ERROR

    def _populate_message_fields(self, message: MCPMessage, data: Dict[str, Any]) -> None:
        """Copy the standard JSON-RPC members from *data* onto *message*."""
        message.method = data.get("method")
        message.params = data.get("params")
        message.message_id = data.get("id")
        message.result = data.get("result")
        message.error = data.get("error")

    def _update_stats(self, message: MCPMessage) -> None:
        """Increment the counters that correspond to the message's type."""
        self.performance_stats["total_messages"] += 1

        counter_by_type = {
            MessageType.REQUEST: "total_requests",
            MessageType.RESPONSE: "total_responses",
            MessageType.ERROR: "total_errors",
        }
        counter = counter_by_type.get(message.message_type)
        if counter is not None:
            self.performance_stats[counter] += 1

    def _match_request_response(self, response: MCPMessage) -> None:
        """Pair *response* with the most recent request that has the same id.

        On a match the elapsed time in milliseconds is stored on the response
        (``processing_time``) and in ``request_response_pairs``, and the
        average response time is refreshed. Nothing happens if no request with
        that id has been seen.
        """
        request_id = response.message_id

        # Newest first, so a reused id pairs with its latest request.
        for msg in reversed(self.message_history):
            if (msg.message_type == MessageType.REQUEST and
                    msg.message_id == request_id):

                response_time = (response.timestamp - msg.timestamp).total_seconds() * 1000
                response.processing_time = response_time

                self.request_response_pairs[request_id] = {
                    "request": msg,
                    "response": response,
                    "response_time_ms": response_time
                }

                self._update_average_response_time()
                break

    def _update_average_response_time(self) -> None:
        """Recompute the mean response time over all recorded pairs."""
        response_times = [
            pair["response_time_ms"]
            for pair in self.request_response_pairs.values()
        ]

        if response_times:
            self.performance_stats["average_response_time"] = sum(response_times) / len(response_times)

    def _paired_request_method(self, message: MCPMessage) -> Optional[str]:
        """Return the method of the request that *message* was paired with, if any."""
        if message.message_id is None:
            return None
        pair = self.request_response_pairs.get(message.message_id)
        # Identity check: with a reused id the stored pair may belong to a
        # later reply than the one being inspected.
        if pair is None or pair["response"] is not message:
            return None
        return pair["request"].method

    def analyze_message_pattern(self, time_window_minutes: int = 10) -> Dict[str, Any]:
        """Summarise the messages seen during the last *time_window_minutes*.

        Returns:
            ``{"error": ...}`` when the window holds no messages; otherwise a
            dict with the window size, the message count, per-method call
            counts, the methods whose calls ended in an error, the calls that
            took longer than one second, and the running performance stats.
            Errors and timings are recorded on replies, which carry no
            ``method`` of their own, so they are attributed to the method of
            the request each reply was paired with.
        """
        cutoff_time = datetime.now() - timedelta(minutes=time_window_minutes)
        recent_messages = [
            msg for msg in self.message_history
            if msg.timestamp > cutoff_time
        ]

        if not recent_messages:
            return {"error": "时间窗口内没有消息"}

        method_counts: Dict[str, int] = {}
        error_methods: List[str] = []
        slow_requests: List[Dict[str, Any]] = []

        for msg in recent_messages:
            if msg.method:
                method_counts[msg.method] = method_counts.get(msg.method, 0) + 1

            method = msg.method or self._paired_request_method(msg)
            if not method:
                continue

            if msg.message_type == MessageType.ERROR:
                error_methods.append(method)

            # Slow call: more than one second between request and reply.
            if msg.processing_time is not None and msg.processing_time > 1000:
                slow_requests.append({
                    "method": method,
                    "response_time_ms": msg.processing_time,
                    "timestamp": msg.timestamp.isoformat()
                })

        return {
            "time_window_minutes": time_window_minutes,
            "total_messages": len(recent_messages),
            "method_frequency": method_counts,
            # De-duplicate while keeping first-seen order (deterministic output).
            "error_methods": list(dict.fromkeys(error_methods)),
            "slow_requests": slow_requests,
            "performance_stats": self.performance_stats
        }

    def get_message_details(self, message_id: Union[str, int]) -> Optional[Dict[str, Any]]:
        """Describe the first message in the history that carries *message_id*.

        Returns:
            None when no message has that id; otherwise a dict of its basic
            fields, plus "processing_time_ms" when the message has a timing and
            "paired_message" when a request/response pair exists for the id.
        """
        target_message = next(
            (msg for msg in self.message_history if msg.message_id == message_id),
            None
        )
        if target_message is None:
            return None

        details = {
            "message_id": target_message.message_id,
            "type": target_message.message_type.value,
            "timestamp": target_message.timestamp.isoformat(),
            "method": target_message.method,
            "size_bytes": target_message.size_bytes,
            "raw_message": target_message.raw_data
        }

        if target_message.processing_time is not None:
            details["processing_time_ms"] = target_message.processing_time

        pair = self.request_response_pairs.get(message_id)
        if pair is not None:
            details["paired_message"] = {
                "response_time_ms": pair["response_time_ms"],
                "success": pair["response"].error is None
            }

        return details

# 模拟MCP通信分析演示
def demo_jsonrpc_analysis():
    """Replay a scripted MCP exchange through JSONRPCAnalyzer and print the findings.

    Returns:
        The analyzer that processed the exchange, so callers can inspect it.
    """
    analyzer = JSONRPCAnalyzer()

    # Each entry: (seconds after the start of the run, raw JSON-RPC text).
    scripted_exchange = [
        # 1. initialize request
        (0, '{"jsonrpc": "2.0", "method": "initialize", "params": {"protocolVersion": "2024-11-05", "capabilities": {}}, "id": 1}'),
        # 2. initialize response
        (0.05, '{"jsonrpc": "2.0", "result": {"protocolVersion": "2024-11-05", "capabilities": {"tools": {}}}, "id": 1}'),
        # 3. tools/list request
        (0.1, '{"jsonrpc": "2.0", "method": "tools/list", "id": 2}'),
        # 4. tools/list response
        (0.15, '{"jsonrpc": "2.0", "result": {"tools": [{"name": "file_reader", "description": "读取文件内容"}]}, "id": 2}'),
        # 5. tools/call request
        (0.2, '{"jsonrpc": "2.0", "method": "tools/call", "params": {"name": "file_reader", "arguments": {"path": "/tmp/test.txt"}}, "id": 3}'),
        # 6. tools/call error response (file not found), arriving noticeably later
        (0.8, '{"jsonrpc": "2.0", "error": {"code": -1, "message": "文件不存在"}, "id": 3}'),
        # 7. progress notification
        (0.9, '{"jsonrpc": "2.0", "method": "notifications/progress", "params": {"progress": 100}}'),
    ]

    print("=== JSON-RPC消息分析演示 ===\n")

    base_time = datetime.now()

    # Feed the messages in order, stamping each with its simulated arrival time.
    for number, (offset_seconds, raw_text) in enumerate(scripted_exchange, start=1):
        arrival = base_time + timedelta(seconds=offset_seconds)
        parsed = analyzer.parse_message(raw_text, arrival)

        clock = parsed.timestamp.strftime('%H:%M:%S.%f')[:-3]  # trim to milliseconds
        print(f"📨 消息 {number}: {parsed.message_type.value}")
        print(f"   时间: {clock}")
        print(f"   方法: {parsed.method or '无'}")
        print(f"   ID: {parsed.message_id or '无'}")
        if parsed.processing_time:
            print(f"   响应时间: {parsed.processing_time:.1f}ms")
        if parsed.error:
            print(f"   错误: {parsed.error['message']}")
        print()

    # Summarise what was seen during the last minute.
    print("📊 消息模式分析:")
    summary = analyzer.analyze_message_pattern(time_window_minutes=1)
    average_ms = analyzer.performance_stats['average_response_time']

    print(f"   总消息数: {summary['total_messages']}")
    print(f"   方法调用频率: {summary['method_frequency']}")
    print(f"   错误方法: {summary['error_methods']}")
    print(f"   慢请求: {len(summary['slow_requests'])} 个")
    print(f"   平均响应时间: {average_ms:.1f}ms")

    return analyzer

if __name__ == "__main__":
    demo_jsonrpc_analysis()

六、异常情况调试技巧大全

在MCP的世界里,异常就像感冒一样常见。关键是要能快速诊断出"病因",然后对症下药。让我们来学习几种常见"疾病"的诊断和治疗方法。

常见异常类型诊断表

| 异常类型 | 主要症状 | 可能原因 | 诊断方法 | 治疗方案 | 预防措施 |
| --- | --- | --- | --- | --- | --- |
| 连接超时 | 请求发出后长时间无响应 | 网络延迟、服务器过载 | 检查响应时间统计 | 增加超时时间、优化网络 | 设置合理的超时阈值 |
| 连接中断 | 通信突然断开 | 网络不稳定、服务重启 | 查看连接状态日志 | 实现重连机制 | 添加心跳检测 |
| 参数错误 | 服务器返回参数相关错误 | 参数格式错误、缺少必需参数 | 验证请求参数格式 | 修正参数、添加验证 | 使用参数验证库 |
| 权限拒绝 | 返回权限不足错误 | 认证失败、权限配置错误 | 检查认证信息 | 更新认证、调整权限 | 定期检查权限配置 |
| 服务不可用 | 大量请求失败 | 服务器宕机、维护中 | 监控服务器状态 | 等待恢复或切换服务 | 实现服务健康检查 |

异常调试工具箱

import asyncio
import aiohttp
import json
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass
from enum import Enum

class ExceptionType(Enum):
    """Categories of failure that MCPDebugger knows how to diagnose."""
    TIMEOUT = "timeout"
    CONNECTION_LOST = "connection_lost"
    PARAMETER_ERROR = "parameter_error"
    PERMISSION_DENIED = "permission_denied"
    SERVICE_UNAVAILABLE = "service_unavailable"
    PARSE_ERROR = "parse_error"
    UNKNOWN = "unknown"

@dataclass
class DebugEvent:
    """One entry of the debugger's event log."""
    timestamp: datetime
    event_type: ExceptionType
    message: str
    context: Dict[str, Any]
    severity: str = "info"  # one of: info, warning, error, critical

class MCPDebugger:
    """Diagnose common MCP failures and track overall connection health.

    Every ``diagnose_*`` method returns a dict describing the problem
    (severity, description, possible causes, recommendations) and appends a
    DebugEvent to ``event_log``, which in turn updates ``connection_health``.

    Attributes:
        event_log: All DebugEvents recorded so far, oldest first.
        connection_health: Running counters and the success rate (percent).
        performance_thresholds: Tunable limits used by the diagnoses and by
            the connection-status classification.
    """

    # Severities that count as a failed interaction.
    _FAILURE_SEVERITIES = ("error", "critical")

    def __init__(self):
        self.event_log: List[DebugEvent] = []
        self.connection_health = {
            "last_successful_request": None,
            "consecutive_failures": 0,
            "total_requests": 0,
            "success_rate": 100.0
        }
        self.performance_thresholds = {
            "response_time_warning_ms": 1000,
            "response_time_critical_ms": 5000,
            "max_consecutive_failures": 3
        }

    def diagnose_timeout(self, request_data: Dict[str, Any],
                        timeout_duration: float) -> Dict[str, Any]:
        """Diagnose a request that timed out.

        Args:
            request_data: The request that received no answer (logged as context).
            timeout_duration: How long the caller waited, in seconds.

        Returns:
            The diagnosis; severity is "critical" when the wait exceeded the
            ``response_time_critical_ms`` threshold, otherwise "error".
        """
        diagnosis = {
            "exception_type": ExceptionType.TIMEOUT.value,
            "severity": "error",
            "description": f"请求超时 ({timeout_duration:.1f}秒)",
            "possible_causes": [
                "网络延迟过高",
                "服务器处理能力不足",
                "服务器无响应",
                "超时阈值设置过低"
            ],
            "recommendations": []
        }

        # The threshold is configured in milliseconds; the duration is in seconds.
        critical_seconds = self.performance_thresholds["response_time_critical_ms"] / 1000
        if timeout_duration > critical_seconds:
            diagnosis["severity"] = "critical"
            diagnosis["recommendations"].extend([
                "立即检查服务器状态",
                "考虑增加服务器资源",
                "实施请求队列管理"
            ])
        else:
            diagnosis["recommendations"].extend([
                "适当增加超时时间",
                "检查网络连接质量",
                "优化请求参数"
            ])

        self._log_debug_event(
            ExceptionType.TIMEOUT,
            diagnosis["description"],
            {"request": request_data, "timeout_duration": timeout_duration},
            diagnosis["severity"]
        )

        return diagnosis

    def diagnose_connection_lost(self, last_successful_time: Optional[datetime] = None) -> Dict[str, Any]:
        """Diagnose a dropped connection.

        Args:
            last_successful_time: When the connection last worked; if omitted
                the downtime is reported as 0 seconds.

        Returns:
            The diagnosis; severity becomes "critical" once the downtime
            exceeds five minutes.
        """
        if last_successful_time:
            downtime_seconds = (datetime.now() - last_successful_time).total_seconds()
        else:
            downtime_seconds = 0

        diagnosis = {
            "exception_type": ExceptionType.CONNECTION_LOST.value,
            "severity": "error",
            "description": "连接中断",
            "downtime_seconds": downtime_seconds,
            "possible_causes": [
                "网络连接不稳定",
                "服务器重启或维护",
                "防火墙或代理问题",
                "客户端网络配置错误"
            ],
            "recommendations": [
                "实施自动重连机制",
                "添加连接状态监控",
                "配置连接池管理",
                "检查网络环境"
            ]
        }

        if downtime_seconds > 300:  # more than five minutes
            diagnosis["severity"] = "critical"
            diagnosis["recommendations"].insert(0, "紧急联系运维团队")

        self._log_debug_event(
            ExceptionType.CONNECTION_LOST,
            diagnosis["description"],
            {"downtime_seconds": downtime_seconds},
            diagnosis["severity"]
        )

        return diagnosis

    def diagnose_parameter_error(self, error_message: str,
                                request_params: Dict[str, Any]) -> Dict[str, Any]:
        """Diagnose a parameter-related error reported by the server.

        The error text is scanned (case-insensitively) for keywords; the first
        matching category - missing/required, invalid/format, type - adds a
        specific cause and recommendation ahead of the generic advice.

        Args:
            error_message: Error text returned by the server.
            request_params: Parameters that were sent with the request.

        Returns:
            The diagnosis, always with severity "warning".
        """
        diagnosis = {
            "exception_type": ExceptionType.PARAMETER_ERROR.value,
            "severity": "warning",
            "description": f"参数错误: {error_message}",
            "error_message": error_message,
            "request_params": request_params,
            "possible_causes": [],
            "recommendations": []
        }

        error_lower = error_message.lower()

        if "missing" in error_lower or "required" in error_lower:
            diagnosis["possible_causes"].append("缺少必需参数")
            diagnosis["recommendations"].append("检查API文档,补充必需参数")

        elif "invalid" in error_lower or "format" in error_lower:
            diagnosis["possible_causes"].append("参数格式错误")
            diagnosis["recommendations"].append("验证参数格式是否符合规范")

        elif "type" in error_lower:
            diagnosis["possible_causes"].append("参数类型错误")
            diagnosis["recommendations"].append("检查参数类型是否正确")

        # Generic advice that applies to every parameter error.
        diagnosis["recommendations"].extend([
            "使用参数验证库进行预检查",
            "添加参数格式化和类型转换",
            "记录正确的参数示例"
        ])

        self._log_debug_event(
            ExceptionType.PARAMETER_ERROR,
            diagnosis["description"],
            {"error_message": error_message, "params": request_params},
            diagnosis["severity"]
        )

        return diagnosis

    def diagnose_permission_denied(self, user_info: Dict[str, Any],
                                 resource: str, operation: str) -> Dict[str, Any]:
        """Diagnose a permission-denied error.

        Args:
            user_info: Details about the caller; an ``"authenticated"`` key set
                to a falsy value raises the severity to "error". A missing key
                is treated as authenticated.
            resource: The resource that was accessed.
            operation: The operation that was attempted.

        Returns:
            The diagnosis, with severity "warning" unless the caller is
            unauthenticated.
        """
        diagnosis = {
            "exception_type": ExceptionType.PERMISSION_DENIED.value,
            "severity": "warning",
            "description": f"权限被拒绝: {operation} on {resource}",
            "user_info": user_info,
            "resource": resource,
            "operation": operation,
            "possible_causes": [
                "用户权限不足",
                "认证信息过期",
                "权限配置错误",
                "资源访问策略限制"
            ],
            "recommendations": [
                "检查用户权限配置",
                "更新认证令牌",
                "验证资源访问策略",
                "联系管理员调整权限"
            ]
        }

        if not user_info.get("authenticated", True):
            diagnosis["severity"] = "error"
            diagnosis["recommendations"].insert(0, "重新进行身份认证")

        self._log_debug_event(
            ExceptionType.PERMISSION_DENIED,
            diagnosis["description"],
            {"user": user_info, "resource": resource, "operation": operation},
            diagnosis["severity"]
        )

        return diagnosis

    def _log_debug_event(self, event_type: ExceptionType, message: str,
                        context: Dict[str, Any], severity: str = "info") -> None:
        """Append a DebugEvent to the log and refresh the connection health."""
        event = DebugEvent(
            timestamp=datetime.now(),
            event_type=event_type,
            message=message,
            context=context,
            severity=severity
        )

        self.event_log.append(event)
        self._update_connection_health(event_type, severity)

    def _update_connection_health(self, event_type: ExceptionType, severity: str) -> None:
        """Fold one logged event into ``connection_health``.

        "error" and "critical" events extend the failure streak; any other
        severity resets the streak and counts as a successful interaction.
        The success rate is the share of non-failure events in the whole log.
        """
        self.connection_health["total_requests"] += 1

        if severity in self._FAILURE_SEVERITIES:
            self.connection_health["consecutive_failures"] += 1
        else:
            self.connection_health["consecutive_failures"] = 0
            self.connection_health["last_successful_request"] = datetime.now()

        total_errors = sum(
            1 for e in self.event_log if e.severity in self._FAILURE_SEVERITIES
        )
        self.connection_health["success_rate"] = max(0,
            (1 - total_errors / max(1, len(self.event_log))) * 100)

    def get_health_report(self) -> Dict[str, Any]:
        """Build a health summary from the event log and the health counters.

        Returns:
            A dict with the health score (0-100), the connection status, the
            number of events per type during the last hour, the total event
            count, the success rate, the current failure streak and
            recommendations that match the score.
        """
        # Compute the cutoff once so every event is judged against the same instant.
        cutoff = datetime.now() - timedelta(hours=1)

        event_summary: Dict[str, int] = {}
        for event in self.event_log:
            if event.timestamp > cutoff:
                event_type = event.event_type.value
                event_summary[event_type] = event_summary.get(event_type, 0) + 1

        health_score = self._calculate_health_score()

        return {
            "health_score": health_score,
            "connection_status": self._get_connection_status(),
            "recent_events_summary": event_summary,
            "total_events": len(self.event_log),
            "success_rate": self.connection_health["success_rate"],
            "consecutive_failures": self.connection_health["consecutive_failures"],
            "recommendations": self._get_health_recommendations(health_score)
        }

    def _calculate_health_score(self) -> int:
        """Return a 0-100 score: the success rate minus 10 per consecutive failure."""
        score = 100 * (self.connection_health["success_rate"] / 100)

        consecutive_failures = self.connection_health["consecutive_failures"]
        if consecutive_failures > 0:
            score -= consecutive_failures * 10

        return max(0, int(score))

    def _get_connection_status(self) -> str:
        """Classify the connection by the current failure streak.

        The streak is compared with the configurable
        ``performance_thresholds["max_consecutive_failures"]`` rather than a
        literal, so changing the threshold changes the classification.
        """
        failures = self.connection_health["consecutive_failures"]
        limit = self.performance_thresholds["max_consecutive_failures"]

        if failures == 0:
            return "健康"
        if failures < limit:
            return "警告"
        return "异常"

    def _get_health_recommendations(self, health_score: int) -> List[str]:
        """Return advice for the score band (>=90, >=70, >=50, below 50)."""
        if health_score >= 90:
            return ["系统运行良好,继续监控"]
        if health_score >= 70:
            return [
                "关注近期的警告信息",
                "考虑优化网络连接",
                "定期检查服务器状态"
            ]
        if health_score >= 50:
            return [
                "系统存在问题,需要立即关注",
                "检查网络和服务器配置",
                "实施更积极的监控策略"
            ]
        return [
            "系统健康状况严重,需要紧急处理",
            "联系技术支持团队",
            "考虑切换到备用服务"
        ]

# 异常调试综合演示
def demo_exception_debugging():
    """Run each MCPDebugger diagnosis on a sample failure and print the results.

    Returns:
        The debugger used for the demo, so callers can inspect its event log.
    """
    debugger = MCPDebugger()

    print("=== MCP异常调试演示 ===\n")
    print("🔍 模拟异常场景:")

    # 1. A tool call that timed out after 3.2 seconds.
    print("\n1️⃣ 诊断超时问题:")
    slow_call = {"method": "tools/call", "params": {"name": "slow_tool"}}
    timeout_result = debugger.diagnose_timeout(slow_call, 3.2)
    print(f"   诊断结果: {timeout_result['description']}")
    print(f"   严重程度: {timeout_result['severity']}")
    print(f"   建议: {timeout_result['recommendations'][0]}")

    # 2. A connection that last worked five minutes ago.
    print("\n2️⃣ 诊断连接中断:")
    five_minutes_ago = datetime.now() - timedelta(minutes=5)
    connection_result = debugger.diagnose_connection_lost(five_minutes_ago)
    print(f"   诊断结果: {connection_result['description']}")
    print(f"   中断时长: {connection_result['downtime_seconds']:.0f}秒")
    print(f"   建议: {connection_result['recommendations'][0]}")

    # 3. A request that is missing a required parameter.
    print("\n3️⃣ 诊断参数错误:")
    server_error = "Missing required parameter 'file_path'"
    sent_params = {"action": "read_file"}
    param_result = debugger.diagnose_parameter_error(server_error, sent_params)
    print(f"   诊断结果: {param_result['description']}")
    print(f"   建议: {param_result['recommendations'][0]}")

    # 4. An authenticated guest attempting to write the system configuration.
    print("\n4️⃣ 诊断权限问题:")
    guest = {"user_id": "user_123", "role": "guest", "authenticated": True}
    permission_result = debugger.diagnose_permission_denied(guest, "system_config", "write")
    print(f"   诊断结果: {permission_result['description']}")
    print(f"   建议: {permission_result['recommendations'][0]}")

    # Overall health after the four events above.
    print("\n🏥 系统健康报告:")
    report = debugger.get_health_report()
    print(f"   健康分数: {report['health_score']}/100")
    print(f"   连接状态: {report['connection_status']}")
    print(f"   成功率: {report['success_rate']:.1f}%")
    print(f"   建议: {report['recommendations'][0]}")

    return debugger

if __name__ == "__main__":
    demo_exception_debugging()

七、多工具组合调试策略

单一工具就像用一把螺丝刀修复整台电脑,力不从心。真正的调试高手会组合使用多种工具,就像医生会结合多种检查手段来诊断疾病一样。

调试工具组合策略图

在这里插入图片描述

综合调试平台实现

import subprocess
import platform
import psutil
from datetime import datetime
from typing import Dict, List, Any, Optional
from dataclasses import dataclass

@dataclass
class DiagnosticResult:
    """Outcome of a single diagnostic check."""
    tool_name: str  # display name of the check that produced this result
    status: str  # one of: success, warning, error
    data: Dict[str, Any]  # collected measurements, or {"error": ...} if the check itself failed
    timestamp: datetime  # when the result was created
    recommendations: List[str]  # human-readable advice for the user

class ComprehensiveDebugger:
    """综合调试平台,像医院的全科检查中心"""
    
    def __init__(self):
        """Snapshot basic host information and start with no diagnostic results."""
        # The two attributes are independent of each other.
        self.system_info = self._get_system_info()
        self.diagnostic_results = []
    
    def _get_system_info(self) -> Dict[str, Any]:
        """Collect OS, Python, CPU-count and total-memory facts about this host.

        Returns:
            A dict with the keys "platform", "platform_version",
            "python_version", "cpu_count" and "memory_total_gb" (GiB, rounded
            to two decimals).
        """
        bytes_per_gb = 1024**3

        info: Dict[str, Any] = {}
        info["platform"] = platform.system()
        info["platform_version"] = platform.version()
        info["python_version"] = platform.python_version()
        info["cpu_count"] = psutil.cpu_count()
        total_memory = psutil.virtual_memory().total
        info["memory_total_gb"] = round(total_memory / bytes_per_gb, 2)
        return info
    
    def check_claude_desktop_status(self) -> DiagnosticResult:
        """Look for running processes whose name contains "claude".

        The result is "success" when at least one such process exists,
        "warning" when any of them reports more than 80% CPU, and "error" when
        none is found or the process scan itself fails. The result is also
        appended to ``self.diagnostic_results``.

        Returns:
            The DiagnosticResult describing the matching processes.
        """
        try:
            claude_processes = []
            for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_info']):
                try:
                    info = proc.info
                    # psutil stores None for any attribute it was denied access
                    # to, so every value is checked before it is used.
                    name = info.get('name') or ''
                    if 'claude' not in name.lower():
                        continue

                    memory_info = info.get('memory_info')
                    memory_mb = (
                        round(memory_info.rss / 1024 / 1024, 2)
                        if memory_info is not None else None
                    )
                    claude_processes.append({
                        'pid': info['pid'],
                        'name': name,
                        'cpu_percent': info.get('cpu_percent'),
                        'memory_mb': memory_mb
                    })
                except (psutil.NoSuchProcess, psutil.AccessDenied):
                    continue

            if claude_processes:
                status = "success"
                recommendations = ["Claude Desktop正在运行"]
                # An unknown (None) CPU reading is treated as "not high".
                if any((p['cpu_percent'] or 0) > 80 for p in claude_processes):
                    status = "warning"
                    recommendations = ["Claude Desktop CPU使用率过高,建议重启"]
            else:
                status = "error"
                recommendations = ["Claude Desktop未运行,请启动应用"]

            result = DiagnosticResult(
                tool_name="Claude Desktop状态",
                status=status,
                data={
                    "processes": claude_processes,
                    "process_count": len(claude_processes)
                },
                timestamp=datetime.now(),
                recommendations=recommendations
            )

        except Exception as e:
            # Best-effort check: report the failure instead of raising.
            result = DiagnosticResult(
                tool_name="Claude Desktop状态",
                status="error",
                data={"error": str(e)},
                timestamp=datetime.now(),
                recommendations=["无法检查Claude Desktop状态"]
            )

        self.diagnostic_results.append(result)
        return result
    
    def check_network_connectivity(self) -> DiagnosticResult:
        """Combine the DNS and ping-latency probes into one connectivity verdict.

        Status is "error" if either probe fails or the average latency is
        500 ms or more, "warning" for 100-499 ms, and "success" below 100 ms.
        The result is also appended to ``self.diagnostic_results``.

        Returns:
            The DiagnosticResult holding both probe outputs.
        """
        try:
            dns_test = self._test_dns_resolution()
            latency_test = self._test_network_latency()

            if not (dns_test["success"] and latency_test["success"]):
                status = "error"
                recommendations = ["网络连接存在问题,请检查网络配置"]
            elif latency_test["avg_latency_ms"] < 100:
                status = "success"
                recommendations = ["网络连接良好"]
            elif latency_test["avg_latency_ms"] < 500:
                status = "warning"
                recommendations = ["网络延迟较高,可能影响MCP通信"]
            else:
                status = "error"
                recommendations = ["网络延迟过高,严重影响MCP通信"]

            result = DiagnosticResult(
                tool_name="网络连接性",
                status=status,
                data={
                    "dns_resolution": dns_test,
                    "latency_test": latency_test
                },
                timestamp=datetime.now(),
                recommendations=recommendations
            )

        except Exception as e:
            # Best-effort check: report the failure instead of raising.
            result = DiagnosticResult(
                tool_name="网络连接性",
                status="error",
                data={"error": str(e)},
                timestamp=datetime.now(),
                recommendations=["网络检查失败"]
            )

        self.diagnostic_results.append(result)
        return result
    
    def _test_dns_resolution(self) -> Dict[str, Any]:
        """Resolve a few well-known domains and time each lookup.

        A lookup that fails with any OSError (socket.gaierror included) is
        recorded as a failure for that domain only; the remaining domains are
        still tested.

        Returns:
            A dict with "success" (True if at least one domain resolved),
            "success_rate" (fraction of domains resolved) and "results" (one
            entry per domain with "domain", "success" and "time_ms").
        """
        import socket
        import time

        test_domains = ["anthropic.com", "google.com", "github.com"]
        results = []

        for domain in test_domains:
            # perf_counter is monotonic, so the measurement cannot be skewed
            # by system clock adjustments the way time.time() can.
            started = time.perf_counter()
            try:
                socket.gethostbyname(domain)
            except OSError:
                results.append({"domain": domain, "success": False, "time_ms": 0})
            else:
                elapsed_ms = (time.perf_counter() - started) * 1000
                results.append({"domain": domain, "success": True, "time_ms": elapsed_ms})

        success_count = sum(1 for r in results if r["success"])

        return {
            "success": success_count > 0,
            "success_rate": success_count / len(test_domains),
            "results": results
        }
    
    def _test_network_latency(self, host: str = "8.8.8.8", count: int = 4,
                              timeout: float = 10) -> Dict[str, Any]:
        """Measure the average round-trip time to *host* with the system ``ping``.

        Args:
            host: Address to ping.
            count: Number of echo requests to send.
            timeout: Seconds to wait for the ping command before giving up.

        Returns:
            ``{"success": True, "avg_latency_ms": ..., "raw_output": ...}`` when
            ping succeeds and its summary line can be parsed; otherwise
            ``{"success": False, "error": ...}`` naming what went wrong.
        """
        import re
        import subprocess

        # NOTE(review): both patterns expect English ping output; a localized
        # ping will end up in the "cannot parse" branch - confirm acceptable.
        if platform.system() == "Windows":
            cmd = ["ping", "-n", str(count), host]
            pattern = r"Average = (\d+)ms"
        else:
            cmd = ["ping", "-c", str(count), host]
            pattern = r"avg/.*?=.*?/([\d.]+)/"

        try:
            result = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
        except subprocess.TimeoutExpired:
            return {"success": False, "error": "ping超时"}
        except Exception as e:
            # e.g. the ping executable is missing or may not be run.
            return {"success": False, "error": str(e)}

        # A non-zero exit means ping itself failed (for example no replies),
        # which is a different problem from unparseable output.
        if result.returncode != 0:
            return {"success": False, "error": f"ping失败 (返回码 {result.returncode})"}

        match = re.search(pattern, result.stdout)
        if match is None:
            return {"success": False, "error": "无法解析ping结果"}

        return {
            "success": True,
            "avg_latency_ms": float(match.group(1)),
            "raw_output": result.stdout
        }
    
    def check_system_resources(self) -> DiagnosticResult:
        """Sample CPU, memory and root-disk usage and flag anything over its limit.

        Limits are 80% CPU, 85% memory and 90% disk. No breach gives
        "success", exactly one gives "warning", two or more give "error". The
        result is also appended to ``self.diagnostic_results``.

        Returns:
            The DiagnosticResult holding the sampled figures.
        """
        def to_gb(byte_count):
            # Bytes -> GiB, rounded to two decimals.
            return round(byte_count / (1024**3), 2)

        try:
            cpu_percent = psutil.cpu_percent(interval=1)  # sampled over one second
            memory = psutil.virtual_memory()
            disk = psutil.disk_usage('/')

            # (label, current usage %, limit %), in CPU / memory / disk order.
            usage_checks = (
                ("CPU使用率过高", cpu_percent, 80),
                ("内存使用率过高", memory.percent, 85),
                ("磁盘使用率过高", disk.percent, 90),
            )
            issues = [
                f"{label}: {value:.1f}%"
                for label, value, limit in usage_checks
                if value > limit
            ]

            if len(issues) >= 2:
                status = "error"
                recommendations = ["系统资源紧张,建议优化"] + issues
            elif issues:
                status = "warning"
                recommendations = [f"注意: {issues[0]}"]
            else:
                status = "success"
                recommendations = ["系统资源充足"]

            memory_data = {
                "total_gb": to_gb(memory.total),
                "available_gb": to_gb(memory.available),
                "percent": memory.percent
            }
            disk_data = {
                "total_gb": to_gb(disk.total),
                "free_gb": to_gb(disk.free),
                "percent": disk.percent
            }

            result = DiagnosticResult(
                tool_name="系统资源",
                status=status,
                data={
                    "cpu_percent": cpu_percent,
                    "memory": memory_data,
                    "disk": disk_data
                },
                timestamp=datetime.now(),
                recommendations=recommendations
            )

        except Exception as e:
            # Best-effort check: report the failure instead of raising.
            result = DiagnosticResult(
                tool_name="系统资源",
                status="error",
                data={"error": str(e)},
                timestamp=datetime.now(),
                recommendations=["无法检查系统资源"]
            )

        self.diagnostic_results.append(result)
        return result
    
    def run_comprehensive_diagnosis(self) -> Dict[str, Any]:
        """Run every diagnostic check in turn and return the combined report.

        Previously collected results are discarded first. A check that
        raises is reported on stdout and does not stop the remaining
        checks.

        Returns:
            The dictionary produced by ``_generate_comprehensive_report``.
        """
        print("🔍 开始全面系统诊断...\n")

        # Start from a clean slate so the report only covers this run.
        self.diagnostic_results = []

        # (label shown to the user, bound method that performs the check)
        scheduled = (
            ("检查Claude Desktop状态", self.check_claude_desktop_status),
            ("检查网络连接性", self.check_network_connectivity),
            ("检查系统资源", self.check_system_resources),
        )

        for label, run_check in scheduled:
            print(f"   {label}...")
            try:
                run_check()
                print(f"   ✅ {label}完成")
            except Exception as exc:
                # One failing check must not abort the whole diagnosis.
                print(f"   ❌ {label}失败: {str(exc)}")

        return self._generate_comprehensive_report()
    
    def _generate_comprehensive_report(self) -> Dict[str, Any]:
        """生成综合诊断报告"""
        
        # 统计各种状态
        status_counts = {"success": 0, "warning": 0, "error": 0}
        all_recommendations = []
        
        for result in self.diagnostic_results:
            status_counts[result.status] += 1
            all_recommendations.extend(result.recommendations)
        
        # 计算综合健康分数
        total_checks = len(self.diagnostic_results)
        if total_checks > 0:
            health_score = (
                (status_counts["success"] * 100 + 
                 status_counts["warning"] * 50 + 
                 status_counts["error"] * 0) / total_checks
            )
        else:
            health_score = 0
        
        # 确定整体状态
        if health_score >= 80:
            overall_status = "健康"
        elif health_score >= 60:
            overall_status = "良好"  
        elif health_score >= 40:
            overall_status = "警告"
        else:
            overall_status = "异常"
        
        return {
            "timestamp": datetime.now().isoformat(),
            "system_info": self.system_info,
            "overall_status": overall_status,
            "health_score": round(health_score, 1),
            "check_summary": {
                "total_checks": total_checks,
                "success_count": status_counts["success"],
                "warning_count": status_counts["warning"],
                "error_count": status_counts["error"]
            },
            "detailed_results": [
                {
                    "tool": result.tool_name,
                    "status": result.status,
                    "data": result.data,
                    "recommendations": result.recommendations
                }
                for result in self.diagnostic_results
            ],
            "priority_recommendations": list(set(all_recommendations))[:5]
        }

# 综合调试演示
def demo_comprehensive_debugging():
    """Run a full diagnosis with ComprehensiveDebugger and print a summary.

    Prints the overall status, the health score, the per-status check
    counts, the priority recommendations and basic system information
    from the report, then returns the debugger instance.
    """
    debugger = ComprehensiveDebugger()

    print("=== MCP综合调试平台演示 ===\n")

    # Execute every check and collect the combined report.
    report = debugger.run_comprehensive_diagnosis()

    print("\n📊 诊断报告摘要:")
    print(f"   整体状态: {report['overall_status']}")
    print(f"   健康分数: {report['health_score']}/100")
    summary = report['check_summary']
    print(f"   检查项目: {summary['total_checks']} 项")
    print(f"   成功: {summary['success_count']} 项")
    print(f"   警告: {summary['warning_count']} 项")
    print(f"   错误: {summary['error_count']} 项")

    print("\n🔧 优先建议:")
    for position, tip in enumerate(report['priority_recommendations'], start=1):
        print(f"   {position}. {tip}")

    print("\n💻 系统信息:")
    system_info = report['system_info']
    print(f"   操作系统: {system_info['platform']}")
    print(f"   Python版本: {system_info['python_version']}")
    print(f"   CPU核心数: {system_info['cpu_count']}")
    print(f"   内存总量: {system_info['memory_total_gb']}GB")

    return debugger

# Run the demo only when this file is executed as a script, not on import.
if __name__ == "__main__":
    demo_comprehensive_debugging()

看到这里,你是不是已经感觉自己从MCP调试小白变成了"系统诊断专家"?就像医生从学会看体温表到能做全套体检一样,现在的你已经具备了全方位的MCP调试能力。无论是JSON-RPC消息分析,还是异常情况处理,抑或是多工具组合使用,都已经是你的拿手好戏了!


欢迎大家关注同名公众号《凡人的工具箱》:关注就送学习大礼包

在这里插入图片描述

Logo

更多推荐