学习Celery在项目中使用的总结
开始本文不是针对分析celery或者教学的,只是在学习之余对自己在项目中使用的总结,董老师知乎上有篇文章写的非常好,大家可以移步,我也是看了这篇文章了解了很多。如果想直接看项目的直接移步github项目。项目中Celery是使用redis最为代理的,功能主要是:1. 发送邮件;2. 定时更新一些有时效性的数据,判断是否到期;配置官方文档的配置列表在这里。下面是项目中的配置,使用的cronta
开始
本文不是针对分析celery或者教学的,只是在学习之余对自己在项目中使用的总结,董老师知乎上有篇文章写的非常好,大家可以移步,我也是看了这篇文章了解了很多。
如果想直接看项目的直接移步github项目。
项目中Celery是使用redis最为代理的,功能主要是:
1. 发送邮件;
2. 定时更新一些有时效性的数据,判断是否到期;
配置
官方文档的配置列表在这里。
下面是项目中的配置,使用的crontab的方式执行周期任务:
# config/settings.py
...
# Celery.
CELERY_BROKER_URL = 'redis://:devpassword@redis:6379/0' # redis作为消息代理
CELERY_RESULT_BACKEND = CELERY_BROKER_URL # 任务结果存储在redis
CELERY_ACCEPT_CONTENT = ['json'] # 接受的内容类型
CELERY_TASK_SERIALIZER = 'json' # 任务序列化和反序列化使用json
CELERY_RESULT_SERIALIZER = 'json' # 同上,结果使用json
CELERY_REDIS_MAX_CONNECTIONS = 5 # 允许redis连接池用于发取消息的连接数
CELERYBEAT_SCHEDULE = { # 任务调度,定期将任务发送到队列中
'mark-soon-to-expire-credit-cards': {
'task': 'snakeeyes.blueprints.billing.tasks.mark_old_credit_cards',
'schedule': crontab(hour=0, minute=0)
},
'expire-old-coupons': {
'task': 'snakeeyes.blueprints.billing.tasks.expire_old_coupons',
'schedule': crontab(hour=0, minute=1)
},
}
docker-compose.yml配置
celery:
build: . # 指定Dockerfile文件所在位置
command: celery worker -B -l info -A snakeeyes.blueprints.contact.tasks # 运行celery worker,-A是指定应用,-l是指定消息级别为info,-B启动Beat定时任务
env_file: # 环境配置
- '.env'
volumes: # 挂载路径
- '.:/snakeeyes'
app.py中celery的构造函数
这个构造函数是根据Flask官方配置写的,目的是增加上下文支持等。
# 为了后续方便管理和修改,单独将Celery的include参数值放到这里,这里就是所有通过Celery调度的任务列表
CELERY_TASK_LIST = [
'snakeeyes.blueprints.contact.tasks',
'snakeeyes.blueprints.feedback.tasks',
'snakeeyes.blueprints.user.tasks',
'snakeeyes.blueprints.billing.tasks',
]
def create_celery_app(app=None):
"""
Create a new Celery object and tie together the Celery config to the app's
config. Wrap all tasks in the context of the application.
:param app: Flask app
:return: Celery app
"""
app = app or create_app()
celery = Celery(app.import_name, broker=app.config['CELERY_BROKER_URL'],
include=CELERY_TASK_LIST)
celery.conf.update(app.config)
TaskBase = celery.Task
class ContextTask(TaskBase):
abstract = True
def __call__(self, *args, **kwargs):
with app.app_context():
return TaskBase.__call__(self, *args, **kwargs)
celery.Task = ContextTask
return celery
tasks
项目中的任务一般都放在各个蓝图的tasks.py中。比如重置密码的邮件:
celery = create_celery_app() # 构造celery实例
@celery.task() # 用过装饰器告诉celery任务函数
def deliver_password_reset_email(user_id, reset_token):
"""
Send a reset password e-mail to a user.
:param user_id: The user id
:type user_id: int
:param reset_token: The reset token
:type reset_token: str
:return: None if a user was not found
"""
user = User.query.get(user_id)
if user is None:
return
ctx = {'user': user, 'reset_token': reset_token}
send_template_message(subject='Password reset from Snake Eyes',
recipients=[user.email],
template='user/mail/password_reset', ctx=ctx) # 实际渲染邮件和发送邮件的函数
return None
实际应用是在User model下面一个重置密码的类方法:
@classmethod
def initialize_password_reset(cls, identity):
"""
Generate a token to reset the password for a specific user.
:param identity: User e-mail address or username
:type identity: str
:return: User instance
"""
u = User.find_by_identity(identity) # 查找用户
reset_token = u.serialize_token() # 序列化token
# This prevents circular imports.
from snakeeyes.blueprints.user.tasks import (
deliver_password_reset_email)
deliver_password_reset_email.delay(u.id, reset_token) # 通过调用delay将任务发送
return u
在views里面用户触发是通过提交一个密码重置的表单,如果用户在网页上点击了重置密码就会提交这个表单,触发后面的操作,并通过flash告知用户信息,将用户重定向到user.login这个view下面:
@user.route('/account/begin_password_reset', methods=['GET', 'POST'])
@anonymous_required()
def begin_password_reset():
form = BeginPasswordResetForm()
if form.validate_on_submit():
u = User.initialize_password_reset(request.form.get('identity'))
flash('An email has been sent to {0}.'.format(u.email), 'success')
return redirect(url_for('user.login'))
return render_template('user/begin_password_reset.html', form=form)
下面是邮件中通过jinja2渲染的链接:
{{ url_for('user.password_reset', reset_token=reset_token, _external=True) }}
可以看到链接会将用户定位到user.password_reset这个view。我们再看看这个view是怎么操作的。
可以看到初次get请求会得到password_reset.html,如果在这个页面有提交重置密码的表单的话,会用表单的数据替换掉对应User的数据并保存,达到修改密码的效果。
@user.route('/account/password_reset', methods=['GET', 'POST'])
@anonymous_required()
def password_reset():
form = PasswordResetForm(reset_token=request.args.get('reset_token'))
if form.validate_on_submit():
u = User.deserialize_token(request.form.get('reset_token'))
if u is None:
flash('Your reset token has expired or was tampered with.',
'error')
return redirect(url_for('user.begin_password_reset'))
form.populate_obj(u)
u.password = User.encrypt_password(request.form.get('password'))
u.save()
if login_user(u):
flash('Your password has been reset.', 'success')
return redirect(url_for('user.settings'))
return render_template('user/password_reset.html', form=form)
定时任务
上面的task说的是用户触发的操作,将任务推送到队列中。定时任务我们已经在配置中设置好了(有关crontab的参数详见这里):
CELERYBEAT_SCHEDULE = { # 任务调度,定期将任务发送到队列中
'mark-soon-to-expire-credit-cards': { # 名称
'task': 'snakeeyes.blueprints.billing.tasks.mark_old_credit_cards', # 任务
'schedule': crontab(hour=0, minute=0) # 每天凌晨执行
},
'expire-old-coupons': {
'task': 'snakeeyes.blueprints.billing.tasks.expire_old_coupons',
'schedule': crontab(hour=0, minute=1) # 每天零点一分执行
},
}
这里说明第一个任务,每天凌晨检查并标记即将到期的信用卡。根据任务路径可以看到代码片段:
@celery.task()
def mark_old_credit_cards():
"""
Mark credit cards that are going to expire soon or have expired.
:return: Result of updating the records
"""
return CreditCard.mark_old_credit_cards() # 调用的是CreditCard这个类下面的一个类方法并返回结果
可以看到和生产任务的方式类似,只是在配置是将其放入CELERYBEAT_SCHEDULE
这个字典。
更多推荐
所有评论(0)