Jta+Atomikos解决分布式事务
1.基本概念1.1 Jta+Atomikos 简介传统项目中,比如项目中使用到多数据源的时候大多数采用jta+Atomikos解决分布式事务问题,jta+Atomikos底层是基于XA协议的两阶段提交方案。1.2 XA协议XA 事务的基础是两阶段提交协议。分为以下两阶段需要有一个事务协调者来保证所有的事务参与者都完成了准备工作。如果协调者收到所有参与者都准备好的消息,就会通知所有的事务都可以提交了
1.基本概念
1.1 Jta+Atomikos 简介
传统项目中,比如项目中使用到多数据源的时候大多数采用jta+Atomikos解决分布式事务问题,jta+Atomikos底层是基于XA协议的两阶段提交方案。
1.2 XA协议
XA 事务的基础是两阶段提交协议。分为以下两阶段
需要有一个事务协调者来保证所有的事务参与者都完成了准备工作。
如果协调者收到所有参与者都准备好的消息,就会通知所有的事务都可以提交了。
Mysql 在这个XA事务中扮演的是参与者的角色,而不是协调者(事务管理器)。
1.3 JTA
JTA(java Transaction API)是JavaEE 13 个开发规范之一。java 事务API,允许应用程序执行分布式事务处理——在两个或多个网络计算机资源上访问并且更新数据。JDBC驱动程序的JTA支持极大地增强了数据访问能力。事务最简单最直接的目的就是保证数据的有效性,数据的一致性
1.4 Atomikos
Atomikos TransactionsEssentials 是一个为Java平台提供增值服务的并且开源类事务管理器
2.阶段提交协议
2.1 两阶段提交协议
第一阶段(准备阶段):
协调者向参与者发起指令,参与者评估自己的状态,如果参与者评估指令可以完成,则会写redo或者undo日志,让后锁定资源,执行操作,但并不提交。
第二阶段:
如果每个参与者明确返回准备成功,则协调者向参与者发送提交指令,参与者释放锁定的资源,如何任何一个参与者明确返回准备失败,则协调者会发送中指指令,参与者取消已经变更的事务,释放锁定的资源。
两阶段提交方案应用非常广泛,几乎所有商业OLTP数据库都支持XA协议。但是两阶段提交方案锁定资源时间长,对性能影响很大,基本不适合解决微服务事务问题。
缺点:如果协调者宕机,参与者没有协调者指挥,则会一直阻塞。
2.2 三阶段提交协议
三阶段提交协议是两阶段提交协议的改进版本。它通过超时机制解决了阻塞的问题,并且把两个阶段增加为三个阶段:
询问阶段:协调者询问参与者是否可以完成指令,协调者只需要回答是还是不是,而不需要做真正的操作,这个阶段超时导致中止。
准备阶段:如果在询问阶段所有的参与者都返回可以执行操作,协调者向参与者发送预执行请求,然后参与者写redo和undo日志,执行操作,但是不提交操作;如果在询问阶段任何参与者返回不能执行操作的结果,则协调者向参与者发送中止请求,这里的逻辑与两阶段提交协议的的准备阶段是相似的,这个阶段超时导致成功
提交阶段:如果每个参与者在准备阶段返回准备成功,也就是预留资源和执行操作成功,协调者向参与者发起提交指令,参与者提交资源变更的事务,释放锁定的资源;如果任何一个参与者返回准备失败,也就是预留资源或者执行操作失败,协调者向参与者发起中止指令,参与者取消已经变更的事务,执行undo日志,释放锁定的资源,这里的逻辑与两阶段提交协议的提交阶段一致
2.3 2PC与3PC提交区别
三阶段提交协议与两阶段提交协议相比,优点:增加了一个询问阶段,询问阶段可以确保尽可能早的发现无法执行操作而需要中止的行为,但是它并不能发现所有的这种行为,只会减少这种情况的发生在准备阶段以后,协调者和参与者执行的任务中都增加了超时,一旦超时,协调者和参与者都继续提交事务,默认为成功,这也是根据概率统计上超时后默认成功的正确性最大
但是一旦发生超时,系统仍然会发生不一致,只不过这种情况很少见罢了,好处就是至少不会阻塞和永远锁定资源。
3.SpringBoot代码实现
3.1 添加依赖
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-jta-atomikos</artifactId>
</dependency>
3.2 配置两个数据源
mybatis:
type-aliases-package: com.atomikos.apps.model
spring:
datasource:
test1:
driver-class-name: com.mysql.cj.jdbc.Driver
jdbc-url: jdbc:mysql://192.168.1.54:3306/db_1?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=true
password: root#qq.com
username: root
test2:
driver-class-name: com.mysql.cj.jdbc.Driver
jdbc-url: jdbc:mysql://192.168.1.54:3306/db_2?serverTimezone=UTC&useUnicode=true&characterEncoding=utf-8&useSSL=true
password: root#qq.com
username: root
package com.atomikos.apps.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
@Data
@ConfigurationProperties(prefix = "spring.datasource.test1")
public class DBConfig1 {
private String jdbcUrl;
private String username;
private String password;
}
package com.atomikos.apps.config;
import lombok.Data;
import org.springframework.boot.context.properties.ConfigurationProperties;
@Data
@ConfigurationProperties(prefix = "spring.datasource.test2")
public class DBConfig2 {
private String jdbcUrl;
private String username;
private String password;
}
package com.atomikos.apps.datasource;
import com.atomikos.apps.config.DBConfig1;
import com.mysql.jdbc.jdbc2.optional.MysqlXADataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Primary;
import javax.sql.DataSource;
import java.sql.SQLException;
@Configuration
@MapperScan(basePackages = "com.atomikos.apps.mapper.test1", sqlSessionTemplateRef = "test1SqlSessionTemplate")
public class DataSource1Config {
@Bean(name = "test1DataSource")
@Primary
public DataSource testDataSource(DBConfig1 dbConfig1) throws SQLException {
MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();
mysqlXADataSource.setURL(dbConfig1.getJdbcUrl());
mysqlXADataSource.setPassword(dbConfig1.getPassword());
mysqlXADataSource.setUser(dbConfig1.getUsername());
mysqlXADataSource.setPinGlobalTxToPhysicalConnection(true);
//创建atomikos全局事务
AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
xaDataSource.setXaDataSource(mysqlXADataSource);
xaDataSource.setUniqueResourceName("test1DataSource");
return xaDataSource;
}
@Bean(name = "test1SqlSessionFactory")
@Primary
public SqlSessionFactory testSqlSessionFactory(@Qualifier("test1DataSource") DataSource dataSource) throws Exception {
SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
bean.setDataSource(dataSource);
return bean.getObject();
}
@Bean(name = "test1SqlSessionTemplate")
@Primary
public SqlSessionTemplate testSqlSessionTemplate(@Qualifier("test1SqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
return new SqlSessionTemplate(sqlSessionFactory);
}
}
package com.atomikos.apps.datasource;
import com.atomikos.apps.config.DBConfig2;
import com.mysql.jdbc.jdbc2.optional.MysqlXADataSource;
import org.apache.ibatis.session.SqlSessionFactory;
import org.mybatis.spring.SqlSessionFactoryBean;
import org.mybatis.spring.SqlSessionTemplate;
import org.mybatis.spring.annotation.MapperScan;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.jta.atomikos.AtomikosDataSourceBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import javax.sql.DataSource;
import java.sql.SQLException;
/**
* 描述:DataSource2Config
* @author: myx
* @date: 2019/5/31
* Copyright © 2019-grape. All rights reserved.
*/
@Configuration
@MapperScan(basePackages = "com.atomikos.apps.mapper.test2", sqlSessionTemplateRef = "test2SqlSessionTemplate")
public class DataSource2Config {
@Bean(name = "test2DataSource")
public DataSource testDataSource(DBConfig2 dbConfig2) throws SQLException {
MysqlXADataSource mysqlXADataSource = new MysqlXADataSource();
mysqlXADataSource.setURL(dbConfig2.getJdbcUrl());
mysqlXADataSource.setPassword(dbConfig2.getPassword());
mysqlXADataSource.setUser(dbConfig2.getUsername());
mysqlXADataSource.setPinGlobalTxToPhysicalConnection(true);
//创建atomikos全局事务
AtomikosDataSourceBean xaDataSource = new AtomikosDataSourceBean();
xaDataSource.setXaDataSource(mysqlXADataSource);
xaDataSource.setUniqueResourceName("test2DataSource");
return xaDataSource;
}
@Bean(name = "test2SqlSessionFactory")
public SqlSessionFactory testSqlSessionFactory(@Qualifier("test2DataSource") DataSource dataSource) throws Exception {
SqlSessionFactoryBean bean = new SqlSessionFactoryBean();
bean.setDataSource(dataSource);
return bean.getObject();
}
@Bean(name = "test2SqlSessionTemplate")
public SqlSessionTemplate testSqlSessionTemplate(@Qualifier("test2SqlSessionFactory") SqlSessionFactory sqlSessionFactory) throws Exception {
return new SqlSessionTemplate(sqlSessionFactory);
}
}
3.3 测试
@Test
@Transactional
public void testAtomikos() {
user1Mapper.insert(new User("测试1", "11111111", "男"));
int i=1/0;
user2Mapper.insert(new User("测试2", "11111111", "女"));
}
更多推荐
所有评论(0)