https://blog.csdn.net/prestigeding/article/details/106005837
https://blog.csdn.net/prestigeding/article/details/103753595
https://zhuanlan.zhihu.com/p/59332962
https://juejin.cn/post/6886061044788264974
https://www.pianshen.com/article/3329358206/
https://www.zyxiao.com/p/9565
https://www.cnblogs.com/my_life/articles/14871604.html
QPS中文译作每秒查询率(Query Per Second),含义是应用系统每秒钟处理的查询请求数。互联网业务场景中,往往也把写请求加入进来,把QPS的含义扩展到每秒的响应请求数。后文中的QPS都是指扩展的QPS概念,及每秒的响应请求数。
统计QPS指标有什么用?
1. QPS是最重要的衡量系统性能指标之一。研发运维人员需要清楚的了解系统的QPS承载能力,同时也是重要优化目标和数据依据。
2. 线上系统的实际QPS值以及时间分布情况,能够让系统运维人员了解系统实时和历史的运行情况,出现流量异常(过高或过低)可以第一时间进行排查。
3. 实时QPS统计是限流和过载保护的基础。
QPS指标的重要性不言而喻,而且也是大部分后端研发和运维小伙伴的“心头好”,那我们又该如何统计QPS呢?
短平快方案
-
初始化累加器,每次发生调用,累加计数器加一;
-
后台线程执行一个周期任务,任务读取计数器当前值,同时对计数器清零。任务的执行周期就是统计窗口,QPS = 计数器当前值 / 任务执行周期;
-
清零后计数器从头开始累加,开始新一个统计窗口。
此方案实现简单,但缺点也很明显
-
统计精度低。后台任务执行的周期,以及周期任务的定时精度,决定了统计精度。一般后台任务的执行周期为秒级,且定时执行的精度易受其他因素(如GC)的影响。此种方案的统计窗口达到秒级后,很难再提升精度。
-
易出现性能瓶颈。一个进程中可能需要统计多个QPS指标(比如一个RPC服务中的每个方法),如果提升精度缩短执行周期,当指标数量较多时,可能会出现性能瓶颈,而性能瓶颈又会导致统计精度丢失。
高精度方案
核心思想
初始化时确定统计周期和统计窗口精度。计数器由一个拆分为多个,计数时根据时间在相应的计数器上做累加和清零,不再通过后台线程计算和reset。方法说明如下:
将一个统计周期(interval)按照统计窗口精度(window)划分为bucketCount个桶(bucket),每个bucket负责一个统计窗口(如图所示),有如下关系:
Interval = window * bucketCount
划分后每个 bucket 都有一个 start time 和 end time,表明此bucket负责统计的起始时间和终止时间。
有bucket_i end time = bucket_i+1 start time的关系,所以可以只记录每个 bucket 的 start time。
当调用发生的时间 t 满足:bucket_i start time <= t < bucket_i+1 start time,则 bucket_i 的计数器加1,如图所示:
这样,根据 bucket 里面的计数器值,和预设的统计窗口精度(window)直接可计算得到一个统计周期内的 QPS。
每到一个新的统计周期,把 bucket 内的计数器清零,之后的统计周期,可以复用一个周期的数据结构。
这种算法,看上去就像一个统计数组延时间轴向前滑动,没错,这就是著名的滑动窗口算法(Sliding Window Algorithm)。
滑动窗口算法具体流程
滑动窗口具体的工程化方法分为两个主要部分:次数统计和QPS计算。
- 次数统计算法
1. 初始化interval、window,bucket数组长度bucketCount = interval / window,创建bucket数组。
2. 调用量统计时,获取当前bucket:
2.1 对于发生调用的时间t,首先计算出应该落到bucket的下标 i。
计算方法: i = (t / window) % bucketCount
由于时间无限向前延伸,所以bucket数组需要做循环数组使用,取余为循环数组下标计算。
2.2 计算 t 应在bucket窗口的start time:window start time,如图所示:
window start time = t – (t % window) #当前时间所在的bucket理应的开始时间
特别注意,window start time是应在窗口的start time,和此时bucket[i] start time是有区别的。
区别就在于步骤2所提到的,bucket数组是循环数组,有可能bucket[i]此时还记录着前一个或几个interval的数据,下面处理这种case。
【自己举个例子看一下就明白了】
比如:window是10s,interval是40s,即bucketCount是4,当前时间是43s
那么: i = (43 / 10) % 4 = 0, 放在第一个桶中
window start time = 43 - (43 % 10) = 40, #当前时间所在的bucket的开始时间应该是40s
bucket[i] start time # bucket[i] 当前记录着的开始时间
2.3 如果window start time > bucket[i] start time,则bucket[i] start time = window start time,且将计数器清零;
2.4 返回bucket[i]。
3. 返回bucket的计数器加1。
伪代码:
init:
int window;
int interval;int bucketCount = interval/window
Bucket
[] bucketArray = new Bucket [bucketCount]end init
Bucket currentBucket
index = (currentTime / window) % bucketCount
startTime = currentTime - (nowTime % window) #当前时间理应的开始时间
if(startTime > bucketArray[index].startTime)
#是前一个interval
bucketArray[index].startTime = startTime
bucketArray[index].count = 0#计数器清零
currentBucket = bucketArray[index]else if(currentTime == bucket[index].startTime)
#应该是startTime吧?
currentBucket = bucketArray[index]else
// 此处currentTime < bucket[index].startTime 为异常,比如出现了服务器时钟回调
#这里应该也是startTime吧?
handle exceptionend if
currentBucket.count++ #对应的bucket计数器++
注意:伪代码中没有做判空和线程同步的处理
- QPS计算步骤
1. 获取所有有效的bucket。
1.1 创建有效bucket结果集result,遍历bucket数组:
1.2 对于每一个bucket[i];
1.3 如果 当前时间 - bucket[i] start time >= interval,则说明bucket[i]不属于此interval(同时也说明此interval在bucket[i]的时间窗口内没有计数),bucket[i] 不加入result;
1.4 返回有效bucket结果集result。
2. 遍历result,将result中每一个bucket的count相加求和,得到总次数total。
3. QPS = total / interval。
伪代码:
List<Bucket> result
for(i=0; i<bucketArray.size; i++)
if(currentTime - bucketArray[i].startTime >= interval)
continue
end if
result.add(bucketArray[i])end for
for(Bucket bucket: result)
total = total + bucket.countend for
QPS = total/interval //1 #这种方式,在qps下降时能及时反应出来,但刚开始启动时,qps是缓慢增长的,不能立即体现
// QPS = total / result.count() * window /1000; //2 #这种方式,在刚开始启动时,qps能立马反应实时值,但在qps停止时,不能立即体现,要等interval时间后才能体现,因为总的total qps 和 有效的bucket count是同时降低的。
// 更精准的方式:分为两部分。启动阶段使用方式2.稳定滑动阶段使用方式1.
总结
滑动窗口是时间相关数值统计最常用的方法之一,阿里著名的开源限流保护组件Sentinel,其QPS统计内核使用的就是滑动窗口。
读者可以根据本文的原理说明和伪代码步骤,对照查阅Sentinel中com.alibaba.csp.sentinel.slots.statistic包的滑动窗口代码实现。
google 搜索:滑动窗口 阿里 sentinal
https://juejin.cn/post/6886061044788264974
2.2 滑动窗口实现原理
2.2.1 ArrayMetric -- 滑动窗口的核心实现类
ArrayMetric 是一个包装类,真正实现数据统计的是 LeapArray
可以看到 ArrayMetric 是构造函数中初始化的时候就是为了初始化 LeapArray,有两个核心的参数
- sampleCount 样本数
- intervalInMs 采样周期
/** * The basic metric class in Sentinel using a {@link BucketLeapArray} internal. * 使用 BucketLeapArray 来实现Sentinel数据统计 * * @author jialiang.linjl * @author Eric Zhao */ public class ArrayMetric implements Metric { private final LeapArray<MetricBucket> data; public ArrayMetric(int sampleCount, int intervalInMs) { this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs); } // enableOccupy: 是否允许抢占,即当前时间戳已经达到限制后,是否可以占用下一个时间窗口的容量,这里对应 LeapArray 的两个实现类,如果允许抢占,则为 OccupiableBucketLeapArray,否则为 BucketLeapArray public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) { if (enableOccupy) { this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs); } else { this.data = new BucketLeapArray(sampleCount, intervalInMs); } } ... }
2.2.2 LeapArray -- 滑动窗口的顶层数据结构
LeapArray: 用滑动窗口数据结构来统计实时的秒级指标数据,其中包含了滑动窗口的基本信息:大小,窗口个数,窗口内容,每个窗口就是一个统计单位
2.2.2.1 LeapArray 核心属性和构造方法
public abstract class LeapArray<T> {
protected int windowLengthInMs; // 每一个窗口的时间间隔,单位为毫秒。
protected int sampleCount; // 样本数,就一个统计时间间隔中包含的滑动窗口个数,在 intervalInMs 相同的情况下,sampleCount 越多,抽样的统计数据就越精确,相应的需要的内存也越多。
protected int intervalInMs; // 采样周期 ms为单位
private double intervalInSecond; // 采样周期 s为单位
protected final AtomicReferenceArray<WindowWrap<T>> array; // 一个统计时间间隔中滑动窗口的数组,从这里也可以看出,一个滑动窗口就是使用的 WindowWrap< MetricBucket > 来表示。
/**
* The conditional (predicate) update lock is used only when current bucket is deprecated.
*/
private final ReentrantLock updateLock = new ReentrantLock();
/**
* The total bucket count is: {@code sampleCount = intervalInMs / windowLengthInMs}.
*
* @param sampleCount bucket count of the sliding window
* @param intervalInMs the total time interval of this {@link LeapArray} in milliseconds
*/
public LeapArray(int sampleCount, int intervalInMs) {
AssertUtil.isTrue(sampleCount > 0, "bucket count is invalid: " + sampleCount);
AssertUtil.isTrue(intervalInMs > 0, "total time interval of the sliding window should be positive");
AssertUtil.isTrue(intervalInMs % sampleCount == 0, "time span needs to be evenly divided");
this.windowLengthInMs = intervalInMs / sampleCount;
this.intervalInMs = intervalInMs;
this.intervalInSecond = intervalInMs / 1000.0;
this.sampleCount = sampleCount;
this.array = new AtomicReferenceArray<>(sampleCount);
}
}
public class WindowWrap<T> { #窗口数据结构
//Time length of a single window bucket in milliseconds. --- 该窗口的长度
private final long windowLengthInMs;
//Start timestamp of the window in milliseconds. --- 该窗口的起始时间
private long windowStart;
//Statistic data. 统计信息
private T value;
/**
* @param windowLengthInMs a single window bucket's time length in milliseconds.
* @param windowStart the start timestamp of the window
* @param value statistic data
*/
public WindowWrap(long windowLengthInMs, long windowStart, T value) {
this.windowLengthInMs = windowLengthInMs;
this.windowStart = windowStart;
this.value = value;
}
}
2.2.2.2 根据当前时间获取滑动窗口
/** * Get bucket item at provided timestamp. * 根据指定的时间戳获取对应的窗口 * * @param timeMillis a valid timestamp in milliseconds * @return current bucket item at provided timestamp if the time is valid; null if time is invalid */ public WindowWrap<T> currentWindow(long timeMillis) { if (timeMillis < 0) { return null; } // 根据当前时间计算出当前时间属于那个滑动窗口的数组下标 int idx = calculateTimeIdx(timeMillis); // 计算当前窗口的起始时间 long windowStart = calculateWindowStart(timeMillis); /* * 根据下标在环形数组中获取滑动窗口. * * (1) 如果指定下标的窗口不存在, 创建一个新的窗口并通过CAS赋值到数组的指定下标位置. * (2) 如果指定下标的窗口存在,并且该窗口的开始时间等于计算出来的windowStart,返回当前的窗口. * (3) 如果指定下标的窗口存在,但是该窗口的开始时间小于计算出来的windowStart,证明是上一圈已经用过的过期的窗口,则重置当前的窗口数据 * (4) 如果指定下标的窗口存在,但是该窗口的开始时间大于刚刚算出来的开始时间,理论上不应该出现这种情况。 */ while (true) { // 死循环查找当前的时间窗口,这里之所有需要循环,是因为可能多个线程都在获取当前时间窗口 WindowWrap<T> old = array.get(idx); if (old == null) { /* * B0 B1 B2 NULL B4 * ||_______|_______|_______|_______|_______||___ * 200 400 600 800 1000 1200 timestamp * ^ * time=888 * bucket is empty, so create new and update * * If the old bucket is absent, then we create a new bucket at {@code windowStart}, * then try to update circular array via a CAS operation. Only one thread can * succeed to update, while other threads yield its time slice. */ WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); if (array.compareAndSet(idx, null, window)) { // Successfully updated, return the created bucket. return window; } else { // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } } else if (windowStart == old.windowStart()) { /* * B0 B1 B2 B3 B4 * ||_______|_______|_______|_______|_______||___ * 200 400 600 800 1000 1200 timestamp * ^ * time=888 * startTime of Bucket 3: 800, so it's up-to-date * * If current {@code windowStart} is equal to the start timestamp of old bucket, * that means the time is within the bucket, so directly return the bucket. */ return old; } else if (windowStart > old.windowStart()) { /* * (old) * B0 B1 B2 NULL B4 * |_______||_______|_______|_______|_______|_______||___ * ... 1200 1400 1600 1800 2000 2200 timestamp * ^ * time=1676 * startTime of Bucket 2: 400, deprecated, should be reset * * If the start timestamp of old bucket is behind provided time, that means * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}. * Note that the reset and clean-up operations are hard to be atomic, * so we need a update lock to guarantee the correctness of bucket update. * * The update lock is conditional (tiny scope) and will take effect only when * bucket is deprecated, so in most cases it won't lead to performance loss. */ if (updateLock.tryLock()) { try { // Successfully get the update lock, now we reset the bucket. return resetWindowTo(old, windowStart); } finally { updateLock.unlock(); } } else { // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } } else if (windowStart < old.windowStart()) { // Should not go through here, as the provided time is already behind. return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); } } }