asyncio — Python 异步 I/O 框架
asyncio — Python 异步 I/O 框架
一句话理解
asyncio 是 Python 的标准库,让你用 单线程 写出 并发 的程序。核心思想:当一个任务在等待
I/O(网络请求、文件读写、数据库查询)时,CPU 不会空等,而是"切出去"执行另一个任务。
同步(串行): |--任务A--| |--任务B--| |--任务C--| → 总时间 = A+B+C
异步(并发): |--A--|--B--|--C--|--A--|--B--|--C--|--A--| → 总时间 ≈ max(A,B,C)
↑ 多个任务交替前进,谁不阻塞谁就跑
---
核心概念(三板斧)
1. async def / await — 协程(coroutine)
# 普通函数
def get_data():
return requests.get("https://api.example.com") # 阻塞!CPU 干等
# 异步协程
async def get_data():
async with aiohttp.ClientSession() as session:
async with session.get("https://api.example.com") as resp:
return await resp.json()
- async def 定义一个协程函数,调用它不会立即执行,而是返回一个协程对象。
- await 挂起当前协程,让出控制权,等结果就绪后再继续。
- 规则: 只能在 async def 函数内部使用 await。
2. asyncio.run() — 入口
async def main():
result = await get_data()
print(result)
asyncio.run(main()) # ← 整个程序的唯一入口
asyncio.run() 创建事件循环、运行协程、最后清理。只调用一次(放在程序顶层)。
3. asyncio.gather() — 并发执行多个任务
async def main():
# 串行:3 秒 + 2 秒 + 1 秒 = 6 秒
r1 = await fetch("url1") # 等 3s
r2 = await fetch("url2") # 等 2s
r3 = await fetch("url3") # 等 1s
# 并发:max(3, 2, 1) = 3 秒
r1, r2, r3 = await asyncio.gather(
fetch("url1"),
fetch("url2"),
fetch("url3"),
)
这是最常用的并发手段 — 同时发起多个 I/O 操作,等全部完成后再统一处理结果。
---
常用 API 速查
┌─────────────────────────────────┬──────────────────────┬──────────────────────────┐
│ API │ 用途 │ 场景 │
├─────────────────────────────────┼──────────────────────┼──────────────────────────┤
│ asyncio.run(coro) │ 顶层入口 │ 程序启动 │
├─────────────────────────────────┼──────────────────────┼──────────────────────────┤
│ asyncio.gather(*tasks) │ 并发执行,等全部完成 │ 批量请求 │
├─────────────────────────────────┼──────────────────────┼──────────────────────────┤
│ asyncio.create_task(coro) │ 创建后台任务 │ 不阻塞的 fire-and-forget │
├─────────────────────────────────┼──────────────────────┼──────────────────────────┤
│ asyncio.wait_for(coro, timeout) │ 带超时的等待 │ 防止挂死 │
├─────────────────────────────────┼──────────────────────┼──────────────────────────┤
│ asyncio.as_completed(tasks) │ 谁先完成先处理谁 │ 流式结果 │
├─────────────────────────────────┼──────────────────────┼──────────────────────────┤
│ asyncio.sleep(n) │ 异步睡眠 │ 等待 / 轮询 │
├─────────────────────────────────┼──────────────────────┼──────────────────────────┤
│ asyncio.Queue │ 异步队列 │ 生产者-消费者 │
├─────────────────────────────────┼──────────────────────┼──────────────────────────┤
│ asyncio.Lock / Semaphore │ 异步锁 / 信号量 │ 限流 / 互斥 │
└─────────────────────────────────┴──────────────────────┴──────────────────────────┘
---
典型场景及代码
场景 1:并发 HTTP 请求
import asyncio
import aiohttp
async def fetch(session, url):
async with session.get(url) as resp:
return await resp.json()
async def main():
urls = [f"https://api.example.com/item/{i}" for i in range(100)]
async with aiohttp.ClientSession() as session:
tasks = [fetch(session, url) for url in urls]
results = await asyncio.gather(*tasks) # 100 个请求并发
return results
asyncio.run(main())
场景 2:限流(信号量)
sem = asyncio.Semaphore(10) # 最多同时 10 个
async def fetch_with_limit(session, url):
async with sem: # 超过 10 个就排队
return await fetch(session, url)
场景 3:带超时
try:
result = await asyncio.wait_for(fetch(url), timeout=5.0)
except asyncio.TimeoutError:
print("请求超时!")
场景 4:生产者-消费者
async def producer(queue):
for i in range(100):
await queue.put(f"item-{i}")
await asyncio.sleep(0.1) # 模拟生产间隔
async def consumer(queue, name):
while True:
item = await queue.get()
if item is None: # 终止信号
break
print(f"{name} 处理 {item}")
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=20)
# 1 个生产者 + 3 个消费者
consumers = [asyncio.create_task(consumer(queue, f"worker-{i}")) for i in range(3)]
await producer(queue)
# 发出终止信号
for _ in consumers:
await queue.put(None)
await asyncio.gather(*consumers)
场景 5:谁先完成处理谁
async def main():
tasks = [fetch(url1), fetch(url2), fetch(url3)]
for coro in asyncio.as_completed(tasks):
result = await coro # URL2 先返回就先处理
print(f"获得结果: {result}")
---
常见坑 & 最佳实践
┌───────────────────────────┬───────────────────────────────────────────────────────────┐
│ 坑 │ 说明 │
├───────────────────────────┼───────────────────────────────────────────────────────────┤
│ 不要在 async 里调阻塞函数 │ time.sleep() 会卡死整个事件循环!用 await asyncio.sleep() │
├───────────────────────────┼───────────────────────────────────────────────────────────┤
│ requests → aiohttp │ requests 是同步的,异步要用 aiohttp / httpx │
├───────────────────────────┼───────────────────────────────────────────────────────────┤
│ for 循环里 create_task │ 如果循环中创建了 task 但从不 await,可能永远不会执行完 │
├───────────────────────────┼───────────────────────────────────────────────────────────┤
│ 忘记 await │ async def 的函数不 await 等于没调用 │
├───────────────────────────┼───────────────────────────────────────────────────────────┤
│ 在同步代码里 RUN 异步 │ 不要在已有事件循环的线程里再调用 asyncio.run() │
└───────────────────────────┴───────────────────────────────────────────────────────────┘
判断是否可以"异步化"
只有当你的程序主要时间花在 I/O 等待上(网络、数据库、文件)时,asyncio 才有意义。如果瓶颈是 CPU 计算,应该用 multiprocessing。
---
与 LangChain 的关联
你在 LangChain 中看到的 async 模式(ainvoke, astream 等)底层就是 asyncio。当你需要:
- 并发调用多个 Agent
- 同时处理多个用户请求
- 不给 LLM 请求排队
就可以用 LangChain 的异步接口 + asyncio:
# 同步 — 串行,两次 LLM 调用依次执行
r1 = chain.invoke("问题1")
r2 = chain.invoke("问题2")
# 异步 — 并发,两次 LLM 调用同时发出
r1, r2 = await asyncio.gather(
chain.ainvoke("问题1"),
chain.ainvoke("问题2"),
)
更多推荐


所有评论(0)