qps统计方法


https://blog.csdn.net/prestigeding/article/details/106005837

https://blog.csdn.net/prestigeding/article/details/103753595

https://zhuanlan.zhihu.com/p/59332962

https://juejin.cn/post/6886061044788264974

https://www.pianshen.com/article/3329358206/

https://www.zyxiao.com/p/9565

https://www.cnblogs.com/my_life/articles/14871604.html

 

QPS中文译作每秒查询率(Query Per Second),含义是应用系统每秒钟处理的查询请求数。互联网业务场景中,往往也把写请求加入进来,把QPS的含义扩展到每秒的响应请求数。后文中的QPS都是指扩展的QPS概念,及每秒的响应请求数。

 

统计QPS指标有什么用?

1. QPS是最重要的衡量系统性能指标之一。研发运维人员需要清楚的了解系统的QPS承载能力,同时也是重要优化目标和数据依据。
2. 线上系统的实际QPS值以及时间分布情况,能够让系统运维人员了解系统实时和历史的运行情况,出现流量异常(过高或过低)可以第一时间进行排查。
3. 实时QPS统计是限流和过载保护的基础。


QPS指标的重要性不言而喻,而且也是大部分后端研发和运维小伙伴的“心头好”,那我们又该如何统计QPS呢?

短平快方案

  1. 初始化累加器,每次发生调用,累加计数器加一;

  2. 后台线程执行一个周期任务,任务读取计数器当前值,同时对计数器清零。任务的执行周期就是统计窗口,QPS = 计数器当前值 / 任务执行周期;

  3. 清零后计数器从头开始累加,开始新一个统计窗口。


此方案实现简单,但缺点也很明显

  1. 统计精度低。后台任务执行的周期,以及周期任务的定时精度,决定了统计精度。一般后台任务的执行周期为秒级,且定时执行的精度易受其他因素(如GC)的影响。此种方案的统计窗口达到秒级后,很难再提升精度

  2. 易出现性能瓶颈。一个进程中可能需要统计多个QPS指标(比如一个RPC服务中的每个方法),如果提升精度缩短执行周期,当指标数量较多时,可能会出现性能瓶颈,而性能瓶颈又会导致统计精度丢失。



高精度方案

核心思想

初始化时确定统计周期和统计窗口精度。计数器由一个拆分为多个,计数时根据时间在相应的计数器上做累加和清零,不再通过后台线程计算和reset。方法说明如下:

 

将一个统计周期(interval)按照统计窗口精度(window)划分为bucketCount个桶(bucket),每个bucket负责一个统计窗口(如图所示),有如下关系:

Interval = window * bucketCount

 

 

 

划分后每个 bucket 都有一个 start time 和 end time,表明此bucket负责统计的起始时间和终止时间

bucket_i end time = bucket_i+1 start time的关系,所以可以只记录每个 bucket 的 start time。

当调用发生的时间 t 满足:bucket_i start time <= t < bucket_i+1 start time,则 bucket_i 的计数器加1,如图所示:

 

 

 

这样,根据 bucket 里面的计数器值,和预设的统计窗口精度(window)直接可计算得到一个统计周期内的 QPS。


每到一个新的统计周期,把 bucket 内的计数器清零,之后的统计周期,可以复用一个周期的数据结构。
这种算法,看上去就像一个统计数组延时间轴向前滑动,没错,这就是著名的滑动窗口算法(Sliding Window Algorithm)


滑动窗口算法具体流程

滑动窗口具体的工程化方法分为两个主要部分:次数统计和QPS计算


  • 次数统计算法

1. 初始化interval、window,bucket数组长度bucketCount = interval / window,创建bucket数组。
2. 调用量统计时,获取当前bucket:
    2.1 对于发生调用的时间t,首先计算出应该落到bucket的下标 i

计算方法:    i = (t / window) % bucketCount

由于时间无限向前延伸,所以bucket数组需要做循环数组使用,取余为循环数组下标计算。

    2.2 计算 t 应在bucket窗口的start time:window start time,如图所示:
    window start time = t – (t % window)    #当前时间所在的bucket理应的开始时间

特别注意,window start time是应在窗口的start time,和此时bucket[i] start time是有区别的。
区别就在于步骤2所提到的,bucket数组是循环数组,有可能bucket[i]此时还记录着前一个或几个interval的数据,下面处理这种case。
【自己举个例子看一下就明白了】

比如:window是10s,interval是40s,即bucketCount是4,当前时间是43s

那么: i = (43 / 10) % 4 = 0, 放在第一个桶中

window start time = 43 - (43 % 10) = 40,  #当前时间所在的bucket的开始时间应该是40s

bucket[i] start time # bucket[i] 当前记录着的开始时间

    2.3 如果window start time > bucket[i] start time,则bucket[i] start time = window     start time,且将计数器清零;
    2.4 返回bucket[i]。
3. 返回bucket的计数器加1。
伪代码:

init:
  int window;
  int interval;
  int bucketCount = interval/windowBucket
  [] bucketArray = new Bucket [bucketCount]
end init

Bucket currentBucket index = (currentTime / window) % bucketCount

startTime = currentTime - (nowTime % window) #当前时间理应的开始时间

if(startTime > bucketArray[index].startTime) #是前一个interval
  bucketArray[index].startTime = startTime

  bucketArray[index].count = 0
#计数器清零
  currentBucket = bucketArray[index]
else if(currentTime == bucket[index].startTime) #应该是startTime吧?
  currentBucket = bucketArray[index]
else // 此处currentTime < bucket[index].startTime 为异常,比如出现了服务器时钟回调 #这里应该也是startTime吧?
  handle exception
end if
currentBucket.count++ #对应的bucket计数器++
注意:伪代码中没有做判空和线程同步的处理

 

  • QPS计算步骤

1. 获取所有有效的bucket。
    1.1 创建有效bucket结果集result,遍历bucket数组:
    1.2 对于每一个bucket[i];
    1.3 如果 当前时间 - bucket[i] start time >= interval,则说明bucket[i]不属于此interval(同时也说明此interval在bucket[i]的时间窗口内没有计数),bucket[i] 不加入result;
    1.4 返回有效bucket结果集result。
2. 遍历result,将result中每一个bucket的count相加求和,得到总次数total。
3. QPS = total / interval


伪代码:

List<Bucket> result
for(i=0; i<bucketArray.size; i++)
  if(currentTime - bucketArray[i].startTime >= interval)

    continue

  end if

  result.add(bucketArray[i])
end for

for(Bucket bucket: result)
  total = total + bucket.count
end for

QPS = total/interval //1 #这种方式,在qps下降时能及时反应出来,但刚开始启动时,qps是缓慢增长的,不能立即体现
// QPS = total / result.count() * window /1000; //2 #这种方式,在刚开始启动时,qps能立马反应实时值,但在qps停止时,不能立即体现,要等interval时间后才能体现,因为总的total qps 和 有效的bucket count是同时降低的。
// 更精准的方式:分为两部分。启动阶段使用方式2.稳定滑动阶段使用方式1.

 

总结

滑动窗口是时间相关数值统计最常用的方法之一阿里著名的开源限流保护组件Sentinel,其QPS统计内核使用的就是滑动窗口。

读者可以根据本文的原理说明和伪代码步骤,对照查阅Sentinel中com.alibaba.csp.sentinel.slots.statistic包的滑动窗口代码实现。

 

google 搜索:滑动窗口 阿里 sentinal


 

https://juejin.cn/post/6886061044788264974

 

/** * Holds statistics of the recent 60 seconds. The windowLengthInMs is deliberately set to 1000 milliseconds,
* meaning each bucket per second, in this way we can get accurate statistics of each second.
* -- 统计近一分钟的数据, 按分钟统计,分成60个窗口,每个窗口 1000ms */
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
 

2.2 滑动窗口实现原理

2.2.1 ArrayMetric -- 滑动窗口的核心实现类

ArrayMetric 是一个包装类,真正实现数据统计的是 LeapArray

可以看到 ArrayMetric 是构造函数中初始化的时候就是为了初始化 LeapArray,有两个核心的参数

  1. sampleCount 样本数
  2. intervalInMs 采样周期
/**
 * The basic metric class in Sentinel using a {@link BucketLeapArray} internal.
 * 使用 BucketLeapArray 来实现Sentinel数据统计
 *
 * @author jialiang.linjl
 * @author Eric Zhao
 */
public class ArrayMetric implements Metric {

    private final LeapArray<MetricBucket> data;

    public ArrayMetric(int sampleCount, int intervalInMs) {
        this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
    }
    
    // enableOccupy: 是否允许抢占,即当前时间戳已经达到限制后,是否可以占用下一个时间窗口的容量,这里对应 LeapArray 的两个实现类,如果允许抢占,则为  OccupiableBucketLeapArray,否则为 BucketLeapArray
    public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
        if (enableOccupy) {
            this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
        } else {
            this.data = new BucketLeapArray(sampleCount, intervalInMs);
        }
    }
    
    ...
}
2.2.2 LeapArray -- 滑动窗口的顶层数据结构

LeapArray: 用滑动窗口数据结构来统计实时的秒级指标数据,其中包含了滑动窗口的基本信息:大小,窗口个数,窗口内容,每个窗口就是一个统计单位

2.2.2.1 LeapArray 核心属性和构造方法
public abstract class LeapArray<T> {

    protected int windowLengthInMs; 	// 每一个窗口的时间间隔,单位为毫秒。
    protected int sampleCount;				// 样本数,就一个统计时间间隔中包含的滑动窗口个数,在 intervalInMs 相同的情况下,sampleCount 越多,抽样的统计数据就越精确,相应的需要的内存也越多。
    protected int intervalInMs; 			// 采样周期 ms为单位
    private double intervalInSecond;	// 采样周期 s为单位
    protected final AtomicReferenceArray<WindowWrap<T>> array; // 一个统计时间间隔中滑动窗口的数组,从这里也可以看出,一个滑动窗口就是使用的 WindowWrap< MetricBucket > 来表示。

    /**
     * The conditional (predicate) update lock is used only when current bucket is deprecated.
     */
    private final ReentrantLock updateLock = new ReentrantLock();
    /**
     * The total bucket count is: {@code sampleCount = intervalInMs / windowLengthInMs}.
     *
     * @param sampleCount  bucket count of the sliding window
     * @param intervalInMs the total time interval of this {@link LeapArray} in milliseconds
     */
    public LeapArray(int sampleCount, int intervalInMs) {
        AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
        AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
        AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");

        this.windowLengthInMs = intervalInMs / sampleCount;
        this.intervalInMs = intervalInMs;
        this.intervalInSecond = intervalInMs / 1000.0;
        this.sampleCount = sampleCount;

        this.array = new AtomicReferenceArray<>(sampleCount);
    }
}

public class WindowWrap<T> {   #窗口数据结构
    //Time length of a single window bucket in milliseconds. --- 该窗口的长度
    private final long windowLengthInMs;
    //Start timestamp of the window in milliseconds. --- 该窗口的起始时间
    private long windowStart;
    //Statistic data. 统计信息
    private T value;

    /**
     * @param windowLengthInMs a single window bucket's time length in milliseconds.
     * @param windowStart      the start timestamp of the window
     * @param value            statistic data
     */
    public WindowWrap(long windowLengthInMs, long windowStart, T value) {
        this.windowLengthInMs = windowLengthInMs;
        this.windowStart = windowStart;
        this.value = value;
    }
}

  

 

2.2.2.2 根据当前时间获取滑动窗口
 
/**
     * Get bucket item at provided timestamp.
     * 根据指定的时间戳获取对应的窗口
     *
     * @param timeMillis a valid timestamp in milliseconds
     * @return current bucket item at provided timestamp if the time is valid; null if time is invalid
     */
    public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }
				
      	// 根据当前时间计算出当前时间属于那个滑动窗口的数组下标
        int idx = calculateTimeIdx(timeMillis);
        // 计算当前窗口的起始时间
        long windowStart = calculateWindowStart(timeMillis);

        /*
         * 根据下标在环形数组中获取滑动窗口.
         *
         * (1) 如果指定下标的窗口不存在, 创建一个新的窗口并通过CAS赋值到数组的指定下标位置.
         * (2) 如果指定下标的窗口存在,并且该窗口的开始时间等于计算出来的windowStart,返回当前的窗口.
         * (3) 如果指定下标的窗口存在,但是该窗口的开始时间小于计算出来的windowStart,证明是上一圈已经用过的过期的窗口,则重置当前的窗口数据
         * (4) 如果指定下标的窗口存在,但是该窗口的开始时间大于刚刚算出来的开始时间,理论上不应该出现这种情况。
         */
        while (true) { // 死循环查找当前的时间窗口,这里之所有需要循环,是因为可能多个线程都在获取当前时间窗口
            WindowWrap<T> old = array.get(idx);
            if (old == null) {
                /*
                 *     B0       B1      B2    NULL      B4
                 * ||_______|_______|_______|_______|_______||___
                 * 200     400     600     800     1000    1200  timestamp
                 *                             ^
                 *                          time=888
                 *            bucket is empty, so create new and update
                 *
                 * If the old bucket is absent, then we create a new bucket at {@code windowStart},
                 * then try to update circular array via a CAS operation. Only one thread can
                 * succeed to update, while other threads yield its time slice.
                 */
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                if (array.compareAndSet(idx, null, window)) {
                    // Successfully updated, return the created bucket.
                    return window;
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart == old.windowStart()) {
                /*
                 *     B0       B1      B2     B3      B4
                 * ||_______|_______|_______|_______|_______||___
                 * 200     400     600     800     1000    1200  timestamp
                 *                             ^
                 *                          time=888
                 *            startTime of Bucket 3: 800, so it's up-to-date
                 *
                 * If current {@code windowStart} is equal to the start timestamp of old bucket,
                 * that means the time is within the bucket, so directly return the bucket.
                 */
                return old;
            } else if (windowStart > old.windowStart()) {
                /*
                 *   (old)
                 *             B0       B1      B2    NULL      B4
                 * |_______||_______|_______|_______|_______|_______||___
                 * ...    1200     1400    1600    1800    2000    2200  timestamp
                 *                              ^
                 *                           time=1676
                 *          startTime of Bucket 2: 400, deprecated, should be reset
                 *
                 * If the start timestamp of old bucket is behind provided time, that means
                 * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
                 * Note that the reset and clean-up operations are hard to be atomic,
                 * so we need a update lock to guarantee the correctness of bucket update.
                 *
                 * The update lock is conditional (tiny scope) and will take effect only when
                 * bucket is deprecated, so in most cases it won't lead to performance loss.
                 */
                if (updateLock.tryLock()) {
                    try {
                        // Successfully get the update lock, now we reset the bucket.
                        return resetWindowTo(old, windowStart);
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart < old.windowStart()) {
                // Should not go through here, as the provided time is already behind.
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            }
        }
    }

  

 


免责声明!

本站转载的文章为个人学习借鉴使用,本站对版权不负任何法律责任。如果侵犯了您的隐私权益,请联系本站邮箱yoyou2525@163.com删除。



 
粤ICP备18138465号  © 2018-2025 CODEPRJ.COM