8

请教, kafka 如何做到一个 topic 分发不同的类型的消息

 1 year ago
source link: https://www.v2ex.com/t/935312
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

V2EX  ›  程序员

请教, kafka 如何做到一个 topic 分发不同的类型的消息

  NoKey · 17 小时 48 分钟前 · 1186 次点击
场景是这样的,上游服务 A ,通过 kafka 发消息个下游服务 B,C,D

为了后续集成方便,A 使用了一个 Topic

这个时候,需要 BCD 接收自己的消息

这种场景下,如何才能控制 BCD 只收到自己的消息,不收别人的消息呢?

考虑了几种方式:
1. 通过 key 。这样下游服务只有收到消息之后才知道 key 是啥,不是自己的丢弃,但是这样必须收消息,也就是 B 会收到 C ,D 的消息,感觉不好。
2. 通过分区。不同下游的消息放到不同的分区,但是这样会造成分区不均衡,部分分区过大。

请问一下大家有没有更好的办法呢?谢谢
25 条回复    2023-04-25 16:46:13 +08:00
antipro

antipro      17 小时 42 分钟前 via Android

给 B ,C ,D 各建一个 Kafka
aijam

aijam      17 小时 38 分钟前

cloudzhou

cloudzhou      17 小时 25 分钟前

给 B ,C ,D 各建一个 Topic 就可以
dddd1919

dddd1919      17 小时 24 分钟前

如果只让 BCD 接收到自己的消息,那就在 push 时分三个 topic ,直接把消息隔离开,缺点就是负载可能不均服务利用率降低
如果只让 BCD 处理自己要的消息并忽略掉无意义消息,可以在各自 consumer 加 filterStrategy 过滤掉无关消息
NoKey

NoKey      17 小时 16 分钟前

建多个 topic 的麻烦点就是,后续要不断的增加 topic ,有没有办法,一个 topic 就可以解决呢?😂
ChaYedan666

ChaYedan666      16 小时 57 分钟前

@NoKey 不可能吧,不论怎么说,只要是都发一个 topic ,那么 BCD 就得把里面的消息拉过来做过滤,过滤后再消费自己的;或者另一种就是你自己说的第二种,同一个消费者组,监听不同分区,根据 key 发不同的分区,分区不均衡啥的就你得自己控制了
wuYin

wuYin      16 小时 35 分钟前 via iPhone

也许可以用 2 个 kafka 集群,A 写集群 1 ,自己写个 connector 做消息解析与分发,写到集群 2 的三个 topic ,再由 B C D 各自消费。
这种做法引入了新的集群和组件,成本和维护代价更高。可行但不建议
securityCoding

securityCoding      16 小时 32 分钟前 via Android

kafka 不好做,换阿里云 rocketmq 加 tag
kaddusabagei38

kaddusabagei38      15 小时 55 分钟前

建议换队列
urnoob

urnoob      15 小时 47 分钟前

B C D 各自作为一个消费者组。
waitwait365

waitwait365      15 小时 43 分钟前

用 rabbitmq
zgzhang

zgzhang      15 小时 30 分钟前

kafka stream 做个任务来处理
WhereverYouGo

WhereverYouGo      15 小时 22 分钟前

在消息体里定义 business_type: B 、C 、D ,然后引进一个中间层 X ,X 直接消费 A 发送的消息,并根据 business_type 决定调用( HTTP 或 RPC ) B 、C 、D 。(计算机科学中的每个问题都可以用一间接层解决 doge )
WhereverYouGo

WhereverYouGo      15 小时 20 分钟前

但是上述方案有个问题:B 、C 、D 直接接受流量的冲击,没有 MQ 来缓冲,服务可能会被打爆
fkdog

fkdog      15 小时 14 分钟前

明明有现成的高速公路,多建两个 topic 的事,你非得要自己单独再修一条路。我不知道怎么评价你这个需求。。
“为了方便”,请问改成 3 个 topic 不方便在哪里?
awinds

awinds      14 小时 43 分钟前

除非你真的有需求有另外的 E 同时消费所有数据,不然就多个 topic 吧
lower

lower      14 小时 41 分钟前

@WhereverYouGo 感觉问题不大,X 其实已经在 mq 后面了,慢慢一个一个取消息就行
Super8

Super8      14 小时 20 分钟前

可以在消息的 key 或者 value 中添加标识,例如在消息的 key 中添加 B 、C 、D 等标识,表示该消息是发给 B 、C 、D 的,然后在消费者端使用带有过滤条件的消费者来消费消息,只消费自己需要的消息。具体可以使用 Kafka 的 Consumer API 提供的 subscribe 方法中的参数来实现,例如使用 subscribe(Collections.singleton(topic), new MyPartitionAssignor()) 方法,其中 MyPartitionAssignor 实现了 PartitionAssignor 接口,可以根据标识来分配分区。另外,也可以使用 Kafka Streams 来实现消息过滤和分发。
Super8

Super8      14 小时 19 分钟前

rocketmq 中 tag 最适合这个场景
zhaoyy0513

zhaoyy0513      13 小时 22 分钟前

@Super8 我创建的 KafkaConsumer 用到的 api 里面没有这两个参数的方法啊老哥,你说的这个 kafka 是哪个版本的啊
zhaoyy0513

zhaoyy0513      13 小时 11 分钟前

import org.apache.kafka.common.serialization.Serdes;
import org.apache.kafka.streams.KafkaStreams;
import org.apache.kafka.streams.StreamsBuilder;
import org.apache.kafka.streams.StreamsConfig;
import org.apache.kafka.streams.kstream.KStream;
import org.apache.kafka.streams.kstream.Produced;

import java.util.Properties;

public class KafkaStreamsExample {
public static void main(String[] args) {
// 设置 Kafka Streams 属性
Properties props = new Properties();
props.put(StreamsConfig.APPLICATION_ID_CONFIG, "stream-processing-app");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

// 创建 Kafka Streams 实例
StreamsBuilder builder = new StreamsBuilder();
KStream<String, String> stream = builder.stream("topic-A");

// 根据消息的 key 将消息路由到不同的分区中
stream.selectKey((key, value) -> key)
.through("topic-A-shuffle")
.groupByKey()
.foreach((key, value) -> {
// 处理消息
System.out.println("Processed message: " + value);
});

// 将处理后的消息发送到下游服务
stream.mapValues(value -> "processed " + value)
.to("topic-B", Produced.with(Serdes.String(), Serdes.String()));

KafkaStreams kafkaStreams = new KafkaStreams(builder.build(), props);
kafkaStreams.start();
}
}

在上面的代码中,首先使用 selectKey()方法将消息的 key 作为新的 key ,然后使用 through()方法将消息发送到一个新的 Topic 中,这个新的 Topic 会使用 Kafka 默认的分区策略将消息路由到不同的分区中。然后,我们使用 groupByKey()方法将同一个 key 的消息分组,确保每个消费者只消费自己需要的消息。最后,我们使用 foreach()方法处理分组后的消息,并使用 mapValues()方法将处理后的消息发送到下游服务。

需要注意的是,使用分流操作可能会导致数据倾斜(data skew)问题,因为某些 key 的消息可能比其他 key 的消息更频繁,从而导致某些分区比其他分区拥有更多的消息。为了解决这个问题,可以使用一些分区策略(partitioning strategy),例如随机分配、循环分配、哈希分配等。
burymme11

burymme11      13 小时 0 分钟前

可以中间自己加一个路由层。
新建一个中间层 AA ,来监听 topic ,处理上游服务 A 的消息,在 AA 里面,自己写代码做负载均衡,比如根据消息 ID 取模,给 B ,C ,D 分配好不同的 key ,最后所有消息再往新的 NewTopic 里丢。这样 B ,C ,D 就监听 NewTopic 就行,以后要加薪的下游服务,你只要改动 AA 层分发路由的代码就好。
Dlin

Dlin      12 小时 58 分钟前

kafka 的 topic 和 rabbitmq 的 topic 不一样么。
zhaoyy0513

zhaoyy0513      12 小时 57 分钟前

要实现上游系统 A 将消息发送到下游系统 B 、C 、D ,并确保每个下游系统只处理自己需要处理的消息,同时还要确保消息只被消费一次,可以采用以下方案:

使用 Kafka 作为消息中间件,将上游系统 A 发送的消息发布到一个名为"topic-A"的 Kafka 主题中。

在下游系统 B 、C 、D 中,创建三个不同的消费者组,分别为"group-B"、"group-C"、"group-D",并订阅"topic-A"主题。

在消费者端,使用 Kafka 中的消息过滤器来过滤掉不需要的消息,只选择要处理的消息。可以使用 Kafka 中的消息键(key)来实现过滤。例如,下游系统 B 只想处理键(key)为"key-B"的消息,可以使用以下代码来实现:

java
Copy
// 创建 Kafka 消费者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "group-B");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);

// 订阅"topic-A"主题
consumer.subscribe(Collections.singletonList("topic-A"));

// 消费消息
while (true) {
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(100));
for (ConsumerRecord<String, String> record : records) {
if (record.key().equals("key-B")) {
// 处理消息
}
}
consumer.commitSync();
}
```

为了确保消息只被消费一次,将消费者的 auto.offset.reset 属性设置为"earliest",并启用自动提交偏移量。这将确保消费者在启动时从最早可用的偏移量开始消费,以避免漏掉任何消息,并且将自动提交偏移量以确保每个消息只被消费一次。例如,可以使用以下代码来实现:

java
Copy
// 创建 Kafka 消费者
Properties props = new Properties();
props.put("bootstrap.servers", "localhost:9092");
props.put("group.id", "group-B");
props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
props.put("auto.offset.reset", "earliest");
props.put("enable.auto.commit", "true");
KafkaConsumer<String, String> consumer = new KafkaConsumer<>(props);
```
使用上述方案,上游系统 A 可以将消息发送到"topic-A"主题中,下游系统 B 、C 、D 可以使用 Kafka 消费者订阅该主题,并使用消息过滤器来过滤掉不需要的消息,只选择要处理的消息。自动提交偏移量将确保每个消息只被消费一次。

上面两条回复都是 chatgpt 回复的
PythonYXY

PythonYXY      12 小时 48 分钟前

为什么不建多个 topic 呢,如果下游服务不固定可以做成配置式的啊

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK