golang--Channel有無緩存區別,以及關閉原則


有無緩存的區別

無緩存並不等價於緩存為1

func main(){
	ch := make(chan int)
	ch <- 1
}

這句話會報錯,當向無緩存的chan放數據時,如果一直沒有接收者,那么它會一直堵塞,直到有接收者。

無緩沖的 就是一個送信人去你家門口送信,你不在家他不走,你一定要接下信,他才會走,無緩沖保證信能到你手上。有緩沖的 就是一個送信人去你家仍到你家的信箱轉身就走,除非你的信箱滿了 他必須等信箱空下來。有緩沖的 保證 信能進你家的郵箱

關閉channel

參考,這篇文章已經總結的很好

The Channel Closing Principle(channel 關閉原則):

在使用Go channel的時候,一個適用的原則是不要從接收端關閉channel,也不要關閉有多個並發發送者的channel。換句話說,如果sender(發送者)只是唯一的sender或者是channel最后一個活躍的sender,那么你應該在sender的goroutine關閉channel,從而通知receiver(s)(接收者們)已經沒有值可以讀了。維持這條原則將保證永遠不會發生向一個已經關閉的channel發送值或者關閉一個已經關閉的channel。

優雅方案

  • M個receivers,一個sender

sender通過關閉data channel說“不再發送”,這是最簡單的場景了,就只是當sender不想再發送的時候讓sender關閉data 來關閉channel:

package main

import (
    "time"
    "math/rand"
    "sync"
    "log"
)

func main() {
    rand.Seed(time.Now().UnixNano())
    log.SetFlags(0)
    
    // ...
    const MaxRandomNumber = 100000
    const NumReceivers = 100
    
    wgReceivers := sync.WaitGroup{}
    wgReceivers.Add(NumReceivers)
    
    // ...
    dataCh := make(chan int, 100)
    
    // the sender
    go func() {
        for {
            if value := rand.Intn(MaxRandomNumber); value == 0 {
                // the only sender can close the channel safely.
                close(dataCh)
                return
            } else {            
                dataCh <- value
            }
        }
    }()
    
    // receivers
    for i := 0; i < NumReceivers; i++ {
        go func() {
            defer wgReceivers.Done()
            
            // receive values until dataCh is closed and
            // the value buffer queue of dataCh is empty.
            for value := range dataCh {
                log.Println(value)
            }
        }()
    }
    
    wgReceivers.Wait()
}

這個例子好理解,sender發送消息,並當符合條件時關閉dataCh。reciver中,用for range接收消息直到dataCh關閉,每個協程輸出完消息后用WaitGroup確認下Done。

注意這里reciver循環時,dataCh沒有關閉,是邊接收后來關閉了。sender有個return因為他是無限循環,然而reciver沒有return因為它可以自己退出。 <-Ch可以不關閉,里面有數據時。

  • 一個receiver,N個sender

receiver通過關閉一個額外的signal channel說“請停止發送”這種場景比上一個要復雜一點。我們不能讓receiver關閉data channel,因為這么做將會打破channel closing principle。但是我們可以讓receiver關閉一個額外的signal channel來通知sender停止發送值:

package main

import (
    "time"
    "math/rand"
    "sync"
    "log"
)

func main() {
    rand.Seed(time.Now().UnixNano())
    log.SetFlags(0)
    
    // ...
    const MaxRandomNumber = 100000
    const NumSenders = 1000
    
    wgReceivers := sync.WaitGroup{}
    wgReceivers.Add(1)
    
    // ...
    dataCh := make(chan int, 100)
    stopCh := make(chan struct{})
        // stopCh is an additional signal channel.
        // Its sender is the receiver of channel dataCh.
        // Its reveivers are the senders of channel dataCh.
    
    // senders
    for i := 0; i < NumSenders; i++ {
        go func() {
            for {
                value := rand.Intn(MaxRandomNumber)
                
                select {
                case <- stopCh:
                    return
                case dataCh <- value:
                }
            }
        }()
    }
    
    // the receiver
    go func() {
        defer wgReceivers.Done()
        
        for value := range dataCh {
            if value == MaxRandomNumber-1 {
                // the receiver of the dataCh channel is
                // also the sender of the stopCh cahnnel.
                // It is safe to close the stop channel here.
                close(stopCh)
                return
            }
            
            log.Println(value)
        }
    }()
    
    // ...
    wgReceivers.Wait()
}

注意這個例子沒有關閉dataCh,sender里的無線循環時刻盯着stopCh,一旦它能返回來值就退出。reciver里,滿足退出條件后,關閉stopCh,因為只有關閉了channel在(sender)無限循環里,stopCh才能返回值,正常情況下一個Channel沒有關閉,無限循環會返回deadlock。

還要注意這里的reciver,關閉stopCh后直接return了,這樣就還沒有取到dataCh最后的值,也就不用關閉dataCh,但我覺得一般情況下需要在sender里面關閉。

還需注意,這兩個例子的WaitGroup的Done都放在reciver里面,因為wg是為了保證程序運行結束,結束只有當reciver接收完才結束,而不是sender發送完結束,所以放在reciver里面判斷waitgroup是否Done一個。

正如注釋說的,對於額外的signal channel來說,它的sender是data channel的receiver。這個額外的signal channel被它唯一的sender關閉,遵守了channel closing principle。

判斷sender的標准就是能否有權利關閉channel

  • M個receiver,N個sender

它們當中任意一個通過通知一個moderator(仲裁者)關閉額外的signal channel來說“讓我們結束游戲吧”,這是最復雜的場景了。

我們不能讓任意的receivers和senders關閉data channel,也不能讓任何一個receivers通過關閉一個額外的signal channel來通知所有的senders和receivers退出游戲。這么做的話會打破channel closing principle。但是,我們可以引入一個moderator來關閉一個額外的signal channel。這個例子的一個技巧是怎么通知moderator去關閉額外的signal channel:

package main

import (
    "time"
    "math/rand"
    "sync"
    "log"
    "strconv"
)

func main() {
    rand.Seed(time.Now().UnixNano())
    log.SetFlags(0)
    
    // ...
    const MaxRandomNumber = 100000
    const NumReceivers = 10
    const NumSenders = 1000
    
    wgReceivers := sync.WaitGroup{}
    wgReceivers.Add(NumReceivers)
    
    // ...
    dataCh := make(chan int, 100)
    stopCh := make(chan struct{})
        // stopCh is an additional signal channel.
        // Its sender is the moderator goroutine shown below.
        // Its reveivers are all senders and receivers of dataCh.
    toStop := make(chan string, 1)
        // the channel toStop is used to notify the moderator
        // to close the additional signal channel (stopCh).
        // Its senders are any senders and receivers of dataCh.
        // Its reveiver is the moderator goroutine shown below.
    
    var stoppedBy string
    
    // moderator
    go func() {
        stoppedBy = <- toStop // part of the trick used to notify the moderator
                              // to close the additional signal channel.
        close(stopCh)
    }()
    
    // senders
    for i := 0; i < NumSenders; i++ {
        go func(id string) {
            for {
                value := rand.Intn(MaxRandomNumber)
                if value == 0 {
                    // here, a trick is used to notify the moderator
                    // to close the additional signal channel.
                    select {
                    case toStop <- "sender#" + id:
                    default:
                    }
                    return
                }
                
                // the first select here is to try to exit the
                // goroutine as early as possible.
                select {
                case <- stopCh:
                    return
                default:
                }
                
                select {
                case <- stopCh:
                    return
                case dataCh <- value:
                }
            }
        }(strconv.Itoa(i))
    }
    
    // receivers
    for i := 0; i < NumReceivers; i++ {
        go func(id string) {
            defer wgReceivers.Done()
            
            for {
                // same as senders, the first select here is to 
                // try to exit the goroutine as early as possible.
                select {
                case <- stopCh:
                    return
                default:
                }
                
                select {
                case <- stopCh:
                    return
                case value := <-dataCh:
                    if value == MaxRandomNumber-1 {
                        // the same trick is used to notify the moderator 
                        // to close the additional signal channel.
                        select {
                        case toStop <- "receiver#" + id:
                        default:
                        }
                        return
                    }
                    
                    log.Println(value)
                }
            }
        }(strconv.Itoa(i))
    }
    
    // ...
    wgReceivers.Wait()
    log.Println("stopped by", stoppedBy)
}

在這個例子中,仍然遵守着channel closing principle。

請注意channel toStop的緩沖大小是1.這是為了避免當mederator goroutine 准備好之前第一個通知就已經發送了,導致丟失。

if value == 0 {
// here, a trick is used to notify the moderator
// to close the additional signal channel.
select {
case toStop <- "sender#" + id:
default:
}
return
}

這里用select的目的是:

toStop的buffer只有1,如果多個同時發送給toStop的話,會導致阻塞在 toStop <- id,所以使用了select,這樣子當不能發送的時候就知道已經有其他goroutine發送了信號了。其實也可以將toStop的buffer大小改成接收者和發送者數量之和,這樣子就可以直接發送了。

// the first select here is to try to exit the
// goroutine as early as possible.
select {
case <- stopCh:
return
default:
}

select {
case <- stopCh:
return
case dataCh <- value:
}

這里寫兩次的目的:

為了提前知道channel是否已經關閉了,如果省略了這個select,有可能計算關閉了channel,也會執行發送操作,因為在一個select里面,是隨機選擇一個能執行的case來執行的

經驗

不是所有channel都需要關閉的, 因為它完全遵循GC回收規則. 但是如果用channel來通知其他協程停止工作的話, 就需要用到關閉了. 典型的例子就是其他協程使用for xxx := range channel 這樣的語句時, 如果不關閉channel的話, 這些代碼會一直堵住


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM