7

☕️【Java技术之旅】【AbstractQueuedSynchronizer】教你自定义实现自己的同步器 - Inf...

 3 years ago
source link: https://xie.infoq.cn/article/ea2f4b7208127dfb61f677a71
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

之前的文章中会涉及到了相关 AQS 的原理和相关源码的分析,所谓实践是检验真理的唯一标准!接下来就让我们活化一下 AQS 技术,主要针对于自己动手实现一个 AQS 同步器。

定义 MyLock 实现 Lock

Doug Lea 大神在 JDK1.5 编写了一个 Lock 接口,里面定义了实现一个锁的基本方法,我们只需编写一个 MyLock 类实现这个接口就好。

class MyLock implements Lock {    /**     * 加锁。如果不成功则进入等待队列     */    @Override    public void lock() {}    /**    * 加锁(可被interrupt)    */    @Override    public void lockInterruptibly() throws InterruptedException {}    /**     * 尝试加锁     */    @Override    public boolean tryLock() {}    /**     * 加锁 带超时的     */    @Override    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {}    /**    * 释放锁    */    @Override    public void unlock() {}    /**    * 返回一个条件变量(不在本案例谈论)    */    @Override    public Condition newCondition() {}}

定义好 MyLock 后,接下来就是实现各个方法的逻辑,达到真正的用于线程间 sync 互斥的需求。

自定义一个 MySync 继承自 AQS

接下来我们需要自定义一个继承自 AQS 的 MySync。实现自定义的 MySync 前,先了解 AQS 内部的一些基本概念。在 AQS 中主要的一些成员属性如下:

23c15489688308fb9c8e865137b725c0.png
  • state:用于标记资源状态,如果为 0 表示资源没有被占用,可以加锁成功。如果大于 0 表示资源已经被占用,然后根据自己的定义去实现是否允许对共享资源进行操作

  • 比如:ReentrantLock 的实现方式是当 state 大于 0,那么表示已经有线程获得锁了,我们都知道 ReentrantLock 是可重入的,其原理就是当有线程次进入同一个 lock 标记的临界区时。先判断这个线程是否是获得锁的那个线程,如果是,state 会+1,此时 state 会等于 2。

  • 当 unlock 时,会一层一层的减 1,直到 state 等于 0 则表示完全释放锁成功。

  • head、tail:用于存放获得锁失败的线程。在 AQS 中,每一个线程会被封装成一个 Node 节点,这些节点如果获得锁资源失败会链在 head、tail 中,成为一个双向链表结构。

  • exclusiveOwnerThread用于存放当前获得锁的线程,正如在 state 说明的那样。ReentrantLock 判断可重入的条件就是用这个 exclusiveOwnerThread 线程跟申请获得锁的线程做比较,如果是同一个线程,则 state+1,并重入加锁成功

知道这些概念后我们就可以自定义一个 AQS:

public final class MySync extends AbstractQueuedSynchronizer {    /**    * 尝试加锁    */    @Override    protected boolean tryAcquire(int arg) {        if (compareAndSetState(0, 1)) {            // 修改state状态成功后设置当前线程为占有锁资源线程            setExclusiveOwnerThread(Thread.currentThread());            return true;        }        return false;    }    /**    * 释放锁    */    @Override    protected boolean tryRelease(int arg) {        setExclusiveOwnerThread(null);        // state有volatile修饰,为了保证解锁后其他的一些变量对其他线程可见,把setExclusiveOwnerThread(null)放到上面 happens-before中定义的 volatile规则        setState(0);        return true;    }    /**    * 判断是否是独占锁    */    @Override    protected boolean isHeldExclusively() {        return getState() == 1;    }}

将 MySync 组合进 MyLock

最后一步就是将第一步中的所有方法逻辑完成

class MyLock implements Lock {    // 组合自定义sync器    private MySync sync = new MySync();    /**     * 加锁。如果不成功则进入等待队列     */    public void lock() {        sync.acquire(1);    }    /**    * 加锁(可被interrupt)    */    public void lockInterruptibly() throws InterruptedException {        sync.acquireInterruptibly(1);    }    /**     * 尝试加锁     */    public boolean tryLock() {        return sync.tryAcquire(1);    }    /**     * 加锁 带超时的     */    public boolean tryLock(long time, TimeUnit unit) throws InterruptedException {        return sync.tryAcquireNanos(1, unit.toMillis(time));    }    /**    * 释放锁    */    public void unlock() {        sync.release(0);    }    /**    * 返回一个条件变量(不在本案例谈论)    */    @Override    public Condition newCondition() {        return null;    }}

完成整个 MyLock 的逻辑后,发现在 lock()、unlock()中调用的自定义 sync 的方法 tryAcquire()和 tryRelease()方法。我们就以在 lock()方法中调用 acquire()方法说明模板设计模式在 AQS 中的应用。

点进.acquire()方法后,发现改该方法是来自 AbstractQueuedSynchronizer 中:

4aa6d2a4de57abc59c114214485bfbb3.png
  • 在这里面可以看到 tryAcquire 方法,继续点进去看看 tryAcquire(),发现该方法是一个必须被重写的方法,否则抛出一个运行时异常。

  • 模板方法设计模式在这里得以体现,再回到我们第二部中自定义的 MySync 中,就是重写了 AQS 中的 tryAcquire()方法。

c66b7c14a948aeaefdbfbc8e8ed1249a.png

因此整个自定义加锁的流程如下:

  • 调用 MyLock 的 lock(),lock()方法调用 AQS 的 acquire()方法

  • 在 acquire()方法中调用了 tryAcquire()方法进行加锁

  • 而 tryAcquire()方法在 AQS 中是一个必须让子类自定义重写的方法,否则会抛出一个异常

  • 因此调用 tryAcquire()时实际上是调用了我们自定义的 MySync 类中 tryAcquire()方法

AQS 作为 Java 并发体系下的关键类,在各种并发工具中都有它的身影,如 ReentrantLock、Semaphore 等。这些并发工具用于控制 sync 互斥的手段都是采用 AQS,外加 Cas 机制。AQS 采用了模板方法设计模式让子类们自定义 sync 互斥的条件,比如本案例中 MySync 类重写了 tryAcquire 方法。

下面实现一个自定义的 sync:

public class SelfSynchronizer {    private final Sync sync = new Sync();    public void lock() {        sync.acquire(1);    }    public boolean tryLock() {        return sync.tryAcquire(1);    }    public boolean unLock() {        return sync.release(1);    }    static class Sync extends AbstractQueuedSynchronizer {        //是否处于占用状态        @Override        protected boolean isHeldExclusively() {            return getState() == 1;        }        /**         * 获取sync资源         * @param acquires         * @return         */        @Override        public boolean tryAcquire(int acquires) {            if(compareAndSetState(0, 1)) {                setExclusiveOwnerThread(Thread.currentThread());                return true;            }            //这里没有考虑可重入锁            /*else if (Thread.currentThread() == getExclusiveOwnerThread()) {                int nextc = c + acquires;                if (nextc < 0) // overflow                    throw new Error("Maximum lock count exceeded");                setState(nextc);                return true;            }*/            return false;        }        /**         * 释放sync资源         * @param releases         * @return         */        @Override        protected boolean tryRelease(int releases) {            int c = getState() - releases;            boolean free = false;            if (c == 0) {                free = true;            }            setState(c);            return free;        }    }}

ReentrantLock 源码和上面自定义的 sync 很相似,测试下该 sync,i++在多线程下执行情况:

public class TestSelfSynchronizer {    private static int a = 0;    private static int b = 0;    private static SelfSynchronizer selfSynchronizer = new SelfSynchronizer();    private static ThreadPoolExecutor executor = new ThreadPoolExecutor(20, 50, 1, TimeUnit.SECONDS,            new LinkedBlockingQueue<Runnable>());    private static ExecutorService ec = Executors.newFixedThreadPool(20);    public static void main(String[] args) throws InterruptedException {        for (int i = 0; i < 20 ; i++) {            executor.submit(new Task());        }        for (int j = 0; j < 20 ; j++) {            ec.submit(new TaskSync());        }        Thread.sleep(10000);        System.out.println("a的值:"+ a);        System.out.println("b的值" + b);        executor.shutdown();        ec.shutdown();    }    static class Task implements Runnable {        @Override        public void run() {            for(int i=0;i<10000;i++) {                a++;            }        }    }    static class TaskSync implements Runnable {        @Override        public void run() {            for (int i = 0; i < 10000; i++) {              //使用sync器加锁                selfSynchronizer.lock();                b++;                selfSynchronizer.unLock();            }        }    }}

开启两个线程池,对 int 型变量自增 10000 次,如果不加 sync 器,最后值小于 200000,使用了自定义 sync 器则最后值正常等于 200000,这是因为每次自增操作加锁


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK