Answer a question

just starting learning Kafka. Im trying to setup a small kafka cluster including 2 brokers. I was successfull in sending messages to my topic when both brokers are up. I want to test the behavior of my cluster when one of 2 brokers goes done. I stopped my primary broker (Kafka1) using docker stop kafka1, and i tried then to send a message to my cluster to see if my producer is able to understand that he need to send to kafka2 as kafka1 is down.

However i constantly receiving the below errors:

{"level":"ERROR","timestamp":"2022-07-19T18:59:46.891Z","logger":"kafkajs","message":"[Connection] Response Metadata(key: 3, version: 6)","broker":"localhost:39092","clientId":"my-app","error":"There is no leader for this topic-partition as we are in the middle of a leadership election","correlationId":1,"size":144}

below is my producer code:

const kafka = new Kafka({
clientId: 'my-app',
brokers: ['localhost:29092', 'localhost:39092'],
})
const producer = kafka.producer({ createPartitioner: Partitioners.LegacyPartitioner })

await producer.connect()

await producer.send({
  topic: 'coverageEvolved',
  messages: [
    { value: JSON.stringify(bodyActiveMq), key: bodyActiveMq[0].roamPartner},
  ],
})

await producer.disconnect()

and below is my docker-compose-file:

version: '2'
services:
    zookeeper:
    image: confluentinc/cp-zookeeper:latest
    restart: unless-stopped
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000
    ports:
      - 22181:2181
    volumes:
      - ./zookeeper/data:/var/lib/zookeeper/data
    kafka-1:
        image: confluentinc/cp-kafka:latest
        depends_on:
          - zookeeper
        ports:
            - 29092:29092
        restart: unless-stopped
        environment:
          KAFKA_BROKER_ID: 1
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka- 
    1:9092,PLAINTEXT_HOST://localhost:29092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
        volumes:
          - ./kafka1/data:/var/lib/kafka/data
      kafka-2:
        image: confluentinc/cp-kafka:latest
        depends_on:
          - zookeeper
        ports:
          - 39092:39092
        restart: unless-stopped
        environment:
          KAFKA_BROKER_ID: 2
          KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
          KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka-2:9092,PLAINTEXT_HOST://localhost:39092
          KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
          KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
          KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 2
        volumes:
          - ./kafka2/data:/var/lib/kafka/data

Answers

If you've not created your topic some other way, Kafka will default to create your coverageEvolved topic used in the code with only one replica, and only one partition.

If you kill the broker hosting that one replica, there will be no in sync replica leader that can be produced to.

You can use Kafkajs to create topics.

Also worth mentioning, there's a transactions topic that only has one replica (you're missing an environment variable for it). This is mainly only relevant for Java clients since transactional producers are enabled by default as of Kafka 3.0

Logo

云原生社区为您提供最前沿的新闻资讯和知识内容

更多推荐