8

下一代消息队列pulsar到底是什么?

 3 years ago
source link: https://mp.weixin.qq.com/s?__biz=MzA5Mjg2MDQ5NQ%3D%3D&%3Bmid=2452509916&%3Bidx=1&%3Bsn=585dc9b9ac9a3d5c6c62200c415beb71
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

背景

之前琢磨了很久一直想写一篇pulsar相关的文章,但是一直知识储备不够,对于很多细节还是不了解,于是查了很多资料,总算是可以凑出一篇文章了。

Pulsar是一个由yahoo公司于2016年开源的消息中间件,2018年成为Apache的顶级项目。在我之前的文章中写过很多其他消息中间件的文章,比如kafka,rocketmq等等,如果大家对于消息队列不了解的可以阅读以下我之前的文章:

在开源的业界已经有这么多消息队列中间件了,pulsar作为一个新势力到底有什么优点呢?pulsar自从出身就不断的再和其他的消息队列(kafka,rocketmq等等)做比较,但是Pulsar的设计思想和大多数的消息队列中间件都不同,具备了高吞吐,低延迟,计算存储分离,多租户,异地复制等功能,所以pulsar也被誉为下一代消息队列中间件,接下来我会一一对其进行详细的解析。

pulsar架构原理

MJVRRvy.png!mobileMjUzEvn.png!mobile

整体的架构和其他的消息队列中间件差别不是太大,相信大家也看到了很多熟悉的名词,接下来会给大家一一解释这些名词的含义。

名词解释

  • Producer:消息生产者,将消息发送到broker。

  • Consumer:消息消费者,从Broker读取消息到客户端,进行消费处理。

  • Broker: 可以看作是pulsar的server,Producer和Consumer都看作是client.消息处理的节点,pulsar的Broker和其他消息中间件的都不一样,他是无状态的没有存储,所以可以无限制的扩展,这个后面也会详解讲到。

  • Bookie: 负责所有消息的持久化,这里采用的是Apache Bookeeper。

  • ZK: 和kafka一样pulsar也是使用zk保存一些元数据,比如配置管理,topic分配,租户等等。

  • Service Discovery:可以理解为Pulsar中的nginx,只用一个url就可以和整个broker进行打交道,当然也可以使用自己的服务发现。客户端发出的读取,更新或删除主题的初始请求将发送给可能不是处理该主题的 broker 。如果这个 broker 不能处理该主题的请求,broker 将会把该请求重定向到可以处理主题请求的 broker。

不论是kafka,rocketmq还是我们的pulsar其实作为消息队列中间件最为重要的大概就是分为三个部分:

  • Producer是如何生产消息,发送到对应的Broker

  • Broker是如何处理消息,将高效的持久化以及查询

  • Consumer是如何进行消费消息

而我们后面也会围绕着这三个部分进行展开讲解。

Producer生产消息

先简单看一下如何用代码进行消息发送:

PulsarClient client = PulsarClient.create("pulsar://pulsar.us-west.example.com:6650");

Producer producer = client.createProducer(
                "persistent://sample/standalone/ns1/my-topic");

// Publish 10 messages to the topic
for (int i = 0; i < 10; i++) {
    producer.send("my-message".getBytes());
}
  • Step1: 首先使用我们的url创建一个client这个url是我们service discovery的地址,如果我们使用单机模式可以进行直连

  • Step2:我们传入了一个类似url的参数,我们只需要传递这个就能指定我们到底在哪个topic或者namespace下面创建的:

url的格式为:{persistent|non-persistent}://tenant/namespace/topic

组成 含义 persistent/non-persistent Pulsar 提供持久化、非持久化两种主题,如果选择的是非持久化主题的话,所有消息都在内存中保存,如果broker重启,消息将会全部丢失。如果选择的是持久化主题,所有消息都会持久化到磁盘,重启broker,消息也可以正常消费。 tenant 顾名思义就是租户,pulsar最开始在雅虎内部是作为全公司使用的中间件使用的,需要给topic指定一些层级,租户就是其中一层,比如这个可以是一个大的部门,例如电商中台租户。 namespace 命名空间,可以看作是第二层的层级,比如电商中台下的订单业务组 topic 消息队列名字
  • Step3: 调用send方法发送消息,这里也提供了sendAsync方法支持异步发送。

上面三个步骤中,步骤1,2属于我们准备阶段,用于构建客户端,构建Producer,我们真的核心逻辑在send中,那这里我先提几个小问题,大家可以先想想在其他消息队列中是怎么做的,然后再对比pulsar的看一下:

  • 我们调用了send之后是会立即发送吗?

  • 如果是多partition,怎么找到我应该发送到哪个Broker呢?

发送模式

我们上面说了send分为async和sync两种模式,但实际上在pulsar内部sync模式也是采用的async模式,在sync模式下模拟回调阻塞,达到同步的效果,这个在kafka中也是采用的这个模式,但是在rocketmq中,所有的send都是真正的同步,都会直接请求到broker。

基于这个模式,在pulsar和kafka中都支持批量发送,在rocketmq中是直接发送,批量发送有什么好处呢?当我们发送的TPS特别高的时候,如果每次发送都直接和broker直连,可能会做很多的重复工作,比如压缩,鉴权,创建链接等等。比如我们发送1000条消息,那么可能会做1000次这个重复的工作,如果是批量发送的话这1000条消息合并成一次请求,相对来说压缩,鉴权这些工作就只需要做一次。

IFRfqur.png!mobile

有同学可能会问,批量发送会不会导致发送的时间会有一定的延误?这个其实不需要担心,在pulsar中默认定时每隔1ms发送一次batch,或者当batchsize默认到了1000都会进行发送,这个发送的频率都还是很快的。

发送负载均衡

在消息队列中通常会将topic进行水平扩展,在pulsar和kafka中叫做partition,在rocketmq中叫做queue,本质上都是分区,我们可以将不同分区落在不同的broker上,达到我们水平扩展的效果。

在我们发送的时候可以自己制定选择partition的策略,也可以使用它默认轮训partition策略。当我们选择了partition之后,我们怎么确定哪一个partition对应哪一个broker呢?

可以先看看下面这个图:

nInam2J.png!mobile
  • Step1: 我们所有的信息分区映射信息在zk和broker的缓存中都有进行存储。

  • Step2: 我们通过查询broker,可以获取到分区和broker的关系,并且定时更新。

  • Step3: 在pulsar中每个分区在发送端的时候都被抽象成为一个单独的Producer,这个和kafka,rocketmq都不一样,在kafka里面大概就是选择了partition之后然后再去找partition对应的broker地址,然后进行发送。pulsar将每一个partition都封装成Producer,在代码实现上就不需要去关注他具体对应的是哪个broker,所有的逻辑都在producer这个代码里面,整体来说比较干净。

EVNvUj3.png!mobile

压缩消息

消息压缩是优化信息传输的手段之一,我们通常看见一些大型文件都会是以一个压缩包的形式提供下载,在我们消息队列中我们也可以用这种思想,我们将一个batch的消息,比如有1000条可能有1M的传输大小,但是经过压缩之后可能就只会有几十kb,增加了我们和broker的传输效率,但是与之同时我们的cpu也带来了损耗。Pulsar客户端支持多种压缩类型,如 lz4、zlib、zstd、snappy 等。

client.newProducer()
    .topic(“test-topic”)
    .compressionType(CompressionType.LZ4)
    .create();

Broker

接下来我们来说说第二个比较重要的部分 Broker ,在Broker的设计中pulsar和其他所有的消息队列差别比较大,而正是因为这个差别也成为了他的特点。

计算和存储分离

首先我们来说说他最大的特点:计算和存储分离。我们在开始的说过Pulsar是下一代消息队列,就非常得益于他这个架构设计,无论是kafka还是RocketMQ,所有的计算和存储都放在同一个机器上,这个模式有几个弊端:

  • 扩展困难:当我们需要扩展的集群的时候,我们通常是因为cpu或者磁盘其中一个原因影响,但是我们却要申请一个可能cpu和磁盘配置都很好的机器,造成了资源浪费。并且kafka这种进行扩展,还需要进行迁移数据,过程十分繁杂。

  • 负载不均衡:当某些partion数据特别多的时候,会导致broker负载不均衡,如下面图,如果某个partition数据特别多,那么就会导致某个broker(轮船)承载过多的数据,但是另外的broker可能又比较空闲

    RFniEfM.png!mobile

pulsar计算分离架构能够非常好的解决这个问题:

  • 对于计算:也就是我们的broker,提供消息队列的读写,不存储任何数据,无状态对于我们扩展非常友好,只要你机器足够,就能随便上。扩容Broker往往适用于增加Consumer的吞吐,当我们有一些大流量的业务或者活动,比如电商大促,可以提前进行broker的扩容。

  • 对于存储:也就是我们的bookie,只提供消息队列的存储,如果对消息量有要求的,我们可以扩容bookie,并且我们不需要迁移数据,扩容十分方便。

消息存储

名词解析:

ieINrqj.png!mobile

上图是bookie的读写架构图,里面有一些名词需要先介绍一下:

  • Entry,Entry是存储到bookkeeper中的一条记录,其中包含Entry ID,记录实体等。

  • Ledger,可以认为ledger是用来存储Entry的,多个Entry序列组成一个ledger。

  • Journal,其实就是bookkeeper的WAL(write ahead log),用于存bookkeeper的事务日志,journal文件有一个最大大小,达到这个大小后会新起一个journal文件。

  • Entry log,存储Entry的文件,ledger是一个逻辑上的概念,entry会先按ledger聚合,然后写入entry log文件中。同样,entry log会有一个最大值,达到最大值后会新起一个新的entry log文件

  • Index file,ledger的索引文件,ledger中的entry被写入到了entry log文件中,索引文件用于entry log文件中每一个ledger做索引,记录每个ledger在entry log中的存储位置以及数据在entry log文件中的长度。

  • MetaData Storage,元数据存储,是用于存储bookie相关的元数据,比如bookie上有哪些ledger,bookkeeper目前使用的是zk存储,所以在部署bookkeeper前,要先有zk集群。

    Ef2EJjb.png!mobile

整体架构上的写流程:

  • Step1: broker发起写请求,首先对Journal磁盘写入WAL,熟悉mysql的朋友知道redolog,journal和redolog作用一样都是用于恢复没有持久化的数据。

  • Step2: 然后再将数据写入index和ledger,这里为了保持性能不会直接写盘,而是写pagecache,然后异步刷盘。

  • Step3: 对写入进行ack。

读流程为:

  • Step1: 先读取index,当然也是先读取cache,再走disk。

  • Step2: 获取到index之后,根据index去entry logger中去对应的数据

如何高效读写?

在kafka中当我们的topic变多了之后,由于kafka一个topic一个文件,就会导致我们的磁盘IO从顺序写变成随机写。在rocketMq中虽然将多个topic对应一个写入文件,让写入变成了顺序写,但是我们的读取很容易导致我们的pagecache被各种覆盖刷新,这对于我们的IO的影响是非常大的。所以pulsar在读写两个方面针对这些问题都做了很多优化:

  • 写流程:顺序写 + pagecache。在写流程中我们的所有的文件都是独立磁盘,并且同步刷盘的只有Journal,Journal是顺序写一个journal-wal文件,顺序写效率非常高。ledger和index虽然都会存在多个文件,但是我们只会写入pagecache,异步刷盘,所以随机写不会影响我们的性能。

  • 读流程:broker cache + bookie cache,在pulsar中对于追尾读(tailing read)非常友好基本不会走io,一般情况下我们的consumer是会立即去拿producer发送的消息的,所以这部分在持久化之后依然在broker中作为cache存在,当然就算broker没有cache(比如broker是新建的),我们的bookie也会在memtable中有自己的cache,通过多重cache减少读流程走io。

我们可以发现在最理想的情况下读写的io是完全隔离开来的,所以在Pulsar中能很容易就支持百万级topic,而在我们的kafka和rocketmq中这个是非常困难的。

无限流式存储

一个Topic实际上是一个ledgers流(Segment),通过这个设计所以Pulsar他并不是一个单纯的消息队列系统,他也可以代替流式系统,所以他也叫流原生平台,可以替代flink等系统。

reYfuqj.png!mobile

可以看见我们的Event Stream(topic/partition),由多个Segment存储组成,而每个segment由entry组成,这个可以看作是我们每批发送的消息通常会看作是一个entry。

Segment可以看作是我们写入文件的一个基本维度,同一个Segment的数据会写在同一个文件上面,不同Segment将会是不同文件,而Segment之间的在metadata中进行保存。

yMv22iy.png!mobile

分层存储

在kafka和rocketmq中消息是会有一定的保存时间的,因为磁盘会有空间限制,在pulsar中也提供这个功能,但是如果你想让自己的消息永久存储,那么可以使用分级存储,我们可以将一些比较老的数据,定时的刷新到廉价的存储中,比如s3,那么我们就可以无限存储我们的消息队列了。

EBbIj2I.png!mobile

数据复制

在pulsar中的数据复制和kafka,rocketmq都有很大的不同,在其他消息队列中通常是其他副本主动同步,通常这个时间就会变得不可预测,而在pulsar采用了类似qurom协议,给一组可用的bookie池,然后并发的写入其中的一部分bookie,只要返回部分成功(通常大于1/2)就好。

Az2QNrq.png!mobile
  • Ensemble Size(E)决定给定 ledger 可用的 bookie 池大小。

  • Write Quorum Size(Qw)指定 Pulsar 向其中写入 entry 的 bookie 数量。

  • Ack Quorum Size(Qa)指定必须 ack 写入的 bookie 数量。

采用这种并发写的方式,会更加高效的进行数据复制,尤其是当数据副本比较多的时候。

Consumer

接下来我们来聊聊pulsar中最后一个比较重要的组成 consumer

订阅模式

订阅模式是用来定义我们的消息如何分配给不同的消费者,不同消息队列中间件都有自己的订阅模式,一般我们常见的订阅模式有:

  • 集群模式:一条消息只能被一个集群内的消费者所消费。

  • 广播模式:一条消息能被集群内所有的消费者消费。

在pulsar中提供了4种订阅模式,分别是独占,灾备,共享,键共享:

NzaMJjV.png!mobile
  • 独占:顾名思义只能由一个消费者独占,如果同一个集群内有第二个消费者去注册,第二个就会失败,这个适用于全局有序的消息。

  • 灾备:加强版独占,如果独占的那个挂了,会自动的切换到另外一个好的消费者,但是还是只能由一个独占。

  • 共享模式:这个模式看起来有点像集群模式,一条消息也是只能被一个集群内消费者消费,但是和rocketmq不同的是,rocketmq是以partition维度,同一个Partition的数据都会被发到一个机器上。在Pulsar中消费不会以partition维度,而是轮训所有消费者进行消息发送。这有个什么好处呢?如果你有100台机器,但是你只有10个partition其实你只有10台消费者能运转,但是在pulsar中100台机器都可以进行消费处理。

  • 键共享:类似上面说的partition维度去发送,在rocketmq中同一个key的顺序消息都会被发送到一个partition,但是这里不会有partition维度,而只是按照key的hash去分配到固定的consumer,也解决了消费者能力限制于partition个数问题。

消息获取模式

不论是在kafka还是在rocketmq中我们都是client定时轮训我们的broker获取消息,这种模式叫做长轮训(Long-Polling)模式。这种模式有一个缺点网络开销比较大,我们来计算一下consumer被消费的时延,我们假设broker和consumer之间的一次网络延时为R,那么我们总共的时间为:

  • 当某一条消息A刚到broker的,这个时候long-polling刚好打包完数据返回,broker返回到consumer这个时间为R。

  • consumer又再次发送request请求,这个又为R。

  • 将我们的消息A返回给consumer这里又为R。

如果只考虑网络时延,我们可以看见我们这条消息的消费时延大概是3R,所以我们必须想点什么对其进行一些优化,有同学可能马上就能想到,我们消息来了直接推送给我们的consumer不就对了,这下我们的时延只会有一次R,这个就是我们常见的推模式,但是简单的推模式是有问题的,如果我们有生产速度远远大于消费速度,那么推送的消息肯定会干爆我们的内存,这个就是背压。那么我们怎么解决背压呢?我们就可以优化推送方式,将其变为动态推送,我们结合Long-polling,在long-polling请求时将Buffer剩余空间告知给Broker,由Broker负责推送数据。此时Broker知道最多可以推送多少条数据,那么就可以控制推送行为,不至于冲垮Consumer。

举个例子:

Consumer发起请求时Buffer剩余容量为100,Broker每次最多返回32条消息,那么Consumer的这次long-polling请求Broker将在执行3次push(共push96条消息)之后返回response给Consumer(response包含4条消息)。

如果采用long-polling模型,Consumer每发送一次请求Broker执行一次响应,这个例子需要进行4次long-polling交互(共4个request和4个response,8次网络操作;Dynamic Push/Pull中是1个request,三次push和一个response,共5次网络操作)。

所以pulsar就采用了这种消息获取模式,从consumer层进一步优化消息达到时间。我觉得这个设计非常巧妙,很多中间件的这种long-polling模式都可以参考这种思想去做一个改善。

总结

Apache Pulsar很多设计思想都和其他中间件不一样,但无疑于其更加贴近于未来,大胆预测一下其他的一些消息中间件未来的发展也都会向其靠拢,目前国内的Pulsar使用者也是越来越多,腾讯云提供了pulsar的云版本TDMQ,当然还有一些其他的知名公司华为,知乎,虎牙等等有都在对其做一个逐步的尝试,我相信pulsar真的是一个趋势。最后也让我想起了最近大江大河大结局的一句话:

所有的变化,都可能伴随着痛苦和弯路,开放的道路,也不会是阔野坦途,但大江大河,奔涌向前的趋势,不是任何险滩暗礁,能够阻挡的。道之所在,虽千万人吾往矣。

我在这里其实只说了一些大概,更多的一些细节,大家可以看一下下面的学习参考资料吧:

  • 首先大家可以去看看pulsar的官网的文档,首先了解一个大概。

  • 大家也可以关注pulsar的公众号,每天都会发一些pulsar相关的文章,我觉得写得非常好。

  • 可以去B站搜索TGIP,这个是pulsar每周都会由一个项目组的成员去讲相关的资料,如果想学习可以看看这个视频。

  • push or pull?: https://www.cnblogs.com/hzmark/p/mq_push_pull.html

  • 架构决策之消息中间件-Pulsar:https://blog.csdn.net/tcy83/article/details/106731392

如果大家觉得这篇文章对你有帮助,你的关注和转发是对我最大的支持,O(∩_∩)O:

RbARn2R.jpg!mobile

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK