3

冷饭新炒:理解Redisson中分布式锁的实现

 3 years ago
source link: https://www.daqianduan.com/17089.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

前提

在很早很早之前,写过一篇文章介绍过 Redis 中的 red lock 的实现,但是在生产环境中,笔者所负责的项目使用的分布式锁组件一直是 RedissonRedisson 是具备多种内存数据网格特性的基于 Java 编写的 Redis 客户端框架( Redis Java Client with features of In-Memory Data Grid ),基于 Redis 的基本数据类型扩展出很多种实现的高级数据结构,具体见其官方的简介图:

yAfEby.png

本文要分析的 R(ed)Lock 实现,只是其中一个很小的模块,其他高级特性可以按需选用。下面会从基本原理、源码分析和基于 Jedis 仿实现等内容进行展开。本文分析的 Redisson 源码是 2020-01 左右 Redisson 项目的 main 分支源码,对应版本是 3.14.1

基本原理

red lock 的基本原理其实就”光明正大地”展示在 Redis 官网的首页文档中(具体链接是 https://redis.io/topics/distlock ):

nMnqmi.png

摘录一下简介进行翻译:在许多环境中不同进程必须以互斥方式使用共享资源进行操作时,分布式锁是一个非常有用的原语。此试图提供一种更规范的算法来实现Redis的分布式锁。我们提出了一种称为 Redlock 的算法,它实现了 DLM (猜测是 Distributed Lock Manager 的缩写,分布式锁管理器),我们认为它比普通的单实例方法更安全。

算法的三个核心特征(三大最低保证):

  • Safety property (安全性):互斥。确保在任何给定时刻下,只有一个客户端可以持有锁
  • Liveness property A (活性 A ):无死锁。即使存在曾经锁定资源的客户端崩溃或者出现网络分区异常,确保锁总是能够成功获取
  • Liveness property B (活性 B ):容错性。只要大多数 Redis 节点处于正常运行状态,客户端就可以获取和释放锁

文档中还指出了目前算法对于故障转移的实现还存在明显的竞态条件问题(描述的应该是 Redis 主从架构下的问题):

  • 客户端 A 获取 Redis 主节点中的锁(假设锁定的资源为 X
  • Redis 主节点把 KEY 同步到 Redis 从节点之前, Redis 主节点崩溃
  • Redis 从节点因为故障晋升为主节点
  • 此时,客户端 B 获取资源 X 的锁成功,问题是资源 X 的锁在前面已经被客户端 A 获取过,这样就出现了并发问题

算法的实现很简单,单个 Redis 实例下加锁命令如下:

SET $resource_name $random_value NX PX $ttl

这里的 NxPXSET 命令的增强参数,自从 Redis2.6.12 版本起, SET 命令已经提供了可选的复合操作符:

  • EX :设置超时时间,单位是秒
  • PX :设置超时时间,单位是毫秒
  • NXIF NOT EXIST 的缩写,只有 KEY 不存在的前提下才会设置 K-V ,设置成功返回 1 ,否则返回 0
  • XXIF EXIST 的缩写,只有在 KEY 存在的前提下才会设置 K-V ,设置成功返回 1 ,否则返回 0

单个 Redis 实例下解锁命令如下:

# KEYS[1] = $resource_name
# ARGV[1] = $random_value
if redis.call("get",KEYS[1]) == ARGV[1] then
    return redis.call("del",KEYS[1])
else
    return 0
end

使用Redisson中的RLock

使用 RLock 要先实例化 RedissonRedisson 已经适配了 Redis 的哨兵、集群、普通主从和单机模式,因为笔者本地只安装了单机 Redis ,所以这里使用单机模式配置进行演示。实例化 RedissonClient

static RedissonClient REDISSON;

@BeforeClass
public static void beforeClass() throws Exception {
    Config config = new Config();
    // 单机
    config.useSingleServer()
            .setTimeout(10000)
            .setAddress("redis://127.0.0.1:6379");
    REDISSON = Redisson.create(config);
//        // 主从
//        config.useMasterSlaveServers()
//                .setMasterAddress("主节点连接地址")
//                .setSlaveAddresses(Sets.newHashSet("从节点连接地址"));
//        REDISSON = Redisson.create(config);
//        // 哨兵
//        config.useSentinelServers()
//                .setMasterName("Master名称")
//                .addSentinelAddress(new String[]{"哨兵连接地址"});
//        REDISSON = Redisson.create(config);
//        // 集群
//        config.useClusterServers()
//                .addNodeAddress(new String[]{"集群节点连接地址"});
//        REDISSON = Redisson.create(config);
}

加锁和解锁:

@Test
public void testLockAndUnLock() throws Exception {
    String resourceName = "resource:x";
    RLock lock = REDISSON.getLock(resourceName);
    Thread threadA = new Thread(() -> {
        try {
            lock.lock();
            process(resourceName);
        } finally {
            lock.unlock();
            System.out.println(String.format("线程%s释放资源%s的锁", Thread.currentThread().getName(), resourceName));
        }
    }, "threadA");
    Thread threadB = new Thread(() -> {
        try {
            lock.lock();
            process(resourceName);
        } finally {
            lock.unlock();
            System.out.println(String.format("线程%s释放资源%s的锁", Thread.currentThread().getName(), resourceName));
        }
    }, "threadB");
    threadA.start();
    threadB.start();
    Thread.sleep(Long.MAX_VALUE);
}

private void process(String resourceName) {
    String threadName = Thread.currentThread().getName();
    System.out.println(String.format("线程%s获取到资源%s的锁", threadName, resourceName));
    try {
        Thread.sleep(1000);
    } catch (InterruptedException ignore) {
    }
}

// 某次执行的输出结果
线程threadB获取到资源resource:x的锁
线程threadB释放资源resource:x的锁
线程threadA获取到资源resource:x的锁
线程threadA释放资源resource:x的锁

更多的时候,我们会选用带等待时间周期和锁最大持有时间的 API

@Test
public void testTryLockAndUnLock() throws Exception {
    String resourceName = "resource:x";
    int waitTime = 500;
    int leaseTime = 1000;
    Thread threadA = new Thread(() -> {
        process(resourceName, waitTime, leaseTime);
    }, "threadA");
    Thread threadB = new Thread(() -> {
        process(resourceName, waitTime, leaseTime);
    }, "threadB");
    threadA.start();
    threadB.start();
    Thread.sleep(Long.MAX_VALUE);
}

private void process(String resourceName, int waitTime, int leaseTime) {
    RLock lock = REDISSON.getLock(resourceName);
    try {
        String threadName = Thread.currentThread().getName();
        boolean tryLock = lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS);
        if (tryLock) {
            try {
                System.out.println(String.format("线程%s获取到资源%s的锁", threadName, resourceName));
                Thread.sleep(800);
            } finally {
                lock.unlock();
                System.out.println(String.format("线程%s释放资源%s的锁", Thread.currentThread().getName(), resourceName));
            }
        } else {
            System.out.println(String.format("线程%s获取资源%s的锁失败,等待时间:%d ms", threadName, resourceName, waitTime));
        }
    } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
    }
}
// 某次执行的输出结果
线程threadA获取到资源resource:x的锁
线程threadB获取资源resource:x的锁失败,等待时间:500 ms
线程threadA释放资源resource:x的锁

为了使用的时候更加简单,可以参考 spring-tx 中的编程式事务那样进行轻度封装:

@RequiredArgsConstructor
private static class RedissonLockProvider {

    private final RedissonClient redissonClient;

    public <T> T executeInLock(String resourceName, LockAction lockAction) {
        RLock lock = redissonClient.getLock(resourceName);
        try {
            lock.lock();
            lockAction.onAcquire(resourceName);
            return lockAction.doInLock(resourceName);
        } finally {
            lock.unlock();
            lockAction.onExit(resourceName);
        }
    }

    public <T> T executeInLock(String resourceName, int waitTime, int leaseTime, LockAction lockAction) throws InterruptedException {
        RLock lock = redissonClient.getLock(resourceName);
        boolean tryLock = lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS);
        if (tryLock) {
            try {
                lockAction.onAcquire(resourceName);
                return lockAction.doInLock(resourceName);
            } finally {
                lock.unlock();
                lockAction.onExit(resourceName);
            }
        }
        return null;
    }

    public void executeInLockWithoutResult(String resourceName, int waitTime, int leaseTime, LockActionWithoutResult lockAction) throws InterruptedException {
        RLock lock = redissonClient.getLock(resourceName);
        boolean tryLock = lock.tryLock(waitTime, leaseTime, TimeUnit.MILLISECONDS);
        if (tryLock) {
            try {
                lockAction.onAcquire(resourceName);
                lockAction.doInLock(resourceName);
            } finally {
                lock.unlock();
                lockAction.onExit(resourceName);
            }
        }
    }

    public void executeInLockWithoutResult(String resourceName, LockActionWithoutResult lockAction) {
        RLock lock = redissonClient.getLock(resourceName);
        try {
            lock.lock();
            lockAction.onAcquire(resourceName);
            lockAction.doInLock(resourceName);
        } finally {
            lock.unlock();
            lockAction.onExit(resourceName);
        }
    }
}

@FunctionalInterface
interface LockAction {

    default void onAcquire(String resourceName) {

    }

    <T> T doInLock(String resourceName);

    default void onExit(String resourceName) {

    }
}

@FunctionalInterface
interface LockActionWithoutResult {

    default void onAcquire(String resourceName) {

    }

    void doInLock(String resourceName);

    default void onExit(String resourceName) {

    }
}

使用 RedissonLockProvider (仅供参考):

@Test
public void testRedissonLockProvider() throws Exception {
    RedissonLockProvider provider = new RedissonLockProvider(REDISSON);
    String resourceName = "resource:x";
    Thread threadA = new Thread(() -> {
        provider.executeInLockWithoutResult(resourceName, new LockActionWithoutResult() {

            @Override
            public void onAcquire(String resourceName) {
                System.out.println(String.format("线程%s获取到资源%s的锁", Thread.currentThread().getName(), resourceName));
            }

            @Override
            public void doInLock(String resourceName) {
                try {
                    Thread.sleep(800);
                } catch (InterruptedException ignore) {

                }
            }

            @Override
            public void onExit(String resourceName) {
                System.out.println(String.format("线程%s释放资源%s的锁", Thread.currentThread().getName(), resourceName));
            }
        });
    }, "threadA");
    Thread threadB = new Thread(() -> {
        provider.executeInLockWithoutResult(resourceName, new LockActionWithoutResult() {

            @Override
            public void onAcquire(String resourceName) {
                System.out.println(String.format("线程%s获取到资源%s的锁", Thread.currentThread().getName(), resourceName));
            }

            @Override
            public void doInLock(String resourceName) {
                try {
                    Thread.sleep(800);
                } catch (InterruptedException ignore) {

                }
            }

            @Override
            public void onExit(String resourceName) {
                System.out.println(String.format("线程%s释放资源%s的锁", Thread.currentThread().getName(), resourceName));
            }
        });
    }, "threadB");
    threadA.start();
    threadB.start();
    Thread.sleep(Long.MAX_VALUE);
}
// 某次执行结果
线程threadA获取到资源resource:x的锁
线程threadA释放资源resource:x的锁
线程threadB获取到资源resource:x的锁
线程threadB释放资源resource:x的锁

Redisson中RLock的实现原理

RedissonRLock 的实现是基本参照了 Redisred lock 算法进行实现,不过在原始的 red lock 算法下进行了改良,主要包括下面的特性:

  • 互斥
  • 无死锁
  • 可重入,类似于 ReentrantLock ,同一个线程可以重复获取同一个资源的锁(一般使用计数器实现),锁的重入特性一般情况下有利于提高资源的利用率
  • 续期 ,这个是一个比较前卫解决思路,也就是如果一个客户端对资源 X 永久锁定,那么并不是直接对 KEY 生存周期设置为 -1 ,而是通过一个守护线程每隔固定周期延长 KEY 的过期时间,这样就能实现 在守护线程不被杀掉的前提下,避免客户端崩溃导致锁无法释放长期占用资源的问题
  • 锁状态变更订阅,依赖于 org.redisson.pubsub.LockPubSub ,用于订阅和通知锁释放事件
  • 不是完全参考 red lock 算法的实现,数据类型选用了 HASH ,配合 Lua 脚本完成多个命令的原子性

续期或者说延长 KEY 的过期时间在 Redisson 使用 watch dog 实现,理解为用于续期的守护线程,底层依赖于 Netty 的时间轮 HashedWheelTimer 和任务 io.netty.util.Timeout 实现, 俗称看门狗 ,下面会详细分析。

先看 RLock 的类图:

AfEjQv.png

这里有一个疑惑点,RedissonRedLock(RedissonMultiLock的子类)的注释中提到RedLock locking algorithm implementation for multiple locks. It manages all locks as one. 但从直观上看,RedissonLock才是整个锁体系的核心,里面的实现思路也是遵从red lock算法的。

RedissonLock 就是 RLock 的直接实现,也是分布式锁实现的核心类,从源码中看到 Redisson#getLock() 就是直接实例化 RedissonLock

public class Redisson implements RedissonClient {
    
    // ...... 省略其他代码

    @Override
    public RLock getLock(String name) {
        return new RedissonLock(connectionManager.getCommandExecutor(), name);
    }

    // ...... 省略其他代码
}

因此只需要围绕 RedissonLock 的源码进行分析即可。 RedissonLock 的类继承图如下:

e2aqeq.png

这里需要有几点认知:

  • RedissonLock 实现了 java.util.concurrent.locks.Lock 接口中除了 newCondition() 方法外的所有方法,也就是可以基本无缝适配 Lock 接口,对于习惯 Lock 接口的 API 的使用者来说是一个福音
  • RedissonLock 基本所有同步 API 都依赖于异步 API 的实现,也就是 RLock 的实现依赖于 RLockAsync 的实现,底层依赖的是 Nettyio.netty.util.concurrent.Promise ,具体见 RedissonPromise ,如果用过 JUC 中的 Future 的开发者应该比较熟悉 Future#get() ,这里的做法类似
  • 右边的几个父类的简单功能描述如下:
    • RObjectAsync :所有 Redisson 对象的基础接口,提供一些内存测量、对象拷贝、移动等的异步方法
    • RObjectRObjectAsync 的同步版本
    • RExpirableAsync :提供对象 TTL 相关的异步方法
    • RExpirableRExpirableAsync 的同步版本
    • RedissonObject :直接实现类 RObject 接口中的方法
    • RedissonExpirable :主要是实现了 RExpirable 接口中的方法

接着先看 RedissonLock 的构造函数和核心属性:

// 存放entryName -> ExpirationEntry,用于获取当前entryName的线程重入计数器和续期任务
private static final ConcurrentMap<String, ExpirationEntry> EXPIRATION_RENEWAL_MAP = new ConcurrentHashMap<>();

// 内部的锁持有的最大时间,来源于参数Config#lockWatchdogTimeout,用于控制续期的周期
protected long internalLockLeaseTime;

// ID,唯一标识,是一个UUID
final String id;

// 
final String entryName;

// 锁释放事件订阅发布相关
protected final LockPubSub pubSub;

// 命令异步执行器实例
final CommandAsyncExecutor commandExecutor;

/**
 * CommandAsyncExecutor是命令的异步执行器,里面的方法是相对底层的面向通讯框架的方法,包括异步写、异步读和同步结果获取等
 * name参数就是getLock()时候传入的参数,其实就是最终同步到Redis中的KEY
 */
public RedissonLock(CommandAsyncExecutor commandExecutor, String name) {
    super(commandExecutor, name);
    this.commandExecutor = commandExecutor;
    // 这里的ID为外部初始化的UUID实例,调用toString()
    this.id = commandExecutor.getConnectionManager().getId();
    this.internalLockLeaseTime = commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout();
    // 这里的entryName = uuid值 + : + 外部传进来的name(KEY),如559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:resource:x
    this.entryName = id + ":" + name;
    // 初始化LockPubSub实例,用于订阅和发布锁释放的事件
    this.pubSub = commandExecutor.getConnectionManager().getSubscribeService().getLockPubSub();
}

// RedissonLock内部类ExpirationEntry,存放着线程重入的计数器和续期的Timeout任务
public static class ExpirationEntry {
    
    // 线程ID -> 线程重入的次数
    private final Map<Long, Integer> threadIds = new LinkedHashMap<>();
    private volatile Timeout timeout;
    
    public ExpirationEntry() {
        super();
    }
    
    // 这个方法主要记录线程重入的计数
    public void addThreadId(long threadId) {
        Integer counter = threadIds.get(threadId);
        if (counter == null) {
            counter = 1;
        } else {
            counter++;
        }
        threadIds.put(threadId, counter);
    }

    public boolean hasNoThreads() {
        return threadIds.isEmpty();
    }

    public Long getFirstThreadId() {
        if (threadIds.isEmpty()) {
            return null;
        }
        return threadIds.keySet().iterator().next();
    }

    public void removeThreadId(long threadId) {
        Integer counter = threadIds.get(threadId);
        if (counter == null) {
            return;
        }
        counter--;
        if (counter == 0) {
            threadIds.remove(threadId);
        } else {
            threadIds.put(threadId, counter);
        }
    }
    
    public void setTimeout(Timeout timeout) {
        this.timeout = timeout;
    }
    public Timeout getTimeout() {
        return timeout;
    }
}

这里需要关注一下 Config 中的 lockWatchdogTimeout 参数:

bIbaQv.png

翻译一下大意: lockWatchdogTimeout 参数只有在没有使用 leaseTimeout 参数定义的成功获取到锁的场景(简单来说就是不设置时限的加锁)下生效,如果看门狗在下一个 lockWatchdogTimeout 周期内不进行续期,那么锁就会过期释放( 从源码上看,每三分之一 lockWatchdogTimeout 就会执行一次续期任务,每次通过 pexpireKEY 的存活周期延长 lockWatchdogTimeout ), lockWatchdogTimeout 的默认值为 30000 ,也就是 30 秒。

这里先列举一下 RedissonLock 中获取名称的方法,以便后面分析这些名称作为 K-V 结构的 KEY 时候使用:

  • id :由配置实例化时候实例化的 UUID 实例生成,从源码上分析每个连接方式的 Redisson 实例有唯一的 UUIDConnectionManager 初始化的时候会调用 UUID id = UUID.randomUUID() ,笔者认为可以理解为 Redisson 实例在某个应用程序进程中的唯一标识,毕竟一般情况下,一个应用程序应该只会应用一种 Redisson 的连接方式
  • getEntryName() :返回的是 UUID + : + $KEY ,例如 559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:resource:x
  • getName() :返回的是 $KEY ,例如 resource:x
  • getChannelName() :返回的是 redisson_lock__channel:{$KEY} ,例如 redisson_lock__channel:{resource:x}
  • getLockName(long threadId) :返回的是 UUID + : + $threadId ,例如 559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1

接着看加锁的方法,核心实现主要是:

  • private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedExceptionlock 方法体系
  • public boolean tryLock(long waitTime, long leaseTime, TimeUnit unit) throws InterruptedExceptiontryLock 方法体系

先看只包含锁最大持有时间的 lock() 方法体系:

/**
 * 获取锁,不指定等待时间,只指定锁的最大持有时间
 * 通过interruptibly参数配置支持中断
 */
private void lock(long leaseTime, TimeUnit unit, boolean interruptibly) throws InterruptedException {
    long threadId = Thread.currentThread().getId();
    // 尝试获取锁,返回的ttl为空代表获取锁成功,返回的ttl代表已经存在的KEY的剩余存活时间
    Long ttl = tryAcquire(leaseTime, unit, threadId);
    // lock acquired
    if (ttl == null) {
        return;
    }
    // 订阅redisson_lock__channel:{$KEY},其实本质的目的是为了客户端通过Redis的订阅发布,感知到解锁的事件
    // 这个方法会在LockPubSub中注册一个entryName -> RedissonLockEntry的哈希映射,RedissonLockEntry实例中存放着RPromise<RedissonLockEntry>结果,一个信号量形式的锁和订阅方法重入计数器
    // 下面的死循环中的getEntry()或者RPromise<RedissonLockEntry>#getNow()就是从这个映射中获取的
    RFuture<RedissonLockEntry> future = subscribe(threadId);
    // 同步订阅执行,获取注册订阅Channel的响应,区分是否支持中断
    if (interruptibly) {
        commandExecutor.syncSubscriptionInterrupted(future);
    } else {
        commandExecutor.syncSubscription(future);
    }
    // 走到下面的for循环说明返回的ttl不为空,也就是Redis已经存在对应的KEY,有其他客户端已经获取到锁,此客户端线程的调用需要阻塞等待获取锁
    try {
        while (true) {
            // 死循环中尝试获取锁,这个是后面会分析的方法
            ttl = tryAcquire(leaseTime, unit, threadId);
            // 返回的ttl为空,说明获取到锁,跳出死循环,这个死循环或者抛出中断异常,或者获取到锁成功break跳出,没有其他方式
            if (ttl == null) {
                break;
            }

            // 这个ttl来源于等待存在的锁的KEY的存活时间,直接使用许可为0的信号量进行阻塞等待,下面的几个分支判断都是大同小异,只是有的支持超时时间,有的支持中断
            // 有的是永久阻塞直到锁释放事件订阅LockPubSub的onMessage()方法回调激活getLatch().release()进行解锁才会往下走
            // 这里可以学到一个特殊的技巧,Semaphore(0),信号量的许可设置为0,首个调用acquire()的线程会被阻塞,直到其他线程调用此信号量的release()方法才会解除阻塞,类似于一个CountDownLatch(1)的效果
            if (ttl >= 0) {
                try {
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } catch (InterruptedException e) {
                    if (interruptibly) {
                        throw e;
                    }
                    future.getNow().getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            } else {
                if (interruptibly) {
                    future.getNow().getLatch().acquire();
                } else {
                    future.getNow().getLatch().acquireUninterruptibly();
                }
            }
        }
    } finally {
        // 获取到锁或者抛出中断异常,退订redisson_lock__channel:{$KEY},不再关注解锁事件
        unsubscribe(future, threadId);
    }
}

// 这是一个异步转同步的方法,类似于FutureTask#get(),关键看调用的tryAcquireAsync()方法
private Long tryAcquire(long leaseTime, TimeUnit unit, long threadId) {
    return get(tryAcquireAsync(leaseTime, unit, threadId));
}

/**
 * 通过传入锁持有的最大时间和线程ID异步获取锁
 */
private <T> RFuture<Long> tryAcquireAsync(long leaseTime, TimeUnit unit, long threadId) {
    // 锁持有最大时间不为-1,也就是明确锁的持有时间,不是永久持有的场景
    if (leaseTime != -1) {
        return tryLockInnerAsync(leaseTime, unit, threadId, RedisCommands.EVAL_LONG);
    }
    // 走到这里说明是leaseTime == -1,KEY不设置过期时间的分支,需要启动看门狗机制。尝试内部异步获取锁,注意这里的lockWatchdogTimeout是从配置中获取传进去,不是内部的internalLockLeaseTime属性,这里的默认值还是30000毫秒
    RFuture<Long> ttlRemainingFuture = tryLockInnerAsync(commandExecutor.getConnectionManager().getCfg().getLockWatchdogTimeout(), TimeUnit.MILLISECONDS, threadId, RedisCommands.EVAL_LONG);
    ttlRemainingFuture.onComplete((ttlRemaining, e) -> {
        // 执行异常场景直接返回
        if (e != null) {
            return;
        }

        // 成功获取到锁的场景,需要基于线程ID启用看门狗,通过时间轮指定定时任务进行续期
        if (ttlRemaining == null) {
            // 定时调度进行续期操作
            scheduleExpirationRenewal(threadId);
        }
    });
    return ttlRemainingFuture;
}

/**
 * 转换锁持有最大时间,通过参数进行加锁的LUA脚本调用 
 * getName()就是传入的KEY,如resource:x getLockName()就是锁的名称,形式是:UUID + : + threadId,如559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1
 * internalLockLeaseTime在leaseTime != -1的前提下使用的是原值,在leaseTime == -1的前提下,使用的是lockWatchdogTimeout
 */
<T> RFuture<T> tryLockInnerAsync(long leaseTime, TimeUnit unit, long threadId, RedisStrictCommand<T> command) {
    // 时间转换为毫秒,注意一点这里的internalLockLeaseTime是类内的属性,被重新赋值了
    internalLockLeaseTime = unit.toMillis(leaseTime);
    // 底层向Redis服务执行LUA脚本
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, command,
                "if (redis.call('exists', KEYS[1]) == 0) then " +
                    "redis.call('hset', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                "end; " +
                "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                    "redis.call('hincrby', KEYS[1], ARGV[2], 1); " +
                    "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                    "return nil; " +
                "end; " +
                "return redis.call('pttl', KEYS[1]);",
                Collections.<Object>singletonList(getName()), internalLockLeaseTime, getLockName(threadId));
}

先留意一下属性 internalLockLeaseTime ,它在 tryLockInnerAsync() 方法内被重新赋值,在 leaseTime == -1L 的前提下,它被赋值为 lockWatchdogTimeout ,这个细节很重要,决定了后面续期方法(看门狗)的调度频率。另外, leaseTime != -1L 不会进行续期,也就是不会启动看门狗机制。

接着需要仔细分析一下 tryLockInnerAsync() 中执行的 LUA 脚本,笔者把它提取出来通过注释进行描述:

-- KEYS[1] == getName() --> $KEY --> resource:x
-- ARGV[1] == internalLockLeaseTime --> 30000
-- ARGV[2] == getLockName(threadId) --> 559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1
-- 第一段代码是判断锁定的资源KEY不存在的时候进行相应值的设置,代表资源没有被锁定,首次获取锁成功
if (redis.call('exists', KEYS[1]) == 0) then
    -- 这里是设置调用次数,可以理解为延长KEY过期时间的调用次数
    redis.call('hset', KEYS[1], ARGV[2], 1);
    -- 设置KEY的过期时间
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
-- 第二段代码是判断HASH的field是否存在,如果存在说明是同一个线程重入的情况,这个时候需要延长KEY的TTL,并且HASH的field对应的value加1,记录延长ttl的次数
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    -- 这里是增加调用次数,可以理解为增加延长KEY过期时间的调用次数
    redis.call('hincrby', KEYS[1], ARGV[2], 1);
    -- 延长KEY的过期时间
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return nil;
end;
-- 第三段代码是兜底的,走到这里说明当前线程获取锁失败,锁已经被其他(进程中的)线程占有,返回当前KEY被占用资源的ttl,用来确定需要休眠的最大时间
return redis.call('pttl', KEYS[1]);

这里画一个图演示一下这个 Lua 脚本中三段代码出现的逻辑:

INfQNz.png

剩下一个 scheduleExpirationRenewal(threadId) 方法还没有分析,里面的逻辑就是看门狗的定期续期逻辑:

// 基于线程ID定时调度和续期
private void scheduleExpirationRenewal(long threadId) {
    // 如果需要的话新建一个ExpirationEntry记录线程重入计数,同时把续期的任务Timeout对象保存在属性中
    ExpirationEntry entry = new ExpirationEntry();
    ExpirationEntry oldEntry = EXPIRATION_RENEWAL_MAP.putIfAbsent(getEntryName(), entry);
    if (oldEntry != null) {
        // 当前进行的当前线程重入加锁
        oldEntry.addThreadId(threadId);
    } else {
        // 当前进行的当前线程首次加锁
        entry.addThreadId(threadId);
        // 首次新建ExpirationEntry需要触发续期方法,记录续期的任务句柄
        renewExpiration();
    }
}

// 处理续期
private void renewExpiration() {
    // 根据entryName获取ExpirationEntry实例,如果为空,说明在cancelExpirationRenewal()方法已经被移除,一般是解锁的时候触发
    ExpirationEntry ee = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (ee == null) {
        return;
    }
    // 新建一个定时任务,这个就是看门狗的实现,io.netty.util.Timeout是Netty结合时间轮使用的定时任务实例
    Timeout task = commandExecutor.getConnectionManager().newTimeout(new TimerTask() {
        @Override
        public void run(Timeout timeout) throws Exception {
            // 这里是重复外面的那个逻辑,
            ExpirationEntry ent = EXPIRATION_RENEWAL_MAP.get(getEntryName());
            if (ent == null) {
                return;
            }
            // 获取ExpirationEntry中首个线程ID,如果为空说明调用过cancelExpirationRenewal()方法清空持有的线程重入计数,一般是锁已经释放的场景
            Long threadId = ent.getFirstThreadId();
            if (threadId == null) {
                return;
            }
            // 向Redis异步发送续期的命令
            RFuture<Boolean> future = renewExpirationAsync(threadId);
            future.onComplete((res, e) -> {
                // 抛出异常,续期失败,只打印日志和直接终止任务
                if (e != null) {
                    log.error("Can't update lock " + getName() + " expiration", e);
                    return;
                }
                // 返回true证明续期成功,则递归调用续期方法(重新调度自己),续期失败说明对应的锁已经不存在,直接返回,不再递归
                if (res) {
                    // reschedule itself
                    renewExpiration();
                }
            });
        }
    }, 
    // 这里的执行频率为leaseTime转换为ms单位下的三分之一,由于leaseTime初始值为-1的情况下才会进入续期逻辑,那么这里的执行频率为lockWatchdogTimeout的三分之一
    internalLockLeaseTime / 3, TimeUnit.MILLISECONDS); 
    
    // ExpirationEntry实例持有调度任务实例
    ee.setTimeout(task);
}

// 调用Redis,执行Lua脚本,进行异步续期
protected RFuture<Boolean> renewExpirationAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
                "redis.call('pexpire', KEYS[1], ARGV[1]); " +
                "return 1; " +
            "end; " +
            "return 0;",
        Collections.<Object>singletonList(getName()), 
        //  这里根据前面的分析,internalLockLeaseTime在leaseTime的值为-1的前提下,对应值为lockWatchdogTimeout
        internalLockLeaseTime, getLockName(threadId));  
}

基于源码推断出续期的机制由入参 leaseTime 决定:

  • leaseTime == -1 的前提下(一般是 lock()lockInterruptibly() 这类方法调用),续期任务的调度周期为 lockWatchdogTimeout / 3 ,锁的最大持有时间( KEY 的过期时间)被刷新为 lockWatchdogTimeout
  • leaseTime != -1 的前提下(一般是 lock(long leaseTime, TimeUnit unit)lockInterruptibly(long leaseTime, TimeUnit unit) 这类方法调用指定 leaseTime 不为 -1 ),这种情况下会直接设置锁的过期时间为输入值转换为 ms 单位的时间量,不会启动续期机制

提取续期的 Lua 脚本如下:

-- KEYS[1] == getName() --> $KEY --> resource:x
-- ARGV[1] == internalLockLeaseTime --> 30000
-- ARGV[2] == getLockName(threadId) --> 559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1
if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then
    redis.call('pexpire', KEYS[1], ARGV[1]);
    return 1;
end;
return 0;

到此为止,不带 waitTime 参数的加锁和续期逻辑基本分析完毕,而带 waitTime 参数的 tryLock(long waitTime, long leaseTime, TimeUnit unit) 实现其实和只存在 leaseTime 参数的 lock(long leaseTime, TimeUnit unit, boolean interruptibly) 实现底层调用的方法是一致的,最大的区别是会在尝试获取锁操作之后基于前后的 System.currentTimeMillis() 计算出时间差和 waitTime 做对比,决定需要阻塞等待还是直接超时获取锁失败返回,处理阻塞等待的逻辑是客户端本身的逻辑,这里就不做详细展开,因为源码实现也不是十分优雅(太多 long currentTime = System.currentTimeMillis() 的代码段了)。接着花点功夫分析一下解锁的实现,包括一般情况下的解锁 unlock() 和强制解锁 forceUnlockAsync()

//  一般情况下的解锁
@Override
public void unlock() {
    try {
        get(unlockAsync(Thread.currentThread().getId()));
    } catch (RedisException e) {
        // IllegalMonitorStateException一般是A线程加锁,B线程解锁,内部判断线程状态不一致抛出的
        if (e.getCause() instanceof IllegalMonitorStateException) {
            throw (IllegalMonitorStateException) e.getCause();
        } else {
            throw e;
        }
    }
}

@Override
public RFuture<Void> unlockAsync() {
    // 获取当前调用解锁操作的线程ID
    long threadId = Thread.currentThread().getId();
    return unlockAsync(threadId);
}

@Override
public RFuture<Void> unlockAsync(long threadId) {
    // 构建一个结果RedissonPromise
    RPromise<Void> result = new RedissonPromise<Void>();
    // 返回的RFuture如果持有的结果为true,说明解锁成功,返回NULL说明线程ID异常,加锁和解锁的客户端线程不是同一个线程
    RFuture<Boolean> future = unlockInnerAsync(threadId);
    future.onComplete((opStatus, e) -> {
        // 这是内部的异常,说明解锁异常,需要取消看门狗的续期任务
        if (e != null) {
            cancelExpirationRenewal(threadId);
            result.tryFailure(e);
            return;
        }
        // 这种情况说明线程ID异常,加锁和解锁的客户端线程不是同一个线程,抛出IllegalMonitorStateException异常
        if (opStatus == null) {
            IllegalMonitorStateException cause = new IllegalMonitorStateException("attempt to unlock lock, not locked by current thread by node id: "
                    + id + " thread-id: " + threadId);
            result.tryFailure(cause);
            return;
        }
        // 走到这里说明正常解锁,取消看门狗的续期任务
        cancelExpirationRenewal(threadId);
        result.trySuccess(null);
    });
    return result;
}

// 真正的内部解锁的方法,执行解锁的Lua脚本
protected RFuture<Boolean> unlockInnerAsync(long threadId) {
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then " +
                "return nil;" +
            "end; " +
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1); " +
            "if (counter > 0) then " +
                "redis.call('pexpire', KEYS[1], ARGV[2]); " +
                "return 0; " +
            "else " +
                "redis.call('del', KEYS[1]); " +
                "redis.call('publish', KEYS[2], ARGV[1]); " +
                "return 1; "+
            "end; " +
            "return nil;",
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE, internalLockLeaseTime, getLockName(threadId));
}

// 取消续期任务
void cancelExpirationRenewal(Long threadId) {
    // 这里说明ExpirationEntry已经被移除,一般是基于同一个线程ID多次调用解锁方法导致的(并发解锁)
    ExpirationEntry task = EXPIRATION_RENEWAL_MAP.get(getEntryName());
    if (task == null) {
        return;
    }
    // 传入的线程ID不为NULL,从ExpirationEntry中移除线程ID,如果持有的线程ID对应的线程重入计数不为0,会先递减到0,等于0的前提下才会进行删除
    if (threadId != null) {
        task.removeThreadId(threadId);
    }
    // 这里threadId == null的情况是为了满足强制解锁的场景,强制解锁需要直接删除锁所在的KEY,不需要理会传入的线程ID(传入的线程ID直接为NULL)
    // 后者task.hasNoThreads()是为了说明当前的锁没有被任何线程持有,对于单线程也确定在移除线程ID之后重入计数器已经为0,从ExpirationEntry中移除,这个时候获取ExpirationEntry的任务实例进行取消即可
    if (threadId == null || task.hasNoThreads()) {
        Timeout timeout = task.getTimeout();
        if (timeout != null) {
            timeout.cancel();
        }
        // EntryName -> ExpirationEntry映射中移除当前锁的相关实例ExpirationEntry
        EXPIRATION_RENEWAL_MAP.remove(getEntryName());
    }
}

// 强制解锁
@Override
public boolean forceUnlock() {
    return get(forceUnlockAsync());
}

@Override
public RFuture<Boolean> forceUnlockAsync() {
    // 线程ID传入为NULL,取消当前的EntryName对应的续期任务
    cancelExpirationRenewal(null);
    // 执行Lua脚本强制删除锁所在的KEY并且发布解锁消息
    return commandExecutor.evalWriteAsync(getName(), LongCodec.INSTANCE, RedisCommands.EVAL_BOOLEAN,
            "if (redis.call('del', KEYS[1]) == 1) then "
            + "redis.call('publish', KEYS[2], ARGV[1]); "
            + "return 1 "
            + "else "
            + "return 0 "
            + "end",
            Arrays.<Object>asList(getName(), getChannelName()), LockPubSub.UNLOCK_MESSAGE);
}

这里列出一般情况下解锁和强制解锁的 Lua 脚本,分析如下:

-- unlockInnerAsync方法的lua脚本
-- KEYS[1] == getName() --> $KEY --> resource:x
-- KEYS[2] == getChannelName() --> 订阅锁的Channel --> redisson_lock__channel:{resource:x}
-- ARGV[1] == LockPubSub.UNLOCK_MESSAGE --> 常量数值0
-- ARGV[2] == internalLockLeaseTime --> 30000或者具体的锁最大持有时间
-- ARGV[3] == getLockName(threadId) --> 559cc9df-bad8-4f6c-86a4-ffa51b7f1c36:1
-- 第一个IF分支判断如果锁所在的哈希的field不存在,说明当前线程ID未曾获取过对应的锁,返回NULL表示解锁失败
if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then
    return nil;
end;
-- 走到这里通过hincrby进行线程重入计数-1,返回计数值
local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);
-- 计数值大于0,说明线程重入加锁,这个时候基于internalLockLeaseTime对锁所在KEY进行续期
if (counter > 0) then
    redis.call('pexpire', KEYS[1], ARGV[2]);
    return 0;
else
    -- 计数值小于或等于0,说明可以解锁,删除锁所在的KEY,并且向redisson_lock__channel:{$KEY}发布消息,内容是0(常量数值)
    redis.call('del', KEYS[1]);
    redis.call('publish', KEYS[2], ARGV[1]);
    return 1;
end;
-- 最后的return nil;在IDEA中提示是不会到达的语句,估计这里是开发者笔误写上去的,前面的if-else都有返回语句,这里应该是不可达的
return nil;

-------------------------------------------------- 不怎么华丽的分割线 -------------------------------------------------

-- forceUnlockAsync方法的lua脚本
-- KEYS[1] == getName() --> $KEY --> resource:x
-- KEYS[2] == getChannelName() --> 订阅锁的Channel --> redisson_lock__channel:{resource:x}
-- ARGV[1] == LockPubSub.UNLOCK_MESSAGE --> 常量数值0
-- 强制删除锁所在的KEY,如果删除成功向redisson_lock__channel:{$KEY}发布消息,内容是0(常量数值)
if (redis.call('del', KEYS[1]) == 1) then
    redis.call('publish', KEYS[2], ARGV[1]);
    return 1
else
    return 0
end

其他辅助方法都相对简单,这里弄个简单的”流水账”记录一番:

  • isLocked() :基于 getName() 调用 RedisEXISTS $KEY 命令判断是否加锁
  • isHeldByThread(long threadId)isHeldByCurrentThread() :基于 getName()getLockName(threadId) 调用 RedisHEXISTS $KEY $LOCK_NAME 命令判断 HASH 中对应的 field-value 是否存在,存在则说明锁被对应线程 ID 的线程持有
  • getHoldCount() :基于 getName()getLockName(threadId) 调用 RedisHGET $KEY $LOCK_NAME 命令,用于获取线程对于某一个锁的持有量(注释叫 holds ,其实就是同一个线程对某一个锁的 KEY 的续期次数)

订阅和发布部分设计到大量 Netty 组件使用相关的源码,这里不详细展开,这部分的逻辑简单附加到后面这个流程图中。最后,通过一个比较详细的图分析一下 Redisson 的加锁和解锁流程。

  • 不带 waitTime 参数的加锁流程:

6bmYji.png

  • 带有 waitTime 参数的加锁流程(图右边的流程基本不变,主要是左边的流程每一步都要计算时间间隔):

Qjuemy.png

  • 解锁流程:

i6Vbye.png

假设不同进程的两个不同的线程 XY 去竞争资源 RESOURCE 的锁,那么可能的流程如下:

6J3Q3y.png

最后再概括一下 Redisson 中实现 red lock 算法使用的 HASH 数据类型:

  • KEY 代表的就是资源或者锁, 创建、存在性判断,延长生存周期和删除操作总是针对 KEY 进行的
  • FIELD 代表的是锁名称 lockName() ,但是其实它由 Redisson 连接管理器实例的初始化 UUID 拼接客户端线程 ID 组成,严格来说应该是获取锁的客户端线程唯一标识
  • VALUE 代表的是客户端线程对于锁的持有量,从源码上看应该是 KEY 被续期的次数

基于Jedis实现类似Redisson的分布式锁功能

前面的章节已经比较详细分析了 Redisson 中分布式锁的实现原理,这里使用 Jedis 和多线程技巧做一个类似的实现。为了简单起见,这里只实现一个无入参的 lock() 方法(类似于 RedissonleaseTime == -1 的场景)和 unlock() 方法。定义接口 RedLock

public interface RedLock {

    void lock(String resource) throws InterruptedException;

    void unlock(String resource);
}

为了简单起见,笔者把所有实现逻辑都写在实现类 RedisRedLock 中:

@RequiredArgsConstructor
public class RedisRedLock implements RedLock {

    private final JedisPool jedisPool;
    private final String uuid;

    private static final String WATCH_DOG_TIMEOUT_STRING = "30000";
    private static final long WATCH_DOG_TASK_DURATION = 10000L;
    private static final String CHANNEL_PREFIX = "__red__lock:";
    private static final String UNLOCK_STATUS_STRING = "0";

    private static final String LOCK_LUA = "if (redis.call('exists', KEYS[1]) == 0) then\n" +
            "    redis.call('hset', KEYS[1], ARGV[2], 1);\n" +
            "    redis.call('pexpire', KEYS[1], ARGV[1]);\n" +
            "    return nil;\n" +
            "end;\n" +
            "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then\n" +
            "    redis.call('hincrby', KEYS[1], ARGV[2], 1);\n" +
            "    redis.call('pexpire', KEYS[1], ARGV[1]);\n" +
            "    return nil;\n" +
            "end;\n" +
            "return redis.call('pttl', KEYS[1]);";

    private static final String UNLOCK_LUA = "if (redis.call('hexists', KEYS[1], ARGV[3]) == 0) then\n" +
            "    return nil;\n" +
            "end;\n" +
            "local counter = redis.call('hincrby', KEYS[1], ARGV[3], -1);\n" +
            "if (counter > 0) then\n" +
            "    redis.call('pexpire', KEYS[1], ARGV[2]);\n" +
            "    return 0;\n" +
            "else\n" +
            "    redis.call('del', KEYS[1]);\n" +
            "    redis.call('publish', KEYS[2], ARGV[1]);\n" +
            "    return 1;\n" +
            "end;";

    private static final String RENEW_LUA = "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
            "redis.call('pexpire', KEYS[1], ARGV[1]); " +
            "return 1; " +
            "end; " +
            "return 0;";

    private static final ExecutorService SUB_PUB_POOL = Executors.newCachedThreadPool();
    private static final ScheduledExecutorService WATCH_DOG_POOL = new ScheduledThreadPoolExecutor(
            Runtime.getRuntime().availableProcessors() * 2
    );

    private static class ThreadEntry {

        private final ConcurrentMap<Long, Integer> threadCounter = Maps.newConcurrentMap();

        private volatile WatchDogTask watchDogTask;

        public synchronized void addThreadId(long threadId) {
            Integer counter = threadCounter.get(threadId);
            if (counter == null) {
                counter = 1;
            } else {
                counter++;
            }
            threadCounter.put(threadId, counter);
        }

        public synchronized boolean hasNoThreads() {
            return threadCounter.isEmpty();
        }

        public synchronized Long getFirstThreadId() {
            if (threadCounter.isEmpty()) {
                return null;
            }
            return threadCounter.keySet().iterator().next();
        }

        public synchronized void removeThreadId(long threadId) {
            Integer counter = threadCounter.get(threadId);
            if (counter == null) {
                return;
            }
            counter--;
            if (counter == 0) {
                threadCounter.remove(threadId);
            } else {
                threadCounter.put(threadId, counter);
            }
        }

        public void setWatchDogTask(WatchDogTask watchDogTask) {
            this.watchDogTask = watchDogTask;
        }

        public WatchDogTask getWatchDogTask() {
            return watchDogTask;
        }
    }

    @Getter
    private static class SubPubEntry {

        private final String key;
        private final Semaphore latch;
        private final SubscribeListener subscribeListener;

        public SubPubEntry(String key) {
            this.key = key;
            this.latch = new Semaphore(0);
            this.subscribeListener = new SubscribeListener(key, latch);
        }
    }

    private static final ConcurrentMap<String, ThreadEntry> THREAD_ENTRY_MAP = Maps.newConcurrentMap();

    @Override
    public void lock(String resource) throws InterruptedException {
        long threadId = Thread.currentThread().getId();
        String lockName = uuid + ":" + threadId;
        String entryName = uuid + ":" + resource;
        // 获取锁
        Long ttl = acquire(resource, lockName, threadId, entryName);
        // 加锁成功直接返回
        if (Objects.isNull(ttl)) {
            return;
        }
        // 订阅
        SubPubEntry subPubEntry = subscribeAsync(resource);
        try {
            for (; ; ) {
                ttl = acquire(resource, lockName, threadId, entryName);
                // 加锁成功直接返回
                if (Objects.isNull(ttl)) {
                    return;
                }
                if (ttl > 0L) {
                    subPubEntry.getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                }
            }
        } finally {
            unsubscribeSync(subPubEntry);
        }
    }

    private Long acquire(String key, String lockName, long threadId, String entryName) {
        Object result = execute0(jedis -> jedis.eval(LOCK_LUA, Lists.newArrayList(key),
                Lists.newArrayList(WATCH_DOG_TIMEOUT_STRING, lockName)));
        if (Objects.nonNull(result)) {
            return Long.parseLong(String.valueOf(result));
        }
        // 启动看门狗
        ThreadEntry entry = new ThreadEntry();
        ThreadEntry oldEntry = THREAD_ENTRY_MAP.putIfAbsent(entryName, entry);
        if (oldEntry != null) {
            oldEntry.addThreadId(threadId);
        } else {
            entry.addThreadId(threadId);
            Runnable renewAction = () -> executeWithoutResult(jedis -> jedis.eval(RENEW_LUA, Lists.newArrayList(key),
                    Lists.newArrayList(WATCH_DOG_TIMEOUT_STRING, lockName)));
            WatchDogTask watchDogTask = new WatchDogTask(new AtomicReference<>(renewAction));
            entry.setWatchDogTask(watchDogTask);
            WATCH_DOG_POOL.scheduleWithFixedDelay(watchDogTask, 0, WATCH_DOG_TASK_DURATION, TimeUnit.MILLISECONDS);
        }
        return null;
    }

    private SubPubEntry subscribeAsync(String key) {
        SubPubEntry subPubEntry = new SubPubEntry(key);
        SUB_PUB_POOL.submit(() -> {
            SubscribeListener subscribeListener = subPubEntry.getSubscribeListener();
            executeWithoutResult(jedis -> jedis.subscribe(subscribeListener, subscribeListener.getChannelName()));
            return null;
        });
        return subPubEntry;
    }

    private void unsubscribeSync(SubPubEntry subPubEntry) {
        SubscribeListener subscribeListener = subPubEntry.getSubscribeListener();
        subscribeListener.unsubscribe(subscribeListener.getChannelName());
    }

    @Override
    public void unlock(String resource) {
        long threadId = Thread.currentThread().getId();
        String entryName = uuid + ":" + resource;
        String lockName = uuid + ":" + threadId;
        String channelName = CHANNEL_PREFIX + resource;
        Object result = execute0(jedis -> jedis.eval(UNLOCK_LUA, Lists.newArrayList(resource, channelName),
                Lists.newArrayList(UNLOCK_STATUS_STRING, WATCH_DOG_TIMEOUT_STRING, lockName)));
        ThreadEntry threadEntry = THREAD_ENTRY_MAP.get(entryName);
        if (Objects.nonNull(threadEntry)) {
            threadEntry.removeThreadId(threadId);
            if (threadEntry.hasNoThreads() && Objects.nonNull(threadEntry.getWatchDogTask())) {
                threadEntry.getWatchDogTask().cancel();
            }
        }
        if (Objects.isNull(result)) {
            throw new IllegalMonitorStateException();
        }
    }

    private static class SubscribeListener extends JedisPubSub {

        @Getter
        private final String key;
        @Getter
        private final String channelName;
        @Getter
        private final Semaphore latch;

        public SubscribeListener(String key, Semaphore latch) {
            this.key = key;
            this.channelName = CHANNEL_PREFIX + key;
            this.latch = latch;
        }

        @Override
        public void onMessage(String channel, String message) {
            if (Objects.equals(channelName, channel) && Objects.equals(UNLOCK_STATUS_STRING, message)) {
                latch.release();
            }
        }
    }

    @RequiredArgsConstructor
    private static class WatchDogTask implements Runnable {

        private final AtomicBoolean running = new AtomicBoolean(true);
        private final AtomicReference<Runnable> actionReference;

        @Override
        public void run() {
            if (running.get() && Objects.nonNull(actionReference.get())) {
                actionReference.get().run();
            } else {
                throw new WatchDogTaskStopException("watch dog cancel");
            }
        }

        public void cancel() {
            actionReference.set(null);
            running.set(false);
        }
    }

    private <T> T execute0(Function<Jedis, T> function) {
        try (Jedis jedis = jedisPool.getResource()) {
            return function.apply(jedis);
        }
    }

    interface Action {

        void apply(Jedis jedis);
    }

    private void executeWithoutResult(Action action) {
        try (Jedis jedis = jedisPool.getResource()) {
            action.apply(jedis);
        }
    }

    private static class WatchDogTaskStopException extends RuntimeException {

        @Override
        public synchronized Throwable fillInStackTrace() {
            return this;
        }
    }

    public static void main(String[] args) throws Exception {
        String resourceName = "resource:x";
        RedLock redLock = new RedisRedLock(new JedisPool(new GenericObjectPoolConfig()), UUID.randomUUID().toString());
        Thread threadA = new Thread(() -> {
            try {
                redLock.lock(resourceName);
                process(resourceName);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                redLock.unlock(resourceName);
                System.out.println(String.format("线程%s释放资源%s的锁", Thread.currentThread().getName(), resourceName));
            }
        }, "threadA");
        Thread threadB = new Thread(() -> {
            try {
                redLock.lock(resourceName);
                process(resourceName);
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                redLock.unlock(resourceName);
                System.out.println(String.format("线程%s释放资源%s的锁", Thread.currentThread().getName(), resourceName));
            }
        }, "threadB");
        threadA.start();
        threadB.start();
        Thread.sleep(Long.MAX_VALUE);
    }

    private static void process(String resourceName) {
        String threadName = Thread.currentThread().getName();
        System.out.println(String.format("线程%s获取到资源%s的锁", threadName, resourceName));
        try {
            Thread.sleep(1000);
        } catch (InterruptedException ignore) {
        }
    }
}

上面的实现短时间内编写完,没有做详细的 DEBUG ,可能会有纰漏。某次执行结果如下:

线程threadB获取到资源resource:x的锁
线程threadB释放资源resource:x的锁
线程threadA获取到资源resource:x的锁
线程threadA释放资源resource:x的锁

小结

Redisson 中的 red lock 实现,应用到下面的核心技术:

  • 合理应用 Redis 的基本数据类型 HASH
  • Redis 的订阅发布
  • Lua 脚本的原子性
  • Netty 中的 Promise 实现
  • Netty 中的时间轮 HashedWheelTimer 和对应的定时任务 (HashedWheel)Timeout
  • Semaphore 进行带期限、永久或者可中断的阻塞以及唤醒,替代 CountDownLatch 中的无等待期限阻塞

上面的核心技术相对合理地应用,才能实现一个高效而且容错能力相对比较高的分布式锁方案,但是从目前来看, Redisson 仍未解决 red lock 算法中的故障转移缺陷,笔者认为这个有可能是 Redis 实现分布式锁方案的一个底层缺陷, 此方案在 Redis 单实例中是相对完善 ,一旦应用在 Redis 集群(普通主从、哨兵或者 Cluster ),有几率会出现前文提到的节点角色切换导致多个不同客户端获取到同一个资源对应的锁的问题。暂时无解。

参考资料:

Redisson
Redis

画图用的是 ProcessOnhttps://www.processon.com/view/link/5ffc540de0b34d2060d2d715

(c-2-w e-a-20210110 2021年的第一篇文章,希望这一年不要这么鸽,这个系列的下一篇是《冷饭新炒:理解JDK中UUID的底层实现》)

#感谢您访问本站#
#本文转载自互联网,若侵权,请联系删除,谢谢!657271#qq.com#

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK