python3 多线程优化
场景一:指定线程数,使其满负荷执行任务,避免线程空闲#!/usr/bin/env python# -*- coding: utf-8 -*-# Author: jia666# Time: 2021/3/18 10:22from threading import Threadimport timeimport random'''任务场景一个完整的流程,需要A函数循环执行已知:A的处理时间是动态的,线
·
场景一:指定线程数,使其满负荷执行任务,避免线程空闲
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author : jia666
# Time : 2021/3/18 10:22
from threading import Thread
import time
import random
'''任务场景
一个完整的流程,需要A函数循环执行
已知:A的处理时间是动态的,线程数固定
原来的执行逻辑:拥有一定数目的线程
1.所有的线程,全部执行A函数,然后join(),等待所有线程结束,
2.重复执行1
优化:为了提高单位时间内处理数据量,现在组织提出要求提高处理效率
可优化点:
1.全部线程执行A函数时,每个线程执行的时间是不固定的,当一个线程执行时间较长,但其他线程较短已完成时
2.由于线程join()的阻塞作用,存在多个线程等待一个线程的情况,所有全部完成后,再执行下一步操作,这样浪费了资源
优化猜想:在线程数一定时,分配空闲线程给A函数,减少线程空闲时间,使其满负荷
'''
class Sayhi(Thread):
def __init__(self,name):
super().__init__()
self.name=name
def run(self):
a = random.randint(1, 3) # 随机时间
time.sleep(a)
print('%s say hello' % self.name)
if __name__ == '__main__':
Thd_Num= 5 #多线程限定数
thread_list = [] # 多线程列表
i=0
while True:
while True:
thd_sign=False #新线程创建与否标志
if len(thread_list)==Thd_Num: #线程开启是否完全
thread_list = [t for t in thread_list if t.is_alive()] # 记录存活的线程
if len(thread_list) == Thd_Num: #存活线程已满负荷,等待一段时间后再次检测
print('睡眠3秒')
time.sleep(3)
else:
thd_sign = True
else:
thd_sign=True
if thd_sign: #创建新线程
for t in range(Thd_Num-len(thread_list)): #没有完全开启,继续开启新线程
t = Sayhi('aaa')
t.start()
thread_list.append(t)
break
应用场景二:指定线程数,动态分配线程给两个依赖函数,B依赖A
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author : jia666
# Time : 2021/3/18 16:25
import time
import queue
import random
from threading import Thread
'''任务场景
一个完整的流程,需要A函数与B函数执行,A函数生成pid,B函数依据pid进行数据处理,即B依赖A产生的数据
已知:A/B函数的处理时间是动态的,线程数固定
原来的执行逻辑:拥有一定数目的线程
1.所有的线程,全部执行A函数,然后join(),等待所有线程结束,
2.所有的线程,全部执行B函数,然后join(),等待所有线程结束,
3.重复执行1,2
remark:该执行逻辑为顺序执行,当A/B函数执行时,另一个函数是停止状态,
优化:为了提高单位时间内处理数据量,现在组织提出要求提高处理效率
可优化点:
1.全部线程执行A/B函数时,每个线程执行的时间是不固定的,当一个线程执行时间较长,但其他线程较短已完成时
2.由于线程join()的阻塞作用,存在多个线程等待一个线程的情况,所有全部完成后,再执行下一步操作,这样浪费了资源
优化猜想:在线程数一定时,动态分配线程给A/B函数,减少线程空闲时间,使其满负荷运转
'''
class A(Thread):
'模拟生成任务队列'
def __init__(self):
super().__init__()
def run(self):
# 模拟生成任务数据需要的时间
a=random.randint(1, 3) #随机时间
time.sleep(a)
task_que.put(a) #模拟生成的任务数据
print('[-]已生产%s个pid' % task_que.qsize())
class B(Thread):
'模拟处理任务队列'
def __init__(self):
super().__init__()
def run(self):
while not task_que.empty():
print('[-]剩余%s个加密pid' % task_que.qsize())
task_que.get() # 获取任务数据
a = random.randint(1, 3) # 模拟处理数据的需要的时间
time.sleep(a)
def wait_thread(SleepTime=3):
'检测线程是否存在空闲'
global thd_sign,thread_list
while not thd_sign:
if len(thread_list) == Thd_Num: # 线程达到限定数
thread_list = [t for t in thread_list if t.is_alive()] # 记录存活的线程
if len(thread_list) == Thd_Num: # 存活线程已满负荷,等待一段时间后再次检测
print('线程无空闲,睡眠%s秒'%SleepTime)
time.sleep(SleepTime) #多久检测一次线程是否空闲
else: # 线程存在空闲
thd_sign = True
else: # 线程存在空闲
thd_sign = True
thd_sign=False #重置为False
if __name__ == '__main__':
task_que = queue.Queue() # 任务队列
Thd_Num = 8 # 固定的线程数
thread_list=[] # 多线程列表
thd_sign = False # 新建线程标志
while True: #无限循环A,B操作
wait_thread() #检测线程是否存在空闲
for t in range(Thd_Num-len(thread_list)): #没有完全开启,继续开启新线程
sa = A()
sa.start()
thread_list.append(sa)
if not task_que.empty():#依据A函数产生数据而执行的B函数
while task_que.qsize()>Thd_Num: #当剩余的任务量小于线程数时,退出循环
wait_thread() # 检测线程是否存在空闲
for t in range(Thd_Num - len(thread_list)): # 没有完全开启,继续开启新线程
sa = B()
sa.start()
thread_list.append(sa)
补充知识
1 python 默认参数创建线程后,不管主线程是否执行完毕,都会等待子线程执行完毕才一起退出,有无join结果一样
2 如果创建线程,并且设置了daemon为true,即thread.setDaemon(True), 则主线程执行完毕后自动退出,不会等待子线程的执行结果。而且随着主线程退出,子线程也消亡。
3 join方法的作用是阻塞,等待子线程结束,join方法有一个参数是timeout,即如果主线程等待timeout,子线程还没有结束,则主线程强制结束子线程。
4 如果线程daemon属性为False, 则join里的timeout参数无效。主线程会一直等待子线程结束。
5 如果线程daemon属性为True, 则join里的timeout参数是有效的, 主线程会等待timeout时间后,结束子线程。此处有一个坑,即如果同时有N个子线程join(timeout),那么实际上主线程会等待的超时时间最长为 N * timeout, 因为每个子线程的超时开始时刻是上一个子线程超时结束的时刻
更多推荐
已为社区贡献16条内容
所有评论(0)