I. Introduction

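        In a microservice environment each service has its own isolated database, so data inconsistencies become more and more common. You either build a compensation mechanism or fix the data by hand. This is where distributed transactions come in: when something goes wrong, the upstream and downstream services roll back together. That said, data visibility, call-chain latency, and many other factors mean the call chain should not be stretched too long.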

        Turning to real-world usage: I mainly make HTTP calls through Spring Cloud Feign, so this post mainly describes how things work in that environment.

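1. Upstream and downstream services join the global transaction

2. Rollback

To see how each service actually rolls back, we need to look at the source code.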

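II. Usage

1. Server

        I use db mode.

        In the database used by the server, create global_table, branch_table, and lock_table. The server uses these tables to record global transactions and to manage locks.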

CREATE TABLE `global_table` (
  `xid` varchar(128) NOT NULL,
  `transaction_id` bigint(20) DEFAULT NULL,
  `status` tinyint(4) NOT NULL,
  `application_id` varchar(32) DEFAULT NULL,
  `transaction_service_group` varchar(32) DEFAULT NULL,
  `transaction_name` varchar(128) DEFAULT NULL,
  `timeout` int(11) DEFAULT NULL,
  `begin_time` bigint(20) DEFAULT NULL,
  `application_data` varchar(2000) DEFAULT NULL,
  `gmt_create` datetime DEFAULT NULL,
  `gmt_modified` datetime DEFAULT NULL,
  PRIMARY KEY (`xid`),
  KEY `idx_gmt_modified_status` (`gmt_modified`,`status`),
  KEY `idx_transaction_id` (`transaction_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `branch_table` (
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(128) NOT NULL,
  `transaction_id` bigint(20) DEFAULT NULL,
  `resource_group_id` varchar(32) DEFAULT NULL,
  `resource_id` varchar(256) DEFAULT NULL,
  `branch_type` varchar(8) DEFAULT NULL,
  `status` tinyint(4) DEFAULT NULL,
  `client_id` varchar(64) DEFAULT NULL,
  `application_data` varchar(2000) DEFAULT NULL,
  `gmt_create` datetime(6) DEFAULT NULL,
  `gmt_modified` datetime(6) DEFAULT NULL,
  PRIMARY KEY (`branch_id`),
  KEY `idx_xid` (`xid`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

CREATE TABLE `lock_table` (
  `row_key` varchar(128) NOT NULL,
  `xid` varchar(96) DEFAULT NULL,
  `transaction_id` bigint(20) DEFAULT NULL,
  `branch_id` bigint(20) NOT NULL,
  `resource_id` varchar(256) DEFAULT NULL,
  `table_name` varchar(32) DEFAULT NULL,
  `pk` varchar(36) DEFAULT NULL,
  `gmt_create` datetime DEFAULT NULL,
  `gmt_modified` datetime DEFAULT NULL,
  PRIMARY KEY (`row_key`),
  KEY `idx_branch_id` (`branch_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8;

        Modify file.conf:

## transaction log store, only used in seata-server
store {
  ## store mode: file、db、redis
  mode = "db"
  ## rsa decryption public key
  publicKey = ""
  ## file store property
  file {
    ## store location dir
    dir = "sessionStore"
    # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
    maxBranchSessionSize = 16384
    # globe session size , if exceeded throws exceptions
    maxGlobalSessionSize = 512
    # file buffer size , if exceeded allocate new buffer
    fileWriteBufferCacheSize = 16384
    # when recover batch read size
    sessionReloadReadSize = 100
    # async, sync
    flushDiskMode = async
  }

  ## database store property
  db {
    ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc.
    datasource = "druid"
    ## mysql/oracle/postgresql/h2/oceanbase etc.
    dbType = "mysql"
    driverClassName = "com.mysql.jdbc.Driver"
    ## if using mysql to store the data, recommend add rewriteBatchedStatements=true in jdbc connection param
    ## for db mode, point this URL at the database that holds global_table and the other tables
    url = "jdbc:mysql://**?zeroDateTimeBehavior=convertToNull&autoReconnect=true&useUnicode=true&characterEncoding=utf-8&allowMultiQueries=true"
    user = "**"
    password = "**"
    minConn = 5
    maxConn = 100
    globalTable = "global_table"
    branchTable = "branch_table"
    lockTable = "lock_table"
    queryLimit = 100
    maxWait = 5000
  }

  ## redis store property
  redis {
    ## redis mode: single、sentinel
    mode = "single"
    ## single mode property
    single {
      host = "127.0.0.1"
      port = "6379"
    }
    ## sentinel mode property
    sentinel {
      masterName = ""
      ## such as "10.28.235.65:26379,10.28.235.65:26380,10.28.235.65:26381"
      sentinelHosts = ""
    }
    password = ""
    database = "0"
    minConn = 1
    maxConn = 10
    maxTotal = 100
    queryLimit = 100
  }
}

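        The server must join the registry so that clients can find the application configured under seata.registry; the clients then establish connections to the server.

        I use Eureka, so modify registry.conf: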

registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "eureka"

  nacos {
    application = "seata-server"
    serverAddr = "127.0.0.1:8848"
    group = "SEATA_GROUP"
    namespace = ""
    cluster = "default"
    username = ""
    password = ""
  }
  eureka {
    # change this to the Eureka URL that the clients register with
    serviceUrl = "***"
    # name of the Seata server; clients must configure the same name
    application = "seata-server"
    weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = 0
    password = ""
    cluster = "default"
    timeout = 0
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }
  consul {
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
    aclToken = ""
  }
  etcd3 {
    cluster = "default"
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "file"

  nacos {
    serverAddr = "127.0.0.1:8848"
    namespace = ""
    group = "SEATA_GROUP"
    username = ""
    password = ""
    dataId = "seataServer.properties"
  }
  consul {
    serverAddr = "127.0.0.1:8500"
    aclToken = ""
  }
  apollo {
    appId = "seata-server"
    ## apolloConfigService will cover apolloMeta
    apolloMeta = "http://192.168.1.204:8801"
    apolloConfigService = "http://192.168.1.204:8080"
    namespace = "application"
    apolloAccesskeySecret = ""
    cluster = "seata"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
    nodePath = "/seata/seata.properties"
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  file {
    name = "file.conf"
  }
}

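2. Client

        Create the undo_log table in each client service's database. It stores the information the service needs in order to roll back.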

CREATE TABLE `undo_log` (
  `id` bigint(20) NOT NULL AUTO_INCREMENT,
  `branch_id` bigint(20) NOT NULL,
  `xid` varchar(100) NOT NULL,
  `context` varchar(128) NOT NULL,
  `rollback_info` longblob NOT NULL,
  `log_status` int(11) NOT NULL,
  `log_created` datetime NOT NULL,
  `log_modified` datetime NOT NULL,
  `ext` varchar(100) DEFAULT NULL,
  PRIMARY KEY (`id`),
  UNIQUE KEY `ux_undo_log` (`xid`,`branch_id`)
) ENGINE=InnoDB AUTO_INCREMENT=83 DEFAULT CHARSET=utf8mb4;

        For the client I initially picked 1.4.2, the latest version at the time, but it has a pitfall; see below.

        <!--seata-->
        <dependency>
            <groupId>io.seata</groupId>
            <artifactId>seata-spring-boot-starter</artifactId>
            <version>1.4.2</version>
        </dependency>

         The client needs to configure the transaction group, the service name, and so on:

# transaction group; must match the group configured on seata-server
seata.tx-service-group = my_test_tx_group
# TC service name
seata.service.vgroupMapping.my_test_tx_group = seata-server
# registry; same as the TC
seata.registry.type = eureka
seata.registry.eureka.service-url = ***
# this name must match the application name configured for the server in the registry
seata.registry.eureka.application = seata-server
seata.registry.eureka.weight = 1

        In AT mode (the mode used in this trial), adding @GlobalTransactional is all it takes, which is indeed very convenient:

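    @Override
    @GlobalTransactional
    public void testSeata(MacUnserviceDayEntity macUnserviceDayEntity) {
        // xid of the current global transaction
        String xid = RootContext.getXID();
        // local write
        this.save(macUnserviceDayEntity);
        // call the downstream service
        bitClient.departmentPage();
        // deliberately throw to trigger a global rollback
        int i = 1 / 0;
    }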

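         After testing, however, the downstream service did not roll back. Most likely the xid of Seata's global transaction was not being passed downstream.

         I tried adding spring-cloud-starter-alibaba-seata, but unfortunately the client would not start.

         I talked with the Seata authors, and they believed the Spring Boot version in my company's parent POM was too old. I have to vent a little here: the parent POM is still on Boot 1.5.22. Many people have reported that the old Boot version causes problems, but the owner refuses to upgrade, so the only option was to find another way.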

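III. Root-Cause Analysis

        Since the xid is not being passed to the downstream service, let's see whether we can pass it ourselves.

        In the upstream service, define a configuration class that puts the xid into the Feign request header: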

@Configuration
public class FeignConfigSeata implements RequestInterceptor {
    @Override
    public void apply(RequestTemplate requestTemplate) {
        // read the xid from Seata's RootContext
        String xid = RootContext.getXID();
        if (StringUtils.isNotBlank(xid)) {
            System.out.println("feign got distributed transaction xid: " + xid);
            // put the xid into the Feign request header; skipped when no xid is bound
            requestTemplate.header("Fescar-Xid", xid);
        }
    }
}

        The downstream service reads the xid from the Feign request header and binds it to Seata's RootContext:

@Configuration
public class FescarXidFilter extends OncePerRequestFilter {
    protected Logger logger = LoggerFactory.getLogger(FescarXidFilter.class);

    @Override
    protected void doFilterInternal(HttpServletRequest request, HttpServletResponse response, FilterChain filterChain) throws ServletException, IOException {
        String xid = RootContext.getXID();
        // read the xid from the request header
        String restXid = request.getHeader("Fescar-Xid");
        boolean bind = false;
        if (StringUtils.isBlank(xid) && StringUtils.isNotBlank(restXid)) {
            // bind the global transaction id to Seata's RootContext
            RootContext.bind(restXid);
            bind = true;
            if (logger.isDebugEnabled()) {
                logger.debug("bind[" + restXid + "] to RootContext");
            }
        }
        try {
            filterChain.doFilter(request, response);
        } finally {
            if (bind) {
                // always unbind what this filter bound, even if the handler threw
                String unbindXid = RootContext.unbind();
                if (logger.isDebugEnabled()) {
                    logger.debug("unbind[" + unbindXid + "] from RootContext");
                }
                if (!restXid.equalsIgnoreCase(unbindXid)) {
                    logger.warn("xid changed during http rest from " + restXid + " to " + unbindXid);
                    if (unbindXid != null) {
                        // a different xid was bound in the meantime; restore it
                        RootContext.bind(unbindXid);
                        logger.warn("bind [" + unbindXid + "] back to RootContext");
                    }
                }
            }
        }
    }
}

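         I tried again, and the xid still did not come through. Stepping through with breakpoints showed that the upstream interceptor could not get the xid from RootContext in the first place, even though it was available inside the test method.

        After thinking for a while: Hystrix has a thread pool. It makes the request on a pool thread and then hands the result back to the original thread, so the original thread's ThreadLocal is never carried over to the pool thread.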
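        The effect described above can be reproduced with plain JDK classes. This is a minimal sketch, not Seata or Hystrix code: the `XID` ThreadLocal and the class name are hypothetical stand-ins for a thread-bound transaction context and a pooled call.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a thread-bound transaction context (not Seata's RootContext).
class ThreadLocalLossDemo {
    private static final ThreadLocal<String> XID = new ThreadLocal<>();

    // Binds an xid on the calling thread, then reads the context from a pool thread.
    static String readFromPoolThread(String xid) {
        XID.set(xid);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            // The task runs on another thread, which has its own (empty) ThreadLocal slot.
            Callable<String> task = XID::get;
            return pool.submit(task).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            XID.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println("xid seen by pool thread: " + readFromPoolThread("xid-demo-1"));
    }
}
```

        The pool thread reads null even though the caller bound a value, which matches what the breakpoint in the Feign interceptor showed.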

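        Here an interceptor can be used to bind the original thread's header information to the Hystrix call. With that in place the rollback finally succeeded, and Seata's rollback records could be seen in undo_log.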

public class ClientConfig {

    private static final String X_ID = "TX_XID";

    @Bean
    public Encoder multipartFormEncoder(ObjectFactory<HttpMessageConverters> messageConverters) {
        return new SpringFormEncoder(new SpringEncoder(messageConverters));
    }

    @Bean
    public RequestInterceptor headerInterceptor() {
        return template -> {
            RequestAttributes requestAttributes = RequestContextHolder.getRequestAttributes();
            if (!(requestAttributes instanceof ServletRequestAttributes)) {
                // no servlet request is bound to this thread, so there is nothing to copy
                return;
            }
            HttpServletRequest request = ((ServletRequestAttributes) requestAttributes).getRequest();
            Object xId = request.getAttribute(X_ID);
            if (xId != null) {
                // forward the xid stored on the request as the TX_XID header
                template.header(X_ID, String.valueOf(xId));
            }
        };
    }
}
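        A more general way to get a thread-bound value onto a pool thread is to capture it on the calling thread and rebind it around the task. The sketch below uses only JDK classes and is an illustration of that idea, not the approach taken in this post; `XID` and `withCallerXid` are hypothetical names.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

class XidPropagationDemo {
    // Hypothetical thread-bound context, standing in for a transaction context holder.
    private static final ThreadLocal<String> XID = new ThreadLocal<>();

    // Captures the caller's xid now and rebinds it around the task on the worker thread.
    static <T> Callable<T> withCallerXid(Callable<T> task) {
        final String captured = XID.get();
        return () -> {
            String previous = XID.get();
            if (captured != null) {
                XID.set(captured);
            }
            try {
                return task.call();
            } finally {
                // Restore the worker thread's previous state so pooled threads do not leak xids.
                if (previous != null) {
                    XID.set(previous);
                } else {
                    XID.remove();
                }
            }
        };
    }

    static String readFromPoolThread(String xid) {
        XID.set(xid);
        ExecutorService pool = Executors.newSingleThreadExecutor();
        try {
            Callable<String> task = XID::get;
            return pool.submit(withCallerXid(task)).get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
            XID.remove();
        }
    }

    public static void main(String[] args) {
        System.out.println("xid seen by pool thread: " + readFromPoolThread("xid-demo-1"));
    }
}
```

        The cleanup in the finally block matters because pool threads are reused: a leftover xid would make a later, unrelated task look like part of an old global transaction.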

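IV. How It Works

AT (compensating rollback)

This trial used AT mode, which in essence temporarily stores the current SQL statement together with an image of the data before the change; if a rollback is needed, the stored record is parsed to generate the rollback SQL.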
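The snapshot-then-compensate idea can be illustrated without a database. The following is a minimal in-memory sketch under that assumption; the class and its fields are hypothetical and only mirror the concept of a before image, not Seata's implementation.

```java
import java.util.HashMap;
import java.util.Map;

// In-memory illustration of "store the before image, restore it on rollback".
class BeforeImageDemo {
    // "Table": primary key -> balance.
    private final Map<Integer, Integer> accounts = new HashMap<>();
    // Undo record: each touched row as it was before its first change.
    private final Map<Integer, Integer> beforeImage = new HashMap<>();

    BeforeImageDemo(int id, int balance) {
        accounts.put(id, balance);
    }

    // Snapshot the existing row first, then apply the business update.
    void update(int id, int newBalance) {
        Integer old = accounts.get(id);
        if (old == null) {
            throw new IllegalArgumentException("no such row: " + id);
        }
        beforeImage.putIfAbsent(id, old);
        accounts.put(id, newBalance);
    }

    // Rollback: write the before image back and discard the undo record.
    void rollback() {
        accounts.putAll(beforeImage);
        beforeImage.clear();
    }

    int balanceOf(int id) {
        return accounts.get(id);
    }

    public static void main(String[] args) {
        BeforeImageDemo db = new BeforeImageDemo(1, 100);
        db.update(1, 80);
        db.update(1, 50);
        db.rollback();
        System.out.println(db.balanceOf(1)); // prints 100
    }
}
```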

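1. Passing and intercepting the xid

The xid is the globally unique key of a Seata global transaction and is central to propagation, so it has to be passed to the downstream services.

On the downstream side, Seata mainly relies on Spring's HandlerInterceptor: the preHandle method reads TX_XID from the request header and puts it into the service's own ThreadLocal.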

public class TransactionPropagationInterceptor extends HandlerInterceptorAdapter {

    private static final Logger LOGGER = LoggerFactory.getLogger(TransactionPropagationInterceptor.class);


    @Override
    public boolean preHandle(HttpServletRequest request, HttpServletResponse response, Object handler) {
        String rpcXid = request.getHeader(RootContext.KEY_XID);
        return this.bindXid(rpcXid);
    }

    @Override
    public void afterCompletion(HttpServletRequest request, HttpServletResponse response, Object handler, Exception ex) throws Exception {
        if (RootContext.inGlobalTransaction()) {
            String rpcXid = request.getHeader(RootContext.KEY_XID);
            this.cleanXid(rpcXid);
        }
    }


    protected boolean bindXid(String rpcXid) {
        String xid = RootContext.getXID();

        if (LOGGER.isDebugEnabled()) {
            LOGGER.debug("xid in RootContext[{}] xid in HttpContext[{}]", xid, rpcXid);
        }
        if (StringUtils.isBlank(xid) && StringUtils.isNotBlank(rpcXid)) {
            RootContext.bind(rpcXid);
            if (LOGGER.isDebugEnabled()) {
                LOGGER.debug("bind[{}] to RootContext", rpcXid);
            }
        }

        return true;
    }

    protected void cleanXid(String rpcXid) {
        XidResource.cleanXid(rpcXid);
    }

}
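The bind-before, clean-after pattern of the interceptor can be sketched with plain JDK types. This is a simplified illustration: the `XID` ThreadLocal, the `handle` method, and the header map are hypothetical stand-ins for RootContext, the Spring handler chain, and the HTTP request.

```java
import java.util.Map;
import java.util.function.Supplier;

class XidBindingDemo {
    static final String HEADER = "TX_XID";
    // Hypothetical thread-bound holder playing the role of the transaction context.
    private static final ThreadLocal<String> XID = new ThreadLocal<>();

    static String currentXid() {
        return XID.get();
    }

    // Simulates one request: bind the header xid before the handler runs, clean up afterwards.
    static String handle(Map<String, String> headers, Supplier<String> handler) {
        String rpcXid = headers.get(HEADER);
        boolean bound = false;
        // Bind only if nothing is bound yet and the caller actually sent an xid.
        if (XID.get() == null && rpcXid != null && !rpcXid.trim().isEmpty()) {
            XID.set(rpcXid);
            bound = true;
        }
        try {
            return handler.get();
        } finally {
            // Request threads are pooled, so the xid must not outlive the request.
            if (bound) {
                XID.remove();
            }
        }
    }

    public static void main(String[] args) {
        Map<String, String> headers = java.util.Collections.singletonMap(HEADER, "xid-demo-1");
        System.out.println("inside handler: " + handle(headers, XidBindingDemo::currentXid));
        System.out.println("after request: " + currentXid());
    }
}
```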

2. Rollback

The rollback is performed in the branchRollback method of DataSourceManager:

public BranchStatus branchRollback(BranchType branchType, String xid, long branchId, String resourceId,
                                   String applicationData) throws TransactionException {
    DataSourceProxy dataSourceProxy = get(resourceId);
    if (dataSourceProxy == null) {
        throw new ShouldNeverHappenException(String.format("resource: %s not found",resourceId));
    }
    try {
        // get the undo log manager that matches the data source's db type
        UndoLogManagerFactory.getUndoLogManager(dataSourceProxy.getDbType()).undo(dataSourceProxy, xid, branchId);
        if (LOGGER.isInfoEnabled()) {
            LOGGER.info("branch rollback success, xid:{}, branchId:{}", xid, branchId);
        }
    } catch (TransactionException te) {
        StackTraceLogger.error(LOGGER, te,
            "branchRollback failed. branchType:[{}], xid:[{}], branchId:[{}], resourceId:[{}], applicationData:[{}]. reason:[{}]",
            new Object[]{branchType, xid, branchId, resourceId, applicationData, te.getMessage()});
        if (te.getCode() == TransactionExceptionCode.BranchRollbackFailed_Unretriable) {
            return BranchStatus.PhaseTwo_RollbackFailed_Unretryable;
        } else {
            return BranchStatus.PhaseTwo_RollbackFailed_Retryable;
        }
    }
    return BranchStatus.PhaseTwo_Rollbacked;

}

Once the handler class is obtained, its undo method runs. It reads the data the client stored in the undo_log table using "SELECT * FROM " + UNDO_LOG_TABLE_NAME + " WHERE "+ ClientTableColumnsName.UNDO_LOG_BRANCH_XID + " = ? AND " + ClientTableColumnsName.UNDO_LOG_XID+ " = ? FOR UPDATE";

public void undo(DataSourceProxy dataSourceProxy, String xid, long branchId) throws TransactionException {
    ConnectionProxy connectionProxy = null;
    Connection conn = null;
    ResultSet rs = null;
    PreparedStatement selectPST = null;
    boolean originalAutoCommit = true;

    for (; ; ) {
        try {
            connectionProxy = dataSourceProxy.getConnection();
            conn = connectionProxy.getTargetConnection();

            // The entire undo process should run in a local transaction.
            if (originalAutoCommit = conn.getAutoCommit()) {
                conn.setAutoCommit(false);
            }

            // Find UNDO LOG
            selectPST = conn.prepareStatement(buildSelectUndoSql());
            selectPST.setLong(1, branchId);
            selectPST.setString(2, xid);
            rs = selectPST.executeQuery();

            boolean exists = false;
            while (rs.next()) {
                exists = true;

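                // only undo logs in the normal state are processed
                int state = rs.getInt(ClientTableColumnsName.UNDO_LOG_LOG_STATUS);
                if (!canUndo(state)) {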
                    if (LOGGER.isInfoEnabled()) {
                        LOGGER.info("xid {} branch {}, ignore {} undo_log", xid, branchId, state);
                    }
                    return;
                }

                String contextString = rs.getString(ClientTableColumnsName.UNDO_LOG_CONTEXT);
                Map<String, String> context = parseContext(contextString);
                byte[] rollbackInfo = getRollbackInfo(rs);

                // read the serializer name from the context
                String serializer = context == null ? null : context.get(UndoLogConstants.SERIALIZER_KEY);
                UndoLogParser parser = serializer == null ? UndoLogParserFactory.getInstance()
                    : UndoLogParserFactory.getInstance(serializer);
                BranchUndoLog branchUndoLog = parser.decode(rollbackInfo);

                try {
                    // put serializer name to local
                    setCurrentSerializer(parser.getName());
                    // extract the SQLUndoLog entries
                    List<SQLUndoLog> sqlUndoLogs = branchUndoLog.getSqlUndoLogs();
                    if (sqlUndoLogs.size() > 1) {
                        Collections.reverse(sqlUndoLogs);
                    }
                    for (SQLUndoLog sqlUndoLog : sqlUndoLogs) {
                        TableMeta tableMeta = TableMetaCacheFactory.getTableMetaCache(dataSourceProxy.getDbType()).getTableMeta(
                            conn, sqlUndoLog.getTableName(), dataSourceProxy.getResourceId());
                        sqlUndoLog.setTableMeta(tableMeta);
                        AbstractUndoExecutor undoExecutor = UndoExecutorFactory.getUndoExecutor(
                            dataSourceProxy.getDbType(), sqlUndoLog);
                        undoExecutor.executeOn(connectionProxy);
                    }
                } finally {
                    // remove serializer name
                    removeCurrentSerializer();
                }
            }


            if (exists) {
                deleteUndoLog(xid, branchId, conn);
                conn.commit();
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("xid {} branch {}, undo_log deleted with {}", xid, branchId,
                        State.GlobalFinished.name());
                }
            } else {
                insertUndoLogWithGlobalFinished(xid, branchId, UndoLogParserFactory.getInstance(), conn);
                conn.commit();
                if (LOGGER.isInfoEnabled()) {
                    LOGGER.info("xid {} branch {}, undo_log added with {}", xid, branchId,
                        State.GlobalFinished.name());
                }
            }

            return;
        } catch (SQLIntegrityConstraintViolationException e) {
            // Possible undo_log has been inserted into the database by other processes, retrying rollback undo_log
            if (LOGGER.isInfoEnabled()) {
                LOGGER.info("xid {} branch {}, undo_log inserted, retry rollback", xid, branchId);
            }
        } catch (Throwable e) {
            if (conn != null) {
                try {
                    conn.rollback();
                } catch (SQLException rollbackEx) {
                    LOGGER.warn("Failed to close JDBC resource while undo ... ", rollbackEx);
                }
            }
            if (e instanceof SQLUndoDirtyException) {
                throw new BranchTransactionException(BranchRollbackFailed_Unretriable, String.format(
                    "Branch session rollback failed because of dirty undo log, please delete the relevant undolog after manually calibrating the data. xid = %s branchId = %s",
                    xid, branchId), e);
            }
            throw new BranchTransactionException(BranchRollbackFailed_Retriable,
                String.format("Branch session rollback failed and try again later xid = %s branchId = %s %s", xid,
                    branchId, e.getMessage()),
                e);

        } finally {
            try {
                if (rs != null) {
                    rs.close();
                }
                if (selectPST != null) {
                    selectPST.close();
                }
                if (conn != null) {
                    if (originalAutoCommit) {
                        conn.setAutoCommit(true);
                    }
                    connectionProxy.close();
                }
            } catch (SQLException closeEx) {
                LOGGER.warn("Failed to close JDBC resource while undo ... ", closeEx);
            }
        }
    }
}
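The undo method above reverses the list of SQLUndoLog entries before applying them. A small sketch shows why the order matters when several statements touched the same row. The `Undo` record and the single-integer "row" are hypothetical simplifications, and the dirty check is only a rough analogue of the situation that surfaces as SQLUndoDirtyException in the code above.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

class ReverseUndoDemo {
    // Hypothetical undo entry for a one-column row: its value before and after one statement.
    static final class Undo {
        final int before;
        final int after;

        Undo(int before, int after) {
            this.before = before;
            this.after = after;
        }
    }

    // Applies undo entries newest-first and returns the restored value.
    static int applyUndo(int current, List<Undo> logsInExecutionOrder) {
        List<Undo> logs = new ArrayList<>(logsInExecutionOrder);
        Collections.reverse(logs);
        int value = current;
        for (Undo undo : logs) {
            // If the row no longer matches the after image, someone else changed it.
            if (value != undo.after) {
                throw new IllegalStateException(
                    "dirty data: expected " + undo.after + " but found " + value);
            }
            value = undo.before;
        }
        return value;
    }

    public static void main(String[] args) {
        // Two updates in one branch: 100 -> 80, then 80 -> 50.
        List<Undo> logs = Arrays.asList(new Undo(100, 80), new Undo(80, 50));
        System.out.println(applyUndo(50, logs)); // prints 100
    }
}
```

Applying the same entries in execution order would first try to undo the 100 -> 80 step against a row that currently holds 50, which is exactly the mismatch the check rejects.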

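The SQLUndoLog class mainly holds the following fields. With the statement type and the before image, the data can be compensated back to its previous state: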

private SQLType sqlType;

private String tableName;

private TableRecords beforeImage;

private TableRecords afterImage;
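How a statement type maps to its compensating statement can be sketched as follows. This is an assumption-laden illustration, not Seata's undo executors: the `SqlType` enum, the `undoSql` method, and the generated SQL text are all hypothetical, and only single-column primary keys are handled.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

class UndoSqlSketch {
    enum SqlType { INSERT, UPDATE, DELETE }

    // Builds a parameterized compensating statement for one row.
    // pk is the primary-key column; columns are the remaining columns of the row image.
    static String undoSql(SqlType type, String table, String pk, List<String> columns) {
        switch (type) {
            case INSERT:
                // Undo an insert by deleting the inserted row (key comes from the after image).
                return "DELETE FROM " + table + " WHERE " + pk + " = ?";
            case DELETE: {
                // Undo a delete by re-inserting the before image.
                StringJoiner names = new StringJoiner(", ");
                StringJoiner marks = new StringJoiner(", ");
                names.add(pk);
                marks.add("?");
                for (String column : columns) {
                    names.add(column);
                    marks.add("?");
                }
                return "INSERT INTO " + table + " (" + names + ") VALUES (" + marks + ")";
            }
            case UPDATE: {
                // Undo an update by writing the before-image values back.
                StringJoiner sets = new StringJoiner(", ");
                for (String column : columns) {
                    sets.add(column + " = ?");
                }
                return "UPDATE " + table + " SET " + sets + " WHERE " + pk + " = ?";
            }
            default:
                throw new IllegalArgumentException("unsupported type: " + type);
        }
    }

    public static void main(String[] args) {
        List<String> columns = Arrays.asList("balance", "owner");
        for (SqlType type : SqlType.values()) {
            System.out.println(type + " -> " + undoSql(type, "account", "id", columns));
        }
    }
}
```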

So when does the rollback log get written to the table? Only one of the paths is covered here. PreparedStatementProxy implements the PreparedStatement interface from the java.sql package:

public int executeUpdate() throws SQLException {
    return ExecuteTemplate.execute(this, (statement, args) -> statement.executeUpdate());
}

The execute method obtains the Executor that matches the database and the SQL type:

public static <T, S extends Statement> T execute(List<SQLRecognizer> sqlRecognizers,
                                                 StatementProxy<S> statementProxy,
                                                 StatementCallback<T, S> statementCallback,
                                                 Object... args) throws SQLException {
    if (!RootContext.requireGlobalLock() && BranchType.AT != RootContext.getBranchType()) {
        // Just work as original statement
        return statementCallback.execute(statementProxy.getTargetStatement(), args);
    }

    String dbType = statementProxy.getConnectionProxy().getDbType();
    if (CollectionUtils.isEmpty(sqlRecognizers)) {
        sqlRecognizers = SQLVisitorFactory.get(
                statementProxy.getTargetSQL(),
                dbType);
    }
    Executor<T> executor;
    if (CollectionUtils.isEmpty(sqlRecognizers)) {
        executor = new PlainExecutor<>(statementProxy, statementCallback);
    } else {
        if (sqlRecognizers.size() == 1) {
            SQLRecognizer sqlRecognizer = sqlRecognizers.get(0);
            switch (sqlRecognizer.getSQLType()) {
                case INSERT:
                    executor = EnhancedServiceLoader.load(InsertExecutor.class, dbType,
                                new Class[]{StatementProxy.class, StatementCallback.class, SQLRecognizer.class},
                                new Object[]{statementProxy, statementCallback, sqlRecognizer});
                    break;
                case UPDATE:
                    if (JdbcConstants.SQLSERVER.equalsIgnoreCase(dbType)) {
                        executor = new SqlServerUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                    } else {
                        executor = new UpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                    }
                    break;
                case DELETE:
                    if (JdbcConstants.SQLSERVER.equalsIgnoreCase(dbType)) {
                        executor = new SqlServerDeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                    } else {
                        executor = new DeleteExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                    }
                    break;
                case SELECT_FOR_UPDATE:
                    if (JdbcConstants.SQLSERVER.equalsIgnoreCase(dbType)) {
                        executor = new SqlServerSelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                    } else {
                        executor = new SelectForUpdateExecutor<>(statementProxy, statementCallback, sqlRecognizer);
                    }
                    break;
                case INSERT_ON_DUPLICATE_UPDATE:
                    switch (dbType) {
                        case JdbcConstants.MYSQL:
                        case JdbcConstants.MARIADB:
                            executor =
                                new MySQLInsertOnDuplicateUpdateExecutor(statementProxy, statementCallback, sqlRecognizer);
                            break;
                        default:
                            throw new NotSupportYetException(dbType + " not support to INSERT_ON_DUPLICATE_UPDATE");
                    }
                    break;
                case UPDATE_JOIN:
                    switch (dbType) {
                        case JdbcConstants.MYSQL:
                            executor = new MySQLUpdateJoinExecutor<>(statementProxy,statementCallback,sqlRecognizer);
                            break;
                        default:
                            throw new NotSupportYetException(dbType + " not support to " + SQLType.UPDATE_JOIN.name());
                    }
                    break;
                default:
                    executor = new PlainExecutor<>(statementProxy, statementCallback);
                    break;
            }
        } else {
            executor = new MultiExecutor<>(statementProxy, statementCallback, sqlRecognizers);
        }
    }
    T rs;
    try {
        rs = executor.execute(args);
    } catch (Throwable ex) {
        if (!(ex instanceof SQLException)) {
            // Turn other exception into SQLException
            ex = new SQLException(ex);
        }
        throw (SQLException) ex;
    }
    return rs;
}

A few method calls later, the before image and the after image are queried:

protected T executeAutoCommitFalse(Object[] args) throws Exception {
    try {
        TableRecords beforeImage = beforeImage();
        T result = statementCallback.execute(statementProxy.getTargetStatement(), args);
        TableRecords afterImage = afterImage(beforeImage);
        prepareUndoLog(beforeImage, afterImage);
        return result;
    } catch (TableMetaException e) {
        LOGGER.error("table meta will be refreshed later, due to TableMetaException, table:{}, column:{}",
            e.getTableName(), e.getColumnName());
        statementProxy.getConnectionProxy().getDataSourceProxy().tableMetaRefreshEvent();
        throw e;
    }
}

The data is then wrapped into a SQLUndoLog and attached to the connection:

protected void prepareUndoLog(TableRecords beforeImage, TableRecords afterImage) throws SQLException {
    if (beforeImage.getRows().isEmpty() && afterImage.getRows().isEmpty()) {
        return;
    }
    if (SQLType.UPDATE == sqlRecognizer.getSQLType()) {
        if (beforeImage.getRows().size() != afterImage.getRows().size()) {
            throw new ShouldNeverHappenException("Before image size is not equaled to after image size, probably because you updated the primary keys.");
        }
    }
    ConnectionProxy connectionProxy = statementProxy.getConnectionProxy();

    TableRecords lockKeyRecords = sqlRecognizer.getSQLType() == SQLType.DELETE ? beforeImage : afterImage;
    String lockKeys = buildLockKey(lockKeyRecords);
    if (null != lockKeys) {
        connectionProxy.appendLockKey(lockKeys);

        SQLUndoLog sqlUndoLog = buildUndoItem(beforeImage, afterImage);
        connectionProxy.appendUndoLog(sqlUndoLog);
    }
}
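The lock keys appended above identify the rows this branch wants to lock globally. As a rough sketch, a lock key can be thought of as the table name plus the primary-key values of the affected rows. The `table:pk1,pk2` layout and the method below are assumptions made for illustration; they are not copied from Seata's buildLockKey.

```java
import java.util.Arrays;
import java.util.List;
import java.util.StringJoiner;

class LockKeySketch {
    // Returns null when no rows are affected, so the caller can skip lock and undo registration.
    static String buildLockKey(String table, List<String> pkValues) {
        if (pkValues.isEmpty()) {
            return null;
        }
        StringJoiner pks = new StringJoiner(",");
        for (String pk : pkValues) {
            pks.add(pk);
        }
        return table + ":" + pks;
    }

    public static void main(String[] args) {
        System.out.println(buildLockKey("account", Arrays.asList("1", "2")));
    }
}
```

Returning null for an empty image lines up with the `null != lockKeys` guard in prepareUndoLog: with no rows touched there is nothing to lock and no undo entry to record.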

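V. Summary

        In trying to be comprehensive, Seata has become more and more complex to use. I looked at the 2.x code, and it has many more modules than 1.4, but many companies do not really need them. Anyone interested is welcome to discuss.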
