qps統計方法


https://blog.csdn.net/prestigeding/article/details/106005837

https://blog.csdn.net/prestigeding/article/details/103753595

https://zhuanlan.zhihu.com/p/59332962

https://juejin.cn/post/6886061044788264974

https://www.pianshen.com/article/3329358206/

https://www.zyxiao.com/p/9565

https://www.cnblogs.com/my_life/articles/14871604.html

 

QPS中文譯作每秒查詢率(Query Per Second),含義是應用系統每秒鍾處理的查詢請求數。互聯網業務場景中,往往也把寫請求加入進來,把QPS的含義擴展到每秒的響應請求數。后文中的QPS都是指擴展的QPS概念,及每秒的響應請求數。

 

統計QPS指標有什么用?

1. QPS是最重要的衡量系統性能指標之一。研發運維人員需要清楚的了解系統的QPS承載能力,同時也是重要優化目標和數據依據。
2. 線上系統的實際QPS值以及時間分布情況,能夠讓系統運維人員了解系統實時和歷史的運行情況,出現流量異常(過高或過低)可以第一時間進行排查。
3. 實時QPS統計是限流和過載保護的基礎。


QPS指標的重要性不言而喻,而且也是大部分后端研發和運維小伙伴的“心頭好”,那我們又該如何統計QPS呢?

短平快方案

  1. 初始化累加器,每次發生調用,累加計數器加一;

  2. 后台線程執行一個周期任務,任務讀取計數器當前值,同時對計數器清零。任務的執行周期就是統計窗口,QPS = 計數器當前值 / 任務執行周期;

  3. 清零后計數器從頭開始累加,開始新一個統計窗口。


此方案實現簡單,但缺點也很明顯

  1. 統計精度低。后台任務執行的周期,以及周期任務的定時精度,決定了統計精度。一般后台任務的執行周期為秒級,且定時執行的精度易受其他因素(如GC)的影響。此種方案的統計窗口達到秒級后,很難再提升精度

  2. 易出現性能瓶頸。一個進程中可能需要統計多個QPS指標(比如一個RPC服務中的每個方法),如果提升精度縮短執行周期,當指標數量較多時,可能會出現性能瓶頸,而性能瓶頸又會導致統計精度丟失。



高精度方案

核心思想

初始化時確定統計周期和統計窗口精度。計數器由一個拆分為多個,計數時根據時間在相應的計數器上做累加和清零,不再通過后台線程計算和reset。方法說明如下:

 

將一個統計周期(interval)按照統計窗口精度(window)划分為bucketCount個桶(bucket),每個bucket負責一個統計窗口(如圖所示),有如下關系:

Interval = window * bucketCount

 

 

 

划分后每個 bucket 都有一個 start time 和 end time,表明此bucket負責統計的起始時間和終止時間

bucket_i end time = bucket_i+1 start time的關系,所以可以只記錄每個 bucket 的 start time。

當調用發生的時間 t 滿足:bucket_i start time <= t < bucket_i+1 start time,則 bucket_i 的計數器加1,如圖所示:

 

 

 

這樣,根據 bucket 里面的計數器值,和預設的統計窗口精度(window)直接可計算得到一個統計周期內的 QPS。


每到一個新的統計周期,把 bucket 內的計數器清零,之后的統計周期,可以復用一個周期的數據結構。
這種算法,看上去就像一個統計數組延時間軸向前滑動,沒錯,這就是著名的滑動窗口算法(Sliding Window Algorithm)


滑動窗口算法具體流程

滑動窗口具體的工程化方法分為兩個主要部分:次數統計和QPS計算


  • 次數統計算法

1. 初始化interval、window,bucket數組長度bucketCount = interval / window,創建bucket數組。
2. 調用量統計時,獲取當前bucket:
    2.1 對於發生調用的時間t,首先計算出應該落到bucket的下標 i

計算方法:    i = (t / window) % bucketCount

由於時間無限向前延伸,所以bucket數組需要做循環數組使用,取余為循環數組下標計算。

    2.2 計算 t 應在bucket窗口的start time:window start time,如圖所示:
    window start time = t – (t % window)    #當前時間所在的bucket理應的開始時間

特別注意,window start time是應在窗口的start time,和此時bucket[i] start time是有區別的。
區別就在於步驟2所提到的,bucket數組是循環數組,有可能bucket[i]此時還記錄着前一個或幾個interval的數據,下面處理這種case。
【自己舉個例子看一下就明白了】

比如:window是10s,interval是40s,即bucketCount是4,當前時間是43s

那么: i = (43 / 10) % 4 = 0, 放在第一個桶中

window start time = 43 - (43 % 10) = 40,  #當前時間所在的bucket的開始時間應該是40s

bucket[i] start time # bucket[i] 當前記錄着的開始時間

    2.3 如果window start time > bucket[i] start time,則bucket[i] start time = window     start time,且將計數器清零;
    2.4 返回bucket[i]。
3. 返回bucket的計數器加1。
偽代碼:

init:
  int window;
  int interval;
  int bucketCount = interval/windowBucket
  [] bucketArray = new Bucket [bucketCount]
end init

Bucket currentBucket index = (currentTime / window) % bucketCount

startTime = currentTime - (nowTime % window) #當前時間理應的開始時間

if(startTime > bucketArray[index].startTime) #是前一個interval
  bucketArray[index].startTime = startTime

  bucketArray[index].count = 0
#計數器清零
  currentBucket = bucketArray[index]
else if(currentTime == bucket[index].startTime) #應該是startTime吧?
  currentBucket = bucketArray[index]
else // 此處currentTime < bucket[index].startTime 為異常,比如出現了服務器時鍾回調 #這里應該也是startTime吧?
  handle exception
end if
currentBucket.count++ #對應的bucket計數器++
注意:偽代碼中沒有做判空和線程同步的處理

 

  • QPS計算步驟

1. 獲取所有有效的bucket。
    1.1 創建有效bucket結果集result,遍歷bucket數組:
    1.2 對於每一個bucket[i];
    1.3 如果 當前時間 - bucket[i] start time >= interval,則說明bucket[i]不屬於此interval(同時也說明此interval在bucket[i]的時間窗口內沒有計數),bucket[i] 不加入result;
    1.4 返回有效bucket結果集result。
2. 遍歷result,將result中每一個bucket的count相加求和,得到總次數total。
3. QPS = total / interval


偽代碼:

List<Bucket> result
for(i=0; i<bucketArray.size; i++)
  if(currentTime - bucketArray[i].startTime >= interval)

    continue

  end if

  result.add(bucketArray[i])
end for

for(Bucket bucket: result)
  total = total + bucket.count
end for

QPS = total/interval //1 #這種方式,在qps下降時能及時反應出來,但剛開始啟動時,qps是緩慢增長的,不能立即體現
// QPS = total / result.count() * window /1000; //2 #這種方式,在剛開始啟動時,qps能立馬反應實時值,但在qps停止時,不能立即體現,要等interval時間后才能體現,因為總的total qps 和 有效的bucket count是同時降低的。
// 更精准的方式:分為兩部分。啟動階段使用方式2.穩定滑動階段使用方式1.

 

總結

滑動窗口是時間相關數值統計最常用的方法之一阿里著名的開源限流保護組件Sentinel,其QPS統計內核使用的就是滑動窗口。

讀者可以根據本文的原理說明和偽代碼步驟,對照查閱Sentinel中com.alibaba.csp.sentinel.slots.statistic包的滑動窗口代碼實現。

 

google 搜索:滑動窗口 阿里 sentinal


 

https://juejin.cn/post/6886061044788264974

 

/** * Holds statistics of the recent 60 seconds. The windowLengthInMs is deliberately set to 1000 milliseconds,
* meaning each bucket per second, in this way we can get accurate statistics of each second.
* -- 統計近一分鍾的數據, 按分鍾統計,分成60個窗口,每個窗口 1000ms */
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
 

2.2 滑動窗口實現原理

2.2.1 ArrayMetric -- 滑動窗口的核心實現類

ArrayMetric 是一個包裝類,真正實現數據統計的是 LeapArray

可以看到 ArrayMetric 是構造函數中初始化的時候就是為了初始化 LeapArray,有兩個核心的參數

  1. sampleCount 樣本數
  2. intervalInMs 采樣周期
/**
 * The basic metric class in Sentinel using a {@link BucketLeapArray} internal.
 * 使用 BucketLeapArray 來實現Sentinel數據統計
 *
 * @author jialiang.linjl
 * @author Eric Zhao
 */
public class ArrayMetric implements Metric {

    private final LeapArray<MetricBucket> data;

    public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }
    
    // enableOccupy: 是否允許搶占,即當前時間戳已經達到限制后,是否可以占用下一個時間窗口的容量,這里對應 LeapArray 的兩個實現類,如果允許搶占,則為  OccupiableBucketLeapArray,否則為 BucketLeapArray
    public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
        if (enableOccupy) {
            this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
        } else {
            this.data = new BucketLeapArray(sampleCount, intervalInMs);
        }
    }
    
    ...
}
2.2.2 LeapArray -- 滑動窗口的頂層數據結構

LeapArray: 用滑動窗口數據結構來統計實時的秒級指標數據,其中包含了滑動窗口的基本信息:大小,窗口個數,窗口內容,每個窗口就是一個統計單位

2.2.2.1 LeapArray 核心屬性和構造方法
public abstract class LeapArray<T> {

    protected int windowLengthInMs; 	// 每一個窗口的時間間隔,單位為毫秒。
    protected int sampleCount;				// 樣本數,就一個統計時間間隔中包含的滑動窗口個數,在 intervalInMs 相同的情況下,sampleCount 越多,抽樣的統計數據就越精確,相應的需要的內存也越多。
    protected int intervalInMs; 			// 采樣周期 ms為單位
    private double intervalInSecond;	// 采樣周期 s為單位
    protected final AtomicReferenceArray<WindowWrap<T>> array; // 一個統計時間間隔中滑動窗口的數組,從這里也可以看出,一個滑動窗口就是使用的 WindowWrap< MetricBucket > 來表示。

    /**
     * The conditional (predicate) update lock is used only when current bucket is deprecated.
     */
    private final ReentrantLock updateLock = new ReentrantLock();
    /**
     * The total bucket count is: {@code sampleCount = intervalInMs / windowLengthInMs}.
     *
     * @param sampleCount  bucket count of the sliding window
     * @param intervalInMs the total time interval of this {@link LeapArray} in milliseconds
     */
    public LeapArray(int sampleCount, int intervalInMs) {
        AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
        AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
        AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");

        this.windowLengthInMs = intervalInMs / sampleCount;
        this.intervalInMs = intervalInMs;
        this.intervalInSecond = intervalInMs / 1000.0;
        this.sampleCount = sampleCount;

        this.array = new AtomicReferenceArray<>(sampleCount);
    }
}

public class WindowWrap<T> {   #窗口數據結構
    //Time length of a single window bucket in milliseconds. --- 該窗口的長度
    private final long windowLengthInMs;
    //Start timestamp of the window in milliseconds. --- 該窗口的起始時間
    private long windowStart;
    //Statistic data. 統計信息
    private T value;

    /**
     * @param windowLengthInMs a single window bucket's time length in milliseconds.
     * @param windowStart      the start timestamp of the window
     * @param value            statistic data
     */
    public WindowWrap(long windowLengthInMs, long windowStart, T value) {
        this.windowLengthInMs = windowLengthInMs;
        this.windowStart = windowStart;
        this.value = value;
    }
}

  

 

2.2.2.2 根據當前時間獲取滑動窗口
 
/**
     * Get bucket item at provided timestamp.
     * 根據指定的時間戳獲取對應的窗口
     *
     * @param timeMillis a valid timestamp in milliseconds
     * @return current bucket item at provided timestamp if the time is valid; null if time is invalid
     */
    public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }
				
      	// 根據當前時間計算出當前時間屬於那個滑動窗口的數組下標
        int idx = calculateTimeIdx(timeMillis);
        // 計算當前窗口的起始時間
        long windowStart = calculateWindowStart(timeMillis);

        /*
         * 根據下標在環形數組中獲取滑動窗口.
         *
         * (1) 如果指定下標的窗口不存在, 創建一個新的窗口並通過CAS賦值到數組的指定下標位置.
         * (2) 如果指定下標的窗口存在,並且該窗口的開始時間等於計算出來的windowStart,返回當前的窗口.
         * (3) 如果指定下標的窗口存在,但是該窗口的開始時間小於計算出來的windowStart,證明是上一圈已經用過的過期的窗口,則重置當前的窗口數據
         * (4) 如果指定下標的窗口存在,但是該窗口的開始時間大於剛剛算出來的開始時間,理論上不應該出現這種情況。
         */
        while (true) { // 死循環查找當前的時間窗口,這里之所有需要循環,是因為可能多個線程都在獲取當前時間窗口
            WindowWrap<T> old = array.get(idx);
            if (old == null) {
                /*
                 *     B0       B1      B2    NULL      B4
                 * ||_______|_______|_______|_______|_______||___
                 * 200     400     600     800     1000    1200  timestamp
                 *                             ^
                 *                          time=888
                 *            bucket is empty, so create new and update
                 *
                 * If the old bucket is absent, then we create a new bucket at {@code windowStart},
                 * then try to update circular array via a CAS operation. Only one thread can
                 * succeed to update, while other threads yield its time slice.
                 */
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                if (array.compareAndSet(idx, null, window)) {
                    // Successfully updated, return the created bucket.
                    return window;
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart == old.windowStart()) {
                /*
                 *     B0       B1      B2     B3      B4
                 * ||_______|_______|_______|_______|_______||___
                 * 200     400     600     800     1000    1200  timestamp
                 *                             ^
                 *                          time=888
                 *            startTime of Bucket 3: 800, so it's up-to-date
                 *
                 * If current {@code windowStart} is equal to the start timestamp of old bucket,
                 * that means the time is within the bucket, so directly return the bucket.
                 */
                return old;
            } else if (windowStart > old.windowStart()) {
                /*
                 *   (old)
                 *             B0       B1      B2    NULL      B4
                 * |_______||_______|_______|_______|_______|_______||___
                 * ...    1200     1400    1600    1800    2000    2200  timestamp
                 *                              ^
                 *                           time=1676
                 *          startTime of Bucket 2: 400, deprecated, should be reset
                 *
                 * If the start timestamp of old bucket is behind provided time, that means
                 * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
                 * Note that the reset and clean-up operations are hard to be atomic,
                 * so we need a update lock to guarantee the correctness of bucket update.
                 *
                 * The update lock is conditional (tiny scope) and will take effect only when
                 * bucket is deprecated, so in most cases it won't lead to performance loss.
                 */
                if (updateLock.tryLock()) {
                    try {
                        // Successfully get the update lock, now we reset the bucket.
                        return resetWindowTo(old, windowStart);
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart < old.windowStart()) {
                // Should not go through here, as the provided time is already behind.
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            }
        }
    }

  

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM