Golang標准庫深入 - 鎖、信號量(sync)


概述

    sync包提供了基本的同步基元,如互斥鎖。除了Once和WaitGroup類型,大部分都是適用於低水平程序線程,高水平的同步使用channel通信更好一些。

本包的類型的值不應被拷貝。

    雖然文檔解釋可能不夠深入,或者淺顯易懂,但是我覺得還是貼出來,對比了解可能會更好。

    

    Go語言中實現並發或者是創建一個goroutine很簡單,只需要在函數前面加上"go",就可以了,那么並發中,如何實現多個goroutine之間的同步和通信?答: channel 我是第一個想到的, sync, 原子操作atomic等都可以。

詳解:

    首先我們先來介紹一下sync包下的各種類型。那么我們先來羅列一下sync包下所有的類型吧。

1. Cond 條件等待

type Cond struct {

        // L is held while observing or changing the condition
        L Locker
        // contains filtered or unexported fields
}

 

    解釋:

Cond實現了一個條件變量,一個線程集合地,供線程等待或者宣布某事件的發生。

每個Cond實例都有一個相關的鎖(一般是*Mutex或*RWMutex類型的值),它必須在改變條件時或者調用Wait方法時保持鎖定。Cond可以創建為其他結構體的字段,Cond在開始使用后不能被拷貝。

    條件等待通過Wait讓例程等待,通過Signal讓一個等待的例程繼續,通過Broadcase讓所有等待的繼續。

在Wait之前需要手動為c.L上鎖, Wait結束了手動解鎖。為避免虛假喚醒, 需要將Wait放到一個條件判斷的循環中,官方要求寫法:

c.L.Lock()
for !condition() {
    c.Wait()
}
// 執行條件滿足之后的動作...
c.L.Unlock()
 

 

成員文檔:

type Cond struct {
    L Locker // 在“檢查條件”或“更改條件”時 L 應該鎖定。
} 

// 創建一個條件等待
func NewCond(l Locker) *Cond

// Broadcast 喚醒所有等待的 Wait,建議在“更改條件”時鎖定 c.L,更改完畢再解鎖。
func (c *Cond) Broadcast()

// Signal 喚醒一個等待的 Wait,建議在“更改條件”時鎖定 c.L,更改完畢再解鎖。
func (c *Cond) Signal()

// Wait 會解鎖 c.L 並進入等待狀態,在被喚醒時,會重新鎖定 c.L
func (c *Cond) Wait()

 

代碼示例:

package main

import (
    "fmt"
    "sync"
    "time"
)

func main() {
    condition := false // 條件不滿足

    var mu sync.Mutex
    cond := sync.NewCond(&mu) // 創建一個Cond

    //讓協程去創造條件
    go func() {
        mu.Lock()
        condition = true // 改寫條件
        time.Sleep(3 * time.Second)
        cond.Signal() // 發送通知:條件ok
        mu.Unlock()
    }()

    mu.Lock()

    // 檢查條件是否滿足,避免虛假通知,同時避免 Signal 提前於 Wait 執行。
    for !condition { // 如果Signal提前執行了,那么此處就是false了

        // 等待條件滿足的通知,如果虛假通知,則繼續循環等待
        cond.Wait() // 等待時 mu 處於解鎖狀態,喚醒時重新鎖定。 (阻塞當前線程)

    }
    fmt.Println("條件滿足,開始后續動作...")
    mu.Unlock()

}
 

 

2. Locker 

type Locker interface {
    Lock()
    Unlock()
}

 

Locker接口代表一個可以加鎖和解鎖的對象。 是一個接口。

 

3. Mutex  互斥鎖

type Mutex struct {
    // contains filtered or unexported fields
}

 

解釋:

    Mutex 是互斥鎖。Mutex 的零值是一個解鎖的互斥鎖。 第一次使用后不得復制 Mutex 。

    互斥鎖是用來保證在任一時刻, 只能有一個例程訪問某個對象。 Mutex的初始值為解鎖的狀態。 通常作為其他結構體的你名字段使用, 並且可以安全的在多個例程中並行使用。

 

成員文檔:

// Lock 用於鎖住 m,如果 m 已經被加鎖,則 Lock 將被阻塞,直到 m 被解鎖。
func (m *Mutex) Lock()

// Unlock 用於解鎖 m,如果 m 未加鎖,則該操作會引發 panic。
func (m *Mutex) Unlock()

 

代碼示例:

package main

import (
    "fmt"
    "sync"
)

type SafeInt struct {
    sync.Mutex
    Num int
}

func main() {
    waitNum := 10 // 設置等待的個數(繼續往下看)

    count := SafeInt{}

    done := make(chan bool)

    for i := 0; i < waitNum; i++ {
        go func(i int) {
            count.Lock() // 加鎖,防止其它例程修改 count
            count.Num = count.Num + i
            fmt.Print(count.Num, " ")
            count.Unlock()

            done <- true
        }(i)
    }

    for i := 0; i < waitNum; i++ {
        <-done
    }
}
[ `go run sync_mutex.go` | done: 216.47974ms ]
    1 4 8 8 10 15 21 30 37 45

 

注意:多次輸出結果不一致, 試想為什么會出現10個結果中有0值得, 為什么10個結果中都大於0呢?或者都大於1呢? 那么會不會出現10個結果中最小值是9 呢?

 

4.  Once 單次執行

type Once struct {
    // contains filtered or unexported fields
}

 

  解釋:

    Once是只執行一次動作的對象。

    Once 的作用是多次調用但只執行一次,Once 只有一個方法,Once.Do(),向 Do 傳入一個函數,這個函數在第一次執行 Once.Do() 的時候會被調用,以后再執行 Once.Do() 將沒有任何動作,即使傳入了其它的函數,也不會被執行,如果要執行其它函數,需要重新創建一個 Once 對象。

成員文檔:

// 多次調用僅執行一次指定的函數 f
func (o *Once) Do(f func())

 

 

代碼示例:

package main

// 官方案例

import (
    "fmt"
    "sync"
)

func main() {
    var once sync.Once
    var num int
    onceBody := func() {
        fmt.Println("Only once")
    }

    done := make(chan bool)

    for i := 0; i < 10; i++ {
        go func() {
            once.Do(onceBody) // 多次調用
            done <- true
        }()
    }

    for i := 0; i < 10; i++ {
        <-done
    }
}

 

 

5. RWMutex 讀寫互斥鎖

type RWMutex struct {
    // 包含隱藏或非導出字段
}

 

解釋: 

RWMutex是讀寫互斥鎖。該鎖可以被同時多個讀取者持有或唯一個寫入者持有。RWMutex可以創建為其他結構體的字段;零值為解鎖狀態。RWMutex類型的鎖也和線程無關,可以由不同的線程加讀取鎖/寫入和解讀取鎖/寫入鎖。

    Mutex 可以安全的在多個例程中並行使用。

成員文檔:

// Lock 將 rw 設置為寫鎖定狀態,禁止其他例程讀取或寫入。
func (rw *RWMutex) Lock()

// Unlock 解除 rw 的寫鎖定狀態,如果 rw 未被寫鎖定,則該操作會引發 panic。
func (rw *RWMutex) Unlock()

// RLock 將 rw 設置為讀鎖定狀態,禁止其他例程寫入,但可以讀取。
func (rw *RWMutex) RLock()

// Runlock 解除 rw 的讀鎖定狀態,如果 rw 未被讀鎖頂,則該操作會引發 panic。
func (rw *RWMutex) RUnlock()

// RLocker 返回一個互斥鎖,將 rw.RLock 和 rw.RUnlock 封裝成了一個 Locker 接口。
func (rw *RWMutex) RLocker() Locker

 

注意,Lock() 鎖定時,其他例程是無法讀寫的。

1. 可以讀時, 多個goroutine可以同時讀。

2. 寫的時候, 其他goroutine不可讀也不可寫。

 

代碼實例:

package main

import (
    "fmt"
    "sync"
    "time"
)

var m *sync.RWMutex
var wg sync.WaitGroup

func main() {
    m = new(sync.RWMutex)
    wg.Add(2)
    go write(1)
    time.Sleep(1 * time.Second)
    go read(2)
    wg.Wait()
}
func write(i int) {
    fmt.Println(i, "寫開始.")
    m.Lock()
    fmt.Println(i, "正在寫入中......")
    time.Sleep(3 * time.Second)
    m.Unlock()
    fmt.Println(i, "寫入結束.")
    wg.Done()
}
func read(i int) {
    fmt.Println(i, "讀開始.")
    m.RLock()
    fmt.Println(i, "正在讀取中......")
    time.Sleep(1 * time.Second)
    m.RUnlock()
    fmt.Println(i, "讀取結束.")
    wg.Done()
}
> Output:
command-line-arguments
1 寫開始.
1 正在寫入中......
2 讀開始.
1 寫入結束.
2 正在讀取中......
2 讀取結束.
> Elapsed: 4.747s
> Result: Success

 

    當寫入開始時,加寫鎖開始寫入, 一秒后, 讀取goroutine開始讀取, 發現有寫入鎖,只能等待。 2秒后寫入完成, 解除寫鎖, 讀取開始加鎖,直到讀取完成。

圖解:

6. WaitGroup 組等待

type WaitGroup struct {
        // contains filtered or unexported fields
}

 

解釋:

WaitGroup用於等待一組線程的結束。父線程調用Add方法來設定應等待的線程的數量。每個被等待的線程在結束時應調用Done方法。同時,主線程里可以調用Wait方法阻塞至所有線程結束(計數器歸零)。

成員文檔:

// 計數器增加 delta,delta 可以是負數。
func (wg *WaitGroup) Add(delta int)

// 計數器減少 1
func (wg *WaitGroup) Done()

// 等待直到計數器歸零。如果計數器小於 0,則該操作會引發 panic。
func (wg *WaitGroup) Wait()
 

 

代碼實例:

func main() {
    wg := sync.WaitGroup{}
    wg.Add(10)
    for i := 0; i < 10; i++ {
        go func(i int) {
            defer wg.Done()
            fmt.Print(i, " ")
        }(i)
    }
    wg.Wait()
}

 

輸出是無序的。

注意此處有一個小坑,看代碼:

func main() {
    wg := sync.WaitGroup{}
    for i := 0; i < 10; i++ {
        go func(i int) {
            wg.Add(1)
            defer wg.Done()
            fmt.Print(i, " ")
        }(i)
    }
    wg.Wait()
}

 

    看輸出,發現會小於10個,甚至一個也沒有。問題就在於goroutine執行時間和main程的退出時間問題,導致Add()是否執行。

    再有就是復制和引用了,如果將wg復制給goroutine作為參數,一定要使用引用,否則就是兩個對象了。

 

那么介紹完上面所有的類型后, 我把Pool留到了最后,這也是要重點將的地方。

7. Pool  臨時對象池

type Pool struct {
    noCopy noCopy

    local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
    localSize uintptr        // size of the local array

    // New optionally specifies a function to generate
    // a value when Get would otherwise return nil.
    // It may not be changed concurrently with calls to Get.
    New func() interface{}
}

 

        Pool 用於存儲臨時對象,它將使用完畢的對象存入對象池中,在需要的時候取出來重復使用,目的是為了避免重復創建相同的對象造成 GC 負擔過重。其中存放的臨時對象隨時可能被 GC 回收掉(如果該對象不再被其它變量引用)。

  從 Pool 中取出對象時,如果 Pool 中沒有對象,將返回 nil,但是如果給 Pool.New 字段指定了一個函數的話,Pool 將使用該函數創建一個新對象返回。

  Pool 可以安全的在多個例程中並行使用,但 Pool 並不適用於所有空閑對象,Pool 應該用來管理並發的例程共享的臨時對象,而不應該管理短壽命對象中的臨時對象,因為這種情況下內存不能很好的分配,這些短壽命對象應該自己實現空閑列表。

  Pool 在開始使用之后,不能再被復制。
        

Pool的實現:

1.定時清理

    文檔上說,保存在Pool中的對象會在沒有任何通知的情況下被自動移除掉。實際上,這個清理過程是在每次垃圾回收之前做的。垃圾回收是固定兩分鍾觸發一次。而且每次清理會將Pool中的所有對象都清理掉!

2.如何管理數據

先看看這幾個數據結構

type Pool struct {
    noCopy noCopy

    local     unsafe.Pointer // local fixed-size per-P pool, actual type is [P]poolLocal
    localSize uintptr        // size of the local array

    // New optionally specifies a function to generate
    // a value when Get would otherwise return nil.
    // It may not be changed concurrently with calls to Get.
    New func() interface{}
}
  
// Local per-P Pool appendix.
type poolLocalInternal struct {
    private interface{}   // Can be used only by the respective P.
    shared  []interface{} // Can be used by any P.
    Mutex                 // Protects shared.
}

type poolLocal struct {
    poolLocalInternal

    // Prevents false sharing on widespread platforms with
    // 128 mod (cache line size) = 0 .
    pad [128 - unsafe.Sizeof(poolLocalInternal{})%128]byte
}

 


Pool是提供給外部使用的對象。其中的local成員的真實類型是一個poolLocal數組,localSize是數組長度。poolLocal是真正保存數據的地方。priveate保存了一個臨時對象,shared是保存臨時對象的數組。  

為什么Pool中需要這么多poolLocal對象呢?實際上,Pool是給每個線程分配了一個poolLocal對象。也就是說local數組的長度,就是工作線程的數量(size := runtime.GOMAXPROCS(0))。當多線程在並發讀寫的時候,通常情況下都是在自己線程的poolLocal中存取數據。當自己線程的poolLocal中沒有數據時,才會嘗試加鎖去其他線程的poolLocal中“偷”數據。

func (p *Pool) Get() interface{} {
    if race.Enabled {
        race.Disable()
    } 
    l := p.pin()    //獲取當前線程的poolLocal對象,也就是p.local[pid]。
    x := l.private
    l.private = nil
    runtime_procUnpin()
    if x == nil {
        l.Lock()
        last := len(l.shared) - 1
        if last >= 0 {
            x = l.shared[last]
            l.shared = l.shared[:last]
        }
        l.Unlock()
        if x == nil {
            x = p.getSlow()
        }
    }
    if race.Enabled {
        race.Enable()
        if x != nil {
            race.Acquire(poolRaceAddr(x))
        }
    }
    if x == nil && p.New != nil {
        x = p.New()
    }
    return x
}

 


        為什么這里要鎖住。答案在getSlow中。因為當shared中沒有數據的時候,會嘗試去其他的poolLocal的shared中偷數據。Pool.Get的時候,首先會在local數組中獲取當前線程對應的poolLocal對象。如果private中有數據,則取出來直接返回。如果沒有則先鎖住shared,有數據則直接返回。

        Go語言的goroutine雖然可以創建很多,但是真正能物理上並發運行的goroutine數量是有限的,是由runtime.GOMAXPROCS(0)設置的。所以這個Pool高效的設計的地方就在於將數據分散在了各個真正並發的線程中,每個線程優先從自己的poolLocal中獲取數據,很大程度上降低了鎖競爭。 

來源:https://my.oschina.net/90design/blog/1814499

 

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM