首先還是Spring碰到dubbo的標簽之后,會使用parseCustomElement解析dubbo標簽,使用的解析器是dubbo的DubboBeanDefinitionParser,解析完成之后返回BeanDefinition給Spring管理。
服務消費者端對應的是ReferenceBean,實現了ApplicationContextAware接口,Spring會在Bean的實例化那一步回調setApplicationContext方法。也實現了InitializingBean接口,接着會回調afterPropertySet方法。還實現了FactoryBean接口,實現FactoryBean可以在后期獲取bean的時候做一些操作,dubbo在這個時候做初始化。另外ReferenceBean還實現了DisposableBean,會在bean銷毀的時候調用destory方法。
消費者的初始化是在ReferenceBean的init方法中執行,分為兩種情況:
reference標簽中沒有配置init屬性,此時是延遲初始化的,也就是只有等到bean引用被注入到其他Bean中,或者調用getBean獲取這個Bean的時候,才會初始化。比如在這里的例子里reference沒有配置init屬性,只有等到HelloService helloService = (HelloService) applicationContext.getBean("helloService");
這句getBean的時候,才會開始調用init方法進行初始化。
另外一種情況是立即初始化,即是如果reference標簽中init屬性配置為true,會立即進行初始化(也就是上面說到的實現了FactoryBean接口)。
初始化開始
這里以沒有配置init的reference為例,只要不注入bean或者不調用getBean獲取bean的時候,就不會被初始化。HelloService helloService = (HelloService) applicationContext.getBean("helloService");
另外在ReferenceBean這個類在Spring中初始化的時候,有幾個靜態變量會被初始化:
1 2 3 4 5
private static final Protocol refprotocol = ExtensionLoader.getExtensionLoader(Protocol.class).getAdaptiveExtension(); private static final Cluster cluster = ExtensionLoader.getExtensionLoader(Cluster.class).getAdaptiveExtension(); private static final ProxyFactory proxyFactory = ExtensionLoader.getExtensionLoader(ProxyFactory.class).getAdaptiveExtension();
這幾個變量的初始化是根據dubbo的SPI擴展機制動態生成的代碼:
refprotocol:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37
import com.alibaba.dubbo.common.extension.ExtensionLoader; public class Protocol$Adpative implements com.alibaba.dubbo.rpc.Protocol { public com.alibaba.dubbo.rpc.Invoker refer(java.lang.Class arg0, com.alibaba.dubbo.common.URL arg1) throws java.lang.Class { if (arg1 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg1; String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() ); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); return extension.refer(arg0, arg1); } public com.alibaba.dubbo.rpc.Exporter export(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker { if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl(); String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() ); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.Protocol) name from url(" + url.toString() + ") use keys([protocol])"); com.alibaba.dubbo.rpc.Protocol extension = (com.alibaba.dubbo.rpc.Protocol)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.Protocol.class).getExtension(extName); return extension.export(arg0); } public void destroy() { throw new UnsupportedOperationException("method public abstract void com.alibaba.dubbo.rpc.Protocol.destroy() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!"); } public int getDefaultPort() { throw new UnsupportedOperationException("method public abstract int com.alibaba.dubbo.rpc.Protocol.getDefaultPort() of interface com.alibaba.dubbo.rpc.Protocol is not adaptive method!"); } }
cluster:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
import com.alibaba.dubbo.common.extension.ExtensionLoader; public class Cluster$Adpative implements com.alibaba.dubbo.rpc.cluster.Cluster { public com.alibaba.dubbo.rpc.Invoker join(com.alibaba.dubbo.rpc.cluster.Directory arg0) throws com.alibaba.dubbo.rpc.cluster.Directory { if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.cluster.Directory argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl(); String extName = url.getParameter("cluster", "failover"); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.cluster.Cluster) name from url(" + url.toString() + ") use keys([cluster])"); com.alibaba.dubbo.rpc.cluster.Cluster extension = (com.alibaba.dubbo.rpc.cluster.Cluster)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.cluster.Cluster.class).getExtension(extName); return extension.join(arg0); } }
proxyFactory:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
import com.alibaba.dubbo.common.extension.ExtensionLoader; public class ProxyFactory$Adpative implements com.alibaba.dubbo.rpc.ProxyFactory { public java.lang.Object getProxy(com.alibaba.dubbo.rpc.Invoker arg0) throws com.alibaba.dubbo.rpc.Invoker { if (arg0 == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument == null"); if (arg0.getUrl() == null) throw new IllegalArgumentException("com.alibaba.dubbo.rpc.Invoker argument getUrl() == null");com.alibaba.dubbo.common.URL url = arg0.getUrl(); String extName = url.getParameter("proxy", "javassist"); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])"); com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName); return extension.getProxy(arg0); } public com.alibaba.dubbo.rpc.Invoker getInvoker(java.lang.Object arg0, java.lang.Class arg1, com.alibaba.dubbo.common.URL arg2) throws java.lang.Object { if (arg2 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg2; String extName = url.getParameter("proxy", "javassist"); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.rpc.ProxyFactory) name from url(" + url.toString() + ") use keys([proxy])"); com.alibaba.dubbo.rpc.ProxyFactory extension = (com.alibaba.dubbo.rpc.ProxyFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.rpc.ProxyFactory.class).getExtension(extName); return extension.getInvoker(arg0, arg1, arg2); } }
初始化入口
初始化的入口在ReferenceConfig的get()方法:
1 2 3 4 5 6 7 8 9
public synchronized T get() { if (destroyed){ throw new IllegalStateException("Already destroyed!"); } if (ref == null) { init(); } return ref; }
init()方法會先檢查初始化所有的配置信息,然后調用ref = createProxy(map);
創建代理,消費者最終得到的是服務的代理。初始化主要做的事情就是引用對應的遠程服務,大概的步驟:
監聽注冊中心
連接服務提供者端進行服務引用
創建服務代理並返回
文檔上關於Zookeeper作為注冊中心時,服務消費者啟動時要做的事情有:
訂閱/dubbo/com.foo.BarService/providers目錄下的提供者URL地址。 並向/dubbo/com.foo.BarService/consumers目錄下寫入自己的URL地址。
createProxy方法
init()中createProxy方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
private T createProxy(Map<String, String> map) { //先判斷是否是本地服務引用injvm //判斷是否是點對點直連 //判斷是否是通過注冊中心連接 //然后是服務的引用 //這里url為 //registry://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService? //application=dubbo-consumer&dubbo=2.5.3&pid=12272& //refer=application%3Ddubbo-consumer%26dubbo%3D2.5.3%26 //interface%3Ddubbo.common.hello.service.HelloService%26 //methods%3DsayHello%26pid%3D12272%26side%3D //consumer%26timeout%3D100000%26timestamp%3D1489318676447& //registry=zookeeper×tamp=1489318676641 //引用遠程服務由Protocol的實現來處理 refprotocol.refer(interfaceClass, url); //最后返回服務代理 return (T) proxyFactory.getProxy(invoker); }
這里refprotocol是上面生成的代碼,會根據協議不同選擇不同的Protocol協議。
引用遠程服務
對於服務引用refprotocol.refer(interfaceClass, url)
會首先進入ProtocolListenerWrapper的refer方法,然后在進入ProtocolFilterWrapper的refer方法,然后再進入RegistryProtocol的refer方法,這里的url協議是registry,所以上面兩個Wrapper中不做處理,直接進入了RegistryProtocol,看下RegistryProtocol中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { //這里獲得的url是 //zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService? //application=dubbo-consumer&dubbo=2.5.3&pid=12272& //refer=application%3Ddubbo-consumer%26dubbo%3D2.5.3%26 //interface%3Ddubbo.common.hello.service.HelloService%26 //methods%3DsayHello%26pid%3D12272%26side%3D //consumer%26timeout%3D100000%26 //timestamp%3D1489318676447×tamp=1489318676641 url = url.setProtocol(url.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_REGISTRY)).removeParameter(Constants.REGISTRY_KEY); //根據url獲取Registry對象 //先連接注冊中心,把消費者注冊到注冊中心 Registry registry = registryFactory.getRegistry(url); //判斷引用是否是注冊中心RegistryService,如果是直接返回剛得到的注冊中心服務 if (RegistryService.class.equals(type)) { return proxyFactory.getInvoker((T) registry, type, url); } //以下是普通服務,需要進入注冊中心和集群下面的邏輯 // group="a,b" or group="*" //獲取ref的各種屬性 Map<String, String> qs = StringUtils.parseQueryString(url.getParameterAndDecoded(Constants.REFER_KEY)); //獲取分組屬性 String group = qs.get(Constants.GROUP_KEY); //先判斷引用服務是否需要合並不同實現的返回結果 if (group != null && group.length() > 0 ) { if ( ( Constants.COMMA_SPLIT_PATTERN.split( group ) ).length > 1 || "*".equals( group ) ) { //使用默認的分組聚合集群策略 return doRefer( getMergeableCluster(), registry, type, url ); } } //選擇配置的集群策略(cluster="failback")或者默認策略 return doRefer(cluster, registry, type, url); }
獲取注冊中心
連接注冊中心Registry registry = registryFactory.getRegistry(url);
首先會到AbstractRegistryFactory的getRegistry方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
public Registry getRegistry(URL url) { //這里url是 //zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService? //application=dubbo-consumer&dubbo=2.5.3& //interface=com.alibaba.dubbo.registry.RegistryService& //pid=12272×tamp=1489318676641 url = url.setPath(RegistryService.class.getName()) .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()) .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY); //這里key是 //zookeeper://127.0.0.1:2181/com.alibaba.dubbo.registry.RegistryService String key = url.toServiceString(); // 鎖定注冊中心獲取過程,保證注冊中心單一實例 LOCK.lock(); try { Registry registry = REGISTRIES.get(key); if (registry != null) { return registry; } //這里用的是ZookeeperRegistryFactory //返回的Registry中封裝了已經連接到Zookeeper的zkClient實例 registry = createRegistry(url); if (registry == null) { throw new IllegalStateException("Can not create registry " + url); } //放到緩存中 REGISTRIES.put(key, registry); return registry; } finally { // 釋放鎖 LOCK.unlock(); } }
ZookeeperRegistryFactory的createRegistry方法:
1 2 3 4 5
public Registry createRegistry(URL url) { //直接返回一個新的ZookeeperRegistry實例 //這里的zookeeperTransporter代碼在下面,動態生成的適配類 return new ZookeeperRegistry(url, zookeeperTransporter); }
zookeeperTransporter代碼:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
package com.alibaba.dubbo.remoting.zookeeper; import com.alibaba.dubbo.common.extension.ExtensionLoader; public class ZookeeperTransporter$Adpative implements com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter { public com.alibaba.dubbo.remoting.zookeeper.ZookeeperClient connect(com.alibaba.dubbo.common.URL arg0) { if (arg0 == null) throw new IllegalArgumentException("url == null"); com.alibaba.dubbo.common.URL url = arg0; String extName = url.getParameter("client", url.getParameter("transporter", "zkclient")); if(extName == null) throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter) name from url(" + url.toString() + ") use keys([client, transporter])"); com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter extension = (com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter.class).getExtension(extName); return extension.connect(arg0); } }
上面代碼中可以看到,如果我們沒有指定Zookeeper的client屬性,默認使用zkClient,所以上面的zookeeperTransporter是ZkclientZookeeperTransporter。
繼續看new ZookeeperRegistry(url, zookeeperTransporter);
:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30
public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { //這里會先經過AbstractRegistry的處理,然后經過FailbackRegistry的處理(解釋在下面) super(url); if (url.isAnyHost()) { throw new IllegalStateException("registry address == null"); } //服務分組,默認dubbo String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT); if (! group.startsWith(Constants.PATH_SEPARATOR)) { group = Constants.PATH_SEPARATOR + group; } //注冊中心的節點 this.root = group; //ZkclientZookeeperTransporter的connect方法 //直接返回一個ZkclientZookeeperClient實例 //具體的步驟是,new一個ZkClient實例,然后訂閱了一個狀態變化的監聽器 zkClient = zookeeperTransporter.connect(url); //添加一個狀態改變的監聽器 zkClient.addStateListener(new StateListener() { public void stateChanged(int state) { if (state == RECONNECTED) { try { recover(); } catch (Exception e) { logger.error(e.getMessage(), e); } } } }); }
AbstractRegistry的處理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
public AbstractRegistry(URL url) { //設置registryUrl setUrl(url); // 啟動文件保存定時器 syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false); //會先去用戶主目錄下的.dubbo目錄下加載緩存注冊中心的緩存文件比如:dubbo-registry-127.0.0.1.cache String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getHost() + ".cache"); File file = null; if (ConfigUtils.isNotEmpty(filename)) { file = new File(filename); if(! file.exists() && file.getParentFile() != null && ! file.getParentFile().exists()){ if(! file.getParentFile().mkdirs()){ throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!"); } } } this.file = file; //緩存文件存在的話就把文件讀進內存中 loadProperties(); //先獲取backup url //然后通知訂閱 notify(url.getBackupUrls()); }
notify方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
protected void notify(List<URL> urls) { if(urls == null || urls.isEmpty()) return; //getSubscribed()方法獲取訂閱者列表 for (Map.Entry<URL, Set<NotifyListener>> entry : getSubscribed().entrySet()) { URL url = entry.getKey(); if(! UrlUtils.isMatch(url, urls.get(0))) { continue; } Set<NotifyListener> listeners = entry.getValue(); if (listeners != null) { for (NotifyListener listener : listeners) { try { //通知每個監聽器 notify(url, listener, filterEmpty(url, urls)); } catch (Throwable t) { } } } } }
notify(url, listener, filterEmpty(url, urls));
代碼:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
protected void notify(URL url, NotifyListener listener, List<URL> urls) { Map<String, List<URL>> result = new HashMap<String, List<URL>>(); for (URL u : urls) { if (UrlUtils.isMatch(url, u)) { //分類 String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); List<URL> categoryList = result.get(category); if (categoryList == null) { categoryList = new ArrayList<URL>(); result.put(category, categoryList); } categoryList.add(u); } } if (result.size() == 0) { return; } Map<String, List<URL>> categoryNotified = notified.get(url); if (categoryNotified == null) { notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>()); categoryNotified = notified.get(url); } for (Map.Entry<String, List<URL>> entry : result.entrySet()) { String category = entry.getKey(); List<URL> categoryList = entry.getValue(); categoryNotified.put(category, categoryList); saveProperties(url); //通知 listener.notify(categoryList); } }
AbstractRegistry構造完,接着是FailbackRegistry的處理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
public FailbackRegistry(URL url) { super(url); int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD); //啟動失敗重試定時器 this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() { public void run() { // 檢測並連接注冊中心 try { //重試方法由每個具體子類實現 //獲取到注冊失敗的,然后嘗試注冊 retry(); } catch (Throwable t) { // 防御性容錯 } } }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS); }
這里會啟動一個新的定時線程,主要是有連接失敗的話,會進行重試連接retry();,啟動完之后返回ZookeeperRegistry中繼續處理。接下來的處理在代碼注釋中,不再詳細寫,看下一步服務的引用。
引用遠程服務
繼續看ref方法中最后一步,服務的引用,返回的是一個Invoker,return doRefer(cluster, registry, type, url);
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35
private <T> Invoker<T> doRefer(Cluster cluster, Registry registry, Class<T> type, URL url) { //初始化Directory //組裝Directory,可以看成一個消費端的List,可以隨着注冊中心的消息推送而動態的變化服務的Invoker //封裝了所有服務真正引用邏輯,覆蓋配置,路由規則等邏輯 //初始化時只需要向注冊中心發起訂閱請求,其他邏輯均是異步處理,包括服務的引用等 //緩存接口所有的提供者端Invoker以及注冊中心接口相關的配置等 RegistryDirectory<T> directory = new RegistryDirectory<T>(type, url); directory.setRegistry(registry); directory.setProtocol(protocol); //此處的subscribeUrl為 //consumer://192.168.1.100/dubbo.common.hello.service.HelloService? //application=dubbo-consumer&dubbo=2.5.3& //interface=dubbo.common.hello.service.HelloService& //methods=sayHello&pid=16409& //side=consumer&timeout=100000×tamp=1489322133987 URL subscribeUrl = new URL(Constants.CONSUMER_PROTOCOL, NetUtils.getLocalHost(), 0, type.getName(), directory.getUrl().getParameters()); if (! Constants.ANY_VALUE.equals(url.getServiceInterface()) && url.getParameter(Constants.REGISTER_KEY, true)) { //到注冊中心注冊服務 //此處regist是上面一步獲得的registry,即是ZookeeperRegistry,包含zkClient的實例 //會先經過AbstractRegistry的處理,然后經過FailbackRegistry的處理(解析在下面) registry.register(subscribeUrl.addParameters(Constants.CATEGORY_KEY, Constants.CONSUMERS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false))); } //訂閱服務 //有服務提供的時候,注冊中心會推送服務消息給消費者,消費者再進行服務的引用。 directory.subscribe(subscribeUrl.addParameter(Constants.CATEGORY_KEY, Constants.PROVIDERS_CATEGORY + "," + Constants.CONFIGURATORS_CATEGORY + "," + Constants.ROUTERS_CATEGORY)); //服務的引用與變更全部由Directory異步完成 //集群策略會將Directory偽裝成一個Invoker返回 //合並所有相同的invoker return cluster.join(directory); }
注冊中心接收到消費者發送的訂閱請求后,會根據提供者注冊服務的列表,推送服務消息給消費者。消費者端接收到注冊中心發來的提供者列表后,進行服務的引用。觸發Directory監聽器的可以是訂閱請求,覆蓋策略消息,路由策略消息。
AbstractRegistry的register方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15
public void register(URL url) { //此時url是 //consumer://192.168.1.100/dubbo.common.hello.service.HelloService? //application=dubbo-consumer& //category=consumers&check=false&dubbo=2.5.3& //interface=dubbo.common.hello.service.HelloService&methods=sayHello //&pid=16409&side=consumer&timeout=100000×tamp=1489322133987 if (url == null) { throw new IllegalArgumentException("register url == null"); } if (logger.isInfoEnabled()){ logger.info("Register: " + url); } registered.add(url); }
上面只是把url添加到registered這個set中。
接着看FailbackRegistry的register方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
public void register(URL url) { super.register(url); failedRegistered.remove(url); failedUnregistered.remove(url); try { // 向服務器端發送注冊請求 //這里調用的是ZookeeperRegistry中的doRegister方法 doRegister(url); } catch (Exception e) { Throwable t = e; // 如果開啟了啟動時檢測,則直接拋出異常 boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) && url.getParameter(Constants.CHECK_KEY, true) && ! Constants.CONSUMER_PROTOCOL.equals(url.getProtocol()); boolean skipFailback = t instanceof SkipFailbackWrapperException; if (check || skipFailback) { if(skipFailback) { t = t.getCause(); } throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); } else { logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); } // 將失敗的注冊請求記錄到失敗列表,定時重試 failedRegistered.add(url); } }
接着看下doRegister(url);方法,向服務器端發送注冊請求,在ZookeeperRegistry中:
1 2 3 4 5 6 7 8
protected void doRegister(URL url) { try { //直接調用create,在AbstractZookeeperClient類中 zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); } catch (Throwable e) { throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
zkClient.create()方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31
//path為 ///dubbo/dubbo.common.hello.service.HelloService/consumers/ //consumer%3A%2F%2F192.168.1.100%2F //dubbo.common.hello.service.HelloService%3Fapplication%3D //dubbo-consumer%26category%3Dconsumers%26check%3Dfalse%26 //dubbo%3D2.5.3%26interface%3D //dubbo.common.hello.service.HelloService%26 //methods%3DsayHello%26pid%3D28819%26 //side%3Dconsumer%26timeout%3D100000%26timestamp%3D1489332839677 public void create(String path, boolean ephemeral) { int i = path.lastIndexOf('/'); if (i > 0) { create(path.substring(0, i), false); } //循環完得到的path為/dubbo //dynamic=false 表示該數據為持久數據,當注冊方退出時,數據依然保存在注冊中心 if (ephemeral) { //創建臨時的節點 createEphemeral(path); } else { //創建持久的節點,/dubbo/dubbo.common.hello.service.HelloService/consumers/ //consumer%3A%2F%2F192.168.110.197%2F //dubbo.common.hello.service.HelloService%3Fapplication%3Ddubbo-consumer%26 //category%3Dconsumers%26check%3Dfalse%26 //dubbo%3D2.5.3%26interface%3D //dubbo.common.hello.service.HelloService%26 //methods%3DsayHello%26pid%3D6370%26side%3D //consumer%26timeout%3D100000%26timestamp%3D1489367959659 createPersistent(path); } }
經過上面create之后,Zookeeper中就存在了消費者需要訂閱的服務的節點:
1 2 3 4 5 6 7 8 9 10 11 12 13
/dubbo /dubbo.common.hello.service.HelloService /consumers /http://0.0.0.0:4550/?path=dubbo%2F dubbo.common.hello.service.HelloService%2F consumers%2Fconsumer%253A%252F%252F192.168.110.197%252F dubbo.common.hello.service.HelloService%253F application%253Ddubbo-consumer%2526category%253D consumers%2526check%253Dfalse%2526 dubbo%253D2.5.3%2526interface%253D dubbo.common.hello.service.HelloService%2526 methods%253DsayHello%2526pid%253D22392%2526side%253D consumer%2526timeout%253D100000%2526timestamp%253D1490063394184
消費者自己注冊到注冊中心之后,接着是訂閱服務提供者,directory.subscribe():
1 2 3 4 5 6
public void subscribe(URL url) { //設置消費者url setConsumerUrl(url); //這里的registry是ZookeeperRegistry registry.subscribe(url, this); }
看下registry.subscribe(url, this);,這里registry是ZookeeperRegistry,會先經過AbstractRegistry的處理,然后是FailbackRegistry的處理。
在AbstractRegistry中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14
//此時url為consumer://192.168.1.100/dubbo.common.hello.service.HelloService?application=dubbo-consumer& //category=providers,configurators,routers&dubbo=2.5.3&interface=dubbo.common.hello.service.HelloService&methods= //sayHello&pid=28819&side=consumer&timeout=100000×tamp=1489332839677 public void subscribe(URL url, NotifyListener listener) { //先根據url獲取已注冊的監聽器 Set<NotifyListener> listeners = subscribed.get(url); //沒有監聽器,就創建,並添加進去 if (listeners == null) { subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>()); listeners = subscribed.get(url); } //有監聽器,直接把當前RegistryDirectory添加進去 listeners.add(listener); }
然后是FailbackRegistry中:
1 2 3 4 5 6 7 8
public void subscribe(URL url, NotifyListener listener) { super.subscribe(url, listener); removeFailedSubscribed(url, listener); try { // 向服務器端發送訂閱請求 doSubscribe(url, listener); } catch (Exception e) {...} }
繼續看doSubscribe(url, listener);向服務端發送訂閱請求,在ZookeeperRegistry中:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
protected void doSubscribe(final URL url, final NotifyListener listener) { try { if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {... } else { List<URL> urls = new ArrayList<URL>(); for (String path : toCategoriesPath(url)) { ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url); if (listeners == null) { zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>()); listeners = zkListeners.get(url); } //將zkClient的事件IZkChildListener轉換到registry事件NotifyListener ChildListener zkListener = listeners.get(listener); if (zkListener == null) { listeners.putIfAbsent(listener, new ChildListener() { public void childChanged(String parentPath, List<String> currentChilds) { ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds)); } }); zkListener = listeners.get(listener); } //創建三個節點 // /dubbo/dubbo.common.hello.service.HelloService/providers/ // /dubbo/dubbo.common.hello.service.HelloService/configurators/ // /dubbo/dubbo.common.hello.service.HelloService/routers/ //上面三個路徑會被消費者端監聽,當提供者,配置,路由發生變化之后, //注冊中心會通知消費者刷新本地緩存。 zkClient.create(path, false); List<String> children = zkClient.addChildListener(path, zkListener); if (children != null) { urls.addAll(toUrlsWithEmpty(url, path, children)); } } notify(url, listener, urls); } } catch (Throwable e) { throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); } }
服務訂閱完成之后,接着就是notify(url, listener, urls);:
會先經過FailbackRegistry將失敗的通知請求記錄到失敗列表,定時重試。
1 2 3 4 5 6 7 8 9 10 11 12 13 14
protected void notify(URL url, NotifyListener listener, List<URL> urls) { try { doNotify(url, listener, urls); } catch (Exception t) { // 將失敗的通知請求記錄到失敗列表,定時重試 Map<NotifyListener, List<URL>> listeners = failedNotified.get(url); if (listeners == null) { failedNotified.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, List<URL>>()); listeners = failedNotified.get(url); } listeners.put(listener, urls); logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t); } }
doNotify(url, listener, urls);:
1 2 3 4
protected void doNotify(URL url, NotifyListener listener, List<URL> urls) { //父類實現 super.notify(url, listener, urls); }
AbstractRegistry中的doNotify實現:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33
protected void notify(URL url, NotifyListener listener, List<URL> urls) { Map<String, List<URL>> result = new HashMap<String, List<URL>>(); for (URL u : urls) { if (UrlUtils.isMatch(url, u)) { //不同類型的數據分開通知,providers,consumers,routers,overrides //允許只通知其中一種類型,但該類型的數據必須是全量的,不是增量的。 String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); List<URL> categoryList = result.get(category); if (categoryList == null) { categoryList = new ArrayList<URL>(); result.put(category, categoryList); } categoryList.add(u); } } if (result.size() == 0) { return; } Map<String, List<URL>> categoryNotified = notified.get(url); if (categoryNotified == null) { notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>()); categoryNotified = notified.get(url); } //對這里得到的providers,configurators,routers分別進行通知 for (Map.Entry<String, List<URL>> entry : result.entrySet()) { String category = entry.getKey(); List<URL> categoryList = entry.getValue(); categoryNotified.put(category, categoryList); saveProperties(url); //這里的listener是RegistryDirectory listener.notify(categoryList); } }
到RegistryDirectory中查看notify方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42
public synchronized void notify(List<URL> urls) { List<URL> invokerUrls = new ArrayList<URL>(); List<URL> routerUrls = new ArrayList<URL>(); List<URL> configuratorUrls = new ArrayList<URL>(); for (URL url : urls) { String protocol = url.getProtocol(); String category = url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); if (Constants.ROUTERS_CATEGORY.equals(category) || Constants.ROUTE_PROTOCOL.equals(protocol)) { routerUrls.add(url); } else if (Constants.CONFIGURATORS_CATEGORY.equals(category) || Constants.OVERRIDE_PROTOCOL.equals(protocol)) { configuratorUrls.add(url); } else if (Constants.PROVIDERS_CATEGORY.equals(category)) { invokerUrls.add(url); } else { logger.warn("Unsupported category " + category + " in notified url: " + url + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost()); } } // configurators 更新緩存的服務提供方配置 if (configuratorUrls != null && configuratorUrls.size() >0 ){ this.configurators = toConfigurators(configuratorUrls); } // routers//更新緩存的路由規則配置 if (routerUrls != null && routerUrls.size() >0 ){ List<Router> routers = toRouters(routerUrls); if(routers != null){ // null - do nothing setRouters(routers); } } List<Configurator> localConfigurators = this.configurators; // local reference // 合並override參數 this.overrideDirectoryUrl = directoryUrl; if (localConfigurators != null && localConfigurators.size() > 0) { for (Configurator configurator : localConfigurators) { this.overrideDirectoryUrl = configurator.configure(overrideDirectoryUrl); } } // providers //重建invoker實例 refreshInvoker(invokerUrls); }
refreshInvoker(invokerUrls);:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45
/** * 根據invokerURL列表轉換為invoker列表。轉換規則如下: * 1.如果url已經被轉換為invoker,則不在重新引用,直接從緩存中獲取,注意如果url中任何一個參數變更也會重新引用 * 2.如果傳入的invoker列表不為空,則表示最新的invoker列表 * 3.如果傳入的invokerUrl列表是空,則表示只是下發的override規則或route規則,需要重新交叉對比,決定是否需要重新引用。 * @param invokerUrls 傳入的參數不能為null */ private void refreshInvoker(List<URL> invokerUrls){ if (invokerUrls != null && invokerUrls.size() == 1 && invokerUrls.get(0) != null && Constants.EMPTY_PROTOCOL.equals(invokerUrls.get(0).getProtocol())) { this.forbidden = true; // 禁止訪問 this.methodInvokerMap = null; // 置空列表 destroyAllInvokers(); // 關閉所有Invoker } else { this.forbidden = false; // 允許訪問 Map<String, Invoker<T>> oldUrlInvokerMap = this.urlInvokerMap; // local reference if (invokerUrls.size() == 0 && this.cachedInvokerUrls != null){ invokerUrls.addAll(this.cachedInvokerUrls); } else { this.cachedInvokerUrls = new HashSet<URL>(); this.cachedInvokerUrls.addAll(invokerUrls);//緩存invokerUrls列表,便於交叉對比 } if (invokerUrls.size() ==0 ){ return; } //會重新走一遍服務的引用過程 //給每個提供者創建一個Invoker Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls) ;// 將URL列表轉成Invoker列表 Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap); // 換方法名映射Invoker列表 // state change //如果計算錯誤,則不進行處理. if (newUrlInvokerMap == null || newUrlInvokerMap.size() == 0 ){ logger.error(new IllegalStateException("urls to invokers error .invokerUrls.size :"+invokerUrls.size() + ", invoker.size :0. urls :"+invokerUrls.toString())); return ; } //服務提供者Invoker保存在這個map中 this.methodInvokerMap = multiGroup ? toMergeMethodInvokerMap(newMethodInvokerMap) : newMethodInvokerMap; this.urlInvokerMap = newUrlInvokerMap; try{ destroyUnusedInvokers(oldUrlInvokerMap,newUrlInvokerMap); // 關閉未使用的Invoker }catch (Exception e) { logger.warn("destroyUnusedInvokers error. ", e); } } }
toInvokers(invokerUrls) 方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43 44 45 46 47 48 49 50 51 52 53 54 55 56 57 58 59 60 61 62 63 64 65 66 67 68 69 70
private Map<String, Invoker<T>> toInvokers(List<URL> urls) { Map<String, Invoker<T>> newUrlInvokerMap = new HashMap<String, Invoker<T>>(); if(urls == null || urls.size() == 0){ return newUrlInvokerMap; } Set<String> keys = new HashSet<String>(); String queryProtocols = this.queryMap.get(Constants.PROTOCOL_KEY); for (URL providerUrl : urls) { //此時url是dubbo://192.168.110.197:20880/dubbo.common.hello.service.HelloService?anyhost=true& //application=dubbo-provider&application.version=1.0&dubbo=2.5.3&environment=product& //interface=dubbo.common.hello.service.HelloService&methods=sayHello&organization=china& //owner=cheng.xi&pid=5631&side=provider×tamp=1489367571986 //從注冊中心獲取到的攜帶提供者信息的url //如果reference端配置了protocol,則只選擇匹配的protocol if (queryProtocols != null && queryProtocols.length() >0) { boolean accept = false; String[] acceptProtocols = queryProtocols.split(","); for (String acceptProtocol : acceptProtocols) { if (providerUrl.getProtocol().equals(acceptProtocol)) { accept = true; break; } } if (!accept) { continue; } } if (Constants.EMPTY_PROTOCOL.equals(providerUrl.getProtocol())) { continue; } if (! ExtensionLoader.getExtensionLoader(Protocol.class).hasExtension(providerUrl.getProtocol())) { logger.error(new IllegalStateException("Unsupported protocol " + providerUrl.getProtocol() + " in notified url: " + providerUrl + " from registry " + getUrl().getAddress() + " to consumer " + NetUtils.getLocalHost() + ", supported protocol: "+ExtensionLoader.getExtensionLoader(Protocol.class).getSupportedExtensions())); continue; } URL url = mergeUrl(providerUrl); String key = url.toFullString(); // URL參數是排序的 if (keys.contains(key)) { // 重復URL continue; } keys.add(key); // 緩存key為沒有合並消費端參數的URL,不管消費端如何合並參數,如果服務端URL發生變化,則重新refer Map<String, Invoker<T>> localUrlInvokerMap = this.urlInvokerMap; // local reference Invoker<T> invoker = localUrlInvokerMap == null ? null : localUrlInvokerMap.get(key); if (invoker == null) { // 緩存中沒有,重新refer try { boolean enabled = true; if (url.hasParameter(Constants.DISABLED_KEY)) { enabled = ! url.getParameter(Constants.DISABLED_KEY, false); } else { enabled = url.getParameter(Constants.ENABLED_KEY, true); } if (enabled) { //根據擴展點加載機制,這里使用的protocol是DubboProtocol invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl); } } catch (Throwable t) { logger.error("Failed to refer invoker for interface:"+serviceType+",url:("+url+")" + t.getMessage(), t); } if (invoker != null) { // 將新的引用放入緩存 newUrlInvokerMap.put(key, invoker); } }else { newUrlInvokerMap.put(key, invoker); } } keys.clear(); return newUrlInvokerMap; }
創建invoker invoker = new InvokerDelegete<T>(protocol.refer(serviceType, url), url, providerUrl);
:
先使用DubboProtocol的refer方法,這一步會依次調用ProtocolFIlterListenerWrapper,ProtocolFilterWrapper,DubboProtocol中的refer方法。經過兩個Wrapper中,會添加對應的InvokerListener並構建Invoker Filter鏈,在DubboProtocol中會創建一個DubboInvoker對象,該Invoker對象持有服務Class,providerUrl,負責和服務提供端通信的ExchangeClient。
接着使用得到的Invoker創建一個InvokerDelegete
在DubboProtocol中創建DubboInvoker的時候代碼如下:
1 2 3 4 5 6 7
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException { // create rpc invoker. //這里有一個getClients方法 DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); invokers.add(invoker); return invoker; }
查看getClients方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22
private ExchangeClient[] getClients(URL url){ //是否共享連接 boolean service_share_connect = false; int connections = url.getParameter(Constants.CONNECTIONS_KEY, 0); //如果connections不配置,則共享連接,否則每服務每連接 if (connections == 0){ service_share_connect = true; connections = 1; } ExchangeClient[] clients = new ExchangeClient[connections]; for (int i = 0; i < clients.length; i++) { if (service_share_connect){ //這里沒有配置connections,就使用getSharedClient //getSharedClient中先去緩存中查找,沒有的話就會新建,也是調用initClient方法 clients[i] = getSharedClient(url); } else { clients[i] = initClient(url); } } return clients; }
直接看initClient方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34
//創建新連接 private ExchangeClient initClient(URL url) { // client type setting. String str = url.getParameter(Constants.CLIENT_KEY, url.getParameter(Constants.SERVER_KEY, Constants.DEFAULT_REMOTING_CLIENT)); String version = url.getParameter(Constants.DUBBO_VERSION_KEY); boolean compatible = (version != null && version.startsWith("1.0.")); url = url.addParameter(Constants.CODEC_KEY, Version.isCompatibleVersion() && compatible ? COMPATIBLE_CODEC_NAME : DubboCodec.NAME); //默認開啟heartbeat url = url.addParameterIfAbsent(Constants.HEARTBEAT_KEY, String.valueOf(Constants.DEFAULT_HEARTBEAT)); // BIO存在嚴重性能問題,暫時不允許使用 if (str != null && str.length() > 0 && ! ExtensionLoader.getExtensionLoader(Transporter.class).hasExtension(str)) { throw new RpcException("Unsupported client type: " + str + "," + " supported client type is " + StringUtils.join(ExtensionLoader.getExtensionLoader(Transporter.class).getSupportedExtensions(), " ")); } ExchangeClient client ; try { //如果lazy屬性沒有配置為true(我們沒有配置,默認為false)ExchangeClient會馬上和服務端建立連接 //設置連接應該是lazy的 if (url.getParameter(Constants.LAZY_CONNECT_KEY, false)){ client = new LazyConnectExchangeClient(url ,requestHandler); } else { //立即和服務端建立連接 client = Exchangers.connect(url ,requestHandler); } } catch (RemotingException e) { throw new RpcException("Fail to create remoting client for service(" + url + "): " + e.getMessage(), e); } return client; }
和服務端建立連接,Exchangers.connect(url ,requestHandler);,其實最后使用的是HeaderExchanger,Exchanger目前只有這一個實現:
1 2 3 4 5 6 7
public ExchangeClient connect(URL url, ExchangeHandler handler) throws RemotingException { //先經過HeaderExchangeHandler包裝 //然后是DecodeHandler //然后是Transporters.connect //返回一個HeaderExchangerClient,這里封裝了client,channel,啟動心跳的定時器等 return new HeaderExchangeClient(Transporters.connect(url, new DecodeHandler(new HeaderExchangeHandler(handler)))); }
Transporters.connect中也是根據SPI擴展獲取Transport的具體實現,這里默認使用NettyTransporter.connect(),在NettyTransporter的connect方法中直接返回一個NettyClient(url, listener);,下面看下具體的NettyClient初始化細節,會先初始化AbstractPeer這里只是吧url和handler賦值;然后是AbstractEndpoint初始化:
1 2 3 4 5 6 7
public AbstractEndpoint(URL url, ChannelHandler handler) { super(url, handler); //獲取編解碼器,這里是DubboCountCodec this.codec = getChannelCodec(url); this.timeout = url.getPositiveParameter(Constants.TIMEOUT_KEY, Constants.DEFAULT_TIMEOUT); this.connectTimeout = url.getPositiveParameter(Constants.CONNECT_TIMEOUT_KEY, Constants.DEFAULT_CONNECT_TIMEOUT); }
接着是AbstractClient的初始化:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21
public AbstractClient(URL url, ChannelHandler handler) throws RemotingException { super(url, handler); send_reconnect = url.getParameter(Constants.SEND_RECONNECT_KEY, false); shutdown_timeout = url.getParameter(Constants.SHUTDOWN_TIMEOUT_KEY, Constants.DEFAULT_SHUTDOWN_TIMEOUT); //默認重連間隔2s,1800表示1小時warning一次. reconnect_warning_period = url.getParameter("reconnect.waring.period", 1800); try { //具體實現在子類中 doOpen(); } catch (Throwable t) {。。。 } try { // 連接 connect(); } catch (RemotingException t) {。。。} // TODO暫沒理解 executor = (ExecutorService) ExtensionLoader.getExtensionLoader(DataStore.class) .getDefaultExtension().get(Constants.CONSUMER_SIDE, Integer.toString(url.getPort())); ExtensionLoader.getExtensionLoader(DataStore.class) .getDefaultExtension().remove(Constants.CONSUMER_SIDE, Integer.toString(url.getPort())); }
看下在NettyClient中doOpen()的實現:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
protected void doOpen() throws Throwable { NettyHelper.setNettyLoggerFactory(); bootstrap = new ClientBootstrap(channelFactory); // config // @see org.jboss.netty.channel.socket.SocketChannelConfig bootstrap.setOption("keepAlive", true); bootstrap.setOption("tcpNoDelay", true); bootstrap.setOption("connectTimeoutMillis", getTimeout()); final NettyHandler nettyHandler = new NettyHandler(getUrl(), this); bootstrap.setPipelineFactory(new ChannelPipelineFactory() { public ChannelPipeline getPipeline() { NettyCodecAdapter adapter = new NettyCodecAdapter(getCodec(), getUrl(), NettyClient.this); ChannelPipeline pipeline = Channels.pipeline(); pipeline.addLast("decoder", adapter.getDecoder()); pipeline.addLast("encoder", adapter.getEncoder()); pipeline.addLast("handler", nettyHandler); return pipeline; } }); }
這里是Netty3中的客戶端連接的一些常規步驟,暫不做具體解析。open之后,就是真正連接服務端的操作了,connect():
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16
protected void connect() throws RemotingException { connectLock.lock(); try { if (isConnected()) { return; } //初始化重連的線程 initConnectStatusCheckCommand(); //連接,在子類中實現 doConnect(); reconnect_count.set(0); reconnect_error_log_flag.set(false); } catch (RemotingException e) {。。。} finally { connectLock.unlock(); } }
NettyClient中的doConnect方法:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38
protected void doConnect() throws Throwable { long start = System.currentTimeMillis(); //消費者端開始連接,這一步的時候,服務提供者端就接到了連接請求,開始處理了 ChannelFuture future = bootstrap.connect(getConnectAddress()); try{ boolean ret = future.awaitUninterruptibly(getConnectTimeout(), TimeUnit.MILLISECONDS); if (ret && future.isSuccess()) { Channel newChannel = future.getChannel(); newChannel.setInterestOps(Channel.OP_READ_WRITE); try { // 關閉舊的連接 Channel oldChannel = NettyClient.this.channel; // copy reference if (oldChannel != null) { try { oldChannel.close(); } finally { NettyChannel.removeChannelIfDisconnected(oldChannel); } } } finally { if (NettyClient.this.isClosed()) { try { newChannel.close(); } finally { NettyClient.this.channel = null; NettyChannel.removeChannelIfDisconnected(newChannel); } } else { NettyClient.this.channel = newChannel; } } } else if (future.getCause() != null) { throw。。。 } else {throw 。。。 } }finally{ if (! isConnected()) { future.cancel(); } } }
這里連接的細節都交給了netty。
NettyClient初始化完成之后,返回給Transporters,再返回給HeaderExchanger,HeaderExchanger中將NettyClient包裝成HeaderExchangeClient返回給DubboProtocol的initClient方法中,到此在getSharedClient中就獲取到了一個ExchangeClient,然后包裝一下返回client = new ReferenceCountExchangeClient(exchagneclient, ghostClientMap);
。
到這里在DubboProtocol的refer方法中這句DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers);
創建DubboInvoker就已經解析完成,創建過程中連接了服務端,包含一個ExchangeClient等:
1 2 3 4 5 6 7 8
public <T> Invoker<T> refer(Class<T> serviceType, URL url) throws RpcException { // create rpc invoker. DubboInvoker<T> invoker = new DubboInvoker<T>(serviceType, url, getClients(url), invokers); //將invoker緩存 invokers.add(invoker); //返回invoker return invoker; }
接着返回ProtocolFilterWrapper的refer方法,在這里會構建invoker鏈:
1 2 3 4 5 6
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { return protocol.refer(type, url); } return buildInvokerChain(protocol.refer(type, url), Constants.REFERENCE_FILTER_KEY, Constants.CONSUMER); }
接着再返回到ProtocolListenerWrapper的refer方法,這里會初始化監聽器,包裝:
1 2 3 4 5 6 7 8 9
public <T> Invoker<T> refer(Class<T> type, URL url) throws RpcException { if (Constants.REGISTRY_PROTOCOL.equals(url.getProtocol())) { return protocol.refer(type, url); } return new ListenerInvokerWrapper<T>(protocol.refer(type, url), Collections.unmodifiableList( ExtensionLoader.getExtensionLoader(InvokerListener.class) .getActivateExtension(url, Constants.INVOKER_LISTENER_KEY))); }
接着在返回到toInvokers方法,然后返回refreshInvoker方法的Map<String, Invoker<T>> newUrlInvokerMap = toInvokers(invokerUrls) ;
這就獲得了Invoker,接着就是方法名映射Invoker列表:Map<String, List<Invoker<T>>> newMethodInvokerMap = toMethodInvokers(newUrlInvokerMap);
這里將invokers列表轉成與方法的映射關系。到這里refreshInvoker方法就完成了,在往上就返回到AbstractRegistry的notify方法,到這里也完成了。
創建服務代理
到這里有關消費者端注冊到注冊中心和訂閱注冊中心就完事兒了,這部分是在RegistryProtocol.doRefer方法中,這個方法最后一句是return cluster.join(directory);
,這里由Cluster組件創建一個Invoker並返回,這里的cluster默認是用FailoverCluster,最后返回的是經過MockClusterInvoker包裝過的FailoverCluster。繼續返回到ReferenceConfig中createProxy方法,這時候我們已經完成了消費者端引用服務的Invoker。然后最后返回的是根據我們得到的invoker創建的服務代理return (T) proxyFactory.getProxy(invoker);
。這里proxyFactory是我們在最上面列出的動態生成的代碼。
首先經過AbstractProxyFactory的處理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20
public <T> T getProxy(Invoker<T> invoker) throws RpcException { Class<?>[] interfaces = null; String config = invoker.getUrl().getParameter("interfaces"); if (config != null && config.length() > 0) { String[] types = Constants.COMMA_SPLIT_PATTERN.split(config); if (types != null && types.length > 0) { interfaces = new Class<?>[types.length + 2]; interfaces[0] = invoker.getInterface(); interfaces[1] = EchoService.class; for (int i = 0; i < types.length; i ++) { interfaces[i + 1] = ReflectUtils.forName(types[i]); } } } if (interfaces == null) { interfaces = new Class<?>[] {invoker.getInterface(), EchoService.class}; } //這里默認使用的是JavassistProxyFactory的實現 return getProxy(invoker, interfaces); }
然后經過StubProxyFactoryWrapper的處理:
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41 42 43
public <T> T getProxy(Invoker<T> invoker) throws RpcException { T proxy = proxyFactory.getProxy(invoker); if (GenericService.class != invoker.getInterface()) { String stub = invoker.getUrl().getParameter(Constants.STUB_KEY, invoker.getUrl().getParameter(Constants.LOCAL_KEY)); if (ConfigUtils.isNotEmpty(stub)) { Class<?> serviceType = invoker.getInterface(); if (ConfigUtils.isDefault(stub)) { if (invoker.getUrl().hasParameter(Constants.STUB_KEY)) { stub = serviceType.getName() + "Stub"; } else { stub = serviceType.getName() + "Local"; } } try { Class<?> stubClass = ReflectUtils.forName(stub); if (! serviceType.isAssignableFrom(stubClass)) { throw new IllegalStateException("The stub implemention class " + stubClass.getName() + " not implement interface " + serviceType.getName()); } try { Constructor<?> constructor = ReflectUtils.findConstructor(stubClass, serviceType); proxy = (T) constructor.newInstance(new Object[] {proxy}); //export stub service URL url = invoker.getUrl(); if (url.getParameter(Constants.STUB_EVENT_KEY, Constants.DEFAULT_STUB_EVENT)){ url = url.addParameter(Constants.STUB_EVENT_METHODS_KEY, StringUtils.join(Wrapper.getWrapper(proxy.getClass()).getDeclaredMethodNames(), ",")); url = url.addParameter(Constants.IS_SERVER_KEY, Boolean.FALSE.toString()); try{ export(proxy, (Class)invoker.getInterface(), url); }catch (Exception e) { LOGGER.error("export a stub service error.", e); } } } catch (NoSuchMethodException e) { throw new IllegalStateException("No such constructor \"public " + stubClass.getSimpleName() + "(" + serviceType.getName() + ")\" in stub implemention class " + stubClass.getName(), e); } } catch (Throwable t) { LOGGER.error("Failed to create stub implemention class " + stub + " in consumer " + NetUtils.getLocalHost() + " use dubbo version " + Version.getVersion() + ", cause: " + t.getMessage(), t); // ignore } } } return proxy; }
返回代理。到此HelloService helloService = (HelloService) applicationContext.getBean("helloService");
就解析完成了,得到了服務的代理,代理會被注冊到Spring容器中,可以調用服務方法了。接下來的方法調用過程,是消費者發送請求,提供者處理,然后消費者接受處理結果的請求。
初始化的過程:主要做了注冊到注冊中心,監聽注冊中心,連接到服務提供者端,創建代理。這些都是為了下面消費者和提供者之間的通信做准備。