在前面搞清楚了Sentinel的使用后,大致理了一下Sentinel的責任鏈,搞清楚了這個,基本就已經梳理清楚sentinel-core模塊的大部分內容,順着這條鏈路可以繼續梳理很多東西。
知其然、知其所以然。而閱讀源碼就是最好的知其所以然的方式。這一次找了一些空閑時間,捋了一下它的滑動窗口算法,在這里做一個記錄。后面會繼續去梳理它的令牌算法和漏桶算法。
關於滑動窗口的原理,Sentinel為什么要使用滑動窗口,Sentinel是怎樣使用的滑動,直接使用下面這兩張圖。一圖勝千言,一張好的圖足以說明問題,在這里我引用兩張圖。
圖片說明:第一張圖為Sentinel github上的圖片,因為有時加載不出來,所以拷貝出來了。圖二為一張微信公眾號的圖片,具體公眾號見水印。這里引用只是為了學習使用,但是還是注明一下來源。
首先從StatisticSlot類開始,它是Sentinel統計的核心功能槽,先看它的entry[^對這個方法做了一下精簡,只保留了幾行能夠說明問題的代碼。]方法:
@SpiOrder(-7000)
public class StatisticSlot extends AbstractLinkedProcessorSlot<DefaultNode> {
@Override
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count,
boolean prioritized, Object... args) throws Throwable {
try {
// 先執行后續限流、降級等功能
fireEntry(context, resourceWrapper, node, count, prioritized, args);
// 上面執行通過,更新通過請求數據
node.addPassRequest(count);
} catch (PriorityWaitException ex) {
} catch (BlockException e) {
// 上面執行阻塞,更新阻塞請求數據
node.increaseBlockQps(count);
} catch (Throwable e) {
}
}
}
通過代碼可以看出,它是先執行后面的限流、降級等,然后以后面的執行結果為基礎來更新對應資源的通過、阻塞、異常等統計數據。上面的執行通過和異常處理邏輯大體一致。這里就以執行通過這條線來說明問題,所以對應代碼就是node.addPassRequest(count);進入到這一行代碼,經過幾次調用轉到了StatisticNode這個類上,根據類名可以知道這個表示一個統計節點,調用的方法是addPassRequest:
@Override
public void addPassRequest(int count) {
rollingCounterInSecond.addPass(count);
rollingCounterInMinute.addPass(count);
}
由這個方法可以看出,StatisticNode在處理統計數據的時候,分了兩個維度,分別是秒級的和分鍾級的。對應的rollingCounterInSecond和rollingCounterInMinute是它的兩個成員屬性。其定義如下:
public class StatisticNode implements Node {
/**
* Holds statistics of the recent {@code INTERVAL} seconds. The {@code INTERVAL} is divided into time spans
* by given {@code sampleCount}.
*/
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,
IntervalProperty.INTERVAL);
/**
* Holds statistics of the recent 60 seconds. The windowLengthInMs is deliberately set to 1000 milliseconds,
* meaning each bucket per second, in this way we can get accurate statistics of each second.
*/
private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
}
Metric是一個度量單位接口,其具體實現下面需要提一下,在這里先不展開,只需要知道它存儲的是一些執行數據,如成功數、異常數等。而在上面的StatisticNode.addPassRequest方法中就是分別調用兩個維度的統計單位增加請求通過數量。從rollingCounterInSecond.addPass(count)這一句進入,對應的方法在ArrayMetric類中,這一個就是Metric的唯一實現。ArrayMetric.addPass這個方法代碼如下:
@Override
public void addPass(int count) {
// 獲取當前時間對應的窗口,返回的是當前窗口的一個包裝類
WindowWrap<MetricBucket> wrap = data.currentWindow();
wrap.value().addPass(count);
}
可以看出這里就已經是滑動窗口算法的入口了。通過滑動窗口算法,使用當前時間獲取一個合適的窗口,然后在這個窗口中增加通過的請求數。進入到代碼里面,最終落實到了LeapArray的currentWindow方法中了。就LeapArray這個類名來說,非常有我在開篇第二張圖的那味道了。
在看LeapArray.currentWindow這個方法之前,先來看一個短小簡單但是足夠核心的一個方法LeapArray.calculateTimeIdx,整個方法只有兩行代碼,如下:
private int calculateTimeIdx(long timeMillis) {
// 將傳入的當前時間按照窗口時長進行分段,拿到當前時間對應的分段ID
long timeId = timeMillis / windowLengthInMs;
// 將當前時間的分段段ID對應到窗口數組的下標ID上
return (int)(timeId % array.length());
}
上面的array定義如下:
protected final AtomicReferenceArray<WindowWrap<T>> array;
其賦值在構造方法中,如下語句:
this.array = new AtomicReferenceArray<>(sampleCount);
通過上面這個方法,我們就能夠得到當前時間對應的窗口在窗口數組中的位置了,接下來我們要做的事情就是根據這個位置取出對應的窗口返回去給對應的統計邏輯使用。
直接看LeapArray.currentWindow[^為了方便閱讀,精簡了它的注釋]方法定義:
public WindowWrap<T> currentWindow(long timeMillis) {
if (timeMillis < 0) {
return null;
}
// 獲取當前時間在窗口數組中映射的下標
int idx = calculateTimeIdx(timeMillis);
// 計算當前時間對應的窗口的開始時間,具體方法見下面
long windowStart = calculateWindowStart(timeMillis);
/*
* Get bucket item at given time from the array.
*
* (1) Bucket is absent, then just create a new bucket and CAS update to circular array.
* (2) Bucket is up-to-date, then just return the bucket.
* (3) Bucket is deprecated, then reset current bucket and clean all deprecated buckets.
*/
while (true) {
WindowWrap<T> old = array.get(idx);
if (old == null) {
// 第一次進入,新建窗口,並使用cas的方式設置,如果出現爭搶導致設置失敗,暫時讓出執行權待其它線程成功設置
WindowWrap<T> window = new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
if (array.compareAndSet(idx, null, window)) {
return window;
} else {
Thread.yield();
}
} else if (windowStart == old.windowStart()) {
// 當前時間對應的窗口開始時間等於獲取到的窗口開始時間,那么當前獲取到的窗口就是我們需要的
return old;
} else if (windowStart > old.windowStart()) {
// 當前時間對應的窗口開始時間大於獲取到的窗口開始時間,那么當前獲取到的窗口為已過期窗口,加鎖重置
if (updateLock.tryLock()) {
try {
return resetWindowTo(old, windowStart);
} finally {
updateLock.unlock();
}
} else {
Thread.yield();
}
} else if (windowStart < old.windowStart()) {
// Should not go through here, as the provided time is already behind.
return new WindowWrap<T>(windowLengthInMs, windowStart, newEmptyBucket(timeMillis));
}
}
}
LeapArray.calculateWindowStart方法:
protected long calculateWindowStart(long timeMillis) {
return timeMillis - timeMillis % windowLengthInMs;
}
總結上面的代碼就是:先將當前時間按照統計時長分段,得到當前時間對應的分段ID。因為窗口數組是固定的,所以隨着時間線向前發展,會不斷的順序循環使用數組中的窗口。所以使用當前時間對應的分段ID與窗口數組的長度求余得到當前時間對應的窗口在窗口數組中的下標,拿到這個下標后,接着就是在循環中獲取這個下標對應的窗口了。
在獲取指定下標對應的窗口時,要分情況進行處理:
- 如果對應下標窗口為null,那么就是第一次進入,創建新窗口並使用cas設置。如果非空走下面的邏輯。
- 如果獲取到的窗口開始時間等於當前時間計算出來的對應窗口開始時間,那么就拿到了當前時間需要的窗口,直接返回。
- 如果獲取到的窗口開始時間小於當前時間計算出來的對應窗口開始時間,那么就說明這個窗口已經過期了,所以加鎖重置,然后重復使用。
- 當前時間小於舊的窗口的開始時間,理論上來說是不應該出現這種情況的,如果存在這種情況,那么返回一個無效的空窗口。
整個Sentinel滑動窗口算法的使用就上面這些代碼,看完后第一感覺是代碼如此簡潔,但是功能卻如此高效強大。