go語言中sync包和channel機制


文章轉載至:https://www.bytelang.com/article/content/A4jMIFmobcA=


 

 

golang中實現並發非常簡單,只需在需要並發的函數前面添加關鍵字"Go",但是如何處理go並發機制中不同goroutine之間的同步與通信,golang 中提供了sync包和channel機制來解決這一問題.

sync 包提供了互斥鎖這類的基本的同步原語.除 Once 和 WaitGroup 之外的類型大多用於底層庫的例程。更高級的同步操作通過信道與通信進行。

type Cond
    func NewCond(l Locker) *Cond
    func (c *Cond) Broadcast()
    func (c *Cond) Signal()
    func (c *Cond) Wait()
type Locker
type Mutex
    func (m *Mutex) Lock()
    func (m *Mutex) Unlock()
type Once
    func (o *Once) Do(f func())
type Pool
    func (p *Pool) Get() interface{}
    func (p *Pool) Put(x interface{})
type RWMutex
    func (rw *RWMutex) Lock()
    func (rw *RWMutex) RLock()
    func (rw *RWMutex) RLocker() Locker
    func (rw *RWMutex) RUnlock()
    func (rw *RWMutex) Unlock()
type WaitGroup
    func (wg *WaitGroup) Add(delta int)
    func (wg *WaitGroup) Done()
    func (wg *WaitGroup) Wait()

  

 

而golang中的同步是通過sync.WaitGroup來實現的.WaitGroup的功能:它實現了一個類似隊列的結構,可以一直向隊列中添加任務,當任務完成后便從隊列中刪除,如果隊列中的任務沒有完全完成,可以通過Wait()函數來出發阻塞,防止程序繼續進行,直到所有的隊列任務都完成為止.

WaitGroup總共有三個方法:Add(delta int), Done(), Wait()。Add:添加或者減少等待goroutine的數量Done:相當於Add(-1)Wait:執行阻塞,直到所有的WaitGroup數量變成0

具體例子如下:

 

package main  
  
import (  
    "fmt"  
    "sync"  
)  
  
var waitgroup sync.WaitGroup  
  
func Afunction(shownum int) {  
    fmt.Println(shownum)  
    waitgroup.Done() //任務完成,將任務隊列中的任務數量-1,其實.Done就是.Add(-1)  
}  
  
func main() {  
    for i := 0; i < 10; i++ {  
        waitgroup.Add(1) //每創建一個goroutine,就把任務隊列中任務的數量+1  
        go Afunction(i)  
    }  
    waitgroup.Wait() //.Wait()這里會發生阻塞,直到隊列中所有的任務結束就會解除阻塞  
}  

  

在線示例:https://www.bytelang.com/o/s/c/6z7UkvezTJg=

 

使用場景:

  程序中需要並發,需要創建多個goroutine,並且一定要等這些並發全部完成后才繼續接下來的程序執行.WaitGroup的特點是Wait()可以用來阻塞直到隊列中的所有任務都完成時才解除阻塞,而不需要sleep一個固定的時間來等待.但是其缺點是無法指定固定的goroutine數目.

 

Channel機制:

相對sync.WaitGroup而言,golang中利用channel實習同步則簡單的多.channel自身可以實現阻塞,其通過<-進行數據傳遞,channel是golang中一種內置基本類型,對於channel操作只有4種方式:

創建channel(通過make()函數實現,包括無緩存channel和有緩存channel);

向channel中添加數據(channel<-data);

從channel中讀取數據(data<-channel);

關閉channel(通過close()函數實現,關閉之后無法再向channel中存數據,但是可以繼續從channel中讀取數據)

channel分為有緩沖channel和無緩沖channel,兩種channel的創建方法如下:

var ch = make(chan int) //無緩沖channel,等同於make(chan int ,0)

var ch = make(chan int,10) //有緩沖channel,緩沖大小是5

其中無緩沖channel在讀和寫是都會阻塞,而有緩沖channel在向channel中存入數據沒有達到channel緩存總數時,可以一直向里面存,直到緩存已滿才阻塞.由於阻塞的存在,所以使用channel時特別注意使用方法,防止死鎖的產生.例子如下:

無緩存channel:

 

package main  
  
import "fmt"  
  
func Afuntion(ch chan int) {  
    fmt.Println("finish")  
    <-ch  
}  
  
func main() {  
    ch := make(chan int) //無緩沖的channel  
    go Afuntion(ch)  
    ch <- 1  
      
    // 輸出結果:  
    // finish  
} 

 

在線示例:https://www.bytelang.com/o/s/c/3cxH7Jko7YY=

 

代碼分析:首先創建一個無緩沖channel ch, 然后執行 go Afuntion(ch),此時執行<-ch,則Afuntion這個函數便會阻塞,不再繼續往下執行,直到主進程中ch<-1向channel ch 中注入數據才解除Afuntion該協程的阻塞.

更正:

代碼分析:對於該段程序(只有單核cpu運行的程序)首先創建一個無緩沖channel  ch,然后遇到go Afuntion(ch),查看此時無cpu可以用來運行該任務,則將該任務記下,等到有cpu時再運行該任務,然后執行ch<-1,此時主goroutine阻塞,查找是否有其他協程,查找到有Afuntion(ch)這一goroutine,則執行該goroutine內容,直到<-ch才從主goroutine獲取數據1,解除主goroutine阻塞.(注:這種執行方式僅限於單核cpu)

如果指定多個cpu運行,則首先運行主goroutine創建無緩沖的channel,然后查看是否有空閑cpu可以運行另外一個goroutine,如果有,則運行協程Afuntion(ch),對於多核cpu,主goroutine和另外一個goroutine的運行順序是不確定的.

 

 

package main
import "fmt"  
import "runtime"  
import "time"  
  
func Afuntion(ch chan int) {  
    fmt.Println("finish")  
    <-ch  
}  
  
func main() {  
    runtime.GOMAXPROCS(runtime.NumCPU())  
    ch := make(chan int) //無緩沖的channel  
    go Afuntion(ch)  
    time.Sleep(time.Nanosecond * 1000)  
    fmt.Println("main goroutine")  
    ch <- 1  
}  

 

 

在線示例:https://www.bytelang.com/o/s/c/9z_uWI5ZumA=

 

運行結果: 

finishmain goroutine

或者  main goroutine

finish

主goroutine和另外一個goroutine的執行順序是不確定的(對於多核cpu)

 

package main  
  
import "fmt"  
  
func Afuntion(ch chan int) {  
    fmt.Println("finish")  
    <-ch  
}  
  
func main() {  
    ch := make(chan int) //無緩沖的channel  
    //只是把這兩行的代碼順序對調一下  
    ch <- 1  
    go Afuntion(ch)  
  
    // 輸出結果:  
    // 死鎖,無結果  
}  

 

在線示例:https://www.bytelang.com/o/s/c/sLL_Cto3k4E=

 

代碼分析:首先創建一個無緩沖的channel, 然后在主協程里面向channel ch 中通過ch<-1命令寫入數據,則此時主協程阻塞,就無法執行下面的go Afuntions(ch),自然也就無法解除主協程的阻塞狀態,則系統死鎖

總結:
對於無緩存的channel,放入channel和從channel中向外面取數據這兩個操作不能放在同一個協程中,防止死鎖的發生;同時應該先利用go 開一個協程對channel進行操作,此時阻塞該go 協程,然后再在主協程中進行channel的相反操作(與go 協程對channel進行相反的操作),實現go 協程解鎖.即必須go協程在前,解鎖協程在后.

帶緩存channel:
對於帶緩存channel,只要channel中緩存不滿,則可以一直向 channel中存入數據,直到緩存已滿;同理只要channel中緩存不為0,便可以一直從channel中向外取數據,直到channel緩存變為0才會阻塞.

由此可見,相對於不帶緩存channel,帶緩存channel不易造成死鎖,可以同時在一個goroutine中放心使用,

 

close():

close主要用來關閉channel通道其用法為close(channel),並且實在生產者的地方關閉channel,而不是在消費者的地方關閉.並且關閉channel后,便不可再想channel中繼續存入數據,但是可以繼續從channel中讀取數據.例子如下:

 

package main  
  
import "fmt"  
  
func main() {  
    var ch = make(chan int, 20)  
    for i := 0; i < 10; i++ {  
        ch <- i  
    }  
    close(ch)  
    //ch <- 11 //panic: runtime error: send on closed channel  
    for i := range ch {  
        fmt.Println(i) //輸出0 1 2 3 4 5 6 7 8 9  
    }  
} 

 

在線示例:https://www.bytelang.com/o/s/c/XBiMiCoE7dc=

 

channel阻塞超時處理:
goroutine有時候會進入阻塞情況,那么如何避免由於channel阻塞導致整個程序阻塞的發生那?解決方案:通過select設置超時處理,具體程序如下:

 

package main  
  
 import (  
    "fmt"  
    "time"  
)  
  
func main() {  
    c := make(chan int)  
    o := make(chan bool)  
    go func() {  
        for {  
            select {  
            case i := <-c:  
                fmt.Println(i)  
            case <-time.After(time.Duration(3) * time.Second):    //設置超時時間為3s,如果channel 3s鍾沒有響應,一直阻塞,則報告超時,進行超時處理.  
                fmt.Println("timeout")  
                o <- true  
                break  
            }  
        }  
    }()  
    <-o  
}

 

在線示例:https://www.bytelang.com/o/s/c/6V74LnkRLN0=

 

golang 並發總結:

並發兩種方式:sync.WaitGroup,該方法最大優點是Wait()可以阻塞到隊列中的所有任務都執行完才解除阻塞,但是它的缺點是不能夠指定並發協程數量.
channel優點:能夠利用帶緩存的channel指定並發協程goroutine,比較靈活.但是它的缺點是如果使用不當容易造成死鎖;並且他還需要自己判定並發goroutine是否執行完.

但是相對而言,channel更加靈活,使用更加方便,同時通過超時處理機制可以很好的避免channel造成的程序死鎖,因此利用channel實現程序並發,更加方便,更加易用.


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM