go中x/sync/semaphore解讀


semaphore

semaphore的作用

信號量是在並發編程中比較常見的一種同步機制,它會保證持有的計數器在0到初始化的權重之間,每次獲取資源時都會將信號量中的計數器減去對應的數值,在釋放時重新加回來,當遇到計數器大於信號量大小時就會進入休眠等待其他進程釋放信號。

go中的semaphore,提供sleepwakeup原語,使其能夠在其它同步原語中的競爭情況下使用。當一個goroutine需要休眠時,將其進行集中存放,當需要wakeup時,再將其取出,重新放入調度器中。

go中本身提供了semaphore的相關方法,不過只能在內部調用

// go/src/sync/runtime.go
func runtime_Semacquire(s *uint32)

func runtime_SemacquireMutex(s *uint32, lifo bool, skipframes int)

func runtime_Semrelease(s *uint32, handoff bool, skipframes int)

擴展包golang.org/x/sync/semaphore提供了一種帶權重的信號量實現方式,我們可以按照不同的權重對資源的訪問進行管理。

如何使用

通過信號量來限制並行的goroutine數量,達到最大的maxWorkers數量,Acquire將會阻塞,直到其中一個goroutine執行完成,釋放出信號量。

// Example_workerPool演示如何使用信號量來限制
// 用於並行任務的goroutine。
func main() {
	ctx := context.Background()

	var (
		maxWorkers = runtime.GOMAXPROCS(0)
		sem        = semaphore.NewWeighted(int64(maxWorkers))
		out        = make([]int, 32)
	)

	// Compute the output using up to maxWorkers goroutines at a time.
	for i := range out {
		// When maxWorkers goroutines are in flight, Acquire blocks until one of the
		// workers finishes.
		if err := sem.Acquire(ctx, 1); err != nil {
			log.Printf("Failed to acquire semaphore: %v", err)
			break
		}

		go func(i int) {
			defer sem.Release(1)
			// doSomething
			out[i] = i + 1
		}(i)
	}

	// Acquire all of the tokens to wait for any remaining workers to finish.
	//
	// If you are already waiting for the workers by some other means (such as an
	// errgroup.Group), you can omit this final Acquire call.
	if err := sem.Acquire(ctx, int64(maxWorkers)); err != nil {
		log.Printf("Failed to acquire semaphore: %v", err)
	}

	fmt.Println(out)
}

分析下原理

type waiter struct {
	// 信號量的權重
	n     int64
	// 獲得信號量后關閉
	ready chan<- struct{}
}

// NewWeighted使用給定的值創建一個新的加權信號量
// 並發訪問的最大組合權重。
func NewWeighted(n int64) *Weighted {
	w := &Weighted{size: n}
	return w
}

// 加權提供了一種方法來綁定對資源的並發訪問。
// 呼叫者可以請求以給定的權重進行訪問。
type Weighted struct {
	// 表示最大資源數量,取走時會減少,釋放時會增加
	size    int64
	// 計數器,記錄當前已使用資源數,值范圍[0 - size]
	cur     int64
	mu      sync.Mutex
	// 等待隊列,表示申請資源時由於可使用資源不夠而陷入阻塞等待的調用者列表
	waiters list.List

Acquire

阻塞的獲取指定權種的資源,如果沒有空閑的資源,會進去休眠等待。

// Acquire獲取權重為n的信號量,阻塞直到資源可用或ctx完成。
// 成功時,返回nil。失敗時返回 ctx.Err()並保持信號量不變。
// 如果ctx已經完成,則Acquire仍然可以成功執行而不會阻塞
func (s *Weighted) Acquire(ctx context.Context, n int64) error {
	s.mu.Lock()
	// 如果資源足夠,並且沒有排隊等待的waiters
	// cur+n,直接返回
	if s.size-s.cur >= n && s.waiters.Len() == 0 {
		s.cur += n
		s.mu.Unlock()
		return nil
	}
	// 資源不夠,err返回
	if n > s.size {
		// 不要其他的Acquire,阻塞在此
		s.mu.Unlock()
		<-ctx.Done()
		return ctx.Err()
	}

	ready := make(chan struct{})
	// 組裝waiter
	w := waiter{n: n, ready: ready}
	// 插入waiters中
	elem := s.waiters.PushBack(w)
	s.mu.Unlock()

	// 阻塞等待,直到資源可用或ctx完成
	select {
	case <-ctx.Done():
		err := ctx.Err()
		s.mu.Lock()
		select {
		case <-ready:
			// 在canceled之后獲取了信號量,不要試圖去修復隊列,假裝沒看到取消
			err = nil
		default:
			s.waiters.Remove(elem)
		}
		s.mu.Unlock()
		return err
		// 等待者被喚醒了
	case <-ready:
		return nil
	}
}

梳理下流程:

1、如果資源夠用並且沒有等待隊列,添加已經使用的資源數;

2、如果超過資源數,拋出err;

3、資源夠用,並且等待隊列,將之后的加入到等待隊列中;

4、阻塞直到資源可用或ctx完成。

TryAcquire

非阻塞地獲取指定權重的資源,如果當前沒有空閑資源,會直接返回false

// TryAcquire獲取權重為n的信號量而不阻塞。
// 成功時返回true。 失敗時,返回false並保持信號量不變。
func (s *Weighted) TryAcquire(n int64) bool {
	s.mu.Lock()
	success := s.size-s.cur >= n && s.waiters.Len() == 0
	if success {
		s.cur += n
	}
	s.mu.Unlock()
	return success
}

TryAcquire獲取權重為n的信號量而不阻塞,相比Acquire少了等待隊列的處理。

Release

用於釋放指定權重的資源,如果有waiters則嘗試去一一喚醒waiter

// Release釋放權值為n的信號量。
func (s *Weighted) Release(n int64) {
	s.mu.Lock()
	s.cur -= n
	// cur的范圍在[0 - size]
	if s.cur < 0 {
		s.mu.Unlock()
		panic("semaphore: bad release")
	}
	s.notifyWaiters()
	s.mu.Unlock()
}

func (s *Weighted) notifyWaiters() {
	// 如果有阻塞的waiters,嘗試去進行一一喚醒 
	// 喚醒的時候,先進先出,避免被資源比較大的waiter被餓死
	for {
		next := s.waiters.Front()
		// 已經沒有waiter了
		if next == nil {
			break
		}

		w := next.Value.(waiter)
		// waiter需要的資源不足
		if s.size-s.cur < w.n {
			// 沒有足夠的令牌供下一個waiter使用。我們可以繼續(嘗試
			// 查找請求較小的waiter),但在負載下可能會導致
			// 飢餓的大型請求;相反,我們留下所有剩余的waiter阻塞
			//
			// 考慮一個用作讀寫鎖的信號量,帶有N個令牌,N個reader和一位writer
			// 每個reader都可以通過Acquire(1)獲取讀鎖。
			// writer寫入可以通過Acquire(N)獲得寫鎖定,但不包括所有的reader。
			// 如果我們允許讀者在隊列中前進,writer將會餓死-總是有一個令牌可供每個讀者。
			break
		}

		s.cur += w.n
		s.waiters.Remove(next)
		close(w.ready)
	}
}

對於waiters的喚醒,遵循的原則總是先進先出。當有10個資源可以被使用,第一個waiter需要100個資源,第二個waiter需要1個資源。不會讓第二個先釋放,必須等待第一個waiter被釋放。這樣避免需要資源比較大waiter的被餓死,因為這樣需要資源數比較小的waiter,總是可以被最先釋放,需要資源比較大的waiter,就沒有獲取資源的機會了。

總結

AcquireTryAcquire都可用於獲取資源,Acquire是可以阻塞的獲取資源,TryAcquire只能非阻塞的獲取資源;

Release對於waiters的喚醒原則,總是先進先出,避免資源需求比較大的waiter被餓死;

參考

【Golang並發同步原語之-信號量Semaphor】https://blog.haohtml.com/archives/25563
【Go並發編程實戰--信號量的使用方法和其實現原理】https://juejin.cn/post/6906677772479889422

本文作者:liz
本文鏈接https://boilingfrog.github.io/2021/04/01/x-sync.semaphore/
版權聲明:本文為博主原創文章,遵循 CC 4.0 BY-SA 版權協議,轉載請附上原文出處鏈接和本聲明。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM