Sentinel整體架構


總體架構

 

上圖是來自官網的總體架構圖,這張圖上可以清晰的看到整個流量控制以責任鏈的模式進行的,每一個slot負責特定的處理,后續會給大家具體講解chain上每一個slot的功能。

  • NodeSelectorSlot 負責收集資源的路徑,並將這些資源的調用路徑,以樹狀結構存儲起來,用於根據調用路徑來限流降級;
  • ClusterBuilderSlot 則用於存儲資源的統計信息以及調用者信息,例如該資源的 RT, QPS, thread count 等等,這些信息將用作為多維度限流,降級的依據;
  • StatisticSlot 則用於記錄、統計不同緯度的 runtime 指標監控信息;
  • FlowSlot 則用於根據預設的限流規則以及前面 slot 統計的狀態,來進行流量控制;
  • AuthoritySlot 則根據配置的黑白名單和調用來源信息,來做黑白名單控制;
  • DegradeSlot 則通過統計信息以及預設的規則,來做熔斷降級;
  • SystemSlot 則通過系統的狀態,例如 load1 等,來控制總的入口流量;

 

模型設計

上圖只是展示了樹狀結構,無法清晰解釋多線程下的執行態。

 

在這圖中可以清晰的看到資源的調用情況及統計的維度,包括早DefaultNode上的統計  ClusterNode上的統計,以及StatisticNode上的統計。

在執行SphU.entry之前,首先會去創建EntranceNode入口   ContextUtil.enter(contextName, origin);

    protected static Context trueEnter(String name, String origin) {
        Context context = contextHolder.get();
        if (context == null) {
            // 一個context name 對應一個線程,  一個contextname又對應一個 EntranceNode
            Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap;
            DefaultNode node = localCacheNameMap.get(name);
            // DCL
            if (node == null) {
                //  超過2000個context、  則返回null_context
                if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                    setNullContext();

                    return NULL_CONTEXT;
                } else {
                    LOCK.lock();
                    try {
                        node = contextNameNodeMap.get(name);
                        if (node == null) {
                            if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) {
                                setNullContext();
                                return NULL_CONTEXT;
                            } else {
                                // 每一個資源名稱都會對應
                                node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null);
                                // Add entrance node.
                                /**
                                 * root 本身就是EntranceNode  資源名為machine-root"
                                 * addChild,相當於放在上層node的list里面
                                 */
                                Constants.ROOT.addChild(node);

                                // copy-on-write
                                Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1);
                                newMap.putAll(contextNameNodeMap);
                                // 緩存頂層資源對應的EntranceNode
                                newMap.put(name, node);
                                contextNameNodeMap = newMap;
                            }
                        }
                    } finally {
                        LOCK.unlock();
                    }
                }
            }
            // 保存了context name  以及對應的EntranceNode
            context = new Context(node, name);
            context.setOrigin(origin);
            // 設置到ThreadLocal中
            contextHolder.set(context);
        }

        return context;
    }

實際上建立了線程與Context關系一對一關系 ,context name與EntranceNode一對一關系。接下來開始執行SphU.entry -> CtSph#entryWithPriority,  在此之前會做一些初始化,這塊后續再講解。

    private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args)
        throws BlockException {
        // 獲取當前線程中context  ThreadLocal<Context>中獲取
        Context context = ContextUtil.getContext();
        // 如果是 NullContext,那么說明 context name 超過了 2000 個,參見 ContextUtil#trueEnter
        // 這個時候,Sentinel 不再接受處理新的 context 配置,也就是不做這些新的接口的統計、限流熔斷等
        if (context instanceof NullContext) {
            // The {@link NullContext} indicates that the amount of context has exceeded the threshold,
            // so here init the entry only. No rule checking will be done.
            return new CtEntry(resourceWrapper, null, context);
        }

        if (context == null) {
            // Using default context.
            context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME);
        }

        // Global switch is close, no rule checking will do.
        if (!Constants.ON) {
            return new CtEntry(resourceWrapper, null, context);
        }

        ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper); /*
         * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE},
         * so no rule checking will be done.
         */
        if (chain == null) {
            return new CtEntry(resourceWrapper, null, context);
        }

        Entry e = new CtEntry(resourceWrapper, chain, context);
        try {
            chain.entry(context, resourceWrapper, null, count, prioritized, args);
        } catch (BlockException e1) {
            e.exit(count, args);
            throw e1;
        } catch (Throwable e1) {
            // This should not happen, unless there are errors existing in Sentinel internal.
            RecordLog.info("Sentinel unexpected exception", e1);
        }
        return e;
    }

這邊根據資源名稱去獲取對應的ProcessSlotChain調用鏈。也就是說相同資源名稱的DefaultNode共享同一個chain。

 

責任鏈構建

調用鏈是是一個單向鏈表,上一個slot通過Next變量來持有下一個slot的引用,下圖展示了責任鏈模式的數據結構。

 

接下來看一下如何初始化調用鏈的,CtSph#lookProcessChain

這邊有一個概念 就是一個class類型 對應一個SpiLoader對象並緩存在內存中

    ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) {
        ProcessorSlotChain chain = chainMap.get(resourceWrapper);
        // DCL
        if (chain == null) {
            synchronized (LOCK) {
                chain = chainMap.get(resourceWrapper);
                if (chain == null) {
                    // Entry size limit.
                    if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) {
                        return null;
                    }

                    // 創建資源調用鏈
                    chain = SlotChainProvider.newSlotChain();
                    Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>(
                        chainMap.size() + 1);
                    newMap.putAll(chainMap);
                    newMap.put(resourceWrapper, chain);
                    chainMap = newMap;
                }
            }
        }
        return chain;
    }

接下來會涉及到SPI相關的內容,對SPI不太熟悉的小伙伴,可以自行查閱資料。從SPI加載slot構建器

slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();

首先獲取對應class類型的SpiLoader,根據classname保存在內存中

    public static <T> SpiLoader<T> of(Class<T> service) {
        AssertUtil.notNull(service, "SPI class cannot be null");
        AssertUtil.isTrue(service.isInterface() || Modifier.isAbstract(service.getModifiers()),
                "SPI class[" + service.getName() + "] must be interface or abstract class");

        String className = service.getName();
        SpiLoader<T> spiLoader = SPI_LOADER_MAP.get(className);
        if (spiLoader == null) {
            synchronized (SpiLoader.class) {
                spiLoader = SPI_LOADER_MAP.get(className);
                if (spiLoader == null) {
                    SPI_LOADER_MAP.putIfAbsent(className, new SpiLoader<>(service));
                    spiLoader = SPI_LOADER_MAP.get(className);
                }
            }
        }

        return spiLoader;
    }

接下來看一下loadFirstInstanceOrDefault方法

 public S loadFirstInstanceOrDefault() {
        // 加載class
        load();

        // 相同接口的實現類中  加載第一個
        for (Class<? extends S> clazz : classList) {
            if (defaultClass == null || clazz != defaultClass) {
                return createInstance(clazz);
            }
        }

        // 如果沒有 加載默認實現類
        return loadDefaultInstance();
    }
/**
     * Load the Provider class from Provider configuration file
     */
    public void load() {
        // 如果加載過了 返回
        if (!loaded.compareAndSet(false, true)) {
            return;
        }

        // 獲取文件路徑
        String fullFileName = SPI_FILE_PREFIX + service.getName();
        ClassLoader classLoader;
        if (SentinelConfig.shouldUseContextClassloader()) {
            // 線程上下文加載器
            classLoader = Thread.currentThread().getContextClassLoader();
        } else {
            // 使用加載service的加載器   可能是系統類子加載器
            classLoader = service.getClassLoader();
        }
        if (classLoader == null) {
            //  使用系統類加載器加載
            classLoader = ClassLoader.getSystemClassLoader();
        }
        Enumeration<URL> urls = null;
        try {
            urls = classLoader.getResources(fullFileName);
        } catch (IOException e) {
            fail("Error locating SPI configuration file, filename=" + fullFileName + ", classloader=" + classLoader, e);
        }

        if (urls == null || !urls.hasMoreElements()) {
            RecordLog.warn("No SPI configuration file, filename=" + fullFileName + ", classloader=" + classLoader);
            return;
        }

        while (urls.hasMoreElements()) {
            URL url = urls.nextElement();

            InputStream in = null;
            BufferedReader br = null;
            try {
                in = url.openStream();
                br = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8));
                String line;
                while ((line = br.readLine()) != null) {
                    // 每次讀取一行
                    if (StringUtil.isBlank(line)) {
                        // Skip blank line
                        continue;
                    }

                    line = line.trim();
                    int commentIndex = line.indexOf("#");
                    if (commentIndex == 0) {
                        // 跳過注釋
                        // Skip comment line
                        continue;
                    }

                    // 如果行開頭不是#  取行頭到#之間的字符串
                    if (commentIndex > 0) {
                        line = line.substring(0, commentIndex);
                    }
                    line = line.trim();

                    Class<S> clazz = null;
                    try {
                        // 加載到JVM
                        clazz = (Class<S>) Class.forName(line, false, classLoader);
                    } catch (ClassNotFoundException e) {
                        fail("class " + line + " not found", e);
                    }

                    // clazz 是否是service 的子類 或相同
                    if (!service.isAssignableFrom(clazz)) {
                        fail("class " + clazz.getName() + "is not subtype of " + service.getName() + ",SPI configuration file=" + fullFileName);
                    }

                    // 緩存實現類
                    classList.add(clazz);
                    Spi spi = clazz.getAnnotation(Spi.class);
                    String aliasName = spi == null || "".equals(spi.value()) ? clazz.getName() : spi.value();
                    if (classMap.containsKey(aliasName)) {
                        Class<? extends S> existClass = classMap.get(aliasName);
                        fail("Found repeat alias name for " + clazz.getName() + " and "
                                + existClass.getName() + ",SPI configuration file=" + fullFileName);
                    }
                    // 別名與class  緩存
                    classMap.put(aliasName, clazz);

                    if (spi != null && spi.isDefault()) {
                        if (defaultClass != null) {
                            fail("Found more than one default Provider, SPI configuration file=" + fullFileName);
                        }
                        defaultClass = clazz;
                    }

                    RecordLog.info("[SpiLoader] Found SPI implementation for SPI {}, provider={}, aliasName={}"
                            + ", isSingleton={}, isDefault={}, order={}",
                        service.getName(), line, aliasName
                            , spi == null ? true : spi.isSingleton()
                            , spi == null ? false : spi.isDefault()
                            , spi == null ? 0 : spi.order());
                }
            } catch (IOException e) {
                fail("error reading SPI configuration file", e);
            } finally {
                closeResources(in, br);
            }
        }

        // 根據order進行排序
        sortedClassList.addAll(classList);
        Collections.sort(sortedClassList, new Comparator<Class<? extends S>>() {
            @Override
            public int compare(Class<? extends S> o1, Class<? extends S> o2) {
                Spi spi1 = o1.getAnnotation(Spi.class);
                int order1 = spi1 == null ? 0 : spi1.order();

                Spi spi2 = o2.getAnnotation(Spi.class);
                int order2 = spi2 == null ? 0 : spi2.order();

                return Integer.compare(order1, order2);
            }
        });
    }

至此完成了chain構建器的類加載。后面通過構建器build一個調用鏈,同樣使用SPI技術,slotChainBuilder.build();

@Override
    public ProcessorSlotChain build() {
        ProcessorSlotChain chain = new DefaultProcessorSlotChain();

        // 獲取到所有的slot  並排好序
        List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted();
        for (ProcessorSlot slot : sortedSlotList) {
            // 校驗
            if (!(slot instanceof AbstractLinkedProcessorSlot)) {
                RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain");
                continue;
            }

            // ProcessorSlotChain中的end一直引用最新的slot
            //每一次的addlast 都會將當前最后一個slot的next引用到新的slot上
            //同時更新ProcessorSlotChain中的end為新的slot
            chain.addLast((AbstractLinkedProcessorSlot<?>) slot);
        }

        return chain;
    }

這就是整個資源調用鏈初始化的過程,也是基於單向鏈表的責任鏈模式的構建。

 

 

 

 
       


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM