注意(重要)
1、本文基于 windows 环境下 springcloud + nacos2.0.+ + seata1.4.+ + mysql5.7.+ 实现分布式事务案例。
2、框架集成版本对照:https://github.com/alibaba/spring-cloud-alibaba/wiki/%E7%89%88%E6%9C%AC%E8%AF%B4%E6%98%8E
3、springcloud + nacos2.0.+(包含持久化):https://blog.csdn.net/qq_36763419/article/details/121179174

Seata 概念

1、SeataSeata 是一款开源的分布式事务解决方案,致力于在微服务架构下提供高性能和简单易用的分布式事务服务。

2、分布式事务问题?
以前单体的应用被拆分为微服务应用,原来的三个模块被拆分成三个独立的应用,分别使用三个独立的数据源,业务操作需要用三个服务来完成,此时每个服务内部的数据一致性由本地事务来保证,但是全局的数据一致性没法保证总之一句话:一次业务操作需要跨多个数据源或需要跨多个系统进行远程调用,就会产生分布式事务问题。官方案例:https://seata.io/zh-cn/docs/user/quickstart.html

  • 仓储(库存)服务:对给定的商品扣除库存。
  • 订单服务:根据采购需求创建订单。
  • 账户服务:从用户账号中扣除余额。
    在这里插入图片描述

Seata 简介

官方文档:https://seata.io/zh-cn/docs/overview/what-is-seata.html

一、一个经典的分布式事务过程:XID(Transaction ID)+三个组件模型。
在这里插入图片描述
1、XID(TranSaction ID):全局唯一的事务 ID 。
2、三个组件模型概念:
(1)TC(Transaction Coordinator):事务协调器。维护全局事务的运行状态,负责协调并驱动全局事务的提交和回滚。
(2)TM(Transaction Manager):事务管理器。定义全局事务的范围:开始全局事务、提交或回滚全局事务。
(3)RM(Resource Manager):资源管理器。管理分支事务处理的资源,与 TC(事务协调器)交互,注册分支事务状态汇报。接受 TC 的指令,并驱动分支(本地)事务的提交或回滚。
3、分布式事务处理过程:
(1)TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成一个全局唯一的 XID;
(2)XID 在微服务调用链路的上下文中传播;
(3)RM 向 TC 注册分支事务,将其纳入 XID 对应的全局事务管辖;
(4)TM 向 TC 发起针对 XID 的全局提交或回滚决议;
(5)TC 调度 XID 管辖的全部分支事务完成提交或回滚的请求。

Seata 集群高可用部署

在这里插入图片描述

seata-server 的安装配置

配置文件以及seata_server数据库表结构:下载 seata 项目源文件(source),在解压目录下的script/ 目录 有配置文件脚本以及seata_server数据库表脚本等信息。将整个 script 文件夹拷贝放到 seata-server 的解压目录,便于后续操作。【重要】

1、下载 seata-server 地址:https://github.com/seata/seata/releases/tag/v1.4.2
2、解压到指定目录,并修改 conf 目录下的 file.conf 配置文件(先备份文件)。主要修改:自定义事务组名称+事务日志存储模式为db+数据库连接信息。
(1)修改 mode = "db",配置数据源之后支持 seata server 高可用。
(2)修改数据库连接信息
修改结果如下:

## transaction log store, only used in seata-server
store {
  ## store mode: file、db、redis
  mode = "db"
  ## rsa decryption public key
  publicKey = ""

  ## database store property
  db {
    ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc.
    datasource = "druid"
    ## mysql/oracle/postgresql/h2/oceanbase etc.
    dbType = "mysql"
    driverClassName = "com.mysql.jdbc.Driver"
    ## if using mysql to store the data, recommend add rewriteBatchedStatements=true in jdbc connection param
    url = "jdbc:mysql://127.0.0.1:3306/seata_server?rewriteBatchedStatements=true"
    user = "root"
    password = "123456"
    minConn = 5
    maxConn = 100
    globalTable = "global_table"
    branchTable = "branch_table"
    lockTable = "lock_table"
    queryLimit = 100
    maxWait = 5000
  }
}

3、创建数据库【MySQL5.7.+】:seata_server

-- -------------------------------- The script used when storeMode is 'db' --------------------------------
-- the table to store GlobalSession data
CREATE TABLE IF NOT EXISTS `global_table`
(
    `xid`                       VARCHAR(128) NOT NULL,
    `transaction_id`            BIGINT,
    `status`                    TINYINT      NOT NULL,
    `application_id`            VARCHAR(32),
    `transaction_service_group` VARCHAR(32),
    `transaction_name`          VARCHAR(128),
    `timeout`                   INT,
    `begin_time`                BIGINT,
    `application_data`          VARCHAR(2000),
    `gmt_create`                DATETIME,
    `gmt_modified`              DATETIME,
    PRIMARY KEY (`xid`),
    KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
    KEY `idx_transaction_id` (`transaction_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

-- the table to store BranchSession data
CREATE TABLE IF NOT EXISTS `branch_table`
(
    `branch_id`         BIGINT       NOT NULL,
    `xid`               VARCHAR(128) NOT NULL,
    `transaction_id`    BIGINT,
    `resource_group_id` VARCHAR(32),
    `resource_id`       VARCHAR(256),
    `branch_type`       VARCHAR(8),
    `status`            TINYINT,
    `client_id`         VARCHAR(64),
    `application_data`  VARCHAR(2000),
    `gmt_create`        DATETIME(6),
    `gmt_modified`      DATETIME(6),
    PRIMARY KEY (`branch_id`),
    KEY `idx_xid` (`xid`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

-- the table to store lock data
CREATE TABLE IF NOT EXISTS `lock_table`
(
    `row_key`        VARCHAR(128) NOT NULL,
    `xid`            VARCHAR(96),
    `transaction_id` BIGINT,
    `branch_id`      BIGINT       NOT NULL,
    `resource_id`    VARCHAR(256),
    `table_name`     VARCHAR(32),
    `pk`             VARCHAR(36),
    `gmt_create`     DATETIME,
    `gmt_modified`   DATETIME,
    PRIMARY KEY (`row_key`),
    KEY `idx_branch_id` (`branch_id`)
) ENGINE = InnoDB
  DEFAULT CHARSET = utf8;

4、修改 conf 目录下的 registry.conf 配置文件(先备份文件)。修改注册中心配置中心,此处使用的 nacos

# 注册中心:将 Seata 注册到 nacos 注册中心,供客户端访问
registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "nacos"

  nacos {
    application = "seata-server"
    serverAddr = "127.0.0.1:8848"
    group = "SEATA_GROUP"
    namespace = "0b0b9b5d-4e28-44c1-a359-8975169fb30b"
    cluster = "default"
    username = "nacos"
    password = "nacos"
  }
}

# 配置中心:将 seata 服务的配置发布到 nacos 配置中心
config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "nacos"

  nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = "0b0b9b5d-4e28-44c1-a359-8975169fb30b"
    group = "SEATA_GROUP"
    username = "nacos"
    password = "nacos"
    dataId = "seataServer.properties"
  }
}

5、配置中心配置(复制后删除注释)
(1)修改数据源配置,与 file.conf 配置文件的数据源配置一致。
(2)修改事务分组

transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableClientBatchSendRequest=false
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
# (复制后删除注释)事务分组:my_test_tx_group 自定事务分组名称,客户端配置访问事务分组需要与此处名称一致,此处的 default 和客户端配置文件中的注册中心的 cluster 一致
service.vgroupMapping.my_test_tx_group=default
service.default.grouplist=127.0.0.1:8091
service.enableDegrade=false
service.disableGlobalTransaction=false
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=false
client.rm.tableMetaCheckerInterval=60000
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
# (复制后删除注释)修改存储模式
store.mode=db
store.publicKey=
store.file.dir=file_store/data
store.file.maxBranchSessionSize=16384
store.file.maxGlobalSessionSize=512
store.file.fileWriteBufferCacheSize=16384
store.file.flushDiskMode=async
store.file.sessionReloadReadSize=100
# (复制后删除注释)修改数据源配置
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://127.0.0.1:3306/seata_server?useUnicode=true&rewriteBatchedStatements=true
store.db.user=root
store.db.password=123456
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000
store.redis.mode=single
store.redis.single.host=127.0.0.1
store.redis.single.port=6379
store.redis.sentinel.masterName=
store.redis.sentinel.sentinelHosts=
store.redis.maxConn=10
store.redis.minConn=1
store.redis.maxTotal=100
store.redis.database=0
store.redis.password=
store.redis.queryLimit=100
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.undo.compress.enable=true
client.undo.compress.type=zip
client.undo.compress.threshold=64k
log.exceptionRate=100
transport.serialization=seata
transport.compressor=none
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898

(3)将配置信息发布到 nacos 配置中心:找到 seata-server 解压目录下的script/config-center/nacos/nacos-config.sh 文件(linux环境下直接启动,windows环境下可以使用git启动),启动就能将"script/config-center/config.txt"配置信息发布到 nacos 配置中心。配置发布命令如下:

sh ${SEATAPATH}/script/config-center/nacos/nacos-config.sh -h localhost -p 8848 -g SEATA_GROUP -t 5a3c7d6c-f497-4d68-a71a-2e5e3340b3ca -u username -w password

如下指令参数说明可打开 nacos-config.sh 查看,方可知道为何是这样的指令:

参数名词说明
-hhost主机地址,默认值 localhost
-pport端口,默认值 8848
-ggroup分组
-tnamespace id命名空间 ID
-uusername登录用户名
-wpassword密码

seata 启动

一、Windows 环境下(这里不掩饰集群启动)
直接双击解压目录下的 bin 目录下的 seata-server.bat即可完成 seata 服务的启动。

二、Linux 环境下

sh ${SEATAPATH}/bin/seata-server.sh -h localhost -p 8091 -m db -n 1 -e dev

参数说明如下:

参数全写作用备注
-h–host指定注册中心注册的IP不指定获取当前的IP,外部王文部署在云环境和容器中的 server 建议指定
-p–port指定 server 启动的端口默认为8091
-m–storeMode事务日志存储方式支持file、db、redis,默认为 file ,注意 redis 需要 seata-server1.3 以上支持
-n–serverNode用于指定 seata-server 节点ID如 1、2、3…等,默认为1,这里可以配合修改端口和节点ID 来表示启动多个seata-server服务端
-e–seataEnv指定 seata-server 运行环境如 dev、test 等,服务启动时会用使用 registry-dev.conf、registry-test.conf 配置文件,默认不加 -e 参数表示启动时加载 registry.conf

分布式业务实现

订单 / 库存 / 账户 业务 数据库及表 准备

【前提:保证 nacos 和 seata 能够正常启动成功】
分布式事务业务说明:当用户下单时,会在订单服务中创建一个订单,然后通过远程调用库存服务来扣减商品库存,在通过远程调用账户服务来扣减用户账户余额,最后在订单服务中修改订单状态为已完成。下订单 --> 减库存 --> 扣账户(余额)

一、创建业务数据库:seata_orderseata_storageseata_account
二、创建响应的数据库表
1、seata_order 数据库下创建 t_order 表

CREATE TABLE if not exists t_order(
    id BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
    user_id BIGINT(11) DEFAULT NULL COMMENT '用户id',
    product_id BIGINT(11) DEFAULT NULL COMMENT '产品id',
    count INT(11) DEFAULT NULL COMMENT '数量',
    money DECIMAL(11,0) DEFAULT NULL COMMENT '金额',
    state INT(1) DEFAULT NULL COMMENT '订单状态:0:创建中; 1:已完结'
) ENGINE=INNODB AUTO_INCREMENT=7 DEFAULT CHARSET=utf8;

SELECT * FROM t_order;

2、seata_storage 数据库下建 t_storage 表

CREATE TABLE if not exists t_storage(
    id BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY,
    product_id BIGINT(11) DEFAULT NULL COMMENT '产品id',
    total INT(11) DEFAULT NULL COMMENT '总库存',
    used INT(11) DEFAULT NULL COMMENT '已用库存',
    residue INT(11) DEFAULT NULL COMMENT '剩余库存'
) ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;

INSERT INTO seata_storage.t_storage(id,product_id,total,used,residue) VALUES('1','1','100','0','100');
SELECT * FROM t_storage;

3、seata_account库下建 t_account 表

CREATE TABLE if not exists t_account(
    id BIGINT(11) NOT NULL AUTO_INCREMENT PRIMARY KEY COMMENT 'id',
    user_id BIGINT(11) DEFAULT NULL COMMENT '用户id',
    total DECIMAL(10,0) DEFAULT NULL COMMENT '总额度',
    used DECIMAL(10,0) DEFAULT NULL COMMENT '已用余额',
    residue DECIMAL(10,0) DEFAULT '0' COMMENT '剩余可用额度'
) ENGINE=INNODB AUTO_INCREMENT=2 DEFAULT CHARSET=utf8;
 
INSERT INTO t_account(id,user_id,total,used,residue) VALUES('1','1','1000','0','1000');
SELECT * FROM t_account;

三、按照上述 3 库分别建对应的回滚日志表
注意:订单-库存-账户 3个数据库下都需要建各自的回滚日志表,混滚日志表在 seata-server 解压目录下的 script\client\at\db\mysql.sql

-- for AT mode you must to init this sql for you business database. the seata server not need it.
CREATE TABLE IF NOT EXISTS `undo_log`
(
    `id`		    BIGINT	     NOT NULL AUTO_INCREMENT,
    `branch_id`     BIGINT       NOT NULL COMMENT 'branch transaction id',
    `xid`           VARCHAR(128) NOT NULL COMMENT 'global transaction id',
    `context`       VARCHAR(128) NOT NULL COMMENT 'undo_log context,such as serialization',
    `rollback_info` LONGBLOB     NOT NULL COMMENT 'rollback info',
    `log_status`    INT(11)      NOT NULL COMMENT '0:normal status,1:defense status',
    `log_created`   DATETIME(6)  NOT NULL COMMENT 'create datetime',
    `log_modified`  DATETIME(6)  NOT NULL COMMENT 'modify datetime',
	PRIMARY KEY (`id`),
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE = InnoDB AUTO_INCREMENT = 1 DEFAULT CHARSET = utf8 COMMENT ='AT transaction mode undo table';

四、最终数据库及表如下
在这里插入图片描述

订单 / 库存 / 账户 业务 微服务 准备

业务需求:下订单 --> 减库存 --> 扣余额 -->修改(订单)状态

订单模块(order-moudule)

订单模块 seata-order-service2001 代码:https://github.com/1914526816lhw/cloud-microservice/tree/master/seata-order-service2001

库存模块(storage-moudule)

库存模块 seata-storage-service2002 代码:https://github.com/1914526816lhw/cloud-microservice/tree/master/seata-storage-service2002

账户模块(account-moudule)

账户模块 seata-account-service2003 代码:https://github.com/1914526816lhw/cloud-microservice/tree/master/seata-account-service2003

Seata分布式事务原理简介

在这里插入图片描述

TC/TM/RM三大组件,分布式事务的执行流程

1、TM 开启分布式事务。(TM 向 TC 注册全局事务记录)
2、换业务场景,编排数据库、服务等事务内部资源。(RM 向 TC 汇报资源准备状态)
3、TM 结束分布式事务,开启一阶段结束。(TM 通知 TC 提交/回滚分布式事务)
4、TC 汇总事务信息,决定分布式事务是提交还是回滚。
5、TC 通知所有的 RM 提交/回滚资源,事务二阶段结束。

AT 模式如何做到对业务的无侵入

AT 模式:https://seata.io/zh-cn/docs/overview/what-is-seata.html

一阶段加载

在一阶段加载中,seata 会拦截 业务SQL
1、解析 SQL 语义,找到 业务SQL 要更新的业务数据,在业务数据被更新前,将其保存为 before image
2、执行 业务SQL 更新业务数据;
3、在业务数据更新之后,其保存成 after image,最后生成行锁。
以上操作全部在一个数据库事务内完成,这样保证了一阶段操作的原子性。
在这里插入图片描述

二阶段提交

二阶段如果是提交的话:在一阶段 业务SQL 已经提交至数据库,所以 seata 框架只需将一阶段保存的快照数据和行锁删掉,完成数据清理即可
在这里插入图片描述

二阶段回滚

二阶段如果是混滚的话,seata 就需要回滚一阶段已执行的 业务SQL,还原业务数据。回滚方式便是用 before image 还原业务数据;但在还原钱要校验脏写,对比 数据库当前数据after image,如果两份数据完全一致就说明没有脏写,可以还原业务数据;如果不一致就说明有脏写,出现脏写就需要转人工处理。
在这里插入图片描述

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐