dubbo系列六、dubbo路由規則以及tag路由


dubbo路由

1.dubbo路由簡介

dubbo路由的作用是在RegistryDirectory獲取到Invoker集合后,先根據路由集合進行路由過濾,路由集合即RegistryDirectory.routers,默認是[TagRouter,MockInvokersSelector],如果使用了條件路由則是[ConditionRouter, TagRouter,MockInvokersSelector],其中ConditionRouter是條件路由,由ConditionRouterFactory創建,TagRouter是標簽路由,dubbo2.6.5新增,MockInvokersSelector是mock路由,用於mock降級。此外還有個腳本路由ScriptRouter,由ScriptRouterFactory創建。RouterFactory是個SPI擴展,可以使用它擴展新的路由規則。TagRouter和MockInvokersSelector是在RegistryDirectory創建時候通過setRouters自動增加,沒有對應的XXXRouterFactory,那么ConditionRouter在哪里創建的呢?

2.dubbo路由的創建

路由的創建在com.alibaba.dubbo.registry.integration.RegistryDirectory.notify(List<URL>)內,該方法在dubbo consumer啟動時候調用和zk節點providers、configurators、routers發生變化時候,zk觸發consumer端執行。

@Override
public synchronized void notify(List<URL> urls) {
    List<URL> invokerUrls = new ArrayList<URL>();
    List<URL> routerUrls = new ArrayList<URL>();
    List<URL> configuratorUrls = new ArrayList<URL>();
    for (URL url : urls) {
        String protocol = url.getProtocol();
        String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
        if (Constants.ROUTERS_CATEGORY.equals(category)
            || Constants.ROUTE_PROTOCOL.equals(protocol)) {
            routerUrls.add(url);//把routers節點下的url保存到routerUrls集合
        } else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
                   || Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
            configuratorUrls.add(url);
        } else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
            invokerUrls.add(url);
        } else {
            logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
        }
    }
    //其它省略
    // routers
    if (routerUrls != null && !routerUrls.isEmpty()) {
        List<Router> routers = toRouters(routerUrls);//toRouters是把zk上routers節點轉換為路由集合
        if (routers != null) { // null - do nothing
            setRouters(routers);
        }
    }
    //其它省略
}

接着看toRouters,代碼如下

private List<Router> toRouters(List<URL> urls) {
    List<Router> routers = new ArrayList<Router>();
    if (urls == null || urls.isEmpty()) {
        return routers;
    }
    if (urls != null && !urls.isEmpty()) {
        for (URL url : urls) {
            if (Constants.EMPTY_PROTOCOL.equals(url.getProtocol())) {//routers節點下的url是empty協議,忽略,繼續遍歷
                continue;
            }
            String routerType = url.getParameter(Constants.ROUTER_KEY);//獲取router url上的router節點值
            if (routerType != null && routerType.length() > 0) {
                url = url.setProtocol(routerType);//路由協議router://轉換為具體的協議,比如condition://協議
            }
            try {
                Router router = routerFactory.getRouter(url);//dubbo spi機制獲取對應的路由
                if (!routers.contains(router))
                    routers.add(router);
            } catch (Throwable t) {
                logger.error("convert router url to router error, url: " + url, t);
            }
        }
    }
    return routers;
}

toRouters邏輯也簡單,如果是router url是empty協議,忽略,繼續遍歷,接着獲取router url上的router節點值,然后根據spi機制獲取對應的路由對象,最后返回獲取的路由集合。router url具體例子如下

route://0.0.0.0/org.pangu.api.ProductService
category=routers
dynamic=false
enabled=false
force=false
name=zzz
priority=1000
router=condition
rule=method = findProduct => provider.host = 192.168.5.1
runtime=false

router=condition表示使用條件路由,即生成的就是ConditionRouter,條件路由具體url和解釋如下

condition://0.0.0.0/org.pangu.api.ProductService    // condition://表示路由規則類型,支持條件路由規則和腳本路由規則,可擴展,必填。0.0.0.0表示對所有ip地址生效,如果想對某個ip生效,改為具體的ip。org.pangu.api.ProductService表示服務,說明只針對org.pangu.api.ProductService生效。
category=routers				//動態配置類型,必填
dynamic=false					//說明在zk上是之久節點,當注冊放退出,節點依然保持在zk上。默認是false,表示持久保持。必填
enabled=true					//覆蓋規則是否生效,默認生效true。選填
force=false						//是否強制執行,默認false。選填
priority=1000					//路由規則優先級,越大優先級越高,默認0
router=condition				//路由類型,condition表示條件路由
rule=method = findProduct => provider.host = 192.168.5.1	//路由規則內容,意思是findProduct方法請求指向192.168.5.1
runtime=false					//在請求時候是否執行路由過濾,默認false,不執行,對條件路由來說為true的話,影響性能

上面含義是:對於org.pangu.api.ProductService服務來說,所有的consumer端執行findProduct方法,請求都指向192.168.5.1。

toRouters獲取zk上的路由規則(通常是條件路由),接着在setRouters內又增加了MockInvokersSelector、TagRouter,因此最終RegistryDirectory的路由集合是[ConditionRouter, TagRouter,MockInvokersSelector]

3.dubbo路由的請求處理

在dubbo請求過程中,首先RegistryDirectory獲取所有的Invoker集合,接着使用路由過濾,最后使用負載均衡策略獲取一個Invoker進行調用。使用路由過濾代碼如下

//RegistryDirectory.list(Invocation)
@Override
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
    if (destroyed) {
        throw new RpcException("Directory already destroyed .url: " + getUrl());
    }
    List<Invoker<T>> invokers = doList(invocation);//代碼@1 所有所有引用的invoker集合
    List<Router> localRouters = this.routers; // local reference
    if (localRouters != null && !localRouters.isEmpty()) {
        for (Router router : localRouters) {//遍歷RegistryDirectory.routers集合,執行路由過濾
            try {
                if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
                    //路由的url為null或者runtime=true時候才執行路由過濾
                    invokers = router.route(invokers, getConsumerUrl(), invocation);
                }
            } catch (Throwable t) {
                logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
            }
        }
    }
    return invokers;
}

遍歷RegistryDirectory.routers集合,執行路由過濾,只有路由的url為null或者runtime=true時候才執行路由過濾,對於ConditionRouter來說,url上runtime=false,因此不執行;對於TagRouter來說runtime=true(因為TagRouter創建時候url上runtime=true)因此執行TagRouter。對於MockInvokersSelector來說,url是null,因此執行。

這里特別說明下,條件路由是在zk上routers節點變化時候,zk觸發通知consumer執行notify操作,從而refreshInvoker重寫刷新了consumer端持有的Invoker集合,因此在代碼@1處的doList操作,獲取到的就是經過條件路由過濾后的Invoker集合。為什么條件路由通常不在請求時候進行過濾呢?因為每次請求執行一次ConditionRouter,耗費性能,這也是為什么條件路由的runtime=false原因。

4.路由的具體實現

4.1.ConditionRouter實現

根據Url的鍵rule獲取對應的規則字符串,以=>為界,把規則分成兩段,前面為whenRule消費者匹配條件,后面為thenRule是提供者地址列表的過濾條件。具體是根據正則規則進行匹配,有點麻煩,就不分析記錄。

4.2.TagRouter實現

tag路由是dubbo2.6.6新增的功能,功能又簡單實用,常用於流量隔離,可用於灰度、藍綠。具體使用方法是provider端新增dubbo.provider.tag=xxx,針對全局生效。或者針對具體服務@Service(tag=xxx)。consumer端使用,要RpcContext.getContext().setAttachment("dubbo.tag", "xxx");,這樣就可以實現流量隔離(比如套多測試環境),訪問指定的tag服務。TagRouter具體路由代碼如下:

//com.alibaba.dubbo.rpc.cluster.router.tag.TagRouter.route(List<Invoker<T>>, URL, Invocation)
@Override
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
    // filter
    List<Invoker<T>> result = new ArrayList<Invoker<T>>();
    // Dynamic param
    String tag = RpcContext.getContext().getAttachment(Constants.TAG_KEY);//獲取tag參數
    // Tag request
    if (!StringUtils.isEmpty(tag)) {
        // Select tag invokers first
        for (Invoker<T> invoker : invokers) {
            if (tag.equals(invoker.getUrl().getParameter(Constants.TAG_KEY))) {
                result.add(invoker);//保存tag相同的Invoker
            }
        }
    }
    // If Constants.REQUEST_TAG_KEY unspecified or no invoker be selected, downgrade to normal invokers
    if (result.isEmpty()) {
        // Only forceTag = true force match, otherwise downgrade
        String forceTag = RpcContext.getContext().getAttachment(Constants.FORCE_USE_TAG);//dubbo.force.tag,tag降級
        if (StringUtils.isEmpty(forceTag) || "false".equals(forceTag)) {
            for (Invoker<T> invoker : invokers) {
                if (StringUtils.isEmpty(invoker.getUrl().getParameter(Constants.TAG_KEY))) {//獲取provider端沒有設置tag的Invoker
                    result.add(invoker);
                }
            }
        }
    }
    return result;
}

具體邏輯如下:

獲取隱式參數dubbo.tag的值,和Invoker集合的tag相同,則把匹配的Invoker集合作為tag過濾結果返回。

如果consuemr調用沒有隱式參數dubbo.tag,獲取consumer請求的dubbo.force.tag=true,則結果集合是空。dubbo.force.tag=false,則獲取provider端沒有設置tag的Invoker作為tag過濾結果,否則如果provider端也都設置了tag,那么就無法獲取到Invoker。

tag路由的兩個問題:

1.寫着有點麻煩,每次調用要顯示的RpcContext.getContext().setAttachment("dubbo.tag", "xxx");,才行,那么有沒有辦法可以只是設置下配置就可以實現呢?

2.在consumer一個方法內多處請求provider,第一次請求consumer 端的 dubbo.tag 通過 dubbo 的 attachment 攜帶給 provider 端,但是請求結束就被ConsumerContextFilter清空了attachment ,因此第二次開始就沒有了dubbo.tag攜帶,這個問題有沒方便辦法解決?

這個看下篇《 dubbo tag路由擴展》

4.3.MockInvokersSelector實現

mock路由是在請求有隱式參數invocation.need.mock=ture的情況下生效,獲取mock協議的Invoker。用於服務降級。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM