目录

一.kafka的概述

1.定义

2.消息队列

2.1传统消息队列应用场景

 2.2消息队列的两种模式

 二、kafka的基础架构

1.kafka基础架构的简介

2.kafka的详细基础架构

三、安装部署

1.kafka的分布式

2.集群部署

 四、命令操作

1.主题命令操作

2.生产者命令操作

 3.消费者命令操作

 五、kafka生产者模式

1.生产者消息发送流程

2、异步发送API

3.同步发送API

4、生产者分区

(1)kafka分区的好处

(3)自定义分区器

5、生产经验

(1)生产者如和提高吞吐量

 (2)数据可靠性

 (3)数据去重

 (4)数据有序

(5)数据乱序

六、kafka broker 

1、kafka broker 工作流程

(1)zookeeper存储kafka的信息

(2)kafka broker 的总体工作流程

 (3)broker的重要参数

2、文件存储

(1)文件存储机制

(2)文件清洗策略


一.kafka的概述

1.定义

kafka的传统定义:kafka是一个分布式的基于发布\订阅模式的消息队列,主要用于大数据实时处理领域

kafka的最新概念:kafka是一个开源的分布式事件流平台,(80%的公司都在用),用于高性能数据管道、流分析、数据集成和关键任务应用

2.消息队列

在大数据场景主要采用Kafka作为消息队列。在JavaEE开发中主要采用ActiveMQ、RabbitMQ、RocketMQ。

2.1传统消息队列应用场景

传统的消息队列的主要应用场景包括:缓存/消峰解耦异步通信。

(1)缓冲\消峰:这时候的消息队列相当于有一个缓冲区,将请求先写入消息队列,消息队列内的请求遵循先进先出原则,从而不会因为高并发而导致系统崩溃,毕竟服务器的资源有限(场景:以前双十一,没有消息队列,在高并发时可能会系统崩溃,现在有了消息队列,会直接返回,你下单成功或者其他,然后服务器慢慢从消息队列中处理这些任务)

 解耦:耦合性就是两个模块之间的依赖性,越高呢,维护成本越高,比如说就Producer和Consumer直接连接时,一个发生变化,另一个要做出比较大的调整,有了消息对接在中间,就能降低之间耦合性,也就是解耦

异步通信:

 2.2消息队列的两种模式

 二、kafka的基础架构

1.kafka基础架构的简介

(1)生产者:向kafka集群发送数据

(2)broker:启动的kafka集群

(3)消费者:从kafka集群消费数据

(4)zookeeper:帮助kafka实现分布式

2.kafka的详细基础架构

(1)Reducer:生产者(可能是flume、MySQL等),其实就是向kafka发送数据的

(2)Consumer:消费者(可能是MySQL、Hadoop、spark、flink),就是向kafka取数据的

(3)Consumer Group:就是消费者组,由一个或者多个consumer组成,在kafka中,消费者都是有组的,即使是在consumer创建时没有没有设置组,但是kafka会默认一个有一个组,是组直接从kafka中的leader中拉取数据,消费者组内每个消费者负责消费不同分区的数据,一个分区只能由一个组内消费者消费;消费者组之间互不影响。所有的消费者都属于某个消费者组,即消费者组是逻辑上的一个订阅者

(4)Broker:kafka代理,即kafka代理服务器,一个集群由多个broker组成,一个broker可以容纳多个topic

(5)topic:主题,可以理解成队列,但是和点对点队列不同的是,不同的消费者组都可以从topic拉去相同的消息

(由此引出推模型和拉模型的区别:

推模型 push :指定消息推送给谁,如果要给多个对象推送的话,需要推送多份。

拉模型 pull :消息发布出去,放到某个地方,感兴趣的自己来拉。只需要推一份数据。)

(6)partition:为了实现扩展性,一个非常大的topic可以分布到多个broker(即服务器)上,一个topic可以分为多个partition,每个partition是一个有序的队列

(7)replica:每个分区都有一个或者多个副本,有一个leader和多个follower组成

(8)leader:每个分区可能有多个副本,但是这些副本中会选出一个leader,即多个副本中的“主”,producer向kafka发送数据时,和consumer拉取数据时都是和leader做交互,leader会和follower之间会同步数据,

(9)follower:多个副本中的的“从”,有了副本保证数据的安全性,如果有leader挂掉,从follower选取新的leader,所以follower肯定不能和leader在同一个服务器上

三、安装部署

1.kafka的分布式

kafka是依靠zookeeper来实现分布式的,所以再启动前需要先启动zookeeper,如下图

2.集群部署

官方下载地址:Apache Kafka

(1)安装和Hadoop等都一样,解压即可

[atguigu@hadoop102 software]$ tar -zxvf kafka_2.12-3.0.0.tgz -C /opt/module/

 (2)然后修改配置文件

[atguigu@hadoop102 config]$ vim kafka/config/server.properties

 红色部分修改的位置:

#broker的全局唯一编号,不能重复,只能是数字。

broker.id=0

#处理网络请求的线程数量

num.network.threads=3

#用来处理磁盘IO的线程数量

num.io.threads=8

#发送套接字的缓冲区大小

socket.send.buffer.bytes=102400

#接收套接字的缓冲区大小

socket.receive.buffer.bytes=102400

#请求套接字的缓冲区大小

socket.request.max.bytes=104857600

#kafka运行日志(数据)存放的路径,路径不需要提前创建,kafka自动帮你创建,可以配置多个磁盘路径,路径与路径之间可以用""分隔

log.dirs=/opt/module/kafka/datas

#topic在当前broker上的分区个数

num.partitions=1

#用来恢复和清理data下数据的线程数量

num.recovery.threads.per.data.dir=1

# 每个topic创建时的副本数,默认时1个副本

offsets.topic.replication.factor=1

#segment文件保留的最长时间,超时将被删除

log.retention.hours=168

#每个segment文件的大小,默认最大1G

log.segment.bytes=1073741824

# 检查过期数据的时间,默认5分钟检查一次是否数据过期

log.retention.check.interval.ms=300000

#配置连接Zookeeper集群地址(在zk根目录下创建/kafka,方便管理)

zookeeper.connect=hadoop102:2181,hadoop103:2181,hadoop104:2181/kafka

编辑好之后分发,然后将broker.id修改一下

(3)配置环境变量

[atguigu@hadoop102 module]$ sudo vim /etc/profile.d/my_env.sh

 增加内如如下:

#KAFKA_HOME

export KAFKA_HOME=/opt/module/kafka

export PATH=$PATH:$KAFKA_HOME/bin

配置完成后分发

为每台节点刷新环境变量:

[atguigu@hadoop102 module]$ source /etc/profile

 (4)启动集群:先启动zookeeper集群,在启动kafka

(5)集群启停脚本

在/home/atguigu/bin目录下创建文件kf.sh脚本文件

[atguigu@hadoop102 bin]$ vim kf.sh

#! /bin/bash

case $1 in
"start"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo " --------启动 $i Kafka-------"
        ssh $i "/opt/module/kafka/bin/kafka-server-start.sh -daemon /opt/module/kafka/config/server.properties"
    done
};;
"stop"){
    for i in hadoop102 hadoop103 hadoop104
    do
        echo " --------停止 $i Kafka-------"
        ssh $i "/opt/module/kafka/bin/kafka-server-stop.sh "
    done
};;
esac

 四、命令操作

1.主题命令操作

查看操作主题命令参数

[atguigu@hadoop102 kafka]$ bin/kafka-topics.sh

参数

描述

--bootstrap-server <String: server toconnect to>

连接的Kafka Broker主机名称和端口号。

--topic <String: topic>

操作的topic名称。

--create

创建主题。

--delete

删除主题。

--alter

修改主题。

--list

查看所有主题。

--describe

查看主题详细描述。

--partitions <Integer: # of partitions>

设置分区数。

--replication-factor<Integer: replication factor>

设置分区副本。

--config <String: name=value>

更新系统默认的配置。

(1)查看服务器中的topic

[atguigu@hadoop102 kafka]$ kafka-topics.sh --bootstrap-server hadoop102:9092 --list

 (2)创建first topic

[atguigu@hadoop102 kafka]$ kafka-topics.sh --bootstrap-server hadoop102:9092
--create --partitions 3 --replication-factor 2 --first topic

         选项说明:

   --topic 定义topic名

   --replication-factor  定义副本数

   --partitions  定义分区数

(3) 查看first主题详情

[atguigu@hadoop102 kafka]$ kafka-topics.sh --bootstrap-server hadoop102:9092
--describe --first topic

 (4)修改分区数(分区数只能增加,不能减少)

[atguigu@hadoop102 kafka]$ kafka-topics.sh --bootstrap-server hadoop102:9092
--alter --first topic --partition 5

(5)删除topic 

[atguigu@hadoop102 kafka]$ kafka-topics.sh --bootstrap-server hadoop102:9092
--delete --first topic 

2.生产者命令操作

查看操作生产者命令

[atguigu@hadoop102 kafka]$ kafka-console-producer.sh

参数

描述

--bootstrap-server <String: server toconnect to>

连接的Kafka Broker主机名称和端口号。

--topic <String: topic>

操作的topic名称。

 发送消息

[atguigu@hadoop102 kafka]$ kafka-console-producer.sh --bootstrap-server 
hadoop102:9092 --topic first

 3.消费者命令操作

查看操作消费者命令

[atguigu@hadoop102 kafka]$ kafka-console-consumer.sh

参数

描述

--bootstrap-server <String: server toconnect to>

连接的Kafka Broker主机名称和端口号。

--topic <String: topic>

操作的topic名称。

--from-beginning

从头开始消费。

--group <String: consumer group id>

指定消费者组名称。

 消费主题first中的信息(从末尾开始)

[atguigu@hadoop102 kafka]$ kafka-console-consumer.sh --bootstrap-server 
hadoop102:9092 --topic first

 把主题中所有的数据都读取出来(包括历史数据)

[atguigu@hadoop102 kafka]$ kafka-console-consumer.sh --bootstrap-server 
hadoop102:9092 --from beginning --topic first

  命令行中使用的消费者,如果不写消费者组,系统会默认分配一个随机的消费者组。所以每次启动都无法消费之前的数据。添加--from-beginning 可以从头消费。

如果是同一个组的消费者 ,能实现断点续传的功能。添加--from-beginning 也还是断点续传的。

如果生产消费到一个不存在的主题,系统会自动帮你创建这个主题,分区和副本的个数都为1个。

 五、kafka生产者模式

1.生产者消息发送流程

(1)发送原理

在消息发送的过程中,涉及到了两个线程——main线程和Sender线程。在main线程中创建了一个双端队列RecordAccumulator。main线程将消息发送给RecordAccumulator,Sender线程不断从RecordAccumulator中拉取消息发送到Kafka Broker。

 从图中的流程可以看出,生产者和kafka集群之间还有一个RecordAccumulator队列默认大小是32M,topic分区的话,producer会对应有一个分区器,数据在进入中间队列前,已经被分区器进行了分区,sender()方法在发送数据时,就直接根据分区进行拉取了,拉取时有两个参数,也就是调优参数1.batch.size :也就是批大小,只有数据累计到batch.size后,sender才会发送数据,默认16k ;2.linger.ms :也就是等待时间,如果数据未达到batch.size,sender等待linger.ms设置的时间就会发送数据,单位ms,默认值就是0ms,就是有了一条数据直接发(默认为0是因为kafka要接实时数仓,所以设置为0)

 (2)生产者重要参数列表

参数名称

描述

bootstrap.servers

生产者连接集群所需的broker地址清单。例如hadoop102:9092,hadoop103:9092,hadoop104:9092,可以设置1个或者多个,中间用逗号隔开。注意这里并非需要所有的broker地址,因为生产者从给定的broker里查找到其他broker信息。

key.serializer和value.serializer

指定发送消息的key和value的序列化类型。一定要写全类名。

buffer.memory

RecordAccumulator缓冲区总大小,默认32m

batch.size

缓冲区一批数据最大值,默认16k。适当增加该值,可以提高吞吐量,但是如果该值设置太大,会导致数据传输延迟增加。

linger.ms

如果数据迟迟未达到batch.size,sender等待linger.time之后就会发送数据。单位ms,默认值是0ms,表示没有延迟。生产环境建议该值大小为5-100ms之间。

acks

0:生产者发送过来的数据,不需要等数据落盘应答。

1:生产者发送过来的数据,Leader收到数据后应答。

-1(all):生产者发送过来的数据,Leader+和isr队列里面的所有节点收齐数据后应答。默认值是-1-1all是等价的。

max.in.flight.requests.per.connection

允许最多没有返回ack的次数,默认为5,开启幂等性要保证该值是 1-5的数字。

retries

当消息发送出现错误的时候,系统会重发消息。retries表示重试次数。默认是int最大值,2147483647

如果设置了重试,还想保证消息的有序性,需要设置

MAX_IN_FLIGHT_REQUESTS_PER_CONNECTION=1否则在重试此失败消息的时候,其他的消息可能发送成功了。

retry.backoff.ms

两次重试之间的时间间隔,默认是100ms。

enable.idempotence

是否开启幂等性,默认true,开启幂等性。

compression.type

生产者发送的所有数据的压缩方式。默认是none,也就是不压缩。

支持压缩类型:nonegzipsnappylz4zstd

2、异步发送API

生产者代码中有3必须,IP即连接地址、key和value的序列化器

(1)普通异步发送流程

创建maven项目

导入依赖

<dependencies>
     <dependency>
          <groupId>org.apache.kafka</groupId>
          <artifactId>kafka-clients</artifactId>
          <version>3.0.0</version>
     </dependency>
</dependencies>

代码编写

package com.atguigu.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class CustomProducer {
    public static void main(String[] args) {
        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();
        // 2. 给kafka配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // key,value序列化(必须):
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 3. 创建kafka生产者对象
        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);
        // 4. 调用send方法,发送消息
        for (int i = 0; i < 10; i++) {
            kafkaProducer.send(new ProducerRecord<>("first1", "atguigu"));
        }
        // 5. 关闭资源
        kafkaProducer.close();
    }
}

(2)带回调函数的异步发送

回调函数是实现应答机制的函数

package com.atguigu.kafka.producer;

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class CustomProducerCallBack {
    public static void main(String[] args) {
        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();
        // 2. 给kafka配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // key,value序列化(必须):
        // 序列化器的serialization是一个接口,找到他的实现类
        // 我们一般都是使用String
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 3. 创建kafka生产者对象
        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);
        // 4. 调用send方法,发送消息
        for (int i = 0; i < 10; i++) {
            kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i),
                    new Callback() {
                       @Override
                       public void onCompletion(RecordMetadata metadata, Exception exception) {
                           //(1)消息发送成功  exception == null  接受到服务端ack消息   调用该方法
                           //(2)消息发送失败  exception != null  也会调用该方法
                           if (exception == null) {
                               System.out.println(metadata);//使用打印演示
                           }else{
                               exception.printStackTrace();//打印异常信息
                           }
                       }
                    });
        }
        // 5. 关闭资源
        kafkaProducer.close();
    }
}

3.同步发送API

package com.atguigu.kafka.producer;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomProducerSync {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();
        // 2. 给kafka配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // key,value序列化(必须):
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 3. 创建kafka生产者对象
        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);
        // 4. 调用send方法,发送消息
        for (int i = 0; i < 10; i++) {
            // 默认为异步发送
            kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i));
            // 末尾加get为同步发送
            kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i)).get();
        }

        // 5. 关闭资源
        kafkaProducer.close();
    }
}

4、生产者分区

(1)kafka分区的好处

因为不同的分区分布在不同的节点上,所以便于合理使用资源,实现负载均衡

并且在不同节点上可以提高并行度

 (2)生产者发送消息的分区策略

    1>指定发送到哪一个分区  直接使用对应的分区号   不会走分区器
    2> 不写分区号  需要走分区器  有key     按照key进行hash之后取模分区个数
    3>不写分区号  需要走分区器  没有key   粘性分区缓存机制
              一批数据发送到随机的一个分区中,下一批数据发送到另外一个分区
             如果是异步发送,数据发送的比较快   10条数据被当作一批  每一次都是一个分区
             如果是同步发送,发一条数据歇一会,导致每一条数据都是不同批

import org.apache.kafka.clients.producer.*;
import java.util.Properties;
import java.util.concurrent.ExecutionException;

public class CustomProducerCallBackPartition {
    public static void main(String[] args) throws ExecutionException, InterruptedException {
        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();
        // 2. 给kafka配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // key,value序列化(必须):
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        // 3. 创建kafka生产者对象
        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);
        // 4. 调用send方法,发送消息
        for (int i = 0; i < 10; i++) {
            // (1)指定发送到哪一个分区  直接使用对应的分区号   不会走分区器
            // (2) 不写分区号  需要走分区器  有key     按照key进行hash之后取模分区个数
            // (3) 不写分区号  需要走分区器  没有key   粘性分区缓存机制
            //  一批数据发送到随机的一个分区中,下一批数据发送到另外一个分区
            // 如果是异步发送,数据发送的比较快   10条数据被当作一批  每一次都是一个分区
            // 如果是同步发送,发一条数据歇一会,导致每一条数据都是不同批
            kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i),
                    new Callback() {
                       @Override
                       public void onCompletion(RecordMetadata metadata, Exception exception) {
                           //(1)消息发送成功  exception == null  接受到服务端ack消息   调用该方法
                           //(2)消息发送失败  exception != null  也会调用该方法
                           if (exception == null) {
                               System.out.println(metadata);
                           }else{
                               exception.printStackTrace();
                           }
                       }
                    }).get();
        }
        //Thread.sleep(20);
        // 5. 关闭资源
        kafkaProducer.close();
    }
}

(3)自定义分区器

根据业务需求,可以自定义分区器

假设需求:发送过来的数据中如果包含atguigu,就发往0号分区,不包含atguigu,就发往1号分区

mport org.apache.kafka.clients.producer.Partitioner;
import org.apache.kafka.common.Cluster;

import java.util.Map;

// (1)实现分区器的接口
public class CustomPartitioner implements Partitioner {
/*
传参
topic:主题   key:key值   keyBytes:key序列化之后     value:value值   
valueBytes:value序列化之后      cluster:集群信息    return的是分区号
*/
    @Override
    public int partition(String topic, Object key, byte[] keyBytes, Object value, byte[] valueBytes, Cluster cluster) {
        //
        String log = value.toString();
        if (log.contains("atguigu")) {
            return 0;
        }
        return 1;
    }

    @Override
    public void close() {

    }

    @Override
    public void configure(Map<String, ?> configs) {

    }
}

然后,调用

import org.apache.kafka.clients.producer.*;

import java.util.Properties;

public class CustomProducerCallBack {
    public static void main(String[] args) {
        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();
        // 2. 给kafka配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // key,value序列化(必须):
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        // 添加定义的分区器,需要自定义分区的全类名
        properties.put(ProducerConfig.PARTITIONER_CLASS_CONFIG, "com.atguigu.kafka.partitioner.CustomPartitioner");

        // 3. 创建kafka生产者对象
        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);
        // 4. 调用send方法,发送消息
        for (int i = 0; i < 10; i++) {
            kafkaProducer.send(new ProducerRecord<>("first1", "atguigu" + i),
                    new Callback() {
                       @Override
                       public void onCompletion(RecordMetadata metadata, Exception exception) {
                           //(1)消息发送成功  exception == null  接受到服务端ack消息   调用该方法
                           //(2)消息发送失败  exception != null  也会调用该方法
                           if (exception == null) {
                               System.out.println(metadata);
                           }else{
                               exception.printStackTrace();
                           }
                       }
                    });
        }
        // 5. 关闭资源
        kafkaProducer.close();
    }
}

5、生产经验

(1)生产者如和提高吞吐量

提高吞吐量,就是提高批次传输大小,还有就是效率问题

mport org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class CustomProducerParameters {
    public static void main(String[] args) {
        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();

        // 2. 给kafka配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // key,value序列化(必须):
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");

        //调优参数,还是需要根据业务需求来调整
        //batch.size 批次大小,默认是16k,将批次大小增大,进而提高吞吐量
        properties.put(ProducerConfig.BATCH_SIZE_CONFIG,32768);
        //linger.ms 等待时长,默认是0ms,增加等待时长
        properties.put(ProducerConfig.LINGER_MS_CONFIG, 5);
        //双端队列大小,默认是32M,可以提高到64M
        properties.put(ProducerConfig.BUFFER_MEMORY_CONFIG,67108864);
        //调整压缩格式,默认没有压缩
        properties.put(ProducerConfig.COMPRESSION_TYPE_CONFIG, "snappy");

        // 3. 创建kafka生产者对象
        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);

        // 4. 调用send方法,发送消息
        for (int i = 0; i < 10; i++) {
            kafkaProducer.send(new ProducerRecord<>("first1", "atguigu"));
        }
        // 5. 关闭资源
        kafkaProducer.close();
    }
}

 (2)数据可靠性

数据可靠性基于ack应答机制

数据完全可靠的条件:

Acks级别设置为-1,分区副本大于等于2,ISR应答的最小副本数大于等于2

副本介绍

(1)Kafka副本作用:提高数据可靠性。

(2)Kafka默认副本1个,生产环境一般配置为2个,保证数据可靠性;太多副本会增加磁盘存储空间,增加网络上数据传输,降低效率。

(3)Kafka中副本分为:Leader和Follower。Kafka生产者只会把数据发往Leader,然后Follower找Leader进行同步数据。

(4)Kafka分区中的所有副本统称为AR(Assigned Repllicas)。

 AR = ISR + OSR

ISR,表示和Leader保持同步的Follower集合。如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值由replica.lag.time.max.ms参数设定,默认30s。Leader发生故障之后,就会从ISR中选举新的Leader。

        OSR表示Follower与Leader副本同步时,延迟过多的副本。

 可靠性总结:

        acks=0,生产者数据发来,kafka集群内存接受到数据就返回ack

        acks=1,生产者数据发来,kafka集群中的leader落盘数据后返回ack

        acks=-1,生产者数据发来,kafka集群中的所有副本落盘数据后返回ack

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;

import java.util.Properties;

public class CustomProducerAcks {
    public static void main(String[] args) {
        // 1. 创建kafka生产者的配置对象
        Properties properties = new Properties();
        // 2. 给kafka配置对象添加配置信息:bootstrap.servers
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // key,value序列化(必须):
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.StringSerializer");
        //设置应答机制acks,可以去3个值,0、1、all(相当与ask = -1)
        properties.put(ProducerConfig.ACKS_CONFIG, "all");
        //重试次数retries ,默认是int最大值,2147483647
        properties.put(ProducerConfig.RETRIES_CONFIG, 3);
        // 3. 创建kafka生产者对象
        KafkaProducer<String,String> kafkaProducer = new KafkaProducer<String,String>(properties);
        // 4. 调用send方法,发送消息
        for (int i = 0; i < 10; i++) {
            kafkaProducer.send(new ProducerRecord<>("first1", "atguigu"));
        }
        // 5. 关闭资源
        kafkaProducer.close();
    }
}

副本故障处理

 (3)数据去重

1>数据传递语义

2>幂等性

开启参数enable.idempotence 默认为true,false关闭。

3>生产者事务

0.11版本的Kafka同时引入了事务的特性,为了实现跨分区跨会话的事务,需要引入一个全局唯一的Transaction ID,并将Producer获得的PID和Transaction ID绑定。这样当Producer重启后就可以通过正在进行的Transaction ID获得原来的PID。

为了管理Transaction,Kafka引入了一个新的组件Transaction Coordinator。Producer就是通过和Transaction Coordinator交互获得Transaction ID对应的任务状态。Transaction Coordinator还负责将事务所有写入Kafka的一个内部Topic,这样即使整个服务重启,由于事务状态得到保存,进行中的事务状态可以得到恢复,从而继续进行。

就是引入一个全局唯一且一致的id,然后将id和pid绑定,从而使producer重启后,kafka集群依然可以通过id获得原来的pid

注意:提前开启幂等性!!!

 (4)数据有序

分区内有序,分区间无序 

(5)数据乱序

六、kafka broker 

1、kafka broker 工作流程

(1)zookeeper存储kafka的信息

启动zookeeper客户端

[atguigu@hadoop102 zookeeper-3.5.7]$ bin/zkCli.sh

 通过命令查看kafka相关信息

[zk: localhost:2181(CONNECTED) 2] ls /kafka

(2)kafka broker 的总体工作流程

(1)查看/kafka/brokers/ids路径上的节点。

[zk: localhost:2181(CONNECTED) 2] ls /kafka/brokers/ids

[0, 1, 2]

        (2)查看/kafka/controller路径上的数据。

[zk: localhost:2181(CONNECTED) 15] get /kafka/controller

{"version":1,"brokerid":0,"timestamp":"1637292471777"}

        (3)查看/kafka/brokers/topics/first/partitions/0/state路径上的数据。

[zk: localhost:2181(CONNECTED) 16] get /kafka/brokers/topics/first/partitions/0/state

{"controller_epoch":24,"leader":0,"version":1,"leader_epoch":18,"isr":[0,1,2]}

(4)停止hadoop104上的kafka。

[atguigu@hadoop104 kafka]$ bin/kafka-server-stop.sh

(5)再次查看/kafka/brokers/ids路径上的节点。

[zk: localhost:2181(CONNECTED) 3] ls /kafka/brokers/ids

[0, 1]

        (6)再次查看/kafka/controller路径上的数据。

[zk: localhost:2181(CONNECTED) 15] get /kafka/controller

{"version":1,"brokerid":0,"timestamp":"1637292471777"}

        (7)再次查看/kafka/brokers/topics/first/partitions/0/state路径上的数据。

[zk: localhost:2181(CONNECTED) 16] get /kafka/brokers/topics/first/partitions/0/state

{"controller_epoch":24,"leader":0,"version":1,"leader_epoch":18,"isr":[0,1]}

(8)启动hadoop104上的kafka。

[atguigu@hadoop104 kafka]$ bin/kafka-server-start.sh -daemon ./config/server.properties

(9)再次观察(1)、(2)、(3)步骤中的内容。

 (3)broker的重要参数

参数名称

描述

replica.lag.time.max.ms

ISR中,如果Follower长时间未向Leader发送通信请求或同步数据,则该Follower将被踢出ISR。该时间阈值,默认30s

auto.leader.rebalance.enable

默认是true。 自动Leader Partition 平衡。

leader.imbalance.per.broker.percentage

默认是10%。每个broker允许的不平衡的leader的比率。如果每个broker超过了这个值,控制器会触发leader的平衡。

leader.imbalance.check.interval.seconds

默认值300。检查leader负载是否平衡的间隔时间。

log.segment.bytes

Kafka中log日志是分成一块块存储的,此配置是指log日志划分 成块的大小,默认值1G

log.index.interval.bytes

默认4kbkafka里面每当写入了4kb大小的日志(.log),然后就往index文件里面记录一个索引。

log.retention.hours

Kafka中数据保存的时间,默认7天。

log.retention.minutes

Kafka中数据保存的时间,分钟级别,默认关闭。

log.retention.ms

Kafka中数据保存的时间,毫秒级别,默认关闭。

log.retention.check.interval.ms

检查数据是否保存超时的间隔,默认是5分钟

log.retention.bytes

默认等于-1,表示无穷大。超过设置的所有日志总大小,删除最早的segment。

log.cleanup.policy

默认是delete,表示所有数据启用删除策略;

如果设置值为compact,表示所有数据启用压缩策略。

num.io.threads

默认是8负责写磁盘的线程数。整个参数值要占总核数的50%

num.replica.fetchers

副本拉取线程数,这个参数占总核数的50%1/3

num.network.threads

默认是3数据传输线程数,这个参数占总核数的50%2/3

log.flush.interval.messages

强制页缓存刷写到磁盘的条数,默认是long的最大值,9223372036854775807。一般不建议修改,交给系统自己管理。

log.flush.interval.ms

每隔多久,刷数据到磁盘,默认是null。一般不建议修改,交给系统自己管理。

2、文件存储

(1)文件存储机制

 查看这些文件是一些乱码信息,因为生产者上传时是有序列化,只有通过相同的反序列化后可以看到不乱码的,现在在kafka使用工具来查看,如下

[atguigu@hadoop102 first1-0]$ kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.index
Dumping ./00000000000000000000.index
offset: 0 position: 0
Mismatches in :/opt/module/kafka/datas/first1-0/./00000000000000000000.index
  Index offset: 0, log offset: 9

[atguigu@hadoop102 first1-0]$ kafka-run-class.sh kafka.tools.DumpLogSegments --files ./00000000000000000000.log
Dumping ./00000000000000000000.log
Starting offset: 0
baseOffset: 0 lastOffset: 9 count: 10 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 0 CreateTime: 1655729494741 size: 201 magic: 2 compresscodec: none crc: 925683915 isvalid: true
baseOffset: 10 lastOffset: 19 count: 10 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 201 CreateTime: 1655729580577 size: 201 magic: 2 compresscodec: none crc: 2811645235 isvalid: true
baseOffset: 20 lastOffset: 29 count: 10 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 402 CreateTime: 1655732822744 size: 211 magic: 2 compresscodec: none crc: 2145363516 isvalid: true
baseOffset: 30 lastOffset: 31 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 613 CreateTime: 1655736655333 size: 89 magic: 2 compresscodec: none crc: 2438774302 isvalid: true
baseOffset: 32 lastOffset: 33 count: 2 baseSequence: -1 lastSequence: -1 producerId: -1 producerEpoch: -1 partitionLeaderEpoch: 0 isTransactional: false isControl: false position: 702 CreateTime: 1655736655349 size: 89 magic: 2 compresscodec: none crc: 3832231783 isvalid: true

 

 说明:日志存储参数配置

参数

描述

log.segment.bytes

Kafka中log日志是分成一块块存储的,此配置是指log日志划分 成块的大小,默认值1G

log.index.interval.bytes

默认4kbkafka里面每当写入了4kb大小的日志(.log),然后就往index文件里面记录一个索引。 稀疏索引。

(2)文件清洗策略

Kafka中默认的日志保存时间为7,可以通过调整如下参数修改保存时间。

  1. log.retention.hours,最低优先级小时,默认7天。
  2. log.retention.minutes,分钟。
  3. log.retention.ms,最高优先级毫秒。
  4. log.retention.check.interval.ms,负责设置检查周期,默认5分钟。

那么日志一旦超过了设置的时间,怎么处理呢?

Kafka中提供的日志清理策略有deletecompact两种。

1)delete日志删除:将过期数据删除

  1. log.cleanup.policy = delete    所有数据启用删除策略

(1)基于时间:默认打开segment中所有记录中的最大时间戳作为该文件时间戳。

(2)基于大小:默认关闭。超过设置的所有日志总大小,删除最早的segment。

log.retention.bytes,默认等于-1,表示无穷大。

思考:如果一个segment中有一部分数据过期,一部分没有过期,怎么处理?

答案是,继续保留

 2compact日志压缩

3、高效读写数据

1Kafka本身是分布式集群,可以采用分区技术,并行度高

2)读数据采用稀疏索引,可以快速定位要消费的数据

3)顺序写磁盘

Kafka的producer生产数据,要写入到log文件中,写的过程是一直追加到文件末端,为顺序写。官网有数据表明,同样的磁盘,顺序写能到600M/s,而随机写只有100K/s。这与磁盘的机械机构有关,顺序写之所以快,是因为其省去了大量磁头寻址的时间。

 

 4)页缓存 + 零拷贝技术

参数

描述

log.flush.interval.messages

强制页缓存刷写到磁盘的条数,默认是long的最大值,9223372036854775807。一般不建议修改,交给系统自己管理。

log.flush.interval.ms

每隔多久,刷数据到磁盘,默认是null。一般不建议修改,交给系统自己管理。

 七、kafka消费者

 

 1、kafka消费者工作流程

1.什么是消费者组?

所谓消费者组就是一组消费者的集合,为什么要引入组的概念,那是因为,采用一个消费者来消费一个topic,可能会出现数据堆积的问题,上游生产数据的速度超过消费者消费的的速度的话,就会导致数据堆积,所以我们采用一个消费者组来消费topic,众人拾材火焰高,其消费能力也是倍数递增

2.消费者组是如何消费一个主题数据的?

1>一个topic的一个partition只能被同一个消费组内的一个消费者消费,而不能拆给多个消费者,同组的消费者之间的数据是共享的,从而就是说消费者组内的消费者的数量比topic的分区数量多的话,多的消费者就是不工作的

2>消费者组之间的消费数据是互不干扰的,就是说每个消费者组消费的数据的都是完整的数据

3. 那么一个消费者组内的消费者越多消费能力越强的吗?

>   那么一定不是的,一个消费者只能消费一个topic的一个分区的数据,所以并不是越多越好的

>   如果要加强消费者组的消费能力,除了增加消费者数量,分区数量增加,只有这样并行度上去了才能提高消费力

>  但是为了提高消费组的消费能力,随意增加分区和消费者也是不可行的

一般来说,建议分区数和消费者数量保持一致是最好的,当消费组的消费能力不足时,是可以通过增加分区数量来提高并行度,但是尽量避免这样情况发生,因为,增加一个topic的分区数量这个时候,kafka会进行分区再均衡,在这个期间topic是不可用的,而且一个topic可能有多个消费者组在消费他的数据,增加分区数量会影响到每一个消费者组的,所以再创建topic的时候一定要考虑好分区数

 1.确定去协调器coordinator:每当我们创建一个消费者组的时候,kafka会分配一个broker作为该消费组的一个coordinator,coordinator节点的选择:groupid的hash值 % __consumer_offsets的分区数量,这个是系统给的

2.注册消费者,并选出leader consumer,当有了coordinate,消费者将会开始往该coordinate上进行注册,第一个注册的消费者将成为消费组的leader,后续的作为follower

3.选出leader后,leader将会从coordinate获取分区信息,并会根据分区策略给每个consumer分配分区形成一个消费策略,并将消费策略汇报给coordinate

4.coordinate将每一个consumer对应的分区下发给每一个consumer,对所有的follower而言,只知道自己的分区,不知道别人的,但是leader知道所有人的分区

5.当发生分区再均衡的时候,leader将会重复分配过程

 消费者组:

 消费者重要参数

参数名称

描述

bootstrap.servers

向Kafka集群建立初始连接用到的host/port列表。

key.deserializer和value.deserializer

指定接收消息的key和value的反序列化类型。一定要写全类名。

group.id

标记消费者所属的消费者组。

enable.auto.commit

默认值为true,消费者会自动周期性地向服务器提交偏移量。

auto.commit.interval.ms

如果设置了 enable.auto.commit 的值为true, 则该值定义了消费者偏移量向Kafka提交的频率,默认5s

auto.offset.reset

当Kafka中没有初始偏移量或当前偏移量在服务器中不存在(如,数据被删除了),该如何处理? earliest:自动重置偏移量到最早的偏移量。 latest:默认,自动重置偏移量为最新的偏移量。 none:如果消费组原来的(previous)偏移量不存在,则向消费者抛异常。 anything:向消费者抛异常。

offsets.topic.num.partitions

__consumer_offsets的分区数,默认是50个分区。

heartbeat.interval.ms

Kafka消费者和coordinator之间的心跳时间,默认3s

该条目的值必须小于 session.timeout.ms ,也不应该高于 session.timeout.ms 的1/3。

session.timeout.ms

Kafka消费者和coordinator之间连接超时时间,默认45s。超过该值,该消费者被移除,消费者组执行再平衡。

max.poll.interval.ms

消费者处理消息的最大时长,默认是5分钟。超过该值,该消费者被移除,消费者组执行再平衡。

fetch.min.bytes

默认1个字节。消费者获取服务器端一批消息最小的字节数。

fetch.max.wait.ms

默认500ms。如果没有从服务器端获取到一批数据的最小字节数。该时间到,仍然会返回数据。

fetch.max.bytes

默认Default:  5242880050 m)。消费者获取服务器端一批消息最大的字节数。如果服务器端一批次的数据大于该值(50m)仍然可以拉取回来这批数据,因此,这不是一个绝对最大值。一批次的大小受message.max.bytes (broker config)or max.message.bytes (topic config)影响。

max.poll.records

一次poll拉取数据返回消息的最大条数,默认是500条。

 2、消费者API

import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.serialization.StringDeserializer;

import java.time.Duration;
import java.util.ArrayList;
import java.util.Properties;

public class CustomConsumer {
    public static void main(String[] args) {
        // 1. 创建消费者配置对象
        Properties properties = new Properties();
        // 2. 给消费者配置对象添加参数(不同于生产者,消费者有 4个必要的配置参数)
        //  broker的ip地址
        properties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "hadoop102:9092");
        // 配置  反序列化
        properties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        properties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
        //配置消费者组(组名必须)
        properties.put(ConsumerConfig.GROUP_ID_CONFIG, "group1");
        // 3. 创建消费者对象
        KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
        // 注册消费主题
        ArrayList<String> topics = new ArrayList<>();
        topics.add("first");
        consumer.subscribe(topics);
        // 4.调用方法消费数据
        // 如果kafka集群没有新数据会造成空转
        // 填写参数为时间,如果没有拉取数据,线程睡眠一会
        while (true) {
            // 设置1s中消费的一批数据
            // Duration.ofSeconds(1)不会导致空转,拉取不到的时候睡眠1s
            ConsumerRecords<String, String> consumerRecords = consumer.poll(Duration.ofSeconds(1));
            // 打印消费数据
            for (ConsumerRecord<String, String> consumerRecord : consumerRecords) {
                System.out.println(consumerRecord.topic() + "-" + consumerRecord.partition() + "-" + consumerRecord.offset());
            }
        }
        //5.关闭资源
//        consumer.close();不使用的原因是,已关闭进程,就不会再消费数据了,进程停止就以为着JVM为断电了,不再工作
    }
}

Logo

旨在为数千万中国开发者提供一个无缝且高效的云端环境,以支持学习、使用和贡献开源项目。

更多推荐