https://blog.csdn.net/prestigeding/article/details/106005837
https://blog.csdn.net/prestigeding/article/details/103753595
https://zhuanlan.zhihu.com/p/59332962
https://juejin.cn/post/6886061044788264974
https://www.pianshen.com/article/3329358206/
https://www.zyxiao.com/p/9565
https://www.cnblogs.com/my_life/articles/14871604.html
QPS中文譯作每秒查詢率(Query Per Second),含義是應用系統每秒鍾處理的查詢請求數。互聯網業務場景中,往往也把寫請求加入進來,把QPS的含義擴展到每秒的響應請求數。后文中的QPS都是指擴展的QPS概念,及每秒的響應請求數。
統計QPS指標有什么用?
1. QPS是最重要的衡量系統性能指標之一。研發運維人員需要清楚的了解系統的QPS承載能力,同時也是重要優化目標和數據依據。
2. 線上系統的實際QPS值以及時間分布情況,能夠讓系統運維人員了解系統實時和歷史的運行情況,出現流量異常(過高或過低)可以第一時間進行排查。
3. 實時QPS統計是限流和過載保護的基礎。
QPS指標的重要性不言而喻,而且也是大部分后端研發和運維小伙伴的“心頭好”,那我們又該如何統計QPS呢?
短平快方案
-
初始化累加器,每次發生調用,累加計數器加一;
-
后台線程執行一個周期任務,任務讀取計數器當前值,同時對計數器清零。任務的執行周期就是統計窗口,QPS = 計數器當前值 / 任務執行周期;
-
清零后計數器從頭開始累加,開始新一個統計窗口。
此方案實現簡單,但缺點也很明顯
-
統計精度低。后台任務執行的周期,以及周期任務的定時精度,決定了統計精度。一般后台任務的執行周期為秒級,且定時執行的精度易受其他因素(如GC)的影響。此種方案的統計窗口達到秒級后,很難再提升精度。
-
易出現性能瓶頸。一個進程中可能需要統計多個QPS指標(比如一個RPC服務中的每個方法),如果提升精度縮短執行周期,當指標數量較多時,可能會出現性能瓶頸,而性能瓶頸又會導致統計精度丟失。
高精度方案
核心思想
初始化時確定統計周期和統計窗口精度。計數器由一個拆分為多個,計數時根據時間在相應的計數器上做累加和清零,不再通過后台線程計算和reset。方法說明如下:
將一個統計周期(interval)按照統計窗口精度(window)划分為bucketCount個桶(bucket),每個bucket負責一個統計窗口(如圖所示),有如下關系:
Interval = window * bucketCount
划分后每個 bucket 都有一個 start time 和 end time,表明此bucket負責統計的起始時間和終止時間。
有bucket_i end time = bucket_i+1 start time的關系,所以可以只記錄每個 bucket 的 start time。
當調用發生的時間 t 滿足:bucket_i start time <= t < bucket_i+1 start time,則 bucket_i 的計數器加1,如圖所示:
這樣,根據 bucket 里面的計數器值,和預設的統計窗口精度(window)直接可計算得到一個統計周期內的 QPS。
每到一個新的統計周期,把 bucket 內的計數器清零,之后的統計周期,可以復用一個周期的數據結構。
這種算法,看上去就像一個統計數組延時間軸向前滑動,沒錯,這就是著名的滑動窗口算法(Sliding Window Algorithm)。
滑動窗口算法具體流程
滑動窗口具體的工程化方法分為兩個主要部分:次數統計和QPS計算。
- 次數統計算法
1. 初始化interval、window,bucket數組長度bucketCount = interval / window,創建bucket數組。
2. 調用量統計時,獲取當前bucket:
2.1 對於發生調用的時間t,首先計算出應該落到bucket的下標 i。
計算方法: i = (t / window) % bucketCount
由於時間無限向前延伸,所以bucket數組需要做循環數組使用,取余為循環數組下標計算。
2.2 計算 t 應在bucket窗口的start time:window start time,如圖所示:
window start time = t – (t % window) #當前時間所在的bucket理應的開始時間
特別注意,window start time是應在窗口的start time,和此時bucket[i] start time是有區別的。
區別就在於步驟2所提到的,bucket數組是循環數組,有可能bucket[i]此時還記錄着前一個或幾個interval的數據,下面處理這種case。
【自己舉個例子看一下就明白了】
比如:window是10s,interval是40s,即bucketCount是4,當前時間是43s
那么: i = (43 / 10) % 4 = 0, 放在第一個桶中
window start time = 43 - (43 % 10) = 40, #當前時間所在的bucket的開始時間應該是40s
bucket[i] start time # bucket[i] 當前記錄着的開始時間
2.3 如果window start time > bucket[i] start time,則bucket[i] start time = window start time,且將計數器清零;
2.4 返回bucket[i]。
3. 返回bucket的計數器加1。
偽代碼:
init:
int window;
int interval;int bucketCount = interval/window
Bucket
[] bucketArray = new Bucket [bucketCount]end init
Bucket currentBucket
index = (currentTime / window) % bucketCount
startTime = currentTime - (nowTime % window) #當前時間理應的開始時間
if(startTime > bucketArray[index].startTime)
#是前一個interval
bucketArray[index].startTime = startTime
bucketArray[index].count = 0#計數器清零
currentBucket = bucketArray[index]else if(currentTime == bucket[index].startTime)
#應該是startTime吧?
currentBucket = bucketArray[index]else
// 此處currentTime < bucket[index].startTime 為異常,比如出現了服務器時鍾回調
#這里應該也是startTime吧?
handle exceptionend if
currentBucket.count++ #對應的bucket計數器++
注意:偽代碼中沒有做判空和線程同步的處理
- QPS計算步驟
1. 獲取所有有效的bucket。
1.1 創建有效bucket結果集result,遍歷bucket數組:
1.2 對於每一個bucket[i];
1.3 如果 當前時間 - bucket[i] start time >= interval,則說明bucket[i]不屬於此interval(同時也說明此interval在bucket[i]的時間窗口內沒有計數),bucket[i] 不加入result;
1.4 返回有效bucket結果集result。
2. 遍歷result,將result中每一個bucket的count相加求和,得到總次數total。
3. QPS = total / interval。
偽代碼:
List<Bucket> result
for(i=0; i<bucketArray.size; i++)
if(currentTime - bucketArray[i].startTime >= interval)
continue
end if
result.add(bucketArray[i])end for
for(Bucket bucket: result)
total = total + bucket.countend for
QPS = total/interval //1 #這種方式,在qps下降時能及時反應出來,但剛開始啟動時,qps是緩慢增長的,不能立即體現
// QPS = total / result.count() * window /1000; //2 #這種方式,在剛開始啟動時,qps能立馬反應實時值,但在qps停止時,不能立即體現,要等interval時間后才能體現,因為總的total qps 和 有效的bucket count是同時降低的。
// 更精准的方式:分為兩部分。啟動階段使用方式2.穩定滑動階段使用方式1.
總結
滑動窗口是時間相關數值統計最常用的方法之一,阿里著名的開源限流保護組件Sentinel,其QPS統計內核使用的就是滑動窗口。
讀者可以根據本文的原理說明和偽代碼步驟,對照查閱Sentinel中com.alibaba.csp.sentinel.slots.statistic包的滑動窗口代碼實現。
google 搜索:滑動窗口 阿里 sentinal
https://juejin.cn/post/6886061044788264974
2.2 滑動窗口實現原理
2.2.1 ArrayMetric -- 滑動窗口的核心實現類
ArrayMetric 是一個包裝類,真正實現數據統計的是 LeapArray
可以看到 ArrayMetric 是構造函數中初始化的時候就是為了初始化 LeapArray,有兩個核心的參數
- sampleCount 樣本數
- intervalInMs 采樣周期
/** * The basic metric class in Sentinel using a {@link BucketLeapArray} internal. * 使用 BucketLeapArray 來實現Sentinel數據統計 * * @author jialiang.linjl * @author Eric Zhao */ public class ArrayMetric implements Metric { private final LeapArray<MetricBucket> data; public ArrayMetric(int sampleCount, int intervalInMs) { this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs); } // enableOccupy: 是否允許搶占,即當前時間戳已經達到限制后,是否可以占用下一個時間窗口的容量,這里對應 LeapArray 的兩個實現類,如果允許搶占,則為 OccupiableBucketLeapArray,否則為 BucketLeapArray public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) { if (enableOccupy) { this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs); } else { this.data = new BucketLeapArray(sampleCount, intervalInMs); } } ... }
2.2.2 LeapArray -- 滑動窗口的頂層數據結構
LeapArray: 用滑動窗口數據結構來統計實時的秒級指標數據,其中包含了滑動窗口的基本信息:大小,窗口個數,窗口內容,每個窗口就是一個統計單位
2.2.2.1 LeapArray 核心屬性和構造方法
public abstract class LeapArray<T> {
protected int windowLengthInMs; // 每一個窗口的時間間隔,單位為毫秒。
protected int sampleCount; // 樣本數,就一個統計時間間隔中包含的滑動窗口個數,在 intervalInMs 相同的情況下,sampleCount 越多,抽樣的統計數據就越精確,相應的需要的內存也越多。
protected int intervalInMs; // 采樣周期 ms為單位
private double intervalInSecond; // 采樣周期 s為單位
protected final AtomicReferenceArray<WindowWrap<T>> array; // 一個統計時間間隔中滑動窗口的數組,從這里也可以看出,一個滑動窗口就是使用的 WindowWrap< MetricBucket > 來表示。
/**
* The conditional (predicate) update lock is used only when current bucket is deprecated.
*/
private final ReentrantLock updateLock = new ReentrantLock();
/**
* The total bucket count is: {@code sampleCount = intervalInMs / windowLengthInMs}.
*
* @param sampleCount bucket count of the sliding window
* @param intervalInMs the total time interval of this {@link LeapArray} in milliseconds
*/
public LeapArray(int sampleCount, int intervalInMs) {
AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
this.windowLengthInMs = intervalInMs / sampleCount;
this.intervalInMs = intervalInMs;
this.intervalInSecond = intervalInMs / 1000.0;
this.sampleCount = sampleCount;
this.array = new AtomicReferenceArray<>(sampleCount);
}
}
public class WindowWrap<T> { #窗口數據結構
//Time length of a single window bucket in milliseconds. --- 該窗口的長度
private final long windowLengthInMs;
//Start timestamp of the window in milliseconds. --- 該窗口的起始時間
private long windowStart;
//Statistic data. 統計信息
private T value;
/**
* @param windowLengthInMs a single window bucket's time length in milliseconds.
* @param windowStart the start timestamp of the window
* @param value statistic data
*/
public WindowWrap(long windowLengthInMs, long windowStart, T value) {
this.windowLengthInMs = windowLengthInMs;
this.windowStart = windowStart;
this.value = value;
}
}
2.2.2.2 根據當前時間獲取滑動窗口
/** * Get bucket item at provided timestamp. * 根據指定的時間戳獲取對應的窗口 * * @param timeMillis a valid timestamp in milliseconds * @return current bucket item at provided timestamp if the time is valid; null if time is invalid */ public WindowWrap<T> currentWindow(long timeMillis) { if (timeMillis < 0) { return null; } // 根據當前時間計算出當前時間屬於那個滑動窗口的數組下標 int idx = calculateTimeIdx(timeMillis); // 計算當前窗口的起始時間 long windowStart = calculateWindowStart(timeMillis); /* * 根據下標在環形數組中獲取滑動窗口. * * (1) 如果指定下標的窗口不存在, 創建一個新的窗口並通過CAS賦值到數組的指定下標位置. * (2) 如果指定下標的窗口存在,並且該窗口的開始時間等於計算出來的windowStart,返回當前的窗口. * (3) 如果指定下標的窗口存在,但是該窗口的開始時間小於計算出來的windowStart,證明是上一圈已經用過的過期的窗口,則重置當前的窗口數據 * (4) 如果指定下標的窗口存在,但是該窗口的開始時間大於剛剛算出來的開始時間,理論上不應該出現這種情況。 */ while (true) { // 死循環查找當前的時間窗口,這里之所有需要循環,是因為可能多個線程都在獲取當前時間窗口 WindowWrap<T> old = array.get(idx); if (old == null) { /* * B0 B1 B2 NULL B4 * ||_______|_______|_______|_______|_______||___ * 200 400 600 800 1000 1200 timestamp * ^ * time=888 * bucket is empty, so create new and update * * If the old bucket is absent, then we create a new bucket at {@code windowStart}, * then try to update circular array via a CAS operation. Only one thread can * succeed to update, while other threads yield its time slice. */ WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); if (array.compareAndSet(idx, null, window)) { // Successfully updated, return the created bucket. return window; } else { // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } } else if (windowStart == old.windowStart()) { /* * B0 B1 B2 B3 B4 * ||_______|_______|_______|_______|_______||___ * 200 400 600 800 1000 1200 timestamp * ^ * time=888 * startTime of Bucket 3: 800, so it's up-to-date * * If current {@code windowStart} is equal to the start timestamp of old bucket, * that means the time is within the bucket, so directly return the bucket. */ return old; } else if (windowStart > old.windowStart()) { /* * (old) * B0 B1 B2 NULL B4 * |_______||_______|_______|_______|_______|_______||___ * ... 1200 1400 1600 1800 2000 2200 timestamp * ^ * time=1676 * startTime of Bucket 2: 400, deprecated, should be reset * * If the start timestamp of old bucket is behind provided time, that means * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}. * Note that the reset and clean-up operations are hard to be atomic, * so we need a update lock to guarantee the correctness of bucket update. * * The update lock is conditional (tiny scope) and will take effect only when * bucket is deprecated, so in most cases it won't lead to performance loss. */ if (updateLock.tryLock()) { try { // Successfully get the update lock, now we reset the bucket. return resetWindowTo(old, windowStart); } finally { updateLock.unlock(); } } else { // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } } else if (windowStart < old.windowStart()) { // Should not go through here, as the provided time is already behind. return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); } } }