From Zero to Monitoring: A Hands-On Guide to Adding a Monitoring Dashboard to Your Crawler or Script with Python prometheus-client

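It is 3 a.m. and the server suddenly raises an alert: your crawler script has failed 12 times in a row, and nobody noticed until the database sync broke. Most developers have lived through some version of this. When a script runs unattended, relying on logs alone is like looking for signposts in a maze, whereas the metrics-based monitoring that Prometheus provides gives you a real-time navigation system.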

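This article takes a practical approach to building a complete monitoring setup for a Python script. Rather than walking through API calls one by one, we start from monitoring design principles and retrofit a real-world crawler project, showing how prometheus-client supports the whole chain from data collection to alerting.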

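1. Monitoring Design: What Is Worth Monitoring?

Before writing any code, decide what to measure. The "golden signals" principle is a good starting point. For a background script, all four dimensions matter:

  • Throughput: tasks processed per unit of time (for example, pages crawled per minute)
  • Error rate: the share of requests that fail (for example, responses with HTTP 404 or 500)
  • Latency: the distribution of task processing time, especially long-tail requests
  • Saturation: how much work is backing up (for example, the number of URLs waiting to be processed)

For a news crawler, the key metrics could be designed as follows: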

from prometheus_client import Counter, Gauge, Histogram

# Throughput
pages_crawled = Counter('news_pages_crawled', 'Total number of pages crawled')
rss_updates = Counter('news_rss_updates', 'Number of RSS feed updates')

# Errors
http_errors = Counter('news_http_errors', 'Number of HTTP errors', ['status_code'])
parse_failures = Counter('news_parse_failures', 'Number of parse failures')

# Latency
page_load_time = Histogram('news_page_load_seconds', 'Page load time distribution in seconds',
                           buckets=(0.1, 0.5, 1, 2, 5, 10))

# Saturation
pending_urls = Gauge('news_pending_urls', 'Length of the pending URL queue')
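To make the four metric types more concrete, here is a minimal sketch of how each one is updated and what ends up in the exposition text that Prometheus scrapes. It assumes a private CollectorRegistry (not part of the design above) so that it can run in isolation, and the recorded values are made up for illustration.

```python
from prometheus_client import (
    CollectorRegistry, Counter, Gauge, Histogram, generate_latest,
)

# A private registry keeps this sketch separate from the global default one.
registry = CollectorRegistry()

pages_crawled = Counter('news_pages_crawled', 'Total number of pages crawled',
                        registry=registry)
http_errors = Counter('news_http_errors', 'Number of HTTP errors',
                      ['status_code'], registry=registry)
page_load_time = Histogram('news_page_load_seconds', 'Page load time in seconds',
                           buckets=(0.1, 0.5, 1, 2, 5, 10), registry=registry)
pending_urls = Gauge('news_pending_urls', 'Length of the pending URL queue',
                     registry=registry)

# Simulated activity (illustrative values only).
pages_crawled.inc()                          # counters only ever go up
http_errors.labels(status_code='404').inc()  # one child series per label value
page_load_time.observe(0.42)                 # counted in every bucket with le >= 0.42
pending_urls.set(17)                         # gauges can be set, raised, or lowered

# The same text format that the /metrics endpoint would serve.
exposition = generate_latest(registry).decode('utf-8')
print(exposition)
```

Note that the client library appends the `_total` suffix to counters on exposition, which is why the PromQL queries later in this article refer to names ending in `_total`.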

2. Instrumentation in Practice: Retrofitting an Existing Crawler

Suppose we have a basic crawler script with the following structure:

# crawler_v1.py
import requests
from bs4 import BeautifulSoup

def crawl_page(url):
    try:
        response = requests.get(url, timeout=10)
        soup = BeautifulSoup(response.text, 'html.parser')
        # Parsing logic...
        return True
    except Exception as e:
        print(f"Crawl failed: {e}")
        return False

urls = ["https://example.com/news/1", "https://example.com/news/2"]
for url in urls:
    crawl_page(url)

The instrumented version looks like this:

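# crawler_v2.py
import functools
import time

import requests
from prometheus_client import Counter, Gauge, Histogram, start_http_server

# Initialize metrics once, at module level
metrics = {
    'total_requests': Counter('crawler_requests_total', 'Total number of requests'),
    'failed_requests': Counter('crawler_requests_failed', 'Number of failed requests'),
    'duration': Histogram('crawler_request_duration_seconds', 'Request duration in seconds',
                          buckets=(0.1, 0.5, 1, 2, 5)),
    'active_tasks': Gauge('crawler_active_tasks', 'Number of tasks in progress')
}

def monitor_wrapper(func):
    @functools.wraps(func)
    def wrapped(*args, **kwargs):
        metrics['active_tasks'].inc()
        # Count every attempt, so that failed / total is a true failure ratio
        metrics['total_requests'].inc()
        start_time = time.time()

        try:
            return func(*args, **kwargs)
        except Exception:
            metrics['failed_requests'].inc()
            raise
        finally:
            metrics['active_tasks'].dec()
            metrics['duration'].observe(time.time() - start_time)
    return wrapped

@monitor_wrapper
def crawl_page(url):
    # Let exceptions propagate; if they are swallowed here, the wrapper never sees a failure
    response = requests.get(url, timeout=10)
    response.raise_for_status()  # turn HTTP 4xx/5xx responses into exceptions
    # Original parsing logic...
    return True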

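Key changes:

  1. A decorator adds instrumentation without touching the crawling logic itself
  2. try/finally ensures the gauge and the histogram are updated even when an exception is raised
  3. Total requests, failed requests, and the duration distribution are tracked separately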
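The behavior of such a decorator is easy to check without any network access. The following is a minimal sketch under a few assumptions: `fake_crawl` is a hypothetical stand-in for the real crawler function, the metrics live in a private registry, and every attempt is counted before the call so that the failed/total ratio is meaningful.

```python
import functools
import time

from prometheus_client import CollectorRegistry, Counter, Histogram

registry = CollectorRegistry()  # private registry, only to keep the sketch isolated
requests_total = Counter('crawler_requests', 'Total number of requests',
                         registry=registry)
requests_failed = Counter('crawler_requests_failed', 'Number of failed requests',
                          registry=registry)
duration = Histogram('crawler_request_duration_seconds', 'Request duration in seconds',
                     buckets=(0.1, 0.5, 1, 2, 5), registry=registry)

def monitored(func):
    @functools.wraps(func)
    def wrapped(*args, **kwargs):
        requests_total.inc()              # count every attempt
        start = time.perf_counter()
        try:
            return func(*args, **kwargs)
        except Exception:
            requests_failed.inc()         # count the failure, then let it propagate
            raise
        finally:
            duration.observe(time.perf_counter() - start)
    return wrapped

@monitored
def fake_crawl(url):
    # Hypothetical stand-in for a real crawl: URLs containing "bad" fail.
    if 'bad' in url:
        raise ValueError(f'cannot parse {url}')
    return True

for url in ('https://example.com/news/1',
            'https://example.com/bad',
            'https://example.com/news/2'):
    try:
        fake_crawl(url)
    except ValueError:
        pass  # the caller decides how to handle the failure

total = registry.get_sample_value('crawler_requests_total')
failed = registry.get_sample_value('crawler_requests_failed_total')
print(f'total={total}, failed={failed}')
```

The important detail is that the wrapped function must raise on failure. A function that catches its own exceptions and returns False, like the original crawl_page, would never register as failed.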

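3. Advanced Technique: Multi-Dimensional Labels

When you need to tell apart the metrics of different kinds of tasks, Prometheus labels come into play. For example, to monitor crawling per news site: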

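import functools
import time

import requests
from prometheus_client import Counter, Histogram

site_metrics = {
    'latency': Histogram('news_site_latency', 'Per-site latency in seconds',
                         ['site'],  # site label
                         buckets=(0.1, 0.5, 1, 2, 5)),
    'errors': Counter('news_site_errors', 'Per-site errors',
                      ['site', 'error_type'])  # multiple label dimensions
}

def track_site_metrics(site):
    def decorator(func):
        @functools.wraps(func)
        def wrapped(*args, **kwargs):
            start = time.time()
            try:
                return func(*args, **kwargs)
            except requests.HTTPError as e:
                # Only raised if the wrapped function calls response.raise_for_status()
                status = e.response.status_code if e.response is not None else 'unknown'
                site_metrics['errors'].labels(
                    site=site,
                    error_type=f'http_{status}'
                ).inc()
                raise  # re-raise so the caller still sees the failure
            except Exception as e:
                site_metrics['errors'].labels(
                    site=site,
                    error_type=type(e).__name__
                ).inc()
                raise
            finally:
                site_metrics['latency'].labels(site=site).observe(
                    time.time() - start
                )
        return wrapped
    return decorator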

Usage example:

@track_site_metrics(site="tech_news")
def crawl_tech_news(url):
    # Site-specific crawling logic
    ...
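It helps to see that every distinct combination of label values becomes its own time series. The sketch below assumes a hypothetical helper, `record_error`, and a private registry. It records a few errors for two sites and then lists the resulting series.

```python
from prometheus_client import CollectorRegistry, Counter

registry = CollectorRegistry()  # private registry, only to keep the sketch isolated
site_errors = Counter('news_site_errors', 'Per-site errors',
                      ['site', 'error_type'], registry=registry)

def record_error(site, exc):
    """Hypothetical helper: count one error, labeled by site and exception class."""
    site_errors.labels(site=site, error_type=type(exc).__name__).inc()

record_error('tech_news', TimeoutError('slow response'))
record_error('tech_news', TimeoutError('slow response again'))
record_error('finance_news', ValueError('unexpected HTML'))

# Each (site, error_type) pair is exposed as a separate sample.
for metric in registry.collect():
    for sample in metric.samples:
        if sample.name == 'news_site_errors_total':
            print(sample.labels, sample.value)
```

Because the number of series grows with the product of the label values, labels work best when each one takes only a small, known set of values.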

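4. Deployment and Visualization: The Complete Setup

Once the code is instrumented, build out the monitoring chain in the following steps:

  1. Expose a metrics endpoint

    if __name__ == '__main__':
        # Expose metrics on port 8000 (served from a daemon thread)
        start_http_server(8000)
        # Start the crawler; the endpoint disappears once the process exits
        start_crawler()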
    
  2. Prometheus configuration (prometheus.yml):

    scrape_configs:
      - job_name: 'news_crawler'
        static_configs:
          - targets: ['crawler-host:8000']
        scrape_interval: 15s
    
  3. Grafana dashboard configuration: create panels with the following PromQL queries:

    • Success rate: 1 - (sum(rate(crawler_requests_failed_total[5m])) / sum(rate(crawler_requests_total[5m])))
    • 95th-percentile duration: histogram_quantile(0.95, sum(rate(crawler_request_duration_seconds_bucket[5m])) by (le))
    • Error distribution by site: sum by (site) (rate(news_site_errors_total[5m]))

  4. Example alerting rule

    groups:
    - name: crawler-alerts
      rules:
      - alert: HighFailureRate
        expr: rate(crawler_requests_failed_total[5m]) / rate(crawler_requests_total[5m]) > 0.05
        for: 10m
        labels:
          severity: page
        annotations:
          summary: "Crawler failure rate is above 5%"
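The 95th-percentile query above relies on histogram_quantile, which does not see individual observations. It only sees cumulative bucket counts and interpolates linearly inside the bucket that contains the requested rank. The following is a simplified sketch of that estimation in plain Python; the function name `estimate_quantile` and the bucket counts are made up for illustration, and the real PromQL function handles more edge cases than this.

```python
import math

def estimate_quantile(q, buckets):
    """Estimate a quantile from cumulative histogram buckets.

    buckets: list of (upper_bound, cumulative_count) pairs, sorted by
    upper bound, with math.inf as the last upper bound.
    """
    if not 0 < q <= 1:
        raise ValueError('q must be in (0, 1]')
    total = buckets[-1][1]
    if total == 0:
        return math.nan

    rank = q * total
    # Find the first bucket whose cumulative count reaches the rank.
    for index, (upper, cumulative) in enumerate(buckets):
        if cumulative >= rank:
            break

    if math.isinf(upper):
        # The rank falls in the +Inf bucket: the best available answer is
        # the upper bound of the largest finite bucket.
        return buckets[-2][0]

    lower = buckets[index - 1][0] if index > 0 else 0.0
    below = buckets[index - 1][1] if index > 0 else 0.0
    # Linear interpolation inside the selected bucket.
    return lower + (upper - lower) * (rank - below) / (cumulative - below)

# Made-up cumulative counts for buckets (0.1, 0.5, 1, 2, 5, +Inf).
buckets = [(0.1, 10), (0.5, 60), (1.0, 90), (2.0, 98), (5.0, 100), (math.inf, 100)]
print(estimate_quantile(0.95, buckets))  # about 1.625: rank 95 lies in the (1, 2] bucket
```

This is also why bucket boundaries matter: the estimate can never be more precise than the width of the bucket it lands in, so boundaries should be placed around the durations you care about alerting on.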
    

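5. Performance Tuning and Pitfalls

In production, there are a few more things to watch out for:

Define each metric only once

# Wrong: creating the metric on every call
def log_request():
    Counter('request_count', 'Request count').inc()  # the second call raises ValueError (duplicated timeseries)

# Right: register the metric once at module level
REQUEST_COUNTER = Counter('request_count', 'Request count')
def log_request():
    REQUEST_COUNTER.inc()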

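Handling high-cardinality labels

# Dangerous: using the user ID as a label
Counter('api_calls', 'Number of calls', ['user_id'])  # one time series per user; memory can blow up

# Better: aggregate into a bounded set up front
Counter('api_calls_by_type', 'Number of calls', ['api_type'])  # small, fixed set of values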
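The same idea applies to HTTP status codes in a crawler: collapsing raw values into a few classes before they become label values keeps the number of series bounded. The sketch below is one possible way to do this; the `status_class` helper and the `crawler_responses` metric name are hypothetical and chosen for illustration.

```python
from prometheus_client import CollectorRegistry, Counter

registry = CollectorRegistry()  # private registry, only to keep the sketch isolated
responses = Counter('crawler_responses', 'Responses by status class',
                    ['status_class'], registry=registry)

def status_class(status_code):
    """Map an HTTP status code to one of a handful of label values."""
    if 100 <= status_code <= 599:
        return f'{status_code // 100}xx'
    return 'other'  # anything outside the standard range shares one bucket

for code in (200, 204, 404, 500, 503, 999):
    responses.labels(status_class=status_class(code)).inc()

# At most six series can ever exist: 1xx, 2xx, 3xx, 4xx, 5xx, and other.
for label in ('2xx', '4xx', '5xx', 'other'):
    print(label, registry.get_sample_value('crawler_responses_total',
                                           {'status_class': label}))
```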

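Multi-process support

# gunicorn.conf.py (assuming the worker processes are managed by Gunicorn)
from prometheus_client import multiprocess

def child_exit(server, worker):
    # Clean up the metric files of a worker that has exited
    multiprocess.mark_process_dead(worker.pid)

# PROMETHEUS_MULTIPROC_DIR must already be set when the process starts
# (for example in the shell or the service unit), because the client
# checks it when prometheus_client is first imported:
#   export PROMETHEUS_MULTIPROC_DIR=/tmp/metrics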
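In multi-process mode, each process writes its metric values to files in the shared directory, and the scrape endpoint needs a registry that aggregates those files. The following is a minimal single-process sketch of that wiring. It assumes a temporary directory in place of a real shared one, and the metric name `worker_jobs_done` is made up. The environment variable is set before the import because the client decides at import time whether to use file-backed values.

```python
import os
import tempfile

# Must happen before prometheus_client is imported. A temporary directory is
# used here only to keep the sketch self-contained.
os.environ['PROMETHEUS_MULTIPROC_DIR'] = tempfile.mkdtemp(prefix='prom_multiproc_')

from prometheus_client import (
    CollectorRegistry, Counter, generate_latest, multiprocess,
)

# In a real deployment this would be incremented inside each worker process.
jobs_done = Counter('worker_jobs_done', 'Jobs completed across all worker processes')
jobs_done.inc()

def build_multiprocess_registry():
    """Create a registry that aggregates the per-process metric files."""
    registry = CollectorRegistry()
    multiprocess.MultiProcessCollector(registry)
    return registry

# This aggregated text is what the scrape endpoint should serve.
output = generate_latest(build_multiprocess_registry()).decode('utf-8')
print(output)
```

In practice the directory should be emptied between application restarts, otherwise values left over from earlier runs are included in the aggregate.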

6. Extending to Other Scenarios

The same monitoring pattern applies to all kinds of scripts:

Scheduled job monitoring

from apscheduler.schedulers.background import BackgroundScheduler

sched = BackgroundScheduler()

@sched.scheduled_job('interval', minutes=30)
def data_sync():
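    # Assumes metrics['sync_duration'] is a Histogram (or Summary) and metrics['sync_runs'] is a Counter
    with metrics['sync_duration'].time():
        metrics['sync_runs'].inc()
        # Sync logic...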

sched.start()

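Batch job monitoring

def process_batch(items):
    # Assumes three separate metrics: 'batches_in_progress' is a Gauge,
    # 'batch_size' is a Histogram, and 'items_processed' is a Counter
    with metrics['batches_in_progress'].track_inprogress():
        metrics['batch_size'].observe(len(items))
        # Processing logic...
        metrics['items_processed'].inc(len(items))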

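When deploying on Kubernetes, Pod annotations can also be used to let Prometheus discover scrape targets automatically. These annotations are a convention rather than a built-in feature, so they only take effect if the Prometheus scrape configuration (kubernetes_sd_configs plus matching relabeling rules) is set up to honor them: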

annotations:
  prometheus.io/scrape: "true"
  prometheus.io/port: "8000"
