總體架構
上圖是來自官網的總體架構圖,這張圖上可以清晰的看到整個流量控制以責任鏈的模式進行的,每一個slot負責特定的處理,后續會給大家具體講解chain上每一個slot的功能。
NodeSelectorSlot
負責收集資源的路徑,並將這些資源的調用路徑,以樹狀結構存儲起來,用於根據調用路徑來限流降級;ClusterBuilderSlot
則用於存儲資源的統計信息以及調用者信息,例如該資源的 RT, QPS, thread count 等等,這些信息將用作為多維度限流,降級的依據;StatisticSlot
則用於記錄、統計不同緯度的 runtime 指標監控信息;FlowSlot
則用於根據預設的限流規則以及前面 slot 統計的狀態,來進行流量控制;AuthoritySlot
則根據配置的黑白名單和調用來源信息,來做黑白名單控制;DegradeSlot
則通過統計信息以及預設的規則,來做熔斷降級;SystemSlot
則通過系統的狀態,例如 load1 等,來控制總的入口流量;
模型設計
上圖只是展示了樹狀結構,無法清晰解釋多線程下的執行態。
在這圖中可以清晰的看到資源的調用情況及統計的維度,包括早DefaultNode上的統計 ClusterNode上的統計,以及StatisticNode上的統計。
在執行SphU.entry之前,首先會去創建EntranceNode入口 ContextUtil.enter(contextName, origin);
protected static Context trueEnter(String name, String origin) { Context context = contextHolder.get(); if (context == null) { // 一個context name 對應一個線程, 一個contextname又對應一個 EntranceNode Map<String, DefaultNode> localCacheNameMap = contextNameNodeMap; DefaultNode node = localCacheNameMap.get(name); // DCL if (node == null) { // 超過2000個context、 則返回null_context if (localCacheNameMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) { setNullContext(); return NULL_CONTEXT; } else { LOCK.lock(); try { node = contextNameNodeMap.get(name); if (node == null) { if (contextNameNodeMap.size() > Constants.MAX_CONTEXT_NAME_SIZE) { setNullContext(); return NULL_CONTEXT; } else { // 每一個資源名稱都會對應 node = new EntranceNode(new StringResourceWrapper(name, EntryType.IN), null); // Add entrance node. /** * root 本身就是EntranceNode 資源名為machine-root" * addChild,相當於放在上層node的list里面 */ Constants.ROOT.addChild(node); // copy-on-write Map<String, DefaultNode> newMap = new HashMap<>(contextNameNodeMap.size() + 1); newMap.putAll(contextNameNodeMap); // 緩存頂層資源對應的EntranceNode newMap.put(name, node); contextNameNodeMap = newMap; } } } finally { LOCK.unlock(); } } } // 保存了context name 以及對應的EntranceNode context = new Context(node, name); context.setOrigin(origin); // 設置到ThreadLocal中 contextHolder.set(context); } return context; }
實際上建立了線程與Context關系一對一關系 ,context name與EntranceNode一對一關系。接下來開始執行SphU.entry -> CtSph#entryWithPriority, 在此之前會做一些初始化,這塊后續再講解。
private Entry entryWithPriority(ResourceWrapper resourceWrapper, int count, boolean prioritized, Object... args) throws BlockException { // 獲取當前線程中context ThreadLocal<Context>中獲取 Context context = ContextUtil.getContext(); // 如果是 NullContext,那么說明 context name 超過了 2000 個,參見 ContextUtil#trueEnter // 這個時候,Sentinel 不再接受處理新的 context 配置,也就是不做這些新的接口的統計、限流熔斷等 if (context instanceof NullContext) { // The {@link NullContext} indicates that the amount of context has exceeded the threshold, // so here init the entry only. No rule checking will be done. return new CtEntry(resourceWrapper, null, context); } if (context == null) { // Using default context. context = InternalContextUtil.internalEnter(Constants.CONTEXT_DEFAULT_NAME); } // Global switch is close, no rule checking will do. if (!Constants.ON) { return new CtEntry(resourceWrapper, null, context); } ProcessorSlot<Object> chain = lookProcessChain(resourceWrapper); /* * Means amount of resources (slot chain) exceeds {@link Constants.MAX_SLOT_CHAIN_SIZE}, * so no rule checking will be done. */ if (chain == null) { return new CtEntry(resourceWrapper, null, context); } Entry e = new CtEntry(resourceWrapper, chain, context); try { chain.entry(context, resourceWrapper, null, count, prioritized, args); } catch (BlockException e1) { e.exit(count, args); throw e1; } catch (Throwable e1) { // This should not happen, unless there are errors existing in Sentinel internal. RecordLog.info("Sentinel unexpected exception", e1); } return e; }
這邊根據資源名稱去獲取對應的ProcessSlotChain調用鏈。也就是說相同資源名稱的DefaultNode共享同一個chain。
責任鏈構建
調用鏈是是一個單向鏈表,上一個slot通過Next變量來持有下一個slot的引用,下圖展示了責任鏈模式的數據結構。
接下來看一下如何初始化調用鏈的,CtSph#lookProcessChain
這邊有一個概念 就是一個class類型 對應一個SpiLoader對象並緩存在內存中
ProcessorSlot<Object> lookProcessChain(ResourceWrapper resourceWrapper) { ProcessorSlotChain chain = chainMap.get(resourceWrapper); // DCL if (chain == null) { synchronized (LOCK) { chain = chainMap.get(resourceWrapper); if (chain == null) { // Entry size limit. if (chainMap.size() >= Constants.MAX_SLOT_CHAIN_SIZE) { return null; } // 創建資源調用鏈 chain = SlotChainProvider.newSlotChain(); Map<ResourceWrapper, ProcessorSlotChain> newMap = new HashMap<ResourceWrapper, ProcessorSlotChain>( chainMap.size() + 1); newMap.putAll(chainMap); newMap.put(resourceWrapper, chain); chainMap = newMap; } } } return chain; }
接下來會涉及到SPI相關的內容,對SPI不太熟悉的小伙伴,可以自行查閱資料。從SPI加載slot構建器
slotChainBuilder = SpiLoader.of(SlotChainBuilder.class).loadFirstInstanceOrDefault();
首先獲取對應class類型的SpiLoader,根據classname保存在內存中
public static <T> SpiLoader<T> of(Class<T> service) { AssertUtil.notNull(service, "SPI class cannot be null"); AssertUtil.isTrue(service.isInterface() || Modifier.isAbstract(service.getModifiers()), "SPI class[" + service.getName() + "] must be interface or abstract class"); String className = service.getName(); SpiLoader<T> spiLoader = SPI_LOADER_MAP.get(className); if (spiLoader == null) { synchronized (SpiLoader.class) { spiLoader = SPI_LOADER_MAP.get(className); if (spiLoader == null) { SPI_LOADER_MAP.putIfAbsent(className, new SpiLoader<>(service)); spiLoader = SPI_LOADER_MAP.get(className); } } } return spiLoader; }
接下來看一下loadFirstInstanceOrDefault方法
public S loadFirstInstanceOrDefault() { // 加載class load(); // 相同接口的實現類中 加載第一個 for (Class<? extends S> clazz : classList) { if (defaultClass == null || clazz != defaultClass) { return createInstance(clazz); } } // 如果沒有 加載默認實現類 return loadDefaultInstance(); }
/** * Load the Provider class from Provider configuration file */ public void load() { // 如果加載過了 返回 if (!loaded.compareAndSet(false, true)) { return; } // 獲取文件路徑 String fullFileName = SPI_FILE_PREFIX + service.getName(); ClassLoader classLoader; if (SentinelConfig.shouldUseContextClassloader()) { // 線程上下文加載器 classLoader = Thread.currentThread().getContextClassLoader(); } else { // 使用加載service的加載器 可能是系統類子加載器 classLoader = service.getClassLoader(); } if (classLoader == null) { // 使用系統類加載器加載 classLoader = ClassLoader.getSystemClassLoader(); } Enumeration<URL> urls = null; try { urls = classLoader.getResources(fullFileName); } catch (IOException e) { fail("Error locating SPI configuration file, filename=" + fullFileName + ", classloader=" + classLoader, e); } if (urls == null || !urls.hasMoreElements()) { RecordLog.warn("No SPI configuration file, filename=" + fullFileName + ", classloader=" + classLoader); return; } while (urls.hasMoreElements()) { URL url = urls.nextElement(); InputStream in = null; BufferedReader br = null; try { in = url.openStream(); br = new BufferedReader(new InputStreamReader(in, StandardCharsets.UTF_8)); String line; while ((line = br.readLine()) != null) { // 每次讀取一行 if (StringUtil.isBlank(line)) { // Skip blank line continue; } line = line.trim(); int commentIndex = line.indexOf("#"); if (commentIndex == 0) { // 跳過注釋 // Skip comment line continue; } // 如果行開頭不是# 取行頭到#之間的字符串 if (commentIndex > 0) { line = line.substring(0, commentIndex); } line = line.trim(); Class<S> clazz = null; try { // 加載到JVM clazz = (Class<S>) Class.forName(line, false, classLoader); } catch (ClassNotFoundException e) { fail("class " + line + " not found", e); } // clazz 是否是service 的子類 或相同 if (!service.isAssignableFrom(clazz)) { fail("class " + clazz.getName() + "is not subtype of " + service.getName() + ",SPI configuration file=" + fullFileName); } // 緩存實現類 classList.add(clazz); Spi spi = clazz.getAnnotation(Spi.class); String aliasName = spi == null || "".equals(spi.value()) ? clazz.getName() : spi.value(); if (classMap.containsKey(aliasName)) { Class<? extends S> existClass = classMap.get(aliasName); fail("Found repeat alias name for " + clazz.getName() + " and " + existClass.getName() + ",SPI configuration file=" + fullFileName); } // 別名與class 緩存 classMap.put(aliasName, clazz); if (spi != null && spi.isDefault()) { if (defaultClass != null) { fail("Found more than one default Provider, SPI configuration file=" + fullFileName); } defaultClass = clazz; } RecordLog.info("[SpiLoader] Found SPI implementation for SPI {}, provider={}, aliasName={}" + ", isSingleton={}, isDefault={}, order={}", service.getName(), line, aliasName , spi == null ? true : spi.isSingleton() , spi == null ? false : spi.isDefault() , spi == null ? 0 : spi.order()); } } catch (IOException e) { fail("error reading SPI configuration file", e); } finally { closeResources(in, br); } } // 根據order進行排序 sortedClassList.addAll(classList); Collections.sort(sortedClassList, new Comparator<Class<? extends S>>() { @Override public int compare(Class<? extends S> o1, Class<? extends S> o2) { Spi spi1 = o1.getAnnotation(Spi.class); int order1 = spi1 == null ? 0 : spi1.order(); Spi spi2 = o2.getAnnotation(Spi.class); int order2 = spi2 == null ? 0 : spi2.order(); return Integer.compare(order1, order2); } }); }
至此完成了chain構建器的類加載。后面通過構建器build一個調用鏈,同樣使用SPI技術,slotChainBuilder.build();
@Override public ProcessorSlotChain build() { ProcessorSlotChain chain = new DefaultProcessorSlotChain(); // 獲取到所有的slot 並排好序 List<ProcessorSlot> sortedSlotList = SpiLoader.of(ProcessorSlot.class).loadInstanceListSorted(); for (ProcessorSlot slot : sortedSlotList) { // 校驗 if (!(slot instanceof AbstractLinkedProcessorSlot)) { RecordLog.warn("The ProcessorSlot(" + slot.getClass().getCanonicalName() + ") is not an instance of AbstractLinkedProcessorSlot, can't be added into ProcessorSlotChain"); continue; } // ProcessorSlotChain中的end一直引用最新的slot //每一次的addlast 都會將當前最后一個slot的next引用到新的slot上 //同時更新ProcessorSlotChain中的end為新的slot chain.addLast((AbstractLinkedProcessorSlot<?>) slot); } return chain; }
這就是整個資源調用鏈初始化的過程,也是基於單向鏈表的責任鏈模式的構建。