㊗️本期内容已收录至专栏《Python爬虫实战》,持续完善知识体系与项目实战,建议先订阅收藏,后续查阅更方便~
㊙️本期爬虫难度指数:⭐⭐☆☆☆(基础级)
🉐福利: 一次订阅后,专栏内的所有文章可永久免费看,持续更新中,保底1000+(篇)硬核实战内容。

全文目录:

🌟 开篇语

哈喽,各位小伙伴们你们好呀~我是【喵手】。
运营社区: C站 / 掘金 / 腾讯云 / 阿里云 / 华为云 / 51CTO
欢迎大家常来逛逛,一起学习,一起进步~🌟

  我长期专注 Python 爬虫工程化实战,主理专栏👉 《Python爬虫实战》:从采集策略反爬对抗,从数据清洗分布式调度,持续输出可复用的方法论与可落地案例。内容主打一个“能跑、能用、能扩展”,让数据价值真正做到——抓得到、洗得净、用得上

  📌 专栏食用指南(建议收藏)

  • ✅ 入门基础:环境搭建 / 请求与解析 / 数据落库
  • ✅ 进阶提升:登录鉴权 / 动态渲染 / 反爬对抗
  • ✅ 工程实战:异步并发 / 分布式调度 / 监控与容错
  • ✅ 项目落地:数据治理 / 可视化分析 / 场景化应用

📣 专栏推广时间:如果你想系统学爬虫,而不是碎片化东拼西凑,欢迎订阅专栏👉《Python爬虫实战》👈,一次订阅后,专栏内的所有文章可永久免费阅读,持续更新中。
  
💕订阅后更新会优先推送,按目录学习更高效💯~

0️⃣ 前言(Preface)

这篇文章要做一件很具体的事:用 Python 采集公开古籍数字馆藏目录中的版本流传信息,借助 requestsurllib.robotparserpydanticsqlite3csv 等工具,把“书名、版本、刊刻年代、藏馆、链接”整理成一份可复用的数据表。

读完这篇,你大概能得到三样东西:

  1. 一套适合古籍目录页的元数据抽取思路,不再只会抓“标题 + 链接”这种普通榜单字段;
  2. 一个可以直接运行的 Python 小项目,支持请求、解析、清洗、去重、导出;
  3. 一些我个人在做目录类采集时常用的容错经验,比如版本字段不稳定、年代混杂在说明文字里、馆藏字段需要规范化等问题怎么处理。

我一直觉得,古籍目录爬虫和普通图书爬虫最大的区别在于:普通图书关心“这本书叫什么、谁写的、多少钱、评分多少”,古籍目录更关心“这个本子是什么版本、什么时候刊刻、现藏哪里、有没有数字影像入口”。这些信息经常不在同一个字段里,有时藏在摘要、题名、副题、注释、责任者说明里。写这种爬虫,不能只盯着选择器,还要理解一点目录学字段的习惯。

1️⃣ 摘要(Abstract)

本文以公开古籍数字馆藏目录为示例,用 Python 构建一个“古籍版本流传信息目录页”采集器,最终输出包含书名、版本、刊刻年代、藏馆、链接的 CSV 与 SQLite 数据库。

读完本文,你可以掌握:

  1. 如何从公开目录页或 JSON API 中抽取版本型元数据;
  2. 如何设计请求层、解析层、清洗层、存储层,让爬虫不是一次性脚本;
  3. 如何处理古籍目录里常见的字段缺失、版本描述混乱、年代表达不统一、重复记录等问题。

本文的实现重点并不是炫技,也不是追求极限并发,而是把一个可以长期维护的小爬虫写得稳一点、干净一点。尤其是古籍类数据,我更建议慢慢抓、按规矩抓、把字段解释清楚,而不是为了速度把站点打爆。

2️⃣ 背景与需求(Why)

2.1 为什么要爬古籍版本流传信息

我先说一个很朴素的需求:当我们想整理某一批古籍的流传情况时,手动打开一个个馆藏页面,复制题名、版本、年代、藏馆、链接,其实非常低效。尤其是目录页动辄几百条、几千条,人工整理不仅慢,而且容易出现格式不一致的问题。

爬取这类数据,通常有三个比较实际的用途。

第一是数据分析。比如想观察某个馆藏中明刻本、清刻本、抄本、稿本的大致比例,或者想统计某个朝代版本集中出现在哪些类目中。没有结构化数据之前,这类统计很难做。

第二是信息聚合。很多古籍资源分散在不同机构的目录系统中。即便暂时只采集一个站点,也可以先把字段模型设计好,后面再扩展到其他馆藏目录,形成自己的检索索引。

第三是自动化整理。比如定期抓取公开目录的新增记录,把新出现的题名和版本信息追加到本地数据库里。这个需求在长期做文献整理的人那里很常见,虽然听起来不复杂,但真正做起来,字段规范化非常关键。

2.2 本文目标站点与字段

本文示例采用公开古籍数字馆藏目录,技术上优先使用公开 JSON 接口。和直接解析 HTML 相比,接口返回的数据结构更稳定,也更容易遵守站点预期的访问方式。

本文目标字段如下:

字段 中文含义 说明
title 书名 目录记录的主标题,可能含卷数、别名、拼音或英文转写
version 版本 如宋刻本、元刻本、明刊本、清抄本、稿本、活字本、重刊本等
publication_date 刊刻年代 可以是具体年份、年代范围、朝代,或者从说明中抽取出的估计值
holding_institution 藏馆 该记录所属馆藏机构或馆藏集合
link 链接 详情页或数字资源入口
source_id 来源 ID 从链接或接口字段中提取,便于去重
raw_date 原始年代字段 保留原始值,避免清洗过程丢失信息
raw_summary 原始说明文本 方便后续复核版本判断
content_hash 内容哈希 用于辅助去重和变更检测
scraped_at 采集时间 记录数据生成时间

注意,前五个字段是题目要求的核心字段,后几个字段属于工程化补充。我的习惯是:只要涉及目录数据,最好保留一些原始字段。因为古籍版本判断有时并不是机器能百分百确认的,后续人工复核时,原始摘要和原始年代字段很有用。

2.3 版本型元数据和普通图书榜单的不同

普通图书榜单一般有非常规整的结构:

  • 标题;
  • 作者;
  • 评分;
  • 价格;
  • 排名;
  • 封面;
  • 链接。

这种页面往往一条记录就是一个卡片,卡片内字段稳定,爬虫只要写好 CSS 选择器就能跑。

古籍版本目录不一样。它的字段经常有几个特点:

  1. 版本信息不一定有独立字段,可能在题名、说明、注释、摘要里;
  2. 年代信息可能是“明万历”“清康熙”“ca. 1260-1368”“reprinted in 1775”这类混合表达;
  3. 藏馆字段可能不是一个干净的馆名,而是“某某馆藏集合”“某部门”“某专题数据库”;
  4. 同一书名可能有多个版本,不能只按书名去重;
  5. 一个详情页可能包含多种语言的题名,标题清洗需要保守。

所以本文的思路不是“写一个选择器把字段抓下来”,而是先把采集流程拆成四层:

采集 → 解析 → 清洗 → 存储

这样后面目标站点变了,只需要改请求参数或解析规则,其他层仍然可以复用。


3️⃣ 合规与注意事项(必写)

写爬虫之前,我一般先看三件事:站点有没有 robots.txt,是否提供 API,是否需要登录或付费。只要其中任何一项显示“不适合抓”,我都会换方案,而不是强行绕。

3.1 robots.txt 基本说明

robots.txt 是网站给自动化访问程序看的规则文件,一般位于站点根路径下。它不等于法律合同,也不一定覆盖所有使用条款,但它是最基本的礼貌边界。

爬虫启动前建议做一次检查:

from urllib import robotparser

def can_fetch(robots_url: str, user_agent: str, target_url: str) -> bool:
    rp = robotparser.RobotFileParser()
    rp.set_url(robots_url)
    rp.read()
    return rp.can_fetch(user_agent, target_url)

实际项目里,我会把这个检查放到启动流程中。如果 robots.txt 明确不允许访问目标路径,那就不抓。对于没有明确规则的页面,也要把频率降下来,避免对服务器造成压力。

3.2 频率控制,不要攻击式并发

古籍目录站点通常不是电商站,也不是专门给高频访问准备的服务。很多机构网站的首要任务是展示资源,不是承受大量爬虫请求。

本文示例默认采用单线程低频访问:

  • 每页请求后睡眠 1 到 3 秒;
  • 出现 429 时读取 Retry-After
  • 失败后指数退避;
  • 不开启攻击式并发;
  • 不请求图片大文件,只采集目录元数据。

如果只是整理元数据,绝大多数情况下不需要并发。你真正耗时的不是请求,而是后续清洗和复核。

3.3 不采集敏感信息

本文只采集公开目录元数据,不采集个人隐私,不采集登录后才可见的内容,不绕过验证码,不绕过付费限制,也不下载受限制的大批量影像文件。

比较稳妥的表达是:

只访问公开页面或公开接口,只提取目录型书目信息,只在合理频率下做研究和学习用途的数据整理。

这句话不是装样子。实际做采集时,边界感非常重要。尤其是文化遗产类资源,很多机构已经花了很大成本做数字化,能提供公开检索已经不容易。我们写代码的人,至少不要把它们当作压力测试靶场。

3.4 优先使用 API,而不是强行解析页面

如果站点公开提供 JSON API,我会优先使用 API。理由很简单:

  1. API 返回结构化数据,解析更稳定;
  2. API 通常是网站预期的机器访问方式;
  3. 页面 HTML 可能为了展示而频繁调整;
  4. API 可以减少无关静态资源请求。

本文就采用这个策略。后面仍然会讲 HTML 解析思路,因为并不是每个古籍目录站都提供 API。但在可选的情况下,API 是更合适的起点。


4️⃣ 技术选型与整体流程(What/How)

4.1 静态、动态、API:本文属于哪种

常见采集对象可以分成三类。

第一类是静态 HTML。服务端直接返回完整内容,使用 requests + BeautifulSoup/lxml 就能解析。

第二类是动态渲染页面。首屏 HTML 是空壳,真实数据由浏览器执行 JavaScript 后再加载。这种情况可以抓接口,也可以用 Playwright 渲染页面。

第三类是公开 API。服务端直接返回 JSON、XML 或其他结构化数据。本文采用这一类作为主方案。

本文选择:

  • 请求层:requests.Session
  • 合规检查:urllib.robotparser
  • 数据模型:pydantic
  • 解析层:JSON 字段读取 + 正则抽取版本和年代
  • 存储层:CSV + SQLite
  • 日志:标准库 logging
  • 配置:.env + python-dotenv

为什么不一上来用 Scrapy?因为本文目标是把古籍元数据抽取逻辑讲清楚。Scrapy 很强,但它会引入更多框架概念。对于第一版小项目,requests 足够清晰。等字段规则稳定之后,再迁移到 Scrapy 也不迟。

为什么不用 Playwright?因为目标数据可以通过公开 JSON 获取。Playwright 更适合必须渲染浏览器的场景,比如接口签名复杂、数据隐藏在前端状态里、必须触发交互后才加载。能不用浏览器,就先不用浏览器。

4.2 整体流程

流程可以写成这样:

读取配置
  ↓
检查 robots.txt
  ↓
创建 requests.Session
  ↓
分页请求目录 API
  ↓
解析每条记录
  ↓
抽取 title / version / publication_date / holding_institution / link
  ↓
字段清洗与标准化
  ↓
生成 source_id 与 content_hash
  ↓
写入 SQLite
  ↓
导出 CSV
  ↓
打印示例结果与统计信息

如果用 Mermaid 表达,大概是:

No

Yes

Start: Load Config

Check robots.txt

Allowed?

Stop

Fetch JSON Pages

Parse Result Items

Extract Version Metadata

Normalize Fields

Deduplicate by URL or Hash

Save to SQLite

Export CSV

Show Sample Rows

4.3 为什么选择 requests + JSON 解析

古籍目录采集最怕两个问题:页面结构变化和字段解释不清。使用 JSON API 可以减轻第一个问题。第二个问题则需要靠解析层和清洗层解决。

本文选择 requests,原因有四个:

  1. 学习成本低,读者能快速运行;
  2. Session 配合后能统一 headers、timeout、重试;
  3. 对 JSON API 足够稳定;
  4. 方便后续拆成 Fetcher、Parser、Storage 三层。

选择 pydantic 是为了让数据结构明确。很多爬虫脚本写到最后会变成一堆字典,字段名到处飞,后期维护很痛苦。古籍元数据尤其应该有模型,因为字段含义比普通列表页复杂得多。

选择 SQLite 是因为它轻。CSV 方便打开查看,SQLite 方便去重、查询和断点续跑。本文两者都保留:先入库,再导出 CSV。


5️⃣ 环境准备与依赖安装(可复现)

5.1 Python 版本

建议使用 Python 3.10 以上。本文代码在 Python 3.10、3.11、3.12 的语法范围内都可以运行。

查看版本:

python --version

建议输出类似:

Python 3.11.8

5.2 创建虚拟环境

mkdir rarebook_version_spider
cd rarebook_version_spider

python -m venv .venv

# macOS / Linux
source .venv/bin/activate

# Windows PowerShell
# .venv\Scripts\Activate.ps1

5.3 安装依赖

新建 requirements.txt

requests==2.32.3
pydantic==2.8.2
python-dotenv==1.0.1
beautifulsoup4==4.12.3
lxml==5.2.2
tqdm==4.66.5

安装:

pip install -r requirements.txt

说明一下:本文主流程走 JSON API,理论上不一定需要 beautifulsoup4lxml。但我仍然放进依赖,是为了保留 HTML 解析扩展能力。很多古籍目录站没有 API,到那时只要新增 HTML parser 即可。

5.4 推荐项目结构

rarebook_version_spider/
├── README.md
├── requirements.txt
├── .env.example
├── data/
│   ├── rarebooks.sqlite3
│   └── rarebooks.csv
├── logs/
│   └── spider.log
├── rarebook_spider/
│   ├── __init__.py
│   ├── config.py
│   ├── models.py
│   ├── fetcher.py
│   ├── parser.py
│   ├── cleaner.py
│   ├── storage.py
│   └── utils.py
├── main.py
└── tests/
    └── test_parser.py

这种结构没有什么玄学,只是把职责拆开:

  • config.py:读取配置;
  • models.py:定义数据模型;
  • fetcher.py:负责请求;
  • parser.py:负责从 JSON 或 HTML 中解析字段;
  • cleaner.py:负责清洗、规范化;
  • storage.py:负责 SQLite 和 CSV;
  • utils.py:放通用函数;
  • main.py:入口文件。

我不建议把所有逻辑塞进一个 spider.py。小脚本当然可以这样写,但一旦你开始加重试、日志、去重、导出,单文件会变得很难读。


6️⃣ 核心实现:请求层(Fetcher)

请求层的目标很简单:稳定、克制、可恢复。

一个好的 Fetcher 至少要处理这些问题:

  • headers;
  • timeout;
  • session;
  • robots 检查;
  • HTTP 状态码;
  • 重试和退避;
  • 频率控制;
  • JSON 解析异常;
  • 日志记录。

6.1 配置文件

先写 .env.example

RAREBOOK_BASE_URL=https://www.loc.gov/collections/chinese-rare-books/
RAREBOOK_ROBOTS_URL=https://www.loc.gov/robots.txt
RAREBOOK_USER_AGENT=RareBookVersionSpider/1.0 (+contact: example@example.com)
RAREBOOK_REFERER=https://www.loc.gov/collections/chinese-rare-books/
RAREBOOK_PAGE_SIZE=50
RAREBOOK_MAX_PAGES=3
RAREBOOK_SLEEP_MIN=1.0
RAREBOOK_SLEEP_MAX=3.0
RAREBOOK_TIMEOUT=20
RAREBOOK_DB_PATH=data/rarebooks.sqlite3
RAREBOOK_CSV_PATH=data/rarebooks.csv

把它复制成 .env

cp .env.example .env

这里的 User-Agent 建议写得诚实一点。如果你是真做长期项目,最好留一个联系邮箱。不要伪装成搜索引擎,不要伪装成浏览器大规模请求。技术圈很多麻烦,不是代码能力问题,而是边界感问题。

6.2 config.py

# rarebook_spider/config.py

from __future__ import annotations

from dataclasses import dataclass
import os
from dotenv import load_dotenv


@dataclass(frozen=True)
class Settings:
    base_url: str
    robots_url: str
    user_agent: str
    referer: str
    page_size: int
    max_pages: int
    sleep_min: float
    sleep_max: float
    timeout: int
    db_path: str
    csv_path: str


def load_settings() -> Settings:
    load_dotenv()

    return Settings(
        base_url=os.getenv(
            "RAREBOOK_BASE_URL",
            "https://www.loc.gov/collections/chinese-rare-books/",
        ),
        robots_url=os.getenv(
            "RAREBOOK_ROBOTS_URL",
            "https://www.loc.gov/robots.txt",
        ),
        user_agent=os.getenv(
            "RAREBOOK_USER_AGENT",
            "RareBookVersionSpider/1.0 (+contact: example@example.com)",
        ),
        referer=os.getenv(
            "RAREBOOK_REFERER",
            "https://www.loc.gov/collections/chinese-rare-books/",
        ),
        page_size=int(os.getenv("RAREBOOK_PAGE_SIZE", "50")),
        max_pages=int(os.getenv("RAREBOOK_MAX_PAGES", "3")),
        sleep_min=float(os.getenv("RAREBOOK_SLEEP_MIN", "1.0")),
        sleep_max=float(os.getenv("RAREBOOK_SLEEP_MAX", "3.0")),
        timeout=int(os.getenv("RAREBOOK_TIMEOUT", "20")),
        db_path=os.getenv("RAREBOOK_DB_PATH", "data/rarebooks.sqlite3"),
        csv_path=os.getenv("RAREBOOK_CSV_PATH", "data/rarebooks.csv"),
    )

6.3 utils.py

# rarebook_spider/utils.py

from __future__ import annotations

import hashlib
import json
import logging
import random
import re
import time
from datetime import datetime, timezone
from pathlib import Path
from typing import Any


def ensure_parent_dir(path: str) -> None:
    Path(path).parent.mkdir(parents=True, exist_ok=True)


def now_iso() -> str:
    return datetime.now(timezone.utc).isoformat(timespec="seconds")


def sleep_random(min_seconds: float, max_seconds: float) -> None:
    delay = random.uniform(min_seconds, max_seconds)
    time.sleep(delay)


def normalize_space(text: Any) -> str:
    if text is None:
        return ""
    if isinstance(text, list):
        text = " ".join(str(x) for x in text if x is not None)
    text = str(text)
    text = re.sub(r"\s+", " ", text)
    return text.strip()


def stable_hash(value: Any) -> str:
    dumped = json.dumps(value, ensure_ascii=False, sort_keys=True)
    return hashlib.sha256(dumped.encode("utf-8")).hexdigest()


def build_logger(log_path: str = "logs/spider.log") -> logging.Logger:
    ensure_parent_dir(log_path)

    logger = logging.getLogger("rarebook_spider")
    logger.setLevel(logging.INFO)
    logger.handlers.clear()

    formatter = logging.Formatter(
        fmt="%(asctime)s | %(levelname)s | %(name)s | %(message)s"
    )

    console_handler = logging.StreamHandler()
    console_handler.setFormatter(formatter)

    file_handler = logging.FileHandler(log_path, encoding="utf-8")
    file_handler.setFormatter(formatter)

    logger.addHandler(console_handler)
    logger.addHandler(file_handler)
    return logger

6.4 models.py

# rarebook_spider/models.py

from __future__ import annotations

from pydantic import BaseModel, Field, HttpUrl, field_validator


class RareBookRecord(BaseModel):
    title: str = Field(default="")
    version: str = Field(default="")
    publication_date: str = Field(default="")
    holding_institution: str = Field(default="")
    link: str = Field(default="")
    source_id: str = Field(default="")
    raw_date: str = Field(default="")
    raw_summary: str = Field(default="")
    content_hash: str = Field(default="")
    scraped_at: str = Field(default="")

    @field_validator("title", "version", "publication_date", "holding_institution", "link")
    @classmethod
    def strip_text(cls, value: str) -> str:
        return value.strip() if value else ""


class CrawlStats(BaseModel):
    pages_requested: int = 0
    records_seen: int = 0
    records_parsed: int = 0
    records_inserted: int = 0
    records_skipped: int = 0
    errors: int = 0

6.5 fetcher.py

# rarebook_spider/fetcher.py

from __future__ import annotations

import logging
import random
import time
from typing import Any
from urllib import robotparser

import requests

from .config import Settings


class FetchError(RuntimeError):
    pass


class RareBookFetcher:
    def __init__(self, settings: Settings, logger: logging.Logger) -> None:
        self.settings = settings
        self.logger = logger
        self.session = requests.Session()
        self.session.headers.update(
            {
                "User-Agent": settings.user_agent,
                "Accept": "application/json,text/html;q=0.9,*/*;q=0.8",
                "Accept-Language": "zh-CN,zh;q=0.9,en;q=0.8",
                "Referer": settings.referer,
                "Connection": "keep-alive",
            }
        )

    def robots_allowed(self, target_url: str) -> bool:
        rp = robotparser.RobotFileParser()
        rp.set_url(self.settings.robots_url)

        try:
            rp.read()
        except Exception as exc:
            self.logger.warning("robots.txt 读取失败,采用保守策略继续前请人工确认:%s", exc)
            # 这里返回 True 是为了不让示例代码直接不可用。
            # 正式项目中可以改成 False,由人工确认后再启动。
            return True

        allowed = rp.can_fetch(self.settings.user_agent, target_url)
        self.logger.info("robots check target=%s allowed=%s", target_url, allowed)
        return allowed

    def get_json(
        self,
        url: str,
        params: dict[str, Any] | None = None,
        max_retries: int = 3,
    ) -> dict[str, Any]:
        last_error: Exception | None = None

        for attempt in range(1, max_retries + 1):
            try:
                response = self.session.get(
                    url,
                    params=params,
                    timeout=self.settings.timeout,
                )

                if response.status_code == 429:
                    retry_after = response.headers.get("Retry-After")
                    if retry_after and retry_after.isdigit():
                        sleep_seconds = int(retry_after)
                    else:
                        sleep_seconds = min(60, 2 ** attempt + random.random())

                    self.logger.warning(
                        "HTTP 429 Too Many Requests, sleep %.2fs then retry",
                        sleep_seconds,
                    )
                    time.sleep(sleep_seconds)
                    continue

                if 500 <= response.status_code < 600:
                    sleep_seconds = min(60, 2 ** attempt + random.random())
                    self.logger.warning(
                        "Server error %s, sleep %.2fs then retry",
                        response.status_code,
                        sleep_seconds,
                    )
                    time.sleep(sleep_seconds)
                    continue

                if response.status_code in {403, 404}:
                    raise FetchError(
                        f"request forbidden or not found: status={response.status_code}, url={response.url}"
                    )

                response.raise_for_status()

                try:
                    return response.json()
                except ValueError as exc:
                    snippet = response.text[:300]
                    raise FetchError(f"JSON parse failed: {exc}; body={snippet!r}") from exc

            except (requests.RequestException, FetchError) as exc:
                last_error = exc
                sleep_seconds = min(60, 2 ** attempt + random.random())
                self.logger.warning(
                    "request failed attempt=%s/%s error=%s sleep=%.2fs",
                    attempt,
                    max_retries,
                    exc,
                    sleep_seconds,
                )
                time.sleep(sleep_seconds)

        raise FetchError(f"request failed after retries: {last_error}") from last_error

    def fetch_collection_page(self, page: int) -> dict[str, Any]:
        params = {
            "fo": "json",
            "c": self.settings.page_size,
            "sp": page,
            "at": "results,pagination",
        }
        self.logger.info("fetch collection page=%s page_size=%s", page, self.settings.page_size)
        return self.get_json(self.settings.base_url, params=params)

6.6 请求层说明

这里有几个细节值得说一下。

第一,headers 不只是为了避免 403。更重要的是让请求看起来像一个正常、可识别的程序。User-Agent 最好带项目名和联系方式。Referer 可以填目标集合页。Accept 明确告诉服务器我们接受 JSON。

第二,timeout 必须有。没有 timeout 的爬虫,一旦网络挂住,就会一直卡在那里。很多爬虫不是死在逻辑错误上,而是死在没有 timeout。

第三,Session 可以复用连接,也方便统一 headers。不要每次请求都临时写一遍 requests.get,后面维护起来很乱。

第四,失败处理要区分情况。429 说明请求过快,要降频。5xx 说明服务器端可能临时异常,可以重试。403 可能是权限、频率、UA 或 robots 问题,不应该盲目换代理硬冲。爬虫不是对抗游戏,尤其本文这种公开目录元数据采集,没有必要把动作做得很激进。


7️⃣ 核心实现:解析层(Parser)

解析层是本文真正的重点。因为目标不是普通榜单,而是版本型元数据。

7.1 解析方式

本文主解析方式是 JSON。基本逻辑如下:

  1. 目录 API 返回 results
  2. 遍历每条 item;
  3. 从 item 中拿标题、链接、日期、贡献者、说明;
  4. 对说明字段做版本和年代抽取;
  5. 缺失字段用空字符串或固定规则兜底;
  6. 返回 RareBookRecord

为了兼容不同接口字段,解析函数不能只认一个字段名。比如题名可能叫 title,链接可能叫 urlid,说明可能在 descriptionsummarynoteitem 子字段里。

7.2 版本抽取为什么不能太激进

古籍版本表达很复杂。比如:

  • 宋刻本;
  • 元刊本;
  • 明嘉靖刻本;
  • 明万历重刊本;
  • 清康熙内府刻本;
  • 清抄本;
  • 稿本;
  • 木活字本;
  • 朝鲜活字本;
  • reprinted in 1775;
  • carved and printed in the Yuan Dynasty;
  • Ming dynasty edition。

如果你想通过一个正则全部判断准确,很容易误伤。本文采用“保守抽取”的策略:

  • 能抽到明确版本就填;
  • 抽不到就填空;
  • 不强行根据年份推版本;
  • 保留原始说明文本,便于人工复核。

这点很重要。爬虫的任务是整理线索,不是装成目录学专家。机器抽取可以辅助,但不应该把不确定内容伪装成确定结论。

7.3 cleaner.py

# rarebook_spider/cleaner.py

from __future__ import annotations

import re
from typing import Any

from .utils import normalize_space


DYNASTY_YEAR_HINTS = {
    "宋": "960-1279",
    "元": "1271-1368",
    "明": "1368-1644",
    "清": "1644-1912",
    "民国": "1912-1949",
    "民國": "1912-1949",
}


VERSION_KEYWORDS = [
    "刻本",
    "刊本",
    "鈔本",
    "抄本",
    "稿本",
    "寫本",
    "写本",
    "活字本",
    "木活字本",
    "石印本",
    "影印本",
    "重刊本",
    "重刻本",
    "套印本",
    "朱印本",
    "藍印本",
    "蓝印本",
    "manuscript",
    "printed",
    "reprinted",
    "edition",
    "block-print",
    "woodblock",
]


CHINESE_VERSION_PATTERNS = [
    re.compile(
        r"(?P<version>"
        r"(宋|元|明|清|民國|民国|乾隆|嘉靖|萬曆|万历|康熙|雍正|道光|同治|光緒|光绪|宣統|宣统)"
        r"[^,。;;,.、]{0,30}?"
        r"(刻本|刊本|鈔本|抄本|稿本|寫本|写本|活字本|木活字本|石印本|影印本|重刊本|重刻本|套印本)"
        r")"
    ),
    re.compile(
        r"(?P<version>"
        r"[^,。;;,.、]{0,20}?"
        r"(稿本|鈔本|抄本|寫本|写本|刻本|刊本|活字本|木活字本)"
        r")"
    ),
]


ENGLISH_VERSION_PATTERNS = [
    re.compile(
        r"(?P<version>"
        r"(Song|Yuan|Ming|Qing)\s+dynasty[^.;,]{0,40}?"
        r"(edition|printed|block-print|woodblock|manuscript)"
        r")",
        re.IGNORECASE,
    ),
    re.compile(
        r"(?P<version>"
        r"(printed|reprinted|carved and printed)[^.;,]{0,50}?"
        r"(\d{3,4}|\d{4}\s*-\s*\d{4})"
        r")",
        re.IGNORECASE,
    ),
    re.compile(
        r"(?P<version>"
        r"(manuscript|woodblock print|block-print edition|movable type edition)"
        r")",
        re.IGNORECASE,
    ),
]


YEAR_PATTERN = re.compile(
    r"(?P<year_range>\b\d{3,4}\s*[-/]\s*\d{3,4}\b)|(?P<year>\b\d{3,4}\b)"
)


def join_text_fields(*values: Any) -> str:
    parts: list[str] = []
    for value in values:
        text = normalize_space(value)
        if text:
            parts.append(text)
    return " ".join(parts).strip()


def extract_version_from_text(text: str) -> str:
    text = normalize_space(text)
    if not text:
        return ""

    for pattern in CHINESE_VERSION_PATTERNS:
        match = pattern.search(text)
        if match:
            return normalize_space(match.group("version"))

    for pattern in ENGLISH_VERSION_PATTERNS:
        match = pattern.search(text)
        if match:
            return normalize_space(match.group("version"))

    lower = text.lower()
    for keyword in VERSION_KEYWORDS:
        if keyword.lower() in lower:
            # 兜底策略:如果只发现关键词,不强行截取长句,返回关键词。
            return keyword

    return ""


def extract_publication_date(raw_date: str, text: str) -> str:
    raw_date = normalize_space(raw_date)
    text = normalize_space(text)

    if raw_date:
        return raw_date

    combined = f"{raw_date} {text}".strip()

    # 优先识别年份范围,如 1260-1368
    match = YEAR_PATTERN.search(combined)
    if match:
        if match.group("year_range"):
            return normalize_space(match.group("year_range"))
        if match.group("year"):
            return match.group("year")

    # 再识别朝代
    for dynasty, year_range in DYNASTY_YEAR_HINTS.items():
        if dynasty in combined:
            return f"{dynasty}(约 {year_range})"

    dynasty_en = {
        "song dynasty": "宋(约 960-1279)",
        "yuan dynasty": "元(约 1271-1368)",
        "ming dynasty": "明(约 1368-1644)",
        "qing dynasty": "清(约 1644-1912)",
    }
    lower = combined.lower()
    for key, value in dynasty_en.items():
        if key in lower:
            return value

    return ""


def normalize_holding_institution(value: str) -> str:
    text = normalize_space(value)
    if not text:
        return "Library of Congress, Asian Division"

    # 这里保守处理,不做复杂翻译,避免误改正式馆名。
    if "Chinese Rare Book Collection" in text and "Library of Congress" in text:
        return "Chinese Rare Book Collection, Library of Congress"

    if "Library of Congress" in text:
        return "Library of Congress"

    return text


def normalize_title(value: str) -> str:
    text = normalize_space(value)
    # 删除一些接口中可能带出的重复空白,但不删除括号内信息。
    return text


def build_source_id(link: str) -> str:
    link = normalize_space(link).rstrip("/")
    if not link:
        return ""

    # 例如 https://www.loc.gov/item/xxxx/ -> item:xxxx
    parts = [p for p in link.split("/") if p]
    if len(parts) >= 2 and parts[-2] in {"item", "resource"}:
        return f"{parts[-2]}:{parts[-1]}"

    return link

7.4 parser.py

# rarebook_spider/parser.py

from __future__ import annotations

from typing import Any

from .cleaner import (
    build_source_id,
    extract_publication_date,
    extract_version_from_text,
    join_text_fields,
    normalize_holding_institution,
    normalize_title,
)
from .models import RareBookRecord
from .utils import normalize_space, now_iso, stable_hash


def get_first(item: dict[str, Any], keys: list[str]) -> Any:
    for key in keys:
        value = item.get(key)
        if value:
            return value
    return ""


def flatten_list(value: Any) -> str:
    if value is None:
        return ""
    if isinstance(value, list):
        return " ".join(normalize_space(v) for v in value if normalize_space(v))
    if isinstance(value, dict):
        return " ".join(normalize_space(v) for v in value.values() if normalize_space(v))
    return normalize_space(value)


def extract_title(item: dict[str, Any]) -> str:
    title = get_first(item, ["title", "title_original", "short_title"])
    if isinstance(title, list):
        title = title[0] if title else ""
    return normalize_title(str(title))


def extract_link(item: dict[str, Any]) -> str:
    link = get_first(item, ["url", "id", "link"])
    return normalize_space(link)


def extract_raw_date(item: dict[str, Any]) -> str:
    return normalize_space(get_first(item, ["date", "dates", "created_published"]))


def extract_contributors(item: dict[str, Any]) -> str:
    contributors = get_first(item, ["contributor", "contributors", "creator"])
    return flatten_list(contributors)


def extract_summary_text(item: dict[str, Any]) -> str:
    description = flatten_list(item.get("description"))
    summary = flatten_list(item.get("summary"))
    notes = flatten_list(item.get("notes"))
    subjects = flatten_list(item.get("subject"))
    partof = flatten_list(item.get("partof"))

    nested_item = item.get("item")
    nested_text = ""
    if isinstance(nested_item, dict):
        nested_text = join_text_fields(
            flatten_list(nested_item.get("title")),
            flatten_list(nested_item.get("notes")),
            flatten_list(nested_item.get("created_published")),
            flatten_list(nested_item.get("description")),
        )

    return join_text_fields(description, summary, notes, subjects, partof, nested_text)


def parse_result_item(item: dict[str, Any]) -> RareBookRecord:
    title = extract_title(item)
    link = extract_link(item)
    raw_date = extract_raw_date(item)
    contributors = extract_contributors(item)
    raw_summary = extract_summary_text(item)

    version_source_text = join_text_fields(title, raw_date, raw_summary)
    version = extract_version_from_text(version_source_text)
    publication_date = extract_publication_date(raw_date, version_source_text)

    holding_institution = normalize_holding_institution(contributors)

    source_id = build_source_id(link)
    content_hash = stable_hash(
        {
            "title": title,
            "version": version,
            "publication_date": publication_date,
            "holding_institution": holding_institution,
            "link": link,
        }
    )

    return RareBookRecord(
        title=title,
        version=version,
        publication_date=publication_date,
        holding_institution=holding_institution,
        link=link,
        source_id=source_id,
        raw_date=raw_date,
        raw_summary=raw_summary,
        content_hash=content_hash,
        scraped_at=now_iso(),
    )


def parse_collection_response(payload: dict[str, Any]) -> list[RareBookRecord]:
    results = payload.get("results", [])
    if not isinstance(results, list):
        return []

    records: list[RareBookRecord] = []
    for item in results:
        if not isinstance(item, dict):
            continue
        record = parse_result_item(item)

        # 题名和链接至少要有一个,否则这条记录没有实际意义。
        if record.title or record.link:
            records.append(record)

    return records

7.5 列表页如何拿详情链接

在 JSON API 场景中,列表页不再是 HTML 卡片,而是 results 数组。每个 item 通常会有 urlid 字段。解析时按优先级取:

link = item.get("url") or item.get("id") or item.get("link") or ""

如果是 HTML 页面,逻辑会变成:

from bs4 import BeautifulSoup
from urllib.parse import urljoin

def parse_links_from_html(html: str, base_url: str) -> list[str]:
    soup = BeautifulSoup(html, "lxml")
    links: list[str] = []

    for a in soup.select("a[href]"):
        text = a.get_text(strip=True)
        href = a.get("href", "")
        if not href:
            continue

        # 这里只是示例,真实项目要根据页面结构缩小选择器范围。
        if text and ("卷" in text or "jing" in text.lower() or "shu" in text.lower()):
            links.append(urljoin(base_url, href))

    return list(dict.fromkeys(links))

HTML 解析一定要缩小选择器范围,不要全页乱扫。否则导航栏、页脚、相关推荐都会混进来。

7.6 详情页如何抽字段

如果列表接口已经给了足够字段,可以不请求详情页。这个原则很实用:能少请求就少请求。

如果必须请求详情页,可以新增一个方法:

def parse_detail_json(payload: dict[str, Any]) -> dict[str, str]:
    item = payload.get("item", {})
    if not isinstance(item, dict):
        return {}

    title = item.get("title", "")
    created = item.get("created_published", "")
    notes = item.get("notes", "")
    contributors = item.get("contributors", "")

    full_text = join_text_fields(title, created, notes, contributors)
    version = extract_version_from_text(full_text)
    publication_date = extract_publication_date(created, full_text)

    return {
        "title": normalize_title(title),
        "version": version,
        "publication_date": publication_date,
        "holding_institution": normalize_holding_institution(flatten_list(contributors)),
    }

我个人建议第一版先只抓列表接口。等确认字段缺失比较严重,再补详情页。古籍目录页常见的坑是:你以为详情页字段更全,结果详情页的结构差异更大,解析成本翻倍。

7.7 缺失字段怎么办

缺失字段不要乱补。

本文策略是:

  • 书名缺失:如果链接存在,仍保留记录,但后续标记为待复核;
  • 版本缺失:填空,不猜;
  • 刊刻年代缺失:尝试从说明中抽年份或朝代,抽不到则填空;
  • 藏馆缺失:如果是单一馆藏集合,可使用配置中的默认馆名;
  • 链接缺失:如果题名存在,可以保留,但不入去重主键;
  • 原始摘要保留:方便人工回看。

写爬虫最忌讳的是“为了表格好看而编字段”。宁可空着,也不要错得很自信。


8️⃣ 数据存储与导出(Storage)

本文用 SQLite 起步,同时导出 CSV。

为什么不是 MySQL?不是 MySQL 不好,而是这个案例没有必要。SQLite 单文件、零服务、方便复制,足够支撑几万条目录元数据。

8.1 字段映射表

数据字段 数据库字段 类型 示例值
书名 title TEXT Shan hai jing : shi ba juan 山海經 : 十八卷
版本 version TEXT Ming dynasty edition / 明萬曆刻本
刊刻年代 publication_date TEXT 1775 / 明(约 1368-1644)
藏馆 holding_institution TEXT Chinese Rare Book Collection, Library of Congress
链接 link TEXT https://www.loc.gov/item/.../
来源 ID source_id TEXT item:xxxx
原始年代 raw_date TEXT 1853
原始说明 raw_summary TEXT Title page is lacking...
内容哈希 content_hash TEXT SHA256
采集时间 scraped_at TEXT 2026-06-09T12:00:00+00:00

8.2 去重策略

目录数据去重不能只看书名。因为同一书名可能有多个版本,甚至同一题名在不同馆藏中有不同抄本、刻本。

本文采用两级去重:

  1. 优先按 link 唯一;
  2. 如果没有链接,则按 content_hash 辅助判断。

数据库层把 link 设置为唯一字段。这样重复运行时,不会重复插入同一条目录记录。

8.3 storage.py

# rarebook_spider/storage.py

from __future__ import annotations

import csv
import sqlite3
from pathlib import Path
from typing import Iterable

from .models import RareBookRecord
from .utils import ensure_parent_dir


CREATE_TABLE_SQL = """
CREATE TABLE IF NOT EXISTS rarebook_records (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    title TEXT NOT NULL DEFAULT '',
    version TEXT NOT NULL DEFAULT '',
    publication_date TEXT NOT NULL DEFAULT '',
    holding_institution TEXT NOT NULL DEFAULT '',
    link TEXT NOT NULL DEFAULT '',
    source_id TEXT NOT NULL DEFAULT '',
    raw_date TEXT NOT NULL DEFAULT '',
    raw_summary TEXT NOT NULL DEFAULT '',
    content_hash TEXT NOT NULL DEFAULT '',
    scraped_at TEXT NOT NULL DEFAULT '',
    UNIQUE(link)
);
"""

CREATE_INDEX_SQL = """
CREATE INDEX IF NOT EXISTS idx_rarebook_source_id
ON rarebook_records(source_id);
"""


class RareBookStorage:
    def __init__(self, db_path: str) -> None:
        ensure_parent_dir(db_path)
        self.db_path = db_path
        self.conn = sqlite3.connect(db_path)
        self.conn.execute("PRAGMA journal_mode=WAL;")
        self.conn.execute("PRAGMA synchronous=NORMAL;")
        self.init_db()

    def init_db(self) -> None:
        self.conn.execute(CREATE_TABLE_SQL)
        self.conn.execute(CREATE_INDEX_SQL)
        self.conn.commit()

    def insert_one(self, record: RareBookRecord) -> bool:
        sql = """
        INSERT OR IGNORE INTO rarebook_records (
            title,
            version,
            publication_date,
            holding_institution,
            link,
            source_id,
            raw_date,
            raw_summary,
            content_hash,
            scraped_at
        ) VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?);
        """
        cur = self.conn.execute(
            sql,
            (
                record.title,
                record.version,
                record.publication_date,
                record.holding_institution,
                record.link,
                record.source_id,
                record.raw_date,
                record.raw_summary,
                record.content_hash,
                record.scraped_at,
            ),
        )
        self.conn.commit()
        return cur.rowcount > 0

    def insert_many(self, records: Iterable[RareBookRecord]) -> tuple[int, int]:
        inserted = 0
        skipped = 0
        for record in records:
            ok = self.insert_one(record)
            if ok:
                inserted += 1
            else:
                skipped += 1
        return inserted, skipped

    def export_csv(self, csv_path: str) -> int:
        ensure_parent_dir(csv_path)
        fields = [
            "title",
            "version",
            "publication_date",
            "holding_institution",
            "link",
            "source_id",
            "raw_date",
            "raw_summary",
            "content_hash",
            "scraped_at",
        ]

        cur = self.conn.execute(
            """
            SELECT
                title,
                version,
                publication_date,
                holding_institution,
                link,
                source_id,
                raw_date,
                raw_summary,
                content_hash,
                scraped_at
            FROM rarebook_records
            ORDER BY id ASC;
            """
        )

        rows = cur.fetchall()
        with open(csv_path, "w", encoding="utf-8-sig", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(fields)
            writer.writerows(rows)

        return len(rows)

    def sample_rows(self, limit: int = 5) -> list[dict[str, str]]:
        cur = self.conn.execute(
            """
            SELECT
                title,
                version,
                publication_date,
                holding_institution,
                link
            FROM rarebook_records
            ORDER BY id ASC
            LIMIT ?;
            """,
            (limit,),
        )
        rows = cur.fetchall()
        result: list[dict[str, str]] = []
        for row in rows:
            result.append(
                {
                    "title": row[0],
                    "version": row[1],
                    "publication_date": row[2],
                    "holding_institution": row[3],
                    "link": row[4],
                }
            )
        return result

    def count(self) -> int:
        cur = self.conn.execute("SELECT COUNT(*) FROM rarebook_records;")
        return int(cur.fetchone()[0])

    def close(self) -> None:
        self.conn.close()

8.4 为什么 CSV 用 utf-8-sig

很多人导出中文 CSV 后,用 Excel 打开会乱码。utf-8-sig 会在文件开头写入 BOM,Excel 更容易识别 UTF-8 编码。严格说 BOM 不是必须,但对普通用户友好。

如果你只在 Python、R、数据库里用,utf-8 就够了。如果要发给别人用 Excel 看,utf-8-sig 更省解释成本。


9️⃣ 运行方式与结果展示(必写)

9.1 main.py

# main.py

from __future__ import annotations

from rarebook_spider.config import load_settings
from rarebook_spider.fetcher import RareBookFetcher
from rarebook_spider.models import CrawlStats
from rarebook_spider.parser import parse_collection_response
from rarebook_spider.storage import RareBookStorage
from rarebook_spider.utils import build_logger, sleep_random


def main() -> None:
    settings = load_settings()
    logger = build_logger()

    stats = CrawlStats()
    fetcher = RareBookFetcher(settings=settings, logger=logger)
    storage = RareBookStorage(settings.db_path)

    try:
        allowed = fetcher.robots_allowed(settings.base_url)
        if not allowed:
            logger.error("robots.txt 不允许访问目标路径,程序停止。target=%s", settings.base_url)
            return

        for page in range(1, settings.max_pages + 1):
            stats.pages_requested += 1

            try:
                payload = fetcher.fetch_collection_page(page)
                records = parse_collection_response(payload)
            except Exception as exc:
                stats.errors += 1
                logger.exception("page=%s failed: %s", page, exc)
                continue

            stats.records_seen += len(payload.get("results", []))
            stats.records_parsed += len(records)

            inserted, skipped = storage.insert_many(records)
            stats.records_inserted += inserted
            stats.records_skipped += skipped

            logger.info(
                "page=%s parsed=%s inserted=%s skipped=%s",
                page,
                len(records),
                inserted,
                skipped,
            )

            sleep_random(settings.sleep_min, settings.sleep_max)

        total = storage.export_csv(settings.csv_path)
        logger.info("exported csv=%s total_rows=%s", settings.csv_path, total)

        print("\n=== Crawl Stats ===")
        print(stats.model_dump_json(indent=2))

        print("\n=== Database Count ===")
        print(storage.count())

        print("\n=== Sample Rows ===")
        for row in storage.sample_rows(limit=5):
            print("-" * 80)
            print(f"书名: {row['title']}")
            print(f"版本: {row['version']}")
            print(f"刊刻年代: {row['publication_date']}")
            print(f"藏馆: {row['holding_institution']}")
            print(f"链接: {row['link']}")

    finally:
        storage.close()


if __name__ == "__main__":
    main()

9.2 启动命令

在项目根目录执行:

python main.py

如果想先少量测试,可以把 .env 里改成:

RAREBOOK_PAGE_SIZE=10
RAREBOOK_MAX_PAGES=1

正式跑之前,建议先从 1 页开始。看日志、看 CSV、看字段是否合理,再扩大页数。

9.3 输出位置

默认输出两个文件:

data/rarebooks.sqlite3
data/rarebooks.csv

日志输出:

logs/spider.log

9.4 示例结果展示

实际输出会随接口返回内容变化,下面展示的是格式示例:

书名 版本 刊刻年代 藏馆 链接
Miao fa lian hua jing : qi juan manuscript 宋(约 960-1279) Chinese Rare Book Collection, Library of Congress 详情页链接
Yi li tu Ming dynasty edition 明(约 1368-1644) Chinese Rare Book Collection, Library of Congress 详情页链接
Xuanhe ji gu yin shi printed in 1775 1775 Chinese Rare Book Collection, Library of Congress 详情页链接
Fen men ji zhu Du Gongbu shi block-print edition 元(约 1271-1368) Chinese Rare Book Collection, Library of Congress 详情页链接
Shan hai jing : shi ba juan reprinted in 1775 1775 Chinese Rare Book Collection, Library of Congress 详情页链接

这里强调一下:示例结果里的“版本”来自规则抽取,并不等于最终目录学鉴定结论。正式使用时,建议保留 raw_summary,由人工抽查一批样本,确认规则没有明显误判。

9.5 增加一个命令行参数版本

如果你想让项目更像一个工具,而不是固定脚本,可以改造 main.py,增加命令行参数。

# main.py

from __future__ import annotations

import argparse

from rarebook_spider.config import load_settings
from rarebook_spider.fetcher import RareBookFetcher
from rarebook_spider.models import CrawlStats
from rarebook_spider.parser import parse_collection_response
from rarebook_spider.storage import RareBookStorage
from rarebook_spider.utils import build_logger, sleep_random


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Crawl rare book version metadata from a public collection API."
    )
    parser.add_argument("--max-pages", type=int, default=None, help="Max pages to crawl.")
    parser.add_argument("--page-size", type=int, default=None, help="Records per page.")
    parser.add_argument("--csv", type=str, default=None, help="CSV output path.")
    parser.add_argument("--db", type=str, default=None, help="SQLite database path.")
    return parser.parse_args()


def main() -> None:
    args = parse_args()
    settings = load_settings()

    if args.max_pages is not None:
        object.__setattr__(settings, "max_pages", args.max_pages)
    if args.page_size is not None:
        object.__setattr__(settings, "page_size", args.page_size)
    if args.csv is not None:
        object.__setattr__(settings, "csv_path", args.csv)
    if args.db is not None:
        object.__setattr__(settings, "db_path", args.db)

    logger = build_logger()
    stats = CrawlStats()
    fetcher = RareBookFetcher(settings=settings, logger=logger)
    storage = RareBookStorage(settings.db_path)

    try:
        if not fetcher.robots_allowed(settings.base_url):
            logger.error("robots.txt disallows this target path.")
            return

        for page in range(1, settings.max_pages + 1):
            stats.pages_requested += 1

            payload = fetcher.fetch_collection_page(page)
            records = parse_collection_response(payload)

            stats.records_seen += len(payload.get("results", []))
            stats.records_parsed += len(records)

            inserted, skipped = storage.insert_many(records)
            stats.records_inserted += inserted
            stats.records_skipped += skipped

            logger.info(
                "page=%s seen=%s parsed=%s inserted=%s skipped=%s",
                page,
                len(payload.get("results", [])),
                len(records),
                inserted,
                skipped,
            )

            sleep_random(settings.sleep_min, settings.sleep_max)

        total = storage.export_csv(settings.csv_path)
        print(f"Done. Exported {total} rows to {settings.csv_path}")

    finally:
        storage.close()


if __name__ == "__main__":
    main()

启动:

python main.py --max-pages 2 --page-size 20 --csv data/demo_rarebooks.csv

🔟 常见问题与排错(强烈建议写)

10.1 403 怎么办

403 表示服务器拒绝访问。常见原因包括:

  1. 目标路径不允许自动访问;
  2. User-Agent 太异常;
  3. 请求频率过高;
  4. 需要登录或权限;
  5. 目标页面限制了某些访问方式。

处理建议:

  • 先检查 robots.txt;
  • 降低频率;
  • 使用明确、真实的 User-Agent;
  • 优先找公开 API;
  • 不要用代理池硬冲;
  • 如果内容需要登录或付费,不要绕过。

我的经验是,目录元数据采集遇到 403 时,第一反应不应该是“怎么伪装”,而应该是“这个路径是不是不该抓”。很多时候换用 API 或导出功能就能解决。

10.2 429 怎么办

429 表示请求太多。本文代码已经处理了:

if response.status_code == 429:
    retry_after = response.headers.get("Retry-After")
    if retry_after and retry_after.isdigit():
        sleep_seconds = int(retry_after)
    else:
        sleep_seconds = min(60, 2 ** attempt + random.random())
    time.sleep(sleep_seconds)
    continue

如果你仍然频繁遇到 429,可以做三件事:

  1. RAREBOOK_SLEEP_MINRAREBOOK_SLEEP_MAX 调大;
  2. RAREBOOK_PAGE_SIZE 调小;
  3. 暂停一段时间再访问。

不要一边 429 一边加代理。那不是解决问题,是扩大问题。

10.3 HTML 抓到空壳怎么办

如果你抓到的 HTML 只有一个根节点、几个 JS 文件,看不到目录内容,说明页面可能是前端动态渲染。

处理路线:

  1. 打开浏览器开发者工具;
  2. 查看 Network 面板;
  3. 找 XHR 或 Fetch 请求;
  4. 优先使用接口;
  5. 如果接口不可直接复用,再考虑 Playwright。

Playwright 示例:

from playwright.sync_api import sync_playwright

def render_html(url: str, timeout_ms: int = 30000) -> str:
    with sync_playwright() as p:
        browser = p.chromium.launch(headless=True)
        page = browser.new_page(
            user_agent="RareBookVersionSpider/1.0 (+contact: example@example.com)"
        )
        page.goto(url, wait_until="networkidle", timeout=timeout_ms)
        html = page.content()
        browser.close()
        return html

但注意,Playwright 不是万能钥匙。它会真实启动浏览器,资源消耗更大,对目标站压力也更大。如果 API 能拿到数据,不建议用它。

10.4 解析报错怎么办

解析报错一般有几类:

第一,字段不存在。

解决方法:

value = item.get("title", "")

不要直接:

value = item["title"]

第二,字段类型不稳定。有时是字符串,有时是列表,有时是字典。

解决方法是写统一的 flatten_listnormalize_space

第三,选择器变化。如果是 HTML 解析,不要写太脆弱的选择器,比如:

soup.select("div:nth-child(3) > div:nth-child(2) > a")

这种选择器一改版就废。更稳的是找语义明确的 class、属性或结构范围。

第四,接口返回结构调整。解决方法是保留日志,把原始 payload 片段打印出来,或者在测试里固定一份样本。

10.5 编码和乱码如何处理

常见情况:

  • 页面编码不是 UTF-8;
  • CSV 被 Excel 错误识别;
  • 中文繁简混杂;
  • Unicode 组合字符导致看起来一样、实际不同。

处理建议:

  1. requests 通常能自动识别,但必要时设置 response.encoding
  2. CSV 用 utf-8-sig
  3. 不要随意繁简转换,除非业务明确要求;
  4. 清洗时只处理空白字符,不要过度改标题;
  5. 数据库统一用 UTF-8。

示例:

with open("data/rarebooks.csv", "w", encoding="utf-8-sig", newline="") as f:
    writer = csv.writer(f)
    writer.writerow(["title", "version", "publication_date", "holding_institution", "link"])

10.6 抽取不到版本怎么办

这是古籍目录最常见的问题。版本信息可能不在列表接口里,也可能需要打开详情页才有。

处理顺序:

  1. 检查原始字段中有没有 descriptionsummarynotes
  2. 增加详情页请求;
  3. 增加版本关键词词典;
  4. 做人工抽样复核;
  5. 对无法确认的记录保留空值。

不要为了让表格完整而乱填“未知版本”。空值比错误值更诚实。

10.7 同一书名重复怎么办

古籍里同名不同本非常正常。比如一个题名可能有明刻本、清抄本、民国影印本。去重时不能只看书名。

推荐去重顺序:

  1. link
  2. source_id
  3. title + version + publication_date + holding_institution
  4. content_hash

如果你要研究“同一书名有哪些版本”,重复反而是有价值的信息,千万别一刀切删掉。

10.8 年代字段混乱怎么办

年代字段可能出现:

  • 1853
  • 1853?
  • ca. 1260-1368
  • Ming dynasty
  • 清康熙间
  • 明萬曆
  • reprinted in 1775

建议保留两个字段:

  • raw_date:原始年代;
  • publication_date:清洗后的年代。

如果你要做统计,可以再增加:

  • start_year
  • end_year
  • dynasty
  • date_confidence

示例扩展:

def parse_year_range(value: str) -> tuple[int | None, int | None]:
    value = value.strip()
    match = re.search(r"(\d{3,4})\s*[-/]\s*(\d{3,4})", value)
    if match:
        return int(match.group(1)), int(match.group(2))

    match = re.search(r"(\d{3,4})", value)
    if match:
        year = int(match.group(1))
        return year, year

    if "明" in value:
        return 1368, 1644
    if "清" in value:
        return 1644, 1912

    return None, None

1️⃣1️⃣ 进阶优化(可选但加分)

11.1 并发:先别急着上

很多人写爬虫喜欢第一步就上并发,好像没有并发就不专业。我不太同意。尤其是古籍目录这类资源,字段清洗和准确性比速度重要。

如果确实要并发,建议只对详情页做低并发,比如 2 到 5 个 worker。目录页本身还是顺序分页更稳。

线程池示例:

from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_detail(fetcher, url: str) -> dict:
    return fetcher.get_json(url, params={"fo": "json", "at": "item"})

def fetch_details_slowly(fetcher, urls: list[str], max_workers: int = 3) -> list[dict]:
    results: list[dict] = []

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_map = {executor.submit(fetch_detail, fetcher, url): url for url in urls}

        for future in as_completed(future_map):
            url = future_map[future]
            try:
                payload = future.result()
                results.append(payload)
            except Exception as exc:
                print(f"detail failed url={url} error={exc}")

    return results

即便这样,也要加频率控制。线程池不是让你无限请求的许可证。

11.2 asyncio 版本思路

如果目标站点明确允许,并且接口响应较慢,可以用 aiohttp 做异步请求。思路如下:

import asyncio
import aiohttp
import random


async def fetch_json(session: aiohttp.ClientSession, url: str, params: dict) -> dict:
    async with session.get(url, params=params, timeout=20) as resp:
        if resp.status == 429:
            await asyncio.sleep(10)
            return {}
        resp.raise_for_status()
        return await resp.json()


async def crawl_pages(base_url: str, pages: int, page_size: int) -> list[dict]:
    headers = {
        "User-Agent": "RareBookVersionSpider/1.0 (+contact: example@example.com)"
    }

    results: list[dict] = []
    connector = aiohttp.TCPConnector(limit=3)

    async with aiohttp.ClientSession(headers=headers, connector=connector) as session:
        for page in range(1, pages + 1):
            payload = await fetch_json(
                session,
                base_url,
                params={
                    "fo": "json",
                    "c": page_size,
                    "sp": page,
                    "at": "results,pagination",
                },
            )
            results.append(payload)
            await asyncio.sleep(random.uniform(1.0, 3.0))

    return results

注意,异步不等于无节制。limit=3 已经是比较保守的值。

11.3 Scrapy 化

当你要采多个馆藏,或者要把任务调度、失败重试、去重、Pipeline、日志都规范起来,Scrapy 会更合适。

Scrapy 项目结构可以是:

rarebook_scrapy/
├── scrapy.cfg
└── rarebook_scrapy/
    ├── items.py
    ├── pipelines.py
    ├── settings.py
    └── spiders/
        └── loc_rarebook.py

items.py

import scrapy


class RareBookItem(scrapy.Item):
    title = scrapy.Field()
    version = scrapy.Field()
    publication_date = scrapy.Field()
    holding_institution = scrapy.Field()
    link = scrapy.Field()
    source_id = scrapy.Field()
    raw_date = scrapy.Field()
    raw_summary = scrapy.Field()
    content_hash = scrapy.Field()
    scraped_at = scrapy.Field()

spiders/loc_rarebook.py

import scrapy

from rarebook_spider.parser import parse_collection_response


class LocRareBookSpider(scrapy.Spider):
    name = "loc_rarebook"
    allowed_domains = ["loc.gov"]

    custom_settings = {
        "DOWNLOAD_DELAY": 2.0,
        "CONCURRENT_REQUESTS_PER_DOMAIN": 2,
        "ROBOTSTXT_OBEY": True,
        "USER_AGENT": "RareBookVersionSpider/1.0 (+contact: example@example.com)",
    }

    def start_requests(self):
        base_url = "https://www.loc.gov/collections/chinese-rare-books/"
        for page in range(1, 4):
            yield scrapy.Request(
                url=f"{base_url}?fo=json&c=50&sp={page}&at=results,pagination",
                callback=self.parse,
            )

    def parse(self, response):
        payload = response.json()
        records = parse_collection_response(payload)
        for record in records:
            yield record.model_dump()

Scrapy 的好处是工程化能力强,坏处是对新手不如 requests 直观。我的建议是:第一版先用简单项目把字段规则跑通,再迁移框架。

11.4 断点续跑

断点续跑有两种常见方案。

第一种是页码游标。记录已经抓到第几页,下次从下一页继续。

CREATE TABLE IF NOT EXISTS crawl_state (
    name TEXT PRIMARY KEY,
    value TEXT NOT NULL
);

Python 代码:

def get_state(conn, name: str, default: str = "") -> str:
    cur = conn.execute("SELECT value FROM crawl_state WHERE name = ?", (name,))
    row = cur.fetchone()
    return row[0] if row else default


def set_state(conn, name: str, value: str) -> None:
    conn.execute(
        """
        INSERT INTO crawl_state(name, value)
        VALUES(?, ?)
        ON CONFLICT(name) DO UPDATE SET value=excluded.value;
        """,
        (name, value),
    )
    conn.commit()

第二种是已抓集合。把每个详情页 URL 存起来,遇到已存在的 URL 就跳过。

本文使用 SQLite 的 UNIQUE(link),已经具备最基本的断点能力。重复运行时,已存在记录不会重复插入。

11.5 日志与监控

至少记录这些指标:

  • 请求页数;
  • 看到记录数;
  • 解析成功数;
  • 插入数;
  • 跳过数;
  • 错误数;
  • 429 次数;
  • 403 次数;
  • 空版本字段比例;
  • 空年代字段比例。

字段质量监控很重要。比如版本字段 90% 都为空,说明你的抽取规则可能不够,或者列表接口本来就缺少版本信息,需要补详情页。

可以写一个简单质量报告:

def quality_report(conn) -> dict[str, float | int]:
    total = conn.execute("SELECT COUNT(*) FROM rarebook_records").fetchone()[0]
    if total == 0:
        return {"total": 0}

    empty_version = conn.execute(
        "SELECT COUNT(*) FROM rarebook_records WHERE version = ''"
    ).fetchone()[0]
    empty_date = conn.execute(
        "SELECT COUNT(*) FROM rarebook_records WHERE publication_date = ''"
    ).fetchone()[0]

    return {
        "total": total,
        "empty_version": empty_version,
        "empty_version_ratio": round(empty_version / total, 4),
        "empty_publication_date": empty_date,
        "empty_publication_date_ratio": round(empty_date / total, 4),
    }

输出:

report = quality_report(storage.conn)
print(report)

11.6 定时任务

如果你只是每月更新一次,可以用 cron。

0 3 1 * * cd /path/to/rarebook_version_spider && /path/to/.venv/bin/python main.py >> logs/cron.log 2>&1

意思是每月 1 日凌晨 3 点跑一次。

如果任务更复杂,比如多站点、多数据源、多步骤清洗,可以考虑 Airflow 或 Prefect。但别一开始就上重型调度系统。很多项目最后死掉,不是因为工具不够高级,而是因为架构比需求重太多。

11.7 多站点扩展

后续如果要采不同机构的古籍目录,我建议统一成一个抽象模型:

from abc import ABC, abstractmethod
from typing import Iterable

from .models import RareBookRecord


class BaseRareBookSource(ABC):
    name: str

    @abstractmethod
    def crawl(self) -> Iterable[RareBookRecord]:
        raise NotImplementedError

然后每个站点一个 source:

class LocRareBookSource(BaseRareBookSource):
    name = "loc_chinese_rare_books"

    def crawl(self):
        # 调用本文 fetcher + parser
        yield from []

以后再接其他站点时,不要把所有规则写在一个 parser 里。古籍目录站点的字段差异很大,强行揉在一起会越来越乱。

11.8 增加人工复核字段

我很建议给数据库加几个复核字段:

ALTER TABLE rarebook_records ADD COLUMN review_status TEXT DEFAULT 'pending';
ALTER TABLE rarebook_records ADD COLUMN reviewer_note TEXT DEFAULT '';
ALTER TABLE rarebook_records ADD COLUMN version_confidence REAL DEFAULT 0.0;

比如版本从明确字段抽到,置信度 0.9;从摘要正则抽到,置信度 0.6;只命中关键词,置信度 0.3。

改造版本抽取函数:

def extract_version_with_confidence(text: str) -> tuple[str, float]:
    text = normalize_space(text)
    if not text:
        return "", 0.0

    for pattern in CHINESE_VERSION_PATTERNS:
        match = pattern.search(text)
        if match:
            return normalize_space(match.group("version")), 0.85

    for pattern in ENGLISH_VERSION_PATTERNS:
        match = pattern.search(text)
        if match:
            return normalize_space(match.group("version")), 0.75

    lower = text.lower()
    for keyword in VERSION_KEYWORDS:
        if keyword.lower() in lower:
            return keyword, 0.35

    return "", 0.0

这比只给一个字符串更实用。因为后续做数据分析时,你可以过滤掉低置信度版本,或者优先人工复核低置信度记录。


1️⃣2️⃣ 总结与延伸阅读

这篇文章完成了一个从 0 到 1 的古籍版本流传信息采集项目。它不是简单地把页面标题扒下来,而是围绕“书名、版本、刊刻年代、藏馆、链接”这五个字段,搭建了一套比较完整的流程:

  • 请求层负责 headers、timeout、session、robots 检查、重试退避;
  • 解析层负责从 JSON 结果中抽取题名、链接、日期、说明字段;
  • 清洗层负责识别版本、刊刻年代、馆藏机构;
  • 存储层负责 SQLite 去重和 CSV 导出;
  • 运行层负责日志、统计和示例结果展示。

古籍目录采集和普通网页采集不太一样。普通列表页的重点是结构定位,古籍元数据的重点是字段理解。版本信息可能分散在标题、摘要、注释、原始年代字段里;刊刻年代可能是年份、朝代、范围,也可能是英文描述;同一书名可能对应多个不同版本,不能粗暴去重。这些问题都要求我们写代码时更保守一点。

下一步可以继续做几件事:

  1. 增加详情页解析,提高版本字段覆盖率;
  2. 给版本抽取增加置信度和人工复核流程;
  3. 接入 Scrapy,支持多站点采集;
  4. 使用 Playwright 处理没有 API 的动态页面;
  5. 增加全文搜索,比如把书名、版本、摘要写入 SQLite FTS5;
  6. 做一个小型前端检索页,按书名、朝代、版本、藏馆筛选;
  7. 做定时任务,定期更新新增目录记录。

最后说一点个人感受:爬虫写到一定阶段,真正的难点不再是“怎么把网页抓下来”,而是“怎么把抓下来的东西解释得不离谱”。古籍版本数据尤其如此。我们可以用程序提高整理效率,但不要让程序替我们做过度判断。能确定的就结构化,不能确定的就保留原文,给后续复核留下余地。这样的数据,才更经得起以后反复使用。

🌟 文末

好啦~以上就是本期的全部内容啦!如果你在实践过程中遇到任何疑问,欢迎在评论区留言交流,我看到都会尽量回复~咱们下期见!

小伙伴们在批阅的过程中,如果觉得文章不错,欢迎点赞、收藏、关注哦~
三连就是对我写作道路上最好的鼓励与支持! ❤️🔥

✅ 专栏持续更新中|建议收藏 + 订阅

墙裂推荐订阅专栏 👉 《Python爬虫实战》,本专栏秉承着以“入门 → 进阶 → 工程化 → 项目落地”的路线持续更新,争取让每一期内容都做到:

✅ 讲得清楚(原理)|✅ 跑得起来(代码)|✅ 用得上(场景)|✅ 扛得住(工程化)

📣 想系统提升的小伙伴:强烈建议先订阅专栏 《Python爬虫实战》,再按目录大纲顺序学习,效率十倍上升~

✅ 互动征集

想让我把【某站点/某反爬/某验证码/某分布式方案】等写成某期实战?

评论区留言告诉我你的需求,我会优先安排实现(更新)哒~


⭐️ 若喜欢我,就请关注我叭~(更新不迷路)
⭐️ 若对你有用,就请点赞支持一下叭~(给我一点点动力)
⭐️ 若有疑问,就请评论留言告诉我叭~(我会补坑 & 更新迭代)


✅ 免责声明

本文爬虫思路、相关技术和代码仅用于学习参考,对阅读本文后的进行爬虫行为的用户本作者不承担任何法律责任。

使用或者参考本项目即表示您已阅读并同意以下条款:

  • 合法使用: 不得将本项目用于任何违法、违规或侵犯他人权益的行为,包括但不限于网络攻击、诈骗、绕过身份验证、未经授权的数据抓取等。
  • 风险自负: 任何因使用本项目而产生的法律责任、技术风险或经济损失,由使用者自行承担,项目作者不承担任何形式的责任。
  • 禁止滥用: 不得将本项目用于违法牟利、黑产活动或其他不当商业用途。
  • 使用或者参考本项目即视为同意上述条款,即 “谁使用,谁负责” 。如不同意,请立即停止使用并删除本项目。!!!

更多推荐