2

ReentrantLock介绍及源码解析 - 木木他爹

 1 year ago
source link: https://www.cnblogs.com/darling2047/p/17097859.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

ReentrantLock介绍及源码解析

一、ReentrantLock介绍

  • ReentrantLock是JUC包下的一个并发工具类,可以通过他显示的加锁(lock)和释放锁(unlock)来实现线程的安全访问,ReentrantLock还可以实现公平锁和非公平锁,并且其与synchronized的作用是一致的,区别在于加锁的底层实现不一样,写法上也不一样,具体异同可以参见下图:
1047153-20230207113335373-1001085428.png

二、ReentrantLock的源码简析

1、源码分析

  • ReentrantLock(下面简称RL)就是AQS独占锁的一个典型实现,其通过维护state变量的值来判断当前线程是否能够拥有锁,如果通过cas将state成功从0变成1表示争用资源成功,否则表示争用失败,进入CLH队列,通过CLH队列来维护那些暂时没抢占到锁资源的线程;其内部维护了一个名为Sync的内部类来继承AQS,又因为RL既可以支持公平锁也可以支持非公平锁,所以其内部还维护了两个内部类FairSync和NonfairSync来继承Sync,通过他们来实现AQS的模板方法从而实现加锁的过程;类的关系图如下:
1047153-20230207113354133-390244017.png
  • 公平锁和非公平锁在源码层的两点区别:

    1、非公平上来直接抢锁

    2、当state=0时,非公平直接抢,公平锁还会判断队列还有没有前置节点

2、lock方法的源码跟踪

下面就让我们跟踪RL的lock()和unLock()源码来看看代码级别是怎么实现的吧!

需要注意的是,本文跟踪的是非公平锁的加解锁过程,公平锁的实现大体一致,当源码中有与公平锁的显著差别时我会通过注释给出解释

  • 试用例如下
public static void main(String[] args) throws InterruptedException {
    long start = System.currentTimeMillis();
    List<Thread> list = new ArrayList<>();
    ReentrantLock lock = new ReentrantLock();
    for (int i = 0; i < 1000; i++) {
        Thread thread = new Thread(()-> {
            for (int j = 0; j < 1000; j++) {
                // 解锁
                lock.lock();
                count++;
                // 释放锁
                lock.unlock();
            }
        });
        list.add(thread);
    }
    for (Thread thread : list) {
        thread.start();
    }
    for (Thread thread : list) {
        thread.join();
    }
    System.out.println("auto.count = " + count + "耗时:" + (System.currentTimeMillis() -start));
}

(1)、lock()的源码跟踪与解析

跟踪lock.lock()发现其调用的是内部类Sync的lock()方法,该方法是一个抽象方法,具体实现由FairSync和NonfairSync实现,由于我们构造RL时调用的是无参构造函数,所以这里会直接进入NonfairSync的lock()方法;具体实现代码和注释如下:

/**
 * java.util.concurrent.locks.ReentrantLock.NonfairSync#lock()
 */
final void lock() {
    // 由于是非公平锁所以这里上来直接争抢资源,尝试通过CAS操作将state的值由0变成1
    if (compareAndSetState(0, 1)) 
        // 如果成功将state值变成1表示争抢锁成功,设置当前拥有独占访问权的线程。
        setExclusiveOwnerThread(Thread.currentThread());
    else
        // 争抢失败再进入与公平锁一样的排队逻辑
        acquire(1);
}

tips:

1、上面的compareAndSetState方法也是由AQS提供的,里面借助Unsafe实现了对state的cas操作更新

2、setExclusiveOwnerThread也可以理解成由AQS提供(其实是AQS的父类,不过不影响理解),给exclusiveOwnerThread变量赋值,exclusiveOwnerThread表示当前正在拥有锁的线程

3、acquire方法同样由AQS提供,其内部实现也是lock环节比较关键的代码,下面我会详细解释

(2)、acquire()的源码跟踪与解析

acquire方法的源码如下:

/**
 *  java.util.concurrent.locks.AbstractQueuedSynchronizer#acquire(int)
 */
public final void acquire(int arg) {
    /**
     * 1、尝试获取锁;如果成功此方法结束,当前线程执行同步代码块
     * 2、如果获取失败,则构造Node节点并加入CLH队列
     * 3、然后继续等待锁
     */
    if (!tryAcquire(arg) &&
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))
        // 如果获取锁失败,添加CLH队列也失败,那么直接中断当前线程
        selfInterrupt();
}

tips:

1、tryAcquire方法是AQS的一个模板方法,RL下的公平和非公平锁都有不同的实现,下面会详解

2、addWaiter方法是AQS的一个默认实现方法,负责构造当前线程所在的Node,并将其设置到队列的尾巴上

3、acquireQueued方法也是AQS的默认实现,旨在设置CLH队列的head和阻塞当前线程

上面的三个方法下面也会一一介绍

(3)、tryAcquire()的源码跟踪与解析

  • tryAcquire()方法可以理解成尝试获取锁,如果获取成功即表示当前线程拥有了锁;跟踪源码需要注意的一点是:该方法在非公平锁(NonFairSync)下的实现最终调用的是Sync里的nonfairTryAcquire方法,所以我们直接观察该方法是如何实现的即可
/**
 * java.util.concurrent.locks.ReentrantLock.Sync#nonfairTryAcquire(int)
 */
final boolean nonfairTryAcquire(int acquires) {
    // 当前线程
    final Thread current = Thread.currentThread();
    // 获取当前state的值
    int c = getState();
    if (c == 0) {
        /**
         * 非公平锁发现资源未被占用时直接CAS尝试抢占资源;而公平锁发现资源未被占用时
         * 先判断队列里是否还有前置节点再等待,没有才会去抢占资源
         */
        if (compareAndSetState(0, acquires)) {
            // 如果成功将state值变成1表示争抢锁成功,设置当前拥有独占访问权的线程。
            setExclusiveOwnerThread(current);
            return true;
        }
    }
    /** 
     * 如果state!=0表示有争用,再判断当前系统拥有独占权限的线程是不是当前线程,
     * 如果是,则需要支持线程重入,将state的值加1
     */
    else if (current == getExclusiveOwnerThread()) {// 处理可重入的逻辑
        int nextc = c + acquires;
        if (nextc < 0) // overflow
            throw new Error("Maximum lock count exceeded");
        setState(nextc);
        return true;
    }
    // state既不等于0也不需要重入则返回false;表示获取锁失败,代码返回后继续执行acquireQueued方法
    return false;
}

(4)addWaiter()的源码跟踪与解析

  • 执行到addWaiter方法表示前面的tryAcquire尝试获取锁失败了,需要由此方法构建Node节点并加入到CLH队列的末尾;此方法返回的Node即为当前CLH队列的tail节点
/**
 * java.util.concurrent.locks.AbstractQueuedSynchronizer#addWaiter(java.util.concurrent.locks.AbstractQueuedSynchronizer.Node)
 */
private Node addWaiter(Node mode) {
    // 构建Node对象
    Node node = new Node(Thread.currentThread(), mode);
    /**
     * 将当前队列的尾节点赋值给pred,通过命名和下面的代码其实可以发现就是想让tail作为当前节点的前置节点;
     * 但是为什么不直接用tail而将其赋值给pred再用呢?我想应该是考虑并发环境下tail的引用有可能会被其他线程改变
     */
    Node pred = tail;
    if (pred != null) {
        // 如果当前队列的尾结点(tail)不为空,就将其作为当前Node节点的前置节点
        node.prev = pred;
        // 然后通过AQS自带的cas方法将当前构建的Node节点插入到队列的尾巴上
        if (compareAndSetTail(pred, node)) {
            // 如果成功了,前置节点也就是之前的tail节点的后继节点就是当前节点,赋值
            pred.next = node;
            // 返回构建的Node节点,即当前队列的tail节点
            return node;
        }
    }
    // 如果队列的tail节点为空,或者cas设置tail节点失败的话调用此方法;旨在重新设置队列的tail节点
    enq(node);
    return node;
}

(5)、acquireQueued()的源码跟踪与解析

  • 当线程通过tryAcquire上锁失败,然后通过addWaiter将当前线程添加到队列末尾后,通过此方法再次判断是否轮到当前节点,并再次尝试获取锁,获取不到的话进行阻塞操作,源码与注释如下:
/**
 * java.util.concurrent.locks.AbstractQueuedSynchronizer#acquireQueued(java.util.concurrent.locks.AbstractQueuedSynchronizer.Node, int)
 */
final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            // 获取tail节点的前置节点
            final Node p = node.predecessor();
            /**
             * 如果前置节点就是头节点表示当前tail节点就是第二个节点,就可以尝试着去获取锁,
             * 然后将tail节点设置成头节点,返回线程中断状态为false;表示当前线程获取到锁
             */
            if (p == head && tryAcquire(arg)) {
                setHead(node);
                /**
                 * 既然tail已经获取到锁了,那么前置节点就没用了,这里将前置节点的next设置为空,
                 * 是为了方便垃圾回收,因为如果不指定为空,前置节点的next就是当前的tail节点,
                 * 不会被回收
                 */
                p.next = null; // help GC
                failed = false;
                return interrupted;
            }
            /**
             * 如果前置节点不为head,或者虽然前置节点是head但是获取锁失败,那么就
             * 需要在这里将线程阻塞,阻塞利用的是LockSupport.park(thread)来实现的
             */
            if (shouldParkAfterFailedAcquire(p, node) &&
                parkAndCheckInterrupt())
                interrupted = true;
        }
    } finally {
        if (failed)
            // 退出获取锁
            cancelAcquire(node);
    }
}

至此,RL非公平锁加锁的过程的源码跟踪完毕,流程也不算复杂,下面简单梳理一遍:

1、上来直接尝试获取锁(修改state值),成功表示获取成功

2、否则执行tryAcquire方法尝试通过cas的方式获取锁,并处理可能存在的重入操作

3、获取失败则通过addWriter方法构建Node节点并加入CLH队列的末尾

4、然后在acquireQueued里再次获取锁,获取失败则阻塞当前线程;

下面简单画了一下lock()方法的调用泳道图

1、调用父类AQS的compareAndSetState通过cas的模式尝试将state状态改为1,修改成功则持有锁,将当前线程设为ExclusiveOwnerThread

1047153-20230207113430395-1440902839.png

3、unLock方法的源码跟踪

  • 释放锁其实就是将state状态减1,然后处理可重入逻辑,如果没有重入的话直接唤醒当前队列的head节点,把当前线程所在的Node节点从队列中剔除
  • unLock方法对应AQS的tryRelease模板方法的实现,其没有lock那么复杂,因为不用支持公平和非公平锁,所以其可以直接在sync中调用AQS提供的release方法,然后触发tryRelease,调用sync里的tryRelease实现从而实现解锁

AQS的release源码

/**
 * java.util.concurrent.locks.AbstractQueuedSynchronizer#release(int)
 */
public final boolean release(int arg) {
    // 尝试释放锁
    if (tryRelease(arg)) {
        // 释放成功,判断当前队列头节点是否为空,不为空并且等待状态不等于0则唤醒当前队列的头节点
        Node h = head;
        if (h != null && h.waitStatus != 0)
            unparkSuccessor(h);
        return true;
    }
    return false;
}

RL的tryRelease实现

/**
 * java.util.concurrent.locks.ReentrantLock.Sync#tryRelease(int)
 * @param releases
 * @return
 */
protected final boolean tryRelease(int releases) {
    // state减1
    int c = getState() - releases;
    // 如果当前线程不是正在获取到锁的线程直接抛异常
    if (Thread.currentThread() != getExclusiveOwnerThread())
        throw new IllegalMonitorStateException();
    boolean free = false;
    // 如果state减1后等于0表示没有重入,表示释放锁成功,将当前获取锁的线程置空
    if (c == 0) {
        free = true;
        setExclusiveOwnerThread(null);
    }
    // 将最新的state状态更新到AQS中
    setState(c);
    return free;
}

unlock()总结:

1、调用父类AQS的release方法实际调用的是tryRelease这个模板方法由ReentrantLock本身实现

2、tryRelease方法尝试将state减1,如果减完等于0表示解锁成功,将ExclusiveOwner线程设为空;并且唤醒队列的头节点(unparkSuccessor)。

3、如果不等于0表示解锁失败,将state设为减1过后的值;也是为了可重入


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK