Zookeeper的應用場景中配置中心,其中看到watch機制
zookeeper作為一款成熟的分布式協調框架,訂閱-發布功能是很重要的一個。所謂訂閱功能,其實說白了就是觀察者模式。觀察者會訂閱一些感興趣的主題,然后這些主題一旦變化了,就會自動通知到這些觀察者。
zookeeper的訂閱發布也就是watch機制,是一個輕量級的設計。因為它采用了一種推拉結合的模式。一旦服務端感知主題變了,那么只會發送一個事件類型和節點信息給關注的客戶端,而不會包括具體的變更內容,所以事件本身是輕量級的,這就是所謂的“推”部分。然后,收到變更通知的客戶端需要自己去拉變更的數據,這就是“拉”部分。watche機制分為添加數據和監聽節點。
Curator在這方面做了優化,Curator引入了Cache的概念用來實現對ZooKeeper服務器端進行事件監聽。Cache是Curator對事件監聽的包裝,其對事件的監聽可以近似看做是一個本地緩存視圖和遠程ZooKeeper視圖的對比過程。而且Curator會自動的再次監聽,我們就不需要自己手動的重復監聽了。
Curator中的cache共有三種
-
NodeCache(監聽和緩存根節點變化)
-
PathChildrenCache(監聽和緩存子節點變化)
-
TreeCache(監聽和緩存根節點變化和子節點變化)
下面我們分別對三種cache詳解
-
NodeCache是用來監聽節點的數據變化的,當監聽的節點的數據發生變化的時候就會回調對應的函數。
-
1 //創建重試策略
2 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1); 3 //創建客戶端
4 CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 1000, 1000, retryPolicy); 5 //開啟客戶端
6 client.start(); 7 System.out.println("連接成功"); 8 //創建節點數據監聽對象
9 final NodeCache nodeCache = new NodeCache(client, "/hello"); 10 //開始緩存
11 /**
12 * 參數為true:可以直接獲取監聽的節點,System.out.println(nodeCache.getCurrentData());為ChildData{path='/aa', stat=607,765,1580205779732,1580973376268,2,1,0,0,5,1,608 13 , data=[97, 98, 99, 100, 101]} 14 * 參數為false:不可以獲取監聽的節點,System.out.println(nodeCache.getCurrentData());為null 15 */
16 nodeCache.start(true); 17 System.out.println(nodeCache.getCurrentData()); 18 //添加監聽對象
19 nodeCache.getListenable().addListener(new NodeCacheListener() { 20 //如果節點數據有變化,會回調該方法
21 public void nodeChanged() throws Exception { 22 String data = new String(nodeCache.getCurrentData().getData()); 23 System.out.println("數據Watcher:路徑=" + nodeCache.getCurrentData().getPath() 24 + ":data=" + data); 25 } 26 }); 27 System.in.read();
測試
修改節點數據
控制台顯示
-
PathChildrenCache是用來監聽指定節點 的子節點變化情況
-
1 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1); 2 CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 1000, 1000, retryPolicy); 3 client.start(); 4 //監聽指定節點的子節點變化情況包括新增子節點 子節點數據變更 和子節點刪除 5 //true表示用於配置是否把節點內容緩存起來,如果配置為true,客戶端在接收到節點列表變更的同時,也能夠獲取到節點的數據內容(即:event.getData().getData())ͺ如果為false 則無法取到數據內容(即:event.getData().getData())
6 PathChildrenCache childrenCache = new PathChildrenCache(client,"/hello",true); 7 /**
8 * NORMAL: 普通啟動方式, 在啟動時緩存子節點數據 9 * POST_INITIALIZED_EVENT:在啟動時緩存子節點數據,提示初始化 10 * BUILD_INITIAL_CACHE: 在啟動時什么都不會輸出 11 * 在官方解釋中說是因為這種模式會在start執行執行之前先執行rebuild的方法,而rebuild的方法不會發出任何事件通知。 12 */
13 childrenCache.start(PathChildrenCache.StartMode.POST_INITIALIZED_EVENT); 14 System.out.println(childrenCache.getCurrentData()); 15 //添加監聽
16 childrenCache.getListenable().addListener(new PathChildrenCacheListener() { 17 @Override 18 public void childEvent(CuratorFramework client, PathChildrenCacheEvent event) throws Exception { 19 if(event.getType() == PathChildrenCacheEvent.Type.CHILD_UPDATED){ 20 System.out.println("子節點更新"); 21 System.out.println("節點:"+event.getData().getPath()); 22 System.out.println("數據" + new String(event.getData().getData())); 23 }else if(event.getType() == PathChildrenCacheEvent.Type.INITIALIZED ){ 24 System.out.println("初始化操作"); 25 }else if(event.getType() == PathChildrenCacheEvent.Type.CHILD_REMOVED ){ 26 System.out.println("刪除子節點"); 27 System.out.println("節點:"+event.getData().getPath()); 28 System.out.println("數據" + new String(event.getData().getData())); 29 }else if(event.getType() == PathChildrenCacheEvent.Type.CHILD_ADDED ){ 30 System.out.println("添加子節點"); 31 System.out.println("節點:"+event.getData().getPath()); 32 System.out.println("數據" + new String(event.getData().getData())); 33 }else if(event.getType() == PathChildrenCacheEvent.Type.CONNECTION_SUSPENDED ){ 34 System.out.println("連接失效"); 35 }else if(event.getType() == PathChildrenCacheEvent.Type.CONNECTION_RECONNECTED ){ 36 System.out.println("重新連接"); 37 }else if(event.getType() == PathChildrenCacheEvent.Type.CONNECTION_LOST ){ 38 System.out.println("連接失效后稍等一會兒執行"); 39 } 40 } 41 }); 42 System.in.read(); // 使線程阻塞
-
TreeCache有點像上面兩種Cache的結合體,NodeCache能夠監聽自身節點的數據變化(或者是創建該節點),PathChildrenCache能夠監聽自身節點下的子節點的變化,而TreeCache既能夠監聽自身節點的變化、也能夠監聽子節點的變化。
-
1 RetryPolicy retryPolicy = new ExponentialBackoffRetry(1000,1); 2 CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 1000, 1000, retryPolicy); 3 client.start(); 4 TreeCache treeCache = new TreeCache(client,"/hello"); 5 treeCache.start(); 6 System.out.println(treeCache.getCurrentData("/hello")); 7 treeCache.getListenable().addListener(new TreeCacheListener() { 8 @Override 9 public void childEvent(CuratorFramework client, TreeCacheEvent event) throws Exception { 10 if(event.getType() == TreeCacheEvent.Type.NODE_ADDED){ 11 System.out.println(event.getData().getPath() + "節點添加"); 12 }else if (event.getType() == TreeCacheEvent.Type.NODE_REMOVED){ 13 System.out.println(event.getData().getPath() + "節點移除"); 14 }else if(event.getType() == TreeCacheEvent.Type.NODE_UPDATED){ 15 System.out.println(event.getData().getPath() + "節點修改"); 16 }else if(event.getType() == TreeCacheEvent.Type.INITIALIZED){ 17 System.out.println("初始化完成"); 18 }else if(event.getType() ==TreeCacheEvent.Type.CONNECTION_SUSPENDED){ 19 System.out.println("連接過時"); 20 }else if(event.getType() ==TreeCacheEvent.Type.CONNECTION_RECONNECTED){ 21 System.out.println("重新連接"); 22 }else if(event.getType() ==TreeCacheEvent.Type.CONNECTION_LOST){ 23 System.out.println("連接過時一段時間"); 24 } 25 } 26 }); 27 System.in.read();
1:Zookeeper的數據結構(樹型結構)
2:節點的分類(4個)
-
持久性(帶序號、不帶序號)
-
臨時性(帶序號、不帶序號)
3:客戶端命令(創建、查詢、修改、刪除)
4:Zookeeper的java的api介紹(創建、查詢、修改、刪除)
-
Curator的客戶端
RetryPolicy retryPolicy = new ExponentialBackoffRetry(3000,3);
CuratorFramework client = CuratorFrameworkFactory.newClient("127.0.0.1:2181", 3000, 3000, retryPolicy);
5:Zookeeper的watch機制
-
NodeCache
-
PathChildrenCache
-
TreeCache(監聽和緩存根幾點變化和子節點變化)(重點)