Python多线程写Kafka
#!/usr/bin/env python# -*- coding: utf-8 -*-# @Author: 风过无言花易落# @Date: 2019/9/26 14:00# @Desc: kafka生产脚本(已topic作为启动线程数,消息总数会平均分配入topic中)from kafka import KafkaProducerfrom kafka.errors import KafkaErr
·
#!/usr/bin/env python
# -*- coding: utf-8 -*-
# @Author: 风过无言花易落
# @Date : 2019/9/26 14:00
# @Desc : Kafka producer script (one thread is started per topic; the total message count is divided evenly among the topics)
from kafka import KafkaProducer
from kafka.errors import KafkaError
import json
import timeit,time
import threading
import random
import num
# Pool of candidate records taken from the project-local ``num`` module.
# cdrbody() picks one entry at random and unpacks it into (number, imsi).
# A ``global`` statement has no effect at module scope, so a plain
# assignment is all that is needed here.
numg = num.nummsg
class Kafka_producer():
    """Small wrapper that binds a ``KafkaProducer`` to one topic.

    The wrapped producer is stored on ``self.producer`` and is closed when
    the wrapper is garbage-collected.
    """

    def __init__(self, kafkahost, kafkatopic):
        """Create the underlying producer.

        :param kafkahost: value formatted into the ``bootstrap_servers``
            argument of ``KafkaProducer``
        :param kafkatopic: topic that every message is sent to
        """
        self.kafkaHost = kafkahost
        self.kafkatopic = kafkatopic
        self.producer = KafkaProducer(bootstrap_servers='{kafka_host}'.format(
            kafka_host=self.kafkaHost
        ))

    def sendjsondata(self, params):
        """Serialise *params* to JSON and send it to the configured topic.

        :param params: any object accepted by ``json.dumps``
        A ``KafkaError`` raised while sending is printed, not propagated.
        """
        try:
            payload = json.dumps(params).encode('utf-8')
            self.producer.send(self.kafkatopic, payload)
            # flush() would force buffered records out before returning.
            # It is deliberately not called per message because it hurts
            # throughput; enable it if delivery safety matters more:
            # self.producer.flush()
        except KafkaError as e:
            print(e)

    def __del__(self):
        """Close the underlying producer, if one was ever created."""
        # __del__ also runs when __init__ raised before self.producer was
        # assigned; guard so that case does not add an AttributeError.
        producer = getattr(self, 'producer', None)
        if producer is not None:
            producer.close()
def cdrbody():
    """Generate the variable field values for one CDR line.

    :return: tuple ``(number, imsi, plmn, nowDate, nowTime, up_vol,
        down_vol, prov)`` where ``number``/``imsi`` come from a random
        entry of ``numg``, ``plmn`` and ``prov`` are fixed constants, the
        volumes are random integers in [1, 157286], and ``nowDate`` /
        ``nowTime`` are the local date (``YYYYMMDD``) and time (``HHMMSS``).
    """
    number, imsi = random.choice(numg)
    plmn = 46001
    prov = 74
    up_vol = random.randint(1, 157286)    # uplink traffic volume
    down_vol = random.randint(1, 157286)  # downlink traffic volume
    # Read the clock once so the date and the time always describe the same
    # instant (two separate reads could straddle midnight).
    now = time.localtime()
    nowDate = time.strftime('%Y%m%d', now)
    nowTime = time.strftime('%H%M%S', now)
    return number, imsi, plmn, nowDate, nowTime, up_vol, down_vol, prov
def _build_message(record_count):
    """Build one message dict containing *record_count* generated CDRs."""
    msg_body = []
    prov = None
    for _ in range(record_count):
        number, imsi, plmn, nowDate, nowTime, up_vol, down_vol, prov = cdrbody()
        msg_body.append({
            "CDR": "3,PC1545036215.2052611843.2000.805306365.VISIT,11,0,86{},{},116.79.197.0,,1697867058,1467184619,116.79.197.0,3GWAP,,0,{},,{},{},1,1234569011,{},{},,46001128251697867058,6"
            .format(number, imsi, plmn, nowDate, nowTime, up_vol, down_vol)
        })
    return {
        "HEADER": {
            # Province code of the last generated record.
            "PROV_CODE": prov,
            "SERV_TYPE": "GPP",
            # NOTE(review): fixed timestamp, unlike the per-CDR date/time
            # fields which use the current clock - confirm this is intended.
            "DATETIME": "20190926145248",
            "RECORD_NUM": record_count
        },
        "MBODY": msg_body
    }


def main(host, count, topic, num, random_type):
    """Produce *count* messages to *topic*.

    :param host: broker address string handed to ``Kafka_producer``
    :param count: number of messages to send
    :param topic: destination topic
    :param num: CDRs per message; when *random_type* is true it is the
        upper bound of a random per-message count in [1, num]
    :param random_type: truthy to randomise the CDR count of each message
    :raises ValueError: if *num* is smaller than 1
    :return: None
    """
    # With num < 1 no CDR would be generated, so the header's PROV_CODE
    # could not be filled in (and randint(1, num) has an empty range).
    # Fail early with a clear message instead.
    if num < 1:
        raise ValueError("num must be >= 1, got {!r}".format(num))

    producer = Kafka_producer(host, topic)
    for _ in range(count):
        enum = random.randint(1, num) if random_type else num
        params = _build_message(enum)
        print(params)
        producer.sendjsondata(params)
class Creat_Thread(threading.Thread):
    """Thread that runs ``main`` for a single topic and reports its runtime."""

    def __init__(self, host, count, tName, num, random_type):
        """Store the arguments later forwarded to ``main``.

        :param host: broker address string
        :param count: number of messages this thread sends
        :param tName: topic name (also used in the log lines)
        :param num: CDRs per message (or the random upper bound)
        :param random_type: whether the per-message CDR count is randomised
        """
        threading.Thread.__init__(self)
        self.count = count
        self.tName = tName
        self.host = host
        self.num = num
        self.random_type = random_type

    def run(self):
        """Call ``main`` with the stored arguments and print the elapsed time."""
        start = timeit.default_timer()
        print("Starting-Topic: " + self.tName)
        # Use the value given to this thread, not a module-level global of
        # the same name, so the class also works when imported elsewhere.
        main(self.host, self.count, self.tName, self.num, self.random_type)
        end = timeit.default_timer()
        print("Exiting-Topic:" + self.tName, '--Running time: %s Seconds' % (end - start), self.count)
if __name__ == '__main__':
    host = "10.111.150.192:9092,10.111.150.93:9092,10.111.150.94:9092"
    #*************************************************************************************************************#
    topic_list = ['YH-XXJF-74']  # topics; list several as separate elements
    random_type = False  # True: random CDR count per message, False: fixed
    count_ms = 1  # total number of messages to send
    cdr_per_msg = 25  # CDRs per message; upper bound when random_type is True
    # *************************************************************************************************************#
    if not topic_list:
        raise SystemExit("topic_list is empty: nothing to produce")

    # Integer split of the total across topics. The remainder is handed out
    # one message at a time to the first topics so that exactly count_ms
    # messages are sent in total.
    base_count, remainder = divmod(count_ms, len(topic_list))

    # Start one producer thread per topic.
    threads = []
    for index, tName in enumerate(topic_list):
        count = base_count + (1 if index < remainder else 0)
        thread = Creat_Thread(host, count, tName, cdr_per_msg, random_type)
        thread.start()
        threads.append(thread)

    # Wait for every thread to finish.
    for t in threads:
        t.join()
    print("Exiting Main Thread")
更多推荐
已为社区贡献3条内容
所有评论(0)