詳解Go語言調度循環源碼實現


轉載請聲明出處哦~,本篇文章發布於luozhiyun的博客: https://www.luozhiyun.com/archives/448

本文使用的go的源碼15.7

概述

提到"調度",我們首先想到的就是操作系統對進程、線程的調度。操作系統調度器會將系統中的多個線程按照一定算法調度到物理CPU上去運行。雖然線程比較輕量,但是在調度時也有比較大的額外開銷。每個線程會都占用 1M 以上的內存空間,線程切換和恢復寄存器中的內容也需要向系統申請資源。

Go 語言的 Goroutine 可以看作對 thread 加的一層抽象,它更輕量級,不僅減少了上下文切換帶來的額外開銷,Goroutine 占用的資源也會更少。如創建一個 Goroutine 的棧內存消耗為 2 KB,而 thread 占用 1M 以上空間;thread 創建和銷毀是內核級的,所以都會有巨大的消耗,而 Goroutine 由 Go runtime 負責管理的,創建和銷毀的消耗非常小;Goroutine 的切換成本也比 thread 要小得多。

G M P 模型

Go 的調度器使用三個結構體來實現 Goroutine 的調度:G M P。

G:代表一個 Goroutine,每個 Goroutine 都有自己獨立的棧存放當前的運行內存及狀態。可以把一個 G 當做一個任務,當 Goroutine 被調離 CPU 時,調度器代碼負責把 CPU 寄存器的值保存在 G 對象的成員變量之中,當 Goroutine 被調度起來運行時,調度器代碼又負責把 G 對象的成員變量所保存的寄存器的值恢復到 CPU 的寄存器。

M:表示內核線程,它本身就與一個內核線程進行綁定,每個工作線程都有唯一的一個 M 結構體的實例對象與之對應。M 結構體對象除了記錄着工作線程的諸如棧的起止位置、當前正在執行的Goroutine 以及是否空閑等等狀態信息之外,還通過指針維持着與 P 結構體的實例對象之間的綁定關系。

P:代表一個虛擬的 Processor 處理器,它維護一個局部 Goroutine 可運行 G 隊列,工作線程優先使用自己的局部運行隊列,只有必要時才會去訪問全局運行隊列,這大大減少了鎖沖突,提高了工作線程的並發性。每個 G 要想真正運行起來,首先需要被分配一個 P。

除了上面三個結構體以外,還有一個存放所有Runnable 可運行 Goroutine 的容器 schedt。每個Go程序中schedt結構體只有一個實例對象,在代碼中是一個共享的全局變量,每個工作線程都可以訪問它以及它所擁有的 Goroutine 運行隊列。

下面是G、P、M以及schedt中的全局隊列的關系:

GMP

從上圖可以看出,每個 m 都綁定了一個 P,每個 P 都有一個私有的本地 Goroutine 隊列,m對應的線程從本地和全局 Goroutine 隊列中獲取 Goroutine 並運行,綠色的 G 代表正在運行的 G。

在默認情況下,運行時會將 GOMAXPROCS 設置成當前機器的核數,假設一個四核機器會創建四個活躍的操作系統線程,每一個線程都對應一個運行時中的 M。

M_bind_CPU

詳解

結構體

G M P 結構體定義於src/runtime/runtime2.go

G

type g struct { 
	// 當前 Goroutine 的棧內存范圍 [stack.lo, stack.hi)
	stack       stack 
	// 用於調度器搶占式調度  
	stackguard0 uintptr   

	_panic       *_panic  
	_defer       *_defer  
	// 當前 Goroutine 占用的線程
	m            *m       
	// 存儲 Goroutine 的調度相關的數據
	sched        gobuf 
	// Goroutine 的狀態
	atomicstatus uint32 
	// 搶占信號
	preempt       bool // preemption signal, duplicates stackguard0 = stackpreempt
	// 搶占時將狀態修改成 `_Gpreempted`
	preemptStop   bool // transition to _Gpreempted on preemption; otherwise, just deschedule
	// 在同步安全點收縮棧
	preemptShrink bool // shrink stack at synchronous safe point
	...
}

下面看看gobuf結構體,主要在調度器保存或者恢復上下文的時候用到:

type gobuf struct {
	// 棧指針
	sp   uintptr
	// 程序計數器
	pc   uintptr
	// gobuf對應的Goroutine
	g    guintptr 
	// 系統調用的返回值
	ret  sys.Uintreg
	...
}

在執行過程中,G可能處於以下幾種狀態:

const (
	//  剛剛被分配並且還沒有被初始化
	_Gidle = iota // 0 
	// 沒有執行代碼,沒有棧的所有權,存儲在運行隊列中
	_Grunnable // 1 
	// 可以執行代碼,擁有棧的所有權,被賦予了內核線程 M 和處理器 P
	_Grunning // 2 
	// 正在執行系統調用,擁有棧的所有權,沒有執行用戶代碼,
	// 被賦予了內核線程 M 但是不在運行隊列上
	_Gsyscall // 3 
	// 由於運行時而被阻塞,沒有執行用戶代碼並且不在運行隊列上,
	// 但是可能存在於 Channel 的等待隊列上
	_Gwaiting // 4  
	// 表示當前goroutine沒有被使用,沒有執行代碼,可能有分配的棧
	_Gdead // 6  
	// 棧正在被拷貝,沒有執行代碼,不在運行隊列上
	_Gcopystack // 8 
	// 由於搶占而被阻塞,沒有執行用戶代碼並且不在運行隊列上,等待喚醒
	_Gpreempted // 9 
	// GC 正在掃描棧空間,沒有執行代碼,可以與其他狀態同時存在
	_Gscan          = 0x1000 
	...
)

上面的狀態看起來很多,但是實際上只需要關注下面幾種就好了:

  • 等待中:_ Gwaiting、_Gsyscall 和 _Gpreempted,這幾個狀態表示G沒有在執行;
  • 可運行:_Grunnable,表示G已經准備就緒,可以在線程運行;
  • 運行中:_Grunning,表示G正在運行;

M

Go 語言並發模型中的 M 是操作系統線程,最多只會有 GOMAXPROCS 個活躍線程能夠正常運行。

type m struct {
	// 持有調度棧的 Goroutine
	g0      *g       
	// 處理 signal 的 G
	gsignal       *g           
	// 線程本地存儲 thread-local
	tls           [6]uintptr   // thread-local storage (for x86 extern register)
	// 當前運行的G
	curg          *g       // current running goroutine
	caughtsig     guintptr // goroutine running during fatal signal
	// 正在運行代碼的P
	p             puintptr // attached p for executing go code (nil if not executing go code)
	nextp         puintptr
	// 之前使用的P
	oldp          puintptr  
	...
}

P

調度器中的處理器 P 是線程 M 和 G 的中間層,用於調度 G 在 M 上執行。

type p struct {
	id          int32
	// p 的狀態
	status      uint32  
    // 調度器調用會+1
	schedtick   uint32     // incremented on every scheduler call
    // 系統調用會+1
	syscalltick uint32     // incremented on every system call
	// 對應關聯的 M
	m           muintptr    
	mcache      *mcache
	pcache      pageCache 
	// defer 結構池
	deferpool    [5][]*_defer  
	deferpoolbuf [5][32]*_defer  
	// 可運行的 Goroutine 隊列,可無鎖訪問
	runqhead uint32
	runqtail uint32
	runq     [256]guintptr
	// 緩存可立即執行的 G
	runnext guintptr 
	// 可用的 G 列表,G 狀態等於 Gdead 
	gFree struct {
		gList
		n int32
	}
	...
}

下面看看P的幾個狀態:

const ( 
	// 表示P沒有運行用戶代碼或者調度器 
	_Pidle = iota 
	// 被線程 M 持有,並且正在執行用戶代碼或者調度器
	_Prunning 
	// 沒有執行用戶代碼,當前線程陷入系統調用
	_Psyscall
	// 被線程 M 持有,當前處理器由於垃圾回收 STW 被停止
	_Pgcstop 
	// 當前處理器已經不被使用
	_Pdead
)

sched

sched 我們在上面也提到了,主要存放了調度器持有的全局資源,如空閑的 P 鏈表、 G 的全局隊列等。

type schedt struct {
	...
	lock mutex 
	// 空閑的 M 列表
	midle        muintptr  
	// 空閑的 M 列表數量
	nmidle       int32      
	// 下一個被創建的 M 的 id
	mnext        int64  
	// 能擁有的最大數量的 M  
	maxmcount    int32    
	// 空閑 p 鏈表
	pidle      puintptr // idle p's
	// 空閑 p 數量
	npidle     uint32
	// 處於 spinning 狀態的 M 的數量
	nmspinning uint32   
	// 全局 runnable G 隊列
	runq     gQueue
	runqsize int32  
	// 有效 dead G 的全局緩存.
	gFree struct {
		lock    mutex
		stack   gList // Gs with stacks
		noStack gList // Gs without stacks
		n       int32
	} 
	// sudog 結構的集中緩存
	sudoglock  mutex
	sudogcache *sudog 
	// defer 結構的池
	deferlock mutex
	deferpool [5]*_defer 
	...
}

從Go程序啟動講起

這里還是借助dlv來進行調試。有關 dlv 如何斷點匯編的內容我在這一篇:https://www.luozhiyun.com/archives/434 《詳解Go中內存分配源碼實現》已經有很詳細的介紹了,感興趣的可以去看看。需要注意的是這里有個坑,下面的例子是在Linux中進行的。

首先我們寫一個非常簡單的例子:

package main

import "fmt"

func main() {
	fmt.Println("hello world")
}

然后進行構建:

go build main.go
dlv exec ./main

開打程序后按步驟輸入下面的命令:

(dlv) r
Process restarted with PID 33191
(dlv) list
> _rt0_amd64_linux() /usr/local/go/src/runtime/rt0_linux_amd64.s:8 (PC: 0x4648c0)
Warning: debugging optimized function
Warning: listing may not match stale executable
     3: // license that can be found in the LICENSE file.
     4:
     5: #include "textflag.h"
     6:
     7: TEXT _rt0_amd64_linux(SB),NOSPLIT,$-8
=>   8:         JMP     _rt0_amd64(SB)
     9:
    10: TEXT _rt0_amd64_linux_lib(SB),NOSPLIT,$0
    11:         JMP     _rt0_amd64_lib(SB) 
(dlv) si
> _rt0_amd64() /usr/local/go/src/runtime/asm_amd64.s:15 (PC: 0x4613e0)
Warning: debugging optimized function
Warning: listing may not match stale executable
    10: // _rt0_amd64 is common startup code for most amd64 systems when using
    11: // internal linking. This is the entry point for the program from the
    12: // kernel for an ordinary -buildmode=exe program. The stack holds the
    13: // number of arguments and the C-style argv.
    14: TEXT _rt0_amd64(SB),NOSPLIT,$-8
=>  15:         MOVQ    0(SP), DI       // argc
    16:         LEAQ    8(SP), SI       // argv
    17:         JMP     runtime·rt0_go(SB)
    18:
    19: // main is common startup code for most amd64 systems when using
    20: // external linking. The C startup code will call the symbol "main"
(dlv)

通過上面的斷點可以知道在linux amd64系統的啟動函數是在asm_amd64.s的runtime·rt0_go函數中。當然,不同的平台有不同的程序入口,感興趣的同學可以自行去了解。

下面我們看看runtime·rt0_go

TEXT runtime·rt0_go(SB),NOSPLIT,$0
	...
	// 初始化執行文件的絕對路徑
	CALL	runtime·args(SB)
	// 初始化 CPU 個數和內存頁大小
	CALL	runtime·osinit(SB)
	// 調度器初始化
	CALL	runtime·schedinit(SB) 
	// 創建一個新的 goroutine 來啟動程序
	MOVQ	$runtime·mainPC(SB), AX		// entry
	// 新建一個 goroutine,該 goroutine 綁定 runtime.main
	CALL	runtime·newproc(SB) 
	// 啟動M,開始調度goroutine
	CALL	runtime·mstart(SB)
	...

上面的CALL方法中:

schedinit進行各種運行時組件初始化工作,這包括我們的調度器與內存分配器、回收器的初始化;

newproc負責根據主 G 入口地址創建可被運行時調度的執行單元;

mstart開始啟動調度器的調度循環;

調度初始化 runtime.schedinit

func schedinit() {
	...
	_g_ := getg()
	...
	// 最大線程數10000
	sched.maxmcount = 10000 
	// M0 初始化
	mcommoninit(_g_.m, -1)
	...	  
    // 垃圾回收器初始化
	gcinit()

	sched.lastpoll = uint64(nanotime())
    // 通過 CPU 核心數和 GOMAXPROCS 環境變量確定 P 的數量
	procs := ncpu
	if n, ok := atoi32(gogetenv("GOMAXPROCS")); ok && n > 0 {
		procs = n
	}
	// P 初始化
	if procresize(procs) != nil {
		throw("unknown runnable goroutine during bootstrap")
	}
    ...
}

schedinit函數會將 maxmcount 設置成10000,這也就是一個 Go 語言程序能夠創建的最大線程數。然后調用 mcommoninit 對 M0 進行初始化,通過 CPU 核心數和 GOMAXPROCS 環境變量確定 P 的數量之后就會調用 procresize 函數對 P 進行初始化。

M0 初始化

func mcommoninit(mp *m, id int64) {
	_g_ := getg()
	...
	lock(&sched.lock)
	// 如果傳入id小於0,那么id則從mReserveID獲取,初次從mReserveID獲取id為0
	if id >= 0 {
		mp.id = id
	} else {
		mp.id = mReserveID()
	}
	//random初始化,用於竊取 G
	mp.fastrand[0] = uint32(int64Hash(uint64(mp.id), fastrandseed))
	mp.fastrand[1] = uint32(int64Hash(uint64(cputicks()), ^fastrandseed))
	if mp.fastrand[0]|mp.fastrand[1] == 0 {
		mp.fastrand[1] = 1
	}
	// 創建用於信號處理的gsignal,只是簡單的從堆上分配一個g結構體對象,然后把棧設置好就返回了
	mpreinit(mp)
	if mp.gsignal != nil {
		mp.gsignal.stackguard1 = mp.gsignal.stack.lo + _StackGuard
	}

	// 把 M 掛入全局鏈表allm之中
	mp.alllink = allm
	...
}

這里傳入的 id 是-1,初次調用會將 id 設置為 0,這里並未對m0做什么關於調度相關的初始化,所以可以簡單的認為這個函數只是把m0放入全局鏈表allm之中就返回了。

P 初始化

runtime.procresize

var allp       []*p 

func procresize(nprocs int32) *p {
	// 獲取先前的 P 個數
	old := gomaxprocs
	// 更新統計信息
	now := nanotime()
	if sched.procresizetime != 0 {
		sched.totaltime += int64(old) * (now - sched.procresizetime)
	}
	sched.procresizetime = now
	// 根據 runtime.MAXGOPROCS 調整 p 的數量,因為 runtime.MAXGOPROCS 用戶可以自行設定
	if nprocs > int32(len(allp)) { 
		lock(&allpLock)
		if nprocs <= int32(cap(allp)) {
			allp = allp[:nprocs]
		} else {
			nallp := make([]*p, nprocs) 
			copy(nallp, allp[:cap(allp)])
			allp = nallp
		}
		unlock(&allpLock)
	}
 
	// 初始化新的 P
	for i := old; i < nprocs; i++ {
		pp := allp[i]
		// 為空,則申請新的 P 對象
		if pp == nil {
			pp = new(p)
		}
		pp.init(i)
		atomicstorep(unsafe.Pointer(&allp[i]), unsafe.Pointer(pp))
	}

	_g_ := getg()
	// P 不為空,並且 id 小於 nprocs ,那么可以繼續使用當前 P
	if _g_.m.p != 0 && _g_.m.p.ptr().id < nprocs {
		// continue to use the current P
		_g_.m.p.ptr().status = _Prunning
		_g_.m.p.ptr().mcache.prepareForSweep()
	} else { 
		// 釋放當前 P,因為已失效
		if _g_.m.p != 0 { 
			_g_.m.p.ptr().m = 0
		}
		_g_.m.p = 0
		p := allp[0]
		p.m = 0
		p.status = _Pidle
		// P0 綁定到當前的 M0
		acquirep(p) 
	}
	// 從未使用的 P 釋放資源
	for i := nprocs; i < old; i++ {
		p := allp[i]
		p.destroy() 
		// 不能釋放 p 本身,因為他可能在 m 進入系統調用時被引用
	}
	// 釋放完 P 之后重置allp的長度
	if int32(len(allp)) != nprocs {
		lock(&allpLock)
		allp = allp[:nprocs]
		unlock(&allpLock)
	}
	var runnablePs *p
	// 將沒有本地任務的 P 放到空閑鏈表中
	for i := nprocs - 1; i >= 0; i-- {
		p := allp[i]
		// 當前正在使用的 P 略過
		if _g_.m.p.ptr() == p {
			continue
		}
		// 設置狀態為 _Pidle 
		p.status = _Pidle
		// P 的任務列表是否為空
		if runqempty(p) {
			// 放入到空閑列表中
			pidleput(p)
		} else {
			// 獲取空閑 M 綁定到 P 上
			p.m.set(mget())
            // 
			p.link.set(runnablePs)
			runnablePs = p
		}
	}
	stealOrder.reset(uint32(nprocs))
	var int32p *int32 = &gomaxprocs // make compiler check that gomaxprocs is an int32
	atomic.Store((*uint32)(unsafe.Pointer(int32p)), uint32(nprocs))
	return runnablePs
}

procresize方法的執行過程如下:

  1. allp 是全局變量 P 的資源池,如果 allp 的切片中的處理器數量少於期望數量,會對切片進行擴容;
  2. 擴容的時候會使用 new 申請一個新的 P ,然后使用 init 初始化,需要注意的是初始化的 P 的 id 就是傳入的 i 的值,狀態為 _Pgcstop;
  3. 然后通過 _g_.m.p 獲取 M0,如果 M0 已與有效的 P 綁定上,則將 被綁定的 P 的狀態修改為 _Prunning。否則獲取 allp[0] 作為 P0 調用 runtime.acquirep 與 M0 進行綁定;
  4. 超過處理器個數的 P 通過p.destroy釋放資源,p.destroy會將與 P 相關的資源釋放,並將 P 狀態設置為 _Pdead;
  5. 通過截斷改變全局變量 allp 的長度保證與期望處理器數量相等;
  6. 遍歷 allp 檢查 P 的是否處於空閑狀態,是的話放入到空閑列表中;

P.init

func (pp *p) init(id int32) {
	// 設置id
	pp.id = id
	// 設置狀態為 _Pgcstop
	pp.status = _Pgcstop
	// 與 sudog 相關
	pp.sudogcache = pp.sudogbuf[:0]
	for i := range pp.deferpool {
		pp.deferpool[i] = pp.deferpoolbuf[i][:0]
	}
	pp.wbBuf.reset()
	// mcache 初始化
	if pp.mcache == nil {
		if id == 0 {
			if mcache0 == nil {
				throw("missing mcache?")
			} 
			pp.mcache = mcache0
		} else {
			pp.mcache = allocmcache()
		}
	}
	...
	lockInit(&pp.timersLock, lockRankTimers)
}

這里會初始化一些 P 的字段值,如設置 id、status、sudogcache、mcache、lock相關 。

初始化 sudogcache 這個字段存的是 sudog 的集合與 Channel 相關,可以看這里:多圖詳解Go中的Channel源碼 https://www.luozhiyun.com/archives/427。

每個 P 中會保存相應的 mcache ,能快速的進行分配微對象和小對象的分配,具體的可以看這里:詳解Go中內存分配源碼實現 https://www.luozhiyun.com/archives/434。

下面再來看看 runtime.acquirep 是如何將 P 與 M 綁定的:

runtime.acquirep

func acquirep(_p_ *p) { 
	wirep(_p_)
	...
}

func wirep(_p_ *p) {
	_g_ := getg()

	...
	// 將 P 與 M 相互綁定
	_g_.m.p.set(_p_)
	_p_.m.set(_g_.m)
	// 設置 P 狀態為 _Prunning
	_p_.status = _Prunning
}

這個方法十分簡單,就不解釋了。下面再看看 runtime.pidleput將 P 放入空閑列表:

func pidleput(_p_ *p) {
	// 如果 P 運行隊列不為空,那么不能放入空閑列表
	if !runqempty(_p_) {
		throw("pidleput: P has non-empty run queue")
	}
	// 將 P 與 pidle 列表關聯
	_p_.link = sched.pidle
	sched.pidle.set(_p_)
	atomic.Xadd(&sched.npidle, 1) // TODO: fast atomic
}

G 初始化

從匯編可以知道執行完runtime·schedinit后就會執行 runtime.newproc是創建G的入口。

runtime.newproc

func newproc(siz int32, fn *funcval) {
	argp := add(unsafe.Pointer(&fn), sys.PtrSize)
	// 獲取當前的 G 
	gp := getg()
	// 獲取調用者的程序計數器 PC
	pc := getcallerpc() 
	systemstack(func() {
		// 獲取新的 G 結構體
		newg := newproc1(fn, argp, siz, gp, pc)
		_p_ := getg().m.p.ptr()
        // 將 G 加入到 P 的運行隊列
		runqput(_p_, newg, true)
		// mainStarted 為 True 表示主M已經啟動
		if mainStarted {
			// 喚醒新的  P 執行 G
			wakep()
		}
	})
}

runtime.newproc會獲取 當前 G 以及調用方的程序計數器,然后調用 newproc1 獲取新的 G 結構體;然后將 G 放入到 P 的 runnext 字段中。

runtime.newproc1

func newproc1(fn *funcval, argp unsafe.Pointer, narg int32, callergp *g, callerpc uintptr) *g {
	_g_ := getg()

	if fn == nil {
		_g_.m.throwing = -1 // do not dump full stacks
		throw("go of nil func value")
	}
	// 加鎖,禁止 G 的 M 被搶占
	acquirem() // disable preemption because it can be holding p in a local var
	siz := narg
	siz = (siz + 7) &^ 7 

	_p_ := _g_.m.p.ptr()
	// 從 P 的空閑列表 gFree 查找空閑 G
	newg := gfget(_p_)
	if newg == nil {
		// 創建一個棧大小為 2K 大小的 G
		newg = malg(_StackMin)
		// CAS 改變 G 狀態為 _Gdead
		casgstatus(newg, _Gidle, _Gdead)
		// 將 G 加入到全局 allgs 列表中
		allgadd(newg) // publishes with a g->status of Gdead so GC scanner doesn't look at uninitialized stack.
	}
	...
	// 計算運行空間大小
	totalSize := 4*sys.RegSize + uintptr(siz) + sys.MinFrameSize // extra space in case of reads slightly beyond frame
	totalSize += -totalSize & (sys.SpAlign - 1)                  // align to spAlign
	sp := newg.stack.hi - totalSize
	spArg := sp
	...
	if narg > 0 {
		// 從 argp 參數開始的位置,復制 narg 個字節到 spArg(參數拷貝)
		memmove(unsafe.Pointer(spArg), argp, uintptr(narg))
		...
	}
	// 清理、創建並初始化的 G
	memclrNoHeapPointers(unsafe.Pointer(&newg.sched), unsafe.Sizeof(newg.sched))
	newg.sched.sp = sp
	newg.stktopsp = sp
	newg.sched.pc = funcPC(goexit) + sys.PCQuantum // +PCQuantum so that previous instruction is in same function
	newg.sched.g = guintptr(unsafe.Pointer(newg))
	gostartcallfn(&newg.sched, fn)
	newg.gopc = callerpc
	newg.ancestors = saveAncestors(callergp)
	newg.startpc = fn.fn
	if _g_.m.curg != nil {
		newg.labels = _g_.m.curg.labels
	}
	if isSystemGoroutine(newg, false) {
		atomic.Xadd(&sched.ngsys, +1)
	}
	// 將 G 狀態CAS為 _Grunnable 狀態
	casgstatus(newg, _Gdead, _Grunnable) 
	newg.goid = int64(_p_.goidcache)
	_p_.goidcache++
	...
	// 釋放鎖,對應上面 acquirem
	releasem(_g_.m)

	return newg
}

newproc1函數比較長,下面總結一下主要做了哪幾件事:

  1. 從 P 的空閑列表 gFree 查找空閑 G;
  2. 如果獲取不到 G ,那么調用 malg 創建創建一個新的 G ,需要注意的是 _StackMin 為2048,表示創建的 G 的棧上內存占用為2K。然后 CAS 改變 G 狀態為 _Gdead,並加入到全局 allgs 列表中;
  3. 根據要執行函數的入口地址和參數,初始化執行棧的 SP 和參數的入棧位置,調用 memmove 進行參數拷貝;
  4. 清理、創建並初始化的 G,將 G 狀態CAS為 _Grunnable 狀態,返回;

下面看看 runtime.gfget是如何查找 G:

runtime.gfget

func gfget(_p_ *p) *g {
	retry:
		// 如果 P 的空閑列表 gFree 為空,sched 的的空閑列表 gFree 不為空
		if _p_.gFree.empty() && (!sched.gFree.stack.empty() || !sched.gFree.noStack.empty()) {
			lock(&sched.gFree.lock) 
			// 從sched 的 gFree 列表中移動 32 個到 P 的 gFree 中
			for _p_.gFree.n < 32 { 
				gp := sched.gFree.stack.pop()
				if gp == nil {
					gp = sched.gFree.noStack.pop()
					if gp == nil {
						break
					}
				}
				sched.gFree.n--
				_p_.gFree.push(gp)
				_p_.gFree.n++
			}
			unlock(&sched.gFree.lock)
			goto retry
		}
		// 此時如果 gFree 列表還是為空,返回空 
		gp := _p_.gFree.pop()
		if gp == nil {
			return nil
		}
		...
		return gp
}
  1. 當 P 的空閑列表 gFree 為空時會從 sched 持有的空閑列表 gFree 轉移32個 G 到當前的 P 的空閑列表上;
  2. 然后從 P 的 gFree 列表頭返回一個 G;

當 newproc 運行完 newproc1 后會調用 runtime.runqput將 G 放入到運行列表中:

runtime.runqput

func runqput(_p_ *p, gp *g, next bool) {
	if randomizeScheduler && next && fastrand()%2 == 0 {
		next = false
	} 
	if next {
	retryNext:
	// 將 G 放入到 runnext 中作為下一個處理器執行的任務
		oldnext := _p_.runnext
		if !_p_.runnext.cas(oldnext, guintptr(unsafe.Pointer(gp))) {
			goto retryNext
		}
		if oldnext == 0 {
			return
		} 
		// 將原來 runnext 的 G 放入到運行隊列中
		gp = oldnext.ptr()
	}

retry:
	h := atomic.LoadAcq(&_p_.runqhead)  
	t := _p_.runqtail
	// 放入到 P 本地運行隊列中
	if t-h < uint32(len(_p_.runq)) {
		_p_.runq[t%uint32(len(_p_.runq))].set(gp)
		atomic.StoreRel(&_p_.runqtail, t+1)  
		return
	}
	// P 本地隊列放不下了,放入到全局的運行隊列中
	if runqputslow(_p_, gp, h, t) {
		return
	} 
	goto retry
}
  1. runtime.runqput會根據 next 來判斷是否要將 G 放入到 runnext 中;

  2. next 為 false 的時候會將傳入的 G 嘗試放入到本地隊列中,本地隊列時一個大小為256的環形鏈表,如果放不下了則調用 runqputslow函數將 G 放入到全局隊列的 runq 中。

runq

調度循環

我們繼續回到runtime·rt0_go中,在初始化工作完成后,會調用runtime·mstart開始調度 G

TEXT runtime·rt0_go(SB),NOSPLIT,$0
	...
	// 初始化執行文件的絕對路徑
	CALL	runtime·args(SB)
	// 初始化 CPU 個數和內存頁大小
	CALL	runtime·osinit(SB)
	// 調度器初始化
	CALL	runtime·schedinit(SB) 
	// 創建一個新的 goroutine 來啟動程序
	MOVQ	$runtime·mainPC(SB), AX		// entry
	// 新建一個 goroutine,該 goroutine 綁定 runtime.main
	CALL	runtime·newproc(SB) 
	// 啟動M,開始調度goroutine
	CALL	runtime·mstart(SB)
	...

runtime·mstart會調用到runtime·mstart1會初始化 M0 並調用runtime.schedule進入調度循環。

mstart

func mstart1() {
	_g_ := getg()

	if _g_ != _g_.m.g0 {
		throw("bad runtime·mstart")
	} 
	// 一旦調用 schedule 就不會返回,所以需要保存一下棧幀
	save(getcallerpc(), getcallersp())
	asminit()
	minit() 
	// 設置信號 handler
	if _g_.m == &m0 {
		mstartm0()
	}
	// 執行啟動函數
	if fn := _g_.m.mstartfn; fn != nil {
		fn()
	}
	// 如果當前 m 並非 m0,則要求綁定 p
	if _g_.m != &m0 {
		acquirep(_g_.m.nextp.ptr())
		_g_.m.nextp = 0
	}
	// 開始調度
	schedule()
}

mstart1保存調度信息后,會調用schedule進入調度循環,尋找一個可執行的 G 並執行。下面看看schedule執行函數。

schedule

func schedule() {
	_g_ := getg()

	if _g_.m.locks != 0 {
		throw("schedule: holding locks")
	} 
	... 
top:
	pp := _g_.m.p.ptr()
	pp.preempt = false
	// GC 等待
	if sched.gcwaiting != 0 {
		gcstopm()
		goto top
	}
	// 不等於0,說明在安全點
	if pp.runSafePointFn != 0 {
		runSafePointFn()
	}

	// 如果在 spinning ,那么運行隊列應該為空,
	if _g_.m.spinning && (pp.runnext != 0 || pp.runqhead != pp.runqtail) {
		throw("schedule: spinning with local work")
	}
	// 運行 P 上准備就緒的 Timer
	checkTimers(pp, 0)

	var gp *g
	var inheritTime bool 
	...
	if gp == nil { 
		// 為了公平,每調用 schedule 函數 61 次就要從全局可運行 G 隊列中獲取
		if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
			lock(&sched.lock)
			// 從全局隊列獲取1個 G
			gp = globrunqget(_g_.m.p.ptr(), 1)
			unlock(&sched.lock)
		}
	}
	// 從 P 本地獲取 G 任務
	if gp == nil {
		gp, inheritTime = runqget(_g_.m.p.ptr()) 
	}
	// 運行到這里表示從本地運行隊列和全局運行隊列都沒有找到需要運行的 G
	if gp == nil {
		// 阻塞地查找可用 G
		gp, inheritTime = findrunnable() // blocks until work is available
	}
	...
	// 執行 G 任務函數
	execute(gp, inheritTime)
}

在這個函數中,我們只關注調度有關的代碼。從上面的代碼可以知道主要是從下面幾個方向去尋找可用的 G:

  1. 為了保證公平,當全局運行隊列中有待執行的 G 時,通過對 schedtick 取模 61 ,表示調度器每調度 61 次的時候,都會嘗試從全局隊列里取出待運行的 G 來運行;
  2. 調用 runqget 從 P 本地的運行隊列中查找待執行的 G;
  3. 如果前兩種方法都沒有找到 G ,會通過 findrunnable 函數去其他 P 里面去“偷”一些 G 來執行,如果“偷”不到,就阻塞直到有可運行的 G;

全局隊列獲取 G

func globrunqget(_p_ *p, max int32) *g {
	// 如果全局隊列中沒有 G 直接返回
	if sched.runqsize == 0 {
		return nil
	}
	// 計算 n 的個數
	n := sched.runqsize/gomaxprocs + 1
	if n > sched.runqsize {
		n = sched.runqsize
	}
	// n 的最大個數
	if max > 0 && n > max {
		n = max
	}
	if n > int32(len(_p_.runq))/2 {
		n = int32(len(_p_.runq)) / 2
	}

	sched.runqsize -= n
	// 拿到全局隊列隊頭 G
	gp := sched.runq.pop()
	n--
	// 將其余 n-1 個 G 從全局隊列放入本地隊列
	for ; n > 0; n-- {
		gp1 := sched.runq.pop()
		runqput(_p_, gp1, false)
	}
	return gp
}

globrunqget 會從全局 runq 隊列中獲取 n 個 G ,其中第一個 G 用於執行,n-1 個 G 從全局隊列放入本地隊列。

本地隊列獲取 G

func runqget(_p_ *p) (gp *g, inheritTime bool) {
	// 如果 runnext 不為空,直接獲取返回
	for {
		next := _p_.runnext
		if next == 0 {
			break
		}
		if _p_.runnext.cas(next, 0) {
			return next.ptr(), true
		}
	}
	// 從本地隊列頭指針遍歷本地隊列
	for {
		h := atomic.LoadAcq(&_p_.runqhead)  
		t := _p_.runqtail
		// 表示本地隊列為空
		if t == h {
			return nil, false
		}
		gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
		if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume
			return gp, false
		}
	}
}

本地隊列的獲取會先從 P 的 runnext 字段中獲取,如果不為空則直接返回。如果 runnext 為空,那么從本地隊列頭指針遍歷本地隊列,本地隊列是一個環形隊列,方便復用。

任務竊取 G

任務竊取方法 findrunnable 非常的復雜,足足有300行之多,我們慢慢來分析:

func findrunnable() (gp *g, inheritTime bool) {
	_g_ := getg()
top:
	_p_ := _g_.m.p.ptr()
	// 如果在 GC,則休眠當前 M,直到復始后回到 top
	if sched.gcwaiting != 0 {
		gcstopm()
		goto top
	}
	// 運行到安全點
	if _p_.runSafePointFn != 0 {
		runSafePointFn()
	}

	now, pollUntil, _ := checkTimers(_p_, 0)
	...
	// 從本地 P 的可運行隊列獲取 G
	if gp, inheritTime := runqget(_p_); gp != nil {
		return gp, inheritTime
	}

	// 從全局的可運行隊列獲取 G
	if sched.runqsize != 0 {
		lock(&sched.lock)
		gp := globrunqget(_p_, 0)
		unlock(&sched.lock)
		if gp != nil {
			return gp, false
		}
	} 
	// 從I/O輪詢器獲取 G
	if netpollinited() && atomic.Load(&netpollWaiters) > 0 && atomic.Load64(&sched.lastpoll) != 0 {
		// 嘗試從netpoller獲取Glist
		if list := netpoll(0); !list.empty() { // non-blocking
			gp := list.pop()
			//將其余隊列放入 P 的可運行G隊列
			injectglist(&list)
			casgstatus(gp, _Gwaiting, _Grunnable)
			if trace.enabled {
				traceGoUnpark(gp, 0)
			}
			return gp, false
		}
	}
	...
	if !_g_.m.spinning {
		// 設置 spinning ,表示正在竊取 G
		_g_.m.spinning = true
		atomic.Xadd(&sched.nmspinning, 1)
	}
	// 開始竊取
	for i := 0; i < 4; i++ {
		for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
			if sched.gcwaiting != 0 {
				goto top
			}
			// 如果 i>2 表示如果其他 P 運行隊列中沒有 G ,將要從其他隊列的 runnext 中獲取
			stealRunNextG := i > 2 // first look for ready queues with more than 1 g
			// 隨機獲取一個 P
			p2 := allp[enum.position()]
			if _p_ == p2 {
				continue
			}
			// 從其他 P 的運行隊列中獲取一般的 G 到當前隊列中
			if gp := runqsteal(_p_, p2, stealRunNextG); gp != nil {
				return gp, false
			}

			// 如果運行隊列中沒有 G,那么從 timers 中獲取可執行的定時器
			if i > 2 || (i > 1 && shouldStealTimers(p2)) {
				tnow, w, ran := checkTimers(p2, now)
				now = tnow
				if w != 0 && (pollUntil == 0 || w < pollUntil) {
					pollUntil = w
				}
				if ran {
					if gp, inheritTime := runqget(_p_); gp != nil {
						return gp, inheritTime
					}
					ranTimer = true
				}
			}
		}
	}
	if ranTimer {
		goto top
	}

stop: 
	// 處於 GC 階段的話,獲取執行GC標記任務的G
	if gcBlackenEnabled != 0 && _p_.gcBgMarkWorker != 0 && gcMarkWorkAvailable(_p_) {
		_p_.gcMarkWorkerMode = gcMarkWorkerIdleMode
		gp := _p_.gcBgMarkWorker.ptr()
		//將本地 P 的 GC 標記專用 G 職位 Grunnable
		casgstatus(gp, _Gwaiting, _Grunnable)
		if trace.enabled {
			traceGoUnpark(gp, 0)
		}
		return gp, false
	}

	...
	// 放棄當前的 P 之前,對 allp 做一個快照
	allpSnapshot := allp

	// return P and block
	lock(&sched.lock)
	// 進入了 gc,回到頂部並阻塞
	if sched.gcwaiting != 0 || _p_.runSafePointFn != 0 {
		unlock(&sched.lock)
		goto top
	}
	// 全局隊列中又發現了任務
	if sched.runqsize != 0 {
		gp := globrunqget(_p_, 0)
		unlock(&sched.lock)
		return gp, false
	}
	if releasep() != _p_ {
		throw("findrunnable: wrong p")
	}
	// 將 p 放入 idle 空閑鏈表
	pidleput(_p_)
	unlock(&sched.lock)
 
	wasSpinning := _g_.m.spinning
	if _g_.m.spinning {
		// M 即將睡眠,狀態不再是 spinning
		_g_.m.spinning = false
		if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
			throw("findrunnable: negative nmspinning")
		}
	}
 
	// 休眠之前再次檢查全局 P 列表
	//遍歷全局 P 列表的 P,並檢查他們的可運行G隊列
	for _, _p_ := range allpSnapshot {
		// 如果這時本地隊列不空
		if !runqempty(_p_) {
			lock(&sched.lock)
			// 重新獲取 P
			_p_ = pidleget()
			unlock(&sched.lock)
			if _p_ != nil {
				// M 綁定 P
				acquirep(_p_)
				if wasSpinning {
					// spinning 重新切換為 true
					_g_.m.spinning = true
					atomic.Xadd(&sched.nmspinning, 1)
				}
				// 這時候是有 work 的,回到頂部尋找 G
				goto top
			}
			break
		}
	}
 
	// 休眠前再次檢查 GC work
	if gcBlackenEnabled != 0 && gcMarkWorkAvailable(nil) {
		lock(&sched.lock)
		_p_ = pidleget()
		if _p_ != nil && _p_.gcBgMarkWorker == 0 {
			pidleput(_p_)
			_p_ = nil
		}
		unlock(&sched.lock)
		if _p_ != nil {
			acquirep(_p_)
			if wasSpinning {
				_g_.m.spinning = true
				atomic.Xadd(&sched.nmspinning, 1)
			}
			// Go back to idle GC check.
			goto stop
		}
	}

	// poll network
	// 休眠前再次檢查 poll 網絡
	if netpollinited() && (atomic.Load(&netpollWaiters) > 0 || pollUntil != 0) && atomic.Xchg64(&sched.lastpoll, 0) != 0 {
		...
		lock(&sched.lock)
		_p_ = pidleget()
		unlock(&sched.lock)
		if _p_ == nil {
			injectglist(&list)
		} else {
			acquirep(_p_)
			if !list.empty() {
				gp := list.pop()
				injectglist(&list)
				casgstatus(gp, _Gwaiting, _Grunnable)
				if trace.enabled {
					traceGoUnpark(gp, 0)
				}
				return gp, false
			}
			if wasSpinning {
				_g_.m.spinning = true
				atomic.Xadd(&sched.nmspinning, 1)
			}
			goto top
		}
	} else if pollUntil != 0 && netpollinited() {
		pollerPollUntil := int64(atomic.Load64(&sched.pollUntil))
		if pollerPollUntil == 0 || pollerPollUntil > pollUntil {
			netpollBreak()
		}
	}
	// 休眠當前 M
	stopm()
	goto top
}

這個函數需要注意一下,工作線程M的自旋狀態(spinning)。工作線程在從其它工作線程的本地運行隊列中盜取 G 時的狀態稱為自旋狀態。有關netpoller的知識可以到這里看:詳解Go語言I/O多路復用netpoller模型 https://www.luozhiyun.com/archives/439。

下面我們看一下 findrunnable 做了什么:

  1. 首先檢查是是否正在進行 GC,如果是則暫止當前的 M 並阻塞休眠;

  2. 從本地運行隊列、全局運行隊列中查找 G;

  3. 從網絡輪詢器中查找是否有 G 等待運行;

  4. 將 spinning 設置為 true 表示開始竊取 G。竊取過程用了兩個嵌套for循環,內層循環遍歷 allp 中的所有 P ,查看其運行隊列是否有 G,如果有,則取其一半到當前工作線程的運行隊列,然后從 findrunnable 返回,如果沒有則繼續遍歷下一個 P 。需要注意的是,遍歷 allp 時是從隨機位置上的 P 開始,防止每次遍歷時使用同樣的順序訪問allp中的元素;

  5. 所有的可能性都嘗試過了,在准備休眠 M 之前,還要進行額外的檢查;

  6. 首先檢查此時是否是 GC mark 階段,如果是,則直接返回 mark 階段的 G;

  7. 休眠之前再次檢查全局 P 列表,遍歷全局 P 列表的 P,並檢查他們的可運行G隊列;

  8. 還需要再檢查是否有 GC mark 的 G 出現,如果有,獲取 P 並回到第一步,重新執行偷取工作;

  9. 再檢查是否存在 poll 網絡的 G,如果有,則直接返回;

  10. 什么都沒找到,那么休眠當前的 M ;

任務執行

schedule 運行到到這里表示終於找到了可以運行的 G:

func execute(gp *g, inheritTime bool) {
	_g_ := getg()

	// 將 G 綁定到當前 M 上
	_g_.m.curg = gp
	gp.m = _g_.m
	// 將 g 正式切換為 _Grunning 狀態
	casgstatus(gp, _Grunnable, _Grunning)
	gp.waitsince = 0
	// 搶占信號
	gp.preempt = false
	gp.stackguard0 = gp.stack.lo + _StackGuard
	if !inheritTime {
		// 調度器調度次數增加 1
		_g_.m.p.ptr().schedtick++
	} 
	... 
    // gogo 完成從 g0 到 gp 真正的切換
	gogo(&gp.sched)
}

當開始執行 execute 后,G 會被切換到 _Grunning 狀態,並將 M 和 G 進行綁定,最終調用 runtime.gogo 開始執行。

runtime.gogo 中會從 runtime.gobuf 中取出 runtime.goexit 的程序計數器和待執行函數的程序計數器,然后跳轉到 runtime.goexit 中並執行:

TEXT runtime·goexit(SB),NOSPLIT,$0-0
	CALL	runtime·goexit1(SB)
	
func goexit1() {
    // 調用goexit0函數 
	mcall(goexit0)
}

goexit1 通過 mcall 完成 goexit0 的調用 :

func goexit0(gp *g) {
	_g_ := getg()
	// 設置當前 G 狀態為 _Gdead
	casgstatus(gp, _Grunning, _Gdead) 
	// 清理 G
	gp.m = nil
	...
	gp.writebuf = nil
	gp.waitreason = 0
	gp.param = nil
	gp.labels = nil
	gp.timer = nil
 
	// 解綁 M 和 G
	dropg() 
	...
	// 將 G 扔進 gfree 鏈表中等待復用
	gfput(_g_.m.p.ptr(), gp)
	// 再次進行調度
	schedule()
}

goexit0 會對 G 進行復位操作,解綁 M 和 G 的關聯關系,將其 放入 gfree 鏈表中等待其他的 go 語句創建新的 g。在最后,goexit0 會重新調用 schedule觸發新一輪的調度。

execute

總結

下面用一張圖大致總結一下調度過程:

schedule

Reference

調度器 https://draveness.me/golang/docs/part3-runtime/ch06-concurrency/golang-goroutine/#m

MPG 模型與並發調度單元 https://golang.design/under-the-hood/zh-cn/part2runtime/ch06sched/mpg/

Scheduling In Go https://www.ardanlabs.com/blog/2018/08/scheduling-in-go-part2.html

golang的啟動原理 https://blog.csdn.net/byxiaoyuonly/article/details/103882201

Go語言調度器之盜取goroutine https://zhuanlan.zhihu.com/p/66090420

luozhiyun很酷


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM