-
goroutines
: 獨立執行每個任務,並
可能並行
執行
-
channels
: 用於 goroutines 之間的通訊、同步
對於下面這樣的非並發的程序:
func main()
{
tasks := getTasks()
// 處理每個任務
for
_, task :=
range
tasks {
process(task)
}
}
|
將其轉換為 Go 的並發模式很容易,使用典型的 Task Queue 的模式:
func main()
{
// 創建帶緩沖的 channel
ch :=
make
(
chan
Task,
3
)
// 運行固定數量的 workers
for
i :=
0
; i < numWorkers; i++ {
go
worker(ch)
}
// 發送任務到 workers
hellaTasks := getTasks()
for
_, task :=
range
hellaTasks {
ch <- task
}
...
}
func worker(ch chan Task)
{
for
{
// 接收任務
task := <-ch
process(task)
}
}
- goroutine-safe,多個 goroutine 可以同時訪問一個 channel 而不會出現競爭問題
- 可以用於在 goroutine 之間存儲和傳遞值
- 其語義是先入先出(FIFO)
- 可以導致 goroutine 的 block 和 unblock
// 帶緩沖的 channel
ch :=
make
(
chan
Task,
3
)
// 無緩沖的 channel
ch :=
make
(
chan
Tass)
|
回顧前面提到的 channel 的特性,特別是前兩個。如果忽略內置的 channel,讓你設計一個具有 goroutines-safe 並且可以用來存儲、傳遞值的東西,你會怎么做?很多人可能覺得或許可以用一個帶鎖的隊列來做。沒錯,事實上,channel 內部就是一個帶鎖的隊列。
https://golang.org/src/runtime/chan.go
type
hchan
struct
{
...
buf unsafe.Pointer
// 指向一個環形隊列
...
sendx
uint
// 發送 index
recvx
uint
// 接收 index
...
lock mutex
// 互斥量
}
|
buf 的具體實現很簡單,就是一個環形隊列的實現。sendx 和 recvx 分別用來記錄發送、接收的位置。然后用一個 lock 互斥鎖來確保無競爭冒險。
對於每一個 ch := make(chan Task, 3) 這類操作,都會在堆中,分配一個空間,建立並初始化一個 hchan 結構變量,而 ch 則是指向這個 hchan 結構的指針。
因為 ch 本身就是個指針,所以我們才可以在 goroutine 函數調用的時候直接將 ch 傳遞過去,而不用再 &ch 取指針了,所以所有使用同一個 ch 的 goroutine 都指向了同一個實際的內存空間。
為了方便描述,我們用 G1 表示 main() 函數的 goroutine,而 G2 表示 worker 的 goroutine。
// G1
func main()
{
...
for
_, task :=
range
tasks {
ch <- task
}
...
}
|
// G2
func worker(ch chan Task)
{
for
{
task :=<-ch
process(task)
}
}
那么 G1 中的 ch <- task0 具體是怎么做的呢?
- 獲取鎖
enqueue(task0) (這里是內存復制 task0)
- 釋放鎖
這一步很簡單,接下來看 G2 的 t := <- ch 是如何讀取數據的。
- 獲取鎖
t = dequeue() (同樣,這里也是內存復制)
- 釋放鎖
這一步也非常簡單。但是我們從這個操作中可以看到,所有 goroutine 中共享的部分只有這個 hchan 的結構體,而所有通訊的數據都是內存復制。這遵循了 Go 並發設計中很核心的一個理念:
“Do not communicate by sharing memory;instead, share memory by communicating.”
假設 G2 需要很長時間的處理,在此期間,G1 不斷的發送任務:
ch <- task1
ch <- task2
ch <- task3
但是當再一次 ch <- task4 的時候,由於 ch 的緩沖只有 3 個,所以沒有地方放了,於是 G1 被 block 了,當有人從隊列中取走一個 Task 的時候,G1 才會被恢復。這是我們都知道的,不過我們今天關心的不是發生了什么,而是如何做到的?
首先,goroutine 不是操作系統線程,而是用戶空間線程。因此 goroutine 是由 Go runtime 來創建並管理的,而不是 OS,所以要比操作系統線程輕量級。
當然,goroutine 最終還是要運行於某個線程中的,控制 goroutine 如何運行於線程中的是 Go runtime 中的 scheduler (調度器)。
Go 的運行時調度器是 M:N 調度模型,既 N 個 goroutine,會運行於 M 個 OS 線程中。換句話說,一個 OS 線程中,可能會運行多個 goroutine。
Go 的 M:N 調度中使用了3個結構:
M : OS 線程
G : goroutine
P : 調度上下文
P 擁有一個運行隊列,里面是所有可以運行的 goroutine 及其上下文
要想運行一個 goroutine - G ,那么一個線程 M ,就必須持有一個該 goroutine 的上下文 P 。
那么當 ch <- task4 執行的時候,channel 中已經滿了,需要pause G1 。這個時候,:
G1 會調用運行時的 gopark ,
- 然后 Go 的運行時調度器就會接管
- 將
G1 的狀態設置為 waiting
- 斷開
G1 和 M 之間的關系(switch out),因此 G1 脫離 M ,換句話說,M 空閑了,可以安排別的任務了。
- 從
P 的運行隊列中,取得一個可運行的 goroutine G
- 建立新的
G 和 M 的關系(Switch in),因此 G 就准備好運行了。
- 當調度器返回的時候,新的
G 就開始運行了,而 G1 則不會運行,也就是 block 了。
從上面的流程中可以看到,對於 goroutine 來說,G1 被阻塞了,新的 G 開始運行了;而對於操作系統線程 M 來說,則根本沒有被阻塞。
我們知道 OS 線程要比 goroutine 要沉重的多,因此這里盡量避免 OS 線程阻塞,可以提高性能。
前面理解了阻塞,那么接下來理解一下如何恢復運行。不過,在繼續了解如何恢復之前,我們需要先進一步理解 hchan 這個結構。因為,當 channel 不在滿的時候,調度器是如何知道該讓哪個 goroutine 繼續運行呢?而且 goroutine 又是如何知道該從哪取數據呢?
在 hchan 中,除了之前提到的內容外,還定義有 sendq 和 recvq 兩個隊列,分別表示等待發送、接收的 goroutine,及其相關信息。
type
hchan
struct
{
...
buf unsafe.Pointer
// 指向一個環形隊列
...
sendq waitq
// 等待發送的隊列
recvq waitq
// 等待接收的隊列
...
lock mutex
// 互斥量
}
|
其中 waitq 是一個鏈表結構的隊列,每個元素是一個 sudog 的結構,其定義大致為:
type
sudog
struct
{
g *g
// 正在等候的 goroutine
elem unsafe.Pointer
// 指向需要接收、發送的元素
...
}
|
https://golang.org/src/runtime/runtime2.go?h=sudog#L270
所以在之前的阻塞 G1 的過程中,實際上:
G1 會給自己創建一個 sudog 的變量
- 然后追加到
sendq 的等候隊列中,方便將來的 receiver 來使用這些信息恢復 G1 。
這些都是發生在調用調度器之前。
那么現在開始看一下如何恢復。
當 G2 調用 t := <- ch 的時候,channel 的狀態是,緩沖是滿的,而且還有一個 G1 在等候發送隊列里,然后 G2 執行下面的操作:
G2 先執行 dequeue() 從緩沖隊列中取得 task1 給 t
G2 從 sendq 中彈出一個等候發送的 sudog
- 將彈出的
sudog 中的 elem 的值 enqueue() 到 buf 中
- 將彈出的
sudog 中的 goroutine,也就是 G1 ,狀態從 waiting 改為 runnable
- 然后,
G2 需要通知調度器 G1 已經可以進行調度了,因此調用 goready(G1) 。
- 調度器將
G1 的狀態改為 runnable
- 調度器將
G1 壓入 P 的運行隊列,因此在將來的某個時刻調度的時候,G1 就會開始恢復運行。
- 返回到 G2
注意,這里是由 G2 來負責將 G1 的 elem 壓入 buf 的,這是一個優化。這樣將來 G1 恢復運行后,就不必再次獲取鎖、enqueue() 、釋放鎖了。這樣就避免了多次鎖的開銷。
更酷的地方是接收方先阻塞的流程。
如果 G2 先執行了 t := <- ch ,此時 buf 是空的,因此 G2 會被阻塞,他的流程是這樣:
G2 給自己創建一個 sudog 結構變量。其中 g 是自己,也就是 G2 ,而 elem 則指向 t
- 將這個
sudog 變量壓入 recvq 等候接收隊列
G2 需要告訴 goroutine,自己需要 pause 了,於是調用 gopark(G2)
- 和之前一樣,調度器將其
G2 的狀態改為 waiting
- 斷開
G2 和 M 的關系
- 從
P 的運行隊列中取出一個 goroutine
- 建立新的 goroutine 和
M 的關系
- 返回,開始繼續運行新的
goroutine
這些應該已經不陌生了,那么當 G1 開始發送數據的時候,流程是什么樣子的呢?
G1 可以將 enqueue(task) ,然后調用 goready(G2) 。不過,我們可以更聰明一些。
我們根據 hchan 結構的狀態,已經知道 task 進入 buf 后,G2 恢復運行后,會讀取其值,復制到 t 中。那么 G1 可以根本不走 buf ,G1 可以直接把數據給 G2 。
Goroutine 通常都有自己的棧,互相之間不會訪問對方的棧內數據,除了 channel。這里,由於我們已經知道了 t 的地址(通過 elem 指針),而且由於 G2 不在運行,所以我們可以很安全的直接賦值。當 G2 恢復運行的時候,既不需要再次獲取鎖,也不需要對 buf 進行操作。從而節約了內存復制、以及鎖操作的開銷。
goroutine-safe
存儲、傳遞值,FIFO
導致 goroutine 的阻塞和恢復
hchan 中的 sendq 和recvq ,也就是 sudog 結構的鏈表隊列
- 調用運行時調度器 (
gopark() , goready() )
無緩沖的 channel 行為就和前面說的直接發送的例子一樣:
- 接收方阻塞 → 發送方直接寫入接收方的棧
- 發送方阻塞 → 接受法直接從發送方的
sudog 中讀取
https://golang.org/src/runtime/select.go
- 先把所有需要操作的 channel 上鎖
- 給自己創建一個
sudog ,然后添加到所有 channel 的 sendq 或recvq (取決於是發送還是接收)
- 把所有的 channel 解鎖,然后 pause 當前調用
select 的 goroutine(gopark() )
- 然后當有任意一個 channel 可用時,
select 的這個 goroutine 就會被調度執行。
- resuming mirrors the pause sequence
更傾向於帶鎖的隊列,而不是無鎖的實現。
“性能提升不是憑空而來的,是隨着復雜度增加而增加的。” - dvyokov
后者雖然性能可能會更好,但是這個優勢,並不一定能夠戰勝隨之而來的實現代碼的復雜度所帶來的劣勢。
- 調用 Go 運行時調度器,這樣可以保持 OS 線程不被阻塞
跨 goroutine 的棧讀、寫。
- 可以讓 goroutine 醒來后不必獲取鎖
- 可以避免一些內存復制
當然,任何優勢都會有其代價。這里的代價是實現的復雜度,所以這里有更復雜的內存管理機制、垃圾回收以及棧收縮機制。
在這里性能的提高優勢,要比復雜度的提高帶來的劣勢要大。
所以在 channel 實現的各種代碼中,我們都可以見到這種 simplicity vs performance 的權衡后的結果。
|
|