轉載請聲明出處哦~,本篇文章發布於luozhiyun的博客:https://www.luozhiyun.com
本文使用的go的源碼時14.4
chan介紹
package main
import "fmt"
func main() {
c := make(chan int)
go func() {
c <- 1 // send to channel
}()
x := <-c // recv from channel
fmt.Println(x)
}
我們可以這樣查看匯編結果:
go tool compile -N -l -S hello.go
-N表示禁用優化
-l禁用內聯
-S打印結果
通過上面這樣的方式,我們可以直到chan是調用的哪些函數:
源碼分析
結構體與創建
type hchan struct {
qcount uint // 循環列表元素個數
dataqsiz uint // 循環隊列的大小
buf unsafe.Pointer // 循環隊列的指針
elemsize uint16 // chan中元素的大小
closed uint32 // 是否已close
elemtype *_type // chan中元素類型
sendx uint // send在buffer中的索引
recvx uint // recv在buffer中的索引
recvq waitq // receiver的等待隊列
sendq waitq // sender的等待隊列
// 互拆鎖
lock mutex
}
qcount代表chan 中已經接收但還沒被取走的元素的個數,函數 len 可以返回這個字段的值;
dataqsiz和buf分別代表隊列buffer的大小,cap函數可以返回這個字段的值以及隊列buffer的指針,是一個定長的環形數組;
elemtype 和 elemsiz表示chan 中元素的類型和 元素的大小;
sendx:發送數據的指針在 buffer中的位置;
recvx:接收請求時的指針在 buffer 中的位置;
recvq和sendq分別表示等待接收數據的 goroutine 與等待發送數據的 goroutine;
sendq和recvq的類型是waitq的結構體:
type waitq struct {
first *sudog
last *sudog
}
waitq里面連接的是一個sudog雙向鏈表,保存的是等待的goroutine 。整個chan的圖例大概是這樣:
下面看一下創建chan,我們通過匯編結果也可以查看到make(chan int)
這句代碼會調用到runtime的makechan函數中:
const (
maxAlign = 8
hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
)
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// 略去檢查代碼
...
//計算需要分配的buf空間
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch {
case mem == 0:
// chan的size或者元素的size是0,不必創建buf
c = (*hchan)(mallocgc(hchanSize, nil, true))
// Race detector
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// 元素不是指針,分配一塊連續的內存給hchan數據結構和buf
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
// 表示hchan后面在內存里緊跟着就是buf
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 元素包含指針,那么單獨分配buf
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
return c
}
首先我們可以看到計算hchanSize:
maxAlign = 8
hchanSize = unsafe.Sizeof(hchan{}) + uintptr(-int(unsafe.Sizeof(hchan{}))&(maxAlign-1))
maxAlign是8,那么maxAlign-1的二進制就是111,然后和int(unsafe.Sizeof(hchan{}))取與就是取它的低三位,hchanSize就得到的是8的整數倍,做對齊使用。
這里switch有三種情況,第一種情況是緩沖區所需大小為 0,那么在為 hchan 分配內存時,只需要分配 sizeof(hchan) 大小的內存;
第二種情況是緩沖區所需大小不為 0,而且數據類型不包含指針,那么就分配連續的內存。注意的是,我們在創建channel的時候可以指定類型為指針類型:
//chan里存入的是int的指針
c := make(chan *int)
//chan里存入的是int的值
c := make(chan int)
第三種情況是緩沖區所需大小不為 0,而且數據類型包含指針,那么就不使用add的方式讓hchan和buf放在一起了,而是單獨的為buf申請一塊內存。
發送數據
channel的阻塞非阻塞
在看發送數據的代碼之前,我們先看一下什么是channel的阻塞和非阻塞。
一般情況下,傳入的參數都是 block=true
,即阻塞調用,一個往 channel 中插入數據的 goroutine 會阻塞到插入成功為止。
非阻塞是只這種情況:
select {
case c <- v:
... foo
default:
... bar
}
編譯器會將其改為:
if selectnbsend(c, v) {
... foo
} else {
... bar
}
selectnbsend方法傳入的block就是false:
func selectnbsend(c *hchan, elem unsafe.Pointer) (selected bool) {
return chansend(c, elem, false, getcallerpc())
}
chansend方法
向通道發送數據我們通過匯編結果可以發現是在runtime 中通過 chansend 實現的,方法比較長下面我們分段來進行理解:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
if c == nil {
// 對於非阻塞的發送,直接返回
if !block {
return false
}
// 對於阻塞的通道,將 goroutine 掛起
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
...
}
這里會對chan做一個判斷,如果它是空的,那么對於非阻塞的發送,直接返回 false;對於阻塞的通道,將 goroutine 掛起,並且永遠不會返回。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
// 非阻塞的情況下,如果通道沒有關閉,滿足以下一條:
// 1.沒有緩沖區並且當前沒有接收者
// 2.緩沖區不為0,並且已滿
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
...
}
需要注意的是這里是沒有加鎖的,go雖然在使用指針讀取單個值的時候原子性的,但是讀取多個值並不能保證,所以在判斷完closed雖然是沒有關閉的,那么在讀取完之后依然可能在這一瞬間從未關閉狀態轉變成關閉狀態。那么就有兩種可能:
- 通道沒有關閉,而且已經滿了,那么需要返回false,沒有問題;
- 通道關閉,而且已經滿了,但是在非阻塞的發送中返回false,也沒有問題;
有關go的一致性原語,可以看這篇:The Go Memory Model。
上面的這些判斷被稱為 fast path,因為加鎖的操作是一個很重的操作,所以能夠在加鎖之前返回的判斷就在加鎖之前做好是最好的。
下面接着看看加鎖部分的代碼:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
//加鎖
lock(&c.lock)
// 是否關閉的判斷
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 從 recvq 中取出一個接收者
if sg := c.recvq.dequeue(); sg != nil {
// 如果接收者存在,直接向該接收者發送數據,繞過buffer
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
...
}
進入了lock區域之后還需要再判斷以下close的狀態,然后從recvq 中取出一個接收者,如果已經有接收者,那么就向第一個接收者發送當前enqueue的消息。這里需要注意的是如果有接收者在隊列中等待,則說明此時的緩沖區是空的。
既然是一行行分析代碼,那么我們再進入到send看一下實現:
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
...
if sg.elem != nil {
// 直接把要發送的數據copy到reciever的棧空間
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 喚醒對應的 goroutine
goready(gp, skip+1)
}
在send方法里,sg就是goroutine打包好的對象,ep是對應要發送數據的指針,sendDirect方法會調用memmove進行數據的內存拷貝。然后goready函數會喚醒對應的 goroutine進行調度。
回到chansend方法,繼續往下看:
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
// 如果緩沖區沒有滿,直接將要發送的數據復制到緩沖區
if c.qcount < c.dataqsiz {
// 找到buf要填充數據的索引位置
qp := chanbuf(c, c.sendx)
...
// 將數據拷貝到 buffer 中
typedmemmove(c.elemtype, qp, ep)
// 數據索引前移,如果到了末尾,又從0開始
c.sendx++
if c.sendx == c.dataqsiz {
c.sendx = 0
}
// 元素個數加1,釋放鎖並返回
c.qcount++
unlock(&c.lock)
return true
}
...
}
這里會判斷buf緩沖區有沒有滿,如果沒有滿,那么就找到buf要填充數據的索引位置,調用typedmemmove方法將數據拷貝到buf中,然后重新設值sendx偏移量。
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
...
// 緩沖區沒有空間了,所以對於非阻塞調用直接返回
if !block {
unlock(&c.lock)
return false
}
// 創建 sudog 對象
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 將sudog 對象入隊
c.sendq.enqueue(mysg)
// 進入等待狀態
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanSend, traceEvGoBlockSend, 2)
...
}
這里會做兩部分的操作,對於非阻塞的調用會直接返回;對於阻塞的調用會創建sudog 對象,然后將sudog對象入隊之后gopark將 goroutine 轉入 waiting 狀態,並解鎖。調用gopark之后,在使用者看來該向 channel 發送數據的代碼語句會進行阻塞。
這里也需要注意一下,如果緩沖區為0,那么也會進入到這里,會調用到gopark立馬阻塞,所以在使用的時候需要記得接收數據,防止向chan發送數據的那一端永遠阻塞,如:
func process(timeout time.Duration) bool {
ch := make(chan bool)
go func() {
// 模擬處理耗時的業務
time.Sleep((timeout + time.Second))
ch <- true // block
fmt.Println("exit goroutine")
}()
select {
case result := <-ch:
return result
case <-time.After(timeout):
return false
}
}
如果這里在select的時候直接timeout返回了,而沒有調用 result := <-ch
,那么goroutine 就會永遠阻塞。
到這里發送的代碼就講解完了,整個流程大致如下:
比如我要執行:ch<-10

- 檢查 recvq 是否為空,如果不為空,則從 recvq 頭部取一個 goroutine,將數據發送過去;
- 如果 recvq 為空,,並且buf沒有滿,則將數據放入到 buf中;
- 如果 buf已滿,則將要發送的數據和當前 goroutine 打包成sudog,然后入隊到sendq隊列中,並將當前 goroutine 置為 waiting 狀態進行阻塞。
接收數據
從chan獲取數據實現函數為 chanrecv。下面我們看一下代碼實現:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
if c == nil {
// 如果 c 為空且是非阻塞調用,那么直接返回 (false,false)
if !block {
return
}
// 阻塞調用直接等待
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// 對於非阻塞的情況,並且沒有關閉的情況
// 如果是無緩沖chan或者是chan中沒有數據,那么直接返回 (false,false)
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
// 上鎖
lock(&c.lock)
// 如果已經關閉,並且chan中沒有數據,返回 (true,false)
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
...
}
chanrecv方法和chansend方法是一樣的,首先也是做非空判斷,如果chan沒有初始化,那么如果是非阻塞調用,那么直接返回 (false,false),阻塞調用會直接等待;
下面的兩個if判斷我放在一起來進行講解,因為這里和chansend是不一樣的,chanrecv要根據不同條件需要返回不同的結果。
在上鎖之前的判斷是邊界條件的判斷:如果是非阻塞調用會判斷chan沒有發送方(dataqsiz為空且發送隊列為空),或chan的緩沖為空(dataqsiz>0 並且qcount==0)並且chan是沒有close,那么需要返回 (false,false);而chan已經關閉了,並且buf中沒有數據,需要返回 (true,false);
為了實現這個需求,所以在chanrecv方法里面邊界條件的判斷都使用atomic方法進行了獲取。
因為需要正確的得到chan已關閉,並且 buf 空會返回 (true, false),而不是 (false,false),所以在lock上鎖之前需要使用atomic來獲取參數防止重排序(Happens Before),因此必須使此處的 qcount 和 closed 的讀取操作的順序通過原子操作得到順序保障。
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
// 從發送者隊列獲取數據
if sg := c.sendq.dequeue(); sg != nil {
// 發送者隊列不為空,直接從發送者那里提取數據
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
...
}
func recv(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
// 如果是無緩沖區chan
if c.dataqsiz == 0 {
...
if ep != nil {
// 直接從發送者拷貝數據
recvDirect(c.elemtype, sg, ep)
}
// 有緩沖區chan
} else {
// 獲取buf的存放數據指針
qp := chanbuf(c, c.recvx)
...
// 直接從緩沖區拷貝數據給接收者
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
// 從發送者拷貝數據到緩沖區
typedmemmove(c.elemtype, qp, sg.elem)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
sg.elem = nil
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
// 將發送者喚醒
goready(gp, skip+1)
}
在這里如果有發送者在隊列等待,那么直接從發送者那里提取數據,並且喚醒這個發送者。需要注意的是由於有發送者在等待,所以如果有緩沖區,那么緩沖區一定是滿的。
在喚醒發送者之前需要對緩沖區做判斷,如果是無緩沖區,那么直接從發送者那里提取數據;如果有緩沖區首先會獲取recvx的指針,然后將從緩沖區拷貝數據給接收者,再將發送者數據拷貝到緩沖區。
然后將recvx加1,相當於將新的數據移到了隊尾,再將recvx的值賦值給sendx,最后調用goready將發送者喚醒,這里有些繞,我們通過圖片來展示:

這里展示的是在chansend中將數據拷貝到緩沖區中,當數據滿的時候會將sendx的指針置為0,所以當buf環形隊列是滿的時候sendx等於recvx。
然后再來看看chanrecv中發送者隊列有數據的時候移交緩沖區的數據是怎么做的:

這里會將recvx為0處的數據直接從緩存區拷貝數據給接收者,然后將發送者拷貝數據到緩沖區recvx指針處,然后將recvx指針加1並將recvx賦值給sendx,由於是滿的所以用recvx加1的效果實現了將新加入的數據入庫到隊尾的操作。
接着往下看:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
// 如果緩沖區中有數據
if c.qcount > 0 {
qp := chanbuf(c, c.recvx)
...
// 從緩沖區復制數據到 ep
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
// 接收數據的指針前移
c.recvx++
// 環形隊列,如果到了末尾,再從0開始
if c.recvx == c.dataqsiz {
c.recvx = 0
}
// 緩沖區中現存數據減一
c.qcount--
unlock(&c.lock)
return true, true
}
...
}
到了這里,說明緩沖區中有數據,但是發送者隊列沒有數據,那么將數據拷貝到接收數據的協程,然后將接收數據的指針前移,如果已經到了隊尾,那么就從0開始,最后將緩沖區中現存數據減一並解鎖。
下面就是緩沖區中沒有數據的情況:
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
...
// 非阻塞,直接返回
if !block {
unlock(&c.lock)
return false, false
}
// 創建sudog
gp := getg()
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
// 將sudog添加到接收隊列中
c.recvq.enqueue(mysg)
// 阻塞住goroutine,等待被喚醒
gopark(chanparkcommit, unsafe.Pointer(&c.lock), waitReasonChanReceive, traceEvGoBlockRecv, 2)
...
}
如果是非阻塞調用,直接返回;阻塞調用會將當前goroutine 封裝成sudog,然后將sudog添加到接收隊列中,調用gopark阻塞住goroutine,等待被喚醒。
關閉通道
關閉通道會調用到closechan方法:
func closechan(c *hchan) {
// 1. 校驗chan是否已初始化
if c == nil {
panic(plainError("close of nil channel"))
}
// 加鎖
lock(&c.lock)
// 如果已關閉了,那么不能被再次關閉
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
...
// 設置chan已關閉
c.closed = 1
// 申明一個存放g的list,用於存放在等待隊列中的groutine
var glist gList
// 2. 獲取所有接收者
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
// 加入隊列中
glist.push(gp)
}
// 獲取所有發送者
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
// 加入隊列中
glist.push(gp)
}
unlock(&c.lock)
// 3.喚醒所有的glist中的goroutine
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
- 這個方法首先會校驗chan是否已被初始化,然后加鎖之后再校驗是否已被關閉過,如果校驗都通過了,那么將closed字段設值為1;
- 遍歷所有的接收者和發送者,並將其goroutine 加入到glist中;
- 將所有glist中的goroutine加入調度隊列,等待被喚醒,這里需要注意的是發送者在被喚醒之后會panic;
總結
chan在go中是一個非常強大的工具,使用它可以實現很多功能,但是為了能夠高效的使用它我們也應該去了解里面是如何實現的。這篇文章通過一步步分析從零開始了解go的chan是如何實現的,以及在使用過程中有什么需要注意的事項,chan的buf環形隊列是怎樣維護的,希望能對你有所幫助~
Reference
https://speakerdeck.com/kavya719/understanding-channels
https://github.com/talkgo/night/issues/450
https://codeburst.io/diving-deep-into-the-golang-channels-549fd4ed21a8