8

基于推和拉两种方式消费RabbitMQ消息

 2 years ago
source link: http://www.lzhpo.com/article/172
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

”推“模式:

  1. MQ主动将消息推送给消费者
  2. 实时性较高

”拉“模式:

  1. 消费者主动从MQ中拉取消息
  2. 实时性不高
  3. 降低系统吞吐量

通常我们是用”推“模式的,但是,由于某种原因,可能只有在满足某种条件下,消费者才能去进行消费,或者需要批处理的时候,此时就需要用到”拉“模式。

一、“推”模式

1.定义交换机、队列、路由键以及yml配置

package com.lzhpo.common.queue.rabbitmq.consts;import lombok.AccessLevel;import lombok.NoArgsConstructor;/** @author lzhpo */@NoArgsConstructor(access = AccessLevel.PRIVATE)public final class RabbitConst {  public static final String PUSH_EXCHANGE = "sun.push.exchange";  public static final String PUSH_QUEUE = "sun.push.queue";  public static final String PUSH_ROUTING_KEY = "sun.push.routing-key";}
spring:  rabbitmq:    host: 127.0.0.1    port: 5672    username: guest    password: guest    virtual-host: /    # 开启发送失败退回    publisher-returns: true    # 开启消息确认机制    publisher-confirm-type: correlated    template:      # 设置消费者在消息没有被路由到合适队列情况下会被return监听,而不会自动删除      mandatory: true    listener:      type: simple      simple:        # 手动ACK        acknowledge-mode: manual        # 重试次数超过上面的设置之后是否丢弃(要想将无法消费的消息丢到死信队列,此处必须配置为false)        default-requeue-rejected: false        retry:          # 开启重试机制          enabled: true          # 重试次数,默认:3次          max-attempts: 3

2.定义消费者

import com.lzhpo.common.queue.rabbitmq.consts.RabbitConst;import com.rabbitmq.client.Channel;import java.util.Map;import lombok.extern.slf4j.Slf4j;import org.springframework.amqp.rabbit.annotation.Exchange;import org.springframework.amqp.rabbit.annotation.Queue;import org.springframework.amqp.rabbit.annotation.QueueBinding;import org.springframework.amqp.rabbit.annotation.RabbitListener;import org.springframework.amqp.support.AmqpHeaders;import org.springframework.messaging.handler.annotation.Headers;import org.springframework.stereotype.Component;/** @author lzhpo */@Slf4j@Componentpublic class RabbitPushConsumer {  @RabbitListener(      bindings = {        @QueueBinding(            value = @Queue(value = RabbitConst.PUSH_QUEUE, durable = "true", autoDelete = "false"),            exchange = @Exchange(value = RabbitConst.PUSH_EXCHANGE),            key = {RabbitConst.PUSH_ROUTING_KEY})      })  public void listener(String message, @Headers Map<String, Object> headers, Channel channel) {    log.info("Consumer receive new message: {}", message);    final Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);    try {      channel.basicAck(deliveryTag, false);      log.info("Consumption success: {}", message);    } catch (Exception e) {      log.error("Abnormal consumption!", e);      try {        channel.basicNack(deliveryTag, false, true);      } catch (Exception ex) {        log.error("Reject message exception!", ex);      }    }  }}

这里headers同时也可以获取其它信息:

final String exchange = (String) headers.get(AmqpHeaders.RECEIVED_EXCHANGE);final String queue = (String) headers.get(AmqpHeaders.CONSUMER_QUEUE);final String routingKey = (String) headers.get(AmqpHeaders.RECEIVED_ROUTING_KEY);

3.简单写一个生产者测试用例发送消息

  @Test  void send() {    RabbitTemplate rabbitTemplate = SpringUtil.getBean(RabbitTemplate.class);    // 消息发送到RabbitMQ交换机后接收ack回调(不能依靠此ack值判断消息是否被消费者成功消费)    rabbitTemplate.setConfirmCallback(        (correlationData, ack, cause) -> {          if (ack) {            log.info("Successfully to send message to exchange! ");          } else {            log.error(                "Failed to send message to exchange! correlationData: {}, cause: {}",                correlationData,                cause);          }        });    rabbitTemplate.setMandatory(true);    // 消息发送到RabbitMQ交换机,但无相应queue时的回调    rabbitTemplate.setReturnsCallback(        returned ->            log.error(                "The message sent has no corresponding queue! Returned message: {}", returned));    rabbitTemplate.convertAndSend(RabbitConst.PUSH_EXCHANGE, RabbitConst.PUSH_ROUTING_KEY, "123");  }

4.测试结果

二、“拉”模式(可批量)

1.定义交换机、队列、路由键常量以及yml配置

import lombok.AccessLevel;import lombok.NoArgsConstructor;/** @author lzhpo */@NoArgsConstructor(access = AccessLevel.PRIVATE)public final class RabbitConst {  public static final String PULL_EXCHANGE = "sun.pull.exchange";  public static final String PULL_QUEUE = "sun.pull.queue";  public static final String PULL_ROUTING_KEY = "sun.pull.routing-key";}

yml配置:

spring:  rabbitmq:    host: 127.0.0.1    port: 5672    username: guest    password: guest    virtual-host: /    # 开启发送失败退回    publisher-returns: true    # 开启消息确认机制    publisher-confirm-type: correlated    template:      # 设置消费者在消息没有被路由到合适队列情况下会被return监听,而不会自动删除      mandatory: true    listener:      type: simple      simple:        # 手动ACK        acknowledge-mode: manual        # 重试次数超过上面的设置之后是否丢弃(要想将无法消费的消息丢到死信队列,此处必须配置为false)        default-requeue-rejected: false        retry:          # 开启重试机制          enabled: true          # 重试次数,默认:3次          max-attempts: 3

2.将交换机、队列、路由键进行绑定

import com.lzhpo.common.queue.rabbitmq.consts.RabbitConst;import org.springframework.amqp.core.Binding;import org.springframework.amqp.core.BindingBuilder;import org.springframework.amqp.core.DirectExchange;import org.springframework.amqp.core.Queue;import org.springframework.amqp.rabbit.connection.ConnectionFactory;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.beans.factory.config.ConfigurableBeanFactory;import org.springframework.context.annotation.Bean;import org.springframework.context.annotation.Configuration;import org.springframework.context.annotation.Scope;/** @author lzhpo */@Configurationpublic class RabbitConfig {  /**   * Prototype for {@link RabbitTemplate}   *   * @param connectionFactory {@link ConnectionFactory}   * @return {@link RabbitTemplate}   */  @Bean  @Scope(ConfigurableBeanFactory.SCOPE_PROTOTYPE)  public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {    RabbitTemplate template = new RabbitTemplate(connectionFactory);    template.setMandatory(true);    return template;  }  /**   * Create {@link RabbitConst#PULL_EXCHANGE} direct type exchange   *   * @return {@link DirectExchange}   */  @Bean  public DirectExchange pullDirectExchange() {    return new DirectExchange(RabbitConst.PULL_EXCHANGE, true, false);  }  /**   * Create {@link RabbitConst#PULL_QUEUE} queue   *   * @return {@link Queue}   */  @Bean  public Queue pullQueue() {    return new Queue(RabbitConst.PULL_QUEUE, true);  }  /**   * Binding {@link RabbitConfig#pullQueue()} with {@link RabbitConfig#pullQueue()}   *   * @return {@link Binding}   */  @Bean  public Binding pullBinding() {    return BindingBuilder.bind(pullQueue())        .to(pullDirectExchange())        .with(RabbitConst.PULL_ROUTING_KEY);  }}

3.编写简单测试用例

import cn.hutool.extra.spring.SpringUtil;import com.lzhpo.common.queue.rabbitmq.TestRabbitMqApp;import com.lzhpo.common.queue.rabbitmq.consts.RabbitConst;import com.lzhpo.common.queue.rabbitmq.utils.MessageUtil;import com.rabbitmq.client.GetResponse;import java.util.ArrayList;import java.util.List;import java.util.Objects;import lombok.Data;import lombok.extern.slf4j.Slf4j;import org.junit.jupiter.api.Test;import org.springframework.amqp.rabbit.core.RabbitTemplate;import org.springframework.boot.test.context.SpringBootTest;/** @author lzhpo */@Slf4j@SpringBootTest(classes = TestRabbitMqApp.class)class PullConsumerTest {  @Test  void producer() {    for (int i = 1; i <= 10; i++) {      SampleMessage sampleMessage = new SampleMessage();      sampleMessage.setId(i);      sampleMessage.setName("Lewis-" + i);      String messageJson = MessageUtil.toJsonString(sampleMessage);      RabbitTemplate rabbitTemplate = SpringUtil.getBean(RabbitTemplate.class);      // 消息发送到RabbitMQ交换机后接收ack回调(不能依靠此ack值判断消息是否被消费者成功消费)      rabbitTemplate.setConfirmCallback(          (correlationData, ack, cause) -> {            if (ack) {              log.info("Successfully to send message to exchange! message: {}", messageJson);            } else {              log.error(                  "Failed to send message to exchange! message: {},correlationData: {}, cause: {}",                  messageJson,                  correlationData,                  cause);            }          });      rabbitTemplate.setMandatory(true);      // 消息发送到RabbitMQ交换机,但无相应queue时的回调      rabbitTemplate.setReturnsCallback(          returned ->              log.error(                  "The message sent has no corresponding queue! Returned message: {}", returned));      rabbitTemplate.convertAndSend(RabbitConst.PULL_EXCHANGE, RabbitConst.PULL_ROUTING_KEY, messageJson);    }  }  @Test  void consumer() {    // 要拉取的消息条数    int batchSize = 2;    RabbitTemplate rabbitTemplate = SpringUtil.getBean(RabbitTemplate.class);    rabbitTemplate.execute(        channel -> {          List<SampleMessage> messageBodyList = new ArrayList<>(batchSize);          long deliveryTag = 0;          while (messageBodyList.size() < batchSize) {            GetResponse response = channel.basicGet(RabbitConst.PULL_QUEUE, false);            log.info("The result of pulling the message response this time: {}", response);            if (Objects.isNull(response)) {              break;            }            byte[] bodyBytes = response.getBody();            String json = new String(bodyBytes);            SampleMessage messageBody = MessageUtil.parse(json, SampleMessage.class);            messageBodyList.add(messageBody);            deliveryTag = response.getEnvelope().getDeliveryTag();          }          log.info(              "A total of {} messages were pulled this time:{}",              messageBodyList.size(),              messageBodyList);          try {            // 可以在此做一些业务操作            // 也可以将此消费者测试用例封装起来,用java.util.function.Consumer作为参数传入,在此进行consumer.accept(messageBodyList),以确保业务操作成功之后ack            channel.basicAck(deliveryTag, true);          } catch (Exception e) {            log.error("Abnormal consumption!", e);            try {              channel.basicNack(deliveryTag, true, true);            } catch (Exception ex) {              log.error("Reject message exception!", ex);            }          }          // Or you can return the result to handle it yourself,          // but you must ensure that the ack will not affect you          return null;        });  }  @Data  private static class SampleMessage {    private int id;    private String name;  }}

上面消费者当中,可以手动指定要拉取的消息条数,拉取之后也可以做一些业务操作,然后再进行ack(确保业务操作成功之后再ack)

4.生产者发送消息

RabbitMQ控制台同样也可以看得到有10条消息:

5.手动拉取两条消息进行消费

RabbitMQ控制台也看得到少了两条已消费的消息:

关于deliveryTag

deliveryTag:消息投递序号。范围是1~9223372036854775807(long类型最大值)。

当注册消费者时,RabbitMQ将使用basic.deliver方法传递消息,该方法带有一个deliveryTag,它唯一地标识一个channel上的传递,因此,deliveryTag的范围仅限于channel

在手动ack的时候,deliveryTag也可用于ack、nack、reject等操作。

手动应答同时也可被批处理,它可以减少网络流量。
ack、nack的mutiple参数设置为true,则可以一次性应答deliveryTag小于等于传入值的所有应答。

// 手动消费,第二个参数即为mutiplechannel.basicAck(deliveryTag, true);
// 手动拒绝消息// 第二个参数即为mutiple// 第三个参数为requeue:被拒绝的消息是否应该重新排队而不是丢弃/死信channel.basicNack(deliveryTag, true, true);

在最前面的”推“模式的消费者中,mutiple参数就是为false,即一条一条消费。

参考文档:https://www.rabbitmq.com/confirms.html


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK