為了安全:服務啟動的ip全部使用10.10.10.10
遠程服務的暴露總體步驟:
- 將ref封裝為invoker
- 將invoker轉換為exporter
- 啟動netty
- 注冊服務到zookeeper
- 訂閱
- 返回新的exporter實例
在7.4 服務遠程暴露 - 創建Exporter與啟動netty服務端中,實現了前三步,本節實現第四步:注冊服務到zk。總體代碼如下:RegistryProtocol.export(final Invoker<T> originInvoker)
1 final Registry registry = getRegistry(originInvoker);//創建ZookeeperRegistry實例:創建CuratorClient,並啟動會話。 2 final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);//獲取真正要注冊在zk上的url 3 registry.register(registedProviderUrl);//創建節點(即注冊服務到zk上)
說明:
- 第一句代碼用來創建ZookeeperRegistry實例:創建CuratorClient,並啟動會話。
- 第二句代碼獲取真正要注冊在zk上的url
- 第三句代碼實現創建節點(即注冊服務到zk上)
一 創建ZookeeperRegistry實例
1 RegistryProtocol.getRegistry(final Invoker<?> originInvoker)
1 /** 2 * 根據invoker的地址獲取registry實例 3 */ 4 private Registry getRegistry(final Invoker<?> originInvoker) { 5 URL registryUrl = originInvoker.getUrl(); 6 if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) { 7 String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);//zookeeper 8 registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY); 9 } 10 return registryFactory.getRegistry(registryUrl); 11 }
首先對originInvoker中的url進行處理:
- 將協議換成zookeeper
- 去掉registry=zookeeper的參數
來看一下originInvoker的url:(解碼后的)
registry://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&export=dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=2791&side=provider×tamp=1507262031554&pid=2791®istry=zookeeper×tamp=1507262031521
說明:
- 第一個紅色部分代表協議:zookeeper
- 第二個紅色部分是export參數
- 第三個紅色部分是registry=zookeeper
經過處理之后的registryUrl為:
zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&export=dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=2791&side=provider×tamp=1507262031554&pid=2791×tamp=1507262031521
之后使用注冊工廠來創建注冊中心。
2 RegistryFactory$Adaptive.getRegistry(com.alibaba.dubbo.common.URL registryUrl)
1 public class RegistryFactory$Adaptive implements com.alibaba.dubbo.registry.RegistryFactory { 2 public com.alibaba.dubbo.registry.Registry getRegistry(com.alibaba.dubbo.common.URL arg0) { 3 if (arg0 == null) 4 throw new IllegalArgumentException("url == null"); 5 com.alibaba.dubbo.common.URL url = arg0; 6 String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );//zookeeper 7 if(extName == null) 8 throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.registry.RegistryFactory) name from url(" + url.toString() + ") use keys([protocol])"); 9 com.alibaba.dubbo.registry.RegistryFactory extension = (com.alibaba.dubbo.registry.RegistryFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.registry.RegistryFactory.class).getExtension(extName); 10 return extension.getRegistry(arg0); 11 } 12 }
這里獲取到的extension是ZookeeperRegistryFactory,之后,使用ZookeeperRegistryFactory進行Registry的創建。首先來看一下ZookeeperRegistryFactory的繼承圖:
getRegistry方法在ZookeeperRegistryFactory的父類AbstractRegistryFactory中。
3 AbstractRegistryFactory.getRegistry(URL registryUrl)
1 public Registry getRegistry(URL url) { 2 url = url.setPath(RegistryService.class.getName()) 3 .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName()) 4 .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY); 5 String key = url.toServiceString(); 6 // 鎖定注冊中心獲取過程,保證注冊中心單一實例 7 LOCK.lock(); 8 try { 9 Registry registry = REGISTRIES.get(key); 10 if (registry != null) { 11 return registry; 12 } 13 registry = createRegistry(url); 14 if (registry == null) { 15 throw new IllegalStateException("Can not create registry " + url); 16 } 17 REGISTRIES.put(key, registry); 18 return registry; 19 } finally { 20 // 釋放鎖 21 LOCK.unlock(); 22 } 23 }
流程:
- 先處理url,之后獲取Registry的key,然后根據該key從Map<String, Registry> REGISTRIES注冊中心集合緩存中獲取Registry,如果有,直接返回,如果沒有,創建Registry,之后存入緩存,最后返回。
首先處理傳入的registryUrl:
- 設置:path=com.alibaba.dubbo.registry.RegistryService
- 添加參數:interface=com.alibaba.dubbo.registry.RegistryService
- 去除export參數
最終得到的registryUrl如下:
zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&interface=com.alibaba.dubbo.registry.RegistryService&pid=2791×tamp=1507262031521
之后,很具上述的registryUrl創建Registry的key,該{ key : Registry }最終會被存儲在Map<String, Registry> REGISTRIES注冊中心集合(該屬性是ZookeeperRegistryFactory父類AbstractRegistryFactory的一個屬性)中。
根據registryUrl創建Registry的key:url.toServiceString()
1 public String toServiceString() { 2 return buildString(true, false, true, true); 3 } 4 5 private String buildString(boolean appendUser, boolean appendParameter, boolean useIP, boolean useService, String... parameters) { 6 StringBuilder buf = new StringBuilder(); 7 if (protocol != null && protocol.length() > 0) { //protocol:// 8 buf.append(protocol); 9 buf.append("://"); 10 } 11 if (appendUser && username != null && username.length() > 0) { //protocol://username:password@host:port/group/interface{path}:version/parameters 12 buf.append(username); 13 if (password != null && password.length() > 0) { 14 buf.append(":"); 15 buf.append(password); 16 } 17 buf.append("@"); 18 } 19 String host; 20 if (useIP) { 21 host = getIp(); 22 } else { 23 host = getHost(); 24 } 25 if (host != null && host.length() > 0) { 26 buf.append(host); 27 if (port > 0) { 28 buf.append(":"); 29 buf.append(port); 30 } 31 } 32 String path; 33 if (useService) { 34 path = getServiceKey(); 35 } else { 36 path = getPath(); 37 } 38 if (path != null && path.length() > 0) { 39 buf.append("/"); 40 buf.append(path); 41 } 42 if (appendParameter) { 43 buildParameters(buf, true, parameters); 44 } 45 return buf.toString(); 46 } 47 48 public String getServiceKey() { 49 String inf = getServiceInterface();//先獲取interface參數,如果沒有的話,取path的值,這里都是com.alibaba.dubbo.registry.RegistryService 50 if (inf == null) return null; 51 StringBuilder buf = new StringBuilder(); 52 String group = getParameter(Constants.GROUP_KEY); 53 if (group != null && group.length() > 0) { 54 buf.append(group).append("/"); //interfacegroup 55 } 56 buf.append(inf); 57 String version = getParameter(Constants.VERSION_KEY); 58 if (version != null && version.length() > 0) { 59 buf.append(":").append(version); 60 } 61 return buf.toString(); 62 }
最終得到的應該是這樣的形式:protocol://username:password@host:port/group/interface{path}:version?key1=value1&key2=value2...。
這里key=zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService
之后來到了真正創建Registry的地方。
1 public class ZookeeperRegistryFactory extends AbstractRegistryFactory { 2 private ZookeeperTransporter zookeeperTransporter; 3 4 public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) { 5 this.zookeeperTransporter = zookeeperTransporter; 6 } 7 8 public Registry createRegistry(URL url) { 9 return new ZookeeperRegistry(url, zookeeperTransporter); 10 } 11 }
這里的zookeeperTransporter對象是一個com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter$Adaptive對象。
在創建ZookeeperRegistry之前來看一下其繼承圖:
new ZookeeperRegistry(registryUrl, ZookeeperTransporter$Adaptive對象)
1 private final static int DEFAULT_ZOOKEEPER_PORT = 2181; 2 private final static String DEFAULT_ROOT = "dubbo"; 3 private final String root; 4 private final Set<String> anyServices = new ConcurrentHashSet<String>(); 5 private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>(); 6 private final ZookeeperClient zkClient; 7 8 public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) { 9 super(url); 10 if (url.isAnyHost()) { 11 throw new IllegalStateException("registry address == null"); 12 } 13 String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);//dubbo 14 if (!group.startsWith(Constants.PATH_SEPARATOR)) { 15 group = Constants.PATH_SEPARATOR + group; 16 } 17 this.root = group;// /dubbo 18 zkClient = zookeeperTransporter.connect(url);//創建zk客戶端,啟動會話 19 zkClient.addStateListener(new StateListener() {//監聽重新連接成功事件,重新連接成功后,之前已經完成注冊和訂閱的url要重新進行注冊和訂閱(因為臨時節點可能已經跪了) 20 public void stateChanged(int state) { 21 if (state == RECONNECTED) { 22 try { 23 recover(); 24 } catch (Exception e) { 25 logger.error(e.getMessage(), e); 26 } 27 } 28 } 29 }); 30 }
new FailbackRegistry(registryUrl)
1 private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true)); 2 // 失敗重試定時器,定時檢查是否有請求失敗,如有,無限次重試 3 private final ScheduledFuture<?> retryFuture; 4 private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>(); 5 private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>(); 6 private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>(); 7 private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>(); 8 private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>(); 9 private AtomicBoolean destroyed = new AtomicBoolean(false); 10 11 public FailbackRegistry(URL url) { 12 super(url); 13 int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);//5*1000 14 this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() { 15 public void run() { 16 // 檢測並連接注冊中心 17 try { 18 retry(); 19 } catch (Throwable t) { // 防御性容錯 20 logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t); 21 } 22 } 23 }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS); 24 }
new AbstractRegistry(registryUrl)
1 // URL地址分隔符,用於文件緩存中,服務提供者URL分隔 2 private static final char URL_SEPARATOR = ' '; 3 // URL地址分隔正則表達式,用於解析文件緩存中服務提供者URL列表 4 private static final String URL_SPLIT = "\\s+"; 5 // 本地磁盤緩存,其中特殊的key值.registies記錄注冊中心列表,其它均為notified服務提供者列表 6 private final Properties properties = new Properties(); 7 // 文件緩存定時寫入 8 private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true)); 9 //是否是同步保存文件 10 private final boolean syncSaveFile; 11 // 本地磁盤緩存文件 12 private File file; 13 private final AtomicLong lastCacheChanged = new AtomicLong(); 14 private final Set<URL> registered = new ConcurrentHashSet<URL>();//已經注冊的url集合 15 private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();//已經訂閱的<URL, Set<NotifyListener>> 16 private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>();//已經通知的<URL, Map<String, List<URL>>> 17 private URL registryUrl;//注冊url 18 private AtomicBoolean destroyed = new AtomicBoolean(false); 19 20 public AbstractRegistry(URL url) { 21 setUrl(url); 22 // 啟動文件保存定時器 23 syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false); 24 String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getHost() + ".cache"); 25 File file = null; 26 if (ConfigUtils.isNotEmpty(filename)) { 27 file = new File(filename); 28 if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) { 29 if (!file.getParentFile().mkdirs()) {//創建文件所在的文件夾 /Users/jigangzhao/.dubbo/ 30 throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!"); 31 } 32 } 33 } 34 this.file = file; 35 loadProperties(); 36 notify(url.getBackupUrls()); 37 }
先簡單的總結一下:父子三代分別做的事情:
- AbstractRegistry主要用來維護緩存文件。
- FailbackRegistry主要用來做失敗重試操作(包括:注冊失敗/反注冊失敗/訂閱失敗/反訂閱失敗/通知失敗的重試);也提供了供ZookeeperRegistry使用的zk重連后的恢復工作的方法。
- ZookeeperRegistry創建zk客戶端,啟動會話;並且調用FailbackRegistry實現zk重連后的恢復工作。
先看AbstractRegistry
- 設置屬性registryUrl=url:zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&interface=com.alibaba.dubbo.registry.RegistryService&pid=4685×tamp=1507286468150
- 創建文件/Users/jigangzhao/.dubbo/dubbo-registry-10.211.55.5.cache的文件夾/Users/jigangzhao/.dubbo
- 設置屬性file:/Users/jigangzhao/.dubbo/dubbo-registry-10.211.55.5.cache文件,該文件存儲信息將是這樣的:
com.alibaba.dubbo.demo.DemoService=empty\://10.10.10.10\:20880/com.alibaba.dubbo.demo.DemoService?anyhost\=true&application\=demo-provider&category\=configurators&check\=false&dubbo\=2.0.0&generic\=false&interface\=com.alibaba.dubbo.demo.DemoService&methods\=sayHello&pid\=5259&side\=provider×tamp\=1507294508053
- 如果file存在,將file中的內容寫入properties屬性;既然有讀file,那么是什么時候寫入file的呢?AbstractRegistry創建了一個含有一個名字為DubboSaveRegistryCache的后台線程的FixedThreadPool,只在在notify(URL url, NotifyListener listener, List<URL> urls)方法中會被調用,我們此處由於ConcurrentMap<URL, Set<NotifyListener>> subscribed為空,所以AbstractRegistry(URL url)中的notify(url.getBackupUrls())不會執行,此處也不會創建文件。
- 最后是notify(url.getBackupUrls())(TODO 這里后續會寫)
再來看FailbackRegistry:
只做了一件事,啟動了一個含有一個名為DubboRegistryFailedRetryTimer的后台線程的ScheduledThreadPool,線程創建5s后開始第一次執行retry(),之后每隔5s執行一次。來看一下retry()
1 /** 2 * 將所有注冊失敗的url(failedRegistered中的url)進行注冊,之后從failedRegistered進行移除; 3 * 將所有反注冊失敗的url(failedUnregistered中的url)進行反注冊,之后從failedUnregistered進行移除; 4 * 將所有訂閱失敗的url(failedSubscribed中的url)進行重新訂閱,之后從failedSubscribed進行移除; 5 * 將所有反訂閱失敗的url(failedUnsubscribed中的url)進行反訂閱,之后從failedUnsubscribed進行移除; 6 * 將所有通知失敗的url(failedNotified中的url)進行通知,之后從failedNotified進行移除; 7 */ 8 protected void retry() { 9 if (!failedRegistered.isEmpty()) { 10 Set<URL> failed = new HashSet<URL>(failedRegistered); 11 if (failed.size() > 0) { 12 if (logger.isInfoEnabled()) { 13 logger.info("Retry register " + failed); 14 } 15 try { 16 for (URL url : failed) { 17 try { 18 doRegister(url); 19 failedRegistered.remove(url); 20 } catch (Throwable t) { // 忽略所有異常,等待下次重試 21 logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t); 22 } 23 } 24 } catch (Throwable t) { // 忽略所有異常,等待下次重試 25 logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t); 26 } 27 } 28 } 29 if (!failedUnregistered.isEmpty()) { 30 Set<URL> failed = new HashSet<URL>(failedUnregistered); 31 if (failed.size() > 0) { 32 if (logger.isInfoEnabled()) { 33 logger.info("Retry unregister " + failed); 34 } 35 try { 36 for (URL url : failed) { 37 try { 38 doUnregister(url); 39 failedUnregistered.remove(url); 40 } catch (Throwable t) { // 忽略所有異常,等待下次重試 41 logger.warn("Failed to retry unregister " + failed + ", waiting for again, cause: " + t.getMessage(), t); 42 } 43 } 44 } catch (Throwable t) { // 忽略所有異常,等待下次重試 45 logger.warn("Failed to retry unregister " + failed + ", waiting for again, cause: " + t.getMessage(), t); 46 } 47 } 48 } 49 if (!failedSubscribed.isEmpty()) { 50 Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedSubscribed); 51 for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(failed).entrySet()) { 52 if (entry.getValue() == null || entry.getValue().size() == 0) { 53 failed.remove(entry.getKey()); 54 } 55 } 56 if (failed.size() > 0) { 57 if (logger.isInfoEnabled()) { 58 logger.info("Retry subscribe " + failed); 59 } 60 try { 61 for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) { 62 URL url = entry.getKey(); 63 Set<NotifyListener> listeners = entry.getValue(); 64 for (NotifyListener listener : listeners) { 65 try { 66 doSubscribe(url, listener);//listener需要一個一個訂閱,每訂閱一個,就將該listener從當前的url監聽列表中移除 67 listeners.remove(listener); 68 } catch (Throwable t) { // 忽略所有異常,等待下次重試 69 logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t); 70 } 71 } 72 } 73 } catch (Throwable t) { // 忽略所有異常,等待下次重試 74 logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t); 75 } 76 } 77 } 78 if (!failedUnsubscribed.isEmpty()) { 79 Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedUnsubscribed); 80 for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(failed).entrySet()) { 81 if (entry.getValue() == null || entry.getValue().size() == 0) { 82 failed.remove(entry.getKey()); 83 } 84 } 85 if (failed.size() > 0) { 86 if (logger.isInfoEnabled()) { 87 logger.info("Retry unsubscribe " + failed); 88 } 89 try { 90 for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) { 91 URL url = entry.getKey(); 92 Set<NotifyListener> listeners = entry.getValue(); 93 for (NotifyListener listener : listeners) { 94 try { 95 doUnsubscribe(url, listener);//listener需要一個一個反訂閱,每反訂閱一個,就將該listener從當前的url監聽列表中移除 96 listeners.remove(listener); 97 } catch (Throwable t) { // 忽略所有異常,等待下次重試 98 logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t); 99 } 100 } 101 } 102 } catch (Throwable t) { // 忽略所有異常,等待下次重試 103 logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t); 104 } 105 } 106 } 107 if (!failedNotified.isEmpty()) { 108 Map<URL, Map<NotifyListener, List<URL>>> failed = new HashMap<URL, Map<NotifyListener, List<URL>>>(failedNotified); 109 for (Map.Entry<URL, Map<NotifyListener, List<URL>>> entry : new HashMap<URL, Map<NotifyListener, List<URL>>>(failed).entrySet()) { 110 if (entry.getValue() == null || entry.getValue().size() == 0) { 111 failed.remove(entry.getKey()); 112 } 113 } 114 if (failed.size() > 0) { 115 if (logger.isInfoEnabled()) { 116 logger.info("Retry notify " + failed); 117 } 118 try { 119 for (Map<NotifyListener, List<URL>> values : failed.values()) { 120 for (Map.Entry<NotifyListener, List<URL>> entry : values.entrySet()) { 121 try { 122 NotifyListener listener = entry.getKey(); 123 List<URL> urls = entry.getValue(); 124 listener.notify(urls); 125 values.remove(listener); 126 } catch (Throwable t) { // 忽略所有異常,等待下次重試 127 logger.warn("Failed to retry notify " + failed + ", waiting for again, cause: " + t.getMessage(), t); 128 } 129 } 130 } 131 } catch (Throwable t) { // 忽略所有異常,等待下次重試 132 logger.warn("Failed to retry notify " + failed + ", waiting for again, cause: " + t.getMessage(), t); 133 } 134 } 135 } 136 }
最后回到我們的主角:ZookeeperRegistry
首先是為屬性設置root=/dubbo,之后創建zk客戶端,啟動會話,最后創建了一個StateListener監聽器,監聽重新連接成功事件,重新連接成功后,之前已經完成注冊和訂閱的url要重新進行注冊和訂閱(因為臨時節點可能已經跪了)。
來看創建zk客戶端,啟動會話的代碼,這是此處最核心的部分:
ZookeeperTransporter$Adaptive.connect(com.alibaba.dubbo.common.URL registryUrl)
1 public com.alibaba.dubbo.remoting.zookeeper.ZookeeperClient connect(com.alibaba.dubbo.common.URL arg0) { 2 if (arg0 == null) 3 throw new IllegalArgumentException("url == null"); 4 com.alibaba.dubbo.common.URL url = arg0; 5 String extName = url.getParameter("client", url.getParameter("transporter", "zkclient"));//curator 6 if(extName == null) 7 throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter) name from url(" + url.toString() + ") use keys([client, transporter])"); 8 com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter extension = (com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter.class).getExtension(extName); 9 return extension.connect(arg0); 10 }
這里創建的extension是CuratorZookeeperTransporter實例。
1 public class CuratorZookeeperTransporter implements ZookeeperTransporter { 2 public ZookeeperClient connect(URL url) { 3 return new CuratorZookeeperClient(url); 4 } 5 }
new CuratorZookeeperClient(registryUrl)
1 private final CuratorFramework client; 2 3 public CuratorZookeeperClient(URL url) { 4 super(url); 5 try { 6 CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder() 7 .connectString(url.getBackupAddress()) 8 .retryPolicy(new RetryNTimes(Integer.MAX_VALUE, 1000)) 9 .connectionTimeoutMs(5000); 10 String authority = url.getAuthority(); 11 if (authority != null && authority.length() > 0) { 12 builder = builder.authorization("digest", authority.getBytes()); 13 } 14 client = builder.build(); 15 client.getConnectionStateListenable().addListener(new ConnectionStateListener() { 16 public void stateChanged(CuratorFramework client, ConnectionState state) { 17 if (state == ConnectionState.LOST) { 18 CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED); 19 } else if (state == ConnectionState.CONNECTED) { 20 CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED); 21 } else if (state == ConnectionState.RECONNECTED) { 22 CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED); 23 } 24 } 25 }); 26 client.start(); 27 } catch (Exception e) { 28 throw new IllegalStateException(e.getMessage(), e); 29 } 30 }
這里首先執行父類AbstractZookeeperClient的構造器來初始化一些參數,之后創建CuratorFramework客戶端,然后添加了ConnectionStateListener監聽器,監聽連接斷開/連接成功/重新連接成功事件,之后作出相應的操作(實際上這里只有重新連接成功事件會被處理,而處理器實際上就是ZookeeperRegistry構造器中的那個執行recover()的StateListener),
protected void stateChanged(int state) { for (StateListener sessionListener : getSessionListeners()) { sessionListener.stateChanged(state);//此處查找實現類,只有ZookeeperRegistry構造器中的那個StateListener } }
最后阻塞,直到創建會話完成。
來看一下父類AbstractZookeeperClient:
1 private final URL url; 2 private final Set<StateListener> stateListeners = new CopyOnWriteArraySet<StateListener>(); 3 private final ConcurrentMap<String, ConcurrentMap<ChildListener, TargetChildListener>> childListeners = new ConcurrentHashMap<String, ConcurrentMap<ChildListener, TargetChildListener>>(); 4 private volatile boolean closed = false; 5 6 public AbstractZookeeperClient(URL url) { 7 this.url = url; 8 }
說明:
- 設置屬性url=registryUrl:zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&interface=com.alibaba.dubbo.registry.RegistryService&pid=4685×tamp=1507286468150
- 創建了一個Set<StateListener> stateListeners,ZookeeperRegistry構造器中的那個執行recover()的StateListener就將會放在這里
至此,一個完整的ZookeeperRegistry實例就創建完成了,來看一下屬性:
- ZookeeperClient zkClient = CuratorZookeeperClient實例
- CuratorFramework client:CuratorFrameworkImpl實例
- String url:zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&interface=com.alibaba.dubbo.registry.RegistryService&pid=4685×tamp=1507286468150
- Set<StateListener> stateListeners:{ 監聽了重連成功事件的執行recover()的StateListener }
- String root="/dubbo"
- URL registryUrl = zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&interface=com.alibaba.dubbo.registry.RegistryService&pid=4685×tamp=1507286468150
- Set<URL> registered:0//已經注冊的url集合,此處為空
- ConcurrentMap<URL, Set<NotifyListener>> subscribed:0//已經訂閱的<URL, Set<NotifyListener>>
- ConcurrentMap<URL, Map<String, List<URL>>> notified:0//已經通知的<URL, Map<String, List<URL>>>
- Set<URL> failedRegistered:0//注冊失敗的url
- Set<URL> failedUnregistered:0//反注冊失敗的url
- ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed:0//訂閱失敗的url
- ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed:0//反訂閱失敗的url
- ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified:0//通知失敗的url
- ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners:0
還有一個定時線程:DubboRegistryFailedRetryTimer每隔5s執行一次retry(),進行失敗重試。
最后,該ZookeeperRegistry會存儲在ZookeeperRegistry的父類的static屬性Map<String, Registry> REGISTRIES中:
Map<String, Registry> REGISTRIES:{ "zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService" : ZookeeperRegistry實例 }
二 獲取真正要注冊到zk的節點url
1 final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
1 /** 2 * 1 獲取originInvoker的export參數值:就是providerUrl 3 * 2 去除providerUrl中所有參數名是"."開頭的,然后去除參數monitor 4 */ 5 private URL getRegistedProviderUrl(final Invoker<?> originInvoker) { 6 URL providerUrl = getProviderUrl(originInvoker); 7 //注冊中心看到的地址 8 final URL registedProviderUrl = providerUrl.removeParameters(getFilteredKeys(providerUrl)).removeParameter(Constants.MONITOR_KEY); 9 return registedProviderUrl; 10 } 11 12 /** 13 * 從invoker的URL中的Map<String, String> parameters中獲取key為export的地址providerUrl: 14 */ 15 private URL getProviderUrl(final Invoker<?> origininvoker) { 16 String export = origininvoker.getUrl().getParameterAndDecoded(Constants.EXPORT_KEY); 17 if (export == null || export.length() == 0) { 18 throw new IllegalArgumentException("The registry export url is null! registry: " + origininvoker.getUrl()); 19 } 20 URL providerUrl = URL.valueOf(export); 21 return providerUrl; 22 } 23 24 //過濾URL中不需要輸出的參數(以點號開頭的) 25 private static String[] getFilteredKeys(URL url) { 26 Map<String, String> params = url.getParameters(); 27 if (params != null && !params.isEmpty()) { 28 List<String> filteredKeys = new ArrayList<String>(); 29 for (Map.Entry<String, String> entry : params.entrySet()) { 30 if (entry != null && entry.getKey() != null && entry.getKey().startsWith(Constants.HIDE_KEY_PREFIX)) { 31 filteredKeys.add(entry.getKey()); 32 } 33 } 34 return filteredKeys.toArray(new String[filteredKeys.size()]); 35 } else { 36 return new String[]{}; 37 } 38 }
最后得到的registedProviderUrl是:
- dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=4758&side=provider×tamp=1507289961588
三 注冊服務到zk
registry.register(registedProviderUrl);//創建節點(即注冊服務到zk上)
這里的registry是ZookeeperRegistry。register(registedProviderUrl)方法在ZookeeperRegistry的父類FailbackRegistry中實現。
1 FailbackRegistry.register(registedProviderUrl)
1 @Override 2 public void register(URL url) { 3 if (destroyed.get()){ 4 return; 5 } 6 super.register(url); 7 failedRegistered.remove(url); 8 failedUnregistered.remove(url); 9 try { 10 // 向服務器端發送注冊請求 11 doRegister(url); 12 } catch (Exception e) { 13 Throwable t = e; 14 // 如果開啟了啟動時檢測check=true,則直接拋出異常,不會加入到failedRegistered中 15 boolean check = getUrl().getParameter(Constants.CHECK_KEY, true) 16 && url.getParameter(Constants.CHECK_KEY, true) 17 && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol()); 18 boolean skipFailback = t instanceof SkipFailbackWrapperException; 19 if (check || skipFailback) { 20 if (skipFailback) { 21 t = t.getCause(); 22 } 23 throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t); 24 } else { 25 logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t); 26 } 27 // 將失敗的注冊請求記錄到失敗列表,定時重試 28 failedRegistered.add(url); 29 } 30 }
首先調用父類AbstractRegistry的register(registedProviderUrl)將當前的registeredProviderUrl放到Set<URL> registered屬性中,如下:
1 public void register(URL url) { 2 if (url == null) { 3 throw new IllegalArgumentException("register url == null"); 4 } 5 if (logger.isInfoEnabled()) { 6 logger.info("Register: " + url); 7 } 8 registered.add(url); 9 }
之后,從failedRegistered和failedUnregistered兩個url集合中刪除該url。然后執行真正的服務注冊(創建節點,doRegister(url)),如果在創建過程中拋出異常,如果url的協議不是consumer並且開啟了check=true的屬性並且當前存儲的URL registryUrl也有check=true的話,那么直接拋出異常,不會將該url加入到failedRegistered集合;當然拋出的異常如果是SkipFailbackWrapperException,那么也會直接拋出異常,不會將該url加入到failedRegistered集合。否則,會將該url加入到failedRegistered集合,然后DubboRegistryFailedRetryTimer線程會每隔5s執行一次doRegister(url)。
我們來看真正doRegister(url)。
2 ZookeeperRegistry.doRegister(registedProviderUrl)
1 protected void doRegister(URL url) { 2 try { 3 zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true)); 4 } catch (Throwable e) { 5 throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e); 6 } 7 }
首先是對入參registedProviderUrl進行一頓處理,
1 private String toUrlPath(URL url) { 2 return toCategoryPath(url) + Constants.PATH_SEPARATOR + URL.encode(url.toFullString()); 3 } 4 5 private String toCategoryPath(URL url) { 6 return toServicePath(url) + Constants.PATH_SEPARATOR + url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY); 7 } 8 9 private String toServicePath(URL url) { 10 String name = url.getServiceInterface(); 11 if (Constants.ANY_VALUE.equals(name)) { 12 return toRootPath(); 13 } 14 return toRootDir() + URL.encode(name);// /dubbo/com.alibaba.dubbo.demo.DemoService 15 } 16 17 private String toRootDir() { 18 if (root.equals(Constants.PATH_SEPARATOR)) { 19 return root; 20 } 21 return root + Constants.PATH_SEPARATOR;// /dubbo/ 22 } 23 24 private String toRootPath() { 25 return root; 26 }
這里就體現了上邊的ZookeeperRegistry的root屬性的作用。最終實際上得到的是:/dubbo/interface/category/encode過的export,該節點也將是創建在zk上的節點。
- /dubbo是根節點
- /interface是服務接口
- /category是providers/consumers/routers/configurators等
最終得到的url是:
- /dubbo/com.alibaba.dubbo.demo.DemoService/providers/dubbo%3A%2F%2F10.10.10.10%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D5148%26side%3Dprovider%26timestamp%3D1507291294629
- 解碼后:/dubbo/com.alibaba.dubbo.demo.DemoService/providers/dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=5148&side=provider×tamp=1507291294629
最后執行zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true))來創建節點,該方法由CuratorZookeeperClient的父類AbstractZookeeperClient來執行:
1 public void create(String path, boolean ephemeral) { 2 int i = path.lastIndexOf('/'); 3 if (i > 0) { 4 create(path.substring(0, i), false); 5 } 6 if (ephemeral) { 7 createEphemeral(path); 8 } else { 9 createPersistent(path); 10 } 11 }
這里實際上是通過遞歸分別創建持久化的/dubbo,/dubbo/com.alibaba.dubbo.demo.DemoService以及/dubbo/com.alibaba.dubbo.demo.DemoService/providers節點;最后創建臨時節點/dubbo/com.alibaba.dubbo.demo.DemoService/providers/dubbo%3A%2F%2F10.10.10.10%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D5148%26side%3Dprovider%26timestamp%3D1507291294629,而實際上,如果使用了curator的話,可以直接使用遞歸創建節點即可(結合zk的特性,只有最后一個字節點可以是臨時節點,父節點一定是持久化節點),這里這樣的寫法應該是兼容不能遞歸創建節點的Zkclient客戶端。值得注意的是,url.getParameter(Constants.DYNAMIC_KEY, true)為true則最終創建的節點是臨時節點,否則是持久化節點。
創建節點的操作是在CuratorZookeeperClient中進行的。
1 public void createPersistent(String path) { 2 try { 3 client.create().forPath(path); 4 } catch (NodeExistsException e) { 5 } catch (Exception e) { 6 throw new IllegalStateException(e.getMessage(), e); 7 } 8 } 9 10 public void createEphemeral(String path) { 11 try { 12 client.create().withMode(CreateMode.EPHEMERAL).forPath(path); 13 } catch (NodeExistsException e) { 14 } catch (Exception e) { 15 throw new IllegalStateException(e.getMessage(), e); 16 } 17 }
到此為止,我們去zk上看一下節點的創建情況。
或者從zkui上看一下:
隱藏掉的是ip:10.10.10.10。
到目前為止,我們再來看看ZookeeperRegistry的屬性變化。相較於注冊前:
- Set<URL> registered:[ dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=5214&side=provider×tamp=1507293238549 ]