7.6 服務遠程暴露 - 注冊服務到zookeeper


為了安全:服務啟動的ip全部使用10.10.10.10

遠程服務的暴露總體步驟:

  • 將ref封裝為invoker
  • 將invoker轉換為exporter
  • 啟動netty
  • 注冊服務到zookeeper
  • 訂閱
  • 返回新的exporter實例

7.4 服務遠程暴露 - 創建Exporter與啟動netty服務端中,實現了前三步,本節實現第四步:注冊服務到zk。總體代碼如下:RegistryProtocol.export(final Invoker<T> originInvoker)

1         final Registry registry = getRegistry(originInvoker);//創建ZookeeperRegistry實例:創建CuratorClient,並啟動會話。
2         final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);//獲取真正要注冊在zk上的url
3         registry.register(registedProviderUrl);//創建節點(即注冊服務到zk上)

說明:

  • 第一句代碼用來創建ZookeeperRegistry實例:創建CuratorClient,並啟動會話。
  • 第二句代碼獲取真正要注冊在zk上的url
  • 第三句代碼實現創建節點(即注冊服務到zk上)

 

一  創建ZookeeperRegistry實例

1  RegistryProtocol.getRegistry(final Invoker<?> originInvoker)

 1     /**
 2      * 根據invoker的地址獲取registry實例
 3      */
 4     private Registry getRegistry(final Invoker<?> originInvoker) {
 5         URL registryUrl = originInvoker.getUrl();
 6         if (Constants.REGISTRY_PROTOCOL.equals(registryUrl.getProtocol())) {
 7             String protocol = registryUrl.getParameter(Constants.REGISTRY_KEY, Constants.DEFAULT_DIRECTORY);//zookeeper
 8             registryUrl = registryUrl.setProtocol(protocol).removeParameter(Constants.REGISTRY_KEY);
 9         }
10         return registryFactory.getRegistry(registryUrl);
11     }

首先對originInvoker中的url進行處理:

  • 將協議換成zookeeper
  • 去掉registry=zookeeper的參數

來看一下originInvoker的url:(解碼后的)

registry://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&export=dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=2791&side=provider&timestamp=1507262031554&pid=2791&registry=zookeeper&timestamp=1507262031521

說明:

  • 第一個紅色部分代表協議:zookeeper
  • 第二個紅色部分是export參數
  • 第三個紅色部分是registry=zookeeper

經過處理之后的registryUrl為:

zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&export=dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=2791&side=provider&timestamp=1507262031554&pid=2791&timestamp=1507262031521

之后使用注冊工廠來創建注冊中心。

 

2  RegistryFactory$Adaptive.getRegistry(com.alibaba.dubbo.common.URL registryUrl)

 1 public class RegistryFactory$Adaptive implements com.alibaba.dubbo.registry.RegistryFactory {
 2     public com.alibaba.dubbo.registry.Registry getRegistry(com.alibaba.dubbo.common.URL arg0) {
 3         if (arg0 == null)
 4             throw new IllegalArgumentException("url == null");
 5         com.alibaba.dubbo.common.URL url = arg0;
 6         String extName = ( url.getProtocol() == null ? "dubbo" : url.getProtocol() );//zookeeper
 7         if(extName == null)
 8             throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.registry.RegistryFactory) name from url(" + url.toString() + ") use keys([protocol])");
 9         com.alibaba.dubbo.registry.RegistryFactory extension = (com.alibaba.dubbo.registry.RegistryFactory)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.registry.RegistryFactory.class).getExtension(extName);
10         return extension.getRegistry(arg0);
11     }
12 }

這里獲取到的extension是ZookeeperRegistryFactory,之后,使用ZookeeperRegistryFactory進行Registry的創建。首先來看一下ZookeeperRegistryFactory的繼承圖:

getRegistry方法在ZookeeperRegistryFactory的父類AbstractRegistryFactory中。

 

3  AbstractRegistryFactory.getRegistry(URL registryUrl)

 1     public Registry getRegistry(URL url) {
 2         url = url.setPath(RegistryService.class.getName())
 3                 .addParameter(Constants.INTERFACE_KEY, RegistryService.class.getName())
 4                 .removeParameters(Constants.EXPORT_KEY, Constants.REFER_KEY);
 5         String key = url.toServiceString();
 6         // 鎖定注冊中心獲取過程,保證注冊中心單一實例
 7         LOCK.lock();
 8         try {
 9             Registry registry = REGISTRIES.get(key);
10             if (registry != null) {
11                 return registry;
12             }
13             registry = createRegistry(url);
14             if (registry == null) {
15                 throw new IllegalStateException("Can not create registry " + url);
16             }
17             REGISTRIES.put(key, registry);
18             return registry;
19         } finally {
20             // 釋放鎖
21             LOCK.unlock();
22         }
23     }

流程:

  • 先處理url,之后獲取Registry的key,然后根據該key從Map<String, Registry> REGISTRIES注冊中心集合緩存中獲取Registry,如果有,直接返回,如果沒有,創建Registry,之后存入緩存,最后返回。

首先處理傳入的registryUrl:

  • 設置:path=com.alibaba.dubbo.registry.RegistryService
  • 添加參數:interface=com.alibaba.dubbo.registry.RegistryService
  • 去除export參數

最終得到的registryUrl如下:

zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&interface=com.alibaba.dubbo.registry.RegistryService&pid=2791&timestamp=1507262031521

之后,很具上述的registryUrl創建Registry的key,該{ key : Registry }最終會被存儲在Map<String, Registry> REGISTRIES注冊中心集合(該屬性是ZookeeperRegistryFactory父類AbstractRegistryFactory的一個屬性)中。

 

根據registryUrl創建Registry的key:url.toServiceString()

 1     public String toServiceString() {
 2         return buildString(true, false, true, true);
 3     }
 4 
 5     private String buildString(boolean appendUser, boolean appendParameter, boolean useIP, boolean useService, String... parameters) {
 6         StringBuilder buf = new StringBuilder();
 7         if (protocol != null && protocol.length() > 0) {  //protocol://
 8             buf.append(protocol);
 9             buf.append("://");
10         }
11         if (appendUser && username != null && username.length() > 0) {  //protocol://username:password@host:port/group/interface{path}:version/parameters
12             buf.append(username);
13             if (password != null && password.length() > 0) {
14                 buf.append(":");
15                 buf.append(password);
16             }
17             buf.append("@");
18         }
19         String host;
20         if (useIP) {
21             host = getIp();
22         } else {
23             host = getHost();
24         }
25         if (host != null && host.length() > 0) {
26             buf.append(host);
27             if (port > 0) {
28                 buf.append(":");
29                 buf.append(port);
30             }
31         }
32         String path;
33         if (useService) {
34             path = getServiceKey();
35         } else {
36             path = getPath();
37         }
38         if (path != null && path.length() > 0) {
39             buf.append("/");
40             buf.append(path);
41         }
42         if (appendParameter) {
43             buildParameters(buf, true, parameters);
44         }
45         return buf.toString();
46     }
47 
48     public String getServiceKey() {
49         String inf = getServiceInterface();//先獲取interface參數,如果沒有的話,取path的值,這里都是com.alibaba.dubbo.registry.RegistryService
50         if (inf == null) return null;
51         StringBuilder buf = new StringBuilder();
52         String group = getParameter(Constants.GROUP_KEY);
53         if (group != null && group.length() > 0) {
54             buf.append(group).append("/"); //interfacegroup
55         }
56         buf.append(inf);
57         String version = getParameter(Constants.VERSION_KEY);
58         if (version != null && version.length() > 0) {
59             buf.append(":").append(version);
60         }
61         return buf.toString();
62     }

最終得到的應該是這樣的形式:protocol://username:password@host:port/group/interface{path}:version?key1=value1&key2=value2...。

這里key=zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService

之后來到了真正創建Registry的地方。

 1 public class ZookeeperRegistryFactory extends AbstractRegistryFactory {
 2     private ZookeeperTransporter zookeeperTransporter;
 3 
 4     public void setZookeeperTransporter(ZookeeperTransporter zookeeperTransporter) {
 5         this.zookeeperTransporter = zookeeperTransporter;
 6     }
 7 
 8     public Registry createRegistry(URL url) {
 9         return new ZookeeperRegistry(url, zookeeperTransporter);
10     }
11 }

這里的zookeeperTransporter對象是一個com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter$Adaptive對象。

 

在創建ZookeeperRegistry之前來看一下其繼承圖:

new ZookeeperRegistry(registryUrl, ZookeeperTransporter$Adaptive對象)

 1     private final static int DEFAULT_ZOOKEEPER_PORT = 2181;
 2     private final static String DEFAULT_ROOT = "dubbo";
 3     private final String root;
 4     private final Set<String> anyServices = new ConcurrentHashSet<String>();
 5     private final ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners = new ConcurrentHashMap<URL, ConcurrentMap<NotifyListener, ChildListener>>();
 6     private final ZookeeperClient zkClient;
 7 
 8     public ZookeeperRegistry(URL url, ZookeeperTransporter zookeeperTransporter) {
 9         super(url);
10         if (url.isAnyHost()) {
11             throw new IllegalStateException("registry address == null");
12         }
13         String group = url.getParameter(Constants.GROUP_KEY, DEFAULT_ROOT);//dubbo
14         if (!group.startsWith(Constants.PATH_SEPARATOR)) {
15             group = Constants.PATH_SEPARATOR + group;
16         }
17         this.root = group;// /dubbo
18         zkClient = zookeeperTransporter.connect(url);//創建zk客戶端,啟動會話
19         zkClient.addStateListener(new StateListener() {//監聽重新連接成功事件,重新連接成功后,之前已經完成注冊和訂閱的url要重新進行注冊和訂閱(因為臨時節點可能已經跪了)
20             public void stateChanged(int state) {
21                 if (state == RECONNECTED) {
22                     try {
23                         recover();
24                     } catch (Exception e) {
25                         logger.error(e.getMessage(), e);
26                     }
27                 }
28             }
29         });
30     }

new FailbackRegistry(registryUrl)

 1     private final ScheduledExecutorService retryExecutor = Executors.newScheduledThreadPool(1, new NamedThreadFactory("DubboRegistryFailedRetryTimer", true));
 2     // 失敗重試定時器,定時檢查是否有請求失敗,如有,無限次重試
 3     private final ScheduledFuture<?> retryFuture;
 4     private final Set<URL> failedRegistered = new ConcurrentHashSet<URL>();
 5     private final Set<URL> failedUnregistered = new ConcurrentHashSet<URL>();
 6     private final ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
 7     private final ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();
 8     private final ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified = new ConcurrentHashMap<URL, Map<NotifyListener, List<URL>>>();
 9     private AtomicBoolean destroyed = new AtomicBoolean(false);
10 
11     public FailbackRegistry(URL url) {
12         super(url);
13         int retryPeriod = url.getParameter(Constants.REGISTRY_RETRY_PERIOD_KEY, Constants.DEFAULT_REGISTRY_RETRY_PERIOD);//5*1000
14         this.retryFuture = retryExecutor.scheduleWithFixedDelay(new Runnable() {
15             public void run() {
16                 // 檢測並連接注冊中心
17                 try {
18                     retry();
19                 } catch (Throwable t) { // 防御性容錯
20                     logger.error("Unexpected error occur at failed retry, cause: " + t.getMessage(), t);
21                 }
22             }
23         }, retryPeriod, retryPeriod, TimeUnit.MILLISECONDS);
24     }

new AbstractRegistry(registryUrl)

 1     // URL地址分隔符,用於文件緩存中,服務提供者URL分隔
 2     private static final char URL_SEPARATOR = ' ';
 3     // URL地址分隔正則表達式,用於解析文件緩存中服務提供者URL列表
 4     private static final String URL_SPLIT = "\\s+";
 5     // 本地磁盤緩存,其中特殊的key值.registies記錄注冊中心列表,其它均為notified服務提供者列表
 6     private final Properties properties = new Properties();
 7     // 文件緩存定時寫入
 8     private final ExecutorService registryCacheExecutor = Executors.newFixedThreadPool(1, new NamedThreadFactory("DubboSaveRegistryCache", true));
 9     //是否是同步保存文件
10     private final boolean syncSaveFile;
11     // 本地磁盤緩存文件
12     private File file;
13     private final AtomicLong lastCacheChanged = new AtomicLong();
14     private final Set<URL> registered = new ConcurrentHashSet<URL>();//已經注冊的url集合
15     private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();//已經訂閱的<URL, Set<NotifyListener>>
16     private final ConcurrentMap<URL, Map<String, List<URL>>> notified = new ConcurrentHashMap<URL, Map<String, List<URL>>>();//已經通知的<URL, Map<String, List<URL>>>
17     private URL registryUrl;//注冊url
18     private AtomicBoolean destroyed = new AtomicBoolean(false);
19 
20     public AbstractRegistry(URL url) {
21         setUrl(url);
22         // 啟動文件保存定時器
23         syncSaveFile = url.getParameter(Constants.REGISTRY_FILESAVE_SYNC_KEY, false);
24         String filename = url.getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/.dubbo/dubbo-registry-" + url.getHost() + ".cache");
25         File file = null;
26         if (ConfigUtils.isNotEmpty(filename)) {
27             file = new File(filename);
28             if (!file.exists() && file.getParentFile() != null && !file.getParentFile().exists()) {
29                 if (!file.getParentFile().mkdirs()) {//創建文件所在的文件夾 /Users/jigangzhao/.dubbo/
30                     throw new IllegalArgumentException("Invalid registry store file " + file + ", cause: Failed to create directory " + file.getParentFile() + "!");
31                 }
32             }
33         }
34         this.file = file;
35         loadProperties();
36         notify(url.getBackupUrls());
37     }

先簡單的總結一下:父子三代分別做的事情:

  • AbstractRegistry主要用來維護緩存文件
  • FailbackRegistry主要用來做失敗重試操作(包括:注冊失敗/反注冊失敗/訂閱失敗/反訂閱失敗/通知失敗的重試);也提供了供ZookeeperRegistry使用的zk重連后的恢復工作的方法。
  • ZookeeperRegistry創建zk客戶端,啟動會話;並且調用FailbackRegistry實現zk重連后的恢復工作

先看AbstractRegistry

  • 設置屬性registryUrl=url:zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&interface=com.alibaba.dubbo.registry.RegistryService&pid=4685&timestamp=1507286468150
  • 創建文件/Users/jigangzhao/.dubbo/dubbo-registry-10.211.55.5.cache的文件夾/Users/jigangzhao/.dubbo
  • 設置屬性file:/Users/jigangzhao/.dubbo/dubbo-registry-10.211.55.5.cache文件,該文件存儲信息將是這樣的:

    com.alibaba.dubbo.demo.DemoService=empty\://10.10.10.10\:20880/com.alibaba.dubbo.demo.DemoService?anyhost\=true&application\=demo-provider&category\=configurators&check\=false&dubbo\=2.0.0&generic\=false&interface\=com.alibaba.dubbo.demo.DemoService&methods\=sayHello&pid\=5259&side\=provider&timestamp\=1507294508053

  • 如果file存在,將file中的內容寫入properties屬性;既然有讀file,那么是什么時候寫入file的呢?AbstractRegistry創建了一個含有一個名字為DubboSaveRegistryCache的后台線程的FixedThreadPool,只在在notify(URL url, NotifyListener listener, List<URL> urls)方法中會被調用,我們此處由於ConcurrentMap<URL, Set<NotifyListener>> subscribed為空,所以AbstractRegistry(URL url)中的notify(url.getBackupUrls())不會執行,此處也不會創建文件。
  • 最后是notify(url.getBackupUrls())(TODO 這里后續會寫

再來看FailbackRegistry:

只做了一件事,啟動了一個含有一個名為DubboRegistryFailedRetryTimer的后台線程的ScheduledThreadPool,線程創建5s后開始第一次執行retry(),之后每隔5s執行一次。來看一下retry()

  1     /**
  2      * 將所有注冊失敗的url(failedRegistered中的url)進行注冊,之后從failedRegistered進行移除;
  3      * 將所有反注冊失敗的url(failedUnregistered中的url)進行反注冊,之后從failedUnregistered進行移除;
  4      * 將所有訂閱失敗的url(failedSubscribed中的url)進行重新訂閱,之后從failedSubscribed進行移除;
  5      * 將所有反訂閱失敗的url(failedUnsubscribed中的url)進行反訂閱,之后從failedUnsubscribed進行移除;
  6      * 將所有通知失敗的url(failedNotified中的url)進行通知,之后從failedNotified進行移除;
  7      */
  8     protected void retry() {
  9         if (!failedRegistered.isEmpty()) {
 10             Set<URL> failed = new HashSet<URL>(failedRegistered);
 11             if (failed.size() > 0) {
 12                 if (logger.isInfoEnabled()) {
 13                     logger.info("Retry register " + failed);
 14                 }
 15                 try {
 16                     for (URL url : failed) {
 17                         try {
 18                             doRegister(url);
 19                             failedRegistered.remove(url);
 20                         } catch (Throwable t) { // 忽略所有異常,等待下次重試
 21                             logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
 22                         }
 23                     }
 24                 } catch (Throwable t) { // 忽略所有異常,等待下次重試
 25                     logger.warn("Failed to retry register " + failed + ", waiting for again, cause: " + t.getMessage(), t);
 26                 }
 27             }
 28         }
 29         if (!failedUnregistered.isEmpty()) {
 30             Set<URL> failed = new HashSet<URL>(failedUnregistered);
 31             if (failed.size() > 0) {
 32                 if (logger.isInfoEnabled()) {
 33                     logger.info("Retry unregister " + failed);
 34                 }
 35                 try {
 36                     for (URL url : failed) {
 37                         try {
 38                             doUnregister(url);
 39                             failedUnregistered.remove(url);
 40                         } catch (Throwable t) { // 忽略所有異常,等待下次重試
 41                             logger.warn("Failed to retry unregister  " + failed + ", waiting for again, cause: " + t.getMessage(), t);
 42                         }
 43                     }
 44                 } catch (Throwable t) { // 忽略所有異常,等待下次重試
 45                     logger.warn("Failed to retry unregister  " + failed + ", waiting for again, cause: " + t.getMessage(), t);
 46                 }
 47             }
 48         }
 49         if (!failedSubscribed.isEmpty()) {
 50             Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedSubscribed);
 51             for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(failed).entrySet()) {
 52                 if (entry.getValue() == null || entry.getValue().size() == 0) {
 53                     failed.remove(entry.getKey());
 54                 }
 55             }
 56             if (failed.size() > 0) {
 57                 if (logger.isInfoEnabled()) {
 58                     logger.info("Retry subscribe " + failed);
 59                 }
 60                 try {
 61                     for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
 62                         URL url = entry.getKey();
 63                         Set<NotifyListener> listeners = entry.getValue();
 64                         for (NotifyListener listener : listeners) {
 65                             try {
 66                                 doSubscribe(url, listener);//listener需要一個一個訂閱,每訂閱一個,就將該listener從當前的url監聽列表中移除
 67                                 listeners.remove(listener);
 68                             } catch (Throwable t) { // 忽略所有異常,等待下次重試
 69                                 logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
 70                             }
 71                         }
 72                     }
 73                 } catch (Throwable t) { // 忽略所有異常,等待下次重試
 74                     logger.warn("Failed to retry subscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
 75                 }
 76             }
 77         }
 78         if (!failedUnsubscribed.isEmpty()) {
 79             Map<URL, Set<NotifyListener>> failed = new HashMap<URL, Set<NotifyListener>>(failedUnsubscribed);
 80             for (Map.Entry<URL, Set<NotifyListener>> entry : new HashMap<URL, Set<NotifyListener>>(failed).entrySet()) {
 81                 if (entry.getValue() == null || entry.getValue().size() == 0) {
 82                     failed.remove(entry.getKey());
 83                 }
 84             }
 85             if (failed.size() > 0) {
 86                 if (logger.isInfoEnabled()) {
 87                     logger.info("Retry unsubscribe " + failed);
 88                 }
 89                 try {
 90                     for (Map.Entry<URL, Set<NotifyListener>> entry : failed.entrySet()) {
 91                         URL url = entry.getKey();
 92                         Set<NotifyListener> listeners = entry.getValue();
 93                         for (NotifyListener listener : listeners) {
 94                             try {
 95                                 doUnsubscribe(url, listener);//listener需要一個一個反訂閱,每反訂閱一個,就將該listener從當前的url監聽列表中移除
 96                                 listeners.remove(listener);
 97                             } catch (Throwable t) { // 忽略所有異常,等待下次重試
 98                                 logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
 99                             }
100                         }
101                     }
102                 } catch (Throwable t) { // 忽略所有異常,等待下次重試
103                     logger.warn("Failed to retry unsubscribe " + failed + ", waiting for again, cause: " + t.getMessage(), t);
104                 }
105             }
106         }
107         if (!failedNotified.isEmpty()) {
108             Map<URL, Map<NotifyListener, List<URL>>> failed = new HashMap<URL, Map<NotifyListener, List<URL>>>(failedNotified);
109             for (Map.Entry<URL, Map<NotifyListener, List<URL>>> entry : new HashMap<URL, Map<NotifyListener, List<URL>>>(failed).entrySet()) {
110                 if (entry.getValue() == null || entry.getValue().size() == 0) {
111                     failed.remove(entry.getKey());
112                 }
113             }
114             if (failed.size() > 0) {
115                 if (logger.isInfoEnabled()) {
116                     logger.info("Retry notify " + failed);
117                 }
118                 try {
119                     for (Map<NotifyListener, List<URL>> values : failed.values()) {
120                         for (Map.Entry<NotifyListener, List<URL>> entry : values.entrySet()) {
121                             try {
122                                 NotifyListener listener = entry.getKey();
123                                 List<URL> urls = entry.getValue();
124                                 listener.notify(urls);
125                                 values.remove(listener);
126                             } catch (Throwable t) { // 忽略所有異常,等待下次重試
127                                 logger.warn("Failed to retry notify " + failed + ", waiting for again, cause: " + t.getMessage(), t);
128                             }
129                         }
130                     }
131                 } catch (Throwable t) { // 忽略所有異常,等待下次重試
132                     logger.warn("Failed to retry notify " + failed + ", waiting for again, cause: " + t.getMessage(), t);
133                 }
134             }
135         }
136     }

最后回到我們的主角:ZookeeperRegistry

首先是為屬性設置root=/dubbo,之后創建zk客戶端,啟動會話,最后創建了一個StateListener監聽器,監聽重新連接成功事件,重新連接成功后,之前已經完成注冊和訂閱的url要重新進行注冊和訂閱(因為臨時節點可能已經跪了)。

來看創建zk客戶端,啟動會話的代碼,這是此處最核心的部分:

ZookeeperTransporter$Adaptive.connect(com.alibaba.dubbo.common.URL registryUrl)

 1     public com.alibaba.dubbo.remoting.zookeeper.ZookeeperClient connect(com.alibaba.dubbo.common.URL arg0) {
 2         if (arg0 == null)
 3             throw new IllegalArgumentException("url == null");
 4         com.alibaba.dubbo.common.URL url = arg0;
 5         String extName = url.getParameter("client", url.getParameter("transporter", "zkclient"));//curator
 6         if(extName == null)
 7             throw new IllegalStateException("Fail to get extension(com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter) name from url(" + url.toString() + ") use keys([client, transporter])");
 8         com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter extension = (com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter)ExtensionLoader.getExtensionLoader(com.alibaba.dubbo.remoting.zookeeper.ZookeeperTransporter.class).getExtension(extName);
 9         return extension.connect(arg0);
10     }

這里創建的extension是CuratorZookeeperTransporter實例。

1 public class CuratorZookeeperTransporter implements ZookeeperTransporter {
2     public ZookeeperClient connect(URL url) {
3         return new CuratorZookeeperClient(url);
4     }
5 }

new CuratorZookeeperClient(registryUrl)

 1     private final CuratorFramework client;
 2 
 3     public CuratorZookeeperClient(URL url) {
 4         super(url);
 5         try {
 6             CuratorFrameworkFactory.Builder builder = CuratorFrameworkFactory.builder()
 7                     .connectString(url.getBackupAddress())
 8                     .retryPolicy(new RetryNTimes(Integer.MAX_VALUE, 1000))
 9                     .connectionTimeoutMs(5000);
10             String authority = url.getAuthority();
11             if (authority != null && authority.length() > 0) {
12                 builder = builder.authorization("digest", authority.getBytes());
13             }
14             client = builder.build();
15             client.getConnectionStateListenable().addListener(new ConnectionStateListener() {
16                 public void stateChanged(CuratorFramework client, ConnectionState state) {
17                     if (state == ConnectionState.LOST) {
18                         CuratorZookeeperClient.this.stateChanged(StateListener.DISCONNECTED);
19                     } else if (state == ConnectionState.CONNECTED) {
20                         CuratorZookeeperClient.this.stateChanged(StateListener.CONNECTED);
21                     } else if (state == ConnectionState.RECONNECTED) {
22                         CuratorZookeeperClient.this.stateChanged(StateListener.RECONNECTED);
23                     }
24                 }
25             });
26             client.start();
27         } catch (Exception e) {
28             throw new IllegalStateException(e.getMessage(), e);
29         }
30     }

這里首先執行父類AbstractZookeeperClient的構造器來初始化一些參數,之后創建CuratorFramework客戶端,然后添加了ConnectionStateListener監聽器,監聽連接斷開/連接成功/重新連接成功事件,之后作出相應的操作(實際上這里只有重新連接成功事件會被處理,而處理器實際上就是ZookeeperRegistry構造器中的那個執行recover()的StateListener),

    protected void stateChanged(int state) {
        for (StateListener sessionListener : getSessionListeners()) {
            sessionListener.stateChanged(state);//此處查找實現類,只有ZookeeperRegistry構造器中的那個StateListener
        }
    }

最后阻塞,直到創建會話完成。

來看一下父類AbstractZookeeperClient

1     private final URL url;
2     private final Set<StateListener> stateListeners = new CopyOnWriteArraySet<StateListener>();
3     private final ConcurrentMap<String, ConcurrentMap<ChildListener, TargetChildListener>> childListeners = new ConcurrentHashMap<String, ConcurrentMap<ChildListener, TargetChildListener>>();
4     private volatile boolean closed = false;
5 
6     public AbstractZookeeperClient(URL url) {
7         this.url = url;
8     }

說明:

  • 設置屬性url=registryUrl:zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&interface=com.alibaba.dubbo.registry.RegistryService&pid=4685&timestamp=1507286468150
  • 創建了一個Set<StateListener> stateListeners,ZookeeperRegistry構造器中的那個執行recover()的StateListener就將會放在這里

 

至此,一個完整的ZookeeperRegistry實例就創建完成了,來看一下屬性:

  • ZookeeperClient zkClientCuratorZookeeperClient實例
    • CuratorFramework client:CuratorFrameworkImpl實例
    • String url:zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&interface=com.alibaba.dubbo.registry.RegistryService&pid=4685&timestamp=1507286468150
    • Set<StateListener> stateListeners:{ 監聽了重連成功事件的執行recover()的StateListener }
  • String root="/dubbo"
  • URL registryUrl = zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&interface=com.alibaba.dubbo.registry.RegistryService&pid=4685&timestamp=1507286468150
  • Set<URL> registered:0//已經注冊的url集合,此處為空
  • ConcurrentMap<URL, Set<NotifyListener>> subscribed:0//已經訂閱的<URL, Set<NotifyListener>>
  • ConcurrentMap<URL, Map<String, List<URL>>> notified:0//已經通知的<URL, Map<String, List<URL>>>
  • Set<URL> failedRegistered:0//注冊失敗的url
  • Set<URL> failedUnregistered:0//反注冊失敗的url
  • ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed:0//訂閱失敗的url
  • ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed:0//反訂閱失敗的url
  • ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified:0//通知失敗的url
  • ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners:0

還有一個定時線程:DubboRegistryFailedRetryTimer每隔5s執行一次retry(),進行失敗重試。

最后,該ZookeeperRegistry會存儲在ZookeeperRegistry的父類的static屬性Map<String, Registry> REGISTRIES中:

Map<String, Registry> REGISTRIES:{ "zookeeper://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService" : ZookeeperRegistry實例 }

 

二  獲取真正要注冊到zk的節點url

1 final URL registedProviderUrl = getRegistedProviderUrl(originInvoker);
 1     /**
 2      * 1 獲取originInvoker的export參數值:就是providerUrl
 3      * 2 去除providerUrl中所有參數名是"."開頭的,然后去除參數monitor
 4      */
 5     private URL getRegistedProviderUrl(final Invoker<?> originInvoker) {
 6         URL providerUrl = getProviderUrl(originInvoker);
 7         //注冊中心看到的地址
 8         final URL registedProviderUrl = providerUrl.removeParameters(getFilteredKeys(providerUrl)).removeParameter(Constants.MONITOR_KEY);
 9         return registedProviderUrl;
10     }
11 
12     /**
13      * 從invoker的URL中的Map<String, String> parameters中獲取key為export的地址providerUrl:
14      */
15     private URL getProviderUrl(final Invoker<?> origininvoker) {
16         String export = origininvoker.getUrl().getParameterAndDecoded(Constants.EXPORT_KEY);
17         if (export == null || export.length() == 0) {
18             throw new IllegalArgumentException("The registry export url is null! registry: " + origininvoker.getUrl());
19         }
20         URL providerUrl = URL.valueOf(export);
21         return providerUrl;
22     }
23 
24     //過濾URL中不需要輸出的參數(以點號開頭的)
25     private static String[] getFilteredKeys(URL url) {
26         Map<String, String> params = url.getParameters();
27         if (params != null && !params.isEmpty()) {
28             List<String> filteredKeys = new ArrayList<String>();
29             for (Map.Entry<String, String> entry : params.entrySet()) {
30                 if (entry != null && entry.getKey() != null && entry.getKey().startsWith(Constants.HIDE_KEY_PREFIX)) {
31                     filteredKeys.add(entry.getKey());
32                 }
33             }
34             return filteredKeys.toArray(new String[filteredKeys.size()]);
35         } else {
36             return new String[]{};
37         }
38     }

最后得到的registedProviderUrl是:

  • dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=4758&side=provider&timestamp=1507289961588

 

三  注冊服務到zk

registry.register(registedProviderUrl);//創建節點(即注冊服務到zk上)

這里的registry是ZookeeperRegistry。register(registedProviderUrl)方法在ZookeeperRegistry的父類FailbackRegistry中實現。

1  FailbackRegistry.register(registedProviderUrl)

 1     @Override
 2     public void register(URL url) {
 3         if (destroyed.get()){
 4             return;
 5         }
 6         super.register(url);
 7         failedRegistered.remove(url);
 8         failedUnregistered.remove(url);
 9         try {
10             // 向服務器端發送注冊請求
11             doRegister(url);
12         } catch (Exception e) {
13             Throwable t = e;
14             // 如果開啟了啟動時檢測check=true,則直接拋出異常,不會加入到failedRegistered中
15             boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
16                     && url.getParameter(Constants.CHECK_KEY, true)
17                     && !Constants.CONSUMER_PROTOCOL.equals(url.getProtocol());
18             boolean skipFailback = t instanceof SkipFailbackWrapperException;
19             if (check || skipFailback) {
20                 if (skipFailback) {
21                     t = t.getCause();
22                 }
23                 throw new IllegalStateException("Failed to register " + url + " to registry " + getUrl().getAddress() + ", cause: " + t.getMessage(), t);
24             } else {
25                 logger.error("Failed to register " + url + ", waiting for retry, cause: " + t.getMessage(), t);
26             }
27             // 將失敗的注冊請求記錄到失敗列表,定時重試
28             failedRegistered.add(url);
29         }
30     }

首先調用父類AbstractRegistry的register(registedProviderUrl)將當前的registeredProviderUrl放到Set<URL> registered屬性中,如下:

1     public void register(URL url) {
2         if (url == null) {
3             throw new IllegalArgumentException("register url == null");
4         }
5         if (logger.isInfoEnabled()) {
6             logger.info("Register: " + url);
7         }
8         registered.add(url);
9     }

之后,從failedRegistered和failedUnregistered兩個url集合中刪除該url。然后執行真正的服務注冊(創建節點,doRegister(url)),如果在創建過程中拋出異常,如果url的協議不是consumer並且開啟了check=true的屬性並且當前存儲的URL registryUrl也有check=true的話,那么直接拋出異常,不會將該url加入到failedRegistered集合;當然拋出的異常如果是SkipFailbackWrapperException,那么也會直接拋出異常,不會將該url加入到failedRegistered集合。否則,會將該url加入到failedRegistered集合,然后DubboRegistryFailedRetryTimer線程會每隔5s執行一次doRegister(url)。

 

我們來看真正doRegister(url)。

2  ZookeeperRegistry.doRegister(registedProviderUrl)

1     protected void doRegister(URL url) {
2         try {
3             zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true));
4         } catch (Throwable e) {
5             throw new RpcException("Failed to register " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
6         }
7     }

首先是對入參registedProviderUrl進行一頓處理,

 1     private String toUrlPath(URL url) {
 2         return toCategoryPath(url) + Constants.PATH_SEPARATOR + URL.encode(url.toFullString());
 3     }
 4 
 5     private String toCategoryPath(URL url) {
 6         return toServicePath(url) + Constants.PATH_SEPARATOR + url.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
 7     }
 8 
 9     private String toServicePath(URL url) {
10         String name = url.getServiceInterface();
11         if (Constants.ANY_VALUE.equals(name)) {
12             return toRootPath();
13         }
14         return toRootDir() + URL.encode(name);// /dubbo/com.alibaba.dubbo.demo.DemoService
15     }
16 
17     private String toRootDir() {
18         if (root.equals(Constants.PATH_SEPARATOR)) {
19             return root;
20         }
21         return root + Constants.PATH_SEPARATOR;// /dubbo/
22     }
23 
24     private String toRootPath() {
25         return root;
26     }

這里就體現了上邊的ZookeeperRegistry的root屬性的作用。最終實際上得到的是:/dubbo/interface/category/encode過的export,該節點也將是創建在zk上的節點。

  • /dubbo是根節點
  • /interface是服務接口
  • /category是providers/consumers/routers/configurators等

最終得到的url是:

  • /dubbo/com.alibaba.dubbo.demo.DemoService/providers/dubbo%3A%2F%2F10.10.10.10%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D5148%26side%3Dprovider%26timestamp%3D1507291294629
  • 解碼后:/dubbo/com.alibaba.dubbo.demo.DemoService/providers/dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=5148&side=provider&timestamp=1507291294629

 

最后執行zkClient.create(toUrlPath(url), url.getParameter(Constants.DYNAMIC_KEY, true))來創建節點,該方法由CuratorZookeeperClient的父類AbstractZookeeperClient來執行:

 1     public void create(String path, boolean ephemeral) {
 2         int i = path.lastIndexOf('/');
 3         if (i > 0) {
 4             create(path.substring(0, i), false);
 5         }
 6         if (ephemeral) {
 7             createEphemeral(path);
 8         } else {
 9             createPersistent(path);
10         }
11     }

這里實際上是通過遞歸分別創建持久化的/dubbo,/dubbo/com.alibaba.dubbo.demo.DemoService以及/dubbo/com.alibaba.dubbo.demo.DemoService/providers節點;最后創建臨時節點/dubbo/com.alibaba.dubbo.demo.DemoService/providers/dubbo%3A%2F%2F10.10.10.10%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D5148%26side%3Dprovider%26timestamp%3D1507291294629,而實際上,如果使用了curator的話,可以直接使用遞歸創建節點即可(結合zk的特性,只有最后一個字節點可以是臨時節點,父節點一定是持久化節點),這里這樣的寫法應該是兼容不能遞歸創建節點的Zkclient客戶端。值得注意的是,url.getParameter(Constants.DYNAMIC_KEY, true)為true則最終創建的節點是臨時節點,否則是持久化節點。

創建節點的操作是在CuratorZookeeperClient中進行的。

 1     public void createPersistent(String path) {
 2         try {
 3             client.create().forPath(path);
 4         } catch (NodeExistsException e) {
 5         } catch (Exception e) {
 6             throw new IllegalStateException(e.getMessage(), e);
 7         }
 8     }
 9 
10     public void createEphemeral(String path) {
11         try {
12             client.create().withMode(CreateMode.EPHEMERAL).forPath(path);
13         } catch (NodeExistsException e) {
14         } catch (Exception e) {
15             throw new IllegalStateException(e.getMessage(), e);
16         }
17     }

 

到此為止,我們去zk上看一下節點的創建情況。

或者從zkui上看一下:

隱藏掉的是ip:10.10.10.10。

 

到目前為止,我們再來看看ZookeeperRegistry的屬性變化。相較於注冊前:

  • Set<URL> registered:[ dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=5214&side=provider&timestamp=1507293238549 ]


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM