为什么需要多线程做健康检查?

假设你有 50 个服务节点要检查。

单线程:

50 个 × 每个 0.8 秒 = 40 秒

多线程(10 并发):

50 个 ÷ 10 线程 ≈ 4 秒

差了 10 倍。 健康检查本身就是 IO 密集型,多线程几乎零成本提速。


核心方案:ThreadPoolExecutor

Python 3.2+ 内置,几行代码搞定,推荐度 ⭐⭐⭐⭐⭐

import requests
import time
from concurrent.futures import ThreadPoolExecutor, as_completed
from dataclasses import dataclass
from typing import List

@dataclass
class NodeResult:
    name: str          # 节点名称
    url: str           # 完整URL
    status: str        # 状态码或错误信息
    elapsed: float     # 耗时(秒)
    alive: bool        # 是否存活

class HealthChecker:
    """多线程健康检查器"""
    
    def __init__(self, nodes: List[dict], health_path: str = "/health", timeout: int = 5):
        """
        nodes: [{"name": "节点A", "host": "api.example.com", "port": 8080}, ...]
        health_path: 健康检查接口路径
        timeout: 单次请求超时(秒)
        """
        self.nodes = nodes
        self.health_path = health_path
        self.timeout = timeout
        self.results: List[NodeResult] = []

    def _check_one(self, node: dict) -> NodeResult:
        """检查单个节点"""
        url = f"http://{node['host']}:{node['port']}{self.health_path}"
        t1 = time.time()
        
        try:
            resp = requests.get(url, timeout=self.timeout)
            elapsed = time.time() - t1
            return NodeResult(
                name=node['name'],
                url=url,
                status=str(resp.status_code),
                elapsed=elapsed,
                alive=resp.status_code == 200
            )
        except requests.exceptions.Timeout:
            return NodeResult(
                name=node['name'], url=url,
                status="Timeout", elapsed=time.time() - t1, alive=False
            )
        except requests.exceptions.ConnectionError:
            return NodeResult(
                name=node['name'], url=url,
                status="Down", elapsed=time.time() - t1, alive=False
            )
        except Exception as e:
            return NodeResult(
                name=node['name'], url=url,
                status=str(e), elapsed=time.time() - t1, alive=False
            )

    def run(self, max_workers: int = 10) -> List[NodeResult]:
        """并发检查所有节点"""
        self.results = []
        
        with ThreadPoolExecutor(max_workers=max_workers) as executor:
            futures = {executor.submit(self._check_one, node): node for node in self.nodes}
            
            for future in as_completed(futures):
                result = future.result()
                self.results.append(result)
                
                mark = "✅" if result.alive else "❌"
                print(f"{mark} {result.name:10s} | {result.elapsed:.3f}s | {result.status}")

        # 按耗时排序
        self.results.sort(key=lambda x: x.elapsed)
        return self.results

    def summary(self):
        """打印统计摘要"""
        alive = [r for r in self.results if r.alive]
        dead = [r for r in self.results if not r.alive]
        
        print("\n" + "="*50)
        print(f"📊 健康检查报告 | 总计 {len(self.nodes)} 节点 | 存活 {len(alive)} | 异常 {len(dead)}")
        print("="*50)
        
        if self.results:
            fastest = self.results[0]
            print(f"\n🏆 最快节点: {fastest.name} ({fastest.elapsed:.3f}s)")
        
        print("\n📋 完整排名:")
        for i, r in enumerate(self.results, 1):
            mark = "✅" if r.alive else "❌"
            print(f"  {i}. {mark} {r.name:10s} | {r.elapsed:.3f}s | {r.status}")


# ========== 使用示例(全是假数据)==========
if __name__ == "__main__":
    # 模拟 8 个服务节点
    NODES = [
        {"name": "用户服务-A",  "host": "user-svc-a.internal", "port": 8080},
        {"name": "用户服务-B",  "host": "user-svc-b.internal", "port": 8080},
        {"name": "订单服务",    "host": "order-svc.internal",   "port": 8081},
        {"name": "支付网关",    "host": "payment-gw.internal",  "port": 8082},
        {"name": "通知服务",    "host": "notify-svc.internal",   "port": 8083},
        {"name": "文件服务",    "host": "file-svc.internal",     "port": 8084},
        {"name": "日志服务",    "host": "log-svc.internal",      "port": 8085},
        {"name": "配置中心",    "host": "config-center.internal","port": 8086},
    ]

    checker = HealthChecker(nodes=NODES, health_path="/api/health", timeout=3)
    checker.run(max_workers=8)
    checker.summary()

输出示例

✅ 用户服务-A  | 0.089s | 200
❌ 订单服务    | 3.001s | Timeout
✅ 支付网关    | 0.112s | 200
✅ 通知服务    | 0.067s | 200
✅ 文件服务    | 0.145s | 200
✅ 日志服务    | 0.098s | 200
✅ 配置中心    | 0.134s | 200
✅ 用户服务-B  | 0.076s | 200

==================================================
📊 健康检查报告 | 总计 8 节点 | 存活 7 | 异常 1
==================================================

🏆 最快节点: 通知服务 (0.067s)

📋 完整排名:
  1. ✅ 通知服务    | 0.067s | 200
  2. ✅ 用户服务-B  | 0.076s | 200
  3. ✅ 用户服务-A  | 0.089s | 200
  4. ✅ 日志服务    | 0.098s | 200
  5. ✅ 支付网关    | 0.112s | 200
  6. ✅ 配置中心    | 0.134s | 200
  7. ✅ 文件服务    | 0.145s | 200
  8. ❌ 订单服务    | 3.001s | Timeout

关键配置说明

参数 推荐值 说明
max_workers 10~20 并发线程数,超过 50 收益递减
timeout 3~5 秒 单次请求超时,防止卡死
health_path /health/api/health 各服务自己定义的健康检查接口

常见坑

1. Session 不能跨线程共享

# ❌ 错误:多线程共享同一个 session
session = requests.Session()
def check(url):
    return session.get(url)  # 线程不安全

# ✅ 正确:每个请求独立创建连接
def check(url):
    return requests.get(url, timeout=5)  # 每个线程自己管理连接

2. max_workers 不是越大越好

workers = 100  # ❌ 大概率更慢,还可能触发对方限流
workers = 10   # ✅ 大部分场景够用

3. 忘了处理异常

不捕获异常的话,一个节点超时会导致整个检查任务中断。

try:
    resp = requests.get(url, timeout=5)
except Exception as e:
    return NodeResult(..., status=str(e), alive=False)  # ✅ 保证不中断

选型决策

节点数 < 20?
  → 单线程 for 循环就够了

节点数 20~200?
  → ThreadPoolExecutor(本方案)✅

节点数 > 200 或已在用异步框架?
  → asyncio + aiohttp

需要定时循环检查 + 告警?
  → 加上 schedule 库,每分钟跑一次

一句话总结

多线程健康检查的核心就三件事:控并发、设超时、收结果

ThreadPoolExecutor + as_completed 解决了 90% 的场景,剩下 10% 才需要上 asyncio。

更多推荐