在 Go 語言中,如何正確的使用並發


從多個花絮中提取,但是如果我斗膽提出主要觀點的總結,其內容就是:搶占式多任務和一般共享狀態結合導致軟件開發過程不可管理的復雜性, 開發人員可能更喜歡保持自己的一些理智以此避免這種不可管理的復雜性。搶占式調度對於哪些真正的並行任務是好的,但是當可變狀態通過多並發線程共享時,明確的多任務合作更招人喜歡 。

盡管合作多任務,你的代碼仍有可能是復雜的,它只是有機會保持可管理下一定的復雜性。當控制轉移是明確一個代碼閱讀者至少有一些可見的跡象表明事情可能脫離正軌。沒有明確標記每個新階段是潛在的地雷:“如果這個操作不是原子操作,最后出現什么情況?”那么在每個命令之間的空間變成無盡的空間黑洞,可怕的Heisenbugs出現

在過去的一年多,盡管在Heka上的工作(一個高性能數據、日志和指標處理引擎)已大多數使用GO語言開發。Go的亮點之一就是語言本身有一些非常有用的並發原語。但是Go的並發性能怎么樣,需要通過支持本地推理的鼓勵代碼鏡頭觀察。

並非事實都是好的。所有的Goroutine訪問相同的共享內存空間,狀態默認可變,但是Go的調度程序不保證在上下文選擇過程中的准確性。在單核設置中,Go的運行時間進入“隱式協同工作”一類, 在Glyph中經常提到的異步程序模型列表選擇4。 當Goroutine能夠在多核系統中並行運行,世事難料。

Go不可能保護你,但是並不意味着你不能采取措施保護自己。在寫代碼過程中通過使用一些Go提供的原語,可最小化相關的搶占式調度產生的異常行為。請看下面Glyph示例“賬號轉換”代碼段中Go接口(忽略哪些不易於最終存儲定點小數的浮點數)

func Transfer(amount float64, payer, payee *Account, 
   server SomeServerType) error {
   if payer.Balance() < amount {
       return errors.New("Insufficient funds")
   }
   log.Printf("%s has sufficient funds", payer)
   payee.Deposit(amount)
   log.Printf("%s received payment", payee)
   payer.Withdraw(amount)
   log.Printf("%s made payment", payer)
   server.UpdateBalances(payer, payee) // Assume this is magic and always works.
   return nil
}

這明顯的是不安全的,如果從多個goroutine中調用的話,因為它們可能並發的從存款調度中得到相同的結果,然后一起請求更多的已取消調用的存款變量。最好是代碼中危險部分不會被多goroutine執行。在此一種方式實現了該功能:

type transfer struct { 
   payer *Account
   payee *Account
   amount float64
}
var xferChan = make(chan *transfer)
var errChan = make(chan error)
func init() {
   go transferLoop()
}
func transferLoop() {
   for xfer := range xferChan {
       if xfer.payer.Balance < xfer.amount {
           errChan <- errors.New("Insufficient funds")
           continue
       }
       log.Printf("%s has sufficient funds", xfer.payer)
       xfer.payee.Deposit(xfer.amount)
       log.Printf("%s received payment", xfer.payee)
       xfer.payer.Withdraw(xfer.amount)
       log.Printf("%s made payment", xfer.payer)
       errChan <- nil
   }
}
func Transfer(amount float64, payer, payee *Account,
   server SomeServerType) error {
   xfer := &transfer{
       payer: payer,
       payee: payee,
       amount: amount,
   }
   xferChan <- xfer
   err := <-errChan
   if err == nil  {
       server.UpdateBalances(payer, payee) // Still magic.
   }
   return err
}

這里有更多代碼,但是我們通過實現一個微不足道的事件循環消除並發問題。當代碼首次執行時,它激活一個goroutine運行循環。轉發請求為了此目的而傳遞入一個新創建的通道。結果經由一個錯誤通道返回到循環外部。因為通道不是緩沖的,它們加鎖,並且通過Transfer函數無論多個並發的轉發請求怎么進,它們都將通過單一的運行事件循環被持續的服務。

上面的代碼看起來有點別扭,也許吧. 對於這樣一個簡單的場景一個互斥鎖(mutex)也許會是一個更好的選擇,但是我正要嘗試去證明的是可以向一個go例程應用隔離狀態操作. 即使稍稍有點尷尬,但是對於大多數需求而言它的表現已經足夠好了,並且它工作起來,甚至使用了最簡單的賬號結構實現:

type Account struct { 
   balance float64
}
func (a *Account) Balance() float64 {
   return a.balance
}
func (a *Account) Deposit(amount float64) {
   log.Printf("depositing: %f", amount)
   a.balance += amount
}
func (a *Account) Withdraw(amount float64) {
   log.Printf("withdrawing: %f", amount)
   a.balance -= amount
}

不過如此笨拙的賬戶實現看起來會有點天真. 通過不讓任何大於當前平衡的撤回操作執行,從而讓賬戶結構自身提供一些保護也許更起作用。那如果我們把撤回函數變成下面這個樣子會怎么樣呢?

func (a *Account) Withdraw(amount float64) { 
   if amount > a.balance {
       log.Println("Insufficient funds")
       return
   }
   log.Printf("withdrawing: %f", amount)
   a.balance -= amount
}

不幸的是,這個代碼患有和我們原來的 Transfer 實現相同的問題。並發執行或不幸的上下文切換意味着我們可能以負平衡結束。幸運的是,內部的事件循環理念應用在這里同樣很好,甚至更好,因為事件循環 goroutine 可以與每個個人賬戶結構實例很好的耦合。這里有一個例子說明這一點:

type Account struct { 
   balance float64
   deltaChan chan float64
   balanceChan chan float64
   errChan chan error
}
func NewAccount(balance float64) (a *Account) {
   a = &Account{
       balance:     balance,
       deltaChan:   make(chan float64),
       balanceChan: make(chan float64),
       errChan:     make(chan error),
   }
   go a.run()
   return
}
func (a *Account) Balance() float64 {
   return <-a.balanceChan
}
func (a *Account) Deposit(amount float64) error {
   a.deltaChan <- amount
   return <-a.errChan
}
func (a *Account) Withdraw(amount float64) error {
   a.deltaChan <- -amount
   return <-a.errChan
}
func (a *Account) applyDelta(amount float64) error {
   newBalance := a.balance + amount
   if newBalance < 0 {
       return errors.New("Insufficient funds")
   }
   a.balance = newBalance
   return nil
}
func (a *Account) run() {
   var delta float64
   for {
       select {
       case delta = <-a.deltaChan:
           a.errChan <- a.applyDelta(delta)
       case a.balanceChan <- a.balance:
           // Do nothing, we've accomplished our goal w/ the channel put.
       }
   }
}

這個API略有不同,Deposit 和 Withdraw 方法現在都返回了錯誤。它們並非直接處理它們的請求,而是把賬戶余額的調整量放入 deltaChan,在 run 方法運行時的事件循環中訪問 deltaChan。同樣的,Balance 方法通過阻塞不斷地在事件循環中請求數據,直到它通過 balanceChan 接收到一個值。

須注意的要點是上述的代碼,所有對結構內部數據值得直接訪問和修改都是有事件循環觸發的within 代碼來完成的。如果公共 API 調用表現良好並且只使用給出的渠道同數據進行交互的話, 那么不管對公共方法進行多少並發的調用,我們都知道在任意給定的時間只會有它們之中的一個方法得到處理。我們的時間循環代碼推理起來更加容易了很多。

該模式的核心是 Heke 的設計. 當Heka啟動時,它會讀取配置文件並且在它自己的go例程中啟動每一個插件. 隨着時鍾信號、關閉通知和其它控制信號,數據經由通道被送入插件中. 這樣就鼓勵了插件作者使用一種想上述事例那樣的 事件循環類型的架構 來實現插件的功能.

再次,GO不會保護你自己. 寫一個同其內部數據管理和主題有爭議的條件保持松耦合的Heka插件(或者任何架構)是完全可能的。但是有一些需要注意的小地方,還有Go的爭議探測器的自由應用程序,你可以編寫的代碼其行為可以預測,甚至在搶占式調度的門面代碼中。

英文原文:Sane Concurrency with Go

譯文鏈接:http://www.oschina.net/translate/sane-concurrency-with-go


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM