Sentinel源碼解析三(滑動窗口流量統計)


前言

Sentinel的核心功能之一是流量統計,例如我們常用的指標QPS,當前線程數等。上一篇文章中我們已經大致提到了提供數據統計功能的Slot(StatisticSlot)StatisticSlotSentinel的整個體系中扮演了一個非常重要的角色,后續的一系列操作(限流,熔斷)等都依賴於StatisticSlot所統計出的數據。

本文所要討論的重點就是StatisticSlot是如何做的流量統計?

其實在之前介紹常用限流算法[常用限流算法](https://www.jianshu.com/p/9edebaa446d3)的時候已經有提到過一個算法滑動窗口限流,該算法的滑動窗口原理其實跟Sentinel所提供的流量統計原理是一樣的,都是基於時間窗口的滑動統計

回到StatisticSlot


public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,

 boolean prioritized, Object... args) throws Throwable {

...

// 當前請求線程數加一

node.increaseThreadNum();

// 新增請求數

node.addPassRequest(count);

...

}

可以看到StatisticSlot主要統計了兩種類型的數據

  1. 線程數

  2. 請求數(QPS)

對於線程數的統計比較簡單,通過內部維護一個LongAdder來進行當前線程數的統計,每進入一個請求加1,每釋放一個請求減1,從而得到當前的線程數

對於請求數QPS的統計則相對比較復雜,其中有用到滑動窗口原理(也是本文的重點),下面根據源碼來深入的分析

DefaultNode和StatisticNode


public void addPassRequest(int count) {

  // 調用父類(StatisticNode)來進行統計

 super.addPassRequest(count);

  // 根據clusterNode 匯總統計(背后也是調用父類StatisticNode)

 this.clusterNode.addPassRequest(count);

}

最終都是調用了父類StatisticNodeaddPassRequest方法


/**

* 按秒統計,分成兩個窗口,每個窗口500ms,用來統計QPS

 */

private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,

 IntervalProperty.INTERVAL);

/**

* 按分鍾統計,分成60個窗口,每個窗口 1000ms

 */

private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);

public void addPassRequest(int count) {

 rollingCounterInSecond.addPass(count);

 rollingCounterInMinute.addPass(count);

}

代碼比較簡單,可以知道內部是調用了ArrayMetricaddPass方法來統計的,並且統計了兩種不同時間維度的數據(秒級和分鍾級)

ArrayMetric


private final LeapArray<MetricBucket> data;

public ArrayMetric(int sampleCount, int intervalInMs) {

 this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);

}

public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {

 if (enableOccupy) {

 this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);

 } else {

 this.data = new BucketLeapArray(sampleCount, intervalInMs);

 }

}

public void addPass(int count) {

  // 1\. 獲取當前窗口

 WindowWrap<MetricBucket> wrap = data.currentWindow();

  // 2\. 當前窗口加1

 wrap.value().addPass(count);

}

ArrayMetric其實也是一個包裝類,內部通過實例化LeapArray的對應實現類,來實現具體的統計邏輯,LeapArray是一個抽象類,OccupiableBucketLeapArrayBucketLeapArray都是其具體的實現類

OccupiableBucketLeapArray在1.5版本之后才被引入,主要是為了解決一些高優先級的請求在限流觸發的時候也能通過(通過占用未來時間窗口的名額來實現) 也是默認使用的LeapArray實現類

而統計的邏輯也比較清楚,分成了兩步:

  1. 定位到當前窗口

  2. 獲取到當前窗口WindowWrapMetricBucket並執行addPass邏輯

這里我們先看下第二步中的MetricBucket類,看看它做了哪些事情

MetricBucket


/**

 * 存放當前窗口各種類型的統計值(類型包括 PASS BLOCK EXCEPTION 等)

 */

private final LongAdder[] counters;

public MetricBucket() {

 MetricEvent[] events = MetricEvent.values();

 this.counters = new LongAdder[events.length];

 for (MetricEvent event : events) {

 counters[event.ordinal()] = new LongAdder();

 }

 initMinRt();

}

// 統計pass數

public void addPass(int n) {

 add(MetricEvent.PASS, n);

}

// 統計可占用的pass數

public void addOccupiedPass(int n) {

 add(MetricEvent.OCCUPIED_PASS, n);

}

// 統計異常數

public void addException(int n) {

 add(MetricEvent.EXCEPTION, n);

}

// 統計block數

public void addBlock(int n) {

 add(MetricEvent.BLOCK, n);

}

....

MetricBucket通過定義了一個LongAdder數組來存儲不同類型的流量統計值,具體的類型則都定義在MetricEvent枚舉中。

執行addPass方法對應LongAdder數組索引下表為0的值遞增

下面再來看下data.currentWindow()的內部邏輯

OccupiableBucketLeapArray

OccupiableBucketLeapArray繼承了抽象類LeapArray,核心邏輯也是在LeapArray


/**

* 時間窗口大小  單位ms

 */

protected int windowLengthInMs;

/**

* 切分的窗口數

 */

protected int sampleCount;

/**

 * 統計的時間間隔 intervalInMs = windowLengthInMs * sampleCount

 */ 

protected int intervalInMs;

/**

 * 窗口數組 數組大小 = sampleCount

 */

protected final AtomicReferenceArray<WindowWrap<T>> array;

/**

 * update lock 更新窗口時需要上鎖

 */

private final ReentrantLock updateLock = new ReentrantLock();

/**

 * @param sampleCount 需要划分的窗口數

 * @param intervalInMs 間隔的統計時間

 */

public LeapArray(int sampleCount, int intervalInMs) {

 this.windowLengthInMs = intervalInMs / sampleCount;

 this.intervalInMs = intervalInMs;

 this.sampleCount = sampleCount;

 this.array = new AtomicReferenceArray<>(sampleCount);

}

/**

* 獲取當前窗口

 */

public WindowWrap<T> currentWindow() {

 return currentWindow(TimeUtil.currentTimeMillis());

}

以上需要着重理解的是幾個參數的含義:

  1. sampleCount 定義的窗口的數

  2. intervalInMs 統計的時間間隔

  3. windowLengthInMs 每個窗口的時間大小 = intervalInMs / sampleCount

sampleCount 比較好理解,就是需要定義幾個窗口(默認秒級統計維度的話是兩個窗口),intervalInMs 指的就是我們需要統計的時間間隔,例如我們統計QPS的話那就是1000ms,windowLengthInMs 指的每個窗口的大小,是由intervalInMs除以sampleCount得來

類似下圖

理解了上訴幾個參數的含義后,我們直接進入到LeapArraycurrentWindow(long time)方法中去看看具體的實現


public WindowWrap<T> currentWindow(long timeMillis) {

 if (timeMillis < 0) {

 return null;

 }

  // 根據當前時間戳計算當前所屬的窗口數組索引下標

 int idx = calculateTimeIdx(timeMillis);

  // 計算當前窗口的開始時間戳

 long windowStart = calculateWindowStart(timeMillis);

 /*

 * 從窗口數組中獲取當前窗口項,分為三種情況

 *

 * (1) 當前窗口為空還未創建,則初始化一個

 * (2) 當前窗口的開始時間和上面計算出的窗口開始時間一致,表明當前窗口還未過期,直接返回當前窗口

 * (3) 當前窗口的開始時間  小於  上面計算出的窗口開始時間,表明當前窗口已過期,需要替換當前窗口

 */

 while (true) {

 WindowWrap<T> old = array.get(idx);

 if (old == null) {

 /*

 * 第一種情況,新建一個窗口項

 */

 WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));

 if (array.compareAndSet(idx, null, window)) {

  // Successfully updated, return the created bucket.

 return window;

 } else {

  // Contention failed, the thread will yield its time slice to wait for bucket available.

 Thread.yield();

 }

 } else if (windowStart == old.windowStart()) {

 /*

 * 第二種情況 直接返回

 */

 return old;

 } else if (windowStart > old.windowStart()) {

 /*

 * 第三種情況 替換窗口

 */

 if (updateLock.tryLock()) {

 try {

  // Successfully get the update lock, now we reset the bucket.

 return resetWindowTo(old, windowStart);

 } finally {

 updateLock.unlock();

 }

 } else {

  // Contention failed, the thread will yield its time slice to wait for bucket available.

 Thread.yield();

 }

 } else if (windowStart < old.windowStart()) {

  // 第四種情況,講道理不會走到這里

 return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));

 }

 }

}

/**

* 根據當前時間戳計算當前所屬的窗口數組索引下標

 */

private int calculateTimeIdx(/*@Valid*/ long timeMillis) {

 long timeId = timeMillis / windowLengthInMs;

 return (int)(timeId % array.length());

}

/**

* 計算當前窗口的開始時間戳

 */

protected long calculateWindowStart(/*@Valid*/ long timeMillis) {

 return timeMillis - timeMillis % windowLengthInMs;

}

上面的方法就是整個滑動窗口邏輯的核心代碼,注釋其實也寫的比較清晰了,簡單概括下可以分為以下幾步:

  1. 根據當前時間戳 和 窗口數組大小 獲取到當前的窗口數組索引下標idx,如果窗口數是2,那其實idx只有兩種值(0 或 1)

  2. 根據當前時間戳(windowStart) 計算得到當前窗口的開始時間戳值。通過calculateWindowStart計算來得到,這個方法還蠻有意思的,通過當前時間戳和窗口時間大小取余來得到 與當前窗口開始時間的 偏移量。比我用定時任務實現高級多了 ... 😆 可以去對比一下我之前文章中的蠢實現 [滑動窗口算法定時任務實現](https://github.com/WangJunnan/learn/blob/master/algorithm/src/main/java/com/walm/learn/algorithm/ratelimit/SlidingWindowRateLimit.java)

  3. 然后就是根據上面得到的兩個值 來獲取當前時間窗口,這里其實又分為三種情況

  • 當前窗口為空還未創建,則初始化一個

  • 當前窗口的開始時間和上面計算出的窗口開始時間(windowStart)一致,表明當前窗口還未過期,直接返回當前窗口

  • 當前窗口的開始時間 小於 上面計算出的窗口(windowStart)開始時間,表明當前窗口已過期,需要替換當前窗口

總結

總的來說,currentWindow方法的實現還是非常巧妙的,因為我在看Sentinel的源碼前也寫過一篇限流算法的文章,恰好其中也實現過一個滑動窗口限流算法,不過相比於Sentinel的實現,我用了定時任務去做窗口的切換更新,顯然性能上更差,實現的也不優雅,大家也可以去對比一下。[常用限流算法](https://www.jianshu.com/p/9edebaa446d3)

Sentinel系列


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM