Background:

Currently, Prometheus mainly works by scraping the metrics endpoints exposed by other components to collect their metric values, evaluating alerting rules (thresholds) against those metrics, and letting Alertmanager deliver any metric that crosses a threshold to the operations team (via WeChat, DingTalk, email, and so on).
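
For a component you control yourself, exposing such a metrics endpoint takes only a few lines with prometheus_client; the metric name, port, and value below are illustrative placeholders, not part of this article's service:

from prometheus_client import start_http_server, Gauge
import time

# Hypothetical gauge served at http://localhost:8000/metrics for Prometheus to scrape
queue_depth = Gauge('demo_queue_depth', 'Number of pending tasks in the demo service')

start_http_server(8000)      # start the /metrics HTTP endpoint
while True:
    queue_depth.set(42)      # replace with a real measurement of the service
    time.sleep(15)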

So if a company wants to monitor an in-house application with Prometheus, and that application does not expose a metrics endpoint of its own, it can use the Pushgateway instead: the application pushes its custom metric data to the gateway, and Prometheus periodically scrapes the gateway to pick the data up. The example in this article does exactly that, using a Python script to monitor our own service and push custom metrics to the gateway.

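Stripped of the business logic, the push pattern the script relies on is simply: build a local CollectorRegistry, set a Gauge inside it, and hand the registry to push_to_gateway. A minimal sketch (the gateway address and job name here are illustrative):

from prometheus_client import CollectorRegistry, Gauge, push_to_gateway

registry = CollectorRegistry()
g = Gauge('my_service_up', 'Whether my service is running (1 = up, 0 = down)', registry=registry)
g.set(1)
push_to_gateway('http://localhost:9091', job='my_service', registry=registry)

The complete script is below.
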
import time
import requests
import json
from datetime import datetime
import socket
from prometheus_client import CollectorRegistry, Gauge, push_to_gateway

# Configuration
pushAddress = 'http://localhost:9091'
statusField = 'status'
# API that returns the canal task instances of the running Flink jobs
flinkJobUrl = 'http://localhost:8199/data/getCanalTaskInstance'
metrics_help = 'flink_job_running_status and kafka and canal'
job = 'flink_job'
name = 'flink_job_running_status_test1'

print("Local IP:", socket.gethostbyname(socket.gethostname()))

start0 = datetime.now()
# Fetch the label data from the API
jobInfos = []
try:
    response2 = requests.get(flinkJobUrl, timeout=10)
    flinkJobs = json.loads(response2.text)
    jobInfos = flinkJobs.get('data') or []
    print("Number of flink jobs collected:", len(jobInfos))
except Exception as e:
    print("Failed to fetch the canal task instances of the Flink jobs:", e)
print("Time spent fetching data (s):", (datetime.now() - start0).seconds)


# Entity class holding the label values of one metric sample
class CustomerLabel:
    def __init__(self, jobId, jobName, destination, topic, sinkType, datasourceName, dbName, creator, createTime,
                 status):
        self.jobId = jobId
        self.jobName = jobName
        self.destination = destination
        self.topic = topic
        self.sinkType = sinkType
        self.datasourceName = datasourceName
        self.dbName = dbName
        self.creator = creator
        self.createTime = createTime
        self.status = status


# Build the metric samples and push them to the gateway
def push_metrics_gateway(url, job, metricsName, helps):
    registry = CollectorRegistry()
    labels = ['jobId', 'jobName', 'destination', 'topic', 'sinkType', 'datasourceName', 'dbName', 'creator',
              'createTime']
    labelObj = []  # CustomerLabel defines no __hash__/__eq__, so a set would not deduplicate; a plain list is clearer
    start1 = datetime.now()
    for data in jobInfos:
        label = CustomerLabel(data['job_id'], data['job_name'], data['destination'], data['topic'], data['sink_type'],
                              data['datasource_name'], data['db_name'], data['creator'], data['create_time'],
                              data[statusField])
        labelObj.append(label)
    gau = Gauge(metricsName, helps, labels, registry=registry)
    print("Time spent building data (s):", (datetime.now() - start1).seconds)
    start2 = datetime.now()
    for obj in labelObj:
        gau.labels(obj.jobId, obj.jobName, obj.destination, obj.topic, obj.sinkType, obj.datasourceName, obj.dbName,
                   obj.creator, obj.createTime).set(int(obj.status))
    # Push the whole registry once after all samples have been set,
    # instead of pushing it repeatedly inside the loop
    push_to_gateway(url, job=job, registry=registry, timeout=20)
    print("Time spent pushing data (s):", (datetime.now() - start2).seconds)


# Push the data to the gateway
try:
    print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + " start pushing data to the Prometheus pushgateway")
    push_metrics_gateway(pushAddress, job, name, metrics_help)
    print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + " metrics pushed to the gateway successfully")
except Exception as e:
    print(time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()) + " metrics push to gateway failed:", e)

The code above queries the key data (originally stored in MySQL) through a data-service API, assembles it into the metric structure Prometheus expects, and pushes it to the specified gateway.
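
One operational note: the Pushgateway keeps every pushed series until it is overwritten or explicitly deleted, so metrics for Flink jobs that no longer exist will linger there. If that becomes a problem, the whole job group can be cleared with prometheus_client's delete_from_gateway; the address and job name below simply mirror the script's settings:

from prometheus_client import delete_from_gateway

delete_from_gateway('http://localhost:9091', job='flink_job')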
