zookeeper如何永久監聽


一 回調基礎知識  

znode 可以被監控,包括這個目錄節點中存儲的數據的修改,子節點目錄的變化等,一旦變化可以通知設置監控的客戶端,這個功能是zookeeper對於應用最重要的特性,通過這個特性可以實現的功能包括配置的集中管理,集群管理,分布式鎖等等。

//創建一個Zookeeper實例,第一個參數為目標服務器地址和端口,第二個參數為Session超時時間,第三個為節點變化時的回調方法
ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 500000,new Watcher() {
          // 監控所有被觸發的事件
           public void process(WatchedEvent event) {
     //dosomething
          }

     });
將APP1的所有配置配置到/APP1 znode下,APP1所有機器一啟動就對/APP1這個節點進行監控(zk.exist("/APP1",true)),並且實現回調方法Watcher,那么在zookeeper上/APP1 znode節點下數據發生變化的時候,每個機器都會收到通知,Watcher方法將會被執行,那么應用再取下數據即可(zk.getData("/APP1",false,null));

下面表格列出了寫操作與ZK內部產生的事件的對應關系:

 

event For /path

event For /path/child

create(/path)

EventType.NodeCreated

NA

delete(/path)

EventType.NodeDeleted

NA

setData(/path)

EventType.NodeDataChanged

NA

create(/path/child)

EventType.NodeChildrenChanged

EventType.NodeCreated

delete(/path/child)

EventType.NodeChildrenChanged

EventType.NodeDeleted

setData(/path/child)

NA

EventType.NodeDataChanged

ZK內部的寫事件與所觸發的watcher的對應關系如下:

event For /path

defaultWatcher

exists
(/path)

getData
(/path)

getChildren
(/path)

EventType.None

EventType.NodeCreated

 

 

EventType.NodeDeleted

 

(不正常)

 

EventType.NodeDataChanged

 

 

EventType.NodeChildrenChanged

     

綜合上面兩個表,我們可以總結出各種寫操作可以觸發哪些watcher,如下表所示:

 

/path

/path/child

 

exists

getData

getChildren

exists

getData

getChildren

create(/path)

       

delete(/path)

     

setData(/path)

       

create(/path/child)

   

 

delete(/path/child)

   

setData(/path/child)

     

 

如果發生session closeauthFailinvalid,那么所有類型的wather都會被觸發

zkClient除了做了一些便捷包裝之外,對watcher使用做了一點增強。比如subscribeChildChanges實際上是通過existsgetChildren關注了兩個事件。這樣當create(/path)時,對應path上通過getChildren注冊的listener也會被調用。另外subscribeDataChanges實際上只是通過exists注冊了事件。因為從上表可以看到,對於一個更新,通過existsgetData注冊的watcher要么都會觸發,要么都不會觸發。

getData,getChildren(),exists()這三個方法可以針對參數中的path設置watcher,當path對應的Node 有相應變化時,server端會給對應的設置了watcherclient 發送一個一次性的觸發通知事件。客戶端在收到這個觸發通知事件后,可以根據自己的業務邏輯進行相應地處理。

注意這個watcher的功能是一次性的,如果還想繼續得到watcher通知,在處理完事件后,要重新register

 二 測試watcher回調機制

// Watcher實例

Watcher wh = new Watcher() {

public void process(WatchedEvent event) {

System.out.println("回調watcher實例: 路徑" + event.getPath() + " 類型:"

+ event.getType());

}

};

 

ZooKeeper zk = new ZooKeeper("10.15.82.166:3351", 500000, wh);

System.out.println("---------------------");

// 創建一個節點root,數據是mydata,不進行ACL權限控制,節點為永久性的(即客戶端shutdown了也不會消失)

zk.exists("/root", true);

zk.create("/root", "mydata".getBytes(), Ids.OPEN_ACL_UNSAFE,

CreateMode.PERSISTENT);

System.out.println("---------------------");

// 在root下面創建一個childone znode,數據為childone,不進行ACL權限控制,節點為永久性的

zk.exists("/root/childone", true);

zk.create("/root/childone", "childone".getBytes(), Ids.OPEN_ACL_UNSAFE,

CreateMode.PERSISTENT);

System.out.println("---------------------");

// 刪除/root/childone這個節點,第二個參數為版本,-1的話直接刪除,無視版本

zk.exists("/root/childone", true);

zk.delete("/root/childone", -1);

System.out.println("---------------------");

zk.exists("/root", true);

zk.delete("/root", -1);

System.out.println("---------------------");

// 關閉session

zk.close();

---------------------

回調watcher實例: 路徑null 類型:None

回調watcher實例: 路徑/root 類型:NodeCreated

---------------------

回調watcher實例: 路徑/root/childone 類型:NodeCreated

---------------------

回調watcher實例: 路徑/root/childone 類型:NodeDeleted

---------------------

回調watcher實例: 路徑/root 類型:NodeDeleted

---------------------

三 永久回調 

3類事件觸發wather后就不再作用,也就是所謂的(一次作用),但是,如何永久監聽呢?這需要我們再程序邏輯上進行控制,網上有更好的辦法,但是,在簡單的應用中,可以再wather方法里面再設置監聽,這個方法很笨,但是,很有效,達到了預期的效果。

Watcher wh = new Watcher() {
        public void process(WatchedEvent event) {
            Log.AC_DEBUG("觸發回調watcher實例: 路徑" + event.getPath() + " 類型:"
                    + event.getType());

            if (event.getType() == EventType.None) {
                try { //
                        // 判斷userauth權限是否能訪問userpath
                    String auth_type = "digest";
                    zk.addAuthInfo(auth_type, userauth.getBytes());
                    zk.getData(userpath, null, null);
                } catch (Exception e) {
//                    e.printStackTrace();
                    Log.AC_ERROR("get node faild:userpath=" + userpath
                            + ",auth=" + userauth + " e:" + e.getMessage());
                    return;
                }
                Log.AC_INFO("userpath=" + userpath + " userauth=" + userauth);
            }

            try {

                switchinfo = getallswitch(); // 更新userpath和匿名用戶路徑下的配置信息,監聽這些節點

                // 監聽用戶路徑節點
                Log.AC_DEBUG("lesson user=" + userpath + " node...");
                zk.exists(userpath, true); // 監聽匿名用戶路徑節點
                Log.AC_DEBUG("lesson user=" + AnonymousUSERpath + " node...");
                zk.exists(AnonymousUSERpath, true);

                // 監聽用戶路徑下的開關節點
                if (zk.exists(userpath, false) != null) {
                    Log.AC_DEBUG("lesson user=" + userpath
                            + " 's swich node...");
                    List<String> swnodes = zk.getChildren(userpath, true); //
                    // 監聽switch層節點的變化
                    Iterator<String> it_sw = swnodes.iterator();
                    while (it_sw.hasNext()) {
                        String swpath = userpath + "/" + it_sw.next();
                        Log.AC_DEBUG("lesson user=" + swpath + " node...");
                        zk.exists(swpath, true);
                    }
                }
            } catch (Exception e) {
                e.printStackTrace();
                Log.AC_ERROR("lesson znode error:" + e.getMessage());
            }
        }
    };

 

 


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM