golang select多路復用


在golang中,select一般是和chan一起工作的,用於同時監聽多個chan的信息,其實用方法和switch差不多:

select {
case <-ch1:
// ...
case x := <-ch2:
// ...
case ch3 <- y:
// ...
default :
// ...
}

 

和switch不同的是,每個case語句都必須對應channel的讀寫操作,select語句會陷入阻塞,直到一個或者多個channel可以讀寫才能恢復

 

1. select的阻塞機制

1.1 select的隨機選擇

當多個channel都具備了讀寫能力的時候,也就是說多個case都可以執行的條件下,select會執行哪一個?答案是隨機執行一個

我們可以寫一個簡單的demo,來看一下select實際的執行情況

func main() {
	c1 := make(chan int, 10)
	c2 := make(chan int, 10)
	for i := 0; i < 10; i++ {
		c1 <- i
		c2 <- i
	}
	for {
		select {
		case <-c1:
			fmt.Println("random 1")
		case <-c2:
			fmt.Println("random 2")
		default:
			//fmt.Println("default")
		}
	}
}

 

當兩個channel c1和c2都可以讀取數據時,select的執行選擇是隨機的

 

1.2 select的阻塞和控制

如果select中沒有任何的channel准備好,那么當前的select所在的協程會陷入阻塞,直到有一個case滿足條件

通常在實踐中不想一直阻塞的話,為了避免這種情況可以加上default分支,或者加入一個超時定時器

c := make( chan int, 1)
select {
case <-c:
    fmt.Println( "got it" )
case <-time.After(10 * time.Second):
    fmt.Println( "timeout" )
}

 

加入定時器超時的方式在實際中很常用,可以與超時重試或者超時直接報錯等方式結合

 

1.3 select循環等待

通常我們對於select的需求,就是想讓它一直阻塞,比如我們想要監聽一個chan所下達的任務

for select結構就是為此而生的,通常的做法下,select分支需要配合定時器來使用,實現超時通知或者定時任務等功能

func main() {
    c := make( chan int, 1)
    tick := time.Tick(time.Second)

    for {
        select {
        case <-c:
            fmt.Println("got it")
        case <-tick:
            fmt.Println("crontab")
        case <-time.After(800 * time.Millisecond):
            fmt.Println("timeout")
        }
    }
}

 

注意這里的兩個定時器time.Tick和time.After

time.After在每次for中都會被重置,所以它在記錄進入一次for循環的800ms時間

time.Tick是在for循環外部初始化的,所以它會按照時間累計,只要時間滿1s就會執行一次定時任務

所以這兩個定時器一個是為了超時重試,一個是為了執行一個間隔為1s的定時任務

 

1.4 select和nil channel

一個為nil的channel,讀寫都處於阻塞狀態,如果它在case分支中,select將永遠不會執行

nil channel這種特性讓我們可以設計一些特殊的數據傳輸方法,比如現在的需求是輪流向兩個channel發送數據

那么我們可以在給一個channel發送完數據之后,將其置nil

func main() {
    c1 := make( chan int)
    c2 := make( chan int)
    go func () {
        for i := 0; i < 2; i++ {
            select {
            case c1 <- 1:
                c1 = nil
            case c2 <- 2:
                c2 = nil
            }
        }
    }()

    fmt.Println(<-c1)
    fmt.Println(<-c2)
}

  

2. select的底層原理

select在運行時會調用核心函數selectgo

func selectgo(cas0 *scase, order0 *uint16, ncases int) (int, bool) {
    pollorder := order1[:ncases:ncases]
    lockorder := order1[ncases:][:ncases:ncases]
    for i := 1; i < ncases; i++ {
        j := fastrandn(uint32(i + 1))
        pollorder[i] = pollorder[j]
        pollorder[j] = uint16(i)
    }
}

  

每一個case在運行時都是一個scase結構體,存放了chan和chan中的元素類型

type scase  struct {
    c  *hchan
    elem unsafe.Pointer
    kind uint16
    ...
}

  

其中的kind代表case的類型,主要有四種類型:

  • caseNil
  • caseRecv
  • caseSend
  • caseDefault

分別對應着四種case的操作,對於每一種分支,select會執行不同的函數

 

在selectgo中,有兩個重要的序列結構:pollorder和lockorder

pollorder是一個亂序的case序列,就是函數體中那一段for循環代碼,算法類似於洗牌算法,保證了select的隨機性

lockorder是按照大小對chan地址排序的算法,對所有的scase按照其chan在堆區的地址大小,使用了小頂堆算法來排序

selectgo會按照該次序對select中的case加鎖,按照地址排序的順序加鎖是為了防止多個協程並發產生死鎖

 

當所有scase中的chan加鎖完畢之后,就開始第一輪循環找出是否有准備好的分支:

  • 如果是caseNil,忽略
  • 如果是caseRecv,判斷是否有正在等待寫入的協程,如果有跳轉到recv分支;判斷緩沖區是否有數據,如果有則跳轉bufrecv分支
  • 如果是caseSend,判斷是否有正在等待讀取的協程,如果有跳轉到send分支;判斷緩沖區是否有空余,如果有跳轉bufsend分支
  • 如果是caseDefault,記錄下來,當循環結束發現沒有其他case准備好時,執行default

 

當select完成一輪循環不能直接退出時,意味着當前協程需要進入阻塞狀態等到至少一個case具備執行條件

不管是讀取還是寫入chan都需要創建一個新的sudog並將其放入指定通道的等待隊列,之后重新進入阻塞狀態

當select case中任意一個case不再阻塞時,當前協程將會被喚醒

要注意的是,最后需要將sudog結構體在其他通道的等待隊列中出棧,因為當前協程已經能夠正常運行,不需要再被其他通道喚醒

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM