Sentinel限流的神秘面紗:
之前我們學習過限流比較主流的三種算法:漏桶,令牌桶,滑動窗口。而Sentinel采用的是最后一種,滑動窗口來實現限流的。
通過對Sentinel基礎Api的使用,我們可用發現,從我們定義好限流規則以后,在我們需要進行業務處理之前,都需要調用一下 SphU.entry(resource),具體代碼如下:
public static void main(String[] args) { initFlowRules(); //初始化一個規則
while(true){ Entry entry=null; try{ entry= SphU.entry(resource); //它做了什么
System.out.println("Hello Word"); }catch (BlockException e){//如果被限流了,那么會拋出這個異常
e.printStackTrace(); }finally { if(entry!=null){ entry.exit();// 釋放
} } } }
代碼中我們可能唯一疑惑的,也是最關鍵的一步是 SphU.entry(resource) , 我們傳進去了一個資源,這個資源可用是方法名,可以是接口,那么他具體做了什么呢?讓我們來一步步揭開他的神秘面紗:
public static Entry entry(String name) throws BlockException { return Env.sph.entry(name, EntryType.OUT, 1, OBJECTS0); } public class Env { public static final Sph sph = new CtSph(); ......//省略部分代碼 }
從 SphU.entry()
方法往下執行會進入到 Sph.entry()
,Sph的默認實現類是 CtSph,而最終會進入CtSph 的entry 方法:
@Override public Entry entry(String name, EntryType type, int count, Object... args) throws BlockException {
//封裝了一個資源對象 StringResourceWrapper resource = new StringResourceWrapper(name, type); return entry(resource, count, args); }
這里的主要步驟是通過我們給定的資源去封裝了一個 StringResourceWrapper ,然后傳入自己的重載方法,繼而調用 entryWithPriority(resourceWrapper, count, false, args):
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException { Context context = ContextUtil.getContext(); if (context instanceof NullContext) { // The {@link NullContext} indicates that the amount of context has exceeded the threshold, // so here init the entry only. No rule checking will be done.
return new CtEntry(resourceWrapper, null, context); } if (context == null) { // Using default context.使用默認上下文
context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME); } //全局開關關閉,沒有規則檢查。 // Global switch is close, no rule checking will do.
if (!Constants.ON) { return new CtEntry(resourceWrapper, null, context); } // 獲取該資源對應的 chain ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper); /* * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE}, * so no rule checking will be done. */
if (chain == null) { return new CtEntry(resourceWrapper, null, context); } Entry e = new CtEntry(resourceWrapper, chain, context); try {//執行chain的 entry方法 chain.entry(context, resourceWrapper, null, count, prioritized, args); } catch (BlockException e1) { e.exit(count, args); throw e1; } catch (Throwable e1) { // This should not happen, unless there are errors existing in Sentinel internal.
RecordLog.info("Sentinel unexpected exception", e1); } return e; }
從上面的代碼我們可以知道,該方法中主要是獲取到了本資源所對應的資源處理鏈,從起命名 lookProcessChain 中發現,就是去獲取到一條處理鏈,去執行資源的整合處理,當然,這里處於限流的環境下,那么這個處理鏈肯定是對於當前環境下請求的流量整合限流相關的處理。可以分為以下幾個部分:
- 對參全局配置項做檢測,如果不符合要求就直接返回了一個CtEntry對象,不會再進行后面的限流檢測,否則進入下面的檢測流程。
- 根據包裝過的資源對象獲取對應的SlotChain
- 執行SlotChain的entry方法,如果SlotChain的entry方法拋出了BlockException,則將該異常繼續向上拋出,如果SlotChain的entry方法正常執行了,則最后會將該entry對象返回
- 如果上層方法捕獲了BlockException,則說明請求被限流了,否則請求能正常執行
SlotChain 的鏈路獲取:
我們重點關注這個 SlotChain 的獲取以及相關中葯邏輯的執行,我們跟進 lookProcessChain :
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) { ProcessorSlotChain chain = chainMap.get(resourceWrapper); if (chain == null) {//這里與spring(緩存bean) dubbo(雙重檢查鎖)中如出一轍,采用緩存機制 synchronized (LOCK) { chain = chainMap.get(resourceWrapper); if (chain == null) { // Entry size limit. 6000
if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) { return null; } // 真正構造SlotChain的和新方法 chain = SlotChainProvider.newSlotChain();
// 資源 --> 處理鏈 Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>( chainMap.size() + 1); newMap.putAll(chainMap); newMap.put(resourceWrapper, chain); chainMap = newMap; } } } return chain; }
這里的代碼很清晰可以發現,首先從緩存中獲取該處理鏈,而第一次進來肯定是沒有的,所以這里會走 SlotChainProvider 去構造處理鏈,構造完成后將起放入緩存以備下次使用:
public static ProcessorSlotChain newSlotChain() { if (builder != null) { return builder.build(); } resolveSlotChainBuilder(); if (builder == null) { RecordLog.warn("[SlotChainProvider] Wrong state when resolving slot chain builder, using default"); builder = new DefaultSlotChainBuilder(); } return builder.build(); }
這個方法進行了多次的校驗,確保builder 不為空,然后通過其去構造這個處理鏈:
public class DefaultSlotChainBuilder implements SlotChainBuilder { @Override public ProcessorSlotChain build() { ProcessorSlotChain chain = new DefaultProcessorSlotChain(); chain.addLast(new NodeSelectorSlot()); chain.addLast(new ClusterBuilderSlot()); chain.addLast(new LogSlot()); chain.addLast(new StatisticSlot()); chain.addLast(new SystemSlot()); chain.addLast(new AuthoritySlot()); chain.addLast(new FlowSlot()); chain.addLast(new DegradeSlot()); return chain; } }
到了這里我們終於發現了這個處理鏈的組成情況,官網也有對其進行說明,畢竟是Sentinel的限流核心算法的實現腹地,我們看一下官網的介紹:
在 Sentinel 里面,所有的資源都對應一個資源名稱(resourceName
),每次資源調用都會創建一個 Entry
對象。Entry 可以通過對主流框架的適配自動創建,也可以通過注解的方式或調用 SphU
API 顯式創建。Entry 創建的時候,同時也會創建一系列功能插槽(slot chain),這些插槽有不同的職責,例如:
NodeSelectorSlot
:收集資源的路徑,並將這些資源的調用路徑,以樹狀結構存儲起來,用於根據調用路徑來限流降級;ClusterBuilderSlot
:用於存儲資源的統計信息以及調用者信息,例如該資源的 RT, QPS, thread count 等等,這些信息將用作為多維度限流,降級的依據;StatisticSlot
:用於記錄、統計不同緯度的 runtime 指標監控信息;SystemSlot
:通過系統的狀態,例如 load1 等,來控制總的入口流量;AuthoritySlot
:根據配置的黑白名單和調用來源信息,來做黑白名單控制;FlowSlot
:用於根據預設的限流規則以及前面 slot 統計的狀態,來進行流量控制;DegradeSlot
:通過統計信息以及預設的規則,來做熔斷降級;
總體的框架如下:
從這個架構圖可以發現,整個調用鏈中最核心的就是 StatisticSlot
(用於記錄、統計不同緯度的 runtime 指標監控信息) 以及FlowSlot
(根據預設的限流規則以及前面 slot 統計的狀態,來進行流量控制).
Chain是鏈條的意思,從build的方法可看出,ProcessorSlotChain是一個鏈表,里面添加了很多個Slot。具體的實現需要到DefaultProcessorSlotChain中去看。
public class DefaultProcessorSlotChain extends ProcessorSlotChain { AbstractLinkedProcessorSlot<?> first = new AbstractLinkedProcessorSlot<Object>() { @Override public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable { super.fireEntry(context, resourceWrapper, t, count, prioritized, args); } @Override public void exit(Context context, ResourceWrapper resourceWrapper, int count, Object... args) { super.fireExit(context, resourceWrapper, count, args); } }; AbstractLinkedProcessorSlot<?> end = first; ......//省略部分代碼 }
DefaultProcessorSlotChain中有兩個AbstractLinkedProcessorSlot類型的變量:first和end,這就是鏈表的頭結點和尾節點。創建DefaultProcessorSlotChain對象時,首先創建了首節點,然后把首節點賦值給了尾節點,可以用下圖表示:
然后通過 chain.addLast(new NodeSelectorSlot()); 添加第一個節點:
public void addLast(AbstractLinkedProcessorSlot<?> protocolProcessor) { end.setNext(protocolProcessor); end = protocolProcessor; } public void setNext(AbstractLinkedProcessorSlot<?> next) { this.next = next; }
執行完添加 addLast 以后的鏈如下圖:
當執行完整個處理鏈的添加后, SlotChain 如下:
這樣就將所有的Slot對象添加到了鏈表中去了,每一個Slot都是繼承自AbstractLinkedProcessorSlot。而AbstractLinkedProcessorSlot是一種責任鏈的設計,每個對象中都有一個next屬性,指向的是另一個AbstractLinkedProcessorSlot對象。其實責任鏈模式在很多框架中都有,比如Netty中是通過pipeline來實現的,還有Zookeeper中的服務端的請求處理鏈RequestProcessor等。
SlotChain 的鏈路執行:
lookProcessChain方法獲得的ProcessorSlotChain的實例是DefaultProcessorSlotChain,那么執行chain.entry方法,就會執行DefaultProcessorSlotChain.first的entry方法,而DefaultProcessorSlotChain.first的entry方法是這樣的:
@Override public void entry(Context context, ResourceWrapper resourceWrapper, Object t, int count, boolean prioritized, Object... args) throws Throwable { super.fireEntry(context, resourceWrapper, t, count, prioritized, args); }
繼而調用父類 AbstractLinkedProcessorSlot 的 fireEntry 方法:
@Override public void fireEntry(Context context, ResourceWrapper resourceWrapper, Object obj, int count, boolean prioritized, Object... args) throws Throwable { if (next != null) { next.transformEntry(context, resourceWrapper, obj, count, prioritized, args); } }
從這里可以看到,從fireEntry方法中就開始傳遞執行entry了,這里會執行當前節點的下一個節點transformEntry方法,上面已經分析過了,transformEntry方法會觸發當前節點的entry,也就是說fireEntry方法實際是觸發了下一個節點的entry方法。由上面的分析我們可以知道第一個Slot節點是 NodeSelectorSlot。
根據之前官網的介紹,我們着重注意兩個 Slot ,就像我們使用的時候一樣,我們需要配置規則,那么在Sentinel 中去校驗這個規則的是 FlowSlot ,既然是一個做規則匹配的,那么進行匹配的數據是哪里來的呢? 在Sentinel中他提供了一個Slot 來統計這些數據,然后交給FlowSlot進行校驗,他就是StatisticSlot
。我們首先來看StatisticSlot
的entry方法中的實現邏輯:
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { try { // Do some checking. 傳播到下一個Slot
fireEntry(context, resourceWrapper, node, count, prioritized, args); // 執行到這里表示通過檢查,不被限流 // Request passed, add thread count and pass count.
node.increaseThreadNum(); node.addPassRequest(count); if (context.getCurEntry().getOriginNode() != null) { // Add count for origin node.
context.getCurEntry().getOriginNode().increaseThreadNum(); context.getCurEntry().getOriginNode().addPassRequest(count); } if (resourceWrapper.getEntryType() == EntryType.IN) { // Add count for global inbound entry node for global statistics.
Constants.ENTRY_NODE.increaseThreadNum(); Constants.ENTRY_NODE.addPassRequest(count); } // Handle pass event with registered entry callback handlers.
for (ProcessorSlotEntryCallback<DefaultNode> handler : StatisticSlotCallbackRegistry.getEntryCallbacks()) { handler.onPass(context, resourceWrapper, node, count, args); } } catch (PriorityWaitException ex) {
....省略部分代碼
//增加線程統計 node.increaseThreadNum(); } catch (BlockException e) {
....省略部分代碼 // Blocked, set block exception to current entry.
context.getCurEntry().setError(e); // Add block count.
node.increaseBlockQps(count); } catch (Throwable e) { ....省略部分代碼// This should not happen.
node.increaseExceptionQps(count); } }
代碼分成了兩部分,第一部分是entry方法,該方法首先會觸發后續slot的entry方法,即SystemSlot、FlowSlot、DegradeSlot等的規則,如果規則不通過,就會拋出BlockException,則會在node中統計被block的數量。反之會在node中統計通過的請求數和線程數等信息。第二部分是在exit方法中,當退出該Entry入口時,會統計rt的時間,並減少線程數。
我們可以看到 node.addPassRequest()
這段代碼是在fireEntry執行之后執行的,這意味着,當前請求通過了sentinel的流控等規則,此時需要將當次請求記錄下來,也就是執行 node.addPassRequest()
這行代碼,我們跟進去看看:
public void addPassRequest(int count) { super.addPassRequest(count); this.clusterNode.addPassRequest(count); }
首先我們知道這里的node是一個 DefaultNode 實例,在第一個NodeSelectorSlot 的entry方法中對資源進行了封裝,封裝成了一個DefaultNode。
- DefaultNode:保存着某個resource在某個context中的實時指標,每個DefaultNode都指向一個ClusterNode
- ClusterNode:保存着某個resource在所有的context中實時指標的總和,同樣的resource會共享同一個ClusterNode,不管他在哪個context中
// SAMPLE_COUNT=2 INTERVAL=1000
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT, IntervalProperty.INTERVAL); private transient Metric rollingCounterInMinute = new ArrayMetric(60, 60 * 1000, false);
public void addPassRequest(int count) { rollingCounterInSecond.addPass(count); rollingCounterInMinute.addPass(count); }
從代碼中我們可以看到,增加指標調用 addPass 是通過一個叫 ArrayMetric 的類,現在我們在進入 ArrayMetric 中看一下。具體的代碼如下所示:
private final LeapArray<MetricBucket> data; // SAMPLE_COUNT=2 INTERVAL=1000 public ArrayMetric(int sampleCount, int intervalInMs) { this.data = new OccupiableBucketLeapArray(sampleCount, intervalInMs); } public void addPass(int count) { WindowWrap<MetricBucket> wrap = data.currentWindow(); wrap.value().addPass(count); }
這里終於出現了與滑動窗口有那么點關聯的 window了,window不就是窗戶嘛,這里通過 data 來獲取當前窗口。而這里的窗口大小為 sampleCount=2.我們可以看到,這里是通過 MetricBucket 來保存各項指標,其中維護了一個統計是數組LongAdder[] counters 來保存,而 WindowWrap,我們可以看到每一個 WindowWrap對象由三個部分組成:
public class WindowWrap<T> { // 時間窗口的長度 private final long windowLengthInMs; // 時間窗口的開始時間,單位是毫秒 private long windowStart; //時間窗口的內容,在 WindowWrap 中是用泛型表示這個值的,但實際上就是 MetricBucket 類 private T value; //......省略部分代碼 }
再看 LeapArray 這個類:
public abstract class LeapArray<T> { // 時間窗口的長度 protected int windowLength; // 采樣窗口的個數 protected int sampleCount; // 以毫秒為單位的時間間隔 protected int intervalInMs; // 采樣的時間窗口數組 protected AtomicReferenceArray<WindowWrap<T>> array; /** * LeapArray對象 * @param windowLength 時間窗口的長度,單位:毫秒 * @param intervalInSec 統計的間隔,單位:秒 */ public LeapArray(int windowLength, int intervalInSec) { this.windowLength = windowLength; // 時間窗口的采樣個數,默認為2個采樣窗口 this.sampleCount = intervalInSec * 1000 / windowLength; this.intervalInMs = intervalInSec * 1000; this.array = new AtomicReferenceArray<WindowWrap<T>>(sampleCount); } }
可以很清晰的看出來在 LeapArray
中創建了一個 AtomicReferenceArray 數組,用來對時間窗口中的統計值進行采樣。通過采樣的統計值再計算出平均值,就是我們需要的最終的實時指標的值了。可以看到我在上面的代碼中通過注釋,標明了默認采樣的時間窗口的個數是2個,這個值是怎么得到的呢?我們回憶一下 LeapArray
對象創建,是通過在 StatisticNode
中,new了一個 ArrayMetric
,然后將參數一路往上傳遞后創建的:
private transient volatile Metric rollingCounterInSecond = new ArrayMetric(SampleCountProperty.SAMPLE_COUNT,IntervalProperty.INTERVAL);
我們跟進獲取當前窗口的方法 data.currentWindow() 中:
@Override public WindowWrap<Window> currentWindow(long time) { .....//省略部分代碼 int idx = calculateTimeIdx(timeMillis); // Calculate current bucket start time. long windowStart = calculateWindowStart(timeMillis); // time每增加一個windowLength的長度,timeId就會增加1,時間窗口就會往前滑動一個while (true) { // 從采樣數組中根據索引獲取緩存的時間窗口 WindowWrap<Window> old = array.get(idx); // array數組長度不宜過大,否則old很多情況下都命中不了,就會創建很多個WindowWrap對象 if (old == null) { // 如果沒有獲取到,則創建一個新的 WindowWrap<Window> window = new WindowWrap<Window>(windowLength, currentWindowStart, new Window()); // 通過CAS將新窗口設置到數組中去 if (array.compareAndSet(idx, null, window)) { // 如果能設置成功,則將該窗口返回 return window; } else { // 否則當前線程讓出時間片,等待 Thread.yield(); } // 如果當前窗口的開始時間與old的開始時間相等,則直接返回old窗口 } else if (currentWindowStart == old.windowStart()) { return old; // 如果當前時間窗口的開始時間已經超過了old窗口的開始時間,則放棄old窗口 // 並將time設置為新的時間窗口的開始時間,此時窗口向前滑動 } else if (currentWindowStart > old.windowStart()) { if (addLock.tryLock()) { try { // if (old is deprecated) then [LOCK] resetTo currentTime. return resetWindowTo(old, currentWindowStart); } finally { addLock.unlock(); } } else { Thread.yield(); } // 這個條件不可能存在 } else if (currentWindowStart < old.windowStart()) { // Cannot go through here. return new WindowWrap<Window>(windowLength, currentWindowStart, new Window()); } } }
代碼很長,我們逐步將其分解,我們實際可以把他分成以下幾步:
- 根據當前時間,算出該時間的timeId,並根據timeId算出當前窗口在采樣窗口數組中的索引idx。
- 根據當前時間算出當前窗口的應該對應的開始時間time,以毫秒為單位。
- 根據索引idx,在采樣窗口數組中取得一個時間窗口。
- 循環判斷直到獲取到一個當前時間窗口 old 。
- 如果old為空,則創建一個時間窗口,並將它插入到array的第idx個位置,array上面已經分析過了,是一個 AtomicReferenceArray。
- 如果當前窗口的開始時間time與old的開始時間相等,那么說明old就是當前時間窗口,直接返回old。
- 如果當前窗口的開始時間time大於old的開始時間,則說明old窗口已經過時了,將old的開始時間更新為最新值:time,進入下一次得循環再判斷當前窗口的開始時間time與old的開始時間相等的時候返回。
- 如果當前窗口的開始時間time小於old的開始時間,實際上這種情況是不可能存在的,因為time是當前時間,old是過去的一個時間。
另外timeId是會隨着時間的增長而增加,當前時間每增長一個windowLength的長度,timeId就加1。但是idx不會增長,只會在0和1之間變換,因為array數組的長度是2,只有兩個采樣時間窗口。至於為什么默認只有兩個采樣窗口,個人覺得因為sentinel是比較輕量的框架。時間窗口中保存着很多統計數據,如果時間窗口過多的話,一方面會占用過多內存,另一方面時間窗口過多就意味着時間窗口的長度會變小,如果時間窗口長度變小,就會導致時間窗口過於頻繁的滑動。先來看一下其中的第一步及第二步:
private int calculateTimeIdx(/*@Valid*/ long timeMillis) { // time每增加一個windowLength的長度,timeId就會增加1,時間窗口就會往前滑動一個 long timeId = timeMillis / windowLengthInMs; // idx被分成[0,arrayLength-1]中的某一個數,作為array數組中的索引 return (int)(timeId % array.length()); } protected long calculateWindowStart(/*@Valid*/ long timeMillis) { return timeMillis - timeMillis % windowLengthInMs; }
根據當前時間除於 windowLength 得到一個 timeId(相差500ms計算出來的值將是一致的),再用timeId跟取樣窗口的長度進行一個取模,那么她一定會落在 0,1兩個位置的其中一個。然后根據當前時間算出當前窗口的應該對應的開始時間time。由於剛剛開始的時候 array 是空的,那么她獲取到的old應當是null,那么他會創建一個新的實例,我們用圖看一下初始化的 LeapArray:
對應上面 currentWindow 方法的 4.1 步驟(假設idx=0):
當獲取到的是null,那么初始的時候arrays數組中只有一個窗口(可能是第一個(idx=0),也可能是第二個(idx=1)),每個時間窗口的長度是500ms,這就意味着只要當前時間與時間窗口的差值在500ms之內,時間窗口就不會向前滑動。例如,假如當前時間走到300或者500時,當前時間窗口仍然是相同的那個:
對應上面 currentWindow 方法的 4.2 步驟:
時間繼續往前走,當超過500ms時,時間窗口就會向前滑動到下一個,這時就會更新當前窗口的開始時間,時間繼續往前走,只要不超過1000ms,則當前窗口不會發生變化,其中代碼實現是 resetWindowTo 方法:
protected WindowWrap<MetricBucket> resetWindowTo(WindowWrap<MetricBucket> w, long time) { // Update the start time and reset value. // 重置windowStart w.resetTo(time); MetricBucket borrowBucket = borrowArray.getWindowValue(time); if (borrowBucket != null) { w.value().reset(); w.value().addPass((int)borrowBucket.pass()); } else { w.value().reset(); } return w; }
對應上面 currentWindow 方法的 4.3 步驟:
當時間繼續往前走,當前時間超過1000ms時,就會再次進入下一個時間窗口,此時arrays數組中的窗口將會有一個失效,會有另一個新的窗口進行替換:
以此類推隨着時間的流逝,時間窗口也在發生變化,在當前時間點中進入的請求,會被統計到當前時間對應的時間窗口中,回到addpass 方法中:
public void addPass(int count) { WindowWrap<MetricBucket> wrap = data.currentWindow(); wrap.value().addPass(count); }
獲取到窗口以后會進入到 wrap.value().addPass(count); QPS的增加。而這里的 wrap.value() 得到的是之前提到的 MetricBucket ,在 Sentinel 中QPS相關數據的統計結果是維護在這個類的 LongAdder[] 中,最終由這個指標來與我們實現設置好的規則進行匹配,查看是否限流,也就是 StatisticSlot
的entry 方法中的 fireEntry(context, resourceWrapper, node, count, prioritized, args); 都要先進入到 FlowSlot的entry方法進行限流過濾:
public void entry(Context context, ResourceWrapper resourceWrapper, DefaultNode node, int count, boolean prioritized, Object... args) throws Throwable { checkFlow(resourceWrapper, context, node, count, prioritized); fireEntry(context, resourceWrapper, node, count, prioritized, args); }
可以看到這里有個很重要的方法 checkFlow ,進去看看:
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource, Context context, DefaultNode node, int count, boolean prioritized) throws BlockException { if (ruleProvider == null || resource == null) { return; } Collection<FlowRule> rules = ruleProvider.apply(resource.getName()); if (rules != null) { for (FlowRule rule : rules) { if (!canPassCheck(rule, context, node, count, prioritized)) { throw new FlowException(rule.getLimitApp(), rule); } } } }
到這里一切都應該清晰了,這里拿到了我們設置的 FlowRule 循環匹配資源進行限流過濾。這就是Sentinel 能做到限流的原因。