4

源码角度了解阻塞队列之SynchronousQueue

 2 years ago
source link: https://blog.51cto.com/u_15460453/5689089
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

源码角度了解阻塞队列之SynchronousQueue

推荐 原创

周杰伦本人 2022-09-19 14:56:39 ©著作权

文章标签 java 阻塞队列 sed 文章分类 Java 编程语言 yyds干货盘点 阅读数341

源码角度了解阻塞队列之SynchronousQueue

SynchronousQueue是一个同步队列,它没有任何的容量,插入操作都必须等待另一个线程的相应删除操作

从它的构造方法中我们可以看到,可以指定是否为公平的队列,如果是公平的使用队列,如果不是公平的,使用栈来存储

put()方法

public void put(E e) throws InterruptedException {
    if (e == null) throw new NullPointerException();
    if (transferer.transfer(e, false, 0) == null) {
        Thread.interrupted();
        throw new InterruptedException();
    }
}

我们可以看到它的put()方法没有什么逻辑,主要调用Transfererd的transfer()方法来放入元素

transfer()方法

Transfererd是一个抽象类,它的实现类有TransferQueue和TransferStack,分别是公平队列的实现和非公平队列的实现

我们先看一下TransferQueue的transfer()方法的实现逻辑

TransferQueue的transfer()方法

E transfer(E e, boolean timed, long nanos) {
    QNode s = null; // constructed/reused as needed
    boolean isData = (e != null);

    for (;;) {
        QNode t = tail;
        QNode h = head;
        if (t == null || h == null)         // saw uninitialized value
            continue;                       // spin

        if (h == t || t.isData == isData) { // empty or same-mode
            QNode tn = t.next;
            if (t != tail)                  // inconsistent read
                continue;
            if (tn != null) {               // lagging tail
                advanceTail(t, tn);
                continue;
            }
            if (timed && nanos <= 0)        // can't wait
                return null;
            if (s == null)
                s = new QNode(e, isData);
            if (!t.casNext(null, s))        // failed to link in
                continue;

            advanceTail(t, s);              // swing tail and wait
            Object x = awaitFulfill(s, e, timed, nanos);
            if (x == s) {                   // wait was cancelled
                clean(t, s);
                return null;
            }

            if (!s.isOffList()) {           // not already unlinked
                advanceHead(t, s);          // unlink if head
                if (x != null)              // and forget fields
                    s.item = s;
                s.waiter = null;
            }
            return (x != null) ? (E)x : e;

        } else {                            // complementary-mode
            QNode m = h.next;               // node to fulfill
            if (t != tail || m == null || h != head)
                continue;                   // inconsistent read

            Object x = m.item;
            if (isData == (x != null) ||    // m already fulfilled
                x == m ||                   // m cancelled
                !m.casItem(x, e)) {         // lost CAS
                advanceHead(h, m);          // dequeue and retry
                continue;
            }

            advanceHead(h, m);              // successfully fulfilled
            LockSupport.unpark(m.waiter);
            return (x != null) ? (E)x : e;
        }
    }
}

代码有点长,我们分析一下:

  1. isData代表是生产者模式还是消费者模式,如果传入的元素e是空的话显然是消费者模式
  2. for循环,初始化QNode的头结点和尾结点,如果为空的话说明没有初始化好,继续for循环进行初始化
  3. 如果头尾节点相同或者是尾部节点模式和传入的模式相同的话,添加到尾部
  4. 新建节点,添加到尾部,然后调用awaitFulfill()方法进入阻塞状态
  5. 当线程唤醒发现在等待队列中第一个的时候,返回元素
  6. 如果模式不相同的话,比如来的是消费者,而等待队列全是生产者,这时候与队列中的第一个元素进行配的,配对成功出队列,同时调用LockSupport.unpark(m.waiter)来唤醒生产者

TransferStack的transfer()方法

  1. 同样判断是否为同种模式,如果是同种模式,入栈 阻塞
  2. 如果不是同种模式,进行配对一起出栈

take()方法

SynchronousQueue的take()方法也是调用transferer的transfer()方法来获取元素,然后返回元素

    public E take() throws InterruptedException {
        E e = transferer.transfer(null, false, 0);
        if (e != null)
            return e;
        Thread.interrupted();
        throw new InterruptedException();
    }

SynchronousQueue有公平队列和非公平队列两种实现方式,公平队列采用队列实现先进先出,非公平队列采用栈实现,先进后出,它的最大特点就是生产者来了需要阻塞,当对应的消费者来之后匹配成功才会被唤醒,这个SynchronousQueue产生阻塞,效率不高,一般不怎么使用

至此,我们的阻塞队列的实现基本介绍了,有ArrayBlockingQueue、LinkedBlockingQueue、PriorityQueue、DelayQueue和SynchronousQueue

❤️ 感谢大家

如果你觉得这篇内容对你挺有有帮助的话:

  1. 欢迎关注我❤️,点赞👍🏻,评论🤤,转发🙏
  2. 关注盼盼小课堂,定期为你推送好文,还有群聊不定期抽奖活动,可以畅所欲言,与大神们一起交流,一起学习。
  • 收藏
  • 评论
  • 分享
  • 举报

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK