7

跟我学RocketMQ之批量消息发送源码解析

 3 years ago
source link: http://wuwenliang.net/2019/08/08/%E8%B7%9F%E6%88%91%E5%AD%A6RocketMQ%E4%B9%8B%E6%89%B9%E9%87%8F%E6%B6%88%E6%81%AF%E5%8F%91%E9%80%81%E6%BA%90%E7%A0%81%E8%A7%A3%E6%9E%90/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

上篇文章 跟我学RocketMQ之消息发送源码解析 中,我们已经对普通消息的发送流程进行了详细的解释,但是由于篇幅问题没有展开讲解批量消息的发送。本文中,我们就一起来集中分析一下批量消息的发送是怎样的逻辑。

DefaultProducer.send

RocketMQ提供了批量发送消息的API,同样在DefaultProducer.java中

@Override
public SendResult send(
    Collection<Message> msgs) throws MQClientException, RemotingException, 
        MQBrokerException, InterruptedException {
    return this.defaultMQProducerImpl.send(batch(msgs));
}

它的参数为Message集合,也就是一批消息。它的另外一个重载方法提供了发送超时时间参数

@Override
public SendResult send(Collection<Message> msgs,
    long timeout) throws MQClientException, RemotingException,
         MQBrokerException, InterruptedException {
    return this.defaultMQProducerImpl.send(batch(msgs), timeout);
}

可以看到是将消息通过batch()方法打包为单条消息,我们看一下batch方法的逻辑

DefaultProducer.batch

private MessageBatch batch(Collection<Message> msgs) throws MQClientException {

    // 声明批量消息体
    MessageBatch msgBatch;
    try {

        // 从Message的list生成批量消息体MessageBatch
        msgBatch = MessageBatch.generateFromList(msgs);
        for (Message message : msgBatch) {
            Validators.checkMessage(message, this);
            MessageClientIDSetter.setUniqID(message);
            message.setTopic(withNamespace(message.getTopic()));
        }
        // 设置消息体,此时的消息体已经是处理过后的批量消息体
        msgBatch.setBody(msgBatch.encode());
    } catch (Exception e) {
        throw new MQClientException("Failed to initiate the MessageBatch", e);
    }
    // 设置topic
    msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
    return msgBatch;
}

从代码可以看到,核心思想是将一批消息(Collection msgs)打包为MessageBatch对象,我们看下MessageBatch的声明

public class MessageBatch extends Message implements Iterable<Message> {

    private final List<Message> messages;

    private MessageBatch(List<Message> messages) {
        this.messages = messages;
    }

可以看到MessageBatch继承自Message,持有List引用。

我们接着看一下generateFromList方法

MessageBatch.generateFromList

public static MessageBatch generateFromList(Collection<Message> messages) {
    assert messages != null;
    assert messages.size() > 0;

    // 首先实例化一个Message的list
    List<Message> messageList = new ArrayList<Message>(messages.size());
    Message first = null;

    // 对messages集合进行遍历
    for (Message message : messages) {

        // 判断延时级别,如果大于0抛出异常,原因为:批量消息发送不支持延时
        if (message.getDelayTimeLevel() > 0) {
            throw new UnsupportedOperationException
                ("TimeDelayLevel in not supported for batching");
        }

        // 判断topic是否以 **"%RETRY%"** 开头,如果是,
        // 则抛出异常,原因为:批量发送消息不支持消息重试
        if (message.getTopic().startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX)) {
            throw new UnsupportedOperationException("Retry Group is not supported for batching");
        }

        // 判断集合中的每个Message的topic与批量发送topic是否一致,
        // 如果不一致则抛出异常,原因为:
        // 批量消息中的每个消息实体的Topic要和批量消息整体的topic保持一致。
        if (first == null) {
            first = message;
        } else {
            if (!first.getTopic().equals(message.getTopic())) {
                throw new UnsupportedOperationException("The topic of the messages in one batch should be the same");
            }

            // 判断批量消息的首个Message与其他的每个Message实体的等待消息存储状态是否相同,
            // 如果不同则报错,原因为:批量消息中每个消息的waitStoreMsgOK状态均应该相同。
            if (first.isWaitStoreMsgOK() != message.isWaitStoreMsgOK()) {
                throw new UnsupportedOperationException("The waitStoreMsgOK of the messages in one batch should the same");
            }
        }

        // 校验通过后,将message实体添加到messageList中
        messageList.add(message);
    }

    // 将处理完成的messageList作为构造方法,
    // 初始化MessageBatch实体,并设置topic以及isWaitStoreMsgOK状态。
    MessageBatch messageBatch = new MessageBatch(messageList);

    messageBatch.setTopic(first.getTopic());
    messageBatch.setWaitStoreMsgOK(first.isWaitStoreMsgOK());
    return messageBatch;
}

总结一下,generateFromList方法对调用方设置的Collection集合进行遍历,经过前置校验之后,转换为MessageBatch对象并返回给DefaultProducer.batch方法中,我们接着看DefaultProducer.batch的逻辑。

到此,通过MessageBatch.generateFromList方法,将发送端传入的一批消息集合转换为了MessageBatch实体。

DefaultProducer.batch

private MessageBatch batch(Collection<Message> msgs) throws MQClientException {

    // 声明批量消息体
    MessageBatch msgBatch;
    try {
        // 从Message的list生成批量消息体MessageBatch
        msgBatch = MessageBatch.generateFromList(msgs);
        for (Message message : msgBatch) {
            Validators.checkMessage(message, this);
            MessageClientIDSetter.setUniqID(message);
            message.setTopic(withNamespace(message.getTopic()));
        }
        // 设置消息体,此时的消息体已经是处理过后的批量消息体
        msgBatch.setBody(msgBatch.encode());
    } catch (Exception e) {
        throw new MQClientException("Failed to initiate the MessageBatch", e);
    }
    // 设置topic
    msgBatch.setTopic(withNamespace(msgBatch.getTopic()));
    return msgBatch;
}

注意下面这行代码:

// 设置消息体,此时的消息体已经是处理过后的批量消息体
msgBatch.setBody(msgBatch.encode());

这里对MessageBatch进行消息编码处理,通过调用MessageBatch的encode方法实现,代码逻辑如下:

public byte[] encode() {
    return MessageDecoder.encodeMessages(messages);
}

可以看到是通过静态方法 encodeMessages(List messages) 实现的。

我们看一下encodeMessages方法的逻辑:

public static byte[] encodeMessages(List<Message> messages) {
    //TO DO refactor, accumulate in one buffer, avoid copies
    List<byte[]> encodedMessages = new ArrayList<byte[]>(messages.size());
    int allSize = 0;
    for (Message message : messages) {

        // 遍历messages集合,分别对每个Message实体进行编码操作,转换为byte[]
        byte[] tmp = encodeMessage(message);
        // 将转换后的单个Message的byte[]设置到encodedMessages中
        encodedMessages.add(tmp);
        // 批量消息的二进制数据长度随实际消息体递增
        allSize += tmp.length;
    }
    byte[] allBytes = new byte[allSize];
    int pos = 0;
    for (byte[] bytes : encodedMessages) {
        // 遍历encodedMessages,按序复制每个Message的二进制格式消息体
        System.arraycopy(bytes, 0, allBytes, pos, bytes.length);
        pos += bytes.length;
    }
    // 返回批量消息整体的消息体二进制数组
    return allBytes;
}

encodeMessages的逻辑在注释中分析的已经比较清楚了,其实就是遍历messages,并按序拼接每个Message实体的二进制数组格式消息体并返回。

我们可以继续看一下单个Message是如何进行编码的,调用了 MessageDecoder.encodeMessage(message) 方法,逻辑如下:

public static byte[] encodeMessage(Message message) {
    //only need flag, body, properties
    byte[] body = message.getBody();
    int bodyLen = body.length;
    String properties = messageProperties2String(message.getProperties());
    byte[] propertiesBytes = properties.getBytes(CHARSET_UTF8);
    //note properties length must not more than Short.MAX
    short propertiesLength = (short) propertiesBytes.length;
    int sysFlag = message.getFlag();
    int storeSize = 4 // 1 TOTALSIZE
        + 4 // 2 MAGICCOD
        + 4 // 3 BODYCRC
        + 4 // 4 FLAG
        + 4 + bodyLen // 4 BODY
        + 2 + propertiesLength;
    ByteBuffer byteBuffer = ByteBuffer.allocate(storeSize);
    // 1 TOTALSIZE
    byteBuffer.putInt(storeSize);

    // 2 MAGICCODE
    byteBuffer.putInt(0);

    // 3 BODYCRC
    byteBuffer.putInt(0);

    // 4 FLAG
    int flag = message.getFlag();
    byteBuffer.putInt(flag);

    // 5 BODY
    byteBuffer.putInt(bodyLen);
    byteBuffer.put(body);

    // 6 properties
    byteBuffer.putShort(propertiesLength);
    byteBuffer.put(propertiesBytes);

    return byteBuffer.array();
}

这里其实就是将消息按照RocektMQ的消息协议进行编码,格式为:

消息总长度          ---  4字节
魔数                --- 4字节
bodyCRC校验码       --- 4字节
flag标识            --- 4字节
body长度            --- 4字节
消息体              --- 消息体实际长度N字节
属性长度            --- 2字节
扩展属性            --- N字节

通过encodeMessage方法处理之后,消息便会被编码为固定格式,最终会被Broker端进行处理并持久化。

到此便是批量消息发送的源码分析,实际上RocketMQ在处理批量消息的时候是将其解析为单个消息再发送的,这样就在底层统一了单条消息、批量消息发送的逻辑,让整个框架的设计更加健壮,也便于我们进行理解学习。

后续的发送流程这里就不再重复展开了,感兴趣的同学可以移步我们的上一篇文章查看

跟我学RocketMQ之消息发送源码解析

批量消息的源码分析就暂时告一段落,更多的源码分析随后奉上,感谢您的阅读。



版权声明:

原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。



About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK