Python 数据聚合脚本设计:配置驱动 + 三层分离架构(避免 N 个 if-else 噩梦)
这是「LLM Agent 工程实战」系列第 2 篇。
第 1 篇:让 LLM 严格按指令工作 — 3 个 Skill 设计原则(链接占位,发布后可补)
一、问题:脚本是怎么变成屎山的
刚开始做一个数据聚合脚本,需求很简单:
从几个 RSS 源拉数据,用关键词打分,存数据库,生成 Markdown 报告。
第一版代码长这样:
# fetch.py 第一版(约 50 行)
import feedparser, sqlite3
URLS = [
"https://example.com/rss-a.xml",
"https://example.com/rss-b.xml",
]
KEYWORDS = ["关键词1", "关键词2", "关键词3"]
def main():
conn = sqlite3.connect("data.db")
for url in URLS:
feed = feedparser.parse(url)
for entry in feed.entries:
score = sum(1 for k in KEYWORDS if k in entry.title)
if score > 0:
conn.execute("INSERT OR IGNORE INTO articles ...", ...)
conn.commit()
跑通了。然后就开始真实需求迭代:
- “加个新源” → 改 URLS 列表
- “权威源不需要关键词命中也入选” → 加 if-else
- “不同源用不同时间窗口” → 改函数签名
- “关键词分高/中/低三档权重” → KEYWORDS 改成字典
- “加几个完全不同维度的数据(不是 RSS)” → 加新函数
- “这条记录上次推送过了,这次别再推送” → 加去重表
两周后,fetch.py 长这样:
def main():
for url in URLS:
if "auth_source" in url:
score = 5 # 权威源直接入选
cutoff = datetime.now() - timedelta(hours=168)
elif "slow_source" in url:
score = compute_score(...)
cutoff = datetime.now() - timedelta(hours=72)
elif "fast_source" in url:
# ... 又一堆 if-else
# ...(继续无止境地堆叠)
典型的屎山。每加一个需求就插一段 if-else,每改一个源就要动好几处代码。三个月后没人想维护,包括你自己。
本文记录我用配置驱动 + 三层分离架构重构这套系统的过程,以及具体的设计决策。最终代码量没增加,但新增任何需求都不用改主代码了。
二、三层分离
把代码按"变化频率"分三层:
┌──────────────────────────────┐
│ 配置层 (sources.yaml) │ ← 经常变(每周)
│ 数据源、关键词、时间窗口 │
├──────────────────────────────┤
│ 数据层 (fetch.py + 子模块) │ ← 偶尔变(每月)
│ 抓取、打分、去重、整合 │
├──────────────────────────────┤
│ 调度层 (cron + 调度器) │ ← 几乎不变
│ 触发时间、消息渠道 │
└──────────────────────────────┘
变化越频繁的越要往上提。配置文件改起来零成本,代码改起来要重新测。
三、配置层设计
3.1 用 YAML 不是 JSON
YAML 支持注释、多行字符串、引用,对人类友好:
sources:
- name: SourceA
url: https://example.com/rss-a.xml
weight: 0
tier: category_x
bypass_keywords: false # 是否跳过关键词检查(权威源用)
- name: AuthoritySource # 权威源:任何更新都直接入选
url: https://example.com/rss-auth.xml
weight: 5
tier: category_y
bypass_keywords: true
hours_back: 168 # 7 天窗口(更新慢但价值高)
- name: SlowSource
url: https://example.com/rss-slow.xml
weight: 1
tier: category_x
hours_back: 96 # 4 天窗口
# 关键词词典(三档权重)
keywords:
high: # 命中加 3 分
- 关键词A
- 关键词B
medium: # 命中加 2 分
- 关键词C
company: # 命中加 1 分
- 关键词D
# 全局过滤参数
filter:
hours_back: 24 # 默认时间窗口
min_score: 3 # 入选最低分
max_per_source: 8 # 每源最多入选条数
3.2 配置项命名的两个原则
原则 1:每个字段独立可解释
# 差:一个字段塞多个语义
- url: https://example.com/rss?weight=5&hours=168
# 好:每个字段一个语义
- url: https://example.com/rss
weight: 5
hours_back: 168
原则 2:默认值放全局,特例放单源
filter:
hours_back: 24 # 默认所有源 24h
sources:
- name: SlowSource
hours_back: 96 # 仅这个源覆盖
读取时,先读全局默认 → 检查单源是否有覆盖。代码长这样:
def get_cutoff(src, global_filter):
hours = src.get("hours_back", global_filter["hours_back"])
return datetime.now(tz.utc) - timedelta(hours=hours)
3 行解决"不同源不同窗口",比写 10 个 elif 优雅得多。
四、数据层设计
4.1 打分函数:纯函数,没有副作用
def score_article(title: str, summary: str, kw_cfg: dict) -> int:
"""关键词命中打分。纯函数,相同输入永远相同输出。"""
text = (title + " " + summary).lower()
score = 0
for k in kw_cfg["high"]:
if k.lower() in text: score += 3
for k in kw_cfg["medium"]:
if k.lower() in text: score += 2
for k in kw_cfg["company"]:
if k.lower() in text: score += 1
return score
为什么坚持写纯函数:
- 可单元测试:直接传几个例子断言
- 可缓存:同一文章重复打分时间复杂度优化
- 可调试:传几个例子手工跑就能验证
- 可替换:将来换 ML 模型打分时,只换这一个函数
4.2 单源抓取函数:参数化时间窗口
def fetch_source(src: dict, default_cutoff: datetime, kw_cfg: dict, max_count: int):
"""抓取单个数据源。所有可变参数从外部传入。"""
# 单源覆盖时间窗(特例 > 默认值)
cutoff = default_cutoff
if src.get("hours_back"):
cutoff = datetime.now(tz.utc) - timedelta(hours=src["hours_back"])
feed = feedparser.parse(src["url"])
items = []
for entry in feed.entries:
pub = parse_published(entry)
if pub and pub < cutoff:
continue
# 权威源 bypass 关键词,普通源用关键词 + 来源权重综合打分
if src.get("bypass_keywords"):
score = src["weight"]
else:
score = score_article(entry.title, entry.summary, kw_cfg) \
+ src.get("weight", 0)
items.append({"id": ..., "title": entry.title, "score": score, ...})
items.sort(key=lambda x: -x["score"])
return items[:max_count]
设计要点:
- 所有可能变的参数都从外部传入(src / kw_cfg / max_count)
- 函数本身不读全局变量,纯函数式
- bypass / cutoff 这种业务规则在函数内(流程的一部分)
4.3 主流程
def main():
cfg = load_config("sources.yaml")
default_cutoff = datetime.now(tz.utc) - timedelta(hours=cfg["filter"]["hours_back"])
all_items = []
for src in cfg["sources"]:
items = fetch_source(
src,
default_cutoff,
cfg["keywords"],
cfg["filter"]["max_per_source"],
)
all_items.extend(items)
log.info(f"{src['name']}: {len(items)} 条入选")
save_to_db(all_items)
write_markdown_report(all_items)
主流程没有任何 if-else 判断业务规则。所有规则都在配置里或子函数里。这是关键。
五、模块化扩展:子进程 vs import
需求又来了:业务侧除了从 RSS 抓数据,还要拉几个不同维度的实时数据(比如行情数据、状态数据等),每个维度独立逻辑。
两种实现方式:
方案 A:import 方式(不推荐)
from module_a import fetch_a
from module_b import fetch_b
from module_c import fetch_c
def main():
data_a = fetch_a()
data_b = fetch_b()
data_c = fetch_c()
# 整合到一个报告
问题:
- 任何一个模块的依赖问题(比如 module_a 依赖某个会偶发失败的库)会拖垮整个主流程
- 模块间依赖容易冲突(A 要 numpy 1.x,B 要 numpy 2.x)
- 调试时改了 A 必须重启整个 fetch.py
方案 B:子进程方式(推荐)
PYBIN = "/home/user/.venv/bin/python"
data_modules = [
("module_a.py", "## 模块 A 标题"),
("module_b.py", "## 模块 B 标题"),
("module_c.py", "## 模块 C 标题"),
]
for script, title in data_modules:
try:
md = subprocess.check_output(
[PYBIN, str(ROOT / script), "--markdown"],
timeout=30,
text=True,
stderr=subprocess.DEVNULL,
)
lines.append(md)
except subprocess.TimeoutExpired:
log.warning(f"{script} 超时")
lines.append(f"\n{title}\n\n_数据获取超时_\n")
except Exception as e:
log.warning(f"{script} 失败: {e}")
lines.append(f"\n{title}\n\n_数据获取失败_\n")
每个 module_x.py 都有自己的 main() 和 --markdown 参数,独立可跑:
# 调试模块 A 时,独立跑
$ /home/user/.venv/bin/python module_a.py --markdown
## 模块 A 标题
- 数据 1
- 数据 2
...
优势:
- 依赖完全隔离:每个模块可以有自己的 venv 和依赖
- 故障隔离:单模块崩溃不影响主流程(main 抓 except)
- timeout 强制可控:subprocess 的 timeout 参数生效
- stderr 可隔离:每个模块的 INFO 日志不会污染主报告输出
- 调试友好:单模块独立运行,能直接看输出
代价:每次启动子进程约 200ms 开销。对于"每天定时跑一次"的场景,4 个模块 × 200ms = 800ms,完全可以接受。
如果你的场景是"每秒跑 100 次",就该用 import;如果是"每天跑 1 次",就该用子进程。
六、实际收益
重构前 vs 重构后对比:
| 需求变更 | 重构前 | 重构后 |
|---|---|---|
| 加一个数据源 | 改 4 处代码 | 加 5 行 yaml |
| 调时间窗口 | 改函数签名 | 改 1 行 yaml |
| 加新关键词 | 改 KEYWORDS 列表,改打分函数 | 加 1 行 yaml |
| 改入选门槛 | 满代码搜 magic number | 改 1 行 yaml |
| 加新数据维度 | 重构整个流程 | 写个 module_x.py + 1 行注册 |
| 临时禁用某个源 | 注释代码 | yaml 删一节或注释 |
核心收益:从"改代码改 N 处"变成"改配置文件改 1 处"。
七、什么时候不该用配置驱动
配置驱动也有适用边界。如果出现以下情况,别强行配置化:
- 配置项数量超过代码行数 → 你做的不是配置文件,是 DSL,应该用代码
- 配置项之间有复杂依赖关系(A 依赖 B 但 B 又会被 C 改写) → 复杂依赖比 if-else 更难维护
- 配置内容会被频繁条件判断 → 还是写到代码里
- 每次"调配置"都要重启服务 → 没解决任何问题,徒增复杂度
我观察过一些团队把所有东西都塞 yaml,结果 yaml 5000 行,新人看一周看不懂业务逻辑。这是过度配置化。
合理的边界:改频率 ≥ 每周一次的,往配置层提;改频率 ≤ 每月一次的,留在代码里。
八、关键 takeaway
三句话总结:
- 变化越频繁的越要往上提层 — 数据源每周变、架构每月变、调度几乎不变,分层设计
- 主流程不写业务规则 — 业务规则要么在配置里,要么在子函数里,主流程只串流程
- 子进程隔离换扩展性 — 800ms 启动开销换"故障隔离 + 依赖隔离",对低频任务太值得
九、下一篇预告
下一篇会写第三方库踩坑系列 — 在做这套数据采集时遇到的几个真实"反直觉"问题:
- feedparser 时间字段的解析陷阱
- httpx 默认行为带来的非预期网络配置
- yfinance 的偶发性失败处理
- SQLite 去重的两个常见反模式(包括 INSERT OR IGNORE 的妙用)
每一个都是文档里查不到、stackoverflow 上没现成答案的问题,记录下来给同样踩坑的人参考。
#Python #架构设计 #软件设计 #工程实践 #原创
转载请注明来源。
更多推荐
所有评论(0)