15

Java 从零实现属于你的 Redis 分布式锁

 3 years ago
source link: http://developer.51cto.com/art/202010/628964.htm
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

7jUnAz.jpg!mobile

V3EN3i6.png!mobile

redis分布式锁

为什么需要分布式锁

在 jdk 中为我们提供了加锁的方式:

(1)synchronized 关键字

(2)volatile + CAS 实现的乐观锁

(3)ReadWriteLock 读写锁

(4)ReenTrantLock 可重入锁

等等,这些锁为我们变成提供极大的便利性,保证在多线程的情况下,保证线程安全。

但是在分布式系统中,上面的锁就统统没用了。

我们想要解决分布式系统中的并发问题,就需要引入分布式锁的概念。

java 代码实现

创作动机

首先是对锁实现原理的一个实现,理论指导实践,实践完善理论。

晚上关于 redis 分布式锁的文章一大堆,但是也都稂莠不齐。

redis 分布式锁工具有时候中间件团队不见得会提供,提供了也不见得经常维护,不如自己实现一个,知道原理,也方便修改。

接口定义

为了便于和 JDK 复用,我们让接口继承自 jdk 的 Lock 接口。

package com.github.houbb.lock.api.core; 
 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.locks.Lock; 
 
/** 
 * 锁定义 
 * @author binbin.hou 
 * @since 0.0.1 
 */ 
public interface ILock extends Lock { 
 
    /** 
     * 尝试加锁 
     * @param time 时间 
     * @param unit 当为 
     * @param key key 
     * @return 返回 
     * @throws InterruptedException 异常 
     * @since 0.0.1 
     */ 
    boolean tryLock(long time, TimeUnit unit, 
                    String key) throws InterruptedException; 
 
    /** 
     * 尝试加锁 
     * @param key key 
     * @return 返回 
     * @since 0.0.1 
     */ 
    boolean tryLock(String key); 
 
    /** 
     * 解锁 
     * @param key key 
     * @since 0.0.1 
     */ 
    void unlock(String key); 
 
} 

方法我们只添加了三个比较常用的核心方法,作为第一个版本,简单点。

后续陆续添加即可。

抽象实现

为了便于后期添加更多的所实现,这里首先实现了一个公用的抽象父类。

package com.github.houbb.lock.redis.core; 
 
import com.github.houbb.lock.api.core.ILock; 
import com.github.houbb.lock.redis.constant.LockRedisConst; 
import com.github.houbb.wait.api.IWait; 
 
import java.util.concurrent.TimeUnit; 
import java.util.concurrent.locks.Condition; 
 
/** 
 * 抽象实现 
 * @author binbin.hou 
 * @since 0.0.1 
 */ 
public abstract class AbstractLockRedis implements ILock { 
 
    /** 
     * 锁等待 
     * @since 0.0.1 
     */ 
    private final IWait wait; 
 
    protected AbstractLockRedis(IWait wait) { 
        this.wait = wait; 
    } 
 
    @Override 
    public void lock() { 
        throw new UnsupportedOperationException(); 
    } 
 
    @Override 
    public void lockInterruptibly() throws InterruptedException { 
        throw new UnsupportedOperationException(); 
    } 
 
    @Override 
    public boolean tryLock() { 
        return tryLock(LockRedisConst.DEFAULT_KEY); 
    } 
 
    @Override 
    public void unlock() { 
        unlock(LockRedisConst.DEFAULT_KEY); 
    } 
 
    @Override 
    public boolean tryLock(long time, TimeUnit unit, String key) throws InterruptedException { 
        long startTimeMills = System.currentTimeMillis(); 
 
        // 一次获取,直接成功 
        boolean result = this.tryLock(key); 
        if(result) { 
            return true; 
        } 
 
        // 时间判断 
        if(time <= 0) { 
            return false; 
        } 
        long durationMills = unit.toMillis(time); 
        long endMills = startTimeMills + durationMills; 
 
        // 循环等待 
        while (System.currentTimeMillis() < endMills) { 
            result = tryLock(key); 
            if(result) { 
                return true; 
            } 
 
            // 等待 10ms 
            wait.wait(TimeUnit.MILLISECONDS, 10); 
        } 
        return false; 
    } 
 
    @Override 
    public synchronized boolean tryLock(long time, TimeUnit unit) throws InterruptedException { 
        return tryLock(time, unit, LockRedisConst.DEFAULT_KEY); 
    } 
 
    @Override 
    public Condition newCondition() { 
        throw new UnsupportedOperationException(); 
    } 
 
} 

最核心的实际上是 public boolean tryLock(long time, TimeUnit unit, String key) throws InterruptedException 方法。

这个方法会调用 this.tryLock(key) 获取锁,如果成功,直接返回;如果不成功,则循环等待。

这里设置了超时时间,如果超时,则直接返回 true。

redis 锁实现

我们实现的 redis 分布锁,继承自上面的抽象类。

package com.github.houbb.lock.redis.core; 
 
import com.github.houbb.heaven.util.lang.StringUtil; 
import com.github.houbb.id.api.Id; 
import com.github.houbb.id.core.util.IdThreadLocalHelper; 
import com.github.houbb.lock.redis.constant.LockRedisConst; 
import com.github.houbb.lock.redis.exception.LockRedisException; 
import com.github.houbb.lock.redis.support.operator.IOperator; 
import com.github.houbb.wait.api.IWait; 
 
/** 
 * 这里是基于 redis 实现 
 * 
 * 实际上也可以基于 zk/数据库等实现。 
 * 
 * @author binbin.hou 
 * @since 0.0.1 
 */ 
public class LockRedis extends AbstractLockRedis { 
 
    /** 
     * redis 操作实现 
     * @since 0.0.1 
     */ 
    private final IOperator redisOperator; 
 
    /** 
     * 主键标识 
     * @since 0.0.1 
     */ 
    private final Id id; 
 
    public LockRedis(IWait wait, IOperator redisOperator, Id id) { 
        super(wait); 
        this.redisOperator = redisOperator; 
        this.id = id; 
    } 
 
    @Override 
    public boolean tryLock(String key) { 
        final String requestId = id.id(); 
        IdThreadLocalHelper.put(requestId); 
 
        return redisOperator.lock(key, requestId, LockRedisConst.DEFAULT_EXPIRE_MILLS); 
    } 
 
    @Override 
    public void unlock(String key) { 
        final String requestId = IdThreadLocalHelper.get(); 
        if(StringUtil.isEmpty(requestId)) { 
            String threadName = Thread.currentThread().getName(); 
            throw new LockRedisException("Thread " + threadName +" not contains requestId"); 
        } 
 
        boolean unlock = redisOperator.unlock(key, requestId); 
        if(!unlock) { 
            throw new LockRedisException("Unlock key " + key + " result is failed!"); 
        } 
    } 
} 

这里就是 redis 锁的核心实现了,如果不太理解,建议回顾一下原理篇:

redis 分布式锁原理详解

加锁

加锁部分,这里会生成一个 id 标识,用于区分当前操作者。

为了安全也设置了默认的超时时间。

当然这里是为了简化调用者的使用成本,开发在使用的时候只需要关心自己要加锁的 key 即可。

当然,甚至连加锁的 key 都可以进一步抽象掉,比如封装 @DistributedLock 放在方法上,即可实现分布式锁。这个后续有时间可以拓展,原理也不难。

解锁

解锁的时候,就会获取当前进程的持有标识。

凭借当前线程持有的 id 标识,去解锁。

IOperator

我们对 redis 的操作进行了抽象,为什么抽象呢?

因为 redis 服务种类实际很多,可以是 redis 单点,集群,主从,哨兵。

连接的客户端也可以很多,jedis,spring redisTemplate, codis, redisson 等等。

这里为了后期拓展方便,就对操作进行了抽象。

接口

定义接口如下:

package com.github.houbb.lock.redis.support.operator; 
 
/** 
 * Redis 客户端 
 * @author binbin.hou 
 * @since 0.0.1 
 */ 
public interface IOperator { 
 
    /** 
     * 尝试获取分布式锁 
     * 
     * @param lockKey    锁 
     * @param requestId  请求标识 
     * @param expireTimeMills 超期时间 
     * @return 是否获取成功 
     * @since 0.0.1 
     */ 
    boolean lock(String lockKey, String requestId, int expireTimeMills); 
 
    /** 
     * 解锁 
     * @param lockKey 锁 key 
     * @param requestId 请求标识 
     * @return 结果 
     * @since 0.0.1 
     */ 
    boolean unlock(String lockKey, String requestId); 
 
} 

jedis 实现

我们实现一个 jedis 单点版本的:

package com.github.houbb.lock.redis.support.operator.impl; 
 
import com.github.houbb.lock.redis.constant.LockRedisConst; 
import com.github.houbb.lock.redis.support.operator.IOperator; 
import redis.clients.jedis.Jedis; 
 
import java.util.Collections; 
 
/** 
 * Redis 客户端 
 * @author binbin.hou 
 * @since 0.0.1 
 */ 
public class JedisOperator implements IOperator { 
 
    /** 
     * jedis 客户端 
     * @since 0.0.1 
     */ 
    private final Jedis jedis; 
 
    public JedisOperator(Jedis jedis) { 
        this.jedis = jedis; 
    } 
 
    /** 
     * 尝试获取分布式锁 
     * 
     * expireTimeMills 保证当前进程挂掉,也能释放锁 
     * 
     * requestId 保证解锁的是当前进程(锁的持有者) 
     * 
     * @param lockKey         锁 
     * @param requestId       请求标识 
     * @param expireTimeMills 超期时间 
     * @return 是否获取成功 
     * @since 0.0.1 
     */ 
    @Override 
    public boolean lock(String lockKey, String requestId, int expireTimeMills) { 
        String result = jedis.set(lockKey, requestId, LockRedisConst.SET_IF_NOT_EXIST, LockRedisConst.SET_WITH_EXPIRE_TIME, expireTimeMills); 
        return LockRedisConst.LOCK_SUCCESS.equals(result); 
    } 
 
    /** 
     * 解锁 
     * 
     * (1)使用 requestId,保证为当前锁的持有者 
     * (2)使用 lua 脚本,保证执行的原子性。 
     * 
     * @param lockKey   锁 key 
     * @param requestId 请求标识 
     * @return 结果 
     * @since 0.0.1 
     */ 
    @Override 
    public boolean unlock(String lockKey, String requestId) { 
        String script = "if redis.call('get', KEYS[1]) == ARGV[1] then return redis.call('del', KEYS[1]) else return 0 end"; 
        Object result = jedis.eval(script, Collections.singletonList(lockKey), Collections.singletonList(requestId)); 
        return LockRedisConst.RELEASE_SUCCESS.equals(result); 
    } 
 
} 

这里时最核心的部分。

别看简单几行代码,需要注意的点还是很多的。

加锁

加锁时附带 requestId,用来标识自己为锁的持有者。

SETNX 当 key 不存在时才进行加锁。

设置加锁的过期时间,避免因异常等原因未释放锁,导致锁的长时间占用。

解锁

使用 lua 脚本,保证操作的原子性。

为了证明为锁的持有者,传入 requestId。

测试验证

maven 引入

<dependency> 
    <groupId>com.github.houbb</groupId> 
    <artifactId>lock-core</artifactId> 
    <version>0.0.1</version> 
</dependency> 

测试代码

Jedis jedis = new Jedis("127.0.0.1", 6379); 
IOperator operator = new JedisOperator(jedis); 
 
// 获取锁 
ILock lock = LockRedisBs.newInstance().operator(operator).lock(); 
 
try { 
    boolean lockResult = lock.tryLock(); 
    System.out.println(lockResult); 
    // 业务处理 
} catch (Exception e) { 
    e.printStackTrace(); 
} finally { 
    lock.unlock(); 
} 

小结

到这里,一个简单版本的 redis 分布式锁就实现完成了。

当然还有很多可以改进的地方:

(1)比如引入递增的 sequence,避免分布式锁中的 GC 导致的问题

(2)对于更多 redis 服务端+客户端的支持

(3)对于注解式 redis 分布式锁的支持


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK