If you write asynchronous Python programs with coroutines, avoid calling blocking APIs from inside them.
Unfortunately, both Python's built-in file operations and the API of the requests library are blocking.
So I wrote a genuinely asynchronous crawler.
The network part uses the third-party library aiohttp.
The file part is my own idea: run the blocking file operation in another thread and let that thread signal completion back to the coroutine through a semaphore (see any operating systems textbook if semaphores are unfamiliar).
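
The core offloading idea can be sketched as follows. This is only a minimal illustration with made-up names (blocking_read, read_file); the AsyncFile class below layers a semaphore-based completion signal and a read/write lock on top of the same idea.

import asyncio

def blocking_read(path):
    # ordinary blocking file read; this runs inside a worker thread
    with open(path, "rb") as f:
        return f.read()

async def read_file(path):
    loop = asyncio.get_event_loop()
    # hand the blocking call to the loop's default thread pool
    # and suspend this coroutine until the result is ready
    return await loop.run_in_executor(None, blocking_read, path)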

(2019/6/7)
At first I suspected this AsyncFile was unreliable because every read/write operation seemed to create a new ThreadPoolExecutor through .run_in_executor.
That guess is wrong: when executor is None, run_in_executor lazily creates one default ThreadPoolExecutor for the loop and reuses it for every later call.
The real weak spot is calling asyncio.Semaphore.release() directly from the worker thread; asyncio primitives are not thread-safe, so the release has to be scheduled onto the event loop with loop.call_soon_threadsafe(), which is what _read and _write below do.
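
In isolation, the thread-safe wake-up pattern that _read and _write rely on looks roughly like this (a toy sketch; wait_for_worker and worker are made-up names):

import asyncio

async def wait_for_worker():
    loop = asyncio.get_event_loop()
    done = asyncio.Semaphore(0)

    def worker():
        # ...some blocking work runs here, off the event loop...
        # schedule the release on the loop's own thread instead of calling it directly
        loop.call_soon_threadsafe(done.release)

    loop.run_in_executor(None, worker)  # completion is signalled via done
    await done.acquire()                # suspends until the worker signals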

asyncFile.py

import asyncio 
import io

class AsyncFile:
    class _ReadContent:
        '''Holds the data produced by a read.
        The read itself runs in a worker thread and stores its
        return value in _ReadContent().content.
        '''
        def __init__(self,content=None):
            self.content=content

    def __init__(self,path:str,open_flag:str="r",executor=None):
        # file path
        self.path=path
        # mode passed to open()
        self.open_flag=open_flag
        # the underlying (blocking) file object
        self._f=open(path,open_flag)
        # the current event loop
        self._loop=asyncio.get_event_loop()
        # read/write lock: at most one reader or writer at a time
        self._rw_lock=asyncio.Lock()
        # a ThreadPoolExecutor or ProcessPoolExecutor from concurrent.futures
        # (I am not sure a ProcessPoolExecutor would actually help here);
        # None means the loop's default executor is used
        self._executor=executor

    def _read(self,r_content:_ReadContent,over_semaphore:asyncio.Semaphore):
        # blocking read, executed in a worker thread
        r_content.content=self._f.read()
        # asyncio primitives are not thread-safe, so hand the release
        # to the event loop thread to wake the waiting coroutine
        self._loop.call_soon_threadsafe(over_semaphore.release)

    def _write(self,content,over_semaphore:asyncio.Semaphore):
        # blocking write, executed in a worker thread
        self._f.write(content)
        # wake the waiting coroutine from the event loop thread
        self._loop.call_soon_threadsafe(over_semaphore.release)

    async def read(self):
        if not self._f.readable():
            raise io.UnsupportedOperation()
        async with self._rw_lock:
            # ===============================================
            # over_semaphore signals that the worker-thread read has finished
            over_semaphore=asyncio.Semaphore(0)
            _read_content=self._ReadContent()
            self._loop.run_in_executor(
                self._executor,self._read,_read_content,over_semaphore)
            # acquire() suspends while the count is 0; the worker releases it when done
            await over_semaphore.acquire()
            # ===============================================
            return _read_content.content

    async def write(self,content):
        if not self._f.writable():
            raise io.UnsupportedOperation()
        async with self._rw_lock:
            # ===============================================
            # same mechanism as read()
            over_semaphore=asyncio.Semaphore(0)
            self._loop.run_in_executor(
                self._executor,self._write,content,over_semaphore)
            await over_semaphore.acquire()
            # ===============================================

    async def seek(self,offset,where=0):
        async with self._rw_lock:
            self._f.seek(offset,where)

    async def close(self):
        async with self._rw_lock:
            self._f.close()
    
    def __enter__(self):
        return self

    def __exit__(self,exc_type,exc_val,traceback):
        self._f.close()
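
A hypothetical usage sketch for AsyncFile (dump_file and data.txt are made-up names, and the file is assumed to exist):

import asyncio
from asyncFile import AsyncFile

async def dump_file():
    # with no executor given, AsyncFile falls back to the loop's default thread pool
    with AsyncFile("data.txt", "r") as f:
        print(await f.read())

asyncio.get_event_loop().run_until_complete(dump_file())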

asyncCrawler.py

from asyncFile import AsyncFile
import asyncio
import aiohttp

async def get(session,url,timeout=60):
    # fetch the raw response body of url with a per-request timeout (seconds)
    async with session.request('GET',url,timeout=timeout) as resp:
        return await resp.read()

async def crawl(url,save_path,executor=None):
    # download url and write the body to save_path without blocking the event loop
    async with aiohttp.ClientSession() as session:
        content=await get(session,url)
    if content:
        with AsyncFile(save_path,"wb",executor) as f:
            await f.write(content)

if __name__=="__main__":
    import os 
    from concurrent.futures import ThreadPoolExecutor
    d="./_test_imgs/"
    if not os.path.exists(d):
        os.makedirs(d)
    url="https://timgsa.baidu.com/timg?image&quality=80&size=b9999_10000&sec=1551437241785&di=a827c7962549b54e2d4a84327902bf54&imgtype=0&src=http%3A%2F%2Fwww.baijingapp.com%2Fuploads%2Fcompany%2F03%2F36361%2F20170413%2F1492072091_pic_real.jpg"
    save_path=os.path.join(d,"tmp{}.jpg")
    executor=ThreadPoolExecutor(max_workers=8)
    tasks=[]
    for i in range(20):
        tasks.append(crawl(url,save_path.format(i),executor))
    loop=asyncio.get_event_loop()
    loop.run_until_complete(asyncio.wait(tasks))
    print("over")