3

Go Context 原理详解 - 木的树

 2 years ago
source link: https://www.cnblogs.com/dojo-lzz/p/16214261.html
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

实现一个小目标

很开心的一件事,学习了一个月的后端拿到一个13k的offer,今年年底目标拿到一个30k的go方向offer。
0
好了回归正文,这篇文章是回答交流时一个老哥的问题,跟go的context相关内容,上一篇(https://www.cnblogs.com/dojo-lzz/p/16183006.html)讲了一些基础知识,这一篇继续在并发处理上进行研究。主要是Go Context的使用、原理。因为时间和精力有限,所以文章中大量引用相关资料中的内容以及图片,再此致敬。

Go Context

React中Context主要用来跨组件传递一些数据,Go中Context其中一个作用也跟传递数据有关,不过是在goroutine中相互传递数据;Context的另一个作用在于可以便捷关闭被创建出来的goroutine。
在实际中当服务器端收到一个请求时,很可能需要发送几个请求去请求其他服务的数据,由于Go 语法上的同步阻塞写法,我们一般会创建几个goroutine并发去做一些事情;那么这时候很可能几个goroutine之间需要共享数据,还有当request被取消时,创建的几个goroutine也应该被取消掉。那么这就是Go Context的用武之地。
关于协程泄露:
一般main函数是主协程,主协程执行完毕后子协程也会被销毁;但是对于服务来说,主协程不会执行完毕就退出。
所以如果每个请求都自己创建协程,而协程有没有受到完毕信息结束信息,可能处于阻塞状态,这种情况下才会产生协程泄露
context包中核心是Context接口:
type Context interface {
    Deadline() (deadline time.Time, ok bool)
    Done() <-chan struct{}
    Err() error
    Value(key interface{}) interface{}
}
  • Deadline 方法返回当前Context被取消的时间,也就是完成工作的截止时间(deadline);
  • Done方法需要返回一个channel,这个Channel会在当前工作完成或者上下文被取消之后关闭,可以在子goroutine中利用select进行监控,来回收子goroutine;多次调用Done方法会返回同一个Channel;
// Done is provided for use in select statements:
//  // Stream generates values with DoSomething and sends them to out
//  // until DoSomething returns an error or ctx.Done is closed.
//  func Stream(ctx context.Context, out chan<- Value) error {
//      for {
//          v, err := DoSomething(ctx)
//          if err != nil {
//              return err
//          }
//          select {
//          case <-ctx.Done():
//              return ctx.Err()
//          case out <- v:
//          }
//      }
//  }
// See https://blog.golang.org/pipelines for more examples of how to use
// a Done channel for cancellation.
  • Err方法会返回当前Context结束的原因,它只会在Done返回的Channel被关闭时才会返回空值:
    • 如果当前Context被取消就会返回Canceled错误;
    • 如果当前Context超时就会返回DeadlineExceeded错误;
  • Value 方法会从Context中返回键对应的值,对于同一个上下文来说,多次调用Value并传入相同的Key会返回相同的结果,该方法仅用于传递跨API和进程间跟请求域的数据。
//     // Package user defines a User type that's stored in Contexts.
//     package user
//     import "context"
//     // User is the type of value stored in the Contexts.
//     type User struct {...}
//
//     // key is an unexported type for keys defined in this package.
//     // This prevents collisions with keys defined in other packages.
//     type key int
//     // userKey is the key for user.User values in Contexts. It is
//     // unexported; clients use user.NewContext and user.FromContext
//     // instead of using this key directly.
//     var userKey key

//     // NewContext returns a new Context that carries value u.
//     func NewContext(ctx context.Context, u *User) context.Context {
//         return context.WithValue(ctx, userKey, u)
//     }
//     // FromContext returns the User value stored in ctx, if any.
//     func FromContext(ctx context.Context) (*User, bool) {
//         u, ok := ctx.Value(userKey).(*User)
//         return u, ok
//     }
ctx.Value(userKey).(*User)这里是Go语言中的类型断言(http://c.biancheng.net/view/4281.html
value, ok := x.(T)
x 表示一个接口的类型,T 表示一个具体的类型(也可为接口类型)
该断言表达式会返回 x 的值(也就是 value)和一个布尔值(也就是 ok),可根据该布尔值判断 x 是否为 T 类型:

如果 T 是具体某个类型,类型断言会检查 x 的动态类型是否等于具体类型 T。如果检查成功,类型断言返回的结果是 x 的动态值,其类型是 T。
如果 T 是接口类型,类型断言会检查 x 的动态类型是否满足 T。如果检查成功,x 的动态值不会被提取,返回值是一个类型为 T 的接口值。
无论 T 是什么类型,如果 x 是 nil 接口值,类型断言都会失败。
在context包中Context一个接口有四个具体实现和六个函数:
0

emptyCtx

emptyCtx本质是一个整型类型,他对Context接口的实现,非常简单,其实是什么也没做,都是一堆空方法:
// An emptyCtx is never canceled, has no values, and has no deadline. It is not
// struct{}, since vars of this type must have distinct addresses.
type emptyCtx int

func (*emptyCtx) Deadline() (deadline time.Time, ok bool) {
    return
}

func (*emptyCtx) Done() <-chan struct{} {
    return nil
}

func (*emptyCtx) Err() error {
    return nil
}

func (*emptyCtx) Value(key any) any {
    return nil
}

func (e *emptyCtx) String() string {
    switch e {
    case background:
        return "context.Background"
    case todo:
        return "context.TODO"
    }
    return "unknown empty Context"
}

var (
    background = new(emptyCtx)
    todo       = new(emptyCtx)
)
这里的String方法挺有意思,因为下面中可以看到background和todo都是一个emptyContext所以,这里直接case进行对比background和todo;
// Background returns a non-nil, empty Context. It is never canceled, has no
// values, and has no deadline. It is typically used by the main function,
// initialization, and tests, and as the top-level Context for incoming
// requests.
func Background() Context {
    return background
}

// TODO returns a non-nil, empty Context. Code should use context.TODO when
// it's unclear which Context to use or it is not yet available (because the
// surrounding function has not yet been extended to accept a Context
// parameter).
func TODO() Context {
    return todo
}

cancelCtx

通过WithCancel来创建的就是cancelCtx,WithCancel返回一个ctx和cancel方法,通过调用cancel方法,可以将Context取消,来控制协程,具体看下面例子:
在这个例子中,通过defer调用cancel,在FixLeakingByContext函数结束时去掉context,在CancelByContext中配合select和context的done方式来使用,可以避免协程资源没有被回收引起的内存泄露。
func FixLeakingByContex() {
    //创建上下文用于管理子协程
    ctx, cancel := context.WithCancel(context.Background())
    
    //结束前清理未结束协程
    defer cancel()
    
    ch := make(chan int)
    go CancelByContext(ctx, ch)
    go CancelByContext(ctx, ch)
    go CancelByContext(ctx, ch)
    
    // 随机触发某个子协程退出
    ch <- 1
}

func CancelByContext(ctx context.Context, ch chan (int)) int {
    select {
    case <-ctx.Done():
        //fmt.Println("cancel by ctx.")
        return 0
    case n := <-ch :
        return n
    }
}
看下WithCancel的源码:
func WithCancel(parent Context) (ctx Context, cancel CancelFunc) {
    if parent == nil {
        panic("cannot create context from nil parent")
    }
         // WithCancel通过一个父级Context来创建出一个cancelCtx
    c := newCancelCtx(parent)
         // 调用propagateCancel根据父级context的状态来关联cancelCtx的cancel行为
    propagateCancel(parent, &c)
         // 返回c和一个方法,方法中调用c.cancel并传递Canceled变量
    return &c, func() { c.cancel(true, Canceled) }
}

func newCancelCtx(parent Context) cancelCtx {
    return cancelCtx{Context: parent}
}
var Canceled = errors.New("context canceled")
WithCancel通过一个父级Context来创建出一个cancelCtx,然后调用propagateCancel根据父级context的状态来关联cancelCtx的cancel行为(感觉这里不应该叫propagate,冒泡一般理解是自下向上,这个函数明显是自下向上,应该叫cascade更为合理一些)。随后返回c和一个方法,方法中调用c.cancel并传递Canceled变量(其实是一个error实例);
cancelCtx是WidthDeadline和WidthTimeout的基石,所以cancelCtx的实现相对复杂,我们重点讲解。
newCancelCtx方法可以看到是创建了一个cancelCtx实例
func newCancelCtx(parent Context) cancelCtx {
    return cancelCtx{Context: parent}
}
我们也看下cancelCtx的定义:
// A cancelCtx can be canceled. When canceled, it also cancels any children
// that implement canceler.
type cancelCtx struct {
    Context // 内嵌结构体

    mu       sync.Mutex            // protects following fields
    done     atomic.Value          // of chan struct{}, created lazily, closed by first cancel call
    children map[canceler]struct{} // set to nil by the first cancel call
    err      error                 // set to non-nil by the first cancel call
}
cancelCtx有一个内嵌的Context类型,实际存储的都是父级上下文对象,还有四个独立的字段:
  • mu:一个互斥量,用来加锁保证某些操作的线程安全性
  • done:atomic.Value一个可以对任意类型进行原子型操作的结构;提供Load和Store方法;看Go源码这里存的是一个struct{}类型的channel
  • children:一个key为canceler值为struct{}的map类型;
  • err:存放error的字段
这里的cancelder是一个接口,代表可以直接被cancel的Context类型,基本指的是 *cancelCtx和 *timerCtx两种context,也被他俩实现
// A canceler is a context type that can be canceled directly. The
// implementations are *cancelCtx and *timerCtx.
type canceler interface {
    cancel(removeFromParent bool, err error)
    Done() <-chan struct{}
}
下面看下propagateCancel,据父级context的状态来关联cancelCtx的cancel行为
// propagateCancel arranges for child to be canceled when parent is.
func propagateCancel(parent Context, child canceler) {
    // 如果父元素的Done方法返回为空,也就是说父context是emptyCtx
    // 直接返回,因为父上下文不会做任何处理
    done := parent.Done()
    if done == nil {
        return // parent is never canceled
    }
    // 如果父上下文不是emptyCtx类型,使用select来判断一下父上下文的done channel是不是已经被关闭掉了
    // 关闭则调用child的cancel方法
    // select其实会阻塞,但这里给了一个default方法,所以如果父上下文的done channel没有被关闭则继续之心后续代码
    // 这里相当于利用了select的阻塞性来做if-else判断
    select {
    case <-done:
        // parent is already canceled
        child.cancel(false, parent.Err())
        return
    default:
    }

    // parentCancelCtx目的在于寻找父上下文中最底层的cancelCtx,因为像timerCtx等会内嵌cancelCtx
    if p, ok := parentCancelCtx(parent); ok {
        // 如果找的到,就把最内层的cancelCtx跟child的设置好关联关系
        // 这里要考虑到多线程环境,所以是加锁处理
        p.mu.Lock()
        if p.err != nil {
            // 如果祖先cancelCtx已经被取消了,那么也调用child的cancel方法
            // parent has already been canceled
            child.cancel(false, p.err)
        } else {
             // 这里设置内层cancelCtx与child的父子层级关系
            if p.children == nil {
                p.children = make(map[canceler]struct{})
            }
            p.children[child] = struct{}{}
        }
        p.mu.Unlock()
    } else {
        // 这里代表没有找到祖先cancelCtx,单启了一个协程来进行监听(因为select是阻塞的),如果父上下文的done 关闭了,则子上下文取消
        
        // goroutines在别的地方代码中没有使用,不知道为什么要做增加操作,看源码英文解释也是为了测试使用
                // 单独的协程会在阻塞完毕后被GC回收,不会有泄露风险                  
        atomic.AddInt32(&goroutines, +1)
        go func() {
            select {
            case <-parent.Done():
                child.cancel(false, parent.Err())
            case <-child.Done():
            }
        }()
    }
}
里面调用了一个parentCancelCtx函数,这个函数比较晦涩,市面上资料也还没有人去仔细研究,这里我来讲解一下;
这个函数中最重要的就是12行,通过cancelCtxKey获取最近的内嵌cancelCtx;然后让在propagateCancel中设置内嵌cancelCtx与child的关联关系;
同时这个函数也考虑了几种情况,如果parent的done已经是closedchan或者是nil那么没必要去拿内层的cancelCtx来建立层级关系,直接用parent本身与child做好关联cancel即可。这是9-11行代码干的事。
16行-19行,看源码解释是如果这个内嵌cancelCtx可能加了一些自定义方法,比如复写了Done或者cancel,那么它就不是这里的timerCtx、cancelCtx或者valueCtx,这种情况下用户自己负责处理;放到propagateCancel这个函数中就是把parent和child直接关联起来,不建立层级关系。及时子child自己cancel也不去跟parent的children有什么关联。
// parentCancelCtx returns the underlying *cancelCtx for parent.
// It does this by looking up parent.Value(&cancelCtxKey) to find
// the innermost enclosing *cancelCtx and then checking whether
// parent.Done() matches that *cancelCtx. (If not, the *cancelCtx
// has been wrapped in a custom implementation providing a
// different done channel, in which case we should not bypass it.)
func parentCancelCtx(parent Context) (*cancelCtx, bool) {
    done := parent.Done()
    if done == closedchan || done == nil {
        return nil, false
    }
    p, ok := parent.Value(&cancelCtxKey).(*cancelCtx)
    if !ok {
        return nil, false
    }
    pdone, _ := p.done.Load().(chan struct{})
    if pdone != done {
        return nil, false
    }
    return p, true
}
那么这里就有了一个问题,propagateCancel函数中一定建立parent和child的children关系么?我理解是不用的,因为这个else部分代码我理解完全可以实现父级上下文结束后,child也进行取消;我猜这里尽量建立children的map关系,是如果不这么做就要起一个goroutine来处理,相当于一个监护线程,goroutine资源的消耗以及调度成本,比单纯的children层级关系更大,所以这里尽力使用map结构来建立层级关系。这也可以看到作者在写代码时候还是很花心思去考量各种情况的。
    } else {
        // 这里代表没有找到祖先cancelCtx,单启了一个协程来进行监听(因为select是阻塞的),如果父上下文的done 关闭了,则子上下文取消
        
        // goroutines在别的地方代码中没有使用,不知道为什么要做增加操作,看源码英文解释也是为了测试使用
                // 单独的协程会在阻塞完毕后被GC回收,不会有泄露风险                  
        atomic.AddInt32(&goroutines, +1)
        go func() {
            select {
            case <-parent.Done():
                child.cancel(false, parent.Err())
            case <-child.Done():
            }
        }()
    }
接下来看下cancelCtx中Value、Done、Err以及私有方法cancel的实现;
Value方法
func (c *cancelCtx) Value(key any) any {
    if key == &cancelCtxKey {
        return c
    }
    return value(c.Context, key)
}
首先要介绍下cancelCtxKey,这是一个context包中的私有变量,当对cancelCtx调用Value方法并用这个key作为参数时,返回cancelCtx本身;
如果没有找到则是调用的context包中的私有方法value,来在父级上下文中key对应的值;
这个方法首先进行类型断言,判断Context是否是valueCtx、cancelCtx、timerCtx以及emptyCtx等;根据不同的类型做不同处理,比如cancelCtx和timerCtx先进行cancelCtxKey判断,emptyCtx直接返回nil,valueCtx则判断是否是自己实例化时候传入的key,否则就去自己的内层context也就是parent层级上冒泡获取对应的值。
func value(c Context, key any) any {
    for {
        switch ctx := c.(type) {
        case *valueCtx:
            if key == ctx.key {
                return ctx.val
            }
            c = ctx.Context
        case *cancelCtx:
            if key == &cancelCtxKey {
                return c
            }
            c = ctx.Context
        case *timerCtx:
            if key == &cancelCtxKey {
                return &ctx.cancelCtx
            }
            c = ctx.Context
        case *emptyCtx:
            return nil
        default:
            return c.Value(key)
        }
    }
}
Done方法
func (c *cancelCtx) Done() <-chan struct{} {
        // 返回atomic.Value中存储的值
    d := c.done.Load()
    if d != nil {
                 // atomic.Value类型的Load方法返回的是ifaceWords类型,所以这里是利用了类型断言
                 // 把ifaceWords类型转换为 struct类型的chan
        return d.(chan struct{})
    }
         // 这里是并发场景要考虑的问题,因为会存在多个线程并发进行的过程,所以不一定哪个goroutine就对c.done进行了修改
         // 所以这里不能直接像单线程一样,if d!=nil else。。。;首先得抢锁。
    c.mu.Lock()
    defer c.mu.Unlock()
    d = c.done.Load()
         // 上面抢锁的过程可能抢到了,也可能没抢到,所以到这里是抢到了锁,但是c.done未必还是nil;
         // 所以这里要再次做判断
    if d == nil {
        d = make(chan struct{})
        c.done.Store(d)
    }
    return d.(chan struct{})
}
看到上面锁的过程,发现并发情况的处理要比js这种单线程考虑的多得多。并发对一个变量的处理不能简单的if-else;要结合锁、CAS、原子操作一起考虑(对于atomic.Value中的ifaceWords的部分可以看这篇文章:https://www.cnblogs.com/dojo-lzz/p/16183006.html中原子操作部分)。
Err方法
func (c *cancelCtx) Err() error {
    c.mu.Lock()
    err := c.err
    c.mu.Unlock()
    return err
}
这个方法比较简单只是获取了cancelCtx的err属性,这个属性在cancel中会会被设置。
cancel方法
func (c *cancelCtx) cancel(removeFromParent bool, err error) {
    if err == nil {
        panic("context: internal error: missing cancel error")
    }
         // 因为后面要对c.err和c.done进行更新,所以这里要抢锁
    c.mu.Lock()
    if c.err != nil {
                 // if这部分放到锁的外部是否可以?看起来是可以的,但是如果放到外面,if判断不通过此时c.err为nil
                 // 接着进行抢锁,那么在抢到锁之后仍然要对c.err判断是否还是nil,才能进行更新
                 // 因为在抢锁过程中,可能c.err已经被某个协程修改了
                 // 所以把这部分放到锁之后是合理的。
        c.mu.Unlock()
        return // already canceled
    }
    c.err = err // 赋值
    d, _ := c.done.Load().(chan struct{})
 // 读取done的值
    if d == nil { 
                 // 如果done为nil,就把一个内部的closedchan存入c.done中;
                 // closedchan是一个channel类型,在context包的init函数中就会把它close掉
        c.done.Store(closedchan)
    } else {
        close(d)
    }         
         // 遍历c的children调用他们的cancel;
    for child := range c.children {
        // NOTE: acquiring the child's lock while holding parent's lock.
        child.cancel(false, err)
    }
    c.children = nil
    c.mu.Unlock()
         // 这部分没有在锁的代码中,是因为函数中会自己加锁?

    if removeFromParent {
        removeChild(c.Context, c)
    }
}
代码最后调用removeChild方法,这部分为什么没在c.mu锁中,我猜是因为这个函数的代码自己会进行锁的处理。
// removeChild removes a context from its parent.
func removeChild(parent Context, child canceler) {
    p, ok := parentCancelCtx(parent)
    if !ok {
        return
    }
    p.mu.Lock()
    if p.children != nil {
        delete(p.children, child)
    }
    p.mu.Unlock()
}
可以看到代码中的锁部分,是在第7行开始的,那么为什么parentCancelCtx没有被包含在锁中,这里猜测下,因为parentCancelCtx的主要目的是为了获取父级上下文内层的cancelCtx,而这个值是在实例化时候就已经确定的,这里只是读取所以可以不用放在互斥锁的临界区代码中,避免性能浪费。
接下来就是p.mu来抢锁,完成对层级结构的接触。

timerCtx

WithTimeout和WithDeadline创建的都是timerCtx,timerCtx内部内嵌了cancelCtx;
type timerCtx struct {
    cancelCtx
    timer *time.Timer // Under cancelCtx.mu.

    deadline time.Time
}
因为内嵌了cancelCtx,而cancelCtx实现了Done、Value、Err以及cancel(私有)方法,所以timerCtx上也可以直接调用这几个方法(http://c.biancheng.net/view/72.html);cancelCtx并未实现Deadline方法,但是emptyCtx实现了,如果他的父级上下文是emptyCtx那么cancelCtx也可以调用Deadline方法。
看完cancelCtx的方法之后,对比起来timerCtx的方法都比较简单,不做过多解释
func (c *timerCtx) Deadline() (deadline time.Time, ok bool) {
    return c.deadline, true
}

func (c *timerCtx) String() string {
    return contextName(c.cancelCtx.Context) + ".WithDeadline(" +
        c.deadline.String() + " [" +
        time.Until(c.deadline).String() + "])"
}

func (c *timerCtx) cancel(removeFromParent bool, err error) {
    c.cancelCtx.cancel(false, err)
    if removeFromParent {
        // Remove this timerCtx from its parent cancelCtx's children.
        removeChild(c.cancelCtx.Context, c)
    }
    c.mu.Lock()
    if c.timer != nil {
        c.timer.Stop()
        c.timer = nil
    }
    c.mu.Unlock()
}
可以看到cancel方法中先调用了内嵌的cancelCtx的cancel方法;然后利用cancelCtx的互斥锁抢锁来对c.timer进行操作修改;cancel方法第13-16行需要注意,因为withDeadline在创建时把parent和timerCtx建立了层级关系,所以这里根据条件进行移除操作。
下面来看下withDeadline函数:
func WithDeadline(parent Context, d time.Time) (Context, CancelFunc) {
    if parent == nil {
        panic("cannot create context from nil parent")
    }
         // 如果parent的deadline小于当前时间,直接创建cancelCtx,里面会调用propagateCancel方法
         // 来根据父上下文状态进行处理
    if cur, ok := parent.Deadline(); ok && cur.Before(d) {
        // The current deadline is already sooner than the new one.
        return WithCancel(parent)
    }
         // 创建timerCtx,这里可以看到cancelCtx是私有变量,而cancelCtx中的Context字段是公有变量
    c := &timerCtx{
        cancelCtx: newCancelCtx(parent),
        deadline:  d,
    }
         // 设置层级取消关联
    propagateCancel(parent, c)
    dur := time.Until(d)
         // 如果已经超时直接取消
    if dur <= 0 {
        c.cancel(true, DeadlineExceeded) // deadline has already passed
        return c, func() { c.cancel(false, Canceled) }
    }
    c.mu.Lock()
    defer c.mu.Unlock()
         // 如果没有超时并且没有被调用过cancel,那么设置timer,超时则调用cancel方法;         
    if c.err == nil {
        c.timer = time.AfterFunc(dur, func() {
            c.cancel(true, DeadlineExceeded)
        })
    }
    return c, func() { c.cancel(true, Canceled) }
}
了解上面内容之后,WithTimeout就很简单了,只是调用了WidthDeadline方法
func WithTimeout(parent Context, timeout time.Duration) (Context, CancelFunc) {
    return WithDeadline(parent, time.Now().Add(timeout))
}

valeCtx

这个结构体相对简单,有一个Context公共变量,一个任意类型的key和任意类型的any:
type valueCtx struct {
    Context
    key, val any
}
withValue方法也比较简单,这里就不做过多介绍
func WithValue(parent Context, key, val any) Context {
    if parent == nil {
        panic("cannot create context from nil parent")
    }
    if key == nil {
        panic("nil key")
    }
    if !reflectlite.TypeOf(key).Comparable() {
        panic("key is not comparable")
    }
    return &valueCtx{parent, key, val}
}
还有一个Value方法:
如果key与WithValue调用时相同,则返回对应的val,否则进入value方法,在内嵌的Context中查找key对应的值,这个方法上面介绍过,根据Context类型先做一些类型判断,来判断一些关键的key如cancelCtxKey,不然继续在内嵌Context中查找。
func (c *valueCtx) Value(key any) any {
    if c.key == key {
        return c.val
    }
    return value(c.Context, key)
}
本文大量引用了相关参考资料的图片和语言。版权问题请与我联系,侵删。

About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK