在本文發表數日前,我曾寫了一篇文章來解釋通道的規則。 那篇文章在reddit和HN上獲得了很多點贊,但也有很多人對Go通道的細節設計提出了一些批評意見。
這些批評主要針對於通道設計中的下列細節:- 沒有一個簡單和通用的方法用來在不改變一個通道的狀態的情況下檢查這個通道是否已經關閉。
- 關閉一個已經關閉的通道將產生一個恐慌,所以在不知道一個通道是否已經關閉的時候關閉此通道是很危險的。
- 向一個已關閉的通道發送數據將產生一個恐慌,所以在不知道一個通道是否已經關閉的時候向此通道發送數據是很危險的。
這些批評看上去有幾分道理(實際上屬於對通道的不正確使用導致的偏見)。 是的,Go語言中並沒有提供一個內置函數來檢查一個通道是否已經關閉。
package main import "fmt" type T int func IsClosed(ch <-chan T) bool { select { case <-ch: return true default: } return false } func main() { c := make(chan T) fmt.Println(IsClosed(c)) // false close(c) fmt.Println(IsClosed(c)) // true }
如前所述,此方法並不是一個通用的檢查通道是否已經關閉的方法。
事實上,即使有一個內置closed
函數用來檢查一個通道是否已經關閉,它的有用性也是十分有限的。 原因是當此函數的一個調用的結果返回時,被查詢的通道的狀態可能已經又改變了,導致此調用結果並不能反映出被查詢的通道的最新狀態。 雖然我們可以根據一個調用closed(ch)
的返回結果為true
而得出我們不應該再向通道ch
發送數據的結論, 但是我們不能根據一個調用closed(ch)
的返回結果為false
而得出我們可以繼續向通道ch
發送數據的結論。
通道關閉原則
一個常用的使用Go通道的原則是不要在數據接收方或者在有多個發送者的情況下關閉通道。 換句話說,我們只應該讓一個通道唯一的發送者關閉此通道。
下面我們將稱此原則為通道關閉原則。
當然,這並不是一個通用的關閉通道的原則。通用的原則是不要關閉已關閉的通道。 如果我們能夠保證從某個時刻之后,再沒有協程將向一個未關閉的非nil通道發送數據,則一個協程可以安全地關閉此通道。 然而,做出這樣的保證常常需要很大的努力,從而導致代碼過度復雜。 另一方面,遵循通道關閉原則是一件相對簡單的事兒。
粗魯地關閉通道的方法
func SafeClose(ch chan T) (justClosed bool) { defer func() { if recover() != nil { // 一個函數的返回結果可以在defer調用中修改。 justClosed = false } }() // 假設ch != nil。 close(ch) // 如果ch已關閉,則產生一個恐慌。 return true // <=> justClosed = true; return }
此方法違反了通道關閉原則。
同樣的方法可以用來粗魯地向一個關閉狀態未知的通道發送數據。
func SafeSend(ch chan T, value T) (closed bool) { defer func() { if recover() != nil { closed = true } }() ch <- value // 如果ch已關閉,則產生一個恐慌。 return false // <=> closed = false; return }
這樣的粗魯方法不僅違反了通道關閉原則,而且Go白皮書和標准編譯器不保證它的實現中不存在數據競爭。
禮貌地關閉通道的方法
sync.Once
來關閉通道。
type MyChannel struct { C chan T once sync.Once } func NewMyChannel() *MyChannel { return &MyChannel{C: make(chan T)} } func (mc *MyChannel) SafeClose() { mc.once.Do(func() { close(mc.C) }) }
當然,我們也可以使用sync.Mutex
來防止多次關閉一個通道。
type MyChannel struct { C chan T closed bool mutex sync.Mutex } func NewMyChannel() *MyChannel { return &MyChannel{C: make(chan T)} } func (mc *MyChannel) SafeClose() { mc.mutex.Lock() defer mc.mutex.Unlock() if !mc.closed { close(mc.C) mc.closed = true } } func (mc *MyChannel) IsClosed() bool { mc.mutex.Lock() defer mc.mutex.Unlock() return mc.closed }
這些實現確實比上一節中的方法禮貌一些,但是它們不能完全有效地避免數據競爭。 目前的Go白皮書並不保證發生在一個通道上的並發關閉操作和發送操縱不會產生數據競爭。 如果一個SafeClose
函數和同一個通道上的發送操作同時運行,則數據競爭可能發生(雖然這樣的數據競爭一般並不會帶來什么危害)。
優雅地關閉通道的方法
上一節中介紹的SafeSend
函數有一個弊端,它的調用不能做為case
操作而被使用在select
代碼塊中。 另外,很多Go程序員(包括我)認為上面兩節展示的關閉通道的方法不是很優雅。 本節下面將介紹一些在各種情形下使用純通道操作來關閉通道的方法。
(為了演示程序的完整性,下面這些例子中使用到了sync.WaitGroup
。在實踐中,sync.WaitGroup
並不是必需的。)
情形一:M個接收者和一個發送者。發送者通過關閉用來傳輸數據的通道來傳遞發送結束信號
package main import ( "time" "math/rand" "sync" "log" ) func main() { rand.Seed(time.Now().UnixNano()) log.SetFlags(0) // ... const Max = 100000 const NumReceivers = 100 wgReceivers := sync.WaitGroup{} wgReceivers.Add(NumReceivers) // ... dataCh := make(chan int) // 發送者 go func() { for { if value := rand.Intn(Max); value == 0 { // 此唯一的發送者可以安全地關閉此數據通道。 close(dataCh) return } else { dataCh <- value } } }() // 接收者 for i := 0; i < NumReceivers; i++ { go func() { defer wgReceivers.Done() // 接收數據直到通道dataCh已關閉 // 並且dataCh的緩沖隊列已空。 for value := range dataCh { log.Println(value) } }() } wgReceivers.Wait() }
情形二:一個接收者和N個發送者,此唯一接收者通過關閉一個額外的信號通道來通知發送者不要在發送數據了
package main import ( "time" "math/rand" "sync" "log" ) func main() { rand.Seed(time.Now().UnixNano()) log.SetFlags(0) // ... const Max = 100000 const NumSenders = 1000 wgReceivers := sync.WaitGroup{} wgReceivers.Add(1) // ... dataCh := make(chan int) stopCh := make(chan struct{}) // stopCh是一個額外的信號通道。它的 // 發送者為dataCh數據通道的接收者。 // 它的接收者為dataCh數據通道的發送者。 // 發送者 for i := 0; i < NumSenders; i++ { go func() { for { // 這里的第一個嘗試接收用來讓此發送者 // 協程盡早地退出。對於這個特定的例子, // 此select代碼塊並非必需。 select { case <- stopCh: return default: } // 即使stopCh已經關閉,此第二個select // 代碼塊中的第一個分支仍很有可能在若干個 // 循環步內依然不會被選中。如果這是不可接受 // 的,則上面的第一個select代碼塊是必需的。 select { case <- stopCh: return case dataCh <- rand.Intn(Max): } } }() } // 接收者 go func() { defer wgReceivers.Done() for value := range dataCh { if value == Max-1 { // 此唯一的接收者同時也是stopCh通道的 // 唯一發送者。盡管它不能安全地關閉dataCh數 // 據通道,但它可以安全地關閉stopCh通道。 close(stopCh) return } log.Println(value) } }() // ... wgReceivers.Wait() }
如此例中的注釋所述,對於此額外的信號通道stopCh
,它只有一個發送者,即dataCh
數據通道的唯一接收者。 dataCh
數據通道的接收者關閉了信號通道stopCh
,這是不違反通道關閉原則的。
在此例中,數據通道dataCh
並沒有被關閉。是的,我們不必關閉它。 當一個通道不再被任何協程所使用后,它將逐漸被垃圾回收掉,無論它是否已經被關閉。 所以這里的優雅性體現在通過不關閉一個通道來停止使用此通道。
情形三:M個接收者和N個發送者。它們中的任何協程都可以讓一個中間調解協程幫忙發出停止數據傳送的信號
這是最復雜的一種情形。我們不能讓接收者和發送者中的任何一個關閉用來傳輸數據的通道,我們也不能讓多個接收者之一關閉一個額外的信號通道。 這兩種做法都違反了通道關閉原則。
然而,我們可以引入一個中間調解者角色並讓其關閉額外的信號通道來通知所有的接收者和發送者結束工作。 具體實現見下例。注意其中使用了一個嘗試發送操作來向中間調解者發送信號。
package main import ( "time" "math/rand" "sync" "log" "strconv" ) func main() { rand.Seed(time.Now().UnixNano()) log.SetFlags(0) // ... const Max = 100000 const NumReceivers = 10 const NumSenders = 1000 wgReceivers := sync.WaitGroup{} wgReceivers.Add(NumReceivers) // ... dataCh := make(chan int) stopCh := make(chan struct{}) // stopCh是一個額外的信號通道。它的發送 // 者為中間調解者。它的接收者為dataCh // 數據通道的所有的發送者和接收者。 toStop := make(chan string, 1) // toStop是一個用來通知中間調解者讓其 // 關閉信號通道stopCh的第二個信號通道。 // 此第二個信號通道的發送者為dataCh數據 // 通道的所有的發送者和接收者,它的接收者 // 為中間調解者。它必須為一個緩沖通道。 var stoppedBy string // 中間調解者 go func() { stoppedBy = <-toStop close(stopCh) }() // 發送者 for i := 0; i < NumSenders; i++ { go func(id string) { for { value := rand.Intn(Max) if value == 0 { // 為了防止阻塞,這里使用了一個嘗試 // 發送操作來向中間調解者發送信號。 select { case toStop <- "發送者#" + id: default: } return } // 此處的嘗試接收操作是為了讓此發送協程盡早 // 退出。標准編譯器對嘗試接收和嘗試發送做了 // 特殊的優化,因而它們的速度很快。 select { case <- stopCh: return default: } // 即使stopCh已關閉,如果這個select代碼塊 // 中第二個分支的發送操作是非阻塞的,則第一個 // 分支仍很有可能在若干個循環步內依然不會被選 // 中。如果這是不可接受的,則上面的第一個嘗試 // 接收操作代碼塊是必需的。 select { case <- stopCh: return case dataCh <- value: } } }(strconv.Itoa(i)) } // 接收者 for i := 0; i < NumReceivers; i++ { go func(id string) { defer wgReceivers.Done() for { // 和發送者協程一樣,此處的嘗試接收操作是為了 // 讓此接收協程盡早退出。 select { case <- stopCh: return default: } // 即使stopCh已關閉,如果這個select代碼塊 // 中第二個分支的接收操作是非阻塞的,則第一個 // 分支仍很有可能在若干個循環步內依然不會被選 // 中。如果這是不可接受的,則上面嘗試接收操作 // 代碼塊是必需的。 select { case <- stopCh: return case value := <-dataCh: if value == Max-1 { // 為了防止阻塞,這里使用了一個嘗試 // 發送操作來向中間調解者發送信號。 select { case toStop <- "接收者#" + id: default: } return } log.Println(value) } } }(strconv.Itoa(i)) } // ... wgReceivers.Wait() log.Println("被" + stoppedBy + "終止了") }
在此例中,通道關閉原則依舊得到了遵守。
請注意,信號通道toStop
的容量必須至少為1。 如果它的容量為0,則在中間調解者還未准備好的情況下就已經有某個協程向toStop
發送信號時,此信號將被拋棄。
我們也可以不使用嘗試發送操作向中間調解者發送信號,但信號通道toStop
的容量必須至少為數據發送者和數據接收者的數量之和,以防止向其發送數據時(有一個極其微小的可能)導致某些發送者和接收者協程永久阻塞。
... toStop := make(chan string, NumReceivers + NumSenders) ... value := rand.Intn(Max) if value == 0 { toStop <- "sender#" + id return } ... if value == Max-1 { toStop <- "receiver#" + id return } ...
情形四:“M個接收者和一個發送者”情形的一個變種:用來傳輸數據的通道的關閉請求由第三方發出
dataCh
)的關閉請求需要由某個第三方協程發出。對於這種情形,我們可以使用一個額外的信號通道來通知唯一的發送者關閉數據通道(
dataCh
)。
package main import ( "time" "math/rand" "sync" "log" ) func main() { rand.Seed(time.Now().UnixNano()) log.SetFlags(0) // ... const Max = 100000 const NumReceivers = 100 const NumThirdParties = 15 wgReceivers := sync.WaitGroup{} wgReceivers.Add(NumReceivers) // ... dataCh := make(chan int) closing := make(chan struct{}) // 信號通道 closed := make(chan struct{}) // 此stop函數可以被安全地多次調用。 stop := func() { select { case closing<-struct{}{}: <-closed case <-closed: } } // 一些第三方協程 for i := 0; i < NumThirdParties; i++ { go func() { r := 1 + rand.Intn(3) time.Sleep(time.Duration(r) * time.Second) stop() }() } // 發送者 go func() { defer func() { close(closed) close(dataCh) }() for { select{ case <-closing: return default: } select{ case <-closing: return case dataCh <- rand.Intn(Max): } } }() // 接收者 for i := 0; i < NumReceivers; i++ { go func() { defer wgReceivers.Done() for value := range dataCh { log.Println(value) } }() } wgReceivers.Wait() }
情形五:“N個發送者”的一個變種:用來傳輸數據的通道必須被關閉以通知各個接收者數據發送已經結束了
dataCh
)。 但是有時候,數據通道(
dataCh
)必須被關閉以通知各個接收者數據發送已經結束。 對於這種“N個發送者”情形,我們可以使用一個中間通道將它們轉化為“一個發送者”情形,然后繼續使用上一節介紹的技巧來關閉此中間通道,從而避免了關閉原始的
dataCh
數據通道。
package main import ( "time" "math/rand" "sync" "log" "strconv" ) func main() { rand.Seed(time.Now().UnixNano()) log.SetFlags(0) // ... const Max = 1000000 const NumReceivers = 10 const NumSenders = 1000 const NumThirdParties = 15 wgReceivers := sync.WaitGroup{} wgReceivers.Add(NumReceivers) // ... dataCh := make(chan int) // 將被關閉 middleCh := make(chan int) // 不會被關閉 closing := make(chan string) closed := make(chan struct{}) var stoppedBy string stop := func(by string) { select { case closing <- by: <-closed case <-closed: } } // 中間層 go func() { exit := func(v int, needSend bool) { close(closed) if needSend { dataCh <- v } close(dataCh) } for { select { case stoppedBy = <-closing: exit(0, false) return case v := <- middleCh: select { case stoppedBy = <-closing: exit(v, true) return case dataCh <- v: } } } }() // 一些第三方協程 for i := 0; i < NumThirdParties; i++ { go func(id string) { r := 1 + rand.Intn(3) time.Sleep(time.Duration(r) * time.Second) stop("3rd-party#" + id) }(strconv.Itoa(i)) } // 發送者 for i := 0; i < NumSenders; i++ { go func(id string) { for { value := rand.Intn(Max) if value == 0 { stop("sender#" + id) return } select { case <- closed: return default: } select { case <- closed: return case middleCh <- value: } } }(strconv.Itoa(i)) } // 接收者 for range [NumReceivers]struct{}{} { go func() { defer wgReceivers.Done() for value := range dataCh { log.Println(value) } }() } // ... wgReceivers.Wait() log.Println("stopped by", stoppedBy) }