前言
Sentinel
的核心功能之一是流量統計,例如我們常用的指標QPS,當前線程數等。上一篇文章中我們已經大致提到了提供數據統計功能的Slot(StatisticSlot)
,StatisticSlot
在Sentinel
的整個體系中扮演了一個非常重要的角色,后續的一系列操作(限流,熔斷)等都依賴於StatisticSlot
所統計出的數據。
本文所要討論的重點就是StatisticSlot
是如何做的流量統計?
其實在之前介紹常用限流算法[常用限流算法](https://www.jianshu.com/p/9edebaa446d3)的時候已經有提到過一個算法滑動窗口限流
,該算法的滑動窗口原理其實跟Sentinel
所提供的流量統計原理是一樣的,都是基於時間窗口的滑動統計
回到StatisticSlot
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
...
// 當前請求線程數加一
node.increaseThreadNum();
// 新增請求數
node.addPassRequest(count);
...
}
可以看到StatisticSlot
主要統計了兩種類型的數據
-
線程數
-
請求數(QPS)
對於線程數的統計比較簡單,通過內部維護一個LongAdder
來進行當前線程數的統計,每進入一個請求加1,每釋放一個請求減1,從而得到當前的線程數
對於請求數QPS的統計則相對比較復雜,其中有用到滑動窗口原理(也是本文的重點),下面根據源碼來深入的分析
DefaultNode和StatisticNode
public void addPassRequest(int count) {
// 調用父類(StatisticNode)來進行統計
super.addPassRequest(count);
// 根據clusterNode 匯總統計(背后也是調用父類StatisticNode)
this.clusterNode.addPassRequest(count);
}
最終都是調用了父類StatisticNode
的addPassRequest
方法
/**
* 按秒統計,分成兩個窗口,每個窗口500ms,用來統計QPS
*/
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
/**
* 按分鍾統計,分成60個窗口,每個窗口 1000ms
*/
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
public void addPassRequest(int count) {
rollingCounterInSecond.addPass(count);
rollingCounterInMinute.addPass(count);
}
代碼比較簡單,可以知道內部是調用了ArrayMetric
的addPass
方法來統計的,並且統計了兩種不同時間維度的數據(秒級和分鍾級)
ArrayMetric
private final LeapArray<MetricBucket> data;
public ArrayMetric(int sampleCount, int intervalInMs) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
}
public ArrayMetric(int sampleCount, int intervalInMs, boolean enableOccupy) {
if (enableOccupy) {
this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs);
} else {
this.data = new BucketLeapArray(sampleCount, intervalInMs);
}
}
public void addPass(int count) {
// 1\. 獲取當前窗口
WindowWrap<MetricBucket> wrap = data.currentWindow();
// 2\. 當前窗口加1
wrap.value().addPass(count);
}
ArrayMetric
其實也是一個包裝類,內部通過實例化LeapArray
的對應實現類,來實現具體的統計邏輯,LeapArray
是一個抽象類,OccupiableBucketLeapArray
和BucketLeapArray
都是其具體的實現類
OccupiableBucketLeapArray
在1.5版本之后才被引入,主要是為了解決一些高優先級的請求在限流觸發的時候也能通過(通過占用未來時間窗口的名額來實現) 也是默認使用的LeapArray實現類
而統計的邏輯也比較清楚,分成了兩步:
-
定位到當前窗口
-
獲取到當前窗口
WindowWrap
的MetricBucket
並執行addPass
邏輯
這里我們先看下第二步中的MetricBucket
類,看看它做了哪些事情
MetricBucket
/**
* 存放當前窗口各種類型的統計值(類型包括 PASS BLOCK EXCEPTION 等)
*/
private final LongAdder[] counters;
public MetricBucket() {
MetricEvent[] events = MetricEvent.values();
this.counters = new LongAdder[events.length];
for (MetricEvent event : events) {
counters[event.ordinal()] = new LongAdder();
}
initMinRt();
}
// 統計pass數
public void addPass(int n) {
add(MetricEvent.PASS, n);
}
// 統計可占用的pass數
public void addOccupiedPass(int n) {
add(MetricEvent.OCCUPIED_PASS, n);
}
// 統計異常數
public void addException(int n) {
add(MetricEvent.EXCEPTION, n);
}
// 統計block數
public void addBlock(int n) {
add(MetricEvent.BLOCK, n);
}
....
MetricBucket通過定義了一個LongAdder
數組來存儲不同類型的流量統計值,具體的類型則都定義在MetricEvent
枚舉中。
執行addPass
方法對應LongAdder
數組索引下表為0的值遞增
下面再來看下data.currentWindow()
的內部邏輯
OccupiableBucketLeapArray
OccupiableBucketLeapArray
繼承了抽象類LeapArray
,核心邏輯也是在LeapArray
中
/**
* 時間窗口大小 單位ms
*/
protected int windowLengthInMs;
/**
* 切分的窗口數
*/
protected int sampleCount;
/**
* 統計的時間間隔 intervalInMs = windowLengthInMs * sampleCount
*/
protected int intervalInMs;
/**
* 窗口數組 數組大小 = sampleCount
*/
protected final AtomicReferenceArray<WindowWrap<T>> array;
/**
* update lock 更新窗口時需要上鎖
*/
private final ReentrantLock updateLock = new ReentrantLock();
/**
* @param sampleCount 需要划分的窗口數
* @param intervalInMs 間隔的統計時間
*/
public LeapArray(int sampleCount, int intervalInMs) {
this.windowLengthInMs = intervalInMs / sampleCount;
this.intervalInMs = intervalInMs;
this.sampleCount = sampleCount;
this.array = new AtomicReferenceArray<>(sampleCount);
}
/**
* 獲取當前窗口
*/
public WindowWrap<T> currentWindow() {
return currentWindow(TimeUtil.currentTimeMillis());
}
以上需要着重理解的是幾個參數的含義:
-
sampleCount 定義的窗口的數
-
intervalInMs 統計的時間間隔
-
windowLengthInMs 每個窗口的時間大小 = intervalInMs / sampleCount
sampleCount
比較好理解,就是需要定義幾個窗口(默認秒級統計維度的話是兩個窗口),intervalInMs
指的就是我們需要統計的時間間隔,例如我們統計QPS的話那就是1000ms,windowLengthInMs
指的每個窗口的大小,是由intervalInMs
除以sampleCount
得來
類似下圖
理解了上訴幾個參數的含義后,我們直接進入到LeapArray
的currentWindow(long time)
方法中去看看具體的實現
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
// 根據當前時間戳計算當前所屬的窗口數組索引下標
int idx = calculateTimeIdx(timeMillis);
// 計算當前窗口的開始時間戳
long windowStart = calculateWindowStart(timeMillis);
/*
* 從窗口數組中獲取當前窗口項,分為三種情況
*
* (1) 當前窗口為空還未創建,則初始化一個
* (2) 當前窗口的開始時間和上面計算出的窗口開始時間一致,表明當前窗口還未過期,直接返回當前窗口
* (3) 當前窗口的開始時間 小於 上面計算出的窗口開始時間,表明當前窗口已過期,需要替換當前窗口
*/
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
/*
* 第一種情況,新建一個窗口項
*/
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
// Successfully updated, return the created bucket.
return window;
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
/*
* 第二種情況 直接返回
*/
return old;
} else if (windowStart > old.windowStart()) {
/*
* 第三種情況 替換窗口
*/
if (updateLock.tryLock()) {
try {
// Successfully get the update lock, now we reset the bucket.
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
// Contention failed, the thread will yield its time slice to wait for bucket available.
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// 第四種情況,講道理不會走到這里
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
/**
* 根據當前時間戳計算當前所屬的窗口數組索引下標
*/
private int calculateTimeIdx(/*@Valid*/ long timeMillis) {
long timeId = timeMillis / windowLengthInMs;
return (int)(timeId % array.length());
}
/**
* 計算當前窗口的開始時間戳
*/
protected long calculateWindowStart(/*@Valid*/ long timeMillis) {
return timeMillis - timeMillis % windowLengthInMs;
}
上面的方法就是整個滑動窗口邏輯的核心代碼,注釋其實也寫的比較清晰了,簡單概括下可以分為以下幾步:
-
根據當前時間戳 和 窗口數組大小 獲取到當前的窗口數組索引下標
idx
,如果窗口數是2,那其實idx
只有兩種值(0 或 1) -
根據當前時間戳(
windowStart
) 計算得到當前窗口的開始時間戳值。通過calculateWindowStart
計算來得到,這個方法還蠻有意思的,通過當前時間戳和窗口時間大小取余來得到 與當前窗口開始時間的 偏移量。比我用定時任務實現高級多了 ... 😆 可以去對比一下我之前文章中的蠢實現 [滑動窗口算法定時任務實現](https://github.com/WangJunnan/learn/blob/master/algorithm/src/main/java/com/walm/learn/algorithm/ratelimit/SlidingWindowRateLimit.java) -
然后就是根據上面得到的兩個值 來獲取當前時間窗口,這里其實又分為三種情況
-
當前窗口為空還未創建,則初始化一個
-
當前窗口的開始時間和上面計算出的窗口開始時間(
windowStart
)一致,表明當前窗口還未過期,直接返回當前窗口 -
當前窗口的開始時間 小於 上面計算出的窗口(
windowStart
)開始時間,表明當前窗口已過期,需要替換當前窗口
總結
總的來說,currentWindow
方法的實現還是非常巧妙的,因為我在看Sentinel
的源碼前也寫過一篇限流算法的文章,恰好其中也實現過一個滑動窗口限流算法,不過相比於Sentinel
的實現,我用了定時任務去做窗口的切換更新,顯然性能上更差,實現的也不優雅,大家也可以去對比一下。[常用限流算法](https://www.jianshu.com/p/9edebaa446d3)
Sentinel系列