CSP vs Actor 模型
Actor
Actor 模型是一個通用的並發編程模型,可以應用在幾乎任何一種編程語言中,典型的是 Erlang。多個 actor(進程) 可以同時運行、不共享狀態、通過向與進程綁定的消息隊列(也稱為信箱)異步發送消息來進行通信。
actor-1 與 actor-2 進程通信依賴一個消息隊列,而且消息隊列與進程互相耦合綁定。actor-1 在發送完消息之后,在 actor-2 沒有處理該消息的情況下,可以繼續執行其他任務,這說明 actor 進程之間的通信是異步的。
優點
-
消息傳輸和封裝,多個 Actor 可以同時運行,但不共享狀態,而且單個 actor 中的事件是串行執行(這歸功於隊列)
-
Actor 模型支持共享內存模型,也支持分布式內存模型
缺點
-
盡管 Actor 模型比使用線程和鎖模型的程序更易 debug,但是也會存在死鎖的問題,而且還需要擔心綁定進程的隊列溢出的問題
-
沒有對並行提供直接支持,需要通過並發的技術來構造並行方案
CSP
CSP即通信順序進程(communicating sequential processes),與 Actor 模型類似,該模型也是由獨立的、並發執行的實體所組成,實體之間通過發送消息進行通信。go 中的 csp 模型 channel
對於goroutine
來說是匿名的,不需要和 gid
綁定,通過 channel
完成 goroutine
之間的通信。(channel 在 CSP 代表通道的概念,這里只討論 Go 相關,channel 等價於 Go 中的 channel)
優點
-
與 Actor 相比,CSP 最大的優點是靈活性。Actor 模型,負責通信的媒介和執行單元是耦合的。而 CSP 中,
channel
是第一類對象,可以被獨立創造、寫入、讀出數據,也可以在不同執行單元中傳遞。
缺點
-
CSP 模型也易受死鎖影響,且沒有提供直接的並行支持。並行需要建立在並發基礎上,引入了不確定性。
區別
-
Actor 模型重在參與交流的實體(即進程),而 CSP 重在交流的通道,如 Go 中的
channel
-
CSP 模型不關注發送消息的進程,而是關注發送消息時使用的
channel
,而channel
不像 Actor 模型那樣進程與隊列緊耦合。而是可以單獨創建和讀寫,並在進程 (goroutine) 之間傳遞。
GO 中的並發模型
Go 是采用 SCP 的思想的,channel 是 go 在並發編程通信的推薦手段,Go 的設計者 Rob Pike有一句經典的名言,
Do not communicate by sharing memory; instead, share memory by communicating.
這句話是說“不要使用共享內存通信,而是應該使用通信去共享內存”,Go 語言推薦我們使用通信來進行進程間同步消息。這樣做有三點好處,來源於 draveness 的博客文章。
-
首先,使用發送消息來同步信息相比於直接使用共享內存和互斥鎖是一種更高級的抽象,使用更高級的抽象能夠為我們在程序設計上提供更好的封裝,讓程序的邏輯更加清晰;
-
其次,消息發送在解耦方面與共享內存相比也有一定優勢,我們可以將線程的職責分成生產者和消費者,並通過消息傳遞的方式將它們解耦,不需要再依賴共享內存;
-
最后,Go 語言選擇消息發送的方式,通過保證同一時間只有一個活躍的線程能夠訪問數據,能夠從設計上天然地避免線程競爭和數據沖突的問題;
並發設計模式
上文介紹了 Go 中使用的並發模型,而在這種並發模型下面 channel
是一個重要的概念,而下面每一種模式的設計都依賴於 channel
,所以有必要了解一下。
Barrier 模式
barrier 屏障模式故名思義就是一種屏障,用來阻塞直到聚合所有 goroutine 返回結果。可以使用 channel
來實現。
使用場景
-
多個網絡請求並發,聚合結果
-
粗粒度任務拆分並發執行,聚合結果
代碼實現
/* * Barrier */ type barrierResp struct { Err error Resp string Status int } // 構造請求 func makeRequest(out chan<- barrierResp, url string) { res := barrierResp{} client := http.Client{ Timeout: time.Duration(2*time.Microsecond), } resp, err := client.Get(url) if resp != nil { res.Status = resp.StatusCode } if err != nil { res.Err = err out <- res return } byt, err := ioutil.ReadAll(resp.Body) defer resp.Body.Close() if err != nil { res.Err = err out <- res return } res.Resp = string(byt) out <- res } // 合並結果 func barrier(endpoints ...string) { requestNumber := len(endpoints) in := make(chan barrierResp, requestNumber) response := make([]barrierResp, requestNumber) defer close(in) for _, endpoints := range endpoints { go makeRequest(in, endpoints) } var hasError bool for i := 0; i < requestNumber; i++ { resp := <-in if resp.Err != nil { fmt.Println("ERROR: ", resp.Err, resp.Status) hasError = true } response[i] = resp } if !hasError { for _, resp := range response { fmt.Println(resp.Status) } } } func main() { barrier([]string{"https://www.baidu.com", "http://www.sina.com","https://segmentfault.com/"}...) }
Tips
Barrier 模式也可以使用 errgroup
擴展庫來實現,這樣更加簡單明了。這個包有點類似於 sync.WaitGroup
,但是區別是當其中一個任務發生錯誤時,可以返回該錯誤。而這也滿足我們 Barrier 模式的需求。
func barrier(endpoints ...string) { var g errgroup.Group var mu sync.Mutex response := make([]barrierResp, len(endpoints)) for i, endpoint := range endpoints { i, endpoint := i, endpoint // create locals for closure below g.Go(func() error { res := barrierResp{} resp, err := http.Get(endpoint) if err != nil { return err } byt, err := ioutil.ReadAll(resp.Body) defer resp.Body.Close() if err != nil { return err } res.Resp = string(byt) mu.Lock() response[i] = res mu.Unlock() return err }) } if err := g.Wait(); err != nil { fmt.Println(err) } for _, resp := range response { fmt.Println(resp.Status) } }
Future 模式
future 即未來,來自未來的模式(手動狗頭)。這個模式常用在異步處理也稱為 Promise 模式,采用一種 fire-and-forget
的方式,是指主 goroutine 不等子 goroutine 執行完就直接返回了,然后等到未來執行完的時候再去取結果。在 Go 中由於 goroutine 的存在,實現這種模式是挺簡單的。
使用場景
-
異步
代碼實現
/* * Future */ type Function func(string) (string, error) type Future interface { SuccessCallback() error FailCallback() error Execute(Function) (bool, chan struct{}) } type AccountCache struct { Name string } func (a *AccountCache) SuccessCallback() error { fmt.Println("It's success~") return nil } func (a *AccountCache) FailCallback() error { fmt.Println("It's fail~") return nil } func (a *AccountCache) Execute(f Function) (bool, chan struct{}){ done := make(chan struct{}) go func(a *AccountCache) { _, err := f(a.Name) if err != nil { _ = a.FailCallback() } else { _ = a.SuccessCallback() } done <- struct{}{} }(a) return true, done } func NewAccountCache(name string) *AccountCache { return &AccountCache{ name, } } func testFuture() { var future Future future = NewAccountCache("Tom") updateFunc := func(name string) (string, error){ fmt.Println("cache update:", name) return name, nil } _, done := future.Execute(updateFunc) defer func() { <-done }() } func main() { var future Future future = NewAccountCache("Tom") updateFunc := func(name string) (string, error){ fmt.Println("cache update:", name) return name, nil } _, done := future.Execute(updateFunc) defer func() { <-done }() // do something }
這里有一個技巧:為什么使用
struct
類型作為channel
的通知?很多開源代碼都是使用這種方式來作為信號通知機制,主要是因為空
struct
在 Go 中占的內存是最少的。
Pipeline 模式
使用場景
-
可以利用多核的優勢把一段粗粒度邏輯分解成多個 goroutine 執行
Pipeline 本身翻譯過來就是管道的意思,注意和 Barrire 模式不同的是,它是按順序的,類似於流水線。
這個圖不是很能表達並行的概念,其實三個 goroutine 是同時執行的,通過 buffer channel 將三者串起來,只要前序 goroutine 處理完一部分數據,就往下傳遞,達到並行的目的。
代碼實現
實現一個功能,給定一個切片,然后求它的子項的平方和。
例如,[1, 2, 3] -> 1^2 + 2^2 + 3^2 = 14。
正常的邏輯,遍歷切片,然后求平方累加。使用 pipeline 模式,可以把求和和求平方拆分出來並行計算。
/* * Pipeline 模式 */ func generator(max int) <-chan int{ out := make(chan int, 100) go func() { for i := 1; i <= max; i++ { out <- i } close(out) }() return out } func power(in <-chan int) <-chan int{ out := make(chan int, 100) go func() { for v := range in { out <- v * v } close(out) }() return out } func sum(in <-chan int) <-chan int{ out := make(chan int, 100) go func() { var sum int for v := range in { sum += v } out <- sum close(out) }() return out } func main() { // [1, 2, 3] fmt.Println(<-sum(power(generator(3)))) }
Workers Pool 模式
使用場景
-
高並發任務
在 Go 中 goroutine 已經足夠輕量,甚至 net/http
server 的處理方式也是 goroutine-per-connection
的,所以比起其他語言來說可能場景稍微少一些。每個 goroutine 的初始內存消耗在 2~8kb,當我們有大批量任務的時候,需要起很多 goroutine 來處理,這會給系統代理很大的內存開銷和 GC 壓力,這個時候就可以考慮一下協程池。
代碼實現
/* * Worker pool */ type TaskHandler func(interface{}) type Task struct { Param interface{} Handler TaskHandler } type WorkerPoolImpl interface { AddWorker() // 增加 worker SendTask(Task) // 發送任務 Release() // 釋放 } type WorkerPool struct { wg sync.WaitGroup inCh chan Task } func (d *WorkerPool) AddWorker() { d.wg.Add(1) go func(){ for task := range d.inCh { task.Handler(task.Param) } d.wg.Done() }() } func (d *WorkerPool) Release() { close(d.inCh) d.wg.Wait() } func (d *WorkerPool) SendTask(t Task) { d.inCh <- t } func NewWorkerPool(buffer int) WorkerPoolImpl { return &WorkerPool{ inCh: make(chan Task, buffer), } } func main() { bufferSize := 100 var workerPool = NewWorkerPool(bufferSize) workers := 4 for i := 0; i < workers; i++ { workerPool.AddWorker() } var sum int32 testFunc := func (i interface{}) { n := i.(int32) atomic.AddInt32(&sum, n) } var i, n int32 n = 1000 for ; i < n; i++ { task := Task{ i, testFunc, } workerPool.SendTask(task) } workerPool.Release() fmt.Println(sum) }
協程池使用了反射來獲取執行的函數及參數,在 Go 中可能有點讓人有點膈應。但是如果批量執行的函數是已知的,可以優化成一種只執行指定函數的協程池,能夠提升性能。
Pub/Sub 模式
發布訂閱模式是一種消息通知模式,發布者發送消息,訂閱者接收消息。
使用場景
-
消息隊列
代碼實現
/* * Pub/Sub */ type Subscriber struct { in chan interface{} id int topic string stop chan struct{} } func (s *Subscriber) Close() { s.stop <- struct{}{} close(s.in) } func (s *Subscriber) Notify(msg interface{}) (err error) { defer func() { if rec := recover(); rec != nil { err = fmt.Errorf("%#v", rec) } }() select { case s.in <-msg: case <-time.After(time.Second): err = fmt.Errorf("Timeout\n") } return } func NewSubscriber(id int) SubscriberImpl { s := &Subscriber{ id: id, in: make(chan interface{}), stop: make(chan struct{}), } go func() { for{ select { case <-s.stop: close(s.stop) return default: for msg := range s.in { fmt.Printf("(W%d): %v\n", s.id, msg) } } }}() return s } // 訂閱者需要實現的方法 type SubscriberImpl interface { Notify(interface{}) error Close() } // sub 訂閱 pub func Register(sub Subscriber, pub *publisher){ pub.addSubCh <- sub return } // pub 結果定義 type publisher struct { subscribers []SubscriberImpl addSubCh chan SubscriberImpl removeSubCh chan SubscriberImpl in chan interface{} stop chan struct{} } // 實例化 func NewPublisher () *publisher{ return &publisher{ addSubCh: make(chan SubscriberImpl), removeSubCh: make(chan SubscriberImpl), in: make(chan interface{}), stop: make(chan struct{}), } } // 監聽 func (p *publisher) start() { for { select { // pub 發送消息 case msg := <-p.in: for _, sub := range p.subscribers{ _ = sub.Notify(msg) } // 移除指定 sub case sub := <-p.removeSubCh: for i, candidate := range p.subscribers { if candidate == sub { p.subscribers = append(p.subscribers[:i], p.subscribers[i+1:]...) candidate.Close() break } } // 增加一個 sub case sub := <-p.addSubCh: p.subscribers = append(p.subscribers, sub) // 關閉 pub case <-p.stop: for _, sub := range p.subscribers { sub.Close() } close(p.addSubCh) close(p.in) close(p.removeSubCh) return } } } func main() { // 測試代碼 pub := NewPublisher() go pub.start() sub1 := NewWriterSubscriber(1) Register(sub1, pub) sub2 := NewWriterSubscriber(2) Register(sub2, pub) commands:= []int{1, 2, 3, 4, 5, 6, 7, 8, 9} for _, c := range commands { pub.in <- c } pub.stop <- struct{}{} time.Sleep(time.Second*1) }
注意事項
-
同步問題,尤其同步原語和
channel
一起用時,容易出現死鎖 -
goroutine 崩潰問題,如果子 goroutine panic 沒有 recover 會引起主 goroutine 異常退出
-
goroutine 泄漏問題,確保 goroutine 能正常關閉
參考
-
《go design pattern》書
-
《七周七並發模型》書
-
為什么使用通信來共享內存?· Why's THE Design?
-
[advanced-go-concurrency](https://encore.dev/blog/advanced-go-concurrency)