dubbo路由
1.dubbo路由簡介
dubbo路由的作用是在RegistryDirectory獲取到Invoker集合后,先根據路由集合進行路由過濾,路由集合即RegistryDirectory.routers,默認是[TagRouter,MockInvokersSelector],如果使用了條件路由則是[ConditionRouter, TagRouter,MockInvokersSelector],其中ConditionRouter是條件路由,由ConditionRouterFactory創建,TagRouter是標簽路由,dubbo2.6.5新增,MockInvokersSelector是mock路由,用於mock降級。此外還有個腳本路由ScriptRouter,由ScriptRouterFactory創建。RouterFactory是個SPI擴展,可以使用它擴展新的路由規則。TagRouter和MockInvokersSelector是在RegistryDirectory創建時候通過setRouters自動增加,沒有對應的XXXRouterFactory,那么ConditionRouter在哪里創建的呢?
2.dubbo路由的創建
路由的創建在com.alibaba.dubbo.registry.integration.RegistryDirectory.notify(List<URL>)
內,該方法在dubbo consumer啟動時候調用和zk節點providers、configurators、routers發生變化時候,zk觸發consumer端執行。
@Override
public synchronized void notify(List<URL> urls) {
List<URL> invokerUrls = new ArrayList<URL>();
List<URL> routerUrls = new ArrayList<URL>();
List<URL> configuratorUrls = new ArrayList<URL>();
for (URL url : urls) {
String protocol = url.getProtocol();
String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
if (Constants.ROUTERS_CATEGORY.equals(category)
|| Constants.ROUTE_PROTOCOL.equals(protocol)) {
routerUrls.add(url);//把routers節點下的url保存到routerUrls集合
} else if (Constants.CONFIGURATORS_CATEGORY.equals(category)
|| Constants.OVERRIDE_PROTOCOL.equals(protocol)) {
configuratorUrls.add(url);
} else if (Constants.PROVIDERS_CATEGORY.equals(category)) {
invokerUrls.add(url);
} else {
logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost());
}
}
//其它省略
// routers
if (routerUrls != null && !routerUrls.isEmpty()) {
List<Router> routers = toRouters(routerUrls);//toRouters是把zk上routers節點轉換為路由集合
if (routers != null) { // null - do nothing
setRouters(routers);
}
}
//其它省略
}
接着看toRouters,代碼如下
private List<Router> toRouters(List<URL> urls) {
List<Router> routers = new ArrayList<Router>();
if (urls == null || urls.isEmpty()) {
return routers;
}
if (urls != null && !urls.isEmpty()) {
for (URL url : urls) {
if (Constants.EMPTY_PROTOCOL.equals(url.getProtocol())) {//routers節點下的url是empty協議,忽略,繼續遍歷
continue;
}
String routerType = url.getParameter(Constants.ROUTER_KEY);//獲取router url上的router節點值
if (routerType != null && routerType.length() > 0) {
url = url.setProtocol(routerType);//路由協議router://轉換為具體的協議,比如condition://協議
}
try {
Router router = routerFactory.getRouter(url);//dubbo spi機制獲取對應的路由
if (!routers.contains(router))
routers.add(router);
} catch (Throwable t) {
logger.error("convert router url to router error, url: " + url, t);
}
}
}
return routers;
}
toRouters邏輯也簡單,如果是router url是empty協議,忽略,繼續遍歷,接着獲取router url上的router節點值,然后根據spi機制獲取對應的路由對象,最后返回獲取的路由集合。router url具體例子如下
route://0.0.0.0/org.pangu.api.ProductService
category=routers
dynamic=false
enabled=false
force=false
name=zzz
priority=1000
router=condition
rule=method = findProduct => provider.host = 192.168.5.1
runtime=false
router=condition表示使用條件路由,即生成的就是ConditionRouter,條件路由具體url和解釋如下
condition://0.0.0.0/org.pangu.api.ProductService // condition://表示路由規則類型,支持條件路由規則和腳本路由規則,可擴展,必填。0.0.0.0表示對所有ip地址生效,如果想對某個ip生效,改為具體的ip。org.pangu.api.ProductService表示服務,說明只針對org.pangu.api.ProductService生效。
category=routers //動態配置類型,必填
dynamic=false //說明在zk上是之久節點,當注冊放退出,節點依然保持在zk上。默認是false,表示持久保持。必填
enabled=true //覆蓋規則是否生效,默認生效true。選填
force=false //是否強制執行,默認false。選填
priority=1000 //路由規則優先級,越大優先級越高,默認0
router=condition //路由類型,condition表示條件路由
rule=method = findProduct => provider.host = 192.168.5.1 //路由規則內容,意思是findProduct方法請求指向192.168.5.1
runtime=false //在請求時候是否執行路由過濾,默認false,不執行,對條件路由來說為true的話,影響性能
上面含義是:對於org.pangu.api.ProductService服務來說,所有的consumer端執行findProduct方法,請求都指向192.168.5.1。
toRouters獲取zk上的路由規則(通常是條件路由),接着在setRouters內又增加了MockInvokersSelector、TagRouter,因此最終RegistryDirectory的路由集合是[ConditionRouter, TagRouter,MockInvokersSelector]
3.dubbo路由的請求處理
在dubbo請求過程中,首先RegistryDirectory獲取所有的Invoker集合,接着使用路由過濾,最后使用負載均衡策略獲取一個Invoker進行調用。使用路由過濾代碼如下
//RegistryDirectory.list(Invocation)
@Override
public List<Invoker<T>> list(Invocation invocation) throws RpcException {
if (destroyed) {
throw new RpcException("Directory already destroyed .url: " + getUrl());
}
List<Invoker<T>> invokers = doList(invocation);//代碼@1 所有所有引用的invoker集合
List<Router> localRouters = this.routers; // local reference
if (localRouters != null && !localRouters.isEmpty()) {
for (Router router : localRouters) {//遍歷RegistryDirectory.routers集合,執行路由過濾
try {
if (router.getUrl() == null || router.getUrl().getParameter(Constants.RUNTIME_KEY, false)) {
//路由的url為null或者runtime=true時候才執行路由過濾
invokers = router.route(invokers, getConsumerUrl(), invocation);
}
} catch (Throwable t) {
logger.error("Failed to execute router: " + getUrl() + ", cause: " + t.getMessage(), t);
}
}
}
return invokers;
}
遍歷RegistryDirectory.routers集合,執行路由過濾,只有路由的url為null或者runtime=true時候才執行路由過濾,對於ConditionRouter來說,url上runtime=false,因此不執行;對於TagRouter來說runtime=true(因為TagRouter創建時候url上runtime=true)因此執行TagRouter。對於MockInvokersSelector來說,url是null,因此執行。
這里特別說明下,條件路由是在zk上routers節點變化時候,zk觸發通知consumer執行notify操作,從而refreshInvoker重寫刷新了consumer端持有的Invoker集合,因此在代碼@1處的doList操作,獲取到的就是經過條件路由過濾后的Invoker集合。為什么條件路由通常不在請求時候進行過濾呢?因為每次請求執行一次ConditionRouter,耗費性能,這也是為什么條件路由的runtime=false原因。
4.路由的具體實現
4.1.ConditionRouter實現
根據Url的鍵rule獲取對應的規則字符串,以=>為界,把規則分成兩段,前面為whenRule消費者匹配條件,后面為thenRule是提供者地址列表的過濾條件。具體是根據正則規則進行匹配,有點麻煩,就不分析記錄。
4.2.TagRouter實現
tag路由是dubbo2.6.6新增的功能,功能又簡單實用,常用於流量隔離,可用於灰度、藍綠。具體使用方法是provider端新增dubbo.provider.tag=xxx,針對全局生效。或者針對具體服務@Service(tag=xxx)。consumer端使用,要RpcContext.getContext().setAttachment("dubbo.tag", "xxx");
,這樣就可以實現流量隔離(比如套多測試環境),訪問指定的tag服務。TagRouter具體路由代碼如下:
//com.alibaba.dubbo.rpc.cluster.router.tag.TagRouter.route(List<Invoker<T>>, URL, Invocation)
@Override
public <T> List<Invoker<T>> route(List<Invoker<T>> invokers, URL url, Invocation invocation) throws RpcException {
// filter
List<Invoker<T>> result = new ArrayList<Invoker<T>>();
// Dynamic param
String tag = RpcContext.getContext().getAttachment(Constants.TAG_KEY);//獲取tag參數
// Tag request
if (!StringUtils.isEmpty(tag)) {
// Select tag invokers first
for (Invoker<T> invoker : invokers) {
if (tag.equals(invoker.getUrl().getParameter(Constants.TAG_KEY))) {
result.add(invoker);//保存tag相同的Invoker
}
}
}
// If Constants.REQUEST_TAG_KEY unspecified or no invoker be selected, downgrade to normal invokers
if (result.isEmpty()) {
// Only forceTag = true force match, otherwise downgrade
String forceTag = RpcContext.getContext().getAttachment(Constants.FORCE_USE_TAG);//dubbo.force.tag,tag降級
if (StringUtils.isEmpty(forceTag) || "false".equals(forceTag)) {
for (Invoker<T> invoker : invokers) {
if (StringUtils.isEmpty(invoker.getUrl().getParameter(Constants.TAG_KEY))) {//獲取provider端沒有設置tag的Invoker
result.add(invoker);
}
}
}
}
return result;
}
具體邏輯如下:
獲取隱式參數dubbo.tag的值,和Invoker集合的tag相同,則把匹配的Invoker集合作為tag過濾結果返回。
如果consuemr調用沒有隱式參數dubbo.tag,獲取consumer請求的dubbo.force.tag=true,則結果集合是空。dubbo.force.tag=false,則獲取provider端沒有設置tag的Invoker作為tag過濾結果,否則如果provider端也都設置了tag,那么就無法獲取到Invoker。
tag路由的兩個問題:
1.寫着有點麻煩,每次調用要顯示的RpcContext.getContext().setAttachment("dubbo.tag", "xxx");
,才行,那么有沒有辦法可以只是設置下配置就可以實現呢?
2.在consumer一個方法內多處請求provider,第一次請求consumer 端的 dubbo.tag 通過 dubbo 的 attachment 攜帶給 provider 端,但是請求結束就被ConsumerContextFilter清空了attachment ,因此第二次開始就沒有了dubbo.tag攜帶,這個問題有沒方便辦法解決?
這個看下篇《 dubbo tag路由擴展》
4.3.MockInvokersSelector實現
mock路由是在請求有隱式參數invocation.need.mock=ture的情況下生效,獲取mock協議的Invoker。用於服務降級。