前言:

我们知道 RabbitMQ 保证消息一定会发送成功,一般有两种方式,分别是 Publisher Confirm 机制和事务消息,上一篇我们分享了如果使用 RabbitMQ Publisher Confirm 机制来保证消息一定发送成功,本篇我们来分享使用 RabbitMQ 事务消息保证消息一定会发送成功。

RabbitMQ 系列文章传送门

RabbitMQ 的介绍及核心概念讲解

@RabbitListener 注解详解

Spring Boot 整合 RabbitMQ 详解

RabbitMQ 如何保证消息一定会发送成功?【RabbitMQ Publisher Confirm 机制】

RabbitMQ 的事务消息是什么?

RabbitMQ 的事务消息机制是将生产者发消息的操作打包成一个原子单元,要么全部成功,要么全部失败,事务可以确保消息的完整性,但是要慎重使用,因为事务消息对 RabbitMQ 的性能有一定影响。

RabbitMQ 的事务消息有点类似 Spring 的事务,分为开始事务、提交事务、回滚事务。

  • txSelect():开始事务,使用 txSelect() 开启事务。
  • txCommit():提交事务,如果 txCommit() 提交事务成功了,则消息一定会发送到 RabbitMQ。
  • txRollback():回滚事务,如果在执行 txCommit() 之前 RabbitMQ 发生了异常,txRollback() 会捕获异常进行回滚。

RabbitMQ 发送事务消息首先需要我们使用 txSelect开启事务,然后就可以发布消息给 RabbitMQ 了,如果 txCommit 提交成功了,则消息一定发送到了 RabbitMQ,如果在 txCommit 执行之前 RabbitMQ 发生了任何异常,我们就捕获这个异常然后执行 txRollback 进行回滚操作,整个过程跟 Spring 的事务机制没太大的区别,因此,我们可以通过 RabbitMQ 事务机制保证消息一定可以发送成功。

了解了 RabbitMQ 的事务消息机制,接下来我们就分享两种方式来实现 RabbitMQ 事务消息。

RabbitMQ 事务消息实战

在进行 RabbitMQ 事务消息实战之前先给出一个结论,RabbitMQ Publisher Confirm 机制和事务消息不能同时使用,因此我们在进行事务消息实战时候需要修改如下配置:

#spring.rabbitmq.publisher-confirm-type=correlated

如果不注释掉该配置,项目启动会报错。

Spring Boot 集成 RabbitMQ

在 proerties 或者 yml 文件中添加 RabbitMQ 配置如下:

spring.rabbitmq.host= xxx.xxx.xxx
spring.rabbitmq.port= 5672
spring.rabbitmq.username= admin
spring.rabbitmq.password= admin
spring.rabbitmq.virtual-host = /study

项目 pom.xml 文件中引入 spring-boot-starter-amqp 依赖如下:

<dependency>
	<groupId>org.springframework.boot</groupId>
	<artifactId>spring-boot-starter-amqp</artifactId>
	<version>2.4.5</version>
</dependency>

配置交换机 Exchange、路由 RoutingKey、队列 Queue

RabbitTransactionConfig 不仅绑定了交换机 Exchange、路由 RoutingKey、队列 Queue,还注入了 RabbitMQ 事务管理器 rabbitTransactionManager 和 transactionRabbitTemplate,具体代码如下:

import org.springframework.amqp.core.Binding;
import org.springframework.amqp.core.BindingBuilder;
import org.springframework.amqp.core.DirectExchange;
import org.springframework.amqp.core.Queue;
import org.springframework.amqp.rabbit.connection.ConnectionFactory;
import org.springframework.amqp.rabbit.core.RabbitTemplate;
import org.springframework.amqp.rabbit.transaction.RabbitTransactionManager;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

@Configuration
public class RabbitTransactionConfig {

    //注入队列
    @Bean("transactionQueue")
    public Queue queue() {
        return new Queue("direct-transaction-queue");
    }

    //注入交换机
    @Bean("directTransactionExchange")
    public DirectExchange directTransactionExchange() {
        //durable:重启后是否有效 autodelete: 长期未使用是否删除掉
        return new DirectExchange("direct-transaction-exchange", true, true);
    }

    //绑定队列和交换机
    @Bean("directTransactionBinding")
    public Binding binding() {
        return BindingBuilder.bind(queue()).to(directTransactionExchange()).with("direct-transaction-exchange-routing-key");
    }

    //配置 RabbitMQ 事务管理器
    @Bean("rabbitTransactionManager")
    public RabbitTransactionManager rabbitTransactionManager(ConnectionFactory connectionFactory) {
        return new RabbitTransactionManager(connectionFactory);
    }

    //配置 RabbitMQ transactionRabbitTemplate
    @Bean("transactionRabbitTemplate")
    public RabbitTemplate transactionRabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }
    
}

使用 RabbitTemplate 事务消息生产者

这里的 transactionRabbitTemplate 我们在上面的 RabbitTransactionConfig 配置中进行了配置,因此可以直接使用,具体的事务消息发送者者代码如下:

import com.user.service.rabbitmq.producer.*;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestParam;
import org.springframework.web.bind.annotation.RestController;

@Slf4j
@Component
public class RabbitTransactionProducer {

    @Autowired
    private RabbitTemplate transactionRabbitTemplate;

    //@PostConstruct注解的作用是在对象创建后执行一些需要在对象注入完毕后进行的操作
    @PostConstruct
    public void init() {
        transactionRabbitTemplate.setChannelTransacted(true);
    }

    @Transactional(rollbackFor = Exception.class, transactionManager = "rabbitTransactionManager")
    public void sendTransactionaMessage(String message) {
        //发送消息
        transactionRabbitTemplate.convertAndSend("direct-transaction-exchange", "direct-transaction-exchange-routing-key", message + "1");
        transactionRabbitTemplate.convertAndSend("direct-transaction-exchange", "direct-transaction-exchange-routing-key", message + "2");
        //发送第三条消息之前模拟一个错误 我们看下前两条消息否回滚了
        int a = 1 / 0;
        transactionRabbitTemplate.convertAndSend("direct-transaction-exchange", "direct-transaction-exchange-routing-key", message + "3");
    }

}

我们发送三条消息,在二三条消息之间模拟一个业务异常,看下是否前两条消息都完成了回滚,验证效果如下:

在这里插入图片描述

RabbitMQ 控制台结果如下:

在这里插入图片描述

可以看到 direct-transaction-queue 这个队列上一条消息也没有,结果符合预期。

去掉模拟的业务异常,再次进行消息发送,RabbitMQ 控制台结果如下:

在这里插入图片描述

可以看到 direct-transaction-queue 队列上有 3条消息,综合上面有无异常情况的验证,可以证明事务消息保证了三条消息要么一起成功要么一起失败,保证原子性。

使用 Channel 发送事务消息生产者

封装 RabbitMqUtil 来获取 Channel,代码如下:

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class RabbitMqUtil {

    /**
     * @return com.rabbitmq.client.Channel
     * @date 2024/9/6 10:43
     * @description 获取 RabbitMQ Channel
     */
    public static Channel getChannel() {
        // 创建一个连接工厂,并设置MQ的相关信息
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("xxxxxx");
        factory.setUsername("xxx");
        factory.setPassword("xxx");
        factory.setVirtualHost("/xxx");
        Channel channel = null;
        try {
            // 创建连接
            Connection connection = factory.newConnection();
            // 获取信道
            channel = connection.createChannel();
        } catch (Exception e) {
            log.error("创建 RabbitMQ Channel 失败", e);
            e.printStackTrace();
        }
        return channel;
    }
}

上面我们演示了使用 RabbitTemplate 发送事务消息,这里我们演示使用 Channel 来发送事务消息,代码如下:


import com.rabbitmq.client.Channel;
import com.user.service.util.RabbitMqUtil;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Component;
import org.springframework.transaction.annotation.Transactional;

import java.io.IOException;
import java.nio.charset.StandardCharsets;


@Slf4j
@Component
public class RabbitTransactionChannelProducer {


    @Transactional(rollbackFor = Exception.class, transactionManager = "rabbitTransactionManager")
    public void sendTransactionaChannelMessage(String message) {
        //获取 Channel
        Channel channel = RabbitMqUtil.getChannel();
        try {
            //开启事务
            channel.txSelect();
            //发送消息
            channel.basicPublish("direct-transaction-exchange", "direct-transaction-exchange-routing-key", null, (message + "1").getBytes(StandardCharsets.UTF_8));
            channel.basicPublish("direct-transaction-exchange", "direct-transaction-exchange-routing-key", null, (message + "2").getBytes(StandardCharsets.UTF_8));
            //发送第三条消息之前模拟一个错误 我们看下前两条消息否回滚了
            //int a = 1 / 0;
            channel.basicPublish("direct-transaction-exchange", "direct-transaction-exchange-routing-key", null, (message + "3").getBytes(StandardCharsets.UTF_8));
            //提交事务
            channel.txCommit();
        } catch (Exception e) {
            //回滚事务
            try {
                channel.txRollback();
            } catch (IOException ex) {
                log.error("txRollback error", e);
                ex.printStackTrace();
            }
            e.printStackTrace();
        } finally {
            try {
                channel.close();
            } catch (Exception e) {
                log.error("channel close error", e);
                e.printStackTrace();
            }
        }
    }

}

这里我不在截图演示消息发送结果了,贴上代码,有兴趣的自己去进行验证。

事务消息消费者

前面演示了事务消息的生产者,接下来把事务消息的消费者代码也一并贴出来,消费者的代码基本大同小异,根据自己的业务需求来写,我的演示代码如下:

import com.rabbitmq.client.Channel;
import lombok.extern.slf4j.Slf4j;
import org.springframework.amqp.core.Message;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.stereotype.Component;

import java.io.IOException;


@Slf4j
@Component
public class RabbitTransactionaConsumer {


    //direct transaction 模式消费端
    @RabbitListener(queues = "direct-transaction-queue")
    public void directConfirmMessageConsumer(String messageData, Channel channel, Message message) {
        long deliveryTag = message.getMessageProperties().getDeliveryTag();
        try {
            log.info("Transaction 事务消息消费开始,消息内容:{},deliveryTag:{}", messageData, deliveryTag);
            if ("transactiona message2".equals(messageData)) {
                //模拟业务异常 验证 NACK
                int a = 1 / 0;
            }
            //模拟业务
            channel.basicAck(deliveryTag, true);
        } catch (Exception e) {
            //NACK multiple:是否批量处理  true:将一次性ack所有小于 deliveryTag 的消息 false 就不 ACK requeue:为true时, 重新入队
            try {
                channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, true);
            } catch (IOException ex) {
                ex.printStackTrace();
            }
            e.printStackTrace();
        }
    }
}

这里借用事务消息的消费来演示一下消费端的消息 ACK ,验证结果如下:,,,,

在这里插入图片描述

最后附上触发事务消息发送的代码,如下:

@Autowired
private RabbitTransactionProducer rabbitTransactionProducer;

@GetMapping("/send-transactiona-message")
private String sendTransactionaMessage(@RequestParam String message) {
	rabbitTransactionProducer.sendTransactionaMessage(message);
	return "OK";
}

@Autowired
private RabbitTransactionChannelProducer transactionChannelProducer;

@GetMapping("/send-transactiona-channel-message")
private String sendTransactionaChannelMessage(@RequestParam String message) {
	transactionChannelProducer.sendTransactionaChannelMessage(message);
	return "OK";
}

总结:本篇分享了两种使用 RabbitMQ 发送事务消息的代码,希望可以帮助到有需要的小伙伴。

如有不正确的地方欢迎各位指出纠正。

Logo

欢迎加入西安开发者社区!我们致力于为西安地区的开发者提供学习、合作和成长的机会。参与我们的活动,与专家分享最新技术趋势,解决挑战,探索创新。加入我们,共同打造技术社区!

更多推荐