摘要:Go 能很好的在用戶空間支持並發模型,這也是 Go 如此火熱的原因,那今天我們來學習 Go 的調度機制。
數據結構
G 結構體
G 是 goroutine 的縮寫,相當於操作系統中的進程控制塊,在這里就是 goroutine 的控制結構,是對 goroutine 的抽象,下面是 G 的結構(只列出了部分與調度有關的):
//用於保存上下文的 gobuf 結構體 type gobuf struct { sp uintptr //棧指針,上下文中的 sp 指針 pc uintptr //程序計數器,上下文中的 pc 指針 g guintptr //指向當前 g 的指針 ... } //用於表示一個等待鏈表上的 goroutine type sudog struct { g *g //阻塞列表上的 G next *sudog //雙向鏈表后指針 prev *sudog //雙向鏈表前指針 elem unsafe.Pointer //該 goroutine 的數據指針 c *hchan ... }
下面是 G 結構體:
type g struct { stack stack // offset known to runtime/cgo m *m // current m; offset known to arm liblink sched gobuf //進程切換時,利用 sched 來保存上下文 param unsafe.Pointer // 用於傳遞參數,睡眠時其它 goroutine 設置 param, 喚醒時此 goroutine 可以獲取到 goid int64 //goroutine 的 ID號 lockedm muintptr //G 被鎖定只能在這個 m 上運行 gopc uintptr //創建這個goroutine 的go 表達式的 pc waiting *sudog //這個 g 當前正在阻塞的 sudog 結構體 }
M 結構體
M 是 machine 的縮寫,是對機器的抽象,每個 m 都是對應到一條操作系統的物理線程。M 必須關聯了 P 才可以執行 Go 代碼,但是當它處理阻塞或者系統調用中時,可以不需要關聯 P。
type m struct { g0 *g // 帶有調度棧的 goroutine(默認開啟一個進程的時候會開啟一個線程,又稱主線程(g0)) mstartfn func() //執行函數體的 curg *g //當前運行的 goroutine p puintptr //為了執行 Go 代碼而獲取的 p(如果不需要執行 Go 代碼(syscall...),可為 nil) id int64 //M 的 ID locks int32 park note alllink *m //用於鏈接 allm(一個全局變量) schedlink muintptr lockedg guintptr //某些情況下,goroutine 鎖定到當前 m, 而不會到切換到其它 m 中去 createstack [32]uintptr // stack that created this thread. nextwaitm muintptr //期望獲取鎖的下一個 m syscall libcall //存儲系統調用的參數 }
P 結構體
P 是 Processor 的縮寫。結構體 P 的加入是為了提高 Go 程序的並發度。一共有 GOMAXPROCS(一般為 CPU 的核數) 個 P, 所有的 P 被組織成一個數組(allp), 在 P 上實現了工作流竊取的調度器。
type p struct { id int32 status uint32 //P 狀態 pidle/prunning/... link puintptr schedtick uint32 // 每次執行 goroutine 調度 +1 syscalltick uint32 // 每次執行系統調用 +1 sysmontick sysmontick // last tick observed by sysmon m muintptr // 鏈接到它的 m (nil if idle) mcache *mcache pcache pageCache // Queue of runnable goroutines. Accessed without lock. // P 執行 Go 代碼時,優先從自己這個局部隊列中取,這時可以不用加鎖,提高了並發度 // 如果發現這個隊列是空,則去其它 P 的隊列中拿一半過來,實現工作流竊取的調度,這種情況需要給調度器加鎖 runqhead uint32 //本地 G 隊列頭 runqtail uint32 //本地 G 隊列尾 runq [256]guintptr //本地 G 隊列 runnext guintptr //下一個准備好運行的 goroutine sudogcache []*sudog sudogbuf [128]*sudog ... }
Schedt 結構體
type schedt struct { lock mutex //獲取調度器的鎖,是全局性的鎖(比如從全局隊列獲取 G, 此時必須要加鎖) midle muintptr // 當前閑置的 m nmidle int32 // 閑置的 m 的個數 nmidlelocked int32 // 被鎖的閑置的 m 的數量 mnext int64 // 下一個 M 的 ID maxmcount int32 // 最大允許的 M 的數量 nmsys int32 // 系統中除了死鎖剩余的 M 的數量 nmfreed int64 // 將要釋放的 m 的數量 ngsys uint32 // 系統中的 goroutine 的數量 pidle puintptr // 閑置的 P npidle uint32 // 閑置的 P 的數量 nmspinning uint32 // 自旋狀態的 M 的個數 // Global runnable queue. runq gQueue // 全局的 goroutine 隊列 runqsize int32 // 當前隊列大小 }
調度策略
Go 進行並發的基本流程圖:

我們通過對調度源碼進行分析,來定位上圖中的 6 個步驟,看下具體體現:
程序入口
// from asm_amd64.s CALL runtime·args(SB) //處理參數 CALL runtime·osinit(SB) //獲取 cpu 核數 CALL runtime·schedinit(SB) //調度器的初始化 // create a new goroutine to start program MOVQ $runtime·mainPC(SB), AX // entry PUSHQ AX PUSHQ $0 // arg size CALL runtime·newproc(SB) //創建一個 main goroutine POPQ AX POPQ AX // start this M CALL runtime·mstart(SB) //啟動這個 main goroutine 調度系統 CALL runtime·abort(SB) // mstart should never return RET
調度入口
func mstart() { _g_ := getg() //獲取當前執行的 goroutine mstart1() //主要調度在 mstart1() 中 } func mstart1() { _g_ := getg() //只有聲明,沒有函數實體,被編譯器寫入,目前只有通過注釋來知道該函數的作用 if _g_ != _g_.m.g0 { throw("bad runtime·mstart") } //... if fn := _g_.m.mstartfn; fn != nil { //此處可以看到 M 中的 mstartfn 域為要執行的函數實體 fn() } if _g_.m != &m0 { acquirep(_g_.m.nextp.ptr()) _g_.m.nextp = 0 } schedule() //進行調度,是一個不會結束的函數 }
調度過程
// One round of scheduler: find a runnable goroutine and execute it. // Never returns. func schedule() { _g_ := getg() top: //... if gp == nil { // Check the global runnable queue once in a while to ensure fairness. // Otherwise two goroutines can completely occupy the local runqueue // by constantly respawning each other. if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 { //每61次調度檢查一下全局隊列,從全局隊列中取 G lock(&sched.lock) gp = globrunqget(_g_.m.p.ptr(), 1) unlock(&sched.lock) } } if gp == nil { // 從本地隊列中取 G, 優先級最高 gp, inheritTime = runqget(_g_.m.p.ptr()) // We can see gp != nil here even if the M is spinning, // if checkTimers added a local goroutine via goready. } if gp == nil { // 從別的地方去獲取可運行的 G,阻塞操作,直到找到可運行的 G gp, inheritTime = findrunnable() // blocks until work is available } // This thread is going to run a goroutine and is not spinning anymore, // so if it was marked as spinning we need to reset it now and potentially // start a new spinning M. if _g_.m.spinning { //如果當前 M 是自旋狀態,我們需要重置它,或者開啟一個新的 M resetspinning() } execute(gp, inheritTime) //開始執行 G }
尋找可運行G 的過程
// Finds a runnable goroutine to execute. // Tries to steal from other P's, get g from local or global queue, poll network. func findrunnable() (gp *g, inheritTime bool) { _g_ := getg() top: // local runq if gp, inheritTime := runqget(_p_); gp != nil { //從本地隊列中獲取G return gp, inheritTime } // global runq if sched.runqsize != 0 { //從全局隊列中獲取G lock(&sched.lock) gp := globrunqget(_p_, 0) unlock(&sched.lock) if gp != nil { return gp, false } } // 從 網絡I/O 中獲取 G if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 { if list := netpoll(0); !list.empty() { // non-blocking gp := list.pop() injectglist(&list) casgstatus(gp, _Gwaiting, _Grunnable) if trace.enabled { traceGoUnpark(gp, 0) } return gp, false } } // Steal work from other P's. procs := uint32(gomaxprocs) ranTimer := false for i := 0; i < 4; i++ { for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() { if sched.gcwaiting != 0 { goto top } stealRunNextG := i > 2 // first look for ready queues with more than 1 g p2 := allp[enum.position()] //從全局變量 allp 中隨機獲取一個 P // 從 p2 中偷取一半的 G,返回其中一個G if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil { return gp, false } } } }
執行 G 過程
// Schedules gp to run on the current M. func execute(gp *g, inheritTime bool) { _g_ := getg() // Assign gp.m before entering _Grunning so running Gs have an // M. _g_.m.curg = gp gp.m = _g_.m //綁定 M 准備運行 casgstatus(gp, _Grunnable, _Grunning) //修改 G 狀態為 Grunning // 給 G 一些變量賦值 gp.waitsince = 0 gp.preempt = false gp.stackguard0 = gp.stack.lo + _StackGuard gogo(&gp.sched) // 真正執行調用的函數,sched 保存了上下文,只需要傳遞這個就可以執行了 } //gogo 函數用匯編實現,主要是 從 g0 stack 切換到 g stack, JMP 到任務函數執行 //from asm_amd64.s // func gogo(buf *gobuf) // restore state from Gobuf; longjmp TEXT runtime·gogo(SB), NOSPLIT, $16-8 MOVQ buf+0(FP), BX // gobuf MOVQ gobuf_g(BX), DX MOVQ 0(DX), CX // make sure g != nil get_tls(CX) MOVQ DX, g(CX) MOVQ gobuf_sp(BX), SP // restore SP MOVQ gobuf_ret(BX), AX MOVQ gobuf_ctxt(BX), DX MOVQ gobuf_bp(BX), BP MOVQ $0, gobuf_sp(BX) // clear to help garbage collector MOVQ $0, gobuf_ret(BX) MOVQ $0, gobuf_ctxt(BX) MOVQ $0, gobuf_bp(BX) MOVQ gobuf_pc(BX), BX JMP BX // longjmp 到任務函數執行
退出過程
// Finishes execution of the current goroutine. // 切換到 g0 執行退出操作 func goexit1() { //... mcall(goexit0) } // goexit continuation on g0. func goexit0(gp *g) { _g_ := getg() casgstatus(gp, _Grunning, _Gdead) // 修改 G 狀態為 Gdead if isSystemGoroutine(gp, false) { atomic.Xadd(&sched.ngsys, -1) } // 重置 G 的一些域 gp.m = nil locked := gp.lockedm != 0 gp.lockedm = 0 _g_.m.lockedg = 0 gp.preemptStop = false gp.paniconfault = false gp._defer = nil // should be true already but just in case. gp._panic = nil // non-nil for Goexit during panic. points at stack-allocated data. gp.writebuf = nil gp.waitreason = 0 gp.param = nil gp.labels = nil gp.timer = nil dropg() //與 M 解綁 if GOARCH == "wasm" { // no threads yet on wasm gfput(_g_.m.p.ptr(), gp) schedule() // never returns } if _g_.m.lockedInt != 0 { print("invalid m->lockedInt = ", _g_.m.lockedInt, "\n") throw("internal lockOSThread error") } gfput(_g_.m.p.ptr(), gp) // 將 G 放入本地 freelist, 如果太長,就放入全局 freelist schedule() //繼續進行調度 }
小結
通過上面的過程分析,可以知道 schedule() 是一個不會結束的函數,循環的過程是:schedule() 找到可運行的 G ------> execute() 執行------>goexit() 退出------>schedule() 的循環過程,和上圖中的過程一致。
源碼分析
對 M 進行分析
//from go/src/runtime/proc.go func newm(fn func(), _p_ *p) { mp := allocm(_p_, fn) mp.nextp.set(_p_) mp.sigmask = initSigmask ... newm1(mp) // 調用 newm1() 創建 M } func newm1(mp *m) { ... execLock.rlock() // Prevent process clone. newosproc(mp) // 創建一個系統線程,所以我們可以把 M 看作系統線程 execLock.runlock() }
M 的狀態轉換:

對 P 進行分析
func procresize(nprocs int32) *p { // Grow allp if necessary. if nprocs > int32(len(allp)) { // Synchronize with retake, which could be running // concurrently since it doesn't run on a P. lock(&allpLock) if nprocs <= int32(cap(allp)) { allp = allp[:nprocs] } else { nallp := make([]*p, nprocs) //remark1: 創建 nprocs 個 p // Copy everything up to allp's cap so we // never lose old allocated Ps. copy(nallp, allp[:cap(allp)]) allp = nallp } unlock(&allpLock) } // initialize new P's //remark2: 對所有 P 進行初始化 for i := old; i < nprocs; i++ { pp := allp[i] if pp == nil { pp = new(p) } pp.init(i) atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp)) } //... var runnablePs *p for i := nprocs - 1; i >= 0; i-- { p := allp[i] if _g_.m.p.ptr() == p { continue } p.status = _Pidle //把當前 P 置 pidle 狀態 if runqempty(p) { pidleput(p) //放入 schdet 的 pidle 空閑鏈表中 } else { p.m.set(mget()) //如果有可運行的 P,得到 M 進行綁定, 放入可運行鏈表 p.link.set(runnablePs) runnablePs = p } } return runnablePs }
P 的狀態轉換:

對 G 進行分析
首先我們知道:執行 go func() ,編譯器會調用 newproc() 創建一個新 goroutine, 我們看一下具體步驟:
func newproc(siz int32, fn *funcval) { argp := add(unsafe.Pointer(&fn), sys.PtrSize) gp := getg() //remark1: 獲取當前的 g pc := getcallerpc() systemstack(func() { newg := newproc1(fn, argp, siz, gp, pc) // 調用 newproc1 進行生成 g _p_ := getg().m.p.ptr() runqput(_p_, newg, true) //將 G 放入 P 所在的 M 的本地隊列中(也可能是全局隊列) if mainStarted { wakep() } }) } //go:systemstack func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) *g { _g_ := getg() //獲取當前運行的 goroutine, 當前 G 生成下個 G(繼生代) siz := narg siz = (siz + 7) &^ 7 if siz >= _StackMin-4*sys.RegSize-sys.RegSize { //檢查入參大小 throw("newproc: function arguments too large for new goroutine") } _p_ := _g_.m.p.ptr() newg := gfget(_p_) //嘗試從 p 的 freelist 中獲取一個 G if newg == nil { newg = malg(_StackMin) //如果獲取不到,新建一個 casgstatus(newg, _Gidle, _Gdead) //置狀態為 Gdead allgadd(newg) // 將新生成的 G 放入 allg } //... // 對 newG 進行初始化 newg.sched.sp = sp newg.stktopsp = sp newg.sched.pc = funcPC(goexit) + sys.PCQuantum //保存 goexit 的地址到 pc,執行完成 fn 之后,可以直接跳到 goexit 函數執行 newg.sched.g = guintptr(unsafe.Pointer(newg)) gostartcallfn(&newg.sched, fn) //設置函數實體 newg.gopc = callerpc newg.ancestors = saveAncestors(callergp) newg.startpc = fn.fn casgstatus(newg, _Gdead, _Grunnable) //更改 G 狀態從 Gdead 到 Grunnable //... newg.goid = int64(_p_.goidcache) //自增 ID return newg }
G 的狀態轉換:

Go為何快?
Go 非常輕量
Go 非常輕量,主要體現在如下兩個方面:
1:上下文切換代價小。Goroutine 可以理解為用戶空間的調度,用 sched 保存 Goroutine 的 上下文狀態;而對比 OS 線程的上下文切換則需要涉及模式切換(從用戶態到內核態) 更輕量;
2:內存占用少。線程棧空間通常是 2M, Goroutine 棧空間最小 2K; Golang 程序中可以輕松支持 10W 級別的 Goroutine 運行,而線程數量達到 1K 時, 內存占用就已經達到 2G。
充分利用線程的計算資源
1:任務竊取。由於現實情況是有的 Goroutine 運行的快,有的滿,那么勢必肯定會帶來的問題就是,忙的忙死,閑的閑死,為調高整體處理效率,當每個 P 之間的 G 任務不均衡時,調度器允許從 GRQ, 或者其它 P 的 LRQ 中獲取 G 運行。
2:減少阻塞。如果正在執行的 Goroutine 阻塞了線程 M 怎么辦?P 上的 LRQ 中的 Goroutine 會獲取不到調度么?
在 Go 里面阻塞主要分為以下四種場景:
A: 由於原子、互斥操作或通道操作導致 Goroutine 阻塞。調度器將把當前阻塞的 Goroutine 切換出去,重新調度 LRQ 上的其它 Goroutine;
B:由於網絡請求和 I/O 操作導致 Goroutine 阻塞。Go 程序提供了網絡輪詢器(NetPoller) 來處理網絡請求和 I/O 操作的問題,其后台通過 kqueue(MacOS), epoll(Linux) 或 iocp(Windows) 來實現多路復用。
C: 當調用一些系統方法的時候,如果系統方法調用的時候發生阻塞,這種情況下,NetPoller 將無法使用,進行系統調用的 Goroutine 將阻塞當前 M。調度器(Sched)將會介入, 分離 M 和 P, 同時也將 G 帶走(G, M 在一起), 然后調度器引入新的 M1 來服務 P, 此時,可以從 LRQ 中選擇另外的 G1 並在 M1 上進行上下文切換。
D: 如果在 Goroutine 去執行一個 sleep 操作, 導致 M 被阻塞了。Go 程序后台有一個監控線程 sysmon, 它監控哪些長時間運行的 G任務,然后設置可以強占的標識符,別的 Goroutine 就可以搶先進來執行。
總結
runtime 准備好 G, M, P, 然后 M 綁定 P, M 從各種隊列中獲取 G, 切換到 G 的執行棧上並執行 G 上的任務函數,調用 goexit 做清理工作並回到 M, 如此反復。
參考文獻
https://cloud.tencent.com/developer/article/1069239
[1] goroutine 的調度 【Go 夜讀】
[2] Go1.5 源碼剖析.pdf 雨痕
