Sentinel-Go 源碼系列(三)滑動時間窗口算法的工程實現


要說現在工程師最重要的能力,我覺得工程能力要排第一。

就算現在大廠面試經常要手撕算法,也是更偏向考查代碼工程實現的能力,之前在群里看到這樣的圖片,就覺得很離譜。

image

算法與工程實現

在 Sentinel-Go 中,一個很核心的算法是流控(限流)算法。

流控可能每個人都聽過,但真要手寫一個,還是有些困難。為什么流控算法難寫?以我的感覺是算法和工程實現上存在一定差異,雖然算法好理解,但卻沒法照着實現。

舉個例子,令牌桶算法很好理解,只需給定一個桶,以恆定的速率往桶內放令牌,滿了則丟棄,執行任務前先去桶里拿令牌,只有拿到令牌才可以執行,否則拒絕。

如果實現令牌桶,按道理應該用一個單獨線程(或進程)往桶里放令牌,業務線程去桶里取,但真要這么實現,怎么保證這個單獨線程能穩定執行,萬一掛了豈不是很危險?

所以工程實現上和算法原本肯定存在一定的差異,這也是為什么需要深入源碼的一個原因。

滑動時間窗口的演進

通常來說,流控的度量是按每秒的請求數,也就是 QPS

QPS:query per second,指每秒查詢數,當然他的意義已經泛化了,不再特指查詢,可以泛指所有請求。如果非要區分,TPS 指每秒事務數,即寫入數,或 RPS,每秒請求數,本文不分這么細,統計叫QPS。

當然也有按並發數來度量,並發數的流控就非常簡單

並發數流控

並發是一個瞬時概念,它跟時間沒有關系。和進程中的線程數、協程數一樣,每次取的時候只能拿到一個瞬間的快照,但可能很快就變化了。

並發數怎么定義?可以近似認為進入業務代碼開始就算一個並發,執行完這個並發就消失。

image

這樣說來,實現就非常簡單了,只需要定義一個全局變量,責任鏈開始時對這個變量原子增1,並獲取當前並發數的一個快照,判斷並發數是否超限,如果超限則直接阻斷,執行完了別忘了原子減1即可,由於太過簡單,就不需要放代碼了。

固定時間窗口

參考並發數流控,當需要度量 QPS 時,是否也可以利用這樣的思想呢?

由於 QPS 有時間的度量,第一直覺是和並發數一樣弄個變量,再起個單獨線程每隔 1s 重置這個變量。

但單獨線程始終不放心,需要稍微改一下。

如果系統有一個起始時間,每次請求時,獲取當前時間,兩者之差,就能算出當前處於哪個時間窗口,這個時間窗口單獨計數即可。

image

如果稍微思考下,你會發現問題不簡單,如下圖,10t 到20t 只有60個請求,20t到30t之間只有80個請求,但有可能16t到26t之間有110個請求,這就很有可能把系統打垮。

image

滑動時間窗口

為了解決上面的問題,工程師想出了一個好辦法:別固定時間窗口,以當前時間往前推算窗口

image

但問題又來了,這該怎么實現呢?

滑動時間窗口工程實現

在工程實現上,可以將時間划分為細小的采樣窗口,緩存一段時間的采樣窗口,這樣每當請求來的時候,只需要往前拿一段時間的采樣窗口,然后求和就能拿到總的請求數。

image

Sentinel-Go 滑動時間窗口的實現

前方代碼高能預警~

Sentinel-Go 是基於 LeapArray 實現的滑動窗口,其數據結構如下

type LeapArray struct {
	bucketLengthInMs uint32 // bucket大小
	sampleCount      uint32 // bucket數量
	intervalInMs     uint32 // 窗口總大小
	array            *AtomicBucketWrapArray // bucket數組
	updateLock mutex // 更新鎖
}

type AtomicBucketWrapArray struct {
	base unsafe.Pointer // 數組的起始地址
	length int // 長度,不能改變
	data   []*BucketWrap // 真正bucket的數據
}

type BucketWrap struct {
	BucketStart uint64 // bucket起始時間
	Value atomic.Value // bucket數據結構,例如 MetricBucket
}

type MetricBucket struct {
	counter        [base.MetricEventTotal]int64 // 計數數組,可放不同類型
	minRt          int64 // 最小RT
	maxConcurrency int32 // 最大並發數
}

再看下是如何寫入指標的,例如當流程正常通過時

// ①
sn.AddCount(base.MetricEventPass, int64(count))

// ②
func (bla *BucketLeapArray) AddCount(event base.MetricEvent, count int64) {
	bla.addCountWithTime(util.CurrentTimeMillis(), event, count)
}

// ③
func (bla *BucketLeapArray) addCountWithTime(now uint64, event base.MetricEvent, count int64) {
	b := bla.currentBucketWithTime(now)
	if b == nil {
		return
	}
	b.Add(event, count)
}

// ④
func (mb *MetricBucket) Add(event base.MetricEvent, count int64) {
	if event >= base.MetricEventTotal || event < 0 {
		logging.Error(errors.Errorf("Unknown metric event: %v", event), "")
		return
	}
	if event == base.MetricEventRt {
		mb.AddRt(count)
		return
	}
	mb.addCount(event, count)
}

// ⑤
func (mb *MetricBucket) addCount(event base.MetricEvent, count int64) {
	atomic.AddInt64(&mb.counter[event], count)
}

取到相應的 bucket,然后寫入相應 event 的 count,對 RT 會特殊處理,因為有一個最小 RT 需要處理。

重點看是如何取到相應的 bucket 的:

func (bla *BucketLeapArray) currentBucketWithTime(now uint64) *MetricBucket {
	// ①根據當前時間取bucket
	curBucket, err := bla.data.currentBucketOfTime(now, bla)
	...
	b, ok := mb.(*MetricBucket)
	if !ok {
		...
		return nil
	}
	return b
}

func (la *LeapArray) currentBucketOfTime(now uint64, bg BucketGenerator) (*BucketWrap, error) {
	...
	// ②計算index = (now / bucketLengthInMs) % LeapArray.array.length
	idx := la.calculateTimeIdx(now)
	// ③計算bucket開始時間 = now - (now % bucketLengthInMs)
	bucketStart := calculateStartTime(now, la.bucketLengthInMs)

	for { 
		old := la.array.get(idx)
		if old == nil { // ④未使用,直接返回
			newWrap := &BucketWrap{
				BucketStart: bucketStart,
				Value:       atomic.Value{},
			}
			newWrap.Value.Store(bg.NewEmptyBucket())
			if la.array.compareAndSet(idx, nil, newWrap) {
				return newWrap, nil
			} else {
				runtime.Gosched()
			}
		} else if bucketStart == atomic.LoadUint64(&old.BucketStart) { // ⑤剛好取到是當前bucket,返回
			return old, nil
		} else if bucketStart > atomic.LoadUint64(&old.BucketStart) { // ⑥取到了舊的bucket,重置使用
			if la.updateLock.TryLock() {
				old = bg.ResetBucketTo(old, bucketStart)
				la.updateLock.Unlock()
				return old, nil
			} else {
				runtime.Gosched()
			}
		} else if bucketStart < atomic.LoadUint64(&old.BucketStart) { // ⑦取到了比當前還新的bucket,總共只有一個bucket時,並發情況可能會出現這種情況,其他情況不可能,直接報錯
			if la.sampleCount == 1 {
				return old, nil
			}
			
			return nil, errors.New(fmt.Sprintf("Provided time timeMillis=%d is already behind old.BucketStart=%d.", bucketStart, old.BucketStart))
		}
	}
}

舉個直觀的例子,看如何拿到 bucket:

image

  • 假設 B2 取出來是 nil,則 new 一個 bucket 通過 compareAndSet 寫入,保證線程安全,如果別別的線程先寫入,這里會執行失敗,調用 runtime.Gosched(),讓出時間片,進入下一次循環
  • 假設取出 B2 的開始時間是3400,與計算的相同,則直接使用
  • 假設取出的 B2 的開始時間小於 3400,說明這個 bucket 太舊了,需要覆蓋,使用更新鎖來更新,保證線程安全,如果拿不到鎖,也讓出時間片,進入下一次循環
  • 假設取出 B2 的開始時間大於3400,說明已經有其他線程更新了,而 bucketLengthInMs 通常遠遠大於鎖的獲取時間,所以這里只考慮只有一個 bucket 的情況直接返回,其他情況報錯

回到 QPS 計算:

qps := stat.InboundNode().GetQPS(base.MetricEventPass)

該方法會先計算一個起始時間范圍

func (m *SlidingWindowMetric) getBucketStartRange(timeMs uint64) (start, end uint64) {
	curBucketStartTime := calculateStartTime(timeMs, m.real.BucketLengthInMs())
	end = curBucketStartTime
	start = end - uint64(m.intervalInMs) + uint64(m.real.BucketLengthInMs())
	return
}

例如當前時間為3500,則計算出

  • end = 3400
  • start = 3400 - 1200 + 200 = 2400

image

然后遍歷所有 bucket,把在這個范圍內的 bucket 都拿出來,計算 QPS,只需要相加即可。

最后

本節從滑動窗口流控算法的工程實現演進到 Sentinel-Go 里滑動窗口的實現,從 Sentinel-Go 的實現上看到,還得考慮內存的使用,並發控制等等,如果完全寫出來,還是非常不容易的。

《Sentinel-Go源碼系列》已經寫了三篇,只介紹了兩個知識點:責任鏈模式、滑動窗口限流,后續還有對象池等,但這其實和 Sentinel-Go 關系不是很大,到時候單獨成文,就不放在本系列里了。

本文算是一個結束,與其說是結束,不如說是一個開始。


搜索關注微信公眾號"捉蟲大師",后端技術分享,架構設計、性能優化、源碼閱讀、問題排查、踩坑實踐。
image


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM