4、zookeeper的事件監聽機制


watcher概念

https://zookeeper.apache.org/doc/r3.4.14/zookeeperProgrammers.html#sc_WatchRememberThese

  • zookeeper提供了數據的發布/訂閱功能,多個訂閱者可同時監聽某一特定主題對象,當該主題對象的自身狀態發生變化時例如節點內容改變、節點下的子節點列表改變等,會實時、主動通知所有訂閱者

  • zookeeper采用了 Watcher機制實現數據的發布訂閱功能。該機制在被訂閱對象發生變化時會異步通知客戶端,因此客戶端不必在 Watcher注冊后輪詢阻塞,從而減輕了客戶端壓力

  • watcher機制事件上與觀察者模式類似,也可看作是一種觀察者模式在分布式場景下的實現方式

 

watcher架構

watcher實現由三個部分組成

  • zookeeper服務端

  • zookeeper客戶端

  • 客戶端的ZKWatchManager對象

客戶端首先將 Watcher注冊到服務端,同時將 Watcher對象保存到客戶端的watch管理器中。當Zookeeper服務端監聽的數據狀態發生變化時,服務端會主動通知客戶端,接着客戶端的 Watch管理器會觸發相關 Watcher來回調相應處理邏輯,從而完成整體的數據 發布/訂閱流程

 

 

 

watcher特性

  •  

  • 特性 說明
    一次性 watcher一次性的,一旦被觸發就會移除,再次使用時需要重新注冊
    客戶端順序回調 watcher回調是順序串行執行的,只有回調后客戶端才能看到最新的數據狀態。一個watcher回調邏輯不應該太多,以免影響別的watcher執行
    輕量級 WatchEvent是最小的通信單位,結構上只包含通知狀態、事件類型和節點路徑,並不會告訴數據節點變化前后的具體內容
    時效性 watcher只有在當前session徹底失效時才會無效,若在session有效期內快速重連成功,則watcher依然存在,仍可接收到通知;

watcher接口設計

Watcher是一個接口,任何實現了Watcher接口的類就算一個新的WatcherWatcher內部包含了兩個枚舉類:KeeperStateEventType

 

 

Watcher通知狀態(KeeperState)

KeeperState是客戶端與服務端連接狀態發生變化時對應的通知類型。路徑為org.apache.zookeeper.Watcher.EventKeeperState,是一個枚舉類,其枚舉屬性如下:

  •  

  • 枚舉屬性 說明
    SyncConnected 客戶端與服務器正常連接時
    Disconnected 客戶端與服務器斷開連接時
    Expired 會話session失效時
    AuthFailed 身份認證失敗時

     

Watcher事件類型(EventType)

EventType數據節點znode發生變化時對應的通知類型。EventType變化時KeeperState永遠處於SyncConnected通知狀態下;當keeperState發生變化時,EventType永遠為None。其路徑為org.apache.zookeeper.Watcher.Event.EventType,是一個枚舉類,枚舉屬性如下:

  •  

  • 枚舉屬性 說明
    None
    NodeCreated Watcher監聽的數據節點被創建時
    NodeDeleted Watcher監聽的數據節點被刪除時
    NodeDataChanged Watcher監聽的數據節點內容發生更改時(無論數據是否真的變化)
    NodeChildrenChanged Watcher監聽的數據節點的子節點列表發生變更時
  • 注意:客戶端接收到的相關事件通知中只包含狀態以及類型等信息,不包含節點變化前后的具體內容,變化前的數據需業務自身存儲,變化后的數據需要調用get等方法重新獲取

捕獲相應的事件

上面講到zookeeper客戶端連接的狀態和zookeeperznode節點監聽的事件類型,下面我們來講解如何建立zookeeperwatcher監聽。在zookeeper中采用zk.getChildren(path,watch)、zk.exists(path,watch)、zk.getData(path,watcher,stat)這樣的方式來為某個znode注冊監聽 。

下表以node-x節點為例,說明調用的注冊方法和可用監聽事件間的關系:

注冊方式 created childrenChanged Changed Deleted
zk.exists("/node-x",watcher) 可監控   可監控 可監控
zk.getData("/node-x",watcher)     可監控 可監控
zk.getChildren("/node-x",watcher)   可監控   可監控

注冊watcher的方法

客戶端與服務器端的連接狀態
  • KeeperState:通知狀態

  • SyncConnected:客戶端與服務器正常連接時

  • Disconnected:客戶端與服務器斷開連接時

  • Expired:會話session失效時

  • AuthFailed:身份認證失敗時

  • 事件類型為:None

    

public class ZkConnectionWatcher implements Watcher {
    @Override
    public void process(WatchedEvent watchedEvent) {
        Event.KeeperState state = watchedEvent.getState();
        if(state == Event.KeeperState.SyncConnected){
            // 正常
            System.out.println("正常連接");
        }else if (state == Event.KeeperState.Disconnected){
            // 可以用Windows斷開虛擬機網卡的方式模擬
            // 當會話斷開會出現,斷開連接不代表不能重連,在會話超時時間內重連可以恢復正常
            System.out.println("斷開連接");
        }else if (state == Event.KeeperState.Expired){
            // 沒有在會話超時時間內重新連接,而是當會話超時被移除的時候重連會走進這里
            System.out.println("連接過期");
        }else if (state == Event.KeeperState.AuthFailed){
            // 在操作的時候權限不夠會出現
            System.out.println("授權失敗");
        }
        countDownLatch.countDown();
    }
    private static final String IP = "192.168.133.133:2181"
;
    private static CountDownLatch countDownLatch = new CountDownLatch(1);

    public static void main(String[] args) throws Exception {
        // 5000為會話超時時間
        ZooKeeper zooKeeper = new ZooKeeper(IP, 5000, new ZkConnectionWatcher());
        countDownLatch.await();
        // 模擬授權失敗
        zooKeeper.addAuthInfo("digest1","itcast1:123451".getBytes());
        byte[] data = zooKeeper.getData("/hadoop", false, null);
        System.out.println(new String(data));
        TimeUnit.SECONDS.sleep(50);
    }
}

 

watcher檢查節點

exists

  • exists(String path, boolean b)

  • exists(String path, Watcher w)

  • NodeCreated節點創建

  • NodeDeleted節點刪除

  • NodeDataChanged節點內容

  

public class EventTypeTest {
    private static final String IP = "192.168.133.133:2181";
    private static CountDownLatch countDownLatch = new CountDownLatch(1);
    private static ZooKeeper zooKeeper;

    // 采用zookeeper連接創建時的監聽器
    public static void exists1() throws Exception{
        zooKeeper.exists("/watcher1",true);
    }
    // 自定義監聽器
    public static void exists2() throws Exception{
        zooKeeper.exists("/watcher1",(WatchedEvent w) -> {
            System.out.println("自定義" + w.getType());
        });
    }
    // 演示使用多次的監聽器
    public static void exists3() throws Exception{
        zooKeeper.exists("/watcher1", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                try {
                    System.out.println("自定義的" + watchedEvent.getType());
                } finally {
                    try {
                        zooKeeper.exists("/watcher1",this);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        });
    }
    // 演示一節點注冊多個監聽器
    public static void exists4() throws Exception{
        zooKeeper.exists("/watcher1",(WatchedEvent w) -> {
            System.out.println("自定義1" + w.getType());
        });
        zooKeeper.exists("/watcher1", new Watcher() {
            @Override
            public void process(WatchedEvent watchedEvent) {
                try {
                    System.out.println("自定義2" + watchedEvent.getType());
                } finally {
                    try {
                        zooKeeper.exists("/watcher1",this);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }
        });
    }
    // 測試
    public static void main(String[] args) throws Exception {
        zooKeeper = new ZooKeeper(IP, 5000, new ZKWatcher());
        countDownLatch.await();
        exists4();
        TimeUnit.SECONDS.sleep(50);
    }

    static class ZKWatcher implements Watcher{
        @Override
        public void process(WatchedEvent watchedEvent) {
            countDownLatch.countDown();
            System.out.println("zk的監聽器" + watchedEvent.getType());
        }
    }

}

 

 

getData

  • getData(String path, boolean b, Stat stat)

  • getData(String path, Watcher w, Stat stat)

  • NodeDeleted節點刪除

  • NodeDataChange節點內容發生變化

getChildren

  • getChildren(String path, boolean b)

  • getChildren(String path, Watcher w)

  • NodeChildrenChanged子節點發生變化

  • NodeDeleted節點刪除


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM