什么是自適應限流
自適應限流從整體維度對應用入口流量進行控制,結合應用的 Load、CPU 使用率、總體平均 RT、入口 QPS 和並發線程數等幾個維度的監控指標,通過自適應的流控策略,讓系統的入口流量和系統的負載達到一個平衡,讓系統盡可能跑在最大吞吐量的同時保證系統整體的穩定性。
核心目標:
- 自動嗅探負載和 qps,減少人工配置
- 削頂,保證超載時系統不被拖垮,並能以高水位 qps 繼續運行
限流規則
計算吞吐量:利特爾法則 L = λ * W
如上圖所示,如果我們開一個小店,平均每分鍾進店 2 個客人(λ),每位客人從等待到完成交易需要 4 分鍾(W),那我們店里能承載的客人數量就是 2 * 4 = 8 個人
同理,我們可以將 λ
當做 QPS, W
呢是每個請求需要花費的時間,那我們的系統的吞吐就是 L = λ * W
,所以我們可以使用利特爾法則來計算系統的吞吐量。
指標介紹
指標名稱 | 指標含義 |
---|---|
cpu | 最近 1s 的 CPU 使用率均值,使用滑動平均計算,采樣周期是 250ms |
inflight | 當前處理中正在處理的請求數量 |
pass | 請求處理成功的量 |
rt | 請求成功的響應耗時 |
滑動窗口
在自適應限流保護中,采集到的指標的時效性非常強,系統只需要采集最近一小段時間內的 qps、rt 即可,對於較老的數據,會自動丟棄。為了實現這個效果,kratos 使用了滑動窗口來保存采樣數據。
如上圖,展示了一個具有兩個桶(bucket)的滑動窗口(rolling window)。整個滑動窗口用來保存最近 1s 的采樣數據,每個小的桶用來保存 500ms 的采樣數據。 當時間流動之后,過期的桶會自動被新桶的數據覆蓋掉,在圖中,在 1000-1500ms 時,bucket 1 的數據因為過期而被丟棄,之后 bucket 3 的數據填到了窗口的頭部。
限流公式
判斷是否丟棄當前請求的算法如下:
cpu > 800 AND (Now - PrevDrop) < 1s AND (MaxPass * MinRt * windows / 1000) < InFlight
MaxPass 表示最近 5s 內,單個采樣窗口中最大的請求數。 MinRt 表示最近 5s 內,單個采樣窗口中最小的響應時間。 windows 表示一秒內采樣窗口的數量,默認配置中是 5s 50 個采樣,那么 windows 的值為 10。
源碼分析
代碼地址:
BBR struct
type BBR struct {
cpu cpuGetter
passStat window.RollingCounter
rtStat window.RollingCounter
inFlight int64
bucketPerSecond int64
bucketSize time.Duration
// prevDropTime defines previous start drop since initTime
prevDropTime atomic.Value
maxPASSCache atomic.Value
minRtCache atomic.Value
opts *options
}
cpu
- cpu的指標函數,CPU的使用率, 這里為了減小誤差,把數字擴大化,乘以1000,比賽使用率60%,也就是0.6 cpu的值就為600
passStat
- 請求數的采樣數據,使用滑動窗口進行統計
rtStat
- 響應時間的采樣數據,同樣使用滑動窗口進行統計
inFlight
- 當前系統中的請求數,數據得來方法是:中間件原理在處理前+1,處理handle之后不管成功失敗都減去1
bucketPerSecond
- 一個 bucket 的時間
bucketSize
- 桶的數量
prevDropTime
- 上次觸發限流時間
maxPASSCache
- 單個采樣窗口中最大的請求數的緩存數據
minRtCache
- 單個采樣窗口中最小的響應時間的緩存數據
Allow接口
// Allow checks all inbound traffic.
// Once overload is detected, it raises limit.ErrLimitExceed error.
func (l *BBR) Allow(ctx context.Context) (func(), error) {
if l.shouldDrop() { // shouldDrop 判斷是否需要限流,如果true表示拒絕 之后重點講
return nil, ErrLimitExceed
}
atomic.AddInt64(&l.inFlight, 1) // 之前說的,正在處理數+1
stime := time.Since(initTime) // 現在時間減去程序初始化時間 表示程序開始執行時刻
return func() { // allow返回函數 在中間件(攔截器)中handle執行完成后調用
rt := int64((time.Since(initTime) - stime) / time.Millisecond) // 執行完handle的時間減去stime 表示 程序執行的總時間 單位ms
l.rtStat.Add(rt) // 把處理時間放進采樣數據window
atomic.AddInt64(&l.inFlight, -1) // 正在處理數-1 便是處理完成
l.passStat.Add(1) // 成功了,把通過數的采樣數據window加1
}, nil
}
shouldDrop方法
func (l *BBR) shouldDrop() bool {
curTime := time.Since(initTime)
if l.cpu() < l.opts.CPUThreshold {
// current cpu payload below the threshold
prevDropTime, _ := l.prevDropTime.Load().(time.Duration)
if prevDropTime == 0 {
// haven't start drop,
// accept current request
return false
}
if curTime-prevDropTime <= time.Second {
// just start drop one second ago,
// check current inflight count
inFlight := atomic.LoadInt64(&l.inFlight)
return inFlight > 1 && inFlight > l.maxInFlight()
}
l.prevDropTime.Store(time.Duration(0))
return false
}
// current cpu payload exceeds the threshold
inFlight := atomic.LoadInt64(&l.inFlight)
drop := inFlight > 1 && inFlight > l.maxInFlight()
if drop {
prevDrop, _ := l.prevDropTime.Load().(time.Duration)
if prevDrop != 0 {
// already started drop, return directly
return drop
}
// store start drop time
l.prevDropTime.Store(curTime)
}
return drop
}
maxInFlight()方法代表過去的負載
int64(math.Floor(float64(l.maxPASS()*l.minRT()*l.bucketPerSecond)/1000.0) + 0.5)
參考算法:https://github.com/alibaba/Sentinel/wiki/系統自適應限流
- maxPass * bucketPerSecond / 1000 為每毫秒處理的請求數
- l.minRT() 為 單個采樣窗口中最小的響應時間
- T ≈ QPS * Avg(RT)
+ 0.5
為向上取整
流程圖
壓測報告
場景1,請求以每秒增加1個的速度不停上升,壓測效果如下:
左測是沒有限流的壓測效果,右側是帶限流的壓測效果。 可以看到,沒有限流的場景里,系統在 700qps 時開始抖動,在 1k qps 時被拖垮,幾乎沒有新的請求能被放行,然而在使用限流之后,系統請求能夠穩定在 600 qps 左右,rt 沒有暴增,服務也沒有被打垮,可見,限流有效的保護了服務。
原文地址:
參考文章: