Python异步处理实战:让自动化脚本效率翻倍
·
很多自动化任务涉及网络请求、文件IO等耗时操作。如果一个个顺序执行,大部分时间都浪费在等待上,效率非常低。异步编程能让程序在等待某个IO操作时转去处理其他任务,从而显著缩短总耗时。今天来聊聊Python异步处理的实战技巧。
一、async/await基础
Python 3.5引入的async/await语法是异步编程的核心(下面示例中用到的asyncio.run需要Python 3.7及以上):
import asyncio
async def fetch_data(url, delay=1):
    """Simulate fetching data from ``url`` asynchronously.

    Args:
        url: Identifier of the resource to "fetch"; it is only echoed back.
        delay: Seconds to sleep in place of a real network round trip.
            Defaults to 1, the value the demo previously hard-coded.

    Returns:
        The string ``"数据来自 <url>"``.
    """
    print(f"开始获取: {url}")
    # Suspend here so the event loop can run other coroutines meanwhile.
    await asyncio.sleep(delay)
    return f"数据来自 {url}"
async def main():
    """Fetch three demo URLs concurrently and print the list of results."""
    # Awaiting each call one after another would take about 3 seconds:
    #   result1 = await fetch_data("url1")
    #   result2 = await fetch_data("url2")
    #   result3 = await fetch_data("url3")
    # Passing all coroutines to gather() lets their waits overlap (~1 second).
    pending = [fetch_data(name) for name in ("url1", "url2", "url3")]
    results = await asyncio.gather(*pending)
    print(results)
# Script entry point: run the main() coroutine to completion.
asyncio.run(main())
二、实战:批量下载网页
用aiohttp(配合aiofiles异步写文件)实现高效批量下载:
import asyncio
import aiohttp
from pathlib import Path
async def download_one(session, url, folder):
    """Download one page and save it as an HTML file inside ``folder``.

    Args:
        session: HTTP client session used for the GET request
            (batch_download passes an ``aiohttp.ClientSession``).
        url: Address of the page to download.
        folder: Target directory; must support the ``/`` operator
            (batch_download passes a ``pathlib.Path``).

    Returns:
        A status string starting with ``"成功"``, ``"失败"`` or ``"异常"``.
        Request and write errors are reported in the return value instead
        of being raised.
    """
    # Imported locally because the surrounding snippet imports neither;
    # without this the aiofiles.open() call below fails with NameError.
    import hashlib

    import aiofiles

    try:
        async with session.get(url, timeout=30) as response:
            if response.status != 200:
                return f"失败: {url}, 状态码: {response.status}"
            content = await response.text()

        # The built-in hash() of a str is salted per process, so it would
        # give a different (possibly negative) file name on every run.
        # A SHA-256 digest is stable across runs.
        digest = hashlib.sha256(url.encode('utf-8')).hexdigest()
        filename = folder / f"{digest}.html"
        async with aiofiles.open(filename, 'w', encoding='utf-8') as f:
            await f.write(content)
        return f"成功: {url}"
    except Exception as e:
        # Best-effort by design: one bad URL must not abort the whole batch.
        return f"异常: {url}, 错误: {e}"
async def batch_download(urls, folder='./downloads', max_concurrent=10):
    """Download every URL in ``urls`` with a cap on simultaneous requests.

    Args:
        urls: Sequence of page addresses to download.
        folder: Directory that receives the files; created if missing.
        max_concurrent: Maximum number of downloads in flight at once.

    Returns:
        One entry per URL, in input order: the status string from
        download_one(), or an exception object if that call raised.
    """
    target_dir = Path(folder)
    target_dir.mkdir(parents=True, exist_ok=True)
    # Every download must hold the semaphore, which bounds the concurrency.
    limiter = asyncio.Semaphore(max_concurrent)

    async with aiohttp.ClientSession() as session:

        async def _guarded(url):
            async with limiter:
                outcome = await download_one(session, url, target_dir)
            return outcome

        results = await asyncio.gather(
            *(_guarded(url) for url in urls),
            return_exceptions=True,
        )
        # Count only string results that carry the success marker.
        success = len([r for r in results if isinstance(r, str) and "成功" in r])
        print(f"完成:{success}/{len(urls)} 成功")
        return results
# Usage: build 100 example page URLs and download them, at most 20 at a time.
urls = [f"https://example.com/page{i}" for i in range(100)]
asyncio.run(batch_download(urls, max_concurrent=20))
三、异步文件操作
aiofiles让文件IO也能异步:
import asyncio
import aiofiles
async def process_file(filepath):
    """Upper-case a UTF-8 text file and write the result beside it.

    The output name is the input name with ``_processed`` inserted before
    the final extension, e.g. ``notes.txt`` -> ``notes_processed.txt``.

    Args:
        filepath: Path of the file to read (``str`` or ``os.PathLike``).

    Returns:
        The output path as a ``str``.
    """
    import os  # local import: the surrounding snippet does not import os

    source = os.fspath(filepath)
    async with aiofiles.open(source, 'r', encoding='utf-8') as f:
        content = await f.read()

    processed = content.upper()

    # str.replace('.txt', ...) left the path unchanged for non-.txt files,
    # so the input was overwritten, and it also rewrote ".txt" inside
    # directory names. splitext() only touches the final extension and
    # always produces a distinct output name.
    root, ext = os.path.splitext(source)
    output_path = f"{root}_processed{ext}"
    async with aiofiles.open(output_path, 'w', encoding='utf-8') as f:
        await f.write(processed)
    return output_path
async def batch_process_files(filepaths, max_concurrent=5):
    """Run process_file() over many paths with bounded concurrency.

    Args:
        filepaths: Iterable of paths handed to process_file() one by one.
        max_concurrent: Maximum number of files processed at the same time.

    Returns:
        One entry per path, in input order: the value returned by
        process_file(), or the exception object it raised.
    """
    gate = asyncio.Semaphore(max_concurrent)

    async def _bounded(path):
        # Hold the semaphore for the whole duration of one file.
        async with gate:
            outcome = await process_file(path)
        return outcome

    jobs = []
    for path in filepaths:
        jobs.append(_bounded(path))
    return await asyncio.gather(*jobs, return_exceptions=True)
四、异步数据库操作
使用aiomysql实现异步数据库访问:
import asyncio
import aiomysql
async def fetch_users(pool, user_ids):
    """Fetch the rows of table ``users`` whose ``id`` is in ``user_ids``.

    Args:
        pool: Connection pool providing ``acquire()`` (main() passes an
            aiomysql pool).
        user_ids: Iterable of ids to look up.

    Returns:
        The rows returned by ``cursor.fetchall()``, or an empty list when
        ``user_ids`` is empty.
    """
    # Materialize once so generators work and len() is available.
    ids = list(user_ids)
    if not ids:
        # "WHERE id IN ()" is a SQL syntax error; nothing to query anyway.
        return []

    # Only "%s" placeholders are interpolated; the values themselves are
    # passed separately so the driver escapes them.
    placeholders = ','.join(['%s'] * len(ids))
    sql = f"SELECT * FROM users WHERE id IN ({placeholders})"
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            await cur.execute(sql, ids)
            return await cur.fetchall()
async def batch_insert_users(pool, users_data):
    """Insert many ``(name, email)`` rows into table ``users``.

    Args:
        pool: Connection pool providing ``acquire()`` (main() passes an
            aiomysql pool).
        users_data: Iterable of ``(name, email)`` pairs.

    Returns:
        ``cursor.rowcount`` after the insert, or 0 when ``users_data`` is
        empty.

    Raises:
        Exception: Whatever the driver raises; the transaction is rolled
            back before the error propagates.
    """
    rows = list(users_data)
    if not rows:
        # Nothing to insert: skip the round trip and report zero rows.
        return 0

    sql = "INSERT INTO users (name, email) VALUES (%s, %s)"
    async with pool.acquire() as conn:
        async with conn.cursor() as cur:
            try:
                await cur.executemany(sql, rows)
                await conn.commit()
            except Exception:
                # Do not hand a connection with a half-applied batch back
                # to the pool.
                await conn.rollback()
                raise
            return cur.rowcount
async def main():
    """Open a MySQL pool, run one batch query and one batch insert, then
    close the pool."""
    settings = {
        'host': 'localhost',
        'port': 3306,
        'user': 'root',
        'password': 'password',
        'db': 'test',
        'minsize': 5,
        'maxsize': 20,
    }
    pool = await aiomysql.create_pool(**settings)
    try:
        # Batch query
        wanted_ids = [1, 2, 3, 4, 5]
        users = await fetch_users(pool, wanted_ids)
        print(f"查到 {len(users)} 个用户")

        # Batch insert
        new_users = [
            ('张三', 'zhangsan@example.com'),
            ('李四', 'lisi@example.com'),
        ]
        count = await batch_insert_users(pool, new_users)
        print(f"插入 {count} 条记录")
    finally:
        # Close the pool even if a query failed, then wait for it to finish.
        pool.close()
        await pool.wait_closed()
# Script entry point: run the database demo main() to completion.
asyncio.run(main())
五、异步任务队列
处理需要排队的任务:
import asyncio
import queue
from threading import Thread
class AsyncTaskQueue:
    """Collect awaitables first, then run them with a fixed set of workers.

    Outcomes are stored in ``self.results`` keyed by the integer id that
    ``add_task`` returned.
    """

    def __init__(self, max_size=100):
        """Create an empty queue holding at most ``max_size`` pending tasks."""
        self.queue = asyncio.Queue(maxsize=max_size)
        # task id -> {'status': 'success', 'result': ...}
        #         or {'status': 'error', 'error': '<message>'}
        self.results = {}
        # Next id to hand out.
        self.task_id = 0

    async def add_task(self, coro):
        """Enqueue an awaitable and return the id its outcome is stored under.

        Note: when the queue already holds ``max_size`` entries this call
        waits for a free slot. Slots are only freed by ``process_all``, so
        do not enqueue more than ``max_size`` tasks before processing.

        Args:
            coro: Awaitable to run later.

        Returns:
            The integer task id.
        """
        task_id = self.task_id
        self.task_id += 1

        async def wrapped():
            try:
                result = await coro
            except Exception as e:
                self.results[task_id] = {'status': 'error', 'error': str(e)}
            else:
                self.results[task_id] = {'status': 'success', 'result': result}

        await self.queue.put(wrapped)
        return task_id

    async def process_all(self, workers=5):
        """Run every queued task using ``workers`` concurrent consumers.

        Args:
            workers: Number of consumer tasks; must be at least 1.

        Returns:
            The ``self.results`` dict.

        Raises:
            ValueError: If ``workers`` is less than 1. With no consumers,
                waiting on a non-empty queue would never finish.
        """
        if workers < 1:
            raise ValueError("workers must be at least 1")

        async def worker():
            while True:
                try:
                    task = self.queue.get_nowait()
                except asyncio.QueueEmpty:
                    return
                try:
                    await task()
                finally:
                    # Always balance get() with task_done(), even if the
                    # job is cancelled, so queue.join() cannot hang.
                    self.queue.task_done()

        # A separate name keeps the integer ``workers`` parameter intact.
        worker_tasks = [asyncio.create_task(worker()) for _ in range(workers)]
        await asyncio.gather(*worker_tasks)
        await self.queue.join()
        return self.results
六、错误处理与重试
异步任务也要有容错机制:
import asyncio
from tenacity import retry, stop_after_attempt, wait_exponential
async def fetch_with_retry(url, max_retries=3):
    """GET ``url`` and return its JSON body, retrying with exponential backoff.

    Args:
        url: Address to request.
        max_retries: Total number of attempts; must be at least 1.

    Returns:
        The decoded JSON body of the first response with status 200.

    Raises:
        ValueError: If ``max_retries`` is less than 1, or if the last
            attempt got a non-200 status.
        Exception: Any other error raised by the last attempt.
    """
    if max_retries < 1:
        # The loop below would not run at all and None would be returned
        # silently; fail loudly instead.
        raise ValueError("max_retries must be at least 1")

    # Imported locally because the surrounding snippet does not import it.
    import aiohttp

    # One session for all attempts, so connections can be reused instead
    # of being torn down and rebuilt on each retry.
    async with aiohttp.ClientSession() as session:
        for attempt in range(max_retries):
            try:
                async with session.get(url, timeout=10) as response:
                    if response.status != 200:
                        raise ValueError(f"HTTP {response.status}")
                    return await response.json()
            except Exception:
                if attempt == max_retries - 1:
                    raise
                await asyncio.sleep(2 ** attempt)  # exponential backoff
                print(f"重试 {url}, 第 {attempt + 1} 次")
async def gather_with_errors(*tasks):
    """Await all ``tasks`` and return only the successful results.

    Failures do not abort the batch: they are counted, and the first
    three are printed.

    Args:
        *tasks: Awaitables to run concurrently.

    Returns:
        The results of the awaitables that did not fail, in input order.
    """
    results = await asyncio.gather(*tasks, return_exceptions=True)

    successes = []
    errors = []
    for r in results:
        # Test against BaseException: asyncio.CancelledError stopped being
        # an Exception subclass in Python 3.8, so a narrower check would
        # report cancelled tasks as successes.
        if isinstance(r, BaseException):
            errors.append(r)
        else:
            successes.append(r)

    if errors:
        print(f"失败 {len(errors)} 个任务")
        for e in errors[:3]:  # print at most 3 errors
            print(f" 错误: {e}")
    return successes
七、性能对比
异步 vs 同步的性能差距有多大?
import time
# 同步方式:10个任务每个1秒 = 总共10秒
def sync_batch_download(urls, seconds_per_request=1):
    """Simulate downloading ``urls`` one after another.

    Args:
        urls: Sequence of addresses; only its length matters here.
        seconds_per_request: Simulated duration of one request. With the
            default of 1, ten URLs take about 10 seconds.

    Returns:
        The number of URLs "downloaded".
    """
    # Sleep once per URL so the total time scales with the input instead
    # of being a fixed 10 seconds regardless of how many URLs are passed.
    for _ in urls:
        time.sleep(seconds_per_request)
    return len(urls)
# 异步方式:10个任务并发 = 约1秒
async def async_batch_download(urls, seconds_per_request=1):
    """Simulate downloading ``urls`` concurrently.

    Args:
        urls: Sequence of addresses; only its length matters here.
        seconds_per_request: Simulated duration of one request. All
            requests wait at the same time, so the total stays close to
            this value however many URLs are passed.

    Returns:
        The number of URLs "downloaded".
    """
    # Imported locally because the surrounding snippet only imports time.
    import asyncio

    # One simulated request per URL, all awaited together.
    await asyncio.gather(*(asyncio.sleep(seconds_per_request) for _ in urls))
    return len(urls)
# Measure elapsed wall-clock time around the call. Printing time.time() on
# its own only shows the current Unix timestamp, not how long the work took.
demo_urls = [f"https://example.com/page{i}" for i in range(10)]
started = time.perf_counter()
sync_batch_download(demo_urls)
print(f"同步耗时: {time.perf_counter() - started:.2f}s")
# asyncio.run(async_batch_download(demo_urls))
# Async elapsed time: ~1s
总结
异步处理的关键点:
- 适合场景:IO密集型任务(网络、文件、数据库)
- 核心语法:async/await + asyncio.gather
- 并发控制:Semaphore限制同时运行的任务数
- 错误处理:给asyncio.gather传入return_exceptions=True,异常会作为结果返回,而不会让单个失败中断整批任务
- 配合库:aiohttp、aiofiles、aiomysql等
对于自动化脚本中的批量网络请求、文件处理等场景,异步处理能带来5-10倍的效率提升,值得学习和使用。
更多推荐
所有评论(0)