潮位站公开站点索引与快照:用 Python 采集站点元数据并归档
㊗️本期内容已收录至专栏《Python爬虫实战》,持续完善知识体系与项目实战,建议先订阅收藏,后续查阅更方便~
㊙️本期爬虫难度指数:⭐⭐⭐☆☆(进阶级)
🉐福利: 一次订阅后,专栏内的所有文章可永久免费看,持续更新中,保底1000+(篇)硬核实战内容。
全文目录:
🌟 开篇语
哈喽,各位小伙伴们你们好呀~我是【喵手】。
运营社区: C站 / 掘金 / 腾讯云 / 阿里云 / 华为云 / 51CTO
欢迎大家常来逛逛,一起学习,一起进步~🌟
我长期专注 Python 爬虫工程化实战,主理专栏👉 《Python爬虫实战》:从采集策略到反爬对抗,从数据清洗到分布式调度,持续输出可复用的方法论与可落地案例。内容主打一个“能跑、能用、能扩展”,让数据价值真正做到——抓得到、洗得净、用得上。
📌 专栏食用指南(建议收藏)
- ✅ 入门基础:环境搭建 / 请求与解析 / 数据落库
- ✅ 进阶提升:登录鉴权 / 动态渲染 / 反爬对抗
- ✅ 工程实战:异步并发 / 分布式调度 / 监控与容错
- ✅ 项目落地:数据治理 / 可视化分析 / 场景化应用
📣 专栏推广时间:如果你想系统学爬虫,而不是碎片化东拼西凑,欢迎订阅专栏👉《Python爬虫实战》👈,一次订阅后,专栏内的所有文章可永久免费阅读,持续更新中。
💕订阅后更新会优先推送,按目录学习更高效💯~
0️⃣ 前言(Preface)
这篇文章要做的事情很明确:用 Python 采集公开潮位站点索引,抽取站点名、海域、坐标、观测项目、更新时间等元数据,并把每次采集结果保存成可追溯的快照。
读完这篇文章,你可以获得:
- 一套从公开接口采集潮位站点元数据的完整工程写法;
- 一套适合“站点索引快照归档”的 SQLite/CSV/JSON 存储方案;
- 一些我自己在写爬虫时比较看重的细节:频率控制、失败重试、字段容错、去重和断点思路。
很多爬虫文章喜欢一上来就堆并发、堆代理、堆框架,但我一直觉得,真正稳定的采集任务,第一步不是“快”,而是“准”和“可回滚”。潮位站点元数据这种东西,更新频率通常不需要秒级追踪,适合做成定时快照:每天或每周拉一次,把站点索引沉淀下来,后面做地图展示、站点检索、数据质量分析都会省很多事。
1️⃣ 摘要(Abstract)
本文基于 NOAA CO-OPS 公开潮汐与水位站点 Metadata API,使用 Python 的 requests 完成站点索引采集,解析站点名、海域、坐标、观测项目、快照更新时间,并导出到 SQLite、CSV 和 JSON 文件。
读完后你将掌握:
- 如何判断一个潮位站点索引任务更适合“API 采集”而不是“页面解析”;
- 如何设计请求层、解析层、清洗层、存储层,使代码可维护、可复跑;
- 如何用快照归档思路保存站点元数据变更,而不是每次简单覆盖旧结果。
本文不是为了鼓励无节制抓取,也不讨论绕过限制、批量撞接口、规避风控之类做法。它只围绕公开数据、合理频率、技术复现展开。
2️⃣ 背景与需求(Why)
2.1 为什么要爬潮位站点索引?
潮位站,也可以理解为水位、潮汐、港口或近岸观测站点的一类基础设施。对数据分析来说,站点索引本身就是一个很有价值的元数据表。
它不一定直接回答“某一刻水位是多少”,但它能回答:
- 哪些站点存在?
- 每个站点在哪里?
- 每个站点属于哪个海域或区域?
- 这个站点有哪些观测或产品能力?
- 本次采集是什么时候完成的?
- 与上一次快照相比,站点是否新增、移除、改名或产品能力发生变化?
这些问题看似基础,但在后续业务中非常关键。比如你想做潮位观测可视化,如果没有站点索引,就无法稳定地在地图上定位站点;你想批量拉取水位时间序列,如果没有站点能力清单,就容易对不支持该产品的站点发起无效请求;你想做长期归档,如果没有快照时间和 hash,就很难知道某一次数据变化是源站变化,还是你自己的代码解析错了。
2.2 本文目标
本文实现一个“潮位站公开站点索引与快照”项目,目标字段如下:
| 中文字段 | 程序字段 | 说明 |
|---|---|---|
| 站点名 | station_name |
公开站点名称 |
| 海域 | sea_area |
由站点区域、州/地区、是否五大湖等字段推断 |
| 坐标 | latitude / longitude |
十进制度坐标 |
| 观测项目 | observed_items |
站点支持的产品或观测能力 |
| 更新时间 | snapshot_at / source_updated_at |
本地快照时间,以及可选的源响应时间 |
这里要强调一点:很多公开元数据接口不会给每个站点提供一个严格意义上的“站点更新时间”。这种情况下,比较诚实的工程写法是保存“本次快照采集时间”,并在 HTTP 响应头里尝试记录 Last-Modified 或 Date。如果源站没有明确给出站点级更新时间,不要自己编造一个看起来很像真的时间字段。
2.3 本文选用的数据源
本文使用 NOAA CO-OPS 公开 Metadata API。它是面向潮汐、水位、海洋与气象相关站点的公开服务,适合做站点索引采集。本文把它当成“公开潮位站点索引”的样例来源。
这个任务的核心不是下载历史水位时间序列,而是归档站点元数据。也就是说,我们主要采集的是“站点清单”和“站点产品能力”,不是连续观测值。
3️⃣ 合规与注意事项(必写)
写爬虫之前,我一般会先做一件没那么刺激但很必要的事:确认采集对象是不是公开信息,以及有没有清晰的使用边界。
3.1 robots.txt 基本说明
robots.txt 是站点给自动化访问程序提供访问建议的一种约定。严格说,它不是登录鉴权,也不是权限系统,但它表达了站点希望哪些路径被访问、哪些路径不要被自动访问。
在实际开发中,我会按下面的原则处理:
- 先看目标域名根路径下的
robots.txt; - 如果某路径明确禁止抓取,就不要抓;
- 如果目标站点提供了官方 API,优先走 API,而不是硬解析页面;
- 即使没有明确禁止,也不要做高频、高并发、无边界扫描;
- 不采集和主题无关的数据。
本文使用的是公开 API,属于更适合自动化访问的方式。相比 HTML 页面,API 的结构更稳定,字段含义更清晰,也更容易控制请求规模。
3.2 频率控制
这个项目的采集对象是站点元数据,不是秒级行情。没有必要做攻击式并发,也没有必要把请求压到极限。
本文默认策略:
- 列表接口请求一次;
- 每个站点的产品详情请求可选;
- 连续详情请求之间默认 sleep 0.8 秒;
- 对 429、500、502、503、504 做有限重试;
- 使用指数退避,避免失败时越打越快。
我自己的习惯是,先让任务稳定跑完,再考虑速度。尤其是公共服务,别把“能跑”误解成“可以猛跑”。
3.3 不采集敏感信息
本文只采集公开站点元数据,例如站点名、坐标、观测产品等。不采集个人信息,不绕过登录,不访问付费内容,不模拟用户权限,不做批量猜测,也不做漏洞探测。
如果你的目标站点换成其他平台,也建议保持同样原则:
- 公开页面或公开 API 才采集;
- 不绕过验证码、登录、付费墙;
- 不采集身份证、手机号、邮箱、账号资料等敏感字段;
- 不把采集程序用于压测目标站;
- 不将数据用于和原始授权不一致的场景。
4️⃣ 技术选型与整体流程(What/How)
4.1 静态、动态还是 API?
常见采集方式可以分成三类:
| 类型 | 特征 | 适合工具 |
|---|---|---|
| 静态页面 | HTML 中直接包含目标数据 | requests + BeautifulSoup / lxml |
| 动态页面 | 浏览器执行 JS 后才出现数据 | Playwright / Selenium |
| API 接口 | 返回 JSON/XML/CSV 等结构化数据 | requests / httpx |
本文属于第三类:API 采集。
原因很简单,目标数据源已经提供了结构化 Metadata API。此时再用 Playwright 打开网页、等待 JS 渲染、从 DOM 里扣字段,反而复杂、慢,而且更容易受页面改版影响。
4.2 整体流程
本文流程可以概括为:
采集 Fetch
↓
解析 Parse
↓
清洗 Normalize
↓
去重 Deduplicate
↓
存储 Store
↓
导出 Export
更具体一点:
1. 请求站点列表接口
2. 获取 station_id、name、lat、lng、state、products URL 等基础字段
3. 可选:逐个请求 products 接口,补充观测项目
4. 将 state / greatlakes 等信息转换为 sea_area
5. 对原始元数据计算 hash
6. 写入 SQLite 快照表
7. 导出 CSV 和 JSON
8. 下次运行时保留新的 run_id,形成历史快照
4.3 为什么选择 requests,而不是 Scrapy 或 Playwright?
这次我选择 requests,理由有三个:
第一,目标是公开 JSON API,requests 足够稳定。
第二,任务规模不大。潮位站点索引属于元数据,通常几百到几千条,不需要一上来就引入 Scrapy 的调度器、队列、中间件和 Item Pipeline。
第三,快照归档更看重可控性。用 requests.Session、Retry、SQLite,代码结构清楚,后期改成定时任务也轻松。
如果以后你要采集多个国家、多个站点体系,或者需要大规模调度,再上 Scrapy 会更合适。如果目标页面完全依赖前端渲染,再考虑 Playwright。
5️⃣ 环境准备与依赖安装(可复现)
5.1 Python 版本
建议使用:
Python 3.10+
我建议至少 3.10,是因为类型标注、路径处理和标准库体验都比较舒服。3.11 或 3.12 也可以。
5.2 安装依赖
本文尽量少引入第三方包,只需要:
pip install requests
如果你希望用虚拟环境:
python -m venv .venv
# macOS / Linux
source .venv/bin/activate
# Windows PowerShell
.venv\Scripts\Activate.ps1
pip install requests
5.3 推荐项目结构
tide_station_snapshot/
├── README.md
├── requirements.txt
├── data/
│ ├── raw/
│ ├── exports/
│ └── tide_stations.sqlite3
├── src/
│ └── tide_snapshot/
│ ├── __init__.py
│ ├── config.py
│ ├── fetcher.py
│ ├── parser.py
│ ├── storage.py
│ └── runner.py
└── tests/
└── test_parser.py
先创建目录:
mkdir -p tide_station_snapshot/src/tide_snapshot
mkdir -p tide_station_snapshot/data/raw
mkdir -p tide_station_snapshot/data/exports
mkdir -p tide_station_snapshot/tests
cd tide_station_snapshot
touch src/tide_snapshot/__init__.py
requirements.txt:
requests>=2.31.0
安装:
pip install -r requirements.txt
6️⃣ 核心实现:请求层(Fetcher)
请求层是爬虫的地基。不要把请求、解析、入库全塞在一个文件里。短脚本当然能跑,但稍微遇到失败重试、字段变更、导出格式扩展,就会开始混乱。
本文请求层负责:
- 统一 headers;
- 统一 timeout;
- 使用 session 复用连接;
- 处理 429/5xx 重试;
- 做基础频率控制;
- 返回 JSON 和响应头信息。
6.1 config.py
# src/tide_snapshot/config.py
from pathlib import Path
PROJECT_ROOT = Path(__file__).resolve().parents[2]
DATA_DIR = PROJECT_ROOT / "data"
RAW_DIR = DATA_DIR / "raw"
EXPORT_DIR = DATA_DIR / "exports"
DB_PATH = DATA_DIR / "tide_stations.sqlite3"
COOPS_MDAPI_BASE = "https://api.tidesandcurrents.noaa.gov/mdapi/prod/webapi"
DEFAULT_HEADERS = {
"User-Agent": (
"tide-station-snapshot/1.0 "
"(metadata archiving demo; contact: example@example.com)"
),
"Accept": "application/json,text/plain,*/*",
"Referer": "https://tidesandcurrents.noaa.gov/",
"Connection": "keep-alive",
}
DEFAULT_TIMEOUT = 20
DEFAULT_SLEEP_SECONDS = 0.8
这里的 User-Agent 不建议伪装成浏览器。很多人喜欢复制 Chrome UA,我个人不太喜欢这种写法。既然是公开接口采集,最好说明程序用途。真实项目里可以把联系邮箱换成团队邮箱。
6.2 fetcher.py
# src/tide_snapshot/fetcher.py
from __future__ import annotations
import json
import time
from dataclasses import dataclass
from typing import Any
import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry
from tide_snapshot.config import DEFAULT_HEADERS, DEFAULT_SLEEP_SECONDS, DEFAULT_TIMEOUT
@dataclass(frozen=True)
class FetchResult:
url: str
status_code: int
headers: dict[str, str]
json_data: dict[str, Any]
class FetchError(RuntimeError):
pass
class HttpFetcher:
"""
A small HTTP fetcher for public metadata APIs.
Features:
- requests.Session connection reuse
- custom headers
- timeout
- retry with exponential backoff
- polite sleep between calls
"""
def __init__(
self,
timeout: int = DEFAULT_TIMEOUT,
sleep_seconds: float = DEFAULT_SLEEP_SECONDS,
retries: int = 3,
backoff_factor: float = 0.8,
) -> None:
self.timeout = timeout
self.sleep_seconds = sleep_seconds
self._last_request_at = 0.0
self.session = requests.Session()
self.session.headers.update(DEFAULT_HEADERS)
retry = Retry(
total=retries,
connect=retries,
read=retries,
status=retries,
allowed_methods=frozenset(["GET"]),
status_forcelist=(429, 500, 502, 503, 504),
backoff_factor=backoff_factor,
respect_retry_after_header=True,
raise_on_status=False,
)
adapter = HTTPAdapter(max_retries=retry, pool_connections=5, pool_maxsize=10)
self.session.mount("https://", adapter)
self.session.mount("http://", adapter)
def _pace(self) -> None:
"""
Keep a small interval between requests.
This is intentionally simple and enough for metadata snapshots.
"""
now = time.monotonic()
elapsed = now - self._last_request_at
wait = self.sleep_seconds - elapsed
if wait > 0:
time.sleep(wait)
self._last_request_at = time.monotonic()
def get_json(self, url: str, params: dict[str, Any] | None = None) -> FetchResult:
self._pace()
try:
response = self.session.get(url, params=params, timeout=self.timeout)
except requests.RequestException as exc:
raise FetchError(f"Request failed: {url}; reason={exc}") from exc
if response.status_code >= 400:
text = response.text[:500].replace("\n", " ")
raise FetchError(
f"Bad response: status={response.status_code}, url={response.url}, body={text}"
)
try:
payload = response.json()
except json.JSONDecodeError as exc:
text = response.text[:500].replace("\n", " ")
raise FetchError(
f"JSON decode failed: url={response.url}, body={text}"
) from exc
if not isinstance(payload, dict):
raise FetchError(f"Unexpected JSON root type: {type(payload).__name__}")
return FetchResult(
url=response.url,
status_code=response.status_code,
headers=dict(response.headers),
json_data=payload,
)
6.3 请求层关键点说明
headers
本文设置了:
User-Agent:说明程序用途;Accept:明确希望返回 JSON;Referer:给源站一个上下文;Connection:保持连接复用。
headers 不是为了“伪装”,而是为了让请求更规范。真正的稳定采集,靠的是公开数据、合理频率、错误处理,而不是把 UA 写得像某个浏览器。
timeout
没有 timeout 的爬虫,迟早会卡死。
本文设置默认 20 秒。对于元数据接口来说,20 秒已经比较宽松。如果你的网络环境稳定,可以改成 10 秒。
session/cookie
本文不需要登录,也不需要 cookie。但仍然使用 requests.Session,主要是为了复用连接和统一 headers。
如果目标站点需要登录或 cookie,而数据又不是公开数据,那就要重新评估是否适合采集。本文不涉及这类场景。
失败处理
本文通过 Retry 处理:
- 429:请求过多;
- 500:服务器内部错误;
- 502/503/504:网关或服务暂时不可用。
重试不是让你一直打接口,而是让偶发波动不至于中断整个任务。重试次数一定要有限,失败日志一定要保留。
7️⃣ 核心实现:解析层(Parser)
解析层负责把源站返回的 JSON 转换成我们自己的统一结构。
7.1 解析方式
本文是 JSON API,所以不用 XPath、CSS Selector、BeautifulSoup 或 lxml。
但解析思想是一样的:
- 先拿列表;
- 再拿详情;
- 字段缺失要容错;
- 输出结构要稳定。
7.2 字段容错原则
公开接口有时会出现两类变化:
第一,字段名变化。例如有的文档写 stationList,实际 JSON 里可能是 stations。
第二,字段值缺失。例如部分站点没有 state,或者 products 接口临时没有返回产品列表。
所以解析层不要写死得太脆:
payload["stations"]
更稳一点:
payload.get("stations") or payload.get("stationList") or []
7.3 parser.py
# src/tide_snapshot/parser.py
from __future__ import annotations
import hashlib
import json
from dataclasses import dataclass, asdict
from datetime import datetime, timezone
from typing import Any
SEA_AREA_BY_STATE: dict[str, str] = {
# Pacific Coast
"CA": "US Pacific Coast",
"OR": "US Pacific Coast",
"WA": "US Pacific Coast",
# Alaska and Pacific Islands
"AK": "Alaska Waters",
"HI": "Central Pacific / Hawaii",
"AS": "Pacific Islands",
"GU": "Pacific Islands",
"MP": "Pacific Islands",
# Atlantic Coast
"ME": "US Atlantic Coast",
"NH": "US Atlantic Coast",
"MA": "US Atlantic Coast",
"RI": "US Atlantic Coast",
"CT": "US Atlantic Coast",
"NY": "US Atlantic Coast",
"NJ": "US Atlantic Coast",
"DE": "US Atlantic Coast",
"MD": "US Atlantic Coast",
"VA": "US Atlantic Coast",
"NC": "US Atlantic Coast",
"SC": "US Atlantic Coast",
"GA": "US Atlantic Coast",
# Gulf Coast
"AL": "US Gulf Coast",
"MS": "US Gulf Coast",
"LA": "US Gulf Coast",
"TX": "US Gulf Coast",
# Florida spans both Atlantic and Gulf coasts.
# Without a polygon or station-specific coast classification,
# keep a neutral label.
"FL": "US Atlantic / Gulf Coast",
# Caribbean
"PR": "Caribbean",
"VI": "Caribbean",
}
@dataclass
class StationSnapshot:
station_id: str
station_name: str
sea_area: str
latitude: float | None
longitude: float | None
observed_items: list[str]
source_updated_at: str | None
snapshot_at: str
source_url: str
raw_hash: str
raw_json: dict[str, Any]
def to_public_dict(self) -> dict[str, Any]:
data = asdict(self)
data["observed_items"] = ";".join(self.observed_items)
data.pop("raw_json", None)
return data
def utc_now_iso() -> str:
return datetime.now(timezone.utc).replace(microsecond=0).isoformat()
def stable_hash(data: Any) -> str:
"""
Create a stable SHA256 hash for raw JSON.
Useful for detecting metadata changes.
"""
text = json.dumps(data, ensure_ascii=False, sort_keys=True, separators=(",", ":"))
return hashlib.sha256(text.encode("utf-8")).hexdigest()
def first_list(payload: dict[str, Any], *keys: str) -> list[Any]:
for key in keys:
value = payload.get(key)
if isinstance(value, list):
return value
return []
def parse_station_list(payload: dict[str, Any]) -> list[dict[str, Any]]:
"""
NOAA MDAPI documentation may describe station collection as stationList,
while JSON responses can use stations in some endpoints.
Keep both for compatibility.
"""
stations = first_list(payload, "stations", "stationList")
return [item for item in stations if isinstance(item, dict)]
def parse_product_names(payload: dict[str, Any]) -> list[str]:
"""
Products response usually contains a product collection.
Keep this parser flexible.
"""
products = first_list(payload, "products", "productList", "ProductList")
names: list[str] = []
for item in products:
if not isinstance(item, dict):
continue
name = (
item.get("name")
or item.get("productName")
or item.get("title")
or item.get("value")
)
if name is None:
continue
name_text = str(name).strip()
if name_text and name_text not in names:
names.append(name_text)
return names
def infer_sea_area(station: dict[str, Any]) -> str:
"""
NOAA station metadata has state/region and greatlakes flags,
but does not always provide a direct 'sea area' field.
So we derive a practical sea_area label for indexing.
"""
if station.get("greatlakes") is True:
return "Great Lakes"
state = str(station.get("state") or "").strip().upper()
if state in SEA_AREA_BY_STATE:
return SEA_AREA_BY_STATE[state]
# Some APIs may return region-like values instead of US state codes.
name = str(station.get("name") or "").lower()
if "lake" in name:
return "Lake / Inland Water"
if state:
return f"Region: {state}"
return "Unknown"
def parse_float(value: Any) -> float | None:
if value is None:
return None
try:
return float(value)
except (TypeError, ValueError):
return None
def fallback_observed_items(station: dict[str, Any], station_type: str) -> list[str]:
"""
If product endpoint is not requested or returns empty,
create a conservative observed_items list from station type and flags.
"""
items: list[str] = []
if station_type == "waterlevels":
items.append("water_level")
elif station_type == "tidepredictions":
items.append("tide_predictions")
elif station_type == "met":
items.append("meteorological")
elif station_type == "currents":
items.append("currents")
else:
items.append(station_type)
if station.get("tidal") is True and "tide_related" not in items:
items.append("tide_related")
if station.get("forecast") is True:
items.append("forecast")
if station.get("outlook") is True:
items.append("high_tide_flooding_outlook")
if station.get("HTFhistorical") is True:
items.append("high_tide_flooding_historical")
return items
def build_station_snapshot(
station: dict[str, Any],
source_url: str,
station_type: str,
product_names: list[str] | None = None,
source_updated_at: str | None = None,
snapshot_at: str | None = None,
) -> StationSnapshot:
station_id = str(station.get("id") or "").strip()
station_name = str(station.get("name") or "").strip()
if not station_id:
raise ValueError(f"Missing station id in station record: {station}")
if not station_name:
station_name = f"Unnamed Station {station_id}"
observed_items = product_names or fallback_observed_items(station, station_type)
raw_for_hash = {
"station": station,
"observed_items": observed_items,
"station_type": station_type,
}
return StationSnapshot(
station_id=station_id,
station_name=station_name,
sea_area=infer_sea_area(station),
latitude=parse_float(station.get("lat")),
longitude=parse_float(station.get("lng")),
observed_items=observed_items,
source_updated_at=source_updated_at,
snapshot_at=snapshot_at or utc_now_iso(),
source_url=source_url,
raw_hash=stable_hash(raw_for_hash),
raw_json=raw_for_hash,
)
7.4 列表页如何拿详情链接?
虽然这里不是 HTML 页面,但“列表页拿详情链接”的逻辑仍然存在。站点列表接口通常会返回类似下面的信息:
{
"id": "9414290",
"name": "San Francisco",
"lat": 37.806305,
"lng": -122.46589,
"products": {
"self": "https://api.tidesandcurrents.noaa.gov/mdapi/prod/webapi/stations/9414290/products.json"
},
"self": "https://api.tidesandcurrents.noaa.gov/mdapi/prod/webapi/stations/9414290.json"
}
其中:
self可以视为站点详情链接;products.self可以视为站点观测产品链接。
实际项目里,不要强依赖某个嵌套字段一定存在。可以优先使用接口返回的 URL,如果没有,就根据站点 ID 拼接产品接口。
7.5 详情页如何抽字段?
本文的详情主要是 products 信息。产品接口返回后,解析 products 或 productList,提取每个产品的 name 字段,形成 observed_items。
如果不请求产品详情,就根据站点类型和基础字段做保守推断:
waterlevels→water_leveltidepredictions→tide_predictionsmet→meteorologicalcurrents→currents
这不是完美分类,但足够作为索引层的基础字段。真正严谨的产品能力,还是建议请求 products 接口。
7.6 缺失字段怎么办?
缺失字段不要让整个任务崩掉。
本文策略:
station_id缺失:跳过或抛出错误,因为无法作为主键;station_name缺失:生成Unnamed Station {id};lat/lng缺失:保存为NULL;sea_area无法判断:保存Unknown;observed_items缺失:根据站点类型生成兜底值;source_updated_at缺失:保存为NULL,同时保留snapshot_at。
这种处理方式比较朴素,但稳定。数据工程里最怕的是字段一缺失,整个任务直接中断;更怕的是缺失时悄悄填一个假值,后面分析的人根本不知道。
8️⃣ 数据存储与导出(Storage)
本文选择 SQLite 起步,同时导出 CSV 和 JSON。
为什么不是 MySQL?
因为这个任务是站点元数据快照,不是高并发在线业务。SQLite 单文件、零服务、方便迁移,适合本地归档、定时任务和轻量分析。后期如果要接 Web 服务或多人协作,再迁移 MySQL/PostgreSQL 也不迟。
8.1 字段映射表
| 字段名 | 类型 | 示例值 | 说明 |
|---|---|---|---|
run_id |
TEXT | 20260610T120000Z |
本次采集批次 ID |
station_id |
TEXT | 9414290 |
站点唯一 ID |
station_name |
TEXT | San Francisco |
站点名 |
sea_area |
TEXT | US Pacific Coast |
推断海域 |
latitude |
REAL | 37.806305 |
纬度 |
longitude |
REAL | -122.46589 |
经度 |
observed_items_json |
TEXT | ["water_level"] |
观测项目 JSON |
source_updated_at |
TEXT | Wed, 10 Jun 2026 12:00:00 GMT |
源响应时间,可为空 |
snapshot_at |
TEXT | 2026-06-10T12:00:00+00:00 |
本地快照时间 |
source_url |
TEXT | API URL | 来源 URL |
raw_hash |
TEXT | SHA256 | 原始内容 hash |
raw_json |
TEXT | JSON 字符串 | 原始元数据 |
8.2 去重策略
本文采用两层策略:
第一层:同一批次内,run_id + station_id 唯一。
第二层:跨批次,通过 raw_hash 判断元数据是否变化。
如果你每次都想保留完整快照,就每次插入所有站点。如果你只想保存变化记录,可以启用 --skip-unchanged,当某站点 hash 与上次一致时跳过写入。
我更倾向于早期保留完整快照。磁盘成本不高,排查问题时很舒服。等任务稳定后,再考虑只保存变化记录。
8.3 storage.py
# src/tide_snapshot/storage.py
from __future__ import annotations
import csv
import json
import sqlite3
from pathlib import Path
from typing import Iterable
from tide_snapshot.parser import StationSnapshot
SCHEMA_SQL = """
CREATE TABLE IF NOT EXISTS station_snapshot (
run_id TEXT NOT NULL,
station_id TEXT NOT NULL,
station_name TEXT NOT NULL,
sea_area TEXT NOT NULL,
latitude REAL,
longitude REAL,
observed_items_json TEXT NOT NULL,
source_updated_at TEXT,
snapshot_at TEXT NOT NULL,
source_url TEXT NOT NULL,
raw_hash TEXT NOT NULL,
raw_json TEXT NOT NULL,
created_at TEXT NOT NULL DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (run_id, station_id)
);
CREATE INDEX IF NOT EXISTS idx_station_snapshot_station_id
ON station_snapshot(station_id);
CREATE INDEX IF NOT EXISTS idx_station_snapshot_raw_hash
ON station_snapshot(raw_hash);
CREATE TABLE IF NOT EXISTS station_latest (
station_id TEXT PRIMARY KEY,
latest_hash TEXT NOT NULL,
last_seen_run_id TEXT NOT NULL,
last_seen_at TEXT NOT NULL
);
"""
def connect_db(db_path: Path) -> sqlite3.Connection:
db_path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(db_path))
conn.execute("PRAGMA journal_mode=WAL;")
conn.execute("PRAGMA synchronous=NORMAL;")
return conn
def init_db(conn: sqlite3.Connection) -> None:
conn.executescript(SCHEMA_SQL)
conn.commit()
def has_same_latest_hash(
conn: sqlite3.Connection,
station_id: str,
raw_hash: str,
) -> bool:
row = conn.execute(
"SELECT latest_hash FROM station_latest WHERE station_id = ?",
(station_id,),
).fetchone()
return bool(row and row[0] == raw_hash)
def save_snapshot(
conn: sqlite3.Connection,
run_id: str,
snapshot: StationSnapshot,
skip_unchanged: bool = False,
) -> bool:
"""
Save a single station snapshot.
Returns:
True if inserted;
False if skipped because unchanged.
"""
if skip_unchanged and has_same_latest_hash(
conn, snapshot.station_id, snapshot.raw_hash
):
return False
conn.execute(
"""
INSERT OR REPLACE INTO station_snapshot (
run_id,
station_id,
station_name,
sea_area,
latitude,
longitude,
observed_items_json,
source_updated_at,
snapshot_at,
source_url,
raw_hash,
raw_json
)
VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?)
""",
(
run_id,
snapshot.station_id,
snapshot.station_name,
snapshot.sea_area,
snapshot.latitude,
snapshot.longitude,
json.dumps(snapshot.observed_items, ensure_ascii=False),
snapshot.source_updated_at,
snapshot.snapshot_at,
snapshot.source_url,
snapshot.raw_hash,
json.dumps(snapshot.raw_json, ensure_ascii=False, sort_keys=True),
),
)
conn.execute(
"""
INSERT INTO station_latest (
station_id,
latest_hash,
last_seen_run_id,
last_seen_at
)
VALUES (?, ?, ?, ?)
ON CONFLICT(station_id)
DO UPDATE SET
latest_hash = excluded.latest_hash,
last_seen_run_id = excluded.last_seen_run_id,
last_seen_at = excluded.last_seen_at
""",
(
snapshot.station_id,
snapshot.raw_hash,
run_id,
snapshot.snapshot_at,
),
)
return True
def save_snapshots(
conn: sqlite3.Connection,
run_id: str,
snapshots: Iterable[StationSnapshot],
skip_unchanged: bool = False,
) -> tuple[int, int]:
inserted = 0
skipped = 0
with conn:
for snapshot in snapshots:
ok = save_snapshot(
conn=conn,
run_id=run_id,
snapshot=snapshot,
skip_unchanged=skip_unchanged,
)
if ok:
inserted += 1
else:
skipped += 1
return inserted, skipped
def load_run_rows(conn: sqlite3.Connection, run_id: str) -> list[dict]:
cursor = conn.execute(
"""
SELECT
run_id,
station_id,
station_name,
sea_area,
latitude,
longitude,
observed_items_json,
source_updated_at,
snapshot_at,
source_url,
raw_hash
FROM station_snapshot
WHERE run_id = ?
ORDER BY station_id
""",
(run_id,),
)
columns = [desc[0] for desc in cursor.description]
rows: list[dict] = []
for record in cursor.fetchall():
row = dict(zip(columns, record))
try:
items = json.loads(row["observed_items_json"])
except json.JSONDecodeError:
items = []
row["observed_items"] = ";".join(str(x) for x in items)
row.pop("observed_items_json", None)
rows.append(row)
return rows
def export_csv(rows: list[dict], output_path: Path) -> None:
output_path.parent.mkdir(parents=True, exist_ok=True)
if not rows:
output_path.write_text("", encoding="utf-8")
return
fieldnames = list(rows[0].keys())
with output_path.open("w", newline="", encoding="utf-8-sig") as f:
writer = csv.DictWriter(f, fieldnames=fieldnames)
writer.writeheader()
writer.writerows(rows)
def export_json(rows: list[dict], output_path: Path) -> None:
output_path.parent.mkdir(parents=True, exist_ok=True)
output_path.write_text(
json.dumps(rows, ensure_ascii=False, indent=2),
encoding="utf-8",
)
9️⃣ 运行方式与结果展示(必写)
9.1 runner.py
这个文件是入口程序,负责把请求层、解析层、存储层串起来。
# src/tide_snapshot/runner.py
from __future__ import annotations
import argparse
import json
from datetime import datetime, timezone
from pathlib import Path
from typing import Any
from tide_snapshot.config import COOPS_MDAPI_BASE, DB_PATH, EXPORT_DIR, RAW_DIR
from tide_snapshot.fetcher import FetchError, HttpFetcher
from tide_snapshot.parser import (
StationSnapshot,
build_station_snapshot,
parse_product_names,
parse_station_list,
utc_now_iso,
)
from tide_snapshot.storage import (
connect_db,
export_csv,
export_json,
init_db,
load_run_rows,
save_snapshots,
)
def make_run_id() -> str:
return datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%SZ")
def get_source_updated_at(headers: dict[str, str]) -> str | None:
"""
Prefer Last-Modified. If not available, keep Date header.
If neither exists, return None.
"""
return headers.get("Last-Modified") or headers.get("Date")
def station_products_url(station: dict[str, Any]) -> str | None:
products = station.get("products")
if isinstance(products, dict):
value = products.get("self")
if isinstance(value, str) and value.startswith("http"):
return value
station_id = station.get("id")
if station_id:
return f"{COOPS_MDAPI_BASE}/stations/{station_id}/products.json"
return None
def fetch_product_names(
fetcher: HttpFetcher,
station: dict[str, Any],
raw_dir: Path,
run_id: str,
) -> list[str]:
url = station_products_url(station)
station_id = str(station.get("id") or "unknown")
if not url:
return []
try:
result = fetcher.get_json(url)
except FetchError as exc:
print(f"[WARN] product request failed: station_id={station_id}; {exc}")
return []
raw_path = raw_dir / run_id / f"products_{station_id}.json"
raw_path.parent.mkdir(parents=True, exist_ok=True)
raw_path.write_text(
json.dumps(result.json_data, ensure_ascii=False, indent=2),
encoding="utf-8",
)
return parse_product_names(result.json_data)
def collect_station_snapshots(
station_type: str,
limit: int | None,
with_products: bool,
sleep_seconds: float,
) -> tuple[str, list[StationSnapshot]]:
run_id = make_run_id()
snapshot_at = utc_now_iso()
fetcher = HttpFetcher(sleep_seconds=sleep_seconds)
list_url = f"{COOPS_MDAPI_BASE}/stations.json"
params = {
"type": station_type,
"units": "metric",
}
print(f"[INFO] run_id={run_id}")
print(f"[INFO] requesting station list: type={station_type}")
list_result = fetcher.get_json(list_url, params=params)
source_updated_at = get_source_updated_at(list_result.headers)
raw_run_dir = RAW_DIR / run_id
raw_run_dir.mkdir(parents=True, exist_ok=True)
(raw_run_dir / f"stations_{station_type}.json").write_text(
json.dumps(list_result.json_data, ensure_ascii=False, indent=2),
encoding="utf-8",
)
stations = parse_station_list(list_result.json_data)
if limit is not None and limit > 0:
stations = stations[:limit]
print(f"[INFO] station records found: {len(stations)}")
print(f"[INFO] with_products={with_products}")
snapshots: list[StationSnapshot] = []
for idx, station in enumerate(stations, start=1):
station_id = station.get("id")
station_name = station.get("name")
print(f"[INFO] parsing {idx}/{len(stations)} station={station_id} {station_name}")
product_names: list[str] | None = None
if with_products:
product_names = fetch_product_names(
fetcher=fetcher,
station=station,
raw_dir=RAW_DIR,
run_id=run_id,
)
try:
snapshot = build_station_snapshot(
station=station,
source_url=str(station.get("self") or list_result.url),
station_type=station_type,
product_names=product_names,
source_updated_at=source_updated_at,
snapshot_at=snapshot_at,
)
except ValueError as exc:
print(f"[WARN] skip bad station record: {exc}")
continue
snapshots.append(snapshot)
return run_id, snapshots
def main() -> None:
parser = argparse.ArgumentParser(
description="Archive public tide/water-level station metadata snapshots."
)
parser.add_argument(
"--station-type",
default="waterlevels",
choices=[
"waterlevels",
"historicwl",
"tidepredictions",
"met",
"waterlevelsandmet",
"currents",
"currentpredictions",
],
help="Station collection type to request.",
)
parser.add_argument(
"--limit",
type=int,
default=0,
help="Limit number of stations for testing. 0 means no limit.",
)
parser.add_argument(
"--with-products",
action="store_true",
help="Fetch product list for every station. Slower but more detailed.",
)
parser.add_argument(
"--sleep",
type=float,
default=0.8,
help="Sleep seconds between successive HTTP calls.",
)
parser.add_argument(
"--skip-unchanged",
action="store_true",
help="Skip inserting station rows whose metadata hash is unchanged.",
)
args = parser.parse_args()
run_id, snapshots = collect_station_snapshots(
station_type=args.station_type,
limit=args.limit if args.limit > 0 else None,
with_products=args.with_products,
sleep_seconds=args.sleep,
)
conn = connect_db(DB_PATH)
init_db(conn)
inserted, skipped = save_snapshots(
conn=conn,
run_id=run_id,
snapshots=snapshots,
skip_unchanged=args.skip_unchanged,
)
rows = load_run_rows(conn, run_id)
csv_path = EXPORT_DIR / f"tide_station_snapshot_{run_id}.csv"
json_path = EXPORT_DIR / f"tide_station_snapshot_{run_id}.json"
export_csv(rows, csv_path)
export_json(rows, json_path)
print("[DONE]")
print(f"[DONE] run_id={run_id}")
print(f"[DONE] total_snapshots={len(snapshots)}")
print(f"[DONE] inserted={inserted}, skipped={skipped}")
print(f"[DONE] sqlite={DB_PATH}")
print(f"[DONE] csv={csv_path}")
print(f"[DONE] json={json_path}")
if __name__ == "__main__":
main()
9.2 启动命令
在项目根目录运行:
PYTHONPATH=src python -m tide_snapshot.runner
测试时建议限制数量:
PYTHONPATH=src python -m tide_snapshot.runner --limit 10
如果要拉取每个站点的产品详情:
PYTHONPATH=src python -m tide_snapshot.runner --limit 10 --with-products --sleep 1.0
正式跑全量水位站点:
PYTHONPATH=src python -m tide_snapshot.runner --station-type waterlevels --with-products --sleep 1.0
如果只想保存变化记录:
PYTHONPATH=src python -m tide_snapshot.runner --station-type waterlevels --with-products --skip-unchanged --sleep 1.0
9.3 输出位置
运行后会生成:
data/
├── raw/
│ └── 20260610T120000Z/
│ ├── stations_waterlevels.json
│ ├── products_9414290.json
│ └── ...
├── exports/
│ ├── tide_station_snapshot_20260610T120000Z.csv
│ └── tide_station_snapshot_20260610T120000Z.json
└── tide_stations.sqlite3
其中:
data/raw/保存原始响应,方便复查;data/exports/保存面向分析的 CSV/JSON;data/tide_stations.sqlite3保存长期快照。
9.4 示例结果
下面是示例输出格式。实际结果以你运行时接口返回为准。
| station_id | station_name | sea_area | latitude | longitude | observed_items | snapshot_at |
|---|---|---|---|---|---|---|
| 1611400 | Nawiliwili | Central Pacific / Hawaii | 21.954506 | -159.3561 | water_level;tide_related | 2026-06-10T12:00:00+00:00 |
| 9414290 | San Francisco | US Pacific Coast | 37.806305 | -122.46589 | water_level;tide_related;forecast | 2026-06-10T12:00:00+00:00 |
| 9447130 | Seattle | US Pacific Coast | null | null | water_level | 2026-06-10T12:00:00+00:00 |
| 8724580 | Key West | US Atlantic / Gulf Coast | null | null | water_level | 2026-06-10T12:00:00+00:00 |
如果你运行时开启 --with-products,observed_items 会更丰富,因为它会从产品接口里提取站点支持的产品名。
9.5 查看 SQLite 数据
进入 SQLite:
sqlite3 data/tide_stations.sqlite3
查看某次快照:
SELECT
station_id,
station_name,
sea_area,
latitude,
longitude,
observed_items_json,
snapshot_at
FROM station_snapshot
ORDER BY station_id
LIMIT 5;
查看某站点历史变化:
SELECT
run_id,
station_id,
station_name,
sea_area,
raw_hash,
snapshot_at
FROM station_snapshot
WHERE station_id = '9414290'
ORDER BY snapshot_at DESC;
查看新增或变化情况,可以在应用层对比不同 run_id 的 hash,也可以用 SQL 做:
SELECT
a.station_id,
a.station_name,
a.raw_hash AS current_hash,
b.raw_hash AS previous_hash
FROM station_snapshot a
LEFT JOIN station_snapshot b
ON a.station_id = b.station_id
AND b.run_id = '上一批次run_id'
WHERE a.run_id = '当前批次run_id'
AND (b.raw_hash IS NULL OR a.raw_hash <> b.raw_hash);
🔟 常见问题与排错(强烈建议写)
10.1 遇到 403 怎么办?
403 一般表示拒绝访问。常见原因包括:
- 请求路径不适合自动访问;
- headers 太异常;
- 访问频率异常;
- 目标服务需要授权;
- IP 或网络环境被限制。
处理思路:
- 先确认是否访问公开 API;
- 确认 URL 是否写错;
- 设置清晰、真实的
User-Agent; - 降低访问频率;
- 不要尝试绕过登录或付费限制;
- 如果是公开服务且仍然异常,查看官方文档或联系服务方。
不要把 403 简单理解成“需要代理”。代理不是合规的万能钥匙。
10.2 遇到 429 怎么办?
429 表示请求过多。
本文已经做了两件事:
- 对 429 做有限重试;
- 请求之间加入 sleep。
如果仍然出现 429,可以继续降低速度:
PYTHONPATH=src python -m tide_snapshot.runner --with-products --sleep 2.0
或者先不要抓产品详情:
PYTHONPATH=src python -m tide_snapshot.runner --station-type waterlevels
产品详情是一站一请求,数量多时会明显增加请求总数。很多元数据快照任务并不需要每次都拉产品详情,可以每周拉一次产品详情,每天只拉列表。
10.3 HTML 抓到空壳怎么办?
如果你采集的是网页,而不是本文这种 API,有时会发现 requests.get() 返回的 HTML 里面只有一堆 JS,没有目标数据。这通常说明页面是动态渲染的。
排查方法:
- 打开浏览器开发者工具;
- 切换到 Network 面板;
- 刷新页面;
- 查找 XHR/Fetch 请求;
- 看是否有 JSON 接口;
- 优先采集接口,而不是用浏览器自动化硬等 DOM。
如果确实没有接口,才考虑 Playwright。
但是要记住,Playwright 不是免死金牌。它更像真实浏览器,也更重。如果只是公开 JSON,没必要上它。
10.4 解析报错怎么办?
解析报错通常有几种原因:
- 字段名变了;
- JSON 结构变了;
- 某个字段从对象变成数组;
- 某个字段临时为空;
- 接口返回错误信息,但程序当成正常数据解析。
处理方法:
第一,保存 raw 响应。本文已经把原始 JSON 保存到 data/raw/{run_id}/,排错时直接打开看。
第二,解析函数不要写得太硬。比如 parse_station_list() 同时兼容 stations 和 stationList。
第三,关键字段缺失要有日志。比如站点 ID 缺失时跳过并打印 warn。
第四,给解析层写测试。哪怕只写几个样例,也比完全没有强。
10.5 编码或乱码怎么办?
JSON API 一般乱码较少。如果导出 CSV 后用 Excel 打开乱码,可以注意两点:
- CSV 使用
utf-8-sig; - JSON 使用
ensure_ascii=False。
本文 export_csv() 已经使用:
encoding="utf-8-sig"
这对 Excel 用户比较友好。
10.6 为什么坐标会是 null?
如果源接口没有返回 lat 或 lng,本文会保存为 NULL。这比填 0 更安全。
0,0 是真实地理位置,在几内亚湾附近。把缺失坐标填成 0,后面做地图展示时会误导分析。
10.7 为什么海域是推断出来的?
因为源站元数据里通常提供的是州、地区、是否五大湖等信息,不一定直接提供“海域”字段。
本文用 state 和 greatlakes 推断:
greatlakes=true→ Great Lakes;CA/OR/WA→ US Pacific Coast;ME/MA/NY/...→ US Atlantic Coast;TX/LA/MS/...→ US Gulf Coast;PR/VI→ Caribbean。
如果你有更严谨的海域边界,可以引入 GIS 多边形,用点落区算法判断。本文先用轻量映射,便于复现。
1️⃣1️⃣ 进阶优化(可选但加分)
11.1 并发优化
本文默认串行请求。对元数据快照来说,这已经够用。
如果你确实要优化,可以考虑:
ThreadPoolExecutor;asyncio+httpx.AsyncClient;- Scrapy。
但我建议先加一个全局限速器,别让并发把请求瞬间打满。
一个简单的线程池版本思路如下:
from concurrent.futures import ThreadPoolExecutor, as_completed
def fetch_many_products(stations, max_workers=3):
results = {}
with ThreadPoolExecutor(max_workers=max_workers) as executor:
future_map = {
executor.submit(fetch_product_for_station, station): station
for station in stations
}
for future in as_completed(future_map):
station = future_map[future]
station_id = station.get("id")
try:
results[station_id] = future.result()
except Exception as exc:
print(f"[WARN] station={station_id} failed: {exc}")
return results
注意,这只是思路。真实使用时仍然要做限速,不要因为线程池方便就开几十上百个 worker。
11.2 断点续跑
如果全量采集 products,任务可能跑到一半失败。可以通过 raw 文件和数据库实现断点续跑。
简单策略:
- 每抓一个 products 响应,就写入
data/raw/{run_id}/products_{station_id}.json; - 下次运行时,如果文件存在,就直接读取本地 raw;
- 如果数据库里已有相同 run_id + station_id,就跳过;
- 最后补齐缺失站点。
伪代码:
def load_product_from_cache(raw_path):
if raw_path.exists():
return json.loads(raw_path.read_text(encoding="utf-8"))
return None
再进一步,可以专门建一个 fetch_log 表:
CREATE TABLE fetch_log (
run_id TEXT NOT NULL,
station_id TEXT NOT NULL,
url TEXT NOT NULL,
status TEXT NOT NULL,
error_message TEXT,
fetched_at TEXT NOT NULL,
PRIMARY KEY (run_id, station_id, url)
);
这样失败站点可以单独重跑。
11.3 日志与监控
当任务从“本地脚本”变成“定时生产任务”时,日志就很重要。
至少记录:
- 本次 run_id;
- 请求站点类型;
- 站点总数;
- 成功数;
- 失败数;
- 跳过数;
- 输出路径;
- 总耗时。
可以把 print 换成 logging:
import logging
logging.basicConfig(
level=logging.INFO,
format="%(asctime)s %(levelname)s %(message)s",
)
logger = logging.getLogger(__name__)
logger.info("start station snapshot")
logger.warning("product request failed")
logger.error("job failed", exc_info=True)
我个人很不喜欢没有日志的爬虫。它跑成功时看起来挺干净,一失败就像黑盒。
11.4 定时任务
Linux/macOS 可以用 cron:
crontab -e
每天凌晨 2 点执行:
0 2 * * * cd /path/to/tide_station_snapshot && /path/to/.venv/bin/python -m tide_snapshot.runner --station-type waterlevels --with-products --sleep 1.0 >> logs/tide_snapshot.log 2>&1
注意提前创建 logs 目录:
mkdir -p logs
如果团队里已经有 Airflow,可以把它做成 DAG:
from airflow import DAG
from airflow.operators.bash import BashOperator
from datetime import datetime
with DAG(
dag_id="tide_station_snapshot",
start_date=datetime(2026, 1, 1),
schedule="@daily",
catchup=False,
) as dag:
run_snapshot = BashOperator(
task_id="run_snapshot",
bash_command=(
"cd /path/to/tide_station_snapshot && "
"/path/to/.venv/bin/python -m tide_snapshot.runner "
"--station-type waterlevels --with-products --sleep 1.0"
),
)
11.5 数据质量检查
站点元数据也要做质量检查。建议加几个规则:
station_id不为空;station_name不为空;latitude在 -90 到 90;longitude在 -180 到 180;observed_items不为空;- 同一批次站点 ID 不重复;
- 本次站点数量不要比上次骤降太多。
示例检查函数:
def validate_snapshot(snapshot):
errors = []
if not snapshot.station_id:
errors.append("missing station_id")
if not snapshot.station_name:
errors.append("missing station_name")
if snapshot.latitude is not None:
if not (-90 <= snapshot.latitude <= 90):
errors.append(f"invalid latitude: {snapshot.latitude}")
if snapshot.longitude is not None:
if not (-180 <= snapshot.longitude <= 180):
errors.append(f"invalid longitude: {snapshot.longitude}")
if not snapshot.observed_items:
errors.append("empty observed_items")
return errors
数据质量检查不一定要阻断任务,但至少要记录。尤其是站点数量突然少很多时,要警惕是不是接口参数变了、网络返回错误页了,或者解析逻辑坏了。
11.6 地理增强
当前 sea_area 是通过州和区域推断的。更高级的做法是引入地理边界。
例如:
- 准备太平洋、美国东海岸、墨西哥湾、加勒比海、五大湖的 GeoJSON;
- 用
shapely判断站点坐标落在哪个 polygon; - 将结果写入
sea_area。
依赖:
pip install shapely
伪代码:
from shapely.geometry import Point, shape
import json
def load_polygons(path):
data = json.loads(path.read_text(encoding="utf-8"))
polygons = []
for feature in data["features"]:
polygons.append(
(
feature["properties"]["name"],
shape(feature["geometry"]),
)
)
return polygons
def infer_area_by_polygon(lat, lng, polygons):
if lat is None or lng is None:
return "Unknown"
point = Point(lng, lat)
for name, polygon in polygons:
if polygon.contains(point):
return name
return "Unknown"
这一步不是本文必需,但如果你要做严肃地图产品,建议上 GIS 判定,而不是只靠 state 映射。
11.7 从站点索引扩展到观测值采集
站点索引归档完成后,下一步通常是采集观测值。例如水位、潮汐预报、气象数据等。
不过这类数据和元数据不同:
- 时间范围更大;
- 数据量更大;
- 接口限制更多;
- 需要处理时间分片;
- 需要处理单位、时区、基准面;
- 需要更严格的断点续跑。
所以我建议把“站点索引”和“观测值采集”拆成两个项目或两个模块。不要在一开始就混在一起。
一个合理流程是:
站点索引快照
↓
筛选支持 water_level 的站点
↓
按站点与时间范围分片
↓
采集观测值
↓
写入时序表
↓
做质量检查和可视化
1️⃣2️⃣ 总结与延伸阅读
本文完成了一个完整的潮位站公开站点索引快照项目。
我们做了这些事情:
- 选择公开 Metadata API 作为数据源;
- 说明了合规边界和 robots.txt 基本原则;
- 用
requests.Session实现请求层; - 加入 headers、timeout、retry、sleep;
- 用解析层统一处理站点列表和产品列表;
- 将站点名、海域、坐标、观测项目、更新时间映射成内部结构;
- 用 SQLite 保存快照;
- 用 CSV/JSON 导出结果;
- 用
raw_hash做变更判断; - 给出运行命令、示例结果和排错方法;
- 讨论了并发、断点续跑、日志、定时任务、GIS 增强等进阶方向。
我比较喜欢这种小而完整的项目。它没有炫技,也没有为了“像爬虫”而硬上复杂框架。它解决的是一个真实问题:把公开站点元数据稳定地采集下来,并且让每一次采集都有迹可循。
下一步可以继续扩展:
- 用 Scrapy 改造调度层,支持多个数据源;
- 用 Playwright 处理少数没有公开 API 的动态站点;
- 引入 GeoJSON 和 Shapely 做更严谨的海域分类;
- 增加观测值采集模块,形成站点索引 + 时间序列数据体系;
- 把 SQLite 替换成 PostgreSQL/PostGIS,支持空间查询;
- 接入 Airflow 或 Prefect,实现生产级定时采集;
- 增加数据质量报告,监控站点数量、字段缺失率和变化率。
最后说一句很实在的话:爬虫写到后面,真正拉开差距的不是“会不会请求网页”,而是能不能把请求、解析、清洗、存储、日志、合规、复跑都处理好。潮位站点索引这种任务,正好适合练这套基本功。稳一点,慢一点,留下原始数据和快照记录,后面你会感谢现在这个谨慎的自己。
🌟 文末
好啦~以上就是本期的全部内容啦!如果你在实践过程中遇到任何疑问,欢迎在评论区留言交流,我看到都会尽量回复~咱们下期见!
小伙伴们在批阅的过程中,如果觉得文章不错,欢迎点赞、收藏、关注哦~
三连就是对我写作道路上最好的鼓励与支持! ❤️🔥
✅ 专栏持续更新中|建议收藏 + 订阅
墙裂推荐订阅专栏 👉 《Python爬虫实战》,本专栏秉承着以“入门 → 进阶 → 工程化 → 项目落地”的路线持续更新,争取让每一期内容都做到:
✅ 讲得清楚(原理)|✅ 跑得起来(代码)|✅ 用得上(场景)|✅ 扛得住(工程化)
📣 想系统提升的小伙伴:强烈建议先订阅专栏 《Python爬虫实战》,再按目录大纲顺序学习,效率十倍上升~
✅ 互动征集
想让我把【某站点/某反爬/某验证码/某分布式方案】等写成某期实战?
评论区留言告诉我你的需求,我会优先安排实现(更新)哒~
⭐️ 若喜欢我,就请关注我叭~(更新不迷路)
⭐️ 若对你有用,就请点赞支持一下叭~(给我一点点动力)
⭐️ 若有疑问,就请评论留言告诉我叭~(我会补坑 & 更新迭代)
✅ 免责声明
本文爬虫思路、相关技术和代码仅用于学习参考,对阅读本文后的进行爬虫行为的用户本作者不承担任何法律责任。
使用或者参考本项目即表示您已阅读并同意以下条款:
- 合法使用: 不得将本项目用于任何违法、违规或侵犯他人权益的行为,包括但不限于网络攻击、诈骗、绕过身份验证、未经授权的数据抓取等。
- 风险自负: 任何因使用本项目而产生的法律责任、技术风险或经济损失,由使用者自行承担,项目作者不承担任何形式的责任。
- 禁止滥用: 不得将本项目用于违法牟利、黑产活动或其他不当商业用途。
- 使用或者参考本项目即视为同意上述条款,即 “谁使用,谁负责” 。如不同意,请立即停止使用并删除本项目。!!!
更多推荐
所有评论(0)