10

MQ系列5:RocketMQ消息的发送模式 - Hello-Brand

 2 years ago
source link: https://www.cnblogs.com/wzh2010/p/16629876.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

MQ系列1:消息中间件执行原理
MQ系列2:消息中间件的技术选型
MQ系列3:RocketMQ 架构分析
MQ系列4:NameServer 原理解析

在之前的篇章中,我们学习了RocketMQ的原理,以及RocketMQ中 命名服务 ServiceName 的运行流程,本篇从消息的生产、消费来理解一条消息的生命周期。

1 消息生产

在RocketMQ中,消息生产指的是 消息生产者往消息队列中写入数据的过程。因为业务场景的复杂性,RocketMQ架构设计了多种不同的发送策略。下面先讨论几种常见的场景:
-** 同步发送:** 整个过程业务是阻塞等待的,消息发送之后等待 Broker 响应,得到响应结果之后再传递给业务线程。

  • 异步发送: 调用RocketMQ 的 Async API,消息生产者只要把消息发送任务放进线程池就返回给业务线程。所有的逻辑处理、IO操作、网络请求 都由线程池处理,处理完成之后,调用业务程序定义好的回调函数来告知业务最终的结果。
  • OneWay(单向)发送: 只负责触发对消息的发送,发送出即完成任务,不需要对发送的状态、结果负责。
  • 延迟发送: 指定延迟的时间,在延迟时间到达之后再进行消息的发送。
  • 批量发送: 对于同类型、同特征的消息,可以聚合进行批量发送,减少MQ的连接发送次数,能够显著提升性能。
    以下是生产者实例化启动的过程:
    image

1.1 消息发送步骤

一般情况下,我们发送消息,会使用默认的DefaultMQProducer类,经过以下几个步骤实现:

  • 创建消息生产者Producer,并设置Producer的GroupName(生产组)。
  • 设置InstanceName(实例名称),当你的业务需要启用多个Producer的时候,使用不同的InstanceName来区分。
  • 设置NameServer地址,这样Producer才能从NameServer中得到路由信息
  • 完成其他的初始化配置,比如配置异常重试次数(降低消息丢失的可能性),通信模块初始化等。
  • 组装消息对象,指定主题Topic、Tag和消息体Message 等信息。
  • 通过NameServer获取到的Broker路由地址,将消息发送。

1.2 消息发生返回状态

消息发送之后,会相应的拿到回执。返回对象中的状态(SendResult.SendStatus)有4种,如下:

  • FLUSH_DISK_TIMEOUT 刷盘超时
    如果将Broker的刷盘策略设置成SYNC_FLUSH,那么没有在规定的时间完成刷盘则会报该错误。
  • FLUSH_SLAVE_TIMEOUT 主从同步超时
    主从模式下(也可以叫主备),Broker配置为SYNC_MASTER模式,如果没有在设定时间内完成主从同步,则会报该错误。
  • SLAVE_NOT_AVAILABLE 未找到Slave Broker
    主从模式下,且Broker配置为SYNC_MASTER,如果未找到Slave的Broker,则会报该错误。
  • SEND_OK
    表示发送成功。

1.3 发送同步消息

实时同步消息是一种对可靠性、实时性要求比较高的场景,使用的也比较广泛,比如:

  • 重要的消息通知,比如验证码,不能超过太长时间推送,那样可能失效
  • 消费记录确认
  • 数据实时处理和推送 等等
public class SyncProducerApplication {
    public static void main(String[] args) throws Exception {
        // 1、创建生产者producer,并指定生产者组名为 testSyncGroup
        DefaultMQProducer producer = new DefaultMQProducer("testSyncGroup");
        // 2、指定NameServer的地址,以获取Broker路由地址
        producer.setNamesrvAddr("192.168.139.1:9876");
        // 3、启动producer
        producer.start();
        // 4、创建消息,并指定Topic,Tag和消息体
        Message msg = new Message("testTopic","sync", "测试同步消息".getBytes("UTF-8"));
        // 5、发送消息到一个Broker
        SendResult sendResult = producer.send(msg);
        // 6、通过sendResult返回消息是否成功送达
        System.out.printf("%s%n", sendResult);
        // 7、如果不再发送消息,关闭生产者Producer
        producer.shutdown();
    }
}
image

1.4 发送异步消息

我们知道,异步主要用于那些对实时响应不敏感的业务,可以容忍一定时间的等待,只要能达到最终一致性即可。
有时候为了在流量高峰期进行削峰和分流,缓解压力,我们经常采用异步消息的发送模式。这种业务场景也很常见,比如:

  • 消费信息的推送,可能在你买单之后的几分钟才送达
  • 数据统计、文件打包下载等需要长耗时的任务
public class AsyncProducerApplication {
    public static void main(String[] args) throws Exception {
        // 1、创建生产者producer,并指定生产者组名为 testAsyncGroup
        DefaultMQProducer producer = new DefaultMQProducer("testAsyncGroup");
        // 2、指定NameServer的地址,以获取Broker路由地址
        producer.setNamesrvAddr("192.168.139.1:9876");
        // 3、启动producer
        producer.start();
        // 4、创建消息,并指定Topic,Tag和消息体
        Message msg = new Message("testTopic","async", "测试异步消息".getBytes("UTF-8"));
        // 5、发送消息到一个Broker
        SendResult sendResult = producer.send(msg);
        // 6. 发送异步消息,SendCallback是处理异步回调的方法
        producer.send(msg, new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {  // 成功回调
                System.out.println("success: " + sendResult);
            }
            @Override
            public void onException(Throwable throwable) {  // 失败回调
                System.out.println("fail: " + throwable);
            }
        });
        // 7、如果不再发送消息,关闭生产者Producer
        producer.shutdown();
    }
}
image

1.5 单向发送消息

OneWay的模式主要用在Care发送结果的场景,只要消息发送出去即完成任务,不需要对发送的状态、结果负责。常见的使用场景如

  • 普通日志记录
  • 非核心的埋点上报等
public class OneWayProducerApplication {
    public static void main(String[] args) throws Exception {
        // 1、创建生产者producer,并指定生产者组名为 testOneWayGroup
        DefaultMQProducer producer = new DefaultMQProducer("testOneWayGroup");
        // 2、指定NameServer的地址,以获取Broker路由地址
        producer.setNamesrvAddr("192.168.139.1:9876");
        // 3、启动producer
        producer.start();
        // 4、创建消息,并指定Topic,Tag和消息体
        Message msg = new Message("testTopic","oneway", "测试单向发送消息".getBytes("UTF-8"));
        // 5、发送消息到一个Broker
        producer.sendOneway(msg);
        // 6、如果不再发送消息,关闭生产者Producer
        producer.shutdown();
    }
}
image

1.6 发送延时消息

指定延迟的时间,在延迟时间到达之后再进行消息的发送。这种的使用场景也很多:

  • 比如火车票订购,提交了一个订单就把车票给占位了,这时候可以发送一个延时确认的消息,15m 未付款,就要把该车票释放,让其他人去购买。
  • 还比如购买了电影票,可以发送一个核销信息,在电影开场前15分钟就无法退票了。

1.6.1 延时时间的使用限制

延时时间并不是随意指定的,Rocket源码中指定了18种等级,分别代表不同的时间时长,如下:

// org/apache/rocketmq/store/config/MessageStoreConfig.java 的第198行
private String messageDelayLevel = "1s 5s 10s 30s 1m 2m 3m 4m 5m 6m 7m 8m 9m 10m 20m 30m 1h 2h";
  • RocketMq不支持任意时间延时,需设置固定的延时等级,从1s到2h分别对应着等级1到18
  • 可以使用setDelayTimeLevel(int level) 方法设置延时等级,level 从 0 开始

1.6.2 发送延时消息具体实现

通过下面的代码,可以得到的结果是消费的时间点比信息记录的时间点延迟了1分钟,这是因为我们在send的时候做了delay。

public class DelayProducerApplication {
    public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException , UnsupportedEncodingException {
        // 1、创建生产者producer,并指定生产者组名为 testDelayGroup
        DefaultMQProducer producer = new DefaultMQProducer("testDelayGroup");
        // 2、指定NameServer的地址,以获取Broker路由地址
        producer.setNamesrvAddr("192.168.139.1:9876");
        // 3、启动producer
        producer.start();
        // 4、创建消息,并指定Topic,Tag和消息体
        Message msg = new Message("testTopic","delay", "测试延迟发送消息".getBytes("UTF-8"));
        // 5、设置延时等级4,对应1m,所以这个消息在一分钟之后发送
        msg.setDelayTimeLevel(4);
        // 6、发送消息到一个Broker
        SendResult sendResult = producer.send(msg);
        // 7、通过sendResult返回消息是否成功送达
        System.out.printf("%s%n", sendResult);
        // 8、如果不再发送消息,关闭生产者Producer
        producer.shutdown();
    }
}
image

1.7 发送批量消息

  • 对于同类型、同特征的消息,可以聚合进行批量发送,减少MQ的连接发送次数,能够显著提升性能。
  • 批量发送消息须有相同的topic,相同的waitStoreMsgOK,且不能是延时消息。

waitStoreMsgOK: 消息发送时是否等消息存储完成后再返回。

  • 一批次的消息总大小不应超过4MB。
public class BatchProducerApplication {
    public static void main(String[] args) throws InterruptedException, RemotingException, MQClientException, MQBrokerException, UnsupportedEncodingException {
        // 1、创建生产者producer,并指定生产者组名为 testBatchGroup
        DefaultMQProducer producer = new DefaultMQProducer("testBatchGroup");
        // 2、指定NameServer的地址,以获取Broker路由地址
        producer.setNamesrvAddr("192.168.139.1:9876");
        // 3、启动producer
        producer.start();
        // 4、创建消息列表,并指定Topic,Tag和消息体
        List<Message> messages = new ArrayList<>();
        String topic = "testTopic";
        messages.add(new Message(topic, "batch", "测试批量发送消息 0".getBytes("UTF-8")));
        messages.add(new Message(topic, "batch", "测试批量发送消息 1".getBytes("UTF-8")));
        messages.add(new Message(topic, "batch", "测试批量发送消息 2".getBytes("UTF-8")));

        // 5、发送消息到一个Broker
        SendResult sendResult = producer.send(messages);
        // 6、通过sendResult返回消息是否成功送达
        System.out.printf("%s%n", sendResult);
        // 7、如果不再发送消息,关闭生产者Producer
        producer.shutdown();
    }
}

1.8 如何提升消息生产的性能

消息的发送一般是经过 client发送、Broker服务器接收并处理、Broker服务器返回应答 三个步骤。
如果我们想要提高消息生产的效率,一般有如下方法:

  • Oneway方式发送
    Oneway方式发送用在一些性能要求高,可靠性要求低的场景下,比如日志采集,非核心的埋点上报等。Oneway方式发送请求无需应答,即将数据写入客户端的Socket缓冲区就返回,不等待结果的返回。
    所以这种模式是极快的,可以把发送消息时长缩短至微秒级。
  • 增加Producer的并发量,使用多个Producer同时发送
    RocketMQ引入了一个并发窗口,在窗口内消息可以并发地写入DirectMem中,然后异步地将连续数据写入文件系统。
    顺序执行CommitLog让RocketMQ可以保持较高的写入性能。
  • 恰当的批量发送
    对于同类型、同特征的消息,可以聚合进行批量发送,减少MQ的连接发送次数,能够显著提升性能。批量发送消息须有相同的topic,相同的waitStoreMsgOK,且不能是延时消息。
    对于消息体的大小也要注意不能超过4MB。

根据阿里内部调优后的性能测试报告,消息的写入性能达到90万+的TPS,我们可以朝着这个指标进行优化。

本篇介绍了RocketMQ 消息生产与发送的几种模式:

  • 同步发送:整个过程业务是阻塞等待的,消息发送之后等待 Broker 响应,得到响应结果之后再传递给业务线程。
  • 异步发送:调用RocketMQ 的 Async API,消息生产者只要把消息发送任务放进线程池就返回给业务线程。所有的逻辑处理、IO操作、网络请求 都由线程池处理,处理完成之后,调用业务程序定义好的回调函数来告知业务最终的结果。
  • OneWay(单向)发送:只负责触发对消息的发送,发送出即完成任务,不需要对发送的状态、结果负责。
  • 延迟发送:指定延迟的时间,在延迟时间到达之后再进行消息的发送。
  • 批量发送:对于同类型、同特征的消息,可以聚合进行批量发送,减少MQ的连接发送次数,能够显著提升性能。
    可以根据实际的业务场景选择适当的发送模式。

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK