Answer a question

I'm running Kafka in docker and I've a .NET application that I want to use to consume messages. I've followed following tutorials with no luck:
https://www.confluent.io/blog/kafka-client-cannot-connect-to-broker-on-aws-on-docker-etc/
Connect to Kafka running in Docker
Interact with kafka docker container from outside of docker host
On my consumer application I get the following error if I try to conenct directly to containers ip:

172.21.0.3:9092/bootstrap: Connect to ipv4#172.21.0.3:9092 failed: Unknown error (after 21502ms in state CONNECT)
Error: 1/1 brokers are down %3|1620652406.633|FAIL|rdkafka#consumer-1| [thrd:172.21.0.3:9092/bootstrap]: 172.21.0.3:9092/bootstrap: Connect to ipv4#172.21.0.3:9092 failed: Unknown error (after 21037ms in state CONNECT, 1 identical error(s) suppressed) 
Error: 172.21.0.3:9092/bootstrap: Connect to ipv4#172.21.0.3:9092 failed: Unknown error (after 21037ms in state CONNECT, 1 identical error(s) suppressed)

If I change BootstrapServers to kafka:9092 I get this error:

Error: kafka:9092/bootstrap: Failed to resolve 'kafka:9092': No such host is known.  (after 6817ms in state CONNECT, 7 identical error(s) suppressed)

My docker compose:

version: '3.8'
services:
  zookeeper:
    #image: "debezium/zookeeper:${DEBEZIUM_VERSION}"
    image: "confluentinc/cp-zookeeper:5.5.0"
  #  ports:
  #    - 2181:2181
  #    - 2888:2888
  #    - 3888:3888
    ports:
      - 2181:2181
    environment: 
      ZOOKEEPER_CLIENT_PORT: 2181

  kafka:
    #image: "debezium/kafka:${DEBEZIUM_VERSION}"
    image: "confluentinc/cp-kafka"
    ports:
      - 9092:9092
      #- 29092:29092
    depends_on:
      - zookeeper
    environment:
     - KAFKA_ZOOKEEPER_CONNECT=zookeeper:2181 
     - KAFKA_BROKERID=1
     - ALLOW_PLAINTEXT_LISTENER="yes"
     - KAFKA_LISTENER_SECURITY_PROTOCOL_MAP=PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
     - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://kafka:9092
     - KAFKA_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://kafka:9092
     - KAFKA_INTER_BROKER_LISTENER_NAME=PLAINTEXT
     - KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR=1

  connect:
    image: "debezium/connect:${DEBEZIUM_VERSION}"
    ports: 
      - 8083:8083
    depends_on:
      - kafka
      - zookeeper
    environment:
      - BOOTSTRAP_SERVERS=kafka:29092
      - GROUP_ID=1
      - CONFIG_STORAGE_TOPIC=my_connect_configs
      - OFFSET_STORAGE_TOPIC=my_connect_offsets
      - STATUS_STORAGE_TOPIC=my_source_connect_statuses

  kafdrop:
    image: "obsidiandynamics/kafdrop"
    ports:
      - 9000:9000
    depends_on: 
      - connect
    environment: 
      - KAFKA_BROKERCONNECT=kafka:29092

and C# code:

 var config = new ConsumerConfig
            {
                BootstrapServers = "kafka:9092",
                GroupId = "simple-dotnet-consumer",
                AutoOffsetReset = AutoOffsetReset.Earliest,
                EnablePartitionEof = true
            };
            using var consumer = new ConsumerBuilder<string, string>(config)
                .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
                .Build();

            consumer.Subscribe(new List<string>() { "DESKTOP-DBA3LAO.dbo.CashRegister" });
            var start = DateTime.Now;
            long messageCounter = 0;
            try
            {
                while (!(Console.KeyAvailable && Console.ReadKey().Key == ConsoleKey.Q))
                {
                    var result = consumer.Consume(TimeSpan.FromMilliseconds(100));
                    if (result == null) { continue; }
                    if (result.IsPartitionEOF) { break; }

                    ++messageCounter;
                    if (messageCounter % 1024 == 0) { Console.WriteLine($"Received message key: \"{result.Message.Key}\" value: {result.Message.Value}"); }
                }
            }
            catch (OperationCanceledException) { }

            consumer.Close();  // commit offset and unsubscribe

            var elapsed = DateTime.Now - start;
            Console.WriteLine("average throughput: {0:N3} msg/sec, {1} messages over {2:N3} sec", messageCounter / elapsed.TotalSeconds, messageCounter, elapsed.TotalSeconds);
        

Answers

You need to configure the listeners correctly. At the moment you are advertising the broker as being accessible only at hostname kafka which as @mm8 says is only going to be valid within the Docker network.

     - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://kafka:9092
     - KAFKA_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://kafka:9092

You need to have a listener which will advertise itself as localhost (or whatever hostname your code will be able to connect to it on), for example:

     - KAFKA_ADVERTISED_LISTENERS=PLAINTEXT://kafka:29092,PLAINTEXT_HOST://localhost:9092
     - KAFKA_LISTENERS=PLAINTEXT://0.0.0.0:29092,PLAINTEXT_HOST://0.0.0.0:9092

Now amend your client application to use localhost and not kafka:

BootstrapServers = "localhost:9092",

This is all covered in my blog, if you read it carefully :)

Logo

华为、百度、京东云现已入驻,来创建你的专属开发者社区吧!

更多推荐