Spring 对Apache Kafka的支持与集成
source link: https://segmentfault.com/a/1190000038294401
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
1. 引言
Apache Kafka 是一个分布式的、容错的流处理系统。在本文中,我们将介绍Spring对Apache Kafka的支持,以及原生Kafka Java客户端Api 所提供的抽象级别。
Spring Kafka 通过 @KafkaListener 注解,带来了一个简单而典型的 Spring 模板编程模型,它还带有一个 KafkaTemplate 和消息驱动的 POJO 。
2. 安装和设置
要下载和安装Kafka,请参考官方指南。然后还需要在 pom.xml
文件中添加 spring-kafka
:
<dependency> <groupId>org.springframework.kafka</groupId> <artifactId>spring-kafka</artifactId> <version>2.3.7.RELEASE</version> </dependency>
新建一个 Spring Boot 示例应用程序,以默认配置启动。
3. 配置 Topics
以前我们使用命令行工具在 Kafka
中创建 topic
,例如:
$ bin/kafka-topics.sh --create \ --zookeeper localhost:2181 \ --replication-factor 1 --partitions 1 \ --topic mytopic
但是随着 AdminClient
在Kafka中的引入,我们现在可以通过编程来创建 Topic
。
如下代码,添加 KafkAdmin
bean 到 Spring中,它将自动为 NewTopic
类的所有 bean
添加 topic
:
@Configuration public class KafkaTopicConfig { @Value(value = "${kafka.bootstrapAddress}") private String bootstrapAddress; @Bean public KafkaAdmin kafkaAdmin() { Map<String, Object> configs = new HashMap<>(); configs.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); return new KafkaAdmin(configs); } @Bean public NewTopic topic1() { return new NewTopic("developlee", 1, (short) 1); } }
4. 消息生成
要创建消息,首先需要配置 ProducerFactory
,并设置创建 Kafka Producer
实例的策略,然后使用 KafkaTemplate
。 KafkaTemplate
包装了 Producer
实例,并提供向 Kafka Topic
发送消息的简便方法。
在整个应用程序上下文中使用单个实例将提供更高的性能。因此推荐使用一个 Producer
实例。该实例是线程安全的,所以 KakfaTemplate
实例也是线程安全的,
4.1. Producer 配置
@Configuration public class KafkaProducerConfig { @Bean public ProducerFactory<String, String> producerFactory() { Map<String, Object> configProps = new HashMap<>(); configProps.put( ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); configProps.put( ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class); configProps.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class); return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate<String, String> kafkaTemplate() { return new KafkaTemplate<>(producerFactory()); } }
4.2. 消息发布
我们使用 KafkaTemplate
来发布消息:
@Autowired private KafkaTemplate<String, String> kafkaTemplate; public void sendMessage(String msg) { kafkaTemplate.send(topicName, msg); }
send
API 返回 ListenableFuture
对象。如果我们想阻塞发送线程并获得关于发送消息的结果,我们可以调用 ListenableFuture
对象的 get
API。线程将会等待结果,但它会降低生产者的速度。
Kafka是一个快速流处理平台。因此,最好异步处理结果,这样后续消息就无需等待前一条消息的结果。我们可以通过回调来实现:
public void sendMessage(String message) { ListenableFuture<SendResult<String, String>> future = kafkaTemplate.send(topicName, message); future.addCallback(new ListenableFutureCallback<SendResult<String, String>>() { @Override public void onSuccess(SendResult<String, String> result) { System.out.println("Sent message=[" + message + "] with offset=[" + result.getRecordMetadata().offset() + "]"); } @Override public void onFailure(Throwable ex) { System.out.println("Unable to send message=[" + message + "] due to : " + ex.getMessage()); } }); }
5. 消息消费
5.1. 消费者配置
对于消费消息,我们需要配置一个 ConsumerFactory
和一个 KafkaListenerContainerFactory
。
一旦这些bean在Spring Bean工厂中可用,就可以使用 @KafkaListener
注解配置基于POJO的消费者。
配置类上需要添加 @EnableKafka
注解,以便能够检测 Spring
管理的bean上的 @KafkaListener
注解:
@EnableKafka @Configuration public class KafkaConsumerConfig { @Bean public ConsumerFactory<String, String> consumerFactory() { Map<String, Object> props = new HashMap<>(); props.put( ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapAddress); props.put( ConsumerConfig.GROUP_ID_CONFIG, groupId); props.put( ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); props.put( ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class); return new DefaultKafkaConsumerFactory<>(props); } @Bean public ConcurrentKafkaListenerContainerFactory<String, String> kafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); return factory; } }
5.2. 消息消费
@KafkaListener(topics = "topicName", groupId = "foo") public void listenGroupFoo(String message) { System.out.println("Received Message in group foo: " + message); }
可以为一个 topic 实现多个 listener,每个topic 都有不同的组Id。此外,一个消费者可以监听来自不同 topic 的消息:
@KafkaListener(topics = "topic1, topic2", groupId = "foo")
Spring 还支持使用 listener 中的 @Header 注解检索一个或多个消息标题:
@KafkaListener(topics = "topicName") public void listenWithHeaders( @Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) { System.out.println( "Received Message: " + message" + "from partition: " + partition); }
5.3. 消费来自特定分区的消息
注意到,我们只使用一个分区创建了 topic “developlee”。但是,对于具有多个分区的主题, @KafkaListener 可以显式订阅具有初始偏移量 topic 的特定分区:
@KafkaListener( topicPartitions = @TopicPartition(topic = "topicName", partitionOffsets = { @PartitionOffset(partition = "0", initialOffset = "0"), @PartitionOffset(partition = "3", initialOffset = "0")}), containerFactory = "partitionsKafkaListenerContainerFactory") public void listenToPartition( @Payload String message, @Header(KafkaHeaders.RECEIVED_PARTITION_ID) int partition) { System.out.println( "Received Message: " + message" + "from partition: " + partition); }
由于 initialOffset
已被发送到该 listener 中的分区0,因此每次初始化该 listener
时,将重新使用以前从分区0和分区3消耗的所有消息。如果不需要设置偏移量,我们可以使用 @TopicPartition
注解的 partitions
属性只设置没有偏移量的分区:
@KafkaListener(topicPartitions = @TopicPartition(topic = "topicName", partitions = { "0", "1" }))
5.4. 为Listener添加消息过滤器
通过添加自定义过滤器,可以将 listener
配置为使用特定类型的消息。这可以通过将 RecordFilterStrategy
设置为 KafkaListenerContainerFactory
来完成:
@Bean public ConcurrentKafkaListenerContainerFactory<String, String> filterKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, String> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(consumerFactory()); factory.setRecordFilterStrategy( record -> record.value().contains("World")); return factory; }
然后可以将 listener
配置为使用此容器工厂:
@KafkaListener( topics = "topicName", containerFactory = "filterKafkaListenerContainerFactory") public void listenWithFilter(String message) { System.out.println("Received Message in filtered listener: " + message); }
在这个 listener
中,所有与过滤器匹配的 消息都将被丢弃
。
6. 自定义消息转换器
到目前为止,我们只讨论了字符串作为消息发送和接收的对象。但是,我们也可以发送和接收定制的Java对象。这需要在 ProducerFactory
中配置适当的序列化器,并在 ConsumerFactory
中配置反序列化器。
让我们看一个简单的bean,并将以消息的形式发送它:
public class Greeting { private String msg; private String name; // standard getters, setters and constructor }
6.1. 生产自定义消息
在本例中,我们将使用 JsonSerializer
。我们看看 ProducerFactory
和 KafkaTemplate
的代码:
@Bean public ProducerFactory<String, Greeting> greetingProducerFactory() { // ... configProps.put( ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class); return new DefaultKafkaProducerFactory<>(configProps); } @Bean public KafkaTemplate<String, Greeting> greetingKafkaTemplate() { return new KafkaTemplate<>(greetingProducerFactory()); }
新的 KafkaTemplate
可用于发送 Greeting 消息:
kafkaTemplate.send(topicName, new Greeting("Hello", "World"));
6.2. 消费自定义消息
同样,我们修改 ConsumerFactory
和 KafkaListenerContainerFactory
来正确反序列化 Greeting 消息:
@Bean public ConsumerFactory<String, Greeting> greetingConsumerFactory() { // ... return new DefaultKafkaConsumerFactory<>( props, new StringDeserializer(), new JsonDeserializer<>(Greeting.class)); } @Bean public ConcurrentKafkaListenerContainerFactory<String, Greeting> greetingKafkaListenerContainerFactory() { ConcurrentKafkaListenerContainerFactory<String, Greeting> factory = new ConcurrentKafkaListenerContainerFactory<>(); factory.setConsumerFactory(greetingConsumerFactory()); return factory; }
spring-kafka
JSON序列化器和反序列化器使用 Jackson 库,该库是 spring-kafka
项目的可选maven依赖项。我们也把它加到 pom.xml
文件:
<dependency> <groupId>com.fasterxml.jackson.core</groupId> <artifactId>jackson-databind</artifactId> <version>2.9.7</version> </dependency>
建议不要使用 Jackson 的最新版本,而是使用 pom.xml
文件 中 spring-kafka
的版本。
最后,我们需要编写一个 listener 来 消费 Greeting 消息:
@KafkaListener( topics = "topicName", containerFactory = "greetingKafkaListenerContainerFactory") public void greetingListener(Greeting greeting) { // process greeting message }
7. 结语
Recommend
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK