9

跟我学RocketMQ[1-3]之发送普通消息及封装DefaultMQProducer支持spring

 3 years ago
source link: http://wuwenliang.net/2019/01/23/%E8%B7%9F%E6%88%91%E5%AD%A6RocketMQ-1-3-%E4%B9%8B%E5%8F%91%E9%80%81%E6%99%AE%E9%80%9A%E6%B6%88%E6%81%AF%E5%8F%8A%E5%B0%81%E8%A3%85DefaultMQProducer%E6%94%AF%E6%8C%81spring/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

本文是《跟我学RocketMQ系列》的第三篇,前面两篇中,我带领大家了解了如何搭建RocketMQ以及如何通过web端的console进行RocketMQ的运维。

从本文开始,我将从研发的角度,逐步深入RocketMQ。

本文先讲解如何利用RocketMQ的java客户端进行普通消息的发送以及对它进行薄封装,以便更好的适配spring框架。

对RocketMQ的封装版本的代码已经上传github,shield-rocketmq-client-spring 欢迎大家star及fork~

通过DefaultMQProducer发送普通消息

RocketMQ使用DefaultMQProducer实现普通消息的发送操作。

首先通过构造方法初始化一个生产者组为“PID-TEST”的普通消息生产者。

注意 RocketMQ官方建议,生产者组统一以 PID_ 开头,消费者组统一以 CID_ 开头。

DefaultMQProducer defaultMQProducer =
     new DefaultMQProducer("PID_TEST");

设置Nameserver地址

defaultMQProducer.setNamesrvAddr("127.0.0.1:9876");

启动生产者,建立到broker的链接

defaultMQProducer.start();

使用Jackson进行消息实体序列化

ObjectMapper objectMapper = new ObjectMapper();
for (int i = 0; i < 10; i++) {
    try {
        MessageBean msg = new MessageBean("rocketmq-simple-msg-test",
                "SNOWALKER_TEST",
                "SNOWALKER_TEST-TAG",
                "localhost.localdomain",
                "测试消息简单发送------第" + i + "次",
                "10",
                "simple-msg-test-" + i);

构造消息协议并使用Jackson序列化为JSON字符串

String message  = objectMapper.writeValueAsString(msg);

使用官方的Message实体,构造消息体,并设置消息发布的主题名,TAG名,同时需要将要发送的消息体转换为二进制形式。

Message sendMessage = new Message(
        msg.getTopicName(),
        msg.getTagName(),
        message.getBytes());

通过调用defaultMQProducer的send(Message msg, SendCallback sendCallback)方法进行消息发送。

这里发送方需要实现SendCallback回调接口,实现其中的onSuccess,onException方法,分别对应发送结果的成功和异常两种情况。正式的业务场景中需要对这些情况做对应的业务操作。

defaultMQProducer.send(sendMessage, new SendCallback() {
    @Override
    public void onSuccess(SendResult sendResult) {
        LOGGER.info("消息id={}, 发送结果={}" ,
        sendResult.getMsgId(), 
        sendResult.getSendStatus());
    }

    @Override
    public void onException(Throwable throwable) {
        LOGGER.info("消息主题={}, 消息体={}" ,
        sendMessage.getTopic(),
        new String(sendMessage.getBody()));
        throwable.printStackTrace();
    }
});

消息发送失败会抛出MQClientException,正式的业务中需要对异常进行捕获并处理。

    } catch (Exception e) {
        e.printStackTrace();
    }
}

我们尝试运行一下,这里我已经有了对应的消费者,可以看下运行的日志:

2019-01-23 09:55:25.022  INFO 18784 --- [ublicExecutor_8] c.s.shield.job.publisher.DemoPublisher   : 
消息id=AC1E5356496018B4AAC2736D06CF0002, 发送结果=SEND_OK

2019-01-23 09:55:27.519  INFO 18784 --- [MessageThread_8] c.s.shield.job.consumer.DemoConsumer     : 
当前消费线程名=ConsumeMessageThread_8, 消息id=AC1E5356496018B4AAC2736D06CF0002, 收到消息为={"msgName":"rocketmq-simple-msg-test","topicName":"SNOWALKER_TEST","tagName":"SNOWALKER_TEST-TAG","clusterName":"localhost.localdomain","taskName":"测试消息简单发送------第0次","threadSize":"10","threadName":"simple-msg-test-0"}

可以看到消息成功提交到了Broker并且被消费者消息,识别的标志是msgId。

Spring框架整合DefaultMQProducer

目前业务系统大量使用了Springboot、spring框架,因此我们对DefaultMQProducer进行一层薄封装,话不多说直接上代码。

封装基于 Spring Boot v1.5.3.RELEASE, Spring v4.3.8.RELEASE,请读者自行添加依赖。

声明为Spring的一个bean,同时声明为prototype,支持多例。

@Component
@Scope("prototype")
public class RocketMQSimpleProducerAgent {

    private static final Logger LOGGER = LoggerFactory.getLogger(RocketMQSimpleProducerAgent.class);

声明DefaultMQProducer为成员变量,不进行初始化,初始化操作在后续的init()方法中进行。

private DefaultMQProducer defaultMQProducer;

初始化defaultMQProducer,构造方法传入生产者组id,并设置NameServer的地址,这里将配置统一封装到RocketMQProducerConfig配置类中(具体内容在附录中)。

public RocketMQSimpleProducerAgent init(RocketMQProducerConfig producerConfig) throws Exception {
    defaultMQProducer = 
        new DefaultMQProducer(producerConfig.getProducerGroup());
    defaultMQProducer.setNamesrvAddr(
        producerConfig.getNameSrvAddr());
    LOGGER.debug("com.shield.job.message.rocketmq.RocketMQProducerAgent[初始化完成]");
    return this;
}

独立的生产者启动方法。

/**
* 启动消费者服务
*/
public void start() throws MQClientException {
    this.defaultMQProducer.start();
}

独立的关闭方法。

public void destroy() {
    this.defaultMQProducer.shutdown();
    LOGGER.debug
        ("com.shield.job.message.rocketmq.RocketMQProducerAgent[已关闭]");
}

为方便外部对生产者进行进一步的自定义设置,提供外部获取defaultMQProducer的接口。

    public DefaultMQProducer getProducer() {
        return this.defaultMQProducer;
    }

}

很简洁的薄封装,那么如何使用呢?

RocketMQSimpleProducerAgent使用案例

我们还是基于文章开始时候的例子,将其改造成为基于RocketMQSimpleProducerAgent的调用方式,代码如下。

@Component
public class DemoPublisher {

    private static final Logger LOGGER = LoggerFactory.getLogger(DemoPublisher.class);

引入RocketMQSimpleProducerAgent类,这里建议将封装后的客户端统一写为一个二方包,便于各个项目使用。

使用@Resource(name = “rocketMQSimpleProducerAgent”)或者直接@Autowired将自定义的普通消息生产者注入。

@Resource(name = "rocketMQSimpleProducerAgent")
RocketMQSimpleProducerAgent rocketMQProducerAgent;

调用方需要实现一个返回值为void的方法,并标记为@PostConstruct,在该方法中进行rocketMQProducerAgent的初始化。当spring在加载过程中,DemoPublisher初始化之前会调用该init()方法初始化rocketMQProducerAgent。通过start()链式调用,启动普通消息生产者,内部是调用的defaultMQProducer.start()方法。

@PostConstruct
void init() throws Exception {
    rocketMQProducerAgent.init(new RocketMQProducerConfig(
            "group-snowalker",
            "172.30.83.100:9876"
    )).start();
    this.publish();
}

生产者逻辑和上文讲解的没有区别。

    public void publish() {
        ObjectMapper objectMapper = new ObjectMapper();
        for (int i = 0; i < 10; i++) {
            try {
                MessageBean msg = new MessageBean("rocketmq-simple-msg-test",
                        "SNOWALKER_TEST",
                        "SNOWALKER_TEST-TAG",
                        "localhost.localdomain",
                        "测试消息简单发送------第" + i + "次",
                        "10",
                        "simple-msg-test-" + i);
                String message  = objectMapper.writeValueAsString(msg);

                Message sendMessage = new Message(
                        msg.getTopicName(), msg.getTagName(), message.getBytes());

                rocketMQProducerAgent.getProducer().send(sendMessage, new SendCallback() {
                    @Override
                    public void onSuccess(SendResult sendResult) {
                        LOGGER.info("消息id={}, 发送结果={}" ,sendResult.getMsgId(), sendResult.getSendStatus());
                    }

                    @Override
                    public void onException(Throwable throwable) {
                        LOGGER.info("消息主题={}, 消息体={}" ,sendMessage.getTopic(), new String(sendMessage.getBody()));
                        throwable.printStackTrace();
                    }
                });
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
}

运行之后的效果和一开始的直接调用RocketMQ的java开发包一致,但是这种方式可以让我们更加灵活的在不同的业务中使用消息发送,参数都是可以自定义的。可以使用@Value读取配置文件,能够让我们更关注业务逻辑而不需要关注消息发送的细节。

附录:RocketMQProducerConfig配置类

该配置类封装了生产者客户端初始化的必填参数,目的是收拢初始化参数,从而使初始化接口更加简洁,符合开闭原则。

/**
* @author snowalker
* @version 1.0
* @date 2019/1/21 10:38
* @className RocketMQProducerConfig
* @desc RocketMQ生产者配置
*/
public class RocketMQProducerConfig {

    /**生产者组*/
    private String producerGroup;

    /**指定NameServer名称*/
    private String nameSrvAddr;

    public RocketMQProducerConfig(String producerGroup, String nameSrvAddr) {
        Preconditions.checkNotNull(producerGroup);
        Preconditions.checkNotNull(nameSrvAddr);
        this.producerGroup = producerGroup;
        this.nameSrvAddr = nameSrvAddr;
    }

    public String getProducerGroup() {
        return producerGroup;
    }

    public String getNameSrvAddr() {
        return nameSrvAddr;
    }
}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK