每天10分钟轻松掌握MCP(适合小白):Day 10 - MCP通信过程追踪与调试技能训练(二)
每天10分钟轻松掌握MCP(适合小白):Day 10 - MCP通信过程追踪与调试技能训练(二)!如果文章对你有帮助,还请给个三连好评,感谢感谢!
·
每天10分钟轻松掌握MCP 40天学习计划 - 第10天
各位各位,我又来啦!在第一部分我们配备了基础装备,现在该拿出放大镜,深入分析那些在网络中飞来飞去的JSON-RPC消息了。就像福尔摩斯分析案发现场一样,我们要从每个细节中找出线索,揭开MCP系统异常的真相!
MCP通信过程追踪与调试技能训练(第二部分)
五、JSON-RPC消息深度分析
JSON-RPC消息就像邮件一样,有固定的格式和规范。掌握这些格式,就像学会看懂邮件的信封、内容和签名,能够快速识别消息的来源、目的和内容。
JSON-RPC消息类型对照表
消息类型 | 必需字段 | 可选字段 | 用途说明 | 生活比喻 |
---|---|---|---|---|
请求消息 | jsonrpc , method , id |
params |
客户端向服务端发起调用 | 寄信给朋友请求帮助 |
响应消息 | jsonrpc , id |
result 或 error |
服务端回复请求结果 | 朋友回信告诉你结果 |
通知消息 | jsonrpc , method |
params |
单向通知,不需要响应 | 发广播通知大家 |
错误响应 | jsonrpc , id , error |
- | 告知请求处理失败 | 回信说"办不了" |
JSON-RPC消息分析器实现
import json
import time
from datetime import datetime, timedelta
from typing import Dict, Any, List, Optional, Union
from dataclasses import dataclass
from enum import Enum
class MessageType(Enum):
"""消息类型枚举"""
REQUEST = "request"
RESPONSE = "response"
NOTIFICATION = "notification"
ERROR = "error"
@dataclass
class MCPMessage:
"""MCP消息模型,就像邮件的详细信息"""
message_type: MessageType
timestamp: datetime
raw_data: str
parsed_data: Dict[str, Any]
# 请求相关字段
method: Optional[str] = None
params: Optional[Any] = None
message_id: Optional[Union[str, int]] = None
# 响应相关字段
result: Optional[Any] = None
error: Optional[Dict[str, Any]] = None
# 性能相关
size_bytes: int = 0
processing_time: Optional[float] = None
class JSONRPCAnalyzer:
"""JSON-RPC消息分析专家,专业的邮件检查员"""
def __init__(self):
self.message_history = []
self.request_response_pairs = {}
self.performance_stats = {
"total_messages": 0,
"total_requests": 0,
"total_responses": 0,
"total_errors": 0,
"average_response_time": 0.0
}
def parse_message(self, raw_message: str, timestamp: datetime = None) -> MCPMessage:
"""解析JSON-RPC消息,就像拆信封看内容"""
if timestamp is None:
timestamp = datetime.now()
try:
# 解析JSON数据
parsed = json.loads(raw_message)
# 验证JSON-RPC格式
if not self._is_valid_jsonrpc(parsed):
raise ValueError("不是有效的JSON-RPC消息")
# 确定消息类型
msg_type = self._determine_message_type(parsed)
# 创建消息对象
message = MCPMessage(
message_type=msg_type,
timestamp=timestamp,
raw_data=raw_message,
parsed_data=parsed,
size_bytes=len(raw_message.encode('utf-8'))
)
# 填充消息字段
self._populate_message_fields(message, parsed)
# 记录消息
self.message_history.append(message)
self._update_stats(message)
# 如果是响应,尝试匹配对应的请求
if msg_type == MessageType.RESPONSE and message.message_id:
self._match_request_response(message)
return message
except Exception as e:
# 创建错误消息对象
return MCPMessage(
message_type=MessageType.ERROR,
timestamp=timestamp,
raw_data=raw_message,
parsed_data={"parse_error": str(e)},
size_bytes=len(raw_message.encode('utf-8')),
error={"code": -32700, "message": f"解析错误: {str(e)}"}
)
def _is_valid_jsonrpc(self, data: Dict[str, Any]) -> bool:
"""验证是否为有效的JSON-RPC消息"""
return (
isinstance(data, dict) and
data.get("jsonrpc") == "2.0"
)
def _determine_message_type(self, data: Dict[str, Any]) -> MessageType:
"""确定消息类型"""
if "method" in data:
# 有method字段
if "id" in data:
return MessageType.REQUEST # 请求消息
else:
return MessageType.NOTIFICATION # 通知消息
elif "id" in data:
# 有id但没有method
if "error" in data:
return MessageType.ERROR # 错误响应
else:
return MessageType.RESPONSE # 正常响应
else:
return MessageType.ERROR # 格式错误
def _populate_message_fields(self, message: MCPMessage, data: Dict[str, Any]) -> None:
"""填充消息字段"""
message.method = data.get("method")
message.params = data.get("params")
message.message_id = data.get("id")
message.result = data.get("result")
message.error = data.get("error")
def _update_stats(self, message: MCPMessage) -> None:
"""更新统计信息"""
self.performance_stats["total_messages"] += 1
if message.message_type == MessageType.REQUEST:
self.performance_stats["total_requests"] += 1
elif message.message_type == MessageType.RESPONSE:
self.performance_stats["total_responses"] += 1
elif message.message_type == MessageType.ERROR:
self.performance_stats["total_errors"] += 1
def _match_request_response(self, response: MCPMessage) -> None:
"""匹配请求和响应消息"""
request_id = response.message_id
# 查找对应的请求
for msg in reversed(self.message_history):
if (msg.message_type == MessageType.REQUEST and
msg.message_id == request_id):
# 计算响应时间
response_time = (response.timestamp - msg.timestamp).total_seconds() * 1000
response.processing_time = response_time
# 记录配对信息
self.request_response_pairs[request_id] = {
"request": msg,
"response": response,
"response_time_ms": response_time
}
# 更新平均响应时间
self._update_average_response_time()
break
def _update_average_response_time(self) -> None:
"""更新平均响应时间"""
response_times = [
pair["response_time_ms"]
for pair in self.request_response_pairs.values()
]
if response_times:
self.performance_stats["average_response_time"] = sum(response_times) / len(response_times)
def analyze_message_pattern(self, time_window_minutes: int = 10) -> Dict[str, Any]:
"""分析消息模式,就像分析邮件的发送规律"""
cutoff_time = datetime.now() - timedelta(minutes=time_window_minutes)
recent_messages = [
msg for msg in self.message_history
if msg.timestamp > cutoff_time
]
if not recent_messages:
return {"error": "时间窗口内没有消息"}
# 分析方法调用频率
method_counts = {}
error_methods = []
slow_requests = []
for msg in recent_messages:
if msg.method:
method_counts[msg.method] = method_counts.get(msg.method, 0) + 1
# 收集错误方法
if msg.message_type == MessageType.ERROR:
error_methods.append(msg.method)
# 收集慢请求(超过1秒的)
if msg.processing_time and msg.processing_time > 1000:
slow_requests.append({
"method": msg.method,
"response_time_ms": msg.processing_time,
"timestamp": msg.timestamp.isoformat()
})
return {
"time_window_minutes": time_window_minutes,
"total_messages": len(recent_messages),
"method_frequency": method_counts,
"error_methods": list(set(error_methods)),
"slow_requests": slow_requests,
"performance_stats": self.performance_stats
}
def get_message_details(self, message_id: Union[str, int]) -> Optional[Dict[str, Any]]:
"""获取特定消息的详细信息"""
# 查找消息
target_message = None
for msg in self.message_history:
if msg.message_id == message_id:
target_message = msg
break
if not target_message:
return None
details = {
"message_id": target_message.message_id,
"type": target_message.message_type.value,
"timestamp": target_message.timestamp.isoformat(),
"method": target_message.method,
"size_bytes": target_message.size_bytes,
"raw_message": target_message.raw_data
}
# 添加性能信息
if target_message.processing_time:
details["processing_time_ms"] = target_message.processing_time
# 添加请求-响应配对信息
if message_id in self.request_response_pairs:
pair = self.request_response_pairs[message_id]
details["paired_message"] = {
"response_time_ms": pair["response_time_ms"],
"success": pair["response"].error is None
}
return details
# 模拟MCP通信分析演示
def demo_jsonrpc_analysis():
"""演示JSON-RPC消息分析"""
analyzer = JSONRPCAnalyzer()
# 模拟一系列MCP消息
sample_messages = [
# 1. 初始化请求
{
"message": '{"jsonrpc": "2.0", "method": "initialize", "params": {"protocolVersion": "2024-11-05", "capabilities": {}}, "id": 1}',
"delay": 0
},
# 2. 初始化响应
{
"message": '{"jsonrpc": "2.0", "result": {"protocolVersion": "2024-11-05", "capabilities": {"tools": {}}}, "id": 1}',
"delay": 0.05
},
# 3. 列出工具请求
{
"message": '{"jsonrpc": "2.0", "method": "tools/list", "id": 2}',
"delay": 0.1
},
# 4. 工具列表响应
{
"message": '{"jsonrpc": "2.0", "result": {"tools": [{"name": "file_reader", "description": "读取文件内容"}]}, "id": 2}',
"delay": 0.15
},
# 5. 工具调用请求
{
"message": '{"jsonrpc": "2.0", "method": "tools/call", "params": {"name": "file_reader", "arguments": {"path": "/tmp/test.txt"}}, "id": 3}',
"delay": 0.2
},
# 6. 工具调用错误响应(模拟文件不存在)
{
"message": '{"jsonrpc": "2.0", "error": {"code": -1, "message": "文件不存在"}, "id": 3}',
"delay": 0.8 # 模拟较长的处理时间
},
# 7. 通知消息
{
"message": '{"jsonrpc": "2.0", "method": "notifications/progress", "params": {"progress": 100}}',
"delay": 0.9
}
]
print("=== JSON-RPC消息分析演示 ===\n")
start_time = datetime.now()
# 按时间顺序解析消息
for i, msg_data in enumerate(sample_messages):
# 模拟时间延迟
message_time = start_time + timedelta(seconds=msg_data["delay"])
# 解析消息
message = analyzer.parse_message(msg_data["message"], message_time)
print(f"📨 消息 {i+1}: {message.message_type.value}")
print(f" 时间: {message.timestamp.strftime('%H:%M:%S.%f')[:-3]}")
print(f" 方法: {message.method or '无'}")
print(f" ID: {message.message_id or '无'}")
if message.processing_time:
print(f" 响应时间: {message.processing_time:.1f}ms")
if message.error:
print(f" 错误: {message.error['message']}")
print()
# 分析消息模式
print("📊 消息模式分析:")
pattern_analysis = analyzer.analyze_message_pattern(time_window_minutes=1)
print(f" 总消息数: {pattern_analysis['total_messages']}")
print(f" 方法调用频率: {pattern_analysis['method_frequency']}")
print(f" 错误方法: {pattern_analysis['error_methods']}")
print(f" 慢请求: {len(pattern_analysis['slow_requests'])} 个")
print(f" 平均响应时间: {analyzer.performance_stats['average_response_time']:.1f}ms")
return analyzer
if __name__ == "__main__":
demo_jsonrpc_analysis()
六、异常情况调试技巧大全
在MCP的世界里,异常就像感冒一样常见。关键是要能快速诊断出"病因",然后对症下药。让我们来学习几种常见"疾病"的诊断和治疗方法。
常见异常类型诊断表
异常类型 | 主要症状 | 可能原因 | 诊断方法 | 治疗方案 | 预防措施 |
---|---|---|---|---|---|
连接超时 | 请求发出后长时间无响应 | 网络延迟、服务器过载 | 检查响应时间统计 | 增加超时时间、优化网络 | 设置合理的超时阈值 |
连接中断 | 通信突然断开 | 网络不稳定、服务重启 | 查看连接状态日志 | 实现重连机制 | 添加心跳检测 |
参数错误 | 服务器返回参数相关错误 | 参数格式错误、缺少必需参数 | 验证请求参数格式 | 修正参数、添加验证 | 使用参数验证库 |
权限拒绝 | 返回权限不足错误 | 认证失败、权限配置错误 | 检查认证信息 | 更新认证、调整权限 | 定期检查权限配置 |
服务不可用 | 大量请求失败 | 服务器宕机、维护中 | 监控服务器状态 | 等待恢复或切换服务 | 实现服务健康检查 |
异常调试工具箱
import asyncio
import aiohttp
import json
from datetime import datetime, timedelta
from typing import Dict, List, Any, Optional, Callable
from dataclasses import dataclass
from enum import Enum
class ExceptionType(Enum):
"""异常类型枚举"""
TIMEOUT = "timeout"
CONNECTION_LOST = "connection_lost"
PARAMETER_ERROR = "parameter_error"
PERMISSION_DENIED = "permission_denied"
SERVICE_UNAVAILABLE = "service_unavailable"
PARSE_ERROR = "parse_error"
UNKNOWN = "unknown"
@dataclass
class DebugEvent:
"""调试事件记录"""
timestamp: datetime
event_type: ExceptionType
message: str
context: Dict[str, Any]
severity: str = "info" # info, warning, error, critical
class MCPDebugger:
"""MCP调试器,专业的系统医生"""
def __init__(self):
self.event_log = []
self.connection_health = {
"last_successful_request": None,
"consecutive_failures": 0,
"total_requests": 0,
"success_rate": 100.0
}
self.performance_thresholds = {
"response_time_warning_ms": 1000,
"response_time_critical_ms": 5000,
"max_consecutive_failures": 3
}
def diagnose_timeout(self, request_data: Dict[str, Any],
timeout_duration: float) -> Dict[str, Any]:
"""诊断超时问题,就像医生检查心跳"""
diagnosis = {
"exception_type": ExceptionType.TIMEOUT.value,
"severity": "error",
"description": f"请求超时 ({timeout_duration:.1f}秒)",
"possible_causes": [
"网络延迟过高",
"服务器处理能力不足",
"服务器无响应",
"超时阈值设置过低"
],
"recommendations": []
}
# 分析超时严重程度
if timeout_duration > self.performance_thresholds["response_time_critical_ms"] / 1000:
diagnosis["severity"] = "critical"
diagnosis["recommendations"].extend([
"立即检查服务器状态",
"考虑增加服务器资源",
"实施请求队列管理"
])
else:
diagnosis["recommendations"].extend([
"适当增加超时时间",
"检查网络连接质量",
"优化请求参数"
])
# 记录诊断事件
self._log_debug_event(
ExceptionType.TIMEOUT,
diagnosis["description"],
{"request": request_data, "timeout_duration": timeout_duration},
diagnosis["severity"]
)
return diagnosis
def diagnose_connection_lost(self, last_successful_time: Optional[datetime] = None) -> Dict[str, Any]:
"""诊断连接中断问题"""
now = datetime.now()
if last_successful_time:
downtime_seconds = (now - last_successful_time).total_seconds()
else:
downtime_seconds = 0
diagnosis = {
"exception_type": ExceptionType.CONNECTION_LOST.value,
"severity": "error",
"description": "连接中断",
"downtime_seconds": downtime_seconds,
"possible_causes": [
"网络连接不稳定",
"服务器重启或维护",
"防火墙或代理问题",
"客户端网络配置错误"
],
"recommendations": [
"实施自动重连机制",
"添加连接状态监控",
"配置连接池管理",
"检查网络环境"
]
}
# 根据中断时间调整严重程度
if downtime_seconds > 300: # 超过5分钟
diagnosis["severity"] = "critical"
diagnosis["recommendations"].insert(0, "紧急联系运维团队")
self._log_debug_event(
ExceptionType.CONNECTION_LOST,
diagnosis["description"],
{"downtime_seconds": downtime_seconds},
diagnosis["severity"]
)
return diagnosis
def diagnose_parameter_error(self, error_message: str,
request_params: Dict[str, Any]) -> Dict[str, Any]:
"""诊断参数错误"""
diagnosis = {
"exception_type": ExceptionType.PARAMETER_ERROR.value,
"severity": "warning",
"description": f"参数错误: {error_message}",
"error_message": error_message,
"request_params": request_params,
"possible_causes": [],
"recommendations": []
}
# 分析错误类型
error_lower = error_message.lower()
if "missing" in error_lower or "required" in error_lower:
diagnosis["possible_causes"].append("缺少必需参数")
diagnosis["recommendations"].append("检查API文档,补充必需参数")
elif "invalid" in error_lower or "format" in error_lower:
diagnosis["possible_causes"].append("参数格式错误")
diagnosis["recommendations"].append("验证参数格式是否符合规范")
elif "type" in error_lower:
diagnosis["possible_causes"].append("参数类型错误")
diagnosis["recommendations"].append("检查参数类型是否正确")
# 提供具体的参数检查建议
diagnosis["recommendations"].extend([
"使用参数验证库进行预检查",
"添加参数格式化和类型转换",
"记录正确的参数示例"
])
self._log_debug_event(
ExceptionType.PARAMETER_ERROR,
diagnosis["description"],
{"error_message": error_message, "params": request_params},
diagnosis["severity"]
)
return diagnosis
def diagnose_permission_denied(self, user_info: Dict[str, Any],
resource: str, operation: str) -> Dict[str, Any]:
"""诊断权限拒绝问题"""
diagnosis = {
"exception_type": ExceptionType.PERMISSION_DENIED.value,
"severity": "warning",
"description": f"权限被拒绝: {operation} on {resource}",
"user_info": user_info,
"resource": resource,
"operation": operation,
"possible_causes": [
"用户权限不足",
"认证信息过期",
"权限配置错误",
"资源访问策略限制"
],
"recommendations": [
"检查用户权限配置",
"更新认证令牌",
"验证资源访问策略",
"联系管理员调整权限"
]
}
# 检查是否是认证问题
if not user_info.get("authenticated", True):
diagnosis["severity"] = "error"
diagnosis["recommendations"].insert(0, "重新进行身份认证")
self._log_debug_event(
ExceptionType.PERMISSION_DENIED,
diagnosis["description"],
{"user": user_info, "resource": resource, "operation": operation},
diagnosis["severity"]
)
return diagnosis
def _log_debug_event(self, event_type: ExceptionType, message: str,
context: Dict[str, Any], severity: str = "info") -> None:
"""记录调试事件"""
event = DebugEvent(
timestamp=datetime.now(),
event_type=event_type,
message=message,
context=context,
severity=severity
)
self.event_log.append(event)
# 更新连接健康状态
self._update_connection_health(event_type, severity)
def _update_connection_health(self, event_type: ExceptionType, severity: str) -> None:
"""更新连接健康状态"""
self.connection_health["total_requests"] += 1
if severity in ["error", "critical"]:
self.connection_health["consecutive_failures"] += 1
else:
self.connection_health["consecutive_failures"] = 0
self.connection_health["last_successful_request"] = datetime.now()
# 计算成功率
total_errors = len([e for e in self.event_log if e.severity in ["error", "critical"]])
self.connection_health["success_rate"] = max(0,
(1 - total_errors / max(1, len(self.event_log))) * 100)
def get_health_report(self) -> Dict[str, Any]:
"""获取系统健康报告,就像体检报告"""
# 统计最近的事件
recent_events = [
e for e in self.event_log
if e.timestamp > datetime.now() - timedelta(hours=1)
]
event_summary = {}
for event in recent_events:
event_type = event.event_type.value
event_summary[event_type] = event_summary.get(event_type, 0) + 1
# 计算健康分数
health_score = self._calculate_health_score()
return {
"health_score": health_score,
"connection_status": self._get_connection_status(),
"recent_events_summary": event_summary,
"total_events": len(self.event_log),
"success_rate": self.connection_health["success_rate"],
"consecutive_failures": self.connection_health["consecutive_failures"],
"recommendations": self._get_health_recommendations(health_score)
}
def _calculate_health_score(self) -> int:
"""计算健康分数 (0-100)"""
base_score = 100
# 根据成功率调整
success_rate = self.connection_health["success_rate"]
base_score = base_score * (success_rate / 100)
# 根据连续失败次数调整
consecutive_failures = self.connection_health["consecutive_failures"]
if consecutive_failures > 0:
base_score -= consecutive_failures * 10
return max(0, int(base_score))
def _get_connection_status(self) -> str:
"""获取连接状态"""
failures = self.connection_health["consecutive_failures"]
if failures == 0:
return "健康"
elif failures < 3:
return "警告"
else:
return "异常"
def _get_health_recommendations(self, health_score: int) -> List[str]:
"""根据健康分数提供建议"""
if health_score >= 90:
return ["系统运行良好,继续监控"]
elif health_score >= 70:
return [
"关注近期的警告信息",
"考虑优化网络连接",
"定期检查服务器状态"
]
elif health_score >= 50:
return [
"系统存在问题,需要立即关注",
"检查网络和服务器配置",
"实施更积极的监控策略"
]
else:
return [
"系统健康状况严重,需要紧急处理",
"联系技术支持团队",
"考虑切换到备用服务"
]
# 异常调试综合演示
def demo_exception_debugging():
"""演示异常调试技巧"""
debugger = MCPDebugger()
print("=== MCP异常调试演示 ===\n")
# 模拟各种异常情况
print("🔍 模拟异常场景:")
# 1. 超时问题
print("\n1️⃣ 诊断超时问题:")
timeout_diagnosis = debugger.diagnose_timeout(
{"method": "tools/call", "params": {"name": "slow_tool"}},
3.2
)
print(f" 诊断结果: {timeout_diagnosis['description']}")
print(f" 严重程度: {timeout_diagnosis['severity']}")
print(f" 建议: {timeout_diagnosis['recommendations'][0]}")
# 2. 连接中断
print("\n2️⃣ 诊断连接中断:")
last_success = datetime.now() - timedelta(minutes=5)
connection_diagnosis = debugger.diagnose_connection_lost(last_success)
print(f" 诊断结果: {connection_diagnosis['description']}")
print(f" 中断时长: {connection_diagnosis['downtime_seconds']:.0f}秒")
print(f" 建议: {connection_diagnosis['recommendations'][0]}")
# 3. 参数错误
print("\n3️⃣ 诊断参数错误:")
param_diagnosis = debugger.diagnose_parameter_error(
"Missing required parameter 'file_path'",
{"action": "read_file"}
)
print(f" 诊断结果: {param_diagnosis['description']}")
print(f" 建议: {param_diagnosis['recommendations'][0]}")
# 4. 权限拒绝
print("\n4️⃣ 诊断权限问题:")
permission_diagnosis = debugger.diagnose_permission_denied(
{"user_id": "user_123", "role": "guest", "authenticated": True},
"system_config",
"write"
)
print(f" 诊断结果: {permission_diagnosis['description']}")
print(f" 建议: {permission_diagnosis['recommendations'][0]}")
# 生成健康报告
print("\n🏥 系统健康报告:")
health_report = debugger.get_health_report()
print(f" 健康分数: {health_report['health_score']}/100")
print(f" 连接状态: {health_report['connection_status']}")
print(f" 成功率: {health_report['success_rate']:.1f}%")
print(f" 建议: {health_report['recommendations'][0]}")
return debugger
if __name__ == "__main__":
demo_exception_debugging()
七、多工具组合调试策略
单一工具就像用一把螺丝刀修复整台电脑,力不从心。真正的调试高手会组合使用多种工具,就像医生会结合多种检查手段来诊断疾病一样。
调试工具组合策略图
综合调试平台实现
import subprocess
import platform
import psutil
from datetime import datetime
from typing import Dict, List, Any, Optional
from dataclasses import dataclass
@dataclass
class DiagnosticResult:
"""诊断结果"""
tool_name: str
status: str # success, warning, error
data: Dict[str, Any]
timestamp: datetime
recommendations: List[str]
class ComprehensiveDebugger:
"""综合调试平台,像医院的全科检查中心"""
def __init__(self):
self.diagnostic_results = []
self.system_info = self._get_system_info()
def _get_system_info(self) -> Dict[str, Any]:
"""获取系统基础信息"""
return {
"platform": platform.system(),
"platform_version": platform.version(),
"python_version": platform.python_version(),
"cpu_count": psutil.cpu_count(),
"memory_total_gb": round(psutil.virtual_memory().total / (1024**3), 2)
}
def check_claude_desktop_status(self) -> DiagnosticResult:
"""检查Claude Desktop状态"""
try:
# 检查Claude Desktop进程
claude_processes = []
for proc in psutil.process_iter(['pid', 'name', 'cpu_percent', 'memory_info']):
try:
if 'claude' in proc.info['name'].lower():
claude_processes.append({
'pid': proc.info['pid'],
'name': proc.info['name'],
'cpu_percent': proc.info['cpu_percent'],
'memory_mb': round(proc.info['memory_info'].rss / 1024 / 1024, 2)
})
except (psutil.NoSuchProcess, psutil.AccessDenied):
continue
if claude_processes:
status = "success"
recommendations = ["Claude Desktop正在运行"]
if any(p['cpu_percent'] > 80 for p in claude_processes):
status = "warning"
recommendations = ["Claude Desktop CPU使用率过高,建议重启"]
else:
status = "error"
recommendations = ["Claude Desktop未运行,请启动应用"]
result = DiagnosticResult(
tool_name="Claude Desktop状态",
status=status,
data={
"processes": claude_processes,
"process_count": len(claude_processes)
},
timestamp=datetime.now(),
recommendations=recommendations
)
except Exception as e:
result = DiagnosticResult(
tool_name="Claude Desktop状态",
status="error",
data={"error": str(e)},
timestamp=datetime.now(),
recommendations=["无法检查Claude Desktop状态"]
)
self.diagnostic_results.append(result)
return result
def check_network_connectivity(self) -> DiagnosticResult:
"""检查网络连接性"""
try:
import socket
import time
# 测试DNS解析
dns_test = self._test_dns_resolution()
# 测试网络延迟
latency_test = self._test_network_latency()
# 综合评估
if dns_test["success"] and latency_test["success"]:
if latency_test["avg_latency_ms"] < 100:
status = "success"
recommendations = ["网络连接良好"]
elif latency_test["avg_latency_ms"] < 500:
status = "warning"
recommendations = ["网络延迟较高,可能影响MCP通信"]
else:
status = "error"
recommendations = ["网络延迟过高,严重影响MCP通信"]
else:
status = "error"
recommendations = ["网络连接存在问题,请检查网络配置"]
result = DiagnosticResult(
tool_name="网络连接性",
status=status,
data={
"dns_resolution": dns_test,
"latency_test": latency_test
},
timestamp=datetime.now(),
recommendations=recommendations
)
except Exception as e:
result = DiagnosticResult(
tool_name="网络连接性",
status="error",
data={"error": str(e)},
timestamp=datetime.now(),
recommendations=["网络检查失败"]
)
self.diagnostic_results.append(result)
return result
def _test_dns_resolution(self) -> Dict[str, Any]:
"""测试DNS解析"""
try:
import socket
import time
test_domains = ["anthropic.com", "google.com", "github.com"]
results = []
for domain in test_domains:
start_time = time.time()
try:
socket.gethostbyname(domain)
resolution_time = (time.time() - start_time) * 1000
results.append({"domain": domain, "success": True, "time_ms": resolution_time})
except socket.gaierror:
results.append({"domain": domain, "success": False, "time_ms": 0})
success_count = sum(1 for r in results if r["success"])
return {
"success": success_count > 0,
"success_rate": success_count / len(test_domains),
"results": results
}
except Exception:
return {"success": False, "error": "DNS测试失败"}
def _test_network_latency(self) -> Dict[str, Any]:
"""测试网络延迟"""
try:
import subprocess
import re
if platform.system() == "Windows":
cmd = ["ping", "-n", "4", "8.8.8.8"]
pattern = r"Average = (\d+)ms"
else:
cmd = ["ping", "-c", "4", "8.8.8.8"]
pattern = r"avg/.*?=.*?/([\d.]+)/"
result = subprocess.run(cmd, capture_output=True, text=True, timeout=10)
if result.returncode == 0:
match = re.search(pattern, result.stdout)
if match:
avg_latency = float(match.group(1))
return {
"success": True,
"avg_latency_ms": avg_latency,
"raw_output": result.stdout
}
return {"success": False, "error": "无法解析ping结果"}
except subprocess.TimeoutExpired:
return {"success": False, "error": "ping超时"}
except Exception as e:
return {"success": False, "error": str(e)}
def check_system_resources(self) -> DiagnosticResult:
"""检查系统资源使用情况"""
try:
# CPU使用率
cpu_percent = psutil.cpu_percent(interval=1)
# 内存使用情况
memory = psutil.virtual_memory()
# 磁盘使用情况
disk = psutil.disk_usage('/')
# 评估资源状况
issues = []
if cpu_percent > 80:
issues.append(f"CPU使用率过高: {cpu_percent:.1f}%")
if memory.percent > 85:
issues.append(f"内存使用率过高: {memory.percent:.1f}%")
if disk.percent > 90:
issues.append(f"磁盘使用率过高: {disk.percent:.1f}%")
if not issues:
status = "success"
recommendations = ["系统资源充足"]
elif len(issues) == 1:
status = "warning"
recommendations = [f"注意: {issues[0]}"]
else:
status = "error"
recommendations = ["系统资源紧张,建议优化"] + issues
result = DiagnosticResult(
tool_name="系统资源",
status=status,
data={
"cpu_percent": cpu_percent,
"memory": {
"total_gb": round(memory.total / (1024**3), 2),
"available_gb": round(memory.available / (1024**3), 2),
"percent": memory.percent
},
"disk": {
"total_gb": round(disk.total / (1024**3), 2),
"free_gb": round(disk.free / (1024**3), 2),
"percent": disk.percent
}
},
timestamp=datetime.now(),
recommendations=recommendations
)
except Exception as e:
result = DiagnosticResult(
tool_name="系统资源",
status="error",
data={"error": str(e)},
timestamp=datetime.now(),
recommendations=["无法检查系统资源"]
)
self.diagnostic_results.append(result)
return result
def run_comprehensive_diagnosis(self) -> Dict[str, Any]:
"""运行全面诊断,就像全身体检"""
print("🔍 开始全面系统诊断...\n")
# 清空之前的结果
self.diagnostic_results = []
# 执行各项检查
checks = [
("检查Claude Desktop状态", self.check_claude_desktop_status),
("检查网络连接性", self.check_network_connectivity),
("检查系统资源", self.check_system_resources)
]
for check_name, check_func in checks:
print(f" {check_name}...")
try:
check_func()
print(f" ✅ {check_name}完成")
except Exception as e:
print(f" ❌ {check_name}失败: {str(e)}")
# 生成综合报告
return self._generate_comprehensive_report()
def _generate_comprehensive_report(self) -> Dict[str, Any]:
"""生成综合诊断报告"""
# 统计各种状态
status_counts = {"success": 0, "warning": 0, "error": 0}
all_recommendations = []
for result in self.diagnostic_results:
status_counts[result.status] += 1
all_recommendations.extend(result.recommendations)
# 计算综合健康分数
total_checks = len(self.diagnostic_results)
if total_checks > 0:
health_score = (
(status_counts["success"] * 100 +
status_counts["warning"] * 50 +
status_counts["error"] * 0) / total_checks
)
else:
health_score = 0
# 确定整体状态
if health_score >= 80:
overall_status = "健康"
elif health_score >= 60:
overall_status = "良好"
elif health_score >= 40:
overall_status = "警告"
else:
overall_status = "异常"
return {
"timestamp": datetime.now().isoformat(),
"system_info": self.system_info,
"overall_status": overall_status,
"health_score": round(health_score, 1),
"check_summary": {
"total_checks": total_checks,
"success_count": status_counts["success"],
"warning_count": status_counts["warning"],
"error_count": status_counts["error"]
},
"detailed_results": [
{
"tool": result.tool_name,
"status": result.status,
"data": result.data,
"recommendations": result.recommendations
}
for result in self.diagnostic_results
],
"priority_recommendations": list(set(all_recommendations))[:5]
}
# 综合调试演示
def demo_comprehensive_debugging():
"""演示综合调试平台"""
debugger = ComprehensiveDebugger()
print("=== MCP综合调试平台演示 ===\n")
# 运行全面诊断
report = debugger.run_comprehensive_diagnosis()
print(f"\n📊 诊断报告摘要:")
print(f" 整体状态: {report['overall_status']}")
print(f" 健康分数: {report['health_score']}/100")
print(f" 检查项目: {report['check_summary']['total_checks']} 项")
print(f" 成功: {report['check_summary']['success_count']} 项")
print(f" 警告: {report['check_summary']['warning_count']} 项")
print(f" 错误: {report['check_summary']['error_count']} 项")
print(f"\n🔧 优先建议:")
for i, recommendation in enumerate(report['priority_recommendations'], 1):
print(f" {i}. {recommendation}")
print(f"\n💻 系统信息:")
sys_info = report['system_info']
print(f" 操作系统: {sys_info['platform']}")
print(f" Python版本: {sys_info['python_version']}")
print(f" CPU核心数: {sys_info['cpu_count']}")
print(f" 内存总量: {sys_info['memory_total_gb']}GB")
return debugger
if __name__ == "__main__":
demo_comprehensive_debugging()
看到这里,你是不是已经感觉自己从MCP调试小白变成了"系统诊断专家"?就像医生从学会看体温表到能做全套体检一样,现在的你已经具备了全方位的MCP调试能力。无论是JSON-RPC消息分析,还是异常情况处理,抑或是多工具组合使用,都已经是你的拿手好戏了!
欢迎大家关注同名公众号《凡人的工具箱》:关注就送学习大礼包
更多推荐
所有评论(0)