Python 后端基础(十):消息队列和 Celery 怎么用,异步任务、削峰填谷和失败重试讲清楚
后端项目里不是所有事情都应该在接口请求里立刻做完。
比如用户上传文件后要解析、发送邮件、生成报告、调用大模型、批量处理图片。如果这些任务都让用户等着,接口会很慢,甚至超时。
消息队列和 Celery 的价值就在这里:把耗时任务从主请求里拆出去,后台慢慢处理。
【一、什么是消息队列】
消息队列可以理解成一个任务中转站。
接口服务 -> 把任务放进队列 -> 后台 Worker 取任务执行
用户请求接口时,后端只需要快速创建任务并返回任务 id;真正耗时的工作交给后台进程处理。
常见消息队列或任务系统:
- Redis
- RabbitMQ
- Kafka
- Celery
- RQ
Python 后端里,Celery + Redis / RabbitMQ 很常见。
【二、为什么要异步任务】
适合异步处理的场景:
- 发送邮件、短信。
- 文件上传后的解析。
- 图片压缩、视频转码。
- Excel 导入导出。
- 爬虫任务。
- AI 大模型调用。
- RAG 文档切片和向量化。
- 定时任务。
- 报表生成。
这些任务有共同点:耗时、不适合让用户一直等、失败后可以重试。
【三、同步接口的问题】
假设用户上传一个 PDF,后端要解析、切片、向量化、入库。如果同步执行:
用户上传 PDF
-> 接口等待解析完成
-> 等待模型 embedding
-> 等待写入向量库
-> 最后返回
如果这个过程 30 秒,用户体验很差,还可能网关超时。
更合理的做法:
用户上传 PDF
-> 后端创建任务
-> 返回 task_id
-> Worker 后台处理
-> 前端轮询任务状态
【四、Celery 是什么】
Celery 是 Python 里常用的分布式任务队列。
它通常由几部分组成:
- Producer:生产任务的一方,一般是 FastAPI/Django 接口。
- Broker:消息中间件,比如 Redis 或 RabbitMQ。
- Worker:执行任务的后台进程。
- Result Backend:保存任务结果的地方,比如 Redis。
流程:
API 服务提交任务
-> Redis/RabbitMQ 保存任务消息
-> Celery Worker 获取任务
-> 执行业务逻辑
-> 保存任务状态和结果
【五、Celery 简单示例】
安装:
pip install celery redis
任务文件:
from celery import Celery
app = Celery(
"tasks",
broker="redis://localhost:6379/0",
backend="redis://localhost:6379/1",
)
@app.task
def add(x, y):
return x + y
提交任务:
result = add.delay(3, 5)
print(result.id)
启动 worker:
celery -A tasks worker --loglevel=info
【六、任务状态怎么设计】
项目里不要只依赖 Celery 自带状态,最好在数据库里有自己的任务表。
例如:
tasks
- id
- user_id
- task_type
- status: pending/running/success/failed
- input
- output
- error_message
- created_at
- updated_at
这样前端可以查询:
GET /tasks/{task_id}
返回:
{
"id": "task_001",
"status": "running",
"progress": 60
}
【七、失败重试为什么重要】
异步任务经常会失败:
- 网络超时。
- 第三方 API 临时不可用。
- 大模型接口限流。
- 文件格式异常。
- 数据库连接失败。
Celery 支持重试:
@app.task(bind=True, max_retries=3)
def call_llm(self, prompt):
try:
return request_llm(prompt)
except Exception as exc:
raise self.retry(exc=exc, countdown=10)
重试不能无限制,要有最大次数和间隔。
【八、削峰填谷是什么意思】
削峰填谷是消息队列的典型作用。
比如突然有 10000 个用户上传文件,如果接口直接处理,服务可能扛不住。
有队列后:
请求高峰 -> 任务先进队列
Worker 按能力慢慢消费
用户会看到“任务处理中”,系统不会被瞬间打爆。
【九、常见坑】
- 把所有业务都扔进队列,导致链路复杂。
- 任务没有幂等性,重试后重复扣款或重复写数据。
- 没有任务状态表,用户不知道处理到哪了。
- Worker 挂了没人发现。
- 失败任务没有记录错误原因。
- AI 调用任务不做超时和重试,成本和稳定性失控。
更多推荐
所有评论(0)