4

Golang context包源码分析

 2 years ago
source link: https://liangyaopei.github.io/2020/11/01/golang-context/
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

Golang context包源码分析

发表于

2020-11-01 分类于 Golang

本文基于 go1.15.2 darwin/amd64

context的目的是实现主协程对子协程的控制,作用包括取消执行、设置超时时间、携带键值对等。
下面是一个使用context防止协程泄露的例子。不使用context,创建了goroutine之后没有办法取消,在程序退出之前,会一直打印”in go loop”。

// 协程泄露的例子
func TestRoutine1(t *testing.T) {
// 创建了go routine,没有办法取消
go func() {
for {
fmt.Print("in go loop\n")
}
}()
time.Sleep(2 * time.Second)
}

为了实现对子协程的控制,一般在goroutine中添加对context信号的等待。

func TestRoutine2(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
go func(ctx context.Context) {
for {
select {
case <-ctx.Done():
fmt.Print("context control\n")
return
default:
fmt.Print("in go loop\n")
time.Sleep(500 * time.Millisecond)
}
}
}(ctx)
time.Sleep(2 * time.Second)
cancel()
}

Context 内部结构

context包里面,有接口Context,对应四个实现emptyCtx, cancelCtx, timerCtx,valueCtx

Context接口

type Context interface {
Deadline() (deadline time.Time, ok bool)

Done() <-chan struct{}

Err() error

Value(key interface{}) interface{}
}
  • Deadline:获取到期时间。如果没有到期时间,ok返回false。
  • Done:返回一个channel,表示取消的信号。如果通道关闭则代表该 Context 已经被取消;如果返回的为 nil,则代表该 Context 是一个永远不会被取消的 Context。
  • Err:返回该 Context 被取消的原因。如果只使用 Context 包的 Context 类型的话,那么只可能返回 Canceled (代表被明确取消)或者 DeadlineExceeded (因超时而取消)。
  • Value:获取Context中的键值对。

emptyCtx

emptyCtxint类型的重新定义,它不是空的struct{}是因为每个这种类型的变量需要用不同的内存地址。
emptyCtx没有过期时间,不能被取消。

// An emptyCtx is never canceled, has no values, and has no deadline. It is not
// struct{}, since vars of this type must have distinct addresses.
type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
return
}

func (*emptyCtx) Done() <-chan struct{} {
return nil
}

func (*emptyCtx) Err() error {
return nil
}

func (*emptyCtx) Value(key interface{}) interface{} {
return nil
}

emptyCtx的作用是作为Context树的根节点。

var (
background = new(emptyCtx)
todo = new(emptyCtx)
)

// Background returns a non-nil, empty Context. It is never canceled, has no
// values, and has no deadline. It is typically used by the main function,
// initialization, and tests, and as the top-level Context for incoming
// requests.
func Background() Context {
return background
}

// TODO returns a non-nil, empty Context. Code should use context.TODO when
// it's unclear which Context to use or it is not yet available (because the
// surrounding function has not yet been extended to accept a Context
// parameter).
func TODO() Context {
return todo
}

cancelCtx

cancelFunc

type CancelFunc func()

调用函数WithCancel,WithTimeout,WithDeadline都会返回新的子ContextCancelFunc类型的函数。
CancelFunc用于取消的操作。

cancenler接口

// A canceler is a context type that can be canceled directly. The
// implementations are *cancelCtx and *timerCtx.
type canceler interface {
cancel(removeFromParent bool, err error)
Done() <-chan struct{}
}

可以被取消的context都实现了这个接口。

cancelCtx内部结构

type cancelCtx struct {
Context

mu sync.Mutex // protects following fields
done chan struct{} // created lazily, closed by first cancel call
children map[canceler]struct{} // set to nil by the first cancel call
err error // set to non-nil by the first cancel call
}

func (c *cancelCtx) Done() <-chan struct{} {
c.mu.Lock()
if c.done == nil {
c.done = make(chan struct{})
}
d := c.done
c.mu.Unlock()
return d
}

cancelCtx每个字段作用:

  • 包含了一个Context类型的值,存储了当前cancelCtx的父Context的指针。
  • done作为取消信号的channel,子协程监听该通道了解到是否需要取消任务。
  • children存储了当前Context衍生的所有可取消类型的子Context
  • err 会被第一次取消的时候设置

cancelCtx实现了canceler接口。
下面看一下cancel(removeFromParent bool, err error)方法。

// cancel closes c.done, cancels each of c's children, and, if
// removeFromParent is true, removes c from its parent's children.
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
if err == nil {
panic("context: internal error: missing cancel error")
}
c.mu.Lock()
if c.err != nil {
c.mu.Unlock()
return // already canceled
}
c.err = err
if c.done == nil {
c.done = closedchan
} else {
close(c.done)
}
for child := range c.children {
// NOTE: acquiring the child's lock while holding parent's lock.
child.cancel(false, err)
}
c.children = nil
c.mu.Unlock()

if removeFromParent {
removeChild(c.Context, c)
}
}
  • 如果cancelCtx(父协程)被调用cancel方法,cancel会调用childeren里面每个子Context(子协程)的cancel方法。

  • 如果当前的cancelCtx是一个子Context,它被取消了,其父 Context 的children中也就没有必要再存储该子Context了。这通过调用了removeChild来实现:根据存储的父 Context 向上一层层的找(由 parentCancelCtx 实现),如果父Context是已知的 cancelCtxtimerCtx类型就从children中删除,如果是valueCtx类型则继续向上层查找其父Context

    // parentCancelCtx returns the underlying *cancelCtx for parent.
    // It does this by looking up parent.Value(&cancelCtxKey) to find
    // the innermost enclosing *cancelCtx and then checking whether
    // parent.Done() matches that *cancelCtx. (If not, the *cancelCtx
    // has been wrapped in a custom implementation providing a
    // different done channel, in which case we should not bypass it.)
    func parentCancelCtx(parent Context) (*cancelCtx, bool) {
    done := parent.Done()
    if done == closedchan || done == nil {
    return nil, false
    }
    p, ok := parent.Value(&cancelCtxKey).(*cancelCtx)
    if !ok {
    return nil, false
    }
    p.mu.Lock()
    ok = p.done == done
    p.mu.Unlock()
    if !ok {
    return nil, false
    }
    return p, true
    }

    // removeChild removes a context from its parent.
    func removeChild(parent Context, child canceler) {
    p, ok := parentCancelCtx(parent)
    if !ok {
    return
    }
    p.mu.Lock()
    if p.children != nil {
    delete(p.children, child)
    }
    p.mu.Unlock()
    }

WithCancel方法

WithCancel帮助创建一个子cancelCtx,并保证父Context取消时该新建的子cancelCtx也能被通知取消。

  • propagateCancel 根据传入的父Context值沿着树向上查找到cancelCtx类型的节点,将新建的子cancelCtx加入到该节点的 children中。如果发现父 Context 已经取消了,那么会立刻将当前新产生的子 Context 也取消掉。

  • 如果找不到cancelCtx类型的节点的话,那么就要新启一个协程等待父Context被取消的时候明确调用新产生的子 cancelCtx的取消函数,从而将parent和子cancelCtx组织成一树形结构。

// WithCancel returns a copy of parent with a new Done channel. The returned
// context's Done channel is closed when the returned cancel function is called
// or when the parent context's Done channel is closed, whichever happens first.
//
// Canceling this context releases resources associated with it, so code should
// call cancel as soon as the operations running in this Context complete.
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
if parent == nil {
panic("cannot create context from nil parent")
}
c := newCancelCtx(parent)
propagateCancel(parent, &c)
return &c, func() { c.cancel(true, Canceled) }
}

// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
done := parent.Done()
if done == nil {
return // parent is never canceled
}

select {
case <-done:
// parent is already canceled
child.cancel(false, parent.Err())
return
default:
}

if p, ok := parentCancelCtx(parent); ok {
p.mu.Lock()
// 找到父 cancelCtx
if p.err != nil {
// parent has already been canceled
child.cancel(false, p.err)
} else {
if p.children == nil {
p.children = make(map[canceler]struct{})
}
p.children[child] = struct{}{}
}
p.mu.Unlock()
} else {
atomic.AddInt32(&goroutines, +1)
// 找不到父 cancelCtx,新建goroutine去等待
go func() {
select {
case <-parent.Done():
child.cancel(false, parent.Err())
case <-child.Done():
}
}()
}
}

Context树:
context_tree

timerCtx

timerCtx是超时自动取消的Context,内部使用cancelCtx实现取消功能,它增加了定时器Timer,定时调用cancle函数实现该功能。
但是如果其父 Context 也有超时过期的取消功能,且父 Context 的超时时间点在传入的时间点之前,那么就没有必要再使用 timerCtx 生成子 Context 了,使用 WithCancel 就可以了。

// A timerCtx carries a timer and a deadline. It embeds a cancelCtx to
// implement Done and Err. It implements cancel by stopping its timer then
// delegating to cancelCtx.cancel.
type timerCtx struct {
cancelCtx
timer *time.Timer // Under cancelCtx.mu.

deadline time.Time
}

func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
return c.deadline, true
}

func (c *timerCtx) cancel(removeFromParent bool, err error) {
c.cancelCtx.cancel(false, err)
if removeFromParent {
// Remove this timerCtx from its parent cancelCtx's children.
removeChild(c.cancelCtx.Context, c)
}
c.mu.Lock()
if c.timer != nil {
c.timer.Stop()
c.timer = nil
}
c.mu.Unlock()
}

下面是WithTimeOutWithCancel函数。

// WithTimeout returns WithDeadline(parent, time.Now().Add(timeout)).
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
return WithDeadline(parent, time.Now().Add(timeout))
}

// WithDeadline returns a copy of the parent context with the deadline adjusted
// to be no later than d. If the parent's deadline is already earlier than d,
// WithDeadline(parent, d) is semantically equivalent to parent. The returned
// context's Done channel is closed when the deadline expires, when the returned
// cancel function is called, or when the parent context's Done channel is
// closed, whichever happens first.
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
if parent == nil {
panic("cannot create context from nil parent")
}
if cur, ok := parent.Deadline(); ok && cur.Before(d) {
// The current deadline is already sooner than the new one.
return WithCancel(parent)
}
c := &timerCtx{
cancelCtx: newCancelCtx(parent),
deadline: d,
}
propagateCancel(parent, c)
dur := time.Until(d)
if dur <= 0 {
c.cancel(true, DeadlineExceeded) // deadline has already passed
return c, func() { c.cancel(false, Canceled) }
}
c.mu.Lock()
defer c.mu.Unlock()
if c.err == nil {
c.timer = time.AfterFunc(dur, func() {
c.cancel(true, DeadlineExceeded)
})
}
return c, func() { c.cancel(true, Canceled) }
}

valueCtx

valueCtx内部仍然使用Context存储父Context的指针,并用interface{}存储键值。
如果当前valueCtx找不到需要的key,会沿着树向上一直查找直到根节点,类似链表的搜索。

// A valueCtx carries a key-value pair. It implements Value for that key and
// delegates all other calls to the embedded Context.
type valueCtx struct {
Context
key, val interface{}
}

func (c *valueCtx) Value(key interface{}) interface{} {
if c.key == key {
return c.val
}
return c.Context.Value(key)
}

使用WithValue创建时,会判断key是否实现Comparable接口。如果没有实现,会触发panic

// Use context Values only for request-scoped data that transits processes and
// APIs, not for passing optional parameters to functions.
//
// The provided key must be comparable and should not be of type
// string or any other built-in type to avoid collisions between
// packages using context. Users of WithValue should define their own
// types for keys. To avoid allocating when assigning to an
// interface{}, context keys often have concrete type
// struct{}. Alternatively, exported context key variables' static
// type should be a pointer or interface.
func WithValue(parent Context, key, val interface{}) Context {
if parent == nil {
panic("cannot create context from nil parent")
}
if key == nil {
panic("nil key")
}
if !reflectlite.TypeOf(key).Comparable() {
panic("key is not comparable")
}
return &valueCtx{parent, key, val}
}

注释里写了,key的类类型不应该是内置类型,以避免冲突。使用的时候应该自定义类型:

func main() {
ProcessRequest("jane","abc123")
}

type ctxKey int

const (
ctxUserID ctxKey = iota
ctxAuthToken
)

func UserID(c context.Context)string{
return c.Value(ctxUserID).(string)
}

func AuthToken(c context.Context)string{
return c.Value(ctxAuthToken).(string)
}

func ProcessRequest(userID,authToken string){
ctx := context.WithValue(context.Background(),ctxUserID,userID)
ctx = context.WithValue(ctx,ctxAuthToken,authToken)
HandleRequest(ctx)
}

func HandleRequest(ctx context.Context){
fmt.Printf("handling response for %v (%v)",
UserID(ctx),
AuthToken(ctx),
)
}

我的公众号:lyp分享的地方

我的知乎专栏: https://zhuanlan.zhihu.com/c_1275466546035740672

我的博客:www.liangyaopei.com
Github Page: https://liangyaopei.github.io/


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK