1、安装zookeeper

1.1 通过Deployment创建zookeeper的Pod应用

zookeeper-deployment.yaml

apiVersion: apps/v1
kind: Deployment
metadata:
  labels:
    app: zookeeper
  name: zookeeper
spec:
  replicas: 1
  template:
    metadata:
      labels:
        app: zookeeper
    spec:
      containers:
      - image: wurstmeister/zookeeper
        imagePullPolicy: IfNotPresent
        name: zookeeper
        ports:
        - containerPort: 2181

1.2 创建zookeeper的svc服务

zookeeper-svc.yaml

apiVersion: v1
kind: Service
metadata:
  labels:
    app: zookeeper-service
  name: zookeeper-service
spec:
  ports:
  - name: zookeeper-port
    port: 2181
    targetPort: 2181
  selector:
    app: zookeeper

2、安装kafaka

2.1 创建kafaka的svc服务

kafka-svc.yaml

apiVersion: v1
kind: Service
metadata:
  name: kafka-service
  labels:
    app: kafka
spec:
  type: NodePort
  ports:
  - port: 9092
    name: kafka-port
    targetPort: 9092
    nodePort: 30092
    protocol: TCP
  selector:
    app: kafka

2.2 通过Deployment创建kafaka的pod应用

kafka-deployment.yaml

kind: Deployment
apiVersion: apps/v1
metadata:
  name: kafka-deployment
spec:
  replicas: 1
  selector:
    matchLabels:
      name: kafka
  template:
    metadata:
      labels:
        name: kafka
        app: kafka
    spec:
      containers:
      - name: kafka
        image: wurstmeister/kafka
        imagePullPolicy: IfNotPresent
        ports:
        - containerPort: 9092
        env:
        - name: KAFKA_ADVERTISED_PORT
          value: "9092"
        - name: KAFKA_ADVERTISED_HOST_NAME
          value: 10.104.243.20 
        - name: KAFKA_ZOOKEEPER_CONNECT
          value: 10.98.84.1:2181
        - name: KAFKA_BROKER_ID
          value: "1"

备注:以上操作步骤,请注意先后顺序。KAFKA_ADVERTISED_HOST_NAME,请修改为实际的kafaka的svc的CLUSTER-IP
KAFKA_ZOOKEEPER_CONNECT,请修改为实际的zookeeper的CLUSTER-IP:2181

3 kafaka基本功能测试

3.1 kafaka向zookeeper注册哪些记录?

在k8s环境的master节点服务器上,使用如下命令进入zookeeper的容器
kubectl exec -it zookeeper-58759999cc-nhsxv bash
备注:请根据实际情况修改zookeeper-58759999cc-nhsxv
让后进入bin目录,运行zk客户端,

/opt/zookeeper-3.4.13# cd bin/
/opt/zookeeper-3.4.13/bin# ll
total 52
drwxr-xr-x  2 501 staff 4096 Sep 10 00:49 ./
drwxr-xr-x 11 501 staff 4096 Sep 10 00:49 ../
-rwxr-xr-x  4 501 staff  232 Jun 29  2018 README.txt*
-rwxr-xr-x  4 501 staff 1937 Jun 29  2018 zkCleanup.sh*
-rwxr-xr-x  4 501 staff 1056 Jun 29  2018 zkCli.cmd*
-rwxr-xr-x  4 501 staff 1534 Jun 29  2018 zkCli.sh*
-rwxr-xr-x  4 501 staff 1759 Jun 29  2018 zkEnv.cmd*
-rwxr-xr-x  4 501 staff 2696 Jun 29  2018 zkEnv.sh*
-rwxr-xr-x  4 501 staff 1089 Jun 29  2018 zkServer.cmd*
-rwxr-xr-x  4 501 staff 6773 Jun 29  2018 zkServer.sh*
-rwxr-xr-x  4 501 staff  996 Jun 29  2018 zkTxnLogToolkit.cmd*
-rwxr-xr-x  4 501 staff 1385 Jun 29  2018 zkTxnLogToolkit.sh*
/opt/zookeeper-3.4.13/bin# ./zkCli.sh
[zk: localhost:2181(CONNECTED) 10] ls /brokers
[seqid, topics, ids]

以上可以看出来kafaka向zk注册了[seqid, topics, ids]

3.2 运行生产者kafka-console-producer.sh

在k8s环境的master节点服务器上,使用如下命令进入kafaka的容器
kubectl exec -it kafka-deployment-6d565b947-4cvz7 bash
备注:请根据实际情况修改kafka-deployment-6d565b947-4cvz7

bash-4.4# cd /opt/kafka/bin/
bash-4.4# kafka-console-producer.sh --broker-list 10.104.243.20:9092 --topic HelloWorld
>zhonguo
备注:10.104.243.20:9092这里是kafaka的svc的CLUSTER-IP

3.3 运行消费者

在k8s环境的master节点服务器上,使用如下命令进入kafaka的容器
kubectl exec -it kafka-deployment-6d565b947-4cvz7 bash
备注:请根据实际情况修改kafka-deployment-6d565b947-4cvz7

bash-4.4# kafka-console-consumer.sh --bootstrap-server 10.104.243.20:9092 --from-beginning --topic  HelloWorld
zhonguo
备注:10.104.243.20:9092这里是kafaka的svc的CLUSTER-IP

生产者生产的消息确实被消费者消费成功。
再来,看下zookeeper中的记录,

[zk: localhost:2181(CONNECTED) 11] ls /brokers/topics
[__consumer_offsets, HelloWorld]

topic成功存入zookeeper中。

Logo

K8S/Kubernetes社区为您提供最前沿的新闻资讯和知识内容

更多推荐