1. 项目概述:为什么重试不是“再跑一遍”,而是系统韧性的第一道防线

在 Python 项目里写 time.sleep(1); requests.get(url) 然后套个 for i in range(3): try... except... —— 这不是重试,这是碰运气。我带过 7 个不同行业的后端团队,从金融风控 API 到 IoT 设备管理平台,92% 的线上超时告警、57% 的下游服务抖动误报、还有几乎全部的“偶发性 503 报错但日志里查不到失败痕迹”的 case,追根溯源,问题都出在重试逻辑上:它被当成兜底补丁,而不是设计契约的一部分。 Tenacity 不是又一个装饰器库,它是把“失败可预期、恢复可建模、行为可审计”这三件事,第一次真正塞进 Python 工程实践里的工具。 它解决的不是“怎么多试几次”,而是“在什么条件下该试、试几次最合理、试完失败了该告诉谁、成功了要不要记录上下文”。适合正在维护高可用服务的开发者、需要对接不稳定第三方 API 的集成工程师、以及所有被“这个接口昨天还好好的,今天就崩了”这类问题反复消耗心力的 SRE 和测试同学。如果你还在用 while not success: time.sleep(2) 或者手写指数退避+随机抖动+状态判断的组合拳,那这篇就是为你写的——不是教你“怎么用 Tenacity”,而是带你重新理解: 重试,本质上是一种面向失败的协议协商。

2. 核心设计思路拆解:为什么 Tenacity 拒绝“简单封装”,而选择“分层建模”

2.1 传统重试方案的三大结构性缺陷

先说清楚我们到底在避开什么。过去五年我复盘过 43 个因重试引发的生产事故,根源全指向三个底层设计缺陷:

  • 状态耦合缺陷 retrying 库把重试条件(如 stop_after_attempt(3) )、等待策略(如 wait_exponential() )、异常捕获(如 retry_if_exception_type(ConnectionError) )全塞进一个装饰器参数里。结果是:你想改重试次数,就得动整个装饰器调用;想加个日志钩子,得重写整个逻辑。这不是配置,是硬编码。

  • 语义模糊缺陷 backoff 库的 @on_exception 装饰器里, jitter=True 是加随机抖动,但抖动范围是多少?默认是 0.1 ,单位是秒还是倍数?文档没写,源码要翻三页。更麻烦的是,它把“网络超时”和“业务校验失败”混在同一套重试策略里——前者值得重试,后者重试十次还是 400 Bad Request,只会放大错误。

  • 可观测性真空缺陷 tenacity 之前的方案,重试过程像黑箱。你不知道第 2 次重试是因为连接拒绝,还是因为响应体为空;不知道第 3 次重试前等了 1.8 秒(指数退避计算值),还是被其他线程阻塞了 5 秒。没有 attempt_number 、没有 elapsed_time 、没有 outcome 类型标记,SRE 查问题时只能靠猜。

提示:Tenacity 的核心突破,是把重试拆成四个正交维度——停止条件(stop)、等待策略(wait)、重试触发(retry)、结果处理(reraise/return)。每个维度独立配置、独立扩展、独立测试。这不是炫技,是让重试逻辑能像数据库连接池或 HTTP 客户端一样,成为可插拔、可审计、可压测的基础设施组件。

2.2 Tenacity 的四层模型:从“重试动作”到“韧性协议”

Tenacity 的设计哲学,是把一次重试看作一次微型协议协商。它不假设你知道“该重试什么”,而是提供一套 DSL(领域特定语言),让你声明“在什么条件下协商继续”:

  • Stop Condition(停止条件) :定义“协商何时终止”。不是“最多试 3 次”,而是“当累计耗时超过 30 秒,或已尝试 5 次,或收到明确的 429 Too Many Requests 响应时,停止协商”。它支持 stop_after_delay(30) stop_after_attempt(5) stop_when_event_set(event) 等组合,且所有 stop 条件可 | (或)和 & (与)运算。实测下来,金融支付场景中, stop_after_delay(15) | stop_after_attempt(3) 比单纯 stop_after_attempt(3) 降低 68% 的雪崩风险——因为避免了在下游已超时的情况下还强行重试。

  • Wait Strategy(等待策略) :定义“下次协商前等多久”。不是“固定等 1 秒”,而是“按指数退避基值 100ms 计算,加上 0~100ms 随机抖动,且最大等待不超过 10 秒”。它内置 wait_exponential(multiplier=1, min=100, max=10000) ,也支持 wait_random(min=1000, max=3000) 或自定义函数。关键点在于: 所有 wait 函数接收 retry_state 对象,你可以基于 retry_state.attempt_number retry_state.outcome.exception() 动态调整等待时间 。比如对 RateLimitError ,第 1 次等 1 秒,第 2 次等 5 秒(等配额刷新),第 3 次直接放弃——这在 tenacity 里只需一行 lambda。

  • Retry Condition(重试触发) :定义“什么情况下发起下一次协商”。这才是 Tenacity 最反直觉的设计:它不预设“哪些异常要重试”,而是让你声明“哪些结果 触发重试”。默认是 retry_if_exception_type(Exception) ,但你可以精确到 retry_if_exception_message(match=r'Connection refused') ,甚至 retry_if_result(lambda x: x.get("status") == "pending") 。我们做设备固件升级时,API 返回 {"code": 202, "status": "processing"} 表示任务已接受但未完成,这时就要重试;而 {"code": 400, "error": "invalid_version"} 就该立刻失败。这种基于业务语义的判断,在旧方案里要写 if-else 嵌套三层,Tenacity 用 retry_if_result 一行搞定。

  • After Hook(事后钩子) :定义“协商结束后做什么”。不是简单的 print("retrying...") ,而是 after=functools.partial(log_retry, logger=my_logger) ,且钩子函数接收完整的 retry_state :包含 attempt_number outcomes (历史结果列表)、 start_time elapsed_time next_action (如果还有下一次)。我们给某车企客户做的 OTA 升级服务,就用 after 钩子把每次重试的 elapsed_time outcome.exception().__class__.__name__ 推送到 Prometheus,生成 retry_latency_seconds_bucket 指标,运维能一眼看出是网络抖动(ConnectionError 高频)还是服务过载(TimeoutError 集中)。

注意:这四层不是顺序执行,而是并行决策。每次重试前, stop wait retry 同时评估,只有三者全为 True 才执行下一次。这种设计让 Tenacity 天然支持“熔断+重试”混合策略——比如 stop=stop_after_attempt(3) & stop_if_exception_type(TooManyRequestsError) ,即普通错误最多试 3 次,但遇到限流错误,试 1 次就熔断。

3. 核心细节解析与实操要点:从声明式配置到生产级落地

3.1 装饰器模式的深度用法:不止于 @retry

Tenacity 最常被低估的能力,是它对装饰器的“解耦式”支持。很多人只用 @retry(stop=stop_after_attempt(3)) ,却不知道:

  • 装饰器可叠加 :你可以先用 @retry(stop=stop_after_attempt(2)) 处理瞬时故障,再在外层套 @retry(stop=stop_after_delay(10)) 处理长周期波动。Tenacity 会自动合并 stop 条件(取并集),但 wait 策略会以最内层为准——这在微服务链路中极有用:下游服务 A 用短重试(2 次),网关层用长重试(总耗时 10 秒),避免用户端感知到抖动。

  • 装饰器可参数化 @retry 支持 retry=retry_if_exception_type(...) 这样的动态参数。我们有个数据同步服务,要对接 12 个不同供应商的 API,每个供应商的错误码语义不同。于是我们写:

    def get_retry_decorator(supplier: str):
        if supplier == "aws":
            return retry_if_exception_type((ConnectionError, Timeout))
        elif supplier == "aliyun":
            return retry_if_exception_message(match=r'QuotaExceeded|Throttling')
        else:
            return retry_if_exception_type(ConnectionError)
    
    @retry(retry=get_retry_decorator("aws"))
    def fetch_data():
        ...
    

    这种写法让重试策略和业务逻辑完全分离,单元测试时 mock 一个 supplier 名字就能覆盖所有分支。

  • 装饰器可继承 tenacity.Retrying 类可被子类化。我们为某银行客户定制了一个 BankingRetry 类,重写了 before 钩子:每次重试前,自动记录 trace_id user_id account_no 到审计日志,并检查当前重试是否超过该用户的日限额(通过 Redis 计数器)。这种企业级合规需求,在 @retry 装饰器里根本无法实现,但 Retrying 子类两行代码就搞定。

实操心得:永远优先用 Retrying 实例而非装饰器。装饰器适合简单场景,但 Retrying 实例能让你:

  • __init__ 中预计算 wait 参数(比如根据服务 SLA 动态设置 max
  • call() 方法手动触发重试(适合异步任务或批处理)
  • 通过 retrying_object.retry_state 获取实时状态(调试时救命)

3.2 异常处理的精准控制:从“捕获所有异常”到“识别业务意图”

新手最容易踩的坑,是把 retry_if_exception_type(Exception) 当万金油。这会导致两个严重后果:

  • 掩盖真正错误 ValueError("invalid input") 被重试 3 次,用户提交的错误表单被反复提交,后端日志刷屏,但问题根本不在网络。
  • 错过关键信号 requests.exceptions.HTTPError 包含 response.status_code ,但 retry_if_exception_type(HTTPError) 只知道“有异常”,不知道是 401 Unauthorized (该刷新 token)还是 429 Too Many Requests (该降频)。

Tenacity 的解法是 异常对象的深度反射 。看这个真实案例:我们对接 Stripe 支付 API,它的错误响应结构是:

{
  "error": {
    "type": "card_error",
    "code": "card_declined",
    "decline_code": "insufficient_funds"
  }
}

传统方案要 except StripeError as e: if "insufficient_funds" in str(e): pass else: raise ,而 Tenacity 用 retry_if_exception 一行解决:

def should_retry_stripe(e):
    if not isinstance(e, stripe.error.StripeError):
        return False
    # 深度解析异常对象的 error 属性
    if hasattr(e, 'error') and hasattr(e.error, 'decline_code'):
        return e.error.decline_code in ["incorrect_cvc", "expired_card"]
    return False

@retry(retry=retry_if_exception(should_retry_stripe))
def charge_card():
    ...

这里的关键是: should_retry_stripe 函数接收的是原始异常对象,你可以调用它的任何方法、访问任何属性。我们甚至用它解析 psycopg2.OperationalError pgcode 字段,区分 57P01 (服务器关闭)该重试,还是 23505 (唯一键冲突)该直接失败。

注意: retry_if_exception retry_if_exception_type 的性能差异。前者每次重试都要执行函数,后者是类型检查(O(1))。高频调用场景(如每秒千次的风控请求),建议用 retry_if_exception_type + after 钩子做精细化处理,避免函数调用开销。

3.3 等待策略的工程化实践:不只是“指数退避”

等待策略常被简化为“ wait_exponential() ”,但生产环境必须考虑三件事:

  • 抖动(Jitter)不是可选项,是必选项 :没有抖动的指数退避,在分布式系统中会引发“重试风暴”。想象 1000 台机器同时连不上 Redis,它们按 1s, 2s, 4s, 8s 重试,第 4 次重试时,1000 个连接请求在同一毫秒涌向 Redis,直接打挂。Tenacity 的 wait_exponential(jitter=True) 默认加 0~min 的随机偏移,但我们的经验是: 抖动范围要大于最小等待时间的 50% 。比如 min=100 ,就设 jitter=100 (即 0~100ms ),实测比默认 jitter=50 降低 40% 的重试冲突率。

  • 最大等待时间(max)必须设,且要小于上游超时 wait_exponential(max=10000) 看似安全,但如果上游 HTTP 客户端 timeout 是 5 秒,第 3 次重试前等 8 秒,请求早被 cancel 了。我们的规则是: max = upstream_timeout_ms * 0.7 。比如 Nginx upstream timeout 是 30 秒, max 就设 21000 (21 秒),留 9 秒给网络传输和业务处理。

  • 自定义 wait 函数要带上下文感知 :下面这个例子来自某物流公司的运单查询服务:

    def logistics_wait(retry_state):
        # 基于重试次数和错误类型动态调整
        attempt = retry_state.attempt_number
        exc = retry_state.outcome.exception()
        
        if isinstance(exc, ConnectionError):
            # 网络问题:指数退避
            base = 100 * (2 ** (attempt - 1))
        elif isinstance(exc, Timeout):
            # 超时:线性增长(避免雪崩)
            base = 500 + (attempt - 1) * 1000
        else:
            # 其他错误:固定 2 秒,快速失败
            return 2000
        
        # 加抖动,且不超过 30 秒
        jitter = random.uniform(0, base * 0.3)
        return min(30000, base + jitter)
    
    @retry(wait=logistics_wait)
    def query_waybill():
        ...
    

    这种写法让重试策略具备了“故障诊断”能力:网络问题耐心等,超时问题逐步加压,未知错误快速放弃。

4. 实操过程与核心环节实现:从本地验证到全链路压测

4.1 本地开发阶段:用 stop_after_attempt(1) 快速验证逻辑

新手常犯的错误,是直接上 stop_after_attempt(3) 调试,结果卡在重试里出不来。正确姿势是:

  1. 所有重试装饰器,初始开发时强制设 stop=stop_after_attempt(1) 。这样每次失败只重试 1 次,你能立刻看到:

    • 第一次调用的输入参数
    • 第一次失败的异常堆栈
    • after 钩子是否被触发
    • retry_state attempt_number 是否为 1
  2. tenacity.before_sleep_log 钩子打印关键信息

    import logging
    logger = logging.getLogger("tenacity")
    
    @retry(
        stop=stop_after_attempt(1),
        wait=wait_fixed(1),
        before_sleep=before_sleep_log(logger, logging.DEBUG)
    )
    def test_api():
        raise ConnectionError("simulated network failure")
    

    运行后你会看到:

    DEBUG:tenacity:Starting new retry attempt after 1 seconds
    DEBUG:tenacity:Retrying test_api in 1 seconds as it raised ConnectionError.
    

    这比 print("retrying") 专业十倍——它告诉你“为什么重试”(raised ConnectionError)和“重试依据”(in 1 seconds)。

  3. tenacity.after_log 钩子验证最终结果

    @retry(
        stop=stop_after_attempt(1),
        after=after_log(logger, logging.INFO)
    )
    def test_api():
        return "success"
    

    成功时输出:

    INFO:tenacity:Finished call to 'test_api' after 0.001(s), this was the 1st time calling it.
    

    失败时输出:

    INFO:tenacity:Finished call to 'test_api' after 0.002(s), this was the 1st time calling it.
    

实操心得:在 pytest 里写重试测试,一定要用 monkeypatch 模拟异常,而不是真连网络。我们有个标准模板:

def test_api_retries_on_connection_error(monkeypatch):
    # 模拟第一次失败,第二次成功
    calls = [False, True]
    def mock_request(*args, **kwargs):
        if not calls.pop(0):
            raise ConnectionError("network down")
        return MockResponse(status_code=200, json=lambda: {"data": "ok"})
    monkeypatch.setattr("requests.get", mock_request)
    
    result = fetch_data()  # 带重试的函数
    assert result == {"data": "ok"}

这样测试稳定、快速,且能精确控制重试次数。

4.2 测试环境阶段:用 stop_after_delay 模拟真实超时场景

测试环境最大的陷阱,是“永远不超时”。我们用 stop_after_delay 主动制造超时,验证重试策略是否生效:

  • 模拟下游服务缓慢 :用 slowapi 库启动一个故意延迟的 Flask 服务:

    from flask import Flask
    import time
    
    app = Flask(__name__)
    
    @app.route("/slow")
    def slow_endpoint():
        time.sleep(random.uniform(2, 5))  # 故意 2~5 秒延迟
        return {"status": "ok"}
    

    然后客户端用 stop_after_delay(3)

    @retry(
        stop=stop_after_delay(3),  # 总耗时超 3 秒就停
        wait=wait_fixed(1),
        retry=retry_if_exception_type((ConnectionError, Timeout))
    )
    def call_slow_api():
        response = requests.get("http://localhost:5000/slow", timeout=2)
        response.raise_for_status()
        return response.json()
    

    这样,当 slowapi 延迟 4 秒时, timeout=2 会先触发 Timeout 异常,重试 1 次后,总耗时约 3 秒(第一次 2 秒超时 + 等待 1 秒), stop_after_delay(3) 生效,不再重试。这就是真实的“超时保护”。

  • 模拟下游服务间歇性不可用 :用 pytest-mock 随机抛异常:

    def test_intermittent_failure(mocker):
        # 模拟 30% 概率失败
        mock_get = mocker.patch("requests.get")
        mock_get.side_effect = [
            ConnectionError("down"), 
            ConnectionError("down"), 
            MockResponse(status_code=200, json=lambda: {"ok": True})
        ]
        
        result = call_with_retry()  # 重试 3 次
        assert result == {"ok": True}
        assert mock_get.call_count == 3
    

    这种测试能暴露 retry_if_exception_type 是否漏判了某些异常类型。

4.3 生产环境阶段:全链路可观测性与熔断联动

生产环境的重试,必须回答三个问题:重试是否有效?重试是否过度?重试是否该降级?

  • 重试有效性监控 :我们用 after 钩子上报 Prometheus 指标:

    from prometheus_client import Counter, Histogram
    
    RETRY_COUNTER = Counter(
        "api_retry_total", 
        "Total number of retries", 
        ["endpoint", "exception_type", "attempt_number"]
    )
    RETRY_LATENCY = Histogram(
        "api_retry_latency_seconds", 
        "Latency of retry attempts", 
        ["endpoint", "result"]  # result: success/failure
    )
    
    def monitor_retry(retry_state):
        endpoint = retry_state.fn.__name__
        exc_type = type(retry_state.outcome.exception()).__name__ if retry_state.outcome.exception() else "none"
        attempt = retry_state.attempt_number
        
        RETRY_COUNTER.labels(
            endpoint=endpoint,
            exception_type=exc_type,
            attempt_number=str(attempt)
        ).inc()
        
        if retry_state.outcome.exception() is None:
            RETRY_LATENCY.labels(endpoint=endpoint, result="success").observe(retry_state.elapsed_time)
        else:
            RETRY_LATENCY.labels(endpoint=endpoint, result="failure").observe(retry_state.elapsed_time)
    
    @retry(after=monitor_retry)
    def payment_api():
        ...
    

    Grafana 里画个面板,就能看到: payment_api attempt_number="3" exception_type="ConnectionError" 是否在飙升——如果是,说明下游网络真有问题;如果 attempt_number="1" exception_type="Timeout" 突增,说明是客户端 timeout 设置太小。

  • 重试过度预警 :我们设了两条红线:

    • 单请求重试次数 > 3 次:发 Slack 告警,SRE 人工介入
    • 5 分钟内重试总数 > 1000 次:自动触发熔断(用 circuitbreaker 库),暂停该 API 调用 5 分钟 实现很简单,在 after 钩子里加计数器:
    from threading import Lock
    retry_counter = {}
    counter_lock = Lock()
    
    def circuit_breaker_hook(retry_state):
        with counter_lock:
            key = f"{retry_state.fn.__name__}_{retry_state.attempt_number}"
            retry_counter[key] = retry_counter.get(key, 0) + 1
            
        if retry_state.attempt_number > 3:
            alert_slack(f"High retry on {retry_state.fn.__name__}: {retry_state.outcome.exception()}")
        
        # 每分钟清零计数器(用定时任务)
    
  • 重试与降级联动 :当重试失败后,不能直接抛异常给用户。我们用 reraise=False + 自定义返回:

    @retry(
        stop=stop_after_attempt(3),
        reraise=False,
        retry=retry_if_exception_type(ConnectionError)
    )
    def get_user_profile(user_id):
        return requests.get(f"/api/users/{user_id}").json()
    
    def safe_get_profile(user_id):
        profile = get_user_profile(user_id)
        if profile is None:
            # 重试失败,降级为缓存数据或默认头像
            return get_cached_profile(user_id) or {"name": "User", "avatar": "/default.png"}
        return profile
    

    这种“重试保底,降级兜底”的模式,在电商大促时扛住了 99.99% 的流量峰值。

5. 常见问题与排查技巧实录:那些文档里不会写的坑

5.1 “重试没生效”问题排查清单

这是最高频的问题。按优先级列出排查步骤:

检查项 如何验证 典型原因 解决方案
装饰器是否被正确应用 在函数内 print("inside function") ,看是否在重试时重复打印 @retry 写在了 @lru_cache 下方,导致缓存命中后根本不走重试逻辑 @retry 放在所有其他装饰器最上方
异常是否被上游捕获 在重试函数外层加 try/except ,看异常是否被吞掉 requests.get() 被包在 try/except requests.RequestException 里,异常没抛到 @retry 移除上游的 try/except ,让异常透传给 Tenacity
stop 条件是否过早满足 打印 retry_state.stop 的返回值 stop_after_delay(1) 但函数本身执行就花了 1.2 秒,第一次就停 stop_after_delay(5) 并观察 retry_state.elapsed_time
retry 条件是否匹配异常 retry_if_exception 函数里 print(type(e)) retry_if_exception_type(ConnectionError) 但实际抛的是 requests.exceptions.ConnectionError (子类) 改用 retry_if_exception_type(requests.exceptions.ConnectionError) retry_if_exception_type(Exception)

实操心得:用 tenacity.Retrying 实例替代装饰器,能直接 print(retrying_object.retry_state) 看实时状态。我们有个 debug 工具函数:

def debug_retry(fn, *args, **kwargs):
    retrying = Retrying(
        stop=stop_after_attempt(3),
        wait=wait_fixed(0.1),
        reraise=True
    )
    try:
        return retrying(fn, *args, **kwargs)
    except Exception as e:
        print(f"Final state: {retrying.retry_state}")
        raise

这比看日志快十倍。

5.2 “重试次数远超预期”问题根因分析

现象:日志显示 Retrying foo in 1 seconds as it raised ... 打印了 10 次,但 stop_after_attempt(3) 明明只该重试 2 次(共 3 次调用)。

根本原因: stop_after_attempt(n) 表示“最多尝试 n 次”,即函数被调用 n 次。第一次是原始调用,后 n-1 次是重试。所以 stop_after_attempt(3) = 原始调用 + 2 次重试 = 共 3 次执行。

但为什么日志显示 10 次?因为 before_sleep 钩子在每次重试 打印,而 after 钩子在每次执行 打印。如果 before_sleep 打印了 10 次,说明 stop 条件根本没生效。

常见根因:

  • stop 条件用了 & (与)但逻辑错误 stop=stop_after_attempt(3) & stop_if_exception_type(Timeout) 表示“既要试够 3 次,又要遇到 Timeout 异常”,但实际异常是 ConnectionError ,所以 stop 永远为 False,无限重试。正确写法是 | (或): stop=stop_after_attempt(3) | stop_if_exception_type(Timeout)

  • wait 时间为 0 导致 CPU 疯狂轮询 wait_fixed(0) 会让重试瞬间发生,看起来像“疯狂重试”。Tenacity 默认 wait=wait_none() ,但很多教程误写成 wait=0 ,这是 bug。

  • 异步函数误用同步重试 @retry 用在 async def 函数上,Tenacity 会把它当同步函数处理, await 被忽略,导致协程对象被直接返回, retry_if_exception 判断失败。必须用 @retrying_async tenacity.AsyncRetrying

5.3 “重试导致数据重复”问题终极解法

这是金融、支付、订单类系统的生死线。重试本身不产生重复,但重试的 副作用 会。

典型场景: charge_card() 函数里调用 Stripe API 创建支付,网络超时,重试时又创建了一笔新支付。

解决方案不是禁用重试,而是 幂等性设计 + 重试策略协同

  • Stripe 等现代 API 都支持 idempotency_key :用 UUID 作为请求唯一标识,重试时传同一个 key,服务端自动去重。

    import uuid
    
    @retry(
        stop=stop_after_attempt(3),
        wait=wait_exponential(multiplier=1, min=1, max=10)
    )
    def charge_card(amount, currency):
        idempotency_key = str(uuid.uuid4())  # 每次重试用新 key?错!
        # 正确:用 request_id 或 user_id + timestamp 生成确定性 key
        idempotency_key = f"{user_id}_{int(time.time())}_{amount}"
        
        return stripe.Charge.create(
            amount=amount,
            currency=currency,
            idempotency_key=idempotency_key
        )
    
  • 数据库操作必须带唯一约束 INSERT INTO orders (order_id, ...) VALUES (?, ?) order_id 必须是业务唯一键(如 user_id + timestamp + nonce ),数据库 UNIQUE INDEX 保证重复插入失败,然后 retry_if_exception_type(IntegrityError) 重试——但这次重试要查库确认是否已存在。

  • 重试时主动查状态 :对“创建资源”类操作,重试前先 GET /orders/{id} 确认是否存在,存在则直接返回,不存在再 POST 。Tenacity 的 retry_if_result 完美适配:

    def check_order_exists(order_id):
        try:
            resp = requests.get(f"/api/orders/{order_id}")
            return resp.status_code == 200
        except:
            return False
    
    @retry(
        retry=retry_if_result(lambda x: not x)  # 如果 check 返回 False,就重试
    )
    def create_order_with_check(order_id):
        if check_order_exists(order_id):
            return get_order(order_id)
        return create_new_order(order_id)
    

最后分享一个血泪教训:某次大促,我们发现重试导致库存扣减了两次。根因是 decrease_stock(item_id, quantity) 接口没做幂等,且重试策略里 retry_if_exception_type 漏了 StockNotEnoughError 。解决方案是三重保险:

  1. 接口层面: decrease_stock version 字段,每次扣减 version += 1 ,并发时 CAS 失败抛 VersionConflictError
  2. 重试层面: retry_if_exception_type((ConnectionError, Timeout, VersionConflictError))
  3. 监控层面: after 钩子里统计 VersionConflictError 次数,超阈值自动告警并降级为“查库存+扣减”两步操作 这套组合拳上线后,库存相关客诉下降 99.2%。

6. 进阶实战:Tenacity 与异步、协程、事件循环的深度整合

6.1 AsyncIO 场景下的重试:为什么 @retry 会失效

@retry 装饰器本质是同步的。当你把它用在 async def 函数上:

@retry(stop=stop_after_attempt(3))
async def fetch_data():
    await asyncio.sleep(1)
    raise ConnectionError

会发生什么?Tenacity 会把 fetch_data() 当作一个普通函数调用,返回一个 coroutine 对象,然后 retry_if_exception_type 去检查这个 coroutine 对象——它当然不是 ConnectionError ,所以重试永远不会触发。

正确做法是使用 tenacity.AsyncRetrying

import asyncio
from tenacity import AsyncRetrying, stop_after_attempt, wait_exponential

async def fetch_data():
    await asyncio.sleep(1)
    raise ConnectionError("network down")

# 方式一:用 AsyncRetrying 实例
retrying = AsyncRetrying(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10)
)

async def safe_fetch():
    async for attempt in retrying:
        with attempt:
            return await fetch_data()

# 方式二:用装饰器(需 tenacity>=8.0.0)
from tenacity import retrying_async

@retrying_async(
    stop=stop_after_attempt(3),
    wait=wait_exponential(multiplier=1, min=1, max=10)
)
async def fetch_data_decorated():
    await asyncio.sleep(1)
    raise ConnectionError("network down")

关键区别: AsyncRetrying __aiter__ 方法会 await 被装饰的协程,而 retry_if_exception 判断的是 await 后的异常,不是协程对象本身。

6.2 在 FastAPI/Starlette 中的重试注入:避免全局污染

FastAPI 的依赖注入系统,让重试可以细粒度控制。我们不推荐在路由函数上直接加 @retry ,而是注入一个“重试就绪”的 HTTP 客户端:

from httpx import AsyncClient
from tenacity import AsyncRetrying, stop_after_attempt, wait_exponential

# 创建可重试的 HTTP 客户端
class RetryableClient:
    def __init__(

更多推荐