消息队列系统开发指南

技术选型

为什么选择Redis + Celery?

1. Redis作为消息代理的优势

高性能特性:

  • 内存存储:Redis基于内存,读写速度极快
  • 数据结构丰富:支持List、Set、Sorted Set等,适合不同场景
  • 原子操作:所有操作都是原子性的,保证数据一致性
  • 持久化支持:RDB + AOF双重保障,数据不丢失

消息队列特性:

  • List结构:实现FIFO队列,支持阻塞式读取
  • Pub/Sub:支持发布订阅模式
  • Sorted Set:支持优先级队列
  • 过期机制:自动清理过期任务

与项目集成:

  • 项目已有Redis缓存系统,复用基础设施
  • 减少技术栈复杂度
  • 统一运维管理
2. Celery作为任务队列的优势

分布式特性:

  • 多进程/多机器:支持水平扩展
  • 任务路由:可以指定任务到特定worker
  • 负载均衡:自动分配任务到可用worker
  • 故障转移:worker故障时自动重新分配

功能丰富:

  • 任务重试:支持指数退避重试
  • 任务优先级:支持不同优先级任务
  • 定时任务:支持cron表达式
  • 任务链:支持任务依赖和流水线
  • 结果存储:任务执行结果持久化

监控完善:

  • Flower:Web界面监控
  • 详细日志:任务执行日志
  • 指标统计:成功率、延迟等指标

技术选型对比

方案 优势 劣势 适用场景
Redis + Celery 高性能、功能丰富、易扩展 配置复杂、学习成本高 中大型项目、复杂业务逻辑
RabbitMQ + Celery 可靠性高、功能完善 性能较低、资源消耗大 对可靠性要求极高的场景
RQ (Redis Queue) 简单易用、轻量级 功能有限、扩展性差 小型项目、简单任务
自建队列 完全可控、定制化 开发成本高、稳定性差 特殊需求、技术实力强

为什么不做其他选择?

不选择RabbitMQ的原因:

  • 项目已有Redis基础设施,避免引入新组件
  • RabbitMQ配置复杂,运维成本高
  • 对于当前项目规模,Redis性能已足够

不选择RQ的原因:

  • 功能相对简单,扩展性有限
  • 缺少企业级特性(监控、管理界面等)

系统架构设计

核心组件说明

1. 任务调度器 (Task Scheduler)
  • Celery Beat:定时任务调度
  • 自定义调度器:基于业务逻辑的任务调度
  • 任务优先级管理:高优先级任务优先执行
2. 消息代理 (Message Broker)
  • Redis:存储任务队列和结果
  • 队列分离:不同类型任务使用不同队列
  • 持久化:任务不丢失
3. 任务执行器 (Task Executors)
  • Worker进程:执行具体任务
  • 进程池:提高并发处理能力
  • 资源隔离:不同类型任务使用不同worker

Celery原理详解

Celery架构原理

1. 核心概念

Producer(生产者)

  • 创建任务并发送到消息队列
  • 可以是Web应用、定时任务、其他服务
  • 通过delay()apply_async()方法发送任务

Broker(消息代理)

  • 存储任务消息的中间件
  • 我们使用Redis作为Broker
  • 支持多种消息格式(JSON、Pickle等)

Worker(消费者)

  • 从队列中获取任务并执行
  • 可以运行在多个进程/机器上
  • 支持并发执行多个任务

Result Backend(结果后端)

  • 存储任务执行结果
  • 我们使用Redis作为结果存储
  • 支持结果过期和清理
2. 任务执行流程
1. Producer创建任务
   ↓
2. 任务序列化并发送到Broker
   ↓
3. Worker从Broker获取任务
   ↓
4. Worker反序列化并执行任务
   ↓
5. 执行结果存储到Result Backend
   ↓
6. Producer获取执行结果
3. 任务状态管理

PENDING:任务已创建,等待执行
STARTED:任务开始执行
SUCCESS:任务执行成功
FAILURE:任务执行失败
RETRY:任务重试中
REVOKED:任务被取消

Celery配置详解

1. 基础配置
# celery_app.py
from celery import Celery

# 创建Celery应用
celery_app = Celery(
    'scholar_ai',
    broker='redis://localhost:6379/1',  # 使用Redis作为消息代理
    backend='redis://localhost:6379/2',  # 使用Redis存储结果
    include=['message_queue.tasks']  # 包含任务模块
)

# 基础配置
celery_app.conf.update(
    # 任务序列化
    task_serializer='json',
    accept_content=['json'],
    result_serializer='json',
    
    # 时区设置
    timezone='Asia/Shanghai',
    enable_utc=True,
    
    # 任务路由
    task_routes={
        'message_queue.tasks.pdf_parse': {'queue': 'pdf_queue'},
        'message_queue.tasks.llm_inference': {'queue': 'llm_queue'},
        'message_queue.tasks.web_search': {'queue': 'search_queue'},
    },
    
    # 任务优先级
    task_default_priority=5,
    task_inherit_parent_priority=True,
    
    # 结果过期时间
    result_expires=3600,  # 1小时
    
    # 任务重试配置
    task_acks_late=True,
    worker_prefetch_multiplier=1,
    
    # 定时任务配置
    beat_schedule={
        'cleanup-expired-tasks': {
            'task': 'message_queue.tasks.cleanup_expired_tasks',
            'schedule': 300.0,  # 每5分钟执行一次
        },
    },
)
2. 高级配置
# 任务重试配置
@celery_app.task(bind=True, autoretry_for=(Exception,), retry_kwargs={'max_retries': 3, 'countdown': 60})
def retry_task(self):
    try:
        # 任务逻辑
        pass
    except Exception as exc:
        # 指数退避重试
        raise self.retry(exc=exc, countdown=60 * (2 ** self.request.retries))

# 任务超时配置
@celery_app.task(time_limit=300, soft_time_limit=240)  # 硬超时5分钟,软超时4分钟
def long_running_task():
    pass

# 任务优先级配置
@celery_app.task(priority=10)  # 高优先级
def high_priority_task():
    pass

# 任务链配置
from celery import chain, group, chord

# 顺序执行
job = chain(task1.s(), task2.s(), task3.s())

# 并行执行
job = group(task1.s(), task2.s(), task3.s())

# 并行后执行
job = chord(group(task1.s(), task2.s()), task3.s())

Redis在消息队列中的作用

1. 作为消息代理 (Message Broker)

队列实现:

# Redis List实现FIFO队列
LPUSH task_queue task_data  # 入队
BRPOP task_queue timeout   # 出队(阻塞式)

# 优先级队列
ZADD priority_queue score task_data  # 按分数入队
ZPOPMAX priority_queue              # 获取最高优先级任务

消息持久化:

  • RDB快照:定期保存数据快照
  • AOF日志:记录每个写操作
  • 双重保障确保任务不丢失

2. 作为结果存储 (Result Backend)

结果存储:

# 存储任务结果
SET task_result:task_id result_data EX 3600

# 获取任务结果
GET task_result:task_id

# 设置结果过期
EXPIRE task_result:task_id 3600

状态跟踪:

# 任务状态
HSET task_status:task_id status "SUCCESS" timestamp 1234567890

# 任务进度
HSET task_progress:task_id progress 50 total 100

3. 分布式锁实现

防止重复执行:

# 获取锁
SET lock:task_type task_id NX EX 300

# 释放锁
DEL lock:task_type

核心组件实现

1. 任务管理器 (TaskManager)

# task_manager.py
from typing import Optional, Dict, Any, List
from celery.result import AsyncResult
from .models import TaskStatus, TaskPriority, TaskResult
from .celery_app import celery_app

class TaskManager:
    """任务管理器 - 统一的任务管理接口"""
    
    def __init__(self):
        self.celery_app = celery_app
    
    async def submit_task(
        self,
        task_name: str,
        args: tuple = (),
        kwargs: dict = None,
        priority: TaskPriority = TaskPriority.NORMAL,
        queue: str = None,
        eta: datetime = None,
        countdown: int = None
    ) -> str:
        """提交任务"""
        task_kwargs = kwargs or {}
        
        # 设置任务选项
        task_options = {
            'priority': priority.value,
        }
        
        if queue:
            task_options['queue'] = queue
        if eta:
            task_options['eta'] = eta
        if countdown:
            task_options['countdown'] = countdown
        
        # 提交任务
        result = self.celery_app.send_task(
            task_name,
            args=args,
            kwargs=task_kwargs,
            **task_options
        )
        
        return result.id
    
    async def get_task_result(self, task_id: str) -> Optional[TaskResult]:
        """获取任务结果"""
        result = AsyncResult(task_id, app=self.celery_app)
        
        if result.state == TaskStatus.PENDING:
            return TaskResult(
                task_id=task_id,
                status=TaskStatus.PENDING,
                result=None,
                error=None
            )
        elif result.state == TaskStatus.SUCCESS:
            return TaskResult(
                task_id=task_id,
                status=TaskStatus.SUCCESS,
                result=result.result,
                error=None
            )
        elif result.state == TaskStatus.FAILURE:
            return TaskResult(
                task_id=task_id,
                status=TaskStatus.FAILURE,
                result=None,
                error=str(result.info)
            )
        else:
            return TaskResult(
                task_id=task_id,
                status=TaskStatus.STARTED,
                result=None,
                error=None
            )
    
    async def cancel_task(self, task_id: str) -> bool:
        """取消任务"""
        result = AsyncResult(task_id, app=self.celery_app)
        result.revoke(terminate=True)
        return True
    
    async def get_active_tasks(self) -> List[Dict[str, Any]]:
        """获取活跃任务列表"""
        inspect = self.celery_app.control.inspect()
        active_tasks = inspect.active()
        
        tasks = []
        for worker, task_list in active_tasks.items():
            for task in task_list:
                tasks.append({
                    'worker': worker,
                    'task_id': task['id'],
                    'task_name': task['name'],
                    'args': task['args'],
                    'kwargs': task['kwargs'],
                    'time_start': task['time_start']
                })
        
        return tasks

2. 数据模型 (Models)

# models.py
from enum import Enum
from dataclasses import dataclass
from typing import Any, Optional
from datetime import datetime

class TaskStatus(Enum):
    """任务状态枚举"""
    PENDING = "PENDING"
    STARTED = "STARTED"
    SUCCESS = "SUCCESS"
    FAILURE = "FAILURE"
    RETRY = "RETRY"
    REVOKED = "REVOKED"

class TaskPriority(Enum):
    """任务优先级枚举"""
    LOW = 1
    NORMAL = 5
    HIGH = 10
    URGENT = 20

@dataclass
class TaskResult:
    """任务结果数据类"""
    task_id: str
    status: TaskStatus
    result: Any
    error: Optional[str]
    created_at: Optional[datetime] = None
    started_at: Optional[datetime] = None
    completed_at: Optional[datetime] = None
    duration: Optional[float] = None

@dataclass
class TaskInfo:
    """任务信息数据类"""
    task_id: str
    task_name: str
    args: tuple
    kwargs: dict
    priority: TaskPriority
    queue: str
    status: TaskStatus
    created_at: datetime
    eta: Optional[datetime] = None
    retries: int = 0
    max_retries: int = 3

任务类型设计

1. PDF解析任务

# tasks/pdf_tasks.py
from celery import current_task
from ..pdf_parse.pdf_service import PDFService
from ..pdf_parse.models import ParseRequest, ParseResult

@celery_app.task(bind=True, name='pdf.parse_document')
def parse_pdf_document(self, file_path: str, parse_options: dict = None) -> dict:
    """PDF文档解析任务"""
    try:
        # 更新任务状态
        current_task.update_state(
            state='STARTED',
            meta={'progress': 0, 'message': '开始解析PDF文档'}
        )
        
        # 创建解析请求
        request = ParseRequest(
            file_path=file_path,
            options=parse_options or {}
        )
        
        # 执行解析
        pdf_service = PDFService()
        result = pdf_service.parse_document(request)
        
        # 更新进度
        current_task.update_state(
            state='PROGRESS',
            meta={'progress': 50, 'message': 'PDF解析完成,正在处理结果'}
        )
        
        # 返回结果
        return {
            'success': True,
            'result': result.to_dict(),
            'message': 'PDF解析完成'
        }
        
    except Exception as exc:
        # 记录错误
        current_task.update_state(
            state='FAILURE',
            meta={'error': str(exc), 'message': 'PDF解析失败'}
        )
        raise exc

2. LLM推理任务

# tasks/llm_tasks.py
from ..llm_scheduler.llm_service import LLMService
from ..llm_scheduler.models import InferenceRequest, InferenceResult

@celery_app.task(bind=True, name='llm.inference', time_limit=300)
def llm_inference(self, prompt: str, model_config: dict = None) -> dict:
    """LLM推理任务"""
    try:
        # 更新任务状态
        current_task.update_state(
            state='STARTED',
            meta={'progress': 0, 'message': '开始LLM推理'}
        )
        
        # 创建推理请求
        request = InferenceRequest(
            prompt=prompt,
            model_config=model_config or {}
        )
        
        # 执行推理
        llm_service = LLMService()
        result = llm_service.inference(request)
        
        # 更新进度
        current_task.update_state(
            state='PROGRESS',
            meta={'progress': 80, 'message': 'LLM推理完成,正在处理结果'}
        )
        
        # 返回结果
        return {
            'success': True,
            'result': result.to_dict(),
            'message': 'LLM推理完成'
        }
        
    except Exception as exc:
        # 记录错误
        current_task.update_state(
            state='FAILURE',
            meta={'error': str(exc), 'message': 'LLM推理失败'}
        )
        raise exc

3. 网络搜索任务

# tasks/search_tasks.py
from ..web_search.search_service import SearchService
from ..web_search.models import SearchRequest, SearchResult

@celery_app.task(bind=True, name='search.web_search')
def web_search_task(self, query: str, search_options: dict = None) -> dict:
    """网络搜索任务"""
    try:
        # 更新任务状态
        current_task.update_state(
            state='STARTED',
            meta={'progress': 0, 'message': '开始网络搜索'}
        )
        
        # 创建搜索请求
        request = SearchRequest(
            query=query,
            options=search_options or {}
        )
        
        # 执行搜索
        search_service = SearchService()
        result = search_service.search(request)
        
        # 更新进度
        current_task.update_state(
            state='PROGRESS',
            meta={'progress': 70, 'message': '搜索完成,正在处理结果'}
        )
        
        # 返回结果
        return {
            'success': True,
            'result': result.to_dict(),
            'message': '网络搜索完成'
        }
        
    except Exception as exc:
        # 记录错误
        current_task.update_state(
            state='FAILURE',
            meta={'error': str(exc), 'message': '网络搜索失败'}
        )
        raise exc

性能优化策略

1. Worker优化

并发配置:

# 根据CPU核心数配置并发
import multiprocessing
concurrency = multiprocessing.cpu_count() * 2

# 启动命令
celery -A message_queue.celery_app worker --concurrency=8 --loglevel=info

内存优化:

# 限制Worker内存使用
celery -A message_queue.celery_app worker --max-memory-per-child=200000

2. 队列优化

队列分离:

# 不同类型任务使用不同队列
task_routes = {
    'pdf.*': {'queue': 'pdf_queue'},
    'llm.*': {'queue': 'llm_queue'},
    'search.*': {'queue': 'search_queue'},
}

优先级队列:

# 使用Redis Sorted Set实现优先级队列
ZADD priority_queue 10 high_priority_task
ZADD priority_queue 5 normal_priority_task
ZADD priority_queue 1 low_priority_task

总结

Redis + Celery消息队列系统为Scholar-AI项目提供了强大的异步任务处理能力。通过合理的设计和优化,可以支持高并发、高可用的任务处理需求。

核心优势:

  1. 高性能:Redis内存存储,Celery多进程并发
  2. 高可用:分布式架构,故障自动转移
  3. 易扩展:水平扩展,动态调整资源
  4. 易监控:完善的监控和日志系统
  5. 易维护:成熟的生态,丰富的工具

适用场景:

  • 长时间运行的任务(PDF解析、LLM推理)
  • 高并发请求处理
  • 定时任务调度
  • 异步数据处理
Logo

更多推荐