㊗️本期内容已收录至专栏《Python爬虫实战》,持续完善知识体系与项目实战,建议先订阅收藏,后续查阅更方便~
㊙️本期爬虫难度指数:⭐⭐⭐☆☆(进阶级)
🉐福利: 一次订阅后,专栏内的所有文章可永久免费看,持续更新中,保底1000+(篇)硬核实战内容。

全文目录:

🌟 开篇语

哈喽,各位小伙伴们你们好呀~我是【喵手】。
运营社区: C站 / 掘金 / 腾讯云 / 阿里云 / 华为云 / 51CTO
欢迎大家常来逛逛,一起学习,一起进步~🌟

  我长期专注 Python 爬虫工程化实战,主理专栏👉 《Python爬虫实战》:从采集策略反爬对抗,从数据清洗分布式调度,持续输出可复用的方法论与可落地案例。内容主打一个“能跑、能用、能扩展”,让数据价值真正做到——抓得到、洗得净、用得上

  📌 专栏食用指南(建议收藏)

  • ✅ 入门基础:环境搭建 / 请求与解析 / 数据落库
  • ✅ 进阶提升:登录鉴权 / 动态渲染 / 反爬对抗
  • ✅ 工程实战:异步并发 / 分布式调度 / 监控与容错
  • ✅ 项目落地:数据治理 / 可视化分析 / 场景化应用

📣 专栏推广时间:如果你想系统学爬虫,而不是碎片化东拼西凑,欢迎订阅专栏👉《Python爬虫实战》👈,一次订阅后,专栏内的所有文章可永久免费阅读,持续更新中。
  
💕订阅后更新会优先推送,按目录学习更高效💯~

0️⃣ 前言(Preface)

这一篇我准备爬取 公开植物标本条目元数据,使用 Python 的 requests 访问公开 API,用标准库完成解析、清洗、去重、SQLite 存储与 CSV/JSON 导出,最终得到一份字段统一、可复用、可继续分析的植物标本元数据集。

读完本文,你大概能拿到三样东西:

第一,理解“唯一编号驱动型抓取”到底怎么设计,不是随便翻页保存 HTML,而是围绕标本号、馆藏编号、详情链接或 occurrence key 建立稳定的数据链路。

第二,掌握一个可运行的 Python 爬虫项目结构,从请求层、解析层、存储层到命令行入口都完整拆开,不再把所有逻辑堆在一个脚本里。

第三,得到一份适合二次分析的植物标本元数据,包括:标本号、学名、采集地、采集时间、采集人、馆藏链接等核心字段。

这类项目看起来不复杂,但真正写起来很考验工程习惯。尤其是标本数据,字段缺失、日期格式不统一、地点写法混杂、同一个标本号在不同机构里可能重复,这些细节会直接决定最后数据集是否可靠。我个人比较喜欢这种题材:它没有炫技,但很适合练基本功。

1️⃣ 摘要(Abstract)

本文以公开植物标本条目元数据为目标,使用 Python、requests、SQLite、CSV/JSON 等工具完成一个可复现的元数据抓取项目,最终产出规范化的植物标本数据集。

读完本文,你可以获得:

  1. 一套“采集 → 解析 → 清洗 → 存储 → 导出”的完整爬虫流程。
  2. 一份围绕唯一编号去重、断点续跑、失败重试设计的项目代码。
  3. 一种适合标本馆、博物馆、公开目录类站点的数据规范化思路。

本文选择 API 优先的方案,而不是强行解析网页。原因很简单:公开数据平台通常会提供结构化接口,直接读取 JSON 比抓网页更稳定,也更容易控制访问频率。网页更适合给人看,API 更适合给程序读。能走公开 API,就不要拿 HTML 选择器硬扛,这是我做这类数据采集时的基本习惯。

2️⃣ 背景与需求(Why)

植物标本馆保存了大量植物实体标本。每一份标本往往对应一条或多条公开元数据记录,常见信息包括标本编号、物种学名、采集地点、采集日期、采集人、馆藏机构、数据来源链接等。对研究者、数据分析人员、信息整理人员来说,这类数据价值很高。

为什么要爬这些数据?主要有三个原因。

第一,做数据分析。
例如统计某一植物类群在不同国家、地区、年份的采集情况;分析某些属种的采集时间跨度;观察不同标本馆的数字化记录完整度。标本数据不是普通商品数据,它背后往往有历史、地理、分类学和采集事件信息。

第二,做信息聚合。
公开标本记录分散在不同机构、不同平台中。如果每次都靠网页搜索,效率很低。把公开元数据抓下来,做成一个本地 SQLite 数据库或 CSV 文件,就可以按学名、采集人、地点、时间快速筛选。

第三,做自动化整理。
很多数据清洗工作并不适合手工完成。比如统一日期格式、合并地点字段、剔除空值、去重、生成馆藏链接、记录失败任务,这些工作交给程序更稳定。

本文的目标字段如下:

中文字段 英文字段 说明
标本号 specimen_number 优先使用 catalogNumber,如果缺失则回退到 occurrenceID 或平台 key
学名 scientific_name 优先使用公开记录中的 scientificName
采集地 collection_place 由国家、省州、县区、locality 等字段拼接
采集时间 collection_date 优先使用 eventDate,否则尝试用 year/month/day 拼接
采集人 collector 对应采集者字段,常见为 recordedBy
馆藏链接 collection_link 指向原始公开记录页面,便于复查
数据源 key source_key 平台侧唯一 key,用于去重和断点续跑
机构代码 institution_code 标本馆或机构缩写
馆藏代码 collection_code 馆藏集合代码
数据集 key dataset_key 数据集标识
数据许可证 license 记录许可信息,便于后续合规使用

其中前六个字段是本文主题要求的核心字段,后面几个是我个人强烈建议保留的辅助字段。原因很现实:只保留中文展示字段,短期看很清爽,长期用起来会很痛苦。比如“标本号”在不同机构中未必全局唯一,不带机构代码、数据源 key,后期排错会比较麻烦。

3️⃣ 合规与注意事项

爬虫不是“能访问就随便抓”。技术分享必须把边界说清楚。本文只针对公开可访问的条目元数据,不采集敏感信息,不绕过登录,不绕过付费限制,不规避反爬策略,也不做高并发压力访问。

3.1 robots.txt 基本说明

robots.txt 是网站放在根目录下的爬虫访问声明文件,常见路径类似:

https://example.org/robots.txt

它通常会告诉爬虫哪些路径允许访问,哪些路径不建议访问。对于传统 HTML 页面采集,正式运行前应该先看 robots.txt。如果目标站点明确禁止某些路径,就不要抓那些路径。

本文使用的是公开 API,并不是从页面里暴力解析标本详情 HTML。即便如此,依旧应该遵守服务条款、API 文档和合理访问频率。API 能访问,不代表可以无限请求。

3.2 频率控制

本文代码默认每次请求之间会暂停一小段时间,并且提供 --sleep 参数。建议从保守频率开始,比如:

python -m src.main --mode search --limit 100 --sleep 0.8

不要一上来就开几十个线程、几百个协程。公开数据平台不是你的压测对象,尤其是科研数据平台,很多服务资源需要供全球用户共同使用。

3.3 不做攻击式并发

本文代码没有默认开启并发。后面进阶部分会提到并发优化,但并发只是为了提升可控范围内的效率,不是为了绕过限制。真正做工程时,我更关心稳定性,而不是单次跑得多快。一个小时慢慢抓完并且数据干净,比三分钟抓崩、丢字段、触发限制要好得多。

3.4 不采集敏感信息

本文只处理公开标本元数据,例如学名、采集地、采集时间、采集人、公开记录链接等。对于涉及敏感物种、精确坐标、受保护地点、个人联系方式等信息,应遵循数据源本身的公开范围和使用要求。公开页面没有展示的,不要尝试通过接口参数、隐藏字段或非正常方式去挖。

3.5 不绕过付费或登录限制

如果某个标本馆要求登录、申请授权或限制下载,那就按规则申请,不要绕过。本文的做法是中性的、工程化的:只使用公开接口,只采集公开字段,只做合理频控,只做技术学习和数据整理。

4️⃣ 技术选型与整体流程(What/How)

4.1 静态、动态、API:本文属于哪一种?

常见爬虫目标大致分三类。

第一类是静态 HTML。
页面源码里直接有数据,用 requests + BeautifulSoup/lxml 就能解析。这种简单,但选择器容易受页面改版影响。

第二类是动态渲染页面。
网页首屏 HTML 里没有真实数据,数据由 JavaScript 后续请求接口加载。遇到这种页面,可以分析接口,也可以使用 Playwright/Selenium 渲染页面。我的习惯是优先找接口,最后才用浏览器自动化。

第三类是公开 API。
服务方直接提供结构化 JSON 接口。本文属于这一类。API 返回的数据结构更稳定,字段更明确,也更适合规范化入库。

本文使用 API-first 的采集方式,主要工具是:

requests      负责 HTTP 请求
sqlite3       负责本地数据库
csv/json      负责导出
argparse      负责命令行参数
logging       负责运行日志
hashlib       负责内容 hash
time/random   负责频控与退避

没有引入 Scrapy,是因为这个项目的核心不是大规模站点抓取,而是围绕公开 API 做规范化数据集。Scrapy 很强,但初始结构偏重;本文希望读者复制后可以直接跑起来。

没有引入 Playwright,是因为本文不需要渲染网页。只要 API 可用,就不应该为了“看起来像爬虫”而使用浏览器自动化。Playwright 适合动态页面、登录态交互、复杂前端渲染,不适合简单 JSON API 数据采集。

4.2 整体流程

本文的流程可以写成一句话:

种子编号 / 查询条件
        ↓
请求公开 API
        ↓
解析 JSON
        ↓
字段清洗与规范化
        ↓
SQLite 去重入库
        ↓
导出 CSV / JSON
        ↓
复查失败任务与日志

如果换成更工程化的模块划分,就是:

Fetcher  请求层:负责发请求、重试、频控、处理状态码
Parser   解析层:负责从 JSON 中抽取字段并规范化
Storage  存储层:负责 SQLite 建表、去重、导出
Main     入口层:负责命令行参数和流程编排

我不太喜欢把请求、解析、存储全塞进一个函数。短脚本当然可以这么写,但一旦字段增加、失败重试增加、导出格式增加,单文件脚本会很快失控。这个项目虽然不大,但我们仍然按照小型工程来组织。

5️⃣ 环境准备与依赖安装(可复现)

5.1 Python 版本

推荐使用:

Python 3.10+

我建议至少使用 Python 3.10。原因不是语法多先进,而是现在很多依赖和系统环境对 3.8 以下已经不太友好。用 3.10 或 3.11 基本能减少很多无意义的兼容问题。

查看版本:

python --version

5.2 创建虚拟环境

Linux/macOS:

python -m venv .venv
source .venv/bin/activate

Windows PowerShell:

python -m venv .venv
.venv\Scripts\Activate.ps1

5.3 安装依赖

本文尽量少用第三方库。核心依赖只有 requests

requirements.txt

requests==2.32.3

安装:

pip install -r requirements.txt

如果你希望命令行进度条更好看,可以额外安装 tqdm,但本文代码不强制依赖。

5.4 推荐项目结构

herbarium_meta_crawler/
├── README.md
├── requirements.txt
├── data/
│   ├── seeds.txt
│   └── output/
│       ├── herbarium_records.csv
│       ├── herbarium_records.json
│       └── herbarium.sqlite3
├── logs/
│   └── crawler.log
└── src/
    ├── __init__.py
    ├── config.py
    ├── fetcher.py
    ├── parser.py
    ├── storage.py
    ├── utils.py
    └── main.py

创建目录:

mkdir -p herbarium_meta_crawler/data/output
mkdir -p herbarium_meta_crawler/logs
mkdir -p herbarium_meta_crawler/src
touch herbarium_meta_crawler/src/__init__.py

Windows 下可以手动创建,也可以用 PowerShell:

New-Item -ItemType Directory -Force herbarium_meta_crawler\data\output
New-Item -ItemType Directory -Force herbarium_meta_crawler\logs
New-Item -ItemType Directory -Force herbarium_meta_crawler\src
New-Item -ItemType File -Force herbarium_meta_crawler\src\__init__.py

6️⃣ 核心实现:请求层(Fetcher)

请求层负责所有 HTTP 细节,包括 headers、timeout、session、失败重试和退避。这个模块写好以后,解析层就不用关心网络异常。

6.1 headers:User-Agent、Referer、Accept

请求公开 API 时,最基本的 headers 应该包含:

User-Agent
Accept
Referer

User-Agent 不建议伪装成浏览器。很多人喜欢复制浏览器 UA,但对于 API 项目,我更推荐写清楚项目名和联系方式。比如:

HerbariumMetaCrawler/1.0 (+mailto:your_email@example.com)

这不是为了“更像真人”,而是为了更透明。如果服务方发现异常访问,至少知道如何联系你。

Referer 可以放公开网站首页或项目说明地址。对 API 来说它通常不是必需字段,但保留它可以让请求更完整。

6.2 timeout

不要让请求无限等待。本文设置:

timeout=(5, 25)

含义是连接超时 5 秒,读取超时 25 秒。网络波动是常态,不设置 timeout 的爬虫迟早会卡死在某个请求上。

6.3 session/cookie

本文使用 requests.Session() 复用连接,提高稳定性。由于采集的是公开 API,不需要登录 cookie。代码里保留 session,但不写任何绕过登录或模拟账号行为。

6.4 失败处理:重试与退避

常见可重试状态码:

429 Too Many Requests
500 Internal Server Error
502 Bad Gateway
503 Service Unavailable
504 Gateway Timeout

遇到这些状态码,不要立刻密集重试。正确做法是:

第一次失败:等待 1 秒左右
第二次失败:等待 2 秒左右
第三次失败:等待 4 秒左右
再加一点随机抖动,避免固定节奏请求

如果响应头里有 Retry-After,优先尊重它。

下面是请求层完整代码。

src/config.py

from pathlib import Path

PROJECT_ROOT = Path(__file__).resolve().parents[1]

DATA_DIR = PROJECT_ROOT / "data"
OUTPUT_DIR = DATA_DIR / "output"
LOG_DIR = PROJECT_ROOT / "logs"

OUTPUT_DIR.mkdir(parents=True, exist_ok=True)
LOG_DIR.mkdir(parents=True, exist_ok=True)

GBIF_API_BASE = "https://api.gbif.org/v1"
GBIF_OCCURRENCE_SEARCH = f"{GBIF_API_BASE}/occurrence/search"
GBIF_OCCURRENCE_DETAIL = f"{GBIF_API_BASE}/occurrence"

DEFAULT_DB_PATH = OUTPUT_DIR / "herbarium.sqlite3"
DEFAULT_CSV_PATH = OUTPUT_DIR / "herbarium_records.csv"
DEFAULT_JSON_PATH = OUTPUT_DIR / "herbarium_records.json"
DEFAULT_LOG_PATH = LOG_DIR / "crawler.log"

DEFAULT_USER_AGENT = "HerbariumMetaCrawler/1.0 (+mailto:example@example.com)"

DEFAULT_HEADERS = {
    "User-Agent": DEFAULT_USER_AGENT,
    "Accept": "application/json,text/plain,*/*",
    "Referer": "https://www.gbif.org/",
}

REQUEST_TIMEOUT = (5, 25)

RETRY_STATUS_CODES = {429, 500, 502, 503, 504}

这里的邮箱请改成你自己的联系方式。如果只是本地练习,可以保留示例值,但如果你准备长期跑任务,最好写真实可联系邮箱。

src/fetcher.py

import logging
import random
import time
from typing import Any, Dict, Optional

import requests

from .config import (
    DEFAULT_HEADERS,
    GBIF_OCCURRENCE_DETAIL,
    GBIF_OCCURRENCE_SEARCH,
    REQUEST_TIMEOUT,
    RETRY_STATUS_CODES,
)


class FetchError(RuntimeError):
    """请求失败时抛出的异常。"""


class GBIFFetcher:
    """
    GBIF Occurrence API 请求封装。

    这个类只负责网络请求,不负责字段清洗。
    """

    def __init__(
        self,
        sleep_seconds: float = 0.6,
        max_retries: int = 3,
        timeout=REQUEST_TIMEOUT,
        headers: Optional[Dict[str, str]] = None,
    ) -> None:
        self.sleep_seconds = sleep_seconds
        self.max_retries = max_retries
        self.timeout = timeout

        self.session = requests.Session()
        self.session.headers.update(headers or DEFAULT_HEADERS)

    def _sleep_between_requests(self) -> None:
        """
        普通请求之间的基础停顿。
        加一点随机抖动,避免请求节奏过于机械。
        """
        if self.sleep_seconds <= 0:
            return
        jitter = random.uniform(0, self.sleep_seconds * 0.3)
        time.sleep(self.sleep_seconds + jitter)

    def _get_json(self, url: str, params: Optional[Dict[str, Any]] = None) -> Dict[str, Any]:
        """
        GET JSON,并处理重试、退避、状态码。
        """
        last_error: Optional[Exception] = None

        for attempt in range(1, self.max_retries + 1):
            self._sleep_between_requests()

            try:
                logging.debug("GET %s params=%s attempt=%s", url, params, attempt)
                response = self.session.get(url, params=params, timeout=self.timeout)

                if response.status_code == 200:
                    try:
                        return response.json()
                    except ValueError as exc:
                        raise FetchError(f"响应不是合法 JSON: {response.text[:200]}") from exc

                if response.status_code in RETRY_STATUS_CODES:
                    retry_after = response.headers.get("Retry-After")
                    wait_seconds = self._compute_backoff(attempt, retry_after)
                    logging.warning(
                        "可重试状态码: status=%s url=%s wait=%.2fs",
                        response.status_code,
                        response.url,
                        wait_seconds,
                    )
                    time.sleep(wait_seconds)
                    continue

                raise FetchError(
                    f"请求失败: status={response.status_code}, "
                    f"url={response.url}, body={response.text[:300]}"
                )

            except (requests.Timeout, requests.ConnectionError) as exc:
                last_error = exc
                wait_seconds = self._compute_backoff(attempt)
                logging.warning(
                    "网络异常: %s attempt=%s wait=%.2fs",
                    repr(exc),
                    attempt,
                    wait_seconds,
                )
                time.sleep(wait_seconds)

            except requests.RequestException as exc:
                last_error = exc
                wait_seconds = self._compute_backoff(attempt)
                logging.warning(
                    "请求异常: %s attempt=%s wait=%.2fs",
                    repr(exc),
                    attempt,
                    wait_seconds,
                )
                time.sleep(wait_seconds)

        raise FetchError(f"超过最大重试次数,最后异常: {last_error}")

    @staticmethod
    def _compute_backoff(attempt: int, retry_after: Optional[str] = None) -> float:
        """
        计算退避时间。
        如果服务端返回 Retry-After,并且是数字,则优先使用。
        """
        if retry_after and retry_after.isdigit():
            return min(float(retry_after), 60.0)

        base = 2 ** (attempt - 1)
        jitter = random.uniform(0, 0.8)
        return min(base + jitter, 30.0)

    def search_occurrences(
        self,
        offset: int,
        limit: int,
        q: Optional[str] = None,
        country: Optional[str] = None,
        year_from: Optional[int] = None,
        year_to: Optional[int] = None,
        dataset_key: Optional[str] = None,
        catalog_number: Optional[str] = None,
    ) -> Dict[str, Any]:
        """
        检索植物保存标本记录。

        basisOfRecord=PRESERVED_SPECIMEN 表示保存标本。
        kingdomKey=6 通常用于限制植物界记录。
        """
        params: Dict[str, Any] = {
            "basisOfRecord": "PRESERVED_SPECIMEN",
            "kingdomKey": 6,
            "limit": limit,
            "offset": offset,
        }

        if q:
            params["q"] = q

        if country:
            params["country"] = country.upper()

        if dataset_key:
            params["datasetKey"] = dataset_key

        if catalog_number:
            params["catalogNumber"] = catalog_number

        if year_from and year_to:
            params["eventDate"] = f"{year_from},{year_to}"
        elif year_from:
            params["eventDate"] = f"{year_from},"
        elif year_to:
            params["eventDate"] = f",{year_to}"

        return self._get_json(GBIF_OCCURRENCE_SEARCH, params=params)

    def get_occurrence_detail(self, key: str | int) -> Dict[str, Any]:
        """
        根据 GBIF occurrence key 获取详情。
        """
        url = f"{GBIF_OCCURRENCE_DETAIL}/{key}"
        return self._get_json(url)

这一层最重要的不是“能请求”,而是“失败时知道为什么失败,并且不把服务打爆”。爬虫稳定性大部分来自这些不起眼的细节。

7️⃣ 核心实现:解析层(Parser)

解析层负责把 API 返回的 JSON 转成我们自己的数据结构。不要把外部接口字段直接当作数据库字段,因为外部字段可能很宽、很杂,而我们需要的是稳定的本地模型。

7.1 解析方式

本文使用 JSON 解析,不需要 XPath、CSS 或 BeautifulSoup。

如果目标站点是 HTML 页面,可以用:

BeautifulSoup:适合简单页面
lxml + XPath:适合结构清晰、字段位置稳定的页面
parsel:适合 Scrapy 风格选择器

但当前项目没有必要这么做。API 已经返回 JSON,直接读字典字段即可。

7.2 列表页如何拿详情链接

API 搜索接口返回的是一批 occurrence 记录。每条记录一般会有平台 key。我们可以用这个 key 生成公开馆藏链接:

collection_link = f"https://www.gbif.org/occurrence/{key}"

也可以再请求详情接口:

GET https://api.gbif.org/v1/occurrence/{key}

本文默认先用搜索结果里的字段。如果你希望字段更完整,可以开启详情补抓。详情补抓更慢,但可靠性更高。

7.3 详情页如何抽字段

目标字段映射如下:

标本号      catalogNumber / occurrenceID / key
学名        scientificName / acceptedScientificName / species
采集地      country + stateProvince + county + locality
采集时间    eventDate / year-month-day
采集人      recordedBy
馆藏链接    https://www.gbif.org/occurrence/{key}

这里最容易踩坑的是“标本号”。很多人以为 catalogNumber 一定全局唯一,但它通常只在某个机构、某个馆藏集合内有意义。因此本文数据库去重优先使用 source_keycollection_link,而不是单独拿 catalogNumber 当全局唯一键。

7.4 缺失字段怎么办

公开标本数据经常缺字段。比如:

有学名,但没有采集人
有国家,但没有 locality
有 year/month/day,但没有 eventDate
有 occurrenceID,但没有 catalogNumber

处理思路是:

能规范化就规范化
能回退就回退
不能确定就留空
不要凭空编造

字段缺失不是程序错误。真正的错误是把缺失字段硬补成看似完整的假数据。

下面是解析层代码。

src/utils.py

import hashlib
import json
import logging
from datetime import datetime
from typing import Any, Iterable, Optional


def setup_logging(log_path: str) -> None:
    """
    初始化日志。
    同时输出到控制台和文件。
    """
    logging.basicConfig(
        level=logging.INFO,
        format="%(asctime)s | %(levelname)s | %(message)s",
        handlers=[
            logging.StreamHandler(),
            logging.FileHandler(log_path, encoding="utf-8"),
        ],
    )


def clean_text(value: Any) -> str:
    """
    清洗字符串:
    - None 转空字符串
    - list/tuple 用分号拼接
    - 压缩多余空白
    """
    if value is None:
        return ""

    if isinstance(value, (list, tuple)):
        value = "; ".join(str(v) for v in value if v is not None)

    text = str(value).replace("\u0000", "").strip()
    text = " ".join(text.split())
    return text


def join_non_empty(parts: Iterable[Any], sep: str = " / ") -> str:
    """
    拼接非空字段。
    """
    cleaned = [clean_text(p) for p in parts]
    cleaned = [p for p in cleaned if p]
    return sep.join(cleaned)


def normalize_date(event_date: Any, year: Any = None, month: Any = None, day: Any = None) -> str:
    """
    规范化采集时间。

    优先使用 eventDate。
    如果没有,则尝试 year/month/day。
    不强行解析所有历史日期,只做温和清洗。
    """
    event_date_text = clean_text(event_date)
    if event_date_text:
        return event_date_text

    y = clean_text(year)
    m = clean_text(month)
    d = clean_text(day)

    if y and m and d:
        return f"{y.zfill(4)}-{m.zfill(2)}-{d.zfill(2)}"
    if y and m:
        return f"{y.zfill(4)}-{m.zfill(2)}"
    if y:
        return y.zfill(4)

    return ""


def stable_hash(data: dict) -> str:
    """
    生成内容 hash。
    用于辅助判断记录内容是否变化。
    """
    raw = json.dumps(data, ensure_ascii=False, sort_keys=True)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


def now_iso() -> str:
    return datetime.utcnow().replace(microsecond=0).isoformat() + "Z"


def safe_int(value: Any, default: int = 0) -> int:
    try:
        return int(value)
    except (TypeError, ValueError):
        return default

src/parser.py

from dataclasses import asdict, dataclass
from typing import Any, Dict, Iterable, List

from .utils import clean_text, join_non_empty, normalize_date, stable_hash


@dataclass
class HerbariumRecord:
    """
    本项目自己的标准化记录结构。
    """
    source_key: str
    specimen_number: str
    scientific_name: str
    collection_place: str
    collection_date: str
    collector: str
    collection_link: str
    institution_code: str
    collection_code: str
    dataset_key: str
    basis_of_record: str
    license: str
    raw_hash: str

    def to_dict(self) -> Dict[str, str]:
        return asdict(self)


class GBIFParser:
    """
    GBIF occurrence JSON 解析器。
    """

    @staticmethod
    def extract_keys_from_search(payload: Dict[str, Any]) -> List[str]:
        """
        从搜索结果中提取 occurrence key。
        """
        results = payload.get("results") or []
        keys: List[str] = []

        for item in results:
            key = item.get("key") or item.get("gbifID")
            if key is not None:
                keys.append(str(key))

        return keys

    @staticmethod
    def iter_results(payload: Dict[str, Any]) -> Iterable[Dict[str, Any]]:
        """
        迭代搜索结果。
        """
        results = payload.get("results") or []
        for item in results:
            if isinstance(item, dict):
                yield item

    def parse_occurrence(self, item: Dict[str, Any]) -> HerbariumRecord:
        """
        把一条 GBIF occurrence JSON 转为 HerbariumRecord。
        """
        source_key = clean_text(item.get("key") or item.get("gbifID"))

        specimen_number = self._pick_first(
            item,
            [
                "catalogNumber",
                "occurrenceID",
                "recordNumber",
                "otherCatalogNumbers",
                "gbifID",
                "key",
            ],
        )

        scientific_name = self._pick_first(
            item,
            [
                "scientificName",
                "acceptedScientificName",
                "species",
                "genericName",
            ],
        )

        collection_place = join_non_empty(
            [
                item.get("country"),
                item.get("stateProvince"),
                item.get("county"),
                item.get("municipality"),
                item.get("locality"),
            ]
        )

        collection_date = normalize_date(
            item.get("eventDate"),
            item.get("year"),
            item.get("month"),
            item.get("day"),
        )

        collector = self._pick_first(
            item,
            [
                "recordedBy",
                "identifiedBy",
            ],
        )

        if source_key:
            collection_link = f"https://www.gbif.org/occurrence/{source_key}"
        else:
            collection_link = clean_text(item.get("references"))

        record = HerbariumRecord(
            source_key=source_key,
            specimen_number=specimen_number,
            scientific_name=scientific_name,
            collection_place=collection_place,
            collection_date=collection_date,
            collector=collector,
            collection_link=collection_link,
            institution_code=clean_text(item.get("institutionCode")),
            collection_code=clean_text(item.get("collectionCode")),
            dataset_key=clean_text(item.get("datasetKey")),
            basis_of_record=clean_text(item.get("basisOfRecord")),
            license=clean_text(item.get("license")),
            raw_hash=stable_hash(item),
        )

        return record

    @staticmethod
    def _pick_first(item: Dict[str, Any], field_names: List[str]) -> str:
        """
        从多个候选字段中取第一个非空值。
        """
        for name in field_names:
            value = clean_text(item.get(name))
            if value:
                return value
        return ""

这个解析器有一个特点:它不假设字段一定存在。爬虫解析代码最忌讳这样写:

scientific_name = item["scientificName"]

只要某条记录缺字段,程序就会炸。更稳妥的方式是:

scientific_name = item.get("scientificName", "")

再配合统一的 clean_text(),后面入库就会舒服很多。

8️⃣ 数据存储与导出(Storage)

本文选择 SQLite 起步。原因很简单:

零服务部署
跨平台
支持唯一约束
支持 SQL 查询
适合中小规模数据集
方便导出 CSV/JSON

CSV 很适合交换,但不适合断点续跑和去重。JSON 保存原始结构很方便,但查询不如数据库直接。SQLite 则比较折中:轻量,但足够工程化。

8.1 字段映射表

字段名 类型 示例值 说明
source_key TEXT 1234567890 平台记录 key,作为主要唯一字段
specimen_number TEXT K000123456 标本号或回退编号
scientific_name TEXT Rosa chinensis Jacq. 学名
collection_place TEXT China / Yunnan / Dali / Cangshan 拼接后的采集地
collection_date TEXT 1987-05-21 采集时间
collector TEXT S. Wang 采集人
collection_link TEXT https://www.gbif.org/occurrence/... 馆藏公开链接
institution_code TEXT K 机构代码
collection_code TEXT Herbarium 馆藏代码
dataset_key TEXT UUID 数据集 key
basis_of_record TEXT PRESERVED_SPECIMEN 记录类型
license TEXT CC_BY_4_0 许可证
raw_hash TEXT SHA256 原始记录 hash
created_at TEXT ISO 时间 入库时间
updated_at TEXT ISO 时间 更新时间

8.2 去重策略

本文使用三层去重:

第一层,source_key 唯一。
如果平台记录 key 存在,它通常是最可靠的唯一标识。

第二层,collection_link 唯一。
链接能帮助复查,也可以防止同一记录重复写入。

第三层,内容 hash。
如果同一个 key 的内容发生变化,可以比较 raw_hash,用于后续增量更新。

不建议只用 specimen_number 去重。标本号在单个馆藏内部可能唯一,但跨机构未必唯一。更稳的唯一判断应该是:

source_key
或
institution_code + collection_code + specimen_number
或
collection_link

下面是存储层代码。

src/storage.py

import csv
import json
import sqlite3
from pathlib import Path
from typing import Iterable, List

from .parser import HerbariumRecord
from .utils import now_iso


class SQLiteStorage:
    """
    SQLite 存储层。
    """

    def __init__(self, db_path: str | Path) -> None:
        self.db_path = Path(db_path)
        self.conn = sqlite3.connect(self.db_path)
        self.conn.row_factory = sqlite3.Row

    def init_db(self) -> None:
        """
        初始化数据表。
        """
        sql = """
        CREATE TABLE IF NOT EXISTS herbarium_records (
            id INTEGER PRIMARY KEY AUTOINCREMENT,
            source_key TEXT NOT NULL,
            specimen_number TEXT,
            scientific_name TEXT,
            collection_place TEXT,
            collection_date TEXT,
            collector TEXT,
            collection_link TEXT,
            institution_code TEXT,
            collection_code TEXT,
            dataset_key TEXT,
            basis_of_record TEXT,
            license TEXT,
            raw_hash TEXT,
            created_at TEXT NOT NULL,
            updated_at TEXT NOT NULL,
            UNIQUE(source_key)
        );

        CREATE INDEX IF NOT EXISTS idx_scientific_name
        ON herbarium_records(scientific_name);

        CREATE INDEX IF NOT EXISTS idx_specimen_number
        ON herbarium_records(specimen_number);

        CREATE INDEX IF NOT EXISTS idx_collection_date
        ON herbarium_records(collection_date);

        CREATE INDEX IF NOT EXISTS idx_dataset_key
        ON herbarium_records(dataset_key);
        """
        self.conn.executescript(sql)
        self.conn.commit()

    def upsert_record(self, record: HerbariumRecord) -> bool:
        """
        插入或更新记录。

        返回 True 表示新增或更新成功。
        """
        now = now_iso()
        data = record.to_dict()

        sql = """
        INSERT INTO herbarium_records (
            source_key,
            specimen_number,
            scientific_name,
            collection_place,
            collection_date,
            collector,
            collection_link,
            institution_code,
            collection_code,
            dataset_key,
            basis_of_record,
            license,
            raw_hash,
            created_at,
            updated_at
        )
        VALUES (
            :source_key,
            :specimen_number,
            :scientific_name,
            :collection_place,
            :collection_date,
            :collector,
            :collection_link,
            :institution_code,
            :collection_code,
            :dataset_key,
            :basis_of_record,
            :license,
            :raw_hash,
            :created_at,
            :updated_at
        )
        ON CONFLICT(source_key) DO UPDATE SET
            specimen_number=excluded.specimen_number,
            scientific_name=excluded.scientific_name,
            collection_place=excluded.collection_place,
            collection_date=excluded.collection_date,
            collector=excluded.collector,
            collection_link=excluded.collection_link,
            institution_code=excluded.institution_code,
            collection_code=excluded.collection_code,
            dataset_key=excluded.dataset_key,
            basis_of_record=excluded.basis_of_record,
            license=excluded.license,
            raw_hash=excluded.raw_hash,
            updated_at=excluded.updated_at;
        """

        data["created_at"] = now
        data["updated_at"] = now

        self.conn.execute(sql, data)
        self.conn.commit()
        return True

    def source_key_exists(self, source_key: str) -> bool:
        sql = "SELECT 1 FROM herbarium_records WHERE source_key = ? LIMIT 1"
        cur = self.conn.execute(sql, (source_key,))
        return cur.fetchone() is not None

    def count_records(self) -> int:
        cur = self.conn.execute("SELECT COUNT(*) AS cnt FROM herbarium_records")
        row = cur.fetchone()
        return int(row["cnt"])

    def fetch_all(self) -> List[dict]:
        sql = """
        SELECT
            source_key,
            specimen_number,
            scientific_name,
            collection_place,
            collection_date,
            collector,
            collection_link,
            institution_code,
            collection_code,
            dataset_key,
            basis_of_record,
            license,
            raw_hash
        FROM herbarium_records
        ORDER BY id ASC
        """
        cur = self.conn.execute(sql)
        return [dict(row) for row in cur.fetchall()]

    def export_csv(self, path: str | Path) -> None:
        rows = self.fetch_all()
        path = Path(path)
        path.parent.mkdir(parents=True, exist_ok=True)

        if not rows:
            with path.open("w", encoding="utf-8-sig", newline="") as f:
                f.write("")
            return

        fieldnames = list(rows[0].keys())

        with path.open("w", encoding="utf-8-sig", newline="") as f:
            writer = csv.DictWriter(f, fieldnames=fieldnames)
            writer.writeheader()
            writer.writerows(rows)

    def export_json(self, path: str | Path) -> None:
        rows = self.fetch_all()
        path = Path(path)
        path.parent.mkdir(parents=True, exist_ok=True)

        with path.open("w", encoding="utf-8") as f:
            json.dump(rows, f, ensure_ascii=False, indent=2)

    def close(self) -> None:
        self.conn.close()

这段代码有两个实用点。

第一,使用 ON CONFLICT(source_key) DO UPDATE,同一条记录再次抓到时不会插入重复行,而是更新字段。

第二,导出 CSV 时用了 utf-8-sig。这对中文用户比较友好,因为很多人会用 Excel 打开 CSV,utf-8-sig 可以减少乱码问题。

9️⃣ 运行方式与结果展示

现在把入口文件写出来。入口文件负责读取命令行参数,决定是按查询条件分页抓取,还是按种子标本号抓取。

9.1 种子文件格式

data/seeds.txt

K000123456
E000987654
PE000112233

每行一个标本号。空行会被忽略。真实使用时,你可以把自己已有的标本号放进去,让程序按 catalogNumber 查询。

9.2 主程序代码

src/main.py

import argparse
import logging
from pathlib import Path
from typing import Iterable, List

from .config import (
    DEFAULT_CSV_PATH,
    DEFAULT_DB_PATH,
    DEFAULT_JSON_PATH,
    DEFAULT_LOG_PATH,
)
from .fetcher import FetchError, GBIFFetcher
from .parser import GBIFParser
from .storage import SQLiteStorage
from .utils import safe_int, setup_logging


def read_seed_file(path: str | Path) -> List[str]:
    seed_path = Path(path)
    if not seed_path.exists():
        raise FileNotFoundError(f"种子文件不存在: {seed_path}")

    seeds: List[str] = []
    with seed_path.open("r", encoding="utf-8") as f:
        for line in f:
            value = line.strip()
            if value and not value.startswith("#"):
                seeds.append(value)

    return seeds


def build_arg_parser() -> argparse.ArgumentParser:
    parser = argparse.ArgumentParser(
        description="抓取公开植物标本馆条目元数据,导出 SQLite/CSV/JSON。"
    )

    parser.add_argument(
        "--mode",
        choices=["search", "seeds"],
        default="search",
        help="search 表示按查询条件分页抓取;seeds 表示按 seeds.txt 中的标本号抓取。",
    )

    parser.add_argument(
        "--q",
        default=None,
        help="全文关键词,例如 Rosa、Ficus、Yunnan 等。",
    )

    parser.add_argument(
        "--country",
        default=None,
        help="国家代码,例如 CN、US、GB。为空则不限制。",
    )

    parser.add_argument(
        "--year-from",
        type=int,
        default=None,
        help="采集年份起始,例如 1900。",
    )

    parser.add_argument(
        "--year-to",
        type=int,
        default=None,
        help="采集年份结束,例如 2026。",
    )

    parser.add_argument(
        "--dataset-key",
        default=None,
        help="指定数据集 key。为空则不限制。",
    )

    parser.add_argument(
        "--limit",
        type=int,
        default=50,
        help="每页数量。建议从 20-100 开始。",
    )

    parser.add_argument(
        "--max-pages",
        type=int,
        default=2,
        help="search 模式下最多抓取页数。",
    )

    parser.add_argument(
        "--sleep",
        type=float,
        default=0.6,
        help="每次请求之间的基础停顿秒数。",
    )

    parser.add_argument(
        "--seeds",
        default="data/seeds.txt",
        help="seeds 模式下的标本号文件。",
    )

    parser.add_argument(
        "--db",
        default=str(DEFAULT_DB_PATH),
        help="SQLite 数据库路径。",
    )

    parser.add_argument(
        "--csv",
        default=str(DEFAULT_CSV_PATH),
        help="CSV 导出路径。",
    )

    parser.add_argument(
        "--json",
        default=str(DEFAULT_JSON_PATH),
        help="JSON 导出路径。",
    )

    parser.add_argument(
        "--log",
        default=str(DEFAULT_LOG_PATH),
        help="日志文件路径。",
    )

    return parser


def crawl_by_search(args, fetcher: GBIFFetcher, parser: GBIFParser, storage: SQLiteStorage) -> None:
    """
    按查询条件分页抓取。
    """
    total_saved = 0

    for page in range(args.max_pages):
        offset = page * args.limit

        logging.info(
            "开始抓取搜索页: page=%s offset=%s limit=%s",
            page + 1,
            offset,
            args.limit,
        )

        try:
            payload = fetcher.search_occurrences(
                offset=offset,
                limit=args.limit,
                q=args.q,
                country=args.country,
                year_from=args.year_from,
                year_to=args.year_to,
                dataset_key=args.dataset_key,
            )
        except FetchError as exc:
            logging.error("搜索页抓取失败: offset=%s error=%s", offset, exc)
            continue

        results = list(parser.iter_results(payload))
        if not results:
            logging.info("当前页没有结果,提前结束。")
            break

        for item in results:
            try:
                record = parser.parse_occurrence(item)

                if not record.source_key:
                    logging.warning("跳过缺少 source_key 的记录: %s", item)
                    continue

                storage.upsert_record(record)
                total_saved += 1

            except Exception as exc:
                logging.exception("解析或入库失败: %s", exc)

        end_of_records = bool(payload.get("endOfRecords"))
        count = safe_int(payload.get("count"), 0)

        logging.info(
            "完成页: page=%s results=%s total_count_hint=%s end=%s",
            page + 1,
            len(results),
            count,
            end_of_records,
        )

        if end_of_records:
            break

    logging.info("search 模式完成,本次处理记录数: %s", total_saved)


def crawl_by_seeds(args, fetcher: GBIFFetcher, parser: GBIFParser, storage: SQLiteStorage) -> None:
    """
    按标本号种子抓取。
    """
    seeds = read_seed_file(args.seeds)
    logging.info("读取种子数量: %s", len(seeds))

    total_saved = 0

    for index, catalog_number in enumerate(seeds, start=1):
        logging.info(
            "按标本号查询: %s/%s catalogNumber=%s",
            index,
            len(seeds),
            catalog_number,
        )

        try:
            payload = fetcher.search_occurrences(
                offset=0,
                limit=args.limit,
                q=args.q,
                country=args.country,
                year_from=args.year_from,
                year_to=args.year_to,
                dataset_key=args.dataset_key,
                catalog_number=catalog_number,
            )
        except FetchError as exc:
            logging.error("标本号查询失败: catalogNumber=%s error=%s", catalog_number, exc)
            continue

        results = list(parser.iter_results(payload))
        if not results:
            logging.warning("没有查到记录: catalogNumber=%s", catalog_number)
            continue

        for item in results:
            try:
                record = parser.parse_occurrence(item)
                if not record.source_key:
                    logging.warning("跳过缺少 source_key 的记录: catalogNumber=%s", catalog_number)
                    continue

                storage.upsert_record(record)
                total_saved += 1

            except Exception as exc:
                logging.exception("解析或入库失败: catalogNumber=%s error=%s", catalog_number, exc)

    logging.info("seeds 模式完成,本次处理记录数: %s", total_saved)


def main() -> None:
    arg_parser = build_arg_parser()
    args = arg_parser.parse_args()

    setup_logging(args.log)

    logging.info("启动植物标本元数据采集任务")
    logging.info("运行参数: %s", vars(args))

    fetcher = GBIFFetcher(sleep_seconds=args.sleep)
    parser = GBIFParser()
    storage = SQLiteStorage(args.db)
    storage.init_db()

    try:
        if args.mode == "search":
            crawl_by_search(args, fetcher, parser, storage)
        elif args.mode == "seeds":
            crawl_by_seeds(args, fetcher, parser, storage)
        else:
            raise ValueError(f"未知模式: {args.mode}")

        storage.export_csv(args.csv)
        storage.export_json(args.json)

        logging.info("当前数据库记录总数: %s", storage.count_records())
        logging.info("CSV 已导出: %s", args.csv)
        logging.info("JSON 已导出: %s", args.json)

    finally:
        storage.close()
        logging.info("任务结束")


if __name__ == "__main__":
    main()

9.3 启动方式

进入项目目录:

cd herbarium_meta_crawler

按搜索条件抓取,例如抓取植物保存标本记录,关键词为 Rosa,限制国家为中国,最多抓 2 页:

python -m src.main \
  --mode search \
  --q Rosa \
  --country CN \
  --year-from 1900 \
  --year-to 2026 \
  --limit 50 \
  --max-pages 2 \
  --sleep 0.8

Windows PowerShell 可以写成一行:

python -m src.main --mode search --q Rosa --country CN --year-from 1900 --year-to 2026 --limit 50 --max-pages 2 --sleep 0.8

按标本号种子抓取:

python -m src.main \
  --mode seeds \
  --seeds data/seeds.txt \
  --limit 20 \
  --sleep 1.0

9.4 输出在哪里

默认输出路径:

data/output/herbarium.sqlite3
data/output/herbarium_records.csv
data/output/herbarium_records.json
logs/crawler.log

查看 SQLite 记录:

sqlite3 data/output/herbarium.sqlite3

进入 SQLite 后执行:

SELECT
    specimen_number,
    scientific_name,
    collection_place,
    collection_date,
    collector,
    collection_link
FROM herbarium_records
LIMIT 5;

9.5 示例结果

下面是导出 CSV 的字段形态示例。真实结果会根据你运行时的查询条件、数据源更新情况而变化。

specimen_number scientific_name collection_place collection_date collector collection_link
K000123456 Rosa chinensis Jacq. China / Yunnan / Dali / Cangshan 1987-05-21 S. Wang https://www.gbif.org/occurrence/123456789
PE000112233 Ficus virens Aiton China / Guangxi / Guilin 1994-08 L. Chen https://www.gbif.org/occurrence/223456789
E000987654 Camellia sinensis (L.) Kuntze China / Zhejiang / Hangzhou 1978 H. Li https://www.gbif.org/occurrence/323456789
IBSC000456 Rhododendron simsii Planch. China / Guangdong / Guangzhou 2001-03-12 Q. Zhao https://www.gbif.org/occurrence/423456789
NY00123456 Magnolia denudata Desr. China / Hubei / Wuhan 1965-04-18 Unknown https://www.gbif.org/occurrence/523456789

如果实际结果里 collector 为空,不代表代码错了。很多历史标本记录确实没有完整采集人字段,或者采集者信息被放在不同字段里。此时应该保留空值,而不是随意填“Unknown”。如果你确实要统一展示,可以在分析阶段再把空值渲染为“Unknown”。

🔟 常见问题与排错

10.1 403 怎么办?

403 表示拒绝访问。常见原因包括:

请求头不完整
访问路径不允许
请求方式不符合接口要求
访问频率异常
目标资源本身有限制

处理思路:

第一,检查 URL 是否正确。很多 403 不是反爬,而是路径错了。

第二,设置清晰的 User-Agent。不要使用空 UA,也不要写明显异常的 UA。

第三,降低频率。把 --sleep 调大,比如:

python -m src.main --mode search --q Rosa --sleep 2.0

第四,查看目标站点的 API 文档、robots.txt 或使用条款。如果不允许访问,就停止抓取。

不建议一遇到 403 就上代理。代理不能解决合规问题,也不能解决代码错误。先确认自己访问的是公开、允许、合理的接口。

10.2 429 怎么办?

429 表示请求过多。本文代码已经对 429 做了退避处理,但你仍然应该降低整体速度。

可以这样做:

python -m src.main --mode search --q Rosa --limit 20 --max-pages 5 --sleep 2.5

也可以减少任务范围:

python -m src.main --mode search --q Rosa --country CN --year-from 2000 --year-to 2020

不要把 limit 调得过大,也不要同时启动很多个进程。对公开数据平台来说,温和访问才是长期可用的方式。

10.3 HTML 抓到空壳怎么办?

如果你抓的是某个标本馆网页,而不是本文的 API,可能会遇到 HTML 里没有数据的情况。典型表现是:

requests 返回了 HTML
BeautifulSoup 找不到字段
浏览器里能看到数据
查看网页源码却看不到数据

这通常说明页面是动态渲染的。解决思路有三个:

第一,打开浏览器开发者工具,查看 Network 面板,找到真正返回 JSON 的接口。

第二,优先请求接口,而不是解析渲染后的页面。

第三,如果接口难以复用,再考虑 Playwright。示例思路:

from playwright.sync_api import sync_playwright

def render_page(url: str) -> str:
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page()
        page.goto(url, wait_until="networkidle", timeout=30000)
        html = page.content()
        browser.close()
        return html

但 Playwright 不应该作为首选。浏览器自动化资源开销更高,速度更慢,也更容易造成不必要的访问压力。

10.4 解析报错怎么办?

常见错误:

KeyError
TypeError
JSONDecodeError
IndexError

如果是 KeyError,说明字段不一定存在,不要用 item["field"],改用 item.get("field")

如果是 TypeError,可能是字段类型与你预期不同。例如 recordedBy 有时可能是字符串,有时可能是列表。本文的 clean_text() 已经兼容了列表。

如果是 JSONDecodeError,说明响应不是 JSON。可能是服务异常、返回了错误页、被限流,或者 URL 写错。可以打印前 300 个字符排查:

print(response.status_code)
print(response.text[:300])

如果是 IndexError,多半是你假设列表一定有元素。正确写法是先判断:

items = payload.get("results") or []
if not items:
    return

10.5 编码和乱码如何处理?

API JSON 通常是 UTF-8。写文件时建议明确指定编码:

open(path, "w", encoding="utf-8")

如果 CSV 要给 Excel 打开,可以用:

encoding="utf-8-sig"

本文导出 CSV 时已经这样处理:

with path.open("w", encoding="utf-8-sig", newline="") as f:
    ...

如果你从 HTML 页面解析中文,先看响应编码:

print(response.encoding)
print(response.apparent_encoding)

必要时手动设置:

response.encoding = "utf-8"

10.6 为什么有些记录没有采集地?

标本元数据来自不同机构、不同历史时期,完整度差异很大。有些记录只有国家,没有具体 locality;有些有经纬度,但 locality 缺失;有些出于保护原因不会公开精确位置。

本文策略是:

不乱补
不猜测
有多少公开字段就保存多少

如果后续做数据分析,可以把缺失程度作为一个指标。比如统计不同数据集的地点字段完整率,这本身就是一个有价值的质量分析方向。

10.7 为什么同一个标本号出现多条记录?

可能原因包括:

不同机构使用了相同 catalogNumber
同一记录被不同数据集重复发布
标本有副份或复份记录
历史编号和新编号同时存在

所以本文没有把 catalogNumber 当成唯一主键,而是使用 source_key。如果你需要更严格的去重,可以增加复合键:

UNIQUE(institution_code, collection_code, specimen_number)

但我建议不要一开始就这么做,因为很多记录的 institution_codecollection_code 也可能为空。更稳妥的方式是先保留原始数据,再在分析阶段做去重策略比较。

1️⃣1️⃣ 进阶优化

11.1 并发优化

这个项目可以并发,但不建议一开始就并发。等你确认字段解析稳定、入库逻辑正确、频率合规后,再考虑轻量并发。

如果要并发,建议使用线程池,因为 requests 是阻塞 I/O,线程池足够用了。

示意代码:

from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_one_detail(fetcher, key):
    return fetcher.get_occurrence_detail(key)

def fetch_details_concurrently(fetcher, keys, max_workers=3):
    results = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_map = {
            executor.submit(fetch_one_detail, fetcher, key): key
            for key in keys
        }

        for future in as_completed(future_map):
            key = future_map[future]
            try:
                data = future.result()
                results.append(data)
            except Exception as exc:
                print(f"detail failed key={key} error={exc}")

    return results

注意这里的 max_workers=3。公开 API 项目没必要开到 50 或 100。并发越大,失败率越高,日志越乱,排错越痛苦。

11.2 asyncio 版本思路

如果你想练异步,可以用 httpx.AsyncClientaiohttp。但异步不等于无限并发。应该配合信号量:

import asyncio
import httpx

async def fetch_json(client, url, params, sem):
    async with sem:
        resp = await client.get(url, params=params, timeout=30)
        resp.raise_for_status()
        await asyncio.sleep(0.5)
        return resp.json()

async def main_async():
    sem = asyncio.Semaphore(3)
    async with httpx.AsyncClient(headers={"User-Agent": "HerbariumMetaCrawler/1.0"}) as client:
        tasks = [
            fetch_json(client, "https://api.gbif.org/v1/occurrence/search", {"limit": 20, "offset": i * 20}, sem)
            for i in range(5)
        ]
        results = await asyncio.gather(*tasks, return_exceptions=True)
        return results

如果只是几百到几千条记录,本文同步版已经够用。异步适合更大规模任务,但也更考验错误处理。

11.3 断点续跑

本文已经通过 SQLite 的唯一键支持“重复运行不重复插入”。但严格来说,这还不算完整断点续跑。完整断点续跑应该记录游标,例如:

当前 offset
当前 catalogNumber
当前数据集 key
失败次数
最后错误信息
最后运行时间

可以增加一张任务表:

CREATE TABLE IF NOT EXISTS crawl_tasks (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    task_type TEXT NOT NULL,
    task_value TEXT NOT NULL,
    status TEXT NOT NULL DEFAULT 'pending',
    retry_count INTEGER NOT NULL DEFAULT 0,
    last_error TEXT,
    created_at TEXT NOT NULL,
    updated_at TEXT NOT NULL,
    UNIQUE(task_type, task_value)
);

任务状态可以设计成:

pending
running
success
failed
skipped

这样即使程序中断,下次也可以只捞 pendingfailed 的任务继续跑。

11.4 失败任务单独保存

有些失败不是临时网络问题,而是数据异常。建议把失败任务写入文件:

def append_failed_task(path, task, error):
    with open(path, "a", encoding="utf-8") as f:
        f.write(f"{task}\t{error}\n")

或者入库:

CREATE TABLE IF NOT EXISTS failed_tasks (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    task_value TEXT NOT NULL,
    error_message TEXT,
    created_at TEXT NOT NULL
);

失败任务不要只打印在控制台。控制台信息一滚过去就没了,真正排错时很难找。

11.5 日志与监控

本文已经写了基础日志。实际项目里,我会关注这些指标:

总请求数
成功请求数
失败请求数
429 次数
平均响应时间
新增记录数
更新记录数
空字段比例

简单一点可以写日志,复杂一点可以写入 SQLite 表:

CREATE TABLE IF NOT EXISTS crawl_metrics (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    metric_name TEXT NOT NULL,
    metric_value REAL NOT NULL,
    created_at TEXT NOT NULL
);

空字段比例很有用。例如:

SELECT
    COUNT(*) AS total,
    SUM(CASE WHEN collector = '' THEN 1 ELSE 0 END) AS missing_collector,
    ROUND(
        SUM(CASE WHEN collector = '' THEN 1 ELSE 0 END) * 100.0 / COUNT(*),
        2
    ) AS missing_collector_rate
FROM herbarium_records;

这能告诉你采集人字段的缺失率。如果缺失率很高,你就知道不能把采集人作为强分析字段。

11.6 定时任务

如果你要定期更新数据,可以用 cron。

Linux/macOS 编辑定时任务:

crontab -e

每天凌晨 2 点运行:

0 2 * * * cd /path/to/herbarium_meta_crawler && /path/to/.venv/bin/python -m src.main --mode search --q Rosa --limit 50 --max-pages 5 --sleep 1.0 >> logs/cron.log 2>&1

如果是更正式的数据管道,可以考虑 Airflow、Prefect 或 Dagster。但对本文这种项目,cron 已经够用了。不要一开始就上复杂调度系统,先把数据链路跑稳。

11.7 数据质量增强

基础字段抓完后,可以继续做数据质量增强。

例如,地点字段拆分:

country
stateProvince
county
locality

采集日期拆分:

year
month
day

学名字段拆分:

genus
species
infraspecificEpithet
taxonRank

还可以做字段完整度评分:

def completeness_score(row: dict) -> float:
    important_fields = [
        "specimen_number",
        "scientific_name",
        "collection_place",
        "collection_date",
        "collector",
        "collection_link",
    ]
    filled = sum(1 for field in important_fields if row.get(field))
    return filled / len(important_fields)

入库时增加一列:

completeness_score

这样后续筛选高质量记录就很方便。

11.8 原始数据归档

标准化字段很适合分析,但原始 JSON 也值得保留。因为你今天不需要的字段,明天可能就有用了。可以增加一个 raw 表:

CREATE TABLE IF NOT EXISTS raw_occurrences (
    source_key TEXT PRIMARY KEY,
    raw_json TEXT NOT NULL,
    raw_hash TEXT NOT NULL,
    created_at TEXT NOT NULL,
    updated_at TEXT NOT NULL
);

保存原始 JSON:

import json

def save_raw(conn, source_key, item, raw_hash, now):
    sql = """
    INSERT INTO raw_occurrences (source_key, raw_json, raw_hash, created_at, updated_at)
    VALUES (?, ?, ?, ?, ?)
    ON CONFLICT(source_key) DO UPDATE SET
        raw_json=excluded.raw_json,
        raw_hash=excluded.raw_hash,
        updated_at=excluded.updated_at
    """
    conn.execute(
        sql,
        (
            source_key,
            json.dumps(item, ensure_ascii=False),
            raw_hash,
            now,
            now,
        ),
    )

我自己的经验是:只保存清洗结果,后面一定会后悔。原始数据占空间不算夸张,但能省下很多回头重抓的时间。

11.9 从 requests 迁移到 Scrapy

如果后续目标变成多个标本馆 HTML 页面,Scrapy 会更合适。Scrapy 的优势是:

内置调度器
自动去重
中间件机制
Pipeline 入库
日志和统计更完整

一个 Scrapy spider 的结构大概是:

import scrapy


class HerbariumSpider(scrapy.Spider):
    name = "herbarium"
    allowed_domains = ["example.org"]
    start_urls = ["https://example.org/search?q=Rosa"]

    def parse(self, response):
        detail_links = response.css("a.record-link::attr(href)").getall()
        for link in detail_links:
            yield response.follow(link, callback=self.parse_detail)

        next_page = response.css("a.next::attr(href)").get()
        if next_page:
            yield response.follow(next_page, callback=self.parse)

    def parse_detail(self, response):
        yield {
            "specimen_number": response.css(".catalog-number::text").get(default="").strip(),
            "scientific_name": response.css(".scientific-name::text").get(default="").strip(),
            "collection_place": response.css(".locality::text").get(default="").strip(),
            "collection_date": response.css(".event-date::text").get(default="").strip(),
            "collector": response.css(".recorded-by::text").get(default="").strip(),
            "collection_link": response.url,
        }

但注意,这只是 HTML 站点的写法,不适合照搬到本文 API 项目里。

11.10 使用 Playwright 的场景

Playwright 适合这些情况:

页面必须执行 JavaScript 才能出现数据
接口参数经过前端复杂生成
页面需要点击、滚动、展开
需要截图或验证渲染结果

不适合这些情况:

已有公开 JSON API
只需要分页请求
只需要字段抽取
数据量较大

Playwright 是好工具,但不是万能钥匙。用错场景,会让项目又慢又重。

1️⃣2️⃣ 总结与延伸阅读

本文完成了一套公开植物标本元数据采集流程。我们没有把重点放在“如何绕过限制”,而是放在更值得练习的地方:

如何选择合适的数据入口
如何设计请求层
如何做失败重试和退避
如何解析缺失字段很多的公开数据
如何规范化标本号、学名、采集地、采集时间、采集人和馆藏链接
如何用 SQLite 做去重和断点续跑基础
如何导出 CSV/JSON 供后续分析

这类项目的核心不在于代码有多花,而在于数据链路是否稳。标本元数据非常适合练规范化思维:字段看似简单,但每一个字段都可能有历史包袱和数据质量问题。比如标本号不一定全局唯一,采集时间可能只有年份,采集地可能缺少 locality,采集人可能有多个写法。你越认真处理这些细节,最后得到的数据集越有用。

下一步可以继续做几件事。

第一,把原始 JSON 也保存下来。
这样后续想增加字段,不必重新抓。

第二,增加数据质量评分。
比如统计每条记录六个核心字段的完整度,筛选高质量记录做分析。

第三,迁移到 Scrapy。
如果目标变成多个 HTML 标本馆站点,Scrapy 的调度、去重和 Pipeline 会更方便。

第四,引入 Playwright。
如果目标站点是动态渲染页面,Playwright 可以作为补充方案。

第五,做可视化分析。
例如按年份统计采集数量,按地区统计记录密度,按学名统计热门类群,按机构统计字段完整度。

最后补一句个人感受:爬虫写到后面,真正拉开差距的不是会不会发请求,而是能不能把数据处理得干净、可追溯、可复查。植物标本元数据这种题材很适合训练这件事。它没有太多噱头,但它能逼你认真面对编号、字段、缺失、去重、合规和存储这些基本功。基本功扎实了,换成其他公开目录类数据,思路也能复用。

🌟 文末

好啦~以上就是本期的全部内容啦!如果你在实践过程中遇到任何疑问,欢迎在评论区留言交流,我看到都会尽量回复~咱们下期见!

小伙伴们在批阅的过程中,如果觉得文章不错,欢迎点赞、收藏、关注哦~
三连就是对我写作道路上最好的鼓励与支持! ❤️🔥

✅ 专栏持续更新中|建议收藏 + 订阅

墙裂推荐订阅专栏 👉 《Python爬虫实战》,本专栏秉承着以“入门 → 进阶 → 工程化 → 项目落地”的路线持续更新,争取让每一期内容都做到:

✅ 讲得清楚(原理)|✅ 跑得起来(代码)|✅ 用得上(场景)|✅ 扛得住(工程化)

📣 想系统提升的小伙伴:强烈建议先订阅专栏 《Python爬虫实战》,再按目录大纲顺序学习,效率十倍上升~

✅ 互动征集

想让我把【某站点/某反爬/某验证码/某分布式方案】等写成某期实战?

评论区留言告诉我你的需求,我会优先安排实现(更新)哒~


⭐️ 若喜欢我,就请关注我叭~(更新不迷路)
⭐️ 若对你有用,就请点赞支持一下叭~(给我一点点动力)
⭐️ 若有疑问,就请评论留言告诉我叭~(我会补坑 & 更新迭代)


✅ 免责声明

本文爬虫思路、相关技术和代码仅用于学习参考,对阅读本文后的进行爬虫行为的用户本作者不承担任何法律责任。

使用或者参考本项目即表示您已阅读并同意以下条款:

  • 合法使用: 不得将本项目用于任何违法、违规或侵犯他人权益的行为,包括但不限于网络攻击、诈骗、绕过身份验证、未经授权的数据抓取等。
  • 风险自负: 任何因使用本项目而产生的法律责任、技术风险或经济损失,由使用者自行承担,项目作者不承担任何形式的责任。
  • 禁止滥用: 不得将本项目用于违法牟利、黑产活动或其他不当商业用途。
  • 使用或者参考本项目即视为同意上述条款,即 “谁使用,谁负责” 。如不同意,请立即停止使用并删除本项目。!!!

更多推荐