架构(三)分布式事务seata使用
微服务的环境下,数据库隔离,数据不一致的情况越来越多,要么建立补偿机制要么手动刷数据.这时候就需要分布式事务,有问题上下游一起回滚,但是数据的可见性、链路时长还有其他很多原因说明链路不适宜拉的过长。
一、引言
微服务的环境下,数据库隔离,数据不一致的情况越来越多,要么建立补偿机制要么手动刷数据,这时候就需要分布式事务,有问题上下游一起回滚,但是数据的可见性、链路时长还有其他很多原因说明链路不适宜拉的过长。
结合实际使用场景,博主的服务间调用主要是spring cloud feign的http调用,因此这里也主要描述该环境下的原理
1、上下游进入全局事务
2、回滚
至于每个服务到底是怎么回滚的,那就要看一下源代码了
二、使用
1、服务端
作者使用的是db模式
在server使用的数据库建立了global_table、branch_table、lock_table,这些是服务端用于记录全局事务并进行锁表控制的记录表。
-- Seata server-side store (db mode): records global transactions, keyed by xid.
-- IF NOT EXISTS keeps the script re-runnable against a database where the
-- table has already been created.
CREATE TABLE IF NOT EXISTS `global_table` (
    `xid` varchar(128) NOT NULL,
    `transaction_id` bigint(20) DEFAULT NULL,
    `status` tinyint(4) NOT NULL,
    `application_id` varchar(32) DEFAULT NULL,
    `transaction_service_group` varchar(32) DEFAULT NULL,
    `transaction_name` varchar(128) DEFAULT NULL,
    `timeout` int(11) DEFAULT NULL,
    `begin_time` bigint(20) DEFAULT NULL,
    `application_data` varchar(2000) DEFAULT NULL,
    `gmt_create` datetime DEFAULT NULL,
    `gmt_modified` datetime DEFAULT NULL,
    PRIMARY KEY (`xid`),
    KEY `idx_gmt_modified_status` (`gmt_modified`, `status`),
    KEY `idx_transaction_id` (`transaction_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- Seata server-side store (db mode): records branch transactions; each row
-- carries the xid of the global transaction it belongs to (indexed by idx_xid).
-- IF NOT EXISTS keeps the script re-runnable.
CREATE TABLE IF NOT EXISTS `branch_table` (
    `branch_id` bigint(20) NOT NULL,
    `xid` varchar(128) NOT NULL,
    `transaction_id` bigint(20) DEFAULT NULL,
    `resource_group_id` varchar(32) DEFAULT NULL,
    `resource_id` varchar(256) DEFAULT NULL,
    `branch_type` varchar(8) DEFAULT NULL,
    `status` tinyint(4) DEFAULT NULL,
    `client_id` varchar(64) DEFAULT NULL,
    `application_data` varchar(2000) DEFAULT NULL,
    `gmt_create` datetime(6) DEFAULT NULL,
    `gmt_modified` datetime(6) DEFAULT NULL,
    PRIMARY KEY (`branch_id`),
    KEY `idx_xid` (`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
-- Seata server-side store (db mode): row-level global locks, keyed by row_key.
-- `xid` is sized varchar(128) to match global_table.xid and branch_table.xid;
-- a narrower column here could truncate or reject an xid the other tables accept.
-- IF NOT EXISTS keeps the script re-runnable.
CREATE TABLE IF NOT EXISTS `lock_table` (
    `row_key` varchar(128) NOT NULL,
    `xid` varchar(128) DEFAULT NULL,
    `transaction_id` bigint(20) DEFAULT NULL,
    `branch_id` bigint(20) NOT NULL,
    `resource_id` varchar(256) DEFAULT NULL,
    `table_name` varchar(32) DEFAULT NULL,
    `pk` varchar(36) DEFAULT NULL,
    `gmt_create` datetime DEFAULT NULL,
    `gmt_modified` datetime DEFAULT NULL,
    PRIMARY KEY (`row_key`),
    KEY `idx_branch_id` (`branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;
修改file.conf
## transaction log store, only used in seata-server
## Only the sub-section matching `mode` below is expected to take effect; the
## other sections are left at their sample values. (NOTE(review): confirm against
## the Seata version in use.)
store {
## store mode: file, db, redis
mode = "db"
## rsa decryption public key
publicKey = ""
## file store property
file {
## store location dir
dir = "sessionStore"
# branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
maxBranchSessionSize = 16384
# globe session size , if exceeded throws exceptions
maxGlobalSessionSize = 512
# file buffer size , if exceeded allocate new buffer
fileWriteBufferCacheSize = 16384
# when recover batch read size
sessionReloadReadSize = 100
# async, sync
flushDiskMode = async
}
## database store property
db {
## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc.
datasource = "druid"
## mysql/oracle/postgresql/h2/oceanbase etc.
dbType = "mysql"
driverClassName = "com.mysql.jdbc.Driver"
## if using mysql to store the data, recommend add rewriteBatchedStatements=true in jdbc connection param
// When using db mode, point this URL at the database that holds global_table, branch_table and lock_table
url = "jdbc:mysql://**?zeroDateTimeBehavior=convertToNull&autoReconnect=true&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true"
user = "**"
password = "**"
minConn = 5
maxConn = 100
## table names must match the tables created on the server database
globalTable = "global_table"
branchTable = "branch_table"
lockTable = "lock_table"
queryLimit = 100
maxWait = 5000
}
## redis store property
redis {
## redis mode: single, sentinel
mode = "single"
## single mode property
single {
host = "127.0.0.1"
port = "6379"
}
## sentinel mode property
sentinel {
masterName = ""
## such as "10.28.235.65:26379,10.28.235.65:26380,10.28.235.65:26381"
sentinelHosts = ""
}
password = ""
database = "0"
minConn = 1
maxConn = 10
maxTotal = 100
queryLimit = 100
}
}
服务端需要加入到注册中心,目的是使客户端找到seata.registry配置的application,客户端会与服务端建立连接
作者使用的是eureka,修改registry.conf
# Service registry used by the Seata server so that clients can discover it.
# Here `type` selects eureka; the remaining sections keep their sample values.
registry {
# file, nacos, eureka, redis, zk, consul, etcd3, sofa
type = "eureka"
nacos {
application = "seata-server"
serverAddr = "127.0.0.1:8848"
group = "SEATA_GROUP"
namespace = ""
cluster = "default"
username = ""
password = ""
}
eureka {
// Set this to the Eureka registration URL that the client services register with
serviceUrl = "***"
// Name under which the Seata server registers; clients must configure the same name
application = "seata-server"
weight = "1"
}
redis {
serverAddr = "localhost:6379"
db = 0
password = ""
cluster = "default"
timeout = 0
}
zk {
cluster = "default"
serverAddr = "127.0.0.1:2181"
sessionTimeout = 6000
connectTimeout = 2000
username = ""
password = ""
}
consul {
cluster = "default"
serverAddr = "127.0.0.1:8500"
aclToken = ""
}
etcd3 {
cluster = "default"
serverAddr = "http://localhost:2379"
}
sofa {
serverAddr = "127.0.0.1:9603"
application = "default"
region = "DEFAULT_ZONE"
datacenter = "DefaultDataCenter"
cluster = "default"
group = "SEATA_GROUP"
addressWaitTime = "3000"
}
file {
name = "file.conf"
}
}
# Configuration source for the Seata server. With type = "file" the settings are
# read from the file named in the `file` section at the bottom (file.conf).
config {
# file, nacos, apollo, zk, consul, etcd3
type = "file"
nacos {
serverAddr = "127.0.0.1:8848"
namespace = ""
group = "SEATA_GROUP"
username = ""
password = ""
dataId = "seataServer.properties"
}
consul {
serverAddr = "127.0.0.1:8500"
aclToken = ""
}
apollo {
appId = "seata-server"
## apolloConfigService will cover apolloMeta
apolloMeta = "http://192.168.1.204:8801"
apolloConfigService = "http://192.168.1.204:8080"
namespace = "application"
apolloAccesskeySecret = ""
cluster = "seata"
}
zk {
serverAddr = "127.0.0.1:2181"
sessionTimeout = 6000
connectTimeout = 2000
username = ""
password = ""
nodePath = "/seata/seata.properties"
}
etcd3 {
serverAddr = "http://localhost:2379"
}
file {
name = "file.conf"
}
}
2、客户端
客户端服务需要在各自的业务库中建立undo_log表,存储的是分支事务回滚所需要的信息(回滚日志)。
-- Client-side (business database) table holding the rollback log of each
-- branch transaction; (xid, branch_id) is unique.
-- The AUTO_INCREMENT=83 table option from the original dump was dropped so a
-- freshly created table starts counting from the default value.
-- IF NOT EXISTS keeps the script re-runnable.
CREATE TABLE IF NOT EXISTS `undo_log` (
    `id` bigint(20) NOT NULL AUTO_INCREMENT,
    `branch_id` bigint(20) NOT NULL,
    `xid` varchar(100) NOT NULL,
    `context` varchar(128) NOT NULL,
    `rollback_info` longblob NOT NULL,
    `log_status` int(11) NOT NULL,
    `log_created` datetime NOT NULL,
    `log_modified` datetime NOT NULL,
    `ext` varchar(100) DEFAULT NULL,
    PRIMARY KEY (`id`),
    UNIQUE KEY `ux_undo_log` (`xid`, `branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
客户端一开始选用的是最新的1.4.2,但是这个有坑在,具体看后面
<!-- Seata client: Spring Boot starter, pinned to version 1.4.2 -->
<dependency>
<groupId>io.seata</groupId>
<artifactId>seata-spring-boot-starter</artifactId>
<version>1.4.2</version>
</dependency>
客户端需要对事务分组、服务名等进行配置
# Transaction service group; must correspond to the group configured on seata-server
seata.tx-service-group = my_test_tx_group
# Maps the transaction group to the TC (seata-server) service name
seata.service.vgroupMapping.my_test_tx_group = seata-server
# Registry; must be the same registry the TC uses
seata.registry.type = eureka
seata.registry.eureka.service-url = ***
# This name must match the application name the server registers with.
# ('#' is used here because '//' does not start a comment in .properties files;
# such a line would be parsed as a property key.)
seata.registry.eureka.application = seata-server
seata.registry.eureka.weight = 1
AT模式下,在方法上加@GlobalTransactional就可以开启全局事务,的确很方便
/**
 * Demo entry point for a Seata global transaction: performs a local write,
 * calls a downstream service through Feign, then fails on purpose so that the
 * rollback of both sides can be observed.
 *
 * @param macUnserviceDayEntity entity persisted by the local write
 */
@Override
@GlobalTransactional
public void testSeata(MacUnserviceDayEntity macUnserviceDayEntity) {
    // Not used further; kept so the current global transaction id can be inspected in a debugger.
    final String globalXid = RootContext.getXID();

    // Local write.
    save(macUnserviceDayEntity);

    // Remote call to the downstream service.
    bitClient.departmentPage();

    // Deliberate ArithmeticException (division by zero) to force a failure.
    int forcedFailure = 1 / 0;
}
但是测试之后下游服务没有回滚,应该是seata的全局事务xid没有传到下游
作者尝试引入spring-cloud-starter-alibaba-seata,很遗憾客户端无法启动
跟seata的创作者聊了一下,他们认为是博主公司父pom的boot版本太低了,这里博主也要吐槽一下父pom的boot才1.5.22,很多人反映过这个boot版本低有影响,但是负责人就是不改,没办法只能找其他办法
3、原因分析
既然是xid没有传递到下级服务,看看能不能自己传吧
上游服务定义配置类把xid加载到feign的请求头
/**
 * Feign request interceptor that forwards the current Seata global transaction
 * id (xid) to the downstream service in the {@code Fescar-Xid} HTTP header.
 *
 * <p>The header is only added when an xid is actually bound to the current
 * thread; requests made outside a global transaction are left untouched
 * instead of carrying a null/blank header value.
 */
@Configuration
public class FeignConfigSeata implements RequestInterceptor {

    /** Header carrying the xid; the downstream filter must read the same name. */
    private static final String XID_HEADER = "Fescar-Xid";

    /**
     * Copies the xid from Seata's {@code RootContext} into the outgoing request.
     *
     * @param requestTemplate the Feign request being built
     */
    @Override
    public void apply(RequestTemplate requestTemplate) {
        // Read the xid bound to the current thread, if any.
        String xid = RootContext.getXID();
        if (StringUtils.isBlank(xid)) {
            // No global transaction on this thread: nothing to propagate.
            return;
        }
        System.out.println("feign 获得分布式事务xid:"+xid);
        // Put the xid into the Feign request header.
        requestTemplate.header(XID_HEADER, xid);
    }
}
下游服务从feign请求头取出xid进行seata的RootContext绑定
/**
 * Servlet filter for the downstream service: takes the xid sent in the
 * {@code Fescar-Xid} request header and binds it to Seata's {@code RootContext}
 * for the duration of the request, then unbinds it again.
 */
@Configuration
public class FescarXidFilter extends OncePerRequestFilter {

    protected Logger logger = LoggerFactory.getLogger(FescarXidFilter.class);

    /**
     * Binds the incoming xid (if the thread has none yet), runs the rest of the
     * filter chain, and finally undoes the binding made here.
     */
    @Override
    protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
        final String currentXid = RootContext.getXID();
        // xid sent by the caller in the request header
        final String headerXid = request.getHeader("Fescar-Xid");

        // Bind only when this thread has no xid yet and the caller supplied one.
        final boolean boundHere = StringUtils.isBlank(currentXid) && StringUtils.isNotBlank(headerXid);
        if (boundHere) {
            RootContext.bind(headerXid);
            if (logger.isDebugEnabled()) {
                logger.debug("bind[" + headerXid + "] to RootContext");
            }
        }

        try {
            filterChain.doFilter(request, response);
        } finally {
            if (boundHere) {
                releaseXid(headerXid);
            }
        }
    }

    /**
     * Unbinds the xid bound by this filter. If the xid found in the context is
     * not the one that was bound, a warning is logged and the found xid (when
     * non-null) is bound back.
     */
    private void releaseXid(String headerXid) {
        final String removedXid = RootContext.unbind();
        if (logger.isDebugEnabled()) {
            logger.debug("unbind[" + removedXid + "] from RootContext");
        }
        if (headerXid.equalsIgnoreCase(removedXid)) {
            return;
        }
        logger.warn("xid in change during http rest from " + headerXid + " to " + removedXid);
        if (removedXid != null) {
            RootContext.bind(removedXid);
            logger.warn("bind [" + removedXid + "] back to RootContext");
        }
    }
}
再试一下,然后还是没传过来,打断点看了一下,从上游的拦截开始就没有从RootContext中获取到,但是在测试方法是有的
想了一会,hystrix是有线程池的,使用线程池中的线程进行请求调用再把数据加载到原始线程,这样原始线程的ThreadLocal就没带过来啊
这里可以使用拦截器把原始线程与hystrix的头信息进行绑定,最终成功回滚,在undo_log中可以看到seata的回滚日志。
public class ClientConfig {
private static final String X_ID= "TX_XID";
@Bean
public Encoder multipartFormEncoder(ObjectFactory<HttpMessageConverters> messageConverters) {
return new SpringFormEncoder(new SpringEncoder(messageConverters));
}
@Bean
public RequestInterceptor headerInterceptor() {
return template -> {
RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
HttpServletRequest request = ((ServletRequestAttributes) requestAttributes).getRequest();
String xId = Objects.nonNull(request.getAttribute(X_ID) ? String.valueOf(request.getAttribute(X_ID)) : null;
template.header(X_ID, xId)
};
}
}
四、原理
AT(补偿回滚)
这次体验使用的是AT模式,其原理是在执行业务sql时暂存该sql执行前后的数据镜像(前镜像和后镜像),如果需要回滚,就根据镜像解析生成反向补偿的sql并执行。
1、xid传递与拦截
xid作为seata传递核心的全局事务唯一键,需要被传递给下游服务
下游服务主要是实现了spring的HandlerInterceptor,然后在preHandle方法中获取请求头的TX_XID,然后放到自己的ThreadLocal里面
/**
 * Spring MVC interceptor that propagates the Seata xid on the receiving side:
 * before the handler runs, the xid from the request header
 * ({@code RootContext.KEY_XID}) is bound to {@code RootContext}; after the
 * request completes, it is cleaned up again.
 *
 * <p>{@code HandlerInterceptorAdapter} is an abstract class, so it has to be
 * extended rather than implemented.
 */
public class TransactionPropagationInterceptor extends HandlerInterceptorAdapter {

    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationInterceptor.class);

    /**
     * Binds the xid carried by the request header, if any.
     *
     * @return always {@code true}, so request handling continues
     */
    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
        String rpcXid = request.getHeader(RootContext.KEY_XID);
        return this.bindXid(rpcXid);
    }

    /**
     * Cleans the xid of the request once it has completed, but only while the
     * current thread is still inside a global transaction.
     */
    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        if (RootContext.inGlobalTransaction()) {
            String rpcXid = request.getHeader(RootContext.KEY_XID);
            this.cleanXid(rpcXid);
        }
    }

    /**
     * Binds {@code rpcXid} to {@code RootContext} when the context holds no xid
     * yet and the request supplied a non-blank one.
     *
     * @param rpcXid xid taken from the request header; may be null or blank
     * @return always {@code true}
     */
    protected boolean bindXid(String rpcXid) {
        String xid = RootContext.getXID();
        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("xid in RootContext[{}] xid in HttpContext[{}]", xid, rpcXid);
        }
        if (StringUtils.isBlank(xid) && StringUtils.isNotBlank(rpcXid)) {
            RootContext.bind(rpcXid);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("bind[{}] to RootContext", rpcXid);
            }
        }
        return true;
    }

    /**
     * Delegates the cleanup of {@code rpcXid} to {@code XidResource.cleanXid}.
     *
     * @param rpcXid xid taken from the request header
     */
    protected void cleanXid(String rpcXid) {
        XidResource.cleanXid(rpcXid);
    }
}
2、回滚
在DataSourceManager的branchRollback方法中进行回滚
/**
 * Rolls back one branch transaction on the data source registered under
 * {@code resourceId} by replaying its undo log.
 *
 * @param branchType      branch type (only used in the failure log)
 * @param xid             global transaction id
 * @param branchId        branch transaction id
 * @param resourceId      key of the data source proxy to roll back on
 * @param applicationData application data (only used in the failure log)
 * @return {@code PhaseTwo_Rollbacked} on success; one of the
 *         {@code PhaseTwo_RollbackFailed_*} statuses when the undo fails with a
 *         {@code TransactionException}
 * @throws ShouldNeverHappenException if no data source is registered for {@code resourceId}
 */
public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                   String applicationData) throws TransactionException {
    final DataSourceProxy proxy = get(resourceId);
    if (proxy == null) {
        throw new ShouldNeverHappenException(String.format("resource: %s not found",resourceId));
    }

    try {
        // Pick the undo-log manager that matches the data source's db type and run the undo.
        UndoLogManagerFactory.getUndoLogManager(proxy.getDbType()).undo(proxy, xid, branchId);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("branch rollback success, xid:{}, branchId:{}", xid, branchId);
        }
        return BranchStatus.PhaseTwo_Rollbacked;
    } catch (TransactionException te) {
        StackTraceLogger.error(LOGGER, te,
            "branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]",
            new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()});
        // The exception code decides whether the rollback failure is reported as retryable.
        return te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable
            ? BranchStatus.PhaseTwo_RollbackFailed_Unretryable
            : BranchStatus.PhaseTwo_RollbackFailed_Retryable;
    }
}
取到处理类执行undo方法,通过"SELECT * FROM " + UNDO_LOG_TABLE_NAME + " WHERE "+ ClientTableColumnsName.UNDO_LOG_BRANCH_XID + " = ? AND " + ClientTableColumnsName.UNDO_LOG_XID+ " = ? FOR UPDATE";获取客户端存储在undolog表里面的数据
/**
 * Replays the undo log of one branch transaction inside a local transaction.
 *
 * <p>Flow: select the undo_log rows for (branchId, xid), decode each row's
 * rollback info, execute the resulting undo items in reverse order, then delete
 * the undo log and commit. When no undo log exists, a "global finished" marker
 * row is inserted instead. If that insert hits an integrity-constraint
 * violation, the whole procedure is retried.
 *
 * @param dataSourceProxy data source the branch ran on
 * @param xid             global transaction id
 * @param branchId        branch transaction id
 * @throws TransactionException wrapping any failure; unretriable when caused by
 *                              a dirty undo log, retriable otherwise
 */
public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
    ConnectionProxy connectionProxy = null;
    Connection conn = null;
    ResultSet rs = null;
    PreparedStatement selectPST = null;
    boolean originalAutoCommit = true;
    for (; ; ) {
        try {
            connectionProxy = dataSourceProxy.getConnection();
            conn = connectionProxy.getTargetConnection();
            // The entire undo process should run in a local transaction.
            if (originalAutoCommit = conn.getAutoCommit()) {
                conn.setAutoCommit(false);
            }
            // Find UNDO LOG
            selectPST = conn.prepareStatement(buildSelectUndoSql());
            selectPST.setLong(1, branchId);
            selectPST.setString(2, xid);
            rs = selectPST.executeQuery();
            boolean exists = false;
            while (rs.next()) {
                exists = true;
                // Read the status of this undo_log row; rows that must not be
                // undone are skipped by the canUndo check below.
                int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
                if (!canUndo(state)) {
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("xid {} branch {}, ignore {} undo_log", xid, branchId, state);
                    }
                    return;
                }
                String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
                Map<String, String> context = parseContext(contextString);
                byte[] rollbackInfo = getRollbackInfo(rs);
                // The context tells which serializer was used to write the rollback info.
                String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
                UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
                    : UndoLogParserFactory.getInstance(serializer);
                BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);
                try {
                    // put serializer name to local
                    setCurrentSerializer(parser.getName());
                    // Decoded undo items; executed in reverse order of recording.
                    List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                    if (sqlUndoLogs.size() > 1) {
                        Collections.reverse(sqlUndoLogs);
                    }
                    for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
                        TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(
                            conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
                        sqlUndoLog.setTableMeta(tableMeta);
                        AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                            dataSourceProxy.getDbType(), sqlUndoLog);
                        undoExecutor.executeOn(connectionProxy);
                    }
                } finally {
                    // remove serializer name
                    removeCurrentSerializer();
                }
            }
            if (exists) {
                deleteUndoLog(xid, branchId, conn);
                conn.commit();
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("xid {} branch {}, undo_log deleted with {}", xid, branchId,
                        State.GlobalFinished.name());
                }
            } else {
                insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
                conn.commit();
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("xid {} branch {}, undo_log added with {}", xid, branchId,
                        State.GlobalFinished.name());
                }
            }
            return;
        } catch (SQLIntegrityConstraintViolationException e) {
            // Possible undo_log has been inserted into the database by other processes, retrying rollback undo_log
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("xid {} branch {}, undo_log inserted, retry rollback", xid, branchId);
            }
        } catch (Throwable e) {
            if (conn != null) {
                try {
                    conn.rollback();
                } catch (SQLException rollbackEx) {
                    LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx);
                }
            }
            if (e instanceof SQLUndoDirtyException) {
                throw new BranchTransactionException(BranchRollbackFailed_Unretriable, String.format(
                    "Branch session rollback failed because of dirty undo log, please delete the relevant undolog after manually calibrating the data. xid = %s branchId = %s",
                    xid, branchId), e);
            }
            throw new BranchTransactionException(BranchRollbackFailed_Retriable,
                String.format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid,
                    branchId, e.getMessage()),
                e);
        } finally {
            // Runs on every iteration (including before a retry): release JDBC
            // resources and restore the connection's auto-commit setting.
            try {
                if (rs != null) {
                    rs.close();
                }
                if (selectPST != null) {
                    selectPST.close();
                }
                if (conn != null) {
                    if (originalAutoCommit) {
                        conn.setAutoCommit(true);
                    }
                    connectionProxy.close();
                }
            } catch (SQLException closeEx) {
                LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx);
            }
        }
    }
}
在SQLUndoLog类中主要是这几个属性,通过语句类型和前镜像就可以补偿回滚到之前的数据
// Type of the recorded SQL statement.
private SQLType sqlType;
// Name of the table the statement operated on.
private String tableName;
// Row image captured before the statement ran — presumably the data restored on rollback; confirm against the undo executors.
private TableRecords beforeImage;
// Row image captured after the statement ran.
private TableRecords afterImage;
那么回滚日志是在什么时候被插入到表里的呢,这里只讲其中一条线:PreparedStatementProxy实现了java.sql包下的PreparedStatement接口
/**
 * Runs the update through {@code ExecuteTemplate}; the callback passed in
 * performs the actual {@code executeUpdate()} on the statement it is given.
 *
 * @return the update count produced by the callback
 * @throws SQLException if execution fails
 */
public int executeUpdate() throws SQLException {
    return ExecuteTemplate.execute(this, (targetStatement, ignoredArgs) -> {
        return targetStatement.executeUpdate();
    });
}
执行方法会拿到数据库对应的Executor
/**
 * Chooses an Executor for the statement and runs it.
 *
 * When the context neither requires the global lock nor is in AT branch mode,
 * the callback is invoked directly on the target statement. Otherwise the SQL
 * is recognized (unless recognizers were passed in) and an executor is picked:
 * a PlainExecutor when nothing was recognized, a type-specific executor for a
 * single recognized statement, or a MultiExecutor for several.
 *
 * @param sqlRecognizers    pre-computed recognizers; may be empty, in which case they are derived from the target SQL
 * @param statementProxy    the proxied statement
 * @param statementCallback callback that performs the real JDBC call
 * @param args              arguments passed through to the executor / callback
 * @return the result produced by the executor or callback
 * @throws SQLException any failure; non-SQLException throwables are wrapped
 */
public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
StatementProxy<S> statementProxy,
StatementCallback<T, S> statementCallback,
Object... args) throws SQLException {
if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
// Just work as original statement
return statementCallback.execute(statementProxy.getTargetStatement(), args);
}
String dbType = statementProxy.getConnectionProxy().getDbType();
// No recognizers supplied: derive them from the statement's SQL text.
if (CollectionUtils.isEmpty(sqlRecognizers)) {
sqlRecognizers = SQLVisitorFactory.get(
statementProxy.getTargetSQL(),
dbType);
}
Executor<T> executor;
// Still nothing recognized: fall back to the plain executor.
if (CollectionUtils.isEmpty(sqlRecognizers)) {
executor = new PlainExecutor<>(statementProxy, statementCallback);
} else {
if (sqlRecognizers.size() == 1) {
SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
// Single statement: dispatch on its SQL type (and on dbType where needed).
switch (sqlRecognizer.getSQLType()) {
case INSERT:
// Insert executors are loaded through EnhancedServiceLoader, selected by dbType.
executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
new Object[]{statementProxy, statementCallback, sqlRecognizer});
break;
case UPDATE:
if (JdbcConstants.SQLSERVER.equalsIgnoreCase(dbType)) {
executor = new SqlServerUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
} else {
executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
}
break;
case DELETE:
if (JdbcConstants.SQLSERVER.equalsIgnoreCase(dbType)) {
executor = new SqlServerDeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
} else {
executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
}
break;
case SELECT_FOR_UPDATE:
if (JdbcConstants.SQLSERVER.equalsIgnoreCase(dbType)) {
executor = new SqlServerSelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
} else {
executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
}
break;
case INSERT_ON_DUPLICATE_UPDATE:
// Only MySQL and MariaDB are handled; any other dbType is rejected.
switch (dbType) {
case JdbcConstants.MYSQL:
case JdbcConstants.MARIADB:
executor =
new MySQLInsertOnDuplicateUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
break;
default:
throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");
}
break;
case UPDATE_JOIN:
// Only MySQL is handled; any other dbType is rejected.
switch (dbType) {
case JdbcConstants.MYSQL:
executor = new MySQLUpdateJoinExecutor<>(statementProxy,statementCallback,sqlRecognizer);
break;
default:
throw new NotSupportYetException(dbType + " not support to " + SQLType.UPDATE_JOIN.name());
}
break;
default:
// Any other SQL type runs through the plain executor.
executor = new PlainExecutor<>(statementProxy, statementCallback);
break;
}
} else {
// Several recognized statements: handled together by the multi executor.
executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
}
}
T rs;
try {
rs = executor.execute(args);
} catch (Throwable ex) {
if (!(ex instanceof SQLException)) {
// Turn other exception into SQLException
ex = new SQLException(ex);
}
throw (SQLException) ex;
}
return rs;
}
几个方法调用之后就查出前镜像和当前镜像
/**
 * Executes the statement while capturing row images around it: takes the
 * before image, runs the real statement through the callback, takes the after
 * image, and hands both images to {@code prepareUndoLog}.
 *
 * @param args arguments forwarded to the statement callback
 * @return whatever the statement callback returned
 * @throws Exception any failure; a {@code TableMetaException} additionally
 *                   triggers a table-meta refresh event before being rethrown
 */
protected T executeAutoCommitFalse(Object[] args) throws Exception {
    try {
        final TableRecords imageBefore = beforeImage();
        final T outcome = statementCallback.execute(statementProxy.getTargetStatement(), args);
        final TableRecords imageAfter = afterImage(imageBefore);
        prepareUndoLog(imageBefore, imageAfter);
        return outcome;
    } catch (TableMetaException metaEx) {
        LOGGER.error("table meta will be refreshed later, due to TableMetaException, table:{}, column:{}",
            metaEx.getTableName(), metaEx.getColumnName());
        // Signal the data source proxy that its table meta needs refreshing.
        statementProxy.getConnectionProxy().getDataSourceProxy().tableMetaRefreshEvent();
        throw metaEx;
    }
}
将前后镜像包装成SQLUndoLog,追加到当前的连接代理(ConnectionProxy)里面
/**
 * Registers the lock keys and the undo item for the statement on the
 * connection proxy, based on the row images taken around its execution.
 *
 * <p>Does nothing when both images are empty, or when no lock key can be built.
 *
 * @param beforeImage rows as they were before the statement ran
 * @param afterImage  rows as they are after the statement ran
 * @throws SQLException declared by the signature
 * @throws ShouldNeverHappenException for an UPDATE whose before and after
 *                                    images differ in row count
 */
protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
    if (beforeImage.getRows().isEmpty() && afterImage.getRows().isEmpty()) {
        // Nothing was touched: no undo log needed.
        return;
    }

    if (SQLType.UPDATE == sqlRecognizer.getSQLType()
            && beforeImage.getRows().size() != afterImage.getRows().size()) {
        throw new ShouldNeverHappenException("Before image size is not equaled to after image size, probably because you updated the primary keys.");
    }

    final ConnectionProxy proxy = statementProxy.getConnectionProxy();

    // Lock keys come from the before image for DELETE, otherwise from the after image.
    final TableRecords lockSource;
    if (sqlRecognizer.getSQLType() == SQLType.DELETE) {
        lockSource = beforeImage;
    } else {
        lockSource = afterImage;
    }

    final String keys = buildLockKey(lockSource);
    if (keys == null) {
        return;
    }
    proxy.appendLockKey(keys);
    final SQLUndoLog undoItem = buildUndoItem(beforeImage, afterImage);
    proxy.appendUndoLog(undoItem);
}
五、总结
seata的使用为了全面越来越复杂,博主看了看2.x的代码相较于1.4多了很多模块,但是对于很多公司其实不是非常需要,有兴趣的欢迎讨论。
更多推荐
所有评论(0)