5

基于Redis实现消息队列的实践

 8 months ago
source link: https://www.51cto.com/article/778100.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

基于Redis实现消息队列的实践

作者:凡夫贩夫 2023-12-30 13:47:48
不支持消费者确认机制,稳定性不能得到保证,例如当消费者获取到消息之后,还没来得及执行就宕机了。因为没有消费者确认机制,Redis 就会误以为消费者已经执行了,因此就不会重复发送未被正常消费的消息了,这样整体的 Redis 稳定性就被没有办法得到保障了。

为什么要基于Redis实现消费队列?

消息队列是一种典型的发布/订阅模式,是专门为异步化应用和分布式系统设计的,具有高性能、稳定性及可伸缩性的特点,是开发分布式系统和应用系统必备的技术之一。目前,针对不同的业务场景,比较成熟可靠的消息中间件产品有RocketMQ、Kafka、RabbitMq等,基于Redis再去实现一个消息队列少有提及,那么已经有很成熟的产品可以选择,还有必要再基于Redis自己来实现一个消息队列吗?基于Redis实现的消息队列有什么特别的地方吗?

先来回顾一个Redis有哪些特性:

  1. 速度快:Redis是基于内存的key-value类型的数据库,数据都存放在内存中,使得读写速度非常快,能够达到每秒数十万次的读写操作。
  2. 键值对的数据结构:Redis中的数据以键值对的形式存储,使得查询和操作数据非常方便和高效。
  3. 功能丰富:Redis具有许多实用的功能,例如键过期、发布订阅、Lua脚本、事务和管道等。这些功能使得Redis能够广泛应用于各种场景,如缓存、消息系统等。
  4. 持久化:Redis提供了两种持久化方案,即RDB(根据时间生成数据快照)和AOF(以追加方式记录每次写操作)。两种方案可以互相配合,确保数据的安全性。
  5. 主从复制:Redis支持主从复制功能,可以轻松实现数据备份和扩展。主节点会将其数据复制给从节点,从而实现数据的冗余和备份。
  6. 高可用和分布式:Redis从2.8版本开始提供了高可用实现哨兵模式,可以保证节点的故障发现和故障自动转移。此外,Redis从3.0版本开始支持集群模式,可以轻松实现数据的分布式存储和扩展。

总结一下:redis的特点就是:快、简单、稳定;

以RocketMQ为代表,作为专业的消息中间件而言,有哪些特性呢:

  1. 高性能、高可靠:RocketMQ采用分布式架构,能够高效地处理大量消息,同时也具有高可靠性的特性,能够保证消息的不丢失和正确传递。
  2. 高实时:RocketMQ支持消息的实时传递,能够满足实时交易系统的需求,为系统提供及时、准确的消息。
  3. 事务消息:RocketMQ支持事务消息,能够在消息发送和接收过程中保持事务的一致性,确保消息的可靠性和系统的稳定性。
  4. 顺序消息:RocketMQ可以保证消息的有序性,无论是在一个生产者还是多个生产者之间,都能保证消息按照发送顺序进行消费。
  5. 批量消息:RocketMQ支持批量消息,能够一次性发送多条消息,提高消息发送效率。
  6. 定时消息:RocketMQ支持定时消息,能够在指定的时间将消息发送到指定的Topic,满足定时任务的需求。
  7. 消息回溯:RocketMQ支持消息回溯,能够根据需要将消息重新发送到指定的Topic,便于调试和错误处理。
  8. 多种消息模式:RocketMQ支持发布/订阅、点对点、群聊等多种消息模式,适用于不同的业务场景。
  9. 可扩展性:RocketMQ采用分布式架构,能够方便地扩展消息处理能力,支持多个生产者和消费者同时处理消息。
  10. 多语言支持:RocketMQ提供多种语言的客户端库,支持包括Java、Python、C++等在内的多种编程语言。

总结一下:RocketMQ的特点就是除了性能非常高、系统本身的功能比较专业、完善,能适应非常多的场景;

从上述分析可以看出,Redis队列和MQ消息队列各有优势,Redis的最大特点就是快,所以基于Redis的消息队列相比MQ消息队列而言,更适合实时处理,但是基于Redis的消息队列更易受服务器内存限制;而RocketMQ消息队列作为专业的消息中间件产品,功能更完善,更适合应用于比较复杂的业务场景,可以实现离线消息发送、消息可靠投递以及消息的安全性,但MQ消息队列的读写性能略低于Redis队列。在技术选型时,除了上述的因素外,还有一个需要注意:大多数系统都会引入Redis作为基础的缓存中间件使用,如果要选用RocketMQ的话,还需要额外再申请资源进行部署。

很多时候,所谓的优点和缺点,只是针对特定场景而言,如果场景不一样了,优点可能会变成缺点,缺点也可能会变成优点。因此,除了专业的消息中间件外,基于Redis实现一个消息队列也是有必要的,在某些特殊的业务场景,比如一些并发量不是很高的管理系统,某些业务流程需要异步化处理,这时选择基于Redis自己实现一个消息队列,也是一个比较好的选择。这也是本篇文章主要分享的内容。

消息队列的基础知识:

什么是队列?

队列(Queue)是一种数据结构,遵循先进先出(FIFO)的原则。在队列中,元素被添加到末尾(入队),并从开头移除(出队)。

图片

Java中有哪些队列?

  1. LinkedList:LinkedList实现了Deque接口,可以作为队列(FIFO)或栈(LIFO)使用。它是一个双向链表,所以插入和删除操作具有很高的效率。
  2. ArrayDeque:ArrayDeque也是一个双端队列,具有高效的插入和删除操作。与LinkedList相比,ArrayDeque通常在大多数操作中表现得更快,因为它在内部使用动态数组。
  3. PriorityQueue:PriorityQueue是一个优先队列,它保证队列头部总是最小元素。你可以自定义元素的排序规则。
  4. ConcurrentLinkedQueue:ConcurrentLinkedQueue是一个线程安全的队列,它使用无锁算法进行并发控制。它适用于高并发场景,但在低并发场景中可能比其他队列慢。
  5. LinkedBlockingQueue:LinkedBlockingQueue是一个线程安全的阻塞队列,它使用链表数据结构来存储数据。当队列为空时,获取元素的操作将会被阻塞;当队列已满时,插入元素的操作将会被阻塞。
  6. ArrayBlockingQueue:ArrayBlockingQueue是一个线程安全的阻塞队列,它使用数组数据结构来存储数据。与LinkedBlockingQueue相比,ArrayBlockingQueue的容量是固定的。
  7. PriorityBlockingQueue:PriorityBlockingQueue是一个线程安全的优先阻塞队列。与PriorityQueue类似,它保证队列头部总是最小元素。
  8. SynchronousQueue:SynchronousQueue是一个线程安全的阻塞队列,它只包含一个元素。当队列为空时,获取元素的操作将会被阻塞;当队列已满时,插入元素的操作将会被阻塞。
  9. DelayQueue:DelayQueue是一个无界阻塞队列,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。

LinkedBlockingQueue

以LinkedBlockingQueue为例,其使用方法是这样的:

创建了一个生产者线程和一个消费者线程,生产者线程和消费者线程分别对同一个LinkedBlockingQueue对象进行操作。生产者线程通过调用put()方法将元素添加到队列中,而消费者线程通过调用take()方法从队列中取出元素。这两个方法都会阻塞线程,直到队列中有元素可供取出或有空间可供添加元素。

import java.util.concurrent.LinkedBlockingQueue;  
  
public class LinkedBlockingQueueExample {  
    public static void main(String[] args) {  
        LinkedBlockingQueue<String> queue = new LinkedBlockingQueue<>();  
  
        // 生产者线程  
        new Thread(() -> {  
            for (int i = 0; i < 10; i++) {  
                try {  
                    queue.put("Element " + i);  
                    System.out.println("Produced: Element " + i);  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
        }).start();  
        // 消费者线程  
        new Thread(() -> {  
            for (int i = 0; i < 10; i++) {  
                try {  
                    String element = queue.take();  
                    System.out.println("Consumed: " + element);  
                } catch (InterruptedException e) {  
                    e.printStackTrace();  
                }  
            }  
        }).start();  
    }  
}

基于Redis实现消息队列的几种方式

基于List数据类型

  • List 类型实现的方式最为简单和直接,它主要是通过 lpush、rpop 存入和读取实现消息队列的,如下图所示:
图片

图片

  • lpush 可以把最新的消息存储到消息队列(List 集合)的首部,而 rpop 可以读取消息队列的尾部,这样就实现了先进先出;
  • 优点:使用 List 实现消息队列的优点是消息可以被持久化,List 可以借助 Redis 本身的持久化功能,AOF 或者是 RDB 或混合持久化的方式,用于把数据保存至磁盘,这样当 Redis 重启之后,消息不会丢失。
  • 缺点:基于List类型实现的消息队列不支持重复消费、没有按照主题订阅的功能、不支持消费消息确认等功能,如果确实需要,需要自己实现。

基于Zset数据类型

  • 基于ZSet数据类型实现消息队列,是利用 zadd 和 zrangebyscore 来实现存入和读取消息的。
  • 优点:和基于List数据类型差不多,同样具备持久化的功能,不同的是消息数据存储的结构类型不一样;
  • 缺点:List 存在的问题它也同样存在,不支持重复消费,没有主题订阅功能,不支持消费消息确认,并且使用 ZSet 还不能存储相同元素的值。因为它是有序集合,有序集合的存储元素值是不能重复的,但分值可以重复,也就是说当消息值重复时,只能存储一条信息在 ZSet 中。

基于发布订阅模式

  • 基于发布订阅模式,是使用Pattern Subscribe 的功能实现主题订阅的功能,也就是 。因此我们可以使用一个消费者“queue_*”来订阅所有以“queue_”开头的消息队列,如下图所示:
  • 优点:可以按照主题订阅方式

无法持久化保存消息,如果 Redis 服务器宕机或重启,那么所有的消息将会丢失;

发布订阅模式是“发后既忘”的工作模式,如果有订阅者离线重连之后就不能消费之前的历史消息;

不支持消费者确认机制,稳定性不能得到保证,例如当消费者获取到消息之后,还没来得及执行就宕机了。因为没有消费者确认机制,Redis 就会误以为消费者已经执行了,因此就不会重复发送未被正常消费的消息了,这样整体的 Redis 稳定性就被没有办法得到保障了。

基于Stream类型

基于Stream 类型实现:使用 Stream 的 xadd 和 xrange 来实现消息的存入和读取了,并且 Stream 提供了 xack 手动确认消息消费的命令,用它我们就可以实现消费者确认的功能了,使用命令如下:

127.0.0.1:6379> xack mq group1 1580959593553-0
(integer) 1

消费确认增加了消息的可靠性,一般在业务处理完成之后,需要执行 ack 确认消息已经被消费完成,整个流程的执行如下图所示:

其中“Group”为群组,消费者也就是接收者需要订阅到群组才能正常获取到消息。

以上就是基于Redis实现消息队列的几种方式的简单对比介绍,下面主要是分享一下基于Redis的List数据类型实现,其他几种方式,有兴趣的小伙可以自己尝试一下。

基于Redis的List数据类型实现消费队列的工作原理是什么?

Redis基于List结构实现队列的原理主要依赖于List的push和pop操作。

在Redis中,你可以使用LPUSH命令将一个或多个元素推入列表的左边,也就是列表头部。同样,你可以使用RPUSH命令将一个或多个元素推入列表的右边,也就是列表尾部。

对于队列来说,新元素总是从队列的头部进入,而读取操作总是从队列的尾部开始。因此,当你想将一个新元素加入队列时,你可以使用LPUSH命令。当你想从队列中取出一个元素时,你可以使用RPOP命令。

此外,Redis还提供了BRPOP命令,这是一个阻塞的RPOP版本。如果给定列表内没有任何元素可供弹出的话,将阻塞连接直到等待超时或发现可弹出元素为止。

需要注意的是,虽然Redis能够提供原子性的push和pop操作,但是在并发环境下使用队列时,仍然需要考虑线程安全和并发控制的问题。你可能需要使用Lua脚本或者其他机制来确保并发操作的正确性。

总的来说,Redis通过提供List数据结构以及一系列相关命令,可以很方便地实现队列的功能。

下面是Redis关于List数据结构操作的命令主要包括以下几种:

  • LPUSH key value:将一个或多个值插入到列表的头部。
  • RPUSH key value:将一个或多个值插入到列表的尾部。
  • LPOP key:移除并获取列表的第一个元素。
  • RPOP key:移除并获取列表的最后一个元素。
  • LRANGE key start stop:获取指定索引范围内的元素。
  • LINDEX key index:获取指定索引位置的元素。
  • LLEN key:获取列表的长度。
  • LREM key count value:移除列表中指定数量的特定元素。
  • BRPOP key [key ...] timeout:移出并获取列表的最后一个元素,如果列表没有元素会阻塞直到等待超时或发现可弹出元素为止。

基于Redis的List数据类型实现延迟消息队列实战

以一个实际需求为例,演示一个基于Redis的延迟队列是怎么使用的?

有一个XX任务管理的功能,主要的业务过程:

1、创建任务后;

2、不断检查任务的状态,任务的状态有三种:待执行、执行中、执行完成;

3、如果任务状态是执行完成后,主动获取任务执行结果,对任务执行结果进行处理;如果任务状态是待执行、执行中,则延迟5秒后,再次查询任务执行状态;

图片

图片

1、依赖引入

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-redis</artifactId>
    <version>1.4.7.RELEASE</version>
</dependency>
<dependency>
    <groupId>org.redisson</groupId>
    <artifactId>redisson</artifactId>
    <version>3.23.1</version>
</dependency>

2、定义三个延迟队列BeforeQueue、RunningQueue、CompleteQueue,对队列的任务进行存取,BeforeQueue用于对待执行状态的任务的存取,Running用于对执行中状态的任务的存取,CompleteQueue用于对执行完成状态的任务的存取,在三个任务队列中,取出元素是阻塞的,即如果队列中没有新的任务,当前线程会一直阻塞等待,直到有新的任务进入;如果是队列中还有元素,则遵循先进先出的原则逐个取出进行处理;

@Component
@Slf4j
public class BeforeQueue {
    @Autowired
    private RedissonClient redissonClient;


    /**
     * <p>取出元素</p>
     * <p>如果队列中没有元素,就阻塞等待,直</p>
     * @return
     */
    public Object take(){
        RBlockingQueue<Object> queue1 = redissonClient.getBlockingQueue("queue1");
        Object obj = null;
        try {
            obj = queue1.take();
            log.info("从myqueue1取出元素:{}",obj.toString());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return obj;
    }


    /**
     * <p>放入元素</p>
     * @param obj
     */
    public void offer(Object obj){
        RBlockingDeque<Object> queue1 = redissonClient.getBlockingDeque("queue1");
        RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(queue1);
        delayedQueue.offer(obj,5, TimeUnit.SECONDS);
        log.info("向myqueue1设置元素:{}",obj.toString());
    }
}
@Component
@Slf4j
public class RunningQueue {
    @Autowired
    private RedissonClient redissonClient;


    public Object take(){
        RBlockingQueue<Object> queue1 = redissonClient.getBlockingQueue("queue2");
        Object obj = null;
        try {
            obj = queue1.take();
            log.info("从myqueue2取出元素:{}",obj.toString());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return obj;
    }


    public void offer(Object obj){
        RBlockingDeque<Object> queue1 = redissonClient.getBlockingDeque("queue2");
        RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(queue1);
        delayedQueue.offer(obj,5, TimeUnit.SECONDS);
        log.info("向myqueue2设置元素:{}",obj.toString());
    }
}
@Component
@Slf4j
public class CompleteQueue {
    @Autowired
    private RedissonClient redissonClient;


    public Object take(){
        RBlockingQueue<Object> queue1 = redissonClient.getBlockingQueue("queue3");
        Object obj = null;
        try {
            obj = queue1.take();
            log.info("从CompleteQueue取出元素:{}",obj.toString());
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        return obj;
    }


    public void offer(Object obj){
        RBlockingQueue<Object> queue1 = redissonClient.getBlockingDeque("queue3");
        RDelayedQueue<Object> delayedQueue = redissonClient.getDelayedQueue(queue1);
        delayedQueue.offer(obj,5, TimeUnit.SECONDS);
        log.info("向CompleteQueue设置元素:{}",obj.toString());
    }
}

3、定义三个监听器BeforeQueueListener、RunningQueueListener、CompleteQueueListener,监听器的主要作用主要就是负责监听三个队列中是否有新的任务 元素进入,如果有,则立即取出消费;如果没有,则阻塞等待新的元素进入,具体的实现逻辑是:新创建的任务会先放置到BeforeQueue中,BeforeQueueListener监听到有新的任务进入,会取出任务作一些业务处理,业务处理完一放入到RunningQueue中,RunningQueueListener监听到有新的任务进入,会取出任务再进行处理,这里的处理主要是查询任务执行状态,查询状态结果主要分两种情况:1、执行中、待执行状态,则把任务重新放入RunningQueue队列中,延迟5秒;2、执行完成状态,则把任务放置到CompleteQueue中;CompleteQueueListener监听到有新的任务进入后,会主动获取任务执行结果,作最后业务处理;

4、监听器在在处理队列中的数据相关的业务时,如果发生异常,则需要把取出的元素再重新入入到当前队列中,等待下一轮的重试;

@Component
@Slf4j
public class BeforeQueueListener implements Listener{
    @Autowired
    private BeforeQueue beforeQueue;
    @Autowired
    private RunningQueue runningQueue;
    @Override
    public void start() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true){
                    log.info("监听器进入阻塞:BeforeQueueListener");
                    Object obj = beforeQueue.take();
                    if (ObjectUtil.isNotNull(obj)) {
                        try {
                            log.info("开始休眠1s模拟业务处理:BeforeQueueListener,元素:{}",obj.toString());
                            Thread.currentThread().sleep(1000);
                            log.info("业务处理完成:BeforeQueueListener,元素:{}",obj.toString());
                            runningQueue.offer(obj);
                        } catch (InterruptedException e) {
                            log.error("业务处理发生异常,重置元素到BeforeQueue队列中");
                            log.error(e.getMessage());
                            beforeQueue.offer(obj);
                        }


                    }
                }
            }
        }).start();
    }
}
@Component
@Slf4j
public class RunningQueueListener implements Listener {


    @Autowired
    private RunningQueue runningQueue;
    @Autowired
    private CompleteQueue completeQueue;


    @Override
    public void start() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true) {
                    log.info("监听器进入阻塞:RunningQueueListener");
                    Object obj = runningQueue.take();
                    if (ObjectUtil.isNotNull(obj)) {
                        try {
                            log.info("开始休眠1s模拟业务处理:RunningQueueListener,元素:{}", obj.toString());
                            Thread.currentThread().sleep(1000);
                            Random random = new Random();
                            int i = random.nextInt(2);
                            if (i==0) {
                                test();
                            }
                            log.info("业务处理完成:RunningQueueListener,元素:{}", obj.toString());
                            completeQueue.offer(obj);
                        } catch (Exception e) {
                            log.error("业务处理发生异常,重置元素到RunningQueue队列中");
                            log.error(e.getMessage());
                            runningQueue.offer(obj);
                        }
                    }
                }
            }
        }).start();
    }


    public void test(){
        try {
            int i=1/0;
        } catch (Exception e) {
           throw  new RuntimeException("除数异常");
        }
    }


}
@Component
@Slf4j
public class CompleteQueueListener implements Listener{


    @Autowired
    private CompleteQueue completeQueue;
    @Override
    public void start() {
        new Thread(new Runnable() {
            @Override
            public void run() {
                while (true){
                    log.info("监听器进入阻塞:CompleteQueueListener");
                    Object obj = completeQueue.take();
                    if (ObjectUtil.isNotNull(obj)) {
                        try {
                            log.info("开始休眠1s模拟业务处理:CompleteQueueListener,元素:{}",obj.toString());
                            Thread.currentThread().sleep(1000);
                            log.info("业务处理完成:listener3,元素:{}",obj.toString());
                        } catch (InterruptedException e) {
                            log.error("业务处理发生异常,重置元素到CompleteQueue队列中");
                            log.error(e.getMessage());
                            completeQueue.offer(obj);
                        }
                       log.info("CompleteQueueListener任务结束,元素:{}",obj.toString());
                    }
                }
            }
        }).start();
    }
}

5、利用Springboot的扩展点ApplicationRunner,在项目启动完成后,分别启动BeforeQueueListener、RunningQueueListener、CompleteQueueListener,让三个监听器进入阻塞监听状态

@Component
public class MyRunner implements ApplicationRunner {
    @Autowired
    private ApplicationContext applicationContext;
    @Override
    public void run(ApplicationArguments args) throws Exception {
        Map<String, Listener> beansOfType = applicationContext.getBeansOfType(Listener.class);
        for (String s : beansOfType.keySet()) {
            Listener listener = beansOfType.get(s);
            listener.start();
        }


    }
}
图片

图片

一个比较有意思的问题

日志丢失的问题

三个任务队列分别有三个线程来进行阻塞监听,即如果任务队列中有任务元素,则取出进行处理;如果没有,则阻塞等待,主线程只负责把任务设置到任务队列中,出现的问题是:控制台的日志输出显示任务元素已经放置到第一个BeforeQueue中,按照预期的结果应该是,控制台的日志输出会显示,从BeforeQueue取出元素进行业务处理、以及业务处理的日志,然后放置到RunningQueue中,再从RunningQueue中取出进行业务处理,接着放置到CompleteQueue队列中,最后从CompleteQueue中取出进行业务处理,最后结束;实际情况是:总是缺少从BeforeQueue取出元素进行业务处理、以及业务处理的日志,其他的日志输出都很正常、执行结果也正常;

经过排查分析,最后找到了原因:

是logback线程安全问题, Logback 的大部分组件都是线程安全的,但某些特定的配置可能会导致线程安全问题。例如,如果你在同一个 Appender 中处理多个线程的日志事件,那么可能会出现线程安全问题,导致某些日志事件丢失。

问题原因找到了,其实解决方法也就找到,具体就是logback的异步日志,logback.xml配置如下:

<?xml versinotallow="1.0" encoding="UTF-8"?>
<configuration scan="true" scanPeriod="60 seconds" debug="false">
    <!-- 日志存放路径 -->
    <property name="log.path" value="logs/"/>
    <!-- 日志输出格式 -->
    <property name="console.log.pattern"
              value="%d{yyyy-MM-dd HH:mm:ss.SSS} %highlight(%-5level) %magenta(${PID:-}) - %green([%-21thread]) %cyan(%-35logger{30}) %msg%n"/>
    <!-- 控制台输出 -->
    <appender name="console" class="ch.qos.logback.core.ConsoleAppender">
        <encoder>
            <pattern>${console.log.pattern}</pattern>
            <charset>utf-8</charset>
        </encoder>
    </appender>
    <appender name="ASYNC" class="ch.qos.logback.classic.AsyncAppender">
        <queueSize>500</queueSize>
        <discardingThreshold>0</discardingThreshold>
        <neverBlock>true</neverBlock>
        <appender-ref ref="console" />
    </appender>
    <!--系统操作日志-->
    <root level="info">
        <appender-ref ref="ASYNC" />
    </root>
</configuration>

文章中展示了关键性代码,示例全部代码地址:https://gitcode.net/fox9916/redisson-demo.git


Recommend

  • 12
    • www.80shihua.com 3 years ago
    • Cache

    redis消息队列的使用

    redis消息队列的使用 很久之前就是知道redis可以作为消息队列使用,但是一直没有实际使用过,今天实际使用了一下,发现还是有一些内容需要注意的。 redis有5个常用数据结构,分别是string,list,set,hash,sorted set, 其中list就是用来作为...

  • 13

    使用 Redis 流实现消息队列¶ 本文摘录自《Redis使用手册》, 更多信息请见: RedisGuide.com 。 在介绍了 Red...

  • 6

    分布式系统中必备的一个中间件就是消息队列,通过消息队列我们能对服务间进行异步解耦、流量消峰、实现最终一致性。目前市面上已经有 RabbitMQ、RochetMQ、ActiveMQ、Kafka等,有人会问:“Redis 适合做消息队列么?”在回答这个问题...

  • 10

    延迟任务应用广泛,延迟任务典型应用场景有订单超时自动取消;支付回调重试。其中订单超时取消具有幂等性属性,无需考虑重复消费问题;支付回调重试需要考虑重复消费问题。 延迟任务具有如下特点:在未来的某个时间点执行;一...

  • 6

    基于线程池、消息队列和epoll模型实现并发服务器架构 ...

  • 11
    • yangjiahao106.github.io 2 years ago
    • Cache

    redis作为消息队列

    redis作为消息队列 发表于 2021-02-20...

  • 4

    Redis基于(List、PubSub、Stream、消费者组)实现消息队列,基于Stream结构实现异步秒杀下单 推荐 原创

  • 7
    • blog.51cto.com 1 year ago
    • Cache

    Redis之消息队列实现

    Redis之消息队列实现 精选 原创 ​​秒杀场景​​​​采用消息队列实现​​​​List实现消息队列...

  • 3

    转载请注明出处: 1.redis 用zset做消息队列如何处理消息积压 改变消费者的消费能力:     可以增加消费者的数量,或者优化消费者的消费能力,使其能够更快地处理消息。同时,可以根据消息队列中消息的数量,动...

  • 7
    • yanbin.blog 8 months ago
    • Cache

    使用 Redis 作为消息队列

    使用 Redis 作为消息队列 2024-01-02 | 阅读(7) 有好长一段时间没使用 Redis 了,之前用的都是 AWS 上的 Elastic Cache 的 Redis, 那时候还是用的版本还是 4 和 5。在新的项目由于觉得 Elastic Cache...

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK