Python 中的异步编程
·
在 Python 中,async 和 await 是用于实现异步编程(Asynchronous Programming)的核心关键字。它们允许你编写并发代码,特别适用于 I/O 密集型任务(如网络请求、文件读写、数据库操作),能够显著提高程序的执行效率,而无需使用复杂的多线程或多进程。
以下是关于 Python async 的详细解析:
1. 核心概念
async def:用于定义一个协程函数(Coroutine Function)。当调用这个函数时,它不会立即执行,而是返回一个协程对象(Coroutine Object)**。
await:用于在协程内部暂停当前任务的执行,等待另一个可等待对象(如另一个协程、Task 或 Future)完成。在等待期间,控制权会交还给事件循环(Event Loop),使其可以运行其他任务。
事件循环(Event Loop):异步编程的核心引擎。它负责调度和执行所有的协程任务。当一个任务遇到 await 并处于等待状态时,事件循环会切换到其他就绪的任务去执行,从而实现“并发”。
2. 基本语法与示例
定义和运行一个简单的协程
import asyncio
# 1. 使用 async def 定义协程函数
async def say_hello(delay, name):
print(f"Start: Hello {name}")
# 2. 使用 await 暂停执行,模拟 I/O 操作
# asyncio.sleep 是非阻塞的,不会卡住整个程序
await asyncio.sleep(delay)
print(f"End: Hello {name}")
return f"Greeting for {name}"
# 3. 主入口函数
async def main():
print("Main started")
# 直接调用 say_hello 只会创建协程对象,不会执行
# 必须使用 await 或 create_task 来调度执行
result = await say_hello(1, "Alice")
print(result)
print("Main finished")
# 4. 运行最高层级的入口点
asyncio.run(main())
3. 并发执行任务
如果按顺序 await 多个协程,它们仍然是串行执行的。要实现真正的并发(同时运行),需要使用 asyncio.create_task() 或 asyncio.gather()。
方法一:使用 create_task
import asyncio
import time
async def task(name, delay):
print(f"{name} started")
await asyncio.sleep(delay)
print(f"{name} finished after {delay}s")
return name
async def main():
start_time = time.time()
# 创建两个任务,它们会几乎同时开始运行
task1 = asyncio.create_task(task("Task A", 2))
task2 = asyncio.create_task(task("Task B", 3))
# 等待两个任务都完成
await task1
await task2
end_time = time.time()
print(f"Total time: {end_time - start_time:.2f}s")
# 输出约为 3.00s,而不是 5.00s,证明是并发执行的
asyncio.run(main())
方法二:使用 asyncio.gather
gather 可以方便地并发运行多个协程并收集结果。
import asyncio
async def fetch_data(id, delay):
await asyncio.sleep(delay)
return f"Data {id}"
async def main():
# 并发执行三个任务
results = await asyncio.gather(
fetch_data(1, 1),
fetch_data(2, 2),
fetch_data(3, 1)
)
print(results) # ['Data 1', 'Data 2', 'Data 3']
asyncio.run(main())
一个完整的爬虫样例
import asyncio
from requests import get
from bs4 import BeautifulSoup
async def get_weather(city):
"parameter: city (sichuan/chengdu,beijing/haidian)"
myUrl="https://www.ip.cn/tianqi/"+city+"/15day.html"
myHeaders = {
'user-agent': 'Mozilla/5.0 (Macintosh; Intel Mac OS X 10_13_6) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/76.0.3809.132 Safari/537.36'}
print(myUrl)
response=get(url=myUrl,headers=myHeaders)
if (response.status_code==200):
print("顺利获取数据")
else:
print("目的网站无法连接")
print(response.status_code)
# return None
return ({city:[]})
response.encoding = 'utf8'
html=response.text
soup=BeautifulSoup(html,"html.parser")
# print(soup)
# 查找单个匹配的div元素
soup = soup.find_all('div', class_='table-inner')
# print(soup)
soup=BeautifulSoup(str(soup),"html.parser")
soupDate = soup.find_all('p',class_='date')
soupTemp = soup.find_all('p',class_='text-gray')
myDate=[]
for i in soupDate:
date=i.getText()
myDate.append(date[0:10]) # 只取日期 *
myTempL=[]
myTempH=[]
for i in soupTemp[0::4]: # 每四个元素一组,分别对应气温 *
t=i.getText()
t=t.replace('℃','') # 去掉℃
t=t.split('~') # 分割最低温度和最高温度
myTempL.append(int(t[0]))
myTempH.append(int(t[1]))
myinfor=list(zip(myDate,myTempL,myTempH)) #组合输出
return ({city:myinfor})
async def main():
tasks = [get_weather(f"sichuan/chengdu"),
get_weather(f"sichuan/zigong"),
get_weather(f"hainan/sanya"),
get_weather(f"beijing/haidian")]
results = await asyncio.gather(*tasks)
for res in results:
print(res)
asyncio.run(main())
更多推荐


所有评论(0)