系统差异
进程池
进程间通信
多线程
线程锁
ThreadLocal

1.系统差异
在Linux和Unix系统上,os模块有fork调用方式来创建子进程,在windows上可用multiprocessing模块来操作进程。
fork方式:

import os

print('Process (%s) start...' % os.getpid())
# Only works on Unix/Linux/Mac:
pid = os.fork()
if pid == 0:
    print('I am child process (%s) and my parent is %s.' % (os.getpid(), os.getppid()))
else:
    print('I (%s) just created a child process (%s).' % (os.getpid(), pid))

windows上的方式:

from multiprocessing import Process

import os


# 子进程要执行的代码
def run_proc(name):
    print('Run child process %s (%s)...' % (name, os.getpid()))


if __name__ == '__main__':
    print('Parent process %s.' % os.getpid())
    p = Process(target=run_proc, args=('test',))
    print('Child process will start.')
    p.start()
    p.join()
    print('Child process end.')
---
Parent process 7504.
Child process will start.
Run child process test (2532)...
Child process end.

创建一个子进程可通过创建一个Process实例,传入一个执行函数和参数,通过start启动,join方法可让进程执行完了再继续往下执行,通常用于进程间同步。

2.进程池pool

用进程池的方式创建子进程:

from multiprocessing import Pool
import os, time, random

def long_time_task(name):
    print('Run task %s (%s)...' % (name, os.getpid()))
    start = time.time()
    time.sleep(random.random() * 3)
    end = time.time()
    print('Task %s runs %0.2f seconds.' % (name, (end - start)))

if __name__=='__main__':
    print('Parent process %s.' % os.getpid())
    p = Pool(4)
    for i in range(5):
        p.apply_async(long_time_task, args=(i,))
    print('Waiting for all subprocesses done...')
    p.close()
    p.join()
    print('All subprocesses done.')
---
Parent process 2836.
Waiting for all subprocesses done...
Run task 0 (5516)...
Run task 1 (1112)...
Run task 2 (6464)...
Run task 3 (7860)...
Task 1 runs 1.87 seconds.
Run task 4 (1112)...
Task 2 runs 2.38 seconds.
Task 0 runs 2.65 seconds.
Task 3 runs 2.57 seconds.
Task 4 runs 2.39 seconds.
All subprocesses done.

pool调用close()后就不能再继续添加新的Process了,调用join方法等待所有子进程执行完,在调用join方法前必须先调用close()方法。

还有个需要注意的地方是task0,1,2,3是立即执行的,task4是等到有一个进程执行完了它才执行,这是因为我的电脑是4核的,所以Pool的默认大小是4,因此最多执行4个进程,这是pool有意设计的限制,而不是操作系统的限制,如果非要把pool(4)改成pool(5),也是可以的,就会同时执行5个进程。

3.进程间通信

from multiprocessing import Process, Queue
import os, time, random


# 写数据进程执行的代码:
def write(q):
    print('Process to write: %s' % os.getpid())
    for value in ['A', 'B', 'C']:
        print('Put %s to queue...' % value)
        q.put(value)
        time.sleep(random.random()*5)


# 读数据进程执行的代码:
def read(q):
    print('Process to read: %s' % os.getpid())
    while True:
        value = q.get(True)
        print('Get %s from queue.' % value)


if __name__ == '__main__':
    # 父进程创建Queue,并传给各个子进程:
    q = Queue()
    pw = Process(target=write, args=(q,))
    pr = Process(target=read, args=(q,))
    # 启动子进程pw,写入:
    pw.start()
    # 启动子进程pr,读取:
    pr.start()
    # 等待pw结束:
    pw.join()
    # pr进程里是死循环,无法等待其结束,只能强行终止:
    pr.terminate()
---
Process to write: 5784
Put A to queue...
Process to read: 8012
Get A from queue.
Put B to queue...
Get B from queue.
Put C to queue...
Get C from queue.

5.多线程
Python使用多线程有两个模块_thread和threading,前者是低级模块,后者是高级模块,对前者做了封装,大多数情况下只需要使用后者;

import threading


def methodRun():
    print('thread %s is running...' % threading.current_thread().name)


print('thread %s is running...' % threading.current_thread().name)
t = threading.Thread(target=methodRun,name='ChildThread')
t.start()
t.join()
print('thread %s ended.' % threading.current_thread().name)
---
thread MainThread is running...
thread ChildThread is running...
thread MainThread ended.

线程锁
当多个线程同时执行同一段代码操作同一个变量时,有可能因线程不同步造成结果错误,此时就需要同步锁保持线程同步,相当于JAVA里的synchronized:

import threading

lock = threading.Lock()


# lock = threading.Lock()

def methodRun():
    lock.acquire()#获取锁
    try:
        print('thread %s is running...' % threading.current_thread().name)
    finally:
        lock.release()#释放锁


print('thread %s is running...' % threading.current_thread().name)
t = threading.Thread(target=methodRun, name='ChildThread')
t.start()
t.join()
print('thread %s ended.' % threading.current_thread().name)

这段代码实际上不需要加同步锁,只是记录下加锁的方式,在获取锁到释放锁之间的代码只会同时被某一个线程执行,不会多个线程同时执行,当一个线程获取到锁后,其它线程只能等到这个线程释放锁后才能获取锁。

ThreadLocal
在多线程中,每个线程都可能有自己处理的变量,如果这些局部变量在线程里处理时又要传递到别的方法里处理,就得一次次传递,比较麻烦:

import threading


class Student:
    def __init__(self, name):
        self.name = name


def pri(std):
    print(std.name)


def stdthread(str):
    std = Student(str)
    pri(std)


def thread_test():
    t1 = threading.Thread(target=stdthread, args=('Tom',), name='Thread-1')
    t2 = threading.Thread(target=stdthread, args=('Mary',), name='Thread-2')
    t1.start()
    t2.start()
    t1.join()
    t2.join()


thread_test()
---
Tom
Mary

那么就希望把这个变量变成全局的,在pri方法里可以直接调用本线程的那个std,不通过参数传进来:

import threading

dstd = {}  # 全局的Students


class Student:
    def __init__(self, name):
        self.name = name


def pri():
    print(dstd[threading.current_thread()].name)  # 取出对应线程的student对象


def stdthread(str):
    std = Student(str)
    dstd[threading.current_thread()] = std
    pri()


def thread_test():
    t1 = threading.Thread(target=stdthread, args=('Tom',), name='Thread-1')
    t2 = threading.Thread(target=stdthread, args=('Mary',), name='Thread-2')
    t1.start()
    t2.start()
    t1.join()
    t2.join()


thread_test()
---
Tom
Mary

这样虽然不用传递参数了,但是获取变量的方式还是有些奇怪,可以用Python提供的ThreadLocal实现:

import threading

std_local = threading.local()

class Student:
    def __init__(self, name):
        self.name = name


def pri():
    print(std_local.std.name)  # 取出对应线程的student对象


def stdthread(str):
    std = Student(str)
    std_local.std = std
    pri()


def thread_test():
    t1 = threading.Thread(target=stdthread, args=('Tom',), name='Thread-1')
    t2 = threading.Thread(target=stdthread, args=('Mary',), name='Thread-2')
    t1.start()
    t2.start()
    t1.join()
    t2.join()


thread_test()
---
Tom
Mary

每个线程中的std对象是相互独立,互不干扰的。

Logo

更多推荐