分布式事务 | 使用DTM 的Saga 模式
前面章节提及的MassTransit、dotnetcore/CAP都提供了分布式事务的处理能力,但也仅局限于Saga和本地消息表模式的实现。那有没有一个独立的分布式事务解决方案,涵盖多种分布式事务处理模式,如Saga、TCC、XA模式等。有,目前业界主要有两种开源方案,其一是阿里开源的Seata,另一个就是DTM。其中Seata仅支持Java、Go和Python语言,因此不在.NET 的选择范围。DTM则通过提供简单易用的HTTP和gRPC接口,屏蔽了语言的无关性,因此支持任何开发语言接入,目前提供了Go、Python、NodeJs、Ruby、Java和C#等语言的SDK。
DTM,全称Distributed Transaction Manager,是一个分布式事务管理器,解决跨数据库、跨服务、跨语言更新数据的一致性问题。它提供了Saga、TCC、 XA和二阶段消息模式以满足不同应用场景的需求,同时其首创的子事务屏障技术可以有效解决幂等、悬挂和空补偿等异常问题。
DTM 事务处理过程及架构
那DTM是如何处理分布式事务的呢?以一个经典的跨行转账业务为例来看下事务处理过程。对于跨行转账业务而言,很显然是跨库跨服务的应用场景,不能简单通过本地事务解决,可以使用Saga模式,以下是基于DTM提供的Saga事务模式成功转账的的时序图:

从以上时序图可以看出,DTM整个全局事务分为如下几步:
- 用户定义好全局事务所有的事务分支(全局事务的组成部分称为事务分支),然后提交给DTM,DTM持久化全局事务信息后,立即返回
- DTM取出第一个事务分支,这里是TransOut,调用该服务并成功返回
- DTM取出第二个事务分支,这里是TransIn,调用该服务并成功返回
- DTM已完成所有的事务分支,将全局事务的状态修改为已完成
基于以上这个时序图的基础上,再来看下DTM的架构:

整个DTM架构中,一共有三个角色,分别承担了不同的职责:
- RM-资源管理器:RM是一个应用服务,通常连接到独立的数据库,负责处理全局事务中的本地事务,执行相关数据的修改、提交、回滚、补偿等操作。例如在前面的这个Saga事务时序图中,步骤2、3中被调用的TransIn和TransOut方法所在的服务都是RM。
- AP-应用程序:AP是一个应用服务,负责全局事务的编排,他会注册全局事务,注册子事务,调用RM接口。例如在前面的这个SAGA事务中,发起步骤1的是AP,它编排了一个包含TransOut、TransIn的全局事务,然后提交给TM
- TM-事务管理器:TM就是DTM服务,负责全局事务的管理,作为一个独立的服务而存在。每个全局事务都注册到TM,每个事务分支也注册到TM。TM会协调所有的RM来执行不同的事务分支,并根据执行结果决定是否提交或回滚事务。例如在前面的Saga事务时序图中,TM在步骤2、3中调用了各个RM,在步骤4中,完成这个全局事务。
总体而言,AP-应用程序充当全局事务编排器的角色通过DTM提供的开箱即用的SDK进行全局事务和子事务的注册。TM-事务管理器接收到注册的全局事务和子事务后,负责调用RM-资源管理器来执行对应的事务分支,TM-事务管理器根据事务分支的执行结果决定是否提及或回滚事务。
快速上手
百闻不如一见,接下来就来实际上手体验下如何基于DTM来实际应用Saga进行分布式跨行转账事务的处理。
创建示例项目
接下来就来创建一个示例项目:
- 使用
dotnet new webapi -n DtmDemo.Webapi创建示例项目。 - 添加Nuget包:
Dtmcli和Pomelo.EntityFrameworkCore.MySql。 - 添加DTM配置项:
{ |
"dtm": { |
"DtmUrl": "http://localhost:36789", |
"DtmTimeout": 10000, |
"BranchTimeout": 10000, |
"DBType": "mysql", |
"BarrierTableName": "dtm_barrier.barrier", |
} |
} |
- 定义银行账户
BankAccount实体类:
namespace DtmDemo.WebApi.Models |
{ |
public class BankAccount |
{ |
public int Id { get; set; } |
public decimal Balance { get; set; } |
} |
} |
- 定义
DtmDemoWebApiContext数据库上下文:
using Microsoft.EntityFrameworkCore; |
namespace DtmDemo.WebApi.Data |
{ |
public class DtmDemoWebApiContext : DbContext |
{ |
public DtmDemoWebApiContext (DbContextOptions<DtmDemoWebApiContext> options) |
: base(options) |
{ |
} |
public DbSet<DtmDemo.WebApi.Models.BankAccount> BankAccount { get; set; } = default!; |
} |
} |
- 注册DbContext 和DTM服务:
using Microsoft.EntityFrameworkCore; |
using DtmDemo.WebApi.Data; |
using Dtmcli; |
var builder = WebApplication.CreateBuilder(args); |
var connectionStr = builder.Configuration.GetConnectionString("DtmDemoWebApiContext"); |
// 注册DbContext |
builder.Services.AddDbContext<DtmDemoWebApiContext>(options => |
{ |
options.UseMySql(connectionStr, ServerVersion.AutoDetect(connectionStr)); |
}); |
// 注册DTM |
builder.Services.AddDtmcli(builder.Configuration, "dtm"); |
- 执行
dotnet ef migrations add 'Initial'创建迁移。 - 为便于初始化演示数据,定义
BankAccountController如下,其中PostBankAccount接口添加了await _context.Database.MigrateAsync();用于自动应用迁移。
using Microsoft.AspNetCore.Mvc; |
using Microsoft.EntityFrameworkCore; |
using DtmDemo.WebApi.Data; |
using DtmDemo.WebApi.Models; |
using Dtmcli; |
namespace DtmDemo.WebApi.Controllers |
{ |
[Route("api/[controller]")] |
[ApiController] |
public class BankAccountsController : ControllerBase |
{ |
private readonly DtmDemoWebApiContext _context; |
public BankAccountsController(DtmDemoWebApiContext context) |
{ |
_context = context; |
} |
[HttpGet] |
public async Task<ActionResult<IEnumerable<BankAccount>>> GetBankAccount() |
{ |
return await _context.BankAccount.ToListAsync(); |
} |
[HttpPost] |
public async Task<ActionResult<BankAccount>> PostBankAccount(BankAccount bankAccount) |
{ |
await _context.Database.MigrateAsync(); |
_context.BankAccount.Add(bankAccount); |
await _context.SaveChangesAsync(); |
return Ok(bankAccount); |
} |
} |
应用Saga模式
接下来定义SagaDemoController来使用DTM的Saga模式来模拟跨行转账分布式事务:
using Microsoft.AspNetCore.Mvc; |
using Microsoft.EntityFrameworkCore; |
using DtmDemo.WebApi.Data; |
using DtmDemo.WebApi.Models; |
using Dtmcli; |
using DtmCommon; |
namespace DtmDemo.WebApi.Controllers |
{ |
[Route("api/[controller]")] |
[ApiController] |
public class SagaDemoController : ControllerBase |
{ |
private readonly DtmDemoWebApiContext _context; |
private readonly IConfiguration _configuration; |
private readonly IDtmClient _dtmClient; |
private readonly IDtmTransFactory _transFactory; |
private readonly IBranchBarrierFactory _barrierFactory; |
private readonly ILogger<BankAccountsController> _logger; |
public SagaDemoController(DtmDemoWebApiContext context, IConfiguration configuration, IDtmClient dtmClient, IDtmTransFactory transFactory, ILogger<BankAccountsController> logger, IBranchBarrierFactory barrierFactory) |
{ |
this._context = context; |
this._configuration = configuration; |
this._dtmClient = dtmClient; |
this._transFactory = transFactory; |
this._logger = logger; |
this._barrierFactory = barrierFactory; |
} |
} |
对于跨行转账业务,使用DTM的Saga模式,首先要进行事务拆分,可以拆分为以下4个子事务,并分别实现:
转出子事务(TransferOut)
[HttpPost("TransferOut")] |
public async Task<IActionResult> TransferOut([FromBody] TransferRequest request) |
{ |
var msg = $"用户{request.UserId}转出{request.Amount}元"; |
_logger.LogInformation($"转出子事务-启动:{msg}"); |
// 1. 创建子事务屏障 |
var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query); |
try |
{ |
using (var conn = _context.Database.GetDbConnection()) |
{ |
// 2. 在子事务屏障内执行事务操作 |
await branchBarrier.Call(conn, async (tx) => |
{ |
_logger.LogInformation($"转出子事务-执行:{msg}"); |
await _context.Database.UseTransactionAsync(tx); |
var bankAccount = await _context.BankAccount.FindAsync(request.UserId); |
if (bankAccount == null || bankAccount.Balance < request.Amount) |
throw new InvalidDataException("账户不存在或余额不足!"); |
bankAccount.Balance -= request.Amount; |
await _context.SaveChangesAsync(); |
}); |
} |
} |
catch (InvalidDataException ex) |
{ |
_logger.LogInformation($"转出子事务-失败:{ex.Message}"); |
// 3. 按照接口协议,返回409,以表示子事务失败 |
return new StatusCodeResult(StatusCodes.Status409Conflict); |
} |
_logger.LogInformation($"转出子事务-成功:{msg}"); |
return Ok(); |
} |
以上代码中有几点需要额外注意:
- 使用Saga模式,必须开启子事务屏障:
_barrierFactory.CreateBranchBarrier(Request.Query),其中Request.Query中的参数由DTM 生成,类似:?branch_id=01&gid=XTzKHgxemLyL8EXtMTLvzK&op=action&trans_type=saga,主要包含四个参数:- gid:全局事务Id
- trans_type:事务类型,是saga、msg、xa或者是tcc。
- branch_id:子事务的Id
- op:当前操作,对于Saga事务模式,要么为action(正向操作),要么为compensate(补偿操作)。
- 必须在子事务屏障内执行事务操作:
branchBarrier.Call(conn, async (tx) =>{} - 对于Saga正向操作而言,业务上的失败与异常是需要做严格区分的,例如前面的余额不足,是业务上的失败,必须回滚。而对于网络抖动等其他外界原因导致的事务失败,属于业务异常,则需要重试。因此若因业务失败(这里是账户不存在或余额不足)而导致子事务失败,则必须通过抛异常的方式并返回
**409**状态码以告知DTM 子事务失败。 - 以上通过抛出异常的方式中断子事务执行并在外围捕获特定异常返回
409状态码。在外围捕获异常时切忌放大异常捕获,比如直接catch(Exception),如此会捕获由于网络等其他原因导致的异常,而导致DTM 不再自动处理该异常,比如业务异常时的自动重试。
转出补偿子事务(TransferOut_Compensate)
转出补偿,就是回滚转出操作,进行账户余额归还,实现如下:
[HttpPost("TransferOut_Compensate")] |
public async Task<IActionResult> TransferOut_Compensate([FromBody] TransferRequest request) |
{ |
var msg = $"用户{request.UserId}回滚转出{request.Amount}元"; |
_logger.LogInformation($"转出补偿子事务-启动:{msg}"); |
// 1. 创建子事务屏障 |
var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query); |
using (var conn = _context.Database.GetDbConnection()) |
{ |
// 在子事务屏障内执行事务操作 |
await branchBarrier.Call(conn, async (tx) => |
{ |
_logger.LogInformation($"转出补偿子事务-执行:{msg}"); |
await _context.Database.UseTransactionAsync(tx); |
var bankAccount = await _context.BankAccount.FindAsync(request.UserId); |
if (bankAccount == null) |
return; //对于补偿操作,可直接返回,中断后续操作 |
bankAccount.Balance += request.Amount; |
await _context.SaveChangesAsync(); |
}); |
} |
_logger.LogInformation($"转出补偿子事务-成功!"); |
// 2. 因补偿操作必须成功,所以必须返回200。 |
return Ok(); |
} |
由于DTM设计为总是执行补偿,也就是说即使正向操作子事务失败时,DTM 仍旧会执行补偿逻辑。但子事务屏障会在执行时判断正向操作的执行状态,当子事务失败时,并不会执行补偿逻辑。
另外DTM的补偿操作,是要求最终成功的,只要还没成功,就会不断进行重试,直到成功。因此在补偿子事务中,即使补偿子事务中出现业务失败时,也必须返回**200**。因此当出现bankAccount==null时可以直接 return。
转入子事务(TransferIn)
转入子事务和转出子事务的实现基本类似,都是开启子事务屏障后,在branchBarrier.Call(conn, async tx => {}中实现事务逻辑,并通过抛异常的方式并最终返回409状态码来显式告知DTM 子事务执行失败。
[HttpPost("TransferIn")] |
public async Task<IActionResult> TransferIn([FromBody] TransferRequest request) |
{ |
var msg = $"用户{request.UserId}转入{request.Amount}元"; |
_logger.LogInformation($"转入子事务-启动:{msg}"); |
var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query); |
try |
{ |
using (var conn = _context.Database.GetDbConnection()) |
{ |
await branchBarrier.Call(conn, async (tx) => |
{ |
_logger.LogInformation($"转入子事务-执行:{msg}"); |
await _context.Database.UseTransactionAsync(tx); |
var bankAccount = await _context.BankAccount.FindAsync(request.UserId); |
if (bankAccount == null) |
throw new InvalidDataException("账户不存在!"); |
bankAccount.Balance += request.Amount; |
await _context.SaveChangesAsync(); |
}); |
} |
} |
catch (InvalidDataException ex) |
{ |
_logger.LogInformation($"转入子事务-失败:{ex.Message}"); |
return new StatusCodeResult(StatusCodes.Status409Conflict); |
} |
_logger.LogInformation($"转入子事务-成功:{msg}"); |
return Ok(); |
} |
转入补偿子事务(TransferIn_Compensate)
转入补偿子事务和转出补偿子事务的实现也基本类似,都是开启子事务屏障后,在branchBarrier.Call(conn, async tx => {}中实现事务逻辑,并最终返回200状态码来告知DTM 补偿子事务执行成功。
[HttpPost("TransferIn_Compensate")] |
public async Task<IActionResult> TransferIn_Compensate([FromBody] TransferRequest request) |
{ |
var msg = "用户{request.UserId}回滚转入{request.Amount}元"; |
_logger.LogInformation($"转入补偿子事务-启动:{msg}"); |
var branchBarrier = _barrierFactory.CreateBranchBarrier(Request.Query); |
using (var conn = _context.Database.GetDbConnection()) |
{ |
await branchBarrier.Call(conn, async (tx) => |
{ |
_logger.LogInformation($"转入补偿子事务-执行:{msg}"); |
await _context.Database.UseTransactionAsync(tx); |
var bankAccount = await _context.BankAccount.FindAsync(request.UserId); |
if (bankAccount == null) return; |
bankAccount.Balance -= request.Amount; |
await _context.SaveChangesAsync(); |
}); |
} |
_logger.LogInformation($"转入补偿子事务-成功!"); |
return Ok(); |
} |
编排Saga事务
拆分完子事务,最后就可以进行Saga事务编排了,其代码如下所示:
[HttpPost("Transfer")] |
public async Task<IActionResult> Transfer(int fromUserId, int toUserId, decimal amount, |
CancellationToken cancellationToken) |
{ |
try |
{ |
_logger.LogInformation($"转账事务-启动:用户{fromUserId}转账{amount}元到用户{toUserId}"); |
//1. 生成全局事务ID |
var gid = await _dtmClient.GenGid(cancellationToken); |
var bizUrl = _configuration.GetValue<string>("TransferBaseURL"); |
//2. 创建Saga |
var saga = _transFactory.NewSaga(gid); |
//3. 添加子事务 |
saga.Add(bizUrl + "/TransferOut", bizUrl + "/TransferOut_Compensate", |
new TransferRequest(fromUserId, amount)) |
.Add(bizUrl + "/TransferIn", bizUrl + "/TransferIn_Compensate", |
new TransferRequest(toUserId, amount)) |
.EnableWaitResult(); // 4. 按需启用是否等待事务执行结果 |
//5. 提交Saga事务 |
await saga.Submit(cancellationToken); |
} |
catch (DtmException ex) // 6. 如果开启了`EnableWaitResult()`,则可通过捕获异常的方式,捕获事务失败的结果。 |
{ |
_logger.LogError($"转账事务-失败:用户{fromUserId}转账{amount}元到用户{toUserId}失败!"); |
return new BadRequestObjectResult($"转账失败:{ex.Message}"); |
} |
_logger.LogError($"转账事务-完成:用户{fromUserId}转账{amount}元到用户{toUserId}成功!"); |
return Ok($"转账事务-完成:用户{fromUserId}转账{amount}元到用户{toUserId}成功!"); |
} |
主要步骤如下:
- 生成全局事务Id:
var gid =await _dtmClient.GenGid(cancellationToken); - 创建Saga全局事务:
_transFactory.NewSaga(gid); - 添加子事务:
saga.Add(string action, string compensate, object postData);包含正向和反向子事务。 - 如果依赖事务执行结果,可通过
EnableWaitResult()开启事务结果等待。 - 提交Saga全局事务:
saga.Submit(cancellationToken); - 若开启了事务结果等待,可以通过
try...catch..来捕获DtmExcepiton异常来获取事务执行异常信息。
运行项目
既然DTM作为一个独立的服务存在,其负责通过HTTP或gRPC协议发起子事务的调用,因此首先需要启动一个DTM实例,又由于本项目依赖MySQL,因此我们采用Docker Compose的方式来启动项目。在Visual Studio中通过右键项目->Add->Docker Support->Linux 即可添加Dockerfile如下所示:
FROM mcr.microsoft.com/dotnet/aspnet:6.0 AS base |
WORKDIR /app |
EXPOSE 80 |
EXPOSE 443 |
FROM mcr.microsoft.com/dotnet/sdk:6.0 AS build |
WORKDIR /src |
COPY ["DtmDemo.WebApi/DtmDemo.WebApi.csproj", "DtmDemo.WebApi/"] |
RUN dotnet restore "DtmDemo.WebApi/DtmDemo.WebApi.csproj" |
COPY . . |
WORKDIR "/src/DtmDemo.WebApi" |
RUN dotnet build "DtmDemo.WebApi.csproj" -c Release -o /app/build |
FROM build AS publish |
RUN dotnet publish "DtmDemo.WebApi.csproj" -c Release -o /app/publish |
FROM base AS final |
WORKDIR /app |
COPY --from=publish /app/publish . |
ENTRYPOINT ["dotnet", "DtmDemo.WebApi.dll"] |
在Visual Studio中通过右键项目->Add Container Orchestrator Support->Docker Compose即可添加docker-compose.yml,由于整个项目依赖mysql和DTM,修改docker-compose.yml如下所示,其中定义了三个服务:db,dtm和dtmdemo.webapi。
version: '3.4' |
services: |
db: |
image: 'mysql:5.7' |
container_name: dtm-mysql |
environment: |
MYSQL_ROOT_PASSWORD: 123456 # 指定MySQL初始密码 |
volumes: |
- ./docker/mysql/scripts:/docker-entrypoint-initdb.d # 挂载用于初始化数据库的脚本 |
ports: |
- '3306:3306' |
dtm: |
depends_on: ["db"] |
image: 'yedf/dtm:latest' |
container_name: dtm-svc |
environment: |
IS_DOCKER: '1' |
STORE_DRIVER: mysql # 指定使用MySQL持久化DTM事务数据 |
STORE_HOST: db # 指定MySQL服务名,这里是db |
STORE_USER: root |
STORE_PASSWORD: '123456' |
STORE_PORT: 3306 |
STORE_DB: "dtm" # 指定DTM 数据库名 |
ports: |
- '36789:36789' # DTM HTTP 端口 |
- '36790:36790' # DTM gRPC 端口 |
dtmdemo.webapi: |
depends_on: ["dtm", "db"] |
image: ${DOCKER_REGISTRY-}dtmdemowebapi |
environment: |
ASPNETCORE_ENVIRONMENT: docker # 设定启动环境为docker |
container_name: dtm-webapi-demo |
build: |
context: . |
dockerfile: DtmDemo.WebApi/Dockerfile |
ports: |
- '31293:80' # 映射Demo:80端口到本地31293端口 |
- '31294:443' # 映射Demo:443端口到本地31294端口 |
更多推荐
所有评论(0)