
Golang source code analysis (6): channels

 4 years ago
source link: https://studygolang.com/articles/27050

Introduction

Channels are the key to Go's implementation of the CSP concurrency model, which encourages sharing data through communication: Don't communicate by sharing memory; share memory by communicating.

CSP: Communicating Sequential Processes
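
As a small illustration of the slogan, the sketch below collects results from several goroutines over a channel instead of protecting a shared total with a lock. The function name sumSquares is made up for this example.

```go
package main

import "fmt"

// sumSquares starts one goroutine per input and collects the results over a
// channel, so no variable is written by more than one goroutine.
func sumSquares(nums []int) int {
	results := make(chan int)
	for _, n := range nums {
		go func(n int) {
			results <- n * n // hand the value over instead of updating shared state
		}(n)
	}
	total := 0
	for range nums {
		total += <-results
	}
	return total
}

func main() {
	fmt.Println(sumSquares([]int{1, 2, 3, 4})) // 30
}
```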

Creation

The hchan struct in chan.go:

type hchan struct {
    qcount   uint           // total data in the queue
    dataqsiz uint           // size of the circular queue
    buf      unsafe.Pointer // points to an array of dataqsiz elements
    elemsize uint16
    closed   uint32
    elemtype *_type // element type
    sendx    uint   // send index
    recvx    uint   // receive index
    recvq    waitq  // list of recv waiters
    sendq    waitq  // list of send waiters

    // lock protects all fields in hchan, as well as several
    // fields in sudogs blocked on this channel.
    //
    // Do not change another G's status while holding this lock
    // (in particular, do not ready a G), as this can deadlock
    // with stack shrinking.
    lock mutex
}

type waitq struct {
    first *sudog
    last  *sudog
}

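makechan: it first validates the element size and the queue size. Because of garbage collector constraints, if the element type contains pointers the buffer must be allocated separately from the hchan header; otherwise the header and the buffer are allocated in one call and buf is pointed just past the header. Finally it sets elemsize, elemtype and dataqsiz.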

func makechan(t *chantype, size int) *hchan {
    elem := t.elem
    // compiler checks this but be safe.
    if elem.size >= 1<<16 { // limit the channel element size
        throw("makechan: invalid channel element type")
    }
    mem, overflow := math.MulUintptr(elem.size, uintptr(size)) // check for overflow
    if overflow || mem > maxAlloc-hchanSize || size < 0 {
        panic(plainError("makechan: size out of range"))
    }
    var c *hchan
    switch {
    case mem == 0:
        // Queue or element size is zero.
        c = (*hchan)(mallocgc(hchanSize, nil, true))
        // Race detector uses this location for synchronization.
        c.buf = c.raceaddr()
    case elem.ptrdata == 0:
        // Elements do not contain pointers.
        // Allocate hchan and buf in one call.
        c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
        c.buf = add(unsafe.Pointer(c), hchanSize)
    default:
        // Elements contain pointers.
        c = new(hchan)
        c.buf = mallocgc(mem, elem, true)
    }
    c.elemsize = uint16(elem.size)
    c.elemtype = elem
    c.dataqsiz = uint(size)
    return c
}
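
From user code, the fields set by makechan are visible through len and cap. The following is a minimal sketch; chanInfo is a hypothetical helper, and the comments about which allocation branch is taken are read off the makechan excerpt above rather than observed at run time.

```go
package main

import "fmt"

// chanInfo reports the user-visible counterparts of hchan.qcount and
// hchan.dataqsiz for a channel of ints.
func chanInfo(ch chan int) (queued, capacity int) {
	return len(ch), cap(ch)
}

func main() {
	unbuffered := make(chan int)  // mem == 0: only the hchan header is needed
	buffered := make(chan int, 3) // int has no pointers: header and buffer together
	buffered <- 1
	buffered <- 2

	fmt.Println(chanInfo(unbuffered)) // 0 0
	fmt.Println(chanInfo(buffered))   // 2 3

	signals := make(chan struct{}, 8) // zero-size elements: mem == 0 as well
	signals <- struct{}{}
	fmt.Println(len(signals), cap(signals)) // 1 8
}
```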

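Sending and receiving

The send and receive wait queues are built from sudog entries, each of which holds a pointer to an element and a pointer to a g. sudogs are also cached in two levels, a per-P cache and a central cache, in the same style as the allocator's cache/central hierarchy.

acquireSudog obtains a sudog and releaseSudog gives it back. The flow is roughly: try the local P first, then fall back to sched.sudogcache.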

type sudog struct {
    g *g
    elem     unsafe.Pointer // data element (may point to stack)
}
type p struct {
    sudogcache []*sudog
    sudogbuf   [128]*sudog
}
type schedt struct {
    // Central cache of sudog structs.
    sudoglock  mutex
    sudogcache *sudog
}
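
The two-level idea can be sketched outside the runtime as follows. Everything here (node, central, local, the half-refill and half-spill policy) is a hypothetical model written for illustration, not the runtime's code.

```go
package main

import (
	"fmt"
	"sync"
)

// node stands in for sudog; all names in this sketch are hypothetical.
type node struct{ next *node }

// central plays the role of sched.sudogcache: a locked free list shared by all.
type central struct {
	mu   sync.Mutex
	free *node
}

// local plays the role of p.sudogcache: owned by one worker, so no lock is needed.
type local struct {
	cache []*node
	c     *central
}

// acquire takes from the local cache, refilling from the central list when the
// local cache is empty and allocating only as a last resort.
func (l *local) acquire() *node {
	if len(l.cache) == 0 {
		l.c.mu.Lock()
		for len(l.cache) < cap(l.cache)/2 && l.c.free != nil {
			n := l.c.free
			l.c.free = n.next
			n.next = nil
			l.cache = append(l.cache, n)
		}
		l.c.mu.Unlock()
		if len(l.cache) == 0 {
			l.cache = append(l.cache, new(node))
		}
	}
	n := l.cache[len(l.cache)-1]
	l.cache = l.cache[:len(l.cache)-1]
	return n
}

// release returns a node to the local cache, first spilling half of the cache
// to the central list when the local cache is full.
func (l *local) release(n *node) {
	if len(l.cache) == cap(l.cache) {
		l.c.mu.Lock()
		for len(l.cache) > cap(l.cache)/2 {
			m := l.cache[len(l.cache)-1]
			l.cache = l.cache[:len(l.cache)-1]
			m.next = l.c.free
			l.c.free = m
		}
		l.c.mu.Unlock()
	}
	l.cache = append(l.cache, n)
}

func main() {
	l := &local{cache: make([]*node, 0, 4), c: &central{}}
	n := l.acquire() // both levels are empty, so this allocates
	l.release(n)
	fmt.Println(l.acquire() == n) // true: reused from the local cache
}
```

The point of the design is that the common path touches only the local slice, and the lock on the shared list is taken only when the local cache runs empty or overflows.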

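Sending

In the Go 1.13 source, chansend no longer has a separate branch for c.dataqsiz == 0 once the lock is held; channels with a buffer length of 0 and channels with a buffer length greater than 0 share one code path.

With block=false: if the channel is nil, chansend returns false immediately. For an unbuffered channel it returns false when no receiver is waiting. For a buffered channel it returns false when the buffer is full.

If the channel is closed, the send panics.

If a receiver is waiting in c.recvq, the value is copied directly to sg.elem, bypassing the buffer, and the corresponding goroutine is woken (for a buffered channel the buffer indexes are also advanced, which the runtime does only when the race detector is enabled).

If no receiver is waiting and the buffer has room, the value is copied into the buffer slot at sendx, sendx and qcount are updated, and chansend returns. No sudog is needed on this path.

If the buffer has no room, the goroutine acquires a sudog, enqueues it on sendq, and calls goparkunlock to put itself to sleep until goready wakes it; it then releases the sudog.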

// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
    chansend(c, elem, true, getcallerpc())
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
    if c == nil {
        if !block {
            return false
        }
        gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
        throw("unreachable")
    }
    if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
        (c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
        return false
    }
    lock(&c.lock)
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("send on closed channel"))
    }
    if sg := c.recvq.dequeue(); sg != nil {
        // Found a waiting receiver. We pass the value we want to send
        // directly to the receiver, bypassing the channel buffer (if any).
        send(c, sg, ep, func() { unlock(&c.lock) }, 3)
        return true
    }
    if c.qcount < c.dataqsiz {
        // Space is available in the channel buffer. Enqueue the element to send.
        qp := chanbuf(c, c.sendx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
        }
        typedmemmove(c.elemtype, qp, ep)
        c.sendx++
        if c.sendx == c.dataqsiz {
            c.sendx = 0
        }
        c.qcount++
        unlock(&c.lock)
        return true
    }
}
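
The non-blocking cases and the panic on a closed channel can be observed from user code. In this sketch a select with a default branch stands in for chansend with block=false; trySend and sendPanics are helper names invented for the example.

```go
package main

import "fmt"

// trySend performs a non-blocking send and reports whether the value was sent.
func trySend(ch chan int, v int) bool {
	select {
	case ch <- v:
		return true
	default:
		return false
	}
}

// sendPanics reports whether a blocking send on ch panics.
func sendPanics(ch chan int) (panicked bool) {
	defer func() {
		if recover() != nil {
			panicked = true
		}
	}()
	ch <- 1
	return false
}

func main() {
	var nilCh chan int
	fmt.Println(trySend(nilCh, 1)) // false: nil channel

	unbuffered := make(chan int)
	fmt.Println(trySend(unbuffered, 1)) // false: no receiver is waiting

	buffered := make(chan int, 1)
	fmt.Println(trySend(buffered, 1)) // true: the buffer has room
	fmt.Println(trySend(buffered, 2)) // false: the buffer is full

	close(buffered)
	fmt.Println(sendPanics(buffered)) // true: send on closed channel
}
```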

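Receiving

Receiving is similar, except that when the channel is closed and the buffer holds no data, the receive returns the zero value of the element type.

So a receive still yields a value after the channel has been closed, but the received result is then false.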
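
A short sketch of that behavior from user code: values buffered before the close are still delivered with ok == true, and only then does the receive report the zero value with ok == false. The helper name drain is made up for the example.

```go
package main

import "fmt"

// drain buffers vals, closes the channel, and records every receive result up
// to and including the first receive that reports ok == false.
func drain(vals ...int) (got []int, oks []bool) {
	ch := make(chan int, len(vals))
	for _, v := range vals {
		ch <- v
	}
	close(ch)
	for {
		v, ok := <-ch
		got = append(got, v)
		oks = append(oks, ok)
		if !ok {
			return got, oks
		}
	}
}

func main() {
	got, oks := drain(7, 8)
	fmt.Println(got) // [7 8 0]
	fmt.Println(oks) // [true true false]
}
```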

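Note: the code distinguishes a full buffer from a non-full one, probably because with a full buffer the slot can be reused in place: the receiver takes the head element and the waiting sender's value is written into the same slot. This makes some of the conditions in the select code a little hard to follow.

In chanrecv, Go first takes a waiting sender with sg := c.sendq.dequeue(); if sg is not nil it calls recv(c, sg, ep, func() { unlock(&c.lock) }, 3). Inside recv, sg is the sender.

In recv(), if c.dataqsiz > 0 (a buffered channel), typedmemmove(c.elemtype, ep, qp) copies the element at the head of the queue to ep, and then typedmemmove(c.elemtype, qp, sg.elem) copies sg.elem (the value of the sender that was just dequeued) into qp.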

func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
    if c.dataqsiz == 0 {
        // Unbuffered channel: copy the data directly from the sender (elided).
    } else {
        // Queue is full. Take the item at the
        // head of the queue. Make the sender enqueue
        // its item at the tail of the queue. Since the
        // queue is full, those are both the same slot.
        qp := chanbuf(c, c.recvx)
        if raceenabled {
            raceacquire(qp)
            racerelease(qp)
            raceacquireg(sg.g, qp)
            racereleaseg(sg.g, qp)
        }
        // copy data from queue to receiver
        if ep != nil {
            typedmemmove(c.elemtype, ep, qp)
        }
        // copy data from sender to queue
        typedmemmove(c.elemtype, qp, sg.elem)
        c.recvx++
        if c.recvx == c.dataqsiz {
            c.recvx = 0
        }
        c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
    }
}
    // In chanrecv: a closed channel with an empty buffer yields the zero value.
    if c.closed != 0 && c.qcount == 0 {
        if raceenabled {
            raceacquire(c.raceaddr())
        }
        unlock(&c.lock)
        if ep != nil {
            typedmemclr(c.elemtype, ep)
        }
        return true, false
    }
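
The slot reuse in the buffered branch of recv can be modeled with a plain ring buffer. This is a hypothetical single-goroutine model (the ring type and its method are invented here); it only shows why the head slot can take the sender's value without breaking FIFO order.

```go
package main

import "fmt"

// ring is a hypothetical, single-goroutine model of the hchan circular buffer.
type ring struct {
	buf          []int
	sendx, recvx int
}

// recvWithWaitingSender models the case where the buffer is full and a sender
// is parked with the value pending. The receiver takes the head element and
// the sender's value is stored in the slot that was just freed.
func (r *ring) recvWithWaitingSender(pending int) int {
	v := r.buf[r.recvx]      // copy data from queue to receiver
	r.buf[r.recvx] = pending // copy data from sender to queue
	r.recvx++
	if r.recvx == len(r.buf) {
		r.recvx = 0
	}
	r.sendx = r.recvx // head and tail coincide while the buffer stays full
	return v
}

func main() {
	r := &ring{buf: []int{1, 2, 3}}         // full: sendx == recvx == 0
	fmt.Println(r.recvWithWaitingSender(4)) // 1
	fmt.Println(r.buf, r.recvx, r.sendx)    // [4 2 3] 1 1
}
```

Reading from recvx = 1 onward now yields 2, 3, 4, so the sender's value ends up at the logical tail even though it was written to the old head slot.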

Closing

func closechan(c *hchan) {
    if c == nil {
        panic(plainError("close of nil channel"))
    }
    lock(&c.lock)
    if c.closed != 0 {
        unlock(&c.lock)
        panic(plainError("close of closed channel"))
    }
    c.closed = 1
    var glist gList
    // release all readers
    for {
        sg := c.recvq.dequeue()
        if sg == nil {
            break
        }
        if sg.elem != nil {
            typedmemclr(c.elemtype, sg.elem)
            sg.elem = nil
        }
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }
    // release all writers (they will panic)
    for {
        sg := c.sendq.dequeue()
        if sg == nil {
            break
        }
        sg.elem = nil
        if sg.releasetime != 0 {
            sg.releasetime = cputicks()
        }
        gp := sg.g
        gp.param = nil
        if raceenabled {
            raceacquireg(gp, c.raceaddr())
        }
        glist.push(gp)
    }
    unlock(&c.lock)
    // Ready all Gs now that we've dropped the channel lock.
    for !glist.empty() {
        gp := glist.pop()
        gp.schedlink = 0
        goready(gp, 3)
    }
}
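
The effects of closechan are easy to check from user code: every receiver is released with ok == false, and closing a nil or already closed channel panics. The helpers wakeAll and closePanics are names made up for this sketch.

```go
package main

import (
	"fmt"
	"sync"
)

// wakeAll starts n receivers on an unbuffered channel, closes it, and counts
// how many of them observed ok == false.
func wakeAll(n int) int {
	ch := make(chan int)
	var wg sync.WaitGroup
	var mu sync.Mutex
	woken := 0
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, ok := <-ch; !ok {
				mu.Lock()
				woken++
				mu.Unlock()
			}
		}()
	}
	close(ch) // parked receivers are readied; later ones see the closed flag
	wg.Wait()
	return woken
}

// closePanics reports whether close(ch) panics.
func closePanics(ch chan int) (panicked bool) {
	defer func() { panicked = recover() != nil }()
	close(ch)
	return false
}

func main() {
	fmt.Println(wakeAll(5)) // 5

	var nilCh chan int
	fmt.Println(closePanics(nilCh)) // true: close of nil channel

	ch := make(chan int)
	close(ch)
	fmt.Println(closePanics(ch)) // true: close of closed channel
}
```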

select

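In the Go 1.13 source, runtime/select.go no longer has a newselect function.

The handling of select has moved into the compiler, in src/cmd/compile/internal/gc/select.go. Skimming the comments there is enough; going further means getting into the compilation process.

During compilation the compiler walks every node of the syntax tree. When it meets an OSELECT node it calls walkselect, which does its work in walkselectcases. We will not study the Node type here.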

A select with no cases is compiled to a call to block, mkcall("block", nil, &ln); the general case is compiled to a call to selectgo.
// The result of walkstmt MUST be assigned back to n, e.g.
//  n.Left = walkstmt(n.Left)
func walkstmt(n *Node) *Node {
    case OSELECT:
        walkselect(n)
}
func walkselect(sel *Node) {
}
func walkselectcases(cases *Nodes) []*Node {
    if n == 0 {
        return []*Node{mkcall("block", nil, nil)}
    }

    // optimization: one-case select: single op.
    // TODO(rsc): Reenable optimization once order.go can handle it.
    // golang.org/issue/7672.
    if n == 1 {}

    // convert case value arguments to addresses.
    // this rewrite is used by both the general code and the next optimization.
    for _, cas := range cases.Slice() {}

    // optimization: two-case select but one is default: single non-blocking op.
    if n == 2 && (cases.First().Left == nil || cases.Second().Left == nil) {}
    // generate sel-struct
    selv := temp(types.NewArray(scasetype(), int64(n)))
    order := temp(types.NewArray(types.Types[TUINT16], 2*int64(n)))

    // register cases
    for i, cas := range cases.Slice() {
        setField("kind", nodintconst(kind))
        if c != nil {
            c = convnop(c, types.Types[TUNSAFEPTR])
            setField("c", c)
        }
        if elem != nil {
            elem = convnop(elem, types.Types[TUNSAFEPTR])
            setField("elem", elem)
        }
        if instrumenting {
            r = mkcall("selectsetpc", nil, nil, bytePtrToIndex(selv, int64(i)))
            init = append(init, r)
        }
    }
    fn := syslook("selectgo")
    r.Rlist.Set1(mkcall1(fn, fn.Type.Results(), nil, bytePtrToIndex(selv, 0), bytePtrToIndex(order, 0), nodintconst(int64(n))))
}

selectgo is the runtime implementation of Go's select statement:

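  1. Convert the arguments into three slices: scases, pollorder and lockorder.
  2. Replace every case on a nil channel with scase{}, i.e. the caseNil kind, so the later logic can assume non-nil channels.
  3. Walk the cases and, for each i, use fastrandn to pick a random j in [0, i] and swap entries i and j; the result is a random permutation stored in pollorder.
  4. Sort the cases by hchan address to obtain the locking order (lockorder), using a simple heap sort to guarantee O(n log n) time and a constant stack footprint.
  5. Lock all the channels, following lockorder.
  6. Run the selection passes:
    • Pass 1: following pollorder, look for a case that can proceed immediately. If none is found and there is a caseDefault, run the default case and return. If every case would block (the value cannot go in or cannot come out), continue to pass 2.
    • Pass 2: enqueue the goroutine on every channel, caseRecv on c.recvq and caseSend on c.sendq, then park the current G until one of the channels wakes it (selparkcommit unlocks all the channels).
    • Pass 3: go over all the cases, dequeue the sudogs that were enqueued in pass 2 from the channels that did not fire, and return casei, the index of the case that succeeded.
      It then checks whether cas is nil: the goroutine may have been woken by a close(chan), in which case it jumps back to loop. On that pass the closed channel's case is found ready, so a receive returns the zero value and select exits.
      One important consequence:
      A case on a nil channel is never selected. A case on a closed channel is always ready, so it is taken as soon as the random poll order reaches it (another ready channel may be reached first).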
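
The nil and closed behavior described in the steps above can be checked with a small program. This is a sketch with a made-up helper name, pick; it relies only on the language rules that a receive from a nil channel blocks forever and a receive from a closed channel returns immediately.

```go
package main

import "fmt"

// pick runs one select over a nil channel and a closed channel and reports
// which case fired.
func pick() string {
	var nilCh chan int // a receive on a nil channel never becomes ready
	closed := make(chan int)
	close(closed)

	select {
	case <-nilCh:
		return "nil"
	case v, ok := <-closed:
		return fmt.Sprintf("closed v=%d ok=%v", v, ok)
	}
}

func main() {
	for i := 0; i < 3; i++ {
		fmt.Println(pick()) // closed v=0 ok=false
	}
}
```

Setting a channel variable to nil is therefore a common way to disable a case inside a select loop.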
type scase struct {
    c           *hchan         // chan
    elem        unsafe.Pointer // data element
    kind        uint16
    pc          uintptr // race pc (for race detector / msan)
    releasetime int64
}
// selectgo implements the select statement.
//
// cas0 points to an array of type [ncases]scase, and order0 points to
// an array of type [2*ncases]uint16. Both reside on the goroutine's
// stack (regardless of any escaping in selectgo).
func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
    // convert cas0 and order0 to array pointers
    cas1 := (*[1 << 16]scase)(unsafe.Pointer(cas0))
    order1 := (*[1 << 17]uint16)(unsafe.Pointer(order0))

    // slice them, splitting order1 into pollorder and lockorder
    scases := cas1[:ncases:ncases]
    pollorder := order1[:ncases:ncases]
    lockorder := order1[ncases:][:ncases:ncases]

    // walk the cases and replace every case whose chan is nil with scase{}
    // Replace send/receive cases involving nil channels with
    // caseNil so logic below can assume non-nil channel.
    for i := range scases {
        cas := &scases[i]
        if cas.c == nil && cas.kind != caseDefault {
            *cas = scase{}
        }
    }
    // generate permuted order
    for i := 1; i < ncases; i++ {
        j := fastrandn(uint32(i + 1))
        pollorder[i] = pollorder[j]
        pollorder[j] = uint16(i)
    }
    // lock all the channels involved in the select
    sellock(scases, lockorder)
loop:
    // pass 1 - look for something already waiting
    // pass 2 - enqueue on all chans
    // wait for someone to wake us up
    // pass 3 - dequeue from unsuccessful chans
    selunlock(scases, lockorder)
    goto retc
}
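
The "generate permuted order" loop is an inside-out Fisher-Yates shuffle. The sketch below reproduces it outside the runtime; permute is a hypothetical name, and math/rand stands in for the runtime's fastrandn, which is not callable from user code.

```go
package main

import (
	"fmt"
	"math/rand"
)

// permute builds a random permutation of 0..n-1 with the same loop shape as
// the pollorder code: entry i is moved from a random earlier position j, and
// position j then receives the new value i.
func permute(n int) []uint16 {
	order := make([]uint16, n)
	for i := 1; i < n; i++ {
		j := rand.Intn(i + 1) // uniform in [0, i]
		order[i] = order[j]
		order[j] = uint16(i)
	}
	return order
}

func main() {
	fmt.Println(permute(5)) // some permutation of 0..4
}
```

Because the order is drawn afresh on every call, no case is systematically favored when several are ready at once.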

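Other notes

One race comes to mind here: a select that blocks has enqueued itself on every channel's wait list, so what happens when several channels try to wake it at the same time?

In ready, if the goroutine is no longer in the _Gwaiting state, trying to make it runnable again throws.

The key to the solution is the selectDone field.

In dequeue, sgp.g.selectDone is updated atomically; when select enqueues a sudog it sets the sudog's isSelect field to true.

With this check, dequeue tries to change selectDone from 0 to 1. If the CAS fails, another channel has already won the race, so this g is skipped and dequeue moves on to the next waiter. On the select side, once the goroutine has been woken it removes its sudogs from all the other channels in the select, so no problem arises.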

// In ready():
status := readgstatus(gp)
// Mark runnable.
_g_ := getg()
mp := acquirem() // disable preemption because it can be holding p in a local var
if status&^_Gscan != _Gwaiting {
    dumpgstatus(gp)
    throw("bad g->status in ready")
}
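func (q *waitq) dequeue() *sudog {
    for {
        sgp := q.first // unlinking sgp from the queue is elided here
        if sgp == nil {
            return nil
        }
        if sgp.isSelect && !atomic.Cas(&sgp.g.selectDone, 0, 1) {
            continue
        }
        return sgp
    }
}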
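
The effect of the CAS can be modeled with sync/atomic: however many wakers race, exactly one sees the transition from 0 to 1. This is a sketch only; raceToWake is an invented name and the local selectDone variable stands in for g.selectDone.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// raceToWake models several channels trying to wake the same select-blocked
// goroutine and returns how many of them won the compare-and-swap.
func raceToWake(wakers int) int32 {
	var selectDone uint32
	var winners int32
	var wg sync.WaitGroup
	for i := 0; i < wakers; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if !atomic.CompareAndSwapUint32(&selectDone, 0, 1) {
				return // lost the race: skip this waiter, like continue in dequeue
			}
			atomic.AddInt32(&winners, 1)
		}()
	}
	wg.Wait()
	return winners
}

func main() {
	fmt.Println(raceToWake(16)) // 1
}
```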
