3

JUC同步锁原理源码解析四----Semaphore - bug的自我救赎

 1 year ago
source link: https://www.cnblogs.com/selfsalvation/p/17488560.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

JUC同步锁原理源码解析四----Semaphore

Semaphore

1.Semaphore的来源

A counting semaphore.  Conceptually, a semaphore maintains a set of permits.  Each {@link #acquire} blocks if necessary until a permit isavailable, and then takes it.  Each {@link #release} adds a permit,potentially releasing a blocking acquirer.

​ 一组数量的信号,只有获取到信号的线程才允许执行。通过acquire进行获取,如果获取不到则需要阻塞等待直到一个信号可用。release会释放一个信号量。通过这种方式可以实现限流。

2.Semaphore的底层实现

​ Semaphore的底层实现依旧依赖于AQS的共享锁机制。

2.AQS源码

Node节点

 static final class Node {
        /** Marker to indicate a node is waiting in shared mode */
        static final Node SHARED = new Node();
        /** Marker to indicate a node is waiting in exclusive mode */
        static final Node EXCLUSIVE = null;

        /** waitStatus value to indicate thread has cancelled */
        static final int CANCELLED =  1;
        /** waitStatus value to indicate successor's thread needs unparking */
        static final int SIGNAL    = -1;
        /** waitStatus value to indicate thread is waiting on condition */
        static final int CONDITION = -2;
 
        static final int PROPAGATE = -3;

        volatile int waitStatus;

        volatile Node prev;

        volatile Node next;
       
        volatile Thread thread;

        Node nextWaiter;
}

AbstractQueuedSynchronizer类

public abstract class AbstractQueuedSynchronizer
    extends AbstractOwnableSynchronizer
    implements java.io.Serializable {
    
 	private transient volatile Node head;

    /**
     * Tail of the wait queue, lazily initialized.  Modified only via
     * method enq to add new wait node.
     */
    private transient volatile Node tail;

    /**
     * The synchronization state.
     */
    private volatile int state;//最重要的一个变量
       
}

ConditionObject类

public class ConditionObject implements Condition, java.io.Serializable {
        private static final long serialVersionUID = 1173984872572414699L;
        /** First node of condition queue. */
        private transient Node firstWaiter;
        /** Last node of condition queue. */
        private transient Node lastWaiter;
}

accquire方法

public final void acquire(int arg) {
    if (!tryAcquire(arg) &&//尝试获取锁
        acquireQueued(addWaiter(Node.EXCLUSIVE), arg))//如果获取锁失败,添加到队列中,由于ReentrantLock是独占锁所以节点必须是EXCLUSIVE类型
        selfInterrupt();//添加中断标识位
}

addWaiter方法

private Node addWaiter(Node mode) {
     Node node = new Node(Thread.currentThread(), mode);//新建节点
     // Try the fast path of enq; backup to full enq on failure
     Node pred = tail;//获取到尾指针
     if (pred != null) {//尾指针不等于空,将当前节点替换为尾指针
         node.prev = pred;
         if (compareAndSetTail(pred, node)) {//采用尾插法,充分利用时间局部性和空间局部性。尾插的节点一般不容易被取消。
             pred.next = node;
             return node;
         }
     }
     enq(node);//cas失败后执行入队操作,继续尝试
     return node;
 }

enq方法

private Node enq(final Node node) {
    for (;;) {
        Node t = tail;//获取尾指针
        if (t == null) { //代表当前队列没有节点
            if (compareAndSetHead(new Node()))//将当前节点置为头结点
                tail = head;
        } else {//当前队列有节点
            node.prev = t;//
            if (compareAndSetTail(t, node)) {//将当前节点置为尾结点
                t.next = node;
                return t;
            }
        }
    }
}

acquireQueued方法

final boolean acquireQueued(final Node node, int arg) {
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();//找到当前节点的前驱节点
            if (p == head && tryAcquire(arg)) {//前驱节点等于头节点尝试cas抢锁。
                setHead(node);//抢锁成功将当前节点设置为头节点
                p.next = null; // help GC  当头结点置空
                failed = false;
                return interrupted;
            }
            if (shouldParkAfterFailedAcquire(p, node) &&//当队列中有节点在等待,判断是否应该阻塞
                parkAndCheckInterrupt())//阻塞等待,检查中断标识位
                interrupted = true;//将中断标识位置为true
        }
    } finally {
        if (failed)//
            cancelAcquire(node);//取消当前节点
    }
}


 private void cancelAcquire(Node node) {
     // Ignore if node doesn't exist
     if (node == null)//当前节点为空直接返回
         return;

     node.thread = null;//要取消了将当前节点的线程置为空
     // Skip cancelled predecessors
     Node pred = node.prev;//获取到当前节点的前驱节点
     while (pred.waitStatus > 0)//如果当前节点的前驱节点的状态大于0,代表是取消状态,一直找到不是取消状态的节点
         node.prev = pred = pred.prev;
     Node predNext = pred.next;//将当前要取消的节点断链

     node.waitStatus = Node.CANCELLED;//将当前节点的等待状态置为CANCELLED
     // If we are the tail, remove ourselves.
     if (node == tail && compareAndSetTail(node, pred)) {//如果当前节点是尾结点,将尾结点替换为浅语节点
         compareAndSetNext(pred, predNext, null);//将当前节点的下一个节点置为空,因为当前节点是最后一个节点没有next指针
     } else {
         // If successor needs signal, try to set pred's next-link
         // so it will get one. Otherwise wake it up to propagate.
         int ws;
         if (pred != head &&//前驱节点不等于头结点
             ((ws = pred.waitStatus) == Node.SIGNAL ||//前驱节点的状态不等于SIGNAL
              (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&//前驱节点的状态小于0,并且cas将前驱节点的等待置为SIGNAL
             pred.thread != null) {//前驱节点的线程补位空
             Node next = node.next;//获取当前节点的next指针
             if (next != null && next.waitStatus <= 0)//如果next指针不等于空并且等待状态小于等于0,标识节点有效
                 compareAndSetNext(pred, predNext, next);//将前驱节点的next指针指向下一个有效节点
         } else {
             unparkSuccessor(node);//唤醒后续节点 条件:1.前驱节点是头结点 2.当前节点不是signal,在ReentransLock中基本不会出现,在读写锁时就会出现
         }

         node.next = node; // help GC 将引用指向自身
     }
 }

 private void unparkSuccessor(Node node) {
     /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
     int ws = node.waitStatus;//获取当前节点状态
     if (ws < 0)//如果节点为负数也即不是取消节点
         compareAndSetWaitStatus(node, ws, 0);//cas将当前节点置为0

     /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
     Node s = node.next;//获取到下一个节点
     if (s == null || s.waitStatus > 0) {//下一个节点等于空或者下一个节点是取消节点
         s = null;//将s置为空
         for (Node t = tail; t != null && t != node; t = t.prev)//从尾结点遍历找到一个不是取消状态的节点
             if (t.waitStatus <= 0)
                 s = t;
     }
     if (s != null)//如果s不等于空
         LockSupport.unpark(s.thread);//唤醒当前节点s
 }

shouldParkAfterFailedAcquire方法


private static boolean shouldParkAfterFailedAcquire(Node pred, Node node) {
    int ws = pred.waitStatus;//获取上一个节点的等待状态
    if (ws == Node.SIGNAL)//如果状态为SIGNAL,代表后续节点有节点可以唤醒,可以安心阻塞去
        /*
             * This node has already set status asking a release
             * to signal it, so it can safely park.
             */
        return true;
    if (ws > 0) {//如果当前状态大于0,代表节点为CANCELLED状态
        /*
             * Predecessor was cancelled. Skip over predecessors and
             * indicate retry.
             */
        do {
            node.prev = pred = pred.prev;//从尾节点开始遍历,找到下一个状态不是CANCELLED的节点。将取消节点断链移除
        } while (pred.waitStatus > 0);
        pred.next = node;
    } else {
        /*
             * waitStatus must be 0 or PROPAGATE.  Indicate that we
             * need a signal, but don't park yet.  Caller will need to
             * retry to make sure it cannot acquire before parking.
             */
        //这里需要注意ws>0时,已经找到了一个不是取消状态的前驱节点。
        compareAndSetWaitStatus(pred, ws, Node.SIGNAL);//将找到的不是CANCELLED节点的前驱节点,将其等待状态置为SIGNAL
    }
    return false;
}

cancelAcquire方法

 private void cancelAcquire(Node node) {
     // Ignore if node doesn't exist
     if (node == null)//当前节点为空直接返回
         return;

     node.thread = null;//要取消了将当前节点的线程置为空
     // Skip cancelled predecessors
     Node pred = node.prev;//获取到当前节点的前驱节点
     while (pred.waitStatus > 0)//如果当前节点的前驱节点的状态大于0,代表是取消状态,一直找到不是取消状态的节点
         node.prev = pred = pred.prev;
     Node predNext = pred.next;//将当前要取消的节点断链

     node.waitStatus = Node.CANCELLED;//将当前节点的等待状态置为CANCELLED
     // If we are the tail, remove ourselves.
     if (node == tail && compareAndSetTail(node, pred)) {//如果当前节点是尾结点,将尾结点替换为浅语节点
         compareAndSetNext(pred, predNext, null);//将当前节点的下一个节点置为空,因为当前节点是最后一个节点没有next指针
     } else {
         // If successor needs signal, try to set pred's next-link
         // so it will get one. Otherwise wake it up to propagate.
         int ws;
         if (pred != head &&//前驱节点不等于头结点
             ((ws = pred.waitStatus) == Node.SIGNAL ||//前驱节点的状态不等于SIGNAL
              (ws <= 0 && compareAndSetWaitStatus(pred, ws, Node.SIGNAL))) &&//前驱节点的状态小于0,并且cas将前驱节点的等待置为SIGNAL
             pred.thread != null) {//前驱节点的线程补位空
             Node next = node.next;//获取当前节点的next指针
             if (next != null && next.waitStatus <= 0)//如果next指针不等于空并且等待状态小于等于0,标识节点有效
                 compareAndSetNext(pred, predNext, next);//将前驱节点的next指针指向下一个有效节点
         } else {
             unparkSuccessor(node);//唤醒后续节点 条件:1.前驱节点是头结点 2.当前节点不是signal,在ReentransLock中基本不会出现,在读写锁时就会出现
         }

         node.next = node; // help GC 将引用指向自身
     }
 }

unparkSuccessor方法

 private void unparkSuccessor(Node node) {
     /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
     int ws = node.waitStatus;//获取当前节点状态
     if (ws < 0)//如果节点为负数也即不是取消节点
         compareAndSetWaitStatus(node, ws, 0);//cas将当前节点置为0

     /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
     Node s = node.next;//获取到下一个节点
     if (s == null || s.waitStatus > 0) {//下一个节点等于空或者下一个节点是取消节点
         s = null;//将s置为空
         for (Node t = tail; t != null && t != node; t = t.prev)//从尾结点遍历找到一个不是取消状态的节点
             if (t.waitStatus <= 0)
                 s = t;
     }
     if (s != null)//如果s不等于空
         LockSupport.unpark(s.thread);//唤醒当前节点s
 }

release方法

public final boolean release(int arg) {
    if (tryRelease(arg)) {//子类实现如何释放锁
        Node h = head;//获取到头结点
        if (h != null && h.waitStatus != 0)//获取到头结点,如果头结点不为空,等待状态不为0,唤醒后续节点
            unparkSuccessor(h);
        return true;
    }
    return false;
}

private void unparkSuccessor(Node node) {
    /*
         * If status is negative (i.e., possibly needing signal) try
         * to clear in anticipation of signalling.  It is OK if this
         * fails or if status is changed by waiting thread.
         */
    int ws = node.waitStatus;//获取节点的等待状态
    if (ws < 0)//如果等待状态小于0,标识节点属于有效节点
        compareAndSetWaitStatus(node, ws, 0);//将当前节点的等待状态置为0

    /*
         * Thread to unpark is held in successor, which is normally
         * just the next node.  But if cancelled or apparently null,
         * traverse backwards from tail to find the actual
         * non-cancelled successor.
         */
    Node s = node.next;//获取到下一个节点
    if (s == null || s.waitStatus > 0) {//如果节点是空,或者是取消状态的节点,就找到一个非取消状态的节点,将取消状态的节点断链后由垃圾回收器进行回收
        s = null;
        for (Node t = tail; t != null && t != node; t = t.prev)
            if (t.waitStatus <= 0)
                s = t;
    }
    if (s != null)//节点不用空
        LockSupport.unpark(s.thread);//唤醒当前等待的有效节点S
}

acquireShared方法

public final void acquireShared(int arg) {
    if (tryAcquireShared(arg) < 0)//由子类实现
        doAcquireShared(arg);
}

doAcquireShared方法

private void doAcquireShared(int arg) {
    final Node node = addWaiter(Node.SHARED);//将共享节点也即读线程入队并返回
    boolean failed = true;
    try {
        boolean interrupted = false;
        for (;;) {
            final Node p = node.predecessor();//找到节点的前驱节点
            if (p == head) {//如果前驱节点等于头结点
                int r = tryAcquireShared(arg);//尝试获取共享锁数量
                if (r >= 0) {//如果锁的数量大于0,表示还有多余的共享锁。这里等于0也需要进一步判断。由于如果当执行到这里时,有另外的线程释放了共享锁,如果不进行判断,将会导致释放锁的线程没办法唤醒其他线程。所以这里会伪唤醒一个节点,唤醒的节点后续如果没有锁释放,依旧阻塞在当前parkAndCheckInterrupt方法中
                    setHeadAndPropagate(node, r);//将当前节点的等待状态设置为Propagate。
                    p.next = null; // help GC
                    if (interrupted)//判断是会否中断过
                        selfInterrupt();//设置中断标识位
                    failed = false;
                    return;
                }
            }
            if (shouldParkAfterFailedAcquire(p, node) &&//判断是否应该阻塞等待
                parkAndCheckInterrupt方法中())//阻塞并检查中断标识
                interrupted = true;//重置中断标识位
        }
    } finally {
        if (failed)//如果失败
            cancelAcquire(node);//取消节点
    }
}

setHeadAndPropagate方法

private void setHeadAndPropagate(Node node, int propagate) {
        Node h = head; // Record old head for check below
        setHead(node);//将当前节点置为头结点
        /*
         * Try to signal next queued node if:
         *   Propagation was indicated by caller,
         *     or was recorded (as h.waitStatus either before
         *     or after setHead) by a previous operation
         *     (note: this uses sign-check of waitStatus because
         *      PROPAGATE status may transition to SIGNAL.)
         * and
         *   The next node is waiting in shared mode,
         *     or we don't know, because it appears null
         *
         * The conservatism in both of these checks may cause
         * unnecessary wake-ups, but only when there are multiple
         * racing acquires/releases, so most need signals now or soon
         * anyway.
         */
        if (propagate > 0 //可获取的共享锁也即读锁的数量,对于ReentrantReadWriteLock而言,永远都是1,所以会继续唤醒下一个读线程
            || h == null //如果旧的头结点为空
            || h.waitStatus < 0 ||//头结点的等待状态不为0
            (h = head) == null || h.waitStatus < 0) {//旧头节点不为空并且等待状态小于0也即是有效节点
            Node s = node.next;//获取到node的下一个节点
            if (s == null || s.isShared())//如果node的下一个节点为空或者是共享节点
                doReleaseShared();//唤醒下一个线程
        }
    }

releaseShared方法

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {//子类实现释放锁
        doReleaseShared();//唤醒后续线程
        return true;//释放成功
    }
    return false;//释放是吧
}

doReleaseShared方法

private void doReleaseShared() {
    /*
         * Ensure that a release propagates, even if there are other
         * in-progress acquires/releases.  This proceeds in the usual
         * way of trying to unparkSuccessor of head if it needs
         * signal. But if it does not, status is set to PROPAGATE to
         * ensure that upon release, propagation continues.
         * Additionally, we must loop in case a new node is added
         * while we are doing this. Also, unlike other uses of
         * unparkSuccessor, we need to know if CAS to reset status
         * fails, if so rechecking.
         */
    for (;;) {
        Node h = head;//获取到当前头结点
        if (h != null && h != tail) {//如果头结点不为空并且不等于尾结点
            int ws = h.waitStatus;//获取当前节点的等待状态
            if (ws == Node.SIGNAL) {//如果状态为SIGNAL
                if (!compareAndSetWaitStatus(h, Node.SIGNAL, 0))//cas将SIGNAL状态置为0。SIGNAL标识后续有线程需要唤醒
                    continue;            // loop to recheck cases
                unparkSuccessor(h);//唤醒后续线程
            }
            else if (ws == 0 &&//如果当前状态为0。表示有线程将其置为0
                     !compareAndSetWaitStatus(h, 0, Node.PROPAGATE))//cas将0状态置为PROPAGATE。在多个共享锁同时释放时,方便继续进行读传播,唤醒后续节点
                continue;                // loop on failed CAS
        }
        if (h == head)//如果头结点没有改变,证明没有必要继续循环等待了,直接退出吧,如果头结点放生变化,可能有其他线程释放了锁。
            break;
    }
}

await()

public final void await() throws InterruptedException {
    if (Thread.interrupted())//线程是否发生中断,是,就抛出中断异常
        throw new InterruptedException();
    Node node = addConditionWaiter();//加入条件等待队列
    int savedState = fullyRelease(node);//释放锁,并返回。因为当前线程需要等待
    int interruptMode = 0;
    while (!isOnSyncQueue(node)) {//判断是否在竞争队列中。AQS分为两个队列一个是竞争队列,等待调度执行,一个是等待队列等待在ConditionObject上。
        LockSupport.park(this);//阻塞等待
        if ((interruptMode = checkInterruptWhileWaiting(node)) != 0)
            break;
    }
    if (acquireQueued(node, savedState) && interruptMode != THROW_IE)//重新去获取锁并判断当前中断模式不是THROW_IE
        interruptMode = REINTERRUPT;//将中断模式置为REINTERRUPT
    if (node.nextWaiter != null) // clean up if cancelled如果当前节点的下一个节点不为空
        unlinkCancelledWaiters();//清除等待队列中已经取消的节点
    if (interruptMode != 0)//如果当前中断模式不等于0
        reportInterruptAfterWait(interruptMode);
}

private void reportInterruptAfterWait(int interruptMode)
    throws InterruptedException {
    if (interruptMode == THROW_IE)//如果是THROW_IE直接抛出异常
        throw new InterruptedException();
    else if (interruptMode == REINTERRUPT)//如果是REINTERRUPT
        selfInterrupt();//重置中断标识位
}

addConditionWaiter方法

private Node addConditionWaiter() {
    Node t = lastWaiter;//获取到最后一个节点
    // If lastWaiter is cancelled, clean out.
    if (t != null && t.waitStatus != Node.CONDITION) {//最后一个节点不等于空,并且等待状态不等于CONDITION
        unlinkCancelledWaiters();//将取消节点断链,标准的链表操作
        t = lastWaiter;//获取到最后一个有效的节点
    }
    Node node = new Node(Thread.currentThread(), Node.CONDITION);//将当前节点封装成node
    if (t == null)//如果最后一个节点为空,表示当前节点是第一个入队的节点
        firstWaiter = node;
    else
        t.nextWaiter = node;//否则将当前node挂在链表末尾
    lastWaiter = node;//设置最后节点的指针指向当前node
    return node;
}

fullyRelease方法

final int fullyRelease(Node node) {
    boolean failed = true;
    try {
        int savedState = getState();//获取当前state状态
        if (release(savedState)) {//释放锁尝试
            failed = false;
            return savedState;//返回
        } else {
            throw new IllegalMonitorStateException();//抛出释放锁异常
        }
    } finally {
        if (failed)
            node.waitStatus = Node.CANCELLED;//如果失败将节点置为取消状态
    }
}

public final boolean release(int arg) {
    if (tryRelease(arg)) {//尝试释放锁,在CyclciBarrier中由于线程需要去阻塞,所以需要将锁释放,后续重新拿锁
        Node h = head;
        if (h != null && h.waitStatus != 0)//从头结点开始唤醒
            unparkSuccessor(h);
        return true;
    }
    return false;
}

isOnSyncQueue方法

final boolean isOnSyncQueue(Node node) {
    if (node.waitStatus == Node.CONDITION || node.prev == null)//如果当前节点是Condition或者node.pre节点为空,标识不在竞争队列中,返回faslse
        return false;
    if (node.next != null) // If has successor, it must be on queue  表示在竞争队列中
        return true;
    /*
         * node.prev can be non-null, but not yet on queue because
         * the CAS to place it on queue can fail. So we have to
         * traverse from tail to make sure it actually made it.  It
         * will always be near the tail in calls to this method, and
         * unless the CAS failed (which is unlikely), it will be
         * there, so we hardly ever traverse much.
         */
    return findNodeFromTail(node);//从竞争队列的尾结点开始找当前node,找到就返回true,否则为false
}

private boolean findNodeFromTail(Node node) {
    Node t = tail;//获取到尾结点
    for (;;) {
        if (t == node)
            return true;
        if (t == null)
            return false;
        t = t.prev;
    }
}

findNodeFromTail方法

private int checkInterruptWhileWaiting(Node node) {
    return Thread.interrupted() ?//判断当前是否中断过
        (transferAfterCancelledWait(node) ? THROW_IE : REINTERRUPT) ://如果移动到竞争队列中并入队成功,返回THROW_IE,否则返回REINTERRUPT
    0;//没有中断过直接返回0
}

//走到这里表示条件队列的条件满足,可以将节点移动到竞争队列中执行
final boolean transferAfterCancelledWait(Node node) {
    if (compareAndSetWaitStatus(node, Node.CONDITION, 0)) {//尝试将当前为Condition的节点置为0,并移动到竞争队列中
        enq(node);
        return true;
    }
    /*
         * If we lost out to a signal(), then we can't proceed
         * until it finishes its enq().  Cancelling during an
         * incomplete transfer is both rare and transient, so just
         * spin.
         */
    while (!isOnSyncQueue(node))//如果不在竞争队列中返回false
        Thread.yield();
    return false;
}

signalAll方法

public final void signalAll() {
    if (!isHeldExclusively())//是不是持有独占锁
        throw new IllegalMonitorStateException();
    Node first = firstWaiter;//获取等待队列的第一个节点
    if (first != null)//如果节点不为空
        doSignalAll(first);//唤醒所有线程
}

//从头指针一直遍历等待队列,将其移动到竞争队列中
private void doSignalAll(Node first) {
    lastWaiter = firstWaiter = null;
    do {
        Node next = first.nextWaiter;
        first.nextWaiter = null;
        transferForSignal(first);//
        first = next;
    } while (first != null);
}

transferForSignal方法

final boolean transferForSignal(Node node) {
    /*
     * If cannot change waitStatus, the node has been cancelled.
     */
    if (!compareAndSetWaitStatus(node, Node.CONDITION, 0))//cas自旋将其等待状态改为0
        return false;

    /*
     * Splice onto queue and try to set waitStatus of predecessor to
     * indicate that thread is (probably) waiting. If cancelled or
     * attempt to set waitStatus fails, wake up to resync (in which
     * case the waitStatus can be transiently and harmlessly wrong).
     */
    Node p = enq(node);//将其放入竞争队列
    int ws = p.waitStatus;//获取节点的等待状态
    if (ws > 0 || !compareAndSetWaitStatus(p, ws, Node.SIGNAL))//如果节点是取消状态或者cas将其置为signal失败,唤醒当前线程,让他自己处理,后续在竞争队列中会自动移除取消节点
        LockSupport.unpark(node.thread);
    return true;
}

总结:AQS提供了统一的模板,对于如何入队出队以及线程的唤醒都由AQS提供默认的实现,只需要子类实现自己上锁和解锁的逻辑。

3.Semaphore

import java.util.concurrent.Semaphore;

public class SemaphoreDemo {
    public static void main(String[] args) {
        //Semaphore s = new Semaphore(2);
        Semaphore s = new Semaphore(2, true);
        //允许一个线程同时执行
        //Semaphore s = new Semaphore(1);
        new Thread(() -> {
            try {
                s.acquire();
                System.out.println("T1 running...");
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                s.release();
            }
        }).start();

        new Thread(() -> {
            try {
                s.acquire();
                System.out.println("T2 running...");
                s.release();
            } catch (InterruptedException e) {
                e.printStackTrace();
            } finally {
                s.release();
            }
        }).start();
    }
}

Sync类

abstract static class Sync extends AbstractQueuedSynchronizer {
    private static final long serialVersionUID = 1192457210091910933L;

    Sync(int permits) {
        setState(permits);//设置信号量
    }

    final int getPermits() {
        return getState();//获得信号量
    }

    final int nonfairTryAcquireShared(int acquires) {//非公平锁的抢锁方式
        for (;;) {
            int available = getState();//获取state中的可用信号量
            int remaining = available - acquires;//减1
            if (remaining < 0 ||//信号量小于0,直接返回
                compareAndSetState(available, remaining))//尝试cas抢锁
                return remaining;//返回剩余的信号量
        }
    }

    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();//获取当前state
            int next = current + releases;//将state+1.也即信号量加1
            if (next < current) // overflow 非法条件判断,超过最大数量
                throw new Error("Maximum permit count exceeded");
            if (compareAndSetState(current, next))//cas尝试释放锁
                return true;//释放成功返回
        }
    }
	
    //减少信号量
    final void reducePermits(int reductions) {
        for (;;) {
            int current = getState();//获取当前state
            int next = current - reductions;
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            if (compareAndSetState(current, next))//cas尝试减少信号量
                return;
        }
    }
	
    //清空信号数量
    final int drainPermits() {
        for (;;) {
            int current = getState();//获取当前state状态
            if (current == 0 || compareAndSetState(current, 0))//当前信号为0 或者将state置为0也即将信号数量置为0
                return current;
        }
    }
}

FairSync与NonfairSync的类实现

//公平锁
static final class FairSync extends Sync {
    private static final long serialVersionUID = 2014338818796000944L;

    FairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        for (;;) {
            if (hasQueuedPredecessors())//队列中是否有线程在排队
                return -1;//获取失败
            int available = getState();//可用的信号量
            int remaining = available - acquires;//减去当前获取的数量
            if (remaining < 0 ||//可用的信号量小于0
                compareAndSetState(available, remaining))//cas设置state变量.
                return remaining;//返回可用的信号量
        }
    }
}

//非公平锁
static final class NonfairSync extends Sync {
    private static final long serialVersionUID = -2694183684443567898L;

    NonfairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);//详情请看父类的实现
    }
}

acquire方法

public void acquire() throws InterruptedException {
    sync.acquireSharedInterruptibly(1);//请查看父类实现,与acquireShared一致,不过加了一场处理
}

release方法:

public void release() {
    sync.releaseShared(1);
}

public final boolean releaseShared(int arg) {
    if (tryReleaseShared(arg)) {//Semaphore的类实现锁获取的方法。
        doReleaseShared();//与AQS中一致,不过多赘述
        return true;
    }
    return false;
}

​ 到了这里,其实AQS的源码基本已经覆盖了,对于AQS的源码也应该有了清楚的认知。总结就是:一个volatile 的state变量,两个等待队列(竞争队列,条件队列),通过cas的方式保证单变量的原子性。后续将会对Exchanger以及Phaser进行源码解析,到此基本AQS已经到了一个段落了。后续观看源码时,请注意多考虑一下多线程并发时可能出现的情况,去理解doug lea写代码的思路。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK