背景:我使用docker-compose 搭建的kafka服务
kafka的简单介绍以及docker-compose部署单主机Kafka集群

Kafka API简单介绍

kafka除了用于管理和管理任务的命令行工具,Kafka还有5个用于Java和Scala的核心API

他们分别是

  • The Admin API : 用于管理和inspect topics, brokers和其他 Kafka 对象
  • The Producer API: 将事件流发布(写入)到一个或多个 Kafka topics
  • The Consumer API: 订阅(读取)一个或多个topics并处理它们生成的事件流
  • The Kafka Streams API: 用于实现流处理应用程序和微服务,它提供了更高级别的方法来处理事件流,包括转换、聚合和连接等有状态操作、窗口化、基于事件时间的处理等等。从一个或多个topics读取输入以生成一个或多个topics的输出,有效地将输入流转换为输出流。
  • The Kafka Connect API:用于构建和运行可重用 的数据导入/导出connectors,这些connectors从外部系统和应用程序消费(读取)或产生(写入)事件流,以便它们可以与 Kafka 集成。例如,与 PostgreSQL 等关系数据库的连接器可能会捕获表的每次更改。但是,在实践中,您通常不需要实现自己的connectors,因为 Kafka 社区已经提供了数百个即用型connectors。

我使用的wurstmeister/kafka镜像的kafka是2.8.1版本的,通过docker inspect命令可以查看
在这里插入图片描述

新建项目kafkademo用以测试

我使用IDEA 的Spring Initializr创建项目,引入web依赖。
我使用的spring boot 版本是2.6.3,kafka-clients 是2.8.1,和kafka server是对应的

pom.xml文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.6.3</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>
    <groupId>com.xt</groupId>
    <artifactId>kafkademo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>kafkademo</name>
    <description>kafkademo</description>
    <properties>
        <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>

        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.8.1</version>
        </dependency>
    </dependencies>
    
    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>

</project>

创建连接客户端

首先设置一个连接kafka的客户端类AdminClient,他是一个抽象类,只有一个KafkaAdminClient实现,如下代码所示,他提供了两种创建方式

//Admin的默认实现。 通过调用AdminClient中的create()方法之一来创建此类的实例。 用户不应直接引用此类。
//这个类是线程安全的。
public abstract class AdminClient implements Admin {

    /**
     * Create a new Admin with the given configuration.
     *
     * @param props The configuration.
     * @return The new KafkaAdminClient.
     */
    public static AdminClient create(Properties props) {
        return (AdminClient) Admin.create(props);
    }

    /**
     * Create a new Admin with the given configuration.
     *
     * @param conf The configuration.
     * @return The new KafkaAdminClient.
     */
    public static AdminClient create(Map<String, Object> conf) {
        return (AdminClient) Admin.create(conf);
    }
}

通过使用Properties来创建连接kafka server的客户端

//设置AdminClient
public static AdminClient adminClient(){
    //Properties类表示一组持久的属性。 Properties可以保存到流中或从流中加载。属性列表中的每个键及其对应的值都是一个字符串。继承自Hashtable
    Properties properties = new Properties();
    //AdminClientConfig是AdminClient配置类,它还包含用于配置条目名称的常量。
    properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka服务器IP:9092");
    properties.put(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 10000);
    properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
    //内置管理客户端的基类 ,是一个抽象类,下面有一个实现 为KafkaAdminClient,此类是保证了线程安全
    AdminClient adminClient = AdminClient.create(properties);
    return adminClient;
}

创建topic

 //创建Topic实例
public static void createTopic() {

    AdminClient adminClient = adminClient();
    // 副本因子
    Short rs = 1;
    //创建具有指定副本因子和分区数的新topic。
    NewTopic newTopic = new NewTopic(TOPIC_NAME, 1 , rs);
    //创建一批新主题。
    //此操作不是事务性的,因此它可能对某些主题成功,而对另一些主题则失败。
    //CreateTopicsResult返回成功后,所有代理可能需要几秒钟才能意识到主题已创建。在此期间, listTopics()和describeTopics(Collection)可能不会返回有关新主题的信息。
    CreateTopicsResult topics = adminClient.createTopics(Arrays.asList(newTopic));
    System.out.println("创建topic成功 : "+ topics.toString());
    System.out.println("---------------------------------------------------------------");

}

获取Topic列表

//获取Topic列表
public static void topicLists() throws Exception {
    AdminClient adminClient = adminClient();
    // 是否查看internal选项
    ListTopicsOptions options = new ListTopicsOptions();
    //设置我们是否应该列出内部topic。
    options.listInternal(true);

    //列出集群中可用的topic。
    ListTopicsResult listTopicsResult = adminClient.listTopics(options);
    //返回一个topic名称集合的future(这里是KafkaFuture)
    Set<String> names = listTopicsResult.names().get();
    //返回一个KafkaFuture,它产生一个 TopicListing 对象的集合
    Collection<TopicListing> topicListings = listTopicsResult.listings().get();
    //返回一个KafkaFuture,它产生一个topic名称到 TopicListing 对象的映射。
    KafkaFuture<Map<String, TopicListing>> mapKafkaFuture = listTopicsResult.namesToListings();
    // 打印names
    names.stream().forEach(System.out::println);
    System.out.println("---------------------------topic列表-------------------------");
    // 打印topicListings
    topicListings.stream().forEach((topicList)->{
        System.out.println(topicList);
    });
    System.out.println("---------------------------topic列表-------------------------");
}

获取topic的描述信息

// 获取描述topic的信息
public static void describeTopics() throws Exception {
    AdminClient adminClient = adminClient();
    //描述集群中的一些topic。
    DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(TOPIC_NAME));

    Map<String, TopicDescription> stringTopicDescriptionMap = describeTopicsResult.all().get();

    Set<Map.Entry<String, TopicDescription>> entries = stringTopicDescriptionMap.entrySet();
    System.out.println("----------------------------topic信息-----------------------------");
    entries.stream().forEach((entry)->{
        System.out.println("name :"+entry.getKey()+" , desc: "+ entry.getValue());
    });
    System.out.println("----------------------------topic信息-----------------------------");
}

修改配置信息

//修改Config信息
public static void alterConfig() throws Exception{
    AdminClient adminClient = adminClient();

    Map<ConfigResource,Collection<AlterConfigOp>> configMaps = new HashMap<>();
    //具有配置的资源的类,需要提供type和名称  Type是他内部维护的枚举类,共有四种类型:BROKER_LOGGER((byte) 8), BROKER((byte) 4), TOPIC((byte) 2), UNKNOWN((byte) 0)
    ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
    //包含名称、值和操作类型的更改配置条目的类  ,需要注入ConfigEntry,和操作类型,同样OpType是个枚举类
    AlterConfigOp alterConfigOp = new AlterConfigOp(new ConfigEntry("preallocate","false"),AlterConfigOp.OpType.SET);
    configMaps.put(configResource,Arrays.asList(alterConfigOp));
    //逐步更新指定资源的配置
    AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(configMaps);
    alterConfigsResult.all().get();
}

获取配置的描述信息

//获取描述配置的信息
public static void describeConfig() throws Exception{
    AdminClient adminClient = adminClient();

    ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
    //获取指定资源的配置
    DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(Arrays.asList(configResource));
    Map<ConfigResource, Config> configResourceConfigMap = describeConfigsResult.all().get();
    System.out.println("----------------------------配置信息-----------------------------");
    configResourceConfigMap.entrySet().stream().forEach((entry)->{
        System.out.println("configResource : "+entry.getKey()+" , Config : "+entry.getValue());
    });
    System.out.println("----------------------------配置信息-----------------------------");
}

增加topic的partition数量

topic和partition的概念如果不是很清楚的话,可以去这篇博客的指定章节查看
kafka的简单介绍以及docker-compose部署单主机Kafka集群

//增加partition数量
public static void incrPartitions(int partitions) throws Exception{
   AdminClient adminClient = adminClient();
   Map<String, NewPartitions> partitionsMap = new HashMap<>();

   NewPartitions newPartitions = NewPartitions.increaseTo(partitions);
   partitionsMap.put(TOPIC_NAME, newPartitions);
   CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(partitionsMap);
   createPartitionsResult.all().get();
}

删除Topic

//删除Topic
public static void delTopics() throws Exception {
   AdminClient adminClient = adminClient();
   //删除一批topic。
   //此操作不是事务性的,因此它可能对某些主题成功,而对另一些主题则失败。
   //DeleteTopicsResult返回成功后,所有代理可能需要几秒钟才能意识到主题已消失。 在此期间, listTopics()和describeTopics(Collection)可能会继续返回有关已删除主题的信息。
   DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList(TOPIC_NAME));
   deleteTopicsResult.all().get();
}

完整代码

public class AdminSample {

    public final static String TOPIC_NAME="xt";

    //设置AdminClient
    public static AdminClient adminClient(){
        //Properties类表示一组持久的属性。 Properties可以保存到流中或从流中加载。属性列表中的每个键及其对应的值都是一个字符串。继承自Hashtable
        Properties properties = new Properties();
        //AdminClientConfig是AdminClient配置类,它还包含用于配置条目名称的常量。
        properties.setProperty(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG,"kafka IP:9092");
        properties.put(AdminClientConfig.CONNECTIONS_MAX_IDLE_MS_CONFIG, 10000);
        properties.put(AdminClientConfig.REQUEST_TIMEOUT_MS_CONFIG, 5000);
        //内置管理客户端的基类 ,是一个抽象类,下面有一个实现 为KafkaAdminClient,此类是保证了线程安全
        AdminClient adminClient = AdminClient.create(properties);
        return adminClient;
    }


    public static void main(String[] args) throws Exception {
        AdminClient adminClient = AdminSample.adminClient();

        System.out.println("adminClient : "+ adminClient);
        // 创建Topic实例
        createTopic();
        // 获取Topic列表
        topicLists();
        // 描述Topic
        describeTopics();
        // 修改Config
        alterConfig();
        // 查询Config
        describeConfig();
        // 增加partition数量
        incrPartitions(2);
        // 删除Topic实例
        delTopics();
    }


     //创建Topic实例
    public static void createTopic() {

        AdminClient adminClient = adminClient();
        // 副本因子
        Short rs = 1;
        //创建具有指定副本因子和分区数的新topic。
        NewTopic newTopic = new NewTopic(TOPIC_NAME, 1 , rs);
        //创建一批新主题。
        //此操作不是事务性的,因此它可能对某些主题成功,而对另一些主题则失败。
        //CreateTopicsResult返回成功后,所有代理可能需要几秒钟才能意识到主题已创建。在此期间, listTopics()和describeTopics(Collection)可能不会返回有关新主题的信息。
        CreateTopicsResult topics = adminClient.createTopics(Arrays.asList(newTopic));
        System.out.println("创建topic成功 : "+ topics.toString());
        System.out.println("---------------------------------------------------------------");

    }


    //获取Topic列表
    public static void topicLists() throws Exception {
        AdminClient adminClient = adminClient();
        // 是否查看internal选项
        ListTopicsOptions options = new ListTopicsOptions();
        //设置我们是否应该列出内部topic。
        options.listInternal(true);

        //列出集群中可用的topic。
        ListTopicsResult listTopicsResult = adminClient.listTopics(options);
        //返回一个topic名称集合的future(这里是KafkaFuture)
        Set<String> names = listTopicsResult.names().get();
        //返回一个KafkaFuture,它产生一个 TopicListing 对象的集合
        Collection<TopicListing> topicListings = listTopicsResult.listings().get();
        //返回一个KafkaFuture,它产生一个topic名称到 TopicListing 对象的映射。
        KafkaFuture<Map<String, TopicListing>> mapKafkaFuture = listTopicsResult.namesToListings();
        // 打印names
        names.stream().forEach(System.out::println);
        System.out.println("---------------------------topic列表-------------------------");
        // 打印topicListings
        topicListings.stream().forEach((topicList)->{
            System.out.println(topicList);
        });
        System.out.println("---------------------------topic列表-------------------------");
    }

    // 获取描述topic的信息
    public static void describeTopics() throws Exception {
        AdminClient adminClient = adminClient();
        //描述集群中的一些topic。
        DescribeTopicsResult describeTopicsResult = adminClient.describeTopics(Arrays.asList(TOPIC_NAME));

        Map<String, TopicDescription> stringTopicDescriptionMap = describeTopicsResult.all().get();

        Set<Map.Entry<String, TopicDescription>> entries = stringTopicDescriptionMap.entrySet();
        System.out.println("----------------------------topic信息-----------------------------");
        entries.stream().forEach((entry)->{
            System.out.println("name :"+entry.getKey()+" , desc: "+ entry.getValue());
        });
        System.out.println("----------------------------topic信息-----------------------------");
    }

    //修改Config信息
    public static void alterConfig() throws Exception{
        AdminClient adminClient = adminClient();

        Map<ConfigResource,Collection<AlterConfigOp>> configMaps = new HashMap<>();
        //具有配置的资源的类,需要提供type和名称  Type是他内部维护的枚举类,共有四种类型:BROKER_LOGGER((byte) 8), BROKER((byte) 4), TOPIC((byte) 2), UNKNOWN((byte) 0)
        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
        //包含名称、值和操作类型的更改配置条目的类  ,需要注入ConfigEntry,和操作类型,同样OpType是个枚举类
        AlterConfigOp alterConfigOp = new AlterConfigOp(new ConfigEntry("preallocate","false"),AlterConfigOp.OpType.SET);
        configMaps.put(configResource,Arrays.asList(alterConfigOp));
        //逐步更新指定资源的配置
        AlterConfigsResult alterConfigsResult = adminClient.incrementalAlterConfigs(configMaps);
        alterConfigsResult.all().get();
    }

    //获取描述配置的信息
    public static void describeConfig() throws Exception{
        AdminClient adminClient = adminClient();

        ConfigResource configResource = new ConfigResource(ConfigResource.Type.TOPIC, TOPIC_NAME);
        //获取指定资源的配置
        DescribeConfigsResult describeConfigsResult = adminClient.describeConfigs(Arrays.asList(configResource));
        Map<ConfigResource, Config> configResourceConfigMap = describeConfigsResult.all().get();
        System.out.println("----------------------------配置信息-----------------------------");
        configResourceConfigMap.entrySet().stream().forEach((entry)->{
            System.out.println("configResource : "+entry.getKey()+" , Config : "+entry.getValue());
        });
        System.out.println("----------------------------配置信息-----------------------------");
    }


    //增加partition数量
    public static void incrPartitions(int partitions) throws Exception{
        AdminClient adminClient = adminClient();
        Map<String, NewPartitions> partitionsMap = new HashMap<>();

        NewPartitions newPartitions = NewPartitions.increaseTo(partitions);
        partitionsMap.put(TOPIC_NAME, newPartitions);
        CreatePartitionsResult createPartitionsResult = adminClient.createPartitions(partitionsMap);
        createPartitionsResult.all().get();
        
    }


    //删除Topic
    public static void delTopics() throws Exception {
        AdminClient adminClient = adminClient();
        //删除一批topic。
        //此操作不是事务性的,因此它可能对某些主题成功,而对另一些主题则失败。
        //DeleteTopicsResult返回成功后,所有代理可能需要几秒钟才能意识到主题已消失。 在此期间, listTopics()和describeTopics(Collection)可能会继续返回有关已删除主题的信息。
        DeleteTopicsResult deleteTopicsResult = adminClient.deleteTopics(Arrays.asList(TOPIC_NAME));
        deleteTopicsResult.all().get();
    }


}

References:

  • https://kafka.apache.org/intro
  • https://coding.imooc.com/class/434.html

(写博客主要是对自己学习的归纳整理,资料大部分来源于书籍、网络资料和自己的实践,整理不易,但是难免有不足之处,如有错误,请大家评论区批评指正。同时感谢广大博主和广大作者辛苦整理出来的资源和分享的知识。)

Logo

CSDN联合极客时间,共同打造面向开发者的精品内容学习社区,助力成长!

更多推荐