詳解Sentinel中的滑動時間窗算法完成數據QPS統計


前言

在講解滑動時間窗之前,有個問題可以思考一下,如何統計當前時間服務的QPS呢?博主在前公司的時候,他們是這么設計的,在分布式環境下,列如當前時間 2021-9-13 23:12:10, 那么解析成key = 2021-9-13-23-12-10,並通過這個key查詢redis,獲取一個整型的統計值,並執行incr自增。當到下一個時間2021-9-13 23:12:11,則生成key=2021-9-13-23-12-11.從redis中獲取一個新的計數器,並incr。

這有什么問題?

1) 隨着時間的增長,redis中的key越來越多 

2)23時12分10秒600毫秒  與  23時12分11秒600毫秒 這1秒期間,QPS的值將無法統計。

 

滑動時間窗算法 

1 我們把當前的時間看做一個無限延長的有向軸,並且分隔成一個個固定單位,這個固定單位成為時間窗,可以是1s,也可以是1min。為了便於統計一分鍾內或者1秒鍾內的QPS,將單位時間窗划分為多個樣本時間窗。

2  划分樣本時間窗的目的是 為了避免重復統計,列如當時間運行到樣本窗7 ,則需要統計樣本窗4-7。 下次如果當前時間到了樣本窗8,則需要統計5-8,而中間的5-7已經統計過了,只需要再加上樣本窗8中的數據。

3  在sentinel中,緩存了一個單位時間窗的統計數據,並組織成一個環,實際上就是通過對樣本窗口數取余來確定請求落在哪個樣本窗上。

4 使用了滑動時間窗算法之后,獲取的QPS永遠統計的是當前時間之前的一個單位時間窗的統計結果,也解決了在前言中的問題2.

 

 

 

源碼解析

整個Sentinel是按着責任鏈模式來組織架構的,在chain上的slot,StatisticSlot#entry里面,先根據之前的統計執行下面的slot,比如FlowSlot,根據流控結果來統計當前請求是否成功或者失敗。

// Do some checking.
            // 先做其他的slot
            fireEntry(context, resourceWrapper, node, count, prioritized, args);

            // Request passed, add thread count and pass count.
            node.increaseThreadNum();
            // 對當前的資源DefaultNode添加統計
 node.addPassRequest(count); if (context.getCurEntry().getOriginNode() != null) {
                // Add count for origin node.
                context.getCurEntry().getOriginNode().increaseThreadNum();
                // 對所有相同資源和相同來源。不同的context上的同一個originNode做統計
 context.getCurEntry().getOriginNode().addPassRequest(count);
            }

統計的維度有這么幾個,1 對於當前請求在指定資源上的DefaultNode的統計   2指定資源上上全局的clusterNode的統計。代碼如下node.addPassRequest(count)

@Override
    public void addPassRequest(int count) {
        // 增加當前入口的defaultNode中的統計數據
        super.addPassRequest(count);
        // 增加當前資源的clusterNode中全局統計數據
        this.clusterNode.addPassRequest(count);
    }

3 對於相同資源的全局CluserNode在指定來源上的統計  ,比如 全局統計資源A在來源A上的QPS ,代碼  context.getCurEntry().getOriginNode().addPassRequest(count);

@Override
    public void addPassRequest(int count) {
        // 為滑動計數器增加本次訪問的計數器
        rollingCounterInSecond.addPass(count);
        rollingCounterInMinute.addPass(count);
    }

入口已經講解完畢,接下來具體從闡述數據結構及算法核心邏輯。

 

在StatisticsNode里面有2個重要的計量器,一個是單位時間窗以秒為單位,一個單位時間窗是分為單位

/**
     * Holds statistics of the recent {@code INTERVAL} milliseconds. The {@code INTERVAL} is divided into time spans
     * by given {@code sampleCount}.
     * 定義一個使用數組保存數據的計量器
     */
    private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT /* 樣本窗口數量 默認2 */,
        IntervalProperty.INTERVAL /* 時間窗長度  默認1s */);

    /**
     * Holds statistics of the recent 60 seconds. The windowLengthInMs is deliberately set to 1000 milliseconds,
     * meaning each bucket per second, in this way we can get accurate statistics of each second.
     */
    private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);

在講解之前可以看一下數據結構是如何設計的,實際上一個計量器內部維護這一個數組,即單位時間窗,元素為一個樣本窗口對象windowWrap,一個樣本窗口維護這一個MetricBucket類型的值變量.

數據結構

 

addPass

    public void addPass(int count) {
        // 獲取當前時間點所在的樣本窗口
        WindowWrap<MetricBucket> wrap = data.currentWindow();
        // 在樣本窗中增加count
        wrap.value().addPass(count);
    }

這里面最關鍵是如何根據當前時間來獲取在單位時間窗內的樣本窗,先計算出當前時間在單位時間窗內的樣本窗位置.

    // 計算當前時間在哪個樣本窗口
    private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
        long timeId = timeMillis / windowLengthInMs;
        // Calculate current index so we can map the timestamp to the leap array.
        //  一個時間窗組成一個環,該環由多個樣本窗口組成
        return (int)(timeId % array.length());
    }

然后根據idx來獲取到樣本窗對象,比較樣本窗開始時間來確認是否過時

    public WindowWrap<T> currentWindow(long timeMillis) {
        if (timeMillis < 0) {
            return null;
        }

        // 計算當前時間所在的樣本窗口ID,即在計算數組leapArray中的索引
        // 這個idx不會超過樣本數量
        int idx = calculateTimeIdx(timeMillis);
        // Calculate current bucket start time.
        // 獲取當前時間對應的樣本窗口的起始時間戳
        long windowStart = calculateWindowStart(timeMillis);

        /*
         * Get bucket item at given time from the array.
         *
         * (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
         * (2) Bucket is up-to-date, then just return the bucket.
         * (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
         */
        while (true) {
            // 獲取到當前時間所在的樣本窗口
            WindowWrap<T> old = array.get(idx);
            if (old == null) {
                /*
                 *     B0       B1      B2    NULL      B4
                 * ||_______|_______|_______|_______|_______||___
                 * 200     400     600     800     1000    1200  timestamp
                 *                             ^
                 *                          time=888
                 *            bucket is empty, so create new and update
                 *
                 * If the old bucket is absent, then we create a new bucket at {@code windowStart},
                 * then try to update circular array via a CAS operation. Only one thread can
                 * succeed to update, while other threads yield its time slice.
                 */
                WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
                if (array.compareAndSet(idx, null, window)) {
                    // Successfully updated, return the created bucket.
                    return window;
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart == old.windowStart()) {
                /*
                 *     B0       B1      B2     B3      B4
                 * ||_______|_______|_______|_______|_______||___
                 * 200     400     600     800     1000    1200  timestamp
                 *                             ^
                 *                          time=888
                 *            startTime of Bucket 3: 800, so it's up-to-date
                 *
                 * If current {@code windowStart} is equal to the start timestamp of old bucket,
                 * that means the time is within the bucket, so directly return the bucket.
                 */
                return old;
            } else if (windowStart > old.windowStart()) {
                // 在idx上的樣本時間窗可能已經老了,可能是上幾圈的idx位置,本線程的windowStart 已經到了后面圈的樣本時間窗了
                /*
                 *   (old)
                 *             B0       B1      B2    NULL      B4
                 * |_______||_______|_______|_______|_______|_______||___
                 * ...    1200     1400    1600    1800    2000    2200  timestamp
                 *                              ^
                 *                           time=1676
                 *          startTime of Bucket 2: 400, deprecated, should be reset
                 *
                 * If the start timestamp of old bucket is behind provided time, that means
                 * the bucket is deprecated. We have to reset the bucket to current {@code windowStart}.
                 * Note that the reset and clean-up operations are hard to be atomic,
                 * so we need a update lock to guarantee the correctness of bucket update.
                 *
                 * The update lock is conditional (tiny scope) and will take effect only when
                 * bucket is deprecated, so in most cases it won't lead to performance loss.
                 */
                if (updateLock.tryLock()) {
                    try {
                        // Successfully get the update lock, now we reset the bucket.
                        return resetWindowTo(old, windowStart);
                    } finally {
                        updateLock.unlock();
                    }
                } else {
                    // Contention failed, the thread will yield its time slice to wait for bucket available.
                    Thread.yield();
                }
            } else if (windowStart < old.windowStart()) {
                // 這種情況一般不會出現  除非人為改動
                // Should not go through here, as the provided time is already behind.
                return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
            }
        }
    }

 

 這時候已經完成了統計工作,那么在流量控制FlowSlot中是如何的實現QPS查詢呢?

FlowSlot#entry -> FlowRuleChecker#checkFlow -> FlowRuleChecker#canPassCheck  ->  DefaultController#canPass -> DefaultController#avgUsedTokens -> node.passQps()

獲取計量數中的通過數 除以 單位時間窗長度

@Override
    public double passQps() {
        return rollingCounterInSecond.pass() / rollingCounterInSecond.getWindowIntervalInSec();
    }

接下來看一下如何獲取單位時間窗內的請求量

實際上就是把有效的樣本時間窗內的各個統計量相加

@Override
    public long pass() {
        // 更新array中當前時間點所在的樣本窗口實例中的數據
        data.currentWindow();
        long pass = 0;
        List<MetricBucket> list = data.values(); // 將當前時間窗中的所有樣本窗口統計的值求和
        for (MetricBucket window : list) {
            pass += window.pass();
        }
        return pass;
    }

在拿樣本時間窗的時候,需要拿有效的樣本,不能包含過時的

 public List<T> values(long timeMillis) {
        if (timeMillis < 0) {
            return new ArrayList<T>();
        }
        int size = array.length();
        List<T> result = new ArrayList<T>(size);

        for (int i = 0; i < size; i++) {
            WindowWrap<T> windowWrap = array.get(i);
            // 判斷樣本窗是否過時: 當前時間-windowsStart > intervalInms
            if (windowWrap == null || isWindowDeprecated(timeMillis, windowWrap)) {
                // 有可能這個時間窗口 沒有請求,在一個時間窗的時候 發現這個樣本窗已經過時
                continue;
            }
            result.add(windowWrap.value());
        }
        return result;
    }

 

LeapArray有2個實現類,分別是OccupiableBucketLeapArray (單位時間窗為1s)和 BucketLeapArray(單位時間窗為1min),那么這兩個實現類有什么區別的。實際上對於OccupiableBucketLeapArray,如果是優先級較高,同時流控的時候超過了配置的閾值,那么可以等待一定的時間,使用未來的配額,保證通過流控。

DefaultController#canPass()

 @Override
    public boolean canPass(Node node, int acquireCount, boolean prioritized) {
        int curCount = avgUsedTokens(node);
        if (curCount + acquireCount > count) { // 被流控
            // 如果流控策略是基於QPS 並且優先級較高
            if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
                long currentTime;
                long waitInMs;
                currentTime = TimeUtil.currentTimeMillis();
                waitInMs = node.tryOccupyNext(currentTime, acquireCount, count); if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                    // 添加占用未來時間對應的樣本窗的pass配額
                    node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                    // 統計 OccupiedPass
                    node.addOccupiedPass(acquireCount);
                    sleep(waitInMs); // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                    throw new PriorityWaitException(waitInMs);
                }
            }
            return false;
        }
        return true;
    }

那么如何確定等待的時間呢?

在上面的代碼中有個比較 waitInMs < OccupyTimeoutProperty.getOccupyTimeout(),這說明就是因為當前樣本窗的pass數量太多造成的,這個時間只要sleep 剩余樣本窗是的時間就可以了。

而等待時間超過500的情況下,說明之前的樣本窗就已經超過閾值了,這個時候不允許通過流量控制,返回false。

public long tryOccupyNext(long currentTime, int acquireCount, double threshold) {
        double maxCount = threshold * IntervalProperty.INTERVAL / 1000;
        // 0
        long currentBorrow = rollingCounterInSecond.waiting();
        if (currentBorrow >= maxCount) {
            return OccupyTimeoutProperty.getOccupyTimeout();
        }

        // 500
        int windowLength = IntervalProperty.INTERVAL / SampleCountProperty.SAMPLE_COUNT;
        /**
         * 這個算法  是獲取當前時間對應的樣本窗的結束時間 往前推一個單位時間窗 來獲取到一個能統計到當前時間對應的樣本窗的最早時間
         */
        long earliestTime = currentTime - currentTime % windowLength + windowLength - IntervalProperty.INTERVAL;

        int idx = 0;
        /*
         * Note: here {@code currentPass} may be less than it really is NOW, because time difference
         * since call rollingCounterInSecond.pass(). So in high concurrency, the following code may
         * lead more tokens be borrowed.
         */
        // 當前時間單位窗口的統計數
        long currentPass = rollingCounterInSecond.pass();
        /**
         * 這塊邏輯很好理解:
         * 從最早時間 到 當前時間為止
         * 初始等待時間為  當前時間對應的樣本窗,剩余的時間長度
         * 如果去掉最早樣本窗的pass數之后,發現仍然大於閾值 ,則等待時間數 + 樣本時間窗長度
         */
        while (earliestTime < currentTime) {
            long waitInMs = idx * windowLength + windowLength - currentTime % windowLength;
            if (waitInMs >= OccupyTimeoutProperty.getOccupyTimeout()) {
                break;
            }
            long windowPass = rollingCounterInSecond.getWindowPass(earliestTime);
            if (currentPass + currentBorrow + acquireCount - windowPass <= maxCount) {
                return waitInMs;
            }
            earliestTime += windowLength;
            currentPass -= windowPass;
            idx++;
        }

        return OccupyTimeoutProperty.getOccupyTimeout();
    }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM