很多自动化任务涉及网络请求、文件IO等耗时操作。如果一个个顺序执行,效率低得让人发指。异步编程能让你同时处理多个任务,效率提升不是一星半点。今天来聊聊Python异步处理的实战技巧。

一、async/await基础

Python 3.5引入的async/await语法是异步编程的核心:

import asyncio

async def fetch_data(url, delay=1):
    """Simulate fetching data from ``url`` asynchronously.

    Args:
        url: Identifier of the resource to "fetch"; only used in messages.
        delay: Seconds to sleep in place of a real network round trip.
            Defaults to 1, the latency the surrounding examples assume.

    Returns:
        A string naming the URL the data supposedly came from.
    """
    print(f"开始获取: {url}")
    # Stand-in for a real network request; yields control to the event loop.
    await asyncio.sleep(delay)
    return f"数据来自 {url}"

async def main():
    """Fetch three URLs concurrently and print the list of results."""
    # Awaiting each call one after another would take about 3 seconds:
    #   result1 = await fetch_data("url1")
    #   result2 = await fetch_data("url2")
    #   result3 = await fetch_data("url3")

    # Handing all three coroutines to gather lets their waits overlap,
    # so the whole batch takes about 1 second.
    pending = [fetch_data(name) for name in ("url1", "url2", "url3")]
    results = await asyncio.gather(*pending)
    print(results)

# Run the main() coroutine to completion.
asyncio.run(main())

二、实战:批量下载网页

用aiohttp实现高效批量下载:

import asyncio
import hashlib
from pathlib import Path

import aiofiles
import aiohttp

async def download_one(session, url, folder):
    """Download one page and save it as ``<sha256-of-url>.html`` in ``folder``.

    Args:
        session: An open ``aiohttp.ClientSession`` used for the request.
        url: Address of the page to download.
        folder: Target directory (``str`` or ``Path``); it must already exist.

    Returns:
        A status string starting with "成功" on success, "失败" for a
        non-200 response, or "异常" when any exception occurred. Errors are
        reported in the return value rather than raised, so one bad URL
        cannot abort a whole batch.
    """
    # The built-in hash() of a str is salted per interpreter process, so it
    # would give a different file name on every run. A digest is stable.
    digest = hashlib.sha256(url.encode('utf-8')).hexdigest()
    filename = Path(folder) / f"{digest}.html"
    # Passing a bare number as ``timeout`` is deprecated in aiohttp.
    timeout = aiohttp.ClientTimeout(total=30)

    try:
        async with session.get(url, timeout=timeout) as response:
            if response.status != 200:
                return f"失败: {url}, 状态码: {response.status}"
            content = await response.text()

        # The body is fully read, so the connection is released before the
        # file is written.
        async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
            await f.write(content)
        return f"成功: {url}"
    except Exception as e:
        return f"异常: {url}, 错误: {e}"

async def batch_download(urls, folder='./downloads', max_concurrent=10):
    """Download many pages concurrently with a cap on parallel requests.

    Args:
        urls: Iterable of page addresses (a generator is accepted).
        folder: Directory to store the pages in; created if missing.
        max_concurrent: Maximum number of downloads in flight at once.

    Returns:
        A list with one entry per URL, in input order: the status string
        from ``download_one``, or an exception object if one escaped.

    Raises:
        ValueError: If ``max_concurrent`` is smaller than 1.
    """
    # Semaphore(0) would block every download forever, so reject it early.
    if max_concurrent < 1:
        raise ValueError("max_concurrent must be at least 1")

    # Materialize once so generators work and len() is available below.
    urls = list(urls)
    folder = Path(folder)
    folder.mkdir(parents=True, exist_ok=True)

    semaphore = asyncio.Semaphore(max_concurrent)

    async with aiohttp.ClientSession() as session:
        async def download_with_limit(url):
            async with semaphore:
                return await download_one(session, url, folder)

        results = await asyncio.gather(
            *(download_with_limit(url) for url in urls),
            return_exceptions=True,
        )

    # Match on the prefix: a substring test would also count failure
    # messages whose URL happens to contain the success marker.
    success = sum(
        1 for r in results if isinstance(r, str) and r.startswith("成功")
    )
    print(f"完成:{success}/{len(urls)} 成功")
    return results

# Usage: build 100 sample URLs and download them with max_concurrent=20
urls = [f"https://example.com/page{i}" for i in range(100)]
asyncio.run(batch_download(urls, max_concurrent=20))

三、异步文件操作

aiofiles让文件IO也能异步:

import asyncio
import aiofiles

async def process_file(filepath):
    """Upper-case a text file and write the result next to it.

    The output name is the input name with ``_processed`` inserted before
    the extension, e.g. ``notes.txt`` -> ``notes_processed.txt``.

    Args:
        filepath: Path of the UTF-8 text file to read (``str`` or path-like).

    Returns:
        The path of the written output file, as a string.
    """
    import os  # local import keeps this snippet self-contained

    # Split off only the final extension. A blanket str.replace('.txt', ...)
    # rewrites every occurrence in the path and, for a non-.txt file, leaves
    # the name unchanged so the output would overwrite the input.
    root, ext = os.path.splitext(os.fspath(filepath))
    output_path = f"{root}_processed{ext}"

    async with aiofiles.open(filepath, 'r', encoding='utf-8') as f:
        content = await f.read()

    processed = content.upper()

    async with aiofiles.open(output_path, 'w', encoding='utf-8') as f:
        await f.write(processed)

    return output_path

async def batch_process_files(filepaths, max_concurrent=5):
    """Run ``process_file`` over many files with bounded concurrency.

    Args:
        filepaths: Iterable of file paths to process.
        max_concurrent: Maximum number of files processed at the same time.

    Returns:
        A list with one entry per input path, in order: the value returned
        by ``process_file``, or the exception object it raised.

    Raises:
        ValueError: If ``max_concurrent`` is smaller than 1.
    """
    # Semaphore(0) would make every task wait forever, so reject it early.
    if max_concurrent < 1:
        raise ValueError("max_concurrent must be at least 1")

    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_with_limit(path):
        async with semaphore:
            return await process_file(path)

    return await asyncio.gather(
        *(process_with_limit(path) for path in filepaths),
        return_exceptions=True,
    )

四、异步数据库操作

使用aiomysql实现异步数据库访问:

import asyncio
import aiomysql

async def fetch_users(pool, user_ids):
    """Fetch the rows of ``users`` whose ``id`` is in ``user_ids``.

    Args:
        pool: Connection pool providing ``acquire()`` (created with
            ``aiomysql.create_pool`` in the accompanying example).
        user_ids: Iterable of user ids to look up.

    Returns:
        The rows returned by ``cursor.fetchall()``, or an empty list when
        ``user_ids`` is empty (no query is issued in that case).
    """
    ids = list(user_ids)
    if not ids:
        # "WHERE id IN ()" is a SQL syntax error, so skip the round trip.
        return []

    # Only the placeholder list is interpolated; the values themselves are
    # passed separately as query parameters.
    placeholders = ','.join(['%s'] * len(ids))
    sql = f"SELECT * FROM users WHERE id IN ({placeholders})"

    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute(sql, ids)
            return await cur.fetchall()

async def batch_insert_users(pool, users_data):
    """Insert several ``(name, email)`` rows into ``users`` in one transaction.

    Args:
        pool: Connection pool providing ``acquire()`` (created with
            ``aiomysql.create_pool`` in the accompanying example).
        users_data: Iterable of ``(name, email)`` tuples.

    Returns:
        The cursor's ``rowcount`` after the insert, or 0 when ``users_data``
        is empty (no connection is acquired in that case).

    Raises:
        Exception: Whatever the driver raises; the transaction is rolled
            back before the error is re-raised.
    """
    rows = list(users_data)
    if not rows:
        return 0

    sql = "INSERT INTO users (name, email) VALUES (%s, %s)"

    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            try:
                await cur.executemany(sql, rows)
                await conn.commit()
            except Exception:
                # Do not hand a connection with a half-finished transaction
                # back to the pool.
                await conn.rollback()
                raise
            return cur.rowcount

async def main():
    """Open a connection pool, run a sample query and insert, then close it."""
    settings = dict(
        host='localhost',
        port=3306,
        user='root',
        password='password',
        db='test',
        minsize=5,
        maxsize=20,
    )
    pool = await aiomysql.create_pool(**settings)

    try:
        # Batch lookup
        wanted_ids = [1, 2, 3, 4, 5]
        users = await fetch_users(pool, wanted_ids)
        print(f"查到 {len(users)} 个用户")

        # Batch insert
        new_users = [
            ('张三', 'zhangsan@example.com'),
            ('李四', 'lisi@example.com'),
        ]
        count = await batch_insert_users(pool, new_users)
        print(f"插入 {count} 条记录")
    finally:
        # Always shut the pool down, even if a query above failed.
        pool.close()
        await pool.wait_closed()

# Run the main() coroutine to completion.
asyncio.run(main())

五、异步任务队列

处理需要排队的任务:

import asyncio
import queue
from threading import Thread

class AsyncTaskQueue:
    """Queue of awaitables that are later drained by a pool of workers.

    Tasks are registered with ``add_task`` and executed by ``process_all``.
    The outcome of every task is stored in ``results`` under its task id.
    """

    def __init__(self, max_size=100):
        """Create an empty queue holding at most ``max_size`` pending tasks."""
        self.queue = asyncio.Queue(maxsize=max_size)
        # task id -> {'status': 'success', 'result': ...}
        #         or {'status': 'error', 'error': '<message>'}
        self.results = {}
        # Id that will be assigned to the next added task.
        self.task_id = 0

    async def add_task(self, coro):
        """Queue an awaitable for later execution.

        Waits while the queue already holds ``max_size`` pending tasks.

        Args:
            coro: Awaitable (e.g. a coroutine object) to run later.

        Returns:
            The integer id under which the outcome appears in ``results``.
        """
        task_id = self.task_id
        self.task_id += 1

        async def wrapped():
            # Record the outcome instead of propagating, so one failing
            # task does not stop the worker that runs it.
            try:
                result = await coro
                self.results[task_id] = {'status': 'success', 'result': result}
            except Exception as e:
                self.results[task_id] = {'status': 'error', 'error': str(e)}

        await self.queue.put(wrapped)
        return task_id

    async def process_all(self, workers=5):
        """Run every queued task using ``workers`` concurrent workers.

        Args:
            workers: Number of worker tasks draining the queue; at least 1.

        Returns:
            The ``results`` dict, mapping task id to its outcome.

        Raises:
            ValueError: If ``workers`` is smaller than 1. With no workers the
                queue would never drain and ``join()`` would wait forever.
        """
        if workers < 1:
            raise ValueError("workers must be at least 1")

        async def worker():
            while True:
                try:
                    job = self.queue.get_nowait()
                except asyncio.QueueEmpty:
                    return
                try:
                    await job()
                finally:
                    # Mark the item done even if the job is cancelled, so
                    # that join() cannot hang on it.
                    self.queue.task_done()

        worker_tasks = [asyncio.create_task(worker()) for _ in range(workers)]
        await asyncio.gather(*worker_tasks)
        await self.queue.join()

        return self.results

六、错误处理与重试

异步任务也要有容错机制:

import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential

async def fetch_with_retry(url, max_retries=3):
    """GET ``url`` and return its JSON body, retrying with exponential back-off.

    Args:
        url: Address to request.
        max_retries: Total number of attempts (not additional retries);
            must be at least 1.

    Returns:
        The decoded JSON body of the first response with status 200.

    Raises:
        ValueError: If ``max_retries`` is smaller than 1, or if the final
            attempt received a non-200 status.
        Exception: Any other error raised by the final attempt.
    """
    # With zero attempts the loop body would never run and the function
    # would silently return None.
    if max_retries < 1:
        raise ValueError("max_retries must be at least 1")

    # Passing a bare number as ``timeout`` is deprecated in aiohttp.
    timeout = aiohttp.ClientTimeout(total=10)

    # One session is shared by all attempts instead of opening a new one
    # for every retry.
    async with aiohttp.ClientSession(timeout=timeout) as session:
        for attempt in range(max_retries):
            try:
                async with session.get(url) as response:
                    if response.status != 200:
                        raise ValueError(f"HTTP {response.status}")
                    return await response.json()
            except Exception:
                if attempt == max_retries - 1:
                    raise
                # Announce the retry before waiting, not after.
                print(f"重试 {url}, 第 {attempt + 1} 次")
                await asyncio.sleep(2 ** attempt)  # exponential back-off

async def gather_with_errors(*tasks):
    """Await all ``tasks`` and return only the successful results.

    Failures do not abort the batch; they are counted and the first three
    are printed.

    Args:
        *tasks: Awaitables to run concurrently.

    Returns:
        A list of the results of the tasks that completed without error,
        in input order.
    """
    results = await asyncio.gather(*tasks, return_exceptions=True)

    successes = []
    errors = []
    for outcome in results:
        # asyncio.CancelledError derives from BaseException, not Exception,
        # so test against BaseException to keep cancelled tasks out of the
        # successes.
        if isinstance(outcome, BaseException):
            errors.append(outcome)
        else:
            successes.append(outcome)

    if errors:
        print(f"失败 {len(errors)} 个任务")
        for e in errors[:3]:  # print at most three errors
            print(f"  错误: {e}")

    return successes

七、性能对比

异步 vs 同步的性能差距有多大?

import time

# Synchronous version: 10 tasks at 1 second each = 10 seconds in total
def sync_batch_download(urls, simulated_seconds=10):
    """Simulate downloading ``urls`` one after another.

    Args:
        urls: Sized collection of URLs; only its length is used.
        simulated_seconds: How long to block in place of the real work.
            Defaults to 10, i.e. ten requests at one second each.

    Returns:
        The number of URLs "downloaded".
    """
    time.sleep(simulated_seconds)
    return len(urls)

# Asynchronous version: 10 tasks run concurrently = about 1 second
async def async_batch_download(urls, simulated_seconds=1):
    """Simulate downloading ``urls`` concurrently.

    Args:
        urls: Sized collection of URLs; only its length is used.
        simulated_seconds: How long to sleep in place of the real work.
            Defaults to 1, the time ten overlapping one-second requests
            are assumed to take.

    Returns:
        The number of URLs "downloaded".
    """
    await asyncio.sleep(simulated_seconds)
    return len(urls)

# time.time() on its own is an epoch timestamp, not a duration, so take a
# reading before and after the call and print the difference.
demo_urls = [f"https://example.com/page{i}" for i in range(10)]
start = time.perf_counter()
sync_batch_download(demo_urls)
print(f"同步耗时: {time.perf_counter() - start:.2f}s")
# asyncio.run(async_batch_download(demo_urls))
# Async elapsed time: ~1s

总结

异步处理的关键点:

  1. 适合场景:IO密集型任务(网络、文件、数据库)
  2. 核心语法:async/await + asyncio.gather
  3. 并发控制:Semaphore限制同时运行的任务数
  4. 错误处理:gather的return_exceptions=True会把异常作为结果返回,单个任务失败不会中断整批任务
  5. 配合库:aiohttp、aiofiles、aiomysql等

对于自动化脚本中的批量网络请求、文件处理等场景,异步处理能带来5-10倍的效率提升,值得学习和使用。

更多推荐