Sentinel限流與熔斷分析


一、概述

在 Sentinel 里面,所有的資源都對應一個資源名稱(resourceName),每次資源調用都會創建一個 Entry 對象。Entry 可以通過對主流框架的適配自動創建,也可以通過注解的方式或調用 SphU API 顯式創建。Entry 創建的時候,同時也會創建一系列功能插槽(slot chain),這些插槽有不同的職責,例如:

  • NodeSelectorSlot 負責收集資源的路徑,並將這些資源的調用路徑,以樹狀結構存儲起來,用於根據調用路徑來限流降級;
  • ClusterBuilderSlot 則用於存儲資源的統計信息以及調用者信息,例如該資源的 RT, QPS, thread count 等等,這些信息將用作為多維度限流,降級的依據;
  • LogSlot 則用於打印異常日志;
  • StatisticSlot 則用於記錄、統計不同緯度的 runtime 指標監控信息;
  • SystemSlot 則通過系統的狀態,例如 load1 等,來控制總的入口流量;
  • AuthoritySlot 則根據配置的黑白名單和調用來源信息,來做黑白名單控制
  • FlowSlot 則用於根據預設的限流規則以及前面 slot 統計的狀態,來進行流量控制
  • DegradeSlot 則通過統計信息以及預設的規則,來做熔斷降級;

Sentinel 將 ProcessorSlot 作為 SPI 接口進行擴展(1.7.2 版本以前 SlotChainBuilder 作為 SPI),使得 Slot Chain 具備了擴展的能力。我們可以自行加入自定義的 slot 並編排 slot 間的順序,從而可以給 Sentinel 添加自定義的功能。比如我不希望sentinel具有限流功能,可以引入自定義的ChainBuilder代碼如下:

public class SentinelChainBuilder implements SlotChainBuilder {

    @Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();
        chain.addLast(new NodeSelectorSlot());
        chain.addLast(new ClusterBuilderSlot());
        chain.addLast(new LogSlot());
        chain.addLast(new StatisticSlot());
        chain.addLast(new SystemSlot());
        chain.addLast(new AuthoritySlot());
        chain.addLast(new DegradeSlot());

        return chain;
    }
}

然后在MATE-INF中引入加入下圖文件即可

image.png

二、sentinel核心概念

2.1 ProcessorSlot(SlotChainBuilder)

Sentinel 的核心骨架,將不同的 Slot 按照順序串在一起(責任鏈模式),從而將不同的功能(限流、降級、系統保護)組合在一起。slot chain 其實可以分為兩部分:統計數據構建部分(statistic)和判斷部分(rule checking)。核心結構如下圖:

image.png

2.2 Resource

resource是sentinel中最重要的一個概念,sentinel通過資源來保護具體的業務代碼或其他后方服務。我們再控制台上實際配置的都是一個個資源。也可以使用@SentinelResource注解來自定義資源。

2.3 Context

Context 代表調用鏈路上下文,貫穿一次調用鏈路中的所有 Entry。Context 維持着入口節點(entranceNode)、本次調用鏈路的 curNode、調用來源(origin)等信息。Context 名稱即為調用鏈路入口名稱。

2.4 Entry

每一次資源調用都會創建一個 EntryEntry 包含了資源名、curNode(當前統計節點)、originNode(來源統計節點)等信息。

2.5 主流程調用鏈路

image.png

 

三、限流的實現

關於sentinel限流的一些基本常識,可參見官方文檔,此處不多做贅述,附上鏈接如下:https://github.com/alibaba/Sentinel/wiki/%E6%B5%81%E9%87%8F%E6%8E%A7%E5%88%B6

3.1sentinel鏈路分析

對於sentinel中的http請求限流,我們再使用過程中,引入依賴包后,不需要新增任何代碼,只需要在sentinel的配置面板中新增限流規則即可。其具體實現,在於sentinel的sentinel-web-serverlet包中,CommonFilter類實現類javax.servlet.Filter接口,在請求處理之前做了一次攔截。具體代碼如下:

public void doFilter(ServletRequest request, ServletResponse response, FilterChain chain)
    throws IOException, ServletException {
    HttpServletRequest sRequest = (HttpServletRequest)request;
    Entry urlEntry = null;
    try {
        String target = FilterUtil.filterTarget(sRequest);
        // Clean and unify the URL.
        // For REST APIs, you have to clean the URL (e.g. `/foo/1` and `/foo/2` -> `/foo/:id`), or
        // the amount of context and resources will exceed the threshold.
        UrlCleaner urlCleaner = WebCallbackManager.getUrlCleaner();
        if (urlCleaner != null) {
            target = urlCleaner.clean(target);
        }
        // If you intend to exclude some URLs, you can convert the URLs to the empty string ""
        // in the UrlCleaner implementation.
        if (!StringUtil.isEmpty(target)) {
            // Parse the request origin using registered origin parser.
            String origin = parseOrigin(sRequest);
            ContextUtil.enter(WebServletConfig.WEB_SERVLET_CONTEXT_NAME, origin);
            if (httpMethodSpecify) {
                // Add HTTP method prefix if necessary.
                String pathWithHttpMethod = sRequest.getMethod().toUpperCase() + COLON + target;
                urlEntry = SphU.entry(pathWithHttpMethod, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
            } else {
                // 進入sentinel流控的核心方法
                urlEntry = SphU.entry(target, ResourceTypeConstants.COMMON_WEB, EntryType.IN);
            }
        }
        chain.doFilter(request, response);
    } catch (BlockException e) {
        HttpServletResponse sResponse = (HttpServletResponse)response;
        // Return the block page, or redirect to another URL.
        WebCallbackManager.getUrlBlockHandler().blocked(sRequest, sResponse, e);
    } catch (IOException | ServletException | RuntimeException e2) {
        Tracer.traceEntry(e2, urlEntry);
        throw e2;
    } finally {
        if (urlEntry != null) {
            urlEntry.exit();
        }
        ContextUtil.exit();
    }
}

此處把請求路徑作為資源點下傳。其中實際核心代碼是在圈出處進入SphU中(sentienel中限流主要通過SphU.entry與SphO.entry作為入口,前者限流拋出BlockException,后者限流返回false)。查看CtSph方法,可見實際sentinel各核心組件串聯方法如下:

//這里傳入得參數count是1,prioritized=false,args是容量為1的空數組
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
        throws BlockException {
    //獲取當前線程的上下文
    Context context = ContextUtil.getContext();
    if (context instanceof NullContext) {
        // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
        // so here init the entry only. No rule checking will be done.
        return new CtEntry(resourceWrapper, null, context);
    }
    //為空的話,創建一個默認的context
    if (context == null) { //1
        // Using default context.
        context = MyContextUtil.myEnter(Constants.CONTEXT_DEFAULT_NAME, "", resourceWrapper.getType());
    }

    // Global switch is close, no rule checking will do.
    if (!Constants.ON) {//這里會返回false
        return new CtEntry(resourceWrapper, null, context);
    }

    //創建一系列功能插槽
    ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper);

    /*
     * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
     * so no rule checking will be done.
     */
    //如果超過了插槽的最大數量,那么會返回null
    if (chain == null) {
        return new CtEntry(resourceWrapper, null, context);
    }

    // 獲取entry
    Entry e = new CtEntry(resourceWrapper, chain, context);
    try {
        //調用責任鏈
        chain.entry(context, resourceWrapper, null, count, prioritized, args);
    } catch (BlockException e1) {
        e.exit(count, args);
        throw e1;
    } catch (Throwable e1) {
        // This should not happen, unless there are errors existing in Sentinel internal.
        RecordLog.info("Sentinel unexpected exception", e1);
    }
    return e;
}

3.2限流原理分析

上圖中已經會調用ProcessorSlot.entry進入一系列功能插槽(slot chain)中,其中限流規則檢查是FlowSlot,FlowSlot中調用FlowRuleChecker.checkFlow方法,進行實質的降級檢查,代碼如下:

public void checkFlow(Function<String, Collection<FlowRule>> ruleProvider, ResourceWrapper resource,
                      Context context, DefaultNode node, int count, boolean prioritized) throws BlockException {
    if (ruleProvider == null || resource == null) {
        return;
    }
    //返回FlowRuleManager里面注冊的所有規則
    Collection<FlowRule> rules = ruleProvider.apply(resource.getName());
    if (rules != null) {
        for (FlowRule rule : rules) {
            //如果當前的請求不能通過,那么就拋出FlowException異常
            if (!canPassCheck(rule, context, node, count, prioritized)) {
                throw new FlowException(rule.getLimitApp(), rule);
            }
        }
    }
}

// 定義一個Function,此處的ruleProvider即是上文中的ruleProvider
private final Function<String, Collection<FlowRule>> ruleProvider = new Function<String, Collection<FlowRule>>() {
    @Override
    public Collection<FlowRule> apply(String resource) {
        // Flow rule map should not be null.
        Map<String, List<FlowRule>> flowRules = FlowRuleManager.getFlowRuleMap();
        return flowRules.get(resource);
    }
};

// 真實進行限流校驗的方法
public boolean canPassCheck(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                         boolean prioritized) {
    //如果沒有設置limitapp,那么不進行校驗,控制台配置時,默認會給個defualt
    String limitApp = rule.getLimitApp();
    if (limitApp == null) {
        return true;
    }
    //集群模式
    if (rule.isClusterMode()) {
        return passClusterCheck(rule, context, node, acquireCount, prioritized);
    }
    //本地模式
    return passLocalCheck(rule, context, node, acquireCount, prioritized);
}

上述代碼中會通過rule.isClusterMode方法判斷是分布式限流還是本地限流,下面我們只看本地限流的代碼

private static boolean passLocalCheck(FlowRule rule, Context context, DefaultNode node, int acquireCount,
                                      boolean prioritized) {
    //節點選擇
    Node selectedNode = selectNodeByRequesterAndStrategy(rule, context, node);
    if (selectedNode == null) {
        return true;
    }
    //根據設置的規則來攔截
    return rule.getRater().canPass(selectedNode, acquireCount, prioritized);
}

其中selectNodeByRequesterAndStrategy方法用於選擇統計數據的載體用戶,rule.getRater用於選擇流控的方式(快速失敗,預熱,令牌桶)。selectNodeByRequesterAndStrategy代碼如下:

static Node selectNodeByRequesterAndStrategy(/*@NonNull*/ FlowRule rule, Context context, DefaultNode node) {
    // 控制台中配置的限流針對的來源
    String limitApp = rule.getLimitApp();
    // 關系限流策略
    int strategy = rule.getStrategy();
    // 請求的來源
    String origin = context.getOrigin();
    //origin不為`default` or `other`,並且limitApp和origin相等
    if (limitApp.equals(origin) && filterOrigin(origin)) {//1
        if (strategy == RuleConstant.STRATEGY_DIRECT) {
            // Matches limit origin, return origin statistic node.
            return context.getOriginNode();
        }
        //關系限流策略為關聯或者鏈路的處理
        return selectReferenceNode(rule, context, node);
    } else if (RuleConstant.LIMIT_APP_DEFAULT.equals(limitApp)) {//2
        if (strategy == RuleConstant.STRATEGY_DIRECT) {
            //這里返回ClusterNode,表示所有應用對該資源的所有請求情況
            // Return the cluster node.
            return node.getClusterNode();
        }
        //關系限流策略為關聯或者鏈路的處理
        return selectReferenceNode(rule, context, node);
    } else if (RuleConstant.LIMIT_APP_OTHER.equals(limitApp)
        && FlowRuleManager.isOtherOrigin(origin, rule.getResource())) {//3
        if (strategy == RuleConstant.STRATEGY_DIRECT) {
            return context.getOriginNode();
        }
        //關系限流策略為關聯或者鏈路的處理
        return selectReferenceNode(rule, context, node);
    }

    return null;
}

此方法的中的limitApp為限制的資源,strategy為流控的類型(調用方,關聯,鏈路),origin為調用來源。最后我們看一下立即通過時的代碼

public boolean canPass(Node node, int acquireCount, boolean prioritized) {
    //判斷是限流還是限制並發數量,然后獲取流量或並發數量
    int curCount = avgUsedTokens(node);
    //如果兩者相加大於限定的並發數
    if (curCount + acquireCount > count) {
       // 如果此請求是一個高優先級請求,並且限流類型為qps,則不會立即失敗,而是去占用未來的窗口
       if (prioritized && grade == RuleConstant.FLOW_GRADE_QPS) {
           long currentTime;
           long waitInMs;
           currentTime = TimeUtil.currentTimeMillis();
           waitInMs = node.tryOccupyNext(currentTime, acquireCount, count);
           if (waitInMs < OccupyTimeoutProperty.getOccupyTimeout()) {
                node.addWaitingRequest(currentTime + waitInMs, acquireCount);
                node.addOccupiedPass(acquireCount);
                sleep(waitInMs);

                // PriorityWaitException indicates that the request will pass after waiting for {@link @waitInMs}.
                throw new PriorityWaitException(waitInMs);
           }
       }
        return false;
    }
    return true;
}

四、熔斷的觸發與實現

4.1熔斷觸發條件分析

4.1.1RT降級

接下來重點來了,比如rt降級規則,我配置的是1ms超時,而此接口實際rt遠高於1ms。但是當我測試接口的時候,發現實際並沒有進入降級處理方法中。但是人舉國猛點則會進入降級代碼,查看sentinel的github上文檔后發現sentinel對於rt降級策略如下:

image.png

所以需要多次觸發才能進入熔斷機制

4.1.2異常降級

異常數量降級需要注意,此時的時間窗口是分鍾機制,官方文檔如下:

image.png

異常比例熔斷也存在最小次數的機制。

異常數熔斷由於窗口是分鍾,所以集中出現異常時,如果熔斷時間不到1min,比如10s,那么熔斷結束后,向前推一分鍾內異常數可能依舊高於最大數,會再次進入熔斷邏輯。

附上sentinel的熔斷降級熔斷文檔如下:

https://github.com/alibaba/Sentinel/wiki/%E7%86%94%E6%96%AD%E9%99%8D%E7%BA%A7

4.2熔斷源碼分析

4.2.1熔斷鏈路

對於熔斷,為了更好的配置熔斷發生后的接口返回值,我們通常使用的@SentinelResource注解,並指定blockHandler方法處理熔斷發生后的回執。對於@SentinelResource注解,查看sentinel源碼可發現其切面如下:

@Around("sentinelResourceAnnotationPointcut()")
public Object invokeResourceWithSentinel(ProceedingJoinPoint pjp) throws Throwable {
    Method originMethod = resolveMethod(pjp);
    SentinelResource annotation = originMethod.getAnnotation(SentinelResource.class);
    if (annotation == null) {
        // Should not go through here.
        throw new IllegalStateException("Wrong state for SentinelResource annotation");
    }
    String resourceName = getResourceName(annotation.value(), originMethod);
    EntryType entryType = annotation.entryType();
    int resourceType = annotation.resourceType();
    Entry entry = null;
    try {
        // 進入sentinel限流方法
        entry = SphU.entry(resourceName, resourceType, entryType, pjp.getArgs());
        Object result = pjp.proceed();
        return result;
    } catch (BlockException ex) {
        return handleBlockException(pjp, annotation, ex);
    } catch (Throwable ex) {
        Class<? extends Throwable>[] exceptionsToIgnore = annotation.exceptionsToIgnore();
        // The ignore list will be checked first.
        if (exceptionsToIgnore.length > 0 && exceptionBelongsTo(ex, exceptionsToIgnore)) {
            throw ex;
        }
        if (exceptionBelongsTo(ex, annotation.exceptionsToTrace())) {
            traceException(ex, annotation);
            return handleFallback(pjp, annotation, ex);
        }
        // No fallback function can handle the exception, so throw it out.
        throw ex;
    } finally {
        // exit時進行數據統計
        if (entry != null) {
            entry.exit(1, pjp.getArgs());
        }
    }
}

發現其對於@SentinelResource注解切面中,會在進入真實方法調用前,調用SphU.entry(resourceName, resourceType, entryType, pjp.getArgs())方法,根據前文分析可知,此處會調用ProcessorSlot.entry進入一系列功能插槽(slot chain)中,其中降級規則檢查是DegradeSlot,DegradeSlot中調用DegradeRuleManager.checkDegrade方法,進行實質的降級檢查,代碼如下:

public static void checkDegrade(ResourceWrapper resource, Context context, DefaultNode node, int count)
    throws BlockException {
    //根據resource來獲取降級策略
    Set<DegradeRule> rules = degradeRules.get(resource.getName());
    if (rules == null) {
        return;
    }
    
    for (DegradeRule rule : rules) {
        if (!rule.passCheck(context, node, count)) {
            throw new DegradeException(rule.getLimitApp(), rule);
        }
    }
}

其中degradeRules.get方法通過資源名稱,查看此資源對應的降級規則列表。DegradeRule.passCheck方法為實際降級檢查。

4.2.2熔斷處理

熔斷相關代碼如下:

public boolean passCheck(Context context, DefaultNode node, int acquireCount, Object... args) {
    //返回false直接進行降級
    if (cut.get()) {
        return false;
    }
    //降級是根據資源的全局節點來進行判斷降級策略的
    ClusterNode clusterNode = ClusterBuilderSlot.getClusterNode(this.getResource());
    if (clusterNode == null) {
        return true;
    }
    //根據響應時間降級策略
    if (grade == RuleConstant.DEGRADE_GRADE_RT) {
        //獲取節點的平均響應時間
        double rt = clusterNode.avgRt();
        if (rt < this.count) {
            passCount.set(0);
            return true;
        }
        //rtSlowRequestAmount默認是5
        // Sentinel will degrade the service only if count exceeds.
        if (passCount.incrementAndGet() < rtSlowRequestAmount) {
            return true;
        }
    // 根據異常比例降級
    } else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_RATIO) {
        //獲取每秒異常的次數
        double exception = clusterNode.exceptionQps();
        //獲取每秒成功的次數
        double success = clusterNode.successQps();
        //獲取每秒總調用次數
        double total = clusterNode.totalQps();
        // If total amount is less than minRequestAmount, the request will pass.
        // 如果總調用次數少於5,那么不進行降級
        if (total < minRequestAmount) {
            return true;
        }
        
        // In the same aligned statistic time window,
        // "success" (aka. completed count) = exception count + non-exception count (realSuccess)
        // 獲取真實成功數
        double realSuccess = success - exception;
        if (realSuccess <= 0 && exception < minRequestAmount) {
            return true;
        }

        if (exception / success < count) {
            return true;
        }
    // 根據異常數降級
    } else if (grade == RuleConstant.DEGRADE_GRADE_EXCEPTION_COUNT) {
        double exception = clusterNode.totalException();
        if (exception < count) {
            return true;
        }
    }
    //根據設置的時間窗口進行重置
    if (cut.compareAndSet(false, true)) {
        ResetTask resetTask = new ResetTask(this);
        pool.schedule(resetTask, timeWindow, TimeUnit.SECONDS);
    }

    return false;
}

4.2.3數據統計

在接資源點調用結束后,會進入com.alibaba.csp.sentinel.Entry.exit方法,此方法最后會調用StatisticNode.addRtAndSuccess方法記錄成功數和RT。具體如下:

    @Override
    public void addRtAndSuccess(long rt, int successCount) {
        rollingCounterInSecond.addSuccess(successCount);
        rollingCounterInSecond.addRT(rt);

        rollingCounterInMinute.addSuccess(successCount);
        rollingCounterInMinute.addRT(rt);
    }


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM