1. Error message:

org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {qukan_log_v3-198=2289560518}

Cause: the consumer requests an offset that is larger or smaller than any offset currently held by the Kafka cluster, and no reset policy is configured. Scenario 1: a consumer group stops consuming a topic for a few days; Kafka's retention cleanup deletes the older data, and when the program restarts it tries to resume from the last committed offset, which Kafka has already deleted, producing this error. Scenario 2: a consumer group is constantly lagging on a topic whose retention is only 1 hour; once the backlogged data has been deleted, the consumer reaches an offset that no longer exists and throws this error.

Solution: switch to a new group.id (or configure an offset reset policy, as sketched below), or fix the consumer lag.
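A minimal sketch of the reset-policy variant of the fix, using the Java client; the broker address, group name, and deserializer choices below are placeholders:

import java.util.Properties;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.KafkaConsumer;

Properties props = new Properties();
props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder
props.put(ConsumerConfig.GROUP_ID_CONFIG, "my-group-v2");           // a fresh group id avoids the stale committed offset
// fall back to the earliest (or "latest") valid offset instead of throwing OffsetOutOfRangeException
props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringDeserializer");

KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);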

2. Error message:

kafka: error while consuming
qukan_client_collect_cmd_8037_v3/23:
lz4: invalid header checksum: got 1a; expected 82

Cause: the sarama client is configured with (or defaults to) too old a Kafka protocol version, so it cannot correctly decompress the LZ4 payload.

Solution:

config := sarama.NewConfig()
config.Version = sarama.V2_1_0_0 // set this to match your Kafka broker version

3. Error message:

kafka server: The client is not authorized to access this topic.

Cause: the Kafka cluster has ACL-based authorization enabled and the client's principal has not been granted access to this topic.

Solution: grant the principal the required ACLs (e.g. Read on the topic and on the consumer group), for example with the kafka-acls.sh tool or programmatically as sketched below.
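A hedged sketch of granting a Read ACL through the Java AdminClient; the principal, topic, and broker address are placeholders:

import java.util.Collections;
import java.util.Properties;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.acl.AccessControlEntry;
import org.apache.kafka.common.acl.AclBinding;
import org.apache.kafka.common.acl.AclOperation;
import org.apache.kafka.common.acl.AclPermissionType;
import org.apache.kafka.common.resource.PatternType;
import org.apache.kafka.common.resource.ResourcePattern;
import org.apache.kafka.common.resource.ResourceType;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder

try (AdminClient admin = AdminClient.create(props)) {
    // allow principal User:myapp to read topic "test" from any host
    AclBinding binding = new AclBinding(
            new ResourcePattern(ResourceType.TOPIC, "test", PatternType.LITERAL),
            new AccessControlEntry("User:myapp", "*", AclOperation.READ, AclPermissionType.ALLOW));
    admin.createAcls(Collections.singleton(binding)).all().get();
}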

4. Error message:

the compression code is invalid or its codec has not been imported kafka-go

Cause: when consuming a topic with kafka-go, the consumer cannot automatically decompress messages unless the matching codec package has been imported (importing it registers the codec; newer kafka-go releases register the standard codecs by default, so this mainly affects older versions).

Solution:

import (
	"github.com/segmentio/kafka-go"
	_ "github.com/segmentio/kafka-go/lz4" // blank import runs the package init() that registers the LZ4 codec
)

r := kafka.NewReader(kafka.ReaderConfig{
	Brokers: []string{"test:9092"},
	Topic:   "test",
	GroupID: "test",
})

5. Error message:

panic: kafka: client has run out of available brokers to talk to (Is your cluster reachable?)

Possible causes: 1) the network is unreachable, so the broker hostnames cannot be resolved; 2) the Kafka connection configuration (broker addresses, port, auth settings) is wrong. A quick broker reachability check is sketched below.
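The panic above comes from the Go sarama client, but a broker reachability check is ecosystem-neutral; a minimal sketch with the Java AdminClient, assuming a placeholder broker address:

import java.util.Collection;
import java.util.Properties;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.admin.AdminClient;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.common.Node;

Properties props = new Properties();
props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder

try (AdminClient admin = AdminClient.create(props)) {
    // fails fast if the broker hostnames cannot be resolved or connected to
    Collection<Node> nodes = admin.describeCluster().nodes().get(10, TimeUnit.SECONDS);
    System.out.println("reachable brokers: " + nodes);
}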

6. Kafka consumer or producer error:

java.lang.Exception: java.lang.IllegalStateException: No entry found for connection 11
  at org.apache.flink.streaming.runtime.tasks.SourceStreamTask$LegacySourceFunctionThread.checkThrowSourceExecutionException(SourceStreamTask.java:217)
  at org.apache.flink.streaming.runtime.tasks.SourceStreamTask.processInput(SourceStreamTask.java:133)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.run(StreamTask.java:301)
  at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:406)
  at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:705)
  at org.apache.flink.runtime.taskmanager.Task.run(Task.java:530)
  at java.lang.Thread.run(Thread.java:748)
Caused by: java.lang.IllegalStateException: No entry found for connection 11
  at org.apache.kafka.clients.ClusterConnectionStates.nodeState(ClusterConnectionStates.java:339)
  at org.apache.kafka.clients.ClusterConnectionStates.disconnected(ClusterConnectionStates.java:143)
  at org.apache.kafka.clients.NetworkClient.initiateConnect(NetworkClient.java:926)
  at org.apache.kafka.clients.NetworkClient.access$700(NetworkClient.java:67)
  at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:1090)
  at org.apache.kafka.clients.NetworkClient$DefaultMetadataUpdater.maybeUpdate(NetworkClient.java:976)
  at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:533)
  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:265)
  at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:236)
  at org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1256)
  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1200)
  at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1135)
  at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:253)

For details, see: flink任务常见报错及解决办法 (yiweiyi329's CSDN blog)

7. Producer-side error:

Caused by: org.apache.kafka.common.errors.TimeoutException: Expiring 57 record(s) for topictest-0: xxx ms has passed since last append

Possible causes: first check whether the Kafka cluster itself is healthy. If it is, this error means records sat in the producer's send buffer longer than the delivery timeout before they could be sent; see the analysis in:

Expiring 1 record(s) for 2:xxx ms has passed since batch问题研究 (willfly_cui's blog)
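If the cluster is healthy, a common mitigation is to widen the producer's delivery window; a hedged sketch for the Java client (requires client 2.1+, which added delivery.timeout.ms; the values are illustrative):

import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;

Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "broker1:9092"); // placeholder
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
// give batches more time before they expire in the send buffer;
// delivery.timeout.ms must be >= linger.ms + request.timeout.ms
props.put(ProducerConfig.REQUEST_TIMEOUT_MS_CONFIG, "60000");
props.put(ProducerConfig.DELIVERY_TIMEOUT_MS_CONFIG, "180000");

KafkaProducer<String, String> producer = new KafkaProducer<>(props);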

8. A Flink job cannot commit offsets back to Kafka; error:


throwable:org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1151)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:1081)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1026)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:1006)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:204)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:167)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:127)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.fireCompletion(ConsumerNetworkClient.java:599)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.firePendingCompletedRequests(ConsumerNetworkClient.java:409)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:294)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:233)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.pollForFetches(KafkaConsumer.java:1300)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1240)
at org.apache.flink.kafka.shaded.org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1168)
at org.apache.flink.streaming.connectors.kafka.internal.KafkaConsumerThread.run(KafkaConsumerThread.java:253)
message:Consumer subtask 80 failed async Kafka commit

For details, see: flink任务常见报错及解决办法 (yiweiyi329's CSDN blog)
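The exception message itself names the fix: raise max.poll.interval.ms or lower max.poll.records. A hedged sketch of passing both to the Flink Kafka connector (broker, group, and topic names are placeholders):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer;

Properties props = new Properties();
props.setProperty("bootstrap.servers", "broker1:9092"); // placeholder
props.setProperty("group.id", "my-flink-group");        // placeholder
// allow more time between poll() calls before the coordinator evicts this consumer
props.setProperty("max.poll.interval.ms", "600000");
// fetch fewer records per poll() so each batch finishes within the interval
props.setProperty("max.poll.records", "200");

FlinkKafkaConsumer<String> source =
        new FlinkKafkaConsumer<>("my-topic", new SimpleStringSchema(), props);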

9. Spark consuming from Kafka reports the following error:

22/02/20 18:29:57 WARN kafka.clients.consumer.internals.AbstractCoordinator: 
[Consumer clientId=consumer-2, groupId=spark-kafka-source-8b5414cd-00e7-4af1-bef1-f450886a6011--28838279-driver-0] 
This member will leave the group because consumer poll timeout has expired. 
This means the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, 
which typically implies that the poll loop is spending too much time processing messages. 
You can address this either by increasing max.poll.interval.ms or by reducing the maximum size of batches returned in poll() with max.poll.records.

Cause: the consumer took too long between polls. When the gap between two poll() calls exceeds max.poll.interval.ms, the Coordinator assumes the consumer has died, removes it from the consumer group, and triggers a rebalance. This is related to how the consumer signals liveness: most middleware designs separate the business thread from the heartbeat thread so that long-running business logic cannot block heartbeats, and since 0.10.1 the Kafka client likewise sends heartbeats from a background thread; max.poll.interval.ms exists on top of that precisely to detect a poll loop that is alive but stuck in processing.

Solution: optimize the time-consuming operations in the processing logic.

If the business processing genuinely needs that much time, increase max.poll.interval.ms, which sets the maximum allowed interval between two poll() calls;

or reduce the amount of data the consumer pulls from the broker per poll by lowering max.poll.records. A sketch of both settings follows.
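With Spark structured streaming, consumer settings can be forwarded to the underlying Kafka consumer by prefixing the option name with "kafka."; a hedged Java sketch (broker and topic names are placeholders):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().appName("kafka-poll-tuning").getOrCreate();

Dataset<Row> df = spark.readStream()
        .format("kafka")
        .option("kafka.bootstrap.servers", "broker1:9092") // placeholder
        .option("subscribe", "my-topic")                   // placeholder
        .option("kafka.max.poll.interval.ms", "600000")    // more time between polls
        .option("kafka.max.poll.records", "200")           // smaller batches per poll
        .load();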

10. A Flink job consuming from Kafka fails to start; error:

Caused by: org.apache.kafka.common.KafkaException: Unexpected error in InitProducerIdResponse; The transaction timeout is larger than the maximum value allowed by the broker (as configured by transaction.max.timeout.ms).
    at org.apache.kafka.clients.producer.internals.TransactionManager$InitProducerIdHandler.handleResponse(TransactionManager.java:1151)
    at org.apache.kafka.clients.producer.internals.TransactionManager$TxnRequestHandler.onComplete(TransactionManager.java:1074)
    at org.apache.kafka.clients.ClientResponse.onComplete(ClientResponse.java:109)
    at org.apache.kafka.clients.NetworkClient.completeResponses(NetworkClient.java:569)
    at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:561)
    at org.apache.kafka.clients.producer.internals.Sender.maybeSendAndPollTransactionalRequest(Sender.java:425)
    at org.apache.kafka.clients.producer.internals.Sender.runOnce(Sender.java:311)
    at org.apache.kafka.clients.producer.internals.Sender.run(Sender.java:244)
    ... 1 more

Cause: Kafka brokers default the maximum transaction timeout (transaction.max.timeout.ms) to 15 minutes, and a producer is not allowed to set a transaction timeout larger than that value. By default, however, FlinkKafkaProducer011 sets its transaction timeout (transaction.timeout.ms) to 1 hour, which exceeds the broker's 15-minute cap. Therefore, when using EXACTLY_ONCE semantics, either increase transaction.max.timeout.ms on the brokers or decrease the job's transaction.timeout.ms.

Solution:

kafkaPro.setProperty("transaction.timeout.ms", "300000"); // 5 minutes; must be a literal ms value, the string "1000*60*5" would not parse
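For context, a hedged sketch of where kafkaPro plugs in when building the exactly-once Flink producer (topic and broker names are placeholders):

import java.util.Properties;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.streaming.util.serialization.KeyedSerializationSchemaWrapper;

Properties kafkaPro = new Properties();
kafkaPro.setProperty("bootstrap.servers", "broker1:9092"); // placeholder
// keep this at or below the broker's transaction.max.timeout.ms
kafkaPro.setProperty("transaction.timeout.ms", "300000");

FlinkKafkaProducer011<String> sink = new FlinkKafkaProducer011<>(
        "my-topic",                                                 // placeholder
        new KeyedSerializationSchemaWrapper<>(new SimpleStringSchema()),
        kafkaPro,
        FlinkKafkaProducer011.Semantic.EXACTLY_ONCE);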

Continuously updated...
