第三章 Goroutine調度策略(16)


本文是《Go語言調度器源代碼情景分析》系列的第16篇,也是第三章《Goroutine調度策略》的第1小節。


 

在調度器概述一節我們提到過,所謂的goroutine調度,是指程序代碼按照一定的算法在適當的時候挑選出合適的goroutine並放到CPU上去運行的過程。這句話揭示了調度系統需要解決的三大核心問題:

  1. 調度時機:什么時候會發生調度?

  2. 調度策略:使用什么策略來挑選下一個進入運行的goroutine?

  3. 切換機制:如何把挑選出來的goroutine放到CPU上運行?

對這三大問題的解決構成了調度器的所有工作,因而我們對調度器的分析也必將圍繞着它們所展開。

第二章我們已經詳細的分析了調度器的初始化以及goroutine的切換機制,本章將重點討論調度器如何挑選下一個goroutine出來運行的策略問題,而剩下的與調度時機相關的內容我們將在第4~6章進行全面的分析。

再探schedule函數

在討論main goroutine的調度時我們已經見過schedule函數,因為當時我們的主要關注點在於main goroutine是如何被調度到CPU上運行的,所以並未對schedule函數如何挑選下一個goroutine出來運行做深入的分析,現在是重新回到schedule函數詳細分析其調度策略的時候了。

runtime/proc.go : 2467

// One round of scheduler: find a runnable goroutine and execute it.
// Never returns.
func schedule() {
    _g_ := getg()   //_g_ = m.g0

    ......

    var gp *g

    ......
   
    if gp == nil {
    // Check the global runnable queue once in a while to ensure fairness.
    // Otherwise two goroutines can completely occupy the local runqueue
    // by constantly respawning each other.
       //為了保證調度的公平性,每個工作線程每進行61次調度就需要優先從全局運行隊列中獲取goroutine出來運行,
       //因為如果只調度本地運行隊列中的goroutine,則全局運行隊列中的goroutine有可能得不到運行
        if _g_.m.p.ptr().schedtick%61 == 0 && sched.runqsize > 0 {
            lock(&sched.lock) //所有工作線程都能訪問全局運行隊列,所以需要加鎖
            gp = globrunqget(_g_.m.p.ptr(), 1) //從全局運行隊列中獲取1個goroutine
            unlock(&sched.lock)
        }
    }
    if gp == nil {
        //從與m關聯的p的本地運行隊列中獲取goroutine
        gp, inheritTime = runqget(_g_.m.p.ptr())
        if gp != nil && _g_.m.spinning {
            throw("schedule: spinning with local work")
        }
    }
    if gp == nil {
        //如果從本地運行隊列和全局運行隊列都沒有找到需要運行的goroutine,
        //則調用findrunnable函數從其它工作線程的運行隊列中偷取,如果偷取不到,則當前工作線程進入睡眠,
        //直到獲取到需要運行的goroutine之后findrunnable函數才會返回。
        gp, inheritTime = findrunnable() // blocks until work is available
    }

    ......

    //當前運行的是runtime的代碼,函數調用棧使用的是g0的棧空間
    //調用execte切換到gp的代碼和棧空間去運行
    execute(gp, inheritTime)  
}

schedule函數分三步分別從各運行隊列中尋找可運行的goroutine:

第一步,從全局運行隊列中尋找goroutine。為了保證調度的公平性,每個工作線程每經過61次調度就需要優先嘗試從全局運行隊列中找出一個goroutine來運行,這樣才能保證位於全局運行隊列中的goroutine得到調度的機會。全局運行隊列是所有工作線程都可以訪問的,所以在訪問它之前需要加鎖。

第二步,從工作線程本地運行隊列中尋找goroutine。如果不需要或不能從全局運行隊列中獲取到goroutine則從本地運行隊列中獲取。

第三步,從其它工作線程的運行隊列中偷取goroutine。如果上一步也沒有找到需要運行的goroutine,則調用findrunnable從其他工作線程的運行隊列中偷取goroutine,findrunnable函數在偷取之前會再次嘗試從全局運行隊列和當前線程的本地運行隊列中查找需要運行的goroutine。

下面我們先來看如何從全局運行隊列中獲取goroutine。

從全局運行隊列中獲取goroutine

從全局運行隊列中獲取可運行的goroutine是通過globrunqget函數來完成的,該函數的第一個參數是與當前工作線程綁定的p,第二個參數max表示最多可以從全局隊列中拿多少個g到當前工作線程的本地運行隊列中來。

runtime/proc.go : 4663

// Try get a batch of G's from the global runnable queue.
// Sched must be locked.
func globrunqget(_p_ *p, max int32) *g {
    if sched.runqsize == 0 {  //全局運行隊列為空
        return nil
    }

    //根據p的數量平分全局運行隊列中的goroutines
    n := sched.runqsize / gomaxprocs + 1
    if n > sched.runqsize { //上面計算n的方法可能導致n大於全局運行隊列中的goroutine數量
        n = sched.runqsize
    }
    if max > 0 && n > max {
        n = max   //最多取max個goroutine
    }
    if n > int32(len(_p_.runq)) / 2 {
        n = int32(len(_p_.runq)) / 2  //最多只能取本地隊列容量的一半
    }

    sched.runqsize -= n

    //直接通過函數返回gp,其它的goroutines通過runqput放入本地運行隊列
    gp := sched.runq.pop()  //pop從全局運行隊列的隊列頭取
    n--
    for ; n > 0; n-- {
        gp1 := sched.runq.pop()  //從全局運行隊列中取出一個goroutine
        runqput(_p_, gp1, false)  //放入本地運行隊列
    }
    return gp
}

globrunqget函數首先會根據全局運行隊列中goroutine的數量,函數參數max以及_p_的本地隊列的容量計算出到底應該拿多少個goroutine,然后把第一個g結構體對象通過返回值的方式返回給調用函數,其它的則通過runqput函數放入當前工作線程的本地運行隊列。這段代碼值得一提的是,計算應該從全局運行隊列中拿走多少個goroutine時根據p的數量(gomaxprocs)做了負載均衡。

如果沒有從全局運行隊列中獲取到goroutine,那么接下來就在工作線程的本地運行隊列中尋找需要運行的goroutine。

從工作線程本地運行隊列中獲取goroutine

從代碼上來看,工作線程的本地運行隊列其實分為兩個部分,一部分是由p的runq、runqhead和runqtail這三個成員組成的一個無鎖循環隊列,該隊列最多可包含256個goroutine;另一部分是p的runnext成員,它是一個指向g結構體對象的指針,它最多只包含一個goroutine。

從本地運行隊列中尋找goroutine是通過runqget函數完成的,尋找時,代碼首先查看runnext成員是否為空,如果不為空則返回runnext所指的goroutine,並把runnext成員清零,如果runnext為空,則繼續從循環隊列中查找goroutine。

runtime/proc.go : 4825

// Get g from local runnable queue.
// If inheritTime is true, gp should inherit the remaining time in the
// current time slice. Otherwise, it should start a new time slice.
// Executed only by the owner P.
func runqget(_p_ *p) (gp *g, inheritTime bool) {
    // If there's a runnext, it's the next G to run.
    //從runnext成員中獲取goroutine
    for {
        //查看runnext成員是否為空,不為空則返回該goroutine
        next := _p_.runnext  
        if next == 0 {
            break
        }
        if _p_.runnext.cas(next, 0) {
            return next.ptr(), true
        }
    }

    //從循環隊列中獲取goroutine
    for {
        h := atomic.LoadAcq(&_p_.runqhead) // load-acquire, synchronize with other consumers
        t := _p_.runqtail
        if t == h {
            return nil, false
        }
        gp := _p_.runq[h%uint32(len(_p_.runq))].ptr()
        if atomic.CasRel(&_p_.runqhead, h, h+1) { // cas-release, commits consume
            return gp, false
        }
    }
}

這里首先需要注意的是不管是從runnext還是從循環隊列中拿取goroutine都使用了cas操作,這里的cas操作是必需的,因為可能有其他工作線程此時此刻也正在訪問這兩個成員,從這里偷取可運行的goroutine。

其次,代碼中對runqhead的操作使用了atomic.LoadAcq和atomic.CasRel,它們分別提供了load-acquire和cas-release語義。

對於atomic.LoadAcq來說,其語義主要包含如下幾條

  1. 原子讀取,也就是說不管代碼運行在哪種平台,保證在讀取過程中不會有其它線程對該變量進行寫入;

  2. 位於atomic.LoadAcq之后的代碼,對內存的讀取和寫入必須在atomic.LoadAcq讀取完成后才能執行,編譯器和CPU都不能打亂這個順序;

  3. 當前線程執行atomic.LoadAcq時可以讀取到其它線程最近一次通過atomic.CasRel對同一個變量寫入的值,與此同時,位於atomic.LoadAcq之后的代碼,不管讀取哪個內存地址中的值,都可以讀取到其它線程中位於atomic.CasRel(對同一個變量操作)之前的代碼最近一次對內存的寫入。

對於atomic.CasRel來說,其語義主要包含如下幾條

  1. 原子的執行比較並交換的操作;

  2. 位於atomic.CasRel之前的代碼,對內存的讀取和寫入必須在atomic.CasRel對內存的寫入之前完成,編譯器和CPU都不能打亂這個順序;

  3. 線程執行atomic.CasRel完成后其它線程通過atomic.LoadAcq讀取同一個變量可以讀到最新的值,與此同時,位於atomic.CasRel之前的代碼對內存寫入的值,可以被其它線程中位於atomic.LoadAcq(對同一個變量操作)之后的代碼讀取到。

因為可能有多個線程會並發的修改和讀取runqhead,以及需要依靠runqhead的值來讀取runq數組的元素,所以需要使用atomic.LoadAcq和atomic.CasRel來保證上述語義。

我們可能會問,為什么讀取p的runqtail成員不需要使用atomic.LoadAcq或atomic.load?因為runqtail不會被其它線程修改,只會被當前工作線程修改,此時沒有人修改它,所以也就不需要使用原子相關的操作。

最后,由p的runq、runqhead和runqtail這三個成員組成的這個無鎖循環隊列非常精妙,我們會在后面的章節對這個循環隊列進行分析。

CAS操作與ABA問題

我們知道使用cas操作需要特別注意ABA的問題,那么runqget函數這兩個使用cas的地方會不會有問題呢?答案是這兩個地方都不會有ABA的問題。原因分析如下:

首先來看對runnext的cas操作。只有跟_p_綁定的當前工作線程才會去修改runnext為一個非0值,其它線程只會把runnext的值從一個非0值修改為0值,然而跟_p_綁定的當前工作線程正在此處執行代碼,所以在當前工作線程讀取到值A之后,不可能有線程修改其值為B(0)之后再修改回A。

再來看對runq的cas操作。當前工作線程操作的是_p_的本地隊列,只有跟_p_綁定在一起的當前工作線程才會因為往該隊列里面添加goroutine而去修改runqtail,而其它工作線程不會往該隊列里面添加goroutine,也就不會去修改runqtail,它們只會修改runqhead,所以,當我們這個工作線程從runqhead讀取到值A之后,其它工作線程也就不可能修改runqhead的值為B之后再第二次把它修改為值A(因為runqtail在這段時間之內不可能被修改,runqhead的值也就無法越過runqtail再回繞到A值),也就是說,代碼從邏輯上已經杜絕了引發ABA的條件。

到此,我們已經分析完工作線程從全局運行隊列和本地運行隊列獲取goroutine的代碼,由於篇幅的限制,我們下一節再來分析從其它工作線程的運行隊列偷取goroutine的流程。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM