要說現在工程師最重要的能力,我覺得工程能力要排第一。
就算現在大廠面試經常要手撕算法,也是更偏向考查代碼工程實現的能力,之前在群里看到這樣的圖片,就覺得很離譜。
算法與工程實現
在 Sentinel-Go 中,一個很核心的算法是流控(限流)算法。
流控可能每個人都聽過,但真要手寫一個,還是有些困難。為什么流控算法難寫?以我的感覺是算法和工程實現上存在一定差異,雖然算法好理解,但卻沒法照着實現。
舉個例子,令牌桶算法很好理解,只需給定一個桶,以恆定的速率往桶內放令牌,滿了則丟棄,執行任務前先去桶里拿令牌,只有拿到令牌才可以執行,否則拒絕。
如果實現令牌桶,按道理應該用一個單獨線程(或進程)往桶里放令牌,業務線程去桶里取,但真要這么實現,怎么保證這個單獨線程能穩定執行,萬一掛了豈不是很危險?
所以工程實現上和算法原本肯定存在一定的差異,這也是為什么需要深入源碼的一個原因。
滑動時間窗口的演進
通常來說,流控的度量是按每秒的請求數,也就是 QPS
QPS:query per second,指每秒查詢數,當然他的意義已經泛化了,不再特指查詢,可以泛指所有請求。如果非要區分,TPS 指每秒事務數,即寫入數,或 RPS,每秒請求數,本文不分這么細,統計叫QPS。
當然也有按並發數來度量,並發數的流控就非常簡單
並發數流控
並發是一個瞬時概念,它跟時間沒有關系。和進程中的線程數、協程數一樣,每次取的時候只能拿到一個瞬間的快照,但可能很快就變化了。
並發數怎么定義?可以近似認為進入業務代碼開始就算一個並發,執行完這個並發就消失。
這樣說來,實現就非常簡單了,只需要定義一個全局變量,責任鏈開始時對這個變量原子增1,並獲取當前並發數的一個快照,判斷並發數是否超限,如果超限則直接阻斷,執行完了別忘了原子減1即可,由於太過簡單,就不需要放代碼了。
固定時間窗口
參考並發數流控,當需要度量 QPS 時,是否也可以利用這樣的思想呢?
由於 QPS 有時間的度量,第一直覺是和並發數一樣弄個變量,再起個單獨線程每隔 1s 重置這個變量。
但單獨線程始終不放心,需要稍微改一下。
如果系統有一個起始時間,每次請求時,獲取當前時間,兩者之差,就能算出當前處於哪個時間窗口,這個時間窗口單獨計數即可。
如果稍微思考下,你會發現問題不簡單,如下圖,10t 到20t 只有60個請求,20t到30t之間只有80個請求,但有可能16t到26t之間有110個請求,這就很有可能把系統打垮。
滑動時間窗口
為了解決上面的問題,工程師想出了一個好辦法:別固定時間窗口,以當前時間往前推算窗口
但問題又來了,這該怎么實現呢?
滑動時間窗口工程實現
在工程實現上,可以將時間划分為細小的采樣窗口,緩存一段時間的采樣窗口,這樣每當請求來的時候,只需要往前拿一段時間的采樣窗口,然后求和就能拿到總的請求數。
Sentinel-Go 滑動時間窗口的實現
前方代碼高能預警~
Sentinel-Go 是基於 LeapArray
實現的滑動窗口,其數據結構如下
type LeapArray struct {
bucketLengthInMs uint32 // bucket大小
sampleCount uint32 // bucket數量
intervalInMs uint32 // 窗口總大小
array *AtomicBucketWrapArray // bucket數組
updateLock mutex // 更新鎖
}
type AtomicBucketWrapArray struct {
base unsafe.Pointer // 數組的起始地址
length int // 長度,不能改變
data []*BucketWrap // 真正bucket的數據
}
type BucketWrap struct {
BucketStart uint64 // bucket起始時間
Value atomic.Value // bucket數據結構,例如 MetricBucket
}
type MetricBucket struct {
counter [base.MetricEventTotal]int64 // 計數數組,可放不同類型
minRt int64 // 最小RT
maxConcurrency int32 // 最大並發數
}
再看下是如何寫入指標的,例如當流程正常通過時
// ①
sn.AddCount(base.MetricEventPass, int64(count))
// ②
func (bla *BucketLeapArray) AddCount(event base.MetricEvent, count int64) {
bla.addCountWithTime(util.CurrentTimeMillis(), event, count)
}
// ③
func (bla *BucketLeapArray) addCountWithTime(now uint64, event base.MetricEvent, count int64) {
b := bla.currentBucketWithTime(now)
if b == nil {
return
}
b.Add(event, count)
}
// ④
func (mb *MetricBucket) Add(event base.MetricEvent, count int64) {
if event >= base.MetricEventTotal || event < 0 {
logging.Error(errors.Errorf("Unknown metric event: %v", event), "")
return
}
if event == base.MetricEventRt {
mb.AddRt(count)
return
}
mb.addCount(event, count)
}
// ⑤
func (mb *MetricBucket) addCount(event base.MetricEvent, count int64) {
atomic.AddInt64(&mb.counter[event], count)
}
取到相應的 bucket,然后寫入相應 event 的 count,對 RT 會特殊處理,因為有一個最小 RT 需要處理。
重點看是如何取到相應的 bucket 的:
func (bla *BucketLeapArray) currentBucketWithTime(now uint64) *MetricBucket {
// ①根據當前時間取bucket
curBucket, err := bla.data.currentBucketOfTime(now, bla)
...
b, ok := mb.(*MetricBucket)
if !ok {
...
return nil
}
return b
}
func (la *LeapArray) currentBucketOfTime(now uint64, bg BucketGenerator) (*BucketWrap, error) {
...
// ②計算index = (now / bucketLengthInMs) % LeapArray.array.length
idx := la.calculateTimeIdx(now)
// ③計算bucket開始時間 = now - (now % bucketLengthInMs)
bucketStart := calculateStartTime(now, la.bucketLengthInMs)
for {
old := la.array.get(idx)
if old == nil { // ④未使用,直接返回
newWrap := &BucketWrap{
BucketStart: bucketStart,
Value: atomic.Value{},
}
newWrap.Value.Store(bg.NewEmptyBucket())
if la.array.compareAndSet(idx, nil, newWrap) {
return newWrap, nil
} else {
runtime.Gosched()
}
} else if bucketStart == atomic.LoadUint64(&old.BucketStart) { // ⑤剛好取到是當前bucket,返回
return old, nil
} else if bucketStart > atomic.LoadUint64(&old.BucketStart) { // ⑥取到了舊的bucket,重置使用
if la.updateLock.TryLock() {
old = bg.ResetBucketTo(old, bucketStart)
la.updateLock.Unlock()
return old, nil
} else {
runtime.Gosched()
}
} else if bucketStart < atomic.LoadUint64(&old.BucketStart) { // ⑦取到了比當前還新的bucket,總共只有一個bucket時,並發情況可能會出現這種情況,其他情況不可能,直接報錯
if la.sampleCount == 1 {
return old, nil
}
return nil, errors.New(fmt.Sprintf("Provided time timeMillis=%d is already behind old.BucketStart=%d.", bucketStart, old.BucketStart))
}
}
}
舉個直觀的例子,看如何拿到 bucket:
- 假設 B2 取出來是 nil,則 new 一個 bucket 通過 compareAndSet 寫入,保證線程安全,如果別別的線程先寫入,這里會執行失敗,調用 runtime.Gosched(),讓出時間片,進入下一次循環
- 假設取出 B2 的開始時間是3400,與計算的相同,則直接使用
- 假設取出的 B2 的開始時間小於 3400,說明這個 bucket 太舊了,需要覆蓋,使用更新鎖來更新,保證線程安全,如果拿不到鎖,也讓出時間片,進入下一次循環
- 假設取出 B2 的開始時間大於3400,說明已經有其他線程更新了,而 bucketLengthInMs 通常遠遠大於鎖的獲取時間,所以這里只考慮只有一個 bucket 的情況直接返回,其他情況報錯
回到 QPS 計算:
qps := stat.InboundNode().GetQPS(base.MetricEventPass)
該方法會先計算一個起始時間范圍
func (m *SlidingWindowMetric) getBucketStartRange(timeMs uint64) (start, end uint64) {
curBucketStartTime := calculateStartTime(timeMs, m.real.BucketLengthInMs())
end = curBucketStartTime
start = end - uint64(m.intervalInMs) + uint64(m.real.BucketLengthInMs())
return
}
例如當前時間為3500,則計算出
- end = 3400
- start = 3400 - 1200 + 200 = 2400
然后遍歷所有 bucket,把在這個范圍內的 bucket 都拿出來,計算 QPS,只需要相加即可。
最后
本節從滑動窗口流控算法的工程實現演進到 Sentinel-Go 里滑動窗口的實現,從 Sentinel-Go 的實現上看到,還得考慮內存的使用,並發控制等等,如果完全寫出來,還是非常不容易的。
《Sentinel-Go源碼系列》已經寫了三篇,只介紹了兩個知識點:責任鏈模式、滑動窗口限流,后續還有對象池等,但這其實和 Sentinel-Go 關系不是很大,到時候單獨成文,就不放在本系列里了。
本文算是一個結束,與其說是結束,不如說是一個開始。
搜索關注微信公眾號"捉蟲大師",后端技術分享,架構設計、性能優化、源碼閱讀、問題排查、踩坑實踐。