https://zookeeper.apache.org/doc/r3.4.14/zookeeperProgrammers.html#sc_WatchRememberThese
-
zookeeper提供了數據的發布/訂閱功能,多個訂閱者可同時監聽某一特定主題對象,當該主題對象的自身狀態發生變化時例如節點內容改變、節點下的子節點列表改變等,會實時、主動通知所有訂閱者 -
zookeeper采用了Watcher機制實現數據的發布訂閱功能。該機制在被訂閱對象發生變化時會異步通知客戶端,因此客戶端不必在Watcher注冊后輪詢阻塞,從而減輕了客戶端壓力 -
watcher機制事件上與觀察者模式類似,也可看作是一種觀察者模式在分布式場景下的實現方式
watcher架構
watcher實現由三個部分組成
-
zookeeper服務端 -
zookeeper客戶端 -
客戶端的
ZKWatchManager對象
客戶端首先將 Watcher注冊到服務端,同時將 Watcher對象保存到客戶端的watch管理器中。當Zookeeper服務端監聽的數據狀態發生變化時,服務端會主動通知客戶端,接着客戶端的 Watch管理器會

watcher特性
-
-
特性 說明 一次性 watcher是一次性的,一旦被觸發就會移除,再次使用時需要重新注冊客戶端順序回調 watcher回調是順序串行執行的,只有回調后客戶端才能看到最新的數據狀態。一個watcher回調邏輯不應該太多,以免影響別的watcher執行輕量級 WatchEvent是最小的通信單位,結構上只包含通知狀態、事件類型和節點路徑,並不會告訴數據節點變化前后的具體內容時效性 watcher只有在當前session徹底失效時才會無效,若在session有效期內快速重連成功,則watcher依然存在,仍可接收到通知;
watcher接口設計
Watcher是一個接口,任何實現了Watcher接口的類就算一個新的Watcher。Watcher內部包含了兩個枚舉類:KeeperState、EventType

Watcher通知狀態(KeeperState)
KeeperState是客戶端與服務端連接狀態發生變化時對應的通知類型。路徑為org.apache.zookeeper.Watcher.EventKeeperState,是一個枚舉類,其枚舉屬性如下:
-
-
枚舉屬性 說明 SyncConnected客戶端與服務器正常連接時 Disconnected客戶端與服務器斷開連接時 Expired會話 session失效時AuthFailed身份認證失敗時
Watcher事件類型(EventType)
EventType是數據節點znode發生變化時對應的通知類型。EventType變化時KeeperState永遠處於SyncConnected通知狀態下;當keeperState發生變化時,EventType永遠為None。其路徑為org.apache.zookeeper.Watcher.Event.EventType,是一個枚舉類,枚舉屬性如下:
-
-
枚舉屬性 說明 None無 NodeCreatedWatcher監聽的數據節點被創建時NodeDeletedWatcher監聽的數據節點被刪除時NodeDataChangedWatcher監聽的數據節點內容發生更改時(無論數據是否真的變化)NodeChildrenChangedWatcher監聽的數據節點的子節點列表發生變更時 -
注意:客戶端接收到的相關事件通知中只包含狀態以及類型等信息,不包含節點變化前后的具體內容,變化前的數據需業務自身存儲,變化后的數據需要調用
get等方法重新獲取
捕獲相應的事件
上面講到zookeeper客戶端連接的狀態和zookeeper對znode節點監聽的事件類型,下面我們來講解如何建立zookeeper的watcher監聽。在zookeeper中采用zk.getChildren(path,watch)、zk.exists(path,watch)、zk.getData(path,watcher,stat)這樣的方式來為某個znode注冊監聽 。
下表以node-x節點為例,說明調用的注冊方法和可用監聽事件間的關系:
| 注冊方式 | created | childrenChanged | Changed | Deleted |
|---|---|---|---|---|
zk.exists("/node-x",watcher) |
可監控 | 可監控 | 可監控 | |
zk.getData("/node-x",watcher) |
可監控 | 可監控 | ||
zk.getChildren("/node-x",watcher) |
可監控 | 可監控 |
注冊watcher的方法
客戶端與服務器端的連接狀態
-
KeeperState:通知狀態 -
SyncConnected:客戶端與服務器正常連接時 -
Disconnected:客戶端與服務器斷開連接時 -
Expired:會話session失效時 -
AuthFailed:身份認證失敗時 -
事件類型為:
None
public class ZkConnectionWatcher implements Watcher { @Override public void process(WatchedEvent watchedEvent) { Event.KeeperState state = watchedEvent.getState(); if(state == Event.KeeperState.SyncConnected){ // 正常 System.out.println("正常連接"); }else if (state == Event.KeeperState.Disconnected){ // 可以用Windows斷開虛擬機網卡的方式模擬 // 當會話斷開會出現,斷開連接不代表不能重連,在會話超時時間內重連可以恢復正常 System.out.println("斷開連接"); }else if (state == Event.KeeperState.Expired){ // 沒有在會話超時時間內重新連接,而是當會話超時被移除的時候重連會走進這里 System.out.println("連接過期"); }else if (state == Event.KeeperState.AuthFailed){ // 在操作的時候權限不夠會出現 System.out.println("授權失敗"); } countDownLatch.countDown(); } private static final String IP = "192.168.133.133:2181" ; private static CountDownLatch countDownLatch = new CountDownLatch(1); public static void main(String[] args) throws Exception { // 5000為會話超時時間 ZooKeeper zooKeeper = new ZooKeeper(IP, 5000, new ZkConnectionWatcher()); countDownLatch.await(); // 模擬授權失敗 zooKeeper.addAuthInfo("digest1","itcast1:123451".getBytes()); byte[] data = zooKeeper.getData("/hadoop", false, null); System.out.println(new String(data)); TimeUnit.SECONDS.sleep(50); } }
watcher檢查節點
exists
-
exists(String path, boolean b) -
exists(String path, Watcher w) -
NodeCreated:節點創建 -
NodeDeleted:節點刪除 -
NodeDataChanged:節點內容
public class EventTypeTest { private static final String IP = "192.168.133.133:2181"; private static CountDownLatch countDownLatch = new CountDownLatch(1); private static ZooKeeper zooKeeper; // 采用zookeeper連接創建時的監聽器 public static void exists1() throws Exception{ zooKeeper.exists("/watcher1",true); } // 自定義監聽器 public static void exists2() throws Exception{ zooKeeper.exists("/watcher1",(WatchedEvent w) -> { System.out.println("自定義" + w.getType()); }); } // 演示使用多次的監聽器 public static void exists3() throws Exception{ zooKeeper.exists("/watcher1", new Watcher() { @Override public void process(WatchedEvent watchedEvent) { try { System.out.println("自定義的" + watchedEvent.getType()); } finally { try { zooKeeper.exists("/watcher1",this); } catch (Exception e) { e.printStackTrace(); } } } }); } // 演示一節點注冊多個監聽器 public static void exists4() throws Exception{ zooKeeper.exists("/watcher1",(WatchedEvent w) -> { System.out.println("自定義1" + w.getType()); }); zooKeeper.exists("/watcher1", new Watcher() { @Override public void process(WatchedEvent watchedEvent) { try { System.out.println("自定義2" + watchedEvent.getType()); } finally { try { zooKeeper.exists("/watcher1",this); } catch (Exception e) { e.printStackTrace(); } } } }); } // 測試 public static void main(String[] args) throws Exception { zooKeeper = new ZooKeeper(IP, 5000, new ZKWatcher()); countDownLatch.await(); exists4(); TimeUnit.SECONDS.sleep(50); } static class ZKWatcher implements Watcher{ @Override public void process(WatchedEvent watchedEvent) { countDownLatch.countDown(); System.out.println("zk的監聽器" + watchedEvent.getType()); } } }
-
getData(String path, boolean b, Stat stat) -
getData(String path, Watcher w, Stat stat) -
NodeDeleted:節點刪除 -
NodeDataChange:節點內容發生變化
getChildren
-
getChildren(String path, boolean b) -
getChildren(String path, Watcher w) -
NodeChildrenChanged:子節點發生變化 -
NodeDeleted:節點刪除
