golang channel 使用總結


原文地址

不同於傳統的多線程並發模型使用共享內存來實現線程間通信的方式,golang 的哲學是通過 channel 進行協程(goroutine)之間的通信來實現數據共享:

Do not communicate by sharing memory; instead, share memory by communicating.

這種方式的優點是通過提供原子的通信原語,避免了競態情形(race condition)下復雜的鎖機制。
channel 可以看成一個 FIFO 隊列,對 FIFO 隊列的讀寫都是原子的操作,不需要加鎖。對 channel 的操作行為結果總結如下:

操作 nil channel closed channel not-closed non-nil channel
close panic panic 成功 close
ch <- 一直阻塞 panic 阻塞或成功寫入數據
<- ch 一直阻塞 讀取對應類型零值 阻塞或成功讀取數據

讀取一個已關閉的 channel 時,總是能讀取到對應類型的零值,為了和讀取非空未關閉 channel 的行為區別,可以使用兩個接收值:

// ok is false when ch is closed
v, ok := <-ch

golang 中大部分類型都是值類型(只有 slice / channel / map 是引用類型),讀/寫類型是值類型的 channel 時,如果元素 size 比較大時,應該使用指針代替,避免頻繁的內存拷貝開銷。

內部實現

如圖所示,在 channel 的內部實現中(具體定義在 $GOROOT/src/runtime/chan.go 里),維護了 3 個隊列:

  • 讀等待協程隊列 recvq,維護了阻塞在讀此 channel 的協程列表
  • 寫等待協程隊列 sendq,維護了阻塞在寫此 channel 的協程列表
  • 緩沖數據隊列 buf,用環形隊列實現,不帶緩沖的 channel 此隊列 size 則為 0

當協程嘗試從未關閉的 channel 中讀取數據時,內部的操作如下:

  1. 當 buf 非空時,此時 recvq 必為空,buf 彈出一個元素給讀協程,讀協程獲得數據后繼續執行,此時若 sendq 非空,則從 sendq 中彈出一個寫協程轉入 running 狀態,待寫數據入隊列 buf ,此時讀取操作 <- ch 未阻塞;
  2. 當 buf 為空但 sendq 非空時(不帶緩沖的 channel),則從 sendq 中彈出一個寫協程轉入 running 狀態,待寫數據直接傳遞給讀協程,讀協程繼續執行,此時讀取操作 <- ch 未阻塞;
  3. 當 buf 為空並且 sendq 也為空時,讀協程入隊列 recvq 並轉入 blocking 狀態,當后續有其他協程往 channel 寫數據時,讀協程才會重新轉入 running 狀態,此時讀取操作 <- ch 阻塞。

類似的,當協程嘗試往未關閉的 channel 中寫入數據時,內部的操作如下:

  1. 當隊列 recvq 非空時,此時隊列 buf 必為空,從 recvq 彈出一個讀協程接收待寫數據,此讀協程此時結束阻塞並轉入 running 狀態,寫協程繼續執行,此時寫入操作 ch <- 未阻塞;
  2. 當隊列 recvq 為空但 buf 未滿時,此時 sendq 必為空,寫協程的待寫數據入 buf 然后繼續執行,此時寫入操作 ch <- 未阻塞;
  3. 當隊列 recvq 為空並且 buf 為滿時,此時寫協程入隊列 sendq 並轉入 blokcing 狀態,當后續有其他協程從 channel 中讀數據時,寫協程才會重新轉入 running 狀態,此時寫入操作 ch <- 阻塞。

當關閉 non-nil channel 時,內部的操作如下:

  1. 當隊列 recvq 非空時,此時 buf 必為空,recvq 中的所有協程都將收到對應類型的零值然后結束阻塞狀態;
  2. 當隊列 sendq 非空時,此時 buf 必為滿,sendq 中的所有協程都會產生 panic ,在 buf 中數據仍然會保留直到被其他協程讀取。

使用場景

除了常規的用來在協程之間傳遞數據外,本節列出了一些特殊的使用 channel 的場景。

futures / promises

golang 雖然沒有直接提供 futrue / promise 模型的操作原語,但通過 goroutine 和 channel 可以實現類似的功能:

package main

import (
	"io/ioutil"
	"log"
	"net/http"
)

// RequestFuture, http request promise.
func RequestFuture(url string) <-chan []byte {
    c := make(chan []byte, 1)
    go func() {
        var body []byte
        defer func() {
            c <- body
        }()

        res, err := http.Get(url)
        if err != nil {
            return
        }
        defer res.Body.Close()

        body, _ = ioutil.ReadAll(res.Body)
    }()

    return c
}

func main() {
    future := RequestFuture("https://api.github.com/users/octocat/orgs")
    body := <-future
    log.Printf("reponse length: %d", len(body))
}

條件變量(condition variable)

類型於 POSIX 接口中線程通知其他線程某個事件發生的條件變量,channel 的特性也可以用來當成協程之間同步的條件變量。因為 channel 只是用來通知,所以 channel 中具體的數據類型和值並不重要,這種場景一般用 strct {} 作為 channel 的類型。

一對一通知

類似 pthread_cond_signal() 的功能,用來在一個協程中通知另個某一個協程事件發生:

package main

import (
	"fmt"
	"time"
)

func main() {
	ch := make(chan struct{})
	nums := make([]int, 100)

	go func() {
		time.Sleep(time.Second)
		for i := 0; i < len(nums); i++ {
			nums[i] = i
		}
		// send a finish signal
		ch <- struct{}{}
	}()

	// wait for finish signal
	<-ch
	fmt.Println(nums)
}
廣播通知

類似 pthread_cond_broadcast() 的功能。利用從已關閉的 channel 讀取數據時總是非阻塞的特性,可以實現在一個協程中向其他多個協程廣播某個事件發生的通知:

package main

import (
	"fmt"
	"time"
)

func main() {
	N := 10
	exit := make(chan struct{})
	done := make(chan struct{}, N)

	// start N worker goroutines
	for i := 0; i < N; i++ {
		go func(n int) {
			for {
				select {
				// wait for exit signal
				case <-exit:
					fmt.Printf("worker goroutine #%d exit\n", n)
					done <- struct{}{}
					return
				case <-time.After(time.Second):
					fmt.Printf("worker goroutine #%d is working...\n", n)
				}
			}
		}(i)
	}

	time.Sleep(3 * time.Second)
	// broadcast exit signal
	close(exit)
	// wait for all worker goroutines exit
	for i := 0; i < N; i++ {
		<-done
	}
	fmt.Println("main goroutine exit")
}

信號量

channel 的讀/寫相當於信號量的 P / V 操作,下面的示例程序中 channel 相當於信號量:

package main

import (
	"log"
	"math/rand"
	"time"
)

type Seat int
type Bar chan Seat

func (bar Bar) ServeConsumer(customerId int) {
	log.Print("-> consumer#", customerId, " enters the bar")
	seat := <-bar // need a seat to drink
	log.Print("consumer#", customerId, " drinks at seat#", seat)
	time.Sleep(time.Second * time.Duration(2+rand.Intn(6)))
	log.Print("<- consumer#", customerId, " frees seat#", seat)
	bar <- seat // free the seat and leave the bar
}

func main() {
	rand.Seed(time.Now().UnixNano())

	bar24x7 := make(Bar, 10) // the bar has 10 seats
	// Place seats in an bar.
	for seatId := 0; seatId < cap(bar24x7); seatId++ {
		bar24x7 <- Seat(seatId) // none of the sends will block
	}

	// a new consumer try to enter the bar for each second
	for customerId := 0; ; customerId++ {
		time.Sleep(time.Second)
		go bar24x7.ServeConsumer(customerId)
	}
}

互斥量

互斥量相當於二元信號里,所以 cap 為 1 的 channel 可以當成互斥量使用:

package main

import "fmt"

func main() {
	mutex := make(chan struct{}, 1) // the capacity must be one

	counter := 0
	increase := func() {
		mutex <- struct{}{} // lock
		counter++
		<-mutex // unlock
	}

	increase1000 := func(done chan<- struct{}) {
		for i := 0; i < 1000; i++ {
			increase()
		}
		done <- struct{}{}
	}

	done := make(chan struct{})
	go increase1000(done)
	go increase1000(done)
	<-done; <-done
	fmt.Println(counter) // 2000
}

關閉 channel

關閉不再需要使用的 channel 並不是必須的。跟其他資源比如打開的文件、socket 連接不一樣,這類資源使用完后不關閉后會造成句柄泄露,channel 使用完后不關閉也沒有關系,channel 沒有被任何協程用到后最終會被 GC 回收。關閉 channel 一般是用來通知其他協程某個任務已經完成了。golang 也沒有直接提供判斷 channel 是否已經關閉的接口,雖然可以用其他不太優雅的方式自己實現一個:

func isClosed(ch chan int) bool {
	select {
	case <-ch:
		return true
	default:
	}
	return false
}

不過實現一個這樣的接口也沒什么必要。因為就算通過 isClosed() 得到當前 channel 當前還未關閉,如果試圖往 channel 里寫數據,仍然可能會發生 panic ,因為在調用 isClosed() 后,其他協程可能已經把 channel 關閉了。
關閉 channel 時應該注意以下准則:

  • 不要在讀取端關閉 channel ,因為寫入端無法知道 channel 是否已經關閉,往已關閉的 channel 寫數據會 panic ;
  • 有多個寫入端時,不要再寫入端關閉 channle ,因為其他寫入端無法知道 channel 是否已經關閉,關閉已經關閉的 channel 會發生 panic ;
  • 如果只有一個寫入端,可以在這個寫入端放心關閉 channel 。

關閉 channel 粗暴一點的做法是隨意關閉,如果產生了 panic 就用 recover 避免進程掛掉。稍好一點的方案是使用標准庫的 sync 包來做關閉 channel 時的協程同步,不過使用起來也稍微復雜些。下面介紹一種優雅些的做法。

一寫多讀

這種場景下這個唯一的寫入端可以關閉 channel 用來通知讀取端所有數據都已經寫入完成了。讀取端只需要用 for range 把 channel 中數據遍歷完就可以了,當 channel 關閉時,for range 仍然會將 channel 緩沖中的數據全部遍歷完然后再退出循環:

package main

import (
	"fmt"
	"sync"
)

func main() {
	wg := &sync.WaitGroup{}
	ch := make(chan int, 100)

	send := func() {
		for i := 0; i < 100; i++ {
			ch <- i
		}
		// signal sending finish
		close(ch)
	}

	recv := func(id int) {
		defer wg.Done()
		for i := range ch {
			fmt.Printf("receiver #%d get %d\n", id, i)
		}
		fmt.Printf("receiver #%d exit\n", id)
	}

	wg.Add(3)
	go recv(0)
	go recv(1)
	go recv(2)
	send()

	wg.Wait()
}

多寫一讀

這種場景下雖然可以用 sync.Once 來解決多個寫入端重復關閉 channel 的問題,但更優雅的辦法設置一個額外的 channel ,由讀取端通過關閉來通知寫入端任務完成不要再繼續再寫入數據了:

package main

import (
	"fmt"
	"sync"
)

func main() {
	wg := &sync.WaitGroup{}
	ch := make(chan int, 100)
	done := make(chan struct{})

	send := func(id int) {
		defer wg.Done()
		for i := 0; ; i++ {
			select {
			case <-done:
				// get exit signal
				fmt.Printf("sender #%d exit\n", id)
				return
			case ch <- id*1000 + i:
			}
		}
	}

	recv := func() {
		count := 0
		for i := range ch {
			fmt.Printf("receiver get %d\n", i)
			count++
			if count >= 1000 {
				// signal recving finish
				close(done)
				return
			}
		}
	}

	wg.Add(3)
	go send(0)
	go send(1)
	go send(2)
	recv()

	wg.Wait()
}

多寫多讀

這種場景稍微復雜,和上面的例子一樣,也需要設置一個額外 channel 用來通知多個寫入端和讀取端。另外需要起一個額外的協程來通過關閉這個 channel 來廣播通知:

package main

import (
	"fmt"
	"sync"
	"time"
)

func main() {
	wg := &sync.WaitGroup{}
	ch := make(chan int, 100)
	done := make(chan struct{})

	send := func(id int) {
		defer wg.Done()
		for i := 0; ; i++ {
			select {
			case <-done:
				// get exit signal
				fmt.Printf("sender #%d exit\n", id)
				return
			case ch <- id*1000 + i:
			}
		}
	}

	recv := func(id int) {
		defer wg.Done()
		for {
			select {
			case <-done:
				// get exit signal
				fmt.Printf("receiver #%d exit\n", id)
				return
			case i := <-ch:
				fmt.Printf("receiver #%d get %d\n", id, i)
				time.Sleep(time.Millisecond)
			}
		}
	}

	wg.Add(6)
	go send(0)
	go send(1)
	go send(2)
	go recv(0)
	go recv(1)
	go recv(2)

	time.Sleep(time.Second)
	// signal finish
	close(done)
	// wait all sender and receiver exit
	wg.Wait()
}

總結

channle 作為 golang 最重要的特性,用起來還是比較爽的。傳統的 C 里要實現類型的功能的話,一般需要用到 socket 或者 FIFO 來實現,另外還要考慮數據包的完整性與並發沖突的問題,channel 則屏蔽了這些底層細節,使用者只需要考慮讀寫就可以了。 channel 是引用類型,了解一下 channel 底層的機制對更好的使用 channel 還是很用必要的。雖然操作原語簡單,但涉及到阻塞的問題,使用不當可能會造成死鎖或者無限制的協程創建最終導致進程掛掉。
channel 除在可以用來在協程之間通信外,其阻塞和喚醒協程的特性也可以用作協程之間的同步機制,文中也用示例簡單介紹了這種場景下的用法。
關閉 channel 並不是必須的,只要沒有協程沒用引用 channel ,最終會被 GC 清理。所以使用的時候要特別注意,不要讓協程阻塞在 channel 上,這種情況很難檢測到,而且會造成 channel 和阻塞在 channel 的協程占有的資源無法被 GC 清理最終導致內存泄露。
channle 方便 golang 程序使用 CSP 的編程范形,但是 golang 是一種多范形的編程語言,golang 也支持傳統的通過共享內存來通信的編程方式。終極的原則是根據場景選擇合適的編程范型,不要因為 channel 好用而濫用 CSP 。

參考資料


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM