asyncio — Python 异步 I/O 框架

  一句话理解

  asyncio 是 Python 的标准库,让你用 单线程 写出 并发 的程序。核心思想:当一个任务在等待
  I/O(网络请求、文件读写、数据库查询)时,CPU 不会空等,而是"切出去"执行另一个任务。

  同步(串行):     |--任务A--|         |--任务B--|         |--任务C--|  → 总时间 = A+B+C
  异步(并发):     |--A--|--B--|--C--|--A--|--B--|--C--|--A--|          → 总时间 ≈ max(A,B,C)
                     ↑ 多个任务交替前进,谁不阻塞谁就跑

  ---
  核心概念(三板斧)

  1. async def / await — 协程(coroutine)

  # 普通函数
  def get_data():
      return requests.get("https://api.example.com")  # 阻塞!CPU 干等

  # 异步协程
  async def get_data():
      async with aiohttp.ClientSession() as session:
          async with session.get("https://api.example.com") as resp:
              return await resp.json()

  - async def 定义一个协程函数,调用它不会立即执行,而是返回一个协程对象。
  - await 挂起当前协程,让出控制权,等结果就绪后再继续。
  - 规则: 只能在 async def 函数内部使用 await。

  2. asyncio.run() — 入口

  async def main():
      result = await get_data()
      print(result)

  asyncio.run(main())   # ← 整个程序的唯一入口

  asyncio.run() 创建事件循环、运行协程、最后清理。只调用一次(放在程序顶层)。

  3. asyncio.gather() — 并发执行多个任务

  async def main():
      # 串行:3 秒 + 2 秒 + 1 秒 = 6 秒
      r1 = await fetch("url1")  # 等 3s
      r2 = await fetch("url2")  # 等 2s
      r3 = await fetch("url3")  # 等 1s

      # 并发:max(3, 2, 1) = 3 秒
      r1, r2, r3 = await asyncio.gather(
          fetch("url1"),
          fetch("url2"),
          fetch("url3"),
      )

  这是最常用的并发手段 — 同时发起多个 I/O 操作,等全部完成后再统一处理结果。

  ---
  常用 API 速查

  ┌─────────────────────────────────┬──────────────────────┬──────────────────────────┐
  │               API               │         用途         │           场景           │
  ├─────────────────────────────────┼──────────────────────┼──────────────────────────┤
  │ asyncio.run(coro)               │ 顶层入口             │ 程序启动                 │
  ├─────────────────────────────────┼──────────────────────┼──────────────────────────┤
  │ asyncio.gather(*tasks)          │ 并发执行,等全部完成 │ 批量请求                 │
  ├─────────────────────────────────┼──────────────────────┼──────────────────────────┤
  │ asyncio.create_task(coro)       │ 创建后台任务         │ 不阻塞的 fire-and-forget │
  ├─────────────────────────────────┼──────────────────────┼──────────────────────────┤
  │ asyncio.wait_for(coro, timeout) │ 带超时的等待         │ 防止挂死                 │
  ├─────────────────────────────────┼──────────────────────┼──────────────────────────┤
  │ asyncio.as_completed(tasks)     │ 谁先完成先处理谁     │ 流式结果                 │
  ├─────────────────────────────────┼──────────────────────┼──────────────────────────┤
  │ asyncio.sleep(n)                │ 异步睡眠             │ 等待 / 轮询              │
  ├─────────────────────────────────┼──────────────────────┼──────────────────────────┤
  │ asyncio.Queue                   │ 异步队列             │ 生产者-消费者            │
  ├─────────────────────────────────┼──────────────────────┼──────────────────────────┤
  │ asyncio.Lock / Semaphore        │ 异步锁 / 信号量      │ 限流 / 互斥              │
  └─────────────────────────────────┴──────────────────────┴──────────────────────────┘

  ---
  典型场景及代码

  场景 1:并发 HTTP 请求

  import asyncio
  import aiohttp

  async def fetch(session, url):
      async with session.get(url) as resp:
          return await resp.json()

  async def main():
      urls = [f"https://api.example.com/item/{i}" for i in range(100)]
      async with aiohttp.ClientSession() as session:
          tasks = [fetch(session, url) for url in urls]
          results = await asyncio.gather(*tasks)  # 100 个请求并发
      return results

  asyncio.run(main())

  场景 2:限流(信号量)

  sem = asyncio.Semaphore(10)  # 最多同时 10 个

  async def fetch_with_limit(session, url):
      async with sem:           # 超过 10 个就排队
          return await fetch(session, url)

  场景 3:带超时

  try:
      result = await asyncio.wait_for(fetch(url), timeout=5.0)
  except asyncio.TimeoutError:
      print("请求超时!")

  场景 4:生产者-消费者

  async def producer(queue):
      for i in range(100):
          await queue.put(f"item-{i}")
          await asyncio.sleep(0.1)  # 模拟生产间隔

  async def consumer(queue, name):
      while True:
          item = await queue.get()
          if item is None:  # 终止信号
              break
          print(f"{name} 处理 {item}")
          queue.task_done()

  async def main():
      queue = asyncio.Queue(maxsize=20)
      # 1 个生产者 + 3 个消费者
      consumers = [asyncio.create_task(consumer(queue, f"worker-{i}")) for i in range(3)]
      await producer(queue)
      # 发出终止信号
      for _ in consumers:
          await queue.put(None)
      await asyncio.gather(*consumers)

  场景 5:谁先完成处理谁

  async def main():
      tasks = [fetch(url1), fetch(url2), fetch(url3)]
      for coro in asyncio.as_completed(tasks):
          result = await coro       # URL2 先返回就先处理
          print(f"获得结果: {result}")

  ---
  常见坑 & 最佳实践

  ┌───────────────────────────┬───────────────────────────────────────────────────────────┐
  │            坑             │                           说明                            │
  ├───────────────────────────┼───────────────────────────────────────────────────────────┤
  │ 不要在 async 里调阻塞函数 │ time.sleep() 会卡死整个事件循环!用 await asyncio.sleep() │
  ├───────────────────────────┼───────────────────────────────────────────────────────────┤
  │ requests → aiohttp        │ requests 是同步的,异步要用 aiohttp / httpx               │
  ├───────────────────────────┼───────────────────────────────────────────────────────────┤
  │ for 循环里 create_task    │ 如果循环中创建了 task 但从不 await,可能永远不会执行完    │
  ├───────────────────────────┼───────────────────────────────────────────────────────────┤
  │ 忘记 await                │ async def 的函数不 await 等于没调用                       │
  ├───────────────────────────┼───────────────────────────────────────────────────────────┤
  │ 在同步代码里 RUN 异步     │ 不要在已有事件循环的线程里再调用 asyncio.run()            │
  └───────────────────────────┴───────────────────────────────────────────────────────────┘

  判断是否可以"异步化"

  只有当你的程序主要时间花在 I/O 等待上(网络、数据库、文件)时,asyncio 才有意义。如果瓶颈是 CPU 计算,应该用 multiprocessing。

  ---
  与 LangChain 的关联

  你在 LangChain 中看到的 async 模式(ainvoke, astream 等)底层就是 asyncio。当你需要:

  - 并发调用多个 Agent
  - 同时处理多个用户请求
  - 不给 LLM 请求排队

  就可以用 LangChain 的异步接口 + asyncio:

  # 同步 — 串行,两次 LLM 调用依次执行
  r1 = chain.invoke("问题1")
  r2 = chain.invoke("问题2")

  # 异步 — 并发,两次 LLM 调用同时发出
  r1, r2 = await asyncio.gather(
      chain.ainvoke("问题1"),
      chain.ainvoke("问题2"),
  )

更多推荐