Spring Kafka - Failed to construct kafka consumer
I'm having an issue getting my Kafka/Confluent Spring Boot project (built with Gradle) up and running. I originally had just a producer in this test project and everything ran fine. I then added a Kafka consumer, and now I get an exception on startup. Would anyone be able to spot the problem here?
First, here is the stack trace:
2021-01-22 19:56:08.566 WARN 61123 --- [ main] ConfigServletWebServerApplicationContext : Exception encountered during context initialization - cancelling refresh attempt: org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
2021-01-22 19:56:08.573 INFO 61123 --- [ main] j.LocalContainerEntityManagerFactoryBean : Closing JPA EntityManagerFactory for persistence unit 'default'
2021-01-22 19:56:08.575 INFO 61123 --- [ main] o.s.s.concurrent.ThreadPoolTaskExecutor : Shutting down ExecutorService 'applicationTaskExecutor'
2021-01-22 19:56:08.576 INFO 61123 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown initiated...
2021-01-22 19:56:08.584 INFO 61123 --- [ main] com.zaxxer.hikari.HikariDataSource : HikariPool-1 - Shutdown completed.
2021-01-22 19:56:08.586 INFO 61123 --- [ main] o.apache.catalina.core.StandardService : Stopping service [Tomcat]
2021-01-22 19:56:08.597 INFO 61123 --- [ main] ConditionEvaluationReportLoggingListener :
Error starting ApplicationContext. To display the conditions report re-run your application with 'debug' enabled.
2021-01-22 19:36:06.216 ERROR 61013 --- [ main] o.s.boot.SpringApplication : Application run failed
org.springframework.context.ApplicationContextException: Failed to start bean 'org.springframework.kafka.config.internalKafkaListenerEndpointRegistry'; nested exception is org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:185)
at org.springframework.context.support.DefaultLifecycleProcessor.access$200(DefaultLifecycleProcessor.java:53)
at org.springframework.context.support.DefaultLifecycleProcessor$LifecycleGroup.start(DefaultLifecycleProcessor.java:360)
at org.springframework.context.support.DefaultLifecycleProcessor.startBeans(DefaultLifecycleProcessor.java:158)
at org.springframework.context.support.DefaultLifecycleProcessor.onRefresh(DefaultLifecycleProcessor.java:122)
at org.springframework.context.support.AbstractApplicationContext.finishRefresh(AbstractApplicationContext.java:895)
at org.springframework.context.support.AbstractApplicationContext.refresh(AbstractApplicationContext.java:554)
at org.springframework.boot.web.servlet.context.ServletWebServerApplicationContext.refresh(ServletWebServerApplicationContext.java:143)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:758)
at org.springframework.boot.SpringApplication.refresh(SpringApplication.java:750)
at org.springframework.boot.SpringApplication.refreshContext(SpringApplication.java:397)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:315)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1237)
at org.springframework.boot.SpringApplication.run(SpringApplication.java:1226)
at com.test.DevlyAuthServiceApplication.main(DevlyAuthServiceApplication.java:39)
Caused by: org.apache.kafka.common.KafkaException: Failed to construct kafka consumer
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:825)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:631)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createRawConsumer(DefaultKafkaConsumerFactory.java:358)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:326)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumerWithAdjustedProperties(DefaultKafkaConsumerFactory.java:302)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createKafkaConsumer(DefaultKafkaConsumerFactory.java:269)
at org.springframework.kafka.core.DefaultKafkaConsumerFactory.createConsumer(DefaultKafkaConsumerFactory.java:243)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.<init>(KafkaMessageListenerContainer.java:639)
at org.springframework.kafka.listener.KafkaMessageListenerContainer.doStart(KafkaMessageListenerContainer.java:305)
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338)
at org.springframework.kafka.listener.ConcurrentMessageListenerContainer.doStart(ConcurrentMessageListenerContainer.java:204)
at org.springframework.kafka.listener.AbstractMessageListenerContainer.start(AbstractMessageListenerContainer.java:338)
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.startIfNecessary(KafkaListenerEndpointRegistry.java:312)
at org.springframework.kafka.config.KafkaListenerEndpointRegistry.start(KafkaListenerEndpointRegistry.java:257)
at org.springframework.context.support.DefaultLifecycleProcessor.doStart(DefaultLifecycleProcessor.java:182)
... 14 common frames omitted
Caused by: java.lang.NoClassDefFoundError: org/apache/kafka/common/requests/IsolationLevel
at io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor.configure(MonitoringConsumerInterceptor.java:46)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstance(AbstractConfig.java:376)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:436)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:417)
at org.apache.kafka.common.config.AbstractConfig.getConfiguredInstances(AbstractConfig.java:404)
at org.apache.kafka.clients.consumer.KafkaConsumer.<init>(KafkaConsumer.java:711)
... 28 common frames omitted
Caused by: java.lang.ClassNotFoundException: org.apache.kafka.common.requests.IsolationLevel
at java.base/jdk.internal.loader.BuiltinClassLoader.loadClass(BuiltinClassLoader.java:602)
at java.base/jdk.internal.loader.ClassLoaders$AppClassLoader.loadClass(ClassLoaders.java:178)
at java.base/java.lang.ClassLoader.loadClass(ClassLoader.java:522)
... 34 common frames omitted
This is my build.gradle:
plugins {
    id 'org.springframework.boot' version '2.3.2.RELEASE'
    id 'io.spring.dependency-management' version '1.0.8.RELEASE'
    id 'java'
    id "com.commercehub.gradle.plugin.avro" version "0.21.0"
    id "idea"
}

group 'com.test.tge-auth-service'
version '1.0'

java {
    sourceCompatibility = JavaVersion.VERSION_14
    targetCompatibility = JavaVersion.VERSION_14
}

ext {
    avroVersion = "1.10.1"
}

repositories {
    mavenCentral()
    jcenter()
    maven {
        url "https://packages.confluent.io/maven/"
    }
}

avro {
    createSetters = true
    fieldVisibility = "PRIVATE"
}
dependencies {
    // providedRuntime 'org.springframework.boot:spring-boot-starter-tomcat'
    compile group: 'co.elastic.logging', name: 'logback-ecs-encoder', version: '0.5.2'
    compile group: 'com.amazonaws', name: 'aws-java-sdk', version: '1.11.860'
    compile group: 'io.springfox', name: 'springfox-swagger-ui', version: '3.0.0'
    compile group: 'io.springfox', name: 'springfox-boot-starter', version: '3.0.0'
    compile('org.springframework.boot:spring-boot-starter-data-elasticsearch')
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-data-mongodb', version: '2.3.3.RELEASE'
    compile group: 'org.springframework.data', name: 'spring-data-elasticsearch', version: '4.0.4.RELEASE'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-data-jpa', version: '2.3.3.RELEASE'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-security', version: '2.3.3.RELEASE'
    compile group: 'org.springframework.security', name: 'spring-security-oauth2-client', version: '5.4.0'
    compile group: 'org.springframework.boot', name: 'spring-boot-starter-validation', version: '2.4.2'
    compile group: 'org.springframework.kafka', name: 'spring-kafka', version: '2.6.5'
    compile group: 'javax.validation', name: 'validation-api', version: '2.0.1.Final'
    compile group: 'com.fasterxml.jackson.datatype', name: 'jackson-datatype-jsr310', version: '2.11.2'
    compile group: 'mysql', name: 'mysql-connector-java', version: '8.0.21'
    compile group: 'io.jsonwebtoken', name: 'jjwt', version: '0.9.1'
    compile group: 'org.openapitools', name: 'jackson-databind-nullable', version: '0.2.1'
    compile group: 'commons-io', name: 'commons-io', version: '2.6'
    compile group: 'org.apache.commons', name: 'commons-collections4', version: '4.4'
    compile group: 'org.apache.commons', name: 'commons-lang3', version: '3.11'
    compile group: 'org.passay', name: 'passay', version: '1.6.0'
    compile group: 'com.google.guava', name: 'guava', version: '30.0-jre'
    compile group: 'io.confluent', name: 'kafka-schema-registry-client', version: '5.4.0'
    compile group: 'io.confluent', name: 'kafka-avro-serializer', version: '5.4.0'
    compile group: 'io.confluent', name: 'monitoring-interceptors', version: '5.4.0'
    compile(group: 'io.confluent', name: 'kafka-streams-avro-serde', version: '5.4.0') {
        exclude(module: 'log4j-over-slf4j')
    }
    compile "org.apache.avro:avro:1.10.1"
    implementation "org.apache.avro:avro:${avroVersion}"
    compileOnly 'org.projectlombok:lombok:1.18.12'
    annotationProcessor 'org.projectlombok:lombok:1.18.12'
    implementation 'com.amazonaws:aws-java-sdk-s3'
    implementation 'org.springframework.boot:spring-boot-starter-web'
    testCompile group: 'junit', name: 'junit', version: '4.12'
    testCompileOnly 'org.projectlombok:lombok:1.18.12'
    testAnnotationProcessor 'org.projectlombok:lombok:1.18.12'
    testImplementation('org.springframework.boot:spring-boot-starter-test') {
        exclude group: 'org.junit.vintage', module: 'junit-vintage-engine'
    }
}

jar {
    manifest {
        attributes(
            'Main-Class': 'com.test.SpringBootPersistenceApplication'
        )
    }
    from {
        configurations.runtimeClasspath.collect { it.isDirectory() ? it : zipTree(it) }
    }
}
Here is my producer, which works:
import com.test.messages.avro.model.User;
import lombok.RequiredArgsConstructor;
import lombok.extern.apachecommons.CommonsLog;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
@CommonsLog(topic = "Producer Logger")
@RequiredArgsConstructor
public class ProducerK {
    @Value("${topic.name}")
    private String TOPIC;

    private final KafkaTemplate<String, User> kafkaTemplate;

    void sendMessage(User user) {
        this.kafkaTemplate.send(this.TOPIC, "key", user);
        log.info(String.format("Produced user -> %s", user));
    }
}
And finally here is my Consumer:
import com.test.messages.avro.model.User;
import lombok.extern.apachecommons.CommonsLog;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Service;
import java.io.IOException;
@Service
@CommonsLog(topic = "Consumer Logger")
public class Consumer {
    @KafkaListener(
            topics = "#{'${topic.name}'}",
            groupId = "simple-consumer"
    )
    public void consume(User record) throws IOException {
        log.info(String.format("Consumed message -> %s", record));
    }
}
If it's of any help, here is my application.yaml file as well:
topic:
  name: users-2kb
  partitions-num: 3
  replication-factor: 1

spring:
  kafka:
    properties:
      bootstrap.servers: localhost:9092
      schema.registry.url: http://localhost:8081
    consumer:
      group-id: my-microservice
      auto-offset-reset: latest
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: io.confluent.kafka.serializers.KafkaAvroDeserializer
      properties:
        specific.avro.reader: true
        interceptor.classes: io.confluent.monitoring.clients.interceptor.MonitoringConsumerInterceptor
    producer:
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: io.confluent.kafka.serializers.KafkaAvroSerializer
      properties:
        interceptor.classes: io.confluent.monitoring.clients.interceptor.MonitoringProducerInterceptor
    template:
      default-topic:

logging:
  level:
    root: info
Thank you for any help you can provide (I'm going out of my mind with this :D)
Answers
Boot 2.3 uses spring-kafka 2.5 by default (and kafka-clients 2.5.0); since you have overridden its prescribed spring-kafka version to 2.6.5, you must override all of the Kafka dependencies to match: kafka-clients 2.6.1 and kafka-streams 2.6.1 (if you are using them).
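For example (a sketch only, assuming you keep the spring-kafka 2.6.5 override and the io.spring.dependency-management plugin already applied in your build), you can override Boot's kafka.version property so the BOM resolves every org.apache.kafka artifact at the matching release:

// build.gradle – sketch, not a drop-in replacement for your file
ext['kafka.version'] = '2.6.1'   // align kafka-clients / kafka-streams with spring-kafka 2.6.5

dependencies {
    // keep the spring-kafka override you already have
    implementation 'org.springframework.kafka:spring-kafka:2.6.5'
    // no explicit kafka-clients entry is needed – the overridden property drives its version
}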
If you are using the embedded Kafka broker in tests, there are a number of other jars you need; see https://docs.spring.io/spring-kafka/docs/current/reference/html/#update-deps
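A rough Gradle version of the test-scoped additions that page describes might look like this (versions and the Scala suffix are illustrative; take the exact coordinates from the linked docs):

// sketch – embedded-broker test dependencies when the Kafka version is overridden
def kafkaVersion = '2.6.1'   // keep in step with ext['kafka.version'] above

dependencies {
    testImplementation 'org.springframework.kafka:spring-kafka-test'
    testImplementation "org.apache.kafka:kafka-clients:${kafkaVersion}:test"
    testImplementation "org.apache.kafka:kafka_2.13:${kafkaVersion}"
    testImplementation "org.apache.kafka:kafka_2.13:${kafkaVersion}:test"
}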
spring-kafka 2.6.x is the default for Boot 2.4, which will bring in all the right versions for you.
Confluent 5.4 uses Kafka 2.4. You should use the Confluent version that matches Spring Boot's prescribed versions of spring-kafka and kafka-clients: if you use Boot 2.4.x, use Confluent 6.0.
https://docs.confluent.io/platform/current/installation/versions-interoperability.html
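Putting that together, the least error-prone route is probably to move to Boot 2.4 and let it manage the Kafka client versions, keeping the Confluent artifacts on a matching 6.0.x release. A rough sketch of the relevant build.gradle changes (version numbers are illustrative; check the interoperability page above):

plugins {
    id 'org.springframework.boot' version '2.4.2'              // Boot 2.4.x -> spring-kafka 2.6.x, kafka-clients 2.6.x
    id 'io.spring.dependency-management' version '1.0.11.RELEASE'
}

dependencies {
    implementation 'org.springframework.kafka:spring-kafka'     // no explicit version – Boot's BOM decides
    implementation 'io.confluent:kafka-schema-registry-client:6.0.1'
    implementation 'io.confluent:kafka-avro-serializer:6.0.1'
    implementation 'io.confluent:monitoring-interceptors:6.0.1'
}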