1

【Redis场景2】缓存更新策略(双写一致) - xbhog

 1 year ago
source link: https://www.cnblogs.com/xbhog/p/17004151.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

【Redis场景2】缓存更新策略(双写一致)

在业务初始阶段,流量很少的情况下,通过直接操作数据是可行的操作,但是随着业务量的增长,用户的访问量也随之增加,在该阶段自然需要使用一些手段(缓存)来减轻数据库的压力;所谓遇事不决,那就加一层。

在当前技术栈中,redis当属缓存的第一梯队了,但是随着缓存的引入,业务架构和问题也随之而来。

缓存好处:

  1. 降低后端负载
  2. 提高读写效率,降低响应时间

缓存成本:

  1. 数据一致性成本
  2. 代码维护成本

缓存更新策略

内存淘汰:

redis自动进行,当redis内存达到咱们设定的max-memery的时候,会自动触发淘汰机制,淘汰掉一些不重要的数据(可以自己设置策略方式)

宝塔redis配置图:

img

超时剔除:当我们给redis设置了过期时间ttl之后,redis会将超时的数据进行删除,方便咱们继续使用缓存

主动更新:我们可以手动调用方法把缓存删掉,通常用于解决缓存和数据库不一致问题

业务场景:

  1. 低一致性需求:使用内存淘汰机制。
  2. 高一致性需求:主动更新,并以超时剔除作为兜底方案

数据缓存不一致的解决方案

  • 删除缓存还是更新缓存?

    • 更新缓存:每次更新数据库都更新缓存,无效写操作较多
    • 删除缓存(V):更新数据库时让缓存失效,查询时再更新缓存
  • 如何保证缓存与数据库的操作的同时成功或失败?

    • 单体系统,将缓存与数据库操作放在一个事务
    • 分布式系统,利用TCC等分布式事务方案
  • 先操作缓存还是先操作数据库?

    • 先删除缓存,再操作数据库
    • 先操作数据库,再删除缓存(V)

结论:先操作数据库,在操作缓存

img

第一种(淘汰):

假设线程1先来,他先把缓存删了,此时线程2过来,他查询缓存数据并不存在,此时他写入缓存,当他写入缓存后,线程1再执行更新动作时,实际上写入的就是旧的数据,新的数据被旧数据覆盖了。

第二种:也会出现一个时差的问题,但是需要满足以下条件

  1. 两个读写线程同时访问

  2. 缓存刚好失效(查询未命中)

  3. 在线程一写入缓存的时间内,线程二要完成数据库的更新和删除缓存

    1. 缓存写入速度很快
    2. 写数据库一般会先「加锁」,所以写数据库,通常是要比读数据库的时间更长的

以上择优原则先操作数据后删除缓存的

该场景实现流程:以下分析结合部分代码(聚焦于redis的实现);

完整后端代码可在Github中获取:https://github.com/xbhog/hm-dianping

img

开发流程:

【查询店铺缓存流程】

  1. 从redis中查询店铺信息

    1. 命中缓存:返回店铺信息
    2. 未命中:查询数据库(2)
  2. 查询数据库

  3. 结果为空:店铺信息不存在

  4. 设置店铺缓存

public Result queryById(Long id) {
    //从redis查询商铺信息
    String shopInfo = stringRedisTemplate.opsForValue().get(SHOP_CACHE_KEY + id);
    //命中缓存,返回店铺信息
    if(StrUtil.isNotBlank(shopInfo)){
        Shop shop = JSONUtil.toBean(shopInfo, Shop.class);
        return Result.ok(shop);
    }
    //未命中缓存
    Shop shop = getById(id);
    if(Objects.isNull(shop)){
        return Result.fail("店铺不存在");
    }
    //对象转字符串
    stringRedisTemplate.opsForValue().set(SHOP_CACHE_KEY+id,JSONUtil.toJsonStr(shop),30L, TimeUnit.MINUTES);
    return Result.ok(shop);
}

在设置店铺缓存的时候,设置了失效时间(保证缓存的利用率)---满足高一致性需求:主动更新,并以超时剔除作为兜底方案;

然后在后台修改店铺信息的时候,先修改数据库,然后删除缓存;

@Override
@Transactional
public Result updateShopById(Shop shop) {
    Long id = shop.getId();
    if(ObjectUtil.isNull(id)){
        return Result.fail("====>店铺ID不能为空");
    }
    log.info("====》开始更新数据库");
    //更新数据库
    updateById(shop);
    stringRedisTemplate.delete(SHOP_CACHE_KEY + id);
    return Result.ok();
}

这里有一个点,在方法上设置事务,当数据库更新成功,删除缓存(相当于更新缓存);因为这里删除缓存后,下次访问店铺信息的时候,查询数据库会重新建立缓存。

虽然上述删除缓存的不管在前还是后面流程异常,都不会影响缓存的使用。但是不是双方一致,而是有所取舍(舍的缓存);

保证数据库和缓存都一致的方式:

重试:****无论是先操作缓存,还是先操作数据库,但凡后者执行失败了,我们就可以发起重试,尽可能地去做「补偿」。

  1. 同步重试(不可取)
  • 立即重试很大概率还会失败
  • 重试次数取值
  • 重试会占用当前这个线程资源,阻塞操作。
  1. 异步重试(MQ)
  2. canal

异步重试:RocketMQ

完整后端代码可在Github中获取:https://github.com/xbhog/hm-dianping

RocketMQ集群的搭建和使用:https://www.cnblogs.com/xbhog/p/17003037.html

在上面店铺信息修改的时候,我们更新了数据库后删除redis缓存,为了避免第二步的执行失败,我们将redis的操作放到消息队列中,由消费者来操作缓存。

引用:

缓存和数据库一致性问题,看这篇就够了

  • 消息队列保证可靠性:写到队列中的消息,成功消费之前不会丢失(重启项目也不担心)
  • 消息队列保证消息成功投递:下游从队列拉取消息,成功消费后才会删除消息,否则还会继续投递消息给消费者(符合我们重试的场景)

至于写队列失败和消息队列的维护成本问题:

  • 写队列失败:操作缓存和写消息队列,「同时失败」的概率其实是很小的
  • 维护成本:我们项目中一般都会用到消息队列,维护成本并没有新增很多

代码实现:

配置pom.xml和application.yaml

<rocketmq-spring-boot-starter-version>2.0.3</rocketmq-spring-boot-starter-version>

<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-client</artifactId>
    <version>4.9.3</version>
</dependency>
<dependency>
    <groupId>org.apache.rocketmq</groupId>
    <artifactId>rocketmq-spring-boot-starter</artifactId>
    <version>${rocketmq-spring-boot-starter-version}</version>
</dependency>
rocketmq:
  name-server: xxx.xxx.xxx.174:9876;xxx.xxx.xxx.246:9876
  producer:
    group: shopDataGroup

在更新店铺的操作中引入MQ,异步发送信息:

@Override
@Transactional
public Result updateShopById(Shop shop) {
    Long id = shop.getId();
    if(ObjectUtil.isNull(id)){
        return Result.fail("====>店铺ID不能为空");
    }
    log.info("====》开始更新数据库");
    //更新数据库
    updateById(shop);
    String shopRedisKey = SHOP_CACHE_KEY + id;
    Message message = new Message(TOPIC_SHOP,"shopRe",shopRedisKey.getBytes());
    //异步发送MQ
    try {
        rocketMQTemplate.getProducer().send(message);
    } catch (Exception e) {
        log.info("=========>发送异步消息失败:{}",e.getMessage());
    }
    //stringRedisTemplate.delete(SHOP_CACHE_KEY + id);
    //int i = 1/0;  验证异常流程后,
    return Result.ok();
}

设置消费者监听器:

package com.hmdp.mq;
/**
 * @author xbhog
 * @describe:
 * @date 2022/12/21
 */
@Slf4j
@Component
@RocketMQMessageListener(topic = TOPIC_SHOP,consumerGroup = "shopRe",
        messageModel = MessageModel.CLUSTERING)
public class RocketMqNessageListener  implements RocketMQListener<MessageExt> {
    @Resource
    private StringRedisTemplate stringRedisTemplate;

    @SneakyThrows
    @Override
    public void onMessage(MessageExt message) {
        log.info("========>异步消费开始");
        String body = null;
        body = new String(message.getBody(), "UTF-8");
        stringRedisTemplate.delete(body);
        int reconsumeTimes = message.getReconsumeTimes();
        log.info("======>重试次数{}",reconsumeTimes);
        if(reconsumeTimes > 3){
            log.info("消费失败:{}",body);
            return;
        }
        throw new RuntimeException("模拟异常抛出");
    }

}

查看重试结果:

 ====》开始更新数据库
36:29.174 DEBUG 69636 --- [nio-8081-exec-2] com.hmdp.mapper.ShopMapper.updateById    : ==>  Preparing: UPDATE tb_shop SET name=?, type_id=?, area=?, address=?, avg_price=?, sold=?, comments=?, score=?, open_hours=? WHERE id=?
36:29.192 DEBUG 69636 --- [nio-8081-exec-2] com.hmdp.mapper.ShopMapper.updateById    : ==> Parameters: 102茶餐厅(String), 1(Long), 大关(String), 金华路锦昌文华苑29号(String), 80(Long), 4215(Integer), 3035(Integer), 37(Integer), 10:00-22:00(String), 1(Long)
36:29.301 DEBUG 69636 --- [nio-8081-exec-2] com.hmdp.mapper.ShopMapper.updateById    : <==    Updates: 1
36:29.744  INFO 69636 --- [Thread_shopRe_1] com.hmdp.mq.RocketMqNessageListener      : ========>异步消费开始
36:30.011  INFO 69636 --- [Thread_shopRe_1] com.hmdp.mq.RocketMqNessageListener      : ======>重试次数0
36:30.014  WARN 69636 --- [Thread_shopRe_1] a.r.s.s.DefaultRocketMQListenerContainer : consume message failed. messageExt:.......

java.lang.RuntimeException: 模拟异常抛出
	.......

36:42.636  INFO 69636 --- [Thread_shopRe_2] com.hmdp.mq.RocketMqNessageListener      : ========>异步消费开始
36:42.689  INFO 69636 --- [Thread_shopRe_2] com.hmdp.mq.RocketMqNessageListener      : ======>重试次数1
36:42.689  WARN 69636 --- [Thread_shopRe_2] a.r.s.s.DefaultRocketMQListenerContainer : consume message failed. messageExt:.......

java.lang.RuntimeException: 模拟异常抛出
	.......

37:12.764  INFO 69636 --- [Thread_shopRe_3] com.hmdp.mq.RocketMqNessageListener      : ========>异步消费开始
37:12.820  INFO 69636 --- [Thread_shopRe_3] com.hmdp.mq.RocketMqNessageListener      : ======>重试次数2
37:12.821  WARN 69636 --- [Thread_shopRe_3] a.r.s.s.DefaultRocketMQListenerContainer : consume message failed. messageExt:MessageExt .......

java.lang.RuntimeException: 模拟异常抛出
	.......

38:12.896  INFO 69636 --- [Thread_shopRe_4] com.hmdp.mq.RocketMqNessageListener      : ========>异步消费开始
38:12.960  INFO 69636 --- [Thread_shopRe_4] com.hmdp.mq.RocketMqNessageListener      : ======>重试次数3
38:12.960  WARN 69636 --- [Thread_shopRe_4] a.r.s.s.DefaultRocketMQListenerContainer : consume message failed. messageExt:MessageExt .......

java.lang.RuntimeException: 模拟异常抛出
	.......
40:13.045  INFO 69636 --- [Thread_shopRe_5] com.hmdp.mq.RocketMqNessageListener      : ========>异步消费开始
40:13.110  INFO 69636 --- [Thread_shopRe_5] com.hmdp.mq.RocketMqNessageListener      : ======>重试次数4
40:13.110  INFO 69636 --- [Thread_shopRe_5] com.hmdp.mq.RocketMqNessageListener      : 消费失败:cache:shop:1

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK