一、kafka简介

Kafka是最初由Linkedin公司开发,是一个分布式、分区的、多副本的、多订阅者,基于zookeeper协调的分布式日志系统(也可以当做MQ系统),常见可以用于web/nginx日志、访问日志,消息服务等等,Linkedin于2010年贡献给了Apache基金会并成为顶级开源项目。
主要应用场景是:日志收集系统和消息系统。
Kafka主要设计目标如下:

  • 以时间复杂度为O(1)的方式提供消息持久化能力,即使对TB级以上数据也能保证常数时间的访问性能。
  • 高吞吐率。即使在非常廉价的商用机器上也能做到单机支持每秒100K条消息的传输。
  • 支持Kafka Server间的消息分区,及分布式消费,同时保证每个partition内的消息顺序传输。
  • 同时支持离线数据处理和实时数据处理。
  • Scale out:支持在线水平扩展
  • Kafka将一个主题(topic)内容的消息拆分成多个分区(partition),分布式保存到不同节点,形成一个分布式的消息队列,提升系统的吞吐量。同时每个partition都可以指定副本数量,通过副本保证数据的可靠性。创建topic的时候需要指定分区和副本数量。
    在这里插入图片描述

二、kafka和zookeeper安装部署

1、单容器手动部署方案

  • 步骤1 下载镜像
//下载kafka和zookeeper的镜像
docker pull wurstmeister/kafka
docker pull zookeeper
  • 步骤2 新建容器通信网络(主要用于zookeeper和kafka之间的通信)
[root@iZuf6heg0pec2tuaaw1ltvZ ~]# docker network create kafka-network  (新建网络)
e714f435cb1260c2de0e842dfc902d61e6d0a0475897b15dc73362bee87f94e6
[root@iZuf6heg0pec2tuaaw1ltvZ ~]# docker network ls  (查看已存在的网络列表)
NETWORK ID          NAME                          DRIVER              SCOPE
cb77844cbdf1        bridge                        bridge              local
c4df20e19860        compose-files_default         bridge              local
104a0c8d74f2        compose-files_edgex-network   bridge              local
24b1b6ec3161        harbor_harbor                 bridge              local
caee95f8bdda        host                          host                local
e714f435cb12        kafka-network                 bridge              local
039be458adff        none                          null                local
[root@iZuf6heg0pec2tuaaw1ltvZ ~]# docker network inspect kafka-network  (查看网络的详细信息)
[
    {
        "Name": "kafka-network",
        "Id": "e714f435cb1260c2de0e842dfc902d61e6d0a0475897b15dc73362bee87f94e6",
        "Created": "2020-09-01T21:08:55.905467925+08:00",
        "Scope": "local",
        "Driver": "bridge",
        "EnableIPv6": false,
        "IPAM": {
            "Driver": "default",
            "Options": {},
            "Config": [
                {
                    "Subnet": "172.26.0.0/16",
                    "Gateway": "172.26.0.1"
                }
            ]
        },
        "Internal": false,
        "Attachable": false,
        "Ingress": false,
        "ConfigFrom": {
            "Network": ""
        },
        "ConfigOnly": false,
        "Containers": {},  (连接的容器为空)
        "Options": {},
        "Labels": {}
    }
]
[root@iZuf6heg0pec2tuaaw1ltvZ ~]#

  • 步骤3 部署zookeeper容器
//使用上面创建的容器网络kafka-network,创建zookeeper容器
docker run --net=kafka-network --name nick_zookeeper -p 21810:2181 -d docker.io/zookeeper
  • 步骤3 部署kafka容器
//同样使用上面创建的容器网络kafka-network,创建kafka容器
docker run --net=kafka-network --name nick_kafka -p 9092:9092 \
--link kafka-network \
-e KAFKA_ZOOKEEPER_CONNECT=172.26.0.2:2181 \
-e KAFKA_ADVERTISED_HOST_NAME=47.101.72.122 \
-e KAFKA_ADVERTISED_PORT=9092 \
-d wurstmeister/kafka

KAFKA_ADVERTISED_HOST_NAME参数需要设置为宿主机地址47.103.71.190。
KAFKA_ZOOKEEPER_CONNECT参数设置nick_zookeeper容器内部地址和端口(同一宿主机内的容器互相访问要用容器内地址,查看指令为docker inspect nick_zookeeper,在Networks字段可以看到容器内ip地址)。

"Networks": {
                "kafka-network": {
                    ... ...
                    "IPAddress": "172.26.0.2",
                    ... ...
                }
            }

再次,查看下kafka-network容器网络的Containers字段,zookeeper和kafka容器都已经加入 kafka-network网络了。

[root@iZuf6heg0pec2tuaaw1ltvZ ~]# docker network inspect kafka-network
[
    {
        "Name": "kafka-network",
        ... ...
        "Containers": {
            "cac4fd9ce8ff8f0236e7f6e613da9c819dc8aeade1d63596af02b325933451b0": {
                "Name": "nick_zookeeper",
                "EndpointID": "0c4583525899f6a426f43bb4fb191c739afb02bb59cb5ada182e795d163341f2",
                "MacAddress": "02:42:ac:1a:00:02",
                "IPv4Address": "172.26.0.2/16",
                "IPv6Address": ""
            },
            "d28ffb2e72b36b1ad4c049bbf5256d578c4fe9d93291720659141da05fff3bdb": {
                "Name": "nick_kafka",
                "EndpointID": "552db13d5d82cb7e2fadc123e80063dc0cfb4b450339d471704de292d7faf051",
                "MacAddress": "02:42:ac:1a:00:03",
                "IPv4Address": "172.26.0.3/16",
                "IPv6Address": ""
            }
        },
        "Options": {},
        "Labels": {}
    }
]
  • 步骤4 测试发送消息和接收消息
// 进入容器
docker exec -it nick_kafka bash
// 进入kafka所在目录
cd /opt/kafka_2.13-2.6.0/bin/
// 启动消息发送方
 ./kafka-console-producer.sh --broker-list localhost:9092 --topic temptopic
// 再开一个终端进入容器,启动消息接收方
 ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic temptopic --from-beginning

进入kafka容器,发送消息

[root@iZuf6heg0pec2tuaaw1ltvZ ~]# docker exec -it nick_kafka bash
bash-4.4# cd /opt/kafka_2.13-2.6.0/bin
bash-4.4# ./kafka-console-producer.sh --broker-list localhost:9092 --topic temptopic
>1
>2
>3
>4
>5
>6

进入kafka容器,接收消息

[root@iZuf6heg0pec2tuaaw1ltvZ ~]# docker exec -it nick_kafka bash
bash-4.4# cd /opt/kafka_2.13-2.6.0/bin/
bash-4.4# ./kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic temptopic --from-beginning
1
2
3
4
5
6

上述关于kakfa和zookeeper容器的部署操作,属于单机版单个容器部署,对于初学者建议手动操作一下。

2、docker-compose容器编排工具部署

使用docker-compose容器编排工具,可一键式安装两个容器。

  • 步骤1 首先编写docker-compose.yml
version: '3'
services:
  zookeeper:
    image: zookeeper
    container_name: nick_zookeeper
    ports:
      - "2181:2181"
    networks:
      - kafka-network
  kafka:
    image: wurstmeister/kafka
    container_name: nick_kafka
    depends_on:
      - zookeeper
    links:
      - zookeeper
    networks:
      - kafka-network
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 5
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.205:9092  #宿主机监听端口
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
networks:
  kafka-network:
    driver: bridge
  • 步骤2 执行如下命令,启动服务docker-compose -f docker-compose-single.yml up -d
root@ubuntu:~# docker-compose -f docker-compose-single.yml up -d
Creating nick_zookeeper ... done
Creating nick_kafka ... done
Creating nick_kafka ...
  • 步骤3 执行如下命令,查看docker-compose -f docker-compose-single.yml ps查看服务的运行状态
root@ubuntu:~# docker-compose -f docker-compose-single.yml ps
     Name                   Command               State                          Ports
--------------------------------------------------------------------------------------------------------------
nick_kafka       start-kafka.sh                   Up      0.0.0.0:9092->9092/tcp
nick_zookeeper   /docker-entrypoint.sh zkSe ...   Up      0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp, 8080/tcp
root@ubuntu:~#

3、集群部署zookeeper和kafka

  • 步骤1 首先编写
    docker-compose.yml
version: '3'
services:
  zkwt01:
    image: zookeeper
    restart: always
    container_name: zkwt01
    ports:
      - "2181:2181"
    environment:
      ZOO_MY_ID: 1
      ZOO_SERVERS: server.1=zkwt01:2888:3888;2181 server.2=zkwt02:2888:3888;2181 server.3=zkwt03:2888:3888;2181 server.4=zkwt04:2888:3888:observer;2181 #这里有个隐形的坑,需要在3888端口后面加上;2181,否则zookeeper无法对外提供服务,会导致Kafka无法连接上
    networks:
      - kafka-network
  zkwt02:
    image: zookeeper
    restart: always
    container_name: zkwt02
    ports:
      - "2182:2181"
    environment:
      ZOO_MY_ID: 2
      ZOO_SERVERS: server.1=zkwt01:2888:3888;2181 server.2=zkwt02:2888:3888;2181 server.3=zkwt03:2888:3888;2181 server.4=zkwt04:2888:3888:observer;2181
    networks:
      - kafka-network
  zkwt03:
    image: zookeeper
    restart: always
    container_name: zkwt03
    ports:
      - "2183:2181"
    environment:
      ZOO_MY_ID: 3
      ZOO_SERVERS: server.1=zkwt01:2888:3888;2181 server.2=zkwt02:2888:3888;2181 server.3=zkwt03:2888:3888;2181 server.4=zkwt04:2888:3888:observer;2181
    networks:
      - kafka-network
  zkwt04:
    image: zookeeper
    restart: always
    container_name: zkwt04
    ports:
      - "2184:2181"
    environment:
      ZOO_MY_ID: 4
      PEER_TYPE: observer
      ZOO_SERVERS: server.1=zkwt01:2888:3888;2181 server.2=zkwt02:2888:3888;2181 server.3=zkwt03:2888:3888;2181 server.4=zkwt04:2888:3888:observer;2181
    networks:
      - kafka-network
  kafkawt01:
    image: wurstmeister/kafka
    restart: always
    container_name: kafkawt01
    depends_on:
      - zkwt01
      - zkwt02
      - zkwt03
      - zkwt04
    networks:
      - kafka-network
    ports:
      - "9092:9092"
    environment:
      KAFKA_BROKER_ID: 1
      KAFKA_ZOOKEEPER_CONNECT: zkwt01:2181,zkwt02:2181,zkwt03:2181,zkwt04:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.205:9092  #宿主机监听端口
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  kafkawt02:
    image: wurstmeister/kafka
    restart: always
    container_name: kafkawt02
    depends_on:
      - zkwt01
      - zkwt02
      - zkwt03
      - zkwt04
    networks:
      - kafka-network
    ports:
      - "9093:9092"
    environment:
      KAFKA_BROKER_ID: 2
      KAFKA_ZOOKEEPER_CONNECT: zkwt01:2181,zkwt02:2181,zkwt03:2181,zkwt04:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.205:9093  #宿主机监听端口
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock
  kafkawt03:
    image: wurstmeister/kafka
    restart: always
    container_name: kafkawt03
    depends_on:
      - zkwt01
      - zkwt02
      - zkwt03
      - zkwt04
    networks:
      - kafka-network
    ports:
      - "9094:9092"
    environment:
      KAFKA_BROKER_ID: 3
      KAFKA_ZOOKEEPER_CONNECT: zkwt01:2181,zkwt02:2181,zkwt03:2181,zkwt04:2181
      KAFKA_LISTENERS: PLAINTEXT://0.0.0.0:9092
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://192.168.0.205:9094  #宿主机监听端口
    volumes:
      - /var/run/docker.sock:/var/run/docker.sock

networks:
  kafka-network:
    driver: bridge
  • 步骤2 执行如下命令,启动服务docker-compose -f docker-compose-single.yml up -d
root@ubuntu:~# docker-compose -f docker-compose-cluster.yml up -d
Creating network "root_kafka-network" with driver "bridge"
Creating kafkawt01 ... done
Creating zkwt02 ...
Creating zkwt04 ...
Creating zkwt01 ...
Creating kafkawt03 ...
Creating kafkawt02 ...
Creating kafkawt01 ...
  • 步骤3 执行如下命令,查看docker-compose -f docker-compose-single.yml ps查看服务的运行状态
root@ubuntu:~# docker-compose -f docker-compose-cluster.yml ps
  Name                 Command               State                          Ports
---------------------------------------------------------------------------------------------------------
kafkawt01   start-kafka.sh                   Up      0.0.0.0:9092->9092/tcp
kafkawt02   start-kafka.sh                   Up      0.0.0.0:9093->9092/tcp
kafkawt03   start-kafka.sh                   Up      0.0.0.0:9094->9092/tcp
zkwt01      /docker-entrypoint.sh zkSe ...   Up      0.0.0.0:2181->2181/tcp, 2888/tcp, 3888/tcp, 8080/tcp
zkwt02      /docker-entrypoint.sh zkSe ...   Up      0.0.0.0:2182->2181/tcp, 2888/tcp, 3888/tcp, 8080/tcp
zkwt03      /docker-entrypoint.sh zkSe ...   Up      0.0.0.0:2183->2181/tcp, 2888/tcp, 3888/tcp, 8080/tcp
zkwt04      /docker-entrypoint.sh zkSe ...   Up      0.0.0.0:2184->2181/tcp, 2888/tcp, 3888/tcp, 8080/tcp

至此,zookeeper和kafka的安装部署完成。

三、springboot集成kafka

1、创建springboot项目(生产者/消费者)

  • 步骤1 创建一个空项目enpty project,
    在这里插入图片描述
  • 步骤2 创建new module,新建生产者springboot项目,
    右击上面创建的kafka-project–>New–>Module–>Spring Initializr–>Next
    在这里插入图片描述
    输入Group,模块名称,Java version,包路径等,如下图所示,
    在这里插入图片描述
    点击Next后,添加maven依赖,kafka的依赖关系,如下图所示,
    在这里插入图片描述
    点击Next–>Finish,完成生产者springboot项目创建。
    使用如上同样的操作步骤,创建消费者springboot项目
    在这里插入图片描述

2、实现生产者业务逻辑

  • 步骤1 添加maven依赖,此步骤在上面的操作中添加依赖环节已添加完毕。
    在这里插入图片描述
  • 步骤2 打开springboot-kafka-producter项目里面resources,创建application.yml文件,配置kafka服务地址,以及生产者参数,
#### kafka配置生产者 begin ####
spring:
  kafka:
    bootstrap-servers: 192.168.0.205:9092,192.168.0.205:9093,192.168.0.205:9094
    producer:
      # 写入失败时,重试次数。当leader节点失效,一个repli节点会替代成为leader节点,此时可能出现写入失败,
      # 当retris为0时,produce不会重复。retirs重发,此时repli节点完全成为leader节点,不会产生消息丢失。
      retries: 0
      # 每次批量发送消息的数量,produce积累到一定数据,一次发送
      batch-size: 16384
      # produce积累数据一次发送,缓存大小达到buffer.memory就发送数据
      buffer-memory: 33554432
      #procedure要求leader在考虑完成请求之前收到的确认数,用于控制发送记录在服务端的持久化,其值可以为如下:
      #acks = 0 如果设置为零,则生产者将不会等待来自服务器的任何确认,该记录将立即添加到套接字缓冲区并视为已发送。在这种情况下,无法保证服务器已收到记录,并且重试配置将不会生效(因为客户端通常不会知道任何故障),为每条记录返回的偏移量始终设置为-1。
      #acks = 1 这意味着leader会将记录写入其本地日志,但无需等待所有副本服务器的完全确认即可做出回应,在这种情况下,如果leader在确认记录后立即失败,但在将数据复制到所有的副本服务器之前,则记录将会丢失。
      #acks = all 这意味着leader将等待完整的同步副本集以确认记录,这保证了只要至少一个同步副本服务器仍然存活,记录就不会丢失,这是最强有力的保证,这相当于acks = -1的设置。
      #可以设置的值为:all, -1, 0, 1
      acks: 1
      # 指定消息key和消息体的序列化编解码方式
      key-serializer: org.apache.kafka.common.serialization.StringSerializer
      value-serializer: org.apache.kafka.common.serialization.StringSerializer
#### kafka配置生产者 end ####
  • 步骤3 创建初始化topic的配置类KafkaInitialConfiguration,在com.kafka.eason目录下面,增加config文件夹,新建KafkaInitialConfiguration类,
package com.kafka.eason.config;

import org.apache.kafka.clients.admin.NewTopic;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class KafkaInitialConfiguration {
    // 创建一个名为testTopic的Topic并设置分区数partitions为8,分区副本数replication-factor为2
    @Bean
    public NewTopic initialTopic() {
        System.out.println("begin to init initialTopic........................");
        return new NewTopic("wtopic04",8, (short) 2 );
    }

    // 如果要修改分区数,只需修改配置值重启项目即可
    // 修改分区数并不会导致数据的丢失,但是分区数只能增大不能减小
    @Bean
    public NewTopic updateTopic() {
        System.out.println("begin to init updateTopic........................");
        return new NewTopic("wtopic04",10, (short) 2 );
    }
}
  • 步骤4 创建生产者业务逻辑controller类ProducterController,在com.kafka.eason目录下面,增加controller文件夹,新建ProducterController类,
package com.kafka.eason.controller;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.support.SendResult;
import org.springframework.stereotype.Controller;
import org.springframework.util.concurrent.ListenableFuture;
import org.springframework.util.concurrent.ListenableFutureCallback;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.ResponseBody;

/**
 *
 *
 * @author Lynch
 */
@Controller
@RequestMapping("/api/kafka/")
public class ProducterController {
    @Autowired
    private KafkaTemplate<String,Object> kafkaTemplate;

    @GetMapping("send")
    @ResponseBody
    public boolean send(@RequestParam String message) {
        try {
            ListenableFuture<SendResult<String, Object>> future = kafkaTemplate.send("wtopic04", message);
            future.addCallback(new ListenableFutureCallback<SendResult<String, Object>>() {
                @Override
                public void onFailure(Throwable throwable) {
                    System.err.println("wtopic04 - 生产者 发送消息失败:" + throwable.getMessage());
                }

                @Override
                public void onSuccess(SendResult<String, Object> stringObjectSendResult) {
                    System.out.println("wtopic04 - 生产者 发送消息成功:" + stringObjectSendResult.toString());
                }
            });
        } catch (Exception e) {
            e.printStackTrace();
        }
        return true;
    }

    @GetMapping("test")
    @ResponseBody
    public String test(){
        System.out.println("hello world!");
        return "ok";
    }
}
  • 步骤5 修改主程序类,增加ComponentScan扫描config和controller目录的文件
package com.kafka.eason;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

@SpringBootApplication
@ComponentScan("com.kafka.eason.controller")
@ComponentScan("com.kafka.eason.config")
public class SpringbootKafkaProducterApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringbootKafkaProducterApplication.class, args);
    }
}

至此,生产者业务逻辑书写完成,启动主程序SpringbootKafkaProducterApplication。可以在浏览器中调用http://localhost:8080/api/kafka/send?message=wt02此地址推送生产者消息。
在这里插入图片描述
可以登录到Kafka后台,使用消费者命令查看消息是否发布成功,
在这里插入图片描述

3、实现消费者业务逻辑

  • 步骤1 添加maven依赖,此步骤在上面的操作中添加依赖环节已添加完毕。
    在这里插入图片描述
  • 步骤2 打开springboot-kafka-consumer项目里面resources,创建application.yml文件,配置kafka服务地址,以及生产者参数,
server:
  port: 8081
#### kafka配置消费者 start ####
#============== kafka ===================
# 指定kafka server的地址,集群配多个,中间,逗号隔开
spring:
  kafka:
    bootstrap-servers: 192.168.0.205:9092,192.168.0.205:9093,192.168.0.205:9094
    consumer:
      # 指定默认消费者group id --> 由于在kafka中,同一组中的consumer不会读取到同一个消息,依靠groud.id设置组名
      group-id: test
      # smallest和largest才有效,如果smallest重新0开始读取,如果是largest从logfile的offset读取。一般情况下我们都是设置smallest
      auto-offset-reset: earliest
      # enable.auto.commit:true --> 设置自动提交offset
      enable-auto-commit: true
      #如果'enable.auto.commit'为true,则消费者偏移自动提交给Kafka的频率(以毫秒为单位),默认值为5000。
      auto-commit-interval: 1000
      # 指定消息key和消息体的序列化编解码方式
      key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
      value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
#### kafka配置消费者 end ####
  • 步骤3 创建消费者业务逻辑controller类ConsumerController,在com.kafka.eason目录下面,增加controller文件夹,新建ConsumerController类,
package com.kafka.eason.controller;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.stereotype.Component;

/**
 * 消费者监听topic=testTopic的消息
 *
 * @author Lynch
 */
@Component
public class ConsumerController {

    @KafkaListener(topics = "wtopic04")
    public void onMessage(ConsumerRecord<?, ?> record){
        //insertIntoDb(buffer);//这里为插入数据库代码
        //System.out.println("message: " + message);
        System.out.println("简单消费,record:"+record.topic()+"-"+record.partition()+"-"+record.value());

    }

}
  • 步骤4 修改主程序类,增加ComponentScan扫描config和controller目录的文件
package com.kafka.eason;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.ComponentScan;

@SpringBootApplication
@ComponentScan("com.kafka.eason.controller")
public class SpringbootKafkaConsumerApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringbootKafkaConsumerApplication.class, args);
    }

}

至此,消费者业务逻辑书写完成,启动主程序SpringbootKafkaConsumerApplication。可以在浏览器中调用http://localhost:8080/api/kafka/send?message=wt03此地址推送生产者消息,分别查看下生产者和消费者项目的后台日志
生产者项目后台日志,发送消息成功!
在这里插入图片描述
消费者项目后台日志,消费新消息成功,并且输出topic,数据所在kafka的分区编号,新消息内容。
在这里插入图片描述
暂时分享到这里,后续补录基于kafka+storm实时计算相关内容
----------------over-------------------------------------------

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐