47

Kafka 的事务到底长啥样?

 5 years ago
source link: https://www.tuicool.com/articles/MRVzIvn
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

vua6n2n.jpg!web

作者 | 来自网络

整理 | 纯粹技术分享

这篇文章主要讲述 Kafka 事务性相关原理,从 Kafka EOS 语义、幂等性、事务性等几个方面阐述

Kafka EOS 语义

EOS(Exactly Once Semantics,精确一次处理语义)是从 Kafka 0.11.0.0 版本开始支持的,之前版本中只支持 At Least Once 和 At Most Once 语义,并不支持 Exactly Once 语义

因为在很多要求严格的场景下,如使用 Kafka 处理交易数据,Exactly Once 语义是必须的。我们可以通过让下游系统具有幂等性来配合 Kafka 的 At Least Once 语义来间接实现 Exactly Once 语义。但是也存在一些问题:

  • 该方案要求下游系统支持幂等操作,限制了 Kafka 的适用场景

  • 实现门槛相对较高,需要用户对 Kafka 的工作机制非常了解

  • 对于 Kafka Stream 而言,Kafka 本身即是自己的下游系统,但 Kafka 在 0.11.0.0 版本之前不具有幂等发送能力

因此,Kafka 本身对Exactly Once语义的支持就非常必要。

Kafka 幂等性

在说 Kafka 的事务之前,先要说一下 Kafka 中幂等( Idempotent )的实现。幂等和事务是 Kafka 0.11.0.0 版本引入的两个特性,以此来实现 EOS 语义。

Kafka 幂等性是 Producer 端的特性,为了实现生产端幂等性,Kafka 引入了 Producer ID(即PID)和 Sequence Number。

  • PID:每个新的 Producer 在初始化的时候会被分配一个唯一的 PID,这个PID 对用户完全是透明的。

  • Sequence Numbler:对于每个 PID,该 Producer 发送到每个 Partition 的 数据 都有对应的序列号,这些序列号是从0开始单调递增的。

Broker 端在缓存中保存了这 Sequence Numbler ,对于接收的每条消息,如果其序号比 Broker 缓存中序号大 于1则接受它,否则将其丢弃。这样就可以实现了消息重复提交了。幂等涉及的参数是 enable.idempotence,默认为 false,开启需要设置为 ture。

但是,这种只能保证单个 Producer 对于 单会话单 Partition  的 Exactly Once 语义。不能保证同一个 Producer 一个 topic 不同的 Partition 幂等。

Kafka 事务性

Kafka 事务支持

正是因为 Kafka Idempotent 不提供跨多个 Partition 和跨会话场景下的保证,因此,我们是需要一种更强的事务保证,能够原子处理多个 Partition 的写入操作,数据要么全部写入成功,要么全部失败,这就是 Kafka Transactions,即Kafka 事务。

Kafka 事务 API

producer提供了initTransactions,beginTransaction,sendOffsetsToTransaction,commitTransaction,abortTransaction 五个事务方法。

    /**

* 初始化事务。需要注意的有:

* 1、前提

* 需要保证transation.id属性被配置。

* 2、这个方法执行逻辑是:

*   (1)Ensures any transactions initiated by previous instances of the producer with the same

*      transactional.id are completed. If the previous instance had failed with a transaction in

*      progress, it will be aborted. If the last transaction had begun completion,

*      but not yet finished, this method awaits its completion.

*    (2)Gets the internal producer id and epoch, used in all future transactional

*      messages issued by the producer.

*

*/

public void initTransactions();

/**

* 开启事务

*/

public void beginTransaction() throws ProducerFencedException ;

/**

* 为消费者提供的在事务内提交偏移量的操作

*/

public void sendOffsetsToTransaction(Map<TopicPartition, OffsetAndMetadata> offsets,

String consumerGroupId) throws ProducerFencedException ;

/**

* 提交事务

*/

public void commitTransaction() throws ProducerFencedException;

/**

* 放弃事务,类似回滚事务的操作

*/

public void abortTransaction() throws ProducerFencedException ;

相关属性配置

使用 Kafka 的事务 API 时的一些注意事项:

  • 需要消费者的自动模式设置为 false,并且不能子再手动的进行执行consumer#commitSync或者consumer#commitAsyc。

  • 生产者配置  transactional.id  属性。

  • 生产者不需要再配置 enable.idempotence,因为如果配置了transaction.id,则此时 enable.idempotence 会被设置为true。

  • 消费者需要配置 isolation.level 属性,有两个可选值:"read_committed "" read_uncommitted ",默认" read_uncommitted "。

Kafka 事务示例

以下是 Producer 事务使用示例:

Properties props = new Properties();
props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
props.put("client.id", "ProducerTranscationnalExample");
props.put("bootstrap.servers", "localhost:9092");
props.put("transactional.id", "test-transactional");

props . put ( "acks" , "all" );

KafkaProducer producer = new KafkaProducer ( props );

producer . initTransactions ();

try {

String msg = "matt test" ; producer . beginTransaction (); producer . send ( new ProducerRecord ( topic , "0" , msg . toString ())); producer . send ( new ProducerRecord ( topic , "1" , msg . toString ())); producer . send ( new ProducerRecord ( topic , "2" , msg . toString ()));


producer . commitTransaction ();

} catch ( ProducerFencedException e1 ) {

e1 . printStackTrace ();


producer . close ();

} catch ( KafkaException e2 ) {

e2 . printStackTrace ();


producer . abortTransaction ();

}

producer . close

();

Kafka 幂等与事务的关系

事务属性实现前提是幂等性,即在配置事务属性 transaction id 时,必须还得配置幂等性;但是幂等性是可以独立使用的,不需要依赖事务属性。

  • 幂等性引入了 Porducer ID(还有  Sequence Numbler )。

  • 事务属性引入了 Transaction Id 属性。

参数组合情况:

  • enable.idempotence = true,transactional.id不设置:只支持幂等性。

  • enable.idempotence = true,transactional.id设置:支持事务属性和幂等性

  • enable.idempotence = false,transactional.id不设置:没有事务属性和幂等性的kafka

  • enable.idempotence = false,transactional.id设置:无法获取到PID,此时会报错

参考链接:

Kafka EOS 之事务性实现:

https://www.codercto.com/a/36351.html

Kafka生产者事务和幂等:

http://www.heartthinkdo.com/?p=2040#5

一个进阶的大数据技术交流学习公众号,死磕大数据与分布式系统,分享NoSQL数据库、存储计算引擎、消息中间件等。长按二维码关注:

2QVbIf3.png!web

好文推荐:

喜欢就戳一下  "在看"   ↘↘


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK