更簡的並發代碼,更強的並發控制


有沒感覺 Gosync 包不夠用?有沒遇到類型沒有 sync/atomic 支持?

我們一起看看 go-zerosyncx 包對標准庫的一些增值補充。

https://github.com/tal-tech/go-zero/tree/master/core/syncx

name 作用
AtomicBool bool類型 原子類
AtomicDuration Duration有關 原子類
AtomicFloat64 float64類型 原子類
Barrier 欄柵【將加鎖解鎖包裝】
Cond 條件變量
DoneChan 優雅通知關閉
ImmutableResource 創建后不會修改的資源
Limit 控制請求數
LockedCalls 確保方法的串行調用
ManagedResource 資源管理
Once 提供 once func
OnceGuard 一次性使用的資源管理
Pool pool,簡單的池
RefResource 引用計數的資源
ResourceManager 資源管理器
SharedCalls 類似 singflight 的功能
SpinLock 自旋鎖:自旋+CAS
TimeoutLimit Limit + timeout 控制

下面開始對以上庫組件做分別介紹。

atomic

因為沒有 泛型 支持,所以才會出現多種類型的原子類支持。以下采用 float64 作為例子:

func (f *AtomicFloat64) Add(val float64) float64 {
	for {
		old := f.Load()
		nv := old + val
		if f.CompareAndSwap(old, nv) {
			return nv
		}
	}
}

func (f *AtomicFloat64) CompareAndSwap(old, val float64) bool {
	return atomic.CompareAndSwapUint64((*uint64)(f), math.Float64bits(old), math.Float64bits(val))
}

func (f *AtomicFloat64) Load() float64 {
	return math.Float64frombits(atomic.LoadUint64((*uint64)(f)))
}

func (f *AtomicFloat64) Set(val float64) {
	atomic.StoreUint64((*uint64)(f), math.Float64bits(val))
}
  • Add(val):如果 CAS 失敗,不斷for循環重試,獲取 old val,並set old+val;

  • CompareAndSwap(old, new):調用底層 atomicCAS

  • Load():調用 atomic.LoadUint64 ,然后轉換

  • Set(val):調用 atomic.StoreUint64

至於其他類型,開發者想自己擴展自己想要的類型,可以依照上述,基本上調用原始 atomic 操作,然后轉換為需要的類型,比如:遇到 bool 可以借助 0, 1 來分辨對應的 false, true

Barrier

這里 Barrier 只是將業務函數操作封裝,作為閉包傳入,內部將 lock 操作的加鎖解鎖自行解決了【防止開發者加鎖了忘記解鎖】

func (b *Barrier) Guard(fn func()) {
	b.lock.Lock()
	defer b.lock.Unlock()
  // 自己的業務邏輯
	fn()
}

Cond/Limit/TimeoutLimit

這個數據結構和 Limit 一起組成了 TimeoutLimit ,這里將這3個一起講:

func NewTimeoutLimit(n int) TimeoutLimit {
	return TimeoutLimit{
		limit: NewLimit(n),
		cond:  NewCond(),
	}
}

func NewLimit(n int) Limit {
	return Limit{
		pool: make(chan lang.PlaceholderType, n),
	}
}
  • limit 這里是有緩沖的 channel
  • cond 是無緩沖的;

所以這里結合名字來理解:因為 Limit 是限制某一種資源的使用,所以需要預先在資源池中放入預置數量的資源;Cond 類似閥門,需要兩邊都准備好,才能進行數據交換,所以使用無緩沖,同步控制。

這里我們看看 stores/mongo 中關於 session 的管理,來理解 資源控制:

func (cs *concurrentSession) takeSession(opts ...Option) (*mgo.Session, error) {
  // 選項參數注入
	...
  // 看 limit 中是否還能取出資源
	if err := cs.limit.Borrow(o.timeout); err != nil {
		return nil, err
	} else {
		return cs.Copy(), nil
	}
}

func (l TimeoutLimit) Borrow(timeout time.Duration) error {
  // 1. 如果還有 limit 中還有資源,取出一個,返回
	if l.TryBorrow() {
		return nil
	}
	// 2. 如果 limit 中資源已經用完了
	var ok bool
	for {
    // 只有 cond 可以取出一個【無緩存,也只有 cond <- 此條才能通過】
		timeout, ok = l.cond.WaitWithTimeout(timeout)
    // 嘗試取出一個【上面 cond 通過時,就有一個資源返回了】
    // 看 `Return()`
		if ok && l.TryBorrow() {
			return nil
		}
		// 超時控制
		if timeout <= 0 {
			return ErrTimeout
		}
	}
}

func (l TimeoutLimit) Return() error {
  // 返回去一個資源
	if err := l.limit.Return(); err != nil {
		return err
	}
	// 同步通知另一個需要資源的協程【實現了閥門,兩方交換】
	l.cond.Signal()
	return nil
}

資源管理

同文件夾中還有 ResourceManager,從名字上類似,這里將兩個組件放在一起講解。

先從結構上:

type ManagedResource struct {
  // 資源
	resource interface{}
	lock     sync.RWMutex
  // 生成資源的邏輯,由開發者自己控制
	generate func() interface{}
  // 對比資源
	equals   func(a, b interface{}) bool
}

type ResourceManager struct {
  // 資源:這里看得出來是 I/O,
	resources   map[string]io.Closer
	sharedCalls SharedCalls
  // 對資源map互斥訪問
	lock        sync.RWMutex
}

然后來看獲取資源的方法簽名:

func (manager *ResourceManager) GetResource(key, create func() (io.Closer, error)) (io.Closer, error)

// 獲取一個資源(有就直接獲取,沒有生成一個)
func (mr *ManagedResource) Take() interface{}
// 判斷這個資源是否不符合傳入的判斷要求,不符合則重置
func (mr *ManagedResource) MarkBroken(resource interface{})
  1. ResourceManager 使用 SharedCalls 做防重復請求,並將資源緩存在內部的 sourMap;另外傳入的 create funcIO 操作有關,常見用在網絡資源的緩存;

  2. ManagedResource 緩存資源沒有 map 而是單一的 interface ,說明只有一份,但是它提供了 Take() 和傳入 generate()說明可以讓開發者自行更新 resource

所以在用途上:

  • ResourceManager:用在網絡資源的管理。如:數據庫連接管理;
  • ManagedResource:用在一些變化資源,可以做資源前后對比,達到更新資源。如:token 管理和驗證

RefResource

這個就和 GC 中引用計數類似:

  • Use() -> ref++
  • Clean() -> ref--; if ref == 0 -> ref clean
func (r *RefResource) Use() error {
  // 互斥訪問
	r.lock.Lock()
	defer r.lock.Unlock()
	// 清除標記
	if r.cleaned {
		return ErrUseOfCleaned
	}
	// 引用 +1
	r.ref++
	return nil
}

SharedCalls

一句話形容:使用SharedCalls可以使得同時多個請求只需要發起一次拿結果的調用,其他請求"坐享其成",這種設計有效減少了資源服務的並發壓力,可以有效防止緩存擊穿

這個組件被反復應用在其他組件中,上面說的 ResourceManager

類似當需要高頻並發訪問一個資源時,就可以使用 SharedCalls 緩存。

// 當多個請求同時使用Do方法請求資源時
func (g *sharedGroup) Do(key string, fn func() (interface{}, error)) (interface{}, error) {
  // 先申請加鎖
  g.lock.Lock()

  // 根據key,獲取對應的call結果,並用變量c保存
  if c, ok := g.calls[key]; ok {
    // 拿到call以后,釋放鎖,此處call可能還沒有實際數據,只是一個空的內存占位
    g.lock.Unlock()
    // 調用wg.Wait,判斷是否有其他goroutine正在申請資源,如果阻塞,說明有其他goroutine正在獲取資源
    c.wg.Wait()
    // 當wg.Wait不再阻塞,表示資源獲取已經結束,可以直接返回結果
    return c.val, c.err
  }

  // 沒有拿到結果,則調用makeCall方法去獲取資源,注意此處仍然是鎖住的,可以保證只有一個goroutine可以調用makecall
  c := g.makeCall(key, fn)
  // 返回調用結果
  return c.val, c.err
}

總結

不重復造輪子,一直是 go-zero 設計主旨之一;也同時將平時業務沉淀到組件中,這才是框架和組件的意義。

關於 go-zero 更多的設計和實現文章,可以持續關注我們。歡迎大家去關注和使用。

項目地址

https://github.com/tal-tech/go-zero

歡迎使用 go-zero 並 star 支持我們!

微信交流群

關注『微服務實踐』公眾號並回復 進群 獲取社區群二維碼。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM