Python Spider——Pyppeteer

爬虫具有时效性,该文产生于2023年末

一、爬虫的两种方式

爬虫大致可以分为两类方式:

  • 直接请求

    直接请求的方式一般是使用python的HTTP请求库发起HTTP请求,然后接收返回的数据再进行解析,这种方式存在很大的局限性。当所爬取的页面是动态加载的页面类型时(Ajax等),往往直接获取的请求数据并不是完整的页面数据,并不能有效的提取需要的数据

  • 模拟用户

    而模拟用户的方式则可以很好的解决直接请求的难题,顾名思义,就是模拟用户使用浏览器的方式进行页面数据的获取,一般来说获取到的页面数据都是完全渲染之后的结果,可以看到的数据都可以进行内容提取(若页面加载不完全,也可以使用某些方式使得页面完全加载),也是本文章的重点

当然两种方式目前都不是非常完美的解决方案,不同的网站适用于不同的方式,而且也并非百分百成功,成功条件受限于网站的复杂程度,反爬虫严格程度等

二、Pyppeteer

目前python中存在两个主要用于用户模拟的库:

  • Selenium
  • Pyppeteer

Selenium 是一个用于自动化浏览器操作的工具,常用于模拟用户的真实操作,比如点击、填写表单等。它支持多种浏览器,如 Chrome、Firefox、Safari 等,可以通过驱动程序来控制这些浏览器。不过Selenium在配置的时候往往需要下载对应的浏览器驱动程序,不过在对于多浏览器的支持和泛用性上,会更胜一筹。

Pyppeteer是一个使用Python控制无头和有界面浏览器的库,可用于模拟用户行为、执行JavaScript代码等,适用于网页自动化测试、爬虫和数据挖掘等任务。Pyppeteer相比于Selenium最大的优势是可以实现无头浏览器,它可以在不开启浏览器页面的情况下模拟浏览器进行请求的发送和接收,并且支持原生的Javascript,可以实现更多样化的页面操作

本节将使用Pyppeteer进行Python爬虫的实现

三、爬虫实现

  1. 首先需要明确需要爬取的页面内容和结构

    • 通过分析淘宝搜索之后的页面结构,我们发现该页面较适合使用Pyppeteer进行模拟爬取
  2. 接下来开始逐步编写爬虫代码(由于淘宝的反爬虫非常厉害,所以该程序具有时效性,请针对实际情况进行适当的修改)

    • 首先需要引入pyppeteer模块和asyncio模块

      import asyncio
      from pyppeteer import launch
      
      # 设定一个爬取数值,表示需要爬取的条目数量
      expItemNum = 60
      # 设定淘宝搜索页面的url为起始页面
      url = "商品搜索页面URL"
      # 为了防止反爬虫立刻就发现我们,这里务必要设置用户代理
      user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36 Edg/120.0.0.0"
      
      # 创建一个函数,同时设定几个基本参数
      async def taobaoSearchItem(
          expItemNum, url, user_agent, wait_sec=1, headless_=False, width=1366, height=768
      ):
          """
          expItemNum: 需要的数据条目数(程序在多少数据量时会停止)\n
          url:        淘宝商品页面的url(已经搜索后打开页面的网址)\n
          user_agent: 请求头用户代理\n
          wait_sec:   等待页面加载的事件(Default: 1),时间粒度为秒\n
          headless_:  无头模式,(Default: False, 即显示浏览器界面, 无头有BUG)\n
          width:      浏览器窗口大小(default: 1366)\n
          height:     浏览器窗口大小(default: 768)\n
          """
          # 用于统计总爬取条目数,判断何时停止爬虫程序
          total_num = 0
          # 设定一个变量,用于记录爬取页面数量
          page_num = 1
          # 设定一个列表,列表存放最终爬取的数据,并最后写入文件
          data_list = list()
          
      
    • 然后,我们首先调用launch启动浏览器核心,此时需要有一些参数

      async def taobaoSearchItem(
          expItemNum, url, user_agent, wait_sec=1, headless_=False, width=1366, height=768
      ):
          total_num = 0
          page_num = 1
          data_list = list()
          # 通过异步的方式启动浏览器,并返回一个浏览器对象,便于后续的操作
          # 其中headless用于规定是否为无头模式,该函数参数中默认为有头模式(无头有Bug)
          # dumpio 使得当浏览器打开很多页面的时候防止浏览器自身卡顿
          # args中有一段字符串参数,参数主要规定了窗口的大小,用户代理信息和启用JS,这样可以保证我们的页面正常加载
          browser = await launch(
              headless=headless_,
              dumpio=True,
              args=[
                  f"--window-size={width},{height}, --user-agent={user_agent}, --enable-javascript"
              ]
          )
          # 此时等待获取一个浏览器的新页面实例
          page = await browser.newPage()
          # await page.setCookie(cookie) 本来可以通过设置cookie防止反爬虫程序,但是cookie使用时有些问题,直接导致我使用的浏览器被淘宝标记为爬虫程序,导致一直滑块验证
          # 这里还可以通过这个方式设定页面的大小,防止页面内容不加载
          await page.setViewport({"width": width, "height": height})
          # 这里使用pyppeteer内置的js解释器,在页面中执行js代码,设定浏览器驱动模式,防止反爬虫程序检测到我们的异常状态
          await page.evaluate(
              "() =>{ Object.defineProperties(navigator,{ webdriver:{ get: () => false } }) }"
          )
          # 最后打开页面url,进入淘宝搜索页
          await page.goto(url)
      
    • 在完成浏览器新建和页面打开之后,我们进入了淘宝页面,此时我们将对于页面内容进行筛选

      我们选用pyquery进行html内容的获取,这个模块其实是JQuery的python封装,通过python的方式使用JQuery,可以高效的获取和操作页面DOM

      # 下一段代码会用到一个自定义函数,用户获取页面指定标签的html数据
      def re_match(html, s_str, type_str):
          type_f = f"[{type_str}]"
          return pq(html)(type_f).filter(lambda i, elem: s_str in pq(elem).attr(type_str))
      
      # 这是一个用于判断是否为字符串的数据,由于pyquery返回的数据可能为非字符串或者非计算类型数据,可能导致程序异常,故再次进行判断转换,防止程序异常导致程序终止
      def if_str(input):
          if type(input) == str:
              return input
          else:
              return str(input)
      
      # 这里我们继续在函数体内创建一个while循环,当获取的数据条目总数大于需求总数时停止爬取
      while total_num < expItemNum:
          	# 每次循环开始,使用asyncio.sleep()等待一段时间,保证页面加载完成,防止获取到空数据
              await asyncio.sleep(wait_sec)
              # 接下来是一段滑动页面的代码,目的是通过滑动至页面底部,以保证该页面的所有数据全部加载,防止获取到空数据,造成数据缺失
              # Scrolling code
              # 这里首先通过执行js代码,获取当前文档的页面高度
              total_height = await page.evaluate(
                  """async () => {
                      return Promise.resolve(document.documentElement.scrollHeight);
                      }
                  """
              )
              # 将页面高度等分为五分,我们将分五次滚动页面
              scroll_distance = total_height // 5
              # 这里使用循环,进行五次滚动,最终将滑动至页面底部
              for i in range(5):
                  scroll_by = (i + 1) * scroll_distance
                  # 这里使用await等待页面滚动完毕
                  await page.evaluate(f"var q=document.documentElement.scrollTop={scroll_by}")
                  # 这里留有一定的滑动间隔,防止操作过快
                  await asyncio.sleep(0.2)
      		# 此时的页面已经完全加载,所有的js实时渲染应当已经完成,若未完成页面加载,请适当调整页面的加载时长
              # 此时通过调用自定义函数re_match(html, s_str, type_str)获取该页面中所有的商品卡片,存入item_card列表中
              item_card = re_match(await page.content(), "Card--doubleCardWrapper--", "class")
              # 这里及时的统计当前页面获取到的条目数量,防止死循环
              total_num += item_card.length
              # 输出当前页面获取到的数据条数
              print(f"Data entries found in Page {page_num}: " + if_str(item_card.length))
              # 页面数自增1
              page_num += 1
              # 通过便利item_card中的内容,获取商品卡片中我们所需要的信息
              for item in item_card:
                  data = dict()
                  # 通过re_match获取商品名称,并存入data字典
                  data["title"] = re_match(item, "Title--title--", "class").text()
                  # 通过re_match获取商品图片url,并存入data字典
                  data["pic_url"] = if_str(
                      re_match(item, "MainPic--mainPic--", "class").attr("src")
                  )
                  # 通过re_match获取商品价格,并存入data字典
                  data["price"] = float(
                      re_match(item, "Price--priceInt--", "class").text()
                      + re_match(item, "Price--priceFloat--", "class").text()
                  )
                  # 通过re_match获取商品发货地,并存入data字典
                  data["procity"] = re_match(item, "Price--procity--", "class").text()
                  # 通过re_match获取商家名称,并存入data字典
                  data["shopName"] = re_match(item, "ShopInfo--shopName--", "class").text()
                  # 最后将data字典加入data_list列表中
                  data_list.append(data)
      		# 当获取完当前页面全部数据后,通过js点击页面底部的"返回顶部"按钮,然后再点击页面中的"下一页"按钮进行页面跳转
              # Back Top & Next Page
              await page.evaluate(
                  f"""
                  document.querySelector(".back-btn").click();
                  document.querySelectorAll('.next-next')[0].click();
              """
              )
              # 当获取条目数不满足时,继续循环,直至满足设定数量
      
    • 获取完全部数据之后,将获取到的列表数据写入数据库中

      我们使用到的是sqlite3数据库,该数据库具有免安装,轻量,方便移动等特点,适合小型程序的及时数据存储

      async def taobaoSearchItem(...):
          """
          此处省略前面已有的代码
          """
          # 将获取到的数据列表写入数据库,此时用到了自定义函数database_io()
          for e in data_list:
              database_io("good.db", e)
          # 最后关闭浏览器文件
          await browser.close()
      

      以下是两个用于数据库写入的自定义函数

      # 该函数用于按照指定格式读取data_list中data的数据内容,然后将其加入到sql语句中,并返回sql语句
      def insert_record(data):
          sql_insert_record = f"INSERT INTO taobao VALUES ('{data['title']}','{data['pic_url']}',{data['price']},'{data['procity']}','{data['shopName']}');"
          return sql_insert_record
      
      # 该函数主要用于读写数据库
      # Database IO Module
      # 这里参数包括数据库名称和需要写入的单挑数据(即单个data字典)
      def database_io(DBName, data):
          # 创建一个数据库连接
          conn = sqlite3.connect(DBName)
          # 创建一个指针变量,用于执行插入数据语句
          cur = conn.cursor()
          # 这里若当前不存在名为taobao的数据表,则进行创建
          sql_create_table = "create table if not exists taobao( title text, pic_url text , price real , procity text , shopName text )"
          # 执行sql语句
          conn.execute(sql_create_table)
          # 这里调用自定义函数获取拼接好的sql字段插入语句
          sql_insert = insert_record(data)
          # 调用cur指针变量执行插入语句
          cur.execute(sql_insert)
          # 需要使用conn.commit()提交语句的执行结果
          conn.commit()
          # 这里先关闭cur再关闭conn连接
          cur.close()
          conn.close()
      
    • 最后使用asyncio异步执行爬虫函数

      asyncio.get_event_loop().run_until_complete(
          # 这里填入先前预设好的查询数量和url以及用户代理,后面的参数可以根据需要进行修改,否则使用默认参数
          taobaoSearchItem(expItemNum, url, user_agent)
      )
      
    • 最后是爬取结果展示

      查询结果

  3. 最后是完整代码展示

    import asyncio
    import sqlite3
    from pyppeteer import launch
    from pyquery import PyQuery as pq
    
    expItemNum = 60
    url = "商品搜索页面URL"
    user_agent = "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/71.0.3578.98 Safari/537.36 Edg/120.0.0.0"
    
    
    async def taobaoSearchItem(
        expItemNum, url, user_agent, wait_sec=1, headless_=False, width=1366, height=768
    ):
        """
        expItemNum: 需要的数据条目数(程序在多少数据量时会停止)\n
        url:        淘宝商品页面的url(已经搜索后打开页面的网址)\n
        user_agent: 请求头用户代理\n
        wait_sec:   等待页面加载的事件(Default: 1),时间粒度为秒\n
        headless_:  无头模式,(Default: False, 即显示浏览器界面, 无头有BUG)\n
        width:      浏览器窗口大小(default: 1366)\n
        height:     浏览器窗口大小(default: 768)\n
        """
        total_num = 0
        page_num = 1
        data_list = list()
        # Browser Setting
        browser = await launch(
            headless=headless_,
            dumpio=True,
            args=[
                f"--window-size={width},{height}, --user-agent={user_agent}, --enable-javascript"
            ]
        )
        page = await browser.newPage()
        # await page.setCookie(cookie)
        await page.setViewport({"width": width, "height": height})
        await page.evaluate(
            "() =>{ Object.defineProperties(navigator,{ webdriver:{ get: () => false } }) }"
        )
        # Open the url page
        await page.goto(url)
    
        while total_num < expItemNum:
            await asyncio.sleep(wait_sec)
            # Scrolling code
            # Get Height
            total_height = await page.evaluate(
                """async () => {
                    return Promise.resolve(document.documentElement.scrollHeight);
                    }
                """
            )
            scroll_distance = total_height // 5
            # Multiple scrolling
            for i in range(5):
                scroll_by = (i + 1) * scroll_distance
                # Scrolling
                await page.evaluate(f"var q=document.documentElement.scrollTop={scroll_by}")
                await asyncio.sleep(0.2)
    
            item_card = re_match(await page.content(), "Card--doubleCardWrapper--", "class")
            total_num += item_card.length
            print(f"Data entries found in Page {page_num}: " + if_str(item_card.length))
            page_num += 1
            for item in item_card:
                data = dict()
                data["title"] = re_match(item, "Title--title--", "class").text()
                data["pic_url"] = if_str(
                    re_match(item, "MainPic--mainPic--", "class").attr("src")
                )
                data["price"] = float(
                    re_match(item, "Price--priceInt--", "class").text()
                    + re_match(item, "Price--priceFloat--", "class").text()
                )
                data["procity"] = re_match(item, "Price--procity--", "class").text()
                data["shopName"] = re_match(item, "ShopInfo--shopName--", "class").text()
                data_list.append(data)
    
            # Back Top & Next Page
            await page.evaluate(
                f"""
                document.querySelector(".back-btn").click();
                document.querySelectorAll('.next-next')[0].click();
            """
            )
    
        # Write to the file
        for e in data_list:
            database_io("good.db", e)
        # close the broser
        await browser.close()
    
    
    def re_match(html, s_str, type_str):
        type_f = f"[{type_str}]"
        return pq(html)(type_f).filter(lambda i, elem: s_str in pq(elem).attr(type_str))
    
    def if_str(input):
        if type(input) == str:
            return input
        else:
            return str(input)
    
    def insert_record(data):
        sql_insert_record = f"INSERT INTO taobao VALUES ('{data['title']}','{data['pic_url']}',{data['price']},'{data['procity']}','{data['shopName']}');"
        return sql_insert_record
    
    # Database IO Module
    def database_io(DBName, data):
        conn = sqlite3.connect(DBName)
        cur = conn.cursor()
        sql_create_table = "create table if not exists taobao( title text, pic_url text , price real , procity text , shopName text )"
        conn.execute(sql_create_table)
        sql_insert = insert_record(data)
        cur.execute(sql_insert)
        conn.commit()
        cur.close()
        conn.close()
    
    def readStrFromFile(file):
        with open(file, "r", encoding="utf-8") as f:
            str_read = f.read()
        return str_read
    
    
    asyncio.get_event_loop().run_until_complete(
        taobaoSearchItem(expItemNum, url, user_agent)
    )
    
    
Logo

欢迎加入西安开发者社区!我们致力于为西安地区的开发者提供学习、合作和成长的机会。参与我们的活动,与专家分享最新技术趋势,解决挑战,探索创新。加入我们,共同打造技术社区!

更多推荐