有無緩存的區別
無緩存並不等價於緩存為1
func main(){
ch := make(chan int)
ch <- 1
}
這句話會報錯,當向無緩存的chan放數據時,如果一直沒有接收者,那么它會一直堵塞,直到有接收者。
無緩沖的 就是一個送信人去你家門口送信,你不在家他不走,你一定要接下信,他才會走,無緩沖保證信能到你手上。有緩沖的 就是一個送信人去你家仍到你家的信箱轉身就走,除非你的信箱滿了 他必須等信箱空下來。有緩沖的 保證 信能進你家的郵箱
關閉channel
參考,這篇文章已經總結的很好
The Channel Closing Principle(channel 關閉原則):
在使用Go channel的時候,一個適用的原則是不要從接收端關閉channel,也不要關閉有多個並發發送者的channel。換句話說,如果sender(發送者)只是唯一的sender或者是channel最后一個活躍的sender,那么你應該在sender的goroutine關閉channel,從而通知receiver(s)(接收者們)已經沒有值可以讀了。維持這條原則將保證永遠不會發生向一個已經關閉的channel發送值或者關閉一個已經關閉的channel。
優雅方案
- M個receivers,一個sender
sender通過關閉data channel說“不再發送”,這是最簡單的場景了,就只是當sender不想再發送的時候讓sender關閉data 來關閉channel:
package main
import (
"time"
"math/rand"
"sync"
"log"
)
func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)
// ...
const MaxRandomNumber = 100000
const NumReceivers = 100
wgReceivers := sync.WaitGroup{}
wgReceivers.Add(NumReceivers)
// ...
dataCh := make(chan int, 100)
// the sender
go func() {
for {
if value := rand.Intn(MaxRandomNumber); value == 0 {
// the only sender can close the channel safely.
close(dataCh)
return
} else {
dataCh <- value
}
}
}()
// receivers
for i := 0; i < NumReceivers; i++ {
go func() {
defer wgReceivers.Done()
// receive values until dataCh is closed and
// the value buffer queue of dataCh is empty.
for value := range dataCh {
log.Println(value)
}
}()
}
wgReceivers.Wait()
}
這個例子好理解,sender發送消息,並當符合條件時關閉dataCh。reciver中,用for range接收消息直到dataCh關閉,每個協程輸出完消息后用WaitGroup確認下Done。
注意這里reciver循環時,dataCh沒有關閉,是邊接收后來關閉了。sender有個return因為他是無限循環,然而reciver沒有return因為它可以自己退出。 <-Ch可以不關閉,里面有數據時。
- 一個receiver,N個sender
receiver通過關閉一個額外的signal channel說“請停止發送”這種場景比上一個要復雜一點。我們不能讓receiver關閉data channel,因為這么做將會打破channel closing principle。但是我們可以讓receiver關閉一個額外的signal channel來通知sender停止發送值:
package main
import (
"time"
"math/rand"
"sync"
"log"
)
func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)
// ...
const MaxRandomNumber = 100000
const NumSenders = 1000
wgReceivers := sync.WaitGroup{}
wgReceivers.Add(1)
// ...
dataCh := make(chan int, 100)
stopCh := make(chan struct{})
// stopCh is an additional signal channel.
// Its sender is the receiver of channel dataCh.
// Its reveivers are the senders of channel dataCh.
// senders
for i := 0; i < NumSenders; i++ {
go func() {
for {
value := rand.Intn(MaxRandomNumber)
select {
case <- stopCh:
return
case dataCh <- value:
}
}
}()
}
// the receiver
go func() {
defer wgReceivers.Done()
for value := range dataCh {
if value == MaxRandomNumber-1 {
// the receiver of the dataCh channel is
// also the sender of the stopCh cahnnel.
// It is safe to close the stop channel here.
close(stopCh)
return
}
log.Println(value)
}
}()
// ...
wgReceivers.Wait()
}
注意這個例子沒有關閉dataCh,sender里的無線循環時刻盯着stopCh,一旦它能返回來值就退出。reciver里,滿足退出條件后,關閉stopCh,因為只有關閉了channel在(sender)無限循環里,stopCh才能返回值,正常情況下一個Channel沒有關閉,無限循環會返回deadlock。
還要注意這里的reciver,關閉stopCh后直接return了,這樣就還沒有取到dataCh最后的值,也就不用關閉dataCh,但我覺得一般情況下需要在sender里面關閉。
還需注意,這兩個例子的WaitGroup的Done都放在reciver里面,因為wg是為了保證程序運行結束,結束只有當reciver接收完才結束,而不是sender發送完結束,所以放在reciver里面判斷waitgroup是否Done一個。
正如注釋說的,對於額外的signal channel來說,它的sender是data channel的receiver。這個額外的signal channel被它唯一的sender關閉,遵守了channel closing principle。
判斷sender的標准就是能否有權利關閉channel
- M個receiver,N個sender
它們當中任意一個通過通知一個moderator(仲裁者)關閉額外的signal channel來說“讓我們結束游戲吧”,這是最復雜的場景了。
我們不能讓任意的receivers和senders關閉data channel,也不能讓任何一個receivers通過關閉一個額外的signal channel來通知所有的senders和receivers退出游戲。這么做的話會打破channel closing principle。但是,我們可以引入一個moderator來關閉一個額外的signal channel。這個例子的一個技巧是怎么通知moderator去關閉額外的signal channel:
package main
import (
"time"
"math/rand"
"sync"
"log"
"strconv"
)
func main() {
rand.Seed(time.Now().UnixNano())
log.SetFlags(0)
// ...
const MaxRandomNumber = 100000
const NumReceivers = 10
const NumSenders = 1000
wgReceivers := sync.WaitGroup{}
wgReceivers.Add(NumReceivers)
// ...
dataCh := make(chan int, 100)
stopCh := make(chan struct{})
// stopCh is an additional signal channel.
// Its sender is the moderator goroutine shown below.
// Its reveivers are all senders and receivers of dataCh.
toStop := make(chan string, 1)
// the channel toStop is used to notify the moderator
// to close the additional signal channel (stopCh).
// Its senders are any senders and receivers of dataCh.
// Its reveiver is the moderator goroutine shown below.
var stoppedBy string
// moderator
go func() {
stoppedBy = <- toStop // part of the trick used to notify the moderator
// to close the additional signal channel.
close(stopCh)
}()
// senders
for i := 0; i < NumSenders; i++ {
go func(id string) {
for {
value := rand.Intn(MaxRandomNumber)
if value == 0 {
// here, a trick is used to notify the moderator
// to close the additional signal channel.
select {
case toStop <- "sender#" + id:
default:
}
return
}
// the first select here is to try to exit the
// goroutine as early as possible.
select {
case <- stopCh:
return
default:
}
select {
case <- stopCh:
return
case dataCh <- value:
}
}
}(strconv.Itoa(i))
}
// receivers
for i := 0; i < NumReceivers; i++ {
go func(id string) {
defer wgReceivers.Done()
for {
// same as senders, the first select here is to
// try to exit the goroutine as early as possible.
select {
case <- stopCh:
return
default:
}
select {
case <- stopCh:
return
case value := <-dataCh:
if value == MaxRandomNumber-1 {
// the same trick is used to notify the moderator
// to close the additional signal channel.
select {
case toStop <- "receiver#" + id:
default:
}
return
}
log.Println(value)
}
}
}(strconv.Itoa(i))
}
// ...
wgReceivers.Wait()
log.Println("stopped by", stoppedBy)
}
在這個例子中,仍然遵守着channel closing principle。
請注意channel toStop的緩沖大小是1.這是為了避免當mederator goroutine 准備好之前第一個通知就已經發送了,導致丟失。
if value == 0 {
// here, a trick is used to notify the moderator
// to close the additional signal channel.
select {
case toStop <- "sender#" + id:
default:
}
return
}
這里用select的目的是:
toStop的buffer只有1,如果多個同時發送給toStop的話,會導致阻塞在 toStop <- id,所以使用了select,這樣子當不能發送的時候就知道已經有其他goroutine發送了信號了。其實也可以將toStop的buffer大小改成接收者和發送者數量之和,這樣子就可以直接發送了。
// the first select here is to try to exit the
// goroutine as early as possible.
select {
case <- stopCh:
return
default:
}
select {
case <- stopCh:
return
case dataCh <- value:
}
這里寫兩次的目的:
為了提前知道channel是否已經關閉了,如果省略了這個select,有可能計算關閉了channel,也會執行發送操作,因為在一個select里面,是隨機選擇一個能執行的case來執行的
經驗
不是所有channel都需要關閉的, 因為它完全遵循GC回收規則. 但是如果用channel來通知其他協程停止工作的話, 就需要用到關閉了. 典型的例子就是其他協程使用for xxx := range channel 這樣的語句時, 如果不關閉channel的話, 這些代碼會一直堵住