7

消息队列之kafka 消费者的Offset管理

 2 years ago
source link: https://ivalue2333.github.io/2021/01/21/message%20queue/%E6%B6%88%E6%81%AF%E9%98%9F%E5%88%97%E4%B9%8Bkafka%20%E6%B6%88%E8%B4%B9%E8%80%85%E7%9A%84Offset%E7%AE%A1%E7%90%86/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

消息队列之kafka 消费者的Offset管理

发表于

2021-01-21

更新于 2021-02-24 分类于 消息队列


本文字数: 2.1k 阅读时长 ≈ 2 分钟

[TOC]

Consumer通过提交Offset来记录当前消费的最后位置,以便于消费者发生崩溃或者有新的消费者加入消费者组,而引发的分区再均衡操作,每个消费者可能会分到不同的分区。我测试的kafka版本是:0.11.0.2,消费者往一个特殊的主题“_consumer_offset”发送消息

消息的内容包括:

fields content

Key Consumer Group, topic, partition

Payload Offset, metadata, timestamp

两种 offset 存储方式

  • zookeeper

kafka api

如果是根据kafka默认的api来消费,即【org.apache.kafka.clients.consumer.KafkaConsumer】,我们会配置参【bootstrap.servers】来消费。而其消费者的offset会更新到一个kafka自带的topic【__consumer_offsets】下面,查看当前group的消费进度,则要依靠kafka自带的工具【kafka-consumer-offset-checker】

Offset Commit

Offset的提交逻辑其实和普通的生产者往kafka发送数据是一样的。

Consumer

消费者启动时会为“_consumer_offset”主题创建一个内置的生产者,用于Offset数据的提交。

Broker

Broker就是将Offset提交当成是正常的生产请求,逻辑不变。

“_consumer_offset”主题会在集群中的第一个Offset提交请求时被自动创建。

Offset的提交方式

Offset提交时会有两个问题:重复消费(消费者最少消费一次或者恰好消费一次)和漏消费(消费者最多消费一次)。

  • 当提交的Offset小于客户端处理的最后一条消息的Offset,会造成重复消费。 情景:先消费,后提交Offset,如果消费成功、提交失败,消费者下次获取的Offset还是以前的,所以会造成重复消费。
  • 当提交的Offset大于客户端处理的最后一条消息的Offset,会造成漏消费。 情景:先提交Offset,后消费,如果提交成功、消费失败,消费者下次获取的Offset已经是新的,所以会造成漏消费。

根据具体的业务情况,选择合适的提交方式,可以有效的解决掉重复消费和漏消费的问题。

自动提交是最简单的提交方式,通过设置参数,可以开启自动提交也可以设置提交的时间间隔。缺点就是,当消费了一些数据后,还未达到自动的提交时间,这个时候,有新的消费者加入,或者当前消费者挂掉,会出现分区再均衡操作,之后消费者重新在上一次提交的Offset开始消费,造成重复消费。虽然可以缩短自动提交间隔,但是还是无法解决这个问题。

同步提交当前Offset

关闭手动提交,可以通过同步提交接口来提交当前的Offset,虽然可以获取主动性,但是也牺牲了吞吐量,因为同步提交必然是阻塞的,而且会有重试机制。

异步提交当前Offset

使用异步提交方式,既有主动性,也可以增加kafka消费的吞吐量,没有重试机制,也解决不掉重复消费的问题。

同步和异步组合提交

正常使用的时候使用异步提交,速度快。当要关闭消费者的时候,使用同步提交,即使失败了也会一直重试,直到提交成功或者发生无法恢复的错误。不管是同步提交还是异步提交都避免不了重复消费和漏消费的问题。

java api

消费者如果是根据javaapi来消费,也就是【kafka.javaapi.consumer.ConsumerConnector】,我们会配置参数【zookeeper.connect】来消费。这种情况下,消费者的offset会更新到zookeeper的【consumers/{group}/offsets/{topic}/{partition}】目录下,例如:

[zk: localhost(CONNECTED) 0] get /kafka/consumers/zoo-consumer-group/offsets/my-topic/0
5662
cZxid = 0x20006d28a
ctime = Wed Apr 12 18:20:51 CST 2017
mZxid = 0x30132b0ed
mtime = Tue Aug 22 18:53:22 CST 2017
pZxid = 0x20006d28a
cversion = 0
dataVersion = 5758
aclVersion = 0
ephemeralOwner = 0x0
dataLength = 4
numChildren = 0

https://juejin.cn/post/6844904016212656141

https://zqhxuyuan.github.io/2016/02/18/Kafka-Consumer-Offset-Manager/

https://www.jianshu.com/p/e6f535fdf2a4


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK