问题描述

我在项目里把原来用着的 独立消费者 consumer-group-id 同时当做消费者组来消费分区信息,导致协调器找不到这个 consumer-group-id

2022-12-14 16:33:31.908 ERROR 16020 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-spring-kafka-evo-consumer-001-9, groupId=spring-kafka-evo-consumer-001] Offset commit failed on partition REBALANCE-ONE-TOPIC-1 at offset 13: The coordinator is not aware of this member.
2022-12-14 16:33:31.908  INFO 16020 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-spring-kafka-evo-consumer-001-9, groupId=spring-kafka-evo-consumer-001] OffsetCommit failed with Generation{generationId=-1, memberId='', protocol='null'}: The coordinator is not aware of this member.
2022-12-14 16:33:31.908  INFO 16020 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-spring-kafka-evo-consumer-001-9, groupId=spring-kafka-evo-consumer-001] Resetting generation due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2022-12-14 16:33:31.908  INFO 16020 --- [ntainer#0-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-spring-kafka-evo-consumer-001-9, groupId=spring-kafka-evo-consumer-001] Request joining group due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2022-12-14 16:33:31.909 ERROR 16020 --- [ntainer#0-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Consumer exception

java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
	at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:157) ~[spring-kafka-2.8.9.jar:2.8.9]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1812) [spring-kafka-2.8.9.jar:2.8.9]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1301) [spring-kafka-2.8.9.jar:2.8.9]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_251]
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [na:1.8.0_251]
	at java.util.concurrent.FutureTask.run(FutureTask.java) [na:1.8.0_251]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_251]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1303) ~[kafka-clients-3.1.2.jar:na]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1204) ~[kafka-clients-3.1.2.jar:na]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1196) ~[kafka-clients-3.1.2.jar:na]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1171) ~[kafka-clients-3.1.2.jar:na]
	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206) ~[kafka-clients-3.1.2.jar:na]
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169) ~[kafka-clients-3.1.2.jar:na]
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129) ~[kafka-clients-3.1.2.jar:na]
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602) ~[kafka-clients-3.1.2.jar:na]
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412) ~[kafka-clients-3.1.2.jar:na]
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) ~[kafka-clients-3.1.2.jar:na]
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-3.1.2.jar:na]
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) ~[kafka-clients-3.1.2.jar:na]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1046) ~[kafka-clients-3.1.2.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1492) ~[kafka-clients-3.1.2.jar:na]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:3062) [spring-kafka-2.8.9.jar:2.8.9]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:3057) [spring-kafka-2.8.9.jar:2.8.9]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:3043) [spring-kafka-2.8.9.jar:2.8.9]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2835) [spring-kafka-2.8.9.jar:2.8.9]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1329) [spring-kafka-2.8.9.jar:2.8.9]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1255) [spring-kafka-2.8.9.jar:2.8.9]
	... 4 common frames omitted

报错复现方式

注册两个测试 topic

    @Bean
    public NewTopic testone() {
        return TopicBuilder.name("test-topic-group").partitions(2).replicas(1).build();
    }

    @Bean
    public NewTopic testtwo() {
        return TopicBuilder.name("test-topic-standalone").partitions(1).replicas(1).build();
    }

写一个消费者组


import lombok.extern.slf4j.Slf4j;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.PartitionOffset;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

@Component
@Slf4j
public class TestErrorConsumerListener {

    /**
     * 消费者组
     */
    @KafkaListener(groupId = "test-group", topicPartitions = {
        @TopicPartition(topic = "test-topic-group", partitions = "0", partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
    })
    public void one(String value) {
        log.info("one:接收kafka消息:[{}]", value);
    }

    /**
     * 消费者组
     */
    @KafkaListener(groupId = "test-group", topicPartitions = {
        @TopicPartition(topic = "test-topic-group", partitions = "1", partitionOffsets = @PartitionOffset(partition = "*", initialOffset = "0"))
    })
    public void two(String value) {
        log.info("two:接收kafka消息:[{}]", value);
    }

    /**
     * 独立消费者
     */
    @KafkaListener(topics = "test-topic-standalone", groupId = "test-group")
    public void three(String value) {
        log.info("three:接收kafka消息:[{}]", value);
    }
}

启动项目,发送消息

this.template.send("test-topic-group", message);
this.template.send("test-topic-standalone", message);

报错日志

2022-12-14 19:04:18.835  INFO 13416 --- [tainer#13-0-C-1] c.l.p.e.k.c.TestErrorConsumerListener    : one:接收kafka消息:[111111111]
2022-12-14 19:04:18.841  INFO 13416 --- [tainer#14-0-C-1] c.l.p.e.k.c.TestErrorConsumerListener    : three:接收kafka消息:[111111111]
2022-12-14 19:04:19.336  INFO 13416 --- [tainer#13-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-group-15, groupId=test-group] Discovered group coordinator 192.168.136.136:9092 (id: 2147482646 rack: null)
2022-12-14 19:04:19.338 ERROR 13416 --- [tainer#13-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-group-15, groupId=test-group] Offset commit failed on partition test-topic-group-0 at offset 1: The coordinator is not aware of this member.
2022-12-14 19:04:19.338  INFO 13416 --- [tainer#13-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-group-15, groupId=test-group] OffsetCommit failed with Generation{generationId=-1, memberId='', protocol='null'}: The coordinator is not aware of this member.
2022-12-14 19:04:19.338  INFO 13416 --- [tainer#13-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-group-15, groupId=test-group] Resetting generation due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2022-12-14 19:04:19.338  INFO 13416 --- [tainer#13-0-C-1] o.a.k.c.c.internals.ConsumerCoordinator  : [Consumer clientId=consumer-test-group-15, groupId=test-group] Request joining group due to: encountered UNKNOWN_MEMBER_ID from OFFSET_COMMIT response
2022-12-14 19:04:19.342 ERROR 13416 --- [tainer#13-0-C-1] o.s.k.l.KafkaMessageListenerContainer    : Consumer exception

java.lang.IllegalStateException: This error handler cannot process 'org.apache.kafka.clients.consumer.CommitFailedException's; no record information is available
	at org.springframework.kafka.listener.DefaultErrorHandler.handleOtherException(DefaultErrorHandler.java:157) ~[spring-kafka-2.8.9.jar:2.8.9]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.handleConsumerException(KafkaMessageListenerContainer.java:1812) [spring-kafka-2.8.9.jar:2.8.9]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1301) [spring-kafka-2.8.9.jar:2.8.9]
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511) [na:1.8.0_251]
	at java.util.concurrent.FutureTask.run$$$capture(FutureTask.java:266) [na:1.8.0_251]
	at java.util.concurrent.FutureTask.run(FutureTask.java) [na:1.8.0_251]
	at java.lang.Thread.run(Thread.java:748) [na:1.8.0_251]
Caused by: org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1303) ~[kafka-clients-3.1.2.jar:na]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1204) ~[kafka-clients-3.1.2.jar:na]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1196) ~[kafka-clients-3.1.2.jar:na]
	at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1171) ~[kafka-clients-3.1.2.jar:na]
	at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:206) ~[kafka-clients-3.1.2.jar:na]
	at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:169) ~[kafka-clients-3.1.2.jar:na]
	at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:129) ~[kafka-clients-3.1.2.jar:na]
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:602) ~[kafka-clients-3.1.2.jar:na]
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:412) ~[kafka-clients-3.1.2.jar:na]
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:297) ~[kafka-clients-3.1.2.jar:na]
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236) ~[kafka-clients-3.1.2.jar:na]
	at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:215) ~[kafka-clients-3.1.2.jar:na]
	at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.commitOffsetsSync(ConsumerCoordinator.java:1046) ~[kafka-clients-3.1.2.jar:na]
	at org.apache.kafka.clients.consumer.KafkaConsumer.commitSync(KafkaConsumer.java:1492) ~[kafka-clients-3.1.2.jar:na]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doCommitSync(KafkaMessageListenerContainer.java:3062) [spring-kafka-2.8.9.jar:2.8.9]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitSync(KafkaMessageListenerContainer.java:3057) [spring-kafka-2.8.9.jar:2.8.9]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.commitIfNecessary(KafkaMessageListenerContainer.java:3043) [spring-kafka-2.8.9.jar:2.8.9]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.processCommits(KafkaMessageListenerContainer.java:2835) [spring-kafka-2.8.9.jar:2.8.9]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1329) [spring-kafka-2.8.9.jar:2.8.9]
	at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1255) [spring-kafka-2.8.9.jar:2.8.9]
	... 4 common frames omitted

查看kafka服务器的消费者状态

$KAFKA_HOME/bin/kafka-consumer-groups.sh --bootstrap-server 127.0.0.1:9092 --group test-group --describe

GROUP           TOPIC                 PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG             CONSUMER-ID                                                 HOST            CLIENT-ID
test-group      test-topic-standalone 0          1               1               0               consumer-test-group-16-d1a0c068-a68d-456c-a958-97b44219ef1b /192.168.136.1  consumer-test-group-16

Logo

大数据从业者之家,一起探索大数据的无限可能!

更多推荐