python 接入钉钉群告警
背景1 环境 python3.72 邮件控制服务(由于公司用了zabbix(服务器基础设施),elk(业务报警),promethues alertManager(主要是k8s上面使用),grafana(部分promethues收集的数据也由此处报,此处设置报警更为简单,但弊端在于当图形有变量的时候则无法进行报警,待后续版本的优化),报警不进行控制,一天产生的将会有近3k封,单独设置则会...
背景
1 环境 python3.7
2 邮件控制服务(由于公司用了zabbix(服务器基础设施),elk(业务报警),promethues alertManager(主要是k8s上面使用),grafana(部分promethues收集的数据也由此处报,此处设置报警更为简单,但弊端在于当图形有变量的时候则无法进行报警,待后续版本的优化),报警不进行控制,一天产生的将会有近3k封,单独设置则会增加工作量,就直接写了个过滤器(也是python),后面有需求需要报警到钉钉)
3 邮件通过钉钉告警
钉钉告警的官方链接
https://ding-doc.dingtalk.com/doc#/serverapi3/iydd5h 钉钉自定义机器人开发平台
注意:
1 每个机器人每分钟最多发送20条。如果超过20条,会限流10分钟
一 、开通钉钉机器人
1 点击右上角群设置,智能群助手
2 点击添加机器人
3 点击按钮+
4 点击自定义
5 单击添加。
7 完成安全设置(至少选择一种)。有关安全配置详细内容,请参见机器人开发的安全配置节点。
勾选我已阅读并同意《自定义机器人服务及免责条款》,并单击完成。
8 机器人创建成功后,您可以单击复制,复制其webhook地址;或单击设置说明,查看机器人设置、调试等说明文档。
9 节点输出
调用成功的输出数据为钉钉API返回的数据,如下示例。具体钉钉API返回数据,请参见钉钉开发文档。
{
"errmsg": "ok",
"errcode": 0
}
大部分的代码都是可以通过钉钉的说明文档即可完整做出,但是为了方便还是记录一下。
#_*_coding:utf-8_*_
# Author: yehuai
# Creation time: 2020/1/15 14:40
import base64
import hashlib
import hmac
import time
import urllib
import requests
import json
TOKEN = "通过钉钉群,添加群机器人,然后在里面的"
SECRET = "钉钉机器人中可以获取到"
headers = {'Content-Type': 'application/json;charset=utf-8'}
atmobile_list = [
"#手机号,每次邮件会艾特此人",
""
],
def get_url():
# 按官方的要求进行加密,并且生成完成的链接,官方是py2,此处是py3
# 钉钉官方要求,请求的url中必须携带三个参数,access_token, timestamp,sign(签名是由secret加密而来)
timestamp = round(time.time() * 1000)
secret_enc = secret.encode('utf-8')
string_to_sign = '{}\n{}'.format(timestamp, SECRET)
string_to_sign_enc = string_to_sign.encode('utf-8')
hmac_code = hmac.new(secret_enc, string_to_sign_enc, digestmod=hashlib.sha256).digest()
sign = urllib.parse.quote_plus(base64.b64encode(hmac_code))
# 完整的url
api_url = "https://oapi.dingtalk.com/robot/send?access_token={}×tamp={}&sign={}".format(TOKEN, timestamp, sign)
return api_url
def ddmsgsend(text):
api_url = get_url()
# 传输方式为文本
json_text = {
"msgtype": "text",
# 艾特人的方式
"at": {
# 艾特人按手机艾特
"atMobiles": atmobile_list,
"isAtAll": False
},
# 发送文本
"text": {
"content": text
}
}
# 请求url
return requests.post(api_url, json.dumps(json_text), headers=headers).content.decode("utf8")
if __name__ == '__main__':
text = "微服务报警(测试) \n" \
"告警时间:2020-01-15 14:00:55 \n" \
"告警服务:数据同步服务微服务;\n" \
"replicas:5个;\n" \
"当前存活节点:1个;\n" \
"存活节点 id:['data-sync-service-10-244-88-18-20200'] \n" \
"事件等级:highest;\n" \
"告警信息:data-sync-service replicas 与存活节点不一致,请及时处理"
res = ddmsgsend(text)
print(res)
更多推荐
所有评论(0)