31

深入浅出Semaphore源码解析

 3 years ago
source link: http://www.cnblogs.com/pinxiong/p/13332609.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.

Semaphore 通过permits的值来限制线程访问临界资源的总数,属于有限制次数的共享锁,不支持重入。

前提条件

在理解 Semaphore 时需要具备一些基本的知识:

理解AQS的实现原理

之前有写过一篇 《深入浅出AQS源码解析》 关于AQS的文章,对AQS原理不了解的同学可以先看一下

Semaphore源码解析

Semaphore 中有3个内部类,分别是 SyncNonfairSyncFairSyncSyncNonfairSyncFairSync 的抽象类,我们会从解读 Semaphore 实现的功能开始入手逐渐去解析 SyncNonfairSyncFairSync 的源码

public class Semaphore implements java.io.Serializable {

    private final Sync sync;

    /**
     * 初始化permits个资源
     */
    public Semaphore(int permits) {
        sync = new NonfairSync(permits);
    }

    /**
     * 初始化permits个资源,根据fair来决定是使用公平锁还是非公平锁的方式
     */
    public Semaphore(int permits, boolean fair) {
        sync = fair ? new FairSync(permits) : new NonfairSync(permits);
    }

    /**
     * 中断方式获取一个资源
     */
    public void acquire() throws InterruptedException {
        sync.acquireSharedInterruptibly(1);
    }

    /**
     * 非中断方式获取一个资源
     */
    public void acquireUninterruptibly() {
        sync.acquireShared(1);
    }

    /**
     * 尝试获取一个资源
     */
    public boolean tryAcquire() {
        return sync.nonfairTryAcquireShared(1) >= 0;
    }

    /**
     * 尝试超时获取一个资源
     */
    public boolean tryAcquire(long timeout, TimeUnit unit)
        throws InterruptedException {
        return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
    }

    /**
     * 释放一个资源
     */
    public void release() {
        sync.releaseShared(1);
    }

    /**
     * 中断方式获取permits个资源
     */
    public void acquire(int permits) throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireSharedInterruptibly(permits);
    }

    /**
     * 非中断方式获取permits个资源
     */
    public void acquireUninterruptibly(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.acquireShared(permits);
    }

    /**
     * 尝试获取permits个资源
     */
    public boolean tryAcquire(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.nonfairTryAcquireShared(permits) >= 0;
    }

    /**
     * 尝试超时获取permits个资源
     */
    public boolean tryAcquire(int permits, long timeout, TimeUnit unit)
        throws InterruptedException {
        if (permits < 0) throw new IllegalArgumentException();
        return sync.tryAcquireSharedNanos(permits, unit.toNanos(timeout));
    }

    /**
     * 释放permits个资源
     */
    public void release(int permits) {
        if (permits < 0) throw new IllegalArgumentException();
        sync.releaseShared(permits);
    }

    /**
     * 获取当前可用资源数量
     */
    public int availablePermits() {
        return sync.getPermits();
    }

    /**
     * 用掉所有的资源,并返回用掉的资源数量
     */
    public int drainPermits() {
        return sync.drainPermits();
    }

    /**
     * 缩减reduction个资源
     */
    protected void reducePermits(int reduction) {
        if (reduction < 0) throw new IllegalArgumentException();
        sync.reducePermits(reduction);
    }
}

虽然 Semaphore 中的方法比较多,但是都比较简单,都是转调用 Sync 中的方法,通过解析 Sync 中的源码来帮助大家理解这些方法是如何实现的

Sync类源码解析

abstract static class Sync extends AbstractQueuedSynchronizer {
    // 获取所有可用的资源数量
    final int getPermits() {
        return getState();
    }
    // 非公平的方式尝试获取acquires个可用的资源
    final int nonfairTryAcquireShared(int acquires) {
        // 无限循环,尝试获取acquires个资源
        // 如果资源数量不够,返回剩余资源数量
        // 如果资源数量足够且获取成功,返回剩余的资源数量
        for (;;) {
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }

    // 尝试获取releases个资源
    protected final boolean tryReleaseShared(int releases) {
        for (;;) {
            int current = getState();
            int next = current + releases;
            // 当releases不允许为负数
            if (next < current) // overflow
                throw new Error("Maximum permit count exceeded");
            // CAS操作尝试修改state的值
            if (compareAndSetState(current, next))
                return true;
        }
    }

    // 缩减releases个资源
    final void reducePermits(int reductions) {
        for (;;) {
            int current = getState();
            int next = current - reductions;
            // 当releases不允许为负数,也就时不能通过该方法增加资源
            if (next > current) // underflow
                throw new Error("Permit count underflow");
            // CAS操作尝试修改state的值
            if (compareAndSetState(current, next))
                return;
        }
    }

    // 清空所有的资源数量
    final int drainPermits() {
        for (;;) {
            int current = getState();
            // CAS操作尝试将资源数量设置为0
            if (current == 0 || compareAndSetState(current, 0))
                return current;
        }
    }
}

FairSync类源码解析

FairSync 中的源码很简单,直接上代码

static final class FairSync extends Sync {

    FairSync(int permits) {
        super(permits);
    }

    protected int tryAcquireShared(int acquires) {
        /**
         * 具体思路如下:
         * 1、如果AQS的同步队列中有等待的线程,直接获取失败,会加入到AQS的同步队列中
         * 2、如果AQS的同步队列为空,尝试修改state的值来获取acquires个资源
         * 3、一直重复步骤1和2,直到有结果返回才退出无限循环
         */
        for (;;) {
            if (hasQueuedPredecessors())
                return -1;
            int available = getState();
            int remaining = available - acquires;
            if (remaining < 0 ||
                compareAndSetState(available, remaining))
                return remaining;
        }
    }
}

NonfairSync类源码解析

NonfairSync 中的源码就更简单,解析如下:

static final class NonfairSync extends Sync {

    NonfairSync(int permits) {
        super(permits);
    }
    
    // 抢占式的获取acquires个资源
    protected int tryAcquireShared(int acquires) {
        return nonfairTryAcquireShared(acquires);
    }
}

总结

  • permits 初始化为 1 时, Semaphore 变成了一个互斥的排他锁
  • permits 初始化为无穷大时, Semaphore 变成了无锁模式
  • state 的值为 0 的时候,无法获取资源,获取资源的线程会进入AQS的同步队列等待有资源释放时被唤醒
  • Semaphore 初始化成 非公平锁 时,可能会出现有的线程饿死的情况,一般对于控制资源的使用而言,建议初始化为 公平锁
  • 可以调用 reducePermits 动态的缩减资源的数量,但是不能增加资源的数量

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK