记录Python生产和消费kafka
1、安装kafka的python包pip install kafka-python2、异步发送kafka数据import jsonfrom kafka import KafkaProducer, KafkaConsumerproducer = KafkaProducer(bootstrap_servers=['xx.xx.xx.xx:xxxx']) # 此处传入kafka的地址和端口msg = j
·
1、安装kafka的python包
pip install kafka-python
2、异步发送kafka数据
import json
from kafka import KafkaProducer, KafkaConsumer
producer = KafkaProducer(bootstrap_servers=['xx.xx.xx.xx:xxxx']) # 此处传入kafka的地址和端口
msg = json.dumps(data).encode() # 必须要编码为字节类型的数据,不可以用utf-8
# 异步发送
producer.send(topic_name, value=msg) # 此处传入kafka的topic
producer.close()
3、同步发送kafka数据
import json
from kafka import KafkaProducer, KafkaConsumer
producer = KafkaProducer(bootstrap_servers=['xx.xx.xx.xx:xxxx']) # 此处传入kafka的地址和端口
msg = json.dumps(data).encode() # 必须要编码为字节类型的数据,不可以用utf-8
# 同步发送
future = producer.send(topic_name, value=msg) # 此处传入kafka的topic
try:
future.get(timeout=10) # 监控一定时间内是否发送成功
except Exception as e: # 发送失败抛出kafka_errors
print(e)
producer.close()
ps:
1)在异步发送中只有当前数据发送成功或者超时后才发送下一条数据,可以保证数据的发送顺序以及监控数据是否发生成功。
2)个人理解此处的同步和异步之间的区别在于“发送成功的结果”的返回是同步返回还是异步返回,如果是同步返回就是在数据发送后及时返回结果。
4、消费kafka
import json
from kafka import KafkaProducer, KafkaConsumer
consumer = KafkaConsumer(topic_name,
bootstrap_servers=['xx.xx.xx.xx:xxxx'],
auto_offset_reset='earliest', # 设置偏移方式,当参数值为earliest时从头开始消费;当参数值为latest时从最新数据开始消费
group_id='dev', consumer_timeout_ms=1000 # 该参数表示1000ms内如果没有新的数据产生则停止消费,否则会一直循环等待)
res = []
for msg in consumer:
msg = json.loads(msg.value)
res.append(msg)
更多推荐
已为社区贡献1条内容
所有评论(0)