4

Golang chan

 2 years ago
source link: https://jerryzhou343.github.io/post/golang_chan_component/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

go 语言中重要的组件chan,可以简单的理解为一个带锁的消息队列。

0x01 组件元素

1.1 chan的定义

更愿意称chan为golang语言中的组件,而不是语法糖;chan有边界,例如一个抽水机,有和外部交互的输入输出,内部有自己组成元素的交互逻辑。下面是chan的定义,主要的关键元素有data buff,recvq,sendq,三者呈现了生产者,消费者,数据数组的模型。

//sodog 表达了一个g协程在等待列表的情况。 例如在chan中的发送和接受
//sudog 表述的是多对多的关系,一个go协程可以在多个等待列表中。
//所以多个sodog 关联了一个go 协程,可能多个sudogs 在等待同一个同步对象。
type sudog struct {
	//关联的go协程
	g *g
	next *sudog
	prev *sudog
	//省略....
	//发送消息成功的情况下被唤醒,该字段为true
	//chan close 发送消息失败,情况下被唤醒,该字段为false
	success bool
}

//sodog 等待队列,记录了头部和尾部,环状。
type waitq struct {
	first *sudog
	last  *sudog
}

// 语法中 chan的定义
type hchan struct {
 //队列中数据量
	qcount   uint           // total data in the queue
	//环形队列的大小
	dataqsiz uint           // size of the circular queue
	//数据数组指针
	buf      unsafe.Pointer // points to an array of dataqsiz elements
	//元素大小
	elemsize uint16
	//chan组件关闭标识
	closed   uint32
	//元素类型
	elemtype *_type // element type
	//发送者index
	sendx    uint   // send index
	//接受者index
	recvx    uint   // receive index
	//接受者,环状队列
	recvq    waitq  // list of recv waiters
	//发送者,环状队列
	sendq    waitq  // list of send waiters

	// lock protects all fields in hchan, as well as several
	// fields in sudogs blocked on this channel.
	//
	// Do not change another G's status while holding this lock
	// (in particular, do not ready a G), as this can deadlock
	// with stack shrinking.
	//排它锁
	lock mutex
}

1.2 和外部的交互

chan 对象实例的生命周期过程包含创建,关闭,输入数据,输出数据。

1.2.1 make

func makechan(t *chantype, size int) *hchan {
	elem := t.elem
	//元素尺寸大小安全检查
	// compiler checks this but be safe.
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	//内存对齐检查
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}
	//缓冲buff 内存大小溢出检查,mem 表示要分配的内存大小
	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	var c *hchan
	switch {
	//不带缓冲
	case mem == 0:
		// Queue or element size is zero.
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		// Race detector uses this location for synchronization.
		c.buf = c.raceaddr()
		//元素指向的数据载体大小为零
	case elem.ptrdata == 0:
		// Elements do not contain pointers.
		// Allocate hchan and buf in one call.
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// Elements contain pointers.
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}
	//初始化chan的属性。
	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)
	lockInit(&c.lock, lockRankHchan)

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; dataqsiz=", size, "\n")
	}
	return c
}

1.2.2 close

下面是chan的关闭源码,chan在关闭的时候会唤醒所有在等待的sender go协程 和 receiver 协程。

func closechan(c *hchan) {
//关闭一个空的chan,抛出异常
	if c == nil {
		panic(plainError("close of nil channel"))
	}
	//判断关闭标志,如果已经关闭,抛出异常。
	lock(&c.lock)
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}
	
	if raceenabled {
		callerpc := getcallerpc()
		racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
		racerelease(c.raceaddr())
	}
	//关闭
	c.closed = 1

	// A gList is a list of Gs linked through g.schedlink. A G can only be
  // on one gQueue or gList at a time.
  //type gList struct {
  //	head guintptr
  //}
  //调度器使用go协程链表
	var glist gList

	// release all readers
	for {
	  //接收队列弹出一个sg
		sg := c.recvq.dequeue()
		if sg == nil {
			break
		}
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		// 得到关联的协程
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}

	// release all writers (they will panic)
	for {
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = unsafe.Pointer(sg)
		sg.success = false
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	unlock(&c.lock)

	// Ready all Gs now that we've dropped the channel lock.
	for !glist.empty() {
		gp := glist.pop()
		gp.schedlink = 0
		// Mark gp ready to run.
		goready(gp, 3)
	}
}

1.2.3 send

/*
 * generic single channel send/recv
 * If block is not nil,
 * then the protocol will not
 * sleep but return if it could
 * not complete.
 *
 * sleep can wake up with g.param == nil
 * when a channel involved in the sleep has
 * been closed.  it is easiest to loop and re-run
 * the operation; we'll see that it's now closed.
 */
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	//判空检查,根据在不同场景下对nil chan写入时候的异常返回,这里的block表示协程在调
	//用的时候是否需要阻塞。例如:select 使用场景的时候协程不阻塞的,所以select场景
	//向nil chan写入数据时会返回false,而 chan <- xx 是阻塞场景,这时候向nil chan
	//写入数据会抛异常
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}
	//省略....
	//非阻塞场景,chan关闭或者满的情况下返回false
	if !block && c.closed == 0 && full(c) {
		return false
	}
	//省略...
 //如果接收者协程队列不为空,直接发送数据
  if sg := c.recvq.dequeue(); sg != nil {
  // Found a waiting receiver. We pass the value we want to send
  // directly to the receiver, bypassing the channel buffer (if any).
  send(c, sg, ep, func() { unlock(&c.lock) }, 3)
  return true
	}
	//如果buff还有空间,投递的消息对象入队列
	if c.qcount < c.dataqsiz {
		// Space is available in the channel buffer. Enqueue the element to send.
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			racenotify(c, c.sendx, nil)
		}
		typedmemmove(c.elemtype, qp, ep)
		c.sendx++
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		c.qcount++
		unlock(&c.lock)
		return true
	}
	//阻塞当前协程
	gp := getg()
	mysg := acquireSudog()
	c.sendq.enqueue(mysg)
	atomic.Store8(&gp.parkingOnChan, 1)
	
	//唤醒,释放协程
	closed := !mysg.success
	releaseSudog(mysg)
	if closed { //如果是chan 关闭情况下被唤醒抛异常
		if c.closed == 0 { 
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}

1.2.4 recev

// chan接收函数,接收值写入到ep 指针
// chanrecv receives on channel c and writes the received data to ep.
// ep may be nil, in which case received data is ignored.
// If block == false and no elements are available, returns (false, false).
//如果非阻塞,没有元素 返回false,false
// Otherwise, if c is closed, zeros *ep and returns (true, false).
//如果 chan关闭,接受元素为空,忽略接收的值返回true,false
// Otherwise, fills in *ep with an element and returns (true, true).
// 如果收到元素,返回true,true
// A non-nil ep must point to the heap or the caller's stack.
//非空的接收数据指针需要在堆上,或者调用者的栈上。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
	//阻塞情况下向空chan 收数据抛异常
	if c == nil {
		if !block {
			return
		}
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}
	//非阻塞、队列为空
	//如果chan关闭,直接返回false
		if !block && empty(c) {
      //省略....
      if atomic.Load(&c.closed) == 0 {
        //return false,false
        return 
      }
      //队列为空
      if empty(c) {
        return true, false
      }
		}
	
    if c.closed != 0 && c.qcount == 0 {
      return true, false
    }
	

	if sg := c.sendq.dequeue(); sg != nil {
		// Found a waiting sender. If buffer is size 0, receive value
		// directly from sender. Otherwise, receive from head of queue
		// and add sender's value to the tail of the queue (both map to
		// the same buffer slot because the queue is full).
		
		// 存在等待中的sender协程,如果buffer 为空,直接从sender处获得值
		// 如果buffer不为空,receiver 从缓冲队列的头部读取值,sender的值,存放到
		// buffer的队尾。
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}
	//队列不为空,从队列中拿
	if c.qcount > 0 {
		// Receive directly from queue
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			racenotify(c, c.recvx, nil)
		}
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		typedmemclr(c.elemtype, qp)
		c.recvx++
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		c.qcount--
		unlock(&c.lock)
		return true, true
	}
	//非阻塞情况,队列为空
	if !block {
		unlock(&c.lock)
		return false, false
	}
	
	//阻塞
	// no sender available: block on this channel.
	gp := getg()
	mysg := acquireSudog()
	c.recvq.enqueue(mysg)
	//唤醒
	success := mysg.success
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, success

0x02 使用tips

  1. 向关闭的chan写入数据会panic
  2. 由生产者关闭chan
  3. 多生产者时,引入协调者关闭chan
  4. chan 可以做FIFO

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK