场景一:指定线程数,使其满负荷执行任务,避免线程空闲

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author  : jia666
# Time    : 2021/3/18 10:22
from threading import Thread
import time
import random
'''任务场景
一个完整的流程,需要A函数循环执行
已知:A的处理时间是动态的,线程数固定
原来的执行逻辑:拥有一定数目的线程
1.所有的线程,全部执行A函数,然后join(),等待所有线程结束,
2.重复执行1
优化:为了提高单位时间内处理数据量,现在组织提出要求提高处理效率
可优化点:
1.全部线程执行A函数时,每个线程执行的时间是不固定的,当一个线程执行时间较长,但其他线程较短已完成时
2.由于线程join()的阻塞作用,存在多个线程等待一个线程的情况,所有全部完成后,再执行下一步操作,这样浪费了资源
优化猜想:在线程数一定时,分配空闲线程给A函数,减少线程空闲时间,使其满负荷

'''
class Sayhi(Thread):
    def __init__(self,name):
        super().__init__()
        self.name=name
    def run(self):
        a = random.randint(1, 3)  # 随机时间
        time.sleep(a)
        print('%s say hello' % self.name)


if __name__ == '__main__':
    Thd_Num= 5 #多线程限定数
    thread_list = []  # 多线程列表
    i=0
    while True:
        while True:
            thd_sign=False  #新线程创建与否标志
            if len(thread_list)==Thd_Num: #线程开启是否完全
                thread_list = [t for t in thread_list if t.is_alive()]  # 记录存活的线程
                if len(thread_list) == Thd_Num: #存活线程已满负荷,等待一段时间后再次检测
                    print('睡眠3秒')
                    time.sleep(3)
                else:
                    thd_sign = True
            else:
                thd_sign=True
            if thd_sign: #创建新线程
                for t in range(Thd_Num-len(thread_list)): #没有完全开启,继续开启新线程
                    t = Sayhi('aaa')
                    t.start()
                    thread_list.append(t)
                break

应用场景二:指定线程数,动态分配线程给两个依赖函数,B依赖A

#!/usr/bin/env python
# -*- coding: utf-8 -*-
# Author  : jia666
# Time    : 2021/3/18 16:25

import time
import queue
import random
from threading import Thread

'''任务场景
一个完整的流程,需要A函数与B函数执行,A函数生成pid,B函数依据pid进行数据处理,即B依赖A产生的数据
已知:A/B函数的处理时间是动态的,线程数固定
原来的执行逻辑:拥有一定数目的线程
1.所有的线程,全部执行A函数,然后join(),等待所有线程结束,
2.所有的线程,全部执行B函数,然后join(),等待所有线程结束,
3.重复执行1,2
remark:该执行逻辑为顺序执行,当A/B函数执行时,另一个函数是停止状态,

优化:为了提高单位时间内处理数据量,现在组织提出要求提高处理效率
可优化点:
1.全部线程执行A/B函数时,每个线程执行的时间是不固定的,当一个线程执行时间较长,但其他线程较短已完成时
2.由于线程join()的阻塞作用,存在多个线程等待一个线程的情况,所有全部完成后,再执行下一步操作,这样浪费了资源

优化猜想:在线程数一定时,动态分配线程给A/B函数,减少线程空闲时间,使其满负荷运转
'''

class A(Thread):
    '模拟生成任务队列'
    def __init__(self):
        super().__init__()
    def run(self):
        # 模拟生成任务数据需要的时间
        a=random.randint(1, 3) #随机时间
        time.sleep(a)
        task_que.put(a) #模拟生成的任务数据
        print('[-]已生产%s个pid' % task_que.qsize())

class B(Thread):
    '模拟处理任务队列'
    def __init__(self):
        super().__init__()
    def run(self):
        while not task_que.empty():
            print('[-]剩余%s个加密pid' % task_que.qsize())
            task_que.get()  # 获取任务数据
            a = random.randint(1, 3)  # 模拟处理数据的需要的时间
            time.sleep(a)
def wait_thread(SleepTime=3):
    '检测线程是否存在空闲'
    global thd_sign,thread_list
    while not thd_sign:
        if len(thread_list) == Thd_Num:  # 线程达到限定数
            thread_list = [t for t in thread_list if t.is_alive()]  # 记录存活的线程
            if len(thread_list) == Thd_Num:  # 存活线程已满负荷,等待一段时间后再次检测
                print('线程无空闲,睡眠%s秒'%SleepTime)
                time.sleep(SleepTime) #多久检测一次线程是否空闲
            else:  # 线程存在空闲
                thd_sign = True
        else:  # 线程存在空闲
            thd_sign = True
    thd_sign=False  #重置为False
if __name__ == '__main__':
    task_que = queue.Queue()  # 任务队列
    Thd_Num = 8  # 固定的线程数
    thread_list=[]  # 多线程列表
    thd_sign = False # 新建线程标志

    while True: #无限循环A,B操作
        wait_thread() #检测线程是否存在空闲
        for t in range(Thd_Num-len(thread_list)): #没有完全开启,继续开启新线程
            sa = A()
            sa.start()
            thread_list.append(sa)
        if not task_que.empty():#依据A函数产生数据而执行的B函数
            while task_que.qsize()>Thd_Num: #当剩余的任务量小于线程数时,退出循环
                wait_thread()  # 检测线程是否存在空闲
                for t in range(Thd_Num - len(thread_list)):  # 没有完全开启,继续开启新线程
                    sa = B()
                    sa.start()
                    thread_list.append(sa)
补充知识
1 python 默认参数创建线程后,不管主线程是否执行完毕,都会等待子线程执行完毕才一起退出,有无join结果一样

2 如果创建线程,并且设置了daemon为true,即thread.setDaemon(True), 则主线程执行完毕后自动退出,不会等待子线程的执行结果。而且随着主线程退出,子线程也消亡。

3 join方法的作用是阻塞,等待子线程结束,join方法有一个参数是timeout,即如果主线程等待timeout,子线程还没有结束,则主线程强制结束子线程。

4 如果线程daemon属性为False, 则join里的timeout参数无效。主线程会一直等待子线程结束。

5 如果线程daemon属性为True, 则join里的timeout参数是有效的, 主线程会等待timeout时间后,结束子线程。此处有一个坑,即如果同时有N个子线程join(timeout),那么实际上主线程会等待的超时时间最长为 N * timeout, 因为每个子线程的超时开始时刻是上一个子线程超时结束的时刻
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐