https://zookeeper.apache.org/doc/r3.4.14/zookeeperProgrammers.html#sc_WatchRememberThese
-
zookeeper
提供了數據的發布/訂閱
功能,多個訂閱者可同時監聽某一特定主題對象,當該主題對象的自身狀態發生變化時例如節點內容改變、節點下的子節點列表改變等,會實時、主動通知所有訂閱者 -
zookeeper
采用了Watcher
機制實現數據的發布訂閱功能。該機制在被訂閱對象發生變化時會異步通知客戶端,因此客戶端不必在Watcher
注冊后輪詢阻塞,從而減輕了客戶端壓力 -
watcher
機制事件上與觀察者模式類似,也可看作是一種觀察者模式在分布式場景下的實現方式
watcher架構
watcher
實現由三個部分組成
-
zookeeper
服務端 -
zookeeper
客戶端 -
客戶端的
ZKWatchManager對象
客戶端首先將 Watcher
注冊到服務端,同時將 Watcher
對象保存到客戶端的watch
管理器中。當Zookeeper
服務端監聽的數據狀態發生變化時,服務端會主動通知客戶端,接着客戶端的 Watch
管理器會
watcher特性
-
-
特性 說明 一次性 watcher
是一次性的,一旦被觸發就會移除,再次使用時需要重新注冊客戶端順序回調 watcher
回調是順序串行執行的,只有回調后客戶端才能看到最新的數據狀態。一個watcher
回調邏輯不應該太多,以免影響別的watcher
執行輕量級 WatchEvent
是最小的通信單位,結構上只包含通知狀態、事件類型和節點路徑,並不會告訴數據節點變化前后的具體內容時效性 watcher
只有在當前session
徹底失效時才會無效,若在session
有效期內快速重連成功,則watcher
依然存在,仍可接收到通知;
watcher接口設計
Watcher
是一個接口,任何實現了Watcher
接口的類就算一個新的Watcher
。Watcher
內部包含了兩個枚舉類:KeeperState
、EventType
Watcher通知狀態(KeeperState)
KeeperState
是客戶端與服務端連接狀態發生變化時對應的通知類型。路徑為org.apache.zookeeper.Watcher.EventKeeperState
,是一個枚舉類,其枚舉屬性如下:
-
-
枚舉屬性 說明 SyncConnected
客戶端與服務器正常連接時 Disconnected
客戶端與服務器斷開連接時 Expired
會話 session
失效時AuthFailed
身份認證失敗時
Watcher事件類型(EventType)
EventType
是數據節點znode
發生變化時對應的通知類型。EventType
變化時KeeperState
永遠處於SyncConnected
通知狀態下;當keeperState
發生變化時,EventType
永遠為None
。其路徑為org.apache.zookeeper.Watcher.Event.EventType
,是一個枚舉類,枚舉屬性如下:
-
-
枚舉屬性 說明 None
無 NodeCreated
Watcher
監聽的數據節點被創建時NodeDeleted
Watcher
監聽的數據節點被刪除時NodeDataChanged
Watcher
監聽的數據節點內容發生更改時(無論數據是否真的變化)NodeChildrenChanged
Watcher
監聽的數據節點的子節點列表發生變更時 -
注意:客戶端接收到的相關事件通知中只包含狀態以及類型等信息,不包含節點變化前后的具體內容,變化前的數據需業務自身存儲,變化后的數據需要調用
get
等方法重新獲取
捕獲相應的事件
上面講到zookeeper
客戶端連接的狀態和zookeeper
對znode
節點監聽的事件類型,下面我們來講解如何建立zookeeper
的watcher
監聽。在zookeeper
中采用zk.getChildren(path,watch)、zk.exists(path,watch)、zk.getData(path,watcher,stat)
這樣的方式來為某個znode
注冊監聽 。
下表以node-x
節點為例,說明調用的注冊方法和可用監聽事件間的關系:
注冊方式 | created | childrenChanged | Changed | Deleted |
---|---|---|---|---|
zk.exists("/node-x",watcher) |
可監控 | 可監控 | 可監控 | |
zk.getData("/node-x",watcher) |
可監控 | 可監控 | ||
zk.getChildren("/node-x",watcher) |
可監控 | 可監控 |
注冊watcher的方法
客戶端與服務器端的連接狀態
-
KeeperState
:通知狀態 -
SyncConnected
:客戶端與服務器正常連接時 -
Disconnected
:客戶端與服務器斷開連接時 -
Expired
:會話session
失效時 -
AuthFailed
:身份認證失敗時 -
事件類型為:
None
public class ZkConnectionWatcher implements Watcher { @Override public void process(WatchedEvent watchedEvent) { Event.KeeperState state = watchedEvent.getState(); if(state == Event.KeeperState.SyncConnected){ // 正常 System.out.println("正常連接"); }else if (state == Event.KeeperState.Disconnected){ // 可以用Windows斷開虛擬機網卡的方式模擬 // 當會話斷開會出現,斷開連接不代表不能重連,在會話超時時間內重連可以恢復正常 System.out.println("斷開連接"); }else if (state == Event.KeeperState.Expired){ // 沒有在會話超時時間內重新連接,而是當會話超時被移除的時候重連會走進這里 System.out.println("連接過期"); }else if (state == Event.KeeperState.AuthFailed){ // 在操作的時候權限不夠會出現 System.out.println("授權失敗"); } countDownLatch.countDown(); } private static final String IP = "192.168.133.133:2181" ; private static CountDownLatch countDownLatch = new CountDownLatch(1); public static void main(String[] args) throws Exception { // 5000為會話超時時間 ZooKeeper zooKeeper = new ZooKeeper(IP, 5000, new ZkConnectionWatcher()); countDownLatch.await(); // 模擬授權失敗 zooKeeper.addAuthInfo("digest1","itcast1:123451".getBytes()); byte[] data = zooKeeper.getData("/hadoop", false, null); System.out.println(new String(data)); TimeUnit.SECONDS.sleep(50); } }
watcher檢查節點
exists
-
exists(String path, boolean b)
-
exists(String path, Watcher w)
-
NodeCreated
:節點創建 -
NodeDeleted
:節點刪除 -
NodeDataChanged
:節點內容
public class EventTypeTest { private static final String IP = "192.168.133.133:2181"; private static CountDownLatch countDownLatch = new CountDownLatch(1); private static ZooKeeper zooKeeper; // 采用zookeeper連接創建時的監聽器 public static void exists1() throws Exception{ zooKeeper.exists("/watcher1",true); } // 自定義監聽器 public static void exists2() throws Exception{ zooKeeper.exists("/watcher1",(WatchedEvent w) -> { System.out.println("自定義" + w.getType()); }); } // 演示使用多次的監聽器 public static void exists3() throws Exception{ zooKeeper.exists("/watcher1", new Watcher() { @Override public void process(WatchedEvent watchedEvent) { try { System.out.println("自定義的" + watchedEvent.getType()); } finally { try { zooKeeper.exists("/watcher1",this); } catch (Exception e) { e.printStackTrace(); } } } }); } // 演示一節點注冊多個監聽器 public static void exists4() throws Exception{ zooKeeper.exists("/watcher1",(WatchedEvent w) -> { System.out.println("自定義1" + w.getType()); }); zooKeeper.exists("/watcher1", new Watcher() { @Override public void process(WatchedEvent watchedEvent) { try { System.out.println("自定義2" + watchedEvent.getType()); } finally { try { zooKeeper.exists("/watcher1",this); } catch (Exception e) { e.printStackTrace(); } } } }); } // 測試 public static void main(String[] args) throws Exception { zooKeeper = new ZooKeeper(IP, 5000, new ZKWatcher()); countDownLatch.await(); exists4(); TimeUnit.SECONDS.sleep(50); } static class ZKWatcher implements Watcher{ @Override public void process(WatchedEvent watchedEvent) { countDownLatch.countDown(); System.out.println("zk的監聽器" + watchedEvent.getType()); } } }
-
getData(String path, boolean b, Stat stat)
-
getData(String path, Watcher w, Stat stat)
-
NodeDeleted
:節點刪除 -
NodeDataChange
:節點內容發生變化
getChildren
-
getChildren(String path, boolean b)
-
getChildren(String path, Watcher w)
-
NodeChildrenChanged
:子節點發生變化 -
NodeDeleted
:節點刪除