目录

一、核心原理:RocketMQ 事务消息机制

1. 执行流程(4 步,必懂)

2. 一致性保证

二、环境准备

三、Java 完整代码实现

1. 常量配置

2. 事务生产者(核心:分布式事务发起方)

(1)自定义事务监听器(核心逻辑)

(2)事务生产者启动类

3. 事务消费者(积分服务)

四、运行测试步骤

五、运行日志(清晰看到事务流程)

生产者日志

消费者日志

异常场景(事务回查)

六、关键知识点总结

1. 为什么能保证分布式事务一致性?

2. 生产环境必须注意

3. 适用场景

总结


RocketMQ 是业界主流的分布式事务解决方案,基于可靠消息 + 事务消息实现最终一致性(分布式事务首选方案,性能远优于 2PC/3PC)。


一、核心原理:RocketMQ 事务消息机制

RocketMQ 分布式事务 = 半消息 + 事务状态回查 + 确认 / 回滚它解决的核心问题:本地事务执行成功 → 消息必须投递成功;本地事务失败 → 消息绝不投递,保证分布式系统数据一致。

1. 执行流程(4 步,必懂)

  1. 生产者发送 半消息(Half Message)消息先发送到 MQ,但对消费者不可见,MQ 只确认消息已存储。
  2. 生产者执行本地事务比如:扣减余额、创建订单。
  3. 生产者向 MQ 提交事务状态
    • 本地事务成功 → 发送Commit,半消息变为可消费消息
    • 本地事务失败 → 发送Rollback,MQ 删除半消息
  4. MQ 事务状态回查(兜底机制)如果第 3 步网络中断,MQ 会主动回调生产者,查询本地事务最终状态,保证数据不丢不乱。

2. 一致性保证

  • 最终一致性:本地事务成功 → 消息一定被消费
  • 无数据丢失:回查机制解决网络超时 / 宕机问题
  • 无重复消费:消费者做幂等控制即可

二、环境准备

  1. 启动 RocketMQ 4.9+ / 5.x 服务
  2. Maven 依赖(SpringBoot + RocketMQ)
<!-- RocketMQ 客户端 -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.7</version>
</dependency>

三、Java 完整代码实现

我们模拟订单创建 + 积分增加的分布式事务场景:

  1. 订单服务(事务生产者):创建订单(本地事务)
  2. 积分服务(消费者):收到消息后增加积分

1. 常量配置

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.TransactionMQProducer;

public class RocketMQConstant {
    // NameServer 地址
    public static final String NAME_SERVER = "127.0.0.1:9876";
    // 事务消息 Topic
    public static final String TRANSACTION_TOPIC = "Topic_Transaction_Order";
    // 生产者组(事务消息必须指定组)
    public static final String PRODUCER_GROUP = "Tx_Producer_Group";
    // 消费者组
    public static final String CONSUMER_GROUP = "Tx_Consumer_Group";
}

2. 事务生产者(核心:分布式事务发起方)

必须使用 TransactionMQProducer,并实现事务监听器

(1)自定义事务监听器(核心逻辑)
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.concurrent.ConcurrentHashMap;

/**
 * 事务监听器:实现本地事务执行 + 事务回查
 */
public class OrderTransactionListener implements TransactionListener {

    // 模拟数据库存储事务状态(生产环境用DB)
    private static final ConcurrentHashMap<String, Boolean> TRANSACTION_DB = new ConcurrentHashMap<>();

    /**
     * 第一步:执行本地事务(创建订单)
     */
    @Override
    public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
        String orderId = msg.getKeys();
        try {
            System.out.println("===== 开始执行本地事务:创建订单,订单ID:" + orderId);
            
            // 模拟本地事务:创建订单、扣减库存等
            createOrder(orderId);
            
            // 本地事务执行成功
            TRANSACTION_DB.put(orderId, true);
            System.out.println("本地事务执行成功,订单ID:" + orderId);
            
            // 返回 COMMIT,消息对消费者可见
            return LocalTransactionState.COMMIT_MESSAGE;
        } catch (Exception e) {
            System.out.println("本地事务执行失败,订单ID:" + orderId);
            TRANSACTION_DB.put(orderId, false);
            
            // 返回 ROLLBACK,删除半消息
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    /**
     * 第二步:MQ 回调事务回查(兜底机制)
     * 当executeLocalTransaction未返回结果时,RocketMQ主动调用查询
     */
    @Override
    public LocalTransactionState checkLocalTransaction(MessageExt msg) {
        String orderId = msg.getKeys();
        System.out.println("===== RocketMQ 主动回查事务状态,订单ID:" + orderId);

        // 查询本地事务状态
        Boolean isSuccess = TRANSACTION_DB.get(orderId);
        if (isSuccess != null && isSuccess) {
            return LocalTransactionState.COMMIT_MESSAGE;
        } else {
            return LocalTransactionState.ROLLBACK_MESSAGE;
        }
    }

    // 模拟创建订单
    private void createOrder(String orderId) {
        // 真实环境:执行数据库操作
        System.out.println("数据库:插入订单记录,订单ID:" + orderId);
    }
}
(2)事务生产者启动类
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;

import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;

/**
 * 事务消息生产者(订单服务)
 */
public class TransactionProducer {
    public static void main(String[] args) throws MQClientException, InterruptedException {
        // 1. 创建事务生产者
        TransactionMQProducer producer = new TransactionMQProducer(RocketMQConstant.PRODUCER_GROUP);
        producer.setNamesrvAddr(RocketMQConstant.NAME_SERVER);

        // 2. 创建线程池(处理事务回调)
        ExecutorService executorService = new ThreadPoolExecutor(
                2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000),
                Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()
        );

        // 3. 设置事务监听器 + 线程池
        producer.setTransactionListener(new OrderTransactionListener());
        producer.setExecutorService(executorService);

        // 4. 启动生产者
        producer.start();
        System.out.println("===== 事务生产者启动成功 =====");

        try {
            // 5. 构建事务消息(必须设置唯一KEY,用于事务回查)
            String orderId = "ORDER_" + System.currentTimeMillis();
            Message message = new Message(
                    RocketMQConstant.TRANSACTION_TOPIC,
                    "Tag_Order",
                    orderId,  // 唯一KEY,非常重要
                    ("订单数据:" + orderId).getBytes(StandardCharsets.UTF_8)
            );

            // 6. 发送事务消息(半消息)
            producer.sendMessageInTransaction(message, null);
            System.out.println("===== 半消息发送成功 =====");

        } catch (Exception e) {
            e.printStackTrace();
        }

        // 保持程序运行
        Thread.sleep(100000);
        producer.shutdown();
    }
}

3. 事务消费者(积分服务)

import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;

import java.util.List;

/**
 * 事务消息消费者(积分服务)
 * 功能:收到订单消息 → 增加用户积分
 */
public class TransactionConsumer {
    public static void main(String[] args) throws MQClientException {
        // 1. 创建消费者
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMQConstant.CONSUMER_GROUP);
        consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER);

        // 2. 订阅Topic
        consumer.subscribe(RocketMQConstant.TRANSACTION_TOPIC, "*");

        // 3. 注册消息监听器
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    try {
                        String orderId = msg.getKeys();
                        String body = new String(msg.getBody());
                        System.out.println("===== 积分服务消费消息 =====");
                        System.out.println("订单ID:" + orderId);
                        System.out.println("消息内容:" + body);

                        // 模拟增加积分
                        addUserPoints(orderId);
                        
                        System.out.println("积分增加成功!\n");
                    } catch (Exception e) {
                        e.printStackTrace();
                        // 消费失败,返回RECONSUME_LATER,MQ会重试
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
                // 消费成功
                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 4. 启动消费者
        consumer.start();
        System.out.println("===== 事务消费者启动成功,等待消息 =====");
    }

    // 模拟增加积分
    private static void addUserPoints(String orderId) {
        System.out.println("数据库:用户积分 +100,关联订单:" + orderId);
    }
}

四、运行测试步骤

  1. 启动 RocketMQ
    # 启动NameServer
    mqnamesrv.cmd
    # 启动Broker
    mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true
    
  2. 启动消费者(积分服务)
  3. 启动生产者(订单服务)

五、运行日志(清晰看到事务流程)

生产者日志

===== 事务生产者启动成功 =====
===== 半消息发送成功 =====
===== 开始执行本地事务:创建订单,订单ID:ORDER_1735640000000
数据库:插入订单记录,订单ID:ORDER_1735640000000
本地事务执行成功,订单ID:ORDER_1735640000000

消费者日志

===== 事务消费者启动成功,等待消息 =====
===== 积分服务消费消息 =====
订单ID:ORDER_1735640000000
消息内容:订单数据:ORDER_1735640000000
数据库:用户积分 +100,关联订单:ORDER_1735640000000
积分增加成功!

异常场景(事务回查)

如果生产者发送半消息后宕机 / 断网,RocketMQ 会自动触发事务回查

===== RocketMQ 主动回查事务状态,订单ID:ORDER_1735640000000

六、关键知识点总结

1. 为什么能保证分布式事务一致性?

  • 半消息机制:先存消息,再执行事务,避免 “事务成功消息丢失”
  • 事务回查:解决网络超时、服务宕机导致的状态未知问题
  • 最终一致性:只要本地事务成功,消息一定会被投递消费

2. 生产环境必须注意

  1. 消息必须带唯一 KEY(订单 ID / 业务 ID),用于事务回查
  2. 本地事务状态必须持久化(数据库,不能用内存)
  3. 消费者做幂等控制(避免重复增加积分)
  4. 消费失败返回 RECONSUME_LATER,让 MQ 重试

3. 适用场景

  • 订单 + 积分
  • 订单 + 物流
  • 支付 + 通知
  • 所有不需要强一致性、追求高性能的分布式业务

总结

  1. RocketMQ 分布式事务基于事务消息实现最终一致性
  2. 核心流程:发半消息 → 执行本地事务 → 提交 / 回滚 → 事务回查
  3. Java 代码核心:TransactionMQProducer + 自定义TransactionListener
  4. 生产环境可直接套用这套代码,稳定可靠

更多推荐