9

【mq】从零开始实现 mq-04-启动检测与实现优化 - 老马啸西风

 2 years ago
source link: https://www.cnblogs.com/houbbBlogs/p/16218299.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

【mq】从零开始实现 mq-01-生产者、消费者启动

【mq】从零开始实现 mq-02-如何实现生产者调用消费者?

【mq】从零开始实现 mq-03-引入 broker 中间人

【mq】从零开始实现 mq-04-启动检测与实现优化

上一节我们引入了中间人 broker,让消息的生产者和消费者解耦。

这一节我们对初始化代码进行优化,便于后期拓展维护。

启动检测

生产者启动优化

整体实现调整如下:

@Override
public synchronized void run() {
    this.paramCheck();
    // 启动服务端
    log.info("MQ 生产者开始启动客户端 GROUP: {}, PORT: {}, brokerAddress: {}",
            groupName, port, brokerAddress);
    try {
        //channel future
        this.channelFutureList = ChannelFutureUtils.initChannelFutureList(brokerAddress,
                initChannelHandler(), check);

        // register to broker
        this.registerToBroker();

        // 标识为可用
        enableFlag = true;
        log.info("MQ 生产者启动完成");
    } catch (Exception e) {
        log.error("MQ 生产者启动遇到异常", e);
        throw new MqException(ProducerRespCode.RPC_INIT_FAILED);
    }
}

看起来是不是比起原来清爽很多呢?

但是复杂性只会转移,不会消失

答案就是封装到 initChannelFutureList 中去了。

initChannelFutureList

因为这里是生产者、消费者都会用到。

所以我们先放在统一的工具类中,实现本身和以前大同小异。

/**
 * 初始化列表
 * @param brokerAddress 地址
 * @param channelHandler 处理类
 * @param check 是否检测可用性
 * @return 结果
 * @since 0.0.4
 */
public static List<RpcChannelFuture> initChannelFutureList(final String brokerAddress,
                                                           final ChannelHandler channelHandler,
                                                           final boolean check) {
    List<RpcAddress> addressList = InnerAddressUtils.initAddressList(brokerAddress);
    List<RpcChannelFuture> list = new ArrayList<>();
    for(RpcAddress rpcAddress : addressList) {
        try {
            final String address = rpcAddress.getAddress();
            final int port = rpcAddress.getPort();
            EventLoopGroup workerGroup = new NioEventLoopGroup();
            Bootstrap bootstrap = new Bootstrap();
            ChannelFuture channelFuture = bootstrap.group(workerGroup)
                    .channel(NioSocketChannel.class)
                    .option(ChannelOption.SO_KEEPALIVE, true)
                    .handler(new ChannelInitializer<Channel>(){
                        @Override
                        protected void initChannel(Channel ch) throws Exception {
                            ch.pipeline()
                                    .addLast(new LoggingHandler(LogLevel.INFO))
                                    .addLast(channelHandler);
                        }
                    })
                    .connect(address, port)
                    .syncUninterruptibly();
            log.info("启动客户端完成,监听 address: {}, port:{}", address, port);
            RpcChannelFuture rpcChannelFuture = new RpcChannelFuture();
            rpcChannelFuture.setChannelFuture(channelFuture);
            rpcChannelFuture.setAddress(address);
            rpcChannelFuture.setPort(port);
            rpcChannelFuture.setWeight(rpcAddress.getWeight());
            list.add(rpcChannelFuture);
        } catch (Exception exception) {
            log.error("注册到 broker 服务端异常", exception);
            if(check) {
                throw new MqException(MqCommonRespCode.REGISTER_TO_BROKER_FAILED);
            }
        }
    }

    if(check
        && CollectionUtil.isEmpty(list)) {
        log.error("check=true 且可用列表为空,启动失败。");
        throw new MqException(MqCommonRespCode.REGISTER_TO_BROKER_FAILED);
    }
    return list;
}

这里的 check 为了避免 2 种情况:

(1)某一个 broker 不可用

(2)没有可用的 broker 信息。

消费者启动优化

消费者连接 broker 和生产者是类似的。

这里只是放一下实现,不做更多的赘述。

@Override
public void run() {
    // 启动服务端
    log.info("MQ 消费者开始启动服务端 groupName: {}, brokerAddress: {}",
            groupName, brokerAddress);
    //1. 参数校验
    this.paramCheck();
    try {
        //channel future
        this.channelFutureList = ChannelFutureUtils.initChannelFutureList(brokerAddress,
                initChannelHandler(),
                check);

        // register to broker
        this.registerToBroker();

        // 标识为可用
        enableFlag = true;
        log.info("MQ 消费者启动完成");
    } catch (Exception e) {
        log.error("MQ 消费者启动异常", e);
        throw new MqException(ConsumerRespCode.RPC_INIT_FAILED);
    }
}

这一小节的内容特别简单,对初始化部分做了优化,便于后期维护拓展。

希望本文对你有所帮助,如果喜欢,欢迎点赞收藏转发一波。

我是老马,期待与你的下次重逢。

The message queue in java.(java 简易版本 mq 实现) https://github.com/houbb/mq

rpc-从零开始实现 rpc https://github.com/houbb/rpc


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK