Go語言調度器之盜取goroutine(17)


本文是《Go語言調度器源代碼情景分析》系列的第17篇,也是第三章《Goroutine調度策略》的第2小節。


 

上一小節我們分析了從全局運行隊列與工作線程的本地運行隊列獲取goroutine的過程,這一小節我們繼續分析因無法從上述兩個隊列中拿到需要運行的goroutine而導致的從其它工作線程的本地運行隊列中盜取goroutine的過程。

findrunnable() 函數負責處理與盜取相關的邏輯,該函數代碼很繁雜,因為它還做了與gc和netpoll等相關的事情,為了不影響我們的分析思路,這里我們仍然把不相關的代碼刪掉了,不過代碼還是比較多,但總結起來就一句話:盡力去各個運行隊列中尋找goroutine,如果實在找不到則進入睡眠狀態。下面是代碼細節:

runtime/proc.go : 2176

// Finds a runnable goroutine to execute.
// Tries to steal from other P's, get g from global queue, poll network.
func findrunnable() (gp *g, inheritTime bool) {
	_g_ := getg()

	// The conditions here and in handoffp must agree: if
	// findrunnable would return a G to run, handoffp must start
	// an M.

top:
	_p_ := _g_.m.p.ptr()
	
    ......

	// local runq
    //再次看一下本地運行隊列是否有需要運行的goroutine
	if gp, inheritTime := runqget(_p_); gp != nil {
		return gp, inheritTime
	}

	// global runq
    //再看看全局運行隊列是否有需要運行的goroutine
	if sched.runqsize != 0 {
		lock(&sched.lock)
		gp := globrunqget(_p_, 0)
		unlock(&sched.lock)
		if gp != nil {
			return gp, false
		}
	}

    ......

	// Steal work from other P's.
    //如果除了當前工作線程還在運行外,其它工作線程已經處於休眠中,那么也就不用去偷了,肯定沒有
	procs := uint32(gomaxprocs)
	if atomic.Load(&sched.npidle) == procs-1 {
		// Either GOMAXPROCS=1 or everybody, except for us, is idle already.
		// New work can appear from returning syscall/cgocall, network or timers.
		// Neither of that submits to local run queues, so no point in stealing.
		goto stop
	}
	// If number of spinning M's >= number of busy P's, block.
	// This is necessary to prevent excessive CPU consumption
	// when GOMAXPROCS>>1 but the program parallelism is low.
    // 這個判斷主要是為了防止因為尋找可運行的goroutine而消耗太多的CPU。
    // 因為已經有足夠多的工作線程正在尋找可運行的goroutine,讓他們去找就好了,自己偷個懶去睡覺
	if !_g_.m.spinning && 2*atomic.Load(&sched.nmspinning) >= procs-atomic.Load(&sched.npidle) {
		goto stop
	}
	if !_g_.m.spinning {
        //設置m的狀態為spinning
		_g_.m.spinning = true
        //處於spinning狀態的m數量加一
		atomic.Xadd(&sched.nmspinning, 1)
	}
    
    //從其它p的本地運行隊列盜取goroutine
	for i := 0; i < 4; i++ {
		for enum := stealOrder.start(fastrand()); !enum.done(); enum.next() {
			if sched.gcwaiting != 0 {
				goto top
			}
			stealRunNextG := i > 2 // first look for ready queues with more than 1 g
			if gp := runqsteal(_p_, allp[enum.position()], stealRunNextG); gp != nil {
				return gp, false
			}
		}
	}

stop:
	
	......

	// Before we drop our P, make a snapshot of the allp slice,
	// which can change underfoot once we no longer block
	// safe-points. We don't need to snapshot the contents because
	// everything up to cap(allp) is immutable.
	allpSnapshot := allp

	// return P and block
	lock(&sched.lock)
  
	......
  
	if sched.runqsize != 0 {
		gp := globrunqget(_p_, 0)
		unlock(&sched.lock)
		return gp, false
	}
    
    // 當前工作線程解除與p之間的綁定,准備去休眠
	if releasep() != _p_ {
		throw("findrunnable: wrong p")
	}
    //把p放入空閑隊列
	pidleput(_p_)
	unlock(&sched.lock)

	// Delicate dance: thread transitions from spinning to non-spinning state,
	// potentially concurrently with submission of new goroutines. We must
	// drop nmspinning first and then check all per-P queues again (with
	// #StoreLoad memory barrier in between). If we do it the other way around,
	// another thread can submit a goroutine after we've checked all run queues
	// but before we drop nmspinning; as the result nobody will unpark a thread
	// to run the goroutine.
	// If we discover new work below, we need to restore m.spinning as a signal
	// for resetspinning to unpark a new worker thread (because there can be more
	// than one starving goroutine). However, if after discovering new work
	// we also observe no idle Ps, it is OK to just park the current thread:
	// the system is fully loaded so no spinning threads are required.
	// Also see "Worker thread parking/unparking" comment at the top of the file.
	wasSpinning := _g_.m.spinning
	if _g_.m.spinning {
        //m即將睡眠,狀態不再是spinning
		_g_.m.spinning = false
		if int32(atomic.Xadd(&sched.nmspinning, -1)) < 0 {
			throw("findrunnable: negative nmspinning")
		}
	}

	// check all runqueues once again
    // 休眠之前再看一下是否有工作要做
	for _, _p_ := range allpSnapshot {
		if !runqempty(_p_) {
			lock(&sched.lock)
			_p_ = pidleget()
			unlock(&sched.lock)
			if _p_ != nil {
				acquirep(_p_)
				if wasSpinning {
					_g_.m.spinning = true
					atomic.Xadd(&sched.nmspinning, 1)
				}
				goto top
			}
			break
		}
	}

	......
    //休眠
	stopm()
	goto top
}

從上面的代碼可以看到,工作線程在放棄尋找可運行的goroutine而進入睡眠之前,會反復嘗試從各個運行隊列尋找需要運行的goroutine,可謂是盡心盡力了。這個函數需要重點注意以下兩點:

第一點,工作線程M的自旋狀態(spinning)工作線程在從其它工作線程的本地運行隊列中盜取goroutine時的狀態稱為自旋狀態。從上面代碼可以看到,當前M在去其它p的運行隊列盜取goroutine之前把spinning標志設置成了true,同時增加處於自旋狀態的M的數量,而盜取結束之后則把spinning標志還原為false,同時減少處於自旋狀態的M的數量,從后面的分析我們可以看到,當有空閑P又有goroutine需要運行的時候,這個處於自旋狀態的M的數量決定了是否需要喚醒或者創建新的工作線程。

第二點,盜取算法。盜取過程用了兩個嵌套for循環。內層循環實現了盜取邏輯,從代碼可以看出盜取的實質就是遍歷allp中的所有p,查看其運行隊列是否有goroutine,如果有,則取其一半到當前工作線程的運行隊列,然后從findrunnable返回,如果沒有則繼續遍歷下一個p。但這里為了保證公平性,遍歷allp時並不是固定的從allp[0]即第一個p開始,而是從隨機位置上的p開始,而且遍歷的順序也隨機化了,並不是現在訪問了第i個p下一次就訪問第i+1個p,而是使用了一種偽隨機的方式遍歷allp中的每個p,防止每次遍歷時使用同樣的順序訪問allp中的元素。下面是這個算法的偽代碼:

offset := uint32(random()) % nprocs
coprime := 隨機選取一個小於nprocs且與nprocs互質的數
for i := 0; i < nprocs; i++ {
    p := allp[offset]
    從p的運行隊列偷取goroutine
    if 偷取成功 {
        break
    }
    offset += coprime
    offset = offset % nprocs
}

下面舉例說明一下上述算法過程,現假設nprocs為8,也就是一共有8個p。

如果第一次隨機選擇的offset = 6,coprime = 3(3與8互質,滿足算法要求)的話,則從allp切片中偷取的下標順序為6, 1, 4, 7, 2, 5, 0, 3,計算過程:

6,(6+3)%8=1,(1+3)%8=4, (4+3)%8=7, (7+3)%8=2, (2+3)%8=5, (5+3)%8=0, (0+3)%8=3

如果第二次隨機選擇的offset = 4,coprime = 5的話,則從allp切片中偷取的下標順序為1, 6, 3, 0, 5, 2, 7, 4,計算過程:

1,(1+5)%8=6,(6+5)%8=3, (3+5)%8=0, (0+5)%8=5, (5+5)%8=2, (2+5)%8=7, (7+5)%8=4

可以看到只要隨機數不一樣,偷取p的順序也不一樣,但可以保證經過8次循環,每個p都會被訪問到。可以用數論知識證明,不管nprocs是多少,這個算法都可以保證經過nprocs次循環,每個p都可以得到訪問。

挑選出盜取的對象p之后,則調用runqsteal盜取p的運行隊列中的goroutine,runqsteal函數再調用runqgrap從p的隊列中批量拿出多個goroutine,這兩個函數本身比較簡單,但runqgrab有一個小細節需要注意一下,見下面代碼:

runtime/proc.go : 4854

// Grabs a batch of goroutines from _p_'s runnable queue into batch.
// Batch is a ring buffer starting at batchHead.
// Returns number of grabbed goroutines.
// Can be executed by any P.
func runqgrab(_p_ *p, batch *[256]guintptr, batchHead uint32, stealRunNextG bool) uint32 {
	for {
		h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
		t := atomic.LoadAcq(&_p_.runqtail) // load-acquire, synchronize with the producer
		n := t - h        //計算隊列中有多少個goroutine
		n = n - n/2     //取隊列中goroutine個數的一半
		if n == 0 {
			......
			return ......
		}
        //小細節:按理說隊列中的goroutine個數最多就是len(_p_.runq),
        //所以n的最大值也就是len(_p_.runq)/2,那為什么需要這個判斷呢?
		if n > uint32(len(_p_.runq)/2) { // read inconsistent h and t
			continue
		}
        
		......
	}
}

代碼中n的計算很簡單,從計算過程來看n應該是runq隊列中goroutine數量的一半,它的最大值不會超過隊列容量的一半,但為什么這里的代碼卻偏偏要去判斷n是否大於隊列容量的一半呢?這里關鍵點在於讀取runqhead和runqtail是兩個操作而非一個原子操作,當我們讀取runqhead之后但還未讀取runqtail之前,如果有其它線程快速的在增加(這是完全有可能的,其它偷取者從隊列中偷取goroutine會增加runqhead,而隊列的所有者往隊列中添加goroutine會增加runqtail)這兩個值,則會導致我們讀取出來的runqtail已經遠遠大於我們之前讀取出來放在局部變量h里面的runqhead了,也就是代碼注釋中所說的h和t已經不一致了,所以這里需要這個if判斷來檢測異常情況。

工作線程進入睡眠

分析完盜取過程,我們繼續回到findrunnable函數。

如果工作線程經過多次努力一直找不到需要運行的goroutine則調用stopm進入睡眠狀態,等待被其它工作線程喚醒。

runtime/proc.go : 1918

// Stops execution of the current m until new work is available.
// Returns with acquired P.
func stopm() {
   _g_ := getg()

   if _g_.m.locks != 0 {
      throw("stopm holding locks")
   }
   if _g_.m.p != 0 {
      throw("stopm holding p")
   }
   if _g_.m.spinning {
      throw("stopm spinning")
   }

   lock(&sched.lock)
   mput(_g_.m)   //把m結構體對象放入sched.midle空閑隊列
   unlock(&sched.lock)
   notesleep(&_g_.m.park)  //進入睡眠狀態
  
   //被其它工作線程喚醒
   noteclear(&_g_.m.park)
   acquirep(_g_.m.nextp.ptr())
   _g_.m.nextp = 0
}

stopm的核心是調用mput把m結構體對象放入sched的midle空閑隊列,然后通過notesleep(&m.park)函數讓自己進入睡眠狀態

note是go runtime實現的一次性睡眠和喚醒機制,一個線程可以通過調用notesleep(*note)進入睡眠狀態,而另外一個線程則可以通過notewakeup(*note)把其喚醒。note的底層實現機制跟操作系統相關,不同系統使用不同的機制,比如linux下使用的futex系統調用,而mac下則是使用的pthread_cond_t條件變量,note對這些底層機制做了一個抽象和封裝,這種封裝給擴展性帶來了很大的好處,比如當睡眠和喚醒功能需要支持新平台時,只需要在note層增加對特定平台的支持即可,不需要修改上層的任何代碼。

回到stopm,當從notesleep函數返回后,需要再次綁定一個p,然后返回到findrunnable函數繼續重新尋找可運行的goroutine,一旦找到可運行的goroutine就會返回到schedule函數,並把找到的goroutine調度起來運行,如何把goroutine調度起來運行的代碼我們已經分析過了。現在繼續看notesleep函數。

runtime/lock_futex.go : 139

func notesleep(n *note) {
	gp := getg()
	if gp != gp.m.g0 {
		throw("notesleep not on g0")
	}
	ns := int64(-1)  //超時時間設置為-1,表示無限期等待
	if *cgo_yield != nil {
		// Sleep for an arbitrary-but-moderate interval to poll libc interceptors.
		ns = 10e6
	}
  
        //使用循環,保證不是意外被喚醒
	for atomic.Load(key32(&n.key)) == 0 {
		gp.m.blocked = true
		futexsleep(key32(&n.key), 0, ns)
		if *cgo_yield != nil {
			asmcgocall(*cgo_yield, nil)
		}
		gp.m.blocked = false
	}
}

notesleep函數調用futexsleep進入睡眠,這里之所以需要用一個循環,是因為futexsleep有可能意外從睡眠中返回,所以從futexsleep函數返回后還需要檢查note.key是否還是0,如果是0則表示並不是其它工作線程喚醒了我們,只是futexsleep意外返回了,需要再次調用futexsleep進入睡眠。

futexsleep調用futex函數進入睡眠。

runtime/os_linux.go : 32

// Atomically,
//	if(*addr == val) sleep
// Might be woken up spuriously; that's allowed.
// Don't sleep longer than ns; ns < 0 means forever.
//go:nosplit
func futexsleep(addr *uint32, val uint32, ns int64) {
	var ts timespec

	// Some Linux kernels have a bug where futex of
	// FUTEX_WAIT returns an internal error code
	// as an errno. Libpthread ignores the return value
	// here, and so can we: as it says a few lines up,
	// spurious wakeups are allowed.
	if ns < 0 {
         //調用futex進入睡眠
		futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, nil, nil, 0)
		return
	}

	// It's difficult to live within the no-split stack limits here.
	// On ARM and 386, a 64-bit divide invokes a general software routine
	// that needs more stack than we can afford. So we use timediv instead.
	// But on real 64-bit systems, where words are larger but the stack limit
	// is not, even timediv is too heavy, and we really need to use just an
	// ordinary machine instruction.
	if sys.PtrSize == 8 {
		ts.set_sec(ns / 1000000000)
		ts.set_nsec(int32(ns % 1000000000))
	} else {
		ts.tv_nsec = 0
		ts.set_sec(int64(timediv(ns, 1000000000, (*int32)(unsafe.Pointer(&ts.tv_nsec)))))
	}
	futex(unsafe.Pointer(addr), _FUTEX_WAIT_PRIVATE, val, unsafe.Pointer(&ts), nil, 0)
}

futex是go匯編實現的函數,主要功能就是執行futex系統調用進入操作系統內核進行睡眠。

runtime/sys_linux_amd64.s : 525

// int64 futex(int32 *uaddr, int32 op, int32 val,
//    struct timespec *timeout, int32 *uaddr2, int32 val2);
TEXT runtime·futex(SB),NOSPLIT,$0
    #下面的6條指令在為futex系統調用准備參數
    MOVQ    addr+0(FP), DI
    MOVL    op+8(FP), SI
    MOVL    val+12(FP), DX
    MOVQ    ts+16(FP), R10
    MOVQ    addr2+24(FP), R8
    MOVL    val3+32(FP), R9
    
    MOVL    $SYS_futex, AX   #系統調用編號放入AX寄存器
    SYSCALL  #執行futex系統調用進入睡眠,從睡眠中被喚醒后接着執行下一條MOVL指令
    MOVL    AX, ret+40(FP)    #保存系統調用的返回值
    RET

futex系統的參數比較多,其函數原型為

int64 futex(int32*uaddr, int32op, int32val, structtimespec*timeout, int32*uaddr2, int32val2);

這里,futex系統調用為我們提供的功能為如果 *uaddr == val 則進入睡眠,否則直接返回。順便說一下,為什么futex系統調用需要第三個參數val,需要在內核判斷*uaddr與val是否相等,而不能在用戶態先判斷它們是否相等,如果相等才進入內核睡眠豈不是更高效?原因在於判斷*uaddr與val是否相等和進入睡眠這兩個操作必須是一個原子操作,否則會存在一個競態條件:如果不是原子操作,則當前線程在第一步判斷完*uaddr與val相等之后進入睡眠之前的這一小段時間內,有另外一個線程通過喚醒操作把*uaddr的值修改了,這就會導致當前工作線程永遠處於睡眠狀態而無人喚醒它。而在用戶態無法實現判斷與進入睡眠這兩步為一個原子操作,所以需要內核來為其實現原子操作。

我們知道線程一旦進入睡眠狀態就停止了運行,那么如果后來又有可運行的goroutine需要工作線程去運行,正在睡眠的線程怎么知道有工作可做了呢?

從前面的代碼我們已經看到,stopm調用notesleep時給它傳遞的參數是m結構體的park成員,而m又早已通過mput放入了全局的milde空閑隊列,這樣其它運行着的線程一旦發現有更多的goroutine需要運行時就可以通過全局的m空閑隊列找到處於睡眠狀態的m,然后調用notewakeup(&m.park)將其喚醒,至於怎么喚醒,我們在其它章節繼續討論。

到此,我們已經完整分析了調度器的調度策略,從下一章起我們將開始討論有關調度的另外一個話題:調度時機,即什么時候會發生調度。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM