Python 并发编程 Process 进程、Thread 线程、阻塞模型、asyncio 协程等基础教程
01、进程的基本介绍进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础,进程是线程的容器。什么是进程?进程的概念主要有两点:第一,进程是一个实体。每一个进程都有它自己的地址空间,一般情况下,包括文本区域(text region)、数据区域(data region)和堆栈(stack region)。文本区域存储处理器执行的
01、Process进程基本介绍
进程(Process)是计算机中的程序关于某数据集合上的一次运行活动,是系统进行资源分配和调度的基本单位,是操作系统结构的基础,进程是线程的容器。
什么是进程?
进程的概念主要有两点:
第一,进程是一个实体。每一个进程都有它自己的地址空间,一般情况下,包括文本区域(text region)、数据区域(data region)和堆栈(stack region)。文本区域存储处理器执行的代码;数据区域存储变量和进程执行期间使用的动态分配的内存;堆栈区域存储着活动过程调用的指令(比如错误信息打印的调用关系)和本地变量。
第二,进程是一个“执行中的程序”。程序是一个没有生命的实体,只有处理器赋予程序生命时,它才能成为一个活动的实体,我们称其为进程。
进程与线程的区别与联系
进程的执行过程是线状的,尽管中间会发生中断或暂停,但该进程所拥有的资源只为该线状执行过程服务。一旦发生进程上下文切换,这些资源都是要被保护起来的。这是进程宏观上的执行过程。
进程又可有单线程进程
与多线程进程
两种。我们知道,进程有一个进程控制块PCB,相关程序段和该程序段对其进行操作的数据结构集这三部分,单线程进程的执行过程在宏观上是线性的,微观上也只有单一的执行过程;而多线程进程在宏观上的执行过程同样为线性的,但微观上却可以有多个执行操作(线程),如不同代码片段以及相关的数据结构集。线程的改变只代表了CPU执行过程的改变,而没有发生进程所拥有的资源变化。
除了CPU之外,计算机内的软硬件资源的分配与线程无关,线程只能共享它所属进程的资源。与进程控制表和 PCB 相似,每个线程也有自己的线程控制表TCB,而这个TCB中所保存的线程状态信息则要比PCB表少得多,这些信息主要是相关指针用堆栈(系统栈和用户栈),寄存器中的状态数据。
进程拥有一个完整的虚拟地址空间,不依赖于线程而独立存在;反之,线程是进程的一部分,没有自己的地址空间,与进程内的其他线程一起共享分配给该进程的所有资源。
进程:计算机分配资源的最小单位,线程:CPU处理器分配的最小单位。
创建进程就是在内存中申请一块内存空间将需要运行的代码丢进去一个进程
对应在内存中就是一块独立的内存空间
多个进程对应在内存中就是多块独立的内存空间
进程与进程之间数据默认情况下是无法直接交互,如果想交互可以借助于第三方工具、模块
阻塞调用是指调用结果返回之前,当前线程会被挂起(如遇到io操作)。函数只有在得到结果之后才会将阻塞的线程激活。有人也许会把阻塞调用和同步调用等同起来,实际上他是不同的。对于同步调用来说,很多时候当前线程还是激活的,只是从逻辑上当前函数没有返回而已。
- 同步调用:apply一个累计1亿次的任务,该调用会一直等待,直到任务返回结果为止,但并未阻塞住(即便是被抢走cpu的执行权限,那也是处于就绪态);
- 阻塞调用:当socket工作在阻塞模式的时候,如果没有数据的情况下调用recv函数,则当前线程就会被挂起,直到有数据为止。
非阻塞和阻塞的概念相对应,指在不能立刻得到结果之前也会立刻返回,同时该函数不会阻塞当前线程。
1)子进程是从父进程中使用Process生成的,对子进程使用os.getppid()可以得到父进程的pid
2)全局变量在多个进程中不能共享,在子进程中修改全局变量,对父进程没有影响
02、mutilprocessing模块
仔细说来,multiprocessing不是一个模块而是python中一个操作、管理进程的包。 之所以叫multi是取自multiple的多功能的意思,在这个包中几乎包含了和进程有关的所有子模块。由于提供的子模块非常多,为了方便大家归类记忆,我将这部分大致分为四个部分:创建进程部分,进程同步部分,进程池部分,进程之间数据共享。
python中的多线程无法利用多核优势,如果想要充分地使用多核CPU的资源(os.cpu_count()查看),在python中大部分情况需要使用多进程。Python提供了multiprocessing。 multiprocessing模块用来开启子进程,并在子进程中执行我们定制的任务(比如函数),该模块与多线程模块threading的编程接口类似。
multiprocessing模块的功能众多:支持子进程、通信和共享数据、执行不同形式的同步,提供了Process、Queue、Pipe、Lock等组件。
需要再次强调的一点是:与线程不同,进程没有任何共享状态,进程修改的数据,改动仅限于该进程内。
multiprocessing.Process模块
process模块是一个创建进程的模块,借助这个模块,就可以完成进程的创建。
Process([group [, target [, name [, args [, kwargs]]]]])
# 由该类实例化得到的对象,表示一个子进程中的任务(尚未启动)
Process类的注意点强调:
- 需要使用关键字的方式来指定参数
- args指定的为传给target函数的位置参数,是一个元组形式,必须有逗号
Process类参数介绍:
- group参数未使用,值始终为None
- target表示调用对象,即子进程要执行的任务
- args表示调用对象的位置参数元组,args=(1,2,‘egon’,)
- kwargs表示调用对象的字典,kwargs={‘name’:‘egon’,‘age’:18}
- name为子进程的名称
Process对象方法介绍:
- p.start():启动进程,并调用该子进程中的p.run()
- p.run():进程启动时运行的方法,正是它去调用target指定的函数,我们自定义类的类中一定要实现该方法
- p.terminate():强制终止进程p,不会进行任何清理操作,如果p创建了子进程,该子进程就成了僵尸进程,使用该方法需要特别小心这种情况。如果p还保存了一个锁那么也将不会被释放,进而导致死锁
- p.is_alive():如果p仍然运行,返回True
- p.join([timeout]):主线程等待p终止(强调:是主线程处于等的状态,而p是处于运行的状态)。timeout是可选的超时时间,需要强调的是,p.join只能join住start开启的进程,而不能join住run开启的进程
Process的对象属性介绍:
-
p.daemon:默认值为False,如果设为True,代表p为后台运行的守护进程,当p的父进程终止时,p也随之终止,并且设定为True后,p不能创建自己的新进程,必须在p.start()之前设置
-
p.name:进程的名称
-
p.pid:进程的pid
-
p.exitcode:进程在运行时为None、如果为–N,表示被信号N结束(了解即可)
-
p.authkey:进程的身份验证键,默认是由os.urandom()随机生成的32字符的字符串。这个键的用途是为涉及网络连接的底层进程间通信提供安全性,这类连接只有在具有相同的身份验证键时才能成功(了解即可)
类继承的方式来使用:开进程的方法
import time
import random
from multiprocessing import Process
class Piao(Process):
def __init__(self,name):
super().__init__()
self.name=name
def run(self):
print('%s piaoing' %self.name)
time.sleep(random.randrange(1,5))
print('%s piao end' %self.name)
p=Piao('egon')
p.start() # start会自动调用run
print('主线程')
创建并开启子进程
from multiprocessing import Process
import time
import os
def task(name):
print("{} tasking starting!".format(name))
print("{} current process PID = {}".format(name, os.getpid()))
print("{} parent’s process PID = {}".format(name, os.getppid()))
# 查看当前进程的父进程的PID
time.sleep(5)
print("{} tasking ending!".format(name))
if __name__ == "__main__":
process_1 = Process(target=task, args=("唤醒手腕",))
process_2 = Process(target=task, args=("燕小姐", ))
process_1.start()
process_2.start()
process_1.join() # 主进程等待子进程结束
process_2.join() # 主进程等待子进程结束
print("__main__ processing!")
查看进程是否存活:is_alive()
告诉操作系统帮你去杀死当前进程但是需要一定的时间而代码的运行速度极快。
process_1 = Process(target=task, args=("唤醒手腕",))
print(process_1.is_alive())
process_1.start()
print(process_1.is_alive())
process_1.terminate()
time.sleep(0.1) # 杀死进程需要时间,进行时间延迟,保证进程已经杀死
print(process_1.is_alive())
03、僵尸 / 孤儿 / 守护 进程
孤儿进程:一个父进程退出,而它的一个或多个子进程还在运行,那么那些子进程将成为孤儿进程。孤儿进程将被init进程(进程号为1)所收养,并由init进程对它们完成状态收集工作。
僵尸进程:一个进程使用fork创建子进程,如果子进程退出,而父进程并没有调用wait或waitpid获取子进程的状态信息,那么子进程的进程描述符仍然保存在系统中。这种进程称之为僵死进程。
那么什么称为僵尸进程呢?
即子进程先于父进程退出后,子进程的PCB需要其父进程释放,但是父进程并没有释放子进程的PCB,这样的子进程就称为僵尸进程,僵尸进程实际上是一个已经死掉的进程。
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<string.h>
#include<assert.h>
#include<sys/types.h>
int main()
{
pid_t pid = fork();
if(pid == 0) // 子进程
{
printf("child id is %d \n", getpid());
printf("parent id is %d \n", getppid());
}
else // 父进程不退出,使子进程成为僵尸进程
{
while(1)
{}
}
exit(0);
}
我们将它挂在后台执行,可以看到结果,用ps可以看到子进程后有一个 ,defunct是已死的,僵尸的意思,可以看出这时的子进程已经是一个僵尸进程了。因为子进程已经结束,而其父进程并未释放其PCB,所以产生了这个僵尸进程。
我们也可以用 ps -aux | grep pid 查看进程状态
一个进程在调用exit命令结束自己的生命的时候,其实它并没有真正的被销毁,而是留下一个称为僵尸进程(Zombie)的数据结构(系统调用exit,它的作用是使进程退出,但也仅仅限于将一个正常的进程变成一个僵尸进程,并不能将其完全销毁)。
在Linux进程的状态中,僵尸进程是非常特殊的一种,它已经放弃了几乎所有内存空间,没有任何可执行代码,也不能被调度,仅仅在进程列表中保留一个位置,记载该进程的退出状态等信息供其他进程收集,除此之外,僵尸进程不再占有任何内存空间。
这个僵尸进程需要它的父进程来为它收尸,如果他的父进程没有处理这个僵尸进程的措施,那么它就一直保持僵尸状态,如果这时父进程结束了,那么init进程自动会接手这个子进程,为它收尸,它还是能被清除的。但是如果如果父进程是一个循环,不会结束,那么子进程就会一直保持僵尸状态,这就是为什么系统中有时会有很多的僵尸进程。
试想一下,如果有大量的僵尸进程驻在系统之中,必然消耗大量的系统资源。但是系统资源是有限的,因此当僵尸进程达到一定数目时,系统因缺乏资源而导致奔溃。所以在实际编程中,避免和防范僵尸进程的产生显得尤为重要。
孤儿进程
一个父进程退出,而它的一个或多个子进程还在运行,那么那些子进程将成为孤儿进程。孤儿进程将被init进程(进程号为1)所收养,并由init进程对它们完成状态收集工作。
子进程死亡需要父进程来处理,那么意味着正常的进程应该是子进程先于父进程死亡。当父进程先于子进程死亡时,子进程死亡时没父进程处理,这个死亡的子进程就是孤儿进程。
但孤儿进程与僵尸进程不同的是,由于父进程已经死亡,系统会帮助父进程回收处理孤儿进程。所以孤儿进程实际上是不占用资源的,因为它终究是被系统回收了。不会像僵尸进程那样占用PID,损害运行系统。
#include<stdio.h>
#include<stdlib.h>
#include<unistd.h>
#include<string.h>
#include<assert.h>
#include<sys/types.h>
int main()
{
pid_t pid = fork();
if(pid == 0)
{
printf("child ppid is %d \n", getppid());
sleep(10);
// 为了让父进程先结束
printf("child ppid is %d \n", getppid());
}
else
{
printf("parent id is %d \n", getpid());
}
exit(0);
}
从执行结果来看,此时由pid == 4168
父进程创建的子进程,其输出的父进程pid == 1
,说明当其为孤儿进程时被init进程回收,最终并不会占用资源,这就是为什么要将孤儿进程分配给init进程。
孤儿进程、僵尸进程产生的原因:
(1) 一般进程
正常情况下:子进程由父进程创建,子进程再创建新的进程。父子进程是一个异步过程,父进程永远无法预测子进程的结束,所以,当子进程结束后,它的父进程会调用wait()或waitpid()取得子进程的终止状态,回收掉子进程的资源。
(2)孤儿进程
孤儿进程:父进程结束了,而它的一个或多个子进程还在运行,那么这些子进程就成为孤儿进程(father died)。子进程的资源由init进程(进程号PID = 1)回收。
(3)僵尸进程
僵尸进程:子进程退出了,但是父进程没有用wait或waitpid去获取子进程的状态信息,那么子进程的进程描述符仍然保存在系统中,这种进程称为僵死进程。
守护进程的介绍:
主进程创建守护进程
其一:守护进程会在主进程代码执行结束后就终止
其二:守护进程内无法再开启子进程,否则抛出异常:AssertionError: daemonic processes are not allowed to have children
注意:进程之间是互相独立的,主进程代码运行结束,守护进程随即终止
from multiprocessing import Process
import time
import random
class Piao(Process):
def __init__(self,name):
self.name=name
super().__init__()
def run(self):
print('%s is piaoing' %self.name)
time.sleep(random.randrange(1,3))
print('%s is piao end' %self.name)
p=Piao('egon')
p.daemon=True
# 一定要在p.start()前设置,设置p为守护进程,禁止p创建子进程,并且父进程代码执行结束,p即终止运行
p.start()
print('主')
04、进程间变量的共享
import multiprocessing
import time
# 定义一个全局变量num_list,里面有一个元素100
num_list = [100]
# 写入数据
def write_data(num_list):
for i in range(5):
num_list.append(i)
time.sleep(0.1)
print("write_data:", num_list) # 读取写入数据后的num_list
# 读取数据
def read_data(num_list):
print("read_data:", num_list) # 读取全局变量num_list
if __name__ == '__main__':
# 创建写入数据的子进程
write_process = multiprocessing.Process(target=write_data, args=(num_list,))
# 创建读取数据的子进程
read_process = multiprocessing.Process(target=read_data, args=(num_list,))
# 开启写入子进程
write_process.start()
# 等待写入子进程执行完毕再继续
write_process.join()
# 开启读取数据子进程
read_process.start()
运行结果:结果(证明进程间的变量是不共享的)
注:创建子进程其实是对主进程进行拷贝,进程之间相互独立,访问的全局变量不是同一个,所以进程间不共享全局变量。
主进程会等待所有的子进程执行完成程序再退出。
进程之间共享数据(数值型):
import multiprocessing
def func(num):
num.value = 10.78
# 子进程改变数值的值,主进程跟着改变
if __name__=="__main__":
num = multiprocessing.Value("d",10.0)
# d表示数值,主进程与子进程共享这个value
# 主进程与子进程都是用的同一个value
print(num.value)
p = multiprocessing.Process(target=func,args=(num,))
p.start()
p.join()
print(num.value)
进程之间共享数据(dict,list):
import multiprocessing
def func(mydict,mylist):
mydict["index1"]="aaaaaa" #子进程改变dict,主进程跟着改变
mydict["index2"]="bbbbbb"
mylist.append(11) #子进程改变List,主进程跟着改变
mylist.append(22)
mylist.append(33)
if __name__=="__main__":
with multiprocessing.Manager() as MG: #重命名
mydict=multiprocessing.Manager().dict() #主进程与子进程共享这个字典
mylist=multiprocessing.Manager().list(range(5)) #主进程与子进程共享这个List
p=multiprocessing.Process(target=func,args=(mydict,mylist))
p.start()
p.join()
print(mylist)
print(mydict)
05、socket并发通信原理
server端
from socket import *
from multiprocessing import Process
server=socket(AF_INET,SOCK_STREAM)
server.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server.bind(('127.0.0.1',8080))
server.listen(5)
def talk(conn,client_addr):
while True:
try:
msg=conn.recv(1024)
if not msg:break
conn.send(msg.upper())
except Exception:
break
if __name__ == '__main__': #windows下start进程一定要写到这下面
while True:
conn,client_addr=server.accept()
p=Process(target=talk,args=(conn,client_addr))
p.start()
client端
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))
while True:
msg=input('>>: ').strip()
if not msg:continue
client.send(msg.encode('utf-8'))
msg=client.recv(1024)
print(msg.decode('utf-8'))
以上解决方案的隐患
每来一个客户端,都在服务端开启一个进程,如果并发来一个万个客户端,要开启一万个进程吗,你自己尝试着在你自己的机器上开启一万个,10万个进程试一试。
解决方法:进程池
# Pool内的进程数默认是cpu核数,假设为4(查看方法os.cpu_count())
# 开启6个客户端,会发现2个客户端处于等待状态
# 在每个进程内查看pid,会发现pid使用为4个,即多个客户端公用4个进程
from socket import *
from multiprocessing import Pool
import os
server = socket(AF_INET, SOCK_STREAM)
server.setsockopt(SOL_SOCKET, SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 8080))
server.listen(5)
def talk(conn, client_addr):
print('进程pid: %s' % os.getpid())
while True:
try:
msg = conn.recv(1024)
if not msg: break
conn.send(msg.upper())
except Exception:
break
if __name__ == '__main__':
p = Pool()
while True:
conn, client_addr = server.accept()
p.apply_async(talk, args=(conn, client_addr))
# p.apply(talk,args=(conn,client_addr))
# 同步的话,则同一时间只有一个客户端能访问
回掉函数:
需要回调函数的场景:进程池中任何一个任务一旦处理完了,就立即告知主进程:我好了额,你可以处理我的结果了。主进程则调用一个函数去处理该结果,该函数即回调函数
我们可以把耗时间(阻塞)的任务放到进程池中,然后指定回调函数(主进程负责执行),这样主进程在执行回调函数时就省去了I/O的过程,直接拿到的是任务的结果。
06、Queue队列库的介绍
进程彼此之间互相隔离,要实现进程间通信(IPC),multiprocessing模块支持两种形式:队列和管道,这两种方式都是使用消息传递的
创建队列的类(底层就是以管道和锁定的方式实现):
Queue([maxsize]):
# 创建共享的进程队列,Queue是多进程安全的队列,
# 可以使用Queue实现多进程之间的数据传递。
# maxsize是队列中允许最大项数,省略则无大小限制。
Queue对象 主要方法:
q.put
# 该方法用以插入数据到队列中,put方法还有两个可选参数:blocked和timeout。
# 如果blocked为True(默认值),并且timeout为正值,该方法会阻塞timeout指定的时间,直到该队列有剩余的空间。
# 如果超时,会抛出Queue.Full异常。如果blocked为False,但该Queue已满,会立即抛出Queue.Full异常。
q.get
# 方法可以从队列读取并且删除一个元素。同样,get方法有两个可选参数:blocked和timeout。
# 如果blocked为True(默认值),并且timeout为正值,那么在等待时间内没有取到任何元素,会抛出Queue.Empty异常。
# 如果blocked为False,有两种情况存在,如果Queue有一个值可用,则立即返回该值,否则,如果队列为空,则立即抛出Queue.Empty异常.
q.get_nowait() # 同q.get(False)
q.put_nowait() # 同q.put(False)
q.empty()
# 调用此方法时q为空则返回True,该结果不可靠,比如在返回True的过程中,如果队列中又加入了项目。
q.full()
# 调用此方法时q已满则返回True,该结果不可靠,比如在返回True的过程中,如果队列中的项目被取走。
q.qsize()
# 返回队列中目前项目的正确数量,结果也不可靠,理由同q.empty()和q.full()一样
Queue对象 其他方法(了解):
q.cancel_join_thread():不会在进程退出时自动连接后台线程。可以防止join_thread()方法阻塞
q.close():关闭队列,防止队列中加入更多数据。调用此方法,后台线程将继续写入那些已经入队列但尚未写入的数据,但将在此方法完成时马上关闭。如果q被垃圾收集,将调用此方法。关闭队列不会在队列使用者中产生任何类型的数据结束信号或异常。例如,如果某个使用者正在被阻塞在get()操作上,关闭生产者中的队列不会导致get()方法返回错误。
q.join_thread():连接队列的后台线程。此方法用于在调用q.close()方法之后,等待所有队列项被消耗。默认情况下,此方法由不是q的原始创建者的所有进程调用。调用q.cancel_join_thread方法可以禁止这种行为
07、进程池的使用介绍
在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。多进程是实现并发的手段之一,需要注意的问题是:
- 很明显需要并发执行的任务通常要远大于CPU核数
- 一个操作系统不可能无限开启进程,通常有几个核就开几个进程
- 进程开启过多,效率反而会下降(开启进程是需要占用系统资源的,而且开启多余核数目的进程也无法做到并行)
在利用Python进行系统管理的时候,特别是同时操作多个文件目录,或者远程控制多台主机,并行操作可以节约大量的时间。当被操作对象数目不大时,可以直接利用multiprocessing中的Process动态成生多个进程,十几个还好,但如果是上百个,上千个目标,手动的去限制进程数量却又太过繁琐,此时可以发挥进程池的功效。
Pool可以提供指定数量的进程供用户调用,当有新的请求提交到pool中时,如果池还没有满,那么就会创建一个新的进程用来执行该请求;但如果池中的进程数已经达到规定最大值,那么该请求就会等待,直到池中有进程结束,才会创建新的进程来它。
创建进程池的类:如果指定numprocess为3,则进程池会从无到有创建三个进程,然后自始至终使用这三个进程去执行所有任务,不会开启其他进程。
Pool([numprocess [,initializer [, initargs]]]):创建进程池
- numprocess:要创建的进程数,如果省略,将默认使用cpu_count()的值
- initializer:是每个工作进程启动时要执行的可调用对象,默认为None
- initargs:是要传给initializer的参数组
p.apply(func [, args [, kwargs]])
# 在一个池工作进程中执行func(*args,**kwargs),然后返回结果。需要强调的是:此操作并不会在所有池工作进程中并执行func函数。
# 如果要通过不同参数并发地执行func函数,必须从不同线程调用p.apply()函数或者使用p.apply_async()
p.apply_async(func [, args [, kwargs]])
# 在一个池工作进程中执行func(*args,**kwargs),然后返回结果。此方法的结果是AsyncResult类的实例,callback是可调用对象,接收输入参数。
# 当func的结果变为可用时,将理解传递给callback。callback禁止执行任何阻塞操作,否则将接收其他异步操作中的结果。
p.close()
# 关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
P.join()
# 等待所有工作进程退出。此方法只能在close()或teminate()之后调用
方法apply_async()
和map_async()
的返回值是AsyncResul的实例obj,实例具有以下方法:
obj.get()
# 返回结果,如果有必要则等待结果到达。timeout是可选的。如果在指定时间内还没有到达,将引发一场。如果远程操作中引发了异常,它将在调用此方法时再次被引发。
obj.ready()
# 如果调用完成,返回True
obj.successful()
# 如果调用完成且没有引发异常,返回True,如果在结果就绪之前调用此方法,引发异常
obj.wait([timeout])
# 等待结果变为可用。
obj.terminate()
# 立即终止所有工作进程,同时不执行任何清理或结束任何挂起工作。如果p被垃圾回收,将自动调用此函数
同步调用apply
同步调用,直到本次任务执行完毕拿到res,等待任务work执行的过程中可能有阻塞也可能没有阻塞,但不管该任务是否存在阻塞,同步调用都会在原地等着,只是等的过程中若是任务发生了阻塞就会被夺走cpu的执行权限。
from multiprocessing import Pool
import os, time
def work(n):
print('%s run' % os.getpid())
time.sleep(3)
return n ** 2
if __name__ == '__main__':
p = Pool(3) # 进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
res_l = []
for i in range(10):
res = p.apply(work, args=(i,))
res_l.append(res)
print(res_l)
异步调用apply_async
异步apply_async用法:如果使用异步提交的任务,主进程需要使用jion,等待进程池内任务都处理完,然后可以用get收集结果,否则,主进程结束,进程池可能还没来得及执行,也就跟着一起结束了。
from multiprocessing import Pool
import os, time
def work(n):
print('%s run' % os.getpid())
time.sleep(3)
return n ** 2
if __name__ == '__main__':
p = Pool(3)
# 进程池中从无到有创建三个进程,以后一直是这三个进程在执行任务
res_l = []
for i in range(10):
res = p.apply_async(work, args=(i,))
# 同步运行,阻塞、直到本次任务执行完毕拿到res
res_l.append(res)
p.close()
p.join()
for res in res_l:
print(res.get())
# 使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get
详细分析原理:
# 一:使用进程池(异步调用,apply_async)
# coding: utf-8
from multiprocessing import Process, Pool
import time
def func(msg):
print("msg:", msg)
time.sleep(1)
return msg
if __name__ == "__main__":
pool = Pool(processes=3)
res_l = []
for i in range(10):
msg = "hello %d" % (i)
res = pool.apply_async(func, (msg,))
# 维持执行的进程总数为processes,当一个进程执行完毕后会添加新的进程进去
res_l.append(res)
# 没有后面的join,或get,则程序整体结束,进程池中的任务还没来得及全部执行完也都跟着主进程一起结束了
pool.close()
# 关闭进程池,防止进一步操作。如果所有操作持续挂起,它们将在工作进程终止前完成
pool.join()
# 调用join之前,先调用close函数,否则会出错。执行完close后不会有新的进程加入到pool,join函数等待所有子进程结束
print(res_l)
# 看到的是<multiprocessing.pool.ApplyResult object at 0x10357c4e0>
# 对象组成的列表,而非最终的结果,但这一步是在join后执行的,证明结果已经计算完毕,剩下的事情就是调用每个对象下的get方法去获取结果
for i in res_l:
print(i.get())
# 使用get来获取apply_aync的结果,如果是apply,则没有get方法,因为apply是同步执行,立刻获取结果,也根本无需get
Process产生的子进程,默认主进程等待所有子进程执行完毕之后在终止。
Pool进程池,只要主进程跑完了,立刻终止所有程序。
08、死锁 / 递归锁 / 互斥锁
from threading import Thread,Lock
import time
noodle_lock = Lock()
kuaizi_lock = Lock()
def eat1(name):
noodle_lock.acquire()
print("%s拿到面条" % (name))
kuaizi_lock.acquire()
print("%s拿到筷子" % (name))
print("开始吃")
time.sleep(0.7)
kuaizi_lock.release()
print("%s放下筷子" % (name))
noodle_lock.release()
print("%s放下面条" % (name))
def eat2(name):
kuaizi_lock.acquire()
print("%s拿到筷子" % (name))
noodle_lock.acquire()
print("%s拿到面条" % (name))
print("开始吃")
time.sleep(0.7)
noodle_lock.release()
print("%s放下面条" % (name))
kuaizi_lock.release()
print("%s放下筷子" % (name))
if __name__ == "__main__":
name_list1 = ["马具强","熊卫华"]
name_list2 = ["黄熊大","黄将用"]
for name in name_list1:
Thread(target=eat1,args=(name,)).start()
for name in name_list2:
Thread(target=eat2,args=(name,)).start()
运行可以发现,以上的代码,发生了死锁的情况。
递归锁
递归锁专门用来解决死锁现象,临时用于快速解决服务器崩溃异常现象,用递归锁应急,解决应急问题的。
到底什么是递归锁,为什么会存在?
递归锁原理
原理其实很简单的:就是递归锁,每开一把门,在字典里面存一份数据,退出的时候去到door1或者door2里面找到这个钥匙退出就OK了。
递归锁用于多重锁的情况,如果只是一层锁,我们不用。在实际情况下,递归锁场景用的不是特别多,所以知道就行了。
from threading import Thread,RLock
rlock = RLock()
def func(name):
rlock.acquire()
print(name,1)
rlock.acquire()
print(name,2)
rlock.acquire()
print(name,3)
rlock.release()
rlock.release()
rlock.release()
lst = []
for i in range(10):
t1 = Thread(target=func,args=("name%s" % (i) , ))
t1.start()
lst.append(t1)
for i in lst:
i.join()
print("程序结束了")
用递归锁应急解决死锁现象
noodle_lock = kuaizi_lock = RLock()
def eat1(name):
noodle_lock.acquire()
print("%s拿到面条" % (name))
kuaizi_lock.acquire()
print("%s拿到筷子" % (name))
print("开始吃")
time.sleep(0.7)
kuaizi_lock.release()
print("%s放下筷子" % (name))
noodle_lock.release()
print("%s放下面条" % (name))
def eat2(name):
kuaizi_lock.acquire()
print("%s拿到筷子" % (name))
noodle_lock.acquire()
print("%s拿到面条" % (name))
print("开始吃")
time.sleep(0.7)
noodle_lock.release()
print("%s放下面条" % (name))
kuaizi_lock.release()
print("%s放下筷子" % (name))
if __name__ == "__main__":
name_list1 = ["马具强","熊卫华"]
name_list2 = ["黄熊大","黄将用"]
for name in name_list1:
Thread(target=eat1,args=(name,)).start()
for name in name_list2:
Thread(target=eat2,args=(name,)).start()
互斥锁
从语法上来看,锁是可以互相嵌套的,但是不要使用。上一次锁,就对应解开一把锁,形成互斥锁。吃面条和拿筷子是同时的,上一次锁就够了,不要分别上锁。尽量不要形成锁的嵌套,容易死锁。
mylock = Lock()
def eat1(name):
mylock.acquire()
print("%s拿到面条" % (name))
print("%s拿到筷子" % (name))
print("开始吃")
time.sleep(0.7)
print("%s放下筷子" % (name))
print("%s放下面条" % (name))
mylock.release()
def eat2(name):
mylock.acquire()
print("%s拿到筷子" % (name))
print("%s拿到面条" % (name))
print("开始吃")
time.sleep(0.7)
print("%s放下面条" % (name))
print("%s放下筷子" % (name))
mylock.release()
if __name__ == "__main__":
name_list1 = ["马具强","熊卫华"]
name_list2 = ["黄熊大","黄将用"]
for name in name_list1:
Thread(target=eat1,args=(name,)).start()
for name in name_list2:
Thread(target=eat2,args=(name,)).start()
09、threading线程模块
multiprocess模块的完全模仿了threading模块的接口,二者在使用层面,有很大的相似性。
这个是模块在较低级的模块 _thread 基础上建立较高级的线程接口,大多数情况我们使用threading就够用了。多线程的应用场景是进行多个任务处理时。由于线程是操作系统直接支持的执行单元,我们可以通过建立多个线程来实现多个任务的处理,使它们同步进行(宏观看起来是这样的,实际上是各个线程交替工作)。
创建线程的两种方式:
方式一:
from threading import Thread
import time
def sayhi(name):
time.sleep(2)
print('%s say hello' % name)
if __name__ == '__main__':
t = Thread(target=sayhi, args=('egon',))
t.start()
print('主线程')
方式二:
from threading import Thread
import time
class SayHello(Thread):
def __init__(self, name):
super().__init__()
self.name = name
def run(self):
time.sleep(2)
print('%s say hello' % self.name)
if __name__ == '__main__':
t = SayHello('唤醒手腕')
t.start()
print('主线程')
有关threading的参数介绍
-
threading.active_count() 返回当前存活着的Tread对象个数
-
threading.current_thread() 返回当前正在运行的线程的Tread对象
-
threading.enumerate() 返回一个列表,列表里面是还存活的Tread对象
-
threading.Thread(group=None, target=None, name=None, args=(), kwargs={}, *, daemon=None)创建线程,直接使用Tread类这是一种方法,另一种方法思新建一个类然后继承threading.Thread
group 应该为 None;为了日后扩展 ThreadGroup 类实现而保留。
target 是用于run()方法调用的可调用对象。默认是 None,表示不需要调用任何方法。
name 是线程名称。默认情况下,由 “Thread-N” 格式构成一个唯一的名称,其中 N 是小的十进制数。
args 是用于调用目标函数的参数元组。默认是 ()。
kwargs 是用于调用目标函数的关键字参数字典。默认是 {}。 如果不是 None,daemon 参数将显式地设置该线程是否为守护模式。 如果是None (默认值),线程将继承当前线程的守护模式属性。
-
Thread类的start()方法用来开始一个线程。
-
hread类的join(timeout=None)方法会让开启线程的线程(一般指主线程)等待,阻塞这个线程,直到这个线程运行结束才结束等待。timeout的参数值为浮点数,用于设置操作超时的时间。
-
threading.Lock 锁对象,可以通过它来创建锁被创建时为非锁定状态,原始锁有两种状态锁定和非锁定。
-
Lock对象acquire(blocking=True, timeout=-1)方法,获得锁。
当锁的状态为非锁定时, acquire() 将锁状态改为锁定并立即返回(即执行下面的程序)。
当状态是锁定时, acquire() 将阻塞(将发起获得锁的线程挂起直到锁被释放获得锁),当其他线程调用 release() 将锁改为非锁定状态后(即锁被释放后), 被挂起线程的acquire() 将获得锁且重置其为锁定状态并返回(与1一致)。
blocking 参数为bool值(默认True),可以阻塞或非阻塞地获得锁(即无法获得锁时是否阻塞线程)
timeout 参数为浮点数(默认-1),当无法获得锁时,timeout为正决定阻塞的时间,为负数时为无限等待。blocking为False时timeout无作用(不阻塞当然涉及不到阻塞的时间)
-
Lock对象release()方法,释放锁。
当锁被锁定,将它重置为未锁定,并返回。如果其他线程正在等待这个锁解锁而被阻塞,只允许其中一个允许。
在未锁定的锁调用时,会引发 RuntimeError 异常。
-
Lock对象的locked()方法,用来判断是否获得了锁。
锁,一般用在两个线程同时使用一个公共变量的情况下。为了防止两个线程同时修改变量导致的混乱。
线程抢的是GIL锁,GIL锁相当于执行权限,拿到执行权限后才能拿到互斥锁Lock,其他线程也可以抢到GIL,但如果发现Lock仍然没有被释放则阻塞,即便是拿到执行权限GIL也要立刻交出来。
join是等待所有,即整体串行,而锁只是锁住修改共享数据的部分,即部分串行,要想保证数据安全的根本原理在于让并发变成串行,join与互斥锁都可以实现,毫无疑问,互斥锁的部分串行效率要更高。
开启新的子进程必须在__main__
进程下创建,否则会出现以下的报错内容。
RuntimeError:
An attempt has been made to start a new process before the
current process has finished its bootstrapping phase.
import time
from threading import Thread
from multiprocessing import Process
# 开启线程不需要在main下面 执行代码直接书写就可以
# 但是我们还是习惯性的将启动命令写在main下面
def task(name):
print("%s is running" % name)
time.sleep(1)
print("%s is over" % name)
# t = Thread(target=task, args=('Wrist',))
p = Process(target=task, args=('Wrist',))
p.start()
print("__main__ is running")
为什么开启子进程 一定要放在 if __name__ == '__main__'
下面
windows 平台有特殊要求Since Windows has no fork, the multiprocessing module starts a new Python process and imports the calling module. If Process() gets called upon import, then this sets off an infinite succession of new processes (or until your machine runs out of resources). This is the reason for hiding calls to Process() inside
平台有特殊要求Since窗口没有分叉,多处理模块启动一个新的Python进程并导入调用模块。如果在导入时调用进程(),那么这将引发无限连续的新进程(或者直到您的机器耗尽资源耗尽)。
探究import导入模块时候,会开辟线程,还是子进程?
# import_file.py
import os
print("son process is {}".format(os.getpid()))
print("son father process is {}".format(os.getppid()))
# main_test.py
import import_file
import os
print("main process is {}".format(os.getpid()))
运行结果如下:PID是一样的,说明没有开辟子进程
son process is 8752
son father process is 14080
main process is 8752
# import_file.py
import os
import threading
length = len(threading.enumerate())
print('当前运行的线程数为:%d' % length)
# main_test.py
import threading
import import_file
import os
length = len(threading.enumerate())
print('当前运行的线程数为:%d' % length)
if __name__ == '__main__':
length = len(threading.enumerate())
print('当前运行的线程数为:%d' % length)
运行结果如下:当前运行的线程数都是1,说明也没有开辟新的线程。
当前运行的线程数为:1
当前运行的线程数为:1
当前运行的线程数为:1
10、父线程与子线程介绍
在线程的生命周期中,从创建到执行以及最终终止,线程通常处于四种状态之一:开始态、可调度状态、阻塞态和终止态。
父线程和子线程
当一个新的进程或程序开始运行时,它将以一个线程开始,这个线程被称为主线程。然后主线程可以启动或生成其他线程,这被称为子线程,它们同样是进程的一部分,但独立执行其他任务。如果需要,这些线程还可以生成自己的子线程,当每个线程完成执行时,将通知它们的父线程,最后主线程终止整个任务。父线程和子线程关系如下图所示:
线程的四种基本状态
不同的编程语言可能会使用不同的名称,并且还有一些额外的名称,但通常在线程的生命周期中,从创建到执行以及最终终止,线程通常处于四种状态:开始态、可调度态、阻塞态和终止态。
- 开始态:主线程需要产生或创建另一个线程来辅助完成整个任务,子线程将以新状态开始,Python语言要求在创建线程后显式启动它。
- 可调度态:处于可运行状态,这意味着操作系统可以安排资源调度执行。通过上下文与其他线程交换,以便在处理器上运行。
- 阻塞态:当线程需要等待事件发生时,如外部输入或计时器,或者调用子线程的join()方法进入阻塞状态,当进入阻塞态时,线程不会使用任何CPU资源。
- 终止态:线程在完成执行或异常中止时进入终止状态。
线程的状态转换如下图所示:
在Python语言中Python线程可以从这里开始与主线程对GIL的竞争,在t_bootstrap中,申请完了GIL,也就是说子线程也就获得了GIL,使其始终保存着活动线程的状态对象。
我们知道,在操作系统从进程A切换到进程B时,首先会保存进程A的上下文环境,再进行切换;当从进程B切换回进程A时,又会恢复进程A的上下文环境,这样就保证了进程A始终是在属于自己的上下文环境中运行。
这里的线程状态对象就等同于进程的上下文,Python同样会有一套存储、恢复线程状态对象的机制。同时,在Python内部,维护着一个全局变量:PyThreadState * _PyThread- State_Current
当前活动线程所对应的线程状态对象就保存在这个变量里,当Python调度线程时,会将被激活的线程所对应的线程状态对象赋给_PyThreadState_Current
,使其始终保存着活动线程的状态对象。
这就引出了这样的一个问题:Python如何在调度进程时,获得被激活线程对应的状态对象?Python内部会通过一个单向链表来管理所有的Python线程的状态对象,当需要寻找一个线程对应的状态对象时。
主线程结束,子线程是否进行执行?
import threading
import time
def t_task():
print("son thread start running!")
time.sleep(3)
print("son thread end")
if __name__ == "__main__":
t = threading.Thread(target=t_task)
t.start()
print("main_thread end")
可见父线程结束后,子线程仍在运行,此时结束进程,子线程才会被终止
主线程结束后进程不等待守护线程完成,立即结束
当设置一个线程为守护线程时,此线程所属进程不会等待此线程运行结束,进程将立即结束。
11、Process / Tread 区别
解释下运行的周期存活问题:
Process进程
:
Process非进程池:父进程
等待所有非守护进程
结束后,然后再结束。
Process进程池Pool:父进程
结束,所有非守护进程
也会自动结束,更Pool底层的实现有关。
Thread线程
:
Thread非线程池:主线程
等待所有非守护线程
结束后,然后再结束。
进程Process和线程threading区别
1.一个线程只能属于一个进程,而一个进程可以有多个线程,但至少有一个线程(线程是计算机的最小单位);
2.资源分配给进程,同一进程的所有线程共享该进程的所有资源,进程与进程之间资源相互独立,互不影响(类似深拷贝);
3.多进程模式最大的优点就是稳定性高,因为一个子进程崩溃了,不会影响主进程和其他子进程,多进程模式的缺点是在Windows下创建进程开销巨大。另外,操作系统能同时运行的进程数也是有限的,在内存和CPU的限制下,如果有几千个进程同时运行,操作系统连调度都会成问题(进程的创建比线程的创建更加占用计算机资源);
4.多线程模式致命的缺点就是任何一个线程挂掉都可能直接造成整个进程崩溃,因为所有线程共享进程的内存;
5.由于GIL锁的缘故,python 中线程实际上是并发运行(即便有多个cpu,线程会在其中一个cpu来回切换,只占用一个cpu资源),而进程才是真正的并行(同时执行多个任务,占用多个cpu资源)
12、全局解释器锁 (GIL)
首先需要明确的一点是GIL并不是Python的特性,它是在实现Python解析器(CPython)时所引入的一个概念。就好比C++是一套语言(语法)标准,但是可以用不同的编译器来编译成可执行代码。有名的编译器例如GCC,INTEL C++,Visual C++等。Python也一样,同样一段代码可以通过CPython,PyPy,Psyco等不同的Python执行环境来执行。像其中的JPython就没有GIL。然而因为CPython是大部分环境下默认的Python执行环境。所以在很多人的概念里CPython就是Python,也就想当然的把GIL归结为Python语言的缺陷。所以这里要先明确一点:GIL并不是Python的特性,Python完全可以不依赖于GIL
为什么会有GIL?
由于物理上得限制,各CPU厂商在核心频率上的比赛已经被多核所取代。为了更有效的利用多核处理器的性能,就出现了多线程的编程方式,而随之带来的就是线程间数据一致性和状态同步的困难。即使在CPU内部的Cache也不例外,为了有效解决多份缓存之间的数据同步时各厂商花费了不少心思,也不可避免的带来了一定的性能损失。
Python当然也逃不开,为了利用多核,Python开始支持多线程。而解决多线程之间数据完整性和状态同步的最简单方法自然就是加锁。 于是有了GIL这把超级大锁,而当越来越多的代码库开发者接受了这种设定后,他们开始大量依赖这种特性(即默认python内部对象是thread-safe的,无需在实现时考虑额外的内存锁和同步操作)。
慢慢的这种实现方式被发现是蛋疼且低效的。但当大家试图去拆分和去除GIL的时候,发现大量库代码开发者已经重度依赖GIL而非常难以去除了。有多难?做个类比,像MySQL这样的“小项目”为了把Buffer Pool Mutex这把大锁拆分成各个小锁也花了从5.5到5.6再到5.7多个大版为期近5年的时间,本且仍在继续。MySQL这个背后有公司支持且有固定开发团队的产品走的如此艰难,那又更何况Python这样核心开发和代码贡献者高度社区化的团队呢?
所以简单的说GIL的存在更多的是历史原因。如果推到重来,多线程的问题依然还是要面对,但是至少会比目前GIL这种方式会更优雅。
GIL的影响
从上文的介绍和官方的定义来看,GIL无疑就是一把全局排他锁。毫无疑问全局锁的存在会对多线程的效率有不小影响。甚至就几乎等于Python是个单线程的程序。
那么读者就会说了,全局锁只要释放的勤快效率也不会差啊。只要在进行耗时的IO操作的时候,能释放GIL,这样也还是可以提升运行效率的嘛。或者说再差也不会比单线程的效率差吧。理论上是这样,而实际上呢?Python比你想的更糟。
下面我们就对比下Python在多线程和单线程下得效率对比。测试方法很简单,一个循环1亿次的计数器函数。一个通过单线程执行两次,一个多线程执行。最后比较执行总时间。测试环境为双核的Mac pro。
注:为了减少线程库本身性能损耗对测试结果带来的影响,这里单线程的代码同样使用了线程。只是顺序的执行两次,模拟单线程。
顺序执行的单线程(single_thread.py)
#! /usr/bin/python
from threading import Thread
import time
def my_counter():
i = 0
for _ in range(100000000):
i = i + 1
return True
def main():
start_time = time.time()
for tid in range(2):
t = Thread(target=my_counter)
t.start()
t.join()
end_time = time.time()
print("Total time: {}".format(end_time - start_time))
if __name__ == '__main__':
main()
同时执行的两个并发线程(multi_thread.py)
#! /usr/bin/python
from threading import Thread
import time
def my_counter():
i = 0
for _ in range(100000000):
i = i + 1
return True
def main():
thread_array = {}
start_time = time.time()
for tid in range(2):
t = Thread(target=my_counter)
t.start()
thread_array[tid] = t
for i in range(2):
thread_array[i].join()
end_time = time.time()
print("Total time: {}".format(end_time - start_time))
if __name__ == "__main__":
main()
可以看到python在多线程的情况下居然比单线程整整慢了45%。按照之前的分析,即使是有GIL全局锁的存在,串行化的多线程也应该和单线程有一样的效率才对。那么怎么会有这么糟糕的结果呢?
在
intel i7
的4CPU-8核
处理器的win10系统
测试,多线程的情况比单线程整整快了45%
计算密集型:多进程效率高
from multiprocessing import Process
from threading import Thread
import os, time
def work():
res = 0
for i in range(100000000):
res *= i
if __name__ == '__main__':
l = []
print(os.cpu_count()) # 本机为4核
start = time.time()
for i in range(4):
p = Process(target=work)
# 耗时 15.626161098480225
p = Thread(target=work)
# 耗时 40.876734256744385
l.append(p)
p.start()
for p in l:
p.join()
stop = time.time()
print('run time is %s' % (stop - start))
IO密集型:多线程效率高
from multiprocessing import Process
from threading import Thread
import threading
import os, time
def work():
time.sleep(2)
print('haha ~~~')
if __name__ == '__main__':
l = []
print(os.cpu_count()) # 本机为4核
start = time.time()
for i in range(4):
p = Process(target=work)
# 耗时 3.9347474575042725多, 大部分时间耗费在创建进程上
p = Thread(target=work)
# 耗时 2.004810333251953
l.append(p)
p.start()
for p in l:
p.join()
stop = time.time()
print('run time is %s' % (stop - start))
测试实验分析:
我们有四个任务需要处理,处理方式肯定是要玩出并发的效果,解决方案可以是:
方案一:开启四个进程
方案二:一个进程下,开启四个线程
单核情况下,分析结果:
如果四个任务是计算密集型,没有多核来并行计算,方案一徒增了创建进程的开销,方案二胜
如果四个任务是I/O密集型,方案一创建进程的开销大,且进程的切换速度远不如线程,方案二胜
多核情况下,分析结果:
如果四个任务是计算密集型,多核意味着并行计算,在python中一个进程中同一时刻只有一个线程执行用不上多核,方案一胜
如果四个任务是I/O密集型,再多的核也解决不了I/O问题,方案二胜
得出结论:
现在的计算机基本上都是多核,python对于计算密集型的任务开多线程的效率并不能带来多大性能上的提升,甚至不如串行(没有大量切换)
但是,对于IO密集型的任务效率还是有显著提升的。
那么CPython实现中的GIL又是什么呢?GIL全称Global Interpreter Lock为了避免误导,我们还是来看一下官方给出的解释:
Python的解释器的版本:CPython、JPython、pypyPython
但是普遍使用的都是cPython解释器
在CPython解释器中GIL是一把互斥锁,用来阻止同一个进程下的多个线程的同时执行同一个进程下的多个线程无法利用多核优势。
疑问:python的多线程是不是一点用都没有???无法利用多核优势!
数据结构和GIL
Queue
- 标准库queue模块,提供FIFO的Queue、LIFO的队列、优先队列。
- Queue类是线程安全的,适用于多线程间安全的交换数据。内部使用了Lock和Condition。
- 在自定义容器类中,如果不加锁,是不可能获得准确的大小的,因为你刚读取到了一个大小,还没有取走数据,就有可能被其他线程改 了。
- Queue类的size虽然加了锁,但是,依然不能保证立即get、put就能成功,因为读取大小和get、put方法是分开的。
import queue
q = queue.Queue(8)
if q.qsize() == 7:
q.put() #上下两句可能会被打断
if q.qsize() == 1:
q.get() #未必会成功,同样上下两句会被打断
GIL全局解释器锁
-
CPython 在解释器进程级别有一把锁,叫做GIL,即全局解释器锁。
-
GIL 保证CPython进程中,只有一个线程执行字节码。甚至是在多核CPU的情况下,也只允许同时只能有一个CPU 上运行该进程的一个线程。
-
CPython中
IO密集型,某个线程阻塞,就会调度其他就绪线程;
CPU密集型,当前线程可能会连续的获得GIL,导致其它线程几乎无法使用CPU。
-
在CPython中由于有GIL存在,IO密集型,使用多线程较为合算;CPU密集型,使用多进程,要绕开GIL。
新版CPython正在努力优化GIL的问题,但不是移除。
如果在意多线程的效率问题,请绕行,选择其它语言erlang、Go等。
GIL保护的是解释器级的数据,保护用户自己的数据则需要自己加锁处理,如下图
Python中绝大多数内置数据结构的读、写操作都是原子操作。由于GIL的存在,Python的内置数据类型在多线程编程的时候就变成了安全的了,但是实际上它们本身不是
线程安全类型
- GIL不是python的特点而是cPython解释器的特点
- GIL是保证解释器级别的数据的安全
- GIL会导致同一个进程下的多个线程的无法同时执行
- 针对不同的数据还是需要加不同的锁处理
保留GIL的原因:
Guido坚持的简单哲学,对于初学者门槛低,不需要高深的系统知识也能安全、简单的使用Python。
而且移除GIL,会降低CPython单线程的执行效率。
Python的原始解释器CPython中存在着GIL,因此再解释执行Python代码时,会产生互斥锁来限制线程对共享资源的访问,知道解释器遇到I / O操作或者操作次数达到一定数目。
因此,由于GIL的存在,在进行多线程操作时,不能调用多个CPU内核,所有在CPU密集型操作时更倾向于使用多进程。
对于IO密集型操作,使用多线程则可以提高效率,例如Python爬虫的开发。
测试下面两个计算密集型代码:运行时长差不多
单线程计算:
import datetime
def calc():
sum = 0
for _ in range(1000000000): #10亿
sum += 1
start = datetime.datetime.now()
for i in range(4):
calc()
dalta = (datetime.datetime.now() - start).total_seconds()
print("耗时:{}".format(dalta))
多线程计算:
import datetime
import threading
def calc():
sum = 0
for _ in range(1000000000): #10亿
sum += 1
start = datetime.datetime.now()
for i in range(4):
threading.Thread(target=calc).start()
for i in threading.enumerate():
if i.name != "MainThread":
i.join()
dalta = (datetime.datetime.now() - start).total_seconds()
print("耗时:{}".format(dalta))
注意,不要在代码中出现print等访问IO的语句。访问IO,线程阻塞,会释放GIL锁,其他线程被调度。
程序1是单线程程序,所有calc()依次执行,根本就不是并发。在主线程内,函数串行执行。
程序2是多线程程序,calc()执行在不同的线程中,但是由于GIL的存在,线程的执行变成了假并发。但是这些线程 可以被调度到不同的CPU核心上执行,只不过GIL让同一时间该进程只有一个线程被执行。
从两段程序测试的结果来看,CPython中多线程根本没有任何优势,和一个线程执行时间相当。因为GIL的存在,尤其是像上面的计算密集型程序,和单线程串行效果相当。这样,实际上就没有用上CPU多核心的优势。
13、Windows / Linux 进程
查看进程、杀进程(tasklist、taskkill)
1)PID:进程的唯一标识。如果一个进程含有多个线程,所有线程调用 getpid 函数会返回相同的值。
2)PGID:进程组 ID。每个进程都会有进程组 ID,表示该进程所属的进程组。默认情况下新创建的进程会继承父进程的进程组 ID。
3)SID:会话 ID。每个进程也都有会话 ID。默认情况下,新创建的进程会继承父进程的会话 ID。
4)PPID:是程序的父进程号。
会话(session)是一个或多个进程组的集合
通常是由shell的管道线将几个进程变成一组的。
查看进程列表
tasklist | findstr “java”
通过进程号终止进程
taskkill /F /PID 2328
通过映像名称终止进程(支持通配符*模糊处理)
taskkill /F /IM powershell.exe
taskkill /F /IM power*.exe
在Python中查看进程、杀进程
import os
import time
# 2种方式打开应用
# (1)阻塞方式打开应用
os.system('"C:\\Program Files (x86)\\bd\\infoflow\\infoflow.exe"')
# (2)非阻塞方式打开应用
os.startfile('"C:\\Program Files (x86)\\bd\\infoflow\\infoflow.exe"')
# 等待10s
time.sleep(10)
# 关闭应用,2个进程
# 关闭进程1
os.system('"taskkill /F /IM infoflow.exe"')
# 关闭进程2
os.system('"taskkill /F /IM hiwebhelper.exe"')
Linux 查看进程之PS命令
要对进程进行检测和控制,首先必须要了解当前进程的情况,也就是需要查看当前进程运行状态。Linux 系统中我们可以使用 ps 命令查看进程。
ps(process status) 命令是 Linux 下最常用的进程查看工具,使用该命令可以确定哪些进程正在运行和运行的状态、进程是否结束、进程有没有僵尸、哪些进程占用了过多的资源等等。
注意:ps 命令工具显示的是进程的瞬间状态,并不是动态连续显示,如果想对进程状态进行实时监控应该用 top 命令。
使用标准语法 (Unix 风格) 查看各个进程
-e
:显示系统内所有进程的信息。与 -A 选项功能相同
-f
:使用完整 (full) 的格式显示进程信息,如果只有 ps -e 则输出进程信息的格式和只使用 ps 一样(都只有PID TTY TIME CMD这几项,但是输出信息的内容和ps的不一样)
各字段含义如下:
UID:启动该进程的用户的 ID 号
PPID:代表该进程的父进程的 ID 号
C:进程的 CPU 处理器利用率
STIME/START:表示进程的启动时间
14、互斥锁 / 事件 / 信号量
互斥锁 Lock
在之前我们了解到python多线程中的全局变量改变问题在线程中改变一个全局变量,多线程开发的时候共享全局变量会带来资源竞争效果。也就是数据不安全。所以为了掌控进程合理对变量的改变,我们用线程锁来控制。
互斥锁:每个线程几乎同时修改一个共享数据的时候,需要进行同步控制线程同步能够保证多个线程安全的访问竞争资源(全局内容),简单的同步机制就是使用互斥锁。
某个线程要更改共享数据时,先将其锁定,此时资源的状态为锁定状态,其他线程就能更改,直到该线程将资源状态改为非锁定状态,也就是释放资源,其他的线程才能再次锁定资源。互斥锁保证了每一次只有一个线程进入写入操作。从而保证了多线程下数据的安全性。
死锁
死锁:在多个线程共享资源的时候,如果两个线程分别占有一部分资源,并且同时等待对方的资源,就会造成死锁现象。如果锁之间相互嵌套,就有可能出现死锁。因此尽量不要出现锁之间的嵌套。
import time
from threading import Thread
value = 100
def task():
global value
temp = value
time.sleep(0.1) # 遇到IO,GIL自动释放
value = temp - 1
if __name__ == "__main__":
t_list = []
for i in range(100):
t = Thread(target=task)
t.start()
t_list.append(t)
# 100个线程启动,自动去抢GIL锁
for i in range(100):
t_list[i].join()
print(value)
如果task函数中加time.sleep(0.1),那么最终的结果是99,如果不加sleep,那么最终的结果是0,为什么会产生这种所谓的情况呢?
如果不加sleep,当开启100个线程以后,这100线程会自动去争抢全局解释器锁(Global interpreter lock),当 Thread - A 线程,执行完,Thread - B 抢到 GIL 开启执行,但是此时的Temp 已经是 99了。
如果加sleep,当开启100个线程以后,这100线程会自动去争抢全局解释器锁(Global interpreter lock),当 Thread - A 线程,执行到 sleep(遇到IO,自动放弃全局解释器锁),Thread - B 抢到 GIL 开启执行,但是此时的Temp 仍然是100。
可见,在多线程中操作全局变量是危险的,不可控制的。GIL只是用了保护解释器级别的数据,自定义的全局变量需要自己加锁进行保护。
解决方案:调用Lock设置互斥锁。
import time
from threading import Thread, Lock
value = 100
mutex = Lock()
def task():
global value
mutex.acquire()
temp = value
time.sleep(0.01) # 遇到IO,GIL自动释放
value = temp - 1
"""
mutex.release()
with mutex:
temp = value
time.sleep(0.01) # 遇到IO,GIL自动释放
value = temp - 1
"""
if __name__ == "__main__":
t_list = []
for i in range(100):
t = Thread(target=task)
t.start()
t_list.append(t)
# 100个线程启动,自动去抢GIL锁
for i in range(100):
t_list[i].join()
print(value)
Python线程之threading.Event
python使用threading.Event可以使一个线程等待其他线程的通知,我们把这个Event传递到线程对象中,Event默认内置了一个标志,初始值为False。一旦该线程通过wait()方法进入等待状态,直到另一个线程调用该Event的set()方法将内置标志设置为True时,该Event会通知所有等待状态的线程恢复运行。
python线程的事件Event用于主线程控制其他线程的执行,事件主要提供了三个方法wait、clear、set。
事件处理的机制:全局定义了一个“Flag”,如果“Flag”值为 False,那么当程序执行 event.wait 方法时就会阻塞,如果“Flag”值为True,那么event.wait 方法时便不再阻塞。
- clear:将“Flag”设置为False
- set:将“Flag”设置为True
import threading
import time
class MyThread(threading.Thread):
def __init__(self, signal):
threading.Thread.__init__(self)
# 初始化
self.signal = signal
def run(self):
print("%s:I am %s,I will sleep ..." % (time.ctime(), self.name))
# 进入等待状态
self.signal.wait()
print("%s:I am %s, I awake..." % (time.ctime(), self.name))
if __name__ == "__main__":
# 初始 为 False
event_obj = threading.Event()
for t in range(0, 3):
thread = MyThread(event_obj)
thread.start()
print("%s:main thread sleep 3 seconds... " % time.ctime())
time.sleep(3)
# 唤醒含有event_obj, 处于等待状态的线程
event_obj.set()
运行展示如下所示:
Mon Jan 24 13:19:56 2022:I am Thread-1,I will sleep ...
Mon Jan 24 13:19:56 2022:I am Thread-2,I will sleep ...
Mon Jan 24 13:19:56 2022:I am Thread-3,I will sleep ...
Mon Jan 24 13:19:56 2022:main thread sleep 3 seconds...
Mon Jan 24 13:19:59 2022:I am Thread-2, I awake...
Mon Jan 24 13:19:59 2022:I am Thread-1, I awake...
Mon Jan 24 13:19:59 2022:I am Thread-3, I awake...
Python Threading Semaphore信号量(子线程的数量)
原因:主要用在保护有限的资源。
假设当前数据库支持最大连接数为3,将信号量初始值设为3,那么同时最大可以有三个线程连接数据库,其他线程若再想连接数据库,则只有等待,直到某一个线程释放数据库连接。
import logging
import threading
import time
import random
from threading import Semaphore
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s [*] %(message)s"
)
threads = []
lock_sm = Semaphore(3)
class connectdb(threading.Thread):
def run(self):
while True:
lock_sm.acquire()
logging.info(f"{self.name} connecting to db... ")
logging.info(f"{self.name} released db...")
time.sleep(2)
lock_sm.release()
if __name__ == '__main__':
for i in range(5):
threads.append(connectdb())
# 从五个线程取出三个
random_threads = random.sample(threads, 3)
# 阻塞启动线程 ---------------------------
for t in random_threads:
t.start()
for t in random_threads:
t.join()
注意:输出结果是每两秒打印的,跟数据库连接操作差的远呢
15、Pool线程池原理介绍
什么是池?
池是用来保证计算机硬件安全的情况下最大限度的利用计算机。
它降低了程序的运行效率,但是保证了计算机硬件的安全从而让你写的程序能够正常运行。
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
# Executor 执行者; 实行者; 遗嘱执行人
pool = ThreadPoolExecutor(max_workers=5)
max_workers就是线程池的个数,如果不指定的话,那么默认是你计算机CPU的个数+4,下面是源码的展示:
任务的提交方式
同步 : 提交任务之后原地等待任务的返回结果期间不做任何事
异步 : 提交任务之后不等待任务的返回结果执行继续往下执行
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
# Executor 执行者; 实行者; 遗嘱执行人
pool = ThreadPoolExecutor(max_workers=5)
def task(n):
time.sleep(5)
return n
for i in range(20):
res = pool.submit(task, i)
print(res.result())
# res.result() 会同步阻塞程序,等待返回值
关于pool线程池常见的方法进行介绍:
submit(fn, *args, **kwargs)
"""
Submits a callable to be executed with the given arguments.
Schedules the callable to be executed as fn(*args, **kwargs) and returns
a Future instance representing the execution of the callable.
Returns:
A Future representing the given call.
"""
pool.shutdown(wait=True, cancel_futures=False)
"""
It is safe to call this method several times. Otherwise, no other
methods can be called after this one.
wait:
If True then shutdown will not return until all running
futures have finished executing and the resources used by the
executor have been reclaimed.
cancel_futures:
If True then shutdown will cancel all pending
futures. Futures that are completed or running will not be
cancelled.
"""
通过多线程改善传输层TCP协议套接字的会话案例:
import socket
import threading
def communication(conn):
while True:
try:
data = conn.recv(1024)
if len(data) == 0:
break
print(threading.enumerate())
conn.send(data.upper())
except ConnectionResetError as e:
print(e)
break
conn.close()
def server():
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM, 0)
server.bind(('127.0.0.1', 8080))
server.listen(5)
while True:
conn, cli_address = server.accept()
t_conn = threading.Thread(target=communication, args=(conn, ))
t_conn.start()
if __name__ == "__main__":
server()
16、Pool进程池原理介绍
在07小节已经介绍了进程池的使用,但是07小节介绍的是multiprocessing中的Pool进程池,在本节介绍的是concurrent中的进程池创建。
This probably means that you are not using fork to start your child processes and you have forgotten to use the proper idiom in the main module 在window系统必须在
__name__ == "__main__"
import time
from concurrent.futures import ProcessPoolExecutor, ThreadPoolExecutor
from concurrent.futures import Future
# Executor 执行者; 实行者; 遗嘱执行人
pool = ProcessPoolExecutor(max_workers=5)
def task(var):
time.sleep(2)
return var
def callback_fn(resp: Future):
print(resp.result())
if __name__ == "__main__":
for i in range(20):
pool.submit(task, i).add_done_callback(callback_fn)
解释:submit函数返回的是Future对象,add_done_callback(callback_fn)是设置回调函数,到进程结束后,会把Future对象传给callback_fn函数,然后执行callback_fn函数。
类似于JavaScript的回调函数,同样都是把回调函数作为参数传递给异步的函数的。
add_done_callback(self, fn) 的源码展示:
def add_done_callback(self, fn):
"""
Attaches a callable that will be called when the future finishes.
Args:
fn: A callable that will be called with this future as its only
argument when the future completes or is cancelled. The callable
will always be called by a thread in the same process in which
it was added. If the future has already completed or been
cancelled then the callable will be called immediately. These
callables are called in the order that they were added.
"""
with self._condition:
if self._state not in [CANCELLED, CANCELLED_AND_NOTIFIED, FINISHED]:
self._done_callbacks.append(fn)
return
try:
fn(self)
except Exception:
LOGGER.exception('exception calling callback for %r', self)
17、协程基本概念解释
协程不是进程,也不是线程,它就是一个函数,一个特殊的函数——可以在某个地方挂起,并且可以重新在挂起处继续运行。所以说,协程与进程、线程相比,不是一个维度的概念。
一个进程可以包含多个线程,一个线程也可以包含多个协程,也就是说,一个线程内可以有多个那样的特殊函数在运行。但是有一点,必须明确,一个线程内的多个协程的运行是串行的。如果有多核CPU的话,多个进程或一个进程内的多个线程是可以并行运行的,但是一个线程内的多个协程却绝对串行的,无论有多少个CPU(核)。
这个比较好理解,毕竟协程虽然是一个特殊的函数,但仍然是一个函数。一个线程内可以运行多个函数,但是这些函数都是串行运行的。当一个协程运行时,其他协程必须挂起。
协程与进程、线程的比较
虽然说,协程与进程、线程相比不是一个维度的概念,但是有时候,我们仍然需要将它们做一番比较,具体如下:
- 协程既不是进程,也不是线程,协程仅仅是一个特殊的函数,协程跟他们就不是一个维度。
- 一个进程可以包含多个线程,一个线程可以包含多个协程。
- 一个线程内的多个协程虽然可以切换,但是这多个协程是串行执行的,只能在这一个线程内运行,没法利用CPU多核能力。
- 协程与进程一样,它们的切换都存在上下文切换问题。
表面上,进程、线程、协程都存在上下文切换的问题,但是三者上下文切换又有明显不同,见下表:
协程,英文Coroutines,是一种比线程更加轻量级的存在。正如一个进程可以拥有多个线程一样,一个线程也可以拥有多个协程。
最重要的是,协程不是被操作系统内核所管理,而完全是由程序所控制(也就是在用户态执行)。
正如刚才所写的代码示例,python可以通过 yield/send 的方式实现协程。在python 3.5以后, async/await 成为了更好的替代方案。
python中使用协程最常用的库就是asyncio,首先先介绍几个概念:
event_loop 事件循环:相当于一个无限循环,我们可以把一些函数注册到这个事件循环上,当满足条件时,就会调用对应的处理方法。
coroutine 协程:协程对象,只一个使用async关键字定义的函数,他的调用不会立即执行函数,而是会返回一个协程对象。协程对象需要注册到事件循环中,由事件循环调用。
task 任务:一个协程对象就是一个原生可以挂起的函数,任务则是对协程的进一步封装,其中包含任务的各种状态。
future:代表将来执行或没有执行的任务结果。它与task没有本质的区别。
async/await 关键字:python3.5用于定义协程的关键字,async定义一个协程,await用于挂起阻塞的异步调用接口。
yield不能检测IO,实现遇到IO自动切换,如下所示,yield不能检测到io进行自动的切换。
import time
def func1():
while True:
print('func1')
yield
def func2():
g=func1()
for i in range(10000000):
i+1
next(g)
time.sleep(3)
print('func2')
start=time.time()
func2()
stop=time.time()
print(stop-start)
对于单线程下,我们不可避免程序中出现io操作,但如果我们能在自己的程序中(即用户程序级别,而非操作系统级别)控制单线程下的多个任务能在一个任务遇到io阻塞时就切换到另外一个任务去计算,这样就保证了该线程能够最大限度地处于就绪态,即随时都可以被cpu执行的状态。
相当于我们在用户程序级别将自己的io操作最大限度地隐藏起来,从而可以迷惑操作系统,让其看到:该线程好像是一直在计算,io比较少,从而更多的将cpu的执行权限分配给我们的线程。
Python协程的使用:
import time
from gevent import monkey
monkey.patch_all()
import gevent
def task_A():
print("A start")
time.sleep(2)
print("A end")
def task_B():
print("B start")
time.sleep(3)
print("B end")
start_time = time.time()
t_A = gevent.spawn(task_A)
t_B = gevent.spawn(task_B)
t_A.join()
t_B.join()
end_time = time.time()
print(end_time - start_time)
运行的结果:task_A、task_B在遇到IO的时候,会互相的切换。
A start
B start
A end
B end
3.0235118865966797
注意gevent本身检测不到IO,借助monkey猴子插件来检测IO:
from gevent import monkey
monkey.patch_all()
协程实现Websocket的并发
服务器端的代码:
from gevent import monkey;monkey.patch_all()
from socket import *
import gevent
#如果不想用money.patch_all()打补丁,可以用gevent自带的socket
# from gevent import socket
# s=socket.socket()
def server(server_ip,port):
s=socket(AF_INET,SOCK_STREAM)
s.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
s.bind((server_ip,port))
s.listen(5)
while True:
conn,addr=s.accept()
gevent.spawn(talk,conn,addr)
def talk(conn,addr):
try:
while True:
res=conn.recv(1024)
print('client %s:%s msg: %s' %(addr[0],addr[1],res))
conn.send(res.upper())
except Exception as e:
print(e)
finally:
conn.close()
if __name__ == '__main__':
server('127.0.0.1',8080)
客户端的代码:
#_*_coding:utf-8_*_
__author__ = 'Linhaifeng'
from socket import *
client=socket(AF_INET,SOCK_STREAM)
client.connect(('127.0.0.1',8080))
while True:
msg=input('>>: ').strip()
if not msg:continue
client.send(msg.encode('utf-8'))
msg=client.recv(1024)
print(msg.decode('utf-8'))
多线程并发多个客户端:
from threading import Thread
from socket import *
import threading
def client(server_ip,port):
c=socket(AF_INET,SOCK_STREAM) #套接字对象一定要加到函数内,即局部名称空间内,放在函数外则被所有线程共享,则大家公用一个套接字对象,那么客户端端口永远一样了
c.connect((server_ip,port))
count=0
while True:
c.send(('%s say hello %s' %(threading.current_thread().getName(),count)).encode('utf-8'))
msg=c.recv(1024)
print(msg.decode('utf-8'))
count+=1
if __name__ == '__main__':
for i in range(500):
t=Thread(target=client,args=('127.0.0.1',8080))
t.start()
18、阻塞 / 非阻塞 IO模型
在linux中,默认情况下所有的socket都是blocking,一个典型的读操作流程大概是这样:
当用户进程调用了recvfrom这个系统调用,kernel就开始了IO的第一个阶段:准备数据。对于network io来说,很多时候数据在一开始还没有到达(比如,还没有收到一个完整的UDP包),这个时候kernel就要等待足够的数据到来。
而在用户进程这边,整个进程会被阻塞。当kernel一直等到数据准备好了,它就会将数据从kernel中拷贝到用户内存,然后kernel返回结果,用户进程才解除block的状态,重新运行起来。 所以,blocking IO的特点就是在IO执行的两个阶段(等待数据和拷贝数据两个阶段)都被block了。
几乎所有的程序员第一次接触到的网络编程都是从listen()、send()、recv() 等接口开始的,使用这些接口可以很方便的构建服务器/客户机的模型。然而大部分的socket接口都是阻塞型的。
谓阻塞型接口是指系统调用(一般是IO接口)不返回调用结果并让当前线程一直阻塞,只有当该系统调用获得结果或者超时出错时才返回。
实际上,除非特别指定,几乎所有的IO接口 ( 包括socket接口 ) 都是阻塞型的。这给网络编程带来了一个很大的问题,如在调用recv(1024)的同时,线程将被阻塞,在此期间,线程将无法执行任何运算或响应任何的网络请求。
简单的解决方案:在服务器端使用多线程(或多进程)。多线程(或多进程)的目的是让每个连接都拥有独立的线程(或进程),这样任何一个连接的阻塞都不会影响其他的连接。
该方案的问题是:开启多进程或都线程的方式,在遇到要同时响应成百上千路的连接请求,则无论多线程还是多进程都会严重占据系统资源,降低系统对外界响应效率,而且线程与进程本身也更容易进入假死状态。
改进方案:很多程序员可能会考虑使用“线程池”或“连接池”。“线程池”旨在减少创建和销毁线程的频率,其维持一定合理数量的线程,并让空闲的线程重新承担新的执行任务。“连接池”维持连接的缓存池,尽量重用已有的连接、减少创建和关闭连接的频率。这两种技术都可以很好的降低系统开销,都被广泛应用很多大型系统,如websphere、tomcat和各种数据库等。
改进后方案其实也存在着问题:“线程池”和“连接池”技术也只是在一定程度上缓解了频繁调用IO接口带来的资源占用。而且,所谓“池”始终有其上限,当请求大大超过上限时,“池”构成的系统对外界的响应并不比没有池的时候效果好多少。所以使用“池”必须考虑其面临的响应规模,并根据响应规模调整“池”的大小。
对应上例中的所面临的可能同时出现的上千甚至上万次的客户端请求,“线程池”或“连接池”或许可以缓解部分压力,但是不能解决所有问题。总之,多线程模型可以方便高效的解决小规模的服务请求,但面对大规模的服务请求,多线程模型也会遇到瓶颈,可以用非阻塞接口来尝试解决这个问题。
19、非阻塞IO模型测试应用
非阻塞IO(non-blocking IO)
Linux下,可以通过设置socket使其变为non-blocking。当对一个non-blocking socket执行读操作时,流程是这个样子:
从图中可以看出,当用户进程发出read操作时,如果kernel中的数据还没有准备好,那么它并不会block用户进程,而是立刻返回一个error。从用户进程角度讲 ,它发起一个read操作后,并不需要等待,而是马上就得到了一个结果。用户进程判断结果是一个error时,它就知道数据还没有准备好,于是用户就可以在本次到下次再发起read询问的时间间隔内做其他事情,或者直接再次发送read操作。一旦kernel中的数据准备好了,并且又再次收到了用户进程的system call,那么它马上就将数据拷贝到了用户内存(这一阶段仍然是阻塞的),然后返回。
也就是说非阻塞的recvform系统调用调用之后,进程并没有被阻塞,内核马上返回给进程,如果数据还没准备好,此时会返回一个error。进程在返回之后,可以干点别的事情,然后再发起recvform系统调用。重复上面的过程,循环往复的进行recvform系统调用。这个过程通常被称之为轮询。轮询检查内核数据,直到数据准备好,再拷贝数据到进程,进行数据处理。需要注意,拷贝数据整个过程,进程仍然是属于阻塞的状态。
所以,在非阻塞式IO中,用户进程其实是需要不断的主动询问kernel数据准备好了没有。
import socket
server = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
server.bind(('127.0.0.1', 8080))
server.listen(5)
server.setblocking(False)
while True:
try:
conn, addr = server.accept()
while True:
data: str = conn.recv()
if len(data) == 0:
break
conn.send(data.upper())
except BlockingIOError as err:
print("do other things")
无阻塞IO在TCP socket服务器端的应用:
import socket
import time
server = socket.socket()
server.setsockopt(socket.SOL_SOCKET, socket.SO_REUSEADDR, 1)
server.bind(('127.0.0.1', 8083))
server.listen(5)
server.setblocking(False)
r_list = []
w_list = {}
while True:
try:
conn, addr = server.accept()
r_list.append(conn)
except BlockingIOError:
# 强调强调强调:!!!非阻塞IO的精髓在于完全没有阻塞!!!
# time.sleep(0.5) # 打开该行注释纯属为了方便查看效果
print('在做其他的事情')
print('rlist: ', len(r_list))
print('wlist: ', len(w_list))
# 遍历读列表,依次取出套接字读取内容
del_rlist = []
for conn in r_list:
try:
data = conn.recv(1024)
if not data:
conn.close()
del_rlist.append(conn)
continue
w_list[conn] = data.upper()
except BlockingIOError: # 没有收成功,则继续检索下一个套接字的接收
continue
except ConnectionResetError: # 当前套接字出异常,则关闭,然后加入删除列表,等待被清除
conn.close()
del_rlist.append(conn)
# 遍历写列表,依次取出套接字发送内容
del_wlist = []
for conn, data in w_list.items():
try:
conn.send(data)
del_wlist.append(conn)
except BlockingIOError:
continue
# 清理无用的套接字,无需再监听它们的IO操作
for conn in del_rlist:
r_list.remove(conn)
for conn in del_wlist:
w_list.pop(conn)
Client 客户端的使用:
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8088))
while True:
msg=input('>>: ')
if not msg:continue
c.send(msg.encode('utf-8'))
data=c.recv(1024)
print(data.decode('utf-8'))
但是非阻塞IO模型绝不被推荐。
我们不能否则其优点:能够在等待任务完成的时间里干其他活了(包括提交其他任务,也就是 “后台” 可以有多个任务在“”同时“”执行)。
但是也难掩其缺点:
- 循环调用recv()将大幅度推高CPU占用率;这也是我们在代码中留一句time.sleep(2)的原因,否则在低配主机下极容易出现卡机情况
- 任务完成的响应延迟增大了,因为每过一段时间才去轮询一次read操作,而任务可能在两次轮询之间的任意时间完成。这会导致整体数据吞吐量的降低。
此外,在这个方案中recv()更多的是起到检测“操作是否完成”的作用,实际操作系统提供了更为高效的检测“操作是否完成“作用的接口,例如select()多路复用模式,可以一次检测多个连接是否活跃。
20、多路复用IO原理介绍
多路复用IO(IO multiplexing)
IO multiplexing这个词可能有点陌生,但是如果我说select/epoll,大概就都能明白了。有些地方也称这种IO方式为事件驱动IO(event driven IO)。我们都知道,select/epoll的好处就在于单个process就可以同时处理多个网络连接的IO。它的基本原理就是select/epoll这个function会不断的轮询所负责的所有socket,当某个socket有数据到达了,就通知用户进程。它的流程如图:
当用户进程调用了select,那么整个进程会被block,而同时,kernel会“监视”所有select负责的socket,当任何一个socket中的数据准备好了,select就会返回。这个时候用户进程再调用read操作,将数据从kernel拷贝到用户进程。 这个图和blocking IO的图其实并没有太大的不同,事实上还更差一些。因为这里需要使用两个系统调用(select和recvfrom),而blocking IO只调用了一个系统调用(recvfrom)。但是,用select的优势在于它可以同时处理多个connection。
强调:
-
如果处理的连接数不是很高的话,使用select/epoll的web server不一定比使用multi-threading + blocking IO的web server性能更好,可能延迟还更大。select/epoll的优势并不是对于单个连接能处理得更快,而是在于能处理更多的连接。
-
在多路复用模型中,对于每一个socket,一般都设置成为non-blocking,但是,如上图所示,整个用户的process其实是一直被block的。只不过process是被select这个函数block,而不是被socket IO给block。
结论: select的优势在于可以处理多个连接,不适用于单个连接
select网络IO模型
客户端的实现:
from socket import *
import select
server = socket(AF_INET, SOCK_STREAM)
server.bind(('127.0.0.1', 8093))
server.listen(5)
server.setblocking(False)
print('starting...')
rlist = [server, ]
wlist = []
wdata = {}
while True:
rl, wl, xl = select.select(rlist, wlist, [], 0.5)
print(wl)
for sock in rl:
if sock == server:
conn, addr = sock.accept()
rlist.append(conn)
else:
try:
data = sock.recv(1024)
if not data:
sock.close()
rlist.remove(sock)
continue
wlist.append(sock)
wdata[sock] = data.upper()
except Exception:
sock.close()
rlist.remove(sock)
for sock in wl:
sock.send(wdata[sock])
wlist.remove(sock)
wdata.pop(sock)
客户端的实现:
from socket import *
client = socket(AF_INET, SOCK_STREAM)
client.connect(('127.0.0.1', 8093))
while True:
msg = input('>>: ').strip()
if not msg: continue
client.send(msg.encode('utf-8'))
data = client.recv(1024)
print(data.decode('utf-8'))
client.close()
select监听fd变化的过程分析:
-
用户进程创建socket对象,拷贝监听的fd到内核空间,每一个fd会对应一张系统文件表,内核空间的fd响应到数据后,就会发送信号给用户进程数据已到;
-
用户进程再发送系统调用,比如(accept)将内核空间的数据copy到用户空间,同时作为接受数据端内核空间的数据清除,这样重新监听时fd再有新的数据又可以响应到了(发送端因为基于TCP协议所以需要收到应答后才会清除)。
该模型的优点:
比其他模型,使用select() 的事件驱动模型只用单线程(进程)执行,占用资源少,不消耗太多 CPU,同时能够为多客户端提供服务。如果试图建立一个简单的事件驱动的服务器程序,这个模型有一定的参考价值。
该模型的缺点:
首先select()接口并不是实现“事件驱动”的最好选择。因为当需要探测的句柄值较大时,select()接口本身需要消耗大量时间去轮询各个句柄。很多操作系统提供了更为高效的接口,如linux提供了epoll,BSD提供了kqueue,Solaris提供了/dev/poll,…。如果需要实现更高效的服务器程序,类似epoll这样的接口更被推荐。遗憾的是不同的操作系统特供的epoll接口有很大差异,所以使用类似于epoll的接口实现具有较好跨平台能力的服务器会比较困难。
其次,该模型将事件探测和事件响应夹杂在一起,一旦事件响应的执行体庞大,则对整个模型是灾难性的。
21、异步IO模型原理介绍
Linux下的asynchronous IO其实用得不多,从内核2.6版本才开始引入。先看一下它的流程:
用户进程发起read操作之后,立刻就可以开始去做其它的事。而另一方面,从kernel的角度,当它受到一个asynchronous read之后,首先它会立刻返回,所以不会对用户进程产生任何block。然后,kernel会等待数据准备完成,然后将数据拷贝到用户内存,当这一切都完成之后,kernel会给用户进程发送一个signal,告诉它read操作完成了。
IO模型比较分析
到目前为止,已经将四个IO Model都介绍完了。现在回过头来回答最初的那几个问题:blocking和non-blocking的区别在哪,synchronous IO和asynchronous IO的区别在哪。 先回答最简单的这个:blocking vs non-blocking。前面的介绍中其实已经很明确的说明了这两者的区别。调用blocking IO会一直block住对应的进程直到操作完成,而non-blocking IO在kernel还准备数据的情况下会立刻返回。
再说明synchronous IO和asynchronous IO的区别之前,需要先给出两者的定义。Stevens给出的定义(其实是POSIX的定义)是这样子的: A synchronous I/O operation causes the requesting process to be blocked until that I/O operationcompletes; An asynchronous I/O operation does not cause the requesting process to be blocked; 两者的区别就在于synchronous IO做”IO operation”的时候会将process阻塞。按照这个定义,四个IO模型可以分为两大类,之前所述的blocking IO,non-blocking IO,IO multiplexing都属于synchronous IO这一类,而 asynchronous I/O后一类 。
有人可能会说,non-blocking IO并没有被block啊。这里有个非常“狡猾”的地方,定义中所指的”IO operation”是指真实的IO操作,就是例子中的recvfrom这个system call。non-blocking IO在执行recvfrom这个system call的时候,如果kernel的数据没有准备好,这时候不会block进程。但是,当kernel中数据准备好的时候,recvfrom会将数据从kernel拷贝到用户内存中,这个时候进程是被block了,在这段时间内,进程是被block的。而asynchronous IO则不一样,当进程发起IO 操作之后,就直接返回再也不理睬了,直到kernel发送一个信号,告诉进程说IO完成。在这整个过程中,进程完全没有被block。
各个IO Model的比较如图所示:
经过上面的介绍,会发现non-blocking IO和asynchronous IO的区别还是很明显的。在non-blocking IO中,虽然进程大部分时间都不会被block,但是它仍然要求进程去主动的check,并且当数据准备完成以后,也需要进程主动的再次调用recvfrom来将数据拷贝到用户内存。而asynchronous IO则完全不同。它就像是用户进程将整个IO操作交给了他人(kernel)完成,然后他人做完后发信号通知。在此期间,用户进程不需要去检查IO操作的状态,也不需要主动的去拷贝数据。
22、selectors模块实现并发
什么是IO复用?
IO复用:为了解释这个名词,首先来理解下复用这个概念,复用也就是共用的意思,这样理解还是有些抽象,为此,咱们来理解下复用在通信领域的使用,在通信领域中为了充分利用网络连接的物理介质,往往在同一条网络链路上采用时分复用或频分复用的技术使其在同一链路上传输多路信号,到这里我们就基本上理解了复用的含义,即公用某个“介质”来尽可能多的做同一类(性质)的事,那IO复用的“介质”是什么呢?
为此我们首先来看看服务器编程的模型,客户端发来的请求服务端会产生一个进程来对其进行服务,每当来一个客户请求就产生一个进程来服务,然而进程不可能无限制的产生,因此为了解决大量客户端访问的问题,引入了IO复用技术,即:一个进程可以同时对多个客户请求进行服务。
也就是说IO复用的“介质”是进程(准确的说复用的是select和poll,因为进程也是靠调用select和poll来实现的),复用一个进程(select和poll)来对多个IO进行服务,虽然客户端发来的IO是并发的但是IO所需的读写数据多数情况下是没有准备好的,因此就可以利用一个函数(select和poll)来监听IO所需的这些数据的状态,一旦IO有数据可以进行读写了,进程就来对这样的IO进行服务。
理解完IO复用后,我们在来看下实现IO复用中的三个API(select、poll和epoll)的区别和联系
select,poll,epoll都是IO多路复用的机制,I/O多路复用就是通过一种机制,可以监视多个描述符,一旦某个描述符就绪(一般是读就绪或者写就绪),能够通知应用程序进行相应的读写操作。
但select,poll,epoll本质上都是同步I/O,因为他们都需要在读写事件就绪后自己负责进行读写,也就是说这个读写过程是阻塞的,而异步I/O则无需自己负责进行读写,异步I/O的实现会负责把数据从内核拷贝到用户空间。三者的原型如下所示:
int select(int nfds, fd_set *readfds, fd_set *writefds, fd_set *exceptfds, struct timeval *timeout);
int poll(struct pollfd *fds, nfds_t nfds, int timeout);
int epoll_wait(int epfd, struct epoll_event *events, int maxevents, int timeout);
select的第一个参数nfds为fdset集合中最大描述符值加1,fdset是一个位数组,其大小限制为__FD_SETSIZE(1024),位数组的每一位代表其对应的描述符是否需要被检查。第二三四参数表示需要关注读、写、错误事件的文件描述符位数组,这些参数既是输入参数也是输出参数,可能会被内核修改用于标示哪些描述符上发生了关注的事件,所以每次调用select前都需要重新初始化fdset。timeout参数为超时时间,该结构会被内核修改,其值为超时剩余的时间。
select的调用步骤如下:
(1)使用copy_from_user从用户空间拷贝fdset到内核空间
(2)注册回调函数__pollwait
(3)遍历所有fd,调用其对应的poll方法(对于socket,这个poll方法是sock_poll,sock_poll根据情况会调用到tcp_poll,udp_poll或者datagram_poll)
(4)以tcp_poll为例,其核心实现就是__pollwait,也就是上面注册的回调函数。
(5)__pollwait的主要工作就是把current(当前进程)挂到设备的等待队列中,不同的设备有不同的等待队列,对于tcp_poll 来说,其等待队列是sk->sk_sleep(注意把进程挂到等待队列中并不代表进程已经睡眠了)。在设备收到一条消息(网络设备)或填写完文件数 据(磁盘设备)后,会唤醒设备等待队列上睡眠的进程,这时current便被唤醒了。
(6)poll方法返回时会返回一个描述读写操作是否就绪的mask掩码,根据这个mask掩码给fd_set赋值。
(7)如果遍历完所有的fd,还没有返回一个可读写的mask掩码,则会调用schedule_timeout是调用select的进程(也就是 current)进入睡眠。当设备驱动发生自身资源可读写后,会唤醒其等待队列上睡眠的进程。如果超过一定的超时时间(schedule_timeout 指定),还是没人唤醒,则调用select的进程会重新被唤醒获得CPU,进而重新遍历fd,判断有没有就绪的fd。
(8)把fd_set从内核空间拷贝到用户空间。
总结下select的几大缺点:
(1)每次调用select,都需要把fd集合从用户态拷贝到内核态,这个开销在fd很多时会很大
(2)同时每次调用select都需要在内核遍历传递进来的所有fd,这个开销在fd很多时也很大
(3)select支持的文件描述符数量太小了,默认是1024
poll与select不同,通过一个pollfd数组向内核传递需要关注的事件,故没有描述符个数的限制,pollfd中的events字段和revents分别用于标示关注的事件和发生的事件,故pollfd数组只需要被初始化一次。
poll的实现机制与select类似,其对应内核中的sys_poll,只不过poll向内核传递pollfd数组,然后对pollfd中的每个描述符进行poll,相比处理fdset来说,poll效率更高。poll返回后,需要对pollfd中的每个元素检查其revents值,来得指事件是否发生。
直到Linux2.6才出现了由内核直接支持的实现方法,那就是epoll,被公认为Linux2.6下性能最好的多路I/O就绪通知方法。epoll可以同时支持水平触发和边缘触发(Edge Triggered,只告诉进程哪些文件描述符刚刚变为就绪状态,它只说一遍,如果我们没有采取行动,那么它将不会再次告知,这种方式称为边缘触发),理论上边缘触发的性能要更高一些,但是代码实现相当复杂。epoll同样只告知那些就绪的文件描述符,而且当我们调用epoll_wait()获得就绪文件描述符时,返回的不是实际的描述符,而是一个代表就绪描述符数量的值,你只需要去epoll指定的一个数组中依次取得相应数量的文件描述符即可,这里也使用了内存映射(mmap)技术,这样便彻底省掉了这些文件描述符在系统调用时复制的开销。另一个本质的改进在于epoll采用基于事件的就绪通知方式。在select/poll中,进程只有在调用一定的方法后,内核才对所有监视的文件描述符进行扫描,而epoll事先通过epoll_ctl()来注册一个文件描述符,一旦基于某个文件描述符就绪时,内核会采用类似callback的回调机制,迅速激活这个文件描述符,当进程调用epoll_wait()时便得到通知。
epoll既然是对select和poll的改进,就应该能避免上述的三个缺点。那epoll都是怎么解决的呢?在此之前,我们先看一下epoll 和select和poll的调用接口上的不同,select和poll都只提供了一个函数——select或者poll函数。而epoll提供了三个函 数,epoll_create,epoll_ctl和epoll_wait,epoll_create是创建一个epoll句柄;epoll_ctl是注 册要监听的事件类型;epoll_wait则是等待事件的产生。
对于第一个缺点,epoll的解决方案在epoll_ctl函数中。每次注册新的事件到epoll句柄中时(在epoll_ctl中指定 EPOLL_CTL_ADD),会把所有的fd拷贝进内核,而不是在epoll_wait的时候重复拷贝。epoll保证了每个fd在整个过程中只会拷贝 一次。
对于第二个缺点,epoll的解决方案不像select或poll一样每次都把current轮流加入fd对应的设备等待队列中,而只在 epoll_ctl时把current挂一遍(这一遍必不可少)并为每个fd指定一个回调函数,当设备就绪,唤醒等待队列上的等待者时,就会调用这个回调 函数,而这个回调函数会把就绪的fd加入一个就绪链表)。epoll_wait的工作实际上就是在这个就绪链表中查看有没有就绪的fd(利用 schedule_timeout()实现睡一会,判断一会的效果,和select实现中的第7步是类似的)。
对于第三个缺点,epoll没有这个限制,它所支持的FD上限是最大可以打开文件的数目,这个数字一般远大于2048,举个例子, 在1GB内存的机器上大约是10万左右,具体数目可以cat /proc/sys/fs/file-max察看,一般来说这个数目和系统内存关系很大。
总结:
(1)select,poll实现需要自己不断轮询所有fd集合,直到设备就绪,期间可能要睡眠和唤醒多次交替。而epoll其实也需要调用 epoll_wait不断轮询就绪链表,期间也可能多次睡眠和唤醒交替,但是它是设备就绪时,调用回调函数,把就绪fd放入就绪链表中,并唤醒在 epoll_wait中进入睡眠的进程。虽然都要睡眠和交替,但是select和poll在“醒着”的时候要遍历整个fd集合,而epoll在“醒着”的 时候只要判断一下就绪链表是否为空就行了,这节省了大量的CPU时间,这就是回调机制带来的性能提升。
(2)select,poll每次调用都要把fd集合从用户态往内核态拷贝一次,并且要把current往设备等待队列中挂一次,而epoll只要 一次拷贝,而且把current往等待队列上挂也只挂一次(在epoll_wait的开始,注意这里的等待队列并不是设备等待队列,只是一个epoll内 部定义的等待队列),这也能节省不少的开销。
这三种IO多路复用模型在不同的平台有着不同的支持,而epoll在windows下就不支持,好在我们有selectors模块,帮我们默认选择当前平台下最合适的。
服务器端的代码实现:
from socket import *
import selectors
sel=selectors.DefaultSelector()
def accept(server_fileobj,mask):
conn,addr=server_fileobj.accept()
sel.register(conn,selectors.EVENT_READ,read)
def read(conn,mask):
try:
data=conn.recv(1024)
if not data:
print('closing',conn)
sel.unregister(conn)
conn.close()
return
conn.send(data.upper()+b'_SB')
except Exception:
print('closing', conn)
sel.unregister(conn)
conn.close()
server_fileobj=socket(AF_INET,SOCK_STREAM)
server_fileobj.setsockopt(SOL_SOCKET,SO_REUSEADDR,1)
server_fileobj.bind(('127.0.0.1',8088))
server_fileobj.listen(5)
server_fileobj.setblocking(False) #设置socket的接口为非阻塞
sel.register(server_fileobj,selectors.EVENT_READ,accept) #相当于网select的读列表里append了一个文件句柄server_fileobj,并且绑定了一个回调函数accept
while True:
events=sel.select() #检测所有的fileobj,是否有完成wait data的
for sel_obj,mask in events:
callback=sel_obj.data #callback=accpet
callback(sel_obj.fileobj,mask) #accpet(server_fileobj,1)
客户端的代码实现:
from socket import *
c=socket(AF_INET,SOCK_STREAM)
c.connect(('127.0.0.1',8088))
while True:
msg=input('>>: ')
if not msg:continue
c.send(msg.encode('utf-8'))
data=c.recv(1024)
print(data.decode('utf-8'))
23、asyncio协程并发编程
所谓「异步 IO」,就是你发起一个 IO 操作,却不用等它结束,你可以继续做其他事情,当它结束时,你会得到通知。
Asyncio 是并发(concurrency)的一种方式。当然对 Python 来说,并发编程还可以通过线程(threading)和多进程(multiprocessing)来实现。Asyncio 并不能带来真正的并行(parallelism)。
当然,因为 GIL(全局解释器锁)的存在,Python 的多线程也不能带来真正的并行。
可交给 asyncio 执行的任务,称为协程(coroutine)。一个协程可以放弃执行,把机会让给其它协程(即 yield from 或 await)。
协程的定义,需要使用 async def 语句
async def do_some_work(x): pass
do_some_work 便是一个协程。准确来说,do_some_work 是一个协程函数,可以通过 asyncio.iscoroutinefunction 来验证:
print(asyncio.iscoroutinefunction(do_some_work)) # True
这个协程什么都没做,我们让它睡眠几秒,以模拟实际的工作量 :
import asyncio
async def do_some_work(time):
print("Waiting " + str(time))
await asyncio.sleep(time)
这边注意下,要使用的是asyncio提供的sleep方法,不能使用time.sleep(),因为time.sleep()是同步阻塞的。
在解释 await 之前,有必要说明一下协程可以做哪些事。协程可以:
-
等待一个 future 结束
-
等待另一个协程(产生一个结果,或引发一个异常)
-
产生一个结果给正在等它的协程
-
引发一个异常给正在等它的协程
asyncio.sleep 也是一个协程,所以 await asyncio.sleep(arg) 就是等待另一个协程。可参见 asyncio.sleep 的文档:
sleep(delay, result=None, *, loop=None)
Coroutine that completes after a given time (in seconds).
简单来说,只有 loop 运行了,协程才可能运行:
下面先拿到当前线程缺省的 loop ,然后把协程对象交给 loop.run_until_complete,协程对象随后会在 loop 里得到
loop = asyncio.get_event_loop()
loop.run_until_complete(do_some_work(3))
run_until_complete 是一个阻塞(blocking)调用,直到协程运行结束,它才返回。这一点从函数名不难看出。
run_until_complete 的参数是一个 future,但是我们这里传给它的却是协程对象,之所以能这样,是因为它在内部做了检查,通过 ensure_future 函数把协程对象包装(wrap)成了 future。所以,我们可以写得更明显一些:
loop.run_until_complete(asyncio.ensure_future(do_some_work(3)))
回调函数
假如协程是一个 IO 的读操作,等它读完数据后,我们希望得到通知,以便下一步数据的处理。这一需求可以通过往 future 添加回调来实现。
def done_callback(futu):
print('Done')
futu = asyncio.ensure_future(do_some_work(3))
futu.add_done_callback(done_callback)
loop.run_until_complete(futu)
更多推荐
所有评论(0)