9

手把手教你使用 Spring Boot 3 开发上线一个前后端分离的生产级系统(九) - Spring A...

 2 years ago
source link: https://www.cnblogs.com/xxyopen/p/16350743.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

手把手教你使用 Spring Boot 3 开发上线一个前后端分离的生产级系统(九) - Spring AMQP 集成与配置

手把手教你使用 Spring Boot 3 开发上线一个前后端分离的生产级系统(一) - 介绍
手把手教你使用 Spring Boot 3 开发上线一个前后端分离的生产级系统(二) - 数据库设计
手把手教你使用 Spring Boot 3 开发上线一个前后端分离的生产级系统(三) - 项目初始化
手把手教你使用 Spring Boot 3 开发上线一个前后端分离的生产级系统(四) - 日志 & 跨域配置
手把手教你使用 Spring Boot 3 开发上线一个前后端分离的生产级系统(五) - MyBatis-Plus & 代码生成器集成与配置
手把手教你使用 Spring Boot 3 开发上线一个前后端分离的生产级系统(六) - 本地缓存 Caffeine 和 分布式缓存 Redis 集成与配置
手把手教你使用 Spring Boot 3 开发上线一个前后端分离的生产级系统(七) - Elasticsearch 8.2 集成与配置
手把手教你使用 Spring Boot 3 开发上线一个前后端分离的生产级系统(八) - XXL-JOB 集成与配置

Spring AMQP 介绍

AMQP(高级消息队列协议)是一个异步消息传递所使用的应用层协议规范,为面向消息的中间件设计,不受产品和开发语言的限制. Spring AMQP 将核心 Spring 概念应用于基于 AMQP 消息传递解决方案的开发。

RabbitMQ 是基于 AMQP 协议的轻量级、可靠、可扩展、可移植的消息中间件,Spring 使用 RabbitMQ 通过 AMQP 协议进行通信。Spring Boot 为通过 RabbitMQ 使用 AMQP 提供了多种便利,包括 spring-boot-starter-amqp “Starter”。

Spring AMQP 集成与配置

  1. 可通过如下 Docker 命令 安装 RabbiMQ:
docker run -it --rm --name rabbitmq -p 5672:5672 -p 15672:15672 rabbitmq:3.10-management
  1. 登录 RabbiMQ 的 web 管理界面,创建虚拟主机novel:
RabbitMQ
  1. 项目中加入如下的 maven 依赖:
<dependency> <groupId>org.springframework.boot</groupId> <artifactId>spring-boot-starter-amqp</artifactId></dependency>
  1. 在 application.yml 配置文件中加入 RabbitMQ 的连接配置:
spring: rabbitmq: addresses: "amqp://guest:[email protected]" virtual-host: novel template: retry: # 开启重试 enabled: true # 最大重试次数 max-attempts: 3 # 第一次和第二次重试之间的持续时间 initial-interval: "3s"
  1. 此时已经可以在 Spring Beans 中注入 AmqpTemplate 发送消息了。

Spring AMQP 使用示例

  1. io.github.xxyopen.novel.core.constant包下创建 AMQP 相关常量类:
/** * AMQP 相关常量 * * @author xiongxiaoyang * @date 2022/5/25 */public class AmqpConsts { /** * 小说信息改变 MQ * */ public static class BookChangeMq{ /** * 小说信息改变交换机 * */ public static final String EXCHANGE_NAME = "EXCHANGE-BOOK-CHANGE"; /** * Elasticsearch book 索引更新的队列 * */ public static final String QUEUE_ES_UPDATE = "QUEUE-ES-BOOK-UPDATE"; /** * Redis book 缓存更新的队列 * */ public static final String QUEUE_REDIS_UPDATE = "QUEUE-REDIS-BOOK-UPDATE"; // ... 其它的更新队列 } }
  1. io.github.xxyopen.novel.core.config包下创建 AMQP 配置类,配置各个交换机、队列以及绑定关系:
/** * AMQP 配置类 * * @author xiongxiaoyang * @date 2022/5/25 */@Configurationpublic class AmqpConfig { /** * 小说信息改变交换机 */ @Bean public FanoutExchange bookChangeExchange() { return new FanoutExchange(AmqpConsts.BookChangeMq.EXCHANGE_NAME); } /** * Elasticsearch book 索引更新队列 */ @Bean public Queue esBookUpdateQueue() { return new Queue(AmqpConsts.BookChangeMq.QUEUE_ES_UPDATE); } /** * Elasticsearch book 索引更新队列绑定到小说信息改变交换机 */ @Bean public Binding esBookUpdateQueueBinding() { return BindingBuilder.bind(esBookUpdateQueue()).to(bookChangeExchange()); } // ... 其它的更新队列以及绑定关系 }
  1. io.github.xxyopen.novel.manager.mq包下创建 AMQP 消息管理类,用来发送各种 AMQP 消息:
/** * AMQP 消息管理类 * * @author xiongxiaoyang * @date 2022/5/25 */@Component@RequiredArgsConstructorpublic class AmqpMsgManager { private final AmqpTemplate amqpTemplate; @Value("${spring.amqp.enable}") private String enableAmqp; /** * 发送小说信息改变消息 */ public void sendBookChangeMsg(Long bookId) { if (Objects.equals(enableAmqp, CommonConsts.TRUE)) { sendAmqpMessage(amqpTemplate, AmqpConsts.BookChangeMq.EXCHANGE_NAME, null, bookId); } } private void sendAmqpMessage(AmqpTemplate amqpTemplate, String exchange, String routingKey, Object message) { // 如果在事务中则在事务执行完成后再发送,否则可以直接发送 if (TransactionSynchronizationManager.isActualTransactionActive()) { TransactionSynchronizationManager.registerSynchronization(new TransactionSynchronization() { @Override public void afterCommit() { amqpTemplate.convertAndSend(exchange, routingKey, message); } }); return; } amqpTemplate.convertAndSend(exchange, routingKey, message); } }
  1. 在小说信息更新后,发送 AMQP 消息:
@Transactional(rollbackFor = Exception.class)@Overridepublic RestResp<Void> saveBookChapter(ChapterAddReqDto dto) { // 1) 保存章节相关信息到小说章节表 // a) 查询最新章节号 // b) 设置章节相关信息并保存 // 2) 保存章节内容到小说内容表 // 3) 更新小说表最新章节信息和小说总字数信息 // a) 更新小说表关于最新章节的信息 // b) 发送小说信息更新的 MQ 消息 amqpMsgManager.sendBookChangeMsg(dto.getBookId()); return RestResp.ok();}
  1. io.github.xxyopen.novel.core.listener包下创建 Rabbit 队列监听器,监听各个 RabbitMQ 队列的消息并处理:
/** * Rabbit 队列监听器 * * @author xiongxiaoyang * @date 2022/5/25 */@Component@RequiredArgsConstructor@Slf4jpublic class RabbitQueueListener { private final BookInfoMapper bookInfoMapper; private final ElasticsearchClient esClient; /** * 监听小说信息改变的 ES 更新队列,更新最新小说信息到 ES * */ @RabbitListener(queues = AmqpConsts.BookChangeMq.QUEUE_ES_UPDATE) @SneakyThrows public void updateEsBook(Long bookId) { BookInfo bookInfo = bookInfoMapper.selectById(bookId); IndexResponse response = esClient.index(i -> i .index(EsConsts.BookIndex.INDEX_NAME) .id(bookInfo.getId().toString()) .document(EsBookDto.build(bookInfo)) ); log.info("Indexed with version " + response.version()); } // ... 监听其它队列,刷新其它副本数据 }

此时,如果需要更新其它小说副本数据,只需要配置更新队列和增加监听器,不需要在小说信息变更的地方增加任何业务代码,而且任意小说副本的数据刷新之间互不影响,真正实现了模块间的解耦。


Recommend

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK