channel 功能解析


在golang中,channel是一個比較核心的功能,作為goroutine之間通信的一種方式,channel和linux系統中管道/消息隊列比較類似。但不同之處在於,channel在golang中,提倡基於通信實現內存共享(linux是基於內存實現通信),重點是如何利用channel節省內存,共享消息。

channel的定義

channelType = ("chan" | "chan" "<-" | "<-" "chan") ElementType
chan T          // 可以接受和發送類型為 T 的數據
chan<- float64  // 只可以用來發送float64類型的數據
<- chan int     // 只可以用來接受int類型的數據

與slice、map類型,可以使用make關鍵字來初始化channel

ch1 := make(chan int)       // 定義無緩沖的整形通道
ch2 := make(chan string,10)    // 定義有緩沖的字符串通道

有緩沖的channel可以部分避免阻塞讀取操作

容量(capacity)代表Channel容納的最多的元素的數量,代表Channel的緩存的大小。
如果沒有設置容量,或者容量設置為0, 說明Channel沒有緩存,只有sender和receiver都准備好了后它們的通訊(communication)才會發生(Blocking)。如果設置了緩存,就有可能不發生阻塞, 只有buffer滿了后 send才會阻塞, 而只有緩存空了后receive才會阻塞。一個nil channel不會通信。

可以通過內建的close方法可以關閉Channel。

你可以在多個goroutine從/往 一個channel 中 receive/send 數據, 不必考慮額外的同步措施。

Channel可以作為一個先入先出(FIFO)的隊列,接收的數據和發送的數據的順序是一致的。

channel的功能

channel的通信作用

package main

import (
	"fmt"
	"time"
)

func main() {
	resCh := make(chan string)
	go func() {
		for i := 0; i < 5; i++ {
			time.Sleep(1 * time.Second)
			resCh <- fmt.Sprintf("執行任務完成: %d", i)
		}
		// close(resCh)  // 1.避免channel讀寫進入deadlock,子goroutine寫入結束之后,主動close
	}()

	for {
		i, ok := <-resCh
		if !ok {
			break
		}
		fmt.Println(i)
	}

	// for i := 0; i < 5; i++ {     // 2.避免channel讀寫進入deadlock,在已經數據寫入5次之后,通過for也只讀取5次
	// 	fmt.Println(<-resCh)
	// }
}

輸出結果:
執行任務結果: 0
執行任務結果: 1
執行任務結果: 2
執行任務結果: 3
執行任務結果: 4
fatal error: all goroutines are asleep - deadlock!

這里可以看到main線程(主goroutine協程),里面開啟了一個子協程,將執行結果寫入到channel中,並在main線程中讀出,以實現子協程的數據在main線程中展示。
注意,這里讀取完5個任務之后,報錯 fatal error: all goroutines are asleep - deadlock! 只是因為main線程期待不斷從管道中讀取數據,但數據必須是其他goroutine線程放進管道的,但是其他goroutine線程都已經執行完了,再也沒有數據寫入而main線程將等待不到新的數據讀出,因此報錯給系統目前進入了一個死鎖等待(deadlock)。解決辦法有兩種:
1.在其他goroutine中,沒有數據寫入之后,進行close操作
2.在已經數據寫入5次之后,通過for也只讀取5次

channel的同步作用(blocking)

默認情況下,發送和接收會一直阻塞着,直到另一方准備好。這種方式可以用來在gororutine中進行同步,而不必使用顯示的鎖或者條件變量。

import "fmt"
func sum(s []int, c chan int) {
    sum := 0
    for _, v := range s {
        sum += v
    }
    c <- sum // send sum to c
}
func main() {
    s := []int{7, 2, 8, -9, 4, 0}
    c := make(chan int)
    go sum(s[:len(s)/2], c)
    go sum(s[len(s)/2:], c)
    x, y := <-c, <-c // receive from c
    fmt.Println(x, y, x+y)
}

如官方的例子中x, y := <-c, <-c這句會一直等待計算結果發送到channel中。
樣例2:main goroutine通過done channel等待worker完成任務。 worker做完任務后只需往channel發送一個數據就可以通知main goroutine任務完成。

import (
    "fmt"
    "time"
)
func worker(done chan bool) {
    time.Sleep(time.Second)
    // 通知任務已完成
    done <- true
}
func main() {
    done := make(chan bool, 1)
    go worker(done)
    // 等待任務完成
    <-done
}

range

如上面樣例中:

	for {
		i, ok := <-resCh
		if !ok {
			break
		}
		fmt.Println(i)
	}

可以用range遍歷

	for res := range resCh {
		fmt.Println(res)
	}

range channel 產生的迭代值為channel中發送的值,會一直迭代到channel被關閉,也就是說如果close(resCh)代碼注釋,同樣程序會一直阻塞再for .... range 這里面,進入deadlock錯誤

select

select語句選擇一組可能的send操作和receive操作去處理。它類似switch,但是只是用來處理通訊(communication)操作。
它的case可以是send語句,也可以是receive語句,亦或者default。
receive語句可以將值賦值給一個或者兩個變量。它必須是一個receive操作。
最多允許有一個default case,它可以放在case列表的任何位置,盡管我們大部分會將它放在最后。

import "fmt"
func fibonacci(c, quit chan int) {
    x, y := 0, 1
    for {
        select {
        case c <- x:
            x, y = y, x+y
        case <-quit:
            fmt.Println("quit")
            return
        }
    }
}
func main() {
    c := make(chan int)
    quit := make(chan int)
    go func() {
        for i := 0; i < 10; i++ {
            fmt.Println(<-c)
        }
        quit <- 0
    }()
    fibonacci(c, quit)
}

如果有同時多個case去處理,比如同時有多個channel可以接收數據,那么Go會偽隨機的選擇一個case處理(pseudo-random)。如果沒有case需要處理,則會選擇default去處理,如果default case存在的情況下。如果沒有default case,則select語句會阻塞,直到某個case需要處理。
需要注意的是,nil channel上的操作會一直被阻塞,如果沒有default case,只有nil channel的select會一直被阻塞。
select語句和switch語句一樣,它不是循環,它只會選擇一個case來處理,如果想一直處理channel,你可以在外面加一個無限的for循環:

for {
    select {
    case c <- x:
        x, y = y, x+y
    case <-quit:
        fmt.Println("quit")
        return
    }
}

timeout

select有很重要的一個應用就是超時處理。 因為上面我們提到,如果沒有case需要處理,select語句就會一直阻塞着。這時候我們可能就需要一個超時操作,用來處理超時的情況。
下面這個例子我們會在2秒后往channel c1中發送一個數據,但是select設置為1秒超時,因此我們會打印出timeout 1,而不是result 1。

import "time"
import "fmt"
func main() {
    c1 := make(chan string, 1)
    go func() {
        time.Sleep(time.Second * 2)
        c1 <- "result 1"
    }()
    select {
    case res := <-c1:
        fmt.Println(res)
    case <-time.After(time.Second * 1):
        fmt.Println("timeout 1")
    }
}

其實它利用的是time.After方法,它返回一個類型為<-chan Time的單向的channel,在指定的時間發送一個當前時間給返回的channel中。

Timer和Ticker

我們看一下關於時間的兩個Channel。
timer是一個定時器,代表未來的一個單一事件,你可以告訴timer你要等待多長時間,它提供一個Channel,在將來的那個時間那個Channel提供了一個時間值。下面的例子中第二行會阻塞2秒鍾左右的時間,直到時間到了才會繼續執行。

timer1 := time.NewTimer(time.Second * 2)
<-timer1.C
fmt.Println("Timer 1 expired")

當然如果你只是想單純的等待的話,可以使用time.Sleep來實現。
你還可以使用timer.Stop來停止計時器。

timer2 := time.NewTimer(time.Second)
go func() {
    <-timer2.C
    fmt.Println("Timer 2 expired")
}()
stop2 := timer2.Stop()
if stop2 {
    fmt.Println("Timer 2 stopped")
}

ticker是一個定時觸發的計時器,它會以一個間隔(interval)往Channel發送一個事件(當前時間),而Channel的接收者可以以固定的時間間隔從Channel中讀取事件。下面的例子中ticker每500毫秒觸發一次,你可以觀察輸出的時間。

ticker := time.NewTicker(time.Millisecond * 500)
go func() {
    for t := range ticker.C {
        fmt.Println("Tick at", t)
    }
}()

類似timer, ticker也可以通過Stop方法來停止。一旦它停止,接收者不再會從channel中接收數據了。

close

內建的close方法可以用來關閉channel。

總結一下channel關閉后sender的receiver操作。
如果channel c已經被關閉,繼續往它發送數據會導致panic: send on closed channel:

import "time"
func main() {
    go func() {
        time.Sleep(time.Hour)
    }()
    c := make(chan int, 10)
    c <- 1
    c <- 2
    close(c)
    c <- 3
}

但是從這個關閉的channel中不但可以讀取出已發送的數據,還可以不斷的讀取零值:

c := make(chan int, 10)
c <- 1
c <- 2
close(c)
fmt.Println(<-c) //1
fmt.Println(<-c) //2
fmt.Println(<-c) //0
fmt.Println(<-c) //0

但是如果通過range讀取,channel關閉后for循環會跳出:

c := make(chan int, 10)
c <- 1
c <- 2
close(c)
for i := range c {
    fmt.Println(i)
}

通過i, ok := <-c可以查看Channel的狀態,判斷值是零值還是正常讀取的值。

c := make(chan int, 10)
close(c)
i, ok := <-c
fmt.Printf("%d, %t", i, ok) //0, false

來源:https://colobu.com/2016/04/14/Golang-Channels/


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM