㊗️本期内容已收录至专栏《Python爬虫实战》,持续完善知识体系与项目实战,建议先订阅收藏,后续查阅更方便~
㊙️本期爬虫难度指数:⭐⭐⭐☆☆(进阶级)
🉐福利: 一次订阅后,专栏内的所有文章可永久免费看,持续更新中,保底1000+(篇)硬核实战内容。

全文目录:

🌟 开篇语

哈喽,各位小伙伴们你们好呀~我是【喵手】。
运营社区: C站 / 掘金 / 腾讯云 / 阿里云 / 华为云 / 51CTO
欢迎大家常来逛逛,一起学习,一起进步~🌟

  我长期专注 Python 爬虫工程化实战,主理专栏👉 《Python爬虫实战》:从采集策略反爬对抗,从数据清洗分布式调度,持续输出可复用的方法论与可落地案例。内容主打一个“能跑、能用、能扩展”,让数据价值真正做到——抓得到、洗得净、用得上

  📌 专栏食用指南(建议收藏)

  • ✅ 入门基础:环境搭建 / 请求与解析 / 数据落库
  • ✅ 进阶提升:登录鉴权 / 动态渲染 / 反爬对抗
  • ✅ 工程实战:异步并发 / 分布式调度 / 监控与容错
  • ✅ 项目落地:数据治理 / 可视化分析 / 场景化应用

📣 专栏推广时间:如果你想系统学爬虫,而不是碎片化东拼西凑,欢迎订阅专栏👉《Python爬虫实战》👈,一次订阅后,专栏内的所有文章可永久免费阅读,持续更新中。
  
💕订阅后更新会优先推送,按目录学习更高效💯~

0️⃣ 前言(Preface)

这篇文章要做的事情很明确:按 arXiv 分类抓取新增或更新的论文元数据,使用 Python、requests、lxml 和 SQLite,最终产出一份可查询、可导出的本地论文元数据库。

读完之后,你可以获得三样东西:

第一,弄清楚为什么“分类新增元数据抓取”不应该继续套关键词搜索分页那一套。

第二,写出一个能真实运行的 arXiv OAI-PMH 增量采集器,字段包括标题、作者、分类、摘要、提交日期和 PDF 链接。

第三,把请求层、解析层、存储层、断点续跑、去重和导出串成一个小型工程,而不是一段临时脚本。

我个人比较喜欢这种题目,因为它不像“随便抓一个网页列表”那样只靠选择器吃饭。arXiv 本身提供了公开接口,真正的重点不是“能不能抓到”,而是如何把采集过程做得稳定、克制、可恢复、可维护。

1️⃣ 摘要(Abstract)

本文实现一个面向 arXiv 分类的增量元数据采集器,基于 arXiv OAI-PMH 公开接口,通过分类 set、datestamp 游标和 resumptionToken 分页机制,采集论文标题、作者、分类、摘要、提交日期与 PDF 链接,并将结果写入 SQLite 后导出 CSV。

读完本文,你将掌握:

  1. 如何用公开接口采集 arXiv 分类元数据,而不是用网页爬虫硬扒 HTML。
  2. 如何设计增量采集游标、去重策略和失败重试逻辑。
  3. 如何把采集、解析、清洗、存储做成一个结构清晰的小项目。

本文不会做 PDF 批量下载,也不会绕过登录、付费或访问限制。我们只处理公开的描述性元数据,重点放在工程实现和数据管道设计上。

2️⃣ 背景与需求(Why)

arXiv 是科研论文预印本领域非常常见的数据源。很多人第一次接触 arXiv 抓取,会直接从网页搜索结果页开始,比如搜索关键词 “large language model”,然后一页一页翻,把标题、作者和摘要提取出来。

这种做法能跑,但它不是最适合“分类新增元数据采集”的方式。

原因很简单:关键词搜索适合临时检索,分类增量适合持续同步。

假设我们想每天追踪 cs.AIcs.LGstat.ML 这类分类下新增或更新的论文。如果继续用关键词分页,会遇到几个问题:

第一,关键词会引入语义偏差。你搜 “LLM” 和搜 “large language model” 得到的结果不一定等价,而且论文标题或摘要里不一定写了你关心的词。

第二,分页稳定性不够理想。大结果集翻页时,排序、更新、重复、漏抓都要额外处理。

第三,它不符合增量同步的思路。我们真正想要的是“从上次同步的位置继续拉取变化”,而不是每次重跑整个搜索。

因此,这篇文章换一个角度:不抓搜索页,不用关键词分页,而是使用 arXiv 的 OAI-PMH 元数据接口,按分类 set 做增量采集。

本次目标字段如下:

字段 说明
标题 论文标题
作者 作者列表,保留顺序
分类 arXiv 分类,例如 cs.AI、cs.LG
摘要 论文摘要文本
提交日期 arXiv 元数据里的 created 日期
PDF 链接 根据 arXiv ID 拼接 PDF URL
arXiv ID 用于唯一标识论文
OAI Identifier OAI-PMH 记录标识
metadata_datestamp OAI-PMH 记录最后修改日期,用作增量游标
抓取时间 本地采集时间

虽然题目里只要求标题、作者、分类、摘要、提交日期和 PDF 链接,但实际工程中我建议额外存储 arxiv_idoai_identifiermetadata_datestamp。这三个字段不一定展示给最终用户,却是去重和断点续跑的关键。

3️⃣ 合规与注意事项(必写)

写爬虫时,我一直觉得有一条底线:能用公开接口就不要硬抓页面,能增量就不要全量,能慢一点就不要并发轰炸。

3.1 robots.txt 的基本说明

robots.txt 是网站给自动化访问程序看的访问规则说明。它通常告诉爬虫哪些路径可以访问,哪些路径不建议访问。它不是登录权限,也不是法律意见书,但作为技术人员,尊重 robots.txt 是基本礼貌。

对 arXiv 这类公共学术服务来说,尤其不应该做无差别下载。arXiv 上不仅有元数据,还有大量 PDF、源码包、历史版本文件。如果脚本不加控制地从列表页开始递归抓取,很容易对服务造成压力。

本文不做 HTML 列表页爬取,也不批量下载 PDF,只采集公开元数据,并且使用官方提供的机器访问接口。

3.2 频率控制

本项目默认设置为:

REQUEST_INTERVAL = 3.2
TIMEOUT = 30
MAX_RETRIES = 5

也就是说,每次请求之间至少间隔 3 秒以上。这个速度不快,但足够稳定。采集论文元数据不是抢票,没有必要用攻击式并发。

我建议新手不要一上来就写 asyncio,也不要把线程池开到几十上百。很多时候,爬虫失败不是因为速度慢,而是因为太急,导致被限流、被拒绝,最后还要花更多时间排错。

3.3 不采集敏感信息

本文只采集论文公开元数据,包括标题、作者、摘要、分类、提交日期和链接。

不采集用户账号信息,不尝试访问需要登录的页面,不绕过付费限制,不下载受限制资源,不伪装成正常用户进行非必要访问。

3.4 不批量下载 PDF

字段里包含 PDF 链接,并不代表要下载 PDF。本文只是把 PDF URL 存起来,方便后续人工访问或在合规场景下跳转查看。

元数据同步和全文下载是两件不同的事。很多爬虫项目出问题,就是因为一开始只想抓点标题,后来顺手把 PDF、源码包也加进来,最后流量规模完全失控。

4️⃣ 技术选型与整体流程(What/How)

4.1 静态网页、动态网页和 API 的区别

常见采集方式大致分三类:

第一类是静态网页采集。页面 HTML 中已经包含目标数据,用 requests 拿 HTML,再用 BeautifulSoup、lxml、XPath 或 CSS 选择器解析。

第二类是动态网页采集。数据由 JavaScript 后加载,直接请求页面只能拿到空壳。这时要么分析接口,要么用 Playwright、Selenium 这类浏览器自动化工具。

第三类是公开 API 或数据源。服务方明确提供机器可读的 XML、JSON、CSV 或其他格式,适合程序化采集。

这篇属于第三类:公开接口采集

更准确地说,我们选的是 arXiv 的 OAI-PMH 元数据接口。OAI-PMH 是面向开放档案元数据同步的协议,天然包含增量采集、记录标识、时间戳和分页 token 的设计,非常适合这篇文章的需求。

4.2 为什么不用关键词分页

arXiv 普通 API 支持 search_querystartmax_resultssortBysortOrder。如果只是临时搜几十篇论文,普通 API 很方便。

但本篇主题是“分类新增元数据抓取”。我们的目标不是搜索某个关键词,而是持续同步某个分类下的新增或更新记录。因此,OAI-PMH 更合适。

本项目采用:

OAI-PMH ListRecords
metadataPrefix=arXiv
set=cs:cs:AI
from=上次采集到的 datestamp

其中:

cs:cs:AI

可以理解为 arXiv OAI-PMH 的分类 set 表示方式。实际项目里,你也可以换成其他分类对应的 set。

4.3 整体流程

本文项目流程如下:

采集 Fetch
    ↓
解析 Parse
    ↓
清洗 Normalize
    ↓
存储 Store
    ↓
导出 Export

展开一点看:

读取配置
    ↓
读取本地游标 last_datestamp
    ↓
请求 OAI-PMH ListRecords
    ↓
解析 XML records
    ↓
提取 arXiv ID、标题、作者、分类、摘要、提交日期
    ↓
拼接 PDF 链接
    ↓
写入 SQLite,按 arxiv_id 去重
    ↓
更新游标
    ↓
导出 CSV

4.4 为什么选 requests + lxml + SQLite

这套组合很朴素,但非常适合本文。

requests 负责 HTTP 请求。它稳定、直观,处理 headers、timeout、重试都方便。

lxml 负责 XML 解析。OAI-PMH 返回的是 XML,用 lxml 的 XPath 处理命名空间会更舒服。

SQLite 负责本地存储。它不需要额外部署数据库服务,一个文件就是一个数据库,适合教程、实验、小型同步任务。

CSV 负责结果导出。它方便打开、方便分享,也方便后续导入 pandas、Excel 或 BI 工具。

为什么不用 Playwright?因为这里没有动态页面,不需要浏览器渲染。

为什么不用 Scrapy?Scrapy 很强,但本文任务并不需要复杂调度器和分布式能力。等数据规模更大、分类更多、任务更复杂时,再迁移 Scrapy 也不迟。

5️⃣ 环境准备与依赖安装(可复现)

5.1 Python 版本

建议使用:

Python 3.10+

我个人推荐 3.11 或 3.12。本文代码没有用太新的语法,3.10 以上都比较稳。

查看版本:

python --version

5.2 创建项目目录

推荐目录如下:

arxiv-category-harvester/
├── requirements.txt
├── README.md
├── data/
│   ├── .gitkeep
├── arxiv_harvester/
│   ├── __init__.py
│   ├── config.py
│   ├── fetcher.py
│   ├── parser.py
│   ├── storage.py
│   └── main.py
└── scripts/
    └── run_demo.sh

先创建目录:

mkdir -p arxiv-category-harvester/arxiv_harvester
mkdir -p arxiv-category-harvester/data
mkdir -p arxiv-category-harvester/scripts
cd arxiv-category-harvester

touch arxiv_harvester/__init__.py
touch data/.gitkeep

5.3 requirements.txt

新建 requirements.txt

requests==2.32.3
lxml==5.3.0
python-dateutil==2.9.0.post0

安装依赖:

pip install -r requirements.txt

如果你习惯使用虚拟环境:

python -m venv .venv

# macOS / Linux
source .venv/bin/activate

# Windows PowerShell
# .venv\Scripts\Activate.ps1

pip install -r requirements.txt

6️⃣ 核心实现:请求层(Fetcher)

请求层要解决四件事:

  1. 带上合适的 headers。
  2. 设置 timeout,避免请求永久卡住。
  3. 控制请求频率。
  4. 失败时重试,并带退避等待。

本项目不需要登录,也不需要 cookie。session 的作用主要是连接复用和统一 headers。

6.1 配置文件 config.py

新建 arxiv_harvester/config.py

from __future__ import annotations

from dataclasses import dataclass
from pathlib import Path


@dataclass(frozen=True)
class Settings:
    """
    项目全局配置。

    注意:
    1. set_spec 可以按需要替换,例如 cs:cs:LG、cs:cs:AI。
    2. User-Agent 建议写清楚项目名称和联系邮箱。
    3. request_interval 不建议低于 3 秒。
    """

    base_url: str = "https://oaipmh.arxiv.org/oai"
    metadata_prefix: str = "arXiv"
    set_spec: str = "cs:cs:AI"

    user_agent: str = (
        "ArxivCategoryHarvester/0.1 "
        "(metadata research demo; mailto:your_email@example.com)"
    )
    referer: str = "https://arxiv.org/"

    timeout: int = 30
    max_retries: int = 5
    request_interval: float = 3.2

    db_path: Path = Path("data/arxiv_metadata.sqlite")
    csv_path: Path = Path("data/arxiv_metadata.csv")
    state_path: Path = Path("data/harvest_state.json")


DEFAULT_SETTINGS = Settings()

这里有几个细节值得说一下。

User-Agent 不要留空,也不要写成浏览器 UA 去假装普通用户。更推荐写清楚你的项目名和联系邮箱。

Referer 对 OAI-PMH 来说不是必须的,但作为请求头放上也没问题。

Cookie 不需要。凡是不需要登录的公开接口,都不应该额外引入 cookie 复杂度。

6.2 请求层 fetcher.py

新建 arxiv_harvester/fetcher.py

from __future__ import annotations

import logging
import random
import time
from typing import Dict, Iterator, Optional

import requests

from .config import Settings

logger = logging.getLogger(__name__)


class FetchError(RuntimeError):
    """请求层统一异常。"""


class OAIHarvester:
    """
    arXiv OAI-PMH 请求器。

    负责:
    1. 控制请求频率;
    2. 自动重试;
    3. 处理 resumptionToken;
    4. 返回每一页 XML bytes。
    """

    def __init__(self, settings: Settings):
        self.settings = settings
        self.session = requests.Session()
        self.session.headers.update(
            {
                "User-Agent": settings.user_agent,
                "Accept": "application/xml,text/xml;q=0.9,*/*;q=0.8",
                "Referer": settings.referer,
            }
        )
        self._last_request_at = 0.0

    def _respect_rate_limit(self) -> None:
        """
        简单的单进程限速器。

        如果距离上一次请求不足 request_interval,就主动 sleep。
        这里没有做多进程共享锁。如果你后续改成多进程或分布式,
        需要把限速状态放到 Redis、数据库或任务队列层。
        """
        now = time.monotonic()
        elapsed = now - self._last_request_at
        wait_seconds = self.settings.request_interval - elapsed

        if wait_seconds > 0:
            logger.debug("Rate limit sleep %.2f seconds", wait_seconds)
            time.sleep(wait_seconds)

    def _request(self, params: Dict[str, str]) -> bytes:
        """
        发起一次 GET 请求,带重试和指数退避。

        OAI-PMH 常见参数:
        - verb=ListRecords
        - metadataPrefix=arXiv
        - set=cs:cs:AI
        - from=YYYY-MM-DD
        - resumptionToken=...
        """
        last_error: Optional[BaseException] = None

        for attempt in range(1, self.settings.max_retries + 1):
            self._respect_rate_limit()

            try:
                logger.info("Request attempt=%s params=%s", attempt, params)
                response = self.session.get(
                    self.settings.base_url,
                    params=params,
                    timeout=self.settings.timeout,
                )
                self._last_request_at = time.monotonic()

                if response.status_code == 200:
                    return response.content

                if response.status_code in {429, 500, 502, 503, 504}:
                    retry_after = response.headers.get("Retry-After")
                    if retry_after and retry_after.isdigit():
                        sleep_seconds = int(retry_after)
                    else:
                        sleep_seconds = min(60, 2**attempt) + random.uniform(0, 1.5)

                    logger.warning(
                        "Retryable HTTP status=%s, sleep %.2f seconds",
                        response.status_code,
                        sleep_seconds,
                    )
                    time.sleep(sleep_seconds)
                    continue

                raise FetchError(
                    f"Unexpected HTTP status {response.status_code}: "
                    f"{response.text[:500]}"
                )

            except requests.RequestException as exc:
                last_error = exc
                sleep_seconds = min(60, 2**attempt) + random.uniform(0, 1.5)
                logger.warning(
                    "Request exception attempt=%s error=%r, sleep %.2f seconds",
                    attempt,
                    exc,
                    sleep_seconds,
                )
                time.sleep(sleep_seconds)

        raise FetchError(f"Request failed after retries: {last_error!r}")

    def list_records(
        self,
        set_spec: str,
        metadata_prefix: str,
        from_date: Optional[str] = None,
        limit_pages: Optional[int] = None,
    ) -> Iterator[bytes]:
        """
        分页拉取 OAI-PMH ListRecords。

        首次请求使用 set/from/metadataPrefix。
        后续如果返回 resumptionToken,请求时只带:
        - verb=ListRecords
        - resumptionToken=xxx

        limit_pages 用于本地测试,避免第一次运行拉太多。
        """
        page = 0
        token: Optional[str] = None

        while True:
            page += 1

            if token:
                params = {
                    "verb": "ListRecords",
                    "resumptionToken": token,
                }
            else:
                params = {
                    "verb": "ListRecords",
                    "metadataPrefix": metadata_prefix,
                    "set": set_spec,
                }
                if from_date:
                    params["from"] = from_date

            xml_bytes = self._request(params)
            yield xml_bytes

            # token 的解析放在 parser 里更合适,但请求层需要知道是否继续。
            # 为了避免循环依赖,这里做一个轻量字符串判断不解析业务字段。
            from .parser import extract_resumption_token

            token = extract_resumption_token(xml_bytes)

            if not token:
                logger.info("No resumptionToken, harvest finished.")
                break

            if limit_pages is not None and page >= limit_pages:
                logger.info("Reach limit_pages=%s, stop early.", limit_pages)
                break

这里有个我比较坚持的小习惯:请求层不直接吞异常。很多教程为了让代码“看起来能跑”,会写一堆空的 except Exception: pass。短期看脚本不报错,长期看问题都被藏起来了。

真实项目里,失败要么重试,要么记录,要么抛出去让主流程处理。不要悄悄失败。

7️⃣ 核心实现:解析层(Parser)

arXiv OAI-PMH 返回的是 XML。解析层的任务是把 XML record 转成 Python 对象。

这里不使用 BeautifulSoup,因为 BeautifulSoup 更适合宽松 HTML。对于结构化 XML,我更喜欢 lxml。

7.1 字段模型

新建 arxiv_harvester/parser.py

from __future__ import annotations

import hashlib
import json
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import List, Optional, Tuple

from lxml import etree


OAI_NS = "http://www.openarchives.org/OAI/2.0/"
ARXIV_NS = "http://arxiv.org/OAI/arXiv/"

NS = {
    "oai": OAI_NS,
    "ax": ARXIV_NS,
}


@dataclass
class PaperRecord:
    """
    论文元数据记录。

    authors 和 categories 用 list 保存;
    入库时再转 JSON 字符串,导出 CSV 时也可以转成分隔字符串。
    """

    arxiv_id: str
    oai_identifier: str
    title: str
    authors: List[str]
    categories: List[str]
    primary_category: str
    abstract: str
    submitted_date: str
    updated_date: str
    metadata_datestamp: str
    pdf_url: str
    set_specs: List[str]
    content_hash: str
    fetched_at: str

    def to_storage_dict(self) -> dict:
        data = asdict(self)
        data["authors"] = json.dumps(self.authors, ensure_ascii=False)
        data["categories"] = json.dumps(self.categories, ensure_ascii=False)
        data["set_specs"] = json.dumps(self.set_specs, ensure_ascii=False)
        return data


def normalize_space(value: Optional[str]) -> str:
    """
    清洗多余空白。

    arXiv 摘要和标题里经常有换行、多个空格。
    为了导出 CSV 后更规整,这里统一压缩空白。
    """
    if not value:
        return ""
    return " ".join(value.split())


def text_of(node: etree._Element, xpath: str) -> str:
    result = node.xpath(xpath, namespaces=NS)
    if not result:
        return ""

    first = result[0]
    if isinstance(first, etree._Element):
        return normalize_space(first.text)
    return normalize_space(str(first))


def texts_of(node: etree._Element, xpath: str) -> List[str]:
    values = node.xpath(xpath, namespaces=NS)
    cleaned: List[str] = []

    for value in values:
        if isinstance(value, etree._Element):
            text = normalize_space(value.text)
        else:
            text = normalize_space(str(value))

        if text:
            cleaned.append(text)

    return cleaned


def build_pdf_url(arxiv_id: str) -> str:
    """
    根据 arXiv ID 构造 PDF 链接。

    注意:本文只保存链接,不批量下载 PDF。
    """
    return f"https://arxiv.org/pdf/{arxiv_id}"


def make_content_hash(
    title: str,
    authors: List[str],
    categories: List[str],
    abstract: str,
    submitted_date: str,
    updated_date: str,
    pdf_url: str,
) -> str:
    payload = {
        "title": title,
        "authors": authors,
        "categories": categories,
        "abstract": abstract,
        "submitted_date": submitted_date,
        "updated_date": updated_date,
        "pdf_url": pdf_url,
    }
    raw = json.dumps(payload, ensure_ascii=False, sort_keys=True)
    return hashlib.sha256(raw.encode("utf-8")).hexdigest()


def parse_author(author_node: etree._Element) -> str:
    """
    解析作者姓名。

    arXiv OAI 的作者结构一般包含:
    - keyname
    - forenames
    - suffix
    也可能出现 name 字段。

    这里做容错处理:
    1. 优先读取 name;
    2. 否则拼 forenames + keyname + suffix;
    3. 仍然为空则返回空字符串,由上层过滤。
    """
    name = text_of(author_node, "string(ax:name)")
    if name:
        return name

    forenames = text_of(author_node, "string(ax:forenames)")
    keyname = text_of(author_node, "string(ax:keyname)")
    suffix = text_of(author_node, "string(ax:suffix)")

    parts = [part for part in [forenames, keyname, suffix] if part]
    return normalize_space(" ".join(parts))


def extract_resumption_token(xml_bytes: bytes) -> Optional[str]:
    """
    从 OAI-PMH 响应中提取 resumptionToken。

    没有 token 表示当前结果已经结束。
    """
    parser = etree.XMLParser(
        resolve_entities=False,
        no_network=True,
        recover=False,
    )
    root = etree.fromstring(xml_bytes, parser=parser)

    token = root.xpath("string(.//oai:resumptionToken)", namespaces=NS)
    token = normalize_space(token)

    return token or None


def parse_oai_errors(root: etree._Element) -> List[str]:
    errors = []
    for error_node in root.xpath(".//oai:error", namespaces=NS):
        code = error_node.get("code", "")
        message = normalize_space(error_node.text)
        errors.append(f"{code}: {message}")
    return errors


def parse_records(xml_bytes: bytes) -> Tuple[List[PaperRecord], Optional[str], List[str]]:
    """
    解析一页 OAI-PMH XML。

    返回:
    - records: 论文记录列表
    - token: 下一页 resumptionToken
    - errors: OAI-PMH 层面的错误信息
    """
    parser = etree.XMLParser(
        resolve_entities=False,
        no_network=True,
        recover=False,
    )
    root = etree.fromstring(xml_bytes, parser=parser)

    errors = parse_oai_errors(root)
    token = extract_resumption_token(xml_bytes)

    if errors:
        return [], token, errors

    now = datetime.now(timezone.utc).isoformat(timespec="seconds")
    records: List[PaperRecord] = []

    record_nodes = root.xpath(".//oai:ListRecords/oai:record", namespaces=NS)

    for record_node in record_nodes:
        header = record_node.find("oai:header", namespaces=NS)
        if header is None:
            continue

        # OAI-PMH 可能返回删除记录。本文采集论文元数据,遇到 deleted 先跳过。
        if header.get("status") == "deleted":
            continue

        oai_identifier = text_of(header, "string(oai:identifier)")
        metadata_datestamp = text_of(header, "string(oai:datestamp)")
        set_specs = texts_of(header, "oai:setSpec/text()")

        meta_node = record_node.find(".//ax:arXiv", namespaces=NS)
        if meta_node is None:
            # 缺失 metadata 时跳过,但不让整个程序崩掉。
            continue

        arxiv_id = text_of(meta_node, "string(ax:id)")
        title = text_of(meta_node, "string(ax:title)")
        abstract = text_of(meta_node, "string(ax:abstract)")
        submitted_date = text_of(meta_node, "string(ax:created)")
        updated_date = text_of(meta_node, "string(ax:updated)")

        categories_raw = text_of(meta_node, "string(ax:categories)")
        categories = [item for item in categories_raw.split() if item]
        primary_category = categories[0] if categories else ""

        authors = []
        for author_node in meta_node.xpath(".//ax:authors/ax:author", namespaces=NS):
            author_name = parse_author(author_node)
            if author_name:
                authors.append(author_name)

        pdf_url = build_pdf_url(arxiv_id) if arxiv_id else ""

        # 最低限度字段校验。标题、ID、摘要缺失时仍可保留,但 ID 缺失无法去重。
        if not arxiv_id:
            continue

        content_hash = make_content_hash(
            title=title,
            authors=authors,
            categories=categories,
            abstract=abstract,
            submitted_date=submitted_date,
            updated_date=updated_date,
            pdf_url=pdf_url,
        )

        records.append(
            PaperRecord(
                arxiv_id=arxiv_id,
                oai_identifier=oai_identifier,
                title=title,
                authors=authors,
                categories=categories,
                primary_category=primary_category,
                abstract=abstract,
                submitted_date=submitted_date,
                updated_date=updated_date,
                metadata_datestamp=metadata_datestamp,
                pdf_url=pdf_url,
                set_specs=set_specs,
                content_hash=content_hash,
                fetched_at=now,
            )
        )

    return records, token, errors

7.2 解析方式说明

本文用 XPath 解析 XML,不抓 HTML。

核心路径包括:

//oai:ListRecords/oai:record
record/oai:header/oai:identifier
record/oai:header/oai:datestamp
record/oai:header/oai:setSpec
record/oai:metadata/ax:arXiv/ax:id
record/oai:metadata/ax:arXiv/ax:title
record/oai:metadata/ax:arXiv/ax:authors/ax:author
record/oai:metadata/ax:arXiv/ax:categories
record/oai:metadata/ax:arXiv/ax:abstract
record/oai:metadata/ax:arXiv/ax:created

这和网页解析很不一样。网页解析经常会遇到 class 名变化、布局变化、广告插入、懒加载、AB 测试等问题。OAI-PMH XML 的稳定性通常更好,也更适合长期同步。

7.3 列表页如何拿详情链接

严格来说,本文没有“列表页”。

OAI-PMH 的 ListRecords 返回的每个 record 本身就包含元数据。我们不需要进入详情页再抽字段。

如果后续想给每篇论文补一个摘要页链接,可以根据 arXiv ID 拼接:

abs_url = f"https://arxiv.org/abs/{arxiv_id}"

PDF 链接同理:

pdf_url = f"https://arxiv.org/pdf/{arxiv_id}"

本文只保存 PDF 链接,不下载 PDF 文件。

7.4 缺失字段怎么办

缺失字段分三种情况处理:

第一,关键字段缺失,例如 arXiv ID 为空。无法去重,直接跳过。

第二,非关键字段缺失,例如 updated 为空。保留空字符串,不影响入库。

第三,结构不完整,例如 metadata 节点缺失。跳过该 record,并在日志里保留统计信息。

我不建议在解析层随便猜字段。比如作者缺失时,不要自己从标题里猜;分类缺失时,也不要根据 setSpec 自动补一个分类。采集器的职责是忠实搬运和规范化公开元数据,不是创造数据。

8️⃣ 数据存储与导出(Storage)

本文使用 SQLite 起步,同时导出 CSV。

SQLite 的好处是轻量、稳定、可迁移。你可以直接用 DB Browser for SQLite 打开,也可以后续迁移到 MySQL、PostgreSQL 或 Elasticsearch。

8.1 字段映射表

数据库字段 类型 示例值 说明
arxiv_id TEXT 2401.01234 arXiv ID,唯一
oai_identifier TEXT oai:arXiv.org:2401.01234 OAI-PMH 标识
title TEXT A Survey of … 标题
authors TEXT [“Alice Smith”, “Bob Lee”] JSON 数组字符串
categories TEXT [“cs.AI”, “cs.LG”] JSON 数组字符串
primary_category TEXT cs.AI 主分类
abstract TEXT This paper … 摘要
submitted_date TEXT 2024-01-03 提交日期
updated_date TEXT 2024-02-01 更新日期
metadata_datestamp TEXT 2024-02-01 OAI-PMH 元数据时间戳
pdf_url TEXT https://arxiv.org/pdf/2401.01234 PDF 链接
set_specs TEXT [“cs:cs:AI”] OAI-PMH set
content_hash TEXT sha256… 内容哈希
fetched_at TEXT 2026-06-09T… 本地抓取时间

8.2 去重策略

本文用两层去重。

第一层,数据库唯一键:

arxiv_id UNIQUE

同一篇论文重复出现时,执行 upsert。

第二层,内容 hash:

content_hash

如果同一篇论文标题、摘要、分类、作者等内容变化了,就更新记录;如果内容没变,就不做无意义更新。

这比单纯 URL 去重更可靠。因为本文不是网页爬虫,URL 不是核心对象,arXiv ID 才是核心对象。

8.3 storage.py

新建 arxiv_harvester/storage.py

from __future__ import annotations

import csv
import json
import logging
import sqlite3
from pathlib import Path
from typing import Iterable, Optional

from .parser import PaperRecord

logger = logging.getLogger(__name__)


class SQLiteStorage:
    """
    SQLite 存储层。

    负责:
    1. 建表;
    2. upsert 论文记录;
    3. 保存采集状态;
    4. 导出 CSV。
    """

    def __init__(self, db_path: Path):
        self.db_path = db_path
        self.db_path.parent.mkdir(parents=True, exist_ok=True)
        self.conn = sqlite3.connect(str(self.db_path))
        self.conn.row_factory = sqlite3.Row

    def close(self) -> None:
        self.conn.close()

    def init_db(self) -> None:
        self.conn.executescript(
            """
            CREATE TABLE IF NOT EXISTS papers (
                arxiv_id TEXT PRIMARY KEY,
                oai_identifier TEXT NOT NULL,
                title TEXT NOT NULL,
                authors TEXT NOT NULL,
                categories TEXT NOT NULL,
                primary_category TEXT,
                abstract TEXT,
                submitted_date TEXT,
                updated_date TEXT,
                metadata_datestamp TEXT NOT NULL,
                pdf_url TEXT NOT NULL,
                set_specs TEXT NOT NULL,
                content_hash TEXT NOT NULL,
                fetched_at TEXT NOT NULL
            );

            CREATE UNIQUE INDEX IF NOT EXISTS idx_papers_oai_identifier
            ON papers(oai_identifier);

            CREATE INDEX IF NOT EXISTS idx_papers_primary_category
            ON papers(primary_category);

            CREATE INDEX IF NOT EXISTS idx_papers_submitted_date
            ON papers(submitted_date);

            CREATE INDEX IF NOT EXISTS idx_papers_metadata_datestamp
            ON papers(metadata_datestamp);

            CREATE TABLE IF NOT EXISTS harvest_state (
                set_spec TEXT PRIMARY KEY,
                last_datestamp TEXT,
                last_run_at TEXT,
                total_records INTEGER NOT NULL DEFAULT 0
            );
            """
        )
        self.conn.commit()

    def upsert_many(self, records: Iterable[PaperRecord]) -> int:
        """
        批量 upsert。

        SQLite 的 ON CONFLICT 可以让我们用 arxiv_id 做主键去重。
        WHERE 子句避免内容 hash 没变时重复更新。
        """
        rows = [record.to_storage_dict() for record in records]
        if not rows:
            return 0

        sql = """
        INSERT INTO papers (
            arxiv_id,
            oai_identifier,
            title,
            authors,
            categories,
            primary_category,
            abstract,
            submitted_date,
            updated_date,
            metadata_datestamp,
            pdf_url,
            set_specs,
            content_hash,
            fetched_at
        )
        VALUES (
            :arxiv_id,
            :oai_identifier,
            :title,
            :authors,
            :categories,
            :primary_category,
            :abstract,
            :submitted_date,
            :updated_date,
            :metadata_datestamp,
            :pdf_url,
            :set_specs,
            :content_hash,
            :fetched_at
        )
        ON CONFLICT(arxiv_id) DO UPDATE SET
            oai_identifier = excluded.oai_identifier,
            title = excluded.title,
            authors = excluded.authors,
            categories = excluded.categories,
            primary_category = excluded.primary_category,
            abstract = excluded.abstract,
            submitted_date = excluded.submitted_date,
            updated_date = excluded.updated_date,
            metadata_datestamp = excluded.metadata_datestamp,
            pdf_url = excluded.pdf_url,
            set_specs = excluded.set_specs,
            content_hash = excluded.content_hash,
            fetched_at = excluded.fetched_at
        WHERE papers.content_hash != excluded.content_hash
           OR papers.metadata_datestamp != excluded.metadata_datestamp;
        """

        with self.conn:
            self.conn.executemany(sql, rows)

        return len(rows)

    def get_last_datestamp(self, set_spec: str) -> Optional[str]:
        row = self.conn.execute(
            "SELECT last_datestamp FROM harvest_state WHERE set_spec = ?",
            (set_spec,),
        ).fetchone()

        if not row:
            return None

        return row["last_datestamp"]

    def update_state(
        self,
        set_spec: str,
        last_datestamp: Optional[str],
        last_run_at: str,
        total_records: int,
    ) -> None:
        """
        保存采集游标。

        注意:
        last_datestamp 不要主动加一天。
        OAI-PMH 的 from 是闭区间语义时,下一轮重复抓到同一天数据并不可怕;
        upsert 会去重。贸然把游标加一天,反而可能漏掉同一 datestamp 的记录。
        """
        if not last_datestamp:
            return

        with self.conn:
            self.conn.execute(
                """
                INSERT INTO harvest_state (
                    set_spec,
                    last_datestamp,
                    last_run_at,
                    total_records
                )
                VALUES (?, ?, ?, ?)
                ON CONFLICT(set_spec) DO UPDATE SET
                    last_datestamp = excluded.last_datestamp,
                    last_run_at = excluded.last_run_at,
                    total_records = harvest_state.total_records + excluded.total_records;
                """,
                (set_spec, last_datestamp, last_run_at, total_records),
            )

    def count_papers(self) -> int:
        row = self.conn.execute("SELECT COUNT(*) AS c FROM papers").fetchone()
        return int(row["c"])

    def export_csv(self, csv_path: Path) -> None:
        csv_path.parent.mkdir(parents=True, exist_ok=True)

        rows = self.conn.execute(
            """
            SELECT
                arxiv_id,
                title,
                authors,
                categories,
                primary_category,
                abstract,
                submitted_date,
                updated_date,
                metadata_datestamp,
                pdf_url,
                fetched_at
            FROM papers
            ORDER BY metadata_datestamp DESC, arxiv_id DESC
            """
        ).fetchall()

        with csv_path.open("w", encoding="utf-8-sig", newline="") as f:
            writer = csv.writer(f)
            writer.writerow(
                [
                    "arxiv_id",
                    "title",
                    "authors",
                    "categories",
                    "primary_category",
                    "abstract",
                    "submitted_date",
                    "updated_date",
                    "metadata_datestamp",
                    "pdf_url",
                    "fetched_at",
                ]
            )

            for row in rows:
                authors = " | ".join(json.loads(row["authors"]))
                categories = " | ".join(json.loads(row["categories"]))

                writer.writerow(
                    [
                        row["arxiv_id"],
                        row["title"],
                        authors,
                        categories,
                        row["primary_category"],
                        row["abstract"],
                        row["submitted_date"],
                        row["updated_date"],
                        row["metadata_datestamp"],
                        row["pdf_url"],
                        row["fetched_at"],
                    ]
                )

        logger.info("Export CSV: %s rows=%s", csv_path, len(rows))

这里 CSV 使用 utf-8-sig,是为了在一些表格软件里打开中文或特殊字符时更省心。严格来说,UTF-8 就够了,但做数据交付时,少踩一点编码坑也挺好。

9️⃣ 运行方式与结果展示(必写)

9.1 main.py

新建 arxiv_harvester/main.py

from __future__ import annotations

import argparse
import logging
from datetime import datetime, timezone
from typing import Optional

from .config import DEFAULT_SETTINGS, Settings
from .fetcher import OAIHarvester
from .parser import parse_records
from .storage import SQLiteStorage


def setup_logging(verbose: bool = False) -> None:
    level = logging.DEBUG if verbose else logging.INFO

    logging.basicConfig(
        level=level,
        format="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
    )


def max_datestamp(current: Optional[str], records_datestamps: list[str]) -> Optional[str]:
    """
    计算本页或本轮采集中的最大 datestamp。

    datestamp 通常是 YYYY-MM-DD 格式,字符串比较可以工作。
    如果后续接口返回更细粒度时间戳,也建议统一解析后比较。
    """
    values = [value for value in records_datestamps if value]
    if current:
        values.append(current)

    if not values:
        return current

    return max(values)


def build_settings(args: argparse.Namespace) -> Settings:
    """
    根据命令行参数覆盖默认配置。

    dataclass frozen=True,所以这里重新构造一个 Settings。
    """
    return Settings(
        base_url=DEFAULT_SETTINGS.base_url,
        metadata_prefix=args.metadata_prefix,
        set_spec=args.set,
        user_agent=args.user_agent or DEFAULT_SETTINGS.user_agent,
        referer=DEFAULT_SETTINGS.referer,
        timeout=args.timeout,
        max_retries=args.max_retries,
        request_interval=args.request_interval,
        db_path=args.db,
        csv_path=args.csv,
        state_path=DEFAULT_SETTINGS.state_path,
    )


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description="Harvest arXiv category metadata incrementally via OAI-PMH."
    )

    parser.add_argument(
        "--set",
        default=DEFAULT_SETTINGS.set_spec,
        help="OAI-PMH set spec, e.g. cs:cs:AI, cs:cs:LG",
    )
    parser.add_argument(
        "--metadata-prefix",
        default=DEFAULT_SETTINGS.metadata_prefix,
        help="OAI-PMH metadataPrefix, default: arXiv",
    )
    parser.add_argument(
        "--from-date",
        default=None,
        help="Start datestamp, format YYYY-MM-DD. If omitted, use stored state.",
    )
    parser.add_argument(
        "--limit-pages",
        type=int,
        default=None,
        help="Limit pages for demo/testing. Omit for full incremental harvest.",
    )
    parser.add_argument(
        "--db",
        type=lambda p: DEFAULT_SETTINGS.db_path.__class__(p),
        default=DEFAULT_SETTINGS.db_path,
        help="SQLite database path.",
    )
    parser.add_argument(
        "--csv",
        type=lambda p: DEFAULT_SETTINGS.csv_path.__class__(p),
        default=DEFAULT_SETTINGS.csv_path,
        help="CSV export path.",
    )
    parser.add_argument(
        "--timeout",
        type=int,
        default=DEFAULT_SETTINGS.timeout,
        help="HTTP timeout seconds.",
    )
    parser.add_argument(
        "--max-retries",
        type=int,
        default=DEFAULT_SETTINGS.max_retries,
        help="Max retries for retryable errors.",
    )
    parser.add_argument(
        "--request-interval",
        type=float,
        default=DEFAULT_SETTINGS.request_interval,
        help="Minimum seconds between requests.",
    )
    parser.add_argument(
        "--user-agent",
        default=None,
        help="Custom User-Agent. Recommend including contact email.",
    )
    parser.add_argument(
        "--verbose",
        action="store_true",
        help="Enable debug logging.",
    )

    return parser.parse_args()


def main() -> None:
    args = parse_args()
    setup_logging(args.verbose)

    logger = logging.getLogger("arxiv_harvester.main")
    settings = build_settings(args)

    storage = SQLiteStorage(settings.db_path)
    storage.init_db()

    try:
        from_date = args.from_date or storage.get_last_datestamp(settings.set_spec)

        logger.info("Start harvest set=%s from=%s", settings.set_spec, from_date)

        harvester = OAIHarvester(settings)
        total_seen = 0
        total_saved = 0
        latest_datestamp: Optional[str] = from_date

        for page_index, xml_bytes in enumerate(
            harvester.list_records(
                set_spec=settings.set_spec,
                metadata_prefix=settings.metadata_prefix,
                from_date=from_date,
                limit_pages=args.limit_pages,
            ),
            start=1,
        ):
            records, token, errors = parse_records(xml_bytes)

            if errors:
                logger.warning("OAI errors on page=%s errors=%s", page_index, errors)
                # 常见 noRecordsMatch 不应该让程序崩掉。
                if any("noRecordsMatch" in error for error in errors):
                    break
                raise RuntimeError(f"OAI-PMH errors: {errors}")

            saved_count = storage.upsert_many(records)
            total_seen += len(records)
            total_saved += saved_count

            latest_datestamp = max_datestamp(
                latest_datestamp,
                [record.metadata_datestamp for record in records],
            )

            logger.info(
                "Page=%s records=%s saved=%s token=%s latest_datestamp=%s",
                page_index,
                len(records),
                saved_count,
                bool(token),
                latest_datestamp,
            )

        run_at = datetime.now(timezone.utc).isoformat(timespec="seconds")
        storage.update_state(
            set_spec=settings.set_spec,
            last_datestamp=latest_datestamp,
            last_run_at=run_at,
            total_records=total_seen,
        )

        storage.export_csv(settings.csv_path)

        logger.info(
            "Done. seen=%s saved=%s total_in_db=%s db=%s csv=%s",
            total_seen,
            total_saved,
            storage.count_papers(),
            settings.db_path,
            settings.csv_path,
        )

    finally:
        storage.close()


if __name__ == "__main__":
    main()

9.2 修正一个小兼容点

上面 argparse 里对 Path 的写法能跑,但不够直观。为了让代码更清爽,可以把 main.py 顶部补一个导入:

from pathlib import Path

然后将参数改成:

parser.add_argument(
    "--db",
    type=Path,
    default=DEFAULT_SETTINGS.db_path,
    help="SQLite database path.",
)

parser.add_argument(
    "--csv",
    type=Path,
    default=DEFAULT_SETTINGS.csv_path,
    help="CSV export path.",
)

完整项目里建议使用 Path 这个版本,更容易读。

9.3 scripts/run_demo.sh

新建 scripts/run_demo.sh

#!/usr/bin/env bash
set -euo pipefail

python -m arxiv_harvester.main \
  --set cs:cs:AI \
  --from-date 2026-01-01 \
  --limit-pages 1 \
  --db data/arxiv_metadata.sqlite \
  --csv data/arxiv_metadata.csv \
  --request-interval 3.2 \
  --verbose

赋予执行权限:

chmod +x scripts/run_demo.sh

运行:

./scripts/run_demo.sh

或者直接运行:

python -m arxiv_harvester.main \
  --set cs:cs:AI \
  --from-date 2026-01-01 \
  --limit-pages 1

第一次测试时,建议带 --limit-pages 1。等确认解析、入库、导出都正常,再去掉限制。

9.4 输出在哪里

默认输出:

data/arxiv_metadata.sqlite
data/arxiv_metadata.csv

查看 SQLite:

sqlite3 data/arxiv_metadata.sqlite

进入后执行:

.tables

SELECT
  arxiv_id,
  title,
  primary_category,
  submitted_date,
  pdf_url
FROM papers
ORDER BY metadata_datestamp DESC
LIMIT 5;

9.5 示例结果

下面是结果格式示例,实际内容以运行时接口返回为准:

arxiv_id title authors categories submitted_date pdf_url
2601.00001 Example Paper Title for AI Systems Alice Zhang Bob Chen cs.AI cs.LG
2601.00002 A Practical Study on Machine Reasoning Carol Smith cs.AI 2026-01-01 https://arxiv.org/pdf/2601.00002
2601.00003 Efficient Metadata Pipelines for Research Search Daniel Lee Eva Wang cs.DL cs.AI
2601.00004 Robust Retrieval over Scientific Abstracts Frank Miller cs.IR cs.AI 2026-01-02

注意,这里的表格只是展示 CSV 形态,不是固定数据。arXiv 每天新增内容不同,分类返回也会随时间变化。

🔟 常见问题与排错(强烈建议写)

10.1 403 或 429 怎么办

403 一般代表访问被拒绝,429 一般代表请求过多。处理思路如下:

第一,降低频率。把 --request-interval 调大,比如 5 秒、10 秒。

python -m arxiv_harvester.main \
  --set cs:cs:AI \
  --request-interval 10

第二,确认 User-Agent。不要使用空 UA,也不要伪装成浏览器做大规模访问。

第三,减少任务范围。第一次运行不要全量同步,可以先用 --from-date--limit-pages 做小范围测试。

第四,不要用代理池去绕限制。代理池不是解决合规问题的工具。对于公开学术接口,应该主动控制访问规模。

第五,确认自己没有同时开多个采集进程。本文的限速器是单进程内生效,如果你开了多个终端同时跑,总请求频率会叠加。

10.2 HTML 抓到空壳怎么办

本文不抓 HTML,所以正常情况下不会遇到“页面空壳”的问题。

如果你在其他项目中遇到 HTML 里没有数据,通常有两种原因:

第一,数据由 JavaScript 动态加载。解决方式是分析真实接口,或者用 Playwright 渲染页面。

第二,服务端根据 headers、cookie 或登录状态返回不同内容。解决方式是检查网络面板,确认请求是否需要认证。

但对于本项目,最推荐的路线仍然是:不要从 arXiv 搜索页抓 HTML,优先使用公开接口。

10.3 XML 解析报错怎么办

常见错误包括:

lxml.etree.XMLSyntaxError

排查步骤:

第一,打印响应前 500 个字符:

print(xml_bytes[:500].decode("utf-8", errors="replace"))

看看返回的是不是 XML。有时候你以为拿到了 OAI-PMH,实际拿到的是错误页或网关提示。

第二,检查命名空间。XML 解析最容易踩的坑就是 namespace。不要写:

root.xpath("//record")

应该写:

root.xpath("//oai:record", namespaces=NS)

第三,检查接口错误。OAI-PMH 可能返回:

<error code="noRecordsMatch">...</error>

这不是 XML 解析失败,而是接口明确告诉你没有匹配记录。

10.4 解析出来字段为空怎么办

字段为空一般有三种可能:

第一,XPath 写错。尤其是命名空间错了。

第二,metadataPrefix 不对。本文使用的是 arXiv,不是 oai_dc。如果你换成 Dublin Core,字段结构会完全不同。

第三,记录本身缺字段。采集器要允许非关键字段为空,不要因为一条记录不完整导致整轮任务失败。

10.5 编码或乱码如何处理

OAI-PMH XML 通常是 UTF-8。requests 返回 bytes,lxml 直接解析 bytes 即可,不建议先手动 decode 再 parse。

导出 CSV 时,本文用了:

encoding="utf-8-sig"

这是为了兼容一些表格软件。如果你后续只用 pandas 或数据库处理,普通 UTF-8 也可以。

10.6 为什么本次运行没有新增数据

可能原因包括:

第一,from-date 设置太晚,这个分类在该时间之后没有更新。

第二,OAI-PMH datestamp 是元数据修改日期,不等同于论文原始提交日期。你用 from-date 拉的是“元数据变更”,不是“提交日期筛选”。

第三,分类 set 写错。例如把 cs:cs:AI 写成了 cs.AI。OAI-PMH set 和 arXiv 分类名不是同一个字符串。

第四,本地已经抓过。由于 upsert 去重,重复记录不会造成新增数量上涨。

1️⃣1️⃣ 进阶优化(可选但加分)

11.1 并发优化

本文刻意没有做并发。原因不是不会,而是不适合一上来做。

如果后续你要采集多个分类,例如:

cs:cs:AI
cs:cs:LG
cs:cs:CL
stat:stat:ML

也不要简单地开四个线程同时请求。更稳妥的做法是:

  1. 分类任务排队。
  2. 全局限速器统一控制请求。
  3. 每个分类维护自己的 state。
  4. 入库层用同一个 arxiv_id 去重。

也就是说,并发可以有,但请求出口必须克制。

一个简单的多分类配置可以这样设计:

CATEGORY_SETS = [
    "cs:cs:AI",
    "cs:cs:LG",
    "cs:cs:CL",
    "stat:stat:ML",
]

主流程改成:

for set_spec in CATEGORY_SETS:
    harvest_one_set(set_spec)

先串行跑通,比盲目并发更稳。

11.2 断点续跑

本文已经实现了基本断点续跑:

harvest_state.last_datestamp

每次运行时,如果没有传 --from-date,程序会读取本地 state,从上一次 datestamp 继续。

这里有一个细节:不要把游标主动加一天。

例如上次最大 datestamp 是:

2026-01-05

下一次仍然从:

from=2026-01-05

开始。这样可能重复抓到一部分数据,但不会漏掉同一天后续更新的记录。重复由数据库 upsert 处理。

如果你希望更严谨,可以增加一个二级游标:

last_datestamp
last_oai_identifier

当 datestamp 粒度较粗时,用 (datestamp, identifier) 组合判断是否已经处理。但对于本文这个起步项目,重复拉取同一天数据再 upsert,已经足够实用。

11.3 日志与监控

最低限度建议记录:

指标 说明
request_count 请求次数
page_count OAI-PMH 页数
records_seen 解析到的记录数
records_saved 入库记录数
error_count 错误次数
latest_datestamp 当前最大游标
duration_seconds 任务耗时

可以在 main.py 中增加统计:

import time

started = time.monotonic()

# harvest ...

duration = time.monotonic() - started
logger.info("Duration %.2f seconds", duration)

如果项目部署到服务器,可以把日志写入文件:

logging.basicConfig(
    filename="data/harvester.log",
    level=logging.INFO,
    format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
)

更进一步,可以接 Prometheus、Grafana 或者简单地每天发一封运行报告邮件。不过我建议先把本地日志做好,不要过早上复杂监控。

11.4 定时任务

Linux/macOS 可以用 cron:

crontab -e

加入:

30 10 * * * cd /path/to/arxiv-category-harvester && /path/to/.venv/bin/python -m arxiv_harvester.main --set cs:cs:AI >> data/cron.log 2>&1

这表示每天 10:30 运行一次。具体时间可以按你的业务需求调整。

如果任务变多,可以考虑 Airflow、Prefect、Dagster 这类调度系统。但对一个分类元数据同步任务来说,cron 完全够用。

11.5 数据分析延伸

有了 SQLite 后,可以很方便地做分析。

例如统计每月论文数量:

SELECT
  substr(submitted_date, 1, 7) AS month,
  COUNT(*) AS paper_count
FROM papers
WHERE submitted_date IS NOT NULL
  AND submitted_date != ''
GROUP BY month
ORDER BY month;

统计热门分类:

SELECT
  primary_category,
  COUNT(*) AS paper_count
FROM papers
GROUP BY primary_category
ORDER BY paper_count DESC
LIMIT 20;

如果想用 Python 分析:

import sqlite3
import pandas as pd

conn = sqlite3.connect("data/arxiv_metadata.sqlite")

df = pd.read_sql_query(
    """
    SELECT arxiv_id, title, primary_category, submitted_date, pdf_url
    FROM papers
    ORDER BY submitted_date DESC
    LIMIT 100
    """,
    conn,
)

print(df.head())

后续可以做:

  1. 每日新增论文邮件摘要。
  2. 按分类生成趋势图。
  3. 摘要关键词提取。
  4. 作者合作网络分析。
  5. 接入向量数据库做语义检索。

1️⃣2️⃣ 总结与延伸阅读

这篇文章完成了一套面向 arXiv 分类的增量元数据采集器。

我们没有从 HTML 搜索页硬抓,也没有沿用关键词分页,而是选择了更适合元数据同步的 OAI-PMH 接口。整个项目按请求层、解析层、存储层拆开,支持限速、重试、resumptionToken 分页、SQLite 去重、游标续跑和 CSV 导出。

最终采集字段包括:

标题
作者
分类
摘要
提交日期
PDF 链接

同时额外保留:

arXiv ID
OAI Identifier
metadata_datestamp
content_hash
fetched_at

这些额外字段是工程上真正让系统稳定起来的东西。很多教程只展示“抓到了标题”,但生产环境更关心“下次怎么继续抓”“重复怎么处理”“失败了怎么恢复”“字段变了怎么排查”。

下一步可以继续做几件事:

第一,把单分类扩展到多分类,并设计统一限速队列。

第二,把 SQLite 换成 PostgreSQL,支持更复杂的查询和多人访问。

第三,增加邮件或飞书通知,每天推送新增论文摘要。

第四,接入向量检索,对摘要做 embedding,构建本地论文搜索引擎。

第五,迁移到 Scrapy 或 Airflow,做成更标准的数据管道。

第六,如果确实要处理动态网页或非公开 API,再考虑 Playwright。但在本文这个场景里,公开元数据接口已经是最干净的选择。

爬虫不是越快越好,也不是越隐蔽越好。真正耐用的采集程序,往往是慢一点、规矩一点、日志清楚一点、失败能恢复一点。这样的代码看起来没有炫技,但跑久了会让人很安心。

🌟 文末

好啦~以上就是本期的全部内容啦!如果你在实践过程中遇到任何疑问,欢迎在评论区留言交流,我看到都会尽量回复~咱们下期见!

小伙伴们在批阅的过程中,如果觉得文章不错,欢迎点赞、收藏、关注哦~
三连就是对我写作道路上最好的鼓励与支持! ❤️🔥

✅ 专栏持续更新中|建议收藏 + 订阅

墙裂推荐订阅专栏 👉 《Python爬虫实战》,本专栏秉承着以“入门 → 进阶 → 工程化 → 项目落地”的路线持续更新,争取让每一期内容都做到:

✅ 讲得清楚(原理)|✅ 跑得起来(代码)|✅ 用得上(场景)|✅ 扛得住(工程化)

📣 想系统提升的小伙伴:强烈建议先订阅专栏 《Python爬虫实战》,再按目录大纲顺序学习,效率十倍上升~

✅ 互动征集

想让我把【某站点/某反爬/某验证码/某分布式方案】等写成某期实战?

评论区留言告诉我你的需求,我会优先安排实现(更新)哒~


⭐️ 若喜欢我,就请关注我叭~(更新不迷路)
⭐️ 若对你有用,就请点赞支持一下叭~(给我一点点动力)
⭐️ 若有疑问,就请评论留言告诉我叭~(我会补坑 & 更新迭代)


✅ 免责声明

本文爬虫思路、相关技术和代码仅用于学习参考,对阅读本文后的进行爬虫行为的用户本作者不承担任何法律责任。

使用或者参考本项目即表示您已阅读并同意以下条款:

  • 合法使用: 不得将本项目用于任何违法、违规或侵犯他人权益的行为,包括但不限于网络攻击、诈骗、绕过身份验证、未经授权的数据抓取等。
  • 风险自负: 任何因使用本项目而产生的法律责任、技术风险或经济损失,由使用者自行承担,项目作者不承担任何形式的责任。
  • 禁止滥用: 不得将本项目用于违法牟利、黑产活动或其他不当商业用途。
  • 使用或者参考本项目即视为同意上述条款,即 “谁使用,谁负责” 。如不同意,请立即停止使用并删除本项目。!!!

更多推荐