Python 协程与异步编程:从入门到精通
Python 协程与异步编程:从入门到精通
作为一名从Python转向Rust的后端开发者,我深刻体会到Python异步编程的强大和灵活。Python的协程和异步编程不仅可以提高程序的性能,还可以使代码更加简洁、优雅,这让我在编写高并发服务时更加自信。今天,我想分享一下Python协程与异步编程的高级应用,希望能帮助大家更好地理解和使用这个强大的特性。
一、异步编程的基本概念
1. 什么是异步编程
异步编程是一种编程范式,它允许程序在等待某些操作(如I/O操作)完成时执行其他任务,从而提高程序的并发性能。
2. 协程的基本概念
协程是一种可以暂停执行并在稍后恢复的函数,它是实现异步编程的基础。
import asyncio
async def hello():
print("Hello")
await asyncio.sleep(1)
print("World")
async def main():
await hello()
asyncio.run(main())
# 输出:
# Hello
# (等待1秒)
# World
二、高级应用技巧
1. 并发执行多个协程
我们可以使用asyncio.gather来并发执行多个协程。
import asyncio
import time
async def task(name, duration):
print(f"Task {name} started")
await asyncio.sleep(duration)
print(f"Task {name} finished")
return f"Task {name} result"
async def main():
start = time.time()
# 并发执行多个协程
results = await asyncio.gather(
task("A", 2),
task("B", 1),
task("C", 3)
)
end = time.time()
print(f"Total time: {end - start} seconds")
print(f"Results: {results}")
asyncio.run(main())
# 输出:
# Task A started
# Task B started
# Task C started
# Task B finished
# Task A finished
# Task C finished
# Total time: 3.001234 seconds
# Results: ['Task A result', 'Task B result', 'Task C result']
2. 使用 async with 和 async for
我们可以使用async with和async for来处理异步上下文管理器和异步迭代器。
import asyncio
class AsyncContextManager:
async def __aenter__(self):
print("Entering context")
await asyncio.sleep(0.5)
return self
async def __aexit__(self, exc_type, exc_val, exc_tb):
print("Exiting context")
await asyncio.sleep(0.5)
class AsyncIterator:
def __init__(self, count):
self.count = count
self.current = 0
def __aiter__(self):
return self
async def __anext__(self):
if self.current < self.count:
value = self.current
self.current += 1
await asyncio.sleep(0.5)
return value
else:
raise StopAsyncIteration
async def main():
# 使用 async with
async with AsyncContextManager():
print("Inside context")
# 使用 async for
print("Iterating:")
async for value in AsyncIterator(3):
print(f"Value: {value}")
asyncio.run(main())
# 输出:
# Entering context
# Inside context
# Exiting context
# Iterating:
# Value: 0
# Value: 1
# Value: 2
3. 任务管理
我们可以使用asyncio.create_task来创建任务,并使用Task对象来管理任务的执行。
import asyncio
async def task(name, duration):
print(f"Task {name} started")
await asyncio.sleep(duration)
print(f"Task {name} finished")
return f"Task {name} result"
async def main():
# 创建任务
task1 = asyncio.create_task(task("A", 2))
task2 = asyncio.create_task(task("B", 1))
# 等待任务完成
await task1
await task2
# 获取任务结果
print(f"Task1 result: {task1.result()}")
print(f"Task2 result: {task2.result()}")
asyncio.run(main())
# 输出:
# Task A started
# Task B started
# Task B finished
# Task A finished
# Task1 result: Task A result
# Task2 result: Task B result
三、实用示例
1. 异步HTTP客户端
我们可以使用aiohttp库来创建异步HTTP客户端,并发发送多个HTTP请求。
import asyncio
import aiohttp
async def fetch(url, session):
async with session.get(url) as response:
return await response.text()
async def main():
urls = [
"https://www.example.com",
"https://www.google.com",
"https://www.github.com"
]
async with aiohttp.ClientSession() as session:
tasks = [fetch(url, session) for url in urls]
results = await asyncio.gather(*tasks)
for url, content in zip(urls, results):
print(f"URL: {url}, Content length: {len(content)}")
asyncio.run(main())
2. 异步文件I/O
我们可以使用aiofiles库来进行异步文件I/O操作。
import asyncio
import aiofiles
async def read_file(filename):
async with aiofiles.open(filename, 'r') as f:
content = await f.read()
return content
async def write_file(filename, content):
async with aiofiles.open(filename, 'w') as f:
await f.write(content)
async def main():
# 读取文件
content = await read_file("input.txt")
print(f"Read content: {content}")
# 写入文件
await write_file("output.txt", content)
print("File written")
asyncio.run(main())
3. 异步数据库操作
我们可以使用asyncpg库来进行异步数据库操作。
import asyncio
import asyncpg
async def main():
# 连接数据库
conn = await asyncpg.connect(
host="localhost",
port=5432,
user="postgres",
password="password",
database="test"
)
# 创建表
await conn.execute('''
CREATE TABLE IF NOT EXISTS users (
id SERIAL PRIMARY KEY,
name TEXT,
email TEXT
)
''')
# 插入数据
await conn.execute(
"INSERT INTO users (name, email) VALUES ($1, $2)",
"Alice", "alice@example.com"
)
# 查询数据
rows = await conn.fetch("SELECT * FROM users")
for row in rows:
print(f"ID: {row['id']}, Name: {row['name']}, Email: {row['email']}")
# 关闭连接
await conn.close()
asyncio.run(main())
四、高级异步编程技术
1. 信号量
我们可以使用asyncio.Semaphore来限制并发数。
import asyncio
async def task(name, duration, semaphore):
async with semaphore:
print(f"Task {name} started")
await asyncio.sleep(duration)
print(f"Task {name} finished")
async def main():
# 创建信号量,限制最多2个并发任务
semaphore = asyncio.Semaphore(2)
tasks = [
task("A", 2, semaphore),
task("B", 1, semaphore),
task("C", 3, semaphore),
task("D", 1, semaphore)
]
await asyncio.gather(*tasks)
asyncio.run(main())
# 输出:
# Task A started
# Task B started
# Task B finished
# Task C started
# Task A finished
# Task D started
# Task C finished
# Task D finished
2. 事件
我们可以使用asyncio.Event来实现任务间的通信。
import asyncio
async def waiter(name, event):
print(f"Waiter {name} waiting")
await event.wait()
print(f"Waiter {name} received event")
async def main():
event = asyncio.Event()
# 创建多个等待者
waiters = [waiter(f"{i}", event) for i in range(3)]
# 启动等待者
await asyncio.gather(*waiters, return_exceptions=True)
# 触发事件
print("Setting event")
event.set()
# 等待所有等待者完成
await asyncio.gather(*waiters)
asyncio.run(main())
# 输出:
# Waiter 0 waiting
# Waiter 1 waiting
# Waiter 2 waiting
# Setting event
# Waiter 0 received event
# Waiter 1 received event
# Waiter 2 received event
3. 队列
我们可以使用asyncio.Queue来实现任务间的消息传递。
import asyncio
async def producer(queue):
for i in range(5):
await asyncio.sleep(0.5)
item = f"Item {i}"
await queue.put(item)
print(f"Produced: {item}")
await queue.put(None) # 发送结束信号
async def consumer(queue):
while True:
item = await queue.get()
if item is None:
break
await asyncio.sleep(1)
print(f"Consumed: {item}")
queue.task_done()
async def main():
queue = asyncio.Queue(maxsize=2)
# 启动生产者和消费者
producer_task = asyncio.create_task(producer(queue))
consumer_task = asyncio.create_task(consumer(queue))
# 等待生产者完成
await producer_task
# 等待消费者处理完所有项目
await queue.join()
# 取消消费者任务
consumer_task.cancel()
asyncio.run(main())
# 输出:
# Produced: Item 0
# Produced: Item 1
# Consumed: Item 0
# Produced: Item 2
# Consumed: Item 1
# Produced: Item 3
# Consumed: Item 2
# Produced: Item 4
# Consumed: Item 3
# Consumed: Item 4
五、实战应用
1. 异步Web服务器
我们可以使用FastAPI或aiohttp来创建异步Web服务器。
from fastapi import FastAPI
import asyncio
app = FastAPI()
async def heavy_task():
await asyncio.sleep(2)
return "Task completed"
@app.get("/")
async def root():
result = await heavy_task()
return {"message": result}
@app.get("/items/{item_id}")
async def read_item(item_id: int, q: str = None):
return {"item_id": item_id, "q": q}
# 运行服务器:
# uvicorn main:app --reload
2. 异步爬虫
我们可以使用aiohttp来创建异步爬虫,并发爬取多个网页。
import asyncio
import aiohttp
from bs4 import BeautifulSoup
async def fetch(url, session):
async with session.get(url) as response:
return await response.text()
async def parse(url, session):
html = await fetch(url, session)
soup = BeautifulSoup(html, "html.parser")
title = soup.title.string if soup.title else "No title"
return (url, title)
async def main():
urls = [
"https://www.example.com",
"https://www.google.com",
"https://www.github.com",
"https://www.python.org",
"https://www.rust-lang.org"
]
async with aiohttp.ClientSession() as session:
tasks = [parse(url, session) for url in urls]
results = await asyncio.gather(*tasks)
for url, title in results:
print(f"URL: {url}, Title: {title}")
asyncio.run(main())
3. 异步微服务
我们可以使用asyncio和gRPC来创建异步微服务。
# 服务端
import asyncio
import grpc
from protos import service_pb2
from protos import service_pb2_grpc
class Service(service_pb2_grpc.ServiceServicer):
async def SayHello(self, request, context):
await asyncio.sleep(1)
return service_pb2.HelloResponse(message=f"Hello, {request.name}!")
async def serve():
server = grpc.aio.server()
service_pb2_grpc.add_ServiceServicer_to_server(Service(), server)
server.add_insecure_port('[::]:50051')
await server.start()
await server.wait_for_termination()
if __name__ == '__main__':
asyncio.run(serve())
# 客户端
import asyncio
import grpc
from protos import service_pb2
from protos import service_pb2_grpc
async def run():
async with grpc.aio.insecure_channel('localhost:50051') as channel:
stub = service_pb2_grpc.ServiceStub(channel)
response = await stub.SayHello(service_pb2.HelloRequest(name='Alice'))
print(f"Response: {response.message}")
if __name__ == '__main__':
asyncio.run(run())
六、总结
Python的协程与异步编程是一个非常强大的特性,它可以帮助我们编写高性能、高并发的程序。通过掌握并发执行多个协程、使用async with和async for、任务管理等高级技巧,我们可以编写更加高效、可维护的异步代码。
作为一名从Python转向Rust的开发者,我发现Python的异步编程与Rust的异步编程有一些相似之处,它们都使用协程来实现异步操作。但Python的异步编程更加简洁、易用,而Rust的异步编程更加类型安全、性能更高。这两种风格各有优缺点,我们可以根据具体的场景选择合适的语言和技术。
希望这篇文章能对你有所帮助,如果你有任何问题或建议,欢迎在评论区留言。
更多推荐
所有评论(0)