3

万字长文解析Kafka分区工作机制

 1 year ago
source link: https://www.51cto.com/article/720312.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

万字长文解析Kafka分区工作机制

作者:丁威 2022-10-10 08:35:17
分区的状态主要包括NonExistentPartition、NewPartition、OnlinePartition、OfflinePartition四个状态,只有分区状态为OnlinePartition才能对外提供读与写。

Kafka的消息发送与消息消费与分区关联密切,我们从这篇文章开始讲点学习分区相关的知识,本篇文章将重点介绍分区内部的工作机制,即分区状态机运转机制。

1、Kafka分区状态

Kafka内部分区的运转机制具体实现为PartitionStateMachine,从这个类的注释上来看可以得知Kafka分区的状态共有四个,它们分别是:

  • NonExistentPartition 表示分区不存在,通常是该分区从未创建过或者创建后被删除。
  • NewPartition 分区已创建,即分配完成了副本,但还未进行分区Leader选举,即还不存在Leader分区与ISR集合,前一个有效状态为NonExistentPartition。
  • OnlinePartition 分区处于在线时的状态,表示已经完成了分区选举,成功选举出Leader,此时可以进行消息发送与消息消费,前一个有效状态为NewPartition/OfflinePartition。
  • OfflinePartition分区处于离线时状态,表示选举出来的Leader失效了,例如Leader所在的Broker宕机,前一个有效状态为NewPartition/OnlinePartition。

关于分区的状态变如下所示:

图片

2、Kafka分区状态机

接下来本文的行为思路,将会通过源码阅读的方式,深入PartitionStateMachine的实现细节,从而提炼出分区变更实现要点,帮助我们更好的运维kafka。

2.1 状态机启动流程

状态机的启动流程定义在PartitionStateMachine的startup方法,该方法的调用时机:一个新的Broker通过控制器选举成为新的Controller时会被调用。

该方法的声明如下:

图片

状态机的启动主要包括两个步骤:

  • 初始化分区的状态
  • 触发分区状态向OnlinePartition转换

接下来将详细探讨实现细节。

2.1.1 分区状态初始化

首先我们来看一下分区的初始化流程,具体代码如下所示:

图片

该方法的实现要点:

  • 在KafkaController中使用来ControllerContext用来在内存中存储与控制器相关的数据结构,其中Map[String, mutable.Map[Int, Seq[Int]]]  partitionReplicaAssignmentUnderlying存储了当前集群中所有的分区信息(主题名称、分区编号,副本数情况),既然是控制器重新选举,故需要重新初始化所有的分区。
  • 然后根据 Map[TopicPartition, LeaderIsrAndControllerEpoch] partitionLeadershipInfo中存储各个分区当前的运行时状态,这里分成三种情况:

如果partitionLeadershipInfo中并不存在主题分区的Leaer和ISR信息,驱动状态从NonExistentPartition转换为NewPartition。

如果partitionLeadershipInfo中存在主题分区的leader信息,但对应的Broker已经为下线状态,则驱动状态从NonExistentPartition转换为OfflinePartition。

如果partitionLeadershipInfo中存在主题分区的leader信息,但对应的Broker已经为下线状态,则将状态从NonExistentPartition先转换为OfflinePartition。

值得注意的是,调用changeStateTo方法改变分区的状态,仅仅只是在内存中更新状态,其具体实现如图所示:

图片

具体的做好是将需要更新的状态存储到Map[TopicPartition, PartitionState] 中。

2.1.2 分区状态运转机制

在内存中根据当前维护的LeaderAndISR信息后将状态存储到本地内存后,接下来就是将分区状态向Online状态转换,具体的代码实现见PartitionStateMachine的triggerOnlinePartitionStateChange方法,代码如下所示:

图片

该方法的实现要点是在内存缓存中(Map[TopicPartition, PartitionState] )挑选出状态处于OfflinePartition与NewPartition并且未被删除的分区,驱动状态机,调用handleStateChanges方法尝试向OnlinePartition分区转化。

图片

该方法主要做如下两件事情:

  • 调用PartitionStateMachine的doHandleStateChanges的方法,驱动分区状态机的转换。
  • 然后调用ControllerBrokerRequestBatch的sendRequestsToBrokers方法,实现元信息在其他Broker上的同步

要想清晰而全面的了解分区状态的变更,我还给出了Kafka中所有调用handleStateChanges的调用入口,在后续深入研究Kafka相关机制时会再次一一提及,调用链如下图所示:

图片

由于篇幅的问题,分区信息在其他Broker中的状态同步将在下一篇文章中介绍。

PartitionStateMachine的doHandleStateChanges方法在上一篇中已经详细介绍,尴尬,在Kafka生产实践中又出问题了 中详细介绍过,在这里我稍微总结提炼一下:

目标状态为NewPartition、OfflinePartition、NonExistentPartition 这三个状态并没有什么复杂的实现逻辑,只是更新内存中的状态,并在state-change.log文件中将输出状态变更日志,只有目标状态为OnlinePartition时才会详细的处理逻辑。

但或许你有一个疑问,状态变更为NewPartition,什么时候会向OnlinePartition状态转换呢?其实通过调用doHandleStateChanges将目标方法设置为NewPartition后,会紧接着调用triggerOnlinePartitionStateChange等方法,将状态进一步向OnlinePartition状态转化。

由于在尴尬,在Kafka生产实践中又出问题了 这篇文章中详细介绍了OfflinePartition向OnlinePartition的转化流程,故本篇文章就将重点放在了NewPartition状态向OnlinePartition的转化处理逻辑,其实也就是分区创建的流程,这块的代码入口如下所示:

图片

由于PartitionStateMachine的initializeLeaderAndIsrForPartitions方法比较长,接下来将分步讲解。

2.1.3 分区初始化流程

接下来我们详细探讨PartitionStateMachine的initializeLeaderAndIsrForPartitions方法。

Step1:首先获取所有分区对应的在线副本,Seq< Map< TopicPartition, Seq< Int>> > liveReplicasPerPartition 来表示,类比Java的数据结构为List< Map< TopicPartition, List< Interger> >,代码如下所示:

图片

在Kafka中创建一个主题时,kafka首先会根据集群节点的负载情况,根据主题的分区数、副本数,物理机架等信息,生成静态负载情况,存储在/brokers/topics/{topicName},其数据如下图所示:

图片

而liveReplicasPerPartition是在这个数据结构的基础上筛选出在线的broker,例如如果id为4的broker已下线,那么liveReplicasPerPartition中的值就可能如下所示:

["0":[0,1,2],"1":[1,2],"2":[2,0],"3":[0,1],"4":[0,2],"5":[1,0],"6":[0,2,1],"7":[1,0,2]]

Step2:如果一个分区所有预分配的分片都不在线,则打印错误日志,代码如下所示:

图片

Step3:为分区创建leaderIsrAndControllerEpoch信息,代码如下所示:

图片

这里的实现比较简单,值得注意的是初始化时分区的Leader则为ISR列表中的第一个分区。

Step4:将分区的状态信息 leaderIsrAndControllerEpoch(leader,isr,LeaderEpoch、ControllerEpoch)写入到zookeeper中,具体代码如下;

图片

具体就是在zookeeper中创建/broker/topics/{topicName}/partitions/{分区序号}/state,并将leaderIsrAndControllerEpoch写入到上述节点,具体效果如下图所示:

图片

Step5:对zookeeper写入结果进行处理,对应的代码如下所示:

图片

如果在zookeeper中创建成功,将leaderIsrAndControllerEpoch信息缓存到内存中(Map< TopicPartition, leaderIsrAndControllerEpoch>)中,并将信息放入到controllerBrokerRequestBatch,Kafka Broker控制将信息同步到集群的其他Broker上,同时会在state-change.log日志文件中记录状态成功变更日志;如果创建失败,则在state-change.log中输出对应的错误日志。

当然:为了尽量保证上述过程成功创建,Zookeeper的写入过程引入来重试机制来保证最终执行成功,除非一些类似AUTH_FAILED等不可恢复的异常。

分区的信息写入到zookeeper的/broker/topics/{topicName}/partitions/{分区序号}/state文件路径后,会再次调用changeTo方法,在内存中将分区的状态变更为OnlineParttion。

那在什么时候触发真正创建分区相关的文件夹呢?

原来在将分区信息写入到zookeeper指定文件后,由于Kafka Controller订阅了/broker/topics/{topicName}相关节点,故节点的创建会实时告知Kafka Controller,从而执行分区的选择,具体的代码如下所示:

图片

通过Zookeeper的事件监听机制,kafka就这样巧妙的实现了分区状态机的切换。

通过上面的学习,我们对分区的理解应该更加深刻了,从这里我们至少能得出如下结论:

分区的状态主要包括NonExistentPartition、NewPartition、OnlinePartition、OfflinePartition四个状态,只有分区状态为OnlinePartition才能对外提供读与写。

Kafka启动时,在选举好集群的控制器(Kafka Controller)后会启动分区状态机(PartitionStateMachine),Kafka会根据/brokers/topics/{topicName}/partitions/{partition_no}/state中的信息,驱动分区状态向OnlineParttion转换。

当新创建主题时,Kafka会根据当前集群的负载情况,主题需要创建的分区数量、副本数量,机架信息等,进行负载均衡,生成分区的意向leader,已经分区副本的分布情况,写入到/brokers/topics/{topicName}节点上,此时会触发PartitionModifications,从而触发分区创建流程,即从NewPartition向OnlineParttion转换。

责任编辑:武晓燕 来源: 中间件兴趣圈

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK