Kafka error: The coordinator is not aware of this member
Problem description
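In my project, a group id that was already in use by what I call the standalone consumer (a listener subscribed to a whole topic) was also given to the listeners I call the consumer group, which read specific partitions. As a result, the coordinator does not recognize those partition listeners as members of the group and rejects their offset commits.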
2022-12-14 16:33:31.908 ERROR 16020 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-spring-kafka-evo-consumer-001-9, groupId=spring-kafka-evo-consumer-001] Offset commit failed on partition REBALANCE-ONE-TOPIC-1 at offset 13: The coordinator is not aware of this member.
2022-12-14 16:33:31.908 INFO 16020 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-spring-kafka-evo-consumer-001-9, groupId=spring-kafka-evo-consumer-001] OffsetCommit failed with Generation{generationId=-1, memberId='', protocol='null'}: The coordinator is not aware of this member.
2022-12-14 16:33:31.908 INFO 16020 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-spring-kafka-evo-consumer-001-9, groupId=spring-kafka-evo-consumer-001] Resetting generation due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2022-12-14 16:33:31.908 INFO 16020 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-spring-kafka-evo-consumer-001-9, groupId=spring-kafka-evo-consumer-001] Request joining group due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2022-12-14 16:33:31.909 ERROR 16020 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:157) ~[spring-kafka-2.8.9.jar:2.8.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1812) [spring-kafka-2.8.9.jar:2.8.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1301) [spring-kafka-2.8.9.jar:2.8.9]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_251]
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [na:1.8.0_251]
at java.util.concurrent.FutureTask.run(FutureTask.java) [na:1.8.0_251]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_251]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1303) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1204) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1196) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1171) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1046) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1492) ~[kafka-clients-3.1.2.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:3062) [spring-kafka-2.8.9.jar:2.8.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:3057) [spring-kafka-2.8.9.jar:2.8.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:3043) [spring-kafka-2.8.9.jar:2.8.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2835) [spring-kafka-2.8.9.jar:2.8.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1329) [spring-kafka-2.8.9.jar:2.8.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1255) [spring-kafka-2.8.9.jar:2.8.9]
... 4 common frames omitted
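The log shows the failing commit carrying Generation{generationId=-1, memberId=''}, which is what a consumer with explicitly assigned partitions sends because it never joins the group. The sketch below is a simplified model of how a group coordinator might treat such a commit, assuming it accepts memberless commits only while the group has no active members. CoordinatorModel, join and commit are hypothetical names for illustration, not Kafka broker code.

```java
import java.util.HashSet;
import java.util.Set;

// Simplified, hypothetical model of a group coordinator's offset commit check.
// It is an illustration only and not the broker's actual implementation.
class CoordinatorModel {

    enum Result { OK, UNKNOWN_MEMBER_ID, ILLEGAL_GENERATION }

    private final Set<String> members = new HashSet<>();
    private int generationId = 0;

    // A consumer that subscribes to a topic joins the group and receives a member id.
    String join(String clientId) {
        String memberId = clientId + "-" + (members.size() + 1);
        members.add(memberId);
        generationId++; // in this model every join starts a new generation
        return memberId;
    }

    int generationId() {
        return generationId;
    }

    // A consumer with assigned partitions never joins, so it commits with
    // generationId = -1 and an empty memberId, as in the log above.
    Result commit(int generationId, String memberId) {
        boolean memberless = generationId < 0 && memberId.isEmpty();
        if (memberless) {
            // Assumption: accepted only while the group has no active members.
            return members.isEmpty() ? Result.OK : Result.UNKNOWN_MEMBER_ID;
        }
        if (!members.contains(memberId)) {
            return Result.UNKNOWN_MEMBER_ID;
        }
        return generationId == this.generationId ? Result.OK : Result.ILLEGAL_GENERATION;
    }

    public static void main(String[] args) {
        CoordinatorModel group = new CoordinatorModel();
        // Memberless commit while the group is empty.
        System.out.println(group.commit(-1, ""));
        // A subscribing listener joins with the same group id.
        String memberId = group.join("consumer-test-group-16");
        // The same memberless commit is now rejected.
        System.out.println(group.commit(-1, ""));
        // The real member can still commit.
        System.out.println(group.commit(group.generationId(), memberId));
    }
}
```

Running main prints OK, UNKNOWN_MEMBER_ID, OK: the memberless commit is accepted while the group is empty and rejected once a subscribing consumer has joined under the same group id.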
How to reproduce the error
Register two test topics
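// Topic with two partitions, read by the listeners that name their partitions explicitly
@Bean
public NewTopic testone() {
    return TopicBuilder.name("test-topic-group").partitions(2).replicas(1).build();
}

// Topic with one partition, read by the listener that subscribes by topic name
@Bean
public NewTopic testtwo() {
    return TopicBuilder.name("test-topic-standalone").partitions(1).replicas(1).build();
}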
Write the consumer listeners
import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class TestErrorConsumerListener {

    /**
     * "Consumer group" listener: reads partition 0 of test-topic-group
     * through an explicit partition assignment, starting at offset 0.
     */
    @KafkaListener(groupId = "test-group", topicPartitions = {
            @TopicPartition(topic = "test-topic-group", partitions = "0", partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
    })
    public void one(String value) {
        log.info("one:接收kafka消息:[{}]", value);
    }

    /**
     * "Consumer group" listener: reads partition 1 of test-topic-group
     * through an explicit partition assignment, starting at offset 0.
     */
    @KafkaListener(groupId = "test-group", topicPartitions = {
            @TopicPartition(topic = "test-topic-group", partitions = "1", partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
    })
    public void two(String value) {
        log.info("two:接收kafka消息:[{}]", value);
    }

    /**
     * "Standalone consumer" listener: subscribes to test-topic-standalone
     * by topic name, using the same group id as the two listeners above.
     */
    @KafkaListener(topics = "test-topic-standalone", groupId = "test-group")
    public void three(String value) {
        log.info("three:接收kafka消息:[{}]", value);
    }
}
Start the project and send messages
this.template.send("test-topic-group", message);
this.template.send("test-topic-standalone", message);
Error log
2022-12-14 19:04:18.835 INFO 13416 --- [tainer#13-0-C-1] c.l.p.e.k.c.TestErrorConsumerListener : one:接收kafka消息:[111111111]
2022-12-14 19:04:18.841 INFO 13416 --- [tainer#14-0-C-1] c.l.p.e.k.c.TestErrorConsumerListener : three:接收kafka消息:[111111111]
2022-12-14 19:04:19.336 INFO 13416 --- [tainer#13-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-group-15, groupId=test-group] Discovered group coordinator 192.168.136.136:9092 (id: 2147482646 rack: null)
2022-12-14 19:04:19.338 ERROR 13416 --- [tainer#13-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-group-15, groupId=test-group] Offset commit failed on partition test-topic-group-0 at offset 1: The coordinator is not aware of this member.
2022-12-14 19:04:19.338 INFO 13416 --- [tainer#13-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-group-15, groupId=test-group] OffsetCommit failed with Generation{generationId=-1, memberId='', protocol='null'}: The coordinator is not aware of this member.
2022-12-14 19:04:19.338 INFO 13416 --- [tainer#13-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-group-15, groupId=test-group] Resetting generation due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2022-12-14 19:04:19.338 INFO 13416 --- [tainer#13-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator : [Consumer clientId=consumer-test-group-15, groupId=test-group] Request joining group due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2022-12-14 19:04:19.342 ERROR 13416 --- [tainer#13-0-C-1] o.s.k.l.KafkaMessageListenerContainer : Consumer exception
java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:157) ~[spring-kafka-2.8.9.jar:2.8.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1812) [spring-kafka-2.8.9.jar:2.8.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1301) [spring-kafka-2.8.9.jar:2.8.9]
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_251]
at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [na:1.8.0_251]
at java.util.concurrent.FutureTask.run(FutureTask.java) [na:1.8.0_251]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_251]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1303) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1204) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1196) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1171) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1046) ~[kafka-clients-3.1.2.jar:na]
at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1492) ~[kafka-clients-3.1.2.jar:na]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:3062) [spring-kafka-2.8.9.jar:2.8.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:3057) [spring-kafka-2.8.9.jar:2.8.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:3043) [spring-kafka-2.8.9.jar:2.8.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2835) [spring-kafka-2.8.9.jar:2.8.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1329) [spring-kafka-2.8.9.jar:2.8.9]
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1255) [spring-kafka-2.8.9.jar:2.8.9]
... 4 common frames omitted
Check the consumer group status on the Kafka server
$KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group test-group --describe
GROUP       TOPIC                  PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                                  HOST            CLIENT-ID
test-group  test-topic-standalone  0          1               1               0    consumer-test-group-16-d1a0c068-a68d-456c-a958-97b44219ef1b  /192.168.136.1  consumer-test-group-16
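The describe output lists a single member, consumer-test-group-16, on test-topic-standalone; the listeners on test-topic-group do not appear as members. Assuming the clash comes from sharing one group id between the two kinds of listener, a small check like the following could flag it before startup. It is plain Java with no Kafka dependency; ListenerDef, findClashingGroupIds and the group id test-group-standalone are hypothetical names used only for this sketch.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;

class GroupIdCheck {

    // Hypothetical description of one listener: its group id and whether it
    // names its partitions explicitly (true) or subscribes by topic (false).
    static final class ListenerDef {
        final String name;
        final String groupId;
        final boolean assignedPartitions;

        ListenerDef(String name, String groupId, boolean assignedPartitions) {
            this.name = name;
            this.groupId = groupId;
            this.assignedPartitions = assignedPartitions;
        }
    }

    // Returns every group id used by both kinds of listener, in sorted order.
    static Set<String> findClashingGroupIds(List<ListenerDef> listeners) {
        Set<String> assigned = new HashSet<>();
        Set<String> subscribed = new HashSet<>();
        for (ListenerDef listener : listeners) {
            (listener.assignedPartitions ? assigned : subscribed).add(listener.groupId);
        }
        Set<String> clashes = new TreeSet<>(assigned);
        clashes.retainAll(subscribed);
        return clashes;
    }

    public static void main(String[] args) {
        // The three listeners from the reproduction, all sharing test-group.
        List<ListenerDef> broken = new ArrayList<>();
        broken.add(new ListenerDef("one", "test-group", true));
        broken.add(new ListenerDef("two", "test-group", true));
        broken.add(new ListenerDef("three", "test-group", false));
        System.out.println(findClashingGroupIds(broken));

        // Same listeners, with the subscribing one moved to its own group id.
        List<ListenerDef> separated = new ArrayList<>();
        separated.add(new ListenerDef("one", "test-group", true));
        separated.add(new ListenerDef("two", "test-group", true));
        separated.add(new ListenerDef("three", "test-group-standalone", false));
        System.out.println(findClashingGroupIds(separated));
    }
}
```

The first call prints [test-group] and the second prints [], which matches the idea that giving the subscribing listener its own group id removes the overlap.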