30

Go cond 源码学习

 5 years ago
source link: https://www.tuicool.com/articles/jEJZZz6
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

概述

cond是go语言sync提供的条件变量,通过cond可以让一系列的goroutine在触发某个条件时才被唤醒。每一个cond结构体都包含一个锁L。cond提供了三个方法:

  • Signal:调用Signal之后可以唤醒单个goroutine。
  • Broadcast:唤醒等待队列中所有的goroutine。
  • Wait:会把当前goroutine放入到队列中等待获取通知,调用此方法必须先Lock,不然方法里会调用Unlock()报错。

简单使用

创建40个goroutine都wait阻塞住。调用Signal则唤醒第一个goroutine。调用Broadcast则唤醒所有等待的goroutine。

package main

import (
    "fmt"
    "sync"
    "time"
)

var locker = new(sync.Mutex)
var cond = sync.NewCond(locker)

func test(x int) {
    cond.L.Lock() //获取锁
    cond.Wait()   //等待通知  暂时阻塞
    fmt.Println(x)
    time.Sleep(time.Second * 1)
    cond.L.Unlock() //释放锁
}
func main() {
    for i := 0; i < 40; i++ {
        go test(i)
    }
    fmt.Println("start all")
    time.Sleep(time.Second * 3)
    fmt.Println("broadcast")
    cond.Signal() // 下发一个通知给已经获取锁的goroutine
    time.Sleep(time.Second * 3)
    cond.Signal() // 3秒之后 下发一个通知给已经获取锁的goroutine
    time.Sleep(time.Second * 3)
    cond.Broadcast() //3秒之后 下发广播给所有等待的goroutine
    time.Sleep(time.Second * 60)
}

源码分析

Cond

type Cond struct {
    noCopy noCopy

    // 锁的具体实现,通常为 mutex 或者rwmutex
    L Locker
    // notifyList对象,维护等待唤醒的goroutine队列,使用链表实现
    notify  notifyList
    checker copyChecker
}

// 新建cond初始化cond对象
func NewCond(l Locker) *Cond {
    return &Cond{L: l}
}

type notifyList struct {
    // 等待数量
    wait uint32

    // 通知数量
    notify uint32

    // 锁对象
    lock mutex
    // 链表头
    head *sudog
    // 链表尾
    tail *sudog
}

Wait

// 等待函数
func (c *Cond) Wait() {
    c.checker.check()
    // 等待计数器加1 看下面具体实现
    t := runtime_notifyListAdd(&c.notify)
    c.L.Unlock()
    // 
    runtime_notifyListWait(&c.notify, t)
    c.L.Lock()
}

// 此函数在sema.go中控制计数器加1
func notifyListAdd(l *notifyList) uint32 {
    // This may be called concurrently, for example, when called from
    // sync.Cond.Wait while holding a RWMutex in read mode.
    return atomic.Xadd(&l.wait, 1) - 1
}

// 此函数在sema.go中
// 获取当前goroutine 添加到链表末端,然后goparkunlock函数休眠阻塞当前goroutine
// goparkunlock函数会让出当前处理器的使用权并等待调度器的唤醒
func notifyListWait(l *notifyList, t uint32) {
    lock(&l.lock)

    // Return right away if this ticket has already been notified.
    if less(t, l.notify) {
        unlock(&l.lock)
        return
    }

    // Enqueue itself.
    s := acquireSudog()
    s.g = getg()
    s.ticket = t
    s.releasetime = 0
    t0 := int64(0)
    if blockprofilerate > 0 {
        t0 = cputicks()
        s.releasetime = -1
    }
    if l.tail == nil {
        l.head = s
    } else {
        l.tail.next = s
    }
    l.tail = s
    goparkunlock(&l.lock, "semacquire", traceEvGoBlockCond, 3)
    if t0 != 0 {
        blockevent(s.releasetime-t0, 2)
    }
    releaseSudog(s)
}

Broadcast

唤醒链表中所有的阻塞中的goroutine,还是使用readyWithTime来实现这个功能

func (c *Cond) Broadcast() {
    c.checker.check()
    runtime_notifyListNotifyAll(&c.notify)
}

// 源代码在sema.go中
func notifyListNotifyAll(l *notifyList) {
    // Fast-path: if there are no new waiters since the last notification
    // we don't need to acquire the lock.
    if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
        return
    }

    // Pull the list out into a local variable, waiters will be readied
    // outside the lock.
    lock(&l.lock)
    s := l.head
    l.head = nil
    l.tail = nil

    // Update the next ticket to be notified. We can set it to the current
    // value of wait because any previous waiters are already in the list
    // or will notice that they have already been notified when trying to
    // add themselves to the list.
    atomic.Store(&l.notify, atomic.Load(&l.wait))
    unlock(&l.lock)

    // Go through the local list and ready all waiters.
    for s != nil {
        next := s.next
        s.next = nil
        readyWithTime(s, 4)
        s = next
    }
}

Signal

// 调用runtime_notifyListNotifyOne方法唤醒链表头的goroutine
func (c *Cond) Signal() {
    c.checker.check()
    runtime_notifyListNotifyOne(&c.notify)
}

// runtime_notifyListNotifyOne具体实现 获取链表头部的G,然后调用readyWithTime唤醒goroutine
// 源代码在sema.go中
func notifyListNotifyOne(l *notifyList) {
    if atomic.Load(&l.wait) == atomic.Load(&l.notify) {
        return
    }

    lock(&l.lock)

    t := l.notify
    if t == atomic.Load(&l.wait) {
        unlock(&l.lock)
        return
    }

    atomic.Store(&l.notify, t+1)
    
    for p, s := (*sudog)(nil), l.head; s != nil; p, s = s, s.next {
        if s.ticket == t {
            n := s.next
            if p != nil {
                p.next = n
            } else {
                l.head = n
            }
            if n == nil {
                l.tail = p
            }
            unlock(&l.lock)
            s.next = nil
            readyWithTime(s, 4)
            return
        }
    }
    unlock(&l.lock)
}

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK