4

记一次Golang踩坑RabbitMQ

 2 years ago
source link: https://zacharyfan.com/archives/1547.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

记一次Golang踩坑RabbitMQ

这里是Z哥的个人公众号

每周五11:45 按时送达

当然了,也会时不时加个餐~

我的第「218」篇原创敬上

大家好,我是Z哥。

最近在项目中遇到了一个使用 RabbitMQ 时的问题,这个问题我觉得还是有一定普适性的,和大家分享一下,避免大家后续在同一个问题上犯错。

消息队列(MQ)是在软件开发中很常用的中间件,如果一个程序需要协调另一个程序进行数据的“write”操作,并且不关心“write”的结果,则便会选择它。它是一个保存消息(数据)的容器,由它来确保消息一定被送达到目标程序。

打个比喻来说,消息队列就是一个邮差,它负责将信件(消息)从源头送往目的地,并且根据信件重要性的不同,提供当面签收确认或者直接投放两种服务。

RabbitMQ 就是一个典型的消息队列,以 AMQP 为标准。历史也比较悠久,大概是从 2007 年研发出来的,用的编程语言Erlang也同样具有年代感。

需要简单介绍一下 Erlang 的特点,它对我们理解 RabbitMQ 有很大的帮助。

Erlang 是一种运行于“虚拟机”(类似 JVM)的解释性语言。是一个结构化,动态类型编程语言,内建并行计算支持。使用 Erlang 编写出的应用运行时通常由成千上万个轻量级“进程”(并非传统意义上的进程)组成,并通过消息传递相互通讯。进程间上下文切换对于 Erlang 来说仅仅 只是一两个环节,比起 C 程序的线程切换要高效得多得多了。
基于百度百科资料进行整理

不管是什么 MQ 中间件,作为消息的生产方和消费方都需要和 MQ 的服务端建立连接进行通讯。

一般这个连接都会使用 TCP 协议,在 RabbitMQ 里也不例外。大多数 RabbitMQ 的 SDK 都会将连接封装为一个「Connection」对象。

还没完,大多数的 MQ 中间件还会在「Connection」的基础上增加一个「Channel」的概念,以通过复用的方式提高 TCP 连接的利用率,因为建立和销毁 TCP 连接是非常昂贵的开销。在 RabbitMQ 中的复用 TCP 连接方式是「Non-blocking I/O」的模式。

关于NIO,「Non-blocking I/O」的概念,有感兴趣的话可以跳转去看之前写的这篇文章。(分布式系统关注点——阻塞与非阻塞有什么区别?)

多说一句,任何方案都不是“银弹”。当每个 Channel 的流量不是很大时,复用单一的 Connection 可以在产生性能瓶颈的情况下有效地节省 TCP 连接资源。但是 Channel 本身的流量很大时,这时候多个 Channel 复用一个 Connection 就会产生性能瓶颈,进而使整体的流量被限制了。此时就需要开辟多个 Connection,将这些 Channel 均摊到这些 Connection 中,至于哪些 Channel 使用那个 Connection 以及Connection 与 Channel 之间的数量关系是多少,需要根据业务自身的实际情况进行调节。

Channel 在 AMQP 中是一个很重要的概念,大多数操作都是在信道这个层面展开的。比如, channel.exchangeDeclare、channel.queueDeclare、channel.basicPublish、channel.basicConsume 等方法。RabbitMQ 相关的 API 与 AMQP 紧密相连,比如 channel.basicPublish 对应 AMQP 的 Basic.Publish 命令。

可能你要问了,Channel 是不是也能像 Connection 一样被复用?这是个好问题,也是我们这次遇到问题的关键点。

结论是:可以,但是需要自己保证客户端对 Channel 访问的线程安全问题,因为在 Channel 的另一端,在 RabbitMQ 的服务端,每个 Channel 由一个单独的“进程”所管理,如果由于多线程复用Channel 导致数据帧乱序了,RabbitMQ 的服务端会主动关闭整个 Connection 。

因此,我们这次犯的错误就是多线程复用了同一个 Channel 导致的问题。所以,如果你也用到 streadway/amqp 这个库的话,需要特别注意这点。

不过,不同语言的SDK内部实现不同,我们分别使用 Golang 的 AMQP 库 streadway/amqp,和 RabbitMQ 官方提供的 C# 版本的库分别模拟过同样的场景,前者出现问题,后者却没有问题。

受限于时间原因,没有具体去核实 C# 库的源码,主观猜测是 C# 库内部多做了一些对于单个 Channel 的线程安全处理。

最后,我整理了三点使用 streadway/amqp 库的最佳实践,你可以看看:

golang 中使用 streadway/amqp 时,需要保证每一个线程单独一个 Channel。
streadway/amqp 库中的获取一个 Channel 的方法「Connection.channel()」是线程安全的。但是内部有一个 defaultChannelMax 的参数对 Channel 的数量进行了限制,默认是 (2 << 10) – 1,2047。这个需要注意:

我们可以通过调用 amqp.DialConfig(url string, config Config) 来调整个限制。

但是,并不是你调整了多少就是多少,还需要和 RabbitMQ 服务端的配置进行 min() 函数的处理,最终为两者的最小值。
Tips:特别是用云厂商的 MQ 产品,因为阶梯收费的原因会对很多性能参数做限制,需要格外关注这点,比如某版本的阿里云 RabbitMQ 实例限制是单个 Connection 最多 64 个 Channel)

正如前面对 Erlang 的简单介绍,Erlang 是一个天然支持多“进程”设计的语言,所以在 RabbitMQ 的服务端设计中,每一个 Queue,每一个 Connection 都是单独的一个“进程”。因此如果你想尽可能地压榨 RabbitMQ 性能,可以通过建立更多的 Connection 或者创建更多的 Queue 来实现,当然需要注意到 Connection 的创建和销毁的性能开销问题。


原创文章,转载请注明本文链接: https://zacharyfan.com/archives/1547.html

关于作者:张帆(Zachary,个人微信号:Zachary-ZF)。坚持用心打磨每一篇高质量原创。欢迎扫描二维码~

微信公众号

定期发表原创内容:架构设计丨分布式系统丨产品丨运营丨一些思考。

如果你是初级程序员,想提升但不知道如何下手。又或者做程序员多年,陷入了一些瓶颈想拓宽一下视野。欢迎关注我的公众号「跨界架构师」,回复「技术」,送你一份我长期收集和整理的思维导图。

如果你是运营,面对不断变化的市场束手无策。又或者想了解主流的运营策略,以丰富自己的“仓库”。欢迎关注我的公众号「跨界架构师」,回复「运营」,送你一份我长期收集和整理的思维导图。

No related posts.


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK