200 行代码告诉你 TDMQ 中 Pulsar 广播如何实现
source link: https://my.oschina.net/u/4587289/blog/5006986
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
导读
Pulsar 作为 Apache 社区的相对新的成员,在业界受到非常大量的关注。新产品的文档相对不齐全也是非常能够理解的。今天客户问过来广播怎么实现的,我解释了半天,又找了很多介绍产品的 PPT,最终也没有找到“官方”的文档说明这个事情。于是我就写了这篇文章,方便大家 copy/paste 。
作者介绍
徐为
腾讯云微服务团队高级解决方案构架师
毕业于欧盟 Erasmus Mundus IMMIT,获得经济和IT管理硕士学位
自2006年以来,曾就职于SonyEricsson、SAP等多家公司,历任软件开发工程师,数据开发工程师,解决方案架构师
Pulsar订阅模型分类
Pulsar 原文支持的几种模式如下,依次是 独占模式 / 高可用模式 / 分享模式 / 基于键值 的分享模式。
如果这几个模式还没有理解的,可以去官网先看一下,我个人觉得看过应该是可以理解的:
https://pulsar.apache.org/docs/en/concepts-messaging/#subscriptions
Pulsar 广播模式
Pulsar 的订阅模式和很多 MQ 不太一样。比如 RabbitMQ/Kafka 等,一般消费端(Consumer)是直接去对接 Topic 的,然后 Consumer 自己又有个组的概念在配置中心去设置 offset,以此来决定是一起分享 Topic 的数据,还是每个人都接收同样的数据。在 Pulsar 的消费订阅模型里,添加了一个 Subscription 的逻辑,Subscription 的 Type 决定了消费是独享还是分享。
于是广播模式可以用不同 Subscription 独享的模式来实现,具体架构可以参照下图:
代码实现
1. Full-mesh 的形创建 Java 项目(比如:Springboot - 这个应该是相对简单的 IDE 集成开发组件)
画重点
- pulsar-client-api 和 tdmq-client 需要2.6.0
- tdmq-client 需要在腾讯的repo里才能拿到,需要使用介绍链接介绍的方式进行maven的配置(gradle方法类似)
介绍链接:https://cloud.tencent.com/document/product/1179/44914
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 https://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<parent>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-parent</artifactId>
<version>2.4.3</version>
<relativePath /> <!-- lookup parent from repository -->
</parent>
<groupId>com.examble.demo</groupId>
<artifactId>tdmq-demo</artifactId>
<version>0.0.1-SNAPSHOT</version>
<name>tdmq-demo</name>
<description>demo project to test tdmq</description>
<properties>
<java.version>1.8</java.version>
</properties>
<dependencies>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-web</artifactId>
</dependency>
<dependency>
<groupId>com.tencent.tdmq</groupId>
<artifactId>tdmq-client</artifactId>
<version>2.6.0</version>
</dependency>
<!-- https://mvnrepository.com/artifact/org.apache.pulsar/pulsar-client-api -->
<dependency>
<groupId>org.apache.pulsar</groupId>
<artifactId>pulsar-client-api</artifactId>
<version>2.6.0</version>
</dependency>
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-test</artifactId>
<scope>test</scope>
</dependency>
</dependencies>
<build>
<plugins>
<plugin>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-maven-plugin</artifactId>
</plugin>
</plugins>
</build>
</project>
2. 创建一个 Component 用来全局使用 Producer 和 Consumers
这里创建了1个 Producer 和3个拥有 exclusive subscription 的 consumers(广播模式 - 我们期待他们3个每次都收到一样的信息)
package com.example.demo.tdmq.instance;
import javax.annotation.PostConstruct;
import org.apache.pulsar.client.api.AuthenticationFactory;
import org.apache.pulsar.client.api.Consumer;
import org.apache.pulsar.client.api.Message;
import org.apache.pulsar.client.api.MessageListener;
import org.apache.pulsar.client.api.Producer;
import org.apache.pulsar.client.api.PulsarClient;
import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.SubscriptionType;
import org.springframework.beans.factory.config.ConfigurableBeanFactory;
import org.springframework.context.annotation.Scope;
import org.springframework.stereotype.Component;
@Component
@Scope(ConfigurableBeanFactory.SCOPE_SINGLETON)
public class Global {
PulsarClient client;
public Producer<byte[]> producer;
public Consumer<byte[]> consumer01;
public Consumer<byte[]> consumer02;
public Consumer<byte[]> consumer03;
public Global() {
}
@PostConstruct
public void init() {
try {
client = PulsarClient.builder().serviceUrl("pulsar://<Your TDMQ Pulsar Service URL>:6000/")
.listenerName("custom:<TDMQ Pulsar Instance ID>/<TDMQ VPC ID>/<TDMQ Subnet ID>")
.authentication(AuthenticationFactory.token(
"<Your Credential Token from TDMQ>"))
.build();
producer = client.newProducer().topic("persistent://<TDMQ Pulsar Instance ID>/<your name space>/<your topic>").create();
consumer01 = client.newConsumer().subscriptionType(SubscriptionType.Exclusive)
.topic("persistent://<TDMQ Pulsar Instance ID>/<your name space>/<your topic>")
.messageListener(new MessageListener<byte[]>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {
System.out.println("Consumer01" + " - " + System.currentTimeMillis() + " - "
+ new String(msg.getData()));
try {
consumer.acknowledge(msg);
} catch (PulsarClientException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}).subscriptionName("my-subscription01").subscribe();
consumer02 = client.newConsumer().subscriptionType(SubscriptionType.Exclusive)
.topic("persistent://<TDMQ Pulsar Instance ID>/<your name space>/<your topic>")
.messageListener(new MessageListener<byte[]>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {
System.out.println("Consumer02" + " - " + System.currentTimeMillis() + " - "
+ new String(msg.getData()));
try {
consumer.acknowledge(msg);
} catch (PulsarClientException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}).subscriptionName("my-subscription02").subscribe();
consumer03 = client.newConsumer().subscriptionType(SubscriptionType.Exclusive)
.topic("persistent://<TDMQ Pulsar Instance ID>/<your name space>/<your topic>")
.messageListener(new MessageListener<byte[]>() {
/**
*
*/
private static final long serialVersionUID = 1L;
@Override
public void received(Consumer<byte[]> consumer, Message<byte[]> msg) {
System.out.println("Consumer03" + " - " + System.currentTimeMillis() + " - "
+ new String(msg.getData()));
try {
consumer.acknowledge(msg);
} catch (PulsarClientException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}).subscriptionName("my-subscription03").subscribe();
} catch (PulsarClientException e) {
// TODO Auto-generated catch block
e.printStackTrace();
}
}
}
3. 最外层的测试代码和简单的 Message 模型
public class MessageModel {
private String messageText = null;
public String getMessageText() {
return messageText;
}
public void setMessageText(String messageText) {
this.messageText = messageText;
}
}
跑起来测试一下,果然3个一起接收一样的消息
话不多说,赶紧跑起来玩玩吧!
有相关需求的读者欢迎留言告诉我们你的想法!
往期
扫描下方二维码关注本公众号,
了解更多微服务、消息队列的相关信息!
解锁超多鹅厂周边!
Recommend
-
92
这篇文章介绍了Pulsar仅通过一套API实现了灵活的消息模型——队列和发布订阅,还介绍了企业级的多租户特性、Geo复制和强持久性保证。
-
8
首页新闻
-
12
本文作者为 jesse-anderson。内容由 StreamNative 翻译并整理。 本文以三个实际使用场景为例,从 CTO 的视角出发,在技术等方面对比 Kafka 和 Pulsar。 阅读本文需要大约 8 分钟。 关于 Apache Puls...
-
3
实现可扩展的流处理:Pulsar Key_Shared 订阅模式 本文翻译自 StreamNative 博客《Scalable Stream Processing with Pulsar’s Key_Shared Subscription》[1],作者:David Kjerrumgaard。 译者:刘梓霖、段嘉 ...
-
5
V2EX › 程序员 计网自顶向下 实现广播的 RPF 的 冗余分组的疑问? amiwrong123
-
5
编辑导语:国内的饮品行业向来被几大巨头长期垄据,多少品牌想要打破被垄断的局面,拼出一席之地,但都被扼杀于初期之中。元气森林的成功引起了业内的广大关注,本文将从多个角度来分析,元气森林是如何实现增长的。
-
7
一篇文章告诉你JavaScript 如何实现继承-51CTO.COM 一篇文章告诉你JavaScript 如何实现继承 作者:·Python进阶学习交流 2022-02-18 00:13:53 JavaScript 在编程语言界是个特殊种类,它和其他编程语言很不...
-
2
源码级别的广播与监听实现近期疫情形势严峻,情形不容乐观,周末也不敢出去浪了,躲在家里“葛优...
-
3
我看过那么多所谓的教程,大部分都是教“如何使用工具”的,没有多少是教“如何制作工具”的,能教“如何仿制工具”的都已经是凤毛麟角,中国 软件行业,缺的是真正可以“制作工具”的程序员,而绝对不缺那些“使用工具”的程序员!...... ”这个业界最不需要的就是“会使...
-
5
泛微齐业成,一文告诉你如何实现全程数字化的预算管理 2022-12-28
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK