SpringCloud+Eureka+Seata:整合分布式事务
版本说明:seata-server:1.1.0SpringCloud:Greenwich.SR5spring-cloud-starter-alibaba-seata:2.2.0.RELEASEseata-spring-boot-starter:1.2.0理论知识:何为2PC:flink专栏有篇文章介绍过,因为flink端到端的Exactly Once就是依赖2PC实现的seata的设计思想Seat
版本说明:
- seata-server:1.1.0
- SpringCloud:Greenwich.SR5
- spring-cloud-starter-alibaba-seata:2.2.0.RELEASE
- seata-spring-boot-starter:1.2.0
理论知识:
-
何为2PC:flink专栏有篇文章介绍过,因为flink端到端的Exactly Once就是依赖2PC实现的
-
seata的设计思想
Seata的设计目标其一是对业务无侵入,因此从业务无侵入的2PC方案着手,在传统2PC的基础上演进,并解决2PC方案面临的问题。
Seata把一个分布式事务理解成一个包含了若干分支事务的全局事务。全局事务的职责是协调其下管辖的分支事务达成一致,要么一起成功提交,要么一起失败回滚。 -
seata的3个组件
Transaction Coordinator (TC): 事务协调器,它是独立的中间件,需要独立部署运行,它维护全局事务的运行状态,接收TM指令发起全局事务的提交与回滚,负责与RM通信协调各各分支事务的提交或回滚。
Transaction Manager TM: 事务管理器,TM需要嵌入应用程序中工作,它负责开启一个全局事务,并最终向TC发起全局提交或全局回滚的指令。
Resource Manager (RM): 控制分支事务,负责分支注册、状态汇报,并接收事务协调器TC的指令,驱动分支(本地)事务的提交和回滚。
其中TC是单独起的一个服务,TM和RM是以jar包的形式嵌入 -
seata执行流程
1、每个RM使用DataSourceProxy连接数据库,其目的是使用ConnectionProxy,使用数据源和数据连接代理的目的就是在第一阶段将undo_log和业务数据放在一个本地事务提交,这样就保存了只要有业务操作就一定有undo_log。
2、在第一阶段undo_log中存放了数据修改前和修改后的值,为事务回滚作好准备,所以第一阶段完成就已经将分支事务提交,也就释放了锁资源。
3、TM开启全局事务开始,将XID全局事务id放在事务上下文中,通过feign调用也将XID传入下游分支事务,每个分支事务将自己的Branch ID分支事务ID与XID关联。
4、第二阶段全局事务提交,TC会通知各各分支参与者提交分支事务,在第一阶段就已经提交了分支事务,这里各个参与者只需要删除undo_log即可,并且可以异步执行,第二阶段很快可以完成。
5、第二阶段全局事务回滚,TC会通知各各分支参与者回滚分支事务,通过 XID 和 Branch ID 找到相应的回滚日志,通过回滚日志生成反向的 SQL 并执行,以完成分支事务回滚到之前的状态,如果回滚失败则会重试回滚操作。
代码实战
seata-server的安装部署:
seata-server的注册中心用eureka,配置中心用file类型
-
先启动一个eureka的注册中心,代码比较简单,贴个application.yml,我的注册中心是打包到linux上运行起来的
spring: application: name: server_center server: port: 18000 eureka: server: enable-self-preservation: false eviction-interval-timer-in-ms: 3000 instance: hostname: 192.168.217.101 client: register-with-eureka: false fetch-registry: false service-url: #服务端地址 defaultZone: http://${eureka.instance.hostname}:${server.port}/eureka/
-
seata-server的下载部署,我下载的是seata-server-1.1.0.tar.gz,解压安装
配置registry.conf:registry { # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa type = "eureka" eureka { serviceUrl = "http://192.168.217.101:18000/eureka" application = "seata_server" weight = "1" } file { name = "file.conf" } }
配置file.conf,这里选择的存储方式是db,配置下mysql数据库的信息即可
## transaction log store, only used in seata-server store { ## store mode: file、db mode = "db" ## database store property db { ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp) etc. datasource = "dbcp" ## mysql/oracle/h2/oceanbase etc. dbType = "mysql" driverClassName = "com.mysql.jdbc.Driver" url = "jdbc:mysql://localhost:3306/seata" user = "root" password = "root" minConn = 1 maxConn = 10 globalTable = "global_table" branchTable = "branch_table" lockTable = "lock_table" queryLimit = 100 } }
-
db库的配置,db库就是上面file.conf中url地址的库,脚本如下:
-- -------------------------------- The script used when storeMode is 'db' -------------------------------- -- the table to store GlobalSession data CREATE TABLE IF NOT EXISTS `global_table` ( `xid` VARCHAR(128) NOT NULL, `transaction_id` BIGINT, `status` TINYINT NOT NULL, `application_id` VARCHAR(32), `transaction_service_group` VARCHAR(32), `transaction_name` VARCHAR(128), `timeout` INT, `begin_time` BIGINT, `application_data` VARCHAR(2000), `gmt_create` DATETIME, `gmt_modified` DATETIME, PRIMARY KEY (`xid`), KEY `idx_gmt_modified_status` (`gmt_modified`, `status`), KEY `idx_transaction_id` (`transaction_id`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8; -- the table to store BranchSession data CREATE TABLE IF NOT EXISTS `branch_table` ( `branch_id` BIGINT NOT NULL, `xid` VARCHAR(128) NOT NULL, `transaction_id` BIGINT, `resource_group_id` VARCHAR(32), `resource_id` VARCHAR(256), `branch_type` VARCHAR(8), `status` TINYINT, `client_id` VARCHAR(64), `application_data` VARCHAR(2000), `gmt_create` DATETIME(6), `gmt_modified` DATETIME(6), PRIMARY KEY (`branch_id`), KEY `idx_xid` (`xid`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8; -- the table to store lock data CREATE TABLE IF NOT EXISTS `lock_table` ( `row_key` VARCHAR(128) NOT NULL, `xid` VARCHAR(96), `transaction_id` BIGINT, `branch_id` BIGINT NOT NULL, `resource_id` VARCHAR(256), `table_name` VARCHAR(32), `pk` VARCHAR(36), `gmt_create` DATETIME, `gmt_modified` DATETIME, PRIMARY KEY (`row_key`), KEY `idx_branch_id` (`branch_id`) ) ENGINE = InnoDB DEFAULT CHARSET = utf8;
-
启动eureka,再启动seata-server,启动后如下:
编写代码
这里以银行转账的例子来编写:
新建两个数据库bank1和bank2,分别创建account_info表和undo_log表,脚本如下:
bank1的account_info:
DROP TABLE IF EXISTS `account_info`;
CREATE TABLE `account_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '户
主姓名',
`account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '银行
卡号',
`account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT
'帐户密码',
`account_balance` double NULL DEFAULT NULL COMMENT '帐户余额',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT =
Dynamic;
INSERT INTO `account_info` VALUES (2, '张三的账户', '1', '', 10000);
bank2的account_info:
CREATE TABLE `account_info` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`account_name` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '户
主姓名',
`account_no` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT '银行
卡号',
`account_password` varchar(100) CHARACTER SET utf8 COLLATE utf8_bin NULL DEFAULT NULL COMMENT
'帐户密码',
`account_balance` double NULL DEFAULT NULL COMMENT '帐户余额',
PRIMARY KEY (`id`) USING BTREE
) ENGINE = InnoDB AUTO_INCREMENT = 5 CHARACTER SET = utf8 COLLATE = utf8_bin ROW_FORMAT =
Dynamic;
INSERT INTO `account_info` VALUES (3, '李四的账户', '2', NULL, 0);
undo_log脚本,两个库都要创建这个表
CREATE TABLE `undo_log` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`branch_id` bigint(20) NOT NULL,
`xid` varchar(100) NOT NULL,
`context` varchar(128) NOT NULL,
`rollback_info` longblob NOT NULL,
`log_status` int(11) NOT NULL,
`log_created` datetime NOT NULL,
`log_modified` datetime NOT NULL,
`ext` varchar(100) DEFAULT NULL,
PRIMARY KEY (`id`),
UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=1 DEFAULT CHARSET=utf8;
写两个微服务
seata从1.1.0开始,支持在application中配置
两个微服务的application.yml文件,贴其中一个,只需要更改下服务名、端口号、数据库信息
server:
port: 8100
servlet:
context-path: /bank1Server
spring:
application:
name: bank1Server
datasource:
type: com.alibaba.druid.pool.DruidDataSource
driver-class-name: com.mysql.jdbc.Driver
url: jdbc:mysql://192.168.217.101/bank1
username: root
password: root
druid:
initial-size: 5
min-idle: 5
max-active: 20
test-while-idle: true
test-on-borrow: false
test-on-return: false
pool-prepared-statements: true
max-pool-prepared-statement-per-connection-size: 20
max-wait: 60000
time-between-eviction-runs-millis: 60000
min-evictable-idle-time-millis: 30000
filters: stat
async-init: true
eureka:
client:
service-url:
defaultZone: http://192.168.217.101:18000/eureka/
instance:
lease-renewal-interval-in-seconds: 1
lease-expiration-duration-in-seconds: 3
prefer-ip-address: true
#用ip:port注册
instance-id: ${spring.cloud.client.ip-address}:${server.port}
#----------------------------seata----------------------------
seata:
enabled: true
application-id: seata_server
tx-service-group: default
enable-auto-data-source-proxy: true
use-jdk-proxy: false
service:
vgroup-mapping:
default: seata_server
enable-degrade: false
disable-global-transaction: false
registry:
type: eureka
eureka:
weight: 1
service-url: http://192.168.217.101:18000/eureka/
#----------------------------seata----------------------------
然后就是dao层、service层、controller层的编写,都是基础的,还有服务调用方的feign客户端的编写,这里贴下bank1的service代码:
@Service
@Slf4j
public class AccountInfoService {
@Autowired
AccountInfoMapper mapper;
@Autowired
Bank2ServerFeign feign;
//张三转账
@GlobalTransactional
@Transactional
public void updateAccountBalance(String accountNo, Double amount) {
log.info("******** Bank1 Service Begin ... xid: {}", RootContext.getXID());
//张三扣减金额
mapper.updateAccountBalance(accountNo, amount * -1);
//向李四转账
String remoteRst = feign.transfer(amount);
//远程调用失败
if (remoteRst.equals("fallback")) {
throw new RuntimeException("bank1 下游服务异常");
}
//人为制造错误
if (amount == 3) {
throw new RuntimeException("bank1 make exception 3");
}
}
}
bank1的feign和hystrix代码:
@FeignClient(value = "bank2Server/bank2Server", fallbackFactory = Bank2ServerFallbackFactory.class)
public interface Bank2ServerFeign {
@GetMapping("/transfer")
String transfer(@RequestParam("amount") Double amount);
}
@Component
public class Bank2ServerFallbackFactory implements FallbackFactory<Bank2ServerFeign> {
@Override
public Bank2ServerFeign create(Throwable throwable) {
return new Bank2ServerFeign() {
@Override
public String transfer(Double amount) {
return "fallback";
}
};
}
}
controller层代码:
@RestController
public class Bank1Controller {
@Autowired
AccountInfoService service;
//转账
@RequestMapping("/transfer")
public String transfer(Double amount) {
service.updateAccountBalance("1", amount);
return "bank1===" + amount;
}
}
另一个服务bank2Server的代码,这里贴下service层和controller层
@Service
@Slf4j
public class AccountInfoService {
@Autowired
AccountInfoMapper mapper;
@Transactional
public void updateAccountBalance(String accountNo, Double amount) {
log.info("******** Bank2 Service Begin ... xid: {}", RootContext.getXID());
//李四增加金额
mapper.updateAccountBalance(accountNo, amount);
//制造异常
if (amount == 2) {
throw new RuntimeException("bank1 make exception 2");
}
}
}
@RestController
public class Bank2Controller {
@Autowired
AccountInfoService service;
@RequestMapping("/transfer")
public String transfer(Double amount) {
service.updateAccountBalance("2", amount);
return "bank2===" + amount;
}
}
测试
分别启动两个服务,结果如下:
seata-server和两个服务都已经启动成功了!
然后浏览器请求接口
http://localhost:8100/bank1Server/transfer?amount=1
结果是bank1库中张三的余额减少了1,变为999;而bank2中李四的余额增加了1,变为1。
再测试下失败的场景:因为在代码中设置了,如果转账金额为3,则让bank1服务跑异常,即张三扣减金额失败,而bank2服务李四增加金额是成功的。请求接口:
http://localhost:8100/bank1Server/transfer?amount=3
bank1的service层中if (amount == 3) {
处打断点,此时bank2的本地事务已经提交,可以进入bank2库查看表account_info,李四的金额变为了4(之前的1+3),说明seata实现分布式事务在第一阶段各个分支事务是在本地提交事务的,这样是能释放资源锁的,比传统2PC机制效率高;然后再查看undo_log表,这个表中是有一条数据的,里面记录了提交事务之前的数据(before_image)和之后的数据(after_image)。然后把断点走完,此时bank1是抛出异常的,也就是说此时bank2的事务是需要回滚的。再次查看bank2中的account_info表,李四的金额是1,说明回滚了,而undo_log表中的数据也被删除掉了。再次说明分布式事务成功,也验证了seata实现分布式事务的机制。
更多推荐
所有评论(0)