想要运行kafka。发现windows系统运行kafka经常报问题怎么办?linux系统环境太复杂怎么办

本文针对kafka服务器的启动,介绍如何利用docker在容器内运行kakfa的server。

1. 环境说明

docker提供了相对独立简洁的运行环境,能够让kafka的服务端良好独立地运行,遇到问题也方便进行排查。

经常使用的kafka镜像有wurminster/kafka, bitnami/kafka这两种,都可以使用。本文使用bitnami/kafka,并结合zookeeper使用(kafka的运行基础)。

docker: 24.0.6

镜像bitnami/kafka: 2.8.1

镜像bitnami/zookeeper: latest

镜像dushixiang/kafka-map:latest(kafka的管理界面,可同时启动使用)

不同版本基本不会对运行产生影响。

2. 方法介绍

为了让kafka有健全的运行体系,并且方便我们一键式地开启或关闭kafka的server,利用docker compose工具来建立一个由多容器构成的整体服务。

2.1. 文件准备

创建一个kafka文件夹,文件夹中创建2个文本文件,重命名为.envcompose.yaml

2.1.1. .env文件

.env文件的作用为设置环境变量,方便我们修改相关的参数,如端口、版本等。

ZOOKEEPER_IMAGE_VERSION='latest'
KAFKA_IMAGE_VERSION='2.8.1'
KAFKA_MAP_IMAGE_VERSION='latest'
ZOOKEEPER_PORT=2181
KAFKA_PORT=9092
KAFKA_MAP_PORT=9001

2.1.2. compose.yaml文件

docker compose进行启动服务的执行文件,根据文件内容运行对应镜像。

将生成zookeeper,kafka,kafka-map三个容器。

services:
  zookeeper:
    image: bitnami/zookeeper:${ZOOKEEPER_IMAGE_VERSION}
    container_name: zookeeper
    ports:
      - ${ZOOKEEPER_PORT}:2181
    environment:
      - ALLOW_ANONYMOUS_LOGIN=yes
    networks:
      - app-tier

  kafka:
    image: bitnami/kafka:${KAFKA_IMAGE_VERSION}
    container_name: kafka
    ports:
      - ${KAFKA_PORT}:9092
    environment:
      - KAFKA_CFG_NODE_ID=1
      - ALLOW_PLAINTEXT_LISTENER=yes
      - KAFKA_CFG_ZOOKEEPER_CONNECT=zookeeper:2181
      # - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092  不用advertised会导致client无法连接
      - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://localhost:${KAFKA_PORT}
    networks:
      - app-tier

  kafka-map:
    image: dushixiang/kafka-map:${KAFKA_MAP_IMAGE_VERSION}
    container_name: kafka-map
    restart: always
    ports:
      - ${KAFKA_MAP_PORT}:8080
    environment:
      - DEFAULT_USERNAME=admin
      - DEFAULT_PASSWORD=admin
    volumes:
      - /opt/kafka-map/data/:/usr/local/kafka-map/data
    networks:
      - app-tier

networks:
  app-tier:

2.2. 运行命令

打开cmd并cd到kafka文件夹的路径

输入下方命令

docker compose up -d

 成功运行将显示

此时即说明kafka的server已运行成功。

3. 后续操作

3.1. 运行client客户端(consumer和producer)

我的本机是windows,可以通过在windows安装kafka来生成producer和consumer。

也可以利用python代码,来生成producer和consumer。

client客户端的运行将依赖于docker生成的kafka server端,如果server停止运行,client会出问题。

3.1.1. 通过windows安装的kafka运行

首先在cmd中cd到kafka的执行路径。

cd "G:\kafka_2.13-2.8.1\bin\windows"

利用以下代码产生consumer或producer,注意一个窗口只能开一个,两个都开要两个窗口。

# 启动consumer
kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic test --from-beginning
# 启动producer
kafka-console-producer.bat --broker-list localhost:9092 --topic test

3.1.2. python代码生成

首先python环境中需安装kafka-python

producer: 每秒钟由producer发送时间

from kafka import KafkaProducer
import time


if __name__ == "__main__":
    producer = KafkaProducer(bootstrap_servers='localhost:9092',
                             api_version=(2, 8, 1))
    print('Producer is ready to send messages')
    while True:
        time.sleep(1)
        msg = time.strftime('%Y-%m-%d %H:%M:%S', time.localtime())
        print(msg)
        msg = msg.encode('utf-8')
        producer.send(topic='python', value=msg)

consumer

from kafka import KafkaConsumer
import os


if __name__ == "__main__":
    consumer = KafkaConsumer('python',
                            bootstrap_servers=['localhost:9092'],
                            group_id = str(os.getpid()),
                            api_version=(2, 8, 1),
                            auto_commit_interval_ms=5000,
                            )

    for msg in consumer:
        value = msg.value.decode('utf-8')
        recv = "%s:%d:%d: key=%s value=%s" % (msg.topic, msg.partition, msg.offset, msg.key, value)
        print(recv)

3.2. kafka相关信息查看

这里以通过zookeeper查看当前已有的broker(即kafka的server端)为例。

在cmd中运行

docker exec -it zookeeper bin/bash zkCli.sh

进入该容器后,运行

# 查看当前已有的broker的id
ls /brokers/ids
# 输出: [1, 2, 3]
# 获取该broker的信息
get /brokers/ids/<某id>

参考资料

Docker安装Kafka教程(超详细)

kafka各种环境安装(window,linux,docker,k8s),包含KRaft模式

解决Docker容器连接 Kafka 连接失败问题

Logo

一起探索未来云端世界的核心,云原生技术专区带您领略创新、高效和可扩展的云计算解决方案,引领您在数字化时代的成功之路。

更多推荐