开始

本文不是针对分析celery或者教学的,只是在学习之余对自己在项目中使用的总结,董老师知乎上有篇文章写的非常好,大家可以移步,我也是看了这篇文章了解了很多。

如果想直接看项目的直接移步github项目

项目中Celery是使用redis最为代理的,功能主要是:
1. 发送邮件;
2. 定时更新一些有时效性的数据,判断是否到期;

配置

官方文档的配置列表在这里。

下面是项目中的配置,使用的crontab的方式执行周期任务:

# config/settings.py
...
# Celery.
CELERY_BROKER_URL = 'redis://:devpassword@redis:6379/0'  # redis作为消息代理
CELERY_RESULT_BACKEND = CELERY_BROKER_URL  # 任务结果存储在redis
CELERY_ACCEPT_CONTENT = ['json']  # 接受的内容类型
CELERY_TASK_SERIALIZER = 'json'  # 任务序列化和反序列化使用json
CELERY_RESULT_SERIALIZER = 'json'  # 同上,结果使用json
CELERY_REDIS_MAX_CONNECTIONS = 5  # 允许redis连接池用于发取消息的连接数
CELERYBEAT_SCHEDULE = {  # 任务调度,定期将任务发送到队列中 
    'mark-soon-to-expire-credit-cards': {
        'task': 'snakeeyes.blueprints.billing.tasks.mark_old_credit_cards',
        'schedule': crontab(hour=0, minute=0)
    },
    'expire-old-coupons': {
        'task': 'snakeeyes.blueprints.billing.tasks.expire_old_coupons',
        'schedule': crontab(hour=0, minute=1)
    },
}

docker-compose.yml配置

celery:
  build: .  # 指定Dockerfile文件所在位置
  command: celery worker -B -l info -A snakeeyes.blueprints.contact.tasks  # 运行celery worker,-A是指定应用,-l是指定消息级别为info,-B启动Beat定时任务
  env_file:  # 环境配置
    - '.env'
  volumes:  # 挂载路径
    - '.:/snakeeyes'

app.py中celery的构造函数

这个构造函数是根据Flask官方配置写的,目的是增加上下文支持等。

# 为了后续方便管理和修改,单独将Celery的include参数值放到这里,这里就是所有通过Celery调度的任务列表
CELERY_TASK_LIST = [
    'snakeeyes.blueprints.contact.tasks',
    'snakeeyes.blueprints.feedback.tasks',
    'snakeeyes.blueprints.user.tasks',
    'snakeeyes.blueprints.billing.tasks',
]


def create_celery_app(app=None):
    """
    Create a new Celery object and tie together the Celery config to the app's
    config. Wrap all tasks in the context of the application.

    :param app: Flask app
    :return: Celery app
    """
    app = app or create_app()

    celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'],
                    include=CELERY_TASK_LIST)
    celery.conf.update(app.config)
    TaskBase = celery.Task

    class ContextTask(TaskBase):
        abstract = True

        def __call__(self, *args, **kwargs):
            with app.app_context():
                return TaskBase.__call__(self, *args, **kwargs)

    celery.Task = ContextTask
    return celery

tasks

项目中的任务一般都放在各个蓝图的tasks.py中。比如重置密码的邮件:

celery = create_celery_app()  # 构造celery实例


@celery.task()  # 用过装饰器告诉celery任务函数
def deliver_password_reset_email(user_id, reset_token):
    """
    Send a reset password e-mail to a user.

    :param user_id: The user id
    :type user_id: int
    :param reset_token: The reset token
    :type reset_token: str
    :return: None if a user was not found
    """
    user = User.query.get(user_id)

    if user is None:
        return

    ctx = {'user': user, 'reset_token': reset_token}

    send_template_message(subject='Password reset from Snake Eyes',
                          recipients=[user.email],
                          template='user/mail/password_reset', ctx=ctx)  # 实际渲染邮件和发送邮件的函数

    return None

实际应用是在User model下面一个重置密码的类方法:

    @classmethod
    def initialize_password_reset(cls, identity):
        """
        Generate a token to reset the password for a specific user.

        :param identity: User e-mail address or username
        :type identity: str
        :return: User instance
        """
        u = User.find_by_identity(identity)  # 查找用户
        reset_token = u.serialize_token()  # 序列化token

        # This prevents circular imports.
        from snakeeyes.blueprints.user.tasks import (
            deliver_password_reset_email)
        deliver_password_reset_email.delay(u.id, reset_token)  # 通过调用delay将任务发送

        return u

在views里面用户触发是通过提交一个密码重置的表单,如果用户在网页上点击了重置密码就会提交这个表单,触发后面的操作,并通过flash告知用户信息,将用户重定向到user.login这个view下面:

@user.route('/account/begin_password_reset', methods=['GET', 'POST'])
@anonymous_required()
def begin_password_reset():
    form = BeginPasswordResetForm()

    if form.validate_on_submit():
        u = User.initialize_password_reset(request.form.get('identity'))

        flash('An email has been sent to {0}.'.format(u.email), 'success')
        return redirect(url_for('user.login'))

    return render_template('user/begin_password_reset.html', form=form)

下面是邮件中通过jinja2渲染的链接:

{{ url_for('user.password_reset', reset_token=reset_token, _external=True) }}

可以看到链接会将用户定位到user.password_reset这个view。我们再看看这个view是怎么操作的。
可以看到初次get请求会得到password_reset.html,如果在这个页面有提交重置密码的表单的话,会用表单的数据替换掉对应User的数据并保存,达到修改密码的效果。

@user.route('/account/password_reset', methods=['GET', 'POST'])
@anonymous_required()
def password_reset():
    form = PasswordResetForm(reset_token=request.args.get('reset_token'))

    if form.validate_on_submit():
        u = User.deserialize_token(request.form.get('reset_token'))

        if u is None:
            flash('Your reset token has expired or was tampered with.',
                  'error')
            return redirect(url_for('user.begin_password_reset'))

        form.populate_obj(u)
        u.password = User.encrypt_password(request.form.get('password'))
        u.save()

        if login_user(u):
            flash('Your password has been reset.', 'success')
            return redirect(url_for('user.settings'))

    return render_template('user/password_reset.html', form=form)

定时任务

上面的task说的是用户触发的操作,将任务推送到队列中。定时任务我们已经在配置中设置好了(有关crontab的参数详见这里):

CELERYBEAT_SCHEDULE = {  # 任务调度,定期将任务发送到队列中 
    'mark-soon-to-expire-credit-cards': {  # 名称
        'task': 'snakeeyes.blueprints.billing.tasks.mark_old_credit_cards',  # 任务
        'schedule': crontab(hour=0, minute=0)  # 每天凌晨执行
    },
    'expire-old-coupons': {
        'task': 'snakeeyes.blueprints.billing.tasks.expire_old_coupons',
        'schedule': crontab(hour=0, minute=1)  # 每天零点一分执行
    },
}

这里说明第一个任务,每天凌晨检查并标记即将到期的信用卡。根据任务路径可以看到代码片段:

@celery.task()
def mark_old_credit_cards():
    """
    Mark credit cards that are going to expire soon or have expired.

    :return: Result of updating the records
    """
    return CreditCard.mark_old_credit_cards()  # 调用的是CreditCard这个类下面的一个类方法并返回结果

可以看到和生产任务的方式类似,只是在配置是将其放入CELERYBEAT_SCHEDULE这个字典。

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐