channel的關閉的經典場景


關閉原則

一個常用的使用Go通道的原則是不要在數據接收方或者在有多個發送者的情況下關閉通
道。

通用的原則是不要關閉已關閉的通道

錯誤關閉

	ch:=make(chan int,1)
	go func(ch chan int) {
		ch<-1
		log.Println("OK1")
		ch<-2 //阻塞1秒
		log.Println("OK2")
		ch<-3 //報錯 因為下面把管道關閉了
	}(ch)

	go func(ch chan int) {
		log.Println("進入讀")
		time.Sleep(time.Second)
		log.Println(<-ch)
		close(ch)
	}(ch)
	time.Sleep(time.Hour)

優雅關閉

do once代表只能執行一次,防止重復關閉

type MyChannel struct {
	C chan int
	once sync.Once
}

func (myChannel MyChannel) SafeClose(){
	myChannel.once.Do(func() {
		close(myChannel.C)
	}) //once代表只執行一次
}

func main() {
	write:=MyChannel{C:make(chan int)}
	go func() {write.C<-1;write.SafeClose()}()
	go func() {log.Println(<-write.C)}()

	time.Sleep(time.Hour)
}



關閉情況

1、多個接收者,一個發送者

這里只有一個發送者,當value=0時,這個發送者關閉管道,這里只有一個寫進程,所以不用擔心重復關閉問題,寫進程可以直接關閉管道

這時,所有的接受者for-range會全部結束

這里的結束信號由寫者發出

func Write(dataCh chan int){
	for{
		if value:=rand.Intn(1000);value!=0{
			time.Sleep(time.Microsecond*500)
			dataCh<-value
		}else{
			close(dataCh)
		}
	}
}
const NumReceivers  =100
var wg=sync.WaitGroup{}

//多個接收者
func Readers(dataCh <-chan int){
	for i:=0;i<NumReceivers;i++{
		go func() {
			defer wg.Done()
			for value:=range dataCh{
				log.Println(value)
			}
		}()
	}
}

func main() {
	rand.Seed(time.Now().UnixNano())
	wg.Add(NumReceivers)


	dataCh:=make(chan int)

	go Write(dataCh)
	Readers(dataCh)

	wg.Wait()


}

2、【變種】多個接收者,一個發送者,但是關閉信號是由第三方發送

這里用了兩個關閉信號

closing:用來通知寫攜程可以停止寫了

closed:用來表示closing信號已經由一個第三方發出過了,后面的第三方可以不用再發了

package main

import (
	"log"
	"math/rand"
	"sync"
	"time"
)

func main() {
	rand.Seed(time.Now().UnixNano())

	const NumReceivers  = 100
	const NumThirdParties  = 100 //第三方

	wg4:=sync.WaitGroup{}
	wg4.Add(NumReceivers)
	dataCh2:=make(chan int)
	closing:=make(chan bool) //通知寫進程 關閉的信號
	closed:=make(chan bool) //通道已經關閉的 信號

	stop:= func() {
		select {
		case closing<-true: //通知關閉
			<-closed   //有一個寫進程收到通知 則結束
		case <-closed: //如果已經關閉 則執行這一句  所以多次調用stop是安全非阻塞的
		}
	}

	//多個第三方攜程
	for i:=0;i<NumThirdParties;i++{
		go func() {
			time.Sleep(time.Second*time.Duration(rand.Intn(100)))
			stop()
		}()
	}

	//發送者 1
	go func() {

		defer func() {
			close(closed) //通知第三方 已經關閉了
			close(dataCh2) //關閉數據通道
		}()

		for{

			select {
			case <-closing:
				return
			default:
			}

			select {
			case <-closing: //收到第三方的關閉請求
				return
			case dataCh2<-rand.Intn(100000):
			}
		}
	}()


	for i:=0;i<NumReceivers;i++{
		go func() {

			defer wg4.Done()

			for value:=range dataCh2{
				log.Println(value)
			}

		}()
	}

	wg4.Wait()
}

3、多個發送者,一個接收者

這里有10000個發送者,為了遵循 管道不能被讀者關閉的原則,於是這里又建立了一個stopCh非緩沖管道,目的只是為了通知多個發送者,可以不用發送了,發送者先檢查有沒有關閉信號,然后再發送。這里用了兩次select,來檢查 。

同時注意,為了防止重復關閉管道,這里沒有讓寫進程關閉dataCh,當所有讀寫進程都不再引用dataCh的時候,就會被gc

這里的關閉信號由讀者發出

const NumSenders  = 10000
var wg2  = sync.WaitGroup{}
var dataCh = make(chan int)
var stopCh = make(chan bool)

func Sender(){
	for i:=0;i<NumSenders;i++{

		go func() {

			select {
			case <-stopCh: //通道關閉后直接執行這個
				return
			default:
			}

			select {
			case <-stopCh:  //當通道被關閉 則直接執行這個
				return  //直接返回
			case dataCh<-rand.Intn(1000):
			}



		}()
	}
}


func Receive(){
	defer wg2.Done()
	for value:=range dataCh{
		log.Println(value)
		if value==0{
			close(stopCh)
			return
		}
	}
}

func main() {
	wg2.Add(1)
	rand.Seed(time.Now().UnixNano())
	go Sender()
	go Receive()

	wg2.Wait()
}

4、多個接收者和多個發送者

這種情況比較復雜,必須叫一個中間人來通知雙方,當讀寫進程需要關閉,通過toStop管道通知中間人,中間人收到通知然后關閉stopCh2管道,這個管道是讓讀寫進程結束讀寫的信號

toStop的容量必須設置成>=1 如果設置成非緩沖管道,那么如果中間人沒有准備好,要發出結束信號時會阻塞,就會轉到default,從而讓結束信號丟失。

這里的結束信號讀寫進程都可以發出

var NumSends = 100
var NumRecives = 50
var wg3 = sync.WaitGroup{}

func main() {
	rand.Seed(time.Now().UnixNano())
	wg3.Add(NumRecives)
	dataCh2 := make(chan int)
	stopCh2 := make(chan bool)
	toStop := make(chan bool, 1) //通知中間者可以關閉了 至少為1

	//中間人
	go func() {
		_ = <-toStop
		close(stopCh2)
	}()

	//發送者
	for i := 0; i < NumSends; i++ {
		go func() {
			for {
				v := rand.Intn(1000)
				if v == 0 { //寫進程通知關閉
					select {
					case toStop <- true:
						dataCh2 <- v
						log.Println("===========================", 0, "-")
					default:
					}
					return
				}
				//嘗試關閉
				select {
				case <-stopCh2:
					return
				default:
				}

				select {
				case <-stopCh2:
					return
				case dataCh2 <- v:
					log.Println(v)
				}

			}
		}()
	}
    
//接收者
	for i := 0; i < NumRecives; i++ {
		go func() {
			defer wg3.Done()
			for {
				select {
				case <-stopCh2:
					return
				default:
				}

				select {
				case <-stopCh2:
					return
				case val := <-dataCh2:
					log.Println(val)
					if val == 888 { //讀進程通知關閉
						select {
						case toStop <- true:
							log.Println("==========================", 888, "-")
						default:
						}
						return
					}
				}

			}
		}()
	}

	wg3.Wait()

}


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM