1、问题:目前现网环境中使用到的kafka服务器是别人的,我们无法登入,现在想查看某一个topic的消费信息

当前服务器没有安装kafka应用程序,所以也无法使用kafka-console-consumer.sh来连接,写一个java程序来上传包在运行过于复杂,可以考虑使用python脚本来连接测试消费数据

首先 ,默认linux环境自带了python,我们只需要安装一个python的kafka的第三方库即可

# 上传kafka-1.3.5.tar.gz
[root@k8s-fengfan opt]# tar -zxvf kafka-1.3.5.tar.gz
[root@k8s-fengfan opt]# cd kafka-1.3.5
[root@k8s-fengfan kafka-1.3.5]# python setup.py install
# 安装完毕后可以使用python连接kafka

2、编写python脚本

kafka-consumer.py

from kafka import KafkaConsumer

consumer = KafkaConsumer('epic-choppp-receive', bootstrap_servers=['k8s-fengfan:9092'])
for msg in consumer:
    recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
    print recv
  • epic-chopper-receive:订阅的topic
  • k8s-fengfan:9092:连接kafka的地址

3、启动kafka、运行python

  • ./kafka-console-producer.sh --broker-list k8s-fengfan:9092 --topic epic-choppp-receive
  • python consumer.py

image-20201224175557113

这样就可以简单测试kafka消费结果了

TIP:有时候会启动失败

image-20201224170613541

需要指定一下api_version

from kafka import KafkaConsumer

consumer = KafkaConsumer('epic-choppp-receive', bootstrap_servers=['k8s-fengfan:9092'],api_version=(0,10,1))
for msg in consumer:
    recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, msg.value)
    print recv
Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐