10

RabbitMQ的六种工作模式 - 農碼一生

 2 years ago
source link: https://www.cnblogs.com/wml-it/p/16536123.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

RabbitMQ的六种工作模式

一、普通队列模式

1.  一个消费者,一个队列,一个消费者。
2.  消息产生消息放入队列,消息的消费者(consumer) 监听(while) 消息队列,如果队列中有消息,就消费掉,消息被拿走后,自动从队列中删除(隐患 消息可能没有被消费者正确处理,已经从队列中消失了,造成消息的丢失)应用场景:聊天(中间有一个过度的服务器;p端,c端)
image

22.cnblogs.com/blog/1913282/202207/1913282-20220730231124618-1368574550.png)

  • 获取RabbitMQ连接帮助类

    后面代码,这部分创建连接共用

     public class RabbitMQHelper
        {
            /// <summary>
            /// 获取RabbitMQ连接
            /// </summary>
            /// <returns></returns>
            public static IConnection GetConnection()
            {
                //实例化连接工厂
                var factory = new ConnectionFactory
                {
                    HostName = "127.0.0.1", //ip
                    Port = 5672, // 端口
                    UserName = "Admin", // 账户
                    Password = "Admin", // 密码
                    VirtualHost = "/"   // 虚拟主机
                };
    
                return factory.CreateConnection();
            }
        }
    
  • 生产者

    public class Send
    {
    
        public static void SendMessage()
        {
            string queueName = "normal";
    
            //1.创建链接
            using (var connection = RabbitMQHelper.GetConnection())
            {
                // 2.创建信道
                using(var channel = connection.CreateModel())
                {
                    // 3.声明队列
                    channel.QueueDeclare(queueName, false, false, false, null);
                    // 没有绑定交换机,怎么找到路由队列的呢?
                    for (int i = 1; i <= 30; i++)
                    {
                        //4.构建Byte消息数据包
                        string message =$"第{i}条消息";
                        var body = Encoding.UTF8.GetBytes(message);//消息以二进制形式传输
    
                        // 发送消息到rabbitmq,使用rabbitmq中默认提供交换机路由,默认的路由Key和队列名称完全一致
                        //5.发送数据包
                        channel.BasicPublish(exchange: "", routingKey: queueName, null, body);
                        Thread.Sleep(1000);//添加延迟
                        Console.WriteLine("生产:" + message);
                    }
                }
            }
    
        } 
    }
    
  • 消费者

    public class Receive
    {
        public static void ReceiveMessage()
        {
            // 消费者消费是队列中消息
            string queueName = "normal";
            //1.建立链接链接
            var connection = RabbitMQHelper.GetConnection();
            {
                //2.建立信道
                var channel = connection.CreateModel();
                {
                    //3.声明队列:如果你先启动是消费端就会异常
                    channel.QueueDeclare(queueName, false, false, false, null);
                    //4.创建一个消费者实例
                    var consumer = new EventingBasicConsumer(channel);
                    //5.绑定消息接收后的事件委托
                    consumer.Received +=(model, ea) => {
                        var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                        Thread.Sleep(1000);
                        Console.WriteLine(" Normal Received => {0}", message);
                    }; 
                    //6.启动消费者
                    channel.BasicConsume( queue: queueName, autoAck:true, consumer);//开始消费
                }
    
            }
    
        } 
    }
    

二、工作队列模式

  1. 一个消费者,一个队列,多个消费者。但多个消费者中只会有一个会成功地消费消息

  2. 消息产生者将消息放入队列消费者可以有多个,消费者1,消费者2,同时监听同一个队列,消息被消费?C1 C2共同争抢当前的消息队列内容,谁先拿到谁负责消费消息(隐患,高并发情况下,默认会产生某一个消息被多个消费者共同使用。

  3. 应用场景:红包;大项目中的资源调度(任务分配系统不需知道哪一个任务执行系统在空闲,直接将任务扔到消息队列中,空闲的系统自动争抢)

    image
  • 生产者

     public class WorkerSend
        {
    
            public static void SendMessage()
            {
                string queueName = "Worker_Queue";
    
                using (var connection = RabbitMQHelper.GetConnection())
                {
                    using(var channel = connection.CreateModel())
                    {
                        channel.QueueDeclare(queueName, false, false, false, null);
                        for (int i = 0; i < 30; i++)
                        {
                            string message = $"RabbitMQ Worker {i + 1} Message";
                            var body = Encoding.UTF8.GetBytes(message);
                            channel.BasicPublish("", queueName, null, body);
                            Console.WriteLine("send Task {0} message",i + 1);
                        }
                       
                    }
                }
                
            } 
        }
    
  • 消费者

      public class WorkerReceive
        {
            public static void ReceiveMessage()
            {
                string queueName = "Worker_Queue";
                var connection = RabbitMQHelper.GetConnection();
                {
                    var channel = connection.CreateModel();
                    {
                        channel.QueueDeclare(queueName, false, false, false, null);
                        var consumer = new EventingBasicConsumer(channel);
                        //设置prefetchCount : 1来告知RabbitMQ,在未收到消费端的消息确认时,不再分发消息,也就确保了当消费端处于忙碌状态时,不再分配任务。
                        channel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);
                        consumer.Received +=(model, ea) => {
                            var message = Encoding.UTF8.GetString(ea.Body.ToArray());
                            Console.WriteLine(" Worker Queue Received => {0}", message);
                        }; 
                        channel.BasicConsume(queueName,true, consumer);
                    }
                   
                }
              
            } 
        }
    

三、扇形队列模式(发布/订阅模式)

  1. 一个消息生产者,一个交换机,多个队列,多个消息消费者。每个消费队列中消息一致,且每个消息消费者都从自己的消息队列的第一个消息开始消费,直到最后。

  2. 交换机为rabbitMQ中内部组件。消息生产者将消息发送给rabbitMQ后,rabbitMQ会根据订阅的消费者个数,生成对应数目的消息队列,这样每个消费者都能获取生产者发送的全部消息。

  3. 一旦消费者断开与rabbitMQ的连接,队列就会消失。如果消费者数目很多,对于rabbitMQ而言,也是个重大负担,订阅模式是个长连接,占用并发数,且每个消费者一个队列会占用大量空间

  4. 相关应用场景:邮件群发,群聊,广播

    image
  • 生产者
 public static void SendMessage()
        {
            //1.创建连接
            using (var connection = RabbitMQHelper.GetConnection())
            {
                //2.创建信道
                using(var channel = connection.CreateModel())
                {
                    // 3.声明交换机对象
                    channel.ExchangeDeclare("fanout_exchange", "fanout");
                   
                    // 4.创建队列
                    string queueName1 = "fanout_queue1";
                    channel.QueueDeclare(queueName1, false, false, false, null);
                    string queueName2 = "fanout_queue2";
                    channel.QueueDeclare(queueName2, false, false, false, null);
                    string queueName3 = "fanout_queue3";
                    channel.QueueDeclare(queueName3, false, false, false, null);
                    
                    // 5.绑定到交互机
                    // fanout_exchange 绑定了 3个队列 
                    channel.QueueBind(queue: queueName1, exchange: "fanout_exchange", routingKey: "");//指定交换机
                    channel.QueueBind(queue: queueName2, exchange: "fanout_exchange", routingKey: "");
                    channel.QueueBind(queue: queueName3, exchange: "fanout_exchange", routingKey: "");

                    for (int i = 0; i < 10; i++)
                    {
                        //6.构建消息byte数组
                        string message = $"RabbitMQ Fanout {i + 1} Message";
                        var body = Encoding.UTF8.GetBytes(message);
                        //7.发送消息
                        channel.BasicPublish("fanout_exchange", "", null, body);//同时把消息发送到订阅的三个队列
                        Console.WriteLine("Send Fanout {0} message",i + 1);
                    }
                }
            }
            
        } 
    }
  • 消费者
 public class FanoutConsumer
    {
        public static void ConsumerMessage()
        {
            //1.创建连接
            var connection = RabbitMQHelper.GetConnection();
            {
                //2,。创建信道
                var channel = connection.CreateModel();
                {
                    //3.申明exchange
                    channel.ExchangeDeclare(exchange: "fanout_exchange", type: "fanout");
                    
                    // 4.创建队列
                    string queueName1 = "fanout_queue1";
                    channel.QueueDeclare(queueName1, false, false, false, null);
                    string queueName2 = "fanout_queue2";
                    channel.QueueDeclare(queueName2, false, false, false, null);
                    string queueName3 = "fanout_queue3";
                    channel.QueueDeclare(queueName3, false, false, false, null);
                    
                    // 5.绑定到交互机
                    channel.QueueBind(queue: queueName1, exchange: "fanout_exchange", routingKey: "");
                    channel.QueueBind(queue: queueName2, exchange: "fanout_exchange", routingKey: "");
                    channel.QueueBind(queue: queueName3, exchange: "fanout_exchange", routingKey: "");

                    Console.WriteLine("[*] Waitting for fanout logs.");

                    //6.申明consumer
                    var consumer = new EventingBasicConsumer(channel);
                    //绑定消息接收后的事件委托
                    consumer.Received += (model, ea) => {
                        var body = ea.Body;
                        var message = Encoding.UTF8.GetString(body.ToArray());
                        Console.WriteLine("[x] {0}", message);

                    };
                    //7.启动消费者
                    channel.BasicConsume(queue: queueName1, autoAck: true, consumer: consumer);//只会消费队列queueName1中的消息,其他队列中订阅的消息仍然存在
                    Console.WriteLine(" Press [enter] to exit.");
                    Console.ReadLine();
                }
            }
        }
    }

四、直接队列模式(Routing路由模式)

  1. 一个消息生产者,一个交换机,多个队列,多个消息消费者。一个交换机绑定多个消息队列,每个消息队列都有自己唯一的Routekey,每一个消息队列有一个消费者监听。

  2. 消息生产者将消息发送给交换机,交换机按照路由判断,将路由到的RouteKey的消息,推送与之绑定的队列,交换机根据路由的key,只能匹配上路由key对应的消息队列,对应的消费者才能消费消息;

    image
  • 生产者:
public static void SendMessage()
        {
            //1.创建连接
            using (var connection = RabbitMQHelper.GetConnection())
            {
                //2.创建信道
                using(var channel = connection.CreateModel())
                {
                    // 3.声明Direct交换机
                    channel.ExchangeDeclare("direct_exchange", "direct");

                    // 4.创建队列
                    string queueName1 = "direct_queue1";
                    channel.QueueDeclare(queueName1, false, false, false, null);
                    string queueName2 = "direct_queue2";
                    channel.QueueDeclare(queueName2, false, false, false, null);
                    string queueName3 = "direct_queue3";
                    channel.QueueDeclare(queueName3, false, false, false, null);

                    // 5.绑定到交互机 指定routingKey
                    channel.QueueBind(queue: queueName1, exchange: "direct_exchange", routingKey: "red");
                    channel.QueueBind(queue: queueName2, exchange: "direct_exchange", routingKey: "yellow");
                    channel.QueueBind(queue: queueName3, exchange: "direct_exchange", routingKey: "green");

                    for (int i = 0; i < 10; i++)
                    {
                        string message = $"RabbitMQ Direct {i + 1} Message =>green";
                        var body = Encoding.UTF8.GetBytes(message);
                        // 发送消息的时候需要指定routingKey发送
                        channel.BasicPublish(exchange: "direct_exchange", routingKey: "green", null, body);//只发布到RouteKey:green的队列
                        Console.WriteLine("Send Direct {0} message",i + 1);
                    }
                }
            }
            
        } 
    }
  • 消费者
   public class DirectConsumer
    {
        public static void ConsumerMessage()
        {
            //1.创建连接
            var connection = RabbitMQHelper.GetConnection();
            //2.创建通信
            var channel = connection.CreateModel();
            //3.声明交换机
            channel.ExchangeDeclare(exchange: "direct_exchange", type: "direct");
            //4.绑定交换机
            var queueName = "direct_queue2";//队列direct_queue3绑定有red,yellow,green共3个RouteKey
            channel.QueueDeclare(queueName, false, false, false, null);
            //此处消费通信没有必要绑定所有的RouteKey,根据前生产者通信的路由规则,每个队列中只会路由到一种消息
            channel.QueueBind(queue: queueName,
                                      exchange: "direct_exchange",
                                      routingKey: "red");
            channel.QueueBind(queue: queueName,
                                      exchange: "direct_exchange",
                                      routingKey: "yellow");
            channel.QueueBind(queue: queueName,
                                      exchange: "direct_exchange",
                                      routingKey: "green");

            Console.WriteLine(" [*] Waiting for messages.");

            //5.实例化消费者
            var consumer = new EventingBasicConsumer(channel);
            //6.为消费者绑定消费委托事件
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body.ToArray());
                var routingKey = ea.RoutingKey;
                Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);

                // 消费完成后需要手动签收消息,如果不写该代码就容易导致重复消费问题
                //7.手动确认签收消息
                channel.BasicAck(ea.DeliveryTag, true); // 可以降低每次签收性能损耗
            };

            // 消息签收模式
            // 手动签收 保证正确消费,不会丢消息(基于客户端而已)
            // 自动签收 容易丢消息 
            // 签收:意味着消息从队列中删除
            channel.BasicConsume(queue: queueName,
                                 autoAck: false,
                                 consumer: consumer);//设置为不自动签收,进行手动签收

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }
    }

五、模糊匹配队列模式(Topic 主题模式)

  1. 一个消息生产者,一个交换机,多个队列,多个消息消费者。一个交换机绑定多个消息队列,每个消息队列都有自己唯一的Routekey,每一个消息队列有一个消费者监听。

  2. 此时的自己唯一的Routekey,不是一个确定值,像我们熟悉的正则表达式对应的匹配规则。

  3. 生产者产生消息,把消息交给交换机,交换机根据RouteKey的模糊匹配到对应的队列,由队列监听消费者消费消息。

  4. 和* 都是通配符,命名规则是多个单词用顿号(.)分隔开

    代表代表一个单词

    *代表多个单词

    image
  • 生产者:
      public static void SendMessage()
        {
            //1.创建连接
            using (var connection = RabbitMQHelper.GetConnection())
            {
                //2.创建信道
                using (var channel = connection.CreateModel())
                {
                    //3.声明交换机
                    channel.ExchangeDeclare("topic_exchange", "topic");
                    //4.声明队列
                    string queueName1 = "topic_queue1";
                    channel.QueueDeclare(queueName1, false, false, false, null);
                    string queueName2 = "topic_queue2";
                    channel.QueueDeclare(queueName2, false, false, false, null);
                    string queueName3 = "topic_queue3";
                    channel.QueueDeclare(queueName3, false, false, false, null);
                    //5.绑定到交互机
                    channel.QueueBind(queue: queueName1, exchange: "topic_exchange", routingKey: "user.data.*");
                    channel.QueueBind(queue: queueName2, exchange: "topic_exchange", routingKey: "user.data.delete");
                    channel.QueueBind(queue: queueName3, exchange: "topic_exchange", routingKey: "user.data.update");
                
                    for (int i = 0; i < 10; i++)
                    {
                        //6.准备发送字节数组
                        string message = $"RabbitMQ Topic {i + 1} Delete Message";
                        var body = Encoding.UTF8.GetBytes(message);
                        //7.根据RouteKey发布消息
                        channel.BasicPublish("topic_exchange", "user.data.delete", null, body);//会发布到queueName1,queueName2
                        Console.WriteLine("Send Topic {0} message", i + 1);
                    }
                }
            }

        }
  • 消费者:
 public static void ConsumerMessage()
        {
            //1.创建连接
            var connection = RabbitMQHelper.GetConnection();
            //2.创建通信
            var channel = connection.CreateModel();
            //3.声明交换机
            channel.ExchangeDeclare(exchange: "topic_exchange", type: "topic");
            //4.声明队列
            var queueName = "topic_queue3";
            channel.QueueDeclare(queueName, false, false, false, null);
            //5.绑定交换机
            channel.QueueBind(queue: queueName,
                                      exchange: "topic_exchange",
                                      routingKey: "user.data.*");

            Console.WriteLine(" [*] Waiting for messages.");
            //6.创建消费者
            var consumer = new EventingBasicConsumer(channel);
            //7.绑定消费委托事件
            consumer.Received += (model, ea) =>
            {
                var body = ea.Body;
                var message = Encoding.UTF8.GetString(body.ToArray());
                var routingKey = ea.RoutingKey;
                Console.WriteLine(" [x] Received '{0}':'{1}'", routingKey, message);
            };

            //8.启动消费
            channel.BasicConsume(queue: queueName,
                                 autoAck: true,
                                 consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        }

六、RPC 模式(了解)

RPC即客户端远程调用服务端的方法 ,使用MQ可以实现RPC的异步调用,基于Direct交换机实现,流程如下:

1、客户端即是生产者也是消费者,向RPC请求队列发送RPC调用消息,同时监听RPC响应队列。

2、服务端监听RPC请求队列的消息,收到消息后执行服务端的方法,得到方法返回的结果。

3、服务端将RPC方法 的结果发送到RPC响应队列。

4、客户端(RPC调用方)监听RPC响应队列,接收到RPC调用结果。

源码地址


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK