10

Akka入门系列(八):akka kafka Consumer

 3 years ago
source link: http://edisonxu.com/2018/12/04/akka-kafka-consumer.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

核心API

在使用Akka kafka consumer前, 先了解下几个核心API:

  • ConsumerSetting Consumer的配置信息;
  • ConsumerRecord Kafka消息的封装类,包含消息的K、V,以及该消息所属的topic, partition, offset, timestamp等;
  • ConsumerMessageConsumerRecord的进一步充血模型,提供了自动commit以及修改offset信息的API;
  • Subscription 该Consumer的订阅信息,有AutoSubscriptionManualSubscription两个子接口,分别用于自动从Topic读取Partition以及手动绑定Partition;

Akka Kafka中,Consumer一般是作为流的Source,在akka.kafka.javadsl.Consumer中提供了常用的几种Source。主要包含两大类:

1. Offset存储及读取机制独立于Kafka以外,需自行实现commit逻辑,命名为plainxxxSource;
2. Offset存储及读取机制依赖于Kafka的所提供的API,通过调用Akka已封装的`ConsumerMessage`进行offset的commit,命名为committableXXXSource

详情可以参见最后的目录。

1
2
3
4
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-stream-kafka_2.12</artifactId>
</dependency>

所有的Consumer都需要传入配置类ConsumerSetting,需要提供如下信息:

  • Kafka消息key和value的反序列化器
  • Kafka集群的地址信息
  • consumer的GroupId,注意:offset是按组进行commit的
  • Kafka Consumer的调优参数
1
2
3
4
5
6
7
8
9
10
11
12
public static ConsumerSettings getConsumerSettings(
Deserializer keyDeserializer,
Deserializer valDeserializer,
Config config,
String groupId){
Deserializer<String> keySerializer = new StringDeserializer();
Deserializer<byte[]> valSerializer = new ByteArrayDeserializer();

return ConsumerSettings.create(config, keyDeserializer, valDeserializer)
.withGroupId(groupId) // if not defined here, config must contains "group.id"
.withProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
}

数据处理服务定义

我们用相同的一段代码,来代表整个Flow中的数据转换过程。

1
2
3
4
5
6
7
8
static class DummyBusinessLogic {
public CompletionStage<Integer> work(ConsumerRecord<String, byte[]> record){
return CompletableFuture.supplyAsync(() -> {
System.out.println("Partition["+record.partition()+"] got:"+new String(record.value()));
return record.partition();
});
}
}

Offset管理独立于Kafka以外

此类API命名规则都是plainXXXSource,对外都emit出ConsumerRecord,Offset维护在外部的存储里,可以先读取再处理或提供读取的方法给API由其调用获得最新的Offset。Commit也是手动进行,但是可以通过修改auto-commit参数(该值默认是false),由Kafka自行进行Offset的Commit。Kafka的自动Commit是阈值和周期性的Commit,哪个先触发就直接commit,比较适合量大且允许消息重复递交的场景。
我们先实现一个简单的外部存储类,用以演示

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
// A dummy storage to store offset externally
static class ExternalOffsetStorage {
private Map<TopicPartition, Long> partitionOffsetMap = new HashMap<>();

public ExternalOffsetStorage(String topic, int partitonNum) {
for(int i=0;i<partitonNum;i++){
partitionOffsetMap.put(new TopicPartition(topic, i), new Long(0));
}
}

// User CompletionStage is to warn that read the offset may cost some time
/*public CompletionStage<Long> getLatestOffset(){
return CompletableFuture.completedFuture(offset.get());
}*/
public Long getLatestOffset(TopicPartition partition){
return partitionOffsetMap.get(partition);
}

public CompletionStage<Done> commitOffset(TopicPartition partition){
return CompletableFuture.supplyAsync(() -> {
partitionOffsetMap.put(partition, getLatestOffset(partition)+1);
return Done.done();
});
}

public CompletionStage<Done> commitOffset(int partition){
return CompletableFuture.supplyAsync(() -> {
for(TopicPartition p: partitionOffsetMap.keySet()){
if(p.partition() == partition)
partitionOffsetMap.put(p, partitionOffsetMap.get(p)+1);
}
return Done.done();
});
}

public Map<TopicPartition, Long> getPartitionOffsetMap() {
return partitionOffsetMap;
}

public CompletionStage<Map<TopicPartition, Object>> getOffsetsOnAssign(Set<TopicPartition> topicPartitions){
return CompletableFuture.supplyAsync(()->{
Map<TopicPartition, Object> result = new HashMap<>();
topicPartitions.forEach(partition -> result.put(partition, partitionOffsetMap.get(partition)));
return result;
});
}
}

不分Partition处理

不分Partition处理的API,是最简单的Consumer.plainSource,它接受两个参数:

  • ConsumerSetting 配置参数
  • Subscription Kafka的partition信息,可以是AutoSubscriptionManualSubscription
    • AutoSubscriptionSubscriptions.topics("topic")来指定
    • ManualSubscription则需要显式提供每一个TopicPartition及其对应的offset。如果只有一个partition,可以直接Subscriptions.assignmentWithOffset(new TopicPartition("topic", /*partition: */ 0), currentOffset)。如果是多个partition,则传入一个MapTopicPartition作为key,Offset的值为value。
      由于commit的时机和逻辑都是自己提供的,所以比较适合去实现exact-once-delivery
      1
      2
      3
      4
      5
      6
      7
      Consumer.plainSource(
      consumerSettings,
      Subscriptions.assignmentWithOffset(offsetStorage.getPartitionOffsetMap()))
      //Subscriptions.topics(topic))
      .mapAsync(partitionNum, record -> logic.work(record).thenApply(partition->offsetStorage.commitOffset(partition)))
      .to(Sink.ignore())
      .run(materializer);

分Partition处理

API中含Partitioned字样的,均是分Partition处理的API,即每一个Partition会对应一个新的子Source。这一类中,是Consumer.plainPartitionedSourceConsumer.plainPartitionedManualOffsetSource
与plainSource不同的点是:

  • 只接受AutoSubscription
  • 原Source并不直接emit ConsumerRecord,而是派生出三个子Source,从它获得的是一个Pair<TopicPartition, Source>封装类,包含了为每一个partition提供了一个Source对象。
  • Consumer.plainPartitionedManualOffsetSourceConsumer.plainPartitionedSource基础上,增加了一个函数参数,要求传入一个Function,根据提供的包含TopicPartition信息的Set,返回对应的每个Partition的Offset信息,封装在一个Map里,key是TopicPartition, value是offset值。

下面例子里,用flatMapMerge将原Source派生的Source合并(即Pair::second返回值)后,交由同一段Flow处理。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
if(!manualAssignOffset)
Consumer.plainPartitionedSource(
consumerSettings,
Subscriptions.topics(topic))
// merge ConsumerRecord from different partition Source
.flatMapMerge(partitionNum, Pair::second)
// use same logic flow to handle ConsumerRecord
.mapAsync(partitionNum, record -> logic.work(record).thenApply(partition->offsetStorage.commitOffset(partition)))
.to(Sink.ignore())
.run(materializer);
else
Consumer.plainPartitionedManualOffsetSource(
consumerSettings,
Subscriptions.topics(topic),
offsetStorage::getOffsetsOnAssign)
//.mapAsync(partitionNum, logic::workWithPartitions)
.flatMapMerge(partitionNum, Pair::second)
.mapAsync(partitionNum, record -> logic.work(record).thenApply(partition->offsetStorage.commitOffset(partition)))
.to(Sink.ignore())
.run(materializer);

Offset管理依赖Kafka

Kafka的JAVA API,提供了将offset保存在zookeeper上的功能,在新版的Kafka,更是为了避免zookeeper的性能问题,在其内部创建一个叫__consumer_offsets的topic来存储offset。由offsets.storage参数定义。

该API能够自由控制何时将offsetcommit到Kafka去。比较适合用于at-least-once递交的场景,即消息可能会被多次递交,以保证至少会有一次成功,但相应的,如果发生错误,该错误也会发生多次。

1
2
3
4
5
6
7
8
// single commit
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
// asynchronously finish logic work and fetch the offset to commit
.mapAsync(1, msg-> logic.work(msg.record()).thenApply(partition -> msg.committableOffset()))
// commit offset
.mapAsync(1, offset->offset.commitJavadsl())
.to(Sink.ignore())
.run(materializer);

这里用了mapAsync异步并行来处理消息,并行数位设为1,保证处理消息的顺序(Kafka单个partition是保序的,但是对于同一个topic的多个partition之间是无序的)。
运行一下Producer,然后可以用JMX查看该Topic的offset数在上升。

该API每处理一个消息就会commit一次,这种方式相当慢。推荐的方式是用batch批量commit,用牺牲发生错误时的重复投递来换取性能。

批量Commit

自动批处理

Akka提供了Committer.sink方法来实现自动批量Commit。在使用这个sink前,需要先在配置文件中定义或者代码里直接指定以下两个参数:

  • max-batch 每次commit的最大消息数,即超过该数即会触发一次commit
  • max-interval 两次commit之间的最大间隔

这两个参数调的越大,Kafka对于commit的load越小,消耗时间越少,但相应的,如果发生错误,重新处理的消息数肯定也是对应增加的。调的越小,则commit越频繁,会带来commit性能瓶颈。这个属于Kafka批量commit的老问题了,与Akka本身是无关的,应视不同场景进行相应参数优化。
修改application.conf,在akka.kafka.consumer区块中添加

1
2
3
4
5
6
7
akka.kafka.consumer {
...
# Maximum number of messages in a single commit batch
max-batch = 1000
# Maximum interval between commits in milliseconds
max-interval = 10000
}

批量commit代码如下:

1
2
3
4
5
6
7
// batch commit
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
.mapAsync(1, msg-> logic.work(msg.record())
.<ConsumerMessage.Committable>thenApply(partition -> msg.committableOffset())
)
.to(Committer.sink(CommitterSettings.create(config)))
.run(materializer);

PS: .<ConsumerMessage.Committable>这里是做类型强转,将msg.committableOffset返回的CommittableOffset转成其实现接口Committable

手动批处理

另一种方式,是手动的将消息用Akka StreambatchAPI聚合后做批量commit。

1
2
3
4
5
6
7
8
9
10
11
12
13
// manual batch commit
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
.mapAsync(1, msg-> logic.work(msg.record())
.thenApply(partition -> msg.committableOffset())
)
.batch(
20,
ConsumerMessage::createCommittableOffsetBatch,
ConsumerMessage.CommittableOffsetBatch::updated
)
.mapAsync(3, batch->batch.commitJavadsl())
.to(Sink.ignore())
.run(materializer);

注意:
用这种方式时,只有当下游consumer处理速度比上游的producer处理速度要慢时,batch才会触发(背压),否则会按正常commit处理。
测试时,需要把producer中的控制发送速率的.throttle()注掉,同时调高发送消息数,这样才能看到效果。

按时间聚合批处理

以上都是适合于消息速率比较高的场景,有些场景下,消息的速率非常低,可能24小时内没有任何消息抵达。此时,需考虑打开kafka的批量commit刷新参数(akka.kafka.consumer.commit-refresh-interval),否则在Kafka的存储中,offset会过期。同时,对于这种速率较低的topic,最好使用按时间进行聚合后进行批处理的groupWithinAPI。

1
2
3
4
5
6
7
8
9
10
// time-based aggregation
Consumer.committableSource(consumerSettings, Subscriptions.topics(topic))
.mapAsync(1, msg-> logic.work(msg.record())
.thenApply(partition -> msg.committableOffset())
)
.groupedWithin(5, Duration.ofSeconds(60))
.map(ConsumerMessage::createCommittableOffsetBatch)
.mapAsync(3, batch->batch.commitJavadsl())
.to(Sink.ignore())
.run(materializer);

groupedWithinAPI,接受两个参数,第一个是控制多少个消息触发聚合,第二个是控制时间窗口。如果窗口内接受消息数超过第一个参数,则立刻聚合,如果未超过,到窗口时间到期时也会触发。

测试时,第一个先把Producer的最大消息数改为4,然后不要启用速率控制。看到offset并不是立刻就commit掉。然后把Producer的最大消息数改为10,每1秒发送一个,可以看到当超过5时,立刻触发commit。

分Partition处理

TODO: 流的聚合

有时,我们想在offset里添加自定义的metadata,此时,可以调用Consumer.commitWithMetadataSourceAPI,用法还是比较简单,具体请参考官方文档。但需要注意的是,由于kafka可以周期性commit(akka.kafka.consumer.commit-refresh-interval参数),第一个offset可能并不会包含新的metadata信息。

每个Partition一个独立的Source

at-least-once投递

at-most-once投递

单独commit

Consumer.atMostOnceSource

批量commit

Consumer API

API使用场景参数发射类plainSource将Offset存到外部,不支持存到Kafka本身(除非开启auto-commit,用kafka自己的自动commit功能)ConsumerSettings<K,V> consumerSettings, Subscription subscriptionConsumerRecordplainExternalSource将Offset存到外部,可以使用外部KafkaAsyncConsumer的特殊Source,一般用于预先定义好一个Consumer Actor,然后用该API去手动绑定许多topic-partitionsActorRef consumer, ManualSubscription subscriptionConsumerRecordplainPartitionedSource将Offset存到外部,从topic自动获取partition,每个partition分别对应一个source,放到一个Pair中ConsumerSettings<K,V> consumerSettings, AutoSubscription subscriptionPair[TopicPartition, Source[ConsumerRecord[K, V], NotUsed]plainPartitionedManualOffsetSource与plainPartitionedSource基本一致,只是允许将partition的offset存储到外部去,使用时调用传入的getOffsetsOnAssign方法去从外部读取offsetConsumerSettings<K,V> consumerSettings, AutoSubscription subscription, Function[Set[TopicPartition], CompletionStage[Map[TopicPartition, Long]] getOffsetsOnAssignPair[TopicPartition, Source[ConsumerRecord[K, V], NotUsed]plainPartitionedManualOffsetSource多了一个onRevoke方法,用于在关闭时去处理(存储)还未commit的offset,以及做一些清扫任务ConsumerSettings<K,V> consumerSettings, AutoSubscription subscription, Function[Set[TopicPartition], CompletionStage[Map[TopicPartition, Long]] getOffsetsOnAssign, Consumer[Set[TopicPartition]] onRevokePair[TopicPartition, Source[ConsumerRecord[K, V], NotUsed]committableSource提供API将Offset存到kafka内部,使用时自由控制何时commitConsumerSettings<K,V> consumerSettings, Subscription subscriptionCommittableMessagecommittableExternalSource与plainExternalSource一样,只是提供了可以commit到Kafka内部的APIActorRef consumer, ManualSubscription subscription, String groupId, FiniteDuration commitTimeoutCommittableMessagecommitWithMetadataSource提供API将Offset存到kafka内部,并可以将额外的信息放入offset的元数据里,比如什么时间或哪个节点commit的等ConsumerSettings<K,V> consumerSettings, Subscription subscription, Function[ConsumerRecord[K, V], String] metadataFromRecordCommittableMessagecommittablePartitionedSource与plainPartitionedSource一样,只是提供了可以commit到Kafka内部的APIConsumerSettings<K,V> consumerSettings, AutoSubscription subscriptionPair[TopicPartition, Source[CommittableMessage[K, V], NotUsed]commitWithMetadataPartitionedSource与plainPartitionedSource一样,只是提供了可以commit到Kafka内部的API,同时允许添加额外信息到offset的元数据里ConsumerSettings<K,V> consumerSettings, AutoSubscription subscription, Function[ConsumerRecord[K, V], String] metadataFromRecordPair[TopicPartition, Source[CommittableMessage[K, V], NotUsed]atMostOnceSource消息在发给下游逻辑处理前,先自动将offset更新commit掉,以保证至多一次投递ConsumerSettings<K,V> consumerSettings, Subscription subscriptionConsumerRecord
如果您觉得文章不错,可以请我喝一杯咖啡!

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK