1.说明

        实验一下,在spring-boot中自行创建kafkaConsumer实例,进行消费,根据配置的不同,得出相应的结果。此例中,我使用 kafkaListenerContainerFactory自行创建了kafkaListener实例,试验了集中不同配置创建出来的实例,查看offset的提交结果。

 

2.代码

采用典型的maven结构,/resources/application.yml配置文件

server:
  port: 8998
  servlet:
    context-path: /

spring:
  application:
    name: my-kafka
  profiles:
    active: dev

mykafka:
  bootstrapServers:  localhost:9092
  groupId: qws
  #后台的心跳线程必须在30秒之内提交心跳,否则会reBalance
  sessionTimeOut: 30000
  autoOffsetReset: earliest
  #取消自动提交,即便如此 spring会帮助我们自动提交
  enableAutoCommit: false
  #自动提交间隔
  autoCommitInterval: 1000
  #拉取的最小字节
  fetchMinSize: 1
  #拉去最小字节的最大等待时间
  fetchMaxWait: 500
  maxPollRecords: 50
  #300秒的提交间隔,如果程序大于300秒提交,会报错
  maxPollInterval: 300000
  #心跳间隔
  heartbeatInterval: 10000
  keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
  valueDeserializer: org.apache.kafka.common.serialization.StringDeserializer




logging:
  config: classpath:logback-spring.xml
  path: ../logs
  level:
   root: info




/resources/logback-spring.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<!-- 日志级别从低到高分为TRACE < DEBUG < INFO < WARN < ERROR < FATAL,如果设置为WARN,则低于WARN的信息都不会输出 -->
<!-- scan:当此属性设置为true时,配置文件如果发生改变,将会被重新加载,默认值为true -->
<!-- scanPeriod:设置监测配置文件是否有修改的时间间隔,如果没有给出时间单位,默认单位是毫秒。当scan为true时,此属性生效。默认的时间间隔为1分钟。 -->
<!-- debug:当此属性设置为true时,将打印出logback内部日志信息,实时查看logback运行状态。默认值为false。 -->
<configuration  scan="true" scanPeriod="60 seconds">
    
    <springProperty scope="context" name="logLevel" source="logging.level.root"/>
    <springProperty scope="context" name="logPath" source="logging.path"/>
    <springProperty scope="context" name="applicationName" source="spring.application.name"/>

    <contextName>logback</contextName>

    <!-- 彩色日志 -->
    <!-- 彩色日志依赖的渲染类 -->
    <conversionRule conversionWord="clr" converterClass="org.springframework.boot.logging.logback.ColorConverter" />
    <conversionRule conversionWord="wex" converterClass="org.springframework.boot.logging.logback.WhitespaceThrowableProxyConverter" />
    <conversionRule conversionWord="wEx" converterClass="org.springframework.boot.logging.logback.ExtendedWhitespaceThrowableProxyConverter" />
    <!-- 彩色日志格式 -->
    <property name="CONSOLE_LOG_PATTERN" value="${CONSOLE_LOG_PATTERN:-%clr(%d{yyyy-MM-dd HH:mm:ss.SSS}){faint} %clr(${LOG_LEVEL_PATTERN:-%5p}) %clr(${PID:- }){magenta} %clr(---){faint} %clr([%15.15t]){faint} %clr(%-40.40logger{39}){cyan} %clr(:){faint} %m%n${LOG_EXCEPTION_CONVERSION_WORD:-%wEx}}"/>


    <!--输出到控制台-->
    <appender name="CONSOLE" class="ch.qos.logback.core.ConsoleAppender">
        <!--此日志appender是为开发使用,只配置最底级别,控制台输出的日志级别是大于或等于此级别的日志信息-->
        <filter class="ch.qos.logback.classic.filter.ThresholdFilter">
            <level>info</level>
        </filter>
        <encoder>
            <Pattern>${CONSOLE_LOG_PATTERN}</Pattern>
            <!-- 设置字符集 -->
            <charset>UTF-8</charset>
        </encoder>
    </appender>


    <!--输出到文件-->

    <!-- 时间滚动输出 level为 DEBUG 日志 -->
    <appender name="DEBUG_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- 正在记录的日志文件的路径及文件名 -->
        <file>${logPath}/${applicationName}-debug.log</file>
        <!--日志文件输出格式-->
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
            <charset>UTF-8</charset> <!-- 设置字符集 -->
        </encoder>
        <!-- 日志记录器的滚动策略,按日期,按大小记录 -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- 日志归档 -->
            <fileNamePattern>${logPath}/debug/${applicationName}/log-debug-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>100MB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <!--日志文件保留天数-->
            <maxHistory>15</maxHistory>
        </rollingPolicy>
        <!-- 此日志文件只记录debug级别的 -->
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>debug</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <!-- 时间滚动输出 level为 INFO 日志 -->
    <appender name="INFO_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- 正在记录的日志文件的路径及文件名 -->
        <file>${logPath}/${applicationName}-info.log</file>
        <!--日志文件输出格式-->
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
            <charset>UTF-8</charset>
        </encoder>
        <!-- 日志记录器的滚动策略,按日期,按大小记录 -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <!-- 每天日志归档路径以及格式 -->
            <fileNamePattern>${logPath}/info/${applicationName}/log-info-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>100MB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <!--日志文件保留天数-->
            <maxHistory>15</maxHistory>
        </rollingPolicy>
        <!-- 此日志文件只记录info级别的 -->
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>info</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <!-- 时间滚动输出 level为 WARN 日志 -->
    <appender name="WARN_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- 正在记录的日志文件的路径及文件名 -->
        <file>${logPath}/${applicationName}-warn.log</file>
        <!--日志文件输出格式-->
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
            <charset>UTF-8</charset> <!-- 此处设置字符集 -->
        </encoder>
        <!-- 日志记录器的滚动策略,按日期,按大小记录 -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${logPath}/warn/${applicationName}/log-warn-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>100MB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <!--日志文件保留天数-->
            <maxHistory>15</maxHistory>
        </rollingPolicy>
        <!-- 此日志文件只记录warn级别的 -->
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>warn</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>


    <!-- 时间滚动输出 level为 ERROR 日志 -->
    <appender name="ERROR_FILE" class="ch.qos.logback.core.rolling.RollingFileAppender">
        <!-- 正在记录的日志文件的路径及文件名 -->
        <file>${logPath}/${applicationName}-error.log</file>
        <!--日志文件输出格式-->
        <encoder>
            <pattern>%d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{50} - %msg%n</pattern>
            <charset>UTF-8</charset> <!-- 此处设置字符集 -->
        </encoder>
        <!-- 日志记录器的滚动策略,按日期,按大小记录 -->
        <rollingPolicy class="ch.qos.logback.core.rolling.TimeBasedRollingPolicy">
            <fileNamePattern>${logPath}/error/${applicationName}/log-error-%d{yyyy-MM-dd}.%i.log</fileNamePattern>
            <timeBasedFileNamingAndTriggeringPolicy class="ch.qos.logback.core.rolling.SizeAndTimeBasedFNATP">
                <maxFileSize>100MB</maxFileSize>
            </timeBasedFileNamingAndTriggeringPolicy>
            <!--日志文件保留天数-->
            <maxHistory>15</maxHistory>
        </rollingPolicy>
        <!-- 此日志文件只记录ERROR级别的 -->
        <filter class="ch.qos.logback.classic.filter.LevelFilter">
            <level>ERROR</level>
            <onMatch>ACCEPT</onMatch>
            <onMismatch>DENY</onMismatch>
        </filter>
    </appender>

    <!--
        <logger>用来设置某一个包或者具体的某一个类的日志打印级别、
        以及指定<appender>。<logger>仅有一个name属性,
        一个可选的level和一个可选的addtivity属性。
        name:用来指定受此logger约束的某一个包或者具体的某一个类。
        level:用来设置打印级别,大小写无关:TRACE, DEBUG, INFO, WARN, ERROR, ALL 和 OFF,
              还有一个特俗值INHERITED或者同义词NULL,代表强制执行上级的级别。
              如果未设置此属性,那么当前logger将会继承上级的级别。
        addtivity:是否向上级logger传递打印信息。默认是true。
    -->
    <!--<logger name="org.springframework.web" level="info"/>-->
    <!--<logger name="org.springframework.scheduling.annotation.ScheduledAnnotationBeanPostProcessor" level="INFO"/>-->
    <!--
        使用mybatis的时候,sql语句是debug下才会打印,而这里我们只配置了info,所以想要查看sql语句的话,有以下两种操作:
        第一种把<root level="info">改成<root level="DEBUG">这样就会打印sql,不过这样日志那边会出现很多其他消息
        第二种就是单独给dao下目录配置debug模式,代码如下,这样配置sql语句会打印,其他还是正常info级别:
     -->

    <!--
        root节点是必选节点,用来指定最基础的日志输出级别,只有一个level属性
        level:用来设置打印级别,大小写无关:TRACE, DEBUG, INFO, WARN, ERROR, ALL 和 OFF,
        不能设置为INHERITED或者同义词NULL。默认是DEBUG
        可以包含零个或多个元素,标识这个appender将会添加到这个logger。
    -->

    <!--开发环境:打印控制台-->
    <!--<springProfile name="dev">-->
        <!--<logger name="com.nmys.view" level="debug"/>-->
    <!--</springProfile>-->

    <root level="debug">
        <appender-ref ref="CONSOLE" />
        <appender-ref ref="DEBUG_FILE" />
        <appender-ref ref="INFO_FILE" />
        <appender-ref ref="WARN_FILE" />
        <appender-ref ref="ERROR_FILE" />
    </root>

    <!--生产环境:输出到文件-->
    <!--<springProfile name="pro">-->
    <!--<root level="info">-->
    <!--<appender-ref ref="CONSOLE" />-->
    <!--<appender-ref ref="DEBUG_FILE" />-->
    <!--<appender-ref ref="INFO_FILE" />-->
    <!--<appender-ref ref="ERROR_FILE" />-->
    <!--<appender-ref ref="WARN_FILE" />-->
    <!--</root>-->
    <!--</springProfile>-->

</configuration>

pom文件:

其中 spring-boot/spring-cloud 的相关依赖的版本为 <version>2.1.5.RELEASE</version>

spring-kafka的依赖版本为 <spring-kafka.version>2.2.6.RELEASE</spring-kafka.version>
kafka-clients的依赖版本为 <kafka.version>2.0.1</kafka.version> 并且kafka服务也是2.0.1版本

其他版本自便,影响不大。

<dependencies>

        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-configuration-processor</artifactId>
            <optional>true</optional>
        </dependency>

        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <scope>provided</scope>
        </dependency>

        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.kafka</groupId>
            <artifactId>spring-kafka</artifactId>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
        </dependency>

    </dependencies>

    <build>
        <plugins>

            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <executions>
                    <execution>
                        <goals>
                            <goal>repackage</goal>
                        </goals>
                        <configuration>
                            <mainClass>cn.qws.server.MyKafkaTest</mainClass>
                        </configuration>
                    </execution>
                </executions>
            </plugin>
        </plugins>

    </build>
</project>

kafka监听的实际代码执行类:

package cn.qws.server.kafka.config.listener;

import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.annotation.TopicPartition;
import org.springframework.stereotype.Component;

import java.util.List;
import java.util.Optional;

/**
 * @author qiwenshuai
 * @note
 * @since 20-9-15 21:36 by jdk 1.8
 */
@Slf4j
@Component
public class MyListener {

    @KafkaListener(topics = "pkafka", containerFactory = "kafkaListenerContainerFactory")
    public void listenPartition0(List<ConsumerRecord<?, ?>> records) throws InterruptedException {
        log.info("Id0 Listener, Thread ID: " + Thread.currentThread().getId());
        log.info("Id0 records size " + records.size());
//        Thread.sleep(500000L);
        for (ConsumerRecord<?, ?> record : records) {
            Optional<?> kafkaMessage = Optional.ofNullable(record.value());
            log.info("Received: " + record);
            if (kafkaMessage.isPresent()) {
                Object message = record.value();
                log.info("p0 Received message={}", message);
            }
        }
    }

}

kafka消费者配置文件映射类

package cn.qws.server.kafka.config.properties;

import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.stereotype.Component;

/**
 * @author qiwenshuai
 * @note
 * @since 20-9-15 21:04 by jdk 1.8
 */
@Component
//指定配置文件的前缀
@ConfigurationProperties(prefix = "mykafka")
@Getter
@Setter
public class KafkaListenerProperties {

    private String groupId;

    private String sessionTimeOut;

    private String bootstrapServers;

    private String autoOffsetReset;

    private boolean enableAutoCommit;

    private String autoCommitInterval;

    private String fetchMinSize;

    private String fetchMaxWait;

    private String maxPollRecords;

    private String maxPollInterval;

    private String heartbeatInterval;

    private String keyDeserializer;

    private String valueDeserializer;


}

消费者实际配置类:

package cn.qws.server.kafka.config;

import cn.qws.server.kafka.config.properties.KafkaListenerProperties;
import com.alibaba.fastjson.JSONObject;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.config.ConcurrentKafkaListenerContainerFactory;
import org.springframework.kafka.config.KafkaListenerContainerFactory;
import org.springframework.kafka.core.ConsumerFactory;
import org.springframework.kafka.core.DefaultKafkaConsumerFactory;
import org.springframework.kafka.listener.ConcurrentMessageListenerContainer;

import java.util.HashMap;
import java.util.Map;

/**
 * @author qiwenshuai
 * @note 消费者配置
 * @since 20-9-15 20:52 by jdk 1.8
 */
@Configuration
@EnableConfigurationProperties(KafkaListenerProperties.class)
@Slf4j
public class KafkaConsumerConfig {

    @Autowired
    KafkaListenerProperties kafkaListenerProperties;


    @Bean
    KafkaListenerContainerFactory<ConcurrentMessageListenerContainer<String, String>> kafkaListenerContainerFactory() {
        ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        // 并发数 多个微服务实例会均分
        factory.setConcurrency(1);
        factory.setBatchListener(true);
        ContainerProperties containerProperties = factory.getContainerProperties();
        // 是否设置手动提交
        containerProperties.setAckMode(ContainerProperties.AckMode.MANUAL_IMMEDIATE);

        return factory;
    }

    private ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> consumerConfigs = consumerConfigs();
        log.info("消费者的配置信息:{}",JSONObject.toJSONString(consumerConfigs));
        return new DefaultKafkaConsumerFactory<>(consumerConfigs);
    }


    @Bean
    public Map<String, Object> consumerConfigs() {
        Map<String, Object> propsMap = new HashMap<>();
        // 服务器地址
        propsMap.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaListenerProperties.getBootstrapServers());
        // 是否自动提交
        propsMap.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, kafkaListenerProperties.isEnableAutoCommit());
        // 自动提交间隔
        propsMap.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG, kafkaListenerProperties.getAutoCommitInterval());
        //会话时间
        propsMap.put(ConsumerConfig.SESSION_TIMEOUT_MS_CONFIG, kafkaListenerProperties.getSessionTimeOut());
        //key序列化
        propsMap.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, kafkaListenerProperties.getKeyDeserializer());
        //value序列化
        propsMap.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, kafkaListenerProperties.getValueDeserializer());
        // 心跳时间
        propsMap.put(ConsumerConfig.HEARTBEAT_INTERVAL_MS_CONFIG, kafkaListenerProperties.getHeartbeatInterval());

        // 分组id
        propsMap.put(ConsumerConfig.GROUP_ID_CONFIG, kafkaListenerProperties.getGroupId());
        //消费策略
        propsMap.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, kafkaListenerProperties.getAutoOffsetReset());
        // poll记录数
        propsMap.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, kafkaListenerProperties.getMaxPollRecords());
        //poll时间
        propsMap.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, kafkaListenerProperties.getMaxPollInterval());
        return propsMap;
    }

}

启动类:

一个简单的main方法, springApplication.run(xxx.class,args);

以上代码部分已经贴完。

测试

1.在配置 enable_auto_commit不同的情况下(false/true) ,

2.设置初始化是否设置手动提交的情况下,

3.设置消费实例入参有无 Acknowledgment 和Consumer的情况下

查看offset是否会提交。

创建topic的shell: 4分区单副本

bin/kafka-topics.sh --create --zookeeper localhost:2181 --topic pkafka--partitions 4 --replication-factor 1

生产消息的shell:

bin/kafka-console-producer.sh --broker-list localhost:9092 --topic pkafka

3.结论

设置enable-auto-commit为false的情况
是否设置enable-auto-commit是否设置手动提交入参有无consumer入参有无Acknowledgment代码里否提交了备注
falseyesyesnono没有提交offset
falseyesnoyesno没有提交offset
falseyesnonono没有提交offset
falsefalseyesnonospring自动提交
falsefalsenoyesno监听listener的容器报错,但是spring自动提交
falsefalsenononospring自动提交
设置enable-auto-commit为true的情况
是否设置enable-auto-commit是否设置手动提交入参有无consumer入参有无Acknowledgment代码里否提交了备注
yesnoyesnonospring自动提交
yesnonoyesno监听listener的容器报错,但是spring自动提交
yesnonononospring自动提交

设置自动提交以后,无法再设置手动提交,会报错,所以只能设置可以自动提交,并且不设置手动提交进行测试。

4.题外

消费者中关于rebalance影响的参数:

1.session.time.out 默认值是10000毫秒。

2.max.poll.interval 默认值是300000毫秒。

3.max.poll.records默认值是500条。

4.消费者个数 代码中为:

触发rebalance的情况:

1. 在session.time.out时间内,如果心跳线程(目前新版本kafka是另外单启的心跳线程,我用2.0.1)没有发心跳到kafka证明自己存活的话,会触发rebalance.

2.在max.poll.interval时间内,如果max.poll.records批量处理的条数没有处理完,即在5分钟内这500条没有处理完,kafka会认为客户端不行了,会触发rebalance,并且重复消费。(报错内容:

Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured max.poll.interval.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.)

3.消费者个数新增,比如我一个微服务中针对某topic配置了2个消费者并发数,并且kafka端配置的topic分区为4,那么在我启动单一微服务的时候,4个分区分配给1个服务的2个消费者,每个消费者消费2个分区的数据。当我再启动同样的微服务的时候,分区会平均分配给4个消费者。这种情况也会触发rebalance。

 

推荐消费者端自行保存kafka的offset,避免在遇到不确定数据,处理起来非常耗时的情况下一直重复消费。

 

 

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐