
Expanding and Shrinking a Kafka Cluster

Source: https://niyanchun.com/expand-or-decommissioning-kafka-cluster.html


2020-10-29 · Kafka

This article discusses expanding and shrinking a Kafka cluster, and how to "backfill" partition replicas after a node failure. All operations below are based on Kafka 2.6.

Expansion means adding new nodes. After an expansion, existing data is not migrated automatically; only newly created topics may be placed on the new nodes. If there is no need to move old data, simply starting the new nodes is enough and nothing else is required. Often, though, we want to migrate some existing data onto the new nodes to balance the load, and that takes manual work. Kafka ships a script for this (in the bin directory): kafka-reassign-partitions.sh, which redistributes partitions across brokers. It is simple to use: provide an assignment plan in JSON, pass it to the script, and the script rebalances the partitions according to the plan.

As an example, suppose the cluster currently has two brokers, 181 and 182, which host four topics: test-1, test-2, test-3 and test-4, each with 4 partitions and 2 replicas:

# Two brokers
[zk: localhost:2181(CONNECTED) 0] ls /kafka_26/brokers/ids
[181, 182]

# Four topics
➜ bin/kafka-topics.sh --list --zookeeper localhost:2181/kafka_26
__consumer_offsets
test-1
test-2
test-3
test-4
# test-1
➜  bin/kafka-topics.sh --describe --topic test-1  --zookeeper localhost:2181/kafka_26
Topic: test-1   PartitionCount: 4       ReplicationFactor: 2    Configs: 
        Topic: test-1   Partition: 0    Leader: 181     Replicas: 181,182       Isr: 181,182
        Topic: test-1   Partition: 1    Leader: 182     Replicas: 182,181       Isr: 182,181
        Topic: test-1   Partition: 2    Leader: 181     Replicas: 181,182       Isr: 181,182
        Topic: test-1   Partition: 3    Leader: 182     Replicas: 182,181       Isr: 182,181
# test-2
➜  bin/kafka-topics.sh --describe --topic test-2  --zookeeper localhost:2181/kafka_26
Topic: test-2   PartitionCount: 4       ReplicationFactor: 2    Configs: 
        Topic: test-2   Partition: 0    Leader: 181     Replicas: 181,182       Isr: 181,182
        Topic: test-2   Partition: 1    Leader: 182     Replicas: 182,181       Isr: 182,181
        Topic: test-2   Partition: 2    Leader: 181     Replicas: 181,182       Isr: 181,182
        Topic: test-2   Partition: 3    Leader: 182     Replicas: 182,181       Isr: 182,181
# test-3
➜  bin/kafka-topics.sh --describe --topic test-3  --zookeeper localhost:2181/kafka_26
Topic: test-3   PartitionCount: 4       ReplicationFactor: 2    Configs: 
        Topic: test-3   Partition: 0    Leader: 181     Replicas: 181,182       Isr: 181,182
        Topic: test-3   Partition: 1    Leader: 182     Replicas: 182,181       Isr: 182,181
        Topic: test-3   Partition: 2    Leader: 181     Replicas: 181,182       Isr: 181,182
        Topic: test-3   Partition: 3    Leader: 182     Replicas: 182,181       Isr: 182,181
# test-4
➜ bin/kafka-topics.sh --describe --topic test-4  --zookeeper localhost:2181/kafka_26
Topic: test-4   PartitionCount: 4       ReplicationFactor: 2    Configs: 
        Topic: test-4   Partition: 0    Leader: 182     Replicas: 182,181       Isr: 182,181
        Topic: test-4   Partition: 1    Leader: 181     Replicas: 181,182       Isr: 181,182
        Topic: test-4   Partition: 2    Leader: 182     Replicas: 182,181       Isr: 182,181
        Topic: test-4   Partition: 3    Leader: 181     Replicas: 181,182       Isr: 181,182

Now we expand the cluster by adding two new nodes, 183 and 184. After the expansion we want to migrate test-3 and test-4 onto 183 and 184.
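
Before the new brokers can show up under /brokers/ids, they of course have to be configured and started. A minimal sketch of a new broker's server.properties; apart from the ZooKeeper chroot (/kafka_26, as used throughout this article) the values are illustrative:

# broker.id must be unique within the cluster
broker.id=183
listeners=PLAINTEXT://:9092
# hypothetical data directory
log.dirs=/data/kafka-logs
# must point at the same ensemble and chroot as the existing brokers
zookeeper.connect=localhost:2181/kafka_26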

First, prepare a JSON file like the following (call it topics-to-move.json):

{
    "topics": [
        {
            "topic": "test-3"
        },
        {
            "topic": "test-4"
        }
    ],
    "version": 1
}

It simply lists the topics we want to reassign. Then run the following command:

➜ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --topics-to-move-json-file topics-to-move.json --broker-list "183,184" --generate
# Current partition distribution
Current partition replica assignment
{"version":1,"partitions":[{"topic":"test-3","partition":0,"replicas":[181,182],"log_dirs":["any","any"]},{"topic":"test-3","partition":1,"replicas":[182,181],"log_dirs":["any","any"]},{"topic":"test-3","partition":2,"replicas":[181,182],"log_dirs":["any","any"]},{"topic":"test-3","partition":3,"replicas":[182,181],"log_dirs":["any","any"]},{"topic":"test-4","partition":0,"replicas":[182,181],"log_dirs":["any","any"]},{"topic":"test-4","partition":1,"replicas":[181,182],"log_dirs":["any","any"]},{"topic":"test-4","partition":2,"replicas":[182,181],"log_dirs":["any","any"]},{"topic":"test-4","partition":3,"replicas":[181,182],"log_dirs":["any","any"]}]}
# Proposed partition distribution
Proposed partition reassignment configuration
{"version":1,"partitions":[{"topic":"test-3","partition":0,"replicas":[183,184],"log_dirs":["any","any"]},{"topic":"test-3","partition":1,"replicas":[184,183],"log_dirs":["any","any"]},{"topic":"test-3","partition":2,"replicas":[183,184],"log_dirs":["any","any"]},{"topic":"test-3","partition":3,"replicas":[184,183],"log_dirs":["any","any"]},{"topic":"test-4","partition":0,"replicas":[184,183],"log_dirs":["any","any"]},{"topic":"test-4","partition":1,"replicas":[183,184],"log_dirs":["any","any"]},{"topic":"test-4","partition":2,"replicas":[184,183],"log_dirs":["any","any"]},{"topic":"test-4","partition":3,"replicas":[183,184],"log_dirs":["any","any"]}]}

The command prints the current partition assignment and a proposed new one, both in plain, easy-to-read JSON. Save the proposed plan to a file (say, expand-cluster-reassignment.json), for example as sketched above; you can also edit the plan by hand, as long as the format stays valid. Then run the following command to execute the reassignment with the new plan:

➜ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --execute
Current partition replica assignment

{"version":1,"partitions":[{"topic":"test-3","partition":0,"replicas":[181,182],"log_dirs":["any","any"]},{"topic":"test-3","partition":1,"replicas":[182,181],"log_dirs":["any","any"]},{"topic":"test-3","partition":2,"replicas":[181,182],"log_dirs":["any","any"]},{"topic":"test-3","partition":3,"replicas":[182,181],"log_dirs":["any","any"]},{"topic":"test-4","partition":0,"replicas":[182,181],"log_dirs":["any","any"]},{"topic":"test-4","partition":1,"replicas":[181,182],"log_dirs":["any","any"]},{"topic":"test-4","partition":2,"replicas":[182,181],"log_dirs":["any","any"]},{"topic":"test-4","partition":3,"replicas":[181,182],"log_dirs":["any","any"]}]}

Save this to use as the --reassignment-json-file option during rollback
Successfully started partition reassignments for test-3-0,test-3-1,test-3-2,test-3-3,test-4-0,test-4-1,test-4-2,test-4-3
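
A reassignment of large topics can saturate the network between brokers. The script supports an inter-broker replication throttle via its --throttle option (in bytes/sec); a sketch that caps the move at roughly 50 MB/s:

# Same execute command as above, but with replication traffic throttled
➜ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 \
    --reassignment-json-file expand-cluster-reassignment.json \
    --throttle 50000000 --execute

Running --verify after the move completes is also what removes the throttle again; that is what the "Clearing broker-level throttles" lines in the output below refer to.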

This submits the reassignment task. Its progress can be checked with the command below:

➜ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --verify
Status of partition reassignment:
Reassignment of partition test-3-0 is complete.
Reassignment of partition test-3-1 is complete.
Reassignment of partition test-3-2 is complete.
Reassignment of partition test-3-3 is complete.
Reassignment of partition test-4-0 is complete.
Reassignment of partition test-4-1 is complete.
Reassignment of partition test-4-2 is complete.
Reassignment of partition test-4-3 is complete.

Clearing broker-level throttles on brokers 181,182,183,184
Clearing topic-level throttles on topics test-3,test-4

Once it completes, check the new partition assignments of test-3 and test-4:

➜ bin/kafka-topics.sh --describe --topic test-3  --zookeeper localhost:2181/kafka_26
Topic: test-3   PartitionCount: 4       ReplicationFactor: 2    Configs: 
        Topic: test-3   Partition: 0    Leader: 183     Replicas: 183,184       Isr: 183,184
        Topic: test-3   Partition: 1    Leader: 184     Replicas: 184,183       Isr: 183,184
        Topic: test-3   Partition: 2    Leader: 183     Replicas: 183,184       Isr: 183,184
        Topic: test-3   Partition: 3    Leader: 184     Replicas: 184,183       Isr: 183,184
        
➜ bin/kafka-topics.sh --describe --topic test-4  --zookeeper localhost:2181/kafka_26
Topic: test-4   PartitionCount: 4       ReplicationFactor: 2    Configs: 
        Topic: test-4   Partition: 0    Leader: 184     Replicas: 184,183       Isr: 183,184
        Topic: test-4   Partition: 1    Leader: 183     Replicas: 183,184       Isr: 183,184
        Topic: test-4   Partition: 2    Leader: 184     Replicas: 184,183       Isr: 183,184
        Topic: test-4   Partition: 3    Leader: 183     Replicas: 183,184       Isr: 184,183

As you can see, the data of these two topics now lives entirely on nodes 183 and 184.

Shrinking and Failure Scenarios

As the steps above show, the assignment is entirely under our control, and shrinking the cluster is nothing more than another data migration: all it takes is a correct plan. Shrinking is rare in production, but one related scenario is common: a node fails and cannot be recovered. As mentioned in an earlier article, when a node goes down, the partition replicas on it are lost, and Kafka does not automatically recreate them on the remaining nodes; we have to create the replicas on other live nodes by hand, using exactly the same mechanism as expansion. Continuing the example above, suppose node 184 has failed and cannot be recovered; some partitions of test-3 and test-4 lived on it, so those replicas are gone too:

# The node is down, so its entry is gone from ZooKeeper
[zk: localhost:2181(CONNECTED) 15] ls /kafka_26/brokers/ids
[181, 182, 183]

# 184 no longer appears in any ISR
➜ bin/kafka-topics.sh --describe --topic test-3  --zookeeper localhost:2181/kafka_26
Topic: test-3   PartitionCount: 4       ReplicationFactor: 2    Configs: 
        Topic: test-3   Partition: 0    Leader: 183     Replicas: 183,184       Isr: 183
        Topic: test-3   Partition: 1    Leader: 183     Replicas: 184,183       Isr: 183
        Topic: test-3   Partition: 2    Leader: 183     Replicas: 183,184       Isr: 183
        Topic: test-3   Partition: 3    Leader: 183     Replicas: 184,183       Isr: 183
➜ bin/kafka-topics.sh --describe --topic test-4  --zookeeper localhost:2181/kafka_26
Topic: test-4   PartitionCount: 4       ReplicationFactor: 2    Configs: 
        Topic: test-4   Partition: 0    Leader: 183     Replicas: 184,183       Isr: 183
        Topic: test-4   Partition: 1    Leader: 183     Replicas: 183,184       Isr: 183
        Topic: test-4   Partition: 2    Leader: 183     Replicas: 184,183       Isr: 183
        Topic: test-4   Partition: 3    Leader: 183     Replicas: 183,184       Isr: 183
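
Rather than describing every topic one by one, the --under-replicated-partitions filter of kafka-topics.sh lists all affected partitions in a single call:

# Show every partition whose ISR is smaller than its replica set
➜ bin/kafka-topics.sh --describe --under-replicated-partitions --zookeeper localhost:2181/kafka_26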

We now want to move test-3's replicas that were on 184 to 181, and test-4's replicas on 184 to 182. The plan then looks like this:

➜ cat expand-cluster-reassignment.json
{
  "version": 1,
  "partitions": [
    {
      "topic": "test-3",
      "partition": 0,
      "replicas": [183, 181],
      "log_dirs": ["any", "any"]
    },
    {
      "topic": "test-3",
      "partition": 1,
      "replicas": [181, 183],
      "log_dirs": ["any", "any"]
    },
    {
      "topic": "test-3",
      "partition": 2,
      "replicas": [183, 181],
      "log_dirs": ["any", "any"]
    },
    {
      "topic": "test-3",
      "partition": 3,
      "replicas": [181, 183],
      "log_dirs": ["any", "any"]
    },
    {
      "topic": "test-4",
      "partition": 0,
      "replicas": [182, 183],
      "log_dirs": ["any", "any"]
    },
    {
      "topic": "test-4",
      "partition": 1,
      "replicas": [183, 182],
      "log_dirs": ["any", "any"]
    },
    {
      "topic": "test-4",
      "partition": 2,
      "replicas": [182, 183],
      "log_dirs": ["any", "any"]
    },
    {
      "topic": "test-4",
      "partition": 3,
      "replicas": [183, 182],
      "log_dirs": ["any", "any"]
    }
  ]
}
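
Writing such a file by hand gets error-prone on larger clusters. A sketch of generating it instead (assuming jq is available, and assuming the "Current partition replica assignment" JSON printed by the tool was saved as current-assignment.json, a hypothetical filename), replacing the dead broker 184 with 181 for test-3 and with 182 for test-4:

# For each partition, substitute broker 184 in the replica list,
# choosing the replacement broker based on the partition's topic
➜ jq '.partitions |= map(
      . as $p
      | .replicas |= map(
          if . == 184
          then (if $p.topic == "test-3" then 181 else 182 end)
          else .
          end
        )
    )' current-assignment.json > expand-cluster-reassignment.json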

Then execute the plan:

# Execute the plan
➜ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --execute
# output omitted

# Check progress
➜ bin/kafka-reassign-partitions.sh --bootstrap-server localhost:9092 --reassignment-json-file expand-cluster-reassignment.json --verify 
# output omitted

# After completion, inspect test-3 and test-4
➜ bin/kafka-topics.sh --describe --topic test-3  --zookeeper localhost:2181/kafka_26
Topic: test-3   PartitionCount: 4       ReplicationFactor: 2    Configs: 
        Topic: test-3   Partition: 0    Leader: 183     Replicas: 183,181       Isr: 183,181
        Topic: test-3   Partition: 1    Leader: 183     Replicas: 181,183       Isr: 183,181
        Topic: test-3   Partition: 2    Leader: 183     Replicas: 183,181       Isr: 183,181
        Topic: test-3   Partition: 3    Leader: 183     Replicas: 181,183       Isr: 183,181
➜ bin/kafka-topics.sh --describe --topic test-4  --zookeeper localhost:2181/kafka_26
Topic: test-4   PartitionCount: 4       ReplicationFactor: 2    Configs: 
        Topic: test-4   Partition: 0    Leader: 183     Replicas: 182,183       Isr: 183,182
        Topic: test-4   Partition: 1    Leader: 183     Replicas: 183,182       Isr: 183,182
        Topic: test-4   Partition: 2    Leader: 183     Replicas: 182,183       Isr: 183,182
        Topic: test-4   Partition: 3    Leader: 183     Replicas: 183,182       Isr: 183,182
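
Note that in the output above every leader is still on 183, even for partitions whose first (preferred) replica is 181 or 182. If that imbalance matters, a preferred-replica election moves the leaders back; a sketch using kafka-leader-election.sh (available since Kafka 2.4):

# Trigger a preferred-replica leader election for all partitions
➜ bin/kafka-leader-election.sh --bootstrap-server localhost:9092 --election-type preferred --all-topic-partitions

With the default auto.leader.rebalance.enable=true the controller will eventually do this on its own; the manual command just makes it happen immediately.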

Whether it is expansion, shrinking, or manually backfilling partitions after a failure, the essence is always partition reassignment, done with the kafka-reassign-partitions.sh script. Its usage is very simple:

  1. Provide a JSON list of the topics to reassign and run --generate to produce a migration plan file;
  2. Run --execute with the new assignment plan;
  3. Run --verify to check the progress of the plan.

If you are already familiar with the plan file format, you can skip step 1 and write the plan yourself.

