后端项目里不是所有事情都应该在接口请求里立刻做完。

比如用户上传文件后要解析、发送邮件、生成报告、调用大模型、批量处理图片。如果这些任务都让用户等着,接口会很慢,甚至超时。

消息队列和 Celery 的价值就在这里:把耗时任务从主请求里拆出去,后台慢慢处理。

【一、什么是消息队列】

消息队列可以理解成一个任务中转站。

接口服务 -> 把任务放进队列 -> 后台 Worker 取任务执行

用户请求接口时,后端只需要快速创建任务并返回任务 id;真正耗时的工作交给后台进程处理。

常见消息队列或任务系统:

- Redis

- RabbitMQ

- Kafka

- Celery

- RQ

Python 后端里,Celery + Redis / RabbitMQ 很常见。

【二、为什么要异步任务】

适合异步处理的场景:

- 发送邮件、短信。

- 文件上传后的解析。

- 图片压缩、视频转码。

- Excel 导入导出。

- 爬虫任务。

- AI 大模型调用。

- RAG 文档切片和向量化。

- 定时任务。

- 报表生成。

这些任务有共同点:耗时、不适合让用户一直等、失败后可以重试。

【三、同步接口的问题】

假设用户上传一个 PDF,后端要解析、切片、向量化、入库。如果同步执行:

用户上传 PDF

-> 接口等待解析完成

-> 等待模型 embedding

-> 等待写入向量库

-> 最后返回

如果这个过程 30 秒,用户体验很差,还可能网关超时。

更合理的做法:

用户上传 PDF

-> 后端创建任务

-> 返回 task_id

-> Worker 后台处理

-> 前端轮询任务状态

【四、Celery 是什么】

Celery 是 Python 里常用的分布式任务队列。

它通常由几部分组成:

- Producer:生产任务的一方,一般是 FastAPI/Django 接口。

- Broker:消息中间件,比如 Redis 或 RabbitMQ。

- Worker:执行任务的后台进程。

- Result Backend:保存任务结果的地方,比如 Redis。

流程:

API 服务提交任务

-> Redis/RabbitMQ 保存任务消息

-> Celery Worker 获取任务

-> 执行业务逻辑

-> 保存任务状态和结果

【五、Celery 简单示例】

安装:

pip install celery redis

任务文件:

from celery import Celery

app = Celery(

    "tasks",

    broker="redis://localhost:6379/0",

    backend="redis://localhost:6379/1",

)

@app.task

def add(x, y):

    return x + y

提交任务:

result = add.delay(3, 5)

print(result.id)

启动 worker:

celery -A tasks worker --loglevel=info

【六、任务状态怎么设计】

项目里不要只依赖 Celery 自带状态,最好在数据库里有自己的任务表。

例如:

tasks

- id

- user_id

- task_type

- status: pending/running/success/failed

- input

- output

- error_message

- created_at

- updated_at

这样前端可以查询:

GET /tasks/{task_id}

返回:

{

  "id": "task_001",

  "status": "running",

  "progress": 60

}

【七、失败重试为什么重要】

异步任务经常会失败:

- 网络超时。

- 第三方 API 临时不可用。

- 大模型接口限流。

- 文件格式异常。

- 数据库连接失败。

Celery 支持重试:

@app.task(bind=True, max_retries=3)

def call_llm(self, prompt):

    try:

        return request_llm(prompt)

    except Exception as exc:

        raise self.retry(exc=exc, countdown=10)

重试不能无限制,要有最大次数和间隔。

【八、削峰填谷是什么意思】

削峰填谷是消息队列的典型作用。

比如突然有 10000 个用户上传文件,如果接口直接处理,服务可能扛不住。

有队列后:

请求高峰 -> 任务先进队列

Worker 按能力慢慢消费

用户会看到“任务处理中”,系统不会被瞬间打爆。

【九、常见坑】

- 把所有业务都扔进队列,导致链路复杂。

- 任务没有幂等性,重试后重复扣款或重复写数据。

- 没有任务状态表,用户不知道处理到哪了。

- Worker 挂了没人发现。

- 失败任务没有记录错误原因。

- AI 调用任务不做超时和重试,成本和稳定性失控。

更多推荐