arXiv 分类新增元数据抓取:用 OAI-PMH 做一套可断点续跑的 Python 增量采集器
㊗️本期内容已收录至专栏《Python爬虫实战》,持续完善知识体系与项目实战,建议先订阅收藏,后续查阅更方便~
㊙️本期爬虫难度指数:⭐⭐⭐☆☆(进阶级)
🉐福利: 一次订阅后,专栏内的所有文章可永久免费看,持续更新中,保底1000+(篇)硬核实战内容。
全文目录:
🌟 开篇语
哈喽,各位小伙伴们你们好呀~我是【喵手】。
运营社区: C站 / 掘金 / 腾讯云 / 阿里云 / 华为云 / 51CTO
欢迎大家常来逛逛,一起学习,一起进步~🌟
我长期专注 Python 爬虫工程化实战,主理专栏👉 《Python爬虫实战》:从采集策略到反爬对抗,从数据清洗到分布式调度,持续输出可复用的方法论与可落地案例。内容主打一个“能跑、能用、能扩展”,让数据价值真正做到——抓得到、洗得净、用得上。
📌 专栏食用指南(建议收藏)
- ✅ 入门基础:环境搭建 / 请求与解析 / 数据落库
- ✅ 进阶提升:登录鉴权 / 动态渲染 / 反爬对抗
- ✅ 工程实战:异步并发 / 分布式调度 / 监控与容错
- ✅ 项目落地:数据治理 / 可视化分析 / 场景化应用
📣 专栏推广时间:如果你想系统学爬虫,而不是碎片化东拼西凑,欢迎订阅专栏👉《Python爬虫实战》👈,一次订阅后,专栏内的所有文章可永久免费阅读,持续更新中。
💕订阅后更新会优先推送,按目录学习更高效💯~
0️⃣ 前言(Preface)
这篇文章要做的事情很明确:按 arXiv 分类抓取新增或更新的论文元数据,使用 Python、requests、lxml 和 SQLite,最终产出一份可查询、可导出的本地论文元数据库。
读完之后,你可以获得三样东西:
第一,弄清楚为什么“分类新增元数据抓取”不应该继续套关键词搜索分页那一套。
第二,写出一个能真实运行的 arXiv OAI-PMH 增量采集器,字段包括标题、作者、分类、摘要、提交日期和 PDF 链接。
第三,把请求层、解析层、存储层、断点续跑、去重和导出串成一个小型工程,而不是一段临时脚本。
我个人比较喜欢这种题目,因为它不像“随便抓一个网页列表”那样只靠选择器吃饭。arXiv 本身提供了公开接口,真正的重点不是“能不能抓到”,而是如何把采集过程做得稳定、克制、可恢复、可维护。
1️⃣ 摘要(Abstract)
本文实现一个面向 arXiv 分类的增量元数据采集器,基于 arXiv OAI-PMH 公开接口,通过分类 set、datestamp 游标和 resumptionToken 分页机制,采集论文标题、作者、分类、摘要、提交日期与 PDF 链接,并将结果写入 SQLite 后导出 CSV。
读完本文,你将掌握:
- 如何用公开接口采集 arXiv 分类元数据,而不是用网页爬虫硬扒 HTML。
- 如何设计增量采集游标、去重策略和失败重试逻辑。
- 如何把采集、解析、清洗、存储做成一个结构清晰的小项目。
本文不会做 PDF 批量下载,也不会绕过登录、付费或访问限制。我们只处理公开的描述性元数据,重点放在工程实现和数据管道设计上。
2️⃣ 背景与需求(Why)
arXiv 是科研论文预印本领域非常常见的数据源。很多人第一次接触 arXiv 抓取,会直接从网页搜索结果页开始,比如搜索关键词 “large language model”,然后一页一页翻,把标题、作者和摘要提取出来。
这种做法能跑,但它不是最适合“分类新增元数据采集”的方式。
原因很简单:关键词搜索适合临时检索,分类增量适合持续同步。
假设我们想每天追踪 cs.AI、cs.LG、stat.ML 这类分类下新增或更新的论文。如果继续用关键词分页,会遇到几个问题:
第一,关键词会引入语义偏差。你搜 “LLM” 和搜 “large language model” 得到的结果不一定等价,而且论文标题或摘要里不一定写了你关心的词。
第二,分页稳定性不够理想。大结果集翻页时,排序、更新、重复、漏抓都要额外处理。
第三,它不符合增量同步的思路。我们真正想要的是“从上次同步的位置继续拉取变化”,而不是每次重跑整个搜索。
因此,这篇文章换一个角度:不抓搜索页,不用关键词分页,而是使用 arXiv 的 OAI-PMH 元数据接口,按分类 set 做增量采集。
本次目标字段如下:
| 字段 | 说明 |
|---|---|
| 标题 | 论文标题 |
| 作者 | 作者列表,保留顺序 |
| 分类 | arXiv 分类,例如 cs.AI、cs.LG |
| 摘要 | 论文摘要文本 |
| 提交日期 | arXiv 元数据里的 created 日期 |
| PDF 链接 | 根据 arXiv ID 拼接 PDF URL |
| arXiv ID | 用于唯一标识论文 |
| OAI Identifier | OAI-PMH 记录标识 |
| metadata_datestamp | OAI-PMH 记录最后修改日期,用作增量游标 |
| 抓取时间 | 本地采集时间 |
虽然题目里只要求标题、作者、分类、摘要、提交日期和 PDF 链接,但实际工程中我建议额外存储 arxiv_id、oai_identifier 和 metadata_datestamp。这三个字段不一定展示给最终用户,却是去重和断点续跑的关键。
3️⃣ 合规与注意事项(必写)
写爬虫时,我一直觉得有一条底线:能用公开接口就不要硬抓页面,能增量就不要全量,能慢一点就不要并发轰炸。
3.1 robots.txt 的基本说明
robots.txt 是网站给自动化访问程序看的访问规则说明。它通常告诉爬虫哪些路径可以访问,哪些路径不建议访问。它不是登录权限,也不是法律意见书,但作为技术人员,尊重 robots.txt 是基本礼貌。
对 arXiv 这类公共学术服务来说,尤其不应该做无差别下载。arXiv 上不仅有元数据,还有大量 PDF、源码包、历史版本文件。如果脚本不加控制地从列表页开始递归抓取,很容易对服务造成压力。
本文不做 HTML 列表页爬取,也不批量下载 PDF,只采集公开元数据,并且使用官方提供的机器访问接口。
3.2 频率控制
本项目默认设置为:
REQUEST_INTERVAL = 3.2
TIMEOUT = 30
MAX_RETRIES = 5
也就是说,每次请求之间至少间隔 3 秒以上。这个速度不快,但足够稳定。采集论文元数据不是抢票,没有必要用攻击式并发。
我建议新手不要一上来就写 asyncio,也不要把线程池开到几十上百。很多时候,爬虫失败不是因为速度慢,而是因为太急,导致被限流、被拒绝,最后还要花更多时间排错。
3.3 不采集敏感信息
本文只采集论文公开元数据,包括标题、作者、摘要、分类、提交日期和链接。
不采集用户账号信息,不尝试访问需要登录的页面,不绕过付费限制,不下载受限制资源,不伪装成正常用户进行非必要访问。
3.4 不批量下载 PDF
字段里包含 PDF 链接,并不代表要下载 PDF。本文只是把 PDF URL 存起来,方便后续人工访问或在合规场景下跳转查看。
元数据同步和全文下载是两件不同的事。很多爬虫项目出问题,就是因为一开始只想抓点标题,后来顺手把 PDF、源码包也加进来,最后流量规模完全失控。
4️⃣ 技术选型与整体流程(What/How)
4.1 静态网页、动态网页和 API 的区别
常见采集方式大致分三类:
第一类是静态网页采集。页面 HTML 中已经包含目标数据,用 requests 拿 HTML,再用 BeautifulSoup、lxml、XPath 或 CSS 选择器解析。
第二类是动态网页采集。数据由 JavaScript 后加载,直接请求页面只能拿到空壳。这时要么分析接口,要么用 Playwright、Selenium 这类浏览器自动化工具。
第三类是公开 API 或数据源。服务方明确提供机器可读的 XML、JSON、CSV 或其他格式,适合程序化采集。
这篇属于第三类:公开接口采集。
更准确地说,我们选的是 arXiv 的 OAI-PMH 元数据接口。OAI-PMH 是面向开放档案元数据同步的协议,天然包含增量采集、记录标识、时间戳和分页 token 的设计,非常适合这篇文章的需求。
4.2 为什么不用关键词分页
arXiv 普通 API 支持 search_query、start、max_results、sortBy、sortOrder。如果只是临时搜几十篇论文,普通 API 很方便。
但本篇主题是“分类新增元数据抓取”。我们的目标不是搜索某个关键词,而是持续同步某个分类下的新增或更新记录。因此,OAI-PMH 更合适。
本项目采用:
OAI-PMH ListRecords
metadataPrefix=arXiv
set=cs:cs:AI
from=上次采集到的 datestamp
其中:
cs:cs:AI
可以理解为 arXiv OAI-PMH 的分类 set 表示方式。实际项目里,你也可以换成其他分类对应的 set。
4.3 整体流程
本文项目流程如下:
采集 Fetch
↓
解析 Parse
↓
清洗 Normalize
↓
存储 Store
↓
导出 Export
展开一点看:
读取配置
↓
读取本地游标 last_datestamp
↓
请求 OAI-PMH ListRecords
↓
解析 XML records
↓
提取 arXiv ID、标题、作者、分类、摘要、提交日期
↓
拼接 PDF 链接
↓
写入 SQLite,按 arxiv_id 去重
↓
更新游标
↓
导出 CSV
4.4 为什么选 requests + lxml + SQLite
这套组合很朴素,但非常适合本文。
requests 负责 HTTP 请求。它稳定、直观,处理 headers、timeout、重试都方便。
lxml 负责 XML 解析。OAI-PMH 返回的是 XML,用 lxml 的 XPath 处理命名空间会更舒服。
SQLite 负责本地存储。它不需要额外部署数据库服务,一个文件就是一个数据库,适合教程、实验、小型同步任务。
CSV 负责结果导出。它方便打开、方便分享,也方便后续导入 pandas、Excel 或 BI 工具。
为什么不用 Playwright?因为这里没有动态页面,不需要浏览器渲染。
为什么不用 Scrapy?Scrapy 很强,但本文任务并不需要复杂调度器和分布式能力。等数据规模更大、分类更多、任务更复杂时,再迁移 Scrapy 也不迟。
5️⃣ 环境准备与依赖安装(可复现)
5.1 Python 版本
建议使用:
Python 3.10+
我个人推荐 3.11 或 3.12。本文代码没有用太新的语法,3.10 以上都比较稳。
查看版本:
python --version
5.2 创建项目目录
推荐目录如下:
arxiv-category-harvester/
├── requirements.txt
├── README.md
├── data/
│ ├── .gitkeep
├── arxiv_harvester/
│ ├── __init__.py
│ ├── config.py
│ ├── fetcher.py
│ ├── parser.py
│ ├── storage.py
│ └── main.py
└── scripts/
└── run_demo.sh
先创建目录:
mkdir -p arxiv-category-harvester/arxiv_harvester
mkdir -p arxiv-category-harvester/data
mkdir -p arxiv-category-harvester/scripts
cd arxiv-category-harvester
touch arxiv_harvester/__init__.py
touch data/.gitkeep
5.3 requirements.txt
新建 requirements.txt:
requests==2.32.3
lxml==5.3.0
python-dateutil==2.9.0.post0
安装依赖:
pip install -r requirements.txt
如果你习惯使用虚拟环境:
python -m venv .venv
# macOS / Linux
source .venv/bin/activate
# Windows PowerShell
# .venv\Scripts\Activate.ps1
pip install -r requirements.txt
6️⃣ 核心实现:请求层(Fetcher)
请求层要解决四件事:
- 带上合适的 headers。
- 设置 timeout,避免请求永久卡住。
- 控制请求频率。
- 失败时重试,并带退避等待。
本项目不需要登录,也不需要 cookie。session 的作用主要是连接复用和统一 headers。
6.1 配置文件 config.py
新建 arxiv_harvester/config.py:
from __future__ import annotations
from dataclasses import dataclass
from pathlib import Path
@dataclass(frozen=True)
class Settings:
"""
项目全局配置。
注意:
1. set_spec 可以按需要替换,例如 cs:cs:LG、cs:cs:AI。
2. User-Agent 建议写清楚项目名称和联系邮箱。
3. request_interval 不建议低于 3 秒。
"""
base_url: str = "https://oaipmh.arxiv.org/oai"
metadata_prefix: str = "arXiv"
set_spec: str = "cs:cs:AI"
user_agent: str = (
"ArxivCategoryHarvester/0.1 "
"(metadata research demo; mailto:your_email@example.com)"
)
referer: str = "https://arxiv.org/"
timeout: int = 30
max_retries: int = 5
request_interval: float = 3.2
db_path: Path = Path("data/arxiv_metadata.sqlite")
csv_path: Path = Path("data/arxiv_metadata.csv")
state_path: Path = Path("data/harvest_state.json")
DEFAULT_SETTINGS = Settings()
这里有几个细节值得说一下。
User-Agent 不要留空,也不要写成浏览器 UA 去假装普通用户。更推荐写清楚你的项目名和联系邮箱。
Referer 对 OAI-PMH 来说不是必须的,但作为请求头放上也没问题。
Cookie 不需要。凡是不需要登录的公开接口,都不应该额外引入 cookie 复杂度。
6.2 请求层 fetcher.py
新建 arxiv_harvester/fetcher.py:
from __future__ import annotations
import logging
import random
import time
from typing import Dict, Iterator, Optional
import requests
from .config import Settings
logger = logging.getLogger(__name__)
class FetchError(RuntimeError):
"""请求层统一异常。"""
class OAIHarvester:
"""
arXiv OAI-PMH 请求器。
负责:
1. 控制请求频率;
2. 自动重试;
3. 处理 resumptionToken;
4. 返回每一页 XML bytes。
"""
def __init__(self, settings: Settings):
self.settings = settings
self.session = requests.Session()
self.session.headers.update(
{
"User-Agent": settings.user_agent,
"Accept": "application/xml,text/xml;q=0.9,*/*;q=0.8",
"Referer": settings.referer,
}
)
self._last_request_at = 0.0
def _respect_rate_limit(self) -> None:
"""
简单的单进程限速器。
如果距离上一次请求不足 request_interval,就主动 sleep。
这里没有做多进程共享锁。如果你后续改成多进程或分布式,
需要把限速状态放到 Redis、数据库或任务队列层。
"""
now = time.monotonic()
elapsed = now - self._last_request_at
wait_seconds = self.settings.request_interval - elapsed
if wait_seconds > 0:
logger.debug("Rate limit sleep %.2f seconds", wait_seconds)
time.sleep(wait_seconds)
def _request(self, params: Dict[str, str]) -> bytes:
"""
发起一次 GET 请求,带重试和指数退避。
OAI-PMH 常见参数:
- verb=ListRecords
- metadataPrefix=arXiv
- set=cs:cs:AI
- from=YYYY-MM-DD
- resumptionToken=...
"""
last_error: Optional[BaseException] = None
for attempt in range(1, self.settings.max_retries + 1):
self._respect_rate_limit()
try:
logger.info("Request attempt=%s params=%s", attempt, params)
response = self.session.get(
self.settings.base_url,
params=params,
timeout=self.settings.timeout,
)
self._last_request_at = time.monotonic()
if response.status_code == 200:
return response.content
if response.status_code in {429, 500, 502, 503, 504}:
retry_after = response.headers.get("Retry-After")
if retry_after and retry_after.isdigit():
sleep_seconds = int(retry_after)
else:
sleep_seconds = min(60, 2**attempt) + random.uniform(0, 1.5)
logger.warning(
"Retryable HTTP status=%s, sleep %.2f seconds",
response.status_code,
sleep_seconds,
)
time.sleep(sleep_seconds)
continue
raise FetchError(
f"Unexpected HTTP status {response.status_code}: "
f"{response.text[:500]}"
)
except requests.RequestException as exc:
last_error = exc
sleep_seconds = min(60, 2**attempt) + random.uniform(0, 1.5)
logger.warning(
"Request exception attempt=%s error=%r, sleep %.2f seconds",
attempt,
exc,
sleep_seconds,
)
time.sleep(sleep_seconds)
raise FetchError(f"Request failed after retries: {last_error!r}")
def list_records(
self,
set_spec: str,
metadata_prefix: str,
from_date: Optional[str] = None,
limit_pages: Optional[int] = None,
) -> Iterator[bytes]:
"""
分页拉取 OAI-PMH ListRecords。
首次请求使用 set/from/metadataPrefix。
后续如果返回 resumptionToken,请求时只带:
- verb=ListRecords
- resumptionToken=xxx
limit_pages 用于本地测试,避免第一次运行拉太多。
"""
page = 0
token: Optional[str] = None
while True:
page += 1
if token:
params = {
"verb": "ListRecords",
"resumptionToken": token,
}
else:
params = {
"verb": "ListRecords",
"metadataPrefix": metadata_prefix,
"set": set_spec,
}
if from_date:
params["from"] = from_date
xml_bytes = self._request(params)
yield xml_bytes
# token 的解析放在 parser 里更合适,但请求层需要知道是否继续。
# 为了避免循环依赖,这里做一个轻量字符串判断不解析业务字段。
from .parser import extract_resumption_token
token = extract_resumption_token(xml_bytes)
if not token:
logger.info("No resumptionToken, harvest finished.")
break
if limit_pages is not None and page >= limit_pages:
logger.info("Reach limit_pages=%s, stop early.", limit_pages)
break
这里有个我比较坚持的小习惯:请求层不直接吞异常。很多教程为了让代码“看起来能跑”,会写一堆空的 except Exception: pass。短期看脚本不报错,长期看问题都被藏起来了。
真实项目里,失败要么重试,要么记录,要么抛出去让主流程处理。不要悄悄失败。
7️⃣ 核心实现:解析层(Parser)
arXiv OAI-PMH 返回的是 XML。解析层的任务是把 XML record 转成 Python 对象。
这里不使用 BeautifulSoup,因为 BeautifulSoup 更适合宽松 HTML。对于结构化 XML,我更喜欢 lxml。
7.1 字段模型
新建 arxiv_harvester/parser.py:
from __future__ import annotations
import hashlib
import json
from dataclasses import asdict, dataclass
from datetime import datetime, timezone
from typing import List, Optional, Tuple
from lxml import etree
OAI_NS = "http://www.openarchives.org/OAI/2.0/"
ARXIV_NS = "http://arxiv.org/OAI/arXiv/"
NS = {
"oai": OAI_NS,
"ax": ARXIV_NS,
}
@dataclass
class PaperRecord:
"""
论文元数据记录。
authors 和 categories 用 list 保存;
入库时再转 JSON 字符串,导出 CSV 时也可以转成分隔字符串。
"""
arxiv_id: str
oai_identifier: str
title: str
authors: List[str]
categories: List[str]
primary_category: str
abstract: str
submitted_date: str
updated_date: str
metadata_datestamp: str
pdf_url: str
set_specs: List[str]
content_hash: str
fetched_at: str
def to_storage_dict(self) -> dict:
data = asdict(self)
data["authors"] = json.dumps(self.authors, ensure_ascii=False)
data["categories"] = json.dumps(self.categories, ensure_ascii=False)
data["set_specs"] = json.dumps(self.set_specs, ensure_ascii=False)
return data
def normalize_space(value: Optional[str]) -> str:
"""
清洗多余空白。
arXiv 摘要和标题里经常有换行、多个空格。
为了导出 CSV 后更规整,这里统一压缩空白。
"""
if not value:
return ""
return " ".join(value.split())
def text_of(node: etree._Element, xpath: str) -> str:
result = node.xpath(xpath, namespaces=NS)
if not result:
return ""
first = result[0]
if isinstance(first, etree._Element):
return normalize_space(first.text)
return normalize_space(str(first))
def texts_of(node: etree._Element, xpath: str) -> List[str]:
values = node.xpath(xpath, namespaces=NS)
cleaned: List[str] = []
for value in values:
if isinstance(value, etree._Element):
text = normalize_space(value.text)
else:
text = normalize_space(str(value))
if text:
cleaned.append(text)
return cleaned
def build_pdf_url(arxiv_id: str) -> str:
"""
根据 arXiv ID 构造 PDF 链接。
注意:本文只保存链接,不批量下载 PDF。
"""
return f"https://arxiv.org/pdf/{arxiv_id}"
def make_content_hash(
title: str,
authors: List[str],
categories: List[str],
abstract: str,
submitted_date: str,
updated_date: str,
pdf_url: str,
) -> str:
payload = {
"title": title,
"authors": authors,
"categories": categories,
"abstract": abstract,
"submitted_date": submitted_date,
"updated_date": updated_date,
"pdf_url": pdf_url,
}
raw = json.dumps(payload, ensure_ascii=False, sort_keys=True)
return hashlib.sha256(raw.encode("utf-8")).hexdigest()
def parse_author(author_node: etree._Element) -> str:
"""
解析作者姓名。
arXiv OAI 的作者结构一般包含:
- keyname
- forenames
- suffix
也可能出现 name 字段。
这里做容错处理:
1. 优先读取 name;
2. 否则拼 forenames + keyname + suffix;
3. 仍然为空则返回空字符串,由上层过滤。
"""
name = text_of(author_node, "string(ax:name)")
if name:
return name
forenames = text_of(author_node, "string(ax:forenames)")
keyname = text_of(author_node, "string(ax:keyname)")
suffix = text_of(author_node, "string(ax:suffix)")
parts = [part for part in [forenames, keyname, suffix] if part]
return normalize_space(" ".join(parts))
def extract_resumption_token(xml_bytes: bytes) -> Optional[str]:
"""
从 OAI-PMH 响应中提取 resumptionToken。
没有 token 表示当前结果已经结束。
"""
parser = etree.XMLParser(
resolve_entities=False,
no_network=True,
recover=False,
)
root = etree.fromstring(xml_bytes, parser=parser)
token = root.xpath("string(.//oai:resumptionToken)", namespaces=NS)
token = normalize_space(token)
return token or None
def parse_oai_errors(root: etree._Element) -> List[str]:
errors = []
for error_node in root.xpath(".//oai:error", namespaces=NS):
code = error_node.get("code", "")
message = normalize_space(error_node.text)
errors.append(f"{code}: {message}")
return errors
def parse_records(xml_bytes: bytes) -> Tuple[List[PaperRecord], Optional[str], List[str]]:
"""
解析一页 OAI-PMH XML。
返回:
- records: 论文记录列表
- token: 下一页 resumptionToken
- errors: OAI-PMH 层面的错误信息
"""
parser = etree.XMLParser(
resolve_entities=False,
no_network=True,
recover=False,
)
root = etree.fromstring(xml_bytes, parser=parser)
errors = parse_oai_errors(root)
token = extract_resumption_token(xml_bytes)
if errors:
return [], token, errors
now = datetime.now(timezone.utc).isoformat(timespec="seconds")
records: List[PaperRecord] = []
record_nodes = root.xpath(".//oai:ListRecords/oai:record", namespaces=NS)
for record_node in record_nodes:
header = record_node.find("oai:header", namespaces=NS)
if header is None:
continue
# OAI-PMH 可能返回删除记录。本文采集论文元数据,遇到 deleted 先跳过。
if header.get("status") == "deleted":
continue
oai_identifier = text_of(header, "string(oai:identifier)")
metadata_datestamp = text_of(header, "string(oai:datestamp)")
set_specs = texts_of(header, "oai:setSpec/text()")
meta_node = record_node.find(".//ax:arXiv", namespaces=NS)
if meta_node is None:
# 缺失 metadata 时跳过,但不让整个程序崩掉。
continue
arxiv_id = text_of(meta_node, "string(ax:id)")
title = text_of(meta_node, "string(ax:title)")
abstract = text_of(meta_node, "string(ax:abstract)")
submitted_date = text_of(meta_node, "string(ax:created)")
updated_date = text_of(meta_node, "string(ax:updated)")
categories_raw = text_of(meta_node, "string(ax:categories)")
categories = [item for item in categories_raw.split() if item]
primary_category = categories[0] if categories else ""
authors = []
for author_node in meta_node.xpath(".//ax:authors/ax:author", namespaces=NS):
author_name = parse_author(author_node)
if author_name:
authors.append(author_name)
pdf_url = build_pdf_url(arxiv_id) if arxiv_id else ""
# 最低限度字段校验。标题、ID、摘要缺失时仍可保留,但 ID 缺失无法去重。
if not arxiv_id:
continue
content_hash = make_content_hash(
title=title,
authors=authors,
categories=categories,
abstract=abstract,
submitted_date=submitted_date,
updated_date=updated_date,
pdf_url=pdf_url,
)
records.append(
PaperRecord(
arxiv_id=arxiv_id,
oai_identifier=oai_identifier,
title=title,
authors=authors,
categories=categories,
primary_category=primary_category,
abstract=abstract,
submitted_date=submitted_date,
updated_date=updated_date,
metadata_datestamp=metadata_datestamp,
pdf_url=pdf_url,
set_specs=set_specs,
content_hash=content_hash,
fetched_at=now,
)
)
return records, token, errors
7.2 解析方式说明
本文用 XPath 解析 XML,不抓 HTML。
核心路径包括:
//oai:ListRecords/oai:record
record/oai:header/oai:identifier
record/oai:header/oai:datestamp
record/oai:header/oai:setSpec
record/oai:metadata/ax:arXiv/ax:id
record/oai:metadata/ax:arXiv/ax:title
record/oai:metadata/ax:arXiv/ax:authors/ax:author
record/oai:metadata/ax:arXiv/ax:categories
record/oai:metadata/ax:arXiv/ax:abstract
record/oai:metadata/ax:arXiv/ax:created
这和网页解析很不一样。网页解析经常会遇到 class 名变化、布局变化、广告插入、懒加载、AB 测试等问题。OAI-PMH XML 的稳定性通常更好,也更适合长期同步。
7.3 列表页如何拿详情链接
严格来说,本文没有“列表页”。
OAI-PMH 的 ListRecords 返回的每个 record 本身就包含元数据。我们不需要进入详情页再抽字段。
如果后续想给每篇论文补一个摘要页链接,可以根据 arXiv ID 拼接:
abs_url = f"https://arxiv.org/abs/{arxiv_id}"
PDF 链接同理:
pdf_url = f"https://arxiv.org/pdf/{arxiv_id}"
本文只保存 PDF 链接,不下载 PDF 文件。
7.4 缺失字段怎么办
缺失字段分三种情况处理:
第一,关键字段缺失,例如 arXiv ID 为空。无法去重,直接跳过。
第二,非关键字段缺失,例如 updated 为空。保留空字符串,不影响入库。
第三,结构不完整,例如 metadata 节点缺失。跳过该 record,并在日志里保留统计信息。
我不建议在解析层随便猜字段。比如作者缺失时,不要自己从标题里猜;分类缺失时,也不要根据 setSpec 自动补一个分类。采集器的职责是忠实搬运和规范化公开元数据,不是创造数据。
8️⃣ 数据存储与导出(Storage)
本文使用 SQLite 起步,同时导出 CSV。
SQLite 的好处是轻量、稳定、可迁移。你可以直接用 DB Browser for SQLite 打开,也可以后续迁移到 MySQL、PostgreSQL 或 Elasticsearch。
8.1 字段映射表
| 数据库字段 | 类型 | 示例值 | 说明 |
|---|---|---|---|
| arxiv_id | TEXT | 2401.01234 | arXiv ID,唯一 |
| oai_identifier | TEXT | oai:arXiv.org:2401.01234 | OAI-PMH 标识 |
| title | TEXT | A Survey of … | 标题 |
| authors | TEXT | [“Alice Smith”, “Bob Lee”] | JSON 数组字符串 |
| categories | TEXT | [“cs.AI”, “cs.LG”] | JSON 数组字符串 |
| primary_category | TEXT | cs.AI | 主分类 |
| abstract | TEXT | This paper … | 摘要 |
| submitted_date | TEXT | 2024-01-03 | 提交日期 |
| updated_date | TEXT | 2024-02-01 | 更新日期 |
| metadata_datestamp | TEXT | 2024-02-01 | OAI-PMH 元数据时间戳 |
| pdf_url | TEXT | https://arxiv.org/pdf/2401.01234 | PDF 链接 |
| set_specs | TEXT | [“cs:cs:AI”] | OAI-PMH set |
| content_hash | TEXT | sha256… | 内容哈希 |
| fetched_at | TEXT | 2026-06-09T… | 本地抓取时间 |
8.2 去重策略
本文用两层去重。
第一层,数据库唯一键:
arxiv_id UNIQUE
同一篇论文重复出现时,执行 upsert。
第二层,内容 hash:
content_hash
如果同一篇论文标题、摘要、分类、作者等内容变化了,就更新记录;如果内容没变,就不做无意义更新。
这比单纯 URL 去重更可靠。因为本文不是网页爬虫,URL 不是核心对象,arXiv ID 才是核心对象。
8.3 storage.py
新建 arxiv_harvester/storage.py:
from __future__ import annotations
import csv
import json
import logging
import sqlite3
from pathlib import Path
from typing import Iterable, Optional
from .parser import PaperRecord
logger = logging.getLogger(__name__)
class SQLiteStorage:
"""
SQLite 存储层。
负责:
1. 建表;
2. upsert 论文记录;
3. 保存采集状态;
4. 导出 CSV。
"""
def __init__(self, db_path: Path):
self.db_path = db_path
self.db_path.parent.mkdir(parents=True, exist_ok=True)
self.conn = sqlite3.connect(str(self.db_path))
self.conn.row_factory = sqlite3.Row
def close(self) -> None:
self.conn.close()
def init_db(self) -> None:
self.conn.executescript(
"""
CREATE TABLE IF NOT EXISTS papers (
arxiv_id TEXT PRIMARY KEY,
oai_identifier TEXT NOT NULL,
title TEXT NOT NULL,
authors TEXT NOT NULL,
categories TEXT NOT NULL,
primary_category TEXT,
abstract TEXT,
submitted_date TEXT,
updated_date TEXT,
metadata_datestamp TEXT NOT NULL,
pdf_url TEXT NOT NULL,
set_specs TEXT NOT NULL,
content_hash TEXT NOT NULL,
fetched_at TEXT NOT NULL
);
CREATE UNIQUE INDEX IF NOT EXISTS idx_papers_oai_identifier
ON papers(oai_identifier);
CREATE INDEX IF NOT EXISTS idx_papers_primary_category
ON papers(primary_category);
CREATE INDEX IF NOT EXISTS idx_papers_submitted_date
ON papers(submitted_date);
CREATE INDEX IF NOT EXISTS idx_papers_metadata_datestamp
ON papers(metadata_datestamp);
CREATE TABLE IF NOT EXISTS harvest_state (
set_spec TEXT PRIMARY KEY,
last_datestamp TEXT,
last_run_at TEXT,
total_records INTEGER NOT NULL DEFAULT 0
);
"""
)
self.conn.commit()
def upsert_many(self, records: Iterable[PaperRecord]) -> int:
"""
批量 upsert。
SQLite 的 ON CONFLICT 可以让我们用 arxiv_id 做主键去重。
WHERE 子句避免内容 hash 没变时重复更新。
"""
rows = [record.to_storage_dict() for record in records]
if not rows:
return 0
sql = """
INSERT INTO papers (
arxiv_id,
oai_identifier,
title,
authors,
categories,
primary_category,
abstract,
submitted_date,
updated_date,
metadata_datestamp,
pdf_url,
set_specs,
content_hash,
fetched_at
)
VALUES (
:arxiv_id,
:oai_identifier,
:title,
:authors,
:categories,
:primary_category,
:abstract,
:submitted_date,
:updated_date,
:metadata_datestamp,
:pdf_url,
:set_specs,
:content_hash,
:fetched_at
)
ON CONFLICT(arxiv_id) DO UPDATE SET
oai_identifier = excluded.oai_identifier,
title = excluded.title,
authors = excluded.authors,
categories = excluded.categories,
primary_category = excluded.primary_category,
abstract = excluded.abstract,
submitted_date = excluded.submitted_date,
updated_date = excluded.updated_date,
metadata_datestamp = excluded.metadata_datestamp,
pdf_url = excluded.pdf_url,
set_specs = excluded.set_specs,
content_hash = excluded.content_hash,
fetched_at = excluded.fetched_at
WHERE papers.content_hash != excluded.content_hash
OR papers.metadata_datestamp != excluded.metadata_datestamp;
"""
with self.conn:
self.conn.executemany(sql, rows)
return len(rows)
def get_last_datestamp(self, set_spec: str) -> Optional[str]:
row = self.conn.execute(
"SELECT last_datestamp FROM harvest_state WHERE set_spec = ?",
(set_spec,),
).fetchone()
if not row:
return None
return row["last_datestamp"]
def update_state(
self,
set_spec: str,
last_datestamp: Optional[str],
last_run_at: str,
total_records: int,
) -> None:
"""
保存采集游标。
注意:
last_datestamp 不要主动加一天。
OAI-PMH 的 from 是闭区间语义时,下一轮重复抓到同一天数据并不可怕;
upsert 会去重。贸然把游标加一天,反而可能漏掉同一 datestamp 的记录。
"""
if not last_datestamp:
return
with self.conn:
self.conn.execute(
"""
INSERT INTO harvest_state (
set_spec,
last_datestamp,
last_run_at,
total_records
)
VALUES (?, ?, ?, ?)
ON CONFLICT(set_spec) DO UPDATE SET
last_datestamp = excluded.last_datestamp,
last_run_at = excluded.last_run_at,
total_records = harvest_state.total_records + excluded.total_records;
""",
(set_spec, last_datestamp, last_run_at, total_records),
)
def count_papers(self) -> int:
row = self.conn.execute("SELECT COUNT(*) AS c FROM papers").fetchone()
return int(row["c"])
def export_csv(self, csv_path: Path) -> None:
csv_path.parent.mkdir(parents=True, exist_ok=True)
rows = self.conn.execute(
"""
SELECT
arxiv_id,
title,
authors,
categories,
primary_category,
abstract,
submitted_date,
updated_date,
metadata_datestamp,
pdf_url,
fetched_at
FROM papers
ORDER BY metadata_datestamp DESC, arxiv_id DESC
"""
).fetchall()
with csv_path.open("w", encoding="utf-8-sig", newline="") as f:
writer = csv.writer(f)
writer.writerow(
[
"arxiv_id",
"title",
"authors",
"categories",
"primary_category",
"abstract",
"submitted_date",
"updated_date",
"metadata_datestamp",
"pdf_url",
"fetched_at",
]
)
for row in rows:
authors = " | ".join(json.loads(row["authors"]))
categories = " | ".join(json.loads(row["categories"]))
writer.writerow(
[
row["arxiv_id"],
row["title"],
authors,
categories,
row["primary_category"],
row["abstract"],
row["submitted_date"],
row["updated_date"],
row["metadata_datestamp"],
row["pdf_url"],
row["fetched_at"],
]
)
logger.info("Export CSV: %s rows=%s", csv_path, len(rows))
这里 CSV 使用 utf-8-sig,是为了在一些表格软件里打开中文或特殊字符时更省心。严格来说,UTF-8 就够了,但做数据交付时,少踩一点编码坑也挺好。
9️⃣ 运行方式与结果展示(必写)
9.1 main.py
新建 arxiv_harvester/main.py:
from __future__ import annotations
import argparse
import logging
from datetime import datetime, timezone
from typing import Optional
from .config import DEFAULT_SETTINGS, Settings
from .fetcher import OAIHarvester
from .parser import parse_records
from .storage import SQLiteStorage
def setup_logging(verbose: bool = False) -> None:
level = logging.DEBUG if verbose else logging.INFO
logging.basicConfig(
level=level,
format="%(asctime)s | %(levelname)-8s | %(name)s | %(message)s",
)
def max_datestamp(current: Optional[str], records_datestamps: list[str]) -> Optional[str]:
"""
计算本页或本轮采集中的最大 datestamp。
datestamp 通常是 YYYY-MM-DD 格式,字符串比较可以工作。
如果后续接口返回更细粒度时间戳,也建议统一解析后比较。
"""
values = [value for value in records_datestamps if value]
if current:
values.append(current)
if not values:
return current
return max(values)
def build_settings(args: argparse.Namespace) -> Settings:
"""
根据命令行参数覆盖默认配置。
dataclass frozen=True,所以这里重新构造一个 Settings。
"""
return Settings(
base_url=DEFAULT_SETTINGS.base_url,
metadata_prefix=args.metadata_prefix,
set_spec=args.set,
user_agent=args.user_agent or DEFAULT_SETTINGS.user_agent,
referer=DEFAULT_SETTINGS.referer,
timeout=args.timeout,
max_retries=args.max_retries,
request_interval=args.request_interval,
db_path=args.db,
csv_path=args.csv,
state_path=DEFAULT_SETTINGS.state_path,
)
def parse_args() -> argparse.Namespace:
parser = argparse.ArgumentParser(
description="Harvest arXiv category metadata incrementally via OAI-PMH."
)
parser.add_argument(
"--set",
default=DEFAULT_SETTINGS.set_spec,
help="OAI-PMH set spec, e.g. cs:cs:AI, cs:cs:LG",
)
parser.add_argument(
"--metadata-prefix",
default=DEFAULT_SETTINGS.metadata_prefix,
help="OAI-PMH metadataPrefix, default: arXiv",
)
parser.add_argument(
"--from-date",
default=None,
help="Start datestamp, format YYYY-MM-DD. If omitted, use stored state.",
)
parser.add_argument(
"--limit-pages",
type=int,
default=None,
help="Limit pages for demo/testing. Omit for full incremental harvest.",
)
parser.add_argument(
"--db",
type=lambda p: DEFAULT_SETTINGS.db_path.__class__(p),
default=DEFAULT_SETTINGS.db_path,
help="SQLite database path.",
)
parser.add_argument(
"--csv",
type=lambda p: DEFAULT_SETTINGS.csv_path.__class__(p),
default=DEFAULT_SETTINGS.csv_path,
help="CSV export path.",
)
parser.add_argument(
"--timeout",
type=int,
default=DEFAULT_SETTINGS.timeout,
help="HTTP timeout seconds.",
)
parser.add_argument(
"--max-retries",
type=int,
default=DEFAULT_SETTINGS.max_retries,
help="Max retries for retryable errors.",
)
parser.add_argument(
"--request-interval",
type=float,
default=DEFAULT_SETTINGS.request_interval,
help="Minimum seconds between requests.",
)
parser.add_argument(
"--user-agent",
default=None,
help="Custom User-Agent. Recommend including contact email.",
)
parser.add_argument(
"--verbose",
action="store_true",
help="Enable debug logging.",
)
return parser.parse_args()
def main() -> None:
args = parse_args()
setup_logging(args.verbose)
logger = logging.getLogger("arxiv_harvester.main")
settings = build_settings(args)
storage = SQLiteStorage(settings.db_path)
storage.init_db()
try:
from_date = args.from_date or storage.get_last_datestamp(settings.set_spec)
logger.info("Start harvest set=%s from=%s", settings.set_spec, from_date)
harvester = OAIHarvester(settings)
total_seen = 0
total_saved = 0
latest_datestamp: Optional[str] = from_date
for page_index, xml_bytes in enumerate(
harvester.list_records(
set_spec=settings.set_spec,
metadata_prefix=settings.metadata_prefix,
from_date=from_date,
limit_pages=args.limit_pages,
),
start=1,
):
records, token, errors = parse_records(xml_bytes)
if errors:
logger.warning("OAI errors on page=%s errors=%s", page_index, errors)
# 常见 noRecordsMatch 不应该让程序崩掉。
if any("noRecordsMatch" in error for error in errors):
break
raise RuntimeError(f"OAI-PMH errors: {errors}")
saved_count = storage.upsert_many(records)
total_seen += len(records)
total_saved += saved_count
latest_datestamp = max_datestamp(
latest_datestamp,
[record.metadata_datestamp for record in records],
)
logger.info(
"Page=%s records=%s saved=%s token=%s latest_datestamp=%s",
page_index,
len(records),
saved_count,
bool(token),
latest_datestamp,
)
run_at = datetime.now(timezone.utc).isoformat(timespec="seconds")
storage.update_state(
set_spec=settings.set_spec,
last_datestamp=latest_datestamp,
last_run_at=run_at,
total_records=total_seen,
)
storage.export_csv(settings.csv_path)
logger.info(
"Done. seen=%s saved=%s total_in_db=%s db=%s csv=%s",
total_seen,
total_saved,
storage.count_papers(),
settings.db_path,
settings.csv_path,
)
finally:
storage.close()
if __name__ == "__main__":
main()
9.2 修正一个小兼容点
上面 argparse 里对 Path 的写法能跑,但不够直观。为了让代码更清爽,可以把 main.py 顶部补一个导入:
from pathlib import Path
然后将参数改成:
parser.add_argument(
"--db",
type=Path,
default=DEFAULT_SETTINGS.db_path,
help="SQLite database path.",
)
parser.add_argument(
"--csv",
type=Path,
default=DEFAULT_SETTINGS.csv_path,
help="CSV export path.",
)
完整项目里建议使用 Path 这个版本,更容易读。
9.3 scripts/run_demo.sh
新建 scripts/run_demo.sh:
#!/usr/bin/env bash
set -euo pipefail
python -m arxiv_harvester.main \
--set cs:cs:AI \
--from-date 2026-01-01 \
--limit-pages 1 \
--db data/arxiv_metadata.sqlite \
--csv data/arxiv_metadata.csv \
--request-interval 3.2 \
--verbose
赋予执行权限:
chmod +x scripts/run_demo.sh
运行:
./scripts/run_demo.sh
或者直接运行:
python -m arxiv_harvester.main \
--set cs:cs:AI \
--from-date 2026-01-01 \
--limit-pages 1
第一次测试时,建议带 --limit-pages 1。等确认解析、入库、导出都正常,再去掉限制。
9.4 输出在哪里
默认输出:
data/arxiv_metadata.sqlite
data/arxiv_metadata.csv
查看 SQLite:
sqlite3 data/arxiv_metadata.sqlite
进入后执行:
.tables
SELECT
arxiv_id,
title,
primary_category,
submitted_date,
pdf_url
FROM papers
ORDER BY metadata_datestamp DESC
LIMIT 5;
9.5 示例结果
下面是结果格式示例,实际内容以运行时接口返回为准:
| arxiv_id | title | authors | categories | submitted_date | pdf_url |
|---|---|---|---|---|---|
| 2601.00001 | Example Paper Title for AI Systems | Alice Zhang | Bob Chen | cs.AI | cs.LG |
| 2601.00002 | A Practical Study on Machine Reasoning | Carol Smith | cs.AI | 2026-01-01 | https://arxiv.org/pdf/2601.00002 |
| 2601.00003 | Efficient Metadata Pipelines for Research Search | Daniel Lee | Eva Wang | cs.DL | cs.AI |
| 2601.00004 | Robust Retrieval over Scientific Abstracts | Frank Miller | cs.IR | cs.AI | 2026-01-02 |
注意,这里的表格只是展示 CSV 形态,不是固定数据。arXiv 每天新增内容不同,分类返回也会随时间变化。
🔟 常见问题与排错(强烈建议写)
10.1 403 或 429 怎么办
403 一般代表访问被拒绝,429 一般代表请求过多。处理思路如下:
第一,降低频率。把 --request-interval 调大,比如 5 秒、10 秒。
python -m arxiv_harvester.main \
--set cs:cs:AI \
--request-interval 10
第二,确认 User-Agent。不要使用空 UA,也不要伪装成浏览器做大规模访问。
第三,减少任务范围。第一次运行不要全量同步,可以先用 --from-date 和 --limit-pages 做小范围测试。
第四,不要用代理池去绕限制。代理池不是解决合规问题的工具。对于公开学术接口,应该主动控制访问规模。
第五,确认自己没有同时开多个采集进程。本文的限速器是单进程内生效,如果你开了多个终端同时跑,总请求频率会叠加。
10.2 HTML 抓到空壳怎么办
本文不抓 HTML,所以正常情况下不会遇到“页面空壳”的问题。
如果你在其他项目中遇到 HTML 里没有数据,通常有两种原因:
第一,数据由 JavaScript 动态加载。解决方式是分析真实接口,或者用 Playwright 渲染页面。
第二,服务端根据 headers、cookie 或登录状态返回不同内容。解决方式是检查网络面板,确认请求是否需要认证。
但对于本项目,最推荐的路线仍然是:不要从 arXiv 搜索页抓 HTML,优先使用公开接口。
10.3 XML 解析报错怎么办
常见错误包括:
lxml.etree.XMLSyntaxError
排查步骤:
第一,打印响应前 500 个字符:
print(xml_bytes[:500].decode("utf-8", errors="replace"))
看看返回的是不是 XML。有时候你以为拿到了 OAI-PMH,实际拿到的是错误页或网关提示。
第二,检查命名空间。XML 解析最容易踩的坑就是 namespace。不要写:
root.xpath("//record")
应该写:
root.xpath("//oai:record", namespaces=NS)
第三,检查接口错误。OAI-PMH 可能返回:
<error code="noRecordsMatch">...</error>
这不是 XML 解析失败,而是接口明确告诉你没有匹配记录。
10.4 解析出来字段为空怎么办
字段为空一般有三种可能:
第一,XPath 写错。尤其是命名空间错了。
第二,metadataPrefix 不对。本文使用的是 arXiv,不是 oai_dc。如果你换成 Dublin Core,字段结构会完全不同。
第三,记录本身缺字段。采集器要允许非关键字段为空,不要因为一条记录不完整导致整轮任务失败。
10.5 编码或乱码如何处理
OAI-PMH XML 通常是 UTF-8。requests 返回 bytes,lxml 直接解析 bytes 即可,不建议先手动 decode 再 parse。
导出 CSV 时,本文用了:
encoding="utf-8-sig"
这是为了兼容一些表格软件。如果你后续只用 pandas 或数据库处理,普通 UTF-8 也可以。
10.6 为什么本次运行没有新增数据
可能原因包括:
第一,from-date 设置太晚,这个分类在该时间之后没有更新。
第二,OAI-PMH datestamp 是元数据修改日期,不等同于论文原始提交日期。你用 from-date 拉的是“元数据变更”,不是“提交日期筛选”。
第三,分类 set 写错。例如把 cs:cs:AI 写成了 cs.AI。OAI-PMH set 和 arXiv 分类名不是同一个字符串。
第四,本地已经抓过。由于 upsert 去重,重复记录不会造成新增数量上涨。
1️⃣1️⃣ 进阶优化(可选但加分)
11.1 并发优化
本文刻意没有做并发。原因不是不会,而是不适合一上来做。
如果后续你要采集多个分类,例如:
cs:cs:AI
cs:cs:LG
cs:cs:CL
stat:stat:ML
也不要简单地开四个线程同时请求。更稳妥的做法是:
- 分类任务排队。
- 全局限速器统一控制请求。
- 每个分类维护自己的 state。
- 入库层用同一个 arxiv_id 去重。
也就是说,并发可以有,但请求出口必须克制。
一个简单的多分类配置可以这样设计:
CATEGORY_SETS = [
"cs:cs:AI",
"cs:cs:LG",
"cs:cs:CL",
"stat:stat:ML",
]
主流程改成:
for set_spec in CATEGORY_SETS:
harvest_one_set(set_spec)
先串行跑通,比盲目并发更稳。
11.2 断点续跑
本文已经实现了基本断点续跑:
harvest_state.last_datestamp
每次运行时,如果没有传 --from-date,程序会读取本地 state,从上一次 datestamp 继续。
这里有一个细节:不要把游标主动加一天。
例如上次最大 datestamp 是:
2026-01-05
下一次仍然从:
from=2026-01-05
开始。这样可能重复抓到一部分数据,但不会漏掉同一天后续更新的记录。重复由数据库 upsert 处理。
如果你希望更严谨,可以增加一个二级游标:
last_datestamp
last_oai_identifier
当 datestamp 粒度较粗时,用 (datestamp, identifier) 组合判断是否已经处理。但对于本文这个起步项目,重复拉取同一天数据再 upsert,已经足够实用。
11.3 日志与监控
最低限度建议记录:
| 指标 | 说明 |
|---|---|
| request_count | 请求次数 |
| page_count | OAI-PMH 页数 |
| records_seen | 解析到的记录数 |
| records_saved | 入库记录数 |
| error_count | 错误次数 |
| latest_datestamp | 当前最大游标 |
| duration_seconds | 任务耗时 |
可以在 main.py 中增加统计:
import time
started = time.monotonic()
# harvest ...
duration = time.monotonic() - started
logger.info("Duration %.2f seconds", duration)
如果项目部署到服务器,可以把日志写入文件:
logging.basicConfig(
filename="data/harvester.log",
level=logging.INFO,
format="%(asctime)s | %(levelname)s | %(name)s | %(message)s",
)
更进一步,可以接 Prometheus、Grafana 或者简单地每天发一封运行报告邮件。不过我建议先把本地日志做好,不要过早上复杂监控。
11.4 定时任务
Linux/macOS 可以用 cron:
crontab -e
加入:
30 10 * * * cd /path/to/arxiv-category-harvester && /path/to/.venv/bin/python -m arxiv_harvester.main --set cs:cs:AI >> data/cron.log 2>&1
这表示每天 10:30 运行一次。具体时间可以按你的业务需求调整。
如果任务变多,可以考虑 Airflow、Prefect、Dagster 这类调度系统。但对一个分类元数据同步任务来说,cron 完全够用。
11.5 数据分析延伸
有了 SQLite 后,可以很方便地做分析。
例如统计每月论文数量:
SELECT
substr(submitted_date, 1, 7) AS month,
COUNT(*) AS paper_count
FROM papers
WHERE submitted_date IS NOT NULL
AND submitted_date != ''
GROUP BY month
ORDER BY month;
统计热门分类:
SELECT
primary_category,
COUNT(*) AS paper_count
FROM papers
GROUP BY primary_category
ORDER BY paper_count DESC
LIMIT 20;
如果想用 Python 分析:
import sqlite3
import pandas as pd
conn = sqlite3.connect("data/arxiv_metadata.sqlite")
df = pd.read_sql_query(
"""
SELECT arxiv_id, title, primary_category, submitted_date, pdf_url
FROM papers
ORDER BY submitted_date DESC
LIMIT 100
""",
conn,
)
print(df.head())
后续可以做:
- 每日新增论文邮件摘要。
- 按分类生成趋势图。
- 摘要关键词提取。
- 作者合作网络分析。
- 接入向量数据库做语义检索。
1️⃣2️⃣ 总结与延伸阅读
这篇文章完成了一套面向 arXiv 分类的增量元数据采集器。
我们没有从 HTML 搜索页硬抓,也没有沿用关键词分页,而是选择了更适合元数据同步的 OAI-PMH 接口。整个项目按请求层、解析层、存储层拆开,支持限速、重试、resumptionToken 分页、SQLite 去重、游标续跑和 CSV 导出。
最终采集字段包括:
标题
作者
分类
摘要
提交日期
PDF 链接
同时额外保留:
arXiv ID
OAI Identifier
metadata_datestamp
content_hash
fetched_at
这些额外字段是工程上真正让系统稳定起来的东西。很多教程只展示“抓到了标题”,但生产环境更关心“下次怎么继续抓”“重复怎么处理”“失败了怎么恢复”“字段变了怎么排查”。
下一步可以继续做几件事:
第一,把单分类扩展到多分类,并设计统一限速队列。
第二,把 SQLite 换成 PostgreSQL,支持更复杂的查询和多人访问。
第三,增加邮件或飞书通知,每天推送新增论文摘要。
第四,接入向量检索,对摘要做 embedding,构建本地论文搜索引擎。
第五,迁移到 Scrapy 或 Airflow,做成更标准的数据管道。
第六,如果确实要处理动态网页或非公开 API,再考虑 Playwright。但在本文这个场景里,公开元数据接口已经是最干净的选择。
爬虫不是越快越好,也不是越隐蔽越好。真正耐用的采集程序,往往是慢一点、规矩一点、日志清楚一点、失败能恢复一点。这样的代码看起来没有炫技,但跑久了会让人很安心。
🌟 文末
好啦~以上就是本期的全部内容啦!如果你在实践过程中遇到任何疑问,欢迎在评论区留言交流,我看到都会尽量回复~咱们下期见!
小伙伴们在批阅的过程中,如果觉得文章不错,欢迎点赞、收藏、关注哦~
三连就是对我写作道路上最好的鼓励与支持! ❤️🔥
✅ 专栏持续更新中|建议收藏 + 订阅
墙裂推荐订阅专栏 👉 《Python爬虫实战》,本专栏秉承着以“入门 → 进阶 → 工程化 → 项目落地”的路线持续更新,争取让每一期内容都做到:
✅ 讲得清楚(原理)|✅ 跑得起来(代码)|✅ 用得上(场景)|✅ 扛得住(工程化)
📣 想系统提升的小伙伴:强烈建议先订阅专栏 《Python爬虫实战》,再按目录大纲顺序学习,效率十倍上升~
✅ 互动征集
想让我把【某站点/某反爬/某验证码/某分布式方案】等写成某期实战?
评论区留言告诉我你的需求,我会优先安排实现(更新)哒~
⭐️ 若喜欢我,就请关注我叭~(更新不迷路)
⭐️ 若对你有用,就请点赞支持一下叭~(给我一点点动力)
⭐️ 若有疑问,就请评论留言告诉我叭~(我会补坑 & 更新迭代)
✅ 免责声明
本文爬虫思路、相关技术和代码仅用于学习参考,对阅读本文后的进行爬虫行为的用户本作者不承担任何法律责任。
使用或者参考本项目即表示您已阅读并同意以下条款:
- 合法使用: 不得将本项目用于任何违法、违规或侵犯他人权益的行为,包括但不限于网络攻击、诈骗、绕过身份验证、未经授权的数据抓取等。
- 风险自负: 任何因使用本项目而产生的法律责任、技术风险或经济损失,由使用者自行承担,项目作者不承担任何形式的责任。
- 禁止滥用: 不得将本项目用于违法牟利、黑产活动或其他不当商业用途。
- 使用或者参考本项目即视为同意上述条款,即 “谁使用,谁负责” 。如不同意,请立即停止使用并删除本项目。!!!
更多推荐
所有评论(0)