从0开始手撕大模型应用项目-智能学术系统-第四节-消息队列服务开发指南
Redis + Celery消息队列系统为Scholar-AI项目提供了强大的异步任务处理能力。通过合理的设计和优化,可以支持高并发、高可用的任务处理需求。核心优势:高性能:Redis内存存储,Celery多进程并发高可用:分布式架构,故障自动转移易扩展:水平扩展,动态调整资源易监控:完善的监控和日志系统易维护:成熟的生态,丰富的工具长时间运行的任务(PDF解析、LLM推理)高并发请求处理定时任务
·
消息队列系统开发指南
技术选型
为什么选择Redis + Celery?
1. Redis作为消息代理的优势
高性能特性:
- 内存存储:Redis基于内存,读写速度极快
- 数据结构丰富:支持List、Set、Sorted Set等,适合不同场景
- 原子操作:所有操作都是原子性的,保证数据一致性
- 持久化支持:RDB + AOF双重保障,数据不丢失
消息队列特性:
- List结构:实现FIFO队列,支持阻塞式读取
- Pub/Sub:支持发布订阅模式
- Sorted Set:支持优先级队列
- 过期机制:自动清理过期任务
与项目集成:
- 项目已有Redis缓存系统,复用基础设施
- 减少技术栈复杂度
- 统一运维管理
2. Celery作为任务队列的优势
分布式特性:
- 多进程/多机器:支持水平扩展
- 任务路由:可以指定任务到特定worker
- 负载均衡:自动分配任务到可用worker
- 故障转移:worker故障时自动重新分配
功能丰富:
- 任务重试:支持指数退避重试
- 任务优先级:支持不同优先级任务
- 定时任务:支持cron表达式
- 任务链:支持任务依赖和流水线
- 结果存储:任务执行结果持久化
监控完善:
- Flower:Web界面监控
- 详细日志:任务执行日志
- 指标统计:成功率、延迟等指标
技术选型对比
方案 | 优势 | 劣势 | 适用场景 |
---|---|---|---|
Redis + Celery | 高性能、功能丰富、易扩展 | 配置复杂、学习成本高 | 中大型项目、复杂业务逻辑 |
RabbitMQ + Celery | 可靠性高、功能完善 | 性能较低、资源消耗大 | 对可靠性要求极高的场景 |
RQ (Redis Queue) | 简单易用、轻量级 | 功能有限、扩展性差 | 小型项目、简单任务 |
自建队列 | 完全可控、定制化 | 开发成本高、稳定性差 | 特殊需求、技术实力强 |
为什么不做其他选择?
不选择RabbitMQ的原因:
- 项目已有Redis基础设施,避免引入新组件
- RabbitMQ配置复杂,运维成本高
- 对于当前项目规模,Redis性能已足够
不选择RQ的原因:
- 功能相对简单,扩展性有限
- 缺少企业级特性(监控、管理界面等)
系统架构设计
核心组件说明
1. 任务调度器 (Task Scheduler)
- Celery Beat:定时任务调度
- 自定义调度器:基于业务逻辑的任务调度
- 任务优先级管理:高优先级任务优先执行
2. 消息代理 (Message Broker)
- Redis:存储任务队列和结果
- 队列分离:不同类型任务使用不同队列
- 持久化:任务不丢失
3. 任务执行器 (Task Executors)
- Worker进程:执行具体任务
- 进程池:提高并发处理能力
- 资源隔离:不同类型任务使用不同worker
Celery原理详解
Celery架构原理
1. 核心概念
Producer(生产者)
- 创建任务并发送到消息队列
- 可以是Web应用、定时任务、其他服务
- 通过
delay()
或apply_async()
方法发送任务
Broker(消息代理)
- 存储任务消息的中间件
- 我们使用Redis作为Broker
- 支持多种消息格式(JSON、Pickle等)
Worker(消费者)
- 从队列中获取任务并执行
- 可以运行在多个进程/机器上
- 支持并发执行多个任务
Result Backend(结果后端)
- 存储任务执行结果
- 我们使用Redis作为结果存储
- 支持结果过期和清理
2. 任务执行流程
1. Producer创建任务
↓
2. 任务序列化并发送到Broker
↓
3. Worker从Broker获取任务
↓
4. Worker反序列化并执行任务
↓
5. 执行结果存储到Result Backend
↓
6. Producer获取执行结果
3. 任务状态管理
PENDING:任务已创建,等待执行
STARTED:任务开始执行
SUCCESS:任务执行成功
FAILURE:任务执行失败
RETRY:任务重试中
REVOKED:任务被取消
Celery配置详解
1. 基础配置
# celery_app.py
from celery import Celery
# 创建Celery应用
celery_app = Celery(
'scholar_ai',
broker='redis://localhost:6379/1', # 使用Redis作为消息代理
backend='redis://localhost:6379/2', # 使用Redis存储结果
include=['message_queue.tasks'] # 包含任务模块
)
# 基础配置
celery_app.conf.update(
# 任务序列化
task_serializer='json',
accept_content=['json'],
result_serializer='json',
# 时区设置
timezone='Asia/Shanghai',
enable_utc=True,
# 任务路由
task_routes={
'message_queue.tasks.pdf_parse': {'queue': 'pdf_queue'},
'message_queue.tasks.llm_inference': {'queue': 'llm_queue'},
'message_queue.tasks.web_search': {'queue': 'search_queue'},
},
# 任务优先级
task_default_priority=5,
task_inherit_parent_priority=True,
# 结果过期时间
result_expires=3600, # 1小时
# 任务重试配置
task_acks_late=True,
worker_prefetch_multiplier=1,
# 定时任务配置
beat_schedule={
'cleanup-expired-tasks': {
'task': 'message_queue.tasks.cleanup_expired_tasks',
'schedule': 300.0, # 每5分钟执行一次
},
},
)
2. 高级配置
# 任务重试配置
@celery_app.task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3, 'countdown': 60})
def retry_task(self):
try:
# 任务逻辑
pass
except Exception as exc:
# 指数退避重试
raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))
# 任务超时配置
@celery_app.task(time_limit=300, soft_time_limit=240) # 硬超时5分钟,软超时4分钟
def long_running_task():
pass
# 任务优先级配置
@celery_app.task(priority=10) # 高优先级
def high_priority_task():
pass
# 任务链配置
from celery import chain, group, chord
# 顺序执行
job = chain(task1.s(), task2.s(), task3.s())
# 并行执行
job = group(task1.s(), task2.s(), task3.s())
# 并行后执行
job = chord(group(task1.s(), task2.s()), task3.s())
Redis在消息队列中的作用
1. 作为消息代理 (Message Broker)
队列实现:
# Redis List实现FIFO队列
LPUSH task_queue task_data # 入队
BRPOP task_queue timeout # 出队(阻塞式)
# 优先级队列
ZADD priority_queue score task_data # 按分数入队
ZPOPMAX priority_queue # 获取最高优先级任务
消息持久化:
- RDB快照:定期保存数据快照
- AOF日志:记录每个写操作
- 双重保障确保任务不丢失
2. 作为结果存储 (Result Backend)
结果存储:
# 存储任务结果
SET task_result:task_id result_data EX 3600
# 获取任务结果
GET task_result:task_id
# 设置结果过期
EXPIRE task_result:task_id 3600
状态跟踪:
# 任务状态
HSET task_status:task_id status "SUCCESS" timestamp 1234567890
# 任务进度
HSET task_progress:task_id progress 50 total 100
3. 分布式锁实现
防止重复执行:
# 获取锁
SET lock:task_type task_id NX EX 300
# 释放锁
DEL lock:task_type
核心组件实现
1. 任务管理器 (TaskManager)
# task_manager.py
from typing import Optional, Dict, Any, List
from celery.result import AsyncResult
from .models import TaskStatus, TaskPriority, TaskResult
from .celery_app import celery_app
class TaskManager:
"""任务管理器 - 统一的任务管理接口"""
def __init__(self):
self.celery_app = celery_app
async def submit_task(
self,
task_name: str,
args: tuple = (),
kwargs: dict = None,
priority: TaskPriority = TaskPriority.NORMAL,
queue: str = None,
eta: datetime = None,
countdown: int = None
) -> str:
"""提交任务"""
task_kwargs = kwargs or {}
# 设置任务选项
task_options = {
'priority': priority.value,
}
if queue:
task_options['queue'] = queue
if eta:
task_options['eta'] = eta
if countdown:
task_options['countdown'] = countdown
# 提交任务
result = self.celery_app.send_task(
task_name,
args=args,
kwargs=task_kwargs,
**task_options
)
return result.id
async def get_task_result(self, task_id: str) -> Optional[TaskResult]:
"""获取任务结果"""
result = AsyncResult(task_id, app=self.celery_app)
if result.state == TaskStatus.PENDING:
return TaskResult(
task_id=task_id,
status=TaskStatus.PENDING,
result=None,
error=None
)
elif result.state == TaskStatus.SUCCESS:
return TaskResult(
task_id=task_id,
status=TaskStatus.SUCCESS,
result=result.result,
error=None
)
elif result.state == TaskStatus.FAILURE:
return TaskResult(
task_id=task_id,
status=TaskStatus.FAILURE,
result=None,
error=str(result.info)
)
else:
return TaskResult(
task_id=task_id,
status=TaskStatus.STARTED,
result=None,
error=None
)
async def cancel_task(self, task_id: str) -> bool:
"""取消任务"""
result = AsyncResult(task_id, app=self.celery_app)
result.revoke(terminate=True)
return True
async def get_active_tasks(self) -> List[Dict[str, Any]]:
"""获取活跃任务列表"""
inspect = self.celery_app.control.inspect()
active_tasks = inspect.active()
tasks = []
for worker, task_list in active_tasks.items():
for task in task_list:
tasks.append({
'worker': worker,
'task_id': task['id'],
'task_name': task['name'],
'args': task['args'],
'kwargs': task['kwargs'],
'time_start': task['time_start']
})
return tasks
2. 数据模型 (Models)
# models.py
from enum import Enum
from dataclasses import dataclass
from typing import Any, Optional
from datetime import datetime
class TaskStatus(Enum):
"""任务状态枚举"""
PENDING = "PENDING"
STARTED = "STARTED"
SUCCESS = "SUCCESS"
FAILURE = "FAILURE"
RETRY = "RETRY"
REVOKED = "REVOKED"
class TaskPriority(Enum):
"""任务优先级枚举"""
LOW = 1
NORMAL = 5
HIGH = 10
URGENT = 20
@dataclass
class TaskResult:
"""任务结果数据类"""
task_id: str
status: TaskStatus
result: Any
error: Optional[str]
created_at: Optional[datetime] = None
started_at: Optional[datetime] = None
completed_at: Optional[datetime] = None
duration: Optional[float] = None
@dataclass
class TaskInfo:
"""任务信息数据类"""
task_id: str
task_name: str
args: tuple
kwargs: dict
priority: TaskPriority
queue: str
status: TaskStatus
created_at: datetime
eta: Optional[datetime] = None
retries: int = 0
max_retries: int = 3
任务类型设计
1. PDF解析任务
# tasks/pdf_tasks.py
from celery import current_task
from ..pdf_parse.pdf_service import PDFService
from ..pdf_parse.models import ParseRequest, ParseResult
@celery_app.task(bind=True, name='pdf.parse_document')
def parse_pdf_document(self, file_path: str, parse_options: dict = None) -> dict:
"""PDF文档解析任务"""
try:
# 更新任务状态
current_task.update_state(
state='STARTED',
meta={'progress': 0, 'message': '开始解析PDF文档'}
)
# 创建解析请求
request = ParseRequest(
file_path=file_path,
options=parse_options or {}
)
# 执行解析
pdf_service = PDFService()
result = pdf_service.parse_document(request)
# 更新进度
current_task.update_state(
state='PROGRESS',
meta={'progress': 50, 'message': 'PDF解析完成,正在处理结果'}
)
# 返回结果
return {
'success': True,
'result': result.to_dict(),
'message': 'PDF解析完成'
}
except Exception as exc:
# 记录错误
current_task.update_state(
state='FAILURE',
meta={'error': str(exc), 'message': 'PDF解析失败'}
)
raise exc
2. LLM推理任务
# tasks/llm_tasks.py
from ..llm_scheduler.llm_service import LLMService
from ..llm_scheduler.models import InferenceRequest, InferenceResult
@celery_app.task(bind=True, name='llm.inference', time_limit=300)
def llm_inference(self, prompt: str, model_config: dict = None) -> dict:
"""LLM推理任务"""
try:
# 更新任务状态
current_task.update_state(
state='STARTED',
meta={'progress': 0, 'message': '开始LLM推理'}
)
# 创建推理请求
request = InferenceRequest(
prompt=prompt,
model_config=model_config or {}
)
# 执行推理
llm_service = LLMService()
result = llm_service.inference(request)
# 更新进度
current_task.update_state(
state='PROGRESS',
meta={'progress': 80, 'message': 'LLM推理完成,正在处理结果'}
)
# 返回结果
return {
'success': True,
'result': result.to_dict(),
'message': 'LLM推理完成'
}
except Exception as exc:
# 记录错误
current_task.update_state(
state='FAILURE',
meta={'error': str(exc), 'message': 'LLM推理失败'}
)
raise exc
3. 网络搜索任务
# tasks/search_tasks.py
from ..web_search.search_service import SearchService
from ..web_search.models import SearchRequest, SearchResult
@celery_app.task(bind=True, name='search.web_search')
def web_search_task(self, query: str, search_options: dict = None) -> dict:
"""网络搜索任务"""
try:
# 更新任务状态
current_task.update_state(
state='STARTED',
meta={'progress': 0, 'message': '开始网络搜索'}
)
# 创建搜索请求
request = SearchRequest(
query=query,
options=search_options or {}
)
# 执行搜索
search_service = SearchService()
result = search_service.search(request)
# 更新进度
current_task.update_state(
state='PROGRESS',
meta={'progress': 70, 'message': '搜索完成,正在处理结果'}
)
# 返回结果
return {
'success': True,
'result': result.to_dict(),
'message': '网络搜索完成'
}
except Exception as exc:
# 记录错误
current_task.update_state(
state='FAILURE',
meta={'error': str(exc), 'message': '网络搜索失败'}
)
raise exc
性能优化策略
1. Worker优化
并发配置:
# 根据CPU核心数配置并发
import multiprocessing
concurrency = multiprocessing.cpu_count() * 2
# 启动命令
celery -A message_queue.celery_app worker --concurrency=8 --loglevel=info
内存优化:
# 限制Worker内存使用
celery -A message_queue.celery_app worker --max-memory-per-child=200000
2. 队列优化
队列分离:
# 不同类型任务使用不同队列
task_routes = {
'pdf.*': {'queue': 'pdf_queue'},
'llm.*': {'queue': 'llm_queue'},
'search.*': {'queue': 'search_queue'},
}
优先级队列:
# 使用Redis Sorted Set实现优先级队列
ZADD priority_queue 10 high_priority_task
ZADD priority_queue 5 normal_priority_task
ZADD priority_queue 1 low_priority_task
总结
Redis + Celery消息队列系统为Scholar-AI项目提供了强大的异步任务处理能力。通过合理的设计和优化,可以支持高并发、高可用的任务处理需求。
核心优势:
- 高性能:Redis内存存储,Celery多进程并发
- 高可用:分布式架构,故障自动转移
- 易扩展:水平扩展,动态调整资源
- 易监控:完善的监控和日志系统
- 易维护:成熟的生态,丰富的工具
适用场景:
- 长时间运行的任务(PDF解析、LLM推理)
- 高并发请求处理
- 定时任务调度
- 异步数据处理
更多推荐
所有评论(0)