一 回調基礎知識
znode 可以被監控,包括這個目錄節點中存儲的數據的修改,子節點目錄的變化等,一旦變化可以通知設置監控的客戶端,這個功能是zookeeper對於應用最重要的特性,通過這個特性可以實現的功能包括配置的集中管理,集群管理,分布式鎖等等。
//創建一個Zookeeper實例,第一個參數為目標服務器地址和端口,第二個參數為Session超時時間,第三個為節點變化時的回調方法
ZooKeeper zk = new ZooKeeper("127.0.0.1:2181", 500000,new Watcher() {
// 監控所有被觸發的事件
public void process(WatchedEvent event) {
//dosomething
}
});
將APP1的所有配置配置到/APP1 znode下,APP1所有機器一啟動就對/APP1這個節點進行監控(zk.exist("/APP1",true)),並且實現回調方法Watcher,那么在zookeeper上/APP1 znode節點下數據發生變化的時候,每個機器都會收到通知,Watcher方法將會被執行,那么應用再取下數據即可(zk.getData("/APP1",false,null));
下面表格列出了寫操作與ZK內部產生的事件的對應關系:
event For “/path” |
event For “/path/child” |
|
create(“/path”) |
EventType.NodeCreated |
NA |
delete(“/path”) |
EventType.NodeDeleted |
NA |
setData(“/path”) |
EventType.NodeDataChanged |
NA |
create(“/path/child”) |
EventType.NodeChildrenChanged |
EventType.NodeCreated |
delete(“/path/child”) |
EventType.NodeChildrenChanged |
EventType.NodeDeleted |
setData(“/path/child”) |
NA |
EventType.NodeDataChanged |
而ZK內部的寫事件與所觸發的watcher的對應關系如下:
event For “/path” |
defaultWatcher |
exists |
getData |
getChildren |
EventType.None |
√ |
√ |
√ |
√ |
EventType.NodeCreated |
√ |
√ |
||
EventType.NodeDeleted |
√(不正常) |
√ |
||
EventType.NodeDataChanged |
√ |
√ |
||
EventType.NodeChildrenChanged |
√ |
綜合上面兩個表,我們可以總結出各種寫操作可以觸發哪些watcher,如下表所示:
“/path” |
“/path/child” |
|||||||
exists |
getData |
getChildren |
exists |
getData |
getChildren |
|||
create(“/path”) |
√ |
√ |
||||||
delete(“/path”) |
√ |
√ |
√ |
|||||
setData(“/path”) |
√ |
√ |
||||||
create(“/path/child”) |
√ |
√ |
√ |
|||||
delete(“/path/child”) |
√ |
√ |
√ |
√ |
||||
setData(“/path/child”) |
√ |
√ |
如果發生session close、authFail和invalid,那么所有類型的wather都會被觸發
zkClient除了做了一些便捷包裝之外,對watcher使用做了一點增強。比如subscribeChildChanges實際上是通過exists和getChildren關注了兩個事件。這樣當create(“/path”)時,對應path上通過getChildren注冊的listener也會被調用。另外subscribeDataChanges實際上只是通過exists注冊了事件。因為從上表可以看到,對於一個更新,通過exists和getData注冊的watcher要么都會觸發,要么都不會觸發。
getData,getChildren(),exists()這三個方法可以針對參數中的path設置watcher,當path對應的Node 有相應變化時,server端會給對應的設置了watcher的client 發送一個一次性的觸發通知事件。客戶端在收到這個觸發通知事件后,可以根據自己的業務邏輯進行相應地處理。
注意這個watcher的功能是一次性的,如果還想繼續得到watcher通知,在處理完事件后,要重新register。
二 測試watcher回調機制
// Watcher實例
Watcher wh = new Watcher() {
public void process(WatchedEvent event) {
System.out.println("回調watcher實例: 路徑" + event.getPath() + " 類型:"
+ event.getType());
}
};
ZooKeeper zk = new ZooKeeper("10.15.82.166:3351", 500000, wh);
System.out.println("---------------------");
// 創建一個節點root,數據是mydata,不進行ACL權限控制,節點為永久性的(即客戶端shutdown了也不會消失)
zk.exists("/root", true);
zk.create("/root", "mydata".getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
System.out.println("---------------------");
// 在root下面創建一個childone znode,數據為childone,不進行ACL權限控制,節點為永久性的
zk.exists("/root/childone", true);
zk.create("/root/childone", "childone".getBytes(), Ids.OPEN_ACL_UNSAFE,
CreateMode.PERSISTENT);
System.out.println("---------------------");
// 刪除/root/childone這個節點,第二個參數為版本,-1的話直接刪除,無視版本
zk.exists("/root/childone", true);
zk.delete("/root/childone", -1);
System.out.println("---------------------");
zk.exists("/root", true);
zk.delete("/root", -1);
System.out.println("---------------------");
// 關閉session
zk.close();
---------------------
回調watcher實例: 路徑null 類型:None
回調watcher實例: 路徑/root 類型:NodeCreated
---------------------
回調watcher實例: 路徑/root/childone 類型:NodeCreated
---------------------
回調watcher實例: 路徑/root/childone 類型:NodeDeleted
---------------------
回調watcher實例: 路徑/root 類型:NodeDeleted
---------------------
三 永久回調
3類事件觸發wather后就不再作用,也就是所謂的(一次作用),但是,如何永久監聽呢?這需要我們再程序邏輯上進行控制,網上有更好的辦法,但是,在簡單的應用中,可以再wather方法里面再設置監聽,這個方法很笨,但是,很有效,達到了預期的效果。
Watcher wh = new Watcher() { public void process(WatchedEvent event) { Log.AC_DEBUG("觸發回調watcher實例: 路徑" + event.getPath() + " 類型:" + event.getType()); if (event.getType() == EventType.None) { try { // // 判斷userauth權限是否能訪問userpath String auth_type = "digest"; zk.addAuthInfo(auth_type, userauth.getBytes()); zk.getData(userpath, null, null); } catch (Exception e) { // e.printStackTrace(); Log.AC_ERROR("get node faild:userpath=" + userpath + ",auth=" + userauth + " e:" + e.getMessage()); return; } Log.AC_INFO("userpath=" + userpath + " userauth=" + userauth); } try { switchinfo = getallswitch(); // 更新userpath和匿名用戶路徑下的配置信息,監聽這些節點 // 監聽用戶路徑節點 Log.AC_DEBUG("lesson user=" + userpath + " node..."); zk.exists(userpath, true); // 監聽匿名用戶路徑節點 Log.AC_DEBUG("lesson user=" + AnonymousUSERpath + " node..."); zk.exists(AnonymousUSERpath, true); // 監聽用戶路徑下的開關節點 if (zk.exists(userpath, false) != null) { Log.AC_DEBUG("lesson user=" + userpath + " 's swich node..."); List<String> swnodes = zk.getChildren(userpath, true); // // 監聽switch層節點的變化 Iterator<String> it_sw = swnodes.iterator(); while (it_sw.hasNext()) { String swpath = userpath + "/" + it_sw.next(); Log.AC_DEBUG("lesson user=" + swpath + " node..."); zk.exists(swpath, true); } } } catch (Exception e) { e.printStackTrace(); Log.AC_ERROR("lesson znode error:" + e.getMessage()); } } };