5

【2-3 Golang】Go并发编程—调度器schedule

 1 year ago
source link: https://studygolang.com/articles/35882
Go to the source link to view the article. You can view the picture content, updated content and better typesetting reading experience. If the link is broken, please click the button below to view the snapshot at that time.
neoserver,ios ssh client

  我们一直提到,每一个线程都有一个线程栈,也称为系统栈;协程g0就运行在这个栈上,而且协程g0执行的就是调度逻辑schedule。Go语言调度器是如何管理以及调度这些成千上万个协程呢?和操作系统一样,维护着可运行队列和阻塞队列吗,有没有所谓的按照时间片或者是优先级或者是抢占式调度呢?

调度器schedule

  我们已经知道每一个P都有一个协程队列runq,该队列存储的都是处于可运行状态的协程,调度器一般情况下只需要从当前p.runq获取协程即可;另外,Go语言为了避免多个P负载分配不均衡,还有一个全局队列sched.runq,如果当前p.runq队列为空,也会从全局队列sched.runq尝试获取协程;如果还为获取不到可执行协程,甚至会从其他P的队列去偷。

  当然,无论是p.runq,还是全局的sched.runq,存储的都是处于可运行状态的协程;那处于阻塞状态的协程呢,这些协程在调度器执行的时候,还处于阻塞状态码?不知道,所以在获取不到可执行协程时,还会尝试去看一下有没有协程解除阻塞了,如果有则还可以调度执行这些协程。

  调度器schedule看着比较简单,获取可运行协程,通过execute调度执行该协程:

func schedule() {
    // schedtick调度计数器,没调度以此加1
    // 调度器周期61次,首先从全局队列获取可运行协程
    if gp == nil {
        if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
            lock(&sched.lock)
            gp = globrunqget(_g_.m.p.ptr(), 1)
            unlock(&sched.lock)
        }
    }
    if gp == nil {
        gp, inheritTime = runqget(_g_.m.p.ptr())
    }
    if gp == nil {
        gp, inheritTime = findrunnable() // blocks until work is available
    }

    //调度执行
    execute(gp, inheritTime)
}

  按照之前我们说的,先查找当前P的协程队列,再查找全局队列,但是这样可能会导致全局队列的协程长时间得不到调度,所以Go语言调度器每执行61次,都会优先从全局队列获取可运行协程。注意,在查找全局队列的时候,存在多线程并发问题,所以是需要先加锁的。findrunnable是一个比较复杂的函数,看注释"blocks until work is available",获取不到协程时,甚至会block(当前线程M暂停)。execute当然就是切换栈,执行当前协程了。

func execute(gp *g, inheritTime bool) {
    //设置协程g与M的互相引用关系
    _g_ := getg()
    _g_.m.curg = gp
    gp.m = _g_.m
    //协程状态:运行中
    casgstatus(gp, _Grunnable, _Grunning)

    //协程切换
    gogo(&gp.sched)
}

  gogo我们上一篇文章已经介绍过了,纯汇编代码写的,完成了栈桢的切换,以及代码的跳转。不知道你有没有注意到第二个参数inheritTime,这是什么含义呢?表示这次协程执行是否继承上一个协程的时间片。假如时间片为10ms,上一个协程已经执行了5ms,如果继承,则标明这一个协程最多只能执行5ms,时间片就会结束,从而再次调度其他协程。这么说Go语言调度器是有时间片的概念了?我们先保留一个疑问。

  Go语言什么时候执行调度schedule呢?程序刚启动肯定会执行,而协程因为某些原因阻塞了(chan的读写,socket的读写等等),或者是协程执行结束了,这时候也是需要重新调度其他协程的;协程阻塞通常是通过runtime.gopark函数完成的:

func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
    // 切换到系统栈,执行park_m
    mcall(park_m)
}

func park_m(gp *g) {
    //协程状态:阻塞
    casgstatus(gp, _Grunning, _Gwaiting)

    //重新调度
    schedule()
}

  协程阻塞之后,想恢复协程的调度呢?与gopark对应的,runtime.goready函数用于恢复协程的调度:

func goready(gp *g, traceskip int) {
    systemstack(func() {
        ready(gp, traceskip, true)
    })
}

func ready(gp *g, traceskip int, next bool) {
    //更改状态为可运行;添加到P的协程队列
    casgstatus(gp, _Gwaiting, _Grunnable)
    runqput(_g_.m.p.ptr(), gp, next)
    wakep()
    releasem(mp)
}

协作式抢占调度

  Go语言调度器到底有时间片的概念吗?其实我们可以通过一个小程序测试一下:

package main

import (
    "fmt"
    "runtime"
)

func main() {
    //设置P的数目为1
    runtime.GOMAXPROCS(1)
    go func() {
        fmt.Println("hello world")
        for {
            //死循环
        }

    }()
    //main协程主动让出
    runtime.Gosched()
    fmt.Println("main end")
}

  上一篇文章讲解协程创建的时候提到,go关键字创建协程时,只是将该协程添加到当前P的队列,并没有调度执行;所以,为了避免主协程执行打印语句结束后程序退出,我们可以通过runtime.Gosched函数使得main协程主动让出CPU,这样Go调度器就能先调度执行其他协程了。另外,我们通过runtime.GOMAXPROCS设置P的数目为1,即最多只能有一个线程M绑定P,即最多只能有一个调度器运行。这样,如果Go语言调度器没有时间片的概念,则一旦子协程执行到循环,就会一直执行死循环,导致调度器再也没有机会调度其他协程了;最终的现象就是main协程的打印语句无法执行。

  执行结果怎么样呢?如果你是在Go1.18环境运行该程序,你会发现正常输出了"main end";但是如果你是在Go1.13版本及以下运行该程序,你将发现程序一直执行,没有输出"main end"。你可以下载两个版本的Go试一试,看看结果是不是这样的。Go1.13版本及以下不会输出,是否说明Go1.13版本及以下,没有时间片的概念?其实也不然。你可以再试试下面这个程序:

package main

import (
    "fmt"
    "runtime"
)

func main() {
    runtime.GOMAXPROCS(1)

    go func() {
        fmt.Println("hello world")
        var arr []int
        for i := 0; i < 100; i ++ {
            arr = append(arr, i)
        }
        for {
            test(arr)
        }

    }()
    runtime.Gosched()
    fmt.Println("main end")
}

func test(arr []int) []int {
    diff := make([]int, len(arr), len(arr))
    diff[0] = arr[0]
    for i := 1; i < len(arr); i ++ {
        diff[i] = arr[i] - arr[i - 1]
    }
    return diff
}

  这一次我们的死循环不是简单的空语句,而是函数调用,而且test函数也有一些稍微复杂的语句。Go1.13版本及以下再执行这个程序试试呢?你会发现,神奇的是,主协程又输出了"main end"。为什么呢?唯一不同的是第一个程序的死循环只是简单的空语句,第二个程序的循环是函数调用!

  怎么,函数调用就特殊?是的,函数调用就是不同于普通语句,就是特殊。Go语言在编译函数的时候,还添加了一些自己的代码。还记得上一篇文章,在介绍协程栈溢出时候提到,Go语言编译阶段,在所有用户函数,都加了一点代码逻辑,判断栈顶指针SP小于某个位置时,说明栈空间不足,需要扩容了。需要扩容的时候,执行的是函数runtime.morestack_noctxt,而该函数(其实是runtime.newstack)不仅仅是判断是否需要扩容,还会判断当前协程是否应该让出CPU。

  注意,Go语言并没有严格限制协程执行时间片,而是通过一种协作式抢占调度(1.13版本及以下)的方式,实现伪时间片功能。这就需要一个帮手了,Go程序启动时不止创建普通的调度线程,还存在辅助线程,辅助线程的主函数是runtime.sysmon,每10ms轮询一次,检测是否有协程执行时间过长,如果有,则通知该协程让出CPU。

//创建新线程,主函数sysmon
newm(sysmon, nil)

func sysmon() {
    delay = 10 * 1000   // up to 10ms
    usleep(delay)

    for {
        //preempt long running G's
        retake(nanotime())
    }
}

  preempt的意思是抢占。我们先思考两个问题:

  1)sysmon线程如何判断哪些协程执行时间过长?遍历协程吗?肯定不是这样。想想线程M调度协程流程,要求必须先绑定P,而且M正在调度执行的协程只有一个,所以呢?只需要遍历P,通过p.m.curg就能获取到正在执行的协程。接下来就是检测协程执行时间了,每个协程记录调度时间吗?貌似也行,Go语言为每一个P维护了p.schedtick,M每调度一次协程,该值加1,而且还有一个变量p.schedwhen记录了上次调度的时间。这就好办了,每10毫秒检测的时候,如果p.schedtick没法发生改变,说明这10ms内没有发生调度,则应该通知当前协程p.m.curg让出CPU了。

  2)sysmon线程如何跨线程通知该协程让出CPU呢?还记得协程栈扩容是怎么判断的吗?stackguard0!通知协程让出CPU也是通过在协程栈stackguard0位置设置特殊标识实现的。

  似乎整个流程通顺了,第一步,Go语言编译阶段在test函数添加一些自己的代码,如下:

"".test STEXT
    0x0000 00000 (test.go:26)    CMPQ    SP, 16(R14)
    0x0004 00004 (test.go:26)    PCDATA    $0, $-2
    0x0004 00004 (test.go:26)    JLS    404

    0x0194 00404 (test.go:26)    MOVQ    AX, 8(SP)
    0x0199 00409 (test.go:26)    MOVQ    BX, 16(SP)
    0x019e 00414 (test.go:26)    MOVQ    CX, 24(SP)
    0x01a3 00419 (test.go:26)    CALL    runtime.morestack_noctxt(SB)
    0x01b7 00439 (test.go:26)    JMP    0

  R14寄存器就是当前协程g,想想结构体g的第一个字段stack占16字节,第二个字段就是stackguard0,所以这里比较栈顶SP寄存器与16(R14)地址大小。那明白了,stackguard0位置处设置的特殊标识肯定是一个非常大的值,任何栈位置都小于该值。

  第二步,sysmon线程10ms周期执行retake函数抢占长时间执行的G:

func retake(now int64) uint32 {
    //遍历所有的P
    for i := 0; i < len(allp); i++ {
        _p_ := allp[i]
        s := _p_.status
        if s == _Prunning {
            t := int64(_p_.schedtick)
            //不等于,说明在这10ms期间重新调度协程了;
            if int64(pd.schedtick) != t {
                pd.schedtick = uint32(t)
                pd.schedwhen = now
                continue
            }
            //G长时间运行
            if pd.schedwhen+forcePreemptNS > now {
                continue
            }
            preemptone(_p_)
        }
    }

}

func preemptone(_p_ *p) bool {
    mp := _p_.m.ptr()
    gp := mp.curg
    if gp == nil || gp == mp.g0 {
        return false
    }

    //抢占标识
    gp.preempt = true
    gp.stackguard0 = stackPreempt
    return true
}

const forcePreemptNS = 10 * 1000 * 1000 // 10ms

stackPreempt = (1<<(8*sys.PtrSize) - 1) & -1314 //非常大

  一般情况下,每一个P都有可能有正在执行的协程p.m.curg,所以这里需要遍历所有的P,如果P的状态为_Prunning,说明该P已经被M绑定且正在调度协程。p.schedtick每调度一次协程值加1,所以检测时如果这个值与上次记录不一样,则说明这10ms期间肯定重新调度协程了,跳过即可。否则,如果距上次调度时间已经过去很久了,则通过preemptone抢占,看吧,抢占标识就是通过设置gp.stackguard0实现的。

  第三步,子协程执行10ms之后,进入到函数test,检测stackguard0标识,发现栈顶指针SP小于stackguard0,这时候跳转到了runtime.morestack_noctxt函数。函数morestack_noctxt也是汇编写的,一系列判断之后,最终调用了函数runtime.newstack,就是在这里,判断是否被抢占了,如果是,则让出CPU。

func newstack() {
    gp := getg().m.curg
    preempt := atomic.Loaduintptr(&gp.stackguard0) == stackPreempt
    if preempt {
        gopreempt_m(gp) // never return;抢占
    }
}

func gopreempt_m(gp *g) {
    //修改协程状态
    casgstatus(gp, _Grunning, _Grunnable)
    //添加到全局队列
    lock(&sched.lock)
    globrunqput(gp)
    unlock(&sched.lock)

    //重新调度
    schedule()
}

  newstack就是通过gp.stackguard0判断是否被抢占了。另外,我们发现协程被抢占时,被添加到了全局队列,这样相当于优先级降低了。

  这就是Go1.13版本及以下实现的协作式抢占调度,协程只有在进入函数时,才有可能检测是否被抢占了,所以死循环中只是简单的语句是无法抢占的。而且函数如果非常简单,还有可能被优化掉,所以你测试的时候,可能发现循环中调用了函数,但是运行结果却显示无法被抢占。

基于信号的抢占式调度

  Go1.13版本及以下是基于协作式的抢占调度,所以死循环中是简单的语句,还是复杂的函数调用,最终结果是不一样的。那Go1.14版本以上呢?好像无论哪一种情况,都能被抢占,是做了哪些优化吗?是的,Go1.14版本以上实现的是基于信号的抢占。

  信号?kill -signal pid发送的就是信号,比如我们常用SIGTERM信号终止进程,而Linux总共有64种信号可供选择。当然,程序想要接收并处理某种信号,还需要设置信号处理器:

struct sigaction{
       void (*sa_handler)(int); 
       sigset_t sa_mask; 
       int sa_flags;
       void (*sa_restorer)(void); 
}

  sa_hander就是我们的信号处理器函数指针。Go语言设置的信号处理函数为runtime.sighandler。

  下面看一下Go1.18实现的抢占逻辑,同样是函数preemptone:

func preemptone(_p_ *p) bool {
    //协作式抢占标记
    gp.stackguard0 = stackPreempt

    //如果支持信号抢占,发送信号
    if preemptMSupported {
        preemptM(mp)
    }

    return true
}

func preemptM(mp *m) {
    pthread_kill(pthread(mp.procid), sigPreempt)
}

const sigPreempt = _SIGURG

  函数pthread_kill可用于向指定线程发送信号,选择第几种信号也是有要求的,有很多信号有特殊含义,是不能随便使用的,Go语言选择的协程抢占信号是SIGURG。sysmon线程发送抢占信号,调度线程M就会收到信号,判断收到的是抢占信号,则换出当前协程,重新调度。

func sighandler(sig uint32, info *siginfo, ctxt unsafe.Pointer, gp *g) {
    //抢占信号
    if sig == sigPreempt {
        doSigPreempt(gp, c)
    }
}

  最终其实和协作式抢占一样,都是将当前协程添加到全局队列,触发调度,这里就不在赘述。

  本篇文章主要介绍了Go语言调度器,调度算法由runtime.schedule函数实现,程因为某些原因阻塞了(chan的读写,socket的读写等等),或者是协程执行结束了,都会触发重新调度。另外Go语言还支持抢占调度,辅助协程sysmon检测长时间执行协程,设置抢占标识或者发送抢占信号。Go1.13版本及以下实现的是协作式调度,普通死循环语句没有办法被抢占,只有执行函数调用时(Go编译阶段添加了一些代码),才有可能实现抢占,而Go1.14版本及以上,通过信号实现的抢占调度则没有这个问题。


About Joyk


Aggregate valuable and interesting links.
Joyk means Joy of geeK