Answer a question

Please help I'm wondering why the kafka producer always connect to the localhost however there the broker ip is not the localhost. So, is there any help ? any ideas ?

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.serialization.StringSerializer;

import java.util.Properties;

public class ProducerDemo {

    public static void main(String[] args) throws Exception{
        String bootstrapServers = "192.168.199.137:9092";

        Properties properties = new Properties();
        properties.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
        properties.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
        properties.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());

        // create the producer
        KafkaProducer<String, String> producer = new KafkaProducer<String, String>(properties);

        System.out.println("kafka IP   " + properties.getProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG));


        // create producer record
        ProducerRecord<String, String> record = new ProducerRecord<String, String>("first_topic", "Hello world");


        // send data
        producer.send(record);
        producer.flush();
        producer.close();

    }

}

And here is the pom content

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>org.example</groupId>
    <artifactId>KafkaBeginnersCourse</artifactId>
    <version>1.0-SNAPSHOT</version>

    <packaging>jar</packaging>

    <dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka-clients -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.0</version>
        </dependency>

        <!-- https://mvnrepository.com/artifact/org.slf4j/slf4j-simple -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-simple</artifactId>
            <version>1.7.28</version>
            <!--scope>test</scope-->
        </dependency>
    </dependencies>
</project>

Here is a list from the output console

[kafka-producer-network-thread | producer-1] INFO org.apache.kafka.clients.Metadata - [Producer clientId=producer-1] Cluster ID: 0TPD87gWR0G18RLKk4gPow
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.
[kafka-producer-network-thread | producer-1] WARN org.apache.kafka.clients.NetworkClient - [Producer clientId=producer-1] Connection to node 1 (localhost/127.0.0.1:9092) could not be established. Broker may not be available.

Answers

It seems there is a problem in your advertised.listeners setting in server.properties file as @cricket_007 has already said. Let me try to explain it in detail.

When your producer try to connect Kafka broker, broker sends advertised hostname to client to use, then producer connect to broker with using this address. So normally communication is like this:

enter image description here

But in your case communication between producer and broker is like this:

enter image description here

P.S: You can assume public IP in images as private IP for your case.

As a result you should set your advertised listeners in server.properties like this:

advertised.listeners=PLAINTEXT://192.168.199.137:9092

Reference for images: https://www.udemy.com/course/kafka-cluster-setup/

Logo

华为、百度、京东云现已入驻,来创建你的专属开发者社区吧!

更多推荐