上一篇中,我們介紹了dubbo的負載均衡實現,見識了幾種常用的負載均衡算法。就單個功能而言,似乎dubbo並沒有太多的突出之處。事實上,一個成功的產品不必每個地方都要打破常規。更重要的是其全局優化的架構設計,以及如何使用現有的優秀解決方案為己服務。
本篇將介紹另一種集群環境中的高可用實現:路由服務的實現。它將從另一個角度補充dubbo的集群功能完整性。
1. 路由出現的時機?
服務路由是什么?服務路由包含一條路由規則,路由規則決定了服務消費者的調用目標,即規定了服務消費者可調用哪些服務提供者。
服務路由是什么派上用場的呢?實際上,它是在進行消費都調用提供者的第一步操作。集群的幾個策略的先后為: 服務路由 -> 負載均衡 -> 集群容錯(重試);
其調用入口框架是在 org.apache.dubbo.rpc.cluster.support.FailoverClusterInvoker 中的:
@Override @SuppressWarnings({"unchecked", "rawtypes"}) public Result doInvoke(Invocation invocation, final List<Invoker<T>> invokers, LoadBalance loadbalance) throws RpcException { List<Invoker<T>> copyInvokers = invokers; checkInvokers(copyInvokers, invocation); String methodName = RpcUtils.getMethodName(invocation); int len = getUrl().getMethodParameter(methodName, RETRIES_KEY, DEFAULT_RETRIES) + 1; if (len <= 0) { len = 1; } // retry loop. RpcException le = null; // last exception. List<Invoker<T>> invoked = new ArrayList<Invoker<T>>(copyInvokers.size()); // invoked invokers. Set<String> providers = new HashSet<String>(len); for (int i = 0; i < len; i++) { //Reselect before retry to avoid a change of candidate `invokers`. //NOTE: if `invokers` changed, then `invoked` also lose accuracy. if (i > 0) { checkWhetherDestroyed(); // 服務路由,入口,由父類中調用 copyInvokers = list(invocation); // check again checkInvokers(copyInvokers, invocation); } // 負載均衡入口 Invoker<T> invoker = select(loadbalance, invocation, copyInvokers, invoked); invoked.add(invoker); RpcContext.getContext().setInvokers((List) invoked); try { Result result = invoker.invoke(invocation); if (le != null && logger.isWarnEnabled()) { logger.warn("Although retry the method " + methodName + " in the service " + getInterface().getName() + " was successful by the provider " + invoker.getUrl().getAddress() + ", but there have been failed providers " + providers + " (" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le); } return result; } catch (RpcException e) { if (e.isBiz()) { // biz exception. throw e; } // 集群容錯,進行重試調用 le = e; } catch (Throwable e) { le = new RpcException(e.getMessage(), e); } finally { providers.add(invoker.getUrl().getAddress()); } } throw new RpcException(le.getCode(), "Failed to invoke the method " + methodName + " in the service " + getInterface().getName() + ". Tried " + len + " times of the providers " + providers + " (" + providers.size() + "/" + copyInvokers.size() + ") from the registry " + directory.getUrl().getAddress() + " on the consumer " + NetUtils.getLocalHost() + " using the dubbo version " + Version.getVersion() + ". Last error is: " + le.getMessage(), le.getCause() != null ? le.getCause() : le); } // org.apache.dubbo.rpc.cluster.support.AbstractClusterInvoker#list protected List<Invoker<T>> list(Invocation invocation) throws RpcException { // 直接調用對應的路徑服務的 list() 方法進行路由。 return directory.list(invocation); } // org.apache.dubbo.rpc.cluster.directory.AbstractDirectory#list @Override public List<Invoker<T>> list(Invocation invocation) throws RpcException { if (destroyed) { throw new RpcException("Directory already destroyed .url: " + getUrl()); } return doList(invocation); } // org.apache.dubbo.registry.integration.RegistryDirectory#doList @Override public List<Invoker<T>> doList(Invocation invocation) { if (forbidden) { // 1. No service provider 2. Service providers are disabled throw new RpcException(RpcException.FORBIDDEN_EXCEPTION, "No provider available from registry " + getUrl().getAddress() + " for service " + getConsumerUrl().getServiceKey() + " on consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", please check status of providers(disabled, not registered or in blacklist)."); } if (multiGroup) { return this.invokers == null ? Collections.emptyList() : this.invokers; } List<Invoker<T>> invokers = null; try { // Get invokers from cache, only runtime routers will be executed. invokers = routerChain.route(getConsumerUrl(), invocation); } catch (Throwable t) { logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t); } return invokers == null ? Collections.emptyList() : invokers; } // org.apache.dubbo.rpc.cluster.RouterChain#route public List<Invoker<T>> route(URL url, Invocation invocation) { List<Invoker<T>> finalInvokers = invokers; // 根據注冊的 routers 依次調用,過濾 finalInvokers 之后返回 for (Router router : routers) { finalInvokers = router.route(finalInvokers, url, invocation); } return finalInvokers; }
2. dubbo提供了哪些路由策略?
Dubbo 目前提供了三種服務路由實現,分別為條件路由 ConditionRouter、腳本路由 ScriptRouter 和標簽路由 TagRouter。
router 的創建時機:每次url發生變更后(如后台修改),都會觸發一次路由信息重建。
// org.apache.dubbo.registry.integration.RegistryDirectory#notify @Override public synchronized void notify(List<URL> urls) { Map<String, List<URL>> categoryUrls = urls.stream() .filter(Objects::nonNull) .filter(this::isValidCategory) .filter(this::isNotCompatibleFor26x) .collect(Collectors.groupingBy(this::judgeCategory)); List<URL> configuratorURLs = categoryUrls.getOrDefault(CONFIGURATORS_CATEGORY, Collections.emptyList()); this.configurators = Configurator.toConfigurators(configuratorURLs).orElse(this.configurators); List<URL> routerURLs = categoryUrls.getOrDefault(ROUTERS_CATEGORY, Collections.emptyList()); // 從url中取出相應的路由服務類,添加 routerChain 中,備用 toRouters(routerURLs).ifPresent(this::addRouters); // providers List<URL> providerURLs = categoryUrls.getOrDefault(PROVIDERS_CATEGORY, Collections.emptyList()); /** * 3.x added for extend URL address */ ExtensionLoader<AddressListener> addressListenerExtensionLoader = ExtensionLoader.getExtensionLoader(AddressListener.class); List<AddressListener> supportedListeners = addressListenerExtensionLoader.getActivateExtension(getUrl(), (String[]) null); if (supportedListeners != null && !supportedListeners.isEmpty()) { for (AddressListener addressListener : supportedListeners) { providerURLs = addressListener.notify(providerURLs, getConsumerUrl(),this); } } refreshOverrideAndInvoker(providerURLs); } // org.apache.dubbo.registry.integration.RegistryDirectory#toRouters /** * @param urls * @return null : no routers ,do nothing * else :routers list */ private Optional<List<Router>> toRouters(List<URL> urls) { if (urls == null || urls.isEmpty()) { return Optional.empty(); } List<Router> routers = new ArrayList<>(); for (URL url : urls) { if (EMPTY_PROTOCOL.equals(url.getProtocol())) { continue; } String routerType = url.getParameter(ROUTER_KEY); if (routerType != null && routerType.length() > 0) { url = url.setProtocol(routerType); } try { // 根據router工廠類進行創建router, 該工廠類使用 SPI 機制進行生成,實現 RouterFactory // file=org.apache.dubbo.rpc.cluster.router.file.FileRouterFactory // script=org.apache.dubbo.rpc.cluster.router.script.ScriptRouterFactory // condition=org.apache.dubbo.rpc.cluster.router.condition.ConditionRouterFactory // service=org.apache.dubbo.rpc.cluster.router.condition.config.ServiceRouterFactory // app=org.apache.dubbo.rpc.cluster.router.condition.config.AppRouterFactory // tag=org.apache.dubbo.rpc.cluster.router.tag.TagRouterFactory // mock=org.apache.dubbo.rpc.cluster.router.mock.MockRouterFactory Router router = ROUTER_FACTORY.getRouter(url); if (!routers.contains(router)) { routers.add(router); } } catch (Throwable t) { logger.error("convert router url to router error, url: " + url, t); } } return Optional.of(routers); }
所以,整體上整個router的創建,依賴於url中的router參數,用該參數找到對應的router工廠類,然后調用其 getRouter()方法生成具體的router. 我們簡單看看router的工廠類一般是什么樣的?
2.1. 路由工廠類的構建
/** * Application level router factory */ @Activate(order = 200) public class AppRouterFactory implements RouterFactory { public static final String NAME = "app"; private volatile Router router; @Override public Router getRouter(URL url) { // 一個工廠類中,只有一個單例的router if (router != null) { return router; } // 雙重鎖 懶加載 synchronized (this) { if (router == null) { router = createRouter(url); } } return router; } private Router createRouter(URL url) { return new AppRouter(url); } } // 可緩存路由 /** * Service level router factory */ @Activate(order = 300) public class ServiceRouterFactory extends CacheableRouterFactory { public static final String NAME = "service"; @Override protected Router createRouter(URL url) { return new ServiceRouter(url); } } // 條件路徑工廠類 public class ConditionRouterFactory implements RouterFactory { public static final String NAME = "condition"; @Override public Router getRouter(URL url) { // 直接new對象返回 return new ConditionRouter(url); } } // 文件路由工廠類,事實上它並不一個單純的路由工廠類,它需要依賴於別的路由工廠 public class FileRouterFactory implements RouterFactory { public static final String NAME = "file"; private RouterFactory routerFactory; // 將別的路由工廠注入進來 public void setRouterFactory(RouterFactory routerFactory) { this.routerFactory = routerFactory; } @Override public Router getRouter(URL url) { try { // Transform File URL into Script Route URL, and Load // file:///d:/path/to/route.js?router=script ==> script:///d:/path/to/route.js?type=js&rule=<file-content> String protocol = url.getParameter(ROUTER_KEY, ScriptRouterFactory.NAME); // Replace original protocol (maybe 'file') with 'script' String type = null; // Use file suffix to config script type, e.g., js, groovy ... String path = url.getPath(); if (path != null) { int i = path.lastIndexOf('.'); if (i > 0) { type = path.substring(i + 1); } } String rule = IOUtils.read(new FileReader(new File(url.getAbsolutePath()))); // FIXME: this code looks useless boolean runtime = url.getParameter(RUNTIME_KEY, false); URL script = URLBuilder.from(url) .setProtocol(protocol) .addParameter(TYPE_KEY, type) .addParameter(RUNTIME_KEY, runtime) .addParameterAndEncoded(RULE_KEY, rule) .build(); // 將重新組裝后的url,傳遞委托給注入的路由工廠進行處理 return routerFactory.getRouter(script); } catch (IOException e) { throw new IllegalStateException(e.getMessage(), e); } } } @Activate public class MockRouterFactory implements RouterFactory { public static final String NAME = "mock"; @Override public Router getRouter(URL url) { return new MockInvokersSelector(); } } // 腳本路由工廠 /** * ScriptRouterFactory * <p> * Example URLS used by Script Router Factory: * <ol> * <li> script://registryAddress?type=js&rule=xxxx * <li> script:///path/to/routerfile.js?type=js&rule=xxxx * <li> script://D:\path\to\routerfile.js?type=js&rule=xxxx * <li> script://C:/path/to/routerfile.js?type=js&rule=xxxx * </ol> * The host value in URL points out the address of the source content of the Script Router,Registry、File etc * */ public class ScriptRouterFactory implements RouterFactory { public static final String NAME = "script"; @Override public Router getRouter(URL url) { // 直接new對象返回 return new ScriptRouter(url); } } // 標簽路由工廠,可緩存路由(使用一個ConcurrentHashMap集合容器進行保存已創建的router) @Activate(order = 100) public class TagRouterFactory extends CacheableRouterFactory { public static final String NAME = "tag"; // getRouter() 由 父類統一進行框架搭建,子類只需實現 createRouter() 即可 @Override protected Router createRouter(URL url) { return new TagRouter(url); } } /** * If you want to provide a router implementation based on design of v2.7.0, please extend from this abstract class. * For 2.6.x style router, please implement and use RouterFactory directly. */ public abstract class CacheableRouterFactory implements RouterFactory { private ConcurrentMap<String, Router> routerMap = new ConcurrentHashMap<>(); @Override public Router getRouter(URL url) { return routerMap.computeIfAbsent(url.getServiceKey(), k -> createRouter(url)); } protected abstract Router createRouter(URL url); }
可以看出這些個工廠類,基本都是使用new的方法就返回了對應的路由實例類。那么是否有必要都在這些類外面包一個工廠類進行創建呢?直接創建不好嗎?事實上,這只是個一種工廠模式的最佳實踐,是為了更好的隱藏創建邏輯。
2.2. 條件路由 ConditionRouter 詳解
路由功能的實現,主要分為規則解析和規則應用兩個部分!
// 構造方法,主要是解析一些參數 public ConditionRouter(URL url) { this.url = url; // priority=1 this.priority = url.getParameter(PRIORITY_KEY, 0); // force=false this.force = url.getParameter(FORCE_KEY, false); // enabled=true this.enabled = url.getParameter(ENABLED_KEY, true); // rule=xxx // init 方法中詳細解析路由規則 init(url.getParameterAndDecoded(RULE_KEY)); } // 解析條件規則 host = 10.20.153.10 => host = 10.20.153.11 public void init(String rule) { try { if (rule == null || rule.trim().length() == 0) { throw new IllegalArgumentException("Illegal route rule!"); } // 規則如: host = 10.20.153.10 => host = 10.20.153.11 rule = rule.replace("consumer.", "").replace("provider.", ""); int i = rule.indexOf("=>"); // 如果沒有=>, 則全部路由到 該規則指定的host中 String whenRule = i < 0 ? null : rule.substring(0, i).trim(); String thenRule = i < 0 ? rule.trim() : rule.substring(i + 2).trim(); Map<String, MatchPair> when = StringUtils.isBlank(whenRule) || "true".equals(whenRule) ? new HashMap<String, MatchPair>() : parseRule(whenRule); Map<String, MatchPair> then = StringUtils.isBlank(thenRule) || "false".equals(thenRule) ? null : parseRule(thenRule); // NOTE: It should be determined on the business level whether the `When condition` can be empty or not. this.whenCondition = when; this.thenCondition = then; } catch (ParseException e) { throw new IllegalStateException(e.getMessage(), e); } } // 解析條件規則鍵值對 private static Map<String, MatchPair> parseRule(String rule) throws ParseException { Map<String, MatchPair> condition = new HashMap<String, MatchPair>(); if (StringUtils.isBlank(rule)) { return condition; } // Key-Value pair, stores both match and mismatch conditions MatchPair pair = null; // Multiple values Set<String> values = null; // ROUTE_PATTERN = Pattern.compile("([&!=,]*)\\s*([^&!=,\\s]+)"); final Matcher matcher = ROUTE_PATTERN.matcher(rule); while (matcher.find()) { // Try to match one by one String separator = matcher.group(1); String content = matcher.group(2); // Start part of the condition expression. if (StringUtils.isEmpty(separator)) { pair = new MatchPair(); condition.put(content, pair); } // The KV part of the condition expression // &host=xxx else if ("&".equals(separator)) { if (condition.get(content) == null) { pair = new MatchPair(); condition.put(content, pair); } else { pair = condition.get(content); } } // The Value in the KV part. else if ("=".equals(separator)) { if (pair == null) { throw new ParseException("Illegal route rule \"" + rule + "\", The error char '" + separator + "' at index " + matcher.start() + " before \"" + content + "\".", matcher.start()); } values = pair.matches; values.add(content); } // The Value in the KV part. else if ("!=".equals(separator)) { if (pair == null) { throw new ParseException("Illegal route rule \"" + rule + "\", The error char '" + separator + "' at index " + matcher.start() + " before \"" + content + "\".", matcher.start()); } values = pair.mismatches; values.add(content); } // The Value in the KV part, if Value have more than one items. else if (",".equals(separator)) { // Should be separated by ',' if (values == null || values.isEmpty()) { throw new ParseException("Illegal route rule \"" + rule + "\", The error char '" + separator + "' at index " + matcher.start() + " before \"" + content + "\".", matcher.start()); } values.add(content); } else { throw new ParseException("Illegal route rule \"" + rule + "\", The error char '" + separator + "' at index " + matcher.start() + " before \"" + content + "\".", matcher.start()); } } return condition; }
2. 接下來是如何使用這些配置好的規則
路由服務由routerChain進行統一調用:
// org.apache.dubbo.rpc.cluster.RouterChain#route /** * * @param url * @param invocation * @return */ public List<Invoker<T>> route(URL url, Invocation invocation) { List<Invoker<T>> finalInvokers = invokers; for (Router router : routers) { finalInvokers = router.route(finalInvokers, url, invocation); } return finalInvokers; } // 以下是條件路由的route()實現: @Override public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException { if (!enabled) { return invokers; } if (CollectionUtils.isEmpty(invokers)) { return invokers; } try { // 如果不符合路由條件,直接返回所有原樣 invokers 即可 if (!matchWhen(url, invocation)) { return invokers; } List<Invoker<T>> result = new ArrayList<Invoker<T>>(); if (thenCondition == null) { logger.warn("The current consumer in the service blacklist. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey()); return result; } for (Invoker<T> invoker : invokers) { // 否則依次匹配每個候選 invokers, 符合條件的才返回 // 具體匹配實現如下: if (matchThen(invoker.getUrl(), url)) { result.add(invoker); } } if (!result.isEmpty()) { return result; } else if (force) { logger.warn("The route result is empty and force execute. consumer: " + NetUtils.getLocalHost() + ", service: " + url.getServiceKey() + ", router: " + url.getParameterAndDecoded(RULE_KEY)); return result; } } catch (Throwable t) { logger.error("Failed to execute condition router rule: " + getUrl() + ", invokers: " + invokers + ", cause: " + t.getMessage(), t); } return invokers; } // 路由源地址檢測,檢查要調用的服務地址是否命中了條件路由的規則 boolean matchWhen(URL url, Invocation invocation) { // whenCondition 為空,代表攔截所有路徑 return CollectionUtils.isEmptyMap(whenCondition) || matchCondition(whenCondition, url, null, invocation); } // 路由目的地址匹配檢測,與路由源地址匹配模式相同,僅將 whenCondition 換為 thenCondition private boolean matchThen(URL url, URL param) { return CollectionUtils.isNotEmptyMap(thenCondition) && matchCondition(thenCondition, url, param, null); } private boolean matchCondition(Map<String, MatchPair> condition, URL url, URL param, Invocation invocation) { Map<String, String> sample = url.toMap(); boolean result = false; for (Map.Entry<String, MatchPair> matchPair : condition.entrySet()) { String key = matchPair.getKey(); String sampleValue; //get real invoked method name from invocation if (invocation != null && (METHOD_KEY.equals(key) || METHODS_KEY.equals(key))) { sampleValue = invocation.getMethodName(); } else if (ADDRESS_KEY.equals(key)) { sampleValue = url.getAddress(); } else if (HOST_KEY.equals(key)) { sampleValue = url.getHost(); } else { sampleValue = sample.get(key); // 為什么要獲取兩次 sample.get(key); ? if (sampleValue == null) { sampleValue = sample.get(key); } } if (sampleValue != null) { // 依次調用 MatchPair.isMatch() 方法,進行驗證 // 只要有一次驗證不通過,則當前 invocation 即不符合路由條件了 if (!matchPair.getValue().isMatch(sampleValue, param)) { return false; } else { result = true; } } else { //not pass the condition if (!matchPair.getValue().matches.isEmpty()) { return false; } else { result = true; } } } return result; } // 在 MatchPair 中實現具體的判定是否當前地址是否匹配路由信息 private boolean isMatch(String value, URL param) { // 只有相等匹配情況,直接取 matches 進行校驗即可 if (!matches.isEmpty() && mismatches.isEmpty()) { for (String match : matches) { // 簡單正則匹配檢測, 主要處理 * 規則 if (UrlUtils.isMatchGlobPattern(match, value, param)) { return true; } } return false; } // 只有不相等匹配情況, 直接取出 mismatches 校驗,反向輸出即可 if (!mismatches.isEmpty() && matches.isEmpty()) { for (String mismatch : mismatches) { if (UrlUtils.isMatchGlobPattern(mismatch, value, param)) { return false; } } return true; } // 相等和不相等兩種條件都存在時,優先使用 mismatches 進行配置,然后使用 matches 匹配,即 mismatches 優先級高於 matches if (!matches.isEmpty() && !mismatches.isEmpty()) { //when both mismatches and matches contain the same value, then using mismatches first for (String mismatch : mismatches) { if (UrlUtils.isMatchGlobPattern(mismatch, value, param)) { return false; } } for (String match : matches) { if (UrlUtils.isMatchGlobPattern(match, value, param)) { return true; } } return false; } return false; } }
2.3. 腳本路由的實現 ScriptRouter
構造方法中主要解析一些必要參數,以及根據類型獲取操作系統的腳本解析引擎,非常重要。
public ScriptRouter(URL url) { this.url = url; this.priority = url.getParameter(PRIORITY_KEY, SCRIPT_ROUTER_DEFAULT_PRIORITY); // 獲取解析引擎,根據 type=javascript 等返回 engine = getEngine(url); // 獲取 rule=xxxx, 規則 rule = getRule(url); try { // 有 GroovyScriptEngineImpl, NashornScriptEngine Compilable compilable = (Compilable) engine; function = compilable.compile(rule); } catch (ScriptException e) { logger.error("route error, rule has been ignored. rule: " + rule + ", url: " + RpcContext.getContext().getUrl(), e); } }
而實際路由的方法,也是直接調用腳本引擎進行腳本解析而得:
// org.apache.dubbo.rpc.cluster.router.script.ScriptRouter#route @Override public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException { try { // 將參數信息封裝為 Bindings, 統一傳入腳本引擎 Bindings bindings = createBindings(invokers, invocation); if (function == null) { return invokers; } // 調用腳本引擎的 function.eval() 方法,即將參數傳入規則腳本中,得到invokers // 並通過 getRoutedInvokers 將結果轉換成 List<Invoker<T>> 類型返回 return getRoutedInvokers(function.eval(bindings)); } catch (ScriptException e) { logger.error("route error, rule has been ignored. rule: " + rule + ", method:" + invocation.getMethodName() + ", url: " + RpcContext.getContext().getUrl(), e); return invokers; } } /** * create bindings for script engine */ private <T> Bindings createBindings(List<Invoker<T>> invokers, Invocation invocation) { Bindings bindings = engine.createBindings(); // create a new List of invokers bindings.put("invokers", new ArrayList<>(invokers)); bindings.put("invocation", invocation); bindings.put("context", RpcContext.getContext()); return bindings; }
上面的實現看起來還是有點抽象。我們拿出一個dubbo中的單測試樣例,看一下腳本路由的使用方式:
@Test public void testRoutePickInvokers() { // rule 寫法,即是 javascript 的語法,不過它需要調用一些java的方法,以便識別java中傳遞過來的參數以及返回結果的對接 // 該js代碼脫離了java引擎應該是不可被解析的 String rule = "var result = new java.util.ArrayList(invokers.size());" + "for (i=0;i<invokers.size(); i++){ " + // 獲取 isAvailable() 屬性進行判斷是否可將該invoker列入候選列表 "if (invokers.get(i).isAvailable()) {" + "result.add(invokers.get(i)) ;" + "}" + "} ; " + "return result;"; // 定義一個 route函數,並立即調用它,從而達到返回腳本結果的效果 String script = "function route(invokers,invocation,context){" + rule + "} route(invokers,invocation,context)"; Router router = new ScriptRouterFactory().getRouter(getRouteUrl(script)); List<Invoker<String>> invokers = new ArrayList<Invoker<String>>(); // 模型invoker 不可用 Invoker<String> invoker1 = new MockInvoker<String>(false); Invoker<String> invoker2 = new MockInvoker<String>(true); Invoker<String> invoker3 = new MockInvoker<String>(true); invokers.add(invoker1); invokers.add(invoker2); invokers.add(invoker3); List<Invoker<String>> filteredInvokers = router.route(invokers, invokers.get(0).getUrl(), new RpcInvocation()); Assertions.assertEquals(2, filteredInvokers.size()); Assertions.assertEquals(invoker2, filteredInvokers.get(0)); Assertions.assertEquals(invoker3, filteredInvokers.get(1)); }
所以,其實腳本路由可以寫得非常靈活多變,但是維護成本有點高,它不像條件路由那樣簡潔明了。需要進行反復自測試后才可配置在正式環境中。
2.4. 標簽路由 TagRouter
大概就是根據tag=xxx 選擇相應的路由地址。該router還未正式發布,不過可以看一下其大概實現:
@Override public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException { if (CollectionUtils.isEmpty(invokers)) { return invokers; } // since the rule can be changed by config center, we should copy one to use. final TagRouterRule tagRouterRuleCopy = tagRouterRule; if (tagRouterRuleCopy == null || !tagRouterRuleCopy.isValid() || !tagRouterRuleCopy.isEnabled()) { return filterUsingStaticTag(invokers, url, invocation); } List<Invoker<T>> result = invokers; // 從url中取出 dubbo.tag=xxx 值 String tag = StringUtils.isEmpty(invocation.getAttachment(TAG_KEY)) ? url.getParameter(TAG_KEY) : invocation.getAttachment(TAG_KEY); // if we are requesting for a Provider with a specific tag if (StringUtils.isNotEmpty(tag)) { List<String> addresses = tagRouterRuleCopy.getTagnameToAddresses().get(tag); // filter by dynamic tag group first if (CollectionUtils.isNotEmpty(addresses)) { result = filterInvoker(invokers, invoker -> addressMatches(invoker.getUrl(), addresses)); // if result is not null OR it's null but force=true, return result directly if (CollectionUtils.isNotEmpty(result) || tagRouterRuleCopy.isForce()) { return result; } } else { // dynamic tag group doesn't have any item about the requested app OR it's null after filtered by // dynamic tag group but force=false. check static tag result = filterInvoker(invokers, invoker -> tag.equals(invoker.getUrl().getParameter(TAG_KEY))); } // If there's no tagged providers that can match the current tagged request. force.tag is set by default // to false, which means it will invoke any providers without a tag unless it's explicitly disallowed. if (CollectionUtils.isNotEmpty(result) || isForceUseTag(invocation)) { return result; } // FAILOVER: return all Providers without any tags. else { List<Invoker<T>> tmp = filterInvoker(invokers, invoker -> addressNotMatches(invoker.getUrl(), tagRouterRuleCopy.getAddresses())); return filterInvoker(tmp, invoker -> StringUtils.isEmpty(invoker.getUrl().getParameter(TAG_KEY))); } } else { // List<String> addresses = tagRouterRule.filter(providerApp); // return all addresses in dynamic tag group. List<String> addresses = tagRouterRuleCopy.getAddresses(); if (CollectionUtils.isNotEmpty(addresses)) { result = filterInvoker(invokers, invoker -> addressNotMatches(invoker.getUrl(), addresses)); // 1. all addresses are in dynamic tag group, return empty list. if (CollectionUtils.isEmpty(result)) { return result; } // 2. if there are some addresses that are not in any dynamic tag group, continue to filter using the // static tag group. } return filterInvoker(result, invoker -> { String localTag = invoker.getUrl().getParameter(TAG_KEY); return StringUtils.isEmpty(localTag) || !tagRouterRuleCopy.getTagNames().contains(localTag); }); } }
其配置格式大致如下:
String serviceStr = "---\n" + "force: false\n" + "runtime: true\n" + "enabled: false\n" + "priority: 1\n" + "key: demo-provider\n" + "tags:\n" + " - name: tag1\n" + " addresses: [\"30.5.120.37:20881\"]\n" + " - name: tag2\n" + " addresses: [\"30.5.120.37:20880\"]\n" + "...";
2.5. AppRouter + ServiceRouter
這兩個路由服務實際上不是獨立的路由實現類,它是包裝了 ConditionRouter 的實現,來完成特殊的業務邏輯。
// org.apache.dubbo.rpc.cluster.router.condition.config.AppRouter#AppRouter public AppRouter(URL url) { // 將 application=xxx 作為路由key super(url, url.getParameter(CommonConstants.APPLICATION_KEY)); this.priority = APP_ROUTER_DEFAULT_PRIORITY; } // org.apache.dubbo.rpc.cluster.router.condition.config.ListenableRouter#ListenableRouter public ListenableRouter(URL url, String ruleKey) { super(url); this.force = false; // 初始化路由服務 this.init(ruleKey); } private synchronized void init(String ruleKey) { if (StringUtils.isEmpty(ruleKey)) { return; } // + .condition-router String routerKey = ruleKey + RULE_SUFFIX; ruleRepository.addListener(routerKey, this); String rule = ruleRepository.getRule(routerKey, DynamicConfiguration.DEFAULT_GROUP); if (StringUtils.isNotEmpty(rule)) { this.process(new ConfigChangedEvent(routerKey, DynamicConfiguration.DEFAULT_GROUP, rule)); } } @Override public synchronized void process(ConfigChangedEvent event) { if (logger.isInfoEnabled()) { logger.info("Notification of condition rule, change type is: " + event.getChangeType() + ", raw rule is:\n " + event.getContent()); } if (event.getChangeType().equals(ConfigChangeType.DELETED)) { routerRule = null; conditionRouters = Collections.emptyList(); } else { try { routerRule = ConditionRuleParser.parse(event.getContent()); generateConditions(routerRule); } catch (Exception e) { logger.error("Failed to parse the raw condition rule and it will not take effect, please check " + "if the condition rule matches with the template, the raw rule is:\n " + event.getContent(), e); } } } // 進行路由服務調用時,僅把功能委托給 conditionRouters 即可 @Override public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException { if (CollectionUtils.isEmpty(invokers) || conditionRouters.size() == 0) { return invokers; } // We will check enabled status inside each router. for (Router router : conditionRouters) { invokers = router.route(invokers, url, invocation); } return invokers; }
ServiceRouter 的實現也大致一樣,只是取的 routerKey 不同而已。
// org.apache.dubbo.rpc.cluster.router.condition.config.ServiceRouter#ServiceRouter public ServiceRouter(URL url) { // 與 AppRouter 的差別在於 routerKey 取值不同 super(url, DynamicConfiguration.getRuleKey(url)); this.priority = SERVICE_ROUTER_DEFAULT_PRIORITY; } // org.apache.dubbo.common.config.configcenter.DynamicConfiguration#getRuleKey /** * The format is '{interfaceName}:[version]:[group]' * * @return */ static String getRuleKey(URL url) { return url.getColonSeparatedKey(); } // org.apache.dubbo.common.URL#getColonSeparatedKey /** * The format is "{interface}:[version]:[group]" * * @return */ public String getColonSeparatedKey() { StringBuilder serviceNameBuilder = new StringBuilder(); serviceNameBuilder.append(this.getServiceInterface()); append(serviceNameBuilder, VERSION_KEY, false); append(serviceNameBuilder, GROUP_KEY, false); return serviceNameBuilder.toString(); }
服務路由的出發點,是為了讓用戶能夠更靈活地配置一些特殊的調用場景,如跨機房調用,或者應用一些異常情況比如某實例不希望再被調用。總之,應用場景總是有的,否則就是在玩自嗨。
了解其運行原理,讓我們更清楚,我們到底在路由什么!