日常开发中批量任务、接口请求、文件读写、爬虫抓取、数据计算等场景,串行执行效率低下,并发编程就是解决执行速度瓶颈的核心方案。

一、并发基础概念

1. 核心名词区分

  • 串行:任务挨个依次执行,前一个结束才运行下一个,效率最低
  • 并发:同一时间段交替处理多个任务,宏观同时运行,微观交替切换
  • 并行:同一时刻多核CPU同时执行多个任务,真正同时运行
  • 线程:轻量级执行单元,依附进程存在,共享进程资源,切换开销小
  • 进程:资源独立隔离,拥有独立内存空间,切换开销大
  • 协程:用户态轻量级任务,代码层面手动切换,无内核切换损耗

2. GIL全局解释器锁

Python独有锁机制,同一时刻一个进程内仅有一个线程执行

  • 限制:多线程无法利用多核CPU,CPU密集型任务多线程效率低
  • 适用:IO密集型(网络请求、文件读写、休眠等待)多线程优势明显

3. 任务场景选型

  • IO密集型:优先多线程 / 协程
  • CPU计算密集型:优先多进程
  • 高吞吐异步请求:优先asyncio协程

二、多线程编程 threading模块

1. 线程创建两种方式

方式一:函数式创建线程
import threading
import time

def task(name, delay):
    print(f"线程{name}开始执行,休眠{delay}秒")
    time.sleep(delay)
    print(f"线程{name}执行完毕")

if __name__ == '__main__':
    # 创建线程对象
    t1 = threading.Thread(target=task, args=("一号", 2))
    t2 = threading.Thread(target=task, args=("二号", 1))

    # 启动线程
    t1.start()
    t2.start()

    # 等待线程执行结束
    t1.join()
    t2.join()
    print("所有线程运行结束")
方式二:类继承创建线程
import threading
import time

class MyThread(threading.Thread):
    def __init__(self, name, delay):
        super().__init__()
        self.name = name
        self.delay = delay

    def run(self):
        print(f"线程{self.name}开始执行,休眠{self.delay}秒")
        time.sleep(self.delay)
        print(f"线程{self.name}执行完毕")

if __name__ == '__main__':
    t1 = MyThread("线程A", 1)
    t2 = MyThread("线程B", 2)
    t1.start()
    t2.start()
    t1.join()
    t2.join()

2. 线程守护

守护线程随主线程退出直接销毁,非守护线程执行完毕程序才结束

# daemon=True 设置为守护线程
t = threading.Thread(target=task, daemon=True)

3. 线程安全与互斥锁

多线程共享全局变量,并发修改会出现数据错乱,使用Lock加锁保证原子性

import threading

num = 0
lock = threading.Lock()

def add_count():
    global num
    # 上锁
    lock.acquire()
    for _ in range(100000):
        num += 1
    # 释放锁
    lock.release()

if __name__ == '__main__':
    t1 = threading.Thread(target=add_count)
    t2 = threading.Thread(target=add_count)
    t1.start()
    t2.start()
    t1.join()
    t2.join()
    print("最终统计值:", num)

4. 线程池 ThreadPoolExecutor

避免无限创建线程资源耗尽,池化复用线程,管控并发数量

from concurrent.futures import ThreadPoolExecutor
import time

def work(num):
    print(f"任务{num}执行中")
    time.sleep(1)
    return f"任务{num}完成"

if __name__ == '__main__':
    # 设置最大并发线程数
    with ThreadPoolExecutor(max_work=3) as pool:
        task_list = [pool.submit(work, i) for i in range(6)]
        # 遍历获取执行结果
        for res in task_list:
            print(res.result())

三、多进程编程 multiprocessing模块

1. 进程基础创建

进程资源相互独立,默认不共享数据,规避GIL锁限制,适合计算密集型

import multiprocessing
import time

def process_task(name):
    print(f"进程{name}启动运行")
    time.sleep(2)
    print(f"进程{name}运行结束")

if __name__ == '__main__':
    p1 = multiprocessing.Process(target=process_task, args=("P1",))
    p2 = multiprocessing.Process(target=process_task, args=("P2",))
    p1.start()
    p2.start()
    p1.join()
    p2.join()
    print("全部进程执行完毕")

2. 进程间数据通信

使用队列Queue实现进程安全数据传递

from multiprocessing import Process, Queue

def put_data(q):
    q.put("进程传递数据")

def get_data(q):
    print("接收数据:", q.get())

if __name__ == '__main__':
    q = Queue()
    p1 = Process(target=put_data, args=(q,))
    p2 = Process(target=get_data, args=(q,))
    p1.start()
    p2.start()
    p1.join()
    p2.join()

3. 进程池 ProcessPoolExecutor

批量管理进程,充分利用多核CPU算力

from concurrent.futures import ProcessPoolExecutor
import math

def calc_power(num):
    return math.pow(num, 3)

if __name__ == '__main__':
    with ProcessPoolExecutor(max_work=4) as pool:
        result = pool.map(calc_power, [1,2,3,4,5])
        print(list(result))

进程线程核心区别

  1. 内存:进程独立内存,线程共享进程内存
  2. 开销:进程创建销毁开销大,线程开销更小
  3. GIL:多进程突破GIL限制,线程受GIL约束
  4. 通信:进程通信复杂,线程共享变量通信简单

四、协程编程 asyncio

1. 协程基础概念

单线程内切换任务,无系统内核切换消耗,IO阻塞时自动切换其他任务,并发效率最高
关键字:async 定义协程函数,await 阻塞挂起任务

2. 基础协程写法

import asyncio

# 定义协程函数
async def demo1():
    print("协程任务1开始")
    await asyncio.sleep(2)
    print("协程任务1结束")

async def demo2():
    print("协程任务2开始")
    await asyncio.sleep(1)
    print("协程任务2结束")

async def main():
    # 创建并发任务
    t1 = asyncio.create_task(demo1())
    t2 = asyncio.create_task(demo2())
    await t1
    await t2

# 运行协程入口
asyncio.run(main())

3. 批量并发任务

import asyncio

async def task_func(num):
    print(f"异步任务{num}执行")
    await asyncio.sleep(1)
    return f"任务{num}完成"

async def main():
    tasks = [task_func(i) for i in range(5)]
    # 批量等待所有任务执行完毕
    res_list = await asyncio.gather(*tasks)
    print(res_list)

asyncio.run(main())

4. 异步文件读写

适配协程风格文件操作,不阻塞异步流程

import aiofiles

async def read_file():
    async with aiofiles.open("test.txt", "r", encoding="utf-8") as f:
        content = await f.read()
    print(content)

asyncio.run(read_file())

五、并发常见问题与解决方案

1. 死锁问题

多个线程互相持有对方所需锁,互相等待卡死

  • 解决:统一加锁顺序、减少嵌套锁、设置超时释放

2. 线程池/进程池参数设置

根据CPU核心数、IO阻塞程度合理设定最大并发数,避免阻塞雪崩

3. 并发异常捕获

批量任务中单独捕获异常,单个任务报错不影响整体流程

4. 三种并发选型总结

并发方式 适用场景 优缺点
多线程 IO密集、网络请求、文件读写 开销小,受GIL限制,无法多核计算
多进程 CPU密集、大规模数值计算 利用多核,资源开销大,通信繁琐
协程 高并发IO、爬虫、异步接口 效率最高,仅单线程运行,语法特殊

全文总结

本文详细介绍了串行 / 并发 / 并行、GIL 锁基础理论,覆盖多线程、多进程、协程三大并发实现方案。

更多推荐