1.获取数据

python version 2.7
假设我们要获取的 Jenkins job 名字为 test_flow,该 job 会触发另外两个 Jenkins job:test1 和 test2。
我们要获取 test_flow job 的运行时间、状态、构建号(number)和 job 名称(jobname)。

我们把功能性函数封装在同一个文件的 Build 类和 Job 类中,方便主函数调用。

python-jenkins模块
build.py
import jenkins
import ssl
import re
import datetime

# NOTE(review): this replaces the ssl module's default HTTPS context factory with
# the unverified one, so HTTPS connections created through that default no longer
# validate server certificates. Presumably this is here because the Jenkins server
# uses a self-signed certificate -- confirm, and prefer trusting that certificate
# explicitly instead of disabling verification process-wide.
ssl._create_default_https_context = ssl._create_unverified_context
# Build fetches the attributes of one Jenkins build.
class Build(object):
    """A single build (``jobname`` #``number``) on a Jenkins server.

    The instance talks to Jenkins through ``jenkins.Jenkins(url)`` and
    exposes the build's attributes as a plain dictionary via
    :meth:`attr_to_dict`.
    """

    # Seconds added to the UTC build timestamp before formatting
    # (3600 == UTC+1, i.e. CET without daylight saving).
    TZ_OFFSET_SECONDS = 3600
    TIME_FORMAT = '%Y-%m-%d %H:%M:%S'

    def __init__(self, jobname, url, number):
        """Remember the job name / build number and connect to ``url``."""
        self.jobname = jobname
        self.number = number
        self.jenkins_url = url
        self.jenkins_server = jenkins.Jenkins(self.jenkins_url)
        self.build_console = ''

    def _getConsoleFromJenkins(self):
        """Fetch this build's console output, cache it and return it."""
        self.build_console = self.jenkins_server.get_build_console_output(
            self.jobname, self.number)
        return self.build_console

    def get_next_job_build_number(self, name):
        """Look up the build number of downstream job ``name``.

        The console output of this build is searched for the line
        ``Starting building: <name> #<number>``.

        Returns a dict with keys ``job``, ``number`` and ``status``.
        When the line is found, ``number`` is the parsed build number and
        ``status`` is ``"Completed"``; otherwise ``number`` is 0 and
        ``status`` is ``"Running"``.
        """
        if self.build_console == '':
            self._getConsoleFromJenkins()
        # Escape the job name so characters such as '.' or '+' are matched
        # literally, and capture digits only so int() cannot fail on
        # trailing text after the build number.
        pattern = re.compile(
            r'Starting building: ' + re.escape(str(name)) + r' #(\d+)')
        match = pattern.search(self.build_console)
        if match is None:
            return {"job": name, "number": 0, "status": "Running"}
        return {
            "job": name,
            "number": int(match.group(1)),
            "status": "Completed",
        }

    def _setAttrFromJenkins(self):
        """Load url, duration, timestamp and status from Jenkins.

        Returns True on success and False when the build info could not
        be fetched.
        """
        try:
            build_info = self.jenkins_server.get_build_info(
                self.jobname, self.number)
        except Exception:
            # Best effort: any lookup failure is reported as False, but
            # KeyboardInterrupt / SystemExit are no longer swallowed.
            return False
        self.href = build_info['url']
        self.duration = build_info['duration']
        self.build_time = build_info['timestamp']
        if build_info['building']:
            self.status = "Running"
        else:
            self.status = build_info['result']
        return True

    def _format_time(self, seconds):
        """Format a shifted epoch value (in seconds) with TIME_FORMAT."""
        return datetime.datetime.utcfromtimestamp(seconds).strftime(
            self.TIME_FORMAT)

    # Collect the Jenkins build information into the dictionary ``attr``.
    def attr_to_dict(self):
        """Return the build attributes as a dict, or None on failure.

        Keys: ``job``, ``number``, ``href``, ``duration`` (whole seconds),
        ``status``, ``timestamp`` (milliseconds as reported by Jenkins),
        ``build_time`` and, for builds that are not running,
        ``finish_time``. Both times are formatted strings shifted by
        ``TZ_OFFSET_SECONDS``.
        """
        if not self._setAttrFromJenkins():
            return None
        # Jenkins reports timestamp and duration in milliseconds.
        start_seconds = float(self.build_time) / 1000 + self.TZ_OFFSET_SECONDS
        attr = {
            'job': self.jobname,
            'number': self.number,
            'href': self.href,
            'duration': int(self.duration / 1000),
            'status': self.status,
            'timestamp': self.build_time,
            'build_time': self._format_time(start_seconds),
        }
        if self.status != "Running":
            attr['finish_time'] = self._format_time(
                start_seconds + float(self.duration) / 1000)
        return attr
class Job(object):
    """A Jenkins job, queried through ``jenkins.Jenkins(url)``."""

    def __init__(self, jobname, url):
        """Remember the job name and connect to the Jenkins server at ``url``."""
        self.jenkins_job = jobname
        self.jenkins_url = url
        self.jenkins_server = jenkins.Jenkins(self.jenkins_url)

    # Get the number of the last completed build.
    def get_last_completed(self):
        """Return the number of the last completed build, or 0 if none.

        0 is returned both when no job info is available and when the job
        has no completed build yet (``lastCompletedBuild`` is empty).
        """
        job_info = self.jenkins_server.get_job_info(self.jenkins_job)
        if job_info is None:
            print("yes it is none \n")
            return 0
        last_completed = job_info.get('lastCompletedBuild')
        if not last_completed:
            # A job that never finished a build has no last completed build.
            return 0
        return int(last_completed['number'])

    # Get the list of builds that have already been run for the job.
    def get_build_list(self):
        """Return the job's ``builds`` list, or an empty list if unavailable."""
        job_info = self.jenkins_server.get_job_info(self.jenkins_job)
        if job_info is None:
            print("yes it is none \n")
            # Return early instead of indexing None, which raised TypeError.
            return []
        return job_info.get('builds') or []

2.es数据库的创建

获取的数据需要存储到 Elasticsearch(下文简称 es)数据库中,这里用到的是 Elasticsearch 6.0.1 版本。

es.py
from elasticsearch import Elasticsearch
from elasticsearch.exceptions import TransportError


class Elastic(object):
    """Thin wrapper around one Elasticsearch index.

    ``host`` is passed straight to ``Elasticsearch(host)``; ``index`` is
    the index name used by every method.
    """

    # Error names reported when the index already exists: the first is the
    # name the original code checked for, the second is the name used by
    # Elasticsearch 6.x.
    _ALREADY_EXISTS_ERRORS = (
        'index_already_exists_exception',
        'resource_already_exists_exception',
    )

    def __init__(self, host, index):
        self.index = index
        self.es = Elasticsearch(host)

    def exists(self):
        """Return whether the index exists."""
        return self.es.indices.exists(index=self.index)

    def create_index(self):
        """Create the index with its settings and ``_doc`` mapping.

        An "index already exists" error is ignored; any other
        ``TransportError`` is re-raised.
        """
        build_mapping = {
            'properties': {
                'timestamp': {'type': 'date'},
            }
        }
        create_index_body = {
            # The create-index API expects the key 'settings' (plural).
            'settings': {
                'number_of_shards': 1,
                'number_of_replicas': 0,
            },
            'mappings': {
                '_doc': {
                    'properties': {
                        'job': {'type': 'keyword'},
                        'timestamp': {'type': 'date'},
                        'status': {'type': 'keyword'},
                        'item': {
                            'type': 'object',
                            'properties': {
                                'test1': build_mapping,
                            },
                        },
                    },
                },
            },
        }

        try:
            self.es.indices.create(index=self.index, body=create_index_body)
        except TransportError as e:
            if e.error not in self._ALREADY_EXISTS_ERRORS:
                raise

    def save(self, body):
        """Index ``body`` as a new document; failures are printed, not raised."""
        try:
            self.es.index(index=self.index, doc_type='_doc', body=body)
        except Exception as exc:
            # Best effort as before, but report the cause and let
            # KeyboardInterrupt / SystemExit propagate.
            print("save error !!! %s" % exc)

    def update(self, _id, body):
        """Index ``body`` under ``_id``, replacing any existing document.

        Failures are printed, not raised.
        """
        try:
            self.es.index(index=self.index, doc_type='_doc', id=_id, body=body)
        except Exception as exc:
            print("update error !!! %s" % exc)

3.主函数
getdata.py
#!/usr/local/bin/python
import re
import sys
import argparse
from elasticsearch import Elasticsearch
from module.es import *
from module.build import *

# Jenkins job to collect and the Jenkins server it lives on.
job_name = 'test_flow'
jenkins_url = 'http://jenkins.test.com/jenkins'
# Placeholder Elasticsearch endpoint and index name -- adjust both to
# your environment.
es_host = 'http://localhost:9200'
es_index = 'jenkins'

job = Job(job_name, jenkins_url)
# Collect the most recently completed build of the job.
number = job.get_last_completed()

es = Elastic(es_host, es_index)
if not es.exists():
    es.create_index()

testjob = Build(job_name, jenkins_url, number)  # build wrapper object
doc = testjob.attr_to_dict()  # job attributes as a dictionary
if doc is not None:
    # attr_to_dict returns None when the build info could not be fetched.
    es.save(doc)  # store the data in Elasticsearch

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐