简介

在不同系统(或物理设备)之间,应用软件之间,程序进程之间,常常会有各种互相的信息传递;为保证消息传递的可靠性,对所传消息引入一个保存的容器:一方面用来接收发送者产生的信息,一方面在接收者正常的情况下完成消息的派送,并在无法接收消息时对信息进行存储,然后在适当的时机完成信息的派送。一般称该容器为消息队列


适用场景

对于部分需要较长时间处理的任务类型,采用传统的同步处理方式会带来较长时间的性能损耗或是不良的用户体验,如上传文件处理,图像压缩,发送邮件等,此类实时性要求不是特别高的任务类型,采用异步消息队列的模式可以提升处理效率。

这里写图片描述
这里写图片描述


消息分发与任务调度的实现机制

producer发出调用请求(message包含所调用任务的相关信息)—>celery服务启动时,会产生一个或多个交换机(exchanges),对应的交换机 接收请求message—>交换机根据message内容,将message分发到一个或多个符合条件的队列(queue)—>每个队列上都有一个或多个worker在监听,在监听到符合条件的message到达后,worker负责进行任务处理,任务处理完被确认后,队列中的message将被删除。

这里写图片描述

celery支持所有AMQP(Advanced Message Queuing Protocol:高级消息队列协议)路由机制,可以通过配置的方式,执行相关的
消息路由路径,例子如下:

# 在main.py中增加以下代码

from kombu import Exchange, Queue

#增加两个task,test_queue_1与test_queue_2
@app.task
def test_queue_1():
    return 'queue1'

@app.task
def test_queue_2():
    return 'queue2'

# queue_1与queue_2为消息队列名称
# Exchange:为交换机实例,具有不同的类型。详细参考
# routing_key:用来告知exchange将task message传送至相对应的queue

queue = (
    Queue('queue_1', Exchange('Exchange1', type='direct'), routing_key='queue_1_key'),
    Queue('queue_2', Exchange('Exchange2', type='direct'), routing_key='queue_2_key')
)
route = {
    'main.test_queue_1': {'queue': 'queue_1', 'routing_key': 'queue_1_key'},
    'main.test_queue_2': {'queue': 'queue_2', 'routing_key': 'queue_2_key'}
}

app.conf.update(CELERY_QUEUES=queue, CELERY_ROUTES=route)


#########
# 为直观的观察效果,开启终端1运行以下命令,监听queue_1队列:
celery -A main worker -Q queue_1 --loglevel=debug

# 开启终端2运行以下命令,监听queue_2队列:
celery -A main worker -Q queue_2 --loglevel=debug

# 此时调用main.py中的test_queue_1和test_queue_2,会发现task被分发到各个对应的celery worker服务。
# 对于没有被队列接收的sayhi函数,通过sayhi.apply_async(queue='queue_1’)可以将任务分发到queue_1

celery的广播模式(该模式仅仅在broker采用RabbitMQ,Redis才可以使用,例子如下:

# 在main.py中,将broker='mongodb://localhost:27017/workers_tasks' 改为broker='amqp://',同时启动本地rabbitmq服务
# 添加以下代码,
from kombu.common import Broadcast

@app.task
def broadcast():
    return 'broadcast'

queue_bor = (
    Broadcast('broadcast_tasks’), #此处设置消息队列broadcast为广播模式,及该队列上的消息会发送至所有监听它的worker
    Queue('broadcast_tasks'),
)
queue_route = {
    'main.broadcast': {'queue': 'broadcast_tasks'},
}
app.conf.update(CELERY_QUEUES=queue_bor, CELERY_ROUTES=queue_route)


# 按以上配置后,同时在两个终端启动celery服务,通过broadcast.delay()调用task时,会发现两个celery实例均有执行broadcast函数。
Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐