Zookeeper的watch機制


1.watch機制

Zookeeper的應用場景中配置中心,其中看到watch機制

 

 

1.1 什么是watch機制

zookeeper作為一款成熟的分布式協調框架,訂閱-發布功能是很重要的一個。所謂訂閱功能,其實說白了就是觀察者模式。觀察者會訂閱一些感興趣的主題,然后這些主題一旦變化了,就會自動通知到這些觀察者。

zookeeper的訂閱發布也就是watch機制,是一個輕量級的設計。因為它采用了一種推拉結合的模式。一旦服務端感知主題變了,那么只會發送一個事件類型和節點信息給關注的客戶端,而不會包括具體的變更內容,所以事件本身是輕量級的,這就是所謂的“推”部分。然后,收到變更通知的客戶端需要自己去拉變更的數據,這就是“拉”部分。watche機制分為添加數據和監聽節點。

Curator在這方面做了優化,Curator引入了Cache的概念用來實現對ZooKeeper服務器端進行事件監聽。Cache是Curator對事件監聽的包裝,其對事件的監聽可以近似看做是一個本地緩存視圖和遠程ZooKeeper視圖的對比過程。而且Curator會自動的再次監聽,我們就不需要自己手動的重復監聽了。

Curator中的cache共有三種

  • NodeCache(監聽和緩存根節點變化)

  • PathChildrenCache(監聽和緩存子節點變化)

  • TreeCache(監聽和緩存根節點變化和子節點變化)

下面我們分別對三種cache詳解

1.2. NodeCache

  • 介紹

    NodeCache是用來監聽節點的數據變化的,當監聽的節點的數據發生變化的時候就會回調對應的函數。

  • 增加監聽

 1 //創建重試策略
 2 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1);  3 //創建客戶端
 4 CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 1000, 1000, retryPolicy);  5 //開啟客戶端
 6 client.start();  7 System.out.println("連接成功");  8 //創建節點數據監聽對象
 9 final NodeCache nodeCache = new NodeCache(client, "/hello"); 10 //開始緩存
11 /**
12  * 參數為true:可以直接獲取監聽的節點,System.out.println(nodeCache.getCurrentData());為ChildData{path='/aa', stat=607,765,1580205779732,1580973376268,2,1,0,0,5,1,608 13 , data=[97, 98, 99, 100, 101]} 14  * 參數為false:不可以獲取監聽的節點,System.out.println(nodeCache.getCurrentData());為null 15  */
16 nodeCache.start(true); 17 System.out.println(nodeCache.getCurrentData()); 18 //添加監聽對象
19 nodeCache.getListenable().addListener(new NodeCacheListener() { 20     //如果節點數據有變化,會回調該方法
21     public void nodeChanged() throws Exception { 22         String data = new String(nodeCache.getCurrentData().getData()); 23         System.out.println("數據Watcher:路徑=" + nodeCache.getCurrentData().getPath() 24                 + ":data=" + data); 25  } 26 }); 27 System.in.read();

 測試

修改節點數據

 

 

控制台顯示

1.3. PathChildrenCache

  • 介紹

    PathChildrenCache是用來監聽指定節點 的子節點變化情況

  • 增加監聽

 1 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1);  2 CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 1000, 1000, retryPolicy);  3 client.start();  4 //監聽指定節點的子節點變化情況包括͹新增子節點 子節點數據變更 和子節點刪除  5 //true表示用於配置是否把節點內容緩存起來,如果配置為true,客戶端在接收到節點列表變更的同時,也能夠獲取到節點的數據內容(即:event.getData().getData())ͺ如果為false 則無法取到數據內容(即:event.getData().getData())
 6 PathChildrenCache childrenCache = new PathChildrenCache(client,"/hello",true);  7 /**
 8  * NORMAL: 普通啟動方式, 在啟動時緩存子節點數據  9  * POST_INITIALIZED_EVENT:在啟動時緩存子節點數據,提示初始化 10  * BUILD_INITIAL_CACHE: 在啟動時什么都不會輸出 11  * 在官方解釋中說是因為這種模式會在start執行執行之前先執行rebuild的方法,而rebuild的方法不會發出任何事件通知。 12  */
13 childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); 14 System.out.println(childrenCache.getCurrentData()); 15 //添加監聽
16 childrenCache.getListenable().addListener(new PathChildrenCacheListener() { 17  @Override 18     public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { 19         if(event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED){ 20             System.out.println("子節點更新"); 21             System.out.println("節點:"+event.getData().getPath()); 22             System.out.println("數據" + new String(event.getData().getData())); 23         }else if(event.getType() == PathChildrenCacheEvent.Type.INITIALIZED ){ 24             System.out.println("初始化操作"); 25         }else if(event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED ){ 26             System.out.println("刪除子節點"); 27             System.out.println("節點:"+event.getData().getPath()); 28             System.out.println("數據" + new String(event.getData().getData())); 29         }else if(event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED ){ 30             System.out.println("添加子節點"); 31             System.out.println("節點:"+event.getData().getPath()); 32             System.out.println("數據" + new String(event.getData().getData())); 33         }else if(event.getType() == PathChildrenCacheEvent.Type.CONNECTION_SUSPENDED ){ 34             System.out.println("連接失效"); 35         }else if(event.getType() == PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED ){ 36             System.out.println("重新連接"); 37         }else if(event.getType() == PathChildrenCacheEvent.Type.CONNECTION_LOST ){ 38             System.out.println("連接失效后稍等一會兒執行"); 39  } 40  } 41 }); 42 System.in.read(); // 使線程阻塞

1.4. TreeCache

  • 介紹

    TreeCache有點像上面兩種Cache的結合體,NodeCache能夠監聽自身節點的數據變化(或者是創建該節點),PathChildrenCache能夠監聽自身節點下的子節點的變化,而TreeCache既能夠監聽自身節點的變化、也能夠監聽子節點的變化。

  • 添加監聽

 1 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1);  2 CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 1000, 1000, retryPolicy);  3 client.start();  4 TreeCache treeCache = new TreeCache(client,"/hello");  5 treeCache.start();  6 System.out.println(treeCache.getCurrentData("/hello"));  7 treeCache.getListenable().addListener(new TreeCacheListener() {  8  @Override  9     public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { 10             if(event.getType() == TreeCacheEvent.Type.NODE_ADDED){ 11                 System.out.println(event.getData().getPath() + "節點添加"); 12             }else if (event.getType() == TreeCacheEvent.Type.NODE_REMOVED){ 13                 System.out.println(event.getData().getPath() + "節點移除"); 14             }else if(event.getType() == TreeCacheEvent.Type.NODE_UPDATED){ 15                 System.out.println(event.getData().getPath() + "節點修改"); 16             }else if(event.getType() == TreeCacheEvent.Type.INITIALIZED){ 17                 System.out.println("初始化完成"); 18             }else if(event.getType() ==TreeCacheEvent.Type.CONNECTION_SUSPENDED){ 19                 System.out.println("連接過時"); 20             }else if(event.getType() ==TreeCacheEvent.Type.CONNECTION_RECONNECTED){ 21                 System.out.println("重新連接"); 22             }else if(event.getType() ==TreeCacheEvent.Type.CONNECTION_LOST){ 23                 System.out.println("連接過時一段時間"); 24  } 25  } 26 }); 27 System.in.read();

【小結】

1:Zookeeper的數據結構(樹型結構)

2:節點的分類(4個)

  • 持久性(帶序號、不帶序號)

  • 臨時性(帶序號、不帶序號)

3:客戶端命令(創建、查詢、修改、刪除)

4:Zookeeper的java的api介紹(創建、查詢、修改、刪除)

  • Curator的客戶端

    RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,3);
    CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 3000, 3000, retryPolicy);

5:Zookeeper的watch機制

  • NodeCache

  • PathChildrenCache

  • TreeCache(監聽和緩存根幾點變化和子節點變化)(重點)


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM