Python 实战:农业试验站公开名录监控系统,从采集、解析到三表模型入库
㊗️本期内容已收录至专栏《Python爬虫实战》,持续完善知识体系与项目实战,建议先订阅收藏,后续查阅更方便~
㊙️本期爬虫难度指数:⭐⭐⭐☆☆(进阶级)
🉐福利: 一次订阅后,专栏内的所有文章可永久免费看,持续更新中,保底1000+(篇)硬核实战内容。
全文目录:
🌟 开篇语
哈喽,各位小伙伴们你们好呀~我是【喵手】。
运营社区: C站 / 掘金 / 腾讯云 / 阿里云 / 华为云 / 51CTO
欢迎大家常来逛逛,一起学习,一起进步~🌟
我长期专注 Python 爬虫工程化实战,主理专栏👉 《Python爬虫实战》:从采集策略到反爬对抗,从数据清洗到分布式调度,持续输出可复用的方法论与可落地案例。内容主打一个“能跑、能用、能扩展”,让数据价值真正做到——抓得到、洗得净、用得上。
📌 专栏食用指南(建议收藏)
- ✅ 入门基础:环境搭建 / 请求与解析 / 数据落库
- ✅ 进阶提升:登录鉴权 / 动态渲染 / 反爬对抗
- ✅ 工程实战:异步并发 / 分布式调度 / 监控与容错
- ✅ 项目落地:数据治理 / 可视化分析 / 场景化应用
📣 专栏推广时间:如果你想系统学爬虫,而不是碎片化东拼西凑,欢迎订阅专栏👉《Python爬虫实战》👈,一次订阅后,专栏内的所有文章可永久免费阅读,持续更新中。
💕订阅后更新会优先推送,按目录学习更高效💯~
0️⃣ 前言(Preface)
这篇文章要做的事情很明确:用 Python、Requests、BeautifulSoup 和 SQLite,采集农业试验站公开名录中的站点名、试验方向、地区、主管单位、状态等字段,并把它整理成适合长期监控的“机构 / 站点 / 研究方向”数据模型。
读完本文,你可以获得:
- 一套可直接运行的公开名录采集项目骨架,不是只贴几行选择器的玩具脚本。
- 一个适合机构类、站点类、研究方向类信息沉淀的 SQLite 三表模型设计思路。
- 一套从请求、解析、清洗、去重、导出到后续监控的完整工程化做法。
这类项目不追求炫技,重点是稳定、可维护、能复跑。公开名录采集看起来简单,真正落地时麻烦通常不在“请求一个页面”,而在字段不齐、页面结构变化、重复数据、历史状态变化和后续自动化监控。
1️⃣ 摘要(Abstract)
本文将基于 Python 编写一个农业试验站公开名录监控爬虫,使用 requests 完成页面请求,使用 BeautifulSoup + lxml 完成 HTML 解析,最终将站点名、试验方向、地区、主管单位、状态等字段写入 SQLite,并导出 CSV 结果文件。
读完本文,你会掌握:
- 如何把“公开名录列表页 → 详情页 → 结构化字段”拆成稳定的采集流程。
- 如何设计“主管单位表、研究方向表、站点表”的三表模型。
- 如何处理常见的 403、429、空壳 HTML、乱码、字段缺失和重复抓取问题。
为了保证可复现,本文没有绑定某一个具体网站。真实项目中你可能面对的是农业科研机构官网、地方农业技术推广中心官网、试验站公示页面或公开名录平台。本文采用一个本地 demo 站点来模拟真实列表页和详情页,代码运行方式与真实 HTTP 页面一致,因此后续替换真实公开站点时,不需要推翻项目结构。
2️⃣ 背景与需求(Why)
农业试验站、科研基地、长期定位试验点、区域技术服务站等信息,往往分散在不同单位的公开网页里。单页查看没有问题,但如果我们想做持续分析和变化监控,人工维护 Excel 很快就会变得吃力。
比如,一个数据团队可能会关心这些问题:
- 某个地区有哪些农业试验站?
- 不同试验方向的站点分布是否均衡?
- 某些站点的运行状态是否从“运行中”变为“维护中”?
- 某个主管单位名下到底有哪些公开站点?
- 后续是否可以把这些站点接入地图、检索系统或内部知识库?
这就是爬虫和结构化存储的价值:把散落在网页上的半结构化信息,变成可以查询、去重、比对和长期维护的数据资产。
2.1 为什么要爬
这里的“爬”不是为了做高并发抓取,也不是为了绕过限制获取非公开数据。它更接近于信息整理和自动化监控。
具体价值有三个:
第一,降低人工整理成本。
公开名录一旦有几十页、几百条,人工复制字段就很容易漏、错、重复。
第二,提高更新感知能力。
如果每周或每月定时采集一次,就可以判断某些站点是否新增、状态是否变化、主管单位是否调整。
第三,方便数据分析。
当站点、主管单位、研究方向分表以后,可以很自然地做地区统计、方向统计、机构关联分析。
2.2 目标字段
本文的目标字段如下:
| 字段 | 含义 | 示例 |
|---|---|---|
| 站点名 | 农业试验站或试验基地名称 | 华北旱作农业试验站 |
| 试验方向 | 主要研究、试验或示范方向 | 旱作节水与小麦玉米轮作 |
| 地区 | 所在省市或区域 | 河北省石家庄市 |
| 主管单位 | 管理、依托或主管机构 | 某农业科学院作物研究所 |
| 状态 | 当前公开展示的运行状态 | 运行中 / 维护中 / 暂停 |
真实网站中字段名称不一定完全一致。比如“试验方向”可能写成“研究方向”,“主管单位”可能写成“依托单位”,“状态”可能写成“运行状态”。所以解析层不能写死一种情况,需要做字段别名兼容。
2.3 数据模型目标
这篇文章的实战重点是“机构 / 站点 / 研究方向”三表模型。
起步模型如下:
authorities:主管单位表。research_directions:研究方向表。stations:站点表。
站点表通过外键关联主管单位和研究方向。
如果一个站点未来对应多个研究方向,可以扩展为四表模型,即增加 station_direction_relations 关系表。但这篇文章先以“一站点对应一个主方向”的三表模型起步,便于落地。
3️⃣ 合规与注意事项(必写)
爬虫文章必须先把边界说清楚。技术本身没有问题,但采集方式、频率和目标范围要克制。
3.1 robots.txt 基本说明
在接入真实网站前,建议先访问目标站点根目录下的 robots.txt,例如:
https://example.org/robots.txt
robots.txt 是网站给自动化访问程序的爬取建议。它通常会说明哪些路径允许访问、哪些路径不建议或不允许自动化抓取。
需要注意:
robots.txt不是登录授权,也不代表可以采集所有内容。- 即使某个路径没有被禁止,也要控制访问频率。
- 如果网站明确禁止自动化访问某些目录,就不要抓取这些目录。
本文示例只采集公开 HTML 页面,不涉及登录态、验证码、付费墙或非公开接口。
3.2 频率控制
很多公开名录页面本身访问量并不大,没必要用攻击式并发。
建议:
- 单线程起步。
- 每次请求间隔 0.5 到 2 秒。
- 遇到 429、503 等响应时暂停或退避。
- 不对同一个站点进行短时间高频扫描。
- 定时任务按天、周、月运行,而不是按秒运行。
本文代码默认每次请求间隔 0.6 秒,并且在失败时使用指数退避重试。
3.3 不采集敏感信息
本文目标字段只有站点名、试验方向、地区、主管单位、状态,均属于公开目录类字段。
不建议采集:
- 个人身份证、手机号、邮箱等个人敏感信息。
- 登录后才能看到的数据。
- 明确需要付费或授权才能访问的数据。
- 通过绕过验证码、破解接口参数、伪装登录态等方式获取的数据。
我们把项目定位为公开信息整理和监控,不做越界采集。
3.4 不绕过付费或登录限制
如果目标站点需要登录,或者明确有权限控制,本文方案不应该用于绕过限制。
中性的做法是:
- 使用公开页面。
- 使用公开接口。
- 如需使用内部系统,获得授权后再接入。
- 不破解、不撞库、不绕过验证码、不批量模拟异常行为。
爬虫能不能长期运行,不只取决于技术,更取决于访问方式是否稳定、合理、有边界。
4️⃣ 技术选型与整体流程(What / How)
4.1 静态、动态还是 API
常见名录页面大概有三种:
第一种是静态 HTML。
列表页直接返回完整 HTML,详情页也是 HTML。字段存在于表格、列表或段落中。
第二种是动态渲染页面。
浏览器能看到数据,但 requests.get() 拿到的是空壳,数据由 JavaScript 后续加载。
第三种是接口返回 JSON。
页面通过接口拿数据,接口返回结构化 JSON。
本文采用第一种:静态或半结构化 HTML 页面。
原因很简单:很多公开名录、公示页面、单位官网栏目仍然是普通 HTML。对于这种场景,requests + BeautifulSoup + lxml 足够稳定,维护成本也比 Playwright 低。
如果你拿到的页面是动态渲染,可以先在浏览器开发者工具里看 Network 面板,确认是否存在 JSON 接口。如果有接口,优先抓接口;如果没有稳定接口,再考虑 Playwright。
4.2 整体流程
本文项目流程如下:
采集 Fetch
↓
解析 Parse
↓
清洗 Normalize
↓
去重 Deduplicate
↓
存储 Storage
↓
导出 Export
↓
监控 Monitor
更细一点:
列表页请求
↓
提取详情页 URL
↓
详情页请求
↓
抽取:站点名、试验方向、地区、主管单位、状态
↓
字段清洗:去空格、字段别名统一、缺失值兜底
↓
计算 content_hash
↓
SQLite 三表入库
↓
判断新增 / 更新 / 未变化
↓
导出 CSV
4.3 为什么不用一上来就 Scrapy
Scrapy 很强,尤其适合大规模、多站点、复杂调度和中间件管理。但这篇文章的目标是建立一个干净、易读、可复现的项目模板。对于几十页、几百页的公开名录监控,requests + bs4 足够。
后续如果你要做多站点批量采集,再把 Fetcher、Parser、Storage 抽象迁移到 Scrapy 也不难。
4.4 为什么不用 Playwright
Playwright 适合处理强动态页面、前端渲染页面、需要浏览器环境的页面。但它的成本更高:
- 运行更慢。
- 环境更重。
- 容器化部署更麻烦。
- 对简单 HTML 页面属于“杀鸡用牛刀”。
所以本文先选择轻量路线。判断页面确实是动态渲染后,再升级到 Playwright。
5️⃣ 环境准备与依赖安装(可复现)
5.1 Python 版本
推荐使用:
Python 3.10+
我个人习惯用 Python 3.11 或 3.12。本文代码没有使用特别新的语法,Python 3.10 以上都比较稳。
5.2 创建虚拟环境
mkdir agri_station_monitor
cd agri_station_monitor
python -m venv .venv
# macOS / Linux
source .venv/bin/activate
# Windows PowerShell
.venv\Scripts\Activate.ps1
5.3 安装依赖
新建 requirements.txt:
requests==2.32.3
beautifulsoup4==4.12.3
lxml==5.3.0
tenacity==9.0.0
python-dateutil==2.9.0.post0
安装:
pip install -r requirements.txt
这里几个依赖的作用:
| 依赖 | 作用 |
|---|---|
| requests | HTTP 请求 |
| beautifulsoup4 | HTML 解析 |
| lxml | 更快、更稳定的 HTML 解析器 |
| tenacity | 重试和退避 |
| python-dateutil | 时间处理扩展,本文保留为后续扩展 |
5.4 推荐项目结构
agri_station_monitor/
├── main.py
├── requirements.txt
├── agri_monitor/
│ ├── __init__.py
│ ├── config.py
│ ├── demo_site.py
│ ├── fetcher.py
│ ├── models.py
│ ├── parser.py
│ ├── storage.py
│ └── utils.py
└── data/
├── demo_site/
│ ├── index.html
│ ├── detail_001.html
│ ├── detail_002.html
│ ├── detail_003.html
│ └── detail_004.html
└── output/
├── agri_stations.sqlite
└── agri_stations.csv
这个结构有几个好处:
- 请求层、解析层、存储层分离。
- 后续更换目标站点时,只需要改 Parser 或配置。
- 数据输出统一放到
data/output/。 - demo 页面自动生成,不依赖外网。
6️⃣ 核心实现:请求层(Fetcher)
请求层的职责很单纯:给一个 URL,返回页面 HTML。
但真正写的时候,不能只写:
requests.get(url).text
这类写法用于临时验证可以,长期任务会遇到很多问题:
- 没有 User-Agent,容易被拒绝。
- 没有 timeout,网络卡住时程序一直挂着。
- 没有 session,不能复用连接。
- 没有重试,偶发 503 就失败。
- 没有频率控制,容易触发限制。
6.1 Fetcher 设计重点
本文请求层包含以下设计:
headers:设置 UA、Accept、Accept-Language、Referer。timeout:每次请求设置超时时间。session:复用 TCP 连接,也方便后续处理 cookie。retry:遇到 403、429、5xx 等状态时重试。sleep_seconds:请求间隔,避免短时间高频访问。
6.2 完整代码:agri_monitor/fetcher.py
from __future__ import annotations
import time
from typing import Optional
import requests
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential
class FetchError(RuntimeError):
pass
class Fetcher:
def __init__(
self,
user_agent: str,
timeout: int = 10,
referer: Optional[str] = None,
sleep_seconds: float = 0.6,
) -> None:
self.timeout = timeout
self.sleep_seconds = sleep_seconds
self.session = requests.Session()
self.session.headers.update({
"User-Agent": user_agent,
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Connection": "keep-alive",
})
if referer:
self.session.headers.update({"Referer": referer})
@retry(
retry=retry_if_exception_type((requests.RequestException, FetchError)),
wait=wait_exponential(multiplier=1, min=1, max=8),
stop=stop_after_attempt(3),
reraise=True,
)
def get(self, url: str) -> str:
time.sleep(self.sleep_seconds)
response = self.session.get(url, timeout=self.timeout)
if response.status_code in {403, 429, 500, 502, 503, 504}:
raise FetchError(f"temporary or restricted status: {response.status_code}, url={url}")
response.raise_for_status()
# 一些老站点可能没有明确声明 UTF-8。
# apparent_encoding 不完美,但比完全依赖响应头更稳。
response.encoding = response.apparent_encoding or response.encoding
return response.text
6.3 关于 headers
User-Agent 不建议伪装得太夸张。比如把自己伪装成最新版 Chrome,不一定更好。对于合规采集,更建议使用明确、温和的 UA:
AgriStationMonitor/1.0 (+research-contact@example.com)
如果是真实单位内部项目,可以写成:
YourOrgDataCollector/1.0 (+contact@example.org)
这样对方运维看到日志时也容易判断访问来源。
6.4 关于 timeout
timeout=10 是一个相对保守的起步值。公网环境不稳定时可以调到 15 或 20,但不建议完全不设置。
6.5 关于 session / cookie
本文 demo 不需要 cookie。
如果目标站点的公开页面需要设置一次 cookie 才能访问,也可以用 requests.Session() 保存。注意这里说的是公开页面常规 cookie,不包括绕过登录限制。
6.6 关于失败处理
本文用 tenacity 做指数退避:
wait_exponential(multiplier=1, min=1, max=8)
大概意思是失败后不要马上疯狂重试,而是逐步拉长等待时间。很多临时错误过几秒就恢复了,温和重试比暴力重试更适合长期任务。
7️⃣ 核心实现:解析层(Parser)
解析层是这个项目最容易变脏的地方。不同站点的 HTML 写法差异很大:
有的详情页是表格:
<tr>
<th>站点名</th>
<td>华北旱作农业试验站</td>
</tr>
有的是定义列表:
<dt>站点名</dt>
<dd>华北旱作农业试验站</dd>
还有的把字段写在段落里,需要正则或更复杂的规则。
本文先覆盖两类最常见结构:
- 表格结构:
tr > th/td - 定义列表结构:
dt + dd
7.1 列表页如何拿详情链接
列表页通常有多个详情链接:
<a class="station-link" href="detail_001.html">查看</a>
解析时要注意:
- 相对链接转绝对链接。
- 去重。
- 过滤空链接。
- 不要把无关导航链接当成详情链接。
本文选择器:
a.station-link, .station-list a, table a
真实项目中建议优先使用更稳定的选择器,比如固定栏目容器下的链接:
#content .station-list a
或者根据 URL 特征过滤:
if "/station/detail/" in href:
...
7.2 详情页如何抽字段
详情页解析要做字段别名:
FIELD_ALIASES = {
"站点名": "station_name",
"站点名称": "station_name",
"名称": "station_name",
"试验方向": "direction_name",
"研究方向": "direction_name",
"方向": "direction_name",
"地区": "region",
"所在地区": "region",
"区域": "region",
"主管单位": "authority_name",
"管理单位": "authority_name",
"依托单位": "authority_name",
"状态": "status",
"运行状态": "status",
}
这一步很关键。真实名录页面里字段名并不总是统一的。如果不做别名兼容,换一个栏目就可能解析不到。
7.3 缺失字段怎么办
缺失字段不要让程序直接崩掉。
本文策略:
- 字段缺失时填
"未知"。 - 站点名缺失时,用页面标题或
h1兜底。 - 解析失败的详情页记录日志,不影响其他页面继续采集。
为什么不用空字符串?
因为空字符串在后续分析里很容易被忽略,而 "未知" 更适合做质量检查。比如你可以直接统计:
SELECT COUNT(*) FROM stations WHERE region = '未知';
7.4 完整代码:agri_monitor/parser.py
from __future__ import annotations
from bs4 import BeautifulSoup
from .models import StationRecord
from .utils import absolute_url, build_hash, clean_text
FIELD_ALIASES = {
"站点名": "station_name",
"站点名称": "station_name",
"名称": "station_name",
"试验方向": "direction_name",
"研究方向": "direction_name",
"方向": "direction_name",
"地区": "region",
"所在地区": "region",
"区域": "region",
"主管单位": "authority_name",
"管理单位": "authority_name",
"依托单位": "authority_name",
"状态": "status",
"运行状态": "status",
}
class AgriStationParser:
def parse_list(self, html: str, list_url: str) -> list[str]:
soup = BeautifulSoup(html, "lxml")
links: list[str] = []
for a_tag in soup.select("a.station-link, .station-list a, table a"):
href = a_tag.get("href")
if not href:
continue
links.append(absolute_url(list_url, href))
# dict.fromkeys 可以在保留顺序的同时去重。
return list(dict.fromkeys(links))
def parse_detail(self, html: str, url: str, crawled_at: str) -> StationRecord:
soup = BeautifulSoup(html, "lxml")
data = {
"station_name": "未知",
"direction_name": "未知",
"region": "未知",
"authority_name": "未知",
"status": "未知",
}
# 兼容“字段-值”表格结构:<tr><th>站点名</th><td>...</td></tr>
for row in soup.select("tr"):
cells = row.find_all(["th", "td"])
if len(cells) < 2:
continue
key = clean_text(cells[0].get_text(" "))
value = clean_text(cells[1].get_text(" "))
normalized_key = FIELD_ALIASES.get(key)
if normalized_key:
data[normalized_key] = value
# 兼容 definition list:<dt>站点名</dt><dd>...</dd>
for dt in soup.select("dt"):
key = clean_text(dt.get_text(" "))
dd = dt.find_next_sibling("dd")
normalized_key = FIELD_ALIASES.get(key)
if normalized_key and dd:
data[normalized_key] = clean_text(dd.get_text(" "))
# 页面标题兜底,避免详情页没有“站点名”时完全丢失。
if data["station_name"] == "未知":
h1 = soup.select_one("h1, .title, .station-title")
if h1:
data["station_name"] = clean_text(h1.get_text(" "))
content_hash = build_hash(
data["station_name"],
data["direction_name"],
data["region"],
data["authority_name"],
data["status"],
)
return StationRecord(
source_url=url,
station_name=data["station_name"],
direction_name=data["direction_name"],
region=data["region"],
authority_name=data["authority_name"],
status=data["status"],
content_hash=content_hash,
crawled_at=crawled_at,
)
8️⃣ 数据存储与导出(Storage)
本文使用 SQLite 起步。
原因很实际:
- 不需要安装 MySQL。
- 单机任务足够用。
- 方便导出 CSV。
- 适合小型监控任务。
- 后续迁移 MySQL 或 PostgreSQL 成本不高。
8.1 字段映射表
| 中文字段 | 程序字段 | 数据类型 | 示例值 |
|---|---|---|---|
| 站点名 | station_name | TEXT | 华北旱作农业试验站 |
| 试验方向 | direction_name | TEXT | 旱作节水与小麦玉米轮作 |
| 地区 | region | TEXT | 河北省石家庄市 |
| 主管单位 | authority_name | TEXT | 某农业科学院作物研究所 |
| 状态 | status | TEXT | 运行中 |
| 来源链接 | source_url | TEXT | http://127.0.0.1:8000/detail_001.html |
| 内容哈希 | content_hash | TEXT | sha256 字符串 |
| 首次发现时间 | first_seen_at | TEXT | 2026-06-10T14:00:00+08:00 |
| 最近采集时间 | last_seen_at | TEXT | 2026-06-10T14:00:00+08:00 |
8.2 三表模型
核心表:
authorities
research_directions
stations
关系:
authorities.id <── stations.authority_id
research_directions.id <── stations.direction_id
也就是:
一个主管单位 -> 多个站点
一个研究方向 -> 多个站点
一个站点 -> 一个主管单位、一个主研究方向
真实业务里,如果一个站点有多个方向,可以这样扩展:
stations
research_directions
station_direction_relations
不过起步不建议过度设计。先保证字段稳定入库,再看数据复杂度决定是否拆关系表。
8.3 去重策略
本文采用两层去重。
第一层:URL 唯一。
source_url TEXT NOT NULL UNIQUE
同一个详情页不会重复插入。
第二层:内容 hash。
content_hash = sha256(站点名 + 试验方向 + 地区 + 主管单位 + 状态)
如果同一个 URL 的字段内容发生变化,就认为这条站点信息更新了。
这样可以判断:
- 新增:URL 不存在。
- 未变化:URL 存在,hash 一样。
- 更新:URL 存在,hash 不一样。
8.4 完整代码:agri_monitor/storage.py
from __future__ import annotations
import csv
import sqlite3
from pathlib import Path
from .models import StationRecord
class SQLiteStorage:
def __init__(self, sqlite_path: Path) -> None:
self.sqlite_path = sqlite_path
self.sqlite_path.parent.mkdir(parents=True, exist_ok=True)
self.conn = sqlite3.connect(self.sqlite_path)
self.conn.row_factory = sqlite3.Row
def init_schema(self) -> None:
self.conn.executescript(
"""
PRAGMA foreign_keys = ON;
CREATE TABLE IF NOT EXISTS authorities (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
created_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS research_directions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
created_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS stations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
station_name TEXT NOT NULL,
region TEXT NOT NULL,
status TEXT NOT NULL,
source_url TEXT NOT NULL UNIQUE,
authority_id INTEGER NOT NULL,
direction_id INTEGER NOT NULL,
content_hash TEXT NOT NULL,
first_seen_at TEXT NOT NULL,
last_seen_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY(authority_id) REFERENCES authorities(id),
FOREIGN KEY(direction_id) REFERENCES research_directions(id)
);
CREATE TABLE IF NOT EXISTS change_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
station_id INTEGER NOT NULL,
old_hash TEXT NOT NULL,
new_hash TEXT NOT NULL,
changed_at TEXT NOT NULL,
FOREIGN KEY(station_id) REFERENCES stations(id)
);
"""
)
self.conn.commit()
def _get_or_create_id(self, table: str, name: str, now: str) -> int:
row = self.conn.execute(
f"SELECT id FROM {table} WHERE name = ?",
(name,),
).fetchone()
if row:
return int(row["id"])
cursor = self.conn.execute(
f"INSERT INTO {table}(name, created_at) VALUES (?, ?)",
(name, now),
)
return int(cursor.lastrowid)
def upsert_station(self, record: StationRecord) -> str:
authority_id = self._get_or_create_id(
"authorities",
record.authority_name,
record.crawled_at,
)
direction_id = self._get_or_create_id(
"research_directions",
record.direction_name,
record.crawled_at,
)
row = self.conn.execute(
"SELECT id, content_hash FROM stations WHERE source_url = ?",
(record.source_url,),
).fetchone()
if not row:
self.conn.execute(
"""
INSERT INTO stations(
station_name,
region,
status,
source_url,
authority_id,
direction_id,
content_hash,
first_seen_at,
last_seen_at,
updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
record.station_name,
record.region,
record.status,
record.source_url,
authority_id,
direction_id,
record.content_hash,
record.crawled_at,
record.crawled_at,
record.crawled_at,
),
)
self.conn.commit()
return "created"
station_id = int(row["id"])
old_hash = str(row["content_hash"])
if old_hash == record.content_hash:
self.conn.execute(
"UPDATE stations SET last_seen_at = ? WHERE id = ?",
(record.crawled_at, station_id),
)
self.conn.commit()
return "unchanged"
self.conn.execute(
"""
UPDATE stations
SET station_name = ?,
region = ?,
status = ?,
authority_id = ?,
direction_id = ?,
content_hash = ?,
last_seen_at = ?,
updated_at = ?
WHERE id = ?
""",
(
record.station_name,
record.region,
record.status,
authority_id,
direction_id,
record.content_hash,
record.crawled_at,
record.crawled_at,
station_id,
),
)
self.conn.execute(
"""
INSERT INTO change_events(
station_id,
old_hash,
new_hash,
changed_at
)
VALUES (?, ?, ?, ?)
""",
(
station_id,
old_hash,
record.content_hash,
record.crawled_at,
),
)
self.conn.commit()
return "updated"
def export_csv(self, csv_path: Path) -> None:
csv_path.parent.mkdir(parents=True, exist_ok=True)
rows = self.conn.execute(
"""
SELECT
s.station_name AS 站点名,
d.name AS 试验方向,
s.region AS 地区,
a.name AS 主管单位,
s.status AS 状态,
s.source_url AS 来源链接,
s.last_seen_at AS 最近采集时间
FROM stations s
JOIN authorities a ON s.authority_id = a.id
JOIN research_directions d ON s.direction_id = d.id
ORDER BY s.id ASC
"""
).fetchall()
with csv_path.open("w", newline="", encoding="utf-8-sig") as f:
writer = csv.writer(f)
writer.writerow([
"站点名",
"试验方向",
"地区",
"主管单位",
"状态",
"来源链接",
"最近采集时间",
])
for row in rows:
writer.writerow([row[key] for key in row.keys()])
def preview(self, limit: int = 5) -> list[dict[str, str]]:
rows = self.conn.execute(
"""
SELECT
s.station_name,
d.name AS direction_name,
s.region,
a.name AS authority_name,
s.status
FROM stations s
JOIN authorities a ON s.authority_id = a.id
JOIN research_directions d ON s.direction_id = d.id
ORDER BY s.id ASC
LIMIT ?
""",
(limit,),
).fetchall()
return [dict(row) for row in rows]
def close(self) -> None:
self.conn.close()
8.5 为什么 CSV 使用 utf-8-sig
很多人会遇到一个小问题:CSV 用 UTF-8 写出后,Excel 打开中文乱码。
解决办法之一是使用:
encoding="utf-8-sig"
这样 Excel 识别起来更友好。
9️⃣ 运行方式与结果展示(必写)
为了让项目能在没有外网、没有真实目标站点的情况下跑通,本文代码内置一个 demo 站点生成器。运行时会自动创建本地 HTML 页面,并启动一个本地 HTTP 服务。
9.1 配置文件:agri_monitor/config.py
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
PROJECT_ROOT = Path(__file__).resolve().parents[1]
DATA_DIR = PROJECT_ROOT / "data"
DEMO_SITE_DIR = DATA_DIR / "demo_site"
OUTPUT_DIR = DATA_DIR / "output"
@dataclass(frozen=True)
class CrawlConfig:
base_url: str
list_path: str = "/index.html"
timeout: int = 10
sleep_seconds: float = 0.6
max_retries: int = 3
user_agent: str = "AgriStationMonitor/1.0 (+research-contact@example.com)"
referer: str | None = None
sqlite_path: Path = OUTPUT_DIR / "agri_stations.sqlite"
csv_path: Path = OUTPUT_DIR / "agri_stations.csv"
@property
def list_url(self) -> str:
return self.base_url.rstrip("/") + self.list_path
9.2 数据模型:agri_monitor/models.py
from __future__ import annotations
from dataclasses import dataclass
@dataclass
class StationRecord:
source_url: str
station_name: str
direction_name: str
region: str
authority_name: str
status: str
content_hash: str
crawled_at: str
@dataclass
class CrawlResult:
created: int = 0
updated: int = 0
unchanged: int = 0
failed: int = 0
9.3 工具函数:agri_monitor/utils.py
from __future__ import annotations
import hashlib
import re
from urllib.parse import urljoin
def clean_text(value: str | None) -> str:
if not value:
return "未知"
text = re.sub(r"\s+", " ", value).strip()
return text if text else "未知"
def absolute_url(base_url: str, href: str) -> str:
return urljoin(base_url, href)
def build_hash(*parts: str) -> str:
raw = "||".join(clean_text(p) for p in parts)
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
9.4 本地 demo 站点:agri_monitor/demo_site.py
这部分代码用于生成模拟页面。它的作用不是造假数据,而是让项目在任何环境下都能跑通。
真实接站点时,可以不使用 --demo。
from __future__ import annotations
from pathlib import Path
INDEX_HTML = """<!doctype html>
<html lang="zh-CN">
<head>
<meta charset="utf-8">
<title>农业试验站公开名录</title>
</head>
<body>
<h1>农业试验站公开名录</h1>
<table class="station-list">
<tr>
<th>站点名</th>
<th>地区</th>
<th>详情</th>
</tr>
<tr>
<td>华北旱作农业试验站</td>
<td>河北省</td>
<td><a class="station-link" href="detail_001.html">查看</a></td>
</tr>
<tr>
<td>江南水稻生态试验站</td>
<td>江苏省</td>
<td><a class="station-link" href="detail_002.html">查看</a></td>
</tr>
<tr>
<td>西南山地土壤改良试验站</td>
<td>四川省</td>
<td><a class="station-link" href="detail_003.html">查看</a></td>
</tr>
<tr>
<td>东北黑土地保护试验站</td>
<td>黑龙江省</td>
<td><a class="station-link" href="detail_004.html">查看</a></td>
</tr>
</table>
</body>
</html>
"""
DETAILS = {
"detail_001.html": {
"站点名": "华北旱作农业试验站",
"试验方向": "旱作节水与小麦玉米轮作",
"地区": "河北省石家庄市",
"主管单位": "某农业科学院作物研究所",
"状态": "运行中",
},
"detail_002.html": {
"站点名": "江南水稻生态试验站",
"试验方向": "水稻品种比较与稻田生态",
"地区": "江苏省扬州市",
"主管单位": "某省农业技术推广中心",
"状态": "运行中",
},
"detail_003.html": {
"站点名": "西南山地土壤改良试验站",
"试验方向": "坡耕地土壤改良与养分监测",
"地区": "四川省绵阳市",
"主管单位": "某农业大学资源环境学院",
"状态": "维护中",
},
"detail_004.html": {
"站点名": "东北黑土地保护试验站",
"试验方向": "黑土地保护性耕作与有机质提升",
"地区": "黑龙江省哈尔滨市",
"主管单位": "某黑土地保护研究中心",
"状态": "运行中",
},
}
def build_detail_html(fields: dict[str, str]) -> str:
rows = "\n".join(
f"<tr><th>{key}</th><td>{value}</td></tr>"
for key, value in fields.items()
)
return f"""<!doctype html>
<html lang="zh-CN">
<head>
<meta charset="utf-8">
<title>{fields.get('站点名', '详情')}</title>
</head>
<body>
<h1 class="station-title">{fields.get('站点名', '详情')}</h1>
<table class="detail-table">
{rows}
</table>
</body>
</html>
"""
def write_demo_site(root: Path) -> None:
root.mkdir(parents=True, exist_ok=True)
(root / "index.html").write_text(INDEX_HTML, encoding="utf-8")
for filename, fields in DETAILS.items():
(root / filename).write_text(
build_detail_html(fields),
encoding="utf-8",
)
9.5 入口文件:main.py
from __future__ import annotations
import argparse
import contextlib
import functools
import http.server
import logging
import socket
import threading
from datetime import datetime, timezone
from pathlib import Path
from agri_monitor.config import CrawlConfig, DEMO_SITE_DIR, OUTPUT_DIR
from agri_monitor.demo_site import write_demo_site
from agri_monitor.fetcher import Fetcher
from agri_monitor.models import CrawlResult
from agri_monitor.parser import AgriStationParser
from agri_monitor.storage import SQLiteStorage
def now_iso() -> str:
return datetime.now(timezone.utc).astimezone().isoformat(timespec="seconds")
def find_free_port() -> int:
with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
sock.bind(("127.0.0.1", 0))
return int(sock.getsockname()[1])
def start_demo_server(directory: Path) -> tuple[http.server.ThreadingHTTPServer, str]:
port = find_free_port()
handler = functools.partial(
http.server.SimpleHTTPRequestHandler,
directory=str(directory),
)
server = http.server.ThreadingHTTPServer(("127.0.0.1", port), handler)
thread = threading.Thread(
target=server.serve_forever,
daemon=True,
)
thread.start()
return server, f"http://127.0.0.1:{port}"
def run_crawl(config: CrawlConfig) -> CrawlResult:
fetcher = Fetcher(
user_agent=config.user_agent,
timeout=config.timeout,
referer=config.referer or config.base_url,
sleep_seconds=config.sleep_seconds,
)
parser = AgriStationParser()
storage = SQLiteStorage(config.sqlite_path)
storage.init_schema()
result = CrawlResult()
try:
logging.info("fetch list: %s", config.list_url)
list_html = fetcher.get(config.list_url)
detail_urls = parser.parse_list(list_html, config.list_url)
logging.info("found detail urls: %d", len(detail_urls))
for url in detail_urls:
try:
html = fetcher.get(url)
record = parser.parse_detail(html, url, now_iso())
action = storage.upsert_station(record)
if action == "created":
result.created += 1
elif action == "updated":
result.updated += 1
else:
result.unchanged += 1
logging.info("%-9s %s", action, record.station_name)
except Exception as exc:
result.failed += 1
logging.exception("failed detail url=%s error=%s", url, exc)
storage.export_csv(config.csv_path)
logging.info("export csv: %s", config.csv_path)
for row in storage.preview(limit=5):
logging.info("preview: %s", row)
return result
finally:
storage.close()
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="农业试验站公开名录监控爬虫")
parser.add_argument(
"--base-url",
default="",
help="目标站点根地址,例如 https://example.org/agri",
)
parser.add_argument(
"--list-path",
default="/index.html",
help="列表页路径",
)
parser.add_argument(
"--demo",
action="store_true",
help="生成并启动本地样例站点,便于无外网环境跑通",
)
parser.add_argument(
"--db",
default=str(OUTPUT_DIR / "agri_stations.sqlite"),
help="SQLite 输出路径",
)
parser.add_argument(
"--csv",
default=str(OUTPUT_DIR / "agri_stations.csv"),
help="CSV 输出路径",
)
parser.add_argument(
"--sleep",
type=float,
default=0.6,
help="请求间隔秒数",
)
return parser.parse_args()
def main() -> None:
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
)
args = parse_args()
demo_server = None
if args.demo:
write_demo_site(DEMO_SITE_DIR)
demo_server, base_url = start_demo_server(DEMO_SITE_DIR)
list_path = "/index.html"
logging.info("demo server started: %s", base_url)
else:
if not args.base_url:
raise SystemExit("请提供 --base-url,或使用 --demo 运行本地样例。")
base_url = args.base_url
list_path = args.list_path
try:
config = CrawlConfig(
base_url=base_url,
list_path=list_path,
sleep_seconds=args.sleep,
sqlite_path=Path(args.db),
csv_path=Path(args.csv),
)
result = run_crawl(config)
logging.info(
"done: created=%d updated=%d unchanged=%d failed=%d",
result.created,
result.updated,
result.unchanged,
result.failed,
)
finally:
if demo_server:
demo_server.shutdown()
if __name__ == "__main__":
main()
9.6 如何启动
先安装依赖:
pip install -r requirements.txt
运行本地 demo:
python main.py --demo
输出示例:
INFO demo server started: http://127.0.0.1:57693
INFO fetch list: http://127.0.0.1:57693/index.html
INFO found detail urls: 4
INFO created 华北旱作农业试验站
INFO created 江南水稻生态试验站
INFO created 西南山地土壤改良试验站
INFO created 东北黑土地保护试验站
INFO export csv: data/output/agri_stations.csv
INFO done: created=4 updated=0 unchanged=0 failed=0
第二次运行:
python main.py --demo
如果页面内容没变,会看到:
INFO unchanged 华北旱作农业试验站
INFO unchanged 江南水稻生态试验站
INFO unchanged 西南山地土壤改良试验站
INFO unchanged 东北黑土地保护试验站
INFO done: created=0 updated=0 unchanged=4 failed=0
这就是监控的基础能力:能够区分新增、更新和未变化。
9.7 输出在哪里
默认输出:
data/output/agri_stations.sqlite
data/output/agri_stations.csv
CSV 内容示例:
| 站点名 | 试验方向 | 地区 | 主管单位 | 状态 |
|---|---|---|---|---|
| 华北旱作农业试验站 | 旱作节水与小麦玉米轮作 | 河北省石家庄市 | 某农业科学院作物研究所 | 运行中 |
| 江南水稻生态试验站 | 水稻品种比较与稻田生态 | 江苏省扬州市 | 某省农业技术推广中心 | 运行中 |
| 西南山地土壤改良试验站 | 坡耕地土壤改良与养分监测 | 四川省绵阳市 | 某农业大学资源环境学院 | 维护中 |
| 东北黑土地保护试验站 | 黑土地保护性耕作与有机质提升 | 黑龙江省哈尔滨市 | 某黑土地保护研究中心 | 运行中 |
9.8 接真实公开站点
假设真实站点根地址为:
https://example.org/agri-stations
列表页路径为:
/list.html
运行:
python main.py \
--base-url "https://example.org/agri-stations" \
--list-path "/list.html" \
--sleep 1.5
如果真实页面结构不同,主要改两个地方:
第一,列表页详情链接选择器:
for a_tag in soup.select("a.station-link, .station-list a, table a"):
第二,详情页字段解析规则:
for row in soup.select("tr"):
如果详情页是 JSON 接口,就不需要 BeautifulSoup,直接 response.json() 后映射字段即可。
🔟 常见问题与排错(强烈建议写)
10.1 403 怎么办
403 表示服务器拒绝访问。原因可能很多:
- 没有设置 User-Agent。
- Referer 不符合预期。
- 目标站点不希望自动化访问。
- 访问频率异常。
- 路径本身不对外开放。
处理建议:
- 先用浏览器确认页面是否公开可访问。
- 检查 URL 是否正确。
- 设置合理 UA,不要使用空 UA。
- 降低访问频率。
- 如果页面明确限制自动访问,就不要继续采集。
不建议把 403 简单理解为“换代理就行”。在公开名录监控项目里,代理不是优先解法,合规和稳定才是优先项。
10.2 429 怎么办
429 通常表示请求过快。
处理建议:
- 增加
--sleep,比如调到 2 秒、5 秒。 - 减少并发。
- 遇到 429 时指数退避。
- 定时任务降低频率。
- 缓存已抓详情页,避免重复请求。
本文 Fetcher 已经把 429 放进重试状态码:
if response.status_code in {403, 429, 500, 502, 503, 504}:
raise FetchError(...)
但真实任务里,如果连续遇到 429,更好的做法是停止本轮任务,而不是继续硬抓。
10.3 HTML 抓到空壳怎么办
如果 requests.get() 拿到的 HTML 里没有数据,而浏览器里能看到数据,大概率是动态渲染。
排查方法:
- 打开浏览器开发者工具。
- 进入 Network 面板。
- 刷新页面。
- 搜索字段关键字,比如某个站点名。
- 看数据是 HTML 里直接返回,还是由 XHR / Fetch 接口返回。
如果找到了 JSON 接口,优先抓 JSON 接口。
如果没有稳定接口,可以考虑 Playwright:
from playwright.sync_api import sync_playwright
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
page = browser.new_page()
page.goto(url, wait_until="networkidle")
html = page.content()
browser.close()
但不要一开始就用 Playwright。能用静态请求解决的页面,静态请求更简单、更省资源。
10.4 解析报错怎么办
解析报错通常有几类:
第一,选择器失效。
网站改版后,原来的 .station-link 不存在了。
第二,字段结构变化。
原来是表格,现在变成段落。
第三,字段名变化。
原来叫“主管单位”,现在叫“依托机构”。
第四,页面混入无关链接。
列表页里把导航、附件、分页都当成了详情页。
解决办法:
- 给解析层加日志。
- 保存失败页面到本地排查。
- 增加字段别名。
- 增加 URL 过滤。
- 单独写解析单元测试。
例如可以临时保存失败页面:
from pathlib import Path
Path("debug_failed.html").write_text(html, encoding="utf-8")
然后用浏览器打开,重新确认结构。
10.5 编码乱码如何处理
老站点常见编码问题:
- 响应头写错。
- 页面 meta 声明不一致。
- 实际是 GBK,但程序按 UTF-8 解码。
本文使用:
response.encoding = response.apparent_encoding or response.encoding
如果你明确知道目标站点是 GBK,可以直接写:
response.encoding = "gbk"
CSV 导出建议:
encoding="utf-8-sig"
这样用 Excel 打开中文更稳。
10.6 SQLite 被锁怎么办
如果多进程同时写 SQLite,可能遇到 database is locked。
起步项目建议:
- 单进程写入。
- 不要多个任务同时跑同一个数据库。
- 大规模任务迁移到 MySQL 或 PostgreSQL。
- 写入时使用队列,把采集和写库分开。
10.7 数据重复怎么办
重复一般来自:
- 列表页重复链接。
- 同一详情页多个 URL 参数。
- HTTP 和 HTTPS 混用。
- URL 后面带 tracking 参数。
本文先用 source_url UNIQUE 去重。更严谨可以增加 URL 规范化:
from urllib.parse import urlsplit, urlunsplit, parse_qsl, urlencode
def normalize_url(url: str) -> str:
parts = urlsplit(url)
query_pairs = [
(k, v)
for k, v in parse_qsl(parts.query)
if not k.lower().startswith("utm_")
]
clean_query = urlencode(query_pairs)
return urlunsplit((
parts.scheme.lower(),
parts.netloc.lower(),
parts.path,
clean_query,
"",
))
也可以增加业务唯一键,比如:
站点名 + 地区 + 主管单位
但业务唯一键要谨慎,因为站点可能重名,主管单位名称也可能调整。
10.8 字段全是“未知”怎么办
字段全是“未知”,说明详情页解析没命中。
优先检查:
- 详情页 HTML 是否真的拿到了。
- 是否是动态渲染页面。
- 字段是否不是表格或 dt/dd 结构。
- 字段名是否不在
FIELD_ALIASES里。 - 是否页面有 iframe。
临时调试:
print(soup.get_text("\n")[:2000])
或者保存 HTML:
Path("debug.html").write_text(html, encoding="utf-8")
解析层不要靠猜,最好直接看页面结构。
1️⃣1️⃣ 进阶优化(可选但加分)
11.1 并发优化
本文是单线程,适合起步和温和监控。
如果真实数据量较大,可以考虑线程池:
from concurrent.futures import ThreadPoolExecutor, as_completed
def fetch_one(url: str):
html = fetcher.get(url)
return parser.parse_detail(html, url, now_iso())
with ThreadPoolExecutor(max_workers=3) as executor:
futures = [executor.submit(fetch_one, url) for url in detail_urls]
for future in as_completed(futures):
record = future.result()
storage.upsert_station(record)
但我建议把 max_workers 控制在 2 到 5,不要为了快把公开站点打满。
如果你需要更强的并发控制,可以用 asyncio + httpx。如果要做多站点采集,Scrapy 更合适。
11.2 断点续跑
断点续跑的核心是:已经成功抓过的 URL,不要重复抓。
可以加一张表:
CREATE TABLE IF NOT EXISTS crawl_tasks (
url TEXT PRIMARY KEY,
status TEXT NOT NULL,
retry_count INTEGER NOT NULL DEFAULT 0,
updated_at TEXT NOT NULL
);
状态可以设计为:
pending
success
failed
skipped
执行时先把列表页详情链接写入任务表,再逐条消费。中断后下次从 pending 或 failed 继续。
小项目也可以简单一点:
seen_urls = set(
row["source_url"]
for row in conn.execute("SELECT source_url FROM stations")
)
不过这种方式只能跳过已入库的成功数据,不能很好记录失败任务。
11.3 日志与监控
长期任务至少要记录:
- 本轮发现多少详情页。
- 新增多少。
- 更新多少。
- 未变化多少。
- 失败多少。
- 失败 URL 是什么。
- 平均耗时是多少。
本文已经有基础日志:
logging.info(
"done: created=%d updated=%d unchanged=%d failed=%d",
result.created,
result.updated,
result.unchanged,
result.failed,
)
如果要更正式,可以增加 crawl_runs 表:
CREATE TABLE IF NOT EXISTS crawl_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
started_at TEXT NOT NULL,
finished_at TEXT,
total_urls INTEGER NOT NULL DEFAULT 0,
created_count INTEGER NOT NULL DEFAULT 0,
updated_count INTEGER NOT NULL DEFAULT 0,
unchanged_count INTEGER NOT NULL DEFAULT 0,
failed_count INTEGER NOT NULL DEFAULT 0
);
这样每次采集都能留下运行记录,后续做仪表盘也方便。
11.4 定时任务
Linux 下可以用 cron。
比如每天凌晨 2 点运行:
0 2 * * * cd /path/to/agri_station_monitor && /path/to/agri_station_monitor/.venv/bin/python main.py --base-url "https://example.org/agri-stations" --list-path "/list.html" --sleep 2 >> logs/crawl.log 2>&1
如果任务较多,建议用 Airflow、Prefect 或系统内的任务调度平台。
简单项目不要一开始就上复杂平台。先用 cron 跑稳,再决定是否升级。
11.5 数据质量检查
可以写几条 SQL 做质量检查:
检查缺失地区:
SELECT COUNT(*) AS missing_region
FROM stations
WHERE region = '未知';
检查状态分布:
SELECT status, COUNT(*) AS total
FROM stations
GROUP BY status
ORDER BY total DESC;
检查主管单位站点数:
SELECT a.name, COUNT(*) AS total
FROM stations s
JOIN authorities a ON s.authority_id = a.id
GROUP BY a.name
ORDER BY total DESC;
检查研究方向分布:
SELECT d.name, COUNT(*) AS total
FROM stations s
JOIN research_directions d ON s.direction_id = d.id
GROUP BY d.name
ORDER BY total DESC;
这些 SQL 对后续分析很有用。爬虫只是第一步,数据进入结构化表以后,价值才慢慢出来。
11.6 变化通知
如果 updated > 0 或 created > 0,可以发送通知。
简单方式是输出日志。进一步可以接入:
- 邮件。
- 企业微信机器人。
- 钉钉机器人。
- 飞书机器人。
- 内部告警系统。
通知内容建议简洁:
农业试验站名录监控完成:
新增 3 条,更新 1 条,失败 0 条。
输出文件:data/output/agri_stations.csv
不要把所有详情都塞进通知里。通知只负责提醒,详情仍然回到数据库或 CSV 查看。
11.7 从 SQLite 迁移到 MySQL
当数据量变大、多任务同时写入、多人查询时,可以迁移到 MySQL 或 PostgreSQL。
迁移时主要变化在 Storage 层。Fetcher 和 Parser 不需要改。
这就是分层设计的好处:
Fetcher 只管请求
Parser 只管解析
Storage 只管存储
换数据库时,不需要重写整个爬虫。
1️⃣2️⃣ 总结与延伸阅读
本文完成了一个农业试验站公开名录监控项目的完整落地过程。
我们不是简单写了一个 requests.get(),而是把一个可维护的小型采集系统拆成了几层:
- 请求层:负责 headers、timeout、session、重试和频控。
- 解析层:负责列表页链接提取、详情页字段抽取、字段别名兼容。
- 清洗层:负责空值兜底、文本整理、URL 规范处理。
- 存储层:负责 SQLite 三表模型、去重、更新检测、CSV 导出。
- 运行层:负责命令行参数、本地 demo 站点、日志输出。
这套设计适合很多公开名录类任务,不只适合农业试验站。比如:
- 科研平台公开名录。
- 实验室公开名录。
- 技术服务站公开名录。
- 示范基地公开名录。
- 企业公开备案目录。
- 行业协会会员单位目录。
后续可以继续扩展:
- 使用 Scrapy 管理多站点采集。
- 使用 Playwright 处理强动态页面。
- 增加任务表,实现断点续跑。
- 增加历史快照表,追踪字段变化。
- 增加地图坐标字段,做空间分布分析。
- 增加数据质量规则,自动发现异常字段。
- 增加定时任务和通知机制,实现真正的监控闭环。
我个人做这类名录项目时,最看重的不是首轮抓取速度,而是三个月后还能不能稳定复跑。页面会改、字段会缺、网络会抖、数据会重复,这些都很正常。爬虫写得稳不稳,往往就体现在这些不起眼的细节里。
最后再强调一次:公开名录采集要控制频率、尊重站点规则、只处理公开字段,不绕过登录或付费限制。技术分享的重点是自动化整理和数据治理,而不是对目标站点造成负担。
附录:完整项目代码汇总
下面把完整项目文件再集中放一遍,方便复制。
requirements.txt
requests==2.32.3
beautifulsoup4==4.12.3
lxml==5.3.0
tenacity==9.0.0
python-dateutil==2.9.0.post0
agri_monitor/init.py
# agri_monitor package
agri_monitor/config.py
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
PROJECT_ROOT = Path(__file__).resolve().parents[1]
DATA_DIR = PROJECT_ROOT / "data"
DEMO_SITE_DIR = DATA_DIR / "demo_site"
OUTPUT_DIR = DATA_DIR / "output"
@dataclass(frozen=True)
class CrawlConfig:
base_url: str
list_path: str = "/index.html"
timeout: int = 10
sleep_seconds: float = 0.6
max_retries: int = 3
user_agent: str = "AgriStationMonitor/1.0 (+research-contact@example.com)"
referer: str | None = None
sqlite_path: Path = OUTPUT_DIR / "agri_stations.sqlite"
csv_path: Path = OUTPUT_DIR / "agri_stations.csv"
@property
def list_url(self) -> str:
return self.base_url.rstrip("/") + self.list_path
agri_monitor/models.py
from __future__ import annotations
from dataclasses import dataclass
@dataclass
class StationRecord:
source_url: str
station_name: str
direction_name: str
region: str
authority_name: str
status: str
content_hash: str
crawled_at: str
@dataclass
class CrawlResult:
created: int = 0
updated: int = 0
unchanged: int = 0
failed: int = 0
agri_monitor/utils.py
from __future__ import annotations
import hashlib
import re
from urllib.parse import urljoin
def clean_text(value: str | None) -> str:
if not value:
return "未知"
text = re.sub(r"\s+", " ", value).strip()
return text if text else "未知"
def absolute_url(base_url: str, href: str) -> str:
return urljoin(base_url, href)
def build_hash(*parts: str) -> str:
raw = "||".join(clean_text(p) for p in parts)
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
agri_monitor/fetcher.py
from __future__ import annotations
import time
from typing import Optional
import requests
from tenacity import retry, retry_if_exception_type, stop_after_attempt, wait_exponential
class FetchError(RuntimeError):
pass
class Fetcher:
def __init__(
self,
user_agent: str,
timeout: int = 10,
referer: Optional[str] = None,
sleep_seconds: float = 0.6,
) -> None:
self.timeout = timeout
self.sleep_seconds = sleep_seconds
self.session = requests.Session()
self.session.headers.update({
"User-Agent": user_agent,
"Accept": "text/html,application/xhtml+xml,application/xml;q=0.9,*/*;q=0.8",
"Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
"Connection": "keep-alive",
})
if referer:
self.session.headers.update({"Referer": referer})
@retry(
retry=retry_if_exception_type((requests.RequestException, FetchError)),
wait=wait_exponential(multiplier=1, min=1, max=8),
stop=stop_after_attempt(3),
reraise=True,
)
def get(self, url: str) -> str:
time.sleep(self.sleep_seconds)
response = self.session.get(url, timeout=self.timeout)
if response.status_code in {403, 429, 500, 502, 503, 504}:
raise FetchError(
f"temporary or restricted status: {response.status_code}, url={url}"
)
response.raise_for_status()
response.encoding = response.apparent_encoding or response.encoding
return response.text
agri_monitor/parser.py
from __future__ import annotations
from bs4 import BeautifulSoup
from .models import StationRecord
from .utils import absolute_url, build_hash, clean_text
FIELD_ALIASES = {
"站点名": "station_name",
"站点名称": "station_name",
"名称": "station_name",
"试验方向": "direction_name",
"研究方向": "direction_name",
"方向": "direction_name",
"地区": "region",
"所在地区": "region",
"区域": "region",
"主管单位": "authority_name",
"管理单位": "authority_name",
"依托单位": "authority_name",
"状态": "status",
"运行状态": "status",
}
class AgriStationParser:
def parse_list(self, html: str, list_url: str) -> list[str]:
soup = BeautifulSoup(html, "lxml")
links: list[str] = []
for a_tag in soup.select("a.station-link, .station-list a, table a"):
href = a_tag.get("href")
if not href:
continue
links.append(absolute_url(list_url, href))
return list(dict.fromkeys(links))
def parse_detail(self, html: str, url: str, crawled_at: str) -> StationRecord:
soup = BeautifulSoup(html, "lxml")
data = {
"station_name": "未知",
"direction_name": "未知",
"region": "未知",
"authority_name": "未知",
"status": "未知",
}
for row in soup.select("tr"):
cells = row.find_all(["th", "td"])
if len(cells) < 2:
continue
key = clean_text(cells[0].get_text(" "))
value = clean_text(cells[1].get_text(" "))
normalized_key = FIELD_ALIASES.get(key)
if normalized_key:
data[normalized_key] = value
for dt in soup.select("dt"):
key = clean_text(dt.get_text(" "))
dd = dt.find_next_sibling("dd")
normalized_key = FIELD_ALIASES.get(key)
if normalized_key and dd:
data[normalized_key] = clean_text(dd.get_text(" "))
if data["station_name"] == "未知":
h1 = soup.select_one("h1, .title, .station-title")
if h1:
data["station_name"] = clean_text(h1.get_text(" "))
content_hash = build_hash(
data["station_name"],
data["direction_name"],
data["region"],
data["authority_name"],
data["status"],
)
return StationRecord(
source_url=url,
station_name=data["station_name"],
direction_name=data["direction_name"],
region=data["region"],
authority_name=data["authority_name"],
status=data["status"],
content_hash=content_hash,
crawled_at=crawled_at,
)
agri_monitor/storage.py
from __future__ import annotations
import csv
import sqlite3
from pathlib import Path
from .models import StationRecord
class SQLiteStorage:
def __init__(self, sqlite_path: Path) -> None:
self.sqlite_path = sqlite_path
self.sqlite_path.parent.mkdir(parents=True, exist_ok=True)
self.conn = sqlite3.connect(self.sqlite_path)
self.conn.row_factory = sqlite3.Row
def init_schema(self) -> None:
self.conn.executescript(
"""
PRAGMA foreign_keys = ON;
CREATE TABLE IF NOT EXISTS authorities (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
created_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS research_directions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT NOT NULL UNIQUE,
created_at TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS stations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
station_name TEXT NOT NULL,
region TEXT NOT NULL,
status TEXT NOT NULL,
source_url TEXT NOT NULL UNIQUE,
authority_id INTEGER NOT NULL,
direction_id INTEGER NOT NULL,
content_hash TEXT NOT NULL,
first_seen_at TEXT NOT NULL,
last_seen_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
FOREIGN KEY(authority_id) REFERENCES authorities(id),
FOREIGN KEY(direction_id) REFERENCES research_directions(id)
);
CREATE TABLE IF NOT EXISTS change_events (
id INTEGER PRIMARY KEY AUTOINCREMENT,
station_id INTEGER NOT NULL,
old_hash TEXT NOT NULL,
new_hash TEXT NOT NULL,
changed_at TEXT NOT NULL,
FOREIGN KEY(station_id) REFERENCES stations(id)
);
"""
)
self.conn.commit()
def _get_or_create_id(self, table: str, name: str, now: str) -> int:
row = self.conn.execute(
f"SELECT id FROM {table} WHERE name = ?",
(name,),
).fetchone()
if row:
return int(row["id"])
cursor = self.conn.execute(
f"INSERT INTO {table}(name, created_at) VALUES (?, ?)",
(name, now),
)
return int(cursor.lastrowid)
def upsert_station(self, record: StationRecord) -> str:
authority_id = self._get_or_create_id(
"authorities",
record.authority_name,
record.crawled_at,
)
direction_id = self._get_or_create_id(
"research_directions",
record.direction_name,
record.crawled_at,
)
row = self.conn.execute(
"SELECT id, content_hash FROM stations WHERE source_url = ?",
(record.source_url,),
).fetchone()
if not row:
self.conn.execute(
"""
INSERT INTO stations(
station_name,
region,
status,
source_url,
authority_id,
direction_id,
content_hash,
first_seen_at,
last_seen_at,
updated_at
) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
record.station_name,
record.region,
record.status,
record.source_url,
authority_id,
direction_id,
record.content_hash,
record.crawled_at,
record.crawled_at,
record.crawled_at,
),
)
self.conn.commit()
return "created"
station_id = int(row["id"])
old_hash = str(row["content_hash"])
if old_hash == record.content_hash:
self.conn.execute(
"UPDATE stations SET last_seen_at = ? WHERE id = ?",
(record.crawled_at, station_id),
)
self.conn.commit()
return "unchanged"
self.conn.execute(
"""
UPDATE stations
SET station_name = ?,
region = ?,
status = ?,
authority_id = ?,
direction_id = ?,
content_hash = ?,
last_seen_at = ?,
updated_at = ?
WHERE id = ?
""",
(
record.station_name,
record.region,
record.status,
authority_id,
direction_id,
record.content_hash,
record.crawled_at,
record.crawled_at,
station_id,
),
)
self.conn.execute(
"""
INSERT INTO change_events(
station_id,
old_hash,
new_hash,
changed_at
)
VALUES (?, ?, ?, ?)
""",
(
station_id,
old_hash,
record.content_hash,
record.crawled_at,
),
)
self.conn.commit()
return "updated"
def export_csv(self, csv_path: Path) -> None:
csv_path.parent.mkdir(parents=True, exist_ok=True)
rows = self.conn.execute(
"""
SELECT
s.station_name AS 站点名,
d.name AS 试验方向,
s.region AS 地区,
a.name AS 主管单位,
s.status AS 状态,
s.source_url AS 来源链接,
s.last_seen_at AS 最近采集时间
FROM stations s
JOIN authorities a ON s.authority_id = a.id
JOIN research_directions d ON s.direction_id = d.id
ORDER BY s.id ASC
"""
).fetchall()
with csv_path.open("w", newline="", encoding="utf-8-sig") as f:
writer = csv.writer(f)
writer.writerow([
"站点名",
"试验方向",
"地区",
"主管单位",
"状态",
"来源链接",
"最近采集时间",
])
for row in rows:
writer.writerow([row[key] for key in row.keys()])
def preview(self, limit: int = 5) -> list[dict[str, str]]:
rows = self.conn.execute(
"""
SELECT
s.station_name,
d.name AS direction_name,
s.region,
a.name AS authority_name,
s.status
FROM stations s
JOIN authorities a ON s.authority_id = a.id
JOIN research_directions d ON s.direction_id = d.id
ORDER BY s.id ASC
LIMIT ?
""",
(limit,),
).fetchall()
return [dict(row) for row in rows]
def close(self) -> None:
self.conn.close()
agri_monitor/demo_site.py
from __future__ import annotations
from pathlib import Path
INDEX_HTML = """<!doctype html>
<html lang="zh-CN">
<head>
<meta charset="utf-8">
<title>农业试验站公开名录</title>
</head>
<body>
<h1>农业试验站公开名录</h1>
<table class="station-list">
<tr>
<th>站点名</th>
<th>地区</th>
<th>详情</th>
</tr>
<tr>
<td>华北旱作农业试验站</td>
<td>河北省</td>
<td><a class="station-link" href="detail_001.html">查看</a></td>
</tr>
<tr>
<td>江南水稻生态试验站</td>
<td>江苏省</td>
<td><a class="station-link" href="detail_002.html">查看</a></td>
</tr>
<tr>
<td>西南山地土壤改良试验站</td>
<td>四川省</td>
<td><a class="station-link" href="detail_003.html">查看</a></td>
</tr>
<tr>
<td>东北黑土地保护试验站</td>
<td>黑龙江省</td>
<td><a class="station-link" href="detail_004.html">查看</a></td>
</tr>
</table>
</body>
</html>
"""
DETAILS = {
"detail_001.html": {
"站点名": "华北旱作农业试验站",
"试验方向": "旱作节水与小麦玉米轮作",
"地区": "河北省石家庄市",
"主管单位": "某农业科学院作物研究所",
"状态": "运行中",
},
"detail_002.html": {
"站点名": "江南水稻生态试验站",
"试验方向": "水稻品种比较与稻田生态",
"地区": "江苏省扬州市",
"主管单位": "某省农业技术推广中心",
"状态": "运行中",
},
"detail_003.html": {
"站点名": "西南山地土壤改良试验站",
"试验方向": "坡耕地土壤改良与养分监测",
"地区": "四川省绵阳市",
"主管单位": "某农业大学资源环境学院",
"状态": "维护中",
},
"detail_004.html": {
"站点名": "东北黑土地保护试验站",
"试验方向": "黑土地保护性耕作与有机质提升",
"地区": "黑龙江省哈尔滨市",
"主管单位": "某黑土地保护研究中心",
"状态": "运行中",
},
}
def build_detail_html(fields: dict[str, str]) -> str:
rows = "\n".join(
f"<tr><th>{key}</th><td>{value}</td></tr>"
for key, value in fields.items()
)
return f"""<!doctype html>
<html lang="zh-CN">
<head>
<meta charset="utf-8">
<title>{fields.get('站点名', '详情')}</title>
</head>
<body>
<h1 class="station-title">{fields.get('站点名', '详情')}</h1>
<table class="detail-table">
{rows}
</table>
</body>
</html>
"""
def write_demo_site(root: Path) -> None:
root.mkdir(parents=True, exist_ok=True)
(root / "index.html").write_text(INDEX_HTML, encoding="utf-8")
for filename, fields in DETAILS.items():
(root / filename).write_text(
build_detail_html(fields),
encoding="utf-8",
)
main.py
from __future__ import annotations
import argparse
import contextlib
import functools
import http.server
import logging
import socket
import threading
from datetime import datetime, timezone
from pathlib import Path
from agri_monitor.config import CrawlConfig, DEMO_SITE_DIR, OUTPUT_DIR
from agri_monitor.demo_site import write_demo_site
from agri_monitor.fetcher import Fetcher
from agri_monitor.models import CrawlResult
from agri_monitor.parser import AgriStationParser
from agri_monitor.storage import SQLiteStorage
def now_iso() -> str:
return datetime.now(timezone.utc).astimezone().isoformat(timespec="seconds")
def find_free_port() -> int:
with contextlib.closing(socket.socket(socket.AF_INET, socket.SOCK_STREAM)) as sock:
sock.bind(("127.0.0.1", 0))
return int(sock.getsockname()[1])
def start_demo_server(directory: Path) -> tuple[http.server.ThreadingHTTPServer, str]:
port = find_free_port()
handler = functools.partial(
http.server.SimpleHTTPRequestHandler,
directory=str(directory),
)
server = http.server.ThreadingHTTPServer(("127.0.0.1", port), handler)
thread = threading.Thread(
target=server.serve_forever,
daemon=True,
)
thread.start()
return server, f"http://127.0.0.1:{port}"
def run_crawl(config: CrawlConfig) -> CrawlResult:
fetcher = Fetcher(
user_agent=config.user_agent,
timeout=config.timeout,
referer=config.referer or config.base_url,
sleep_seconds=config.sleep_seconds,
)
parser = AgriStationParser()
storage = SQLiteStorage(config.sqlite_path)
storage.init_schema()
result = CrawlResult()
try:
logging.info("fetch list: %s", config.list_url)
list_html = fetcher.get(config.list_url)
detail_urls = parser.parse_list(list_html, config.list_url)
logging.info("found detail urls: %d", len(detail_urls))
for url in detail_urls:
try:
html = fetcher.get(url)
record = parser.parse_detail(html, url, now_iso())
action = storage.upsert_station(record)
if action == "created":
result.created += 1
elif action == "updated":
result.updated += 1
else:
result.unchanged += 1
logging.info("%-9s %s", action, record.station_name)
except Exception as exc:
result.failed += 1
logging.exception("failed detail url=%s error=%s", url, exc)
storage.export_csv(config.csv_path)
logging.info("export csv: %s", config.csv_path)
for row in storage.preview(limit=5):
logging.info("preview: %s", row)
return result
finally:
storage.close()
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(description="农业试验站公开名录监控爬虫")
parser.add_argument(
"--base-url",
default="",
help="目标站点根地址,例如 https://example.org/agri",
)
parser.add_argument(
"--list-path",
default="/index.html",
help="列表页路径",
)
parser.add_argument(
"--demo",
action="store_true",
help="生成并启动本地样例站点,便于无外网环境跑通",
)
parser.add_argument(
"--db",
default=str(OUTPUT_DIR / "agri_stations.sqlite"),
help="SQLite 输出路径",
)
parser.add_argument(
"--csv",
default=str(OUTPUT_DIR / "agri_stations.csv"),
help="CSV 输出路径",
)
parser.add_argument(
"--sleep",
type=float,
default=0.6,
help="请求间隔秒数",
)
return parser.parse_args()
def main() -> None:
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
)
args = parse_args()
demo_server = None
if args.demo:
write_demo_site(DEMO_SITE_DIR)
demo_server, base_url = start_demo_server(DEMO_SITE_DIR)
list_path = "/index.html"
logging.info("demo server started: %s", base_url)
else:
if not args.base_url:
raise SystemExit("请提供 --base-url,或使用 --demo 运行本地样例。")
base_url = args.base_url
list_path = args.list_path
try:
config = CrawlConfig(
base_url=base_url,
list_path=list_path,
sleep_seconds=args.sleep,
sqlite_path=Path(args.db),
csv_path=Path(args.csv),
)
result = run_crawl(config)
logging.info(
"done: created=%d updated=%d unchanged=%d failed=%d",
result.created,
result.updated,
result.unchanged,
result.failed,
)
finally:
if demo_server:
demo_server.shutdown()
if __name__ == "__main__":
main()
最小运行命令
python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt
python main.py --demo
Windows PowerShell:
python -m venv .venv
.venv\Scripts\Activate.ps1
pip install -r requirements.txt
python main.py --demo
运行完成后查看:
data/output/agri_stations.sqlite
data/output/agri_stations.csv
🌟 文末
好啦~以上就是本期的全部内容啦!如果你在实践过程中遇到任何疑问,欢迎在评论区留言交流,我看到都会尽量回复~咱们下期见!
小伙伴们在批阅的过程中,如果觉得文章不错,欢迎点赞、收藏、关注哦~
三连就是对我写作道路上最好的鼓励与支持! ❤️🔥
✅ 专栏持续更新中|建议收藏 + 订阅
墙裂推荐订阅专栏 👉 《Python爬虫实战》,本专栏秉承着以“入门 → 进阶 → 工程化 → 项目落地”的路线持续更新,争取让每一期内容都做到:
✅ 讲得清楚(原理)|✅ 跑得起来(代码)|✅ 用得上(场景)|✅ 扛得住(工程化)
📣 想系统提升的小伙伴:强烈建议先订阅专栏 《Python爬虫实战》,再按目录大纲顺序学习,效率十倍上升~
✅ 互动征集
想让我把【某站点/某反爬/某验证码/某分布式方案】等写成某期实战?
评论区留言告诉我你的需求,我会优先安排实现(更新)哒~
⭐️ 若喜欢我,就请关注我叭~(更新不迷路)
⭐️ 若对你有用,就请点赞支持一下叭~(给我一点点动力)
⭐️ 若有疑问,就请评论留言告诉我叭~(我会补坑 & 更新迭代)
✅ 免责声明
本文爬虫思路、相关技术和代码仅用于学习参考,对阅读本文后的进行爬虫行为的用户本作者不承担任何法律责任。
使用或者参考本项目即表示您已阅读并同意以下条款:
- 合法使用: 不得将本项目用于任何违法、违规或侵犯他人权益的行为,包括但不限于网络攻击、诈骗、绕过身份验证、未经授权的数据抓取等。
- 风险自负: 任何因使用本项目而产生的法律责任、技术风险或经济损失,由使用者自行承担,项目作者不承担任何形式的责任。
- 禁止滥用: 不得将本项目用于违法牟利、黑产活动或其他不当商业用途。
- 使用或者参考本项目即视为同意上述条款,即 “谁使用,谁负责” 。如不同意,请立即停止使用并删除本项目。!!!
更多推荐
所有评论(0)