前言
在講解滑動時間窗之前,有個問題可以思考一下,如何統計當前時間服務的QPS呢?博主在前公司的時候,他們是這么設計的,在分布式環境下,列如當前時間 2021-9-13 23:12:10, 那么解析成key = 2021-9-13-23-12-10,並通過這個key查詢redis,獲取一個整型的統計值,並執行incr自增。當到下一個時間2021-9-13 23:12:11,則生成key=2021-9-13-23-12-11.從redis中獲取一個新的計數器,並incr。
這有什么問題?
1) 隨着時間的增長,redis中的key越來越多
2)23時12分10秒600毫秒 與 23時12分11秒600毫秒 這1秒期間,QPS的值將無法統計。
滑動時間窗算法
1 我們把當前的時間看做一個無限延長的有向軸,並且分隔成一個個固定單位,這個固定單位成為時間窗,可以是1s,也可以是1min。為了便於統計一分鍾內或者1秒鍾內的QPS,將單位時間窗划分為多個樣本時間窗。
2 划分樣本時間窗的目的是 為了避免重復統計,列如當時間運行到樣本窗7 ,則需要統計樣本窗4-7。 下次如果當前時間到了樣本窗8,則需要統計5-8,而中間的5-7已經統計過了,只需要再加上樣本窗8中的數據。
3 在sentinel中,緩存了一個單位時間窗的統計數據,並組織成一個環,實際上就是通過對樣本窗口數取余來確定請求落在哪個樣本窗上。
4 使用了滑動時間窗算法之后,獲取的QPS永遠統計的是當前時間之前的一個單位時間窗的統計結果,也解決了在前言中的問題2.
源碼解析
整個Sentinel是按着責任鏈模式來組織架構的,在chain上的slot,StatisticSlot#entry里面,先根據之前的統計執行下面的slot,比如FlowSlot,根據流控結果來統計當前請求是否成功或者失敗。
// Do some checking. // 先做其他的slot fireEntry(context, resourceWrapper, node, count, prioritized, args); // Request passed, add thread count and pass count. node.increaseThreadNum(); // 對當前的資源DefaultNode添加統計 node.addPassRequest(count); if (context.getCurEntry().getOriginNode() != null) { // Add count for origin node. context.getCurEntry().getOriginNode().increaseThreadNum(); // 對所有相同資源和相同來源。不同的context上的同一個originNode做統計 context.getCurEntry().getOriginNode().addPassRequest(count); }
統計的維度有這么幾個,1 對於當前請求在指定資源上的DefaultNode的統計 2指定資源上上全局的clusterNode的統計。代碼如下node.addPassRequest(count)
@Override public void addPassRequest(int count) { // 增加當前入口的defaultNode中的統計數據 super.addPassRequest(count); // 增加當前資源的clusterNode中全局統計數據 this.clusterNode.addPassRequest(count); }
3 對於相同資源的全局CluserNode在指定來源上的統計 ,比如 全局統計資源A在來源A上的QPS ,代碼 context.getCurEntry().getOriginNode().addPassRequest(count);
@Override public void addPassRequest(int count) { // 為滑動計數器增加本次訪問的計數器 rollingCounterInSecond.addPass(count); rollingCounterInMinute.addPass(count); }
入口已經講解完畢,接下來具體從闡述數據結構及算法核心邏輯。
在StatisticsNode里面有2個重要的計量器,一個是單位時間窗以秒為單位,一個單位時間窗是分為單位
/** * Holds statistics of the recent {@code INTERVAL} milliseconds. The {@code INTERVAL} is divided into time spans * by given {@code sampleCount}. * 定義一個使用數組保存數據的計量器 */ private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT /* 樣本窗口數量 默認2 */, IntervalProperty.INTERVAL /* 時間窗長度 默認1s */); /** * Holds statistics of the recent 60 seconds. The windowLengthInMs is deliberately set to 1000 milliseconds, * meaning each bucket per second, in this way we can get accurate statistics of each second. */ private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
在講解之前可以看一下數據結構是如何設計的,實際上一個計量器內部維護這一個數組,即單位時間窗,元素為一個樣本窗口對象windowWrap,一個樣本窗口維護這一個MetricBucket類型的值變量.
數據結構
addPass
public void addPass(int count) { // 獲取當前時間點所在的樣本窗口 WindowWrap<MetricBucket> wrap = data.currentWindow(); // 在樣本窗中增加count wrap.value().addPass(count); }
這里面最關鍵是如何根據當前時間來獲取在單位時間窗內的樣本窗,先計算出當前時間在單位時間窗內的樣本窗位置.
// 計算當前時間在哪個樣本窗口 private int calculateTimeIdx(/*@Valid*/ long timeMillis) { long timeId = timeMillis / windowLengthInMs; // Calculate current index so we can map the timestamp to the leap array. // 一個時間窗組成一個環,該環由多個樣本窗口組成 return (int)(timeId % array.length()); }
然后根據idx來獲取到樣本窗對象,比較樣本窗開始時間來確認是否過時
public WindowWrap<T> currentWindow(long timeMillis) { if (timeMillis < 0) { return null; } // 計算當前時間所在的樣本窗口ID,即在計算數組leapArray中的索引 // 這個idx不會超過樣本數量 int idx = calculateTimeIdx(timeMillis); // Calculate current bucket start time. // 獲取當前時間對應的樣本窗口的起始時間戳 long windowStart = calculateWindowStart(timeMillis); /* * Get bucket item at given time from the array. * * (1) Bucket is absent, then just create a new bucket and CAS update to circular array. * (2) Bucket is up-to-date, then just return the bucket. * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets. */ while (true) { // 獲取到當前時間所在的樣本窗口 WindowWrap<T> old = array.get(idx); if (old == null) { /* * B0 B1 B2 NULL B4 * ||_______|_______|_______|_______|_______||___ * 200 400 600 800 1000 1200 timestamp * ^ * time=888 * bucket is empty, so create new and update * * If the old bucket is absent, then we create a new bucket at {@code windowStart}, * then try to update circular array via a CAS operation. Only one thread can * succeed to update, while other threads yield its time slice. */ WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); if (array.compareAndSet(idx, null, window)) { // Successfully updated, return the created bucket. return window; } else { // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } } else if (windowStart == old.windowStart()) { /* * B0 B1 B2 B3 B4 * ||_______|_______|_______|_______|_______||___ * 200 400 600 800 1000 1200 timestamp * ^ * time=888 * startTime of Bucket 3: 800, so it's up-to-date * * If current {@code windowStart} is equal to the start timestamp of old bucket, * that means the time is within the bucket, so directly return the bucket. */ return old; } else if (windowStart > old.windowStart()) { // 在idx上的樣本時間窗可能已經老了,可能是上幾圈的idx位置,本線程的windowStart 已經到了后面圈的樣本時間窗了 /* * (old) * B0 B1 B2 NULL B4 * |_______||_______|_______|_______|_______|_______||___ * ... 1200 1400 1600 1800 2000 2200 timestamp * ^ * time=1676 * startTime of Bucket 2: 400, deprecated, should be reset * * If the start timestamp of old bucket is behind provided time, that means * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}. * Note that the reset and clean-up operations are hard to be atomic, * so we need a update lock to guarantee the correctness of bucket update. * * The update lock is conditional (tiny scope) and will take effect only when * bucket is deprecated, so in most cases it won't lead to performance loss. */ if (updateLock.tryLock()) { try { // Successfully get the update lock, now we reset the bucket. return resetWindowTo(old, windowStart); } finally { updateLock.unlock(); } } else { // Contention failed, the thread will yield its time slice to wait for bucket available. Thread.yield(); } } else if (windowStart < old.windowStart()) { // 這種情況一般不會出現 除非人為改動 // Should not go through here, as the provided time is already behind. return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis)); } } }
這時候已經完成了統計工作,那么在流量控制FlowSlot中是如何的實現QPS查詢呢?
FlowSlot#entry -> FlowRuleChecker#checkFlow -> FlowRuleChecker#canPassCheck -> DefaultController#canPass -> DefaultController#avgUsedTokens -> node.passQps()
獲取計量數中的通過數 除以 單位時間窗長度
@Override public double passQps() { return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec(); }
接下來看一下如何獲取單位時間窗內的請求量
實際上就是把有效的樣本時間窗內的各個統計量相加
@Override public long pass() { // 更新array中當前時間點所在的樣本窗口實例中的數據 data.currentWindow(); long pass = 0; List<MetricBucket> list = data.values(); // 將當前時間窗中的所有樣本窗口統計的值求和 for (MetricBucket window : list) { pass += window.pass(); } return pass; }
在拿樣本時間窗的時候,需要拿有效的樣本,不能包含過時的
public List<T> values(long timeMillis) { if (timeMillis < 0) { return new ArrayList<T>(); } int size = array.length(); List<T> result = new ArrayList<T>(size); for (int i = 0; i < size; i++) { WindowWrap<T> windowWrap = array.get(i); // 判斷樣本窗是否過時: 當前時間-windowsStart > intervalInms if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) { // 有可能這個時間窗口 沒有請求,在一個時間窗的時候 發現這個樣本窗已經過時 continue; } result.add(windowWrap.value()); } return result; }
LeapArray有2個實現類,分別是OccupiableBucketLeapArray (單位時間窗為1s)和 BucketLeapArray(單位時間窗為1min),那么這兩個實現類有什么區別的。實際上對於OccupiableBucketLeapArray,如果是優先級較高,同時流控的時候超過了配置的閾值,那么可以等待一定的時間,使用未來的配額,保證通過流控。
DefaultController#canPass()
@Override public boolean canPass(Node node, int acquireCount, boolean prioritized) { int curCount = avgUsedTokens(node); if (curCount + acquireCount > count) { // 被流控 // 如果流控策略是基於QPS 並且優先級較高 if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) { long currentTime; long waitInMs; currentTime = TimeUtil.currentTimeMillis(); waitInMs = node.tryOccupyNext(currentTime, acquireCount, count); if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) { // 添加占用未來時間對應的樣本窗的pass配額 node.addWaitingRequest(currentTime + waitInMs, acquireCount); // 統計 OccupiedPass node.addOccupiedPass(acquireCount); sleep(waitInMs); // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}. throw new PriorityWaitException(waitInMs); } } return false; } return true; }
那么如何確定等待的時間呢?
在上面的代碼中有個比較 waitInMs < OccupyTimeoutProperty.getOccupyTimeout(),這說明就是因為當前樣本窗的pass數量太多造成的,這個時間只要sleep 剩余樣本窗是的時間就可以了。
而等待時間超過500的情況下,說明之前的樣本窗就已經超過閾值了,這個時候不允許通過流量控制,返回false。
public long tryOccupyNext(long currentTime, int acquireCount, double threshold) { double maxCount = threshold * IntervalProperty.INTERVAL / 1000; // 0 long currentBorrow = rollingCounterInSecond.waiting(); if (currentBorrow >= maxCount) { return OccupyTimeoutProperty.getOccupyTimeout(); } // 500 int windowLength = IntervalProperty.INTERVAL / SampleCountProperty.SAMPLE_COUNT; /** * 這個算法 是獲取當前時間對應的樣本窗的結束時間 往前推一個單位時間窗 來獲取到一個能統計到當前時間對應的樣本窗的最早時間 */ long earliestTime = currentTime - currentTime % windowLength + windowLength - IntervalProperty.INTERVAL; int idx = 0; /* * Note: here {@code currentPass} may be less than it really is NOW, because time difference * since call rollingCounterInSecond.pass(). So in high concurrency, the following code may * lead more tokens be borrowed. */ // 當前時間單位窗口的統計數 long currentPass = rollingCounterInSecond.pass(); /** * 這塊邏輯很好理解: * 從最早時間 到 當前時間為止 * 初始等待時間為 當前時間對應的樣本窗,剩余的時間長度 * 如果去掉最早樣本窗的pass數之后,發現仍然大於閾值 ,則等待時間數 + 樣本時間窗長度 */ while (earliestTime < currentTime) { long waitInMs = idx * windowLength + windowLength - currentTime % windowLength; if (waitInMs >= OccupyTimeoutProperty.getOccupyTimeout()) { break; } long windowPass = rollingCounterInSecond.getWindowPass(earliestTime); if (currentPass + currentBorrow + acquireCount - windowPass <= maxCount) { return waitInMs; } earliestTime += windowLength; currentPass -= windowPass; idx++; } return OccupyTimeoutProperty.getOccupyTimeout(); }