2

聊一聊如何用C#轻松完成一个TCC分布式事务

 2 years ago
source link: https://www.cnblogs.com/catcher1994/p/csharp-dtm-tcc.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

银行跨行转账业务是一个典型分布式事务场景,假设 A 需要跨行转账给 B,那么就涉及两个银行的数据,无法通过一个数据库的本地事务保证转账的 ACID ,只能够通过分布式事务来解决。

聊一聊如何用C#轻松完成一个SAGA分布式事务 中介绍了借助 DTM 用 SAGA 事务模式解决了上面的银行跨行转账业务。

这一篇我们就来看看如何用 TCC 的事务模式来处理这个问题。

什么是 TCC

TCC是Try、Confirm、Cancel三个词语的缩写,最早是由 Pat Helland 于 2007 年发表的一篇名为《Life beyond Distributed Transactions:an Apostate’s Opinion》的论文提出。

TCC分为3个阶段

  • Try 阶段:尝试执行,完成所有业务检查(一致性), 预留必需的业务资源(准隔离性)
  • Confirm 阶段:如果所有分支的Try都成功了,则走到Confirm阶段。Confirm真正执行业务,不作任何业务检查,只使用 Try 阶段预留的业务资源
  • Cancel 阶段:如果所有分支的Try有一个失败了,则走到Cancel阶段。Cancel释放 Try 阶段预留的业务资源。

对于前面的跨行转账业务,最简单的做法是,在Try阶段调整余额,在Cancel阶段反向调整余额,Confirm阶段则空操作。这么做带来的问题是,如果A扣款成功,金额转入B失败,最后回滚,把A的余额调整为初始值。在这个过程中如果A发现自己的余额被扣减了,但是收款方B迟迟没有收到余额,那么会对A造成困扰。

更好的做法是,Try阶段冻结A转账的金额,Confirm进行实际的扣款,Cancel进行资金解冻,这样用户在任何一个阶段,看到的数据都是清晰明了的。

下面我们进行一个 TCC 事务的具体开发

dotnet add package Dtmcli --version 0.4.0

注:相比 0.3.0,0.4.0 支持了 4 个新的特性,详见 https://github.com/dtm-labs/dtmcli-csharp/releases/tag/v0.4.0

成功的 TCC

先来看一下一个成功完成的 TCC 时序图。

可以看到它的流程和 SAGA 的还是有比较大的区别。

同样的,上图的微服务1,对应我们示例的 OutApi,也就是转钱出去的那个服务。

微服务2,对应我们示例的 InApi,也就是转钱进来的那个服务。

下面我们来编写两个服务的Try/Confirm/Cancel的处理。

OutApi

app.MapPost("/api/TransOutTry", async (IBranchBarrierFactory bbFactory, HttpContext context, TransRequest req) => 
{
    var bb = bbFactory.CreateBranchBarrier(context.Request.Query);

    using var db = Db.GeConn();
    await bb.Call(db, async (tx) =>
    {
        Console.WriteLine($"用户【{req.UserId}】转出【{req.Amount}】Try 操作,bb={bb}");
        // tx 参数是事务,可和本地事务一起提交回滚
        await Task.CompletedTask;
    });

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

app.MapPost("/api/TransOutConfirm", async (IBranchBarrierFactory bbFactory, HttpContext context, TransRequest req) =>
{
    var bb = bbFactory.CreateBranchBarrier(context.Request.Query);

    using var db = Db.GeConn();
    await bb.Call(db, async (tx) =>
    {
        Console.WriteLine($"用户【{req.UserId}】转出【{req.Amount}】Confirm操作,bb={bb}");
        await Task.CompletedTask;
    });

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

app.MapPost("/api/TransOutCancel", async (IBranchBarrierFactory bbFactory, HttpContext context, TransRequest req) =>
{
    var bb = bbFactory.CreateBranchBarrier(context.Request.Query);

    using var db = Db.GeConn();
    await bb.Call(db, async (tx) =>
    {
        Console.WriteLine($"用户【{req.UserId}】转出【{req.Amount}】Cancel操作,bb={bb}");
        await Task.CompletedTask;
    });

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

InApi


app.MapPost("/api/TransInTry", async (IBranchBarrierFactory bbFactory, HttpContext context, TransRequest req) =>
{
    var bb = bbFactory.CreateBranchBarrier(context.Request.Query);

    using var db = Db.GeConn();
    await bb.Call(db, async (tx) =>
    {
        Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】Try操作,bb={bb}");
        await Task.CompletedTask;
    });

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

app.MapPost("/api/TransInConfirm", async (IBranchBarrierFactory bbFactory, HttpContext context, TransRequest req) =>
{
    var bb = bbFactory.CreateBranchBarrier(context.Request.Query);

    using var db = Db.GeConn();
    await bb.Call(db, async (tx) =>
    {
        Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】Confirm操作,bb={bb}");
        await Task.CompletedTask;
    });

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

app.MapPost("/api/TransInCancel", async (IBranchBarrierFactory bbFactory, HttpContext context, TransRequest req) =>
{
    var bb = bbFactory.CreateBranchBarrier(context.Request.Query);

    using var db = Db.GeConn();
    await bb.Call(db, async (tx) =>
    {
        Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】Cancel操作,bb={bb}");
        await Task.CompletedTask;
    });

    return Results.Ok(TransResponse.BuildSucceedResponse());
});

到此各个子事务的处理已经OK了,在上面的代码中,下面这几行是子事务屏障相关代码,只要按照这个方式来调用您的业务逻辑,子事务屏障保证重复请求、悬挂、空补偿情况出现时,您的业务逻辑不会被调用,保证了正常业务的正确进行

var bb = bbFactory.CreateBranchBarrier(context.Request.Query);
await bb.Call(db, async (tx) =>
{
    // 业务操作...
});

然后准备开启 TCC 事务,进行分支调用

var cts = new CancellationTokenSource();

var gid = await dtmClient.GenGid(cts.Token);

var res = await tccGlobalTransaction.Excecute(gid, async (tcc) =>
{
    // 用户1 转出30元
    var res1 = await tcc.CallBranch(userOutReq, outApi + "/TransOutTry", outApi + "/TransOutConfirm", outApi + "/TransOutCancel", cts.Token);

    // 用户2 转入30元
    var res2 = await tcc.CallBranch(userInReq, inApi + "/TransInTry", inApi + "/TransInConfirm", inApi + "/TransInCancel", cts.Token);
    
    Console.WriteLine($"case1, branch-out-res= {res1} branch-in-res= {res2}");
}, cts.Token);

Console.WriteLine($"case1, {gid} tcc 提交结果 = {res}");

到这里,一个完整的 TCC 分布式事务就编写完成了。

需要注意的地方:

  1. 依赖 TccGlobalTransaction ,这个是单例的
  2. tcc 的 CallBranch 方法就是事务分支的调用

搭建好 dtm 的环境后,运行上面的例子,会看到下面的输出。

成功的示例都是相对比较简单的。

下面来看一个 TCC 回滚的例子。

TCC 的回滚

假如银行将金额准备转入用户2时,发现用户2的账户异常,返回失败,会怎么样?我们修改代码,模拟这种情况:

在 InApi 加多一个转入Try失败的处理接口

app.MapPost("/api/TransInTryError", (IBranchBarrierFactory bbFactory, HttpContext context, TransRequest req) =>
{
    var bb = bbFactory.CreateBranchBarrier(context.Request.Query);

    Console.WriteLine($"用户【{req.UserId}】转入【{req.Amount}】Try--失败,bb={bb}");

    return Results.Ok(TransResponse.BuildFailureResponse());
});

再来看一下事务失败交互的时序图

这个跟成功的 TCC 差别就在于,当某个子事务返回失败后,后续就回滚全局事务,调用各个子事务的 Cancel 操作,保证全局事务全部回滚。

再调整一下调用方,把转入 Try 操作替换成上面这个返回错误的接口。

var cts = new CancellationTokenSource();

var gid = await dtmClient.GenGid(cts.Token);

var res = await tccGlobalTransaction.Excecute(gid, async (tcc) =>
{
    var res1 = await tcc.CallBranch(userOutReq, outApi + "/TransOutTry", outApi + "/TransOutConfirm", outApi + "/TransOutCancel", cts.Token);
    var res2 = await tcc.CallBranch(userInReq, inApi + "/TransInTryError", inApi + "/TransInConfirm", inApi + "/TransInCancel", cts.Token);

    Console.WriteLine($"case2, branch-out-res= {res1} branch-in-res= {res2}");
}, cts.Token);

Console.WriteLine($"case2, {gid} tcc 提交结果 = {res}");

需要注意的是 CallBranch 方法在对应的微服务返回失败后会抛出异常,进而触发全局事务的回滚操作,这个时候 dtm 才会触发 Cancel 的操作。

运行结果如下:

重点看三个地方,

  • 转入的 Cancel 操作并没有执行,因为这里模拟的是转入失败的情况,子事务屏障判定为空补偿了
  • 没有输出分支调用的结果,是因为执行第二个分支的时候没有返回成功的结果
  • 输出的提交结果为空,表明这个事务是失败的,成功的话会返回这个事务的 gid

在这篇文章里,通过 2 个简单的例子,完整给出了编写一个 TCC 事务的过程,涵盖了正常成功完成,异常回滚的情况。

希望对研究分布式事务的您有所帮助。

本文示例代码: DtmTccDemo


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK