① Config 分布式配置中心

一.概述

1. 分布式系统面临的配置问题

微服务意味着要将单体应用中的业务拆分成一个个子服务,每个服务的粒度相对较小,因此系统中会出现大量的服务。由于每个服务都需要必要的配置信息才能运行,所以一套集中式的、动态的配置管理设施是必不可少的。

SpringCloud提供了ConfigServer【配置中心】来解决这个问题,我们每一个微服务自己都带着一个application.yml,上百个配置文件的管理就要管理上百个application.yml

2. 什么是配置中心

在这里插入图片描述

SpringCloud Config为微服务架构中的微服务提供集中化的外部配置支持,配置服务器为各个不同微服务应用的所有环境提供了一个中心化的外部配置

官网: https://cloud.spring.io/spring-cloud-static/spring-cloud-config/2.2.1.RELEASE/reference/html/

由于SpringCloud Config默认使用Git来存储配置文件(也有其它方式,比如支持SVN和本地文件),但最推荐的还是Git,而且使用的是http/https访问的形式。

3. 配置中心怎么用

SpringCloud Config分为服务端客户端两部分。

  • 服务端也称为分布式配置中心,它是一个独立的微服务应用,用来连接配置服务器并为客户端提供获取配置信息,加密/解密信息等访问接口。
  • 客户端则是通过指定的配置中心来管理应用资源,以及与业务相关的配置内容,并在启动的时候从配置中心获取和加载配置信息配置服务器,默认采用git来存储配置信息,这样就有助于对环境配置进行版本管理,并且可以通过git客户端工具来方便的管理和访问配置内容。

4. 配置中心能做什么

  • 集中管理配置文件
  • 不同环境不同配置,动态化的配置更新,分环境部署比如 dev/test/prod/beta/release
  • 运行期间动态调整配置,不再需要在每个服务部署的机器上编写配置文件,服务回想配置中心统一拉去配置自己的信息。
  • 当配置发生变动时,服务不需要重启即可感知到配置的变化并应用新的配置。
  • 将配置信息以Rest接口的形式暴露。

二.Config服务端配置与测试

1. 搭建

  1. 在github上创建一个springcloud_config的新仓库

  2. 获得新仓库的地址:
    https://gitee.com/jm1107/springcloud-config.git

  3. 本地硬盘目录上新建git仓库并clone
    在这里插入图片描述

  4. 创建文件

在D:\workspace\SpringCloud2022\springcloud-config创建自己的配置文件.

该文件可以从周阳老师的github仓库克隆一份,复制到自己的仓库 https://github.com/zzyybs/springcloud-config

  1. 新建Module模块
    模块名:cloud-config-center-3344,它即为Cloud的配置中心模块CloudConfig Center

  2. POM

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>cloud2022</artifactId>
        <groupId>com.jm.springcloud</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>cloud-config-center-3344</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-config-server</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>


</project>
  1. YML
server:
  port: 3344

spring:
  application:
    name:  cloud-config-center #注册进Eureka服务器的微服务名
  cloud:
    config:
      server:
        git:
          uri: https://gitee.com/jm1107/springcloud-config.git #Gitee上面的git仓库名字
          ####搜索目录
          search-paths:
            - springcloud-config
      ####读取分支
      label: master

#服务注册到eureka地址
eureka:
  client:
    service-url:
      defaultZone: http://localhost:7001/eureka


  1. 主启动类
package com.jm.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.config.server.EnableConfigServer;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;

@SpringBootApplication
@EnableEurekaClient
@EnableConfigServer
public class ConfigCenterMain3344 {
     public static void main(String[] args) {
           SpringApplication.run(ConfigCenterMain3344.class, args);
     }
}
  1. 测试

在这里插入图片描述

2. 配置读取规则

  1. /{label}/{application}-{profile}.yml(最推荐使用这种方式)

1) 读取master分支

  • http://localhost:3344/master/config-dev.yml
  • http://localhost:3344/master/config-test.yml
  • http://localhost:3344/master/config-prod.yml

2)读取dev分支

  • http://localhost:3344/dev/config-dev.yml
  • http://localhost:3344/dev/config-test.yml
  • http://localhost:3344/dev/config-prod.yml
  1. /{application}-{profile}.yml
  • http://localhost:3344/config-dev.yml
  • http://localhost:3344/config-test.yml
  • http://localhost:3344/config-prod.yml

没有了分支/那一项,是因为在application.yml中配置了 label: master
所以会先去读取master的 (就算不配label 也会先去master寻找)

  1. /{application}/{profile}[/{label}]

这样读取的是 JSON串

  • http://localhost:3344/config/dev/master
  • http://localhost:3344/config/test/master
  • http://localhost:3344/config/prod/master
  1. 总结
  • label:分支(branch)
  • name:服务名
  • profiles:环境(dev/test/prod)

三.Config客户端配置和测试

1. 搭建

  1. 新建cloud-config-client-3355模块

  2. POM

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>cloud2022</artifactId>
        <groupId>com.jm.springcloud</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>cloud-config-client-3355</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-config</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>
</project>
  1. bootstrap.yml

applicaiton.yml用户级的资源配置项
bootstrap.yml系统级的,优先级更加高

Spring Cloud会创建一个Bootstrap Context,作为Spring应用的Application Context的父上下文。

初始化的时候,BootstrapContext负责从外部源加载配置属性并解析配置。这两个上下文共享一个从外部获取的Environment。

Bootstrap属性有高优先级,默认情况下,它们不会被本地配置覆盖。Bootstrap context和Application Context有着不同的约定,所以新增了一个bootstrap.yml文件,保证Bootstrap Context和Application Context配置的分离。
要将Client模块下的application.yml文件改为bootstrap.yml,这是很关键的,因为bootstrap.yml是比application.yml先加载的。bootstrap.yml优先级高于application.yml。

server:
  port: 3355

spring:
  application:
    name: config-client
  cloud:
    # config 配置
    config:
      label: master #分支名称
      name: config # 配置文件名称
      profile: prod # 读取后缀名称 上述3个综合:master分支上config-dev.yml的配置文件被读取http://localhost:3344/master/config-dev.yml
      uri: http://localhost:3344 # 配置中心地址
eureka:
  client:
    register-with-eureka: true
    fetch-registry: true
    service-url:
      defaultZone: http://eureka7001.com:7001/eureka

在这里插入图片描述

  1. 主启动类
package com.jm.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;

@SpringBootApplication
@EnableEurekaClient
public class ConfigClientMain3355 {
     public static void main(String[] args) {
           SpringApplication.run(ConfigClientMain3355.class, args);
     }
}
  1. 业务类

@Value("${config.info}')是从git上的配置文件中获取到的

@RestController
@RefreshScope
public class ConfigClientController {
    @Value("${config.info}")
    private String configInfo;

    @GetMapping("/configInfo")
    public String getConfigInfo()
    {
        return configInfo;
    }
}
  1. 测试

启动Config配置中心3344微服务并自测

启动3355作为Client准备访问

四.动态刷新问题

1. 问题:分布式的动态刷新问题

  • 修改Gitee上的配置文件内容做调整
  • 刷新3344,发现ConfigServer配置中心立刻响应
  • 刷新3355,发现ConfigClient客户端没有任何响应
  • 3355没有变化除非自己重启或者重新加载
  • 难到每次运维修改配置文件,客户端都需要重启?

2. Config动态手动刷新

避免每次更新配置都要重启客户端微服务3355

  1. 修改3355客户端模块

  2. POM引入actuator

<!--图形化监控依赖-->
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-actuator</artifactId>
</dependency>
  1. 改YML
# 暴露监控端点
management:
  endpoints:
    web:
      exposure:
        include: "*"
  1. controller层添加@RefreshScope注解
@RestController
@RefreshScope//<------------
public class ConfigClientController {
    @Value("${config.info}")
    private String configInfo;

    @GetMapping("/configInfo")
    public String getConfigInfo()
    {
        return configInfo;
    }
}
  1. 测试
    此时修改gitee配置文件内容 -> 访问3344 -> 访问3355

访问 http://localhost:3355/configInfo,发现并没有修改,需要运维人员发送Post请求刷新3355

curl -X POST "http://localhost:3355/actuator/refresh"

再次访问http://localhost:3355/configInfo,发现配置已经需改了,成功实现了客户端3355刷新到最新配置内容,避免了服务重启。

3. 存留问题

但动态手动刷新存在多个问题:

  • 假如有多个微服务客户端3355/3366/3377…
  • 每个微服务都要执行—次post请求,手动刷新?
  • 可否广播,一次通知,处处生效?
  • 我们想大范围的自动刷新

② Bus 消息总线

一.概念

Spring Cloud Bus将分布式系统的节点与轻量级消息代理链接。这可以用于广播状态更改(例如配置更改)或其他管理指令。一个关键的想法是,Bus就像一个扩展的Spring Boot应用程序的分布式执行器,但也可以用作应用程序之间的通信渠道。当前唯一的实现是使用AMQP代理作为传输,但是相同的基本功能集(还有一些取决于传输)在其他传输的路线图上。

一言以蔽之,分布式自动刷新配置功能。Spring Cloud Bus配合Spring Cloud Config使用可以实现配置的动态刷新。

是什么

Spring Cloud Bus 配合Spring Cloud Config 使用可以实现配置的动态刷新。

在这里插入图片描述

Spring Cloud Bus是用来将分布式系统的节点与轻量级消息系统链接起来的框架,它整合了Java的事件处理机制和消息中间件的功能。Spring Clud Bus目前支持RabbitMQ和Kafka

能干嘛

Spring Cloud Bus能管理和传播分布式系统间的消息,就像一个分布式执行器,可用于广播状态更改、事件推送等,也可以当作微服务间的通信通道。

在这里插入图片描述

为何被称为总线

在微服务架构的系统中,通常会使用轻量级的消息代理来构建一个共用的消息主题,并让系统中所有微服务实例都连接上来。由于该主题中产生的消息会被所有实例监听和消费,所以称它为消息总线。在总线上的各个实例,都可以方便地广播一些需要让其他连接在该主题上的实例都知道的消息。

基本原理

ConfigClient实例都监听MQ中同一个topic(默认是springCloudBus)。当一个服务刷新数据的时候,它会把这个信息放入到Topic中,这样其它监听同一Topic的服务就能得到通知,然后去更新自身的配置。

二.RabbitMQ服务搭建

参考:RabbitMQ一文

三.Bus动态刷新全局广播

必须先具备良好的RabbitMQ环境先

1. 创建项目

演示广播效果,增加复杂度,再以3355为模板再制作一个3366

POM:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>cloud2022</artifactId>
        <groupId>com.jm.springcloud</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>cloud-config-client-3355</artifactId>

    <dependencies>
        <!--添加消息总线RabbitMQ支持-->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-amqp</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-config</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>


</project>

bootstrap.yml:

 
server:
  port: 3366

spring:
  application:
    name: config-client
  cloud:
    #Config客户端配置
    config:
      label: master #分支名称
      name: config #配置文件名称
      profile: dev #读取后缀名称   上述3个综合:master分支上config-dev.yml的配置文件被读取http://config-3344.com:3344/master/config-dev.yml
      uri: http://localhost:3344 #配置中心地址

#服务注册到eureka地址
eureka:
  client:
    service-url:
      defaultZone: http://localhost:7001/eureka

# 暴露监控端点
management:
  endpoints:
    web:
      exposure:
        include: "*"
 

主启动类:

package com.jm.springcloud;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;

@SpringBootApplication
@EnableEurekaClient
public class ConfigClientMain3355 {
     public static void main(String[] args) {
           SpringApplication.run(ConfigClientMain3355.class, args);
     }
}

业务类:

@RestController
@RefreshScope
public class ConfigClientController {
    @Value("${config.info}")
    private String configInfo;

    @GetMapping("/configInfo")
    public String getConfigInfo()
    {
        return configInfo;
    }
}

2. 设计思想

1.利用消息总线触发一个客户端/bus/refresh,而刷新所有客户端的配置

在这里插入图片描述

2.利用消息总线触发一个服务端ConfigServer的/bus/refresh端点,而刷新所有客户端的配置

在这里插入图片描述

图二的架构显然更加适合,图—不适合的原因如下:

  1. 打破了微服务的职责单一性,因为微服务本身是业务模块,它本不应该承担配置刷新的职责。
  2. 破坏了微服务各节点的对等性。
  3. 有一定的局限性。例如,微服务在迁移时,它的网络地址常常会发生变化,此时如果想要做到自动刷新,那就会增加更多的修改。

3. 具体操作

给cloud-config-center-3344配置中心服务端添加消息总线支持

XML:

        <!-- 添加消息总线RabbitMQ支持 -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-actuator</artifactId>
        </dependency>

application.yml

server:
  port: 3344

spring:
  application:
    name:  cloud-config-center #注册进Eureka服务器的微服务名
  cloud:
    config:
      server:
        git:
          uri: https://gitee.com/jm1107/springcloud-config.git #Gitee上面的git仓库名字
          ####搜索目录
          search-paths:
            - springcloud-config
      ####读取分支
      label: master
  #rabbitmq相关配置
  rabbitmq:
    host: 192.168.36.100
    port: 5672
    username: admin
    password: "123"

#服务注册到eureka地址
eureka:
  client:
    service-url:
      defaultZone: http://localhost:7001/eureka

##rabbitmq相关配置,暴露bus刷新配置的端点
management:
  endpoints: #暴露bus刷新配置的端点
    web:
      exposure:
        include: "bus-refresh"

给cloud-config-client-3355客户端添加消息总线支持

XML:

        <!-- 添加消息总线RabbitMQ支持 -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-bus-amqp</artifactId>
        </dependency>

        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-actuator</artifactId>
        </dependency>

application.yml

server:
  port: 3355

spring:
  application:
    name: config-client
  cloud:
    #Config客户端配置
    config:
      label: master #分支名称
      name: config #配置文件名称
      profile: dev #读取后缀名称   上述3个综合:master分支上config-dev.yml的配置文件被读取http://config-3344.com:3344/master/config-dev.yml
      uri: http://localhost:3344 #配置中心地址k
  #rabbitmq相关配置 15672是Web管理界面的端口;5672是MQ访问的端口
  rabbitmq:
    host: 192.168.36.100
    port: 5672
    username: admin
    password: "123"

#服务注册到eureka地址
eureka:
  client:
    service-url:
      defaultZone: http://localhost:7001/eureka

# 暴露监控端点
management:
  endpoints:
    web:
      exposure:
        include: "*"

controller:

@RestController
@RefreshScope // ----具备刷新的能力
public class ConfigClientController {
    @Value("${config.info}")
    private String configInfo;

    @GetMapping("/configInfo")
    public String getConfigInfo()
    {
        return configInfo;
    }
}

4. 测试

  • 启动
    • EurekaMain7001
    • ConfigcenterMain3344
    • ConfigclientMain3355
    • ConfigclicntMain3366
  • 修改gitee上配置文件内容,增加版本号
  • 发送POST请求:curl -X POST "http://localhost:3344/actuator/bus-refresh"

配置中心:http://localhost:3344/master/config-dev.yml

客户端:

获取配置信息,发现都已经刷新成功

四.Bus 动态刷新定点通知

不想全部通知,只想定点通知

  • 只通知3355
  • 不通知3366

简单一句话 - 指定具体某一个实例生效而不是全部

公式:http://localhost:配置中心端口/actuator/bus-refresh/{destination}

/bus/refresh请求不再发送到具体的服务实例上,而是发给config server通过destination参数类指定需要更新配置的服务或实例

案例:

  • 我们这里以刷新运行在3355端口上的config-client(配置文件中设定的应用名称)为例,只通知3355,不通知3366
  • curl -X POST "http://localhost:3344/actuator/bus-refresh/config-client:3355"

在这里插入图片描述

③ Stream 消息驱动

常见MQ(消息中间件):ActiveMQ、RabbitMQ、RocketMQ、Kafka

有没有一种新的技术诞生,让我们不再关注具体MQ的细节,我们只需要用一种适配绑定的方式,自动的给我们在各种MQ内切换。(类似于Hibernate)?

Cloud Stream是什么?

屏蔽底层消息中间件的差异,降低切换成本,统一消息的编程模型。

一.概念

官方文档

Cloud Stream中文指导手册

什么是Spring Cloud Stream?

官方定义Spring Cloud Stream是一个构建消息驱动微服务的框架。应用程序通过inputs或者 outputs 来与Spring Cloud Stream中binder对象交互。

通过我们配置来binding(绑定),而Spring Cloud Stream 的binder对象负责与消息中间件交互。所以,我们只需要搞清楚如何与Spring Cloud Stream交互就可以方便使用消息驱动的方式。
通过使用Spring Integration来连接消息代理中间件以实现消息事件驱动。
Spring Cloud Stream为一些供应商的消息中间件产品提供了个性化的自动化配置实现,引用了发布-订阅、消费组、分区的三个核心概念。

目前仅支持RabbitMQ、 Kafka

二.设计理念

标准的MQ执行流程:

在这里插入图片描述

  • 生产者/消费者之间靠消息媒介传递信息内容
  • 消息必须走特定的通道 - 消息通道 Message Channel
  • 消息通道里的消息如何被消费呢,谁负责收发处理 - 消息通道MessageChannel的子接口SubscribableChannel,由MessageHandler消息处理器所订阅。

为什么用Cloud Stream?

比方说我们用到了RabbitMQ和Kafka,由于这两个消息中间件的架构上的不同,像RabbitMQ有exchange,kafka有Topic和Partitions分区。

在这里插入图片描述

这些中间件的差异性导致我们实际项目开发给我们造成了一定的困扰,我们如果用了两个消息队列的其中一种,后面的业务需求,我想往另外一种消息队列进行迁移,这时候无疑就是一个灾难性的,一大堆东西都要重新推倒重新做,因为它跟我们的系统耦合了,这时候Spring Cloud Stream给我们提供了—种解耦合的方式。

Stream凭什么可以统一底层差异?

在没有绑定器这个概念的情况下,我们的SpringBoot应用要直接与消息中间件进行信息交互的时候,由于各消息中间件构建的初衷不同,它们的实现细节上会有较大的差异性,通过定义绑定器作为中间层,完美地实现了应用程序与消息中间件细节之间的隔离。通过向应用程序暴露统一的Channel通道,使得应用程序不需要再考虑各种不同的消息中间件实现。

通过定义绑定器Binder作为中间层,实现了应用程序与消息中间件细节之间的隔离。

Binder:

  • INPUT对应于消费者
  • OUTPUT对应于生产者

在这里插入图片描述

Stream中的消息通信方式遵循了发布-订阅模式

Topic主题进行广播

  • 在RabbitMQ就是Exchange
  • 在Kakfa中就是Topic

三.stream编码常用注解

Spring Cloud Stream标准流程套路

在这里插入图片描述

在这里插入图片描述

  • Binder : 很方便的连接中间件,屏蔽差异。
  • Channel : 通道,是队列Queue的一种抽象,在消息通讯系统中就是实现存储和转发的媒介,通过Channel对队列进行配置。
  • Source和Sink: 简单的可理解为参照对象是Spring Cloud Stream自身,从Stream发布消息就是输出,接受消息就是输入

编码API和常用注解

组成说明
Middleware中间件,目前只支持RabbitMQ和Kafka
BinderBinder是应用与消息中间件之间的封装,目前实行了Kafka和RabbitMQ的Binder,通过Binder可以很方便的连接中间件,可以动态的改变消息类型(对应于Kafka的topic,RabbitMQ的exchange),这些都可以通过配置文件来实现
@Input注解标识输入通道,通过该输入通道接收到的消息进入应用程序
@Output注解标识输出通道,发布的消息将通过该通道离开应用程序
@StreamListener监听队列,用于消费者的队列的消息接收
@EnableBinding指信道channel和exchange绑定在一起

四.消息驱动案例

准备RabbitMQ环境(https://www.yuque.com/toufayouheiyouchang/gwrwfp/wd1gdy

工程中新建三个子模块

  • cloud-stream-rabbitmq-provider8801,作为生产者进行发消息模块
  • cloud-stream-rabbitmq-consumer8802,作为消息接收模块
  • cloud-stream-rabbitmq-consumer8803,作为消息接收模块

1. 消息生产者

创建项目:

cloud-stream-rabbitmq-provider8801

POM文件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>cloud2022</artifactId>
        <groupId>com.jm.springcloud</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>cloud-stream-rabbitmq-provider8801</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <!--基础配置-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

</project>

application.yml:

server:
  port: 8801

spring:
  application:
    name: cloud-stream-provider
  # actuator 健康检查需要配置,不配置会尝试去连接 localhost
  rabbitmq:
    host: 192.168.36.100
    port: 5672
    username: admin
    password: "123"
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: 192.168.36.100
                port: 5672
                username: admin
                password: "123"
      bindings: # 服务的整合处理
        output: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为json,文本则设置“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置

eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    instance-id: send-8801.com  # 在信息列表时显示主机名称
    prefer-ip-address: true     # 访问的路径变为IP地址

主启动类:

@SpringBootApplication
@EnableEurekaClient
public class StreamMQMain8801 {
     public static void main(String[] args) {
           SpringApplication.run(StreamMQMain8801.class, args);
     }
}

业务类:

发现消息的服务:

public interface IMessageProvider {
    public String send();
}

服务实现类:

@EnableBinding(Source.class)
@Slf4j
public class MessageProviderImpl implements IMessageProvider {
    @Resource
    private MessageChannel output;// 消息发送通道

    @Override
    public String send() {
        String serial = UUID.randomUUID().toString();
        output.send(MessageBuilder.withPayload(serial).build());
        log.info("*****serial: "+serial);
        return null;
    }
}

controller类:

@RestController
public class SendMessageController {
    @Resource
    private IMessageProvider messageProvider;

    @GetMapping("/sendMessage")
    public String sendMessage(){
        return messageProvider.send();
    }
}

测试:

  • 启动eurekaServer7001
  • 启动rabbitMQ
  • 启动stream-privoder8801

访问:http://localhost:8801/sendMessage,并且查看rabbit的监控端,是否有流量波峰

存在流量波峰,表示生产者构建成功。

2. 消息消费者

创建项目:

cloud-stream-rabbitmq-consumer8802

POM文件:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>cloud2022</artifactId>
        <groupId>com.jm.springcloud</groupId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>cloud-stream-rabbitmq-consumer8802</artifactId>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!--基础配置-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-devtools</artifactId>
            <scope>runtime</scope>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

</project>

application.yml

server:
  port: 8802

spring:
  application:
    name: cloud-stream-consumer
  # actuator 健康检查需要配置,不配置会尝试去连接 localhost
  rabbitmq:
    host: 192.168.36.100
    port: 5672
    username: admin
    password: "123"
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: 192.168.36.100
                port: 5672
                username: admin
                password: "123"
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置
          #group: atguiguA

eureka:
  client: # 客户端进行Eureka注册的配置
    service-url:
      defaultZone: http://localhost:7001/eureka
  instance:
    lease-renewal-interval-in-seconds: 2 # 设置心跳的时间间隔(默认是30秒)
    lease-expiration-duration-in-seconds: 5 # 如果现在超过了5秒的间隔(默认是90秒)
    instance-id: receive-8802.com  # 在信息列表时显示主机名称
    prefer-ip-address: true     # 访问的路径变为IP地址

主启动类:

@SpringBootApplication
@EnableEurekaClient
public class StreamMQMain8802 {
      public static void main(String[] args) {
            SpringApplication.run(StreamMQMain8802.class, args);
      }
}

业务类:

@Component
@EnableBinding(Sink.class)
@Slf4j
public class ReceiveMessageListener {
    @Value("${server.port}")
    private String serverPort;

    @StreamListener(Sink.INPUT)
    public void input(Message<String> message){
        log.info("消费者1号,---->接收到的消息:"+message.getPayload()+"\t port:"+serverPort);
    }
}

测试:

  • 启动EurekaServer7001
  • 启动:cloud-stream-rabbitmq-provider8801 生产者
  • 启动:cloud-stream-rabbitmq-consumer8802 消费者
  • 8801发现一条消息 http://localhost:8801/sendMessage
  • 查看8002是否接收到了消息:

消费者消费成功

五.消费分组与持久化

为了演示分组,需要在按照8802,创建出来一份相同的消费者8803,创建和配置过程同8802,这里不在过多演示

1. 消息分组

依照8802,克隆出来一份运行8803 - cloud-stream-rabbitmq-consumer8803。

启动

  • RabbitMQ
  • 服务注册 - 7001
  • 消息生产 - 8801
  • 消息消费 - 8802
  • 消息消费 - 8803

此时会出现一个问题:一个消息被重复消费了两次

生产实际案例

比如在如下场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就会造成数据错误,我们得避免这种情况。这时我们就可以使用Stream中的消息分组来解决。

在这里插入图片描述

注意在Stream中处于同一个group中的多个消费者是竞争关系,就能够保证消息只会被其中一个应用消费一次。不同组是可以全面消费的(重复消费)。

如何定义分组?

spring:
  application:
    name: cloud-stream-consumer
  # actuator 健康检查需要配置,不配置会尝试去连接 localhost
  rabbitmq:
    host: 192.168.36.100
    port: 5672
    username: admin
    password: "123"
  cloud:
    stream:
      binders: # 在此处配置要绑定的rabbitmq的服务信息;
        defaultRabbit: # 表示定义的名称,用于于binding整合
          type: rabbit # 消息组件类型
          environment: # 设置rabbitmq的相关的环境配置
            spring:
              rabbitmq:
                host: 192.168.36.100
                port: 5672
                username: admin
                password: "123"
      bindings: # 服务的整合处理
        input: # 这个名字是一个通道的名称
          destination: studyExchange # 表示要使用的Exchange名称定义
          content-type: application/json # 设置消息类型,本次为对象json,如果是文本则设置“text/plain”
          binder: defaultRabbit # 设置要绑定的消息服务的具体设置
          group: atguiguA # 定义分组同一个组的消费者处于竞争关系,不同组会重复消费

以上述的问题,只需要将8802、8803将两个微服务都处于同一组中,即可解决重复消费问题

8802修改YML group: A_Group
8803修改YML group: A_Group

结论:同一个组的多个微服务实例,每次只会有一个拿到

2. 消息持久化

在使用分组后将自动消息被持久化

在这里插入图片描述

测试

  • 停止8802/8803并去除掉8802的分组group: A_Group,8803的分组group: A_Group没有去掉。
  • 8801先发送4条消息到RabbitMq。
  • 先启动8802,无分组属性配置,后台没有打出来消息。
  • 再启动8803,有分组属性配置,后台打出来了MQ上的消息。(消息持久化体现)
Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐