5

RabbitMQ 入门系列:7、保障消息不重复消费:产生消息的唯一ID。 - 路过秋天

 2 years ago
source link: https://www.cnblogs.com/cyq1162/p/16603245.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

上一篇介绍了消息如何确保不丢失,本篇简单介绍如何保障消息不重复消费的处理方式。

对于接收端而言,对消息的确认来往是需要时间的。

如果同一个队列,同时存在多个客户端监听,那么多个客户端有一定概率能收到相同的信息。

这时候就会产生消息重复的问题的:

1、处理消息重复消费的几种方式:

网上人们说的主要两种方式:

方式一、Redis:用setnx命令,做消息id判断。

方式三、数据库:做唯一索引,消息id能插入就可以处理,所以重复消费就会失败。

优缺点说明:

方式一:Redis setnx 分布式锁:

不推荐使用:一种不太靠谱的方式,如果项目对数据允许出错,可以尝试使用。

为啥不靠谱,可以参考文章:https://zhuanlan.zhihu.com/p/418268774

方式二:数据库 的锁:

推荐使用:可以用唯一索引,也可以用事务锁,使用起来成熟又简单。

方式三:独占文件、剪贴版

推荐场景:单机场景下

1、可以通过对文件的独占打开和释放,来达到进程间的锁。

2、可以通过对剪贴版的读写,来达到进程间的锁。

前面几种方式,都有一个要素,就是需要消息的唯一ID。

2、如何产生消息的唯一ID:

A:对消息进行hash,取hash值做为唯一ID(无法避免,有一定概念产生相同的hash)。

B:对每一个发送的消息,都产生一个GUID,这样在获取消息的时候,就可以拿到消息对应的唯一ID。

下面看代码演示:

3、发送消息时,带唯一ID:

using (var channel = Rabbit.Instance.DefaultConnection.CreateModel())
{
    channel.BasicReturn += (sender, e) =>
    {
        //通过交换机发送过去,但没发送到指定的队列,数据丢失
        //do .....
        Console.WriteLine("消息没发到指定队列:" + Encoding.UTF8.GetString(e.Body.ToArray()));
    };
    channel.ConfirmSelect();
    channel.QueueDeclare("FirstQueue", false, false, false);
    var pro = channel.CreateBasicProperties();
    pro.MessageId = Guid.NewGuid().ToString();
    channel.BasicPublish("", "FirstQueue", true, pro, Encoding.UTF8.GetBytes("这是要发送的内容"));
    if (channel.WaitForConfirms(TimeSpan.FromSeconds(10)))
    {
        //发送确认成功
    }
    else
    {
        //超时或失败,需要处理是否重发消息。
    }
}

在消息发送前,生成一个属性,这个属性可以附带很多信息,其中一个是MessageId。

然后发送的时候,第4个参数,把属性带过去即可。

4、接收消息:获取消息唯一ID:

var channel = Rabbit.Instance.DefaultConnection.CreateModel();
 
var consumer = new EventingBasicConsumer(channel);
consumer.Received += (model, ea) =>
{
    var message = Encoding.UTF8.GetString(ea.Body.ToArray());
   
    Console.WriteLine("收到默认消息 {0}", message);
    Console.WriteLine("收到默认消息GUID {0}", ea.BasicProperties.MessageId);
    try
    {
        channel.BasicAck(ea.DeliveryTag, false);
    }
    catch (Exception err)
    {

       //处理确认失败的情况。
    }
   
   
};
channel.BasicConsume(queue: "FirstQueue",
                      autoAck: false,
                      consumer: consumer);

17408-20220819202138106-768681478.png

本篇介绍如何保障消息不重复消费以及如何产生消息的唯一ID,除了网上的基本两种方式,个人还奉献了单机版的场景方式。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK