數據結構
調度相關的數據結構有三個,M(線程),P(調度器),G(goroutine)
M表示線程,P作為調度器用來幫助每個線程管理自己的goroutine,G就是golang的協程。我們可以通過runtime.GOMAXPROCS(n int)函數設置P的個數,注意P的個數並不代表M的個數,例如程序啟動時runtime代碼會出實話procs個P,但開始的時候只會啟動一個M,就是M0和一個棧為64K(其他goroutine默認初始棧大小2K)來執行runtime代碼。
那其他線程是什么時候創建的吶?
當goroutine被喚醒時,要在M上運行(恢復goroutine的上下文),P是幫助M管理goroutine的,恢復上下文的操作也由P來完成。如果被喚醒時發現還有空閑的P,並且沒有其他M在竊取goroutine(M發現本地goroutine隊列和全局goroutine隊列都沒有goroutine的時候,會去其他線程竊取goroutine),說明其他M都在忙,就會創建一個M讓這個空閑的P幫他來管理goroutine。
總之一句話,開始的時候創建一個M,當發現調度不過來且還有空閑P沒有工作就在創建新的,直到創建procs個M(procs通過runtime.GOMAXPROCS設置)

G
golang 用結構體g表示goroutine
g
type g struct {
stack stack // 當前棧的范圍[stack.lo, stack.hi)
stackguard0 uintptr // 用於搶占的,一般情況值為stack.lo + StackGuard
stackguard1 uintptr // 用於C語言的搶占
_panic *_panic // 最內側的panic函數
_defer *_defer // 最外側的defer函數
m *m // 當前goroutine屬於哪個m
sched gobuf // 調度相關信息
...
schedlink guintptr // sched是全局的goroutine鏈表,schedlink表示這個goroutine在鏈表中的下一個goroutine的指針
...
preempt bool // 搶占標志,如果需要搶占就將preempt設置為true
...
}
gobuf
gobuf保存goroutine的調度信息,當一個goroutine被調度的時,本質上就是把這個goroutine放到cpu,恢復各個寄存器的值,然后運行
type gobuf struct {
sp uintptr // 棧指針
pc uintptr // 程序計數器
g guintptr // 當前被哪個goroutine持有
ctxt unsafe.Pointer
ret sys.Uintreg // 系統調用返回值,防止系統調用后被其他goroutine搶占,所以有個地方保存返回值
lr uintptr
bp uintptr // 保存CPU的rip寄存器的值
}
M
golang中M表示實際操作系統的線程
m
type m struct {
g0 *g // g0幫M處理大小事務的goroutine,他是m中的第一個goroutine
...
gsignal *g // 用於信號處理的goroutine
tls [6]uintptr // 線程私有空間
mstartfn func()
curg *g // current running goroutine
...
p puintptr // 當前正在運行的p(處理器)
nextp puintptr // 暫存的p
oldp puintptr // 執行系統調用之前的p
...
spinning bool // 表示當前m沒有goroutine了,正在從其他m偷取goroutine
blocked bool // m is blocked on a note
...
park note // m沒有goroutine的時候會在park上sleep,需要其他m在park中wake up這個m
alllink *m // on allm // 所有m的鏈表
...
thread uintptr // thread handle
...
}
P
golang中P表示一個調度器,為M提供上下文環境,使得M可以執行多個goroutine
p
type p struct {
m muintptr // 與哪個M關聯(可能為空的)
...
runqhead uint32 // p本地goroutine隊列的頭
runqtail uint32 // p本地goroutine隊列的尾
runq [256]guintptr // 隊列指針,和sync.pool中數據結構一樣也是循環隊列
...
sudogcache []*sudog // sudog緩存,channel用的
sudogbuf [128]*sudog // 也是防止false sharing
...
pad cpu.CacheLinePad // 防止false sharing
}
schedt
schedt結構體用來保存P的狀態信息和goroutine的全局運行隊列
type schedt struct {
...
lock mutex // 全局鎖
// 維護空閑的M
midle muintptr // 等待中的M鏈表
nmidle int32 // 等待中的M的數量
nmidlelocked int32 // number of locked m's waiting for work
mnext int64 // number of m's that have been created and next M ID
maxmcount int32 // 最多創建多少個M(10000)
nmsys int32 // number of system m's not counted for deadlock
nmfreed int64 // cumulative number of freed m's
ngsys uint32 // number of system goroutines; updated atomically
// 維護空閑的P
pidle puintptr // idle p's
npidle uint32
nmspinning uint32 // See "Worker thread parking/unparking" comment in proc.go.
// goroutine的全局隊列
runq gQueue
runqsize int32
...
// 全局緩存已經退出的goroutine鏈表,下次再創建的時候直接用
// Global cache of dead G's.
gFree struct {
lock mutex
stack gList // Gs with stacks
noStack gList // Gs without stacks
n int32
}
...
}
重要的全局變量
allgs []*g // 保存所有的g
allm *m // 所有的m構成的一個鏈表,包括下面的m0
allp []*p // 保存所有的p,len(allp) == gomaxprocs
ncpu int32 // 系統中cpu核的數量,程序啟動時由runtime代碼初始化
gomaxprocs int32 // p的最大值,默認等於ncpu,但可以通過GOMAXPROCS修改
sched schedt // 調度器結構體對象,記錄了調度器的工作狀態
m0 m // 代表進程的主線程
g0 g // m0的g0,也就是m0.g0 = &g0
分步驟剖析調度的初始化
下面是用go實現的hello world,代碼里並沒有關於調度的初始化,所以程序的入口並非是main.main,下面通過gdb一步步找到go是如何初始化調度的。
// test.go
package main
func main() {
println("hello, world!")
}
編譯
go build -gcflags "-N -l" test.go
使用OS X的同學注意,go1.11之后壓縮的debug信息,OS X的同學需要同時做以下設置參考Debug Go Program With Gdb On Macos
export GOFLAGS="-ldflags=-compressdwarf=false"
調試
- 利用斷點可以找出目標文件的信息,在入口處打一個斷點,找到程序入口在rt0_darwin_amd64.s的第8行
➜ sudo gdb test
(gdb) info files
Symbols from "/Users/journey/workspace/src/tool/gdb/test".
Local exec file:
`/Users/journey/workspace/src/tool/gdb/test', file type mach-o-x86-64.
Entry point: 0x104cd00
0x0000000001001000 - 0x00000000010515b1 is .text
0x00000000010515c0 - 0x000000000108162a is __TEXT.__rodata
0x0000000001081640 - 0x0000000001081706 is __TEXT.__symbol_stub1
0x0000000001081720 - 0x0000000001081e80 is __TEXT.__typelink
0x0000000001081e80 - 0x0000000001081e88 is __TEXT.__itablink
0x0000000001081e88 - 0x0000000001081e88 is __TEXT.__gosymtab
0x0000000001081ea0 - 0x00000000010bfacd is __TEXT.__gopclntab
0x00000000010c0000 - 0x00000000010c0020 is __DATA.__go_buildinfo
0x00000000010c0020 - 0x00000000010c0128 is __DATA.__nl_symbol_ptr
0x00000000010c0140 - 0x00000000010c0d08 is __DATA.__noptrdata
0x00000000010c0d20 - 0x00000000010c27f0 is .data
0x00000000010c2800 - 0x00000000010ddc90 is .bss
0x00000000010ddca0 - 0x00000000010e01e8 is __DATA.__noptrbss
(gdb) b *0x104cd00
Breakpoint 1 at 0x104cd00: file /usr/local/go/src/runtime/rt0_darwin_amd64.s, line 8.
- 進入上面找到的文件rt0_darwin_amd64.s(不同的架構文件是不同的)
➜ runtime ls rt0_*
rt0_aix_ppc64.s rt0_darwin_amd64.s rt0_freebsd_arm.s rt0_linux_arm64.s rt0_nacl_386.s rt0_netbsd_arm64.s rt0_plan9_amd64.s
rt0_android_386.s rt0_darwin_arm.s rt0_illumos_amd64.s rt0_linux_mips64x.s rt0_nacl_amd64p32.s rt0_openbsd_386.s rt0_plan9_arm.s
rt0_android_amd64.s rt0_darwin_arm64.s rt0_js_wasm.s rt0_linux_mipsx.s rt0_nacl_arm.s rt0_openbsd_amd64.s rt0_solaris_amd64.s
rt0_android_arm.s rt0_dragonfly_amd64.s rt0_linux_386.s rt0_linux_ppc64.s rt0_netbsd_386.s rt0_openbsd_arm.s rt0_windows_386.s
rt0_android_arm64.s rt0_freebsd_386.s rt0_linux_amd64.s rt0_linux_ppc64le.s rt0_netbsd_amd64.s rt0_openbsd_arm64.s rt0_windows_amd64.s
rt0_darwin_386.s rt0_freebsd_amd64.s rt0_linux_arm.s rt0_linux_s390x.s rt0_netbsd_arm.s rt0_plan9_386.s rt0_windows_arm.s
- 打開文件go/src/runtime/rt0_darwin_amd64.s:8
這里沒有做什么就調了函數_rt0_amd64
TEXT _rt0_amd64_darwin(SB),NOSPLIT,$-8 // 參數+返回值共8字節
JMP _rt0_amd64(SB)
- 然后在打斷點看看_rt0_amd64在哪
在ams_amd64.s第15行
(gdb) b _rt0_amd64
Breakpoint 2 at 0x1049350: file /usr/local/go/src/runtime/asm_amd64.s, line 15.
這里首先把參數放到DI,SI寄存器中,然后調用runtime.rt0_go,這就是進程初始化主要函數了
參數0放在DI通用寄存器
參數1放在SI通用寄存器
參數2放在DX通用寄存器
參數3放在CX通用寄存器
TEXT _rt0_amd64(SB),NOSPLIT,$-8 // 參數+返回值共8字節
MOVQ 0(SP), DI // argc
LEAQ 8(SP), SI // argv
JMP runtime·rt0_go(SB)
- 然后跳轉到runtime.rt0_go
(gdb) b runtime.rt0_go
Breakpoint 3 at 0x1049360: file /usr/local/go/src/runtime/asm_amd64.s, line 89.
初始化
這個函數有點長,下面我們分段來看rt0_go這個函數
初始化參數以及創建g0
-
首先將之前放入通用寄存器的參數放入AX,BX寄存器,然后調整棧頂指針(真SP寄存器)的位置,SP指針先減39,關於16字節向下對齊(因為CPU有一組 SSE 指令,這些指令中出現的內存地址必須是16的倍數),然后把參數放到SP+16字節和SP+24字節處
golang的匯編有抽象出來的寄存器,通過是否有前綴變量區分真假寄存器,例如a+8(SP)就是golang的寄存器,8(SP)就是真的寄存器 -
創建g0,並初始化g.stackgruard0,g.stackguard1以及g.stack.lo,g.stack.hi的值(實際上是分配一段內存,然后分割成小段,約定哪小段表示哪個變量)
TEXT runtime·rt0_go(SB),NOSPLIT,$0
MOVQ DI, AX // argc
MOVQ SI, BX // argv
SUBQ $(4*8+7), SP // 2args 2auto
ANDQ $~15, SP
MOVQ AX, 16(SP)
MOVQ BX, 24(SP)
// 初始化g0,g0就是go的第一個協程
// 給g0分配棧空間大概64K
//
MOVQ $runtime·g0(SB), DI
LEAQ (-64*1024+104)(SP), BX // BX = SP - 64 * 1024 + 104
MOVQ BX, g_stackguard0(DI) // g0.g_stackguard0 = SP - 64 * 1024 + 104
MOVQ BX, g_stackguard1(DI) // g0.g_stackguard1 = SP - 64 * 1024 + 104
MOVQ BX, (g_stack+stack_lo)(DI) // g0.stack.lo = SP - 64 * 1024 + 104
MOVQ SP, (g_stack+stack_hi)(DI) // g0.stack.hi = SP
創建完g0的內存分布

然后略過一段CPU型號檢測和CGO初始化的代碼
...
創建m0
- 創建將m0.tls放入DI寄存器,然后調用runtime.settls將m0設置為線程私有變量(mac下什么也沒干),將m0與主線程綁定,然后對m0.tls進行存取操作驗證是否能用,不能用就直接退出
- 綁定m0和g0的關系,m0.g0 = g0,g0.m = m0
// 將m0與主線程綁定
LEAQ runtime·m0+m_tls(SB), DI // 將m0的thread local store成員的地址到DI
CALL runtime·settls(SB) // 調用settls設置線程本地存儲(mac 下settls什么都沒做,線程已經設置好本地存儲了)
// 通過往TLS存0x123在判斷tls[0]是不是0x123驗證TLS是否可用,如果不可用就abort
get_tls(BX)
MOVQ $0x123, g(BX)
MOVQ runtime·m0+m_tls(SB), AX
CMPQ AX, $0x123
JEQ 2(PC)
CALL runtime·abort(SB)
ok:
// 把g0存入m0的本地存儲tls[0]
get_tls(BX) // 將m0.tls[0]地址放入BX
LEAQ runtime·g0(SB), CX // 將g0地址放入CX
MOVQ CX, g(BX) // m0.tls[0] = &g0
LEAQ runtime·m0(SB), AX // 將m0地址放入AX
// 將m0和g0建立映射關系
// save m->g0 = g0
MOVQ CX, m_g0(AX) // m0.g0 = g0
// save m0 to g0->m
MOVQ AX, g_m(CX) // g0.m = m0
CLD // convention is D is always left cleared
CALL runtime·check(SB)
創建完m0之后的內存分布

m0和g0的關系
- m0表示主線程,g0表示主線程的第一個goroutine
- g0主要是記錄主線程的棧信息,執行調度函數(schedule后邊會講)時會用,而用戶goroutine有自己的棧,執行的時候會從g0棧切換到用戶goroutine棧
初始化調度
g0和m0都創建並初始化好了,下面就該進行調度初始化了
- 將參數放入AX(初始化g0時將參數放入SP+16和SP+24的位置
- runtime.args初始化參數的
- runtime.osinit是初始化CPU核數的
- 重點看runtime.schedinit
// 初始化m0
// 將argc和argv入棧
MOVL 16(SP), AX // copy argc
MOVL AX, 0(SP)
MOVQ 24(SP), AX // copy argv
MOVQ AX, 8(SP)
// 處理參數
CALL runtime·args(SB)
// 獲取cpu的核數
CALL runtime·osinit(SB)
// 調度系統初始化
CALL runtime·schedinit(SB)
runtime.schedinit
下面函數省略了調度無關的代碼,大概流程:
- 設置最大線程數
- 根據GOMAXPROCS設置procs(P的數量)
- 調用procresizeprocs調整P的數量
func schedinit() {
// 取出g0
_g_ := getg()
if raceenabled {
_g_.racectx, raceprocctx0 = raceinit()
}
// 設置最大線程數
sched.maxmcount = 10000
...
// 初始化m0, 前邊已經將m0和g0的關系綁定好了
// 只是檢查一下各種變量,然后將m0掛到allm鏈表中
mcommoninit(_g_.m)
...
sched.lastpoll = uint64(nanotime())
// ncpu在osinit時已經獲取
procs := ncpu
// 如果GOMAXPROCS設置並且合法就將procs的設置為GOMAXPROCS
if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
procs = n
}
if procresize(procs) != nil {
throw("unknown runnable goroutine during bootstrap")
}
...
}
runtime.procresize
- 調度初始化最后一步
- 更新最后一次修改P數量動作的時間戳並累加花費時間
- 根據nprocs調整P的數量(加鎖)
- nprocs > 現有P數量,就擴展allp(p的全局數組)的長度為nprocs
- nprocs < 現有P數量,就縮容allp的長度為nprocs
- 如果上一步是擴容了,就從堆中創建新P,並把P放入擴容出來的位置
- 通過g0找到m0,然后將allp[0]和m0綁定
- 如果allp縮容了,就將多余的p銷毀
- 將空閑的p加入空閑鏈表
到目前為止,創建了m0,g0,和nprocs個P,但是還是沒有讓調度真正的跑起來
func procresize(nprocs int32) *p {
old := gomaxprocs
if old < 0 || nprocs <= 0 {
throw("procresize: invalid arg")
}
if trace.enabled {
traceGomaxprocs(nprocs)
}
// update statistics
now := nanotime()
if sched.procresizetime != 0 {
sched.totaltime += int64(old) * (now - sched.procresizetime)
}
sched.procresizetime = now
if nprocs > int32(len(allp)) { // 初始化的len(allp) == 0
lock(&allpLock)
if nprocs <= int32(cap(allp)) { // 需要縮容
allp = allp[:nprocs]
} else { // 擴容
nallp := make([]*p, nprocs)
// Copy everything up to allp's cap so we
// never lose old allocated Ps.
copy(nallp, allp[:cap(allp)])
allp = nallp
}
unlock(&allpLock)
}
for i := old; i < nprocs; i++ {
pp := allp[i]
if pp == nil {
pp = new(p)
}
pp.init(i)
atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
}
_g_ := getg() // 獲取g0
if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs { // 進程初始化時g0.m與p沒有綁定,所以g0.m.p == 0
_g_.m.p.ptr().status = _Prunning
_g_.m.p.ptr().mcache.prepareForSweep()
} else {
if _g_.m.p != 0 {
if trace.enabled {
traceGoSched()
traceProcStop(_g_.m.p.ptr())
}
_g_.m.p.ptr().m = 0
}
_g_.m.p = 0
_g_.m.mcache = nil
p := allp[0]
p.m = 0
p.status = _Pidle
acquirep(p) // 把allp[0]和m0關聯起來
if trace.enabled {
traceGoStart()
}
}
// 如果有需要銷毀的p,就是銷毀
for i := nprocs; i < old; i++ {
p := allp[i]
p.destroy()
// can't free P itself because it can be referenced by an M in syscall
}
if int32(len(allp)) != nprocs {
lock(&allpLock)
allp = allp[:nprocs]
unlock(&allpLock)
}
// 將空閑p放入空閑鏈表
var runnablePs *p
for i := nprocs - 1; i >= 0; i-- {
p := allp[i]
if _g_.m.p.ptr() == p { // allp[0]已經和m0關聯了,所以不用放入空閑鏈表
continue
}
p.status = _Pidle
if runqempty(p) {
pidleput(p)
} else {
p.m.set(mget())
p.link.set(runnablePs)
runnablePs = p
}
}
stealOrder.reset(uint32(nprocs))
var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32
atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
return runnablePs
}
創建"第一個"goroutine
我們返回runtime·rt0_go接着看
- 將runtime.main地址放入AX
- 參數AX, 0入棧(函數參數入棧由右向左)
- 然后調用runtime.newproc創建goroutine
// create a new goroutine to start program
// 創建第一個goroutine執行runtime.main,源碼里沒搜到runtime.mainPC,在schedinit函數前注釋里找到的runtime.mainPC就是runtime.main
MOVQ $runtime·mainPC(SB), AX // entry AX = func(runtime.main)
PUSHQ AX
PUSHQ $0 // arg size runtime.main沒有參數所以入棧0
CALL runtime·newproc(SB) // 創建goroutine執行runtime.main(還沒執行,只是將goroutine加入待運行隊列)
POPQ AX // 出棧
POPQ AX // 出棧
newproc
- 首先獲取參數地址
- 獲取當前所在goroutine(初始化時runtime代碼都在g0執行)
- 獲取要執行指令地址
- 在gp的棧上執行runtime.newproc1(在g0棧上執行)
func newproc(siz int32, fn *funcval) {
// 獲取函數fn的第一個參數的位置
argp := add(unsafe.Pointer(&fn), sys.PtrSize)
// 獲取當前所有goroutine ---- g0
gp := getg()
// 獲取要執行指令的位置
pc := getcallerpc()
/*
systemstack是將函數切換到g0的棧上運行,初始化時本來就在g0的棧上,所以直接調用函數返回
*/
systemstack(func() {
newproc1(fn, (*uint8)(argp), siz, gp, pc)
})
}
newproc1函數主要的工作
這個函數有點長分段來看
- 首先獲得當前所在goroutine(g0)
- 禁止搶占
- 計算參數位置
- 計算下參數是否過大
- 獲取當前goroutine所在m的p,前邊講過g0對應的m是m0,m0對應的p是allp[0]
- 創建一個goroutine(先從p的緩存里找,找不到就new一個),並且確認goroutine棧邊界是初始化好的(方式p緩存里的goroutine參數沒初始化)
- 計算棧頂的地址,如果有參數就將參數放到新創建的這個goroutine上
func newproc1(fn *funcval, argp *uint8, narg int32, callergp *g, callerpc uintptr) {
// 獲取當前所在goroutine,初始化的是g0
_g_ := getg()
if fn == nil {
_g_.m.throwing = -1 // do not dump full stacks
throw("go of nil func value")
}
// 禁止搶占,把p固定在本地變量
acquirem() // disable preemption because it can be holding p in a local var
siz := narg
siz = (siz + 7) &^ 7
// 檢查一下參數是否需要空間是否過大,參數大小 和 棧大小 - (額外棧底空間(猜的)) - 返回需要的棧大小
if siz >= _StackMin-4*sys.RegSize-sys.RegSize {
throw("newproc: function arguments too large for new goroutine")
}
// _p_ = allp[0]
_p_ := _g_.m.p.ptr()
// 從_p_的緩存中取一個g,初始化的時候沒有可用的g所以newg==nil
newg := gfget(_p_)
if newg == nil {
// 創建一個新g,棧空間2k, 並且給stack,stackguard0,stackguard1初始化
newg = malg(_StackMin)
// 將g的狀態設置為_Gdead
casgstatus(newg, _Gidle, _Gdead)
// 將g加入allg鏈表
allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
}
// 確認剛才的初始化是否有效
if newg.stack.hi == 0 {
throw("newproc1: newg missing stack")
}
if readgstatus(newg) != _Gdead {
throw("newproc1: new g is not Gdead")
}
// 省略一段調整sp指針的函數,並且如果有參數就將參數放入new goroutine的棧中
...
設置各個寄存器的值(在cpu上恢復上下文時使用)
1) 清理sched
2) 設置棧頂置針位置
3) 設置pc寄存器值(goexit函數第二條指令,常理應該是goroutine本身函數的第一條指令,這個妙用后邊說)
4) 設置goroutine地址
5) 調用gostartcallfn,參數是sched和goroutine的參數
// 清理sched(各參數清零)
memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
// 設置sched, 在CPU上運行的相關參數
newg.sched.sp = sp
newg.stktopsp = sp
// 設置pc,被調度時第一條指令的位置,將pc設置為goexit函數一個偏移量的位置(goexit函數第二條指令)
// 這里把pc設置為goexit函數的第二條指令的作用就是,偽裝成goexit函數調用的fn函數,當fn執行完跳回goexit函數繼續做退出需要的操作
newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
newg.sched.g = guintptr(unsafe.Pointer(newg))
gostartcallfn(&newg.sched, fn)
判斷一下goroutine的函數是否為空,然后調用gostartcall
func gostartcallfn(gobuf *gobuf, fv *funcval) {
var fn unsafe.Pointer
if fv != nil {
fn = unsafe.Pointer(fv.fn)
} else {
fn = unsafe.Pointer(funcPC(nilfunc))
}
gostartcall(gobuf, fn, unsafe.Pointer(fv))
}
- 獲取sp,現在新goroutine的棧上之后本身的函數,sp指向函數的第一個參數
- 將sp指向pc里面的指令地址,也就是goexit的第二條指令,然后重新設置新goroutinesp地址
- 這時候pc才指向goroutine自己的函數
gostartcall的主要作用就是將goexit入棧,然后設置goroutine的pc指向自身函數,偽裝成是goexit調用的自身函數,當自身函數執行完時返回goexit清理線程,大概就是下面這樣
func goexit() {
goroutine自身函數()
清理現場()
}
func gostartcall(buf *gobuf, fn, ctxt unsafe.Pointer) {
sp := buf.sp
if sys.RegSize > sys.PtrSize {
sp -= sys.PtrSize
*(*uintptr)(unsafe.Pointer(sp)) = 0
}
// 預留返回值空間
sp -= sys.PtrSize
// sp指向pc指令的位置,前邊已經將goexit第二條指令的地址放入pc
*(*uintptr)(unsafe.Pointer(sp)) = buf.pc
// 然后設置sp
buf.sp = sp
// 這時候的pc才是goroutine的函數
buf.pc = uintptr(fn)
buf.ctxt = ctxt
}
然后再回到newproc函數,剩下的就是設置goroutine的狀態,然后把goroutine放入p的待執行隊列中
newg.gopc = callerpc // 用於traceback
newg.ancestors = saveAncestors(callergp)
// newg的函數從哪里開始執行依賴於sched.pc 不依賴於startpc
newg.startpc = fn.fn
if _g_.m.curg != nil {
newg.labels = _g_.m.curg.labels
}
if isSystemGoroutine(newg, false) {
atomic.Xadd(&sched.ngsys, +1)
}
newg.gcscanvalid = false
// 設置newg狀態為_Grunnable, 到這里newg就可以運行了
casgstatus(newg, _Gdead, _Grunnable)
if _p_.goidcache == _p_.goidcacheend {
_p_.goidcache = atomic.Xadd64(&sched.goidgen, _GoidCacheBatch)
_p_.goidcache -= _GoidCacheBatch - 1
_p_.goidcacheend = _p_.goidcache + _GoidCacheBatch
}
newg.goid = int64(_p_.goidcache)
_p_.goidcache++
if raceenabled {
newg.racectx = racegostart(callerpc)
}
if trace.enabled {
traceGoCreate(newg, newg.startpc)
}
// 將newg加入p的待運行隊列
runqput(_p_, newg, true)
if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 && mainStarted {
wakep()
}
// 取消m的固定
releasem(_g_.m)
}
總結一下創建第一個goroutine執行runtime.main的過程(只是創建啊,整個調度這時候還是沒有跑起來)

調度循環
我們再返回runtime·rt0_go繼續看,總結一下到目前為止已經准備好的事情
- 將m0與主線程綁定了(將m0結構體設為主線程的私有變量)
- 創建了g0,並且與m0綁定
- 創建了procs個p並且初始化,將allp[0]與m0綁定,形成初步的GMP模型(g0,m0,p0)
- 創建了一個執行runtime.main(不是代碼里的main.main,runtime.main會做加載init函數等操作然后調用main.main)的goroutine並且放入了p0的待運行隊列
接下來就是調度循環了,調用runtime.mstart,這個函數就是調度循環,除非程序退出否則永遠阻塞住
// start this M
// 運行runtime.mstart這個函數會阻塞住,運行結束的時候就是程序退出的時候
CALL runtime·mstart(SB)
CALL runtime·abort(SB) // mstart should never return
RET
// Prevent dead-code elimination of debugCallV1, which is
// intended to be called by debuggers.
MOVQ $runtime·debugCallV1(SB), AX
RET
runtime.mstart
- 獲取了當前所在goroutine(初始化時代碼都是在g0上執行的)
- 初始化棧保護
- 調用mstart1
go/src/runtime/proc.go, line 1146
func mstart() {
_g_ := getg() // 獲取g0
osStack := _g_.stack.lo == 0 // g0.stack.lo在前邊已經初始化過了,所以osStack = false
if osStack {
size := _g_.stack.hi
if size == 0 {
size = 8192 * sys.StackGuardMultiplier
}
_g_.stack.hi = uintptr(noescape(unsafe.Pointer(&size)))
_g_.stack.lo = _g_.stack.hi - size + 1024
}
// 初始化棧保護
_g_.stackguard0 = _g_.stack.lo + _StackGuard
_g_.stackguard1 = _g_.stackguard0
// 開始m0開始運行
mstart1()
// Exit this thread.
if GOOS == "windows" || GOOS == "solaris" || GOOS == "illumos" || GOOS == "plan9" || GOOS == "darwin" || GOOS == "aix" {
osStack = true
}
mexit(osStack)
}
runtime.mstart1
- 保存g0的指令指針和棧指針,保存這兩個值是理解調度循環的關鍵,mstart1執行完之后,g0繼續執行指令,不會再返回來了,保存了指令和棧指針之后,g0要繼續執行指令的時候,就會又從上面開始執行
- 做一些初始化工作
- 調用schedule開始調度
func mstart1() {
// 獲取當前goroutine g0
_g_ := getg()
if _g_ != _g_.m.g0 {
throw("bad runtime·mstart")
}
// save函數保存了g0再次運行時(循環調度下一次回頭)調度相關信息
save(getcallerpc(), getcallersp())
// asminit
asminit()
// 信號相關初始化
minit()
// 初始化時m == m0,mstartm0也是信號相關的初始化
if _g_.m == &m0 {
mstartm0()
}
// 初始化時fn == ni
if fn := _g_.m.mstartfn; fn != nil {
fn()
}
if _g_.m != &m0 {
acquirep(_g_.m.nextp.ptr())
_g_.m.nextp = 0
}
// 開始調度
schedule()
}
runtime.schedule
調度開始了,m要找gorutine放到cpu上執行了
- 每調度61次(具體為啥是61有待思考),就從全局的goroutine列表中選goroutine
- 如果上一步沒找到,就從m對應的p的緩存里找
- 如果上一步還沒有找到,就調findrunnable從其他線程竊取goroutine,如果發現有就竊取一半放到自己的p緩存中,如果都沒有就說明真的沒有待運行的goroutine了,就陷入睡眠一直阻塞在findrunnable函數,等待被喚醒
- 直到有goroutine需要執行了,就調用execute執行goroutine
func schedule() {
// 獲得g0
_g_ := getg()
if _g_.m.locks != 0 {
throw("schedule: holding locks")
}
if _g_.m.lockedg != 0 {
stoplockedm()
execute(_g_.m.lockedg.ptr(), false) // Never returns.
}
if _g_.m.incgo {
throw("schedule: in cgo")
}
top:
// 等待gc
if sched.gcwaiting != 0 {
gcstopm()
goto top
}
if _g_.m.p.ptr().runSafePointFn != 0 {
runSafePointFn()
}
var gp *g
var inheritTime bool
tryWakeP := false
if trace.enabled || trace.shutdown {
gp = traceReader()
if gp != nil {
casgstatus(gp, _Gwaiting, _Grunnable)
traceGoUnpark(gp, 0)
tryWakeP = true
}
}
if gp == nil && gcBlackenEnabled != 0 {
gp = gcController.findRunnableGCWorker(_g_.m.p.ptr())
tryWakeP = tryWakeP || gp != nil
}
// 先從全局隊列中獲取,每61次調度都會從全局隊列中獲取goroutine
if gp == nil {
if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
lock(&sched.lock)
gp = globrunqget(_g_.m.p.ptr(), 1)
unlock(&sched.lock)
}
}
// 如果還空就從本地隊列中獲取
if gp == nil {
gp, inheritTime = runqget(_g_.m.p.ptr())
if gp != nil && _g_.m.spinning {
throw("schedule: spinning with local work")
}
}
// 如果本地也沒有就調用findrunnable從其他線程偷一個過來,直到偷過來在運行
if gp == nil {
gp, inheritTime = findrunnable() // blocks until work is available
}
if _g_.m.spinning {
resetspinning()
}
if sched.disable.user && !schedEnabled(gp) {
lock(&sched.lock)
if schedEnabled(gp) {
unlock(&sched.lock)
} else {
sched.disable.runnable.pushBack(gp)
sched.disable.n++
unlock(&sched.lock)
goto top
}
}
if tryWakeP {
if atomic.Load(&sched.npidle) != 0 && atomic.Load(&sched.nmspinning) == 0 {
wakep()
}
}
if gp.lockedm != 0 {
startlockedm(gp)
goto top
}
// 執行這個goroutine
execute(gp, inheritTime)
}
觸發調度
觸發調度地方大致有:
- 主動掛起
- 系統調用
- 協作式調度
- 正常退出
- proc.go:1208 runtime.mstart1(調度開始)
主動掛起
- proc.go:2610 runtime.park_m
在上一章內容里講過golang channel源碼閱讀,當goroutine接收一個channel為空且為阻塞的時候,goroutine會調用goparkunlock使goroutine陷入睡眠,等待send端調用goready函數喚醒函數,主動掛起就是這種情況,當goroutine由於某些條件在等待時,就會主動掛起,不放回待運行隊列,等待被喚醒
各種阻塞條件 -> runtime.gopark() -> runtime.park_m() -> runtime.schedule
- 獲取當前所在m,並且固定m
- 獲取當前程序所在goroutine
- 設置鎖狀態以及阻塞原因
- 調用runtime.park_m掛起goroutine
func gopark(unlockf func(*g, unsafe.Pointer) bool, lock unsafe.Pointer, reason waitReason, traceEv byte, traceskip int) {
if reason != waitReasonSleep {
checkTimeouts() // timeouts may expire while two goroutines keep the scheduler busy
}
mp := acquirem()
gp := mp.curg
status := readgstatus(gp)
if status != _Grunning && status != _Gscanrunning {
throw("gopark: bad g status")
}
mp.waitlock = lock
mp.waitunlockf = unlockf
gp.waitreason = reason
mp.waittraceev = traceEv
mp.waittraceskip = traceskip
releasem(mp)
// can't do anything that might move the G between Ms here.
mcall(park_m)
}
- 獲取當前goroutine
- 將goroutine狀態設置為Gwaiting
- 重新調度
func park_m(gp *g) {
_g_ := getg()
if trace.enabled {
traceGoPark(_g_.m.waittraceev, _g_.m.waittraceskip)
}
casgstatus(gp, _Grunning, _Gwaiting)
dropg()
if fn := _g_.m.waitunlockf; fn != nil {
ok := fn(gp, _g_.m.waitlock)
_g_.m.waitunlockf = nil
_g_.m.waitlock = nil
if !ok {
if trace.enabled {
traceGoUnpark(gp, 2)
}
casgstatus(gp, _Gwaiting, _Grunnable)
execute(gp, true) // Schedule it back, never returns.
}
}
schedule()
}
協作式調度
- proc.go:2625 runtime.goschedImpl(協作式調度)
- 主動讓出cpu,這個情況不會掛起goroutine,而是放回隊列,等待下次調度,這個函數(GoSched)被暴露出去,可以調用,例如,線上有這種情況,寫log是異步的,但由於機器磁盤老舊性能不佳,所以當log goroutine運行時還是會過多的占用cpu,這時候可以調用GoSched適當降低當前goroutine優先級
runtime.Gosched -> runtime.gosched_m -> runtime.goschedImpl runtime.schedule
// Gosched continuation on g0.
func gosched_m(gp *g) {
if trace.enabled {
traceGoSched()
}
goschedImpl(gp)
}
- 調度保護,當調度器發現goroutine處於禁止的狀態時就會主動調度讓出cpu
// goschedguarded is a forbidden-states-avoided version of gosched_m
func goschedguarded_m(gp *g) {
if gp.m.locks != 0 || gp.m.mallocing != 0 || gp.m.preemptoff != "" || gp.m.p.ptr().status != _Prunning {
gogo(&gp.sched) // never return
}
if trace.enabled {
traceGoSched()
}
goschedImpl(gp)
}
- 發生搶占,例如當一個goroutine運行時間過長但不像等待channel那樣阻塞,一直有事情做時,其他goroutine可能會搶占cpu
func gopreempt_m(gp *g) {
if trace.enabled {
traceGoPreempt()
}
goschedImpl(gp)
}
func goschedImpl(gp *g) {
status := readgstatus(gp)
if status&^_Gscan != _Grunning {
dumpgstatus(gp)
throw("bad g status")
}
casgstatus(gp, _Grunning, _Grunnable)
dropg()
lock(&sched.lock)
globrunqput(gp)
unlock(&sched.lock)
schedule()
}
非main goroutine結束
- proc.go:2704,2727 runtime.goexit0(goroutine正常執行完)
非main goroutine結束后會繼續調度,這個是正常繼續下一次調度不做過多介紹
系統調用
- proc.go:3141 runtime.exitsyscall0(系統調用)
runtime·exitsyscall -> runtime·exitsyscall0 -> runtime.schedule
我們來看下系統調用的過程
func syscall_syscall(fn, a1, a2, a3 uintptr) (r1, r2, err uintptr) {
entersyscall()
libcCall(unsafe.Pointer(funcPC(syscall)), unsafe.Pointer(&fn))
exitsyscall()
return
}
func syscall()
首先會調用runtime.entersyscall獲取當前的指令位置和棧指針,然后調用reentersyscall做goroutine進入系統調用之前的准備
func entersyscall() {
reentersyscall(getcallerpc(), getcallersp())
}
- 禁止線程搶占防止出現棧不一致的情況
- 保證當前函數不會觸發棧調整(golang進程的棧初始2k,然后動態調整)
- 設置goroutine狀態為Gsyscall
- 將goroutine的P暫時和M分離,並且設置P狀態為Psyscall
- 釋放鎖
func reentersyscall(pc, sp uintptr) {
// 獲得當前goroutine
_g_ := getg()
_g_.m.locks++
_g_.stackguard0 = stackPreempt
_g_.throwsplit = true
// Leave SP around for GC and traceback.
save(pc, sp)
_g_.syscallsp = sp
_g_.syscallpc = pc
casgstatus(_g_, _Grunning, _Gsyscall)
if _g_.syscallsp < _g_.stack.lo || _g_.stack.hi < _g_.syscallsp {
systemstack(func() {
print("entersyscall inconsistent ", hex(_g_.syscallsp), " [", hex(_g_.stack.lo), ",", hex(_g_.stack.hi), "]\n")
throw("entersyscall")
})
}
if trace.enabled {
systemstack(traceGoSysCall)
save(pc, sp)
}
if atomic.Load(&sched.sysmonwait) != 0 {
systemstack(entersyscall_sysmon)
save(pc, sp)
}
if _g_.m.p.ptr().runSafePointFn != 0 {
systemstack(runSafePointFn)
save(pc, sp)
}
_g_.m.syscalltick = _g_.m.p.ptr().syscalltick
_g_.sysblocktraced = true
_g_.m.mcache = nil
pp := _g_.m.p.ptr()
pp.m = 0
_g_.m.oldp.set(pp)
_g_.m.p = 0
atomic.Store(&pp.status, _Psyscall)
if sched.gcwaiting != 0 {
systemstack(entersyscall_gcwait)
save(pc, sp)
}
_g_.m.locks--
}
然后就進入系統調用
...
- 獲得goroutine
- 線程加鎖
- 調exitsyscallfast替當前goroutine找一個P
- 如果原P處於Psyscall就讓這個P接管,否則的話進行2)
- 否則的話就找空閑的P,有的話就調用exitsyscall0繼續調度,否則的話進行3)
- 將goroutine設置為Grunning,加入全局隊列,調用Gosched()繼續調度
func exitsyscall() {
_g_ := getg()
_g_.m.locks++ // see comment in entersyscall
if getcallersp() > _g_.syscallsp {
throw("exitsyscall: syscall frame is no longer valid")
}
_g_.waitsince = 0
oldp := _g_.m.oldp.ptr()
_g_.m.oldp = 0
if exitsyscallfast(oldp) {
if _g_.m.mcache == nil {
throw("lost mcache")
}
if trace.enabled {
if oldp != _g_.m.p.ptr() || _g_.m.syscalltick != _g_.m.p.ptr().syscalltick {
systemstack(traceGoStart)
}
}
// There's a cpu for us, so we can run.
_g_.m.p.ptr().syscalltick++
// We need to cas the status and scan before resuming...
casgstatus(_g_, _Gsyscall, _Grunning)
// Garbage collector isn't running (since we are),
// so okay to clear syscallsp.
_g_.syscallsp = 0
_g_.m.locks--
if _g_.preempt {
// restore the preemption request in case we've cleared it in newstack
_g_.stackguard0 = stackPreempt
} else {
// otherwise restore the real _StackGuard, we've spoiled it in entersyscall/entersyscallblock
_g_.stackguard0 = _g_.stack.lo + _StackGuard
}
_g_.throwsplit = false
if sched.disable.user && !schedEnabled(_g_) {
// Scheduling of this goroutine is disabled.
Gosched()
}
return
}
_g_.sysexitticks = 0
if trace.enabled {
// Wait till traceGoSysBlock event is emitted.
// This ensures consistency of the trace (the goroutine is started after it is blocked).
for oldp != nil && oldp.syscalltick == _g_.m.syscalltick {
osyield()
}
// We can't trace syscall exit right now because we don't have a P.
// Tracing code can invoke write barriers that cannot run without a P.
// So instead we remember the syscall exit time and emit the event
// in execute when we have a P.
_g_.sysexitticks = cputicks()
}
_g_.m.locks--
// Call the scheduler.
mcall(exitsyscall0)
if _g_.m.mcache == nil {
throw("lost mcache")
}
// Scheduler returned, so we're allowed to run now.
// Delete the syscallsp information that we left for
// the garbage collector during the system call.
// Must wait until now because until gosched returns
// we don't know for sure that the garbage collector
// is not running.
_g_.syscallsp = 0
_g_.m.p.ptr().syscalltick++
_g_.throwsplit = false
}
