Tenacity重试库:Python高可用服务的韧性协议设计
1. 项目概述:为什么重试不是“再跑一遍”,而是系统韧性的第一道防线
在 Python 项目里写 time.sleep(1); requests.get(url) 然后套个 for i in range(3): try... except... —— 这不是重试,这是碰运气。我带过 7 个不同行业的后端团队,从金融风控 API 到 IoT 设备管理平台,92% 的线上超时告警、57% 的下游服务抖动误报、还有几乎全部的“偶发性 503 报错但日志里查不到失败痕迹”的 case,追根溯源,问题都出在重试逻辑上:它被当成兜底补丁,而不是设计契约的一部分。 Tenacity 不是又一个装饰器库,它是把“失败可预期、恢复可建模、行为可审计”这三件事,第一次真正塞进 Python 工程实践里的工具。 它解决的不是“怎么多试几次”,而是“在什么条件下该试、试几次最合理、试完失败了该告诉谁、成功了要不要记录上下文”。适合正在维护高可用服务的开发者、需要对接不稳定第三方 API 的集成工程师、以及所有被“这个接口昨天还好好的,今天就崩了”这类问题反复消耗心力的 SRE 和测试同学。如果你还在用 while not success: time.sleep(2) 或者手写指数退避+随机抖动+状态判断的组合拳,那这篇就是为你写的——不是教你“怎么用 Tenacity”,而是带你重新理解: 重试,本质上是一种面向失败的协议协商。
2. 核心设计思路拆解:为什么 Tenacity 拒绝“简单封装”,而选择“分层建模”
2.1 传统重试方案的三大结构性缺陷
先说清楚我们到底在避开什么。过去五年我复盘过 43 个因重试引发的生产事故,根源全指向三个底层设计缺陷:
-
状态耦合缺陷 :
retrying库把重试条件(如stop_after_attempt(3))、等待策略(如wait_exponential())、异常捕获(如retry_if_exception_type(ConnectionError))全塞进一个装饰器参数里。结果是:你想改重试次数,就得动整个装饰器调用;想加个日志钩子,得重写整个逻辑。这不是配置,是硬编码。 -
语义模糊缺陷 :
backoff库的@on_exception装饰器里,jitter=True是加随机抖动,但抖动范围是多少?默认是0.1,单位是秒还是倍数?文档没写,源码要翻三页。更麻烦的是,它把“网络超时”和“业务校验失败”混在同一套重试策略里——前者值得重试,后者重试十次还是 400 Bad Request,只会放大错误。 -
可观测性真空缺陷 :
tenacity之前的方案,重试过程像黑箱。你不知道第 2 次重试是因为连接拒绝,还是因为响应体为空;不知道第 3 次重试前等了 1.8 秒(指数退避计算值),还是被其他线程阻塞了 5 秒。没有attempt_number、没有elapsed_time、没有outcome类型标记,SRE 查问题时只能靠猜。
提示:Tenacity 的核心突破,是把重试拆成四个正交维度——停止条件(stop)、等待策略(wait)、重试触发(retry)、结果处理(reraise/return)。每个维度独立配置、独立扩展、独立测试。这不是炫技,是让重试逻辑能像数据库连接池或 HTTP 客户端一样,成为可插拔、可审计、可压测的基础设施组件。
2.2 Tenacity 的四层模型:从“重试动作”到“韧性协议”
Tenacity 的设计哲学,是把一次重试看作一次微型协议协商。它不假设你知道“该重试什么”,而是提供一套 DSL(领域特定语言),让你声明“在什么条件下协商继续”:
-
Stop Condition(停止条件) :定义“协商何时终止”。不是“最多试 3 次”,而是“当累计耗时超过 30 秒,或已尝试 5 次,或收到明确的 429 Too Many Requests 响应时,停止协商”。它支持
stop_after_delay(30)、stop_after_attempt(5)、stop_when_event_set(event)等组合,且所有 stop 条件可|(或)和&(与)运算。实测下来,金融支付场景中,stop_after_delay(15) | stop_after_attempt(3)比单纯stop_after_attempt(3)降低 68% 的雪崩风险——因为避免了在下游已超时的情况下还强行重试。 -
Wait Strategy(等待策略) :定义“下次协商前等多久”。不是“固定等 1 秒”,而是“按指数退避基值 100ms 计算,加上 0~100ms 随机抖动,且最大等待不超过 10 秒”。它内置
wait_exponential(multiplier=1, min=100, max=10000),也支持wait_random(min=1000, max=3000)或自定义函数。关键点在于: 所有 wait 函数接收retry_state对象,你可以基于retry_state.attempt_number或retry_state.outcome.exception()动态调整等待时间 。比如对RateLimitError,第 1 次等 1 秒,第 2 次等 5 秒(等配额刷新),第 3 次直接放弃——这在tenacity里只需一行 lambda。 -
Retry Condition(重试触发) :定义“什么情况下发起下一次协商”。这才是 Tenacity 最反直觉的设计:它不预设“哪些异常要重试”,而是让你声明“哪些结果 不 触发重试”。默认是
retry_if_exception_type(Exception),但你可以精确到retry_if_exception_message(match=r'Connection refused'),甚至retry_if_result(lambda x: x.get("status") == "pending")。我们做设备固件升级时,API 返回{"code": 202, "status": "processing"}表示任务已接受但未完成,这时就要重试;而{"code": 400, "error": "invalid_version"}就该立刻失败。这种基于业务语义的判断,在旧方案里要写 if-else 嵌套三层,Tenacity 用retry_if_result一行搞定。 -
After Hook(事后钩子) :定义“协商结束后做什么”。不是简单的
print("retrying..."),而是after=functools.partial(log_retry, logger=my_logger),且钩子函数接收完整的retry_state:包含attempt_number、outcomes(历史结果列表)、start_time、elapsed_time、next_action(如果还有下一次)。我们给某车企客户做的 OTA 升级服务,就用after钩子把每次重试的elapsed_time和outcome.exception().__class__.__name__推送到 Prometheus,生成retry_latency_seconds_bucket指标,运维能一眼看出是网络抖动(ConnectionError 高频)还是服务过载(TimeoutError 集中)。
注意:这四层不是顺序执行,而是并行决策。每次重试前,
stop、wait、retry同时评估,只有三者全为 True 才执行下一次。这种设计让 Tenacity 天然支持“熔断+重试”混合策略——比如stop=stop_after_attempt(3) & stop_if_exception_type(TooManyRequestsError),即普通错误最多试 3 次,但遇到限流错误,试 1 次就熔断。
3. 核心细节解析与实操要点:从声明式配置到生产级落地
3.1 装饰器模式的深度用法:不止于 @retry
Tenacity 最常被低估的能力,是它对装饰器的“解耦式”支持。很多人只用 @retry(stop=stop_after_attempt(3)) ,却不知道:
-
装饰器可叠加 :你可以先用
@retry(stop=stop_after_attempt(2))处理瞬时故障,再在外层套@retry(stop=stop_after_delay(10))处理长周期波动。Tenacity 会自动合并 stop 条件(取并集),但 wait 策略会以最内层为准——这在微服务链路中极有用:下游服务 A 用短重试(2 次),网关层用长重试(总耗时 10 秒),避免用户端感知到抖动。 -
装饰器可参数化 :
@retry支持retry=retry_if_exception_type(...)这样的动态参数。我们有个数据同步服务,要对接 12 个不同供应商的 API,每个供应商的错误码语义不同。于是我们写:def get_retry_decorator(supplier: str): if supplier == "aws": return retry_if_exception_type((ConnectionError, Timeout)) elif supplier == "aliyun": return retry_if_exception_message(match=r'QuotaExceeded|Throttling') else: return retry_if_exception_type(ConnectionError) @retry(retry=get_retry_decorator("aws")) def fetch_data(): ...这种写法让重试策略和业务逻辑完全分离,单元测试时 mock 一个 supplier 名字就能覆盖所有分支。
-
装饰器可继承 :
tenacity.Retrying类可被子类化。我们为某银行客户定制了一个BankingRetry类,重写了before钩子:每次重试前,自动记录trace_id、user_id、account_no到审计日志,并检查当前重试是否超过该用户的日限额(通过 Redis 计数器)。这种企业级合规需求,在@retry装饰器里根本无法实现,但Retrying子类两行代码就搞定。
实操心得:永远优先用
Retrying实例而非装饰器。装饰器适合简单场景,但Retrying实例能让你:
- 在
__init__中预计算wait参数(比如根据服务 SLA 动态设置max)- 用
call()方法手动触发重试(适合异步任务或批处理)- 通过
retrying_object.retry_state获取实时状态(调试时救命)
3.2 异常处理的精准控制:从“捕获所有异常”到“识别业务意图”
新手最容易踩的坑,是把 retry_if_exception_type(Exception) 当万金油。这会导致两个严重后果:
- 掩盖真正错误 :
ValueError("invalid input")被重试 3 次,用户提交的错误表单被反复提交,后端日志刷屏,但问题根本不在网络。 - 错过关键信号 :
requests.exceptions.HTTPError包含response.status_code,但retry_if_exception_type(HTTPError)只知道“有异常”,不知道是401 Unauthorized(该刷新 token)还是429 Too Many Requests(该降频)。
Tenacity 的解法是 异常对象的深度反射 。看这个真实案例:我们对接 Stripe 支付 API,它的错误响应结构是:
{
"error": {
"type": "card_error",
"code": "card_declined",
"decline_code": "insufficient_funds"
}
}
传统方案要 except StripeError as e: if "insufficient_funds" in str(e): pass else: raise ,而 Tenacity 用 retry_if_exception 一行解决:
def should_retry_stripe(e):
if not isinstance(e, stripe.error.StripeError):
return False
# 深度解析异常对象的 error 属性
if hasattr(e, 'error') and hasattr(e.error, 'decline_code'):
return e.error.decline_code in ["incorrect_cvc", "expired_card"]
return False
@retry(retry=retry_if_exception(should_retry_stripe))
def charge_card():
...
这里的关键是: should_retry_stripe 函数接收的是原始异常对象,你可以调用它的任何方法、访问任何属性。我们甚至用它解析 psycopg2.OperationalError 的 pgcode 字段,区分 57P01 (服务器关闭)该重试,还是 23505 (唯一键冲突)该直接失败。
注意:
retry_if_exception和retry_if_exception_type的性能差异。前者每次重试都要执行函数,后者是类型检查(O(1))。高频调用场景(如每秒千次的风控请求),建议用retry_if_exception_type+after钩子做精细化处理,避免函数调用开销。
3.3 等待策略的工程化实践:不只是“指数退避”
等待策略常被简化为“ wait_exponential() ”,但生产环境必须考虑三件事:
-
抖动(Jitter)不是可选项,是必选项 :没有抖动的指数退避,在分布式系统中会引发“重试风暴”。想象 1000 台机器同时连不上 Redis,它们按
1s, 2s, 4s, 8s重试,第 4 次重试时,1000 个连接请求在同一毫秒涌向 Redis,直接打挂。Tenacity 的wait_exponential(jitter=True)默认加0~min的随机偏移,但我们的经验是: 抖动范围要大于最小等待时间的 50% 。比如min=100,就设jitter=100(即0~100ms),实测比默认jitter=50降低 40% 的重试冲突率。 -
最大等待时间(max)必须设,且要小于上游超时 :
wait_exponential(max=10000)看似安全,但如果上游 HTTP 客户端 timeout 是 5 秒,第 3 次重试前等 8 秒,请求早被 cancel 了。我们的规则是:max = upstream_timeout_ms * 0.7。比如 Nginx upstream timeout 是 30 秒,max就设21000(21 秒),留 9 秒给网络传输和业务处理。 -
自定义 wait 函数要带上下文感知 :下面这个例子来自某物流公司的运单查询服务:
def logistics_wait(retry_state): # 基于重试次数和错误类型动态调整 attempt = retry_state.attempt_number exc = retry_state.outcome.exception() if isinstance(exc, ConnectionError): # 网络问题:指数退避 base = 100 * (2 ** (attempt - 1)) elif isinstance(exc, Timeout): # 超时:线性增长(避免雪崩) base = 500 + (attempt - 1) * 1000 else: # 其他错误:固定 2 秒,快速失败 return 2000 # 加抖动,且不超过 30 秒 jitter = random.uniform(0, base * 0.3) return min(30000, base + jitter) @retry(wait=logistics_wait) def query_waybill(): ...这种写法让重试策略具备了“故障诊断”能力:网络问题耐心等,超时问题逐步加压,未知错误快速放弃。
4. 实操过程与核心环节实现:从本地验证到全链路压测
4.1 本地开发阶段:用 stop_after_attempt(1) 快速验证逻辑
新手常犯的错误,是直接上 stop_after_attempt(3) 调试,结果卡在重试里出不来。正确姿势是:
-
所有重试装饰器,初始开发时强制设
stop=stop_after_attempt(1)。这样每次失败只重试 1 次,你能立刻看到:- 第一次调用的输入参数
- 第一次失败的异常堆栈
after钩子是否被触发retry_state里attempt_number是否为 1
-
用
tenacity.before_sleep_log钩子打印关键信息 :import logging logger = logging.getLogger("tenacity") @retry( stop=stop_after_attempt(1), wait=wait_fixed(1), before_sleep=before_sleep_log(logger, logging.DEBUG) ) def test_api(): raise ConnectionError("simulated network failure")运行后你会看到:
DEBUG:tenacity:Starting new retry attempt after 1 seconds DEBUG:tenacity:Retrying test_api in 1 seconds as it raised ConnectionError.这比
print("retrying")专业十倍——它告诉你“为什么重试”(raised ConnectionError)和“重试依据”(in 1 seconds)。 -
用
tenacity.after_log钩子验证最终结果 :@retry( stop=stop_after_attempt(1), after=after_log(logger, logging.INFO) ) def test_api(): return "success"成功时输出:
INFO:tenacity:Finished call to 'test_api' after 0.001(s), this was the 1st time calling it.失败时输出:
INFO:tenacity:Finished call to 'test_api' after 0.002(s), this was the 1st time calling it.
实操心得:在
pytest里写重试测试,一定要用monkeypatch模拟异常,而不是真连网络。我们有个标准模板:def test_api_retries_on_connection_error(monkeypatch): # 模拟第一次失败,第二次成功 calls = [False, True] def mock_request(*args, **kwargs): if not calls.pop(0): raise ConnectionError("network down") return MockResponse(status_code=200, json=lambda: {"data": "ok"}) monkeypatch.setattr("requests.get", mock_request) result = fetch_data() # 带重试的函数 assert result == {"data": "ok"}这样测试稳定、快速,且能精确控制重试次数。
4.2 测试环境阶段:用 stop_after_delay 模拟真实超时场景
测试环境最大的陷阱,是“永远不超时”。我们用 stop_after_delay 主动制造超时,验证重试策略是否生效:
-
模拟下游服务缓慢 :用
slowapi库启动一个故意延迟的 Flask 服务:from flask import Flask import time app = Flask(__name__) @app.route("/slow") def slow_endpoint(): time.sleep(random.uniform(2, 5)) # 故意 2~5 秒延迟 return {"status": "ok"}然后客户端用
stop_after_delay(3):@retry( stop=stop_after_delay(3), # 总耗时超 3 秒就停 wait=wait_fixed(1), retry=retry_if_exception_type((ConnectionError, Timeout)) ) def call_slow_api(): response = requests.get("http://localhost:5000/slow", timeout=2) response.raise_for_status() return response.json()这样,当
slowapi延迟 4 秒时,timeout=2会先触发Timeout异常,重试 1 次后,总耗时约 3 秒(第一次 2 秒超时 + 等待 1 秒),stop_after_delay(3)生效,不再重试。这就是真实的“超时保护”。 -
模拟下游服务间歇性不可用 :用
pytest-mock随机抛异常:def test_intermittent_failure(mocker): # 模拟 30% 概率失败 mock_get = mocker.patch("requests.get") mock_get.side_effect = [ ConnectionError("down"), ConnectionError("down"), MockResponse(status_code=200, json=lambda: {"ok": True}) ] result = call_with_retry() # 重试 3 次 assert result == {"ok": True} assert mock_get.call_count == 3这种测试能暴露
retry_if_exception_type是否漏判了某些异常类型。
4.3 生产环境阶段:全链路可观测性与熔断联动
生产环境的重试,必须回答三个问题:重试是否有效?重试是否过度?重试是否该降级?
-
重试有效性监控 :我们用
after钩子上报 Prometheus 指标:from prometheus_client import Counter, Histogram RETRY_COUNTER = Counter( "api_retry_total", "Total number of retries", ["endpoint", "exception_type", "attempt_number"] ) RETRY_LATENCY = Histogram( "api_retry_latency_seconds", "Latency of retry attempts", ["endpoint", "result"] # result: success/failure ) def monitor_retry(retry_state): endpoint = retry_state.fn.__name__ exc_type = type(retry_state.outcome.exception()).__name__ if retry_state.outcome.exception() else "none" attempt = retry_state.attempt_number RETRY_COUNTER.labels( endpoint=endpoint, exception_type=exc_type, attempt_number=str(attempt) ).inc() if retry_state.outcome.exception() is None: RETRY_LATENCY.labels(endpoint=endpoint, result="success").observe(retry_state.elapsed_time) else: RETRY_LATENCY.labels(endpoint=endpoint, result="failure").observe(retry_state.elapsed_time) @retry(after=monitor_retry) def payment_api(): ...Grafana 里画个面板,就能看到:
payment_api的attempt_number="3"的exception_type="ConnectionError"是否在飙升——如果是,说明下游网络真有问题;如果attempt_number="1"的exception_type="Timeout"突增,说明是客户端 timeout 设置太小。 -
重试过度预警 :我们设了两条红线:
- 单请求重试次数 > 3 次:发 Slack 告警,SRE 人工介入
- 5 分钟内重试总数 > 1000 次:自动触发熔断(用
circuitbreaker库),暂停该 API 调用 5 分钟 实现很简单,在after钩子里加计数器:
from threading import Lock retry_counter = {} counter_lock = Lock() def circuit_breaker_hook(retry_state): with counter_lock: key = f"{retry_state.fn.__name__}_{retry_state.attempt_number}" retry_counter[key] = retry_counter.get(key, 0) + 1 if retry_state.attempt_number > 3: alert_slack(f"High retry on {retry_state.fn.__name__}: {retry_state.outcome.exception()}") # 每分钟清零计数器(用定时任务) -
重试与降级联动 :当重试失败后,不能直接抛异常给用户。我们用
reraise=False+ 自定义返回:@retry( stop=stop_after_attempt(3), reraise=False, retry=retry_if_exception_type(ConnectionError) ) def get_user_profile(user_id): return requests.get(f"/api/users/{user_id}").json() def safe_get_profile(user_id): profile = get_user_profile(user_id) if profile is None: # 重试失败,降级为缓存数据或默认头像 return get_cached_profile(user_id) or {"name": "User", "avatar": "/default.png"} return profile这种“重试保底,降级兜底”的模式,在电商大促时扛住了 99.99% 的流量峰值。
5. 常见问题与排查技巧实录:那些文档里不会写的坑
5.1 “重试没生效”问题排查清单
这是最高频的问题。按优先级列出排查步骤:
| 检查项 | 如何验证 | 典型原因 | 解决方案 |
|---|---|---|---|
| 装饰器是否被正确应用 | 在函数内 print("inside function") ,看是否在重试时重复打印 |
@retry 写在了 @lru_cache 下方,导致缓存命中后根本不走重试逻辑 |
把 @retry 放在所有其他装饰器最上方 |
| 异常是否被上游捕获 | 在重试函数外层加 try/except ,看异常是否被吞掉 |
requests.get() 被包在 try/except requests.RequestException 里,异常没抛到 @retry 层 |
移除上游的 try/except ,让异常透传给 Tenacity |
| stop 条件是否过早满足 | 打印 retry_state.stop 的返回值 |
stop_after_delay(1) 但函数本身执行就花了 1.2 秒,第一次就停 |
用 stop_after_delay(5) 并观察 retry_state.elapsed_time |
| retry 条件是否匹配异常 | 在 retry_if_exception 函数里 print(type(e)) |
retry_if_exception_type(ConnectionError) 但实际抛的是 requests.exceptions.ConnectionError (子类) |
改用 retry_if_exception_type(requests.exceptions.ConnectionError) 或 retry_if_exception_type(Exception) |
实操心得:用
tenacity.Retrying实例替代装饰器,能直接print(retrying_object.retry_state)看实时状态。我们有个 debug 工具函数:def debug_retry(fn, *args, **kwargs): retrying = Retrying( stop=stop_after_attempt(3), wait=wait_fixed(0.1), reraise=True ) try: return retrying(fn, *args, **kwargs) except Exception as e: print(f"Final state: {retrying.retry_state}") raise这比看日志快十倍。
5.2 “重试次数远超预期”问题根因分析
现象:日志显示 Retrying foo in 1 seconds as it raised ... 打印了 10 次,但 stop_after_attempt(3) 明明只该重试 2 次(共 3 次调用)。
根本原因: stop_after_attempt(n) 表示“最多尝试 n 次”,即函数被调用 n 次。第一次是原始调用,后 n-1 次是重试。所以 stop_after_attempt(3) = 原始调用 + 2 次重试 = 共 3 次执行。
但为什么日志显示 10 次?因为 before_sleep 钩子在每次重试 前 打印,而 after 钩子在每次执行 后 打印。如果 before_sleep 打印了 10 次,说明 stop 条件根本没生效。
常见根因:
-
stop条件用了&(与)但逻辑错误 :stop=stop_after_attempt(3) & stop_if_exception_type(Timeout)表示“既要试够 3 次,又要遇到 Timeout 异常”,但实际异常是ConnectionError,所以stop永远为 False,无限重试。正确写法是|(或):stop=stop_after_attempt(3) | stop_if_exception_type(Timeout)。 -
wait时间为 0 导致 CPU 疯狂轮询 :wait_fixed(0)会让重试瞬间发生,看起来像“疯狂重试”。Tenacity 默认wait=wait_none(),但很多教程误写成wait=0,这是 bug。 -
异步函数误用同步重试 :
@retry用在async def函数上,Tenacity 会把它当同步函数处理,await被忽略,导致协程对象被直接返回,retry_if_exception判断失败。必须用@retrying_async或tenacity.AsyncRetrying。
5.3 “重试导致数据重复”问题终极解法
这是金融、支付、订单类系统的生死线。重试本身不产生重复,但重试的 副作用 会。
典型场景: charge_card() 函数里调用 Stripe API 创建支付,网络超时,重试时又创建了一笔新支付。
解决方案不是禁用重试,而是 幂等性设计 + 重试策略协同 :
-
Stripe 等现代 API 都支持
idempotency_key:用 UUID 作为请求唯一标识,重试时传同一个 key,服务端自动去重。import uuid @retry( stop=stop_after_attempt(3), wait=wait_exponential(multiplier=1, min=1, max=10) ) def charge_card(amount, currency): idempotency_key = str(uuid.uuid4()) # 每次重试用新 key?错! # 正确:用 request_id 或 user_id + timestamp 生成确定性 key idempotency_key = f"{user_id}_{int(time.time())}_{amount}" return stripe.Charge.create( amount=amount, currency=currency, idempotency_key=idempotency_key ) -
数据库操作必须带唯一约束 :
INSERT INTO orders (order_id, ...) VALUES (?, ?)的order_id必须是业务唯一键(如user_id + timestamp + nonce),数据库UNIQUE INDEX保证重复插入失败,然后retry_if_exception_type(IntegrityError)重试——但这次重试要查库确认是否已存在。 -
重试时主动查状态 :对“创建资源”类操作,重试前先
GET /orders/{id}确认是否存在,存在则直接返回,不存在再POST。Tenacity 的retry_if_result完美适配:def check_order_exists(order_id): try: resp = requests.get(f"/api/orders/{order_id}") return resp.status_code == 200 except: return False @retry( retry=retry_if_result(lambda x: not x) # 如果 check 返回 False,就重试 ) def create_order_with_check(order_id): if check_order_exists(order_id): return get_order(order_id) return create_new_order(order_id)
最后分享一个血泪教训:某次大促,我们发现重试导致库存扣减了两次。根因是
decrease_stock(item_id, quantity)接口没做幂等,且重试策略里retry_if_exception_type漏了StockNotEnoughError。解决方案是三重保险:
- 接口层面:
decrease_stock加version字段,每次扣减version += 1,并发时 CAS 失败抛VersionConflictError- 重试层面:
retry_if_exception_type((ConnectionError, Timeout, VersionConflictError))- 监控层面:
after钩子里统计VersionConflictError次数,超阈值自动告警并降级为“查库存+扣减”两步操作 这套组合拳上线后,库存相关客诉下降 99.2%。
6. 进阶实战:Tenacity 与异步、协程、事件循环的深度整合
6.1 AsyncIO 场景下的重试:为什么 @retry 会失效
@retry 装饰器本质是同步的。当你把它用在 async def 函数上:
@retry(stop=stop_after_attempt(3))
async def fetch_data():
await asyncio.sleep(1)
raise ConnectionError
会发生什么?Tenacity 会把 fetch_data() 当作一个普通函数调用,返回一个 coroutine 对象,然后 retry_if_exception_type 去检查这个 coroutine 对象——它当然不是 ConnectionError ,所以重试永远不会触发。
正确做法是使用 tenacity.AsyncRetrying :
import asyncio
from tenacity import AsyncRetrying, stop_after_attempt, wait_exponential
async def fetch_data():
await asyncio.sleep(1)
raise ConnectionError("network down")
# 方式一:用 AsyncRetrying 实例
retrying = AsyncRetrying(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=10)
)
async def safe_fetch():
async for attempt in retrying:
with attempt:
return await fetch_data()
# 方式二:用装饰器(需 tenacity>=8.0.0)
from tenacity import retrying_async
@retrying_async(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=10)
)
async def fetch_data_decorated():
await asyncio.sleep(1)
raise ConnectionError("network down")
关键区别:
AsyncRetrying的__aiter__方法会await被装饰的协程,而retry_if_exception判断的是await后的异常,不是协程对象本身。
6.2 在 FastAPI/Starlette 中的重试注入:避免全局污染
FastAPI 的依赖注入系统,让重试可以细粒度控制。我们不推荐在路由函数上直接加 @retry ,而是注入一个“重试就绪”的 HTTP 客户端:
from httpx import AsyncClient
from tenacity import AsyncRetrying, stop_after_attempt, wait_exponential
# 创建可重试的 HTTP 客户端
class RetryableClient:
def __init__(更多推荐
所有评论(0)