Golang---GMP調度策略


摘要:Go 能很好的在用戶空間支持並發模型,這也是 Go 如此火熱的原因,那今天我們來學習 Go 的調度機制。

數據結構

G 結構體

  G 是 goroutine 的縮寫,相當於操作系統中的進程控制塊,在這里就是 goroutine 的控制結構,是對 goroutine 的抽象,下面是 G 的結構(只列出了部分與調度有關的):

//用於保存上下文的 gobuf 結構體
type gobuf struct {
    sp   uintptr  //棧指針,上下文中的 sp 指針
    pc   uintptr  //程序計數器,上下文中的 pc 指針
    g    guintptr //指向當前 g  的指針
    ...
}
//用於表示一個等待鏈表上的 goroutine
type sudog struct {
    g *g  //阻塞列表上的 G

    next *sudog  //雙向鏈表后指針
    prev *sudog  //雙向鏈表前指針
    elem unsafe.Pointer //該 goroutine 的數據指針

    c        *hchan
    ...
}
基礎結構

下面是 G 結構體:

type g struct {
    stack       stack           // offset known to runtime/cgo

    m            *m            // current m; offset known to arm liblink
    sched        gobuf         //進程切換時,利用 sched 來保存上下文
    param        unsafe.Pointer // 用於傳遞參數,睡眠時其它 goroutine 設置 param, 喚醒時此 goroutine 可以獲取到
    goid         int64          //goroutine 的 ID號

    lockedm        muintptr  //G 被鎖定只能在這個 m 上運行
    gopc           uintptr   //創建這個goroutine 的go 表達式的 pc
    waiting        *sudog    //這個 g 當前正在阻塞的 sudog 結構體
}
G struct

M 結構體

  M 是 machine 的縮寫,是對機器的抽象,每個 m 都是對應到一條操作系統的物理線程。M 必須關聯了 P 才可以執行 Go 代碼,但是當它處理阻塞或者系統調用中時,可以不需要關聯 P。

type m struct {
    g0      *g     // 帶有調度棧的 goroutine(默認開啟一個進程的時候會開啟一個線程,又稱主線程(g0))

    mstartfn      func()   //執行函數體的
    curg          *g       //當前運行的 goroutine
    p             puintptr //為了執行 Go 代碼而獲取的 p(如果不需要執行 Go 代碼(syscall...),可為 nil)
    id            int64    //M 的 ID
    locks         int32
    park          note
    alllink       *m   //用於鏈接 allm(一個全局變量)
    schedlink     muintptr
    lockedg       guintptr  //某些情況下,goroutine 鎖定到當前 m, 而不會到切換到其它 m 中去
    createstack   [32]uintptr // stack that created this thread.

    nextwaitm     muintptr    //期望獲取鎖的下一個 m

    syscall   libcall        //存儲系統調用的參數
}
M struct

P 結構體

  P 是 Processor 的縮寫。結構體 P 的加入是為了提高 Go 程序的並發度。一共有 GOMAXPROCS(一般為 CPU 的核數) 個 P, 所有的 P 被組織成一個數組(allp), 在 P 上實現了工作流竊取的調度器。

type p struct {
    id          int32
    status      uint32 //P 狀態 pidle/prunning/...
    link        puintptr
    schedtick   uint32     // 每次執行 goroutine 調度 +1
    syscalltick uint32     // 每次執行系統調用 +1
    sysmontick  sysmontick // last tick observed by sysmon
    m           muintptr   // 鏈接到它的 m (nil if idle)
    mcache      *mcache
    pcache      pageCache

    // Queue of runnable goroutines. Accessed without lock.
    // P 執行 Go 代碼時,優先從自己這個局部隊列中取,這時可以不用加鎖,提高了並發度
    // 如果發現這個隊列是空,則去其它 P 的隊列中拿一半過來,實現工作流竊取的調度,這種情況需要給調度器加鎖
    runqhead uint32  //本地 G 隊列頭
    runqtail uint32  //本地 G 隊列尾
    runq     [256]guintptr  //本地 G 隊列
    runnext guintptr  //下一個准備好運行的 goroutine

    sudogcache []*sudog
    sudogbuf   [128]*sudog
    ...
}
P struct

Schedt 結構體

type schedt struct {
    lock mutex  //獲取調度器的鎖,是全局性的鎖(比如從全局隊列獲取 G, 此時必須要加鎖)
    
    midle        muintptr // 當前閑置的 m
    nmidle       int32    // 閑置的 m 的個數
    nmidlelocked int32    // 被鎖的閑置的 m 的數量
    mnext        int64    // 下一個 M 的 ID
    maxmcount    int32    // 最大允許的 M 的數量
    nmsys        int32    // 系統中除了死鎖剩余的 M 的數量
    nmfreed      int64    // 將要釋放的 m 的數量

    ngsys uint32          // 系統中的 goroutine 的數量

    pidle      puintptr   // 閑置的 P
    npidle     uint32     // 閑置的 P 的數量
    nmspinning uint32     // 自旋狀態的 M 的個數

    // Global runnable queue.
    runq     gQueue      // 全局的 goroutine 隊列
    runqsize int32       // 當前隊列大小
}
schedt

調度策略

  Go 進行並發的基本流程圖:

我們通過對調度源碼進行分析,來定位上圖中的 6 個步驟,看下具體體現:

  程序入口

// from asm_amd64.s
        CALL    runtime·args(SB)    //處理參數
    CALL    runtime·osinit(SB)  //獲取 cpu 核數
    CALL    runtime·schedinit(SB)  //調度器的初始化

    // create a new goroutine to start program
    MOVQ    $runtime·mainPC(SB), AX        // entry
    PUSHQ    AX
    PUSHQ    $0            // arg size
    CALL    runtime·newproc(SB)  //創建一個 main goroutine
    POPQ    AX
    POPQ    AX

    // start this M
    CALL    runtime·mstart(SB)  //啟動這個 main goroutine 調度系統

    CALL    runtime·abort(SB)    // mstart should never return
    RET
程序入口

  調度入口

func mstart() {
    _g_ := getg() //獲取當前執行的 goroutine
    mstart1()     //主要調度在 mstart1() 中
}

func mstart1() {
    _g_ := getg()  //只有聲明,沒有函數實體,被編譯器寫入,目前只有通過注釋來知道該函數的作用

    if _g_ != _g_.m.g0 {
        throw("bad runtime·mstart")
    }
    //...

    if fn := _g_.m.mstartfn; fn != nil {  //此處可以看到 M 中的 mstartfn 域為要執行的函數實體
        fn()
    }

    if _g_.m != &m0 {
        acquirep(_g_.m.nextp.ptr())
        _g_.m.nextp = 0
    }
    schedule()  //進行調度,是一個不會結束的函數
}
start schedule

  調度過程

// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
    _g_ := getg()

top:
    //...
    if gp == nil {
        // Check the global runnable queue once in a while to ensure fairness.
        // Otherwise two goroutines can completely occupy the local runqueue
        // by constantly respawning each other.
        if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {  //每61次調度檢查一下全局隊列,從全局隊列中取 G
            lock(&sched.lock)
            gp = globrunqget(_g_.m.p.ptr(), 1)
            unlock(&sched.lock)
        }
    }
    if gp == nil {    // 從本地隊列中取 G, 優先級最高
        gp, inheritTime = runqget(_g_.m.p.ptr())
        // We can see gp != nil here even if the M is spinning,
        // if checkTimers added a local goroutine via goready.
    }
    if gp == nil {   // 從別的地方去獲取可運行的 G,阻塞操作,直到找到可運行的 G
        gp, inheritTime = findrunnable() // blocks until work is available
    }

    // This thread is going to run a goroutine and is not spinning anymore,
    // so if it was marked as spinning we need to reset it now and potentially
    // start a new spinning M.
    if _g_.m.spinning {  //如果當前 M 是自旋狀態,我們需要重置它,或者開啟一個新的 M
        resetspinning()
    }

    execute(gp, inheritTime)  //開始執行 G
}
scheduling policy

  尋找可運行G 的過程

// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from local or global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
    _g_ := getg()

top:
    // local runq
    if gp, inheritTime := runqget(_p_); gp != nil { //從本地隊列中獲取G
        return gp, inheritTime
    }

    // global runq
    if sched.runqsize != 0 { //從全局隊列中獲取G
        lock(&sched.lock)
        gp := globrunqget(_p_, 0)
        unlock(&sched.lock)
        if gp != nil {
            return gp, false
        }
    }

    // 從 網絡I/O 中獲取 G
    if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
        if list := netpoll(0); !list.empty() { // non-blocking
            gp := list.pop()
            injectglist(&list)
            casgstatus(gp, _Gwaiting, _Grunnable)
            if trace.enabled {
                traceGoUnpark(gp, 0)
            }
            return gp, false
        }
    }

    // Steal work from other P's.
    procs := uint32(gomaxprocs)
    ranTimer := false
    for i := 0; i < 4; i++ {
        for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
            if sched.gcwaiting != 0 {
                goto top
            }
            stealRunNextG := i > 2      // first look for ready queues with more than 1 g
            p2 := allp[enum.position()] //從全局變量 allp 中隨機獲取一個 P
            // 從 p2 中偷取一半的 G,返回其中一個G
            if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil {
                return gp, false
            }
        }
    }
}
finding runable G

  執行 G 過程

// Schedules gp to run on the current M.
func execute(gp *g, inheritTime bool) {
    _g_ := getg()

    // Assign gp.m before entering _Grunning so running Gs have an
    // M.
    _g_.m.curg = gp
    gp.m = _g_.m  //綁定 M 准備運行
    casgstatus(gp, _Grunnable, _Grunning)  //修改 G 狀態為 Grunning
    // 給 G 一些變量賦值
    gp.waitsince = 0
    gp.preempt = false
    gp.stackguard0 = gp.stack.lo + _StackGuard

    gogo(&gp.sched)  // 真正執行調用的函數,sched 保存了上下文,只需要傳遞這個就可以執行了
}

//gogo 函數用匯編實現,主要是 從 g0 stack 切換到 g stack, JMP 到任務函數執行
//from asm_amd64.s
// func gogo(buf *gobuf)
// restore state from Gobuf; longjmp
TEXT runtime·gogo(SB), NOSPLIT, $16-8
MOVQ    buf+0(FP), BX        // gobuf
MOVQ    gobuf_g(BX), DX
MOVQ    0(DX), CX        // make sure g != nil
get_tls(CX)
MOVQ    DX, g(CX)
MOVQ    gobuf_sp(BX), SP    // restore SP
MOVQ    gobuf_ret(BX), AX
MOVQ    gobuf_ctxt(BX), DX
MOVQ    gobuf_bp(BX), BP
MOVQ    $0, gobuf_sp(BX)    // clear to help garbage collector
MOVQ    $0, gobuf_ret(BX)
MOVQ    $0, gobuf_ctxt(BX)
MOVQ    $0, gobuf_bp(BX)
MOVQ    gobuf_pc(BX), BX
JMP    BX  // longjmp 到任務函數執行
execute process

  退出過程

// Finishes execution of the current goroutine.
// 切換到 g0 執行退出操作
func goexit1() {
    //...
    mcall(goexit0)
}

// goexit continuation on g0.
func goexit0(gp *g) {
    _g_ := getg()

    casgstatus(gp, _Grunning, _Gdead)  // 修改 G 狀態為 Gdead
    if isSystemGoroutine(gp, false) {
        atomic.Xadd(&sched.ngsys, -1)
    }
    // 重置 G 的一些域
    gp.m = nil
    locked := gp.lockedm != 0
    gp.lockedm = 0
    _g_.m.lockedg = 0
    gp.preemptStop = false
    gp.paniconfault = false
    gp._defer = nil // should be true already but just in case.
    gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data.
    gp.writebuf = nil
    gp.waitreason = 0
    gp.param = nil
    gp.labels = nil
    gp.timer = nil

    dropg()  //與 M 解綁

    if GOARCH == "wasm" { // no threads yet on wasm
        gfput(_g_.m.p.ptr(), gp)
        schedule() // never returns
    }

    if _g_.m.lockedInt != 0 {
        print("invalid m->lockedInt = ", _g_.m.lockedInt, "\n")
        throw("internal lockOSThread error")
    }
    gfput(_g_.m.p.ptr(), gp)  // 將 G 放入本地 freelist, 如果太長,就放入全局 freelist

    schedule()  //繼續進行調度
}
goexit process

  小結

  通過上面的過程分析,可以知道 schedule() 是一個不會結束的函數,循環的過程是:schedule() 找到可運行的 G ------>  execute() 執行------>goexit() 退出------>schedule() 的循環過程,和上圖中的過程一致。

源碼分析

  對 M 進行分析

//from go/src/runtime/proc.go
func newm(fn func(), _p_ *p) {
    mp := allocm(_p_, fn)
    mp.nextp.set(_p_)
    mp.sigmask = initSigmask
    ...
    newm1(mp)  // 調用 newm1() 創建 M
}

func newm1(mp *m) {
    ...
    execLock.rlock() // Prevent process clone.
    newosproc(mp)    // 創建一個系統線程,所以我們可以把 M 看作系統線程
    execLock.runlock()
}
create M

  M 的狀態轉換:

  對 P 進行分析

func procresize(nprocs int32) *p {
    // Grow allp if necessary.
    if nprocs > int32(len(allp)) {
        // Synchronize with retake, which could be running
        // concurrently since it doesn't run on a P.
        lock(&allpLock)
        if nprocs <= int32(cap(allp)) {
            allp = allp[:nprocs]
        } else {
            nallp := make([]*p, nprocs)  //remark1: 創建 nprocs 個 p
            // Copy everything up to allp's cap so we
            // never lose old allocated Ps.
            copy(nallp, allp[:cap(allp)])
            allp = nallp
        }
        unlock(&allpLock)
    }

    // initialize new P's  //remark2: 對所有 P 進行初始化
    for i := old; i < nprocs; i++ {
        pp := allp[i]
        if pp == nil {
            pp = new(p)
        }
        pp.init(i)
        atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
    }
    //...
    var runnablePs *p
    for i := nprocs - 1; i >= 0; i-- {
        p := allp[i]
        if _g_.m.p.ptr() == p {
            continue
        }
        p.status = _Pidle   //把當前 P 置 pidle 狀態
        if runqempty(p) {
            pidleput(p)     //放入 schdet 的 pidle 空閑鏈表中
        } else {
            p.m.set(mget())  //如果有可運行的 P,得到 M 進行綁定, 放入可運行鏈表
            p.link.set(runnablePs)
            runnablePs = p
        }
    }
    return runnablePs
}
create P

  P 的狀態轉換:

  對 G 進行分析

   首先我們知道:執行 go func() ,編譯器會調用 newproc() 創建一個新 goroutine, 我們看一下具體步驟:

func newproc(siz int32, fn *funcval) {
    argp := add(unsafe.Pointer(&fn), sys.PtrSize)
    gp := getg()  //remark1: 獲取當前的 g
    pc := getcallerpc()
    systemstack(func() {
        newg := newproc1(fn, argp, siz, gp, pc)  // 調用 newproc1 進行生成 g

        _p_ := getg().m.p.ptr()
        runqput(_p_, newg, true)  //將 G 放入 P 所在的 M 的本地隊列中(也可能是全局隊列)

        if mainStarted {
            wakep()
        }
    })
}

//go:systemstack
func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) *g {
    _g_ := getg()  //獲取當前運行的 goroutine, 當前 G 生成下個 G(繼生代)

    siz := narg
    siz = (siz + 7) &^ 7

    if siz >= _StackMin-4*sys.RegSize-sys.RegSize {  //檢查入參大小
        throw("newproc: function arguments too large for new goroutine")
    }

    _p_ := _g_.m.p.ptr()
    newg := gfget(_p_)  //嘗試從 p 的 freelist 中獲取一個 G
    if newg == nil {
        newg = malg(_StackMin)  //如果獲取不到,新建一個
        casgstatus(newg, _Gidle, _Gdead)  //置狀態為 Gdead
        allgadd(newg) // 將新生成的 G 放入 allg
    }
    //...
    // 對 newG 進行初始化
    newg.sched.sp = sp
    newg.stktopsp = sp
    newg.sched.pc = funcPC(goexit) + sys.PCQuantum //保存 goexit 的地址到 pc,執行完成 fn 之后,可以直接跳到 goexit 函數執行
    newg.sched.g = guintptr(unsafe.Pointer(newg))
    gostartcallfn(&newg.sched, fn)  //設置函數實體
    newg.gopc = callerpc
    newg.ancestors = saveAncestors(callergp)
    newg.startpc = fn.fn
    
    casgstatus(newg, _Gdead, _Grunnable)  //更改 G 狀態從 Gdead 到 Grunnable
    //...
    newg.goid = int64(_p_.goidcache)     //自增 ID

    return newg
}
create G

  G 的狀態轉換:

Go為何快?

Go 非常輕量

    Go 非常輕量,主要體現在如下兩個方面:

   1:上下文切換代價小。Goroutine  可以理解為用戶空間的調度,用 sched 保存 Goroutine 的 上下文狀態;而對比 OS 線程的上下文切換則需要涉及模式切換(從用戶態到內核態) 更輕量;

    2:內存占用少。線程棧空間通常是 2M, Goroutine 棧空間最小 2K; Golang 程序中可以輕松支持 10W 級別的 Goroutine 運行,而線程數量達到 1K 時, 內存占用就已經達到 2G。

充分利用線程的計算資源

     1:任務竊取。由於現實情況是有的 Goroutine 運行的快,有的滿,那么勢必肯定會帶來的問題就是,忙的忙死,閑的閑死,為調高整體處理效率,當每個 P 之間的 G 任務不均衡時,調度器允許從 GRQ, 或者其它 P 的 LRQ 中獲取 G 運行。

    2:減少阻塞。如果正在執行的 Goroutine 阻塞了線程 M 怎么辦?P 上的 LRQ 中的 Goroutine 會獲取不到調度么?

    在 Go 里面阻塞主要分為以下四種場景:

    A: 由於原子、互斥操作或通道操作導致 Goroutine 阻塞。調度器將把當前阻塞的 Goroutine 切換出去,重新調度 LRQ 上的其它 Goroutine;

    B:由於網絡請求和 I/O 操作導致 Goroutine 阻塞。Go 程序提供了網絡輪詢器(NetPoller) 來處理網絡請求和 I/O 操作的問題,其后台通過 kqueue(MacOS), epoll(Linux) 或 iocp(Windows) 來實現多路復用。

    C: 當調用一些系統方法的時候,如果系統方法調用的時候發生阻塞,這種情況下,NetPoller 將無法使用,進行系統調用的 Goroutine 將阻塞當前 M。調度器(Sched)將會介入, 分離 M 和 P, 同時也將 G 帶走(G, M 在一起), 然后調度器引入新的 M1 來服務 P, 此時,可以從 LRQ 中選擇另外的 G1 並在 M1 上進行上下文切換。

    D: 如果在 Goroutine 去執行一個 sleep 操作, 導致 M 被阻塞了。Go 程序后台有一個監控線程 sysmon, 它監控哪些長時間運行的 G任務,然后設置可以強占的標識符,別的 Goroutine 就可以搶先進來執行。

總結

   runtime 准備好 G, M, P, 然后 M 綁定 P, M 從各種隊列中獲取 G, 切換到 G 的執行棧上並執行 G 上的任務函數,調用 goexit 做清理工作並回到 M, 如此反復。

參考文獻

https://cloud.tencent.com/developer/article/1069239

[1] goroutine 的調度 【Go 夜讀】

[2] Go1.5 源碼剖析.pdf   雨痕

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM