6

使用Redis Stream来做消息队列和在Asp.Net Core中的实现

 3 years ago
source link: https://www.cnblogs.com/xiaxiaolu/p/15270334.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

使用Redis Stream来做消息队列和在Asp.Net Core中的实现

Redis - Wikipedia

我一直以来使用redis的时候,很多低烈度需求(并发要求不是很高)需要用到消息队列的时候,在项目本身已经使用了Redis的情况下都想直接用Redis来做消息队列,而不想引入新的服务,kafka和RabbitMQ等;

奈何这兄弟一直不给力;

虽然 Redis 的Pub/Sub 是实现了发布/订阅的,但这家伙最坑的是:丢数据

由于Pub/Sub 只是简单的实现了发布订阅模式,简单的沟通起生产者和消费者,当接收生产者的数据后并立即推送或者说转发给订阅消费者,并不会做任何的持久化、存储操作。由此:

  1. ​ 消费者(客户端)掉线;
  2. ​ 消费者未订阅(所以使用的时候一定记得先订阅再生产);
  3. ​ 服务端宕机;
  4. ​ 消费者消费不过来,消息堆积(生产数据受数据缓冲区限制);

以上情况都会导致生产数据的丢失,基于上坑,据我所知大家很少使用Pub/Sub ;

不过官方的哨兵集群通信的时候就是用的Pub/Sub;

然后,各路大佬结合队列、阻塞等等实现了各种各样的方案,主要是使用:BLPOP+LPUSH 的实现

这里就不一一展开了,有兴趣请看叶老板文章

可能是各种实现都会带来各种的问题,redis的官方也看到了社区的挣扎。终于,到了Redis5.0,官方带来了消息队列的实现:Stream

Redis Stream介绍

简单来说Redis Stream 就是想用Redis 做消息队列的最佳推荐;

XADD--发布消息

XADD stream1 * name hei age 18
XADD stream1 * name zhangshan age 19 #再发一条

#也可在发布的时候指定消息长度 MAXLEN
XADD stream1 MAXLEN 10000  * name zhangshan age 19
127.0.0.1:6379> XADD stream1 * name hei age 18
"1631628884174-0"
127.0.0.1:6379> XADD stream1 * name zhangshan age 19 
"1631628890025-0"

其中的'*'表示让 Redis 自动生成唯一的消息 ID,格式是 「时间戳-自增序号」

MAXLEN 指定消息长度的时候,每发布一条新消息将丢弃一条最旧消息;如不设置将保存每一条消息;

查看消息长度 XLEN

127.0.0.1:6379> XLEN stream1
(integer) 2

XREAD--订阅消息

订阅消息

XREAD COUNT 5 STREAMS stream1 0-0 
127.0.0.1:6379> XREAD COUNT 5 STREAMS stream1 0-0 
1) 1) "stream1"
   2) 1) 1) "1631628884174-0"
         2) 1) "name"
            2) "hei"
            3) "age"
            4) "18"
      2) 1) "1631628890025-0"
         2) 1) "name"
            2) "zhangshan"
            3) "age"
            4) "19"

'0-0' 表示从开头读取

如果需继续拉取下一条,需传入上一条消息的id

阻塞等待消息

XREAD COUNT 5 BLOCK 50000 STREAMS stream1 1631628890025-0

阻塞等待消息id ‘1631628890025-0’ 后的消息

50000 阻塞时间(毫秒) ‘0’ 表示无限期阻塞

从到这里就可以看出 Pub/Sub多端订阅的最大优点,Stream也是支持的。有的同学很快就发现问题了,这里多端订阅后,没有消息确认ACK机制。

没错,因为现在所有的消费者都是订阅共同的消息,多端订阅,如果某个客户端ACK某条消息后,其他端消费不了,就实现不了多端消费了。

由此,引出 分组:GROUP

GROUP--订阅分组消息(多端订阅)

同样先发布消息

XADD stream1 * name hei age 18
XADD stream1 * name zhangshan age 19 
127.0.0.1:6379> XADD stream1 * name hei age 18
"1631629080208-0"
127.0.0.1:6379> XADD stream1 * name zhangshan age 19 
"1631629084083-0"

XGROUP CREATE 创建分组

创建分组1

XGROUP CREATE stream1 group1 0-0  
127.0.0.1:6379> XGROUP CREATE stream1 group1 0-0  
OK

‘0-0’ 表示从开头读取

'>' 表示读取最新,未被消费过的消息

XREADGROUP--分组读取

分组 group1

XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS stream1 >  

consumer1 消费者名称, redis服务器会记住第一次使用的消费者名称;


127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS stream1 >  
1) 1) "stream1"
   2) 1) 1) "1631628884174-0"
         2) 1) "name"
            2) "hei"
            3) "age"
            4) "18"
      2) 1) "1631628890025-0"
         2) 1) "name"
            2) "zhangshan"
            3) "age"
            4) "19"
      3) 1) "1631629080208-0"
         2) 1) "name"
            2) "hei"
            3) "age"
            4) "18"
      4) 1) "1631629084083-0"
         2) 1) "name"
            2) "zhangshan"
            3) "age"
            4) "19"
127.0.0.1:6379> XREADGROUP GROUP group1 consumer1 COUNT 5 STREAMS stream1 >  
(nil)

‘0-0’ 表示从开头读取

'>' 表示读取最新,未被消费过的消息 (可以看到命令执行第二遍已经读不到新消息了)

分组 group2

127.0.0.1:6379> XGROUP CREATE stream1 group2 0-0  
OK
127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 5 STREAMS stream1 >  
1) 1) "stream1"
   2) 1) 1) "1631628884174-0"
         2) 1) "name"
            2) "hei"
            3) "age"
            4) "18"
      2) 1) "1631628890025-0"
         2) 1) "name"
            2) "zhangshan"
            3) "age"
            4) "19"
      3) 1) "1631629080208-0"
         2) 1) "name"
            2) "hei"
            3) "age"
            4) "18"
      4) 1) "1631629084083-0"
         2) 1) "name"
            2) "zhangshan"
            3) "age"
            4) "19

可以看到可以读到同样的消息,多端订阅没有问题;

当然分组也支持阻塞读取:

#和XREAD一样
XREAD COUNT 5 BLOCK 0 STREAMS queue 1618469127777-0 

#分组阻塞
XREADGROUP GROUP group2 consumer1 COUNT 5 BLOCK 0 STREAMS stream1 > 

‘0’ 表示无限期阻塞,单位(毫秒)

XPENDING--待处理消息

消息使用XREADGROUP 读取后会进入待处理条目列表(PEL);

我们看看:

 XPENDING stream1 group2
127.0.0.1:6379>  XPENDING stream1 group2
1) (integer) 4
2) "1631628884174-0"
3) "1631629084083-0"
4) 1) 1) "consumer1"
      2) "4"
  1. (integer) 4 //表示当前消费者组的待处理消息的数量
  2. "1631628884174-0" //消息最大id
  3. "1631629084083-0" //最小id
      1. "consumer1" // 消费者名称
      2. "4" //消费者待处理消息数量

XACK--删除已处理消息(消息确认机制)

我们已经知道group2待处理消息有4条,我们从头读取看看:

XREADGROUP GROUP group2 consumer1 COUNT 5 STREAMS stream1 0-0
127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 5 STREAMS stream1 0-0
1) 1) "stream1"
   2) 1) 1) "1631628884174-0"
         2) 1) "name"
            2) "hei"
            3) "age"
            4) "18"
      2) 1) "1631628890025-0"
         2) 1) "name"
            2) "zhangshan"
            3) "age"
            4) "19"
      3) 1) "1631629080208-0"
         2) 1) "name"
            2) "hei"
            3) "age"
            4) "18"
      4) 1) "1631629084083-0"
         2) 1) "name"
            2) "zhangshan"
            3) "age"
            4) "19"

假设最后一条消息 ‘1631629084083-0’ 我已处理完成

127.0.0.1:6379> XACK stream1 group2 1631629084083-0
(integer) 1
127.0.0.1:6379> XREADGROUP GROUP group2 consumer1 COUNT 5 STREAMS stream1 0-0
1) 1) "stream1"
   2) 1) 1) "1631628884174-0"
         2) 1) "name"
            2) "hei"
            3) "age"
            4) "18"
      2) 1) "1631628890025-0"
         2) 1) "name"
            2) "zhangshan"
            3) "age"
            4) "19"
      3) 1) "1631629080208-0"
         2) 1) "name"
            2) "hei"
            3) "age"
            4) "18"
127.0.0.1:6379>  XPENDING stream1 group2
1) (integer) 3
2) "1631628884174-0"
3) "1631629080208-0"
4) 1) 1) "consumer1"
      2) "3"

可以清楚看到goroup2 待处理消息剩下3条;

这时 Redis 已经把这条消息标记为「处理完成」不再追踪;

Stream在Asp.net Core中的使用

private static string _connstr = "172.16.3.119:6379";
private static string _keyStream = "stream1";
private static string _nameGrourp = "group1";
private static string _nameConsumer = "consumer1";
csRedis.XAdd(_keyStream, "*", ("name", "message1"));
static async Task CsRedisStreamConsumer()
{
    Console.WriteLine("CsRedis StreamConsumer start!");

    var csRedis = new CSRedis.CSRedisClient(_connstr);
    csRedis.XAdd(_keyStream, "*", ("name", "message1"));

    try
    {
        csRedis.XGroupCreate(_keyStream, _nameGrourp);
    }
    catch { }

    (string key, (string id, string[] items)[] data)[] product;
    (string Pid, string Platform, string Time) data = (null, null, null);

    while (true)
    {
        try
        {
            product = csRedis.XReadGroup(_nameGrourp, _nameConsumer, 1, 10000, (_keyStream, ">"));
            if (product?.Length > 0 == true && product[0].data?.Length > 0 == true)
            {
                Console.WriteLine($"message-id:{product.FirstOrDefault().data.FirstOrDefault().id}");

                product.FirstOrDefault().data.FirstOrDefault().items.ToList().ForEach(value =>
                {
                    Console.WriteLine($"    {value}");
                });

                //csRedis.XAck(_keyStream, _nameGrourp, product[0].data[0].id);
            }
        }
        catch (Exception)
        {
            //throw;
        }
    }
}

CSRedisCore

动画2

这里的超时报错可通过修改连接参数:syncTimeout 解决

CSRedisCore支持阻塞读取;

StackExchange.Redis

db.StreamAdd(_keyStream, "name", "message1", "*");
static async Task StackExchangeRedisStreamConsumer()
{
    Console.WriteLine("StackExchangeRedis StreamConsumer start!");

    var redis = ConnectionMultiplexer.Connect(_connstr);
    var db = redis.GetDatabase();

    try
    {
        ///初始化方式1
        //db.StreamAdd(_keyStream, "name", "message1", "*");
        //db.StreamCreateConsumerGroup(_keyStream, _nameGrourp);

        //方式2
        db.StreamCreateConsumerGroup(_keyStream, _nameGrourp, StreamPosition.NewMessages);
    }
    catch { }

    StreamEntry[] data = null;

    while (true)
    {
        data = db.StreamReadGroup(_keyStream, _nameGrourp, _nameConsumer, ">", count: 1, noAck: true);

        if (data?.Length > 0 == true)
        {
            Console.WriteLine($"message-id:{data.FirstOrDefault().Id}");

            data.FirstOrDefault().Values.ToList().ForEach(c =>
            {
                Console.WriteLine($"    {c.Name}:{c.Value}");
            });

            db.StreamAcknowledge(_keyStream, _nameGrourp, data.FirstOrDefault().Id);
        }
    }
}

动画

StackExchange.Redis 有点比较坑的是不存在阻塞读取;理由:https://stackexchange.github.io/StackExchange.Redis/PipelinesMultiplexers.html#multiplexing

Q:Stream是否支持AOF、RDB持久化?

A:支持,其它数据类型一样,每个写操作,也都会写入到 RDB 和 AOF 中。

Q:Stream是否还是会丢数据?若是,何种情况下?;

A:会;

1、AOF是定时写盘的,如果数据还在内存中时redis服务宕机就会;

2、主从切换时(从库还未同步完成主库发来的数据,就被提成主库);

3、消息队列超MAXLEN限制;

技术中有的时候没有“银弹”,只有更适合的技术,汝之蜜糖彼之砒霜;

很多时候的技术选型都是个比较麻烦的东西,对选型人的要求很高;你可能不是只需要熟悉其中的一种路线,而是要踩过各种各样的坑,再根据当前受限的环境,选择比较适合目前需求/团队的;

回到Stream上,我认为目前Stream能满足挺大部分队列需求;

特别是“在项目本身已经使用了Redis的情况下都想直接用Redis来做消息队列,而不想引入新的更专业的mq,比如kafka和RabbitMQ的时候”

当然,最终决定需要用更专业的mq与否的,还是需求;

http://www.redis.cn/

https://database.51cto.com/art/202104/659208.htm

https://github.com/2881099/csredis/

https://stackexchange.github.io/StackExchange.Redis/Streams.html


Recommend

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK