goroutine間的同步&協作


摘要

本文列舉 Golang 協程間的同步和協作工具,同步工具包括 sync.Mutex & sync.RWMutexsync.Cond、原子操作、sync.Poolsync.Map,協作工具包括 sync.WaitGroupsync.Oncecontext.Context
本文還涉及到了 Golang 結構體禁止復制特性 nocopy 的實現方式、計算機原子操作的實現原理。

Go語言中的同步工具

基礎概念

競態條件(race condition)

一份數據被多個線程共享,可能會產生爭用和沖突的情況。這種情況被稱為競態條件,競態條件會破壞共享數據的一致性,影響一些線程中代碼和流程的正確執行。
Go 語言提供了競態競爭檢測工具,使用 go build -race 可以啟用競爭檢測器編譯程序。

同步

同步可以解決競態問題。它本質上是在控制多個線程對共享資源的訪問。這種控制主要包含兩點:

  1. 避免多個線程在同一時刻操作同一個數據塊。
  2. 協調多個線程,以避免它們在同一時刻執行同一個代碼塊。

根據同步的原則,程序想使用一個共享資源,就必須先請求該資源並獲取到對它的訪問權限。當程序不再使用這個資源時,它應該釋放該資源,放棄對它的訪問權。一個程序對資源的請求不應該導致其他正在訪問該資源的程序中斷,而應該等到那個程序釋放該資源后再進行請求。同一時刻某種資源應該只被一個程序占用

互斥量sync.Mutex

一個互斥鎖可以被用來保護一個臨界區或者一組相關臨界區。它可以保證,在同一時刻只有一個 goroutine 處於該臨界區之內。
每當有 goroutine 想進入臨界區時,都需要先加鎖,每個 goroutine 離開臨界區時,都要及時解鎖。

Mutex的使用

var mutex sync.Mutex

func updatePublicResource() {
    mutex.Lock()
    doUpdate()
    mutex.Unlock()
}
Tips
  • 不要重復鎖定互斥鎖。
  • 不要忘記解鎖互斥鎖,推薦使用defer。
  • 不要對尚未鎖定的互斥鎖解鎖。不要重復解鎖。
  • 不要復制鎖。

對一個已經被鎖定的互斥鎖進行鎖定,會阻塞當前的 goroutine 。如果其他的用戶級 goroutine 也處於等待狀態,整個程序就停止執行了,Go 語言運行時系統會拋出一個死鎖的 panic 錯誤,程序就會崩潰。因此,每一個鎖定操作,都要有且只有一個對應的解鎖操作。

讀寫鎖sync.RWMutex

讀寫鎖是讀 / 寫互斥鎖的簡稱,讀寫鎖是互斥鎖的一種擴展。一個讀寫鎖中包含了兩個鎖,即:讀鎖和寫鎖。
讀寫鎖可以對共享資源的“讀操作”和“寫操作”進行區別,實現更加細膩的訪問控制。
對於某個受到讀寫鎖保護的共享資源,多個寫操作不能同時進行,寫操作和讀操作也不能同時進行,多個讀操作可以同時進行。

var mutex sync.RWMutex

func updatePublicResource() {
    mutex.Lock()
    doUpdate()
    mutex.Unlock()
}

func readPublicResource() {
    mutex.RLock()
    read()
    mutex.RUnlock()
}

對寫鎖進行解鎖,會喚醒“所有因試圖鎖定讀鎖,而被阻塞的 goroutine”,通常它們都能成功完成對讀鎖的鎖定。
對讀鎖進行解鎖,會在沒有其他鎖定中讀鎖的前提下,喚醒“因試圖鎖定寫鎖,而被阻塞的 goroutine”;只有一個等待時間最長的被喚醒的 goroutine 能夠成功完成對寫鎖的鎖定。
讀寫鎖是互斥鎖的擴展,因此有些方面它還是沿用了互斥鎖的行為模式。比如,解鎖未被鎖定的寫鎖或讀鎖,會立刻引發 panic。

條件變量sync.Cond

條件變量是基於互斥鎖的,它不用於保護臨界區和共享資源,而是用於協調想要訪問共享資源的那些線程的。當共享資源的狀態發生變化時,它可以被用來通知被互斥鎖阻塞的線程。
io.Pipe 的實現就基於 sync.Cond。有興趣的讀者可以通過這篇文章 《理解golang io.Pipe》 了解 io.Pipe,這篇文章對 io.Pipe 的內部實現和使用場景進行了介紹。
sync.Cond 需要 sync.Locker 類型的參數用於初始化。

type Locker interface {
	Lock()
	Unlock()
}

noCopy

大多數同步工具禁止在使用后進行復制。Golang 使用兩個內嵌字段實現 coCopy 功能:noCopy 和 checker。noCopy 字段用於代碼檢查工具,checker 字段用於保證運行時不發生復制。

type Cond struct {
    // 用於標識當前結構體在第一次使用后不應該再復制
    // 用於 go vet 編譯檢查
	noCopy noCopy
	// Cond 基於的鎖
	L Locker
    // 一個基於ticket的通知列表
    // 保存了 goroutine 信息的雙向鏈表
	notify  notifyList
	// 保證運行時發生拷貝拋出  panic
	// 在第一次生成時,初始化為 Cond 地址,如果發生復制,復制對象的地址和當前地址將會不同
	checker copyChecker
}

sync.Cond 提供 3 個方法:

  • Broadcast():喚醒所有等待 Cond 的 goroutine。不需要在鎖的保護下進行。
  • Signal():喚醒一個等待 Cond 的 goroutine。不需要在鎖的保護下進行。
  • Wait():解鎖互斥鎖,掛起當前 goroutine。當 Broadcast 或 Signal 喚醒這個 goroutine,Wait 在返回前會再鎖定互斥鎖。因此 Wait() 需要在鎖的保護下進行。
var lock sync.RWMutex
var sendCond, recvCond *sync.Cond

func init() {
    sendCond = sync.NewCond(&lock)
    recvCond = sync.NewCond(&lock.RLock()) // 獲取讀寫鎖中的讀鎖
}

func send() {
    lock.Lock()
    for !writeCondition() {
        sendCond.Wait()
    }
    writeResource()
    lock.Unlock()
    recvCond.Signal()// 如果有多個接收的 goroutine 就使用 recvCond.Broadcast()
}

func receive() {
    lock.Lock()
    for !readCondition() {
        recvCond.Wait()
    }
    receiveResource()
    lock.Unlock()
    sendCond.Signal()// 如果有多個發送的 goroutine 就使用Broadcast()
}

有時 sync.Cond 的功能用 channel 也能實現,不過 channel 的意義更多地在於傳遞數據,而 sync.Cond 的意義在於協程的協作;並且 sync.Cond 更為底層,效率更高。

Tips
  • Cond 在第一次使用后不能復制。
  • 條件變量的通知具有即時性。如果發送通知的時候沒有 goroutine 為此等待,該通知就會被直接丟棄。
  • Signal() 和 Broadcast() 需要在非鎖定的情況下調用,因為 Wait() 的調用方處於阻塞狀態,可能錯過通知。
  • Wait() 的調用需要基於鎖定狀態。

sync.Cond.Wait()

func (c *Cond) Wait() {
    // 檢查是否發生復制
	c.checker.check()
	// 將當前 gorouitne 加入當前條件變量的通知隊列
	t := runtime_notifyListAdd(&c.notify)
	c.L.Unlock()
	// 阻塞當前的 goroutine,直至收到通知
	runtime_notifyListWait(&c.notify, t)
	// 收到通知后,加鎖,進入臨界區
	c.L.Lock()
}

為什么要由調用方先加鎖,再由Wait()解鎖?
調用方在對共享資源的條件進行判斷時,保證共享資源的狀態不被修改,因此進行加鎖。
而當共享資源不滿足當前goroutine的條件時,需要讓出共享資源的執行權,以便其他 goroutine 對其進行修改,因此進行解鎖。

為什么使用for循環多次多次檢查共享資源條件?

  1. 如果存在多個 goroutine 同時等待通知,最終只有一個 goroutine 可以成功獲得執行權限。那么其他的 goroutine 應該在檢查不滿足執行條件后繼續等待。
  2. 共享資源存在多種狀態,狀態改變通知是基於鎖的,無法實現更細膩的判斷。這時需要每個 goroutine 對自己所需的狀態反復檢查。
  3. 即使共享資源的狀態只有兩個,並且每種狀態都只有一個 goroutine 在關注,如上文展示,也應當使用 for 循環。因為一個 gorouinte 即使沒有收到條件通知,也可能被喚醒。這是多核 CPU 計算機硬件層面的調度機制。

sync.Cond 的應用場景

  1. 條件變量適合保護那些可執行兩個對立操作的共享資源。比如,一個既可讀又可寫的共享文件。又比如,既有生產者又有消費者的產品池。
  2. 對於有着對立操作的共享資源(比如一個共享文件),我們通常需要基於同一個讀寫鎖的兩個條件變量(比如 rcond 和 wcond)分別保護讀操作和寫操作(比如 rcond 保護讀,wcond 保護寫)。讀操作在操作完成后要向 wcond 發通知;寫操作在操作完成后要向 rcond 發通知。
// 針對讀寫操作的控制只在初始化時有所變化
var lock sync.RWMutex
var sendCond, recvCond *sync.Cond

func init() {
    sendCond = sync.NewCond(&lock)
    recvCond = sync.NewCond(&lock.RLocker())
}

atomic operation(原子操作)

互斥鎖可以保證臨界區中代碼的串行執行,但卻不能保證這些代碼執行的原子性(atomicity)。
只有原子操作才能保證代碼片段的原子性,原子操作是一個單一的匯編指令代表,來自芯片級別的支持。
針對同一共享資源的原子操作不能同時進行,針對不同共享資源的原子操作可以同時進行。
由於原子操作是無法終端的,因此原子操作的代碼必須是非惡意的(不會獨占CPU)、正確的、不會意外掛起機器。過於復雜的操作可能會帶來安全隱患),因此內核只提供針對二進制位和整數的原子操作。
sync/atomic 提供了以下操作:

  • 加法(add)
  • 比較並交換(compare and swap,簡稱 CAS)
  • 加載(load)
  • 存儲(store)
  • 交換(swap)

支持的數據類型有:

  • int32
  • int64
  • uint32
  • uint64
  • uintptr
  • unsafe.Pointer

原子操作可以創建一個同步標志,用於向程序里多個 goroutine 通知某個特殊狀態。
CAS 包含2步操作,但 Load、Store 這類操作只有一步,不具原子性嗎?
即使像 a = 1 這種簡單的賦值操作也並不一定能夠一次完成。如果右邊的值的存儲寬度超出了計算機的字寬,那么實際的步驟就會多於一個(或者說底層指令多於一個)。比如,你計算機是32位的,但是你要把一個Int64類型的數賦給變量a,那么底層指令就肯定多於一個。在這種情況下,多個底層指令的執行期間是可以被打斷的,也就是說CPU在這時可以被切換到別的任務上。如果新任務恰巧要讀寫這個變量a,那么就會出現值不完整的問題。況且,就算是 a = 1,操作系統和CPU也都不保證這個操作一定不會被打斷。只要被打斷,就很有可能出現並發訪問上的問題,並發安全性也就被破壞了。
所以,當有多個goroutine在並發的讀寫同一變量時,它們之間就可能會造成干擾。這種操作不是原子性,並發安全性也無法得到保障。

原子操作的實現原理

對於單核處理器單核系統,只需保證指令序列不被打斷即可實現原子操作。對於簡單的原子操作,CPU提供了單條指令INC、XCHG等。對於復雜的原子操作,需要自旋鎖spinlock保證指令序列執行不被中斷。
對於多核處理器或多核系統,除了保證指令不被中斷,還需要保證這一指令序列不會受到同處理器上其他核或其他處理器的影響。這需要硬件級別的支持。
在x86架構中,提供了指令前綴LOCK。LOCK保證了指令不會受其他處理器或cpu核的影響。CPU芯片上有一條引線#HLOCK pin,如果匯編語言的程序中在一條指令前面加上前綴"LOCK",經過匯編以后的機器代碼就使CPU在執行這條指令的時候把#HLOCK pin的電位拉低,持續到這條指令結束時放開,從而把總線鎖住,這樣同一總線上別的CPU就暫時不能通過總線訪問內存,從而保證這條指令在多處理器環境中的原子性。
在比較早期的cpu型號中(intel486,pentium系列),多核處理器下阻止其他核對某塊內存區域的修改,是通過鎖住總線來實現的。
現在,大多數的x86處理器都支持通過cache coherency機制來實現原子操作功能,保證了多處理器多核系統下的原子操作的正確性,不需要鎖住總線。在MIPS和ARM架構下,還支持原子操作的LL/SC指令實現。
cache coherency機制利用了MESI緩存協議的狀態。每個cache line存在四種狀態:

  • Modified代表該cache line為該cpu核獨有,且尚未寫回到內存
  • Exclusive代表該cache line為該cpu核獨有,且與內存一致。
  • Shared代表該cache line為多核共享,且與內存一致。
  • Invalid代表緩存失效。

處理器系統中多個處理器之間通過快速通道直接通信。
當運行在某個cpu核的線程准備讀取某個cache line的內容時:

  • 如果狀態處於M,E,S,直接讀取即可。
  • 如果狀態處於I,則需要向其他cpu核廣播讀消息,在接受到其他cpu核的讀響應后,更新cache line,並將狀態設置為S。

當線程准備寫入某個cache line時:

  • 如果處於M狀態,直接寫入。
  • 如果處於E狀態,寫入並將cache line狀態改為M。
  • 如果處於S,則需要向其他cpu核廣播使無效消息,並進入E狀態,寫入修改,后進入M狀態。
  • 如果處於I,則需要向其他cpu核廣播讀消息核使無效消息,在收集到讀響應后,更新cache line。在收集到使無效響應后,進入E狀態,寫入修改,后進入M狀態。

因此,只要保持cache line的M和E狀態,此時就可以阻止其他cpu核對該塊內存的修改,而不用鎖住整個總線。

uint 類型的減法原子操作
// 法一
var num uint32
num = 100
delta := int32(-3)
atomic.AddUint32(&num, uint32(delta))
fmt.Println(num) // 97

// 法二
var num uint32
num = 100
delta := -3
atomic.AddUint32(&num, ^uint32(-delta-1))
fmt.Println(num) // 97

自旋鎖(Spinlock)

自旋鎖(spinlock)是指當一個線程在獲取鎖的時候,如果鎖已經被其它線程獲取,那么該線程將循環等待,然后不斷的判斷鎖是否能夠被成功獲取,直到獲取到鎖才會退出循環。
獲取鎖的線程一直處於活躍狀態,但是並沒有執行任何有效的任務,使用這種鎖會造成busy-waiting。
自旋鎖利用了 CPU 層面的指令,因此性能比互斥鎖高很多。適合簡單對象的操作以及沖突較少的場景。

var num int32 = 10
for {
 if atomic.CompareAndSwapInt32(&num, 10, 0) {
  fmt.Println("The second number has gone to zero.")
  break
 }
 time.Sleep(time.Millisecond * 500)
}

這在效果上與互斥鎖有些類似。我們在使用互斥鎖的時候,總是假設共享資源的狀態會被其他的 goroutine 頻繁地改變。而for語句加 CAS 操作的假設往往是:共享資源狀態的改變並不頻繁,或者,它的狀態總會變成期望的那樣。這是一種更加樂觀,或者說更加寬松的做法。

Tips
  • 當真正使用了一個 atomic.Value 變量(第一次賦值)后,就不應該再進行復制操作了。
  • 不能存儲 nil 值。不過對於接口類型的變量,它的動態值是 nil,動態類型不是 nil,它就不是 nil。
  • 對於一個原子變量,向它存儲的第一個值決定了它的可存儲類型。即使是同一接口的不同類型,也是禁止更換的。對於暴露給外部的存儲函數,應當先判斷其存儲值的合法性。
  • 存儲引用類型時,注意不要把指針暴露給外部。

sync.Pool

sync.Pool 是一個臨時對象池。初次使用后禁止復制。它存儲的對象應該滿足以下特征:

  • 不需要持久使用,對程序來說可有可無,對象的創建和銷毀不會影響程序功能。因為 Go 語言的 GC 每次執行時都會將臨時對象池清空。
  • 池子中的每一個對象都可以相互替代。

因此,sync.Pool 很適合作為緩存池。
GC 是如何清理臨時對象池的?
sync 初始化時,向運行時系統注冊一個函數,這個函數用於清除所有已創建的臨時對象池中的值。這個函數在每次 GC 運行時被調用。sync 包中有一個全局變量 allPools 負責保存使用中的池列表,供池清理函數使用。

Pool 的內部實現

type Pool struct {
	noCopy noCopy

	local     unsafe.Pointer // per-P pool, 實際類型是 [P]poolLocal
	localSize uintptr        // size of the local array

	victim     unsafe.Pointer // local from previous cycle
	victimSize uintptr        // size of victims array

	// 創建一個臨時對象
	New func() interface{}
}

// Local per-P Pool
type poolLocalInternal struct {
	private interface{} // 只能由當前 P 使用
	shared  poolChain   // 雙向隊列,Local P can pushHead/popHead; any P can popTail.
}

Pool 提供了 Put 和 Get 方法用於存取臨時對象。存取臨時對象時,優先操作private,其次是 poolLocal 的共享臨時對象列表 shared (先訪問 goroutine 關聯的 P 對應的 poolLocal,再訪問非關聯的 poolLocal )。當 Get 無法找到可用的臨時對象,就會調用 New 創建以一個新的臨時對象。

sync.Map

sync.Map 是一個並發安全的字典。

// 可自定義鍵類型和值類型的並發安全字典

type ConcurrentMap struct {
	m         sync.Map
	keyType   reflect.Type
	valueType reflect.Type
}

func NewConcurrentMap(keyType, valueType reflect.Type) (*ConcurrentMap, error) {
	if keyType == nil {
		return nil, errors.New("nil key type")
	}
	if !keyType.Comparable() {
		return nil, fmt.Errorf("incomparable key type: %s", keyType)
	}
	if valueType == nil {
		return nil, errors.New("nil value type")
	}
	cMap := &ConcurrentMap{
		keyType:   keyType,
		valueType: valueType,
	}
	return cMap, nil
}

func (cMap *ConcurrentMap) Delete(key interface{}) {
	if reflect.TypeOf(key) != cMap.keyType {
		return
	}
	cMap.m.Delete(key)
}

func (cMap *ConcurrentMap) Load(key interface{}) (value interface{}, ok bool) {
	if reflect.TypeOf(key) != cMap.keyType {
		return
	}
	return cMap.m.Load(key)
}

func (cMap *ConcurrentMap) LoadOrStore(key, value interface{}) (actual interface{}, loaded bool) {
	if reflect.TypeOf(key) != cMap.keyType {
		panic(fmt.Errorf("wrong key type: %v", reflect.TypeOf(key)))
	}
	if reflect.TypeOf(value) != cMap.valueType {
		panic(fmt.Errorf("wrong value type: %v", reflect.TypeOf(value)))
	}
	actual, loaded = cMap.m.LoadOrStore(key, value)
	return
}

func (cMap *ConcurrentMap) Range(f func(key, value interface{}) bool) {
	cMap.m.Range(f)
}

func (cMap *ConcurrentMap) Store(key, value interface{}) {
	if reflect.TypeOf(key) != cMap.keyType {
		panic(fmt.Errorf("wrong key type: %v", reflect.TypeOf(key)))
	}
	if reflect.TypeOf(value) != cMap.valueType {
		panic(fmt.Errorf("wrong value type: %v", reflect.TypeOf(value)))
	}
	cMap.m.Store(key, value)
}

sync.Map 的內部實現

type Map struct {
	mu Mutex

	// read contains the portion of the map's contents that are safe for
	// concurrent access (with or without mu held).
	//
	// The read field itself is always safe to load, but must only be stored with
	// mu held.
	//
	// Entries stored in read may be updated concurrently without mu, but updating
	// a previously-expunged entry requires that the entry be copied to the dirty
	// map and unexpunged with mu held.
	read atomic.Value // readOnly

	// dirty contains the portion of the map's contents that require mu to be
	// held. To ensure that the dirty map can be promoted to the read map quickly,
	// it also includes all of the non-expunged entries in the read map.
	//
	// Expunged entries are not stored in the dirty map. An expunged entry in the
	// clean map must be unexpunged and added to the dirty map before a new value
	// can be stored to it.
	//
	// If the dirty map is nil, the next write to the map will initialize it by
	// making a shallow copy of the clean map, omitting stale entries.
	dirty map[interface{}]*entry

	// misses counts the number of loads since the read map was last updated that
	// needed to lock mu to determine whether the key was present.
	//
	// Once enough misses have occurred to cover the cost of copying the dirty
	// map, the dirty map will be promoted to the read map (in the unamended
	// state) and the next store to the map will make a new dirty copy.
	misses int
}

Map.read 相當於字典的快照,支持更新和查詢操作,原子操作,不需要持有鎖。Map.dirty 是原生字典,支持增刪改查操作,所有操作需要持有鎖 mu
Map.readMap.dirty 中存儲的鍵值都是指針,而不是基本值。
查找鍵值對時,首先去 read 字典查找,如果沒找到,再加鎖去 dirty 字典查找。
存儲鍵值對時,如果 read 字典中存在這個鍵,就直接更新。如果這個鍵被標記為“已刪除”,則保存到 dirty 字典,清除“已刪除”的標記。
刪除鍵值時,如果只讀字典中不存在該鍵值對,就直接在 dirty 字典中進行刪除。如果只讀字典中存在該鍵值對,還要對其進行邏輯刪除(標記為“已刪除”)。
在臟字典中查找鍵值對次數足夠多的時候,sync.Map 會把臟字典直接作為只讀字典,保存在它的 read 字段中,然后把代表臟字典的 dirty 字段的值置為 nil。在這之后,一旦再有新的鍵值對存入,它就會依據只讀字典去重建臟字典。這個時候,它會把只讀字典中已被邏輯刪除的鍵值對過濾掉。
總的來說,只讀字典可能只包含部分鍵值對(含邏輯刪除鍵值對),而臟字典中始終包含全量的鍵值對(不含邏輯刪除鍵值對)。
sync.Map 適用於讀多寫少的情況,如果寫數據比較頻繁可以參考:https://github.com/orcaman/concurrent-map

channel

作為 Go 語言的數據類型,channel提供了在多個 goroutine 間發送和接收共享資源的方法。channel 來自於一種叫做 通信順序進程(Communicating Sequential Processes,CSP) 的模型。
channel 從設計上規避數據競爭,將 goroutine 區分為生產者和消費者。但它本身不能保證多個 goroutine 並發讀寫的數據安全。如果 channel 中傳遞的是數據的引用,並且這份數據的讀、寫由多個 goroutine 完成,仍然需要額外的處理來保證數據安全。
channel 可能是開發者最熟悉的一個同步工具,它可以實現一些高級功能。以下提供一個簡單的范例:

ch := make(chan string)
go func(){
    // process task
    defer close(ch)
    ch <- result
}
res, open := <-ch
fmt.Println(res, open)

網上關於 channel 的優秀文章很多,這篇文章 golang channel 使用總結 介紹了 channel 的實現原理和常見使用場景,給出了優雅關閉 channel 的幾個范例。

Tips
  • 對非緩沖通道的讀寫操作會造成阻塞,對有緩沖且緩沖已滿的通道的讀寫操作會造成阻塞。
  • 對已關閉通道發送數據將引發 panic,從已關閉通道讀取數據永遠會成功,如果通道內沒有元素,將返回類型的初始值。
  • 對一個值為 nil 的通道的讀寫操作會造成阻塞。
  • 關閉一個值為 nil 的通道會引發 panic。
  • 重復關閉通道會引發 panic。
  • 延遲關閉:關閉通道后,通道狀態不會立刻更改為false,直至通道為空。

goroutine的協作工具

sync.WaitGroup

用於同步 goroutine 的協作流程。它可以使一個 goroutine 在其他協程完成后再繼續執行后續任務。
開始使用后禁止復制。

var wg sync.WaitGroup
func main() {
    wg.Add(3)
    for i := 0; i < 3; i++ {
        go doSomething()
    }
    wg.Wait()
}
func doSomething() {
    defer wg.Done()
}
Tips
  • 禁止同時調用 WaitGroup 的 Add() 和 Wait(),即杜絕並發執行用 WaitGroup 的方法。原因是在 Wait() 執行時更改其計數器的值會引發 panic。

sync.Once

執行首次被調用時的入參函數,並且只執行一次。

Once() 中的fail-fast機制

func (o *Once) Do(f func()) {
	// Note: Here is an incorrect implementation of Do:
	//
	//	if atomic.CompareAndSwapUint32(&o.done, 0, 1) {
	//		f()
	//	}
	//
	// Do guarantees that when it returns, f has finished.
	// This implementation would not implement that guarantee:
	// given two simultaneous calls, the winner of the cas would
	// call f, and the second would return immediately, without
	// waiting for the first's call to f to complete.
	// This is why the slow path falls back to a mutex, and why
	// the atomic.StoreUint32 must be delayed until after f returns.

	if atomic.LoadUint32(&o.done) == 0 {
		// Outlined slow-path to allow inlining of the fast-path.
		o.doSlow(f)
	}
}

func (o *Once) doSlow(f func()) {
	o.m.Lock()
	defer o.m.Unlock()
	if o.done == 0 {
		defer atomic.StoreUint32(&o.done, 1)
		f()
	}
}

由於 Once.Do() 保證在返回前 f() 已經執行完成,如果存在多個 goroutine 並發調用 Do(),會導致除了獲勝者,其余 goroutine 都被阻塞在 o.m.Lock() 上。如果 f() 阻塞,可能會導致死鎖。
Once.Do() 不保證 f() 執行成功。

context.Context

func coordinateWithContext() {
    cxt, cancelFunc := context.WithCancel(context.Background())
    // 啟動 3 個具有相關任務的協程
    // 如果有一個協程出現問題,取消其他協程
    for i := 1; i < 3; i++ {
        go func() {
            r, e := fn(ctx)
            if e != nil {
                cancelFunc()
            }
        }
    }
    time.Sleep(10 * time.Second)
    fmt.Println("End.")
}

func fn(ctx context.Context) string, error {
    resp := make(chan string)
    err := make(chan error)
    go func(){
        responseString, e := doSomething()
        if e != nil {
            err <- e
        } else {
            resp <- responseString
        }
    }()
    select {
        case <- ctx.Done():
            return "", ctx.Err()
        case r:= <- resp
            return r, nil
        case e := <- err
            return "", e
    }
}

Context.Done() 返回一個 <-chan struct{} 類型的值,這是一個接收通道。調用 cancelFunc() 時,該通道會關閉,阻塞的接收操作會立刻返回。
Context 類型值的撤銷操作會聯動它的子值。

Context 類型還提供了 WithDeadline()WithTimeout() 方法,生成擁有生命周期的 Context 類型。
此外,Context.WithValue() 可以提供協程間的數據傳輸功能。在 Context 中查詢數據時,先在當前 Context 中查找,如果沒找到,再去父值中查找。不過 Context 不提供數據更新的方法,只能通過 在子值中覆蓋同名數據、或撤銷 Context 丟棄數據 間接實現。

推薦閱讀

以上出現過的推文在此再做匯總:

參考文章:

更新記錄

2020.07.25 初版。
2020.07.28 修改了疏漏,補充了細節。添加了小節:摘要。添加了小節:goroutine同步方式:channel。添加小節:推薦閱讀。
2020.08.28 修改了原子操作小節,添加了原子操作的實現原理。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM