Seata-分布式事务中间件

seata的产生
在微服务调用中会出现如下的情况
在这里插入图片描述
在服务间的调用可能会出现一些服务调用成功,一些服务调用失败的情况。
为了使这些请求要么都成功,要么都失败就需要引入分布式事物的概念。
Seata 是一款开源的分布式事务解决方案,致力于提供高性能和简单易用的分布式事务服务。

seata简介

1.seata术语

TC (Transaction Coordinator) - 事务协调者

维护全局和分支事务的状态,驱动全局事务提交或回滚。

TM (Transaction Manager) - 事务管理器

定义全局事务的范围:开始全局事务、提交或回滚全局事务。

RM (Resource Manager) - 资源管理器

管理分支事务处理的资源,与TC交谈以注册分支事务和报告分支事务的状态,并驱动分支事务提交或回滚。

2PC(Two-Phase Commit) - 二阶段提交

seata使用二阶段提交模式来保持事物的一致性。与传统的2PC相比,seata具有如下特点

  • seata可以设置超时超时时间,可以正常的释放资源
  • seata的事务默认隔离级别是读未提交,因此在phase1阶段就会进行事务提交,减少phase2阶段持锁时间。
  • 传统2pc通过数据库层面保持数据的一致性,而seata client端是以jar包的形式运行在客户端。

2.seta的工作原理

2.1 seata执行流程

在这里插入图片描述
Seata 分布式事务的整体执行机制如上图所示,分为两阶段提交:

  • 发起方 TM 向 TC 申请开启一个全局事务,全局事务创建成功并生成唯一的全局事务标识 XID,该 XID 在后续事务的服务调用链路的上下文传播(通过Aop实现))
  • RM 向 TC 注册分支事务,汇报资源准备状况,并与 XID 进行绑定(Branch分支事务指分布式事务中每个独立的本地局部事务)
  • TM 向 TC 发起 XID 下的所有分支事务的全局提交或回滚请求(事务一阶段结束)
  • TC 汇总事务信息,决定分布式事务是提交还是回滚;
  • TC 通知所有 RM 提交/回滚 资源,事务二阶段结束;
2.2 Seata 的 AT 模式原理
整体机制:两阶段提交协议的演变:
优点
  • 对代码没有任何倾入性,只需要使用@GlobalTransaction注解
  • 可以满足90%及以上业务场景的使用。
  • AT模式支持的数据库有:MySQL、Oracle、PostgreSQL、 TiDB、MariaDB。
一阶段:

业务数据和回滚日志记录在同一个本地事务中提交,释放本地锁和连接资源。

  • 解析 SQL:得到 SQL 的类型(UPDATE,INSERT),表,WHERE条件 等相关的信息。
  • 查询前镜像:根据解析得到的条件信息,生成查询语句,定位数据。
  • 执行业务 SQL。
  • 查询后镜像:根据前镜像的结果,通过 主键 定位数据。
  • 插入回滚日志:把前后镜像数据以及业务 SQL 相关的信息组成一条回滚日志记录,插入到 UNDO_LOG 表中。
  • 提交前,向 TC 注册分支:申请表中,主键值记录的 全局锁 。
  • 本地事务提交:业务数据的更新和前面步骤中生成的 UNDO LOG 一并提交。
  • 将本地事务提交的结果上报给 TC。
二阶段:

提交异步化,非常快速地完成。
回滚通过一阶段的回滚日志进行反向补偿。

二阶段回滚
  • 收到 TC 的分支回滚请求,开启一个本地事务,执行如下操作。
  • 通过 XID 和 Branch ID 查找到相应的 UNDO LOG 记录。
  • 数据校验:拿 UNDO LOG 中的后镜与当前数据进行比较,如果有不同,说明数据被当前全局事务之外的动作做了修改。这种情况,需要根据配置策略来做处理。
  • 根据 UNDO LOG 中的前镜像和业务 SQL 的相关信息生成并执行回滚的语句
  • 提交本地事务。并把本地事务的执行结果(即分支事务回滚的结果)上报给 TC。
二阶段提交
  • 收到 TC 的分支提交请求,把请求放入一个异步任务的队列中,马上返回提交成功的结果给 TC。
  • 异步任务阶段的分支提交请求将异步和批量地删除相应 UNDO LOG 记录。

链路图:
在这里插入图片描述

2.3 Seata 的 TCC模式

根据两阶段行为模式的不同,将分支事务划分为 Automatic (Branch) Transaction Mode 和 TCC (Branch) Transaction Mode.

AT 模式(参考链接 TBD)基于 支持本地 ACID 事务 的 关系型数据库:

一阶段 prepare 行为:在本地事务中,一并提交业务数据更新和相应回滚日志记录。
二阶段 commit 行为:马上成功结束,自动 异步批量清理回滚日志。
二阶段 rollback 行为:通过回滚日志,自动 生成补偿操作,完成数据回滚。

相应的,TCC 模式,不依赖于底层数据资源的事务支持:

一阶段 prepare 行为:调用 自定义 的 prepare 逻辑。
二阶段 commit 行为:调用 自定义 的 commit 逻辑。
二阶段 rollback 行为:调用 自定义 的 rollback 逻辑。

所谓 TCC 模式,是指支持把 自定义 的分支事务纳入到全局事务的管理中。

2.4 Saga模式

详情见 : https://seata.io/zh-cn/docs/user/saga.html

2.5 XA模式

详情见 :https://seata.io/zh-cn/docs/dev/mode/xa-mode.html

seata AT模式实战

seata-server服务端搭建

第一步:https://github.com/seata/seata/releases 下载seata-server包
第二步:解压文件并配置落地策略和使用的注册中心。以选用db-mysql和nacos注册中心为例
1.解压后目录

在这里插入图片描述

2.配置目录中config下的file.conf 和 registry.conf文件,需要修改注册中心,配置中心以及中间数据存储方式。

在这里插入图片描述

3.file.conf修改mode=“db” 并配置数据库访问信息
## transaction log store, only used in seata-server
store {
  ## store mode: file、db、redis
  mode = "db"
  ## rsa decryption public key
  publicKey = ""
  ## file store property
  file {
    ## store location dir
    dir = "sessionStore"
    # branch session size , if exceeded first try compress lockkey, still exceeded throws exceptions
    maxBranchSessionSize = 16384
    # globe session size , if exceeded throws exceptions
    maxGlobalSessionSize = 512
    # file buffer size , if exceeded allocate new buffer
    fileWriteBufferCacheSize = 16384
    # when recover batch read size
    sessionReloadReadSize = 100
    # async, sync
    flushDiskMode = async
  }

  ## database store property
  db {
    ## the implement of javax.sql.DataSource, such as DruidDataSource(druid)/BasicDataSource(dbcp)/HikariDataSource(hikari) etc.
    datasource = "druid"
    ## mysql/oracle/postgresql/h2/oceanbase etc.
    dbType = "mysql"
    driverClassName = "com.mysql.jdbc.Driver"
    ## if using mysql to store the data, recommend add rewriteBatchedStatements=true in jdbc connection param
    url = "jdbc:mysql://192.168.72.133:3306/seata?rewriteBatchedStatements=true"
    user = "hive"
    password = "12345678"
    minConn = 5
    maxConn = 100
    globalTable = "global_table"
    branchTable = "branch_table"
    lockTable = "lock_table"
    queryLimit = 100
    maxWait = 5000
  }

  ## redis store property
  redis {
    ## redis mode: single、sentinel
    mode = "single"
    ## single mode property
    single {
      host = "127.0.0.1"
      port = "6379"
    }
    ## sentinel mode property
    sentinel {
      masterName = ""
      ## such as "10.28.235.65:26379,10.28.235.65:26380,10.28.235.65:26381"
      sentinelHosts = ""
    }
    password = ""
    database = "0"
    minConn = 1
    maxConn = 10
    maxTotal = 100
    queryLimit = 100
  }
}

4.registry.conf 修改registry和config中的 type="nacos"及配置nacos信息
registry {
  # file 、nacos 、eureka、redis、zk、consul、etcd3、sofa
  type = "nacos"

  nacos {
    application = "seata-server"
    serverAddr = "192.168.19.1:8848"
    group = "SEATA_GROUP"
    namespace = ""
    cluster = "default"
    username = "nacos"
    password = "nacos"
  }
  eureka {
    serviceUrl = "http://localhost:8761/eureka"
    application = "default"
    weight = "1"
  }
  redis {
    serverAddr = "localhost:6379"
    db = 0
    password = ""
    cluster = "default"
    timeout = 0
  }
  zk {
    cluster = "default"
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
  }
  consul {
    cluster = "default"
    serverAddr = "127.0.0.1:8500"
    aclToken = ""
  }
  etcd3 {
    cluster = "default"
    serverAddr = "http://localhost:2379"
  }
  sofa {
    serverAddr = "127.0.0.1:9603"
    application = "default"
    region = "DEFAULT_ZONE"
    datacenter = "DefaultDataCenter"
    cluster = "default"
    group = "SEATA_GROUP"
    addressWaitTime = "3000"
  }
  file {
    name = "file.conf"
  }
}

config {
  # file、nacos 、apollo、zk、consul、etcd3
  type = "nacos"

  nacos {
    serverAddr = "192.168.19.1:8848"
    namespace = "seata"
    group = "SEATA_GROUP"
    username = "nacos"
    password = "nacos"
    dataId = "seataServer.properties"
  }
  consul {
    serverAddr = "127.0.0.1:8500"
    aclToken = ""
  }
  apollo {
    appId = "seata-server"
    ## apolloConfigService will cover apolloMeta
    apolloMeta = "http://192.168.1.204:8801"
    apolloConfigService = "http://192.168.1.204:8080"
    namespace = "application"
    apolloAccesskeySecret = ""
    cluster = "seata"
  }
  zk {
    serverAddr = "127.0.0.1:2181"
    sessionTimeout = 6000
    connectTimeout = 2000
    username = ""
    password = ""
    nodePath = "/seata/seata.properties"
  }
  etcd3 {
    serverAddr = "http://localhost:2379"
  }
  file {
    name = "file.conf"
  }
}

第五步:上传配置文件到nacos注册中心。

脚本文件官方下载无需更改。我放下script目录下。https://github.com/seata/seata/tree/develop/script/config-center/nacos

#!/bin/sh
# Copyright 1999-2019 Seata.io Group.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at、
#
#      http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.

while getopts ":h:p:g:t:u:w:" opt
do
  case $opt in
  h)
    host=$OPTARG
    ;;
  p)
    port=$OPTARG
    ;;
  g)
    group=$OPTARG
    ;;
  t)
    tenant=$OPTARG
    ;;
  u)
    username=$OPTARG
    ;;
  w)
    password=$OPTARG
    ;;
  ?)
    echo " USAGE OPTION: $0 [-h host] [-p port] [-g group] [-t tenant] [-u username] [-w password] "
    exit 1
    ;;
  esac
done

if [ -z ${host} ]; then
    host=localhost
fi
if [ -z ${port} ]; then
    port=8848
fi
if [ -z ${group} ]; then
    group="SEATA_GROUP"
fi
if [ -z ${tenant} ]; then
    tenant=""
fi
if [ -z ${username} ]; then
    username=""
fi
if [ -z ${password} ]; then
    password=""
fi

nacosAddr=$host:$port
contentType="content-type:application/json;charset=UTF-8"

echo "set nacosAddr=$nacosAddr"
echo "set group=$group"

urlencode() {
  length="${#1}"
  i=0
  while [ $length -gt $i ]; do
    char="${1:$i:1}"
    case $char in
    [a-zA-Z0-9.~_-]) printf $char ;;
    *) printf '%%%02X' "'$char" ;;
    esac
    i=`expr $i + 1`
  done
}

failCount=0
tempLog=$(mktemp -u)
function addConfig() {
  dataId=`urlencode $1`
  content=`urlencode $2`
  curl -X POST -H "${contentType}" "http://$nacosAddr/nacos/v1/cs/configs?dataId=$dataId&group=$group&content=$content&tenant=$tenant&username=$username&password=$password" >"${tempLog}" 2>/dev/null
  if [ -z $(cat "${tempLog}") ]; then
    echo " Please check the cluster status. "
    exit 1
  fi
  if [ "$(cat "${tempLog}")" == "true" ]; then
    echo "Set $1=$2 successfully "
  else
    echo "Set $1=$2 failure "
    failCount=`expr $failCount + 1`
  fi
}

count=0
COMMENT_START="#"
for line in $(cat $(dirname "$PWD")/config.txt | sed s/[[:space:]]//g); do
    if [[ "$line" =~ ^"${COMMENT_START}".*  ]]; then
      continue
    fi
    count=`expr $count + 1`
	  key=${line%%=*}
    value=${line#*=}
	  addConfig "${key}" "${value}"
done

echo "========================================================================="
echo " Complete initialization parameters,  total-count:$count ,  failure-count:$failCount "
echo "========================================================================="

if [ ${failCount} -eq 0 ]; then
	echo " Init nacos config finished, please start seata-server. "
else
	echo " init nacos config fail. "
fi

seta配置信息官方下载需要修改store.mode=db及store.db相关配置信息

#For details about configuration items, see https://seata.io/zh-cn/docs/user/configurations.html
#Transport configuration, for client and server
transport.type=TCP
transport.server=NIO
transport.heartbeat=true
transport.enableTmClientBatchSendRequest=false
transport.enableRmClientBatchSendRequest=true
transport.enableTcServerBatchSendResponse=false
transport.rpcRmRequestTimeout=30000
transport.rpcTmRequestTimeout=30000
transport.rpcTcRequestTimeout=30000
transport.threadFactory.bossThreadPrefix=NettyBoss
transport.threadFactory.workerThreadPrefix=NettyServerNIOWorker
transport.threadFactory.serverExecutorThreadPrefix=NettyServerBizHandler
transport.threadFactory.shareBossWorker=false
transport.threadFactory.clientSelectorThreadPrefix=NettyClientSelector
transport.threadFactory.clientSelectorThreadSize=1
transport.threadFactory.clientWorkerThreadPrefix=NettyClientWorkerThread
transport.threadFactory.bossThreadSize=1
transport.threadFactory.workerThreadSize=default
transport.shutdown.wait=3
transport.serialization=seata
transport.compressor=none

#Transaction routing rules configuration, only for the client
service.vgroupMapping.default_tx_group=default
#If you use a registry, you can ignore it
service.default.grouplist=127.0.0.1:8091
service.enableDegrade=false
service.disableGlobalTransaction=false

#Transaction rule configuration, only for the client
client.rm.asyncCommitBufferLimit=10000
client.rm.lock.retryInterval=10
client.rm.lock.retryTimes=30
client.rm.lock.retryPolicyBranchRollbackOnConflict=true
client.rm.reportRetryCount=5
client.rm.tableMetaCheckEnable=true
client.rm.tableMetaCheckerInterval=60000
client.rm.sqlParserType=druid
client.rm.reportSuccessEnable=false
client.rm.sagaBranchRegisterEnable=false
client.rm.sagaJsonParser=fastjson
client.rm.tccActionInterceptorOrder=-2147482648
client.tm.commitRetryCount=5
client.tm.rollbackRetryCount=5
client.tm.defaultGlobalTransactionTimeout=60000
client.tm.degradeCheck=false
client.tm.degradeCheckAllowTimes=10
client.tm.degradeCheckPeriod=2000
client.tm.interceptorOrder=-2147482648
client.undo.dataValidation=true
client.undo.logSerialization=jackson
client.undo.onlyCareUpdateColumns=true
server.undo.logSaveDays=7
server.undo.logDeletePeriod=86400000
client.undo.logTable=undo_log
client.undo.compress.enable=true
client.undo.compress.type=zip
client.undo.compress.threshold=64k
#For TCC transaction mode
tcc.fence.logTableName=tcc_fence_log
tcc.fence.cleanPeriod=1h

#Log rule configuration, for client and server
log.exceptionRate=100

#Transaction storage configuration, only for the server. The file, DB, and redis configuration values are optional.
store.mode=db
store.lock.mode=file
store.session.mode=file
#Used for password encryption
store.publicKey=

#If `store.mode,store.lock.mode,store.session.mode` are not equal to `file`, you can remove the configuration block.
store.file.dir=file_store/data
store.file.maxBranchSessionSize=16384
store.file.maxGlobalSessionSize=512
store.file.fileWriteBufferCacheSize=16384
store.file.flushDiskMode=async
store.file.sessionReloadReadSize=100

#These configurations are required if the `store mode` is `db`. If `store.mode,store.lock.mode,store.session.mode` are not equal to `db`, you can remove the configuration block.
store.db.datasource=druid
store.db.dbType=mysql
store.db.driverClassName=com.mysql.jdbc.Driver
store.db.url=jdbc:mysql://192.168.72.133:3306/seata?useUnicode=true&rewriteBatchedStatements=true
store.db.user=hive
store.db.password=12345678
store.db.minConn=5
store.db.maxConn=30
store.db.globalTable=global_table
store.db.branchTable=branch_table
store.db.distributedLockTable=distributed_lock
store.db.queryLimit=100
store.db.lockTable=lock_table
store.db.maxWait=5000

#These configurations are required if the `store mode` is `redis`. If `store.mode,store.lock.mode,store.session.mode` are not equal to `redis`, you can remove the configuration block.
store.redis.mode=single
store.redis.single.host=127.0.0.1
store.redis.single.port=6379
store.redis.sentinel.masterName=
store.redis.sentinel.sentinelHosts=
store.redis.maxConn=10
store.redis.minConn=1
store.redis.maxTotal=100
store.redis.database=0
store.redis.password=
store.redis.queryLimit=100

#Transaction rule configuration, only for the server
server.recovery.committingRetryPeriod=1000
server.recovery.asynCommittingRetryPeriod=1000
server.recovery.rollbackingRetryPeriod=1000
server.recovery.timeoutRetryPeriod=1000
server.maxCommitRetryTimeout=-1
server.maxRollbackRetryTimeout=-1
server.rollbackRetryTimeoutUnlockEnable=false
server.distributedLockExpireTime=10000
server.xaerNotaRetryTimeout=60000
server.session.branchAsyncQueueSize=5000
server.session.enableBranchAsyncRemove=false

#Metrics configuration, only for the server
metrics.enabled=false
metrics.registryType=compact
metrics.exporterList=prometheus
metrics.exporterPrometheusPort=9898
第六步:在脚本目录下执行(命令中-t 是指nacos的namespace需要提前创建)

在这里插入图片描述
脚本在script目录下,配置文件为config.txt

 sh nacos-config.sh -h localhost -p 8848 -g SEATA_GROUP -t seata -u nacos -w nacos
启动seata服务端 bin目录下双击seata-server.bat

成功后可以在nacos的服务列表中看到seata-server服务,在配置列表可以看到相关配置信息,至此服务端已搭建完成。

服务列表
在这里插入图片描述
配置信息
在这里插入图片描述

seata-server客户端demo

客户端的代码库地址:https://gitee.com/ou_song/seata

演示图片

  1. 调用order服务,order服务中调用customer和stock服务。在stock服务中心抛出异常,发现order和customer中均没有数据变化。
    请求:
    在这里插入图片描述

前数据:
在这里插入图片描述

order在这里插入图片描述
stock
在这里插入图片描述

后数据:

customer
在这里插入图片描述
order
在这里插入图片描述
stock
在这里插入图片描述

  1. 将断点拦截在stock服务中可以看到seata数据库中的数据和服务端的undo_log中的数据。
    customer表中数据
    在这里插入图片描述
    order表中
    在这里插入图片描述
    seata服务表中

lock_table
在这里插入图片描述
global_table
在这里插入图片描述

Logo

权威|前沿|技术|干货|国内首个API全生命周期开发者社区

更多推荐