8

Spring 对Apache Kafka的支持与集成

 3 years ago
source link: https://segmentfault.com/a/1190000038294401
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

1. 引言

Apache Kafka 是一个分布式的、容错的流处理系统。在本文中,我们将介绍Spring对Apache Kafka的支持,以及原生Kafka Java客户端Api 所提供的抽象级别。

Spring Kafka 通过 @KafkaListener 注解,带来了一个简单而典型的 Spring 模板编程模型,它还带有一个 KafkaTemplate 和消息驱动的 POJO 。

2. 安装和设置

要下载和安装Kafka,请参考官方指南。然后还需要在 pom.xml 文件中添加 spring-kafka :

<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    <version>2.3.7.RELEASE</version>
</dependency>

新建一个 Spring Boot 示例应用程序,以默认配置启动。

3. 配置 Topics

以前我们使用命令行工具在 Kafka 中创建 topic ,例如:

$ bin/kafka-topics.sh --create \
  --zookeeper localhost:2181 \
  --replication-factor 1 --partitions 1 \
  --topic mytopic

但是随着 AdminClient 在Kafka中的引入,我们现在可以通过编程来创建 Topic

如下代码,添加 KafkAdmin bean 到 Spring中,它将自动为 NewTopic 类的所有 bean 添加 topic

@Configuration
public class KafkaTopicConfig {
    
    @Value(value = "${kafka.bootstrapAddress}")
    private String bootstrapAddress;
 
    @Bean
    public KafkaAdmin kafkaAdmin() {
        Map<String, Object> configs = new HashMap<>();
        configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress);
        return new KafkaAdmin(configs);
    }
    
    @Bean
    public NewTopic topic1() {
         return new NewTopic("developlee", 1, (short) 1);
    }
}

4. 消息生成

要创建消息,首先需要配置 ProducerFactory ,并设置创建 Kafka Producer 实例的策略,然后使用 KafkaTemplateKafkaTemplate 包装了 Producer 实例,并提供向 Kafka Topic 发送消息的简便方法。

在整个应用程序上下文中使用单个实例将提供更高的性能。因此推荐使用一个 Producer 实例。该实例是线程安全的,所以 KakfaTemplate 实例也是线程安全的,

4.1. Producer 配置

@Configuration
public class KafkaProducerConfig {
 
    @Bean
    public ProducerFactory<String, String> producerFactory() {
        Map<String, Object> configProps = new HashMap<>();
        configProps.put(
          ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, 
          bootstrapAddress);
        configProps.put(
          ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, 
          StringSerializer.class);
        configProps.put(
          ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
          StringSerializer.class);
        return new DefaultKafkaProducerFactory<>(configProps);
    }
 
    @Bean
    public KafkaTemplate<String, String> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

4.2. 消息发布

我们使用 KafkaTemplate 来发布消息:

@Autowired
private KafkaTemplate<String, String> kafkaTemplate;
 
public void sendMessage(String msg) {
    kafkaTemplate.send(topicName, msg);
}

send API 返回 ListenableFuture 对象。如果我们想阻塞发送线程并获得关于发送消息的结果,我们可以调用 ListenableFuture 对象的 get API。线程将会等待结果,但它会降低生产者的速度。

Kafka是一个快速流处理平台。因此,最好异步处理结果,这样后续消息就无需等待前一条消息的结果。我们可以通过回调来实现:

public void sendMessage(String message) {
            
    ListenableFuture<SendResult<String, String>> future = 
      kafkaTemplate.send(topicName, message);
    
    future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() {
 
        @Override
        public void onSuccess(SendResult<String, String> result) {
            System.out.println("Sent message=[" + message + 
              "] with offset=[" + result.getRecordMetadata().offset() + "]");
        }
        @Override
        public void onFailure(Throwable ex) {
            System.out.println("Unable to send message=[" 
              + message + "] due to : " + ex.getMessage());
        }
    });
}

5. 消息消费

5.1. 消费者配置

对于消费消息,我们需要配置一个 ConsumerFactory 和一个 KafkaListenerContainerFactory

一旦这些bean在Spring Bean工厂中可用,就可以使用 @KafkaListener 注解配置基于POJO的消费者。

配置类上需要添加 @EnableKafka 注解,以便能够检测 Spring 管理的bean上的 @KafkaListener 注解:

@EnableKafka
@Configuration
public class KafkaConsumerConfig {
 
    @Bean
    public ConsumerFactory<String, String> consumerFactory() {
        Map<String, Object> props = new HashMap<>();
        props.put(
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, 
          bootstrapAddress);
        props.put(
          ConsumerConfig.GROUP_ID_CONFIG, 
          groupId);
        props.put(
          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, 
          StringDeserializer.class);
        props.put(
          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, 
          StringDeserializer.class);
        return new DefaultKafkaConsumerFactory<>(props);
    }
 
    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, String> 
      kafkaListenerContainerFactory() {
   
        ConcurrentKafkaListenerContainerFactory<String, String> factory =
          new ConcurrentKafkaListenerContainerFactory<>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}

5.2. 消息消费

@KafkaListener(topics = "topicName", groupId = "foo")
public void listenGroupFoo(String message) {
    System.out.println("Received Message in group foo: " + message);
}

可以为一个 topic 实现多个 listener,每个topic 都有不同的组Id。此外,一个消费者可以监听来自不同 topic 的消息:

@KafkaListener(topics = "topic1, topic2", groupId = "foo")

Spring 还支持使用 listener 中的 @Header 注解检索一个或多个消息标题:

@KafkaListener(topics = "topicName")
public void listenWithHeaders(
  @Payload String message, 
  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
      System.out.println(
        "Received Message: " + message"
        + "from partition: " + partition);
}

5.3. 消费来自特定分区的消息

注意到,我们只使用一个分区创建了 topic “developlee”。但是,对于具有多个分区的主题, @KafkaListener 可以显式订阅具有初始偏移量 topic 的特定分区:

@KafkaListener(
  topicPartitions = @TopicPartition(topic = "topicName",
  partitionOffsets = {
    @PartitionOffset(partition = "0", initialOffset = "0"), 
    @PartitionOffset(partition = "3", initialOffset = "0")}),
  containerFactory = "partitionsKafkaListenerContainerFactory")
public void listenToPartition(
  @Payload String message, 
  @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) {
      System.out.println(
        "Received Message: " + message"
        + "from partition: " + partition);
}

由于 initialOffset 已被发送到该 listener 中的分区0,因此每次初始化该 listener 时,将重新使用以前从分区0和分区3消耗的所有消息。如果不需要设置偏移量,我们可以使用 @TopicPartition 注解的 partitions 属性只设置没有偏移量的分区:

@KafkaListener(topicPartitions 
  = @TopicPartition(topic = "topicName", partitions = { "0", "1" }))

5.4. 为Listener添加消息过滤器

通过添加自定义过滤器,可以将 listener 配置为使用特定类型的消息。这可以通过将 RecordFilterStrategy 设置为 KafkaListenerContainerFactory 来完成:

@Bean
public ConcurrentKafkaListenerContainerFactory<String, String>
  filterKafkaListenerContainerFactory() {
 
    ConcurrentKafkaListenerContainerFactory<String, String> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(consumerFactory());
    factory.setRecordFilterStrategy(
      record -> record.value().contains("World"));
    return factory;
}

然后可以将 listener 配置为使用此容器工厂:

@KafkaListener(
  topics = "topicName", 
  containerFactory = "filterKafkaListenerContainerFactory")
public void listenWithFilter(String message) {
    System.out.println("Received Message in filtered listener: " + message);
}

在这个 listener 中,所有与过滤器匹配的 消息都将被丢弃

6. 自定义消息转换器

到目前为止,我们只讨论了字符串作为消息发送和接收的对象。但是,我们也可以发送和接收定制的Java对象。这需要在 ProducerFactory 中配置适当的序列化器,并在 ConsumerFactory 中配置反序列化器。

让我们看一个简单的bean,并将以消息的形式发送它:

public class Greeting {
 
    private String msg;
    private String name;
 
    // standard getters, setters and constructor
}

6.1. 生产自定义消息

在本例中,我们将使用 JsonSerializer 。我们看看 ProducerFactoryKafkaTemplate 的代码:

@Bean
public ProducerFactory<String, Greeting> greetingProducerFactory() {
    // ...
    configProps.put(
      ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, 
      JsonSerializer.class);
    return new DefaultKafkaProducerFactory<>(configProps);
}
 
@Bean
public KafkaTemplate<String, Greeting> greetingKafkaTemplate() {
    return new KafkaTemplate<>(greetingProducerFactory());
}

新的 KafkaTemplate 可用于发送 Greeting 消息:

kafkaTemplate.send(topicName, new Greeting("Hello", "World"));

6.2. 消费自定义消息

同样,我们修改 ConsumerFactoryKafkaListenerContainerFactory 来正确反序列化 Greeting 消息:

@Bean
public ConsumerFactory<String, Greeting> greetingConsumerFactory() {
    // ...
    return new DefaultKafkaConsumerFactory<>(
      props,
      new StringDeserializer(), 
      new JsonDeserializer<>(Greeting.class));
}
 
@Bean
public ConcurrentKafkaListenerContainerFactory<String, Greeting> 
  greetingKafkaListenerContainerFactory() {
 
    ConcurrentKafkaListenerContainerFactory<String, Greeting> factory =
      new ConcurrentKafkaListenerContainerFactory<>();
    factory.setConsumerFactory(greetingConsumerFactory());
    return factory;
}

spring-kafka JSON序列化器和反序列化器使用 Jackson 库,该库是 spring-kafka 项目的可选maven依赖项。我们也把它加到 pom.xml 文件:

<dependency>
    <groupId>com.fasterxml.jackson.core</groupId>
    <artifactId>jackson-databind</artifactId>
    <version>2.9.7</version>
</dependency>

建议不要使用 Jackson 的最新版本,而是使用 pom.xml 文件 中 spring-kafka 的版本。

最后,我们需要编写一个 listener 来 消费 Greeting 消息:

@KafkaListener(
  topics = "topicName", 
  containerFactory = "greetingKafkaListenerContainerFactory")
public void greetingListener(Greeting greeting) {
    // process greeting message
}

7. 结语

在本文中,我们介绍了Apache Kafka 和 Spring 集成的基础知识,且简要介绍了用于发送和接收消息的类。

本文的完整源代码可以在 GitHub上找到 . 在执行代码之前,请确保服务器正在运行 Kafka。

如果你觉得文章还不错,记得关注公众号: 锅外的大佬

刘一手的博客

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK