0

RabbitMQ 入门系列:6、保障消息:不丢失:发送方、Rabbit存储端、接收方。 - 路过秋...

 1 year ago
source link: https://www.cnblogs.com/cyq1162/p/16603046.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

本篇简单介绍如何保障消息不丢失的处理方式。

1、保障消息不丢失:发送方

主要是通过消息确认或事务,来保障这个过程,下面见具体代码:

1、通过确认机制处理的代码:

using RabbitMQ.Client;
using System.Text;using (var channel = Rabbit.Instance.DefaultConnection.CreateModel())
{
    channel.ConfirmSelect();//开启确认
    channel.QueueDeclare("FirstQueue", false, false, false);
    channel.BasicPublish("", "FirstQueue", false, null, Encoding.UTF8.GetBytes("这是要发送的内容"));
    if (channel.WaitForConfirms(TimeSpan.FromSeconds(10)))//设置最长超时时间
    {
        //发送确认成功
    }
    else
    {
        //超时或失败,需要处理是否重发消息。
    }
}

2、通过事务机制处理的代码:

using RabbitMQ.Client;
using System.Text;

using (var channel = Rabbit.Instance.DefaultConnection.CreateModel())
{
    channel.TxSelect();
    channel.QueueDeclare("FirstQueue", false, false, false);
    channel.BasicPublish("", "FirstQueue", false, null, Encoding.UTF8.GetBytes("这是要发送的内容"));
    try
    {
        channel.TxCommit();
    }
    catch (Exception)
    {
        channel.TxRollback();
        //处理事务提交失败的逻辑。
    }
}

2、保障消息不丢失:RabbitMQ端

对于RabbitMQ端的消息保障,我们人为可以处理的是,设置创建的队列或消息是否持久化。

通过创建持久化的队列或消息,可以保障消息写入硬盘,重启时仍能还原信息。

//第二个参数:是否持久化
channel.QueueDeclare("FirstQueue", true, false, false);

3、保障消息不丢失:接收方

接收方主要是通过消息确认,来指示是否收到信息。

var channel = Rabbit.Instance.DefaultConnection.CreateModel();

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body.ToArray());
    Console.WriteLine("收到默认消息 {0}", message);
};
channel.BasicConsume(queue: "FirstQueue",
                      autoAck: true,
                      consumer: consumer);

在以上代码中,通过指定authAck可以自动回应收到信息。

当然,有需要也可以手动回应:

var channel = Rabbit.Instance.DefaultConnection.CreateModel();

var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body.ToArray());
    Console.WriteLine("收到默认消息 {0}", message);
  try
    {
        channel.BasicAck(ea.DeliveryTag, false);
    }
    catch (Exception err)
    {

       //处理确认失败的情况。
    }
};
channel.BasicConsume(queue: "FirstQueue",
                      autoAck: false,
                      consumer: consumer);
为了避免消息丢失问题,消息的确认,最好在是业务处理完再进行确认。

否则会出现第三方中介出问题时,或业务处理出问题时,或刚确认好消息,业务还没处理就系统异常,导致消息未消费就丢失的问题。

4、发送方:还有一种情况:通过交换机发送过去,但交换机没送到指定的队列时

这时候应答也是正常的,但数据丢失,这种情况,是这样处理的:

17408-20220819183845451-17267535.png
1、发送信息BasicPublish方法的第三个参数:mandatory设置为true。

2、定义接收的回调:BasicReturn事件。

本篇简单介绍如何使用RabbitMQ消息时,做到消息的可靠性,不丢失。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK