6

【RocketMQ】MQ消息发送 - shanml

 2 years ago
source link: https://www.cnblogs.com/shanml/p/16387192.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

【RocketMQ】MQ消息发送

首先来看一个RcoketMQ发送消息的例子:

@Service
public class MQService {

    @Autowired
    DefaultMQProducer defaultMQProducer;

    public void sendMsg() {
        String msg = "我是一条消息";
        // 创建消息,指定TOPIC、TAG和消息内容
        Message sendMsg = new Message("TestTopic", "TestTag", msg.getBytes());
        SendResult sendResult = null;
        try {
            // 同步发送消息
            sendResult = defaultMQProducer.send(sendMsg);
            System.out.println("消息发送响应:" + sendResult.toString());
        } catch (MQClientException e) {
            e.printStackTrace();
        } catch (RemotingException e) {
            e.printStackTrace();
        } catch (MQBrokerException e) {
            e.printStackTrace();
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
    }
}

RocketMQ是通过DefaultMQProducer进行消息发送的,它实现了MQProducer接口,MQProducer接口中定义了消息发送的方法,方法主要分为三大类:

  1. 同步进行消息发送,向Broker发送消息之后等待响应结果
  2. 异步进行消息发送,向Broker发送消息之后立刻返回,当消息发送完毕之后触发回调函数
  3. sendOneway单向发送,也是异步消息发送,向Broker发送消息之后立刻返回,但是没有回调函数
public interface MQProducer extends MQAdmin {

    // 同步发送消息
    SendResult send(final Message msg) throws MQClientException, RemotingException, MQBrokerException,
        InterruptedException;
        
    // 异步发送消息,SendCallback为回调函数
    void send(final Message msg, final SendCallback sendCallback) throws MQClientException,
        RemotingException, InterruptedException;
    
    // 异步发送消息,没有回调函数
    void sendOneway(final Message msg) throws MQClientException, RemotingException,
        InterruptedException;
    
    // 省略其他方法
}

接下来以将以同步消息发送为例来分析消息发送的流程。

DefaultMQProducer里面有一个DefaultMQProducerImpl类型的成员变量defaultMQProducerImpl,从默认的无参构造函数中可以看出在构造函数中对defaultMQProducerImpl进行了实例化,在send方法中就是调用defaultMQProducerImpl的方法进行消息发送的:

public class DefaultMQProducer extends ClientConfig implements MQProducer {

    /**
     * 默认消息生产者实现类
     */
    protected final transient DefaultMQProducerImpl defaultMQProducerImpl;
    
    /**
     * 默认的构造函数
     */
    public DefaultMQProducer() {
        this(null, MixAll.DEFAULT_PRODUCER_GROUP, null);
    }
    /**
     * 构造函数
     */
    public DefaultMQProducer(final String namespace, final String producerGroup, RPCHook rpcHook) {
        this.namespace = namespace;
        this.producerGroup = producerGroup;
        // 实例化
        defaultMQProducerImpl = new DefaultMQProducerImpl(this, rpcHook);
    }
  
    /**
     * 同步发送消息
     */
    @Override
    public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 设置主题
        msg.setTopic(withNamespace(msg.getTopic()));
        // 发送消息
        return this.defaultMQProducerImpl.send(msg);
    }
}

DefaultMQProducerImpl中消息的发送在sendDefaultImpl方法中实现,处理逻辑如下:

  1. 根据设置的主题查找对应的路由信息TopicPublishInfo
  2. 获取失败重试次数,在消息发送失败时进行重试
  3. 获取上一次选择的消息队列所在的Broker,如果上次选择的Broker为空则为NULL,然后调用selectOneMessageQueue方法选择一个消息队列,并记录本次选择的消息队列,在下一次发送消息时选择队列时使用
  4. 计算选择消息队列的耗时,如果大于超时时间,终止本次发送
  5. 调用sendKernelImpl方法进行消息发送
  6. 调用updateFaultItem记录向Broker发送消息的耗时,在开启故障延迟处理机制时使用
public class DefaultMQProducerImpl implements MQProducerInner {
    /**
     * DEFAULT SYNC -------------------------------------------------------
     */
    public SendResult send(
        Message msg) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 发送消息
        return send(msg, this.defaultMQProducer.getSendMsgTimeout());
    }
    
    public SendResult send(Message msg,
        long timeout) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        // 发送消息
        return this.sendDefaultImpl(msg, CommunicationMode.SYNC, null, timeout);
    }
    
    /**
     * 发送消息
     * @param msg 发送的消息
     * @param communicationMode
     * @param sendCallback 回调函数
     * @param timeout 超时时间
     */
    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
        this.makeSureStateOK();
        Validators.checkMessage(msg, this.defaultMQProducer);
        final long invokeID = random.nextLong();
        // 开始时间
        long beginTimestampFirst = System.currentTimeMillis();
        long beginTimestampPrev = beginTimestampFirst;
        long endTimestamp = beginTimestampFirst;
        //  查找主题路由信息
        TopicPublishInfo topicPublishInfo = this.tryToFindTopicPublishInfo(msg.getTopic());
        if (topicPublishInfo != null && topicPublishInfo.ok()) {
            boolean callTimeout = false;
            // 消息队列
            MessageQueue mq = null;
            Exception exception = null;
            SendResult sendResult = null;
            // 获取失败重试次数
            int timesTotal = communicationMode == CommunicationMode.SYNC ? 1 + this.defaultMQProducer.getRetryTimesWhenSendFailed() : 1;
            int times = 0;
            String[] brokersSent = new String[timesTotal];
            for (; times < timesTotal; times++) {
                // 获取BrokerName
                String lastBrokerName = null == mq ? null : mq.getBrokerName();
                // 根据BrokerName选择一个消息队列
                MessageQueue mqSelected = this.selectOneMessageQueue(topicPublishInfo, lastBrokerName);
                if (mqSelected != null) {
                    // 记录本次选择的消息队列
                    mq = mqSelected;
                    brokersSent[times] = mq.getBrokerName();
                    try {
                        // 记录时间
                        beginTimestampPrev = System.currentTimeMillis();
                        if (times > 0) {
                            //Reset topic with namespace during resend.
                            msg.setTopic(this.defaultMQProducer.withNamespace(msg.getTopic()));
                        }
                        // 计算选择消息队列的耗时时间
                        long costTime = beginTimestampPrev - beginTimestampFirst;
                        // 如果已经超时,终止发送
                        if (timeout < costTime) {
                            callTimeout = true;
                            break;
                        }
                        // 发送消息
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        // 结束时间
                        endTimestamp = System.currentTimeMillis();
                        // 记录向Broker发送消息的请求耗时,消息发送结束时间 - 开始时间
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        switch (communicationMode) {
                            case ASYNC:
                                return null;
                            case ONEWAY:
                                return null;
                            case SYNC:
                                // 如果发送失败
                                if (sendResult.getSendStatus() != SendStatus.SEND_OK) {
                                    // 是否重试
                                    if (this.defaultMQProducer.isRetryAnotherBrokerWhenNotStoreOK()) {
                                        continue;
                                    }
                                }
                                // 返回结果
                                return sendResult;
                            default:
                                break;
                        }
                    } catch (RemotingException e) {
                        endTimestamp = System.currentTimeMillis();
                        // 如果抛出异常,记录请求耗时
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    }
                    // ... 省略其他异常处理
                } else {
                    break;
                }
            }

            if (sendResult != null) {
                return sendResult;
            }
            // ...
        }

        validateNameServerSetting();

        throw new MQClientException("No route info of this topic: " + msg.getTopic() + FAQUrl.suggestTodo(FAQUrl.NO_TOPIC_ROUTE_INFO),
            null).setResponseCode(ClientErrorCode.NOT_FOUND_TOPIC_EXCEPTION);
    }

}

获取路由信息

DefaultMQProducerImpl中有一个路由信息表topicPublishInfoTable,记录了主题对应的路由信息,其中KEY为topic, value为对应的路由信息对象TopicPublishInfo:

public class DefaultMQProducerImpl implements MQProducerInner {
    
    // 路由信息表,KEY为topic, value为对应的路由信息对象TopicPublishInfo
    private final ConcurrentMap<String, TopicPublishInfo> topicPublishInfoTable =
        new ConcurrentHashMap<String, TopicPublishInfo>();
}

主题路由信息

TopicPublishInfo中记录了主题所在的消息队列信息、所在Broker等信息:

messageQueueList:一个MessageQueue类型的消息队列列表,MessageQueue中记录了主题名称、主题所属的Broker名称和队列ID

sendWhichQueue:计数器,选择消息队列的时候增1,以此达到轮询的目的

topicRouteData:从NameServer查询到的主题对应的路由数据,包含了队列和Broker的相关数据

public class TopicPublishInfo {
  
    // 消息队列列表
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>(); 
  
    // 一个计数器,每次选择消息队列的时候增1,以此达到轮询的目的
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); 
    
    // 主题路由数据
    private TopicRouteData topicRouteData;
    
    // ...
}

// 消息队列
public class MessageQueue implements Comparable<MessageQueue>, Serializable {
    private static final long serialVersionUID = 6191200464116433425L;
    private String topic; // 主题
    private String brokerName; // 所属Broker名称
    private int queueId; // 队列ID
    // ...
}

// 主题路由数据
public class TopicRouteData extends RemotingSerializable {
  
    private List<QueueData> queueDatas; // 队列数据列表
    private List<BrokerData> brokerDatas; // Broker信息列表
    // ...
}

// 队列数据
public class QueueData implements Comparable<QueueData> {
    private String brokerName; // Broker名称
    private int readQueueNums; // 可读队列数量
    private int writeQueueNums; // 可写队列数量
    private int perm;
    private int topicSysFlag;
}

// Broker数据
public class BrokerData implements Comparable<BrokerData> {
    private String cluster; // 集群名称
    private String brokerName; // Broker名称
    private HashMap<Long, String> brokerAddrs; // Broker地址集合,KEY为Broker ID, value为Broker 地址
    // ...
}

查找路由信息

在查找主题路由信息的时候首先从DefaultMQProducerImpl缓存的路由表topicPublishInfoTable中根据主题查找路由信息,如果查询成功返回即可,如果未查询到,需要从NameServer中获取路由信息,如果获取失败,则使用默认的主题路由信息:

public class DefaultMQProducerImpl implements MQProducerInner {
    
    // 路由信息表,KEY为topic, value为对应的路由信息对象TopicPublishInfo
    private final ConcurrentMap<String, TopicPublishInfo> topicPublishInfoTable =
        new ConcurrentHashMap<String, TopicPublishInfo>();
    
    /**
     * 根据主题查找路由信息
     * @param topic 主题
     * @return
     */
    private TopicPublishInfo tryToFindTopicPublishInfo(final String topic) {
        // 根据主题获取对应的主题路由信息
        TopicPublishInfo topicPublishInfo = this.topicPublishInfoTable.get(topic);
        // 如果未获取到
        if (null == topicPublishInfo || !topicPublishInfo.ok()) {
            this.topicPublishInfoTable.putIfAbsent(topic, new TopicPublishInfo());
            // 从NameServer中查询路由信息
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
        }
        // 如果路由信息获取成功
        if (topicPublishInfo.isHaveTopicRouterInfo() || topicPublishInfo.ok()) {
            // 返回路由信息
            return topicPublishInfo;
        } else {
            // 如果路由信息未获取成功,使用默认主题查询路由信息
            this.mQClientFactory.updateTopicRouteInfoFromNameServer(topic, true, this.defaultMQProducer);
            topicPublishInfo = this.topicPublishInfoTable.get(topic);
            // 返回路由信息
            return topicPublishInfo;
        }
    }
}

从NameServer获取主题路由信息

从NameServer获取主题路由信息数据是在MQClientInstance中的updateTopicRouteInfoFromNameServer方法中实现的:

  1. 判断是否使用默认的主题路由信息,如果是则获取默认的路由信息
  2. 如果不使用默认的路由信息,则从NameServer根据Topic查询取路由信息
  3. 获取到的主题路由信息被封装为TopicRouteData类型的对象返回
  4. topicRouteTable主题路由表中根据主题获取旧的路由信息,与新的对比,判断信息是否发生了变化,如果发送了变化需要更新brokerAddrTable中记录的数据
  5. 将新的路由信息对象加入到路由表topicRouteTable中,替换掉旧的信息
public class MQClientInstance {
    public boolean updateTopicRouteInfoFromNameServer(final String topic) {
        // 从NameServer更新路由信息
        return updateTopicRouteInfoFromNameServer(topic, false, null);
    }
    
   /**
     * 从NameServer更新路由信息
     * @param topic 主题
     * @param isDefault 是否使用默认的主题
     * @param defaultMQProducer 默认消息生产者
     * @return
     */
    public boolean updateTopicRouteInfoFromNameServer(final String topic, boolean isDefault,
        DefaultMQProducer defaultMQProducer) {
        try {
            if (this.lockNamesrv.tryLock(LOCK_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS)) {
                try {
                    TopicRouteData topicRouteData;
                    // 是否使用默认的路由信息
                    if (isDefault && defaultMQProducer != null) {
                        // 使用默认的主题路由信息
                        topicRouteData = this.mQClientAPIImpl.getDefaultTopicRouteInfoFromNameServer(defaultMQProducer.getCreateTopicKey(),
                            clientConfig.getMqClientApiTimeout());
                        if (topicRouteData != null) {
                            for (QueueData data : topicRouteData.getQueueDatas()) {
                                int queueNums = Math.min(defaultMQProducer.getDefaultTopicQueueNums(), data.getReadQueueNums());
                                data.setReadQueueNums(queueNums); // 设置可读队列数量
                                data.setWriteQueueNums(queueNums); // 设置可写队列数量
                            }
                        }
                    } else {
                        // 从NameServer获取路由信息
                        topicRouteData = this.mQClientAPIImpl.getTopicRouteInfoFromNameServer(topic, clientConfig.getMqClientApiTimeout());
                    }
                    // 如果路由信息不为空
                    if (topicRouteData != null) {
                        // 从路由表中获取旧的路由信息
                        TopicRouteData old = this.topicRouteTable.get(topic);
                        // 判断路由信息是否发生变化
                        boolean changed = topicRouteDataIsChange(old, topicRouteData);
                        if (!changed) {
                            // 是否需要更新路由信息
                            changed = this.isNeedUpdateTopicRouteInfo(topic);
                        } else {
                            log.info("the topic[{}] route info changed, old[{}] ,new[{}]", topic, old, topicRouteData);
                        }
                        // 如果数据发生变化
                        if (changed) {
                            // 克隆一份新的路由信息
                            TopicRouteData cloneTopicRouteData = topicRouteData.cloneTopicRouteData();
                            // 处理brokerAddrTable中的数据
                            for (BrokerData bd : topicRouteData.getBrokerDatas()) {
                                // 更新brokerAddrTable中的数据
                                this.brokerAddrTable.put(bd.getBrokerName(), bd.getBrokerAddrs());
                            }
                            
                            // ...
                            
                            log.info("topicRouteTable.put. Topic = {}, TopicRouteData[{}]", topic, cloneTopicRouteData);
                            // 将新的路由信息加入到路由表
                            this.topicRouteTable.put(topic, cloneTopicRouteData);
                            return true;
                        }
                    } else {
                        log.warn("updateTopicRouteInfoFromNameServer, getTopicRouteInfoFromNameServer return null, Topic: {}. [{}]", topic, this.clientId);
                    }
                } catch (MQClientException e) {
                    if (!topic.startsWith(MixAll.RETRY_GROUP_TOPIC_PREFIX) && !topic.equals(TopicValidator.AUTO_CREATE_TOPIC_KEY_TOPIC)) {
                        log.warn("updateTopicRouteInfoFromNameServer Exception", e);
                    }
                } catch (RemotingException e) {
                    log.error("updateTopicRouteInfoFromNameServer Exception", e);
                    throw new IllegalStateException(e);
                } finally {
                    this.lockNamesrv.unlock();
                }
            } else {
                log.warn("updateTopicRouteInfoFromNameServer tryLock timeout {}ms. [{}]", LOCK_TIMEOUT_MILLIS, this.clientId);
            }
        } catch (InterruptedException e) {
            log.warn("updateTopicRouteInfoFromNameServer Exception", e);
        }

        return false;
    }   
}

向NameServer发送请求的代码实现在MQClientAPIImplgetTopicRouteInfoFromNameServer方法中,可以看到构建了请求命令RemotingCommand并设置请求类型为RequestCode.GET_ROUTEINFO_BY_TOPIC,表示从NameServer获取路由信息,之后通过Netty向NameServer发送请求,并解析返回结果:

public class MQClientAPIImpl {
    public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis)
        throws RemotingException, MQClientException, InterruptedException {
        // 从NameServer获取路由信息
        return getTopicRouteInfoFromNameServer(topic, timeoutMillis, true);
    }
    
    /**
     * 从NameServer获取路由信息
     * @param topic
     * @param timeoutMillis
     * @param allowTopicNotExist
     * @return
     * @throws MQClientException
     * @throws InterruptedException
     * @throws RemotingTimeoutException
     * @throws RemotingSendRequestException
     * @throws RemotingConnectException
     */
    public TopicRouteData getTopicRouteInfoFromNameServer(final String topic, final long timeoutMillis,
        boolean allowTopicNotExist) throws MQClientException, InterruptedException, RemotingTimeoutException, RemotingSendRequestException, RemotingConnectException {
        GetRouteInfoRequestHeader requestHeader = new GetRouteInfoRequestHeader();
        requestHeader.setTopic(topic);
        // 创建请求命令,请求类型为获取主题路由信息GET_ROUTEINFO_BY_TOPIC
        RemotingCommand request = RemotingCommand.createRequestCommand(RequestCode.GET_ROUTEINFO_BY_TOPIC, requestHeader);
        // 发送请求
        RemotingCommand response = this.remotingClient.invokeSync(null, request, timeoutMillis);
        assert response != null;
        switch (response.getCode()) {
            // 如果主题不存在
            case ResponseCode.TOPIC_NOT_EXIST: {
                if (allowTopicNotExist) {
                    log.warn("get Topic [{}] RouteInfoFromNameServer is not exist value", topic);
                }

                break;
            }
            // 如果请求发送成功
            case ResponseCode.SUCCESS: {
                byte[] body = response.getBody();
                // 返回获取的路由信息
                if (body != null) {
                    return TopicRouteData.decode(body, TopicRouteData.class);
                }
            }
            default:
                break;
        }

        throw new MQClientException(response.getCode(), response.getRemark());
    }    
}

选择消息队列

主题路由信息数据TopicPublishInfo获取到之后,需要从中选取一个消息队列,是通过调用MQFaultStrategy的selectOneMessageQueue方法触发的,之后会进入MQFaultStrategyselectOneMessageQueue方法从主题路由信息中选择消息队列:

public class DefaultMQProducerImpl implements MQProducerInner {
    private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
    
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        // 选择消息队列
        return this.mqFaultStrategy.selectOneMessageQueue(tpInfo, lastBrokerName);
    }
}

MQFaultStrategy的selectOneMessageQueue方法主要是通过调用TopicPublishInfo中的相关方法进行消息队列选择的

启用故障延迟机制

如果启用了故障延迟机制,会遍历TopicPublishInfo中存储的消息队列列表,对计数器增1,轮询选择一个消息队列,接着会判断消息队列所属的Broker是否可用,如果Broker可用返回消息队列即可。

如果选出的队列所属Broker不可用,会调用latencyFaultTolerancepickOneAtLeast方法(下面会讲到)选择一个Broker,从tpInfo中获取此Broker可写的队列数量,如果数量大于0,调用selectOneMessageQueue()方法选择一个队列。

如果故障延迟机制未选出消息队列,依旧会调用selectOneMessageQueue()选择出一个消息队列。

未启用故障延迟机制

直接调用的selectOneMessageQueue(String lastBrokerName)方法并传入上一次使用的Broker名称进行选择。

public class MQFaultStrategy {
     /**
     * 选择消息队列
     * @param tpInfo 主题路由信息
     * @param lastBrokerName 上一次使用的Broker名称
     * @return
     */
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        // 如果启用故障延迟机制
        if (this.sendLatencyFaultEnable) {
            try {
                // 计数器增1
                int index = tpInfo.getSendWhichQueue().incrementAndGet();
                // 遍历TopicPublishInfo中存储的消息队列列表
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    // 轮询选择一个消息队列
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    // 如果下标小于0,则使用0
                    if (pos < 0)
                        pos = 0;
                    // 根据下标获取消息队列
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    // 判断消息队列所属的Broker是否可用,如果可用返回当前选择的消息队列
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                        return mq;
                }
                // 如果未获取到可用的Broker
                // 调用pickOneAtLeast选择一个
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                // 从tpInfo中获取Broker可写的队列数量
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                // 如果可写的队列数量大于0
                if (writeQueueNums > 0) {
                    // 选择一个消息队列
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        // 设置消息队列所属的Broker
                        mq.setBrokerName(notBestBroker);
                        // 设置队列ID
                        mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                    }
                    // 返回消息队列
                    return mq;
                } else {
                    // 移除Broker
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
            // 如果故障延迟机制未选出消息队列,调用selectOneMessageQueue选择消息队列
            return tpInfo.selectOneMessageQueue();
        }
        // 根据上一次使用的BrokerName获取消息队列
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }
}

selectOneMessageQueue方法的实现

selectOneMessageQueue方法中,如果上一次选择的BrokerName为空,则调用无参的selectOneMessageQueue方法选择消息队列,也是默认的选择方式,首先对计数器增一,然后用计数器的值对messageQueueList列表的长度取余得到下标值pos,再从messageQueueList中获取pos位置的元素,以此达到轮询从messageQueueList列表中选择消息队列的目的。

如果传入的BrokerName不为空,遍历messageQueueList列表,同样对计数器增一,并对messageQueueList列表的长度取余,选取一个消息队列,不同的地方是选择消息队列之后,会判断消息队列所属的Broker是否与上一次选择的Broker名称一致,如果一致则继续循环,轮询选择下一个消息队列,也就是说,如果上一次选择了某个Broker发送消息,本次将不会再选择这个Broker,当然如果最后仍未找到满足要求的消息队列,则仍旧使用默认的选择方式,也就是调用无参的selectOneMessageQueue方法进行选择。

public class TopicPublishInfo {
    private boolean orderTopic = false;
    private boolean haveTopicRouterInfo = false;
    private List<MessageQueue> messageQueueList = new ArrayList<MessageQueue>(); // 消息队列列表
    private volatile ThreadLocalIndex sendWhichQueue = new ThreadLocalIndex(); // 一个计数器,每次选择消息队列的时候增1,以此达到轮询的目的
    private TopicRouteData topicRouteData;

    // ...
    
    public MessageQueue selectOneMessageQueue(final String lastBrokerName) {
        // 如果上一次选择的BrokerName为空
        if (lastBrokerName == null) {
            // 选择消息队列
            return selectOneMessageQueue();
        } else {
            // 遍历消息队列列表
            for (int i = 0; i < this.messageQueueList.size(); i++) {
                // 计数器增1
                int index = this.sendWhichQueue.incrementAndGet();
                // 对长度取余
                int pos = Math.abs(index) % this.messageQueueList.size();
                if (pos < 0)
                    pos = 0;
                // 获取消息队列,也就是使用使用轮询的方式选择消息队列
                MessageQueue mq = this.messageQueueList.get(pos);
                // 如果队列所属的Broker与上一次选择的不同,返回消息队列
                if (!mq.getBrokerName().equals(lastBrokerName)) {
                    return mq;
                }
            }
            // 使用默认方式选择
            return selectOneMessageQueue();
        }
    }
    
    // 选择消息队列
    public MessageQueue selectOneMessageQueue() {
        // 自增
        int index = this.sendWhichQueue.incrementAndGet();
        // 对长度取余
        int pos = Math.abs(index) % this.messageQueueList.size();
        if (pos < 0)
            pos = 0;
        // 选择消息队列
        return this.messageQueueList.get(pos);
    }
}

故障延迟机制

回到发送消息的代码中,可以看到消息发送无论成功与否都会调用updateFaultItem方法更新失败条目:

public class DefaultMQProducerImpl implements MQProducerInner {
        
    private MQFaultStrategy mqFaultStrategy = new MQFaultStrategy();
    
    // 发送消息
    private SendResult sendDefaultImpl(
        Message msg,
        final CommunicationMode communicationMode,
        final SendCallback sendCallback,
        final long timeout
    ) throws MQClientException, RemotingException, MQBrokerException, InterruptedException {
            // ...
            for (; times < timesTotal; times++) {
                    try {
                        // 开始时间
                        beginTimestampPrev = System.currentTimeMillis();
                        // ...
                        // 发送消息
                        sendResult = this.sendKernelImpl(msg, mq, communicationMode, sendCallback, topicPublishInfo, timeout - costTime);
                        // 结束时间
                        endTimestamp = System.currentTimeMillis();
                        // 更新失败条目
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        // ...
                    } catch (RemotingException e) {
                        endTimestamp = System.currentTimeMillis();
                        // 更新失败条目
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, true);
                        log.warn(String.format("sendKernelImpl exception, resend at once, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());
                        exception = e;
                        continue;
                    }
                    // 省略其他catch
                    // ...
                    catch (InterruptedException e) {
                        endTimestamp = System.currentTimeMillis();
                        this.updateFaultItem(mq.getBrokerName(), endTimestamp - beginTimestampPrev, false);
                        log.warn(String.format("sendKernelImpl exception, throw exception, InvokeID: %s, RT: %sms, Broker: %s", invokeID, endTimestamp - beginTimestampPrev, mq), e);
                        log.warn(msg.toString());

                        log.warn("sendKernelImpl exception", e);
                        log.warn(msg.toString());
                        throw e;
                    }
                } else {
                    break;
                }
            }

          // ...
    }
    
    // 更新FaultItem
    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        // 调用MQFaultStrategy的updateFaultItem方法
        this.mqFaultStrategy.updateFaultItem(brokerName, currentLatency, isolation);
    }

}

MQFaultStrategy中有一个类型的成员变量,最终是通过调用latencyFaultToleranceupdateFaultItem方法进行更新的,并传入了三个参数:

brokerName:Broker名称

currentLatency:当前延迟时间,由上面的调用可知传入的值为发送消息的耗时时间,即消息发送结束时间 - 开始时间

duration:持续时间,根据isolation的值决定,如果为true,duration的值为30000ms也就是30s,否则与currentLatency的值一致

public class MQFaultStrategy {
  
    // 故障延迟机制
    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
  
    /**
     *  更新失败条目
     * @param brokerName Broker名称
     * @param currentLatency 发送消息耗时:请求结束时间 - 开始时间
     * @param isolation
     */
    public void updateFaultItem(final String brokerName, final long currentLatency, boolean isolation) {
        if (this.sendLatencyFaultEnable) {
            // 计算duration,isolation为true时使用30000,否则使用发送消息的耗时时间currentLatency
            long duration = computeNotAvailableDuration(isolation ? 30000 : currentLatency);
            // 更新到latencyFaultTolerance中
            this.latencyFaultTolerance.updateFaultItem(brokerName, currentLatency, duration);
        }
    }
}

LatencyFaultToleranceImpl

LatencyFaultToleranceImpl中有一个faultItemTable,记录了每个Broker对应的FaultItem,在updateFaultItem方法中首先根据Broker名称从faultItemTable获取FaultItem

  • 如果获取为空,说明需要新增FaultItem,新建FaultItem对象,设置传入的currentLatency延迟时间(消息发送结束时间 - 开始时间)和开始时间即当前时间 +notAvailableDurationnotAvailableDuration值有两种情况,值为30000毫秒或者与currentLatency的值一致
  • 如果获取不为空,说明之前已经创建过对应的FaultItem,更新FaultItem中的currentLatency延迟时间和StartTimestamp开始时间
public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> {
    // FaultItem集合,Key为BrokerName,value为对应的FaultItem对象
    private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);
   
    /**
     * 更新FaultItem
     * @param name Broker名称
     * @param currentLatency 延迟时间,也就是发送消息耗时:请求结束时间 - 开始时间
     * @param notAvailableDuration 不可用的持续时间,也就是上一步中的duration
     */
    @Override
    public void updateFaultItem(final String name, final long currentLatency, final long notAvailableDuration) {
        // 获取FaultItem
        FaultItem old = this.faultItemTable.get(name);
        // 如果不存在
        if (null == old) {
            // 新建FaultItem
            final FaultItem faultItem = new FaultItem(name);
            // 设置currentLatency延迟时间
            faultItem.setCurrentLatency(currentLatency);
            // 设置规避故障开始时间,当前时间 + 不可用的持续时间,不可用的持续时间有两种情况:值为30000或者与currentLatency一致
            faultItem.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            // 添加到faultItemTable
            old = this.faultItemTable.putIfAbsent(name, faultItem);
            if (old != null) {
                old.setCurrentLatency(currentLatency);
                old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
            }
        } else {
            // 更新时间
            old.setCurrentLatency(currentLatency);
            // 更新开始时间
            old.setStartTimestamp(System.currentTimeMillis() + notAvailableDuration);
        }
    }
}

FaultItemLatencyFaultToleranceImpl的一个内部类,里面有三个变量:

  • name:Broker名称。
  • currentLatency:延迟时间,等于发送消息耗时时间:发送消息结束时间 - 开始时间。
  • startTimestamp:规避故障开始时间:新建/更新FaultItem的时间 + 不可用的时间notAvailableDurationnotAvailableDuration值有两种情况,值为30000毫秒或者与currentLatency的值一致。

isAvailable方法

isAvailable方法用于开启故障延迟机制时判断Broker是否可用,可用判断方式为:当前时间 - startTimestamp的值大于等于 0,如果小于0则认为不可用。

上面分析可知startTimestamp的值为新建/更新FaultItem的时间 + 不可用的时间,如果当前时间减去规避故障开始时间的值大于等于0,说明此Broker已经超过了设置的规避时间,可以重新被选择用于发送消息。

compareTo方法

FaultItem还实现了Comparable,重写了compareTo方法,在排序的时候使用,对比大小的规则如下:

  1. 调用isAvailable方法判断当前对象和other的值是否相等,如果相等继续第2步,如果不相等,说明两个对象一个返回true一个返回false,此时优先判断当前对象的isAvailable方法返回值是否为true:

    • true:表示当前对象比other小,返回-1,对应当前对象为true,other对象为false的情况
    • false:调用otherisAvailable方法判断是否为true,如果为true,返回1,表示other比较大(对应当前对象为false,other对象为true的情况),否则继续第2步根据其他条件判断。
  2. 对比currentLatency的值,如果currentLatency值小于other的,返回-1,表示当前对象比other小。

  3. 对比startTimestamp的值,如果startTimestamp值小于other的,返回-1,同样表示当前对象比other小。

总结

isAvailable方法返回true的时候表示FaultItem对象的值越小,因为true代表Broker已经过了规避故障的时间,可以重新被选择。

currentLatency的值越小表示FaultItem的值越小。currentLatency的值与Broker发送消息的耗时有关,耗时越低,值就越小。

startTimestamp值越小同样表示整个FaultItem的值也越小。startTimestamp的值与currentLatency有关(值不为默认的30000毫秒情况下),currentLatency值越小,startTimestamp的值也越小。

public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> { 
    class FaultItem implements Comparable<FaultItem> {
        private final String name; // Broker名称
        private volatile long currentLatency; // 发送消息耗时时间:请求结束时间 - 开始时间
        private volatile long startTimestamp; // 规避开始时间:新建/更新FaultItem的时间 + 不可用的时间notAvailableDuration
       
        @Override
        public int compareTo(final FaultItem other) {
            // 如果isAvailable不相等,说明一个为true一个为false
            if (this.isAvailable() != other.isAvailable()) {
                if (this.isAvailable()) // 如果当前对象为true
                    return -1; // 当前对象小

                if (other.isAvailable())// 如果other对象为true
                    return 1; // other对象大
            }
            // 对比发送消息耗时时间
            if (this.currentLatency < other.currentLatency)
                return -1;// 当前对象小
            else if (this.currentLatency > other.currentLatency) {
                return 1; // other对象大
            }
            // 对比故障规避开始时间
            if (this.startTimestamp < other.startTimestamp)
                return -1;
            else if (this.startTimestamp > other.startTimestamp) {
                return 1;
            }

            return 0;
        }
        // 用于判断Broker是否可用
        public boolean isAvailable() {
            // 当前时间减去startTimestamp的值是否大于等于0,大于等于0表示可用
            return (System.currentTimeMillis() - startTimestamp) >= 0;
        }
   }
}

在选择消息队列时,如果开启故障延迟机制并且未找到合适的消息队列,会调用pickOneAtLeast方法选择一个Broker,那么是如何选择Broker的呢?

  1. 首先遍历faultItemTableMap集合,将每一个Broker对应的FaultItem加入到LinkedList链表中

  2. 调用sort方法对链表进行排序,默认是正序从小到大排序,FaultItem还实现Comparable就是为了在这里进行排序,值小的排在链表前面

  3. 计算中间值half

    • 如果half值小于等于0,取链表中的第一个元素
    • 如果half值大于0,从前half个元素中轮询选择元素

FaultItemcompareTo方法可知,currentLatency和startTimestamp的值越小,整个FaultItem的值也就越小,正序排序时越靠前,靠前表示向Broker发送消息的延迟越低,在选择Broker时优先级越高,所以如果half值小于等于0的时候,取链表中的第一个元素,half值大于0的时候,处于链表前half个的Brokerddd,延迟都是相对较低的,此时轮询从前haft个Broker中选择一个Broker。

public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> {
    // FaultItem集合,Key为BrokerName,value为对应的FaultItem对象
    private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);
  
    @Override
    public String pickOneAtLeast() {
        final Enumeration<FaultItem> elements = this.faultItemTable.elements();
        List<FaultItem> tmpList = new LinkedList<FaultItem>();
        // 遍历faultItemTable
        while (elements.hasMoreElements()) {
            final FaultItem faultItem = elements.nextElement();
            // 将FaultItem添加到列表中
            tmpList.add(faultItem);
        }

        if (!tmpList.isEmpty()) {
            Collections.shuffle(tmpList);
            // 排序
            Collections.sort(tmpList);
            // 计算中间数
            final int half = tmpList.size() / 2;
            // 如果中位数小于等于0
            if (half <= 0) {
                // 获取第一个元素
                return tmpList.get(0).getName();
            } else {
                //  对中间数取余
                final int i = this.whichItemWorst.incrementAndGet() % half;
                return tmpList.get(i).getName();
            }
        }

        return null;
    }
}

再回到MQFaultStrategy中选择消息队列的地方,在开启故障延迟机制的时候,选择队列后会调用LatencyFaultToleranceImplisAvailable方法来判断Broker是否可用,而LatencyFaultToleranceImplisAvailable方法又是调用Broker对应 FaultItemisAvailable方法来判断的。

由上面的分析可知,isAvailable返回true表示Broker已经过了规避时间可以用于发送消息,返回false表示还在规避时间内,需要避免选择此Broker,所以故障延迟机制指的是在发送消息时记录每个Broker的耗时时间,如果某个Broker发生故障,但是生产者还未感知(NameServer 30s检测一次心跳,有可能Broker已经发生故障但未到检测时间,所以会有一定的延迟),用耗时时间做为一个故障规避时间(也可以是30000ms),此时消息会发送失败,在重试或者下次选择消息队列的时候,如果在规避时间内,可以在短时间内避免再次选择到此Broker,以此达到故障规避的目的。

如果某个主题所在的所有Broker都处于不可用状态,此时调用pickOneAtLeast方法尽量选择延迟时间最短、规避时间最短(排序后的失败条目中靠前的元素)的Broker作为此次发生消息的Broker。

public class MQFaultStrategy {
    private final LatencyFaultTolerance<String> latencyFaultTolerance = new LatencyFaultToleranceImpl();
     /**
     * 选择消息队列
     * @param tpInfo 主题路由信息
     * @param lastBrokerName 上一次使用的Broker名称
     * @return
     */
    public MessageQueue selectOneMessageQueue(final TopicPublishInfo tpInfo, final String lastBrokerName) {
        // 如果启用故障延迟机制
        if (this.sendLatencyFaultEnable) {
            try {
                // 计数器增1
                int index = tpInfo.getSendWhichQueue().incrementAndGet();
                // 遍历TopicPublishInfo中存储的消息队列列表
                for (int i = 0; i < tpInfo.getMessageQueueList().size(); i++) {
                    // 轮询选择一个消息队列
                    int pos = Math.abs(index++) % tpInfo.getMessageQueueList().size();
                    // 如果下标小于0,则使用0
                    if (pos < 0)
                        pos = 0;
                    // 根据下标获取消息队列
                    MessageQueue mq = tpInfo.getMessageQueueList().get(pos);
                    // 判断消息队列所属的Broker是否可用,如果可用返回当前选择的消息队列
                    if (latencyFaultTolerance.isAvailable(mq.getBrokerName()))
                        return mq;
                }
                // 如果未获取到可用的Broker
                // 调用pickOneAtLeast选择一个
                final String notBestBroker = latencyFaultTolerance.pickOneAtLeast();
                // 从tpInfo中获取Broker可写的队列数量
                int writeQueueNums = tpInfo.getQueueIdByBroker(notBestBroker);
                // 如果可写的队列数量大于0
                if (writeQueueNums > 0) {
                    // 选择一个消息队列
                    final MessageQueue mq = tpInfo.selectOneMessageQueue();
                    if (notBestBroker != null) {
                        // 设置消息队列所属的Broker
                        mq.setBrokerName(notBestBroker);
                        // 设置队列ID
                        mq.setQueueId(tpInfo.getSendWhichQueue().incrementAndGet() % writeQueueNums);
                    }
                    // 返回消息队列
                    return mq;
                } else {
                    // 移除Broker
                    latencyFaultTolerance.remove(notBestBroker);
                }
            } catch (Exception e) {
                log.error("Error occurred when selecting message queue", e);
            }
            // 如果故障延迟机制未选出消息队列,调用selectOneMessageQueue选择消息队列
            return tpInfo.selectOneMessageQueue();
        }
        // 根据上一次使用的BrokerName获取消息队列
        return tpInfo.selectOneMessageQueue(lastBrokerName);
    }
}

public class LatencyFaultToleranceImpl implements LatencyFaultTolerance<String> {
    private final ConcurrentHashMap<String, FaultItem> faultItemTable = new ConcurrentHashMap<String, FaultItem>(16);

    @Override
    public boolean isAvailable(final String name) {
        final FaultItem faultItem = this.faultItemTable.get(name);
        if (faultItem != null) {
            // 调用FaultItem的isAvailable方法判断是否可用
            return faultItem.isAvailable();
        }
        return true;
    }
}    

参考
丁威、周继锋《RocketMQ技术内幕》

RocketMQ版本:4.9.3


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK