Python 多线程接口健康检查:生产级实践指南
·
为什么需要多线程做健康检查?
假设你有 50 个服务节点要检查。
单线程:
50 个 × 每个 0.8 秒 = 40 秒
多线程(10 并发):
50 个 ÷ 10 线程 ≈ 4 秒
差了 10 倍。 健康检查本身就是 IO 密集型,多线程几乎零成本提速。
核心方案:ThreadPoolExecutor
Python 3.2+ 内置,几行代码搞定,推荐度 ⭐⭐⭐⭐⭐。
import requests
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import List
@dataclass
class NodeResult:
name: str # 节点名称
url: str # 完整URL
status: str # 状态码或错误信息
elapsed: float # 耗时(秒)
alive: bool # 是否存活
class HealthChecker:
"""多线程健康检查器"""
def __init__(self, nodes: List[dict], health_path: str = "/health", timeout: int = 5):
"""
nodes: [{"name": "节点A", "host": "api.example.com", "port": 8080}, ...]
health_path: 健康检查接口路径
timeout: 单次请求超时(秒)
"""
self.nodes = nodes
self.health_path = health_path
self.timeout = timeout
self.results: List[NodeResult] = []
def _check_one(self, node: dict) -> NodeResult:
"""检查单个节点"""
url = f"http://{node['host']}:{node['port']}{self.health_path}"
t1 = time.time()
try:
resp = requests.get(url, timeout=self.timeout)
elapsed = time.time() - t1
return NodeResult(
name=node['name'],
url=url,
status=str(resp.status_code),
elapsed=elapsed,
alive=resp.status_code == 200
)
except requests.exceptions.Timeout:
return NodeResult(
name=node['name'], url=url,
status="Timeout", elapsed=time.time() - t1, alive=False
)
except requests.exceptions.ConnectionError:
return NodeResult(
name=node['name'], url=url,
status="Down", elapsed=time.time() - t1, alive=False
)
except Exception as e:
return NodeResult(
name=node['name'], url=url,
status=str(e), elapsed=time.time() - t1, alive=False
)
def run(self, max_workers: int = 10) -> List[NodeResult]:
"""并发检查所有节点"""
self.results = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
futures = {executor.submit(self._check_one, node): node for node in self.nodes}
for future in as_completed(futures):
result = future.result()
self.results.append(result)
mark = "✅" if result.alive else "❌"
print(f"{mark} {result.name:10s} | {result.elapsed:.3f}s | {result.status}")
# 按耗时排序
self.results.sort(key=lambda x: x.elapsed)
return self.results
def summary(self):
"""打印统计摘要"""
alive = [r for r in self.results if r.alive]
dead = [r for r in self.results if not r.alive]
print("\n" + "="*50)
print(f"📊 健康检查报告 | 总计 {len(self.nodes)} 节点 | 存活 {len(alive)} | 异常 {len(dead)}")
print("="*50)
if self.results:
fastest = self.results[0]
print(f"\n🏆 最快节点: {fastest.name} ({fastest.elapsed:.3f}s)")
print("\n📋 完整排名:")
for i, r in enumerate(self.results, 1):
mark = "✅" if r.alive else "❌"
print(f" {i}. {mark} {r.name:10s} | {r.elapsed:.3f}s | {r.status}")
# ========== 使用示例(全是假数据)==========
if __name__ == "__main__":
# 模拟 8 个服务节点
NODES = [
{"name": "用户服务-A", "host": "user-svc-a.internal", "port": 8080},
{"name": "用户服务-B", "host": "user-svc-b.internal", "port": 8080},
{"name": "订单服务", "host": "order-svc.internal", "port": 8081},
{"name": "支付网关", "host": "payment-gw.internal", "port": 8082},
{"name": "通知服务", "host": "notify-svc.internal", "port": 8083},
{"name": "文件服务", "host": "file-svc.internal", "port": 8084},
{"name": "日志服务", "host": "log-svc.internal", "port": 8085},
{"name": "配置中心", "host": "config-center.internal","port": 8086},
]
checker = HealthChecker(nodes=NODES, health_path="/api/health", timeout=3)
checker.run(max_workers=8)
checker.summary()
输出示例:
✅ 用户服务-A | 0.089s | 200
❌ 订单服务 | 3.001s | Timeout
✅ 支付网关 | 0.112s | 200
✅ 通知服务 | 0.067s | 200
✅ 文件服务 | 0.145s | 200
✅ 日志服务 | 0.098s | 200
✅ 配置中心 | 0.134s | 200
✅ 用户服务-B | 0.076s | 200
==================================================
📊 健康检查报告 | 总计 8 节点 | 存活 7 | 异常 1
==================================================
🏆 最快节点: 通知服务 (0.067s)
📋 完整排名:
1. ✅ 通知服务 | 0.067s | 200
2. ✅ 用户服务-B | 0.076s | 200
3. ✅ 用户服务-A | 0.089s | 200
4. ✅ 日志服务 | 0.098s | 200
5. ✅ 支付网关 | 0.112s | 200
6. ✅ 配置中心 | 0.134s | 200
7. ✅ 文件服务 | 0.145s | 200
8. ❌ 订单服务 | 3.001s | Timeout
关键配置说明
| 参数 | 推荐值 | 说明 |
|---|---|---|
max_workers |
10~20 | 并发线程数,超过 50 收益递减 |
timeout |
3~5 秒 | 单次请求超时,防止卡死 |
health_path |
/health 或 /api/health |
各服务自己定义的健康检查接口 |
常见坑
1. Session 不能跨线程共享
# ❌ 错误:多线程共享同一个 session
session = requests.Session()
def check(url):
return session.get(url) # 线程不安全
# ✅ 正确:每个请求独立创建连接
def check(url):
return requests.get(url, timeout=5) # 每个线程自己管理连接
2. max_workers 不是越大越好
workers = 100 # ❌ 大概率更慢,还可能触发对方限流
workers = 10 # ✅ 大部分场景够用
3. 忘了处理异常
不捕获异常的话,一个节点超时会导致整个检查任务中断。
try:
resp = requests.get(url, timeout=5)
except Exception as e:
return NodeResult(..., status=str(e), alive=False) # ✅ 保证不中断
选型决策
节点数 < 20?
→ 单线程 for 循环就够了
节点数 20~200?
→ ThreadPoolExecutor(本方案)✅
节点数 > 200 或已在用异步框架?
→ asyncio + aiohttp
需要定时循环检查 + 告警?
→ 加上 schedule 库,每分钟跑一次
一句话总结
多线程健康检查的核心就三件事:控并发、设超时、收结果。
ThreadPoolExecutor + as_completed 解决了 90% 的场景,剩下 10% 才需要上 asyncio。
更多推荐
所有评论(0)