ZK的watch機制


1.watcher原理框架


由圖看出,zk的watcher由客戶端,客戶端WatchManager,zk服務器組成。整個過程涉及了消息通信及數據存儲。

  • zk客戶端向zk服務器注冊watcher的同時,會將watcher對象存儲在客戶端的watchManager。
  • Zk服務器觸發watcher事件后,會向客戶端發送通知,客戶端線程從watchManager中回調watcher執行相應的功能。

有木有看到小紅旗?加入小紅旗是一個watcher,當小紅旗被創建並注冊到node1節點(會有相應的API實現)后,就會監聽node1+node_a+node_b或node_a+node_b。這里兩種情況是因為在創建watcher注冊時會有多種途徑。並且watcher不能監聽到孫節點。注意,watcher設置后,一旦觸發一次后就會失效,如果要想一直監聽,需要在process回調函數里重新注冊相同的 watcher。

2.通知狀態與事件

public class WatcherTest implements Watcher {
@Override
public void process(WatchedEvent event) {
// TODO Auto-generated method stub
WatcherTest w = new WatcherTest();
ZooKeeper zk = new ZooKeeper(wx.getZkpath(),10000, w); 
}

public static void main(String[] args){
WatcherTest w = new WatcherTest();
ZooKeeper zk = new ZooKeeper(wx.getZkpath(), 10000, w);
}
}


上面例子是把異常處理,邏輯處理等都省掉。watcher的應用很簡單,主要有兩步:繼承 Watcher 接口,重寫 process 回調函數。
當然注冊方式有很多,有默認和重新覆蓋方式,可以一次觸發失效也可以一直有效觸發。這些都可以通過代碼實現。
2.1 KeeperStatus通知狀態
KeeperStatus完整的類名是org.apache.zookeeper.Watcher.Event.KeeperState。
2.2 EventType事件類型
EventType完整的類名是org.apache.zookeeper.Watcher.Event.EventType。

此圖是zookeeper常用的通知狀態與對應事件類型的對應關系。除了客戶端與服務器連接狀態下,有多種事件的變化,其他狀態的事件都是None。這也是符合邏輯的,因為沒有連接服務器肯定不能獲取獲取到當前的狀態,也就無法發送對應的事件類型了。
這里重點說下幾個重要而且容易迷惑的事件:

  • NodeDataChanged事件:無論節點數據發生變化還是數據版本發生變化都會觸發,即使被更新數據與新數據一樣,數據版本dataVersion都會發生變化
  • NodeChildrenChanged:新增節點或者刪除節點
  • AuthFailed:重點是客戶端會話沒有權限而是授權失敗

客戶端只能收到服務器發過來的相關事件通知,並不能獲取到對應數據節點的原始數據及變更后的新數據。因此,如果業務需要知道變更前的數據或者變更后的新數據,需要業務保存變更前的數據(本機數據結構、文件等)和調用接口獲取新的數據

3.watcher注冊過程
3.1涉及接口
創建zk客戶端對象實例時注冊:

  • ZooKeeper(String connectString, int sessionTimeout, Watcher watcher)
  • ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly)
  • ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd)
  • ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)

通過這種方式注冊的watcher將會作為整個zk會話期間的默認watcher,會一直被保存在客戶端ZK WatchManager 的 defaultWatcher 中,如果這個被創建的節點在其它時候被創建watcher並注冊,則這個默認的watcher會被覆蓋。注意,watcher觸發一次就會失效,不管是創建節點時的 watcher 還是以后創建的 watcher。
其他注冊watcher的API:

  • getChildren(String path, boolean watch):Boolean watch表示是否使用上下文中默認的watcher,即創建zk實例時設置的watcher
  • getData(String path, boolean watch, Stat stat):Boolean watch表示是否使用上下文默認的watcher,即創建zk實例時設置的watcher
  • getData(String path, Watcher watcher, AsyncCallback.DataCallback cb, Object ctx)
  • exists(String path, boolean watch)“Boolean watch表示是否使用上下文中默認的watcher,即創建zk實例時設置的watcher
  • exists(String path, Watcher watcher)

舉栗子

 

 

 

 

這就是watcher的簡單例子,zk的實際應用集群管理,發布訂閱等復雜功能其實就在這個小例子上拓展的。
3.2客戶端注冊

這里的客戶端注冊主要是把上面第一點的zookeeper原理框架的注冊步驟展開,簡單來說就是zk客戶端在注冊時會先向zk服務器請求注冊,服務器會返回請求響應,如果響應成功則zk服務端把watcher對象放到客戶端的WatchManager管理並返回響應給客戶端。
3.3服務器端注冊

FinalRequestProcessor:
/**
* This Request processor actually applies any transaction associated with a
* request and services any queries. It is always at the end of a
* RequestProcessor chain (hence the name), so it does not have a nextProcessor
* member.
*
* This RequestProcessor counts on ZooKeeperServer to populate the
* outstandingRequests member of ZooKeeperServer.
*/
public class FinalRequestProcessor implements RequestProcessor

由源碼注釋得知,FinalRequestProcessor類實際是任何事務請求和任何查詢的的最終處理類。也就是我們客戶端對節點的set/get/delete/create/exists等操作最終都會運行到這里。
以exists函數為例子:

case OpCode.exists: {
lastOp = "EXIS";
// TODO we need to figure out the security requirement for this!
ExistsRequest existsRequest = new ExistsRequest();
ByteBufferInputStream.byteBuffer2Record(request.request,
existsRequest);
String path = existsRequest.getPath();
if (path.indexOf('\0') != -1) {
throw new KeeperException.BadArgumentsException();
}
Stat stat = zks.getZKDatabase().statNode(path, existsRequest
.getWatch() ? cnxn : null);
rsp = new ExistsResponse(stat);
break;
}
existsRequest.getWatch() ? cnxn : null此句是在調用exists API時,判斷是否注冊watcher,若是就返回 cnxn,cnxn是由此句代碼ServerCnxn cnxn = request.cnxn;創建的。
/**
* Interface to a Server connection - represents a connection from a client
* to the server.
*/
public abstract class ServerCnxn implements Stats, Watcher

通過ServerCnxn類的源碼注釋得知,ServerCnxn是維持服務器與客戶端的tcp連接與實現了 watcher。總的來說,ServerCnxn類創建的對象cnxn即包含了連接信息又包含watcher信息。
同時仔細看ServerCnxn類里面的源碼,發現有以下這個函數,process函數正是watcher的回調函數啊。

public abstract class ServerCnxn implements Stats, Watcher {
.
.
public abstract void process(WatchedEvent event);
Stat stat = zks.getZKDatabase().statNode(path, existsRequest.getWatch() ? cnxn : null); 
//getZKDatabase實際上是獲取是在zookeeper運行時的數據庫。請看下面
.
.
}

ZKDatabase:

/**
* This class maintains the in memory database of zookeeper
* server states that includes the sessions, datatree and the
* committed logs. It is booted up after reading the logs
* and snapshots from the disk.
*/
public class ZKDatabase
通過源碼注釋得知ZKDatabase是在zookeeper運行時的數據庫,在FinalRequestProcessor的case exists中會把existsRequest(exists請求傳遞給ZKDatabase)。
/**
* the datatree for this zkdatabase
* @return the datatree for this zkdatabase
*/
public DataTree getDataTree() {
return this.dataTree;
}

ZKDatabase里面有這關鍵的一個函數是從zookeeper運行時展開的節點數型結構中搜索到合適的節點返回。
watchManager

  • Zk服務器端Watcher的管理者
  • 從兩個維度維護watcher
  • watchTable從數據節點的粒度來維護
  • watch2Paths從watcher的粒度來維護
  • 負責watcher事件的觸發
class WatchManager {
private final Map<String, Set<Watcher>> watchTable =
new HashMap<String, Set<Watcher>>();

private final Map<Watcher, Set<String>> watch2Paths = new HashMap<Watcher, Set<String>>();
Set<Watcher> triggerWatch(String path, EventType type) { return triggerWatch(path, type, null);}
}

watcher觸發

public Stat setData(String path, byte data[], int version, long zxid,long time) throws KeeperException.NoNodeException {
Stat s = new Stat();
DataNode n = nodes.get(path);
if (n == null) {
throw new KeeperException.NoNodeException();
}
byte lastdata[] = null;
synchronized (n) {
lastdata = n.data;
n.data = data;
n.stat.setMtime(time);
n.stat.setMzxid(zxid);
n.stat.setVersion(version);
n.copyStat(s);
}
// now update if the path is in a quota subtree.
String lastPrefix = getMaxPrefixWithQuota(path);
if(lastPrefix != null) {
this.updateBytes(lastPrefix, (data == null ? 0 : data.length)
- (lastdata == null ? 0 : lastdata.length));
}
dataWatches.triggerWatch(path, EventType.NodeDataChanged); //觸發事件
return s;
}

客戶端回調watcher步驟:

  • 反序列化,將孒節流轉換成WatcherEvent對象。因為在Java中網絡傳輸肯定是使用了序列化的,主要是為了節省網絡IO和提高傳輸效率。
  • 處理chrootPath。獲取節點的根節點路徑,然后再搜索樹而已。
  • 還原watchedEvent:把WatcherEvent對象轉換成WatchedEvent。主要是把zk服務器那邊的WatchedEvent事件變為WatcherEvent,標為已watch觸發。
  • 回調Watcher:把WatchedEvent對象交給EventThread線程。EventThread線程主要是負責從客戶端的ZKWatchManager中取出Watcher,並放入waitingEvents隊列中,然后供客戶端獲取。


免責聲明!

本站轉載的文章為個人學習借鑒使用,本站對版權不負任何法律責任。如果侵犯了您的隱私權益,請聯系本站郵箱yoyou2525@163.com刪除。



 
粵ICP備18138465號   © 2018-2025 CODEPRJ.COM