㊗️本期内容已收录至专栏《Python爬虫实战》,持续完善知识体系与项目实战,建议先订阅收藏,后续查阅更方便~
㊙️本期爬虫难度指数:⭐⭐⭐☆☆(进阶级)
🉐福利: 一次订阅后,专栏内的所有文章可永久免费看,持续更新中,保底1000+(篇)硬核实战内容。

全文目录:

🌟 开篇语

哈喽,各位小伙伴们你们好呀~我是【喵手】。
运营社区: C站 / 掘金 / 腾讯云 / 阿里云 / 华为云 / 51CTO
欢迎大家常来逛逛,一起学习,一起进步~🌟

  我长期专注 Python 爬虫工程化实战,主理专栏👉 《Python爬虫实战》:从采集策略反爬对抗,从数据清洗分布式调度,持续输出可复用的方法论与可落地案例。内容主打一个“能跑、能用、能扩展”,让数据价值真正做到——抓得到、洗得净、用得上

  📌 专栏食用指南(建议收藏)

  • ✅ 入门基础:环境搭建 / 请求与解析 / 数据落库
  • ✅ 进阶提升:登录鉴权 / 动态渲染 / 反爬对抗
  • ✅ 工程实战:异步并发 / 分布式调度 / 监控与容错
  • ✅ 项目落地:数据治理 / 可视化分析 / 场景化应用

📣 专栏推广时间:如果你想系统学爬虫,而不是碎片化东拼西凑,欢迎订阅专栏👉《Python爬虫实战》👈,一次订阅后,专栏内的所有文章可永久免费阅读,持续更新中。
  
💕订阅后更新会优先推送,按目录学习更高效💯~

0️⃣ 前言(Preface)

这篇文章要做的事情很明确:用 Python 采集公开潮位站点索引,抽取站点名、海域、坐标、观测项目、更新时间等元数据,并把每次采集结果保存成可追溯的快照。

读完这篇文章,你可以获得:

  • 一套从公开接口采集潮位站点元数据的完整工程写法;
  • 一套适合“站点索引快照归档”的 SQLite/CSV/JSON 存储方案;
  • 一些我自己在写爬虫时比较看重的细节:频率控制、失败重试、字段容错、去重和断点思路。

很多爬虫文章喜欢一上来就堆并发、堆代理、堆框架,但我一直觉得,真正稳定的采集任务,第一步不是“快”,而是“准”和“可回滚”。潮位站点元数据这种东西,更新频率通常不需要秒级追踪,适合做成定时快照:每天或每周拉一次,把站点索引沉淀下来,后面做地图展示、站点检索、数据质量分析都会省很多事。

1️⃣ 摘要(Abstract)

本文基于 NOAA CO-OPS 公开潮汐与水位站点 Metadata API,使用 Python 的 requests 完成站点索引采集,解析站点名、海域、坐标、观测项目、快照更新时间,并导出到 SQLite、CSV 和 JSON 文件。

读完后你将掌握:

  • 如何判断一个潮位站点索引任务更适合“API 采集”而不是“页面解析”;
  • 如何设计请求层、解析层、清洗层、存储层,使代码可维护、可复跑;
  • 如何用快照归档思路保存站点元数据变更,而不是每次简单覆盖旧结果。

本文不是为了鼓励无节制抓取,也不讨论绕过限制、批量撞接口、规避风控之类做法。它只围绕公开数据、合理频率、技术复现展开。

2️⃣ 背景与需求(Why)

2.1 为什么要爬潮位站点索引?

潮位站,也可以理解为水位、潮汐、港口或近岸观测站点的一类基础设施。对数据分析来说,站点索引本身就是一个很有价值的元数据表。

它不一定直接回答“某一刻水位是多少”,但它能回答:

  • 哪些站点存在?
  • 每个站点在哪里?
  • 每个站点属于哪个海域或区域?
  • 这个站点有哪些观测或产品能力?
  • 本次采集是什么时候完成的?
  • 与上一次快照相比,站点是否新增、移除、改名或产品能力发生变化?

这些问题看似基础,但在后续业务中非常关键。比如你想做潮位观测可视化,如果没有站点索引,就无法稳定地在地图上定位站点;你想批量拉取水位时间序列,如果没有站点能力清单,就容易对不支持该产品的站点发起无效请求;你想做长期归档,如果没有快照时间和 hash,就很难知道某一次数据变化是源站变化,还是你自己的代码解析错了。

2.2 本文目标

本文实现一个“潮位站公开站点索引与快照”项目,目标字段如下:

中文字段 程序字段 说明
站点名 station_name 公开站点名称
海域 sea_area 由站点区域、州/地区、是否五大湖等字段推断
坐标 latitude / longitude 十进制度坐标
观测项目 observed_items 站点支持的产品或观测能力
更新时间 snapshot_at / source_updated_at 本地快照时间,以及可选的源响应时间

这里要强调一点:很多公开元数据接口不会给每个站点提供一个严格意义上的“站点更新时间”。这种情况下,比较诚实的工程写法是保存“本次快照采集时间”,并在 HTTP 响应头里尝试记录 Last-ModifiedDate。如果源站没有明确给出站点级更新时间,不要自己编造一个看起来很像真的时间字段。

2.3 本文选用的数据源

本文使用 NOAA CO-OPS 公开 Metadata API。它是面向潮汐、水位、海洋与气象相关站点的公开服务,适合做站点索引采集。本文把它当成“公开潮位站点索引”的样例来源。

这个任务的核心不是下载历史水位时间序列,而是归档站点元数据。也就是说,我们主要采集的是“站点清单”和“站点产品能力”,不是连续观测值。

3️⃣ 合规与注意事项(必写)

写爬虫之前,我一般会先做一件没那么刺激但很必要的事:确认采集对象是不是公开信息,以及有没有清晰的使用边界。

3.1 robots.txt 基本说明

robots.txt 是站点给自动化访问程序提供访问建议的一种约定。严格说,它不是登录鉴权,也不是权限系统,但它表达了站点希望哪些路径被访问、哪些路径不要被自动访问。

在实际开发中,我会按下面的原则处理:

  1. 先看目标域名根路径下的 robots.txt
  2. 如果某路径明确禁止抓取,就不要抓;
  3. 如果目标站点提供了官方 API,优先走 API,而不是硬解析页面;
  4. 即使没有明确禁止,也不要做高频、高并发、无边界扫描;
  5. 不采集和主题无关的数据。

本文使用的是公开 API,属于更适合自动化访问的方式。相比 HTML 页面,API 的结构更稳定,字段含义更清晰,也更容易控制请求规模。

3.2 频率控制

这个项目的采集对象是站点元数据,不是秒级行情。没有必要做攻击式并发,也没有必要把请求压到极限。

本文默认策略:

  • 列表接口请求一次;
  • 每个站点的产品详情请求可选;
  • 连续详情请求之间默认 sleep 0.8 秒;
  • 对 429、500、502、503、504 做有限重试;
  • 使用指数退避,避免失败时越打越快。

我自己的习惯是,先让任务稳定跑完,再考虑速度。尤其是公共服务,别把“能跑”误解成“可以猛跑”。

3.3 不采集敏感信息

本文只采集公开站点元数据,例如站点名、坐标、观测产品等。不采集个人信息,不绕过登录,不访问付费内容,不模拟用户权限,不做批量猜测,也不做漏洞探测。

如果你的目标站点换成其他平台,也建议保持同样原则:

  • 公开页面或公开 API 才采集;
  • 不绕过验证码、登录、付费墙;
  • 不采集身份证、手机号、邮箱、账号资料等敏感字段;
  • 不把采集程序用于压测目标站;
  • 不将数据用于和原始授权不一致的场景。

4️⃣ 技术选型与整体流程(What/How)

4.1 静态、动态还是 API?

常见采集方式可以分成三类:

类型 特征 适合工具
静态页面 HTML 中直接包含目标数据 requests + BeautifulSoup / lxml
动态页面 浏览器执行 JS 后才出现数据 Playwright / Selenium
API 接口 返回 JSON/XML/CSV 等结构化数据 requests / httpx

本文属于第三类:API 采集。

原因很简单,目标数据源已经提供了结构化 Metadata API。此时再用 Playwright 打开网页、等待 JS 渲染、从 DOM 里扣字段,反而复杂、慢,而且更容易受页面改版影响。

4.2 整体流程

本文流程可以概括为:

采集 Fetch
  ↓
解析 Parse
  ↓
清洗 Normalize
  ↓
去重 Deduplicate
  ↓
存储 Store
  ↓
导出 Export

更具体一点:

1. 请求站点列表接口
2. 获取 station_id、name、lat、lng、state、products URL 等基础字段
3. 可选:逐个请求 products 接口,补充观测项目
4. 将 state / greatlakes 等信息转换为 sea_area
5. 对原始元数据计算 hash
6. 写入 SQLite 快照表
7. 导出 CSV 和 JSON
8. 下次运行时保留新的 run_id,形成历史快照

4.3 为什么选择 requests,而不是 Scrapy 或 Playwright?

这次我选择 requests,理由有三个:

第一,目标是公开 JSON API,requests 足够稳定。

第二,任务规模不大。潮位站点索引属于元数据,通常几百到几千条,不需要一上来就引入 Scrapy 的调度器、队列、中间件和 Item Pipeline。

第三,快照归档更看重可控性。用 requests.SessionRetry、SQLite,代码结构清楚,后期改成定时任务也轻松。

如果以后你要采集多个国家、多个站点体系,或者需要大规模调度,再上 Scrapy 会更合适。如果目标页面完全依赖前端渲染,再考虑 Playwright。

5️⃣ 环境准备与依赖安装(可复现)

5.1 Python 版本

建议使用:

Python 3.10+

我建议至少 3.10,是因为类型标注、路径处理和标准库体验都比较舒服。3.11 或 3.12 也可以。

5.2 安装依赖

本文尽量少引入第三方包,只需要:

pip install requests

如果你希望用虚拟环境:

python -m venv .venv

# macOS / Linux
source .venv/bin/activate

# Windows PowerShell
.venv\Scripts\Activate.ps1

pip install requests

5.3 推荐项目结构

tide_station_snapshot/
├── README.md
├── requirements.txt
├── data/
│   ├── raw/
│   ├── exports/
│   └── tide_stations.sqlite3
├── src/
│   └── tide_snapshot/
│       ├── __init__.py
│       ├── config.py
│       ├── fetcher.py
│       ├── parser.py
│       ├── storage.py
│       └── runner.py
└── tests/
    └── test_parser.py

先创建目录:

mkdir -p tide_station_snapshot/src/tide_snapshot
mkdir -p tide_station_snapshot/data/raw
mkdir -p tide_station_snapshot/data/exports
mkdir -p tide_station_snapshot/tests
cd tide_station_snapshot
touch src/tide_snapshot/__init__.py

requirements.txt

requests>=2.31.0

安装:

pip install -r requirements.txt

6️⃣ 核心实现:请求层(Fetcher)

请求层是爬虫的地基。不要把请求、解析、入库全塞在一个文件里。短脚本当然能跑,但稍微遇到失败重试、字段变更、导出格式扩展,就会开始混乱。

本文请求层负责:

  • 统一 headers;
  • 统一 timeout;
  • 使用 session 复用连接;
  • 处理 429/5xx 重试;
  • 做基础频率控制;
  • 返回 JSON 和响应头信息。

6.1 config.py

# src/tide_snapshot/config.py

from pathlib import Path

PROJECT_ROOT = Path(__file__).resolve().parents[2]

DATA_DIR = PROJECT_ROOT / "data"
RAW_DIR = DATA_DIR / "raw"
EXPORT_DIR = DATA_DIR / "exports"
DB_PATH = DATA_DIR / "tide_stations.sqlite3"

COOPS_MDAPI_BASE = "https://api.tidesandcurrents.noaa.gov/mdapi/prod/webapi"

DEFAULT_HEADERS = {
    "User-Agent": (
        "tide-station-snapshot/1.0 "
        "(metadata archiving demo; contact: example@example.com)"
    ),
    "Accept": "application/json,text/plain,*/*",
    "Referer": "https://tidesandcurrents.noaa.gov/",
    "Connection": "keep-alive",
}

DEFAULT_TIMEOUT = 20
DEFAULT_SLEEP_SECONDS = 0.8

这里的 User-Agent 不建议伪装成浏览器。很多人喜欢复制 Chrome UA,我个人不太喜欢这种写法。既然是公开接口采集,最好说明程序用途。真实项目里可以把联系邮箱换成团队邮箱。

6.2 fetcher.py

# src/tide_snapshot/fetcher.py

from __future__ import annotations

import json
import time
from dataclasses import dataclass
from typing import Any

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

from tide_snapshot.config import DEFAULT_HEADERS, DEFAULT_SLEEP_SECONDS, DEFAULT_TIMEOUT


@dataclass(frozen=True)
class FetchResult:
    url: str
    status_code: int
    headers: dict[str, str]
    json_data: dict[str, Any]


class FetchError(RuntimeError):
    pass


class HttpFetcher:
    """
    A small HTTP fetcher for public metadata APIs.

    Features:
    - requests.Session connection reuse
    - custom headers
    - timeout
    - retry with exponential backoff
    - polite sleep between calls
    """

    def __init__(
        self,
        timeout: int = DEFAULT_TIMEOUT,
        sleep_seconds: float = DEFAULT_SLEEP_SECONDS,
        retries: int = 3,
        backoff_factor: float = 0.8,
    ) -> None:
        self.timeout = timeout
        self.sleep_seconds = sleep_seconds
        self._last_request_at = 0.0

        self.session = requests.Session()
        self.session.headers.update(DEFAULT_HEADERS)

        retry = Retry(
            total=retries,
            connect=retries,
            read=retries,
            status=retries,
            allowed_methods=frozenset(["GET"]),
            status_forcelist=(429, 500, 502, 503, 504),
            backoff_factor=backoff_factor,
            respect_retry_after_header=True,
            raise_on_status=False,
        )

        adapter = HTTPAdapter(max_retries=retry, pool_connections=5, pool_maxsize=10)
        self.session.mount("https://", adapter)
        self.session.mount("http://", adapter)

    def _pace(self) -> None:
        """
        Keep a small interval between requests.
        This is intentionally simple and enough for metadata snapshots.
        """
        now = time.monotonic()
        elapsed = now - self._last_request_at
        wait = self.sleep_seconds - elapsed
        if wait > 0:
            time.sleep(wait)
        self._last_request_at = time.monotonic()

    def get_json(self, url: str, params: dict[str, Any] | None = None) -> FetchResult:
        self._pace()

        try:
            response = self.session.get(url, params=params, timeout=self.timeout)
        except requests.RequestException as exc:
            raise FetchError(f"Request failed: {url}; reason={exc}") from exc

        if response.status_code >= 400:
            text = response.text[:500].replace("\n", " ")
            raise FetchError(
                f"Bad response: status={response.status_code}, url={response.url}, body={text}"
            )

        try:
            payload = response.json()
        except json.JSONDecodeError as exc:
            text = response.text[:500].replace("\n", " ")
            raise FetchError(
                f"JSON decode failed: url={response.url}, body={text}"
            ) from exc

        if not isinstance(payload, dict):
            raise FetchError(f"Unexpected JSON root type: {type(payload).__name__}")

        return FetchResult(
            url=response.url,
            status_code=response.status_code,
            headers=dict(response.headers),
            json_data=payload,
        )

6.3 请求层关键点说明

headers

本文设置了:

  • User-Agent:说明程序用途;
  • Accept:明确希望返回 JSON;
  • Referer:给源站一个上下文;
  • Connection:保持连接复用。

headers 不是为了“伪装”,而是为了让请求更规范。真正的稳定采集,靠的是公开数据、合理频率、错误处理,而不是把 UA 写得像某个浏览器。

timeout

没有 timeout 的爬虫,迟早会卡死。

本文设置默认 20 秒。对于元数据接口来说,20 秒已经比较宽松。如果你的网络环境稳定,可以改成 10 秒。

session/cookie

本文不需要登录,也不需要 cookie。但仍然使用 requests.Session,主要是为了复用连接和统一 headers。

如果目标站点需要登录或 cookie,而数据又不是公开数据,那就要重新评估是否适合采集。本文不涉及这类场景。

失败处理

本文通过 Retry 处理:

  • 429:请求过多;
  • 500:服务器内部错误;
  • 502/503/504:网关或服务暂时不可用。

重试不是让你一直打接口,而是让偶发波动不至于中断整个任务。重试次数一定要有限,失败日志一定要保留。

7️⃣ 核心实现:解析层(Parser)

解析层负责把源站返回的 JSON 转换成我们自己的统一结构。

7.1 解析方式

本文是 JSON API,所以不用 XPath、CSS Selector、BeautifulSoup 或 lxml。

但解析思想是一样的:

  • 先拿列表;
  • 再拿详情;
  • 字段缺失要容错;
  • 输出结构要稳定。

7.2 字段容错原则

公开接口有时会出现两类变化:

第一,字段名变化。例如有的文档写 stationList,实际 JSON 里可能是 stations

第二,字段值缺失。例如部分站点没有 state,或者 products 接口临时没有返回产品列表。

所以解析层不要写死得太脆:

payload["stations"]

更稳一点:

payload.get("stations") or payload.get("stationList") or []

7.3 parser.py

# src/tide_snapshot/parser.py

from __future__ import annotations

import hashlib
import json
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from typing import Any


SEA_AREA_BY_STATE: dict[str, str] = {
    # Pacific Coast
    "CA": "US Pacific Coast",
    "OR": "US Pacific Coast",
    "WA": "US Pacific Coast",

    # Alaska and Pacific Islands
    "AK": "Alaska Waters",
    "HI": "Central Pacific / Hawaii",
    "AS": "Pacific Islands",
    "GU": "Pacific Islands",
    "MP": "Pacific Islands",

    # Atlantic Coast
    "ME": "US Atlantic Coast",
    "NH": "US Atlantic Coast",
    "MA": "US Atlantic Coast",
    "RI": "US Atlantic Coast",
    "CT": "US Atlantic Coast",
    "NY": "US Atlantic Coast",
    "NJ": "US Atlantic Coast",
    "DE": "US Atlantic Coast",
    "MD": "US Atlantic Coast",
    "VA": "US Atlantic Coast",
    "NC": "US Atlantic Coast",
    "SC": "US Atlantic Coast",
    "GA": "US Atlantic Coast",

    # Gulf Coast
    "AL": "US Gulf Coast",
    "MS": "US Gulf Coast",
    "LA": "US Gulf Coast",
    "TX": "US Gulf Coast",

    # Florida spans both Atlantic and Gulf coasts.
    # Without a polygon or station-specific coast classification,
    # keep a neutral label.
    "FL": "US Atlantic / Gulf Coast",

    # Caribbean
    "PR": "Caribbean",
    "VI": "Caribbean",
}


@dataclass
class StationSnapshot:
    station_id: str
    station_name: str
    sea_area: str
    latitude: float | None
    longitude: float | None
    observed_items: list[str]
    source_updated_at: str | None
    snapshot_at: str
    source_url: str
    raw_hash: str
    raw_json: dict[str, Any]

    def to_public_dict(self) -> dict[str, Any]:
        data = asdict(self)
        data["observed_items"] = ";".join(self.observed_items)
        data.pop("raw_json", None)
        return data


def utc_now_iso() -> str:
    return datetime.now(timezone.utc).replace(microsecond=0).isoformat()


def stable_hash(data: Any) -> str:
    """
    Create a stable SHA256 hash for raw JSON.
    Useful for detecting metadata changes.
    """
    text = json.dumps(data, ensure_ascii=False, sort_keys=True, separators=(",", ":"))
    return hashlib.sha256(text.encode("utf-8")).hexdigest()


def first_list(payload: dict[str, Any], *keys: str) -> list[Any]:
    for key in keys:
        value = payload.get(key)
        if isinstance(value, list):
            return value
    return []


def parse_station_list(payload: dict[str, Any]) -> list[dict[str, Any]]:
    """
    NOAA MDAPI documentation may describe station collection as stationList,
    while JSON responses can use stations in some endpoints.
    Keep both for compatibility.
    """
    stations = first_list(payload, "stations", "stationList")
    return [item for item in stations if isinstance(item, dict)]


def parse_product_names(payload: dict[str, Any]) -> list[str]:
    """
    Products response usually contains a product collection.
    Keep this parser flexible.
    """
    products = first_list(payload, "products", "productList", "ProductList")

    names: list[str] = []
    for item in products:
        if not isinstance(item, dict):
            continue

        name = (
            item.get("name")
            or item.get("productName")
            or item.get("title")
            or item.get("value")
        )

        if name is None:
            continue

        name_text = str(name).strip()
        if name_text and name_text not in names:
            names.append(name_text)

    return names


def infer_sea_area(station: dict[str, Any]) -> str:
    """
    NOAA station metadata has state/region and greatlakes flags,
    but does not always provide a direct 'sea area' field.
    So we derive a practical sea_area label for indexing.
    """
    if station.get("greatlakes") is True:
        return "Great Lakes"

    state = str(station.get("state") or "").strip().upper()
    if state in SEA_AREA_BY_STATE:
        return SEA_AREA_BY_STATE[state]

    # Some APIs may return region-like values instead of US state codes.
    name = str(station.get("name") or "").lower()
    if "lake" in name:
        return "Lake / Inland Water"
    if state:
        return f"Region: {state}"

    return "Unknown"


def parse_float(value: Any) -> float | None:
    if value is None:
        return None

    try:
        return float(value)
    except (TypeError, ValueError):
        return None


def fallback_observed_items(station: dict[str, Any], station_type: str) -> list[str]:
    """
    If product endpoint is not requested or returns empty,
    create a conservative observed_items list from station type and flags.
    """
    items: list[str] = []

    if station_type == "waterlevels":
        items.append("water_level")
    elif station_type == "tidepredictions":
        items.append("tide_predictions")
    elif station_type == "met":
        items.append("meteorological")
    elif station_type == "currents":
        items.append("currents")
    else:
        items.append(station_type)

    if station.get("tidal") is True and "tide_related" not in items:
        items.append("tide_related")

    if station.get("forecast") is True:
        items.append("forecast")

    if station.get("outlook") is True:
        items.append("high_tide_flooding_outlook")

    if station.get("HTFhistorical") is True:
        items.append("high_tide_flooding_historical")

    return items


def build_station_snapshot(
    station: dict[str, Any],
    source_url: str,
    station_type: str,
    product_names: list[str] | None = None,
    source_updated_at: str | None = None,
    snapshot_at: str | None = None,
) -> StationSnapshot:
    station_id = str(station.get("id") or "").strip()
    station_name = str(station.get("name") or "").strip()

    if not station_id:
        raise ValueError(f"Missing station id in station record: {station}")
    if not station_name:
        station_name = f"Unnamed Station {station_id}"

    observed_items = product_names or fallback_observed_items(station, station_type)

    raw_for_hash = {
        "station": station,
        "observed_items": observed_items,
        "station_type": station_type,
    }

    return StationSnapshot(
        station_id=station_id,
        station_name=station_name,
        sea_area=infer_sea_area(station),
        latitude=parse_float(station.get("lat")),
        longitude=parse_float(station.get("lng")),
        observed_items=observed_items,
        source_updated_at=source_updated_at,
        snapshot_at=snapshot_at or utc_now_iso(),
        source_url=source_url,
        raw_hash=stable_hash(raw_for_hash),
        raw_json=raw_for_hash,
    )

7.4 列表页如何拿详情链接?

虽然这里不是 HTML 页面,但“列表页拿详情链接”的逻辑仍然存在。站点列表接口通常会返回类似下面的信息:

{
  "id": "9414290",
  "name": "San Francisco",
  "lat": 37.806305,
  "lng": -122.46589,
  "products": {
    "self": "https://api.tidesandcurrents.noaa.gov/mdapi/prod/webapi/stations/9414290/products.json"
  },
  "self": "https://api.tidesandcurrents.noaa.gov/mdapi/prod/webapi/stations/9414290.json"
}

其中:

  • self 可以视为站点详情链接;
  • products.self 可以视为站点观测产品链接。

实际项目里,不要强依赖某个嵌套字段一定存在。可以优先使用接口返回的 URL,如果没有,就根据站点 ID 拼接产品接口。

7.5 详情页如何抽字段?

本文的详情主要是 products 信息。产品接口返回后,解析 productsproductList,提取每个产品的 name 字段,形成 observed_items

如果不请求产品详情,就根据站点类型和基础字段做保守推断:

  • waterlevelswater_level
  • tidepredictionstide_predictions
  • metmeteorological
  • currentscurrents

这不是完美分类,但足够作为索引层的基础字段。真正严谨的产品能力,还是建议请求 products 接口。

7.6 缺失字段怎么办?

缺失字段不要让整个任务崩掉。

本文策略:

  • station_id 缺失:跳过或抛出错误,因为无法作为主键;
  • station_name 缺失:生成 Unnamed Station {id}
  • lat/lng 缺失:保存为 NULL
  • sea_area 无法判断:保存 Unknown
  • observed_items 缺失:根据站点类型生成兜底值;
  • source_updated_at 缺失:保存为 NULL,同时保留 snapshot_at

这种处理方式比较朴素,但稳定。数据工程里最怕的是字段一缺失,整个任务直接中断;更怕的是缺失时悄悄填一个假值,后面分析的人根本不知道。

8️⃣ 数据存储与导出(Storage)

本文选择 SQLite 起步,同时导出 CSV 和 JSON。

为什么不是 MySQL?

因为这个任务是站点元数据快照,不是高并发在线业务。SQLite 单文件、零服务、方便迁移,适合本地归档、定时任务和轻量分析。后期如果要接 Web 服务或多人协作,再迁移 MySQL/PostgreSQL 也不迟。

8.1 字段映射表

字段名 类型 示例值 说明
run_id TEXT 20260610T120000Z 本次采集批次 ID
station_id TEXT 9414290 站点唯一 ID
station_name TEXT San Francisco 站点名
sea_area TEXT US Pacific Coast 推断海域
latitude REAL 37.806305 纬度
longitude REAL -122.46589 经度
observed_items_json TEXT ["water_level"] 观测项目 JSON
source_updated_at TEXT Wed, 10 Jun 2026 12:00:00 GMT 源响应时间,可为空
snapshot_at TEXT 2026-06-10T12:00:00+00:00 本地快照时间
source_url TEXT API URL 来源 URL
raw_hash TEXT SHA256 原始内容 hash
raw_json TEXT JSON 字符串 原始元数据

8.2 去重策略

本文采用两层策略:

第一层:同一批次内,run_id + station_id 唯一。

第二层:跨批次,通过 raw_hash 判断元数据是否变化。

如果你每次都想保留完整快照,就每次插入所有站点。如果你只想保存变化记录,可以启用 --skip-unchanged,当某站点 hash 与上次一致时跳过写入。

我更倾向于早期保留完整快照。磁盘成本不高,排查问题时很舒服。等任务稳定后,再考虑只保存变化记录。

8.3 storage.py

# src/tide_snapshot/storage.py

from __future__ import annotations

import csv
import json
import sqlite3
from pathlib import Path
from typing import Iterable

from tide_snapshot.parser import StationSnapshot


SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS station_snapshot (
    run_id TEXT NOT NULL,
    station_id TEXT NOT NULL,
    station_name TEXT NOT NULL,
    sea_area TEXT NOT NULL,
    latitude REAL,
    longitude REAL,
    observed_items_json TEXT NOT NULL,
    source_updated_at TEXT,
    snapshot_at TEXT NOT NULL,
    source_url TEXT NOT NULL,
    raw_hash TEXT NOT NULL,
    raw_json TEXT NOT NULL,
    created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
    PRIMARY KEY (run_id, station_id)
);

CREATE INDEX IF NOT EXISTS idx_station_snapshot_station_id
ON station_snapshot(station_id);

CREATE INDEX IF NOT EXISTS idx_station_snapshot_raw_hash
ON station_snapshot(raw_hash);

CREATE TABLE IF NOT EXISTS station_latest (
    station_id TEXT PRIMARY KEY,
    latest_hash TEXT NOT NULL,
    last_seen_run_id TEXT NOT NULL,
    last_seen_at TEXT NOT NULL
);
"""


def connect_db(db_path: Path) -> sqlite3.Connection:
    db_path.parent.mkdir(parents=True, exist_ok=True)
    conn = sqlite3.connect(str(db_path))
    conn.execute("PRAGMA journal_mode=WAL;")
    conn.execute("PRAGMA synchronous=NORMAL;")
    return conn


def init_db(conn: sqlite3.Connection) -> None:
    conn.executescript(SCHEMA_SQL)
    conn.commit()


def has_same_latest_hash(
    conn: sqlite3.Connection,
    station_id: str,
    raw_hash: str,
) -> bool:
    row = conn.execute(
        "SELECT latest_hash FROM station_latest WHERE station_id = ?",
        (station_id,),
    ).fetchone()

    return bool(row and row[0] == raw_hash)


def save_snapshot(
    conn: sqlite3.Connection,
    run_id: str,
    snapshot: StationSnapshot,
    skip_unchanged: bool = False,
) -> bool:
    """
    Save a single station snapshot.

    Returns:
        True if inserted;
        False if skipped because unchanged.
    """
    if skip_unchanged and has_same_latest_hash(
        conn, snapshot.station_id, snapshot.raw_hash
    ):
        return False

    conn.execute(
        """
        INSERT OR REPLACE INTO station_snapshot (
            run_id,
            station_id,
            station_name,
            sea_area,
            latitude,
            longitude,
            observed_items_json,
            source_updated_at,
            snapshot_at,
            source_url,
            raw_hash,
            raw_json
        )
        VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
        """,
        (
            run_id,
            snapshot.station_id,
            snapshot.station_name,
            snapshot.sea_area,
            snapshot.latitude,
            snapshot.longitude,
            json.dumps(snapshot.observed_items, ensure_ascii=False),
            snapshot.source_updated_at,
            snapshot.snapshot_at,
            snapshot.source_url,
            snapshot.raw_hash,
            json.dumps(snapshot.raw_json, ensure_ascii=False, sort_keys=True),
        ),
    )

    conn.execute(
        """
        INSERT INTO station_latest (
            station_id,
            latest_hash,
            last_seen_run_id,
            last_seen_at
        )
        VALUES (?, ?, ?, ?)
        ON CONFLICT(station_id)
        DO UPDATE SET
            latest_hash = excluded.latest_hash,
            last_seen_run_id = excluded.last_seen_run_id,
            last_seen_at = excluded.last_seen_at
        """,
        (
            snapshot.station_id,
            snapshot.raw_hash,
            run_id,
            snapshot.snapshot_at,
        ),
    )

    return True


def save_snapshots(
    conn: sqlite3.Connection,
    run_id: str,
    snapshots: Iterable[StationSnapshot],
    skip_unchanged: bool = False,
) -> tuple[int, int]:
    inserted = 0
    skipped = 0

    with conn:
        for snapshot in snapshots:
            ok = save_snapshot(
                conn=conn,
                run_id=run_id,
                snapshot=snapshot,
                skip_unchanged=skip_unchanged,
            )
            if ok:
                inserted += 1
            else:
                skipped += 1

    return inserted, skipped


def load_run_rows(conn: sqlite3.Connection, run_id: str) -> list[dict]:
    cursor = conn.execute(
        """
        SELECT
            run_id,
            station_id,
            station_name,
            sea_area,
            latitude,
            longitude,
            observed_items_json,
            source_updated_at,
            snapshot_at,
            source_url,
            raw_hash
        FROM station_snapshot
        WHERE run_id = ?
        ORDER BY station_id
        """,
        (run_id,),
    )

    columns = [desc[0] for desc in cursor.description]
    rows: list[dict] = []

    for record in cursor.fetchall():
        row = dict(zip(columns, record))
        try:
            items = json.loads(row["observed_items_json"])
        except json.JSONDecodeError:
            items = []

        row["observed_items"] = ";".join(str(x) for x in items)
        row.pop("observed_items_json", None)
        rows.append(row)

    return rows


def export_csv(rows: list[dict], output_path: Path) -> None:
    output_path.parent.mkdir(parents=True, exist_ok=True)

    if not rows:
        output_path.write_text("", encoding="utf-8")
        return

    fieldnames = list(rows[0].keys())

    with output_path.open("w", newline="", encoding="utf-8-sig") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(rows)


def export_json(rows: list[dict], output_path: Path) -> None:
    output_path.parent.mkdir(parents=True, exist_ok=True)
    output_path.write_text(
        json.dumps(rows, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )

9️⃣ 运行方式与结果展示(必写)

9.1 runner.py

这个文件是入口程序,负责把请求层、解析层、存储层串起来。

# src/tide_snapshot/runner.py

from __future__ import annotations

import argparse
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any

from tide_snapshot.config import COOPS_MDAPI_BASE, DB_PATH, EXPORT_DIR, RAW_DIR
from tide_snapshot.fetcher import FetchError, HttpFetcher
from tide_snapshot.parser import (
    StationSnapshot,
    build_station_snapshot,
    parse_product_names,
    parse_station_list,
    utc_now_iso,
)
from tide_snapshot.storage import (
    connect_db,
    export_csv,
    export_json,
    init_db,
    load_run_rows,
    save_snapshots,
)


def make_run_id() -> str:
    return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")


def get_source_updated_at(headers: dict[str, str]) -> str | None:
    """
    Prefer Last-Modified. If not available, keep Date header.
    If neither exists, return None.
    """
    return headers.get("Last-Modified") or headers.get("Date")


def station_products_url(station: dict[str, Any]) -> str | None:
    products = station.get("products")
    if isinstance(products, dict):
        value = products.get("self")
        if isinstance(value, str) and value.startswith("http"):
            return value

    station_id = station.get("id")
    if station_id:
        return f"{COOPS_MDAPI_BASE}/stations/{station_id}/products.json"

    return None


def fetch_product_names(
    fetcher: HttpFetcher,
    station: dict[str, Any],
    raw_dir: Path,
    run_id: str,
) -> list[str]:
    url = station_products_url(station)
    station_id = str(station.get("id") or "unknown")

    if not url:
        return []

    try:
        result = fetcher.get_json(url)
    except FetchError as exc:
        print(f"[WARN] product request failed: station_id={station_id}; {exc}")
        return []

    raw_path = raw_dir / run_id / f"products_{station_id}.json"
    raw_path.parent.mkdir(parents=True, exist_ok=True)
    raw_path.write_text(
        json.dumps(result.json_data, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )

    return parse_product_names(result.json_data)


def collect_station_snapshots(
    station_type: str,
    limit: int | None,
    with_products: bool,
    sleep_seconds: float,
) -> tuple[str, list[StationSnapshot]]:
    run_id = make_run_id()
    snapshot_at = utc_now_iso()

    fetcher = HttpFetcher(sleep_seconds=sleep_seconds)

    list_url = f"{COOPS_MDAPI_BASE}/stations.json"
    params = {
        "type": station_type,
        "units": "metric",
    }

    print(f"[INFO] run_id={run_id}")
    print(f"[INFO] requesting station list: type={station_type}")

    list_result = fetcher.get_json(list_url, params=params)
    source_updated_at = get_source_updated_at(list_result.headers)

    raw_run_dir = RAW_DIR / run_id
    raw_run_dir.mkdir(parents=True, exist_ok=True)

    (raw_run_dir / f"stations_{station_type}.json").write_text(
        json.dumps(list_result.json_data, ensure_ascii=False, indent=2),
        encoding="utf-8",
    )

    stations = parse_station_list(list_result.json_data)

    if limit is not None and limit > 0:
        stations = stations[:limit]

    print(f"[INFO] station records found: {len(stations)}")
    print(f"[INFO] with_products={with_products}")

    snapshots: list[StationSnapshot] = []

    for idx, station in enumerate(stations, start=1):
        station_id = station.get("id")
        station_name = station.get("name")
        print(f"[INFO] parsing {idx}/{len(stations)} station={station_id} {station_name}")

        product_names: list[str] | None = None
        if with_products:
            product_names = fetch_product_names(
                fetcher=fetcher,
                station=station,
                raw_dir=RAW_DIR,
                run_id=run_id,
            )

        try:
            snapshot = build_station_snapshot(
                station=station,
                source_url=str(station.get("self") or list_result.url),
                station_type=station_type,
                product_names=product_names,
                source_updated_at=source_updated_at,
                snapshot_at=snapshot_at,
            )
        except ValueError as exc:
            print(f"[WARN] skip bad station record: {exc}")
            continue

        snapshots.append(snapshot)

    return run_id, snapshots


def main() -> None:
    parser = argparse.ArgumentParser(
        description="Archive public tide/water-level station metadata snapshots."
    )

    parser.add_argument(
        "--station-type",
        default="waterlevels",
        choices=[
            "waterlevels",
            "historicwl",
            "tidepredictions",
            "met",
            "waterlevelsandmet",
            "currents",
            "currentpredictions",
        ],
        help="Station collection type to request.",
    )

    parser.add_argument(
        "--limit",
        type=int,
        default=0,
        help="Limit number of stations for testing. 0 means no limit.",
    )

    parser.add_argument(
        "--with-products",
        action="store_true",
        help="Fetch product list for every station. Slower but more detailed.",
    )

    parser.add_argument(
        "--sleep",
        type=float,
        default=0.8,
        help="Sleep seconds between successive HTTP calls.",
    )

    parser.add_argument(
        "--skip-unchanged",
        action="store_true",
        help="Skip inserting station rows whose metadata hash is unchanged.",
    )

    args = parser.parse_args()

    run_id, snapshots = collect_station_snapshots(
        station_type=args.station_type,
        limit=args.limit if args.limit > 0 else None,
        with_products=args.with_products,
        sleep_seconds=args.sleep,
    )

    conn = connect_db(DB_PATH)
    init_db(conn)

    inserted, skipped = save_snapshots(
        conn=conn,
        run_id=run_id,
        snapshots=snapshots,
        skip_unchanged=args.skip_unchanged,
    )

    rows = load_run_rows(conn, run_id)

    csv_path = EXPORT_DIR / f"tide_station_snapshot_{run_id}.csv"
    json_path = EXPORT_DIR / f"tide_station_snapshot_{run_id}.json"

    export_csv(rows, csv_path)
    export_json(rows, json_path)

    print("[DONE]")
    print(f"[DONE] run_id={run_id}")
    print(f"[DONE] total_snapshots={len(snapshots)}")
    print(f"[DONE] inserted={inserted}, skipped={skipped}")
    print(f"[DONE] sqlite={DB_PATH}")
    print(f"[DONE] csv={csv_path}")
    print(f"[DONE] json={json_path}")


if __name__ == "__main__":
    main()

9.2 启动命令

在项目根目录运行:

PYTHONPATH=src python -m tide_snapshot.runner

测试时建议限制数量:

PYTHONPATH=src python -m tide_snapshot.runner --limit 10

如果要拉取每个站点的产品详情:

PYTHONPATH=src python -m tide_snapshot.runner --limit 10 --with-products --sleep 1.0

正式跑全量水位站点:

PYTHONPATH=src python -m tide_snapshot.runner --station-type waterlevels --with-products --sleep 1.0

如果只想保存变化记录:

PYTHONPATH=src python -m tide_snapshot.runner --station-type waterlevels --with-products --skip-unchanged --sleep 1.0

9.3 输出位置

运行后会生成:

data/
├── raw/
│   └── 20260610T120000Z/
│       ├── stations_waterlevels.json
│       ├── products_9414290.json
│       └── ...
├── exports/
│   ├── tide_station_snapshot_20260610T120000Z.csv
│   └── tide_station_snapshot_20260610T120000Z.json
└── tide_stations.sqlite3

其中:

  • data/raw/ 保存原始响应,方便复查;
  • data/exports/ 保存面向分析的 CSV/JSON;
  • data/tide_stations.sqlite3 保存长期快照。

9.4 示例结果

下面是示例输出格式。实际结果以你运行时接口返回为准。

station_id station_name sea_area latitude longitude observed_items snapshot_at
1611400 Nawiliwili Central Pacific / Hawaii 21.954506 -159.3561 water_level;tide_related 2026-06-10T12:00:00+00:00
9414290 San Francisco US Pacific Coast 37.806305 -122.46589 water_level;tide_related;forecast 2026-06-10T12:00:00+00:00
9447130 Seattle US Pacific Coast null null water_level 2026-06-10T12:00:00+00:00
8724580 Key West US Atlantic / Gulf Coast null null water_level 2026-06-10T12:00:00+00:00

如果你运行时开启 --with-productsobserved_items 会更丰富,因为它会从产品接口里提取站点支持的产品名。

9.5 查看 SQLite 数据

进入 SQLite:

sqlite3 data/tide_stations.sqlite3

查看某次快照:

SELECT
  station_id,
  station_name,
  sea_area,
  latitude,
  longitude,
  observed_items_json,
  snapshot_at
FROM station_snapshot
ORDER BY station_id
LIMIT 5;

查看某站点历史变化:

SELECT
  run_id,
  station_id,
  station_name,
  sea_area,
  raw_hash,
  snapshot_at
FROM station_snapshot
WHERE station_id = '9414290'
ORDER BY snapshot_at DESC;

查看新增或变化情况,可以在应用层对比不同 run_id 的 hash,也可以用 SQL 做:

SELECT
  a.station_id,
  a.station_name,
  a.raw_hash AS current_hash,
  b.raw_hash AS previous_hash
FROM station_snapshot a
LEFT JOIN station_snapshot b
  ON a.station_id = b.station_id
 AND b.run_id = '上一批次run_id'
WHERE a.run_id = '当前批次run_id'
  AND (b.raw_hash IS NULL OR a.raw_hash <> b.raw_hash);

🔟 常见问题与排错(强烈建议写)

10.1 遇到 403 怎么办?

403 一般表示拒绝访问。常见原因包括:

  • 请求路径不适合自动访问;
  • headers 太异常;
  • 访问频率异常;
  • 目标服务需要授权;
  • IP 或网络环境被限制。

处理思路:

  1. 先确认是否访问公开 API;
  2. 确认 URL 是否写错;
  3. 设置清晰、真实的 User-Agent
  4. 降低访问频率;
  5. 不要尝试绕过登录或付费限制;
  6. 如果是公开服务且仍然异常,查看官方文档或联系服务方。

不要把 403 简单理解成“需要代理”。代理不是合规的万能钥匙。

10.2 遇到 429 怎么办?

429 表示请求过多。

本文已经做了两件事:

  • 对 429 做有限重试;
  • 请求之间加入 sleep。

如果仍然出现 429,可以继续降低速度:

PYTHONPATH=src python -m tide_snapshot.runner --with-products --sleep 2.0

或者先不要抓产品详情:

PYTHONPATH=src python -m tide_snapshot.runner --station-type waterlevels

产品详情是一站一请求,数量多时会明显增加请求总数。很多元数据快照任务并不需要每次都拉产品详情,可以每周拉一次产品详情,每天只拉列表。

10.3 HTML 抓到空壳怎么办?

如果你采集的是网页,而不是本文这种 API,有时会发现 requests.get() 返回的 HTML 里面只有一堆 JS,没有目标数据。这通常说明页面是动态渲染的。

排查方法:

  1. 打开浏览器开发者工具;
  2. 切换到 Network 面板;
  3. 刷新页面;
  4. 查找 XHR/Fetch 请求;
  5. 看是否有 JSON 接口;
  6. 优先采集接口,而不是用浏览器自动化硬等 DOM。

如果确实没有接口,才考虑 Playwright。

但是要记住,Playwright 不是免死金牌。它更像真实浏览器,也更重。如果只是公开 JSON,没必要上它。

10.4 解析报错怎么办?

解析报错通常有几种原因:

  • 字段名变了;
  • JSON 结构变了;
  • 某个字段从对象变成数组;
  • 某个字段临时为空;
  • 接口返回错误信息,但程序当成正常数据解析。

处理方法:

第一,保存 raw 响应。本文已经把原始 JSON 保存到 data/raw/{run_id}/,排错时直接打开看。

第二,解析函数不要写得太硬。比如 parse_station_list() 同时兼容 stationsstationList

第三,关键字段缺失要有日志。比如站点 ID 缺失时跳过并打印 warn。

第四,给解析层写测试。哪怕只写几个样例,也比完全没有强。

10.5 编码或乱码怎么办?

JSON API 一般乱码较少。如果导出 CSV 后用 Excel 打开乱码,可以注意两点:

  1. CSV 使用 utf-8-sig
  2. JSON 使用 ensure_ascii=False

本文 export_csv() 已经使用:

encoding="utf-8-sig"

这对 Excel 用户比较友好。

10.6 为什么坐标会是 null?

如果源接口没有返回 latlng,本文会保存为 NULL。这比填 0 更安全。

0,0 是真实地理位置,在几内亚湾附近。把缺失坐标填成 0,后面做地图展示时会误导分析。

10.7 为什么海域是推断出来的?

因为源站元数据里通常提供的是州、地区、是否五大湖等信息,不一定直接提供“海域”字段。

本文用 stategreatlakes 推断:

  • greatlakes=true → Great Lakes;
  • CA/OR/WA → US Pacific Coast;
  • ME/MA/NY/... → US Atlantic Coast;
  • TX/LA/MS/... → US Gulf Coast;
  • PR/VI → Caribbean。

如果你有更严谨的海域边界,可以引入 GIS 多边形,用点落区算法判断。本文先用轻量映射,便于复现。

1️⃣1️⃣ 进阶优化(可选但加分)

11.1 并发优化

本文默认串行请求。对元数据快照来说,这已经够用。

如果你确实要优化,可以考虑:

  • ThreadPoolExecutor
  • asyncio + httpx.AsyncClient
  • Scrapy。

但我建议先加一个全局限速器,别让并发把请求瞬间打满。

一个简单的线程池版本思路如下:

from concurrent.futures import ThreadPoolExecutor, as_completed

def fetch_many_products(stations, max_workers=3):
    results = {}
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        future_map = {
            executor.submit(fetch_product_for_station, station): station
            for station in stations
        }

        for future in as_completed(future_map):
            station = future_map[future]
            station_id = station.get("id")
            try:
                results[station_id] = future.result()
            except Exception as exc:
                print(f"[WARN] station={station_id} failed: {exc}")

    return results

注意,这只是思路。真实使用时仍然要做限速,不要因为线程池方便就开几十上百个 worker。

11.2 断点续跑

如果全量采集 products,任务可能跑到一半失败。可以通过 raw 文件和数据库实现断点续跑。

简单策略:

  1. 每抓一个 products 响应,就写入 data/raw/{run_id}/products_{station_id}.json
  2. 下次运行时,如果文件存在,就直接读取本地 raw;
  3. 如果数据库里已有相同 run_id + station_id,就跳过;
  4. 最后补齐缺失站点。

伪代码:

def load_product_from_cache(raw_path):
    if raw_path.exists():
        return json.loads(raw_path.read_text(encoding="utf-8"))
    return None

再进一步,可以专门建一个 fetch_log 表:

CREATE TABLE fetch_log (
    run_id TEXT NOT NULL,
    station_id TEXT NOT NULL,
    url TEXT NOT NULL,
    status TEXT NOT NULL,
    error_message TEXT,
    fetched_at TEXT NOT NULL,
    PRIMARY KEY (run_id, station_id, url)
);

这样失败站点可以单独重跑。

11.3 日志与监控

当任务从“本地脚本”变成“定时生产任务”时,日志就很重要。

至少记录:

  • 本次 run_id;
  • 请求站点类型;
  • 站点总数;
  • 成功数;
  • 失败数;
  • 跳过数;
  • 输出路径;
  • 总耗时。

可以把 print 换成 logging:

import logging

logging.basicConfig(
    level=logging.INFO,
    format="%(asctime)s %(levelname)s %(message)s",
)

logger = logging.getLogger(__name__)

logger.info("start station snapshot")
logger.warning("product request failed")
logger.error("job failed", exc_info=True)

我个人很不喜欢没有日志的爬虫。它跑成功时看起来挺干净,一失败就像黑盒。

11.4 定时任务

Linux/macOS 可以用 cron:

crontab -e

每天凌晨 2 点执行:

0 2 * * * cd /path/to/tide_station_snapshot && /path/to/.venv/bin/python -m tide_snapshot.runner --station-type waterlevels --with-products --sleep 1.0 >> logs/tide_snapshot.log 2>&1

注意提前创建 logs 目录:

mkdir -p logs

如果团队里已经有 Airflow,可以把它做成 DAG:

from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime

with DAG(
    dag_id="tide_station_snapshot",
    start_date=datetime(2026, 1, 1),
    schedule="@daily",
    catchup=False,
) as dag:
    run_snapshot = BashOperator(
        task_id="run_snapshot",
        bash_command=(
            "cd /path/to/tide_station_snapshot && "
            "/path/to/.venv/bin/python -m tide_snapshot.runner "
            "--station-type waterlevels --with-products --sleep 1.0"
        ),
    )

11.5 数据质量检查

站点元数据也要做质量检查。建议加几个规则:

  • station_id 不为空;
  • station_name 不为空;
  • latitude 在 -90 到 90;
  • longitude 在 -180 到 180;
  • observed_items 不为空;
  • 同一批次站点 ID 不重复;
  • 本次站点数量不要比上次骤降太多。

示例检查函数:

def validate_snapshot(snapshot):
    errors = []

    if not snapshot.station_id:
        errors.append("missing station_id")

    if not snapshot.station_name:
        errors.append("missing station_name")

    if snapshot.latitude is not None:
        if not (-90 <= snapshot.latitude <= 90):
            errors.append(f"invalid latitude: {snapshot.latitude}")

    if snapshot.longitude is not None:
        if not (-180 <= snapshot.longitude <= 180):
            errors.append(f"invalid longitude: {snapshot.longitude}")

    if not snapshot.observed_items:
        errors.append("empty observed_items")

    return errors

数据质量检查不一定要阻断任务,但至少要记录。尤其是站点数量突然少很多时,要警惕是不是接口参数变了、网络返回错误页了,或者解析逻辑坏了。

11.6 地理增强

当前 sea_area 是通过州和区域推断的。更高级的做法是引入地理边界。

例如:

  • 准备太平洋、美国东海岸、墨西哥湾、加勒比海、五大湖的 GeoJSON;
  • shapely 判断站点坐标落在哪个 polygon;
  • 将结果写入 sea_area

依赖:

pip install shapely

伪代码:

from shapely.geometry import Point, shape
import json

def load_polygons(path):
    data = json.loads(path.read_text(encoding="utf-8"))
    polygons = []
    for feature in data["features"]:
        polygons.append(
            (
                feature["properties"]["name"],
                shape(feature["geometry"]),
            )
        )
    return polygons

def infer_area_by_polygon(lat, lng, polygons):
    if lat is None or lng is None:
        return "Unknown"

    point = Point(lng, lat)

    for name, polygon in polygons:
        if polygon.contains(point):
            return name

    return "Unknown"

这一步不是本文必需,但如果你要做严肃地图产品,建议上 GIS 判定,而不是只靠 state 映射。

11.7 从站点索引扩展到观测值采集

站点索引归档完成后,下一步通常是采集观测值。例如水位、潮汐预报、气象数据等。

不过这类数据和元数据不同:

  • 时间范围更大;
  • 数据量更大;
  • 接口限制更多;
  • 需要处理时间分片;
  • 需要处理单位、时区、基准面;
  • 需要更严格的断点续跑。

所以我建议把“站点索引”和“观测值采集”拆成两个项目或两个模块。不要在一开始就混在一起。

一个合理流程是:

站点索引快照
  ↓
筛选支持 water_level 的站点
  ↓
按站点与时间范围分片
  ↓
采集观测值
  ↓
写入时序表
  ↓
做质量检查和可视化

1️⃣2️⃣ 总结与延伸阅读

本文完成了一个完整的潮位站公开站点索引快照项目。

我们做了这些事情:

  • 选择公开 Metadata API 作为数据源;
  • 说明了合规边界和 robots.txt 基本原则;
  • requests.Session 实现请求层;
  • 加入 headers、timeout、retry、sleep;
  • 用解析层统一处理站点列表和产品列表;
  • 将站点名、海域、坐标、观测项目、更新时间映射成内部结构;
  • 用 SQLite 保存快照;
  • 用 CSV/JSON 导出结果;
  • raw_hash 做变更判断;
  • 给出运行命令、示例结果和排错方法;
  • 讨论了并发、断点续跑、日志、定时任务、GIS 增强等进阶方向。

我比较喜欢这种小而完整的项目。它没有炫技,也没有为了“像爬虫”而硬上复杂框架。它解决的是一个真实问题:把公开站点元数据稳定地采集下来,并且让每一次采集都有迹可循。

下一步可以继续扩展:

  1. 用 Scrapy 改造调度层,支持多个数据源;
  2. 用 Playwright 处理少数没有公开 API 的动态站点;
  3. 引入 GeoJSON 和 Shapely 做更严谨的海域分类;
  4. 增加观测值采集模块,形成站点索引 + 时间序列数据体系;
  5. 把 SQLite 替换成 PostgreSQL/PostGIS,支持空间查询;
  6. 接入 Airflow 或 Prefect,实现生产级定时采集;
  7. 增加数据质量报告,监控站点数量、字段缺失率和变化率。

最后说一句很实在的话:爬虫写到后面,真正拉开差距的不是“会不会请求网页”,而是能不能把请求、解析、清洗、存储、日志、合规、复跑都处理好。潮位站点索引这种任务,正好适合练这套基本功。稳一点,慢一点,留下原始数据和快照记录,后面你会感谢现在这个谨慎的自己。

🌟 文末

好啦~以上就是本期的全部内容啦!如果你在实践过程中遇到任何疑问,欢迎在评论区留言交流,我看到都会尽量回复~咱们下期见!

小伙伴们在批阅的过程中,如果觉得文章不错,欢迎点赞、收藏、关注哦~
三连就是对我写作道路上最好的鼓励与支持! ❤️🔥

✅ 专栏持续更新中|建议收藏 + 订阅

墙裂推荐订阅专栏 👉 《Python爬虫实战》,本专栏秉承着以“入门 → 进阶 → 工程化 → 项目落地”的路线持续更新,争取让每一期内容都做到:

✅ 讲得清楚(原理)|✅ 跑得起来(代码)|✅ 用得上(场景)|✅ 扛得住(工程化)

📣 想系统提升的小伙伴:强烈建议先订阅专栏 《Python爬虫实战》,再按目录大纲顺序学习,效率十倍上升~

✅ 互动征集

想让我把【某站点/某反爬/某验证码/某分布式方案】等写成某期实战?

评论区留言告诉我你的需求,我会优先安排实现(更新)哒~


⭐️ 若喜欢我,就请关注我叭~(更新不迷路)
⭐️ 若对你有用,就请点赞支持一下叭~(给我一点点动力)
⭐️ 若有疑问,就请评论留言告诉我叭~(我会补坑 & 更新迭代)


✅ 免责声明

本文爬虫思路、相关技术和代码仅用于学习参考,对阅读本文后的进行爬虫行为的用户本作者不承担任何法律责任。

使用或者参考本项目即表示您已阅读并同意以下条款:

  • 合法使用: 不得将本项目用于任何违法、违规或侵犯他人权益的行为,包括但不限于网络攻击、诈骗、绕过身份验证、未经授权的数据抓取等。
  • 风险自负: 任何因使用本项目而产生的法律责任、技术风险或经济损失,由使用者自行承担,项目作者不承担任何形式的责任。
  • 禁止滥用: 不得将本项目用于违法牟利、黑产活动或其他不当商业用途。
  • 使用或者参考本项目即视为同意上述条款,即 “谁使用,谁负责” 。如不同意,请立即停止使用并删除本项目。!!!

更多推荐