架构解析

我们目前的架构是一个消费者对应一个生产者,我们需要设置多个消费者,其中需要确定的问题是,是否会有消息重复消费的问题以及消息持久化的问题。由于我们之前已经创建了一个信息消费者cloud-stream-rabbitmq-consumer8802,我们参照这个消费模块创建cloud-stream-rabbitmq-consumer8803。由于过程不太复杂,我这里也就不一一截图展示了。

运行

我们启动Eureka服务端,一个生产模块,两个消费模块,然后调用生产模块接口四次。
在这里插入图片描述

生产者8801

在这里插入图片描述

消费者8802

在这里插入图片描述

消费者8803

在这里插入图片描述

问题解析

重复消费问题和消息持久化问题

上边的结构我们可以理解为下图的生产场景中,订单系统我们做集群部署,都会从RabbitMQ中获取订单信息,那如果一个订单同时被两个服务获取到,那么就可能会造成数据错误,我们得避免这种情况。这时我们通过可以使用Spring Cloud Stream中的消息分组来解决这样的问题。同时分组后消息的持久化也会得到解决,在实际应用中推荐进行分组。

  • 微服务应用放置于同一个group中,就能够保证消息只会被其中一个应用消费一次。

  • 不同的组是可以重复消费的,同一个组内会发生竞争关系,只有其中一个可以消费。

在这里插入图片描述

配置消费者8002的配置文件yml

server:
  port: 8802

spring:
  application:
    name: cloud-stream-rabbitmq-consumer
  rabbitmq:
    host: 106.15.73.43
    port: 5672
    username: guest
    password: guest
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
      bindings:
        input:
          destination: studyExchange
          content-type: application/json
          binder: defaultRabbit
          group: a_group # 分组
eureka:
  client:
    service-url:
      defaultZone: http://127.0.0.1:7001/eureka
    register-with-eureka: true
    fetch-registry: true

配置消费者8003的配置文件yml

server:
  port: 8803

eureka:
  client:
    fetch-registry: true
    register-with-eureka: true
    service-url:
      defaultZone: http://127.0.0.1:7001/eureka

spring:
  application:
    name: cloud-stream-rabbitmq-consumer
  rabbitmq:
    port: 5672
    host: 106.15.73.43
    username: guest
    password: guest
  cloud:
    stream:
      binders:
        defaultRabbit:
          type: rabbit
      bindings:
        input:
          destination: studyExchange
          content-type: application/json
          binder: defaultRabbit
          group: a_group # 分组

再次运行

我们启动Eureka服务端,一个生产模块,两个消费模块,然后调用生产模块接口四次。

生产者8801

在这里插入图片描述

消费者8802

在这里插入图片描述

消费者8803

在这里插入图片描述

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐