Reactive Distributed Lock: A Redis Implementation

 3 years ago
source link: https://segmentfault.com/a/1190000040036458

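  • 1 . The original authorization project integrated Spring's RedisLockRegistry to implement distributed locking. When the authorization service was migrated to reactive programming, a reactive distributed lock implementation was needed (Reference [1]).
  • 2 . The original RedisLockRegistry works on the basis of a Lua script and the thread ID.
  • 3 . The main goal is to keep the existing business logic unchanged in the migrated project while still handling concurrency safely.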

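Technical Approach and Challenges

  • 1 . Because the reactive programming model differs from the traditional one, thread IDs can no longer be used to tell lock holders apart in Reactor Netty's event-loop environment. A Redis Lua script can still be used for concurrency control.
  • 2 . Under contention, the traditional while(true) {... break} loop with Thread.sleep can no longer be used to wait for the lock and check its state. The waiting has to be rethought and expressed in a reactive way.
  • 3 . The final implementation follows essentially the same locking scheme as RedisLockRegistry, adapted to the reactive environment.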
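The second point can be illustrated without Reactor. The sketch below is a JDK-only analogue, not the article's code: NonBlockingRetry and retryUntilTrue are hypothetical names. It polls an attempt with a fixed pause and an overall timeout by scheduling each retry as a task, so no thread sits in Thread.sleep between attempts.

```java
import java.time.Duration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical helper for illustration; not part of the article's code base.
final class NonBlockingRetry {

    private NonBlockingRetry() {
    }

    /**
     * Calls attempt repeatedly, with a fixed pause between calls, until it
     * returns true or the timeout elapses. Each retry is a scheduled task,
     * so no thread sleeps while waiting.
     */
    static CompletableFuture<Boolean> retryUntilTrue(ScheduledExecutorService scheduler,
                                                     Supplier<Boolean> attempt,
                                                     Duration backoff,
                                                     Duration timeout) {
        CompletableFuture<Boolean> result = new CompletableFuture<>();
        long deadlineNanos = System.nanoTime() + timeout.toNanos();
        scheduler.execute(() -> tryOnce(scheduler, attempt, backoff, deadlineNanos, result));
        return result;
    }

    private static void tryOnce(ScheduledExecutorService scheduler,
                                Supplier<Boolean> attempt,
                                Duration backoff,
                                long deadlineNanos,
                                CompletableFuture<Boolean> result) {
        try {
            if (Boolean.TRUE.equals(attempt.get())) {
                // The attempt succeeded (for a lock: it was obtained)
                result.complete(true);
            } else if (System.nanoTime() - deadlineNanos >= 0) {
                // Deadline reached without success
                result.complete(false);
            } else {
                // Schedule the next attempt instead of sleeping
                scheduler.schedule(
                        () -> tryOnce(scheduler, attempt, backoff, deadlineNanos, result),
                        backoff.toMillis(), TimeUnit.MILLISECONDS);
            }
        } catch (RuntimeException e) {
            result.completeExceptionally(e);
        }
    }
}
```

In the reactive implementation the same roles are played by the repeat operator (retry), its fixed backoff (pause) and its timeout (deadline).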
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-webflux</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis-reactive</artifactId>
    <optional>true</optional>
</dependency>
<dependency>
    <groupId>io.projectreactor.addons</groupId>
    <artifactId>reactor-extra</artifactId>
</dependency>
  • 1 . Lock-handling Lua script
private static final String OBTAIN_LOCK_SCRIPT = "local lockSet = redis.call('SETNX', KEYS[1], ARGV[1])\n" +
            "if lockSet == 1 then\n" +
            "  redis.call('PEXPIRE', KEYS[1], ARGV[2])\n" +
            "  return true\n" +
            "else\n" +
            "  return false\n" +
            "end";
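To make the script's behaviour concrete, here is a small in-memory model of it. This is an illustrative sketch only: ObtainLockModel is a hypothetical class, the maps stand in for Redis, and the caller passes the current time explicitly so the expiry logic is easy to follow. Note that, as written, the script refuses a second request even from the token that already holds the key.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical in-memory stand-in for Redis, for illustration only.
final class ObtainLockModel {

    private final Map<String, String> owners = new HashMap<>();   // key -> owner token
    private final Map<String, Long> expiresAt = new HashMap<>();  // key -> expiry time in ms

    /**
     * Mirrors OBTAIN_LOCK_SCRIPT: SETNX key token; if the key was set, PEXPIRE key ttl.
     * synchronized stands in for the atomicity Redis gives a Lua script.
     */
    synchronized boolean obtain(String key, String token, long ttlMillis, long nowMillis) {
        Long expiry = expiresAt.get(key);
        if (expiry != null && expiry <= nowMillis) {
            // Redis would already have dropped the expired key
            owners.remove(key);
            expiresAt.remove(key);
        }
        if (owners.containsKey(key)) {
            // SETNX returned 0: the key exists, whoever owns it
            return false;
        }
        owners.put(key, token);                      // SETNX returned 1
        expiresAt.put(key, nowMillis + ttlMillis);   // PEXPIRE
        return true;
    }
}
```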
  • 2 . Core lock-acquisition snippet
/**
 * execute redis-script to obtain lock
 * @return if obtain success then return true otherwise return false
 */
private Mono<Boolean> obtainLock() {
    return Mono.from(ReactiveRedisDistributedLockRegistry.this.reactiveStringRedisTemplate
                    .execute(ReactiveRedisDistributedLockRegistry.this.obtainLockScript,
                            Collections.singletonList(this.lockKey),
                            List.of(this.lockId,String.valueOf(ReactiveRedisDistributedLockRegistry.this.expireAfter)))
            )
            .map(success -> {
                boolean result = Boolean.TRUE.equals(success);
                if (result) {
                    this.lockedAt = System.currentTimeMillis();
                }
                return result;
            });
}
  • 3 . Core lock-release snippet
/**
 * remove redis lock key
 * @return
 */
private Mono<Boolean> removeLockKey() {
    return Mono.just(ReactiveRedisDistributedLockRegistry.this.unlinkAvailable)
            .filter(unlink -> unlink)
            .flatMap(unlink -> ReactiveRedisDistributedLockRegistry.this.reactiveStringRedisTemplate
                    .unlink(this.lockKey)
                    .doOnError(throwable -> {
                        ReactiveRedisDistributedLockRegistry.this.unlinkAvailable = false;
                        if (log.isDebugEnabled()) {
                            log.debug("The UNLINK command has failed (not supported on the Redis server?); " +
                                    "falling back to the regular DELETE command", throwable);
                        } else {
                            log.warn("The UNLINK command has failed (not supported on the Redis server?); " +
                                    "falling back to the regular DELETE command: " + throwable.getMessage());
                        }
                    })
                    .onErrorResume(throwable -> ReactiveRedisDistributedLockRegistry.this.reactiveStringRedisTemplate.delete(this.lockKey))
            )
            .switchIfEmpty(ReactiveRedisDistributedLockRegistry.this.reactiveStringRedisTemplate.delete(this.lockKey))
            .then(Mono.just(true));
}
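  • 4 . Snippet for checking whether the lock is held by this process
/**
 * Check whether the lock is currently held by this process
 * @return true if the value stored under the lock key equals this lock's id; empty if the key does not exist
 */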
Mono<Boolean> isAcquiredInThisProcess() {
    return ReactiveRedisDistributedLockRegistry.this.reactiveStringRedisTemplate.opsForValue()
            .get(this.lockKey)
            .map(this.lockId::equals);
}
  • 5 . Base lock interface definition
public interface ReactiveDistributedLock {

    /**
     * get lock Key
     * @return
     */
    String getLockKey();

    /**
     * Try to acquire the lock once. Lock is acquired for a pre configured duration.
     * @return if lock succeeded then return true otherwise return false
     * <strong>if flow is empty default return false</strong>
     */
    Mono<Boolean> acquireOnce();

    /**
     * Try to acquire the lock. Lock is acquired for a pre configured duration.
     * @return
     * <strong>if flow is empty then throw an exception {@link CannotAcquireLockException}</strong>
     */
    Mono<Boolean> acquire();

    /**
     * Try to acquire the lock for a given duration.
     * @param duration duration in use
     * @return
     * <strong>the given duration must be less than the default duration; otherwise the lock key will be expired by Redis after the default expiry duration</strong>
     * <strong>if flow is empty then throw an exception {@link CannotAcquireLockException}</strong>
     */
    Mono<Boolean> acquire(Duration duration);

    /**
     * Release the lock.
     * @return
     * <strong>if lock key doesn't exist in the redis,then throw an exception {@link IllegalStateException}</strong>
     */
    Mono<Boolean> release();
    
}    
  • 6 . Base lock interface implementation
private final class ReactiveRedisDistributedLock implements ReactiveDistributedLock {

        @Override
        public String getLockKey() {
            return this.lockKey;
        }

        @Override
        public Mono<Boolean> acquireOnce() {
            log.debug("Acquire Lock Once,LockKey:{}",this.lockKey);
            return this.obtainLock()
                    .doOnNext(lockResult -> log.info("Obtain Lock Once,LockKey:{},Result:{}",this.lockKey,lockResult))
                    .doOnError(this::rethrowAsLockException);
        }

        @Override
        public Mono<Boolean> acquire() {
            log.debug("Acquire Lock By Default Duration :{}" ,expireDuration);
            // Acquire the lock, waiting at most the default configured duration
            return this.acquire(ReactiveRedisDistributedLockRegistry.this.expireDuration);
        }

        @Override
        public Mono<Boolean> acquire(Duration duration) {
            // Try to obtain the lock
            return this.obtainLock()
                    // Keep only a successful attempt; a failed attempt becomes an empty signal
                    .filter(result -> result)
                    // Retry for as long as the result is empty
                    .repeatWhenEmpty(Repeat.onlyIf(repeatContext -> true)
                            // Overall retry timeout
                            .timeout(duration)
                            // Interval between retries
                            .fixedBackoff(Duration.ofMillis(100))
                            // Log each retry
                            .doOnRepeat(objectRepeatContext -> {
                                if (log.isTraceEnabled()) {
                                    log.trace("Repeat Acquire Lock Repeat Content:{}",objectRepeatContext);
                                }
                            })
                    )
                    // defaultIfEmpty is required here: once the repeat times out, the stream completes empty.
                    // If that is not handled, the stream stops here or falls through to the outermost empty handler
                    .defaultIfEmpty(false)
                    // Log the result of the lock attempt
                    .doOnNext(lockResult -> log.info("Obtain Lock,Lock Result :{},Lock Info:{}",lockResult,this))
                    // On error, rethrow as a lock exception
                    .doOnError(this::rethrowAsLockException);
        }

        @Override
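        public Mono<Boolean> release() {
            // Check whether this process currently holds the lock
            return this.isAcquiredInThisProcess()
                    // Continue only if the lock is ours
                    .filter(isThisProcess -> isThisProcess)
                    // Release the lock
                    .flatMap(isThisProcess -> this.removeLockKey()
                            // Log the result
                            .doOnNext(releaseResult -> log.info("Released Lock:{},Lock Info:{}",releaseResult,this))
                            // Rethrow unexpected errors
                            .onErrorResume(throwable -> Mono.fromRunnable(() -> ReflectionUtils.rethrowRuntimeException(throwable)))
                    )
                    // An empty stream at this point means the lock is no longer ours: it has been released by the
                    // maximum expiry configured in Redis. This check must sit outside the flatMap, because
                    // removeLockKey() always emits a value and would never trigger it
                    .switchIfEmpty(Mono.error(new IllegalStateException("Lock was released in the store due to expiration. " + "The integrity of data protected by this lock may have been compromised.")));
        }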
}
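The release() above checks ownership with one Redis call and deletes the key with another. If the key expires and another process acquires it between those two calls, the delete could remove the other process's lock. One commonly used way to close that window is to compare and delete inside a single Lua script. The sketch below is an assumption rather than the article's code: RELEASE_LOCK_SCRIPT and OwnerCheckedRelease are hypothetical names, and the in-memory map only models the compare-and-delete semantics.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Hypothetical sketch; neither the script nor the class comes from the article.
final class OwnerCheckedRelease {

    // Delete the key only if it still holds our token; returns 1 if deleted, 0 otherwise.
    static final String RELEASE_LOCK_SCRIPT =
            "if redis.call('GET', KEYS[1]) == ARGV[1] then\n" +
            "  return redis.call('DEL', KEYS[1])\n" +
            "else\n" +
            "  return 0\n" +
            "end";

    // In-memory stand-in for Redis: key -> owner token
    private final ConcurrentMap<String, String> store = new ConcurrentHashMap<>();

    /** Analogue of SETNX: succeeds only if the key is absent. */
    boolean obtain(String key, String token) {
        return store.putIfAbsent(key, token) == null;
    }

    /** Analogue of the script: remove(key, value) compares and deletes in one atomic step. */
    boolean release(String key, String token) {
        return store.remove(key, token);
    }
}
```

Such a script could be run through the same execute(...) call that obtainLock() uses, with the lock key in KEYS and the lock id in ARGV.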
  • 7 . A built-in scheduled task detects expired RedisLock objects that are no longer in use and evicts them from the in-memory cache. The task is tied to the Spring bean lifecycle (InitializingBean, DisposableBean) so that it starts and stops with the bean.
private Scheduler scheduler = Schedulers.newSingle("redis-lock-evict",true);

// Hook into the Spring bean lifecycle
@Override
public void afterPropertiesSet() {
    log.debug("Initialize Auto Remove Unused Lock Execution");
    // Use Flux.interval as the periodic trigger
    Flux.interval(expireEvictIdle, scheduler)
            .flatMap(value -> {
                long now = System.currentTimeMillis();
                log.trace("Auto Remove Unused Lock ,Evict Triggered");
                return Flux.fromIterable(this.locks.entrySet())
                        // Keep only lock objects whose expiry time has passed
                        .filter(entry -> now - entry.getValue().getLockedAt() > expireAfter)
                        // Remove the locks that are not held by this process
                        .flatMap(entry -> entry.getValue()
                                .isAcquiredInThisProcess()
                                // A missing key yields an empty Mono; treat it as "not held" so the entry can be evicted
                                .defaultIfEmpty(false)
                                .filter(inProcess -> !inProcess)
                                .doOnNext(inProcess -> {
                                    this.locks.remove(entry.getKey());
                                    log.debug("Auto Remove Unused Lock,Lock Info:{}", entry);
                                })
                                // Log the error and carry on
                                .onErrorResume(throwable -> {
                                    log.error("Auto Remove Unused Locks Occur Exception,Lock Info: " + entry, throwable);
                                    return Mono.empty();
                                })
                        );
            })
            // Nothing runs until the Flux is subscribed
            .subscribe();
}

@Override
public void destroy() {
    log.debug("Shutdown Auto Remove Unused Lock Execution");
    // Hook into the Spring bean lifecycle: dispose the Scheduler
    this.scheduler.dispose();
}
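The eviction rule itself is independent of Reactor and Redis. The following JDK-only sketch restates it under stated assumptions: IdleLockEviction and CachedLock are hypothetical names, and the stillHeld predicate stands in for the Redis ownership check. An entry is removed only when its last acquisition is older than the expiry and the lock is not currently held.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Predicate;

// Hypothetical sketch of the eviction rule; not the article's implementation.
final class IdleLockEviction {

    // Stand-in for the registry's cached lock objects.
    static final class CachedLock {
        final String key;
        final long lockedAt;   // epoch millis of the last successful acquire

        CachedLock(String key, long lockedAt) {
            this.key = key;
            this.lockedAt = lockedAt;
        }
    }

    private IdleLockEviction() {
    }

    /**
     * Removes entries whose last acquisition is older than expireAfterMillis
     * and that are not currently held. Returns the number of entries removed.
     */
    static int evict(ConcurrentMap<String, CachedLock> locks,
                     long nowMillis,
                     long expireAfterMillis,
                     Predicate<CachedLock> stillHeld) {
        int removed = 0;
        // ConcurrentMap iteration tolerates removal during traversal
        for (Map.Entry<String, CachedLock> entry : locks.entrySet()) {
            CachedLock lock = entry.getValue();
            boolean expired = nowMillis - lock.lockedAt > expireAfterMillis;
            if (expired && !stillHeld.test(lock) && locks.remove(entry.getKey(), lock)) {
                removed++;
            }
        }
        return removed;
    }
}
```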
  • 8 . Streamline the lock interface by adding default methods that make locks easier to control and handle. The downstream logic to run under the lock is wrapped in a Supplier so that it is easy to call and compose.
/**
 * Acquire a lock and release it after action is executed or fails.
 *
 * @param <T>  type of value emitted by the action
 * @param executionSupplier supplies the action, subscribed to once the lock is acquired
 * @return the value emitted by the action; errors with {@link CannotAcquireLockException} if the lock cannot be acquired
 * @see ReactiveDistributedLock#acquire()
 */
default <T> Mono<T> acquireAndExecute(Supplier<Mono<T>> executionSupplier) {
    return acquire()
            .flatMap(acquireResult -> Mono.just(acquireResult)
                        .filter(result -> result)
                        // Works together with the acquire logic: empty here means the lock could not be acquired
                        .switchIfEmpty(Mono.error(new CannotAcquireLockException("Failed to Obtain Lock ,LockKey: " + getLockKey())))
                        .flatMap(lockResult -> executionSupplier
                                .get()
                                .flatMap(result -> this.release()
                                        .flatMap(releaseResult -> Mono.just(result))
                                )
                                .switchIfEmpty(this.release().then(Mono.empty()))
                                .onErrorResume(throwable -> this.release().flatMap(r -> Mono.error(throwable)))
                        )
            );
}

/**
 * Acquire a lock for a given duration and release it after action is executed.
 *
 * @param <T>      type of value emitted by the action
 * @param duration duration passed to {@link ReactiveDistributedLock#acquire(Duration)}
 * @param executionSupplier     supplies the action, subscribed to once the lock is acquired
 * @return the value emitted by the action; errors with {@link CannotAcquireLockException} if the lock cannot be acquired
 * @see ReactiveDistributedLock#acquire(Duration)
 */
default <T> Mono<T> acquireAndExecute(Duration duration, Supplier<Mono<T>> executionSupplier) {
    return acquire(duration)
            .flatMap(acquireResult -> Mono.just(acquireResult)
                    .filter(result -> result)
                    .switchIfEmpty(Mono.error(new CannotAcquireLockException("Failed to Obtain Lock ,LockKey: " + getLockKey())))
                    .flatMap(lockResult -> executionSupplier
                            .get()
                            .flatMap(result -> this.release()
                                    .flatMap(releaseResult -> Mono.just(result))
                            )
                            .switchIfEmpty(this.release().then(Mono.empty()))
                            .onErrorResume(throwable -> this.release().flatMap(r -> Mono.error(throwable)))
                    )
            );
}

/**
 * Acquire a lock and release it after action is executed or fails.
 *
 * @param <T>  type of value emitted by the action
 * @param executionSupplier     supplies the action, subscribed to once the lock is acquired
 * @return the values emitted by the action; errors with {@link CannotAcquireLockException} if the lock cannot be acquired
 * @see ReactiveDistributedLock#acquire()
 */
default <T> Flux<T> acquireAndExecuteMany(Supplier<Flux<T>> executionSupplier) {
    return acquire()
            .flatMapMany(acquireResult -> Mono.just(acquireResult)
                    .filter(result -> result)
                    .switchIfEmpty(Mono.error(new CannotAcquireLockException("Failed to Obtain Lock ,LockKey: " + getLockKey())))
                    // Release once, when the whole Flux terminates (complete, error or cancel).
                    // Releasing per element would give up the lock after the first element
                    // and could drop the elements that follow
                    .flatMapMany(lockResult -> Flux.usingWhen(
                            Mono.just(lockResult),
                            locked -> executionSupplier.get(),
                            locked -> this.release())
                    )
            );
}

/**
 * Acquire a lock for a given duration and release it after action is executed.
 *
 * @param <T>      type of value emitted by the action
 * @param duration duration passed to {@link ReactiveDistributedLock#acquire(Duration)}
 * @param executionSupplier     supplies the action, subscribed to once the lock is acquired
 * @return the values emitted by the action; errors with {@link CannotAcquireLockException} if the lock cannot be acquired
 * @see ReactiveDistributedLock#acquire(Duration)
 */
default <T> Flux<T> acquireAndExecuteMany(Duration duration, Supplier<Flux<T>> executionSupplier) {
    return acquire(duration)
            .flatMapMany(acquireResult -> Mono.just(acquireResult)
                    .filter(result -> result)
                    .switchIfEmpty(Mono.error(new CannotAcquireLockException("Failed to Obtain Lock ,LockKey: " + getLockKey())))
                    // Release once, when the whole Flux terminates (complete, error or cancel).
                    // Releasing per element would give up the lock after the first element
                    // and could drop the elements that follow
                    .flatMapMany(lockResult -> Flux.usingWhen(
                            Mono.just(lockResult),
                            locked -> executionSupplier.get(),
                            locked -> this.release())
                    )
            );
}
  • Configure the parameters in application.yml
lock:
  redis:
    reactive:
      expire-after: 10s
      expire-evict-idle: 1s
  • Inject the bean
@Autowired
private ReactiveRedisDistributedLockRegistry reactiveRedisDistributedLockRegistry;
  • 1 . Acquire once and fail fast
@Test
public void testAcquireOnce() throws Exception {
    ProcessFunctions processFunctions = new ProcessFunctions();
    String key = "LOCK_ONCE";
    Flux<String> flux = Flux.range(0, 5)
            .flatMap(value -> this.reactiveRedisDistributedLockRegistry.obtain(key)
                    .acquireOnce()
                    .filter(acquireResult -> acquireResult)
                    .flatMap(acquireResult -> processFunctions.processFunction())
                    .switchIfEmpty(Mono.just(FAILED))
            )
            .doOnNext(System.out::println);
    StepVerifier.create(flux)
            .expectNext(OK)
            .expectNext(FAILED)
            .expectNext(FAILED)
            .expectNext(FAILED)
            .expectNext(FAILED)
            .verifyComplete();

}
  • 2 . Acquire, waiting up to the default timeout
@Test
public void testAcquireDefaultDurationAndProcessDuringTheExpireDuration() throws Exception {
    //default lock expire is 10S
    ProcessFunctions processFunctions = new ProcessFunctions();
    String key = "LOCK_DEFAULT";
    Flux<String> flux = Flux.range(0, 3)
            .flatMap(value -> this.reactiveRedisDistributedLockRegistry.obtain(key)
                    .acquireAndExecute(() ->
                            processFunctions.processDelayFunction(Duration.ofSeconds(2))
                    )
                    .doOnNext(System.out::println)
                    .onErrorResume(throwable -> CannotAcquireLockException.class.isAssignableFrom(throwable.getClass()),throwable -> {
                        System.out.println("Lock Error");
                        return Mono.just(FAILED);
                    })
            );
    StepVerifier.create(flux)
            .expectNext(OK)
            .expectNext(OK)
            .expectNext(OK)
            .verifyComplete();

}
  • 3 . Acquire with a specified duration
@Test
public void testAcquireDuration() throws Exception {
    ProcessFunctions processFunctions = new ProcessFunctions();
    String key = "LOCK_GIVEN_DURATION";
    Flux<String> flux = Flux.range(0, 3)
            .subscribeOn(Schedulers.parallel())
            .flatMap(value -> this.reactiveRedisDistributedLockRegistry.obtain(key)
                    .acquireAndExecute(Duration.ofSeconds(3), () ->
                            processFunctions.processDelayFunction(Duration.ofSeconds(2))
                    )
                    .doOnNext(System.out::println)
                    .onErrorResume(throwable -> CannotAcquireLockException.class.isAssignableFrom(throwable.getClass()), throwable -> {
                        System.out.println("Lock Error");
                        return Mono.just(FAILED);
                    })
            );
    StepVerifier.create(flux)
            .expectNext(OK)
            .expectNext(FAILED)
            .expectNext(OK)
            .verifyComplete();

}
  • 1 . RedisLockRegistry: https://docs.spring.io/spring-integration/docs/5.3.6.RELEASE/reference/html/redis.html#redis-lock-registry
  • 2 . Trigger Mono Execution After Another Mono Terminates: https://stackoverflow.com/questions/50686524/how-to-trigger-mono-execution-after-another-mono-terminates
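  • The project is maintained on GitHub; issues and stars are welcome: reactive-redis-distributed-lock
  • It is currently written as a Spring Boot scaffold and has not been published to Maven Central. If you need it, you can build the package yourself.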
