㊗️本期内容已收录至专栏《Python爬虫实战》,持续完善知识体系与项目实战,建议先订阅收藏,后续查阅更方便~
㊙️本期爬虫难度指数:⭐⭐⭐☆☆(进阶级)
🉐福利: 一次订阅后,专栏内的所有文章可永久免费看,持续更新中,保底1000+(篇)硬核实战内容。

全文目录:

🌟 开篇语

哈喽,各位小伙伴们你们好呀~我是【喵手】。
运营社区: C站 / 掘金 / 腾讯云 / 阿里云 / 华为云 / 51CTO
欢迎大家常来逛逛,一起学习,一起进步~🌟

  我长期专注 Python 爬虫工程化实战,主理专栏👉 《Python爬虫实战》:从采集策略反爬对抗,从数据清洗分布式调度,持续输出可复用的方法论与可落地案例。内容主打一个“能跑、能用、能扩展”,让数据价值真正做到——抓得到、洗得净、用得上

  📌 专栏食用指南(建议收藏)

  • ✅ 入门基础:环境搭建 / 请求与解析 / 数据落库
  • ✅ 进阶提升:登录鉴权 / 动态渲染 / 反爬对抗
  • ✅ 工程实战:异步并发 / 分布式调度 / 监控与容错
  • ✅ 项目落地:数据治理 / 可视化分析 / 场景化应用

📣 专栏推广时间:如果你想系统学爬虫,而不是碎片化东拼西凑,欢迎订阅专栏👉《Python爬虫实战》👈,一次订阅后,专栏内的所有文章可永久免费阅读,持续更新中。
  
💕订阅后更新会优先推送,按目录学习更高效💯~

0️⃣ 前言(Preface)

这次要采集的是一份公开的河川水质监测断面目录。工具并不复杂:以 Python、requests、JSON API、SQLite 和 CSV 为主,最终得到一份可以持续更新的站点清单。

我更愿意把这类任务称为“轻量数据工程”,而不是简单地把页面内容扒下来。真正值得花时间处理的地方,不是发出一个 HTTP 请求,而是把站点编号、断面名称、河流、地区和监测指标整理成长期可复用的数据结构,并且在重复抓取、字段缺失、站点更名和接口波动时保持输出稳定。

读完本文,你可以获得三项可以直接迁移到其他项目的能力:

  1. 用公开 API 采集站点类目录数据,不依赖脆弱的页面选择器。
  2. 将“基础站点表”和“监测记录表”关联,汇总每个断面的监测指标集合。
  3. 用站点编号、内容指纹和冲突报告完成可靠去重,而不是粗暴地调用一次 drop_duplicates()

本文使用两个公开数据集:

  • WQX_P_06:河川水质测点基础资料。
  • WQX_P_01:河川水质监测资料。

基础资料集负责回答“有哪些断面”;监测资料集负责回答“每个断面公开过哪些监测指标”。这种拆分很常见:目录数据相对稳定,监测数据不断新增。把两者混在一起抓取,既浪费请求次数,也不利于后期维护。

1️⃣ 摘要(Abstract)

本文实现一个可复现的 Python 数据采集项目:通过公开环境数据 API 获取河川水质监测断面目录,补齐每个断面的监测指标,并导出 CSV、JSON 和 SQLite 三种结果。

输出目录至少包含以下核心字段:

字段 含义
station_id 站点编号
section_name 断面名称
river 河流名称
region 地区,由县市与乡镇组合
monitoring_indicators 已公开监测指标集合

项目还保留流域、经纬度、地址、使用状态、内容哈希和采集时间,方便后续地图展示、数据质量检查与增量更新。

完成本文后,你可以:

  1. 理解站点目录与监测明细的关联方式。
  2. 掌握带超时、重试、退避和请求节流的请求层写法。
  3. 建立可审计的去重机制:主键去重、内容哈希去重、异常冲突单独输出。
  4. 将同一套框架迁移到气象站、空气质量站、雨量站、交通检测器或公共设施目录。

2️⃣ 背景与需求(Why)

2.1 为什么值得做一份断面目录

站点类数据经常被低估。

刚接触监测数据时,人们通常更关注某一天的 pH、溶解氧或氨氮数值。但当你需要做跨月份分析、区域聚合、地图可视化或自动报表时,首先遇到的问题往往是:

  • 同一个站点在不同文件中是否使用同一个编号?
  • 断面名称是否发生过空格、括号、全角半角或繁简差异?
  • 一个断面属于哪个流域、哪条河、哪个地区?
  • 某些站点是否已经停用?
  • 某个站点到底公开过哪些监测指标?
  • 重跑脚本后,数据库是否会插入一批重复记录?

一份整洁的站点目录就是数据分析的地基。

目录建立完成后,可以继续完成这些工作:

  • 统计每条河流覆盖的监测断面数量。
  • 按地区建立站点地图。
  • 找出缺少坐标或缺少河流字段的站点。
  • 将每月监测结果关联到固定站点维表。
  • 对新增、停用或字段变化的断面生成变更报告。
  • 为后续定时任务、仪表盘和数据仓库建立统一主键。

2.2 目标字段

本项目将用户关心的五个字段作为最小结果集:

目标字段 来源字段 处理方式
断面名 sitename 清理空格后映射为 section_name
河流 river 保留公开接口原值
地区 countytownship 使用 / 拼接
监测指标 itemnameitemengabbreviationitemunit 按站点编号聚合并去重
站点编号 siteid 映射为 station_id,作为首选主键

此外保留若干辅助字段:

辅助字段 用途
county 地区筛选
township 更细粒度地区筛选
basin 按流域聚合
longitudelatitude 地图展示
site_address 人工复核
status_of_use 识别停用或状态变化
content_hash 内容级去重与变更检测
collected_at 记录采集时间
indicator_source 标记指标是否成功关联

2.3 为什么不用页面爬虫作为第一选择

只要官方已经提供公开 API,我通常不会优先解析网页。

网页的职责是展示。API 的职责是交换数据。二者面对的使用场景不同。

页面爬虫常见的问题包括:

  • 前端改版后 CSS 选择器失效。
  • 表格通过 JavaScript 动态加载,requests.get() 只能抓到空壳。
  • 页面为了展示增加合并单元格、分页、缩略字段或格式化文本。
  • 列名和列顺序调整后,代码悄悄错位。
  • 每一页只能获得少量记录,采集效率低。
  • 为了模拟浏览器而引入 Playwright,部署成本变高。

公开 API 通常更适合站点目录:

  • 返回结构明确。
  • 分页方式清晰。
  • 数据格式稳定。
  • 不需要模拟点击。
  • 更容易做字段校验。
  • 更容易遵守调用限制。

这仍然属于爬虫与数据采集范畴,只是我们选择了更加克制、更加可靠的入口。

3️⃣ 合规与注意事项(必写)

3.1 robots.txt 是起点,不是通行证

robots.txt 通常放在网站根目录,用于表达自动化客户端应当遵守的抓取规则。它不是登录授权,也不是访问权限证明。

工程上至少要做到:

  1. 开始采集前读取目标网站的 robots.txt
  2. 记录检查结果。
  3. 如果明确禁止目标路径,停止自动抓取。
  4. 如果 robots.txt 暂时不可访问,降低频率,并优先遵守平台公开的 API 使用条款。
  5. 不把“浏览器可以打开”误解为“可以无限制并发调用”。

对于 API 项目,平台服务条款通常比普通网页抓取习惯更重要。本文代码会主动检查 robots.txt,也会设置请求间隔、分页上限、超时和退避机制。

3.2 频率控制:慢一点,项目反而更稳

演示代码默认每次请求后暂停 1.2 秒,不使用攻击式并发。

对于一个站点目录而言,追求每秒几十次请求没有意义。基础目录通常不需要每分钟更新。即使需要定时同步,也可以每天或每周执行一次。

推荐策略:

场景 建议
首次测试 仅抓 1 页,确认字段
获取基础目录 单线程分页,页间隔 1 秒以上
补齐监测指标 限制最大页数,先观察数据量
定时同步 每天或每周低频执行
遇到 429 立即降低频率,尊重 Retry-After
遇到持续 403 停止自动尝试,检查条款、请求头和入口是否变化

3.3 不采集敏感信息,不绕过限制

本文只处理公开环境监测目录与公开监测测项,不涉及个人身份信息、账号信息或非公开数据。

代码不会实现以下行为:

  • 不模拟破解验证码。
  • 不绕过登录。
  • 不绕过付费限制。
  • 不复用他人的 API Key。
  • 不使用高并发压测公开服务。
  • 不尝试访问未公开的后台接口。
  • 不将重试写成无限循环。
  • 不因为接口返回 403 或 429 就自动切换大量代理继续施压。

如果平台要求注册后获取 API Key,应当使用自己的 Key,并按照条款控制调用量。

3.4 日志中避免泄露 API Key

API Key 不应该硬编码进仓库。

本文使用 .env 文件保存 Key,并在日志中只输出数据集代号、分页偏移量和记录数,不打印完整请求 URL。

.env 应加入 .gitignore

.env
output/
__pycache__/
.pytest_cache/
*.sqlite3

4️⃣ 技术选型与整体流程(What / How)

4.1 静态页面、动态页面还是 API

常见采集入口可以分为三类:

类型 特征 常用工具
静态 HTML 数据直接存在于响应 HTML requests、BeautifulSoup、lxml
动态页面 HTML 只是壳,数据由 JavaScript 加载 浏览器开发者工具、Playwright
JSON API 接口直接返回结构化记录 requests、JSON 解析

本文属于第三类:公开 JSON API 采集

基础目录接口:

https://data.moenv.gov.tw/api/v2/wqx_p_06

监测明细接口:

https://data.moenv.gov.tw/api/v2/wqx_p_01

API Key 通过环境变量传入,不直接写入 URL 示例。

4.2 整体流程

完整流程可以概括为:

读取配置
  ↓
检查 robots.txt
  ↓
建立 requests.Session
  ↓
配置 headers、timeout、retry、backoff
  ↓
分页采集 WQX_P_06 基础站点目录
  ↓
分页采集 WQX_P_01 最新公开监测记录
  ↓
按 SiteId 汇总监测指标
  ↓
字段标准化
  ↓
主键去重 + 内容哈希去重 + 冲突检测
  ↓
导出 CSV / JSON / SQLite
  ↓
打印预览与质量报告

4.3 为什么选择 requests

这个项目不需要浏览器自动化。

requests 足够完成:

  • Session 复用。
  • headers 管理。
  • 参数编码。
  • 连接超时与读取超时。
  • JSON 解析。
  • HTTP 状态码检查。
  • urllib3.Retry 配合完成有限重试。
  • 按照 Retry-After 执行退避。

没有必要为了显得“高级”而引入 Playwright。爬虫项目最可贵的不是依赖多,而是依赖恰到好处。

4.4 为什么同时导出 CSV、JSON 和 SQLite

三种格式各有用途:

格式 适用场景
CSV 用 Excel、WPS、pandas 快速查看
JSON 给其他程序、前端或 API 服务使用
SQLite 增量更新、主键约束、SQL 查询、定时任务

CSV 是交付文件,JSON 是交换文件,SQLite 是本地数据库。三者同时保留,成本并不高,却能覆盖大多数后续需求。

5️⃣ 环境准备与依赖安装(可复现)

5.1 Python 版本

建议使用:

Python 3.11 或 Python 3.12

Python 3.10 也可以运行。为了减少环境差异,不建议使用过旧版本。

5.2 创建虚拟环境

Windows PowerShell:

python -m venv .venv
.venv\Scripts\Activate.ps1

macOS 或 Linux:

python3 -m venv .venv
source .venv/bin/activate

5.3 项目目录

推荐结构:

water-section-catalog/
├── .env.example
├── .gitignore
├── requirements.txt
├── README.md
├── water_catalog/
│   ├── __init__.py
│   ├── config.py
│   ├── client.py
│   ├── cleaner.py
│   ├── storage.py
│   └── cli.py
└── tests/
    └── test_cleaner.py

创建目录:

mkdir -p water-section-catalog/water_catalog
mkdir -p water-section-catalog/tests
cd water-section-catalog

Windows 用户也可以手动新建目录。

5.4 requirements.txt

requests>=2.31,<3
urllib3>=2,<3
python-dotenv>=1,<2
pytest>=8,<9

安装依赖:

pip install -r requirements.txt

5.5 .env.example

MOENV_API_KEY=replace_with_your_own_api_key

WATER_CATALOG_PAGE_SIZE=1000
WATER_CATALOG_REQUEST_INTERVAL=1.2
WATER_CATALOG_CONNECT_TIMEOUT=5
WATER_CATALOG_READ_TIMEOUT=30
WATER_CATALOG_OUTPUT_DIR=output

WATER_CATALOG_USER_AGENT=WaterSectionCatalogBot/1.0 (+contact: your-email@example.com)

复制并重命名:

cp .env.example .env

然后填写自己的 API Key。

Windows PowerShell:

Copy-Item .env.example .env

5.6 .gitignore

.env
output/
__pycache__/
.pytest_cache/
*.pyc
*.sqlite3

5.7 water_catalog/init.py

"""Public water monitoring section catalog collector."""

__version__ = "1.0.0"

6️⃣ 核心实现:请求层(Fetcher)

请求层负责解决四个问题:

  1. 请求发到哪里。
  2. 请求头如何设置。
  3. 网络波动如何处理。
  4. 如何在合规范围内重试。

6.1 headers:UA、referer 和 Accept

一个清晰的 User-Agent 比伪装成浏览器更合适。

本文默认:

WaterSectionCatalogBot/1.0 (+contact: your-email@example.com)

它明确说明这是一个自动化采集程序,并预留联系方式。公开数据采集不是躲猫猫,没有必要堆砌一串浏览器指纹。

请求头包含:

{
    "User-Agent": "...",
    "Accept": "application/json",
    "Referer": "https://data.moenv.gov.tw/dataset/detail/WQX_P_06",
}

Referer 不是为了绕过限制,而是为了清晰说明请求与公开数据集页面的关联。

6.2 timeout:连接和读取分开设置

不要写:

requests.get(url)

也不要只依赖默认行为。

推荐:

timeout=(5, 30)

含义是:

  • 建立连接最多等待 5 秒。
  • 连接建立后读取响应最多等待 30 秒。

如果网络卡住,脚本不会永远挂在那里。

6.3 session 与 cookie

本项目使用 requests.Session() 复用 TCP 连接。

公开 API 不需要登录 cookie,因此代码不会保存、导入或伪造 cookie。

如果未来平台调整为登录后使用,正确做法是根据公开说明申请访问权限,而不是在教程中增加绕过逻辑。

6.4 重试与退避

适合重试的情况:

  • 429 Too Many Requests
  • 500 Internal Server Error
  • 502 Bad Gateway
  • 503 Service Unavailable
  • 504 Gateway Timeout
  • 临时连接失败
  • 临时读取超时

不适合盲目重试的情况:

  • 400 Bad Request
  • 401 Unauthorized
  • 403 Forbidden
  • 参数写错
  • API Key 失效
  • 数据集代号不存在

退避的目的不是“不断撞门”,而是给服务端恢复时间。

6.5 water_catalog/config.py

from __future__ import annotations

import os
from dataclasses import dataclass
from pathlib import Path

from dotenv import load_dotenv


load_dotenv()


@dataclass(frozen=True)
class Settings:
    """Runtime configuration loaded from environment variables."""

    api_key: str
    base_url: str
    catalog_dataset: str
    observation_dataset: str
    referer: str
    user_agent: str
    page_size: int
    request_interval: float
    connect_timeout: float
    read_timeout: float
    output_dir: Path

    @classmethod
    def from_env(cls) -> "Settings":
        return cls(
            api_key=os.getenv("MOENV_API_KEY", "").strip(),
            base_url="https://data.moenv.gov.tw/api/v2",
            catalog_dataset="wqx_p_06",
            observation_dataset="wqx_p_01",
            referer="https://data.moenv.gov.tw/dataset/detail/WQX_P_06",
            user_agent=os.getenv(
                "WATER_CATALOG_USER_AGENT",
                "WaterSectionCatalogBot/1.0 (+contact: your-email@example.com)",
            ).strip(),
            page_size=int(os.getenv("WATER_CATALOG_PAGE_SIZE", "1000")),
            request_interval=float(
                os.getenv("WATER_CATALOG_REQUEST_INTERVAL", "1.2")
            ),
            connect_timeout=float(
                os.getenv("WATER_CATALOG_CONNECT_TIMEOUT", "5")
            ),
            read_timeout=float(
                os.getenv("WATER_CATALOG_READ_TIMEOUT", "30")
            ),
            output_dir=Path(
                os.getenv("WATER_CATALOG_OUTPUT_DIR", "output")
            ),
        )

    def validate(self) -> None:
        if not self.api_key:
            raise ValueError(
                "Missing MOENV_API_KEY. Copy .env.example to .env "
                "and fill in your own API key."
            )

        if self.page_size < 1 or self.page_size > 1000:
            raise ValueError(
                "WATER_CATALOG_PAGE_SIZE must be between 1 and 1000."
            )

        if self.request_interval < 0.5:
            raise ValueError(
                "Request interval is too aggressive. "
                "Use at least 0.5 seconds between requests."
            )

        if self.connect_timeout <= 0 or self.read_timeout <= 0:
            raise ValueError("Timeout values must be positive numbers.")

    @property
    def timeout(self) -> tuple[float, float]:
        return (self.connect_timeout, self.read_timeout)

6.6 water_catalog/client.py

from __future__ import annotations

import json
import logging
import time
from dataclasses import dataclass
from pathlib import Path
from typing import Any
from urllib import robotparser
from urllib.parse import urljoin

import requests
from requests import Response
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from .config import Settings


logger = logging.getLogger(__name__)


class ApiResponseError(RuntimeError):
    """Raised when the API response cannot be parsed safely."""


@dataclass
class PageResult:
    dataset: str
    offset: int
    limit: int
    records: list[dict[str, Any]]
    raw_payload: Any


def build_session(settings: Settings) -> requests.Session:
    """Create a requests session with bounded retries and clear headers."""

    retry = Retry(
        total=4,
        connect=4,
        read=4,
        status=4,
        backoff_factor=1.0,
        status_forcelist=(429, 500, 502, 503, 504),
        allowed_methods=frozenset({"GET"}),
        respect_retry_after_header=True,
        raise_on_status=False,
    )

    adapter = HTTPAdapter(
        max_retries=retry,
        pool_connections=4,
        pool_maxsize=4,
    )

    session = requests.Session()
    session.headers.update(
        {
            "User-Agent": settings.user_agent,
            "Accept": "application/json",
            "Referer": settings.referer,
        }
    )
    session.mount("https://", adapter)
    session.mount("http://", adapter)
    return session


class PublicWaterApiClient:
    """Small API client for public water monitoring datasets."""

    def __init__(self, settings: Settings) -> None:
        self.settings = settings
        self.session = build_session(settings)

    def close(self) -> None:
        self.session.close()

    def __enter__(self) -> "PublicWaterApiClient":
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        self.close()

    def dataset_url(self, dataset: str) -> str:
        return urljoin(f"{self.settings.base_url}/", dataset)

    def check_robots(self) -> bool:
        """
        Check robots.txt before collecting.

        If robots.txt is unavailable, continue cautiously and rely on
        the public API terms, rate control, and explicit pagination limits.
        """

        robots_url = "https://data.moenv.gov.tw/robots.txt"
        parser = robotparser.RobotFileParser()
        parser.set_url(robots_url)

        try:
            response = self.session.get(
                robots_url,
                timeout=self.settings.timeout,
            )

            if response.status_code == 404:
                logger.warning(
                    "robots.txt was not found. Continue cautiously "
                    "under the documented API terms."
                )
                return True

            response.raise_for_status()
            parser.parse(response.text.splitlines())

            allowed = parser.can_fetch(
                self.settings.user_agent,
                self.dataset_url(self.settings.catalog_dataset),
            )

            if allowed:
                logger.info("robots.txt check passed.")
            else:
                logger.error(
                    "robots.txt disallows automated access to the target path."
                )

            return allowed

        except requests.RequestException as exc:
            logger.warning(
                "Could not read robots.txt: %s. "
                "Continue cautiously under the documented API terms.",
                exc,
            )
            return True

    def fetch_page(
        self,
        dataset: str,
        *,
        offset: int,
        limit: int,
    ) -> PageResult:
        """Fetch one JSON page."""

        url = self.dataset_url(dataset)
        params = {
            "api_key": self.settings.api_key,
            "format": "json",
            "offset": offset,
            "limit": limit,
        }

        logger.info(
            "Fetching dataset=%s offset=%s limit=%s",
            dataset,
            offset,
            limit,
        )

        response = self.session.get(
            url,
            params=params,
            timeout=self.settings.timeout,
        )

        self._raise_for_status(response)

        try:
            payload = response.json()
        except requests.JSONDecodeError as exc:
            raise ApiResponseError(
                f"Dataset {dataset} returned invalid JSON."
            ) from exc

        records = extract_records(payload)

        logger.info(
            "Received dataset=%s offset=%s records=%s",
            dataset,
            offset,
            len(records),
        )

        time.sleep(self.settings.request_interval)

        return PageResult(
            dataset=dataset,
            offset=offset,
            limit=limit,
            records=records,
            raw_payload=payload,
        )

    def fetch_all(
        self,
        dataset: str,
        *,
        max_pages: int | None = None,
        raw_dir: Path | None = None,
    ) -> list[dict[str, Any]]:
        """
        Fetch paginated records.

        max_pages is a safety valve. Use None for the small catalog dataset.
        For larger observation datasets, pass an explicit limit first.
        """

        all_records: list[dict[str, Any]] = []
        offset = 0
        page_number = 0

        while True:
            if max_pages is not None and page_number >= max_pages:
                logger.warning(
                    "Reached max_pages=%s for dataset=%s. "
                    "The output may be a bounded snapshot rather than "
                    "the complete available dataset.",
                    max_pages,
                    dataset,
                )
                break

            result = self.fetch_page(
                dataset,
                offset=offset,
                limit=self.settings.page_size,
            )

            page_number += 1

            if raw_dir is not None:
                save_raw_payload(
                    raw_dir=raw_dir,
                    dataset=dataset,
                    offset=offset,
                    payload=result.raw_payload,
                )

            if not result.records:
                break

            all_records.extend(result.records)

            if len(result.records) < self.settings.page_size:
                break

            offset += self.settings.page_size

        logger.info(
            "Finished dataset=%s pages=%s records=%s",
            dataset,
            page_number,
            len(all_records),
        )
        return all_records

    @staticmethod
    def _raise_for_status(response: Response) -> None:
        if response.status_code == 429:
            retry_after = response.headers.get("Retry-After", "unknown")
            raise RuntimeError(
                "API rate limit reached. "
                f"Retry-After={retry_after}. "
                "Stop the job and reduce request frequency."
            )

        if response.status_code in {401, 403}:
            raise RuntimeError(
                f"API request rejected with HTTP {response.status_code}. "
                "Check your own API key, the documented terms, and the "
                "dataset endpoint. Do not attempt aggressive retries."
            )

        response.raise_for_status()


def extract_records(payload: Any) -> list[dict[str, Any]]:
    """
    Extract list records from a JSON payload.

    APIs sometimes wrap records under records, data, result, or results.
    This parser remains strict enough to avoid silently accepting nonsense.
    """

    if isinstance(payload, list):
        return [
            row for row in payload
            if isinstance(row, dict)
        ]

    if not isinstance(payload, dict):
        raise ApiResponseError(
            f"Unexpected JSON root type: {type(payload).__name__}"
        )

    for key in ("records", "data", "result", "results"):
        value = payload.get(key)

        if isinstance(value, list):
            return [
                row for row in value
                if isinstance(row, dict)
            ]

        if isinstance(value, dict):
            try:
                nested = extract_records(value)
            except ApiResponseError:
                continue

            if nested:
                return nested

    raise ApiResponseError(
        "Could not find a record list in the JSON payload. "
        f"Available top-level keys: {sorted(payload.keys())}"
    )


def save_raw_payload(
    *,
    raw_dir: Path,
    dataset: str,
    offset: int,
    payload: Any,
) -> None:
    raw_dir.mkdir(parents=True, exist_ok=True)

    file_path = raw_dir / f"{dataset}_offset_{offset:08d}.json"
    file_path.write_text(
        json.dumps(payload, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )

6.7 请求层设计说明

这段代码刻意做了几个限制:

  • 重试次数有限,不会无限循环。
  • 只对 GET 请求启用自动重试。
  • 连接池很小,不鼓励并发。
  • 429 会抛出明确错误,提醒降低频率。
  • 401403 不会被当作“多试几次就能成功”的异常。
  • API Key 不会出现在日志中。
  • 每一页原始 JSON 都可以落盘,方便排错和审计。

这比写一个几十行的临时脚本稍微啰嗦,但后续维护会轻松很多。

7️⃣ 核心实现:解析层(Parser)

7.1 解析方式:JSON 优先

传统网页爬虫通常要选择:

  • XPath
  • CSS Selector
  • BeautifulSoup
  • 正则表达式
  • 浏览器自动化

本文不需要这些作为主流程,因为接口直接返回 JSON。

解析步骤是:

  1. 找到 JSON 中的记录列表。
  2. 兼容 recordsdataresultresults 等常见包裹字段。
  3. 将字段名统一转为小写。
  4. 清洗空格和 Unicode 格式。
  5. 汇总监测指标。
  6. 建立内容指纹。
  7. 完成主键去重与冲突检测。

7.2 列表页如何拿详情链接

这个问题在 API 项目中需要换一个角度回答。

传统 HTML 项目通常是:

列表页
  ↓
提取详情链接
  ↓
逐个请求详情页
  ↓
抽取字段

本文的数据源已经将详情字段结构化,因此不需要逐站点访问详情页。siteid 本身就是站点的稳定关联键。

换句话说,API 版本的流程是:

基础目录接口
  ↓
获得 SiteId、SiteName、County、Township、Basin、River 等字段
  ↓
监测记录接口
  ↓
使用 SiteId 进行关联

这比详情页抓取更加节制,也减少了请求数量。

如果未来遇到只有 HTML 的站点目录,可以使用下面的通用思路:

from urllib.parse import urljoin

from bs4 import BeautifulSoup


def parse_detail_links(html: str, base_url: str) -> list[str]:
    soup = BeautifulSoup(html, "html.parser")
    links: list[str] = []

    for anchor in soup.select("table tbody a[href]"):
        href = anchor.get("href", "").strip()
        if not href:
            continue
        links.append(urljoin(base_url, href))

    return sorted(set(links))

这段代码只是通用示范,不参与本文主流程。能直接使用公开 API 时,不要为了“像爬虫”而额外抓详情页。

7.3 缺失字段怎么办

站点类数据不应因为一个字段缺失而整条丢弃。

本文采用以下策略:

字段 缺失处理
siteid 使用内容哈希生成 HASH-xxxxxxxxxxxxxxxx 备用编号
sitename 保留空值,但会在质量报告中暴露
county 保留空值
township 保留空值
river 保留空值,不用流域冒充河流
经纬度 尝试转为浮点数,失败则写入 None
监测指标 若未成功关联,写入空字符串并标注来源
使用状态 保留公开原值

有一个细节值得强调:不要把流域名称当作河流名称的替代值。

流域和河流是不同概念。字段缺失时,宁可保留为空,也不要为了让表格“看起来完整”而制造错误。

7.4 指标如何聚合

监测明细通常是一行一个测项:

站点 A + 采样日期 + pH + 数值
站点 A + 采样日期 + 溶解氧 + 数值
站点 A + 采样日期 + 氨氮 + 数值
站点 B + 采样日期 + pH + 数值

而目录需要一行一个站点:

站点 A + pH;溶解氧;氨氮
站点 B + pH

因此需要按 siteid 分组,将指标放入集合,再排序拼接。

集合可以自然去除重复测项。

7.5 water_catalog/cleaner.py

from __future__ import annotations

import hashlib
import json
import unicodedata
from collections import defaultdict
from datetime import datetime, timezone
from typing import Any


def normalize_text(value: Any) -> str:
    """
    Normalize text for stable comparison.

    - Convert None to an empty string.
    - Normalize Unicode width and compatibility characters.
    - Remove leading and trailing whitespace.
    - Collapse repeated internal whitespace.
    """

    if value is None:
        return ""

    text = unicodedata.normalize("NFKC", str(value))
    return " ".join(text.strip().split())


def lower_keys(row: dict[str, Any]) -> dict[str, Any]:
    return {
        normalize_text(key).lower(): value
        for key, value in row.items()
    }


def first_value(row: dict[str, Any], *keys: str) -> str:
    for key in keys:
        value = normalize_text(row.get(key))
        if value:
            return value
    return ""


def safe_float(value: Any) -> float | None:
    text = normalize_text(value)

    if not text:
        return None

    try:
        return float(text)
    except ValueError:
        return None


def build_region(county: str, township: str) -> str:
    parts = [
        part for part in (county, township)
        if part
    ]
    return " / ".join(parts)


def build_indicator_label(row: dict[str, Any]) -> str:
    """
    Create a readable indicator label.

    Examples:
    - pH
    - 溶解氧 [DO] (mg/L)
    - 氨氮 [NH3-N] (mg/L)
    """

    lowered = lower_keys(row)

    name = first_value(lowered, "itemname")
    abbreviation = first_value(
        lowered,
        "itemengabbreviation",
        "itemabbr",
    )
    unit = first_value(lowered, "itemunit")

    label = name or abbreviation

    if not label:
        return ""

    if abbreviation and abbreviation.lower() not in label.lower():
        label = f"{label} [{abbreviation}]"

    if unit:
        label = f"{label} ({unit})"

    return label


def build_indicator_index(
    observation_rows: list[dict[str, Any]],
) -> dict[str, list[str]]:
    """Group unique monitoring indicators by station ID."""

    grouped: dict[str, set[str]] = defaultdict(set)

    for row in observation_rows:
        lowered = lower_keys(row)
        station_id = first_value(lowered, "siteid")

        if not station_id:
            continue

        label = build_indicator_label(lowered)

        if label:
            grouped[station_id].add(label)

    return {
        station_id: sorted(labels)
        for station_id, labels in grouped.items()
    }


def build_content_hash(
    *,
    section_name: str,
    county: str,
    township: str,
    basin: str,
    river: str,
    longitude: float | None,
    latitude: float | None,
    site_address: str,
    status_of_use: str,
) -> str:
    """
    Hash normalized content.

    The hash is used for fallback IDs, exact duplicate elimination,
    and change tracking. It is not a security token.
    """

    canonical = {
        "section_name": section_name,
        "county": county,
        "township": township,
        "basin": basin,
        "river": river,
        "longitude": longitude,
        "latitude": latitude,
        "site_address": site_address,
        "status_of_use": status_of_use,
    }

    serialized = json.dumps(
        canonical,
        ensure_ascii=False,
        sort_keys=True,
        separators=(",", ":"),
    )

    return hashlib.sha256(
        serialized.encode("utf-8")
    ).hexdigest()


def normalize_station(
    row: dict[str, Any],
    indicator_index: dict[str, list[str]],
) -> dict[str, Any]:
    """Convert one source row into the target station schema."""

    lowered = lower_keys(row)

    original_station_id = first_value(lowered, "siteid")
    section_name = first_value(lowered, "sitename")
    county = first_value(lowered, "county")
    township = first_value(lowered, "township")
    basin = first_value(lowered, "basin")
    river = first_value(lowered, "river")
    longitude = safe_float(
        lowered.get("twd97lon")
    )
    latitude = safe_float(
        lowered.get("twd97lat")
    )
    site_address = first_value(
        lowered,
        "siteaddress",
    )
    status_of_use = first_value(
        lowered,
        "statusofuse",
    )

    content_hash = build_content_hash(
        section_name=section_name,
        county=county,
        township=township,
        basin=basin,
        river=river,
        longitude=longitude,
        latitude=latitude,
        site_address=site_address,
        status_of_use=status_of_use,
    )

    station_id = (
        original_station_id
        if original_station_id
        else f"HASH-{content_hash[:16]}"
    )

    indicators = indicator_index.get(
        original_station_id,
        [],
    )

    return {
        "station_id": station_id,
        "section_name": section_name,
        "river": river,
        "region": build_region(county, township),
        "monitoring_indicators": ";".join(indicators),
        "county": county,
        "township": township,
        "basin": basin,
        "longitude": longitude,
        "latitude": latitude,
        "site_address": site_address,
        "status_of_use": status_of_use,
        "content_hash": content_hash,
        "indicator_source": (
            "wqx_p_01_joined"
            if indicators
            else "not_available_in_collected_observation_snapshot"
        ),
        "collected_at": datetime.now(
            timezone.utc
        ).isoformat(),
    }


def station_score(row: dict[str, Any]) -> tuple[int, int]:
    """
    Score a station row.

    More complete rows win. Rows with a non-empty status value get a
    small bonus. This is deterministic and easy to audit.
    """

    important_fields = (
        "section_name",
        "river",
        "region",
        "county",
        "township",
        "basin",
        "longitude",
        "latitude",
        "site_address",
        "status_of_use",
        "monitoring_indicators",
    )

    completeness = sum(
        1 for field in important_fields
        if row.get(field) not in ("", None)
    )

    status_bonus = (
        1 if row.get("status_of_use") else 0
    )

    return completeness, status_bonus


def deduplicate_stations(
    station_rows: list[dict[str, Any]],
) -> tuple[list[dict[str, Any]], list[dict[str, Any]]]:
    """
    Deduplicate stations.

    Strategy:
    1. Remove exact duplicate content hashes.
    2. Group remaining rows by station_id.
    3. Keep the most complete row.
    4. Emit a conflict report when the same station_id has different hashes.
    """

    seen_hashes: set[str] = set()
    exact_unique_rows: list[dict[str, Any]] = []

    for row in station_rows:
        content_hash = row["content_hash"]

        if content_hash in seen_hashes:
            continue

        seen_hashes.add(content_hash)
        exact_unique_rows.append(row)

    grouped: dict[str, list[dict[str, Any]]] = defaultdict(list)

    for row in exact_unique_rows:
        grouped[row["station_id"]].append(row)

    final_rows: list[dict[str, Any]] = []
    conflicts: list[dict[str, Any]] = []

    for station_id, rows in grouped.items():
        ranked = sorted(
            rows,
            key=station_score,
            reverse=True,
        )

        selected = ranked[0]
        final_rows.append(selected)

        distinct_hashes = {
            row["content_hash"]
            for row in rows
        }

        if len(distinct_hashes) > 1:
            conflicts.append(
                {
                    "station_id": station_id,
                    "variant_count": len(rows),
                    "selected_content_hash": selected["content_hash"],
                    "all_content_hashes": ";".join(
                        sorted(distinct_hashes)
                    ),
                    "selected_section_name": selected["section_name"],
                    "selected_river": selected["river"],
                    "selected_region": selected["region"],
                    "review_reason": (
                        "same_station_id_with_different_content"
                    ),
                }
            )

    final_rows.sort(
        key=lambda row: (
            row.get("county", ""),
            row.get("township", ""),
            row.get("river", ""),
            row.get("section_name", ""),
            row.get("station_id", ""),
        )
    )

    conflicts.sort(
        key=lambda row: row["station_id"]
    )

    return final_rows, conflicts

7.6 为什么去重不能只写一行

很多教程会这样处理:

df.drop_duplicates(subset=["siteid"])

这对一次性分析够用,但对长期同步不够。

假设同一个站点编号出现两行:

A001, 东桥断面, 清河, 某市
A001, 东桥断面, 清河, 某市某区

如果直接保留第一行,你可能留下信息更少的记录。

再假设出现:

A001, 东桥断面, 清河, 某市
A001, 西桥断面, 清河, 某市

这可能意味着:

  • 上游数据修订。
  • 站点更名。
  • 字段映射错误。
  • 数据异常。
  • 站点编号复用。

此时不能悄悄删除其中一条,而应该输出冲突报告,交给人工复核。

8️⃣ 数据存储与导出(Storage)

8.1 CSV 字段映射表

最终 CSV 字段如下:

字段名 类型 示例值 说明
station_id text 1724 站点编号
section_name text 歪仔歪桥 断面名称
river text 罗东溪 河流
region text 宜兰县 / 三星乡 地区组合
monitoring_indicators text pH;溶解氧 [DO] (mg/L) 指标集合
county text 宜兰县 县市
township text 三星乡 乡镇
basin text 兰阳溪流域 流域
longitude real 121.000000 经度
latitude real 24.000000 纬度
site_address text ... 位置描述
status_of_use text ... 使用状态
content_hash text sha256... 内容指纹
indicator_source text wqx_p_01_joined 指标来源
collected_at text ISO 8601 时间 采集时间

8.2 SQLite 表设计

SQLite 使用 station_id 作为主键:

CREATE TABLE IF NOT EXISTS stations (
    station_id TEXT PRIMARY KEY,
    section_name TEXT NOT NULL DEFAULT '',
    river TEXT NOT NULL DEFAULT '',
    region TEXT NOT NULL DEFAULT '',
    monitoring_indicators TEXT NOT NULL DEFAULT '',
    county TEXT NOT NULL DEFAULT '',
    township TEXT NOT NULL DEFAULT '',
    basin TEXT NOT NULL DEFAULT '',
    longitude REAL,
    latitude REAL,
    site_address TEXT NOT NULL DEFAULT '',
    status_of_use TEXT NOT NULL DEFAULT '',
    content_hash TEXT NOT NULL,
    indicator_source TEXT NOT NULL DEFAULT '',
    collected_at TEXT NOT NULL
);

重复执行脚本时,使用 UPSERT 更新:

ON CONFLICT(station_id) DO UPDATE SET ...

这样可以安全重跑。

8.3 water_catalog/storage.py

from __future__ import annotations

import csv
import json
import sqlite3
from pathlib import Path
from typing import Any


STATION_FIELDS = [
    "station_id",
    "section_name",
    "river",
    "region",
    "monitoring_indicators",
    "county",
    "township",
    "basin",
    "longitude",
    "latitude",
    "site_address",
    "status_of_use",
    "content_hash",
    "indicator_source",
    "collected_at",
]


CONFLICT_FIELDS = [
    "station_id",
    "variant_count",
    "selected_content_hash",
    "all_content_hashes",
    "selected_section_name",
    "selected_river",
    "selected_region",
    "review_reason",
]


def ensure_output_dir(output_dir: Path) -> None:
    output_dir.mkdir(parents=True, exist_ok=True)


def write_csv(
    file_path: Path,
    rows: list[dict[str, Any]],
    *,
    fieldnames: list[str],
) -> None:
    """
    Write UTF-8 with BOM.

    utf-8-sig is convenient when the CSV will be opened directly
    in common spreadsheet software.
    """

    file_path.parent.mkdir(
        parents=True,
        exist_ok=True,
    )

    with file_path.open(
        "w",
        encoding="utf-8-sig",
        newline="",
    ) as file:
        writer = csv.DictWriter(
            file,
            fieldnames=fieldnames,
            extrasaction="ignore",
        )
        writer.writeheader()
        writer.writerows(rows)


def write_json(
    file_path: Path,
    rows: list[dict[str, Any]],
) -> None:
    file_path.parent.mkdir(
        parents=True,
        exist_ok=True,
    )

    file_path.write_text(
        json.dumps(
            rows,
            ensure_ascii=False,
            indent=2,
        ),
        encoding="utf-8",
    )


def connect_sqlite(
    database_path: Path,
) -> sqlite3.Connection:
    database_path.parent.mkdir(
        parents=True,
        exist_ok=True,
    )

    connection = sqlite3.connect(
        database_path
    )
    connection.row_factory = sqlite3.Row
    return connection


def initialize_database(
    connection: sqlite3.Connection,
) -> None:
    connection.execute(
        """
        CREATE TABLE IF NOT EXISTS stations (
            station_id TEXT PRIMARY KEY,
            section_name TEXT NOT NULL DEFAULT '',
            river TEXT NOT NULL DEFAULT '',
            region TEXT NOT NULL DEFAULT '',
            monitoring_indicators TEXT NOT NULL DEFAULT '',
            county TEXT NOT NULL DEFAULT '',
            township TEXT NOT NULL DEFAULT '',
            basin TEXT NOT NULL DEFAULT '',
            longitude REAL,
            latitude REAL,
            site_address TEXT NOT NULL DEFAULT '',
            status_of_use TEXT NOT NULL DEFAULT '',
            content_hash TEXT NOT NULL,
            indicator_source TEXT NOT NULL DEFAULT '',
            collected_at TEXT NOT NULL
        )
        """
    )

    connection.execute(
        """
        CREATE INDEX IF NOT EXISTS idx_stations_region
        ON stations(county, township)
        """
    )

    connection.execute(
        """
        CREATE INDEX IF NOT EXISTS idx_stations_river
        ON stations(river)
        """
    )

    connection.execute(
        """
        CREATE INDEX IF NOT EXISTS idx_stations_basin
        ON stations(basin)
        """
    )

    connection.commit()


def upsert_stations(
    connection: sqlite3.Connection,
    rows: list[dict[str, Any]],
) -> None:
    sql = """
        INSERT INTO stations (
            station_id,
            section_name,
            river,
            region,
            monitoring_indicators,
            county,
            township,
            basin,
            longitude,
            latitude,
            site_address,
            status_of_use,
            content_hash,
            indicator_source,
            collected_at
        )
        VALUES (
            :station_id,
            :section_name,
            :river,
            :region,
            :monitoring_indicators,
            :county,
            :township,
            :basin,
            :longitude,
            :latitude,
            :site_address,
            :status_of_use,
            :content_hash,
            :indicator_source,
            :collected_at
        )
        ON CONFLICT(station_id) DO UPDATE SET
            section_name = excluded.section_name,
            river = excluded.river,
            region = excluded.region,
            monitoring_indicators = excluded.monitoring_indicators,
            county = excluded.county,
            township = excluded.township,
            basin = excluded.basin,
            longitude = excluded.longitude,
            latitude = excluded.latitude,
            site_address = excluded.site_address,
            status_of_use = excluded.status_of_use,
            content_hash = excluded.content_hash,
            indicator_source = excluded.indicator_source,
            collected_at = excluded.collected_at
    """

    connection.executemany(sql, rows)
    connection.commit()


def export_all(
    *,
    output_dir: Path,
    stations: list[dict[str, Any]],
    conflicts: list[dict[str, Any]],
) -> dict[str, Path]:
    ensure_output_dir(output_dir)

    csv_path = (
        output_dir
        / "water_section_catalog.csv"
    )
    json_path = (
        output_dir
        / "water_section_catalog.json"
    )
    conflict_path = (
        output_dir
        / "water_section_conflicts.csv"
    )
    sqlite_path = (
        output_dir
        / "water_section_catalog.sqlite3"
    )

    write_csv(
        csv_path,
        stations,
        fieldnames=STATION_FIELDS,
    )

    write_json(
        json_path,
        stations,
    )

    write_csv(
        conflict_path,
        conflicts,
        fieldnames=CONFLICT_FIELDS,
    )

    with connect_sqlite(
        sqlite_path
    ) as connection:
        initialize_database(connection)
        upsert_stations(connection, stations)

    return {
        "csv": csv_path,
        "json": json_path,
        "conflicts": conflict_path,
        "sqlite": sqlite_path,
    }

9️⃣ 运行方式与结果展示(必写)

9.1 命令行入口

创建 water_catalog/cli.py

from __future__ import annotations

import argparse
import logging
import sys
from datetime import datetime
from pathlib import Path
from typing import Any

from .cleaner import (
    build_indicator_index,
    deduplicate_stations,
    normalize_station,
)
from .client import PublicWaterApiClient
from .config import Settings
from .storage import export_all


logger = logging.getLogger(__name__)


def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(
        description=(
            "Collect public water monitoring section catalog data, "
            "aggregate indicators, deduplicate stations, and export files."
        )
    )

    parser.add_argument(
        "--catalog-only",
        action="store_true",
        help=(
            "Collect only the base station catalog. "
            "Skip indicator enrichment from observation records."
        ),
    )

    parser.add_argument(
        "--observation-pages",
        type=int,
        default=20,
        help=(
            "Safety limit for observation dataset pages. "
            "Default: 20. Increase cautiously after reviewing logs."
        ),
    )

    parser.add_argument(
        "--skip-robots-check",
        action="store_true",
        help=(
            "Skip the robots.txt request only when you have manually "
            "reviewed the current policy. API terms still apply."
        ),
    )

    parser.add_argument(
        "--preview-rows",
        type=int,
        default=5,
        help="How many normalized station rows to print.",
    )

    return parser.parse_args()


def configure_logging() -> None:
    logging.basicConfig(
        level=logging.INFO,
        format=(
            "%(asctime)s | %(levelname)s | "
            "%(name)s | %(message)s"
        ),
    )


def print_preview(
    stations: list[dict[str, Any]],
    limit: int,
) -> None:
    columns = (
        "station_id",
        "section_name",
        "river",
        "region",
        "monitoring_indicators",
    )

    print("\nPreview")
    print("-" * 120)
    print(" | ".join(columns))
    print("-" * 120)

    for row in stations[:limit]:
        values = [
            str(row.get(column, ""))
            for column in columns
        ]
        print(" | ".join(values))

    print("-" * 120)


def build_raw_dir(
    output_dir: Path,
) -> Path:
    timestamp = datetime.now().strftime(
        "%Y%m%d_%H%M%S"
    )
    return output_dir / "raw" / timestamp


def main() -> int:
    configure_logging()
    args = parse_args()

    try:
        settings = Settings.from_env()
        settings.validate()
    except ValueError as exc:
        logger.error("%s", exc)
        return 2

    raw_dir = build_raw_dir(
        settings.output_dir
    )

    try:
        with PublicWaterApiClient(
            settings
        ) as client:
            if (
                not args.skip_robots_check
                and not client.check_robots()
            ):
                logger.error(
                    "Collection stopped because robots.txt "
                    "does not allow the target path."
                )
                return 3

            catalog_rows = client.fetch_all(
                settings.catalog_dataset,
                raw_dir=raw_dir,
            )

            if args.catalog_only:
                observation_rows: list[
                    dict[str, Any]
                ] = []
            else:
                observation_rows = client.fetch_all(
                    settings.observation_dataset,
                    max_pages=args.observation_pages,
                    raw_dir=raw_dir,
                )

    except Exception:
        logger.exception(
            "Collection failed."
        )
        return 1

    indicator_index = build_indicator_index(
        observation_rows
    )

    normalized_rows = [
        normalize_station(
            row,
            indicator_index,
        )
        for row in catalog_rows
    ]

    stations, conflicts = (
        deduplicate_stations(
            normalized_rows
        )
    )

    files = export_all(
        output_dir=settings.output_dir,
        stations=stations,
        conflicts=conflicts,
    )

    print_preview(
        stations,
        limit=args.preview_rows,
    )

    logger.info(
        "Catalog rows from API: %s",
        len(catalog_rows),
    )
    logger.info(
        "Observation rows collected: %s",
        len(observation_rows),
    )
    logger.info(
        "Unique stations exported: %s",
        len(stations),
    )
    logger.info(
        "Conflicts for manual review: %s",
        len(conflicts),
    )

    for label, path in files.items():
        logger.info(
            "Output %-10s -> %s",
            label,
            path,
        )

    return 0


if __name__ == "__main__":
    sys.exit(main())

9.2 如何启动

先做一次最小化测试,只抓基础目录:

python -m water_catalog.cli --catalog-only --preview-rows 5

确认可以运行后,再补齐监测指标:

python -m water_catalog.cli --observation-pages 20 --preview-rows 5

首次执行不建议把页数调得很大。先看日志,再逐步调整。

9.3 输出文件在哪里

默认输出到:

output/
├── water_section_catalog.csv
├── water_section_catalog.json
├── water_section_catalog.sqlite3
├── water_section_conflicts.csv
└── raw/
    └── YYYYMMDD_HHMMSS/
        ├── wqx_p_06_offset_00000000.json
        ├── wqx_p_01_offset_00000000.json
        ├── wqx_p_01_offset_00001000.json
        └── ...

说明:

  • water_section_catalog.csv:适合直接查看。
  • water_section_catalog.json:适合程序继续处理。
  • water_section_catalog.sqlite3:适合 SQL 查询和增量更新。
  • water_section_conflicts.csv:同一站点编号存在不同内容时,供人工核查。
  • raw/:保留每一页原始响应,方便排错。

9.4 结果示例

接口数据会更新,以下内容只展示输出格式。实际结果以运行脚本时的公开接口返回为准。

station_id,section_name,river,region,monitoring_indicators
1724,歪仔歪桥,罗东溪,宜兰县 / 三星乡,"pH;溶解氧 [DO] (mg/L);氨氮 [NH3-N] (mg/L)"
<SiteId-2>,<SiteName-2>,<River-2>,<County-2> / <Township-2>,"<Indicator-A>;<Indicator-B>"
<SiteId-3>,<SiteName-3>,<River-3>,<County-3> / <Township-3>,"<Indicator-A>;<Indicator-C>"
<SiteId-4>,<SiteName-4>,<River-4>,<County-4> / <Township-4>,"<Indicator-B>"
<SiteId-5>,<SiteName-5>,<River-5>,<County-5> / <Township-5>,""

第五行指标为空并不一定是错误,可能意味着:

  • 本次采集的监测记录分页范围尚未覆盖该站点。
  • 最新公开批次暂时没有该站点记录。
  • 站点状态发生变化。
  • 数据源字段存在缺失。

脚本会在 indicator_source 中保留状态,避免把“没有采集到”误解为“这个站点绝对没有监测指标”。

9.5 用 SQLite 查询

进入 SQLite:

sqlite3 output/water_section_catalog.sqlite3

查看站点数量:

SELECT COUNT(*) AS station_count
FROM stations;

统计每条河流的站点数量:

SELECT
    river,
    COUNT(*) AS station_count
FROM stations
WHERE river <> ''
GROUP BY river
ORDER BY station_count DESC, river ASC;

找出缺少河流字段的站点:

SELECT
    station_id,
    section_name,
    region,
    basin
FROM stations
WHERE river = ''
ORDER BY region, section_name;

找出缺少指标集合的站点:

SELECT
    station_id,
    section_name,
    river,
    region,
    indicator_source
FROM stations
WHERE monitoring_indicators = ''
ORDER BY region, river, section_name;

统计地区覆盖:

SELECT
    county,
    COUNT(*) AS station_count
FROM stations
GROUP BY county
ORDER BY station_count DESC, county ASC;

9.6 自动化测试

创建 tests/test_cleaner.py

from water_catalog.cleaner import (
    build_indicator_index,
    deduplicate_stations,
    normalize_station,
)


def test_indicator_index_removes_duplicates() -> None:
    observations = [
        {
            "siteid": "A001",
            "itemname": "溶解氧",
            "itemengabbreviation": "DO",
            "itemunit": "mg/L",
        },
        {
            "siteid": "A001",
            "itemname": "溶解氧",
            "itemengabbreviation": "DO",
            "itemunit": "mg/L",
        },
        {
            "siteid": "A001",
            "itemname": "pH",
            "itemengabbreviation": "pH",
            "itemunit": "",
        },
    ]

    index = build_indicator_index(
        observations
    )

    assert index["A001"] == [
        "pH",
        "溶解氧 [DO] (mg/L)",
    ]


def test_missing_site_id_uses_hash_fallback() -> None:
    station = normalize_station(
        {
            "sitename": "东桥断面",
            "county": "示例市",
            "township": "示例区",
            "basin": "示例流域",
            "river": "清河",
        },
        indicator_index={},
    )

    assert station["station_id"].startswith(
        "HASH-"
    )
    assert station["section_name"] == (
        "东桥断面"
    )
    assert station["river"] == "清河"


def test_same_station_id_emits_conflict() -> None:
    first = normalize_station(
        {
            "siteid": "A001",
            "sitename": "东桥断面",
            "county": "示例市",
            "township": "示例区",
            "basin": "示例流域",
            "river": "清河",
        },
        indicator_index={},
    )

    second = normalize_station(
        {
            "siteid": "A001",
            "sitename": "西桥断面",
            "county": "示例市",
            "township": "示例区",
            "basin": "示例流域",
            "river": "清河",
        },
        indicator_index={},
    )

    stations, conflicts = (
        deduplicate_stations(
            [first, second]
        )
    )

    assert len(stations) == 1
    assert len(conflicts) == 1
    assert conflicts[0]["station_id"] == (
        "A001"
    )


def test_exact_duplicates_are_removed() -> None:
    row = normalize_station(
        {
            "siteid": "A001",
            "sitename": "东桥断面",
            "county": "示例市",
            "township": "示例区",
            "basin": "示例流域",
            "river": "清河",
        },
        indicator_index={},
    )

    stations, conflicts = (
        deduplicate_stations(
            [row, dict(row)]
        )
    )

    assert len(stations) == 1
    assert conflicts == []

运行测试:

pytest -q

预期输出:

4 passed

🔟 常见问题与排错(强烈建议写)

10.1 遇到 403 怎么办

403 Forbidden 表示服务端拒绝请求。

优先排查:

  1. API Key 是否填写正确。
  2. 是否误用了别人的 Key。
  3. 数据集代号是否拼写正确。
  4. 是否超过调用限制。
  5. 平台是否调整了访问规则。
  6. 请求入口是否仍然是公开 API。
  7. 是否频率过高,触发临时限制。

不要把 403 当作“需要更复杂伪装”的信号。

合理做法是停止任务、阅读公开说明、检查日志并降低频率。

10.2 遇到 429 怎么办

429 Too Many Requests 表示请求太密集或调用次数超限。

本文代码会:

  • 对 429 启用有限重试。
  • 尊重 Retry-After
  • 最终仍失败时停止任务。
  • 提示降低频率。

可以调整 .env

WATER_CATALOG_REQUEST_INTERVAL=2.5

不要为了抓一份目录开几十个线程。目录同步通常不值得这么做。

10.3 HTML 抓到空壳怎么办

如果以后切换到其他站点,requests.get() 返回的 HTML 只有骨架,没有数据,通常有三种原因:

  1. 数据由 JavaScript 动态加载。
  2. 页面通过 iframe 嵌套。
  3. 页面内部调用 JSON API。

排查顺序:

打开浏览器开发者工具
  ↓
切换 Network
  ↓
刷新页面
  ↓
筛选 Fetch / XHR
  ↓
查看返回 JSON 的请求
  ↓
确认接口是否公开、是否允许调用

如果公开 API 存在,优先使用 API。

只有在确实需要执行前端脚本时,再考虑 Playwright。

10.4 JSON 结构变化怎么办

本文 extract_records() 已经兼容常见外层字段:

("records", "data", "result", "results")

如果结构变化,脚本会抛出异常并提示顶层字段,而不是静默返回空列表。

排查时打开:

output/raw/<timestamp>/

查看原始 JSON。

这是保留 raw snapshot 的价值:你不需要重新发请求就能调试解析器。

10.5 编码或乱码怎么办

CSV 使用:

encoding="utf-8-sig"

这是为了让常见表格软件直接打开时更友好。

JSON 使用:

encoding="utf-8"
ensure_ascii=False

如果终端显示乱码,优先确认终端编码,而不是反复修改接口响应编码。

Windows PowerShell 可以尝试:

chcp 65001

10.6 为什么指标字段为空

常见原因包括:

  • 运行时使用了 --catalog-only
  • --observation-pages 设置太小。
  • 最新公开批次没有覆盖该断面。
  • API 返回结构变化。
  • 站点编号格式变化。
  • 明细数据量较大,当前任务只采集了受控快照。

可以逐步增加页数:

python -m water_catalog.cli --observation-pages 30

观察日志后再调整,不要一次性无限抓取。

10.7 经纬度为什么无法转换

公开数据中可能存在:

  • 空字符串。
  • 特殊符号。
  • 缺失值。
  • 非数字文本。

safe_float() 会返回 None,不会让整个任务崩溃。

后续可以查询缺失坐标:

SELECT
    station_id,
    section_name,
    river,
    region
FROM stations
WHERE longitude IS NULL
   OR latitude IS NULL;

10.8 为什么同一个站点编号会进入冲突报告

冲突报告不是报错文件,而是质量检查文件。

可能原因:

  • 同站点出现多个版本。
  • 断面更名。
  • 地址调整。
  • 状态变化。
  • 坐标修订。
  • 上游重复数据不一致。

请查看:

output/water_section_conflicts.csv

不要直接删掉冲突记录后假装数据完美。

1️⃣1️⃣ 进阶优化(可选但加分)

11.1 并发:先问自己是否真的需要

这个项目默认单线程。

如果未来需要处理大量详情页,可以考虑:

  • ThreadPoolExecutor
  • asyncio
  • aiohttp
  • Scrapy

但并发之前先确认:

  1. 目标服务是否允许。
  2. 单线程是否已经足够快。
  3. 是否设置全局限速。
  4. 是否支持失败重试。
  5. 是否会重复抓取。
  6. 是否记录成功率和失败率。

站点目录通常规模有限,单线程更稳。

11.2 断点续跑

当前代码会保留原始分页文件。下一步可以增加断点恢复:

output/raw/<timestamp>/wqx_p_01_offset_00005000.json

如果脚本在偏移量 5000 后失败,可以读取已有分页文件,只请求缺失页。

基本思路:

def raw_page_exists(
    raw_dir: Path,
    dataset: str,
    offset: int,
) -> bool:
    path = (
        raw_dir
        / f"{dataset}_offset_{offset:08d}.json"
    )
    return path.exists()

更进一步,可以建立 crawl_state 表:

CREATE TABLE crawl_state (
    dataset TEXT NOT NULL,
    offset INTEGER NOT NULL,
    status TEXT NOT NULL,
    updated_at TEXT NOT NULL,
    PRIMARY KEY (dataset, offset)
);

记录:

  • pending
  • success
  • failed

11.3 增量更新与变更检测

SQLite 已经使用 station_id 主键。下一步可以增加历史表:

CREATE TABLE station_history (
    id INTEGER PRIMARY KEY AUTOINCREMENT,
    station_id TEXT NOT NULL,
    content_hash TEXT NOT NULL,
    snapshot_json TEXT NOT NULL,
    collected_at TEXT NOT NULL,
    UNIQUE(station_id, content_hash)
);

每次同步时:

  1. 查询现有 content_hash
  2. 如果 hash 不变,不写历史。
  3. 如果 hash 变化,保存新快照。
  4. 输出变更报告。

这样可以回答:

  • 哪些断面新增了?
  • 哪些断面停用了?
  • 哪些断面改名了?
  • 哪些断面河流字段变化了?
  • 哪些断面坐标修订了?

11.4 日志与监控

一个长期运行的数据任务至少记录:

指标 用途
请求页数 判断任务规模
请求成功数 判断服务可用性
请求失败数 监控波动
原始记录数 监控上游数据量
去重后站点数 监控目录规模
冲突数 监控数据质量
缺少河流字段数量 监控完整性
缺少坐标数量 监控地图可用性
未关联指标站点数量 监控明细覆盖

可以增加一个质量报告:

def build_quality_report(
    stations: list[dict],
    conflicts: list[dict],
) -> dict:
    return {
        "station_count": len(stations),
        "conflict_count": len(conflicts),
        "missing_river_count": sum(
            1 for row in stations
            if not row["river"]
        ),
        "missing_coordinate_count": sum(
            1 for row in stations
            if row["longitude"] is None
            or row["latitude"] is None
        ),
        "missing_indicator_count": sum(
            1 for row in stations
            if not row["monitoring_indicators"]
        ),
    }

再保存:

report_path.write_text(
    json.dumps(
        report,
        ensure_ascii=False,
        indent=2,
    ),
    encoding="utf-8",
)

11.5 定时任务

Linux cron 示例:每周一凌晨 3 点执行一次。

0 3 * * 1 cd /path/to/water-section-catalog && /path/to/.venv/bin/python -m water_catalog.cli --observation-pages 20 >> output/cron.log 2>&1

Windows 可以使用任务计划程序:

程序:
C:\path\to\.venv\Scripts\python.exe

参数:
-m water_catalog.cli --observation-pages 20

起始于:
C:\path\to\water-section-catalog

定时任务的重点不是“跑起来”,而是:

  • 日志是否保留。
  • 失败是否可见。
  • API Key 是否安全。
  • 输出是否覆盖旧文件。
  • 是否保存历史版本。
  • 是否有调用量上限。
  • 是否需要人工复核冲突文件。

11.6 切换到 Scrapy 的时机

当项目出现以下特征时,可以考虑 Scrapy:

  • 需要抓取多个页面类型。
  • 需要队列管理。
  • 需要 URL 去重。
  • 需要并发控制。
  • 需要中间件。
  • 需要自动重试。
  • 需要断点续爬。
  • 需要 pipeline 写数据库。
  • 需要长期维护多个站点。

本文规模不大,requests 更清晰。

11.7 切换到 Playwright 的时机

只有遇到真正依赖浏览器执行的页面时,再使用 Playwright:

  • JavaScript 渲染后才出现数据。
  • 需要等待异步请求。
  • 页面没有公开 API。
  • 需要点击分页。
  • 需要处理 iframe。
  • 需要在合规范围内模拟普通页面操作。

Playwright 的代价包括:

  • 浏览器依赖更重。
  • Docker 镜像更大。
  • 运行速度更慢。
  • 排错更复杂。
  • 页面改版仍然会影响选择器。

能用 API,不要先上浏览器。

11.8 可视化扩展

站点目录有经纬度后,可以继续做地图。

例如用 pandas 检查坐标:

import pandas as pd


df = pd.read_csv(
    "output/water_section_catalog.csv"
)

valid = df.dropna(
    subset=["longitude", "latitude"]
)

print(
    valid[
        [
            "station_id",
            "section_name",
            "river",
            "region",
            "longitude",
            "latitude",
        ]
    ].head()
)

后续可以使用:

  • Folium
  • GeoPandas
  • QGIS
  • Kepler.gl
  • Leaflet
  • 数据库空间扩展

但在画地图之前,先检查坐标系、缺失值和异常点。地图很直观,也很容易把错误画得格外漂亮。

1️⃣2️⃣ 总结与延伸阅读

这篇实战完成了一件看似简单、实际上很有代表性的工作:将公开水质监测数据整理成稳定、可复用、可审计的断面目录。

最终完成的链路是:

公开 API
  ↓
低频分页采集
  ↓
原始 JSON 留档
  ↓
字段标准化
  ↓
监测指标聚合
  ↓
站点编号主键去重
  ↓
内容哈希辅助去重
  ↓
冲突报告
  ↓
CSV / JSON / SQLite 导出

核心字段包括:

断面名、河流、地区、监测指标、站点编号

这个案例值得复用的地方,不是某一个接口地址,而是数据工程思路:

  1. 优先寻找公开、文档化的数据入口。
  2. 不为了展示技术而引入不必要的复杂度。
  3. 请求层必须有超时、有限重试、退避和频控。
  4. 缺失值应当诚实保留,不要擅自补造。
  5. 去重不只是删除重复行,还要发现冲突。
  6. 原始数据应当留档,便于复盘。
  7. 导出文件和数据库各有用途。
  8. 长期任务要考虑日志、变更检测和断点续跑。

下一步可以继续扩展:

  • 增加站点历史版本表。
  • 输出新增、修改、停用断面对比报告。
  • 用 Folium 生成交互地图。
  • 将 SQLite 切换为 PostgreSQL 或 MySQL。
  • 增加定时任务和失败通知。
  • 将同一套框架迁移到空气质量站、气象站或雨量站。
  • 在页面没有公开 API 时,再引入 BeautifulSoup、lxml、Scrapy 或 Playwright。

我很喜欢这种目录型项目。它没有炫目的算法,也没有夸张的并发,但每一处细节都直接影响数据是否可信。把基础工作做好,后面的分析、地图、报表和自动化才真正有价值。

附录:完整运行清单

第一步:创建环境

python -m venv .venv
source .venv/bin/activate
pip install -r requirements.txt

Windows PowerShell:

python -m venv .venv
.venv\Scripts\Activate.ps1
pip install -r requirements.txt

第二步:配置 API Key

cp .env.example .env

编辑 .env

MOENV_API_KEY=replace_with_your_own_api_key

第三步:运行测试

pytest -q

第四步:只抓目录

python -m water_catalog.cli --catalog-only

第五步:关联监测指标

python -m water_catalog.cli --observation-pages 20

第六步:查看 CSV

output/water_section_catalog.csv

第七步:查看冲突报告

output/water_section_conflicts.csv

第八步:查询数据库

sqlite3 output/water_section_catalog.sqlite3
SELECT
    station_id,
    section_name,
    river,
    region,
    monitoring_indicators
FROM stations
ORDER BY region, river, section_name
LIMIT 20;

🌟 文末

好啦~以上就是本期的全部内容啦!如果你在实践过程中遇到任何疑问,欢迎在评论区留言交流,我看到都会尽量回复~咱们下期见!

小伙伴们在批阅的过程中,如果觉得文章不错,欢迎点赞、收藏、关注哦~
三连就是对我写作道路上最好的鼓励与支持! ❤️🔥

✅ 专栏持续更新中|建议收藏 + 订阅

墙裂推荐订阅专栏 👉 《Python爬虫实战》,本专栏秉承着以“入门 → 进阶 → 工程化 → 项目落地”的路线持续更新,争取让每一期内容都做到:

✅ 讲得清楚(原理)|✅ 跑得起来(代码)|✅ 用得上(场景)|✅ 扛得住(工程化)

📣 想系统提升的小伙伴:强烈建议先订阅专栏 《Python爬虫实战》,再按目录大纲顺序学习,效率十倍上升~

✅ 互动征集

想让我把【某站点/某反爬/某验证码/某分布式方案】等写成某期实战?

评论区留言告诉我你的需求,我会优先安排实现(更新)哒~


⭐️ 若喜欢我,就请关注我叭~(更新不迷路)
⭐️ 若对你有用,就请点赞支持一下叭~(给我一点点动力)
⭐️ 若有疑问,就请评论留言告诉我叭~(我会补坑 & 更新迭代)


✅ 免责声明

本文爬虫思路、相关技术和代码仅用于学习参考,对阅读本文后的进行爬虫行为的用户本作者不承担任何法律责任。

使用或者参考本项目即表示您已阅读并同意以下条款:

  • 合法使用: 不得将本项目用于任何违法、违规或侵犯他人权益的行为,包括但不限于网络攻击、诈骗、绕过身份验证、未经授权的数据抓取等。
  • 风险自负: 任何因使用本项目而产生的法律责任、技术风险或经济损失,由使用者自行承担,项目作者不承担任何形式的责任。
  • 禁止滥用: 不得将本项目用于违法牟利、黑产活动或其他不当商业用途。
  • 使用或者参考本项目即视为同意上述条款,即 “谁使用,谁负责” 。如不同意,请立即停止使用并删除本项目。!!!

更多推荐