6

NetCore微服务实现事务一致性masstransit之saga使用 - 星仔007

 2 years ago
source link: https://www.cnblogs.com/morec/p/16080026.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

NetCore微服务实现事务一致性masstransit之saga使用

demo如下,一个订单处理的小例子:

首先看看运行结果,很简单:

核心代码如下:

using MassTransit;
using Microsoft.Extensions.DependencyInjection;
using Microsoft.Extensions.Logging;
using OrderProcessor.Event;
using ServiceModel;
using ServiceModel.Command;
using ServiceModel.DTO;
using ServiceModel.Event;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace OrderProcessor.Service
{
    /// <summary>
    /// MassTransit saga state machine that walks an order through stock reservation,
    /// payment and shipping, persisting its progress in <see cref="ProcessingOrderState"/>.
    /// </summary>
    /// <remarks>
    /// Flow as wired in the constructor:
    /// <list type="number">
    /// <item><see cref="OrderSubmitted"/> (initial event) sends <c>IReserveStock</c> and moves to <see cref="Processing"/>.</item>
    /// <item><see cref="StockReserved"/> sends <c>IProcessPayment</c>.</item>
    /// <item><see cref="PaymentProcessed"/> sends <c>IShipOrder</c>.</item>
    /// <item><see cref="OrderShipped"/> marks the order processed, publishes <c>OrderProcessed</c> and finalizes the saga.</item>
    /// </list>
    /// All events are correlated on the message's <c>CorrelationId</c>.
    /// </remarks>
    public class OrderProcessorStateMachine:MassTransitStateMachine<ProcessingOrderState>
    {
        private readonly ILogger<OrderProcessorStateMachine> logger;

        /// <summary>
        /// Resolves the logger from <c>GlobalServiceProvider</c> and declares states, correlations and event handlers.
        /// </summary>
        /// <exception cref="InvalidOperationException">
        /// Thrown by <c>GetRequiredService</c> when no <see cref="ILogger{TCategoryName}"/> is registered.
        /// </exception>
        public OrderProcessorStateMachine()
        {
            // The scope is only needed long enough to resolve the logger, so dispose it instead of leaking it.
            // NOTE(review): with the default AddLogging() registration ILogger<T> is a singleton and
            // outlives the scope; confirm if a custom logger registration is used.
            using (var scope = GlobalServiceProvider.Instance.CreateScope())
            {
                // Fail fast here rather than hitting a NullReferenceException inside an event handler later.
                this.logger = scope.ServiceProvider.GetRequiredService<ILogger<OrderProcessorStateMachine>>();
            }

            this.InstanceState(x => x.State);
            this.State(() => this.Processing);
            this.ConfigureCorrelationIds();
            this.Initially(this.SetOrderSubmittedHandler());
            this.During(Processing, this.SetStockReservedHandler(), SetPaymentProcessedHandler(), SetOrderShippedHandler());
            SetCompletedWhenFinalized();
        }

        /// <summary>
        /// Correlates every event to a saga instance by the message's <c>CorrelationId</c>.
        /// The initial event additionally uses that id as the id of a newly created instance.
        /// </summary>
        private void ConfigureCorrelationIds()
        {
            this.Event(() => this.OrderSubmitted, x => x.CorrelateById(c => c.Message.CorrelationId).SelectId(c => c.Message.CorrelationId));
            this.Event(() => this.StockReserved, x => x.CorrelateById(c => c.Message.CorrelationId));
            this.Event(() => this.PaymentProcessed, x => x.CorrelateById(c => c.Message.CorrelationId));
            this.Event(() => this.OrderShipped, x => x.CorrelateById(c => c.Message.CorrelationId));
        }

        /// <summary>Handles the initial submission: stores the order, requests a stock reservation, enters <see cref="Processing"/>.</summary>
        private EventActivityBinder<ProcessingOrderState, IOrderSubmitted> SetOrderSubmittedHandler() =>
            When(OrderSubmitted).Then(c => this.UpdateSagaState(c.Instance, c.Data.Order))
                                .Then(c => this.logger.LogInformation("Order submitted to {CorrelationId} received", c.Data.CorrelationId))
                                .ThenAsync(c => this.SendCommand<IReserveStock>("rabbitWarehouseQueue", c))
                                .TransitionTo(Processing);


        /// <summary>Handles a successful stock reservation: stores the order and requests payment.</summary>
        private EventActivityBinder<ProcessingOrderState, IStockReserved> SetStockReservedHandler() =>
            When(StockReserved).Then(c => this.UpdateSagaState(c.Instance, c.Data.Order))
                               .Then(c => this.logger.LogInformation("Stock reserved to {CorrelationId} received", c.Data.CorrelationId))
                               .ThenAsync(c => this.SendCommand<IProcessPayment>("rabbitCashierQueue", c));


        /// <summary>Handles a processed payment: stores the order and requests shipping.</summary>
        private EventActivityBinder<ProcessingOrderState, IPaymentProcessed> SetPaymentProcessedHandler() =>
            When(PaymentProcessed).Then(c => this.UpdateSagaState(c.Instance, c.Data.Order))
                                  .Then(c => this.logger.LogInformation("Payment processed to {CorrelationId} received", c.Data.CorrelationId))
                                  .ThenAsync(c => this.SendCommand<IShipOrder>("rabbitDispatcherQueue", c));


        /// <summary>Handles shipment: marks the order processed, publishes <c>OrderProcessed</c> and finalizes the saga.</summary>
        private EventActivityBinder<ProcessingOrderState, IOrderShipped> SetOrderShippedHandler() =>
            When(OrderShipped).Then(c =>
            {
                this.UpdateSagaState(c.Instance, c.Data.Order);
                // state.Order was just set to c.Data.Order, so this status change is also
                // visible on the order passed to OrderProcessed below.
                c.Instance.Order.Status = Status.Processed;
            })
                              .Publish(c => new OrderProcessed(c.Data.CorrelationId, c.Data.Order))
                              .Finalize();

        /// <summary>
        /// Copies the latest order onto the saga instance and refreshes its timestamps.
        /// </summary>
        /// <param name="state">Saga instance being updated.</param>
        /// <param name="order">Order carried by the event that triggered the update.</param>
        private void UpdateSagaState(ProcessingOrderState state, Order order)
        {
            var currentDate = DateTime.Now;

            // Only stamp Created the first time; previously every event overwrote it,
            // which made Created always equal to Updated.
            if (state.Created == default)
            {
                state.Created = currentDate;
            }

            state.Updated = currentDate;
            state.Order = order;
        }

        /// <summary>
        /// Sends a <typeparamref name="TCommand"/> carrying the event's correlation id and order to the given queue.
        /// </summary>
        /// <typeparam name="TCommand">Command contract to send.</typeparam>
        /// <param name="endpointKey">
        /// Destination queue name. NOTE(review): resolved as a MassTransit short address
        /// (<c>queue:{endpointKey}</c>); if these keys are meant to be configuration keys that map
        /// to full URIs, replace the address construction with that lookup.
        /// </param>
        /// <param name="context">Behavior context of the event being handled.</param>
        /// <exception cref="ArgumentException"><paramref name="endpointKey"/> is null, empty or whitespace.</exception>
        private async Task SendCommand<TCommand>(string endpointKey, BehaviorContext<ProcessingOrderState, IMessage> context)
            where TCommand : class, IMessage
        {
            if (string.IsNullOrWhiteSpace(endpointKey))
            {
                throw new ArgumentException("A destination queue name is required.", nameof(endpointKey));
            }

            // The original passed new Uri(""), which throws UriFormatException on every call
            // and ignored endpointKey entirely.
            var destination = new Uri($"queue:{endpointKey}");
            var sendEndpoint = await context.GetSendEndpoint(destination);
            await sendEndpoint.Send<TCommand>(new
            {
                CorrelationId = context.Data.CorrelationId,
                Order = context.Data.Order
            });
        }

        /// <summary>State the saga stays in between submission and shipment.</summary>
        public  State Processing { get; private set; }

        /// <summary>Initial event that creates the saga instance.</summary>
        public Event<IOrderSubmitted> OrderSubmitted { get; private set; }

        /// <summary>Event that completes the saga.</summary>
        // NOTE(review): this is the only event with a public setter; kept to leave the public surface unchanged.
        public Event<IOrderShipped> OrderShipped { get; set; }

        /// <summary>Event raised once payment has been processed.</summary>
        public Event<IPaymentProcessed> PaymentProcessed { get; private set; }

        /// <summary>Event raised once stock has been reserved.</summary>
        public Event<IStockReserved> StockReserved { get; private set; }
        
    }
}
using MassTransit;
using MassTransit.MongoDbIntegration.Saga;
using OrderProcessor;
using OrderProcessor.Service;

// Application entry point: registers MVC controllers and a RabbitMQ-backed MassTransit bus
// that hosts the order-processing saga with a MongoDB saga repository.
var builder = WebApplication.CreateBuilder(args);

// Add services to the container.

builder.Services.AddControllers();

builder.Services.AddMassTransit(x =>
{
    x.UsingRabbitMq((context, cfg) =>
    {
        // Prefer a "RabbitMq" connection string from configuration so credentials can live
        // outside source control; the original literal is kept as the fallback.
        // Original author's note: connecting fails if the host part is omitted from the URI.
        var connection = builder.Configuration.GetConnectionString("RabbitMq")
            ?? "amqp://lx:admin@ip:5672/my_vhost";
        cfg.Host(connection);

        // Delayed redelivery after 5, 15 and 30 minutes, with 5 immediate retries per delivery.
        cfg.UseDelayedRedelivery(r => r.Intervals(TimeSpan.FromMinutes(5), TimeSpan.FromMinutes(15), TimeSpan.FromMinutes(30)));
        cfg.UseMessageRetry(r => r.Immediate(5));

        cfg.ConfigureEndpoints(context);

        // The original used an empty queue name here; a receive endpoint needs a stable, non-empty name.
        // NOTE(review): "order-processor-saga" is a chosen name - align it with the deployment's naming.
        cfg.ReceiveEndpoint("order-processor-saga", ep =>
        {
            // NOTE(review): "connecturl" and "db" look like placeholders for the MongoDB connection
            // string and database name - replace with real values before running.
            // NOTE(review): the state machine constructor reads GlobalServiceProvider.Instance;
            // nothing in this file assigns it - confirm it is initialised before the bus starts.
            ep.StateMachineSaga(new OrderProcessorStateMachine(), MongoDbSagaRepository<ProcessingOrderState>.Create("connecturl","db"));
        });

    });
});

var app = builder.Build();

// AddControllers() only registers services; without this call no controller route is reachable.
app.MapControllers();

app.Run();

这是整个订单的几个步骤。

本想把代码都贴出来,并把整个过程梳理一遍供大家参考,但时间有限,这个点已经没有那么多时间了;而且我理应先把这个程序跑起来。明天还要照常上班,暂不做过多研究。

整个demo代码:

exercise/MassTransitDemo/MassTransitSagasDemo at master · liuzhixin405/exercise (github.com)

有兴趣的话,还可以参考另一个demo:

exercise/MassTransitDemo/SagaTest-master at master · liuzhixin405/exercise (github.com)

masstransit官网:

MassTransit (masstransit-project.com)

不得不说这个东西真的很不错,不过暂时没找到中文翻译,只是大概过了一遍文档,还有好多地方不清楚,英文水平有限。demo都是来自国外大佬的贡献。很遗憾,国内虽然也有这方面的文章,但深入一点的内容基本都出自国外友人;而且现成的微服务demo写得很好、数量也多,项目中可视情况借鉴。

 此demo有待后续完善,或者等大佬帮忙补充之后,再完善这篇随笔的流程和代码,今天只是起个头。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK