2

聊一聊异构系统间数据一致性 - 从此启程

 2 years ago
source link: https://www.cnblogs.com/fancunwei/p/16125202.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

聊一聊异构系统间数据一致性

380359-20220410112000920-540147405.png 之前忙活过一个多方合作的项目,异构系统间维护数据一致性有时候是个头疼但必须解决的问题,今天我从背景、问题、解决思路、技术方案等几个方面浅谈一下,抛砖引玉。

近两年我司承接了某个持续性的会议项目,即每季度或月不定期举行会议。本项目目前有三个主要供应方(面向用户的A方,数据中间B方,会议数据同步C方【我司】)。 为了方便演示问题,以下流程和职责都做了裁剪。

简化流程如下: 380359-20220410112121838-498473156.png 简化职责如下:

  • A方职责: 用户通过官网/小程序进行报名,A方调用B方的标准接口,不存储数据
  • B方职责:作为ISP,提供标准查询、新增、修改等相关接口,几乎不提供定制。基于表单和表单数据,完成数据存储与流转。
  • C方职责:提供导入/更新/审核/注销等入口,新数据会通知到B方,B方数据新增/更新也会通知到C方。

从图例来看,B方/C方数据存储方面是冗余的。但B方只存储了核心数据,提供不了太多业务行为,C方具有业务需要的全套流程,但在此项目中作为后方支持及后续现场支持,三方形成了一种生态和谐。本篇博客主旨在讨论多方异构系统之间如何保证数据的一致性。

产品/项目

从标准Sass系统来讲,这样的多方交互,不利于系统稳定性,有诸多不可控因素。但从项目角度,这是各方考虑/斗争/谈判/费用等综合因素下友好协商的结果。 当然这是一个私有部署项目,所以会有很多坚持和妥协。

大领导提到一个说法:项目是要交付的,功能完美是产品考虑的。在功能不完善的情况下,如何去交付?

最后的兜底

哎,一言难尽。是通宵了几次核对/修复数据的,这是最后的办法了。为了苦逼不再重现,今年要对整个线动一动手术。(说好的.net 不996呢?)(拿着白菜价操着卖白粉的心)。

  • C方 需要所有子会报名前,主会必须报名。
  • B方 各会之间的报名数据是无序到达的。
  • B方 任意报名数据更新或新增都会推送到C方,C方收到更新也会更新B方。这里有一些措施进行了拦截中断,但仍会频繁循环更新问题。这是目前现状(为什么会出现?太赶工?)
  • 无开发环境,需盲写代码,发到测试环境进行联调测试。
  • 调用链太长,日志过多,排错时需要根据调用各服务接口来判断走到了哪步,出现了一个问题。调用链能查到一些问题,但不容批量定位问题。单个查太难。
  • 高并发下,redis组件出现各种问题(timeout等)
  • token问题
  • 接口请求时间超长
  • 其他问题...

数据很大,也很小

大部分数据能对上,偶尔几十个或断断续续产生新问题的数据需要及时人工修复。功能有缺陷,人工也是一种交付办法,但不可持续,太他妈的累了。数据不一致,也是导致通宵核对/修复数据的一大原因。如果数据全一致,就不会那么辛苦了。

  • 明确项目是要继续做的
  • 目标产品化/更方便维护方向发展。一团队养一项目。
  • 有改进想法提出来,拉会推进
  • 缺人,招人(遥遥无期...)
  • 针对请求无序问题,引入延时队列,先处理主会、子会延迟几秒钟在处理。
  • 针对循环更新问题,记录B方数据来源,非必要情况下,不回更B方。必须终止掉。【冤冤相报何时了】
  • 针对排错困难问题,引入mysql记录新增报名的请求以及处理结果,可以更快查询处理结果。
  • 针对bug,测试根据各测试场景进行复测,按10/100/1000/3000/万级规模压测。提前发下问题。
  • 推进客户方一起做必要去重逻辑。

无论是标准产品还是交付项目,做任何改动都要评估。

  • 多沟通,大家都是站在一条线的。有利于事情解决的方案认同度会更高。
  • 预估花多少时间,有多少资源。
  • 能挤出来的空窗期有多久,客户方/产品方对于需求的急迫性有多强。
  • 基于场景测试,把缺陷优先级先列出来,根据空窗期先修复紧急缺陷。

把紧急且影响范围广的问题解决了,风险就小了很多了。80%的问题是由20%的因素造成的。 这也正符合程序优化中的时间/空间局部性。

进程运行时,在一段时间里,程序的执行往往呈现高度的局部性, 包括时间局部性和空间局部性。
时间局部性是一旦一个指令被执行了, 则在不久的将来,它可能再被执行。
空间局部性是一旦一个指令一个存储单元被访问,那么它附近的单元也将很快被访问.

mysql实现延迟队列

  • 优先处理主会,子会延时处理 380359-20220410112316725-1750198840.png 由于隐私问题,这里只列部分字段 380359-20220410112344430-958389933.png

  • 数据库轮询获取未处理数据 380359-20220410112409265-1844849804.png 这里如何提高消费速度,可以参考《计算机系统结构》中标量处理机的流水线的一些知识。

    380359-20220410112432757-1180936580.png

  • 首先要无相关,即按AccountId分组,分组内的数据是无冲突/相关的,可以分批进行。记录各任务状态,最后统一提交数据库状态,然后1s后继续轮询。这种类似静态流水线。动态流水线较为复杂,这里暂不做实现。

do
{
    var groupTemps = groupDatas.Skip((pageIndex - 1) * pageSize).Take(pageSize).ToList();
    var currentRecords = new List<QidianNotifydelayData>();
    foreach (var item in groupTemps)
    {
        currentRecords.AddRange(item.ToList());
    }
    var temp = taskFunc(currentRecords);
    taskList.Add(temp);
    pageIndex++;
}
while ((pageIndex - 1) * pageSize <= groupCount);

//等待全部执行
await Task.WhenAll(taskList.ToArray());

await _dbContext.CommitAsync();

Thread.Sleep(1);
  • 如果1s轮询觉得太浪费,后续可以根据请求发送标记位(下次轮询时间),有数据时,可以快速轮询,无数据时放宽时间。极端处理方式,当主会请求过来处理完成后,直接发起子会处理,但要考虑数据库是否能承受的住这种并发压力。

  • 如果考虑请求会重复执行,可以在执行内加redis锁。慎用for update,并发一大就over.


/// <summary>

/// 锁定执行。
/// </summary>


/// <param name="key"></param>
/// <param name="func"></param>
/// <param name="timeSpan"></param>
/// <returns></returns>
public async Task<BizResult<T>> LockExcute<T>(string key, Func<Task<BizResult<T>>> func, int timeSpan)
{
  var db = (this._cacheClient as RedisClient).Db;

  var mutexKey = string.Format("mutex:", key);
  if (await db.StringSetAsync(mutexKey, "1", TimeSpan.FromSeconds(timeSpan), When.NotExists))
  {
      try
      {
          var item = await func.Invoke();
          return item;
      }
      catch (Exception ex)
      {
          _logger.LogError("LockExcute:Exception:" + ex.Message);
          return BizResult.BusinessFailed<T>(-1, $"执行失败,Message:{ex.Message}");

      }
      finally
      {
          await db.KeyDeleteAsync(mutexKey);
      }
  }
  else
  {
      _logger.LogWarning($"LockExcute:Key:{key},正在处理中,请稍候");

      return BizResult.BusinessFailed<T>(-1, "正在处理中,请稍候");
  }
}

redis实现延迟队列

  • 由于业务中一个Account同时只能处理一个主会,如果在处理子会的时候,主会请求突然过来了,就会有问题,这里就需要加锁主会。引入了Redis延迟队列
  • 基于Redis ZSet有序集合实现。
  • 思路:当前时间戳和延时时间相加,也就是到期时间,存入Redis中,然后不断轮询,找到到期的,拿到再删除即可。
  • 目前实现缺点:不利于监控,未发起http请求处理业务,导致调用链有缺。
/// <summary>
 /// 3.入队列
 /// </summary>
 /// <param name="redisModel"></param>
 /// <returns></returns>
 public async Task EnqueueZset(DataToModel redisModel)
 {
     redisModel.UpdateTime = redisModel.UpdateTime.AddSeconds(5);// 最后更新时间 + 5秒
     var redisDb = _redisConnectionService.GetRedisConnectionMultiplexer().GetDatabase(0);//默认DB0
     if (redisDb != null)
     {
         IsoDateTimeConverter timeFormat = new IsoDateTimeConverter();
         timeFormat.DateTimeFormat = "yyyy-MM-dd HH:mm:ss.fff";
         await redisDb.SortedSetAddAsync(ZSet_Queue, JsonConvert.SerializeObject(redisModel, Formatting.Indented, timeFormat), redisModel.UpdateTime.ToTimeStamp());//得分 --放入redis
         _logger.LogInformation($"数据排队--入队列!redisModel:{JsonConvert.SerializeObject(redisModel)}");
     }
 }

rabbmit实现延迟队列

  • 死信队列过期--》重推信队列?暂未实现。

数据更新方案

  • 核心原则:先查询对比,有变更再更新。从B方数据过来的,尽量不再更新回去。减小并发量,控制复杂度。

数据核对方案

  • 待补充。未实现自动化。后期可以获取双方系统数据,汇总对比。

    380359-20220410112527544-1947220199.png

部署/压测/监控

Jmeter(来自于测试同学提供的脚本)

这里只做简单截图

  • 配置预定义参数 380359-20220410112759415-165990023.png

    380359-20220410112817942-643890287.png

  • 必要情况下配置后置处理程序
    380359-20220410112839536-593062033.png
    380359-20220410112859293-671289269.png
  • 配置好thread group,http request后,执行调用观察接口
  • 查询请求执行是否成功

    380359-20220410113023288-1946580004.png

  • 查看聚合报告

    380359-20220410113044069-203991732.png

kubernetes

  • kubectl get nodes 获取所有节点
  • kubectl get pod -A 查看所有服务,观察status和age 380359-20220410113108496-1124517193.png
  • kubectl logs [-f] [-p] POD [-c CONTAINER] 查看日志信息。

-c, --container="": 容器名
-f, --follow[=false]: 指定是否持续输出日志
--interactive[=true]: 如果为true,当需要时提示用户进行输入。默认为true
--limit-bytes=0: 输出日志的最大字节数。默认无限制
-p, --previous[=false]: 如果为true,输出pod中曾经运行过,但目前已终止的容器的日志
--since=0: 仅返回相对时间范围,如5s、2m或3h,之内的日志。默认返回所有日志。只能同时使用since和since-time中的一种
--since-time="": 仅返回指定时间(RFC3339格式)之后的日志。默认返回所有日志。只能同时使用since和since-time中的一种
--tail=-1: 要显示的最新的日志条数。默认为-1,显示所有的日志
--timestamps[=false]: 在日志中包含时间戳

mysql监控(来自于运维同学的反馈)

这里只截图简单信息

  • 通过云监控查看mysql状态[最大连接数/cpu/内存/慢查询/索引建议/锁]

调用链/日志

此处暂不截图。

380359-20220410113227191-779795526.png

  • 一期方案

    380359-20220410113248122-356116553.png

  • 二期方案
    380359-20220410113305506-1499913973.png
  • 三期方案

    380359-20220410113321333-1874483843.png 当然那是进展顺利的情况下,不顺利的情况下就变成了这样

    380359-20220410113338030-999986054.png

某些时候也会听到如下言论:

  • 一定要保证xx的信誉。
  • 今晚就不要睡觉了吧?大家多坚持一下。

就如现在的疫情封控一样,做好了精准防控一片赞歌,失控了就好好居家、共渡难关。 网络和现实都会告诉你什么就是人间。

以上是关于定制化需求的一些解决方案,希望对未来类似产品或项目做个参考。本篇从问题着手,分析有利于解决/消除异构系统数据一致性办法。当然数据一致性也依赖于自身系统的高可用,这里未做过多描述,以后再说。

到此结束,谢谢观看!


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK