7.7 服務遠程暴露 - 訂閱與通知(TODO)


為了安全:服務啟動的ip全部使用10.10.10.10

遠程服務的暴露總體步驟:

  • 將ref封裝為invoker
  • 將invoker轉換為exporter
  • 啟動netty
  • 注冊服務到zookeeper
  • 訂閱與通知
  • 返回新的exporter實例

7.4 服務遠程暴露 - 創建Exporter與啟動netty服務端中,實現了前三步,在7.6 服務遠程暴露 - 注冊服務到zookeeper實現了第四步。本節實現第五步:訂閱。總體代碼如下:RegistryProtocol.export(final Invoker<T> originInvoker)

1         // 訂閱override數據
2         // FIXME 提供者訂閱時,會影響同一JVM即暴露服務,又引用同一服務的的場景,因為subscribed以服務名為緩存的key,導致訂閱信息覆蓋。
3         final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
4         final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
5         overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
6         registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

說明:

  • 第一句代碼根據registedProviderUrl來獲取overrideSubscribeUrl。
  • 第二句代碼創建overrideSubscribeListener
  • 第三句代碼將{ overrideSubscribeUrl : overrideSubscribeListener放入緩存 }
  • 第四句代碼實現真正的訂閱與通知

 

一  獲取overrideSubscribeUrl

final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
1     /**
2      * 1 將協議改為provider;
3      * 2 添加參數:category=configurators和check=false;
4      */
5     private URL getSubscribedOverrideUrl(URL registedProviderUrl) {
6         return registedProviderUrl.setProtocol(Constants.PROVIDER_PROTOCOL)
7                                   .addParameters(Constants.CATEGORY_KEY, Constants.CONFIGURATORS_CATEGORY, Constants.CHECK_KEY, String.valueOf(false));
8     }

開始時的registedProviderUrl如下:

  • dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=5259&side=provider&timestamp=1507294508053

最終的overrideSubscribeUrl如下:

  • provider://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=5259&side=provider&timestamp=1507294508053

 

二  創建overrideSubscribeListener

final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);

overrideSubscribeListener是RegistryProtocol的內部類,來看一下聲明和屬性:

1     private class OverrideListener implements NotifyListener {
2         private final URL subscribeUrl;
3         private final Invoker originInvoker;
4 
5         public OverrideListener(URL subscribeUrl, Invoker originalInvoker) {
6             this.subscribeUrl = subscribeUrl;
7             this.originInvoker = originalInvoker;
8         }

這里創建出來的OverrideListener實例屬性如下:

  • subscribeUrl:provider://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=1818&side=provider&timestamp=1507366969962
  • originInvoker:該實例還是在ServiceConfig.doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs)創建出來的AbstractProxyInvoker實例(具體見7.4 服務遠程暴露 - 創建Exporter與啟動netty服務端
    • proxy:DemoServiceImpl實例
    • type:Class<com.alibaba.dubbo.demo.DemoService>
    • url:registry://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&export=dubbo%3A%2F%2F10.10.10.10%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D993%26side%3Dprovider%26timestamp%3D1507100322516&pid=993&registry=zookeeper&timestamp=1507100319830

最后,將創建出來的OverrideListener實例存儲在RegistryProtocol的屬性Map<URL, NotifyListener> overrideListeners中:

  • key: (overrideSubscribeUrl,也就是subscribeUrl) provider://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=1818&side=provider&timestamp=1507366969962
  • value:  上述的OverrideListener實例

 

三  真正的訂閱

registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);

這里的registry是ZookeeperRegistry實例,subscribe(URL url, NotifyListener listener)方法在其父類FailbackRegistry中,如下:

 1     @Override
 2     public void subscribe(URL url, NotifyListener listener) {
 3         if (destroyed.get()){
 4             return;
 5         }
 6         super.subscribe(url, listener);
 7         removeFailedSubscribed(url, listener);
 8         try {
 9             // 向服務器端發送訂閱請求
10             doSubscribe(url, listener);
11         } catch (Exception e) {
12             Throwable t = e;
13 
14             List<URL> urls = getCacheUrls(url);
15             if (urls != null && urls.size() > 0) {
16                 notify(url, listener, urls);
17                 logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
18             } else {
19                 // 如果開啟了啟動時檢測check=true,則直接拋出異常
20                 boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
21                         && url.getParameter(Constants.CHECK_KEY, true);
22                 boolean skipFailback = t instanceof SkipFailbackWrapperException;
23                 if (check || skipFailback) {
24                     if (skipFailback) {
25                         t = t.getCause();
26                     }
27                     throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
28                 } else {
29                     logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
30                 }
31             }
32             // 將失敗的訂閱請求記錄到失敗列表,定時重試
33             addFailedSubscribed(url, listener);
34         }
35     }

步驟:

  • 首先調用其父類AbstractRegistry的方法,將之前創建出來的overrideSubscribeListener實例加入到overrideSubscribeUrl所對應的監聽器集合中;
  • 然后從failedSubscribed/failedUnsubscribed中overrideSubscribeUrl所對應的監聽器集合中刪除overrideSubscribeListener實例;從failedNotified獲取當前url的通知失敗map Map<NotifyListener, List<URL>>,之后從中刪除掉該NotifyListener實例以及其需要通知的所有的url。
  • 之后使用具體的子類(這里是ZookeeperRegistry)向服務器端發送訂閱請求
  • 如果在訂閱的過程中拋出了異常,那么嘗試獲取緩存url,如果有緩存url,則進行失敗通知,之后“將失敗的訂閱請求記錄到失敗列表,定時重試”,如果沒有緩存url,如果開啟了啟動時檢測或者直接拋出的異常是SkipFailbackWrapperException,則直接拋出異常,不會“將失敗的訂閱請求記錄到失敗列表,定時重試”

 

將之前創建出來的overrideSubscribeListener實例加入到overrideSubscribeUrl所對應的監聽器集合中

 1    private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>();//已經訂閱的<URL, Set<NotifyListener>>
 2     
 3      /**
 4      * 首先從ConcurrentMap<URL, Set<NotifyListener>> subscribed中獲取key為url的集合Set<NotifyListener>,
 5      * 如果該集合存在,直接將當前的NotifyListener實例存入該集合,
 6      * 如果集合不存在,先創建,之后放入subscribed中,並將當前的NotifyListener實例存入剛剛創建的集合
 7      *
 8      * @param url      訂閱條件,不允許為空,如:consumer://10.10.10.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
 9      * @param listener 變更事件監聽器,不允許為空
10      */
11     public void subscribe(URL url, NotifyListener listener) {
12         if (url == null) {
13             throw new IllegalArgumentException("subscribe url == null");
14         }
15         if (listener == null) {
16             throw new IllegalArgumentException("subscribe listener == null");
17         }
18         if (logger.isInfoEnabled()) {
19             logger.info("Subscribe: " + url);
20         }
21         Set<NotifyListener> listeners = subscribed.get(url);
22         if (listeners == null) {
23             subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
24             listeners = subscribed.get(url);
25         }
26         listeners.add(listener);
27     }

從失敗集合移除overrideSubscribeListener實例

 1     /**
 2      * 1 從ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed 中獲取當前url的訂閱失敗列表Set<NotifyListener>,之后從中刪除掉該NotifyListener實例;
 3      * 2 從ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed 中獲取當前url的反訂閱失敗列表Set<NotifyListener>,之后從中刪除掉該NotifyListener實例;
 4      * 3 從ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified 中獲取當前url的通知失敗map Map<NotifyListener, List<URL>>,之后從中刪除掉該NotifyListener實例以及其需要通知的所有的url。
 5      *
 6      * @param url
 7      * @param listener
 8      */
 9     private void removeFailedSubscribed(URL url, NotifyListener listener) {
10         Set<NotifyListener> listeners = failedSubscribed.get(url);
11         if (listeners != null) {
12             listeners.remove(listener);
13         }
14         listeners = failedUnsubscribed.get(url);
15         if (listeners != null) {
16             listeners.remove(listener);
17         }
18         Map<NotifyListener, List<URL>> notified = failedNotified.get(url);
19         if (notified != null) {
20             notified.remove(listener);
21         }
22     }

ZookeeperRegistry.doSubscribe(final URL url, final NotifyListener listener)

 1 protected void doSubscribe(final URL url, final NotifyListener listener) {
 2         try {
 3             if (Constants.ANY_VALUE.equals(url.getServiceInterface())) {//這條分支先不說
 4                 String root = toRootPath();
 5                 ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
 6                 if (listeners == null) {
 7                     zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
 8                     listeners = zkListeners.get(url);
 9                 }
10                 ChildListener zkListener = listeners.get(listener);
11                 if (zkListener == null) {
12                     listeners.putIfAbsent(listener, new ChildListener() {
13                         public void childChanged(String parentPath, List<String> currentChilds) {
14                             for (String child : currentChilds) {
15                                 child = URL.decode(child);
16                                 if (!anyServices.contains(child)) {
17                                     anyServices.add(child);
18                                     subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
19                                             Constants.CHECK_KEY, String.valueOf(false)), listener);
20                                 }
21                             }
22                         }
23                     });
24                     zkListener = listeners.get(listener);
25                 }
26                 zkClient.create(root, false);
27                 List<String> services = zkClient.addChildListener(root, zkListener);
28                 if (services != null && services.size() > 0) {
29                     for (String service : services) {
30                         service = URL.decode(service);
31                         anyServices.add(service);
32                         subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
33                                 Constants.CHECK_KEY, String.valueOf(false)), listener);
34                     }
35                 }
36             } else {
37                 /**
38                  * ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners
39                  * 1 根據url獲取ConcurrentMap<NotifyListener, ChildListener>,沒有就創建
40                  * 2 根據listener從ConcurrentMap<NotifyListener, ChildListener>獲取ChildListener,沒有就創建(創建的ChildListener用來監聽子節點的變化)
41                  * 3 創建path持久化節點
42                  * 4 創建path子節點監聽器
43                  */
44                 List<URL> urls = new ArrayList<URL>();
45                 for (String path : toCategoriesPath(url)) {
46                     ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
47                     if (listeners == null) {
48                         zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
49                         listeners = zkListeners.get(url);
50                     }
51                     ChildListener zkListener = listeners.get(listener);
52                     if (zkListener == null) {
53                         listeners.putIfAbsent(listener, new ChildListener() {
54                             //監聽子節點列表的變化
55                             public void childChanged(String parentPath, List<String> currentChilds) {
56                                 ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
57                             }
58                         });
59                         zkListener = listeners.get(listener);
60                     }
61                     zkClient.create(path, false);//創建持久化節點/dubbo/com.alibaba.dubbo.demo.DemoService/configurators
62                     List<String> children = zkClient.addChildListener(path, zkListener);
63                     if (children != null) {
64                         urls.addAll(toUrlsWithEmpty(url, path, children));
65                     }
66                 }
67                 notify(url, listener, urls);
68             }
69         } catch (Throwable e) {
70             throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
71         }
72     }

說明:

  • url(overrideSubscribeUrl):provider://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=9544&side=provider&timestamp=1507643800076
  • listener:之前創建出來的overrideSubscribeListener實例

步驟:

  • 首先獲取categorypath:實際上就是獲取/dubbo/{servicename}/{url中的category參數,默認是providers,這里是final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);這句代碼中添加到overrideSubscribeUrl上的category=configurators}
 1     private String[] toCategoriesPath(URL url) {
 2         String[] categroies;
 3         if (Constants.ANY_VALUE.equals(url.getParameter(Constants.CATEGORY_KEY))) {
 4             categroies = new String[]{Constants.PROVIDERS_CATEGORY, Constants.CONSUMERS_CATEGORY,
 5                     Constants.ROUTERS_CATEGORY, Constants.CONFIGURATORS_CATEGORY};
 6         } else {
 7             categroies = url.getParameter(Constants.CATEGORY_KEY, new String[]{Constants.DEFAULT_CATEGORY});
 8         }
 9         String[] paths = new String[categroies.length];
10         for (int i = 0; i < categroies.length; i++) {
11             paths[i] = toServicePath(url) + Constants.PATH_SEPARATOR + categroies[i];
12         }
13         return paths; // /dubbo/com.alibaba.dubbo.demo.DemoService/configurators
14     }
  • 然后就是獲取並創建:ConcurrentMap<overrideSubscribeUrl, ConcurrentMap<overrideSubscribeListener實例, ChildListener>> zkListeners,這里創建出來的ChildListener實例中的childChanged(String parentPath, List<String> currentChilds)方法實際上就是最終當parentPath(實際上就是上邊的categorypath)下的currentChilds發生變化時,執行的邏輯。
  • 之后創建持久化節點:/dubbo/com.alibaba.dubbo.demo.DemoService/configurators
  • 然后使用AbstractZookeeperClient<TargetChildListener>的addChildListener(String path, final ChildListener listener)方法為path下的子節點添加上邊創建出來的內部類ChildListener實例
  • 最后進行通知

AbstractZookeeperClient<TargetChildListener>.addChildListener(String path, final ChildListener listener)

 1     /**
 2      *  1 根據path從ConcurrentMap<String, ConcurrentMap<ChildListener, TargetChildListener>> childListeners獲取ConcurrentMap<ChildListener, TargetChildListener>,沒有就創建
 3      *  2 根據ChildListener獲取TargetChildListener,沒有就創建,TargetChildListener是真正的監聽path的子節點變化的監聽器
 4      *  createTargetChildListener(String path, final ChildListener listener):創建一個真正的用來執行當path節點的子節點發生變化時的邏輯
 5      *  3 addTargetChildListener(path, targetListener):將剛剛創建出來的子節點監聽器訂閱path的變化,這樣之后,path的子節點發生了變化時,TargetChildListener才會執行相應的邏輯。
 6      *  而實際上TargetChildListener又會調用ChildListener的實現類的childChanged(String parentPath, List<String> currentChilds)方法,而該實現類,正好是ZookeeperRegistry中實現的匿名內部類,
 7      *  在該匿名內部類的childChanged(String parentPath, List<String> currentChilds)方法中,調用了ZookeeperRegistry.notify(URL url, NotifyListener listener, List<URL> urls)
 8      */
 9     public List<String> addChildListener(String path, final ChildListener listener) {
10         ConcurrentMap<ChildListener, TargetChildListener> listeners = childListeners.get(path);
11         if (listeners == null) {
12             childListeners.putIfAbsent(path, new ConcurrentHashMap<ChildListener, TargetChildListener>());
13             listeners = childListeners.get(path);
14         }
15         TargetChildListener targetListener = listeners.get(listener);
16         if (targetListener == null) {
17             listeners.putIfAbsent(listener, createTargetChildListener(path, listener));
18             targetListener = listeners.get(listener);
19         }
20         return addTargetChildListener(path, targetListener);
21     }

步驟:

  • 首先是一頓獲取和創建:ConcurrentMap<categorypath, ConcurrentMap<ZookeeperRegistry的內部類ChildListener實例, TargetChildListener>> childListeners,這里主要是創建TargetChildListener;
  • 之后是真正的為path添加TargetChildListener實例。

CuratorZookeeperClient.createTargetChildListener(path, listener)

 1     public CuratorWatcher createTargetChildListener(String path, ChildListener listener) {
 2         return new CuratorWatcherImpl(listener);
 3     }
 4 
 5     private class CuratorWatcherImpl implements CuratorWatcher {
 6 
 7         private volatile ChildListener listener;
 8 
 9         public CuratorWatcherImpl(ChildListener listener) {
10             this.listener = listener;
11         }
12 
13         public void unwatch() {
14             this.listener = null;
15         }
16 
17         public void process(WatchedEvent event) throws Exception {
18             if (listener != null) {
19                 listener.childChanged(event.getPath(), client.getChildren().usingWatcher(this).forPath(event.getPath()));
20             }
21         }
22     }

很簡單,就是創建一個監聽path子節點的watcher,當path下有子節點變化時,調用listener(即傳入的ZookeeperRegistry的內部類ChildListener實例的childChanged(String parentPath, List<String> currentChilds)方法)。

CuratorZookeeperClient.addTargetChildListener(String path, CuratorWatcher targetChildListener)

1     public List<String> addTargetChildListener(String path, CuratorWatcher listener) {
2         try {
3             return client.getChildren().usingWatcher(listener).forPath(path);
4         } catch (NoNodeException e) {
5             return null;
6         } catch (Exception e) {
7             throw new IllegalStateException(e.getMessage(), e);
8         }
9     }

從上邊的分析我們可以看出,當path節點下的子節點發生變化的時候,會首先調用TargetChildListener的process(WatchedEvent event)方法,在該方法中又會調用ChildListener實例的childChanged(String parentPath, List<String> currentChilds)方法,那么我們來分析一下該方法:

1                             //監聽子節點列表的變化
2                             public void childChanged(String parentPath, List<String> currentChilds) {
3                                 ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
4                             }

步驟:

  • 首先獲取子節點urls或者是一個consumer的empty協議的url
     1     /**
     2      * 過濾出providers中與consumer匹配的url集合
     3      */
     4     private List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) {
     5         List<URL> urls = new ArrayList<URL>();
     6         if (providers != null && providers.size() > 0) {
     7             for (String provider : providers) {
     8                 provider = URL.decode(provider);
     9                 if (provider.contains("://")) {
    10                     URL url = URL.valueOf(provider);
    11                     if (UrlUtils.isMatch(consumer, url)) {
    12                         urls.add(url);
    13                     }
    14                 }
    15             }
    16         }
    17         return urls;
    18     }
    19 
    20     /**
    21      * 1 首先過濾出providers中與consumer匹配的providerUrl集合
    22      * 2 如果providerUrl集合不為空,直接返回這個集合
    23      * 3 如果為空,首先從path中獲取category,然后將consumer的協議換成empty,添加參數category=configurators
    24      * @param consumer provider://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=9544&side=provider&timestamp=1507643800076
    25      * @param path /dubbo/com.alibaba.dubbo.demo.DemoService/configurators
    26      * @param providers 
    27      */
    28     private List<URL> toUrlsWithEmpty(URL consumer, String path, List<String> providers) {
    29         List<URL> urls = toUrlsWithoutEmpty(consumer, providers);
    30         if (urls == null || urls.isEmpty()) {
    31             int i = path.lastIndexOf('/');
    32             String category = i < 0 ? path : path.substring(i + 1);//configurators
    33             URL empty = consumer.setProtocol(Constants.EMPTY_PROTOCOL).addParameter(Constants.CATEGORY_KEY, category);
    34             urls.add(empty);
    35         }
    36         return urls; // empty://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=1237&side=provider&timestamp=1507352638483
    37     }
  • 之后調用ZookeeperRegistry的父類FailbackRegistry.notify(URL url, NotifyListener listener, List<URL> urls)
     1     @Override
     2     protected void notify(URL url, NotifyListener listener, List<URL> urls) {
     3         if (url == null) {
     4             throw new IllegalArgumentException("notify url == null");
     5         }
     6         if (listener == null) {
     7             throw new IllegalArgumentException("notify listener == null");
     8         }
     9         try {
    10             doNotify(url, listener, urls);
    11         } catch (Exception t) {
    12             // 將失敗的通知請求記錄到失敗列表,定時重試
    13             Map<NotifyListener, List<URL>> listeners = failedNotified.get(url);
    14             if (listeners == null) {
    15                 failedNotified.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, List<URL>>());
    16                 listeners = failedNotified.get(url);
    17             }
    18             listeners.put(listener, urls);
    19             logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
    20         }
    21     }
    22 
    23     protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
    24         super.notify(url, listener, urls);
    25     }

    說明:這里傳入的

    • url(overrideSubscribeUrl):provider://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=9544&side=provider&timestamp=1507643800076

    • listener:之前創建出來的overrideSubscribeListener實例
    • urls:[ empty://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=9544&side=provider&timestamp=1507643800076 ]
    • 這里首先執行父類的AbstractRegistry.notify(URL url, NotifyListener listener, List<URL> urls),如果失敗,則獲取或創建ConcurrentMap<overrideSubscribeUrl, Map<overrideSubscribeListener實例, urls>> failedNotified,后續做重試

來看一下通知的最核心部分:

AbstractRegistry.notify(URL url, NotifyListener listener, List<URL> urls)

 1     /**
 2      * 1 首先遍歷List<URL> urls,將urls按照category進行分類,存儲在Map<"categoryName", List<URL>> result中;
 3      * 2 之后遍歷result:(每遍歷一次,都是一個新的category)
 4      * (1)將Map<"categoryName", List<URL>>存儲在ConcurrentMap<URL, Map<String, List<URL>>> notified的Map<String, List<URL>>中
 5      * (2)進行properties設置和文件保存
 6      * (3)調用傳入放入listener的notify()方法。
 7      * @param url
 8      * @param listener
 9      * @param urls
10      */
11     protected void notify(URL url, NotifyListener listener, List<URL> urls) {
12         if (url == null) {
13             throw new IllegalArgumentException("notify url == null");
14         }
15         if (listener == null) {
16             throw new IllegalArgumentException("notify listener == null");
17         }
18         if ((urls == null || urls.size() == 0)
19                 && !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
20             logger.warn("Ignore empty notify urls for subscribe url " + url);
21             return;
22         }
23         if (logger.isInfoEnabled()) {
24             logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
25         }
26         /**
27          * 遍歷List<URL> urls,將urls按照category進行分類
28          */
29         Map<String, List<URL>> result = new HashMap<String, List<URL>>(); //{ "categoryName" : List<URL> }
30         for (URL u : urls) {
31             if (UrlUtils.isMatch(url, u)) {
32                 String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
33                 List<URL> categoryList = result.get(category);
34                 if (categoryList == null) {
35                     categoryList = new ArrayList<URL>();
36                     result.put(category, categoryList);
37                 }
38                 categoryList.add(u);
39             }
40         }
41         if (result.size() == 0) {
42             return;
43         }
44         Map<String, List<URL>> categoryNotified = notified.get(url);
45         if (categoryNotified == null) {
46             notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
47             categoryNotified = notified.get(url);
48         }
49         for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
50             String category = entry.getKey();
51             List<URL> categoryList = entry.getValue();
52             categoryNotified.put(category, categoryList);//填充notified集合
53             saveProperties(url);//該行代碼為什么不寫在循環體外邊
54             listener.notify(categoryList);
55         }
56     }

說明:這里傳入的

  • url(overrideSubscribeUrl):provider://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=9544&side=provider&timestamp=1507643800076

  • listener:之前創建出來的overrideSubscribeListener實例
  • urls:[ empty://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=9544&side=provider&timestamp=1507643800076 ]

步驟:

  • 首先遍歷List<URL> urls,將urls按照category進行分類,存儲在Map<"categoryName", List<URL>> result中;
  • 然后獲取或創建ConcurrentMap<overrideSubscribeUrl, Map<"categoryName", subList(urls)>> notified
  • 最后遍歷Map<"categoryName", List<URL>> result
    • 去填充notified集合
    • 保存傳入的url到Properties properties(本地磁盤緩存中)
    • 調用傳入的listener的notify方法(注意:這里調用的正是文章開頭創建的overrideSubscribeListener實例的notify方法)

AbstractRegistry.saveProperties(URL url)

 1     /**
 2      * 1 按照url從將ConcurrentMap<URL, Map<String, List<URL>>> notified中將Map<String, List<URL>>拿出來,之后將所有category的list組成一串buf(以空格分隔)
 3      * 2 將< serviceKey<->buf >寫入本地磁盤緩存中:Properties properties
 4      * 3 將AtomicLong lastCacheChanged加1
 5      * 4 之后根據syncSaveFile判斷時同步保存properties到文件,還是異步保存properties到文件
 6      * @param url
 7      */
 8     private void saveProperties(URL url) {
 9         if (file == null) {
10             return;
11         }
12 
13         try {
14             StringBuilder buf = new StringBuilder();
15             Map<String, List<URL>> categoryNotified = notified.get(url);
16             if (categoryNotified != null) {
17                 for (List<URL> us : categoryNotified.values()) {
18                     for (URL u : us) {
19                         if (buf.length() > 0) {
20                             buf.append(URL_SEPARATOR);
21                         }
22                         buf.append(u.toFullString());
23                     }
24                 }
25             }
26             properties.setProperty(url.getServiceKey(), buf.toString());
27             long version = lastCacheChanged.incrementAndGet();
28             if (syncSaveFile) {
29                 doSaveProperties(version);
30             } else {
31                 registryCacheExecutor.execute(new SaveProperties(version));
32             }
33         } catch (Throwable t) {
34             logger.warn(t.getMessage(), t);
35         }
36     }

說明:

  • 入參:url:provider://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=5033&side=provider&timestamp=1507720343596
  • properties:{ "com.alibaba.dubbo.demo.DemoService" -> "empty://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=5033&side=provider&timestamp=1507720343596" }
  • 最后采用異步線程將properties中的內容寫入到文件中

AbstractRegistry$SaveProperties

 1     private class SaveProperties implements Runnable {
 2         private long version;
 3 
 4         private SaveProperties(long version) {
 5             this.version = version;
 6         }
 7 
 8         public void run() {
 9             doSaveProperties(version);
10         }
11     }

AbstractRegistry.doSaveProperties(long version)

 1     /**
 2      * 1 先將文件中的內容讀取到一個新的Properties newProperties中;
 3      * 2 之后將properties中的信息寫入這個newProperties中;
 4      * 3 之后創建dubbo-registry-10.211.55.5.cache.lock文件;
 5      * 4 最后將這個newProperties中的內容寫入到文件中
 6      */
 7     public void doSaveProperties(long version) {
 8         if (version < lastCacheChanged.get()) {
 9             return;
10         }
11         if (file == null) {
12             return;
13         }
14         Properties newProperties = new Properties();
15         // 保存之前先讀取一遍,防止多個注冊中心之間沖突
16         InputStream in = null;
17         try {
18             if (file.exists()) {
19                 in = new FileInputStream(file);
20                 newProperties.load(in);
21             }
22         } catch (Throwable e) {
23             logger.warn("Failed to load registry store file, cause: " + e.getMessage(), e);
24         } finally {
25             if (in != null) {
26                 try {
27                     in.close();
28                 } catch (IOException e) {
29                     logger.warn(e.getMessage(), e);
30                 }
31             }
32         }
33         // 保存
34         try {
35             newProperties.putAll(properties);
36             File lockfile = new File(file.getAbsolutePath() + ".lock");
37             if (!lockfile.exists()) {
38                 lockfile.createNewFile();//創建lock文件
39             }
40             RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
41             try {
42                 FileChannel channel = raf.getChannel();
43                 try {
44                     FileLock lock = channel.tryLock();
45                     if (lock == null) {
46                         throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties");
47                     }
48                     // 保存
49                     try {
50                         if (!file.exists()) {
51                             file.createNewFile();
52                         }
53                         FileOutputStream outputFile = new FileOutputStream(file);
54                         try {
55                             newProperties.store(outputFile, "Dubbo Registry Cache");
56                         } finally {
57                             outputFile.close();
58                         }
59                     } finally {
60                         lock.release();
61                     }
62                 } finally {
63                     channel.close();
64                 }
65             } finally {
66                 raf.close();
67             }
68         } catch (Throwable e) {
69             if (version < lastCacheChanged.get()) {
70                 return;
71             } else {
72                 registryCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet()));
73             }
74             logger.warn("Failed to save registry store file, cause: " + e.getMessage(), e);
75         }
76     }

步驟見注釋。這里有一個version,實際上是一個CAS判斷,我們在saveProperties(URL url)方法中執行了long version = lastCacheChanged.incrementAndGet();之后在doSaveProperties(long version)進行if (version < lastCacheChanged.get())判斷,如果滿足這個條件,說明當前線程在進行doSaveProperties(long version)時,已經有其他線程執行了saveProperties(URL url),馬上就要執行doSaveProperties(long version),所以當前線程放棄操作,讓后邊的這個線程來做保存操作。

保存操作執行之后,會在文件夾/Users/jigangzhao/.dubbo下生成兩個文件:

  • dubbo-registry-10.211.55.5.cache
  • dubbo-registry-10.211.55.5.cache.lock

前者的內容:

#Wed Oct 11 19:42:29 CST 2017
com.alibaba.dubbo.demo.DemoService=empty\://10.10.10.10\:20880/com.alibaba.dubbo.demo.DemoService?anyhost\=true&application\=demo-provider&category\=configurators&check\=false&dubbo\=2.0.0&generic\=false&interface\=com.alibaba.dubbo.demo.DemoService&methods\=sayHello&pid\=5165&side\=provider&timestamp\=1507722024953

最后就是OverrideListener.notify(List<URL> urls)

 1     /**
 2      * 重新export
 3      * 1.protocol中的exporter destroy問題
 4      * 1.要求registryprotocol返回的exporter可以正常destroy
 5      * 2.notify后不需要重新向注冊中心注冊
 6      * 3.export 方法傳入的invoker最好能一直作為exporter的invoker.
 7      */
 8     private class OverrideListener implements NotifyListener {
 9         private final URL subscribeUrl;
10         private final Invoker originInvoker;
11 
12         public OverrideListener(URL subscribeUrl, Invoker originalInvoker) {
13             this.subscribeUrl = subscribeUrl;
14             this.originInvoker = originalInvoker;
15         }
16 
17         /**
18          * 目的:
19          * 對原本注冊了的providerUrl進行校驗,如果url發生了變化,那么要重新export
20          *
21          * @param urls 已注冊信息列表,總不為空,含義同{@link com.alibaba.dubbo.registry.RegistryService#lookup(URL)}的返回值。
22          */
23         public synchronized void notify(List<URL> urls) {
24             logger.debug("original override urls: " + urls);
25             List<URL> matchedUrls = getMatchedUrls(urls, subscribeUrl);
26             logger.debug("subscribe url: " + subscribeUrl + ", override urls: " + matchedUrls);
27             //沒有匹配的
28             if (matchedUrls.isEmpty()) {
29                 return;
30             }
31 
32             List<Configurator> configurators = RegistryDirectory.toConfigurators(matchedUrls);//這里是一個空列表
33 
34             final Invoker<?> invoker;
35             if (originInvoker instanceof InvokerDelegete) {
36                 invoker = ((InvokerDelegete<?>) originInvoker).getInvoker();
37             } else {
38                 invoker = originInvoker;
39             }
40             //最原始的invoker
41             URL originUrl = RegistryProtocol.this.getProviderUrl(invoker);//dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=5279&side=provider&timestamp=1507723571451
42             String key = getCacheKey(originInvoker);//dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=5279&side=provider&timestamp=1507723571451
43             ExporterChangeableWrapper<?> exporter = bounds.get(key);//在doLocalExport方法中已經存放在這里了
44             if (exporter == null) {
45                 logger.warn(new IllegalStateException("error state, exporter should not be null"));
46                 return;
47             }
48             //當前的,可能經過了多次merge
49             URL currentUrl = exporter.getInvoker().getUrl();//dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=5279&side=provider&timestamp=1507723571451
50             //與本次配置merge的
51             URL newUrl = getConfigedInvokerUrl(configurators, originUrl);
52             if (!currentUrl.equals(newUrl)) {
53                 RegistryProtocol.this.doChangeLocalExport(originInvoker, newUrl);//重新將invoker暴露為exporter
54                 logger.info("exported provider url changed, origin url: " + originUrl + ", old export url: " + currentUrl + ", new export url: " + newUrl);
55             }
56         }
57 
58         private List<URL> getMatchedUrls(List<URL> configuratorUrls, URL currentSubscribe) {
59             List<URL> result = new ArrayList<URL>();
60             for (URL url : configuratorUrls) {
61                 URL overrideUrl = url;
62                 // 兼容舊版本
63                 if (url.getParameter(Constants.CATEGORY_KEY) == null
64                         && Constants.OVERRIDE_PROTOCOL.equals(url.getProtocol())) {
65                     overrideUrl = url.addParameter(Constants.CATEGORY_KEY, Constants.CONFIGURATORS_CATEGORY);
66                 }
67 
68                 //檢查是不是要應用到當前服務上
69                 if (UrlUtils.isMatch(currentSubscribe, overrideUrl)) {
70                     result.add(url);
71                 }
72             }
73             return result;
74         }
75 
76         //合並配置的url
77         private URL getConfigedInvokerUrl(List<Configurator> configurators, URL url) {
78             for (Configurator configurator : configurators) {
79                 url = configurator.configure(url);
80             }
81             return url;
82         }
83     }

最后:總結一下:

當前的provider訂閱了/dubbo/com.alibaba.dubbo.demo.DemoService/configurators,當其下的子節點發生變化時,如果其下的子節點的url或者當前的providerUrl發生了變化,需要重新暴露

重新暴露:

 1     /**
 2      * 對修改了url的invoker重新export
 3      *
 4      * @param originInvoker
 5      * @param newInvokerUrl
 6      */
 7     @SuppressWarnings("unchecked")
 8     private <T> void doChangeLocalExport(final Invoker<T> originInvoker, URL newInvokerUrl) {
 9         String key = getCacheKey(originInvoker);
10         final ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
11         if (exporter == null) {
12             logger.warn(new IllegalStateException("error state, exporter should not be null"));
13         } else {
14             final Invoker<T> invokerDelegete = new InvokerDelegete<T>(originInvoker, newInvokerUrl);
15             exporter.setExporter(protocol.export(invokerDelegete));
16         }
17     }

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM