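For security, every service IP address in this article is shown as 10.10.10.10.
The overall steps for exposing a remote service are:
- Wrap the ref in an invoker
- Convert the invoker into an exporter
- Start netty
- Register the service with zookeeper
- Subscribe and notify
- Return a new exporter instance
"7.4 Remote service exposure - creating the Exporter and starting the netty server" covered the first three steps, and "7.6 Remote service exposure - registering the service with zookeeper" covered the fourth. This section covers the fifth step: subscription. The relevant code, from RegistryProtocol.export(final Invoker<T> originInvoker), is: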
    // Subscribe to override data
    // FIXME When a provider subscribes, it affects the scenario where the same JVM both exposes and references the same service: subscribed uses the service name as its cache key, so the subscription information gets overwritten.
    final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
    final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
    overrideListeners.put(overrideSubscribeUrl, overrideSubscribeListener);
    registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
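Notes:
- The first statement derives overrideSubscribeUrl from registedProviderUrl.
- The second statement creates overrideSubscribeListener.
- The third statement puts { overrideSubscribeUrl : overrideSubscribeListener } into the cache.
- The fourth statement performs the actual subscription and notification.
Part 1: Obtaining overrideSubscribeUrl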
final URL overrideSubscribeUrl = getSubscribedOverrideUrl(registedProviderUrl);
    /**
     * 1 Change the protocol to provider;
     * 2 Add the parameters category=configurators and check=false.
     */
    private URL getSubscribedOverrideUrl(URL registedProviderUrl) {
        return registedProviderUrl.setProtocol(Constants.PROVIDER_PROTOCOL)
                .addParameters(Constants.CATEGORY_KEY, Constants.CONFIGURATORS_CATEGORY,
                        Constants.CHECK_KEY, String.valueOf(false));
    }
The initial registedProviderUrl is:
- dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=5259&side=provider&timestamp=1507294508053
The resulting overrideSubscribeUrl is:
- provider://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=5259&side=provider&timestamp=1507294508053
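The transformation from registedProviderUrl to overrideSubscribeUrl can be reproduced on plain strings. The following is only a minimal sketch: OverrideUrlSketch and toOverrideSubscribeUrl are hypothetical names, not Dubbo classes, and the parsing assumes a well-formed URL with simple key=value parameters.

```java
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper, not Dubbo code: mimics getSubscribedOverrideUrl on plain strings. */
class OverrideUrlSketch {

    static String toOverrideSubscribeUrl(String providerUrl) {
        // Assumes the input contains "://"; no validation is done in this sketch.
        int schemeEnd = providerUrl.indexOf("://");
        int queryStart = providerUrl.indexOf('?');
        String address = queryStart < 0
                ? providerUrl.substring(schemeEnd + 3)
                : providerUrl.substring(schemeEnd + 3, queryStart);

        // A TreeMap keeps parameters sorted by key, matching the order seen in the URLs above.
        Map<String, String> params = new TreeMap<>();
        if (queryStart >= 0) {
            for (String pair : providerUrl.substring(queryStart + 1).split("&")) {
                int eq = pair.indexOf('=');
                if (eq > 0) {
                    params.put(pair.substring(0, eq), pair.substring(eq + 1));
                }
            }
        }
        // The two parameters added by getSubscribedOverrideUrl.
        params.put("category", "configurators");
        params.put("check", "false");

        // The protocol is replaced with "provider".
        StringBuilder sb = new StringBuilder("provider://").append(address);
        char separator = '?';
        for (Map.Entry<String, String> e : params.entrySet()) {
            sb.append(separator).append(e.getKey()).append('=').append(e.getValue());
            separator = '&';
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(toOverrideSubscribeUrl(
                "dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService"
                        + "?application=demo-provider&side=provider"));
    }
}
```

Only the protocol and the two added parameters differ between input and output; everything else is carried over unchanged.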
Part 2: Creating overrideSubscribeListener
final OverrideListener overrideSubscribeListener = new OverrideListener(overrideSubscribeUrl, originInvoker);
OverrideListener is an inner class of RegistryProtocol. Its declaration and fields are:
    private class OverrideListener implements NotifyListener {
        private final URL subscribeUrl;
        private final Invoker originInvoker;

        public OverrideListener(URL subscribeUrl, Invoker originalInvoker) {
            this.subscribeUrl = subscribeUrl;
            this.originInvoker = originalInvoker;
        }
        // notify(List<URL> urls) is shown later in this article
    }
The OverrideListener instance created here has the following field values:
- subscribeUrl: provider://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=1818&side=provider&timestamp=1507366969962
- originInvoker: still the AbstractProxyInvoker instance created in ServiceConfig.doExportUrlsFor1Protocol(ProtocolConfig protocolConfig, List<URL> registryURLs) (see "7.4 Remote service exposure - creating the Exporter and starting the netty server"), with:
  - proxy: the DemoServiceImpl instance
  - type: Class<com.alibaba.dubbo.demo.DemoService>
  - url: registry://10.211.55.5:2181/com.alibaba.dubbo.registry.RegistryService?application=demo-provider&client=curator&dubbo=2.0.0&export=dubbo%3A%2F%2F10.10.10.10%3A20880%2Fcom.alibaba.dubbo.demo.DemoService%3Fanyhost%3Dtrue%26application%3Ddemo-provider%26dubbo%3D2.0.0%26generic%3Dfalse%26interface%3Dcom.alibaba.dubbo.demo.DemoService%26methods%3DsayHello%26pid%3D993%26side%3Dprovider%26timestamp%3D1507100322516&pid=993&registry=zookeeper&timestamp=1507100319830
Finally, the OverrideListener instance is stored in RegistryProtocol's field Map<URL, NotifyListener> overrideListeners:
- key: (overrideSubscribeUrl, i.e. subscribeUrl) provider://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=1818&side=provider&timestamp=1507366969962
- value: the OverrideListener instance described above
Part 3: The actual subscription
registry.subscribe(overrideSubscribeUrl, overrideSubscribeListener);
Here registry is a ZookeeperRegistry instance. The subscribe(URL url, NotifyListener listener) method lives in its parent class FailbackRegistry:
    @Override
    public void subscribe(URL url, NotifyListener listener) {
        if (destroyed.get()){
            return;
        }
        super.subscribe(url, listener);
        removeFailedSubscribed(url, listener);
        try {
            // Send the subscription request to the server
            doSubscribe(url, listener);
        } catch (Exception e) {
            Throwable t = e;

            List<URL> urls = getCacheUrls(url);
            if (urls != null && urls.size() > 0) {
                notify(url, listener, urls);
                logger.error("Failed to subscribe " + url + ", Using cached list: " + urls + " from cache file: " + getUrl().getParameter(Constants.FILE_KEY, System.getProperty("user.home") + "/dubbo-registry-" + url.getHost() + ".cache") + ", cause: " + t.getMessage(), t);
            } else {
                // If the startup check is enabled (check=true), throw the exception directly
                boolean check = getUrl().getParameter(Constants.CHECK_KEY, true)
                        && url.getParameter(Constants.CHECK_KEY, true);
                boolean skipFailback = t instanceof SkipFailbackWrapperException;
                if (check || skipFailback) {
                    if (skipFailback) {
                        t = t.getCause();
                    }
                    throw new IllegalStateException("Failed to subscribe " + url + ", cause: " + t.getMessage(), t);
                } else {
                    logger.error("Failed to subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
                }
            }
            // Record the failed subscription request in the failed list and retry periodically
            addFailedSubscribed(url, listener);
        }
    }
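Steps:
- First, call the subscribe method of the parent class AbstractRegistry, which adds the overrideSubscribeListener instance created earlier to the listener set for overrideSubscribeUrl.
- Then remove the overrideSubscribeListener instance from the listener sets for overrideSubscribeUrl in failedSubscribed and failedUnsubscribed. Also fetch from failedNotified the notification-failure map (Map<NotifyListener, List<URL>>) for the current url, and remove this NotifyListener together with all the URLs it was due to be notified of.
- Next, the concrete subclass (here ZookeeperRegistry) sends the subscription request to the server.
- If subscribing throws an exception, try to load the cached URLs. If cached URLs exist, notify the listener with them and log the error. If there are none and either the startup check is enabled (check=true on both the registry URL and the subscribe URL) or the exception is a SkipFailbackWrapperException, an IllegalStateException is thrown immediately and the request is not recorded for retry. Otherwise the error is only logged. In both non-throwing cases the failed subscription request is recorded in the failed list and retried periodically. Note that overrideSubscribeUrl carries check=false, so the check condition does not trigger for this subscription.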
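The three possible outcomes of a failed subscription can be laid out as a small decision function. This is a sketch of the branching only; SubscribeFailureSketch, Outcome, and onSubscribeFailure are hypothetical names, and the booleans stand in for the values the real method reads from the URLs and the exception.

```java
/** Hypothetical model of the failure handling in FailbackRegistry.subscribe; not Dubbo code. */
class SubscribeFailureSketch {

    enum Outcome { NOTIFY_CACHED_THEN_RETRY, THROW, LOG_THEN_RETRY }

    static Outcome onSubscribeFailure(boolean hasCachedUrls,
                                      boolean registryCheck,
                                      boolean urlCheck,
                                      boolean skipFailback) {
        if (hasCachedUrls) {
            // Cached URLs are pushed to the listener, then the request is queued for retry.
            return Outcome.NOTIFY_CACHED_THEN_RETRY;
        }
        // The startup check must be enabled on both the registry URL and the subscribe URL.
        boolean check = registryCheck && urlCheck;
        if (check || skipFailback) {
            // The exception propagates; nothing is queued for retry.
            return Outcome.THROW;
        }
        // Only logged, then queued for retry.
        return Outcome.LOG_THEN_RETRY;
    }

    public static void main(String[] args) {
        // overrideSubscribeUrl carries check=false, so without a cache the failure is logged and retried.
        System.out.println(onSubscribeFailure(false, true, false, false));
    }
}
```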
Adding the overrideSubscribeListener instance to the listener set for overrideSubscribeUrl
    private final ConcurrentMap<URL, Set<NotifyListener>> subscribed = new ConcurrentHashMap<URL, Set<NotifyListener>>(); // already subscribed <URL, Set<NotifyListener>>

    /**
     * First look up the Set<NotifyListener> keyed by url in ConcurrentMap<URL, Set<NotifyListener>> subscribed.
     * If the set exists, add the current NotifyListener instance to it directly.
     * If it does not exist, create it, put it into subscribed, and add the current NotifyListener instance to the new set.
     *
     * @param url      subscription condition, must not be null, e.g. consumer://10.10.10.10/com.alibaba.foo.BarService?version=1.0.0&application=kylin
     * @param listener change event listener, must not be null
     */
    public void subscribe(URL url, NotifyListener listener) {
        if (url == null) {
            throw new IllegalArgumentException("subscribe url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("subscribe listener == null");
        }
        if (logger.isInfoEnabled()) {
            logger.info("Subscribe: " + url);
        }
        Set<NotifyListener> listeners = subscribed.get(url);
        if (listeners == null) {
            subscribed.putIfAbsent(url, new ConcurrentHashSet<NotifyListener>());
            listeners = subscribed.get(url);
        }
        listeners.add(listener);
    }
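The get, putIfAbsent, get sequence above is a get-or-create idiom that recurs throughout this code (zkListeners and childListeners use it too). As a sketch of the same idea on Java 8 or later, computeIfAbsent performs the lookup and the insertion in one call. SubscribedMapSketch is a hypothetical class, and plain String and Object stand in for URL and NotifyListener.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

/** Hypothetical stand-in for the subscribed map; String replaces URL, Object replaces NotifyListener. */
class SubscribedMapSketch {

    private final ConcurrentMap<String, Set<Object>> subscribed = new ConcurrentHashMap<>();

    void subscribe(String url, Object listener) {
        // Creates the concurrent set on first use, then adds the listener to it.
        subscribed.computeIfAbsent(url, k -> ConcurrentHashMap.newKeySet()).add(listener);
    }

    int listenerCount(String url) {
        Set<Object> listeners = subscribed.get(url);
        return listeners == null ? 0 : listeners.size();
    }

    public static void main(String[] args) {
        SubscribedMapSketch registry = new SubscribedMapSketch();
        Object listener = new Object();
        registry.subscribe("provider://10.10.10.10:20880/DemoService", listener);
        registry.subscribe("provider://10.10.10.10:20880/DemoService", listener);
        // The same listener subscribed twice is stored once, because the value is a set.
        System.out.println(registry.listenerCount("provider://10.10.10.10:20880/DemoService"));
    }
}
```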
Removing the overrideSubscribeListener instance from the failure collections
    /**
     * 1 From ConcurrentMap<URL, Set<NotifyListener>> failedSubscribed, get the failed-subscription Set<NotifyListener> for the current url and remove this NotifyListener instance from it;
     * 2 From ConcurrentMap<URL, Set<NotifyListener>> failedUnsubscribed, get the failed-unsubscription Set<NotifyListener> for the current url and remove this NotifyListener instance from it;
     * 3 From ConcurrentMap<URL, Map<NotifyListener, List<URL>>> failedNotified, get the failed-notification Map<NotifyListener, List<URL>> for the current url and remove this NotifyListener instance together with all the urls it was due to be notified of.
     *
     * @param url
     * @param listener
     */
    private void removeFailedSubscribed(URL url, NotifyListener listener) {
        Set<NotifyListener> listeners = failedSubscribed.get(url);
        if (listeners != null) {
            listeners.remove(listener);
        }
        listeners = failedUnsubscribed.get(url);
        if (listeners != null) {
            listeners.remove(listener);
        }
        Map<NotifyListener, List<URL>> notified = failedNotified.get(url);
        if (notified != null) {
            notified.remove(listener);
        }
    }
ZookeeperRegistry.doSubscribe(final URL url, final NotifyListener listener)
    protected void doSubscribe(final URL url, final NotifyListener listener) {
        try {
            if (Constants.ANY_VALUE.equals(url.getServiceInterface())) { // this branch is not covered here
                String root = toRootPath();
                ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                if (listeners == null) {
                    zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                    listeners = zkListeners.get(url);
                }
                ChildListener zkListener = listeners.get(listener);
                if (zkListener == null) {
                    listeners.putIfAbsent(listener, new ChildListener() {
                        public void childChanged(String parentPath, List<String> currentChilds) {
                            for (String child : currentChilds) {
                                child = URL.decode(child);
                                if (!anyServices.contains(child)) {
                                    anyServices.add(child);
                                    subscribe(url.setPath(child).addParameters(Constants.INTERFACE_KEY, child,
                                            Constants.CHECK_KEY, String.valueOf(false)), listener);
                                }
                            }
                        }
                    });
                    zkListener = listeners.get(listener);
                }
                zkClient.create(root, false);
                List<String> services = zkClient.addChildListener(root, zkListener);
                if (services != null && services.size() > 0) {
                    for (String service : services) {
                        service = URL.decode(service);
                        anyServices.add(service);
                        subscribe(url.setPath(service).addParameters(Constants.INTERFACE_KEY, service,
                                Constants.CHECK_KEY, String.valueOf(false)), listener);
                    }
                }
            } else {
                /**
                 * ConcurrentMap<URL, ConcurrentMap<NotifyListener, ChildListener>> zkListeners
                 * 1 Get the ConcurrentMap<NotifyListener, ChildListener> for url, creating it if absent
                 * 2 Get the ChildListener for listener from that map, creating it if absent (the ChildListener watches for child node changes)
                 * 3 Create the persistent node for path
                 * 4 Register the child node listener on path
                 */
                List<URL> urls = new ArrayList<URL>();
                for (String path : toCategoriesPath(url)) {
                    ConcurrentMap<NotifyListener, ChildListener> listeners = zkListeners.get(url);
                    if (listeners == null) {
                        zkListeners.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, ChildListener>());
                        listeners = zkListeners.get(url);
                    }
                    ChildListener zkListener = listeners.get(listener);
                    if (zkListener == null) {
                        listeners.putIfAbsent(listener, new ChildListener() {
                            // Watches for changes to the child node list
                            public void childChanged(String parentPath, List<String> currentChilds) {
                                ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
                            }
                        });
                        zkListener = listeners.get(listener);
                    }
                    zkClient.create(path, false); // creates the persistent node /dubbo/com.alibaba.dubbo.demo.DemoService/configurators
                    List<String> children = zkClient.addChildListener(path, zkListener);
                    if (children != null) {
                        urls.addAll(toUrlsWithEmpty(url, path, children));
                    }
                }
                notify(url, listener, urls);
            }
        } catch (Throwable e) {
            throw new RpcException("Failed to subscribe " + url + " to zookeeper " + getUrl() + ", cause: " + e.getMessage(), e);
        }
    }
Notes:
- url (overrideSubscribeUrl): provider://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=9544&side=provider&timestamp=1507643800076
- listener: the overrideSubscribeListener instance created earlier
Steps:
- First obtain the category path, which is /dubbo/{servicename}/{category}. The category comes from the url's category parameter and defaults to providers; here it is configurators, the value added to overrideSubscribeUrl by getSubscribedOverrideUrl(registedProviderUrl).
    private String[] toCategoriesPath(URL url) {
        String[] categroies;
        if (Constants.ANY_VALUE.equals(url.getParameter(Constants.CATEGORY_KEY))) {
            categroies = new String[]{Constants.PROVIDERS_CATEGORY, Constants.CONSUMERS_CATEGORY,
                    Constants.ROUTERS_CATEGORY, Constants.CONFIGURATORS_CATEGORY};
        } else {
            categroies = url.getParameter(Constants.CATEGORY_KEY, new String[]{Constants.DEFAULT_CATEGORY});
        }
        String[] paths = new String[categroies.length];
        for (int i = 0; i < categroies.length; i++) {
            paths[i] = toServicePath(url) + Constants.PATH_SEPARATOR + categroies[i];
        }
        return paths; // /dubbo/com.alibaba.dubbo.demo.DemoService/configurators
    }
- Next, get or create the entries in ConcurrentMap<overrideSubscribeUrl, ConcurrentMap<overrideSubscribeListener instance, ChildListener>> zkListeners. The childChanged(String parentPath, List<String> currentChilds) method of the ChildListener created here holds the logic that eventually runs when the children (currentChilds) under parentPath (the category path above) change.
- Then create the persistent node /dubbo/com.alibaba.dubbo.demo.DemoService/configurators.
- Then use addChildListener(String path, final ChildListener listener) of AbstractZookeeperClient<TargetChildListener> to attach the ChildListener instance created above to the children of path.
- Finally, perform the notification.
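The category path computed in the first step can be sketched with plain strings. This is a simplified illustration rather than the Dubbo implementation: CategoryPathSketch is a hypothetical class, the root "/dubbo" is taken from the paths shown in this article, and splitting multiple categories on commas is an assumption.

```java
import java.util.Arrays;

/** Hypothetical string-based version of toCategoriesPath; not Dubbo code. */
class CategoryPathSketch {

    private static final String[] ALL_CATEGORIES =
            {"providers", "consumers", "routers", "configurators"};

    static String[] toCategoriesPath(String root, String serviceInterface, String categoryParam) {
        String[] categories;
        if ("*".equals(categoryParam)) {
            // "*" subscribes to every category.
            categories = ALL_CATEGORIES;
        } else if (categoryParam == null || categoryParam.isEmpty()) {
            // The default category is providers.
            categories = new String[]{"providers"};
        } else {
            // Assumption: several categories are comma-separated.
            categories = categoryParam.split(",");
        }
        String[] paths = new String[categories.length];
        for (int i = 0; i < categories.length; i++) {
            paths[i] = root + "/" + serviceInterface + "/" + categories[i];
        }
        return paths;
    }

    public static void main(String[] args) {
        System.out.println(Arrays.toString(
                toCategoriesPath("/dubbo", "com.alibaba.dubbo.demo.DemoService", "configurators")));
    }
}
```

For the override subscription in this article the result is a single path, so the loop in doSubscribe runs once.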
AbstractZookeeperClient<TargetChildListener>.addChildListener(String path, final ChildListener listener)
    /**
     * 1 Get the ConcurrentMap<ChildListener, TargetChildListener> for path from ConcurrentMap<String, ConcurrentMap<ChildListener, TargetChildListener>> childListeners, creating it if absent
     * 2 Get the TargetChildListener for the ChildListener, creating it if absent. The TargetChildListener is the listener that actually watches the children of path.
     *   createTargetChildListener(String path, final ChildListener listener): creates the object that actually runs the logic when the children of path change
     * 3 addTargetChildListener(path, targetListener): subscribes the newly created listener to changes on path, so that the TargetChildListener runs its logic when the children of path change.
     *   The TargetChildListener in turn calls childChanged(String parentPath, List<String> currentChilds) on the ChildListener implementation, which is the anonymous inner class defined in ZookeeperRegistry.
     *   That anonymous class's childChanged(String parentPath, List<String> currentChilds) calls ZookeeperRegistry.notify(URL url, NotifyListener listener, List<URL> urls)
     */
    public List<String> addChildListener(String path, final ChildListener listener) {
        ConcurrentMap<ChildListener, TargetChildListener> listeners = childListeners.get(path);
        if (listeners == null) {
            childListeners.putIfAbsent(path, new ConcurrentHashMap<ChildListener, TargetChildListener>());
            listeners = childListeners.get(path);
        }
        TargetChildListener targetListener = listeners.get(listener);
        if (targetListener == null) {
            listeners.putIfAbsent(listener, createTargetChildListener(path, listener));
            targetListener = listeners.get(listener);
        }
        return addTargetChildListener(path, targetListener);
    }
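Steps:
- First comes a round of get-or-create on ConcurrentMap<categorypath, ConcurrentMap<ZookeeperRegistry's anonymous ChildListener instance, TargetChildListener>> childListeners; the main result is the new TargetChildListener.
- Then the TargetChildListener instance is actually attached to path.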
CuratorZookeeperClient.createTargetChildListener(path, listener)
    public CuratorWatcher createTargetChildListener(String path, ChildListener listener) {
        return new CuratorWatcherImpl(listener);
    }

    private class CuratorWatcherImpl implements CuratorWatcher {

        private volatile ChildListener listener;

        public CuratorWatcherImpl(ChildListener listener) {
            this.listener = listener;
        }

        public void unwatch() {
            this.listener = null;
        }

        public void process(WatchedEvent event) throws Exception {
            if (listener != null) {
                listener.childChanged(event.getPath(), client.getChildren().usingWatcher(this).forPath(event.getPath()));
            }
        }
    }
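This simply creates a watcher on the children of path. When a child under path changes, the watcher calls the listener, that is, the childChanged(String parentPath, List<String> currentChilds) method of the ChildListener instance passed in from ZookeeperRegistry. Note that process fetches the current children with usingWatcher(this), which registers the same watcher again for the next change.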
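The re-registration inside process matters because a standard ZooKeeper watch fires once and is then discarded. The sketch below imitates that behaviour with an in-memory stand-in so the pattern can be run without a server. Every name in it (OneShotWatchSketch, FakeZk, ReRegisteringWatcher) is hypothetical, and FakeZk models only the one-shot rule, nothing else about ZooKeeper or Curator.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical in-memory model of one-shot child watches; not ZooKeeper or Curator code. */
class OneShotWatchSketch {

    interface ChildListener {
        void childChanged(String parentPath, List<String> currentChilds);
    }

    interface Watcher {
        void process(String path);
    }

    static class FakeZk {
        private final Map<String, List<String>> children = new HashMap<>();
        private final Map<String, List<Watcher>> watches = new HashMap<>();

        /** Returns a copy of the children of path and, if a watcher is given, arms it for one change. */
        List<String> getChildren(String path, Watcher watcher) {
            if (watcher != null) {
                watches.computeIfAbsent(path, k -> new ArrayList<>()).add(watcher);
            }
            return new ArrayList<>(children.getOrDefault(path, Collections.<String>emptyList()));
        }

        void addChild(String path, String child) {
            children.computeIfAbsent(path, k -> new ArrayList<>()).add(child);
            // One-shot: armed watchers are removed before they are fired.
            List<Watcher> fired = watches.remove(path);
            if (fired != null) {
                for (Watcher w : fired) {
                    w.process(path);
                }
            }
        }
    }

    /** Mirrors the shape of CuratorWatcherImpl.process: read the children and re-arm in one step. */
    static class ReRegisteringWatcher implements Watcher {
        private final FakeZk zk;
        private final ChildListener listener;

        ReRegisteringWatcher(FakeZk zk, ChildListener listener) {
            this.zk = zk;
            this.listener = listener;
        }

        @Override
        public void process(String path) {
            listener.childChanged(path, zk.getChildren(path, this));
        }
    }

    public static void main(String[] args) {
        FakeZk zk = new FakeZk();
        String path = "/dubbo/com.alibaba.dubbo.demo.DemoService/configurators";
        Watcher watcher = new ReRegisteringWatcher(zk, (p, c) -> System.out.println(p + " -> " + c));
        zk.getChildren(path, watcher); // initial registration, as addTargetChildListener does
        zk.addChild(path, "override-1");
        zk.addChild(path, "override-2"); // still observed because the watcher re-armed itself
    }
}
```

Without the re-arm in process, only the first addChild would reach the listener.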
CuratorZookeeperClient.addTargetChildListener(String path, CuratorWatcher targetChildListener)
    public List<String> addTargetChildListener(String path, CuratorWatcher listener) {
        try {
            return client.getChildren().usingWatcher(listener).forPath(path);
        } catch (NoNodeException e) {
            return null;
        } catch (Exception e) {
            throw new IllegalStateException(e.getMessage(), e);
        }
    }
From the analysis above: when the children of the path node change, the TargetChildListener's process(WatchedEvent event) method is called first, and it in turn calls childChanged(String parentPath, List<String> currentChilds) on the ChildListener instance. Let's look at that method:
    // Watches for changes to the child node list
    public void childChanged(String parentPath, List<String> currentChilds) {
        ZookeeperRegistry.this.notify(url, listener, toUrlsWithEmpty(url, parentPath, currentChilds));
    }
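Steps:
- First obtain the child node urls, or, if there are none that match, a single url derived from the consumer url with the empty protocol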
    /**
     * Filter out the urls in providers that match consumer
     */
    private List<URL> toUrlsWithoutEmpty(URL consumer, List<String> providers) {
        List<URL> urls = new ArrayList<URL>();
        if (providers != null && providers.size() > 0) {
            for (String provider : providers) {
                provider = URL.decode(provider);
                if (provider.contains("://")) {
                    URL url = URL.valueOf(provider);
                    if (UrlUtils.isMatch(consumer, url)) {
                        urls.add(url);
                    }
                }
            }
        }
        return urls;
    }

    /**
     * 1 First filter out the providerUrls in providers that match consumer
     * 2 If that collection is not empty, return it directly
     * 3 If it is empty, take the category from path, change consumer's protocol to empty, and add the parameter category=configurators
     * @param consumer provider://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=9544&side=provider&timestamp=1507643800076
     * @param path /dubbo/com.alibaba.dubbo.demo.DemoService/configurators
     * @param providers
     */
    private List<URL> toUrlsWithEmpty(URL consumer, String path, List<String> providers) {
        List<URL> urls = toUrlsWithoutEmpty(consumer, providers);
        if (urls == null || urls.isEmpty()) {
            int i = path.lastIndexOf('/');
            String category = i < 0 ? path : path.substring(i + 1); // configurators
            URL empty = consumer.setProtocol(Constants.EMPTY_PROTOCOL).addParameter(Constants.CATEGORY_KEY, category);
            urls.add(empty);
        }
        return urls; // empty://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=1237&side=provider&timestamp=1507352638483
    }
- Then call notify(URL url, NotifyListener listener, List<URL> urls) in FailbackRegistry, the parent class of ZookeeperRegistry
    @Override
    protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        if (url == null) {
            throw new IllegalArgumentException("notify url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("notify listener == null");
        }
        try {
            doNotify(url, listener, urls);
        } catch (Exception t) {
            // Record the failed notification request in the failed list and retry periodically
            Map<NotifyListener, List<URL>> listeners = failedNotified.get(url);
            if (listeners == null) {
                failedNotified.putIfAbsent(url, new ConcurrentHashMap<NotifyListener, List<URL>>());
                listeners = failedNotified.get(url);
            }
            listeners.put(listener, urls);
            logger.error("Failed to notify for subscribe " + url + ", waiting for retry, cause: " + t.getMessage(), t);
        }
    }

    protected void doNotify(URL url, NotifyListener listener, List<URL> urls) {
        super.notify(url, listener, urls);
    }
Notes: the arguments passed in here are:
- url (overrideSubscribeUrl): provider://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=9544&side=provider&timestamp=1507643800076
- listener: the overrideSubscribeListener instance created earlier
- urls: [ empty://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=9544&side=provider&timestamp=1507643800076 ]
- The method first runs the parent class's AbstractRegistry.notify(URL url, NotifyListener listener, List<URL> urls). If that fails, it gets or creates the entry in ConcurrentMap<overrideSubscribeUrl, Map<overrideSubscribeListener instance, urls>> failedNotified so the notification can be retried later.
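The single empty:// entry in urls is the fallback produced by toUrlsWithEmpty when no child node matches. A rough string-based sketch of that fallback follows. EmptyUrlFallbackSketch is a hypothetical class, the UrlUtils.isMatch filter is deliberately left out, and the category parameter is only appended when absent, whereas the real addParameter call sets it unconditionally.

```java
import java.io.UnsupportedEncodingException;
import java.net.URLDecoder;
import java.util.ArrayList;
import java.util.List;

/** Hypothetical string-based version of toUrlsWithEmpty; not Dubbo code. */
class EmptyUrlFallbackSketch {

    static List<String> toUrlsWithEmpty(String subscribeUrl, String path, List<String> children) {
        List<String> urls = new ArrayList<>();
        if (children != null) {
            for (String child : children) {
                String decoded = decode(child);
                // Simplification: the real code also requires UrlUtils.isMatch(consumer, url).
                if (decoded.contains("://")) {
                    urls.add(decoded);
                }
            }
        }
        if (urls.isEmpty()) {
            // The category is the last segment of the watched path.
            String category = path.substring(path.lastIndexOf('/') + 1);
            // Swap the protocol for "empty" and keep the rest of the subscribe url.
            String empty = "empty" + subscribeUrl.substring(subscribeUrl.indexOf("://"));
            if (!empty.contains("category=")) {
                empty += (empty.contains("?") ? "&" : "?") + "category=" + category;
            }
            urls.add(empty);
        }
        return urls;
    }

    private static String decode(String value) {
        try {
            return URLDecoder.decode(value, "UTF-8");
        } catch (UnsupportedEncodingException e) {
            throw new IllegalStateException(e); // UTF-8 is always available
        }
    }

    public static void main(String[] args) {
        System.out.println(toUrlsWithEmpty(
                "provider://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?category=configurators&check=false",
                "/dubbo/com.alibaba.dubbo.demo.DemoService/configurators",
                new ArrayList<String>()));
    }
}
```

Because the list is never empty, the listener is always notified, even when the last override node has just been deleted.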
Now for the core of the notification:
AbstractRegistry.notify(URL url, NotifyListener listener, List<URL> urls)
    /**
     * 1 First iterate over List<URL> urls and classify them by category, storing them in Map<"categoryName", List<URL>> result;
     * 2 Then iterate over result (each iteration handles a new category):
     *   (1) store the Map<"categoryName", List<URL>> entry in the Map<String, List<URL>> held by ConcurrentMap<URL, Map<String, List<URL>>> notified
     *   (2) set the properties and save them to file
     *   (3) call the notify() method of the listener that was passed in.
     * @param url
     * @param listener
     * @param urls
     */
    protected void notify(URL url, NotifyListener listener, List<URL> urls) {
        if (url == null) {
            throw new IllegalArgumentException("notify url == null");
        }
        if (listener == null) {
            throw new IllegalArgumentException("notify listener == null");
        }
        if ((urls == null || urls.size() == 0)
                && !Constants.ANY_VALUE.equals(url.getServiceInterface())) {
            logger.warn("Ignore empty notify urls for subscribe url " + url);
            return;
        }
        if (logger.isInfoEnabled()) {
            logger.info("Notify urls for subscribe url " + url + ", urls: " + urls);
        }
        /**
         * Iterate over List<URL> urls and classify them by category
         */
        Map<String, List<URL>> result = new HashMap<String, List<URL>>(); // { "categoryName" : List<URL> }
        for (URL u : urls) {
            if (UrlUtils.isMatch(url, u)) {
                String category = u.getParameter(Constants.CATEGORY_KEY, Constants.DEFAULT_CATEGORY);
                List<URL> categoryList = result.get(category);
                if (categoryList == null) {
                    categoryList = new ArrayList<URL>();
                    result.put(category, categoryList);
                }
                categoryList.add(u);
            }
        }
        if (result.size() == 0) {
            return;
        }
        Map<String, List<URL>> categoryNotified = notified.get(url);
        if (categoryNotified == null) {
            notified.putIfAbsent(url, new ConcurrentHashMap<String, List<URL>>());
            categoryNotified = notified.get(url);
        }
        for (Map.Entry<String, List<URL>> entry : result.entrySet()) {
            String category = entry.getKey();
            List<URL> categoryList = entry.getValue();
            categoryNotified.put(category, categoryList); // fill the notified collection
            saveProperties(url); // why is this line not placed outside the loop?
            listener.notify(categoryList);
        }
    }
Notes: the arguments passed in here are:
- url (overrideSubscribeUrl): provider://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=9544&side=provider&timestamp=1507643800076
- listener: the overrideSubscribeListener instance created earlier
- urls: [ empty://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=9544&side=provider&timestamp=1507643800076 ]
Steps:
- First iterate over List<URL> urls and classify them by category, storing them in Map<"categoryName", List<URL>> result;
- Then get or create the entry in ConcurrentMap<overrideSubscribeUrl, Map<"categoryName", subList(urls)>> notified
- Finally iterate over Map<"categoryName", List<URL>> result and, for each category:
  - fill the notified collection
  - save the url that was passed in to Properties properties (the in-memory copy of the local disk cache)
  - call the notify method of the listener that was passed in (note: this is the notify method of the overrideSubscribeListener instance created at the start of this article)
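The classification step can be illustrated on plain URL strings. This is a minimal sketch under stated assumptions: CategoryGroupingSketch is a hypothetical class, the UrlUtils.isMatch filter is omitted, and the default category is taken to be providers as noted earlier in this article.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Hypothetical string-based version of the grouping loop in AbstractRegistry.notify. */
class CategoryGroupingSketch {

    static Map<String, List<String>> groupByCategory(List<String> urls) {
        // LinkedHashMap keeps first-seen order so the output is predictable.
        Map<String, List<String>> result = new LinkedHashMap<>();
        for (String url : urls) {
            result.computeIfAbsent(categoryOf(url), k -> new ArrayList<>()).add(url);
        }
        return result;
    }

    static String categoryOf(String url) {
        int queryStart = url.indexOf('?');
        if (queryStart >= 0) {
            for (String pair : url.substring(queryStart + 1).split("&")) {
                if (pair.startsWith("category=")) {
                    return pair.substring("category=".length());
                }
            }
        }
        return "providers"; // default category
    }

    public static void main(String[] args) {
        List<String> urls = Arrays.asList(
                "empty://10.10.10.10:20880/DemoService?category=configurators&check=false",
                "dubbo://10.10.10.10:20880/DemoService?side=provider");
        System.out.println(groupByCategory(urls).keySet());
    }
}
```

In the walkthrough above the input holds one empty:// url, so result contains exactly one category, configurators, and the final loop runs once.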
AbstractRegistry.saveProperties(URL url)
    /**
     * 1 Look up the Map<String, List<URL>> for url in ConcurrentMap<URL, Map<String, List<URL>>> notified, then join the lists of all categories into one string buf (space-separated)
     * 2 Write < serviceKey <-> buf > into Properties properties, the in-memory copy of the local disk cache
     * 3 Increment AtomicLong lastCacheChanged by 1
     * 4 Then, depending on syncSaveFile, save properties to the file either synchronously or asynchronously
     * @param url
     */
    private void saveProperties(URL url) {
        if (file == null) {
            return;
        }

        try {
            StringBuilder buf = new StringBuilder();
            Map<String, List<URL>> categoryNotified = notified.get(url);
            if (categoryNotified != null) {
                for (List<URL> us : categoryNotified.values()) {
                    for (URL u : us) {
                        if (buf.length() > 0) {
                            buf.append(URL_SEPARATOR);
                        }
                        buf.append(u.toFullString());
                    }
                }
            }
            properties.setProperty(url.getServiceKey(), buf.toString());
            long version = lastCacheChanged.incrementAndGet();
            if (syncSaveFile) {
                doSaveProperties(version);
            } else {
                registryCacheExecutor.execute(new SaveProperties(version));
            }
        } catch (Throwable t) {
            logger.warn(t.getMessage(), t);
        }
    }
Notes:
- Argument url: provider://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=5033&side=provider&timestamp=1507720343596
- properties: { "com.alibaba.dubbo.demo.DemoService" -> "empty://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&category=configurators&check=false&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=5033&side=provider&timestamp=1507720343596" }
- Finally, an asynchronous thread writes the contents of properties to the file
AbstractRegistry$SaveProperties
    private class SaveProperties implements Runnable {
        private long version;

        private SaveProperties(long version) {
            this.version = version;
        }

        public void run() {
            doSaveProperties(version);
        }
    }
AbstractRegistry.doSaveProperties(long version)
    /**
     * 1 First read the file's contents into a new Properties newProperties;
     * 2 Then write the entries of properties into newProperties;
     * 3 Then create the file dubbo-registry-10.211.55.5.cache.lock;
     * 4 Finally write the contents of newProperties to the file
     */
    public void doSaveProperties(long version) {
        if (version < lastCacheChanged.get()) {
            return;
        }
        if (file == null) {
            return;
        }
        Properties newProperties = new Properties();
        // Read once before saving, to avoid conflicts between multiple registries
        InputStream in = null;
        try {
            if (file.exists()) {
                in = new FileInputStream(file);
                newProperties.load(in);
            }
        } catch (Throwable e) {
            logger.warn("Failed to load registry store file, cause: " + e.getMessage(), e);
        } finally {
            if (in != null) {
                try {
                    in.close();
                } catch (IOException e) {
                    logger.warn(e.getMessage(), e);
                }
            }
        }
        // Save
        try {
            newProperties.putAll(properties);
            File lockfile = new File(file.getAbsolutePath() + ".lock");
            if (!lockfile.exists()) {
                lockfile.createNewFile(); // create the lock file
            }
            RandomAccessFile raf = new RandomAccessFile(lockfile, "rw");
            try {
                FileChannel channel = raf.getChannel();
                try {
                    FileLock lock = channel.tryLock();
                    if (lock == null) {
                        throw new IOException("Can not lock the registry cache file " + file.getAbsolutePath() + ", ignore and retry later, maybe multi java process use the file, please config: dubbo.registry.file=xxx.properties");
                    }
                    // Save
                    try {
                        if (!file.exists()) {
                            file.createNewFile();
                        }
                        FileOutputStream outputFile = new FileOutputStream(file);
                        try {
                            newProperties.store(outputFile, "Dubbo Registry Cache");
                        } finally {
                            outputFile.close();
                        }
                    } finally {
                        lock.release();
                    }
                } finally {
                    channel.close();
                }
            } finally {
                raf.close();
            }
        } catch (Throwable e) {
            if (version < lastCacheChanged.get()) {
                return;
            } else {
                registryCacheExecutor.execute(new SaveProperties(lastCacheChanged.incrementAndGet()));
            }
            logger.warn("Failed to save registry store file, cause: " + e.getMessage(), e);
        }
    }
See the comments for the steps. The version parameter serves as a staleness check. saveProperties(URL url) executes long version = lastCacheChanged.incrementAndGet(); and doSaveProperties(long version) later tests if (version < lastCacheChanged.get()). If that condition holds, another thread has called saveProperties(URL url) since this save was requested and is about to run doSaveProperties(long version) itself, so the current thread gives up and leaves the write to the later one.
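The version check can be isolated in a few lines. The sketch below is a hypothetical class (VersionedSaveSketch) that counts writes instead of touching a file, and it runs single-threaded for clarity, so it shows the comparison itself rather than the concurrent scheduling.

```java
import java.util.concurrent.atomic.AtomicLong;

/** Hypothetical reduction of the lastCacheChanged check; a counter replaces the file write. */
class VersionedSaveSketch {

    private final AtomicLong lastCacheChanged = new AtomicLong();
    private int writes;

    /** Called when the in-memory properties change; returns the version for the save request. */
    long markChanged() {
        return lastCacheChanged.incrementAndGet();
    }

    /** Performs the save unless a newer change has been recorded since this version was issued. */
    boolean doSave(long version) {
        if (version < lastCacheChanged.get()) {
            return false; // stale request: a later save will carry the newer data
        }
        writes++;
        return true;
    }

    int writes() {
        return writes;
    }

    public static void main(String[] args) {
        VersionedSaveSketch cache = new VersionedSaveSketch();
        long first = cache.markChanged();
        long second = cache.markChanged();
        System.out.println(cache.doSave(first));  // skipped, a newer version exists
        System.out.println(cache.doSave(second)); // performed
    }
}
```

When several changes arrive in a burst, only the save request with the newest version ends up writing.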
After the save has run, two files appear in the directory /Users/jigangzhao/.dubbo:
- dubbo-registry-10.211.55.5.cache
- dubbo-registry-10.211.55.5.cache.lock
The contents of the former:
    #Wed Oct 11 19:42:29 CST 2017
    com.alibaba.dubbo.demo.DemoService=empty\://10.10.10.10\:20880/com.alibaba.dubbo.demo.DemoService?anyhost\=true&application\=demo-provider&category\=configurators&check\=false&dubbo\=2.0.0&generic\=false&interface\=com.alibaba.dubbo.demo.DemoService&methods\=sayHello&pid\=5165&side\=provider&timestamp\=1507722024953
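The backslashes before ':' and '=' in the cache file come from java.util.Properties.store, which escapes those characters, and Properties.load removes them again. The following sketch demonstrates the round trip in memory. CacheFileEscapingSketch is a hypothetical class and the shortened URL is made up for the example.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.StringWriter;
import java.io.UncheckedIOException;
import java.util.Properties;

/** Hypothetical demo of the escaping applied by java.util.Properties; no file is written. */
class CacheFileEscapingSketch {

    static String store(String key, String value) {
        Properties properties = new Properties();
        properties.setProperty(key, value);
        StringWriter out = new StringWriter();
        try {
            properties.store(out, "Dubbo Registry Cache");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return out.toString();
    }

    static String load(String text, String key) {
        Properties properties = new Properties();
        try {
            properties.load(new StringReader(text));
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return properties.getProperty(key);
    }

    public static void main(String[] args) {
        String key = "com.alibaba.dubbo.demo.DemoService";
        String value = "empty://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?check=false";
        String stored = store(key, value);
        System.out.println(stored);            // ':' and '=' in the value appear escaped
        System.out.println(load(stored, key)); // the original value comes back
    }
}
```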
The last piece is OverrideListener.notify(List<URL> urls):
    /**
     * Re-export
     * 1. The exporter destroy problem in protocol
     *    1. The exporter returned by registryprotocol must be destroyable normally
     *    2. After notify there is no need to register with the registry again
     *    3. The invoker passed to the export method should ideally remain the exporter's invoker throughout.
     */
    private class OverrideListener implements NotifyListener {
        private final URL subscribeUrl;
        private final Invoker originInvoker;

        public OverrideListener(URL subscribeUrl, Invoker originalInvoker) {
            this.subscribeUrl = subscribeUrl;
            this.originInvoker = originalInvoker;
        }

        /**
         * Purpose:
         * Validate the providerUrl that was originally registered; if the url has changed, re-export
         *
         * @param urls list of registered information, never empty; same meaning as the return value of {@link com.alibaba.dubbo.registry.RegistryService#lookup(URL)}.
         */
        public synchronized void notify(List<URL> urls) {
            logger.debug("original override urls: " + urls);
            List<URL> matchedUrls = getMatchedUrls(urls, subscribeUrl);
            logger.debug("subscribe url: " + subscribeUrl + ", override urls: " + matchedUrls);
            // Nothing matched
            if (matchedUrls.isEmpty()) {
                return;
            }

            List<Configurator> configurators = RegistryDirectory.toConfigurators(matchedUrls); // an empty list in this walkthrough

            final Invoker<?> invoker;
            if (originInvoker instanceof InvokerDelegete) {
                invoker = ((InvokerDelegete<?>) originInvoker).getInvoker();
            } else {
                invoker = originInvoker;
            }
            // The most original invoker
            URL originUrl = RegistryProtocol.this.getProviderUrl(invoker); // dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=5279&side=provider&timestamp=1507723571451
            String key = getCacheKey(originInvoker); // dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=5279&side=provider&timestamp=1507723571451
            ExporterChangeableWrapper<?> exporter = bounds.get(key); // already stored here by the doLocalExport method
            if (exporter == null) {
                logger.warn(new IllegalStateException("error state, exporter should not be null"));
                return;
            }
            // The current url, possibly merged several times already
            URL currentUrl = exporter.getInvoker().getUrl(); // dubbo://10.10.10.10:20880/com.alibaba.dubbo.demo.DemoService?anyhost=true&application=demo-provider&dubbo=2.0.0&generic=false&interface=com.alibaba.dubbo.demo.DemoService&methods=sayHello&pid=5279&side=provider&timestamp=1507723571451
            // The url merged with this round of configuration
            URL newUrl = getConfigedInvokerUrl(configurators, originUrl);
            if (!currentUrl.equals(newUrl)) {
                RegistryProtocol.this.doChangeLocalExport(originInvoker, newUrl); // re-export the invoker as an exporter
                logger.info("exported provider url changed, origin url: " + originUrl + ", old export url: " + currentUrl + ", new export url: " + newUrl);
            }
        }

        private List<URL> getMatchedUrls(List<URL> configuratorUrls, URL currentSubscribe) {
            List<URL> result = new ArrayList<URL>();
            for (URL url : configuratorUrls) {
                URL overrideUrl = url;
                // Compatibility with older versions
                if (url.getParameter(Constants.CATEGORY_KEY) == null
                        && Constants.OVERRIDE_PROTOCOL.equals(url.getProtocol())) {
                    overrideUrl = url.addParameter(Constants.CATEGORY_KEY, Constants.CONFIGURATORS_CATEGORY);
                }

                // Check whether it should be applied to the current service
                if (UrlUtils.isMatch(currentSubscribe, overrideUrl)) {
                    result.add(url);
                }
            }
            return result;
        }

        // Merge the configured urls
        private URL getConfigedInvokerUrl(List<Configurator> configurators, URL url) {
            for (Configurator configurator : configurators) {
                url = configurator.configure(url);
            }
            return url;
        }
    }
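To summarize:
The current provider subscribes to /dubbo/com.alibaba.dubbo.demo.DemoService/configurators. When the child nodes under that path change, the listener applies the matching override urls to the original provider url; if the result differs from the url currently exported, the service has to be re-exported.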
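The re-export decision boils down to "apply every configurator to the original url, then compare with the current one". Here is a minimal sketch of that comparison, with a parameter map standing in for URL and a UnaryOperator standing in for Configurator. ReexportDecisionSketch and both method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.function.UnaryOperator;

/** Hypothetical model of the merge-and-compare step in OverrideListener.notify. */
class ReexportDecisionSketch {

    /** Applies each configurator in order, starting from the original parameters. */
    static Map<String, String> applyConfigurators(Map<String, String> originParams,
                                                  List<UnaryOperator<Map<String, String>>> configurators) {
        Map<String, String> merged = new TreeMap<>(originParams);
        for (UnaryOperator<Map<String, String>> configurator : configurators) {
            merged = configurator.apply(merged);
        }
        return merged;
    }

    /** Re-export only when the merged parameters differ from what is currently exported. */
    static boolean needsReexport(Map<String, String> currentParams, Map<String, String> mergedParams) {
        return !currentParams.equals(mergedParams);
    }

    public static void main(String[] args) {
        Map<String, String> origin = new TreeMap<>();
        origin.put("side", "provider");

        List<UnaryOperator<Map<String, String>>> configurators = new ArrayList<>();
        // With no configurators (the empty:// case) nothing changes and no re-export happens.
        System.out.println(needsReexport(origin, applyConfigurators(origin, configurators)));

        // A made-up override that sets a timeout parameter.
        configurators.add(params -> {
            Map<String, String> copy = new TreeMap<>(params);
            copy.put("timeout", "5000");
            return copy;
        });
        System.out.println(needsReexport(origin, applyConfigurators(origin, configurators)));
    }
}
```

Starting from the original url each time, rather than from the current one, means that removing an override restores the original settings.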
Re-exporting:
    /**
     * Re-export an invoker whose url has been modified
     *
     * @param originInvoker
     * @param newInvokerUrl
     */
    @SuppressWarnings("unchecked")
    private <T> void doChangeLocalExport(final Invoker<T> originInvoker, URL newInvokerUrl) {
        String key = getCacheKey(originInvoker);
        final ExporterChangeableWrapper<T> exporter = (ExporterChangeableWrapper<T>) bounds.get(key);
        if (exporter == null) {
            logger.warn(new IllegalStateException("error state, exporter should not be null"));
        } else {
            final Invoker<T> invokerDelegete = new InvokerDelegete<T>(originInvoker, newInvokerUrl);
            exporter.setExporter(protocol.export(invokerDelegete));
        }
    }