go通道基於go的並發調度實現,本身並不復雜,go並發調度請看我的這篇文章:go並發調度原理學習
1.channel數據結構
type hchan struct { qcount uint // 緩沖區中已有元素個數 dataqsiz uint //循環隊列容量大小 buf unsafe.Pointer // 緩沖區指針 elemsize uint16 //元素大小 closed uint32 //關閉標記,0沒關閉,1關閉 elemtype *_type //數據項類型 sendx uint //發送索引 recvx uint //接收索引 recvq waitq //等待接收排隊鏈表 sendq waitq //等待發送排隊鏈表 lock mutex //鎖 } type waitq struct { first *sudog last *sudog }
2.創建channel實現
創建channel實例:
ch := make(chan int, 4)
實現函數:
func makechan(t *chantype, size int64) *hchan
大致實現:
執行上面這行代碼會new一個hchan結構,同時創建一個dataqsiz=4的int類型的循環隊列,其實就是一個容納4個元素的數組,就是按順序往里面寫數據,寫滿之后又從0開始寫,這個順序索引就是hchan.sendx
3.發送數據
發送數據實例:
ch <- 100
發送數據實現函數:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool
ep指向要發送數據的首地址
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool { lock(&c.lock) if c.closed != 0 { unlock(&c.lock) panic(plainError("send on closed channel")) } if sg := c.recvq.dequeue(); sg != nil { //緩沖區就是一個固定長度的循環列表 //發送隊列是一個雙向鏈表,接在緩沖區的后面,整體是一個隊列,保證先進先出 //有接收者,並不是將當前要發送的數據直接發出,而是將緩沖區的第一個元素發送給接收者,同時將發送隊列的第一個元素加入緩沖區剛出隊列的位置 send(c, sg, ep, func() { unlock(&c.lock) }, 3) return true } if c.qcount < c.dataqsiz { //緩沖區沒有滿,直接將要發送的數據復制到緩沖區,直接返回, qp := chanbuf(c, c.sendx) typedmemmove(c.elemtype, qp, ep) c.sendx++ if c.sendx == c.dataqsiz { c.sendx = 0 } c.qcount++ unlock(&c.lock) return true } if !block { unlock(&c.lock) return false } //以上都是同步非阻塞的,ch <- 100直接返回 //以下是同步阻塞 //緩沖區滿了,也沒有接收者,通道將被阻塞,其實就是不執行當前G了,將狀態改成等待狀態 gp := getg() mysg := acquireSudog() c.sendq.enqueue(mysg) goparkunlock(&c.lock, "chan send", traceEvGoBlockSend, 3) //當G被喚醒,狀態改成可執行狀態,從這里開始繼續執行 releaseSudog(mysg) return true }
大致實現:
1:接收隊列不為空,從接收隊列中取出第一個接收者*sudog,將數據復制到sudog.elem,復制函數為memmove用匯編實現,通知接收方數據給你了,將接收方協程由等待狀態改成可運行狀態,將當前協程加入協程隊列,等待被調度。
2:沒有接收者,有緩沖區且沒有滿,直接將數據復制到緩沖中,寫入緩沖區的位置為hchan.buf[sendx++],如果緩沖區已滿sendx=0,就是循環隊列的實現,往sendx指定的位置寫數據,hchan.qcount++
3:沒有接收者,沒有緩沖區或是滿了,則從當前協程對應的P的sudog隊列中取一個struct sudog,將數據復制到sudog.elem,將sudog加入sendq隊列中,通知接收方,當前流程阻塞,等待被喚醒,接收方收到通知后(被喚醒),繼續往下執行,接收數據完成后會通知發送方,即將發送方協程狀態由等待狀態改成可運行狀態,加入協程可運行隊列,等着被執行不會阻塞的情況:
1:通道緩沖區沒有滿之前,因為只是將要發送的數據復制到緩沖區就返回了
2:有接收者的情況,有數據復制到接收方的數據結構中(不是最終接收數據的變量,在執行接收函數的時候會拷貝到最終接收數據的變量),喚醒接收協程會阻塞的情況:自然就是緩沖區滿了,也沒有接收方,這個時候會將數據打包放到發送隊列,當前協程被設置成等待狀態,這個狀態不會被調度,當有接收方收到數據后,才會被喚醒
4.接收數據
接收數據實例:
val := <- ch
接收數據實現函數:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) { lock(&c.lock) if sg := c.sendq.dequeue(); sg != nil { // Found a waiting sender. If buffer is size 0, receive value // directly from sender. Otherwise, receive from head of queue // and add sender's value to the tail of the queue (both map to // the same buffer slot because the queue is full). recv(c, sg, ep, func() { unlock(&c.lock) }, 3) return true, true } if c.qcount > 0 { // Receive directly from queue qp := chanbuf(c, c.recvx) if ep != nil { typedmemmove(c.elemtype, ep, qp) } typedmemclr(c.elemtype, qp) c.recvx++ if c.recvx == c.dataqsiz { c.recvx = 0 } c.qcount-- unlock(&c.lock) return true, true } if !block { unlock(&c.lock) return false, false } //以上同步非阻塞 //以下同步阻塞 gp := getg() mysg := acquireSudog() c.recvq.enqueue(mysg) //將當前G狀態改成等待狀態,停止調度 goparkunlock(&c.lock, "chan receive", traceEvGoBlockRecv, 3) //當前G被喚醒從這里繼續執行 mysg.c = nil releaseSudog(mysg) return true, !closed }
大致實現:
1.發送隊列不為空(說明緩沖區已滿),從發送隊列中取出第一個發送者*sudog
1.1.沒有緩沖區,直接將發送隊列中的數據sudog.elem復制出來,存到接收數據的變量val中,通知發送方我處理完了,你可以繼續執行
1.2.有緩沖區,復制出緩沖區hchan.buf[recvx]對應的元素到val,在將發送方sudog.elem復制到hchan.buf[recvx],發送方按順序寫,接收方按順序讀,典型的FIFO,為了保證是先進先出,所以先復制出,再將隊列首元素復制到對應的緩沖區中,其實就是發送隊列連接在緩沖區后面,緩沖區滿了,就寫隊列,接收的時候先從緩沖區中拿數據,拿掉之后空出來的位置從發送隊列中取第一個填滿,並喚醒對應的G,只要發送隊列不為空,緩沖區肯定會被填滿
2.發送隊列為空,緩沖區不為空,復制出緩沖區hchan.buf[recvx]對應的元素到val,hchan.qcount--
3.發送隊列為空,緩沖區也為空,那就是沒有任何待接收的數據,接收流程就只能等了,將接收信息打包成sudog,加入接收隊列recvq,當前執行流程阻塞,等有發送數據后會被喚醒繼續
5.channel FIFO在解釋一次
5.1:緩沖區沒滿,發送數據就是進緩沖隊列,接收數據就是出緩沖隊列,比較好理解
5.2:緩沖區已滿,發送數據就是進等待隊列,接收數據先出緩沖隊列,即為要接收的數據,等待隊列出列,將數據存在緩沖隊列剛出列的位置,剛出列的位置相當於緩沖隊列的末尾,也就是說等待隊列的列頭連在緩沖隊列的末尾,將等待隊列的列頭加入緩存隊列的列尾,保證了緩沖隊列是滿的,減少的是緩沖隊列中的數據,保證先進先出
5.3:接收數據,緩沖隊列或等待隊列有數據,拿走第一個,保證等待隊列是接在緩沖區末尾,即緩沖區末尾有空缺,就讓等待隊列出列,並填充至緩沖區末尾,否則將自己打包加入接收隊列,當前G進入等待狀態,有數據發送自然會通知你
總結:Go channel基於go的並發調度實現阻塞和非阻塞兩種通訊方式