0

springboot入门14 – Kafka应用

 3 years ago
source link: http://www.zhyea.com/2021/02/16/springboot-entrance-14-using-kafka.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

这几天优化了一下之前写的一个springboot kafka组件。比较起原生的spring-kafka来,我希望能够简化kafka的使用,可以更聚焦于具体的消息处理逻辑。

接下来的内容是这个组件的用法。

这个组件已经提交到了maven中央仓库,可以直接通过依赖的形式引入:

<dependency>
<groupId>org.chobit.spring</groupId>
<artifactId>kafka-spring-boot-starter</artifactId>
<version>[0.2.2,)</version>
</dependency>

0.2.2是这两天刚发布的一个版本。

消费者Processor

kafka-spring-boot-starter这个组件已经完成了kafka消费者的主要功能。

对于开发者来说,可以不必关注 KafkaConsumer的创建,只需要实现 Processor接口并注入到容器中即可。

下面是一个简单的示例:

@Component("zhyyy")
public class MyProcessor implements Processor<String, String> {
    @Override
    public void process(ConsumerRecords<String, String> records) {
        for (ConsumerRecord<String, String> r : records) {
            String json = r.value();
            System.out.println(json);

如示例中,通过 @Component注解完成了 Processor实现类的实例的注入,并为注入的Bean提供了一个名称:zhyyy。记住这个名称,在之后的配置文件中会用到。

使用生产者

kafka-spring-boot-starter会根据配置主动创建 KafkaProducer。开发使用时可以直接从容器中获取 ProducerTemplate实例来发送消息:

    @Autowired
    private ProducerTemplate<?, ?> producerTemplate;

如果写入kafka的消息的key和value的序列化方案采用的都是默认的字符串(反)序列化方案( StringDeserializer和 StringSerializer),可以使用 StringProducerTemplate实例:

    @Autowired
    private StringProducerTemplate producerTemplate;

发送消息时酌情调用不同的 send()方法:

void send(String topic, V value){...}
void send(String topic, K key, V value){...}
void send(String topic, V value, Callback callback){...}
void send(String topic, K key, V value, Callback callback) {...}

下面是一个最简单的配置:

kafka:
  config:
    test-group00:
      bootstrap-servers: kafka1,kafka2,kafka3
      topics: test-topic1
      consumer:
        processor: zhyyy
        count: 4

如上配置中:

  • test-group00既是配置项的ID,也是消费组ID
  • bootstrap-servers我想不需要多做解释。
  • topics对应的是一个数组结构,也可以写作 [test-topic1]或 [test-topic1,test-topic2],即支持同一个kafka集群上多个类似topic的统一处理
  • consumer是消费者相关配置,processor对应的是 Processor实现类的Bean名称,count标识的是应用内消费线程的数量

默认的序列化方案采用的是字符串序列化方案。

虽然在配置中没有体现,但kafka-spring-boot-starter组件会基于已有的信息创建 KafkaProducer,使用时可以通过 ProducerTemplate执行消息发送。

比较完整的配置是这样子的:

kafka:
  common:
    consumer:
      prop1: value1
      prop2: value2
    producer:
      prop3: value3
      prop4: value4
  config:
    test-group00:
      bootstrapServers: kafka1,kafka2,kafka3
      topics: test-topic
      consumer:
        autoOffsetReset: latest
        processor: zhyyy
        count: 4
        keyDeserializer: org.apache.kafka.common.serialization.StringDeserializer
        valueDeserializer: org.apache.kafka.common.serialization.StringDeserializer
        props:
          prop1: value1
          prop2: value2
      producer:
        keySerializer: org.apache.kafka.common.serialization.StringSerializer
        valueSerializer: org.apache.kafka.common.serialization.StringSerializer
        props:
          prop3: value3
          prop4: value4
    test-group02:
      bootstrapServers: kafka4,kafka5,kafka6
      topics: test-topic2
      consumer:
        autoOffsetReset: latest
        processor: zhyyy

其中common模块下是一些通用的配置,config模块下则是一或多组具体的配置(这里是两组)。common下的配置会被config下的配置覆盖。

此外还独立出来了一些常用的配置项,如autoOffsetResetkeyDeserializer等,以便在使用时进行配置。

示例应用在 github / spring-boot-kafka

kafka-spring-boot-starter这个组件的源码也在 github 。如果有定制化的需求可以据此进行调整。


Recommend

  • 44

    发布与订阅消息系统 在正式讨论Apache Kafka (以下简称Kafka)之前,先来了解发布与订阅消息系统的概念, 并认识这个系统的重要性。数据(消息)的发送者(发布者)不会直接把消息发送给接收 者,这是发布与订阅消息系统的一个特点...

  • 36
    • 微信 mp.weixin.qq.com 4 years ago
    • Cache

    全网最通俗易懂的Kafka入门

    众所周知,消息队列的产品有好几种,这里我选择学习Kafka的原因,无他,公司在用。 我司使用的是Kafka和自研的消息队列(Kafka和RocketMQ)改版,于是我就想学学Kafka这款消息队列啦。本篇文章对Kafka入门,希望对大家有所帮助。

  • 33
    • 掘金 juejin.im 4 years ago
    • Cache

    全网最通俗易懂的Kafka入门!

    只有光头才能变强。 文本已收录至我的GitHub仓库,欢迎Star:github.com/ZhongFuChen… 在这篇之前已经写过两篇基础文...

  • 45
    • www.cnblogs.com 4 years ago
    • Cache

    Kafka入门(2):消费与位移

    摘要 在这篇文章中,我将从消息在Kafka中的物理存储方式讲起,介绍分区-日志段-日志的各个层次。 然后我将接着上一篇文章的内容,把消费者的内容展开讲一讲,区分消费者与消费者组,以及这么设计有什么用。 ...

  • 10

    核心API在使用Akka kafka consumer前, 先了解下几个核心API:ConsumerSetting Consumer的配置信息;ConsumerRecord Kafka消息的封装类,包含消息的K、V,以及该消息所属的topic, partition, offset,...

  • 15

    Akka Stream在用Akka去对接Kafka之前,有必要先简单了解下Akka Stream,它是基于Reactive Stream(Akka是其创立成员之一)的。Akka Stream中,将流的拓扑处理逻辑命名为...

  • 8

    概述 之前有写过springboot缓存应用的说明(《 springboot入门01 – 缓存的使用 》)。不过实际的场景有时候会比较复杂一些,比如...

  • 8

    SpringBoot使用Testcontainers+Avro消息测试Kafka消费者您有一个 Spring Boot 微服务来管理用户数据。该微服务侦听来自 Kafka 的传入事件(例如用户创建、更新或删除),将它们转换为您自己的业务对象,将它们写入 PostgreSQL 数据库并通过 REST 接口将它们提供...

  • 7

    在上篇文章中,我们详细的介绍了 kafka 的架构模型,在集群环境中,kafka 可以通过设置分区数来加快数据的消费速度。光知道理论还不行,我们得真真切切的实践起来才行!下面,我将结合生产环境的真实案例,以SpringBoot技术框架为基础,向大家介绍 kafk...

  • 10

    Share this:In the previous post Kafka Tutorial - Java Producer and Consumer we have learned how to implement a Producer and...

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK