9

.Net Core对于`RabbitMQ`封装分布式事件总线 - tokengo

 1 year ago
source link: https://www.cnblogs.com/hejiale010426/p/17110806.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

首先我们需要了解到分布式事件总线是什么;

分布式事件总线是一种在分布式系统中提供事件通知、订阅和发布机制的技术。它允许多个组件或微服务之间的协作和通信,而无需直接耦合或了解彼此的实现细节。通过事件总线,组件或微服务可以通过发布或订阅事件来实现异步通信。

例如,当一个组件完成了某项任务并生成了一个事件,它可以通过事件总线发布该事件。其他相关组件可以通过订阅该事件来接收通知,并做出相应的反应。这样,组件之间的耦合就被减轻了,同时也提高了系统的可维护性和可扩展性。

然后了解一下RabbitMQ

RabbitMQ是一种开源的消息代理和队列管理系统,用于在分布式系统中进行异步通信。它的主要功能是接收和分发消息,并且支持多种协议,包括AMQP,STOMP,MQTT等。RabbitMQ通过一个中间层,可以把消息发送者与消息接收者隔离开来,因此消息发送者和消息接收者并不需要在同一时刻在线,并且也不需要互相知道对方的地址。

  1. RabbitMQ的主要功能包括:
    1. 消息存储:RabbitMQ可以将消息存储在内存或硬盘上,以保证消息的完整性。
    2. 消息路由:RabbitMQ支持消息的路由功能,可以将消息从生产者发送到消费者。
    3. 消息投递:RabbitMQ提供了多种消息投递策略,包括简单模式、工作队列、发布/订阅模式等。
    4. 可靠性:RabbitMQ保证消息的可靠性,即消息不会丢失、不重复、按顺序投递。
    5. 可扩展性:RabbitMQ支持水平扩展,可以通过增加节点来扩展系统的处理能力。

本文将讲解使用RabbitMQ实现分布式事件

实现我们创建一个EventsBus.Contract的类库项目,用于提供基本接口,以支持其他实现

在项目中添加以下依赖引用,并且记得添加EventsBus.Contract项目引用

<ItemGroup>
	<PackageReference Include="Microsoft.Extensions.DependencyInjection.Abstractions" Version="7.0.0" />
    <PackageReference Include="Microsoft.Extensions.Options" Version="7.0.0" />
    <PackageReference Include="Microsoft.Extensions.Options.ConfigurationExtensions" Version="7.0.0" />
    <PackageReference Include="RabbitMQ.Client" Version="6.4.0" />
</ItemGroup>

创建项目完成以后分别创建EventsBusOptions.cs,IEventsBusHandle.cs,RabbitMQEventsManage.cs,ILoadEventBus.cs ,提供我们的分布式事件基本接口定义

EventsBusOptions.cs

namespace EventsBus.Contract;

public class EventsBusOptions
{
    /// <summary>
    /// 接收时异常事件
    /// </summary>
    public static Action<IServiceProvider, Exception,byte[]>? ReceiveExceptionEvent;
}

IEventsBusHandle.cs

namespace EventsBus.Contract;

public interface IEventsBusHandle<in TEto> where TEto : class
{
    Task HandleAsync(TEto eventData);
}

ILoadEventBus.cs

namespace EventsBus.Contract;

public interface ILoadEventBus
{
    /// <summary>
    /// 发布事件
    /// </summary>
    /// <param name="eto"></param>
    /// <typeparam name="TEto"></typeparam>
    /// <returns></returns>
    Task PushAsync<TEto>(TEto eto) where TEto : class;
}

EventsBusAttribute.cs:用于Eto(Eto 是我们按照约定使用的Event Transfer Objects(事件传输对象)的后缀. s虽然这不是必需的,但我们发现识别这样的事件类很有用(就像应用层上的DTO 一样))的名称,对应到RabbitMQ的通道

namespace EventsBus.RabbitMQ;

[AttributeUsage(AttributeTargets.Class)]
public class EventsBusAttribute : Attribute
{
    public readonly string Name;

    public EventsBusAttribute(string name)
    {
        Name = name;
    }
}

然后可以创建我们的RabbitMQ实现了,创建EventsBus.RabbitMQ类库项目,用于编写EventsBus.ContractRabbitMQ实现

创建项目完成以后分别创建Extensions\EventsBusRabbitMQExtensions.cs,Options\RabbitMQOptions.cs,EventsBusAttribute.cs,,RabbitMQFactory.cs,RabbitMQLoadEventBus.cs

Extensions\EventsBusRabbitMQExtensions.cs:提供我们RabbitMQ扩展方法让使用者更轻松的注入,命名空间使用Microsoft.Extensions.DependencyInjection,这样就在注入的时候减少过度使用命名空间了

using EventsBus.Contract;
using EventsBus.RabbitMQ;
using EventsBus.RabbitMQ.Options;
using Microsoft.Extensions.Configuration;

namespace Microsoft.Extensions.DependencyInjection;

public static class EventsBusRabbitMQExtensions
{
    public static IServiceCollection AddEventsBusRabbitMQ(this IServiceCollection services,
        IConfiguration configuration)
    {
        services.AddSingleton<RabbitMQFactory>();
        services.AddSingleton(typeof(RabbitMQEventsManage<>));
        services.Configure<RabbitMQOptions>(configuration.GetSection(nameof(RabbitMQOptions)));
        services.AddSingleton<ILoadEventBus, RabbitMQLoadEventBus>();
        
        return services;
    }
}

Options\RabbitMQOptions.cs:提供基本的Options 读取配置文件中并且注入,services.Configure<RabbitMQOptions>(configuration.GetSection(nameof(RabbitMQOptions)));的方法是读取IConfiguration的名称为RabbitMQOptions的配置东西,映射到Options中,具体使用往下看。

using RabbitMQ.Client;

namespace EventsBus.RabbitMQ.Options;

public class RabbitMQOptions
{
    /// <summary>
    /// 要连接的端口。 <see cref="AmqpTcpEndpoint.UseDefaultPort"/>
    /// 指示应使用的协议的缺省值。
    /// </summary>
    public int Port { get; set; } = AmqpTcpEndpoint.UseDefaultPort;

    /// <summary>
    /// 地址
    /// </summary>
    public string HostName { get; set; }

    /// <summary>
    /// 账号
    /// </summary>
    public string UserName { get; set; }

    /// <summary>
    /// 密码
    /// </summary>
    public string Password { get; set; }
}

RabbitMQEventsManage.cs:用于管理RabbitMQ的数据接收,并且将数据传输到指定的事件处理程序

using System.Reflection;
using System.Text.Json;
using EventsBus.Contract;
using Microsoft.Extensions.DependencyInjection;
using RabbitMQ.Client;
using RabbitMQ.Client.Events;

namespace EventsBus.RabbitMQ;

public class RabbitMQEventsManage<TEto> where TEto : class
{
    private readonly IServiceProvider _serviceProvider;
    private readonly RabbitMQFactory _rabbitMqFactory;

    public RabbitMQEventsManage(IServiceProvider serviceProvider, RabbitMQFactory rabbitMqFactory)
    {
        _serviceProvider = serviceProvider;
        _rabbitMqFactory = rabbitMqFactory;
        _ = Task.Run(Start);
    }

    private void Start()
    {
        var channel = _rabbitMqFactory.CreateRabbitMQ();
        var eventBus = typeof(TEto).GetCustomAttribute<EventsBusAttribute>();
        var name = eventBus?.Name ?? typeof(TEto).Name;
        channel.QueueDeclare(name, false, false, false, null);
        var consumer = new EventingBasicConsumer(channel); //消费者
        channel.BasicConsume(name, true, consumer); //消费消息
        consumer.Received += async (model, ea) =>
        {
            var bytes = ea.Body.ToArray();
            try
            {
                // 这样就可以实现多个订阅
                var events = _serviceProvider.GetServices<IEventsBusHandle<TEto>>();
                foreach (var handle in events)
                {
                    await handle?.HandleAsync(JsonSerializer.Deserialize<TEto>(bytes));
                }
            }
            catch (Exception e)
            {
                EventsBusOptions.ReceiveExceptionEvent?.Invoke(_serviceProvider, e, bytes);
            }
        };
    }
}

RabbitMQFactory.cs:提供RabbitMQ链接工厂,在这里你可以自己去定义和管理RabbitMQ工厂

using EventsBus.RabbitMQ.Options;
using Microsoft.Extensions.Options;
using RabbitMQ.Client;

namespace EventsBus.RabbitMQ;

public class RabbitMQFactory : IDisposable
{
    private readonly RabbitMQOptions _options;
    private readonly ConnectionFactory _factory;
    private IConnection? _connection;

    public RabbitMQFactory(IOptions<RabbitMQOptions> options)
    {
        _options = options?.Value;
        // 将Options中的参数添加到ConnectionFactory
        _factory = new ConnectionFactory
        {
            HostName = _options.HostName,
            UserName = _options.UserName,
            Password = _options.Password,
            Port = _options.Port
        };
    }

    public IModel CreateRabbitMQ()
    {
        // 当第一次创建RabbitMQ的时候进行链接
        _connection ??= _factory.CreateConnection();

        return _connection.CreateModel();
    }

    public void Dispose()
    {
        _connection?.Dispose();
    }
}

RabbitMQLoadEventBus.cs:用于实现ILoadEventBus.cs通过ILoadEventBus发布事件RabbitMQLoadEventBus.cs是RabbitMQ的实现

using System.Reflection;
using System.Text.Json;
using EventsBus.Contract;
using Microsoft.Extensions.DependencyInjection;

namespace EventsBus.RabbitMQ;

public class RabbitMQLoadEventBus : ILoadEventBus
{
    private readonly IServiceProvider _serviceProvider;
    private readonly RabbitMQFactory _rabbitMqFactory;

    public RabbitMQLoadEventBus(IServiceProvider serviceProvider, RabbitMQFactory rabbitMqFactory)
    {
        _serviceProvider = serviceProvider;
        _rabbitMqFactory = rabbitMqFactory;
    }

    public async Task PushAsync<TEto>(TEto eto) where TEto : class
    {

        //创建一个通道
        //这里Rabbit的玩法就是一个通道channel下包含多个队列Queue
        using var channel = _rabbitMqFactory.CreateRabbitMQ();
        
        // 获取Eto中的EventsBusAttribute特性,获取名称,如果没有默认使用类名称
        var eventBus = typeof(TEto).GetCustomAttribute<EventsBusAttribute>();
        var name = eventBus?.Name ?? typeof(TEto).Name;
        
        // 使用获取的名称创建一个通道
        channel.QueueDeclare(name, false, false, false, null);
        var properties = channel.CreateBasicProperties();
        properties.DeliveryMode = 1;
        // 将数据序列号,然后发布
        channel.BasicPublish("", name, false, properties, JsonSerializer.SerializeToUtf8Bytes(eto)); //生产消息
        // 让其注入启动管理服务,RabbitMQEventsManage需要手动激活,由于RabbitMQEventsManage是单例,只有第一次激活才有效,
        var eventsManage = _serviceProvider.GetService<RabbitMQEventsManage<TEto>>();
        
        await Task.CompletedTask;
    }
}

在这里我们的RabbitMQ分布式事件就设计完成了,注:这只是简单的一个示例,并未经过大量测试,请勿直接在生产使用;

然后我们需要使用RabbitMQ分布式事件总线工具包

使用RabbitMQ分布式事件总线的示例

首先我们需要准备一个RabbitMQ,可以在官网自行下载,我就先使用简单的,通过docker compose启动一个RabbitMQ,下面提供一个compose文件

version: '3.1'
services:
  rabbitmq:
    restart: always # 开机自启
    image: rabbitmq:3.11-management # RabbitMQ使用的镜像
    container_name: rabbitmq # docker名称
    hostname: rabbit
    ports:
      - 5672:5672 # 只是RabbitMQ SDK使用的端口
      - 15672:15672 # 这是RabbitMQ管理界面使用的端口
    environment:
      TZ: Asia/Shanghai # 设置RabbitMQ时区
      RABBITMQ_DEFAULT_USER: token # rabbitMQ账号
      RABBITMQ_DEFAULT_PASS: dd666666 # rabbitMQ密码
    volumes:
      - ./data:/var/lib/rabbitmq

启动以后我们创建一个WebApi项目,项目名称Demo,创建完成打开项目文件添加引用

<Project Sdk="Microsoft.NET.Sdk.Web">

    <PropertyGroup>
        <TargetFramework>net7.0</TargetFramework>
        <Nullable>enable</Nullable>
        <ImplicitUsings>enable</ImplicitUsings>
    </PropertyGroup>

    <ItemGroup>
        <PackageReference Include="Microsoft.AspNetCore.OpenApi" Version="7.0.0" />
        <PackageReference Include="Swashbuckle.AspNetCore" Version="6.4.0" />
    </ItemGroup>

    <ItemGroup>
        <!-- 引用RabbitMQ事件总线项目-->
        <ProjectReference Include="..\EventsBus.RabbitMQ\EventsBus.RabbitMQ.csproj" />
    </ItemGroup>

</Project>

修改appsettings.json配置文件:将RabbitMQ的配置写上,RabbitMQOptions名称对应在EventsBus.RabbitMQ中的RabbitMQOptions文件![image-20230211022801105]

2415052-20230211024443025-1860568751.png

在这里注入的时候将配置注入好了

{
  "Logging": {
    "LogLevel": {
      "Default": "Information",
      "Microsoft.AspNetCore": "Warning"
    }
  },
  "AllowedHosts": "*",
  "RabbitMQOptions": {
    "HostName": "127.0.0.1",
    "UserName": "token",
    "Password": "dd666666"
  }
}

创建DemoEto.cs文件:

using EventsBus.RabbitMQ;

namespace Demo;

[EventsBus("Demo")]
public class DemoEto
{
    public int Size { get; set; }
    
    public string Value { get; set; }
}

创建DemoEventsBusHandle.cs文件:这里是订阅DemoEto事件,相当于是DemoEto的处理程序

using System.Text.Json;
using EventsBus.Contract;

namespace Demo;

/// <summary>
/// 事件处理服务,相当于订阅事件
/// </summary>
public class DemoEventsBusHandle : IEventsBusHandle<DemoEto>
{
    public async Task HandleAsync(DemoEto eventData)
    {
        Console.WriteLine($"DemoEventsBusHandle: {JsonSerializer.Serialize(eventData)}");
        await Task.CompletedTask;
    }
}

打开Program.cs 修改代码: 在这里注入了事件总线服务,和我们的事件处理服务

using Demo;
using EventsBus.Contract;

var builder = WebApplication.CreateBuilder(args);

builder.Services.AddControllers();

builder.Services.AddEndpointsApiExplorer();
builder.Services.AddSwaggerGen();

// 注入事件处理服务
builder.Services.AddSingleton(typeof(IEventsBusHandle<DemoEto>),typeof(DemoEventsBusHandle));

// 注入RabbitMQ服务
builder.Services.AddEventsBusRabbitMQ(builder.Configuration);

var app = builder.Build();

// 只有在Development显示Swagger
if (app.Environment.IsDevelopment())
{
    app.UseSwagger();
    app.UseSwaggerUI();
}

// 强制Https
app.UseHttpsRedirection();

app.UseAuthorization();

app.MapControllers();

app.Run();

创建Controllers\EventBusController.cs控制器:我们在控制器中注入了ILoadEventBus ,通过调用接口实现发布事件;

using EventsBus.Contract;
using Microsoft.AspNetCore.Mvc;

namespace Demo.Controllers;

[ApiController]
[Route("[controller]")]
public class EventBusController : ControllerBase
{
    private readonly ILoadEventBus _loadEventBus;

    public EventBusController(ILoadEventBus loadEventBus)
    {
        _loadEventBus = loadEventBus;
    }

    /// <summary>
    /// 发送信息
    /// </summary>
    /// <param name="eto"></param>
    [HttpPost]
    public async Task Send(DemoEto eto)
    {
        await _loadEventBus.PushAsync(eto);
    }
}

然后我们启动程序会打开Swagger调试界面:

2415052-20230211024427928-1325939634.png

然后我们发送一下事件:

2415052-20230211024422917-738311118.gif

我们可以看到,在数据发送的时候也同时订阅到了我们的信息,也可以通过分布式事件总线限流等实现,

来自Token的分享

技术交流群:737776595


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK