核心功能要求:

  1. 订单服务(生产者):提供一个 HTTP 接口 /order/create,接收订单信息(订单号、用户ID、商品名称、数量等)。

  2. 业务处理:订单服务先“保存订单”(模拟数据库操作),然后通过 RabbitMQ 将订单消息发送到 order.exchange 交换机。

  3. 消息路由:使用 Topic 交换机,路由键为 order.create.用户ID,匹配队列绑定键 order.create.#,消息进入 user.queue 队列。

  4. 用户服务(消费者):监听 user.queue 队列,收到消息后执行后续业务(如为用户记录订单、发送通知等)。

解决的问题:

  • 服务解耦:订单服务不需要直接调用用户服务接口,降低耦合。

  • 异步处理:用户服务的操作(如发通知)不会阻塞订单创建流程,提升响应速度。

  • 消息可靠传递:通过 RabbitMQ 保证消息不丢失(队列持久化)。

1. pom.xml —— Maven 依赖配置

xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0
         https://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <!-- 继承 Spring Boot 父工程,自动管理依赖版本 -->
    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.18</version>
        <relativePath/>
    </parent>

    <groupId>com.example</groupId>
    <artifactId>rabbitmq-business-demo</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <name>rabbitmq-business-demo</name>
    <description>Demo for business logic with RabbitMQ</description>

    <properties>
        <java.version>11</java.version>
    </properties>

    <dependencies>
        <!-- Spring Boot Web 启动器,提供 RESTful 接口、嵌入 Tomcat 等 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>

        <!-- Spring Boot RabbitMQ 启动器,自动配置 RabbitTemplate、ConnectionFactory 等 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

        <!-- Lombok,简化 POJO 代码(自动生成 getter/setter/toString 等) -->
        <dependency>
            <groupId>org.projectlombok</groupId>
            <artifactId>lombok</artifactId>
            <optional>true</optional>
        </dependency>

        <!-- 测试依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-test</artifactId>
            <scope>test</scope>
        </dependency>
    </dependencies>

    <build>
        <plugins>
            <!-- Spring Boot Maven 插件,可将应用打包为可执行 jar -->
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
                <configuration>
                    <!-- 打包时排除 Lombok(非生产依赖) -->
                    <excludes>
                        <exclude>
                            <groupId>org.projectlombok</groupId>
                            <artifactId>lombok</artifactId>
                        </exclude>
                    </excludes>
                </configuration>
            </plugin>
        </plugins>
    </build>
</project>

2. application.yml —— 应用配置文件

yaml

server:
  port: 8080               # HTTP 服务端口

spring:
  rabbitmq:
    host: localhost        # RabbitMQ 服务器地址
    port: 5672             # RabbitMQ 通信端口(AMQP 协议)
    username: guest        # 登录用户名(4.x 默认需强密码,开发环境可用)
    password: guest
    virtual-host: /        # 虚拟主机,默认根目录
  listener:
    simple:
      acknowledge-mode: auto   # 消息确认模式:auto 自动确认(消费后自动 ACK)

logging:
  level:
    com.example.rabbitmq: debug   # 打印调试日志,便于观察

3. RabbitMQConfig.java —— 配置队列、交换机、绑定

java

package com.example.rabbitmq.config;

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.core.TopicExchange;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * RabbitMQ 配置类:声明交换机、队列以及它们之间的绑定关系。
 * 这些组件在 Spring 容器启动时会被自动创建(如果 RabbitMQ 中不存在)。
 */
@Configuration
public class RabbitMQConfig {

    // 交换机名称(订单交换机,Topic 类型)
    public static final String ORDER_EXCHANGE = "order.exchange";
    // 用户服务队列名称
    public static final String USER_QUEUE = "user.queue";
    // 路由键模式:order.create.#  其中 # 匹配零个或多个单词
    public static final String ORDER_ROUTING_KEY = "order.create.#";

    /**
     * 声明 Topic 交换机
     * @return TopicExchange 对象
     * 参数说明:name 交换机名称;durable 持久化(重启后不丢失);autoDelete 是否自动删除
     */
    @Bean
    public TopicExchange orderExchange() {
        return new TopicExchange(ORDER_EXCHANGE, true, false);
    }

    /**
     * 声明用户服务队列
     * @return Queue 对象
     * 参数:name 队列名;durable 持久化
     */
    @Bean
    public Queue userQueue() {
        return new Queue(USER_QUEUE, true);
    }

    /**
     * 将队列绑定到交换机,并指定路由键模式
     * @return Binding 对象
     */
    @Bean
    public Binding binding() {
        return BindingBuilder.bind(userQueue())       // 绑定队列
                .to(orderExchange())                  // 绑定到交换机
                .with(ORDER_ROUTING_KEY);             // 指定路由键模式
    }
}

4. OrderDTO.java —— 数据传输对象

java

package com.example.rabbitmq.dto;

import lombok.Data;
import java.io.Serializable;

/**
 * 订单数据传输对象(DTO),用于在微服务之间传递订单信息。
 * 注意:必须实现 Serializable,因为消息在 RabbitMQ 中需要序列化传输。
 */
@Data   // Lombok 注解:自动生成 getter、setter、toString、equals、hashCode 等方法
public class OrderDTO implements Serializable {
    private static final long serialVersionUID = 1L;   // 序列化版本号

    private String orderId;       // 订单ID
    private String userId;        // 用户ID
    private String productName;   // 商品名称
    private Integer amount;       // 数量
    // 可根据需求添加更多字段,如 金额、创建时间等
}

5. OrderMessageProducer.java —— 消息生产者

java

package com.example.rabbitmq.producer;

import com.example.rabbitmq.config.RabbitMQConfig;
import com.example.rabbitmq.dto.OrderDTO;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 订单消息生产者:负责将订单消息发送到 RabbitMQ 交换机
 */
@Component
public class OrderMessageProducer {

    // RabbitTemplate 是 Spring 封装的操作 RabbitMQ 的核心工具类
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 发送订单创建消息
     * @param orderDTO 订单信息
     */
    public void sendOrderCreatedMessage(OrderDTO orderDTO) {
        // 动态生成路由键:order.create.用户ID
        // 这样可以让不同的用户路由到不同的队列(如果需要),此处只是示例
        String routingKey = "order.create." + orderDTO.getUserId();

        // 发送消息:exchange 交换机名称,routingKey 路由键,message 消息体(会自动序列化为 JSON)
        rabbitTemplate.convertAndSend(RabbitMQConfig.ORDER_EXCHANGE, routingKey, orderDTO);
        System.out.println("订单服务发送消息:" + orderDTO);
    }
}

6. OrderService.java —— 订单业务服务

java

package com.example.rabbitmq.service;

import com.example.rabbitmq.dto.OrderDTO;
import com.example.rabbitmq.producer.OrderMessageProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

/**
 * 订单业务逻辑层:处理订单创建的核心业务
 */
@Service
public class OrderService {

    @Autowired
    private OrderMessageProducer messageProducer;   // 注入消息生产者

    /**
     * 创建订单(模拟)
     * @param orderDTO 订单信息
     */
    public void createOrder(OrderDTO orderDTO) {
        // 1. 保存订单到数据库(此处仅打印模拟)
        System.out.println("订单服务:保存订单 " + orderDTO.getOrderId() + " 到数据库");

        // 2. 发送消息通知用户服务(异步解耦)
        messageProducer.sendOrderCreatedMessage(orderDTO);
    }
}

7. OrderController.java —— 订单接口控制器

java

package com.example.rabbitmq.controller;

import com.example.rabbitmq.dto.OrderDTO;
import com.example.rabbitmq.service.OrderService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

/**
 * 订单 RESTful 控制器,对外提供 HTTP 接口
 */
@RestController
public class OrderController {

    @Autowired
    private OrderService orderService;

    /**
     * 创建订单接口
     * 请求方式:POST
     * 请求路径:/order/create
     * 请求体:JSON 格式的订单信息
     */
    @PostMapping("/order/create")
    public String createOrder(@RequestBody OrderDTO orderDTO) {
        orderService.createOrder(orderDTO);
        return "订单创建成功,消息已发送";
    }
}

8. UserMessageConsumer.java —— 消息消费者

java

package com.example.rabbitmq.consumer;

import com.example.rabbitmq.dto.OrderDTO;
import com.example.rabbitmq.service.UserService;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

/**
 * 用户服务消息消费者:监听 user.queue,处理订单创建消息
 */
@Component
public class UserMessageConsumer {

    @Autowired
    private UserService userService;

    /**
     * 监听 user.queue 队列,当有消息到达时自动调用此方法
     * @param orderDTO 接收到的订单消息(Spring 会自动将 JSON 反序列化为 OrderDTO 对象)
     */
    @RabbitListener(queues = "user.queue")
    public void handleOrderCreated(OrderDTO orderDTO) {
        System.out.println("用户服务收到订单消息:" + orderDTO);
        // 调用用户业务逻辑处理订单
        userService.processOrderCreated(orderDTO);
    }
}

9. UserService.java —— 用户业务服务

java

package com.example.rabbitmq.service;

import com.example.rabbitmq.dto.OrderDTO;
import org.springframework.stereotype.Service;

/**
 * 用户业务逻辑层:处理与用户相关的业务(如记录订单、发送通知等)
 */
@Service
public class UserService {

    /**
     * 处理订单创建后的业务逻辑
     * @param orderDTO 订单信息
     */
    public void processOrderCreated(OrderDTO orderDTO) {
        // 实际业务:例如更新 Redis 中用户的最新订单,或发送短信、邮件等
        System.out.println("用户服务:为用户 " + orderDTO.getUserId() + " 记录订单 " + orderDTO.getOrderId());
        // 模拟发送通知
        System.out.println("用户服务:发送订单创建成功通知给用户 " + orderDTO.getUserId());
    }
}

10. RabbitmqBusinessApplication.java —— 主启动类

java

package com.example.rabbitmq;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;

/**
 * Spring Boot 主启动类
 * @SpringBootApplication 包含三个注解:@Configuration、@EnableAutoConfiguration、@ComponentScan
 * 它会自动扫描当前包及其子包下的所有组件(如 @Service、@Controller、@Component 等)
 */
@SpringBootApplication
public class RabbitmqBusinessApplication {
    public static void main(String[] args) {
        // 启动 Spring Boot 应用
        SpringApplication.run(RabbitmqBusinessApplication.class, args);
    }
}

代码如何运行?

  1. 启动 RabbitMQ(Docker 命令):

    bash

    docker run -d --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:4-management
  2. 运行主启动类 RabbitmqBusinessApplication

  3. 发送 POST 请求(例如使用 Postman):

    • URL: http://localhost:8080/order/create

    • Headers: Content-Type: application/json

    • Body:

5.微服务

项目功能说明

这个项目实现了一个 “用户服务 + 订单服务” 的典型微服务场景。核心业务流程如下:

  1. 用户通过浏览器向用户服务发起创建订单的请求。

  2. 用户服务收到请求,将订单信息封装成消息,通过 RabbitMQ 发送出去。

  3. 订单服务作为独立的微服务,一直在监听特定的消息队列。当收到用户服务的消息后,执行自己的业务逻辑(例如计算运费、扣减库存等)。

这个流程成功实现了两个微服务间的异步通信与解耦。

详细实现步骤

⚠️ 环境准备提示:在开始编码前,请确保已根据历史对话中的方法,使用 Docker 正确安装并启动了 RabbitMQ 服务,且 JDK 和 Maven 版本符合要求。

第一阶段:搭建父工程 (版本管理核心)

1. 创建Maven父工程
新建一个 Maven 项目,选择 pom 打包方式,用于管理所有子模块和统一依赖版本。

2. pom.xml
整个项目的“版本宪法”。它的核心是通过 dependencyManagement 锁定了 Spring Boot 2.7.x 和 Spring Cloud 2021.0.x 的版本组合,确保所有子模块的依赖版本一致,从根源上杜绝版本冲突问题。

来源:实验五 阶段一 3.

xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" ...>
    <modelVersion>4.0.0</modelVersion>
    <packaging>pom</packaging> <!-- 父工程打包方式为pom -->

    <!-- 配置子模块 -->
    <modules>
        <module>eureka-server</module>
        <module>user-service</module>
        <module>order-service</module>
    </modules>

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.7.18</version>
        <relativePath/>
    </parent>

    <groupId>com.example</groupId>
    <artifactId>springcloud-rabbitmq-demo</artifactId>
    <version>1.0-SNAPSHOT</version>

    <properties>
        <java.version>11</java.version>
        <spring-cloud.version>2021.0.8</spring-cloud.version>
    </properties>

    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

    <build>
        <plugins>
            <plugin>
                <groupId>org.springframework.boot</groupId>
                <artifactId>spring-boot-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
</project>
第二阶段:搭建服务注册中心 (Eureka Server)

这是服务发现的枢纽,所有微服务启动后都会向它注册自己的网络地址。

1. 创建子模块: eureka-server。在父工程上右击 New -> Module,选择 Maven 创建。

2. 子模块 pom.xml

来源:实验五 阶段二 1.

xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0" ...>
    <parent>
        <groupId>com.example</groupId>
        <artifactId>springcloud-rabbitmq-demo</artifactId>
        <version>1.0-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>eureka-server</artifactId>

    <dependencies>
        <!-- Eureka Server 服务端依赖 -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-server</artifactId>
        </dependency>
    </dependencies>
</project>

3. 启动类:EurekaServerApplication.java
@EnableEurekaServer 注解是开启服务注册中心功能的开关。

来源:实验五 阶段二 2.

java

package com.example.eureka;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.server.EnableEurekaServer;

@SpringBootApplication
@EnableEurekaServer
public class EurekaServerApplication {
    public static void main(String[] args) {
        SpringApplication.run(EurekaServerApplication.class, args);
    }
}

4. 配置文件:application.yml

来源:实验五 阶段二 3.

yaml

server:
  port: 8761
spring:
  application:
    name: eureka-server
eureka:
  client:
    register-with-eureka: false  # 禁止自己注册自己
    fetch-registry: false        # 禁止拉取服务列表
    service-url:
      defaultZone: http://localhost:8761/eureka/
第三阶段:开发用户服务(消息生产者)

1. 创建子模块: user-service。将服务注册为 Eureka Client,集成 Web 和 RabbitMQ。

2. 子模块 pom.xml

来源:实验五 阶段三 1.

xml

<dependencies>
    <dependency>
        <groupId>org.springframework.cloud</groupId>
        <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
    </dependency>
</dependencies>

3. 启动类:UserServiceApplication.java
@EnableEurekaClient 注解让此服务启动后自动注册到 Eureka Server。

来源:实验五 阶段三 2.

java

package com.example.user;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;

@SpringBootApplication
@EnableEurekaClient
public class UserServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(UserServiceApplication.class, args);
    }
}

4. 配置文件:application.yml

来源:实验五 阶段三 3.

yaml

server:
  port: 8081
spring:
  application:
    name: user-service
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
eureka:
  client:
    service-url:
      defaultZone: http://localhost:8761/eureka/
  instance:
    prefer-ip-address: true

5. 定义消息组件(交换机、队列、绑定):RabbitConfig.java

来源:实验五 阶段三 4.

java

package com.example.user.config;

import org.springframework.amqp.core.*;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitConfig {
    public static final String ORDER_EXCHANGE = "order.exchange";
    public static final String USER_QUEUE = "user.queue";
    public static final String ORDER_ROUTING_KEY = "order.create.#";

    @Bean
    public TopicExchange orderExchange() {
        return new TopicExchange(ORDER_EXCHANGE, true, false);
    }

    @Bean
    public Queue userQueue() {
        return new Queue(USER_QUEUE, true);
    }

    @Bean
    public Binding binding() {
        return BindingBuilder.bind(userQueue())
                .to(orderExchange())
                .with(ORDER_ROUTING_KEY);
    }
}

6. 开发消息生产者:MessageProducer.java
使用 RabbitTemplate 将消息(订单信息)发送到 RabbitMQ。

来源:实验五 阶段三 4.

java

package com.example.user.producer;

import com.example.user.config.RabbitConfig;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class MessageProducer {
    @Autowired
    private RabbitTemplate rabbitTemplate;

    public void sendOrderCreateMessage(String orderId, String userId) {
        String message = "用户 " + userId + " 创建了订单: " + orderId;
        rabbitTemplate.convertAndSend(RabbitConfig.ORDER_EXCHANGE, "order.create", message);
        System.out.println(">>> 生产者 [用户服务] 发送消息: " + message);
    }
}

7. 开发REST接口:UserController.java
提供一个 Web 接口,供外部调用以触发订单创建流程。

来源:实验五 阶段三 5.

java

package com.example.user.controller;

import com.example.user.producer.MessageProducer;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@RestController
public class UserController {
    @Autowired
    private MessageProducer messageProducer;

    @GetMapping("/create-order")
    public String createOrder(@RequestParam String orderId,
                              @RequestParam String userId) {
        messageProducer.sendOrderCreateMessage(orderId, userId);
        return "订单创建成功,消息已发送!";
    }
}
第四阶段:开发订单服务(消息消费者)

1. 创建子模块: order-service。其 pom.xml 与 user-service 的依赖完全相同(Eureka Client + Web + AMQP)。启动类 OrderServiceApplication 需添加 @EnableEurekaClient 注解,application.yml 端口配置为 8082,服务名为 order-service

2. 开发消息消费者:MessageConsumer.java
@RabbitListener 注解是该消费者的核心。它指示 Spring 在后台启动一个监听器,一旦 user.queue 队列中有消息,就会立即触发 handleOrderCreateMessage 方法,实现异步“消费”。

来源:实验五 阶段四 4.

java

package com.example.order.consumer;

import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

@Component
public class MessageConsumer {
    @RabbitListener(queues = "user.queue")
    public void handleOrderCreateMessage(String message) {
        System.out.println(">>> 消费者 [订单服务] 收到消息: " + message);
        // 模拟订单服务自己的业务逻辑
        System.out.println(">>> 订单服务执行业务: 根据订单信息处理库存...");
    }
}

3. 启动类:OrderServiceApplication.java

来源:实验五 阶段四 2.

java

package com.example.order;

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;

@SpringBootApplication
@EnableEurekaClient
public class OrderServiceApplication {
    public static void main(String[] args) {
        SpringApplication.run(OrderServiceApplication.class, args);
    }
}

4. 配置文件:application.yml

来源:实验五 阶段四 3.

yaml

server:
  port: 8082
spring:
  application:
    name: order-service
  rabbitmq:
    host: localhost
    port: 5672
    username: guest
    password: guest
eureka:
  client:
    service-url:
      defaultZone: http://localhost:8761/eureka/
  instance:
    prefer-ip-address: true

验证与运行

  1. 严格按顺序启动:因为服务之间有依赖关系,必须确保先启动作为“地基”的注册中心,再启动其他业务服务。

    • ① 启动 EurekaServerApplication

    • ② 启动 UserServiceApplication

    • ③ 启动 OrderServiceApplication

  2. 查看注册中心:访问 http://localhost:8761,确认 USER-SERVICE 和 ORDER-SERVICE 均已成功注册。

  3. 触发业务流程:通过浏览器访问 http://localhost:8081/create-order?orderId=ORD001&userId=USER1001,将触发用户服务发送消息。

  4. 观察控制台

    • 用户服务控制台 会打印: >>> 生产者 [用户服务] 发送消息: ...

    • 订单服务控制台 会打印: >>> 消费者 [订单服务] 收到消息: ...
      这证明两个微服务已经通过消息队列完成了异步通信。

扩展:Spring Cloud Stream 实现方式

除了直接使用 Spring AMQP,官方还推荐使用更高层的 Spring Cloud Stream 框架。它可以让你使用统一API操作 RabbitMQ、Kafka 等多种消息中间件,实现底层技术的灵活切换。

下面用 Spring Cloud Stream 重写关键部分:

1. 在 user-service 和 order-service 的 pom.xml 中添加依赖

来源:实验六 5.1.1

xml

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2. 定义消息通道接口
通过 @Output 和 @Input 注解定义通道,将业务与具体中间件解耦。

来源:实验六 5.2.1 和 5.2.2

java

// 用户服务:作为 Source(消息源)
public interface UserSource {
    String USER_ORDER_OUTPUT = "user-order-output";
    @Output(USER_ORDER_OUTPUT)
    MessageChannel userOrderOutput();
}

// 订单服务:作为 Sink(消息接收器)
public interface OrderSink {
    String ORDER_INPUT = "order-input";
    @Input(ORDER_INPUT)
    SubscribableChannel orderInput();
}

3. 用户服务-生产者

来源:实验六 5.2.1

java

@Service
@EnableBinding(UserSource.class)
public class UserService {
    @Autowired
    private UserSource userSource;

    public void notifyOrderService(String orderMsg) {
        userSource.userOrderOutput()
            .send(MessageBuilder.withPayload(orderMsg).build());
    }
}

4. 订单服务-消费者

来源:实验六 5.2.2

java

@Service
@EnableBinding(OrderSink.class)
public class OrderService {
    @StreamListener(OrderSink.ORDER_INPUT)
    public void receiveOrderMessage(String msg) {
        System.out.println("Received: " + msg);
    }
}

5. 动态绑定配置 (bootstrap.yml)
使用配置文件而非 Java 代码来描述消息的路由关系。

来源:实验六 5.1.2

yaml

spring:
  cloud:
    stream:
      bindings:
        user-order-output:          # 对应 UserSource 中的输出通道
          destination: order.exchange
          content-type: application/json
        order-input:                # 对应 OrderSink 中的输入通道
          destination: order.exchange
          group: order-group
      rabbit:
        bindings:
          user-order-output:
            producer:
              routing-key-expression: "'order.create.rk'"

思考题(实验考核重点)

老师可能会要求结合实验进行思考。针对微服务架构,通常需要思考以下核心问题:

  1. 为什么要用消息队列?
    主要是为了实现解耦(用户服务无需知道订单服务的IP)、异步(创建订单的请求立即返回,无需等待订单服务处理完成)和削峰(应对高并发订单请求)。

  2. Eureka 的自我保护模式是什么?
    它是Eureka的一种容错机制。当短时间内丢失大量服务心跳时,Eureka会进入保护模式,不再注销任何服务实例,宁可保留可能已挂掉的服务,也要避免大规模误销毁引发的雪崩。生产环境应开启它【12†L34-L34】。

  3. 如何选择交换机类型?

    • Direct:适用于“点对点”的精准消息传递,例如发送 error 级别的日志给专门的错误处理服务【12†L18-L19】。

    • Topic:适用于需要按“主题模式”模糊匹配的场景,比如此处,可以匹配 order.createorder.update 等多个路由键【12†L19-L20】。

    • Fanout:适用于“广播”场景,当需要将一条订单消息通知给积分、物流、统计等多个下游服务时最合适【12†L20-L21】。

更多推荐