7

Kafka中指定副本为Leader的三种实现方式_kafka_石臻臻的杂货铺_InfoQ写作社区

 2 years ago
source link: https://xie.infoq.cn/article/c3b2f9acefa3ddbe6dd34c9fe
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

哈喽~大家好啊,我是彦祖😋

之前,我写过一篇文章叫做 Kafka如何修改分区Leader

就是因为在我们实际的运维过程中,需要指定某个副本为 ISR,但是呢 Kafka 中的 Leader 选举策略并不支持这个功能,所以需要我们自己来实现它。关于 Leader 选举策略,你可以看这篇文章Leader选举流程和4种选举策略

但是我们在之前的文章中,是留下了一个小尾巴-优化与改进

我们先简单的回顾一下之前的 2 种方案

方案一: 分区副本重分配 (低成本方案)

之前关于分区副本重分配 我已经写过很多文章了, 这里我就简单说一下;

一般分区副本重分配主要有三个流程

  1. 生成推荐的迁移 Json 文件

  2. 执行迁移 Json 文件

  3. 验证迁移流程是否完成

这里我们主要看第 2 步骤, 来看看迁移文件一般是什么样子的

{ "version": 1, "partitions": [{  "topic": "topic1",  "partition": 0,  "replicas": [0,1,2] }]}

这个迁移 Json 意思是, 把topic1的「0」号分区的副本分配成[0,1,2] ,也就是说 topic1-0号分区最终有 3 个副本分别在 {brokerId-0,brokerId-1,brokerId-2} ;

又根据Leader的选举策略得知,不管是什么策略的选择,都是按照 AR 的顺序来选的

修改 AR 顺序

AR: 副本的分配顺序

那么我们想要实现我们的需求是不是把这个 Json 文件 中的"replicas": [0,1,2]改一下就行了比如改成"replicas": [2,1,0],改完 Json 后执行,执行execute, 正式开始重分配流程!迁移完成之后, 就会发现,Leader 已经变成上面的第一个位置的副本「2」了

执行 Leader 选举

修改完 AR 顺序就结束了吗?

可以说是结束了,也可以说没有结束。

上面只是修改了 AR 的顺序, 但是没有执行 Leader 选举呀,这个时候 Leader 还是原来的,所以我们需要主动触发一下 Leader 选举

## 石臻臻的杂货铺## 微信: szzdzhp001sh bin/kafka-leader-election.sh --bootstrap-server xxxx:9090 --topic Topic1 --election-type PREFERRED --partition 0

这样就会立马切换成我们想要的 Leader 了。

也可以不主动触发,等 Controller 自动均衡。

如果你觉得主动触发这个很麻烦,那么没有关系,那就不执行,如果你开启了自动均衡策略的话,默认是开启的。

延伸: 自动均机制

当一个 broker 停止或崩溃时,这个 broker 中所有分区的 leader 将转移给其他副本。这意味着在默认情况下,当这个 broker 重新启动之后,它的所有分区都将仅作为 follower,不再用于客户端的读写操作。

为了避免这种不平衡,Kafka 有一个优先副本的概念。如果一个分区的副本列表是 1,5,9,节点 1 将优先作为其他两个副本 5 和 9 的 leader。

Controller 会有一个定时任务,定期执行优先副本选举,这样就不会导致负载不均衡和资源浪费,这就是 leader 的自动均衡机制

优点: 实现了需求, 不需要改源码,也没有额外的开发工作。

缺点: 操作比较复杂容易出错,需要先获取原先的分区分配数据,然后手动修改 Json 文件,这里比较容易出错,影响会比较大,当然这些都可以通过校验接口来做好限制, 最重要的一点是 副本重分配当前只能有一个任务!假如你当前有一个「副本重分配」的任务在,那么这里就不能够执行了。

方案二: 手动修改 AR 顺序(高成本方案)

  1. 从 zk 中获取/brokers/topics/{topic 名称}节点数据。

  2. 手动调整一下里面的顺序

  3. 将调整后的数据,重新覆盖掉之前的节点。

  4. 删除 zk 中的/Controller 节点,让它触发重新加载,并且同时触发 Leader 选举。

在这里插入图片描述

修改的时候请先用 get 获取数据,在那个基础上改,因为不同版本,里面的数据结构是不一样的,我们只需要改分区 AR 顺序就行了 "partitions":{"0":[0,1,2]}

## get zk 节点数据。get /szz1/brokers/topics/Topic2## zk中的修改命令set /szz1/brokers/topics/Topic2  {"version":2,"partitions":{"0":[0,1,2]},"adding_replicas":{},"removing_replicas":{}}

为什么要删除 Controller 的 zk 节点?

之所以删除 Controller 节点,是因为我们手动修改了 zk 节点数据之后,因为没有副本的新增,是不会触发 Controller 去更新 AR 内存的,就算你主动触发 Leader 选举,AR 还是以前的,并不会达到想要的效果。

删除 zk 中的/Controller 节点,会触发 Controller 重新选举,重新选举会重新加载所有元数据,所以我们刚刚加载的数据就会生效, 同时 Controller 重新加载也会触发 Leader 选举

简单代码当然上面功能,手动改起来麻烦,那么饿肯定是要集成到LogiKM 3.0中的咯;

优点: 实现了目标需求, 简单, 操作方便

缺点: 频繁的Controller重选举对生产环境来说会有一些影响;

方案三:修改源码(高级方案推荐)

我们方案二中的问题就是需要删除/Controller 节点发送重新选举,我们能不能不重新选举 Controller 也能生效呢?

如何让修改后的 AR 立即生效 ?

Controller 会监听每一个 topic 的节点/brokers/topics/{topic 名称}

KafkaController#processPartitionModifications

/*** 石臻臻的杂货铺* 微信:szzdzhp001* 省略部分代码**/ private def processPartitionModifications(topic: String): Unit = {    def restorePartitionReplicaAssignment(      topic: String,      newPartitionReplicaAssignment: Map[TopicPartition, ReplicaAssignment]    ): Unit = {         val partitionReplicaAssignment = zkClient.getFullReplicaAssignmentForTopics(immutable.Set(topic))    val partitionsToBeAdded = partitionReplicaAssignment.filter { case (topicPartition, _) =>      controllerContext.partitionReplicaAssignment(topicPartition).isEmpty    }    if (topicDeletionManager.isTopicQueuedUpForDeletion(topic)) {    } else if (partitionsToBeAdded.nonEmpty) {      info(s"New partitions to be added $partitionsToBeAdded")      partitionsToBeAdded.foreach { case (topicPartition, assignedReplicas) =>        controllerContext.updatePartitionFullReplicaAssignment(topicPartition, assignedReplicas)      }      onNewPartitionCreation(partitionsToBeAdded.keySet)    }    }  }

这段代码省略了很多,我想让你看到的是

只有新增了副本,才会执行更新 Controller 的内存操作。

那么我们在这里面新增一段逻辑

新增逻辑:如果只是变更了 AR 的顺序,那么我们也更新一下内存。

来我们改一下源码

    // 1. 找到 AR 顺序有变更的 所有TopicPartition    val partitionsOrderChange = partitionReplicaAssignment.filter { case (topicPartition, _) =>      //这里自己写下过滤逻辑 把只是顺序变更的分区找出      true    }    if (topicDeletionManager.isTopicQueuedUpForDeletion(topic)) {      if (partitionsToBeAdded.nonEmpty) {      } else {      }    } else if (partitionsToBeAdded.nonEmpty) {      info(s"New partitions to be added $partitionsToBeAdded")      partitionsToBeAdded.foreach { case (topicPartition, assignedReplicas) =>        controllerContext.updatePartitionFullReplicaAssignment(topicPartition, assignedReplicas)      }      onNewPartitionCreation(partitionsToBeAdded.keySet)    }else if (partitionsOrderChange.nonEmpty) {      // ② .在这里加个逻辑      info(s"OrderChange partitions to be updatecache $partitionsToBeAdded")      partitionsOrderChange.foreach { case (topicPartition, assignedReplicas) =>        controllerContext.updatePartitionFullReplicaAssignment(topicPartition, assignedReplicas)      }    }

改成这样之后,上面的流程就变成了

  1. 从 zk 中获取/brokers/topics/{topic 名称}节点数据。

  2. 手动调整一下里面的顺序

  3. 将调整后的数据,重新覆盖掉之前的节点。

  4. 手动执行一次,优先副本选举。

完美解决!

方案三 改了之后会对其他的流程有影响吗?

上面更改的方法,一般是在分区副本重分配或者新增分区的时候会触发。

上面新增的逻辑并不会对现有流程有影响,因为假设都是上面的场景的情况下,他们都是会主动更新内存的。

在我看来,这里的改动,完全可以向 kafka 社区提一个 Pr. 来“修复”这个问题。

因为提了这个 PR,对我们有收益,没有额外的开销!

欢迎留下你的看法,一起讨论!


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK