9

RocketMQ系统性学习-SpringCloud Alibaba集成RocketMQ以及消息追踪、延时消息实战

 8 months ago
source link: https://blog.51cto.com/u_16186397/9098774
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

RocketMQ系统性学习-SpringCloud Alibaba集成RocketMQ以及消息追踪、延时消息实战

精选 原创

11来了 2024-01-04 12:04:27 ©著作权

文章标签 配置文件 System 发送消息 文章分类 Java 后端开发

设置消息追踪需要修改 broker 启动的配置文件,添加一行配置:traceTopicEnable=true 即可,操作如下:

# 进入到 rocketmq 的安装目录中
# 先复制一份配置文件
cp broker.conf custom.conf
# 在自定义配置文件中添加一行配置
vi custom.conf
## 添加配置
traceTopicEnable=true
# 杀死原来的 broker 进程,再重新启动即可
# 先查看原来 broker 进程 id
jps 
# 杀死 broker
kill -9 [进程id]
# 重新启动 broker,并指定配置文件
nohup sh bin/mqbroker -c conf/custom.conf ‐n localhost:9876 & autoCreateTopicEnable=true

在发送消息的时候,指定消息的 keys 就可以在 DashBoard 中观看到消息的追踪记录了

public class GlobalProducer {

    public static void main(String[] args) throws Exception {
        // true 即设置允许消息追踪
        DefaultMQProducer producer = new DefaultMQProducer(
                "producer_group",
                true);
        producer.setNamesrvAddr("127.0.0.1:9876");
        producer.start();

        for (int i = 0; i < 12; i++) {
            Message msg = new Message(
                    "Global-Orderly-Topic",
                    "Global_Orderly_Tag",
                    ("( " + i + " )message from GlobalProducer").getBytes());
            // 设置消息的 keys
            msg.setKeys("Global_Orderly_Tag");
            producer.send(msg);
        }
        System.out.println("Send Finished.");
    }
}
RocketMQ系统性学习-SpringCloud Alibaba集成RocketMQ以及消息追踪、延时消息实战_发送消息

之后就可以在 DashBoard 中查看消息的追踪记录了:

RocketMQ系统性学习-SpringCloud Alibaba集成RocketMQ以及消息追踪、延时消息实战_配置文件_02

点击进去,查看消息追踪详细信息如下:

RocketMQ系统性学习-SpringCloud Alibaba集成RocketMQ以及消息追踪、延时消息实战_System_03

延时消息实战

上边的案例使用了 SpringCloudStream 的 API 进行消息的收发,这里使用原生 API 进行消息收发实战,通过设置消息的延时时间,可以让消息等待指定时间之后再发送

5.x 之前,只能设置固定时间的延时消息

5.x 之后,可以自定义任意时间的延时消息

由于这里引入的 SpringCloudAlibaba 整合的 RocketMQ 是 4.9.4 版本的,因此只能设置固定时间的延时消息

延时时间有以下几种,通过 Leven 进行定位,如果 delayTimeLevel = 2,就是第二个延时时间 5s

1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h

消费者代码如下:

public class Consumer {

    public static void main(String[] args) throws Exception {
        // 1、创建消费者对象
        DefaultMQPushConsumer consumer = new DefaultMQPushConsumer("delay_group");

        // 2、为消费者对象设置 NameServer 地址
        consumer.setNamesrvAddr("127.0.0.1:9876");

        // 3、订阅主题
        consumer.subscribe("custom-delay-topic", "*");

        // 4、注册监听消息,并打印消息
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                for (MessageExt msg : msgs) {
                    String printMsg = new String(msg.getBody()) + ", recvTime: "
                            + new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date());
                    System.out.println(printMsg);
                }

                return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
            }
        });

        // 5、把消费者直接启动起来
        consumer.start();
    }
}

生产者代码如下:

public class Producer {

    public static void main(String[] args) throws Exception {
        // 1、创建生产者对象
        DefaultMQProducer producer = new DefaultMQProducer("producer_group");

        // 2、为生产者对象设置 NameServer 地址
        producer.setNamesrvAddr("127.0.0.1:9876");

        // 3、把我们的生产者直接启动起来
        producer.start();

        // 4、创建消息、并发送消息
        for (int i = 0; i < 3; i++) {
            // public Message(String topic, String tags, String keys, byte[] body) {
            Message message = new Message(
                    "custom-delay-topic",
                    "delayTag",
                    "CUSTOM_DELAY",
                    ("("+i+")Hello Message From Delay Producer, " +
                            "date="+new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS").format(new Date())).getBytes()
            );
            // 设置定时的逻辑
            // "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
            message.setDelayTimeLevel(2);
            // 利用生产者对象,将消息直接发送出去
            producer.send(message);
        }
        System.out.println("Send Finished.");
    }
}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK