python~多进程简单实例
代码实例os.fork():linux系统上应用fork()函数,属于linux内建函数,并且只在Linux系统下存在。它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后分别在父进程和子进程内返回。子进程永远返回0,而父进程返回子进程的PID。这样做的理由是,一个父进程可以fork()出...
代码实例
os.fork():linux系统上应用
fork()函数,属于linux内建函数,并且只在Linux系统下存在。它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后分别在父进程和子进程内返回。子进程永远返回0,而父进程返回子进程的PID。这样做的理由是,一个父进程可以fork()出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID,子进程只需要调用os.getpid()函数可以获取自己的进程号。
#!/user/bin/python
#encoding=utf-8
import os
print (os.getpid()) #主进程的id
pid = os.fork() #创建一个子进程,只能在linux下运行
print (pid) #子进程id和0
if pid == 0:
print ('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
print ('I (%s) just created a child process (%s).' % (os.getpid(), pid))
#运行后可见效果,更容易理解
multiprocessing模块
multiprocessing模块就是跨平台版本的多进程管理包,支持子进程、通信和共享数据、执行不同形式的同步
Multiprocessing模块创建进程使用的是Process类
Process类的构造方法:
help(multiprocessing.Process)
__init__(self, group=None, target=None, name=None, args=(), kwargs={})
参数说明:
group:进程所属组,基本不用。
target:表示调用对象,一般为函数。
args:表示调用对象的位置参数元组。
name:进程别名。
kwargs:表示调用对象的字典。
创建进程
思路:
1.生成子进程,给它指定一个完成的任务(一个函数)
2.生成所有的子进程,并start
3.join一下所有的子进程,等待所有的子进程执行完毕了,在执行主进程的剩余代码
代码示例1:
#encoding:utf-8
import multiprocessing
def do(n) :
#获取当前线程的名字
name = multiprocessing.current_process().name
print(name,'starting')
print("worker ", n)
return
if __name__ == '__main__' :
numList = []
for i in range(5) :
p = multiprocessing.Process(target=do, args=(i,))
numList.append(p) #将子进程对象添加在列表中
p.start() #启动进程
#p.join() #p进程,通过join方法通知主进程死等我结束,再继续执行,相当于顺序执行了
for i in numList:
i.join() #每一个子进程执行结束后,开始下一个循环,待子进程全部执行完毕,再执行主进程
print("Process end.")
代码示例2:5个进程,新建5个文件,每个文件写100行自定义的文字
#encoding:utf-8
import multiprocessing
def writeContent(filepath):
with open(filepath,"w") as fp:
for i in range(100):
fp.write("glory"+str(i)+"\n")
if __name__=="__main__":
pList=[]
for i in range(1,6):
p=multiprocessing.Process(target=writeContent,args=("test"+str(i)+".txt",))
pList.append(p)
p.start()
for p in pList:
p.join()
print ("程序运行成功!")
os.fork() 和multiprocessing 的结合使用(linux环境下)
#!/usr/bin/python
# -*- coding: utf-8 -*-
from multiprocessing import Process
import os
import time
def sleeper(name, seconds):
print("Process ID# %s" % (os.getpid())) #获取当前进程
print("Parent Process ID# %s" % (os.getppid())) #获取当前进程的父进程
print("%s will sleep for %s seconds" % (name, seconds))
time.sleep(seconds)
child_proc = Process(target = sleeper, args = ('bob', 5))
child_proc.start()
print("in parent process after child process start")
print("parent process about to join child process")
child_proc.join()
print("in parent process after child process join" )
print("the parent's parent process: %s" % (os.getppid()))
多进程模板程序
#coding=utf-8
import multiprocessing
import urllib.request
import time
def func1(url) :
response = urllib.request.urlopen(url)
html = response.read()
print(html[0:20])
time.sleep(1)
def func2(url) :
response = urllib.request.urlopen(url)
html = response.read()
print(html[0:20])
time.sleep(1)
if __name__ == '__main__' :
p1 = multiprocessing.Process(target=func1,args=("http://www.sogou.com",),name="gloryroad1")
p2 = multiprocessing.Process(target=func2,args=("http://www.baidu.com",),name="gloryroad2")
p1.start()
p2.start()
p1.join()
p2.join()
time.sleep(1)
print("done!")
对比单进程和多进程程序执行的效率
#coding: utf-8
import multiprocessing
import time
def m1(x):
time.sleep(0.05)
return x * x
if __name__ == '__main__':
#多进程执行
pool = multiprocessing.Pool(multiprocessing.cpu_count()) #进程池,内核有几个就有几个进程
i_list = range(1000)
time1=time.time()
pool.map(m1, i_list)
time2=time.time()
print('time elapse:',time2-time1)
#单进程执行
time1=time.time()
list(map(m1, i_list))
time2=time.time()
print('time elapse:',time2-time1)
进程池
在使用Python进行系统管理时,特别是同时操作多个文件目录或者远程控制多台主机,并行操作可以节约大量的时间。如果操作的对象数目不大时,还可以直接使用Process类动态的生成多个进程,十几个还好,但是如果上百个甚至更多,那手动去限制进程数量就显得特别的繁琐,此时进程池就派上用场了。
Pool类可以提供指定数量的进程供用户调用,当有新的请求提交到Pool中时,如果池还没有满,就会创建一个新的进程来执行请求。如果池满,请求就会告知先等待,直到池中有进程结束,才会创建新的进程来执行这些请求。
Pool类的方法
apply_async()
函数原型:apply_async(func, args=(), kwds={}, callback=None, error_callback=None)
异步非阻塞的,不用等待当前进程执行完毕,随时根据系统调度来进行进程切换(建议使用)
注意:apply_async每次只能提交一个进程的请求
map()
函数原型:map(func, iterable[, chunksize=None])
Pool类中的map方法,与内置的map函数用法行为基本一致,它会使进程阻塞直到返回结果。
注意,虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程。
close()
关闭进程池(Pool),使其不再接受新的任务
terminate()
立刻结束工作进程,不再处理未处理的任务
join()
使主进程阻塞等待子进程的退出,join方法必须在close或terminate之后使用
multiprocessing.cpu_count()
获取cup的核数
创建简单的进程池
代码示例一:使用进程池创建进程+map函数
#encoding:utf-8
import multiprocessing
import os
import time
import random
def func(x):
time.sleep(random.random()*4)
print ("pid:",os.getpid(),x*x)
#print (os.getppid())
return x*x
if __name__=="__main__":
pool=multiprocessing.Pool(multiprocessing.cpu_count())
i_list=range(8)
print (pool.map(func,i_list))
代码示例二:apply_async() 只能创建一个任务且执行一次
#encoding=utf-8
from multiprocessing import Pool
def f(x):
return x * x
if __name__ == '__main__':
pool = Pool(processes = 4) # start 4 worker processes
result = pool.apply_async(f, [10]) # 只能创建一个任务,且执行一次,可以传多个参数,根据函数的参数来定
print(result) #ApplyResult object
print(result.get(timeout = 1)) #获取值
print(pool.map(f, range(10)))
代码示例三:实现给pool.map中的函数传递多个参数
#encoding=utf-8
from multiprocessing import Pool
“实现思想:定义一个类,有多个实例变量;然后定义一个函数,参数为类的实例,最后对类实例的实例变量进行运算”
def f(object):
return object.x * object.y
class A:
def __init__(self,a,b):
self.x =a
self.y =b
if __name__ == '__main__':
pool = Pool(processes = 4) # start 4 worker processes
params = [A(i,i) for i in range(10)] #推导列表生成实例列表
#print (params) #打印实例列表
print(pool.map(f,params))
代码示例四:多进程与单进程执行时间比较
#encoding=utf-8
import time
from multiprocessing import Pool
def run(fn):
#fn: 函数参数是数据列表的一个元素
time.sleep(1)
return fn * fn
if __name__ == "__main__":
testFL = [1,2,3,4,5,6] #testFL:要处理的数据列表
#顺序执行花费的时间
print ('Single process execution sequence:')
s = time.time()
for fn in testFL:
run(fn)
e1 = time.time()
print (u"顺序执行时间:", int(e1 - s))
#创建多个进程,并行执行花费的时间
print ('concurrent:')
pool = Pool(5) #创建拥有5个进程数量的进程池,假设核数就是4个,轮询处理4个,
rl =pool.map(run, testFL)
pool.close() #关闭进程池,不再接受新的任务
pool.join() #主进程阻塞等待子进程的退出
e2 = time.time()
print (u"并行执行时间:", int(e2 - e1))
进程同步(Queue和JoinableQueue)
multiprocessing.Queue类似于queue.Queue,一般用来多个进程间交互信息。Queue是进程和线程安全的。它实现queue.Queue的大部分方法,但task_done()和join()没有实现。
multiprocessing.JoinableQueue是multiprocessing.Queue的子类,增加了task_done()方法和join()方法。
task_done():用来告诉queue一个task完成。一般在调用get()时获得一个task,在task结束后调用task_done()来通知Queue当前task完成。
join():阻塞直到queue中的所有的task都被处理(即task_done方法被调用)。
使用Queue
代码示例一:
#encoding=utf-8
from multiprocessing import Process, Queue
def offer(queue):
"入队列"
if queue.empty():
queue.put("Hello World")
else:
print(queue.get())
if __name__ == '__main__':
# 创建一个队列实例
q = Queue()
p = Process(target = offer, args = (q,))
p.start()
print(q.get()) # 出队列
q.put("tester")
m = Process(target = offer, args = (q,))
m.start()
p.join()
m.join()
代码示例二:
#encoding=utf-8
from multiprocessing import Process, Queue
import os, time, random
#写数据进程执行的代码:
def write(q):
for value in ['A', 'B', 'C']:
print('Put %s to queue...' % value)
q.put(value)
time.sleep(random.random())
# 读数据进程执行的代码
def read(q):
time.sleep(1)
while not q.empty(): #if not q.empty():
print('Get %s from queue.' % q.get(True))
time.sleep(1) #目的是等待写队列完成
if __name__=='__main__':
# 父进程创建Queue,并传给各个子进程
q = Queue()
pw = Process(target = write, args = (q,))
pr = Process(target = read, args = (q,))
pw.start() #启动子进程pw,写入
pr.start() #启动子进程pr,读取
pw.join() #等待pw结束
pr.join() #等待pr结束
print("Done!")
#由于操作系统对进程的调度时间不一样,所以该程序每次执行的结果均不一样。程序读队列函数中为什么要加一句time.sleep(1),
目的是等待写进程将数据写到队列中,防止有时写进程还没将数据写进队列,读进程就开始读了,导致读不到数据。但是这种并不能有效的预防此种情况的出现。
使用Queue&JoinableQueue
#encoding=utf-8
import multiprocessing
import time
class Consumer(multiprocessing.Process):
def __init__(self, task_queue, result_queue):
multiprocessing.Process.__init__(self)
self.task_queue = task_queue #任务队列
self.result_queue = result_queue #存储结果的结果队列
def run(self):
"重写原进程的run方法"
proc_name = self.name #取到当前进程的名字
while True:
next_task = self.task_queue.get()
if next_task is None: #为None时进程退出
print(('%s: Exiting' % proc_name))
self.task_queue.task_done()
break
print(('%s: %s' % (proc_name, next_task))) #打印next_task,自动调用的__str__方法
answer = next_task()
self.task_queue.task_done() #JoinableQueue特有的一定要执行,返回
self.result_queue.put(answer)
return
class Task(object):
def __init__(self, a, b):
self.a = a
self.b = b
def __call__(self):
time.sleep(0.1) # pretend to take some time to do the work
return '%s * %s = %s' % (self.a, self.b, self.a * self.b)
def __str__(self):
return '%s * %s' % (self.a, self.b)
if __name__ == '__main__':
tasks = multiprocessing.JoinableQueue() #任务队列
results = multiprocessing.Queue() #存结果队列
num_consumers = multiprocessing.cpu_count()
print(('Creating %d consumers' % num_consumers))
# 创建cup核数量数量个的子进程,使用推导列表生成子进程列表
consumers = [ Consumer(tasks, results) for i in range(num_consumers) ]
# 依次启动子进程
#print (consumers)
for w in consumers:
w.start()
num_jobs = 10
for i in range(num_jobs):
tasks.put(Task(i, i))
for i in range(num_consumers):
tasks.put(None) #有几个进程加几个None,以此结束死循环
tasks.join()
while num_jobs:
result = results.get()
print ('Result: %s' %result)
num_jobs -= 1
名词解释
线程安全:通过加锁的方式避免引起读写操作的冲突,确保数据的一致性(洗手间的例子)
线程不安全:与之相反,会造成数据的混乱
进程同步(加锁)
先看一下未加锁的实例
#未加锁处理,py2环境下运行,会出现打印数据错乱的情况
#encoding=utf-8
from multiprocessing import Process
import time
def l(num):
time.sleep(0.2)
print("Hello Num: %s" % (num))
if __name__ == '__main__':
for num in range(50):
Process(target = l, args = (num,)).start()
加锁后的实例(Lock)
#不会出现上述错乱的情况
#encoding=utf-8
from multiprocessing import Process, Lock
import time
def l(num,lock):
lock.acquire() # 获得锁
time.sleep(0.2)
print("Hello Num: %s" % (num))
lock.release() # 释放锁
if __name__ == '__main__':
lock = Lock() # 创建一个共享锁实例
for num in range(50):
Process(target = l, args = (num,lock)).start()
加多把锁(Semaphore)
Semaphore用于控制对共享资源的访问数量。Semaphore锁和Lock稍有不同,Semaphore相当于N把锁,获取其中一把就可以执行。可用锁的总数N在创建实例时传入,比如s = Semaphore(n)。与Lock一样,如果可用锁为0,进程将会阻塞,直到可用锁大于0。
#encoding=utf-8
import multiprocessing
import time
def worker(s, i):
s.acquire()
print(multiprocessing.current_process().name + " acquire")
time.sleep(i)
print(multiprocessing.current_process().name + " release")
s.release()
if __name__ == "__main__":
# 设置限制最多3个进程同时访问共享资源
s = multiprocessing.Semaphore(3)
for i in range(5):
p = multiprocessing.Process(target = worker, args = (s, i * 2))
p.start()
进程同步(信号传递-Event)
Event提供一种简单的方法,可以在进程间传递状态信息,实现进程间同步通信。事件可以切换设置和未设置状态。通过使用一个可选的超时值,时间对象的用户可以等待其状态从未设置变为设置
#encoding=utf-8
import multiprocessing
import time
def wait_for_event(e):
print('wait_for_event: starting')
e.wait() #等待收到能执行信号,如果一直未收到将一直阻塞
print('wait_for_event: e.is_set()->', e.is_set())
def wait_for_event_timeout(e, t):
print('wait_for_event_timeout: starting')
e.wait(t) #等待t秒超时,此时Event的状态仍未未设置,继续执行
print('wait_for_event_timeout: e.is_set()->', e.is_set())
e.set() #初始内部标志为真
if __name__ == '__main__':
e = multiprocessing.Event()
print("begin,e.is_set()", e.is_set())
w1 = multiprocessing.Process(name='block', target=wait_for_event, args=(e,))
w1.start()
w2 = multiprocessing.Process(name='nonblock', target=wait_for_event_timeout,args=(e, 2))
w2.start()
print('main: waiting before calling Event.set()')
time.sleep(3)
#e.set() #可注释此句话看效果
print('main: event is set')
代码逻辑:
函数a-wait_for_event:e.wait():死等,等别的进程调用e.set(),才会继续执行
函数b-wait_for_event_timeout:e.wait(t):死等t秒,等别的进程调用e.set()或经过t秒,才会继续执行,继续执行了,就会调用e.set(),实现函数a就可以从死等状态变为继续执行执行状态
主程序:
声明了一个信号对象e
启动了进程a,执行函数a
启动了进程b,执行函数b
支持多个进程,待听到指令后死等的所有进程全部不用等了,比如:田径比赛,所有运动员听到发令枪声,就全都跑了
#encoding=utf-8
import multiprocessing
import time
def wait_for_event(e):
print('wait_for_event: starting')
e.wait()
print('wait_for_event: e.is_set()->', e.is_set())
def wait_for_event_timeout(e, t):
print('wait_for_event_timeout: starting')
e.wait(t)
print('wait_for_event_timeout: e.is_set()->', e.is_set())
e.set()
if __name__ == '__main__':
e = multiprocessing.Event()
print("begin,e.is_set()", e.is_set())
w1 = multiprocessing.Process(name='block', target=wait_for_event, args=(e,))
w1.start()
w2 = multiprocessing.Process(name='block', target=wait_for_event, args=(e,))
w2.start()
w3 = multiprocessing.Process(name='nonblock', target=wait_for_event_timeout,args=(e, 2))
w3.start()
print('main: waiting before calling Event.set()')
time.sleep(3)
#e.set() #可注释此句话看效果
print('main: event is set')
进程同步(使用管道-Pipe)
Pipe是两个进程间通信的工具。Pipe可以是单向(half-duplex),也可以是双向(duplex)。我们通过mutiprocessing.Pipe(duplex=False)创建单向管道 (默认为双向)。一个进程从PIPE一端输入对象,然后被PIPE另一端的进程接收,单向管道只允许管道一端的进程输入,而双向管道则允许从两端输入。
Pipe的每个端口同时最多一个进程读写,否则会出现各种问题,可能造成corruption异常。
Pipe对象建立的时候,返回一个含有两个元素的元组对象,每个元素代表Pipe的一端(Connection对象)。我们对Pipe的某一端调用send()方法来传送对象,在另一端使用recv()来接收。
#encoding=utf-8
import multiprocessing as mp
def proc_1(pipe):
pipe.send('hello')
print('proc_1 received: %s' %pipe.recv())
pipe.send("what is your name?")
print('proc_1 received: %s' %pipe.recv())
def proc_2(pipe):
print('proc_2 received: %s' %pipe.recv())
pipe.send('hello, too')
print('proc_2 received: %s' %pipe.recv())
pipe.send("I don't tell you!")
if __name__ == '__main__':
# 创建一个管道对象pipe
pipe = mp.Pipe()
#print(len(pipe))
#print(type(pipe))
#print (pipe)
# 将第一个pipe对象传给进程1
p1 = mp.Process(target = proc_1, args = (pipe[0], ))
# 将第二个pipe对象传给进程2
p2 = mp.Process(target = proc_2, args = (pipe[1], ))
p1.start()
p2.start()
p1.join()
p2.join()
代码实例二:将获取的信息写入文件
#encoding=utf-8
import multiprocessing as mp
from multiprocessing import Process, Lock
def write_file(content,lock,file_path="a.txt"):
lock.acquire()
with open(file_path,"a") as fp:
fp.write(content+"\n")
lock.release()
def proc_1(pipe,lock):
pipe.send('hello')
info = pipe.recv()
print('proc_1 received: %s' %info)
write_file(info,lock)
pipe.send("what is your name?")
info = pipe.recv()
print('proc_1 received: %s' %info)
write_file(info,lock)
def proc_2(pipe,lock):
info = pipe.recv()
print('proc_2 received: %s' %info)
write_file(info,lock)
pipe.send('hello, too')
info = pipe.recv()
write_file(info,lock)
print('proc_2 received: %s' %info)
pipe.send("I don't tell you!")
if __name__ == '__main__':
pipe = mp.Pipe()
lock=Lock()
# 将第一个pipe对象传给进程1
p1 = mp.Process(target = proc_1, args = (pipe[0],lock ))
# 将第二个pipe对象传给进程2
p2 = mp.Process(target = proc_2, args = (pipe[1],lock ))
p1.start()
p2.start()
p1.join()
p2.join()
进程同步(使用Condition)
一个condition变量总是与某些类型的锁相联系,当几个condition变量必须共享同一个锁的时候,是很有用的。锁是conditon对象的一部分:没有必要分别跟踪
condition变量服从上下文管理协议:with语句块封闭之前可以获取与锁的联系。 acquire() 和release() 会调用与锁相关联的相应方法
wait()方法会释放锁,当另外一个进程使用notify() or notify_all()唤醒它之前会一直阻塞。一旦被唤醒,wait()会重新获得锁并返回;
wait(timeout=None) :等待通知,或者等到设定的超时时间;
notify()和notify_all()不会释放锁;
#encoding=utf-8
import multiprocessing as mp
import threading
import time
def consumer(cond):
with cond:
print("consumer before wait")
cond.wait() #等待消费,死等
#cond.wait(timeout=10) #等待10秒后还未消费自动结束
print("consumer after wait")
def producer(cond):
with cond:
print("producer before notifyAll")
cond.notify_all() #通知所有消费者可以消费了
#cond.notify() #只能通知一个消费者,想通知几个人就写几次
print("producer after notifyAll")
if __name__ == '__main__':
condition = mp.Condition()
p1 = mp.Process(name = "p1", target = consumer, args=(condition,))
p2 = mp.Process(name = "p2", target = consumer, args=(condition,))
p3 = mp.Process(name = "p3", target = producer, args=(condition,))
p1.start()
time.sleep(2)
p2.start()
time.sleep(2)
p3.start()
多进程间共享数字变量
未使用共享变量
#数据在子进程运行的时候被改变;但是主进程打印的数据仍为原值,互相独立不共享
#encoding=utf-8
from multiprocessing import Process
def f(n, a):
n = n+1
for i in range(len(a)):
a[i] = -a[i]
if __name__ == '__main__':
num = 0
arr =list(range(10))
p = Process(target = f, args = (num, arr))
p.start()
p.join()
print(num)
print(arr[:])
使用共享变量
#encoding=utf-8
from multiprocessing import Process, Value, Array
def f(n, a):
n.value = n.value+1
for i in range(len(a)):
a[i] = -a[i]
if __name__ == '__main__':
num = Value('d', 0.0) # 参数"d"(double简写)指定参数类型,创建一个进程间共享的数字类型,默认值为0
arr = Array('i', range(10)) # 创建一个进程间共享的数组类型,初始值为range[10]
p = Process(target = f, args = (num, arr))
p.start()
p.join()
#此时num.value为1.0,arr[:]为[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
p = Process(target = f, args = (num, arr))
p.start()
p.join()
print(num.value) # 获取共享变量num的值
print(arr[:])
使用共享变量、加锁
#encoding=utf-8
import time
from multiprocessing import Process, Value, Lock
class Counter(object):
def __init__(self, initval = 0):
self.val = Value('i', initval)
self.lock = Lock()
def increment(self):
“如果此处不对共享变量加锁的话,多个进程同时运行的结果每次都不一样”
with self.lock:
self.val.value += 1 # 共享变量自加1
#print(“increment one time!”,self.value() )
def value(self):
with self.lock:
return self.val.value
def func(counter):
for i in range(50):
time.sleep(0.01)
counter.increment()
if __name__ == '__main__':
counter = Counter(0)
procs = [Process(target = func, args = (counter,)) for i in range(10)]
# 等价于
# for i in range(10):
# Process(target = func, args = (counter,))
for p in procs:
p.start()
for p in procs:
p.join()
print(counter.value())
多进程间共享字符串变量
Manager()函数返回一个管理对象,它控制了一个服务端进程,用来保持Python对象,并允许其它进程使用代理来管理这些对象。
#encoding=utf-8
from multiprocessing import Process, Manager, Value
from ctypes import c_char_p
def greet(shareStr):
shareStr.value = shareStr.value + ", World!"
if __name__ == '__main__':
manager = Manager()
shareStr = manager.Value(c_char_p, "Hello")
processList = [Process(target = greet, args = (shareStr,)) for i in range(3)]
for process in processList:
process.start()
process.join()
print(shareStr.value)
多进程间共享字典和列表
#encoding=utf-8
from multiprocessing import Process, Manager
def f(shareDict, shareList):
shareDict[1] = '1'
shareDict['2'] = 2
shareDict[0.25] = None
shareList.reverse() # 翻转列表
if __name__ == '__main__':
manager = Manager()
shareDict = manager.dict() # 创建共享的字典类型
shareList = manager.list(range(10)) # 创建共享的列表类型
p = Process(target = f, args = (shareDict, shareList))
p.start()
p.join()
print(shareDict)
print(shareList)
多进程间共享实例
#encoding=utf-8
import time, os
import random
from multiprocessing import Pool, Value, Lock
from multiprocessing.managers import BaseManager
class MyManager(BaseManager):
pass
def Manager():
m = MyManager()
m.start()
return m
class Counter(object):
def __init__(self, initval=0):
self.val = Value('i', initval)
self.lock = Lock()
def increment(self):
with self.lock:
self.val.value += 1
def value(self):
with self.lock:
return self.val.value
#将Counter类注册到Manager管理类中
MyManager.register('Counter', Counter)
def long_time_task(name,counter):
time.sleep(0.2)
print('Run task %s (%s)...\n' % (name, os.getpid()))
start = time.time()
#time.sleep(random.random() * 3)
for i in range(50):
time.sleep(0.01)
counter.increment()
end = time.time()
print('Task %s runs %0.2f seconds.' % (name, (end - start)))
if __name__ == '__main__':
manager = Manager()
# 创建共享Counter类实例对象的变量,Counter类的初始值0
counter = manager.Counter(0)
print('Parent process %s.' % os.getpid())
p = Pool()
for i in range(5):
p.apply_async(long_time_task, args = (str(i), counter))
print('Waiting for all subprocesses done...')
p.close()
p.join()
print('All subprocesses done.')
print(counter.value())
逻辑思路:
继承BaseManager类,生成MyManager子类
定义一个Manager函数,对MyManager类进行实例化,并且启动MyManager的进程,返回MyManager类的实例对象
定义一个Counter类
MyManager.register('Counter', Counter) 将Counter类注册到Manager管理类中
定义多进程执行的任务函数,定义的类实例作为参数传入到任务函数中
在主程序中,定义进程池,for循环启动多个进程,并且每个进程都执行的是之前定义的任务函数
关闭进程池,等待子进程结束后再打印共享实例中的value值
更多推荐
所有评论(0)