5

微服务简单实现最终一致性 - 星仔007

 2 years ago
source link: https://www.cnblogs.com/morec/p/16101228.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

微服务简单实现最终一致性

有花时间去研究masstransit的saga,英文水平不过关,始终无法实现上手他的代码编排的业务,遗憾。

本文通过rabbit和sqlserver实现下单,更新库存,更新产品,模拟数据最终一致性。

项目结构如下,reportService可有可无,这里就相当一个链条,只要两节走通了后面可以接龙,本文有用到不省略。流程:orderservice=>eComm=>reportservice 。

下面先看看order的配置,通过控制器新增订单同时发布订单信息到order_exchange交换机,Key是"order.created,这样就把订单推送到了队列,等到库存服务获取订单去更新库存。

// POST api/<OrderController>
[HttpPost]
public async Task Post([FromBody] OrderDetail orderDetail)
{
var id = await orderCreator.Create(orderDetail);
publisher.Publish(JsonConvert.SerializeObject(new OrderRequest {
OrderId = id,
ProductId = orderDetail.ProductId,
Quantity = orderDetail.Quantity
}), "order.created", null);
} 

更新库存的代码,然后再发送消息告诉order服务,这里有哪个try包裹,如果这里有失败会触发catch,发送减库存失败的消息。order服务消费到这条消息就会执行相应的删除订单操作。代码如下:

using Ecomm.DataAccess;
using Ecomm.Models;
using Microsoft.Extensions.Hosting;
using Newtonsoft.Json;
using Plain.RabbitMQ;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace Ecomm
{
public class OrderCreatedListener : IHostedService
{
private readonly IPublisher publisher;
private readonly ISubscriber subscriber;
private readonly IInventoryUpdator inventoryUpdator;
public OrderCreatedListener(IPublisher publisher, ISubscriber subscriber, IInventoryUpdator inventoryUpdator)
{
this.publisher = publisher;
this.subscriber = subscriber;
this.inventoryUpdator = inventoryUpdator;
}
public Task StartAsync(CancellationToken cancellationToken)
{
subscriber.Subscribe(Subscribe);
return Task.CompletedTask;
}
private bool Subscribe(string message, IDictionary<string, object> header)
{
var response = JsonConvert.DeserializeObject<OrderRequest>(message);
try
{
inventoryUpdator.Update(response.ProductId, response.Quantity).GetAwaiter().GetResult();
publisher.Publish(JsonConvert.SerializeObject(
new InventoryResponse { OrderId = response.OrderId, IsSuccess = true }
), "inventory.response", null);
}
catch (Exception)
{
publisher.Publish(JsonConvert.SerializeObject(
new InventoryResponse { OrderId = response.OrderId, IsSuccess = false }
), "inventory.response", null);
}
return true;
}
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
}
using Ecomm.Models;
using Microsoft.Extensions.Hosting;
using Newtonsoft.Json;
using Plain.RabbitMQ;
using System.Collections.Generic;
using System.Threading;
using System.Threading.Tasks;
namespace OrderService
{
public class InventoryResponseListener : IHostedService
{
private readonly ISubscriber subscriber;
private readonly IOrderDeletor orderDeletor;
public InventoryResponseListener(ISubscriber subscriber, IOrderDeletor orderDeletor)
{
this.subscriber = subscriber;
this.orderDeletor = orderDeletor;
}
public Task StartAsync(CancellationToken cancellationToken)
{
subscriber.Subscribe(Subscribe);
return Task.CompletedTask;
}
private bool Subscribe(string message, IDictionary<string, object> header)
{
var response = JsonConvert.DeserializeObject<InventoryResponse>(message);
if (!response.IsSuccess)
{
orderDeletor.Delete(response.OrderId).GetAwaiter().GetResult();
}
return true;
}
public Task StopAsync(CancellationToken cancellationToken)
{
return Task.CompletedTask;
}
}
}

  上面的代码是整个服务的核心业务,也很简单就是队列相互通信相互确认操作是否顺利,失败就执行回归操作,而这里我们都会写好对应补偿代码:

using Dapper;
using System;
using System.Collections.Generic;
using System.Data.SqlClient;
using System.Linq;
using System.Threading.Tasks;

namespace OrderService
{
    public class OrderDeletor : IOrderDeletor
    {
        private readonly string connectionString;

        public OrderDeletor(string connectionString)
        {
            this.connectionString = connectionString;
        }

        public async Task Delete(int orderId)
        {
            using var connection = new SqlConnection(connectionString);
            connection.Open();
            using var transaction = connection.BeginTransaction();
            try
            {
                await connection.ExecuteAsync("DELETE FROM OrderDetail WHERE OrderId = @orderId", new { orderId }, transaction: transaction);
                await connection.ExecuteAsync("DELETE FROM [Order] WHERE Id = @orderId", new { orderId }, transaction: transaction);
                transaction.Commit();
            }
            catch
            {
                transaction.Rollback();
            }
        }
    }
}

库存服务里有发布产品的接口,这里没有做过多的处理,只是把产品新增放到队列,供后面的ReportService服务获取,该服务拿到后会执行产品数量扣除:

using Microsoft.Extensions.Hosting;
using Newtonsoft.Json;
using Plain.RabbitMQ;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace ReportService
{
    public class ReportDataCollector : IHostedService
    {
        private const int DEFAULT_QUANTITY = 100;
        private readonly ISubscriber subscriber;
        private readonly IMemoryReportStorage memoryReportStorage;

        public ReportDataCollector(ISubscriber subscriber, IMemoryReportStorage memoryReportStorage)
        {
            this.subscriber = subscriber;
            this.memoryReportStorage = memoryReportStorage;
        }

        public Task StartAsync(CancellationToken cancellationToken)
        {
            subscriber.Subscribe(Subscribe);
            return Task.CompletedTask;
        }
        private bool Subscribe(string message, IDictionary<string, object> header)
        //private bool ProcessMessage(string message, IDictionary<string, object> header)
        {
            if (message.Contains("Product"))
            {
                var product = JsonConvert.DeserializeObject<Product>(message);
                if (memoryReportStorage.Get().Any(r => r.ProductName == product.ProductName))
                {
                    return true;
                }
                else
                {
                    memoryReportStorage.Add(new Report
                    {
                        ProductName = product.ProductName,
                        Count = DEFAULT_QUANTITY
                    });
                }
            }
            else
            {
                var order = JsonConvert.DeserializeObject<Order>(message);
                if(memoryReportStorage.Get().Any(r => r.ProductName == order.Name))
                {
                    memoryReportStorage.Get().First(r => r.ProductName == order.Name).Count -= order.Quantity;
                }
                else
                {
                    memoryReportStorage.Add(new Report
                    {
                        ProductName = order.Name,
                        Count = DEFAULT_QUANTITY - order.Quantity
                    });
                }
            }
            return true;
        }

        public Task StopAsync(CancellationToken cancellationToken)
        {
            return Task.CompletedTask;
        }
    }
}

到这里整个流程大概如此。只要理清楚了订单和库存更新这里的业务,后面差不多一样,可以无限递归。代码文末有链接供下载。

这里有一个地方的代码如下,新增库存的时候同时发布消息。假如新增完订单后面崩掉了,这里是个原子操作最佳。

[HttpPost]
public async Task Post([FromBody] OrderDetail orderDetail)
{
var id = await orderCreator.Create(orderDetail);
publisher.Publish(JsonConvert.SerializeObject(new OrderRequest {
OrderId = id,
ProductId = orderDetail.ProductId,
Quantity = orderDetail.Quantity
}), "order.created", null);
}

  很遗憾masstransit的saga还没有整明白,那就上cap,完成业务一致性。加了点cap代码因为之前是dapper,所以加了dbcontext和cap相关代码有点小乱。核心代码如下:

using DotNetCore.CAP;
using MediatR;
using OrderService.Command;
using System.Threading;
using Ecomm.Models;
using System.Collections.Generic;
namespace OrderService.Handler
{
public class InsertOrderDetailHandler : IRequestHandler<InsertOrderDetailCommand, InsertOrderDetailModel>
{
private readonly OrderDbContext context;
private readonly ICapPublisher cap;
public InsertOrderDetailHandler(OrderDbContext context, ICapPublisher cap)
{
this.context = context;
this.cap = cap;
}
public async System.Threading.Tasks.Task<InsertOrderDetailModel> Handle(InsertOrderDetailCommand request, CancellationToken cancellationToken)
{
using(var trans =context.Database.BeginTransaction(cap))
{
var order =  context.Orders.Add(new Order
{
UpdatedTime = System.DateTime.Today,
UserId = request.UserId,
UserName = request.UserName
});
var orderDetail = context.OrderDetails.Add(new OrderDetail
{
OrderId = order.Entity.Id,
ProductId = request.ProductId,
Quantity = request.Quantity,
ProductName = request.ProductName,
});
context.SaveChanges();
cap.Publish<OrderRequest>("order.created", new OrderRequest
{
OrderId = order.Entity.Id,
ProductId = orderDetail.Entity.ProductId,
Quantity = orderDetail.Entity.Quantity
}, new Dictionary<string,string>()) ;
trans.Commit();
return new InsertOrderDetailModel { OrderDetailid = orderDetail.Entity.Id, OrderId = order.Entity.Id, Success = true };
}
}
}
}

 到这里差不多要结束了,这里的代码都可以调试运行的。因为加了cap,order服务有两套rabbitmq的配置,有冗余,而且有点坑。调试的时候注意,Plain.RabbitMQ支持的交换机不是持久化的,而cap是持久化的,所以有点不兼容。第一次运行可以先确保Plain.RabbitMQ正常,再删掉交换机,cap跑起来了再建持久化交换机,这样cap消息就会被rabbitmq接收,后面就会被库存服务消费。因为我这里cap不会自动绑定队列,Plain.RabbitMQ是可以的。所以需要新建交换机后再绑定队列。而且这里队列以Plain.RabbitMQ生成的名字来绑定。要不然又可能会调试踩坑无法出坑。 用cap不注意你连消息队列都看不到,看到了队列也看不到消费数据,这点不知道是我不会还是cap有什么难的配置。结束。。。

上例项目demo:

liuzhixin405/SimpleOrders_Next (github.com)

超简单微服务demo

liuzhixin405/SimpleOrders (github.com)


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK