(年初的時候go語言的學習提上了日程,前一篇sync.pool閱讀之后,閱讀代碼進度本該更快些,奈何身體被掏空,所以這篇文章斷斷續續一個月終於攢起來了。)
簡介
channel是golang中用於goroutine之間通訊的數據結構,有以下特點:
- 線程安全
- 創建channel時返回的是指針,不需要考慮拷貝的問題
- 順序通訊,寫入和讀出的順序一致
數據部分
源碼位置go/src/runtime/chan.go
hchan
channel對應的數據結構
type hchan struct {
qcount uint
dataqsiz uint
buf unsafe.Pointer
elemsize uint16
closed uint32
elemtype *_type
sendx uint
recvx uint
recvq waitq
sendq waitq
// lock protects all fields in hchan, as well as several
// fields in sudogs blocked on this channel.
//
// Do not change another G's status while holding this lock
// (in particular, do not ready a G), as this can deadlock
// with stack shrinking.
lock mutex
}
參數意義
qcount uint // 表示channel中元素的個數
dataqsiz uint // 表示channel的大小長度
buf unsafe.Pointer // 存儲元素的環形隊列頭指針
elemsize uint16 // 表示此channel能存儲元素的大小
closed uint32 // channel是否關閉了
elemtype *_type // 表示此channel能存儲元素的類型
sendx uint // 表示發送操作對應buf的下標,超過dataqsiz之后清0(因為是循環隊列嘛)
recvx uint // 表示接收操作對應buf的下標
recvq waitq // 等待接收操作的goroutine隊列
sendq waitq // 等待發送操作的goroutine隊列
lock mutex // channel的鎖
waitq
用來表示等待發送或者接受的goroutine隊列(用sudog表示隊列一個節點)
type waitq struct {
first *sudog
last *sudog
}
參數意義
first goroutine指針,隊首指針
last goroutine指針,隊尾指針
函數
enqueue
兩種情況:
- 隊列為空,將元素放入隊尾將first指針和last指針賦好值
- 隊列不為空,直接將元素放入隊尾
func (q *waitq) enqueue(sgp *sudog) {
// 將goroutine的next置為空
sgp.next = nil
x := q.last
if x == nil { // 如果尾指針為空,說明隊列為空,就把這個goroutine放進去
sgp.prev = nil
q.first = sgp
q.last = sgp
return
}
// 直接入隊列
sgp.prev = x
x.next = sgp
q.last = sgp
}
dequeue
從隊列頭開始遍歷
- first指針為空,說明隊列為空,則直接返回空
- 如果隊列只有一個元素了,將元素取出,並且清空first指針和last指針
- 隊列還有很多元素,直接將first指針對應的元素去除
- 最后判斷如果這個元素(sudog——在channel中用來表示等待接收或者發送的goroutine的)在select結構中並且select結構有其他接口,就跳過,繼續遍歷下一個節點。
func (q *waitq) dequeue() *sudog {
for {
sgp := q.first
if sgp == nil { // 頭指針為空,說明隊列為空,直接返回
return nil
}
y := sgp.next
if y == nil { // 如果next指針為空,說明隊列就一個元素了,取出這個就空了,就將隊列置空
q.first = nil
q.last = nil
} else { // next不為空,就將next作為隊首,將原來的隊首返回
y.prev = nil
q.first = y
sgp.next = nil // mark as removed (see dequeueSudog)
}
// if a gogoroutine was put on this queue because of a
// select, there is a small window between the gogoroutine
// being woken up by a different case and it grabbing the
// channel locks. Once it has the lock
// it removes itself from the queue, so we won't see it after that.
// We use a flag in the G struct to tell us when someone
// else has won the race to signal this gogoroutine but the gogoroutine
// hasn't removed itself from the queue yet.
// 如果goroutine處於select結構中並且select有其他出口就跳過這個
if sgp.isSelect && !atomic.Cas(&sgp.g.selectDone, 0, 1) {
continue
}
return sgp
}
}
sudog
sudog是在等待對channel發送或者接受的goroutine
為什么有了goroutine還要有一個sudog?
- 因為goroutine和等待的channel是多對多的關系,一個goroutine可能在等待多個channel,一個channel也可能有很多goroutine在等待,所以用sudog表示這個等待中的goroutine
- sudog是channel等待或者接發送鏈表的一個node
sudog通過acquireSudog創建,releaseSudog銷毀
- 在go/src/runtime/proc.go中
- go會維護一個全局的緩存(有鎖),然后每個調度器(P)有自己的緩存
- 創建sudog時會先從P的緩存中找,沒有就到全局緩存中找,在沒有才new一個
- 銷毀sudog的時候先判斷P是不是滿了,如果滿了就將一半緩存放到全局緩存然后再把sudog放到自己緩存
- 全局緩存的生存周期時兩次GC的間隔,go/src/runtime/mgc.go 中clearpools()函數中可以看到,每次GC都會清理全局緩存
type sudog struct {
// sudog替哪個goroutine在等待
g *g
isSelect bool // 是否在select結構中(select可能取消阻塞發送或接收)
next *sudog // 下一個節點
prev *sudog // 上一個節點
elem unsafe.Pointer // data element (may point to stack)
acquiretime int64 // 創建時間
releasetime int64 // 釋放時間
ticket uint32
parent *sudog // semaRoot binary tree
waitlink *sudog // g.waiting list or semaRoot
waittail *sudog // semaRoot
c *hchan // channel 在等待哪個channel
}
創建sudog——acquireSudog
大概邏輯就是現在當前goroutine所在調度器(P)的緩存中找,如果沒有就從全局緩存中找,如果還沒有就new一個
func acquireSudog() *sudog {
// 獲得當前goroutine所在的線程(M)
mp := acquirem()
// 獲得當前goroutine所在調度器(P)
pp := mp.p.ptr()
if len(pp.sudogcache) == 0 { // 如果調度器的sudog緩存為空,就從中央緩存找,如果再為空就new一個
lock(&sched.sudoglock)
// First, try to grab a batch from central cache.
for len(pp.sudogcache) < cap(pp.sudogcache)/2 && sched.sudogcache != nil {
s := sched.sudogcache
sched.sudogcache = s.next
s.next = nil
pp.sudogcache = append(pp.sudogcache, s)
}
unlock(&sched.sudoglock)
// If the central cache is empty, allocate a new one.
if len(pp.sudogcache) == 0 {
pp.sudogcache = append(pp.sudogcache, new(sudog))
}
}
n := len(pp.sudogcache)
s := pp.sudogcache[n-1]
pp.sudogcache[n-1] = nil
pp.sudogcache = pp.sudogcache[:n-1]
if s.elem != nil {
throw("acquireSudog: found s.elem != nil in cache")
}
releasem(mp)
return s
}
銷毀sudog——releaseSudog
大概邏輯就是如果當前goroutine所在調度器(P)的緩存滿了,就將調度器(P)的緩存一半放入全局緩存,然后在把sudog放入
func releaseSudog(s *sudog) {
// 這部分都是check sudog 是否合法
if s.elem != nil {
throw("runtime: sudog with non-nil elem")
}
if s.isSelect {
throw("runtime: sudog with non-false isSelect")
}
if s.next != nil {
throw("runtime: sudog with non-nil next")
}
if s.prev != nil {
throw("runtime: sudog with non-nil prev")
}
if s.waitlink != nil {
throw("runtime: sudog with non-nil waitlink")
}
if s.c != nil {
throw("runtime: sudog with non-nil c")
}
gp := getg()
if gp.param != nil {
throw("runtime: releaseSudog with non-nil gp.param")
}
mp := acquirem() // avoid rescheduling to another P
pp := mp.p.ptr()
// 如果當前調度器的緩存滿了,就將一半放入中央緩存
if len(pp.sudogcache) == cap(pp.sudogcache) {
// Transfer half of local cache to the central cache.
var first, last *sudog
for len(pp.sudogcache) > cap(pp.sudogcache)/2 {
n := len(pp.sudogcache)
p := pp.sudogcache[n-1]
pp.sudogcache[n-1] = nil
pp.sudogcache = pp.sudogcache[:n-1]
if first == nil {
first = p
} else {
last.next = p
}
last = p
}
lock(&sched.sudoglock)
last.next = sched.sudogcache
sched.sudogcache = first
unlock(&sched.sudoglock)
}
pp.sudogcache = append(pp.sudogcache, s)
releasem(mp)
}
實現細節
創建channel
go中所有的channel的創建都會使用make關鍵字,make(arg1, arg2)函數最終會調用到runtime.makechan和runtime.makechan64,下面講解go在編譯時期是如何做這些事情的
typecheck.go
編譯器會將make(arg1, arg2)轉化成OMAKE類型的節點,並在類型檢查階段將OMAKE類型的節點按照arg1的類型轉化為OMAKECHAN,OMAKEMAP,OMAKESLICE等類型
func typecheck1(n *Node, top int) (res *Node) {
...
switch n.Op {
...
case OMAKE:
...
switch t.Etype {
...
case TCHAN:
l = nil
if i < len(args) {
....
} else {
n.Left = nodintconst(0)
}
n.Op = OMAKECHAN // 節點類型轉化為OMAKECHAN
}
...
}
...
}
walk.go
OMAKECHAN類型的節點最終會在SSA中間代碼生成之前被轉化成runtime.makechan或者runtime.makechan64
func walkexpr(n *Node, init *Nodes) *Node {
...
switch n.Op {
...
case OMAKECHAN:
// When size fits into int, use makechan instead of
// makechan64, which is faster and shorter on 32 bit platforms.
size := n.Left
fnname := "makechan64"
argtype := types.Types[TINT64]
// Type checking guarantees that TIDEAL size is positive and fits in an int.
// The case of size overflow when converting TUINT or TUINTPTR to TINT
// will be handled by the negative range checks in makechan during runtime.
if size.Type.IsKind(TIDEAL) || maxintval[size.Type.Etype].Cmp(maxintval[TUINT]) <= 0 {
fnname = "makechan"
argtype = types.Types[TINT]
}
n = mkcall1(chanfn(fnname, 1, n.Type), n.Type, init, typename(n.Type), conv(size, argtype))
...
}
...
}
makechan64
check一下size是否是int,然后就執行makechan了
func makechan64(t *chantype, size int64) *hchan {
if int64(int(size)) != size {
panic(plainError("makechan: size out of range"))
}
return makechan(t, int(size))
}
makechan
- 安全檢查: channel能存的元素類型大小是否超過2^16
- 判斷hchanSize是否關於maxAlign對齊,判斷元素對齊是否maxAlign小,如果大maxAlign就沒用了,這里hchanSize設計十分巧妙,位運算神操作優化,可以看另一篇文章關於2的n次冪對齊
- 判斷申請的空間大小是否uint64大,判斷所需空間是否超過最大可申請空間,判斷size是否小於0(非法)
- 然后就是給hchan申請內存空間了
- 無緩沖的size=0的,只需要給hchan申請hchansize大小的內存空間即可
- 有緩沖,但是元素是非指針類型的,就申請hchanSize+mem大小的連續內存空間, 並將hchanSize之后的首地址賦值給buf
- 有緩沖,並且元素類型是指針的,hchan和底層buf內存就可以分開申請不用連續
- 給其他變量賦值
- 返回hchan指針,注意這里返回的是指針,所以channel在各函數之間傳遞時,就不是值傳遞了
為什么元素類型是非指針hchan和buf要在一段地址連續的內存中,而指針類型的則可以分開
這是源碼注釋的原話:
Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
buf points into the same allocation, elemtype is persistent.
SudoG's are referenced from their owning thread so they can't be collected.
TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
猜想:
大概意思是,當channel中元素類型不包含指針時,gc時需要回收這段空間的,當channel中元素類型包含指針時,這些指針被自己所在線程引用gc是不能回收,所以當元素不包含指針時申請一段連續的空間可以減小gc的壓力
func makechan(t *chantype, size int) *hchan {
elem := t.elem
// compiler checks this but be safe.
if elem.size >= 1<<16 {
throw("makechan: invalid channel element type")
}
if hchanSize%maxAlign != 0 || elem.align > maxAlign {
throw("makechan: bad alignment")
}
// 計算大小需要多少空間,check是否
// math.MulUintptr(a, b)函數返回a * b,以及結果是否超過uintptr的最大值
// 判斷所需空間是否比uint64大,判斷所需空間是否超過最大可申請空間,判斷size是否小於0(非法)
mem, overflow := math.MulUintptr(elem.size, uintptr(size))
if overflow || mem > maxAlloc-hchanSize || size < 0 {
panic(plainError("makechan: size out of range"))
}
var c *hchan
switch {
case mem == 0:
// 就是無緩沖channel,只需要申請hchan需要的大小就行
c = (*hchan)(mallocgc(hchanSize, nil, true))
c.buf = c.raceaddr()
case elem.ptrdata == 0:
// 有緩沖隊列channel,但是存放元素不是指針類型的,就要申請hchanSize+這些元素大小的內存空間,然后把申請下來空間首地址賦給buf
c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
c.buf = add(unsafe.Pointer(c), hchanSize)
default:
// 默認除了給hchan申請內存空間之外還需要申請size個元素大小的內存空間,並且把首地址賦給c.buf
c = new(hchan)
c.buf = mallocgc(mem, elem, true)
}
c.elemsize = uint16(elem.size)
c.elemtype = elem
c.dataqsiz = uint(size)
if debugChan {
print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
}
return c
}
發送
具體編譯時做的轉換可參考makechan,代碼都在類似的地方
chansend
- 首先檢測channel是否為空, 如果為空直接報錯
- check是否開啟了競爭檢測,golang的競爭檢測通過ThreadSanitizer庫(C++)做的
- 然后kill掉一些不用加鎖就可以判斷的情況,如果是非阻塞並且channel未關閉,size = 0或者channel滿了, 直接返回false(發送失敗)
- 如果已經有goroutine在等待了,就直接調send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int)發給那個goroutine
- 如果沒有goroutine在等待.
- 如果channel是非阻塞並且還地方,就放入buffer中,如果沒地方了就直接返回false
- 如果channel是阻塞並且不在select中或者在select中且沒有其他出口的,就將創建一個sudog,將sudog初始化並且放入待發送隊列(sendq), 並且調用goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)使當前goroutine陷入沉睡直到被喚醒(已經發出去了)
- 清理這個過程的垃圾數據
第四步中如果有goroutine在等待就直接發送,會影響非阻塞channel數據的順序嗎?
不會,channel的數據由唯一全局鎖保護,讀寫互斥,假設一個goroutine來讀channel,只有兩種情況:
- channel buffer中有數據,這時goroutine會直接讀取數據,不會被阻塞。
- channel buffer中沒有數據,這時goroutine會被阻塞。
只有當buffer中有數據且有goroutine被阻塞時,順序才會被打亂,但這兩個條件是互斥的,有數據就不可能阻塞,阻塞就不可能有數據。
// entry point for c <- x from compiled code
//go:nosplit
func chansend1(c *hchan, elem unsafe.Pointer) {
chansend(c, elem, true, getcallerpc())
}
func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
// 如果c為空
if c == nil {
// 如果是非阻塞的,就是那種有容量的,就返回false寫channel失敗
if !block {
return false
}
// 如果是非阻塞的就讓當前goroutine停止(這里寫個小程序就能看效果,這個goroutine的defer不會執行)
gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
throw("unreachable")
}
if debugChan {
print("chansend: chan=", c, "\n")
}
if raceenabled { // 開啟競爭檢測
racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
}
// 先不加鎖判斷非阻塞channel且沒關閉
// 如果size = 0或者channel滿了, 直接返回false(發送失敗)
if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
return false
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
// channel加鎖
lock(&c.lock)
// 如果channel關閉了,就返回panic
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("send on closed channel"))
}
// 等receive隊列的隊首中取出一個接收者,如果這個接收者不是nil就繞過buffer直接把ep發給他,並且釋放鎖
if sg := c.recvq.dequeue(); sg != nil {
// Found a waiting receiver. We pass the value we want to send
// directly to the receiver, bypassing the channel buffer (if any).
send(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true
}
// channel還沒滿就將元素放入buffer
if c.qcount < c.dataqsiz {
// Space is available in the channel buffer. Enqueue the element to send.
// 獲取一下發送數據的位置
qp := chanbuf(c, c.sendx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
// 將元素拷貝進buffer
typedmemmove(c.elemtype, qp, ep)
c.sendx++
if c.sendx == c.dataqsiz { // 循環一下
c.sendx = 0
}
c.qcount++
unlock(&c.lock)
return true
}
// 如果是非阻塞channel滿了就返回false
if !block {
unlock(&c.lock)
return false
}
// Block on the channel. Some receiver will complete our operation for us.
// 獲取當前goroutine
gp := getg()
// 創建sudog
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
// No stack splits between assigning elem and enqueuing mysg
// on gp.waiting where copystack can find it.
// 完善sudog的信息
mysg.elem = ep
mysg.waitlink = nil
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.waiting = mysg
gp.param = nil
// 放入發送列表中
c.sendq.enqueue(mysg)
// 將當前goroutine陷入沉睡
goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
KeepAlive(ep)
// 再次喚醒的時候說明元素已經發送完畢了
// someone woke us up.
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if gp.param == nil {
if c.closed == 0 {
throw("chansend: spurious wakeup")
}
panic(plainError("send on closed channel"))
}
gp.param = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
mysg.c = nil
releaseSudog(mysg)
return true
}
racereadpc
go/src/runtime/race_amd64.s
PC: 指令計數器寄存器
FP: 函數的幀指針,引用函數的參數。使用形如 symbol+offset(FP) 的方式,引用函數的輸入參數。例如 arg0+0(FP),arg1+8(FP),使用 FP 不加 symbol 時,無法通過編譯,在匯編層面來講,symbol 並沒有什么用,加 symbol 主要是為了提升代碼可讀性。
SP: 當前函數棧幀的底部
SB: 全局靜態基指針,一般用來聲明函數或全局變量
參數0放在DI通用寄存器
參數1放在SI通用寄存器
參數2放在DX通用寄存器
參數3放在CX通用寄存器
#define RARG0 DI
#define RARG1 SI
#define RARG2 DX
#define RARG3 CX
// void runtime·racereadpc(void *addr, void *callpc, void *pc)
TEXT runtime·racereadpc(SB), NOSPLIT, $0-24
MOVQ addr+0(FP), RARG1
MOVQ callpc+8(FP), RARG2
MOVQ pc+16(FP), RARG3
ADDQ $1, RARG3 // pc is function start, tsan wants return address
// void __tsan_read_pc(ThreadState *thr, void *addr, void *callpc, void *pc);
MOVQ $__tsan_read_pc(SB), AX
JMP racecalladdr<>(SB)
send
用於給goroutine直接發送數據
- 如果數據沒問題就直接將數據拷貝到x := <- c表達式x的內存地址上
- 然后將該goroutine放到處理器(P)的runnext上面等待執行,這里不是直接讓goroutine執行,而是等下一次調度的時候直接調這個goroutine
func send(c *hchan, sg *sudog, ep unsafe.Pointer, unlockf func(), skip int) {
if raceenabled {
if c.dataqsiz == 0 {
racesync(c, sg)
} else {
qp := chanbuf(c, c.recvx)
raceacquire(qp)
racerelease(qp)
raceacquireg(sg.g, qp)
racereleaseg(sg.g, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.sendx = c.recvx // c.sendx = (c.sendx+1) % c.dataqsiz
}
}
if sg.elem != nil { // 如果元素沒問題就將發送的數據拷貝到x := <- c表達式x所在內存地址上
sendDirect(c.elemtype, sg, ep)
sg.elem = nil
}
// 然后將將該goroutine放到處理器(P)的runnext上面等待執行,這里不是直接讓goroutine執行,而是等下一次調度的時候直接調這個goroutine
gp := sg.g
unlockf()
gp.param = unsafe.Pointer(sg)
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
goready(gp, skip+1)
}
接收
具體編譯時做的轉換可參考makechan,代碼都在類似的地方
chanrecv
兩種接收方式:
chanrecv1是丟棄channel出來的元素,類似 <- c這中表達式
chanrecv2是使用channel出來的元素,類似 elem := <- c
最終都會調用到chanrecv
// entry points for <- c from compiled code
//go:nosplit
func chanrecv1(c *hchan, elem unsafe.Pointer) {
chanrecv(c, elem, true)
}
//go:nosplit
func chanrecv2(c *hchan, elem unsafe.Pointer) (received bool) {
_, received = chanrecv(c, elem, true)
return
}
- 判斷chan是否為nil,如果是直接報錯
- kill掉一些不用枷鎖就可以判斷的情況,如果是非阻塞並且隊列為空並且channel未關閉就返回false
- 如果channel已經關閉了,就清空ep中的數據,立即返回
- 如果已經有sendq在等待了(發送端提到過,如果沒有goroutine等待接受,就加入sendq), 就直接接收這個元素
- 如果此時沒有goroutine等待發送
- 如果是非阻塞且buffer中有數據直接從buffer中取出,如果沒有數據直接返回false
- 如果是阻塞的且當前goroutine沒在select中或者在select中但沒有其他出口,就把自己加入recvq,然后調用goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3),等待被喚醒(如果被喚醒說明有有數據來了)
- 清理這個過程中的垃圾數據
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {
if debugChan {
print("chanrecv: chan=", c, "\n")
}
// channel為空就使goroutine 停止並報錯
if c == nil {
if !block {
return
}
gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
throw("unreachable")
}
// 如果是非阻塞並且隊列為空並且channel未關閉就返回false
if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
atomic.Load(&c.closed) == 0 {
return
}
var t0 int64
if blockprofilerate > 0 {
t0 = cputicks()
}
lock(&c.lock)
// 如果channel已關閉, 並且沒有數據了就清除ep中的數據立刻返回
if c.closed != 0 && c.qcount == 0 {
if raceenabled {
raceacquire(c.raceaddr())
}
unlock(&c.lock)
if ep != nil {
typedmemclr(c.elemtype, ep)
}
return true, false
}
// 如果已經有goroutine等着了,就直接讓這個goroutine recive
if sg := c.sendq.dequeue(); sg != nil {
recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
return true, true
}
// channel不為空將元素復制到ep中(ep := <- c)
if c.qcount > 0 {
// Receive directly from queue
qp := chanbuf(c, c.recvx)
if raceenabled {
raceacquire(qp)
racerelease(qp)
}
if ep != nil {
typedmemmove(c.elemtype, ep, qp)
}
typedmemclr(c.elemtype, qp)
c.recvx++
if c.recvx == c.dataqsiz {
c.recvx = 0
}
c.qcount--
unlock(&c.lock)
return true, true
}
// 如果是非阻塞直接返回false
if !block {
unlock(&c.lock)
return false, false
}
// 獲取當前goroutine
gp := getg()
// 創建sudog
mysg := acquireSudog()
mysg.releasetime = 0
if t0 != 0 {
mysg.releasetime = -1
}
mysg.elem = ep
mysg.waitlink = nil
gp.waiting = mysg
mysg.g = gp
mysg.isSelect = false
mysg.c = c
gp.param = nil
c.recvq.enqueue(mysg)
goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)
// someone woke us up
if mysg != gp.waiting {
throw("G waiting list is corrupted")
}
gp.waiting = nil
if mysg.releasetime > 0 {
blockevent(mysg.releasetime-t0, 2)
}
closed := gp.param == nil
gp.param = nil
mysg.c = nil
releaseSudog(mysg)
return true, !closed
}
關閉channel
關閉channel大概邏輯就是,將buffer中的數據都釋放掉,然后close設置為0
func closechan(c *hchan) {
// 如果為空拋出異常
if c == nil {
panic(plainError("close of nil channel"))
}
lock(&c.lock)
// 如果channel已經關閉就拋出異常
if c.closed != 0 {
unlock(&c.lock)
panic(plainError("close of closed channel"))
}
if raceenabled {
callerpc := getcallerpc()
racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
racerelease(c.raceaddr())
}
c.closed = 1
var glist gList
// 清理所有的數據
// release all readers
for {
sg := c.recvq.dequeue()
if sg == nil {
break
}
if sg.elem != nil {
typedmemclr(c.elemtype, sg.elem)
sg.elem = nil
}
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
// release all writers (they will panic)
for {
sg := c.sendq.dequeue()
if sg == nil {
break
}
sg.elem = nil
if sg.releasetime != 0 {
sg.releasetime = cputicks()
}
gp := sg.g
gp.param = nil
if raceenabled {
raceacquireg(gp, c.raceaddr())
}
glist.push(gp)
}
unlock(&c.lock)
// Ready all Gs now that we've dropped the channel lock.
for !glist.empty() {
gp := glist.pop()
gp.schedlink = 0
goready(gp, 3)
}
}
附錄
chan.dot
digraph {
bgcolor="#C6CFD532";
node [shape=record, fontsize="8", margin="0.04", height=0.2, color=gray]
edge [fontname="Inconsolata, Consolas", fontsize=10, arrowhead=normal]
hchan [shape=record,label="{qcount|dataqsiz|buf|elemsize|closed|elemtype|<sendx>sendx|<recvx>recvx|recvq|sendq|lock}",xlabel="hchan"]
waitq[shape=record,label="{<first>first|<last>last}",xlabel="waitq"]
sudog[shape=record,label="{g|isSelect|next|prev|elem|acquiretime|releasetime|ticket|parent|waitlink|waittail|c}",xlabel="sudog"]
hchan:sendx -> waitq [label="發送隊列"]
hchan:recvx -> waitq [label="接收隊列"]
waitq:first -> sudog
waitq:last -> sudog
}