代码实例

os.fork():linux系统上应用

fork()函数,属于linux内建函数,并且只在Linux系统下存在。它非常特殊。普通的函数调用,调用一次,返回一次,但是fork()调用一次,返回两次,因为操作系统自动把当前进程(称为父进程)复制了一份(称为子进程),然后分别在父进程和子进程内返回。子进程永远返回0,而父进程返回子进程的PID。这样做的理由是,一个父进程可以fork()出很多子进程,所以,父进程要记下每个子进程的ID,而子进程只需要调用getppid()就可以拿到父进程的ID,子进程只需要调用os.getpid()函数可以获取自己的进程号。

#!/user/bin/python

#encoding=utf-8

import os

print (os.getpid())      #主进程的id
pid = os.fork()          #创建一个子进程,只能在linux下运行
print (pid)              #子进程id和0
if pid == 0:
  print ('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
  print ('I (%s) just created a child process (%s).' % (os.getpid(), pid))

#运行后可见效果,更容易理解


multiprocessing模块

multiprocessing模块就是跨平台版本的多进程管理包,支持子进程、通信和共享数据、执行不同形式的同步

Multiprocessing模块创建进程使用的是Process类

Process类的构造方法:

help(multiprocessing.Process)

__init__(self, group=None, target=None, name=None, args=(), kwargs={})

参数说明:

group:进程所属组,基本不用。

target:表示调用对象,一般为函数。

args:表示调用对象的位置参数元组。

name:进程别名。

kwargs:表示调用对象的字典。

创建进程

思路:

1.生成子进程,给它指定一个完成的任务(一个函数)

2.生成所有的子进程,并start

3.join一下所有的子进程,等待所有的子进程执行完毕了,在执行主进程的剩余代码

代码示例1:

#encoding:utf-8

import multiprocessing

def do(n) :
  #获取当前线程的名字
  name = multiprocessing.current_process().name
  print(name,'starting')
  print("worker ", n)
  return

if __name__ == '__main__' :
  numList = []
  for i in range(5) :
    p = multiprocessing.Process(target=do, args=(i,))
    numList.append(p)    #将子进程对象添加在列表中
    p.start()            #启动进程
    #p.join()            #p进程,通过join方法通知主进程死等我结束,再继续执行,相当于顺序执行了

  for i in numList:
    i.join()             #每一个子进程执行结束后,开始下一个循环,待子进程全部执行完毕,再执行主进程

  print("Process end.")

代码示例2:5个进程,新建5个文件,每个文件写100行自定义的文字

#encoding:utf-8
import multiprocessing

def writeContent(filepath):
    with open(filepath,"w")  as  fp:
        for i in range(100):
            fp.write("glory"+str(i)+"\n")

if __name__=="__main__":
    pList=[]
    for i in range(1,6):
        p=multiprocessing.Process(target=writeContent,args=("test"+str(i)+".txt",))
        pList.append(p)
        p.start()

    for p in pList:
        p.join()

    print ("程序运行成功!")

os.fork() 和multiprocessing 的结合使用(linux环境下)

#!/usr/bin/python

# -*- coding: utf-8 -*-

from multiprocessing import Process  
import os  
import time  

def sleeper(name, seconds):  
    print("Process ID# %s" % (os.getpid()))  #获取当前进程
    print("Parent Process ID# %s" % (os.getppid()))  #获取当前进程的父进程
    print("%s will sleep for %s seconds" % (name, seconds))  
    time.sleep(seconds)

child_proc = Process(target = sleeper, args = ('bob', 5))  
child_proc.start()  
print("in parent process after child process start")  
print("parent process about to join child process")
child_proc.join()
print("in parent process after child process join" )
print("the parent's parent process: %s" % (os.getppid()))


多进程模板程序

#coding=utf-8

import multiprocessing
import urllib.request
import time

def func1(url) :
  response = urllib.request.urlopen(url)
  html = response.read()
  print(html[0:20])
  time.sleep(1)

def func2(url) :
  response = urllib.request.urlopen(url)
  html = response.read()
  print(html[0:20])
  time.sleep(1)

if __name__ == '__main__' :
    p1 = multiprocessing.Process(target=func1,args=("http://www.sogou.com",),name="gloryroad1")
    p2 = multiprocessing.Process(target=func2,args=("http://www.baidu.com",),name="gloryroad2")

    p1.start()
    p2.start()
    p1.join()
    p2.join()
    time.sleep(1)
    print("done!")

对比单进程和多进程程序执行的效率

#coding: utf-8

import multiprocessing
import time

def m1(x):
    time.sleep(0.05)
    return x * x

if __name__ == '__main__':
    #多进程执行
    pool = multiprocessing.Pool(multiprocessing.cpu_count())  #进程池,内核有几个就有几个进程
    i_list = range(1000)
    time1=time.time()
    pool.map(m1, i_list)
    time2=time.time()
    print('time elapse:',time2-time1)

    #单进程执行
    time1=time.time()
    list(map(m1, i_list))
    time2=time.time()
    print('time elapse:',time2-time1)

进程池

在使用Python进行系统管理时,特别是同时操作多个文件目录或者远程控制多台主机,并行操作可以节约大量的时间。如果操作的对象数目不大时,还可以直接使用Process类动态的生成多个进程,十几个还好,但是如果上百个甚至更多,那手动去限制进程数量就显得特别的繁琐,此时进程池就派上用场了。

Pool类可以提供指定数量的进程供用户调用,当有新的请求提交到Pool中时,如果池还没有满,就会创建一个新的进程来执行请求。如果池满,请求就会告知先等待,直到池中有进程结束,才会创建新的进程来执行这些请求。

Pool类的方法

apply_async()
函数原型:apply_async(func, args=(), kwds={}, callback=None, error_callback=None)
异步非阻塞的,不用等待当前进程执行完毕,随时根据系统调度来进行进程切换(建议使用)
注意:apply_async每次只能提交一个进程的请求

map()

函数原型:map(func, iterable[, chunksize=None])

Pool类中的map方法,与内置的map函数用法行为基本一致,它会使进程阻塞直到返回结果。

注意,虽然第二个参数是一个迭代器,但在实际使用中,必须在整个队列都就绪后,程序才会运行子进程。

close()

关闭进程池(Pool),使其不再接受新的任务

terminate()

立刻结束工作进程,不再处理未处理的任务

join()

使主进程阻塞等待子进程的退出,join方法必须在close或terminate之后使用

multiprocessing.cpu_count()

获取cup的核数

创建简单的进程池

代码示例一:使用进程池创建进程+map函数

#encoding:utf-8

import multiprocessing
import os
import time
import random

def func(x):
    time.sleep(random.random()*4)
    print ("pid:",os.getpid(),x*x)
    #print (os.getppid())
    return x*x

if __name__=="__main__":
    pool=multiprocessing.Pool(multiprocessing.cpu_count())
    i_list=range(8)
    print (pool.map(func,i_list))

代码示例二:apply_async()  只能创建一个任务且执行一次

#encoding=utf-8

from multiprocessing import Pool

def f(x):
    return x * x

if __name__ == '__main__':
    pool = Pool(processes = 4)      # start 4 worker processes
    result = pool.apply_async(f, [10])  # 只能创建一个任务,且执行一次,可以传多个参数,根据函数的参数来定
    print(result)  #ApplyResult object
    print(result.get(timeout = 1))   #获取值
    print(pool.map(f, range(10)))   

代码示例三:实现给pool.map中的函数传递多个参数

#encoding=utf-8

from multiprocessing import Pool

“实现思想:定义一个类,有多个实例变量;然后定义一个函数,参数为类的实例,最后对类实例的实例变量进行运算”
def f(object):
    return object.x * object.y

class A:

    def __init__(self,a,b):
        self.x =a
        self.y =b

if __name__ == '__main__':
    pool = Pool(processes = 4)      # start 4 worker processes
    params = [A(i,i) for i in range(10)]      #推导列表生成实例列表
    #print (params)  #打印实例列表
    print(pool.map(f,params))   

代码示例四:多进程与单进程执行时间比较

#encoding=utf-8

import time
from multiprocessing import Pool

def run(fn):
  #fn: 函数参数是数据列表的一个元素
  time.sleep(1)
  return fn * fn

if __name__ == "__main__":
  testFL = [1,2,3,4,5,6]   #testFL:要处理的数据列表

  #顺序执行花费的时间
  print ('Single process execution sequence:')
  s = time.time()
  for fn in testFL:
    run(fn)
  e1 = time.time()
  print (u"顺序执行时间:", int(e1 - s))

  #创建多个进程,并行执行花费的时间
  print ('concurrent:')
  pool = Pool(5)       #创建拥有5个进程数量的进程池,假设核数就是4个,轮询处理4个,
  rl =pool.map(run, testFL)
  pool.close()         #关闭进程池,不再接受新的任务
  pool.join()          #主进程阻塞等待子进程的退出
  e2 = time.time()
  print (u"并行执行时间:", int(e2 - e1))

进程同步(Queue和JoinableQueue)

multiprocessing.Queue类似于queue.Queue,一般用来多个进程间交互信息。Queue是进程和线程安全的。它实现queue.Queue的大部分方法,但task_done()和join()没有实现。

multiprocessing.JoinableQueue是multiprocessing.Queue的子类,增加了task_done()方法和join()方法。

task_done():用来告诉queue一个task完成。一般在调用get()时获得一个task,在task结束后调用task_done()来通知Queue当前task完成。

join():阻塞直到queue中的所有的task都被处理(即task_done方法被调用)。

使用Queue

代码示例一:

#encoding=utf-8

from multiprocessing import Process, Queue  

def offer(queue):  
  "入队列"
  if queue.empty():
    queue.put("Hello World")  
  else:
    print(queue.get())

if __name__ == '__main__':  
  # 创建一个队列实例
  q = Queue()
  p = Process(target = offer, args = (q,))  
  p.start()  
  print(q.get())  # 出队列
  q.put("tester")

  m = Process(target = offer, args = (q,))
  m.start()
  p.join()
  m.join()

代码示例二:

#encoding=utf-8

from multiprocessing import Process, Queue
import os, time, random

#写数据进程执行的代码:

def write(q):
  for value in ['A', 'B', 'C']:
    print('Put %s to queue...' % value)
    q.put(value)
    time.sleep(random.random())

# 读数据进程执行的代码
def read(q):
  time.sleep(1)
  while not q.empty():   #if not q.empty():
    print('Get %s from queue.' % q.get(True))
    time.sleep(1)        #目的是等待写队列完成

if __name__=='__main__':
    # 父进程创建Queue,并传给各个子进程
    q = Queue()
    pw = Process(target = write, args = (q,))
    pr = Process(target = read, args = (q,))
    pw.start()       #启动子进程pw,写入
    pr.start()       #启动子进程pr,读取
    pw.join()        #等待pw结束
    pr.join()        #等待pr结束
    print("Done!")


#由于操作系统对进程的调度时间不一样,所以该程序每次执行的结果均不一样。程序读队列函数中为什么要加一句time.sleep(1),
目的是等待写进程将数据写到队列中,防止有时写进程还没将数据写进队列,读进程就开始读了,导致读不到数据。但是这种并不能有效的预防此种情况的出现。

使用Queue&JoinableQueue

#encoding=utf-8

import multiprocessing
import time

class Consumer(multiprocessing.Process):

    def __init__(self, task_queue, result_queue):
        multiprocessing.Process.__init__(self)
        self.task_queue = task_queue               #任务队列
        self.result_queue = result_queue           #存储结果的结果队列

    def run(self):  
        "重写原进程的run方法"
        proc_name = self.name                      #取到当前进程的名字
        while True:
            next_task = self.task_queue.get()   
            if next_task is None:                  #为None时进程退出
                print(('%s: Exiting' % proc_name))
                self.task_queue.task_done()
                break
            print(('%s: %s' % (proc_name, next_task)))  #打印next_task,自动调用的__str__方法
            answer = next_task()                
            self.task_queue.task_done()                 #JoinableQueue特有的一定要执行,返回
            self.result_queue.put(answer)
        return


class Task(object):

    def __init__(self, a, b):
        self.a = a
        self.b = b

    def __call__(self):
        time.sleep(0.1) # pretend to take some time to do the work
        return '%s * %s = %s' % (self.a, self.b, self.a * self.b)

    def __str__(self):
        return '%s * %s' % (self.a, self.b)

if __name__ == '__main__':
    tasks = multiprocessing.JoinableQueue()   #任务队列
    results = multiprocessing.Queue()         #存结果队列
    num_consumers = multiprocessing.cpu_count()
    print(('Creating %d consumers' % num_consumers))
    # 创建cup核数量数量个的子进程,使用推导列表生成子进程列表
    consumers = [ Consumer(tasks, results) for i in range(num_consumers) ]
    # 依次启动子进程
    #print (consumers)
    for w in consumers:
        w.start()

    num_jobs = 10
    for i in range(num_jobs):
        tasks.put(Task(i, i))

    for i in range(num_consumers):
        tasks.put(None)                #有几个进程加几个None,以此结束死循环

    tasks.join()

    while num_jobs:
        result = results.get()
        print ('Result: %s' %result)
        num_jobs -= 1

名词解释

线程安全:通过加锁的方式避免引起读写操作的冲突,确保数据的一致性(洗手间的例子)

线程不安全:与之相反,会造成数据的混乱

进程同步(加锁)

先看一下未加锁的实例

#未加锁处理,py2环境下运行,会出现打印数据错乱的情况

#encoding=utf-8

from multiprocessing import Process
import time

def l(num):  
  time.sleep(0.2)
  print("Hello Num: %s" % (num))

if __name__ == '__main__':  
    for num in range(50):  
      Process(target = l, args = (num,)).start()

加锁后的实例(Lock)

#不会出现上述错乱的情况

#encoding=utf-8

from multiprocessing import Process, Lock  
import time

def l(num,lock):  
  lock.acquire() # 获得锁
  time.sleep(0.2)
  print("Hello Num: %s" % (num))  
  lock.release() # 释放锁

if __name__ == '__main__':  
    lock = Lock()  # 创建一个共享锁实例
    for num in range(50):  
      Process(target = l, args = (num,lock)).start()

加多把锁(Semaphore)

Semaphore用于控制对共享资源的访问数量。Semaphore锁和Lock稍有不同,Semaphore相当于N把锁,获取其中一把就可以执行。可用锁的总数N在创建实例时传入,比如s = Semaphore(n)。与Lock一样,如果可用锁为0,进程将会阻塞,直到可用锁大于0。

#encoding=utf-8

import multiprocessing
import time

def worker(s, i):
  s.acquire()
  print(multiprocessing.current_process().name + " acquire")
  time.sleep(i)
  print(multiprocessing.current_process().name + " release")
  s.release()

if __name__ == "__main__":
  # 设置限制最多3个进程同时访问共享资源
  s = multiprocessing.Semaphore(3)
  for i in range(5):
    p = multiprocessing.Process(target = worker, args = (s, i * 2))
    p.start()

进程同步(信号传递-Event)

Event提供一种简单的方法,可以在进程间传递状态信息,实现进程间同步通信。事件可以切换设置和未设置状态。通过使用一个可选的超时值,时间对象的用户可以等待其状态从未设置变为设置

#encoding=utf-8

import multiprocessing
import time

def wait_for_event(e):
    print('wait_for_event: starting')
    e.wait()                                #等待收到能执行信号,如果一直未收到将一直阻塞
    print('wait_for_event: e.is_set()->', e.is_set())

def wait_for_event_timeout(e, t):
    print('wait_for_event_timeout: starting')
    e.wait(t)                               #等待t秒超时,此时Event的状态仍未未设置,继续执行
    print('wait_for_event_timeout: e.is_set()->', e.is_set())
    e.set()                                 #初始内部标志为真

if __name__ == '__main__':
    e = multiprocessing.Event()
    print("begin,e.is_set()", e.is_set())
    w1 = multiprocessing.Process(name='block', target=wait_for_event, args=(e,))
    w1.start()

    w2 = multiprocessing.Process(name='nonblock', target=wait_for_event_timeout,args=(e, 2))
    w2.start()
    print('main: waiting before calling Event.set()')
    time.sleep(3)
    #e.set()   #可注释此句话看效果
    print('main: event is set')

代码逻辑:
函数a-wait_for_event:e.wait():死等,等别的进程调用e.set(),才会继续执行
函数b-wait_for_event_timeout:e.wait(t):死等t秒,等别的进程调用e.set()或经过t秒,才会继续执行,继续执行了,就会调用e.set(),实现函数a就可以从死等状态变为继续执行执行状态

主程序:
声明了一个信号对象e
启动了进程a,执行函数a
启动了进程b,执行函数b


支持多个进程,待听到指令后死等的所有进程全部不用等了,比如:田径比赛,所有运动员听到发令枪声,就全都跑了

#encoding=utf-8

import multiprocessing
import time

def wait_for_event(e):
    print('wait_for_event: starting')
    e.wait() 
    print('wait_for_event: e.is_set()->', e.is_set())

def wait_for_event_timeout(e, t):
    print('wait_for_event_timeout: starting')
    e.wait(t)
    print('wait_for_event_timeout: e.is_set()->', e.is_set())
    e.set()

if __name__ == '__main__':
    e = multiprocessing.Event()
    print("begin,e.is_set()", e.is_set())
    w1 = multiprocessing.Process(name='block', target=wait_for_event, args=(e,))
    w1.start()

    w2 = multiprocessing.Process(name='block', target=wait_for_event, args=(e,))
    w2.start()

    w3 = multiprocessing.Process(name='nonblock', target=wait_for_event_timeout,args=(e, 2))
    w3.start()

    print('main: waiting before calling Event.set()')
    time.sleep(3)
    #e.set()   #可注释此句话看效果
    print('main: event is set')

进程同步(使用管道-Pipe)

Pipe是两个进程间通信的工具。Pipe可以是单向(half-duplex),也可以是双向(duplex)。我们通过mutiprocessing.Pipe(duplex=False)创建单向管道 (默认为双向)。一个进程从PIPE一端输入对象,然后被PIPE另一端的进程接收,单向管道只允许管道一端的进程输入,而双向管道则允许从两端输入。

Pipe的每个端口同时最多一个进程读写,否则会出现各种问题,可能造成corruption异常。

Pipe对象建立的时候,返回一个含有两个元素的元组对象,每个元素代表Pipe的一端(Connection对象)。我们对Pipe的某一端调用send()方法来传送对象,在另一端使用recv()来接收。

#encoding=utf-8

import multiprocessing as mp

def proc_1(pipe):
    pipe.send('hello')
    print('proc_1 received: %s' %pipe.recv())
    pipe.send("what is your name?")
    print('proc_1 received: %s' %pipe.recv())

def proc_2(pipe):
    print('proc_2 received: %s' %pipe.recv())
    pipe.send('hello, too')
    print('proc_2 received: %s' %pipe.recv())
    pipe.send("I don't tell you!")

if __name__ == '__main__':
  # 创建一个管道对象pipe
  pipe = mp.Pipe()
  #print(len(pipe))
  #print(type(pipe))
  #print (pipe)
  # 将第一个pipe对象传给进程1
  p1 = mp.Process(target = proc_1, args = (pipe[0], ))
  # 将第二个pipe对象传给进程2
  p2 = mp.Process(target = proc_2, args = (pipe[1], ))
  p1.start()
  p2.start()
  p1.join()
  p2.join()

代码实例二:将获取的信息写入文件

#encoding=utf-8

import multiprocessing as mp
from multiprocessing import Process, Lock

def write_file(content,lock,file_path="a.txt"):
    lock.acquire()
    with open(file_path,"a") as fp:
        fp.write(content+"\n")
    lock.release()

def proc_1(pipe,lock):
    pipe.send('hello')
    info = pipe.recv()
    print('proc_1 received: %s' %info)
    write_file(info,lock)
    pipe.send("what is your name?")
    info = pipe.recv()
    print('proc_1 received: %s' %info)
    write_file(info,lock)

def proc_2(pipe,lock):
    info = pipe.recv()
    print('proc_2 received: %s' %info)
    write_file(info,lock)
    pipe.send('hello, too')
    info = pipe.recv()
    write_file(info,lock)
    print('proc_2 received: %s' %info)
    pipe.send("I don't tell you!")

if __name__ == '__main__':
  pipe = mp.Pipe()
  lock=Lock()
  # 将第一个pipe对象传给进程1
  p1 = mp.Process(target = proc_1, args = (pipe[0],lock ))
  # 将第二个pipe对象传给进程2
  p2 = mp.Process(target = proc_2, args = (pipe[1],lock ))
  p1.start()
  p2.start()
  p1.join()
  p2.join()

进程同步(使用Condition)

一个condition变量总是与某些类型的锁相联系,当几个condition变量必须共享同一个锁的时候,是很有用的。锁是conditon对象的一部分:没有必要分别跟踪

condition变量服从上下文管理协议:with语句块封闭之前可以获取与锁的联系。 acquire() 和release() 会调用与锁相关联的相应方法

wait()方法会释放锁,当另外一个进程使用notify() or notify_all()唤醒它之前会一直阻塞。一旦被唤醒,wait()会重新获得锁并返回;

wait(timeout=None) :等待通知,或者等到设定的超时时间;

notify()和notify_all()不会释放锁;

#encoding=utf-8

import multiprocessing as mp
import threading
import time

def consumer(cond):
  with cond:
    print("consumer before wait")
    cond.wait()            #等待消费,死等
    #cond.wait(timeout=10)  #等待10秒后还未消费自动结束
    print("consumer after wait")

def producer(cond):
  with cond:
    print("producer before notifyAll")
    cond.notify_all()  #通知所有消费者可以消费了
    #cond.notify()     #只能通知一个消费者,想通知几个人就写几次
    print("producer after notifyAll")

if __name__ == '__main__':    
  condition = mp.Condition()
  p1 = mp.Process(name = "p1", target = consumer, args=(condition,))
  p2 = mp.Process(name = "p2", target = consumer, args=(condition,))
  p3 = mp.Process(name = "p3", target = producer, args=(condition,))

  p1.start()
  time.sleep(2)
  p2.start()
  time.sleep(2)
  p3.start()

多进程间共享数字变量

未使用共享变量

#数据在子进程运行的时候被改变;但是主进程打印的数据仍为原值,互相独立不共享

#encoding=utf-8

from multiprocessing import Process

def f(n, a):
    n = n+1
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = 0
    arr =list(range(10))
    p = Process(target = f, args = (num, arr))
    p.start()
    p.join()
    print(num)
    print(arr[:])

使用共享变量

#encoding=utf-8

from multiprocessing import Process, Value, Array

def f(n, a):
    n.value = n.value+1
    for i in range(len(a)):
        a[i] = -a[i]

if __name__ == '__main__':
    num = Value('d', 0.0) # 参数"d"(double简写)指定参数类型,创建一个进程间共享的数字类型,默认值为0
    arr = Array('i', range(10)) # 创建一个进程间共享的数组类型,初始值为range[10]
    p = Process(target = f, args = (num, arr))
    p.start()
    p.join()
    #此时num.value为1.0,arr[:]为[0, -1, -2, -3, -4, -5, -6, -7, -8, -9]
    p = Process(target = f, args = (num, arr))
    p.start()
    p.join()
    print(num.value) # 获取共享变量num的值
    print(arr[:])

使用共享变量、加锁

#encoding=utf-8

import time
from multiprocessing import Process, Value, Lock

class Counter(object):
    def __init__(self, initval = 0):
        self.val = Value('i', initval)
        self.lock = Lock()

    def increment(self):
        “如果此处不对共享变量加锁的话,多个进程同时运行的结果每次都不一样”
        with self.lock:
            self.val.value += 1 # 共享变量自加1
            #print(“increment one time!”,self.value() )  

    def value(self):
        with self.lock:
            return self.val.value


def func(counter):
    for i in range(50):
        time.sleep(0.01)
        counter.increment()

if __name__ == '__main__':
    counter = Counter(0)
    procs = [Process(target = func, args = (counter,)) for i in range(10)]
    # 等价于
    # for i in range(10):
    #     Process(target = func, args = (counter,))

    for p in procs: 
        p.start()

    for p in procs: 
        p.join()

    print(counter.value())

多进程间共享字符串变量

Manager()函数返回一个管理对象,它控制了一个服务端进程,用来保持Python对象,并允许其它进程使用代理来管理这些对象。

#encoding=utf-8

from multiprocessing import Process, Manager, Value
from ctypes import c_char_p

def greet(shareStr):
    shareStr.value = shareStr.value + ", World!"

if __name__ == '__main__':
    manager = Manager()
    shareStr = manager.Value(c_char_p, "Hello")
    processList = [Process(target = greet, args = (shareStr,)) for i in range(3)]
    for process in processList:
        process.start()
        process.join()    

    print(shareStr.value)

多进程间共享字典和列表

#encoding=utf-8

from multiprocessing import Process, Manager

def f(shareDict, shareList):
    shareDict[1] = '1'
    shareDict['2'] = 2
    shareDict[0.25] = None
    shareList.reverse() # 翻转列表

if __name__ == '__main__':
    manager = Manager()
    shareDict = manager.dict()                    # 创建共享的字典类型
    shareList = manager.list(range(10))       # 创建共享的列表类型
    p = Process(target = f, args = (shareDict, shareList))
    p.start()
    p.join()
    print(shareDict)
    print(shareList)

多进程间共享实例

#encoding=utf-8

import time, os
import random
from multiprocessing import Pool, Value, Lock
from multiprocessing.managers import BaseManager

class MyManager(BaseManager):
  pass

def Manager():
    m = MyManager()
    m.start()
    return m

class Counter(object):

    def __init__(self, initval=0):
        self.val = Value('i', initval)
        self.lock = Lock()

    def increment(self):
        with self.lock:
            self.val.value += 1

    def value(self):
        with self.lock:
            return self.val.value

#将Counter类注册到Manager管理类中
MyManager.register('Counter', Counter)

def long_time_task(name,counter):
    time.sleep(0.2)
    print('Run task %s (%s)...\n' % (name, os.getpid()))
    start = time.time()
    #time.sleep(random.random() * 3)
    for i in range(50):
        time.sleep(0.01)
        counter.increment()
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__ == '__main__':
    manager = Manager()
    # 创建共享Counter类实例对象的变量,Counter类的初始值0
    counter = manager.Counter(0)
    print('Parent process %s.' % os.getpid())
    p = Pool()
    for i in range(5):
        p.apply_async(long_time_task, args = (str(i), counter))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')
    print(counter.value())

逻辑思路:
    继承BaseManager类,生成MyManager子类
    定义一个Manager函数,对MyManager类进行实例化,并且启动MyManager的进程,返回MyManager类的实例对象
    定义一个Counter类
    MyManager.register('Counter', Counter)  将Counter类注册到Manager管理类中
    定义多进程执行的任务函数,定义的类实例作为参数传入到任务函数中
    在主程序中,定义进程池,for循环启动多个进程,并且每个进程都执行的是之前定义的任务函数
    关闭进程池,等待子进程结束后再打印共享实例中的value值

 

Logo

更多推荐