Python 实战:抓取植物标本馆公开条目元数据,构建一份规范化标本数据集
㊗️本期内容已收录至专栏《Python爬虫实战》,持续完善知识体系与项目实战,建议先订阅收藏,后续查阅更方便~
㊙️本期爬虫难度指数:⭐⭐⭐☆☆(进阶级)
🉐福利: 一次订阅后,专栏内的所有文章可永久免费看,持续更新中,保底1000+(篇)硬核实战内容。
全文目录:
🌟 开篇语
哈喽,各位小伙伴们你们好呀~我是【喵手】。
运营社区: C站 / 掘金 / 腾讯云 / 阿里云 / 华为云 / 51CTO
欢迎大家常来逛逛,一起学习,一起进步~🌟
我长期专注 Python 爬虫工程化实战,主理专栏👉 《Python爬虫实战》:从采集策略到反爬对抗,从数据清洗到分布式调度,持续输出可复用的方法论与可落地案例。内容主打一个“能跑、能用、能扩展”,让数据价值真正做到——抓得到、洗得净、用得上。
📌 专栏食用指南(建议收藏)
- ✅ 入门基础:环境搭建 / 请求与解析 / 数据落库
- ✅ 进阶提升:登录鉴权 / 动态渲染 / 反爬对抗
- ✅ 工程实战:异步并发 / 分布式调度 / 监控与容错
- ✅ 项目落地:数据治理 / 可视化分析 / 场景化应用
📣 专栏推广时间:如果你想系统学爬虫,而不是碎片化东拼西凑,欢迎订阅专栏👉《Python爬虫实战》👈,一次订阅后,专栏内的所有文章可永久免费阅读,持续更新中。
💕订阅后更新会优先推送,按目录学习更高效💯~
0️⃣ 前言(Preface)
这一篇我准备爬取 公开植物标本条目元数据,使用 Python 的 requests 访问公开 API,用标准库完成解析、清洗、去重、SQLite 存储与 CSV/JSON 导出,最终得到一份字段统一、可复用、可继续分析的植物标本元数据集。
读完本文,你大概能拿到三样东西:
第一,理解“唯一编号驱动型抓取”到底怎么设计,不是随便翻页保存 HTML,而是围绕标本号、馆藏编号、详情链接或 occurrence key 建立稳定的数据链路。
第二,掌握一个可运行的 Python 爬虫项目结构,从请求层、解析层、存储层到命令行入口都完整拆开,不再把所有逻辑堆在一个脚本里。
第三,得到一份适合二次分析的植物标本元数据,包括:标本号、学名、采集地、采集时间、采集人、馆藏链接等核心字段。
这类项目看起来不复杂,但真正写起来很考验工程习惯。尤其是标本数据,字段缺失、日期格式不统一、地点写法混杂、同一个标本号在不同机构里可能重复,这些细节会直接决定最后数据集是否可靠。我个人比较喜欢这种题材:它没有炫技,但很适合练基本功。
1️⃣ 摘要(Abstract)
本文以公开植物标本条目元数据为目标,使用 Python、requests、SQLite、CSV/JSON 等工具完成一个可复现的元数据抓取项目,最终产出规范化的植物标本数据集。
读完本文,你可以获得:
- 一套“采集 → 解析 → 清洗 → 存储 → 导出”的完整爬虫流程。
- 一份围绕唯一编号去重、断点续跑、失败重试设计的项目代码。
- 一种适合标本馆、博物馆、公开目录类站点的数据规范化思路。
本文选择 API 优先的方案,而不是强行解析网页。原因很简单:公开数据平台通常会提供结构化接口,直接读取 JSON 比抓网页更稳定,也更容易控制访问频率。网页更适合给人看,API 更适合给程序读。能走公开 API,就不要拿 HTML 选择器硬扛,这是我做这类数据采集时的基本习惯。
2️⃣ 背景与需求(Why)
植物标本馆保存了大量植物实体标本。每一份标本往往对应一条或多条公开元数据记录,常见信息包括标本编号、物种学名、采集地点、采集日期、采集人、馆藏机构、数据来源链接等。对研究者、数据分析人员、信息整理人员来说,这类数据价值很高。
为什么要爬这些数据?主要有三个原因。
第一,做数据分析。
例如统计某一植物类群在不同国家、地区、年份的采集情况;分析某些属种的采集时间跨度;观察不同标本馆的数字化记录完整度。标本数据不是普通商品数据,它背后往往有历史、地理、分类学和采集事件信息。
第二,做信息聚合。
公开标本记录分散在不同机构、不同平台中。如果每次都靠网页搜索,效率很低。把公开元数据抓下来,做成一个本地 SQLite 数据库或 CSV 文件,就可以按学名、采集人、地点、时间快速筛选。
第三,做自动化整理。
很多数据清洗工作并不适合手工完成。比如统一日期格式、合并地点字段、剔除空值、去重、生成馆藏链接、记录失败任务,这些工作交给程序更稳定。
本文的目标字段如下:
| 中文字段 | 英文字段 | 说明 |
|---|---|---|
| 标本号 | specimen_number |
优先使用 catalogNumber,如果缺失则回退到 occurrenceID 或平台 key |
| 学名 | scientific_name |
优先使用公开记录中的 scientificName |
| 采集地 | collection_place |
由国家、省州、县区、locality 等字段拼接 |
| 采集时间 | collection_date |
优先使用 eventDate,否则尝试用 year/month/day 拼接 |
| 采集人 | collector |
对应采集者字段,常见为 recordedBy |
| 馆藏链接 | collection_link |
指向原始公开记录页面,便于复查 |
| 数据源 key | source_key |
平台侧唯一 key,用于去重和断点续跑 |
| 机构代码 | institution_code |
标本馆或机构缩写 |
| 馆藏代码 | collection_code |
馆藏集合代码 |
| 数据集 key | dataset_key |
数据集标识 |
| 数据许可证 | license |
记录许可信息,便于后续合规使用 |
其中前六个字段是本文主题要求的核心字段,后面几个是我个人强烈建议保留的辅助字段。原因很现实:只保留中文展示字段,短期看很清爽,长期用起来会很痛苦。比如“标本号”在不同机构中未必全局唯一,不带机构代码、数据源 key,后期排错会比较麻烦。
3️⃣ 合规与注意事项
爬虫不是“能访问就随便抓”。技术分享必须把边界说清楚。本文只针对公开可访问的条目元数据,不采集敏感信息,不绕过登录,不绕过付费限制,不规避反爬策略,也不做高并发压力访问。
3.1 robots.txt 基本说明
robots.txt 是网站放在根目录下的爬虫访问声明文件,常见路径类似:
https://example.org/robots.txt
它通常会告诉爬虫哪些路径允许访问,哪些路径不建议访问。对于传统 HTML 页面采集,正式运行前应该先看 robots.txt。如果目标站点明确禁止某些路径,就不要抓那些路径。
本文使用的是公开 API,并不是从页面里暴力解析标本详情 HTML。即便如此,依旧应该遵守服务条款、API 文档和合理访问频率。API 能访问,不代表可以无限请求。
3.2 频率控制
本文代码默认每次请求之间会暂停一小段时间,并且提供 --sleep 参数。建议从保守频率开始,比如:
python -m src.main --mode search --limit 100 --sleep 0.8
不要一上来就开几十个线程、几百个协程。公开数据平台不是你的压测对象,尤其是科研数据平台,很多服务资源需要供全球用户共同使用。
3.3 不做攻击式并发
本文代码没有默认开启并发。后面进阶部分会提到并发优化,但并发只是为了提升可控范围内的效率,不是为了绕过限制。真正做工程时,我更关心稳定性,而不是单次跑得多快。一个小时慢慢抓完并且数据干净,比三分钟抓崩、丢字段、触发限制要好得多。
3.4 不采集敏感信息
本文只处理公开标本元数据,例如学名、采集地、采集时间、采集人、公开记录链接等。对于涉及敏感物种、精确坐标、受保护地点、个人联系方式等信息,应遵循数据源本身的公开范围和使用要求。公开页面没有展示的,不要尝试通过接口参数、隐藏字段或非正常方式去挖。
3.5 不绕过付费或登录限制
如果某个标本馆要求登录、申请授权或限制下载,那就按规则申请,不要绕过。本文的做法是中性的、工程化的:只使用公开接口,只采集公开字段,只做合理频控,只做技术学习和数据整理。
4️⃣ 技术选型与整体流程(What/How)
4.1 静态、动态、API:本文属于哪一种?
常见爬虫目标大致分三类。
第一类是静态 HTML。
页面源码里直接有数据,用 requests + BeautifulSoup/lxml 就能解析。这种简单,但选择器容易受页面改版影响。
第二类是动态渲染页面。
网页首屏 HTML 里没有真实数据,数据由 JavaScript 后续请求接口加载。遇到这种页面,可以分析接口,也可以使用 Playwright/Selenium 渲染页面。我的习惯是优先找接口,最后才用浏览器自动化。
第三类是公开 API。
服务方直接提供结构化 JSON 接口。本文属于这一类。API 返回的数据结构更稳定,字段更明确,也更适合规范化入库。
本文使用 API-first 的采集方式,主要工具是:
requests 负责 HTTP 请求
sqlite3 负责本地数据库
csv/json 负责导出
argparse 负责命令行参数
logging 负责运行日志
hashlib 负责内容 hash
time/random 负责频控与退避
没有引入 Scrapy,是因为这个项目的核心不是大规模站点抓取,而是围绕公开 API 做规范化数据集。Scrapy 很强,但初始结构偏重;本文希望读者复制后可以直接跑起来。
没有引入 Playwright,是因为本文不需要渲染网页。只要 API 可用,就不应该为了“看起来像爬虫”而使用浏览器自动化。Playwright 适合动态页面、登录态交互、复杂前端渲染,不适合简单 JSON API 数据采集。
4.2 整体流程
本文的流程可以写成一句话:
种子编号 / 查询条件
↓
请求公开 API
↓
解析 JSON
↓
字段清洗与规范化
↓
SQLite 去重入库
↓
导出 CSV / JSON
↓
复查失败任务与日志
如果换成更工程化的模块划分,就是:
Fetcher 请求层:负责发请求、重试、频控、处理状态码
Parser 解析层:负责从 JSON 中抽取字段并规范化
Storage 存储层:负责 SQLite 建表、去重、导出
Main 入口层:负责命令行参数和流程编排
我不太喜欢把请求、解析、存储全塞进一个函数。短脚本当然可以这么写,但一旦字段增加、失败重试增加、导出格式增加,单文件脚本会很快失控。这个项目虽然不大,但我们仍然按照小型工程来组织。
5️⃣ 环境准备与依赖安装(可复现)
5.1 Python 版本
推荐使用:
Python 3.10+
我建议至少使用 Python 3.10。原因不是语法多先进,而是现在很多依赖和系统环境对 3.8 以下已经不太友好。用 3.10 或 3.11 基本能减少很多无意义的兼容问题。
查看版本:
python --version
5.2 创建虚拟环境
Linux/macOS:
python -m venv .venv
source .venv/bin/activate
Windows PowerShell:
python -m venv .venv
.venv\Scripts\Activate.ps1
5.3 安装依赖
本文尽量少用第三方库。核心依赖只有 requests。
requirements.txt:
requests==2.32.3
安装:
pip install -r requirements.txt
如果你希望命令行进度条更好看,可以额外安装 tqdm,但本文代码不强制依赖。
5.4 推荐项目结构
herbarium_meta_crawler/
├── README.md
├── requirements.txt
├── data/
│ ├── seeds.txt
│ └── output/
│ ├── herbarium_records.csv
│ ├── herbarium_records.json
│ └── herbarium.sqlite3
├── logs/
│ └── crawler.log
└── src/
├── __init__.py
├── config.py
├── fetcher.py
├── parser.py
├── storage.py
├── utils.py
└── main.py
创建目录:
mkdir -p herbarium_meta_crawler/data/output
mkdir -p herbarium_meta_crawler/logs
mkdir -p herbarium_meta_crawler/src
touch herbarium_meta_crawler/src/__init__.py
Windows 下可以手动创建,也可以用 PowerShell:
New-Item -ItemType Directory -Force herbarium_meta_crawler\data\output
New-Item -ItemType Directory -Force herbarium_meta_crawler\logs
New-Item -ItemType Directory -Force herbarium_meta_crawler\src
New-Item -ItemType File -Force herbarium_meta_crawler\src\__init__.py
6️⃣ 核心实现:请求层(Fetcher)
请求层负责所有 HTTP 细节,包括 headers、timeout、session、失败重试和退避。这个模块写好以后,解析层就不用关心网络异常。
6.1 headers:User-Agent、Referer、Accept
请求公开 API 时,最基本的 headers 应该包含:
User-Agent
Accept
Referer
User-Agent 不建议伪装成浏览器。很多人喜欢复制浏览器 UA,但对于 API 项目,我更推荐写清楚项目名和联系方式。比如:
HerbariumMetaCrawler/1.0 (+mailto:your_email@example.com)
这不是为了“更像真人”,而是为了更透明。如果服务方发现异常访问,至少知道如何联系你。
Referer 可以放公开网站首页或项目说明地址。对 API 来说它通常不是必需字段,但保留它可以让请求更完整。
6.2 timeout
不要让请求无限等待。本文设置:
timeout=(5, 25)
含义是连接超时 5 秒,读取超时 25 秒。网络波动是常态,不设置 timeout 的爬虫迟早会卡死在某个请求上。
6.3 session/cookie
本文使用 requests.Session() 复用连接,提高稳定性。由于采集的是公开 API,不需要登录 cookie。代码里保留 session,但不写任何绕过登录或模拟账号行为。
6.4 失败处理:重试与退避
常见可重试状态码:
429 Too Many Requests
500 Internal Server Error
502 Bad Gateway
503 Service Unavailable
504 Gateway Timeout
遇到这些状态码,不要立刻密集重试。正确做法是:
第一次失败:等待 1 秒左右
第二次失败:等待 2 秒左右
第三次失败:等待 4 秒左右
再加一点随机抖动,避免固定节奏请求
如果响应头里有 Retry-After,优先尊重它。
下面是请求层完整代码。
src/config.py
from pathlib import Path
PROJECT_ROOT = Path(__file__).resolve().parents[1]
DATA_DIR = PROJECT_ROOT / "data"
OUTPUT_DIR = DATA_DIR / "output"
LOG_DIR = PROJECT_ROOT / "logs"
OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
LOG_DIR.mkdir(parents=True, exist_ok=True)
GBIF_API_BASE = "https://api.gbif.org/v1"
GBIF_OCCURRENCE_SEARCH = f"{GBIF_API_BASE}/occurrence/search"
GBIF_OCCURRENCE_DETAIL = f"{GBIF_API_BASE}/occurrence"
DEFAULT_DB_PATH = OUTPUT_DIR / "herbarium.sqlite3"
DEFAULT_CSV_PATH = OUTPUT_DIR / "herbarium_records.csv"
DEFAULT_JSON_PATH = OUTPUT_DIR / "herbarium_records.json"
DEFAULT_LOG_PATH = LOG_DIR / "crawler.log"
DEFAULT_USER_AGENT = "HerbariumMetaCrawler/1.0 (+mailto:example@example.com)"
DEFAULT_HEADERS = {
"User-Agent": DEFAULT_USER_AGENT,
"Accept": "application/json,text/plain,*/*",
"Referer": "https://www.gbif.org/",
}
REQUEST_TIMEOUT = (5, 25)
RETRY_STATUS_CODES = {429, 500, 502, 503, 504}
这里的邮箱请改成你自己的联系方式。如果只是本地练习,可以保留示例值,但如果你准备长期跑任务,最好写真实可联系邮箱。
src/fetcher.py
import logging
import random
import time
from typing import Any, Dict, Optional
import requests
from .config import (
DEFAULT_HEADERS,
GBIF_OCCURRENCE_DETAIL,
GBIF_OCCURRENCE_SEARCH,
REQUEST_TIMEOUT,
RETRY_STATUS_CODES,
)
class FetchError(RuntimeError):
"""请求失败时抛出的异常。"""
class GBIFFetcher:
"""
GBIF Occurrence API 请求封装。
这个类只负责网络请求,不负责字段清洗。
"""
def __init__(
self,
sleep_seconds: float = 0.6,
max_retries: int = 3,
timeout=REQUEST_TIMEOUT,
headers: Optional[Dict[str, str]] = None,
) -> None:
self.sleep_seconds = sleep_seconds
self.max_retries = max_retries
self.timeout = timeout
self.session = requests.Session()
self.session.headers.update(headers or DEFAULT_HEADERS)
def _sleep_between_requests(self) -> None:
"""
普通请求之间的基础停顿。
加一点随机抖动,避免请求节奏过于机械。
"""
if self.sleep_seconds <= 0:
return
jitter = random.uniform(0, self.sleep_seconds * 0.3)
time.sleep(self.sleep_seconds + jitter)
def _get_json(self, url: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
"""
GET JSON,并处理重试、退避、状态码。
"""
last_error: Optional[Exception] = None
for attempt in range(1, self.max_retries + 1):
self._sleep_between_requests()
try:
logging.debug("GET %s params=%s attempt=%s", url, params, attempt)
response = self.session.get(url, params=params, timeout=self.timeout)
if response.status_code == 200:
try:
return response.json()
except ValueError as exc:
raise FetchError(f"响应不是合法 JSON: {response.text[:200]}") from exc
if response.status_code in RETRY_STATUS_CODES:
retry_after = response.headers.get("Retry-After")
wait_seconds = self._compute_backoff(attempt, retry_after)
logging.warning(
"可重试状态码: status=%s url=%s wait=%.2fs",
response.status_code,
response.url,
wait_seconds,
)
time.sleep(wait_seconds)
continue
raise FetchError(
f"请求失败: status={response.status_code}, "
f"url={response.url}, body={response.text[:300]}"
)
except (requests.Timeout, requests.ConnectionError) as exc:
last_error = exc
wait_seconds = self._compute_backoff(attempt)
logging.warning(
"网络异常: %s attempt=%s wait=%.2fs",
repr(exc),
attempt,
wait_seconds,
)
time.sleep(wait_seconds)
except requests.RequestException as exc:
last_error = exc
wait_seconds = self._compute_backoff(attempt)
logging.warning(
"请求异常: %s attempt=%s wait=%.2fs",
repr(exc),
attempt,
wait_seconds,
)
time.sleep(wait_seconds)
raise FetchError(f"超过最大重试次数,最后异常: {last_error}")
@staticmethod
def _compute_backoff(attempt: int, retry_after: Optional[str] = None) -> float:
"""
计算退避时间。
如果服务端返回 Retry-After,并且是数字,则优先使用。
"""
if retry_after and retry_after.isdigit():
return min(float(retry_after), 60.0)
base = 2 ** (attempt - 1)
jitter = random.uniform(0, 0.8)
return min(base + jitter, 30.0)
def search_occurrences(
self,
offset: int,
limit: int,
q: Optional[str] = None,
country: Optional[str] = None,
year_from: Optional[int] = None,
year_to: Optional[int] = None,
dataset_key: Optional[str] = None,
catalog_number: Optional[str] = None,
) -> Dict[str, Any]:
"""
检索植物保存标本记录。
basisOfRecord=PRESERVED_SPECIMEN 表示保存标本。
kingdomKey=6 通常用于限制植物界记录。
"""
params: Dict[str, Any] = {
"basisOfRecord": "PRESERVED_SPECIMEN",
"kingdomKey": 6,
"limit": limit,
"offset": offset,
}
if q:
params["q"] = q
if country:
params["country"] = country.upper()
if dataset_key:
params["datasetKey"] = dataset_key
if catalog_number:
params["catalogNumber"] = catalog_number
if year_from and year_to:
params["eventDate"] = f"{year_from},{year_to}"
elif year_from:
params["eventDate"] = f"{year_from},"
elif year_to:
params["eventDate"] = f",{year_to}"
return self._get_json(GBIF_OCCURRENCE_SEARCH, params=params)
def get_occurrence_detail(self, key: str | int) -> Dict[str, Any]:
"""
根据 GBIF occurrence key 获取详情。
"""
url = f"{GBIF_OCCURRENCE_DETAIL}/{key}"
return self._get_json(url)
这一层最重要的不是“能请求”,而是“失败时知道为什么失败,并且不把服务打爆”。爬虫稳定性大部分来自这些不起眼的细节。
7️⃣ 核心实现:解析层(Parser)
解析层负责把 API 返回的 JSON 转成我们自己的数据结构。不要把外部接口字段直接当作数据库字段,因为外部字段可能很宽、很杂,而我们需要的是稳定的本地模型。
7.1 解析方式
本文使用 JSON 解析,不需要 XPath、CSS 或 BeautifulSoup。
如果目标站点是 HTML 页面,可以用:
BeautifulSoup:适合简单页面
lxml + XPath:适合结构清晰、字段位置稳定的页面
parsel:适合 Scrapy 风格选择器
但当前项目没有必要这么做。API 已经返回 JSON,直接读字典字段即可。
7.2 列表页如何拿详情链接
API 搜索接口返回的是一批 occurrence 记录。每条记录一般会有平台 key。我们可以用这个 key 生成公开馆藏链接:
collection_link = f"https://www.gbif.org/occurrence/{key}"
也可以再请求详情接口:
GET https://api.gbif.org/v1/occurrence/{key}
本文默认先用搜索结果里的字段。如果你希望字段更完整,可以开启详情补抓。详情补抓更慢,但可靠性更高。
7.3 详情页如何抽字段
目标字段映射如下:
标本号 catalogNumber / occurrenceID / key
学名 scientificName / acceptedScientificName / species
采集地 country + stateProvince + county + locality
采集时间 eventDate / year-month-day
采集人 recordedBy
馆藏链接 https://www.gbif.org/occurrence/{key}
这里最容易踩坑的是“标本号”。很多人以为 catalogNumber 一定全局唯一,但它通常只在某个机构、某个馆藏集合内有意义。因此本文数据库去重优先使用 source_key 和 collection_link,而不是单独拿 catalogNumber 当全局唯一键。
7.4 缺失字段怎么办
公开标本数据经常缺字段。比如:
有学名,但没有采集人
有国家,但没有 locality
有 year/month/day,但没有 eventDate
有 occurrenceID,但没有 catalogNumber
处理思路是:
能规范化就规范化
能回退就回退
不能确定就留空
不要凭空编造
字段缺失不是程序错误。真正的错误是把缺失字段硬补成看似完整的假数据。
下面是解析层代码。
src/utils.py
import hashlib
import json
import logging
from datetime import datetime
from typing import Any, Iterable, Optional
def setup_logging(log_path: str) -> None:
"""
初始化日志。
同时输出到控制台和文件。
"""
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s | %(levelname)s | %(message)s",
handlers=[
logging.StreamHandler(),
logging.FileHandler(log_path, encoding="utf-8"),
],
)
def clean_text(value: Any) -> str:
"""
清洗字符串:
- None 转空字符串
- list/tuple 用分号拼接
- 压缩多余空白
"""
if value is None:
return ""
if isinstance(value, (list, tuple)):
value = "; ".join(str(v) for v in value if v is not None)
text = str(value).replace("\u0000", "").strip()
text = " ".join(text.split())
return text
def join_non_empty(parts: Iterable[Any], sep: str = " / ") -> str:
"""
拼接非空字段。
"""
cleaned = [clean_text(p) for p in parts]
cleaned = [p for p in cleaned if p]
return sep.join(cleaned)
def normalize_date(event_date: Any, year: Any = None, month: Any = None, day: Any = None) -> str:
"""
规范化采集时间。
优先使用 eventDate。
如果没有,则尝试 year/month/day。
不强行解析所有历史日期,只做温和清洗。
"""
event_date_text = clean_text(event_date)
if event_date_text:
return event_date_text
y = clean_text(year)
m = clean_text(month)
d = clean_text(day)
if y and m and d:
return f"{y.zfill(4)}-{m.zfill(2)}-{d.zfill(2)}"
if y and m:
return f"{y.zfill(4)}-{m.zfill(2)}"
if y:
return y.zfill(4)
return ""
def stable_hash(data: dict) -> str:
"""
生成内容 hash。
用于辅助判断记录内容是否变化。
"""
raw = json.dumps(data, ensure_ascii=False, sort_keys=True)
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
def now_iso() -> str:
return datetime.utcnow().replace(microsecond=0).isoformat() + "Z"
def safe_int(value: Any, default: int = 0) -> int:
try:
return int(value)
except (TypeError, ValueError):
return default
src/parser.py
from dataclasses import asdict, dataclass
from typing import Any, Dict, Iterable, List
from .utils import clean_text, join_non_empty, normalize_date, stable_hash
@dataclass
class HerbariumRecord:
"""
本项目自己的标准化记录结构。
"""
source_key: str
specimen_number: str
scientific_name: str
collection_place: str
collection_date: str
collector: str
collection_link: str
institution_code: str
collection_code: str
dataset_key: str
basis_of_record: str
license: str
raw_hash: str
def to_dict(self) -> Dict[str, str]:
return asdict(self)
class GBIFParser:
"""
GBIF occurrence JSON 解析器。
"""
@staticmethod
def extract_keys_from_search(payload: Dict[str, Any]) -> List[str]:
"""
从搜索结果中提取 occurrence key。
"""
results = payload.get("results") or []
keys: List[str] = []
for item in results:
key = item.get("key") or item.get("gbifID")
if key is not None:
keys.append(str(key))
return keys
@staticmethod
def iter_results(payload: Dict[str, Any]) -> Iterable[Dict[str, Any]]:
"""
迭代搜索结果。
"""
results = payload.get("results") or []
for item in results:
if isinstance(item, dict):
yield item
def parse_occurrence(self, item: Dict[str, Any]) -> HerbariumRecord:
"""
把一条 GBIF occurrence JSON 转为 HerbariumRecord。
"""
source_key = clean_text(item.get("key") or item.get("gbifID"))
specimen_number = self._pick_first(
item,
[
"catalogNumber",
"occurrenceID",
"recordNumber",
"otherCatalogNumbers",
"gbifID",
"key",
],
)
scientific_name = self._pick_first(
item,
[
"scientificName",
"acceptedScientificName",
"species",
"genericName",
],
)
collection_place = join_non_empty(
[
item.get("country"),
item.get("stateProvince"),
item.get("county"),
item.get("municipality"),
item.get("locality"),
]
)
collection_date = normalize_date(
item.get("eventDate"),
item.get("year"),
item.get("month"),
item.get("day"),
)
collector = self._pick_first(
item,
[
"recordedBy",
"identifiedBy",
],
)
if source_key:
collection_link = f"https://www.gbif.org/occurrence/{source_key}"
else:
collection_link = clean_text(item.get("references"))
record = HerbariumRecord(
source_key=source_key,
specimen_number=specimen_number,
scientific_name=scientific_name,
collection_place=collection_place,
collection_date=collection_date,
collector=collector,
collection_link=collection_link,
institution_code=clean_text(item.get("institutionCode")),
collection_code=clean_text(item.get("collectionCode")),
dataset_key=clean_text(item.get("datasetKey")),
basis_of_record=clean_text(item.get("basisOfRecord")),
license=clean_text(item.get("license")),
raw_hash=stable_hash(item),
)
return record
@staticmethod
def _pick_first(item: Dict[str, Any], field_names: List[str]) -> str:
"""
从多个候选字段中取第一个非空值。
"""
for name in field_names:
value = clean_text(item.get(name))
if value:
return value
return ""
这个解析器有一个特点:它不假设字段一定存在。爬虫解析代码最忌讳这样写:
scientific_name = item["scientificName"]
只要某条记录缺字段,程序就会炸。更稳妥的方式是:
scientific_name = item.get("scientificName", "")
再配合统一的 clean_text(),后面入库就会舒服很多。
8️⃣ 数据存储与导出(Storage)
本文选择 SQLite 起步。原因很简单:
零服务部署
跨平台
支持唯一约束
支持 SQL 查询
适合中小规模数据集
方便导出 CSV/JSON
CSV 很适合交换,但不适合断点续跑和去重。JSON 保存原始结构很方便,但查询不如数据库直接。SQLite 则比较折中:轻量,但足够工程化。
8.1 字段映射表
| 字段名 | 类型 | 示例值 | 说明 |
|---|---|---|---|
source_key |
TEXT | 1234567890 |
平台记录 key,作为主要唯一字段 |
specimen_number |
TEXT | K000123456 |
标本号或回退编号 |
scientific_name |
TEXT | Rosa chinensis Jacq. |
学名 |
collection_place |
TEXT | China / Yunnan / Dali / Cangshan |
拼接后的采集地 |
collection_date |
TEXT | 1987-05-21 |
采集时间 |
collector |
TEXT | S. Wang |
采集人 |
collection_link |
TEXT | https://www.gbif.org/occurrence/... |
馆藏公开链接 |
institution_code |
TEXT | K |
机构代码 |
collection_code |
TEXT | Herbarium |
馆藏代码 |
dataset_key |
TEXT | UUID | 数据集 key |
basis_of_record |
TEXT | PRESERVED_SPECIMEN |
记录类型 |
license |
TEXT | CC_BY_4_0 |
许可证 |
raw_hash |
TEXT | SHA256 | 原始记录 hash |
created_at |
TEXT | ISO 时间 | 入库时间 |
updated_at |
TEXT | ISO 时间 | 更新时间 |
8.2 去重策略
本文使用三层去重:
第一层,source_key 唯一。
如果平台记录 key 存在,它通常是最可靠的唯一标识。
第二层,collection_link 唯一。
链接能帮助复查,也可以防止同一记录重复写入。
第三层,内容 hash。
如果同一个 key 的内容发生变化,可以比较 raw_hash,用于后续增量更新。
不建议只用 specimen_number 去重。标本号在单个馆藏内部可能唯一,但跨机构未必唯一。更稳的唯一判断应该是:
source_key
或
institution_code + collection_code + specimen_number
或
collection_link
下面是存储层代码。
src/storage.py
import csv
import json
import sqlite3
from pathlib import Path
from typing import Iterable, List
from .parser import HerbariumRecord
from .utils import now_iso
class SQLiteStorage:
"""
SQLite 存储层。
"""
def __init__(self, db_path: str | Path) -> None:
self.db_path = Path(db_path)
self.conn = sqlite3.connect(self.db_path)
self.conn.row_factory = sqlite3.Row
def init_db(self) -> None:
"""
初始化数据表。
"""
sql = """
CREATE TABLE IF NOT EXISTS herbarium_records (
id INTEGER PRIMARY KEY AUTOINCREMENT,
source_key TEXT NOT NULL,
specimen_number TEXT,
scientific_name TEXT,
collection_place TEXT,
collection_date TEXT,
collector TEXT,
collection_link TEXT,
institution_code TEXT,
collection_code TEXT,
dataset_key TEXT,
basis_of_record TEXT,
license TEXT,
raw_hash TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
UNIQUE(source_key)
);
CREATE INDEX IF NOT EXISTS idx_scientific_name
ON herbarium_records(scientific_name);
CREATE INDEX IF NOT EXISTS idx_specimen_number
ON herbarium_records(specimen_number);
CREATE INDEX IF NOT EXISTS idx_collection_date
ON herbarium_records(collection_date);
CREATE INDEX IF NOT EXISTS idx_dataset_key
ON herbarium_records(dataset_key);
"""
self.conn.executescript(sql)
self.conn.commit()
def upsert_record(self, record: HerbariumRecord) -> bool:
"""
插入或更新记录。
返回 True 表示新增或更新成功。
"""
now = now_iso()
data = record.to_dict()
sql = """
INSERT INTO herbarium_records (
source_key,
specimen_number,
scientific_name,
collection_place,
collection_date,
collector,
collection_link,
institution_code,
collection_code,
dataset_key,
basis_of_record,
license,
raw_hash,
created_at,
updated_at
)
VALUES (
:source_key,
:specimen_number,
:scientific_name,
:collection_place,
:collection_date,
:collector,
:collection_link,
:institution_code,
:collection_code,
:dataset_key,
:basis_of_record,
:license,
:raw_hash,
:created_at,
:updated_at
)
ON CONFLICT(source_key) DO UPDATE SET
specimen_number=excluded.specimen_number,
scientific_name=excluded.scientific_name,
collection_place=excluded.collection_place,
collection_date=excluded.collection_date,
collector=excluded.collector,
collection_link=excluded.collection_link,
institution_code=excluded.institution_code,
collection_code=excluded.collection_code,
dataset_key=excluded.dataset_key,
basis_of_record=excluded.basis_of_record,
license=excluded.license,
raw_hash=excluded.raw_hash,
updated_at=excluded.updated_at;
"""
data["created_at"] = now
data["updated_at"] = now
self.conn.execute(sql, data)
self.conn.commit()
return True
def source_key_exists(self, source_key: str) -> bool:
sql = "SELECT 1 FROM herbarium_records WHERE source_key = ? LIMIT 1"
cur = self.conn.execute(sql, (source_key,))
return cur.fetchone() is not None
def count_records(self) -> int:
cur = self.conn.execute("SELECT COUNT(*) AS cnt FROM herbarium_records")
row = cur.fetchone()
return int(row["cnt"])
def fetch_all(self) -> List[dict]:
sql = """
SELECT
source_key,
specimen_number,
scientific_name,
collection_place,
collection_date,
collector,
collection_link,
institution_code,
collection_code,
dataset_key,
basis_of_record,
license,
raw_hash
FROM herbarium_records
ORDER BY id ASC
"""
cur = self.conn.execute(sql)
return [dict(row) for row in cur.fetchall()]
def export_csv(self, path: str | Path) -> None:
rows = self.fetch_all()
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
if not rows:
with path.open("w", encoding="utf-8-sig", newline="") as f:
f.write("")
return
fieldnames = list(rows[0].keys())
with path.open("w", encoding="utf-8-sig", newline="") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(rows)
def export_json(self, path: str | Path) -> None:
rows = self.fetch_all()
path = Path(path)
path.parent.mkdir(parents=True, exist_ok=True)
with path.open("w", encoding="utf-8") as f:
json.dump(rows, f, ensure_ascii=False, indent=2)
def close(self) -> None:
self.conn.close()
这段代码有两个实用点。
第一,使用 ON CONFLICT(source_key) DO UPDATE,同一条记录再次抓到时不会插入重复行,而是更新字段。
第二,导出 CSV 时用了 utf-8-sig。这对中文用户比较友好,因为很多人会用 Excel 打开 CSV,utf-8-sig 可以减少乱码问题。
9️⃣ 运行方式与结果展示
现在把入口文件写出来。入口文件负责读取命令行参数,决定是按查询条件分页抓取,还是按种子标本号抓取。
9.1 种子文件格式
data/seeds.txt:
K000123456
E000987654
PE000112233
每行一个标本号。空行会被忽略。真实使用时,你可以把自己已有的标本号放进去,让程序按 catalogNumber 查询。
9.2 主程序代码
src/main.py
import argparse
import logging
from pathlib import Path
from typing import Iterable, List
from .config import (
DEFAULT_CSV_PATH,
DEFAULT_DB_PATH,
DEFAULT_JSON_PATH,
DEFAULT_LOG_PATH,
)
from .fetcher import FetchError, GBIFFetcher
from .parser import GBIFParser
from .storage import SQLiteStorage
from .utils import safe_int, setup_logging
def read_seed_file(path: str | Path) -> List[str]:
seed_path = Path(path)
if not seed_path.exists():
raise FileNotFoundError(f"种子文件不存在: {seed_path}")
seeds: List[str] = []
with seed_path.open("r", encoding="utf-8") as f:
for line in f:
value = line.strip()
if value and not value.startswith("#"):
seeds.append(value)
return seeds
def build_arg_parser() -> argparse.ArgumentParser:
parser = argparse.ArgumentParser(
description="抓取公开植物标本馆条目元数据,导出 SQLite/CSV/JSON。"
)
parser.add_argument(
"--mode",
choices=["search", "seeds"],
default="search",
help="search 表示按查询条件分页抓取;seeds 表示按 seeds.txt 中的标本号抓取。",
)
parser.add_argument(
"--q",
default=None,
help="全文关键词,例如 Rosa、Ficus、Yunnan 等。",
)
parser.add_argument(
"--country",
default=None,
help="国家代码,例如 CN、US、GB。为空则不限制。",
)
parser.add_argument(
"--year-from",
type=int,
default=None,
help="采集年份起始,例如 1900。",
)
parser.add_argument(
"--year-to",
type=int,
default=None,
help="采集年份结束,例如 2026。",
)
parser.add_argument(
"--dataset-key",
default=None,
help="指定数据集 key。为空则不限制。",
)
parser.add_argument(
"--limit",
type=int,
default=50,
help="每页数量。建议从 20-100 开始。",
)
parser.add_argument(
"--max-pages",
type=int,
default=2,
help="search 模式下最多抓取页数。",
)
parser.add_argument(
"--sleep",
type=float,
default=0.6,
help="每次请求之间的基础停顿秒数。",
)
parser.add_argument(
"--seeds",
default="data/seeds.txt",
help="seeds 模式下的标本号文件。",
)
parser.add_argument(
"--db",
default=str(DEFAULT_DB_PATH),
help="SQLite 数据库路径。",
)
parser.add_argument(
"--csv",
default=str(DEFAULT_CSV_PATH),
help="CSV 导出路径。",
)
parser.add_argument(
"--json",
default=str(DEFAULT_JSON_PATH),
help="JSON 导出路径。",
)
parser.add_argument(
"--log",
default=str(DEFAULT_LOG_PATH),
help="日志文件路径。",
)
return parser
def crawl_by_search(args, fetcher: GBIFFetcher, parser: GBIFParser, storage: SQLiteStorage) -> None:
"""
按查询条件分页抓取。
"""
total_saved = 0
for page in range(args.max_pages):
offset = page * args.limit
logging.info(
"开始抓取搜索页: page=%s offset=%s limit=%s",
page + 1,
offset,
args.limit,
)
try:
payload = fetcher.search_occurrences(
offset=offset,
limit=args.limit,
q=args.q,
country=args.country,
year_from=args.year_from,
year_to=args.year_to,
dataset_key=args.dataset_key,
)
except FetchError as exc:
logging.error("搜索页抓取失败: offset=%s error=%s", offset, exc)
continue
results = list(parser.iter_results(payload))
if not results:
logging.info("当前页没有结果,提前结束。")
break
for item in results:
try:
record = parser.parse_occurrence(item)
if not record.source_key:
logging.warning("跳过缺少 source_key 的记录: %s", item)
continue
storage.upsert_record(record)
total_saved += 1
except Exception as exc:
logging.exception("解析或入库失败: %s", exc)
end_of_records = bool(payload.get("endOfRecords"))
count = safe_int(payload.get("count"), 0)
logging.info(
"完成页: page=%s results=%s total_count_hint=%s end=%s",
page + 1,
len(results),
count,
end_of_records,
)
if end_of_records:
break
logging.info("search 模式完成,本次处理记录数: %s", total_saved)
def crawl_by_seeds(args, fetcher: GBIFFetcher, parser: GBIFParser, storage: SQLiteStorage) -> None:
"""
按标本号种子抓取。
"""
seeds = read_seed_file(args.seeds)
logging.info("读取种子数量: %s", len(seeds))
total_saved = 0
for index, catalog_number in enumerate(seeds, start=1):
logging.info(
"按标本号查询: %s/%s catalogNumber=%s",
index,
len(seeds),
catalog_number,
)
try:
payload = fetcher.search_occurrences(
offset=0,
limit=args.limit,
q=args.q,
country=args.country,
year_from=args.year_from,
year_to=args.year_to,
dataset_key=args.dataset_key,
catalog_number=catalog_number,
)
except FetchError as exc:
logging.error("标本号查询失败: catalogNumber=%s error=%s", catalog_number, exc)
continue
results = list(parser.iter_results(payload))
if not results:
logging.warning("没有查到记录: catalogNumber=%s", catalog_number)
continue
for item in results:
try:
record = parser.parse_occurrence(item)
if not record.source_key:
logging.warning("跳过缺少 source_key 的记录: catalogNumber=%s", catalog_number)
continue
storage.upsert_record(record)
total_saved += 1
except Exception as exc:
logging.exception("解析或入库失败: catalogNumber=%s error=%s", catalog_number, exc)
logging.info("seeds 模式完成,本次处理记录数: %s", total_saved)
def main() -> None:
arg_parser = build_arg_parser()
args = arg_parser.parse_args()
setup_logging(args.log)
logging.info("启动植物标本元数据采集任务")
logging.info("运行参数: %s", vars(args))
fetcher = GBIFFetcher(sleep_seconds=args.sleep)
parser = GBIFParser()
storage = SQLiteStorage(args.db)
storage.init_db()
try:
if args.mode == "search":
crawl_by_search(args, fetcher, parser, storage)
elif args.mode == "seeds":
crawl_by_seeds(args, fetcher, parser, storage)
else:
raise ValueError(f"未知模式: {args.mode}")
storage.export_csv(args.csv)
storage.export_json(args.json)
logging.info("当前数据库记录总数: %s", storage.count_records())
logging.info("CSV 已导出: %s", args.csv)
logging.info("JSON 已导出: %s", args.json)
finally:
storage.close()
logging.info("任务结束")
if __name__ == "__main__":
main()
9.3 启动方式
进入项目目录:
cd herbarium_meta_crawler
按搜索条件抓取,例如抓取植物保存标本记录,关键词为 Rosa,限制国家为中国,最多抓 2 页:
python -m src.main \
--mode search \
--q Rosa \
--country CN \
--year-from 1900 \
--year-to 2026 \
--limit 50 \
--max-pages 2 \
--sleep 0.8
Windows PowerShell 可以写成一行:
python -m src.main --mode search --q Rosa --country CN --year-from 1900 --year-to 2026 --limit 50 --max-pages 2 --sleep 0.8
按标本号种子抓取:
python -m src.main \
--mode seeds \
--seeds data/seeds.txt \
--limit 20 \
--sleep 1.0
9.4 输出在哪里
默认输出路径:
data/output/herbarium.sqlite3
data/output/herbarium_records.csv
data/output/herbarium_records.json
logs/crawler.log
查看 SQLite 记录:
sqlite3 data/output/herbarium.sqlite3
进入 SQLite 后执行:
SELECT
specimen_number,
scientific_name,
collection_place,
collection_date,
collector,
collection_link
FROM herbarium_records
LIMIT 5;
9.5 示例结果
下面是导出 CSV 的字段形态示例。真实结果会根据你运行时的查询条件、数据源更新情况而变化。
| specimen_number | scientific_name | collection_place | collection_date | collector | collection_link |
|---|---|---|---|---|---|
| K000123456 | Rosa chinensis Jacq. | China / Yunnan / Dali / Cangshan | 1987-05-21 | S. Wang | https://www.gbif.org/occurrence/123456789 |
| PE000112233 | Ficus virens Aiton | China / Guangxi / Guilin | 1994-08 | L. Chen | https://www.gbif.org/occurrence/223456789 |
| E000987654 | Camellia sinensis (L.) Kuntze | China / Zhejiang / Hangzhou | 1978 | H. Li | https://www.gbif.org/occurrence/323456789 |
| IBSC000456 | Rhododendron simsii Planch. | China / Guangdong / Guangzhou | 2001-03-12 | Q. Zhao | https://www.gbif.org/occurrence/423456789 |
| NY00123456 | Magnolia denudata Desr. | China / Hubei / Wuhan | 1965-04-18 | Unknown | https://www.gbif.org/occurrence/523456789 |
如果实际结果里 collector 为空,不代表代码错了。很多历史标本记录确实没有完整采集人字段,或者采集者信息被放在不同字段里。此时应该保留空值,而不是随意填“Unknown”。如果你确实要统一展示,可以在分析阶段再把空值渲染为“Unknown”。
🔟 常见问题与排错
10.1 403 怎么办?
403 表示拒绝访问。常见原因包括:
请求头不完整
访问路径不允许
请求方式不符合接口要求
访问频率异常
目标资源本身有限制
处理思路:
第一,检查 URL 是否正确。很多 403 不是反爬,而是路径错了。
第二,设置清晰的 User-Agent。不要使用空 UA,也不要写明显异常的 UA。
第三,降低频率。把 --sleep 调大,比如:
python -m src.main --mode search --q Rosa --sleep 2.0
第四,查看目标站点的 API 文档、robots.txt 或使用条款。如果不允许访问,就停止抓取。
不建议一遇到 403 就上代理。代理不能解决合规问题,也不能解决代码错误。先确认自己访问的是公开、允许、合理的接口。
10.2 429 怎么办?
429 表示请求过多。本文代码已经对 429 做了退避处理,但你仍然应该降低整体速度。
可以这样做:
python -m src.main --mode search --q Rosa --limit 20 --max-pages 5 --sleep 2.5
也可以减少任务范围:
python -m src.main --mode search --q Rosa --country CN --year-from 2000 --year-to 2020
不要把 limit 调得过大,也不要同时启动很多个进程。对公开数据平台来说,温和访问才是长期可用的方式。
10.3 HTML 抓到空壳怎么办?
如果你抓的是某个标本馆网页,而不是本文的 API,可能会遇到 HTML 里没有数据的情况。典型表现是:
requests 返回了 HTML
BeautifulSoup 找不到字段
浏览器里能看到数据
查看网页源码却看不到数据
这通常说明页面是动态渲染的。解决思路有三个:
第一,打开浏览器开发者工具,查看 Network 面板,找到真正返回 JSON 的接口。
第二,优先请求接口,而不是解析渲染后的页面。
第三,如果接口难以复用,再考虑 Playwright。示例思路:
from playwright.sync_api import sync_playwright
def render_page(url: str) -> str:
with sync_playwright() as p:
browser = p.chromium.launch(headless=True)
page = browser.new_page()
page.goto(url, wait_until="networkidle", timeout=30000)
html = page.content()
browser.close()
return html
但 Playwright 不应该作为首选。浏览器自动化资源开销更高,速度更慢,也更容易造成不必要的访问压力。
10.4 解析报错怎么办?
常见错误:
KeyError
TypeError
JSONDecodeError
IndexError
如果是 KeyError,说明字段不一定存在,不要用 item["field"],改用 item.get("field")。
如果是 TypeError,可能是字段类型与你预期不同。例如 recordedBy 有时可能是字符串,有时可能是列表。本文的 clean_text() 已经兼容了列表。
如果是 JSONDecodeError,说明响应不是 JSON。可能是服务异常、返回了错误页、被限流,或者 URL 写错。可以打印前 300 个字符排查:
print(response.status_code)
print(response.text[:300])
如果是 IndexError,多半是你假设列表一定有元素。正确写法是先判断:
items = payload.get("results") or []
if not items:
return
10.5 编码和乱码如何处理?
API JSON 通常是 UTF-8。写文件时建议明确指定编码:
open(path, "w", encoding="utf-8")
如果 CSV 要给 Excel 打开,可以用:
encoding="utf-8-sig"
本文导出 CSV 时已经这样处理:
with path.open("w", encoding="utf-8-sig", newline="") as f:
...
如果你从 HTML 页面解析中文,先看响应编码:
print(response.encoding)
print(response.apparent_encoding)
必要时手动设置:
response.encoding = "utf-8"
10.6 为什么有些记录没有采集地?
标本元数据来自不同机构、不同历史时期,完整度差异很大。有些记录只有国家,没有具体 locality;有些有经纬度,但 locality 缺失;有些出于保护原因不会公开精确位置。
本文策略是:
不乱补
不猜测
有多少公开字段就保存多少
如果后续做数据分析,可以把缺失程度作为一个指标。比如统计不同数据集的地点字段完整率,这本身就是一个有价值的质量分析方向。
10.7 为什么同一个标本号出现多条记录?
可能原因包括:
不同机构使用了相同 catalogNumber
同一记录被不同数据集重复发布
标本有副份或复份记录
历史编号和新编号同时存在
所以本文没有把 catalogNumber 当成唯一主键,而是使用 source_key。如果你需要更严格的去重,可以增加复合键:
UNIQUE(institution_code, collection_code, specimen_number)
但我建议不要一开始就这么做,因为很多记录的 institution_code 或 collection_code 也可能为空。更稳妥的方式是先保留原始数据,再在分析阶段做去重策略比较。
1️⃣1️⃣ 进阶优化
11.1 并发优化
这个项目可以并发,但不建议一开始就并发。等你确认字段解析稳定、入库逻辑正确、频率合规后,再考虑轻量并发。
如果要并发,建议使用线程池,因为 requests 是阻塞 I/O,线程池足够用了。
示意代码:
from concurrent.futures import ThreadPoolExecutor, as_completed
def fetch_one_detail(fetcher, key):
return fetcher.get_occurrence_detail(key)
def fetch_details_concurrently(fetcher, keys, max_workers=3):
results = []
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_map = {
executor.submit(fetch_one_detail, fetcher, key): key
for key in keys
}
for future in as_completed(future_map):
key = future_map[future]
try:
data = future.result()
results.append(data)
except Exception as exc:
print(f"detail failed key={key} error={exc}")
return results
注意这里的 max_workers=3。公开 API 项目没必要开到 50 或 100。并发越大,失败率越高,日志越乱,排错越痛苦。
11.2 asyncio 版本思路
如果你想练异步,可以用 httpx.AsyncClient 或 aiohttp。但异步不等于无限并发。应该配合信号量:
import asyncio
import httpx
async def fetch_json(client, url, params, sem):
async with sem:
resp = await client.get(url, params=params, timeout=30)
resp.raise_for_status()
await asyncio.sleep(0.5)
return resp.json()
async def main_async():
sem = asyncio.Semaphore(3)
async with httpx.AsyncClient(headers={"User-Agent": "HerbariumMetaCrawler/1.0"}) as client:
tasks = [
fetch_json(client, "https://api.gbif.org/v1/occurrence/search", {"limit": 20, "offset": i * 20}, sem)
for i in range(5)
]
results = await asyncio.gather(*tasks, return_exceptions=True)
return results
如果只是几百到几千条记录,本文同步版已经够用。异步适合更大规模任务,但也更考验错误处理。
11.3 断点续跑
本文已经通过 SQLite 的唯一键支持“重复运行不重复插入”。但严格来说,这还不算完整断点续跑。完整断点续跑应该记录游标,例如:
当前 offset
当前 catalogNumber
当前数据集 key
失败次数
最后错误信息
最后运行时间
可以增加一张任务表:
CREATE TABLE IF NOT EXISTS crawl_tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_type TEXT NOT NULL,
task_value TEXT NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
retry_count INTEGER NOT NULL DEFAULT 0,
last_error TEXT,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL,
UNIQUE(task_type, task_value)
);
任务状态可以设计成:
pending
running
success
failed
skipped
这样即使程序中断,下次也可以只捞 pending 和 failed 的任务继续跑。
11.4 失败任务单独保存
有些失败不是临时网络问题,而是数据异常。建议把失败任务写入文件:
def append_failed_task(path, task, error):
with open(path, "a", encoding="utf-8") as f:
f.write(f"{task}\t{error}\n")
或者入库:
CREATE TABLE IF NOT EXISTS failed_tasks (
id INTEGER PRIMARY KEY AUTOINCREMENT,
task_value TEXT NOT NULL,
error_message TEXT,
created_at TEXT NOT NULL
);
失败任务不要只打印在控制台。控制台信息一滚过去就没了,真正排错时很难找。
11.5 日志与监控
本文已经写了基础日志。实际项目里,我会关注这些指标:
总请求数
成功请求数
失败请求数
429 次数
平均响应时间
新增记录数
更新记录数
空字段比例
简单一点可以写日志,复杂一点可以写入 SQLite 表:
CREATE TABLE IF NOT EXISTS crawl_metrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
metric_name TEXT NOT NULL,
metric_value REAL NOT NULL,
created_at TEXT NOT NULL
);
空字段比例很有用。例如:
SELECT
COUNT(*) AS total,
SUM(CASE WHEN collector = '' THEN 1 ELSE 0 END) AS missing_collector,
ROUND(
SUM(CASE WHEN collector = '' THEN 1 ELSE 0 END) * 100.0 / COUNT(*),
2
) AS missing_collector_rate
FROM herbarium_records;
这能告诉你采集人字段的缺失率。如果缺失率很高,你就知道不能把采集人作为强分析字段。
11.6 定时任务
如果你要定期更新数据,可以用 cron。
Linux/macOS 编辑定时任务:
crontab -e
每天凌晨 2 点运行:
0 2 * * * cd /path/to/herbarium_meta_crawler && /path/to/.venv/bin/python -m src.main --mode search --q Rosa --limit 50 --max-pages 5 --sleep 1.0 >> logs/cron.log 2>&1
如果是更正式的数据管道,可以考虑 Airflow、Prefect 或 Dagster。但对本文这种项目,cron 已经够用了。不要一开始就上复杂调度系统,先把数据链路跑稳。
11.7 数据质量增强
基础字段抓完后,可以继续做数据质量增强。
例如,地点字段拆分:
country
stateProvince
county
locality
采集日期拆分:
year
month
day
学名字段拆分:
genus
species
infraspecificEpithet
taxonRank
还可以做字段完整度评分:
def completeness_score(row: dict) -> float:
important_fields = [
"specimen_number",
"scientific_name",
"collection_place",
"collection_date",
"collector",
"collection_link",
]
filled = sum(1 for field in important_fields if row.get(field))
return filled / len(important_fields)
入库时增加一列:
completeness_score
这样后续筛选高质量记录就很方便。
11.8 原始数据归档
标准化字段很适合分析,但原始 JSON 也值得保留。因为你今天不需要的字段,明天可能就有用了。可以增加一个 raw 表:
CREATE TABLE IF NOT EXISTS raw_occurrences (
source_key TEXT PRIMARY KEY,
raw_json TEXT NOT NULL,
raw_hash TEXT NOT NULL,
created_at TEXT NOT NULL,
updated_at TEXT NOT NULL
);
保存原始 JSON:
import json
def save_raw(conn, source_key, item, raw_hash, now):
sql = """
INSERT INTO raw_occurrences (source_key, raw_json, raw_hash, created_at, updated_at)
VALUES (?, ?, ?, ?, ?)
ON CONFLICT(source_key) DO UPDATE SET
raw_json=excluded.raw_json,
raw_hash=excluded.raw_hash,
updated_at=excluded.updated_at
"""
conn.execute(
sql,
(
source_key,
json.dumps(item, ensure_ascii=False),
raw_hash,
now,
now,
),
)
我自己的经验是:只保存清洗结果,后面一定会后悔。原始数据占空间不算夸张,但能省下很多回头重抓的时间。
11.9 从 requests 迁移到 Scrapy
如果后续目标变成多个标本馆 HTML 页面,Scrapy 会更合适。Scrapy 的优势是:
内置调度器
自动去重
中间件机制
Pipeline 入库
日志和统计更完整
一个 Scrapy spider 的结构大概是:
import scrapy
class HerbariumSpider(scrapy.Spider):
name = "herbarium"
allowed_domains = ["example.org"]
start_urls = ["https://example.org/search?q=Rosa"]
def parse(self, response):
detail_links = response.css("a.record-link::attr(href)").getall()
for link in detail_links:
yield response.follow(link, callback=self.parse_detail)
next_page = response.css("a.next::attr(href)").get()
if next_page:
yield response.follow(next_page, callback=self.parse)
def parse_detail(self, response):
yield {
"specimen_number": response.css(".catalog-number::text").get(default="").strip(),
"scientific_name": response.css(".scientific-name::text").get(default="").strip(),
"collection_place": response.css(".locality::text").get(default="").strip(),
"collection_date": response.css(".event-date::text").get(default="").strip(),
"collector": response.css(".recorded-by::text").get(default="").strip(),
"collection_link": response.url,
}
但注意,这只是 HTML 站点的写法,不适合照搬到本文 API 项目里。
11.10 使用 Playwright 的场景
Playwright 适合这些情况:
页面必须执行 JavaScript 才能出现数据
接口参数经过前端复杂生成
页面需要点击、滚动、展开
需要截图或验证渲染结果
不适合这些情况:
已有公开 JSON API
只需要分页请求
只需要字段抽取
数据量较大
Playwright 是好工具,但不是万能钥匙。用错场景,会让项目又慢又重。
1️⃣2️⃣ 总结与延伸阅读
本文完成了一套公开植物标本元数据采集流程。我们没有把重点放在“如何绕过限制”,而是放在更值得练习的地方:
如何选择合适的数据入口
如何设计请求层
如何做失败重试和退避
如何解析缺失字段很多的公开数据
如何规范化标本号、学名、采集地、采集时间、采集人和馆藏链接
如何用 SQLite 做去重和断点续跑基础
如何导出 CSV/JSON 供后续分析
这类项目的核心不在于代码有多花,而在于数据链路是否稳。标本元数据非常适合练规范化思维:字段看似简单,但每一个字段都可能有历史包袱和数据质量问题。比如标本号不一定全局唯一,采集时间可能只有年份,采集地可能缺少 locality,采集人可能有多个写法。你越认真处理这些细节,最后得到的数据集越有用。
下一步可以继续做几件事。
第一,把原始 JSON 也保存下来。
这样后续想增加字段,不必重新抓。
第二,增加数据质量评分。
比如统计每条记录六个核心字段的完整度,筛选高质量记录做分析。
第三,迁移到 Scrapy。
如果目标变成多个 HTML 标本馆站点,Scrapy 的调度、去重和 Pipeline 会更方便。
第四,引入 Playwright。
如果目标站点是动态渲染页面,Playwright 可以作为补充方案。
第五,做可视化分析。
例如按年份统计采集数量,按地区统计记录密度,按学名统计热门类群,按机构统计字段完整度。
最后补一句个人感受:爬虫写到后面,真正拉开差距的不是会不会发请求,而是能不能把数据处理得干净、可追溯、可复查。植物标本元数据这种题材很适合训练这件事。它没有太多噱头,但它能逼你认真面对编号、字段、缺失、去重、合规和存储这些基本功。基本功扎实了,换成其他公开目录类数据,思路也能复用。
🌟 文末
好啦~以上就是本期的全部内容啦!如果你在实践过程中遇到任何疑问,欢迎在评论区留言交流,我看到都会尽量回复~咱们下期见!
小伙伴们在批阅的过程中,如果觉得文章不错,欢迎点赞、收藏、关注哦~
三连就是对我写作道路上最好的鼓励与支持! ❤️🔥
✅ 专栏持续更新中|建议收藏 + 订阅
墙裂推荐订阅专栏 👉 《Python爬虫实战》,本专栏秉承着以“入门 → 进阶 → 工程化 → 项目落地”的路线持续更新,争取让每一期内容都做到:
✅ 讲得清楚(原理)|✅ 跑得起来(代码)|✅ 用得上(场景)|✅ 扛得住(工程化)
📣 想系统提升的小伙伴:强烈建议先订阅专栏 《Python爬虫实战》,再按目录大纲顺序学习,效率十倍上升~
✅ 互动征集
想让我把【某站点/某反爬/某验证码/某分布式方案】等写成某期实战?
评论区留言告诉我你的需求,我会优先安排实现(更新)哒~
⭐️ 若喜欢我,就请关注我叭~(更新不迷路)
⭐️ 若对你有用,就请点赞支持一下叭~(给我一点点动力)
⭐️ 若有疑问,就请评论留言告诉我叭~(我会补坑 & 更新迭代)
✅ 免责声明
本文爬虫思路、相关技术和代码仅用于学习参考,对阅读本文后的进行爬虫行为的用户本作者不承担任何法律责任。
使用或者参考本项目即表示您已阅读并同意以下条款:
- 合法使用: 不得将本项目用于任何违法、违规或侵犯他人权益的行为,包括但不限于网络攻击、诈骗、绕过身份验证、未经授权的数据抓取等。
- 风险自负: 任何因使用本项目而产生的法律责任、技术风险或经济损失,由使用者自行承担,项目作者不承担任何形式的责任。
- 禁止滥用: 不得将本项目用于违法牟利、黑产活动或其他不当商业用途。
- 使用或者参考本项目即视为同意上述条款,即 “谁使用,谁负责” 。如不同意,请立即停止使用并删除本项目。!!!
更多推荐
所有评论(0)