7

十五、.net core(.NET 6)搭建RabbitMQ消息队列生产者和消费者的简单方法

 3 years ago
source link: https://www.cnblogs.com/weskynet/p/14878337.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

十五、.net core(.NET 6)搭建RabbitMQ消息队列生产者和消费者的简单方法

 搭建RabbitMQ简单通用的直连方法

如果还没有MQ环境,可以参考上一篇的博客:

https://www.cnblogs.com/weskynet/p/14877932.html

接下来开始.net core操作Rabbitmq有关的内容。我打算使用比较简单的单机的direct直连模式,来演示一下有关操作,基本套路差不多。

首先,我在我的package包项目上面,添加对RabbitMQ.Client的引用:

在Common文件夹下,新建类库项目 Wsk.Core.RabbitMQ,并且引用package项目:

在启动项目下的appsettings配置文件里面,新增一个访问RabbitMQ的配置信息:

配置部分代码:

"MQ": [
    {
      "Host": "127.0.0.1", // MQ安装的实际服务器IP地址
      "Port": 5672, // 服务端口号
      "User": "wesky", // 用户名
      "Password": "wesky123", // 密码
      "ExchangeName": "WeskyExchange", // 设定一个Exchange名称,
      "Durable": true // 是否启用持久化
    }
  ]

然后,在实体类项目下,新建实体类MqConfigInfo,用于把读取的配置信息赋值到该实体类下:

实体类代码:

ContractedBlock.gifExpandedBlockStart.gif

View Code

在刚刚新建的RabbitMQ类库项目下面,引用该实体类库项目,以及APppSettings项目。然后新建一个类,叫做ReadMqConfigHelper,以及它的interface接口,并且提供一个方法,叫ReadMqConfig,用来进行读取配置信息使用:

读取配置信息类代码:

ContractedBlock.gifExpandedBlockStart.gif

View Code

接着,新建类MqConnectionHelper以及接口IMqConnectionHelper,用于做MQ连接、创建生产者和消费者等有关操作:

然后,新增一系列创建连接所需要的静态变量:

然后,设置两个消费者队列,用来测试。以及添加生产者连接有关的配置和操作:

然后,创建消费者连接方法:

其中,StartListener下面提供了事件,用于手动确认消息接收。如果设置为自动,有可能导致消息丢失:

然后,添加消息发布方法:

在interface接口里面,添加有关的接口,用于等下依赖注入使用:

连接类部分的代码:

ContractedBlock.gifExpandedBlockStart.gif

View Code

现在,我把整个Wsk.Core.RabbitMQ项目进行添加到依赖注入:

然后,在启动项目里面的初始化服务里面,添加对MQ连接的初始化以及连接,并且发送两条消息进行测试:

启用程序,提示发送成功:

打开RabbitMQ页面客户端,可以看见新增了一个交换机WeskyExchange:

点进去可以看见对应的流量走势:

关闭程序,现在添加消费者的初始化和连接,然后重新发送:

可见发送消息成功,并且消费者也成功接收到了消息。打开客户端查看一下:

在WeskyExchange交换机下,多了两个队列,以及队列归属的RoutingKey分别是WeskyNet001和WeskyNet002。以及在Queue目录下,多了两个队列的监控信息:

为了看出点效果,我们批量发消息试一下:

然后启动项目,我们看一下监控效果。先是交换机页面的监控:

然后是队列1的监控:

现在换一种写法,在消费者那边加个延迟:

并且生产者的延迟解除:

再启动一下看看效果:

会发现队列消息被堵塞,必须在执行完成以后,才可以解锁。而且生产者这边并不需要等待,可以看见消息一次性全发出去了,可以继续执行后续操作:

以上就是关于使用Direct模式进行RabbitMQ收发消息的内容,发送消息可以在其他类里面或者方法里面,直接通过静态方法进行发送;接收消息,启动了监听,就可以一直存活。如果有兴趣,也可以自己尝试Fanout、Topic等不同的模式进行测试,以及可以根据不同的机器,进行配置成收发到不同服务器上面进行通信。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK