go中channel源碼剖析


channel

前言

channel作為go中並發的一神器,深入研究下吧。

設計的原理

在早期,CPU都是以單核的形式順序執⾏機器指令。Go語⾔的祖先C語⾔正是這種順序編程語⾔的代 表。順序編程語⾔中的順序是指:所有的指令都是以串⾏的⽅式執⾏,在相同的時刻有且僅有⼀個 CPU在順序執⾏程序的指令。

隨着處理器技術的發展,單核時代以提升處理器頻率來提⾼運⾏效率的⽅式遇到了瓶頸,⽬前各種主 流的CPU頻率基本被鎖定在了3GHZ附近。單核CPU的發展的停滯,給多核CPU的發展帶來了機遇。 相應地,編程語⾔也開始逐步向並⾏化的⽅向發展。Go語⾔正是在多核和⽹絡化的時代背景下誕⽣的 原⽣⽀持並發的編程語⾔。

常⻅的並⾏編程有多種模型,主要有多線程、消息傳遞等。從理論上來看,多線程和基於消息的並發 編程是等價的。由於多線程並發模型可以⾃然對應到多核的處理器,主流的操作系統因此也都提供了系統級的多線程⽀持,同時從概念上講多線程似乎也更直觀,因此多線程編程模型逐步被吸納到主流的編程語⾔特性或語⾔擴展庫中。⽽主流編程語⾔對基於消息的並發編程模型⽀持則相⽐較少,Erlang語⾔是⽀持基於消息傳遞並發編程模型的代表者,它的並發體之間不共享內存。Go語⾔是基於 消息並發模型的集⼤成者,它將基於CSP模型的並發編程內置到了語⾔中,通過⼀個go關鍵字就可以 輕易地啟動⼀個Goroutine,與Erlang不同的是Go語⾔的Goroutine之間是共享內存的。

共享內存

多線程共享內存。其實就是Java或者C++等語言中的多線程開發。單個的goutine代碼是順序執行,而並發編程時,創建多個goroutine,但我們並不能確定不同的goroutine之間的執行順序,多個goroutine之間大部分情況是代碼交叉執行,在執行過程中,可能會修改或讀取共享內存變量,這樣就會產生數據競爭,但是我們可以用鎖去消除數據的競爭。

當然這種在go中是不推薦的

csp

Go語⾔最吸引⼈的地⽅是它內建的並發⽀持。Go語⾔並發體系的理論是C.A.R Hoare在1978年提出的 CSP(Communicating Sequential Process,通訊順序進程)。CSP有着精確的數學模型,並實際應 ⽤在了Hoare參與設計的T9000通⽤計算機上。NewSqueak、Alef、Limbo到現在的Go語⾔,對於對 CSP有着20多年實戰經驗的Rob Pike來說,他更關注的是將CSP應⽤在通⽤編程語⾔上產⽣的潛⼒。 作為Go並發編程核⼼的CSP理論的核⼼概念只有⼀個:同步通信。

⾸先要明確⼀個概念:並發不是並⾏。並發更關注的是程序的設計層⾯,並發的程序完全是可以順序 執⾏的,只有在真正的多核CPU上才可能真正地同時運⾏。並⾏更關注的是程序的運⾏層⾯,並⾏⼀ 般是簡單的⼤量重復,例如GPU中對圖像處理都會有⼤量的並⾏運算。為更好的編寫並發程序,從設 計之初Go語⾔就注重如何在編程語⾔層級上設計⼀個簡潔安全⾼效的抽象模型,讓程序員專注於分解 問題和組合⽅案,⽽且不⽤被線程管理和信號互斥這些繁瑣的操作分散精⼒。

在並發編程中,對共享資源的正確訪問需要精確的控制,在⽬前的絕⼤多數語⾔中,都是通過加鎖等線程同步⽅案來解決這⼀困難問題,⽽Go語⾔卻另辟蹊徑,它將共享的值通過Channel傳遞(實際上多 個獨⽴執⾏的線程很少主動共享資源)。在任意給定的時刻,最好只有⼀個Goroutine能夠擁有該資源。數據競爭從設計層⾯上就被杜絕了。為了提倡這種思考⽅式,Go語⾔將其並發編程哲學化為⼀句⼝號:

Do not communicate by sharing memory; instead, share memory by communicating.

不要通過共享內存來通信,⽽應通過通信來共享內存。 這是更⾼層次的並發編程哲學(通過管道來傳值是Go語⾔推薦的做法)。雖然像引⽤計數這類簡單的並 發問題通過原⼦操作或互斥鎖就能很好地實現,但是通過Channel來控制訪問能夠讓你寫出更簡潔正確的程序。

channel

Golang中使用 CSP中 channel 這個概念。channel 是被單獨創建並且可以在進程之間傳遞,它的通信模式類似於 boss-worker 模式的,一個實體通過將消息發送到channel 中,然后又監聽這個 channel 的實體處理,兩個實體之間是匿名的,這個就實現實體中間的解耦,其中 channel 是同步的一個消息被發送到 channel 中,最終是一定要被另外的實體消費掉的。

channel的定義

channel 是一個引用類型,所以在它被初始化之前,它的值是 nil,channel 使用 make 函數進行初始化。go中內置的類型,初始化的時候,我們需要初始化channel的長度。

指定了長度代表有緩沖

ch := make(chan int, 1)

未指定就是無緩沖

ch := make(chan int)

有緩沖和無緩沖的差別是什么呢?

對不帶緩沖的 channel 進行的操作實際上可以看作“同步模式”,帶緩沖的則稱為“異步模式”。

同步模式下,發送方和接收方要同步就緒,只有在兩者都 ready 的情況下,數據才能在兩者間傳輸(后面會看到,實際上就是內存拷貝)。否則,任意一方先行進行發送或接收操作,都會被掛起,等待另一方的出現才能被喚醒。

異步模式下,在緩沖槽可用的情況下(有剩余容量),發送和接收操作都可以順利進行。否則,操作的一方(如寫入)同樣會被掛起,直到出現相反操作(如接收)才會被喚醒。

舉個栗子:

無緩沖的 就是一個送信人去你家門口送信 ,你不在家 他不走,你一定要接下信,他才會走。

無緩沖保證信能到你手上

有緩沖的 就是一個送信人去你家仍到你家的信箱 轉身就走 ,除非你的信箱滿了 他必須等信箱空下來。

有緩沖的 保證 信能進你家的郵箱

源碼剖析

type hchan struct {
    qcount   uint           // buffer 中已放入的元素個數
    dataqsiz uint           // 用戶構造 channel 時指定的 buf 大小,也就是底層循環數組的長度
    buf      unsafe.Pointer // 指向底層循環數組的指針 只針對有緩沖的 channel
    elemsize uint16         // buffer 中每個元素的大小
    closed   uint32         // channel 是否關閉,== 0 代表未 closed
    elemtype *_type         // channel 元素的類型信息
    sendx    uint           // 已發送元素在循環數組中的索引
    recvx    uint           // 已接收元素在循環數組中的索引
    recvq    waitq          // 等待接收的 goroutine  list of recv waiters
    sendq    waitq          // 等待發送的 goroutine list of send waiters

    lock mutex              // 保護 hchan 中所有字段
}

簡單分析下:

buf指向底層的循環數組,只有緩沖類型的channel才有。

sendx,recvx 均指向底層循環數組,表示當前可以發送和接收的元素位置索引值(相對於底層數組)。

sendq,recvq 分別表示被阻塞的 goroutine,這些 goroutine 由於嘗試讀取 channel 或向 channel 發送數據而被阻塞。讀的時候,如果循環數據為空,那么當前讀的goroutine就會加入到recvq,等待有消息寫入結束阻塞。同理寫入的goroutine,一樣,如果隊列滿了,就加入到sendq,阻塞直到消息寫入。

waitq 相關的屬性,可以理解為是一個 FIFO 的標准隊列。其中 recvq 中是正在等待接收數據的 goroutine,sendq 中是等待發送數據的 goroutine。waitq 使用雙向鏈表實現。

recvq和sendq,它們是 waitq 結構體,而waitq實際上就是一個雙向鏈表,鏈表的元素是sudog,里面包含 g 字段,g 表示一個 goroutine,所以 sudog 可以看成一個 goroutine。但是兩個還是有區別的。

lock通過互斥鎖保證數據安全。

設計思路:

對於無緩沖的是沒有buf,有緩沖的buf是有buf的,長度也就是創建channel制定的長度。

有緩沖channel的buf是循環使用的,已經讀取過的,會被后面新寫入的消息覆蓋,通過sendx,recvx這兩個指向底層數據的指針的滑動,實現對buf的復用。

具體的消息寫入讀讀取,以及goroutine的阻塞,請看下面

環形隊列

chan內部實現了一個環形隊列作為其緩沖區,隊列的長度是創建chan時指定的。

看下實現的圖片:

  • dataqsiz指示了隊列長度為6,即可緩存6個元素;
  • buf指向隊列的內存,隊列中還剩余兩個元素;
  • qcount表示隊列中還有兩個元素,也就是[1,3};
  • sendx指示后續寫入的數據存儲的位置,取值[0, 6);
  • recvx指示從該位置讀取數據, 取值[0, 6);

創建

func makechan(t *chantype, size int) *hchan {
	elem := t.elem

	// 做的一些檢查
	if elem.size >= 1<<16 {
		throw("makechan: invalid channel element type")
	}
	if hchanSize%maxAlign != 0 || elem.align > maxAlign {
		throw("makechan: bad alignment")
	}

	mem, overflow := math.MulUintptr(elem.size, uintptr(size))
	if overflow || mem > maxAlloc-hchanSize || size < 0 {
		panic(plainError("makechan: size out of range"))
	}

	// Hchan does not contain pointers interesting for GC when elements stored in buf do not contain pointers.
	// buf points into the same allocation, elemtype is persistent.
	// SudoG's are referenced from their owning thread so they can't be collected.
	// TODO(dvyukov,rlh): Rethink when collector can move allocated objects.
	var c *hchan
	switch {
	case mem == 0:
		// 隊列或元素大小為零
        // 當前 Channel 中不存在緩沖區,為 runtime.hchan 分配一段內存空間
		c = (*hchan)(mallocgc(hchanSize, nil, true))
		c.buf = c.raceaddr()
	case elem.ptrdata == 0:
		// 類型不是指針
		// 一次性給channel和buf(也就是底層數組)分類一塊連續的空間
		c = (*hchan)(mallocgc(hchanSize+mem, nil, true))
		c.buf = add(unsafe.Pointer(c), hchanSize)
	default:
		// 默認情況下會單獨為 runtime.hchan 和緩沖區分配內存
		c = new(hchan)
		c.buf = mallocgc(mem, elem, true)
	}

	// 最后更新幾個字段的值
	c.elemsize = uint16(elem.size)
	c.elemtype = elem
	c.dataqsiz = uint(size)

	if debugChan {
		print("makechan: chan=", c, "; elemsize=", elem.size, "; elemalg=", elem.alg, "; dataqsiz=", size, "\n")
	}
	return c
}

寫入數據

向一個channel寫入數據的流程

1、首先判斷recvq等待接收隊列是否為空,不為空說明緩沖區中沒有內容或者是一個無緩沖channel。直接從recvq中取出一個goroutine,然后寫入數據,接着喚醒goroutine,最后結束發發送過程。

2、如果緩沖區有空余的位置,寫入數據到緩沖區,完成發送。

3、如果緩沖區滿了,那么就把寫入數據的goroutine放到sendq中,進入睡眠,最后等待goroutine被喚醒。

func chansend(c *hchan, ep unsafe.Pointer, block bool, callerpc uintptr) bool {
	// 如果 channel 是 nil
	if c == nil {
		if !block {
			return false
		}
		gopark(nil, nil, waitReasonChanSendNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	if debugChan {
		print("chansend: chan=", c, "\n")
	}

	if raceenabled {
		racereadpc(c.raceaddr(), callerpc, funcPC(chansend))
	}
	// 對於不阻塞的 send,快速檢測失敗場景
	//
	// 如果 channel 未關閉且 channel 沒有多余的緩沖空間。這可能是:
	// 1. channel 是非緩沖型的,且等待接收隊列里沒有 goroutine
	// 2. channel 是緩沖型的,但循環數組已經裝滿了元素
	if !block && c.closed == 0 && ((c.dataqsiz == 0 && c.recvq.first == nil) ||
		(c.dataqsiz > 0 && c.qcount == c.dataqsiz)) {
		return false
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}
	// 加鎖
	lock(&c.lock)

	// 如果channel關閉了
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("send on closed channel"))
	}

	// 如果接收隊列里有 goroutine,直接將要發送的數據拷貝到接收 goroutine
	if sg := c.recvq.dequeue(); sg != nil {
		// Found a waiting receiver. We pass the value we want to send
		// directly to the receiver, bypassing the channel buffer (if any).
		send(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true
	}

	// 緩沖型的channel,buffer 中已放入的元素個數小於循環數組的長度
	if c.qcount < c.dataqsiz {
		// qp 指向 buf 的 sendx 位置
		qp := chanbuf(c, c.sendx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		// 將數據從 ep 處拷貝到 qp
		typedmemmove(c.elemtype, qp, ep)
		// 發送的游標加1
		c.sendx++
		// 如果發送的游標值等於容量值,游標值歸0
		if c.sendx == c.dataqsiz {
			c.sendx = 0
		}
		// 緩沖區的數量加1
		c.qcount++
		// 解鎖
		unlock(&c.lock)
		return true
	}
	// buff空間已經滿了
	// 如果不需要阻塞,則直接返回錯誤
	if !block {
		unlock(&c.lock)
		return false
	}

	// 否則,阻塞該 goroutine.
	// Block on the channel. Some receiver will complete our operation for us.
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}
	// No stack splits between assigning elem and enqueuing mysg
	// on gp.waiting where copystack can find it.
	mysg.elem = ep
	mysg.waitlink = nil
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.waiting = mysg
	gp.param = nil
	// 將該 goroutine 的結構放入 sendq 隊列
	c.sendq.enqueue(mysg)
	// 休眠
	// 等待 goready 喚醒
	goparkunlock(&c.lock, waitReasonChanSend, traceEvGoBlockSend, 3)
	// Ensure the value being sent is kept alive until the
	// receiver copies it out. The sudog has a pointer to the
	// stack object, but sudogs aren't considered as roots of the
	// stack tracer.
	KeepAlive(ep)

	// someone woke us up.
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	if gp.param == nil {
		if c.closed == 0 {
			throw("chansend: spurious wakeup")
		}
		panic(plainError("send on closed channel"))
	}
	gp.param = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	mysg.c = nil
	releaseSudog(mysg)
	return true
}

簡單的流程圖如下:

讀取數據

從一個channel讀取數據的流程如下:

1、如果等待發送的goroutine list,也就是sendq不為空。並且沒有緩存區。直接從sendq中取出一個goroutine,讀出當前goroutine中的消息,喚醒goroutine,結束讀取的過程。

2、如果等待發送的goroutine list,也就是sendq不為空。說明緩沖區已經滿了,移動recvx指針的位置,取出一個數據。同時在sendq中取出一個goroutine,讀取里面的數據到buf中,結束當前讀取。

3、如果等待發送的goroutine list,也就是sendq為空。並且緩沖區,有數據。直接在緩沖區取出數據,完成本次讀取。

4、如果等待發送的goroutine list,也就是sendq為空。並且緩沖區,沒有數據。將當前goroutine加入recvq,進入睡眠,等待被寫goroutine喚醒。

// chanrecv在通道c上接收並將接收到的數據寫入ep。
// 如果 ep 是 nil,說明忽略了接收值。
// 如果 block == false,即非阻塞型接收,在沒有數據可接收的情況下,返回 (false, false)
// 否則,如果 c 處於關閉狀態,將 ep 指向的地址清零,返回 (true, false)
// 否則,用返回值填充 ep 指向的內存地址。返回 (true, true)
// 如果 ep 非空,則應該指向堆或者函數調用者的棧
func chanrecv(c *hchan, ep unsafe.Pointer, block bool) (selected, received bool) {

	if debugChan {
		print("chanrecv: chan=", c, "\n")
	}
	// 如果channel為nil
	if c == nil {
		// block == false,即非阻塞型接收,在沒有數據可接收的情況下,返回 (false, false)
		if !block {
			return
		}
		// 接收一個 nil 的 channel,goroutine 掛起
		gopark(nil, nil, waitReasonChanReceiveNilChan, traceEvGoStop, 2)
		throw("unreachable")
	}

	// 在非阻塞模式下,快速檢測到失敗,不用獲取鎖,快速返回
	// 當我們觀察到 channel 沒准備好接收:
	// 1. 非緩沖型,等待發送列隊 sendq 里沒有 goroutine 在等待
	// 2. 緩沖型,但 buf 里沒有元素
	// 之后,又觀察到 closed == 0,即 channel 未關閉。
	// 因為 channel 不可能被重復打開,所以前一個觀測的時候 channel 也是未關閉的,
	// 因此在這種情況下可以直接宣布接收失敗,返回 (false, false)
	if !block && (c.dataqsiz == 0 && c.sendq.first == nil ||
		c.dataqsiz > 0 && atomic.Loaduint(&c.qcount) == 0) &&
		atomic.Load(&c.closed) == 0 {
		return
	}

	var t0 int64
	if blockprofilerate > 0 {
		t0 = cputicks()
	}

	// 加鎖
	lock(&c.lock)

	// channel 已關閉,並且循環數組 buf 里沒有元素
	// 這里可以處理非緩沖型關閉 和 緩沖型關閉但 buf 無元素的情況
	// 也就是說即使是關閉狀態,但在緩沖型的 channel,
	// buf 里有元素的情況下還能接收到元素
	if c.closed != 0 && c.qcount == 0 {
		if raceenabled {
			raceacquire(c.raceaddr())
		}
		// 解鎖
		unlock(&c.lock)
		if ep != nil {
			// 從一個已關閉的 channel 執行接收操作,且未忽略返回值
			// 那么接收的值將是一個該類型的零值
			// typedmemclr 根據類型清理相應地址的內存
			typedmemclr(c.elemtype, ep)
		}
		// 從一個已關閉的 channel 接收,selected 會返回true
		return true, false
	}

	if sg := c.sendq.dequeue(); sg != nil {
		// 發現一個等待的發送者。如果緩沖區大小為0,則接收值
		// 直接來自發件人。否則,從隊列頭接收
		// 並將發送方的值添加到隊列的尾部(兩者都映射到相同的緩沖區槽,因為隊列已滿)。
		recv(c, sg, ep, func() { unlock(&c.lock) }, 3)
		return true, true
	}
	// 緩沖類型
	if c.qcount > 0 {
		// 直接從循環數組里找到要接收的元素
		qp := chanbuf(c, c.recvx)
		if raceenabled {
			raceacquire(qp)
			racerelease(qp)
		}
		// 代碼里,沒有忽略要接收的值,不是 "<- ch",而是 "val <- ch",ep 指向 val
		if ep != nil {
			typedmemmove(c.elemtype, ep, qp)
		}
		// 清除掉循環數組里相應位置的值
		typedmemclr(c.elemtype, qp)
		// 接收游標向前移動
		c.recvx++
		// 達到數據的長度,下標重新計算
		if c.recvx == c.dataqsiz {
			c.recvx = 0
		}
		// buf 數組里的元素個數減 1
		c.qcount--
		// 解鎖
		unlock(&c.lock)
		return true, true
	}
	// 非阻塞接收,解鎖。selected 返回 false,因為沒有接收到值
	if !block {
		unlock(&c.lock)
		return false, false
	}

	// 阻塞
	// 構建recvq的阻塞隊列
	gp := getg()
	mysg := acquireSudog()
	mysg.releasetime = 0
	if t0 != 0 {
		mysg.releasetime = -1
	}

	mysg.elem = ep
	mysg.waitlink = nil
	gp.waiting = mysg
	mysg.g = gp
	mysg.isSelect = false
	mysg.c = c
	gp.param = nil
	c.recvq.enqueue(mysg)
	// 將當前 goroutine 掛起
	goparkunlock(&c.lock, waitReasonChanReceive, traceEvGoBlockRecv, 3)

	// someone woke us up
	if mysg != gp.waiting {
		throw("G waiting list is corrupted")
	}
	gp.waiting = nil
	if mysg.releasetime > 0 {
		blockevent(mysg.releasetime-t0, 2)
	}
	closed := gp.param == nil
	gp.param = nil
	mysg.c = nil
	releaseSudog(mysg)
	return true, !closed
}

梳理下流程圖:

channel的關閉

chanbel的關閉,對於其中的recvq和sendq也就是阻塞的發送者和接收者,對於等待接收者而言,會收到一個相應類型的零值。對於等待發送者,會直接 panic。

channel的關閉不當會出現panic的場景:

1、關閉值為nil的channel
2、關閉已經關閉的channel
3、向已經關閉的channel寫入數據

func closechan(c *hchan) {
	// 關閉值為nil的channel,報錯panic
	if c == nil {
		panic(plainError("close of nil channel"))
	}
	// 加鎖
	lock(&c.lock)
	// 關閉已經關閉的channel,報錯panic
	if c.closed != 0 {
		unlock(&c.lock)
		panic(plainError("close of closed channel"))
	}

	if raceenabled {
		callerpc := getcallerpc()
		racewritepc(c.raceaddr(), callerpc, funcPC(closechan))
		racerelease(c.raceaddr())
	}
	// 修改關閉餓狀態
	c.closed = 1

	var glist gList

	// 釋放recvq中的sudog
	for {
		// 接收一個sudog
		sg := c.recvq.dequeue()
		// 全部接收完畢了
		if sg == nil {
			break
		}
		// 如果 elem 不為空,說明此 receiver 未忽略接收數據
		// 給它賦一個相應類型的零值
		if sg.elem != nil {
			typedmemclr(c.elemtype, sg.elem)
			sg.elem = nil
		}
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		// 取出goroutine
		gp := sg.g
		gp.param = nil
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}

	// 將 channel 等待發送隊列里的 sudog 釋放
	// 如果存在,這些 goroutine 將會 panic
	for {
		// 取出
		sg := c.sendq.dequeue()
		if sg == nil {
			break
		}
		// 發送者會 panic
		sg.elem = nil
		if sg.releasetime != 0 {
			sg.releasetime = cputicks()
		}
		gp := sg.g
		gp.param = nil
		if raceenabled {
			raceacquireg(gp, c.raceaddr())
		}
		glist.push(gp)
	}
	unlock(&c.lock)

	// Ready all Gs now that we've dropped the channel lock.
	for !glist.empty() {
		// 取出一個
		gp := glist.pop()
		gp.schedlink = 0
		// 喚醒相應 goroutine
		goready(gp, 3)
	}
}

優雅的關閉

對於channel的關閉,我們需要注意下:

1、在不能更改channel狀態的情況下,沒有簡單普遍的方式來檢查channel是否已經關閉了

2、關閉已經關閉的channel會導致panic,所以在closer(關閉者)不知道channel是否已經關閉的情況下去關閉channel是很危險的

3、發送值到已經關閉的channel會導致panic,所以如果sender(發送者)在不知道channel是否已經關閉的情況下去向channel發送值是很危險的

對於channel的關閉有這樣的一個原則:

don't close a channel from the receiver side and don't close a channel if the channel has multiple concurrent senders.

不要從一個receiver側關閉channel,也不要在有多個sender時,關閉channel。

向已經關閉的channel發送數據會導致panic,所以在receiver側關閉,sender是不知道channel是否關閉的,多個sender的情況下,某一個sender關閉了channel,其他的sender是不知道這個channel是否關閉的,再次寫入數據和關閉,都會導致panic。

M個receivers,一個sender

這個是最簡單的場景,當需要關閉的時候sender直接關閉就好了

package main

import (
	"log"
	"math/rand"
	"sync"
	"time"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)

	// ...
	const MaxRandomNumber = 100000
	const NumReceivers = 100

	// 使用WaitGroup來阻塞查看打印的效果
	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(NumReceivers)

	// 設置channel的長度為10
	dataCh := make(chan int, 100)

	// the sender
	go func() {
		for {
			if value := rand.Intn(MaxRandomNumber); value == 0 {
				// 需要關閉的時候直接關閉就好了,是很安全的
				close(dataCh)
				return
			} else {
				dataCh <- value
			}
		}
	}()

	// receivers
	for i := 0; i < NumReceivers; i++ {
		go func() {
			defer wgReceivers.Done()

			// 監聽dataCh,接收里面的值
			for value := range dataCh {
				log.Println(value)
			}
		}()
	}

	wgReceivers.Wait()

}

一個receiver,N個sender

這種情況下可以給個signal channel,然后通知senders去停止向channel發送數據。因為receiver不能去關閉channel,這樣senders將會觸發panic,
但是我們可以讓receiver通知signal channel來告訴senders來停止發送。

package main

import (
	"log"
	"math/rand"
	"sync"
	"time"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)

	// ...
	const MaxRandomNumber = 100000
	const NumSenders = 1000

	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(1)

	// 發送數據的channel
	dataCh := make(chan int, 100)

	// 無緩沖的channel作為信號量,通知senders的推出
	stopCh := make(chan struct{})

	// 啟動個NumSenders個sender
	for i := 0; i < NumSenders; i++ {
		go func() {
			for {
				value := rand.Intn(MaxRandomNumber)

				// 監測到退出信號,馬上退出goroutine
				// 否則正常寫入dataCh,數據
				select {
				case <-stopCh:
					return
				case dataCh <- value:
				}
			}
		}()
	}

	// 消費者
	go func() {
		defer wgReceivers.Done()

		for value := range dataCh {
			// 某個場景下發出退出的信號量
			if value == MaxRandomNumber-1 {
				close(stopCh)
				return
			}

			log.Println(value)
		}
	}()

	wgReceivers.Wait()
}

通過stopCh來,作為信號量,來通知發送者的goroutine退出。

M個receiver,N個sender

這是比較復雜的一個,相比上面的一個receiver,M個receiver中任意一個receiver發出關閉的信息,需要同步到其他的receiver,防止其他的receiver,再次發出關閉的請求,出發panic。

package main

import (
	"log"
	"math/rand"
	"strconv"
	"sync"
	"time"
)

func main() {
	rand.Seed(time.Now().UnixNano())
	log.SetFlags(0)

	const MaxRandomNumber = 100000
	const NumReceivers = 10
	const NumSenders = 1000

	wgReceivers := sync.WaitGroup{}
	wgReceivers.Add(NumReceivers)

	// 數據的channel
	dataCh := make(chan int, 100)
	// 關閉的channel的信號
	stopCh := make(chan struct{})
	// toStop通知關閉stopCh,同時作為receiver退出的信息
	toStop := make(chan string, 1)

	var stoppedBy string

	// 當收到toStop的信號,關閉stopCh
	go func() {
		stoppedBy = <-toStop
		close(stopCh)
	}()

	// 發送端
	for i := 0; i < NumSenders; i++ {
		go func(id string) {
			for {
				value := rand.Intn(MaxRandomNumber)
				// 滿足條件發出關閉的請求到toStop
				if value == 0 {
					select {
					case toStop <- "sender#" + id:
					default:
					}
					return
				}

				select {
				// 檢測的關閉的stopCh,退出發送者
				case <-stopCh:
					return
				case dataCh <- value:
				}
			}
		}(strconv.Itoa(i))
	}

	// 接收端
	for i := 0; i < NumReceivers; i++ {
		go func(id string) {
			defer wgReceivers.Done()

			for {
				select {
				// 檢測的關閉的stopCh,退出接收者
				case <-stopCh:
					return
				case value := <-dataCh:
					// 滿足條件發出關閉的請求到toStop
					if value == MaxRandomNumber-1 {
						select {
						case toStop <- "receiver#" + id:
						default:
						}
						return
					}

					log.Println(value)
				}
			}
		}(strconv.Itoa(i))
	}

	wgReceivers.Wait()
	log.Println("stopped by", stoppedBy)
}

這樣的設計就很好了,可以在sender和receiver兩端發出關閉的請求。保證了sender和receiver都能夠退出。

關閉的 channel 仍能讀出數據

從一個有緩沖的channel里面讀取數據,當channel被關閉了,仍能讀出有效值。只有當返回的ok為false,讀出的是無效的,也就是數據的零值。

package main

import "fmt"

func main() {
	ch := make(chan int, 5)
	ch <- 18
	close(ch)
	x, ok := <-ch
	if ok {
		fmt.Println("received: ", x)
	}

	x, ok = <-ch
	if !ok {
		fmt.Println("channel closed, data invalid.")
		// ok為false
		fmt.Println(x)
	}
}

一個有緩沖的channel,寫入了一個數據,當返回的ok為false,取出的數據就是定義類型的零值。

received:  18
channel closed, data invalid.
0

控制goroutine的數量

go中在大量並發的情況下會產生很多的goroutine,而goroutine使用之后,是不會被完全回收的,大概會有2kb的空間,所以我們希望控制下goroytine的並發數量。

func main() {
	jobsCount := 100
	group := sync.WaitGroup{}
	var jobsChan = make(chan int, 3)
	// a) 生成指定數目的 goroutine,每個 goroutine 消費 jobsChan 中的數據
	poolCount := 3
	for i := 1; i < poolCount; i++ {
		go func() {
			for j := range jobsChan {
				fmt.Printf("hello %d\n", j)
				time.Sleep(time.Second)
				group.Done()
			}
		}()
	}
	// b) 把 job 依次推送到 jobsChan 供 goroutine 消費
	for i := 0; i < jobsCount; i++ {
		jobsChan <- i
		group.Add(1)
		fmt.Printf("index: %d,goroutine Num: %d\n", i, runtime.NumGoroutine())
	}
	group.Wait()
	fmt.Println("done!")
}

range和select讀取channel的內容區別

range讀取channel內容

range讀取channel里面的內容,會自動等待channel的動作一直到channel被關閉,然后把channel里面的內容全部讀出來。

package main

import "fmt"

func main() {

    queue := make(chan string, 2)
    queue <- "one"
    queue <- "two"
    close(queue)

    for elem := range queue {
        fmt.Println(elem)
    }
}

輸出打印的結果

one
two

select讀取channel內容

select關鍵字用於多個channel的結合,這些channel會通過類似於are-you-ready polling的機制來工作。

package main

import (
    "fmt"
    "time"
)

func main() {

    c1 := make(chan string)
    c2 := make(chan string)

    go func() {
        time.Sleep(1 * time.Second)
        c1 <- "one"
    }()
    go func() {
        time.Sleep(2 * time.Second)
        c2 <- "two"
    }()

    for i := 0; i < 2; i++ {
        select {
        case msg1 := <-c1:
            fmt.Println("received", msg1)
        case msg2 := <-c2:
            fmt.Println("received", msg2)
        }
    }
}

打印下輸出的效果

received one
received two

需要注意的點

操作 nil channel closed channel not nil ,not closed
close panic panic 正常關閉
讀<-ch 阻塞 里面的內容讀完了,之后獲取到的是類型的零值 阻塞或正常讀取數據。緩沖型 channel 為空或非緩沖型 channel 沒有等待發送者時會阻塞
寫ch<- 阻塞 panic 阻塞或正常寫入數據。非緩沖型 channel 沒有等待接收者或緩沖型 channel buf 滿時會被阻塞

發送和接收的本質

Remember all transfer of value on the go channels happens with the copy of value.

和go中函數一樣,channel中發送和接收的操作都是值傳遞。

參考

【Go的CSP並發模型】https://www.jianshu.com/p/a3c9a05466e1
【goroutine, channel 和 CSP】http://www.moye.me/2017/05/05/go-concurrency-patterns/
【通過同步和加鎖解決多線程的線程安全問題】https://blog.ailemon.me/2019/05/15/solving-multithreaded-thread-safety-problems-by-synchronization-and-locking/
【golang channel 有緩沖 與 無緩沖 的重要區別】https://my.oschina.net/u/157514/blog/149192
【Golang channel 源碼深度剖析】https://www.cyhone.com/articles/analysis-of-golang-channel/
【《Go專家編程》Go channel實現原理剖析】https://my.oschina.net/renhc/blog/2246871
【深度解密Go語言之channel】https://www.cnblogs.com/qcrao-2018/p/11220651.html
【如何優雅地關閉Go channel】https://www.jianshu.com/p/d24dfbb33781
【Go by Example: Range over Channels】https://gobyexample.com/range-over-channels
【Go by Example: Select】https://gobyexample.com/select


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM