使用Java客户端发送消息和消费的应用 - 阿里云云起实验室
source link: https://www.cnblogs.com/bainana/p/16481746.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
使用Java客户端发送消息和消费的应用
体验链接:https://developer.aliyun.com/adc/scenario/fb1b72ee956a4068a95228066c3a40d6
本教程将Demo演示使用java客户端发送消息和消费的应用场景
第1节 如何发送和消费并发消息
并发消息,也叫普通消息,是相对顺序消息而言的,普通消息的效率最高。本教程将简单演示如何使用纯java client发送和消费消息。
1. 下载java代码demo(已下载则忽略操作)
cd /data/demos
git clone https://github.com/ApacheRocketMQ/06-all-java-demos.git
2. 打包,执行代码demo
再执行命令, 可以看到正常生产和消费输出
// 进入demo代码目录
cd /data/demos/06-all-java-demos/
// 打包
mvn clean package
// 运行代码
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.ConcurrentMessageDemo" -Dexec.classpathScope=runtime
3. Demo代码说明
Demo代码可以查看github。并发消息,意思是生产者可以并发的向topic中发送消息, 消费端不区分顺序的消息,这种模式效率最好。生产者demo代码如下:
最后留一个思考题给大家: 生产者实例和消费者实例, 都是线程安全的吗?
第2节 如何发送和消费顺序消息
顺序消息分为分区有序和全局有序。生产消费代码都是一样的, 区别在于分区有序的topic中queue个数可以是任意有效值,全局有序的topic要求queue的个数为1。顺序消息的实现非常简单易懂,但牺牲了可用性,单节点故障会直接影响顺序消息。
什么是分区有序消息,什么场景应该使用呢,又该如何发送分区有序消息?分区有序表示在一个queue中的消息是有序的,发送消息时设置设置了相同key的消息会被发送到同一个queue中。
本教程将简单演示如何使用纯java client发送和消费顺序消息。
1. 下载java代码demo(已下载则忽略操作)
cd /data/demos
git clone https://github.com/ApacheRocketMQ/06-all-java-demos.git
2. 打包,执行代码demo
再执行命令, 可以看到正常生产和消费输出。 消费输出注意看相同queue id的消息输出内容中的数字,按照从小到大就是正确的。
// 进入demo代码目录
cd /data/demos/06-all-java-demos/
// 打包
mvn clean package
// 运行代码
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.OrderMessageDemo1" -Dexec.classpathScope=runtime
3. Demo代码说明
Demo代码可以查看github。
- 生产者说明
生产者会根据设置的keys做hash,相同hash值的消息会发送到相同的queue中。所以相同hash值的消息需要保证在同一个线程中顺序的发送。
- 消费者说明
消费者使用相对比较简单, 消息监听类实现org.apache.rocketmq.client.consumer.listener.MessageListenerOrderly接口即可。相同queue的消息需要串行处理,这样救保证消费的顺序性
第3节 如何发送和消费延迟消息
延迟消息,对于一些特殊场景比如订票后30分钟不支付自动取消等类似场景比较有用。本教程将简单演示如何使用纯java client发送和消费延迟消息。
1. 下载java代码demo(已下载则忽略操作)
cd /data/demos
git clone https://github.com/ApacheRocketMQ/06-all-java-demos.git
2. 打包,执行代码demo
执行命令, 可以看到正常生产和消费输出。 目前RocketMQ支持多种延迟级别, 不过每种延迟级别都是基于RocketMQ自身,实际延迟时间会加上Broker-Client端的网络情况不同而略有差异。
// 进入demo代码目录
cd /data/demos/06-all-java-demos/
// 打包
mvn clean package
// 运行代码
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.DelayMessageDemo" -Dexec.classpathScope=runtime
3. Demo代码说明
Demo代码可以查看github。
- 生产者说明
生产者在发送消息的时候需要设置延迟级别,RocketMQ支持多种延迟级别。如果把延迟时间算作一个以空格分割的数组,延迟级别就是延迟时间数组的下标index+1。RocketMQ如何解析延迟级别和延迟时间映射关系。
- 消费者说明: 消费者按照并发消息消费即可
第4节 如何发送和消费事务消息
事务消息,是RocketMQ解决分布式事务的一种实现,极其简单好用。一个事物消息大致的生命周期如下图
概括为如下几个重要点:
-
生产者发送half消息(事物消息)
-
Broker存储half消息
-
生产者处理本地事物,处理成功后commit事物
-
消费者消费到事物消息
本教程将简单演示如何使用纯java client发送和消费事物消息。
1. 下载java代码demo(已下载则忽略操作)
cd /data/demos
git clone https://github.com/ApacheRocketMQ/06-all-java-demos.git
2. 打包,执行代码demo
执行命令, 可以看到事物消息的全部过程。
// 进入demo代码目录
cd /data/demos/06-all-java-demos/
// 打包
mvn clean package
// 运行代码
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.TransactionMessageDemo" -Dexec.classpathScope=runtime
3. Demo代码说明
Demo代码可以查看github。在事物消息中,消费代码和普通消息的消费一样,主要代码在生产者端。
生产者端的主要代码包含3个步骤:
- 初始化生产者,设置回调线程池、设置本地事物处理监听类。
这里注意事物消息的生产者类是: org.apache.rocketmq.client.producer.TransactionMQProducer, 而不是普通生产者类。
事物监听类需要实现2个方法,这里的逻辑都是mock的,实际使用的时候需要根据实际修改。
- 发送事物消息。调用sendMessageInTransaction()方法发送事物消息, 而不是以前的send()方法。
第5节 生产者消费者如何同步发送、消费消息(Request-Reply)
request-reply模式,可以满足目前类似RPC同步调用的场景,本教程将简单演示如何使用该模式。
1. 下载java代码demo(已下载则忽略操作)
cd /data/demos
git clone https://github.com/ApacheRocketMQ/06-all-java-demos.git
2. 打包,执行代码demo
执行命令, 可以看到正常生产和消费输出。
// 进入demo代码目录
cd /data/demos/06-all-java-demos/
// 打包
mvn clean package
// 运行代码
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.RequestReplyMessageDemo" -Dexec.classpathScope=runtime
通过代码结果和代码比较, 我们得知request-reply类似RPC同步调用的效果。
个人觉得:需要同步调用就用RPC, 不要走MQ,毕竟两者是完全不同的目标的产品,专业的事情交给专业的产品。
3. Demo代码说明
Demo代码可以查看github。
request-reply模式,在生产者和消费者两端都和一般的生产消费有区别,下面分别介绍下demo代码。
生产者demo主要代码, 主要区别在于调用request(),而不是send()方法。
消费者demo主要代码: 消费代码主要增加了“回复”逻辑。回复是利用消息发送直接向生产者发送一条消息。 有点类似事物消息中broker回查生产者。
一个小问题:事物消息和request-reply消息时,生产者的生产者组名有什么要求嘛?
第6节 如何有选择性的消费消息
有时候我们只想消费部分消息, 当然全部消费,在代码中过滤。 假如消息海量时, 会有很多资源浪费,比如浪费不必要的带宽。我们可以通过tag,sql92表达式来选择性的消费。
- 进入broker目录
cd /usr/local/services/5-rocketmq/broker-01
- 编辑配置文件,修改broker配置项2个
vim conf/broker.conf
配置项值:
// 是否支持重试消息也过滤
filterSupportRetry=true
// 支持属性过滤
enablePropertyFilter=true
- 重启broker
./restart.sh
1. 下载java代码demo(已下载则忽略操作)
cd /data/demos
git clone https://github.com/ApacheRocketMQ/06-all-java-demos.git
2. 打包,执行tag过滤代码demo
执行命令, 可以看到正常生产和消费输出。
// 进入demo代码目录
cd /data/demos/06-all-java-demos/
// 打包
mvn clean package
// 运行代码
mvn exec:java -Dexec.args="127.0.0.1:39876 tag" -Dexec.mainClass="org.apache.rocketmqdemos.FliterMessageDemo" -Dexec.classpathScope=runtime
3. 执行sql过滤代码demo
执行命令, 可以看到正常生产和消费输出。
// 进入demo代码目录
cd /data/demos/06-all-java-demos/
// 打包
mvn clean package
// 运行代码
mvn exec:java -Dexec.args="127.0.0.1:39876 sql" -Dexec.mainClass="org.apache.rocketmqdemos.FliterMessageDemo" -Dexec.classpathScope=runtime
4. Demo代码说明
Demo代码可以查看github。以下分别介绍生产者和消费者主要demo代码。
在生产tag消息的时候, 消息中需要加上发送tag;sql92过滤的时候,加上自定义k-v。
tag过滤消费时,在订阅topic时, 也添加上tag订阅
SQL92过滤时,添加上SQL过滤订阅。至于SQL92除了等号,还是支持什么,大家可以自行自行查看或者到群里问。
第7节 如何使用ACL客户端生产消费消息
ACL,全称是Access Control List,是RocketMQ设计来做访问和权限控制的。更多文档参见github wiki:https://github.com/apache/rocketmq/wiki/RIP-5-RocketMQ-ACL
0. 启动一个集群
- 进入broker目录
cd /usr/local/services/5-rocketmq/broker-01
- 编辑配置文件,修改broker配置项1个
vim conf/broker.conf
配置项值:
aclEnable=true
- 重启broker
./restart.sh
1. 下载java代码demo(已下载则忽略操作)
cd /data/demos
git clone https://github.com/ApacheRocketMQ/06-all-java-demos.git
2. 打包,执行代码demo
执行命令, 可以看到正常生产和消费输出。 demo代码使用的admin权限发送和消费,实际使用需要对于每个topic,消费者组授权,才能正常生产消费。
// 进入demo代码目录
cd /data/demos/06-all-java-demos/
// 打包
mvn clean package
// 运行代码
mvn exec:java -Dexec.args="127.0.0.1:39876" -Dexec.mainClass="org.apache.rocketmqdemos.ACLDemo" -Dexec.classpathScope=runtime
3. Demo代码说明
Demo代码可以查看github。带ACL的生产者和消费者在初始化的时候,都必须给一个hook实例,构建方法如下:
static RPCHook getAclRPCHook(String accessKey, String secretKey) {
return new AclClientRPCHook(new SessionCredentials(accessKey, secretKey));
}
在broker端secret key用来校验信息的完整性, access key用来校验用户权限。二者缺一不可。
Recommend
-
117
在微信群里面,“刷屏”的行为是被谴责的。那如果小程硬是要做到“刷屏”的话,有什么办法吗?显然,靠快速打字是不现实的,用程序来实现才靠谱。本文介绍如何用程序发送消息到微信群。参考网页微信的功能,很自然的一个想法就是调用网页微信的接口来实现这个功能。那...
-
62
-
45
Swoole可以执行异步操作,因此在web页面将请求提交给Swoole处理后,不用等待返回结果,页面也不会卡顿。Swoole在后台将耗时长的操作进行异步处理,从而改善用户体验,例如本节要给大家讲解的Swoole处理邮件。
-
13
Error 521 Ray ID: 5ff1bca8de430a58 • 2020-12-09 21:14:39 UTC Web server is down ...
-
8
netty 服务端主动向客户端发送消息后, 怎么接收返回 ...
-
14
记 Microsoft 365 E5 Developer 使用 Thunderbird 客户端发送邮件的问题January 16, 2021解决方法看上去很简单。 推荐友人使用 E5 Developer 之后,友人绑定了自己的域名上去——然后想使用 Thunderbird 客户端发信。但客户端永远会提醒
-
4
你好呀~欢迎来到“安全头条”!如果你是第一次光顾,可以先阅读站内公告了解我们哦。 【安全头条公告】 安全头条主要发布时下最火热最新鲜的网安资讯,不同于正儿八经的权威消息。本站的资讯内容可能会更富趣味性、讨论性,工作...
-
8
消息队列应用场景&&ActiveMQ消息发送失败的处理方案 今天我们来介绍一下ActiveMQ消息队列消息发送失败的处理方案。 在介绍今天的内容之前,首先我们来探讨一下为什么要用MQ。 企业中系统为什么要...
-
5
socket.io使用示例–向指定客户端发消息 | Lenix Blog 安装 node 创建一个目录例如socketio 在目录下执行 npm install socket.io npm install red...
-
5
Sorry, you have been blocked You are unable to access azhuge233.com Why have I been blocked? This website is...
About Joyk
Aggregate valuable and interesting links.
Joyk means Joy of geeK