1、安装kafka的python包

pip install kafka-python

2、异步发送kafka数据

import json
from kafka import KafkaProducer, KafkaConsumer

producer = KafkaProducer(bootstrap_servers=['xx.xx.xx.xx:xxxx']) # 此处传入kafka的地址和端口
msg = json.dumps(data).encode() # 必须要编码为字节类型的数据,不可以用utf-8

# 异步发送
producer.send(topic_name, value=msg) # 此处传入kafka的topic
producer.close()

3、同步发送kafka数据

import json
from kafka import KafkaProducer, KafkaConsumer

producer = KafkaProducer(bootstrap_servers=['xx.xx.xx.xx:xxxx']) # 此处传入kafka的地址和端口
msg = json.dumps(data).encode() # 必须要编码为字节类型的数据,不可以用utf-8

# 同步发送
future = producer.send(topic_name, value=msg) # 此处传入kafka的topic
try:
     future.get(timeout=10)  # 监控一定时间内是否发送成功
 except Exception as e:  # 发送失败抛出kafka_errors
     print(e)
 producer.close()

ps:
1)在异步发送中只有当前数据发送成功或者超时后才发送下一条数据,可以保证数据的发送顺序以及监控数据是否发生成功。
2)个人理解此处的同步和异步之间的区别在于“发送成功的结果”的返回是同步返回还是异步返回,如果是同步返回就是在数据发送后及时返回结果。

4、消费kafka

import json
from kafka import KafkaProducer, KafkaConsumer

consumer = KafkaConsumer(topic_name,
                             bootstrap_servers=['xx.xx.xx.xx:xxxx'],
                             auto_offset_reset='earliest', # 设置偏移方式,当参数值为earliest时从头开始消费;当参数值为latest时从最新数据开始消费
                             group_id='dev', consumer_timeout_ms=1000 # 该参数表示1000ms内如果没有新的数据产生则停止消费,否则会一直循环等待)
res = []
for msg in consumer:
    msg = json.loads(msg.value)
    res.append(msg)
Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐