RocketMQ 分布式事务一致性详解(含 Java 完整代码)
·
目录
RocketMQ 是业界主流的分布式事务解决方案,基于可靠消息 + 事务消息实现最终一致性(分布式事务首选方案,性能远优于 2PC/3PC)。
一、核心原理:RocketMQ 事务消息机制
RocketMQ 分布式事务 = 半消息 + 事务状态回查 + 确认 / 回滚它解决的核心问题:本地事务执行成功 → 消息必须投递成功;本地事务失败 → 消息绝不投递,保证分布式系统数据一致。
1. 执行流程(4 步,必懂)
- 生产者发送 半消息(Half Message)消息先发送到 MQ,但对消费者不可见,MQ 只确认消息已存储。
- 生产者执行本地事务比如:扣减余额、创建订单。
- 生产者向 MQ 提交事务状态
- 本地事务成功 → 发送Commit,半消息变为可消费消息
- 本地事务失败 → 发送Rollback,MQ 删除半消息
- MQ 事务状态回查(兜底机制)如果第 3 步网络中断,MQ 会主动回调生产者,查询本地事务最终状态,保证数据不丢不乱。
2. 一致性保证
- 最终一致性:本地事务成功 → 消息一定被消费
- 无数据丢失:回查机制解决网络超时 / 宕机问题
- 无重复消费:消费者做幂等控制即可
二、环境准备
- 启动 RocketMQ 4.9+ / 5.x 服务
- Maven 依赖(SpringBoot + RocketMQ)
<!-- RocketMQ 客户端 -->
<dependency>
<groupId>org.apache.rocketmq</groupId>
<artifactId>rocketmq-client</artifactId>
<version>4.9.7</version>
</dependency>
三、Java 完整代码实现
我们模拟订单创建 + 积分增加的分布式事务场景:
- 订单服务(事务生产者):创建订单(本地事务)
- 积分服务(消费者):收到消息后增加积分
1. 常量配置
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
public class RocketMQConstant {
// NameServer 地址
public static final String NAME_SERVER = "127.0.0.1:9876";
// 事务消息 Topic
public static final String TRANSACTION_TOPIC = "Topic_Transaction_Order";
// 生产者组(事务消息必须指定组)
public static final String PRODUCER_GROUP = "Tx_Producer_Group";
// 消费者组
public static final String CONSUMER_GROUP = "Tx_Consumer_Group";
}
2. 事务生产者(核心:分布式事务发起方)
必须使用 TransactionMQProducer,并实现事务监听器。
(1)自定义事务监听器(核心逻辑)
import org.apache.rocketmq.client.producer.LocalTransactionState;
import org.apache.rocketmq.client.producer.TransactionListener;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.concurrent.ConcurrentHashMap;
/**
* 事务监听器:实现本地事务执行 + 事务回查
*/
public class OrderTransactionListener implements TransactionListener {
// 模拟数据库存储事务状态(生产环境用DB)
private static final ConcurrentHashMap<String, Boolean> TRANSACTION_DB = new ConcurrentHashMap<>();
/**
* 第一步:执行本地事务(创建订单)
*/
@Override
public LocalTransactionState executeLocalTransaction(Message msg, Object arg) {
String orderId = msg.getKeys();
try {
System.out.println("===== 开始执行本地事务:创建订单,订单ID:" + orderId);
// 模拟本地事务:创建订单、扣减库存等
createOrder(orderId);
// 本地事务执行成功
TRANSACTION_DB.put(orderId, true);
System.out.println("本地事务执行成功,订单ID:" + orderId);
// 返回 COMMIT,消息对消费者可见
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
System.out.println("本地事务执行失败,订单ID:" + orderId);
TRANSACTION_DB.put(orderId, false);
// 返回 ROLLBACK,删除半消息
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
/**
* 第二步:MQ 回调事务回查(兜底机制)
* 当executeLocalTransaction未返回结果时,RocketMQ主动调用查询
*/
@Override
public LocalTransactionState checkLocalTransaction(MessageExt msg) {
String orderId = msg.getKeys();
System.out.println("===== RocketMQ 主动回查事务状态,订单ID:" + orderId);
// 查询本地事务状态
Boolean isSuccess = TRANSACTION_DB.get(orderId);
if (isSuccess != null && isSuccess) {
return LocalTransactionState.COMMIT_MESSAGE;
} else {
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
// 模拟创建订单
private void createOrder(String orderId) {
// 真实环境:执行数据库操作
System.out.println("数据库:插入订单记录,订单ID:" + orderId);
}
}
(2)事务生产者启动类
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.TransactionMQProducer;
import org.apache.rocketmq.common.message.Message;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;
/**
* 事务消息生产者(订单服务)
*/
public class TransactionProducer {
public static void main(String[] args) throws MQClientException, InterruptedException {
// 1. 创建事务生产者
TransactionMQProducer producer = new TransactionMQProducer(RocketMQConstant.PRODUCER_GROUP);
producer.setNamesrvAddr(RocketMQConstant.NAME_SERVER);
// 2. 创建线程池(处理事务回调)
ExecutorService executorService = new ThreadPoolExecutor(
2, 5, 100, TimeUnit.SECONDS, new ArrayBlockingQueue<>(2000),
Executors.defaultThreadFactory(), new ThreadPoolExecutor.AbortPolicy()
);
// 3. 设置事务监听器 + 线程池
producer.setTransactionListener(new OrderTransactionListener());
producer.setExecutorService(executorService);
// 4. 启动生产者
producer.start();
System.out.println("===== 事务生产者启动成功 =====");
try {
// 5. 构建事务消息(必须设置唯一KEY,用于事务回查)
String orderId = "ORDER_" + System.currentTimeMillis();
Message message = new Message(
RocketMQConstant.TRANSACTION_TOPIC,
"Tag_Order",
orderId, // 唯一KEY,非常重要
("订单数据:" + orderId).getBytes(StandardCharsets.UTF_8)
);
// 6. 发送事务消息(半消息)
producer.sendMessageInTransaction(message, null);
System.out.println("===== 半消息发送成功 =====");
} catch (Exception e) {
e.printStackTrace();
}
// 保持程序运行
Thread.sleep(100000);
producer.shutdown();
}
}
3. 事务消费者(积分服务)
import org.apache.rocketmq.client.consumer.DefaultMQPushConsumer;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyContext;
import org.apache.rocketmq.client.consumer.listener.ConsumeConcurrentlyStatus;
import org.apache.rocketmq.client.consumer.listener.MessageListenerConcurrently;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.message.MessageExt;
import java.util.List;
/**
* 事务消息消费者(积分服务)
* 功能:收到订单消息 → 增加用户积分
*/
public class TransactionConsumer {
public static void main(String[] args) throws MQClientException {
// 1. 创建消费者
DefaultMQPushConsumer consumer = new DefaultMQPushConsumer(RocketMQConstant.CONSUMER_GROUP);
consumer.setNamesrvAddr(RocketMQConstant.NAME_SERVER);
// 2. 订阅Topic
consumer.subscribe(RocketMQConstant.TRANSACTION_TOPIC, "*");
// 3. 注册消息监听器
consumer.registerMessageListener(new MessageListenerConcurrently() {
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
for (MessageExt msg : msgs) {
try {
String orderId = msg.getKeys();
String body = new String(msg.getBody());
System.out.println("===== 积分服务消费消息 =====");
System.out.println("订单ID:" + orderId);
System.out.println("消息内容:" + body);
// 模拟增加积分
addUserPoints(orderId);
System.out.println("积分增加成功!\n");
} catch (Exception e) {
e.printStackTrace();
// 消费失败,返回RECONSUME_LATER,MQ会重试
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
// 消费成功
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
});
// 4. 启动消费者
consumer.start();
System.out.println("===== 事务消费者启动成功,等待消息 =====");
}
// 模拟增加积分
private static void addUserPoints(String orderId) {
System.out.println("数据库:用户积分 +100,关联订单:" + orderId);
}
}
四、运行测试步骤
- 启动 RocketMQ
# 启动NameServer mqnamesrv.cmd # 启动Broker mqbroker.cmd -n 127.0.0.1:9876 autoCreateTopicEnable=true - 启动消费者(积分服务)
- 启动生产者(订单服务)
五、运行日志(清晰看到事务流程)
生产者日志
===== 事务生产者启动成功 =====
===== 半消息发送成功 =====
===== 开始执行本地事务:创建订单,订单ID:ORDER_1735640000000
数据库:插入订单记录,订单ID:ORDER_1735640000000
本地事务执行成功,订单ID:ORDER_1735640000000
消费者日志
===== 事务消费者启动成功,等待消息 =====
===== 积分服务消费消息 =====
订单ID:ORDER_1735640000000
消息内容:订单数据:ORDER_1735640000000
数据库:用户积分 +100,关联订单:ORDER_1735640000000
积分增加成功!
异常场景(事务回查)
如果生产者发送半消息后宕机 / 断网,RocketMQ 会自动触发事务回查:
===== RocketMQ 主动回查事务状态,订单ID:ORDER_1735640000000
六、关键知识点总结
1. 为什么能保证分布式事务一致性?
- 半消息机制:先存消息,再执行事务,避免 “事务成功消息丢失”
- 事务回查:解决网络超时、服务宕机导致的状态未知问题
- 最终一致性:只要本地事务成功,消息一定会被投递消费
2. 生产环境必须注意
- 消息必须带唯一 KEY(订单 ID / 业务 ID),用于事务回查
- 本地事务状态必须持久化(数据库,不能用内存)
- 消费者做幂等控制(避免重复增加积分)
- 消费失败返回 RECONSUME_LATER,让 MQ 重试
3. 适用场景
- 订单 + 积分
- 订单 + 物流
- 支付 + 通知
- 所有不需要强一致性、追求高性能的分布式业务
总结
- RocketMQ 分布式事务基于事务消息实现最终一致性
- 核心流程:发半消息 → 执行本地事务 → 提交 / 回滚 → 事务回查
- Java 代码核心:
TransactionMQProducer+ 自定义TransactionListener - 生产环境可直接套用这套代码,稳定可靠
更多推荐

所有评论(0)