1. Overview
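In Sentinel, every resource has a resource name (resourceName), and each resource invocation creates an Entry object. An Entry can be created automatically through the adapters for mainstream frameworks, or explicitly through annotations or the SphU API. When an Entry is created, a chain of function slots (the slot chain) is created as well. Each slot has its own responsibility, for example: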
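NodeSelectorSlot collects the call paths of resources and stores them as a tree, so that flow control and degrade can be applied per call path;
ClusterBuilderSlot stores the statistics of a resource together with caller information, such as RT, QPS and thread count, which serve as the basis for multi-dimensional flow control and degrade;
LogSlot prints exception logs;
StatisticSlot records and aggregates runtime metrics across different dimensions;
SystemSlot controls the total inbound traffic according to system status, such as load1;
AuthoritySlot applies black/white list control based on the configured lists and the call origin;
FlowSlot applies flow control based on the preset flow rules and the statistics gathered by the preceding slots;
DegradeSlot applies circuit breaking and degrade based on the statistics and the preset rules.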
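Sentinel exposes ProcessorSlot as an SPI extension point (before version 1.7.2 the SPI was SlotChainBuilder), which makes the slot chain extensible. We can add custom slots and arrange the order of the slots, and in this way add custom behavior to Sentinel. For example, if I do not want Sentinel to perform flow control, I can plug in a custom chain builder like this: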
public class SentinelChainBuilder implements SlotChainBuilder {

    @Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        chain.addLast(new NodeSelectorSlot());
        chain.addLast(new ClusterBuilderSlot());
        chain.addLast(new LogSlot());
        chain.addLast(new StatisticSlot());
        chain.addLast(new SystemSlot());
        chain.addLast(new AuthoritySlot());
        // FlowSlot is intentionally left out, so no flow control is applied.
        chain.addLast(new DegradeSlot());
        return chain;
    }
}
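Then register the builder through Java SPI by adding a provider file under META-INF: a file in META-INF/services named after the fully qualified name of the SlotChainBuilder interface, whose content is the fully qualified name of the custom builder class.
[Figure: the SPI provider file under META-INF]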
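2. Sentinel core concepts
2.1 ProcessorSlot (SlotChainBuilder)
The slot chain is the backbone of Sentinel. It strings the slots together in order (chain of responsibility pattern) and in this way combines the individual features (flow control, degrade, system protection). The slot chain can be split into two parts: the part that builds statistics (statistic) and the part that makes decisions (rule checking).
[Figure: core structure of the slot chain]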
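To make the chain-of-responsibility idea concrete, here is a minimal, library-free sketch. The names `MiniSlot`, `MiniChain`, `MiniStats` and `MiniSlots` are hypothetical and are not Sentinel classes, and the blocking rule (a fixed cap on passed calls) is an assumption chosen to keep the example short. It only illustrates how a statistics slot can wrap a rule-checking slot: the statistics slot fires the rest of the chain first and then records whether the call passed or was blocked.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for illustration only; these are not Sentinel classes.
class MiniStats {
    int passed;   // calls that went through the whole chain
    int blocked;  // calls rejected by a rule-checking slot
}

interface MiniSlot {
    // Call next.run() to continue the chain, or throw to block the call.
    void entry(MiniStats stats, Runnable next);
}

class MiniChain {
    private final List<MiniSlot> slots = new ArrayList<>();

    MiniChain addLast(MiniSlot slot) {
        slots.add(slot);
        return this;
    }

    // Returns true if every slot let the call through, false if one blocked it.
    boolean entry(MiniStats stats) {
        try {
            fire(0, stats);
            return true;
        } catch (IllegalStateException blocked) {
            return false;
        }
    }

    private void fire(int index, MiniStats stats) {
        if (index == slots.size()) {
            return; // end of the chain
        }
        slots.get(index).entry(stats, () -> fire(index + 1, stats));
    }
}

class MiniSlots {
    // Statistics part: run the downstream slots first, then record the outcome.
    static MiniSlot statistic() {
        return (stats, next) -> {
            try {
                next.run();
                stats.passed++;
            } catch (IllegalStateException e) {
                stats.blocked++;
                throw e;
            }
        };
    }

    // Rule-checking part: block once maxPassed calls have already passed.
    static MiniSlot limit(int maxPassed) {
        return (stats, next) -> {
            if (stats.passed >= maxPassed) {
                throw new IllegalStateException("blocked");
            }
            next.run();
        };
    }
}
```

With `new MiniChain().addLast(MiniSlots.statistic()).addLast(MiniSlots.limit(2))`, the first two calls to `entry` pass and the third is blocked, and the statistics slot has counted both outcomes.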
2.2 Resource
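Resource is the most important concept in Sentinel. Sentinel uses resources to protect specific business code or backend services. What we actually configure in the dashboard is a set of resources. A resource can also be defined with the @SentinelResource annotation.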
2.3 Context
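Context represents the context of a call chain and spans every Entry in that call chain. A Context holds the entrance node (entranceNode), the curNode of the current call chain, the call origin (origin) and similar information. The name of a Context is the name of the call-chain entrance.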
2.4 Entry
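Each resource invocation creates an Entry. An Entry holds the resource name, curNode (the current statistics node), originNode (the statistics node of the origin) and similar information.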
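The relationship between a Context and its entries can be sketched without the library. The classes below (`MiniContext`, `MiniEntry`, `MiniContextHolder`) are hypothetical stand-ins, not Sentinel's own Context or Entry, and they leave out the statistics nodes entirely. The sketch assumes one context per thread and shows only that nested entries share the context and that exiting an entry restores its parent as the current entry.

```java
// Hypothetical stand-ins for illustration only; these are not Sentinel classes.
class MiniContext {
    final String name;    // name of the call-chain entrance
    final String origin;  // caller identity, may be empty
    MiniEntry curEntry;   // innermost entry that has not exited yet

    MiniContext(String name, String origin) {
        this.name = name;
        this.origin = origin;
    }
}

class MiniEntry {
    final String resource;
    final MiniEntry parent;
    private final MiniContext context;

    MiniEntry(String resource, MiniContext context) {
        this.resource = resource;
        this.context = context;
        this.parent = context.curEntry;
        context.curEntry = this; // the new entry becomes the current one
    }

    void exit() {
        context.curEntry = parent; // hand control back to the enclosing entry
    }
}

class MiniContextHolder {
    // One context per thread, created on first use.
    private static final ThreadLocal<MiniContext> HOLDER = new ThreadLocal<>();

    static MiniContext enter(String name, String origin) {
        MiniContext ctx = HOLDER.get();
        if (ctx == null) {
            ctx = new MiniContext(name, origin);
            HOLDER.set(ctx);
        }
        return ctx;
    }

    static void exit() {
        HOLDER.remove();
    }
}
```

Entering twice on the same thread returns the same context until `MiniContextHolder.exit()` is called, which mirrors the idea that one Context runs through the whole call chain.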
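2.5 Main call flow
3. How flow control is implemented
For the basics of Sentinel flow control, see the official documentation; they are not repeated here. Link: https://github.com/alibaba/Sentinel/wiki/%E6%B5%81%E9%87%8F%E6%8E%A7%E5%88%B6
3.1 Sentinel call chain analysis
For flow control on HTTP requests, once the dependency has been added we do not need to write any new code; adding a flow rule in the Sentinel dashboard is enough. This works because, in Sentinel's sentinel-web-servlet module, the CommonFilter class implements the javax.servlet.Filter interface and intercepts every request before it is processed. The code is as follows: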
public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
        throws IOException, ServletException {
    HttpServletRequest sRequest = (HttpServletRequest)request;
    Entry urlEntry = null;

    try {
        String target = FilterUtil.filterTarget(sRequest);
        // Clean and unify the URL.
        // For REST APIs, you have to clean the URL (e.g. `/foo/1` and `/foo/2` -> `/foo/:id`), or
        // the amount of context and resources will exceed the threshold.
        UrlCleaner urlCleaner = WebCallbackManager.getUrlCleaner();
        if (urlCleaner != null) {
            target = urlCleaner.clean(target);
        }

        // If you intend to exclude some URLs, you can convert the URLs to the empty string ""
        // in the UrlCleaner implementation.
        if (!StringUtil.isEmpty(target)) {
            // Parse the request origin using registered origin parser.
            String origin = parseOrigin(sRequest);
            ContextUtil.enter(WebServletConfig.WEB_SERVLET_CONTEXT_NAME, origin);

            if (httpMethodSpecify) {
                // Add HTTP method prefix if necessary.
                String pathWithHttpMethod = sRequest.getMethod().toUpperCase() + COLON + target;
                urlEntry = SphU.entry(pathWithHttpMethod, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
            } else {
                // Core entry point into Sentinel flow control.
                urlEntry = SphU.entry(target, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
            }
        }
        chain.doFilter(request, response);
    } catch (BlockException e) {
        HttpServletResponse sResponse = (HttpServletResponse)response;
        // Return the block page, or redirect to another URL.
        WebCallbackManager.getUrlBlockHandler().blocked(sRequest, sResponse, e);
    } catch (IOException | ServletException | RuntimeException e2) {
        Tracer.traceEntry(e2, urlEntry);
        throw e2;
    } finally {
        if (urlEntry != null) {
            urlEntry.exit();
        }
        ContextUtil.exit();
    }
}
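The comments in the filter mention cleaning REST URLs so that `/foo/1` and `/foo/2` map to one resource, and returning an empty string for URLs that should be excluded. The following library-free sketch shows one way such cleaning logic could look. `PathCleaner` is a hypothetical helper, not part of Sentinel; the `:id` placeholder for numeric segments and the `/static/` prefix used for exclusion are assumptions made for the example. In a real application this logic would sit inside an implementation of the UrlCleaner interface used above.

```java
import java.util.regex.Pattern;

// Hypothetical helper for illustration only; not part of Sentinel.
class PathCleaner {
    // Matches a path segment made only of digits, e.g. the "/1" in "/foo/1".
    private static final Pattern NUMERIC_SEGMENT = Pattern.compile("/\\d+(?=/|$)");

    // Collapses numeric segments to ":id". Returns "" for paths that should
    // not be treated as resources (assumed here: anything under /static/).
    static String clean(String path) {
        if (path == null || path.startsWith("/static/")) {
            return "";
        }
        return NUMERIC_SEGMENT.matcher(path).replaceAll("/:id");
    }
}
```

With this helper, `/foo/1` and `/foo/2` both become `/foo/:id`, so they share one resource name and one set of statistics instead of creating a new resource per ID.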
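Here the request path is passed down as the resource name. The key step is the call to SphU.entry (flow control in Sentinel is mainly entered through SphU.entry and SphO.entry; when a request is blocked, the former throws a BlockException and the latter returns false). Looking at CtSph, the method that ties Sentinel's core components together is the following: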
// On this path count is 1, prioritized is false, and args is an empty array.
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
        throws BlockException {
    // Get the context of the current thread.
    Context context = ContextUtil.getContext();
    if (context instanceof NullContext) {
        // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
        // so here init the entry only. No rule checking will be done.
        return new CtEntry(resourceWrapper, null, context);
    }

    // If there is no context yet, create a default one.
    if (context == null) {
        // Using default context.
        context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType());
    }

    // Global switch is close, no rule checking will do.
    // Constants.ON is normally true, so this branch is usually skipped.
    if (!Constants.ON) {
        return new CtEntry(resourceWrapper, null, context);
    }

    // Look up (or build) the slot chain for this resource.
    ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

    /*
     * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
     * so no rule checking will be done.
     */
    // lookProcessChain returns null once the maximum number of slot chains is exceeded.
    if (chain == null) {
        return new CtEntry(resourceWrapper, null, context);
    }

    // Create the entry.
    Entry e = new CtEntry(resourceWrapper, chain, context);
    try {
        // Run the chain of responsibility.
        chain.entry(context, resourceWrapper, null, count, prioritized, args);
    } catch (BlockException e1) {
        e.exit(count, args);
        throw e1;
    } catch (Throwable e1) {
        // This should not happen, unless there are errors existing in Sentinel internal.
        RecordLog.info("Sentinel unexpected exception", e1);
    }
    return e;
}
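3.2 How flow control works
The code above calls ProcessorSlot.entry and so enters the slot chain. The slot that checks flow rules is FlowSlot, which calls FlowRuleChecker.checkFlow to perform the actual flow control check. The code is as follows: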
public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                      Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
    if (ruleProvider == null || resource == null) {
        return;
    }
    // Look up the rules registered in FlowRuleManager for this resource.
    Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
    if (rules != null) {
        for (FlowRule rule : rules) {
            // If the current request cannot pass, throw a FlowException.
            if (!canPassCheck(rule, context, node, count, prioritized)) {
                throw new FlowException(rule.getLimitApp(), rule);
            }
        }
    }
}

// The ruleProvider passed to checkFlow above is defined as this Function.
private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
    @Override
    public Collection<FlowRule> apply(String resource) {
        // Flow rule map should not be null.
        Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
        return flowRules.get(resource);
    }
};

// The method that actually performs the flow check.
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
                            boolean prioritized) {
    // If limitApp is not set, skip the check. Rules created in the dashboard get "default".
    String limitApp = rule.getLimitApp();
    if (limitApp == null) {
        return true;
    }

    // Cluster mode
    if (rule.isClusterMode()) {
        return passClusterCheck(rule, context, node, acquireCount, prioritized);
    }

    // Local mode
    return passLocalCheck(rule, context, node, acquireCount, prioritized);
}
The code above uses rule.isClusterMode to decide between cluster flow control and local flow control. Below we only look at the local path:
private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                      boolean prioritized) {
    // Select the node whose statistics the rule is checked against.
    Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
    if (selectedNode == null) {
        return true;
    }

    // Apply the configured control behavior.
    return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}
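selectNodeByRequesterAndStrategy selects the node whose statistics the rule is checked against, and rule.getRater returns the controller for the configured control behavior (fast fail, warm up, or uniform-rate queueing). The code of selectNodeByRequesterAndStrategy is as follows: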
static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
    // The origin (caller) this rule targets, as configured in the dashboard.
    String limitApp = rule.getLimitApp();
    // The flow control strategy (direct, relate or chain).
    int strategy = rule.getStrategy();
    // The origin of the current request.
    String origin = context.getOrigin();

    // Case 1: origin is neither `default` nor `other`, and limitApp equals origin.
    if (limitApp.equals(origin) && filterOrigin(origin)) {
        if (strategy == RuleConstant.STRATEGY_DIRECT) {
            // Matches limit origin, return origin statistic node.
            return context.getOriginNode();
        }
        // Strategy is relate or chain.
        return selectReferenceNode(rule, context, node);
    } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {
        // Case 2: the rule applies to all origins.
        if (strategy == RuleConstant.STRATEGY_DIRECT) {
            // Return the cluster node, which aggregates all requests to this resource from all origins.
            return node.getClusterNode();
        }
        // Strategy is relate or chain.
        return selectReferenceNode(rule, context, node);
    } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
        && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {
        // Case 3: the rule applies to origins not covered by any other rule.
        if (strategy == RuleConstant.STRATEGY_DIRECT) {
            return context.getOriginNode();
        }
        // Strategy is relate or chain.
        return selectReferenceNode(rule, context, node);
    }

    return null;
}
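In this method, limitApp is the origin (caller) that the rule targets, strategy is the flow control strategy (direct, relate or chain), and origin is the origin of the current call. Finally, let's look at the code for the default fast-fail behavior: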
public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    // Depending on the rule grade, read either the current QPS or the current thread count.
    int curCount = avgUsedTokens(node);
    // If current usage plus the requested amount exceeds the threshold (count)...
    if (curCount + acquireCount > count) {
        // For a prioritized request under a QPS rule, do not fail immediately;
        // try to occupy capacity in a future window instead.
        if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
            long currentTime;
            long waitInMs;
            currentTime = TimeUtil.currentTimeMillis();
            waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
            if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                node.addOccupiedPass(acquireCount);
                sleep(waitInMs);

                // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                throw new PriorityWaitException(waitInMs);
            }
        }
        return false;
    }
    return true;
}
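The heart of the fast-fail check is the comparison `curCount + acquireCount > count`. The sketch below isolates that comparison in a small, library-free class. `FastFailLimiter` is hypothetical and simplifies two things on purpose: it counts requests in a fixed one-second window rather than a sliding window, and it updates the counter inside `canPass`, whereas in the code above the controller only reads statistics that are recorded elsewhere in the slot chain. The prioritized path is left out.

```java
// Hypothetical fixed-window limiter for illustration only; not Sentinel code.
class FastFailLimiter {
    private final double threshold; // plays the role of `count` in the rule
    private long windowStartMs;     // start of the current one-second window
    private int passedInWindow;     // requests admitted in the current window

    FastFailLimiter(double threshold) {
        this.threshold = threshold;
    }

    // nowMs is passed in explicitly so the behavior is easy to reason about.
    synchronized boolean canPass(long nowMs, int acquireCount) {
        if (nowMs - windowStartMs >= 1000) {
            // A new window has started: align its start and reset the counter.
            windowStartMs = nowMs - (nowMs % 1000);
            passedInWindow = 0;
        }
        // Same shape as the check above: reject if usage would exceed the threshold.
        if (passedInWindow + acquireCount > threshold) {
            return false;
        }
        passedInWindow += acquireCount;
        return true;
    }
}
```

With a threshold of 2, two requests in the same second pass, a third one in that second is rejected immediately, and a request in the next second passes again.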
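4. How circuit breaking is triggered and implemented
4.1 Trigger conditions
4.1.1 Degrade by RT
Now for the key point. Take an RT degrade rule: I configured a threshold of 1 ms, while the actual RT of the endpoint is far above 1 ms. Yet when I tested the endpoint, the degrade handler was not invoked. Only when I hit the endpoint rapidly many times did the degrade code run. The documentation on Sentinel's GitHub describes the RT degrade strategy as follows:
[Figure: excerpt from the official documentation on RT-based degrade]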
So the rule has to be hit several times in a row before the circuit breaks.
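The behavior can be reproduced with a small, library-free sketch that follows the RT branch of the passCheck method shown in section 4.2.2: a fast response resets the counter, and the circuit opens only when the number of consecutive slow checks reaches rtSlowRequestAmount (5 by default according to the comment in that code). `RtBreakerSketch` is a hypothetical class, and the timed reset of the circuit is left out.

```java
// Hypothetical sketch of the RT branch for illustration only; not Sentinel code.
class RtBreakerSketch {
    private final double rtThresholdMs;  // plays the role of `count` in the rule
    private final int slowRequestAmount; // plays the role of rtSlowRequestAmount
    private int consecutiveSlow;
    private boolean open;

    RtBreakerSketch(double rtThresholdMs, int slowRequestAmount) {
        this.rtThresholdMs = rtThresholdMs;
        this.slowRequestAmount = slowRequestAmount;
    }

    // avgRtMs stands for the average RT read from the statistics node.
    boolean passCheck(double avgRtMs) {
        if (open) {
            return false;
        }
        if (avgRtMs < rtThresholdMs) {
            consecutiveSlow = 0; // a fast response resets the streak
            return true;
        }
        if (++consecutiveSlow < slowRequestAmount) {
            return true;         // slow, but not enough times in a row yet
        }
        open = true;             // streak reached the limit: open the circuit
        return false;
    }
}
```

With a 1 ms threshold and a limit of 5, four slow checks in a row still pass and the fifth opens the circuit, which matches what I saw when testing: a single slow call is not enough.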
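4.1.2 Degrade by exceptions
For degrade by exception count, note that the statistics window is one minute. The official documentation says:
[Figure: excerpt from the official documentation on exception-based degrade]
Degrade by exception ratio also has a minimum request count.
Because the window for exception-count degrade is one minute, when exceptions arrive in a burst and the break duration is shorter than one minute (for example 10 s), the exception count over the preceding minute may still exceed the maximum once the break ends, and the circuit breaks again.
The Sentinel documentation on circuit breaking and degrade is linked below: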
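The repeated break can be shown with a small, library-free simulation. `MinuteExceptionBreaker` is a hypothetical class that keeps exception timestamps for the last 60 seconds and opens the circuit for a configurable duration when the count reaches the limit; the timestamps and limits used in the usage note are made up for the example.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical simulation for illustration only; not Sentinel code.
class MinuteExceptionBreaker {
    private static final long WINDOW_MS = 60_000;

    private final int maxExceptions; // exception-count threshold
    private final long breakMs;      // how long the circuit stays open
    private final Deque<Long> exceptionTimes = new ArrayDeque<>();
    private long openUntilMs = -1;

    MinuteExceptionBreaker(int maxExceptions, long breakMs) {
        this.maxExceptions = maxExceptions;
        this.breakMs = breakMs;
    }

    void recordException(long nowMs) {
        exceptionTimes.addLast(nowMs);
    }

    boolean passCheck(long nowMs) {
        if (nowMs < openUntilMs) {
            return false; // circuit is still open
        }
        // Drop exceptions that have fallen out of the one-minute window.
        while (!exceptionTimes.isEmpty() && nowMs - exceptionTimes.peekFirst() >= WINDOW_MS) {
            exceptionTimes.removeFirst();
        }
        if (exceptionTimes.size() < maxExceptions) {
            return true;
        }
        openUntilMs = nowMs + breakMs; // break (again) for breakMs
        return false;
    }
}
```

Suppose the limit is 5, the break lasts 10 s, and five exceptions occur within the first 5 seconds. The check at 5 s opens the circuit until 15 s. At 16 s the break is over, but the five exceptions are still inside the one-minute window, so the circuit opens again. Only after the old exceptions age out of the window does a check pass.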
https://github.com/alibaba/Sentinel/wiki/%E7%86%94%E6%96%AD%E9%99%8D%E7%BA%A7
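4.2 Circuit breaking source analysis
4.2.1 Call chain
To control what an endpoint returns once the circuit has broken, we usually use the @SentinelResource annotation and specify a blockHandler method that produces the response. The Sentinel source shows that the aspect for @SentinelResource looks like this: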
@Around("sentinelResourceAnnotationPointcut()")
public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
    Method originMethod = resolveMethod(pjp);

    SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
    if (annotation == null) {
        // Should not go through here.
        throw new IllegalStateException("Wrong state for SentinelResource annotation");
    }
    String resourceName = getResourceName(annotation.value(), originMethod);
    EntryType entryType = annotation.entryType();
    int resourceType = annotation.resourceType();
    Entry entry = null;
    try {
        // Enter Sentinel's rule checking.
        entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
        Object result = pjp.proceed();
        return result;
    } catch (BlockException ex) {
        return handleBlockException(pjp, annotation, ex);
    } catch (Throwable ex) {
        Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
        // The ignore list will be checked first.
        if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
            throw ex;
        }
        if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
            traceException(ex, annotation);
            return handleFallback(pjp, annotation, ex);
        }

        // No fallback function can handle the exception, so throw it out.
        throw ex;
    } finally {
        // Statistics are recorded on exit.
        if (entry != null) {
            entry.exit(1, pjp.getArgs());
        }
    }
}
In this aspect, SphU.entry(resourceName, resourceType, entryType, pjp.getArgs()) is called before the real method is invoked. As analyzed earlier, this calls ProcessorSlot.entry and enters the slot chain. The slot that checks degrade rules is DegradeSlot, which calls DegradeRuleManager.checkDegrade to perform the actual degrade check. The code is as follows:
public static void checkDegrade(ResourceWrapper resource, Context context, DefaultNode node, int count)
        throws BlockException {
    // Look up the degrade rules for this resource.
    Set<DegradeRule> rules = degradeRules.get(resource.getName());
    if (rules == null) {
        return;
    }

    for (DegradeRule rule : rules) {
        if (!rule.passCheck(context, node, count)) {
            throw new DegradeException(rule.getLimitApp(), rule);
        }
    }
}
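degradeRules.get looks up the list of degrade rules for the resource by its name, and DegradeRule.passCheck performs the actual degrade check.
4.2.2 Circuit breaking logic
The relevant code is as follows: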
public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) {
    // The circuit is already open: reject directly.
    if (cut.get()) {
        return false;
    }
    // Degrade decisions are based on the resource's ClusterNode (global statistics).
    ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(this.getResource());
    if (clusterNode == null) {
        return true;
    }
    // Degrade by response time.
    if (grade == RuleConstant.DEGRADE_GRADE_RT) {
        // Average response time of the node.
        double rt = clusterNode.avgRt();
        if (rt < this.count) {
            passCount.set(0);
            return true;
        }
        // rtSlowRequestAmount defaults to 5.
        // Sentinel will degrade the service only if count exceeds.
        if (passCount.incrementAndGet() < rtSlowRequestAmount) {
            return true;
        }
    // Degrade by exception ratio.
    } else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) {
        // Exceptions per second.
        double exception = clusterNode.exceptionQps();
        // Completed calls per second.
        double success = clusterNode.successQps();
        // Total calls per second.
        double total = clusterNode.totalQps();
        // If total amount is less than minRequestAmount, the request will pass.
        // (With minRequestAmount at 5, fewer than 5 calls never trigger degrade.)
        if (total < minRequestAmount) {
            return true;
        }

        // In the same aligned statistic time window,
        // "success" (aka. completed count) = exception count + non-exception count (realSuccess)
        double realSuccess = success - exception;
        if (realSuccess <= 0 && exception < minRequestAmount) {
            return true;
        }

        if (exception / success < count) {
            return true;
        }
    // Degrade by exception count.
    } else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) {
        double exception = clusterNode.totalException();
        if (exception < count) {
            return true;
        }
    }
    // Open the circuit and schedule a reset after timeWindow seconds.
    if (cut.compareAndSet(false, true)) {
        ResetTask resetTask = new ResetTask(this);
        pool.schedule(resetTask, timeWindow, TimeUnit.SECONDS);
    }

    return false;
}
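The exception-ratio branch is easier to follow with numbers. The sketch below restates that branch as a pure function so the arithmetic can be checked by hand. `ExceptionRatioSketch` and its parameter names are hypothetical; the logic mirrors the three early returns in the code above and nothing else.

```java
// Hypothetical restatement of the exception-ratio branch; not Sentinel code.
class ExceptionRatioSketch {
    // Returns true when the rule would open the circuit.
    static boolean shouldTrip(double exceptionQps, double successQps, double totalQps,
                              double ratioThreshold, int minRequestAmount) {
        // Too few requests overall: never trip.
        if (totalQps < minRequestAmount) {
            return false;
        }
        // "success" counts all completed calls, including those that threw.
        double realSuccess = successQps - exceptionQps;
        if (realSuccess <= 0 && exceptionQps < minRequestAmount) {
            return false;
        }
        // Trip when the share of completed calls that threw reaches the threshold.
        return exceptionQps / successQps >= ratioThreshold;
    }
}
```

For example, with a ratio threshold of 0.5 and a minimum of 5 requests: 4 exceptions out of 10 completed calls give 0.4 and do not trip, 6 out of 10 give 0.6 and do trip, and 3 exceptions out of 3 calls do not trip because the total is below the minimum.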
4.2.3 Recording statistics
After the resource call ends, com.alibaba.csp.sentinel.Entry.exit is invoked, which eventually calls StatisticNode.addRtAndSuccess to record the success count and the RT:
@Override
public void addRtAndSuccess(long rt, int successCount) {
    // Per-second rolling window.
    rollingCounterInSecond.addSuccess(successCount);
    rollingCounterInSecond.addRT(rt);

    // Per-minute rolling window.
    rollingCounterInMinute.addSuccess(successCount);
    rollingCounterInMinute.addRT(rt);
}
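The method writes every sample into two rolling counters with different horizons. A minimal, library-free sketch of such a counter is shown below. `RollingCounter` is a hypothetical class, and the bucket layouts used in the usage note (2 buckets over 1 second, 60 buckets over 1 minute) are assumptions chosen for the example rather than values taken from the code above. Each bucket remembers the start of the time slice it currently holds, and stale buckets are reset when they are reused.

```java
import java.util.Arrays;

// Hypothetical bucketed rolling counter for illustration only; not Sentinel code.
class RollingCounter {
    private final long bucketMs;  // length of one bucket
    private final long[] starts;  // start time of the slice held by each bucket
    private final long[] counts;  // value accumulated in each bucket

    RollingCounter(int buckets, long intervalMs) {
        this.bucketMs = intervalMs / buckets;
        this.starts = new long[buckets];
        this.counts = new long[buckets];
        Arrays.fill(starts, -1); // -1 marks a bucket that was never used
    }

    void add(long nowMs, long n) {
        int idx = (int) ((nowMs / bucketMs) % counts.length);
        long start = nowMs - nowMs % bucketMs;
        if (starts[idx] != start) {
            // The bucket holds an older slice: recycle it for the current one.
            starts[idx] = start;
            counts[idx] = 0;
        }
        counts[idx] += n;
    }

    // Sum of all buckets whose slice started within the last full interval.
    long sum(long nowMs) {
        long total = 0;
        long intervalMs = bucketMs * counts.length;
        for (int i = 0; i < counts.length; i++) {
            if (starts[i] >= 0 && nowMs - starts[i] < intervalMs) {
                total += counts[i];
            }
        }
        return total;
    }
}
```

If the same two samples (at 0 ms and 600 ms) are added to a one-second counter and a one-minute counter, the one-second counter has forgotten the first sample by 1400 ms while the one-minute counter still reports both. This is why a rule evaluated on the minute window, such as the exception-count rule, reacts to events long after a rule evaluated on the second window has moved on.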