㊗️本期内容已收录至专栏《Python爬虫实战》,持续完善知识体系与项目实战,建议先订阅收藏,后续查阅更方便~
㊙️本期爬虫难度指数:⭐⭐⭐☆☆(进阶级)
🉐福利: 一次订阅后,专栏内的所有文章可永久免费看,持续更新中,保底1000+(篇)硬核实战内容。

全文目录:

🌟 开篇语

哈喽,各位小伙伴们你们好呀~我是【喵手】。
运营社区: C站 / 掘金 / 腾讯云 / 阿里云 / 华为云 / 51CTO
欢迎大家常来逛逛,一起学习,一起进步~🌟

  我长期专注 Python 爬虫工程化实战,主理专栏👉 《Python爬虫实战》:从采集策略反爬对抗,从数据清洗分布式调度,持续输出可复用的方法论与可落地案例。内容主打一个“能跑、能用、能扩展”,让数据价值真正做到——抓得到、洗得净、用得上

  📌 专栏食用指南(建议收藏)

  • ✅ 入门基础:环境搭建 / 请求与解析 / 数据落库
  • ✅ 进阶提升:登录鉴权 / 动态渲染 / 反爬对抗
  • ✅ 工程实战:异步并发 / 分布式调度 / 监控与容错
  • ✅ 项目落地:数据治理 / 可视化分析 / 场景化应用

📣 专栏推广时间:如果你想系统学爬虫,而不是碎片化东拼西凑,欢迎订阅专栏👉《Python爬虫实战》👈,一次订阅后,专栏内的所有文章可永久免费阅读,持续更新中。
  
💕订阅后更新会优先推送,按目录学习更高效💯~

0️⃣ 前言(Preface)

这篇文章要做的事情很明确:用 Python、Requests、BeautifulSoup 和 SQLite,采集农业试验站公开名录中的站点名、试验方向、地区、主管单位、状态等字段,并把它整理成适合长期监控的“机构 / 站点 / 研究方向”数据模型。

读完本文,你可以获得:

  1. 一套可直接运行的公开名录采集项目骨架,不是只贴几行选择器的玩具脚本。
  2. 一个适合机构类、站点类、研究方向类信息沉淀的 SQLite 三表模型设计思路。
  3. 一套从请求、解析、清洗、去重、导出到后续监控的完整工程化做法。

这类项目不追求炫技,重点是稳定、可维护、能复跑。公开名录采集看起来简单,真正落地时麻烦通常不在“请求一个页面”,而在字段不齐、页面结构变化、重复数据、历史状态变化和后续自动化监控。

1️⃣ 摘要(Abstract)

本文将基于 Python 编写一个农业试验站公开名录监控爬虫,使用 requests 完成页面请求,使用 BeautifulSoup + lxml 完成 HTML 解析,最终将站点名、试验方向、地区、主管单位、状态等字段写入 SQLite,并导出 CSV 结果文件。

读完本文,你会掌握:

  1. 如何把“公开名录列表页 → 详情页 → 结构化字段”拆成稳定的采集流程。
  2. 如何设计“主管单位表、研究方向表、站点表”的三表模型。
  3. 如何处理常见的 403、429、空壳 HTML、乱码、字段缺失和重复抓取问题。

为了保证可复现,本文没有绑定某一个具体网站。真实项目中你可能面对的是农业科研机构官网、地方农业技术推广中心官网、试验站公示页面或公开名录平台。本文采用一个本地 demo 站点来模拟真实列表页和详情页,代码运行方式与真实 HTTP 页面一致,因此后续替换真实公开站点时,不需要推翻项目结构。

2️⃣ 背景与需求(Why)

农业试验站、科研基地、长期定位试验点、区域技术服务站等信息,往往分散在不同单位的公开网页里。单页查看没有问题,但如果我们想做持续分析和变化监控,人工维护 Excel 很快就会变得吃力。

比如,一个数据团队可能会关心这些问题:

  • 某个地区有哪些农业试验站?
  • 不同试验方向的站点分布是否均衡?
  • 某些站点的运行状态是否从“运行中”变为“维护中”?
  • 某个主管单位名下到底有哪些公开站点?
  • 后续是否可以把这些站点接入地图、检索系统或内部知识库?

这就是爬虫和结构化存储的价值:把散落在网页上的半结构化信息,变成可以查询、去重、比对和长期维护的数据资产。

2.1 为什么要爬

这里的“爬”不是为了做高并发抓取,也不是为了绕过限制获取非公开数据。它更接近于信息整理和自动化监控。

具体价值有三个:

第一,降低人工整理成本。
公开名录一旦有几十页、几百条,人工复制字段就很容易漏、错、重复。

第二,提高更新感知能力。
如果每周或每月定时采集一次,就可以判断某些站点是否新增、状态是否变化、主管单位是否调整。

第三,方便数据分析。
当站点、主管单位、研究方向分表以后,可以很自然地做地区统计、方向统计、机构关联分析。

2.2 目标字段

本文的目标字段如下:

字段 含义 示例
站点名 农业试验站或试验基地名称 华北旱作农业试验站
试验方向 主要研究、试验或示范方向 旱作节水与小麦玉米轮作
地区 所在省市或区域 河北省石家庄市
主管单位 管理、依托或主管机构 某农业科学院作物研究所
状态 当前公开展示的运行状态 运行中 / 维护中 / 暂停

真实网站中字段名称不一定完全一致。比如“试验方向”可能写成“研究方向”,“主管单位”可能写成“依托单位”,“状态”可能写成“运行状态”。所以解析层不能写死一种情况,需要做字段别名兼容。

2.3 数据模型目标

这篇文章的实战重点是“机构 / 站点 / 研究方向”三表模型。

起步模型如下:

  1. authorities:主管单位表。
  2. research_directions:研究方向表。
  3. stations:站点表。

站点表通过外键关联主管单位和研究方向。

如果一个站点未来对应多个研究方向,可以扩展为四表模型,即增加 station_direction_relations 关系表。但这篇文章先以“一站点对应一个主方向”的三表模型起步,便于落地。


3️⃣ 合规与注意事项(必写)

爬虫文章必须先把边界说清楚。技术本身没有问题,但采集方式、频率和目标范围要克制。

3.1 robots.txt 基本说明

在接入真实网站前,建议先访问目标站点根目录下的 robots.txt,例如:

https://example.org/robots.txt

robots.txt 是网站给自动化访问程序的爬取建议。它通常会说明哪些路径允许访问、哪些路径不建议或不允许自动化抓取。

需要注意:

  • robots.txt 不是登录授权,也不代表可以采集所有内容。
  • 即使某个路径没有被禁止,也要控制访问频率。
  • 如果网站明确禁止自动化访问某些目录,就不要抓取这些目录。

本文示例只采集公开 HTML 页面,不涉及登录态、验证码、付费墙或非公开接口。

3.2 频率控制

很多公开名录页面本身访问量并不大,没必要用攻击式并发。

建议:

  • 单线程起步。
  • 每次请求间隔 0.5 到 2 秒。
  • 遇到 429、503 等响应时暂停或退避。
  • 不对同一个站点进行短时间高频扫描。
  • 定时任务按天、周、月运行,而不是按秒运行。

本文代码默认每次请求间隔 0.6 秒,并且在失败时使用指数退避重试。

3.3 不采集敏感信息

本文目标字段只有站点名、试验方向、地区、主管单位、状态,均属于公开目录类字段。

不建议采集:

  • 个人身份证、手机号、邮箱等个人敏感信息。
  • 登录后才能看到的数据。
  • 明确需要付费或授权才能访问的数据。
  • 通过绕过验证码、破解接口参数、伪装登录态等方式获取的数据。

我们把项目定位为公开信息整理和监控,不做越界采集。

3.4 不绕过付费或登录限制

如果目标站点需要登录,或者明确有权限控制,本文方案不应该用于绕过限制。

中性的做法是:

  • 使用公开页面。
  • 使用公开接口。
  • 如需使用内部系统,获得授权后再接入。
  • 不破解、不撞库、不绕过验证码、不批量模拟异常行为。

爬虫能不能长期运行,不只取决于技术,更取决于访问方式是否稳定、合理、有边界。


4️⃣ 技术选型与整体流程(What / How)

4.1 静态、动态还是 API

常见名录页面大概有三种:

第一种是静态 HTML。
列表页直接返回完整 HTML,详情页也是 HTML。字段存在于表格、列表或段落中。

第二种是动态渲染页面。
浏览器能看到数据,但 requests.get() 拿到的是空壳,数据由 JavaScript 后续加载。

第三种是接口返回 JSON。
页面通过接口拿数据,接口返回结构化 JSON。

本文采用第一种:静态或半结构化 HTML 页面。

原因很简单:很多公开名录、公示页面、单位官网栏目仍然是普通 HTML。对于这种场景,requests + BeautifulSoup + lxml 足够稳定,维护成本也比 Playwright 低。

如果你拿到的页面是动态渲染,可以先在浏览器开发者工具里看 Network 面板,确认是否存在 JSON 接口。如果有接口,优先抓接口;如果没有稳定接口,再考虑 Playwright。

4.2 整体流程

本文项目流程如下:

采集 Fetch
  ↓
解析 Parse
  ↓
清洗 Normalize
  ↓
去重 Deduplicate
  ↓
存储 Storage
  ↓
导出 Export
  ↓
监控 Monitor

更细一点:

列表页请求
  ↓
提取详情页 URL
  ↓
详情页请求
  ↓
抽取:站点名、试验方向、地区、主管单位、状态
  ↓
字段清洗:去空格、字段别名统一、缺失值兜底
  ↓
计算 content_hash
  ↓
SQLite 三表入库
  ↓
判断新增 / 更新 / 未变化
  ↓
导出 CSV

4.3 为什么不用一上来就 Scrapy

Scrapy 很强,尤其适合大规模、多站点、复杂调度和中间件管理。但这篇文章的目标是建立一个干净、易读、可复现的项目模板。对于几十页、几百页的公开名录监控,requests + bs4 足够。

后续如果你要做多站点批量采集,再把 Fetcher、Parser、Storage 抽象迁移到 Scrapy 也不难。

4.4 为什么不用 Playwright

Playwright 适合处理强动态页面、前端渲染页面、需要浏览器环境的页面。但它的成本更高:

  • 运行更慢。
  • 环境更重。
  • 容器化部署更麻烦。
  • 对简单 HTML 页面属于“杀鸡用牛刀”。

所以本文先选择轻量路线。判断页面确实是动态渲染后,再升级到 Playwright。


5️⃣ 环境准备与依赖安装(可复现)

5.1 Python 版本

推荐使用:

Python 3.10+

我个人习惯用 Python 3.11 或 3.12。本文代码没有使用特别新的语法,Python 3.10 以上都比较稳。

5.2 创建虚拟环境

mkdir agri_station_monitor
cd agri_station_monitor

python -m venv .venv

# macOS / Linux
source .venv/bin/activate

# Windows PowerShell
.venv\Scripts\Activate.ps1

5.3 安装依赖

新建 requirements.txt

requests==2.32.3
beautifulsoup4==4.12.3
lxml==5.3.0
tenacity==9.0.0
python-dateutil==2.9.0.post0

安装:

pip install -r requirements.txt

这里几个依赖的作用:

依赖 作用
requests HTTP 请求
beautifulsoup4 HTML 解析
lxml 更快、更稳定的 HTML 解析器
tenacity 重试和退避
python-dateutil 时间处理扩展,本文保留为后续扩展

5.4 推荐项目结构

agri_station_monitor/
├── main.py
├── requirements.txt
├── agri_monitor/
│   ├── __init__.py
│   ├── config.py
│   ├── demo_site.py
│   ├── fetcher.py
│   ├── models.py
│   ├── parser.py
│   ├── storage.py
│   └── utils.py
└── data/
    ├── demo_site/
    │   ├── index.html
    │   ├── detail_001.html
    │   ├── detail_002.html
    │   ├── detail_003.html
    │   └── detail_004.html
    └── output/
        ├── agri_stations.sqlite
        └── agri_stations.csv

这个结构有几个好处:

  • 请求层、解析层、存储层分离。
  • 后续更换目标站点时,只需要改 Parser 或配置。
  • 数据输出统一放到 data/output/
  • demo 页面自动生成,不依赖外网。

6️⃣ 核心实现:请求层(Fetcher)

请求层的职责很单纯:给一个 URL,返回页面 HTML。

但真正写的时候,不能只写:

requests.get(url).text

这类写法用于临时验证可以,长期任务会遇到很多问题:

  • 没有 User-Agent,容易被拒绝。
  • 没有 timeout,网络卡住时程序一直挂着。
  • 没有 session,不能复用连接。
  • 没有重试,偶发 503 就失败。
  • 没有频率控制,容易触发限制。

6.1 Fetcher 设计重点

本文请求层包含以下设计:

  1. headers:设置 UA、Accept、Accept-Language、Referer。
  2. timeout:每次请求设置超时时间。
  3. session:复用 TCP 连接,也方便后续处理 cookie。
  4. retry:遇到 403、429、5xx 等状态时重试。
  5. sleep_seconds:请求间隔,避免短时间高频访问。

6.2 完整代码:agri_monitor/fetcher.py

from __future__ import annotations

import time
from typing import Optional

import requests
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential


class FetchError(RuntimeError):
    pass


class Fetcher:
    def __init__(
        self,
        user_agent: str,
        timeout: int = 10,
        referer: Optional[str] = None,
        sleep_seconds: float = 0.6,
    ) -> None:
        self.timeout = timeout
        self.sleep_seconds = sleep_seconds
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": user_agent,
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Connection": "keep-alive",
        })
        if referer:
            self.session.headers.update({"Referer": referer})

    @retry(
        retry=retry_if_exception_type((requests.RequestException, FetchError)),
        wait=wait_exponential(multiplier=1, min=1, max=8),
        stop=stop_after_attempt(3),
        reraise=True,
    )
    def get(self, url: str) -> str:
        time.sleep(self.sleep_seconds)
        response = self.session.get(url, timeout=self.timeout)

        if response.status_code in {403, 429, 500, 502, 503, 504}:
            raise FetchError(f"temporary or restricted status: {response.status_code}, url={url}")

        response.raise_for_status()

        # 一些老站点可能没有明确声明 UTF-8。
        # apparent_encoding 不完美,但比完全依赖响应头更稳。
        response.encoding = response.apparent_encoding or response.encoding
        return response.text

6.3 关于 headers

User-Agent 不建议伪装得太夸张。比如把自己伪装成最新版 Chrome,不一定更好。对于合规采集,更建议使用明确、温和的 UA:

AgriStationMonitor/1.0 (+research-contact@example.com)

如果是真实单位内部项目,可以写成:

YourOrgDataCollector/1.0 (+contact@example.org)

这样对方运维看到日志时也容易判断访问来源。

6.4 关于 timeout

timeout=10 是一个相对保守的起步值。公网环境不稳定时可以调到 15 或 20,但不建议完全不设置。

6.5 关于 session / cookie

本文 demo 不需要 cookie。

如果目标站点的公开页面需要设置一次 cookie 才能访问,也可以用 requests.Session() 保存。注意这里说的是公开页面常规 cookie,不包括绕过登录限制。

6.6 关于失败处理

本文用 tenacity 做指数退避:

wait_exponential(multiplier=1, min=1, max=8)

大概意思是失败后不要马上疯狂重试,而是逐步拉长等待时间。很多临时错误过几秒就恢复了,温和重试比暴力重试更适合长期任务。


7️⃣ 核心实现:解析层(Parser)

解析层是这个项目最容易变脏的地方。不同站点的 HTML 写法差异很大:

有的详情页是表格:

<tr>
  <th>站点名</th>
  <td>华北旱作农业试验站</td>
</tr>

有的是定义列表:

<dt>站点名</dt>
<dd>华北旱作农业试验站</dd>

还有的把字段写在段落里,需要正则或更复杂的规则。

本文先覆盖两类最常见结构:

  1. 表格结构:tr > th/td
  2. 定义列表结构:dt + dd

7.1 列表页如何拿详情链接

列表页通常有多个详情链接:

<a class="station-link" href="detail_001.html">查看</a>

解析时要注意:

  • 相对链接转绝对链接。
  • 去重。
  • 过滤空链接。
  • 不要把无关导航链接当成详情链接。

本文选择器:

a.station-link, .station-list a, table a

真实项目中建议优先使用更稳定的选择器,比如固定栏目容器下的链接:

#content .station-list a

或者根据 URL 特征过滤:

if "/station/detail/" in href:
    ...

7.2 详情页如何抽字段

详情页解析要做字段别名:

FIELD_ALIASES = {
    "站点名": "station_name",
    "站点名称": "station_name",
    "名称": "station_name",
    "试验方向": "direction_name",
    "研究方向": "direction_name",
    "方向": "direction_name",
    "地区": "region",
    "所在地区": "region",
    "区域": "region",
    "主管单位": "authority_name",
    "管理单位": "authority_name",
    "依托单位": "authority_name",
    "状态": "status",
    "运行状态": "status",
}

这一步很关键。真实名录页面里字段名并不总是统一的。如果不做别名兼容,换一个栏目就可能解析不到。

7.3 缺失字段怎么办

缺失字段不要让程序直接崩掉。

本文策略:

  • 字段缺失时填 "未知"
  • 站点名缺失时,用页面标题或 h1 兜底。
  • 解析失败的详情页记录日志,不影响其他页面继续采集。

为什么不用空字符串?

因为空字符串在后续分析里很容易被忽略,而 "未知" 更适合做质量检查。比如你可以直接统计:

SELECT COUNT(*) FROM stations WHERE region = '未知';

7.4 完整代码:agri_monitor/parser.py

from __future__ import annotations

from bs4 import BeautifulSoup

from .models import StationRecord
from .utils import absolute_url, build_hash, clean_text


FIELD_ALIASES = {
    "站点名": "station_name",
    "站点名称": "station_name",
    "名称": "station_name",
    "试验方向": "direction_name",
    "研究方向": "direction_name",
    "方向": "direction_name",
    "地区": "region",
    "所在地区": "region",
    "区域": "region",
    "主管单位": "authority_name",
    "管理单位": "authority_name",
    "依托单位": "authority_name",
    "状态": "status",
    "运行状态": "status",
}


class AgriStationParser:
    def parse_list(self, html: str, list_url: str) -> list[str]:
        soup = BeautifulSoup(html, "lxml")
        links: list[str] = []

        for a_tag in soup.select("a.station-link, .station-list a, table a"):
            href = a_tag.get("href")
            if not href:
                continue
            links.append(absolute_url(list_url, href))

        # dict.fromkeys 可以在保留顺序的同时去重。
        return list(dict.fromkeys(links))

    def parse_detail(self, html: str, url: str, crawled_at: str) -> StationRecord:
        soup = BeautifulSoup(html, "lxml")
        data = {
            "station_name": "未知",
            "direction_name": "未知",
            "region": "未知",
            "authority_name": "未知",
            "status": "未知",
        }

        # 兼容“字段-值”表格结构:<tr><th>站点名</th><td>...</td></tr>
        for row in soup.select("tr"):
            cells = row.find_all(["th", "td"])
            if len(cells) < 2:
                continue

            key = clean_text(cells[0].get_text(" "))
            value = clean_text(cells[1].get_text(" "))
            normalized_key = FIELD_ALIASES.get(key)

            if normalized_key:
                data[normalized_key] = value

        # 兼容 definition list:<dt>站点名</dt><dd>...</dd>
        for dt in soup.select("dt"):
            key = clean_text(dt.get_text(" "))
            dd = dt.find_next_sibling("dd")
            normalized_key = FIELD_ALIASES.get(key)

            if normalized_key and dd:
                data[normalized_key] = clean_text(dd.get_text(" "))

        # 页面标题兜底,避免详情页没有“站点名”时完全丢失。
        if data["station_name"] == "未知":
            h1 = soup.select_one("h1, .title, .station-title")
            if h1:
                data["station_name"] = clean_text(h1.get_text(" "))

        content_hash = build_hash(
            data["station_name"],
            data["direction_name"],
            data["region"],
            data["authority_name"],
            data["status"],
        )

        return StationRecord(
            source_url=url,
            station_name=data["station_name"],
            direction_name=data["direction_name"],
            region=data["region"],
            authority_name=data["authority_name"],
            status=data["status"],
            content_hash=content_hash,
            crawled_at=crawled_at,
        )

8️⃣ 数据存储与导出(Storage)

本文使用 SQLite 起步。

原因很实际:

  • 不需要安装 MySQL。
  • 单机任务足够用。
  • 方便导出 CSV。
  • 适合小型监控任务。
  • 后续迁移 MySQL 或 PostgreSQL 成本不高。

8.1 字段映射表

中文字段 程序字段 数据类型 示例值
站点名 station_name TEXT 华北旱作农业试验站
试验方向 direction_name TEXT 旱作节水与小麦玉米轮作
地区 region TEXT 河北省石家庄市
主管单位 authority_name TEXT 某农业科学院作物研究所
状态 status TEXT 运行中
来源链接 source_url TEXT http://127.0.0.1:8000/detail_001.html
内容哈希 content_hash TEXT sha256 字符串
首次发现时间 first_seen_at TEXT 2026-06-10T14:00:00+08:00
最近采集时间 last_seen_at TEXT 2026-06-10T14:00:00+08:00

8.2 三表模型

核心表:

authorities
research_directions
stations

关系:

authorities.id  <── stations.authority_id
research_directions.id <── stations.direction_id

也就是:

一个主管单位 -> 多个站点
一个研究方向 -> 多个站点
一个站点 -> 一个主管单位、一个主研究方向

真实业务里,如果一个站点有多个方向,可以这样扩展:

stations
research_directions
station_direction_relations

不过起步不建议过度设计。先保证字段稳定入库,再看数据复杂度决定是否拆关系表。

8.3 去重策略

本文采用两层去重。

第一层:URL 唯一。

source_url TEXT NOT NULL UNIQUE

同一个详情页不会重复插入。

第二层:内容 hash。

content_hash = sha256(站点名 + 试验方向 + 地区 + 主管单位 + 状态)

如果同一个 URL 的字段内容发生变化,就认为这条站点信息更新了。

这样可以判断:

  • 新增:URL 不存在。
  • 未变化:URL 存在,hash 一样。
  • 更新:URL 存在,hash 不一样。

8.4 完整代码:agri_monitor/storage.py

from __future__ import annotations

import csv
import sqlite3
from pathlib import Path

from .models import StationRecord


class SQLiteStorage:
    def __init__(self, sqlite_path: Path) -> None:
        self.sqlite_path = sqlite_path
        self.sqlite_path.parent.mkdir(parents=True, exist_ok=True)
        self.conn = sqlite3.connect(self.sqlite_path)
        self.conn.row_factory = sqlite3.Row

    def init_schema(self) -> None:
        self.conn.executescript(
            """
            PRAGMA foreign_keys = ON;

            CREATE TABLE IF NOT EXISTS authorities (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL UNIQUE,
                created_at TEXT NOT NULL
            );

            CREATE TABLE IF NOT EXISTS research_directions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL UNIQUE,
                created_at TEXT NOT NULL
            );

            CREATE TABLE IF NOT EXISTS stations (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                station_name TEXT NOT NULL,
                region TEXT NOT NULL,
                status TEXT NOT NULL,
                source_url TEXT NOT NULL UNIQUE,
                authority_id INTEGER NOT NULL,
                direction_id INTEGER NOT NULL,
                content_hash TEXT NOT NULL,
                first_seen_at TEXT NOT NULL,
                last_seen_at TEXT NOT NULL,
                updated_at TEXT NOT NULL,
                FOREIGN KEY(authority_id) REFERENCES authorities(id),
                FOREIGN KEY(direction_id) REFERENCES research_directions(id)
            );

            CREATE TABLE IF NOT EXISTS change_events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                station_id INTEGER NOT NULL,
                old_hash TEXT NOT NULL,
                new_hash TEXT NOT NULL,
                changed_at TEXT NOT NULL,
                FOREIGN KEY(station_id) REFERENCES stations(id)
            );
            """
        )
        self.conn.commit()

    def _get_or_create_id(self, table: str, name: str, now: str) -> int:
        row = self.conn.execute(
            f"SELECT id FROM {table} WHERE name = ?",
            (name,),
        ).fetchone()

        if row:
            return int(row["id"])

        cursor = self.conn.execute(
            f"INSERT INTO {table}(name, created_at) VALUES (?, ?)",
            (name, now),
        )
        return int(cursor.lastrowid)

    def upsert_station(self, record: StationRecord) -> str:
        authority_id = self._get_or_create_id(
            "authorities",
            record.authority_name,
            record.crawled_at,
        )
        direction_id = self._get_or_create_id(
            "research_directions",
            record.direction_name,
            record.crawled_at,
        )

        row = self.conn.execute(
            "SELECT id, content_hash FROM stations WHERE source_url = ?",
            (record.source_url,),
        ).fetchone()

        if not row:
            self.conn.execute(
                """
                INSERT INTO stations(
                    station_name,
                    region,
                    status,
                    source_url,
                    authority_id,
                    direction_id,
                    content_hash,
                    first_seen_at,
                    last_seen_at,
                    updated_at
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    record.station_name,
                    record.region,
                    record.status,
                    record.source_url,
                    authority_id,
                    direction_id,
                    record.content_hash,
                    record.crawled_at,
                    record.crawled_at,
                    record.crawled_at,
                ),
            )
            self.conn.commit()
            return "created"

        station_id = int(row["id"])
        old_hash = str(row["content_hash"])

        if old_hash == record.content_hash:
            self.conn.execute(
                "UPDATE stations SET last_seen_at = ? WHERE id = ?",
                (record.crawled_at, station_id),
            )
            self.conn.commit()
            return "unchanged"

        self.conn.execute(
            """
            UPDATE stations
            SET station_name = ?,
                region = ?,
                status = ?,
                authority_id = ?,
                direction_id = ?,
                content_hash = ?,
                last_seen_at = ?,
                updated_at = ?
            WHERE id = ?
            """,
            (
                record.station_name,
                record.region,
                record.status,
                authority_id,
                direction_id,
                record.content_hash,
                record.crawled_at,
                record.crawled_at,
                station_id,
            ),
        )

        self.conn.execute(
            """
            INSERT INTO change_events(
                station_id,
                old_hash,
                new_hash,
                changed_at
            )
            VALUES (?, ?, ?, ?)
            """,
            (
                station_id,
                old_hash,
                record.content_hash,
                record.crawled_at,
            ),
        )

        self.conn.commit()
        return "updated"

    def export_csv(self, csv_path: Path) -> None:
        csv_path.parent.mkdir(parents=True, exist_ok=True)

        rows = self.conn.execute(
            """
            SELECT
                s.station_name AS 站点名,
                d.name AS 试验方向,
                s.region AS 地区,
                a.name AS 主管单位,
                s.status AS 状态,
                s.source_url AS 来源链接,
                s.last_seen_at AS 最近采集时间
            FROM stations s
            JOIN authorities a ON s.authority_id = a.id
            JOIN research_directions d ON s.direction_id = d.id
            ORDER BY s.id ASC
            """
        ).fetchall()

        with csv_path.open("w", newline="", encoding="utf-8-sig") as f:
            writer = csv.writer(f)
            writer.writerow([
                "站点名",
                "试验方向",
                "地区",
                "主管单位",
                "状态",
                "来源链接",
                "最近采集时间",
            ])

            for row in rows:
                writer.writerow([row[key] for key in row.keys()])

    def preview(self, limit: int = 5) -> list[dict[str, str]]:
        rows = self.conn.execute(
            """
            SELECT
                s.station_name,
                d.name AS direction_name,
                s.region,
                a.name AS authority_name,
                s.status
            FROM stations s
            JOIN authorities a ON s.authority_id = a.id
            JOIN research_directions d ON s.direction_id = d.id
            ORDER BY s.id ASC
            LIMIT ?
            """,
            (limit,),
        ).fetchall()

        return [dict(row) for row in rows]

    def close(self) -> None:
        self.conn.close()

8.5 为什么 CSV 使用 utf-8-sig

很多人会遇到一个小问题:CSV 用 UTF-8 写出后,Excel 打开中文乱码。

解决办法之一是使用:

encoding="utf-8-sig"

这样 Excel 识别起来更友好。


9️⃣ 运行方式与结果展示(必写)

为了让项目能在没有外网、没有真实目标站点的情况下跑通,本文代码内置一个 demo 站点生成器。运行时会自动创建本地 HTML 页面,并启动一个本地 HTTP 服务。

9.1 配置文件:agri_monitor/config.py

from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path


PROJECT_ROOT = Path(__file__).resolve().parents[1]
DATA_DIR = PROJECT_ROOT / "data"
DEMO_SITE_DIR = DATA_DIR / "demo_site"
OUTPUT_DIR = DATA_DIR / "output"


@dataclass(frozen=True)
class CrawlConfig:
    base_url: str
    list_path: str = "/index.html"
    timeout: int = 10
    sleep_seconds: float = 0.6
    max_retries: int = 3
    user_agent: str = "AgriStationMonitor/1.0 (+research-contact@example.com)"
    referer: str | None = None
    sqlite_path: Path = OUTPUT_DIR / "agri_stations.sqlite"
    csv_path: Path = OUTPUT_DIR / "agri_stations.csv"

    @property
    def list_url(self) -> str:
        return self.base_url.rstrip("/") + self.list_path

9.2 数据模型:agri_monitor/models.py

from __future__ import annotations

from dataclasses import dataclass


@dataclass
class StationRecord:
    source_url: str
    station_name: str
    direction_name: str
    region: str
    authority_name: str
    status: str
    content_hash: str
    crawled_at: str


@dataclass
class CrawlResult:
    created: int = 0
    updated: int = 0
    unchanged: int = 0
    failed: int = 0

9.3 工具函数:agri_monitor/utils.py

from __future__ import annotations

import hashlib
import re
from urllib.parse import urljoin


def clean_text(value: str | None) -> str:
    if not value:
        return "未知"

    text = re.sub(r"\s+", " ", value).strip()
    return text if text else "未知"


def absolute_url(base_url: str, href: str) -> str:
    return urljoin(base_url, href)


def build_hash(*parts: str) -> str:
    raw = "||".join(clean_text(p) for p in parts)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()

9.4 本地 demo 站点:agri_monitor/demo_site.py

这部分代码用于生成模拟页面。它的作用不是造假数据,而是让项目在任何环境下都能跑通。

真实接站点时,可以不使用 --demo

from __future__ import annotations

from pathlib import Path


INDEX_HTML = """<!doctype html>
<html lang="zh-CN">
<head>
  <meta charset="utf-8">
  <title>农业试验站公开名录</title>
</head>
<body>
  <h1>农业试验站公开名录</h1>
  <table class="station-list">
    <tr>
      <th>站点名</th>
      <th>地区</th>
      <th>详情</th>
    </tr>
    <tr>
      <td>华北旱作农业试验站</td>
      <td>河北省</td>
      <td><a class="station-link" href="detail_001.html">查看</a></td>
    </tr>
    <tr>
      <td>江南水稻生态试验站</td>
      <td>江苏省</td>
      <td><a class="station-link" href="detail_002.html">查看</a></td>
    </tr>
    <tr>
      <td>西南山地土壤改良试验站</td>
      <td>四川省</td>
      <td><a class="station-link" href="detail_003.html">查看</a></td>
    </tr>
    <tr>
      <td>东北黑土地保护试验站</td>
      <td>黑龙江省</td>
      <td><a class="station-link" href="detail_004.html">查看</a></td>
    </tr>
  </table>
</body>
</html>
"""


DETAILS = {
    "detail_001.html": {
        "站点名": "华北旱作农业试验站",
        "试验方向": "旱作节水与小麦玉米轮作",
        "地区": "河北省石家庄市",
        "主管单位": "某农业科学院作物研究所",
        "状态": "运行中",
    },
    "detail_002.html": {
        "站点名": "江南水稻生态试验站",
        "试验方向": "水稻品种比较与稻田生态",
        "地区": "江苏省扬州市",
        "主管单位": "某省农业技术推广中心",
        "状态": "运行中",
    },
    "detail_003.html": {
        "站点名": "西南山地土壤改良试验站",
        "试验方向": "坡耕地土壤改良与养分监测",
        "地区": "四川省绵阳市",
        "主管单位": "某农业大学资源环境学院",
        "状态": "维护中",
    },
    "detail_004.html": {
        "站点名": "东北黑土地保护试验站",
        "试验方向": "黑土地保护性耕作与有机质提升",
        "地区": "黑龙江省哈尔滨市",
        "主管单位": "某黑土地保护研究中心",
        "状态": "运行中",
    },
}


def build_detail_html(fields: dict[str, str]) -> str:
    rows = "\n".join(
        f"<tr><th>{key}</th><td>{value}</td></tr>"
        for key, value in fields.items()
    )

    return f"""<!doctype html>
<html lang="zh-CN">
<head>
  <meta charset="utf-8">
  <title>{fields.get('站点名', '详情')}</title>
</head>
<body>
  <h1 class="station-title">{fields.get('站点名', '详情')}</h1>
  <table class="detail-table">
    {rows}
  </table>
</body>
</html>
"""


def write_demo_site(root: Path) -> None:
    root.mkdir(parents=True, exist_ok=True)

    (root / "index.html").write_text(INDEX_HTML, encoding="utf-8")

    for filename, fields in DETAILS.items():
        (root / filename).write_text(
            build_detail_html(fields),
            encoding="utf-8",
        )

9.5 入口文件:main.py

from __future__ import annotations

import argparse
import contextlib
import functools
import http.server
import logging
import socket
import threading
from datetime import datetime, timezone
from pathlib import Path

from agri_monitor.config import CrawlConfig, DEMO_SITE_DIR, OUTPUT_DIR
from agri_monitor.demo_site import write_demo_site
from agri_monitor.fetcher import Fetcher
from agri_monitor.models import CrawlResult
from agri_monitor.parser import AgriStationParser
from agri_monitor.storage import SQLiteStorage


def now_iso() -> str:
    return datetime.now(timezone.utc).astimezone().isoformat(timespec="seconds")


def find_free_port() -> int:
    with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
        sock.bind(("127.0.0.1", 0))
        return int(sock.getsockname()[1])


def start_demo_server(directory: Path) -> tuple[http.server.ThreadingHTTPServer, str]:
    port = find_free_port()

    handler = functools.partial(
        http.server.SimpleHTTPRequestHandler,
        directory=str(directory),
    )

    server = http.server.ThreadingHTTPServer(("127.0.0.1", port), handler)

    thread = threading.Thread(
        target=server.serve_forever,
        daemon=True,
    )
    thread.start()

    return server, f"http://127.0.0.1:{port}"


def run_crawl(config: CrawlConfig) -> CrawlResult:
    fetcher = Fetcher(
        user_agent=config.user_agent,
        timeout=config.timeout,
        referer=config.referer or config.base_url,
        sleep_seconds=config.sleep_seconds,
    )

    parser = AgriStationParser()
    storage = SQLiteStorage(config.sqlite_path)
    storage.init_schema()

    result = CrawlResult()

    try:
        logging.info("fetch list: %s", config.list_url)

        list_html = fetcher.get(config.list_url)
        detail_urls = parser.parse_list(list_html, config.list_url)

        logging.info("found detail urls: %d", len(detail_urls))

        for url in detail_urls:
            try:
                html = fetcher.get(url)
                record = parser.parse_detail(html, url, now_iso())

                action = storage.upsert_station(record)

                if action == "created":
                    result.created += 1
                elif action == "updated":
                    result.updated += 1
                else:
                    result.unchanged += 1

                logging.info("%-9s %s", action, record.station_name)

            except Exception as exc:
                result.failed += 1
                logging.exception("failed detail url=%s error=%s", url, exc)

        storage.export_csv(config.csv_path)

        logging.info("export csv: %s", config.csv_path)

        for row in storage.preview(limit=5):
            logging.info("preview: %s", row)

        return result

    finally:
        storage.close()


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="农业试验站公开名录监控爬虫")

    parser.add_argument(
        "--base-url",
        default="",
        help="目标站点根地址,例如 https://example.org/agri",
    )
    parser.add_argument(
        "--list-path",
        default="/index.html",
        help="列表页路径",
    )
    parser.add_argument(
        "--demo",
        action="store_true",
        help="生成并启动本地样例站点,便于无外网环境跑通",
    )
    parser.add_argument(
        "--db",
        default=str(OUTPUT_DIR / "agri_stations.sqlite"),
        help="SQLite 输出路径",
    )
    parser.add_argument(
        "--csv",
        default=str(OUTPUT_DIR / "agri_stations.csv"),
        help="CSV 输出路径",
    )
    parser.add_argument(
        "--sleep",
        type=float,
        default=0.6,
        help="请求间隔秒数",
    )

    return parser.parse_args()


def main() -> None:
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
    )

    args = parse_args()

    demo_server = None

    if args.demo:
        write_demo_site(DEMO_SITE_DIR)
        demo_server, base_url = start_demo_server(DEMO_SITE_DIR)
        list_path = "/index.html"
        logging.info("demo server started: %s", base_url)
    else:
        if not args.base_url:
            raise SystemExit("请提供 --base-url,或使用 --demo 运行本地样例。")

        base_url = args.base_url
        list_path = args.list_path

    try:
        config = CrawlConfig(
            base_url=base_url,
            list_path=list_path,
            sleep_seconds=args.sleep,
            sqlite_path=Path(args.db),
            csv_path=Path(args.csv),
        )

        result = run_crawl(config)

        logging.info(
            "done: created=%d updated=%d unchanged=%d failed=%d",
            result.created,
            result.updated,
            result.unchanged,
            result.failed,
        )

    finally:
        if demo_server:
            demo_server.shutdown()


if __name__ == "__main__":
    main()

9.6 如何启动

先安装依赖:

pip install -r requirements.txt

运行本地 demo:

python main.py --demo

输出示例:

INFO demo server started: http://127.0.0.1:57693
INFO fetch list: http://127.0.0.1:57693/index.html
INFO found detail urls: 4
INFO created   华北旱作农业试验站
INFO created   江南水稻生态试验站
INFO created   西南山地土壤改良试验站
INFO created   东北黑土地保护试验站
INFO export csv: data/output/agri_stations.csv
INFO done: created=4 updated=0 unchanged=0 failed=0

第二次运行:

python main.py --demo

如果页面内容没变,会看到:

INFO unchanged 华北旱作农业试验站
INFO unchanged 江南水稻生态试验站
INFO unchanged 西南山地土壤改良试验站
INFO unchanged 东北黑土地保护试验站
INFO done: created=0 updated=0 unchanged=4 failed=0

这就是监控的基础能力:能够区分新增、更新和未变化。

9.7 输出在哪里

默认输出:

data/output/agri_stations.sqlite
data/output/agri_stations.csv

CSV 内容示例:

站点名 试验方向 地区 主管单位 状态
华北旱作农业试验站 旱作节水与小麦玉米轮作 河北省石家庄市 某农业科学院作物研究所 运行中
江南水稻生态试验站 水稻品种比较与稻田生态 江苏省扬州市 某省农业技术推广中心 运行中
西南山地土壤改良试验站 坡耕地土壤改良与养分监测 四川省绵阳市 某农业大学资源环境学院 维护中
东北黑土地保护试验站 黑土地保护性耕作与有机质提升 黑龙江省哈尔滨市 某黑土地保护研究中心 运行中

9.8 接真实公开站点

假设真实站点根地址为:

https://example.org/agri-stations

列表页路径为:

/list.html

运行:

python main.py \
  --base-url "https://example.org/agri-stations" \
  --list-path "/list.html" \
  --sleep 1.5

如果真实页面结构不同,主要改两个地方:

第一,列表页详情链接选择器:

for a_tag in soup.select("a.station-link, .station-list a, table a"):

第二,详情页字段解析规则:

for row in soup.select("tr"):

如果详情页是 JSON 接口,就不需要 BeautifulSoup,直接 response.json() 后映射字段即可。


🔟 常见问题与排错(强烈建议写)

10.1 403 怎么办

403 表示服务器拒绝访问。原因可能很多:

  • 没有设置 User-Agent。
  • Referer 不符合预期。
  • 目标站点不希望自动化访问。
  • 访问频率异常。
  • 路径本身不对外开放。

处理建议:

  1. 先用浏览器确认页面是否公开可访问。
  2. 检查 URL 是否正确。
  3. 设置合理 UA,不要使用空 UA。
  4. 降低访问频率。
  5. 如果页面明确限制自动访问,就不要继续采集。

不建议把 403 简单理解为“换代理就行”。在公开名录监控项目里,代理不是优先解法,合规和稳定才是优先项。

10.2 429 怎么办

429 通常表示请求过快。

处理建议:

  • 增加 --sleep,比如调到 2 秒、5 秒。
  • 减少并发。
  • 遇到 429 时指数退避。
  • 定时任务降低频率。
  • 缓存已抓详情页,避免重复请求。

本文 Fetcher 已经把 429 放进重试状态码:

if response.status_code in {403, 429, 500, 502, 503, 504}:
    raise FetchError(...)

但真实任务里,如果连续遇到 429,更好的做法是停止本轮任务,而不是继续硬抓。

10.3 HTML 抓到空壳怎么办

如果 requests.get() 拿到的 HTML 里没有数据,而浏览器里能看到数据,大概率是动态渲染。

排查方法:

  1. 打开浏览器开发者工具。
  2. 进入 Network 面板。
  3. 刷新页面。
  4. 搜索字段关键字,比如某个站点名。
  5. 看数据是 HTML 里直接返回,还是由 XHR / Fetch 接口返回。

如果找到了 JSON 接口,优先抓 JSON 接口。

如果没有稳定接口,可以考虑 Playwright:

from playwright.sync_api import sync_playwright

with sync_playwright() as p:
    browser = p.chromium.launch(headless=True)
    page = browser.new_page()
    page.goto(url, wait_until="networkidle")
    html = page.content()
    browser.close()

但不要一开始就用 Playwright。能用静态请求解决的页面,静态请求更简单、更省资源。

10.4 解析报错怎么办

解析报错通常有几类:

第一,选择器失效。
网站改版后,原来的 .station-link 不存在了。

第二,字段结构变化。
原来是表格,现在变成段落。

第三,字段名变化。
原来叫“主管单位”,现在叫“依托机构”。

第四,页面混入无关链接。
列表页里把导航、附件、分页都当成了详情页。

解决办法:

  • 给解析层加日志。
  • 保存失败页面到本地排查。
  • 增加字段别名。
  • 增加 URL 过滤。
  • 单独写解析单元测试。

例如可以临时保存失败页面:

from pathlib import Path

Path("debug_failed.html").write_text(html, encoding="utf-8")

然后用浏览器打开,重新确认结构。

10.5 编码乱码如何处理

老站点常见编码问题:

  • 响应头写错。
  • 页面 meta 声明不一致。
  • 实际是 GBK,但程序按 UTF-8 解码。

本文使用:

response.encoding = response.apparent_encoding or response.encoding

如果你明确知道目标站点是 GBK,可以直接写:

response.encoding = "gbk"

CSV 导出建议:

encoding="utf-8-sig"

这样用 Excel 打开中文更稳。

10.6 SQLite 被锁怎么办

如果多进程同时写 SQLite,可能遇到 database is locked。

起步项目建议:

  • 单进程写入。
  • 不要多个任务同时跑同一个数据库。
  • 大规模任务迁移到 MySQL 或 PostgreSQL。
  • 写入时使用队列,把采集和写库分开。

10.7 数据重复怎么办

重复一般来自:

  • 列表页重复链接。
  • 同一详情页多个 URL 参数。
  • HTTP 和 HTTPS 混用。
  • URL 后面带 tracking 参数。

本文先用 source_url UNIQUE 去重。更严谨可以增加 URL 规范化:

from urllib.parse import urlsplit, urlunsplit, parse_qsl, urlencode

def normalize_url(url: str) -> str:
    parts = urlsplit(url)
    query_pairs = [
        (k, v)
        for k, v in parse_qsl(parts.query)
        if not k.lower().startswith("utm_")
    ]
    clean_query = urlencode(query_pairs)
    return urlunsplit((
        parts.scheme.lower(),
        parts.netloc.lower(),
        parts.path,
        clean_query,
        "",
    ))

也可以增加业务唯一键,比如:

站点名 + 地区 + 主管单位

但业务唯一键要谨慎,因为站点可能重名,主管单位名称也可能调整。

10.8 字段全是“未知”怎么办

字段全是“未知”,说明详情页解析没命中。

优先检查:

  • 详情页 HTML 是否真的拿到了。
  • 是否是动态渲染页面。
  • 字段是否不是表格或 dt/dd 结构。
  • 字段名是否不在 FIELD_ALIASES 里。
  • 是否页面有 iframe。

临时调试:

print(soup.get_text("\n")[:2000])

或者保存 HTML:

Path("debug.html").write_text(html, encoding="utf-8")

解析层不要靠猜,最好直接看页面结构。


1️⃣1️⃣ 进阶优化(可选但加分)

11.1 并发优化

本文是单线程,适合起步和温和监控。

如果真实数据量较大,可以考虑线程池:

from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_one(url: str):
    html = fetcher.get(url)
    return parser.parse_detail(html, url, now_iso())

with ThreadPoolExecutor(max_workers=3) as executor:
    futures = [executor.submit(fetch_one, url) for url in detail_urls]

    for future in as_completed(futures):
        record = future.result()
        storage.upsert_station(record)

但我建议把 max_workers 控制在 2 到 5,不要为了快把公开站点打满。

如果你需要更强的并发控制,可以用 asyncio + httpx。如果要做多站点采集,Scrapy 更合适。

11.2 断点续跑

断点续跑的核心是:已经成功抓过的 URL,不要重复抓。

可以加一张表:

CREATE TABLE IF NOT EXISTS crawl_tasks (
    url TEXT PRIMARY KEY,
    status TEXT NOT NULL,
    retry_count INTEGER NOT NULL DEFAULT 0,
    updated_at TEXT NOT NULL
);

状态可以设计为:

pending
success
failed
skipped

执行时先把列表页详情链接写入任务表,再逐条消费。中断后下次从 pendingfailed 继续。

小项目也可以简单一点:

seen_urls = set(
    row["source_url"]
    for row in conn.execute("SELECT source_url FROM stations")
)

不过这种方式只能跳过已入库的成功数据,不能很好记录失败任务。

11.3 日志与监控

长期任务至少要记录:

  • 本轮发现多少详情页。
  • 新增多少。
  • 更新多少。
  • 未变化多少。
  • 失败多少。
  • 失败 URL 是什么。
  • 平均耗时是多少。

本文已经有基础日志:

logging.info(
    "done: created=%d updated=%d unchanged=%d failed=%d",
    result.created,
    result.updated,
    result.unchanged,
    result.failed,
)

如果要更正式,可以增加 crawl_runs 表:

CREATE TABLE IF NOT EXISTS crawl_runs (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    started_at TEXT NOT NULL,
    finished_at TEXT,
    total_urls INTEGER NOT NULL DEFAULT 0,
    created_count INTEGER NOT NULL DEFAULT 0,
    updated_count INTEGER NOT NULL DEFAULT 0,
    unchanged_count INTEGER NOT NULL DEFAULT 0,
    failed_count INTEGER NOT NULL DEFAULT 0
);

这样每次采集都能留下运行记录,后续做仪表盘也方便。

11.4 定时任务

Linux 下可以用 cron。

比如每天凌晨 2 点运行:

0 2 * * * cd /path/to/agri_station_monitor && /path/to/agri_station_monitor/.venv/bin/python main.py --base-url "https://example.org/agri-stations" --list-path "/list.html" --sleep 2 >> logs/crawl.log 2>&1

如果任务较多,建议用 Airflow、Prefect 或系统内的任务调度平台。

简单项目不要一开始就上复杂平台。先用 cron 跑稳,再决定是否升级。

11.5 数据质量检查

可以写几条 SQL 做质量检查:

检查缺失地区:

SELECT COUNT(*) AS missing_region
FROM stations
WHERE region = '未知';

检查状态分布:

SELECT status, COUNT(*) AS total
FROM stations
GROUP BY status
ORDER BY total DESC;

检查主管单位站点数:

SELECT a.name, COUNT(*) AS total
FROM stations s
JOIN authorities a ON s.authority_id = a.id
GROUP BY a.name
ORDER BY total DESC;

检查研究方向分布:

SELECT d.name, COUNT(*) AS total
FROM stations s
JOIN research_directions d ON s.direction_id = d.id
GROUP BY d.name
ORDER BY total DESC;

这些 SQL 对后续分析很有用。爬虫只是第一步,数据进入结构化表以后,价值才慢慢出来。

11.6 变化通知

如果 updated > 0created > 0,可以发送通知。

简单方式是输出日志。进一步可以接入:

  • 邮件。
  • 企业微信机器人。
  • 钉钉机器人。
  • 飞书机器人。
  • 内部告警系统。

通知内容建议简洁:

农业试验站名录监控完成:
新增 3 条,更新 1 条,失败 0 条。
输出文件:data/output/agri_stations.csv

不要把所有详情都塞进通知里。通知只负责提醒,详情仍然回到数据库或 CSV 查看。

11.7 从 SQLite 迁移到 MySQL

当数据量变大、多任务同时写入、多人查询时,可以迁移到 MySQL 或 PostgreSQL。

迁移时主要变化在 Storage 层。Fetcher 和 Parser 不需要改。

这就是分层设计的好处:

Fetcher 只管请求
Parser 只管解析
Storage 只管存储

换数据库时,不需要重写整个爬虫。


1️⃣2️⃣ 总结与延伸阅读

本文完成了一个农业试验站公开名录监控项目的完整落地过程。

我们不是简单写了一个 requests.get(),而是把一个可维护的小型采集系统拆成了几层:

  • 请求层:负责 headers、timeout、session、重试和频控。
  • 解析层:负责列表页链接提取、详情页字段抽取、字段别名兼容。
  • 清洗层:负责空值兜底、文本整理、URL 规范处理。
  • 存储层:负责 SQLite 三表模型、去重、更新检测、CSV 导出。
  • 运行层:负责命令行参数、本地 demo 站点、日志输出。

这套设计适合很多公开名录类任务,不只适合农业试验站。比如:

  • 科研平台公开名录。
  • 实验室公开名录。
  • 技术服务站公开名录。
  • 示范基地公开名录。
  • 企业公开备案目录。
  • 行业协会会员单位目录。

后续可以继续扩展:

  1. 使用 Scrapy 管理多站点采集。
  2. 使用 Playwright 处理强动态页面。
  3. 增加任务表,实现断点续跑。
  4. 增加历史快照表,追踪字段变化。
  5. 增加地图坐标字段,做空间分布分析。
  6. 增加数据质量规则,自动发现异常字段。
  7. 增加定时任务和通知机制,实现真正的监控闭环。

我个人做这类名录项目时,最看重的不是首轮抓取速度,而是三个月后还能不能稳定复跑。页面会改、字段会缺、网络会抖、数据会重复,这些都很正常。爬虫写得稳不稳,往往就体现在这些不起眼的细节里。

最后再强调一次:公开名录采集要控制频率、尊重站点规则、只处理公开字段,不绕过登录或付费限制。技术分享的重点是自动化整理和数据治理,而不是对目标站点造成负担。


附录:完整项目代码汇总

下面把完整项目文件再集中放一遍,方便复制。

requirements.txt

requests==2.32.3
beautifulsoup4==4.12.3
lxml==5.3.0
tenacity==9.0.0
python-dateutil==2.9.0.post0

agri_monitor/init.py

# agri_monitor package

agri_monitor/config.py

from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path


PROJECT_ROOT = Path(__file__).resolve().parents[1]
DATA_DIR = PROJECT_ROOT / "data"
DEMO_SITE_DIR = DATA_DIR / "demo_site"
OUTPUT_DIR = DATA_DIR / "output"


@dataclass(frozen=True)
class CrawlConfig:
    base_url: str
    list_path: str = "/index.html"
    timeout: int = 10
    sleep_seconds: float = 0.6
    max_retries: int = 3
    user_agent: str = "AgriStationMonitor/1.0 (+research-contact@example.com)"
    referer: str | None = None
    sqlite_path: Path = OUTPUT_DIR / "agri_stations.sqlite"
    csv_path: Path = OUTPUT_DIR / "agri_stations.csv"

    @property
    def list_url(self) -> str:
        return self.base_url.rstrip("/") + self.list_path

agri_monitor/models.py

from __future__ import annotations

from dataclasses import dataclass


@dataclass
class StationRecord:
    source_url: str
    station_name: str
    direction_name: str
    region: str
    authority_name: str
    status: str
    content_hash: str
    crawled_at: str


@dataclass
class CrawlResult:
    created: int = 0
    updated: int = 0
    unchanged: int = 0
    failed: int = 0

agri_monitor/utils.py

from __future__ import annotations

import hashlib
import re
from urllib.parse import urljoin


def clean_text(value: str | None) -> str:
    if not value:
        return "未知"

    text = re.sub(r"\s+", " ", value).strip()
    return text if text else "未知"


def absolute_url(base_url: str, href: str) -> str:
    return urljoin(base_url, href)


def build_hash(*parts: str) -> str:
    raw = "||".join(clean_text(p) for p in parts)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()

agri_monitor/fetcher.py

from __future__ import annotations

import time
from typing import Optional

import requests
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential


class FetchError(RuntimeError):
    pass


class Fetcher:
    def __init__(
        self,
        user_agent: str,
        timeout: int = 10,
        referer: Optional[str] = None,
        sleep_seconds: float = 0.6,
    ) -> None:
        self.timeout = timeout
        self.sleep_seconds = sleep_seconds
        self.session = requests.Session()
        self.session.headers.update({
            "User-Agent": user_agent,
            "Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
            "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
            "Connection": "keep-alive",
        })
        if referer:
            self.session.headers.update({"Referer": referer})

    @retry(
        retry=retry_if_exception_type((requests.RequestException, FetchError)),
        wait=wait_exponential(multiplier=1, min=1, max=8),
        stop=stop_after_attempt(3),
        reraise=True,
    )
    def get(self, url: str) -> str:
        time.sleep(self.sleep_seconds)

        response = self.session.get(url, timeout=self.timeout)

        if response.status_code in {403, 429, 500, 502, 503, 504}:
            raise FetchError(
                f"temporary or restricted status: {response.status_code}, url={url}"
            )

        response.raise_for_status()
        response.encoding = response.apparent_encoding or response.encoding

        return response.text

agri_monitor/parser.py

from __future__ import annotations

from bs4 import BeautifulSoup

from .models import StationRecord
from .utils import absolute_url, build_hash, clean_text


FIELD_ALIASES = {
    "站点名": "station_name",
    "站点名称": "station_name",
    "名称": "station_name",
    "试验方向": "direction_name",
    "研究方向": "direction_name",
    "方向": "direction_name",
    "地区": "region",
    "所在地区": "region",
    "区域": "region",
    "主管单位": "authority_name",
    "管理单位": "authority_name",
    "依托单位": "authority_name",
    "状态": "status",
    "运行状态": "status",
}


class AgriStationParser:
    def parse_list(self, html: str, list_url: str) -> list[str]:
        soup = BeautifulSoup(html, "lxml")
        links: list[str] = []

        for a_tag in soup.select("a.station-link, .station-list a, table a"):
            href = a_tag.get("href")
            if not href:
                continue
            links.append(absolute_url(list_url, href))

        return list(dict.fromkeys(links))

    def parse_detail(self, html: str, url: str, crawled_at: str) -> StationRecord:
        soup = BeautifulSoup(html, "lxml")

        data = {
            "station_name": "未知",
            "direction_name": "未知",
            "region": "未知",
            "authority_name": "未知",
            "status": "未知",
        }

        for row in soup.select("tr"):
            cells = row.find_all(["th", "td"])
            if len(cells) < 2:
                continue

            key = clean_text(cells[0].get_text(" "))
            value = clean_text(cells[1].get_text(" "))

            normalized_key = FIELD_ALIASES.get(key)

            if normalized_key:
                data[normalized_key] = value

        for dt in soup.select("dt"):
            key = clean_text(dt.get_text(" "))
            dd = dt.find_next_sibling("dd")
            normalized_key = FIELD_ALIASES.get(key)

            if normalized_key and dd:
                data[normalized_key] = clean_text(dd.get_text(" "))

        if data["station_name"] == "未知":
            h1 = soup.select_one("h1, .title, .station-title")
            if h1:
                data["station_name"] = clean_text(h1.get_text(" "))

        content_hash = build_hash(
            data["station_name"],
            data["direction_name"],
            data["region"],
            data["authority_name"],
            data["status"],
        )

        return StationRecord(
            source_url=url,
            station_name=data["station_name"],
            direction_name=data["direction_name"],
            region=data["region"],
            authority_name=data["authority_name"],
            status=data["status"],
            content_hash=content_hash,
            crawled_at=crawled_at,
        )

agri_monitor/storage.py

from __future__ import annotations

import csv
import sqlite3
from pathlib import Path

from .models import StationRecord


class SQLiteStorage:
    def __init__(self, sqlite_path: Path) -> None:
        self.sqlite_path = sqlite_path
        self.sqlite_path.parent.mkdir(parents=True, exist_ok=True)
        self.conn = sqlite3.connect(self.sqlite_path)
        self.conn.row_factory = sqlite3.Row

    def init_schema(self) -> None:
        self.conn.executescript(
            """
            PRAGMA foreign_keys = ON;

            CREATE TABLE IF NOT EXISTS authorities (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL UNIQUE,
                created_at TEXT NOT NULL
            );

            CREATE TABLE IF NOT EXISTS research_directions (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                name TEXT NOT NULL UNIQUE,
                created_at TEXT NOT NULL
            );

            CREATE TABLE IF NOT EXISTS stations (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                station_name TEXT NOT NULL,
                region TEXT NOT NULL,
                status TEXT NOT NULL,
                source_url TEXT NOT NULL UNIQUE,
                authority_id INTEGER NOT NULL,
                direction_id INTEGER NOT NULL,
                content_hash TEXT NOT NULL,
                first_seen_at TEXT NOT NULL,
                last_seen_at TEXT NOT NULL,
                updated_at TEXT NOT NULL,
                FOREIGN KEY(authority_id) REFERENCES authorities(id),
                FOREIGN KEY(direction_id) REFERENCES research_directions(id)
            );

            CREATE TABLE IF NOT EXISTS change_events (
                id INTEGER PRIMARY KEY AUTOINCREMENT,
                station_id INTEGER NOT NULL,
                old_hash TEXT NOT NULL,
                new_hash TEXT NOT NULL,
                changed_at TEXT NOT NULL,
                FOREIGN KEY(station_id) REFERENCES stations(id)
            );
            """
        )
        self.conn.commit()

    def _get_or_create_id(self, table: str, name: str, now: str) -> int:
        row = self.conn.execute(
            f"SELECT id FROM {table} WHERE name = ?",
            (name,),
        ).fetchone()

        if row:
            return int(row["id"])

        cursor = self.conn.execute(
            f"INSERT INTO {table}(name, created_at) VALUES (?, ?)",
            (name, now),
        )

        return int(cursor.lastrowid)

    def upsert_station(self, record: StationRecord) -> str:
        authority_id = self._get_or_create_id(
            "authorities",
            record.authority_name,
            record.crawled_at,
        )

        direction_id = self._get_or_create_id(
            "research_directions",
            record.direction_name,
            record.crawled_at,
        )

        row = self.conn.execute(
            "SELECT id, content_hash FROM stations WHERE source_url = ?",
            (record.source_url,),
        ).fetchone()

        if not row:
            self.conn.execute(
                """
                INSERT INTO stations(
                    station_name,
                    region,
                    status,
                    source_url,
                    authority_id,
                    direction_id,
                    content_hash,
                    first_seen_at,
                    last_seen_at,
                    updated_at
                ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
                """,
                (
                    record.station_name,
                    record.region,
                    record.status,
                    record.source_url,
                    authority_id,
                    direction_id,
                    record.content_hash,
                    record.crawled_at,
                    record.crawled_at,
                    record.crawled_at,
                ),
            )
            self.conn.commit()
            return "created"

        station_id = int(row["id"])
        old_hash = str(row["content_hash"])

        if old_hash == record.content_hash:
            self.conn.execute(
                "UPDATE stations SET last_seen_at = ? WHERE id = ?",
                (record.crawled_at, station_id),
            )
            self.conn.commit()
            return "unchanged"

        self.conn.execute(
            """
            UPDATE stations
            SET station_name = ?,
                region = ?,
                status = ?,
                authority_id = ?,
                direction_id = ?,
                content_hash = ?,
                last_seen_at = ?,
                updated_at = ?
            WHERE id = ?
            """,
            (
                record.station_name,
                record.region,
                record.status,
                authority_id,
                direction_id,
                record.content_hash,
                record.crawled_at,
                record.crawled_at,
                station_id,
            ),
        )

        self.conn.execute(
            """
            INSERT INTO change_events(
                station_id,
                old_hash,
                new_hash,
                changed_at
            )
            VALUES (?, ?, ?, ?)
            """,
            (
                station_id,
                old_hash,
                record.content_hash,
                record.crawled_at,
            ),
        )

        self.conn.commit()
        return "updated"

    def export_csv(self, csv_path: Path) -> None:
        csv_path.parent.mkdir(parents=True, exist_ok=True)

        rows = self.conn.execute(
            """
            SELECT
                s.station_name AS 站点名,
                d.name AS 试验方向,
                s.region AS 地区,
                a.name AS 主管单位,
                s.status AS 状态,
                s.source_url AS 来源链接,
                s.last_seen_at AS 最近采集时间
            FROM stations s
            JOIN authorities a ON s.authority_id = a.id
            JOIN research_directions d ON s.direction_id = d.id
            ORDER BY s.id ASC
            """
        ).fetchall()

        with csv_path.open("w", newline="", encoding="utf-8-sig") as f:
            writer = csv.writer(f)
            writer.writerow([
                "站点名",
                "试验方向",
                "地区",
                "主管单位",
                "状态",
                "来源链接",
                "最近采集时间",
            ])

            for row in rows:
                writer.writerow([row[key] for key in row.keys()])

    def preview(self, limit: int = 5) -> list[dict[str, str]]:
        rows = self.conn.execute(
            """
            SELECT
                s.station_name,
                d.name AS direction_name,
                s.region,
                a.name AS authority_name,
                s.status
            FROM stations s
            JOIN authorities a ON s.authority_id = a.id
            JOIN research_directions d ON s.direction_id = d.id
            ORDER BY s.id ASC
            LIMIT ?
            """,
            (limit,),
        ).fetchall()

        return [dict(row) for row in rows]

    def close(self) -> None:
        self.conn.close()

agri_monitor/demo_site.py

from __future__ import annotations

from pathlib import Path


INDEX_HTML = """<!doctype html>
<html lang="zh-CN">
<head>
  <meta charset="utf-8">
  <title>农业试验站公开名录</title>
</head>
<body>
  <h1>农业试验站公开名录</h1>
  <table class="station-list">
    <tr>
      <th>站点名</th>
      <th>地区</th>
      <th>详情</th>
    </tr>
    <tr>
      <td>华北旱作农业试验站</td>
      <td>河北省</td>
      <td><a class="station-link" href="detail_001.html">查看</a></td>
    </tr>
    <tr>
      <td>江南水稻生态试验站</td>
      <td>江苏省</td>
      <td><a class="station-link" href="detail_002.html">查看</a></td>
    </tr>
    <tr>
      <td>西南山地土壤改良试验站</td>
      <td>四川省</td>
      <td><a class="station-link" href="detail_003.html">查看</a></td>
    </tr>
    <tr>
      <td>东北黑土地保护试验站</td>
      <td>黑龙江省</td>
      <td><a class="station-link" href="detail_004.html">查看</a></td>
    </tr>
  </table>
</body>
</html>
"""


DETAILS = {
    "detail_001.html": {
        "站点名": "华北旱作农业试验站",
        "试验方向": "旱作节水与小麦玉米轮作",
        "地区": "河北省石家庄市",
        "主管单位": "某农业科学院作物研究所",
        "状态": "运行中",
    },
    "detail_002.html": {
        "站点名": "江南水稻生态试验站",
        "试验方向": "水稻品种比较与稻田生态",
        "地区": "江苏省扬州市",
        "主管单位": "某省农业技术推广中心",
        "状态": "运行中",
    },
    "detail_003.html": {
        "站点名": "西南山地土壤改良试验站",
        "试验方向": "坡耕地土壤改良与养分监测",
        "地区": "四川省绵阳市",
        "主管单位": "某农业大学资源环境学院",
        "状态": "维护中",
    },
    "detail_004.html": {
        "站点名": "东北黑土地保护试验站",
        "试验方向": "黑土地保护性耕作与有机质提升",
        "地区": "黑龙江省哈尔滨市",
        "主管单位": "某黑土地保护研究中心",
        "状态": "运行中",
    },
}


def build_detail_html(fields: dict[str, str]) -> str:
    rows = "\n".join(
        f"<tr><th>{key}</th><td>{value}</td></tr>"
        for key, value in fields.items()
    )

    return f"""<!doctype html>
<html lang="zh-CN">
<head>
  <meta charset="utf-8">
  <title>{fields.get('站点名', '详情')}</title>
</head>
<body>
  <h1 class="station-title">{fields.get('站点名', '详情')}</h1>
  <table class="detail-table">
    {rows}
  </table>
</body>
</html>
"""


def write_demo_site(root: Path) -> None:
    root.mkdir(parents=True, exist_ok=True)

    (root / "index.html").write_text(INDEX_HTML, encoding="utf-8")

    for filename, fields in DETAILS.items():
        (root / filename).write_text(
            build_detail_html(fields),
            encoding="utf-8",
        )

main.py

from __future__ import annotations

import argparse
import contextlib
import functools
import http.server
import logging
import socket
import threading
from datetime import datetime, timezone
from pathlib import Path

from agri_monitor.config import CrawlConfig, DEMO_SITE_DIR, OUTPUT_DIR
from agri_monitor.demo_site import write_demo_site
from agri_monitor.fetcher import Fetcher
from agri_monitor.models import CrawlResult
from agri_monitor.parser import AgriStationParser
from agri_monitor.storage import SQLiteStorage


def now_iso() -> str:
    return datetime.now(timezone.utc).astimezone().isoformat(timespec="seconds")


def find_free_port() -> int:
    with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
        sock.bind(("127.0.0.1", 0))
        return int(sock.getsockname()[1])


def start_demo_server(directory: Path) -> tuple[http.server.ThreadingHTTPServer, str]:
    port = find_free_port()

    handler = functools.partial(
        http.server.SimpleHTTPRequestHandler,
        directory=str(directory),
    )

    server = http.server.ThreadingHTTPServer(("127.0.0.1", port), handler)

    thread = threading.Thread(
        target=server.serve_forever,
        daemon=True,
    )
    thread.start()

    return server, f"http://127.0.0.1:{port}"


def run_crawl(config: CrawlConfig) -> CrawlResult:
    fetcher = Fetcher(
        user_agent=config.user_agent,
        timeout=config.timeout,
        referer=config.referer or config.base_url,
        sleep_seconds=config.sleep_seconds,
    )

    parser = AgriStationParser()
    storage = SQLiteStorage(config.sqlite_path)
    storage.init_schema()

    result = CrawlResult()

    try:
        logging.info("fetch list: %s", config.list_url)

        list_html = fetcher.get(config.list_url)
        detail_urls = parser.parse_list(list_html, config.list_url)

        logging.info("found detail urls: %d", len(detail_urls))

        for url in detail_urls:
            try:
                html = fetcher.get(url)
                record = parser.parse_detail(html, url, now_iso())

                action = storage.upsert_station(record)

                if action == "created":
                    result.created += 1
                elif action == "updated":
                    result.updated += 1
                else:
                    result.unchanged += 1

                logging.info("%-9s %s", action, record.station_name)

            except Exception as exc:
                result.failed += 1
                logging.exception("failed detail url=%s error=%s", url, exc)

        storage.export_csv(config.csv_path)

        logging.info("export csv: %s", config.csv_path)

        for row in storage.preview(limit=5):
            logging.info("preview: %s", row)

        return result

    finally:
        storage.close()


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="农业试验站公开名录监控爬虫")

    parser.add_argument(
        "--base-url",
        default="",
        help="目标站点根地址,例如 https://example.org/agri",
    )
    parser.add_argument(
        "--list-path",
        default="/index.html",
        help="列表页路径",
    )
    parser.add_argument(
        "--demo",
        action="store_true",
        help="生成并启动本地样例站点,便于无外网环境跑通",
    )
    parser.add_argument(
        "--db",
        default=str(OUTPUT_DIR / "agri_stations.sqlite"),
        help="SQLite 输出路径",
    )
    parser.add_argument(
        "--csv",
        default=str(OUTPUT_DIR / "agri_stations.csv"),
        help="CSV 输出路径",
    )
    parser.add_argument(
        "--sleep",
        type=float,
        default=0.6,
        help="请求间隔秒数",
    )

    return parser.parse_args()


def main() -> None:
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s %(levelname)s %(message)s",
    )

    args = parse_args()

    demo_server = None

    if args.demo:
        write_demo_site(DEMO_SITE_DIR)
        demo_server, base_url = start_demo_server(DEMO_SITE_DIR)
        list_path = "/index.html"
        logging.info("demo server started: %s", base_url)
    else:
        if not args.base_url:
            raise SystemExit("请提供 --base-url,或使用 --demo 运行本地样例。")

        base_url = args.base_url
        list_path = args.list_path

    try:
        config = CrawlConfig(
            base_url=base_url,
            list_path=list_path,
            sleep_seconds=args.sleep,
            sqlite_path=Path(args.db),
            csv_path=Path(args.csv),
        )

        result = run_crawl(config)

        logging.info(
            "done: created=%d updated=%d unchanged=%d failed=%d",
            result.created,
            result.updated,
            result.unchanged,
            result.failed,
        )

    finally:
        if demo_server:
            demo_server.shutdown()


if __name__ == "__main__":
    main()

最小运行命令

python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
python main.py --demo

Windows PowerShell:

python -m venv .venv
.venv\Scripts\Activate.ps1
pip install -r requirements.txt
python main.py --demo

运行完成后查看:

data/output/agri_stations.sqlite
data/output/agri_stations.csv

🌟 文末

好啦~以上就是本期的全部内容啦!如果你在实践过程中遇到任何疑问,欢迎在评论区留言交流,我看到都会尽量回复~咱们下期见!

小伙伴们在批阅的过程中,如果觉得文章不错,欢迎点赞、收藏、关注哦~
三连就是对我写作道路上最好的鼓励与支持! ❤️🔥

✅ 专栏持续更新中|建议收藏 + 订阅

墙裂推荐订阅专栏 👉 《Python爬虫实战》,本专栏秉承着以“入门 → 进阶 → 工程化 → 项目落地”的路线持续更新,争取让每一期内容都做到:

✅ 讲得清楚(原理)|✅ 跑得起来(代码)|✅ 用得上(场景)|✅ 扛得住(工程化)

📣 想系统提升的小伙伴:强烈建议先订阅专栏 《Python爬虫实战》,再按目录大纲顺序学习,效率十倍上升~

✅ 互动征集

想让我把【某站点/某反爬/某验证码/某分布式方案】等写成某期实战?

评论区留言告诉我你的需求,我会优先安排实现(更新)哒~


⭐️ 若喜欢我,就请关注我叭~(更新不迷路)
⭐️ 若对你有用,就请点赞支持一下叭~(给我一点点动力)
⭐️ 若有疑问,就请评论留言告诉我叭~(我会补坑 & 更新迭代)


✅ 免责声明

本文爬虫思路、相关技术和代码仅用于学习参考,对阅读本文后的进行爬虫行为的用户本作者不承担任何法律责任。

使用或者参考本项目即表示您已阅读并同意以下条款:

  • 合法使用: 不得将本项目用于任何违法、违规或侵犯他人权益的行为,包括但不限于网络攻击、诈骗、绕过身份验证、未经授权的数据抓取等。
  • 风险自负: 任何因使用本项目而产生的法律责任、技术风险或经济损失,由使用者自行承担,项目作者不承担任何形式的责任。
  • 禁止滥用: 不得将本项目用于违法牟利、黑产活动或其他不当商业用途。
  • 使用或者参考本项目即视为同意上述条款,即 “谁使用,谁负责” 。如不同意,请立即停止使用并删除本项目。!!!

更多推荐