Kafka 消费消息回滚以及linux查询GroupId消费情况
用相同的GroupId消费已经消费过的信息,可以进行回滚。import timeimport redisimport jsonimport mathimport Queueimport osdef cont(t):time_local = time.localtime(t)dt = time.strftime("%Y-%m-%d %H:%M:%S",time...
·
用相同的GroupId消费已经消费过的信息,可以进行回滚。
import time
import redis
import json
import math
import Queue
import os
def cont(t):
time_local = time.localtime(t)
dt = time.strftime("%Y-%m-%d %H:%M:%S",time_local)
return dt
def seek2ts(ts,topic,consumer):
x= [0,0,0,0,0,0 ]
ts*=1000
m=[]
for y in range(6):
i=TopicPartition(topic, y)
x[y] = consumer.offsets_for_times({i:ts})[i][0]
m.append(i)
consumer.assign(m)
for y in range(6):
i=TopicPartition(topic, y)
consumer.seek(i,x[y])
return x
def main():
consumer = KafkaConsumer(bootstrap_servers=[#集群IP] , group_id ="")
seek2ts(1530748800#回滚到的时间戳,"#TOPIC",consumer)
consumer.poll()
consumer.commit()
print "done"
main()
在linux查询kafka中GroupId的消费情况:
/root/kafka_2.11-0.11.0.2/bin/kafka-run-class.sh kafka.admin.ConsumerGroupCommand --bootstrap-server k3:9092 --describe --group online_dist2ali
k3:9092 是kafka地址
online_dist2ali 是GroupId
更多推荐
已为社区贡献1条内容
所有评论(0)