
Distributed lock implementation approaches - bug的自我救赎

source link: https://www.cnblogs.com/selfsalvation/p/16188160.html

What is a distributed lock

When multiple processes, possibly running on different machines, need to operate on the same resource, a distributed lock is used to coordinate their access to it.

Application scenarios for distributed locks

  1. In a traditional monolithic application deployed on a single machine, mutual exclusion can be handled with Java's built-in concurrency APIs (synchronized, ReentrantLock, and so on).
  2. Once the system is distributed, threads and processes are spread across different machines, so the single-machine locking strategy no longer works. A mutual-exclusion mechanism that works across JVMs is needed to control access to shared resources; that is what distributed locks are for. Typical scenarios are high-concurrency, high-traffic ones.

Distributed lock implementations

1. Redis-based distributed locks

How the Redisson distributed lock works

  1. Acquisition: a Redis node is chosen by hashing, and the client executes a Lua script against it (see the script sketch below).
  2. Mutual exclusion: when another client runs the same Lua script while the lock is held, the script reports that the lock already exists, and that client loops, retrying until it acquires the lock.
  3. Reentrancy: the thread that already holds the lock can acquire it again; a hold count is kept.
  4. Watchdog auto-renewal: while the lock holder is still running, a background watchdog task keeps extending the lock's expiration so it does not time out mid-operation.
  5. Release: the holder decrements the hold count, deletes the lock when the count reaches zero, and notifies waiting clients.
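For reference, the following is a simplified sketch (written here as a Java string constant) of the kind of hash-based Lua script Redisson evaluates when acquiring its reentrant lock. It is not the literal Redisson source (the exact script varies by version); the KEYS/ARGV meanings in the comments are the conventional ones: the lock key, the lease time in milliseconds, and a per-client "uuid:threadId" field.

// Simplified sketch of a Redisson-style reentrant-lock acquisition script (not the literal source).
// KEYS[1] = lock key, ARGV[1] = lease time in ms, ARGV[2] = "uuid:threadId" of the caller.
// Returns nil when the lock was acquired or re-entered, otherwise the remaining TTL in ms.
String lockScript =
        "if (redis.call('exists', KEYS[1]) == 0) then " +
        "   redis.call('hincrby', KEYS[1], ARGV[2], 1); " +   // nobody holds it: take it, count = 1
        "   redis.call('pexpire', KEYS[1], ARGV[1]); " +
        "   return nil; " +
        "end; " +
        "if (redis.call('hexists', KEYS[1], ARGV[2]) == 1) then " +
        "   redis.call('hincrby', KEYS[1], ARGV[2], 1); " +   // same caller: re-entrant, bump the count
        "   redis.call('pexpire', KEYS[1], ARGV[1]); " +
        "   return nil; " +
        "end; " +
        "return redis.call('pttl', KEYS[1]);";                // held by someone else: return its TTL

The test below exercises this locking path through the author's RedisLock wrapper around a Redisson client: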
private RedissonClient getClient() {
    Config config = new Config();
    // single-node Redis; password / connection-pool settings can be added here if needed
    config.useSingleServer().setAddress("redis://127.0.0.1:6379");
    return Redisson.create(config);
}

private ExecutorService executorService = Executors.newCachedThreadPool();

// test: 10 tasks increment a shared counter under the lock
int[] count = {0};
for (int i = 0; i < 10; i++) {
    RedissonClient client = getClient();   // a single shared client would also work
    final RedisLock redisLock = new RedisLock(client, "lock_key");
    executorService.submit(() -> {
        try {
            redisLock.lock();
            count[0]++;
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                redisLock.unlock();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    });
}
executorService.shutdown();
executorService.awaitTermination(1, TimeUnit.MINUTES);
System.out.println("count = " + count[0]);

RedLock

public static RLock create(String url, String key) {
    Config config = new Config();
    config.useSingleServer().setAddress(url);
    RedissonClient redissonClient = Redisson.create(config);
    return redissonClient.getLock(key);
}

// a RedLock is composed of several RLock instances, each on its own Redis node
RedissonRedLock redissonRedLock = new RedissonRedLock(
        create("redis://127.0.0.1:6379", "lock_key1"),
        create("redis://127.0.0.1:6380", "lock_key2"),
        create("redis://127.0.0.1:6381", "lock_key3"));
RedisRedLock redLock = new RedisRedLock(redissonRedLock);

private ExecutorService executorService = Executors.newCachedThreadPool();

// test: 2 tasks increment a shared counter under the RedLock
int[] count = {0};
for (int i = 0; i < 2; i++) {
    executorService.submit(() -> {
        try {
            redLock.lock();
            count[0]++;
        } catch (Exception e) {
            e.printStackTrace();
        } finally {
            try {
                redLock.unlock();
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    });
}

Source code analysis

redisson

public void lockInterruptibly(long leaseTime, TimeUnit unit) throws InterruptedException {
        // id of the current thread
        long threadId = Thread.currentThread().getId();
        // try to acquire the lock; returns the remaining TTL of an existing lock
        Long ttl = tryAcquire(leaseTime, unit, threadId);
        // lock acquired
        // a null ttl means nobody holds the lock: acquisition succeeded
        if (ttl == null) {
            return;
        }

        // acquisition failed: subscribe to this lock's channel so that we are
        // notified and can retry as soon as another thread releases the lock
        RFuture<RedissonLockEntry> future = subscribe(threadId);
        commandExecutor.syncSubscription(future);

        try {
            // loop until the lock is acquired
            while (true) {
                // try to acquire the lock again
                ttl = tryAcquire(leaseTime, unit, threadId);
                // lock acquired
                if (ttl == null) {
                    break;
                }

                // waiting for message: if ttl >= 0, wait at most ttl ms before retrying
                if (ttl >= 0) {
                    getEntry(threadId).getLatch().tryAcquire(ttl, TimeUnit.MILLISECONDS);
                } else {
                    getEntry(threadId).getLatch().acquire();
                }
            }
        } finally {
            unsubscribe(future, threadId);
        }
//        get(lockAsync(leaseTime, unit));
    }
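To put the source walk-through in context, here is a minimal usage sketch of the public RLock API (the class name, key name, and timeouts are only illustrative). Calling lock() with no lease time relies on the watchdog to keep renewing the lock while the holder is alive; passing an explicit lease time through tryLock disables the watchdog and lets the lock expire on its own.

import java.util.concurrent.TimeUnit;
import org.redisson.Redisson;
import org.redisson.api.RLock;
import org.redisson.api.RedissonClient;
import org.redisson.config.Config;

public class RLockUsageSketch {
    public static void main(String[] args) throws InterruptedException {
        Config config = new Config();
        config.useSingleServer().setAddress("redis://127.0.0.1:6379");
        RedissonClient redisson = Redisson.create(config);
        RLock lock = redisson.getLock("lock_key");

        // watchdog mode: no lease time given, Redisson keeps extending the lease until unlock()
        lock.lock();
        try {
            // ... critical section ...
        } finally {
            lock.unlock();
        }

        // explicit lease: wait up to 5 s to acquire, hold for at most 10 s, no watchdog renewal
        if (lock.tryLock(5, 10, TimeUnit.SECONDS)) {
            try {
                // ... critical section ...
            } finally {
                lock.unlock();
            }
        }

        redisson.shutdown();
    }
}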

2. etcd-based distributed locks

How the etcd distributed lock works

  1. Lease mechanism: a lease is a TTL (time to live). etcd can attach a lease to a stored key-value pair; when the lease expires, the key-value pair is deleted. A lease can also be renewed by the client before it expires so that the key-value pair does not disappear while it is still needed. The lease mechanism gives the lock its safety property: the lock key is bound to a lease, so even if the lock holder crashes and cannot release the lock itself, the lock is released automatically when the lease expires.

  2. Revision mechanism: every key carries a revision number that is incremented on every transaction and is globally unique, so comparing revisions tells you the order in which writes happened. When several clients race for the lock, they acquire it in revision order, which avoids the thundering-herd ("herd effect") problem and yields a fair lock.

  3. Prefix mechanism: for a lock named /etcdlock, two competing clients actually write different keys, e.g. key1="/etcdlock/UUID1" and key2="/etcdlock/UUID2", where the UUIDs are globally unique IDs that guarantee the two keys are distinct. Both writes succeed but return different revisions. To decide who owns the lock, a client queries by the prefix /etcdlock, gets back the list of key-value pairs together with their revisions, and compares revisions to determine whether it holds the lock.

  4. Watch mechanism: a client can watch a single key or a range of keys (via a prefix), and is notified when a watched key changes. For a distributed lock, a client that failed to grab the lock takes, from the prefix query result, the key whose revision is the largest one still smaller than its own (the pre-key) and watches it; only when that key is released can this client obtain the lock, so observing a DELETE event on the pre-key means the pre-key has been released and this client now holds the lock.

Implementing a distributed lock on etcd

Step 1: establish a connection

The client connects to etcd and creates a globally unique key under the prefix /etcd/lock; say the first client's key is "/etcd/lock/UUID1" and the second's is "/etcd/lock/UUID2". Each client creates a lease for its key, with a length chosen according to how long the business operation is expected to take.

Step 2: create a scheduled task as the lease "heartbeat"

While one client holds the lock, the others can only wait. To keep their leases from expiring while they wait, each client runs a scheduled task as a heartbeat that renews its lease. In addition, if a client crashes while holding the lock, its heartbeat stops, the key is deleted when the lease expires, and the lock is released, which avoids deadlock.

Step 3: write the globally unique key into etcd

The client performs a put, writing the key created in step 1, bound to its lease, into etcd. By etcd's revision mechanism, suppose the two clients' put operations return revisions 1 and 2; each client records its revision to decide later whether it has acquired the lock.

Step 4: decide whether the lock was acquired

The client reads the key-value list under the prefix /etcd/lock/ and checks whether its own key has the smallest revision in the list. If so, it holds the lock; otherwise it watches the key in the list whose revision is the largest one still smaller than its own, and once it observes that key being deleted (either explicitly or through lease expiry), it acquires the lock.

Step 5: run the business logic

Having acquired the lock, the client operates on the shared resource and runs its business code.

Step 6: release the lock

When the business flow completes, the client deletes its key, releasing the lock.
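Before the full implementation below, here is a minimal sketch that strings steps 1-6 together using only the jetcd Lock and Lease clients that the implementation itself relies on. The endpoint and lock name are placeholders, and error handling plus the renewal heartbeat are omitted for brevity; etcd's built-in Lock RPC performs the prefix/revision/watch queueing described above on the server side, so the client does not have to do it by hand.

import java.nio.charset.StandardCharsets;
import io.etcd.jetcd.ByteSequence;
import io.etcd.jetcd.Client;

public class EtcdLockSketch {
    public static void main(String[] args) throws Exception {
        // step 1: connect (endpoint is a placeholder)
        Client client = Client.builder().endpoints("http://127.0.0.1:2379").build();

        // step 1: create a lease sized to the expected business duration (20 s here)
        long leaseId = client.getLeaseClient().grant(20).get().getID();

        // step 2 (omitted here): a real implementation renews the lease periodically,
        // e.g. client.getLeaseClient().keepAliveOnce(leaseId), as KeepAliveRunnable does below

        // steps 3 + 4: the Lock client writes a unique key under the prefix, bound to the
        // lease, and blocks until this client's key has the smallest revision
        ByteSequence lockName = ByteSequence.from("/etcd/lock", StandardCharsets.UTF_8);
        ByteSequence lockPath = client.getLockClient().lock(lockName, leaseId).get().getKey();

        try {
            // step 5: run the business logic while holding the lock
        } finally {
            // step 6: release the lock and revoke the lease
            client.getLockClient().unlock(lockPath).get();
            client.getLeaseClient().revoke(leaseId);
            client.close();
        }
    }
}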

public class EtcdDistributeLock extends AbstractLock {

    private Client client;
    private Lock lockClient;
    private Lease leaseClient;
    private String lockKey;
    private String lockPath;
    /** lock hold count */
    private AtomicInteger lockCount;
    /** lease TTL: if the client crashes, the lock is released automatically when the lease expires;
        during normal execution the lease is renewed periodically; stored in nanoseconds */
    private Long leaseTTL;
    /** initial delay before the lease-renewal task starts; can be tuned per business, default 0 */
    private Long initialDelay = 0L;
    /** scheduler for the lease-renewal task */
    ScheduledExecutorService service = null;
    /** maps each thread to its lock data; the lock data holds the re-entrant count,
        which is capped at Integer.MAX_VALUE */
    private final ConcurrentMap<Thread, LockData> threadData = Maps.newConcurrentMap();

    public EtcdDistributeLock() {}

    public EtcdDistributeLock(Client client, String lockKey, long leaseTTL, TimeUnit unit) {
        this.client = client;
        lockClient = client.getLockClient();
        leaseClient = client.getLeaseClient();
        this.lockKey = lockKey;
        // convert to nanoseconds
        this.leaseTTL = unit.toNanos(leaseTTL);
        service = Executors.newSingleThreadScheduledExecutor();
    }

    @Override
    public void lock() {
        // re-entrancy check
        Thread currentThread = Thread.currentThread();
        LockData oldLockData = threadData.get(currentThread);
        if (oldLockData != null && oldLockData.isLockSuccess()) {
            // re-entering
            int lockCount = oldLockData.lockCount.incrementAndGet();
            if (lockCount < 0) {
                throw new Error("re-entrant count exceeded the limit");
            }
            return;
        }

        // lease id for this acquisition
        Long leaseId = 0L;
        try {
            leaseId = leaseClient.grant(TimeUnit.NANOSECONDS.toSeconds(leaseTTL)).get().getID();
            // renewal period: 4/5 of the lease TTL
            long period = leaseTTL - leaseTTL / 5;
            // start the scheduled renewal task
            service.scheduleAtFixedRate(new EtcdDistributeLock.KeepAliveRunnable(leaseClient, leaseId),
                    initialDelay, period, TimeUnit.NANOSECONDS);
            LockResponse lockResponse = lockClient.lock(ByteSequence.from(lockKey.getBytes()), leaseId).get();
            if (lockResponse != null) {
                lockPath = lockResponse.getKey().toString(Charset.forName("utf-8"));
                log.info("lock acquired, lock path: {}, thread: {}", lockPath, currentThread.getName());
            }
        } catch (InterruptedException | ExecutionException e) {
            log.error("failed to acquire lock", e);
            return;
        }
        // acquisition succeeded: record the lock data for this thread
        LockData newLockData = new LockData(currentThread, lockKey);
        newLockData.setLeaseId(leaseId);
        newLockData.setService(service);
        threadData.put(currentThread, newLockData);
        newLockData.setLockSuccess(true);
    }

    @Override
    public void lockInterruptibly() throws InterruptedException {
        super.lockInterruptibly();
    }

    @Override
    public boolean tryLock() {
        return super.tryLock();
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        return super.tryLock(time, unit);
    }

    @Override
    public void unlock() {
        Thread currentThread = Thread.currentThread();
        LockData lockData = threadData.get(currentThread);
        if (lockData == null) {
            throw new IllegalMonitorStateException("You do not own the lock: " + lockKey);
        }
        int newLockCount = lockData.lockCount.decrementAndGet();
        if (newLockCount > 0) {
            return;
        }
        if (newLockCount < 0) {
            throw new IllegalMonitorStateException("Lock count has gone negative for lock: " + lockKey);
        }
        try {
            // release the lock
            if (lockPath != null) {
                lockClient.unlock(ByteSequence.from(lockPath.getBytes())).get();
            }
            // stop the renewal task
            lockData.getService().shutdown();
            // revoke the lease
            if (lockData.getLeaseId() != 0L) {
                leaseClient.revoke(lockData.getLeaseId());
            }
        } catch (InterruptedException | ExecutionException e) {
            log.error("failed to release lock", e);
        } finally {
            // remove this thread's lock data
            threadData.remove(currentThread);
        }
    }

    @Override
    public Condition newCondition() {
        return super.newCondition();
    }

    /**
     * lease-renewal ("heartbeat") task
     */
    public static class KeepAliveRunnable implements Runnable {
        private Lease leaseClient;
        private long leaseId;

        public KeepAliveRunnable(Lease leaseClient, long leaseId) {
            this.leaseClient = leaseClient;
            this.leaseId = leaseId;
        }

        @Override
        public void run() {
            // renew this lease once
            leaseClient.keepAliveOnce(leaseId);
        }
    }
}
public class EtcdLockTest {
    private Client client;
    private String key = "/etcd/lock";
    private static final String server = "http://xxxx:xxxx";
    private ExecutorService executorService = Executors.newFixedThreadPool(10000);

    @Before
    public void before() throws Exception {
        initEtcdClient();
    }

    private void initEtcdClient() {
        client = Client.builder().endpoints(server).build();
    }

    @Test
    public void testEtcdDistributeLock() throws InterruptedException {
        int[] count = {0};
        for (int i = 0; i < 100; i++) {
            executorService.submit(() -> {
                final EtcdDistributeLock lock = new EtcdDistributeLock(client, key, 20, TimeUnit.SECONDS);
                try {
                    lock.lock();
                    count[0]++;
                } catch (Exception e) {
                    e.printStackTrace();
                } finally {
                    try {
                        lock.unlock();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            });
        }
        executorService.shutdown();
        executorService.awaitTermination(1, TimeUnit.HOURS);
        System.err.println("result: " + count[0]);
    }
}

3. ZooKeeper-based distributed locks

  1. Start a client and confirm that it is connected to the server.
  2. Multiple clients concurrently create ephemeral sequential nodes under a fixed path.
  3. Each client checks whether the sequential node it created has the smallest sequence number; if so, it has acquired the lock.
  4. If the check in step 3 fails, the client uses ZooKeeper's watch mechanism to watch its immediate predecessor node, waits for that node's delete (lock-release) event, and then repeats step 3. (A raw-client sketch of these steps follows the rationale below.)

ZooKeeper is a high-performance distributed coordination framework that can be thought of as a file system made of nodes (znodes), of which there are four kinds: persistent nodes, persistent sequential nodes, ephemeral nodes, and ephemeral sequential nodes.
The main idea of the lock implementation is to monitor the state of the other clients in order to decide whether the current client may take the lock.
Ephemeral sequential nodes are used because:

  1. The ZooKeeper server tracks the liveness of each client session; when a session expires, every ephemeral node created in that session is deleted. By watching these ephemeral nodes, a client can observe what happens to the other clients and react accordingly.
  2. ZooKeeper serializes write operations, so sequential nodes created concurrently receive unique, totally ordered sequence numbers. The lock implemented here is a fair lock, and this ordering is exactly what makes it work: the node with the smallest sequence number holds the lock, and each client watches only its immediate predecessor, which avoids the thundering-herd effect (watch events then propagate linearly).
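Before the Curator-based implementation used in this article, here is a raw ZooKeeper-client sketch of the ephemeral-sequential-node algorithm described above. The class name, paths, and error handling are illustrative, and it assumes the lock root node already exists; the Curator InterProcessMutex used below encapsulates the same idea.

import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import org.apache.zookeeper.*;
import org.apache.zookeeper.data.Stat;

public class SimpleZkLock {
    private final ZooKeeper zk;
    private final String lockRoot;   // e.g. "/lockPath", must already exist
    private String myNode;           // full path of the node this client created

    public SimpleZkLock(ZooKeeper zk, String lockRoot) {
        this.zk = zk;
        this.lockRoot = lockRoot;
    }

    public void lock() throws Exception {
        // step 2: create an ephemeral sequential node under the lock root
        myNode = zk.create(lockRoot + "/lock-", new byte[0],
                ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.EPHEMERAL_SEQUENTIAL);
        while (true) {
            // step 3: list the children and check whether our node has the smallest sequence number
            List<String> children = zk.getChildren(lockRoot, false);
            Collections.sort(children);
            String myName = myNode.substring(lockRoot.length() + 1);
            int index = children.indexOf(myName);
            if (index == 0) {
                return; // smallest sequence number: lock acquired
            }
            // step 4: watch only the node immediately before ours to avoid the herd effect
            String prev = lockRoot + "/" + children.get(index - 1);
            CountDownLatch latch = new CountDownLatch(1);
            Stat stat = zk.exists(prev, event -> {
                if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
                    latch.countDown();
                }
            });
            if (stat != null) {
                latch.await(); // wait until the previous node is deleted, then re-check
            }
        }
    }

    public void unlock() throws Exception {
        zk.delete(myNode, -1); // releasing the lock means deleting our node
    }
}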
public class ZKLock extends AbstractLock {

    /** Curator client connected to ZooKeeper */
    private CuratorFramework client;

    private InterProcessLock lock;

    public ZKLock(String zkAddress, String lockPath) {
        // 1. Connect to zk
        client = CuratorFrameworkFactory.newClient(
                zkAddress,
                new RetryNTimes(5, 5000)
        );
        client.start();
        if (client.getState() == CuratorFrameworkState.STARTED) {
            log.info("zk client start successfully!");
            log.info("zkAddress:{},lockPath:{}", zkAddress, lockPath);
        } else {
            throw new RuntimeException("zk client failed to start");
        }
        this.lock = defaultLock(lockPath);
    }

    private InterProcessLock defaultLock(String lockPath) {
        return new InterProcessMutex(client, lockPath);
    }

    @Override
    public void lock() {
        try {
            this.lock.acquire();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

    @Override
    public boolean tryLock() {
        boolean flag;
        try {
            flag = this.lock.acquire(0, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return flag;
    }

    @Override
    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {
        boolean flag;
        try {
            flag = this.lock.acquire(time, unit);
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
        return flag;
    }

    @Override
    public void unlock() {
        try {
            this.lock.release();
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }

}
// test harness
private ExecutorService executorService = Executors.newCachedThreadPool();

@Test
public void testLock() throws Exception {
    ZKLock zkLock = new ZKLock("xxxx:xxxx", "/lockPath");
    int[] num = {0};
    long start = System.currentTimeMillis();
    for (int i = 0; i < 200; i++) {
        executorService.submit(() -> {
            try {
                zkLock.lock();
                num[0]++;
            } catch (Exception e) {
                throw new RuntimeException(e);
            } finally {
                zkLock.unlock();
            }
        });
    }
    executorService.shutdown();
    executorService.awaitTermination(1, TimeUnit.HOURS);
    log.info("elapsed: {} ms", System.currentTimeMillis() - start);
    System.out.println(num[0]);
}
