#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author: 风过无言花易落
# @Date  : 2019/9/26 14:00
# @Desc  : kafka生产脚本(已topic作为启动线程数,消息总数会平均分配入topic中)

from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
import timeit,time
import threading
import random
import num
# NOTE: the original had `global numg` here, which is a no-op at module
# level — a plain assignment already creates the module-global name.
# Pool of (number, imsi) pairs sampled by cdrbody() via random.choice.
numg = num.nummsg

class Kafka_producer():
    '''
    Thin wrapper around kafka-python's KafkaProducer that JSON-serializes
    payloads and sends them to one fixed topic.
    '''
    def __init__(self, kafkahost, kafkatopic):
        # kafkahost: bootstrap server string, e.g. "h1:9092,h2:9092"
        # kafkatopic: topic every message from this producer goes to
        self.kafkaHost = kafkahost
        self.kafkatopic = kafkatopic
        # The original wrapped the host in '{kafka_host}'.format(...) — an
        # identity transformation for a string; pass it straight through.
        self.producer = KafkaProducer(bootstrap_servers=self.kafkaHost)

    def sendjsondata(self, params):
        """Serialize `params` to JSON and send it to the configured topic.

        Kafka errors are printed instead of raised so one failed message
        does not abort the producing loop.
        """
        try:
            parmas_message = json.dumps(params)
            producer = self.producer
            producer.send(self.kafkatopic, parmas_message.encode('utf-8'))
            # flush() would force delivery before returning; deliberately
            # disabled because it hurts throughput when strict durability
            # is not required.
            # producer.flush()
        except KafkaError as e:
            print(e)

    def __del__(self):
        # Guard against __init__ having failed before self.producer was
        # assigned — __del__ must never raise.
        producer = getattr(self, 'producer', None)
        if producer is not None:
            producer.close()

def cdrbody():
    """Build one random CDR record.

    Returns (number, imsi, plmn, date, time, up_vol, down_vol, prov).
    Relies on the module-level ``numg`` pool of (number, imsi) pairs.
    """
    number, imsi = random.choice(numg)
    plmn = 46001  # PLMN (carrier) code
    prov = 74     # province code, surfaced into the message header by main()
    up_vol = random.randint(1, 157286)    # uplink volume
    # BUG FIX: the original comment labeled this "uplink" too; it is downlink.
    down_vol = random.randint(1, 157286)  # downlink volume
    # Take a single timestamp so date and time cannot straddle midnight
    # (the original called time.localtime() twice).
    now = time.localtime()
    nowDate = time.strftime('%Y%m%d', now)
    nowTime = time.strftime('%H%M%S', now)
    return number, imsi, plmn, nowDate, nowTime, up_vol, down_vol, prov

def main(host, count, topic, num, random_type):
    '''
    Produce `count` messages to `topic`, each carrying `num` CDR records
    (or a random 1..num records when `random_type` is true).

    :param host: kafka bootstrap servers string
    :param count: number of messages to send to this topic
    :param topic: destination topic name
    :param num: records per message; upper bound when random_type is true
                (NOTE: shadows the module-level `num` import inside this
                function — kept for caller compatibility)
    :param random_type: randomize per-message record count when true
    '''
    producer = Kafka_producer(host, topic)
    for i in range(count):
        msg_body = []
        enum = random.randint(1, num) if random_type else num
        # `prov` is (re)assigned by every cdrbody() call; initialize it so a
        # zero-record message cannot raise NameError below.
        prov = None
        for x in range(enum):
            number, imsi, plmn, nowDate, nowTime, up_vol, down_vol, prov = cdrbody()
            cdr = {
                "CDR": "3,PC1545036215.2052611843.2000.805306365.VISIT,11,0,86{},{},116.79.197.0,,1697867058,1467184619,116.79.197.0,3GWAP,,0,{},,{},{},1,1234569011,{},{},,46001128251697867058,6"
                .format(number, imsi, plmn, nowDate, nowTime, up_vol, down_vol)
                    }
            msg_body.append(cdr)
        params = {
            "HEADER": {
                "PROV_CODE": prov,
                "SERV_TYPE": "GPP",
                # BUG FIX: originally hard-coded to the stale value
                # "20190926145248"; stamp the actual send time instead.
                "DATETIME": time.strftime('%Y%m%d%H%M%S'),
                "RECORD_NUM": enum
            },
            "MBODY": msg_body
        }
        print(params)
        producer.sendjsondata(params)

class Creat_Thread(threading.Thread):
    """Worker thread that runs main() against a single topic."""

    def __init__(self, host, count, tName, num, random_type):
        threading.Thread.__init__(self)
        self.count = count              # messages this thread will send
        self.tName = tName              # topic name, doubles as thread label
        self.host = host                # kafka bootstrap servers string
        self.num = num                  # records per message (max if random)
        self.random_type = random_type  # randomize per-message record count

    def run(self):
        start = timeit.default_timer()
        print("Starting-Topic: " + self.tName)
        # BUG FIX: the original passed the module-level global `random_type`
        # instead of this thread's own self.random_type, so the per-thread
        # setting was silently ignored.
        main(self.host, self.count, self.tName, self.num, self.random_type)
        end = timeit.default_timer()
        print("Exiting-Topic:" + self.tName, '--Running time: %s Seconds' % (end - start), self.count)

if __name__ == '__main__':
    host = "10.111.150.192:9092,10.111.150.93:9092,10.111.150.94:9092"
    #*************************************************************************************************************#
    topic_list = ['YH-XXJF-74']  # topics; one producer thread is started per topic

    random_type = False  # True: randomize records per message; False: fixed count

    count_ms = 1  # total number of messages across all topics

    num = 25  # records per message body; max random count when random_type is True

    # *************************************************************************************************************#
    topic_nu = len(topic_list)  # number of topics == number of threads
    # BUG FIX: the original computed int(count_ms / topic_nu) for every
    # thread, silently dropping count_ms % topic_nu messages. Spread the
    # remainder over the first `extra` threads so exactly count_ms messages
    # are produced in total.
    base, extra = divmod(count_ms, topic_nu)
    # Start one producer thread per topic.
    threads = []
    for idx, tName in enumerate(topic_list):
        thread = Creat_Thread(host, base + (1 if idx < extra else 0), tName, num, random_type)
        thread.start()
        threads.append(thread)
    # Wait for all producer threads to finish.
    for t in threads:
        t.join()
    print("Exiting Main Thread")

# --- stray web-page footer (CSDN scrape residue) — commented out so the file parses ---
# Logo
#
# CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!
#
# 更多推荐