6

.NetCore中使用分布式事务DTM的二阶段消息 - 以往清泉

 1 year ago
source link: https://www.cnblogs.com/xwc1996/p/17252311.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

.NetCore中使用分布式事务DTM的二阶段消息

二阶段消息是DTM新提出的,可以完美代替现有的事务消息和本地消息表架构。无论从复杂度、性能、便利性还是代码量都是完胜现有的方案。

相比现有的消息架构借助于各种消息中间件比如RocketMQ等,DTM自己实现了无需额外的学习成本。它能够保证本地事务的提交和全局事务提交是“原子的”,适合解决不需要回滚的分布式事务场景

二阶段消息保证提交的原子性和如何保证业务成功执行如下时序图:

988132-20230327173707743-181074124.png

 二阶段消息主要是指PrepareSubmit两个阶段,主程序向DTM服务发送Prepare消息,成功后执行本地事务,完成本地事务后发送Submit消息至DTM服务,之后DTM会调用分支事件执行其他服务,最后完成全局事务。

 当发送了Prepare但是Submit没有提交的话,会进行回调请求来确认消息的情况,具体工作过程如下:

 1、在处理本地事务时,会将gid插入到barrier表中,同时带上插入原因为committed。该表有一个唯一索引,主要字段为gid

 2、当进行回查时,二阶段消息的操作不是直接查gid是否存在,而是再insert ignore一条带有相同gid的数据,同时带上插入原因为rollbacked。此时如果表中如果已有gid的记录,那么新的插入操作就会被ignore,否则数据会被插入。

 3、然后再用gid查询表中的记录,如果查到记录的reasoncommitted,那么说明本地事务已提交;如果查到记录的reasonrollbacked,那么说明本地事务已回滚。

二、安装DTM

 我使用二进制包下载安装地址,我是Window环境所以下载后解压,点击dtm.exe进行运行即可,如下启动成功

988132-20230327112243545-632255591.png

启动成功后可以访问http://localhost:36789,进入管理后台

三、创建DTM所需的表

我们需要创建一个表处理消息的回查,表里保存全局事务ID,具体作用在后续说明,我这里用的SqlServer数据库,所以执行如下:

CREATE TABLE [dbo].[barrier]
(
    [id] bigint NOT NULL IDENTITY(1,1) PRIMARY KEY,
    [trans_type] varchar(45) NOT NULL DEFAULT(''),
    [gid] varchar(128) NOT NULL DEFAULT(''),
    [branch_id] varchar(128) NOT NULL DEFAULT(''),
    [op] varchar(45) NOT NULL DEFAULT(''),
    [barrier_id] varchar(45) NOT NULL DEFAULT(''),
    [reason] varchar(45) NOT NULL DEFAULT(''),
    [create_time] datetime NOT NULL DEFAULT(getdate()) ,
    [update_time] datetime NOT NULL DEFAULT(getdate())
)

GO

CREATE UNIQUE INDEX[ix_uniq_barrier] ON[dbo].[barrier]
        ([gid] ASC, [branch_id] ASC, [op] ASC, [barrier_id] ASC)
WITH(IGNORE_DUP_KEY = ON)

GO

这里比较关键的是那个唯一索引,有一个IGNORE_DUP_KEY = ON,这个其实就是为了等价mysqlinsert ignore表示存在相关字段的信息则不插入,否则就插入数据

当然还支持很多其他的数据库,建表语句可以从这里查看地址

 四、创建项目

 我们简单的创建两个.net core webapi项目进行测试,两个项目都进行相同的如下操作:

 1、安装Dtmcli和Microsoft.EntityFrameworkCore.SqlServer

 安装Dtmcli是因为其中已经帮我们集成了DTM客户端SDK HTTP版本,想要GRPC版本可以安装Dtmgrpc

 安装Microsoft.EntityFrameworkCore.SqlServer很显然是为了处理数据库。

Install-Package Dtmcli
Install-Package Microsoft.EntityFrameworkCore.SqlServer

2、配置

接下来我们配置服务,先在配置文件appsetting.json中添加如下

  "AppSettings": {
    "DtmUrl": "http://localhost:36789",
    "BusiUrl": "http://localhost:5056",
    "QueryPreparedUrl": "http://localhost:5046",
    "BarrierConn": "Data Source=.;Initial Catalog=HTGL;TrustServerCertificate=True;;Integrated Security=True"
  }

 DtmUrlDTM的监听地址,http的是36789grpc的是36790

 BusiUrl:访问其他服务的地址

 QueryPreparedUrl:回查的地址

 BarrierConn:数据库连接语句

 添加一个配置类:

    public class AppSettings
    {
        public string DtmUrl { get; set; }
        public string BusiUrl { get; set; }
        public string BarrierConn { get; set; }
        public string QueryPreparedUrl { get; set; }
    }

 之后注入服务如下:

builder.Services.AddDtmcli(dtm => { 
    dtm.DtmUrl = builder.Configuration.GetValue<string>("AppSettings:DtmUrl");
    dtm.SqlDbType = DtmCommon.Constant.Barrier.DBTYPE_SQLSERVER;
    dtm.BarrierSqlTableName = "[HTGL].[dbo].[barrier]";
});
builder.Services.Configure<AppSettings>(builder.Configuration.GetSection("AppSettings"));

SqlDbType:表示使用的数据库类型

BarrierSqlTableNameBarrier表的名字

3、添加代码

我们在其中一个项目添加主程序代码如下:

    [ApiController]public class DtmController : ControllerBase
    {

        private readonly ILogger<DtmController> _logger;
        private readonly IDtmClient _dtmClient;
        private readonly IDtmTransFactory _transFactory;
        private readonly AppSettings _settings;
        private readonly IBranchBarrierFactory _factory;
        public DtmController(ILogger<DtmController> logger, IDtmClient dtmClient,IDtmTransFactory transFactory, IOptions<AppSettings> settings, IBranchBarrierFactory factory)
        {
            _logger = logger;
            _dtmClient = dtmClient;
            _transFactory = transFactory;
            _settings = settings.Value;
            _factory = factory;
        }
        private DbConnection GetConn() => new Microsoft.Data.SqlClient.SqlConnection(_settings.BarrierConn);
        [HttpPost("post-dtm-msg")]
        public async Task<IActionResult> Get(CancellationToken cancellationToken)
        {
            //1、创建gid
            var gid = await _dtmClient.GenGid(cancellationToken);
            //2、设置分支事务
            var msg = _transFactory.NewMsg(gid)
                .Add(_settings.BusiUrl + "/TransOut", new { id = 123 })
                .Add(_settings.BusiUrl + "/TransIn", new { id = 321 });//3、执行submit
            using (DbConnection conn = GetConn())
            {
                await msg.DoAndSubmitDB(_settings.QueryPreparedUrl + "/msg-queryprepared", conn, async tx =>
                {
                    //4、执行本地事务
                    await Task.CompletedTask;
                });
            }
            _logger.LogInformation("result gid is {0}", gid);
            return Content("SUCCESS");
        }
        [HttpGet("msg-queryprepared")]
        public async Task<IActionResult> QueryPrepared(CancellationToken cancellationToken)
        {
            var bb = _factory.CreateBranchBarrier(Request.Query);
            _logger.LogInformation("bb {0}", bb);
            using (DbConnection conn = GetConn())
            {
                //回调查询消息状态
                var res = await bb.QueryPrepared(conn);
                return Ok(new { dtm_result = res });
            }
        }
    }

然后我们向另一个服务项目添加如下代码,作为一个简单的服务方法,没有任何操作只是返回成功:

[ApiController]
    public class TransController : ControllerBase
    {
        private readonly ILogger<TransController> _logger;
        private readonly IBranchBarrierFactory _factory;
        private readonly AppSettings _settings;
        private DbConnection GetConn() => new Microsoft.Data.SqlClient.SqlConnection(_settings.BarrierConn);
        public TransController(ILogger<TransController> logger, IBranchBarrierFactory factory, IOptions<AppSettings> settings)
        {
            _logger = logger;
            _factory = factory;
            _settings = settings.Value;
        }
        [HttpPost("TransIn")]
        public async Task<IResult> In()
        {
            return Results.Ok(new { dtm_result = "SUCCESS" });
            //return Results.Ok(new { dtm_result = "FAILURE" });
        }
        [HttpPost("TransOut")]
        public async Task<IResult> Out()
        {
            return Results.Ok(new { dtm_result = "SUCCESS" });
        }
    }

五、执行查看结果

我们正常执行,可以看到下面的动图结果,在执行完本地事务后会访问分支事务,然后数据库表中添加了一条记录

988132-20230327193654694-1654296783.gif

可以在管理后台看到我们请求成功的信息

988132-20230327193748918-440521115.png

 如果要演示失败,需要做以下修改直接报错,我们可以看到访问了回调方法,然后数据库中看到rollback标记的消息

using (DbConnection conn = GetConn())
            {
                await msg.DoAndSubmitDB(_settings.QueryPreparedUrl + "/msg-queryprepared", conn, async tx =>
                {
                    throw new Exception("报错了");
                    //4、执行本地事务
                    await Task.CompletedTask;
                });
            }
988132-20230327195806568-2110754280.gif

 提交后再宕机演示比较麻烦,我就不演示了,大家意会即可。

 如果分支事务返回的不是SUCCESS而是FAILURE会由DTM隔一段时间重新请求,dtm对每个事务的重试是指数退避策略,具体为间隔是每失败一次,间隔加倍,避免过多的重试,导致系统负载异常上升。

 如果您经过长时间的的宕机,因指数退避算法导致要很久才会重试。如果您想要手动触发立即重试,您可以手动把相应事务的next_cron_time(Redis存储引擎的该功能还在开发中)修改为当前时间,就会在数秒内被定时轮询,事务就会继续往前执行。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK