27

JUC 提供的限流利器-Semaphore(信号量)

 4 years ago
source link: http://mp.weixin.qq.com/s?__biz=MzIyNTM4ODI0OA%3D%3D&%3Bmid=2247484377&%3Bidx=1&%3Bsn=73dc15a285660dd62444212582a5b3e6
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

在 JUC 包下,有一个 Semaphore 类,翻译成信号量,Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。Semaphore 跟锁(synchronized、Lock)有点相似,不同的地方是,锁同一时刻只允许一个线程访问某一资源,而 Semaphore 则可以控制同一时刻多个线程访问某一资源。

Semaphore(信号量)并不是 Java 语言特有的,几乎所有的并发语言都有。所以也就存在一个 「信号量模型」 的概念,如下图所示:

JzuA7bF.png!web 信号量模型

信号量模型比较简单,可以概括为: 「一个计数器、一个队列、三个方法」

计数器:记录当前还可以运行多少个资源访问资源。

队列:待访问资源的线程

「三个方法」:

  • 「init()」:初始化计数器的值,可就是允许多少线程同时访问资源。

  • 「up()」:计数器加1,有线程归还资源时,如果计数器的值大于或者等于 0 时,从等待队列中唤醒一个线程

  • 「down()」:计数器减 1,有线程占用资源时,如果此时计数器的值小于 0 ,线程将被阻塞。

这三个方法都是原子性的,由实现方保证原子性。例如在 Java 语言中,JUC 包下的 Semaphore 实现了信号量模型,所以 Semaphore 保证了这三个方法的原子性。

Semaphore 是基于 AbstractQueuedSynchronizer 接口实现信号量模型的。AbstractQueuedSynchronizer 提供了一个基于 FIFO 队列,可以用于构建锁或者其他相关同步装置的基础框架,利用了一个 int 来表示状态,通过类似 acquire 和 release 的方式来操纵状态。关于 AbstractQueuedSynchronizer  更多的介绍,可以点击链接:

http://ifeve.com/introduce-abstractqueuedsynchronizer/

AbstractQueuedSynchronizer 在 Semaphore 类中的实现类如下:

abstract static class Sync extends AbstractQueuedSynchronizer {
        private static final long serialVersionUID = 1192457210091910933L;

        Sync(int permits) {
            setState(permits);
        }

        final int getPermits() {
            return getState();
        }

        final int nonfairTryAcquireShared(int acquires) {
            for (;;) {
                int available = getState();
                int remaining = available - acquires;
                if (remaining < 0 ||
                    compareAndSetState(available, remaining))
                    return remaining;
            }
        }

        protected final boolean tryReleaseShared(int releases) {
            for (;;) {
                int current = getState();
                int next = current + releases;
                if (next < current) // overflow
                    throw new Error("Maximum permit count exceeded");
                if (compareAndSetState(current, next))
                    return true;
            }
        }

        final void reducePermits(int reductions) {
            for (;;) {
                int current = getState();
                int next = current - reductions;
                if (next > current) // underflow
                    throw new Error("Permit count underflow");
                if (compareAndSetState(current, next))
                    return;
            }
        }

        final int drainPermits() {
            for (;;) {
                int current = getState();
                if (current == 0 || compareAndSetState(current, 0))
                    return current;
            }
        }
    }

在 Semaphore 类中,实现了两种信号量: 「公平的信号量和非公平的信号量」 ,公平的信号量就是大家排好队,先到先进,非公平的信号量就是不一定先到先进,允许插队。非公平的信号量效率会高一些,所以默认使用的是非公平信号量。具体的可以查看 Semaphore 类实现源码。

Semaphore 类中,主要有以下方法:

// 构造方法,参数表示许可证数量,用来创建信号量
public Semaphore(int permits);
// 从信号量中获取许可,相当于获取到执行权
public void acquire() throws InterruptedException;
// 尝试获取1个许可,不管是否能够获取成功,都立即返回,true表示获取成功,false表示获取失败
public boolean tryAcquire();
// 将许可还给信号量
public void release();

Semaphore 类的实现就了解的差不多了。可能你会有疑问 Semaphore 的应用场景是什么?Semaphore 可以用来限流(流量控制),在一些公共资源有限的场景下,Semaphore 可以派上用场。比如在做日志清洗时,可能有几十个线程在并发清洗,但是将清洗的数据存入到数据库时,可能只给数据库分配了 10 个连接池,这样两边的线程数就不对等了,我们必须保证同时只能有 10 个线程获取数据库链接,否则就会存在大量线程无法链接上数据库。

用 Semaphore 信号量来模拟这操作,代码如下:

public class SemaphoreDemo {
    /**
     * semaphore 信号量,可以限流
     *
     * 模拟并发数据库操作,同时有三十个请求,但是系统每秒只能处理 5 个
     */

    private static final int THREAD_COUNT = 30;

    private static ExecutorService threadPool = Executors
            .newFixedThreadPool(THREAD_COUNT);
	// 初始化信号量,个数为 5
    private static Semaphore s = new Semaphore(5);

    public static void main(String[] args) {
        for (int i = 0; i < THREAD_COUNT; i++) {
            threadPool.execute(new Runnable() {
                @Override
                public void run() {
                    try {
                        // 获取许可
                        s.acquire();
                        System.out.println(Thread.currentThread().getName()+" 完成数据库操作 ,"+ new SimpleDateFormat("yyyy-MM-dd hh:mm:ss").format( new Date()));
                        // 休眠两秒钟,效果更直观
                        Thread.sleep(2000);
                        // 释放许可
                        s.release();
                    } catch (InterruptedException e) {
                    }
                }
            });
        }
		// 关闭连接池
        threadPool.shutdown();
    }
}

运行效果如下:

3UBZ3iz.png!web 图片描述

从结果中,可以看出,每秒只有 5 个线程在执行,这符合我们的预期。

好了,关于 Semaphore 的内容就结束了,更加详细的还请您查阅相关资料和阅读 Semaphore 源码。希望这篇文章对您的学习或者工作有所帮助。

感谢您的阅读,祝好。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK