python 消息队列、异步分布式
一.消息队列消息队列:是在消息的传输过程中保存消息的容器。消息队列最经典的用法就是消费者和生成者之间通过消息管道来传递消息,消费者和生成者是不同的进程。生产者往管道中写消息,消费者从管道中读消息。操作系统提供了很多机制来实现进程间的通信 ,multiprocessing模块就提供了Queue和Pipe两种方法来实现。其中P指producer,即生产者;C指consumer,即消费者。中间的红色表示
一.消息队列
消息队列:是在消息的传输过程中保存消息的容器。消息队列最经典的用法就是消费者和生成者之间通过消息管道来传递消息,消费者和生成者是不同的进程。生产者往管道中写消息,消费者从管道中读消息。
操作系统提供了很多机制来实现进程间的通信 ,multiprocessing模块就提供了Queue和Pipe两种方法来实现。
其中P指producer,即生产者;C指consumer,即消费者。中间的红色表示消息队列,实例中表现为HELLO队列。
1.生产消费实例 Queue 单向进行,即生产者只进行发消息,消费者只进行收
#导入模块
from multiprocessing import Queue
from threading import Thread
import time
#创建生产者
def producer(q):
print("start producer")
for i in range(10):
q.put(i) #发消息
time.sleep(0.5)
print("end producer")
#创建消费者,消费者一般是个死循环,要一直监听是否有需要处理的信息。
def customer(q):
print("start customer")
while 1:
data = q.get() #收消息
print("customer has get value {0}".format(data))
if __name__ == '__main__':
q = Queue() #创建一个队列
pro = Thread(target=producer,args=(q,))
cus = Thread(target=customer,args=(q,))
pro.start()
cus.start()
结果为:
start producer
start customer
producer has produced value 0
customer has get value 0
producer has produced value 1
customer has get value 1
producer has produced value 2
customer has get value 2
producer has produced value 3
customer has get value 3
producer has produced value 4
customer has get value 4
producer has produced value 5
customer has get value 5
producer has produced value 6
customer has get value 6
producer has produced value 7
customer has get value 7
producer has produced value 8
customer has get value 8
producer has produced value 9
customer has get value 9
end producer
可见生产一个则消费一个。
2.通过Mutiprocess里面的Pipe来实现消息队列:
Pipe方法返回(conn1, conn2)代表一个管道的两个端。Pipe方法有duplex参数,如果duplex参数为True(默认值),那么这个管道是全双工模式,也就是说conn1和conn2均可收发。duplex为False,conn1只负责接受消息,conn2只负责发送消息。
send和recv方法分别是发送和接受消息的方法。
close方法表示关闭管道,当消息接受结束以后,关闭管道。
from multiprocessing import Pipe, Process
from threading import Thread
import time
def proc1(pipe):
for i in range(10):
print("send {0}".format(i))
pipe.send(i)
time.sleep(0.5)
print("end proc1")
def proc2(pipe):
n =10
while n:
print("proc2 recv {0}".format(pipe.recv()))
n -=1
if __name__ == '__main__':
(p1,p2) = Pipe(duplex=False)
pr = Process(target=proc1,args=(p2,))
cu = Process(target=proc2,args=(p1,))
pr.start()
cu.start()
结果为:
send 0
proc2 recv 0
send 1
proc2 recv 1
send 2
proc2 recv 2
send 3
proc2 recv 3
send 4
proc2 recv 4
send 5
proc2 recv 5
send 6
proc2 recv 6
send 7
proc2 recv 7
send 8
proc2 recv 8
send 9
proc2 recv 9
end proc1
3.Python提供了Queue模块来专门实现消息队列Queue对象
Queue对象实现一个fifo队列(其他的还有lifo、priority队列,这里不再介绍)。queue只有maxsize一个构造参数,用来指定队列容量,指定为0的时候代表容量无限。主要有以下成员函数:q.qsize() #返回当前队列的空间
q.empty() #判断当前队列是否为空
q.full() #判断当前队列是否满了
q.put() #发消息
q.get() #获取消息
q.task_done() #接受消息的线程条用该函数来说明消息对应的任务是否已经完成
q.join() #等待队列为空,再执行别的操作
二.Celery异步分布式
Celery是一个python开发的异步分布式任务调度模块。
Celery本身并不提供消息服务,使用第三方服务,也就是borker来传递任务,目前支持rebbimq,redis, 数据库等。
`这里我们使用redis
连接url的格式为:
redis://:password@hostname:port/db_number
例如:
BROKER_URL = 'redis://localhost:6379/0'
1.安装celery
pip install celerypip install redis
在服务器上安装redis服务器,并启动redis
第一个简单的例子:在任意路径下创建一个文件。
vim sixgod.py
#/usr/bin/env#coding=utf-8
from celery import Celery,platforms
platforms.C_FORCE_ROOT = True
broker = "redis://localhost:6379/5"
backend = "redis://localhost:6379/6"
app = Celery("sixgod",broker=broker,backend=backend)
@app.task
def add(x,y):
return x+y
2.启动worker
#celery -A sixgod worker -l info3.生产者
在pycharm中将刚才写的文件导入:
4.传入信息
from sixgod import add
re = add.delay(10,20)
运行之后观察服务器端情况:
5.获取
re.result #获取结果
re.ready) #是否处理
re.get #获取结果
re.id #获取id
from sixgod import add
re = add.delay(100,200)
print(re.id) #获取id
运行:
823220ed-abff-45cb-a5f4-42c53c4d33e9
6.登录redis查看
可以看到他们的id部分是相同的。
更多推荐
所有评论(0)