9

跟我学RocketMQ之开源客户端混合云实践与案例解析

 3 years ago
source link: http://wuwenliang.net/2019/08/28/%E8%B7%9F%E6%88%91%E5%AD%A6RocketMQ%E4%B9%8B%E5%BC%80%E6%BA%90%E5%AE%A2%E6%88%B7%E7%AB%AF%E6%B7%B7%E5%90%88%E4%BA%91%E5%AE%9E%E8%B7%B5%E4%B8%8E%E6%A1%88%E4%BE%8B%E8%A7%A3%E6%9E%90/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client
  • 开源rocketmq-java客户端sdk使用方法
  • 开源rocketmq-java客户端sdk使用场景解读
  • 混合云场景案例解析
  • 下一站:测试/线上一体化

开源rocketmq-java客户端sdk使用方法

目前通过RocketMQ开源客户端可以访问阿里云RocketMQ的 普通消息、顺序消息、延时/定时消息、事务消息,基本覆盖了云上MQ的主流场景。

我们接着讲解一下如何通过开源SDK使用云上RocketMQ。

本部分的配置项,生产者、消费者应用都需要添加。

我们通过代码案例讲解一下如何使用RocketMQ开源客户端的访问云上的MQ,进行消息发送与消费。

使用4.5.1版本进行讲解,在项目中添加如下依赖

<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-client -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.5.1</version>
</dependency>

<!-- https://mvnrepository.com/artifact/org.apache.rocketmq/rocketmq-acl -->
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-acl</artifactId>
    <version>4.5.1</version>
</dependency>

在springboot项目的application.properties中添加rocketmq配置

#nameServer地址
rocketmq.nameServer.offline=开源版本nameServer地址
rocketmq.nameServer.aliyun=阿里云接入点
rocketmq.acl.accesskey=云上accesskey
rocketmq.acl.accessSecret=云上accessSecret

ak sk 写上对开源版本代码运行没有影响。换到开源版本,这部分代码保留即可,

为了方便代码部署,编写一个配置读取类,根据环境变量设置的环境类型,选取不同的nameServer

@Component
public class MQNamesrvConfig {

    // 线下开源rocketMQ
    @Value("${rocketmq.nameServer.offline}")
    String offlineNamesrv;

    // 阿里云RocketMQ接入点
    @Value("${rocketmq.nameServer.aliyun}")
    String aliyunNamesrv;

    /**
    * 根据环境选择nameServer地址
    * @return
    */
    public String nameSrvAddr() {
        String envType = System.getProperty("envType");
        if (StringUtils.isBlank(envType)) {
            throw new IllegalArgumentException("please insert envType");
        }
        switch (envType) {
            case "offline" : {
                return offlineNamesrv;
            }
            case "aliyun" : {
                return aliyunNamesrv;
            }
            default : {
                throw new IllegalArgumentException("please insert right envType, offline/aliyun");
            }
        }
    }
}

生产者代码样例

为了支持代码访问云上MQ需要更改生产者构造方法,代码如下:

@Autowired
MQNamesrvConfig namesrvConfig;

@Value("${rocketmq.acl.accesskey}")
String aclAccessKey;

@Value("${rocketmq.acl.accessSecret}")
String aclAccessSecret;

private DefaultMQProducer defaultMQProducer;

@PostConstruct
public void init() {
    defaultMQProducer =
            new DefaultMQProducer("GID_SNOWALKE_TEST",
            new AclClientRPCHook(new SessionCredentials(aclAccessKey, aclAccessSecret)));
    defaultMQProducer.setNamesrvAddr(namesrvConfig.nameSrvAddr());
    // 发送失败重试次数
    defaultMQProducer.setRetryTimesWhenSendFailed(3);
    try {
        defaultMQProducer.start();
    } catch (MQClientException e) {
        LOGGER.error("[秒杀订单生产者]--SecKillChargeOrderProducer加载异常!e={}", LogExceptionWapper.getStackTrace(e));
        throw new RuntimeException("[秒杀订单生产者]--SecKillChargeOrderProducer加载异常!", e);
    }
    LOGGER.info("[秒杀订单生产者]--SecKillChargeOrderProducer加载完成!");
}

关键点在于构造初始化的过程中,设置AclClientRPCHook,并将accesskey与secretKey设置进去。官方sample代码如下

DefaultMQProducer producer = new DefaultMQProducer("ProducerGroupName", getAclRPCHook());
producer.setNamesrvAddr("127.0.0.1:9876");
producer.start();

static RPCHook getAclRPCHook() {
    return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY));
}

消费者代码样例

消费者初始化代码也需要稍作改造。

@Autowired
MQNamesrvConfig namesrvConfig;

@Value("${rocketmq.acl.accesskey}")
String aclAccessKey;

@Value("${rocketmq.acl.accessSecret}")
String aclAccessSecret;

private DefaultMQPushConsumer defaultMQPushConsumer;

@Resource(name = "secKillChargeOrderListenerImpl")
private MessageListenerConcurrently messageListener;

@PostConstruct
public void init() {
    defaultMQPushConsumer =
            new DefaultMQPushConsumer("GID_SNOWALKE_TEST",,
                    new AclClientRPCHook(new SessionCredentials(aclAccessKey, aclAccessSecret)),
                    // 平均分配队列算法,hash
                    new AllocateMessageQueueAveragely());
    defaultMQPushConsumer.setNamesrvAddr(namesrvConfig.nameSrvAddr());
    // 从头开始消费
    defaultMQPushConsumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
    // 消费模式:集群模式
    defaultMQPushConsumer.setMessageModel(MessageModel.CLUSTERING);
    // 注册监听器
    defaultMQPushConsumer.registerMessageListener(messageListener);
    // 设置每次拉取的消息量,默认为1
    defaultMQPushConsumer.setConsumeMessageBatchMaxSize(1);
    // 订阅所有消息
    try {
        defaultMQPushConsumer.subscribe(MessageProtocolConst.SECKILL_CHARGE_ORDER_TOPIC.getTopic(), "*");
        // 启动消费者
        defaultMQPushConsumer.start();
    } catch (MQClientException e) {
        LOGGER.error("[秒杀下单消费者]--SecKillChargeOrderConsumer加载异常!e={}", LogExceptionWapper.getStackTrace(e));
        throw new RuntimeException("[秒杀下单消费者]--SecKillChargeOrderConsumer加载异常!", e);
    }
    LOGGER.info("[秒杀下单消费者]--SecKillChargeOrderConsumer加载完成!");
}

同生产者初始化过程类似,关键点在于构造初始化的过程中,设置AclClientRPCHook,并将accesskey与secret设置进去。官方sample代码如下

我们主要以PushConsumer为例,PullConsumer的使用可以自行前往官方sample目录下查看

DefaultMQPushConsumer consumer = 
new DefaultMQPushConsumer("ConsumerGroupName", 
                            getAclRPCHook(), 
                            new AllocateMessageQueueAveragely());
consumer.setNamesrvAddr("127.0.0.1:9876");


static RPCHook getAclRPCHook() {
    return new AclClientRPCHook(new SessionCredentials(ACL_ACCESS_KEY,ACL_SECRET_KEY));
} 

补充:其他改造方案简介

  1. 通过java -D方式将ak sk及nameserver参数设置为环境变量
  2. 通过docker -e 方式将参数设置为环境变量
  3. 如果使用了K8S部署应用,则使用values.yaml或者configMap则更为方便

这些方式几乎可以做到零改动升级。

开源rocketmq-java客户端sdk使用场景解读

使用开源RocketMQ的java客户端能够满足多种需求,如:

  1. 混合云场景
  2. 测试线上一体化
  3. 云迁移场景

混合云场景

如果您的企业需要同时访问阿里云和其他公有云或者本地数据中心,开源客户端也可以支持多云环境的无缝切换。很好的支持分布式系统多云环境的数据同步、异地容灾等功能。

我们常说,不能把鸡蛋放在一个篮子里。

这种思维恰恰就是多云环境的体现。对于某些核心业务,我们要满足HA,应用部署需要具备可迁移,多环境热备,抗不可抗力等的能力。

我们会选择打通多云环境,或者建立私有云平台,保证跨云数据同步。

本次分享主要就混合云场景做重要的分析,我们将采用一个电商秒杀业务场景的混合云场景改造进行案例分析。

测试/线上一体化

鉴于开源客户端既可以访问开源RocketMQ集群又可以访问阿里云MQ,您的应用可以在开发、测试阶段使用开源RocketMQ集群,线上阶段使用阿里云RocketMQ,这将很大程度为您节约成本。切换过程无需修改一行代码。

测试线上一体化的核心为 “一套代码,处处运行”,线上线下逻辑相同,不需要进行针对环境进行变更,使业务逻辑无状态化。

云上的MQ公网模式下按量收费,为了节省开发测试成本,因此在开发测试阶段选择使用线下的开源RocektMQ;正式上线再接入线上的云MQ。

客户端sdk支持一套代码访问开源、云上的MQ,从而降低开发成本。

示例图如下:

测试线上一体化
测试线上一体化

案例:我当前所在的业务线在生产环境主要使用了阿里云RocketMQ的铂金版,线下测试如果也使用铂金版成本较高,换成公网由于Topic数量有三十多个,测试开发两套环境每个月固定支出几千,因此对代码进行了升级。

测试和开发环境功能测试访问本地开源RocketMQ集群,线上使用阿里云铂金版RocketMQ,效果很稳定。

云迁移场景

如果您正想从开源RocketMQ迁移到阿里云MQ,开源客户端是最好的选择,升级到开源客户端后,您可以在应用不停机的情况下无缝迁移到阿里云RocketMQ。

商业版MQ具有完备的技术团队支撑,稳定性及可靠性表现优异,如果应用有上云需求,访问对应区域的阿里云RocketMQ是一个不错的选择。

尤其是之前已经基于开源版MQ部署过的应用,代码量可观,如果使用云上的客户端进行兼容,修改量比较大,因此通过ACL支持的开源版客户端,几乎零修改,便可以无缝上云。

推荐的迁移策略如下:

  1. 对现有应用的客户端进行升级,加入ACL支持
  2. 进行线下的兼容性测试,保证原先业务不受影响
  3. 将配置信息剥离到环境变量中,如:
    1. 通过java -D方式将参数设置为环境变量
    2. 通过docker -e 方式将参数设置为环境变量
    3. 如果使用了K8S部署应用,则使用values.yaml或者configMap则更为方便
  4. 将应用打包后部署到云上环境即可完成迁移

流程图如下:

云迁移流程
云迁移流程

混合云场景案例解析

混合云场景的案例分析基于一个简化的电商秒杀场景业务系统讲解,项目的代码地址为 https://github.com/TaXueWWL/seckill-rocketmq

项目简介:

用户访问秒杀网关进行秒杀订单下单,平台通过RocketMQ对秒杀流量进行削峰填谷。用户通过主动查询订单 获取下单结果的完整业务流程

本地化部署架构解析

上述场景为非混合云平台部署方式,该方式的部署架构图如下:

非混合云部署方式
非混合云部署方式

秒杀网关服务、秒杀订单服务、开源MQ集群、缓存集群、DB集群都在一个环境进行部署。

混合云平台部署解析

我们对上述部署方式进行混合云改造,将数据库、MQ等中间件更换为云上版本,订单收单业务模块部署到云上环境;将缓存、收单网关部署到本地环境(使用本地环境模拟IDC)。整个部署方式变成了混合云架构。

整个平台的部署架构图如下:

混合云部署方式
混合云部署方式

案例流程解析

案例的流程图如下;

案例流程图
案例流程图

demo演示及讲解

我们对demo项目进行演示,主要演示通过访问阿里云云上MQ实现消息收发。

消息发送控制台截图:

消息发送
消息发送

消息消费控制台截图:

消息发送
消息发送

改造流程总结

  1. 代码支持云上MQ,具体方法参考第一章节 《开源rocketmq-java客户端sdk使用方法》
  2. 进行应用迁移部署

可以看到通过4.5.1以上的新版本RocketMQ客户端SDK,能够明显降低我们业务上云、混合云部署等的改造成本,提高业务部署的可靠性。

下一站:测试/线上一体化

进行混合云部署改造的过程其实就涉及到了另外的场景,即:测试/线上一体化。我们可以通过4.5.1以上的新版本RocketMQ客户端SDK达到该目的。

即:开发测试环境访问本地开源MQ集群,进行测试,节约成本。线上使用云上MQ,节约运维成本,更好的满足服务高可用。

上云指导文档

如果有以下场景需求可以访问下列链接,获取更多的上云指导

  • 云迁移场景:想从开源RocketMQ迁移到阿里云MQ
  • 混合云场景:应用需要同时访问私有云的开源RocketMQ和阿里云tMQ。
  • 测试环境访问开源RocketMQ,线上环境访问阿里云MQ。

RocketMQ开源客户端访问阿里云MQ

RocketMQ 官方Sample



版权声明:

原创不易,洗文可耻。除非注明,本博文章均为原创,转载请以链接形式标明本文地址。



About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK