上个月老板说:"能不能搞个实时的排名看板?我要随时能看到我们关键词的波动。“我嘴上说"行”,心里想的是又要手动整理Excel了。后来花了两天搭了套自动化的东西,没想到运行一个月后,老板自己都不记得日报这回事了。

一、需求场景:从日报到实时看板

我们团队管着二十多个站点,主要关键词大概 800 个。之前的流程是这样的:

  1. 每天早上跑一次采集脚本
  2. 导出 CSV
  3. 手动用 Excel 透视一下
  4. 发日报邮件

问题很明显:日报是昨天的数据,你永远不知道今天发生了什么。有时候凌晨搜索引擎动了,排名掉了大半页,等到第二天才发现,损失已经造成了。

需求很明确:数据要实时刷新,看板要一目了然,最好还能做趋势对比。

二、整体架构

定时调度 (APScheduler)
    |
    ↓
采集服务 (Python + SerpBase API)
    |
    ↓
数据清洗 + 计算 (pandas)
    |
    ↓
时序数据库 (InfluxDB)
    |
    ↓
可视化 (Grafana)

选型理由:

  • SerpBase API:800 个关键词 × 每天采样 4 次 = 3200 次/天,用 $10 Starter 包够跑一个月。重点是 API 返回直接是结构化的 JSON,省掉了 HTML 解析的步骤。
  • InfluxDB:专门存时序数据,排名本身就是时序数据,天然适合。
  • Grafana:现成的看板方案,不用自己写前端。

三、核心代码实现

3.1 采集服务

import requests
import time
from datetime import datetime
from influxdb_client import InfluxDBClient, Point
from influxdb_client.client.write_api import SYNCHRONOUS
from apscheduler.schedulers.blocking import BlockingScheduler

class RankCollector:
    def __init__(self, serp_key: str, influx_config: dict):
        self.serp_key = serp_key
        self.serp_url = "https://api.serpbase.dev/google/search"
        self.influx = InfluxDBClient(
            url=influx_config["url"],
            token=influx_config["token"],
            org=influx_config["org"]
        )
        self.write_api = self.influx.write_api(write_type=SYNCHRONOUS)
        self.bucket = influx_config["bucket"]
    
    def search_keyword(self, keyword: str, gl: str = "us") -> dict:
        headers = {
            "X-API-Key": self.serp_key,
            "Content-Type": "application/json"
        }
        body = {"q": keyword, "hl": "en", "gl": gl, "page": 1}
        resp = requests.post(self.serp_url, headers=headers, json=body, timeout=30)
        return resp.json()
    
    def find_my_domain(self, results: list, domain: str) -> int:
        for item in results:
            if domain in item.get("link", ""):
                return item.get("rank", 999)
        return 999  # 没排上
    
    def collect(self, keywords: list, domain: str, gl: str):
        ts = datetime.utcnow()
        for kw in keywords:
            data = self.search_keyword(kw, gl)
            organic = data.get("organic", [])
            rank = self.find_my_domain(organic, domain)
            
            point = Point("seo_rank") \
                .tag("keyword", kw) \
                .tag("domain", domain) \
                .tag("country", gl) \
                .field("position", rank) \
                .time(ts)
            
            self.write_api.write(bucket=self.bucket, record=point)
            print(f"[{ts}] {kw} (gl={gl}) -> rank {rank}")
            time.sleep(0.8)  # 限流
    
    def run_all(self):
        sites = [
            {"domain": "example.com", "keywords": ["python tutorial", "django guide", ...], "gl": "us"},
            {"domain": "example.de", "keywords": ["python tutorial", "django guide", ...], "gl": "de"},
        ]
        for site in sites:
            self.collect(site["keywords"], site["domain"], site["gl"])

# 定时任务:每天6点、12点、18点、0点各跑一次
scheduler = BlockingScheduler()
collector = RankCollector(serp_key="YOUR_KEY", influx_config={...})

@scheduler.scheduled_job("cron", hour="6,12,18,0", minute="0")
def scheduled_collect():
    collector.run_all()

scheduler.start()

3.2 增量检查(发现问题即时告警)

光定时采集还不够,排名大幅波动时需要即时通知。

class RankAlert:
    def __init__(self, influx_config: dict):
        self.client = InfluxDBClient(**influx_config)
        self.query_api = self.client.query_api()
    
    def check_drops(self, threshold: int = 5, hours: int = 6):
        query = f'''
        from(bucket: "seo_bucket")
            |> range(start: -{hours}h)
            |> filter(fn: (r) => r._field == "position")
            |> aggregateWindow(every: 6h, fn: last)
            |> difference()
            |> filter(fn: (r) => r._value >= {threshold})
        '''
        result = self.query_api.query(query)
        alerts = []
        for table in result:
            for record in table.records:
                alerts.append({
                    "keyword": record["keyword"],
                    "domain": record["domain"],
                    "drop": record["_value"],
                    "time": record["_time"]
                })
        return alerts

四、Grafana 看板配置

InfluxDB 数据源连上 Grafana 后,我配了几个面板:

4.1 排名走势图

查询语句:

from(bucket: "seo_bucket")
  |> range(start: -v.timeRange)
  |> filter(fn: (r) => r._measurement == "seo_rank" and r.keyword == "python tutorial")
  |> yield(name: "position")

图表类型用 time series,Y 轴反转(排名1在顶部)。可以同时叠加多个关键词的趋势线对比。

4.2 当前排名快照

用 Stat 面板展示当前最新排名,按关键词分组。绿色(1-5)、黄色(6-15)、红色(15+),一目了然。

4.3 排名波动排行

用 Bar gauge 展示最近 24 小时波动最大的关键词,方便快速定位"出问题"的词。

4.4 告警规则

在 Grafana 里配了个简单规则:任何关键词排名下跌超过 10 位,往钉钉群里推一条告警。

五、运行效果和成本

5.1 一个月后的数据

指标 之前(手动日报) 之后(Grafana 看板)
数据延迟 T+1 实时(6h间隔)
发现问题时间 平均 12h 平均 2h
每周手工工作量 3-4 小时 几乎为 0
老板满意度 一般 “这个好”

5.2 成本

SerpBase 费用:
  800 词 × 4 次/天 × 30 天 = 96,000 次搜索/月
  用 Growth 包 $50(12.5万次),单价 $0.40/千次
  实际花费: $50/月

Infrastructure:
  1 台轻量云服务器: ¥99/月 (~$14)
  InfluxDB + Grafana: 免费(开源)
  
总计: ~$64/月

比起之前手动搞日报的人力成本(假设时薪 $20,每周 3 小时 = $240/月),这个方案的成本几乎可以忽略。

六、踩坑记录

坑1:采样频率太高被封

一开始我设的是每 2 小时跑一次,跑了三天,SerpBase 那边开始返回 429。原因是频率太高加上每个关键词间隔不够。后来改成一小时限流 + 每天 4 次采样,完全没问题。

坑2:排名数据忽高忽低

Google 的 SERP 本身就有波动,同一天搜同一个词,不同时间的排名可能差 3-5 位。这不是 API 的问题,是 Google 自己的波动。我的处理方式是取每日中位数而不是快照值。

# 在 InfluxDB 查询时做聚合
from(bucket: "seo_bucket")
  |> range(start: -1d)
  |> filter(fn: (r) => r._measurement == "seo_rank")
  |> median()

坑3:Grafana 时间线不对齐

InfluxDB 写入时如果用本地时间,夏令时切换时会有偏移。统一用 UTC 时间写入,Grafana 展示时再转成本地时区。

七、总结

这套东西做下来,最大的感受是:技术门槛真的不高

最难的部分其实是"想清楚要监控什么、怎么展示"。一旦需求定了,SerpBase 拿数据、InfluxDB 存数据、Grafana 画图,三个组件接起来就完事了。

如果你的团队还在用手工做 SEO 排名追踪,花两天时间搭这套自动化系统,性价比极高。


后续计划:把搜索量(volume)和点击率(CTR)预测也加进来,做个更完整的流量预测模型。不过那得等我先把 InfluxDB 的 continuous query 搞好。

更多推荐