Answer a question

I'm creating an avro class that contains a string and a map as fields. I can generate the avro class through maven, and i was able to create a registry in localhost:8081

.avsc file:

    {
"type":"record",
"name":"AvroClass",
"namespace":"belliPack.avro",
"fields":[
{
"name":"title",
"type":"string"
},
{
"name":"map",
"type": {"type": "map", "values": "double"}
}
]
}

Schema registry returns this: $ curl -X GET http://localhost:8081/subjects/teste1-value/versions/1

{"subject":"teste1-value","version":1,"id":42,"schema":"{"type":"record","name":"AvroClass","namespace":"belliPack.avro","fields":[{"name":"title","type":"string"},{"name":"map","type":{"type":"map","values":"double"}}]}"}

My Kafka Producer Class is:

public KafkaProducer<String, AvroClass> createKafkaProducer() {
    String bootstrapServer = "127.0.0.1:9092";
    String schemaRegistryURL = "127.0.0.1:8081";

    //create Producer properties
    Properties properties = new Properties();
    //kafka documentation>producer configs
    properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
    properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
    properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class.getName());
    properties.setProperty(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG,schemaRegistryURL);

    //create producer
    KafkaProducer<String, AvroClass> producer = new KafkaProducer<>(properties);
    return producer;
}

But when running my Kafka Producer have this error:

    Exception in thread "Thread-1" Exception in thread "Thread-3" org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.net.MalformedURLException: no protocol: 127.0.0.1:8081/subjects/teste1-value/versions
at java.base/java.net.URL.(URL.java:644)
at java.base/java.net.URL.(URL.java:540)
at java.base/java.net.URL.(URL.java:487)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:175)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:256)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:356)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:348)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:334)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:168)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:222)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:198)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:70)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:903)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:752)
at belliPack.Kafka.Kafka_Producer.sendData(Kafka_Producer.java:32)
at belliPack.OPC.ExtractNodeValues.run(ExtractNodeValues.java:82)
at java.base/java.lang.Thread.run(Thread.java:834)
org.apache.kafka.common.errors.SerializationException: Error serializing Avro message
Caused by: java.net.MalformedURLException: no protocol: 127.0.0.1:8081/subjects/teste1-value/versions
at java.base/java.net.URL.(URL.java:644)
at java.base/java.net.URL.(URL.java:540)
at java.base/java.net.URL.(URL.java:487)
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:175)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:256)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:356)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:348)
at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:334)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:168)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:222)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:198)
at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:70)
at io.confluent.kafka.serializers.KafkaAvroSerializer.serialize(KafkaAvroSerializer.java:53)
at org.apache.kafka.common.serialization.Serializer.serialize(Serializer.java:62)
at org.apache.kafka.clients.producer.KafkaProducer.doSend(KafkaProducer.java:903)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:865)
at org.apache.kafka.clients.producer.KafkaProducer.send(KafkaProducer.java:752)
at belliPack.Kafka.Kafka_Producer.sendData(Kafka_Producer.java:32)
at belliPack.OPC.ExtractNodeValues.run(ExtractNodeValues.java:82)
at java.base/java.lang.Thread.run(Thread.java:834)```

Answers

java.net.MalformedURLException: no protocol

How would the client know if you want http or https? There's no default, so you must provide it on the registry url

Logo

华为、百度、京东云现已入驻,来创建你的专属开发者社区吧!

更多推荐