基于Dubbo&RocketMQ实现SOA与分布式事务
基于Dubbo&RocketMQ实现SOA与分布式事务项目介绍项目定位首先本项目是基于前后端分离的架构,后端仅提供RESTful接口,前端使用的是Vue.js。本项目的单机版本见 Github同时有对应的前端项目,因为主力在后端,所以可能质量一般,见 Github本篇...
项目介绍
项目定位
- 首先本项目是基于前后端分离的架构,后端仅提供RESTful接口,前端使用的是Vue.js。
本项目的单机版本见 Github
同时有对应的前端项目,因为主力在后端,所以可能质量一般,见 Github - 本篇文章介绍的是经过SOA服务化拆分后的版本,基本功能不变,按业务模块进行了拆分。
之前我也写过介绍有关SOA服务化拆分的项目示例的文章 SOA服务化拆分,但是没有实现数据库的拆分,这也会牵扯到一个分布式事务的问题,它们是分布式环境下不可避免的问题,前一篇文章回避了这一问题,而本篇文章将介绍数据库的分库和基于RocketMQ实现分布式事务的关键逻辑。 - 本项目的Github 传送门 。欢迎各位的star和fork,这也是驱使我分享更多技术文章的动力。
功能
用户模块
- 获取图片验证码
- 登录:解决重复登录问题
- 注册
- 分页查询用户信息
- 修改用户信息
- 重置密码
站内信模块
- 一对一发送站内信
- 管理员广播
- 读取站内信(未读和已读)
- 一对多发送站内信
- 分页查询站内信
邮件模块
- 单独发送邮件
- 群发邮件
- Thymeleaf邮件模板
产品模块
- 获取所有产品类别
- 分页获取某一类别的所有产品
- 获取某一产品的详细信息
- 添加产品类型
- 添加某一产品
订单模块
- 下单,购买某一产品
- 浏览历史订单
- 取消订单
新闻模块
- 读取最新新闻
- 添加新闻
支付模块
- 用户充值
- 订单付款
Cos云存储(文件上传下载)
- 准备:获取appId、secretId、secretKey以及在官网上设置CORS
- JS部分:进行文件上传/下载
- Java部分:搭建鉴权服务器,提供token
只需要JS引入cos的包,Java不需要
涉及技术
- SpringBoot+多环境配置(dev,proc,test)
- Dubbo
- SpringMVC
- Spring
- MyBaits
- MyBatis Generator
- MyBatis PageHelper
- Druid
- Lombok
- JWT
- Spring Security
- JavaMail
- Thymeleaf
- HttpClient
- Spring Scheduler
- Hibernate Validator
- Redis Cluster
- MySQL主从复制,读写分离,按业务分库
- Spring Async
- Spring Cache
- Swagger
- Spring Test
- Spring Actuator
- Logback+Slf4j多环境日志
- i18n
- Maven Multi-Module
部署
项目使用到了20个虚拟机以及本地的应用服务器(4个),如果全部部署到分布式环境中大概需要24台机器。
Redis(所有app共用,6台)
101,102,107,108,109,110:
1)将每个节点下aof、rdb、nodes.conf本地备份文件删除;
放在root目录下
rm -rf /root/dump.rdb
rm -rf /root/nodes.conf
/usr/local/bin/redis-server /opt/redis/redis.conf
2)101节点:
/usr/local/bin/redis-trib.rb create –replicas 1 192.168.1.101:6379 192.168.1.102:6379 192.168.1.107:6379 192.168.1.108:6379 192.168.1.109:6379 192.168.1.110:6379
MySQL(每个服务三台)
开机启动!不用写了
106,111,112:
service mysqld start
123,124,125
service mysqld start
Zookepper(所有app共用,三台,或一台)
118,119,120:
zkServer.sh start
zkServer.sh status
Dubbo管控台(所有app共用,一台,可选)
121:
/usr/local/apache-tomcat-8.5.20/bin/startup.sh
RocketMQ(双Master)
113,114:
启动nameserver
cd /usr/local/alibaba-rocketmq/bin/
nohup sh mqnamesrv &
启动rocketmq
113:
cd /usr/local/alibaba-rocketmq/bin
nohup sh mqbroker -c /usr/local/alibaba-rocketmq/conf/2m-noslave/broker-a.properties >/dev/null 2>&1 &
jps
114:
cd /usr/local/alibaba-rocketmq/bin
nohup sh mqbroker -c /usr/local/alibaba-rocketmq/conf/2m-noslave/broker-b.properties >/dev/null 2>&1 &
jps
113:
/usr/local/apache-tomcat-8.5.20/bin/startup.sh
RocketMQ(双Master双Slave)
113,114,116,117:
113
启动nameserver
cd /usr/local/alibaba-rocketmq/bin/
nohup sh mqnamesrv &
启动rocketmq
113:
cd /usr/local/alibaba-rocketmq/bin
nohup sh mqbroker -c /usr/local/alibaba-rocketmq/conf/2m-noslave/broker-a.properties >/dev/null 2>&1 &
jps
114:
cd /usr/local/alibaba-rocketmq/bin
nohup sh mqbroker -c /usr/local/alibaba-rocketmq/conf/2m-noslave/broker-b.properties >/dev/null 2>&1 &
jps
116:
cd /usr/local/alibaba-rocketmq/bin
nohup sh mqbroker -c /usr/local/alibaba-rocketmq/conf/2m-2s-async/broker-a-s.properties >/dev/null 2>&1 &
jps
117:
cd /usr/local/alibaba-rocketmq/bin
nohup sh mqbroker -c /usr/local/alibaba-rocketmq/conf/2m-2s-async/broker-b-s.properties >/dev/null 2>&1 &
jps
113:
/usr/local/apache-tomcat-8.5.20/bin/startup.sh
完整的虚拟机安装和软件安装可以参考本项目的Github中的《Linxu集群搭建》。
SOA拆分
业务拆分
- 用户子系统(拥有用户库 eshop_user):
- 用户模块:user+mail,涉及user,role,mail,mail_text,balance表
- 产品模块:product,涉及product,category表
- 新闻模块:news,涉及news表
- 消息模块: producer_transaction_message表
- 订单子系统(拥有订单库eshop_order):
- 订单模块: order,涉及order表
- 消息模块: consumer_transaction_message表
- 邮件子系统(无数据库)
拆分时的约定
- 公共的domain、enumeration都放在common模块下
- 一般情况下api模块放service接口和exception异常
注意自己模块的异常放在自己模块的api模块下(Dubbo异常机制) - i18n资源文件放在common下即可,别的模块下不用放
项目启动顺序
email,order,user,web
注意事项
- 所有实体类都要实现serializable接口
- Dubbo异常处理机制:
异常类和接口类在同一jar包里,直接抛出,否则被调方service中抛出的异常,在调用方中会被包一层RuntimeException,无法获得原来的异常。 - 更多的注意事项可以参考 SpringBootSkeleton 中的README文件。
数据库拆分
首先是按照业务分库:每个模块拥有每个模块对应的数据库,比如订单模块对应着订单库,里面有订单表。
本项目中的业务分库实际操作如下:
1. 用户模块对应用户库:
2. 订单模块对应订单库:
3. 邮件模块不需要数据库。
对每个数据库都做了主从复制和读写分离,一主二从,保证数据库的高可用。
其次是分表:如果表的体积过大,可以将单张表拆为多张表,每张表持有原表的一部分数据,在插入时可以根据一定的规则分散到多张表中。
以上可以用一张图来描述,图源网络,侵删。
按照业务分库可能会带来的问题
- 跨库Join
- 分布式事务
分表可能会带来的问题
- 全局分布式ID生成
- 分片规则选取
- 排序分页等
解决方案
RDBMS -> NoSQL -> NewSQL
在RDBMS基础上有一些分布式数据库中间件,比如MyCat;另一种则是新型数据库,称为NewSQL,一般是兼容某一种RDBMS以保证用户使用无障碍(比如兼容MySQL),比如TiDB。
中间件的局限性(来自网络)
性能
基于 MySQL 的方案它的天花板在哪里,它的天花板特别明显。有一个思路是能不能通过 MySQL 的 server 把 InnoDB 变成一个分布式数据库,听起来这个方案很完美,但是很快就会遇到天花板。因为 MySQL 生成的执行计划是个单机的,它认为整个计划的 cost 也是单机的,我读取一行和读取下一行之间的开销是很小的,比如迭代 next row 可以立刻拿到下一行。实际上在一个分布式系统里面,这是不一定的。
另外,你把数据都拿回来计算这个太慢了,很多时候我们需要把我们的 expression 或者计算过程等等运算推下去,向上返回一个最终的计算结果,这个一定要用分布式的 plan,前面控制执行计划的节点,它必须要理解下面是分布式的东西,才能生成最好的 plan,这样才能实现最高的执行效率。
比如说你做一个 sum,你是一条条拿回来加,还是让一堆机器一起算,最后给我一个结果。 例如我有 100 亿条数据分布在 10 台机器上,并行在这 10台机器我可能只拿到 10 个结果,如果把所有的数据每一条都拿回来,这就太慢了,完全丧失了分布式的价值。聊到 MySQL 想实现分布式,另外一个实现分布式的方案就是 Proxy。但是 Proxy 本身的天花板在那里,就是它不支持分布式的 transaction,它不支持跨节点的 join,它无法理解复杂的 plan,一个复杂的 plan 打到 Proxy 上面,Proxy 就傻了,我到底应该往哪一个节点上转发呢,如果我涉及到 subquery sql 怎么办?所以这个天花板是瞬间会到,在传统模型下面的修改,很快会达不到我们的要求。
高可用(运维)
另外一个很重要的是,MySQL 支持的复制方式是半同步或者是异步,但是半同步可以降级成异步,也就是说任何时候数据出了问题你不敢切换,因为有可能是异步复制,有一部分数据还没有同步过来,这时候切换数据就不一致了。前一阵子出现过某公司突然不能支付了这种事件,今年有很多这种类似的 case,所以微博上大家都在说“说好的异地多活呢?”……
为什么传统的方案在这上面解决起来特别的困难,天花板马上到了,基本上不可能解决这个问题。另外是多数据中心的复制和数据中心的容灾,MySQL 在这上面是做不好的。
SQL支持
基于中间件来进行分库, 确实对 SQL 有阉割的情况,并不是所有sql都能够支持。主要原因是数据被拆分了。而数据一旦被拆分到多个节点,则: 1.复杂的join查询2. 同时更新多个数据库节点的sql语句这两类SQL的支持难度,就比较高。这也是目前市面上所有中间件都无法满足的两点。复杂的join查询之所以难以支持,是因为要跨节点join;同时更新多个节点的sql难以支持,是因为很难解决多个节点的并发一致性问题。但是除了这两点之外,其他的sql类型,一款中间件是能够努力做到的。
与本项目的结合点
本项目没有考虑分表,数据量实在太小,而且实现非常繁琐。另外分表并不是SOA项目的必须实现的,但分布式事务是SOA项目必须要解决的,数据库不拆分,仅拆分业务,是无法实现服务的弹性扩展的。
基于本项目的示例和学习性质,所以希望选取一种较为简单的解决方案。当然最希望是Dubbo本身能支持分布式事务,但很遗憾,Dubbo目前不支持。
其次是考虑使用MyCat中间件,但它对分布式事务支持不够好,目前仅支持弱XA(下一个主题即分布式事务,稍后介绍),而它对分表可以屏蔽内部细节的优势又没有得到体现,所以没有被采用。
之后又考虑使用TiDB,据说是分布式数据库方面的最新成果,但资料太少,而且担心无法实现MySQL的平滑过渡,可能有坑需要填等问题,最后也没有采用。但其本身是非常优秀的技术,其官网上对其介绍如下:
TiDB 是新一代开源分布式 NewSQL 数据库,模型受 Google Spanner / F1 论文的启发, 实现了自动的水平伸缩,强一致性的分布式事务,基于 Raft 算法的多副本复制等重要 NewSQL 特性。 TiDB 结合了 RDBMS 和 NoSQL 的优点,部署简单,在线弹性扩容和异步表结构变更不影响业务, 真正的异地多活及自动故障恢复保障数据安全,同时兼容 MySQL 协议,使迁移使用成本降到极低。
另外还有一些闭源的解决方案,比如阿里云、腾讯云中都有分布式数据库的解决方案,也没有考虑(还是优先开源)。
最终是没有采用数据库中间件或者NewSQL,而是采用了原生MySQL,加上手工实现(common模块)的读写分离。
分布式事务(重点)
介绍
@Transactional
void bussinessMethod(){
// 远程Service,修改的是远程数据库
AService.updateA();
// 本地Service,修改的是本地数据库
BService.updateB();
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
以上便是一个分布式事务的基本示例。以往我们使用本地事务,一般是使用Spring的@Transactional声明式事务。比如执行两个业务操作,都是修改了数据库的,我们要求两个数据库DML增改删操作要么全部执行,要么一个都不执行。
比如第一个业务操作执行成功后,而第二个业务操作执行失败,抛出异常时,第一个业务操作会被回滚,这时候就可以确保数据是一致性的。
但是在分布式环境下,第一个业务操作执行成功后,而第二个业务操作执行失败,抛出异常,而此时是无法回滚第一个业务操作对数据库的修改的,因为不同同一个数据库,数据库的Connection就同,无法放到同一个事务中管理。此时就会出现数据不一致的问题。
常见解决方案
首先要说明的是事务分为柔性事务即不去追求实时的一致性,而是最终一致性和严格事务。实现严格的一致性的典型解决方案是2PC,在Java中为XA实现,但其因为效率问题无法在互联网应用中被青睐。而柔性事务则经常被使用。
2PC(2-Phase-Commit)
它分成两个阶段,先由一方进行提议(propose)并收集其他节点的反馈(vote),再根据反馈决定提交(commit)或中止(abort)事务。我们将提议的节点称为协调者(coordinator),其他参与决议节点称为参与者(participants, 或cohorts)。
两阶段提交中的第二阶段, 协调者需要等待所有参与者发出yes请求, 或者一个参与者发出no请求后, 才能执行提交或者中断操作. 这会造成长时间同时锁住多个资源, 造成性能瓶颈, 如果参与者有一个耗时长的操作, 性能损耗会更明显.
实现复杂, 不利于系统的扩展, 不推荐。
TCC
TCC, 是基于补偿型事务的AP系统的一种实现, 具有最终一致性.
下面以客户购买商品时的付款操作为例进行讲解:
Try:
完成所有的业务检查(一致性),预留必须业务资源(准隔离性);
体现在本例中, 就是确认客户账户余额足够支付(一致性), 锁住客户账户, 商户账户(准隔离性).
Confirm:
使用Try阶段预留的业务资源执行业务(业务操作必须是幂等的), 如果执行出现异常, 要进行重试.
在这里就是执行客户账户扣款, 商户账户入账操作.
Cancle:
释放Try阶段预留的业务资源, 在这里就是释放客户账户和商户账户的锁;
如果任一子业务在Confirm阶段有操作无法执行成功, 会造成对业务活动管理器的响应超时, 此时要对其他业务执行补偿性事务. 如果补偿操作执行也出现异常, 必须进行重试, 若实在无法执行成功, 则事务管理器必须能够感知到失败的操作, 进行log(用于事后人工进行补偿性事务操作或者交由中间件接管在之后进行补偿性事务操作).
TCC能够对分布式事务中的各个资源进行分别锁定, 分别提交与释放, 例如, 假设有AB两个操作, 假设A操作耗时短, 那么A就能较快的完成自身的try-confirm-cancel流程, 释放资源. 无需等待B操作. 如果事后出现问题, 追加执行补偿性事务即可.
TCC是绑定在各个子业务上的(除了cancel中的全局回滚操作), 也就是各服务之间可以在一定程度上”异步并行”执行。
异步确保型/可靠消息最终一致(基于消息中间件,要求MQ支持事务消息->目前阿里闭源版MQ支持)
执行步骤如下:
- MQ发送方发送远程事务消息到MQ Server;
- MQ Server给予响应, 表明事务消息已成功到达MQ Server.
- MQ发送方Commit本地事务.
若本地事务Commit成功, 则通知MQ Server允许对应事务消息被消费; 若本地事务失败, 则通知MQ Server对应事务消息应被丢弃.
若MQ发送方超时未对MQ Server作出本地事务执行状态的反馈, 那么需要MQ Server向MQ发送方主动回查事务状态, 以决定事务消息是否能被消费. - 当得知本地事务执行成功时, MQ Server允许MQ订阅方消费本条事务消息.
需要额外说明的一点, 就是事务消息投递到MQ订阅方后, 并不一定能够成功执行. 需要MQ订阅方主动给予消费反馈(ack)
如果MQ订阅方执行远程事务成功, 则给予消费成功的ack, 那么MQ Server可以安全将事务消息移除;
如果执行失败, MQ Server需要对消息重新投递, 直至消费成功.
本项目就是采用这种方法实现的,而且是基于阉割版的RocketMQ为(未实现消息回查)实现了外围的消息回查。
最大努力通知型(基于消息中间件,定期校对)
这是分布式事务中要求最低的一种, 也可以通过消息中间件实现, 与前面异步确保型操作不同的一点是, 在消息由MQ Server投递到消费者之后, 允许在达到最大重试次数之后正常结束事务.
1.业务活动的主动方,在完成业务处理之后,向业务活动的被动方发送消息,允许消息丢失。
2.主动方可以设置时间阶梯型通知规则,在通知失败后按规则重复通知,直到通知N次后不再通知。
3.主动方提供校对查询接口给被动方按需校对查询,用于恢复丢失的业务消息。
4.业务活动的被动方如果正常接收了数据,就正常返回响应,并结束事务。
5.如果被动方没有正常接收,根据定时策略,向业务活动主动方查询,恢复丢失的业务消息。
相比于可靠消息最终一致方案,最大努力通知方案设计上比较简单,主要是由两部分构成。
1.实时消息服务(MQ):接收主动方发送的MQ消息。
2.通知服务子系统:监听MQ消息,当收到消息后,向被动方发送通知(一般是URL方式),同时生成通知记录。如果没有接收到被动方的返回消息,就根据通知记录进行重复通知。
最大努力通知方案实现方式比较简单,本质上就是通过定期校对,适用于数据一致性时间要求不太高的场合,其实不把它看作是分布式事务方案,只认为是一种跨平台的数据处理方案也是可以的。
与本项目的结合点(RocketMQ)
RocketMQ与其他消息中间件的一个区别是支持事务消息。这个事务消息其实在上面的可靠消息最终一致中已经介绍过了,下面就RocketMQ而言说明一下其原理。
关于RocketMQ的完整介绍可以参考本项目的Github中的《RocketMQ笔记》。
RocketMQ事务消息原理
一个分布式事务被拆为一个本地事务和一个消息发送。
而消息发送的前提是本地事务执行成功,本地事务提交后,消息才会发送出去;否则会取消该消息的发送。
流程如下:
1. Producer向Broker发送Prepared消息,可能会发送失败
2. 执行本地事务
3. 如果本地事务执行成功,则发送Confirm消息;如果失败,那么回滚本地事务,取消发送Confirm消息,Broker会删除Prepared消息。
4. Producer发送Confirm消息时可能会发送失败,此时消息的状态仍为Prepared。
5. Broker接收到Confirm消息时,会将该消息推送给Consumer。
6. 设置Scheduler去向Producer轮询Prepared消息的当前状态,称为消息回查。消息回查主要目的是检测Confirm消息发送失败的情况。
7. Consumer接收到消息,执行本地事务。
8. 本地事务执行成功时,会返回给Broker一个ACK,执行失败时,Broker会定期重新发送给Consumer该消息,超过重试次数时可以选择不再重试。
RocketMQ第一阶段发送Prepared消息时,会拿到消息的地址,第二阶段执行本地事务,第三阶段通过第一阶段拿到的地址去访问消息,并修改消息的状态。
为解决确认消息发送失败的问题(消息回查),RocketMQ会定期扫描消息集群中的事务消息,如果发现了Prepared消息,它会向消息发送端(生产者)确认,Bob的钱到底是减了还是没减呢?如果减了是回滚还是继续发送确认消息呢?RocketMQ会根据发送端设置的策略来决定是回滚还是继续发送确认消息。这样就保证了消息发送与本地事务同时成功或同时失败。
问题
- prepared消息可能会发送失败,此时会抛出异常。解决方案一般是返回错误结果,用户进行重试。当然也可以设置Producer对发送失败的消息进行重试。
- 本地事务执行失败,返回错误结果,用户进行重试。
- Confirm消息发送失败,此时Producer一般是无法感知到的。此时需要Broker遍历Prepared消息,进行消息回查。而开源RocketMQ阉割了这部分的内容,这也是一会我将要介绍的外围解决事务回查问题的内容。如果可以检测到Confirm消息发送失败的情况,那么一般解决方案是重新发送Confirm消息。
- 重试发送Confirm消息,仍失败,且超过重试次数时要进行记录,可以考虑人工处理:继续重发或者回滚本地事务(需要自行实现)。
- Consumer消费失败。消费失败时RocketMQ会自动进行重试,我们可以自己去设置一个重试次数,超过重试次数时进行记录,可以考虑人工处理:继续重试或者回滚本地事务。
- Consumer消息重复。RocketMQ不能保证消息重复,而对于当前业务而言是不能重复的,这个解决方案在后面也会介绍。
分布式事务设计
场景
在业务中有一处需要用户为订单付款,该业务会修改用户库的balance(用户余额表),扣减用户的余额,然后会修改订单库的order(订单表)和enterprise(企业余额表),将订单状态设置为已被支付,并增加企业的余额。这里就同时修改多个数据库,涉及到了分布式事务的问题。我最终是使用了RocketMQ的事务消息,并从外围解决了消息回查的问题。
他人思路
在设计我的解决方案前尝试搜索了一下别人的实现 传送门。他的解决方案是在producer和consumer方设置了两个scheduler,感觉是有些复杂的。我是在其基础上进行了简化,并解决了一些其他问题,使得整个解决方案比较完整和逻辑自洽。
我的设计
A和B是两个Service,A执行本地事务,B执行远程事务。A会调用B的远程服务,完成整个业务。就本项目而言,A就是用户模块的AccountService,B就是订单模块的OrderService。A和B都有一张表,存储着消息数据。从MQ的视角看来,A是消息的Producer,B是消息的Consumer。
A(本地事务执行方,MQProducer)
1) db
producer_msg(msgId,body,message_status,create_time,update_time,send_times,topic) msgId这里为orderId
2) mq
作为producer时,注册Topic account:当执行本地事务时同时插入producer_msg,默认status都是未被消费。如果本地事务执行失败,那么直接回滚,不插入。当消息发送失败时,我们已经在producer_msg插入了记录,可以进行回查。
3) scheduler
A需要同步B的数据库,使得两个数据库数据一致,不同的即为确认信息发送失败的。
消息状态有未被消费、已被消费、消费失败、超过消费失败的重试次数、超过确认消息发送失败的重试次数和已被回滚。
A和B数据库同步维护所有消息,只是A数据库保存内容更多,比如会保存消息的body。
如果消息已经是超过重试次数或已被消费,那么A不会再去考虑它。
A的Scheduler会遍历A数据库,找出未被消费和消费失败的id且创建时间距离当前时间超过1min,发送给B。
B会遍历这些id
for(id in ids){
如果 id 不存在,说明确认消息发送失败,
如果 id 存在,则将该id对应的status一并返回,map.put(id,status)
}
- 1
- 2
- 3
- 4
A 接收到map后,keySet取得所有id,拿发送过去的id减去这些id(差集),就是确认消息发送失败的消息,进行重新发送;遍历map,将本地数据库同步为B数据库。
这个方法可能会出现消息重复,因为A刚发送消息,B该没有处理,A的Scheduler就去查询了,当然消息都没有被消费,因为A会重发刚才的消息,但是B有做消息去重,所以不会影响。
B(远程事务执行方,MQConsumer)
1) db
consumer_msg(msgId,create_time. message_status,topic) msgId这里是orderId
2) mq
作为consumer,注册Topic account:
当接收到消息后,查询是否被执行过,如果没有被消费过(id未找到)或者消费失败了(这里解决了消息重复消费的问题),则执行远程事务后插入/更新consumer_ msg(status为已被消费),已被消费则跳过。
远程事务执行失败时,插入/更新consumer_ msg(status为消费失败)
超过重试消费次数的消息也更新consumer_ msg,status为超过消费的重试次数。
B这里就维护它所接收的消息的状态。
消息表
在producer这一方设计了producer_transaction_message表。
- msgId是消息唯一id,可以采用业务上的id来实现,比如订单id。
- body是消息体,比如订单对象的序列化结果。
- message_status是消息状态
- update_time是最后更新记录时间
- create_time是消息创建时间
- send_times是确认消息重复发送次数
- topic是消息主题,这里均为account
在consumer这一方设计了consumer_transaction_message表。
看得出来是producer的表的部分列,其含义也是相同的。
分布式事务实现代码
Producer方
MQProducerConfig(配置MQProducer)
@Configuration
@Slf4j
@Getter
public class MQProducerConfig {
@Value("${spring.rocketmq.group-name}")
private String groupName;
@Value("${spring.rocketmq.namesrv-addr}")
private String namesrvAddr;
@Value("${spring.rocketmq.topic}")
private String topic;
@Value("${spring.rocketmq.confirm-message-faiure-retry-times}")
private Integer retryTimes;
public static final Integer CHECK_GAP = 1;
@Bean
public MQProducer mqProducer() throws MQClientException {
TransactionMQProducer producer = new TransactionMQProducer(groupName);
producer.setNamesrvAddr(namesrvAddr);
producer.setTransactionCheckListener(new TransactionCheckListener() {
@Override
public LocalTransactionState checkLocalTransactionState(MessageExt msg) {
// doNothing
return LocalTransactionState.COMMIT_MESSAGE;
}
});
Runtime.getRuntime().addShutdownHook(new Thread(new Runnable() {
public void run() {
producer.shutdown();
}
}));
producer.start();
log.info("producer started!");
return producer;
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
AccountLocalTransactionExecutor(执行本地事务)
@Component
@Slf4j
public class AccountLocalTransactionExecutor implements LocalTransactionExecuter {
@Autowired
private PayService payService;
@Autowired
private ProducerTransactionMessageService messageService;
@Override
public LocalTransactionState executeLocalTransactionBranch(Message msg, Object arg) {
try {
String paymentPassword = (String) arg;
OrderDO order = ProtoStuffUtil.deserialize(msg.getBody(), OrderDO.class);
if (order.getOrderStatus() != OrderStatus.UNPAID) {
log.info("{} 订单状态不为unpaid", order.getId());
throw new OrderStateIllegalException(order.getOrderStatus().toString());
}
// 本地事务,减少用户账户余额
// 抛出异常时会进行回滚,下面构造消息存储到数据库也不会被执行
payService.decreaseAccount(order.getUser().getId(), order.getTotalPrice(), paymentPassword);
// 保存消息至数据库
ProducerTransactionMessageDO messageDO = ProducerTransactionMessageDO.builder()
.id(order.getId())
.body(msg.getBody())
.createTime(LocalDateTime.now())
.updateTime(LocalDateTime.now())
.messageStatus(MessageStatus.UNCONSUMED)
.topic(msg.getTopic())
.sendTimes(0)
.build();
messageService.save(messageDO);
// 成功通知MQ消息变更 该消息变为:<确认发送>
return LocalTransactionState.COMMIT_MESSAGE;
} catch (Exception e) {
e.printStackTrace();
log.info("本地事务执行失败,直接回滚!");
// 失败则不通知MQ 该消息一直处于:<暂缓发送>
return LocalTransactionState.ROLLBACK_MESSAGE;
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
AccountServiceImpl(Producer支付业务入口)
@Service
@Slf4j
public class AccountServiceImpl implements AccountService {
@Autowired
private MQProducerConfig config;
@Autowired
private MQProducer producer;
@Autowired
private AccountLocalTransactionExecutor executor;
@Autowired
private ProducerTransactionMessageService messageService;
@Autowired
private PayService payService;
@Override
public void commit(OrderDO order, String paymentPassword) {
Message message = new Message();
message.setTopic(config.getTopic());
message.setBody(ProtoStuffUtil.serialize(order));
TransactionSendResult result = null;
try {
result = this.producer.sendMessageInTransaction(message, executor, paymentPassword);
log.info("事务消息发送结果:{}", result);
log.info("TransactionState:{} ", result.getLocalTransactionState());
// 因为无法获得executor中抛出的异常,只能模糊地返回订单支付失败信息。
// TODO 想办法从executor中找到原生异常
} catch (Exception e) {
log.info("AccountService抛出异常...");
e.printStackTrace();
}
if (result.getLocalTransactionState() == LocalTransactionState.ROLLBACK_MESSAGE) {
throw new OrderPaymentException(order.getId());
}
}
@Transactional
@Override
public void rollback(ProducerTransactionMessageDO message) {
OrderDO order = ProtoStuffUtil.deserialize(message.getBody(), OrderDO.class);
message.setMessageStatus(MessageStatus.ROLLBACK);
message.setUpdateTime(LocalDateTime.now());
messageService.update(message);
payService.increaseAccount(order.getUser().getId(), order.getTotalPrice());
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
TransactionCheckScheduler(消息回查)
@Component
public class TransactionCheckScheduler {
@Autowired
private ProducerTransactionMessageService messageService;
/**
* 每分钟执行一次事务回查
*/
@Scheduled(fixedRate = 60 * 1000)
public void checkTransactionMessage(){
messageService.check();
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
ProducerTransactionMessageServiceImpl(Producer消息服务提供者)
@Slf4j
public class ProducerTransactionMessageServiceImpl implements ProducerTransactionMessageService {
@Autowired
private MQProducer producer;
@Autowired
private MQProducerConfig config;
@Autowired
private ProductTransactionMessageDOMapper mapper;
@Autowired
private ConsumerTransactionMessageService consumerTransactionMessageService;
@Transactional
@Override
public void save(ProducerTransactionMessageDO message) {
mapper.insert(message);
}
@Transactional
@Override
public void check() {
List<Long> all = mapper.findMessageIdsByStatusCreatedAfter(Arrays.asList(MessageStatus.UNCONSUMED, MessageStatus.CONSUME_FAILED), MQProducerConfig.CHECK_GAP);
Map<Long, MessageStatus> statusMap = consumerTransactionMessageService.findConsumerMessageStatuses(all);
for (Map.Entry<Long, MessageStatus> entry : statusMap.entrySet()) {
mapper.updateByPrimaryKeySelective(ProducerTransactionMessageDO.builder().id(entry.getKey()).messageStatus(entry.getValue()).updateTime(LocalDateTime.now()).build());
}
all.removeAll(statusMap.keySet());
// 此时all为确认消息发送失败的
this.reSend(mapper.selectBatchByPrimaryKeys(all));
}
@Transactional
@Override
public void reSend(List<ProducerTransactionMessageDO> messages) {
for (ProducerTransactionMessageDO messageDO : messages) {
if (messageDO.getSendTimes() == config.getRetryTimes()) {
messageDO.setUpdateTime(LocalDateTime.now());
messageDO.setMessageStatus(MessageStatus.OVER_CONFIRM_RETRY_TIME);
mapper.updateByPrimaryKeySelective(messageDO);
continue;
}
Message message = new Message();
message.setTopic(config.getTopic());
message.setBody(messageDO.getBody());
try {
SendResult result = producer.send(message);
messageDO.setSendTimes(messageDO.getSendTimes() + 1);
messageDO.setUpdateTime(LocalDateTime.now());
mapper.updateByPrimaryKeySelective(messageDO);
log.info("发送重试消息完毕,Message:{},result:{}", message, result);
} catch (Exception e) {
e.printStackTrace();
log.info("发送重试消息时失败! Message:{}", message);
}
}
}
@Transactional
@Override
public void delete(Long id) {
mapper.deleteByPrimaryKey(id);
}
@Transactional(readOnly = true)
@Override
public List<ProducerTransactionMessageDO> findByIds(List<Long> ids) {
return mapper.selectBatchByPrimaryKeys(ids);
}
@Transactional(readOnly = true)
@Override
public PageInfo<ProducerTransactionMessageDO> findByQueryDTO(MessageQueryConditionDTO dto) {
return mapper.findByCondition(dto, dto.getPageNum(), dto.getPageSize()).toPageInfo();
}
@Override
public void update(ProducerTransactionMessageDO message) {
mapper.updateByPrimaryKeySelective(message);
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
- 64
- 65
- 66
- 67
- 68
- 69
- 70
- 71
- 72
- 73
- 74
- 75
- 76
- 77
- 78
- 79
- 80
Consumer
MQConsumerConfig(配置MQConsumer)
@Configuration
@Slf4j
@Getter
public class MQConsumerConfig {
private DefaultMQPushConsumer consumer;
@Value("${spring.rocketmq.group-name}")
private String groupName;
@Value("${spring.rocketmq.namesrv-addr}")
private String namesrvAddr;
@Value("${spring.rocketmq.topic}")
private String topic;
@Autowired
private AccountMessageListener accountMessageListener;
@Value("${spring.rocketmq.consume-failure-retry-times}")
private Integer retryTimes;
@PostConstruct
public void init() throws MQClientException {
this.consumer = new DefaultMQPushConsumer(groupName);
this.consumer.setNamesrvAddr(namesrvAddr);
// 启动后从队列头部开始消费
this.consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
this.consumer.subscribe(topic, "*");
this.consumer.registerMessageListener(accountMessageListener);
this.consumer.start();
log.info("consumer started!");
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
AccountMessageListener(消息接收方)
@Component
@Slf4j
public class AccountMessageListener implements MessageListenerConcurrently {
@Autowired
private OrderService orderService;
@Autowired
@Qualifier("consumerTransactionMessageService")
private ConsumerTransactionMessageService messageService;
@Autowired
private MQConsumerConfig config;
@Override
public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs, ConsumeConcurrentlyContext context) {
log.info("接收到消息数量为:{}", msgs.size());
for (MessageExt msg : msgs) {
ConsumerTransactionMessageDO messageDO = null;
OrderDO order = null;
try {
String topic = msg.getTopic();
String keys = msg.getKeys();
order = ProtoStuffUtil.deserialize(msg.getBody(), OrderDO.class);
log.info("消费者接收到消息:topic: {}, keys:{} , order: {}", topic, keys, order);
// 如果已经被消费过并且消费成功,那么不再重复消费(未被消费->id不存在或消费失败或超过重试次数的都会继续消费)
if(messageService.isMessageConsumedSuccessfully(order.getId())){
continue;
}
messageDO = ConsumerTransactionMessageDO.builder()
.id(order.getId())
.createTime(LocalDateTime.now())
.topic(msg.getTopic())
.build();
// 业务逻辑处理
orderService.finishOrder(order);
// 如果业务逻辑抛出异常,那么会跳过插入CONSUMED
messageDO.setMessageStatus(MessageStatus.CONSUMED);
// 如果是未被消费,第一次就消费成功了,则插入
// 如果是超过重试次数,又人工设置重试,则更新状态为已被消费
messageService.insertOrUpdate(messageDO);
} catch (Exception e) {
e.printStackTrace();
// 重试次数达到最大重试次数
if (msg.getReconsumeTimes() == config.getRetryTimes()) {
log.info("客户端重试三次,需要人工处理");
messageService.update(
ConsumerTransactionMessageDO.builder()
.id(order.getId())
.messageStatus(MessageStatus.OVER_CONSUME_RETRY_TIME).build()
);
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
} else {
log.info("消费失败,进行重试,当前重试次数为: {}", msg.getReconsumeTimes());
messageDO.setMessageStatus(MessageStatus.CONSUME_FAILED);
// 如果第一次消费失败,那么插入
// 如果之前消费失败,继续重试,那么doNothing
// 如果之前是超过重试次数,人工设置重试,那么将状态改为消费失败
messageService.insertOrUpdate(messageDO);
return ConsumeConcurrentlyStatus.RECONSUME_LATER;
}
}
}
return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
- 62
- 63
ConsumerTransactionMessageServiceImpl(Consumer消息服务提供者)
public class ConsumerTransactionMessageServiceImpl implements ConsumerTransactionMessageService {
@Autowired
private ConsumerTransactionMessageDOMapper mapper;
@Transactional(readOnly = true)
@Override
public Map<Long, MessageStatus> findConsumerMessageStatuses(List<Long> ids) {
Map<Long, MessageStatus> result = new HashMap<>();
for (Long id : ids) {
MessageStatus status = mapper.findStatusById(id);
if (status != null) {
result.put(id, status);
}
}
return result;
}
@Transactional(readOnly = true)
@Override
public ConsumerTransactionMessageDO selectByPrimaryKey(Long id) {
return mapper.selectByPrimaryKey(id);
}
@Transactional
@Override
public void insert(ConsumerTransactionMessageDO record) {
mapper.insert(record);
}
@Override
public void insertOrUpdate(ConsumerTransactionMessageDO record) {
ConsumerTransactionMessageDO recordInDB = mapper.selectByPrimaryKey(record.getId());
if (recordInDB == null) {
mapper.insert(record);
} else {
recordInDB.setMessageStatus(record.getMessageStatus());
mapper.updateByPrimaryKeySelective(recordInDB);
}
}
@Transactional
@Override
public void insertIfNotExists(ConsumerTransactionMessageDO record) {
if (mapper.selectByPrimaryKey(record.getId()) == null) {
mapper.insert(record);
}
}
@Transactional
@Override
public void update(ConsumerTransactionMessageDO record) {
mapper.updateByPrimaryKeySelective(record);
}
@Transactional(readOnly = true)
@Override
public boolean isMessageConsumedSuccessfully(Long id) {
MessageStatus status = mapper.findStatusById(id);
return status == MessageStatus.CONSUMED;
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
- 37
- 38
- 39
- 40
- 41
- 42
- 43
- 44
- 45
- 46
- 47
- 48
- 49
- 50
- 51
- 52
- 53
- 54
- 55
- 56
- 57
- 58
- 59
- 60
- 61
消息管理
尚需提供一个消息的监控平台,可以搜索和查看消息的状态,尤其是需要人工处理的死信,可以回滚本地事务或重新发送。
界面类似于下图:
当前仅开发了消息管理系统的数据接口,尚未开发其客户端。
@RestController
@RequestMapping("/message_console")
public class MessageConsoleController {
@Autowired
private ProducerTransactionMessageService messageService;
@Autowired
private AccountService accountService;
@RequestMapping(value = "/query", method = RequestMethod.POST)
public PageInfo<ProducerTransactionMessageDO> findByQueryDTO(@RequestBody MessageQueryConditionDTO queryDTO) {
if (queryDTO.getPageNum() == null || queryDTO.getPageNum() <= 0) {
queryDTO.setPageNum(Integer.valueOf(PageProperties.DEFAULT_PAGE_NUM));
}
if (queryDTO.getPageSize() == null || queryDTO.getPageSize() <= 0) {
queryDTO.setPageSize(Integer.valueOf(PageProperties.DEFAULT_PAGE_SIZE));
}
return messageService.findByQueryDTO(queryDTO);
}
@RequestMapping(value = "/reSend", method = RequestMethod.POST)
public void reSend(@RequestBody MessageIdDTO dto) {
List<ProducerTransactionMessageDO> messages = messageService.findByIds(dto.getIds());
for (ProducerTransactionMessageDO messageDO : messages) {
messageDO.setMessageStatus(MessageStatus.UNCONSUMED);
messageDO.setSendTimes(0);
}
messageService.reSend(messages);
}
@RequestMapping(value = "/rollback", method = RequestMethod.POST)
public void rollback(@RequestBody MessageIdDTO dto) {
for (ProducerTransactionMessageDO message : messageService.findByIds(dto.getIds())) {
accountService.rollback(message);
}
}
}
- 1
- 2
- 3
- 4
- 5
- 6
- 7
- 8
- 9
- 10
- 11
- 12
- 13
- 14
- 15
- 16
- 17
- 18
- 19
- 20
- 21
- 22
- 23
- 24
- 25
- 26
- 27
- 28
- 29
- 30
- 31
- 32
- 33
- 34
- 35
- 36
总结
自上次开发完SpringBootSOASkeleton之后,就一直希望能完成一个数据库按业务分库和分布式事务的项目。大概花了两周,大概尝试了TCC和可靠消息最终一致两种方法,最终解决了分布式事务的问题。
TCC是我首先采用的技术,使用了Github开源的ByteTCC,但花了很多时间没有跑通,另外用起来非常复杂,对业务逻辑侵入非常大,最后是放弃了,但也留下来基于ByTeTCC的完成度比较高的代码,最后以Git的一个tag结束了它的生命周期。
然后我考虑使用MQ,尤其是原本对事务消息有所支持的RocketMQ来实现分布式事务。因为消息回查的功能被阉割,又去阅读了其源码和他人考虑的解决方案去实现它。就目前这个解决方案而言,自我感觉是比较完善的,既不是非常复杂, 又解决了RocketMQ原来存在的很多问题。但因为还是一个学生,对分布式比较缺乏经验,如果大家能发现其中存在的问题,也希望在博客下评论或Github提issue。
全部代码已经放到Github上,按照《Linux集群搭建》配置的环境下,代码是可以跑通的,只是确认消息发送失败这种场景很难模拟出来,这也是有待观察的。
参考资料
大规模SOA系统中的分布事务处事-程立
支付宝架构与技术
RocketMQ用户指南v3.2.4
高并发下的幂等策略分析
RocketMQ源码解析
分布式开放消息系统(RocketMQ)的原理与实践
更多推荐
所有评论(0)