5

golang disruptor lockfree 高性能无锁队列的设计实现原理

 1 year ago
source link: https://xiaorui.cc/archives/7380
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

golang disruptor lockfree 高性能无锁队列的设计实现原理 – 峰云就她了

专注于Golang、Kubernetes、Nosql、Istio

golang disruptor lockfree 高性能无锁队列的设计实现原理

腾讯的老哥在社区中开源了 golang lockfree 的库,本人看到该库的设计很有意思,就参与该项目的开发改进,已经是该项目的 contributor 了. 该库使用 golang 开发,相比社区中其他 golang lockfree 来说,api 更友好,性能更好,其提供了各种的休眠阻塞策略,避免过度的 spin 引发 cpu 开销升高。

项目地址: https://github.com/bruceshao/lockfree

202304181309777.png

lockfree queue 的基本原理还是好理解的,设计上借鉴参考了无锁队列的标杆 java disruptor,另外在代码中很多性能的优化点也是参照了 java disruptor。

lockfree queue 无锁的设计大概流程是这样,首先需要一个原子递增的发号器。生产者写数据时,先拿到一个 seq 序号,通过位运算找到 ringbuffer 的位置,如何 ringbuffer 还有空余空间,只需写到 ringbuffer 对应位置即可,如果空间已满,则需要等待。而读取数据只需判断对应结构的 seq 跟 consumer seq 是否一致即可。

什么场景下需要使用 lockfree 无锁队列?

就 go lockfree 和 disruptor 的设计来说,首先对性能很敏感,另外 consumer 为计算密集型或非阻塞操作,比如你使用 disruptor 队列做通知队列,consumer 收到事件后,进行 http 请求操作,这显然是无法体现 lockfree 队列的性能。反而因为 consumer 消费过慢,引发 producer 和 consumer 的 spin 自旋开销。

我的理解,性能敏感且 consumer 处理的足够快的场景可以使用 lockfree 无锁队列。

lockfree vs channel 的性能表现

如下所述.

  1. 在goroutine数量比较小时,lockfree 和 channel 性能差别不明显;
  2. 当goroutine打到一定数量(大于1000)后,lockfree无论从时间还是 QR 都远远超过chan;

一句话,大多数场景不需要 lockfree 无锁队列,除非追求机制的性能体验。平时用 golang channel 足矣了,如果 golang channel 出现并发的性能瓶颈,其实也可以变通下,切分多个 channel 来分担 mutex 锁竞争冲突,以提高 channel 的读写并行吞吐。

lockfree 如何使用 ?

该库的使用很简单。先实例化 lockfree 队列对象,然后就可并发写,读则是注册一个回调方法,lockfree consumer 消费数后会回调注册的方法。

package main

import (
    "fmt"
    "sync"
    "sync/atomic"
    "time"

    "github.com/bruceshao/lockfree"
)

var (
    goSize    = 10000
    sizePerGo = 10000

    total = goSize * sizePerGo
)

func main() {
    // lockfree计时
    now := time.Now()

    // 创建事件处理器
    handler := &eventHandler[uint64]{
        signal: make(chan struct{}, 0),
        now:    now,
    }

    // 创建消费端串行处理的Lockfree
    lf := lockfree.NewLockfree[uint64](
        1024*1024,
        handler,
        lockfree.NewSleepBlockStrategy(time.Millisecond),
    )

    // 启动Lockfree
    if err := lf.Start(); err != nil {
        panic(err)
    }

    // 获取生产者对象
    producer := lf.Producer()

    // 并发写入
    var wg sync.WaitGroup
    wg.Add(goSize)
    for i := 0; i < goSize; i++ {
        go func(start int) {
            for j := 0; j < sizePerGo; j++ {
                err := producer.Write(uint64(start*sizePerGo + j + 1))
                if err != nil {
                    panic(err)
                }
            }
            wg.Done()
        }(i)
    }

    // wait for producer
    wg.Wait()

    fmt.Printf("producer has been writed, write count: %v, time cost: %v \n", total, time.Since(now).String())

    // wait for consumer
    handler.wait()

    // 关闭Lockfree
    lf.Close()
}

type eventHandler[T uint64] struct {
    signal   chan struct{}
    gcounter uint64
    now      time.Time
}

func (h *eventHandler[T]) OnEvent(v uint64) {
    cur := atomic.AddUint64(&h.gcounter, 1)
    if cur == uint64(total) {
        fmt.Printf("eventHandler has been consumed already, read count: %v, time cose: %v\n", total, time.Since(h.now))
        close(h.signal)
        return
    }

    if cur%10000000 == 0 {
        fmt.Printf("eventHandler consume %v\n", cur)
    }
}

func (h *eventHandler[T]) wait() {
    <-h.signal
}

go lockfree 源码分析

实例化 lockfree 对象

type Lockfree[T any] struct {
    writer   *Producer[T]
    consumer *consumer[T]
    status   int32
}

func NewLockfree[T any](capacity int, handler EventHandler[T], blocks blockStrategy) *Lockfree[T] {
    // 重新计算正确的容量
    capacity = minSuitableCap(capacity)

    // 发号器
    seqer := newSequencer(capacity)

    // 实例化 ringbuffer,队列是环形数组
    rbuf := newRingBuffer[T](capacity)

    // 实例化定义消费者
    cmer := newConsumer[T](rbuf, handler, seqer, blocks)

    // 实例化生产者
    writer := newProducer[T](seqer, rbuf, blocks)
    return &Lockfree[T]{
        writer:   writer,
        consumer: cmer,
        status:   READY,
    }
}

启动 lockfree

Start() 用来启动 lockfree,对于 consumer 来说会启动一个消费协程消费 ringbuffer 队列中的数据,而对于 producer 来说,只是把 status 只为 running 即可。

func (d *Lockfree[T]) Start() error {
    // 保证并发安全
    if atomic.CompareAndSwapInt32(&d.status, READY, RUNNING) {
        // 启动消费者
        if err := d.consumer.start(); err != nil {
            // 启动失败,重置 status
            atomic.CompareAndSwapInt32(&d.status, RUNNING, READY)
            return err
        }

        // 启动生产者
        if err := d.writer.start(); err != nil {
            // 恢复现场
            atomic.CompareAndSwapInt32(&d.status, RUNNING, READY)
            return err
        }
        return nil
    }
    return fmt.Errorf(StartErrorFormat, "Disruptor")
}

lockfree queue 通常都需要一个原子发号器的,其实就是递增的 uint64。lockfree 库里为了避免并发下带来的伪共享问题,使用填充的方式让 uint64 独占缓存行 cache line。

type sequencer struct {
    wc       *cursor
    rc       uint64 // 读取游标,因为该值仅会被一个g修改,所以不需要使用cursor
    capacity uint64
}

type cursor struct {
    // cache line 填充,这里适配了 64 和 128 字节的缓存行。
    p1, p2, p3, p4, p5, p6, p7       uint64
    v                                uint64
    p9, p10, p11, p12, p13, p14, p15 uint64
}

func newSequencer(capacity int) *sequencer {
    return &sequencer{
        wc:       newCursor(),
        rc:       1,
        capacity: uint64(capacity),
    }
}

// nextRead 获取下个要读取的位置
// 使用原子操作解决data race问题
func (s *sequencer) nextRead() uint64 {
    return atomic.LoadUint64(&s.rc)
}

func (s *sequencer) readIncrement() uint64 {
    return atomic.AddUint64(&s.rc, 1)
}

初始化 producer 对象

type Producer[T any] struct {
    seqer    *sequencer
    rbuf     *ringBuffer[T]
    blocks   blockStrategy
    capacity uint64
    status   int32
}

func newProducer[T any](seqer *sequencer, rbuf *ringBuffer[T], blocks blockStrategy) *Producer[T] {
    return &Producer[T]{
        seqer:    seqer,
        rbuf:     rbuf,
        blocks:   blocks,
        capacity: rbuf.cap(),
        status:   READY,
    }
}

func (q *Producer[T]) start() error {
    if atomic.CompareAndSwapInt32(&q.status, READY, RUNNING) {
        return nil
    }
    return fmt.Errorf(StartErrorFormat, "Producer")
}

先获取写的 seq 序号,判断 ringbuffer 是否可写,如可写,直接写入到对应的 ringbuffer 位置即可,不可写则需要自旋判断,直到可写为止。

func (q *Producer[T]) Write(v T) error {
    if q.closed() {
        return ClosedError
    }

    // 获取写的位置
    next := q.seqer.wc.increment()

    for {
        r := atomic.LoadUint64(&q.seqer.rc) - 1

        // 判断 ringbuffer 是否已满,consumer 的 seq + ringbuffer 长度为判断条件.
        if next <= r+q.capacity {
            // 可以写入数据,将数据写入到指定位置
            q.rbuf.write(next-1, v)
            // 释放,防止消费端阻塞
            q.blocks.release()
            return nil
        }

        // 队列已满,则暂时让出 cpu 资源,等待 runtime 下次调度,如果 consumer 消费不及时,那么这里会引发自旋问题。
        runtime.Gosched()

        // 再次判断是否已关闭
        if q.closed() {
            return ClosedError
        }
    }
}

如果消费者处理速度慢一些,那么队列会无空闲位置,也就是已满。那么生产者无法 write,内部会触发 runtime.Gosched 退让资源和切出调度,如果一段时间内未消费,生产者必然会因为这个 spin 操作引发 cpu 开销飙高。

所以,上面一直有说 lockfree / disruptor 的使用场景,消费者要足够的快。

runtime.Gosched 是暂停当前的 G,然后把该 G 放到队列尾部,重新切 g0 选择新的 G 进行调度。这个设计很好,社区中一些开源库有使用 atomic cas 实现的锁,当无法 cas 时,会执行 gosched 切换调度。

但就 lockfree 场景来说,如果 consumer 消费的不够快,那么 producer write 失败后会一直频繁的 gosched。这时候 runq 只有 producer,runtime goready 调度起来后又无法写,这个操作一直循环下来会使 cpu 飙高满载。

所以我这边又做了进一步的调整,写入时可以选择是否 sleep 休眠一下。经过压测得出 sleep 开销如下。

  • 10us 的空闲时间,cpu开销在 3% 左右;
  • 5us 的空间实现,cpu 开销在 10% 左右。
  • < 5us 的空间实现,cpu 开销接近 100%。
// WriteSleep 当 ringbuffer 已满时,先 cpu pause 则 sched 让出资源,最后进行 sleep 休眠,以此避免频繁 spin 引发的 cpu 开销。
func (q *Producer[T]) WriteSleep(v T, dur time.Duration) error {
    return q.write(v, dur)
}

func (q *Producer[T]) write(v T, dur time.Duration) error {
    if q.closed() {
        return ClosedError
    }

    next := q.seqer.wc.increment()
    var i = 0
    for {
        // 判断是否可以写入
        r := atomic.LoadUint64(&q.seqer.rc) - 1
        if next <= r+q.capacity {
            // 可以写入数据,将数据写入到指定位置
            q.rbuf.write(next-1, v)
            // 释放,防止消费端阻塞
            q.blocks.release()
            return nil
        }

        if i < spin {
            procyield(30)
        } else if i < spin+passiveSpin {
            runtime.Gosched()
        } else {
            time.Sleep(dur)
            i = 0
        }
        i++

        // 再次判断是否已关闭
        if q.closed() {
            return ClosedError
        }
    }
}

😁 关于 lockfree write 写失败 spin 引发的 cpu 过高问题,已经给作者提价 pull request (pr),但作者还没给合并,说要考虑用户的选择再考虑下。我表示不理解。

https://github.com/bruceshao/lockfree/pull/15

consumer 消费者

初始化 consumer

type consumer[T any] struct {
    status int32 // 运行状态
    rbuf   *ringBuffer[T]
    seqer  *sequencer
    blocks blockStrategy
    hdl    EventHandler[T]
}

func newConsumer[T any](rbuf *ringBuffer[T], hdl EventHandler[T], sequer *sequencer, blocks blockStrategy) *consumer[T] {
    return &consumer[T]{
        rbuf:   rbuf,
        seqer:  sequer,
        hdl:    hdl,
        blocks: blocks,
        status: READY,
    }
}

func (c *consumer[T]) start() error {
    // 保证单例
    if atomic.CompareAndSwapInt32(&c.status, READY, RUNNING) {
        go c.handle()
        return nil
    }
    return fmt.Errorf(StartErrorFormat, "Consumer")
}

consumer 消费数据

func (c *consumer[T]) handle() {
    // 获取 read 的 位置
    rc := c.seqer.nextRead()
    for {
        if c.closed() {
            return
        }
        var i = 0
        for {
            if c.closed() {
                return
            }

            // 位置的对象是否可读,可读获取对象的数据.
            if v, exist := c.rbuf.contains(rc - 1); exist {
                // 既然可读了,则递增 read 位置,下次轮询新位置.
                rc = c.seqer.readIncrement()

                // 调用注册的回调方法,把数据传递过去.
                c.hdl.OnEvent(v)
                i = 0
                break
            }

            // 当无数据可读时, 可按照策略进行阻塞等待.
            // 当轮询小于 4 次时, 每次执行 30 次 cpu pause 
            if i < spin {
                procyield(30)
            } else if i < spin+passiveSpin {
                // 空闲阻塞策略,当轮询在 4-6 次时, 每次执行 runtime sched 让出调度资源
                runtime.Gosched()
            } else {
                // 自定义阻塞策略
                c.blocks.block()

                // 重置为 0
                i = 0
            }
            i++
        }
    }
}

判断 ringbuffer 某个 seq 是否可读 ?

lockfree 开始的设计使用 bitmap 判断 ringbuffer 各个位置的数据情况,后来改成在每个 entry 对象里存放了 producer 写入的 seq。这样判断位置是否可读只需判断 entry.seq 是否一致即可。

func (r *ringBuffer[T]) contains(c uint64) (T, bool) {
    x := &r.buf[c&r.capMask]
    if atomic.LoadUint64(&x.c) == c+1 {
        v := x.val
        return v, true
    }
    return r.tDefault, false
}

lockfree queue 的设计还是很简单,首先需要一个原子递增的发号器,生产者并发写下,先拿到一个 seq 序号,然后 ringbuffer 不满下,只需写到 ringbuffer 对应位置即可。而读取数据只需判断对应结构的 seq 跟 consumer seq 是否一致即可。

至于 lockfree 的性能表现,如下所述.

  1. 在goroutine数量比较小时,lockfree和chan性能差别不明显;
  2. 当goroutine打到一定数量(大于1000)后,lockfree无论从时间还是QR都远远超过chan;

一句话,大多数场景不需要 lockfree 无锁队列,除非追求机制的性能体验。平时用 golang channel 足矣了,如果 golang channel 出现并发的性能瓶颈,其实也可以变通下,切分多个 channel 来分担 mutex 锁竞争冲突,以提高 channel 的读写并行吞吐。


大家觉得文章对你有些作用! 如果想赏钱,可以用微信扫描下面的二维码,感谢!
另外再次标注博客原地址  xiaorui.cc
weixin_new.jpg

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK